ring.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package ristretto
  6. import (
  7. "sync"
  8. )
  9. // ringConsumer is the user-defined object responsible for receiving and
  10. // processing items in batches when buffers are drained.
  11. type ringConsumer interface {
  12. Push([]uint64) bool
  13. }
  14. // ringStripe is a singular ring buffer that is not concurrent safe.
  15. type ringStripe struct {
  16. cons ringConsumer
  17. data []uint64
  18. capa int
  19. }
  20. func newRingStripe(cons ringConsumer, capa int64) *ringStripe {
  21. return &ringStripe{
  22. cons: cons,
  23. data: make([]uint64, 0, capa),
  24. capa: int(capa),
  25. }
  26. }
  27. // Push appends an item in the ring buffer and drains (copies items and
  28. // sends to Consumer) if full.
  29. func (s *ringStripe) Push(item uint64) {
  30. s.data = append(s.data, item)
  31. // Decide if the ring buffer should be drained.
  32. if len(s.data) >= s.capa {
  33. // Send elements to consumer and create a new ring stripe.
  34. if s.cons.Push(s.data) {
  35. s.data = make([]uint64, 0, s.capa)
  36. } else {
  37. s.data = s.data[:0]
  38. }
  39. }
  40. }
  41. // ringBuffer stores multiple buffers (stripes) and distributes Pushed items
  42. // between them to lower contention.
  43. //
  44. // This implements the "batching" process described in the BP-Wrapper paper
  45. // (section III part A).
  46. type ringBuffer struct {
  47. pool *sync.Pool
  48. }
  49. // newRingBuffer returns a striped ring buffer. The Consumer in ringConfig will
  50. // be called when individual stripes are full and need to drain their elements.
  51. func newRingBuffer(cons ringConsumer, capa int64) *ringBuffer {
  52. // LOSSY buffers use a very simple sync.Pool for concurrently reusing
  53. // stripes. We do lose some stripes due to GC (unheld items in sync.Pool
  54. // are cleared), but the performance gains generally outweigh the small
  55. // percentage of elements lost. The performance primarily comes from
  56. // low-level runtime functions used in the standard library that aren't
  57. // available to us (such as runtime_procPin()).
  58. return &ringBuffer{
  59. pool: &sync.Pool{
  60. New: func() interface{} { return newRingStripe(cons, capa) },
  61. },
  62. }
  63. }
  64. // Push adds an element to one of the internal stripes and possibly drains if
  65. // the stripe becomes full.
  66. func (b *ringBuffer) Push(item uint64) {
  67. // Reuse or create a new stripe.
  68. stripe := b.pool.Get().(*ringStripe)
  69. stripe.Push(item)
  70. b.pool.Put(stripe)
  71. }