| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- /*
- * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
- * SPDX-License-Identifier: Apache-2.0
- */
- package y
- import (
- "container/heap"
- "context"
- "sync/atomic"
- "github.com/dgraph-io/ristretto/v2/z"
- )
- type uint64Heap []uint64
- func (u uint64Heap) Len() int { return len(u) }
- func (u uint64Heap) Less(i, j int) bool { return u[i] < u[j] }
- func (u uint64Heap) Swap(i, j int) { u[i], u[j] = u[j], u[i] }
- func (u *uint64Heap) Push(x interface{}) { *u = append(*u, x.(uint64)) }
- func (u *uint64Heap) Pop() interface{} {
- old := *u
- n := len(old)
- x := old[n-1]
- *u = old[0 : n-1]
- return x
- }
- // mark contains one of more indices, along with a done boolean to indicate the
- // status of the index: begin or done. It also contains waiters, who could be
- // waiting for the watermark to reach >= a certain index.
- type mark struct {
- // Either this is an (index, waiter) pair or (index, done) or (indices, done).
- index uint64
- waiter chan struct{}
- indices []uint64
- done bool // Set to true if the index is done.
- }
- // WaterMark is used to keep track of the minimum un-finished index. Typically, an index k becomes
- // finished or "done" according to a WaterMark once Done(k) has been called
- // 1. as many times as Begin(k) has, AND
- // 2. a positive number of times.
- //
- // An index may also become "done" by calling SetDoneUntil at a time such that it is not
- // inter-mingled with Begin/Done calls.
- //
- // Since doneUntil and lastIndex addresses are passed to sync/atomic packages, we ensure that they
- // are 64-bit aligned by putting them at the beginning of the structure.
- type WaterMark struct {
- doneUntil atomic.Uint64
- lastIndex atomic.Uint64
- Name string
- markCh chan mark
- }
- // Init initializes a WaterMark struct. MUST be called before using it.
- func (w *WaterMark) Init(closer *z.Closer) {
- w.markCh = make(chan mark, 100)
- go w.process(closer)
- }
- // Begin sets the last index to the given value.
- func (w *WaterMark) Begin(index uint64) {
- w.lastIndex.Store(index)
- w.markCh <- mark{index: index, done: false}
- }
- // BeginMany works like Begin but accepts multiple indices.
- func (w *WaterMark) BeginMany(indices []uint64) {
- w.lastIndex.Store(indices[len(indices)-1])
- w.markCh <- mark{index: 0, indices: indices, done: false}
- }
- // Done sets a single index as done.
- func (w *WaterMark) Done(index uint64) {
- w.markCh <- mark{index: index, done: true}
- }
- // DoneMany works like Done but accepts multiple indices.
- func (w *WaterMark) DoneMany(indices []uint64) {
- w.markCh <- mark{index: 0, indices: indices, done: true}
- }
- // DoneUntil returns the maximum index that has the property that all indices
- // less than or equal to it are done.
- func (w *WaterMark) DoneUntil() uint64 {
- return w.doneUntil.Load()
- }
- // SetDoneUntil sets the maximum index that has the property that all indices
- // less than or equal to it are done.
- func (w *WaterMark) SetDoneUntil(val uint64) {
- w.doneUntil.Store(val)
- }
- // LastIndex returns the last index for which Begin has been called.
- func (w *WaterMark) LastIndex() uint64 {
- return w.lastIndex.Load()
- }
- // WaitForMark waits until the given index is marked as done.
- func (w *WaterMark) WaitForMark(ctx context.Context, index uint64) error {
- if w.DoneUntil() >= index {
- return nil
- }
- waitCh := make(chan struct{})
- w.markCh <- mark{index: index, waiter: waitCh}
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-waitCh:
- return nil
- }
- }
- // process is used to process the Mark channel. This is not thread-safe,
- // so only run one goroutine for process. One is sufficient, because
- // all goroutine ops use purely memory and cpu.
- // Each index has to emit atleast one begin watermark in serial order otherwise waiters
- // can get blocked idefinitely. Example: We had an watermark at 100 and a waiter at 101,
- // if no watermark is emitted at index 101 then waiter would get stuck indefinitely as it
- // can't decide whether the task at 101 has decided not to emit watermark or it didn't get
- // scheduled yet.
- func (w *WaterMark) process(closer *z.Closer) {
- defer closer.Done()
- var indices uint64Heap
- // pending maps raft proposal index to the number of pending mutations for this proposal.
- pending := make(map[uint64]int)
- waiters := make(map[uint64][]chan struct{})
- heap.Init(&indices)
- processOne := func(index uint64, done bool) {
- // If not already done, then set. Otherwise, don't undo a done entry.
- prev, present := pending[index]
- if !present {
- heap.Push(&indices, index)
- }
- delta := 1
- if done {
- delta = -1
- }
- pending[index] = prev + delta
- // Update mark by going through all indices in order; and checking if they have
- // been done. Stop at the first index, which isn't done.
- doneUntil := w.DoneUntil()
- if doneUntil > index {
- AssertTruef(false, "Name: %s doneUntil: %d. Index: %d", w.Name, doneUntil, index)
- }
- until := doneUntil
- loops := 0
- for len(indices) > 0 {
- min := indices[0]
- if done := pending[min]; done > 0 {
- break // len(indices) will be > 0.
- }
- // Even if done is called multiple times causing it to become
- // negative, we should still pop the index.
- heap.Pop(&indices)
- delete(pending, min)
- until = min
- loops++
- }
- if until != doneUntil {
- AssertTrue(w.doneUntil.CompareAndSwap(doneUntil, until))
- }
- notifyAndRemove := func(idx uint64, toNotify []chan struct{}) {
- for _, ch := range toNotify {
- close(ch)
- }
- delete(waiters, idx) // Release the memory back.
- }
- if until-doneUntil <= uint64(len(waiters)) {
- // Issue #908 showed that if doneUntil is close to 2^60, while until is zero, this loop
- // can hog up CPU just iterating over integers creating a busy-wait loop. So, only do
- // this path if until - doneUntil is less than the number of waiters.
- for idx := doneUntil + 1; idx <= until; idx++ {
- if toNotify, ok := waiters[idx]; ok {
- notifyAndRemove(idx, toNotify)
- }
- }
- } else {
- for idx, toNotify := range waiters {
- if idx <= until {
- notifyAndRemove(idx, toNotify)
- }
- }
- } // end of notifying waiters.
- }
- for {
- select {
- case <-closer.HasBeenClosed():
- return
- case mark := <-w.markCh:
- if mark.waiter != nil {
- doneUntil := w.doneUntil.Load()
- if doneUntil >= mark.index {
- close(mark.waiter)
- } else {
- ws, ok := waiters[mark.index]
- if !ok {
- waiters[mark.index] = []chan struct{}{mark.waiter}
- } else {
- waiters[mark.index] = append(ws, mark.waiter)
- }
- }
- } else {
- // it is possible that mark.index is zero. We need to handle that case as well.
- if mark.index > 0 || (mark.index == 0 && len(mark.indices) == 0) {
- processOne(mark.index, mark.done)
- }
- for _, index := range mark.indices {
- processOne(index, mark.done)
- }
- }
- }
- }
- }
|