db.go 56 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package badger
  6. import (
  7. "bytes"
  8. "context"
  9. "encoding/binary"
  10. "errors"
  11. "expvar"
  12. "fmt"
  13. "math"
  14. "os"
  15. "path/filepath"
  16. "sort"
  17. "strings"
  18. "sync"
  19. "sync/atomic"
  20. "time"
  21. humanize "github.com/dustin/go-humanize"
  22. "github.com/dgraph-io/badger/v4/fb"
  23. "github.com/dgraph-io/badger/v4/options"
  24. "github.com/dgraph-io/badger/v4/pb"
  25. "github.com/dgraph-io/badger/v4/skl"
  26. "github.com/dgraph-io/badger/v4/table"
  27. "github.com/dgraph-io/badger/v4/y"
  28. "github.com/dgraph-io/ristretto/v2"
  29. "github.com/dgraph-io/ristretto/v2/z"
  30. )
// Keys reserved for badger's internal bookkeeping. They all share the
// badgerPrefix so they can be filtered out of user-visible iteration.
var (
	badgerPrefix = []byte("!badger!")       // Prefix for internal keys used by badger.
	txnKey       = []byte("!badger!txn")    // For indicating end of entries in txn.
	bannedNsKey  = []byte("!badger!banned") // For storing the banned namespaces.
)
// closers bundles the z.Closer handles for every background goroutine that
// Open starts, so that Close/cleanup can signal and wait on each one.
type closers struct {
	updateSize  *z.Closer // Stops the updateSize goroutine.
	compactors  *z.Closer // Stops the level compactors (lc.startCompact).
	memtable    *z.Closer // Stops the memtable flusher (flushMemtable).
	writes      *z.Closer // Stops the write loop (doWrites).
	valueGC     *z.Closer // Stops value-log GC (vlog.waitOnGC).
	pub         *z.Closer // Stops the publisher (pub.listenForUpdates).
	cacheHealth *z.Closer // Stops the cache monitor (monitorCache).
}
  45. type lockedKeys struct {
  46. sync.RWMutex
  47. keys map[uint64]struct{}
  48. }
  49. func (lk *lockedKeys) add(key uint64) {
  50. lk.Lock()
  51. defer lk.Unlock()
  52. lk.keys[key] = struct{}{}
  53. }
  54. func (lk *lockedKeys) has(key uint64) bool {
  55. lk.RLock()
  56. defer lk.RUnlock()
  57. _, ok := lk.keys[key]
  58. return ok
  59. }
  60. func (lk *lockedKeys) all() []uint64 {
  61. lk.RLock()
  62. defer lk.RUnlock()
  63. keys := make([]uint64, 0, len(lk.keys))
  64. for key := range lk.keys {
  65. keys = append(keys, key)
  66. }
  67. return keys
  68. }
// DB provides the various functions required to interact with Badger.
// DB is thread-safe.
type DB struct {
	testOnlyDBExtensions // Extra hooks (e.g. syncChan) — presumably test-only; verify against its declaration.

	lock sync.RWMutex // Guards list of inmemory tables, not individual reads and writes.

	dirLockGuard *directoryLockGuard // Lock on opt.Dir; released in close().
	// nil if Dir and ValueDir are the same
	valueDirGuard *directoryLockGuard

	closers closers     // Handles for the background goroutines started in Open.
	mt      *memTable   // Our latest (actively written) in-memory table
	imm     []*memTable // Add here only AFTER pushing to flushChan.

	// Initialized via openMemTables.
	nextMemFid int

	opt       Options
	manifest  *manifestFile
	lc        *levelsController
	vlog      valueLog
	writeCh   chan *request  // Pending write requests, consumed by doWrites.
	flushChan chan *memTable // For flushing memtables.
	closeOnce sync.Once      // For closing DB only once.

	blockWrites atomic.Int32  // Set to 1 in close() to reject further writes.
	isClosed    atomic.Uint32 // Set to 1 in close(); read by IsClosed.

	orc              *oracle
	bannedNamespaces *lockedKeys // Banned namespace ids, loaded by initBannedNamespaces.
	threshold        *vlogThreshold

	pub        *publisher
	registry   *KeyRegistry // Encryption key registry, opened in Open.
	blockCache *ristretto.Cache[[]byte, *table.Block]
	indexCache *ristretto.Cache[uint64, *fb.TableIndex]
	allocPool  *z.AllocatorPool // Released in close().
}
const (
	// kvWriteChCapacity bounds the number of pending write requests queued on
	// DB.writeCh before senders block.
	kvWriteChCapacity = 1000
)
  103. func checkAndSetOptions(opt *Options) error {
  104. // It's okay to have zero compactors which will disable all compactions but
  105. // we cannot have just one compactor otherwise we will end up with all data
  106. // on level 2.
  107. if opt.NumCompactors == 1 {
  108. return errors.New("Cannot have 1 compactor. Need at least 2")
  109. }
  110. if opt.InMemory && (opt.Dir != "" || opt.ValueDir != "") {
  111. return errors.New("Cannot use badger in Disk-less mode with Dir or ValueDir set")
  112. }
  113. opt.maxBatchSize = (15 * opt.MemTableSize) / 100
  114. opt.maxBatchCount = opt.maxBatchSize / int64(skl.MaxNodeSize)
  115. // This is the maximum value, vlogThreshold can have if dynamic thresholding is enabled.
  116. opt.maxValueThreshold = math.Min(maxValueThreshold, float64(opt.maxBatchSize))
  117. if opt.VLogPercentile < 0.0 || opt.VLogPercentile > 1.0 {
  118. return errors.New("vlogPercentile must be within range of 0.0-1.0")
  119. }
  120. // We are limiting opt.ValueThreshold to maxValueThreshold for now.
  121. if opt.ValueThreshold > maxValueThreshold {
  122. return fmt.Errorf("Invalid ValueThreshold, must be less or equal to %d",
  123. maxValueThreshold)
  124. }
  125. // If ValueThreshold is greater than opt.maxBatchSize, we won't be able to push any data using
  126. // the transaction APIs. Transaction batches entries into batches of size opt.maxBatchSize.
  127. if opt.ValueThreshold > opt.maxBatchSize {
  128. return fmt.Errorf("Valuethreshold %d greater than max batch size of %d. Either "+
  129. "reduce opt.ValueThreshold or increase opt.BaseTableSize.",
  130. opt.ValueThreshold, opt.maxBatchSize)
  131. }
  132. // ValueLogFileSize should be strictly LESS than 2<<30 otherwise we will
  133. // overflow the uint32 when we mmap it in OpenMemtable.
  134. if !(opt.ValueLogFileSize < 2<<30 && opt.ValueLogFileSize >= 1<<20) {
  135. return ErrValueLogSize
  136. }
  137. if opt.ReadOnly {
  138. // Do not perform compaction in read only mode.
  139. opt.CompactL0OnClose = false
  140. }
  141. needCache := (opt.Compression != options.None) || (len(opt.EncryptionKey) > 0)
  142. if needCache && opt.BlockCacheSize == 0 {
  143. panic("BlockCacheSize should be set since compression/encryption are enabled")
  144. }
  145. return nil
  146. }
  147. // Open returns a new DB object.
  148. func Open(opt Options) (*DB, error) {
  149. if err := checkAndSetOptions(&opt); err != nil {
  150. return nil, err
  151. }
  152. var dirLockGuard, valueDirLockGuard *directoryLockGuard
  153. // Create directories and acquire lock on it only if badger is not running in InMemory mode.
  154. // We don't have any directories/files in InMemory mode so we don't need to acquire
  155. // any locks on them.
  156. if !opt.InMemory {
  157. if err := createDirs(opt); err != nil {
  158. return nil, err
  159. }
  160. var err error
  161. if !opt.BypassLockGuard {
  162. dirLockGuard, err = acquireDirectoryLock(opt.Dir, lockFile, opt.ReadOnly)
  163. if err != nil {
  164. return nil, err
  165. }
  166. defer func() {
  167. if dirLockGuard != nil {
  168. _ = dirLockGuard.release()
  169. }
  170. }()
  171. absDir, err := filepath.Abs(opt.Dir)
  172. if err != nil {
  173. return nil, err
  174. }
  175. absValueDir, err := filepath.Abs(opt.ValueDir)
  176. if err != nil {
  177. return nil, err
  178. }
  179. if absValueDir != absDir {
  180. valueDirLockGuard, err = acquireDirectoryLock(opt.ValueDir, lockFile, opt.ReadOnly)
  181. if err != nil {
  182. return nil, err
  183. }
  184. defer func() {
  185. if valueDirLockGuard != nil {
  186. _ = valueDirLockGuard.release()
  187. }
  188. }()
  189. }
  190. }
  191. }
  192. manifestFile, manifest, err := openOrCreateManifestFile(opt)
  193. if err != nil {
  194. return nil, err
  195. }
  196. defer func() {
  197. if manifestFile != nil {
  198. _ = manifestFile.close()
  199. }
  200. }()
  201. db := &DB{
  202. imm: make([]*memTable, 0, opt.NumMemtables),
  203. flushChan: make(chan *memTable, opt.NumMemtables),
  204. writeCh: make(chan *request, kvWriteChCapacity),
  205. opt: opt,
  206. manifest: manifestFile,
  207. dirLockGuard: dirLockGuard,
  208. valueDirGuard: valueDirLockGuard,
  209. orc: newOracle(opt),
  210. pub: newPublisher(),
  211. allocPool: z.NewAllocatorPool(8),
  212. bannedNamespaces: &lockedKeys{keys: make(map[uint64]struct{})},
  213. threshold: initVlogThreshold(&opt),
  214. }
  215. db.syncChan = opt.syncChan
  216. // Cleanup all the goroutines started by badger in case of an error.
  217. defer func() {
  218. if err != nil {
  219. opt.Errorf("Received err: %v. Cleaning up...", err)
  220. db.cleanup()
  221. db = nil
  222. }
  223. }()
  224. if opt.BlockCacheSize > 0 {
  225. numInCache := opt.BlockCacheSize / int64(opt.BlockSize)
  226. if numInCache == 0 {
  227. // Make the value of this variable at least one since the cache requires
  228. // the number of counters to be greater than zero.
  229. numInCache = 1
  230. }
  231. config := ristretto.Config[[]byte, *table.Block]{
  232. NumCounters: numInCache * 8,
  233. MaxCost: opt.BlockCacheSize,
  234. BufferItems: 64,
  235. Metrics: true,
  236. OnExit: table.BlockEvictHandler,
  237. }
  238. db.blockCache, err = ristretto.NewCache[[]byte, *table.Block](&config)
  239. if err != nil {
  240. return nil, y.Wrap(err, "failed to create data cache")
  241. }
  242. }
  243. if opt.IndexCacheSize > 0 {
  244. // Index size is around 5% of the table size.
  245. indexSz := int64(float64(opt.MemTableSize) * 0.05)
  246. numInCache := opt.IndexCacheSize / indexSz
  247. if numInCache == 0 {
  248. // Make the value of this variable at least one since the cache requires
  249. // the number of counters to be greater than zero.
  250. numInCache = 1
  251. }
  252. config := ristretto.Config[uint64, *fb.TableIndex]{
  253. NumCounters: numInCache * 8,
  254. MaxCost: opt.IndexCacheSize,
  255. BufferItems: 64,
  256. Metrics: true,
  257. }
  258. db.indexCache, err = ristretto.NewCache(&config)
  259. if err != nil {
  260. return nil, y.Wrap(err, "failed to create bf cache")
  261. }
  262. }
  263. db.closers.cacheHealth = z.NewCloser(1)
  264. go db.monitorCache(db.closers.cacheHealth)
  265. if db.opt.InMemory {
  266. db.opt.SyncWrites = false
  267. // If badger is running in memory mode, push everything into the LSM Tree.
  268. db.opt.ValueThreshold = math.MaxInt32
  269. }
  270. krOpt := KeyRegistryOptions{
  271. ReadOnly: opt.ReadOnly,
  272. Dir: opt.Dir,
  273. EncryptionKey: opt.EncryptionKey,
  274. EncryptionKeyRotationDuration: opt.EncryptionKeyRotationDuration,
  275. InMemory: opt.InMemory,
  276. }
  277. if db.registry, err = OpenKeyRegistry(krOpt); err != nil {
  278. return db, err
  279. }
  280. db.calculateSize()
  281. db.closers.updateSize = z.NewCloser(1)
  282. go db.updateSize(db.closers.updateSize)
  283. if err := db.openMemTables(db.opt); err != nil {
  284. return nil, y.Wrapf(err, "while opening memtables")
  285. }
  286. if !db.opt.ReadOnly {
  287. if db.mt, err = db.newMemTable(); err != nil {
  288. return nil, y.Wrapf(err, "cannot create memtable")
  289. }
  290. }
  291. // newLevelsController potentially loads files in directory.
  292. if db.lc, err = newLevelsController(db, &manifest); err != nil {
  293. return db, err
  294. }
  295. // Initialize vlog struct.
  296. db.vlog.init(db)
  297. if !opt.ReadOnly {
  298. db.closers.compactors = z.NewCloser(1)
  299. db.lc.startCompact(db.closers.compactors)
  300. db.closers.memtable = z.NewCloser(1)
  301. go func() {
  302. db.flushMemtable(db.closers.memtable) // Need levels controller to be up.
  303. }()
  304. // Flush them to disk asap.
  305. for _, mt := range db.imm {
  306. db.flushChan <- mt
  307. }
  308. }
  309. // We do increment nextTxnTs below. So, no need to do it here.
  310. db.orc.nextTxnTs = db.MaxVersion()
  311. db.opt.Infof("Set nextTxnTs to %d", db.orc.nextTxnTs)
  312. if err = db.vlog.open(db); err != nil {
  313. return db, y.Wrapf(err, "During db.vlog.open")
  314. }
  315. // Let's advance nextTxnTs to one more than whatever we observed via
  316. // replaying the logs.
  317. db.orc.txnMark.Done(db.orc.nextTxnTs)
  318. // In normal mode, we must update readMark so older versions of keys can be removed during
  319. // compaction when run in offline mode via the flatten tool.
  320. db.orc.readMark.Done(db.orc.nextTxnTs)
  321. db.orc.incrementNextTs()
  322. go db.threshold.listenForValueThresholdUpdate()
  323. if err := db.initBannedNamespaces(); err != nil {
  324. return db, fmt.Errorf("While setting banned keys: %w", err)
  325. }
  326. db.closers.writes = z.NewCloser(1)
  327. go db.doWrites(db.closers.writes)
  328. if !db.opt.InMemory {
  329. db.closers.valueGC = z.NewCloser(1)
  330. go db.vlog.waitOnGC(db.closers.valueGC)
  331. }
  332. db.closers.pub = z.NewCloser(1)
  333. go db.pub.listenForUpdates(db.closers.pub)
  334. valueDirLockGuard = nil
  335. dirLockGuard = nil
  336. manifestFile = nil
  337. return db, nil
  338. }
  339. // initBannedNamespaces retrieves the banned namespaces from the DB and updates in-memory structure.
  340. func (db *DB) initBannedNamespaces() error {
  341. if db.opt.NamespaceOffset < 0 {
  342. return nil
  343. }
  344. return db.View(func(txn *Txn) error {
  345. iopts := DefaultIteratorOptions
  346. iopts.Prefix = bannedNsKey
  347. iopts.PrefetchValues = false
  348. iopts.InternalAccess = true
  349. itr := txn.NewIterator(iopts)
  350. defer itr.Close()
  351. for itr.Rewind(); itr.Valid(); itr.Next() {
  352. key := y.BytesToU64(itr.Item().Key()[len(bannedNsKey):])
  353. db.bannedNamespaces.add(key)
  354. }
  355. return nil
  356. })
  357. }
  358. func (db *DB) MaxVersion() uint64 {
  359. var maxVersion uint64
  360. update := func(a uint64) {
  361. if a > maxVersion {
  362. maxVersion = a
  363. }
  364. }
  365. db.lock.Lock()
  366. // In read only mode, we do not create new mem table.
  367. if !db.opt.ReadOnly {
  368. update(db.mt.maxVersion)
  369. }
  370. for _, mt := range db.imm {
  371. update(mt.maxVersion)
  372. }
  373. db.lock.Unlock()
  374. for _, ti := range db.Tables() {
  375. update(ti.MaxVersion)
  376. }
  377. return maxVersion
  378. }
  379. func (db *DB) monitorCache(c *z.Closer) {
  380. defer c.Done()
  381. count := 0
  382. analyze := func(name string, metrics *ristretto.Metrics) {
  383. // If the mean life expectancy is less than 10 seconds, the cache
  384. // might be too small.
  385. le := metrics.LifeExpectancySeconds()
  386. if le == nil {
  387. return
  388. }
  389. lifeTooShort := le.Count > 0 && float64(le.Sum)/float64(le.Count) < 10
  390. hitRatioTooLow := metrics.Ratio() > 0 && metrics.Ratio() < 0.4
  391. if lifeTooShort && hitRatioTooLow {
  392. db.opt.Warningf("%s might be too small. Metrics: %s\n", name, metrics)
  393. db.opt.Warningf("Cache life expectancy (in seconds): %+v\n", le)
  394. } else if le.Count > 1000 && count%5 == 0 {
  395. db.opt.Infof("%s metrics: %s\n", name, metrics)
  396. }
  397. }
  398. ticker := time.NewTicker(1 * time.Minute)
  399. defer ticker.Stop()
  400. for {
  401. select {
  402. case <-c.HasBeenClosed():
  403. return
  404. case <-ticker.C:
  405. }
  406. analyze("Block cache", db.BlockCacheMetrics())
  407. analyze("Index cache", db.IndexCacheMetrics())
  408. count++
  409. }
  410. }
  411. // cleanup stops all the goroutines started by badger. This is used in open to
  412. // cleanup goroutines in case of an error.
  413. func (db *DB) cleanup() {
  414. db.stopMemoryFlush()
  415. db.stopCompactions()
  416. db.blockCache.Close()
  417. db.indexCache.Close()
  418. if db.closers.updateSize != nil {
  419. db.closers.updateSize.Signal()
  420. }
  421. if db.closers.valueGC != nil {
  422. db.closers.valueGC.Signal()
  423. }
  424. if db.closers.writes != nil {
  425. db.closers.writes.Signal()
  426. }
  427. if db.closers.pub != nil {
  428. db.closers.pub.Signal()
  429. }
  430. db.orc.Stop()
  431. // Do not use vlog.Close() here. vlog.Close truncates the files. We don't
  432. // want to truncate files unless the user has specified the truncate flag.
  433. }
  434. // BlockCacheMetrics returns the metrics for the underlying block cache.
  435. func (db *DB) BlockCacheMetrics() *ristretto.Metrics {
  436. if db.blockCache != nil {
  437. return db.blockCache.Metrics
  438. }
  439. return nil
  440. }
  441. // IndexCacheMetrics returns the metrics for the underlying index cache.
  442. func (db *DB) IndexCacheMetrics() *ristretto.Metrics {
  443. if db.indexCache != nil {
  444. return db.indexCache.Metrics
  445. }
  446. return nil
  447. }
  448. // Close closes a DB. It's crucial to call it to ensure all the pending updates make their way to
  449. // disk. Calling DB.Close() multiple times would still only close the DB once.
  450. func (db *DB) Close() error {
  451. var err error
  452. db.closeOnce.Do(func() {
  453. err = db.close()
  454. })
  455. return err
  456. }
  457. // IsClosed denotes if the badger DB is closed or not. A DB instance should not
  458. // be used after closing it.
  459. func (db *DB) IsClosed() bool {
  460. return db.isClosed.Load() == 1
  461. }
// close performs the actual shutdown: it blocks further writes, stops the
// background goroutines in dependency order, flushes the active memtable,
// optionally force-compacts L0, then closes the value log, levels controller,
// caches, manifest, key registry, and releases the directory locks.
// Later errors do not overwrite an earlier one (each step only assigns to err
// when err is still nil), so the first failure is the one reported.
func (db *DB) close() (err error) {
	defer db.allocPool.Release()

	db.opt.Debugf("Closing database")
	db.opt.Infof("Lifetime L0 stalled for: %s\n", time.Duration(db.lc.l0stallsMs.Load()))

	// Flip the flags first so new writes are rejected and IsClosed reports true.
	db.blockWrites.Store(1)

	db.isClosed.Store(1)

	if !db.opt.InMemory {
		// Stop value GC first.
		db.closers.valueGC.SignalAndWait()
	}

	// Stop writes next.
	db.closers.writes.SignalAndWait()

	// Don't accept any more write.
	close(db.writeCh)

	db.closers.pub.SignalAndWait()
	db.closers.cacheHealth.Signal()

	// Make sure that block writer is done pushing stuff into memtable!
	// Otherwise, you will have a race condition: we are trying to flush memtables
	// and remove them completely, while the block / memtable writer is still
	// trying to push stuff into the memtable. This will also resolve the value
	// offset problem: as we push into memtable, we update value offsets there.
	if db.mt != nil {
		if db.mt.sl.Empty() {
			// Remove the memtable if empty.
			db.mt.DecrRef()
		} else {
			db.opt.Debugf("Flushing memtable")
			// Retry until the flusher drains flushChan enough to accept db.mt.
			for {
				pushedMemTable := func() bool {
					db.lock.Lock()
					defer db.lock.Unlock()
					y.AssertTrue(db.mt != nil)
					select {
					case db.flushChan <- db.mt:
						db.imm = append(db.imm, db.mt) // Flusher will attempt to remove this from s.imm.
						db.mt = nil                    // Will segfault if we try writing!
						db.opt.Debugf("pushed to flush chan\n")
						return true
					default:
						// If we fail to push, we need to unlock and wait for a short while.
						// The flushing operation needs to update s.imm. Otherwise, we have a
						// deadlock.
						// TODO: Think about how to do this more cleanly, maybe without any locks.
					}
					return false
				}()
				if pushedMemTable {
					break
				}
				time.Sleep(10 * time.Millisecond)
			}
		}
	}
	db.stopMemoryFlush()
	db.stopCompactions()

	// Force Compact L0
	// We don't need to care about cstatus since no parallel compaction is running.
	if db.opt.CompactL0OnClose {
		err := db.lc.doCompact(173, compactionPriority{level: 0, score: 1.73})
		switch err {
		case errFillTables:
			// This error only means that there might be enough tables to do a compaction. So, we
			// should not report it to the end user to avoid confusing them.
		case nil:
			db.opt.Debugf("Force compaction on level 0 done")
		default:
			db.opt.Warningf("While forcing compaction on level 0: %v", err)
		}
	}

	// Now close the value log.
	if vlogErr := db.vlog.Close(); vlogErr != nil {
		err = y.Wrap(vlogErr, "DB.Close")
	}

	db.opt.Infof(db.LevelsToString())
	if lcErr := db.lc.close(); err == nil {
		err = y.Wrap(lcErr, "DB.Close")
	}
	db.opt.Debugf("Waiting for closer")
	db.closers.updateSize.SignalAndWait()
	db.orc.Stop()
	db.blockCache.Close()
	db.indexCache.Close()

	db.threshold.close()

	// No files or locks to release in in-memory mode.
	if db.opt.InMemory {
		return
	}

	if db.dirLockGuard != nil {
		if guardErr := db.dirLockGuard.release(); err == nil {
			err = y.Wrap(guardErr, "DB.Close")
		}
	}
	if db.valueDirGuard != nil {
		if guardErr := db.valueDirGuard.release(); err == nil {
			err = y.Wrap(guardErr, "DB.Close")
		}
	}
	if manifestErr := db.manifest.close(); err == nil {
		err = y.Wrap(manifestErr, "DB.Close")
	}
	if registryErr := db.registry.Close(); err == nil {
		err = y.Wrap(registryErr, "DB.Close")
	}

	// Fsync directories to ensure that lock file, and any other removed files whose directory
	// we haven't specifically fsynced, are guaranteed to have their directory entry removal
	// persisted to disk.
	if syncErr := db.syncDir(db.opt.Dir); err == nil {
		err = y.Wrap(syncErr, "DB.Close")
	}
	if syncErr := db.syncDir(db.opt.ValueDir); err == nil {
		err = y.Wrap(syncErr, "DB.Close")
	}

	return err
}
// VerifyChecksum verifies checksum for all tables on all levels.
// This method can be used to verify checksum, if opt.ChecksumVerificationMode is NoVerification.
// It delegates to the levels controller, which walks every table.
func (db *DB) VerifyChecksum() error {
	return db.lc.verifyChecksum()
}
const (
	// lockFile is the file name used by acquireDirectoryLock (see Open) to
	// guard the database directories against concurrent processes.
	lockFile = "LOCK"
)
// Sync syncs database content to disk. This function provides
// more control to user to sync data whenever required.
func (db *DB) Sync() error {
	/**
	Make an attempt to sync both the logs, the active memtable's WAL and the vLog (1847).

	Cases:
	- All_ok :: If both the logs sync successfully.

	- Entry_Lost :: If an entry with a value pointer was present in the active memtable's WAL,
	:: and the WAL was synced but there was an error in syncing the vLog.
	:: The entry will be considered lost and this case will need to be handled during recovery.

	- Entries_Lost :: If there were errors in syncing both the logs, multiple entries would be lost.

	- Entries_Lost :: If the active memtable's WAL is not synced but the vLog is synced, it will
	:: result in entries being lost because recovery of the active memtable is done from its WAL.
	:: Check `UpdateSkipList` in memtable.go.

	- Nothing_lost :: If an entry with its value was present in the active memtable's WAL, and the WAL was synced,
	:: but there was an error in syncing the vLog.
	:: Nothing is lost for this very specific entry because the entry is completely present in the memtable's WAL.

	- Partially_lost :: If entries were written partially in either of the logs,
	:: the logs will be truncated during recovery.
	:: As a result of truncation, some entries might be lost.
	:: Assume that 4KB of data is to be synced and invoking `Sync` results only in syncing 3KB
	:: of data and then the machine shuts down or the disk failure happens,
	:: this will result in partial writes. [[This case needs verification]]
	*/
	// NOTE(review): db.mt is nil when the DB was opened in ReadOnly mode (no
	// mutable memtable is created in Open) — confirm SyncWAL tolerates a nil
	// receiver, or that Sync is never called on a read-only DB.
	db.lock.RLock()
	memtableSyncError := db.mt.SyncWAL()
	db.lock.RUnlock()

	vLogSyncError := db.vlog.sync()
	return y.CombineErrors(memtableSyncError, vLogSyncError)
}
  613. // getMemtables returns the current memtables and get references.
  614. func (db *DB) getMemTables() ([]*memTable, func()) {
  615. db.lock.RLock()
  616. defer db.lock.RUnlock()
  617. var tables []*memTable
  618. // Mutable memtable does not exist in read-only mode.
  619. if !db.opt.ReadOnly {
  620. // Get mutable memtable.
  621. tables = append(tables, db.mt)
  622. db.mt.IncrRef()
  623. }
  624. // Get immutable memtables.
  625. last := len(db.imm) - 1
  626. for i := range db.imm {
  627. tables = append(tables, db.imm[last-i])
  628. db.imm[last-i].IncrRef()
  629. }
  630. return tables, func() {
  631. for _, tbl := range tables {
  632. tbl.DecrRef()
  633. }
  634. }
  635. }
// get returns the value in memtable or disk for given key.
// Note that value will include meta byte.
//
// IMPORTANT: We should never write an entry with an older timestamp for the same key, We need to
// maintain this invariant to search for the latest value of a key, or else we need to search in all
// tables and find the max version among them. To maintain this invariant, we also need to ensure
// that all versions of a key are always present in the same table from level 1, because compaction
// can push any table down.
//
// Update(23/09/2020) - We have dropped the move key implementation. Earlier we
// were inserting move keys to fix the invalid value pointers but we no longer
// do that. For every get("fooX") call where X is the version, we will search
// for "fooX" in all the levels of the LSM tree. This is expensive but it
// removes the overhead of handling move keys completely.
func (db *DB) get(key []byte) (y.ValueStruct, error) {
	if db.IsClosed() {
		return y.ValueStruct{}, ErrDBClosed
	}
	tables, decr := db.getMemTables() // Lock should be released.
	defer decr()

	var maxVs y.ValueStruct
	version := y.ParseTs(key)

	y.NumGetsAdd(db.opt.MetricsEnabled, 1)
	// tables is newest-first (see getMemTables), so the first exact-version
	// match is authoritative.
	for i := 0; i < len(tables); i++ {
		vs := tables[i].sl.Get(key)
		y.NumMemtableGetsAdd(db.opt.MetricsEnabled, 1)
		// Zero Meta and nil Value means the key is absent from this memtable.
		if vs.Meta == 0 && vs.Value == nil {
			continue
		}
		// Found the required version of the key, return immediately.
		if vs.Version == version {
			y.NumGetsWithResultsAdd(db.opt.MetricsEnabled, 1)
			return vs, nil
		}
		// Track the newest version seen so far; the LSM lookup below may fall
		// back to it if the disk tables hold nothing newer.
		if maxVs.Version < vs.Version {
			maxVs = vs
		}
	}
	return db.lc.get(key, maxVs, 0)
}
// requestPool recycles *request objects across writes to reduce allocations
// on the hot write path. Requests are reset before reuse (see sendToWriteCh).
var requestPool = sync.Pool{
	New: func() interface{} {
		return new(request)
	},
}
// writeToLSM writes every entry of a single request into the mutable memtable.
// Small values (per skipVlogAndSetThreshold) are stored inline; larger ones
// store the corresponding value-log pointer from b.Ptrs with bitValuePointer
// set. When SyncWrites is enabled, the WAL is synced after all entries are in.
func (db *DB) writeToLSM(b *request) error {
	// We should check the length of b.Prts and b.Entries only when badger is not
	// running in InMemory mode. In InMemory mode, we don't write anything to the
	// value log and that's why the length of b.Ptrs will always be zero.
	if !db.opt.InMemory && len(b.Ptrs) != len(b.Entries) {
		return fmt.Errorf("Ptrs and Entries don't match: %+v", b)
	}

	for i, entry := range b.Entries {
		var err error
		if entry.skipVlogAndSetThreshold(db.valueThreshold()) {
			// Will include deletion / tombstone case.
			err = db.mt.Put(entry.Key,
				y.ValueStruct{
					Value: entry.Value,
					// Ensure value pointer flag is removed. Otherwise, the value will fail
					// to be retrieved during iterator prefetch. `bitValuePointer` is only
					// known to be set in write to LSM when the entry is loaded from a backup
					// with lower ValueThreshold and its value was stored in the value log.
					Meta:      entry.meta &^ bitValuePointer,
					UserMeta:  entry.UserMeta,
					ExpiresAt: entry.ExpiresAt,
				})
		} else {
			// Write pointer to Memtable.
			err = db.mt.Put(entry.Key,
				y.ValueStruct{
					Value:     b.Ptrs[i].Encode(),
					Meta:      entry.meta | bitValuePointer,
					UserMeta:  entry.UserMeta,
					ExpiresAt: entry.ExpiresAt,
				})
		}
		if err != nil {
			return y.Wrapf(err, "while writing to memTable")
		}
	}
	if db.opt.SyncWrites {
		return db.mt.SyncWAL()
	}
	return nil
}
// writeRequests is called serially by only one goroutine.
// It writes the batch to the value log first (so value pointers exist before
// the memtable entries that reference them), then to the memtables, and
// finally notifies subscribers and unblocks all waiters via done().
func (db *DB) writeRequests(reqs []*request) error {
	if len(reqs) == 0 {
		return nil
	}

	// done records err on every request and releases its waiters.
	done := func(err error) {
		for _, r := range reqs {
			r.Err = err
			r.Wg.Done()
		}
	}
	db.opt.Debugf("writeRequests called. Writing to value log")
	err := db.vlog.write(reqs)
	if err != nil {
		done(err)
		return err
	}

	db.opt.Debugf("Writing to memtable")
	var count int
	for _, b := range reqs {
		if len(b.Entries) == 0 {
			continue
		}
		count += len(b.Entries)
		var i uint64
		var err error
		// Poll until a memtable has room; errNoRoom means the flush queue is
		// full and we must yield so the flusher can make progress.
		for err = db.ensureRoomForWrite(); err == errNoRoom; err = db.ensureRoomForWrite() {
			i++
			if i%100 == 0 {
				db.opt.Debugf("Making room for writes")
			}
			// We need to poll a bit because both hasRoomForWrite and the flusher need access to s.imm.
			// When flushChan is full and you are blocked there, and the flusher is trying to update s.imm,
			// you will get a deadlock.
			time.Sleep(10 * time.Millisecond)
		}
		if err != nil {
			done(err)
			return y.Wrap(err, "writeRequests")
		}
		if err := db.writeToLSM(b); err != nil {
			done(err)
			return y.Wrap(err, "writeRequests")
		}
	}
	db.opt.Debugf("Sending updates to subscribers")
	db.pub.sendUpdates(reqs)
	done(nil)
	db.opt.Debugf("%d entries written", count)
	return nil
}
  773. func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) {
  774. if db.blockWrites.Load() == 1 {
  775. return nil, ErrBlockedWrites
  776. }
  777. var count, size int64
  778. for _, e := range entries {
  779. size += e.estimateSizeAndSetThreshold(db.valueThreshold())
  780. count++
  781. }
  782. y.NumBytesWrittenUserAdd(db.opt.MetricsEnabled, size)
  783. if count >= db.opt.maxBatchCount || size >= db.opt.maxBatchSize {
  784. return nil, ErrTxnTooBig
  785. }
  786. // We can only service one request because we need each txn to be stored in a contiguous section.
  787. // Txns should not interleave among other txns or rewrites.
  788. req := requestPool.Get().(*request)
  789. req.reset()
  790. req.Entries = entries
  791. req.Wg.Add(1)
  792. req.IncrRef() // for db write
  793. db.writeCh <- req // Handled in doWrites.
  794. y.NumPutsAdd(db.opt.MetricsEnabled, int64(len(entries)))
  795. return req, nil
  796. }
// doWrites is the single writer goroutine. It batches requests arriving on
// db.writeCh and hands each batch to writeRequests; pendingCh acts as a
// single-slot semaphore so at most one batch is being written at any time.
func (db *DB) doWrites(lc *z.Closer) {
	defer lc.Done()
	pendingCh := make(chan struct{}, 1)
	writeRequests := func(reqs []*request) {
		if err := db.writeRequests(reqs); err != nil {
			db.opt.Errorf("writeRequests: %v", err)
		}
		// Release the write slot so the next batch can start.
		<-pendingCh
	}

	// This variable tracks the number of pending writes.
	reqLen := new(expvar.Int)
	y.PendingWritesSet(db.opt.MetricsEnabled, db.opt.Dir, reqLen)

	reqs := make([]*request, 0, 10)
	for {
		var r *request
		select {
		case r = <-db.writeCh:
		case <-lc.HasBeenClosed():
			goto closedCase
		}

		for {
			reqs = append(reqs, r)
			reqLen.Set(int64(len(reqs)))

			// The batch has grown large; force a write even if we must block
			// for the current write slot.
			if len(reqs) >= 3*kvWriteChCapacity {
				pendingCh <- struct{}{} // blocking.
				goto writeCase
			}

			select {
			// Either push to pending, or continue to pick from writeCh.
			case r = <-db.writeCh:
			case pendingCh <- struct{}{}:
				goto writeCase
			case <-lc.HasBeenClosed():
				goto closedCase
			}
		}

	closedCase:
		// All the pending request are drained.
		// Don't close the writeCh, because it has be used in several places.
		for {
			select {
			case r = <-db.writeCh:
				reqs = append(reqs, r)
			default:
				pendingCh <- struct{}{} // Push to pending before doing a write.
				writeRequests(reqs)
				return
			}
		}

	writeCase:
		go writeRequests(reqs)
		reqs = make([]*request, 0, 10)
		reqLen.Set(0)
	}
}
  852. // batchSet applies a list of badger.Entry. If a request level error occurs it
  853. // will be returned.
  854. //
  855. // Check(kv.BatchSet(entries))
  856. func (db *DB) batchSet(entries []*Entry) error {
  857. req, err := db.sendToWriteCh(entries)
  858. if err != nil {
  859. return err
  860. }
  861. return req.Wait()
  862. }
  863. // batchSetAsync is the asynchronous version of batchSet. It accepts a callback
  864. // function which is called when all the sets are complete. If a request level
  865. // error occurs, it will be passed back via the callback.
  866. //
  867. // err := kv.BatchSetAsync(entries, func(err error)) {
  868. // Check(err)
  869. // }
  870. func (db *DB) batchSetAsync(entries []*Entry, f func(error)) error {
  871. req, err := db.sendToWriteCh(entries)
  872. if err != nil {
  873. return err
  874. }
  875. go func() {
  876. err := req.Wait()
  877. // Write is complete. Let's call the callback function now.
  878. f(err)
  879. }()
  880. return nil
  881. }
// errNoRoom is a sentinel returned by ensureRoomForWrite when the current
// memtable is full and the flush queue has no capacity; writeRequests polls
// until room frees up.
var errNoRoom = errors.New("No room for write")
  883. // ensureRoomForWrite is always called serially.
  884. func (db *DB) ensureRoomForWrite() error {
  885. var err error
  886. db.lock.Lock()
  887. defer db.lock.Unlock()
  888. y.AssertTrue(db.mt != nil) // A nil mt indicates that DB is being closed.
  889. if !db.mt.isFull() {
  890. return nil
  891. }
  892. select {
  893. case db.flushChan <- db.mt:
  894. db.opt.Debugf("Flushing memtable, mt.size=%d size of flushChan: %d\n",
  895. db.mt.sl.MemSize(), len(db.flushChan))
  896. // We manage to push this task. Let's modify imm.
  897. db.imm = append(db.imm, db.mt)
  898. db.mt, err = db.newMemTable()
  899. if err != nil {
  900. return y.Wrapf(err, "cannot create new mem table")
  901. }
  902. // New memtable is empty. We certainly have room.
  903. return nil
  904. default:
  905. // We need to do this to unlock and allow the flusher to modify imm.
  906. return errNoRoom
  907. }
  908. }
  909. func arenaSize(opt Options) int64 {
  910. return opt.MemTableSize + opt.maxBatchSize + opt.maxBatchCount*int64(skl.MaxNodeSize)
  911. }
  912. // buildL0Table builds a new table from the memtable.
  913. func buildL0Table(iter y.Iterator, dropPrefixes [][]byte, bopts table.Options) *table.Builder {
  914. defer iter.Close()
  915. b := table.NewTableBuilder(bopts)
  916. for iter.Rewind(); iter.Valid(); iter.Next() {
  917. if len(dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), dropPrefixes) {
  918. continue
  919. }
  920. vs := iter.Value()
  921. var vp valuePointer
  922. if vs.Meta&bitValuePointer > 0 {
  923. vp.Decode(vs.Value)
  924. }
  925. b.Add(iter.Key(), iter.Value(), vp.Len)
  926. }
  927. return b
  928. }
  929. // handleMemTableFlush must be run serially.
  930. func (db *DB) handleMemTableFlush(mt *memTable, dropPrefixes [][]byte) error {
  931. bopts := buildTableOptions(db)
  932. itr := mt.sl.NewUniIterator(false)
  933. builder := buildL0Table(itr, nil, bopts)
  934. defer builder.Close()
  935. // buildL0Table can return nil if the none of the items in the skiplist are
  936. // added to the builder. This can happen when drop prefix is set and all
  937. // the items are skipped.
  938. if builder.Empty() {
  939. builder.Finish()
  940. return nil
  941. }
  942. fileID := db.lc.reserveFileID()
  943. var tbl *table.Table
  944. var err error
  945. if db.opt.InMemory {
  946. data := builder.Finish()
  947. tbl, err = table.OpenInMemoryTable(data, fileID, &bopts)
  948. } else {
  949. tbl, err = table.CreateTable(table.NewFilename(fileID, db.opt.Dir), builder)
  950. }
  951. if err != nil {
  952. return y.Wrap(err, "error while creating table")
  953. }
  954. // We own a ref on tbl.
  955. err = db.lc.addLevel0Table(tbl) // This will incrRef
  956. _ = tbl.DecrRef() // Releases our ref.
  957. return err
  958. }
// flushMemtable must keep running until we send it an empty memtable. If there
// are errors during handling the memtable flush, we'll retry indefinitely.
func (db *DB) flushMemtable(lc *z.Closer) {
	defer lc.Done()

	for mt := range db.flushChan {
		// A nil entry carries no work; skip it. (Presumably pushed as a
		// wake-up sentinel by senders — confirm against flushChan writers.)
		if mt == nil {
			continue
		}

		for {
			if err := db.handleMemTableFlush(mt, nil); err != nil {
				// Encountered error. Retry indefinitely.
				db.opt.Errorf("error flushing memtable to disk: %v, retrying", err)
				time.Sleep(time.Second)
				continue
			}

			// Update s.imm. Need a lock.
			db.lock.Lock()
			// This is a single-threaded operation. mt corresponds to the head of
			// db.imm list. Once we flush it, we advance db.imm. The next mt
			// which would arrive here would match db.imm[0], because we acquire a
			// lock over DB when pushing to flushChan.
			// TODO: This logic is dirty AF. Any change and this could easily break.
			y.AssertTrue(mt == db.imm[0])
			db.imm = db.imm[1:]
			mt.DecrRef() // Return memory.
			// unlock
			db.lock.Unlock()
			break
		}
	}
}
  990. func exists(path string) (bool, error) {
  991. _, err := os.Stat(path)
  992. if err == nil {
  993. return true, nil
  994. }
  995. if os.IsNotExist(err) {
  996. return false, nil
  997. }
  998. return true, err
  999. }
  1000. // This function does a filewalk, calculates the size of vlog and sst files and stores it in
  1001. // y.LSMSize and y.VlogSize.
  1002. func (db *DB) calculateSize() {
  1003. if db.opt.InMemory {
  1004. return
  1005. }
  1006. newInt := func(val int64) *expvar.Int {
  1007. v := new(expvar.Int)
  1008. v.Add(val)
  1009. return v
  1010. }
  1011. totalSize := func(dir string) (int64, int64) {
  1012. var lsmSize, vlogSize int64
  1013. err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
  1014. if err != nil {
  1015. return err
  1016. }
  1017. ext := filepath.Ext(path)
  1018. switch ext {
  1019. case ".sst":
  1020. lsmSize += info.Size()
  1021. case ".vlog":
  1022. vlogSize += info.Size()
  1023. }
  1024. return nil
  1025. })
  1026. if err != nil {
  1027. db.opt.Debugf("Got error while calculating total size of directory: %s", dir)
  1028. }
  1029. return lsmSize, vlogSize
  1030. }
  1031. lsmSize, vlogSize := totalSize(db.opt.Dir)
  1032. y.LSMSizeSet(db.opt.MetricsEnabled, db.opt.Dir, newInt(lsmSize))
  1033. // If valueDir is different from dir, we'd have to do another walk.
  1034. if db.opt.ValueDir != db.opt.Dir {
  1035. _, vlogSize = totalSize(db.opt.ValueDir)
  1036. }
  1037. y.VlogSizeSet(db.opt.MetricsEnabled, db.opt.ValueDir, newInt(vlogSize))
  1038. }
  1039. func (db *DB) updateSize(lc *z.Closer) {
  1040. defer lc.Done()
  1041. if db.opt.InMemory {
  1042. return
  1043. }
  1044. metricsTicker := time.NewTicker(time.Minute)
  1045. defer metricsTicker.Stop()
  1046. for {
  1047. select {
  1048. case <-metricsTicker.C:
  1049. db.calculateSize()
  1050. case <-lc.HasBeenClosed():
  1051. return
  1052. }
  1053. }
  1054. }
  1055. // RunValueLogGC triggers a value log garbage collection.
  1056. //
  1057. // It picks value log files to perform GC based on statistics that are collected
  1058. // during compactions. If no such statistics are available, then log files are
  1059. // picked in random order. The process stops as soon as the first log file is
  1060. // encountered which does not result in garbage collection.
  1061. //
  1062. // When a log file is picked, it is first sampled. If the sample shows that we
  1063. // can discard at least discardRatio space of that file, it would be rewritten.
  1064. //
  1065. // If a call to RunValueLogGC results in no rewrites, then an ErrNoRewrite is
  1066. // thrown indicating that the call resulted in no file rewrites.
  1067. //
  1068. // We recommend setting discardRatio to 0.5, thus indicating that a file be
  1069. // rewritten if half the space can be discarded. This results in a lifetime
  1070. // value log write amplification of 2 (1 from original write + 0.5 rewrite +
  1071. // 0.25 + 0.125 + ... = 2). Setting it to higher value would result in fewer
  1072. // space reclaims, while setting it to a lower value would result in more space
  1073. // reclaims at the cost of increased activity on the LSM tree. discardRatio
  1074. // must be in the range (0.0, 1.0), both endpoints excluded, otherwise an
  1075. // ErrInvalidRequest is returned.
  1076. //
  1077. // Only one GC is allowed at a time. If another value log GC is running, or DB
  1078. // has been closed, this would return an ErrRejected.
  1079. //
  1080. // Note: Every time GC is run, it would produce a spike of activity on the LSM
  1081. // tree.
  1082. func (db *DB) RunValueLogGC(discardRatio float64) error {
  1083. if db.opt.InMemory {
  1084. return ErrGCInMemoryMode
  1085. }
  1086. if discardRatio >= 1.0 || discardRatio <= 0.0 {
  1087. return ErrInvalidRequest
  1088. }
  1089. // Pick a log file and run GC
  1090. return db.vlog.runGC(discardRatio)
  1091. }
  1092. // Size returns the size of lsm and value log files in bytes. It can be used to decide how often to
  1093. // call RunValueLogGC.
  1094. func (db *DB) Size() (lsm, vlog int64) {
  1095. if y.LSMSizeGet(db.opt.MetricsEnabled, db.opt.Dir) == nil {
  1096. lsm, vlog = 0, 0
  1097. return
  1098. }
  1099. lsm = y.LSMSizeGet(db.opt.MetricsEnabled, db.opt.Dir).(*expvar.Int).Value()
  1100. vlog = y.VlogSizeGet(db.opt.MetricsEnabled, db.opt.ValueDir).(*expvar.Int).Value()
  1101. return
  1102. }
// Sequence represents a Badger sequence.
type Sequence struct {
	lock      sync.Mutex // guards next and leased
	db        *DB
	key       []byte // DB key under which the lease's upper bound is persisted
	next      uint64 // next value to hand out from the current lease
	leased    uint64 // exclusive upper bound of the current lease
	bandwidth uint64 // number of values acquired per lease renewal
}
  1112. // Next would return the next integer in the sequence, updating the lease by running a transaction
  1113. // if needed.
  1114. func (seq *Sequence) Next() (uint64, error) {
  1115. seq.lock.Lock()
  1116. defer seq.lock.Unlock()
  1117. if seq.next >= seq.leased {
  1118. if err := seq.updateLease(); err != nil {
  1119. return 0, err
  1120. }
  1121. }
  1122. val := seq.next
  1123. seq.next++
  1124. return val, nil
  1125. }
  1126. // Release the leased sequence to avoid wasted integers. This should be done right
  1127. // before closing the associated DB. However it is valid to use the sequence after
  1128. // it was released, causing a new lease with full bandwidth.
  1129. func (seq *Sequence) Release() error {
  1130. seq.lock.Lock()
  1131. defer seq.lock.Unlock()
  1132. err := seq.db.Update(func(txn *Txn) error {
  1133. item, err := txn.Get(seq.key)
  1134. if err != nil {
  1135. return err
  1136. }
  1137. var num uint64
  1138. if err := item.Value(func(v []byte) error {
  1139. num = binary.BigEndian.Uint64(v)
  1140. return nil
  1141. }); err != nil {
  1142. return err
  1143. }
  1144. if num == seq.leased {
  1145. var buf [8]byte
  1146. binary.BigEndian.PutUint64(buf[:], seq.next)
  1147. return txn.SetEntry(NewEntry(seq.key, buf[:]))
  1148. }
  1149. return nil
  1150. })
  1151. if err != nil {
  1152. return err
  1153. }
  1154. seq.leased = seq.next
  1155. return nil
  1156. }
  1157. func (seq *Sequence) updateLease() error {
  1158. return seq.db.Update(func(txn *Txn) error {
  1159. item, err := txn.Get(seq.key)
  1160. switch {
  1161. case err == ErrKeyNotFound:
  1162. seq.next = 0
  1163. case err != nil:
  1164. return err
  1165. default:
  1166. var num uint64
  1167. if err := item.Value(func(v []byte) error {
  1168. num = binary.BigEndian.Uint64(v)
  1169. return nil
  1170. }); err != nil {
  1171. return err
  1172. }
  1173. seq.next = num
  1174. }
  1175. lease := seq.next + seq.bandwidth
  1176. var buf [8]byte
  1177. binary.BigEndian.PutUint64(buf[:], lease)
  1178. if err = txn.SetEntry(NewEntry(seq.key, buf[:])); err != nil {
  1179. return err
  1180. }
  1181. seq.leased = lease
  1182. return nil
  1183. })
  1184. }
  1185. // GetSequence would initiate a new sequence object, generating it from the stored lease, if
  1186. // available, in the database. Sequence can be used to get a list of monotonically increasing
  1187. // integers. Multiple sequences can be created by providing different keys. Bandwidth sets the
  1188. // size of the lease, determining how many Next() requests can be served from memory.
  1189. //
  1190. // GetSequence is not supported on ManagedDB. Calling this would result in a panic.
  1191. func (db *DB) GetSequence(key []byte, bandwidth uint64) (*Sequence, error) {
  1192. if db.opt.managedTxns {
  1193. panic("Cannot use GetSequence with managedDB=true.")
  1194. }
  1195. switch {
  1196. case len(key) == 0:
  1197. return nil, ErrEmptyKey
  1198. case bandwidth == 0:
  1199. return nil, ErrZeroBandwidth
  1200. }
  1201. seq := &Sequence{
  1202. db: db,
  1203. key: key,
  1204. next: 0,
  1205. leased: 0,
  1206. bandwidth: bandwidth,
  1207. }
  1208. err := seq.updateLease()
  1209. return seq, err
  1210. }
// Tables gets the TableInfo objects from the level controller, one per SSTable.
func (db *DB) Tables() []TableInfo {
	return db.lc.getTableInfo()
}
// Levels gets the LevelInfo objects from the level controller, one per LSM level.
func (db *DB) Levels() []LevelInfo {
	return db.lc.getLevelInfo()
}
  1220. // EstimateSize can be used to get rough estimate of data size for a given prefix.
  1221. func (db *DB) EstimateSize(prefix []byte) (uint64, uint64) {
  1222. var onDiskSize, uncompressedSize uint64
  1223. tables := db.Tables()
  1224. for _, ti := range tables {
  1225. if bytes.HasPrefix(ti.Left, prefix) && bytes.HasPrefix(ti.Right, prefix) {
  1226. onDiskSize += uint64(ti.OnDiskSize)
  1227. uncompressedSize += uint64(ti.UncompressedSize)
  1228. }
  1229. }
  1230. return onDiskSize, uncompressedSize
  1231. }
// Ranges can be used to get rough key ranges to divide up iteration over the DB. The ranges here
// would consider the prefix, but would not necessarily start or end with the prefix. In fact, the
// first range would have nil as left key, and the last range would have nil as the right key.
func (db *DB) Ranges(prefix []byte, numRanges int) []*keyRange {
	var splits []string
	tables := db.Tables()

	// We just want table ranges here and not keys count.
	for _, ti := range tables {
		// We don't use ti.Left, because that has a tendency to store !badger keys. Skip over tables
		// at upper levels. Only choose tables from the last level.
		if ti.Level != db.opt.MaxLevels-1 {
			continue
		}
		if bytes.HasPrefix(ti.Right, prefix) {
			splits = append(splits, string(ti.Right))
		}
	}

	// If the number of splits is low, look at the offsets inside the
	// tables to generate more splits.
	if len(splits) < 32 {
		numTables := len(tables)
		if numTables == 0 {
			numTables = 1
		}
		numPerTable := 32 / numTables
		if numPerTable == 0 {
			numPerTable = 1
		}
		// Note: this replaces (not extends) the table-boundary splits above.
		splits = db.lc.keySplits(numPerTable, prefix)
	}

	// If the number of splits is still < 32, then look at the memtables.
	if len(splits) < 32 {
		maxPerSplit := 10000
		// mtSplits samples one key every maxPerSplit entries from a memtable.
		mtSplits := func(mt *memTable) {
			if mt == nil {
				return
			}
			count := 0
			iter := mt.sl.NewIterator()
			for iter.SeekToFirst(); iter.Valid(); iter.Next() {
				if count%maxPerSplit == 0 {
					// Add a split every maxPerSplit keys.
					if bytes.HasPrefix(iter.Key(), prefix) {
						splits = append(splits, string(iter.Key()))
					}
				}
				count += 1
			}
			_ = iter.Close()
		}

		// Hold the lock so the memtable list cannot change while we sample.
		db.lock.Lock()
		defer db.lock.Unlock()
		var memTables []*memTable
		memTables = append(memTables, db.imm...)
		for _, mt := range memTables {
			mtSplits(mt)
		}
		mtSplits(db.mt)
	}

	// We have our splits now. Let's convert them to ranges.
	sort.Strings(splits)
	var ranges []*keyRange
	var start []byte
	for _, key := range splits {
		ranges = append(ranges, &keyRange{left: start, right: y.SafeCopy(nil, []byte(key))})
		start = y.SafeCopy(nil, []byte(key))
	}
	// Final range is open-ended on the right.
	ranges = append(ranges, &keyRange{left: start})

	// Figure out the approximate table size this range has to deal with.
	for _, t := range tables {
		tr := keyRange{left: t.Left, right: t.Right}
		for _, r := range ranges {
			// Skip the open-ended first/last ranges; their size stays zero.
			if len(r.left) == 0 || len(r.right) == 0 {
				continue
			}
			if r.overlapsWith(tr) {
				r.size += int64(t.UncompressedSize)
			}
		}
	}
	var total int64
	for _, r := range ranges {
		total += r.size
	}
	if total == 0 {
		return ranges
	}
	// Figure out the average size, so we know how to bin the ranges together.
	avg := total / int64(numRanges)
	var out []*keyRange
	var i int
	// Greedily merge adjacent ranges until each bin approaches the average.
	for i < len(ranges) {
		r := ranges[i]
		cur := &keyRange{left: r.left, size: r.size, right: r.right}
		i++
		for ; i < len(ranges); i++ {
			next := ranges[i]
			if cur.size+next.size > avg {
				break
			}
			cur.right = next.right
			cur.size += next.size
		}
		out = append(out, cur)
	}
	return out
}
// MaxBatchCount returns the maximum number of entries allowed in a single
// write batch (see sendToWriteCh).
func (db *DB) MaxBatchCount() int64 {
	return db.opt.maxBatchCount
}
// MaxBatchSize returns the maximum total size, in bytes, allowed in a single
// write batch (see sendToWriteCh).
func (db *DB) MaxBatchSize() int64 {
	return db.opt.maxBatchSize
}
// stopMemoryFlush closes flushChan and waits for the memtable flusher
// goroutine to exit. Undone by startMemoryFlush.
func (db *DB) stopMemoryFlush() {
	// Stop memtable flushes.
	if db.closers.memtable != nil {
		close(db.flushChan)
		db.closers.memtable.SignalAndWait()
	}
}
// stopCompactions signals the compactor goroutines to stop and waits for them
// to finish. Undone by startCompactions.
func (db *DB) stopCompactions() {
	// Stop compactions.
	if db.closers.compactors != nil {
		db.closers.compactors.SignalAndWait()
	}
}
// startCompactions restarts the compactor goroutines previously stopped by
// stopCompactions. It only does so when compactors were running originally
// (non-nil closer).
func (db *DB) startCompactions() {
	// Resume compactions.
	if db.closers.compactors != nil {
		db.closers.compactors = z.NewCloser(1)
		db.lc.startCompact(db.closers.compactors)
	}
}
// startMemoryFlush recreates flushChan and restarts the memtable flusher
// goroutine. Inverse of stopMemoryFlush.
func (db *DB) startMemoryFlush() {
	// Start memory flusher.
	if db.closers.memtable != nil {
		db.flushChan = make(chan *memTable, db.opt.NumMemtables)
		db.closers.memtable = z.NewCloser(1)
		go func() {
			db.flushMemtable(db.closers.memtable)
		}()
	}
}
// Flatten can be used to force compactions on the LSM tree so all the tables fall on the same
// level. This ensures that all the versions of keys are colocated and not split across multiple
// levels, which is necessary after a restore from backup. During Flatten, live compactions are
// stopped. Ideally, no writes are going on during Flatten. Otherwise, it would create competition
// between flattening the tree and new tables being created at level zero.
func (db *DB) Flatten(workers int) error {
	db.stopCompactions()
	defer db.startCompactions()

	// compactAway runs `workers` concurrent compactions for the given priority
	// and treats the attempt as successful if at least one of them succeeds.
	compactAway := func(cp compactionPriority) error {
		db.opt.Infof("Attempting to compact with %+v\n", cp)
		errCh := make(chan error, 1)
		for i := 0; i < workers; i++ {
			go func() {
				// NOTE(review): 175 appears to be a sentinel compactor id for
				// these manual compactions — confirm against doCompact.
				errCh <- db.lc.doCompact(175, cp)
			}()
		}
		var success int
		var rerr error
		for i := 0; i < workers; i++ {
			err := <-errCh
			if err != nil {
				rerr = err
				db.opt.Warningf("While running doCompact with %+v. Error: %v\n", cp, err)
			} else {
				success++
			}
		}
		if success == 0 {
			return rerr
		}
		// We could do at least one successful compaction. So, we'll consider this a success.
		db.opt.Infof("%d compactor(s) succeeded. One or more tables from level %d compacted.\n",
			success, cp.level)
		return nil
	}
	hbytes := func(sz int64) string {
		return humanize.IBytes(uint64(sz))
	}
	t := db.lc.levelTargets()
	// Loop until all data sits on a single level.
	for {
		db.opt.Infof("\n")
		var levels []int
		for i, l := range db.lc.levels {
			sz := l.getTotalSize()
			db.opt.Infof("Level: %d. %8s Size. %8s Max.\n",
				i, hbytes(l.getTotalSize()), hbytes(t.targetSz[i]))
			if sz > 0 {
				levels = append(levels, i)
			}
		}
		if len(levels) <= 1 {
			// Data is on one level already; finish off any remaining
			// compaction priorities before declaring done.
			prios := db.lc.pickCompactLevels(nil)
			if len(prios) == 0 || prios[0].score <= 1.0 {
				db.opt.Infof("All tables consolidated into one level. Flattening done.\n")
				return nil
			}
			if err := compactAway(prios[0]); err != nil {
				return err
			}
			continue
		}
		// Create an artificial compaction priority, to ensure that we compact the level.
		cp := compactionPriority{level: levels[0], score: 1.71}
		if err := compactAway(cp); err != nil {
			return err
		}
	}
}
// blockWrite makes sendToWriteCh reject new writes, then waits for all
// in-flight writes to complete. Returns ErrBlockedWrites if writes were
// already blocked. Undone by unblockWrite.
func (db *DB) blockWrite() error {
	// Stop accepting new writes.
	if !db.blockWrites.CompareAndSwap(0, 1) {
		return ErrBlockedWrites
	}

	// Make all pending writes finish. The following will also close writeCh.
	db.closers.writes.SignalAndWait()
	db.opt.Infof("Writes flushed. Stopping compactions now...")
	return nil
}
// unblockWrite restarts the writer goroutine and re-enables sendToWriteCh.
// Inverse of blockWrite.
func (db *DB) unblockWrite() {
	db.closers.writes = z.NewCloser(1)
	go db.doWrites(db.closers.writes)

	// Resume writes.
	db.blockWrites.Store(0)
}
// prepareToDrop blocks incoming writes, drains and writes out everything
// pending in writeCh, and stops memtable flushes. It returns a function that
// undoes all of this (restarting flushes and writes); the caller must invoke
// it when done dropping data.
func (db *DB) prepareToDrop() (func(), error) {
	if db.opt.ReadOnly {
		panic("Attempting to drop data in read-only mode.")
	}
	// In order prepare for drop, we need to block the incoming writes and
	// write it to db. Then, flush all the pending memtable. So that, we
	// don't miss any entries.
	if err := db.blockWrite(); err != nil {
		return func() {}, err
	}
	reqs := make([]*request, 0, 10)
	for {
		select {
		case r := <-db.writeCh:
			reqs = append(reqs, r)
		default:
			// writeCh is drained; flush what we collected, then stop the
			// memtable flusher.
			if err := db.writeRequests(reqs); err != nil {
				db.opt.Errorf("writeRequests: %v", err)
			}
			db.stopMemoryFlush()
			return func() {
				db.opt.Infof("Resuming writes")
				db.startMemoryFlush()
				db.unblockWrite()
			}, nil
		}
	}
}
// DropAll would drop all the data stored in Badger. It does this in the following way.
// - Stop accepting new writes.
// - Pause memtable flushes and compactions.
// - Pick all tables from all levels, create a changeset to delete all these
// tables and apply it to manifest.
// - Pick all log files from value log, and delete all of them. Restart value log files from zero.
// - Resume memtable flushes and compactions.
//
// NOTE: DropAll is resilient to concurrent writes, but not to reads. It is up to the user to not do
// any reads while DropAll is going on, otherwise they may result in panics. Ideally, both reads and
// writes are paused before running DropAll, and resumed after it is finished.
func (db *DB) DropAll() error {
	f, err := db.dropAll()
	// Always run the resume function (when present), even on error, so that
	// writes, flushes and compactions restart.
	if f != nil {
		f()
	}
	return err
}
// dropAll performs the actual drop: it blocks writes and flushes (via
// prepareToDrop), stops compactions, discards all memtables, SSTables and
// value-log files, and clears the caches. It returns a resume function which
// must be called (even on error) to restart compactions, flushes and writes.
func (db *DB) dropAll() (func(), error) {
	db.opt.Infof("DropAll called. Blocking writes...")
	f, err := db.prepareToDrop()
	if err != nil {
		return f, err
	}
	// prepareToDrop will stop all the incoming write and flushes any pending memtables.
	// Before we drop, we'll stop the compaction because anyways all the datas are going to
	// be deleted.
	db.stopCompactions()
	resume := func() {
		db.startCompactions()
		f()
	}
	// Block all foreign interactions with memory tables.
	db.lock.Lock()
	defer db.lock.Unlock()

	// Remove inmemory tables. Calling DecrRef for safety. Not sure if they're absolutely needed.
	db.mt.DecrRef()
	for _, mt := range db.imm {
		mt.DecrRef()
	}
	db.imm = db.imm[:0]
	db.mt, err = db.newMemTable() // Set it up for future writes.
	if err != nil {
		return resume, y.Wrapf(err, "cannot open new memtable")
	}

	num, err := db.lc.dropTree()
	if err != nil {
		return resume, err
	}
	db.opt.Infof("Deleted %d SSTables. Now deleting value logs...\n", num)

	num, err = db.vlog.dropAll()
	if err != nil {
		return resume, err
	}
	// Restart file numbering from scratch now that all old files are gone.
	db.lc.nextFileID.Store(1)
	db.opt.Infof("Deleted %d value log files. DropAll done.\n", num)
	db.blockCache.Clear()
	db.indexCache.Clear()
	db.threshold.Clear(db.opt)

	return resume, nil
}
// DropPrefix would drop all the keys with the provided prefix. It does this in the following way:
// - Stop accepting new writes.
// - Stop memtable flushes before acquiring lock. Because we're acquiring lock here
// and memtable flush stalls for lock, which leads to deadlock.
// - Flush out all memtables, skipping over keys with the given prefix, Kp.
// - Write out the value log header to memtables when flushing, so we don't accidentally bring Kp
// back after a restart.
// - Stop compaction.
// - Compact L0->L1, skipping over Kp.
// - Compact rest of the levels, Li->Li, picking tables which have Kp.
// - Resume memtable flushes, compactions and writes.
func (db *DB) DropPrefix(prefixes ...[]byte) error {
	if len(prefixes) == 0 {
		return nil
	}
	db.opt.Infof("DropPrefix called for %s", prefixes)
	f, err := db.prepareToDrop()
	if err != nil {
		return err
	}
	// f resumes writes/flushes; it must run regardless of how we exit.
	defer f()
	// Only bother dropping prefixes that actually have data.
	var filtered [][]byte
	if filtered, err = db.filterPrefixesToDrop(prefixes); err != nil {
		return err
	}
	// If there is no prefix for which the data already exist, do not do anything.
	if len(filtered) == 0 {
		db.opt.Infof("No prefixes to drop")
		return nil
	}
	// Block all foreign interactions with memory tables.
	db.lock.Lock()
	defer db.lock.Unlock()
	// Treat the active memtable like an immutable one so everything gets
	// flushed (with the prefixes skipped) below.
	db.imm = append(db.imm, db.mt)
	for _, memtable := range db.imm {
		if memtable.sl.Empty() {
			memtable.DecrRef()
			continue
		}
		db.opt.Debugf("Flushing memtable")
		// handleMemTableFlush writes the memtable out while dropping the
		// filtered prefixes.
		if err := db.handleMemTableFlush(memtable, filtered); err != nil {
			db.opt.Errorf("While trying to flush memtable: %v", err)
			return err
		}
		memtable.DecrRef()
	}
	db.stopCompactions()
	defer db.startCompactions()
	db.imm = db.imm[:0]
	db.mt, err = db.newMemTable()
	if err != nil {
		return y.Wrapf(err, "cannot create new mem table")
	}
	// Drop prefixes from the levels.
	if err := db.lc.dropPrefixes(filtered); err != nil {
		return err
	}
	db.opt.Infof("DropPrefix done")
	return nil
}
  1610. func (db *DB) filterPrefixesToDrop(prefixes [][]byte) ([][]byte, error) {
  1611. var filtered [][]byte
  1612. for _, prefix := range prefixes {
  1613. err := db.View(func(txn *Txn) error {
  1614. iopts := DefaultIteratorOptions
  1615. iopts.Prefix = prefix
  1616. iopts.PrefetchValues = false
  1617. itr := txn.NewIterator(iopts)
  1618. defer itr.Close()
  1619. itr.Rewind()
  1620. if itr.ValidForPrefix(prefix) {
  1621. filtered = append(filtered, prefix)
  1622. }
  1623. return nil
  1624. })
  1625. if err != nil {
  1626. return filtered, err
  1627. }
  1628. }
  1629. return filtered, nil
  1630. }
  1631. // Checks if the key is banned. Returns the respective error if the key belongs to any of the banned
  1632. // namepspaces. Else it returns nil.
  1633. func (db *DB) isBanned(key []byte) error {
  1634. if db.opt.NamespaceOffset < 0 {
  1635. return nil
  1636. }
  1637. if len(key) <= db.opt.NamespaceOffset+8 {
  1638. return nil
  1639. }
  1640. if db.bannedNamespaces.has(y.BytesToU64(key[db.opt.NamespaceOffset:])) {
  1641. return ErrBannedKey
  1642. }
  1643. return nil
  1644. }
  1645. // BanNamespace bans a namespace. Read/write to keys belonging to any of such namespace is denied.
  1646. func (db *DB) BanNamespace(ns uint64) error {
  1647. if db.opt.NamespaceOffset < 0 {
  1648. return ErrNamespaceMode
  1649. }
  1650. db.opt.Infof("Banning namespace: %d", ns)
  1651. // First set the banned namespaces in DB and then update the in-memory structure.
  1652. key := y.KeyWithTs(append(bannedNsKey, y.U64ToBytes(ns)...), 1)
  1653. entry := []*Entry{{
  1654. Key: key,
  1655. Value: nil,
  1656. }}
  1657. req, err := db.sendToWriteCh(entry)
  1658. if err != nil {
  1659. return err
  1660. }
  1661. if err := req.Wait(); err != nil {
  1662. return err
  1663. }
  1664. db.bannedNamespaces.add(ns)
  1665. return nil
  1666. }
  1667. // BannedNamespaces returns the list of prefixes banned for DB.
  1668. func (db *DB) BannedNamespaces() []uint64 {
  1669. return db.bannedNamespaces.all()
  1670. }
// KVList contains a list of key-value pairs. It is an alias for the protobuf
// pb.KVList type, re-exported here for subscriber callbacks.
type KVList = pb.KVList
// Subscribe can be used to watch key changes for the given key prefixes and the ignore string.
// At least one prefix should be passed, or an error will be returned.
// You can use an empty prefix to monitor all changes to the DB.
// Ignore string is the byte ranges for which prefix matching will be ignored.
// For example: ignore = "2-3", and prefix = "abc" will match for keys "abxxc", "abdfc" etc.
// This function blocks until the given context is done or an error occurs.
// The given function will be called with a new KVList containing the modified keys and the
// corresponding values.
func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches []pb.Match) error {
	if cb == nil {
		return ErrNilCallback
	}
	c := z.NewCloser(1)
	s, err := db.pub.newSubscriber(c, matches)
	if err != nil {
		return y.Wrapf(err, "while creating a new subscriber")
	}
	// slurp collects everything currently queued on s.sendCh into batch and,
	// if anything accumulated, delivers it to the callback.
	slurp := func(batch *pb.KVList) error {
		for {
			select {
			case kvs := <-s.sendCh:
				batch.Kv = append(batch.Kv, kvs.Kv...)
			default:
				if len(batch.GetKv()) > 0 {
					return cb(batch)
				}
				return nil
			}
		}
	}
	// drain discards anything queued on s.sendCh without invoking the
	// callback; used on the error/cancellation paths so the publisher does
	// not block on a dead subscriber.
	drain := func() {
		for {
			select {
			case _, ok := <-s.sendCh:
				if !ok {
					// Channel is closed.
					return
				}
			default:
				return
			}
		}
	}
	for {
		select {
		case <-c.HasBeenClosed():
			// No need to delete here. Closer will be called only while
			// closing DB. Subscriber will be deleted by cleanSubscribers.
			// slurp delivers any pending updates before we exit.
			err := slurp(new(pb.KVList))
			c.Done()
			return err
		case <-ctx.Done():
			c.Done()
			s.active.Store(0)
			drain()
			// Delete the subscriber to avoid further updates.
			db.pub.deleteSubscriber(s.id)
			return ctx.Err()
		case batch := <-s.sendCh:
			err := slurp(batch)
			if err != nil {
				c.Done()
				s.active.Store(0)
				drain()
				// Delete the subscriber if there is an error by the callback.
				db.pub.deleteSubscriber(s.id)
				return err
			}
		}
	}
}
  1745. func (db *DB) syncDir(dir string) error {
  1746. if db.opt.InMemory {
  1747. return nil
  1748. }
  1749. return syncDir(dir)
  1750. }
  1751. func createDirs(opt Options) error {
  1752. for _, path := range []string{opt.Dir, opt.ValueDir} {
  1753. dirExists, err := exists(path)
  1754. if err != nil {
  1755. return y.Wrapf(err, "Invalid Dir: %q", path)
  1756. }
  1757. if !dirExists {
  1758. if opt.ReadOnly {
  1759. return fmt.Errorf("Cannot find directory %q for read-only open", path)
  1760. }
  1761. // Try to create the directory
  1762. err = os.MkdirAll(path, 0700)
  1763. if err != nil {
  1764. return y.Wrapf(err, "Error Creating Dir: %q", path)
  1765. }
  1766. }
  1767. }
  1768. return nil
  1769. }
  1770. // Stream the contents of this DB to a new DB with options outOptions that will be
  1771. // created in outDir.
  1772. func (db *DB) StreamDB(outOptions Options) error {
  1773. outDir := outOptions.Dir
  1774. // Open output DB.
  1775. outDB, err := OpenManaged(outOptions)
  1776. if err != nil {
  1777. return y.Wrapf(err, "cannot open out DB at %s", outDir)
  1778. }
  1779. defer outDB.Close()
  1780. writer := outDB.NewStreamWriter()
  1781. if err := writer.Prepare(); err != nil {
  1782. return y.Wrapf(err, "cannot create stream writer in out DB at %s", outDir)
  1783. }
  1784. // Stream contents of DB to the output DB.
  1785. stream := db.NewStreamAt(math.MaxUint64)
  1786. stream.LogPrefix = fmt.Sprintf("Streaming DB to new DB at %s", outDir)
  1787. stream.Send = func(buf *z.Buffer) error {
  1788. return writer.Write(buf)
  1789. }
  1790. if err := stream.Orchestrate(context.Background()); err != nil {
  1791. return y.Wrapf(err, "cannot stream DB to out DB at %s", outDir)
  1792. }
  1793. if err := writer.Flush(); err != nil {
  1794. return y.Wrapf(err, "cannot flush writer")
  1795. }
  1796. return nil
  1797. }
  1798. // Opts returns a copy of the DB options.
  1799. func (db *DB) Opts() Options {
  1800. return db.opt
  1801. }
// CacheType selects one of the DB's internal caches (see CacheMaxCost).
type CacheType int

const (
	// BlockCache selects the DB's block cache.
	BlockCache CacheType = iota
	// IndexCache selects the DB's index cache.
	IndexCache
)
  1807. // CacheMaxCost updates the max cost of the given cache (either block or index cache).
  1808. // The call will have an effect only if the DB was created with the cache. Otherwise it is
  1809. // a no-op. If you pass a negative value, the function will return the current value
  1810. // without updating it.
  1811. func (db *DB) CacheMaxCost(cache CacheType, maxCost int64) (int64, error) {
  1812. if db == nil {
  1813. return 0, nil
  1814. }
  1815. if maxCost < 0 {
  1816. switch cache {
  1817. case BlockCache:
  1818. return db.blockCache.MaxCost(), nil
  1819. case IndexCache:
  1820. return db.indexCache.MaxCost(), nil
  1821. default:
  1822. return 0, errors.New("invalid cache type")
  1823. }
  1824. }
  1825. switch cache {
  1826. case BlockCache:
  1827. db.blockCache.UpdateMaxCost(maxCost)
  1828. return maxCost, nil
  1829. case IndexCache:
  1830. db.indexCache.UpdateMaxCost(maxCost)
  1831. return maxCost, nil
  1832. default:
  1833. return 0, errors.New("invalid cache type")
  1834. }
  1835. }
  1836. func (db *DB) LevelsToString() string {
  1837. levels := db.Levels()
  1838. h := func(sz int64) string {
  1839. return humanize.IBytes(uint64(sz))
  1840. }
  1841. base := func(b bool) string {
  1842. if b {
  1843. return "B"
  1844. }
  1845. return " "
  1846. }
  1847. var b strings.Builder
  1848. b.WriteRune('\n')
  1849. for _, li := range levels {
  1850. b.WriteString(fmt.Sprintf(
  1851. "Level %d [%s]: NumTables: %02d. Size: %s of %s. Score: %.2f->%.2f"+
  1852. " StaleData: %s Target FileSize: %s\n",
  1853. li.Level, base(li.IsBaseLevel), li.NumTables,
  1854. h(li.Size), h(li.TargetSize), li.Score, li.Adjusted, h(li.StaleDatSize),
  1855. h(li.TargetFileSize)))
  1856. }
  1857. b.WriteString("Level Done\n")
  1858. return b.String()
  1859. }