merge.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. /*
  2. * Copyright 2017 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package badger
  17. import (
  18. stderrors "errors"
  19. "sync"
  20. "time"
  21. "github.com/dgraph-io/badger/v4/y"
  22. "github.com/dgraph-io/ristretto/v2/z"
  23. )
// MergeOperator represents a Badger merge operator.
//
// A MergeOperator is bound to a single key. Values recorded with Add are
// stored as separate versions of that key and are folded together with the
// operator's MergeFunc, either lazily by Get or periodically by a background
// goroutine started in GetMergeOperator.
type MergeOperator struct {
	// The embedded lock serializes compaction against reads: compact takes the
	// write lock, Get takes the read lock.
	sync.RWMutex
	// f combines an older value with the newer, already merged value.
	f MergeFunc
	// db is the database the operator reads from and writes back to.
	db *DB
	// key is the single key whose versions this operator merges.
	key []byte
	// closer is used by Stop to signal the background goroutine and to wait
	// for it to finish.
	closer *z.Closer
}
// MergeFunc accepts two byte slices, one representing an existing value, and
// another representing a new value that needs to be "merged" into it. MergeFunc
// contains the logic to perform the "merge" and returns the updated value.
// MergeFunc could perform operations like integer addition, list appends etc.
// Note that the ordering of the operands is maintained: the first argument is
// always the older value and the second is the newer one.
type MergeFunc func(existingVal, newVal []byte) []byte
  38. // GetMergeOperator creates a new MergeOperator for a given key and returns a
  39. // pointer to it. It also fires off a goroutine that performs a compaction using
  40. // the merge function that runs periodically, as specified by dur.
  41. func (db *DB) GetMergeOperator(key []byte,
  42. f MergeFunc, dur time.Duration) *MergeOperator {
  43. op := &MergeOperator{
  44. f: f,
  45. db: db,
  46. key: key,
  47. closer: z.NewCloser(1),
  48. }
  49. go op.runCompactions(dur)
  50. return op
  51. }
// errNoMerge is an internal sentinel returned by iterateAndMerge when exactly
// one live version of the key was found, so there was nothing to merge. Both
// compact and Get treat it as a non-error.
var errNoMerge = stderrors.New("No need for merge")
  53. func (op *MergeOperator) iterateAndMerge() (newVal []byte, latest uint64, err error) {
  54. txn := op.db.NewTransaction(false)
  55. defer txn.Discard()
  56. opt := DefaultIteratorOptions
  57. opt.AllVersions = true
  58. it := txn.NewKeyIterator(op.key, opt)
  59. defer it.Close()
  60. var numVersions int
  61. for it.Rewind(); it.Valid(); it.Next() {
  62. item := it.Item()
  63. if item.IsDeletedOrExpired() {
  64. break
  65. }
  66. numVersions++
  67. if numVersions == 1 {
  68. // This should be the newVal, considering this is the latest version.
  69. newVal, err = item.ValueCopy(newVal)
  70. if err != nil {
  71. return nil, 0, err
  72. }
  73. latest = item.Version()
  74. } else {
  75. if err := item.Value(func(oldVal []byte) error {
  76. // The merge should always be on the newVal considering it has the merge result of
  77. // the latest version. The value read should be the oldVal.
  78. newVal = op.f(oldVal, newVal)
  79. return nil
  80. }); err != nil {
  81. return nil, 0, err
  82. }
  83. }
  84. if item.DiscardEarlierVersions() {
  85. break
  86. }
  87. }
  88. if numVersions == 0 {
  89. return nil, latest, ErrKeyNotFound
  90. } else if numVersions == 1 {
  91. return newVal, latest, errNoMerge
  92. }
  93. return newVal, latest, nil
  94. }
  95. func (op *MergeOperator) compact() error {
  96. op.Lock()
  97. defer op.Unlock()
  98. val, version, err := op.iterateAndMerge()
  99. if err == ErrKeyNotFound || err == errNoMerge {
  100. return nil
  101. } else if err != nil {
  102. return err
  103. }
  104. entries := []*Entry{
  105. {
  106. Key: y.KeyWithTs(op.key, version),
  107. Value: val,
  108. meta: bitDiscardEarlierVersions,
  109. },
  110. }
  111. // Write value back to the DB. It is important that we do not set the bitMergeEntry bit
  112. // here. When compaction happens, all the older merged entries will be removed.
  113. return op.db.batchSetAsync(entries, func(err error) {
  114. if err != nil {
  115. op.db.opt.Errorf("failed to insert the result of merge compaction: %s", err)
  116. }
  117. })
  118. }
  119. func (op *MergeOperator) runCompactions(dur time.Duration) {
  120. ticker := time.NewTicker(dur)
  121. defer op.closer.Done()
  122. var stop bool
  123. for {
  124. select {
  125. case <-op.closer.HasBeenClosed():
  126. stop = true
  127. case <-ticker.C: // wait for tick
  128. }
  129. if err := op.compact(); err != nil {
  130. op.db.opt.Errorf("failure while running merge operation: %s", err)
  131. }
  132. if stop {
  133. ticker.Stop()
  134. break
  135. }
  136. }
  137. }
  138. // Add records a value in Badger which will eventually be merged by a background
  139. // routine into the values that were recorded by previous invocations to Add().
  140. func (op *MergeOperator) Add(val []byte) error {
  141. return op.db.Update(func(txn *Txn) error {
  142. return txn.SetEntry(NewEntry(op.key, val).withMergeBit())
  143. })
  144. }
  145. // Get returns the latest value for the merge operator, which is derived by
  146. // applying the merge function to all the values added so far.
  147. //
  148. // If Add has not been called even once, Get will return ErrKeyNotFound.
  149. func (op *MergeOperator) Get() ([]byte, error) {
  150. op.RLock()
  151. defer op.RUnlock()
  152. var existing []byte
  153. err := op.db.View(func(txn *Txn) (err error) {
  154. existing, _, err = op.iterateAndMerge()
  155. return err
  156. })
  157. if err == errNoMerge {
  158. return existing, nil
  159. }
  160. return existing, err
  161. }
// Stop waits for any pending merge to complete and then stops the background
// goroutine.
//
// It signals the operator's closer and blocks until the background goroutine
// reports completion; that goroutine runs one final compaction after seeing
// the signal and before exiting.
func (op *MergeOperator) Stop() {
	op.closer.SignalAndWait()
}