policy.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. /*
  2. * Copyright 2020 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package ristretto
  17. import (
  18. "math"
  19. "sync"
  20. "sync/atomic"
  21. "github.com/dgraph-io/ristretto/v2/z"
  22. )
  23. const (
  24. // lfuSample is the number of items to sample when looking at eviction
  25. // candidates. 5 seems to be the most optimal number [citation needed].
  26. lfuSample = 5
  27. )
  28. func newPolicy[V any](numCounters, maxCost int64) *defaultPolicy[V] {
  29. return newDefaultPolicy[V](numCounters, maxCost)
  30. }
// defaultPolicy pairs a tinyLFU admission filter with a sampledLFU eviction
// helper. Methods that touch admit or evict take the embedded mutex first;
// access batches arrive asynchronously on itemsCh and are applied by
// processItems.
type defaultPolicy[V any] struct {
	sync.Mutex
	// admit estimates access frequency and decides whether an incoming item
	// is worth more than the eviction candidates.
	admit *tinyLFU
	// evict tracks the cost of every admitted key and supplies eviction samples.
	evict *sampledLFU
	// itemsCh carries batches of accessed keys from Push to processItems.
	itemsCh chan []uint64
	// stop asks processItems to exit; processItems acknowledges on done.
	stop chan struct{}
	done chan struct{}
	// isClosed is set by Close and checked by Push.
	// NOTE(review): it is read and written without synchronization.
	isClosed bool
	// metrics stays nil until CollectMetrics is called (presumably
	// Metrics.add tolerates a nil receiver — confirm).
	metrics *Metrics
}
  41. func newDefaultPolicy[V any](numCounters, maxCost int64) *defaultPolicy[V] {
  42. p := &defaultPolicy[V]{
  43. admit: newTinyLFU(numCounters),
  44. evict: newSampledLFU(maxCost),
  45. itemsCh: make(chan []uint64, 3),
  46. stop: make(chan struct{}),
  47. done: make(chan struct{}),
  48. }
  49. go p.processItems()
  50. return p
  51. }
  52. func (p *defaultPolicy[V]) CollectMetrics(metrics *Metrics) {
  53. p.metrics = metrics
  54. p.evict.metrics = metrics
  55. }
// policyPair is one eviction candidate: a tracked key and the cost recorded
// for it. It is built positionally (policyPair{key, cost}), so the field
// order is significant.
type policyPair struct {
	key  uint64
	cost int64
}
  60. func (p *defaultPolicy[V]) processItems() {
  61. for {
  62. select {
  63. case items := <-p.itemsCh:
  64. p.Lock()
  65. p.admit.Push(items)
  66. p.Unlock()
  67. case <-p.stop:
  68. p.done <- struct{}{}
  69. return
  70. }
  71. }
  72. }
  73. func (p *defaultPolicy[V]) Push(keys []uint64) bool {
  74. if p.isClosed {
  75. return false
  76. }
  77. if len(keys) == 0 {
  78. return true
  79. }
  80. select {
  81. case p.itemsCh <- keys:
  82. p.metrics.add(keepGets, keys[0], uint64(len(keys)))
  83. return true
  84. default:
  85. p.metrics.add(dropGets, keys[0], uint64(len(keys)))
  86. return false
  87. }
  88. }
  89. // Add decides whether the item with the given key and cost should be accepted by
  90. // the policy. It returns the list of victims that have been evicted and a boolean
  91. // indicating whether the incoming item should be accepted.
  92. func (p *defaultPolicy[V]) Add(key uint64, cost int64) ([]*Item[V], bool) {
  93. p.Lock()
  94. defer p.Unlock()
  95. // Cannot add an item bigger than entire cache.
  96. if cost > p.evict.getMaxCost() {
  97. return nil, false
  98. }
  99. // No need to go any further if the item is already in the cache.
  100. if has := p.evict.updateIfHas(key, cost); has {
  101. // An update does not count as an addition, so return false.
  102. return nil, false
  103. }
  104. // If the execution reaches this point, the key doesn't exist in the cache.
  105. // Calculate the remaining room in the cache (usually bytes).
  106. room := p.evict.roomLeft(cost)
  107. if room >= 0 {
  108. // There's enough room in the cache to store the new item without
  109. // overflowing. Do that now and stop here.
  110. p.evict.add(key, cost)
  111. p.metrics.add(costAdd, key, uint64(cost))
  112. return nil, true
  113. }
  114. // incHits is the hit count for the incoming item.
  115. incHits := p.admit.Estimate(key)
  116. // sample is the eviction candidate pool to be filled via random sampling.
  117. // TODO: perhaps we should use a min heap here. Right now our time
  118. // complexity is N for finding the min. Min heap should bring it down to
  119. // O(lg N).
  120. sample := make([]*policyPair, 0, lfuSample)
  121. // As items are evicted they will be appended to victims.
  122. victims := make([]*Item[V], 0)
  123. // Delete victims until there's enough space or a minKey is found that has
  124. // more hits than incoming item.
  125. for ; room < 0; room = p.evict.roomLeft(cost) {
  126. // Fill up empty slots in sample.
  127. sample = p.evict.fillSample(sample)
  128. // Find minimally used item in sample.
  129. minKey, minHits, minId, minCost := uint64(0), int64(math.MaxInt64), 0, int64(0)
  130. for i, pair := range sample {
  131. // Look up hit count for sample key.
  132. if hits := p.admit.Estimate(pair.key); hits < minHits {
  133. minKey, minHits, minId, minCost = pair.key, hits, i, pair.cost
  134. }
  135. }
  136. // If the incoming item isn't worth keeping in the policy, reject.
  137. if incHits < minHits {
  138. p.metrics.add(rejectSets, key, 1)
  139. return victims, false
  140. }
  141. // Delete the victim from metadata.
  142. p.evict.del(minKey)
  143. // Delete the victim from sample.
  144. sample[minId] = sample[len(sample)-1]
  145. sample = sample[:len(sample)-1]
  146. // Store victim in evicted victims slice.
  147. victims = append(victims, &Item[V]{
  148. Key: minKey,
  149. Conflict: 0,
  150. Cost: minCost,
  151. })
  152. }
  153. p.evict.add(key, cost)
  154. p.metrics.add(costAdd, key, uint64(cost))
  155. return victims, true
  156. }
  157. func (p *defaultPolicy[V]) Has(key uint64) bool {
  158. p.Lock()
  159. _, exists := p.evict.keyCosts[key]
  160. p.Unlock()
  161. return exists
  162. }
  163. func (p *defaultPolicy[V]) Del(key uint64) {
  164. p.Lock()
  165. p.evict.del(key)
  166. p.Unlock()
  167. }
  168. func (p *defaultPolicy[V]) Cap() int64 {
  169. p.Lock()
  170. capacity := p.evict.getMaxCost() - p.evict.used
  171. p.Unlock()
  172. return capacity
  173. }
  174. func (p *defaultPolicy[V]) Update(key uint64, cost int64) {
  175. p.Lock()
  176. p.evict.updateIfHas(key, cost)
  177. p.Unlock()
  178. }
  179. func (p *defaultPolicy[V]) Cost(key uint64) int64 {
  180. p.Lock()
  181. if cost, found := p.evict.keyCosts[key]; found {
  182. p.Unlock()
  183. return cost
  184. }
  185. p.Unlock()
  186. return -1
  187. }
  188. func (p *defaultPolicy[V]) Clear() {
  189. p.Lock()
  190. p.admit.clear()
  191. p.evict.clear()
  192. p.Unlock()
  193. }
  194. func (p *defaultPolicy[V]) Close() {
  195. if p.isClosed {
  196. return
  197. }
  198. // Block until the p.processItems goroutine returns.
  199. p.stop <- struct{}{}
  200. <-p.done
  201. close(p.stop)
  202. close(p.done)
  203. close(p.itemsCh)
  204. p.isClosed = true
  205. }
  206. func (p *defaultPolicy[V]) MaxCost() int64 {
  207. if p == nil || p.evict == nil {
  208. return 0
  209. }
  210. return p.evict.getMaxCost()
  211. }
  212. func (p *defaultPolicy[V]) UpdateMaxCost(maxCost int64) {
  213. if p == nil || p.evict == nil {
  214. return
  215. }
  216. p.evict.updateMaxCost(maxCost)
  217. }
// sampledLFU is an eviction helper that records the cost of every admitted
// key and keeps a running total. Apart from maxCost (accessed atomically), it
// has no synchronization of its own; defaultPolicy calls it under its mutex.
type sampledLFU struct {
	// NOTE: maxCost must stay the first field so it is 64-bit aligned for
	// atomic access. As per https://golang.org/pkg/sync/atomic/: "On ARM, x86-32,
	// and 32-bit MIPS, it is the caller’s responsibility to arrange
	// for 64-bit alignment of 64-bit words accessed atomically.
	// The first word in a variable or in an allocated struct, array,
	// or slice can be relied upon to be 64-bit aligned."
	maxCost int64
	// used is the sum of the costs of all tracked keys.
	used int64
	// metrics receives eviction and update counts; it may be nil.
	metrics *Metrics
	// keyCosts maps each tracked key to its cost.
	keyCosts map[uint64]int64
}
  231. func newSampledLFU(maxCost int64) *sampledLFU {
  232. return &sampledLFU{
  233. keyCosts: make(map[uint64]int64),
  234. maxCost: maxCost,
  235. }
  236. }
  237. func (p *sampledLFU) getMaxCost() int64 {
  238. return atomic.LoadInt64(&p.maxCost)
  239. }
  240. func (p *sampledLFU) updateMaxCost(maxCost int64) {
  241. atomic.StoreInt64(&p.maxCost, maxCost)
  242. }
  243. func (p *sampledLFU) roomLeft(cost int64) int64 {
  244. return p.getMaxCost() - (p.used + cost)
  245. }
  246. func (p *sampledLFU) fillSample(in []*policyPair) []*policyPair {
  247. if len(in) >= lfuSample {
  248. return in
  249. }
  250. for key, cost := range p.keyCosts {
  251. in = append(in, &policyPair{key, cost})
  252. if len(in) >= lfuSample {
  253. return in
  254. }
  255. }
  256. return in
  257. }
  258. func (p *sampledLFU) del(key uint64) {
  259. cost, ok := p.keyCosts[key]
  260. if !ok {
  261. return
  262. }
  263. p.used -= cost
  264. delete(p.keyCosts, key)
  265. p.metrics.add(costEvict, key, uint64(cost))
  266. p.metrics.add(keyEvict, key, 1)
  267. }
  268. func (p *sampledLFU) add(key uint64, cost int64) {
  269. p.keyCosts[key] = cost
  270. p.used += cost
  271. }
  272. func (p *sampledLFU) updateIfHas(key uint64, cost int64) bool {
  273. if prev, found := p.keyCosts[key]; found {
  274. // Update the cost of an existing key, but don't worry about evicting.
  275. // Evictions will be handled the next time a new item is added.
  276. p.metrics.add(keyUpdate, key, 1)
  277. if prev > cost {
  278. diff := prev - cost
  279. p.metrics.add(costAdd, key, ^(uint64(diff) - 1))
  280. } else if cost > prev {
  281. diff := cost - prev
  282. p.metrics.add(costAdd, key, uint64(diff))
  283. }
  284. p.used += cost - prev
  285. p.keyCosts[key] = cost
  286. return true
  287. }
  288. return false
  289. }
  290. func (p *sampledLFU) clear() {
  291. p.used = 0
  292. p.keyCosts = make(map[uint64]int64)
  293. }
// tinyLFU is an admission helper that estimates access frequency with a
// count-min sketch (presumably small 4-bit counters — see cmSketch), fronted
// by a bloom-filter doorkeeper.
// tinyLFU is NOT thread safe; defaultPolicy only calls it under its mutex.
type tinyLFU struct {
	// freq counts repeat accesses (those after a key's doorkeeper bit is set).
	freq *cmSketch
	// door records keys seen at least once since the last reset.
	door *z.Bloom
	// incrs counts Increment calls since the last reset.
	incrs int64
	// resetAt is the incrs value at which reset is triggered.
	resetAt int64
}
  303. func newTinyLFU(numCounters int64) *tinyLFU {
  304. return &tinyLFU{
  305. freq: newCmSketch(numCounters),
  306. door: z.NewBloomFilter(float64(numCounters), 0.01),
  307. resetAt: numCounters,
  308. }
  309. }
  310. func (p *tinyLFU) Push(keys []uint64) {
  311. for _, key := range keys {
  312. p.Increment(key)
  313. }
  314. }
  315. func (p *tinyLFU) Estimate(key uint64) int64 {
  316. hits := p.freq.Estimate(key)
  317. if p.door.Has(key) {
  318. hits++
  319. }
  320. return hits
  321. }
  322. func (p *tinyLFU) Increment(key uint64) {
  323. // Flip doorkeeper bit if not already done.
  324. if added := p.door.AddIfNotHas(key); !added {
  325. // Increment count-min counter if doorkeeper bit is already set.
  326. p.freq.Increment(key)
  327. }
  328. p.incrs++
  329. if p.incrs >= p.resetAt {
  330. p.reset()
  331. }
  332. }
  333. func (p *tinyLFU) reset() {
  334. // Zero out incrs.
  335. p.incrs = 0
  336. // clears doorkeeper bits
  337. p.door.Clear()
  338. // halves count-min counters
  339. p.freq.Reset()
  340. }
  341. func (p *tinyLFU) clear() {
  342. p.incrs = 0
  343. p.door.Clear()
  344. p.freq.Clear()
  345. }