policy.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package ristretto
  6. import (
  7. "math"
  8. "sync"
  9. "sync/atomic"
  10. "github.com/dgraph-io/ristretto/v2/z"
  11. )
  12. const (
  13. // lfuSample is the number of items to sample when looking at eviction
  14. // candidates. 5 seems to be the most optimal number [citation needed].
  15. lfuSample = 5
  16. )
// newPolicy creates the policy used by the cache. It forwards numCounters and
// maxCost unchanged to newDefaultPolicy and returns its result.
func newPolicy[V any](numCounters, maxCost int64) *defaultPolicy[V] {
	return newDefaultPolicy[V](numCounters, maxCost)
}
// defaultPolicy combines a tinyLFU admission helper with a sampledLFU eviction
// helper. The embedded mutex is held by the methods that read or modify the
// state of admit and evict; the maximum cost is accessed atomically instead.
type defaultPolicy[V any] struct {
	sync.Mutex
	// admit estimates how often keys have been accessed.
	admit *tinyLFU
	// evict tracks the cost of every key currently held by the policy.
	evict *sampledLFU
	// itemsCh carries batches of accessed keys from Push to processItems.
	itemsCh chan []uint64
	// stop is used by Close to tell processItems to exit.
	stop chan struct{}
	// done is used by processItems to acknowledge the stop request.
	done chan struct{}
	// isClosed is set at the end of Close and checked by Push and Close.
	isClosed bool
	// metrics is installed by CollectMetrics; it is nil until then.
	metrics *Metrics
}
  30. func newDefaultPolicy[V any](numCounters, maxCost int64) *defaultPolicy[V] {
  31. p := &defaultPolicy[V]{
  32. admit: newTinyLFU(numCounters),
  33. evict: newSampledLFU(maxCost),
  34. itemsCh: make(chan []uint64, 3),
  35. stop: make(chan struct{}),
  36. done: make(chan struct{}),
  37. }
  38. go p.processItems()
  39. return p
  40. }
  41. func (p *defaultPolicy[V]) CollectMetrics(metrics *Metrics) {
  42. p.metrics = metrics
  43. p.evict.metrics = metrics
  44. }
// policyPair is a key together with its cost, used as an eviction candidate
// in the sample built by sampledLFU.fillSample.
type policyPair struct {
	key  uint64
	cost int64
}
  49. func (p *defaultPolicy[V]) processItems() {
  50. for {
  51. select {
  52. case items := <-p.itemsCh:
  53. p.Lock()
  54. p.admit.Push(items)
  55. p.Unlock()
  56. case <-p.stop:
  57. p.done <- struct{}{}
  58. return
  59. }
  60. }
  61. }
  62. func (p *defaultPolicy[V]) Push(keys []uint64) bool {
  63. if p.isClosed {
  64. return false
  65. }
  66. if len(keys) == 0 {
  67. return true
  68. }
  69. select {
  70. case p.itemsCh <- keys:
  71. p.metrics.add(keepGets, keys[0], uint64(len(keys)))
  72. return true
  73. default:
  74. p.metrics.add(dropGets, keys[0], uint64(len(keys)))
  75. return false
  76. }
  77. }
  78. // Add decides whether the item with the given key and cost should be accepted by
  79. // the policy. It returns the list of victims that have been evicted and a boolean
  80. // indicating whether the incoming item should be accepted.
  81. func (p *defaultPolicy[V]) Add(key uint64, cost int64) ([]*Item[V], bool) {
  82. p.Lock()
  83. defer p.Unlock()
  84. // Cannot add an item bigger than entire cache.
  85. if cost > p.evict.getMaxCost() {
  86. return nil, false
  87. }
  88. // No need to go any further if the item is already in the cache.
  89. if has := p.evict.updateIfHas(key, cost); has {
  90. // An update does not count as an addition, so return false.
  91. return nil, false
  92. }
  93. // If the execution reaches this point, the key doesn't exist in the cache.
  94. // Calculate the remaining room in the cache (usually bytes).
  95. room := p.evict.roomLeft(cost)
  96. if room >= 0 {
  97. // There's enough room in the cache to store the new item without
  98. // overflowing. Do that now and stop here.
  99. p.evict.add(key, cost)
  100. p.metrics.add(costAdd, key, uint64(cost))
  101. return nil, true
  102. }
  103. // incHits is the hit count for the incoming item.
  104. incHits := p.admit.Estimate(key)
  105. // sample is the eviction candidate pool to be filled via random sampling.
  106. // TODO: perhaps we should use a min heap here. Right now our time
  107. // complexity is N for finding the min. Min heap should bring it down to
  108. // O(lg N).
  109. sample := make([]*policyPair, 0, lfuSample)
  110. // As items are evicted they will be appended to victims.
  111. victims := make([]*Item[V], 0)
  112. // Delete victims until there's enough space or a minKey is found that has
  113. // more hits than incoming item.
  114. for ; room < 0; room = p.evict.roomLeft(cost) {
  115. // Fill up empty slots in sample.
  116. sample = p.evict.fillSample(sample)
  117. // Find minimally used item in sample.
  118. minKey, minHits, minId, minCost := uint64(0), int64(math.MaxInt64), 0, int64(0)
  119. for i, pair := range sample {
  120. // Look up hit count for sample key.
  121. if hits := p.admit.Estimate(pair.key); hits < minHits {
  122. minKey, minHits, minId, minCost = pair.key, hits, i, pair.cost
  123. }
  124. }
  125. // If the incoming item isn't worth keeping in the policy, reject.
  126. if incHits < minHits {
  127. p.metrics.add(rejectSets, key, 1)
  128. return victims, false
  129. }
  130. // Delete the victim from metadata.
  131. p.evict.del(minKey)
  132. // Delete the victim from sample.
  133. sample[minId] = sample[len(sample)-1]
  134. sample = sample[:len(sample)-1]
  135. // Store victim in evicted victims slice.
  136. victims = append(victims, &Item[V]{
  137. Key: minKey,
  138. Conflict: 0,
  139. Cost: minCost,
  140. })
  141. }
  142. p.evict.add(key, cost)
  143. p.metrics.add(costAdd, key, uint64(cost))
  144. return victims, true
  145. }
  146. func (p *defaultPolicy[V]) Has(key uint64) bool {
  147. p.Lock()
  148. _, exists := p.evict.keyCosts[key]
  149. p.Unlock()
  150. return exists
  151. }
  152. func (p *defaultPolicy[V]) Del(key uint64) {
  153. p.Lock()
  154. p.evict.del(key)
  155. p.Unlock()
  156. }
  157. func (p *defaultPolicy[V]) Cap() int64 {
  158. p.Lock()
  159. capacity := p.evict.getMaxCost() - p.evict.used
  160. p.Unlock()
  161. return capacity
  162. }
  163. func (p *defaultPolicy[V]) Update(key uint64, cost int64) {
  164. p.Lock()
  165. p.evict.updateIfHas(key, cost)
  166. p.Unlock()
  167. }
  168. func (p *defaultPolicy[V]) Cost(key uint64) int64 {
  169. p.Lock()
  170. if cost, found := p.evict.keyCosts[key]; found {
  171. p.Unlock()
  172. return cost
  173. }
  174. p.Unlock()
  175. return -1
  176. }
  177. func (p *defaultPolicy[V]) Clear() {
  178. p.Lock()
  179. p.admit.clear()
  180. p.evict.clear()
  181. p.Unlock()
  182. }
// Close stops the processItems goroutine and then closes the stop, done and
// itemsCh channels. Calling it again after it has completed is a no-op.
//
// NOTE(review): isClosed is read and written here without holding the mutex,
// and Push reads it the same way, so Close does not appear safe to run
// concurrently with Push or with another Close. Confirm that callers
// serialize these calls.
func (p *defaultPolicy[V]) Close() {
	if p.isClosed {
		return
	}
	// Ask processItems to stop and wait for its acknowledgement on done.
	p.stop <- struct{}{}
	<-p.done
	close(p.stop)
	close(p.done)
	close(p.itemsCh)
	p.isClosed = true
}
  195. func (p *defaultPolicy[V]) MaxCost() int64 {
  196. if p == nil || p.evict == nil {
  197. return 0
  198. }
  199. return p.evict.getMaxCost()
  200. }
  201. func (p *defaultPolicy[V]) UpdateMaxCost(maxCost int64) {
  202. if p == nil || p.evict == nil {
  203. return
  204. }
  205. p.evict.updateMaxCost(maxCost)
  206. }
// sampledLFU is an eviction helper storing key-cost pairs.
type sampledLFU struct {
	// NOTE: maxCost must stay the first field so that it is 64-bit aligned
	// for atomic access. As per https://golang.org/pkg/sync/atomic/:
	// "On ARM, x86-32, and 32-bit MIPS, it is the caller's responsibility to
	// arrange for 64-bit alignment of 64-bit words accessed atomically. The
	// first word in a variable or in an allocated struct, array, or slice can
	// be relied upon to be 64-bit aligned."
	//
	// maxCost is read and written through getMaxCost and updateMaxCost.
	maxCost int64
	// used is the running total of the costs stored in keyCosts; it is
	// adjusted by add, del, updateIfHas and clear.
	used int64
	// metrics receives eviction and update statistics.
	metrics *Metrics
	// keyCosts maps every tracked key to its cost.
	keyCosts map[uint64]int64
}
  220. func newSampledLFU(maxCost int64) *sampledLFU {
  221. return &sampledLFU{
  222. keyCosts: make(map[uint64]int64),
  223. maxCost: maxCost,
  224. }
  225. }
// getMaxCost atomically loads the maximum cost.
func (p *sampledLFU) getMaxCost() int64 {
	return atomic.LoadInt64(&p.maxCost)
}
// updateMaxCost atomically stores maxCost as the new maximum cost.
func (p *sampledLFU) updateMaxCost(maxCost int64) {
	atomic.StoreInt64(&p.maxCost, maxCost)
}
  232. func (p *sampledLFU) roomLeft(cost int64) int64 {
  233. return p.getMaxCost() - (p.used + cost)
  234. }
  235. func (p *sampledLFU) fillSample(in []*policyPair) []*policyPair {
  236. if len(in) >= lfuSample {
  237. return in
  238. }
  239. for key, cost := range p.keyCosts {
  240. in = append(in, &policyPair{key, cost})
  241. if len(in) >= lfuSample {
  242. return in
  243. }
  244. }
  245. return in
  246. }
  247. func (p *sampledLFU) del(key uint64) {
  248. cost, ok := p.keyCosts[key]
  249. if !ok {
  250. return
  251. }
  252. p.used -= cost
  253. delete(p.keyCosts, key)
  254. p.metrics.add(costEvict, key, uint64(cost))
  255. p.metrics.add(keyEvict, key, 1)
  256. }
  257. func (p *sampledLFU) add(key uint64, cost int64) {
  258. p.keyCosts[key] = cost
  259. p.used += cost
  260. }
  261. func (p *sampledLFU) updateIfHas(key uint64, cost int64) bool {
  262. if prev, found := p.keyCosts[key]; found {
  263. // Update the cost of an existing key, but don't worry about evicting.
  264. // Evictions will be handled the next time a new item is added.
  265. p.metrics.add(keyUpdate, key, 1)
  266. if prev > cost {
  267. diff := prev - cost
  268. p.metrics.add(costAdd, key, ^(uint64(diff) - 1))
  269. } else if cost > prev {
  270. diff := cost - prev
  271. p.metrics.add(costAdd, key, uint64(diff))
  272. }
  273. p.used += cost - prev
  274. p.keyCosts[key] = cost
  275. return true
  276. }
  277. return false
  278. }
  279. func (p *sampledLFU) clear() {
  280. p.used = 0
  281. p.keyCosts = make(map[uint64]int64)
  282. }
// tinyLFU is an admission helper that keeps track of access frequency using
// tiny (4-bit) counters in the form of a count-min sketch.
// tinyLFU is NOT thread safe.
type tinyLFU struct {
	// freq is the count-min sketch holding the frequency counters.
	freq *cmSketch
	// door is the doorkeeper bloom filter consulted before freq is touched.
	door *z.Bloom
	// incrs counts the calls to Increment since the last reset.
	incrs int64
	// resetAt is the value of incrs at which reset is triggered.
	resetAt int64
}
  292. func newTinyLFU(numCounters int64) *tinyLFU {
  293. return &tinyLFU{
  294. freq: newCmSketch(numCounters),
  295. door: z.NewBloomFilter(float64(numCounters), 0.01),
  296. resetAt: numCounters,
  297. }
  298. }
  299. func (p *tinyLFU) Push(keys []uint64) {
  300. for _, key := range keys {
  301. p.Increment(key)
  302. }
  303. }
  304. func (p *tinyLFU) Estimate(key uint64) int64 {
  305. hits := p.freq.Estimate(key)
  306. if p.door.Has(key) {
  307. hits++
  308. }
  309. return hits
  310. }
  311. func (p *tinyLFU) Increment(key uint64) {
  312. // Flip doorkeeper bit if not already done.
  313. if added := p.door.AddIfNotHas(key); !added {
  314. // Increment count-min counter if doorkeeper bit is already set.
  315. p.freq.Increment(key)
  316. }
  317. p.incrs++
  318. if p.incrs >= p.resetAt {
  319. p.reset()
  320. }
  321. }
  322. func (p *tinyLFU) reset() {
  323. // Zero out incrs.
  324. p.incrs = 0
  325. // clears doorkeeper bits
  326. p.door.Clear()
  327. // halves count-min counters
  328. p.freq.Reset()
  329. }
  330. func (p *tinyLFU) clear() {
  331. p.incrs = 0
  332. p.door.Clear()
  333. p.freq.Clear()
  334. }