merge_iterator.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package table
  6. import (
  7. "bytes"
  8. "github.com/dgraph-io/badger/v4/y"
  9. )
  10. // MergeIterator merges multiple iterators.
  11. // NOTE: MergeIterator owns the array of iterators and is responsible for closing them.
  12. type MergeIterator struct {
  13. left node
  14. right node
  15. small *node
  16. curKey []byte
  17. reverse bool
  18. }
// node wraps one child iterator of a MergeIterator and caches the child's
// validity and current key so the merge logic can read them as plain fields.
type node struct {
	// valid is the child's validity as of the last setKey call.
	valid bool
	// key is the child's key as of the last setKey call that found it valid.
	key []byte
	// iter is the child iterator itself; it is always set by setIterator.
	iter y.Iterator
	// The two iterators are type asserted from `y.Iterator`, used to inline more function calls.
	// Calling functions on concrete types is much faster (about 25-30%) than calling the
	// interface's function.
	// Each is nil when iter is not of that concrete type.
	merge  *MergeIterator
	concat *ConcatIterator
}
  29. func (n *node) setIterator(iter y.Iterator) {
  30. n.iter = iter
  31. // It's okay if the type assertion below fails and n.merge/n.concat are set to nil.
  32. // We handle the nil values of merge and concat in all the methods.
  33. n.merge, _ = iter.(*MergeIterator)
  34. n.concat, _ = iter.(*ConcatIterator)
  35. }
  36. func (n *node) setKey() {
  37. switch {
  38. case n.merge != nil:
  39. n.valid = n.merge.small.valid
  40. if n.valid {
  41. n.key = n.merge.small.key
  42. }
  43. case n.concat != nil:
  44. n.valid = n.concat.Valid()
  45. if n.valid {
  46. n.key = n.concat.Key()
  47. }
  48. default:
  49. n.valid = n.iter.Valid()
  50. if n.valid {
  51. n.key = n.iter.Key()
  52. }
  53. }
  54. }
  55. func (n *node) next() {
  56. switch {
  57. case n.merge != nil:
  58. n.merge.Next()
  59. case n.concat != nil:
  60. n.concat.Next()
  61. default:
  62. n.iter.Next()
  63. }
  64. n.setKey()
  65. }
  66. func (n *node) rewind() {
  67. n.iter.Rewind()
  68. n.setKey()
  69. }
  70. func (n *node) seek(key []byte) {
  71. n.iter.Seek(key)
  72. n.setKey()
  73. }
  74. func (mi *MergeIterator) fix() {
  75. if !mi.bigger().valid {
  76. return
  77. }
  78. if !mi.small.valid {
  79. mi.swapSmall()
  80. return
  81. }
  82. cmp := y.CompareKeys(mi.small.key, mi.bigger().key)
  83. switch {
  84. case cmp == 0: // Both the keys are equal.
  85. // In case of same keys, move the right iterator ahead.
  86. mi.right.next()
  87. if &mi.right == mi.small {
  88. mi.swapSmall()
  89. }
  90. return
  91. case cmp < 0: // Small is less than bigger().
  92. if mi.reverse {
  93. mi.swapSmall()
  94. } else { //nolint:staticcheck
  95. // we don't need to do anything. Small already points to the smallest.
  96. }
  97. return
  98. default: // bigger() is less than small.
  99. if mi.reverse {
  100. // Do nothing since we're iterating in reverse. Small currently points to
  101. // the bigger key and that's okay in reverse iteration.
  102. } else {
  103. mi.swapSmall()
  104. }
  105. return
  106. }
  107. }
  108. func (mi *MergeIterator) bigger() *node {
  109. if mi.small == &mi.left {
  110. return &mi.right
  111. }
  112. return &mi.left
  113. }
  114. func (mi *MergeIterator) swapSmall() {
  115. if mi.small == &mi.left {
  116. mi.small = &mi.right
  117. return
  118. }
  119. if mi.small == &mi.right {
  120. mi.small = &mi.left
  121. return
  122. }
  123. }
  124. // Next returns the next element. If it is the same as the current key, ignore it.
  125. func (mi *MergeIterator) Next() {
  126. for mi.Valid() {
  127. if !bytes.Equal(mi.small.key, mi.curKey) {
  128. break
  129. }
  130. mi.small.next()
  131. mi.fix()
  132. }
  133. mi.setCurrent()
  134. }
  135. func (mi *MergeIterator) setCurrent() {
  136. mi.curKey = append(mi.curKey[:0], mi.small.key...)
  137. }
  138. // Rewind seeks to first element (or last element for reverse iterator).
  139. func (mi *MergeIterator) Rewind() {
  140. mi.left.rewind()
  141. mi.right.rewind()
  142. mi.fix()
  143. mi.setCurrent()
  144. }
  145. // Seek brings us to element with key >= given key.
  146. func (mi *MergeIterator) Seek(key []byte) {
  147. mi.left.seek(key)
  148. mi.right.seek(key)
  149. mi.fix()
  150. mi.setCurrent()
  151. }
  152. // Valid returns whether the MergeIterator is at a valid element.
  153. func (mi *MergeIterator) Valid() bool {
  154. return mi.small.valid
  155. }
  156. // Key returns the key associated with the current iterator.
  157. func (mi *MergeIterator) Key() []byte {
  158. return mi.small.key
  159. }
  160. // Value returns the value associated with the iterator.
  161. func (mi *MergeIterator) Value() y.ValueStruct {
  162. return mi.small.iter.Value()
  163. }
  164. // Close implements y.Iterator.
  165. func (mi *MergeIterator) Close() error {
  166. err1 := mi.left.iter.Close()
  167. err2 := mi.right.iter.Close()
  168. if err1 != nil {
  169. return y.Wrap(err1, "MergeIterator")
  170. }
  171. return y.Wrap(err2, "MergeIterator")
  172. }
  173. // NewMergeIterator creates a merge iterator.
  174. func NewMergeIterator(iters []y.Iterator, reverse bool) y.Iterator {
  175. switch len(iters) {
  176. case 0:
  177. return nil
  178. case 1:
  179. return iters[0]
  180. case 2:
  181. mi := &MergeIterator{
  182. reverse: reverse,
  183. }
  184. mi.left.setIterator(iters[0])
  185. mi.right.setIterator(iters[1])
  186. // Assign left iterator randomly. This will be fixed when user calls rewind/seek.
  187. mi.small = &mi.left
  188. return mi
  189. }
  190. mid := len(iters) / 2
  191. return NewMergeIterator(
  192. []y.Iterator{
  193. NewMergeIterator(iters[:mid], reverse),
  194. NewMergeIterator(iters[mid:], reverse),
  195. }, reverse)
  196. }