watermark.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package y
  6. import (
  7. "container/heap"
  8. "context"
  9. "sync/atomic"
  10. "github.com/dgraph-io/ristretto/v2/z"
  11. )
  12. type uint64Heap []uint64
  13. func (u uint64Heap) Len() int { return len(u) }
  14. func (u uint64Heap) Less(i, j int) bool { return u[i] < u[j] }
  15. func (u uint64Heap) Swap(i, j int) { u[i], u[j] = u[j], u[i] }
  16. func (u *uint64Heap) Push(x interface{}) { *u = append(*u, x.(uint64)) }
  17. func (u *uint64Heap) Pop() interface{} {
  18. old := *u
  19. n := len(old)
  20. x := old[n-1]
  21. *u = old[0 : n-1]
  22. return x
  23. }
// mark is the message type sent over WaterMark.markCh. It contains one or more
// indices, along with a done boolean to indicate the status of the index: begin
// or done. It may instead carry a waiter, a channel that is closed once the
// watermark reaches >= the given index.
type mark struct {
	// Either this is an (index, waiter) pair or (index, done) or (indices, done).
	index   uint64        // Single index; also the target index when waiter is set.
	waiter  chan struct{} // Non-nil only for wait requests.
	indices []uint64      // Batch of indices that share the same done flag.
	done    bool          // Set to true if the index is done.
}
// WaterMark is used to keep track of the minimum un-finished index. Typically, an index k becomes
// finished or "done" according to a WaterMark once Done(k) has been called
//  1. as many times as Begin(k) has, AND
//  2. a positive number of times.
//
// An index may also become "done" by calling SetDoneUntil at a time such that it is not
// inter-mingled with Begin/Done calls.
//
// doneUntil and lastIndex are accessed atomically and are kept at the beginning of the
// structure, which was done to guarantee 64-bit alignment.
// NOTE(review): both fields are now atomic.Uint64, which presumably handles alignment on its
// own — confirm whether the field ordering is still required.
type WaterMark struct {
	doneUntil atomic.Uint64 // Highest index such that it and everything below it is done.
	lastIndex atomic.Uint64 // Index most recently passed to Begin/BeginMany.
	Name      string        // Used in assertion messages emitted by process.
	markCh    chan mark     // Created by Init and consumed by process.
}
  50. // Init initializes a WaterMark struct. MUST be called before using it.
  51. func (w *WaterMark) Init(closer *z.Closer) {
  52. w.markCh = make(chan mark, 100)
  53. go w.process(closer)
  54. }
  55. // Begin sets the last index to the given value.
  56. func (w *WaterMark) Begin(index uint64) {
  57. w.lastIndex.Store(index)
  58. w.markCh <- mark{index: index, done: false}
  59. }
  60. // BeginMany works like Begin but accepts multiple indices.
  61. func (w *WaterMark) BeginMany(indices []uint64) {
  62. w.lastIndex.Store(indices[len(indices)-1])
  63. w.markCh <- mark{index: 0, indices: indices, done: false}
  64. }
  65. // Done sets a single index as done.
  66. func (w *WaterMark) Done(index uint64) {
  67. w.markCh <- mark{index: index, done: true}
  68. }
  69. // DoneMany works like Done but accepts multiple indices.
  70. func (w *WaterMark) DoneMany(indices []uint64) {
  71. w.markCh <- mark{index: 0, indices: indices, done: true}
  72. }
  73. // DoneUntil returns the maximum index that has the property that all indices
  74. // less than or equal to it are done.
  75. func (w *WaterMark) DoneUntil() uint64 {
  76. return w.doneUntil.Load()
  77. }
  78. // SetDoneUntil sets the maximum index that has the property that all indices
  79. // less than or equal to it are done.
  80. func (w *WaterMark) SetDoneUntil(val uint64) {
  81. w.doneUntil.Store(val)
  82. }
  83. // LastIndex returns the last index for which Begin has been called.
  84. func (w *WaterMark) LastIndex() uint64 {
  85. return w.lastIndex.Load()
  86. }
  87. // WaitForMark waits until the given index is marked as done.
  88. func (w *WaterMark) WaitForMark(ctx context.Context, index uint64) error {
  89. if w.DoneUntil() >= index {
  90. return nil
  91. }
  92. waitCh := make(chan struct{})
  93. w.markCh <- mark{index: index, waiter: waitCh}
  94. select {
  95. case <-ctx.Done():
  96. return ctx.Err()
  97. case <-waitCh:
  98. return nil
  99. }
  100. }
  101. // process is used to process the Mark channel. This is not thread-safe,
  102. // so only run one goroutine for process. One is sufficient, because
  103. // all goroutine ops use purely memory and cpu.
  104. // Each index has to emit atleast one begin watermark in serial order otherwise waiters
  105. // can get blocked idefinitely. Example: We had an watermark at 100 and a waiter at 101,
  106. // if no watermark is emitted at index 101 then waiter would get stuck indefinitely as it
  107. // can't decide whether the task at 101 has decided not to emit watermark or it didn't get
  108. // scheduled yet.
  109. func (w *WaterMark) process(closer *z.Closer) {
  110. defer closer.Done()
  111. var indices uint64Heap
  112. // pending maps raft proposal index to the number of pending mutations for this proposal.
  113. pending := make(map[uint64]int)
  114. waiters := make(map[uint64][]chan struct{})
  115. heap.Init(&indices)
  116. processOne := func(index uint64, done bool) {
  117. // If not already done, then set. Otherwise, don't undo a done entry.
  118. prev, present := pending[index]
  119. if !present {
  120. heap.Push(&indices, index)
  121. }
  122. delta := 1
  123. if done {
  124. delta = -1
  125. }
  126. pending[index] = prev + delta
  127. // Update mark by going through all indices in order; and checking if they have
  128. // been done. Stop at the first index, which isn't done.
  129. doneUntil := w.DoneUntil()
  130. if doneUntil > index {
  131. AssertTruef(false, "Name: %s doneUntil: %d. Index: %d", w.Name, doneUntil, index)
  132. }
  133. until := doneUntil
  134. loops := 0
  135. for len(indices) > 0 {
  136. min := indices[0]
  137. if done := pending[min]; done > 0 {
  138. break // len(indices) will be > 0.
  139. }
  140. // Even if done is called multiple times causing it to become
  141. // negative, we should still pop the index.
  142. heap.Pop(&indices)
  143. delete(pending, min)
  144. until = min
  145. loops++
  146. }
  147. if until != doneUntil {
  148. AssertTrue(w.doneUntil.CompareAndSwap(doneUntil, until))
  149. }
  150. notifyAndRemove := func(idx uint64, toNotify []chan struct{}) {
  151. for _, ch := range toNotify {
  152. close(ch)
  153. }
  154. delete(waiters, idx) // Release the memory back.
  155. }
  156. if until-doneUntil <= uint64(len(waiters)) {
  157. // Issue #908 showed that if doneUntil is close to 2^60, while until is zero, this loop
  158. // can hog up CPU just iterating over integers creating a busy-wait loop. So, only do
  159. // this path if until - doneUntil is less than the number of waiters.
  160. for idx := doneUntil + 1; idx <= until; idx++ {
  161. if toNotify, ok := waiters[idx]; ok {
  162. notifyAndRemove(idx, toNotify)
  163. }
  164. }
  165. } else {
  166. for idx, toNotify := range waiters {
  167. if idx <= until {
  168. notifyAndRemove(idx, toNotify)
  169. }
  170. }
  171. } // end of notifying waiters.
  172. }
  173. for {
  174. select {
  175. case <-closer.HasBeenClosed():
  176. return
  177. case mark := <-w.markCh:
  178. if mark.waiter != nil {
  179. doneUntil := w.doneUntil.Load()
  180. if doneUntil >= mark.index {
  181. close(mark.waiter)
  182. } else {
  183. ws, ok := waiters[mark.index]
  184. if !ok {
  185. waiters[mark.index] = []chan struct{}{mark.waiter}
  186. } else {
  187. waiters[mark.index] = append(ws, mark.waiter)
  188. }
  189. }
  190. } else {
  191. // it is possible that mark.index is zero. We need to handle that case as well.
  192. if mark.index > 0 || (mark.index == 0 && len(mark.indices) == 0) {
  193. processOne(mark.index, mark.done)
  194. }
  195. for _, index := range mark.indices {
  196. processOne(index, mark.done)
  197. }
  198. }
  199. }
  200. }
  201. }