levels.go 52 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package badger
  6. import (
  7. "bytes"
  8. "context"
  9. "encoding/hex"
  10. stderrors "errors"
  11. "fmt"
  12. "math"
  13. "math/rand"
  14. "os"
  15. "sort"
  16. "strings"
  17. "sync"
  18. "sync/atomic"
  19. "time"
  20. "github.com/pkg/errors"
  21. "go.opentelemetry.io/otel"
  22. "go.opentelemetry.io/otel/attribute"
  23. "github.com/dgraph-io/badger/v4/pb"
  24. "github.com/dgraph-io/badger/v4/table"
  25. "github.com/dgraph-io/badger/v4/y"
  26. "github.com/dgraph-io/ristretto/v2/z"
  27. )
  28. type levelsController struct {
  29. nextFileID atomic.Uint64
  30. l0stallsMs atomic.Int64
  31. // The following are initialized once and const.
  32. levels []*levelHandler
  33. kv *DB
  34. cstatus compactStatus
  35. }
  36. // revertToManifest checks that all necessary table files exist and removes all table files not
  37. // referenced by the manifest. idMap is a set of table file id's that were read from the directory
  38. // listing.
  39. func revertToManifest(kv *DB, mf *Manifest, idMap map[uint64]struct{}) error {
  40. // 1. Check all files in manifest exist.
  41. for id := range mf.Tables {
  42. if _, ok := idMap[id]; !ok {
  43. return fmt.Errorf("file does not exist for table %d", id)
  44. }
  45. }
  46. // 2. Delete files that shouldn't exist.
  47. for id := range idMap {
  48. if _, ok := mf.Tables[id]; !ok {
  49. kv.opt.Debugf("Table file %d not referenced in MANIFEST\n", id)
  50. filename := table.NewFilename(id, kv.opt.Dir)
  51. if err := os.Remove(filename); err != nil {
  52. return y.Wrapf(err, "While removing table %d", id)
  53. }
  54. }
  55. }
  56. return nil
  57. }
  58. func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
  59. y.AssertTrue(db.opt.NumLevelZeroTablesStall > db.opt.NumLevelZeroTables)
  60. s := &levelsController{
  61. kv: db,
  62. levels: make([]*levelHandler, db.opt.MaxLevels),
  63. }
  64. s.cstatus.tables = make(map[uint64]struct{})
  65. s.cstatus.levels = make([]*levelCompactStatus, db.opt.MaxLevels)
  66. for i := 0; i < db.opt.MaxLevels; i++ {
  67. s.levels[i] = newLevelHandler(db, i)
  68. s.cstatus.levels[i] = new(levelCompactStatus)
  69. }
  70. if db.opt.InMemory {
  71. return s, nil
  72. }
  73. // Compare manifest against directory, check for existent/non-existent files, and remove.
  74. if err := revertToManifest(db, mf, getIDMap(db.opt.Dir)); err != nil {
  75. return nil, err
  76. }
  77. var mu sync.Mutex
  78. tables := make([][]*table.Table, db.opt.MaxLevels)
  79. var maxFileID uint64
  80. // We found that using 3 goroutines allows disk throughput to be utilized to its max.
  81. // Disk utilization is the main thing we should focus on, while trying to read the data. That's
  82. // the one factor that remains constant between HDD and SSD.
  83. throttle := y.NewThrottle(3)
  84. start := time.Now()
  85. var numOpened atomic.Int32
  86. tick := time.NewTicker(3 * time.Second)
  87. defer tick.Stop()
  88. for fileID, tf := range mf.Tables {
  89. fname := table.NewFilename(fileID, db.opt.Dir)
  90. select {
  91. case <-tick.C:
  92. db.opt.Infof("%d tables out of %d opened in %s\n", numOpened.Load(),
  93. len(mf.Tables), time.Since(start).Round(time.Millisecond))
  94. default:
  95. }
  96. if err := throttle.Do(); err != nil {
  97. closeAllTables(tables)
  98. return nil, err
  99. }
  100. if fileID > maxFileID {
  101. maxFileID = fileID
  102. }
  103. go func(fname string, tf TableManifest) {
  104. var rerr error
  105. defer func() {
  106. throttle.Done(rerr)
  107. numOpened.Add(1)
  108. }()
  109. dk, err := db.registry.DataKey(tf.KeyID)
  110. if err != nil {
  111. rerr = y.Wrapf(err, "Error while reading datakey")
  112. return
  113. }
  114. topt := buildTableOptions(db)
  115. // Explicitly set Compression and DataKey based on how the table was generated.
  116. topt.Compression = tf.Compression
  117. topt.DataKey = dk
  118. mf, err := z.OpenMmapFile(fname, db.opt.getFileFlags(), 0)
  119. if err != nil {
  120. rerr = y.Wrapf(err, "Opening file: %q", fname)
  121. return
  122. }
  123. t, err := table.OpenTable(mf, topt)
  124. if err != nil {
  125. if strings.HasPrefix(err.Error(), "CHECKSUM_MISMATCH:") {
  126. db.opt.Errorf(err.Error())
  127. db.opt.Errorf("Ignoring table %s", mf.Fd.Name())
  128. // Do not set rerr. We will continue without this table.
  129. } else {
  130. rerr = y.Wrapf(err, "Opening table: %q", fname)
  131. }
  132. return
  133. }
  134. mu.Lock()
  135. tables[tf.Level] = append(tables[tf.Level], t)
  136. mu.Unlock()
  137. }(fname, tf)
  138. }
  139. if err := throttle.Finish(); err != nil {
  140. closeAllTables(tables)
  141. return nil, err
  142. }
  143. db.opt.Infof("All %d tables opened in %s\n", numOpened.Load(),
  144. time.Since(start).Round(time.Millisecond))
  145. s.nextFileID.Store(maxFileID + 1)
  146. for i, tbls := range tables {
  147. s.levels[i].initTables(tbls)
  148. }
  149. // Make sure key ranges do not overlap etc.
  150. if err := s.validate(); err != nil {
  151. _ = s.cleanupLevels()
  152. return nil, y.Wrap(err, "Level validation")
  153. }
  154. // Sync directory (because we have at least removed some files, or previously created the
  155. // manifest file).
  156. if err := syncDir(db.opt.Dir); err != nil {
  157. _ = s.close()
  158. return nil, err
  159. }
  160. return s, nil
  161. }
  162. // Closes the tables, for cleanup in newLevelsController. (We Close() instead of using DecrRef()
  163. // because that would delete the underlying files.) We ignore errors, which is OK because tables
  164. // are read-only.
  165. func closeAllTables(tables [][]*table.Table) {
  166. for _, tableSlice := range tables {
  167. for _, table := range tableSlice {
  168. _ = table.Close(-1)
  169. }
  170. }
  171. }
  172. func (s *levelsController) cleanupLevels() error {
  173. var firstErr error
  174. for _, l := range s.levels {
  175. if err := l.close(); err != nil && firstErr == nil {
  176. firstErr = err
  177. }
  178. }
  179. return firstErr
  180. }
  181. // dropTree picks all tables from all levels, creates a manifest changeset,
  182. // applies it, and then decrements the refs of these tables, which would result
  183. // in their deletion.
  184. func (s *levelsController) dropTree() (int, error) {
  185. // First pick all tables, so we can create a manifest changelog.
  186. var all []*table.Table
  187. for _, l := range s.levels {
  188. l.RLock()
  189. all = append(all, l.tables...)
  190. l.RUnlock()
  191. }
  192. if len(all) == 0 {
  193. return 0, nil
  194. }
  195. // Generate the manifest changes.
  196. changes := []*pb.ManifestChange{}
  197. for _, table := range all {
  198. // Add a delete change only if the table is not in memory.
  199. if !table.IsInmemory {
  200. changes = append(changes, newDeleteChange(table.ID()))
  201. }
  202. }
  203. changeSet := pb.ManifestChangeSet{Changes: changes}
  204. if err := s.kv.manifest.addChanges(changeSet.Changes); err != nil {
  205. return 0, err
  206. }
  207. // Now that manifest has been successfully written, we can delete the tables.
  208. for _, l := range s.levels {
  209. l.Lock()
  210. l.totalSize = 0
  211. l.tables = l.tables[:0]
  212. l.Unlock()
  213. }
  214. for _, table := range all {
  215. if err := table.DecrRef(); err != nil {
  216. return 0, err
  217. }
  218. }
  219. return len(all), nil
  220. }
  221. // dropPrefix runs a L0->L1 compaction, and then runs same level compaction on the rest of the
  222. // levels. For L0->L1 compaction, it runs compactions normally, but skips over
  223. // all the keys with the provided prefix.
  224. // For Li->Li compactions, it picks up the tables which would have the prefix. The
  225. // tables who only have keys with this prefix are quickly dropped. The ones which have other keys
  226. // are run through MergeIterator and compacted to create new tables. All the mechanisms of
  227. // compactions apply, i.e. level sizes and MANIFEST are updated as in the normal flow.
  228. func (s *levelsController) dropPrefixes(prefixes [][]byte) error {
  229. opt := s.kv.opt
  230. // Iterate levels in the reverse order because if we were to iterate from
  231. // lower level (say level 0) to a higher level (say level 3) we could have
  232. // a state in which level 0 is compacted and an older version of a key exists in lower level.
  233. // At this point, if someone creates an iterator, they would see an old
  234. // value for a key from lower levels. Iterating in reverse order ensures we
  235. // drop the oldest data first so that lookups never return stale data.
  236. for i := len(s.levels) - 1; i >= 0; i-- {
  237. l := s.levels[i]
  238. l.RLock()
  239. if l.level == 0 {
  240. size := len(l.tables)
  241. l.RUnlock()
  242. if size > 0 {
  243. cp := compactionPriority{
  244. level: 0,
  245. score: 1.74,
  246. // A unique number greater than 1.0 does two things. Helps identify this
  247. // function in logs, and forces a compaction.
  248. dropPrefixes: prefixes,
  249. }
  250. if err := s.doCompact(174, cp); err != nil {
  251. opt.Warningf("While compacting level 0: %v", err)
  252. return nil
  253. }
  254. }
  255. continue
  256. }
  257. // Build a list of compaction tableGroups affecting all the prefixes we
  258. // need to drop. We need to build tableGroups that satisfy the invariant that
  259. // bottom tables are consecutive.
  260. // tableGroup contains groups of consecutive tables.
  261. var tableGroups [][]*table.Table
  262. var tableGroup []*table.Table
  263. finishGroup := func() {
  264. if len(tableGroup) > 0 {
  265. tableGroups = append(tableGroups, tableGroup)
  266. tableGroup = nil
  267. }
  268. }
  269. for _, table := range l.tables {
  270. if containsAnyPrefixes(table, prefixes) {
  271. tableGroup = append(tableGroup, table)
  272. } else {
  273. finishGroup()
  274. }
  275. }
  276. finishGroup()
  277. l.RUnlock()
  278. if len(tableGroups) == 0 {
  279. continue
  280. }
  281. opt.Infof("Dropping prefix at level %d (%d tableGroups)", l.level, len(tableGroups))
  282. for _, operation := range tableGroups {
  283. cd := compactDef{
  284. thisLevel: l,
  285. nextLevel: l,
  286. top: nil,
  287. bot: operation,
  288. dropPrefixes: prefixes,
  289. t: s.levelTargets(),
  290. }
  291. _, span := otel.Tracer("").Start(context.TODO(), "Badger.Compaction")
  292. span.SetAttributes(attribute.Int("Compaction level", l.level))
  293. span.SetAttributes(attribute.String("Drop Prefixes", fmt.Sprintf("%v", prefixes)))
  294. cd.t.baseLevel = l.level
  295. if err := s.runCompactDef(-1, l.level, cd); err != nil {
  296. opt.Warningf("While running compact def: %+v. Error: %v", cd, err)
  297. span.End()
  298. return err
  299. }
  300. span.SetAttributes(
  301. attribute.Int("Top tables count", len(cd.top)),
  302. attribute.Int("Bottom tables count", len(cd.bot)))
  303. span.End()
  304. }
  305. }
  306. return nil
  307. }
  308. func (s *levelsController) startCompact(lc *z.Closer) {
  309. n := s.kv.opt.NumCompactors
  310. lc.AddRunning(n - 1)
  311. for i := 0; i < n; i++ {
  312. go s.runCompactor(i, lc)
  313. }
  314. }
  315. type targets struct {
  316. baseLevel int
  317. targetSz []int64
  318. fileSz []int64
  319. }
  320. // levelTargets calculates the targets for levels in the LSM tree. The idea comes from Dynamic Level
  321. // Sizes ( https://rocksdb.org/blog/2015/07/23/dynamic-level.html ) in RocksDB. The sizes of levels
  322. // are calculated based on the size of the lowest level, typically L6. So, if L6 size is 1GB, then
  323. // L5 target size is 100MB, L4 target size is 10MB and so on.
  324. //
  325. // L0 files don't automatically go to L1. Instead, they get compacted to Lbase, where Lbase is
  326. // chosen based on the first level which is non-empty from top (check L1 through L6). For an empty
  327. // DB, that would be L6. So, L0 compactions go to L6, then L5, L4 and so on.
  328. //
  329. // Lbase is advanced to the upper levels when its target size exceeds BaseLevelSize. For
  330. // example, when L6 reaches 1.1GB, then L4 target sizes becomes 11MB, thus exceeding the
  331. // BaseLevelSize of 10MB. L3 would then become the new Lbase, with a target size of 1MB <
  332. // BaseLevelSize.
  333. func (s *levelsController) levelTargets() targets {
  334. adjust := func(sz int64) int64 {
  335. if sz < s.kv.opt.BaseLevelSize {
  336. return s.kv.opt.BaseLevelSize
  337. }
  338. return sz
  339. }
  340. t := targets{
  341. targetSz: make([]int64, len(s.levels)),
  342. fileSz: make([]int64, len(s.levels)),
  343. }
  344. // DB size is the size of the last level.
  345. dbSize := s.lastLevel().getTotalSize()
  346. for i := len(s.levels) - 1; i > 0; i-- {
  347. ltarget := adjust(dbSize)
  348. t.targetSz[i] = ltarget
  349. if t.baseLevel == 0 && ltarget <= s.kv.opt.BaseLevelSize {
  350. t.baseLevel = i
  351. }
  352. dbSize /= int64(s.kv.opt.LevelSizeMultiplier)
  353. }
  354. tsz := s.kv.opt.BaseTableSize
  355. for i := 0; i < len(s.levels); i++ {
  356. if i == 0 {
  357. // Use MemTableSize for Level 0. Because at Level 0, we stop compactions based on the
  358. // number of tables, not the size of the level. So, having a 1:1 size ratio between
  359. // memtable size and the size of L0 files is better than churning out 32 files per
  360. // memtable (assuming 64MB MemTableSize and 2MB BaseTableSize).
  361. t.fileSz[i] = s.kv.opt.MemTableSize
  362. } else if i <= t.baseLevel {
  363. t.fileSz[i] = tsz
  364. } else {
  365. tsz *= int64(s.kv.opt.TableSizeMultiplier)
  366. t.fileSz[i] = tsz
  367. }
  368. }
  369. // Bring the base level down to the last empty level.
  370. for i := t.baseLevel + 1; i < len(s.levels)-1; i++ {
  371. if s.levels[i].getTotalSize() > 0 {
  372. break
  373. }
  374. t.baseLevel = i
  375. }
  376. // If the base level is empty and the next level size is less than the
  377. // target size, pick the next level as the base level.
  378. b := t.baseLevel
  379. lvl := s.levels
  380. if b < len(lvl)-1 && lvl[b].getTotalSize() == 0 && lvl[b+1].getTotalSize() < t.targetSz[b+1] {
  381. t.baseLevel++
  382. }
  383. return t
  384. }
  385. func (s *levelsController) runCompactor(id int, lc *z.Closer) {
  386. defer lc.Done()
  387. randomDelay := time.NewTimer(time.Duration(rand.Int31n(1000)) * time.Millisecond)
  388. select {
  389. case <-randomDelay.C:
  390. case <-lc.HasBeenClosed():
  391. randomDelay.Stop()
  392. return
  393. }
  394. moveL0toFront := func(prios []compactionPriority) []compactionPriority {
  395. idx := -1
  396. for i, p := range prios {
  397. if p.level == 0 {
  398. idx = i
  399. break
  400. }
  401. }
  402. // If idx == -1, we didn't find L0.
  403. // If idx == 0, then we don't need to do anything. L0 is already at the front.
  404. if idx > 0 {
  405. out := append([]compactionPriority{}, prios[idx])
  406. out = append(out, prios[:idx]...)
  407. out = append(out, prios[idx+1:]...)
  408. return out
  409. }
  410. return prios
  411. }
  412. run := func(p compactionPriority) bool {
  413. err := s.doCompact(id, p)
  414. switch err {
  415. case nil:
  416. return true
  417. case errFillTables:
  418. // pass
  419. default:
  420. s.kv.opt.Warningf("While running doCompact: %v\n", err)
  421. }
  422. return false
  423. }
  424. var priosBuffer []compactionPriority
  425. runOnce := func() bool {
  426. prios := s.pickCompactLevels(priosBuffer)
  427. defer func() {
  428. priosBuffer = prios
  429. }()
  430. if id == 0 {
  431. // Worker ID zero prefers to compact L0 always.
  432. prios = moveL0toFront(prios)
  433. }
  434. for _, p := range prios {
  435. if id == 0 && p.level == 0 {
  436. // Allow worker zero to run level 0, irrespective of its adjusted score.
  437. } else if p.adjusted < 1.0 {
  438. break
  439. }
  440. if run(p) {
  441. return true
  442. }
  443. }
  444. return false
  445. }
  446. tryLmaxToLmaxCompaction := func() {
  447. p := compactionPriority{
  448. level: s.lastLevel().level,
  449. t: s.levelTargets(),
  450. }
  451. run(p)
  452. }
  453. count := 0
  454. ticker := time.NewTicker(50 * time.Millisecond)
  455. defer ticker.Stop()
  456. for {
  457. select {
  458. // Can add a done channel or other stuff.
  459. case <-ticker.C:
  460. count++
  461. // Each ticker is 50ms so 50*200=10seconds.
  462. if s.kv.opt.LmaxCompaction && id == 2 && count >= 200 {
  463. tryLmaxToLmaxCompaction()
  464. count = 0
  465. } else {
  466. runOnce()
  467. }
  468. case <-lc.HasBeenClosed():
  469. return
  470. }
  471. }
  472. }
  473. type compactionPriority struct {
  474. level int
  475. score float64
  476. adjusted float64
  477. dropPrefixes [][]byte
  478. t targets
  479. }
  480. func (s *levelsController) lastLevel() *levelHandler {
  481. return s.levels[len(s.levels)-1]
  482. }
  483. // pickCompactLevel determines which level to compact.
  484. // Based on: https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
  485. // It tries to reuse priosBuffer to reduce memory allocation,
  486. // passing nil is acceptable, then new memory will be allocated.
  487. func (s *levelsController) pickCompactLevels(priosBuffer []compactionPriority) (prios []compactionPriority) {
  488. t := s.levelTargets()
  489. addPriority := func(level int, score float64) {
  490. pri := compactionPriority{
  491. level: level,
  492. score: score,
  493. adjusted: score,
  494. t: t,
  495. }
  496. prios = append(prios, pri)
  497. }
  498. // Grow buffer to fit all levels.
  499. if cap(priosBuffer) < len(s.levels) {
  500. priosBuffer = make([]compactionPriority, 0, len(s.levels))
  501. }
  502. prios = priosBuffer[:0]
  503. // Add L0 priority based on the number of tables.
  504. addPriority(0, float64(s.levels[0].numTables())/float64(s.kv.opt.NumLevelZeroTables))
  505. // All other levels use size to calculate priority.
  506. for i := 1; i < len(s.levels); i++ {
  507. // Don't consider those tables that are already being compacted right now.
  508. delSize := s.cstatus.delSize(i)
  509. l := s.levels[i]
  510. sz := l.getTotalSize() - delSize
  511. addPriority(i, float64(sz)/float64(t.targetSz[i]))
  512. }
  513. y.AssertTrue(len(prios) == len(s.levels))
  514. // The following code is borrowed from PebbleDB and results in healthier LSM tree structure.
  515. // If Li-1 has score > 1.0, then we'll divide Li-1 score by Li. If Li score is >= 1.0, then Li-1
  516. // score is reduced, which means we'll prioritize the compaction of lower levels (L5, L4 and so
  517. // on) over the higher levels (L0, L1 and so on). On the other hand, if Li score is < 1.0, then
  518. // we'll increase the priority of Li-1.
  519. // Overall what this means is, if the bottom level is already overflowing, then de-prioritize
  520. // compaction of the above level. If the bottom level is not full, then increase the priority of
  521. // above level.
  522. var prevLevel int
  523. for level := t.baseLevel; level < len(s.levels); level++ {
  524. if prios[prevLevel].adjusted >= 1 {
  525. // Avoid absurdly large scores by placing a floor on the score that we'll
  526. // adjust a level by. The value of 0.01 was chosen somewhat arbitrarily
  527. const minScore = 0.01
  528. if prios[level].score >= minScore {
  529. prios[prevLevel].adjusted /= prios[level].adjusted
  530. } else {
  531. prios[prevLevel].adjusted /= minScore
  532. }
  533. }
  534. prevLevel = level
  535. }
  536. // Pick all the levels whose original score is >= 1.0, irrespective of their adjusted score.
  537. // We'll still sort them by their adjusted score below. Having both these scores allows us to
  538. // make better decisions about compacting L0. If we see a score >= 1.0, we can do L0->L0
  539. // compactions. If the adjusted score >= 1.0, then we can do L0->Lbase compactions.
  540. out := prios[:0]
  541. for _, p := range prios[:len(prios)-1] {
  542. if p.score >= 1.0 {
  543. out = append(out, p)
  544. }
  545. }
  546. prios = out
  547. // Sort by the adjusted score.
  548. sort.Slice(prios, func(i, j int) bool {
  549. return prios[i].adjusted > prios[j].adjusted
  550. })
  551. return prios
  552. }
  553. // checkOverlap checks if the given tables overlap with any level from the given "lev" onwards.
  554. func (s *levelsController) checkOverlap(tables []*table.Table, lev int) bool {
  555. kr := getKeyRange(tables...)
  556. for i, lh := range s.levels {
  557. if i < lev { // Skip upper levels.
  558. continue
  559. }
  560. lh.RLock()
  561. left, right := lh.overlappingTables(levelHandlerRLocked{}, kr)
  562. lh.RUnlock()
  563. if right-left > 0 {
  564. return true
  565. }
  566. }
  567. return false
  568. }
  569. // subcompact runs a single sub-compaction, iterating over the specified key-range only.
  570. //
  571. // We use splits to do a single compaction concurrently. If we have >= 3 tables
  572. // involved in the bottom level during compaction, we choose key ranges to
  573. // split the main compaction up into sub-compactions. Each sub-compaction runs
  574. // concurrently, only iterating over the provided key range, generating tables.
  575. // This speeds up the compaction significantly.
  576. func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
  577. inflightBuilders *y.Throttle, res chan<- *table.Table) {
  578. // Check overlap of the top level with the levels which are not being
  579. // compacted in this compaction.
  580. hasOverlap := s.checkOverlap(cd.allTables(), cd.nextLevel.level+1)
  581. // Pick a discard ts, so we can discard versions below this ts. We should
  582. // never discard any versions starting from above this timestamp, because
  583. // that would affect the snapshot view guarantee provided by transactions.
  584. discardTs := s.kv.orc.discardAtOrBelow()
  585. // Try to collect stats so that we can inform value log about GC. That would help us find which
  586. // value log file should be GCed.
  587. discardStats := make(map[uint32]int64)
  588. updateStats := func(vs y.ValueStruct) {
  589. // We don't need to store/update discard stats when badger is running in Disk-less mode.
  590. if s.kv.opt.InMemory {
  591. return
  592. }
  593. if vs.Meta&bitValuePointer > 0 {
  594. var vp valuePointer
  595. vp.Decode(vs.Value)
  596. discardStats[vp.Fid] += int64(vp.Len)
  597. }
  598. }
  599. // exceedsAllowedOverlap returns true if the given key range would overlap with more than 10
  600. // tables from level below nextLevel (nextLevel+1). This helps avoid generating tables at Li
  601. // with huge overlaps with Li+1.
  602. exceedsAllowedOverlap := func(kr keyRange) bool {
  603. n2n := cd.nextLevel.level + 1
  604. if n2n <= 1 || n2n >= len(s.levels) {
  605. return false
  606. }
  607. n2nl := s.levels[n2n]
  608. n2nl.RLock()
  609. defer n2nl.RUnlock()
  610. l, r := n2nl.overlappingTables(levelHandlerRLocked{}, kr)
  611. return r-l >= 10
  612. }
  613. var (
  614. lastKey, skipKey []byte
  615. numBuilds, numVersions int
  616. // Denotes if the first key is a series of duplicate keys had
  617. // "DiscardEarlierVersions" set
  618. firstKeyHasDiscardSet bool
  619. )
  620. addKeys := func(builder *table.Builder) {
  621. timeStart := time.Now()
  622. var numKeys, numSkips uint64
  623. var rangeCheck int
  624. var tableKr keyRange
  625. for ; it.Valid(); it.Next() {
  626. // See if we need to skip the prefix.
  627. if len(cd.dropPrefixes) > 0 && hasAnyPrefixes(it.Key(), cd.dropPrefixes) {
  628. numSkips++
  629. updateStats(it.Value())
  630. continue
  631. }
  632. // See if we need to skip this key.
  633. if len(skipKey) > 0 {
  634. if y.SameKey(it.Key(), skipKey) {
  635. numSkips++
  636. updateStats(it.Value())
  637. continue
  638. } else {
  639. skipKey = skipKey[:0]
  640. }
  641. }
  642. if !y.SameKey(it.Key(), lastKey) {
  643. firstKeyHasDiscardSet = false
  644. if len(kr.right) > 0 && y.CompareKeys(it.Key(), kr.right) >= 0 {
  645. break
  646. }
  647. if builder.ReachedCapacity() {
  648. // Only break if we are on a different key, and have reached capacity. We want
  649. // to ensure that all versions of the key are stored in the same sstable, and
  650. // not divided across multiple tables at the same level.
  651. break
  652. }
  653. lastKey = y.SafeCopy(lastKey, it.Key())
  654. numVersions = 0
  655. firstKeyHasDiscardSet = it.Value().Meta&bitDiscardEarlierVersions > 0
  656. if len(tableKr.left) == 0 {
  657. tableKr.left = y.SafeCopy(tableKr.left, it.Key())
  658. }
  659. tableKr.right = lastKey
  660. rangeCheck++
  661. if rangeCheck%5000 == 0 {
  662. // This table's range exceeds the allowed range overlap with the level after
  663. // next. So, we stop writing to this table. If we don't do this, then we end up
  664. // doing very expensive compactions involving too many tables. To amortize the
  665. // cost of this check, we do it only every N keys.
  666. if exceedsAllowedOverlap(tableKr) {
  667. // s.kv.opt.Debugf("L%d -> L%d Breaking due to exceedsAllowedOverlap with
  668. // kr: %s\n", cd.thisLevel.level, cd.nextLevel.level, tableKr)
  669. break
  670. }
  671. }
  672. }
  673. vs := it.Value()
  674. version := y.ParseTs(it.Key())
  675. isExpired := isDeletedOrExpired(vs.Meta, vs.ExpiresAt)
  676. // Do not discard entries inserted by merge operator. These entries will be
  677. // discarded once they're merged
  678. if version <= discardTs && vs.Meta&bitMergeEntry == 0 {
  679. // Keep track of the number of versions encountered for this key. Only consider the
  680. // versions which are below the minReadTs, otherwise, we might end up discarding the
  681. // only valid version for a running transaction.
  682. numVersions++
  683. // Keep the current version and discard all the next versions if
  684. // - The `discardEarlierVersions` bit is set OR
  685. // - We've already processed `NumVersionsToKeep` number of versions
  686. // (including the current item being processed)
  687. lastValidVersion := vs.Meta&bitDiscardEarlierVersions > 0 ||
  688. numVersions == s.kv.opt.NumVersionsToKeep
  689. if isExpired || lastValidVersion {
  690. // If this version of the key is deleted or expired, skip all the rest of the
  691. // versions. Ensure that we're only removing versions below readTs.
  692. skipKey = y.SafeCopy(skipKey, it.Key())
  693. switch {
  694. // Add the key to the table only if it has not expired.
  695. // We don't want to add the deleted/expired keys.
  696. case !isExpired && lastValidVersion:
  697. // Add this key. We have set skipKey, so the following key versions
  698. // would be skipped.
  699. case hasOverlap:
  700. // If this key range has overlap with lower levels, then keep the deletion
  701. // marker with the latest version, discarding the rest. We have set skipKey,
  702. // so the following key versions would be skipped.
  703. default:
  704. // If no overlap, we can skip all the versions, by continuing here.
  705. numSkips++
  706. updateStats(vs)
  707. continue // Skip adding this key.
  708. }
  709. }
  710. }
  711. numKeys++
  712. var vp valuePointer
  713. if vs.Meta&bitValuePointer > 0 {
  714. vp.Decode(vs.Value)
  715. }
  716. switch {
  717. case firstKeyHasDiscardSet:
  718. // This key is same as the last key which had "DiscardEarlierVersions" set. The
  719. // the next compactions will drop this key if its ts >
  720. // discardTs (of the next compaction).
  721. builder.AddStaleKey(it.Key(), vs, vp.Len)
  722. case isExpired:
  723. // If the key is expired, the next compaction will drop it if
  724. // its ts > discardTs (of the next compaction).
  725. builder.AddStaleKey(it.Key(), vs, vp.Len)
  726. default:
  727. builder.Add(it.Key(), vs, vp.Len)
  728. }
  729. }
  730. s.kv.opt.Debugf("[%d] LOG Compact. Added %d keys. Skipped %d keys. Iteration took: %v",
  731. cd.compactorId, numKeys, numSkips, time.Since(timeStart).Round(time.Millisecond))
  732. } // End of function: addKeys
  733. if len(kr.left) > 0 {
  734. it.Seek(kr.left)
  735. } else {
  736. it.Rewind()
  737. }
  738. for it.Valid() {
  739. if len(kr.right) > 0 && y.CompareKeys(it.Key(), kr.right) >= 0 {
  740. break
  741. }
  742. bopts := buildTableOptions(s.kv)
  743. // Set TableSize to the target file size for that level.
  744. bopts.TableSize = uint64(cd.t.fileSz[cd.nextLevel.level])
  745. builder := table.NewTableBuilder(bopts)
  746. // This would do the iteration and add keys to builder.
  747. addKeys(builder)
  748. // It was true that it.Valid() at least once in the loop above, which means we
  749. // called Add() at least once, and builder is not Empty().
  750. if builder.Empty() {
  751. // Cleanup builder resources:
  752. builder.Finish()
  753. builder.Close()
  754. continue
  755. }
  756. numBuilds++
  757. if err := inflightBuilders.Do(); err != nil {
  758. // Can't return from here, until I decrRef all the tables that I built so far.
  759. break
  760. }
  761. go func(builder *table.Builder, fileID uint64) {
  762. var err error
  763. defer inflightBuilders.Done(err)
  764. defer builder.Close()
  765. var tbl *table.Table
  766. if s.kv.opt.InMemory {
  767. tbl, err = table.OpenInMemoryTable(builder.Finish(), fileID, &bopts)
  768. } else {
  769. fname := table.NewFilename(fileID, s.kv.opt.Dir)
  770. tbl, err = table.CreateTable(fname, builder)
  771. }
  772. // If we couldn't build the table, return fast.
  773. if err != nil {
  774. return
  775. }
  776. res <- tbl
  777. }(builder, s.reserveFileID())
  778. }
  779. s.kv.vlog.updateDiscardStats(discardStats)
  780. s.kv.opt.Debugf("Discard stats: %v", discardStats)
  781. }
  782. // compactBuildTables merges topTables and botTables to form a list of new tables.
  783. func (s *levelsController) compactBuildTables(
  784. lev int, cd compactDef) ([]*table.Table, func() error, error) {
  785. topTables := cd.top
  786. botTables := cd.bot
  787. numTables := int64(len(topTables) + len(botTables))
  788. y.NumCompactionTablesAdd(s.kv.opt.MetricsEnabled, numTables)
  789. defer y.NumCompactionTablesAdd(s.kv.opt.MetricsEnabled, -numTables)
  790. keepTable := func(t *table.Table) bool {
  791. for _, prefix := range cd.dropPrefixes {
  792. if bytes.HasPrefix(t.Smallest(), prefix) &&
  793. bytes.HasPrefix(t.Biggest(), prefix) {
  794. // All the keys in this table have the dropPrefix. So, this
  795. // table does not need to be in the iterator and can be
  796. // dropped immediately.
  797. return false
  798. }
  799. }
  800. return true
  801. }
  802. var valid []*table.Table
  803. for _, table := range botTables {
  804. if keepTable(table) {
  805. valid = append(valid, table)
  806. }
  807. }
  808. newIterator := func() []y.Iterator {
  809. // Create iterators across all the tables involved first.
  810. var iters []y.Iterator
  811. switch {
  812. case lev == 0:
  813. iters = appendIteratorsReversed(iters, topTables, table.NOCACHE)
  814. case len(topTables) > 0:
  815. y.AssertTrue(len(topTables) == 1)
  816. iters = []y.Iterator{topTables[0].NewIterator(table.NOCACHE)}
  817. }
  818. // Next level has level>=1 and we can use ConcatIterator as key ranges do not overlap.
  819. return append(iters, table.NewConcatIterator(valid, table.NOCACHE))
  820. }
  821. res := make(chan *table.Table, 3)
  822. inflightBuilders := y.NewThrottle(8 + len(cd.splits))
  823. for _, kr := range cd.splits {
  824. // Initiate Do here so we can register the goroutines for buildTables too.
  825. if err := inflightBuilders.Do(); err != nil {
  826. s.kv.opt.Errorf("cannot start subcompaction: %+v", err)
  827. return nil, nil, err
  828. }
  829. go func(kr keyRange) {
  830. defer inflightBuilders.Done(nil)
  831. it := table.NewMergeIterator(newIterator(), false)
  832. defer it.Close()
  833. s.subcompact(it, kr, cd, inflightBuilders, res)
  834. }(kr)
  835. }
  836. var newTables []*table.Table
  837. var wg sync.WaitGroup
  838. wg.Add(1)
  839. go func() {
  840. defer wg.Done()
  841. for t := range res {
  842. newTables = append(newTables, t)
  843. }
  844. }()
  845. // Wait for all table builders to finish and also for newTables accumulator to finish.
  846. err := inflightBuilders.Finish()
  847. close(res)
  848. wg.Wait() // Wait for all tables to be picked up.
  849. if err == nil {
  850. // Ensure created files' directory entries are visible. We don't mind the extra latency
  851. // from not doing this ASAP after all file creation has finished because this is a
  852. // background operation.
  853. err = s.kv.syncDir(s.kv.opt.Dir)
  854. }
  855. if err != nil {
  856. // An error happened. Delete all the newly created table files (by calling DecrRef
  857. // -- we're the only holders of a ref).
  858. _ = decrRefs(newTables)
  859. return nil, nil, y.Wrapf(err, "while running compactions for: %+v", cd)
  860. }
  861. sort.Slice(newTables, func(i, j int) bool {
  862. return y.CompareKeys(newTables[i].Biggest(), newTables[j].Biggest()) < 0
  863. })
  864. return newTables, func() error { return decrRefs(newTables) }, nil
  865. }
  866. func buildChangeSet(cd *compactDef, newTables []*table.Table) pb.ManifestChangeSet {
  867. changes := []*pb.ManifestChange{}
  868. for _, table := range newTables {
  869. changes = append(changes,
  870. newCreateChange(table.ID(), cd.nextLevel.level, table.KeyID(), table.CompressionType()))
  871. }
  872. for _, table := range cd.top {
  873. // Add a delete change only if the table is not in memory.
  874. if !table.IsInmemory {
  875. changes = append(changes, newDeleteChange(table.ID()))
  876. }
  877. }
  878. for _, table := range cd.bot {
  879. changes = append(changes, newDeleteChange(table.ID()))
  880. }
  881. return pb.ManifestChangeSet{Changes: changes}
  882. }
  // hasAnyPrefixes reports whether s starts with at least one entry of listOfPrefixes.
  // It returns false when listOfPrefixes is empty.
  func hasAnyPrefixes(s []byte, listOfPrefixes [][]byte) bool {
  	for i := range listOfPrefixes {
  		if bytes.HasPrefix(s, listOfPrefixes[i]) {
  			return true
  		}
  	}
  	return false
  }
  891. func containsPrefix(table *table.Table, prefix []byte) bool {
  892. smallValue := table.Smallest()
  893. largeValue := table.Biggest()
  894. if bytes.HasPrefix(smallValue, prefix) {
  895. return true
  896. }
  897. if bytes.HasPrefix(largeValue, prefix) {
  898. return true
  899. }
  900. isPresent := func() bool {
  901. ti := table.NewIterator(0)
  902. defer ti.Close()
  903. // In table iterator's Seek, we assume that key has version in last 8 bytes. We set
  904. // version=0 (ts=math.MaxUint64), so that we don't skip the key prefixed with prefix.
  905. ti.Seek(y.KeyWithTs(prefix, math.MaxUint64))
  906. return bytes.HasPrefix(ti.Key(), prefix)
  907. }
  908. if bytes.Compare(prefix, smallValue) > 0 &&
  909. bytes.Compare(prefix, largeValue) < 0 {
  910. // There may be a case when table contains [0x0000,...., 0xffff]. If we are searching for
  911. // k=0x0011, we should not directly infer that k is present. It may not be present.
  912. return isPresent()
  913. }
  914. return false
  915. }
  916. func containsAnyPrefixes(table *table.Table, listOfPrefixes [][]byte) bool {
  917. for _, prefix := range listOfPrefixes {
  918. if containsPrefix(table, prefix) {
  919. return true
  920. }
  921. }
  922. return false
  923. }
  924. type compactDef struct {
  925. compactorId int
  926. t targets
  927. p compactionPriority
  928. thisLevel *levelHandler
  929. nextLevel *levelHandler
  930. top []*table.Table
  931. bot []*table.Table
  932. thisRange keyRange
  933. nextRange keyRange
  934. splits []keyRange
  935. thisSize int64
  936. dropPrefixes [][]byte
  937. }
  938. // addSplits can allow us to run multiple sub-compactions in parallel across the split key ranges.
  939. func (s *levelsController) addSplits(cd *compactDef) {
  940. cd.splits = cd.splits[:0]
  941. // Let's say we have 10 tables in cd.bot and min width = 3. Then, we'll pick
  942. // 0, 1, 2 (pick), 3, 4, 5 (pick), 6, 7, 8 (pick), 9 (pick, because last table).
  943. // This gives us 4 picks for 10 tables.
  944. // In an edge case, 142 tables in bottom led to 48 splits. That's too many splits, because it
  945. // then uses up a lot of memory for table builder.
  946. // We should keep it so we have at max 5 splits.
  947. width := int(math.Ceil(float64(len(cd.bot)) / 5.0))
  948. if width < 3 {
  949. width = 3
  950. }
  951. skr := cd.thisRange
  952. skr.extend(cd.nextRange)
  953. addRange := func(right []byte) {
  954. skr.right = y.Copy(right)
  955. cd.splits = append(cd.splits, skr)
  956. skr.left = skr.right
  957. }
  958. for i, t := range cd.bot {
  959. // last entry in bottom table.
  960. if i == len(cd.bot)-1 {
  961. addRange([]byte{})
  962. return
  963. }
  964. if i%width == width-1 {
  965. // Right is assigned ts=0. The encoding ts bytes take MaxUint64-ts,
  966. // so, those with smaller TS will be considered larger for the same key.
  967. // Consider the following.
  968. // Top table is [A1...C3(deleted)]
  969. // bot table is [B1....C2]
  970. // It will generate a split [A1 ... C0], including any records of Key C.
  971. right := y.KeyWithTs(y.ParseKey(t.Biggest()), 0)
  972. addRange(right)
  973. }
  974. }
  975. }
  976. func (cd *compactDef) lockLevels() {
  977. cd.thisLevel.RLock()
  978. cd.nextLevel.RLock()
  979. }
  980. func (cd *compactDef) unlockLevels() {
  981. cd.nextLevel.RUnlock()
  982. cd.thisLevel.RUnlock()
  983. }
  984. func (cd *compactDef) allTables() []*table.Table {
  985. ret := make([]*table.Table, 0, len(cd.top)+len(cd.bot))
  986. ret = append(ret, cd.top...)
  987. ret = append(ret, cd.bot...)
  988. return ret
  989. }
  990. func (s *levelsController) fillTablesL0ToL0(cd *compactDef) bool {
  991. if cd.compactorId != 0 {
  992. // Only compactor zero can work on this.
  993. return false
  994. }
  995. cd.nextLevel = s.levels[0]
  996. cd.nextRange = keyRange{}
  997. cd.bot = nil
  998. // Because this level and next level are both level 0, we should NOT acquire
  999. // the read lock twice, because it can result in a deadlock. So, we don't
  1000. // call compactDef.lockLevels, instead locking the level only once and
  1001. // directly here.
  1002. //
  1003. // As per godocs on RWMutex:
  1004. // If a goroutine holds a RWMutex for reading and another goroutine might
  1005. // call Lock, no goroutine should expect to be able to acquire a read lock
  1006. // until the initial read lock is released. In particular, this prohibits
  1007. // recursive read locking. This is to ensure that the lock eventually
  1008. // becomes available; a blocked Lock call excludes new readers from
  1009. // acquiring the lock.
  1010. y.AssertTrue(cd.thisLevel.level == 0)
  1011. y.AssertTrue(cd.nextLevel.level == 0)
  1012. s.levels[0].RLock()
  1013. defer s.levels[0].RUnlock()
  1014. s.cstatus.Lock()
  1015. defer s.cstatus.Unlock()
  1016. top := cd.thisLevel.tables
  1017. var out []*table.Table
  1018. now := time.Now()
  1019. for _, t := range top {
  1020. if t.Size() >= 2*cd.t.fileSz[0] {
  1021. // This file is already big, don't include it.
  1022. continue
  1023. }
  1024. if now.Sub(t.CreatedAt) < 10*time.Second {
  1025. // Just created it 10s ago. Don't pick for compaction.
  1026. continue
  1027. }
  1028. if _, beingCompacted := s.cstatus.tables[t.ID()]; beingCompacted {
  1029. continue
  1030. }
  1031. out = append(out, t)
  1032. }
  1033. if len(out) < 4 {
  1034. // If we don't have enough tables to merge in L0, don't do it.
  1035. return false
  1036. }
  1037. cd.thisRange = infRange
  1038. cd.top = out
  1039. // Avoid any other L0 -> Lbase from happening, while this is going on.
  1040. thisLevel := s.cstatus.levels[cd.thisLevel.level]
  1041. thisLevel.ranges = append(thisLevel.ranges, infRange)
  1042. for _, t := range out {
  1043. s.cstatus.tables[t.ID()] = struct{}{}
  1044. }
  1045. // For L0->L0 compaction, we set the target file size to max, so the output is always one file.
  1046. // This significantly decreases the L0 table stalls and improves the performance.
  1047. cd.t.fileSz[0] = math.MaxUint32
  1048. return true
  1049. }
  1050. func (s *levelsController) fillTablesL0ToLbase(cd *compactDef) bool {
  1051. if cd.nextLevel.level == 0 {
  1052. panic("Base level can't be zero.")
  1053. }
  1054. // We keep cd.p.adjusted > 0.0 here to allow functions in db.go to artificially trigger
  1055. // L0->Lbase compactions. Those functions wouldn't be setting the adjusted score.
  1056. if cd.p.adjusted > 0.0 && cd.p.adjusted < 1.0 {
  1057. // Do not compact to Lbase if adjusted score is less than 1.0.
  1058. return false
  1059. }
  1060. cd.lockLevels()
  1061. defer cd.unlockLevels()
  1062. top := cd.thisLevel.tables
  1063. if len(top) == 0 {
  1064. return false
  1065. }
  1066. var out []*table.Table
  1067. if len(cd.dropPrefixes) > 0 {
  1068. // Use all tables if drop prefix is set. We don't want to compact only a
  1069. // sub-range. We want to compact all the tables.
  1070. out = top
  1071. } else {
  1072. var kr keyRange
  1073. // cd.top[0] is the oldest file. So we start from the oldest file first.
  1074. for _, t := range top {
  1075. dkr := getKeyRange(t)
  1076. if kr.overlapsWith(dkr) {
  1077. out = append(out, t)
  1078. kr.extend(dkr)
  1079. } else {
  1080. break
  1081. }
  1082. }
  1083. }
  1084. cd.thisRange = getKeyRange(out...)
  1085. cd.top = out
  1086. left, right := cd.nextLevel.overlappingTables(levelHandlerRLocked{}, cd.thisRange)
  1087. cd.bot = make([]*table.Table, right-left)
  1088. copy(cd.bot, cd.nextLevel.tables[left:right])
  1089. if len(cd.bot) == 0 {
  1090. cd.nextRange = cd.thisRange
  1091. } else {
  1092. cd.nextRange = getKeyRange(cd.bot...)
  1093. }
  1094. return s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd)
  1095. }
  1096. // fillTablesL0 would try to fill tables from L0 to be compacted with Lbase. If
  1097. // it can not do that, it would try to compact tables from L0 -> L0.
  1098. //
  1099. // Say L0 has 10 tables.
  1100. // fillTablesL0ToLbase picks up 5 tables to compact from L0 -> L5.
  1101. // Next call to fillTablesL0 would run L0ToLbase again, which fails this time.
  1102. // So, instead, we run fillTablesL0ToL0, which picks up rest of the 5 tables to
  1103. // be compacted within L0. Additionally, it would set the compaction range in
  1104. // cstatus to inf, so no other L0 -> Lbase compactions can happen.
  1105. // Thus, L0 -> L0 must finish for the next L0 -> Lbase to begin.
  1106. func (s *levelsController) fillTablesL0(cd *compactDef) bool {
  1107. if ok := s.fillTablesL0ToLbase(cd); ok {
  1108. return true
  1109. }
  1110. return s.fillTablesL0ToL0(cd)
  1111. }
  1112. // sortByStaleData sorts tables based on the amount of stale data they have.
  1113. // This is useful in removing tombstones.
  1114. func (s *levelsController) sortByStaleDataSize(tables []*table.Table, cd *compactDef) {
  1115. if len(tables) == 0 || cd.nextLevel == nil {
  1116. return
  1117. }
  1118. sort.Slice(tables, func(i, j int) bool {
  1119. return tables[i].StaleDataSize() > tables[j].StaleDataSize()
  1120. })
  1121. }
  1122. // sortByHeuristic sorts tables in increasing order of MaxVersion, so we
  1123. // compact older tables first.
  1124. func (s *levelsController) sortByHeuristic(tables []*table.Table, cd *compactDef) {
  1125. if len(tables) == 0 || cd.nextLevel == nil {
  1126. return
  1127. }
  1128. // Sort tables by max version. This is what RocksDB does.
  1129. sort.Slice(tables, func(i, j int) bool {
  1130. return tables[i].MaxVersion() < tables[j].MaxVersion()
  1131. })
  1132. }
  1133. // This function should be called with lock on levels.
  1134. func (s *levelsController) fillMaxLevelTables(tables []*table.Table, cd *compactDef) bool {
  1135. sortedTables := make([]*table.Table, len(tables))
  1136. copy(sortedTables, tables)
  1137. s.sortByStaleDataSize(sortedTables, cd)
  1138. if len(sortedTables) > 0 && sortedTables[0].StaleDataSize() == 0 {
  1139. // This is a maxLevel to maxLevel compaction and we don't have any stale data.
  1140. return false
  1141. }
  1142. cd.bot = []*table.Table{}
  1143. collectBotTables := func(t *table.Table, needSz int64) {
  1144. totalSize := t.Size()
  1145. j := sort.Search(len(tables), func(i int) bool {
  1146. return y.CompareKeys(tables[i].Smallest(), t.Smallest()) >= 0
  1147. })
  1148. y.AssertTrue(tables[j].ID() == t.ID())
  1149. j++
  1150. // Collect tables until we reach the the required size.
  1151. for j < len(tables) {
  1152. newT := tables[j]
  1153. totalSize += newT.Size()
  1154. if totalSize >= needSz {
  1155. break
  1156. }
  1157. cd.bot = append(cd.bot, newT)
  1158. cd.nextRange.extend(getKeyRange(newT))
  1159. j++
  1160. }
  1161. }
  1162. now := time.Now()
  1163. for _, t := range sortedTables {
  1164. // If the maxVersion is above the discardTs, we won't clean anything in
  1165. // the compaction. So skip this table.
  1166. if t.MaxVersion() > s.kv.orc.discardAtOrBelow() {
  1167. continue
  1168. }
  1169. if now.Sub(t.CreatedAt) < time.Hour {
  1170. // Just created it an hour ago. Don't pick for compaction.
  1171. continue
  1172. }
  1173. // If the stale data size is less than 10 MB, it might not be worth
  1174. // rewriting the table. Skip it.
  1175. if t.StaleDataSize() < 10<<20 {
  1176. continue
  1177. }
  1178. cd.thisSize = t.Size()
  1179. cd.thisRange = getKeyRange(t)
  1180. // Set the next range as the same as the current range. If we don't do
  1181. // this, we won't be able to run more than one max level compactions.
  1182. cd.nextRange = cd.thisRange
  1183. // If we're already compacting this range, don't do anything.
  1184. if s.cstatus.overlapsWith(cd.thisLevel.level, cd.thisRange) {
  1185. continue
  1186. }
  1187. // Found a valid table!
  1188. cd.top = []*table.Table{t}
  1189. needFileSz := cd.t.fileSz[cd.thisLevel.level]
  1190. // The table size is what we want so no need to collect more tables.
  1191. if t.Size() >= needFileSz {
  1192. break
  1193. }
  1194. // TableSize is less than what we want. Collect more tables for compaction.
  1195. // If the level has multiple small tables, we collect all of them
  1196. // together to form a bigger table.
  1197. collectBotTables(t, needFileSz)
  1198. if !s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd) {
  1199. cd.bot = cd.bot[:0]
  1200. cd.nextRange = keyRange{}
  1201. continue
  1202. }
  1203. return true
  1204. }
  1205. if len(cd.top) == 0 {
  1206. return false
  1207. }
  1208. return s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd)
  1209. }
  1210. func (s *levelsController) fillTables(cd *compactDef) bool {
  1211. cd.lockLevels()
  1212. defer cd.unlockLevels()
  1213. tables := make([]*table.Table, len(cd.thisLevel.tables))
  1214. copy(tables, cd.thisLevel.tables)
  1215. if len(tables) == 0 {
  1216. return false
  1217. }
  1218. // We're doing a maxLevel to maxLevel compaction. Pick tables based on the stale data size.
  1219. if cd.thisLevel.isLastLevel() {
  1220. return s.fillMaxLevelTables(tables, cd)
  1221. }
  1222. // We pick tables, so we compact older tables first. This is similar to
  1223. // kOldestLargestSeqFirst in RocksDB.
  1224. s.sortByHeuristic(tables, cd)
  1225. for _, t := range tables {
  1226. cd.thisSize = t.Size()
  1227. cd.thisRange = getKeyRange(t)
  1228. // If we're already compacting this range, don't do anything.
  1229. if s.cstatus.overlapsWith(cd.thisLevel.level, cd.thisRange) {
  1230. continue
  1231. }
  1232. cd.top = []*table.Table{t}
  1233. left, right := cd.nextLevel.overlappingTables(levelHandlerRLocked{}, cd.thisRange)
  1234. cd.bot = make([]*table.Table, right-left)
  1235. copy(cd.bot, cd.nextLevel.tables[left:right])
  1236. if len(cd.bot) == 0 {
  1237. cd.bot = []*table.Table{}
  1238. cd.nextRange = cd.thisRange
  1239. if !s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd) {
  1240. continue
  1241. }
  1242. return true
  1243. }
  1244. cd.nextRange = getKeyRange(cd.bot...)
  1245. if s.cstatus.overlapsWith(cd.nextLevel.level, cd.nextRange) {
  1246. continue
  1247. }
  1248. if !s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd) {
  1249. continue
  1250. }
  1251. return true
  1252. }
  1253. return false
  1254. }
  1255. func (s *levelsController) runCompactDef(id, l int, cd compactDef) (err error) {
  1256. if len(cd.t.fileSz) == 0 {
  1257. return errors.New("Filesizes cannot be zero. Targets are not set")
  1258. }
  1259. timeStart := time.Now()
  1260. thisLevel := cd.thisLevel
  1261. nextLevel := cd.nextLevel
  1262. y.AssertTrue(len(cd.splits) == 0)
  1263. if thisLevel.level == nextLevel.level {
  1264. // don't do anything for L0 -> L0 and Lmax -> Lmax.
  1265. } else {
  1266. s.addSplits(&cd)
  1267. }
  1268. if len(cd.splits) == 0 {
  1269. cd.splits = append(cd.splits, keyRange{})
  1270. }
  1271. // Table should never be moved directly between levels,
  1272. // always be rewritten to allow discarding invalid versions.
  1273. newTables, decr, err := s.compactBuildTables(l, cd)
  1274. if err != nil {
  1275. return err
  1276. }
  1277. defer func() {
  1278. // Only assign to err, if it's not already nil.
  1279. if decErr := decr(); err == nil {
  1280. err = decErr
  1281. }
  1282. }()
  1283. changeSet := buildChangeSet(&cd, newTables)
  1284. // We write to the manifest _before_ we delete files (and after we created files)
  1285. if err := s.kv.manifest.addChanges(changeSet.Changes); err != nil {
  1286. return err
  1287. }
  1288. getSizes := func(tables []*table.Table) int64 {
  1289. size := int64(0)
  1290. for _, i := range tables {
  1291. size += i.Size()
  1292. }
  1293. return size
  1294. }
  1295. sizeNewTables := int64(0)
  1296. sizeOldTables := int64(0)
  1297. if s.kv.opt.MetricsEnabled {
  1298. sizeNewTables = getSizes(newTables)
  1299. sizeOldTables = getSizes(cd.bot) + getSizes(cd.top)
  1300. y.NumBytesCompactionWrittenAdd(s.kv.opt.MetricsEnabled, nextLevel.strLevel, sizeNewTables)
  1301. }
  1302. // See comment earlier in this function about the ordering of these ops, and the order in which
  1303. // we access levels when reading.
  1304. if err := nextLevel.replaceTables(cd.bot, newTables); err != nil {
  1305. return err
  1306. }
  1307. if err := thisLevel.deleteTables(cd.top); err != nil {
  1308. return err
  1309. }
  1310. // Note: For level 0, while doCompact is running, it is possible that new tables are added.
  1311. // However, the tables are added only to the end, so it is ok to just delete the first table.
  1312. from := append(tablesToString(cd.top), tablesToString(cd.bot)...)
  1313. to := tablesToString(newTables)
  1314. if dur := time.Since(timeStart); dur > 2*time.Second {
  1315. var expensive string
  1316. if dur > time.Second {
  1317. expensive = " [E]"
  1318. }
  1319. s.kv.opt.Infof("[%d]%s LOG Compact %d->%d (%d, %d -> %d tables with %d splits)."+
  1320. " [%s] -> [%s], took %v\n, deleted %d bytes",
  1321. id, expensive, thisLevel.level, nextLevel.level, len(cd.top), len(cd.bot),
  1322. len(newTables), len(cd.splits), strings.Join(from, " "), strings.Join(to, " "),
  1323. dur.Round(time.Millisecond), sizeOldTables-sizeNewTables)
  1324. }
  1325. if cd.thisLevel.level != 0 && len(newTables) > 2*s.kv.opt.LevelSizeMultiplier {
  1326. s.kv.opt.Infof("This Range (numTables: %d)\nLeft:\n%s\nRight:\n%s\n",
  1327. len(cd.top), hex.Dump(cd.thisRange.left), hex.Dump(cd.thisRange.right))
  1328. s.kv.opt.Infof("Next Range (numTables: %d)\nLeft:\n%s\nRight:\n%s\n",
  1329. len(cd.bot), hex.Dump(cd.nextRange.left), hex.Dump(cd.nextRange.right))
  1330. }
  1331. return nil
  1332. }
  1333. func tablesToString(tables []*table.Table) []string {
  1334. var res []string
  1335. for _, t := range tables {
  1336. res = append(res, fmt.Sprintf("%05d", t.ID()))
  1337. }
  1338. res = append(res, ".")
  1339. return res
  1340. }
  // errFillTables is returned by doCompact when no tables could be picked for the requested
  // compaction.
  var errFillTables = stderrors.New("Unable to fill tables")
  1342. // doCompact picks some table on level l and compacts it away to the next level.
  1343. func (s *levelsController) doCompact(id int, p compactionPriority) error {
  1344. l := p.level
  1345. y.AssertTrue(l < s.kv.opt.MaxLevels) // Sanity check.
  1346. if p.t.baseLevel == 0 {
  1347. p.t = s.levelTargets()
  1348. }
  1349. _, span := otel.Tracer("").Start(context.TODO(), "Badger.Compaction")
  1350. defer span.End()
  1351. cd := compactDef{
  1352. compactorId: id,
  1353. p: p,
  1354. t: p.t,
  1355. thisLevel: s.levels[l],
  1356. dropPrefixes: p.dropPrefixes,
  1357. }
  1358. // While picking tables to be compacted, both levels' tables are expected to
  1359. // remain unchanged.
  1360. if l == 0 {
  1361. cd.nextLevel = s.levels[p.t.baseLevel]
  1362. if !s.fillTablesL0(&cd) {
  1363. return errFillTables
  1364. }
  1365. } else {
  1366. cd.nextLevel = cd.thisLevel
  1367. // We're not compacting the last level so pick the next level.
  1368. if !cd.thisLevel.isLastLevel() {
  1369. cd.nextLevel = s.levels[l+1]
  1370. }
  1371. if !s.fillTables(&cd) {
  1372. return errFillTables
  1373. }
  1374. }
  1375. defer s.cstatus.delete(cd) // Remove the ranges from compaction status.
  1376. span.SetAttributes(attribute.String("Compaction", fmt.Sprintf("%+v", cd)))
  1377. if err := s.runCompactDef(id, l, cd); err != nil {
  1378. // This compaction couldn't be done successfully.
  1379. s.kv.opt.Warningf("[Compactor: %d] LOG Compact FAILED with error: %+v: %+v", id, err, cd)
  1380. return err
  1381. }
  1382. span.SetAttributes(
  1383. attribute.Int("Top tables count", len(cd.top)),
  1384. attribute.Int("Bottom tables count", len(cd.bot)))
  1385. s.kv.opt.Debugf("[Compactor: %d] Compaction for level: %d DONE", id, cd.thisLevel.level)
  1386. return nil
  1387. }
  1388. func (s *levelsController) addLevel0Table(t *table.Table) error {
  1389. // Add table to manifest file only if it is not opened in memory. We don't want to add a table
  1390. // to the manifest file if it exists only in memory.
  1391. if !t.IsInmemory {
  1392. // We update the manifest _before_ the table becomes part of a levelHandler, because at that
  1393. // point it could get used in some compaction. This ensures the manifest file gets updated in
  1394. // the proper order. (That means this update happens before that of some compaction which
  1395. // deletes the table.)
  1396. err := s.kv.manifest.addChanges([]*pb.ManifestChange{
  1397. newCreateChange(t.ID(), 0, t.KeyID(), t.CompressionType()),
  1398. })
  1399. if err != nil {
  1400. return err
  1401. }
  1402. }
  1403. for !s.levels[0].tryAddLevel0Table(t) {
  1404. // Before we unstall, we need to make sure that level 0 is healthy.
  1405. timeStart := time.Now()
  1406. for s.levels[0].numTables() >= s.kv.opt.NumLevelZeroTablesStall {
  1407. time.Sleep(10 * time.Millisecond)
  1408. }
  1409. dur := time.Since(timeStart)
  1410. if dur > time.Second {
  1411. s.kv.opt.Infof("L0 was stalled for %s\n", dur.Round(time.Millisecond))
  1412. }
  1413. s.l0stallsMs.Add(int64(dur.Round(time.Millisecond)))
  1414. }
  1415. return nil
  1416. }
  1417. func (s *levelsController) close() error {
  1418. err := s.cleanupLevels()
  1419. return y.Wrap(err, "levelsController.Close")
  1420. }
  1421. // get searches for a given key in all the levels of the LSM tree. It returns
  1422. // key version <= the expected version (version in key). If not found,
  1423. // it returns an empty y.ValueStruct.
  1424. func (s *levelsController) get(key []byte, maxVs y.ValueStruct, startLevel int) (
  1425. y.ValueStruct, error) {
  1426. if s.kv.IsClosed() {
  1427. return y.ValueStruct{}, ErrDBClosed
  1428. }
  1429. // It's important that we iterate the levels from 0 on upward. The reason is, if we iterated
  1430. // in opposite order, or in parallel (naively calling all the h.RLock() in some order) we could
  1431. // read level L's tables post-compaction and level L+1's tables pre-compaction. (If we do
  1432. // parallelize this, we will need to call the h.RLock() function by increasing order of level
  1433. // number.)
  1434. version := y.ParseTs(key)
  1435. for _, h := range s.levels {
  1436. // Ignore all levels below startLevel. This is useful for GC when L0 is kept in memory.
  1437. if h.level < startLevel {
  1438. continue
  1439. }
  1440. vs, err := h.get(key) // Calls h.RLock() and h.RUnlock().
  1441. if err != nil {
  1442. return y.ValueStruct{}, y.Wrapf(err, "get key: %q", key)
  1443. }
  1444. if vs.Value == nil && vs.Meta == 0 {
  1445. continue
  1446. }
  1447. y.NumBytesReadsLSMAdd(s.kv.opt.MetricsEnabled, int64(len(vs.Value)))
  1448. if vs.Version == version {
  1449. return vs, nil
  1450. }
  1451. if maxVs.Version < vs.Version {
  1452. maxVs = vs
  1453. }
  1454. }
  1455. if len(maxVs.Value) > 0 {
  1456. y.NumGetsWithResultsAdd(s.kv.opt.MetricsEnabled, 1)
  1457. }
  1458. return maxVs, nil
  1459. }
  1460. func appendIteratorsReversed(out []y.Iterator, th []*table.Table, opt int) []y.Iterator {
  1461. for i := len(th) - 1; i >= 0; i-- {
  1462. // This will increment the reference of the table handler.
  1463. out = append(out, th[i].NewIterator(opt))
  1464. }
  1465. return out
  1466. }
  1467. // appendIterators appends iterators to an array of iterators, for merging.
  1468. // Note: This obtains references for the table handlers. Remember to close these iterators.
  1469. func (s *levelsController) appendIterators(
  1470. iters []y.Iterator, opt *IteratorOptions) []y.Iterator {
  1471. // Just like with get, it's important we iterate the levels from 0 on upward, to avoid missing
  1472. // data when there's a compaction.
  1473. for _, level := range s.levels {
  1474. iters = level.appendIterators(iters, opt)
  1475. }
  1476. return iters
  1477. }
  // TableInfo represents the information about a table.
  type TableInfo struct {
  	// ID is the table's identifier.
  	ID uint64
  	// Level is the number of the level the table was found on.
  	Level int
  	// Left and Right are the smallest and biggest keys of the table.
  	Left, Right []byte
  	// KeyCount is the number of keys in the table.
  	KeyCount uint32
  	// Size figures as reported by the table itself.
  	OnDiskSize, StaleDataSize, UncompressedSize uint32
  	// MaxVersion is the maximum version reported by the table.
  	MaxVersion uint64
  	// IndexSz and BloomFilterSize are the sizes of the table's index and
  	// bloom filter.
  	IndexSz, BloomFilterSize int
  }
  1492. func (s *levelsController) getTableInfo() (result []TableInfo) {
  1493. for _, l := range s.levels {
  1494. l.RLock()
  1495. for _, t := range l.tables {
  1496. info := TableInfo{
  1497. ID: t.ID(),
  1498. Level: l.level,
  1499. Left: t.Smallest(),
  1500. Right: t.Biggest(),
  1501. KeyCount: t.KeyCount(),
  1502. OnDiskSize: t.OnDiskSize(),
  1503. StaleDataSize: t.StaleDataSize(),
  1504. IndexSz: t.IndexSize(),
  1505. BloomFilterSize: t.BloomFilterSize(),
  1506. UncompressedSize: t.UncompressedSize(),
  1507. MaxVersion: t.MaxVersion(),
  1508. }
  1509. result = append(result, info)
  1510. }
  1511. l.RUnlock()
  1512. }
  1513. sort.Slice(result, func(i, j int) bool {
  1514. if result[i].Level != result[j].Level {
  1515. return result[i].Level < result[j].Level
  1516. }
  1517. return result[i].ID < result[j].ID
  1518. })
  1519. return
  1520. }
  // LevelInfo summarizes the state of a single level.
  type LevelInfo struct {
  	// Level is the level number; NumTables is how many tables it holds.
  	Level, NumTables int
  	// Size is the level's total size. TargetSize and TargetFileSize are the
  	// level's targets for total size and per-file size.
  	Size, TargetSize, TargetFileSize int64
  	// IsBaseLevel reports whether this level is the current base level.
  	IsBaseLevel bool
  	// Score and Adjusted are the level's compaction priority score and its
  	// adjusted counterpart.
  	Score, Adjusted float64
  	// StaleDatSize is the level's total stale data size.
  	StaleDatSize int64
  }
  1532. func (s *levelsController) getLevelInfo() []LevelInfo {
  1533. t := s.levelTargets()
  1534. prios := s.pickCompactLevels(nil)
  1535. result := make([]LevelInfo, len(s.levels))
  1536. for i, l := range s.levels {
  1537. l.RLock()
  1538. result[i].Level = i
  1539. result[i].Size = l.totalSize
  1540. result[i].NumTables = len(l.tables)
  1541. result[i].StaleDatSize = l.totalStaleSize
  1542. l.RUnlock()
  1543. result[i].TargetSize = t.targetSz[i]
  1544. result[i].TargetFileSize = t.fileSz[i]
  1545. result[i].IsBaseLevel = t.baseLevel == i
  1546. }
  1547. for _, p := range prios {
  1548. result[p.level].Score = p.score
  1549. result[p.level].Adjusted = p.adjusted
  1550. }
  1551. return result
  1552. }
  1553. // verifyChecksum verifies checksum for all tables on all levels.
  1554. func (s *levelsController) verifyChecksum() error {
  1555. var tables []*table.Table
  1556. for _, l := range s.levels {
  1557. l.RLock()
  1558. tables = tables[:0]
  1559. for _, t := range l.tables {
  1560. tables = append(tables, t)
  1561. t.IncrRef()
  1562. }
  1563. l.RUnlock()
  1564. for _, t := range tables {
  1565. errChkVerify := t.VerifyChecksum()
  1566. if err := t.DecrRef(); err != nil {
  1567. s.kv.opt.Errorf("unable to decrease reference of table: %s while "+
  1568. "verifying checksum with error: %s", t.Filename(), err)
  1569. }
  1570. if errChkVerify != nil {
  1571. return errChkVerify
  1572. }
  1573. }
  1574. }
  1575. return nil
  1576. }
  1577. // Returns the sorted list of splits for all the levels and tables based
  1578. // on the block offsets.
  1579. func (s *levelsController) keySplits(numPerTable int, prefix []byte) []string {
  1580. splits := make([]string, 0)
  1581. for _, l := range s.levels {
  1582. l.RLock()
  1583. for _, t := range l.tables {
  1584. tableSplits := t.KeySplits(numPerTable, prefix)
  1585. splits = append(splits, tableSplits...)
  1586. }
  1587. l.RUnlock()
  1588. }
  1589. sort.Strings(splits)
  1590. return splits
  1591. }