stream.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package badger
  6. import (
  7. "bytes"
  8. "context"
  9. "sort"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. humanize "github.com/dustin/go-humanize"
  14. "google.golang.org/protobuf/proto"
  15. "github.com/dgraph-io/badger/v4/pb"
  16. "github.com/dgraph-io/badger/v4/y"
  17. "github.com/dgraph-io/ristretto/v2/z"
  18. )
// batchSize is the soft cap on how much encoded KV data produceKVs buffers
// before flushing the batch onto kvChan.
const batchSize = 16 << 20 // 16 MB

// maxStreamSize is the maximum allowed size of a stream batch. This is a soft limit
// as a single list that is still over the limit will have to be sent as is since it
// cannot be split further. This limit prevents the framework from creating batches
// so big that sending them causes issues (e.g running into the max size gRPC limit).
var maxStreamSize = uint64(100 << 20) // 100MB
// Stream provides a framework to concurrently iterate over a snapshot of Badger, pick up
// key-values, batch them up and call Send. Stream does concurrent iteration over many smaller key
// ranges. It does NOT send keys in lexicographical sorted order. To get keys in sorted
// order, use Iterator.
type Stream struct {
	// Prefix to only iterate over certain range of keys. If set to nil (default), Stream would
	// iterate over the entire DB.
	Prefix []byte

	// Number of goroutines to use for iterating over key ranges. Defaults to 8.
	NumGo int

	// Badger would produce log entries in Infof to indicate the progress of Stream. LogPrefix can
	// be used to help differentiate them from other activities. Default is "Badger.Stream".
	LogPrefix string

	// ChooseKey is invoked each time a new key is encountered. Note that this is not called
	// on every version of the value, only the first encountered version (i.e. the highest version
	// of the value a key has). ChooseKey can be left nil to select all keys.
	//
	// Note: Calls to ChooseKey are concurrent.
	ChooseKey func(item *Item) bool

	// MaxSize is the maximum allowed size of a stream batch. This is a soft limit
	// as a single list that is still over the limit will have to be sent as is since it
	// cannot be split further. This limit prevents the framework from creating batches
	// so big that sending them causes issues (e.g running into the max size gRPC limit).
	// If necessary, set it up before the Stream starts synchronisation
	// This is not a concurrency-safe setting
	MaxSize uint64

	// KeyToList, similar to ChooseKey, is only invoked on the highest version of the value. It
	// is upto the caller to iterate over the versions and generate zero, one or more KVs. It
	// is expected that the user would advance the iterator to go through the versions of the
	// values. However, the user MUST immediately return from this function on the first encounter
	// with a mismatching key. See example usage in ToList function. Can be left nil to use ToList
	// function by default.
	//
	// KeyToList has access to z.Allocator accessible via stream.Allocator(itr.ThreadId). This
	// allocator can be used to allocate KVs, to decrease the memory pressure on Go GC. Stream
	// framework takes care of releasing those resources after calling Send. AllocRef does
	// NOT need to be set in the returned KVList, as Stream framework would ignore that field,
	// instead using the allocator assigned to that thread id.
	//
	// Note: Calls to KeyToList are concurrent.
	KeyToList func(key []byte, itr *Iterator) (*pb.KVList, error)

	// UseKeyToListWithThreadId is used to indicate that KeyToListWithThreadId should be used
	// instead of KeyToList. This is a new api that can be used to figure out parallelism
	// of the stream. Each threadId would be run serially. KeyToList being concurrent makes you
	// take care of concurrency in KeyToList. Here threadId could be used to do some things serially.
	// Once a thread finishes FinishThread() would be called.
	UseKeyToListWithThreadId bool
	KeyToListWithThreadId    func(key []byte, itr *Iterator, threadId int) (*pb.KVList, error)
	FinishThread             func(threadId int) (*pb.KVList, error)

	// This is the method where Stream sends the final output. All calls to Send are done by a
	// single goroutine, i.e. logic within Send method can expect single threaded execution.
	Send func(buf *z.Buffer) error

	// Read data above the sinceTs. All keys with version =< sinceTs will be ignored.
	SinceTs uint64

	readTs       uint64         // snapshot timestamp; > 0 only when created via NewStreamAt.
	db           *DB            // backing database being streamed.
	rangeCh      chan keyRange  // ranges produced by produceRanges, consumed by produceKVs.
	kvChan       chan *z.Buffer // encoded KV batches flowing from produceKVs to streamKVs.
	nextStreamId atomic.Uint32  // one unique stream id handed out per iterated range.
	doneMarkers  bool           // when true, a StreamDone KV is emitted per stream id.
	scanned      atomic.Uint64  // used to estimate the ETA for data scan.
	numProducers atomic.Int32   // count of currently running produceKVs goroutines.
}
// SendDoneMarkers when true would send out done markers on the stream. False by default.
// A done marker is a pb.KV with StreamDone set, appended once per stream id by produceKVs.
func (st *Stream) SendDoneMarkers(done bool) {
	st.doneMarkers = done
}
  92. // ToList is a default implementation of KeyToList. It picks up all valid versions of the key,
  93. // skipping over deleted or expired keys.
  94. func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) {
  95. a := itr.Alloc
  96. ka := a.Copy(key)
  97. list := &pb.KVList{}
  98. for ; itr.Valid(); itr.Next() {
  99. item := itr.Item()
  100. if item.IsDeletedOrExpired() {
  101. break
  102. }
  103. if !bytes.Equal(key, item.Key()) {
  104. // Break out on the first encounter with another key.
  105. break
  106. }
  107. kv := y.NewKV(a)
  108. kv.Key = ka
  109. if err := item.Value(func(val []byte) error {
  110. kv.Value = a.Copy(val)
  111. return nil
  112. }); err != nil {
  113. return nil, err
  114. }
  115. kv.Version = item.Version()
  116. kv.ExpiresAt = item.ExpiresAt()
  117. kv.UserMeta = a.Copy([]byte{item.UserMeta()})
  118. list.Kv = append(list.Kv, kv)
  119. if st.db.opt.NumVersionsToKeep == 1 {
  120. break
  121. }
  122. if item.DiscardEarlierVersions() {
  123. break
  124. }
  125. }
  126. return list, nil
  127. }
  128. // keyRange is [start, end), including start, excluding end. Do ensure that the start,
  129. // end byte slices are owned by keyRange struct.
  130. func (st *Stream) produceRanges(ctx context.Context) {
  131. ranges := st.db.Ranges(st.Prefix, st.NumGo)
  132. y.AssertTrue(len(ranges) > 0)
  133. y.AssertTrue(ranges[0].left == nil)
  134. y.AssertTrue(ranges[len(ranges)-1].right == nil)
  135. st.db.opt.Infof("Number of ranges found: %d\n", len(ranges))
  136. // Sort in descending order of size.
  137. sort.Slice(ranges, func(i, j int) bool {
  138. return ranges[i].size > ranges[j].size
  139. })
  140. for i, r := range ranges {
  141. st.rangeCh <- *r
  142. st.db.opt.Infof("Sent range %d for iteration: [%x, %x) of size: %s\n",
  143. i, r.left, r.right, humanize.IBytes(uint64(r.size)))
  144. }
  145. close(st.rangeCh)
  146. }
// produceKVs picks up ranges from rangeCh, generates KV lists and sends them to kvChan.
// One instance runs per goroutine (identified by threadId); each keeps pulling ranges
// until rangeCh is closed or ctx is cancelled.
func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
	st.numProducers.Add(1)
	defer st.numProducers.Add(-1)

	// Use a read-only txn pinned at readTs for managed-mode streams; otherwise a
	// plain read-only txn at the current snapshot.
	var txn *Txn
	if st.readTs > 0 {
		txn = st.db.NewTransactionAt(st.readTs, false)
	} else {
		txn = st.db.NewTransaction(false)
	}
	defer txn.Discard()

	// produceKVs is running iterate serially. So, we can define the outList here.
	outList := z.NewBuffer(2*batchSize, "Stream.ProduceKVs")
	defer func() {
		// The outList variable changes. So, we need to evaluate the variable in the defer. DO NOT
		// call `defer outList.Release()`.
		_ = outList.Release()
	}()

	// iterate walks one key range, converting each chosen key into KVs and
	// flushing outList to kvChan whenever it crosses batchSize.
	iterate := func(kr keyRange) error {
		iterOpts := DefaultIteratorOptions
		iterOpts.AllVersions = true
		iterOpts.Prefix = st.Prefix
		iterOpts.PrefetchValues = true
		iterOpts.SinceTs = st.SinceTs
		itr := txn.NewIterator(iterOpts)
		itr.ThreadId = threadId
		defer itr.Close()

		itr.Alloc = z.NewAllocator(1<<20, "Stream.Iterate")
		defer itr.Alloc.Release()

		// This unique stream id is used to identify all the keys from this iteration.
		streamId := st.nextStreamId.Add(1)
		var scanned int

		// sendIt hands the current outList to kvChan (or aborts on ctx cancel),
		// starts a fresh buffer, and publishes the scanned-key delta for ETA logging.
		sendIt := func() error {
			select {
			case st.kvChan <- outList:
				outList = z.NewBuffer(2*batchSize, "Stream.ProduceKVs")
				st.scanned.Add(uint64(itr.scanned - scanned))
				scanned = itr.scanned
			case <-ctx.Done():
				return ctx.Err()
			}
			return nil
		}

		var prevKey []byte
		for itr.Seek(kr.left); itr.Valid(); {
			// it.Valid would only return true for keys with the provided Prefix in iterOpts.

			// Skip older versions of a key we already handled. Note this branch is
			// also what advances the iterator after ChooseKey rejects a key, since
			// the loop has no post statement.
			item := itr.Item()
			if bytes.Equal(item.Key(), prevKey) {
				itr.Next()
				continue
			}
			prevKey = append(prevKey[:0], item.Key()...)

			// Check if we reached the end of the key range.
			if len(kr.right) > 0 && bytes.Compare(item.Key(), kr.right) >= 0 {
				break
			}

			// Check if we should pick this key.
			if st.ChooseKey != nil && !st.ChooseKey(item) {
				continue
			}

			// Now convert to key value.
			itr.Alloc.Reset()
			var list *pb.KVList
			var err error
			if st.UseKeyToListWithThreadId {
				list, err = st.KeyToListWithThreadId(item.KeyCopy(nil), itr, threadId)
			} else {
				list, err = st.KeyToList(item.KeyCopy(nil), itr)
			}
			if err != nil {
				// Best-effort: log and move on rather than aborting the whole range.
				st.db.opt.Warningf("While reading key: %x, got error: %v", item.Key(), err)
				continue
			}
			if list == nil || len(list.Kv) == 0 {
				continue
			}

			// Tag each KV with this range's stream id and buffer it, flushing at batchSize.
			for _, kv := range list.Kv {
				kv.StreamId = streamId
				KVToBuffer(kv, outList)
				if outList.LenNoPadding() < batchSize {
					continue
				}
				if err := sendIt(); err != nil {
					return err
				}
			}
		}

		// Let the per-thread callback emit any trailing KVs for this range.
		// NOTE(review): FinishThread is invoked once per range (per iterate call),
		// not once per thread as its doc comment suggests — confirm intent.
		if st.UseKeyToListWithThreadId {
			if kvs, err := st.FinishThread(threadId); err != nil {
				return err
			} else {
				for _, kv := range kvs.Kv {
					kv.StreamId = streamId
					KVToBuffer(kv, outList)
					if outList.LenNoPadding() < batchSize {
						continue
					}
					if err := sendIt(); err != nil {
						return err
					}
				}
			}
		}

		// Mark the stream as done.
		if st.doneMarkers {
			kv := &pb.KV{
				StreamId:   streamId,
				StreamDone: true,
			}
			KVToBuffer(kv, outList)
		}
		// Flush whatever is left in outList (possibly just the done marker).
		return sendIt()
	}

	// Keep consuming ranges until the producer closes rangeCh or ctx is cancelled.
	for {
		select {
		case kr, ok := <-st.rangeCh:
			if !ok {
				// Done with the keys.
				return nil
			}
			if err := iterate(kr); err != nil {
				return err
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
// streamKVs is the single consumer of kvChan. It coalesces buffers into larger
// batches (bounded by st.MaxSize), hands them to st.Send, and logs progress on a
// ticker. Returns when kvChan is closed or ctx is cancelled.
func (st *Stream) streamKVs(ctx context.Context) error {
	onDiskSize, uncompressedSize := st.db.EstimateSize(st.Prefix)
	// Manish has seen uncompressed size to be in 20% error margin.
	uncompressedSize = uint64(float64(uncompressedSize) * 1.2)
	st.db.opt.Infof("%s Streaming about %s of uncompressed data (%s on disk)\n",
		st.LogPrefix, humanize.IBytes(uncompressedSize), humanize.IBytes(onDiskSize))
	tickerDur := 5 * time.Second
	var bytesSent uint64
	t := time.NewTicker(tickerDur)
	defer t.Stop()
	now := time.Now()

	// sendBatch releases the buffer in all cases and forwards non-empty batches
	// to st.Send, tracking total bytes sent for the progress log.
	sendBatch := func(batch *z.Buffer) error {
		defer func() { _ = batch.Release() }()
		sz := uint64(batch.LenNoPadding())
		if sz == 0 {
			return nil
		}
		bytesSent += sz
		// st.db.opt.Infof("%s Sending batch of size: %s.\n", st.LogPrefix, humanize.IBytes(sz))
		if err := st.Send(batch); err != nil {
			st.db.opt.Warningf("Error while sending: %v\n", err)
			return err
		}
		return nil
	}

	// slurp greedily drains whatever is already waiting on kvChan into batch
	// (without blocking), then sends the combined batch.
	slurp := func(batch *z.Buffer) error {
	loop:
		for {
			// Send the batch immediately if it already exceeds the maximum allowed size.
			// If the size of the batch exceeds maxStreamSize, break from the loop to
			// avoid creating a batch that is so big that certain limits are reached.
			if uint64(batch.LenNoPadding()) > st.MaxSize {
				break loop
			}
			select {
			case kvs, ok := <-st.kvChan:
				if !ok {
					break loop
				}
				y.AssertTrue(kvs != nil)
				y.Check2(batch.Write(kvs.Bytes()))
				y.Check(kvs.Release())
			default:
				// Nothing ready right now; send what we have.
				break loop
			}
		}
		return sendBatch(batch)
	} // end of slurp.

	writeRate := y.NewRateMonitor(20)
	scanRate := y.NewRateMonitor(20)
outer:
	for {
		var batch *z.Buffer
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-t.C:
			// Instead of calculating speed over the entire lifetime, we average the speed over
			// ticker duration.
			writeRate.Capture(bytesSent)
			scanned := st.scanned.Load()
			scanRate.Capture(scanned)
			numProducers := st.numProducers.Load()
			st.db.opt.Infof("%s [%s] Scan (%d): ~%s/%s at %s/sec. Sent: %s at %s/sec."+
				" jemalloc: %s\n",
				st.LogPrefix, y.FixedDuration(time.Since(now)), numProducers,
				y.IBytesToString(scanned, 1), humanize.IBytes(uncompressedSize),
				humanize.IBytes(scanRate.Rate()),
				y.IBytesToString(bytesSent, 1), humanize.IBytes(writeRate.Rate()),
				humanize.IBytes(uint64(z.NumAllocBytes())))
		case kvs, ok := <-st.kvChan:
			if !ok {
				// Channel closed by Orchestrate: all producers are done.
				break outer
			}
			y.AssertTrue(kvs != nil)
			batch = kvs
			// Otherwise, slurp more keys into this batch.
			if err := slurp(batch); err != nil {
				return err
			}
		}
	}
	st.db.opt.Infof("%s Sent data of size %s\n", st.LogPrefix, humanize.IBytes(bytesSent))
	return nil
}
// Orchestrate runs Stream. It picks up ranges from the SSTables, then runs NumGo number of
// goroutines to iterate over these ranges and batch up KVs in lists. It concurrently runs a single
// goroutine to pick these lists, batch them up further and send to Output.Send. Orchestrate also
// spits logs out to Infof, using provided LogPrefix. Note that all calls to Output.Send
// are serial. In case any of these steps encounter an error, Orchestrate would stop execution and
// return that error. Orchestrate can be called multiple times, but in serial order.
func (st *Stream) Orchestrate(ctx context.Context) error {
	// Derive a cancellable context so a failure in the consumer can stop producers.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	st.rangeCh = make(chan keyRange, 3) // Contains keys for posting lists.

	// kvChan should only have a small capacity to ensure that we don't buffer up too much data if
	// sending is slow. Page size is set to 4MB, which is used to lazily cap the size of each
	// KVList. To get 128MB buffer, we can set the channel size to 32.
	st.kvChan = make(chan *z.Buffer, 32)

	if st.KeyToList == nil {
		st.KeyToList = st.ToList
	}

	// Picks up ranges from Badger, and sends them to rangeCh.
	go st.produceRanges(ctx)

	errCh := make(chan error, st.NumGo) // Stores error by consumeKeys.
	var wg sync.WaitGroup
	for i := 0; i < st.NumGo; i++ {
		wg.Add(1)
		go func(threadId int) {
			defer wg.Done()
			// Picks up ranges from rangeCh, generates KV lists, and sends them to kvChan.
			if err := st.produceKVs(ctx, threadId); err != nil {
				// Non-blocking send: only the first error is kept.
				select {
				case errCh <- err:
				default:
				}
			}
		}(i)
	}

	// Pick up key-values from kvChan and send to stream.
	kvErr := make(chan error, 1)
	go func() {
		// Picks up KV lists from kvChan, and sends them to Output.
		err := st.streamKVs(ctx)
		if err != nil {
			cancel() // Stop all the go routines.
		}
		kvErr <- err
	}()
	wg.Wait()        // Wait for produceKVs to be over.
	close(st.kvChan) // Now we can close kvChan.
	defer func() {
		// If due to some error, we have buffers left in kvChan, we should release them.
		for buf := range st.kvChan {
			_ = buf.Release()
		}
	}()

	// A producer error takes precedence over the consumer's result.
	select {
	case err := <-errCh: // Check error from produceKVs.
		return err
	default:
	}

	// Wait for key streaming to be over.
	err := <-kvErr
	return err
}
  421. func (db *DB) newStream() *Stream {
  422. return &Stream{
  423. db: db,
  424. NumGo: db.opt.NumGoroutines,
  425. LogPrefix: "Badger.Stream",
  426. MaxSize: maxStreamSize,
  427. }
  428. }
  429. // NewStream creates a new Stream.
  430. func (db *DB) NewStream() *Stream {
  431. if db.opt.managedTxns {
  432. panic("This API can not be called in managed mode.")
  433. }
  434. return db.newStream()
  435. }
  436. // NewStreamAt creates a new Stream at a particular timestamp. Should only be used with managed DB.
  437. func (db *DB) NewStreamAt(readTs uint64) *Stream {
  438. if !db.opt.managedTxns {
  439. panic("This API can only be called in managed mode.")
  440. }
  441. stream := db.newStream()
  442. stream.readTs = readTs
  443. return stream
  444. }
  445. func BufferToKVList(buf *z.Buffer) (*pb.KVList, error) {
  446. var list pb.KVList
  447. err := buf.SliceIterate(func(s []byte) error {
  448. kv := new(pb.KV)
  449. if err := proto.Unmarshal(s, kv); err != nil {
  450. return err
  451. }
  452. list.Kv = append(list.Kv, kv)
  453. return nil
  454. })
  455. return &list, err
  456. }
  457. func KVToBuffer(kv *pb.KV, buf *z.Buffer) {
  458. in := buf.SliceAllocate(proto.Size(kv))[:0]
  459. _, err := proto.MarshalOptions{}.MarshalAppend(in, kv)
  460. y.AssertTrue(err == nil)
  461. }