iterator.go 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796
  1. /*
  2. * Copyright 2017 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package badger
  17. import (
  18. "bytes"
  19. "fmt"
  20. "hash/crc32"
  21. "math"
  22. "sort"
  23. "sync"
  24. "time"
  25. "github.com/dgraph-io/badger/v4/table"
  26. "github.com/dgraph-io/badger/v4/y"
  27. "github.com/dgraph-io/ristretto/v2/z"
  28. )
  29. type prefetchStatus uint8
  30. const (
  31. prefetched prefetchStatus = iota + 1
  32. )
// Item is returned during iteration. Both the Key() and Value() output is only valid until
// iterator.Next() is called.
type Item struct {
	// key is the user key; Iterator.fill stores it with the timestamp stripped (y.ParseKey).
	key []byte
	// vptr holds either the inline value bytes or, when meta has bitValuePointer set,
	// an encoded valuePointer into the value log (see yieldItemValue).
	vptr []byte
	// val is the value copied in by prefetchValue; meaningful only once status == prefetched.
	val []byte
	// version is the commit timestamp of this entry.
	version uint64
	// expiresAt is a Unix time in seconds; 0 means the entry never expires.
	expiresAt uint64
	// slice is a reusable scratch buffer used by yieldItemValue and prefetchValue.
	slice *y.Slice
	// next links the item into an Iterator's data or waste list.
	next *Item
	txn  *Txn
	// err is the error recorded by prefetchValue.
	err error
	// wg tracks the prefetch goroutine started by Iterator.fill, if any.
	wg sync.WaitGroup
	// status reports whether val and err were populated by a prefetch.
	status prefetchStatus
	meta   byte // We need to store meta to know about bitValuePointer.
	// userMeta is the user-supplied meta byte, exposed via UserMeta.
	userMeta byte
}
  50. // String returns a string representation of Item
  51. func (item *Item) String() string {
  52. return fmt.Sprintf("key=%q, version=%d, meta=%x", item.Key(), item.Version(), item.meta)
  53. }
  54. // Key returns the key.
  55. //
  56. // Key is only valid as long as item is valid, or transaction is valid. If you need to use it
  57. // outside its validity, please use KeyCopy.
  58. func (item *Item) Key() []byte {
  59. return item.key
  60. }
  61. // KeyCopy returns a copy of the key of the item, writing it to dst slice.
  62. // If nil is passed, or capacity of dst isn't sufficient, a new slice would be allocated and
  63. // returned.
  64. func (item *Item) KeyCopy(dst []byte) []byte {
  65. return y.SafeCopy(dst, item.key)
  66. }
  67. // Version returns the commit timestamp of the item.
  68. func (item *Item) Version() uint64 {
  69. return item.version
  70. }
  71. // Value retrieves the value of the item from the value log.
  72. //
  73. // This method must be called within a transaction. Calling it outside a
  74. // transaction is considered undefined behavior. If an iterator is being used,
  75. // then Item.Value() is defined in the current iteration only, because items are
  76. // reused.
  77. //
  78. // If you need to use a value outside a transaction, please use Item.ValueCopy
  79. // instead, or copy it yourself. Value might change once discard or commit is called.
  80. // Use ValueCopy if you want to do a Set after Get.
  81. func (item *Item) Value(fn func(val []byte) error) error {
  82. item.wg.Wait()
  83. if item.status == prefetched {
  84. if item.err == nil && fn != nil {
  85. if err := fn(item.val); err != nil {
  86. return err
  87. }
  88. }
  89. return item.err
  90. }
  91. buf, cb, err := item.yieldItemValue()
  92. defer runCallback(cb)
  93. if err != nil {
  94. return err
  95. }
  96. if fn != nil {
  97. return fn(buf)
  98. }
  99. return nil
  100. }
  101. // ValueCopy returns a copy of the value of the item from the value log, writing it to dst slice.
  102. // If nil is passed, or capacity of dst isn't sufficient, a new slice would be allocated and
  103. // returned. Tip: It might make sense to reuse the returned slice as dst argument for the next call.
  104. //
  105. // This function is useful in long running iterate/update transactions to avoid a write deadlock.
  106. // See Github issue: https://github.com/dgraph-io/badger/issues/315
  107. func (item *Item) ValueCopy(dst []byte) ([]byte, error) {
  108. item.wg.Wait()
  109. if item.status == prefetched {
  110. return y.SafeCopy(dst, item.val), item.err
  111. }
  112. buf, cb, err := item.yieldItemValue()
  113. defer runCallback(cb)
  114. return y.SafeCopy(dst, buf), err
  115. }
  116. func (item *Item) hasValue() bool {
  117. if item.meta == 0 && item.vptr == nil {
  118. // key not found
  119. return false
  120. }
  121. return true
  122. }
  123. // IsDeletedOrExpired returns true if item contains deleted or expired value.
  124. func (item *Item) IsDeletedOrExpired() bool {
  125. return isDeletedOrExpired(item.meta, item.expiresAt)
  126. }
  127. // DiscardEarlierVersions returns whether the item was created with the
  128. // option to discard earlier versions of a key when multiple are available.
  129. func (item *Item) DiscardEarlierVersions() bool {
  130. return item.meta&bitDiscardEarlierVersions > 0
  131. }
  132. func (item *Item) yieldItemValue() ([]byte, func(), error) {
  133. key := item.Key() // No need to copy.
  134. if !item.hasValue() {
  135. return nil, nil, nil
  136. }
  137. if item.slice == nil {
  138. item.slice = new(y.Slice)
  139. }
  140. if (item.meta & bitValuePointer) == 0 {
  141. val := item.slice.Resize(len(item.vptr))
  142. copy(val, item.vptr)
  143. return val, nil, nil
  144. }
  145. var vp valuePointer
  146. vp.Decode(item.vptr)
  147. db := item.txn.db
  148. result, cb, err := db.vlog.Read(vp, item.slice)
  149. if err != nil {
  150. db.opt.Errorf("Unable to read: Key: %v, Version : %v, meta: %v, userMeta: %v"+
  151. " Error: %v", key, item.version, item.meta, item.userMeta, err)
  152. var txn *Txn
  153. if db.opt.managedTxns {
  154. txn = db.NewTransactionAt(math.MaxUint64, false)
  155. } else {
  156. txn = db.NewTransaction(false)
  157. }
  158. defer txn.Discard()
  159. iopt := DefaultIteratorOptions
  160. iopt.AllVersions = true
  161. iopt.InternalAccess = true
  162. iopt.PrefetchValues = false
  163. it := txn.NewKeyIterator(item.Key(), iopt)
  164. defer it.Close()
  165. for it.Rewind(); it.Valid(); it.Next() {
  166. item := it.Item()
  167. var vp valuePointer
  168. if item.meta&bitValuePointer > 0 {
  169. vp.Decode(item.vptr)
  170. }
  171. db.opt.Errorf("Key: %v, Version : %v, meta: %v, userMeta: %v valuePointer: %+v",
  172. item.Key(), item.version, item.meta, item.userMeta, vp)
  173. }
  174. }
  175. // Don't return error if we cannot read the value. Just log the error.
  176. return result, cb, nil
  177. }
  178. func runCallback(cb func()) {
  179. if cb != nil {
  180. cb()
  181. }
  182. }
  183. func (item *Item) prefetchValue() {
  184. val, cb, err := item.yieldItemValue()
  185. defer runCallback(cb)
  186. item.err = err
  187. item.status = prefetched
  188. if val == nil {
  189. return
  190. }
  191. buf := item.slice.Resize(len(val))
  192. copy(buf, val)
  193. item.val = buf
  194. }
  195. // EstimatedSize returns the approximate size of the key-value pair.
  196. //
  197. // This can be called while iterating through a store to quickly estimate the
  198. // size of a range of key-value pairs (without fetching the corresponding
  199. // values).
  200. func (item *Item) EstimatedSize() int64 {
  201. if !item.hasValue() {
  202. return 0
  203. }
  204. if (item.meta & bitValuePointer) == 0 {
  205. return int64(len(item.key) + len(item.vptr))
  206. }
  207. var vp valuePointer
  208. vp.Decode(item.vptr)
  209. return int64(vp.Len) // includes key length.
  210. }
  211. // KeySize returns the size of the key.
  212. // Exact size of the key is key + 8 bytes of timestamp
  213. func (item *Item) KeySize() int64 {
  214. return int64(len(item.key))
  215. }
  216. // ValueSize returns the approximate size of the value.
  217. //
  218. // This can be called to quickly estimate the size of a value without fetching
  219. // it.
  220. func (item *Item) ValueSize() int64 {
  221. if !item.hasValue() {
  222. return 0
  223. }
  224. if (item.meta & bitValuePointer) == 0 {
  225. return int64(len(item.vptr))
  226. }
  227. var vp valuePointer
  228. vp.Decode(item.vptr)
  229. klen := int64(len(item.key) + 8) // 8 bytes for timestamp.
  230. // 6 bytes are for the approximate length of the header. Since header is encoded in varint, we
  231. // cannot find the exact length of header without fetching it.
  232. return int64(vp.Len) - klen - 6 - crc32.Size
  233. }
  234. // UserMeta returns the userMeta set by the user. Typically, this byte, optionally set by the user
  235. // is used to interpret the value.
  236. func (item *Item) UserMeta() byte {
  237. return item.userMeta
  238. }
  239. // ExpiresAt returns a Unix time value indicating when the item will be
  240. // considered expired. 0 indicates that the item will never expire.
  241. func (item *Item) ExpiresAt() uint64 {
  242. return item.expiresAt
  243. }
  244. // TODO: Switch this to use linked list container in Go.
  245. type list struct {
  246. head *Item
  247. tail *Item
  248. }
  249. func (l *list) push(i *Item) {
  250. i.next = nil
  251. if l.tail == nil {
  252. l.head = i
  253. l.tail = i
  254. return
  255. }
  256. l.tail.next = i
  257. l.tail = i
  258. }
  259. func (l *list) pop() *Item {
  260. if l.head == nil {
  261. return nil
  262. }
  263. i := l.head
  264. if l.head == l.tail {
  265. l.tail = nil
  266. l.head = nil
  267. } else {
  268. l.head = i.next
  269. }
  270. i.next = nil
  271. return i
  272. }
// IteratorOptions is used to set options when iterating over Badger key-value
// stores.
//
// This package provides DefaultIteratorOptions which contains options that
// should work for most applications. Consider using that as a starting point
// before customizing it for your own needs.
type IteratorOptions struct {
	// PrefetchSize is the number of KV pairs to prefetch while iterating.
	// Valid only if PrefetchValues is true; otherwise (or when it is <= 1)
	// the iterator prefetches 2 entries.
	PrefetchSize int
	// PrefetchValues indicates whether we should prefetch values during
	// iteration and store them.
	PrefetchValues bool
	Reverse        bool // Direction of iteration. False is forward, true is backward.
	AllVersions    bool // Fetch all valid versions of the same key.
	InternalAccess bool // Used to allow internal access to badger keys.

	// The following options are used to narrow down the SSTables that the
	// iterator picks up. If Prefix is specified, only tables which could have
	// this prefix are picked based on their range of keys.
	prefixIsKey bool   // If set, use the prefix for bloom filter lookup.
	Prefix      []byte // Only iterate over this given prefix.
	SinceTs     uint64 // Only read data that has version > SinceTs.
}
  296. func (opt *IteratorOptions) compareToPrefix(key []byte) int {
  297. // We should compare key without timestamp. For example key - a[TS] might be > "aa" prefix.
  298. key = y.ParseKey(key)
  299. if len(key) > len(opt.Prefix) {
  300. key = key[:len(opt.Prefix)]
  301. }
  302. return bytes.Compare(key, opt.Prefix)
  303. }
  304. func (opt *IteratorOptions) pickTable(t table.TableInterface) bool {
  305. // Ignore this table if its max version is less than the sinceTs.
  306. if t.MaxVersion() < opt.SinceTs {
  307. return false
  308. }
  309. if len(opt.Prefix) == 0 {
  310. return true
  311. }
  312. if opt.compareToPrefix(t.Smallest()) > 0 {
  313. return false
  314. }
  315. if opt.compareToPrefix(t.Biggest()) < 0 {
  316. return false
  317. }
  318. // Bloom filter lookup would only work if opt.Prefix does NOT have the read
  319. // timestamp as part of the key.
  320. if opt.prefixIsKey && t.DoesNotHave(y.Hash(opt.Prefix)) {
  321. return false
  322. }
  323. return true
  324. }
  325. // pickTables picks the necessary table for the iterator. This function also assumes
  326. // that the tables are sorted in the right order.
  327. func (opt *IteratorOptions) pickTables(all []*table.Table) []*table.Table {
  328. filterTables := func(tables []*table.Table) []*table.Table {
  329. if opt.SinceTs > 0 {
  330. tmp := tables[:0]
  331. for _, t := range tables {
  332. if t.MaxVersion() < opt.SinceTs {
  333. continue
  334. }
  335. tmp = append(tmp, t)
  336. }
  337. tables = tmp
  338. }
  339. return tables
  340. }
  341. if len(opt.Prefix) == 0 {
  342. out := make([]*table.Table, len(all))
  343. copy(out, all)
  344. return filterTables(out)
  345. }
  346. sIdx := sort.Search(len(all), func(i int) bool {
  347. // table.Biggest >= opt.prefix
  348. // if opt.Prefix < table.Biggest, then surely it is not in any of the preceding tables.
  349. return opt.compareToPrefix(all[i].Biggest()) >= 0
  350. })
  351. if sIdx == len(all) {
  352. // Not found.
  353. return []*table.Table{}
  354. }
  355. filtered := all[sIdx:]
  356. if !opt.prefixIsKey {
  357. eIdx := sort.Search(len(filtered), func(i int) bool {
  358. return opt.compareToPrefix(filtered[i].Smallest()) > 0
  359. })
  360. out := make([]*table.Table, len(filtered[:eIdx]))
  361. copy(out, filtered[:eIdx])
  362. return filterTables(out)
  363. }
  364. // opt.prefixIsKey == true. This code is optimizing for opt.prefixIsKey part.
  365. var out []*table.Table
  366. hash := y.Hash(opt.Prefix)
  367. for _, t := range filtered {
  368. // When we encounter the first table whose smallest key is higher than opt.Prefix, we can
  369. // stop. This is an IMPORTANT optimization, just considering how often we call
  370. // NewKeyIterator.
  371. if opt.compareToPrefix(t.Smallest()) > 0 {
  372. // if table.Smallest > opt.Prefix, then this and all tables after this can be ignored.
  373. break
  374. }
  375. // opt.Prefix is actually the key. So, we can run bloom filter checks
  376. // as well.
  377. if t.DoesNotHave(hash) {
  378. continue
  379. }
  380. out = append(out, t)
  381. }
  382. return filterTables(out)
  383. }
  384. // DefaultIteratorOptions contains default options when iterating over Badger key-value stores.
  385. var DefaultIteratorOptions = IteratorOptions{
  386. PrefetchValues: true,
  387. PrefetchSize: 100,
  388. Reverse: false,
  389. AllVersions: false,
  390. }
// Iterator helps iterating over the KV pairs in a lexicographically sorted order.
type Iterator struct {
	// iitr merges the txn's pending writes, the memtables and the LSM tables (see NewIterator).
	iitr y.Iterator
	txn  *Txn
	// readTs is the snapshot timestamp; parseItem skips versions above it.
	readTs uint64
	opt    IteratorOptions

	// item is the current item. It is never stored in data or waste.
	item *Item
	// data queues items already parsed ahead of item.
	data list
	// waste holds consumed items for reuse by newItem.
	waste   list
	lastKey []byte // Used to skip over multiple versions of the same key.
	closed  bool
	scanned int // Used to estimate the size of data scanned by iterator.

	// ThreadId is an optional value that can be set to identify which goroutine created
	// the iterator. It can be used, for example, to uniquely identify each of the
	// iterators created by the stream interface
	ThreadId int
	Alloc    *z.Allocator
}
  409. // NewIterator returns a new iterator. Depending upon the options, either only keys, or both
  410. // key-value pairs would be fetched. The keys are returned in lexicographically sorted order.
  411. // Using prefetch is recommended if you're doing a long running iteration, for performance.
  412. //
  413. // Multiple Iterators:
  414. // For a read-only txn, multiple iterators can be running simultaneously. However, for a read-write
  415. // txn, iterators have the nuance of being a snapshot of the writes for the transaction at the time
  416. // iterator was created. If writes are performed after an iterator is created, then that iterator
  417. // will not be able to see those writes. Only writes performed before an iterator was created can be
  418. // viewed.
  419. func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
  420. if txn.discarded {
  421. panic(ErrDiscardedTxn)
  422. }
  423. if txn.db.IsClosed() {
  424. panic(ErrDBClosed)
  425. }
  426. y.NumIteratorsCreatedAdd(txn.db.opt.MetricsEnabled, 1)
  427. // Keep track of the number of active iterators.
  428. txn.numIterators.Add(1)
  429. // TODO: If Prefix is set, only pick those memtables which have keys with the prefix.
  430. tables, decr := txn.db.getMemTables()
  431. defer decr()
  432. txn.db.vlog.incrIteratorCount()
  433. var iters []y.Iterator
  434. if itr := txn.newPendingWritesIterator(opt.Reverse); itr != nil {
  435. iters = append(iters, itr)
  436. }
  437. for i := 0; i < len(tables); i++ {
  438. iters = append(iters, tables[i].sl.NewUniIterator(opt.Reverse))
  439. }
  440. iters = txn.db.lc.appendIterators(iters, &opt) // This will increment references.
  441. res := &Iterator{
  442. txn: txn,
  443. iitr: table.NewMergeIterator(iters, opt.Reverse),
  444. opt: opt,
  445. readTs: txn.readTs,
  446. }
  447. return res
  448. }
  449. // NewKeyIterator is just like NewIterator, but allows the user to iterate over all versions of a
  450. // single key. Internally, it sets the Prefix option in provided opt, and uses that prefix to
  451. // additionally run bloom filter lookups before picking tables from the LSM tree.
  452. func (txn *Txn) NewKeyIterator(key []byte, opt IteratorOptions) *Iterator {
  453. if len(opt.Prefix) > 0 {
  454. panic("opt.Prefix should be nil for NewKeyIterator.")
  455. }
  456. opt.Prefix = key // This key must be without the timestamp.
  457. opt.prefixIsKey = true
  458. opt.AllVersions = true
  459. return txn.NewIterator(opt)
  460. }
  461. func (it *Iterator) newItem() *Item {
  462. item := it.waste.pop()
  463. if item == nil {
  464. item = &Item{slice: new(y.Slice), txn: it.txn}
  465. }
  466. return item
  467. }
  468. // Item returns pointer to the current key-value pair.
  469. // This item is only valid until it.Next() gets called.
  470. func (it *Iterator) Item() *Item {
  471. tx := it.txn
  472. tx.addReadKey(it.item.Key())
  473. return it.item
  474. }
  475. // Valid returns false when iteration is done.
  476. func (it *Iterator) Valid() bool {
  477. if it.item == nil {
  478. return false
  479. }
  480. if it.opt.prefixIsKey {
  481. return bytes.Equal(it.item.key, it.opt.Prefix)
  482. }
  483. return bytes.HasPrefix(it.item.key, it.opt.Prefix)
  484. }
  485. // ValidForPrefix returns false when iteration is done
  486. // or when the current key is not prefixed by the specified prefix.
  487. func (it *Iterator) ValidForPrefix(prefix []byte) bool {
  488. return it.Valid() && bytes.HasPrefix(it.item.key, prefix)
  489. }
  490. // Close would close the iterator. It is important to call this when you're done with iteration.
  491. func (it *Iterator) Close() {
  492. if it.closed {
  493. return
  494. }
  495. it.closed = true
  496. if it.iitr == nil {
  497. it.txn.numIterators.Add(-1)
  498. return
  499. }
  500. it.iitr.Close()
  501. // It is important to wait for the fill goroutines to finish. Otherwise, we might leave zombie
  502. // goroutines behind, which are waiting to acquire file read locks after DB has been closed.
  503. waitFor := func(l list) {
  504. item := l.pop()
  505. for item != nil {
  506. item.wg.Wait()
  507. item = l.pop()
  508. }
  509. }
  510. waitFor(it.waste)
  511. waitFor(it.data)
  512. // TODO: We could handle this error.
  513. _ = it.txn.db.vlog.decrIteratorCount()
  514. it.txn.numIterators.Add(-1)
  515. }
  516. // Next would advance the iterator by one. Always check it.Valid() after a Next()
  517. // to ensure you have access to a valid it.Item().
  518. func (it *Iterator) Next() {
  519. if it.iitr == nil {
  520. return
  521. }
  522. // Reuse current item
  523. it.item.wg.Wait() // Just cleaner to wait before pushing to avoid doing ref counting.
  524. it.scanned += len(it.item.key) + len(it.item.val) + len(it.item.vptr) + 2
  525. it.waste.push(it.item)
  526. // Set next item to current
  527. it.item = it.data.pop()
  528. for it.iitr.Valid() && hasPrefix(it) {
  529. if it.parseItem() {
  530. // parseItem calls one extra next.
  531. // This is used to deal with the complexity of reverse iteration.
  532. break
  533. }
  534. }
  535. }
  536. func isDeletedOrExpired(meta byte, expiresAt uint64) bool {
  537. if meta&bitDelete > 0 {
  538. return true
  539. }
  540. if expiresAt == 0 {
  541. return false
  542. }
  543. return expiresAt <= uint64(time.Now().Unix())
  544. }
  545. // parseItem is a complex function because it needs to handle both forward and reverse iteration
  546. // implementation. We store keys such that their versions are sorted in descending order. This makes
  547. // forward iteration efficient, but revese iteration complicated. This tradeoff is better because
  548. // forward iteration is more common than reverse. It returns true, if either the iterator is invalid
  549. // or it has pushed an item into it.data list, else it returns false.
  550. //
  551. // This function advances the iterator.
  552. func (it *Iterator) parseItem() bool {
  553. mi := it.iitr
  554. key := mi.Key()
  555. setItem := func(item *Item) {
  556. if it.item == nil {
  557. it.item = item
  558. } else {
  559. it.data.push(item)
  560. }
  561. }
  562. isInternalKey := bytes.HasPrefix(key, badgerPrefix)
  563. // Skip badger keys.
  564. if !it.opt.InternalAccess && isInternalKey {
  565. mi.Next()
  566. return false
  567. }
  568. // Skip any versions which are beyond the readTs.
  569. version := y.ParseTs(key)
  570. // Ignore everything that is above the readTs and below or at the sinceTs.
  571. if version > it.readTs || (it.opt.SinceTs > 0 && version <= it.opt.SinceTs) {
  572. mi.Next()
  573. return false
  574. }
  575. // Skip banned keys only if it does not have badger internal prefix.
  576. if !isInternalKey && it.txn.db.isBanned(key) != nil {
  577. mi.Next()
  578. return false
  579. }
  580. if it.opt.AllVersions {
  581. // Return deleted or expired values also, otherwise user can't figure out
  582. // whether the key was deleted.
  583. item := it.newItem()
  584. it.fill(item)
  585. setItem(item)
  586. mi.Next()
  587. return true
  588. }
  589. // If iterating in forward direction, then just checking the last key against current key would
  590. // be sufficient.
  591. if !it.opt.Reverse {
  592. if y.SameKey(it.lastKey, key) {
  593. mi.Next()
  594. return false
  595. }
  596. // Only track in forward direction.
  597. // We should update lastKey as soon as we find a different key in our snapshot.
  598. // Consider keys: a 5, b 7 (del), b 5. When iterating, lastKey = a.
  599. // Then we see b 7, which is deleted. If we don't store lastKey = b, we'll then return b 5,
  600. // which is wrong. Therefore, update lastKey here.
  601. it.lastKey = y.SafeCopy(it.lastKey, mi.Key())
  602. }
  603. FILL:
  604. // If deleted, advance and return.
  605. vs := mi.Value()
  606. if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
  607. mi.Next()
  608. return false
  609. }
  610. item := it.newItem()
  611. it.fill(item)
  612. // fill item based on current cursor position. All Next calls have returned, so reaching here
  613. // means no Next was called.
  614. mi.Next() // Advance but no fill item yet.
  615. if !it.opt.Reverse || !mi.Valid() { // Forward direction, or invalid.
  616. setItem(item)
  617. return true
  618. }
  619. // Reverse direction.
  620. nextTs := y.ParseTs(mi.Key())
  621. mik := y.ParseKey(mi.Key())
  622. if nextTs <= it.readTs && bytes.Equal(mik, item.key) {
  623. // This is a valid potential candidate.
  624. goto FILL
  625. }
  626. // Ignore the next candidate. Return the current one.
  627. setItem(item)
  628. return true
  629. }
  630. func (it *Iterator) fill(item *Item) {
  631. vs := it.iitr.Value()
  632. item.meta = vs.Meta
  633. item.userMeta = vs.UserMeta
  634. item.expiresAt = vs.ExpiresAt
  635. item.version = y.ParseTs(it.iitr.Key())
  636. item.key = y.SafeCopy(item.key, y.ParseKey(it.iitr.Key()))
  637. item.vptr = y.SafeCopy(item.vptr, vs.Value)
  638. item.val = nil
  639. if it.opt.PrefetchValues {
  640. item.wg.Add(1)
  641. go func() {
  642. // FIXME we are not handling errors here.
  643. item.prefetchValue()
  644. item.wg.Done()
  645. }()
  646. }
  647. }
  648. func hasPrefix(it *Iterator) bool {
  649. // We shouldn't check prefix in case the iterator is going in reverse. Since in reverse we expect
  650. // people to append items to the end of prefix.
  651. if !it.opt.Reverse && len(it.opt.Prefix) > 0 {
  652. return bytes.HasPrefix(y.ParseKey(it.iitr.Key()), it.opt.Prefix)
  653. }
  654. return true
  655. }
  656. func (it *Iterator) prefetch() {
  657. prefetchSize := 2
  658. if it.opt.PrefetchValues && it.opt.PrefetchSize > 1 {
  659. prefetchSize = it.opt.PrefetchSize
  660. }
  661. i := it.iitr
  662. var count int
  663. it.item = nil
  664. for i.Valid() && hasPrefix(it) {
  665. if !it.parseItem() {
  666. continue
  667. }
  668. count++
  669. if count == prefetchSize {
  670. break
  671. }
  672. }
  673. }
  674. // Seek would seek to the provided key if present. If absent, it would seek to the next
  675. // smallest key greater than the provided key if iterating in the forward direction.
  676. // Behavior would be reversed if iterating backwards.
  677. func (it *Iterator) Seek(key []byte) {
  678. if it.iitr == nil {
  679. return
  680. }
  681. if len(key) > 0 {
  682. it.txn.addReadKey(key)
  683. }
  684. for i := it.data.pop(); i != nil; i = it.data.pop() {
  685. i.wg.Wait()
  686. it.waste.push(i)
  687. }
  688. it.lastKey = it.lastKey[:0]
  689. if len(key) == 0 {
  690. key = it.opt.Prefix
  691. }
  692. if len(key) == 0 {
  693. it.iitr.Rewind()
  694. it.prefetch()
  695. return
  696. }
  697. if !it.opt.Reverse {
  698. key = y.KeyWithTs(key, it.txn.readTs)
  699. } else {
  700. key = y.KeyWithTs(key, 0)
  701. }
  702. it.iitr.Seek(key)
  703. it.prefetch()
  704. }
  705. // Rewind would rewind the iterator cursor all the way to zero-th position, which would be the
  706. // smallest key if iterating forward, and largest if iterating backward. It does not keep track of
  707. // whether the cursor started with a Seek().
  708. func (it *Iterator) Rewind() {
  709. it.Seek(nil)
  710. }