levels.go 52 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package badger
  6. import (
  7. "bytes"
  8. "context"
  9. "encoding/hex"
  10. "errors"
  11. "fmt"
  12. "math"
  13. "math/rand"
  14. "os"
  15. "sort"
  16. "strings"
  17. "sync"
  18. "sync/atomic"
  19. "time"
  20. "go.opentelemetry.io/otel"
  21. "go.opentelemetry.io/otel/attribute"
  22. "github.com/dgraph-io/badger/v4/pb"
  23. "github.com/dgraph-io/badger/v4/table"
  24. "github.com/dgraph-io/badger/v4/y"
  25. "github.com/dgraph-io/ristretto/v2/z"
  26. )
// levelsController owns all levels of the LSM tree and coordinates
// compactions between them.
type levelsController struct {
	// nextFileID is the next table file ID to hand out; stored atomically
	// because table builds happen concurrently.
	nextFileID atomic.Uint64
	// l0stallsMs — presumably cumulative milliseconds spent stalled on L0;
	// not written in this chunk, so confirm against the write path.
	l0stallsMs atomic.Int64
	// The following are initialized once and const.
	levels  []*levelHandler // one handler per level, index == level number
	kv      *DB             // owning DB, used for options, manifest, registry
	cstatus compactStatus   // tracks in-flight compactions per level/table
}
  35. // revertToManifest checks that all necessary table files exist and removes all table files not
  36. // referenced by the manifest. idMap is a set of table file id's that were read from the directory
  37. // listing.
  38. func revertToManifest(kv *DB, mf *Manifest, idMap map[uint64]struct{}) error {
  39. // 1. Check all files in manifest exist.
  40. for id := range mf.Tables {
  41. if _, ok := idMap[id]; !ok {
  42. return fmt.Errorf("file does not exist for table %d", id)
  43. }
  44. }
  45. // 2. Delete files that shouldn't exist.
  46. for id := range idMap {
  47. if _, ok := mf.Tables[id]; !ok {
  48. kv.opt.Debugf("Table file %d not referenced in MANIFEST\n", id)
  49. filename := table.NewFilename(id, kv.opt.Dir)
  50. if err := os.Remove(filename); err != nil {
  51. return y.Wrapf(err, "While removing table %d", id)
  52. }
  53. }
  54. }
  55. return nil
  56. }
// newLevelsController builds the levelsController from the on-disk state
// described by the manifest: it reconciles the table directory with the
// manifest, opens all referenced table files concurrently, places them into
// their levels, and validates the resulting tree.
func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
	y.AssertTrue(db.opt.NumLevelZeroTablesStall > db.opt.NumLevelZeroTables)
	s := &levelsController{
		kv:     db,
		levels: make([]*levelHandler, db.opt.MaxLevels),
	}
	s.cstatus.tables = make(map[uint64]struct{})
	s.cstatus.levels = make([]*levelCompactStatus, db.opt.MaxLevels)
	for i := 0; i < db.opt.MaxLevels; i++ {
		s.levels[i] = newLevelHandler(db, i)
		s.cstatus.levels[i] = new(levelCompactStatus)
	}
	// In-memory mode has no table files to reconcile or open.
	if db.opt.InMemory {
		return s, nil
	}
	// Compare manifest against directory, check for existent/non-existent files, and remove.
	if err := revertToManifest(db, mf, getIDMap(db.opt.Dir)); err != nil {
		return nil, err
	}
	var mu sync.Mutex // guards the per-level `tables` slices appended to by workers
	tables := make([][]*table.Table, db.opt.MaxLevels)
	var maxFileID uint64
	// We found that using 3 goroutines allows disk throughput to be utilized to its max.
	// Disk utilization is the main thing we should focus on, while trying to read the data. That's
	// the one factor that remains constant between HDD and SSD.
	throttle := y.NewThrottle(3)
	start := time.Now()
	var numOpened atomic.Int32
	tick := time.NewTicker(3 * time.Second)
	defer tick.Stop()
	for fileID, tf := range mf.Tables {
		fname := table.NewFilename(fileID, db.opt.Dir)
		// Periodically log progress while opening many tables.
		select {
		case <-tick.C:
			db.opt.Infof("%d tables out of %d opened in %s\n", numOpened.Load(),
				len(mf.Tables), time.Since(start).Round(time.Millisecond))
		default:
		}
		// Do blocks until a worker slot frees up; it also surfaces any error
		// already reported by a finished worker via Done.
		if err := throttle.Do(); err != nil {
			closeAllTables(tables)
			return nil, err
		}
		if fileID > maxFileID {
			maxFileID = fileID
		}
		// fname and tf are passed as arguments so each goroutine gets its own
		// copies rather than sharing the loop variables.
		go func(fname string, tf TableManifest) {
			var rerr error
			defer func() {
				throttle.Done(rerr)
				numOpened.Add(1)
			}()
			dk, err := db.registry.DataKey(tf.KeyID)
			if err != nil {
				rerr = y.Wrapf(err, "Error while reading datakey")
				return
			}
			topt := buildTableOptions(db)
			// Explicitly set Compression and DataKey based on how the table was generated.
			topt.Compression = tf.Compression
			topt.DataKey = dk
			// NOTE: this mf (the mmap file) shadows the outer *Manifest parameter.
			mf, err := z.OpenMmapFile(fname, db.opt.getFileFlags(), 0)
			if err != nil {
				rerr = y.Wrapf(err, "Opening file: %q", fname)
				return
			}
			t, err := table.OpenTable(mf, topt)
			if err != nil {
				if strings.HasPrefix(err.Error(), "CHECKSUM_MISMATCH:") {
					db.opt.Errorf(err.Error())
					db.opt.Errorf("Ignoring table %s", mf.Fd.Name())
					// Do not set rerr. We will continue without this table.
				} else {
					rerr = y.Wrapf(err, "Opening table: %q", fname)
				}
				return
			}
			mu.Lock()
			tables[tf.Level] = append(tables[tf.Level], t)
			mu.Unlock()
		}(fname, tf)
	}
	// Finish waits for all outstanding workers and returns their first error.
	if err := throttle.Finish(); err != nil {
		closeAllTables(tables)
		return nil, err
	}
	db.opt.Infof("All %d tables opened in %s\n", numOpened.Load(),
		time.Since(start).Round(time.Millisecond))
	s.nextFileID.Store(maxFileID + 1)
	for i, tbls := range tables {
		s.levels[i].initTables(tbls)
	}
	// Make sure key ranges do not overlap etc.
	if err := s.validate(); err != nil {
		_ = s.cleanupLevels()
		return nil, y.Wrap(err, "Level validation")
	}
	// Sync directory (because we have at least removed some files, or previously created the
	// manifest file).
	if err := syncDir(db.opt.Dir); err != nil {
		_ = s.close()
		return nil, err
	}
	return s, nil
}
  161. // Closes the tables, for cleanup in newLevelsController. (We Close() instead of using DecrRef()
  162. // because that would delete the underlying files.) We ignore errors, which is OK because tables
  163. // are read-only.
  164. func closeAllTables(tables [][]*table.Table) {
  165. for _, tableSlice := range tables {
  166. for _, table := range tableSlice {
  167. _ = table.Close(-1)
  168. }
  169. }
  170. }
  171. func (s *levelsController) cleanupLevels() error {
  172. var firstErr error
  173. for _, l := range s.levels {
  174. if err := l.close(); err != nil && firstErr == nil {
  175. firstErr = err
  176. }
  177. }
  178. return firstErr
  179. }
  180. // dropTree picks all tables from all levels, creates a manifest changeset,
  181. // applies it, and then decrements the refs of these tables, which would result
  182. // in their deletion.
  183. func (s *levelsController) dropTree() (int, error) {
  184. // First pick all tables, so we can create a manifest changelog.
  185. var all []*table.Table
  186. for _, l := range s.levels {
  187. l.RLock()
  188. all = append(all, l.tables...)
  189. l.RUnlock()
  190. }
  191. if len(all) == 0 {
  192. return 0, nil
  193. }
  194. // Generate the manifest changes.
  195. changes := []*pb.ManifestChange{}
  196. for _, table := range all {
  197. // Add a delete change only if the table is not in memory.
  198. if !table.IsInmemory {
  199. changes = append(changes, newDeleteChange(table.ID()))
  200. }
  201. }
  202. changeSet := pb.ManifestChangeSet{Changes: changes}
  203. if err := s.kv.manifest.addChanges(changeSet.Changes, s.kv.opt); err != nil {
  204. return 0, err
  205. }
  206. // Now that manifest has been successfully written, we can delete the tables.
  207. for _, l := range s.levels {
  208. l.Lock()
  209. l.totalSize = 0
  210. l.tables = l.tables[:0]
  211. l.Unlock()
  212. }
  213. for _, table := range all {
  214. if err := table.DecrRef(); err != nil {
  215. return 0, err
  216. }
  217. }
  218. return len(all), nil
  219. }
// dropPrefix runs a L0->L1 compaction, and then runs same level compaction on the rest of the
// levels. For L0->L1 compaction, it runs compactions normally, but skips over
// all the keys with the provided prefix.
// For Li->Li compactions, it picks up the tables which would have the prefix. The
// tables who only have keys with this prefix are quickly dropped. The ones which have other keys
// are run through MergeIterator and compacted to create new tables. All the mechanisms of
// compactions apply, i.e. level sizes and MANIFEST are updated as in the normal flow.
func (s *levelsController) dropPrefixes(prefixes [][]byte) error {
	opt := s.kv.opt
	// Iterate levels in the reverse order because if we were to iterate from
	// lower level (say level 0) to a higher level (say level 3) we could have
	// a state in which level 0 is compacted and an older version of a key exists in lower level.
	// At this point, if someone creates an iterator, they would see an old
	// value for a key from lower levels. Iterating in reverse order ensures we
	// drop the oldest data first so that lookups never return stale data.
	for i := len(s.levels) - 1; i >= 0; i-- {
		l := s.levels[i]
		l.RLock()
		if l.level == 0 {
			size := len(l.tables)
			l.RUnlock()
			if size > 0 {
				cp := compactionPriority{
					level: 0,
					score: 1.74,
					// A unique number greater than 1.0 does two things. Helps identify this
					// function in logs, and forces a compaction.
					dropPrefixes: prefixes,
				}
				if err := s.doCompact(174, cp); err != nil {
					// NOTE(review): the L0 compaction error is logged and then
					// swallowed (nil returned) — confirm this best-effort
					// behavior is intended rather than `return err`.
					opt.Warningf("While compacting level 0: %v", err)
					return nil
				}
			}
			continue
		}
		// Build a list of compaction tableGroups affecting all the prefixes we
		// need to drop. We need to build tableGroups that satisfy the invariant that
		// bottom tables are consecutive.
		// tableGroup contains groups of consecutive tables.
		var tableGroups [][]*table.Table
		var tableGroup []*table.Table
		// finishGroup flushes the in-progress run of prefix-matching tables
		// into tableGroups.
		finishGroup := func() {
			if len(tableGroup) > 0 {
				tableGroups = append(tableGroups, tableGroup)
				tableGroup = nil
			}
		}
		for _, table := range l.tables {
			if containsAnyPrefixes(table, prefixes) {
				tableGroup = append(tableGroup, table)
			} else {
				// A non-matching table breaks consecutiveness; close the group.
				finishGroup()
			}
		}
		finishGroup()
		l.RUnlock()
		if len(tableGroups) == 0 {
			continue
		}
		opt.Infof("Dropping prefix at level %d (%d tableGroups)", l.level, len(tableGroups))
		for _, operation := range tableGroups {
			// Same-level compaction (thisLevel == nextLevel) with top == nil:
			// the group is rewritten in place, dropping prefixed keys.
			cd := compactDef{
				thisLevel:    l,
				nextLevel:    l,
				top:          nil,
				bot:          operation,
				dropPrefixes: prefixes,
				t:            s.levelTargets(),
			}
			_, span := otel.Tracer("").Start(context.TODO(), "Badger.Compaction")
			span.SetAttributes(attribute.Int("Compaction level", l.level))
			span.SetAttributes(attribute.String("Drop Prefixes", fmt.Sprintf("%v", prefixes)))
			cd.t.baseLevel = l.level
			if err := s.runCompactDef(-1, l.level, cd); err != nil {
				opt.Warningf("While running compact def: %+v. Error: %v", cd, err)
				span.End()
				return err
			}
			span.SetAttributes(
				attribute.Int("Top tables count", len(cd.top)),
				attribute.Int("Bottom tables count", len(cd.bot)))
			span.End()
		}
	}
	return nil
}
  307. func (s *levelsController) startCompact(lc *z.Closer) {
  308. n := s.kv.opt.NumCompactors
  309. lc.AddRunning(n - 1)
  310. for i := 0; i < n; i++ {
  311. go s.runCompactor(i, lc)
  312. }
  313. }
// targets holds the computed compaction targets for every level of the LSM
// tree. See levelTargets for how these values are derived.
type targets struct {
	baseLevel int     // Lbase: the first non-L0 level that L0 compacts into
	targetSz  []int64 // per-level target total size
	fileSz    []int64 // per-level target table (file) size
}
  319. // levelTargets calculates the targets for levels in the LSM tree. The idea comes from Dynamic Level
  320. // Sizes ( https://rocksdb.org/blog/2015/07/23/dynamic-level.html ) in RocksDB. The sizes of levels
  321. // are calculated based on the size of the lowest level, typically L6. So, if L6 size is 1GB, then
  322. // L5 target size is 100MB, L4 target size is 10MB and so on.
  323. //
  324. // L0 files don't automatically go to L1. Instead, they get compacted to Lbase, where Lbase is
  325. // chosen based on the first level which is non-empty from top (check L1 through L6). For an empty
  326. // DB, that would be L6. So, L0 compactions go to L6, then L5, L4 and so on.
  327. //
  328. // Lbase is advanced to the upper levels when its target size exceeds BaseLevelSize. For
  329. // example, when L6 reaches 1.1GB, then L4 target sizes becomes 11MB, thus exceeding the
  330. // BaseLevelSize of 10MB. L3 would then become the new Lbase, with a target size of 1MB <
  331. // BaseLevelSize.
  332. func (s *levelsController) levelTargets() targets {
  333. adjust := func(sz int64) int64 {
  334. if sz < s.kv.opt.BaseLevelSize {
  335. return s.kv.opt.BaseLevelSize
  336. }
  337. return sz
  338. }
  339. t := targets{
  340. targetSz: make([]int64, len(s.levels)),
  341. fileSz: make([]int64, len(s.levels)),
  342. }
  343. // DB size is the size of the last level.
  344. dbSize := s.lastLevel().getTotalSize()
  345. for i := len(s.levels) - 1; i > 0; i-- {
  346. ltarget := adjust(dbSize)
  347. t.targetSz[i] = ltarget
  348. if t.baseLevel == 0 && ltarget <= s.kv.opt.BaseLevelSize {
  349. t.baseLevel = i
  350. }
  351. dbSize /= int64(s.kv.opt.LevelSizeMultiplier)
  352. }
  353. tsz := s.kv.opt.BaseTableSize
  354. for i := 0; i < len(s.levels); i++ {
  355. if i == 0 {
  356. // Use MemTableSize for Level 0. Because at Level 0, we stop compactions based on the
  357. // number of tables, not the size of the level. So, having a 1:1 size ratio between
  358. // memtable size and the size of L0 files is better than churning out 32 files per
  359. // memtable (assuming 64MB MemTableSize and 2MB BaseTableSize).
  360. t.fileSz[i] = s.kv.opt.MemTableSize
  361. } else if i <= t.baseLevel {
  362. t.fileSz[i] = tsz
  363. } else {
  364. tsz *= int64(s.kv.opt.TableSizeMultiplier)
  365. t.fileSz[i] = tsz
  366. }
  367. }
  368. // Bring the base level down to the last empty level.
  369. for i := t.baseLevel + 1; i < len(s.levels)-1; i++ {
  370. if s.levels[i].getTotalSize() > 0 {
  371. break
  372. }
  373. t.baseLevel = i
  374. }
  375. // If the base level is empty and the next level size is less than the
  376. // target size, pick the next level as the base level.
  377. b := t.baseLevel
  378. lvl := s.levels
  379. if b < len(lvl)-1 && lvl[b].getTotalSize() == 0 && lvl[b+1].getTotalSize() < t.targetSz[b+1] {
  380. t.baseLevel++
  381. }
  382. return t
  383. }
// runCompactor is the long-running loop for compaction worker `id`. It wakes
// on a 50ms ticker, picks the highest-priority levels, and runs compactions
// until lc is closed.
func (s *levelsController) runCompactor(id int, lc *z.Closer) {
	defer lc.Done()
	// Stagger compactor start-up by a random delay of up to 1s, bailing out
	// early if the closer fires first.
	randomDelay := time.NewTimer(time.Duration(rand.Int31n(1000)) * time.Millisecond)
	select {
	case <-randomDelay.C:
	case <-lc.HasBeenClosed():
		randomDelay.Stop()
		return
	}
	// moveL0toFront rotates the L0 priority (if present) to index 0, keeping
	// the relative order of all other priorities.
	moveL0toFront := func(prios []compactionPriority) []compactionPriority {
		idx := -1
		for i, p := range prios {
			if p.level == 0 {
				idx = i
				break
			}
		}
		// If idx == -1, we didn't find L0.
		// If idx == 0, then we don't need to do anything. L0 is already at the front.
		if idx > 0 {
			out := append([]compactionPriority{}, prios[idx])
			out = append(out, prios[:idx]...)
			out = append(out, prios[idx+1:]...)
			return out
		}
		return prios
	}
	// run executes one compaction and reports success. errFillTables is
	// treated as a benign "nothing to do"; other errors are logged only.
	run := func(p compactionPriority) bool {
		err := s.doCompact(id, p)
		switch err {
		case nil:
			return true
		case errFillTables:
			// pass
		default:
			s.kv.opt.Warningf("While running doCompact: %v\n", err)
		}
		return false
	}
	// priosBuffer is handed back to pickCompactLevels each iteration so its
	// backing array can be reused.
	var priosBuffer []compactionPriority
	// runOnce performs at most one successful compaction and reports whether
	// anything ran.
	runOnce := func() bool {
		prios := s.pickCompactLevels(priosBuffer)
		defer func() {
			priosBuffer = prios
		}()
		if id == 0 {
			// Worker ID zero prefers to compact L0 always.
			prios = moveL0toFront(prios)
		}
		for _, p := range prios {
			if id == 0 && p.level == 0 {
				// Allow worker zero to run level 0, irrespective of its adjusted score.
			} else if p.adjusted < 1.0 {
				// Priorities are sorted by adjusted score, so nothing further
				// down the list qualifies either.
				break
			}
			if run(p) {
				return true
			}
		}
		return false
	}
	// tryLmaxToLmaxCompaction forces a last-level -> last-level compaction.
	tryLmaxToLmaxCompaction := func() {
		p := compactionPriority{
			level: s.lastLevel().level,
			t:     s.levelTargets(),
		}
		run(p)
	}
	count := 0
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		// Can add a done channel or other stuff.
		case <-ticker.C:
			count++
			// Each ticker is 50ms so 50*200=10seconds.
			if s.kv.opt.LmaxCompaction && id == 2 && count >= 200 {
				tryLmaxToLmaxCompaction()
				count = 0
			} else {
				runOnce()
			}
		case <-lc.HasBeenClosed():
			return
		}
	}
}
// compactionPriority describes one candidate level for compaction along with
// the scores used to rank it. Produced by pickCompactLevels and consumed by
// doCompact.
type compactionPriority struct {
	level        int
	score        float64  // raw fullness score (L0: table count ratio; others: size ratio)
	adjusted     float64  // score adjusted by lower-level fullness; used for sorting
	dropPrefixes [][]byte // when non-empty, keys with these prefixes are dropped
	t            targets  // level targets computed at pick time
}
  479. func (s *levelsController) lastLevel() *levelHandler {
  480. return s.levels[len(s.levels)-1]
  481. }
// pickCompactLevel determines which level to compact.
// Based on: https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
// It tries to reuse priosBuffer to reduce memory allocation,
// passing nil is acceptable, then new memory will be allocated.
func (s *levelsController) pickCompactLevels(priosBuffer []compactionPriority) (prios []compactionPriority) {
	t := s.levelTargets()
	// addPriority appends a priority whose adjusted score starts equal to its
	// raw score; it is tuned further below.
	addPriority := func(level int, score float64) {
		pri := compactionPriority{
			level:    level,
			score:    score,
			adjusted: score,
			t:        t,
		}
		prios = append(prios, pri)
	}
	// Grow buffer to fit all levels.
	if cap(priosBuffer) < len(s.levels) {
		priosBuffer = make([]compactionPriority, 0, len(s.levels))
	}
	prios = priosBuffer[:0]
	// Add L0 priority based on the number of tables.
	addPriority(0, float64(s.levels[0].numTables())/float64(s.kv.opt.NumLevelZeroTables))
	// All other levels use size to calculate priority.
	for i := 1; i < len(s.levels); i++ {
		// Don't consider those tables that are already being compacted right now.
		delSize := s.cstatus.delSize(i)
		l := s.levels[i]
		sz := l.getTotalSize() - delSize
		addPriority(i, float64(sz)/float64(t.targetSz[i]))
	}
	y.AssertTrue(len(prios) == len(s.levels))
	// The following code is borrowed from PebbleDB and results in healthier LSM tree structure.
	// If Li-1 has score > 1.0, then we'll divide Li-1 score by Li. If Li score is >= 1.0, then Li-1
	// score is reduced, which means we'll prioritize the compaction of lower levels (L5, L4 and so
	// on) over the higher levels (L0, L1 and so on). On the other hand, if Li score is < 1.0, then
	// we'll increase the priority of Li-1.
	// Overall what this means is, if the bottom level is already overflowing, then de-prioritize
	// compaction of the above level. If the bottom level is not full, then increase the priority of
	// above level.
	var prevLevel int
	for level := t.baseLevel; level < len(s.levels); level++ {
		if prios[prevLevel].adjusted >= 1 {
			// Avoid absurdly large scores by placing a floor on the score that we'll
			// adjust a level by. The value of 0.01 was chosen somewhat arbitrarily
			const minScore = 0.01
			// NOTE(review): the guard tests the raw score but the division uses
			// the adjusted score — confirm this asymmetry is intentional.
			if prios[level].score >= minScore {
				prios[prevLevel].adjusted /= prios[level].adjusted
			} else {
				prios[prevLevel].adjusted /= minScore
			}
		}
		prevLevel = level
	}
	// Pick all the levels whose original score is >= 1.0, irrespective of their adjusted score.
	// We'll still sort them by their adjusted score below. Having both these scores allows us to
	// make better decisions about compacting L0. If we see a score >= 1.0, we can do L0->L0
	// compactions. If the adjusted score >= 1.0, then we can do L0->Lbase compactions.
	// The last level is excluded here; presumably Lmax->Lmax compaction is
	// handled separately (see runCompactor's LmaxCompaction path).
	out := prios[:0]
	for _, p := range prios[:len(prios)-1] {
		if p.score >= 1.0 {
			out = append(out, p)
		}
	}
	prios = out
	// Sort by the adjusted score.
	sort.Slice(prios, func(i, j int) bool {
		return prios[i].adjusted > prios[j].adjusted
	})
	return prios
}
  552. // checkOverlap checks if the given tables overlap with any level from the given "lev" onwards.
  553. func (s *levelsController) checkOverlap(tables []*table.Table, lev int) bool {
  554. kr := getKeyRange(tables...)
  555. for i, lh := range s.levels {
  556. if i < lev { // Skip upper levels.
  557. continue
  558. }
  559. lh.RLock()
  560. left, right := lh.overlappingTables(levelHandlerRLocked{}, kr)
  561. lh.RUnlock()
  562. if right-left > 0 {
  563. return true
  564. }
  565. }
  566. return false
  567. }
  568. // subcompact runs a single sub-compaction, iterating over the specified key-range only.
  569. //
  570. // We use splits to do a single compaction concurrently. If we have >= 3 tables
  571. // involved in the bottom level during compaction, we choose key ranges to
  572. // split the main compaction up into sub-compactions. Each sub-compaction runs
  573. // concurrently, only iterating over the provided key range, generating tables.
  574. // This speeds up the compaction significantly.
  575. func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
  576. inflightBuilders *y.Throttle, res chan<- *table.Table) {
  577. // Check overlap of the top level with the levels which are not being
  578. // compacted in this compaction.
  579. hasOverlap := s.checkOverlap(cd.allTables(), cd.nextLevel.level+1)
  580. // Pick a discard ts, so we can discard versions below this ts. We should
  581. // never discard any versions starting from above this timestamp, because
  582. // that would affect the snapshot view guarantee provided by transactions.
  583. discardTs := s.kv.orc.discardAtOrBelow()
  584. // Try to collect stats so that we can inform value log about GC. That would help us find which
  585. // value log file should be GCed.
  586. discardStats := make(map[uint32]int64)
  587. updateStats := func(vs y.ValueStruct) {
  588. // We don't need to store/update discard stats when badger is running in Disk-less mode.
  589. if s.kv.opt.InMemory {
  590. return
  591. }
  592. if vs.Meta&bitValuePointer > 0 {
  593. var vp valuePointer
  594. vp.Decode(vs.Value)
  595. discardStats[vp.Fid] += int64(vp.Len)
  596. }
  597. }
  598. // exceedsAllowedOverlap returns true if the given key range would overlap with more than 10
  599. // tables from level below nextLevel (nextLevel+1). This helps avoid generating tables at Li
  600. // with huge overlaps with Li+1.
  601. exceedsAllowedOverlap := func(kr keyRange) bool {
  602. n2n := cd.nextLevel.level + 1
  603. if n2n <= 1 || n2n >= len(s.levels) {
  604. return false
  605. }
  606. n2nl := s.levels[n2n]
  607. n2nl.RLock()
  608. defer n2nl.RUnlock()
  609. l, r := n2nl.overlappingTables(levelHandlerRLocked{}, kr)
  610. return r-l >= 10
  611. }
  612. var (
  613. lastKey, skipKey []byte
  614. numBuilds, numVersions int
  615. // Denotes if the first key is a series of duplicate keys had
  616. // "DiscardEarlierVersions" set
  617. firstKeyHasDiscardSet bool
  618. )
  619. addKeys := func(builder *table.Builder) {
  620. timeStart := time.Now()
  621. var numKeys, numSkips uint64
  622. var rangeCheck int
  623. var tableKr keyRange
  624. for ; it.Valid(); it.Next() {
  625. // See if we need to skip the prefix.
  626. if len(cd.dropPrefixes) > 0 && hasAnyPrefixes(it.Key(), cd.dropPrefixes) {
  627. numSkips++
  628. updateStats(it.Value())
  629. continue
  630. }
  631. // See if we need to skip this key.
  632. if len(skipKey) > 0 {
  633. if y.SameKey(it.Key(), skipKey) {
  634. numSkips++
  635. updateStats(it.Value())
  636. continue
  637. } else {
  638. skipKey = skipKey[:0]
  639. }
  640. }
  641. if !y.SameKey(it.Key(), lastKey) {
  642. firstKeyHasDiscardSet = false
  643. if len(kr.right) > 0 && y.CompareKeys(it.Key(), kr.right) >= 0 {
  644. break
  645. }
  646. if builder.ReachedCapacity() {
  647. // Only break if we are on a different key, and have reached capacity. We want
  648. // to ensure that all versions of the key are stored in the same sstable, and
  649. // not divided across multiple tables at the same level.
  650. break
  651. }
  652. lastKey = y.SafeCopy(lastKey, it.Key())
  653. numVersions = 0
  654. firstKeyHasDiscardSet = it.Value().Meta&bitDiscardEarlierVersions > 0
  655. if len(tableKr.left) == 0 {
  656. tableKr.left = y.SafeCopy(tableKr.left, it.Key())
  657. }
  658. tableKr.right = lastKey
  659. rangeCheck++
  660. if rangeCheck%5000 == 0 {
  661. // This table's range exceeds the allowed range overlap with the level after
  662. // next. So, we stop writing to this table. If we don't do this, then we end up
  663. // doing very expensive compactions involving too many tables. To amortize the
  664. // cost of this check, we do it only every N keys.
  665. if exceedsAllowedOverlap(tableKr) {
  666. // s.kv.opt.Debugf("L%d -> L%d Breaking due to exceedsAllowedOverlap with
  667. // kr: %s\n", cd.thisLevel.level, cd.nextLevel.level, tableKr)
  668. break
  669. }
  670. }
  671. }
  672. vs := it.Value()
  673. version := y.ParseTs(it.Key())
  674. isExpired := isDeletedOrExpired(vs.Meta, vs.ExpiresAt)
  675. // Do not discard entries inserted by merge operator. These entries will be
  676. // discarded once they're merged
  677. if version <= discardTs && vs.Meta&bitMergeEntry == 0 {
  678. // Keep track of the number of versions encountered for this key. Only consider the
  679. // versions which are below the minReadTs, otherwise, we might end up discarding the
  680. // only valid version for a running transaction.
  681. numVersions++
  682. // Keep the current version and discard all the next versions if
  683. // - The `discardEarlierVersions` bit is set OR
  684. // - We've already processed `NumVersionsToKeep` number of versions
  685. // (including the current item being processed)
  686. lastValidVersion := vs.Meta&bitDiscardEarlierVersions > 0 ||
  687. numVersions == s.kv.opt.NumVersionsToKeep
  688. if isExpired || lastValidVersion {
  689. // If this version of the key is deleted or expired, skip all the rest of the
  690. // versions. Ensure that we're only removing versions below readTs.
  691. skipKey = y.SafeCopy(skipKey, it.Key())
  692. switch {
  693. // Add the key to the table only if it has not expired.
  694. // We don't want to add the deleted/expired keys.
  695. case !isExpired && lastValidVersion:
  696. // Add this key. We have set skipKey, so the following key versions
  697. // would be skipped.
  698. case hasOverlap:
  699. // If this key range has overlap with lower levels, then keep the deletion
  700. // marker with the latest version, discarding the rest. We have set skipKey,
  701. // so the following key versions would be skipped.
  702. default:
  703. // If no overlap, we can skip all the versions, by continuing here.
  704. numSkips++
  705. updateStats(vs)
  706. continue // Skip adding this key.
  707. }
  708. }
  709. }
  710. numKeys++
  711. var vp valuePointer
  712. if vs.Meta&bitValuePointer > 0 {
  713. vp.Decode(vs.Value)
  714. }
  715. switch {
  716. case firstKeyHasDiscardSet:
  717. // This key is same as the last key which had "DiscardEarlierVersions" set. The
  718. // the next compactions will drop this key if its ts >
  719. // discardTs (of the next compaction).
  720. builder.AddStaleKey(it.Key(), vs, vp.Len)
  721. case isExpired:
  722. // If the key is expired, the next compaction will drop it if
  723. // its ts > discardTs (of the next compaction).
  724. builder.AddStaleKey(it.Key(), vs, vp.Len)
  725. default:
  726. builder.Add(it.Key(), vs, vp.Len)
  727. }
  728. }
  729. s.kv.opt.Debugf("[%d] LOG Compact. Added %d keys. Skipped %d keys. Iteration took: %v",
  730. cd.compactorId, numKeys, numSkips, time.Since(timeStart).Round(time.Millisecond))
  731. } // End of function: addKeys
  732. if len(kr.left) > 0 {
  733. it.Seek(kr.left)
  734. } else {
  735. it.Rewind()
  736. }
  737. for it.Valid() {
  738. if len(kr.right) > 0 && y.CompareKeys(it.Key(), kr.right) >= 0 {
  739. break
  740. }
  741. bopts := buildTableOptions(s.kv)
  742. // Set TableSize to the target file size for that level.
  743. bopts.TableSize = uint64(cd.t.fileSz[cd.nextLevel.level])
  744. builder := table.NewTableBuilder(bopts)
  745. // This would do the iteration and add keys to builder.
  746. addKeys(builder)
  747. // It was true that it.Valid() at least once in the loop above, which means we
  748. // called Add() at least once, and builder is not Empty().
  749. if builder.Empty() {
  750. // Cleanup builder resources:
  751. builder.Finish()
  752. builder.Close()
  753. continue
  754. }
  755. numBuilds++
  756. if err := inflightBuilders.Do(); err != nil {
  757. // Can't return from here, until I decrRef all the tables that I built so far.
  758. break
  759. }
  760. go func(builder *table.Builder, fileID uint64) {
  761. var err error
  762. defer inflightBuilders.Done(err)
  763. defer builder.Close()
  764. var tbl *table.Table
  765. if s.kv.opt.InMemory {
  766. tbl, err = table.OpenInMemoryTable(builder.Finish(), fileID, &bopts)
  767. } else {
  768. fname := table.NewFilename(fileID, s.kv.opt.Dir)
  769. tbl, err = table.CreateTable(fname, builder)
  770. }
  771. // If we couldn't build the table, return fast.
  772. if err != nil {
  773. return
  774. }
  775. res <- tbl
  776. }(builder, s.reserveFileID())
  777. }
  778. s.kv.vlog.updateDiscardStats(discardStats)
  779. s.kv.opt.Debugf("Discard stats: %v", discardStats)
  780. }
// compactBuildTables merges topTables and botTables to form a list of new tables.
//
// It returns the newly built tables (sorted by biggest key), a decrRef
// function the caller must invoke to release them, and any error. Sub-ranges
// in cd.splits are compacted concurrently; each sub-compaction streams its
// finished tables into the shared res channel.
func (s *levelsController) compactBuildTables(
	lev int, cd compactDef) ([]*table.Table, func() error, error) {
	topTables := cd.top
	botTables := cd.bot

	// Track how many tables are participating in compactions, for metrics.
	numTables := int64(len(topTables) + len(botTables))
	y.NumCompactionTablesAdd(s.kv.opt.MetricsEnabled, numTables)
	defer y.NumCompactionTablesAdd(s.kv.opt.MetricsEnabled, -numTables)

	// keepTable reports whether a bottom table must take part in the merge.
	// A table whose entire key range carries a drop prefix can be dropped
	// wholesale instead of being iterated.
	keepTable := func(t *table.Table) bool {
		for _, prefix := range cd.dropPrefixes {
			if bytes.HasPrefix(t.Smallest(), prefix) &&
				bytes.HasPrefix(t.Biggest(), prefix) {
				// All the keys in this table have the dropPrefix. So, this
				// table does not need to be in the iterator and can be
				// dropped immediately.
				return false
			}
		}
		return true
	}
	var valid []*table.Table
	for _, table := range botTables {
		if keepTable(table) {
			valid = append(valid, table)
		}
	}

	// newIterator builds a fresh set of iterators over all participating
	// tables. One set is created per sub-compaction goroutine below.
	newIterator := func() []y.Iterator {
		// Create iterators across all the tables involved first.
		var iters []y.Iterator
		switch {
		case lev == 0:
			// L0 tables can overlap; iterate newest table first so newer
			// versions win in the merge.
			iters = appendIteratorsReversed(iters, topTables, table.NOCACHE)
		case len(topTables) > 0:
			y.AssertTrue(len(topTables) == 1)
			iters = []y.Iterator{topTables[0].NewIterator(table.NOCACHE)}
		}
		// Next level has level>=1 and we can use ConcatIterator as key ranges do not overlap.
		return append(iters, table.NewConcatIterator(valid, table.NOCACHE))
	}

	res := make(chan *table.Table, 3)
	// Budget: 8 concurrent table builds plus one slot per sub-compaction.
	inflightBuilders := y.NewThrottle(8 + len(cd.splits))
	for _, kr := range cd.splits {
		// Initiate Do here so we can register the goroutines for buildTables too.
		if err := inflightBuilders.Do(); err != nil {
			s.kv.opt.Errorf("cannot start subcompaction: %+v", err)
			return nil, nil, err
		}
		// One goroutine per split; each runs a full merge over its key range.
		go func(kr keyRange) {
			defer inflightBuilders.Done(nil)
			it := table.NewMergeIterator(newIterator(), false)
			defer it.Close()
			s.subcompact(it, kr, cd, inflightBuilders, res)
		}(kr)
	}

	var newTables []*table.Table
	var wg sync.WaitGroup
	wg.Add(1)
	// Collector goroutine: drains res until it is closed below.
	go func() {
		defer wg.Done()
		for t := range res {
			newTables = append(newTables, t)
		}
	}()

	// Wait for all table builders to finish and also for newTables accumulator to finish.
	err := inflightBuilders.Finish()
	close(res)
	wg.Wait() // Wait for all tables to be picked up.
	if err == nil {
		// Ensure created files' directory entries are visible. We don't mind the extra latency
		// from not doing this ASAP after all file creation has finished because this is a
		// background operation.
		err = s.kv.syncDir(s.kv.opt.Dir)
	}
	if err != nil {
		// An error happened. Delete all the newly created table files (by calling DecrRef
		// -- we're the only holders of a ref).
		_ = decrRefs(newTables)
		return nil, nil, y.Wrapf(err, "while running compactions for: %+v", cd)
	}
	sort.Slice(newTables, func(i, j int) bool {
		return y.CompareKeys(newTables[i].Biggest(), newTables[j].Biggest()) < 0
	})
	return newTables, func() error { return decrRefs(newTables) }, nil
}
  865. func buildChangeSet(cd *compactDef, newTables []*table.Table) pb.ManifestChangeSet {
  866. changes := []*pb.ManifestChange{}
  867. for _, table := range newTables {
  868. changes = append(changes,
  869. newCreateChange(table.ID(), cd.nextLevel.level, table.KeyID(), table.CompressionType()))
  870. }
  871. for _, table := range cd.top {
  872. // Add a delete change only if the table is not in memory.
  873. if !table.IsInmemory {
  874. changes = append(changes, newDeleteChange(table.ID()))
  875. }
  876. }
  877. for _, table := range cd.bot {
  878. changes = append(changes, newDeleteChange(table.ID()))
  879. }
  880. return pb.ManifestChangeSet{Changes: changes}
  881. }
  882. func hasAnyPrefixes(s []byte, listOfPrefixes [][]byte) bool {
  883. for _, prefix := range listOfPrefixes {
  884. if bytes.HasPrefix(s, prefix) {
  885. return true
  886. }
  887. }
  888. return false
  889. }
  890. func containsPrefix(table *table.Table, prefix []byte) bool {
  891. smallValue := table.Smallest()
  892. largeValue := table.Biggest()
  893. if bytes.HasPrefix(smallValue, prefix) {
  894. return true
  895. }
  896. if bytes.HasPrefix(largeValue, prefix) {
  897. return true
  898. }
  899. isPresent := func() bool {
  900. ti := table.NewIterator(0)
  901. defer ti.Close()
  902. // In table iterator's Seek, we assume that key has version in last 8 bytes. We set
  903. // version=0 (ts=math.MaxUint64), so that we don't skip the key prefixed with prefix.
  904. ti.Seek(y.KeyWithTs(prefix, math.MaxUint64))
  905. return bytes.HasPrefix(ti.Key(), prefix)
  906. }
  907. if bytes.Compare(prefix, smallValue) > 0 &&
  908. bytes.Compare(prefix, largeValue) < 0 {
  909. // There may be a case when table contains [0x0000,...., 0xffff]. If we are searching for
  910. // k=0x0011, we should not directly infer that k is present. It may not be present.
  911. return isPresent()
  912. }
  913. return false
  914. }
  915. func containsAnyPrefixes(table *table.Table, listOfPrefixes [][]byte) bool {
  916. for _, prefix := range listOfPrefixes {
  917. if containsPrefix(table, prefix) {
  918. return true
  919. }
  920. }
  921. return false
  922. }
// compactDef describes a single compaction from thisLevel into nextLevel:
// the tables picked from each level, the key ranges they cover, and how the
// work may be split into parallel sub-compactions.
type compactDef struct {
	compactorId int // ID of the compactor running this compaction (used in logs).
	t           targets // Level/file-size targets in effect for this compaction.
	p           compactionPriority // The priority that triggered this compaction.
	thisLevel   *levelHandler // Source level.
	nextLevel   *levelHandler // Destination level; equals thisLevel for L0->L0 and Lmax->Lmax.
	top []*table.Table // Tables picked from thisLevel.
	bot []*table.Table // Overlapping tables picked from nextLevel.
	thisRange keyRange // Key range covered by top.
	nextRange keyRange // Key range covered by bot.
	splits    []keyRange // Sub-ranges compacted in parallel (see addSplits).
	thisSize int64 // Size of the picked table on thisLevel.
	dropPrefixes [][]byte // Keys with these prefixes are dropped during compaction.
}
  937. // addSplits can allow us to run multiple sub-compactions in parallel across the split key ranges.
  938. func (s *levelsController) addSplits(cd *compactDef) {
  939. cd.splits = cd.splits[:0]
  940. // Let's say we have 10 tables in cd.bot and min width = 3. Then, we'll pick
  941. // 0, 1, 2 (pick), 3, 4, 5 (pick), 6, 7, 8 (pick), 9 (pick, because last table).
  942. // This gives us 4 picks for 10 tables.
  943. // In an edge case, 142 tables in bottom led to 48 splits. That's too many splits, because it
  944. // then uses up a lot of memory for table builder.
  945. // We should keep it so we have at max 5 splits.
  946. width := int(math.Ceil(float64(len(cd.bot)) / 5.0))
  947. if width < 3 {
  948. width = 3
  949. }
  950. skr := cd.thisRange
  951. skr.extend(cd.nextRange)
  952. addRange := func(right []byte) {
  953. skr.right = y.Copy(right)
  954. cd.splits = append(cd.splits, skr)
  955. skr.left = skr.right
  956. }
  957. for i, t := range cd.bot {
  958. // last entry in bottom table.
  959. if i == len(cd.bot)-1 {
  960. addRange([]byte{})
  961. return
  962. }
  963. if i%width == width-1 {
  964. // Right is assigned ts=0. The encoding ts bytes take MaxUint64-ts,
  965. // so, those with smaller TS will be considered larger for the same key.
  966. // Consider the following.
  967. // Top table is [A1...C3(deleted)]
  968. // bot table is [B1....C2]
  969. // It will generate a split [A1 ... C0], including any records of Key C.
  970. right := y.KeyWithTs(y.ParseKey(t.Biggest()), 0)
  971. addRange(right)
  972. }
  973. }
  974. }
// lockLevels read-locks both levels involved in the compaction, this level
// first. Pair with unlockLevels, which releases in reverse order. Do not use
// when thisLevel == nextLevel (see fillTablesL0ToL0 for why recursive read
// locking deadlocks).
func (cd *compactDef) lockLevels() {
	cd.thisLevel.RLock()
	cd.nextLevel.RLock()
}
// unlockLevels releases the read locks taken by lockLevels, in reverse
// acquisition order.
func (cd *compactDef) unlockLevels() {
	cd.nextLevel.RUnlock()
	cd.thisLevel.RUnlock()
}
  983. func (cd *compactDef) allTables() []*table.Table {
  984. ret := make([]*table.Table, 0, len(cd.top)+len(cd.bot))
  985. ret = append(ret, cd.top...)
  986. ret = append(ret, cd.bot...)
  987. return ret
  988. }
// fillTablesL0ToL0 sets up an intra-L0 compaction: several small, sufficiently
// old L0 tables are merged into a single L0 table. Only compactor 0 may run
// this. Returns true if at least 4 eligible tables were found and the
// compaction was registered in cstatus.
func (s *levelsController) fillTablesL0ToL0(cd *compactDef) bool {
	if cd.compactorId != 0 {
		// Only compactor zero can work on this.
		return false
	}
	cd.nextLevel = s.levels[0]
	cd.nextRange = keyRange{}
	cd.bot = nil

	// Because this level and next level are both level 0, we should NOT acquire
	// the read lock twice, because it can result in a deadlock. So, we don't
	// call compactDef.lockLevels, instead locking the level only once and
	// directly here.
	//
	// As per godocs on RWMutex:
	// If a goroutine holds a RWMutex for reading and another goroutine might
	// call Lock, no goroutine should expect to be able to acquire a read lock
	// until the initial read lock is released. In particular, this prohibits
	// recursive read locking. This is to ensure that the lock eventually
	// becomes available; a blocked Lock call excludes new readers from
	// acquiring the lock.
	y.AssertTrue(cd.thisLevel.level == 0)
	y.AssertTrue(cd.nextLevel.level == 0)
	s.levels[0].RLock()
	defer s.levels[0].RUnlock()

	s.cstatus.Lock()
	defer s.cstatus.Unlock()

	top := cd.thisLevel.tables
	var out []*table.Table
	now := time.Now()
	for _, t := range top {
		if t.Size() >= 2*cd.t.fileSz[0] {
			// This file is already big, don't include it.
			continue
		}
		if now.Sub(t.CreatedAt) < 10*time.Second {
			// Just created it 10s ago. Don't pick for compaction.
			continue
		}
		// Skip tables already claimed by some other compaction.
		if _, beingCompacted := s.cstatus.tables[t.ID()]; beingCompacted {
			continue
		}
		out = append(out, t)
	}

	if len(out) < 4 {
		// If we don't have enough tables to merge in L0, don't do it.
		return false
	}
	cd.thisRange = infRange
	cd.top = out

	// Avoid any other L0 -> Lbase from happening, while this is going on.
	thisLevel := s.cstatus.levels[cd.thisLevel.level]
	thisLevel.ranges = append(thisLevel.ranges, infRange)
	for _, t := range out {
		s.cstatus.tables[t.ID()] = struct{}{}
	}

	// For L0->L0 compaction, we set the target file size to max, so the output is always one file.
	// This significantly decreases the L0 table stalls and improves the performance.
	cd.t.fileSz[0] = math.MaxUint32
	return true
}
// fillTablesL0ToLbase picks a contiguous run of overlapping L0 tables (oldest
// first) together with the Lbase tables they overlap, for an L0 -> Lbase
// compaction. Returns true if the compaction was registered in cstatus.
func (s *levelsController) fillTablesL0ToLbase(cd *compactDef) bool {
	if cd.nextLevel.level == 0 {
		panic("Base level can't be zero.")
	}
	// We keep cd.p.adjusted > 0.0 here to allow functions in db.go to artificially trigger
	// L0->Lbase compactions. Those functions wouldn't be setting the adjusted score.
	if cd.p.adjusted > 0.0 && cd.p.adjusted < 1.0 {
		// Do not compact to Lbase if adjusted score is less than 1.0.
		return false
	}
	cd.lockLevels()
	defer cd.unlockLevels()

	top := cd.thisLevel.tables
	if len(top) == 0 {
		return false
	}

	var out []*table.Table
	if len(cd.dropPrefixes) > 0 {
		// Use all tables if drop prefix is set. We don't want to compact only a
		// sub-range. We want to compact all the tables.
		out = top
	} else {
		var kr keyRange
		// cd.top[0] is the oldest file. So we start from the oldest file first.
		// Keep extending the range while consecutive tables overlap it; stop
		// at the first gap so the picked set is one contiguous run.
		for _, t := range top {
			dkr := getKeyRange(t)
			if kr.overlapsWith(dkr) {
				out = append(out, t)
				kr.extend(dkr)
			} else {
				break
			}
		}
	}
	cd.thisRange = getKeyRange(out...)
	cd.top = out

	// Pick every Lbase table whose key range overlaps the chosen L0 range.
	left, right := cd.nextLevel.overlappingTables(levelHandlerRLocked{}, cd.thisRange)
	cd.bot = make([]*table.Table, right-left)
	copy(cd.bot, cd.nextLevel.tables[left:right])

	if len(cd.bot) == 0 {
		cd.nextRange = cd.thisRange
	} else {
		cd.nextRange = getKeyRange(cd.bot...)
	}
	return s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd)
}
  1095. // fillTablesL0 would try to fill tables from L0 to be compacted with Lbase. If
  1096. // it can not do that, it would try to compact tables from L0 -> L0.
  1097. //
  1098. // Say L0 has 10 tables.
  1099. // fillTablesL0ToLbase picks up 5 tables to compact from L0 -> L5.
  1100. // Next call to fillTablesL0 would run L0ToLbase again, which fails this time.
  1101. // So, instead, we run fillTablesL0ToL0, which picks up rest of the 5 tables to
  1102. // be compacted within L0. Additionally, it would set the compaction range in
  1103. // cstatus to inf, so no other L0 -> Lbase compactions can happen.
  1104. // Thus, L0 -> L0 must finish for the next L0 -> Lbase to begin.
  1105. func (s *levelsController) fillTablesL0(cd *compactDef) bool {
  1106. if ok := s.fillTablesL0ToLbase(cd); ok {
  1107. return true
  1108. }
  1109. return s.fillTablesL0ToL0(cd)
  1110. }
  1111. // sortByStaleData sorts tables based on the amount of stale data they have.
  1112. // This is useful in removing tombstones.
  1113. func (s *levelsController) sortByStaleDataSize(tables []*table.Table, cd *compactDef) {
  1114. if len(tables) == 0 || cd.nextLevel == nil {
  1115. return
  1116. }
  1117. sort.Slice(tables, func(i, j int) bool {
  1118. return tables[i].StaleDataSize() > tables[j].StaleDataSize()
  1119. })
  1120. }
  1121. // sortByHeuristic sorts tables in increasing order of MaxVersion, so we
  1122. // compact older tables first.
  1123. func (s *levelsController) sortByHeuristic(tables []*table.Table, cd *compactDef) {
  1124. if len(tables) == 0 || cd.nextLevel == nil {
  1125. return
  1126. }
  1127. // Sort tables by max version. This is what RocksDB does.
  1128. sort.Slice(tables, func(i, j int) bool {
  1129. return tables[i].MaxVersion() < tables[j].MaxVersion()
  1130. })
  1131. }
// fillMaxLevelTables picks tables for a maxLevel -> maxLevel compaction,
// which rewrites tables purely to reclaim space held by stale data. Tables
// are tried in order of decreasing stale data size; small picks are padded
// with neighbouring tables (by key order) up to the target file size. The
// `tables` slice is expected to be sorted by smallest key — sort.Search below
// relies on this. Returns true if a compaction was registered in cstatus.
// This function should be called with lock on levels.
func (s *levelsController) fillMaxLevelTables(tables []*table.Table, cd *compactDef) bool {
	sortedTables := make([]*table.Table, len(tables))
	copy(sortedTables, tables)
	s.sortByStaleDataSize(sortedTables, cd)

	if len(sortedTables) > 0 && sortedTables[0].StaleDataSize() == 0 {
		// This is a maxLevel to maxLevel compaction and we don't have any stale data.
		return false
	}
	cd.bot = []*table.Table{}
	// collectBotTables appends the tables following t (in key order) to cd.bot
	// until the combined size reaches needSz, so several small tables can be
	// merged into one of the target size.
	collectBotTables := func(t *table.Table, needSz int64) {
		totalSize := t.Size()

		// Locate t's position within the key-ordered tables slice.
		j := sort.Search(len(tables), func(i int) bool {
			return y.CompareKeys(tables[i].Smallest(), t.Smallest()) >= 0
		})
		y.AssertTrue(tables[j].ID() == t.ID())
		j++
		// Collect tables until we reach the required size.
		for j < len(tables) {
			newT := tables[j]
			totalSize += newT.Size()

			if totalSize >= needSz {
				break
			}
			cd.bot = append(cd.bot, newT)
			cd.nextRange.extend(getKeyRange(newT))
			j++
		}
	}
	now := time.Now()
	for _, t := range sortedTables {
		// If the maxVersion is above the discardTs, we won't clean anything in
		// the compaction. So skip this table.
		if t.MaxVersion() > s.kv.orc.discardAtOrBelow() {
			continue
		}
		if now.Sub(t.CreatedAt) < time.Hour {
			// Just created it an hour ago. Don't pick for compaction.
			continue
		}
		// If the stale data size is less than 10 MB, it might not be worth
		// rewriting the table. Skip it.
		if t.StaleDataSize() < 10<<20 {
			continue
		}

		cd.thisSize = t.Size()
		cd.thisRange = getKeyRange(t)
		// Set the next range as the same as the current range. If we don't do
		// this, we won't be able to run more than one max level compactions.
		cd.nextRange = cd.thisRange
		// If we're already compacting this range, don't do anything.
		if s.cstatus.overlapsWith(cd.thisLevel.level, cd.thisRange) {
			continue
		}

		// Found a valid table!
		cd.top = []*table.Table{t}

		needFileSz := cd.t.fileSz[cd.thisLevel.level]
		// The table size is what we want so no need to collect more tables.
		if t.Size() >= needFileSz {
			break
		}
		// TableSize is less than what we want. Collect more tables for compaction.
		// If the level has multiple small tables, we collect all of them
		// together to form a bigger table.
		collectBotTables(t, needFileSz)
		if !s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd) {
			// Registration conflicted with another compaction; undo the
			// collected bottom tables and try the next candidate.
			cd.bot = cd.bot[:0]
			cd.nextRange = keyRange{}
			continue
		}
		return true
	}
	if len(cd.top) == 0 {
		return false
	}
	return s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd)
}
// fillTables picks one table on cd.thisLevel, plus every overlapping table on
// cd.nextLevel, for compaction. Last-level compactions are delegated to
// fillMaxLevelTables. Returns true if the compaction was registered in
// cstatus.
func (s *levelsController) fillTables(cd *compactDef) bool {
	cd.lockLevels()
	defer cd.unlockLevels()

	// Work on a copy so the level's own slice is not reordered.
	tables := make([]*table.Table, len(cd.thisLevel.tables))
	copy(tables, cd.thisLevel.tables)
	if len(tables) == 0 {
		return false
	}
	// We're doing a maxLevel to maxLevel compaction. Pick tables based on the stale data size.
	if cd.thisLevel.isLastLevel() {
		return s.fillMaxLevelTables(tables, cd)
	}
	// We pick tables, so we compact older tables first. This is similar to
	// kOldestLargestSeqFirst in RocksDB.
	s.sortByHeuristic(tables, cd)

	for _, t := range tables {
		cd.thisSize = t.Size()
		cd.thisRange = getKeyRange(t)
		// If we're already compacting this range, don't do anything.
		if s.cstatus.overlapsWith(cd.thisLevel.level, cd.thisRange) {
			continue
		}
		cd.top = []*table.Table{t}
		left, right := cd.nextLevel.overlappingTables(levelHandlerRLocked{}, cd.thisRange)

		cd.bot = make([]*table.Table, right-left)
		copy(cd.bot, cd.nextLevel.tables[left:right])

		if len(cd.bot) == 0 {
			// Nothing overlaps on the next level; compact just this table down.
			cd.bot = []*table.Table{}
			cd.nextRange = cd.thisRange
			if !s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd) {
				continue
			}
			return true
		}
		cd.nextRange = getKeyRange(cd.bot...)

		// Skip if the destination range is already being compacted.
		if s.cstatus.overlapsWith(cd.nextLevel.level, cd.nextRange) {
			continue
		}
		if !s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd) {
			continue
		}
		return true
	}
	return false
}
  1254. func (s *levelsController) runCompactDef(id, l int, cd compactDef) (err error) {
  1255. if len(cd.t.fileSz) == 0 {
  1256. return errors.New("Filesizes cannot be zero. Targets are not set")
  1257. }
  1258. timeStart := time.Now()
  1259. thisLevel := cd.thisLevel
  1260. nextLevel := cd.nextLevel
  1261. y.AssertTrue(len(cd.splits) == 0)
  1262. if thisLevel.level == nextLevel.level {
  1263. // don't do anything for L0 -> L0 and Lmax -> Lmax.
  1264. } else {
  1265. s.addSplits(&cd)
  1266. }
  1267. if len(cd.splits) == 0 {
  1268. cd.splits = append(cd.splits, keyRange{})
  1269. }
  1270. // Table should never be moved directly between levels,
  1271. // always be rewritten to allow discarding invalid versions.
  1272. newTables, decr, err := s.compactBuildTables(l, cd)
  1273. if err != nil {
  1274. return err
  1275. }
  1276. defer func() {
  1277. // Only assign to err, if it's not already nil.
  1278. if decErr := decr(); err == nil {
  1279. err = decErr
  1280. }
  1281. }()
  1282. changeSet := buildChangeSet(&cd, newTables)
  1283. // We write to the manifest _before_ we delete files (and after we created files)
  1284. if err := s.kv.manifest.addChanges(changeSet.Changes, s.kv.opt); err != nil {
  1285. return err
  1286. }
  1287. getSizes := func(tables []*table.Table) int64 {
  1288. size := int64(0)
  1289. for _, i := range tables {
  1290. size += i.Size()
  1291. }
  1292. return size
  1293. }
  1294. sizeNewTables := int64(0)
  1295. sizeOldTables := int64(0)
  1296. if s.kv.opt.MetricsEnabled {
  1297. sizeNewTables = getSizes(newTables)
  1298. sizeOldTables = getSizes(cd.bot) + getSizes(cd.top)
  1299. y.NumBytesCompactionWrittenAdd(s.kv.opt.MetricsEnabled, nextLevel.strLevel, sizeNewTables)
  1300. }
  1301. // See comment earlier in this function about the ordering of these ops, and the order in which
  1302. // we access levels when reading.
  1303. if err := nextLevel.replaceTables(cd.bot, newTables); err != nil {
  1304. return err
  1305. }
  1306. if err := thisLevel.deleteTables(cd.top); err != nil {
  1307. return err
  1308. }
  1309. // Note: For level 0, while doCompact is running, it is possible that new tables are added.
  1310. // However, the tables are added only to the end, so it is ok to just delete the first table.
  1311. from := append(tablesToString(cd.top), tablesToString(cd.bot)...)
  1312. to := tablesToString(newTables)
  1313. if dur := time.Since(timeStart); dur > 2*time.Second {
  1314. var expensive string
  1315. if dur > time.Second {
  1316. expensive = " [E]"
  1317. }
  1318. s.kv.opt.Infof("[%d]%s LOG Compact %d->%d (%d, %d -> %d tables with %d splits)."+
  1319. " [%s] -> [%s], took %v\n, deleted %d bytes",
  1320. id, expensive, thisLevel.level, nextLevel.level, len(cd.top), len(cd.bot),
  1321. len(newTables), len(cd.splits), strings.Join(from, " "), strings.Join(to, " "),
  1322. dur.Round(time.Millisecond), sizeOldTables-sizeNewTables)
  1323. }
  1324. if cd.thisLevel.level != 0 && len(newTables) > 2*s.kv.opt.LevelSizeMultiplier {
  1325. s.kv.opt.Infof("This Range (numTables: %d)\nLeft:\n%s\nRight:\n%s\n",
  1326. len(cd.top), hex.Dump(cd.thisRange.left), hex.Dump(cd.thisRange.right))
  1327. s.kv.opt.Infof("Next Range (numTables: %d)\nLeft:\n%s\nRight:\n%s\n",
  1328. len(cd.bot), hex.Dump(cd.nextRange.left), hex.Dump(cd.nextRange.right))
  1329. }
  1330. return nil
  1331. }
  1332. func tablesToString(tables []*table.Table) []string {
  1333. var res []string
  1334. for _, t := range tables {
  1335. res = append(res, fmt.Sprintf("%05d", t.ID()))
  1336. }
  1337. res = append(res, ".")
  1338. return res
  1339. }
// errFillTables is returned by doCompact when no suitable set of tables could
// be picked for compaction at the requested level.
var errFillTables = errors.New("Unable to fill tables")
// doCompact picks some table on level l and compacts it away to the next level.
// Returns errFillTables if no compactable tables were found, or the error from
// running the compaction. id identifies the calling compactor goroutine.
func (s *levelsController) doCompact(id int, p compactionPriority) error {
	l := p.level
	y.AssertTrue(l < s.kv.opt.MaxLevels) // Sanity check.
	if p.t.baseLevel == 0 {
		// The priority came without computed targets; compute them now.
		p.t = s.levelTargets()
	}
	_, span := otel.Tracer("").Start(context.TODO(), "Badger.Compaction")
	defer span.End()

	cd := compactDef{
		compactorId:  id,
		p:            p,
		t:            p.t,
		thisLevel:    s.levels[l],
		dropPrefixes: p.dropPrefixes,
	}

	// While picking tables to be compacted, both levels' tables are expected to
	// remain unchanged.
	if l == 0 {
		cd.nextLevel = s.levels[p.t.baseLevel]
		if !s.fillTablesL0(&cd) {
			return errFillTables
		}
	} else {
		cd.nextLevel = cd.thisLevel
		// We're not compacting the last level so pick the next level.
		if !cd.thisLevel.isLastLevel() {
			cd.nextLevel = s.levels[l+1]
		}
		if !s.fillTables(&cd) {
			return errFillTables
		}
	}
	defer s.cstatus.delete(cd) // Remove the ranges from compaction status.

	span.SetAttributes(attribute.String("Compaction", fmt.Sprintf("%+v", cd)))
	if err := s.runCompactDef(id, l, cd); err != nil {
		// This compaction couldn't be done successfully.
		s.kv.opt.Warningf("[Compactor: %d] LOG Compact FAILED with error: %+v: %+v", id, err, cd)
		return err
	}

	span.SetAttributes(
		attribute.Int("Top tables count", len(cd.top)),
		attribute.Int("Bottom tables count", len(cd.bot)))
	s.kv.opt.Debugf("[Compactor: %d] Compaction for level: %d DONE", id, cd.thisLevel.level)
	return nil
}
// addLevel0Table records t in the manifest (unless it exists only in memory)
// and then adds it to level 0, stalling the caller until L0 can accept it.
func (s *levelsController) addLevel0Table(t *table.Table) error {
	// Add table to manifest file only if it is not opened in memory. We don't want to add a table
	// to the manifest file if it exists only in memory.
	if !t.IsInmemory {
		// We update the manifest _before_ the table becomes part of a levelHandler, because at that
		// point it could get used in some compaction. This ensures the manifest file gets updated in
		// the proper order. (That means this update happens before that of some compaction which
		// deletes the table.)
		err := s.kv.manifest.addChanges([]*pb.ManifestChange{
			newCreateChange(t.ID(), 0, t.KeyID(), t.CompressionType()),
		}, s.kv.opt)
		if err != nil {
			return err
		}
	}

	for !s.levels[0].tryAddLevel0Table(t) {
		// Before we unstall, we need to make sure that level 0 is healthy.
		timeStart := time.Now()
		// Poll until L0 drops below the stall threshold.
		for s.levels[0].numTables() >= s.kv.opt.NumLevelZeroTablesStall {
			time.Sleep(10 * time.Millisecond)
		}
		dur := time.Since(timeStart)
		if dur > time.Second {
			s.kv.opt.Infof("L0 was stalled for %s\n", dur.Round(time.Millisecond))
		}
		// Accumulate total stall time, in milliseconds, for metrics.
		s.l0stallsMs.Add(int64(dur.Round(time.Millisecond)))
	}
	return nil
}
  1416. func (s *levelsController) close() error {
  1417. err := s.cleanupLevels()
  1418. return y.Wrap(err, "levelsController.Close")
  1419. }
// get searches for a given key in all the levels of the LSM tree. It returns
// key version <= the expected version (version in key). If not found,
// it returns an empty y.ValueStruct. Levels below startLevel are skipped;
// maxVs seeds the best version found so far (callers may pass a value found
// in memtables).
func (s *levelsController) get(key []byte, maxVs y.ValueStruct, startLevel int) (
	y.ValueStruct, error) {
	if s.kv.IsClosed() {
		return y.ValueStruct{}, ErrDBClosed
	}
	// It's important that we iterate the levels from 0 on upward. The reason is, if we iterated
	// in opposite order, or in parallel (naively calling all the h.RLock() in some order) we could
	// read level L's tables post-compaction and level L+1's tables pre-compaction. (If we do
	// parallelize this, we will need to call the h.RLock() function by increasing order of level
	// number.)
	version := y.ParseTs(key)
	for _, h := range s.levels {
		// Ignore all levels below startLevel. This is useful for GC when L0 is kept in memory.
		if h.level < startLevel {
			continue
		}
		vs, err := h.get(key) // Calls h.RLock() and h.RUnlock().
		if err != nil {
			return y.ValueStruct{}, y.Wrapf(err, "get key: %q", key)
		}
		// A nil Value with zero Meta means this level had no entry for the key.
		if vs.Value == nil && vs.Meta == 0 {
			continue
		}
		y.NumBytesReadsLSMAdd(s.kv.opt.MetricsEnabled, int64(len(vs.Value)))
		// Found the exact version asked for; return immediately.
		if vs.Version == version {
			return vs, nil
		}
		// Otherwise keep the highest version seen across levels.
		if maxVs.Version < vs.Version {
			maxVs = vs
		}
	}
	if len(maxVs.Value) > 0 {
		y.NumGetsWithResultsAdd(s.kv.opt.MetricsEnabled, 1)
	}
	return maxVs, nil
}
  1459. func appendIteratorsReversed(out []y.Iterator, th []*table.Table, opt int) []y.Iterator {
  1460. for i := len(th) - 1; i >= 0; i-- {
  1461. // This will increment the reference of the table handler.
  1462. out = append(out, th[i].NewIterator(opt))
  1463. }
  1464. return out
  1465. }
  1466. // appendIterators appends iterators to an array of iterators, for merging.
  1467. // Note: This obtains references for the table handlers. Remember to close these iterators.
  1468. func (s *levelsController) appendIterators(
  1469. iters []y.Iterator, opt *IteratorOptions) []y.Iterator {
  1470. // Just like with get, it's important we iterate the levels from 0 on upward, to avoid missing
  1471. // data when there's a compaction.
  1472. for _, level := range s.levels {
  1473. iters = level.appendIterators(iters, opt)
  1474. }
  1475. return iters
  1476. }
// TableInfo represents the information about a table.
type TableInfo struct {
	ID               uint64 // Unique table ID.
	Level            int    // LSM level this table currently belongs to.
	Left             []byte // Smallest key in the table.
	Right            []byte // Biggest key in the table.
	KeyCount         uint32 // Number of keys in the table
	OnDiskSize       uint32 // Size of the table on disk.
	StaleDataSize    uint32 // Size of stale data in the table.
	UncompressedSize uint32 // Size of the table before compression.
	MaxVersion       uint64 // Highest version stored in the table.
	IndexSz          int    // Size of the table index.
	BloomFilterSize  int    // Size of the table's bloom filter.
}
  1491. func (s *levelsController) getTableInfo() (result []TableInfo) {
  1492. for _, l := range s.levels {
  1493. l.RLock()
  1494. for _, t := range l.tables {
  1495. info := TableInfo{
  1496. ID: t.ID(),
  1497. Level: l.level,
  1498. Left: t.Smallest(),
  1499. Right: t.Biggest(),
  1500. KeyCount: t.KeyCount(),
  1501. OnDiskSize: t.OnDiskSize(),
  1502. StaleDataSize: t.StaleDataSize(),
  1503. IndexSz: t.IndexSize(),
  1504. BloomFilterSize: t.BloomFilterSize(),
  1505. UncompressedSize: t.UncompressedSize(),
  1506. MaxVersion: t.MaxVersion(),
  1507. }
  1508. result = append(result, info)
  1509. }
  1510. l.RUnlock()
  1511. }
  1512. sort.Slice(result, func(i, j int) bool {
  1513. if result[i].Level != result[j].Level {
  1514. return result[i].Level < result[j].Level
  1515. }
  1516. return result[i].ID < result[j].ID
  1517. })
  1518. return
  1519. }
// LevelInfo describes the state of a single LSM level, as reported by
// getLevelInfo.
type LevelInfo struct {
	Level          int     // Level number.
	NumTables      int     // Number of tables on this level.
	Size           int64   // Total size of the level.
	TargetSize     int64   // Target total size for this level.
	TargetFileSize int64   // Target size per table file on this level.
	IsBaseLevel    bool    // Whether this is the base level.
	Score          float64 // Compaction priority score.
	Adjusted       float64 // Adjusted compaction priority score.
	// StaleDatSize is the total stale data size on this level.
	// NOTE: the name is missing an 'a' (StaleDataSize); it is kept as-is
	// because renaming an exported field would break callers.
	StaleDatSize int64
}
  1531. func (s *levelsController) getLevelInfo() []LevelInfo {
  1532. t := s.levelTargets()
  1533. prios := s.pickCompactLevels(nil)
  1534. result := make([]LevelInfo, len(s.levels))
  1535. for i, l := range s.levels {
  1536. l.RLock()
  1537. result[i].Level = i
  1538. result[i].Size = l.totalSize
  1539. result[i].NumTables = len(l.tables)
  1540. result[i].StaleDatSize = l.totalStaleSize
  1541. l.RUnlock()
  1542. result[i].TargetSize = t.targetSz[i]
  1543. result[i].TargetFileSize = t.fileSz[i]
  1544. result[i].IsBaseLevel = t.baseLevel == i
  1545. }
  1546. for _, p := range prios {
  1547. result[p.level].Score = p.score
  1548. result[p.level].Adjusted = p.adjusted
  1549. }
  1550. return result
  1551. }
  1552. // verifyChecksum verifies checksum for all tables on all levels.
  1553. func (s *levelsController) verifyChecksum() error {
  1554. var tables []*table.Table
  1555. for _, l := range s.levels {
  1556. l.RLock()
  1557. tables = tables[:0]
  1558. for _, t := range l.tables {
  1559. tables = append(tables, t)
  1560. t.IncrRef()
  1561. }
  1562. l.RUnlock()
  1563. for _, t := range tables {
  1564. errChkVerify := t.VerifyChecksum()
  1565. if err := t.DecrRef(); err != nil {
  1566. s.kv.opt.Errorf("unable to decrease reference of table: %s while "+
  1567. "verifying checksum with error: %s", t.Filename(), err)
  1568. }
  1569. if errChkVerify != nil {
  1570. return errChkVerify
  1571. }
  1572. }
  1573. }
  1574. return nil
  1575. }
  1576. // Returns the sorted list of splits for all the levels and tables based
  1577. // on the block offsets.
  1578. func (s *levelsController) keySplits(numPerTable int, prefix []byte) []string {
  1579. splits := make([]string, 0)
  1580. for _, l := range s.levels {
  1581. l.RLock()
  1582. for _, t := range l.tables {
  1583. tableSplits := t.KeySplits(numPerTable, prefix)
  1584. splits = append(splits, tableSplits...)
  1585. }
  1586. l.RUnlock()
  1587. }
  1588. sort.Strings(splits)
  1589. return splits
  1590. }