level_handler.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package badger
  6. import (
  7. "fmt"
  8. "sort"
  9. "sync"
  10. "github.com/dgraph-io/badger/v4/table"
  11. "github.com/dgraph-io/badger/v4/y"
  12. )
// levelHandler manages the set of tables that make up one level of the LSM
// tree, along with their aggregate on-disk and stale sizes.
type levelHandler struct {
	// Guards tables, totalSize.
	sync.RWMutex

	// For level >= 1, tables are sorted by key ranges, which do not overlap.
	// For level 0, tables are sorted by time.
	// For level 0, newest table are at the back. Compact the oldest one first, which is at the front.
	tables         []*table.Table
	totalSize      int64 // sum of t.Size() over tables; maintained by addSize/subtractSize
	totalStaleSize int64 // sum of t.StaleDataSize() over tables; maintained by addSize/subtractSize

	// The following are initialized once and const.
	level    int
	strLevel string // formatted as "l<level>"; used as a metrics label
	db       *DB
}
  27. func (s *levelHandler) isLastLevel() bool {
  28. return s.level == s.db.opt.MaxLevels-1
  29. }
  30. func (s *levelHandler) getTotalStaleSize() int64 {
  31. s.RLock()
  32. defer s.RUnlock()
  33. return s.totalStaleSize
  34. }
  35. func (s *levelHandler) getTotalSize() int64 {
  36. s.RLock()
  37. defer s.RUnlock()
  38. return s.totalSize
  39. }
  40. // initTables replaces s.tables with given tables. This is done during loading.
  41. func (s *levelHandler) initTables(tables []*table.Table) {
  42. s.Lock()
  43. defer s.Unlock()
  44. s.tables = tables
  45. s.totalSize = 0
  46. s.totalStaleSize = 0
  47. for _, t := range tables {
  48. s.addSize(t)
  49. }
  50. if s.level == 0 {
  51. // Key range will overlap. Just sort by fileID in ascending order
  52. // because newer tables are at the end of level 0.
  53. sort.Slice(s.tables, func(i, j int) bool {
  54. return s.tables[i].ID() < s.tables[j].ID()
  55. })
  56. } else {
  57. // Sort tables by keys.
  58. sort.Slice(s.tables, func(i, j int) bool {
  59. return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0
  60. })
  61. }
  62. }
  63. // deleteTables remove tables idx0, ..., idx1-1.
  64. func (s *levelHandler) deleteTables(toDel []*table.Table) error {
  65. s.Lock() // s.Unlock() below
  66. toDelMap := make(map[uint64]struct{})
  67. for _, t := range toDel {
  68. toDelMap[t.ID()] = struct{}{}
  69. }
  70. // Make a copy as iterators might be keeping a slice of tables.
  71. var newTables []*table.Table
  72. for _, t := range s.tables {
  73. _, found := toDelMap[t.ID()]
  74. if !found {
  75. newTables = append(newTables, t)
  76. continue
  77. }
  78. s.subtractSize(t)
  79. }
  80. s.tables = newTables
  81. s.Unlock() // Unlock s _before_ we DecrRef our tables, which can be slow.
  82. return decrRefs(toDel)
  83. }
  84. // replaceTables will replace tables[left:right] with newTables. Note this EXCLUDES tables[right].
  85. // You must call decr() to delete the old tables _after_ writing the update to the manifest.
  86. func (s *levelHandler) replaceTables(toDel, toAdd []*table.Table) error {
  87. // Need to re-search the range of tables in this level to be replaced as other goroutines might
  88. // be changing it as well. (They can't touch our tables, but if they add/remove other tables,
  89. // the indices get shifted around.)
  90. s.Lock() // We s.Unlock() below.
  91. toDelMap := make(map[uint64]struct{})
  92. for _, t := range toDel {
  93. toDelMap[t.ID()] = struct{}{}
  94. }
  95. var newTables []*table.Table
  96. for _, t := range s.tables {
  97. _, found := toDelMap[t.ID()]
  98. if !found {
  99. newTables = append(newTables, t)
  100. continue
  101. }
  102. s.subtractSize(t)
  103. }
  104. // Increase totalSize first.
  105. for _, t := range toAdd {
  106. s.addSize(t)
  107. t.IncrRef()
  108. newTables = append(newTables, t)
  109. }
  110. // Assign tables.
  111. s.tables = newTables
  112. sort.Slice(s.tables, func(i, j int) bool {
  113. return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0
  114. })
  115. s.Unlock() // s.Unlock before we DecrRef tables -- that can be slow.
  116. return decrRefs(toDel)
  117. }
  118. // addTable adds toAdd table to levelHandler. Normally when we add tables to levelHandler, we sort
  119. // tables based on table.Smallest. This is required for correctness of the system. But in case of
  120. // stream writer this can be avoided. We can just add tables to levelHandler's table list
  121. // and after all addTable calls, we can sort table list(check sortTable method).
  122. // NOTE: levelHandler.sortTables() should be called after call addTable calls are done.
  123. func (s *levelHandler) addTable(t *table.Table) {
  124. s.Lock()
  125. defer s.Unlock()
  126. s.addSize(t) // Increase totalSize first.
  127. t.IncrRef()
  128. s.tables = append(s.tables, t)
  129. }
  130. // sortTables sorts tables of levelHandler based on table.Smallest.
  131. // Normally it should be called after all addTable calls.
  132. func (s *levelHandler) sortTables() {
  133. s.Lock()
  134. defer s.Unlock()
  135. sort.Slice(s.tables, func(i, j int) bool {
  136. return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0
  137. })
  138. }
  139. func decrRefs(tables []*table.Table) error {
  140. for _, table := range tables {
  141. if err := table.DecrRef(); err != nil {
  142. return err
  143. }
  144. }
  145. return nil
  146. }
  147. func newLevelHandler(db *DB, level int) *levelHandler {
  148. return &levelHandler{
  149. level: level,
  150. strLevel: fmt.Sprintf("l%d", level),
  151. db: db,
  152. }
  153. }
  154. // tryAddLevel0Table returns true if ok and no stalling.
  155. func (s *levelHandler) tryAddLevel0Table(t *table.Table) bool {
  156. y.AssertTrue(s.level == 0)
  157. // Need lock as we may be deleting the first table during a level 0 compaction.
  158. s.Lock()
  159. defer s.Unlock()
  160. // Stall (by returning false) if we are above the specified stall setting for L0.
  161. if len(s.tables) >= s.db.opt.NumLevelZeroTablesStall {
  162. return false
  163. }
  164. s.tables = append(s.tables, t)
  165. t.IncrRef()
  166. s.addSize(t)
  167. return true
  168. }
  169. // This should be called while holding the lock on the level.
  170. func (s *levelHandler) addSize(t *table.Table) {
  171. s.totalSize += t.Size()
  172. s.totalStaleSize += int64(t.StaleDataSize())
  173. }
  174. // This should be called while holding the lock on the level.
  175. func (s *levelHandler) subtractSize(t *table.Table) {
  176. s.totalSize -= t.Size()
  177. s.totalStaleSize -= int64(t.StaleDataSize())
  178. }
  179. func (s *levelHandler) numTables() int {
  180. s.RLock()
  181. defer s.RUnlock()
  182. return len(s.tables)
  183. }
  184. func (s *levelHandler) close() error {
  185. s.RLock()
  186. defer s.RUnlock()
  187. var err error
  188. for _, t := range s.tables {
  189. if closeErr := t.Close(-1); closeErr != nil && err == nil {
  190. err = closeErr
  191. }
  192. }
  193. return y.Wrap(err, "levelHandler.close")
  194. }
  195. // getTableForKey acquires a read-lock to access s.tables. It returns a list of tableHandlers.
  196. func (s *levelHandler) getTableForKey(key []byte) ([]*table.Table, func() error) {
  197. s.RLock()
  198. defer s.RUnlock()
  199. if s.level == 0 {
  200. // For level 0, we need to check every table. Remember to make a copy as s.tables may change
  201. // once we exit this function, and we don't want to lock s.tables while seeking in tables.
  202. // CAUTION: Reverse the tables.
  203. out := make([]*table.Table, 0, len(s.tables))
  204. for i := len(s.tables) - 1; i >= 0; i-- {
  205. out = append(out, s.tables[i])
  206. s.tables[i].IncrRef()
  207. }
  208. return out, func() error {
  209. for _, t := range out {
  210. if err := t.DecrRef(); err != nil {
  211. return err
  212. }
  213. }
  214. return nil
  215. }
  216. }
  217. // For level >= 1, we can do a binary search as key range does not overlap.
  218. idx := sort.Search(len(s.tables), func(i int) bool {
  219. return y.CompareKeys(s.tables[i].Biggest(), key) >= 0
  220. })
  221. if idx >= len(s.tables) {
  222. // Given key is strictly > than every element we have.
  223. return nil, func() error { return nil }
  224. }
  225. tbl := s.tables[idx]
  226. tbl.IncrRef()
  227. return []*table.Table{tbl}, tbl.DecrRef
  228. }
  229. // get returns value for a given key or the key after that. If not found, return nil.
  230. func (s *levelHandler) get(key []byte) (y.ValueStruct, error) {
  231. tables, decr := s.getTableForKey(key)
  232. keyNoTs := y.ParseKey(key)
  233. hash := y.Hash(keyNoTs)
  234. var maxVs y.ValueStruct
  235. for _, th := range tables {
  236. if th.DoesNotHave(hash) {
  237. y.NumLSMBloomHitsAdd(s.db.opt.MetricsEnabled, s.strLevel, 1)
  238. continue
  239. }
  240. it := th.NewIterator(0)
  241. defer it.Close()
  242. y.NumLSMGetsAdd(s.db.opt.MetricsEnabled, s.strLevel, 1)
  243. it.Seek(key)
  244. if !it.Valid() {
  245. continue
  246. }
  247. if y.SameKey(key, it.Key()) {
  248. if version := y.ParseTs(it.Key()); maxVs.Version < version {
  249. maxVs = it.ValueCopy()
  250. maxVs.Version = version
  251. }
  252. }
  253. }
  254. return maxVs, decr()
  255. }
  256. // appendIterators appends iterators to an array of iterators, for merging.
  257. // Note: This obtains references for the table handlers. Remember to close these iterators.
  258. func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions) []y.Iterator {
  259. s.RLock()
  260. defer s.RUnlock()
  261. var topt int
  262. if opt.Reverse {
  263. topt = table.REVERSED
  264. }
  265. if s.level == 0 {
  266. // Remember to add in reverse order!
  267. // The newer table at the end of s.tables should be added first as it takes precedence.
  268. // Level 0 tables are not in key sorted order, so we need to consider them one by one.
  269. var out []*table.Table
  270. for _, t := range s.tables {
  271. if opt.pickTable(t) {
  272. out = append(out, t)
  273. }
  274. }
  275. return appendIteratorsReversed(iters, out, topt)
  276. }
  277. tables := opt.pickTables(s.tables)
  278. if len(tables) == 0 {
  279. return iters
  280. }
  281. return append(iters, table.NewConcatIterator(tables, topt))
  282. }
// levelHandlerRLocked is a zero-size marker type: a function that takes it as
// a parameter documents that the caller must already hold the level's read
// lock.
type levelHandlerRLocked struct{}
  284. // overlappingTables returns the tables that intersect with key range. Returns a half-interval.
  285. // This function should already have acquired a read lock, and this is so important the caller must
  286. // pass an empty parameter declaring such.
  287. func (s *levelHandler) overlappingTables(_ levelHandlerRLocked, kr keyRange) (int, int) {
  288. if len(kr.left) == 0 || len(kr.right) == 0 {
  289. return 0, 0
  290. }
  291. left := sort.Search(len(s.tables), func(i int) bool {
  292. return y.CompareKeys(kr.left, s.tables[i].Biggest()) <= 0
  293. })
  294. right := sort.Search(len(s.tables), func(i int) bool {
  295. return y.CompareKeys(kr.right, s.tables[i].Smallest()) < 0
  296. })
  297. return left, right
  298. }