store.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. /*
  2. * Copyright 2019 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package ristretto
  17. import (
  18. "sync"
  19. "time"
  20. )
  21. type updateFn[V any] func(cur, prev V) bool
  22. // TODO: Do we need this to be a separate struct from Item?
  23. type storeItem[V any] struct {
  24. key uint64
  25. conflict uint64
  26. value V
  27. expiration time.Time
  28. }
  29. // store is the interface fulfilled by all hash map implementations in this
  30. // file. Some hash map implementations are better suited for certain data
  31. // distributions than others, so this allows us to abstract that out for use
  32. // in Ristretto.
  33. //
  34. // Every store is safe for concurrent usage.
  35. type store[V any] interface {
  36. // Get returns the value associated with the key parameter.
  37. Get(uint64, uint64) (V, bool)
  38. // Expiration returns the expiration time for this key.
  39. Expiration(uint64) time.Time
  40. // Set adds the key-value pair to the Map or updates the value if it's
  41. // already present. The key-value pair is passed as a pointer to an
  42. // item object.
  43. Set(*Item[V])
  44. // Del deletes the key-value pair from the Map.
  45. Del(uint64, uint64) (uint64, V)
  46. // Update attempts to update the key with a new value and returns true if
  47. // successful.
  48. Update(*Item[V]) (V, bool)
  49. // Cleanup removes items that have an expired TTL.
  50. Cleanup(policy *defaultPolicy[V], onEvict func(item *Item[V]))
  51. // Clear clears all contents of the store.
  52. Clear(onEvict func(item *Item[V]))
  53. SetShouldUpdateFn(f updateFn[V])
  54. }
  55. // newStore returns the default store implementation.
  56. func newStore[V any]() store[V] {
  57. return newShardedMap[V]()
  58. }
  59. const numShards uint64 = 256
  60. type shardedMap[V any] struct {
  61. shards []*lockedMap[V]
  62. expiryMap *expirationMap[V]
  63. }
  64. func newShardedMap[V any]() *shardedMap[V] {
  65. sm := &shardedMap[V]{
  66. shards: make([]*lockedMap[V], int(numShards)),
  67. expiryMap: newExpirationMap[V](),
  68. }
  69. for i := range sm.shards {
  70. sm.shards[i] = newLockedMap[V](sm.expiryMap)
  71. }
  72. return sm
  73. }
  74. func (m *shardedMap[V]) SetShouldUpdateFn(f updateFn[V]) {
  75. for i := range m.shards {
  76. m.shards[i].setShouldUpdateFn(f)
  77. }
  78. }
  79. func (sm *shardedMap[V]) Get(key, conflict uint64) (V, bool) {
  80. return sm.shards[key%numShards].get(key, conflict)
  81. }
  82. func (sm *shardedMap[V]) Expiration(key uint64) time.Time {
  83. return sm.shards[key%numShards].Expiration(key)
  84. }
  85. func (sm *shardedMap[V]) Set(i *Item[V]) {
  86. if i == nil {
  87. // If item is nil make this Set a no-op.
  88. return
  89. }
  90. sm.shards[i.Key%numShards].Set(i)
  91. }
  92. func (sm *shardedMap[V]) Del(key, conflict uint64) (uint64, V) {
  93. return sm.shards[key%numShards].Del(key, conflict)
  94. }
  95. func (sm *shardedMap[V]) Update(newItem *Item[V]) (V, bool) {
  96. return sm.shards[newItem.Key%numShards].Update(newItem)
  97. }
  98. func (sm *shardedMap[V]) Cleanup(policy *defaultPolicy[V], onEvict func(item *Item[V])) {
  99. sm.expiryMap.cleanup(sm, policy, onEvict)
  100. }
  101. func (sm *shardedMap[V]) Clear(onEvict func(item *Item[V])) {
  102. for i := uint64(0); i < numShards; i++ {
  103. sm.shards[i].Clear(onEvict)
  104. }
  105. sm.expiryMap.clear()
  106. }
  107. type lockedMap[V any] struct {
  108. sync.RWMutex
  109. data map[uint64]storeItem[V]
  110. em *expirationMap[V]
  111. shouldUpdate updateFn[V]
  112. }
  113. func newLockedMap[V any](em *expirationMap[V]) *lockedMap[V] {
  114. return &lockedMap[V]{
  115. data: make(map[uint64]storeItem[V]),
  116. em: em,
  117. shouldUpdate: func(cur, prev V) bool {
  118. return true
  119. },
  120. }
  121. }
  122. func (m *lockedMap[V]) setShouldUpdateFn(f updateFn[V]) {
  123. m.shouldUpdate = f
  124. }
  125. func (m *lockedMap[V]) get(key, conflict uint64) (V, bool) {
  126. m.RLock()
  127. item, ok := m.data[key]
  128. m.RUnlock()
  129. if !ok {
  130. return zeroValue[V](), false
  131. }
  132. if conflict != 0 && (conflict != item.conflict) {
  133. return zeroValue[V](), false
  134. }
  135. // Handle expired items.
  136. if !item.expiration.IsZero() && time.Now().After(item.expiration) {
  137. return zeroValue[V](), false
  138. }
  139. return item.value, true
  140. }
  141. func (m *lockedMap[V]) Expiration(key uint64) time.Time {
  142. m.RLock()
  143. defer m.RUnlock()
  144. return m.data[key].expiration
  145. }
  146. func (m *lockedMap[V]) Set(i *Item[V]) {
  147. if i == nil {
  148. // If the item is nil make this Set a no-op.
  149. return
  150. }
  151. m.Lock()
  152. defer m.Unlock()
  153. item, ok := m.data[i.Key]
  154. if ok {
  155. // The item existed already. We need to check the conflict key and reject the
  156. // update if they do not match. Only after that the expiration map is updated.
  157. if i.Conflict != 0 && (i.Conflict != item.conflict) {
  158. return
  159. }
  160. if m.shouldUpdate != nil && !m.shouldUpdate(i.Value, item.value) {
  161. return
  162. }
  163. m.em.update(i.Key, i.Conflict, item.expiration, i.Expiration)
  164. } else {
  165. // The value is not in the map already. There's no need to return anything.
  166. // Simply add the expiration map.
  167. m.em.add(i.Key, i.Conflict, i.Expiration)
  168. }
  169. m.data[i.Key] = storeItem[V]{
  170. key: i.Key,
  171. conflict: i.Conflict,
  172. value: i.Value,
  173. expiration: i.Expiration,
  174. }
  175. }
  176. func (m *lockedMap[V]) Del(key, conflict uint64) (uint64, V) {
  177. m.Lock()
  178. defer m.Unlock()
  179. item, ok := m.data[key]
  180. if !ok {
  181. return 0, zeroValue[V]()
  182. }
  183. if conflict != 0 && (conflict != item.conflict) {
  184. return 0, zeroValue[V]()
  185. }
  186. if !item.expiration.IsZero() {
  187. m.em.del(key, item.expiration)
  188. }
  189. delete(m.data, key)
  190. return item.conflict, item.value
  191. }
  192. func (m *lockedMap[V]) Update(newItem *Item[V]) (V, bool) {
  193. m.Lock()
  194. defer m.Unlock()
  195. item, ok := m.data[newItem.Key]
  196. if !ok {
  197. return zeroValue[V](), false
  198. }
  199. if newItem.Conflict != 0 && (newItem.Conflict != item.conflict) {
  200. return zeroValue[V](), false
  201. }
  202. if m.shouldUpdate != nil && !m.shouldUpdate(newItem.Value, item.value) {
  203. return item.value, false
  204. }
  205. m.em.update(newItem.Key, newItem.Conflict, item.expiration, newItem.Expiration)
  206. m.data[newItem.Key] = storeItem[V]{
  207. key: newItem.Key,
  208. conflict: newItem.Conflict,
  209. value: newItem.Value,
  210. expiration: newItem.Expiration,
  211. }
  212. return item.value, true
  213. }
  214. func (m *lockedMap[V]) Clear(onEvict func(item *Item[V])) {
  215. m.Lock()
  216. defer m.Unlock()
  217. i := &Item[V]{}
  218. if onEvict != nil {
  219. for _, si := range m.data {
  220. i.Key = si.key
  221. i.Conflict = si.conflict
  222. i.Value = si.value
  223. onEvict(i)
  224. }
  225. }
  226. m.data = make(map[uint64]storeItem[V])
  227. }