levels.go 53 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788
  1. /*
  2. * Copyright 2017 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package badger
  17. import (
  18. "bytes"
  19. "context"
  20. "encoding/hex"
  21. stderrors "errors"
  22. "fmt"
  23. "math"
  24. "math/rand"
  25. "os"
  26. "sort"
  27. "strings"
  28. "sync"
  29. "sync/atomic"
  30. "time"
  31. "github.com/pkg/errors"
  32. otrace "go.opencensus.io/trace"
  33. "github.com/dgraph-io/badger/v4/pb"
  34. "github.com/dgraph-io/badger/v4/table"
  35. "github.com/dgraph-io/badger/v4/y"
  36. "github.com/dgraph-io/ristretto/v2/z"
  37. )
  38. type levelsController struct {
  39. nextFileID atomic.Uint64
  40. l0stallsMs atomic.Int64
  41. // The following are initialized once and const.
  42. levels []*levelHandler
  43. kv *DB
  44. cstatus compactStatus
  45. }
  46. // revertToManifest checks that all necessary table files exist and removes all table files not
  47. // referenced by the manifest. idMap is a set of table file id's that were read from the directory
  48. // listing.
  49. func revertToManifest(kv *DB, mf *Manifest, idMap map[uint64]struct{}) error {
  50. // 1. Check all files in manifest exist.
  51. for id := range mf.Tables {
  52. if _, ok := idMap[id]; !ok {
  53. return fmt.Errorf("file does not exist for table %d", id)
  54. }
  55. }
  56. // 2. Delete files that shouldn't exist.
  57. for id := range idMap {
  58. if _, ok := mf.Tables[id]; !ok {
  59. kv.opt.Debugf("Table file %d not referenced in MANIFEST\n", id)
  60. filename := table.NewFilename(id, kv.opt.Dir)
  61. if err := os.Remove(filename); err != nil {
  62. return y.Wrapf(err, "While removing table %d", id)
  63. }
  64. }
  65. }
  66. return nil
  67. }
  68. func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
  69. y.AssertTrue(db.opt.NumLevelZeroTablesStall > db.opt.NumLevelZeroTables)
  70. s := &levelsController{
  71. kv: db,
  72. levels: make([]*levelHandler, db.opt.MaxLevels),
  73. }
  74. s.cstatus.tables = make(map[uint64]struct{})
  75. s.cstatus.levels = make([]*levelCompactStatus, db.opt.MaxLevels)
  76. for i := 0; i < db.opt.MaxLevels; i++ {
  77. s.levels[i] = newLevelHandler(db, i)
  78. s.cstatus.levels[i] = new(levelCompactStatus)
  79. }
  80. if db.opt.InMemory {
  81. return s, nil
  82. }
  83. // Compare manifest against directory, check for existent/non-existent files, and remove.
  84. if err := revertToManifest(db, mf, getIDMap(db.opt.Dir)); err != nil {
  85. return nil, err
  86. }
  87. var mu sync.Mutex
  88. tables := make([][]*table.Table, db.opt.MaxLevels)
  89. var maxFileID uint64
  90. // We found that using 3 goroutines allows disk throughput to be utilized to its max.
  91. // Disk utilization is the main thing we should focus on, while trying to read the data. That's
  92. // the one factor that remains constant between HDD and SSD.
  93. throttle := y.NewThrottle(3)
  94. start := time.Now()
  95. var numOpened atomic.Int32
  96. tick := time.NewTicker(3 * time.Second)
  97. defer tick.Stop()
  98. for fileID, tf := range mf.Tables {
  99. fname := table.NewFilename(fileID, db.opt.Dir)
  100. select {
  101. case <-tick.C:
  102. db.opt.Infof("%d tables out of %d opened in %s\n", numOpened.Load(),
  103. len(mf.Tables), time.Since(start).Round(time.Millisecond))
  104. default:
  105. }
  106. if err := throttle.Do(); err != nil {
  107. closeAllTables(tables)
  108. return nil, err
  109. }
  110. if fileID > maxFileID {
  111. maxFileID = fileID
  112. }
  113. go func(fname string, tf TableManifest) {
  114. var rerr error
  115. defer func() {
  116. throttle.Done(rerr)
  117. numOpened.Add(1)
  118. }()
  119. dk, err := db.registry.DataKey(tf.KeyID)
  120. if err != nil {
  121. rerr = y.Wrapf(err, "Error while reading datakey")
  122. return
  123. }
  124. topt := buildTableOptions(db)
  125. // Explicitly set Compression and DataKey based on how the table was generated.
  126. topt.Compression = tf.Compression
  127. topt.DataKey = dk
  128. mf, err := z.OpenMmapFile(fname, db.opt.getFileFlags(), 0)
  129. if err != nil {
  130. rerr = y.Wrapf(err, "Opening file: %q", fname)
  131. return
  132. }
  133. t, err := table.OpenTable(mf, topt)
  134. if err != nil {
  135. if strings.HasPrefix(err.Error(), "CHECKSUM_MISMATCH:") {
  136. db.opt.Errorf(err.Error())
  137. db.opt.Errorf("Ignoring table %s", mf.Fd.Name())
  138. // Do not set rerr. We will continue without this table.
  139. } else {
  140. rerr = y.Wrapf(err, "Opening table: %q", fname)
  141. }
  142. return
  143. }
  144. mu.Lock()
  145. tables[tf.Level] = append(tables[tf.Level], t)
  146. mu.Unlock()
  147. }(fname, tf)
  148. }
  149. if err := throttle.Finish(); err != nil {
  150. closeAllTables(tables)
  151. return nil, err
  152. }
  153. db.opt.Infof("All %d tables opened in %s\n", numOpened.Load(),
  154. time.Since(start).Round(time.Millisecond))
  155. s.nextFileID.Store(maxFileID + 1)
  156. for i, tbls := range tables {
  157. s.levels[i].initTables(tbls)
  158. }
  159. // Make sure key ranges do not overlap etc.
  160. if err := s.validate(); err != nil {
  161. _ = s.cleanupLevels()
  162. return nil, y.Wrap(err, "Level validation")
  163. }
  164. // Sync directory (because we have at least removed some files, or previously created the
  165. // manifest file).
  166. if err := syncDir(db.opt.Dir); err != nil {
  167. _ = s.close()
  168. return nil, err
  169. }
  170. return s, nil
  171. }
  172. // Closes the tables, for cleanup in newLevelsController. (We Close() instead of using DecrRef()
  173. // because that would delete the underlying files.) We ignore errors, which is OK because tables
  174. // are read-only.
  175. func closeAllTables(tables [][]*table.Table) {
  176. for _, tableSlice := range tables {
  177. for _, table := range tableSlice {
  178. _ = table.Close(-1)
  179. }
  180. }
  181. }
  182. func (s *levelsController) cleanupLevels() error {
  183. var firstErr error
  184. for _, l := range s.levels {
  185. if err := l.close(); err != nil && firstErr == nil {
  186. firstErr = err
  187. }
  188. }
  189. return firstErr
  190. }
  191. // dropTree picks all tables from all levels, creates a manifest changeset,
  192. // applies it, and then decrements the refs of these tables, which would result
  193. // in their deletion.
  194. func (s *levelsController) dropTree() (int, error) {
  195. // First pick all tables, so we can create a manifest changelog.
  196. var all []*table.Table
  197. for _, l := range s.levels {
  198. l.RLock()
  199. all = append(all, l.tables...)
  200. l.RUnlock()
  201. }
  202. if len(all) == 0 {
  203. return 0, nil
  204. }
  205. // Generate the manifest changes.
  206. changes := []*pb.ManifestChange{}
  207. for _, table := range all {
  208. // Add a delete change only if the table is not in memory.
  209. if !table.IsInmemory {
  210. changes = append(changes, newDeleteChange(table.ID()))
  211. }
  212. }
  213. changeSet := pb.ManifestChangeSet{Changes: changes}
  214. if err := s.kv.manifest.addChanges(changeSet.Changes); err != nil {
  215. return 0, err
  216. }
  217. // Now that manifest has been successfully written, we can delete the tables.
  218. for _, l := range s.levels {
  219. l.Lock()
  220. l.totalSize = 0
  221. l.tables = l.tables[:0]
  222. l.Unlock()
  223. }
  224. for _, table := range all {
  225. if err := table.DecrRef(); err != nil {
  226. return 0, err
  227. }
  228. }
  229. return len(all), nil
  230. }
  231. // dropPrefix runs a L0->L1 compaction, and then runs same level compaction on the rest of the
  232. // levels. For L0->L1 compaction, it runs compactions normally, but skips over
  233. // all the keys with the provided prefix.
  234. // For Li->Li compactions, it picks up the tables which would have the prefix. The
  235. // tables who only have keys with this prefix are quickly dropped. The ones which have other keys
  236. // are run through MergeIterator and compacted to create new tables. All the mechanisms of
  237. // compactions apply, i.e. level sizes and MANIFEST are updated as in the normal flow.
  238. func (s *levelsController) dropPrefixes(prefixes [][]byte) error {
  239. opt := s.kv.opt
  240. // Iterate levels in the reverse order because if we were to iterate from
  241. // lower level (say level 0) to a higher level (say level 3) we could have
  242. // a state in which level 0 is compacted and an older version of a key exists in lower level.
  243. // At this point, if someone creates an iterator, they would see an old
  244. // value for a key from lower levels. Iterating in reverse order ensures we
  245. // drop the oldest data first so that lookups never return stale data.
  246. for i := len(s.levels) - 1; i >= 0; i-- {
  247. l := s.levels[i]
  248. l.RLock()
  249. if l.level == 0 {
  250. size := len(l.tables)
  251. l.RUnlock()
  252. if size > 0 {
  253. cp := compactionPriority{
  254. level: 0,
  255. score: 1.74,
  256. // A unique number greater than 1.0 does two things. Helps identify this
  257. // function in logs, and forces a compaction.
  258. dropPrefixes: prefixes,
  259. }
  260. if err := s.doCompact(174, cp); err != nil {
  261. opt.Warningf("While compacting level 0: %v", err)
  262. return nil
  263. }
  264. }
  265. continue
  266. }
  267. // Build a list of compaction tableGroups affecting all the prefixes we
  268. // need to drop. We need to build tableGroups that satisfy the invariant that
  269. // bottom tables are consecutive.
  270. // tableGroup contains groups of consecutive tables.
  271. var tableGroups [][]*table.Table
  272. var tableGroup []*table.Table
  273. finishGroup := func() {
  274. if len(tableGroup) > 0 {
  275. tableGroups = append(tableGroups, tableGroup)
  276. tableGroup = nil
  277. }
  278. }
  279. for _, table := range l.tables {
  280. if containsAnyPrefixes(table, prefixes) {
  281. tableGroup = append(tableGroup, table)
  282. } else {
  283. finishGroup()
  284. }
  285. }
  286. finishGroup()
  287. l.RUnlock()
  288. if len(tableGroups) == 0 {
  289. continue
  290. }
  291. _, span := otrace.StartSpan(context.Background(), "Badger.Compaction")
  292. span.Annotatef(nil, "Compaction level: %v", l.level)
  293. span.Annotatef(nil, "Drop Prefixes: %v", prefixes)
  294. defer span.End()
  295. opt.Infof("Dropping prefix at level %d (%d tableGroups)", l.level, len(tableGroups))
  296. for _, operation := range tableGroups {
  297. cd := compactDef{
  298. span: span,
  299. thisLevel: l,
  300. nextLevel: l,
  301. top: nil,
  302. bot: operation,
  303. dropPrefixes: prefixes,
  304. t: s.levelTargets(),
  305. }
  306. cd.t.baseLevel = l.level
  307. if err := s.runCompactDef(-1, l.level, cd); err != nil {
  308. opt.Warningf("While running compact def: %+v. Error: %v", cd, err)
  309. return err
  310. }
  311. }
  312. }
  313. return nil
  314. }
  315. func (s *levelsController) startCompact(lc *z.Closer) {
  316. n := s.kv.opt.NumCompactors
  317. lc.AddRunning(n - 1)
  318. for i := 0; i < n; i++ {
  319. go s.runCompactor(i, lc)
  320. }
  321. }
  // targets is the result of levelTargets: the compaction base level plus per-level size
  // goals. Both slices are indexed by level number.
  type targets struct {
  	// baseLevel is the level chosen as Lbase by levelTargets.
  	baseLevel int
  	// targetSz[i] is the target total size of level i.
  	targetSz []int64
  	// fileSz[i] is the target size of a single table file at level i.
  	fileSz []int64
  }
  327. // levelTargets calculates the targets for levels in the LSM tree. The idea comes from Dynamic Level
  328. // Sizes ( https://rocksdb.org/blog/2015/07/23/dynamic-level.html ) in RocksDB. The sizes of levels
  329. // are calculated based on the size of the lowest level, typically L6. So, if L6 size is 1GB, then
  330. // L5 target size is 100MB, L4 target size is 10MB and so on.
  331. //
  332. // L0 files don't automatically go to L1. Instead, they get compacted to Lbase, where Lbase is
  333. // chosen based on the first level which is non-empty from top (check L1 through L6). For an empty
  334. // DB, that would be L6. So, L0 compactions go to L6, then L5, L4 and so on.
  335. //
  336. // Lbase is advanced to the upper levels when its target size exceeds BaseLevelSize. For
  337. // example, when L6 reaches 1.1GB, then L4 target sizes becomes 11MB, thus exceeding the
  338. // BaseLevelSize of 10MB. L3 would then become the new Lbase, with a target size of 1MB <
  339. // BaseLevelSize.
  340. func (s *levelsController) levelTargets() targets {
  341. adjust := func(sz int64) int64 {
  342. if sz < s.kv.opt.BaseLevelSize {
  343. return s.kv.opt.BaseLevelSize
  344. }
  345. return sz
  346. }
  347. t := targets{
  348. targetSz: make([]int64, len(s.levels)),
  349. fileSz: make([]int64, len(s.levels)),
  350. }
  351. // DB size is the size of the last level.
  352. dbSize := s.lastLevel().getTotalSize()
  353. for i := len(s.levels) - 1; i > 0; i-- {
  354. ltarget := adjust(dbSize)
  355. t.targetSz[i] = ltarget
  356. if t.baseLevel == 0 && ltarget <= s.kv.opt.BaseLevelSize {
  357. t.baseLevel = i
  358. }
  359. dbSize /= int64(s.kv.opt.LevelSizeMultiplier)
  360. }
  361. tsz := s.kv.opt.BaseTableSize
  362. for i := 0; i < len(s.levels); i++ {
  363. if i == 0 {
  364. // Use MemTableSize for Level 0. Because at Level 0, we stop compactions based on the
  365. // number of tables, not the size of the level. So, having a 1:1 size ratio between
  366. // memtable size and the size of L0 files is better than churning out 32 files per
  367. // memtable (assuming 64MB MemTableSize and 2MB BaseTableSize).
  368. t.fileSz[i] = s.kv.opt.MemTableSize
  369. } else if i <= t.baseLevel {
  370. t.fileSz[i] = tsz
  371. } else {
  372. tsz *= int64(s.kv.opt.TableSizeMultiplier)
  373. t.fileSz[i] = tsz
  374. }
  375. }
  376. // Bring the base level down to the last empty level.
  377. for i := t.baseLevel + 1; i < len(s.levels)-1; i++ {
  378. if s.levels[i].getTotalSize() > 0 {
  379. break
  380. }
  381. t.baseLevel = i
  382. }
  383. // If the base level is empty and the next level size is less than the
  384. // target size, pick the next level as the base level.
  385. b := t.baseLevel
  386. lvl := s.levels
  387. if b < len(lvl)-1 && lvl[b].getTotalSize() == 0 && lvl[b+1].getTotalSize() < t.targetSz[b+1] {
  388. t.baseLevel++
  389. }
  390. return t
  391. }
  392. func (s *levelsController) runCompactor(id int, lc *z.Closer) {
  393. defer lc.Done()
  394. randomDelay := time.NewTimer(time.Duration(rand.Int31n(1000)) * time.Millisecond)
  395. select {
  396. case <-randomDelay.C:
  397. case <-lc.HasBeenClosed():
  398. randomDelay.Stop()
  399. return
  400. }
  401. moveL0toFront := func(prios []compactionPriority) []compactionPriority {
  402. idx := -1
  403. for i, p := range prios {
  404. if p.level == 0 {
  405. idx = i
  406. break
  407. }
  408. }
  409. // If idx == -1, we didn't find L0.
  410. // If idx == 0, then we don't need to do anything. L0 is already at the front.
  411. if idx > 0 {
  412. out := append([]compactionPriority{}, prios[idx])
  413. out = append(out, prios[:idx]...)
  414. out = append(out, prios[idx+1:]...)
  415. return out
  416. }
  417. return prios
  418. }
  419. run := func(p compactionPriority) bool {
  420. err := s.doCompact(id, p)
  421. switch err {
  422. case nil:
  423. return true
  424. case errFillTables:
  425. // pass
  426. default:
  427. s.kv.opt.Warningf("While running doCompact: %v\n", err)
  428. }
  429. return false
  430. }
  431. var priosBuffer []compactionPriority
  432. runOnce := func() bool {
  433. prios := s.pickCompactLevels(priosBuffer)
  434. defer func() {
  435. priosBuffer = prios
  436. }()
  437. if id == 0 {
  438. // Worker ID zero prefers to compact L0 always.
  439. prios = moveL0toFront(prios)
  440. }
  441. for _, p := range prios {
  442. if id == 0 && p.level == 0 {
  443. // Allow worker zero to run level 0, irrespective of its adjusted score.
  444. } else if p.adjusted < 1.0 {
  445. break
  446. }
  447. if run(p) {
  448. return true
  449. }
  450. }
  451. return false
  452. }
  453. tryLmaxToLmaxCompaction := func() {
  454. p := compactionPriority{
  455. level: s.lastLevel().level,
  456. t: s.levelTargets(),
  457. }
  458. run(p)
  459. }
  460. count := 0
  461. ticker := time.NewTicker(50 * time.Millisecond)
  462. defer ticker.Stop()
  463. for {
  464. select {
  465. // Can add a done channel or other stuff.
  466. case <-ticker.C:
  467. count++
  468. // Each ticker is 50ms so 50*200=10seconds.
  469. if s.kv.opt.LmaxCompaction && id == 2 && count >= 200 {
  470. tryLmaxToLmaxCompaction()
  471. count = 0
  472. } else {
  473. runOnce()
  474. }
  475. case <-lc.HasBeenClosed():
  476. return
  477. }
  478. }
  479. }
  480. type compactionPriority struct {
  481. level int
  482. score float64
  483. adjusted float64
  484. dropPrefixes [][]byte
  485. t targets
  486. }
  487. func (s *levelsController) lastLevel() *levelHandler {
  488. return s.levels[len(s.levels)-1]
  489. }
  490. // pickCompactLevel determines which level to compact.
  491. // Based on: https://github.com/facebook/rocksdb/wiki/Leveled-Compaction
  492. // It tries to reuse priosBuffer to reduce memory allocation,
  493. // passing nil is acceptable, then new memory will be allocated.
  494. func (s *levelsController) pickCompactLevels(priosBuffer []compactionPriority) (prios []compactionPriority) {
  495. t := s.levelTargets()
  496. addPriority := func(level int, score float64) {
  497. pri := compactionPriority{
  498. level: level,
  499. score: score,
  500. adjusted: score,
  501. t: t,
  502. }
  503. prios = append(prios, pri)
  504. }
  505. // Grow buffer to fit all levels.
  506. if cap(priosBuffer) < len(s.levels) {
  507. priosBuffer = make([]compactionPriority, 0, len(s.levels))
  508. }
  509. prios = priosBuffer[:0]
  510. // Add L0 priority based on the number of tables.
  511. addPriority(0, float64(s.levels[0].numTables())/float64(s.kv.opt.NumLevelZeroTables))
  512. // All other levels use size to calculate priority.
  513. for i := 1; i < len(s.levels); i++ {
  514. // Don't consider those tables that are already being compacted right now.
  515. delSize := s.cstatus.delSize(i)
  516. l := s.levels[i]
  517. sz := l.getTotalSize() - delSize
  518. addPriority(i, float64(sz)/float64(t.targetSz[i]))
  519. }
  520. y.AssertTrue(len(prios) == len(s.levels))
  521. // The following code is borrowed from PebbleDB and results in healthier LSM tree structure.
  522. // If Li-1 has score > 1.0, then we'll divide Li-1 score by Li. If Li score is >= 1.0, then Li-1
  523. // score is reduced, which means we'll prioritize the compaction of lower levels (L5, L4 and so
  524. // on) over the higher levels (L0, L1 and so on). On the other hand, if Li score is < 1.0, then
  525. // we'll increase the priority of Li-1.
  526. // Overall what this means is, if the bottom level is already overflowing, then de-prioritize
  527. // compaction of the above level. If the bottom level is not full, then increase the priority of
  528. // above level.
  529. var prevLevel int
  530. for level := t.baseLevel; level < len(s.levels); level++ {
  531. if prios[prevLevel].adjusted >= 1 {
  532. // Avoid absurdly large scores by placing a floor on the score that we'll
  533. // adjust a level by. The value of 0.01 was chosen somewhat arbitrarily
  534. const minScore = 0.01
  535. if prios[level].score >= minScore {
  536. prios[prevLevel].adjusted /= prios[level].adjusted
  537. } else {
  538. prios[prevLevel].adjusted /= minScore
  539. }
  540. }
  541. prevLevel = level
  542. }
  543. // Pick all the levels whose original score is >= 1.0, irrespective of their adjusted score.
  544. // We'll still sort them by their adjusted score below. Having both these scores allows us to
  545. // make better decisions about compacting L0. If we see a score >= 1.0, we can do L0->L0
  546. // compactions. If the adjusted score >= 1.0, then we can do L0->Lbase compactions.
  547. out := prios[:0]
  548. for _, p := range prios[:len(prios)-1] {
  549. if p.score >= 1.0 {
  550. out = append(out, p)
  551. }
  552. }
  553. prios = out
  554. // Sort by the adjusted score.
  555. sort.Slice(prios, func(i, j int) bool {
  556. return prios[i].adjusted > prios[j].adjusted
  557. })
  558. return prios
  559. }
  560. // checkOverlap checks if the given tables overlap with any level from the given "lev" onwards.
  561. func (s *levelsController) checkOverlap(tables []*table.Table, lev int) bool {
  562. kr := getKeyRange(tables...)
  563. for i, lh := range s.levels {
  564. if i < lev { // Skip upper levels.
  565. continue
  566. }
  567. lh.RLock()
  568. left, right := lh.overlappingTables(levelHandlerRLocked{}, kr)
  569. lh.RUnlock()
  570. if right-left > 0 {
  571. return true
  572. }
  573. }
  574. return false
  575. }
  576. // subcompact runs a single sub-compaction, iterating over the specified key-range only.
  577. //
  578. // We use splits to do a single compaction concurrently. If we have >= 3 tables
  579. // involved in the bottom level during compaction, we choose key ranges to
  580. // split the main compaction up into sub-compactions. Each sub-compaction runs
  581. // concurrently, only iterating over the provided key range, generating tables.
  582. // This speeds up the compaction significantly.
  583. func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef,
  584. inflightBuilders *y.Throttle, res chan<- *table.Table) {
  585. // Check overlap of the top level with the levels which are not being
  586. // compacted in this compaction.
  587. hasOverlap := s.checkOverlap(cd.allTables(), cd.nextLevel.level+1)
  588. // Pick a discard ts, so we can discard versions below this ts. We should
  589. // never discard any versions starting from above this timestamp, because
  590. // that would affect the snapshot view guarantee provided by transactions.
  591. discardTs := s.kv.orc.discardAtOrBelow()
  592. // Try to collect stats so that we can inform value log about GC. That would help us find which
  593. // value log file should be GCed.
  594. discardStats := make(map[uint32]int64)
  595. updateStats := func(vs y.ValueStruct) {
  596. // We don't need to store/update discard stats when badger is running in Disk-less mode.
  597. if s.kv.opt.InMemory {
  598. return
  599. }
  600. if vs.Meta&bitValuePointer > 0 {
  601. var vp valuePointer
  602. vp.Decode(vs.Value)
  603. discardStats[vp.Fid] += int64(vp.Len)
  604. }
  605. }
  606. // exceedsAllowedOverlap returns true if the given key range would overlap with more than 10
  607. // tables from level below nextLevel (nextLevel+1). This helps avoid generating tables at Li
  608. // with huge overlaps with Li+1.
  609. exceedsAllowedOverlap := func(kr keyRange) bool {
  610. n2n := cd.nextLevel.level + 1
  611. if n2n <= 1 || n2n >= len(s.levels) {
  612. return false
  613. }
  614. n2nl := s.levels[n2n]
  615. n2nl.RLock()
  616. defer n2nl.RUnlock()
  617. l, r := n2nl.overlappingTables(levelHandlerRLocked{}, kr)
  618. return r-l >= 10
  619. }
  620. var (
  621. lastKey, skipKey []byte
  622. numBuilds, numVersions int
  623. // Denotes if the first key is a series of duplicate keys had
  624. // "DiscardEarlierVersions" set
  625. firstKeyHasDiscardSet bool
  626. )
  627. addKeys := func(builder *table.Builder) {
  628. timeStart := time.Now()
  629. var numKeys, numSkips uint64
  630. var rangeCheck int
  631. var tableKr keyRange
  632. for ; it.Valid(); it.Next() {
  633. // See if we need to skip the prefix.
  634. if len(cd.dropPrefixes) > 0 && hasAnyPrefixes(it.Key(), cd.dropPrefixes) {
  635. numSkips++
  636. updateStats(it.Value())
  637. continue
  638. }
  639. // See if we need to skip this key.
  640. if len(skipKey) > 0 {
  641. if y.SameKey(it.Key(), skipKey) {
  642. numSkips++
  643. updateStats(it.Value())
  644. continue
  645. } else {
  646. skipKey = skipKey[:0]
  647. }
  648. }
  649. if !y.SameKey(it.Key(), lastKey) {
  650. firstKeyHasDiscardSet = false
  651. if len(kr.right) > 0 && y.CompareKeys(it.Key(), kr.right) >= 0 {
  652. break
  653. }
  654. if builder.ReachedCapacity() {
  655. // Only break if we are on a different key, and have reached capacity. We want
  656. // to ensure that all versions of the key are stored in the same sstable, and
  657. // not divided across multiple tables at the same level.
  658. break
  659. }
  660. lastKey = y.SafeCopy(lastKey, it.Key())
  661. numVersions = 0
  662. firstKeyHasDiscardSet = it.Value().Meta&bitDiscardEarlierVersions > 0
  663. if len(tableKr.left) == 0 {
  664. tableKr.left = y.SafeCopy(tableKr.left, it.Key())
  665. }
  666. tableKr.right = lastKey
  667. rangeCheck++
  668. if rangeCheck%5000 == 0 {
  669. // This table's range exceeds the allowed range overlap with the level after
  670. // next. So, we stop writing to this table. If we don't do this, then we end up
  671. // doing very expensive compactions involving too many tables. To amortize the
  672. // cost of this check, we do it only every N keys.
  673. if exceedsAllowedOverlap(tableKr) {
  674. // s.kv.opt.Debugf("L%d -> L%d Breaking due to exceedsAllowedOverlap with
  675. // kr: %s\n", cd.thisLevel.level, cd.nextLevel.level, tableKr)
  676. break
  677. }
  678. }
  679. }
  680. vs := it.Value()
  681. version := y.ParseTs(it.Key())
  682. isExpired := isDeletedOrExpired(vs.Meta, vs.ExpiresAt)
  683. // Do not discard entries inserted by merge operator. These entries will be
  684. // discarded once they're merged
  685. if version <= discardTs && vs.Meta&bitMergeEntry == 0 {
  686. // Keep track of the number of versions encountered for this key. Only consider the
  687. // versions which are below the minReadTs, otherwise, we might end up discarding the
  688. // only valid version for a running transaction.
  689. numVersions++
  690. // Keep the current version and discard all the next versions if
  691. // - The `discardEarlierVersions` bit is set OR
  692. // - We've already processed `NumVersionsToKeep` number of versions
  693. // (including the current item being processed)
  694. lastValidVersion := vs.Meta&bitDiscardEarlierVersions > 0 ||
  695. numVersions == s.kv.opt.NumVersionsToKeep
  696. if isExpired || lastValidVersion {
  697. // If this version of the key is deleted or expired, skip all the rest of the
  698. // versions. Ensure that we're only removing versions below readTs.
  699. skipKey = y.SafeCopy(skipKey, it.Key())
  700. switch {
  701. // Add the key to the table only if it has not expired.
  702. // We don't want to add the deleted/expired keys.
  703. case !isExpired && lastValidVersion:
  704. // Add this key. We have set skipKey, so the following key versions
  705. // would be skipped.
  706. case hasOverlap:
  707. // If this key range has overlap with lower levels, then keep the deletion
  708. // marker with the latest version, discarding the rest. We have set skipKey,
  709. // so the following key versions would be skipped.
  710. default:
  711. // If no overlap, we can skip all the versions, by continuing here.
  712. numSkips++
  713. updateStats(vs)
  714. continue // Skip adding this key.
  715. }
  716. }
  717. }
  718. numKeys++
  719. var vp valuePointer
  720. if vs.Meta&bitValuePointer > 0 {
  721. vp.Decode(vs.Value)
  722. }
  723. switch {
  724. case firstKeyHasDiscardSet:
  725. // This key is same as the last key which had "DiscardEarlierVersions" set. The
  726. // the next compactions will drop this key if its ts >
  727. // discardTs (of the next compaction).
  728. builder.AddStaleKey(it.Key(), vs, vp.Len)
  729. case isExpired:
  730. // If the key is expired, the next compaction will drop it if
  731. // its ts > discardTs (of the next compaction).
  732. builder.AddStaleKey(it.Key(), vs, vp.Len)
  733. default:
  734. builder.Add(it.Key(), vs, vp.Len)
  735. }
  736. }
  737. s.kv.opt.Debugf("[%d] LOG Compact. Added %d keys. Skipped %d keys. Iteration took: %v",
  738. cd.compactorId, numKeys, numSkips, time.Since(timeStart).Round(time.Millisecond))
  739. } // End of function: addKeys
  740. if len(kr.left) > 0 {
  741. it.Seek(kr.left)
  742. } else {
  743. it.Rewind()
  744. }
  745. for it.Valid() {
  746. if len(kr.right) > 0 && y.CompareKeys(it.Key(), kr.right) >= 0 {
  747. break
  748. }
  749. bopts := buildTableOptions(s.kv)
  750. // Set TableSize to the target file size for that level.
  751. bopts.TableSize = uint64(cd.t.fileSz[cd.nextLevel.level])
  752. builder := table.NewTableBuilder(bopts)
  753. // This would do the iteration and add keys to builder.
  754. addKeys(builder)
  755. // It was true that it.Valid() at least once in the loop above, which means we
  756. // called Add() at least once, and builder is not Empty().
  757. if builder.Empty() {
  758. // Cleanup builder resources:
  759. builder.Finish()
  760. builder.Close()
  761. continue
  762. }
  763. numBuilds++
  764. if err := inflightBuilders.Do(); err != nil {
  765. // Can't return from here, until I decrRef all the tables that I built so far.
  766. break
  767. }
  768. go func(builder *table.Builder, fileID uint64) {
  769. var err error
  770. defer inflightBuilders.Done(err)
  771. defer builder.Close()
  772. var tbl *table.Table
  773. if s.kv.opt.InMemory {
  774. tbl, err = table.OpenInMemoryTable(builder.Finish(), fileID, &bopts)
  775. } else {
  776. fname := table.NewFilename(fileID, s.kv.opt.Dir)
  777. tbl, err = table.CreateTable(fname, builder)
  778. }
  779. // If we couldn't build the table, return fast.
  780. if err != nil {
  781. return
  782. }
  783. res <- tbl
  784. }(builder, s.reserveFileID())
  785. }
  786. s.kv.vlog.updateDiscardStats(discardStats)
  787. s.kv.opt.Debugf("Discard stats: %v", discardStats)
  788. }
  789. // compactBuildTables merges topTables and botTables to form a list of new tables.
  790. func (s *levelsController) compactBuildTables(
  791. lev int, cd compactDef) ([]*table.Table, func() error, error) {
  792. topTables := cd.top
  793. botTables := cd.bot
  794. numTables := int64(len(topTables) + len(botTables))
  795. y.NumCompactionTablesAdd(s.kv.opt.MetricsEnabled, numTables)
  796. defer y.NumCompactionTablesAdd(s.kv.opt.MetricsEnabled, -numTables)
  797. cd.span.Annotatef(nil, "Top tables count: %v Bottom tables count: %v",
  798. len(topTables), len(botTables))
  799. keepTable := func(t *table.Table) bool {
  800. for _, prefix := range cd.dropPrefixes {
  801. if bytes.HasPrefix(t.Smallest(), prefix) &&
  802. bytes.HasPrefix(t.Biggest(), prefix) {
  803. // All the keys in this table have the dropPrefix. So, this
  804. // table does not need to be in the iterator and can be
  805. // dropped immediately.
  806. return false
  807. }
  808. }
  809. return true
  810. }
  811. var valid []*table.Table
  812. for _, table := range botTables {
  813. if keepTable(table) {
  814. valid = append(valid, table)
  815. }
  816. }
  817. newIterator := func() []y.Iterator {
  818. // Create iterators across all the tables involved first.
  819. var iters []y.Iterator
  820. switch {
  821. case lev == 0:
  822. iters = appendIteratorsReversed(iters, topTables, table.NOCACHE)
  823. case len(topTables) > 0:
  824. y.AssertTrue(len(topTables) == 1)
  825. iters = []y.Iterator{topTables[0].NewIterator(table.NOCACHE)}
  826. }
  827. // Next level has level>=1 and we can use ConcatIterator as key ranges do not overlap.
  828. return append(iters, table.NewConcatIterator(valid, table.NOCACHE))
  829. }
  830. res := make(chan *table.Table, 3)
  831. inflightBuilders := y.NewThrottle(8 + len(cd.splits))
  832. for _, kr := range cd.splits {
  833. // Initiate Do here so we can register the goroutines for buildTables too.
  834. if err := inflightBuilders.Do(); err != nil {
  835. s.kv.opt.Errorf("cannot start subcompaction: %+v", err)
  836. return nil, nil, err
  837. }
  838. go func(kr keyRange) {
  839. defer inflightBuilders.Done(nil)
  840. it := table.NewMergeIterator(newIterator(), false)
  841. defer it.Close()
  842. s.subcompact(it, kr, cd, inflightBuilders, res)
  843. }(kr)
  844. }
  845. var newTables []*table.Table
  846. var wg sync.WaitGroup
  847. wg.Add(1)
  848. go func() {
  849. defer wg.Done()
  850. for t := range res {
  851. newTables = append(newTables, t)
  852. }
  853. }()
  854. // Wait for all table builders to finish and also for newTables accumulator to finish.
  855. err := inflightBuilders.Finish()
  856. close(res)
  857. wg.Wait() // Wait for all tables to be picked up.
  858. if err == nil {
  859. // Ensure created files' directory entries are visible. We don't mind the extra latency
  860. // from not doing this ASAP after all file creation has finished because this is a
  861. // background operation.
  862. err = s.kv.syncDir(s.kv.opt.Dir)
  863. }
  864. if err != nil {
  865. // An error happened. Delete all the newly created table files (by calling DecrRef
  866. // -- we're the only holders of a ref).
  867. _ = decrRefs(newTables)
  868. return nil, nil, y.Wrapf(err, "while running compactions for: %+v", cd)
  869. }
  870. sort.Slice(newTables, func(i, j int) bool {
  871. return y.CompareKeys(newTables[i].Biggest(), newTables[j].Biggest()) < 0
  872. })
  873. return newTables, func() error { return decrRefs(newTables) }, nil
  874. }
  875. func buildChangeSet(cd *compactDef, newTables []*table.Table) pb.ManifestChangeSet {
  876. changes := []*pb.ManifestChange{}
  877. for _, table := range newTables {
  878. changes = append(changes,
  879. newCreateChange(table.ID(), cd.nextLevel.level, table.KeyID(), table.CompressionType()))
  880. }
  881. for _, table := range cd.top {
  882. // Add a delete change only if the table is not in memory.
  883. if !table.IsInmemory {
  884. changes = append(changes, newDeleteChange(table.ID()))
  885. }
  886. }
  887. for _, table := range cd.bot {
  888. changes = append(changes, newDeleteChange(table.ID()))
  889. }
  890. return pb.ManifestChangeSet{Changes: changes}
  891. }
  892. func hasAnyPrefixes(s []byte, listOfPrefixes [][]byte) bool {
  893. for _, prefix := range listOfPrefixes {
  894. if bytes.HasPrefix(s, prefix) {
  895. return true
  896. }
  897. }
  898. return false
  899. }
  900. func containsPrefix(table *table.Table, prefix []byte) bool {
  901. smallValue := table.Smallest()
  902. largeValue := table.Biggest()
  903. if bytes.HasPrefix(smallValue, prefix) {
  904. return true
  905. }
  906. if bytes.HasPrefix(largeValue, prefix) {
  907. return true
  908. }
  909. isPresent := func() bool {
  910. ti := table.NewIterator(0)
  911. defer ti.Close()
  912. // In table iterator's Seek, we assume that key has version in last 8 bytes. We set
  913. // version=0 (ts=math.MaxUint64), so that we don't skip the key prefixed with prefix.
  914. ti.Seek(y.KeyWithTs(prefix, math.MaxUint64))
  915. return bytes.HasPrefix(ti.Key(), prefix)
  916. }
  917. if bytes.Compare(prefix, smallValue) > 0 &&
  918. bytes.Compare(prefix, largeValue) < 0 {
  919. // There may be a case when table contains [0x0000,...., 0xffff]. If we are searching for
  920. // k=0x0011, we should not directly infer that k is present. It may not be present.
  921. return isPresent()
  922. }
  923. return false
  924. }
  925. func containsAnyPrefixes(table *table.Table, listOfPrefixes [][]byte) bool {
  926. for _, prefix := range listOfPrefixes {
  927. if containsPrefix(table, prefix) {
  928. return true
  929. }
  930. }
  931. return false
  932. }
// compactDef describes a single compaction: which levels and tables take part,
// the key ranges they cover, and how the work is split up.
type compactDef struct {
	// span is the trace span of this compaction; doCompact creates it and
	// compactBuildTables annotates it.
	span *otrace.Span

	// compactorId is the id passed to doCompact. fillTablesL0ToL0 only proceeds
	// when it is zero.
	compactorId int
	// t holds the level targets; t.fileSz is indexed by level.
	t targets
	// p is the compaction priority this definition was created from.
	p compactionPriority
	// thisLevel is the level tables are taken from; nextLevel is the level the
	// output is written to. They are the same handler for L0->L0 and for
	// last-level compactions.
	thisLevel *levelHandler
	nextLevel *levelHandler

	// top are the tables picked from thisLevel, bot the tables picked from
	// nextLevel.
	top []*table.Table
	bot []*table.Table

	// thisRange and nextRange are the key ranges registered for the two levels
	// while the compaction runs.
	thisRange keyRange
	nextRange keyRange
	// splits are the key ranges handed to individual subcompactions; filled by
	// addSplits.
	splits []keyRange

	// thisSize is set to the size of the table picked from thisLevel by
	// fillTables and fillMaxLevelTables.
	thisSize int64

	// dropPrefixes lists key prefixes whose entries are skipped during
	// compaction.
	dropPrefixes [][]byte
}
  948. // addSplits can allow us to run multiple sub-compactions in parallel across the split key ranges.
  949. func (s *levelsController) addSplits(cd *compactDef) {
  950. cd.splits = cd.splits[:0]
  951. // Let's say we have 10 tables in cd.bot and min width = 3. Then, we'll pick
  952. // 0, 1, 2 (pick), 3, 4, 5 (pick), 6, 7, 8 (pick), 9 (pick, because last table).
  953. // This gives us 4 picks for 10 tables.
  954. // In an edge case, 142 tables in bottom led to 48 splits. That's too many splits, because it
  955. // then uses up a lot of memory for table builder.
  956. // We should keep it so we have at max 5 splits.
  957. width := int(math.Ceil(float64(len(cd.bot)) / 5.0))
  958. if width < 3 {
  959. width = 3
  960. }
  961. skr := cd.thisRange
  962. skr.extend(cd.nextRange)
  963. addRange := func(right []byte) {
  964. skr.right = y.Copy(right)
  965. cd.splits = append(cd.splits, skr)
  966. skr.left = skr.right
  967. }
  968. for i, t := range cd.bot {
  969. // last entry in bottom table.
  970. if i == len(cd.bot)-1 {
  971. addRange([]byte{})
  972. return
  973. }
  974. if i%width == width-1 {
  975. // Right is assigned ts=0. The encoding ts bytes take MaxUint64-ts,
  976. // so, those with smaller TS will be considered larger for the same key.
  977. // Consider the following.
  978. // Top table is [A1...C3(deleted)]
  979. // bot table is [B1....C2]
  980. // It will generate a split [A1 ... C0], including any records of Key C.
  981. right := y.KeyWithTs(y.ParseKey(t.Biggest()), 0)
  982. addRange(right)
  983. }
  984. }
  985. }
  986. func (cd *compactDef) lockLevels() {
  987. cd.thisLevel.RLock()
  988. cd.nextLevel.RLock()
  989. }
  990. func (cd *compactDef) unlockLevels() {
  991. cd.nextLevel.RUnlock()
  992. cd.thisLevel.RUnlock()
  993. }
  994. func (cd *compactDef) allTables() []*table.Table {
  995. ret := make([]*table.Table, 0, len(cd.top)+len(cd.bot))
  996. ret = append(ret, cd.top...)
  997. ret = append(ret, cd.bot...)
  998. return ret
  999. }
  1000. func (s *levelsController) fillTablesL0ToL0(cd *compactDef) bool {
  1001. if cd.compactorId != 0 {
  1002. // Only compactor zero can work on this.
  1003. return false
  1004. }
  1005. cd.nextLevel = s.levels[0]
  1006. cd.nextRange = keyRange{}
  1007. cd.bot = nil
  1008. // Because this level and next level are both level 0, we should NOT acquire
  1009. // the read lock twice, because it can result in a deadlock. So, we don't
  1010. // call compactDef.lockLevels, instead locking the level only once and
  1011. // directly here.
  1012. //
  1013. // As per godocs on RWMutex:
  1014. // If a goroutine holds a RWMutex for reading and another goroutine might
  1015. // call Lock, no goroutine should expect to be able to acquire a read lock
  1016. // until the initial read lock is released. In particular, this prohibits
  1017. // recursive read locking. This is to ensure that the lock eventually
  1018. // becomes available; a blocked Lock call excludes new readers from
  1019. // acquiring the lock.
  1020. y.AssertTrue(cd.thisLevel.level == 0)
  1021. y.AssertTrue(cd.nextLevel.level == 0)
  1022. s.levels[0].RLock()
  1023. defer s.levels[0].RUnlock()
  1024. s.cstatus.Lock()
  1025. defer s.cstatus.Unlock()
  1026. top := cd.thisLevel.tables
  1027. var out []*table.Table
  1028. now := time.Now()
  1029. for _, t := range top {
  1030. if t.Size() >= 2*cd.t.fileSz[0] {
  1031. // This file is already big, don't include it.
  1032. continue
  1033. }
  1034. if now.Sub(t.CreatedAt) < 10*time.Second {
  1035. // Just created it 10s ago. Don't pick for compaction.
  1036. continue
  1037. }
  1038. if _, beingCompacted := s.cstatus.tables[t.ID()]; beingCompacted {
  1039. continue
  1040. }
  1041. out = append(out, t)
  1042. }
  1043. if len(out) < 4 {
  1044. // If we don't have enough tables to merge in L0, don't do it.
  1045. return false
  1046. }
  1047. cd.thisRange = infRange
  1048. cd.top = out
  1049. // Avoid any other L0 -> Lbase from happening, while this is going on.
  1050. thisLevel := s.cstatus.levels[cd.thisLevel.level]
  1051. thisLevel.ranges = append(thisLevel.ranges, infRange)
  1052. for _, t := range out {
  1053. s.cstatus.tables[t.ID()] = struct{}{}
  1054. }
  1055. // For L0->L0 compaction, we set the target file size to max, so the output is always one file.
  1056. // This significantly decreases the L0 table stalls and improves the performance.
  1057. cd.t.fileSz[0] = math.MaxUint32
  1058. return true
  1059. }
  1060. func (s *levelsController) fillTablesL0ToLbase(cd *compactDef) bool {
  1061. if cd.nextLevel.level == 0 {
  1062. panic("Base level can't be zero.")
  1063. }
  1064. // We keep cd.p.adjusted > 0.0 here to allow functions in db.go to artificially trigger
  1065. // L0->Lbase compactions. Those functions wouldn't be setting the adjusted score.
  1066. if cd.p.adjusted > 0.0 && cd.p.adjusted < 1.0 {
  1067. // Do not compact to Lbase if adjusted score is less than 1.0.
  1068. return false
  1069. }
  1070. cd.lockLevels()
  1071. defer cd.unlockLevels()
  1072. top := cd.thisLevel.tables
  1073. if len(top) == 0 {
  1074. return false
  1075. }
  1076. var out []*table.Table
  1077. if len(cd.dropPrefixes) > 0 {
  1078. // Use all tables if drop prefix is set. We don't want to compact only a
  1079. // sub-range. We want to compact all the tables.
  1080. out = top
  1081. } else {
  1082. var kr keyRange
  1083. // cd.top[0] is the oldest file. So we start from the oldest file first.
  1084. for _, t := range top {
  1085. dkr := getKeyRange(t)
  1086. if kr.overlapsWith(dkr) {
  1087. out = append(out, t)
  1088. kr.extend(dkr)
  1089. } else {
  1090. break
  1091. }
  1092. }
  1093. }
  1094. cd.thisRange = getKeyRange(out...)
  1095. cd.top = out
  1096. left, right := cd.nextLevel.overlappingTables(levelHandlerRLocked{}, cd.thisRange)
  1097. cd.bot = make([]*table.Table, right-left)
  1098. copy(cd.bot, cd.nextLevel.tables[left:right])
  1099. if len(cd.bot) == 0 {
  1100. cd.nextRange = cd.thisRange
  1101. } else {
  1102. cd.nextRange = getKeyRange(cd.bot...)
  1103. }
  1104. return s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd)
  1105. }
  1106. // fillTablesL0 would try to fill tables from L0 to be compacted with Lbase. If
  1107. // it can not do that, it would try to compact tables from L0 -> L0.
  1108. //
  1109. // Say L0 has 10 tables.
  1110. // fillTablesL0ToLbase picks up 5 tables to compact from L0 -> L5.
  1111. // Next call to fillTablesL0 would run L0ToLbase again, which fails this time.
  1112. // So, instead, we run fillTablesL0ToL0, which picks up rest of the 5 tables to
  1113. // be compacted within L0. Additionally, it would set the compaction range in
  1114. // cstatus to inf, so no other L0 -> Lbase compactions can happen.
  1115. // Thus, L0 -> L0 must finish for the next L0 -> Lbase to begin.
  1116. func (s *levelsController) fillTablesL0(cd *compactDef) bool {
  1117. if ok := s.fillTablesL0ToLbase(cd); ok {
  1118. return true
  1119. }
  1120. return s.fillTablesL0ToL0(cd)
  1121. }
  1122. // sortByStaleData sorts tables based on the amount of stale data they have.
  1123. // This is useful in removing tombstones.
  1124. func (s *levelsController) sortByStaleDataSize(tables []*table.Table, cd *compactDef) {
  1125. if len(tables) == 0 || cd.nextLevel == nil {
  1126. return
  1127. }
  1128. sort.Slice(tables, func(i, j int) bool {
  1129. return tables[i].StaleDataSize() > tables[j].StaleDataSize()
  1130. })
  1131. }
  1132. // sortByHeuristic sorts tables in increasing order of MaxVersion, so we
  1133. // compact older tables first.
  1134. func (s *levelsController) sortByHeuristic(tables []*table.Table, cd *compactDef) {
  1135. if len(tables) == 0 || cd.nextLevel == nil {
  1136. return
  1137. }
  1138. // Sort tables by max version. This is what RocksDB does.
  1139. sort.Slice(tables, func(i, j int) bool {
  1140. return tables[i].MaxVersion() < tables[j].MaxVersion()
  1141. })
  1142. }
  1143. // This function should be called with lock on levels.
  1144. func (s *levelsController) fillMaxLevelTables(tables []*table.Table, cd *compactDef) bool {
  1145. sortedTables := make([]*table.Table, len(tables))
  1146. copy(sortedTables, tables)
  1147. s.sortByStaleDataSize(sortedTables, cd)
  1148. if len(sortedTables) > 0 && sortedTables[0].StaleDataSize() == 0 {
  1149. // This is a maxLevel to maxLevel compaction and we don't have any stale data.
  1150. return false
  1151. }
  1152. cd.bot = []*table.Table{}
  1153. collectBotTables := func(t *table.Table, needSz int64) {
  1154. totalSize := t.Size()
  1155. j := sort.Search(len(tables), func(i int) bool {
  1156. return y.CompareKeys(tables[i].Smallest(), t.Smallest()) >= 0
  1157. })
  1158. y.AssertTrue(tables[j].ID() == t.ID())
  1159. j++
  1160. // Collect tables until we reach the the required size.
  1161. for j < len(tables) {
  1162. newT := tables[j]
  1163. totalSize += newT.Size()
  1164. if totalSize >= needSz {
  1165. break
  1166. }
  1167. cd.bot = append(cd.bot, newT)
  1168. cd.nextRange.extend(getKeyRange(newT))
  1169. j++
  1170. }
  1171. }
  1172. now := time.Now()
  1173. for _, t := range sortedTables {
  1174. // If the maxVersion is above the discardTs, we won't clean anything in
  1175. // the compaction. So skip this table.
  1176. if t.MaxVersion() > s.kv.orc.discardAtOrBelow() {
  1177. continue
  1178. }
  1179. if now.Sub(t.CreatedAt) < time.Hour {
  1180. // Just created it an hour ago. Don't pick for compaction.
  1181. continue
  1182. }
  1183. // If the stale data size is less than 10 MB, it might not be worth
  1184. // rewriting the table. Skip it.
  1185. if t.StaleDataSize() < 10<<20 {
  1186. continue
  1187. }
  1188. cd.thisSize = t.Size()
  1189. cd.thisRange = getKeyRange(t)
  1190. // Set the next range as the same as the current range. If we don't do
  1191. // this, we won't be able to run more than one max level compactions.
  1192. cd.nextRange = cd.thisRange
  1193. // If we're already compacting this range, don't do anything.
  1194. if s.cstatus.overlapsWith(cd.thisLevel.level, cd.thisRange) {
  1195. continue
  1196. }
  1197. // Found a valid table!
  1198. cd.top = []*table.Table{t}
  1199. needFileSz := cd.t.fileSz[cd.thisLevel.level]
  1200. // The table size is what we want so no need to collect more tables.
  1201. if t.Size() >= needFileSz {
  1202. break
  1203. }
  1204. // TableSize is less than what we want. Collect more tables for compaction.
  1205. // If the level has multiple small tables, we collect all of them
  1206. // together to form a bigger table.
  1207. collectBotTables(t, needFileSz)
  1208. if !s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd) {
  1209. cd.bot = cd.bot[:0]
  1210. cd.nextRange = keyRange{}
  1211. continue
  1212. }
  1213. return true
  1214. }
  1215. if len(cd.top) == 0 {
  1216. return false
  1217. }
  1218. return s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd)
  1219. }
  1220. func (s *levelsController) fillTables(cd *compactDef) bool {
  1221. cd.lockLevels()
  1222. defer cd.unlockLevels()
  1223. tables := make([]*table.Table, len(cd.thisLevel.tables))
  1224. copy(tables, cd.thisLevel.tables)
  1225. if len(tables) == 0 {
  1226. return false
  1227. }
  1228. // We're doing a maxLevel to maxLevel compaction. Pick tables based on the stale data size.
  1229. if cd.thisLevel.isLastLevel() {
  1230. return s.fillMaxLevelTables(tables, cd)
  1231. }
  1232. // We pick tables, so we compact older tables first. This is similar to
  1233. // kOldestLargestSeqFirst in RocksDB.
  1234. s.sortByHeuristic(tables, cd)
  1235. for _, t := range tables {
  1236. cd.thisSize = t.Size()
  1237. cd.thisRange = getKeyRange(t)
  1238. // If we're already compacting this range, don't do anything.
  1239. if s.cstatus.overlapsWith(cd.thisLevel.level, cd.thisRange) {
  1240. continue
  1241. }
  1242. cd.top = []*table.Table{t}
  1243. left, right := cd.nextLevel.overlappingTables(levelHandlerRLocked{}, cd.thisRange)
  1244. cd.bot = make([]*table.Table, right-left)
  1245. copy(cd.bot, cd.nextLevel.tables[left:right])
  1246. if len(cd.bot) == 0 {
  1247. cd.bot = []*table.Table{}
  1248. cd.nextRange = cd.thisRange
  1249. if !s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd) {
  1250. continue
  1251. }
  1252. return true
  1253. }
  1254. cd.nextRange = getKeyRange(cd.bot...)
  1255. if s.cstatus.overlapsWith(cd.nextLevel.level, cd.nextRange) {
  1256. continue
  1257. }
  1258. if !s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd) {
  1259. continue
  1260. }
  1261. return true
  1262. }
  1263. return false
  1264. }
  1265. func (s *levelsController) runCompactDef(id, l int, cd compactDef) (err error) {
  1266. if len(cd.t.fileSz) == 0 {
  1267. return errors.New("Filesizes cannot be zero. Targets are not set")
  1268. }
  1269. timeStart := time.Now()
  1270. thisLevel := cd.thisLevel
  1271. nextLevel := cd.nextLevel
  1272. y.AssertTrue(len(cd.splits) == 0)
  1273. if thisLevel.level == nextLevel.level {
  1274. // don't do anything for L0 -> L0 and Lmax -> Lmax.
  1275. } else {
  1276. s.addSplits(&cd)
  1277. }
  1278. if len(cd.splits) == 0 {
  1279. cd.splits = append(cd.splits, keyRange{})
  1280. }
  1281. // Table should never be moved directly between levels,
  1282. // always be rewritten to allow discarding invalid versions.
  1283. newTables, decr, err := s.compactBuildTables(l, cd)
  1284. if err != nil {
  1285. return err
  1286. }
  1287. defer func() {
  1288. // Only assign to err, if it's not already nil.
  1289. if decErr := decr(); err == nil {
  1290. err = decErr
  1291. }
  1292. }()
  1293. changeSet := buildChangeSet(&cd, newTables)
  1294. // We write to the manifest _before_ we delete files (and after we created files)
  1295. if err := s.kv.manifest.addChanges(changeSet.Changes); err != nil {
  1296. return err
  1297. }
  1298. getSizes := func(tables []*table.Table) int64 {
  1299. size := int64(0)
  1300. for _, i := range tables {
  1301. size += i.Size()
  1302. }
  1303. return size
  1304. }
  1305. sizeNewTables := int64(0)
  1306. sizeOldTables := int64(0)
  1307. if s.kv.opt.MetricsEnabled {
  1308. sizeNewTables = getSizes(newTables)
  1309. sizeOldTables = getSizes(cd.bot) + getSizes(cd.top)
  1310. y.NumBytesCompactionWrittenAdd(s.kv.opt.MetricsEnabled, nextLevel.strLevel, sizeNewTables)
  1311. }
  1312. // See comment earlier in this function about the ordering of these ops, and the order in which
  1313. // we access levels when reading.
  1314. if err := nextLevel.replaceTables(cd.bot, newTables); err != nil {
  1315. return err
  1316. }
  1317. if err := thisLevel.deleteTables(cd.top); err != nil {
  1318. return err
  1319. }
  1320. // Note: For level 0, while doCompact is running, it is possible that new tables are added.
  1321. // However, the tables are added only to the end, so it is ok to just delete the first table.
  1322. from := append(tablesToString(cd.top), tablesToString(cd.bot)...)
  1323. to := tablesToString(newTables)
  1324. if dur := time.Since(timeStart); dur > 2*time.Second {
  1325. var expensive string
  1326. if dur > time.Second {
  1327. expensive = " [E]"
  1328. }
  1329. s.kv.opt.Infof("[%d]%s LOG Compact %d->%d (%d, %d -> %d tables with %d splits)."+
  1330. " [%s] -> [%s], took %v\n, deleted %d bytes",
  1331. id, expensive, thisLevel.level, nextLevel.level, len(cd.top), len(cd.bot),
  1332. len(newTables), len(cd.splits), strings.Join(from, " "), strings.Join(to, " "),
  1333. dur.Round(time.Millisecond), sizeOldTables-sizeNewTables)
  1334. }
  1335. if cd.thisLevel.level != 0 && len(newTables) > 2*s.kv.opt.LevelSizeMultiplier {
  1336. s.kv.opt.Infof("This Range (numTables: %d)\nLeft:\n%s\nRight:\n%s\n",
  1337. len(cd.top), hex.Dump(cd.thisRange.left), hex.Dump(cd.thisRange.right))
  1338. s.kv.opt.Infof("Next Range (numTables: %d)\nLeft:\n%s\nRight:\n%s\n",
  1339. len(cd.bot), hex.Dump(cd.nextRange.left), hex.Dump(cd.nextRange.right))
  1340. }
  1341. return nil
  1342. }
  1343. func tablesToString(tables []*table.Table) []string {
  1344. var res []string
  1345. for _, t := range tables {
  1346. res = append(res, fmt.Sprintf("%05d", t.ID()))
  1347. }
  1348. res = append(res, ".")
  1349. return res
  1350. }
// errFillTables is returned by doCompact when no tables could be picked for
// the requested compaction, i.e. fillTablesL0 or fillTables reported false.
var errFillTables = stderrors.New("Unable to fill tables")
  1352. // doCompact picks some table on level l and compacts it away to the next level.
  1353. func (s *levelsController) doCompact(id int, p compactionPriority) error {
  1354. l := p.level
  1355. y.AssertTrue(l < s.kv.opt.MaxLevels) // Sanity check.
  1356. if p.t.baseLevel == 0 {
  1357. p.t = s.levelTargets()
  1358. }
  1359. _, span := otrace.StartSpan(context.Background(), "Badger.Compaction")
  1360. defer span.End()
  1361. cd := compactDef{
  1362. compactorId: id,
  1363. span: span,
  1364. p: p,
  1365. t: p.t,
  1366. thisLevel: s.levels[l],
  1367. dropPrefixes: p.dropPrefixes,
  1368. }
  1369. // While picking tables to be compacted, both levels' tables are expected to
  1370. // remain unchanged.
  1371. if l == 0 {
  1372. cd.nextLevel = s.levels[p.t.baseLevel]
  1373. if !s.fillTablesL0(&cd) {
  1374. return errFillTables
  1375. }
  1376. } else {
  1377. cd.nextLevel = cd.thisLevel
  1378. // We're not compacting the last level so pick the next level.
  1379. if !cd.thisLevel.isLastLevel() {
  1380. cd.nextLevel = s.levels[l+1]
  1381. }
  1382. if !s.fillTables(&cd) {
  1383. return errFillTables
  1384. }
  1385. }
  1386. defer s.cstatus.delete(cd) // Remove the ranges from compaction status.
  1387. span.Annotatef(nil, "Compaction: %+v", cd)
  1388. if err := s.runCompactDef(id, l, cd); err != nil {
  1389. // This compaction couldn't be done successfully.
  1390. s.kv.opt.Warningf("[Compactor: %d] LOG Compact FAILED with error: %+v: %+v", id, err, cd)
  1391. return err
  1392. }
  1393. s.kv.opt.Debugf("[Compactor: %d] Compaction for level: %d DONE", id, cd.thisLevel.level)
  1394. return nil
  1395. }
  1396. func (s *levelsController) addLevel0Table(t *table.Table) error {
  1397. // Add table to manifest file only if it is not opened in memory. We don't want to add a table
  1398. // to the manifest file if it exists only in memory.
  1399. if !t.IsInmemory {
  1400. // We update the manifest _before_ the table becomes part of a levelHandler, because at that
  1401. // point it could get used in some compaction. This ensures the manifest file gets updated in
  1402. // the proper order. (That means this update happens before that of some compaction which
  1403. // deletes the table.)
  1404. err := s.kv.manifest.addChanges([]*pb.ManifestChange{
  1405. newCreateChange(t.ID(), 0, t.KeyID(), t.CompressionType()),
  1406. })
  1407. if err != nil {
  1408. return err
  1409. }
  1410. }
  1411. for !s.levels[0].tryAddLevel0Table(t) {
  1412. // Before we unstall, we need to make sure that level 0 is healthy.
  1413. timeStart := time.Now()
  1414. for s.levels[0].numTables() >= s.kv.opt.NumLevelZeroTablesStall {
  1415. time.Sleep(10 * time.Millisecond)
  1416. }
  1417. dur := time.Since(timeStart)
  1418. if dur > time.Second {
  1419. s.kv.opt.Infof("L0 was stalled for %s\n", dur.Round(time.Millisecond))
  1420. }
  1421. s.l0stallsMs.Add(int64(dur.Round(time.Millisecond)))
  1422. }
  1423. return nil
  1424. }
  1425. func (s *levelsController) close() error {
  1426. err := s.cleanupLevels()
  1427. return y.Wrap(err, "levelsController.Close")
  1428. }
  1429. // get searches for a given key in all the levels of the LSM tree. It returns
  1430. // key version <= the expected version (version in key). If not found,
  1431. // it returns an empty y.ValueStruct.
  1432. func (s *levelsController) get(key []byte, maxVs y.ValueStruct, startLevel int) (
  1433. y.ValueStruct, error) {
  1434. if s.kv.IsClosed() {
  1435. return y.ValueStruct{}, ErrDBClosed
  1436. }
  1437. // It's important that we iterate the levels from 0 on upward. The reason is, if we iterated
  1438. // in opposite order, or in parallel (naively calling all the h.RLock() in some order) we could
  1439. // read level L's tables post-compaction and level L+1's tables pre-compaction. (If we do
  1440. // parallelize this, we will need to call the h.RLock() function by increasing order of level
  1441. // number.)
  1442. version := y.ParseTs(key)
  1443. for _, h := range s.levels {
  1444. // Ignore all levels below startLevel. This is useful for GC when L0 is kept in memory.
  1445. if h.level < startLevel {
  1446. continue
  1447. }
  1448. vs, err := h.get(key) // Calls h.RLock() and h.RUnlock().
  1449. if err != nil {
  1450. return y.ValueStruct{}, y.Wrapf(err, "get key: %q", key)
  1451. }
  1452. if vs.Value == nil && vs.Meta == 0 {
  1453. continue
  1454. }
  1455. y.NumBytesReadsLSMAdd(s.kv.opt.MetricsEnabled, int64(len(vs.Value)))
  1456. if vs.Version == version {
  1457. return vs, nil
  1458. }
  1459. if maxVs.Version < vs.Version {
  1460. maxVs = vs
  1461. }
  1462. }
  1463. if len(maxVs.Value) > 0 {
  1464. y.NumGetsWithResultsAdd(s.kv.opt.MetricsEnabled, 1)
  1465. }
  1466. return maxVs, nil
  1467. }
  1468. func appendIteratorsReversed(out []y.Iterator, th []*table.Table, opt int) []y.Iterator {
  1469. for i := len(th) - 1; i >= 0; i-- {
  1470. // This will increment the reference of the table handler.
  1471. out = append(out, th[i].NewIterator(opt))
  1472. }
  1473. return out
  1474. }
  1475. // appendIterators appends iterators to an array of iterators, for merging.
  1476. // Note: This obtains references for the table handlers. Remember to close these iterators.
  1477. func (s *levelsController) appendIterators(
  1478. iters []y.Iterator, opt *IteratorOptions) []y.Iterator {
  1479. // Just like with get, it's important we iterate the levels from 0 on upward, to avoid missing
  1480. // data when there's a compaction.
  1481. for _, level := range s.levels {
  1482. iters = level.appendIterators(iters, opt)
  1483. }
  1484. return iters
  1485. }
  1486. // TableInfo represents the information about a table.
  1487. type TableInfo struct {
  1488. ID uint64
  1489. Level int
  1490. Left []byte
  1491. Right []byte
  1492. KeyCount uint32 // Number of keys in the table
  1493. OnDiskSize uint32
  1494. StaleDataSize uint32
  1495. UncompressedSize uint32
  1496. MaxVersion uint64
  1497. IndexSz int
  1498. BloomFilterSize int
  1499. }
  1500. func (s *levelsController) getTableInfo() (result []TableInfo) {
  1501. for _, l := range s.levels {
  1502. l.RLock()
  1503. for _, t := range l.tables {
  1504. info := TableInfo{
  1505. ID: t.ID(),
  1506. Level: l.level,
  1507. Left: t.Smallest(),
  1508. Right: t.Biggest(),
  1509. KeyCount: t.KeyCount(),
  1510. OnDiskSize: t.OnDiskSize(),
  1511. StaleDataSize: t.StaleDataSize(),
  1512. IndexSz: t.IndexSize(),
  1513. BloomFilterSize: t.BloomFilterSize(),
  1514. UncompressedSize: t.UncompressedSize(),
  1515. MaxVersion: t.MaxVersion(),
  1516. }
  1517. result = append(result, info)
  1518. }
  1519. l.RUnlock()
  1520. }
  1521. sort.Slice(result, func(i, j int) bool {
  1522. if result[i].Level != result[j].Level {
  1523. return result[i].Level < result[j].Level
  1524. }
  1525. return result[i].ID < result[j].ID
  1526. })
  1527. return
  1528. }
  1529. type LevelInfo struct {
  1530. Level int
  1531. NumTables int
  1532. Size int64
  1533. TargetSize int64
  1534. TargetFileSize int64
  1535. IsBaseLevel bool
  1536. Score float64
  1537. Adjusted float64
  1538. StaleDatSize int64
  1539. }
  1540. func (s *levelsController) getLevelInfo() []LevelInfo {
  1541. t := s.levelTargets()
  1542. prios := s.pickCompactLevels(nil)
  1543. result := make([]LevelInfo, len(s.levels))
  1544. for i, l := range s.levels {
  1545. l.RLock()
  1546. result[i].Level = i
  1547. result[i].Size = l.totalSize
  1548. result[i].NumTables = len(l.tables)
  1549. result[i].StaleDatSize = l.totalStaleSize
  1550. l.RUnlock()
  1551. result[i].TargetSize = t.targetSz[i]
  1552. result[i].TargetFileSize = t.fileSz[i]
  1553. result[i].IsBaseLevel = t.baseLevel == i
  1554. }
  1555. for _, p := range prios {
  1556. result[p.level].Score = p.score
  1557. result[p.level].Adjusted = p.adjusted
  1558. }
  1559. return result
  1560. }
  1561. // verifyChecksum verifies checksum for all tables on all levels.
  1562. func (s *levelsController) verifyChecksum() error {
  1563. var tables []*table.Table
  1564. for _, l := range s.levels {
  1565. l.RLock()
  1566. tables = tables[:0]
  1567. for _, t := range l.tables {
  1568. tables = append(tables, t)
  1569. t.IncrRef()
  1570. }
  1571. l.RUnlock()
  1572. for _, t := range tables {
  1573. errChkVerify := t.VerifyChecksum()
  1574. if err := t.DecrRef(); err != nil {
  1575. s.kv.opt.Errorf("unable to decrease reference of table: %s while "+
  1576. "verifying checksum with error: %s", t.Filename(), err)
  1577. }
  1578. if errChkVerify != nil {
  1579. return errChkVerify
  1580. }
  1581. }
  1582. }
  1583. return nil
  1584. }
  1585. // Returns the sorted list of splits for all the levels and tables based
  1586. // on the block offsets.
  1587. func (s *levelsController) keySplits(numPerTable int, prefix []byte) []string {
  1588. splits := make([]string, 0)
  1589. for _, l := range s.levels {
  1590. l.RLock()
  1591. for _, t := range l.tables {
  1592. tableSplits := t.KeySplits(numPerTable, prefix)
  1593. splits = append(splits, tableSplits...)
  1594. }
  1595. l.RUnlock()
  1596. }
  1597. sort.Strings(splits)
  1598. return splits
  1599. }