db.go 56 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089
  1. /*
  2. * Copyright 2017 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package badger
  17. import (
  18. "bytes"
  19. "context"
  20. "encoding/binary"
  21. stderrors "errors"
  22. "expvar"
  23. "fmt"
  24. "math"
  25. "os"
  26. "path/filepath"
  27. "sort"
  28. "strings"
  29. "sync"
  30. "sync/atomic"
  31. "time"
  32. humanize "github.com/dustin/go-humanize"
  33. "github.com/pkg/errors"
  34. "github.com/dgraph-io/badger/v4/fb"
  35. "github.com/dgraph-io/badger/v4/options"
  36. "github.com/dgraph-io/badger/v4/pb"
  37. "github.com/dgraph-io/badger/v4/skl"
  38. "github.com/dgraph-io/badger/v4/table"
  39. "github.com/dgraph-io/badger/v4/y"
  40. "github.com/dgraph-io/ristretto/v2"
  41. "github.com/dgraph-io/ristretto/v2/z"
  42. )
  43. var (
  44. badgerPrefix = []byte("!badger!") // Prefix for internal keys used by badger.
  45. txnKey = []byte("!badger!txn") // For indicating end of entries in txn.
  46. bannedNsKey = []byte("!badger!banned") // For storing the banned namespaces.
  47. )
  48. type closers struct {
  49. updateSize *z.Closer
  50. compactors *z.Closer
  51. memtable *z.Closer
  52. writes *z.Closer
  53. valueGC *z.Closer
  54. pub *z.Closer
  55. cacheHealth *z.Closer
  56. }
  57. type lockedKeys struct {
  58. sync.RWMutex
  59. keys map[uint64]struct{}
  60. }
  61. func (lk *lockedKeys) add(key uint64) {
  62. lk.Lock()
  63. defer lk.Unlock()
  64. lk.keys[key] = struct{}{}
  65. }
  66. func (lk *lockedKeys) has(key uint64) bool {
  67. lk.RLock()
  68. defer lk.RUnlock()
  69. _, ok := lk.keys[key]
  70. return ok
  71. }
  72. func (lk *lockedKeys) all() []uint64 {
  73. lk.RLock()
  74. defer lk.RUnlock()
  75. keys := make([]uint64, 0, len(lk.keys))
  76. for key := range lk.keys {
  77. keys = append(keys, key)
  78. }
  79. return keys
  80. }
// DB provides the various functions required to interact with Badger.
// DB is thread-safe.
type DB struct {
	testOnlyDBExtensions

	lock sync.RWMutex // Guards list of inmemory tables, not individual reads and writes.

	// Lock on opt.Dir; nil in InMemory mode or when opt.BypassLockGuard is set.
	dirLockGuard *directoryLockGuard
	// nil if Dir and ValueDir are the same
	valueDirGuard *directoryLockGuard

	// Handles used to signal the background work started by Open.
	closers closers

	mt  *memTable   // Our latest (actively written) in-memory table
	imm []*memTable // Add here only AFTER pushing to flushChan.

	// Initialized via openMemTables.
	nextMemFid int

	// Options as validated by checkAndSetOptions. In InMemory mode Open also
	// forces SyncWrites off and ValueThreshold to math.MaxInt32.
	opt      Options
	manifest *manifestFile
	lc       *levelsController
	vlog     valueLog
	// Buffered with kvWriteChCapacity; closed by close().
	writeCh   chan *request
	flushChan chan *memTable // For flushing memtables.
	closeOnce sync.Once      // For closing DB only once.
	// Set to 1 at the start of close().
	blockWrites atomic.Int32
	// Set to 1 at the start of close(); read by IsClosed.
	isClosed atomic.Uint32

	orc *oracle
	// Populated from disk by initBannedNamespaces.
	bannedNamespaces *lockedKeys
	threshold        *vlogThreshold
	pub              *publisher
	registry         *KeyRegistry
	// The caches stay nil unless opt.BlockCacheSize / opt.IndexCacheSize is positive.
	blockCache *ristretto.Cache[[]byte, *table.Block]
	indexCache *ristretto.Cache[uint64, *fb.TableIndex]
	// Created with z.NewAllocatorPool(8); released when the DB is closed.
	allocPool *z.AllocatorPool
}
  112. const (
  113. kvWriteChCapacity = 1000
  114. )
  115. func checkAndSetOptions(opt *Options) error {
  116. // It's okay to have zero compactors which will disable all compactions but
  117. // we cannot have just one compactor otherwise we will end up with all data
  118. // on level 2.
  119. if opt.NumCompactors == 1 {
  120. return errors.New("Cannot have 1 compactor. Need at least 2")
  121. }
  122. if opt.InMemory && (opt.Dir != "" || opt.ValueDir != "") {
  123. return errors.New("Cannot use badger in Disk-less mode with Dir or ValueDir set")
  124. }
  125. opt.maxBatchSize = (15 * opt.MemTableSize) / 100
  126. opt.maxBatchCount = opt.maxBatchSize / int64(skl.MaxNodeSize)
  127. // This is the maximum value, vlogThreshold can have if dynamic thresholding is enabled.
  128. opt.maxValueThreshold = math.Min(maxValueThreshold, float64(opt.maxBatchSize))
  129. if opt.VLogPercentile < 0.0 || opt.VLogPercentile > 1.0 {
  130. return errors.New("vlogPercentile must be within range of 0.0-1.0")
  131. }
  132. // We are limiting opt.ValueThreshold to maxValueThreshold for now.
  133. if opt.ValueThreshold > maxValueThreshold {
  134. return errors.Errorf("Invalid ValueThreshold, must be less or equal to %d",
  135. maxValueThreshold)
  136. }
  137. // If ValueThreshold is greater than opt.maxBatchSize, we won't be able to push any data using
  138. // the transaction APIs. Transaction batches entries into batches of size opt.maxBatchSize.
  139. if opt.ValueThreshold > opt.maxBatchSize {
  140. return errors.Errorf("Valuethreshold %d greater than max batch size of %d. Either "+
  141. "reduce opt.ValueThreshold or increase opt.BaseTableSize.",
  142. opt.ValueThreshold, opt.maxBatchSize)
  143. }
  144. // ValueLogFileSize should be strictly LESS than 2<<30 otherwise we will
  145. // overflow the uint32 when we mmap it in OpenMemtable.
  146. if !(opt.ValueLogFileSize < 2<<30 && opt.ValueLogFileSize >= 1<<20) {
  147. return ErrValueLogSize
  148. }
  149. if opt.ReadOnly {
  150. // Do not perform compaction in read only mode.
  151. opt.CompactL0OnClose = false
  152. }
  153. needCache := (opt.Compression != options.None) || (len(opt.EncryptionKey) > 0)
  154. if needCache && opt.BlockCacheSize == 0 {
  155. panic("BlockCacheSize should be set since compression/encryption are enabled")
  156. }
  157. return nil
  158. }
  159. // Open returns a new DB object.
  160. func Open(opt Options) (*DB, error) {
  161. if err := checkAndSetOptions(&opt); err != nil {
  162. return nil, err
  163. }
  164. var dirLockGuard, valueDirLockGuard *directoryLockGuard
  165. // Create directories and acquire lock on it only if badger is not running in InMemory mode.
  166. // We don't have any directories/files in InMemory mode so we don't need to acquire
  167. // any locks on them.
  168. if !opt.InMemory {
  169. if err := createDirs(opt); err != nil {
  170. return nil, err
  171. }
  172. var err error
  173. if !opt.BypassLockGuard {
  174. dirLockGuard, err = acquireDirectoryLock(opt.Dir, lockFile, opt.ReadOnly)
  175. if err != nil {
  176. return nil, err
  177. }
  178. defer func() {
  179. if dirLockGuard != nil {
  180. _ = dirLockGuard.release()
  181. }
  182. }()
  183. absDir, err := filepath.Abs(opt.Dir)
  184. if err != nil {
  185. return nil, err
  186. }
  187. absValueDir, err := filepath.Abs(opt.ValueDir)
  188. if err != nil {
  189. return nil, err
  190. }
  191. if absValueDir != absDir {
  192. valueDirLockGuard, err = acquireDirectoryLock(opt.ValueDir, lockFile, opt.ReadOnly)
  193. if err != nil {
  194. return nil, err
  195. }
  196. defer func() {
  197. if valueDirLockGuard != nil {
  198. _ = valueDirLockGuard.release()
  199. }
  200. }()
  201. }
  202. }
  203. }
  204. manifestFile, manifest, err := openOrCreateManifestFile(opt)
  205. if err != nil {
  206. return nil, err
  207. }
  208. defer func() {
  209. if manifestFile != nil {
  210. _ = manifestFile.close()
  211. }
  212. }()
  213. db := &DB{
  214. imm: make([]*memTable, 0, opt.NumMemtables),
  215. flushChan: make(chan *memTable, opt.NumMemtables),
  216. writeCh: make(chan *request, kvWriteChCapacity),
  217. opt: opt,
  218. manifest: manifestFile,
  219. dirLockGuard: dirLockGuard,
  220. valueDirGuard: valueDirLockGuard,
  221. orc: newOracle(opt),
  222. pub: newPublisher(),
  223. allocPool: z.NewAllocatorPool(8),
  224. bannedNamespaces: &lockedKeys{keys: make(map[uint64]struct{})},
  225. threshold: initVlogThreshold(&opt),
  226. }
  227. db.syncChan = opt.syncChan
  228. // Cleanup all the goroutines started by badger in case of an error.
  229. defer func() {
  230. if err != nil {
  231. opt.Errorf("Received err: %v. Cleaning up...", err)
  232. db.cleanup()
  233. db = nil
  234. }
  235. }()
  236. if opt.BlockCacheSize > 0 {
  237. numInCache := opt.BlockCacheSize / int64(opt.BlockSize)
  238. if numInCache == 0 {
  239. // Make the value of this variable at least one since the cache requires
  240. // the number of counters to be greater than zero.
  241. numInCache = 1
  242. }
  243. config := ristretto.Config[[]byte, *table.Block]{
  244. NumCounters: numInCache * 8,
  245. MaxCost: opt.BlockCacheSize,
  246. BufferItems: 64,
  247. Metrics: true,
  248. OnExit: table.BlockEvictHandler,
  249. }
  250. db.blockCache, err = ristretto.NewCache[[]byte, *table.Block](&config)
  251. if err != nil {
  252. return nil, y.Wrap(err, "failed to create data cache")
  253. }
  254. }
  255. if opt.IndexCacheSize > 0 {
  256. // Index size is around 5% of the table size.
  257. indexSz := int64(float64(opt.MemTableSize) * 0.05)
  258. numInCache := opt.IndexCacheSize / indexSz
  259. if numInCache == 0 {
  260. // Make the value of this variable at least one since the cache requires
  261. // the number of counters to be greater than zero.
  262. numInCache = 1
  263. }
  264. config := ristretto.Config[uint64, *fb.TableIndex]{
  265. NumCounters: numInCache * 8,
  266. MaxCost: opt.IndexCacheSize,
  267. BufferItems: 64,
  268. Metrics: true,
  269. }
  270. db.indexCache, err = ristretto.NewCache(&config)
  271. if err != nil {
  272. return nil, y.Wrap(err, "failed to create bf cache")
  273. }
  274. }
  275. db.closers.cacheHealth = z.NewCloser(1)
  276. go db.monitorCache(db.closers.cacheHealth)
  277. if db.opt.InMemory {
  278. db.opt.SyncWrites = false
  279. // If badger is running in memory mode, push everything into the LSM Tree.
  280. db.opt.ValueThreshold = math.MaxInt32
  281. }
  282. krOpt := KeyRegistryOptions{
  283. ReadOnly: opt.ReadOnly,
  284. Dir: opt.Dir,
  285. EncryptionKey: opt.EncryptionKey,
  286. EncryptionKeyRotationDuration: opt.EncryptionKeyRotationDuration,
  287. InMemory: opt.InMemory,
  288. }
  289. if db.registry, err = OpenKeyRegistry(krOpt); err != nil {
  290. return db, err
  291. }
  292. db.calculateSize()
  293. db.closers.updateSize = z.NewCloser(1)
  294. go db.updateSize(db.closers.updateSize)
  295. if err := db.openMemTables(db.opt); err != nil {
  296. return nil, y.Wrapf(err, "while opening memtables")
  297. }
  298. if !db.opt.ReadOnly {
  299. if db.mt, err = db.newMemTable(); err != nil {
  300. return nil, y.Wrapf(err, "cannot create memtable")
  301. }
  302. }
  303. // newLevelsController potentially loads files in directory.
  304. if db.lc, err = newLevelsController(db, &manifest); err != nil {
  305. return db, err
  306. }
  307. // Initialize vlog struct.
  308. db.vlog.init(db)
  309. if !opt.ReadOnly {
  310. db.closers.compactors = z.NewCloser(1)
  311. db.lc.startCompact(db.closers.compactors)
  312. db.closers.memtable = z.NewCloser(1)
  313. go func() {
  314. db.flushMemtable(db.closers.memtable) // Need levels controller to be up.
  315. }()
  316. // Flush them to disk asap.
  317. for _, mt := range db.imm {
  318. db.flushChan <- mt
  319. }
  320. }
  321. // We do increment nextTxnTs below. So, no need to do it here.
  322. db.orc.nextTxnTs = db.MaxVersion()
  323. db.opt.Infof("Set nextTxnTs to %d", db.orc.nextTxnTs)
  324. if err = db.vlog.open(db); err != nil {
  325. return db, y.Wrapf(err, "During db.vlog.open")
  326. }
  327. // Let's advance nextTxnTs to one more than whatever we observed via
  328. // replaying the logs.
  329. db.orc.txnMark.Done(db.orc.nextTxnTs)
  330. // In normal mode, we must update readMark so older versions of keys can be removed during
  331. // compaction when run in offline mode via the flatten tool.
  332. db.orc.readMark.Done(db.orc.nextTxnTs)
  333. db.orc.incrementNextTs()
  334. go db.threshold.listenForValueThresholdUpdate()
  335. if err := db.initBannedNamespaces(); err != nil {
  336. return db, errors.Wrapf(err, "While setting banned keys")
  337. }
  338. db.closers.writes = z.NewCloser(1)
  339. go db.doWrites(db.closers.writes)
  340. if !db.opt.InMemory {
  341. db.closers.valueGC = z.NewCloser(1)
  342. go db.vlog.waitOnGC(db.closers.valueGC)
  343. }
  344. db.closers.pub = z.NewCloser(1)
  345. go db.pub.listenForUpdates(db.closers.pub)
  346. valueDirLockGuard = nil
  347. dirLockGuard = nil
  348. manifestFile = nil
  349. return db, nil
  350. }
  351. // initBannedNamespaces retrieves the banned namespaces from the DB and updates in-memory structure.
  352. func (db *DB) initBannedNamespaces() error {
  353. if db.opt.NamespaceOffset < 0 {
  354. return nil
  355. }
  356. return db.View(func(txn *Txn) error {
  357. iopts := DefaultIteratorOptions
  358. iopts.Prefix = bannedNsKey
  359. iopts.PrefetchValues = false
  360. iopts.InternalAccess = true
  361. itr := txn.NewIterator(iopts)
  362. defer itr.Close()
  363. for itr.Rewind(); itr.Valid(); itr.Next() {
  364. key := y.BytesToU64(itr.Item().Key()[len(bannedNsKey):])
  365. db.bannedNamespaces.add(key)
  366. }
  367. return nil
  368. })
  369. }
  370. func (db *DB) MaxVersion() uint64 {
  371. var maxVersion uint64
  372. update := func(a uint64) {
  373. if a > maxVersion {
  374. maxVersion = a
  375. }
  376. }
  377. db.lock.Lock()
  378. // In read only mode, we do not create new mem table.
  379. if !db.opt.ReadOnly {
  380. update(db.mt.maxVersion)
  381. }
  382. for _, mt := range db.imm {
  383. update(mt.maxVersion)
  384. }
  385. db.lock.Unlock()
  386. for _, ti := range db.Tables() {
  387. update(ti.MaxVersion)
  388. }
  389. return maxVersion
  390. }
  391. func (db *DB) monitorCache(c *z.Closer) {
  392. defer c.Done()
  393. count := 0
  394. analyze := func(name string, metrics *ristretto.Metrics) {
  395. // If the mean life expectancy is less than 10 seconds, the cache
  396. // might be too small.
  397. le := metrics.LifeExpectancySeconds()
  398. if le == nil {
  399. return
  400. }
  401. lifeTooShort := le.Count > 0 && float64(le.Sum)/float64(le.Count) < 10
  402. hitRatioTooLow := metrics.Ratio() > 0 && metrics.Ratio() < 0.4
  403. if lifeTooShort && hitRatioTooLow {
  404. db.opt.Warningf("%s might be too small. Metrics: %s\n", name, metrics)
  405. db.opt.Warningf("Cache life expectancy (in seconds): %+v\n", le)
  406. } else if le.Count > 1000 && count%5 == 0 {
  407. db.opt.Infof("%s metrics: %s\n", name, metrics)
  408. }
  409. }
  410. ticker := time.NewTicker(1 * time.Minute)
  411. defer ticker.Stop()
  412. for {
  413. select {
  414. case <-c.HasBeenClosed():
  415. return
  416. case <-ticker.C:
  417. }
  418. analyze("Block cache", db.BlockCacheMetrics())
  419. analyze("Index cache", db.IndexCacheMetrics())
  420. count++
  421. }
  422. }
  423. // cleanup stops all the goroutines started by badger. This is used in open to
  424. // cleanup goroutines in case of an error.
  425. func (db *DB) cleanup() {
  426. db.stopMemoryFlush()
  427. db.stopCompactions()
  428. db.blockCache.Close()
  429. db.indexCache.Close()
  430. if db.closers.updateSize != nil {
  431. db.closers.updateSize.Signal()
  432. }
  433. if db.closers.valueGC != nil {
  434. db.closers.valueGC.Signal()
  435. }
  436. if db.closers.writes != nil {
  437. db.closers.writes.Signal()
  438. }
  439. if db.closers.pub != nil {
  440. db.closers.pub.Signal()
  441. }
  442. db.orc.Stop()
  443. // Do not use vlog.Close() here. vlog.Close truncates the files. We don't
  444. // want to truncate files unless the user has specified the truncate flag.
  445. }
  446. // BlockCacheMetrics returns the metrics for the underlying block cache.
  447. func (db *DB) BlockCacheMetrics() *ristretto.Metrics {
  448. if db.blockCache != nil {
  449. return db.blockCache.Metrics
  450. }
  451. return nil
  452. }
  453. // IndexCacheMetrics returns the metrics for the underlying index cache.
  454. func (db *DB) IndexCacheMetrics() *ristretto.Metrics {
  455. if db.indexCache != nil {
  456. return db.indexCache.Metrics
  457. }
  458. return nil
  459. }
  460. // Close closes a DB. It's crucial to call it to ensure all the pending updates make their way to
  461. // disk. Calling DB.Close() multiple times would still only close the DB once.
  462. func (db *DB) Close() error {
  463. var err error
  464. db.closeOnce.Do(func() {
  465. err = db.close()
  466. })
  467. return err
  468. }
  469. // IsClosed denotes if the badger DB is closed or not. A DB instance should not
  470. // be used after closing it.
  471. func (db *DB) IsClosed() bool {
  472. return db.isClosed.Load() == 1
  473. }
  474. func (db *DB) close() (err error) {
  475. defer db.allocPool.Release()
  476. db.opt.Debugf("Closing database")
  477. db.opt.Infof("Lifetime L0 stalled for: %s\n", time.Duration(db.lc.l0stallsMs.Load()))
  478. db.blockWrites.Store(1)
  479. db.isClosed.Store(1)
  480. if !db.opt.InMemory {
  481. // Stop value GC first.
  482. db.closers.valueGC.SignalAndWait()
  483. }
  484. // Stop writes next.
  485. db.closers.writes.SignalAndWait()
  486. // Don't accept any more write.
  487. close(db.writeCh)
  488. db.closers.pub.SignalAndWait()
  489. db.closers.cacheHealth.Signal()
  490. // Make sure that block writer is done pushing stuff into memtable!
  491. // Otherwise, you will have a race condition: we are trying to flush memtables
  492. // and remove them completely, while the block / memtable writer is still
  493. // trying to push stuff into the memtable. This will also resolve the value
  494. // offset problem: as we push into memtable, we update value offsets there.
  495. if db.mt != nil {
  496. if db.mt.sl.Empty() {
  497. // Remove the memtable if empty.
  498. db.mt.DecrRef()
  499. } else {
  500. db.opt.Debugf("Flushing memtable")
  501. for {
  502. pushedMemTable := func() bool {
  503. db.lock.Lock()
  504. defer db.lock.Unlock()
  505. y.AssertTrue(db.mt != nil)
  506. select {
  507. case db.flushChan <- db.mt:
  508. db.imm = append(db.imm, db.mt) // Flusher will attempt to remove this from s.imm.
  509. db.mt = nil // Will segfault if we try writing!
  510. db.opt.Debugf("pushed to flush chan\n")
  511. return true
  512. default:
  513. // If we fail to push, we need to unlock and wait for a short while.
  514. // The flushing operation needs to update s.imm. Otherwise, we have a
  515. // deadlock.
  516. // TODO: Think about how to do this more cleanly, maybe without any locks.
  517. }
  518. return false
  519. }()
  520. if pushedMemTable {
  521. break
  522. }
  523. time.Sleep(10 * time.Millisecond)
  524. }
  525. }
  526. }
  527. db.stopMemoryFlush()
  528. db.stopCompactions()
  529. // Force Compact L0
  530. // We don't need to care about cstatus since no parallel compaction is running.
  531. if db.opt.CompactL0OnClose {
  532. err := db.lc.doCompact(173, compactionPriority{level: 0, score: 1.73})
  533. switch err {
  534. case errFillTables:
  535. // This error only means that there might be enough tables to do a compaction. So, we
  536. // should not report it to the end user to avoid confusing them.
  537. case nil:
  538. db.opt.Debugf("Force compaction on level 0 done")
  539. default:
  540. db.opt.Warningf("While forcing compaction on level 0: %v", err)
  541. }
  542. }
  543. // Now close the value log.
  544. if vlogErr := db.vlog.Close(); vlogErr != nil {
  545. err = y.Wrap(vlogErr, "DB.Close")
  546. }
  547. db.opt.Infof(db.LevelsToString())
  548. if lcErr := db.lc.close(); err == nil {
  549. err = y.Wrap(lcErr, "DB.Close")
  550. }
  551. db.opt.Debugf("Waiting for closer")
  552. db.closers.updateSize.SignalAndWait()
  553. db.orc.Stop()
  554. db.blockCache.Close()
  555. db.indexCache.Close()
  556. db.threshold.close()
  557. if db.opt.InMemory {
  558. return
  559. }
  560. if db.dirLockGuard != nil {
  561. if guardErr := db.dirLockGuard.release(); err == nil {
  562. err = y.Wrap(guardErr, "DB.Close")
  563. }
  564. }
  565. if db.valueDirGuard != nil {
  566. if guardErr := db.valueDirGuard.release(); err == nil {
  567. err = y.Wrap(guardErr, "DB.Close")
  568. }
  569. }
  570. if manifestErr := db.manifest.close(); err == nil {
  571. err = y.Wrap(manifestErr, "DB.Close")
  572. }
  573. if registryErr := db.registry.Close(); err == nil {
  574. err = y.Wrap(registryErr, "DB.Close")
  575. }
  576. // Fsync directories to ensure that lock file, and any other removed files whose directory
  577. // we haven't specifically fsynced, are guaranteed to have their directory entry removal
  578. // persisted to disk.
  579. if syncErr := db.syncDir(db.opt.Dir); err == nil {
  580. err = y.Wrap(syncErr, "DB.Close")
  581. }
  582. if syncErr := db.syncDir(db.opt.ValueDir); err == nil {
  583. err = y.Wrap(syncErr, "DB.Close")
  584. }
  585. return err
  586. }
  587. // VerifyChecksum verifies checksum for all tables on all levels.
  588. // This method can be used to verify checksum, if opt.ChecksumVerificationMode is NoVerification.
  589. func (db *DB) VerifyChecksum() error {
  590. return db.lc.verifyChecksum()
  591. }
  592. const (
  593. lockFile = "LOCK"
  594. )
  595. // Sync syncs database content to disk. This function provides
  596. // more control to user to sync data whenever required.
  597. func (db *DB) Sync() error {
  598. /**
  599. Make an attempt to sync both the logs, the active memtable's WAL and the vLog (1847).
  600. Cases:
  601. - All_ok :: If both the logs sync successfully.
  602. - Entry_Lost :: If an entry with a value pointer was present in the active memtable's WAL,
  603. :: and the WAL was synced but there was an error in syncing the vLog.
  604. :: The entry will be considered lost and this case will need to be handled during recovery.
  605. - Entries_Lost :: If there were errors in syncing both the logs, multiple entries would be lost.
  606. - Entries_Lost :: If the active memtable's WAL is not synced but the vLog is synced, it will
  607. :: result in entries being lost because recovery of the active memtable is done from its WAL.
  608. :: Check `UpdateSkipList` in memtable.go.
  609. - Nothing_lost :: If an entry with its value was present in the active memtable's WAL, and the WAL was synced,
  610. :: but there was an error in syncing the vLog.
  611. :: Nothing is lost for this very specific entry because the entry is completely present in the memtable's WAL.
  612. - Partially_lost :: If entries were written partially in either of the logs,
  613. :: the logs will be truncated during recovery.
  614. :: As a result of truncation, some entries might be lost.
  615. :: Assume that 4KB of data is to be synced and invoking `Sync` results only in syncing 3KB
  616. :: of data and then the machine shuts down or the disk failure happens,
  617. :: this will result in partial writes. [[This case needs verification]]
  618. */
  619. db.lock.RLock()
  620. memtableSyncError := db.mt.SyncWAL()
  621. db.lock.RUnlock()
  622. vLogSyncError := db.vlog.sync()
  623. return y.CombineErrors(memtableSyncError, vLogSyncError)
  624. }
  625. // getMemtables returns the current memtables and get references.
  626. func (db *DB) getMemTables() ([]*memTable, func()) {
  627. db.lock.RLock()
  628. defer db.lock.RUnlock()
  629. var tables []*memTable
  630. // Mutable memtable does not exist in read-only mode.
  631. if !db.opt.ReadOnly {
  632. // Get mutable memtable.
  633. tables = append(tables, db.mt)
  634. db.mt.IncrRef()
  635. }
  636. // Get immutable memtables.
  637. last := len(db.imm) - 1
  638. for i := range db.imm {
  639. tables = append(tables, db.imm[last-i])
  640. db.imm[last-i].IncrRef()
  641. }
  642. return tables, func() {
  643. for _, tbl := range tables {
  644. tbl.DecrRef()
  645. }
  646. }
  647. }
  648. // get returns the value in memtable or disk for given key.
  649. // Note that value will include meta byte.
  650. //
  651. // IMPORTANT: We should never write an entry with an older timestamp for the same key, We need to
  652. // maintain this invariant to search for the latest value of a key, or else we need to search in all
  653. // tables and find the max version among them. To maintain this invariant, we also need to ensure
  654. // that all versions of a key are always present in the same table from level 1, because compaction
  655. // can push any table down.
  656. //
  657. // Update(23/09/2020) - We have dropped the move key implementation. Earlier we
  658. // were inserting move keys to fix the invalid value pointers but we no longer
  659. // do that. For every get("fooX") call where X is the version, we will search
  660. // for "fooX" in all the levels of the LSM tree. This is expensive but it
  661. // removes the overhead of handling move keys completely.
  662. func (db *DB) get(key []byte) (y.ValueStruct, error) {
  663. if db.IsClosed() {
  664. return y.ValueStruct{}, ErrDBClosed
  665. }
  666. tables, decr := db.getMemTables() // Lock should be released.
  667. defer decr()
  668. var maxVs y.ValueStruct
  669. version := y.ParseTs(key)
  670. y.NumGetsAdd(db.opt.MetricsEnabled, 1)
  671. for i := 0; i < len(tables); i++ {
  672. vs := tables[i].sl.Get(key)
  673. y.NumMemtableGetsAdd(db.opt.MetricsEnabled, 1)
  674. if vs.Meta == 0 && vs.Value == nil {
  675. continue
  676. }
  677. // Found the required version of the key, return immediately.
  678. if vs.Version == version {
  679. y.NumGetsWithResultsAdd(db.opt.MetricsEnabled, 1)
  680. return vs, nil
  681. }
  682. if maxVs.Version < vs.Version {
  683. maxVs = vs
  684. }
  685. }
  686. return db.lc.get(key, maxVs, 0)
  687. }
  688. var requestPool = sync.Pool{
  689. New: func() interface{} {
  690. return new(request)
  691. },
  692. }
  693. func (db *DB) writeToLSM(b *request) error {
  694. // We should check the length of b.Prts and b.Entries only when badger is not
  695. // running in InMemory mode. In InMemory mode, we don't write anything to the
  696. // value log and that's why the length of b.Ptrs will always be zero.
  697. if !db.opt.InMemory && len(b.Ptrs) != len(b.Entries) {
  698. return errors.Errorf("Ptrs and Entries don't match: %+v", b)
  699. }
  700. for i, entry := range b.Entries {
  701. var err error
  702. if entry.skipVlogAndSetThreshold(db.valueThreshold()) {
  703. // Will include deletion / tombstone case.
  704. err = db.mt.Put(entry.Key,
  705. y.ValueStruct{
  706. Value: entry.Value,
  707. // Ensure value pointer flag is removed. Otherwise, the value will fail
  708. // to be retrieved during iterator prefetch. `bitValuePointer` is only
  709. // known to be set in write to LSM when the entry is loaded from a backup
  710. // with lower ValueThreshold and its value was stored in the value log.
  711. Meta: entry.meta &^ bitValuePointer,
  712. UserMeta: entry.UserMeta,
  713. ExpiresAt: entry.ExpiresAt,
  714. })
  715. } else {
  716. // Write pointer to Memtable.
  717. err = db.mt.Put(entry.Key,
  718. y.ValueStruct{
  719. Value: b.Ptrs[i].Encode(),
  720. Meta: entry.meta | bitValuePointer,
  721. UserMeta: entry.UserMeta,
  722. ExpiresAt: entry.ExpiresAt,
  723. })
  724. }
  725. if err != nil {
  726. return y.Wrapf(err, "while writing to memTable")
  727. }
  728. }
  729. if db.opt.SyncWrites {
  730. return db.mt.SyncWAL()
  731. }
  732. return nil
  733. }
  734. // writeRequests is called serially by only one goroutine.
  735. func (db *DB) writeRequests(reqs []*request) error {
  736. if len(reqs) == 0 {
  737. return nil
  738. }
  739. done := func(err error) {
  740. for _, r := range reqs {
  741. r.Err = err
  742. r.Wg.Done()
  743. }
  744. }
  745. db.opt.Debugf("writeRequests called. Writing to value log")
  746. err := db.vlog.write(reqs)
  747. if err != nil {
  748. done(err)
  749. return err
  750. }
  751. db.opt.Debugf("Writing to memtable")
  752. var count int
  753. for _, b := range reqs {
  754. if len(b.Entries) == 0 {
  755. continue
  756. }
  757. count += len(b.Entries)
  758. var i uint64
  759. var err error
  760. for err = db.ensureRoomForWrite(); err == errNoRoom; err = db.ensureRoomForWrite() {
  761. i++
  762. if i%100 == 0 {
  763. db.opt.Debugf("Making room for writes")
  764. }
  765. // We need to poll a bit because both hasRoomForWrite and the flusher need access to s.imm.
  766. // When flushChan is full and you are blocked there, and the flusher is trying to update s.imm,
  767. // you will get a deadlock.
  768. time.Sleep(10 * time.Millisecond)
  769. }
  770. if err != nil {
  771. done(err)
  772. return y.Wrap(err, "writeRequests")
  773. }
  774. if err := db.writeToLSM(b); err != nil {
  775. done(err)
  776. return y.Wrap(err, "writeRequests")
  777. }
  778. }
  779. db.opt.Debugf("Sending updates to subscribers")
  780. db.pub.sendUpdates(reqs)
  781. done(nil)
  782. db.opt.Debugf("%d entries written", count)
  783. return nil
  784. }
  785. func (db *DB) sendToWriteCh(entries []*Entry) (*request, error) {
  786. if db.blockWrites.Load() == 1 {
  787. return nil, ErrBlockedWrites
  788. }
  789. var count, size int64
  790. for _, e := range entries {
  791. size += e.estimateSizeAndSetThreshold(db.valueThreshold())
  792. count++
  793. }
  794. y.NumBytesWrittenUserAdd(db.opt.MetricsEnabled, size)
  795. if count >= db.opt.maxBatchCount || size >= db.opt.maxBatchSize {
  796. return nil, ErrTxnTooBig
  797. }
  798. // We can only service one request because we need each txn to be stored in a contiguous section.
  799. // Txns should not interleave among other txns or rewrites.
  800. req := requestPool.Get().(*request)
  801. req.reset()
  802. req.Entries = entries
  803. req.Wg.Add(1)
  804. req.IncrRef() // for db write
  805. db.writeCh <- req // Handled in doWrites.
  806. y.NumPutsAdd(db.opt.MetricsEnabled, int64(len(entries)))
  807. return req, nil
  808. }
// doWrites is the write loop. It collects requests from db.writeCh into
// batches and hands each batch to writeRequests. When lc is signalled it
// drains whatever is still buffered in db.writeCh, writes that synchronously,
// and returns.
func (db *DB) doWrites(lc *z.Closer) {
	defer lc.Done()
	// pendingCh (capacity 1) works as a semaphore. A batch is only handed to
	// writeRequests after a send on pendingCh succeeds, and the wrapper below
	// receives from it once the batch is done. So at most one batch is in
	// flight while the loop keeps collecting the next one.
	pendingCh := make(chan struct{}, 1)
	writeRequests := func(reqs []*request) {
		if err := db.writeRequests(reqs); err != nil {
			db.opt.Errorf("writeRequests: %v", err)
		}
		<-pendingCh
	}
	// This variable tracks the number of pending writes.
	reqLen := new(expvar.Int)
	y.PendingWritesSet(db.opt.MetricsEnabled, db.opt.Dir, reqLen)
	reqs := make([]*request, 0, 10)
	for {
		var r *request
		// Wait for the first request of a new batch, or for shutdown.
		select {
		case r = <-db.writeCh:
		case <-lc.HasBeenClosed():
			goto closedCase
		}
		for {
			reqs = append(reqs, r)
			reqLen.Set(int64(len(reqs)))
			// Cap the batch: once it is this large, wait for the in-flight
			// batch to finish instead of accumulating more.
			if len(reqs) >= 3*kvWriteChCapacity {
				pendingCh <- struct{}{} // blocking.
				goto writeCase
			}
			select {
			// Either push to pending, or continue to pick from writeCh.
			case r = <-db.writeCh:
			case pendingCh <- struct{}{}:
				goto writeCase
			case <-lc.HasBeenClosed():
				goto closedCase
			}
		}
	closedCase:
		// Drain, without blocking, every request still buffered in writeCh,
		// then wait for any in-flight batch and write the rest synchronously.
		// Don't close writeCh: it is used in several other places.
		for {
			select {
			case r = <-db.writeCh:
				reqs = append(reqs, r)
			default:
				pendingCh <- struct{}{} // Push to pending before doing a write.
				writeRequests(reqs)
				return
			}
		}
	writeCase:
		// Write this batch in the background and start collecting a new one.
		go writeRequests(reqs)
		reqs = make([]*request, 0, 10)
		reqLen.Set(0)
	}
}
  864. // batchSet applies a list of badger.Entry. If a request level error occurs it
  865. // will be returned.
  866. //
  867. // Check(kv.BatchSet(entries))
  868. func (db *DB) batchSet(entries []*Entry) error {
  869. req, err := db.sendToWriteCh(entries)
  870. if err != nil {
  871. return err
  872. }
  873. return req.Wait()
  874. }
  875. // batchSetAsync is the asynchronous version of batchSet. It accepts a callback
  876. // function which is called when all the sets are complete. If a request level
  877. // error occurs, it will be passed back via the callback.
  878. //
  879. // err := kv.BatchSetAsync(entries, func(err error)) {
  880. // Check(err)
  881. // }
  882. func (db *DB) batchSetAsync(entries []*Entry, f func(error)) error {
  883. req, err := db.sendToWriteCh(entries)
  884. if err != nil {
  885. return err
  886. }
  887. go func() {
  888. err := req.Wait()
  889. // Write is complete. Let's call the callback function now.
  890. f(err)
  891. }()
  892. return nil
  893. }
  894. var errNoRoom = stderrors.New("No room for write")
  895. // ensureRoomForWrite is always called serially.
  896. func (db *DB) ensureRoomForWrite() error {
  897. var err error
  898. db.lock.Lock()
  899. defer db.lock.Unlock()
  900. y.AssertTrue(db.mt != nil) // A nil mt indicates that DB is being closed.
  901. if !db.mt.isFull() {
  902. return nil
  903. }
  904. select {
  905. case db.flushChan <- db.mt:
  906. db.opt.Debugf("Flushing memtable, mt.size=%d size of flushChan: %d\n",
  907. db.mt.sl.MemSize(), len(db.flushChan))
  908. // We manage to push this task. Let's modify imm.
  909. db.imm = append(db.imm, db.mt)
  910. db.mt, err = db.newMemTable()
  911. if err != nil {
  912. return y.Wrapf(err, "cannot create new mem table")
  913. }
  914. // New memtable is empty. We certainly have room.
  915. return nil
  916. default:
  917. // We need to do this to unlock and allow the flusher to modify imm.
  918. return errNoRoom
  919. }
  920. }
  921. func arenaSize(opt Options) int64 {
  922. return opt.MemTableSize + opt.maxBatchSize + opt.maxBatchCount*int64(skl.MaxNodeSize)
  923. }
  924. // buildL0Table builds a new table from the memtable.
  925. func buildL0Table(iter y.Iterator, dropPrefixes [][]byte, bopts table.Options) *table.Builder {
  926. defer iter.Close()
  927. b := table.NewTableBuilder(bopts)
  928. for iter.Rewind(); iter.Valid(); iter.Next() {
  929. if len(dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), dropPrefixes) {
  930. continue
  931. }
  932. vs := iter.Value()
  933. var vp valuePointer
  934. if vs.Meta&bitValuePointer > 0 {
  935. vp.Decode(vs.Value)
  936. }
  937. b.Add(iter.Key(), iter.Value(), vp.Len)
  938. }
  939. return b
  940. }
  941. // handleMemTableFlush must be run serially.
  942. func (db *DB) handleMemTableFlush(mt *memTable, dropPrefixes [][]byte) error {
  943. bopts := buildTableOptions(db)
  944. itr := mt.sl.NewUniIterator(false)
  945. builder := buildL0Table(itr, nil, bopts)
  946. defer builder.Close()
  947. // buildL0Table can return nil if the none of the items in the skiplist are
  948. // added to the builder. This can happen when drop prefix is set and all
  949. // the items are skipped.
  950. if builder.Empty() {
  951. builder.Finish()
  952. return nil
  953. }
  954. fileID := db.lc.reserveFileID()
  955. var tbl *table.Table
  956. var err error
  957. if db.opt.InMemory {
  958. data := builder.Finish()
  959. tbl, err = table.OpenInMemoryTable(data, fileID, &bopts)
  960. } else {
  961. tbl, err = table.CreateTable(table.NewFilename(fileID, db.opt.Dir), builder)
  962. }
  963. if err != nil {
  964. return y.Wrap(err, "error while creating table")
  965. }
  966. // We own a ref on tbl.
  967. err = db.lc.addLevel0Table(tbl) // This will incrRef
  968. _ = tbl.DecrRef() // Releases our ref.
  969. return err
  970. }
  971. // flushMemtable must keep running until we send it an empty memtable. If there
  972. // are errors during handling the memtable flush, we'll retry indefinitely.
  973. func (db *DB) flushMemtable(lc *z.Closer) {
  974. defer lc.Done()
  975. for mt := range db.flushChan {
  976. if mt == nil {
  977. continue
  978. }
  979. for {
  980. if err := db.handleMemTableFlush(mt, nil); err != nil {
  981. // Encountered error. Retry indefinitely.
  982. db.opt.Errorf("error flushing memtable to disk: %v, retrying", err)
  983. time.Sleep(time.Second)
  984. continue
  985. }
  986. // Update s.imm. Need a lock.
  987. db.lock.Lock()
  988. // This is a single-threaded operation. mt corresponds to the head of
  989. // db.imm list. Once we flush it, we advance db.imm. The next mt
  990. // which would arrive here would match db.imm[0], because we acquire a
  991. // lock over DB when pushing to flushChan.
  992. // TODO: This logic is dirty AF. Any change and this could easily break.
  993. y.AssertTrue(mt == db.imm[0])
  994. db.imm = db.imm[1:]
  995. mt.DecrRef() // Return memory.
  996. // unlock
  997. db.lock.Unlock()
  998. break
  999. }
  1000. }
  1001. }
  1002. func exists(path string) (bool, error) {
  1003. _, err := os.Stat(path)
  1004. if err == nil {
  1005. return true, nil
  1006. }
  1007. if os.IsNotExist(err) {
  1008. return false, nil
  1009. }
  1010. return true, err
  1011. }
  1012. // This function does a filewalk, calculates the size of vlog and sst files and stores it in
  1013. // y.LSMSize and y.VlogSize.
  1014. func (db *DB) calculateSize() {
  1015. if db.opt.InMemory {
  1016. return
  1017. }
  1018. newInt := func(val int64) *expvar.Int {
  1019. v := new(expvar.Int)
  1020. v.Add(val)
  1021. return v
  1022. }
  1023. totalSize := func(dir string) (int64, int64) {
  1024. var lsmSize, vlogSize int64
  1025. err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
  1026. if err != nil {
  1027. return err
  1028. }
  1029. ext := filepath.Ext(path)
  1030. switch ext {
  1031. case ".sst":
  1032. lsmSize += info.Size()
  1033. case ".vlog":
  1034. vlogSize += info.Size()
  1035. }
  1036. return nil
  1037. })
  1038. if err != nil {
  1039. db.opt.Debugf("Got error while calculating total size of directory: %s", dir)
  1040. }
  1041. return lsmSize, vlogSize
  1042. }
  1043. lsmSize, vlogSize := totalSize(db.opt.Dir)
  1044. y.LSMSizeSet(db.opt.MetricsEnabled, db.opt.Dir, newInt(lsmSize))
  1045. // If valueDir is different from dir, we'd have to do another walk.
  1046. if db.opt.ValueDir != db.opt.Dir {
  1047. _, vlogSize = totalSize(db.opt.ValueDir)
  1048. }
  1049. y.VlogSizeSet(db.opt.MetricsEnabled, db.opt.ValueDir, newInt(vlogSize))
  1050. }
  1051. func (db *DB) updateSize(lc *z.Closer) {
  1052. defer lc.Done()
  1053. if db.opt.InMemory {
  1054. return
  1055. }
  1056. metricsTicker := time.NewTicker(time.Minute)
  1057. defer metricsTicker.Stop()
  1058. for {
  1059. select {
  1060. case <-metricsTicker.C:
  1061. db.calculateSize()
  1062. case <-lc.HasBeenClosed():
  1063. return
  1064. }
  1065. }
  1066. }
  1067. // RunValueLogGC triggers a value log garbage collection.
  1068. //
  1069. // It picks value log files to perform GC based on statistics that are collected
  1070. // during compactions. If no such statistics are available, then log files are
  1071. // picked in random order. The process stops as soon as the first log file is
  1072. // encountered which does not result in garbage collection.
  1073. //
  1074. // When a log file is picked, it is first sampled. If the sample shows that we
  1075. // can discard at least discardRatio space of that file, it would be rewritten.
  1076. //
  1077. // If a call to RunValueLogGC results in no rewrites, then an ErrNoRewrite is
  1078. // thrown indicating that the call resulted in no file rewrites.
  1079. //
  1080. // We recommend setting discardRatio to 0.5, thus indicating that a file be
  1081. // rewritten if half the space can be discarded. This results in a lifetime
  1082. // value log write amplification of 2 (1 from original write + 0.5 rewrite +
  1083. // 0.25 + 0.125 + ... = 2). Setting it to higher value would result in fewer
  1084. // space reclaims, while setting it to a lower value would result in more space
  1085. // reclaims at the cost of increased activity on the LSM tree. discardRatio
  1086. // must be in the range (0.0, 1.0), both endpoints excluded, otherwise an
  1087. // ErrInvalidRequest is returned.
  1088. //
  1089. // Only one GC is allowed at a time. If another value log GC is running, or DB
  1090. // has been closed, this would return an ErrRejected.
  1091. //
  1092. // Note: Every time GC is run, it would produce a spike of activity on the LSM
  1093. // tree.
  1094. func (db *DB) RunValueLogGC(discardRatio float64) error {
  1095. if db.opt.InMemory {
  1096. return ErrGCInMemoryMode
  1097. }
  1098. if discardRatio >= 1.0 || discardRatio <= 0.0 {
  1099. return ErrInvalidRequest
  1100. }
  1101. // Pick a log file and run GC
  1102. return db.vlog.runGC(discardRatio)
  1103. }
  1104. // Size returns the size of lsm and value log files in bytes. It can be used to decide how often to
  1105. // call RunValueLogGC.
  1106. func (db *DB) Size() (lsm, vlog int64) {
  1107. if y.LSMSizeGet(db.opt.MetricsEnabled, db.opt.Dir) == nil {
  1108. lsm, vlog = 0, 0
  1109. return
  1110. }
  1111. lsm = y.LSMSizeGet(db.opt.MetricsEnabled, db.opt.Dir).(*expvar.Int).Value()
  1112. vlog = y.VlogSizeGet(db.opt.MetricsEnabled, db.opt.ValueDir).(*expvar.Int).Value()
  1113. return
  1114. }
// Sequence represents a Badger sequence: a monotonically increasing integer
// counter persisted under key. Integers are reserved from the database in
// blocks of bandwidth (a lease), so most calls to Next are served from memory.
type Sequence struct {
	lock      sync.Mutex // Held by Next and Release; guards next and leased.
	db        *DB
	key       []byte // Key under which the lease limit is persisted.
	next      uint64 // Next integer to hand out from the current lease.
	leased    uint64 // Exclusive upper bound of the integers reserved so far.
	bandwidth uint64 // Number of integers reserved per lease.
}
  1124. // Next would return the next integer in the sequence, updating the lease by running a transaction
  1125. // if needed.
  1126. func (seq *Sequence) Next() (uint64, error) {
  1127. seq.lock.Lock()
  1128. defer seq.lock.Unlock()
  1129. if seq.next >= seq.leased {
  1130. if err := seq.updateLease(); err != nil {
  1131. return 0, err
  1132. }
  1133. }
  1134. val := seq.next
  1135. seq.next++
  1136. return val, nil
  1137. }
  1138. // Release the leased sequence to avoid wasted integers. This should be done right
  1139. // before closing the associated DB. However it is valid to use the sequence after
  1140. // it was released, causing a new lease with full bandwidth.
  1141. func (seq *Sequence) Release() error {
  1142. seq.lock.Lock()
  1143. defer seq.lock.Unlock()
  1144. err := seq.db.Update(func(txn *Txn) error {
  1145. item, err := txn.Get(seq.key)
  1146. if err != nil {
  1147. return err
  1148. }
  1149. var num uint64
  1150. if err := item.Value(func(v []byte) error {
  1151. num = binary.BigEndian.Uint64(v)
  1152. return nil
  1153. }); err != nil {
  1154. return err
  1155. }
  1156. if num == seq.leased {
  1157. var buf [8]byte
  1158. binary.BigEndian.PutUint64(buf[:], seq.next)
  1159. return txn.SetEntry(NewEntry(seq.key, buf[:]))
  1160. }
  1161. return nil
  1162. })
  1163. if err != nil {
  1164. return err
  1165. }
  1166. seq.leased = seq.next
  1167. return nil
  1168. }
  1169. func (seq *Sequence) updateLease() error {
  1170. return seq.db.Update(func(txn *Txn) error {
  1171. item, err := txn.Get(seq.key)
  1172. switch {
  1173. case err == ErrKeyNotFound:
  1174. seq.next = 0
  1175. case err != nil:
  1176. return err
  1177. default:
  1178. var num uint64
  1179. if err := item.Value(func(v []byte) error {
  1180. num = binary.BigEndian.Uint64(v)
  1181. return nil
  1182. }); err != nil {
  1183. return err
  1184. }
  1185. seq.next = num
  1186. }
  1187. lease := seq.next + seq.bandwidth
  1188. var buf [8]byte
  1189. binary.BigEndian.PutUint64(buf[:], lease)
  1190. if err = txn.SetEntry(NewEntry(seq.key, buf[:])); err != nil {
  1191. return err
  1192. }
  1193. seq.leased = lease
  1194. return nil
  1195. })
  1196. }
  1197. // GetSequence would initiate a new sequence object, generating it from the stored lease, if
  1198. // available, in the database. Sequence can be used to get a list of monotonically increasing
  1199. // integers. Multiple sequences can be created by providing different keys. Bandwidth sets the
  1200. // size of the lease, determining how many Next() requests can be served from memory.
  1201. //
  1202. // GetSequence is not supported on ManagedDB. Calling this would result in a panic.
  1203. func (db *DB) GetSequence(key []byte, bandwidth uint64) (*Sequence, error) {
  1204. if db.opt.managedTxns {
  1205. panic("Cannot use GetSequence with managedDB=true.")
  1206. }
  1207. switch {
  1208. case len(key) == 0:
  1209. return nil, ErrEmptyKey
  1210. case bandwidth == 0:
  1211. return nil, ErrZeroBandwidth
  1212. }
  1213. seq := &Sequence{
  1214. db: db,
  1215. key: key,
  1216. next: 0,
  1217. leased: 0,
  1218. bandwidth: bandwidth,
  1219. }
  1220. err := seq.updateLease()
  1221. return seq, err
  1222. }
  1223. // Tables gets the TableInfo objects from the level controller. If withKeysCount
  1224. // is true, TableInfo objects also contain counts of keys for the tables.
  1225. func (db *DB) Tables() []TableInfo {
  1226. return db.lc.getTableInfo()
  1227. }
  1228. // Levels gets the LevelInfo.
  1229. func (db *DB) Levels() []LevelInfo {
  1230. return db.lc.getLevelInfo()
  1231. }
  1232. // EstimateSize can be used to get rough estimate of data size for a given prefix.
  1233. func (db *DB) EstimateSize(prefix []byte) (uint64, uint64) {
  1234. var onDiskSize, uncompressedSize uint64
  1235. tables := db.Tables()
  1236. for _, ti := range tables {
  1237. if bytes.HasPrefix(ti.Left, prefix) && bytes.HasPrefix(ti.Right, prefix) {
  1238. onDiskSize += uint64(ti.OnDiskSize)
  1239. uncompressedSize += uint64(ti.UncompressedSize)
  1240. }
  1241. }
  1242. return onDiskSize, uncompressedSize
  1243. }
  1244. // Ranges can be used to get rough key ranges to divide up iteration over the DB. The ranges here
  1245. // would consider the prefix, but would not necessarily start or end with the prefix. In fact, the
  1246. // first range would have nil as left key, and the last range would have nil as the right key.
  1247. func (db *DB) Ranges(prefix []byte, numRanges int) []*keyRange {
  1248. var splits []string
  1249. tables := db.Tables()
  1250. // We just want table ranges here and not keys count.
  1251. for _, ti := range tables {
  1252. // We don't use ti.Left, because that has a tendency to store !badger keys. Skip over tables
  1253. // at upper levels. Only choose tables from the last level.
  1254. if ti.Level != db.opt.MaxLevels-1 {
  1255. continue
  1256. }
  1257. if bytes.HasPrefix(ti.Right, prefix) {
  1258. splits = append(splits, string(ti.Right))
  1259. }
  1260. }
  1261. // If the number of splits is low, look at the offsets inside the
  1262. // tables to generate more splits.
  1263. if len(splits) < 32 {
  1264. numTables := len(tables)
  1265. if numTables == 0 {
  1266. numTables = 1
  1267. }
  1268. numPerTable := 32 / numTables
  1269. if numPerTable == 0 {
  1270. numPerTable = 1
  1271. }
  1272. splits = db.lc.keySplits(numPerTable, prefix)
  1273. }
  1274. // If the number of splits is still < 32, then look at the memtables.
  1275. if len(splits) < 32 {
  1276. maxPerSplit := 10000
  1277. mtSplits := func(mt *memTable) {
  1278. if mt == nil {
  1279. return
  1280. }
  1281. count := 0
  1282. iter := mt.sl.NewIterator()
  1283. for iter.SeekToFirst(); iter.Valid(); iter.Next() {
  1284. if count%maxPerSplit == 0 {
  1285. // Add a split every maxPerSplit keys.
  1286. if bytes.HasPrefix(iter.Key(), prefix) {
  1287. splits = append(splits, string(iter.Key()))
  1288. }
  1289. }
  1290. count += 1
  1291. }
  1292. _ = iter.Close()
  1293. }
  1294. db.lock.Lock()
  1295. defer db.lock.Unlock()
  1296. var memTables []*memTable
  1297. memTables = append(memTables, db.imm...)
  1298. for _, mt := range memTables {
  1299. mtSplits(mt)
  1300. }
  1301. mtSplits(db.mt)
  1302. }
  1303. // We have our splits now. Let's convert them to ranges.
  1304. sort.Strings(splits)
  1305. var ranges []*keyRange
  1306. var start []byte
  1307. for _, key := range splits {
  1308. ranges = append(ranges, &keyRange{left: start, right: y.SafeCopy(nil, []byte(key))})
  1309. start = y.SafeCopy(nil, []byte(key))
  1310. }
  1311. ranges = append(ranges, &keyRange{left: start})
  1312. // Figure out the approximate table size this range has to deal with.
  1313. for _, t := range tables {
  1314. tr := keyRange{left: t.Left, right: t.Right}
  1315. for _, r := range ranges {
  1316. if len(r.left) == 0 || len(r.right) == 0 {
  1317. continue
  1318. }
  1319. if r.overlapsWith(tr) {
  1320. r.size += int64(t.UncompressedSize)
  1321. }
  1322. }
  1323. }
  1324. var total int64
  1325. for _, r := range ranges {
  1326. total += r.size
  1327. }
  1328. if total == 0 {
  1329. return ranges
  1330. }
  1331. // Figure out the average size, so we know how to bin the ranges together.
  1332. avg := total / int64(numRanges)
  1333. var out []*keyRange
  1334. var i int
  1335. for i < len(ranges) {
  1336. r := ranges[i]
  1337. cur := &keyRange{left: r.left, size: r.size, right: r.right}
  1338. i++
  1339. for ; i < len(ranges); i++ {
  1340. next := ranges[i]
  1341. if cur.size+next.size > avg {
  1342. break
  1343. }
  1344. cur.right = next.right
  1345. cur.size += next.size
  1346. }
  1347. out = append(out, cur)
  1348. }
  1349. return out
  1350. }
  1351. // MaxBatchCount returns max possible entries in batch
  1352. func (db *DB) MaxBatchCount() int64 {
  1353. return db.opt.maxBatchCount
  1354. }
  1355. // MaxBatchSize returns max possible batch size
  1356. func (db *DB) MaxBatchSize() int64 {
  1357. return db.opt.maxBatchSize
  1358. }
  1359. func (db *DB) stopMemoryFlush() {
  1360. // Stop memtable flushes.
  1361. if db.closers.memtable != nil {
  1362. close(db.flushChan)
  1363. db.closers.memtable.SignalAndWait()
  1364. }
  1365. }
  1366. func (db *DB) stopCompactions() {
  1367. // Stop compactions.
  1368. if db.closers.compactors != nil {
  1369. db.closers.compactors.SignalAndWait()
  1370. }
  1371. }
  1372. func (db *DB) startCompactions() {
  1373. // Resume compactions.
  1374. if db.closers.compactors != nil {
  1375. db.closers.compactors = z.NewCloser(1)
  1376. db.lc.startCompact(db.closers.compactors)
  1377. }
  1378. }
  1379. func (db *DB) startMemoryFlush() {
  1380. // Start memory fluhser.
  1381. if db.closers.memtable != nil {
  1382. db.flushChan = make(chan *memTable, db.opt.NumMemtables)
  1383. db.closers.memtable = z.NewCloser(1)
  1384. go func() {
  1385. db.flushMemtable(db.closers.memtable)
  1386. }()
  1387. }
  1388. }
  1389. // Flatten can be used to force compactions on the LSM tree so all the tables fall on the same
  1390. // level. This ensures that all the versions of keys are colocated and not split across multiple
  1391. // levels, which is necessary after a restore from backup. During Flatten, live compactions are
  1392. // stopped. Ideally, no writes are going on during Flatten. Otherwise, it would create competition
  1393. // between flattening the tree and new tables being created at level zero.
  1394. func (db *DB) Flatten(workers int) error {
  1395. db.stopCompactions()
  1396. defer db.startCompactions()
  1397. compactAway := func(cp compactionPriority) error {
  1398. db.opt.Infof("Attempting to compact with %+v\n", cp)
  1399. errCh := make(chan error, 1)
  1400. for i := 0; i < workers; i++ {
  1401. go func() {
  1402. errCh <- db.lc.doCompact(175, cp)
  1403. }()
  1404. }
  1405. var success int
  1406. var rerr error
  1407. for i := 0; i < workers; i++ {
  1408. err := <-errCh
  1409. if err != nil {
  1410. rerr = err
  1411. db.opt.Warningf("While running doCompact with %+v. Error: %v\n", cp, err)
  1412. } else {
  1413. success++
  1414. }
  1415. }
  1416. if success == 0 {
  1417. return rerr
  1418. }
  1419. // We could do at least one successful compaction. So, we'll consider this a success.
  1420. db.opt.Infof("%d compactor(s) succeeded. One or more tables from level %d compacted.\n",
  1421. success, cp.level)
  1422. return nil
  1423. }
  1424. hbytes := func(sz int64) string {
  1425. return humanize.IBytes(uint64(sz))
  1426. }
  1427. t := db.lc.levelTargets()
  1428. for {
  1429. db.opt.Infof("\n")
  1430. var levels []int
  1431. for i, l := range db.lc.levels {
  1432. sz := l.getTotalSize()
  1433. db.opt.Infof("Level: %d. %8s Size. %8s Max.\n",
  1434. i, hbytes(l.getTotalSize()), hbytes(t.targetSz[i]))
  1435. if sz > 0 {
  1436. levels = append(levels, i)
  1437. }
  1438. }
  1439. if len(levels) <= 1 {
  1440. prios := db.lc.pickCompactLevels(nil)
  1441. if len(prios) == 0 || prios[0].score <= 1.0 {
  1442. db.opt.Infof("All tables consolidated into one level. Flattening done.\n")
  1443. return nil
  1444. }
  1445. if err := compactAway(prios[0]); err != nil {
  1446. return err
  1447. }
  1448. continue
  1449. }
  1450. // Create an artificial compaction priority, to ensure that we compact the level.
  1451. cp := compactionPriority{level: levels[0], score: 1.71}
  1452. if err := compactAway(cp); err != nil {
  1453. return err
  1454. }
  1455. }
  1456. }
  1457. func (db *DB) blockWrite() error {
  1458. // Stop accepting new writes.
  1459. if !db.blockWrites.CompareAndSwap(0, 1) {
  1460. return ErrBlockedWrites
  1461. }
  1462. // Make all pending writes finish. The following will also close writeCh.
  1463. db.closers.writes.SignalAndWait()
  1464. db.opt.Infof("Writes flushed. Stopping compactions now...")
  1465. return nil
  1466. }
  1467. func (db *DB) unblockWrite() {
  1468. db.closers.writes = z.NewCloser(1)
  1469. go db.doWrites(db.closers.writes)
  1470. // Resume writes.
  1471. db.blockWrites.Store(0)
  1472. }
  1473. func (db *DB) prepareToDrop() (func(), error) {
  1474. if db.opt.ReadOnly {
  1475. panic("Attempting to drop data in read-only mode.")
  1476. }
  1477. // In order prepare for drop, we need to block the incoming writes and
  1478. // write it to db. Then, flush all the pending memtable. So that, we
  1479. // don't miss any entries.
  1480. if err := db.blockWrite(); err != nil {
  1481. return func() {}, err
  1482. }
  1483. reqs := make([]*request, 0, 10)
  1484. for {
  1485. select {
  1486. case r := <-db.writeCh:
  1487. reqs = append(reqs, r)
  1488. default:
  1489. if err := db.writeRequests(reqs); err != nil {
  1490. db.opt.Errorf("writeRequests: %v", err)
  1491. }
  1492. db.stopMemoryFlush()
  1493. return func() {
  1494. db.opt.Infof("Resuming writes")
  1495. db.startMemoryFlush()
  1496. db.unblockWrite()
  1497. }, nil
  1498. }
  1499. }
  1500. }
  1501. // DropAll would drop all the data stored in Badger. It does this in the following way.
  1502. // - Stop accepting new writes.
  1503. // - Pause memtable flushes and compactions.
  1504. // - Pick all tables from all levels, create a changeset to delete all these
  1505. // tables and apply it to manifest.
  1506. // - Pick all log files from value log, and delete all of them. Restart value log files from zero.
  1507. // - Resume memtable flushes and compactions.
  1508. //
  1509. // NOTE: DropAll is resilient to concurrent writes, but not to reads. It is up to the user to not do
  1510. // any reads while DropAll is going on, otherwise they may result in panics. Ideally, both reads and
  1511. // writes are paused before running DropAll, and resumed after it is finished.
  1512. func (db *DB) DropAll() error {
  1513. f, err := db.dropAll()
  1514. if f != nil {
  1515. f()
  1516. }
  1517. return err
  1518. }
// dropAll performs the work behind DropAll. It blocks writes and stops the
// memtable flusher (via prepareToDrop), and stops compactions. Then, while
// holding db.lock, it:
//   - discards all memtables,
//   - drops every table in the LSM tree and every value log file,
//   - resets the file ID counter,
//   - clears the block/index caches and the value threshold state.
//
// It returns a resume function that restarts compactions and then runs the
// function from prepareToDrop. Once prepareToDrop has succeeded, resume is
// returned even alongside an error, so the caller can always restore normal
// operation. If prepareToDrop itself fails, its function and error are
// returned unchanged.
func (db *DB) dropAll() (func(), error) {
	db.opt.Infof("DropAll called. Blocking writes...")
	f, err := db.prepareToDrop()
	if err != nil {
		return f, err
	}
	// prepareToDrop has blocked incoming writes, written out the requests that
	// were already queued, and stopped the memtable flusher. Compactions are
	// stopped too, since all the data is about to be deleted anyway.
	db.stopCompactions()
	// resume undoes the pausing: restart compactions, then run prepareToDrop's
	// function (restart flushing, unblock writes).
	resume := func() {
		db.startCompactions()
		f()
	}
	// Block all foreign interactions with memory tables.
	db.lock.Lock()
	defer db.lock.Unlock()
	// Remove in-memory tables. DecrRef is called for safety; it is not certain
	// that it is strictly needed.
	db.mt.DecrRef()
	for _, mt := range db.imm {
		mt.DecrRef()
	}
	db.imm = db.imm[:0]
	db.mt, err = db.newMemTable() // Set it up for future writes.
	if err != nil {
		return resume, y.Wrapf(err, "cannot open new memtable")
	}
	num, err := db.lc.dropTree()
	if err != nil {
		return resume, err
	}
	db.opt.Infof("Deleted %d SSTables. Now deleting value logs...\n", num)
	num, err = db.vlog.dropAll()
	if err != nil {
		return resume, err
	}
	// All table files are gone, so restart the file ID counter.
	db.lc.nextFileID.Store(1)
	db.opt.Infof("Deleted %d value log files. DropAll done.\n", num)
	// Cached blocks and indexes refer to deleted tables; clear them along with
	// the value threshold state.
	db.blockCache.Clear()
	db.indexCache.Clear()
	db.threshold.Clear(db.opt)
	return resume, nil
}
// DropPrefix would drop all the keys with any of the provided prefixes. It does this in the following way:
//   - Stop accepting new writes.
//   - Stop memtable flushes before acquiring lock. Because we're acquiring lock here
//     and memtable flush stalls for lock, which leads to deadlock.
//   - Flush out all memtables, skipping over keys with the given prefix, Kp.
//   - Write out the value log header to memtables when flushing, so we don't accidentally bring Kp
//     back after a restart.
//   - Stop compaction.
//   - Compact L0->L1, skipping over Kp.
//   - Compact rest of the levels, Li->Li, picking tables which have Kp.
//   - Resume memtable flushes, compactions and writes.
//
// Calling DropPrefix with no prefixes is a no-op. Prefixes under which no key
// is currently visible are filtered out first (filterPrefixesToDrop). If none
// remain, DropPrefix returns nil without flushing memtables or stopping
// compactions.
func (db *DB) DropPrefix(prefixes ...[]byte) error {
	if len(prefixes) == 0 {
		return nil
	}
	db.opt.Infof("DropPrefix called for %s", prefixes)
	f, err := db.prepareToDrop()
	if err != nil {
		return err
	}
	defer f()
	var filtered [][]byte
	if filtered, err = db.filterPrefixesToDrop(prefixes); err != nil {
		return err
	}
	// If there is no prefix for which the data already exist, do not do anything.
	if len(filtered) == 0 {
		db.opt.Infof("No prefixes to drop")
		return nil
	}
	// Block all foreign interactions with memory tables.
	// Deferred calls run LIFO: startCompactions (if registered below), then this
	// Unlock, then f() from prepareToDrop.
	db.lock.Lock()
	defer db.lock.Unlock()
	// Treat the active memtable like an immutable one so it is flushed too.
	db.imm = append(db.imm, db.mt)
	for _, memtable := range db.imm {
		if memtable.sl.Empty() {
			memtable.DecrRef()
			continue
		}
		db.opt.Debugf("Flushing memtable")
		// Flush with the filtered prefixes so their keys are skipped.
		if err := db.handleMemTableFlush(memtable, filtered); err != nil {
			db.opt.Errorf("While trying to flush memtable: %v", err)
			// NOTE(review): on this path db.imm still lists every memtable
			// (including db.mt), some of which were already DecrRef'd above.
			return err
		}
		memtable.DecrRef()
	}
	db.stopCompactions()
	defer db.startCompactions()
	db.imm = db.imm[:0]
	db.mt, err = db.newMemTable()
	if err != nil {
		return y.Wrapf(err, "cannot create new mem table")
	}
	// Drop prefixes from the levels.
	if err := db.lc.dropPrefixes(filtered); err != nil {
		return err
	}
	db.opt.Infof("DropPrefix done")
	return nil
}
  1622. func (db *DB) filterPrefixesToDrop(prefixes [][]byte) ([][]byte, error) {
  1623. var filtered [][]byte
  1624. for _, prefix := range prefixes {
  1625. err := db.View(func(txn *Txn) error {
  1626. iopts := DefaultIteratorOptions
  1627. iopts.Prefix = prefix
  1628. iopts.PrefetchValues = false
  1629. itr := txn.NewIterator(iopts)
  1630. defer itr.Close()
  1631. itr.Rewind()
  1632. if itr.ValidForPrefix(prefix) {
  1633. filtered = append(filtered, prefix)
  1634. }
  1635. return nil
  1636. })
  1637. if err != nil {
  1638. return filtered, err
  1639. }
  1640. }
  1641. return filtered, nil
  1642. }
  1643. // Checks if the key is banned. Returns the respective error if the key belongs to any of the banned
  1644. // namepspaces. Else it returns nil.
  1645. func (db *DB) isBanned(key []byte) error {
  1646. if db.opt.NamespaceOffset < 0 {
  1647. return nil
  1648. }
  1649. if len(key) <= db.opt.NamespaceOffset+8 {
  1650. return nil
  1651. }
  1652. if db.bannedNamespaces.has(y.BytesToU64(key[db.opt.NamespaceOffset:])) {
  1653. return ErrBannedKey
  1654. }
  1655. return nil
  1656. }
  1657. // BanNamespace bans a namespace. Read/write to keys belonging to any of such namespace is denied.
  1658. func (db *DB) BanNamespace(ns uint64) error {
  1659. if db.opt.NamespaceOffset < 0 {
  1660. return ErrNamespaceMode
  1661. }
  1662. db.opt.Infof("Banning namespace: %d", ns)
  1663. // First set the banned namespaces in DB and then update the in-memory structure.
  1664. key := y.KeyWithTs(append(bannedNsKey, y.U64ToBytes(ns)...), 1)
  1665. entry := []*Entry{{
  1666. Key: key,
  1667. Value: nil,
  1668. }}
  1669. req, err := db.sendToWriteCh(entry)
  1670. if err != nil {
  1671. return err
  1672. }
  1673. if err := req.Wait(); err != nil {
  1674. return err
  1675. }
  1676. db.bannedNamespaces.add(ns)
  1677. return nil
  1678. }
  1679. // BannedNamespaces returns the list of prefixes banned for DB.
  1680. func (db *DB) BannedNamespaces() []uint64 {
  1681. return db.bannedNamespaces.all()
  1682. }
// KVList contains a list of key-value pairs. It is a type alias of pb.KVList,
// so the two names are interchangeable. Subscribe delivers changes to its
// callback as a *KVList.
type KVList = pb.KVList
// Subscribe watches key changes for the keys selected by matches and passes
// them to cb. At least one match should be passed, or an error will be
// returned. A match with an empty prefix can be used to monitor all changes to
// the DB.
//
// A match may also carry an ignore string: byte ranges of the key for which
// prefix matching will be ignored. For example, ignore = "2-3" and
// prefix = "abc" will match keys "abxxc", "abdfc" etc.
// NOTE(review): the exact pb.Match field names and ignore-string format live
// in the pb package; confirm against it.
//
// Subscribe blocks until ctx is done, cb returns an error, or the subscriber's
// closer is closed (which happens when the DB is closing). cb is called with a
// KVList containing the modified keys and their corresponding values. All
// updates already queued at that moment are merged into a single KVList.
//
// It returns ErrNilCallback if cb is nil, a wrapped error if the subscriber
// cannot be created, ctx.Err() when ctx is done, or the error returned by cb.
// On ctx cancellation or a cb error, the subscriber is marked inactive, its
// queued updates are discarded, and it is deleted from the publisher.
func (db *DB) Subscribe(ctx context.Context, cb func(kv *KVList) error, matches []pb.Match) error {
	if cb == nil {
		return ErrNilCallback
	}
	c := z.NewCloser(1)
	s, err := db.pub.newSubscriber(c, matches)
	if err != nil {
		return y.Wrapf(err, "while creating a new subscriber")
	}
	// slurp appends every batch already queued on s.sendCh to batch without
	// blocking, then invokes cb once if anything was collected.
	slurp := func(batch *pb.KVList) error {
		for {
			select {
			case kvs := <-s.sendCh:
				batch.Kv = append(batch.Kv, kvs.Kv...)
			default:
				if len(batch.GetKv()) > 0 {
					return cb(batch)
				}
				return nil
			}
		}
	}
	// drain discards batches already queued on s.sendCh without blocking, and
	// stops early if the channel has been closed.
	drain := func() {
		for {
			select {
			case _, ok := <-s.sendCh:
				if !ok {
					// Channel is closed.
					return
				}
			default:
				return
			}
		}
	}
	for {
		select {
		case <-c.HasBeenClosed():
			// No need to delete here. Closer will be called only while
			// closing DB. Subscriber will be deleted by cleanSubscribers.
			// Deliver any pending updates before exiting.
			err := slurp(new(pb.KVList))
			c.Done()
			return err
		case <-ctx.Done():
			c.Done()
			// NOTE(review): presumably active=0 tells the publisher to stop
			// sending to this subscriber; confirm in the publisher code.
			s.active.Store(0)
			drain()
			// Delete the subscriber to avoid further updates.
			db.pub.deleteSubscriber(s.id)
			return ctx.Err()
		case batch := <-s.sendCh:
			err := slurp(batch)
			if err != nil {
				c.Done()
				s.active.Store(0)
				drain()
				// Delete the subscriber if there is an error by the callback.
				db.pub.deleteSubscriber(s.id)
				return err
			}
		}
	}
}
  1757. func (db *DB) syncDir(dir string) error {
  1758. if db.opt.InMemory {
  1759. return nil
  1760. }
  1761. return syncDir(dir)
  1762. }
  1763. func createDirs(opt Options) error {
  1764. for _, path := range []string{opt.Dir, opt.ValueDir} {
  1765. dirExists, err := exists(path)
  1766. if err != nil {
  1767. return y.Wrapf(err, "Invalid Dir: %q", path)
  1768. }
  1769. if !dirExists {
  1770. if opt.ReadOnly {
  1771. return errors.Errorf("Cannot find directory %q for read-only open", path)
  1772. }
  1773. // Try to create the directory
  1774. err = os.MkdirAll(path, 0700)
  1775. if err != nil {
  1776. return y.Wrapf(err, "Error Creating Dir: %q", path)
  1777. }
  1778. }
  1779. }
  1780. return nil
  1781. }
  1782. // Stream the contents of this DB to a new DB with options outOptions that will be
  1783. // created in outDir.
  1784. func (db *DB) StreamDB(outOptions Options) error {
  1785. outDir := outOptions.Dir
  1786. // Open output DB.
  1787. outDB, err := OpenManaged(outOptions)
  1788. if err != nil {
  1789. return y.Wrapf(err, "cannot open out DB at %s", outDir)
  1790. }
  1791. defer outDB.Close()
  1792. writer := outDB.NewStreamWriter()
  1793. if err := writer.Prepare(); err != nil {
  1794. return y.Wrapf(err, "cannot create stream writer in out DB at %s", outDir)
  1795. }
  1796. // Stream contents of DB to the output DB.
  1797. stream := db.NewStreamAt(math.MaxUint64)
  1798. stream.LogPrefix = fmt.Sprintf("Streaming DB to new DB at %s", outDir)
  1799. stream.Send = func(buf *z.Buffer) error {
  1800. return writer.Write(buf)
  1801. }
  1802. if err := stream.Orchestrate(context.Background()); err != nil {
  1803. return y.Wrapf(err, "cannot stream DB to out DB at %s", outDir)
  1804. }
  1805. if err := writer.Flush(); err != nil {
  1806. return y.Wrapf(err, "cannot flush writer")
  1807. }
  1808. return nil
  1809. }
  1810. // Opts returns a copy of the DB options.
  1811. func (db *DB) Opts() Options {
  1812. return db.opt
  1813. }
  1814. type CacheType int
  1815. const (
  1816. BlockCache CacheType = iota
  1817. IndexCache
  1818. )
  1819. // CacheMaxCost updates the max cost of the given cache (either block or index cache).
  1820. // The call will have an effect only if the DB was created with the cache. Otherwise it is
  1821. // a no-op. If you pass a negative value, the function will return the current value
  1822. // without updating it.
  1823. func (db *DB) CacheMaxCost(cache CacheType, maxCost int64) (int64, error) {
  1824. if db == nil {
  1825. return 0, nil
  1826. }
  1827. if maxCost < 0 {
  1828. switch cache {
  1829. case BlockCache:
  1830. return db.blockCache.MaxCost(), nil
  1831. case IndexCache:
  1832. return db.indexCache.MaxCost(), nil
  1833. default:
  1834. return 0, errors.Errorf("invalid cache type")
  1835. }
  1836. }
  1837. switch cache {
  1838. case BlockCache:
  1839. db.blockCache.UpdateMaxCost(maxCost)
  1840. return maxCost, nil
  1841. case IndexCache:
  1842. db.indexCache.UpdateMaxCost(maxCost)
  1843. return maxCost, nil
  1844. default:
  1845. return 0, errors.Errorf("invalid cache type")
  1846. }
  1847. }
  1848. func (db *DB) LevelsToString() string {
  1849. levels := db.Levels()
  1850. h := func(sz int64) string {
  1851. return humanize.IBytes(uint64(sz))
  1852. }
  1853. base := func(b bool) string {
  1854. if b {
  1855. return "B"
  1856. }
  1857. return " "
  1858. }
  1859. var b strings.Builder
  1860. b.WriteRune('\n')
  1861. for _, li := range levels {
  1862. b.WriteString(fmt.Sprintf(
  1863. "Level %d [%s]: NumTables: %02d. Size: %s of %s. Score: %.2f->%.2f"+
  1864. " StaleData: %s Target FileSize: %s\n",
  1865. li.Level, base(li.IsBaseLevel), li.NumTables,
  1866. h(li.Size), h(li.TargetSize), li.Score, li.Adjusted, h(li.StaleDatSize),
  1867. h(li.TargetFileSize)))
  1868. }
  1869. b.WriteString("Level Done\n")
  1870. return b.String()
  1871. }