| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385 |
- /*
- * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
- * SPDX-License-Identifier: Apache-2.0
- */
- package ristretto
- import (
- "math"
- "sync"
- "sync/atomic"
- "github.com/dgraph-io/ristretto/v2/z"
- )
- const (
- // lfuSample is the number of items to sample when looking at eviction
- // candidates. 5 seems to be the most optimal number [citation needed].
- lfuSample = 5
- )
- func newPolicy[V any](numCounters, maxCost int64) *defaultPolicy[V] {
- return newDefaultPolicy[V](numCounters, maxCost)
- }
- type defaultPolicy[V any] struct {
- sync.Mutex
- admit *tinyLFU
- evict *sampledLFU
- itemsCh chan []uint64
- stop chan struct{}
- done chan struct{}
- isClosed bool
- metrics *Metrics
- }
- func newDefaultPolicy[V any](numCounters, maxCost int64) *defaultPolicy[V] {
- p := &defaultPolicy[V]{
- admit: newTinyLFU(numCounters),
- evict: newSampledLFU(maxCost),
- itemsCh: make(chan []uint64, 3),
- stop: make(chan struct{}),
- done: make(chan struct{}),
- }
- go p.processItems()
- return p
- }
- func (p *defaultPolicy[V]) CollectMetrics(metrics *Metrics) {
- p.metrics = metrics
- p.evict.metrics = metrics
- }
- type policyPair struct {
- key uint64
- cost int64
- }
- func (p *defaultPolicy[V]) processItems() {
- for {
- select {
- case items := <-p.itemsCh:
- p.Lock()
- p.admit.Push(items)
- p.Unlock()
- case <-p.stop:
- p.done <- struct{}{}
- return
- }
- }
- }
- func (p *defaultPolicy[V]) Push(keys []uint64) bool {
- if p.isClosed {
- return false
- }
- if len(keys) == 0 {
- return true
- }
- select {
- case p.itemsCh <- keys:
- p.metrics.add(keepGets, keys[0], uint64(len(keys)))
- return true
- default:
- p.metrics.add(dropGets, keys[0], uint64(len(keys)))
- return false
- }
- }
- // Add decides whether the item with the given key and cost should be accepted by
- // the policy. It returns the list of victims that have been evicted and a boolean
- // indicating whether the incoming item should be accepted.
- func (p *defaultPolicy[V]) Add(key uint64, cost int64) ([]*Item[V], bool) {
- p.Lock()
- defer p.Unlock()
- // Cannot add an item bigger than entire cache.
- if cost > p.evict.getMaxCost() {
- return nil, false
- }
- // No need to go any further if the item is already in the cache.
- if has := p.evict.updateIfHas(key, cost); has {
- // An update does not count as an addition, so return false.
- return nil, false
- }
- // If the execution reaches this point, the key doesn't exist in the cache.
- // Calculate the remaining room in the cache (usually bytes).
- room := p.evict.roomLeft(cost)
- if room >= 0 {
- // There's enough room in the cache to store the new item without
- // overflowing. Do that now and stop here.
- p.evict.add(key, cost)
- p.metrics.add(costAdd, key, uint64(cost))
- return nil, true
- }
- // incHits is the hit count for the incoming item.
- incHits := p.admit.Estimate(key)
- // sample is the eviction candidate pool to be filled via random sampling.
- // TODO: perhaps we should use a min heap here. Right now our time
- // complexity is N for finding the min. Min heap should bring it down to
- // O(lg N).
- sample := make([]*policyPair, 0, lfuSample)
- // As items are evicted they will be appended to victims.
- victims := make([]*Item[V], 0)
- // Delete victims until there's enough space or a minKey is found that has
- // more hits than incoming item.
- for ; room < 0; room = p.evict.roomLeft(cost) {
- // Fill up empty slots in sample.
- sample = p.evict.fillSample(sample)
- // Find minimally used item in sample.
- minKey, minHits, minId, minCost := uint64(0), int64(math.MaxInt64), 0, int64(0)
- for i, pair := range sample {
- // Look up hit count for sample key.
- if hits := p.admit.Estimate(pair.key); hits < minHits {
- minKey, minHits, minId, minCost = pair.key, hits, i, pair.cost
- }
- }
- // If the incoming item isn't worth keeping in the policy, reject.
- if incHits < minHits {
- p.metrics.add(rejectSets, key, 1)
- return victims, false
- }
- // Delete the victim from metadata.
- p.evict.del(minKey)
- // Delete the victim from sample.
- sample[minId] = sample[len(sample)-1]
- sample = sample[:len(sample)-1]
- // Store victim in evicted victims slice.
- victims = append(victims, &Item[V]{
- Key: minKey,
- Conflict: 0,
- Cost: minCost,
- })
- }
- p.evict.add(key, cost)
- p.metrics.add(costAdd, key, uint64(cost))
- return victims, true
- }
- func (p *defaultPolicy[V]) Has(key uint64) bool {
- p.Lock()
- _, exists := p.evict.keyCosts[key]
- p.Unlock()
- return exists
- }
- func (p *defaultPolicy[V]) Del(key uint64) {
- p.Lock()
- p.evict.del(key)
- p.Unlock()
- }
- func (p *defaultPolicy[V]) Cap() int64 {
- p.Lock()
- capacity := p.evict.getMaxCost() - p.evict.used
- p.Unlock()
- return capacity
- }
- func (p *defaultPolicy[V]) Update(key uint64, cost int64) {
- p.Lock()
- p.evict.updateIfHas(key, cost)
- p.Unlock()
- }
- func (p *defaultPolicy[V]) Cost(key uint64) int64 {
- p.Lock()
- if cost, found := p.evict.keyCosts[key]; found {
- p.Unlock()
- return cost
- }
- p.Unlock()
- return -1
- }
- func (p *defaultPolicy[V]) Clear() {
- p.Lock()
- p.admit.clear()
- p.evict.clear()
- p.Unlock()
- }
- func (p *defaultPolicy[V]) Close() {
- if p.isClosed {
- return
- }
- // Block until the p.processItems goroutine returns.
- p.stop <- struct{}{}
- <-p.done
- close(p.stop)
- close(p.done)
- close(p.itemsCh)
- p.isClosed = true
- }
- func (p *defaultPolicy[V]) MaxCost() int64 {
- if p == nil || p.evict == nil {
- return 0
- }
- return p.evict.getMaxCost()
- }
- func (p *defaultPolicy[V]) UpdateMaxCost(maxCost int64) {
- if p == nil || p.evict == nil {
- return
- }
- p.evict.updateMaxCost(maxCost)
- }
- // sampledLFU is an eviction helper storing key-cost pairs.
- type sampledLFU struct {
- // NOTE: align maxCost to 64-bit boundary for use with atomic.
- // As per https://golang.org/pkg/sync/atomic/: "On ARM, x86-32,
- // and 32-bit MIPS, it is the caller’s responsibility to arrange
- // for 64-bit alignment of 64-bit words accessed atomically.
- // The first word in a variable or in an allocated struct, array,
- // or slice can be relied upon to be 64-bit aligned."
- maxCost int64
- used int64
- metrics *Metrics
- keyCosts map[uint64]int64
- }
- func newSampledLFU(maxCost int64) *sampledLFU {
- return &sampledLFU{
- keyCosts: make(map[uint64]int64),
- maxCost: maxCost,
- }
- }
- func (p *sampledLFU) getMaxCost() int64 {
- return atomic.LoadInt64(&p.maxCost)
- }
- func (p *sampledLFU) updateMaxCost(maxCost int64) {
- atomic.StoreInt64(&p.maxCost, maxCost)
- }
- func (p *sampledLFU) roomLeft(cost int64) int64 {
- return p.getMaxCost() - (p.used + cost)
- }
- func (p *sampledLFU) fillSample(in []*policyPair) []*policyPair {
- if len(in) >= lfuSample {
- return in
- }
- for key, cost := range p.keyCosts {
- in = append(in, &policyPair{key, cost})
- if len(in) >= lfuSample {
- return in
- }
- }
- return in
- }
- func (p *sampledLFU) del(key uint64) {
- cost, ok := p.keyCosts[key]
- if !ok {
- return
- }
- p.used -= cost
- delete(p.keyCosts, key)
- p.metrics.add(costEvict, key, uint64(cost))
- p.metrics.add(keyEvict, key, 1)
- }
- func (p *sampledLFU) add(key uint64, cost int64) {
- p.keyCosts[key] = cost
- p.used += cost
- }
- func (p *sampledLFU) updateIfHas(key uint64, cost int64) bool {
- if prev, found := p.keyCosts[key]; found {
- // Update the cost of an existing key, but don't worry about evicting.
- // Evictions will be handled the next time a new item is added.
- p.metrics.add(keyUpdate, key, 1)
- if prev > cost {
- diff := prev - cost
- p.metrics.add(costAdd, key, ^(uint64(diff) - 1))
- } else if cost > prev {
- diff := cost - prev
- p.metrics.add(costAdd, key, uint64(diff))
- }
- p.used += cost - prev
- p.keyCosts[key] = cost
- return true
- }
- return false
- }
- func (p *sampledLFU) clear() {
- p.used = 0
- p.keyCosts = make(map[uint64]int64)
- }
- // tinyLFU is an admission helper that keeps track of access frequency using
- // tiny (4-bit) counters in the form of a count-min sketch.
- // tinyLFU is NOT thread safe.
- type tinyLFU struct {
- freq *cmSketch
- door *z.Bloom
- incrs int64
- resetAt int64
- }
- func newTinyLFU(numCounters int64) *tinyLFU {
- return &tinyLFU{
- freq: newCmSketch(numCounters),
- door: z.NewBloomFilter(float64(numCounters), 0.01),
- resetAt: numCounters,
- }
- }
- func (p *tinyLFU) Push(keys []uint64) {
- for _, key := range keys {
- p.Increment(key)
- }
- }
- func (p *tinyLFU) Estimate(key uint64) int64 {
- hits := p.freq.Estimate(key)
- if p.door.Has(key) {
- hits++
- }
- return hits
- }
- func (p *tinyLFU) Increment(key uint64) {
- // Flip doorkeeper bit if not already done.
- if added := p.door.AddIfNotHas(key); !added {
- // Increment count-min counter if doorkeeper bit is already set.
- p.freq.Increment(key)
- }
- p.incrs++
- if p.incrs >= p.resetAt {
- p.reset()
- }
- }
- func (p *tinyLFU) reset() {
- // Zero out incrs.
- p.incrs = 0
- // clears doorkeeper bits
- p.door.Clear()
- // halves count-min counters
- p.freq.Reset()
- }
- func (p *tinyLFU) clear() {
- p.incrs = 0
- p.door.Clear()
- p.freq.Clear()
- }
|