iterator.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package table
  6. import (
  7. "bytes"
  8. "fmt"
  9. "io"
  10. "sort"
  11. "github.com/dgraph-io/badger/v4/fb"
  12. "github.com/dgraph-io/badger/v4/y"
  13. )
  14. type blockIterator struct {
  15. data []byte
  16. idx int // Idx of the entry inside a block
  17. err error
  18. baseKey []byte
  19. key []byte
  20. val []byte
  21. entryOffsets []uint32
  22. block *Block
  23. tableID uint64
  24. blockID int
  25. // prevOverlap stores the overlap of the previous key with the base key.
  26. // This avoids unnecessary copy of base key when the overlap is same for multiple keys.
  27. prevOverlap uint16
  28. }
  29. func (itr *blockIterator) setBlock(b *Block) {
  30. // Decrement the ref for the old block. If the old block was compressed, we
  31. // might be able to reuse it.
  32. itr.block.decrRef()
  33. itr.block = b
  34. itr.err = nil
  35. itr.idx = 0
  36. itr.baseKey = itr.baseKey[:0]
  37. itr.prevOverlap = 0
  38. itr.key = itr.key[:0]
  39. itr.val = itr.val[:0]
  40. // Drop the index from the block. We don't need it anymore.
  41. itr.data = b.data[:b.entriesIndexStart]
  42. itr.entryOffsets = b.entryOffsets
  43. }
  44. // setIdx sets the iterator to the entry at index i and set it's key and value.
  45. func (itr *blockIterator) setIdx(i int) {
  46. itr.idx = i
  47. if i >= len(itr.entryOffsets) || i < 0 {
  48. itr.err = io.EOF
  49. return
  50. }
  51. itr.err = nil
  52. startOffset := int(itr.entryOffsets[i])
  53. // Set base key.
  54. if len(itr.baseKey) == 0 {
  55. var baseHeader header
  56. baseHeader.Decode(itr.data)
  57. itr.baseKey = itr.data[headerSize : headerSize+baseHeader.diff]
  58. }
  59. var endOffset int
  60. // idx points to the last entry in the block.
  61. if itr.idx+1 == len(itr.entryOffsets) {
  62. endOffset = len(itr.data)
  63. } else {
  64. // idx point to some entry other than the last one in the block.
  65. // EndOffset of the current entry is the start offset of the next entry.
  66. endOffset = int(itr.entryOffsets[itr.idx+1])
  67. }
  68. defer func() {
  69. if r := recover(); r != nil {
  70. var debugBuf bytes.Buffer
  71. fmt.Fprintf(&debugBuf, "==== Recovered====\n")
  72. fmt.Fprintf(&debugBuf, "Table ID: %d\nBlock ID: %d\nEntry Idx: %d\nData len: %d\n"+
  73. "StartOffset: %d\nEndOffset: %d\nEntryOffsets len: %d\nEntryOffsets: %v\n",
  74. itr.tableID, itr.blockID, itr.idx, len(itr.data), startOffset, endOffset,
  75. len(itr.entryOffsets), itr.entryOffsets)
  76. panic(debugBuf.String())
  77. }
  78. }()
  79. entryData := itr.data[startOffset:endOffset]
  80. var h header
  81. h.Decode(entryData)
  82. // Header contains the length of key overlap and difference compared to the base key. If the key
  83. // before this one had the same or better key overlap, we can avoid copying that part into
  84. // itr.key. But, if the overlap was lesser, we could copy over just that portion.
  85. if h.overlap > itr.prevOverlap {
  86. itr.key = append(itr.key[:itr.prevOverlap], itr.baseKey[itr.prevOverlap:h.overlap]...)
  87. }
  88. itr.prevOverlap = h.overlap
  89. valueOff := headerSize + h.diff
  90. diffKey := entryData[headerSize:valueOff]
  91. itr.key = append(itr.key[:h.overlap], diffKey...)
  92. itr.val = entryData[valueOff:]
  93. }
  94. func (itr *blockIterator) Valid() bool {
  95. return itr != nil && itr.err == nil
  96. }
  97. func (itr *blockIterator) Error() error {
  98. return itr.err
  99. }
  100. func (itr *blockIterator) Close() {
  101. itr.block.decrRef()
  102. }
  103. var (
  104. origin = 0
  105. current = 1
  106. )
  107. // seek brings us to the first block element that is >= input key.
  108. func (itr *blockIterator) seek(key []byte, whence int) {
  109. itr.err = nil
  110. startIndex := 0 // This tells from which index we should start binary search.
  111. switch whence {
  112. case origin:
  113. // We don't need to do anything. startIndex is already at 0
  114. case current:
  115. startIndex = itr.idx
  116. }
  117. foundEntryIdx := sort.Search(len(itr.entryOffsets), func(idx int) bool {
  118. // If idx is less than start index then just return false.
  119. if idx < startIndex {
  120. return false
  121. }
  122. itr.setIdx(idx)
  123. return y.CompareKeys(itr.key, key) >= 0
  124. })
  125. itr.setIdx(foundEntryIdx)
  126. }
  127. // seekToFirst brings us to the first element.
  128. func (itr *blockIterator) seekToFirst() {
  129. itr.setIdx(0)
  130. }
  131. // seekToLast brings us to the last element.
  132. func (itr *blockIterator) seekToLast() {
  133. itr.setIdx(len(itr.entryOffsets) - 1)
  134. }
  135. func (itr *blockIterator) next() {
  136. itr.setIdx(itr.idx + 1)
  137. }
  138. func (itr *blockIterator) prev() {
  139. itr.setIdx(itr.idx - 1)
  140. }
  141. // Iterator is an iterator for a Table.
  142. type Iterator struct {
  143. t *Table
  144. bpos int
  145. bi blockIterator
  146. err error
  147. // Internally, Iterator is bidirectional. However, we only expose the
  148. // unidirectional functionality for now.
  149. opt int // Valid options are REVERSED and NOCACHE.
  150. }
  151. // NewIterator returns a new iterator of the Table
  152. func (t *Table) NewIterator(opt int) *Iterator {
  153. t.IncrRef() // Important.
  154. ti := &Iterator{t: t, opt: opt}
  155. return ti
  156. }
  157. // Close closes the iterator (and it must be called).
  158. func (itr *Iterator) Close() error {
  159. itr.bi.Close()
  160. return itr.t.DecrRef()
  161. }
  162. func (itr *Iterator) reset() {
  163. itr.bpos = 0
  164. itr.err = nil
  165. }
  166. // Valid follows the y.Iterator interface
  167. func (itr *Iterator) Valid() bool {
  168. return itr.err == nil
  169. }
  170. func (itr *Iterator) useCache() bool {
  171. return itr.opt&NOCACHE == 0
  172. }
  173. func (itr *Iterator) seekToFirst() {
  174. numBlocks := itr.t.offsetsLength()
  175. if numBlocks == 0 {
  176. itr.err = io.EOF
  177. return
  178. }
  179. itr.bpos = 0
  180. block, err := itr.t.block(itr.bpos, itr.useCache())
  181. if err != nil {
  182. itr.err = err
  183. return
  184. }
  185. itr.bi.tableID = itr.t.id
  186. itr.bi.blockID = itr.bpos
  187. itr.bi.setBlock(block)
  188. itr.bi.seekToFirst()
  189. itr.err = itr.bi.Error()
  190. }
  191. func (itr *Iterator) seekToLast() {
  192. numBlocks := itr.t.offsetsLength()
  193. if numBlocks == 0 {
  194. itr.err = io.EOF
  195. return
  196. }
  197. itr.bpos = numBlocks - 1
  198. block, err := itr.t.block(itr.bpos, itr.useCache())
  199. if err != nil {
  200. itr.err = err
  201. return
  202. }
  203. itr.bi.tableID = itr.t.id
  204. itr.bi.blockID = itr.bpos
  205. itr.bi.setBlock(block)
  206. itr.bi.seekToLast()
  207. itr.err = itr.bi.Error()
  208. }
  209. func (itr *Iterator) seekHelper(blockIdx int, key []byte) {
  210. itr.bpos = blockIdx
  211. block, err := itr.t.block(blockIdx, itr.useCache())
  212. if err != nil {
  213. itr.err = err
  214. return
  215. }
  216. itr.bi.tableID = itr.t.id
  217. itr.bi.blockID = itr.bpos
  218. itr.bi.setBlock(block)
  219. itr.bi.seek(key, origin)
  220. itr.err = itr.bi.Error()
  221. }
  222. // seekFrom brings us to a key that is >= input key.
  223. func (itr *Iterator) seekFrom(key []byte, whence int) {
  224. itr.err = nil
  225. switch whence {
  226. case origin:
  227. itr.reset()
  228. case current:
  229. }
  230. var ko fb.BlockOffset
  231. idx := sort.Search(itr.t.offsetsLength(), func(idx int) bool {
  232. // Offsets should never return false since we're iterating within the OffsetsLength.
  233. y.AssertTrue(itr.t.offsets(&ko, idx))
  234. return y.CompareKeys(ko.KeyBytes(), key) > 0
  235. })
  236. if idx == 0 {
  237. // The smallest key in our table is already strictly > key. We can return that.
  238. // This is like a SeekToFirst.
  239. itr.seekHelper(0, key)
  240. return
  241. }
  242. // block[idx].smallest is > key.
  243. // Since idx>0, we know block[idx-1].smallest is <= key.
  244. // There are two cases.
  245. // 1) Everything in block[idx-1] is strictly < key. In this case, we should go to the first
  246. // element of block[idx].
  247. // 2) Some element in block[idx-1] is >= key. We should go to that element.
  248. itr.seekHelper(idx-1, key)
  249. if itr.err == io.EOF {
  250. // Case 1. Need to visit block[idx].
  251. if idx == itr.t.offsetsLength() {
  252. // If idx == len(itr.t.blockIndex), then input key is greater than ANY element of table.
  253. // There's nothing we can do. Valid() should return false as we seek to end of table.
  254. return
  255. }
  256. // Since block[idx].smallest is > key. This is essentially a block[idx].SeekToFirst.
  257. itr.seekHelper(idx, key)
  258. }
  259. // Case 2: No need to do anything. We already did the seek in block[idx-1].
  260. }
  261. // seek will reset iterator and seek to >= key.
  262. func (itr *Iterator) seek(key []byte) {
  263. itr.seekFrom(key, origin)
  264. }
  265. // seekForPrev will reset iterator and seek to <= key.
  266. func (itr *Iterator) seekForPrev(key []byte) {
  267. // TODO: Optimize this. We shouldn't have to take a Prev step.
  268. itr.seekFrom(key, origin)
  269. if !bytes.Equal(itr.Key(), key) {
  270. itr.prev()
  271. }
  272. }
  273. func (itr *Iterator) next() {
  274. itr.err = nil
  275. if itr.bpos >= itr.t.offsetsLength() {
  276. itr.err = io.EOF
  277. return
  278. }
  279. if len(itr.bi.data) == 0 {
  280. block, err := itr.t.block(itr.bpos, itr.useCache())
  281. if err != nil {
  282. itr.err = err
  283. return
  284. }
  285. itr.bi.tableID = itr.t.id
  286. itr.bi.blockID = itr.bpos
  287. itr.bi.setBlock(block)
  288. itr.bi.seekToFirst()
  289. itr.err = itr.bi.Error()
  290. return
  291. }
  292. itr.bi.next()
  293. if !itr.bi.Valid() {
  294. itr.bpos++
  295. itr.bi.data = nil
  296. itr.next()
  297. return
  298. }
  299. }
  300. func (itr *Iterator) prev() {
  301. itr.err = nil
  302. if itr.bpos < 0 {
  303. itr.err = io.EOF
  304. return
  305. }
  306. if len(itr.bi.data) == 0 {
  307. block, err := itr.t.block(itr.bpos, itr.useCache())
  308. if err != nil {
  309. itr.err = err
  310. return
  311. }
  312. itr.bi.tableID = itr.t.id
  313. itr.bi.blockID = itr.bpos
  314. itr.bi.setBlock(block)
  315. itr.bi.seekToLast()
  316. itr.err = itr.bi.Error()
  317. return
  318. }
  319. itr.bi.prev()
  320. if !itr.bi.Valid() {
  321. itr.bpos--
  322. itr.bi.data = nil
  323. itr.prev()
  324. return
  325. }
  326. }
  327. // Key follows the y.Iterator interface.
  328. // Returns the key with timestamp.
  329. func (itr *Iterator) Key() []byte {
  330. return itr.bi.key
  331. }
  332. // Value follows the y.Iterator interface
  333. func (itr *Iterator) Value() (ret y.ValueStruct) {
  334. ret.Decode(itr.bi.val)
  335. return
  336. }
  337. // ValueCopy copies the current value and returns it as decoded
  338. // ValueStruct.
  339. func (itr *Iterator) ValueCopy() (ret y.ValueStruct) {
  340. dst := y.Copy(itr.bi.val)
  341. ret.Decode(dst)
  342. return
  343. }
  344. // Next follows the y.Iterator interface
  345. func (itr *Iterator) Next() {
  346. if itr.opt&REVERSED == 0 {
  347. itr.next()
  348. } else {
  349. itr.prev()
  350. }
  351. }
  352. // Rewind follows the y.Iterator interface
  353. func (itr *Iterator) Rewind() {
  354. if itr.opt&REVERSED == 0 {
  355. itr.seekToFirst()
  356. } else {
  357. itr.seekToLast()
  358. }
  359. }
  360. // Seek follows the y.Iterator interface
  361. func (itr *Iterator) Seek(key []byte) {
  362. if itr.opt&REVERSED == 0 {
  363. itr.seek(key)
  364. } else {
  365. itr.seekForPrev(key)
  366. }
  367. }
  368. var (
  369. REVERSED int = 2
  370. NOCACHE int = 4
  371. )
  372. // ConcatIterator concatenates the sequences defined by several iterators. (It only works with
  373. // TableIterators, probably just because it's faster to not be so generic.)
  374. type ConcatIterator struct {
  375. idx int // Which iterator is active now.
  376. cur *Iterator
  377. iters []*Iterator // Corresponds to tables.
  378. tables []*Table // Disregarding reversed, this is in ascending order.
  379. options int // Valid options are REVERSED and NOCACHE.
  380. }
  381. // NewConcatIterator creates a new concatenated iterator
  382. func NewConcatIterator(tbls []*Table, opt int) *ConcatIterator {
  383. iters := make([]*Iterator, len(tbls))
  384. for i := 0; i < len(tbls); i++ {
  385. // Increment the reference count. Since, we're not creating the iterator right now.
  386. // Here, We'll hold the reference of the tables, till the lifecycle of the iterator.
  387. tbls[i].IncrRef()
  388. // Save cycles by not initializing the iterators until needed.
  389. // iters[i] = tbls[i].NewIterator(reversed)
  390. }
  391. return &ConcatIterator{
  392. options: opt,
  393. iters: iters,
  394. tables: tbls,
  395. idx: -1, // Not really necessary because s.it.Valid()=false, but good to have.
  396. }
  397. }
  398. func (s *ConcatIterator) setIdx(idx int) {
  399. s.idx = idx
  400. if idx < 0 || idx >= len(s.iters) {
  401. s.cur = nil
  402. return
  403. }
  404. if s.iters[idx] == nil {
  405. s.iters[idx] = s.tables[idx].NewIterator(s.options)
  406. }
  407. s.cur = s.iters[s.idx]
  408. }
  409. // Rewind implements y.Interface
  410. func (s *ConcatIterator) Rewind() {
  411. if len(s.iters) == 0 {
  412. return
  413. }
  414. if s.options&REVERSED == 0 {
  415. s.setIdx(0)
  416. } else {
  417. s.setIdx(len(s.iters) - 1)
  418. }
  419. s.cur.Rewind()
  420. }
  421. // Valid implements y.Interface
  422. func (s *ConcatIterator) Valid() bool {
  423. return s.cur != nil && s.cur.Valid()
  424. }
  425. // Key implements y.Interface
  426. func (s *ConcatIterator) Key() []byte {
  427. return s.cur.Key()
  428. }
  429. // Value implements y.Interface
  430. func (s *ConcatIterator) Value() y.ValueStruct {
  431. return s.cur.Value()
  432. }
  433. // Seek brings us to element >= key if reversed is false. Otherwise, <= key.
  434. func (s *ConcatIterator) Seek(key []byte) {
  435. var idx int
  436. if s.options&REVERSED == 0 {
  437. idx = sort.Search(len(s.tables), func(i int) bool {
  438. return y.CompareKeys(s.tables[i].Biggest(), key) >= 0
  439. })
  440. } else {
  441. n := len(s.tables)
  442. idx = n - 1 - sort.Search(n, func(i int) bool {
  443. return y.CompareKeys(s.tables[n-1-i].Smallest(), key) <= 0
  444. })
  445. }
  446. if idx >= len(s.tables) || idx < 0 {
  447. s.setIdx(-1)
  448. return
  449. }
  450. // For reversed=false, we know s.tables[i-1].Biggest() < key. Thus, the
  451. // previous table cannot possibly contain key.
  452. s.setIdx(idx)
  453. s.cur.Seek(key)
  454. }
  455. // Next advances our concat iterator.
  456. func (s *ConcatIterator) Next() {
  457. s.cur.Next()
  458. if s.cur.Valid() {
  459. // Nothing to do. Just stay with the current table.
  460. return
  461. }
  462. for { // In case there are empty tables.
  463. if s.options&REVERSED == 0 {
  464. s.setIdx(s.idx + 1)
  465. } else {
  466. s.setIdx(s.idx - 1)
  467. }
  468. if s.cur == nil {
  469. // End of list. Valid will become false.
  470. return
  471. }
  472. s.cur.Rewind()
  473. if s.cur.Valid() {
  474. break
  475. }
  476. }
  477. }
  478. // Close implements y.Interface.
  479. func (s *ConcatIterator) Close() error {
  480. for _, t := range s.tables {
  481. // DeReference the tables while closing the iterator.
  482. if err := t.DecrRef(); err != nil {
  483. return err
  484. }
  485. }
  486. for _, it := range s.iters {
  487. if it == nil {
  488. continue
  489. }
  490. if err := it.Close(); err != nil {
  491. return y.Wrap(err, "ConcatIterator")
  492. }
  493. }
  494. return nil
  495. }