iterator.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568
  1. /*
  2. * Copyright 2017 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package table
  17. import (
  18. "bytes"
  19. "fmt"
  20. "io"
  21. "sort"
  22. "github.com/dgraph-io/badger/v4/fb"
  23. "github.com/dgraph-io/badger/v4/y"
  24. )
  25. type blockIterator struct {
  26. data []byte
  27. idx int // Idx of the entry inside a block
  28. err error
  29. baseKey []byte
  30. key []byte
  31. val []byte
  32. entryOffsets []uint32
  33. block *Block
  34. tableID uint64
  35. blockID int
  36. // prevOverlap stores the overlap of the previous key with the base key.
  37. // This avoids unnecessary copy of base key when the overlap is same for multiple keys.
  38. prevOverlap uint16
  39. }
  40. func (itr *blockIterator) setBlock(b *Block) {
  41. // Decrement the ref for the old block. If the old block was compressed, we
  42. // might be able to reuse it.
  43. itr.block.decrRef()
  44. itr.block = b
  45. itr.err = nil
  46. itr.idx = 0
  47. itr.baseKey = itr.baseKey[:0]
  48. itr.prevOverlap = 0
  49. itr.key = itr.key[:0]
  50. itr.val = itr.val[:0]
  51. // Drop the index from the block. We don't need it anymore.
  52. itr.data = b.data[:b.entriesIndexStart]
  53. itr.entryOffsets = b.entryOffsets
  54. }
  55. // setIdx sets the iterator to the entry at index i and set it's key and value.
  56. func (itr *blockIterator) setIdx(i int) {
  57. itr.idx = i
  58. if i >= len(itr.entryOffsets) || i < 0 {
  59. itr.err = io.EOF
  60. return
  61. }
  62. itr.err = nil
  63. startOffset := int(itr.entryOffsets[i])
  64. // Set base key.
  65. if len(itr.baseKey) == 0 {
  66. var baseHeader header
  67. baseHeader.Decode(itr.data)
  68. itr.baseKey = itr.data[headerSize : headerSize+baseHeader.diff]
  69. }
  70. var endOffset int
  71. // idx points to the last entry in the block.
  72. if itr.idx+1 == len(itr.entryOffsets) {
  73. endOffset = len(itr.data)
  74. } else {
  75. // idx point to some entry other than the last one in the block.
  76. // EndOffset of the current entry is the start offset of the next entry.
  77. endOffset = int(itr.entryOffsets[itr.idx+1])
  78. }
  79. defer func() {
  80. if r := recover(); r != nil {
  81. var debugBuf bytes.Buffer
  82. fmt.Fprintf(&debugBuf, "==== Recovered====\n")
  83. fmt.Fprintf(&debugBuf, "Table ID: %d\nBlock ID: %d\nEntry Idx: %d\nData len: %d\n"+
  84. "StartOffset: %d\nEndOffset: %d\nEntryOffsets len: %d\nEntryOffsets: %v\n",
  85. itr.tableID, itr.blockID, itr.idx, len(itr.data), startOffset, endOffset,
  86. len(itr.entryOffsets), itr.entryOffsets)
  87. panic(debugBuf.String())
  88. }
  89. }()
  90. entryData := itr.data[startOffset:endOffset]
  91. var h header
  92. h.Decode(entryData)
  93. // Header contains the length of key overlap and difference compared to the base key. If the key
  94. // before this one had the same or better key overlap, we can avoid copying that part into
  95. // itr.key. But, if the overlap was lesser, we could copy over just that portion.
  96. if h.overlap > itr.prevOverlap {
  97. itr.key = append(itr.key[:itr.prevOverlap], itr.baseKey[itr.prevOverlap:h.overlap]...)
  98. }
  99. itr.prevOverlap = h.overlap
  100. valueOff := headerSize + h.diff
  101. diffKey := entryData[headerSize:valueOff]
  102. itr.key = append(itr.key[:h.overlap], diffKey...)
  103. itr.val = entryData[valueOff:]
  104. }
  105. func (itr *blockIterator) Valid() bool {
  106. return itr != nil && itr.err == nil
  107. }
  108. func (itr *blockIterator) Error() error {
  109. return itr.err
  110. }
  111. func (itr *blockIterator) Close() {
  112. itr.block.decrRef()
  113. }
  // Positioning modes ("whence") accepted by the seek helpers:
  //   - origin:  search from the first entry / reset the iterator first;
  //   - current: resume the search from the current position.
  // They are fixed enum values, so they are constants rather than mutable vars.
  const (
  	origin  = 0
  	current = 1
  )
  118. // seek brings us to the first block element that is >= input key.
  119. func (itr *blockIterator) seek(key []byte, whence int) {
  120. itr.err = nil
  121. startIndex := 0 // This tells from which index we should start binary search.
  122. switch whence {
  123. case origin:
  124. // We don't need to do anything. startIndex is already at 0
  125. case current:
  126. startIndex = itr.idx
  127. }
  128. foundEntryIdx := sort.Search(len(itr.entryOffsets), func(idx int) bool {
  129. // If idx is less than start index then just return false.
  130. if idx < startIndex {
  131. return false
  132. }
  133. itr.setIdx(idx)
  134. return y.CompareKeys(itr.key, key) >= 0
  135. })
  136. itr.setIdx(foundEntryIdx)
  137. }
  138. // seekToFirst brings us to the first element.
  139. func (itr *blockIterator) seekToFirst() {
  140. itr.setIdx(0)
  141. }
  142. // seekToLast brings us to the last element.
  143. func (itr *blockIterator) seekToLast() {
  144. itr.setIdx(len(itr.entryOffsets) - 1)
  145. }
  146. func (itr *blockIterator) next() {
  147. itr.setIdx(itr.idx + 1)
  148. }
  149. func (itr *blockIterator) prev() {
  150. itr.setIdx(itr.idx - 1)
  151. }
  152. // Iterator is an iterator for a Table.
  153. type Iterator struct {
  154. t *Table
  155. bpos int
  156. bi blockIterator
  157. err error
  158. // Internally, Iterator is bidirectional. However, we only expose the
  159. // unidirectional functionality for now.
  160. opt int // Valid options are REVERSED and NOCACHE.
  161. }
  162. // NewIterator returns a new iterator of the Table
  163. func (t *Table) NewIterator(opt int) *Iterator {
  164. t.IncrRef() // Important.
  165. ti := &Iterator{t: t, opt: opt}
  166. return ti
  167. }
  168. // Close closes the iterator (and it must be called).
  169. func (itr *Iterator) Close() error {
  170. itr.bi.Close()
  171. return itr.t.DecrRef()
  172. }
  173. func (itr *Iterator) reset() {
  174. itr.bpos = 0
  175. itr.err = nil
  176. }
  177. // Valid follows the y.Iterator interface
  178. func (itr *Iterator) Valid() bool {
  179. return itr.err == nil
  180. }
  181. func (itr *Iterator) useCache() bool {
  182. return itr.opt&NOCACHE == 0
  183. }
  184. func (itr *Iterator) seekToFirst() {
  185. numBlocks := itr.t.offsetsLength()
  186. if numBlocks == 0 {
  187. itr.err = io.EOF
  188. return
  189. }
  190. itr.bpos = 0
  191. block, err := itr.t.block(itr.bpos, itr.useCache())
  192. if err != nil {
  193. itr.err = err
  194. return
  195. }
  196. itr.bi.tableID = itr.t.id
  197. itr.bi.blockID = itr.bpos
  198. itr.bi.setBlock(block)
  199. itr.bi.seekToFirst()
  200. itr.err = itr.bi.Error()
  201. }
  202. func (itr *Iterator) seekToLast() {
  203. numBlocks := itr.t.offsetsLength()
  204. if numBlocks == 0 {
  205. itr.err = io.EOF
  206. return
  207. }
  208. itr.bpos = numBlocks - 1
  209. block, err := itr.t.block(itr.bpos, itr.useCache())
  210. if err != nil {
  211. itr.err = err
  212. return
  213. }
  214. itr.bi.tableID = itr.t.id
  215. itr.bi.blockID = itr.bpos
  216. itr.bi.setBlock(block)
  217. itr.bi.seekToLast()
  218. itr.err = itr.bi.Error()
  219. }
  220. func (itr *Iterator) seekHelper(blockIdx int, key []byte) {
  221. itr.bpos = blockIdx
  222. block, err := itr.t.block(blockIdx, itr.useCache())
  223. if err != nil {
  224. itr.err = err
  225. return
  226. }
  227. itr.bi.tableID = itr.t.id
  228. itr.bi.blockID = itr.bpos
  229. itr.bi.setBlock(block)
  230. itr.bi.seek(key, origin)
  231. itr.err = itr.bi.Error()
  232. }
  233. // seekFrom brings us to a key that is >= input key.
  234. func (itr *Iterator) seekFrom(key []byte, whence int) {
  235. itr.err = nil
  236. switch whence {
  237. case origin:
  238. itr.reset()
  239. case current:
  240. }
  241. var ko fb.BlockOffset
  242. idx := sort.Search(itr.t.offsetsLength(), func(idx int) bool {
  243. // Offsets should never return false since we're iterating within the OffsetsLength.
  244. y.AssertTrue(itr.t.offsets(&ko, idx))
  245. return y.CompareKeys(ko.KeyBytes(), key) > 0
  246. })
  247. if idx == 0 {
  248. // The smallest key in our table is already strictly > key. We can return that.
  249. // This is like a SeekToFirst.
  250. itr.seekHelper(0, key)
  251. return
  252. }
  253. // block[idx].smallest is > key.
  254. // Since idx>0, we know block[idx-1].smallest is <= key.
  255. // There are two cases.
  256. // 1) Everything in block[idx-1] is strictly < key. In this case, we should go to the first
  257. // element of block[idx].
  258. // 2) Some element in block[idx-1] is >= key. We should go to that element.
  259. itr.seekHelper(idx-1, key)
  260. if itr.err == io.EOF {
  261. // Case 1. Need to visit block[idx].
  262. if idx == itr.t.offsetsLength() {
  263. // If idx == len(itr.t.blockIndex), then input key is greater than ANY element of table.
  264. // There's nothing we can do. Valid() should return false as we seek to end of table.
  265. return
  266. }
  267. // Since block[idx].smallest is > key. This is essentially a block[idx].SeekToFirst.
  268. itr.seekHelper(idx, key)
  269. }
  270. // Case 2: No need to do anything. We already did the seek in block[idx-1].
  271. }
  272. // seek will reset iterator and seek to >= key.
  273. func (itr *Iterator) seek(key []byte) {
  274. itr.seekFrom(key, origin)
  275. }
  276. // seekForPrev will reset iterator and seek to <= key.
  277. func (itr *Iterator) seekForPrev(key []byte) {
  278. // TODO: Optimize this. We shouldn't have to take a Prev step.
  279. itr.seekFrom(key, origin)
  280. if !bytes.Equal(itr.Key(), key) {
  281. itr.prev()
  282. }
  283. }
  284. func (itr *Iterator) next() {
  285. itr.err = nil
  286. if itr.bpos >= itr.t.offsetsLength() {
  287. itr.err = io.EOF
  288. return
  289. }
  290. if len(itr.bi.data) == 0 {
  291. block, err := itr.t.block(itr.bpos, itr.useCache())
  292. if err != nil {
  293. itr.err = err
  294. return
  295. }
  296. itr.bi.tableID = itr.t.id
  297. itr.bi.blockID = itr.bpos
  298. itr.bi.setBlock(block)
  299. itr.bi.seekToFirst()
  300. itr.err = itr.bi.Error()
  301. return
  302. }
  303. itr.bi.next()
  304. if !itr.bi.Valid() {
  305. itr.bpos++
  306. itr.bi.data = nil
  307. itr.next()
  308. return
  309. }
  310. }
  311. func (itr *Iterator) prev() {
  312. itr.err = nil
  313. if itr.bpos < 0 {
  314. itr.err = io.EOF
  315. return
  316. }
  317. if len(itr.bi.data) == 0 {
  318. block, err := itr.t.block(itr.bpos, itr.useCache())
  319. if err != nil {
  320. itr.err = err
  321. return
  322. }
  323. itr.bi.tableID = itr.t.id
  324. itr.bi.blockID = itr.bpos
  325. itr.bi.setBlock(block)
  326. itr.bi.seekToLast()
  327. itr.err = itr.bi.Error()
  328. return
  329. }
  330. itr.bi.prev()
  331. if !itr.bi.Valid() {
  332. itr.bpos--
  333. itr.bi.data = nil
  334. itr.prev()
  335. return
  336. }
  337. }
  338. // Key follows the y.Iterator interface.
  339. // Returns the key with timestamp.
  340. func (itr *Iterator) Key() []byte {
  341. return itr.bi.key
  342. }
  343. // Value follows the y.Iterator interface
  344. func (itr *Iterator) Value() (ret y.ValueStruct) {
  345. ret.Decode(itr.bi.val)
  346. return
  347. }
  348. // ValueCopy copies the current value and returns it as decoded
  349. // ValueStruct.
  350. func (itr *Iterator) ValueCopy() (ret y.ValueStruct) {
  351. dst := y.Copy(itr.bi.val)
  352. ret.Decode(dst)
  353. return
  354. }
  355. // Next follows the y.Iterator interface
  356. func (itr *Iterator) Next() {
  357. if itr.opt&REVERSED == 0 {
  358. itr.next()
  359. } else {
  360. itr.prev()
  361. }
  362. }
  363. // Rewind follows the y.Iterator interface
  364. func (itr *Iterator) Rewind() {
  365. if itr.opt&REVERSED == 0 {
  366. itr.seekToFirst()
  367. } else {
  368. itr.seekToLast()
  369. }
  370. }
  371. // Seek follows the y.Iterator interface
  372. func (itr *Iterator) Seek(key []byte) {
  373. if itr.opt&REVERSED == 0 {
  374. itr.seek(key)
  375. } else {
  376. itr.seekForPrev(key)
  377. }
  378. }
  // Option bits for NewIterator and NewConcatIterator; combine them with |.
  var (
  	// REVERSED makes Next/Rewind/Seek walk keys in descending order.
  	REVERSED int = 2
  	// NOCACHE asks for blocks to be loaded without using the block cache.
  	NOCACHE int = 4
  )
  383. // ConcatIterator concatenates the sequences defined by several iterators. (It only works with
  384. // TableIterators, probably just because it's faster to not be so generic.)
  385. type ConcatIterator struct {
  386. idx int // Which iterator is active now.
  387. cur *Iterator
  388. iters []*Iterator // Corresponds to tables.
  389. tables []*Table // Disregarding reversed, this is in ascending order.
  390. options int // Valid options are REVERSED and NOCACHE.
  391. }
  392. // NewConcatIterator creates a new concatenated iterator
  393. func NewConcatIterator(tbls []*Table, opt int) *ConcatIterator {
  394. iters := make([]*Iterator, len(tbls))
  395. for i := 0; i < len(tbls); i++ {
  396. // Increment the reference count. Since, we're not creating the iterator right now.
  397. // Here, We'll hold the reference of the tables, till the lifecycle of the iterator.
  398. tbls[i].IncrRef()
  399. // Save cycles by not initializing the iterators until needed.
  400. // iters[i] = tbls[i].NewIterator(reversed)
  401. }
  402. return &ConcatIterator{
  403. options: opt,
  404. iters: iters,
  405. tables: tbls,
  406. idx: -1, // Not really necessary because s.it.Valid()=false, but good to have.
  407. }
  408. }
  409. func (s *ConcatIterator) setIdx(idx int) {
  410. s.idx = idx
  411. if idx < 0 || idx >= len(s.iters) {
  412. s.cur = nil
  413. return
  414. }
  415. if s.iters[idx] == nil {
  416. s.iters[idx] = s.tables[idx].NewIterator(s.options)
  417. }
  418. s.cur = s.iters[s.idx]
  419. }
  420. // Rewind implements y.Interface
  421. func (s *ConcatIterator) Rewind() {
  422. if len(s.iters) == 0 {
  423. return
  424. }
  425. if s.options&REVERSED == 0 {
  426. s.setIdx(0)
  427. } else {
  428. s.setIdx(len(s.iters) - 1)
  429. }
  430. s.cur.Rewind()
  431. }
  432. // Valid implements y.Interface
  433. func (s *ConcatIterator) Valid() bool {
  434. return s.cur != nil && s.cur.Valid()
  435. }
  436. // Key implements y.Interface
  437. func (s *ConcatIterator) Key() []byte {
  438. return s.cur.Key()
  439. }
  440. // Value implements y.Interface
  441. func (s *ConcatIterator) Value() y.ValueStruct {
  442. return s.cur.Value()
  443. }
  444. // Seek brings us to element >= key if reversed is false. Otherwise, <= key.
  445. func (s *ConcatIterator) Seek(key []byte) {
  446. var idx int
  447. if s.options&REVERSED == 0 {
  448. idx = sort.Search(len(s.tables), func(i int) bool {
  449. return y.CompareKeys(s.tables[i].Biggest(), key) >= 0
  450. })
  451. } else {
  452. n := len(s.tables)
  453. idx = n - 1 - sort.Search(n, func(i int) bool {
  454. return y.CompareKeys(s.tables[n-1-i].Smallest(), key) <= 0
  455. })
  456. }
  457. if idx >= len(s.tables) || idx < 0 {
  458. s.setIdx(-1)
  459. return
  460. }
  461. // For reversed=false, we know s.tables[i-1].Biggest() < key. Thus, the
  462. // previous table cannot possibly contain key.
  463. s.setIdx(idx)
  464. s.cur.Seek(key)
  465. }
  466. // Next advances our concat iterator.
  467. func (s *ConcatIterator) Next() {
  468. s.cur.Next()
  469. if s.cur.Valid() {
  470. // Nothing to do. Just stay with the current table.
  471. return
  472. }
  473. for { // In case there are empty tables.
  474. if s.options&REVERSED == 0 {
  475. s.setIdx(s.idx + 1)
  476. } else {
  477. s.setIdx(s.idx - 1)
  478. }
  479. if s.cur == nil {
  480. // End of list. Valid will become false.
  481. return
  482. }
  483. s.cur.Rewind()
  484. if s.cur.Valid() {
  485. break
  486. }
  487. }
  488. }
  489. // Close implements y.Interface.
  490. func (s *ConcatIterator) Close() error {
  491. for _, t := range s.tables {
  492. // DeReference the tables while closing the iterator.
  493. if err := t.DecrRef(); err != nil {
  494. return err
  495. }
  496. }
  497. for _, it := range s.iters {
  498. if it == nil {
  499. continue
  500. }
  501. if err := it.Close(); err != nil {
  502. return y.Wrap(err, "ConcatIterator")
  503. }
  504. }
  505. return nil
  506. }