level_handler.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. /*
  2. * Copyright 2017 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package badger
  17. import (
  18. "fmt"
  19. "sort"
  20. "sync"
  21. "github.com/dgraph-io/badger/v4/table"
  22. "github.com/dgraph-io/badger/v4/y"
  23. )
  24. type levelHandler struct {
  25. // Guards tables, totalSize.
  26. sync.RWMutex
  27. // For level >= 1, tables are sorted by key ranges, which do not overlap.
  28. // For level 0, tables are sorted by time.
  29. // For level 0, newest table are at the back. Compact the oldest one first, which is at the front.
  30. tables []*table.Table
  31. totalSize int64
  32. totalStaleSize int64
  33. // The following are initialized once and const.
  34. level int
  35. strLevel string
  36. db *DB
  37. }
  38. func (s *levelHandler) isLastLevel() bool {
  39. return s.level == s.db.opt.MaxLevels-1
  40. }
  41. func (s *levelHandler) getTotalStaleSize() int64 {
  42. s.RLock()
  43. defer s.RUnlock()
  44. return s.totalStaleSize
  45. }
  46. func (s *levelHandler) getTotalSize() int64 {
  47. s.RLock()
  48. defer s.RUnlock()
  49. return s.totalSize
  50. }
  51. // initTables replaces s.tables with given tables. This is done during loading.
  52. func (s *levelHandler) initTables(tables []*table.Table) {
  53. s.Lock()
  54. defer s.Unlock()
  55. s.tables = tables
  56. s.totalSize = 0
  57. s.totalStaleSize = 0
  58. for _, t := range tables {
  59. s.addSize(t)
  60. }
  61. if s.level == 0 {
  62. // Key range will overlap. Just sort by fileID in ascending order
  63. // because newer tables are at the end of level 0.
  64. sort.Slice(s.tables, func(i, j int) bool {
  65. return s.tables[i].ID() < s.tables[j].ID()
  66. })
  67. } else {
  68. // Sort tables by keys.
  69. sort.Slice(s.tables, func(i, j int) bool {
  70. return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0
  71. })
  72. }
  73. }
  74. // deleteTables remove tables idx0, ..., idx1-1.
  75. func (s *levelHandler) deleteTables(toDel []*table.Table) error {
  76. s.Lock() // s.Unlock() below
  77. toDelMap := make(map[uint64]struct{})
  78. for _, t := range toDel {
  79. toDelMap[t.ID()] = struct{}{}
  80. }
  81. // Make a copy as iterators might be keeping a slice of tables.
  82. var newTables []*table.Table
  83. for _, t := range s.tables {
  84. _, found := toDelMap[t.ID()]
  85. if !found {
  86. newTables = append(newTables, t)
  87. continue
  88. }
  89. s.subtractSize(t)
  90. }
  91. s.tables = newTables
  92. s.Unlock() // Unlock s _before_ we DecrRef our tables, which can be slow.
  93. return decrRefs(toDel)
  94. }
  95. // replaceTables will replace tables[left:right] with newTables. Note this EXCLUDES tables[right].
  96. // You must call decr() to delete the old tables _after_ writing the update to the manifest.
  97. func (s *levelHandler) replaceTables(toDel, toAdd []*table.Table) error {
  98. // Need to re-search the range of tables in this level to be replaced as other goroutines might
  99. // be changing it as well. (They can't touch our tables, but if they add/remove other tables,
  100. // the indices get shifted around.)
  101. s.Lock() // We s.Unlock() below.
  102. toDelMap := make(map[uint64]struct{})
  103. for _, t := range toDel {
  104. toDelMap[t.ID()] = struct{}{}
  105. }
  106. var newTables []*table.Table
  107. for _, t := range s.tables {
  108. _, found := toDelMap[t.ID()]
  109. if !found {
  110. newTables = append(newTables, t)
  111. continue
  112. }
  113. s.subtractSize(t)
  114. }
  115. // Increase totalSize first.
  116. for _, t := range toAdd {
  117. s.addSize(t)
  118. t.IncrRef()
  119. newTables = append(newTables, t)
  120. }
  121. // Assign tables.
  122. s.tables = newTables
  123. sort.Slice(s.tables, func(i, j int) bool {
  124. return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0
  125. })
  126. s.Unlock() // s.Unlock before we DecrRef tables -- that can be slow.
  127. return decrRefs(toDel)
  128. }
  129. // addTable adds toAdd table to levelHandler. Normally when we add tables to levelHandler, we sort
  130. // tables based on table.Smallest. This is required for correctness of the system. But in case of
  131. // stream writer this can be avoided. We can just add tables to levelHandler's table list
  132. // and after all addTable calls, we can sort table list(check sortTable method).
  133. // NOTE: levelHandler.sortTables() should be called after call addTable calls are done.
  134. func (s *levelHandler) addTable(t *table.Table) {
  135. s.Lock()
  136. defer s.Unlock()
  137. s.addSize(t) // Increase totalSize first.
  138. t.IncrRef()
  139. s.tables = append(s.tables, t)
  140. }
  141. // sortTables sorts tables of levelHandler based on table.Smallest.
  142. // Normally it should be called after all addTable calls.
  143. func (s *levelHandler) sortTables() {
  144. s.Lock()
  145. defer s.Unlock()
  146. sort.Slice(s.tables, func(i, j int) bool {
  147. return y.CompareKeys(s.tables[i].Smallest(), s.tables[j].Smallest()) < 0
  148. })
  149. }
  150. func decrRefs(tables []*table.Table) error {
  151. for _, table := range tables {
  152. if err := table.DecrRef(); err != nil {
  153. return err
  154. }
  155. }
  156. return nil
  157. }
  158. func newLevelHandler(db *DB, level int) *levelHandler {
  159. return &levelHandler{
  160. level: level,
  161. strLevel: fmt.Sprintf("l%d", level),
  162. db: db,
  163. }
  164. }
  165. // tryAddLevel0Table returns true if ok and no stalling.
  166. func (s *levelHandler) tryAddLevel0Table(t *table.Table) bool {
  167. y.AssertTrue(s.level == 0)
  168. // Need lock as we may be deleting the first table during a level 0 compaction.
  169. s.Lock()
  170. defer s.Unlock()
  171. // Stall (by returning false) if we are above the specified stall setting for L0.
  172. if len(s.tables) >= s.db.opt.NumLevelZeroTablesStall {
  173. return false
  174. }
  175. s.tables = append(s.tables, t)
  176. t.IncrRef()
  177. s.addSize(t)
  178. return true
  179. }
  180. // This should be called while holding the lock on the level.
  181. func (s *levelHandler) addSize(t *table.Table) {
  182. s.totalSize += t.Size()
  183. s.totalStaleSize += int64(t.StaleDataSize())
  184. }
  185. // This should be called while holding the lock on the level.
  186. func (s *levelHandler) subtractSize(t *table.Table) {
  187. s.totalSize -= t.Size()
  188. s.totalStaleSize -= int64(t.StaleDataSize())
  189. }
  190. func (s *levelHandler) numTables() int {
  191. s.RLock()
  192. defer s.RUnlock()
  193. return len(s.tables)
  194. }
  195. func (s *levelHandler) close() error {
  196. s.RLock()
  197. defer s.RUnlock()
  198. var err error
  199. for _, t := range s.tables {
  200. if closeErr := t.Close(-1); closeErr != nil && err == nil {
  201. err = closeErr
  202. }
  203. }
  204. return y.Wrap(err, "levelHandler.close")
  205. }
  206. // getTableForKey acquires a read-lock to access s.tables. It returns a list of tableHandlers.
  207. func (s *levelHandler) getTableForKey(key []byte) ([]*table.Table, func() error) {
  208. s.RLock()
  209. defer s.RUnlock()
  210. if s.level == 0 {
  211. // For level 0, we need to check every table. Remember to make a copy as s.tables may change
  212. // once we exit this function, and we don't want to lock s.tables while seeking in tables.
  213. // CAUTION: Reverse the tables.
  214. out := make([]*table.Table, 0, len(s.tables))
  215. for i := len(s.tables) - 1; i >= 0; i-- {
  216. out = append(out, s.tables[i])
  217. s.tables[i].IncrRef()
  218. }
  219. return out, func() error {
  220. for _, t := range out {
  221. if err := t.DecrRef(); err != nil {
  222. return err
  223. }
  224. }
  225. return nil
  226. }
  227. }
  228. // For level >= 1, we can do a binary search as key range does not overlap.
  229. idx := sort.Search(len(s.tables), func(i int) bool {
  230. return y.CompareKeys(s.tables[i].Biggest(), key) >= 0
  231. })
  232. if idx >= len(s.tables) {
  233. // Given key is strictly > than every element we have.
  234. return nil, func() error { return nil }
  235. }
  236. tbl := s.tables[idx]
  237. tbl.IncrRef()
  238. return []*table.Table{tbl}, tbl.DecrRef
  239. }
  240. // get returns value for a given key or the key after that. If not found, return nil.
  241. func (s *levelHandler) get(key []byte) (y.ValueStruct, error) {
  242. tables, decr := s.getTableForKey(key)
  243. keyNoTs := y.ParseKey(key)
  244. hash := y.Hash(keyNoTs)
  245. var maxVs y.ValueStruct
  246. for _, th := range tables {
  247. if th.DoesNotHave(hash) {
  248. y.NumLSMBloomHitsAdd(s.db.opt.MetricsEnabled, s.strLevel, 1)
  249. continue
  250. }
  251. it := th.NewIterator(0)
  252. defer it.Close()
  253. y.NumLSMGetsAdd(s.db.opt.MetricsEnabled, s.strLevel, 1)
  254. it.Seek(key)
  255. if !it.Valid() {
  256. continue
  257. }
  258. if y.SameKey(key, it.Key()) {
  259. if version := y.ParseTs(it.Key()); maxVs.Version < version {
  260. maxVs = it.ValueCopy()
  261. maxVs.Version = version
  262. }
  263. }
  264. }
  265. return maxVs, decr()
  266. }
  267. // appendIterators appends iterators to an array of iterators, for merging.
  268. // Note: This obtains references for the table handlers. Remember to close these iterators.
  269. func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions) []y.Iterator {
  270. s.RLock()
  271. defer s.RUnlock()
  272. var topt int
  273. if opt.Reverse {
  274. topt = table.REVERSED
  275. }
  276. if s.level == 0 {
  277. // Remember to add in reverse order!
  278. // The newer table at the end of s.tables should be added first as it takes precedence.
  279. // Level 0 tables are not in key sorted order, so we need to consider them one by one.
  280. var out []*table.Table
  281. for _, t := range s.tables {
  282. if opt.pickTable(t) {
  283. out = append(out, t)
  284. }
  285. }
  286. return appendIteratorsReversed(iters, out, topt)
  287. }
  288. tables := opt.pickTables(s.tables)
  289. if len(tables) == 0 {
  290. return iters
  291. }
  292. return append(iters, table.NewConcatIterator(tables, topt))
  293. }
  294. type levelHandlerRLocked struct{}
  295. // overlappingTables returns the tables that intersect with key range. Returns a half-interval.
  296. // This function should already have acquired a read lock, and this is so important the caller must
  297. // pass an empty parameter declaring such.
  298. func (s *levelHandler) overlappingTables(_ levelHandlerRLocked, kr keyRange) (int, int) {
  299. if len(kr.left) == 0 || len(kr.right) == 0 {
  300. return 0, 0
  301. }
  302. left := sort.Search(len(s.tables), func(i int) bool {
  303. return y.CompareKeys(kr.left, s.tables[i].Biggest()) <= 0
  304. })
  305. right := sort.Search(len(s.tables), func(i int) bool {
  306. return y.CompareKeys(kr.right, s.tables[i].Smallest()) < 0
  307. })
  308. return left, right
  309. }