db.go 56 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package badger
  6. import (
  7. "bytes"
  8. "context"
  9. "encoding/binary"
  10. stderrors "errors"
  11. "expvar"
  12. "fmt"
  13. "math"
  14. "os"
  15. "path/filepath"
  16. "sort"
  17. "strings"
  18. "sync"
  19. "sync/atomic"
  20. "time"
  21. humanize "github.com/dustin/go-humanize"
  22. "github.com/pkg/errors"
  23. "github.com/dgraph-io/badger/v4/fb"
  24. "github.com/dgraph-io/badger/v4/options"
  25. "github.com/dgraph-io/badger/v4/pb"
  26. "github.com/dgraph-io/badger/v4/skl"
  27. "github.com/dgraph-io/badger/v4/table"
  28. "github.com/dgraph-io/badger/v4/y"
  29. "github.com/dgraph-io/ristretto/v2"
  30. "github.com/dgraph-io/ristretto/v2/z"
  31. )
  32. var (
  33. badgerPrefix = []byte("!badger!") // Prefix for internal keys used by badger.
  34. txnKey = []byte("!badger!txn") // For indicating end of entries in txn.
  35. bannedNsKey = []byte("!badger!banned") // For storing the banned namespaces.
  36. )
  37. type closers struct {
  38. updateSize *z.Closer
  39. compactors *z.Closer
  40. memtable *z.Closer
  41. writes *z.Closer
  42. valueGC *z.Closer
  43. pub *z.Closer
  44. cacheHealth *z.Closer
  45. }
  46. type lockedKeys struct {
  47. sync.RWMutex
  48. keys map[uint64]struct{}
  49. }
  50. func (lk *lockedKeys) add(key uint64) {
  51. lk.Lock()
  52. defer lk.Unlock()
  53. lk.keys[key] = struct{}{}
  54. }
  55. func (lk *lockedKeys) has(key uint64) bool {
  56. lk.RLock()
  57. defer lk.RUnlock()
  58. _, ok := lk.keys[key]
  59. return ok
  60. }
  61. func (lk *lockedKeys) all() []uint64 {
  62. lk.RLock()
  63. defer lk.RUnlock()
  64. keys := make([]uint64, 0, len(lk.keys))
  65. for key := range lk.keys {
  66. keys = append(keys, key)
  67. }
  68. return keys
  69. }
  70. // DB provides the various functions required to interact with Badger.
  71. // DB is thread-safe.
  72. type DB struct {
  73. testOnlyDBExtensions
  74. lock sync.RWMutex // Guards list of inmemory tables, not individual reads and writes.
  75. dirLockGuard *directoryLockGuard
  76. // nil if Dir and ValueDir are the same
  77. valueDirGuard *directoryLockGuard
  78. closers closers
  79. mt *memTable // Our latest (actively written) in-memory table
  80. imm []*memTable // Add here only AFTER pushing to flushChan.
  81. // Initialized via openMemTables.
  82. nextMemFid int
  83. opt Options
  84. manifest *manifestFile
  85. lc *levelsController
  86. vlog valueLog
  87. writeCh chan *request
  88. flushChan chan *memTable // For flushing memtables.
  89. closeOnce sync.Once // For closing DB only once.
  90. blockWrites atomic.Int32
  91. isClosed atomic.Uint32
  92. orc *oracle
  93. bannedNamespaces *lockedKeys
  94. threshold *vlogThreshold
  95. pub *publisher
  96. registry *KeyRegistry
  97. blockCache *ristretto.Cache[[]byte, *table.Block]
  98. indexCache *ristretto.Cache[uint64, *fb.TableIndex]
  99. allocPool *z.AllocatorPool
  100. }
  101. const (
  102. kvWriteChCapacity = 1000
  103. )
  104. func checkAndSetOptions(opt *Options) error {
  105. // It's okay to have zero compactors which will disable all compactions but
  106. // we cannot have just one compactor otherwise we will end up with all data
  107. // on level 2.
  108. if opt.NumCompactors == 1 {
  109. return errors.New("Cannot have 1 compactor. Need at least 2")
  110. }
  111. if opt.InMemory && (opt.Dir != "" || opt.ValueDir != "") {
  112. return errors.New("Cannot use badger in Disk-less mode with Dir or ValueDir set")
  113. }
  114. opt.maxBatchSize = (15 * opt.MemTableSize) / 100
  115. opt.maxBatchCount = opt.maxBatchSize / int64(skl.MaxNodeSize)
  116. // This is the maximum value, vlogThreshold can have if dynamic thresholding is enabled.
  117. opt.maxValueThreshold = math.Min(maxValueThreshold, float64(opt.maxBatchSize))
  118. if opt.VLogPercentile < 0.0 || opt.VLogPercentile > 1.0 {
  119. return errors.New("vlogPercentile must be within range of 0.0-1.0")
  120. }
  121. // We are limiting opt.ValueThreshold to maxValueThreshold for now.
  122. if opt.ValueThreshold > maxValueThreshold {
  123. return errors.Errorf("Invalid ValueThreshold, must be less or equal to %d",
  124. maxValueThreshold)
  125. }
  126. // If ValueThreshold is greater than opt.maxBatchSize, we won't be able to push any data using
  127. // the transaction APIs. Transaction batches entries into batches of size opt.maxBatchSize.
  128. if opt.ValueThreshold > opt.maxBatchSize {
  129. return errors.Errorf("Valuethreshold %d greater than max batch size of %d. Either "+
  130. "reduce opt.ValueThreshold or increase opt.BaseTableSize.",
  131. opt.ValueThreshold, opt.maxBatchSize)
  132. }
  133. // ValueLogFileSize should be strictly LESS than 2<<30 otherwise we will
  134. // overflow the uint32 when we mmap it in OpenMemtable.
  135. if !(opt.ValueLogFileSize < 2<<30 && opt.ValueLogFileSize >= 1<<20) {
  136. return ErrValueLogSize
  137. }
  138. if opt.ReadOnly {
  139. // Do not perform compaction in read only mode.
  140. opt.CompactL0OnClose = false
  141. }
  142. needCache := (opt.Compression != options.None) || (len(opt.EncryptionKey) > 0)
  143. if needCache && opt.BlockCacheSize == 0 {
  144. panic("BlockCacheSize should be set since compression/encryption are enabled")
  145. }
  146. return nil
  147. }
  148. // Open returns a new DB object.
  149. func Open(opt Options) (*DB, error) {
  150. if err := checkAndSetOptions(&opt); err != nil {
  151. return nil, err
  152. }
  153. var dirLockGuard, valueDirLockGuard *directoryLockGuard
  154. // Create directories and acquire lock on it only if badger is not running in InMemory mode.
  155. // We don't have any directories/files in InMemory mode so we don't need to acquire
  156. // any locks on them.
  157. if !opt.InMemory {
  158. if err := createDirs(opt); err != nil {
  159. return nil, err
  160. }
  161. var err error
  162. if !opt.BypassLockGuard {
  163. dirLockGuard, err = acquireDirectoryLock(opt.Dir, lockFile, opt.ReadOnly)
  164. if err != nil {
  165. return nil, err
  166. }
  167. defer func() {
  168. if dirLockGuard != nil {
  169. _ = dirLockGuard.release()
  170. }
  171. }()
  172. absDir, err := filepath.Abs(opt.Dir)
  173. if err != nil {
  174. return nil, err
  175. }
  176. absValueDir, err := filepath.Abs(opt.ValueDir)
  177. if err != nil {
  178. return nil, err
  179. }
  180. if absValueDir != absDir {
  181. valueDirLockGuard, err = acquireDirectoryLock(opt.ValueDir, lockFile, opt.ReadOnly)
  182. if err != nil {
  183. return nil, err
  184. }
  185. defer func() {
  186. if valueDirLockGuard != nil {
  187. _ = valueDirLockGuard.release()
  188. }
  189. }()
  190. }
  191. }
  192. }
  193. manifestFile, manifest, err := openOrCreateManifestFile(opt)
  194. if err != nil {
  195. return nil, err
  196. }
  197. defer func() {
  198. if manifestFile != nil {
  199. _ = manifestFile.close()
  200. }
  201. }()
  202. db := &DB{
  203. imm: make([]*memTable, 0, opt.NumMemtables),
  204. flushChan: make(chan *memTable, opt.NumMemtables),
  205. writeCh: make(chan *request, kvWriteChCapacity),
  206. opt: opt,
  207. manifest: manifestFile,
  208. dirLockGuard: dirLockGuard,
  209. valueDirGuard: valueDirLockGuard,
  210. orc: newOracle(opt),
  211. pub: newPublisher(),
  212. allocPool: z.NewAllocatorPool(8),
  213. bannedNamespaces: &lockedKeys{keys: make(map[uint64]struct{})},
  214. threshold: initVlogThreshold(&opt),
  215. }
  216. db.syncChan = opt.syncChan
  217. // Cleanup all the goroutines started by badger in case of an error.
  218. defer func() {
  219. if err != nil {
  220. opt.Errorf("Received err: %v. Cleaning up...", err)
  221. db.cleanup()
  222. db = nil
  223. }
  224. }()
  225. if opt.BlockCacheSize > 0 {
  226. numInCache := opt.BlockCacheSize / int64(opt.BlockSize)
  227. if numInCache == 0 {
  228. // Make the value of this variable at least one since the cache requires
  229. // the number of counters to be greater than zero.
  230. numInCache = 1
  231. }
  232. config := ristretto.Config[[]byte, *table.Block]{
  233. NumCounters: numInCache * 8,
  234. MaxCost: opt.BlockCacheSize,
  235. BufferItems: 64,
  236. Metrics: true,
  237. OnExit: table.BlockEvictHandler,
  238. }
  239. db.blockCache, err = ristretto.NewCache[[]byte, *table.Block](&config)
  240. if err != nil {
  241. return nil, y.Wrap(err, "failed to create data cache")
  242. }
  243. }
  244. if opt.IndexCacheSize > 0 {
  245. // Index size is around 5% of the table size.
  246. indexSz := int64(float64(opt.MemTableSize) * 0.05)
  247. numInCache := opt.IndexCacheSize / indexSz
  248. if numInCache == 0 {
  249. // Make the value of this variable at least one since the cache requires
  250. // the number of counters to be greater than zero.
  251. numInCache = 1
  252. }
  253. config := ristretto.Config[uint64, *fb.TableIndex]{
  254. NumCounters: numInCache * 8,
  255. MaxCost: opt.IndexCacheSize,
  256. BufferItems: 64,
  257. Metrics: true,
  258. }
  259. db.indexCache, err = ristretto.NewCache(&config)
  260. if err != nil {
  261. return nil, y.Wrap(err, "failed to create bf cache")
  262. }
  263. }
  264. db.closers.cacheHealth = z.NewCloser(1)
  265. go db.monitorCache(db.closers.cacheHealth)
  266. if db.opt.InMemory {
  267. db.opt.SyncWrites = false
  268. // If badger is running in memory mode, push everything into the LSM Tree.
  269. db.opt.ValueThreshold = math.MaxInt32
  270. }
  271. krOpt := KeyRegistryOptions{
  272. ReadOnly: opt.ReadOnly,
  273. Dir: opt.Dir,
  274. EncryptionKey: opt.EncryptionKey,
  275. EncryptionKeyRotationDuration: opt.EncryptionKeyRotationDuration,
  276. InMemory: opt.InMemory,
  277. }
  278. if db.registry, err = OpenKeyRegistry(krOpt); err != nil {
  279. return db, err
  280. }
  281. db.calculateSize()
  282. db.closers.updateSize = z.NewCloser(1)
  283. go db.updateSize(db.closers.updateSize)
  284. if err := db.openMemTables(db.opt); err != nil {
  285. return nil, y.Wrapf(err, "while opening memtables")
  286. }
  287. if !db.opt.ReadOnly {
  288. if db.mt, err = db.newMemTable(); err != nil {
  289. return nil, y.Wrapf(err, "cannot create memtable")
  290. }
  291. }
  292. // newLevelsController potentially loads files in directory.
  293. if db.lc, err = newLevelsController(db, &manifest); err != nil {
  294. return db, err
  295. }
  296. // Initialize vlog struct.
  297. db.vlog.init(db)
  298. if !opt.ReadOnly {
  299. db.closers.compactors = z.NewCloser(1)
  300. db.lc.startCompact(db.closers.compactors)
  301. db.closers.memtable = z.NewCloser(1)
  302. go func() {
  303. db.flushMemtable(db.closers.memtable) // Need levels controller to be up.
  304. }()
  305. // Flush them to disk asap.
  306. for _, mt := range db.imm {
  307. db.flushChan <- mt
  308. }
  309. }
  310. // We do increment nextTxnTs below. So, no need to do it here.
  311. db.orc.nextTxnTs = db.MaxVersion()
  312. db.opt.Infof("Set nextTxnTs to %d", db.orc.nextTxnTs)
  313. if err = db.vlog.open(db); err != nil {
  314. return db, y.Wrapf(err, "During db.vlog.open")
  315. }
  316. // Let's advance nextTxnTs to one more than whatever we observed via
  317. // replaying the logs.
  318. db.orc.txnMark.Done(db.orc.nextTxnTs)
  319. // In normal mode, we must update readMark so older versions of keys can be removed during
  320. // compaction when run in offline mode via the flatten tool.
  321. db.orc.readMark.Done(db.orc.nextTxnTs)
  322. db.orc.incrementNextTs()
  323. go db.threshold.listenForValueThresholdUpdate()
  324. if err := db.initBannedNamespaces(); err != nil {
  325. return db, errors.Wrapf(err, "While setting banned keys")
  326. }
  327. db.closers.writes = z.NewCloser(1)
  328. go db.doWrites(db.closers.writes)
  329. if !db.opt.InMemory {
  330. db.closers.valueGC = z.NewCloser(1)
  331. go db.vlog.waitOnGC(db.closers.valueGC)
  332. }
  333. db.closers.pub = z.NewCloser(1)
  334. go db.pub.listenForUpdates(db.closers.pub)
  335. valueDirLockGuard = nil
  336. dirLockGuard = nil
  337. manifestFile = nil
  338. return db, nil
  339. }
  340. // initBannedNamespaces retrieves the banned namespaces from the DB and updates in-memory structure.
  341. func (db *DB) initBannedNamespaces() error {
  342. if db.opt.NamespaceOffset < 0 {
  343. return nil
  344. }
  345. return db.View(func(txn *Txn) error {
  346. iopts := DefaultIteratorOptions
  347. iopts.Prefix = bannedNsKey
  348. iopts.PrefetchValues = false
  349. iopts.InternalAccess = true
  350. itr := txn.NewIterator(iopts)
  351. defer itr.Close()
  352. for itr.Rewind(); itr.Valid(); itr.Next() {
  353. key := y.BytesToU64(itr.Item().Key()[len(bannedNsKey):])
  354. db.bannedNamespaces.add(key)
  355. }
  356. return nil
  357. })
  358. }
  359. func (db *DB) MaxVersion() uint64 {
  360. var maxVersion uint64
  361. update := func(a uint64) {
  362. if a > maxVersion {
  363. maxVersion = a
  364. }
  365. }
  366. db.lock.Lock()
  367. // In read only mode, we do not create new mem table.
  368. if !db.opt.ReadOnly {
  369. update(db.mt.maxVersion)
  370. }
  371. for _, mt := range db.imm {
  372. update(mt.maxVersion)
  373. }
  374. db.lock.Unlock()
  375. for _, ti := range db.Tables() {
  376. update(ti.MaxVersion)
  377. }
  378. return maxVersion
  379. }
  380. func (db *DB) monitorCache(c *z.Closer) {
  381. defer c.Done()
  382. count := 0
  383. analyze := func(name string, metrics *ristretto.Metrics) {
  384. // If the mean life expectancy is less than 10 seconds, the cache
  385. // might be too small.
  386. le := metrics.LifeExpectancySeconds()
  387. if le == nil {
  388. return
  389. }
  390. lifeTooShort := le.Count > 0 && float64(le.Sum)/float64(le.Count) < 10
  391. hitRatioTooLow := metrics.Ratio() > 0 && metrics.Ratio() < 0.4
  392. if lifeTooShort && hitRatioTooLow {
  393. db.opt.Warningf("%s might be too small. Metrics: %s\n", name, metrics)
  394. db.opt.Warningf("Cache life expectancy (in seconds): %+v\n", le)
  395. } else if le.Count > 1000 && count%5 == 0 {
  396. db.opt.Infof("%s metrics: %s\n", name, metrics)
  397. }
  398. }
  399. ticker := time.NewTicker(1 * time.Minute)
  400. defer ticker.Stop()
  401. for {
  402. select {
  403. case <-c.HasBeenClosed():
  404. return
  405. case <-ticker.C:
  406. }
  407. analyze("Block cache", db.BlockCacheMetrics())
  408. analyze("Index cache", db.IndexCacheMetrics())
  409. count++
  410. }
  411. }
  412. // cleanup stops all the goroutines started by badger. This is used in open to
  413. // cleanup goroutines in case of an error.
  414. func (db *DB) cleanup() {
  415. db.stopMemoryFlush()
  416. db.stopCompactions()
  417. db.blockCache.Close()
  418. db.indexCache.Close()
  419. if db.closers.updateSize != nil {
  420. db.closers.updateSize.Signal()
  421. }
  422. if db.closers.valueGC != nil {
  423. db.closers.valueGC.Signal()
  424. }
  425. if db.closers.writes != nil {
  426. db.closers.writes.Signal()
  427. }
  428. if db.closers.pub != nil {
  429. db.closers.pub.Signal()
  430. }
  431. db.orc.Stop()
  432. // Do not use vlog.Close() here. vlog.Close truncates the files. We don't
  433. // want to truncate files unless the user has specified the truncate flag.
  434. }
  435. // BlockCacheMetrics returns the metrics for the underlying block cache.
  436. func (db *DB) BlockCacheMetrics() *ristretto.Metrics {
  437. if db.blockCache != nil {
  438. return db.blockCache.Metrics
  439. }
  440. return nil
  441. }
  442. // IndexCacheMetrics returns the metrics for the underlying index cache.
  443. func (db *DB) IndexCacheMetrics() *ristretto.Metrics {
  444. if db.indexCache != nil {
  445. return db.indexCache.Metrics
  446. }
  447. return nil
  448. }
  449. // Close closes a DB. It's crucial to call it to ensure all the pending updates make their way to
  450. // disk. Calling DB.Close() multiple times would still only close the DB once.
  451. func (db *DB) Close() error {
  452. var err error
  453. db.closeOnce.Do(func() {
  454. err = db.close()
  455. })
  456. return err
  457. }
  458. // IsClosed denotes if the badger DB is closed or not. A DB instance should not
  459. // be used after closing it.
  460. func (db *DB) IsClosed() bool {
  461. return db.isClosed.Load() == 1
  462. }
  463. func (db *DB) close() (err error) {
  464. defer db.allocPool.Release()
  465. db.opt.Debugf("Closing database")
  466. db.opt.Infof("Lifetime L0 stalled for: %s\n", time.Duration(db.lc.l0stallsMs.Load()))
  467. db.blockWrites.Store(1)
  468. db.isClosed.Store(1)
  469. if !db.opt.InMemory {
  470. // Stop value GC first.
  471. db.closers.valueGC.SignalAndWait()
  472. }
  473. // Stop writes next.
  474. db.closers.writes.SignalAndWait()
  475. // Don't accept any more write.
  476. close(db.writeCh)
  477. db.closers.pub.SignalAndWait()
  478. db.closers.cacheHealth.Signal()
  479. // Make sure that block writer is done pushing stuff into memtable!
  480. // Otherwise, you will have a race condition: we are trying to flush memtables
  481. // and remove them completely, while the block / memtable writer is still
  482. // trying to push stuff into the memtable. This will also resolve the value
  483. // offset problem: as we push into memtable, we update value offsets there.
  484. if db.mt != nil {
  485. if db.mt.sl.Empty() {
  486. // Remove the memtable if empty.
  487. db.mt.DecrRef()
  488. } else {
  489. db.opt.Debugf("Flushing memtable")
  490. for {
  491. pushedMemTable := func() bool {
  492. db.lock.Lock()
  493. defer db.lock.Unlock()
  494. y.AssertTrue(db.mt != nil)
  495. select {
  496. case db.flushChan <- db.mt:
  497. db.imm = append(db.imm, db.mt) // Flusher will attempt to remove this from s.imm.
  498. db.mt = nil // Will segfault if we try writing!
  499. db.opt.Debugf("pushed to flush chan\n")
  500. return true
  501. default:
  502. // If we fail to push, we need to unlock and wait for a short while.
  503. // The flushing operation needs to update s.imm. Otherwise, we have a
  504. // deadlock.
  505. // TODO: Think about how to do this more cleanly, maybe without any locks.
  506. }
  507. return false
  508. }()
  509. if pushedMemTable {
  510. break
  511. }
  512. time.Sleep(10 * time.Millisecond)
  513. }
  514. }
  515. }
  516. db.stopMemoryFlush()
  517. db.stopCompactions()
  518. // Force Compact L0
  519. // We don't need to care about cstatus since no parallel compaction is running.
  520. if db.opt.CompactL0OnClose {
  521. err := db.lc.doCompact(173, compactionPriority{level: 0, score: 1.73})
  522. switch err {
  523. case errFillTables:
  524. // This error only means that there might be enough tables to do a compaction. So, we
  525. // should not report it to the end user to avoid confusing them.
  526. case nil:
  527. db.opt.Debugf("Force compaction on level 0 done")
  528. default:
  529. db.opt.Warningf("While forcing compaction on level 0: %v", err)
  530. }
  531. }
  532. // Now close the value log.
  533. if vlogErr := db.vlog.Close(); vlogErr != nil {
  534. err = y.Wrap(vlogErr, "DB.Close")
  535. }
  536. db.opt.Infof(db.LevelsToString())
  537. if lcErr := db.lc.close(); err == nil {
  538. err = y.Wrap(lcErr, "DB.Close")
  539. }
  540. db.opt.Debugf("Waiting for closer")
  541. db.closers.updateSize.SignalAndWait()
  542. db.orc.Stop()
  543. db.blockCache.Close()
  544. db.indexCache.Close()
  545. db.threshold.close()
  546. if db.opt.InMemory {
  547. return
  548. }
  549. if db.dirLockGuard != nil {
  550. if guardErr := db.dirLockGuard.release(); err == nil {
  551. err = y.Wrap(guardErr, "DB.Close")
  552. }
  553. }
  554. if db.valueDirGuard != nil {
  555. if guardErr := db.valueDirGuard.release(); err == nil {
  556. err = y.Wrap(guardErr, "DB.Close")
  557. }
  558. }
  559. if manifestErr := db.manifest.close(); err == nil {
  560. err = y.Wrap(manifestErr, "DB.Close")
  561. }
  562. if registryErr := db.registry.Close(); err == nil {
  563. err = y.Wrap(registryErr, "DB.Close")
  564. }
  565. // Fsync directories to ensure that lock file, and any other removed files whose directory
  566. // we haven't specifically fsynced, are guaranteed to have their directory entry removal
  567. // persisted to disk.
  568. if syncErr := db.syncDir(db.opt.Dir); err == nil {
  569. err = y.Wrap(syncErr, "DB.Close")
  570. }
  571. if syncErr := db.syncDir(db.opt.ValueDir); err == nil {
  572. err = y.Wrap(syncErr, "DB.Close")
  573. }
  574. return err
  575. }
  576. // VerifyChecksum verifies checksum for all tables on all levels.
  577. // This method can be used to verify checksum, if opt.ChecksumVerificationMode is NoVerification.
  578. func (db *DB) VerifyChecksum() error {
  579. return db.lc.verifyChecksum()
  580. }
  581. const (
  582. lockFile = "LOCK"
  583. )
  584. // Sync syncs database content to disk. This function provides
  585. // more control to user to sync data whenever required.
  586. func (db *DB) Sync() error {
  587. /**
  588. Make an attempt to sync both the logs, the active memtable's WAL and the vLog (1847).
  589. Cases:
  590. - All_ok :: If both the logs sync successfully.
  591. - Entry_Lost :: If an entry with a value pointer was present in the active memtable's WAL,
  592. :: and the WAL was synced but there was an error in syncing the vLog.
  593. :: The entry will be considered lost and this case will need to be handled during recovery.
  594. - Entries_Lost :: If there were errors in syncing both the logs, multiple entries would be lost.
  595. - Entries_Lost :: If the active memtable's WAL is not synced but the vLog is synced, it will
  596. :: result in entries being lost because recovery of the active memtable is done from its WAL.
  597. :: Check `UpdateSkipList` in memtable.go.
  598. - Nothing_lost :: If an entry with its value was present in the active memtable's WAL, and the WAL was synced,
  599. :: but there was an error in syncing the vLog.
  600. :: Nothing is lost for this very specific entry because the entry is completely present in the memtable's WAL.
  601. - Partially_lost :: If entries were written partially in either of the logs,
  602. :: the logs will be truncated during recovery.
  603. :: As a result of truncation, some entries might be lost.
  604. :: Assume that 4KB of data is to be synced and invoking `Sync` results only in syncing 3KB
  605. :: of data and then the machine shuts down or the disk failure happens,
  606. :: this will result in partial writes. [[This case needs verification]]
  607. */
  608. db.lock.RLock()
  609. memtableSyncError := db.mt.SyncWAL()
  610. db.lock.RUnlock()
  611. vLogSyncError := db.vlog.sync()
  612. return y.CombineErrors(memtableSyncError, vLogSyncError)
  613. }
  614. // getMemtables returns the current memtables and get references.
  615. func (db *DB) getMemTables() ([]*memTable, func()) {
  616. db.lock.RLock()
  617. defer db.lock.RUnlock()
  618. var tables []*memTable
  619. // Mutable memtable does not exist in read-only mode.
  620. if !db.opt.ReadOnly {
  621. // Get mutable memtable.
  622. tables = append(tables, db.mt)
  623. db.mt.IncrRef()
  624. }
  625. // Get immutable memtables.
  626. last := len(db.imm) - 1
  627. for i := range db.imm {
  628. tables = append(tables, db.imm[last-i])
  629. db.imm[last-i].IncrRef()
  630. }
  631. return tables, func() {
  632. for _, tbl := range tables {
  633. tbl.DecrRef()
  634. }
  635. }
  636. }
  637. // get returns the value in memtable or disk for given key.
  638. // Note that value will include meta byte.
  639. //
  640. // IMPORTANT: We should never write an entry with an older timestamp for the same key, We need to
  641. // maintain this invariant to search for the latest value of a key, or else we need to search in all
  642. // tables and find the max version among them. To maintain this invariant, we also need to ensure
  643. // that all versions of a key are always present in the same table from level 1, because compaction
  644. // can push any table down.
  645. //
  646. // Update(23/09/2020) - We have dropped the move key implementation. Earlier we
  647. // were inserting move keys to fix the invalid value pointers but we no longer
  648. // do that. For every get("fooX") call where X is the version, we will search
  649. // for "fooX" in all the levels of the LSM tree. This is expensive but it
  650. // removes the overhead of handling move keys completely.
  651. func (db *DB) get(key []byte) (y.ValueStruct, error) {
  652. if db.IsClosed() {
  653. return y.ValueStruct{}, ErrDBClosed
  654. }
  655. tables, decr := db.getMemTables() // Lock should be released.
  656. defer decr()
  657. var maxVs y.ValueStruct
  658. version := y.ParseTs(key)
  659. y.NumGetsAdd(db.opt.MetricsEnabled, 1)
  660. for i := 0; i < len(tables); i++ {
  661. vs := tables[i].sl.Get(key)
  662. y.NumMemtableGetsAdd(db.opt.MetricsEnabled, 1)
  663. if vs.Meta == 0 && vs.Value == nil {
  664. continue
  665. }
  666. // Found the required version of the key, return immediately.
  667. if vs.Version == version {
  668. y.NumGetsWithResultsAdd(db.opt.MetricsEnabled, 1)
  669. return vs, nil
  670. }
  671. if maxVs.Version < vs.Version {
  672. maxVs = vs
  673. }
  674. }
  675. return db.lc.get(key, maxVs, 0)
  676. }
  677. var requestPool = sync.Pool{
  678. New: func() interface{} {
  679. return new(request)
  680. },
  681. }
  682. func (db *DB) writeToLSM(b *request) error {
  683. // We should check the length of b.Prts and b.Entries only when badger is not
  684. // running in InMemory mode. In InMemory mode, we don't write anything to the
  685. // value log and that's why the length of b.Ptrs will always be zero.
  686. if !db.opt.InMemory && len(b.Ptrs) != len(b.Entries) {
  687. return errors.Errorf("Ptrs and Entries don't match: %+v", b)
  688. }
  689. for i, entry := range b.Entries {
  690. var err error
  691. if entry.skipVlogAndSetThreshold(db.valueThreshold()) {
  692. // Will include deletion / tombstone case.
  693. err = db.mt.Put(entry.Key,
  694. y.ValueStruct{
  695. Value: entry.Value,
  696. // Ensure value pointer flag is removed. Otherwise, the value will fail
  697. // to be retrieved during iterator prefetch. `bitValuePointer` is only
  698. // known to be set in write to LSM when the entry is loaded from a backup
  699. // with lower ValueThreshold and its value was stored in the value log.
  700. Meta: entry.meta &^ bitValuePointer,
  701. UserMeta: entry.UserMeta,
  702. ExpiresAt: entry.ExpiresAt,
  703. })
  704. } else {
  705. // Write pointer to Memtable.
  706. err = db.mt.Put(entry.Key,
  707. y.ValueStruct{
  708. Value: b.Ptrs[i].Encode(),
  709. Meta: entry.meta | bitValuePointer,
  710. UserMeta: entry.UserMeta,
  711. ExpiresAt: entry.ExpiresAt,
  712. })
  713. }
  714. if err != nil {
  715. return y.Wrapf(err, "while writing to memTable")
  716. }
  717. }
  718. if db.opt.SyncWrites {
  719. return db.mt.SyncWAL()
  720. }
  721. return nil
  722. }
  723. // writeRequests is called serially by only one goroutine.
  724. func (db *DB) writeRequests(reqs []*request) error {
  725. if len(reqs) == 0 {
  726. return nil
  727. }
  728. done := func(err error) {
  729. for _, r := range reqs {
  730. r.Err = err
  731. r.Wg.Done()
  732. }
  733. }
  734. db.opt.Debugf("writeRequests called. Writing to value log")
  735. err := db.vlog.write(reqs)
  736. if err != nil {
  737. done(err)
  738. return err
  739. }
  740. db.opt.Debugf("Writing to memtable")
  741. var count int
  742. for _, b := range reqs {
  743. if len(b.Entries) == 0 {
  744. continue
  745. }
  746. count += len(b.Entries)
  747. var i uint64
  748. var err error
  749. for err = db.ensureRoomForWrite(); err == errNoRoom; err = db.ensureRoomForWrite() {
  750. i++
  751. if i%100 == 0 {
  752. db.opt.Debugf("Making room for writes")
  753. }
  754. // We need to poll a bit because both hasRoomForWrite and the flusher need access to s.imm.
  755. // When flushChan is full and you are blocked there, and the flusher is trying to update s.imm,
  756. // you will get a deadlock.
  757. time.Sleep(10 * time.Millisecond)
  758. }
  759. if err != nil {
  760. done(err)
  761. return y.Wrap(err, "writeRequests")
  762. }
  763. if err := db.writeToLSM(b); err != nil {
  764. done(err)
  765. return y.Wrap(err, "writeRequests")
  766. }
  767. }
  768. db.opt.Debugf("Sending updates to subscribers")
  769. db.pub.sendUpdates(reqs)
  770. done(nil)
  771. db.opt.Debugf("%d entries written", count)
  772. return nil
  773. }
  774. func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) {
  775. if db.blockWrites.Load() == 1 {
  776. return nil, ErrBlockedWrites
  777. }
  778. var count, size int64
  779. for _, e := range entries {
  780. size += e.estimateSizeAndSetThreshold(db.valueThreshold())
  781. count++
  782. }
  783. y.NumBytesWrittenUserAdd(db.opt.MetricsEnabled, size)
  784. if count >= db.opt.maxBatchCount || size >= db.opt.maxBatchSize {
  785. return nil, ErrTxnTooBig
  786. }
  787. // We can only service one request because we need each txn to be stored in a contiguous section.
  788. // Txns should not interleave among other txns or rewrites.
  789. req := requestPool.Get().(*request)
  790. req.reset()
  791. req.Entries = entries
  792. req.Wg.Add(1)
  793. req.IncrRef() // for db write
  794. db.writeCh <- req // Handled in doWrites.
  795. y.NumPutsAdd(db.opt.MetricsEnabled, int64(len(entries)))
  796. return req, nil
  797. }
// doWrites is the consumer of db.writeCh. It accumulates incoming requests into
// a batch and hands each batch to writeRequests on its own goroutine.
//
// pendingCh (capacity 1) acts as a one-slot semaphore: a token is pushed before
// every batch write and removed when that write finishes. So at most one batch
// write is in flight at a time, and writeRequests never runs concurrently with
// itself. While a write is in flight, new requests keep accumulating.
//
// Once lc is closed, it drains whatever is still queued in writeCh, writes it
// (after any in-flight write finishes), and returns. lc.Done is called on exit.
func (db *DB) doWrites(lc *z.Closer) {
	defer lc.Done()
	pendingCh := make(chan struct{}, 1)
	// writeRequests writes one batch, logs any error (the per-request error is
	// reported to callers by db.writeRequests itself), then releases the token.
	writeRequests := func(reqs []*request) {
		if err := db.writeRequests(reqs); err != nil {
			db.opt.Errorf("writeRequests: %v", err)
		}
		<-pendingCh
	}
	// This variable tracks the number of pending writes.
	reqLen := new(expvar.Int)
	y.PendingWritesSet(db.opt.MetricsEnabled, db.opt.Dir, reqLen)
	reqs := make([]*request, 0, 10)
	for {
		var r *request
		// Wait for the first request of a new batch (or for shutdown).
		select {
		case r = <-db.writeCh:
		case <-lc.HasBeenClosed():
			goto closedCase
		}
		for {
			reqs = append(reqs, r)
			reqLen.Set(int64(len(reqs)))
			// Cap the batch: once it is this large, block until the in-flight write
			// releases the token instead of accumulating more requests.
			if len(reqs) >= 3*kvWriteChCapacity {
				pendingCh <- struct{}{} // blocking.
				goto writeCase
			}
			select {
			// Either push to pending, or continue to pick from writeCh.
			case r = <-db.writeCh:
			case pendingCh <- struct{}{}:
				goto writeCase
			case <-lc.HasBeenClosed():
				goto closedCase
			}
		}
	closedCase:
		// Drain all the requests still queued, then write them synchronously.
		// Don't close the writeCh, because it is used in several places.
		for {
			select {
			case r = <-db.writeCh:
				reqs = append(reqs, r)
			default:
				pendingCh <- struct{}{} // Push to pending before doing a write.
				writeRequests(reqs)
				return
			}
		}
	writeCase:
		// We hold the token: write this batch in the background and start a new one.
		go writeRequests(reqs)
		reqs = make([]*request, 0, 10)
		reqLen.Set(0)
	}
}
  853. // batchSet applies a list of badger.Entry. If a request level error occurs it
  854. // will be returned.
  855. //
  856. // Check(kv.BatchSet(entries))
  857. func (db *DB) batchSet(entries []*Entry) error {
  858. req, err := db.sendToWriteCh(entries)
  859. if err != nil {
  860. return err
  861. }
  862. return req.Wait()
  863. }
  864. // batchSetAsync is the asynchronous version of batchSet. It accepts a callback
  865. // function which is called when all the sets are complete. If a request level
  866. // error occurs, it will be passed back via the callback.
  867. //
  868. // err := kv.BatchSetAsync(entries, func(err error)) {
  869. // Check(err)
  870. // }
  871. func (db *DB) batchSetAsync(entries []*Entry, f func(error)) error {
  872. req, err := db.sendToWriteCh(entries)
  873. if err != nil {
  874. return err
  875. }
  876. go func() {
  877. err := req.Wait()
  878. // Write is complete. Let's call the callback function now.
  879. f(err)
  880. }()
  881. return nil
  882. }
  883. var errNoRoom = stderrors.New("No room for write")
  884. // ensureRoomForWrite is always called serially.
  885. func (db *DB) ensureRoomForWrite() error {
  886. var err error
  887. db.lock.Lock()
  888. defer db.lock.Unlock()
  889. y.AssertTrue(db.mt != nil) // A nil mt indicates that DB is being closed.
  890. if !db.mt.isFull() {
  891. return nil
  892. }
  893. select {
  894. case db.flushChan <- db.mt:
  895. db.opt.Debugf("Flushing memtable, mt.size=%d size of flushChan: %d\n",
  896. db.mt.sl.MemSize(), len(db.flushChan))
  897. // We manage to push this task. Let's modify imm.
  898. db.imm = append(db.imm, db.mt)
  899. db.mt, err = db.newMemTable()
  900. if err != nil {
  901. return y.Wrapf(err, "cannot create new mem table")
  902. }
  903. // New memtable is empty. We certainly have room.
  904. return nil
  905. default:
  906. // We need to do this to unlock and allow the flusher to modify imm.
  907. return errNoRoom
  908. }
  909. }
  910. func arenaSize(opt Options) int64 {
  911. return opt.MemTableSize + opt.maxBatchSize + opt.maxBatchCount*int64(skl.MaxNodeSize)
  912. }
  913. // buildL0Table builds a new table from the memtable.
  914. func buildL0Table(iter y.Iterator, dropPrefixes [][]byte, bopts table.Options) *table.Builder {
  915. defer iter.Close()
  916. b := table.NewTableBuilder(bopts)
  917. for iter.Rewind(); iter.Valid(); iter.Next() {
  918. if len(dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), dropPrefixes) {
  919. continue
  920. }
  921. vs := iter.Value()
  922. var vp valuePointer
  923. if vs.Meta&bitValuePointer > 0 {
  924. vp.Decode(vs.Value)
  925. }
  926. b.Add(iter.Key(), iter.Value(), vp.Len)
  927. }
  928. return b
  929. }
  930. // handleMemTableFlush must be run serially.
  931. func (db *DB) handleMemTableFlush(mt *memTable, dropPrefixes [][]byte) error {
  932. bopts := buildTableOptions(db)
  933. itr := mt.sl.NewUniIterator(false)
  934. builder := buildL0Table(itr, nil, bopts)
  935. defer builder.Close()
  936. // buildL0Table can return nil if the none of the items in the skiplist are
  937. // added to the builder. This can happen when drop prefix is set and all
  938. // the items are skipped.
  939. if builder.Empty() {
  940. builder.Finish()
  941. return nil
  942. }
  943. fileID := db.lc.reserveFileID()
  944. var tbl *table.Table
  945. var err error
  946. if db.opt.InMemory {
  947. data := builder.Finish()
  948. tbl, err = table.OpenInMemoryTable(data, fileID, &bopts)
  949. } else {
  950. tbl, err = table.CreateTable(table.NewFilename(fileID, db.opt.Dir), builder)
  951. }
  952. if err != nil {
  953. return y.Wrap(err, "error while creating table")
  954. }
  955. // We own a ref on tbl.
  956. err = db.lc.addLevel0Table(tbl) // This will incrRef
  957. _ = tbl.DecrRef() // Releases our ref.
  958. return err
  959. }
  960. // flushMemtable must keep running until we send it an empty memtable. If there
  961. // are errors during handling the memtable flush, we'll retry indefinitely.
  962. func (db *DB) flushMemtable(lc *z.Closer) {
  963. defer lc.Done()
  964. for mt := range db.flushChan {
  965. if mt == nil {
  966. continue
  967. }
  968. for {
  969. if err := db.handleMemTableFlush(mt, nil); err != nil {
  970. // Encountered error. Retry indefinitely.
  971. db.opt.Errorf("error flushing memtable to disk: %v, retrying", err)
  972. time.Sleep(time.Second)
  973. continue
  974. }
  975. // Update s.imm. Need a lock.
  976. db.lock.Lock()
  977. // This is a single-threaded operation. mt corresponds to the head of
  978. // db.imm list. Once we flush it, we advance db.imm. The next mt
  979. // which would arrive here would match db.imm[0], because we acquire a
  980. // lock over DB when pushing to flushChan.
  981. // TODO: This logic is dirty AF. Any change and this could easily break.
  982. y.AssertTrue(mt == db.imm[0])
  983. db.imm = db.imm[1:]
  984. mt.DecrRef() // Return memory.
  985. // unlock
  986. db.lock.Unlock()
  987. break
  988. }
  989. }
  990. }
  991. func exists(path string) (bool, error) {
  992. _, err := os.Stat(path)
  993. if err == nil {
  994. return true, nil
  995. }
  996. if os.IsNotExist(err) {
  997. return false, nil
  998. }
  999. return true, err
  1000. }
  1001. // This function does a filewalk, calculates the size of vlog and sst files and stores it in
  1002. // y.LSMSize and y.VlogSize.
  1003. func (db *DB) calculateSize() {
  1004. if db.opt.InMemory {
  1005. return
  1006. }
  1007. newInt := func(val int64) *expvar.Int {
  1008. v := new(expvar.Int)
  1009. v.Add(val)
  1010. return v
  1011. }
  1012. totalSize := func(dir string) (int64, int64) {
  1013. var lsmSize, vlogSize int64
  1014. err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
  1015. if err != nil {
  1016. return err
  1017. }
  1018. ext := filepath.Ext(path)
  1019. switch ext {
  1020. case ".sst":
  1021. lsmSize += info.Size()
  1022. case ".vlog":
  1023. vlogSize += info.Size()
  1024. }
  1025. return nil
  1026. })
  1027. if err != nil {
  1028. db.opt.Debugf("Got error while calculating total size of directory: %s", dir)
  1029. }
  1030. return lsmSize, vlogSize
  1031. }
  1032. lsmSize, vlogSize := totalSize(db.opt.Dir)
  1033. y.LSMSizeSet(db.opt.MetricsEnabled, db.opt.Dir, newInt(lsmSize))
  1034. // If valueDir is different from dir, we'd have to do another walk.
  1035. if db.opt.ValueDir != db.opt.Dir {
  1036. _, vlogSize = totalSize(db.opt.ValueDir)
  1037. }
  1038. y.VlogSizeSet(db.opt.MetricsEnabled, db.opt.ValueDir, newInt(vlogSize))
  1039. }
  1040. func (db *DB) updateSize(lc *z.Closer) {
  1041. defer lc.Done()
  1042. if db.opt.InMemory {
  1043. return
  1044. }
  1045. metricsTicker := time.NewTicker(time.Minute)
  1046. defer metricsTicker.Stop()
  1047. for {
  1048. select {
  1049. case <-metricsTicker.C:
  1050. db.calculateSize()
  1051. case <-lc.HasBeenClosed():
  1052. return
  1053. }
  1054. }
  1055. }
  1056. // RunValueLogGC triggers a value log garbage collection.
  1057. //
  1058. // It picks value log files to perform GC based on statistics that are collected
  1059. // during compactions. If no such statistics are available, then log files are
  1060. // picked in random order. The process stops as soon as the first log file is
  1061. // encountered which does not result in garbage collection.
  1062. //
  1063. // When a log file is picked, it is first sampled. If the sample shows that we
  1064. // can discard at least discardRatio space of that file, it would be rewritten.
  1065. //
  1066. // If a call to RunValueLogGC results in no rewrites, then an ErrNoRewrite is
  1067. // thrown indicating that the call resulted in no file rewrites.
  1068. //
  1069. // We recommend setting discardRatio to 0.5, thus indicating that a file be
  1070. // rewritten if half the space can be discarded. This results in a lifetime
  1071. // value log write amplification of 2 (1 from original write + 0.5 rewrite +
  1072. // 0.25 + 0.125 + ... = 2). Setting it to higher value would result in fewer
  1073. // space reclaims, while setting it to a lower value would result in more space
  1074. // reclaims at the cost of increased activity on the LSM tree. discardRatio
  1075. // must be in the range (0.0, 1.0), both endpoints excluded, otherwise an
  1076. // ErrInvalidRequest is returned.
  1077. //
  1078. // Only one GC is allowed at a time. If another value log GC is running, or DB
  1079. // has been closed, this would return an ErrRejected.
  1080. //
  1081. // Note: Every time GC is run, it would produce a spike of activity on the LSM
  1082. // tree.
  1083. func (db *DB) RunValueLogGC(discardRatio float64) error {
  1084. if db.opt.InMemory {
  1085. return ErrGCInMemoryMode
  1086. }
  1087. if discardRatio >= 1.0 || discardRatio <= 0.0 {
  1088. return ErrInvalidRequest
  1089. }
  1090. // Pick a log file and run GC
  1091. return db.vlog.runGC(discardRatio)
  1092. }
  1093. // Size returns the size of lsm and value log files in bytes. It can be used to decide how often to
  1094. // call RunValueLogGC.
  1095. func (db *DB) Size() (lsm, vlog int64) {
  1096. if y.LSMSizeGet(db.opt.MetricsEnabled, db.opt.Dir) == nil {
  1097. lsm, vlog = 0, 0
  1098. return
  1099. }
  1100. lsm = y.LSMSizeGet(db.opt.MetricsEnabled, db.opt.Dir).(*expvar.Int).Value()
  1101. vlog = y.VlogSizeGet(db.opt.MetricsEnabled, db.opt.ValueDir).(*expvar.Int).Value()
  1102. return
  1103. }
  1104. // Sequence represents a Badger sequence.
  1105. type Sequence struct {
  1106. lock sync.Mutex
  1107. db *DB
  1108. key []byte
  1109. next uint64
  1110. leased uint64
  1111. bandwidth uint64
  1112. }
  1113. // Next would return the next integer in the sequence, updating the lease by running a transaction
  1114. // if needed.
  1115. func (seq *Sequence) Next() (uint64, error) {
  1116. seq.lock.Lock()
  1117. defer seq.lock.Unlock()
  1118. if seq.next >= seq.leased {
  1119. if err := seq.updateLease(); err != nil {
  1120. return 0, err
  1121. }
  1122. }
  1123. val := seq.next
  1124. seq.next++
  1125. return val, nil
  1126. }
  1127. // Release the leased sequence to avoid wasted integers. This should be done right
  1128. // before closing the associated DB. However it is valid to use the sequence after
  1129. // it was released, causing a new lease with full bandwidth.
  1130. func (seq *Sequence) Release() error {
  1131. seq.lock.Lock()
  1132. defer seq.lock.Unlock()
  1133. err := seq.db.Update(func(txn *Txn) error {
  1134. item, err := txn.Get(seq.key)
  1135. if err != nil {
  1136. return err
  1137. }
  1138. var num uint64
  1139. if err := item.Value(func(v []byte) error {
  1140. num = binary.BigEndian.Uint64(v)
  1141. return nil
  1142. }); err != nil {
  1143. return err
  1144. }
  1145. if num == seq.leased {
  1146. var buf [8]byte
  1147. binary.BigEndian.PutUint64(buf[:], seq.next)
  1148. return txn.SetEntry(NewEntry(seq.key, buf[:]))
  1149. }
  1150. return nil
  1151. })
  1152. if err != nil {
  1153. return err
  1154. }
  1155. seq.leased = seq.next
  1156. return nil
  1157. }
  1158. func (seq *Sequence) updateLease() error {
  1159. return seq.db.Update(func(txn *Txn) error {
  1160. item, err := txn.Get(seq.key)
  1161. switch {
  1162. case err == ErrKeyNotFound:
  1163. seq.next = 0
  1164. case err != nil:
  1165. return err
  1166. default:
  1167. var num uint64
  1168. if err := item.Value(func(v []byte) error {
  1169. num = binary.BigEndian.Uint64(v)
  1170. return nil
  1171. }); err != nil {
  1172. return err
  1173. }
  1174. seq.next = num
  1175. }
  1176. lease := seq.next + seq.bandwidth
  1177. var buf [8]byte
  1178. binary.BigEndian.PutUint64(buf[:], lease)
  1179. if err = txn.SetEntry(NewEntry(seq.key, buf[:])); err != nil {
  1180. return err
  1181. }
  1182. seq.leased = lease
  1183. return nil
  1184. })
  1185. }
  1186. // GetSequence would initiate a new sequence object, generating it from the stored lease, if
  1187. // available, in the database. Sequence can be used to get a list of monotonically increasing
  1188. // integers. Multiple sequences can be created by providing different keys. Bandwidth sets the
  1189. // size of the lease, determining how many Next() requests can be served from memory.
  1190. //
  1191. // GetSequence is not supported on ManagedDB. Calling this would result in a panic.
  1192. func (db *DB) GetSequence(key []byte, bandwidth uint64) (*Sequence, error) {
  1193. if db.opt.managedTxns {
  1194. panic("Cannot use GetSequence with managedDB=true.")
  1195. }
  1196. switch {
  1197. case len(key) == 0:
  1198. return nil, ErrEmptyKey
  1199. case bandwidth == 0:
  1200. return nil, ErrZeroBandwidth
  1201. }
  1202. seq := &Sequence{
  1203. db: db,
  1204. key: key,
  1205. next: 0,
  1206. leased: 0,
  1207. bandwidth: bandwidth,
  1208. }
  1209. err := seq.updateLease()
  1210. return seq, err
  1211. }
  1212. // Tables gets the TableInfo objects from the level controller. If withKeysCount
  1213. // is true, TableInfo objects also contain counts of keys for the tables.
  1214. func (db *DB) Tables() []TableInfo {
  1215. return db.lc.getTableInfo()
  1216. }
  1217. // Levels gets the LevelInfo.
  1218. func (db *DB) Levels() []LevelInfo {
  1219. return db.lc.getLevelInfo()
  1220. }
  1221. // EstimateSize can be used to get rough estimate of data size for a given prefix.
  1222. func (db *DB) EstimateSize(prefix []byte) (uint64, uint64) {
  1223. var onDiskSize, uncompressedSize uint64
  1224. tables := db.Tables()
  1225. for _, ti := range tables {
  1226. if bytes.HasPrefix(ti.Left, prefix) && bytes.HasPrefix(ti.Right, prefix) {
  1227. onDiskSize += uint64(ti.OnDiskSize)
  1228. uncompressedSize += uint64(ti.UncompressedSize)
  1229. }
  1230. }
  1231. return onDiskSize, uncompressedSize
  1232. }
  1233. // Ranges can be used to get rough key ranges to divide up iteration over the DB. The ranges here
  1234. // would consider the prefix, but would not necessarily start or end with the prefix. In fact, the
  1235. // first range would have nil as left key, and the last range would have nil as the right key.
  1236. func (db *DB) Ranges(prefix []byte, numRanges int) []*keyRange {
  1237. var splits []string
  1238. tables := db.Tables()
  1239. // We just want table ranges here and not keys count.
  1240. for _, ti := range tables {
  1241. // We don't use ti.Left, because that has a tendency to store !badger keys. Skip over tables
  1242. // at upper levels. Only choose tables from the last level.
  1243. if ti.Level != db.opt.MaxLevels-1 {
  1244. continue
  1245. }
  1246. if bytes.HasPrefix(ti.Right, prefix) {
  1247. splits = append(splits, string(ti.Right))
  1248. }
  1249. }
  1250. // If the number of splits is low, look at the offsets inside the
  1251. // tables to generate more splits.
  1252. if len(splits) < 32 {
  1253. numTables := len(tables)
  1254. if numTables == 0 {
  1255. numTables = 1
  1256. }
  1257. numPerTable := 32 / numTables
  1258. if numPerTable == 0 {
  1259. numPerTable = 1
  1260. }
  1261. splits = db.lc.keySplits(numPerTable, prefix)
  1262. }
  1263. // If the number of splits is still < 32, then look at the memtables.
  1264. if len(splits) < 32 {
  1265. maxPerSplit := 10000
  1266. mtSplits := func(mt *memTable) {
  1267. if mt == nil {
  1268. return
  1269. }
  1270. count := 0
  1271. iter := mt.sl.NewIterator()
  1272. for iter.SeekToFirst(); iter.Valid(); iter.Next() {
  1273. if count%maxPerSplit == 0 {
  1274. // Add a split every maxPerSplit keys.
  1275. if bytes.HasPrefix(iter.Key(), prefix) {
  1276. splits = append(splits, string(iter.Key()))
  1277. }
  1278. }
  1279. count += 1
  1280. }
  1281. _ = iter.Close()
  1282. }
  1283. db.lock.Lock()
  1284. defer db.lock.Unlock()
  1285. var memTables []*memTable
  1286. memTables = append(memTables, db.imm...)
  1287. for _, mt := range memTables {
  1288. mtSplits(mt)
  1289. }
  1290. mtSplits(db.mt)
  1291. }
  1292. // We have our splits now. Let's convert them to ranges.
  1293. sort.Strings(splits)
  1294. var ranges []*keyRange
  1295. var start []byte
  1296. for _, key := range splits {
  1297. ranges = append(ranges, &keyRange{left: start, right: y.SafeCopy(nil, []byte(key))})
  1298. start = y.SafeCopy(nil, []byte(key))
  1299. }
  1300. ranges = append(ranges, &keyRange{left: start})
  1301. // Figure out the approximate table size this range has to deal with.
  1302. for _, t := range tables {
  1303. tr := keyRange{left: t.Left, right: t.Right}
  1304. for _, r := range ranges {
  1305. if len(r.left) == 0 || len(r.right) == 0 {
  1306. continue
  1307. }
  1308. if r.overlapsWith(tr) {
  1309. r.size += int64(t.UncompressedSize)
  1310. }
  1311. }
  1312. }
  1313. var total int64
  1314. for _, r := range ranges {
  1315. total += r.size
  1316. }
  1317. if total == 0 {
  1318. return ranges
  1319. }
  1320. // Figure out the average size, so we know how to bin the ranges together.
  1321. avg := total / int64(numRanges)
  1322. var out []*keyRange
  1323. var i int
  1324. for i < len(ranges) {
  1325. r := ranges[i]
  1326. cur := &keyRange{left: r.left, size: r.size, right: r.right}
  1327. i++
  1328. for ; i < len(ranges); i++ {
  1329. next := ranges[i]
  1330. if cur.size+next.size > avg {
  1331. break
  1332. }
  1333. cur.right = next.right
  1334. cur.size += next.size
  1335. }
  1336. out = append(out, cur)
  1337. }
  1338. return out
  1339. }
  1340. // MaxBatchCount returns max possible entries in batch
  1341. func (db *DB) MaxBatchCount() int64 {
  1342. return db.opt.maxBatchCount
  1343. }
  1344. // MaxBatchSize returns max possible batch size
  1345. func (db *DB) MaxBatchSize() int64 {
  1346. return db.opt.maxBatchSize
  1347. }
  1348. func (db *DB) stopMemoryFlush() {
  1349. // Stop memtable flushes.
  1350. if db.closers.memtable != nil {
  1351. close(db.flushChan)
  1352. db.closers.memtable.SignalAndWait()
  1353. }
  1354. }
  1355. func (db *DB) stopCompactions() {
  1356. // Stop compactions.
  1357. if db.closers.compactors != nil {
  1358. db.closers.compactors.SignalAndWait()
  1359. }
  1360. }
  1361. func (db *DB) startCompactions() {
  1362. // Resume compactions.
  1363. if db.closers.compactors != nil {
  1364. db.closers.compactors = z.NewCloser(1)
  1365. db.lc.startCompact(db.closers.compactors)
  1366. }
  1367. }
  1368. func (db *DB) startMemoryFlush() {
  1369. // Start memory fluhser.
  1370. if db.closers.memtable != nil {
  1371. db.flushChan = make(chan *memTable, db.opt.NumMemtables)
  1372. db.closers.memtable = z.NewCloser(1)
  1373. go func() {
  1374. db.flushMemtable(db.closers.memtable)
  1375. }()
  1376. }
  1377. }
  1378. // Flatten can be used to force compactions on the LSM tree so all the tables fall on the same
  1379. // level. This ensures that all the versions of keys are colocated and not split across multiple
  1380. // levels, which is necessary after a restore from backup. During Flatten, live compactions are
  1381. // stopped. Ideally, no writes are going on during Flatten. Otherwise, it would create competition
  1382. // between flattening the tree and new tables being created at level zero.
  1383. func (db *DB) Flatten(workers int) error {
  1384. db.stopCompactions()
  1385. defer db.startCompactions()
  1386. compactAway := func(cp compactionPriority) error {
  1387. db.opt.Infof("Attempting to compact with %+v\n", cp)
  1388. errCh := make(chan error, 1)
  1389. for i := 0; i < workers; i++ {
  1390. go func() {
  1391. errCh <- db.lc.doCompact(175, cp)
  1392. }()
  1393. }
  1394. var success int
  1395. var rerr error
  1396. for i := 0; i < workers; i++ {
  1397. err := <-errCh
  1398. if err != nil {
  1399. rerr = err
  1400. db.opt.Warningf("While running doCompact with %+v. Error: %v\n", cp, err)
  1401. } else {
  1402. success++
  1403. }
  1404. }
  1405. if success == 0 {
  1406. return rerr
  1407. }
  1408. // We could do at least one successful compaction. So, we'll consider this a success.
  1409. db.opt.Infof("%d compactor(s) succeeded. One or more tables from level %d compacted.\n",
  1410. success, cp.level)
  1411. return nil
  1412. }
  1413. hbytes := func(sz int64) string {
  1414. return humanize.IBytes(uint64(sz))
  1415. }
  1416. t := db.lc.levelTargets()
  1417. for {
  1418. db.opt.Infof("\n")
  1419. var levels []int
  1420. for i, l := range db.lc.levels {
  1421. sz := l.getTotalSize()
  1422. db.opt.Infof("Level: %d. %8s Size. %8s Max.\n",
  1423. i, hbytes(l.getTotalSize()), hbytes(t.targetSz[i]))
  1424. if sz > 0 {
  1425. levels = append(levels, i)
  1426. }
  1427. }
  1428. if len(levels) <= 1 {
  1429. prios := db.lc.pickCompactLevels(nil)
  1430. if len(prios) == 0 || prios[0].score <= 1.0 {
  1431. db.opt.Infof("All tables consolidated into one level. Flattening done.\n")
  1432. return nil
  1433. }
  1434. if err := compactAway(prios[0]); err != nil {
  1435. return err
  1436. }
  1437. continue
  1438. }
  1439. // Create an artificial compaction priority, to ensure that we compact the level.
  1440. cp := compactionPriority{level: levels[0], score: 1.71}
  1441. if err := compactAway(cp); err != nil {
  1442. return err
  1443. }
  1444. }
  1445. }
  1446. func (db *DB) blockWrite() error {
  1447. // Stop accepting new writes.
  1448. if !db.blockWrites.CompareAndSwap(0, 1) {
  1449. return ErrBlockedWrites
  1450. }
  1451. // Make all pending writes finish. The following will also close writeCh.
  1452. db.closers.writes.SignalAndWait()
  1453. db.opt.Infof("Writes flushed. Stopping compactions now...")
  1454. return nil
  1455. }
  1456. func (db *DB) unblockWrite() {
  1457. db.closers.writes = z.NewCloser(1)
  1458. go db.doWrites(db.closers.writes)
  1459. // Resume writes.
  1460. db.blockWrites.Store(0)
  1461. }
  1462. func (db *DB) prepareToDrop() (func(), error) {
  1463. if db.opt.ReadOnly {
  1464. panic("Attempting to drop data in read-only mode.")
  1465. }
  1466. // In order prepare for drop, we need to block the incoming writes and
  1467. // write it to db. Then, flush all the pending memtable. So that, we
  1468. // don't miss any entries.
  1469. if err := db.blockWrite(); err != nil {
  1470. return func() {}, err
  1471. }
  1472. reqs := make([]*request, 0, 10)
  1473. for {
  1474. select {
  1475. case r := <-db.writeCh:
  1476. reqs = append(reqs, r)
  1477. default:
  1478. if err := db.writeRequests(reqs); err != nil {
  1479. db.opt.Errorf("writeRequests: %v", err)
  1480. }
  1481. db.stopMemoryFlush()
  1482. return func() {
  1483. db.opt.Infof("Resuming writes")
  1484. db.startMemoryFlush()
  1485. db.unblockWrite()
  1486. }, nil
  1487. }
  1488. }
  1489. }
  1490. // DropAll would drop all the data stored in Badger. It does this in the following way.
  1491. // - Stop accepting new writes.
  1492. // - Pause memtable flushes and compactions.
  1493. // - Pick all tables from all levels, create a changeset to delete all these
  1494. // tables and apply it to manifest.
  1495. // - Pick all log files from value log, and delete all of them. Restart value log files from zero.
  1496. // - Resume memtable flushes and compactions.
  1497. //
  1498. // NOTE: DropAll is resilient to concurrent writes, but not to reads. It is up to the user to not do
  1499. // any reads while DropAll is going on, otherwise they may result in panics. Ideally, both reads and
  1500. // writes are paused before running DropAll, and resumed after it is finished.
  1501. func (db *DB) DropAll() error {
  1502. f, err := db.dropAll()
  1503. if f != nil {
  1504. f()
  1505. }
  1506. return err
  1507. }
// dropAll does the work for DropAll. It returns a resume function that the
// caller runs afterwards, even when an error is returned.
//
// Steps:
//   - block writes, write out queued requests and stop memtable flushes (prepareToDrop);
//   - stop compactions;
//   - under db.lock, release every memtable and install a fresh mutable one;
//   - delete all tables (lc.dropTree) and value log files (vlog.dropAll);
//   - reset the next file ID and clear the block cache, index cache and threshold.
//
// The returned function restarts compactions and then memtable flushes and
// writes. If prepareToDrop itself fails, its (no-op) function is returned
// unchanged. db.lock is held until dropAll returns.
func (db *DB) dropAll() (func(), error) {
	db.opt.Infof("DropAll called. Blocking writes...")
	f, err := db.prepareToDrop()
	if err != nil {
		return f, err
	}
	// prepareToDrop has blocked incoming writes, written the queued requests and
	// stopped the memtable flusher. Before we drop, we'll stop the compaction
	// because all the data is going to be deleted anyway.
	db.stopCompactions()
	resume := func() {
		db.startCompactions()
		f()
	}
	// Block all foreign interactions with memory tables.
	db.lock.Lock()
	defer db.lock.Unlock()
	// Remove in-memory tables. Calling DecrRef for safety. Not sure if they're absolutely needed.
	db.mt.DecrRef()
	for _, mt := range db.imm {
		mt.DecrRef()
	}
	db.imm = db.imm[:0]
	// NOTE(review): on error db.mt is left with whatever newMemTable returned
	// (possibly nil) — confirm callers do not keep writing after this failure.
	db.mt, err = db.newMemTable() // Set it up for future writes.
	if err != nil {
		return resume, y.Wrapf(err, "cannot open new memtable")
	}
	num, err := db.lc.dropTree()
	if err != nil {
		return resume, err
	}
	db.opt.Infof("Deleted %d SSTables. Now deleting value logs...\n", num)
	num, err = db.vlog.dropAll()
	if err != nil {
		return resume, err
	}
	// Restart table file numbering now that every table is gone.
	db.lc.nextFileID.Store(1)
	db.opt.Infof("Deleted %d value log files. DropAll done.\n", num)
	db.blockCache.Clear()
	db.indexCache.Clear()
	db.threshold.Clear(db.opt)
	return resume, nil
}
  1551. // DropPrefix would drop all the keys with the provided prefix. It does this in the following way:
  1552. // - Stop accepting new writes.
  1553. // - Stop memtable flushes before acquiring lock. Because we're acquiring lock here
  1554. // and memtable flush stalls for lock, which leads to deadlock
  1555. // - Flush out all memtables, skipping over keys with the given prefix, Kp.
  1556. // - Write out the value log header to memtables when flushing, so we don't accidentally bring Kp
  1557. // back after a restart.
  1558. // - Stop compaction.
  1559. // - Compact L0->L1, skipping over Kp.
  1560. // - Compact rest of the levels, Li->Li, picking tables which have Kp.
  1561. // - Resume memtable flushes, compactions and writes.
  1562. func (db *DB) DropPrefix(prefixes ...[]byte) error {
  1563. if len(prefixes) == 0 {
  1564. return nil
  1565. }
  1566. db.opt.Infof("DropPrefix called for %s", prefixes)
  1567. f, err := db.prepareToDrop()
  1568. if err != nil {
  1569. return err
  1570. }
  1571. defer f()
  1572. var filtered [][]byte
  1573. if filtered, err = db.filterPrefixesToDrop(prefixes); err != nil {
  1574. return err
  1575. }
  1576. // If there is no prefix for which the data already exist, do not do anything.
  1577. if len(filtered) == 0 {
  1578. db.opt.Infof("No prefixes to drop")
  1579. return nil
  1580. }
  1581. // Block all foreign interactions with memory tables.
  1582. db.lock.Lock()
  1583. defer db.lock.Unlock()
  1584. db.imm = append(db.imm, db.mt)
  1585. for _, memtable := range db.imm {
  1586. if memtable.sl.Empty() {
  1587. memtable.DecrRef()
  1588. continue
  1589. }
  1590. db.opt.Debugf("Flushing memtable")
  1591. if err := db.handleMemTableFlush(memtable, filtered); err != nil {
  1592. db.opt.Errorf("While trying to flush memtable: %v", err)
  1593. return err
  1594. }
  1595. memtable.DecrRef()
  1596. }
  1597. db.stopCompactions()
  1598. defer db.startCompactions()
  1599. db.imm = db.imm[:0]
  1600. db.mt, err = db.newMemTable()
  1601. if err != nil {
  1602. return y.Wrapf(err, "cannot create new mem table")
  1603. }
  1604. // Drop prefixes from the levels.
  1605. if err := db.lc.dropPrefixes(filtered); err != nil {
  1606. return err
  1607. }
  1608. db.opt.Infof("DropPrefix done")
  1609. return nil
  1610. }
  1611. func (db *DB) filterPrefixesToDrop(prefixes [][]byte) ([][]byte, error) {
  1612. var filtered [][]byte
  1613. for _, prefix := range prefixes {
  1614. err := db.View(func(txn *Txn) error {
  1615. iopts := DefaultIteratorOptions
  1616. iopts.Prefix = prefix
  1617. iopts.PrefetchValues = false
  1618. itr := txn.NewIterator(iopts)
  1619. defer itr.Close()
  1620. itr.Rewind()
  1621. if itr.ValidForPrefix(prefix) {
  1622. filtered = append(filtered, prefix)
  1623. }
  1624. return nil
  1625. })
  1626. if err != nil {
  1627. return filtered, err
  1628. }
  1629. }
  1630. return filtered, nil
  1631. }
  1632. // Checks if the key is banned. Returns the respective error if the key belongs to any of the banned
  1633. // namepspaces. Else it returns nil.
  1634. func (db *DB) isBanned(key []byte) error {
  1635. if db.opt.NamespaceOffset < 0 {
  1636. return nil
  1637. }
  1638. if len(key) <= db.opt.NamespaceOffset+8 {
  1639. return nil
  1640. }
  1641. if db.bannedNamespaces.has(y.BytesToU64(key[db.opt.NamespaceOffset:])) {
  1642. return ErrBannedKey
  1643. }
  1644. return nil
  1645. }
  1646. // BanNamespace bans a namespace. Read/write to keys belonging to any of such namespace is denied.
  1647. func (db *DB) BanNamespace(ns uint64) error {
  1648. if db.opt.NamespaceOffset < 0 {
  1649. return ErrNamespaceMode
  1650. }
  1651. db.opt.Infof("Banning namespace: %d", ns)
  1652. // First set the banned namespaces in DB and then update the in-memory structure.
  1653. key := y.KeyWithTs(append(bannedNsKey, y.U64ToBytes(ns)...), 1)
  1654. entry := []*Entry{{
  1655. Key: key,
  1656. Value: nil,
  1657. }}
  1658. req, err := db.sendToWriteCh(entry)
  1659. if err != nil {
  1660. return err
  1661. }
  1662. if err := req.Wait(); err != nil {
  1663. return err
  1664. }
  1665. db.bannedNamespaces.add(ns)
  1666. return nil
  1667. }
  1668. // BannedNamespaces returns the list of prefixes banned for DB.
  1669. func (db *DB) BannedNamespaces() []uint64 {
  1670. return db.bannedNamespaces.all()
  1671. }
  1672. // KVList contains a list of key-value pairs.
  1673. type KVList = pb.KVList
  1674. // Subscribe can be used to watch key changes for the given key prefixes and the ignore string.
  1675. // At least one prefix should be passed, or an error will be returned.
  1676. // You can use an empty prefix to monitor all changes to the DB.
  1677. // Ignore string is the byte ranges for which prefix matching will be ignored.
  1678. // For example: ignore = "2-3", and prefix = "abc" will match for keys "abxxc", "abdfc" etc.
  1679. // This function blocks until the given context is done or an error occurs.
  1680. // The given function will be called with a new KVList containing the modified keys and the
  1681. // corresponding values.
  1682. func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches []pb.Match) error {
  1683. if cb == nil {
  1684. return ErrNilCallback
  1685. }
  1686. c := z.NewCloser(1)
  1687. s, err := db.pub.newSubscriber(c, matches)
  1688. if err != nil {
  1689. return y.Wrapf(err, "while creating a new subscriber")
  1690. }
  1691. slurp := func(batch *pb.KVList) error {
  1692. for {
  1693. select {
  1694. case kvs := <-s.sendCh:
  1695. batch.Kv = append(batch.Kv, kvs.Kv...)
  1696. default:
  1697. if len(batch.GetKv()) > 0 {
  1698. return cb(batch)
  1699. }
  1700. return nil
  1701. }
  1702. }
  1703. }
  1704. drain := func() {
  1705. for {
  1706. select {
  1707. case _, ok := <-s.sendCh:
  1708. if !ok {
  1709. // Channel is closed.
  1710. return
  1711. }
  1712. default:
  1713. return
  1714. }
  1715. }
  1716. }
  1717. for {
  1718. select {
  1719. case <-c.HasBeenClosed():
  1720. // No need to delete here. Closer will be called only while
  1721. // closing DB. Subscriber will be deleted by cleanSubscribers.
  1722. err := slurp(new(pb.KVList))
  1723. // Drain if any pending updates.
  1724. c.Done()
  1725. return err
  1726. case <-ctx.Done():
  1727. c.Done()
  1728. s.active.Store(0)
  1729. drain()
  1730. db.pub.deleteSubscriber(s.id)
  1731. // Delete the subscriber to avoid further updates.
  1732. return ctx.Err()
  1733. case batch := <-s.sendCh:
  1734. err := slurp(batch)
  1735. if err != nil {
  1736. c.Done()
  1737. s.active.Store(0)
  1738. drain()
  1739. // Delete the subscriber if there is an error by the callback.
  1740. db.pub.deleteSubscriber(s.id)
  1741. return err
  1742. }
  1743. }
  1744. }
  1745. }
  1746. func (db *DB) syncDir(dir string) error {
  1747. if db.opt.InMemory {
  1748. return nil
  1749. }
  1750. return syncDir(dir)
  1751. }
  1752. func createDirs(opt Options) error {
  1753. for _, path := range []string{opt.Dir, opt.ValueDir} {
  1754. dirExists, err := exists(path)
  1755. if err != nil {
  1756. return y.Wrapf(err, "Invalid Dir: %q", path)
  1757. }
  1758. if !dirExists {
  1759. if opt.ReadOnly {
  1760. return errors.Errorf("Cannot find directory %q for read-only open", path)
  1761. }
  1762. // Try to create the directory
  1763. err = os.MkdirAll(path, 0700)
  1764. if err != nil {
  1765. return y.Wrapf(err, "Error Creating Dir: %q", path)
  1766. }
  1767. }
  1768. }
  1769. return nil
  1770. }
  1771. // Stream the contents of this DB to a new DB with options outOptions that will be
  1772. // created in outDir.
  1773. func (db *DB) StreamDB(outOptions Options) error {
  1774. outDir := outOptions.Dir
  1775. // Open output DB.
  1776. outDB, err := OpenManaged(outOptions)
  1777. if err != nil {
  1778. return y.Wrapf(err, "cannot open out DB at %s", outDir)
  1779. }
  1780. defer outDB.Close()
  1781. writer := outDB.NewStreamWriter()
  1782. if err := writer.Prepare(); err != nil {
  1783. return y.Wrapf(err, "cannot create stream writer in out DB at %s", outDir)
  1784. }
  1785. // Stream contents of DB to the output DB.
  1786. stream := db.NewStreamAt(math.MaxUint64)
  1787. stream.LogPrefix = fmt.Sprintf("Streaming DB to new DB at %s", outDir)
  1788. stream.Send = func(buf *z.Buffer) error {
  1789. return writer.Write(buf)
  1790. }
  1791. if err := stream.Orchestrate(context.Background()); err != nil {
  1792. return y.Wrapf(err, "cannot stream DB to out DB at %s", outDir)
  1793. }
  1794. if err := writer.Flush(); err != nil {
  1795. return y.Wrapf(err, "cannot flush writer")
  1796. }
  1797. return nil
  1798. }
  1799. // Opts returns a copy of the DB options.
  1800. func (db *DB) Opts() Options {
  1801. return db.opt
  1802. }
  // CacheType selects one of the DB's caches in calls such as CacheMaxCost.
  type CacheType int

  // Cache selectors accepted by CacheMaxCost.
  const (
  	// BlockCache selects the block cache.
  	BlockCache CacheType = 0
  	// IndexCache selects the index cache.
  	IndexCache CacheType = 1
  )
  1808. // CacheMaxCost updates the max cost of the given cache (either block or index cache).
  1809. // The call will have an effect only if the DB was created with the cache. Otherwise it is
  1810. // a no-op. If you pass a negative value, the function will return the current value
  1811. // without updating it.
  1812. func (db *DB) CacheMaxCost(cache CacheType, maxCost int64) (int64, error) {
  1813. if db == nil {
  1814. return 0, nil
  1815. }
  1816. if maxCost < 0 {
  1817. switch cache {
  1818. case BlockCache:
  1819. return db.blockCache.MaxCost(), nil
  1820. case IndexCache:
  1821. return db.indexCache.MaxCost(), nil
  1822. default:
  1823. return 0, errors.Errorf("invalid cache type")
  1824. }
  1825. }
  1826. switch cache {
  1827. case BlockCache:
  1828. db.blockCache.UpdateMaxCost(maxCost)
  1829. return maxCost, nil
  1830. case IndexCache:
  1831. db.indexCache.UpdateMaxCost(maxCost)
  1832. return maxCost, nil
  1833. default:
  1834. return 0, errors.Errorf("invalid cache type")
  1835. }
  1836. }
  1837. func (db *DB) LevelsToString() string {
  1838. levels := db.Levels()
  1839. h := func(sz int64) string {
  1840. return humanize.IBytes(uint64(sz))
  1841. }
  1842. base := func(b bool) string {
  1843. if b {
  1844. return "B"
  1845. }
  1846. return " "
  1847. }
  1848. var b strings.Builder
  1849. b.WriteRune('\n')
  1850. for _, li := range levels {
  1851. b.WriteString(fmt.Sprintf(
  1852. "Level %d [%s]: NumTables: %02d. Size: %s of %s. Score: %.2f->%.2f"+
  1853. " StaleData: %s Target FileSize: %s\n",
  1854. li.Level, base(li.IsBaseLevel), li.NumTables,
  1855. h(li.Size), h(li.TargetSize), li.Score, li.Adjusted, h(li.StaleDatSize),
  1856. h(li.TargetFileSize)))
  1857. }
  1858. b.WriteString("Level Done\n")
  1859. return b.String()
  1860. }