ttl.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. /*
  2. * Copyright 2020 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package ristretto
  17. import (
  18. "sync"
  19. "time"
  20. )
  21. var (
  22. // TODO: find the optimal value or make it configurable.
  23. bucketDurationSecs = int64(5)
  24. )
  25. func storageBucket(t time.Time) int64 {
  26. return (t.Unix() / bucketDurationSecs) + 1
  27. }
  28. func cleanupBucket(t time.Time) int64 {
  29. // The bucket to cleanup is always behind the storage bucket by one so that
  30. // no elements in that bucket (which might not have expired yet) are deleted.
  31. return storageBucket(t) - 1
  32. }
  33. // bucket type is a map of key to conflict.
  34. type bucket map[uint64]uint64
  35. // expirationMap is a map of bucket number to the corresponding bucket.
  36. type expirationMap[V any] struct {
  37. sync.RWMutex
  38. buckets map[int64]bucket
  39. lastCleanedBucketNum int64
  40. }
  41. func newExpirationMap[V any]() *expirationMap[V] {
  42. return &expirationMap[V]{
  43. buckets: make(map[int64]bucket),
  44. lastCleanedBucketNum: cleanupBucket(time.Now()),
  45. }
  46. }
  47. func (m *expirationMap[_]) add(key, conflict uint64, expiration time.Time) {
  48. if m == nil {
  49. return
  50. }
  51. // Items that don't expire don't need to be in the expiration map.
  52. if expiration.IsZero() {
  53. return
  54. }
  55. bucketNum := storageBucket(expiration)
  56. m.Lock()
  57. defer m.Unlock()
  58. b, ok := m.buckets[bucketNum]
  59. if !ok {
  60. b = make(bucket)
  61. m.buckets[bucketNum] = b
  62. }
  63. b[key] = conflict
  64. }
  65. func (m *expirationMap[_]) update(key, conflict uint64, oldExpTime, newExpTime time.Time) {
  66. if m == nil {
  67. return
  68. }
  69. m.Lock()
  70. defer m.Unlock()
  71. oldBucketNum := storageBucket(oldExpTime)
  72. oldBucket, ok := m.buckets[oldBucketNum]
  73. if ok {
  74. delete(oldBucket, key)
  75. }
  76. // Items that don't expire don't need to be in the expiration map.
  77. if newExpTime.IsZero() {
  78. return
  79. }
  80. newBucketNum := storageBucket(newExpTime)
  81. newBucket, ok := m.buckets[newBucketNum]
  82. if !ok {
  83. newBucket = make(bucket)
  84. m.buckets[newBucketNum] = newBucket
  85. }
  86. newBucket[key] = conflict
  87. }
  88. func (m *expirationMap[_]) del(key uint64, expiration time.Time) {
  89. if m == nil {
  90. return
  91. }
  92. bucketNum := storageBucket(expiration)
  93. m.Lock()
  94. defer m.Unlock()
  95. _, ok := m.buckets[bucketNum]
  96. if !ok {
  97. return
  98. }
  99. delete(m.buckets[bucketNum], key)
  100. }
  101. // cleanup removes all the items in the bucket that was just completed. It deletes
  102. // those items from the store, and calls the onEvict function on those items.
  103. // This function is meant to be called periodically.
  104. func (m *expirationMap[V]) cleanup(store store[V], policy *defaultPolicy[V], onEvict func(item *Item[V])) int {
  105. if m == nil {
  106. return 0
  107. }
  108. m.Lock()
  109. now := time.Now()
  110. currentBucketNum := cleanupBucket(now)
  111. // Clean up all buckets up to and including currentBucketNum, starting from
  112. // (but not including) the last one that was cleaned up
  113. var buckets []bucket
  114. for bucketNum := m.lastCleanedBucketNum + 1; bucketNum <= currentBucketNum; bucketNum++ {
  115. // With an empty bucket, we don't need to add it to the Clean list
  116. if b := m.buckets[bucketNum]; b != nil {
  117. buckets = append(buckets, b)
  118. }
  119. delete(m.buckets, bucketNum)
  120. }
  121. m.lastCleanedBucketNum = currentBucketNum
  122. m.Unlock()
  123. for _, keys := range buckets {
  124. for key, conflict := range keys {
  125. expr := store.Expiration(key)
  126. // Sanity check. Verify that the store agrees that this key is expired.
  127. if expr.After(now) {
  128. continue
  129. }
  130. cost := policy.Cost(key)
  131. policy.Del(key)
  132. _, value := store.Del(key, conflict)
  133. if onEvict != nil {
  134. onEvict(&Item[V]{Key: key,
  135. Conflict: conflict,
  136. Value: value,
  137. Cost: cost,
  138. Expiration: expr,
  139. })
  140. }
  141. }
  142. }
  143. cleanedBucketsCount := len(buckets)
  144. return cleanedBucketsCount
  145. }
  146. // clear clears the expirationMap, the caller is responsible for properly
  147. // evicting the referenced items
  148. func (m *expirationMap[V]) clear() {
  149. if m == nil {
  150. return
  151. }
  152. m.Lock()
  153. m.buckets = make(map[int64]bucket)
  154. m.lastCleanedBucketNum = cleanupBucket(time.Now())
  155. m.Unlock()
  156. }