publisher.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. /*
  2. * Copyright 2019 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package badger
  17. import (
  18. "sync"
  19. "sync/atomic"
  20. "github.com/dgraph-io/badger/v4/pb"
  21. "github.com/dgraph-io/badger/v4/trie"
  22. "github.com/dgraph-io/badger/v4/y"
  23. "github.com/dgraph-io/ristretto/v2/z"
  24. )
  25. type subscriber struct {
  26. id uint64
  27. matches []pb.Match
  28. sendCh chan *pb.KVList
  29. subCloser *z.Closer
  30. // this will be atomic pointer which will be used to
  31. // track whether the subscriber is active or not
  32. active *atomic.Uint64
  33. }
  34. type publisher struct {
  35. sync.Mutex
  36. pubCh chan requests
  37. subscribers map[uint64]subscriber
  38. nextID uint64
  39. indexer *trie.Trie
  40. }
  41. func newPublisher() *publisher {
  42. return &publisher{
  43. pubCh: make(chan requests, 1000),
  44. subscribers: make(map[uint64]subscriber),
  45. nextID: 0,
  46. indexer: trie.NewTrie(),
  47. }
  48. }
  49. func (p *publisher) listenForUpdates(c *z.Closer) {
  50. defer func() {
  51. p.cleanSubscribers()
  52. c.Done()
  53. }()
  54. slurp := func(batch requests) {
  55. for {
  56. select {
  57. case reqs := <-p.pubCh:
  58. batch = append(batch, reqs...)
  59. default:
  60. p.publishUpdates(batch)
  61. return
  62. }
  63. }
  64. }
  65. for {
  66. select {
  67. case <-c.HasBeenClosed():
  68. return
  69. case reqs := <-p.pubCh:
  70. slurp(reqs)
  71. }
  72. }
  73. }
  74. func (p *publisher) publishUpdates(reqs requests) {
  75. p.Lock()
  76. defer func() {
  77. p.Unlock()
  78. // Release all the request.
  79. reqs.DecrRef()
  80. }()
  81. batchedUpdates := make(map[uint64]*pb.KVList)
  82. for _, req := range reqs {
  83. for _, e := range req.Entries {
  84. ids := p.indexer.Get(e.Key)
  85. if len(ids) == 0 {
  86. continue
  87. }
  88. k := y.SafeCopy(nil, e.Key)
  89. kv := &pb.KV{
  90. Key: y.ParseKey(k),
  91. Value: y.SafeCopy(nil, e.Value),
  92. Meta: []byte{e.UserMeta},
  93. ExpiresAt: e.ExpiresAt,
  94. Version: y.ParseTs(k),
  95. }
  96. for id := range ids {
  97. if _, ok := batchedUpdates[id]; !ok {
  98. batchedUpdates[id] = &pb.KVList{}
  99. }
  100. batchedUpdates[id].Kv = append(batchedUpdates[id].Kv, kv)
  101. }
  102. }
  103. }
  104. for id, kvs := range batchedUpdates {
  105. if p.subscribers[id].active.Load() == 1 {
  106. p.subscribers[id].sendCh <- kvs
  107. }
  108. }
  109. }
  110. func (p *publisher) newSubscriber(c *z.Closer, matches []pb.Match) (subscriber, error) {
  111. p.Lock()
  112. defer p.Unlock()
  113. ch := make(chan *pb.KVList, 1000)
  114. id := p.nextID
  115. // Increment next ID.
  116. p.nextID++
  117. s := subscriber{
  118. id: id,
  119. matches: matches,
  120. sendCh: ch,
  121. subCloser: c,
  122. active: new(atomic.Uint64),
  123. }
  124. s.active.Store(1)
  125. p.subscribers[id] = s
  126. for _, m := range matches {
  127. if err := p.indexer.AddMatch(m, id); err != nil {
  128. return subscriber{}, err
  129. }
  130. }
  131. return s, nil
  132. }
  133. // cleanSubscribers stops all the subscribers. Ideally, It should be called while closing DB.
  134. func (p *publisher) cleanSubscribers() {
  135. p.Lock()
  136. defer p.Unlock()
  137. for id, s := range p.subscribers {
  138. for _, m := range s.matches {
  139. _ = p.indexer.DeleteMatch(m, id)
  140. }
  141. delete(p.subscribers, id)
  142. s.subCloser.SignalAndWait()
  143. }
  144. }
  145. func (p *publisher) deleteSubscriber(id uint64) {
  146. p.Lock()
  147. defer p.Unlock()
  148. if s, ok := p.subscribers[id]; ok {
  149. for _, m := range s.matches {
  150. _ = p.indexer.DeleteMatch(m, id)
  151. }
  152. }
  153. delete(p.subscribers, id)
  154. }
  155. func (p *publisher) sendUpdates(reqs requests) {
  156. if p.noOfSubscribers() != 0 {
  157. reqs.IncrRef()
  158. p.pubCh <- reqs
  159. }
  160. }
  161. func (p *publisher) noOfSubscribers() int {
  162. p.Lock()
  163. defer p.Unlock()
  164. return len(p.subscribers)
  165. }