publisher.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package badger
  6. import (
  7. "sync"
  8. "sync/atomic"
  9. "github.com/dgraph-io/badger/v4/pb"
  10. "github.com/dgraph-io/badger/v4/trie"
  11. "github.com/dgraph-io/badger/v4/y"
  12. "github.com/dgraph-io/ristretto/v2/z"
  13. )
  14. type subscriber struct {
  15. id uint64
  16. matches []pb.Match
  17. sendCh chan *pb.KVList
  18. subCloser *z.Closer
  19. // this will be atomic pointer which will be used to
  20. // track whether the subscriber is active or not
  21. active *atomic.Uint64
  22. }
  23. type publisher struct {
  24. sync.Mutex
  25. pubCh chan requests
  26. subscribers map[uint64]subscriber
  27. nextID uint64
  28. indexer *trie.Trie
  29. }
  30. func newPublisher() *publisher {
  31. return &publisher{
  32. pubCh: make(chan requests, 1000),
  33. subscribers: make(map[uint64]subscriber),
  34. nextID: 0,
  35. indexer: trie.NewTrie(),
  36. }
  37. }
  38. func (p *publisher) listenForUpdates(c *z.Closer) {
  39. defer func() {
  40. p.cleanSubscribers()
  41. c.Done()
  42. }()
  43. slurp := func(batch requests) {
  44. for {
  45. select {
  46. case reqs := <-p.pubCh:
  47. batch = append(batch, reqs...)
  48. default:
  49. p.publishUpdates(batch)
  50. return
  51. }
  52. }
  53. }
  54. for {
  55. select {
  56. case <-c.HasBeenClosed():
  57. return
  58. case reqs := <-p.pubCh:
  59. slurp(reqs)
  60. }
  61. }
  62. }
  63. func (p *publisher) publishUpdates(reqs requests) {
  64. p.Lock()
  65. defer func() {
  66. p.Unlock()
  67. // Release all the request.
  68. reqs.DecrRef()
  69. }()
  70. batchedUpdates := make(map[uint64]*pb.KVList)
  71. for _, req := range reqs {
  72. for _, e := range req.Entries {
  73. ids := p.indexer.Get(e.Key)
  74. if len(ids) == 0 {
  75. continue
  76. }
  77. k := y.SafeCopy(nil, e.Key)
  78. kv := &pb.KV{
  79. Key: y.ParseKey(k),
  80. Value: y.SafeCopy(nil, e.Value),
  81. Meta: []byte{e.UserMeta},
  82. ExpiresAt: e.ExpiresAt,
  83. Version: y.ParseTs(k),
  84. }
  85. for id := range ids {
  86. if _, ok := batchedUpdates[id]; !ok {
  87. batchedUpdates[id] = &pb.KVList{}
  88. }
  89. batchedUpdates[id].Kv = append(batchedUpdates[id].Kv, kv)
  90. }
  91. }
  92. }
  93. for id, kvs := range batchedUpdates {
  94. if p.subscribers[id].active.Load() == 1 {
  95. p.subscribers[id].sendCh <- kvs
  96. }
  97. }
  98. }
  99. func (p *publisher) newSubscriber(c *z.Closer, matches []pb.Match) (subscriber, error) {
  100. p.Lock()
  101. defer p.Unlock()
  102. ch := make(chan *pb.KVList, 1000)
  103. id := p.nextID
  104. // Increment next ID.
  105. p.nextID++
  106. s := subscriber{
  107. id: id,
  108. matches: matches,
  109. sendCh: ch,
  110. subCloser: c,
  111. active: new(atomic.Uint64),
  112. }
  113. s.active.Store(1)
  114. p.subscribers[id] = s
  115. for _, m := range matches {
  116. if err := p.indexer.AddMatch(m, id); err != nil {
  117. return subscriber{}, err
  118. }
  119. }
  120. return s, nil
  121. }
  122. // cleanSubscribers stops all the subscribers. Ideally, It should be called while closing DB.
  123. func (p *publisher) cleanSubscribers() {
  124. p.Lock()
  125. defer p.Unlock()
  126. for id, s := range p.subscribers {
  127. for _, m := range s.matches {
  128. _ = p.indexer.DeleteMatch(m, id)
  129. }
  130. delete(p.subscribers, id)
  131. s.subCloser.SignalAndWait()
  132. }
  133. }
  134. func (p *publisher) deleteSubscriber(id uint64) {
  135. p.Lock()
  136. defer p.Unlock()
  137. if s, ok := p.subscribers[id]; ok {
  138. for _, m := range s.matches {
  139. _ = p.indexer.DeleteMatch(m, id)
  140. }
  141. }
  142. delete(p.subscribers, id)
  143. }
  144. func (p *publisher) sendUpdates(reqs requests) {
  145. if p.noOfSubscribers() != 0 {
  146. reqs.IncrRef()
  147. p.pubCh <- reqs
  148. }
  149. }
  150. func (p *publisher) noOfSubscribers() int {
  151. p.Lock()
  152. defer p.Unlock()
  153. return len(p.subscribers)
  154. }