watermark.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. /*
  2. * Copyright 2016-2018 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package y
  17. import (
  18. "container/heap"
  19. "context"
  20. "sync/atomic"
  21. "github.com/dgraph-io/ristretto/v2/z"
  22. )
  // uint64Heap is a slice of uint64 values that provides the methods required
  // by heap.Interface. Because Less orders values ascending, the heap package
  // keeps the smallest value at position 0.
  type uint64Heap []uint64

  // Len reports the number of values currently stored.
  func (u uint64Heap) Len() int {
  	return len(u)
  }

  // Less reports whether the value at position i is smaller than the one at j.
  func (u uint64Heap) Less(i, j int) bool {
  	return u[i] < u[j]
  }

  // Swap exchanges the values at positions i and j.
  func (u uint64Heap) Swap(i, j int) {
  	u[i], u[j] = u[j], u[i]
  }

  // Push appends x to the end of the slice. x must be a uint64; any other
  // dynamic type makes the type assertion panic.
  func (u *uint64Heap) Push(x any) {
  	*u = append(*u, x.(uint64))
  }

  // Pop removes the last element of the slice and returns it.
  func (u *uint64Heap) Pop() any {
  	h := *u
  	last := len(h) - 1
  	*u = h[:last]
  	return h[last]
  }
  // mark is the message sent over WaterMark.markCh to the processing goroutine.
  // It carries one or more indices together with a done flag that tells whether
  // the indices are beginning or finishing, or it carries a waiter that wants to
  // be notified once the watermark reaches >= a certain index.
  //
  // A mark is used in exactly one of these shapes:
  //   - (index, waiter): register waiter for index;
  //   - (index, done): begin or finish a single index;
  //   - (indices, done): begin or finish several indices at once.
  type mark struct {
  	// index is the single index this mark refers to.
  	index uint64
  	// waiter, when non-nil, is closed by the processing goroutine once the
  	// watermark has reached index.
  	waiter chan struct{}
  	// indices holds the batch of indices for the BeginMany/DoneMany form.
  	indices []uint64
  	// done is true when the index (or indices) are finished and false when
  	// they are beginning.
  	done bool
  }
  45. // WaterMark is used to keep track of the minimum un-finished index. Typically, an index k becomes
  46. // finished or "done" according to a WaterMark once Done(k) has been called
  47. // 1. as many times as Begin(k) has, AND
  48. // 2. a positive number of times.
  49. //
  50. // An index may also become "done" by calling SetDoneUntil at a time such that it is not
  51. // inter-mingled with Begin/Done calls.
  52. //
  53. // Since doneUntil and lastIndex addresses are passed to sync/atomic packages, we ensure that they
  54. // are 64-bit aligned by putting them at the beginning of the structure.
  55. type WaterMark struct {
  56. doneUntil atomic.Uint64
  57. lastIndex atomic.Uint64
  58. Name string
  59. markCh chan mark
  60. }
  61. // Init initializes a WaterMark struct. MUST be called before using it.
  62. func (w *WaterMark) Init(closer *z.Closer) {
  63. w.markCh = make(chan mark, 100)
  64. go w.process(closer)
  65. }
  66. // Begin sets the last index to the given value.
  67. func (w *WaterMark) Begin(index uint64) {
  68. w.lastIndex.Store(index)
  69. w.markCh <- mark{index: index, done: false}
  70. }
  71. // BeginMany works like Begin but accepts multiple indices.
  72. func (w *WaterMark) BeginMany(indices []uint64) {
  73. w.lastIndex.Store(indices[len(indices)-1])
  74. w.markCh <- mark{index: 0, indices: indices, done: false}
  75. }
  76. // Done sets a single index as done.
  77. func (w *WaterMark) Done(index uint64) {
  78. w.markCh <- mark{index: index, done: true}
  79. }
  80. // DoneMany works like Done but accepts multiple indices.
  81. func (w *WaterMark) DoneMany(indices []uint64) {
  82. w.markCh <- mark{index: 0, indices: indices, done: true}
  83. }
  84. // DoneUntil returns the maximum index that has the property that all indices
  85. // less than or equal to it are done.
  86. func (w *WaterMark) DoneUntil() uint64 {
  87. return w.doneUntil.Load()
  88. }
  89. // SetDoneUntil sets the maximum index that has the property that all indices
  90. // less than or equal to it are done.
  91. func (w *WaterMark) SetDoneUntil(val uint64) {
  92. w.doneUntil.Store(val)
  93. }
  94. // LastIndex returns the last index for which Begin has been called.
  95. func (w *WaterMark) LastIndex() uint64 {
  96. return w.lastIndex.Load()
  97. }
  98. // WaitForMark waits until the given index is marked as done.
  99. func (w *WaterMark) WaitForMark(ctx context.Context, index uint64) error {
  100. if w.DoneUntil() >= index {
  101. return nil
  102. }
  103. waitCh := make(chan struct{})
  104. w.markCh <- mark{index: index, waiter: waitCh}
  105. select {
  106. case <-ctx.Done():
  107. return ctx.Err()
  108. case <-waitCh:
  109. return nil
  110. }
  111. }
  112. // process is used to process the Mark channel. This is not thread-safe,
  113. // so only run one goroutine for process. One is sufficient, because
  114. // all goroutine ops use purely memory and cpu.
  115. // Each index has to emit atleast one begin watermark in serial order otherwise waiters
  116. // can get blocked idefinitely. Example: We had an watermark at 100 and a waiter at 101,
  117. // if no watermark is emitted at index 101 then waiter would get stuck indefinitely as it
  118. // can't decide whether the task at 101 has decided not to emit watermark or it didn't get
  119. // scheduled yet.
  120. func (w *WaterMark) process(closer *z.Closer) {
  121. defer closer.Done()
  122. var indices uint64Heap
  123. // pending maps raft proposal index to the number of pending mutations for this proposal.
  124. pending := make(map[uint64]int)
  125. waiters := make(map[uint64][]chan struct{})
  126. heap.Init(&indices)
  127. processOne := func(index uint64, done bool) {
  128. // If not already done, then set. Otherwise, don't undo a done entry.
  129. prev, present := pending[index]
  130. if !present {
  131. heap.Push(&indices, index)
  132. }
  133. delta := 1
  134. if done {
  135. delta = -1
  136. }
  137. pending[index] = prev + delta
  138. // Update mark by going through all indices in order; and checking if they have
  139. // been done. Stop at the first index, which isn't done.
  140. doneUntil := w.DoneUntil()
  141. if doneUntil > index {
  142. AssertTruef(false, "Name: %s doneUntil: %d. Index: %d", w.Name, doneUntil, index)
  143. }
  144. until := doneUntil
  145. loops := 0
  146. for len(indices) > 0 {
  147. min := indices[0]
  148. if done := pending[min]; done > 0 {
  149. break // len(indices) will be > 0.
  150. }
  151. // Even if done is called multiple times causing it to become
  152. // negative, we should still pop the index.
  153. heap.Pop(&indices)
  154. delete(pending, min)
  155. until = min
  156. loops++
  157. }
  158. if until != doneUntil {
  159. AssertTrue(w.doneUntil.CompareAndSwap(doneUntil, until))
  160. }
  161. notifyAndRemove := func(idx uint64, toNotify []chan struct{}) {
  162. for _, ch := range toNotify {
  163. close(ch)
  164. }
  165. delete(waiters, idx) // Release the memory back.
  166. }
  167. if until-doneUntil <= uint64(len(waiters)) {
  168. // Issue #908 showed that if doneUntil is close to 2^60, while until is zero, this loop
  169. // can hog up CPU just iterating over integers creating a busy-wait loop. So, only do
  170. // this path if until - doneUntil is less than the number of waiters.
  171. for idx := doneUntil + 1; idx <= until; idx++ {
  172. if toNotify, ok := waiters[idx]; ok {
  173. notifyAndRemove(idx, toNotify)
  174. }
  175. }
  176. } else {
  177. for idx, toNotify := range waiters {
  178. if idx <= until {
  179. notifyAndRemove(idx, toNotify)
  180. }
  181. }
  182. } // end of notifying waiters.
  183. }
  184. for {
  185. select {
  186. case <-closer.HasBeenClosed():
  187. return
  188. case mark := <-w.markCh:
  189. if mark.waiter != nil {
  190. doneUntil := w.doneUntil.Load()
  191. if doneUntil >= mark.index {
  192. close(mark.waiter)
  193. } else {
  194. ws, ok := waiters[mark.index]
  195. if !ok {
  196. waiters[mark.index] = []chan struct{}{mark.waiter}
  197. } else {
  198. waiters[mark.index] = append(ws, mark.waiter)
  199. }
  200. }
  201. } else {
  202. // it is possible that mark.index is zero. We need to handle that case as well.
  203. if mark.index > 0 || (mark.index == 0 && len(mark.indices) == 0) {
  204. processOne(mark.index, mark.done)
  205. }
  206. for _, index := range mark.indices {
  207. processOne(index, mark.done)
  208. }
  209. }
  210. }
  211. }
  212. }