iterator.go 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package badger
  6. import (
  7. "bytes"
  8. "fmt"
  9. "hash/crc32"
  10. "math"
  11. "sort"
  12. "sync"
  13. "time"
  14. "github.com/dgraph-io/badger/v4/table"
  15. "github.com/dgraph-io/badger/v4/y"
  16. "github.com/dgraph-io/ristretto/v2/z"
  17. )
  18. type prefetchStatus uint8
  19. const (
  20. prefetched prefetchStatus = iota + 1
  21. )
// Item is returned during iteration. Both the Key() and Value() outputs are only valid until
// iterator.Next() is called.
//
// Items are recycled by the Iterator (see Iterator.newItem and the waste list), which is why
// their contents must not be retained across Next().
type Item struct {
	key       []byte         // User key; Iterator.fill stores it with the version timestamp stripped.
	vptr      []byte         // Raw value entry: the value itself, or an encoded valuePointer when meta has bitValuePointer.
	val       []byte         // Value loaded by prefetchValue; Iterator.fill resets it to nil.
	version   uint64         // Commit timestamp of this version (see Version).
	expiresAt uint64         // Unix time at which the item expires; 0 means it never expires.
	slice     *y.Slice       // Scratch buffer for value bytes, used by yieldItemValue and prefetchValue.
	next      *Item          // Link used by the iterator's intrusive list type.
	txn       *Txn           // Owning transaction; yieldItemValue reaches the DB through it.
	err       error          // Error recorded by prefetchValue and returned by Value/ValueCopy.
	wg        sync.WaitGroup // Tracks the in-flight prefetch goroutine started by Iterator.fill.
	status    prefetchStatus // Set to prefetched once prefetchValue has run.
	meta      byte           // We need to store meta to know about bitValuePointer.
	userMeta  byte           // Byte set by the user alongside the value (see UserMeta).
}
  39. // String returns a string representation of Item
  40. func (item *Item) String() string {
  41. return fmt.Sprintf("key=%q, version=%d, meta=%x", item.Key(), item.Version(), item.meta)
  42. }
  43. // Key returns the key.
  44. //
  45. // Key is only valid as long as item is valid, or transaction is valid. If you need to use it
  46. // outside its validity, please use KeyCopy.
  47. func (item *Item) Key() []byte {
  48. return item.key
  49. }
  50. // KeyCopy returns a copy of the key of the item, writing it to dst slice.
  51. // If nil is passed, or capacity of dst isn't sufficient, a new slice would be allocated and
  52. // returned.
  53. func (item *Item) KeyCopy(dst []byte) []byte {
  54. return y.SafeCopy(dst, item.key)
  55. }
  56. // Version returns the commit timestamp of the item.
  57. func (item *Item) Version() uint64 {
  58. return item.version
  59. }
  60. // Value retrieves the value of the item from the value log.
  61. //
  62. // This method must be called within a transaction. Calling it outside a
  63. // transaction is considered undefined behavior. If an iterator is being used,
  64. // then Item.Value() is defined in the current iteration only, because items are
  65. // reused.
  66. //
  67. // If you need to use a value outside a transaction, please use Item.ValueCopy
  68. // instead, or copy it yourself. Value might change once discard or commit is called.
  69. // Use ValueCopy if you want to do a Set after Get.
  70. func (item *Item) Value(fn func(val []byte) error) error {
  71. item.wg.Wait()
  72. if item.status == prefetched {
  73. if item.err == nil && fn != nil {
  74. if err := fn(item.val); err != nil {
  75. return err
  76. }
  77. }
  78. return item.err
  79. }
  80. buf, cb, err := item.yieldItemValue()
  81. defer runCallback(cb)
  82. if err != nil {
  83. return err
  84. }
  85. if fn != nil {
  86. return fn(buf)
  87. }
  88. return nil
  89. }
  90. // ValueCopy returns a copy of the value of the item from the value log, writing it to dst slice.
  91. // If nil is passed, or capacity of dst isn't sufficient, a new slice would be allocated and
  92. // returned. Tip: It might make sense to reuse the returned slice as dst argument for the next call.
  93. //
  94. // This function is useful in long running iterate/update transactions to avoid a write deadlock.
  95. // See Github issue: https://github.com/hypermodeinc/badger/issues/315
  96. func (item *Item) ValueCopy(dst []byte) ([]byte, error) {
  97. item.wg.Wait()
  98. if item.status == prefetched {
  99. return y.SafeCopy(dst, item.val), item.err
  100. }
  101. buf, cb, err := item.yieldItemValue()
  102. defer runCallback(cb)
  103. return y.SafeCopy(dst, buf), err
  104. }
  105. func (item *Item) hasValue() bool {
  106. if item.meta == 0 && item.vptr == nil {
  107. // key not found
  108. return false
  109. }
  110. return true
  111. }
  112. // IsDeletedOrExpired returns true if item contains deleted or expired value.
  113. func (item *Item) IsDeletedOrExpired() bool {
  114. return isDeletedOrExpired(item.meta, item.expiresAt)
  115. }
  116. // DiscardEarlierVersions returns whether the item was created with the
  117. // option to discard earlier versions of a key when multiple are available.
  118. func (item *Item) DiscardEarlierVersions() bool {
  119. return item.meta&bitDiscardEarlierVersions > 0
  120. }
// yieldItemValue returns the item's value bytes, plus an optional callback
// that the caller must run (see runCallback) once it has finished using them.
//
// The three cases are:
//   - The item has no value (see hasValue): it returns (nil, nil, nil).
//   - The value is stored inline (bitValuePointer unset): the bytes in vptr are
//     copied into item.slice, and no callback is returned.
//   - Otherwise vptr is decoded as a valuePointer and the value is read with
//     db.vlog.Read, passing item.slice as a buffer. The returned callback is
//     whatever vlog.Read returned.
//
// The returned error is always nil. A value-log read failure is only logged,
// together with every version of the key visible to a fresh read-only
// transaction, and whatever vlog.Read returned is passed through.
// NOTE(review): callers therefore cannot tell a failed read from an empty
// value. Confirm this best-effort behavior is intended.
func (item *Item) yieldItemValue() ([]byte, func(), error) {
	key := item.Key() // No need to copy.
	if !item.hasValue() {
		return nil, nil, nil
	}
	if item.slice == nil {
		item.slice = new(y.Slice)
	}
	if (item.meta & bitValuePointer) == 0 {
		// Inline value: copy it out of vptr so the caller gets item-owned bytes.
		val := item.slice.Resize(len(item.vptr))
		copy(val, item.vptr)
		return val, nil, nil
	}
	var vp valuePointer
	vp.Decode(item.vptr)
	db := item.txn.db
	result, cb, err := db.vlog.Read(vp, item.slice)
	if err != nil {
		db.opt.Errorf("Unable to read: Key: %v, Version : %v, meta: %v, userMeta: %v"+
			" Error: %v", key, item.version, item.meta, item.userMeta, err)
		// Diagnostics: log every version of this key using a separate read-only txn.
		// The deferred Discard/Close below run when yieldItemValue returns, i.e.
		// before the caller runs cb.
		var txn *Txn
		if db.opt.managedTxns {
			txn = db.NewTransactionAt(math.MaxUint64, false)
		} else {
			txn = db.NewTransaction(false)
		}
		defer txn.Discard()
		iopt := DefaultIteratorOptions
		iopt.AllVersions = true
		iopt.InternalAccess = true
		// Presumably disabled so the diagnostic scan does not start prefetch
		// goroutines that read the value log again — TODO confirm.
		iopt.PrefetchValues = false
		it := txn.NewKeyIterator(item.Key(), iopt)
		defer it.Close()
		for it.Rewind(); it.Valid(); it.Next() {
			// Note: item and vp shadow the outer variables inside this loop.
			item := it.Item()
			var vp valuePointer
			if item.meta&bitValuePointer > 0 {
				vp.Decode(item.vptr)
			}
			db.opt.Errorf("Key: %v, Version : %v, meta: %v, userMeta: %v valuePointer: %+v",
				item.Key(), item.version, item.meta, item.userMeta, vp)
		}
	}
	// Don't return error if we cannot read the value. Just log the error.
	return result, cb, nil
}
  167. func runCallback(cb func()) {
  168. if cb != nil {
  169. cb()
  170. }
  171. }
  172. func (item *Item) prefetchValue() {
  173. val, cb, err := item.yieldItemValue()
  174. defer runCallback(cb)
  175. item.err = err
  176. item.status = prefetched
  177. if val == nil {
  178. return
  179. }
  180. buf := item.slice.Resize(len(val))
  181. copy(buf, val)
  182. item.val = buf
  183. }
  184. // EstimatedSize returns the approximate size of the key-value pair.
  185. //
  186. // This can be called while iterating through a store to quickly estimate the
  187. // size of a range of key-value pairs (without fetching the corresponding
  188. // values).
  189. func (item *Item) EstimatedSize() int64 {
  190. if !item.hasValue() {
  191. return 0
  192. }
  193. if (item.meta & bitValuePointer) == 0 {
  194. return int64(len(item.key) + len(item.vptr))
  195. }
  196. var vp valuePointer
  197. vp.Decode(item.vptr)
  198. return int64(vp.Len) // includes key length.
  199. }
  200. // KeySize returns the size of the key.
  201. // Exact size of the key is key + 8 bytes of timestamp
  202. func (item *Item) KeySize() int64 {
  203. return int64(len(item.key))
  204. }
  205. // ValueSize returns the approximate size of the value.
  206. //
  207. // This can be called to quickly estimate the size of a value without fetching
  208. // it.
  209. func (item *Item) ValueSize() int64 {
  210. if !item.hasValue() {
  211. return 0
  212. }
  213. if (item.meta & bitValuePointer) == 0 {
  214. return int64(len(item.vptr))
  215. }
  216. var vp valuePointer
  217. vp.Decode(item.vptr)
  218. klen := int64(len(item.key) + 8) // 8 bytes for timestamp.
  219. // 6 bytes are for the approximate length of the header. Since header is encoded in varint, we
  220. // cannot find the exact length of header without fetching it.
  221. return int64(vp.Len) - klen - 6 - crc32.Size
  222. }
  223. // UserMeta returns the userMeta set by the user. Typically, this byte, optionally set by the user
  224. // is used to interpret the value.
  225. func (item *Item) UserMeta() byte {
  226. return item.userMeta
  227. }
  228. // ExpiresAt returns a Unix time value indicating when the item will be
  229. // considered expired. 0 indicates that the item will never expire.
  230. func (item *Item) ExpiresAt() uint64 {
  231. return item.expiresAt
  232. }
// list is a minimal singly linked FIFO queue of *Item, threaded through
// Item.next. Items are pushed at the tail and popped from the head. The
// Iterator keeps one list of items parsed ahead of the current one (data) and
// one of retired items available for reuse (waste).
// TODO: Switch this to use linked list container in Go.
type list struct {
	head *Item // Next item pop returns; nil when the list is empty.
	tail *Item // Most recently pushed item; nil when the list is empty.
}
  238. func (l *list) push(i *Item) {
  239. i.next = nil
  240. if l.tail == nil {
  241. l.head = i
  242. l.tail = i
  243. return
  244. }
  245. l.tail.next = i
  246. l.tail = i
  247. }
  248. func (l *list) pop() *Item {
  249. if l.head == nil {
  250. return nil
  251. }
  252. i := l.head
  253. if l.head == l.tail {
  254. l.tail = nil
  255. l.head = nil
  256. } else {
  257. l.head = i.next
  258. }
  259. i.next = nil
  260. return i
  261. }
// IteratorOptions is used to set options when iterating over Badger key-value
// stores.
//
// This package provides DefaultIteratorOptions which contains options that
// should work for most applications. Consider using that as a starting point
// before customizing it for your own needs.
type IteratorOptions struct {
	// PrefetchSize is the number of KV pairs to prefetch while iterating.
	// Valid only if PrefetchValues is true. Seek/Rewind parse this many items
	// ahead; when PrefetchValues is false or PrefetchSize <= 1, they parse 2.
	PrefetchSize int
	// PrefetchValues indicates whether we should prefetch values during
	// iteration and store them. Each item's value is then loaded on its own
	// goroutine as soon as the item is parsed.
	PrefetchValues bool
	Reverse        bool // Direction of iteration. False is forward, true is backward.
	AllVersions    bool // Fetch all valid versions of the same key, including deleted/expired ones.
	InternalAccess bool // Used to allow internal access to badger keys.
	prefixIsKey    bool // If set, use the prefix for bloom filter lookup (set by NewKeyIterator).
	// Prefix is used to narrow down the SSTables that the iterator picks up:
	// only tables which could hold this prefix, based on their key range, are
	// picked. Iteration also stops at the first key without the prefix. In
	// reverse iteration the prefix is not checked while scanning (see
	// hasPrefix), only by Valid.
	Prefix  []byte // Only iterate over this given prefix.
	SinceTs uint64 // Only read data that has version > SinceTs.
}
  285. func (opt *IteratorOptions) compareToPrefix(key []byte) int {
  286. // We should compare key without timestamp. For example key - a[TS] might be > "aa" prefix.
  287. key = y.ParseKey(key)
  288. if len(key) > len(opt.Prefix) {
  289. key = key[:len(opt.Prefix)]
  290. }
  291. return bytes.Compare(key, opt.Prefix)
  292. }
  293. func (opt *IteratorOptions) pickTable(t table.TableInterface) bool {
  294. // Ignore this table if its max version is less than the sinceTs.
  295. if t.MaxVersion() < opt.SinceTs {
  296. return false
  297. }
  298. if len(opt.Prefix) == 0 {
  299. return true
  300. }
  301. if opt.compareToPrefix(t.Smallest()) > 0 {
  302. return false
  303. }
  304. if opt.compareToPrefix(t.Biggest()) < 0 {
  305. return false
  306. }
  307. // Bloom filter lookup would only work if opt.Prefix does NOT have the read
  308. // timestamp as part of the key.
  309. if opt.prefixIsKey && t.DoesNotHave(y.Hash(opt.Prefix)) {
  310. return false
  311. }
  312. return true
  313. }
  314. // pickTables picks the necessary table for the iterator. This function also assumes
  315. // that the tables are sorted in the right order.
  316. func (opt *IteratorOptions) pickTables(all []*table.Table) []*table.Table {
  317. filterTables := func(tables []*table.Table) []*table.Table {
  318. if opt.SinceTs > 0 {
  319. tmp := tables[:0]
  320. for _, t := range tables {
  321. if t.MaxVersion() < opt.SinceTs {
  322. continue
  323. }
  324. tmp = append(tmp, t)
  325. }
  326. tables = tmp
  327. }
  328. return tables
  329. }
  330. if len(opt.Prefix) == 0 {
  331. out := make([]*table.Table, len(all))
  332. copy(out, all)
  333. return filterTables(out)
  334. }
  335. sIdx := sort.Search(len(all), func(i int) bool {
  336. // table.Biggest >= opt.prefix
  337. // if opt.Prefix < table.Biggest, then surely it is not in any of the preceding tables.
  338. return opt.compareToPrefix(all[i].Biggest()) >= 0
  339. })
  340. if sIdx == len(all) {
  341. // Not found.
  342. return []*table.Table{}
  343. }
  344. filtered := all[sIdx:]
  345. if !opt.prefixIsKey {
  346. eIdx := sort.Search(len(filtered), func(i int) bool {
  347. return opt.compareToPrefix(filtered[i].Smallest()) > 0
  348. })
  349. out := make([]*table.Table, len(filtered[:eIdx]))
  350. copy(out, filtered[:eIdx])
  351. return filterTables(out)
  352. }
  353. // opt.prefixIsKey == true. This code is optimizing for opt.prefixIsKey part.
  354. var out []*table.Table
  355. hash := y.Hash(opt.Prefix)
  356. for _, t := range filtered {
  357. // When we encounter the first table whose smallest key is higher than opt.Prefix, we can
  358. // stop. This is an IMPORTANT optimization, just considering how often we call
  359. // NewKeyIterator.
  360. if opt.compareToPrefix(t.Smallest()) > 0 {
  361. // if table.Smallest > opt.Prefix, then this and all tables after this can be ignored.
  362. break
  363. }
  364. // opt.Prefix is actually the key. So, we can run bloom filter checks
  365. // as well.
  366. if t.DoesNotHave(hash) {
  367. continue
  368. }
  369. out = append(out, t)
  370. }
  371. return filterTables(out)
  372. }
  373. // DefaultIteratorOptions contains default options when iterating over Badger key-value stores.
  374. var DefaultIteratorOptions = IteratorOptions{
  375. PrefetchValues: true,
  376. PrefetchSize: 100,
  377. Reverse: false,
  378. AllVersions: false,
  379. }
// Iterator helps iterating over the KV pairs in a lexicographically sorted order.
//
// It keeps the current item in item, items already parsed ahead of it in
// data, and retired items for reuse in waste. Call Close when done.
type Iterator struct {
	iitr    y.Iterator      // Merge iterator built by NewIterator over the txn's pending writes (if any), memtables and LSM tables.
	txn     *Txn            // Transaction the iterator was created from.
	readTs  uint64          // Copied from txn.readTs; parseItem skips versions above it.
	opt     IteratorOptions // Options the iterator was created with.
	item    *Item           // Current item returned by Item(); nil once iteration is exhausted.
	data    list            // Items parsed ahead of item, in iteration order.
	waste   list            // Retired items, reused by newItem.
	lastKey []byte          // Used to skip over multiple versions of the same key.
	closed  bool            // Set by Close so repeated calls are no-ops.
	scanned int             // Used to estimate the size of data scanned by iterator.
	// ThreadId is an optional value that can be set to identify which goroutine created
	// the iterator. It can be used, for example, to uniquely identify each of the
	// iterators created by the stream interface
	ThreadId int
	// NOTE(review): Alloc is not referenced by the code in this file; presumably
	// set and used by callers elsewhere (e.g. streaming) — TODO confirm.
	Alloc *z.Allocator
}
  398. // NewIterator returns a new iterator. Depending upon the options, either only keys, or both
  399. // key-value pairs would be fetched. The keys are returned in lexicographically sorted order.
  400. // Using prefetch is recommended if you're doing a long running iteration, for performance.
  401. //
  402. // Multiple Iterators:
  403. // For a read-only txn, multiple iterators can be running simultaneously. However, for a read-write
  404. // txn, iterators have the nuance of being a snapshot of the writes for the transaction at the time
  405. // iterator was created. If writes are performed after an iterator is created, then that iterator
  406. // will not be able to see those writes. Only writes performed before an iterator was created can be
  407. // viewed.
  408. func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
  409. if txn.discarded {
  410. panic(ErrDiscardedTxn)
  411. }
  412. if txn.db.IsClosed() {
  413. panic(ErrDBClosed)
  414. }
  415. y.NumIteratorsCreatedAdd(txn.db.opt.MetricsEnabled, 1)
  416. // Keep track of the number of active iterators.
  417. txn.numIterators.Add(1)
  418. // TODO: If Prefix is set, only pick those memtables which have keys with the prefix.
  419. tables, decr := txn.db.getMemTables()
  420. defer decr()
  421. txn.db.vlog.incrIteratorCount()
  422. var iters []y.Iterator
  423. if itr := txn.newPendingWritesIterator(opt.Reverse); itr != nil {
  424. iters = append(iters, itr)
  425. }
  426. for i := 0; i < len(tables); i++ {
  427. iters = append(iters, tables[i].sl.NewUniIterator(opt.Reverse))
  428. }
  429. iters = txn.db.lc.appendIterators(iters, &opt) // This will increment references.
  430. res := &Iterator{
  431. txn: txn,
  432. iitr: table.NewMergeIterator(iters, opt.Reverse),
  433. opt: opt,
  434. readTs: txn.readTs,
  435. }
  436. return res
  437. }
  438. // NewKeyIterator is just like NewIterator, but allows the user to iterate over all versions of a
  439. // single key. Internally, it sets the Prefix option in provided opt, and uses that prefix to
  440. // additionally run bloom filter lookups before picking tables from the LSM tree.
  441. func (txn *Txn) NewKeyIterator(key []byte, opt IteratorOptions) *Iterator {
  442. if len(opt.Prefix) > 0 {
  443. panic("opt.Prefix should be nil for NewKeyIterator.")
  444. }
  445. opt.Prefix = key // This key must be without the timestamp.
  446. opt.prefixIsKey = true
  447. opt.AllVersions = true
  448. return txn.NewIterator(opt)
  449. }
  450. func (it *Iterator) newItem() *Item {
  451. item := it.waste.pop()
  452. if item == nil {
  453. item = &Item{slice: new(y.Slice), txn: it.txn}
  454. }
  455. return item
  456. }
  457. // Item returns pointer to the current key-value pair.
  458. // This item is only valid until it.Next() gets called.
  459. func (it *Iterator) Item() *Item {
  460. tx := it.txn
  461. tx.addReadKey(it.item.Key())
  462. return it.item
  463. }
  464. // Valid returns false when iteration is done.
  465. func (it *Iterator) Valid() bool {
  466. if it.item == nil {
  467. return false
  468. }
  469. if it.opt.prefixIsKey {
  470. return bytes.Equal(it.item.key, it.opt.Prefix)
  471. }
  472. return bytes.HasPrefix(it.item.key, it.opt.Prefix)
  473. }
  474. // ValidForPrefix returns false when iteration is done
  475. // or when the current key is not prefixed by the specified prefix.
  476. func (it *Iterator) ValidForPrefix(prefix []byte) bool {
  477. return it.Valid() && bytes.HasPrefix(it.item.key, prefix)
  478. }
  479. // Close would close the iterator. It is important to call this when you're done with iteration.
  480. func (it *Iterator) Close() {
  481. if it.closed {
  482. return
  483. }
  484. it.closed = true
  485. if it.iitr == nil {
  486. it.txn.numIterators.Add(-1)
  487. return
  488. }
  489. it.iitr.Close()
  490. // It is important to wait for the fill goroutines to finish. Otherwise, we might leave zombie
  491. // goroutines behind, which are waiting to acquire file read locks after DB has been closed.
  492. waitFor := func(l list) {
  493. item := l.pop()
  494. for item != nil {
  495. item.wg.Wait()
  496. item = l.pop()
  497. }
  498. }
  499. waitFor(it.waste)
  500. waitFor(it.data)
  501. // TODO: We could handle this error.
  502. _ = it.txn.db.vlog.decrIteratorCount()
  503. it.txn.numIterators.Add(-1)
  504. }
  505. // Next would advance the iterator by one. Always check it.Valid() after a Next()
  506. // to ensure you have access to a valid it.Item().
  507. func (it *Iterator) Next() {
  508. if it.iitr == nil {
  509. return
  510. }
  511. // Reuse current item
  512. it.item.wg.Wait() // Just cleaner to wait before pushing to avoid doing ref counting.
  513. it.scanned += len(it.item.key) + len(it.item.val) + len(it.item.vptr) + 2
  514. it.waste.push(it.item)
  515. // Set next item to current
  516. it.item = it.data.pop()
  517. for it.iitr.Valid() && hasPrefix(it) {
  518. if it.parseItem() {
  519. // parseItem calls one extra next.
  520. // This is used to deal with the complexity of reverse iteration.
  521. break
  522. }
  523. }
  524. }
  525. func isDeletedOrExpired(meta byte, expiresAt uint64) bool {
  526. if meta&bitDelete > 0 {
  527. return true
  528. }
  529. if expiresAt == 0 {
  530. return false
  531. }
  532. return expiresAt <= uint64(time.Now().Unix())
  533. }
// parseItem is a complex function because it needs to handle both forward and reverse iteration
// implementation. We store keys such that their versions are sorted in descending order. This makes
// forward iteration efficient, but reverse iteration complicated. This tradeoff is better because
// forward iteration is more common than reverse.
//
// It returns true when it has produced an item. That item becomes it.item if
// it.item is nil; otherwise it is appended to it.data. It returns false when
// the entry under the cursor was skipped. Entries are skipped when they are:
//   - internal badger keys, unless InternalAccess is set;
//   - versions above readTs, or at/below SinceTs when SinceTs > 0;
//   - non-internal keys rejected by db.isBanned;
//   - in forward mode without AllVersions, older versions of lastKey;
//   - without AllVersions, deleted or expired entries.
//
// This function advances the iterator: every call consumes at least one entry.
func (it *Iterator) parseItem() bool {
	mi := it.iitr
	key := mi.Key()
	// setItem makes item current if there is none yet, else queues it behind.
	setItem := func(item *Item) {
		if it.item == nil {
			it.item = item
		} else {
			it.data.push(item)
		}
	}
	isInternalKey := bytes.HasPrefix(key, badgerPrefix)
	// Skip badger keys.
	if !it.opt.InternalAccess && isInternalKey {
		mi.Next()
		return false
	}
	// Skip any versions which are beyond the readTs.
	version := y.ParseTs(key)
	// Ignore everything that is above the readTs and below or at the sinceTs.
	if version > it.readTs || (it.opt.SinceTs > 0 && version <= it.opt.SinceTs) {
		mi.Next()
		return false
	}
	// Skip banned keys only if it does not have badger internal prefix.
	if !isInternalKey && it.txn.db.isBanned(key) != nil {
		mi.Next()
		return false
	}
	if it.opt.AllVersions {
		// Return deleted or expired values also, otherwise user can't figure out
		// whether the key was deleted.
		item := it.newItem()
		it.fill(item)
		setItem(item)
		mi.Next()
		return true
	}
	// If iterating in forward direction, then just checking the last key against current key would
	// be sufficient.
	if !it.opt.Reverse {
		if y.SameKey(it.lastKey, key) {
			mi.Next()
			return false
		}
		// Only track in forward direction.
		// We should update lastKey as soon as we find a different key in our snapshot.
		// Consider keys: a 5, b 7 (del), b 5. When iterating, lastKey = a.
		// Then we see b 7, which is deleted. If we don't store lastKey = b, we'll then return b 5,
		// which is wrong. Therefore, update lastKey here.
		it.lastKey = y.SafeCopy(it.lastKey, mi.Key())
	}
FILL:
	// If deleted, advance and return.
	// In reverse mode this can also be reached via goto with a newer version of
	// the same key. If that newer version is deleted/expired, the older
	// candidate is dropped too, so the key is hidden.
	vs := mi.Value()
	if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
		mi.Next()
		return false
	}
	item := it.newItem()
	it.fill(item)
	// fill item based on current cursor position. All Next calls have returned, so reaching here
	// means no Next was called.
	mi.Next() // Advance but no fill item yet.
	if !it.opt.Reverse || !mi.Valid() { // Forward direction, or invalid.
		setItem(item)
		return true
	}
	// Reverse direction.
	// Versions are stored in descending order, so in reverse the versions of
	// one key arrive in ascending order. If the entry we just advanced to is
	// the same key at a version still visible at readTs, it is a newer
	// candidate: fill from it instead.
	// NOTE(review): the item filled above is then dropped without being pushed
	// to it.waste, and its prefetch goroutine (if any) is never waited on.
	nextTs := y.ParseTs(mi.Key())
	mik := y.ParseKey(mi.Key())
	if nextTs <= it.readTs && bytes.Equal(mik, item.key) {
		// This is a valid potential candidate.
		goto FILL
	}
	// Ignore the next candidate. Return the current one.
	setItem(item)
	return true
}
  619. func (it *Iterator) fill(item *Item) {
  620. vs := it.iitr.Value()
  621. item.meta = vs.Meta
  622. item.userMeta = vs.UserMeta
  623. item.expiresAt = vs.ExpiresAt
  624. item.version = y.ParseTs(it.iitr.Key())
  625. item.key = y.SafeCopy(item.key, y.ParseKey(it.iitr.Key()))
  626. item.vptr = y.SafeCopy(item.vptr, vs.Value)
  627. item.val = nil
  628. if it.opt.PrefetchValues {
  629. item.wg.Add(1)
  630. go func() {
  631. // FIXME we are not handling errors here.
  632. item.prefetchValue()
  633. item.wg.Done()
  634. }()
  635. }
  636. }
  637. func hasPrefix(it *Iterator) bool {
  638. // We shouldn't check prefix in case the iterator is going in reverse. Since in reverse we expect
  639. // people to append items to the end of prefix.
  640. if !it.opt.Reverse && len(it.opt.Prefix) > 0 {
  641. return bytes.HasPrefix(y.ParseKey(it.iitr.Key()), it.opt.Prefix)
  642. }
  643. return true
  644. }
  645. func (it *Iterator) prefetch() {
  646. prefetchSize := 2
  647. if it.opt.PrefetchValues && it.opt.PrefetchSize > 1 {
  648. prefetchSize = it.opt.PrefetchSize
  649. }
  650. i := it.iitr
  651. var count int
  652. it.item = nil
  653. for i.Valid() && hasPrefix(it) {
  654. if !it.parseItem() {
  655. continue
  656. }
  657. count++
  658. if count == prefetchSize {
  659. break
  660. }
  661. }
  662. }
  663. // Seek would seek to the provided key if present. If absent, it would seek to the next
  664. // smallest key greater than the provided key if iterating in the forward direction.
  665. // Behavior would be reversed if iterating backwards.
  666. func (it *Iterator) Seek(key []byte) {
  667. if it.iitr == nil {
  668. return
  669. }
  670. if len(key) > 0 {
  671. it.txn.addReadKey(key)
  672. }
  673. for i := it.data.pop(); i != nil; i = it.data.pop() {
  674. i.wg.Wait()
  675. it.waste.push(i)
  676. }
  677. it.lastKey = it.lastKey[:0]
  678. if len(key) == 0 {
  679. key = it.opt.Prefix
  680. }
  681. if len(key) == 0 {
  682. it.iitr.Rewind()
  683. it.prefetch()
  684. return
  685. }
  686. if !it.opt.Reverse {
  687. key = y.KeyWithTs(key, it.txn.readTs)
  688. } else {
  689. key = y.KeyWithTs(key, 0)
  690. }
  691. it.iitr.Seek(key)
  692. it.prefetch()
  693. }
  694. // Rewind would rewind the iterator cursor all the way to zero-th position, which would be the
  695. // smallest key if iterating forward, and largest if iterating backward. It does not keep track of
  696. // whether the cursor started with a Seek().
  697. func (it *Iterator) Rewind() {
  698. it.Seek(nil)
  699. }