merge.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package badger
  6. import (
  7. stderrors "errors"
  8. "sync"
  9. "time"
  10. "github.com/dgraph-io/badger/v4/y"
  11. "github.com/dgraph-io/ristretto/v2/z"
  12. )
// MergeOperator represents a Badger merge operator.
//
// A MergeOperator is bound to a single key. Values recorded with Add are
// combined with the merge function f, both on demand in Get and by the
// background goroutine that GetMergeOperator starts.
type MergeOperator struct {
	// Embedded lock: compact takes it exclusively, Get takes it shared.
	sync.RWMutex
	f      MergeFunc // merge function applied to (older value, running result)
	db     *DB       // database that holds the key
	key    []byte    // the single key this operator manages
	closer *z.Closer // used to signal the background goroutine and wait for it to exit
}
  21. // MergeFunc accepts two byte slices, one representing an existing value, and
  22. // another representing a new value that needs to be ‘merged’ into it. MergeFunc
  23. // contains the logic to perform the ‘merge’ and return an updated value.
  24. // MergeFunc could perform operations like integer addition, list appends etc.
  25. // Note that the ordering of the operands is maintained.
  26. type MergeFunc func(existingVal, newVal []byte) []byte
  27. // GetMergeOperator creates a new MergeOperator for a given key and returns a
  28. // pointer to it. It also fires off a goroutine that performs a compaction using
  29. // the merge function that runs periodically, as specified by dur.
  30. func (db *DB) GetMergeOperator(key []byte,
  31. f MergeFunc, dur time.Duration) *MergeOperator {
  32. op := &MergeOperator{
  33. f: f,
  34. db: db,
  35. key: key,
  36. closer: z.NewCloser(1),
  37. }
  38. go op.runCompactions(dur)
  39. return op
  40. }
// errNoMerge is returned by iterateAndMerge when exactly one version of the
// key was visited, so there is nothing to merge. It is internal only: compact
// treats it as a no-op and Get turns it into a successful result.
var errNoMerge = stderrors.New("No need for merge")
  42. func (op *MergeOperator) iterateAndMerge() (newVal []byte, latest uint64, err error) {
  43. txn := op.db.NewTransaction(false)
  44. defer txn.Discard()
  45. opt := DefaultIteratorOptions
  46. opt.AllVersions = true
  47. it := txn.NewKeyIterator(op.key, opt)
  48. defer it.Close()
  49. var numVersions int
  50. for it.Rewind(); it.Valid(); it.Next() {
  51. item := it.Item()
  52. if item.IsDeletedOrExpired() {
  53. break
  54. }
  55. numVersions++
  56. if numVersions == 1 {
  57. // This should be the newVal, considering this is the latest version.
  58. newVal, err = item.ValueCopy(newVal)
  59. if err != nil {
  60. return nil, 0, err
  61. }
  62. latest = item.Version()
  63. } else {
  64. if err := item.Value(func(oldVal []byte) error {
  65. // The merge should always be on the newVal considering it has the merge result of
  66. // the latest version. The value read should be the oldVal.
  67. newVal = op.f(oldVal, newVal)
  68. return nil
  69. }); err != nil {
  70. return nil, 0, err
  71. }
  72. }
  73. if item.DiscardEarlierVersions() {
  74. break
  75. }
  76. }
  77. if numVersions == 0 {
  78. return nil, latest, ErrKeyNotFound
  79. } else if numVersions == 1 {
  80. return newVal, latest, errNoMerge
  81. }
  82. return newVal, latest, nil
  83. }
  84. func (op *MergeOperator) compact() error {
  85. op.Lock()
  86. defer op.Unlock()
  87. val, version, err := op.iterateAndMerge()
  88. if err == ErrKeyNotFound || err == errNoMerge {
  89. return nil
  90. } else if err != nil {
  91. return err
  92. }
  93. entries := []*Entry{
  94. {
  95. Key: y.KeyWithTs(op.key, version),
  96. Value: val,
  97. meta: bitDiscardEarlierVersions,
  98. },
  99. }
  100. // Write value back to the DB. It is important that we do not set the bitMergeEntry bit
  101. // here. When compaction happens, all the older merged entries will be removed.
  102. return op.db.batchSetAsync(entries, func(err error) {
  103. if err != nil {
  104. op.db.opt.Errorf("failed to insert the result of merge compaction: %s", err)
  105. }
  106. })
  107. }
  108. func (op *MergeOperator) runCompactions(dur time.Duration) {
  109. ticker := time.NewTicker(dur)
  110. defer op.closer.Done()
  111. var stop bool
  112. for {
  113. select {
  114. case <-op.closer.HasBeenClosed():
  115. stop = true
  116. case <-ticker.C: // wait for tick
  117. }
  118. if err := op.compact(); err != nil {
  119. op.db.opt.Errorf("failure while running merge operation: %s", err)
  120. }
  121. if stop {
  122. ticker.Stop()
  123. break
  124. }
  125. }
  126. }
  127. // Add records a value in Badger which will eventually be merged by a background
  128. // routine into the values that were recorded by previous invocations to Add().
  129. func (op *MergeOperator) Add(val []byte) error {
  130. return op.db.Update(func(txn *Txn) error {
  131. return txn.SetEntry(NewEntry(op.key, val).withMergeBit())
  132. })
  133. }
  134. // Get returns the latest value for the merge operator, which is derived by
  135. // applying the merge function to all the values added so far.
  136. //
  137. // If Add has not been called even once, Get will return ErrKeyNotFound.
  138. func (op *MergeOperator) Get() ([]byte, error) {
  139. op.RLock()
  140. defer op.RUnlock()
  141. var existing []byte
  142. err := op.db.View(func(txn *Txn) (err error) {
  143. existing, _, err = op.iterateAndMerge()
  144. return err
  145. })
  146. if err == errNoMerge {
  147. return existing, nil
  148. }
  149. return existing, err
  150. }
// Stop signals the background goroutine to shut down and blocks until it has
// exited. The goroutine runs one last compaction after receiving the signal,
// so any pending merge completes before Stop returns.
func (op *MergeOperator) Stop() {
	op.closer.SignalAndWait()
}