| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
7771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078 |
- /*
- * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
- * SPDX-License-Identifier: Apache-2.0
- */
- package badger
- import (
- "bytes"
- "context"
- "encoding/binary"
- stderrors "errors"
- "expvar"
- "fmt"
- "math"
- "os"
- "path/filepath"
- "sort"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- humanize "github.com/dustin/go-humanize"
- "github.com/pkg/errors"
- "github.com/dgraph-io/badger/v4/fb"
- "github.com/dgraph-io/badger/v4/options"
- "github.com/dgraph-io/badger/v4/pb"
- "github.com/dgraph-io/badger/v4/skl"
- "github.com/dgraph-io/badger/v4/table"
- "github.com/dgraph-io/badger/v4/y"
- "github.com/dgraph-io/ristretto/v2"
- "github.com/dgraph-io/ristretto/v2/z"
- )
// Reserved keys and key prefixes that badger uses for its own bookkeeping.
var (
	// badgerPrefix is the prefix for internal keys used by badger.
	badgerPrefix = []byte("!badger!")
	// txnKey indicates the end of entries in a txn.
	txnKey = []byte("!badger!txn")
	// bannedNsKey is the key prefix under which the banned namespaces are stored.
	bannedNsKey = []byte("!badger!banned")
)
- type closers struct {
- updateSize *z.Closer
- compactors *z.Closer
- memtable *z.Closer
- writes *z.Closer
- valueGC *z.Closer
- pub *z.Closer
- cacheHealth *z.Closer
- }
// lockedKeys is a set of uint64 keys guarded by an embedded RWMutex.
// The keys map must be initialized before add is called.
type lockedKeys struct {
	sync.RWMutex
	keys map[uint64]struct{}
}

// add inserts key into the set. Adding an existing key is a no-op.
func (lk *lockedKeys) add(key uint64) {
	lk.Lock()
	lk.keys[key] = struct{}{}
	lk.Unlock()
}

// has reports whether key is present in the set.
func (lk *lockedKeys) has(key uint64) bool {
	lk.RLock()
	_, found := lk.keys[key]
	lk.RUnlock()
	return found
}

// all returns a snapshot of every key in the set, in unspecified order.
func (lk *lockedKeys) all() []uint64 {
	lk.RLock()
	defer lk.RUnlock()

	snapshot := make([]uint64, len(lk.keys))
	next := 0
	for k := range lk.keys {
		snapshot[next] = k
		next++
	}
	return snapshot
}
- // DB provides the various functions required to interact with Badger.
- // DB is thread-safe.
- type DB struct {
- testOnlyDBExtensions
- lock sync.RWMutex // Guards list of inmemory tables, not individual reads and writes.
- dirLockGuard *directoryLockGuard
- // nil if Dir and ValueDir are the same
- valueDirGuard *directoryLockGuard
- closers closers
- mt *memTable // Our latest (actively written) in-memory table
- imm []*memTable // Add here only AFTER pushing to flushChan.
- // Initialized via openMemTables.
- nextMemFid int
- opt Options
- manifest *manifestFile
- lc *levelsController
- vlog valueLog
- writeCh chan *request
- flushChan chan *memTable // For flushing memtables.
- closeOnce sync.Once // For closing DB only once.
- blockWrites atomic.Int32
- isClosed atomic.Uint32
- orc *oracle
- bannedNamespaces *lockedKeys
- threshold *vlogThreshold
- pub *publisher
- registry *KeyRegistry
- blockCache *ristretto.Cache[[]byte, *table.Block]
- indexCache *ristretto.Cache[uint64, *fb.TableIndex]
- allocPool *z.AllocatorPool
- }
// kvWriteChCapacity is the buffer size of DB.writeCh. doWrites also uses a
// multiple of it as the upper bound on the number of requests it batches.
const kvWriteChCapacity = 1000
- func checkAndSetOptions(opt *Options) error {
- // It's okay to have zero compactors which will disable all compactions but
- // we cannot have just one compactor otherwise we will end up with all data
- // on level 2.
- if opt.NumCompactors == 1 {
- return errors.New("Cannot have 1 compactor. Need at least 2")
- }
- if opt.InMemory && (opt.Dir != "" || opt.ValueDir != "") {
- return errors.New("Cannot use badger in Disk-less mode with Dir or ValueDir set")
- }
- opt.maxBatchSize = (15 * opt.MemTableSize) / 100
- opt.maxBatchCount = opt.maxBatchSize / int64(skl.MaxNodeSize)
- // This is the maximum value, vlogThreshold can have if dynamic thresholding is enabled.
- opt.maxValueThreshold = math.Min(maxValueThreshold, float64(opt.maxBatchSize))
- if opt.VLogPercentile < 0.0 || opt.VLogPercentile > 1.0 {
- return errors.New("vlogPercentile must be within range of 0.0-1.0")
- }
- // We are limiting opt.ValueThreshold to maxValueThreshold for now.
- if opt.ValueThreshold > maxValueThreshold {
- return errors.Errorf("Invalid ValueThreshold, must be less or equal to %d",
- maxValueThreshold)
- }
- // If ValueThreshold is greater than opt.maxBatchSize, we won't be able to push any data using
- // the transaction APIs. Transaction batches entries into batches of size opt.maxBatchSize.
- if opt.ValueThreshold > opt.maxBatchSize {
- return errors.Errorf("Valuethreshold %d greater than max batch size of %d. Either "+
- "reduce opt.ValueThreshold or increase opt.BaseTableSize.",
- opt.ValueThreshold, opt.maxBatchSize)
- }
- // ValueLogFileSize should be strictly LESS than 2<<30 otherwise we will
- // overflow the uint32 when we mmap it in OpenMemtable.
- if !(opt.ValueLogFileSize < 2<<30 && opt.ValueLogFileSize >= 1<<20) {
- return ErrValueLogSize
- }
- if opt.ReadOnly {
- // Do not perform compaction in read only mode.
- opt.CompactL0OnClose = false
- }
- needCache := (opt.Compression != options.None) || (len(opt.EncryptionKey) > 0)
- if needCache && opt.BlockCacheSize == 0 {
- panic("BlockCacheSize should be set since compression/encryption are enabled")
- }
- return nil
- }
- // Open returns a new DB object.
- func Open(opt Options) (*DB, error) {
- if err := checkAndSetOptions(&opt); err != nil {
- return nil, err
- }
- var dirLockGuard, valueDirLockGuard *directoryLockGuard
- // Create directories and acquire lock on it only if badger is not running in InMemory mode.
- // We don't have any directories/files in InMemory mode so we don't need to acquire
- // any locks on them.
- if !opt.InMemory {
- if err := createDirs(opt); err != nil {
- return nil, err
- }
- var err error
- if !opt.BypassLockGuard {
- dirLockGuard, err = acquireDirectoryLock(opt.Dir, lockFile, opt.ReadOnly)
- if err != nil {
- return nil, err
- }
- defer func() {
- if dirLockGuard != nil {
- _ = dirLockGuard.release()
- }
- }()
- absDir, err := filepath.Abs(opt.Dir)
- if err != nil {
- return nil, err
- }
- absValueDir, err := filepath.Abs(opt.ValueDir)
- if err != nil {
- return nil, err
- }
- if absValueDir != absDir {
- valueDirLockGuard, err = acquireDirectoryLock(opt.ValueDir, lockFile, opt.ReadOnly)
- if err != nil {
- return nil, err
- }
- defer func() {
- if valueDirLockGuard != nil {
- _ = valueDirLockGuard.release()
- }
- }()
- }
- }
- }
- manifestFile, manifest, err := openOrCreateManifestFile(opt)
- if err != nil {
- return nil, err
- }
- defer func() {
- if manifestFile != nil {
- _ = manifestFile.close()
- }
- }()
- db := &DB{
- imm: make([]*memTable, 0, opt.NumMemtables),
- flushChan: make(chan *memTable, opt.NumMemtables),
- writeCh: make(chan *request, kvWriteChCapacity),
- opt: opt,
- manifest: manifestFile,
- dirLockGuard: dirLockGuard,
- valueDirGuard: valueDirLockGuard,
- orc: newOracle(opt),
- pub: newPublisher(),
- allocPool: z.NewAllocatorPool(8),
- bannedNamespaces: &lockedKeys{keys: make(map[uint64]struct{})},
- threshold: initVlogThreshold(&opt),
- }
- db.syncChan = opt.syncChan
- // Cleanup all the goroutines started by badger in case of an error.
- defer func() {
- if err != nil {
- opt.Errorf("Received err: %v. Cleaning up...", err)
- db.cleanup()
- db = nil
- }
- }()
- if opt.BlockCacheSize > 0 {
- numInCache := opt.BlockCacheSize / int64(opt.BlockSize)
- if numInCache == 0 {
- // Make the value of this variable at least one since the cache requires
- // the number of counters to be greater than zero.
- numInCache = 1
- }
- config := ristretto.Config[[]byte, *table.Block]{
- NumCounters: numInCache * 8,
- MaxCost: opt.BlockCacheSize,
- BufferItems: 64,
- Metrics: true,
- OnExit: table.BlockEvictHandler,
- }
- db.blockCache, err = ristretto.NewCache[[]byte, *table.Block](&config)
- if err != nil {
- return nil, y.Wrap(err, "failed to create data cache")
- }
- }
- if opt.IndexCacheSize > 0 {
- // Index size is around 5% of the table size.
- indexSz := int64(float64(opt.MemTableSize) * 0.05)
- numInCache := opt.IndexCacheSize / indexSz
- if numInCache == 0 {
- // Make the value of this variable at least one since the cache requires
- // the number of counters to be greater than zero.
- numInCache = 1
- }
- config := ristretto.Config[uint64, *fb.TableIndex]{
- NumCounters: numInCache * 8,
- MaxCost: opt.IndexCacheSize,
- BufferItems: 64,
- Metrics: true,
- }
- db.indexCache, err = ristretto.NewCache(&config)
- if err != nil {
- return nil, y.Wrap(err, "failed to create bf cache")
- }
- }
- db.closers.cacheHealth = z.NewCloser(1)
- go db.monitorCache(db.closers.cacheHealth)
- if db.opt.InMemory {
- db.opt.SyncWrites = false
- // If badger is running in memory mode, push everything into the LSM Tree.
- db.opt.ValueThreshold = math.MaxInt32
- }
- krOpt := KeyRegistryOptions{
- ReadOnly: opt.ReadOnly,
- Dir: opt.Dir,
- EncryptionKey: opt.EncryptionKey,
- EncryptionKeyRotationDuration: opt.EncryptionKeyRotationDuration,
- InMemory: opt.InMemory,
- }
- if db.registry, err = OpenKeyRegistry(krOpt); err != nil {
- return db, err
- }
- db.calculateSize()
- db.closers.updateSize = z.NewCloser(1)
- go db.updateSize(db.closers.updateSize)
- if err := db.openMemTables(db.opt); err != nil {
- return nil, y.Wrapf(err, "while opening memtables")
- }
- if !db.opt.ReadOnly {
- if db.mt, err = db.newMemTable(); err != nil {
- return nil, y.Wrapf(err, "cannot create memtable")
- }
- }
- // newLevelsController potentially loads files in directory.
- if db.lc, err = newLevelsController(db, &manifest); err != nil {
- return db, err
- }
- // Initialize vlog struct.
- db.vlog.init(db)
- if !opt.ReadOnly {
- db.closers.compactors = z.NewCloser(1)
- db.lc.startCompact(db.closers.compactors)
- db.closers.memtable = z.NewCloser(1)
- go func() {
- db.flushMemtable(db.closers.memtable) // Need levels controller to be up.
- }()
- // Flush them to disk asap.
- for _, mt := range db.imm {
- db.flushChan <- mt
- }
- }
- // We do increment nextTxnTs below. So, no need to do it here.
- db.orc.nextTxnTs = db.MaxVersion()
- db.opt.Infof("Set nextTxnTs to %d", db.orc.nextTxnTs)
- if err = db.vlog.open(db); err != nil {
- return db, y.Wrapf(err, "During db.vlog.open")
- }
- // Let's advance nextTxnTs to one more than whatever we observed via
- // replaying the logs.
- db.orc.txnMark.Done(db.orc.nextTxnTs)
- // In normal mode, we must update readMark so older versions of keys can be removed during
- // compaction when run in offline mode via the flatten tool.
- db.orc.readMark.Done(db.orc.nextTxnTs)
- db.orc.incrementNextTs()
- go db.threshold.listenForValueThresholdUpdate()
- if err := db.initBannedNamespaces(); err != nil {
- return db, errors.Wrapf(err, "While setting banned keys")
- }
- db.closers.writes = z.NewCloser(1)
- go db.doWrites(db.closers.writes)
- if !db.opt.InMemory {
- db.closers.valueGC = z.NewCloser(1)
- go db.vlog.waitOnGC(db.closers.valueGC)
- }
- db.closers.pub = z.NewCloser(1)
- go db.pub.listenForUpdates(db.closers.pub)
- valueDirLockGuard = nil
- dirLockGuard = nil
- manifestFile = nil
- return db, nil
- }
- // initBannedNamespaces retrieves the banned namespaces from the DB and updates in-memory structure.
- func (db *DB) initBannedNamespaces() error {
- if db.opt.NamespaceOffset < 0 {
- return nil
- }
- return db.View(func(txn *Txn) error {
- iopts := DefaultIteratorOptions
- iopts.Prefix = bannedNsKey
- iopts.PrefetchValues = false
- iopts.InternalAccess = true
- itr := txn.NewIterator(iopts)
- defer itr.Close()
- for itr.Rewind(); itr.Valid(); itr.Next() {
- key := y.BytesToU64(itr.Item().Key()[len(bannedNsKey):])
- db.bannedNamespaces.add(key)
- }
- return nil
- })
- }
- func (db *DB) MaxVersion() uint64 {
- var maxVersion uint64
- update := func(a uint64) {
- if a > maxVersion {
- maxVersion = a
- }
- }
- db.lock.Lock()
- // In read only mode, we do not create new mem table.
- if !db.opt.ReadOnly {
- update(db.mt.maxVersion)
- }
- for _, mt := range db.imm {
- update(mt.maxVersion)
- }
- db.lock.Unlock()
- for _, ti := range db.Tables() {
- update(ti.MaxVersion)
- }
- return maxVersion
- }
- func (db *DB) monitorCache(c *z.Closer) {
- defer c.Done()
- count := 0
- analyze := func(name string, metrics *ristretto.Metrics) {
- // If the mean life expectancy is less than 10 seconds, the cache
- // might be too small.
- le := metrics.LifeExpectancySeconds()
- if le == nil {
- return
- }
- lifeTooShort := le.Count > 0 && float64(le.Sum)/float64(le.Count) < 10
- hitRatioTooLow := metrics.Ratio() > 0 && metrics.Ratio() < 0.4
- if lifeTooShort && hitRatioTooLow {
- db.opt.Warningf("%s might be too small. Metrics: %s\n", name, metrics)
- db.opt.Warningf("Cache life expectancy (in seconds): %+v\n", le)
- } else if le.Count > 1000 && count%5 == 0 {
- db.opt.Infof("%s metrics: %s\n", name, metrics)
- }
- }
- ticker := time.NewTicker(1 * time.Minute)
- defer ticker.Stop()
- for {
- select {
- case <-c.HasBeenClosed():
- return
- case <-ticker.C:
- }
- analyze("Block cache", db.BlockCacheMetrics())
- analyze("Index cache", db.IndexCacheMetrics())
- count++
- }
- }
- // cleanup stops all the goroutines started by badger. This is used in open to
- // cleanup goroutines in case of an error.
- func (db *DB) cleanup() {
- db.stopMemoryFlush()
- db.stopCompactions()
- db.blockCache.Close()
- db.indexCache.Close()
- if db.closers.updateSize != nil {
- db.closers.updateSize.Signal()
- }
- if db.closers.valueGC != nil {
- db.closers.valueGC.Signal()
- }
- if db.closers.writes != nil {
- db.closers.writes.Signal()
- }
- if db.closers.pub != nil {
- db.closers.pub.Signal()
- }
- db.orc.Stop()
- // Do not use vlog.Close() here. vlog.Close truncates the files. We don't
- // want to truncate files unless the user has specified the truncate flag.
- }
- // BlockCacheMetrics returns the metrics for the underlying block cache.
- func (db *DB) BlockCacheMetrics() *ristretto.Metrics {
- if db.blockCache != nil {
- return db.blockCache.Metrics
- }
- return nil
- }
- // IndexCacheMetrics returns the metrics for the underlying index cache.
- func (db *DB) IndexCacheMetrics() *ristretto.Metrics {
- if db.indexCache != nil {
- return db.indexCache.Metrics
- }
- return nil
- }
- // Close closes a DB. It's crucial to call it to ensure all the pending updates make their way to
- // disk. Calling DB.Close() multiple times would still only close the DB once.
- func (db *DB) Close() error {
- var err error
- db.closeOnce.Do(func() {
- err = db.close()
- })
- return err
- }
- // IsClosed denotes if the badger DB is closed or not. A DB instance should not
- // be used after closing it.
- func (db *DB) IsClosed() bool {
- return db.isClosed.Load() == 1
- }
- func (db *DB) close() (err error) {
- defer db.allocPool.Release()
- db.opt.Debugf("Closing database")
- db.opt.Infof("Lifetime L0 stalled for: %s\n", time.Duration(db.lc.l0stallsMs.Load()))
- db.blockWrites.Store(1)
- db.isClosed.Store(1)
- if !db.opt.InMemory {
- // Stop value GC first.
- db.closers.valueGC.SignalAndWait()
- }
- // Stop writes next.
- db.closers.writes.SignalAndWait()
- // Don't accept any more write.
- close(db.writeCh)
- db.closers.pub.SignalAndWait()
- db.closers.cacheHealth.Signal()
- // Make sure that block writer is done pushing stuff into memtable!
- // Otherwise, you will have a race condition: we are trying to flush memtables
- // and remove them completely, while the block / memtable writer is still
- // trying to push stuff into the memtable. This will also resolve the value
- // offset problem: as we push into memtable, we update value offsets there.
- if db.mt != nil {
- if db.mt.sl.Empty() {
- // Remove the memtable if empty.
- db.mt.DecrRef()
- } else {
- db.opt.Debugf("Flushing memtable")
- for {
- pushedMemTable := func() bool {
- db.lock.Lock()
- defer db.lock.Unlock()
- y.AssertTrue(db.mt != nil)
- select {
- case db.flushChan <- db.mt:
- db.imm = append(db.imm, db.mt) // Flusher will attempt to remove this from s.imm.
- db.mt = nil // Will segfault if we try writing!
- db.opt.Debugf("pushed to flush chan\n")
- return true
- default:
- // If we fail to push, we need to unlock and wait for a short while.
- // The flushing operation needs to update s.imm. Otherwise, we have a
- // deadlock.
- // TODO: Think about how to do this more cleanly, maybe without any locks.
- }
- return false
- }()
- if pushedMemTable {
- break
- }
- time.Sleep(10 * time.Millisecond)
- }
- }
- }
- db.stopMemoryFlush()
- db.stopCompactions()
- // Force Compact L0
- // We don't need to care about cstatus since no parallel compaction is running.
- if db.opt.CompactL0OnClose {
- err := db.lc.doCompact(173, compactionPriority{level: 0, score: 1.73})
- switch err {
- case errFillTables:
- // This error only means that there might be enough tables to do a compaction. So, we
- // should not report it to the end user to avoid confusing them.
- case nil:
- db.opt.Debugf("Force compaction on level 0 done")
- default:
- db.opt.Warningf("While forcing compaction on level 0: %v", err)
- }
- }
- // Now close the value log.
- if vlogErr := db.vlog.Close(); vlogErr != nil {
- err = y.Wrap(vlogErr, "DB.Close")
- }
- db.opt.Infof(db.LevelsToString())
- if lcErr := db.lc.close(); err == nil {
- err = y.Wrap(lcErr, "DB.Close")
- }
- db.opt.Debugf("Waiting for closer")
- db.closers.updateSize.SignalAndWait()
- db.orc.Stop()
- db.blockCache.Close()
- db.indexCache.Close()
- db.threshold.close()
- if db.opt.InMemory {
- return
- }
- if db.dirLockGuard != nil {
- if guardErr := db.dirLockGuard.release(); err == nil {
- err = y.Wrap(guardErr, "DB.Close")
- }
- }
- if db.valueDirGuard != nil {
- if guardErr := db.valueDirGuard.release(); err == nil {
- err = y.Wrap(guardErr, "DB.Close")
- }
- }
- if manifestErr := db.manifest.close(); err == nil {
- err = y.Wrap(manifestErr, "DB.Close")
- }
- if registryErr := db.registry.Close(); err == nil {
- err = y.Wrap(registryErr, "DB.Close")
- }
- // Fsync directories to ensure that lock file, and any other removed files whose directory
- // we haven't specifically fsynced, are guaranteed to have their directory entry removal
- // persisted to disk.
- if syncErr := db.syncDir(db.opt.Dir); err == nil {
- err = y.Wrap(syncErr, "DB.Close")
- }
- if syncErr := db.syncDir(db.opt.ValueDir); err == nil {
- err = y.Wrap(syncErr, "DB.Close")
- }
- return err
- }
- // VerifyChecksum verifies checksum for all tables on all levels.
- // This method can be used to verify checksum, if opt.ChecksumVerificationMode is NoVerification.
- func (db *DB) VerifyChecksum() error {
- return db.lc.verifyChecksum()
- }
// lockFile is the file name passed to acquireDirectoryLock when a DB
// directory is locked.
const lockFile = "LOCK"
- // Sync syncs database content to disk. This function provides
- // more control to user to sync data whenever required.
- func (db *DB) Sync() error {
- /**
- Make an attempt to sync both the logs, the active memtable's WAL and the vLog (1847).
- Cases:
- - All_ok :: If both the logs sync successfully.
- - Entry_Lost :: If an entry with a value pointer was present in the active memtable's WAL,
- :: and the WAL was synced but there was an error in syncing the vLog.
- :: The entry will be considered lost and this case will need to be handled during recovery.
- - Entries_Lost :: If there were errors in syncing both the logs, multiple entries would be lost.
- - Entries_Lost :: If the active memtable's WAL is not synced but the vLog is synced, it will
- :: result in entries being lost because recovery of the active memtable is done from its WAL.
- :: Check `UpdateSkipList` in memtable.go.
- - Nothing_lost :: If an entry with its value was present in the active memtable's WAL, and the WAL was synced,
- :: but there was an error in syncing the vLog.
- :: Nothing is lost for this very specific entry because the entry is completely present in the memtable's WAL.
- - Partially_lost :: If entries were written partially in either of the logs,
- :: the logs will be truncated during recovery.
- :: As a result of truncation, some entries might be lost.
- :: Assume that 4KB of data is to be synced and invoking `Sync` results only in syncing 3KB
- :: of data and then the machine shuts down or the disk failure happens,
- :: this will result in partial writes. [[This case needs verification]]
- */
- db.lock.RLock()
- memtableSyncError := db.mt.SyncWAL()
- db.lock.RUnlock()
- vLogSyncError := db.vlog.sync()
- return y.CombineErrors(memtableSyncError, vLogSyncError)
- }
- // getMemtables returns the current memtables and get references.
- func (db *DB) getMemTables() ([]*memTable, func()) {
- db.lock.RLock()
- defer db.lock.RUnlock()
- var tables []*memTable
- // Mutable memtable does not exist in read-only mode.
- if !db.opt.ReadOnly {
- // Get mutable memtable.
- tables = append(tables, db.mt)
- db.mt.IncrRef()
- }
- // Get immutable memtables.
- last := len(db.imm) - 1
- for i := range db.imm {
- tables = append(tables, db.imm[last-i])
- db.imm[last-i].IncrRef()
- }
- return tables, func() {
- for _, tbl := range tables {
- tbl.DecrRef()
- }
- }
- }
- // get returns the value in memtable or disk for given key.
- // Note that value will include meta byte.
- //
- // IMPORTANT: We should never write an entry with an older timestamp for the same key, We need to
- // maintain this invariant to search for the latest value of a key, or else we need to search in all
- // tables and find the max version among them. To maintain this invariant, we also need to ensure
- // that all versions of a key are always present in the same table from level 1, because compaction
- // can push any table down.
- //
- // Update(23/09/2020) - We have dropped the move key implementation. Earlier we
- // were inserting move keys to fix the invalid value pointers but we no longer
- // do that. For every get("fooX") call where X is the version, we will search
- // for "fooX" in all the levels of the LSM tree. This is expensive but it
- // removes the overhead of handling move keys completely.
- func (db *DB) get(key []byte) (y.ValueStruct, error) {
- if db.IsClosed() {
- return y.ValueStruct{}, ErrDBClosed
- }
- tables, decr := db.getMemTables() // Lock should be released.
- defer decr()
- var maxVs y.ValueStruct
- version := y.ParseTs(key)
- y.NumGetsAdd(db.opt.MetricsEnabled, 1)
- for i := 0; i < len(tables); i++ {
- vs := tables[i].sl.Get(key)
- y.NumMemtableGetsAdd(db.opt.MetricsEnabled, 1)
- if vs.Meta == 0 && vs.Value == nil {
- continue
- }
- // Found the required version of the key, return immediately.
- if vs.Version == version {
- y.NumGetsWithResultsAdd(db.opt.MetricsEnabled, 1)
- return vs, nil
- }
- if maxVs.Version < vs.Version {
- maxVs = vs
- }
- }
- return db.lc.get(key, maxVs, 0)
- }
- var requestPool = sync.Pool{
- New: func() interface{} {
- return new(request)
- },
- }
- func (db *DB) writeToLSM(b *request) error {
- // We should check the length of b.Prts and b.Entries only when badger is not
- // running in InMemory mode. In InMemory mode, we don't write anything to the
- // value log and that's why the length of b.Ptrs will always be zero.
- if !db.opt.InMemory && len(b.Ptrs) != len(b.Entries) {
- return errors.Errorf("Ptrs and Entries don't match: %+v", b)
- }
- for i, entry := range b.Entries {
- var err error
- if entry.skipVlogAndSetThreshold(db.valueThreshold()) {
- // Will include deletion / tombstone case.
- err = db.mt.Put(entry.Key,
- y.ValueStruct{
- Value: entry.Value,
- // Ensure value pointer flag is removed. Otherwise, the value will fail
- // to be retrieved during iterator prefetch. `bitValuePointer` is only
- // known to be set in write to LSM when the entry is loaded from a backup
- // with lower ValueThreshold and its value was stored in the value log.
- Meta: entry.meta &^ bitValuePointer,
- UserMeta: entry.UserMeta,
- ExpiresAt: entry.ExpiresAt,
- })
- } else {
- // Write pointer to Memtable.
- err = db.mt.Put(entry.Key,
- y.ValueStruct{
- Value: b.Ptrs[i].Encode(),
- Meta: entry.meta | bitValuePointer,
- UserMeta: entry.UserMeta,
- ExpiresAt: entry.ExpiresAt,
- })
- }
- if err != nil {
- return y.Wrapf(err, "while writing to memTable")
- }
- }
- if db.opt.SyncWrites {
- return db.mt.SyncWAL()
- }
- return nil
- }
- // writeRequests is called serially by only one goroutine.
- func (db *DB) writeRequests(reqs []*request) error {
- if len(reqs) == 0 {
- return nil
- }
- done := func(err error) {
- for _, r := range reqs {
- r.Err = err
- r.Wg.Done()
- }
- }
- db.opt.Debugf("writeRequests called. Writing to value log")
- err := db.vlog.write(reqs)
- if err != nil {
- done(err)
- return err
- }
- db.opt.Debugf("Writing to memtable")
- var count int
- for _, b := range reqs {
- if len(b.Entries) == 0 {
- continue
- }
- count += len(b.Entries)
- var i uint64
- var err error
- for err = db.ensureRoomForWrite(); err == errNoRoom; err = db.ensureRoomForWrite() {
- i++
- if i%100 == 0 {
- db.opt.Debugf("Making room for writes")
- }
- // We need to poll a bit because both hasRoomForWrite and the flusher need access to s.imm.
- // When flushChan is full and you are blocked there, and the flusher is trying to update s.imm,
- // you will get a deadlock.
- time.Sleep(10 * time.Millisecond)
- }
- if err != nil {
- done(err)
- return y.Wrap(err, "writeRequests")
- }
- if err := db.writeToLSM(b); err != nil {
- done(err)
- return y.Wrap(err, "writeRequests")
- }
- }
- db.opt.Debugf("Sending updates to subscribers")
- db.pub.sendUpdates(reqs)
- done(nil)
- db.opt.Debugf("%d entries written", count)
- return nil
- }
- func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) {
- if db.blockWrites.Load() == 1 {
- return nil, ErrBlockedWrites
- }
- var count, size int64
- for _, e := range entries {
- size += e.estimateSizeAndSetThreshold(db.valueThreshold())
- count++
- }
- y.NumBytesWrittenUserAdd(db.opt.MetricsEnabled, size)
- if count >= db.opt.maxBatchCount || size >= db.opt.maxBatchSize {
- return nil, ErrTxnTooBig
- }
- // We can only service one request because we need each txn to be stored in a contiguous section.
- // Txns should not interleave among other txns or rewrites.
- req := requestPool.Get().(*request)
- req.reset()
- req.Entries = entries
- req.Wg.Add(1)
- req.IncrRef() // for db write
- db.writeCh <- req // Handled in doWrites.
- y.NumPutsAdd(db.opt.MetricsEnabled, int64(len(entries)))
- return req, nil
- }
- func (db *DB) doWrites(lc *z.Closer) {
- defer lc.Done()
- pendingCh := make(chan struct{}, 1)
- writeRequests := func(reqs []*request) {
- if err := db.writeRequests(reqs); err != nil {
- db.opt.Errorf("writeRequests: %v", err)
- }
- <-pendingCh
- }
- // This variable tracks the number of pending writes.
- reqLen := new(expvar.Int)
- y.PendingWritesSet(db.opt.MetricsEnabled, db.opt.Dir, reqLen)
- reqs := make([]*request, 0, 10)
- for {
- var r *request
- select {
- case r = <-db.writeCh:
- case <-lc.HasBeenClosed():
- goto closedCase
- }
- for {
- reqs = append(reqs, r)
- reqLen.Set(int64(len(reqs)))
- if len(reqs) >= 3*kvWriteChCapacity {
- pendingCh <- struct{}{} // blocking.
- goto writeCase
- }
- select {
- // Either push to pending, or continue to pick from writeCh.
- case r = <-db.writeCh:
- case pendingCh <- struct{}{}:
- goto writeCase
- case <-lc.HasBeenClosed():
- goto closedCase
- }
- }
- closedCase:
- // All the pending request are drained.
- // Don't close the writeCh, because it has be used in several places.
- for {
- select {
- case r = <-db.writeCh:
- reqs = append(reqs, r)
- default:
- pendingCh <- struct{}{} // Push to pending before doing a write.
- writeRequests(reqs)
- return
- }
- }
- writeCase:
- go writeRequests(reqs)
- reqs = make([]*request, 0, 10)
- reqLen.Set(0)
- }
- }
- // batchSet applies a list of badger.Entry. If a request level error occurs it
- // will be returned.
- //
- // Check(kv.BatchSet(entries))
- func (db *DB) batchSet(entries []*Entry) error {
- req, err := db.sendToWriteCh(entries)
- if err != nil {
- return err
- }
- return req.Wait()
- }
- // batchSetAsync is the asynchronous version of batchSet. It accepts a callback
- // function which is called when all the sets are complete. If a request level
- // error occurs, it will be passed back via the callback.
- //
- // err := kv.BatchSetAsync(entries, func(err error)) {
- // Check(err)
- // }
- func (db *DB) batchSetAsync(entries []*Entry, f func(error)) error {
- req, err := db.sendToWriteCh(entries)
- if err != nil {
- return err
- }
- go func() {
- err := req.Wait()
- // Write is complete. Let's call the callback function now.
- f(err)
- }()
- return nil
- }
- var errNoRoom = stderrors.New("No room for write")
- // ensureRoomForWrite is always called serially.
- func (db *DB) ensureRoomForWrite() error {
- var err error
- db.lock.Lock()
- defer db.lock.Unlock()
- y.AssertTrue(db.mt != nil) // A nil mt indicates that DB is being closed.
- if !db.mt.isFull() {
- return nil
- }
- select {
- case db.flushChan <- db.mt:
- db.opt.Debugf("Flushing memtable, mt.size=%d size of flushChan: %d\n",
- db.mt.sl.MemSize(), len(db.flushChan))
- // We manage to push this task. Let's modify imm.
- db.imm = append(db.imm, db.mt)
- db.mt, err = db.newMemTable()
- if err != nil {
- return y.Wrapf(err, "cannot create new mem table")
- }
- // New memtable is empty. We certainly have room.
- return nil
- default:
- // We need to do this to unlock and allow the flusher to modify imm.
- return errNoRoom
- }
- }
- func arenaSize(opt Options) int64 {
- return opt.MemTableSize + opt.maxBatchSize + opt.maxBatchCount*int64(skl.MaxNodeSize)
- }
- // buildL0Table builds a new table from the memtable.
- func buildL0Table(iter y.Iterator, dropPrefixes [][]byte, bopts table.Options) *table.Builder {
- defer iter.Close()
- b := table.NewTableBuilder(bopts)
- for iter.Rewind(); iter.Valid(); iter.Next() {
- if len(dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), dropPrefixes) {
- continue
- }
- vs := iter.Value()
- var vp valuePointer
- if vs.Meta&bitValuePointer > 0 {
- vp.Decode(vs.Value)
- }
- b.Add(iter.Key(), iter.Value(), vp.Len)
- }
- return b
- }
- // handleMemTableFlush must be run serially.
- func (db *DB) handleMemTableFlush(mt *memTable, dropPrefixes [][]byte) error {
- bopts := buildTableOptions(db)
- itr := mt.sl.NewUniIterator(false)
- builder := buildL0Table(itr, nil, bopts)
- defer builder.Close()
- // buildL0Table can return nil if the none of the items in the skiplist are
- // added to the builder. This can happen when drop prefix is set and all
- // the items are skipped.
- if builder.Empty() {
- builder.Finish()
- return nil
- }
- fileID := db.lc.reserveFileID()
- var tbl *table.Table
- var err error
- if db.opt.InMemory {
- data := builder.Finish()
- tbl, err = table.OpenInMemoryTable(data, fileID, &bopts)
- } else {
- tbl, err = table.CreateTable(table.NewFilename(fileID, db.opt.Dir), builder)
- }
- if err != nil {
- return y.Wrap(err, "error while creating table")
- }
- // We own a ref on tbl.
- err = db.lc.addLevel0Table(tbl) // This will incrRef
- _ = tbl.DecrRef() // Releases our ref.
- return err
- }
- // flushMemtable must keep running until we send it an empty memtable. If there
- // are errors during handling the memtable flush, we'll retry indefinitely.
- func (db *DB) flushMemtable(lc *z.Closer) {
- defer lc.Done()
- for mt := range db.flushChan {
- if mt == nil {
- continue
- }
- for {
- if err := db.handleMemTableFlush(mt, nil); err != nil {
- // Encountered error. Retry indefinitely.
- db.opt.Errorf("error flushing memtable to disk: %v, retrying", err)
- time.Sleep(time.Second)
- continue
- }
- // Update s.imm. Need a lock.
- db.lock.Lock()
- // This is a single-threaded operation. mt corresponds to the head of
- // db.imm list. Once we flush it, we advance db.imm. The next mt
- // which would arrive here would match db.imm[0], because we acquire a
- // lock over DB when pushing to flushChan.
- // TODO: This logic is dirty AF. Any change and this could easily break.
- y.AssertTrue(mt == db.imm[0])
- db.imm = db.imm[1:]
- mt.DecrRef() // Return memory.
- // unlock
- db.lock.Unlock()
- break
- }
- }
- }
// exists reports whether path can be stat'ed. A "not exist" error yields
// (false, nil); any other stat error is returned along with true.
func exists(path string) (bool, error) {
	switch _, err := os.Stat(path); {
	case err == nil:
		return true, nil
	case os.IsNotExist(err):
		return false, nil
	default:
		return true, err
	}
}
- // This function does a filewalk, calculates the size of vlog and sst files and stores it in
- // y.LSMSize and y.VlogSize.
- func (db *DB) calculateSize() {
- if db.opt.InMemory {
- return
- }
- newInt := func(val int64) *expvar.Int {
- v := new(expvar.Int)
- v.Add(val)
- return v
- }
- totalSize := func(dir string) (int64, int64) {
- var lsmSize, vlogSize int64
- err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
- if err != nil {
- return err
- }
- ext := filepath.Ext(path)
- switch ext {
- case ".sst":
- lsmSize += info.Size()
- case ".vlog":
- vlogSize += info.Size()
- }
- return nil
- })
- if err != nil {
- db.opt.Debugf("Got error while calculating total size of directory: %s", dir)
- }
- return lsmSize, vlogSize
- }
- lsmSize, vlogSize := totalSize(db.opt.Dir)
- y.LSMSizeSet(db.opt.MetricsEnabled, db.opt.Dir, newInt(lsmSize))
- // If valueDir is different from dir, we'd have to do another walk.
- if db.opt.ValueDir != db.opt.Dir {
- _, vlogSize = totalSize(db.opt.ValueDir)
- }
- y.VlogSizeSet(db.opt.MetricsEnabled, db.opt.ValueDir, newInt(vlogSize))
- }
- func (db *DB) updateSize(lc *z.Closer) {
- defer lc.Done()
- if db.opt.InMemory {
- return
- }
- metricsTicker := time.NewTicker(time.Minute)
- defer metricsTicker.Stop()
- for {
- select {
- case <-metricsTicker.C:
- db.calculateSize()
- case <-lc.HasBeenClosed():
- return
- }
- }
- }
- // RunValueLogGC triggers a value log garbage collection.
- //
- // It picks value log files to perform GC based on statistics that are collected
- // during compactions. If no such statistics are available, then log files are
- // picked in random order. The process stops as soon as the first log file is
- // encountered which does not result in garbage collection.
- //
- // When a log file is picked, it is first sampled. If the sample shows that we
- // can discard at least discardRatio space of that file, it would be rewritten.
- //
- // If a call to RunValueLogGC results in no rewrites, then an ErrNoRewrite is
- // thrown indicating that the call resulted in no file rewrites.
- //
- // We recommend setting discardRatio to 0.5, thus indicating that a file be
- // rewritten if half the space can be discarded. This results in a lifetime
- // value log write amplification of 2 (1 from original write + 0.5 rewrite +
- // 0.25 + 0.125 + ... = 2). Setting it to higher value would result in fewer
- // space reclaims, while setting it to a lower value would result in more space
- // reclaims at the cost of increased activity on the LSM tree. discardRatio
- // must be in the range (0.0, 1.0), both endpoints excluded, otherwise an
- // ErrInvalidRequest is returned.
- //
- // Only one GC is allowed at a time. If another value log GC is running, or DB
- // has been closed, this would return an ErrRejected.
- //
- // Note: Every time GC is run, it would produce a spike of activity on the LSM
- // tree.
- func (db *DB) RunValueLogGC(discardRatio float64) error {
- if db.opt.InMemory {
- return ErrGCInMemoryMode
- }
- if discardRatio >= 1.0 || discardRatio <= 0.0 {
- return ErrInvalidRequest
- }
- // Pick a log file and run GC
- return db.vlog.runGC(discardRatio)
- }
- // Size returns the size of lsm and value log files in bytes. It can be used to decide how often to
- // call RunValueLogGC.
- func (db *DB) Size() (lsm, vlog int64) {
- if y.LSMSizeGet(db.opt.MetricsEnabled, db.opt.Dir) == nil {
- lsm, vlog = 0, 0
- return
- }
- lsm = y.LSMSizeGet(db.opt.MetricsEnabled, db.opt.Dir).(*expvar.Int).Value()
- vlog = y.VlogSizeGet(db.opt.MetricsEnabled, db.opt.ValueDir).(*expvar.Int).Value()
- return
- }
- // Sequence represents a Badger sequence.
- type Sequence struct {
- lock sync.Mutex
- db *DB
- key []byte
- next uint64
- leased uint64
- bandwidth uint64
- }
- // Next would return the next integer in the sequence, updating the lease by running a transaction
- // if needed.
- func (seq *Sequence) Next() (uint64, error) {
- seq.lock.Lock()
- defer seq.lock.Unlock()
- if seq.next >= seq.leased {
- if err := seq.updateLease(); err != nil {
- return 0, err
- }
- }
- val := seq.next
- seq.next++
- return val, nil
- }
- // Release the leased sequence to avoid wasted integers. This should be done right
- // before closing the associated DB. However it is valid to use the sequence after
- // it was released, causing a new lease with full bandwidth.
- func (seq *Sequence) Release() error {
- seq.lock.Lock()
- defer seq.lock.Unlock()
- err := seq.db.Update(func(txn *Txn) error {
- item, err := txn.Get(seq.key)
- if err != nil {
- return err
- }
- var num uint64
- if err := item.Value(func(v []byte) error {
- num = binary.BigEndian.Uint64(v)
- return nil
- }); err != nil {
- return err
- }
- if num == seq.leased {
- var buf [8]byte
- binary.BigEndian.PutUint64(buf[:], seq.next)
- return txn.SetEntry(NewEntry(seq.key, buf[:]))
- }
- return nil
- })
- if err != nil {
- return err
- }
- seq.leased = seq.next
- return nil
- }
- func (seq *Sequence) updateLease() error {
- return seq.db.Update(func(txn *Txn) error {
- item, err := txn.Get(seq.key)
- switch {
- case err == ErrKeyNotFound:
- seq.next = 0
- case err != nil:
- return err
- default:
- var num uint64
- if err := item.Value(func(v []byte) error {
- num = binary.BigEndian.Uint64(v)
- return nil
- }); err != nil {
- return err
- }
- seq.next = num
- }
- lease := seq.next + seq.bandwidth
- var buf [8]byte
- binary.BigEndian.PutUint64(buf[:], lease)
- if err = txn.SetEntry(NewEntry(seq.key, buf[:])); err != nil {
- return err
- }
- seq.leased = lease
- return nil
- })
- }
- // GetSequence would initiate a new sequence object, generating it from the stored lease, if
- // available, in the database. Sequence can be used to get a list of monotonically increasing
- // integers. Multiple sequences can be created by providing different keys. Bandwidth sets the
- // size of the lease, determining how many Next() requests can be served from memory.
- //
- // GetSequence is not supported on ManagedDB. Calling this would result in a panic.
- func (db *DB) GetSequence(key []byte, bandwidth uint64) (*Sequence, error) {
- if db.opt.managedTxns {
- panic("Cannot use GetSequence with managedDB=true.")
- }
- switch {
- case len(key) == 0:
- return nil, ErrEmptyKey
- case bandwidth == 0:
- return nil, ErrZeroBandwidth
- }
- seq := &Sequence{
- db: db,
- key: key,
- next: 0,
- leased: 0,
- bandwidth: bandwidth,
- }
- err := seq.updateLease()
- return seq, err
- }
- // Tables gets the TableInfo objects from the level controller. If withKeysCount
- // is true, TableInfo objects also contain counts of keys for the tables.
- func (db *DB) Tables() []TableInfo {
- return db.lc.getTableInfo()
- }
- // Levels gets the LevelInfo.
- func (db *DB) Levels() []LevelInfo {
- return db.lc.getLevelInfo()
- }
- // EstimateSize can be used to get rough estimate of data size for a given prefix.
- func (db *DB) EstimateSize(prefix []byte) (uint64, uint64) {
- var onDiskSize, uncompressedSize uint64
- tables := db.Tables()
- for _, ti := range tables {
- if bytes.HasPrefix(ti.Left, prefix) && bytes.HasPrefix(ti.Right, prefix) {
- onDiskSize += uint64(ti.OnDiskSize)
- uncompressedSize += uint64(ti.UncompressedSize)
- }
- }
- return onDiskSize, uncompressedSize
- }
- // Ranges can be used to get rough key ranges to divide up iteration over the DB. The ranges here
- // would consider the prefix, but would not necessarily start or end with the prefix. In fact, the
- // first range would have nil as left key, and the last range would have nil as the right key.
- func (db *DB) Ranges(prefix []byte, numRanges int) []*keyRange {
- var splits []string
- tables := db.Tables()
- // We just want table ranges here and not keys count.
- for _, ti := range tables {
- // We don't use ti.Left, because that has a tendency to store !badger keys. Skip over tables
- // at upper levels. Only choose tables from the last level.
- if ti.Level != db.opt.MaxLevels-1 {
- continue
- }
- if bytes.HasPrefix(ti.Right, prefix) {
- splits = append(splits, string(ti.Right))
- }
- }
- // If the number of splits is low, look at the offsets inside the
- // tables to generate more splits.
- if len(splits) < 32 {
- numTables := len(tables)
- if numTables == 0 {
- numTables = 1
- }
- numPerTable := 32 / numTables
- if numPerTable == 0 {
- numPerTable = 1
- }
- splits = db.lc.keySplits(numPerTable, prefix)
- }
- // If the number of splits is still < 32, then look at the memtables.
- if len(splits) < 32 {
- maxPerSplit := 10000
- mtSplits := func(mt *memTable) {
- if mt == nil {
- return
- }
- count := 0
- iter := mt.sl.NewIterator()
- for iter.SeekToFirst(); iter.Valid(); iter.Next() {
- if count%maxPerSplit == 0 {
- // Add a split every maxPerSplit keys.
- if bytes.HasPrefix(iter.Key(), prefix) {
- splits = append(splits, string(iter.Key()))
- }
- }
- count += 1
- }
- _ = iter.Close()
- }
- db.lock.Lock()
- defer db.lock.Unlock()
- var memTables []*memTable
- memTables = append(memTables, db.imm...)
- for _, mt := range memTables {
- mtSplits(mt)
- }
- mtSplits(db.mt)
- }
- // We have our splits now. Let's convert them to ranges.
- sort.Strings(splits)
- var ranges []*keyRange
- var start []byte
- for _, key := range splits {
- ranges = append(ranges, &keyRange{left: start, right: y.SafeCopy(nil, []byte(key))})
- start = y.SafeCopy(nil, []byte(key))
- }
- ranges = append(ranges, &keyRange{left: start})
- // Figure out the approximate table size this range has to deal with.
- for _, t := range tables {
- tr := keyRange{left: t.Left, right: t.Right}
- for _, r := range ranges {
- if len(r.left) == 0 || len(r.right) == 0 {
- continue
- }
- if r.overlapsWith(tr) {
- r.size += int64(t.UncompressedSize)
- }
- }
- }
- var total int64
- for _, r := range ranges {
- total += r.size
- }
- if total == 0 {
- return ranges
- }
- // Figure out the average size, so we know how to bin the ranges together.
- avg := total / int64(numRanges)
- var out []*keyRange
- var i int
- for i < len(ranges) {
- r := ranges[i]
- cur := &keyRange{left: r.left, size: r.size, right: r.right}
- i++
- for ; i < len(ranges); i++ {
- next := ranges[i]
- if cur.size+next.size > avg {
- break
- }
- cur.right = next.right
- cur.size += next.size
- }
- out = append(out, cur)
- }
- return out
- }
- // MaxBatchCount returns max possible entries in batch
- func (db *DB) MaxBatchCount() int64 {
- return db.opt.maxBatchCount
- }
- // MaxBatchSize returns max possible batch size
- func (db *DB) MaxBatchSize() int64 {
- return db.opt.maxBatchSize
- }
- func (db *DB) stopMemoryFlush() {
- // Stop memtable flushes.
- if db.closers.memtable != nil {
- close(db.flushChan)
- db.closers.memtable.SignalAndWait()
- }
- }
- func (db *DB) stopCompactions() {
- // Stop compactions.
- if db.closers.compactors != nil {
- db.closers.compactors.SignalAndWait()
- }
- }
- func (db *DB) startCompactions() {
- // Resume compactions.
- if db.closers.compactors != nil {
- db.closers.compactors = z.NewCloser(1)
- db.lc.startCompact(db.closers.compactors)
- }
- }
- func (db *DB) startMemoryFlush() {
- // Start memory fluhser.
- if db.closers.memtable != nil {
- db.flushChan = make(chan *memTable, db.opt.NumMemtables)
- db.closers.memtable = z.NewCloser(1)
- go func() {
- db.flushMemtable(db.closers.memtable)
- }()
- }
- }
- // Flatten can be used to force compactions on the LSM tree so all the tables fall on the same
- // level. This ensures that all the versions of keys are colocated and not split across multiple
- // levels, which is necessary after a restore from backup. During Flatten, live compactions are
- // stopped. Ideally, no writes are going on during Flatten. Otherwise, it would create competition
- // between flattening the tree and new tables being created at level zero.
- func (db *DB) Flatten(workers int) error {
- db.stopCompactions()
- defer db.startCompactions()
- compactAway := func(cp compactionPriority) error {
- db.opt.Infof("Attempting to compact with %+v\n", cp)
- errCh := make(chan error, 1)
- for i := 0; i < workers; i++ {
- go func() {
- errCh <- db.lc.doCompact(175, cp)
- }()
- }
- var success int
- var rerr error
- for i := 0; i < workers; i++ {
- err := <-errCh
- if err != nil {
- rerr = err
- db.opt.Warningf("While running doCompact with %+v. Error: %v\n", cp, err)
- } else {
- success++
- }
- }
- if success == 0 {
- return rerr
- }
- // We could do at least one successful compaction. So, we'll consider this a success.
- db.opt.Infof("%d compactor(s) succeeded. One or more tables from level %d compacted.\n",
- success, cp.level)
- return nil
- }
- hbytes := func(sz int64) string {
- return humanize.IBytes(uint64(sz))
- }
- t := db.lc.levelTargets()
- for {
- db.opt.Infof("\n")
- var levels []int
- for i, l := range db.lc.levels {
- sz := l.getTotalSize()
- db.opt.Infof("Level: %d. %8s Size. %8s Max.\n",
- i, hbytes(l.getTotalSize()), hbytes(t.targetSz[i]))
- if sz > 0 {
- levels = append(levels, i)
- }
- }
- if len(levels) <= 1 {
- prios := db.lc.pickCompactLevels(nil)
- if len(prios) == 0 || prios[0].score <= 1.0 {
- db.opt.Infof("All tables consolidated into one level. Flattening done.\n")
- return nil
- }
- if err := compactAway(prios[0]); err != nil {
- return err
- }
- continue
- }
- // Create an artificial compaction priority, to ensure that we compact the level.
- cp := compactionPriority{level: levels[0], score: 1.71}
- if err := compactAway(cp); err != nil {
- return err
- }
- }
- }
- func (db *DB) blockWrite() error {
- // Stop accepting new writes.
- if !db.blockWrites.CompareAndSwap(0, 1) {
- return ErrBlockedWrites
- }
- // Make all pending writes finish. The following will also close writeCh.
- db.closers.writes.SignalAndWait()
- db.opt.Infof("Writes flushed. Stopping compactions now...")
- return nil
- }
- func (db *DB) unblockWrite() {
- db.closers.writes = z.NewCloser(1)
- go db.doWrites(db.closers.writes)
- // Resume writes.
- db.blockWrites.Store(0)
- }
- func (db *DB) prepareToDrop() (func(), error) {
- if db.opt.ReadOnly {
- panic("Attempting to drop data in read-only mode.")
- }
- // In order prepare for drop, we need to block the incoming writes and
- // write it to db. Then, flush all the pending memtable. So that, we
- // don't miss any entries.
- if err := db.blockWrite(); err != nil {
- return func() {}, err
- }
- reqs := make([]*request, 0, 10)
- for {
- select {
- case r := <-db.writeCh:
- reqs = append(reqs, r)
- default:
- if err := db.writeRequests(reqs); err != nil {
- db.opt.Errorf("writeRequests: %v", err)
- }
- db.stopMemoryFlush()
- return func() {
- db.opt.Infof("Resuming writes")
- db.startMemoryFlush()
- db.unblockWrite()
- }, nil
- }
- }
- }
- // DropAll would drop all the data stored in Badger. It does this in the following way.
- // - Stop accepting new writes.
- // - Pause memtable flushes and compactions.
- // - Pick all tables from all levels, create a changeset to delete all these
- // tables and apply it to manifest.
- // - Pick all log files from value log, and delete all of them. Restart value log files from zero.
- // - Resume memtable flushes and compactions.
- //
- // NOTE: DropAll is resilient to concurrent writes, but not to reads. It is up to the user to not do
- // any reads while DropAll is going on, otherwise they may result in panics. Ideally, both reads and
- // writes are paused before running DropAll, and resumed after it is finished.
- func (db *DB) DropAll() error {
- f, err := db.dropAll()
- if f != nil {
- f()
- }
- return err
- }
- func (db *DB) dropAll() (func(), error) {
- db.opt.Infof("DropAll called. Blocking writes...")
- f, err := db.prepareToDrop()
- if err != nil {
- return f, err
- }
- // prepareToDrop will stop all the incoming write and flushes any pending memtables.
- // Before we drop, we'll stop the compaction because anyways all the datas are going to
- // be deleted.
- db.stopCompactions()
- resume := func() {
- db.startCompactions()
- f()
- }
- // Block all foreign interactions with memory tables.
- db.lock.Lock()
- defer db.lock.Unlock()
- // Remove inmemory tables. Calling DecrRef for safety. Not sure if they're absolutely needed.
- db.mt.DecrRef()
- for _, mt := range db.imm {
- mt.DecrRef()
- }
- db.imm = db.imm[:0]
- db.mt, err = db.newMemTable() // Set it up for future writes.
- if err != nil {
- return resume, y.Wrapf(err, "cannot open new memtable")
- }
- num, err := db.lc.dropTree()
- if err != nil {
- return resume, err
- }
- db.opt.Infof("Deleted %d SSTables. Now deleting value logs...\n", num)
- num, err = db.vlog.dropAll()
- if err != nil {
- return resume, err
- }
- db.lc.nextFileID.Store(1)
- db.opt.Infof("Deleted %d value log files. DropAll done.\n", num)
- db.blockCache.Clear()
- db.indexCache.Clear()
- db.threshold.Clear(db.opt)
- return resume, nil
- }
- // DropPrefix would drop all the keys with the provided prefix. It does this in the following way:
- // - Stop accepting new writes.
- // - Stop memtable flushes before acquiring lock. Because we're acquiring lock here
- // and memtable flush stalls for lock, which leads to deadlock
- // - Flush out all memtables, skipping over keys with the given prefix, Kp.
- // - Write out the value log header to memtables when flushing, so we don't accidentally bring Kp
- // back after a restart.
- // - Stop compaction.
- // - Compact L0->L1, skipping over Kp.
- // - Compact rest of the levels, Li->Li, picking tables which have Kp.
- // - Resume memtable flushes, compactions and writes.
- func (db *DB) DropPrefix(prefixes ...[]byte) error {
- if len(prefixes) == 0 {
- return nil
- }
- db.opt.Infof("DropPrefix called for %s", prefixes)
- f, err := db.prepareToDrop()
- if err != nil {
- return err
- }
- defer f()
- var filtered [][]byte
- if filtered, err = db.filterPrefixesToDrop(prefixes); err != nil {
- return err
- }
- // If there is no prefix for which the data already exist, do not do anything.
- if len(filtered) == 0 {
- db.opt.Infof("No prefixes to drop")
- return nil
- }
- // Block all foreign interactions with memory tables.
- db.lock.Lock()
- defer db.lock.Unlock()
- db.imm = append(db.imm, db.mt)
- for _, memtable := range db.imm {
- if memtable.sl.Empty() {
- memtable.DecrRef()
- continue
- }
- db.opt.Debugf("Flushing memtable")
- if err := db.handleMemTableFlush(memtable, filtered); err != nil {
- db.opt.Errorf("While trying to flush memtable: %v", err)
- return err
- }
- memtable.DecrRef()
- }
- db.stopCompactions()
- defer db.startCompactions()
- db.imm = db.imm[:0]
- db.mt, err = db.newMemTable()
- if err != nil {
- return y.Wrapf(err, "cannot create new mem table")
- }
- // Drop prefixes from the levels.
- if err := db.lc.dropPrefixes(filtered); err != nil {
- return err
- }
- db.opt.Infof("DropPrefix done")
- return nil
- }
- func (db *DB) filterPrefixesToDrop(prefixes [][]byte) ([][]byte, error) {
- var filtered [][]byte
- for _, prefix := range prefixes {
- err := db.View(func(txn *Txn) error {
- iopts := DefaultIteratorOptions
- iopts.Prefix = prefix
- iopts.PrefetchValues = false
- itr := txn.NewIterator(iopts)
- defer itr.Close()
- itr.Rewind()
- if itr.ValidForPrefix(prefix) {
- filtered = append(filtered, prefix)
- }
- return nil
- })
- if err != nil {
- return filtered, err
- }
- }
- return filtered, nil
- }
- // Checks if the key is banned. Returns the respective error if the key belongs to any of the banned
- // namepspaces. Else it returns nil.
- func (db *DB) isBanned(key []byte) error {
- if db.opt.NamespaceOffset < 0 {
- return nil
- }
- if len(key) <= db.opt.NamespaceOffset+8 {
- return nil
- }
- if db.bannedNamespaces.has(y.BytesToU64(key[db.opt.NamespaceOffset:])) {
- return ErrBannedKey
- }
- return nil
- }
- // BanNamespace bans a namespace. Read/write to keys belonging to any of such namespace is denied.
- func (db *DB) BanNamespace(ns uint64) error {
- if db.opt.NamespaceOffset < 0 {
- return ErrNamespaceMode
- }
- db.opt.Infof("Banning namespace: %d", ns)
- // First set the banned namespaces in DB and then update the in-memory structure.
- key := y.KeyWithTs(append(bannedNsKey, y.U64ToBytes(ns)...), 1)
- entry := []*Entry{{
- Key: key,
- Value: nil,
- }}
- req, err := db.sendToWriteCh(entry)
- if err != nil {
- return err
- }
- if err := req.Wait(); err != nil {
- return err
- }
- db.bannedNamespaces.add(ns)
- return nil
- }
- // BannedNamespaces returns the list of prefixes banned for DB.
- func (db *DB) BannedNamespaces() []uint64 {
- return db.bannedNamespaces.all()
- }
- // KVList contains a list of key-value pairs.
- type KVList = pb.KVList
- // Subscribe can be used to watch key changes for the given key prefixes and the ignore string.
- // At least one prefix should be passed, or an error will be returned.
- // You can use an empty prefix to monitor all changes to the DB.
- // Ignore string is the byte ranges for which prefix matching will be ignored.
- // For example: ignore = "2-3", and prefix = "abc" will match for keys "abxxc", "abdfc" etc.
- // This function blocks until the given context is done or an error occurs.
- // The given function will be called with a new KVList containing the modified keys and the
- // corresponding values.
- func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches []pb.Match) error {
- if cb == nil {
- return ErrNilCallback
- }
- c := z.NewCloser(1)
- s, err := db.pub.newSubscriber(c, matches)
- if err != nil {
- return y.Wrapf(err, "while creating a new subscriber")
- }
- slurp := func(batch *pb.KVList) error {
- for {
- select {
- case kvs := <-s.sendCh:
- batch.Kv = append(batch.Kv, kvs.Kv...)
- default:
- if len(batch.GetKv()) > 0 {
- return cb(batch)
- }
- return nil
- }
- }
- }
- drain := func() {
- for {
- select {
- case _, ok := <-s.sendCh:
- if !ok {
- // Channel is closed.
- return
- }
- default:
- return
- }
- }
- }
- for {
- select {
- case <-c.HasBeenClosed():
- // No need to delete here. Closer will be called only while
- // closing DB. Subscriber will be deleted by cleanSubscribers.
- err := slurp(new(pb.KVList))
- // Drain if any pending updates.
- c.Done()
- return err
- case <-ctx.Done():
- c.Done()
- s.active.Store(0)
- drain()
- db.pub.deleteSubscriber(s.id)
- // Delete the subscriber to avoid further updates.
- return ctx.Err()
- case batch := <-s.sendCh:
- err := slurp(batch)
- if err != nil {
- c.Done()
- s.active.Store(0)
- drain()
- // Delete the subscriber if there is an error by the callback.
- db.pub.deleteSubscriber(s.id)
- return err
- }
- }
- }
- }
- func (db *DB) syncDir(dir string) error {
- if db.opt.InMemory {
- return nil
- }
- return syncDir(dir)
- }
- func createDirs(opt Options) error {
- for _, path := range []string{opt.Dir, opt.ValueDir} {
- dirExists, err := exists(path)
- if err != nil {
- return y.Wrapf(err, "Invalid Dir: %q", path)
- }
- if !dirExists {
- if opt.ReadOnly {
- return errors.Errorf("Cannot find directory %q for read-only open", path)
- }
- // Try to create the directory
- err = os.MkdirAll(path, 0700)
- if err != nil {
- return y.Wrapf(err, "Error Creating Dir: %q", path)
- }
- }
- }
- return nil
- }
- // Stream the contents of this DB to a new DB with options outOptions that will be
- // created in outDir.
- func (db *DB) StreamDB(outOptions Options) error {
- outDir := outOptions.Dir
- // Open output DB.
- outDB, err := OpenManaged(outOptions)
- if err != nil {
- return y.Wrapf(err, "cannot open out DB at %s", outDir)
- }
- defer outDB.Close()
- writer := outDB.NewStreamWriter()
- if err := writer.Prepare(); err != nil {
- return y.Wrapf(err, "cannot create stream writer in out DB at %s", outDir)
- }
- // Stream contents of DB to the output DB.
- stream := db.NewStreamAt(math.MaxUint64)
- stream.LogPrefix = fmt.Sprintf("Streaming DB to new DB at %s", outDir)
- stream.Send = func(buf *z.Buffer) error {
- return writer.Write(buf)
- }
- if err := stream.Orchestrate(context.Background()); err != nil {
- return y.Wrapf(err, "cannot stream DB to out DB at %s", outDir)
- }
- if err := writer.Flush(); err != nil {
- return y.Wrapf(err, "cannot flush writer")
- }
- return nil
- }
- // Opts returns a copy of the DB options.
- func (db *DB) Opts() Options {
- return db.opt
- }
- type CacheType int
- const (
- BlockCache CacheType = iota
- IndexCache
- )
- // CacheMaxCost updates the max cost of the given cache (either block or index cache).
- // The call will have an effect only if the DB was created with the cache. Otherwise it is
- // a no-op. If you pass a negative value, the function will return the current value
- // without updating it.
- func (db *DB) CacheMaxCost(cache CacheType, maxCost int64) (int64, error) {
- if db == nil {
- return 0, nil
- }
- if maxCost < 0 {
- switch cache {
- case BlockCache:
- return db.blockCache.MaxCost(), nil
- case IndexCache:
- return db.indexCache.MaxCost(), nil
- default:
- return 0, errors.Errorf("invalid cache type")
- }
- }
- switch cache {
- case BlockCache:
- db.blockCache.UpdateMaxCost(maxCost)
- return maxCost, nil
- case IndexCache:
- db.indexCache.UpdateMaxCost(maxCost)
- return maxCost, nil
- default:
- return 0, errors.Errorf("invalid cache type")
- }
- }
- func (db *DB) LevelsToString() string {
- levels := db.Levels()
- h := func(sz int64) string {
- return humanize.IBytes(uint64(sz))
- }
- base := func(b bool) string {
- if b {
- return "B"
- }
- return " "
- }
- var b strings.Builder
- b.WriteRune('\n')
- for _, li := range levels {
- b.WriteString(fmt.Sprintf(
- "Level %d [%s]: NumTables: %02d. Size: %s of %s. Score: %.2f->%.2f"+
- " StaleData: %s Target FileSize: %s\n",
- li.Level, base(li.IsBaseLevel), li.NumTables,
- h(li.Size), h(li.TargetSize), li.Score, li.Adjusted, h(li.StaleDatSize),
- h(li.TargetFileSize)))
- }
- b.WriteString("Level Done\n")
- return b.String()
- }
|