stream.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package badger
  6. import (
  7. "bytes"
  8. "context"
  9. "sort"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. humanize "github.com/dustin/go-humanize"
  14. "google.golang.org/protobuf/proto"
  15. "github.com/dgraph-io/badger/v4/pb"
  16. "github.com/dgraph-io/badger/v4/y"
  17. "github.com/dgraph-io/ristretto/v2/z"
  18. )
// batchSize is the soft cap on how much serialized KV data is accumulated in an
// output buffer before it is handed off to the kv channel.
const batchSize = 16 << 20 // 16 MB

// maxStreamSize is the maximum allowed size of a stream batch. This is a soft limit
// as a single list that is still over the limit will have to be sent as is since it
// cannot be split further. This limit prevents the framework from creating batches
// so big that sending them causes issues (e.g running into the max size gRPC limit).
var maxStreamSize = uint64(100 << 20) // 100MB
// Stream provides a framework to concurrently iterate over a snapshot of Badger, pick up
// key-values, batch them up and call Send. Stream does concurrent iteration over many smaller key
// ranges. It does NOT send keys in lexicographical sorted order. To get keys in sorted
// order, use Iterator.
type Stream struct {
	// Prefix to only iterate over certain range of keys. If set to nil (default), Stream would
	// iterate over the entire DB.
	Prefix []byte

	// Number of goroutines to use for iterating over key ranges. Defaults to 8.
	NumGo int

	// Badger would produce log entries in Infof to indicate the progress of Stream. LogPrefix can
	// be used to help differentiate them from other activities. Default is "Badger.Stream".
	LogPrefix string

	// ChooseKey is invoked each time a new key is encountered. Note that this is not called
	// on every version of the value, only the first encountered version (i.e. the highest version
	// of the value a key has). ChooseKey can be left nil to select all keys.
	//
	// Note: Calls to ChooseKey are concurrent.
	ChooseKey func(item *Item) bool

	// MaxSize is the maximum allowed size of a stream batch. This is a soft limit
	// as a single list that is still over the limit will have to be sent as is since it
	// cannot be split further. This limit prevents the framework from creating batches
	// so big that sending them causes issues (e.g running into the max size gRPC limit).
	// If necessary, set it up before the Stream starts synchronisation.
	// This is not a concurrency-safe setting.
	MaxSize uint64

	// KeyToList, similar to ChooseKey, is only invoked on the highest version of the value. It
	// is upto the caller to iterate over the versions and generate zero, one or more KVs. It
	// is expected that the user would advance the iterator to go through the versions of the
	// values. However, the user MUST immediately return from this function on the first encounter
	// with a mismatching key. See example usage in ToList function. Can be left nil to use ToList
	// function by default.
	//
	// KeyToList has access to z.Allocator accessible via stream.Allocator(itr.ThreadId). This
	// allocator can be used to allocate KVs, to decrease the memory pressure on Go GC. Stream
	// framework takes care of releasing those resources after calling Send. AllocRef does
	// NOT need to be set in the returned KVList, as Stream framework would ignore that field,
	// instead using the allocator assigned to that thread id.
	//
	// Note: Calls to KeyToList are concurrent.
	KeyToList func(key []byte, itr *Iterator) (*pb.KVList, error)

	// This is the method where Stream sends the final output. All calls to Send are done by a
	// single goroutine, i.e. logic within Send method can expect single threaded execution.
	Send func(buf *z.Buffer) error

	// Read data above the sinceTs. All keys with version =< sinceTs will be ignored.
	SinceTs uint64

	readTs uint64 // Snapshot timestamp for managed-mode streams; set via NewStreamAt.
	db     *DB

	rangeCh chan keyRange  // Key ranges produced by produceRanges, consumed by produceKVs workers.
	kvChan  chan *z.Buffer // Serialized KV buffers produced by produceKVs, consumed by streamKVs.

	nextStreamId atomic.Uint32 // Source of unique stream ids, one per iterated key range.
	doneMarkers  bool          // When true, a StreamDone KV is emitted for each finished stream id.
	scanned      atomic.Uint64 // used to estimate the ETA for data scan.
	numProducers atomic.Int32  // Count of currently-active produceKVs goroutines (for progress logs).
}
// SendDoneMarkers when true would send out done markers on the stream. False by default.
// When enabled, produceKVs appends a pb.KV with StreamDone set once it finishes a key
// range, so a receiver can tell when that stream id is complete.
func (st *Stream) SendDoneMarkers(done bool) {
	st.doneMarkers = done
}
// ToList is a default implementation of KeyToList. It picks up all valid versions of the key,
// skipping over deleted or expired keys. The iterator is expected to be positioned at the
// highest version of key; iteration stops at the first deleted/expired version, or as soon
// as a different key is encountered.
func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) {
	a := itr.Alloc
	// Copy the key once into the allocator; every collected version shares these bytes.
	ka := a.Copy(key)

	list := &pb.KVList{}
	for ; itr.Valid(); itr.Next() {
		item := itr.Item()
		if item.IsDeletedOrExpired() {
			break
		}
		if !bytes.Equal(key, item.Key()) {
			// Break out on the first encounter with another key.
			break
		}

		// Allocate the KV via the thread-local allocator to reduce Go GC pressure;
		// the Stream framework releases the allocator after Send.
		kv := y.NewKV(a)
		kv.Key = ka

		if err := item.Value(func(val []byte) error {
			kv.Value = a.Copy(val)
			return nil
		}); err != nil {
			return nil, err
		}
		kv.Version = item.Version()
		kv.ExpiresAt = item.ExpiresAt()
		kv.UserMeta = a.Copy([]byte{item.UserMeta()})
		list.Kv = append(list.Kv, kv)

		if st.db.opt.NumVersionsToKeep == 1 {
			// Only a single version is retained; no point collecting older ones.
			break
		}
		if item.DiscardEarlierVersions() {
			break
		}
	}
	return list, nil
}
  120. // keyRange is [start, end), including start, excluding end. Do ensure that the start,
  121. // end byte slices are owned by keyRange struct.
  122. func (st *Stream) produceRanges(ctx context.Context) {
  123. ranges := st.db.Ranges(st.Prefix, 16)
  124. y.AssertTrue(len(ranges) > 0)
  125. y.AssertTrue(ranges[0].left == nil)
  126. y.AssertTrue(ranges[len(ranges)-1].right == nil)
  127. st.db.opt.Infof("Number of ranges found: %d\n", len(ranges))
  128. // Sort in descending order of size.
  129. sort.Slice(ranges, func(i, j int) bool {
  130. return ranges[i].size > ranges[j].size
  131. })
  132. for i, r := range ranges {
  133. st.rangeCh <- *r
  134. st.db.opt.Infof("Sent range %d for iteration: [%x, %x) of size: %s\n",
  135. i, r.left, r.right, humanize.IBytes(uint64(r.size)))
  136. }
  137. close(st.rangeCh)
  138. }
// produceKVs picks up ranges from rangeCh, generates KV lists and sends them to kvChan.
// It runs until rangeCh is closed or ctx is cancelled, and is run concurrently from
// NumGo goroutines; threadId identifies this worker (used for per-thread allocators).
func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
	st.numProducers.Add(1)
	defer st.numProducers.Add(-1)

	// In managed mode (readTs > 0) pin the snapshot at readTs; otherwise open a
	// regular read-only transaction.
	var txn *Txn
	if st.readTs > 0 {
		txn = st.db.NewTransactionAt(st.readTs, false)
	} else {
		txn = st.db.NewTransaction(false)
	}
	defer txn.Discard()

	// produceKVs is running iterate serially. So, we can define the outList here.
	outList := z.NewBuffer(2*batchSize, "Stream.ProduceKVs")
	defer func() {
		// The outList variable changes. So, we need to evaluate the variable in the defer. DO NOT
		// call `defer outList.Release()`.
		_ = outList.Release()
	}()

	// iterate walks one key range, converting each chosen key to KVs and flushing
	// outList to kvChan whenever it crosses batchSize.
	iterate := func(kr keyRange) error {
		iterOpts := DefaultIteratorOptions
		iterOpts.AllVersions = true
		iterOpts.Prefix = st.Prefix
		iterOpts.PrefetchValues = false
		iterOpts.SinceTs = st.SinceTs
		itr := txn.NewIterator(iterOpts)
		itr.ThreadId = threadId
		defer itr.Close()

		itr.Alloc = z.NewAllocator(1<<20, "Stream.Iterate")
		defer itr.Alloc.Release()

		// This unique stream id is used to identify all the keys from this iteration.
		streamId := st.nextStreamId.Add(1)
		var scanned int

		// sendIt hands the current outList to kvChan, starts a fresh buffer, and
		// publishes scan progress (consumed by streamKVs for its ETA logging).
		sendIt := func() error {
			select {
			case st.kvChan <- outList:
				outList = z.NewBuffer(2*batchSize, "Stream.ProduceKVs")
				st.scanned.Add(uint64(itr.scanned - scanned))
				scanned = itr.scanned
			case <-ctx.Done():
				return ctx.Err()
			}
			return nil
		}

		var prevKey []byte
		for itr.Seek(kr.left); itr.Valid(); {
			// it.Valid would only return true for keys with the provided Prefix in iterOpts.

			// Skip remaining versions of a key we have already handled. Note that the
			// `continue` statements below deliberately omit itr.Next(): on the next
			// iteration this equality check fires and advances the iterator instead.
			item := itr.Item()
			if bytes.Equal(item.Key(), prevKey) {
				itr.Next()
				continue
			}
			prevKey = append(prevKey[:0], item.Key()...)

			// Check if we reached the end of the key range.
			if len(kr.right) > 0 && bytes.Compare(item.Key(), kr.right) >= 0 {
				break
			}

			// Check if we should pick this key.
			if st.ChooseKey != nil && !st.ChooseKey(item) {
				continue
			}

			// Now convert to key value.
			itr.Alloc.Reset()
			list, err := st.KeyToList(item.KeyCopy(nil), itr)
			if err != nil {
				// Best-effort: log and move on to the next key rather than aborting
				// the whole range.
				st.db.opt.Warningf("While reading key: %x, got error: %v", item.Key(), err)
				continue
			}
			if list == nil || len(list.Kv) == 0 {
				continue
			}

			for _, kv := range list.Kv {
				kv.StreamId = streamId
				KVToBuffer(kv, outList)
				if outList.LenNoPadding() < batchSize {
					continue
				}
				if err := sendIt(); err != nil {
					return err
				}
			}
		}
		// Mark the stream as done.
		if st.doneMarkers {
			kv := &pb.KV{
				StreamId:   streamId,
				StreamDone: true,
			}
			KVToBuffer(kv, outList)
		}
		// Flush whatever is left in outList (possibly just the done marker).
		return sendIt()
	}

	// Keep pulling ranges until the producer closes rangeCh or ctx is cancelled.
	for {
		select {
		case kr, ok := <-st.rangeCh:
			if !ok {
				// Done with the keys.
				return nil
			}
			if err := iterate(kr); err != nil {
				return err
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
// streamKVs is the single consumer of kvChan. It merges buffers produced by produceKVs
// into batches of up to st.MaxSize, forwards each batch via st.Send, and logs progress
// every 5 seconds. All calls to st.Send happen from this one goroutine, which is what
// guarantees the single-threaded Send contract documented on Stream.
func (st *Stream) streamKVs(ctx context.Context) error {
	onDiskSize, uncompressedSize := st.db.EstimateSize(st.Prefix)
	// Manish has seen uncompressed size to be in 20% error margin.
	uncompressedSize = uint64(float64(uncompressedSize) * 1.2)
	st.db.opt.Infof("%s Streaming about %s of uncompressed data (%s on disk)\n",
		st.LogPrefix, humanize.IBytes(uncompressedSize), humanize.IBytes(onDiskSize))

	tickerDur := 5 * time.Second
	var bytesSent uint64
	t := time.NewTicker(tickerDur)
	defer t.Stop()
	now := time.Now()

	// sendBatch forwards a non-empty batch to st.Send and always releases the buffer,
	// whether or not Send succeeds.
	sendBatch := func(batch *z.Buffer) error {
		defer func() { _ = batch.Release() }()
		sz := uint64(batch.LenNoPadding())
		if sz == 0 {
			return nil
		}
		bytesSent += sz
		// st.db.opt.Infof("%s Sending batch of size: %s.\n", st.LogPrefix, humanize.IBytes(sz))
		if err := st.Send(batch); err != nil {
			st.db.opt.Warningf("Error while sending: %v\n", err)
			return err
		}
		return nil
	}

	// slurp opportunistically (non-blocking, via the default case) drains kvChan into
	// batch to build a larger payload, then sends whatever it accumulated.
	slurp := func(batch *z.Buffer) error {
	loop:
		for {
			// Send the batch immediately if it already exceeds the maximum allowed size.
			// If the size of the batch exceeds maxStreamSize, break from the loop to
			// avoid creating a batch that is so big that certain limits are reached.
			if uint64(batch.LenNoPadding()) > st.MaxSize {
				break loop
			}
			select {
			case kvs, ok := <-st.kvChan:
				if !ok {
					break loop
				}
				y.AssertTrue(kvs != nil)
				// Copy the producer's buffer into the batch and release it.
				y.Check2(batch.Write(kvs.Bytes()))
				y.Check(kvs.Release())

			default:
				break loop
			}
		}
		return sendBatch(batch)
	} // end of slurp.

	writeRate := y.NewRateMonitor(20)
	scanRate := y.NewRateMonitor(20)
outer:
	for {
		var batch *z.Buffer
		select {
		case <-ctx.Done():
			return ctx.Err()

		case <-t.C:
			// Instead of calculating speed over the entire lifetime, we average the speed over
			// ticker duration.
			writeRate.Capture(bytesSent)
			scanned := st.scanned.Load()
			scanRate.Capture(scanned)
			numProducers := st.numProducers.Load()

			st.db.opt.Infof("%s [%s] Scan (%d): ~%s/%s at %s/sec. Sent: %s at %s/sec."+
				" jemalloc: %s\n",
				st.LogPrefix, y.FixedDuration(time.Since(now)), numProducers,
				y.IBytesToString(scanned, 1), humanize.IBytes(uncompressedSize),
				humanize.IBytes(scanRate.Rate()),
				y.IBytesToString(bytesSent, 1), humanize.IBytes(writeRate.Rate()),
				humanize.IBytes(uint64(z.NumAllocBytes())))

		case kvs, ok := <-st.kvChan:
			if !ok {
				// kvChan closed by Orchestrate once all producers are done.
				break outer
			}
			y.AssertTrue(kvs != nil)
			batch = kvs

			// Otherwise, slurp more keys into this batch.
			if err := slurp(batch); err != nil {
				return err
			}
		}
	}

	st.db.opt.Infof("%s Sent data of size %s\n", st.LogPrefix, humanize.IBytes(bytesSent))
	return nil
}
// Orchestrate runs Stream. It picks up ranges from the SSTables, then runs NumGo number of
// goroutines to iterate over these ranges and batch up KVs in lists. It concurrently runs a single
// goroutine to pick these lists, batch them up further and send to Output.Send. Orchestrate also
// spits logs out to Infof, using provided LogPrefix. Note that all calls to Output.Send
// are serial. In case any of these steps encounter an error, Orchestrate would stop execution and
// return that error. Orchestrate can be called multiple times, but in serial order.
func (st *Stream) Orchestrate(ctx context.Context) error {
	// Derive a cancellable context so that any single failure can stop all goroutines.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	st.rangeCh = make(chan keyRange, 3) // Contains keys for posting lists.

	// kvChan should only have a small capacity to ensure that we don't buffer up too much data if
	// sending is slow. Page size is set to 4MB, which is used to lazily cap the size of each
	// KVList. To get 128MB buffer, we can set the channel size to 32.
	st.kvChan = make(chan *z.Buffer, 32)

	if st.KeyToList == nil {
		st.KeyToList = st.ToList
	}

	// Picks up ranges from Badger, and sends them to rangeCh.
	go st.produceRanges(ctx)

	errCh := make(chan error, st.NumGo) // Stores error by consumeKeys.
	var wg sync.WaitGroup
	for i := 0; i < st.NumGo; i++ {
		wg.Add(1)

		go func(threadId int) {
			defer wg.Done()
			// Picks up ranges from rangeCh, generates KV lists, and sends them to kvChan.
			if err := st.produceKVs(ctx, threadId); err != nil {
				// Non-blocking send as a safety measure; errCh is buffered with
				// capacity NumGo and each worker sends at most once.
				select {
				case errCh <- err:
				default:
				}
			}
		}(i)
	}

	// Pick up key-values from kvChan and send to stream.
	kvErr := make(chan error, 1)
	go func() {
		// Picks up KV lists from kvChan, and sends them to Output.
		err := st.streamKVs(ctx)
		if err != nil {
			cancel() // Stop all the go routines.
		}
		kvErr <- err
	}()
	wg.Wait()        // Wait for produceKVs to be over.
	close(st.kvChan) // Now we can close kvChan.
	defer func() {
		// If due to some error, we have buffers left in kvChan, we should release them.
		for buf := range st.kvChan {
			_ = buf.Release()
		}
	}()

	select {
	case err := <-errCh: // Check error from produceKVs.
		return err
	default:
	}

	// Wait for key streaming to be over.
	err := <-kvErr
	return err
}
  391. func (db *DB) newStream() *Stream {
  392. return &Stream{
  393. db: db,
  394. NumGo: db.opt.NumGoroutines,
  395. LogPrefix: "Badger.Stream",
  396. MaxSize: maxStreamSize,
  397. }
  398. }
// NewStream creates a new Stream.
// It panics when the DB runs managed transactions; managed DBs must use
// NewStreamAt instead, which supplies the read timestamp.
func (db *DB) NewStream() *Stream {
	if db.opt.managedTxns {
		panic("This API can not be called in managed mode.")
	}
	return db.newStream()
}
  406. // NewStreamAt creates a new Stream at a particular timestamp. Should only be used with managed DB.
  407. func (db *DB) NewStreamAt(readTs uint64) *Stream {
  408. if !db.opt.managedTxns {
  409. panic("This API can only be called in managed mode.")
  410. }
  411. stream := db.newStream()
  412. stream.readTs = readTs
  413. return stream
  414. }
  415. func BufferToKVList(buf *z.Buffer) (*pb.KVList, error) {
  416. var list pb.KVList
  417. err := buf.SliceIterate(func(s []byte) error {
  418. kv := new(pb.KV)
  419. if err := proto.Unmarshal(s, kv); err != nil {
  420. return err
  421. }
  422. list.Kv = append(list.Kv, kv)
  423. return nil
  424. })
  425. return &list, err
  426. }
// KVToBuffer marshals kv and appends it as one slice entry to buf (the inverse of
// the decoding done in BufferToKVList). It pre-allocates exactly proto.Size(kv)
// bytes inside buf, then marshals into that space by calling MarshalAppend on the
// zero-length view of it — so no intermediate allocation is made and the returned
// slice can be ignored. Marshalling into a correctly-sized buffer is not expected
// to fail, hence the assertion.
func KVToBuffer(kv *pb.KV, buf *z.Buffer) {
	in := buf.SliceAllocate(proto.Size(kv))[:0]
	_, err := proto.MarshalOptions{}.MarshalAppend(in, kv)
	y.AssertTrue(err == nil)
}