stream.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503
  1. /*
  2. * Copyright 2018 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package badger
  17. import (
  18. "bytes"
  19. "context"
  20. "sort"
  21. "sync"
  22. "sync/atomic"
  23. "time"
  24. humanize "github.com/dustin/go-humanize"
  25. "google.golang.org/protobuf/proto"
  26. "github.com/dgraph-io/badger/v4/pb"
  27. "github.com/dgraph-io/badger/v4/y"
  28. "github.com/dgraph-io/ristretto/v2/z"
  29. )
// batchSize is the threshold at which a producer flushes its current buffer to
// kvChan (see produceKVs); buffers are allocated with capacity 2*batchSize.
const batchSize = 16 << 20 // 16 MB

// maxStreamSize is the maximum allowed size of a stream batch. This is a soft limit
// as a single list that is still over the limit will have to be sent as is since it
// cannot be split further. This limit prevents the framework from creating batches
// so big that sending them causes issues (e.g running into the max size gRPC limit).
// It is the default for Stream.MaxSize (see newStream).
var maxStreamSize = uint64(100 << 20) // 100MB
// Stream provides a framework to concurrently iterate over a snapshot of Badger, pick up
// key-values, batch them up and call Send. Stream does concurrent iteration over many smaller key
// ranges. It does NOT send keys in lexicographical sorted order. To get keys in sorted
// order, use Iterator.
type Stream struct {
	// Prefix to only iterate over certain range of keys. If set to nil (default), Stream would
	// iterate over the entire DB.
	Prefix []byte

	// Number of goroutines to use for iterating over key ranges. Defaults to 8.
	NumGo int

	// Badger would produce log entries in Infof to indicate the progress of Stream. LogPrefix can
	// be used to help differentiate them from other activities. Default is "Badger.Stream".
	LogPrefix string

	// ChooseKey is invoked each time a new key is encountered. Note that this is not called
	// on every version of the value, only the first encountered version (i.e. the highest version
	// of the value a key has). ChooseKey can be left nil to select all keys.
	//
	// Note: Calls to ChooseKey are concurrent.
	ChooseKey func(item *Item) bool

	// MaxSize is the maximum allowed size of a stream batch. This is a soft limit
	// as a single list that is still over the limit will have to be sent as is since it
	// cannot be split further. This limit prevents the framework from creating batches
	// so big that sending them causes issues (e.g running into the max size gRPC limit).
	// If necessary, set it up before the Stream starts synchronisation.
	// This is not a concurrency-safe setting.
	MaxSize uint64

	// KeyToList, similar to ChooseKey, is only invoked on the highest version of the value. It
	// is upto the caller to iterate over the versions and generate zero, one or more KVs. It
	// is expected that the user would advance the iterator to go through the versions of the
	// values. However, the user MUST immediately return from this function on the first encounter
	// with a mismatching key. See example usage in ToList function. Can be left nil to use ToList
	// function by default.
	//
	// KeyToList has access to z.Allocator accessible via stream.Allocator(itr.ThreadId). This
	// allocator can be used to allocate KVs, to decrease the memory pressure on Go GC. Stream
	// framework takes care of releasing those resources after calling Send. AllocRef does
	// NOT need to be set in the returned KVList, as Stream framework would ignore that field,
	// instead using the allocator assigned to that thread id.
	//
	// Note: Calls to KeyToList are concurrent.
	KeyToList func(key []byte, itr *Iterator) (*pb.KVList, error)

	// This is the method where Stream sends the final output. All calls to Send are done by a
	// single goroutine, i.e. logic within Send method can expect single threaded execution.
	Send func(buf *z.Buffer) error

	// Read data above the sinceTs. All keys with version =< sinceTs will be ignored.
	SinceTs uint64

	// readTs pins the snapshot for streams created via NewStreamAt (managed mode); 0 otherwise.
	readTs uint64
	// db is the database being streamed.
	db *DB
	// rangeCh carries key ranges from produceRanges to the produceKVs goroutines.
	rangeCh chan keyRange
	// kvChan carries serialized KV buffers from produceKVs to streamKVs.
	kvChan chan *z.Buffer
	// nextStreamId hands out a unique id per range iteration.
	nextStreamId atomic.Uint32
	// doneMarkers, when true, makes each iteration emit a StreamDone KV marker.
	doneMarkers bool
	scanned     atomic.Uint64 // used to estimate the ETA for data scan.
	// numProducers counts currently-running produceKVs goroutines (for logging).
	numProducers atomic.Int32
}
// SendDoneMarkers when true would send out done markers on the stream. False by default.
// NOTE: doneMarkers is read by the producer goroutines without synchronization, so
// call this before Orchestrate starts, not while a stream is running.
func (st *Stream) SendDoneMarkers(done bool) {
	st.doneMarkers = done
}
  95. // ToList is a default implementation of KeyToList. It picks up all valid versions of the key,
  96. // skipping over deleted or expired keys.
  97. func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) {
  98. a := itr.Alloc
  99. ka := a.Copy(key)
  100. list := &pb.KVList{}
  101. for ; itr.Valid(); itr.Next() {
  102. item := itr.Item()
  103. if item.IsDeletedOrExpired() {
  104. break
  105. }
  106. if !bytes.Equal(key, item.Key()) {
  107. // Break out on the first encounter with another key.
  108. break
  109. }
  110. kv := y.NewKV(a)
  111. kv.Key = ka
  112. if err := item.Value(func(val []byte) error {
  113. kv.Value = a.Copy(val)
  114. return nil
  115. }); err != nil {
  116. return nil, err
  117. }
  118. kv.Version = item.Version()
  119. kv.ExpiresAt = item.ExpiresAt()
  120. kv.UserMeta = a.Copy([]byte{item.UserMeta()})
  121. list.Kv = append(list.Kv, kv)
  122. if st.db.opt.NumVersionsToKeep == 1 {
  123. break
  124. }
  125. if item.DiscardEarlierVersions() {
  126. break
  127. }
  128. }
  129. return list, nil
  130. }
  131. // keyRange is [start, end), including start, excluding end. Do ensure that the start,
  132. // end byte slices are owned by keyRange struct.
  133. func (st *Stream) produceRanges(ctx context.Context) {
  134. ranges := st.db.Ranges(st.Prefix, 16)
  135. y.AssertTrue(len(ranges) > 0)
  136. y.AssertTrue(ranges[0].left == nil)
  137. y.AssertTrue(ranges[len(ranges)-1].right == nil)
  138. st.db.opt.Infof("Number of ranges found: %d\n", len(ranges))
  139. // Sort in descending order of size.
  140. sort.Slice(ranges, func(i, j int) bool {
  141. return ranges[i].size > ranges[j].size
  142. })
  143. for i, r := range ranges {
  144. st.rangeCh <- *r
  145. st.db.opt.Infof("Sent range %d for iteration: [%x, %x) of size: %s\n",
  146. i, r.left, r.right, humanize.IBytes(uint64(r.size)))
  147. }
  148. close(st.rangeCh)
  149. }
// produceKVs picks up ranges from rangeCh, generates KV lists and sends them to kvChan.
// It runs until rangeCh is closed (all ranges handed out) or ctx is cancelled,
// returning the first error encountered during iteration.
func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
	st.numProducers.Add(1)
	defer st.numProducers.Add(-1)

	// Use a transaction pinned at readTs in managed mode (see NewStreamAt),
	// otherwise a plain read-only transaction.
	var txn *Txn
	if st.readTs > 0 {
		txn = st.db.NewTransactionAt(st.readTs, false)
	} else {
		txn = st.db.NewTransaction(false)
	}
	defer txn.Discard()

	// produceKVs is running iterate serially. So, we can define the outList here.
	outList := z.NewBuffer(2*batchSize, "Stream.ProduceKVs")
	defer func() {
		// The outList variable changes. So, we need to evaluate the variable in the defer. DO NOT
		// call `defer outList.Release()`.
		_ = outList.Release()
	}()

	// iterate scans a single key range, converting each distinct key to a KV
	// list via st.KeyToList and flushing outList to kvChan whenever it reaches
	// batchSize.
	iterate := func(kr keyRange) error {
		iterOpts := DefaultIteratorOptions
		iterOpts.AllVersions = true
		iterOpts.Prefix = st.Prefix
		iterOpts.PrefetchValues = false
		iterOpts.SinceTs = st.SinceTs
		itr := txn.NewIterator(iterOpts)
		itr.ThreadId = threadId
		defer itr.Close()

		itr.Alloc = z.NewAllocator(1<<20, "Stream.Iterate")
		defer itr.Alloc.Release()

		// This unique stream id is used to identify all the keys from this iteration.
		streamId := st.nextStreamId.Add(1)
		var scanned int

		// sendIt hands the filled buffer to kvChan, starts a fresh buffer, and
		// updates the global scanned counter used for ETA logging in streamKVs.
		sendIt := func() error {
			select {
			case st.kvChan <- outList:
				outList = z.NewBuffer(2*batchSize, "Stream.ProduceKVs")
				st.scanned.Add(uint64(itr.scanned - scanned))
				scanned = itr.scanned
			case <-ctx.Done():
				return ctx.Err()
			}
			return nil
		}

		var prevKey []byte
		for itr.Seek(kr.left); itr.Valid(); {
			// it.Valid would only return true for keys with the provided Prefix in iterOpts.

			// Skip remaining versions of the previous key: ChooseKey/KeyToList
			// are invoked once per distinct key only. (The skip-branches below
			// that `continue` without itr.Next rely on this to advance.)
			item := itr.Item()
			if bytes.Equal(item.Key(), prevKey) {
				itr.Next()
				continue
			}
			prevKey = append(prevKey[:0], item.Key()...)

			// Check if we reached the end of the key range.
			if len(kr.right) > 0 && bytes.Compare(item.Key(), kr.right) >= 0 {
				break
			}

			// Check if we should pick this key.
			if st.ChooseKey != nil && !st.ChooseKey(item) {
				continue
			}

			// Now convert to key value. The per-thread allocator is reset for
			// each key; its memory is only valid until the next Reset.
			itr.Alloc.Reset()
			list, err := st.KeyToList(item.KeyCopy(nil), itr)
			if err != nil {
				// A failing key is logged and skipped rather than aborting the
				// whole stream.
				st.db.opt.Warningf("While reading key: %x, got error: %v", item.Key(), err)
				continue
			}
			if list == nil || len(list.Kv) == 0 {
				continue
			}
			for _, kv := range list.Kv {
				kv.StreamId = streamId
				KVToBuffer(kv, outList)
				if outList.LenNoPadding() < batchSize {
					continue
				}
				if err := sendIt(); err != nil {
					return err
				}
			}
		}
		// Mark the stream as done.
		if st.doneMarkers {
			kv := &pb.KV{
				StreamId:   streamId,
				StreamDone: true,
			}
			KVToBuffer(kv, outList)
		}
		// Flush whatever remains (possibly just the done marker, or nothing).
		return sendIt()
	}

	for {
		select {
		case kr, ok := <-st.rangeCh:
			if !ok {
				// Done with the keys.
				return nil
			}
			if err := iterate(kr); err != nil {
				return err
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
// streamKVs is the single consumer of kvChan: it coalesces producer buffers into
// batches of up to st.MaxSize bytes, hands each batch to st.Send (serially, per
// the Send contract), and logs progress every 5 seconds. Returns when kvChan is
// closed or ctx is cancelled.
func (st *Stream) streamKVs(ctx context.Context) error {
	onDiskSize, uncompressedSize := st.db.EstimateSize(st.Prefix)
	// Manish has seen uncompressed size to be in 20% error margin.
	uncompressedSize = uint64(float64(uncompressedSize) * 1.2)
	st.db.opt.Infof("%s Streaming about %s of uncompressed data (%s on disk)\n",
		st.LogPrefix, humanize.IBytes(uncompressedSize), humanize.IBytes(onDiskSize))
	tickerDur := 5 * time.Second
	var bytesSent uint64
	t := time.NewTicker(tickerDur)
	defer t.Stop()
	now := time.Now()

	// sendBatch forwards one batch to st.Send and always releases the buffer.
	// Empty batches are dropped. bytesSent is only touched on this goroutine.
	sendBatch := func(batch *z.Buffer) error {
		defer func() { _ = batch.Release() }()
		sz := uint64(batch.LenNoPadding())
		if sz == 0 {
			return nil
		}
		bytesSent += sz
		// st.db.opt.Infof("%s Sending batch of size: %s.\n", st.LogPrefix, humanize.IBytes(sz))
		if err := st.Send(batch); err != nil {
			st.db.opt.Warningf("Error while sending: %v\n", err)
			return err
		}
		return nil
	}

	// slurp greedily drains whatever is already queued on kvChan into batch
	// without blocking (note the default case), stops once batch exceeds
	// st.MaxSize, then sends the accumulated batch.
	slurp := func(batch *z.Buffer) error {
	loop:
		for {
			// Send the batch immediately if it already exceeds the maximum allowed size.
			// If the size of the batch exceeds maxStreamSize, break from the loop to
			// avoid creating a batch that is so big that certain limits are reached.
			if uint64(batch.LenNoPadding()) > st.MaxSize {
				break loop
			}
			select {
			case kvs, ok := <-st.kvChan:
				if !ok {
					break loop
				}
				y.AssertTrue(kvs != nil)
				y.Check2(batch.Write(kvs.Bytes()))
				y.Check(kvs.Release())
			default:
				break loop
			}
		}
		return sendBatch(batch)
	} // end of slurp.

	// Rate monitors average over the last 20 captures (one capture per tick).
	writeRate := y.NewRateMonitor(20)
	scanRate := y.NewRateMonitor(20)
outer:
	for {
		var batch *z.Buffer
		select {
		case <-ctx.Done():
			return ctx.Err()

		case <-t.C:
			// Instead of calculating speed over the entire lifetime, we average the speed over
			// ticker duration.
			writeRate.Capture(bytesSent)
			scanned := st.scanned.Load()
			scanRate.Capture(scanned)
			numProducers := st.numProducers.Load()

			st.db.opt.Infof("%s [%s] Scan (%d): ~%s/%s at %s/sec. Sent: %s at %s/sec."+
				" jemalloc: %s\n",
				st.LogPrefix, y.FixedDuration(time.Since(now)), numProducers,
				y.IBytesToString(scanned, 1), humanize.IBytes(uncompressedSize),
				humanize.IBytes(scanRate.Rate()),
				y.IBytesToString(bytesSent, 1), humanize.IBytes(writeRate.Rate()),
				humanize.IBytes(uint64(z.NumAllocBytes())))

		case kvs, ok := <-st.kvChan:
			if !ok {
				// Producers are done and the channel is drained.
				break outer
			}
			y.AssertTrue(kvs != nil)
			batch = kvs

			// Otherwise, slurp more keys into this batch.
			if err := slurp(batch); err != nil {
				return err
			}
		}
	}

	st.db.opt.Infof("%s Sent data of size %s\n", st.LogPrefix, humanize.IBytes(bytesSent))
	return nil
}
// Orchestrate runs Stream. It picks up ranges from the SSTables, then runs NumGo number of
// goroutines to iterate over these ranges and batch up KVs in lists. It concurrently runs a single
// goroutine to pick these lists, batch them up further and send to Output.Send. Orchestrate also
// spits logs out to Infof, using provided LogPrefix. Note that all calls to Output.Send
// are serial. In case any of these steps encounter an error, Orchestrate would stop execution and
// return that error. Orchestrate can be called multiple times, but in serial order.
func (st *Stream) Orchestrate(ctx context.Context) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	st.rangeCh = make(chan keyRange, 3) // Contains keys for posting lists.

	// kvChan should only have a small capacity to ensure that we don't buffer up too much data if
	// sending is slow. Page size is set to 4MB, which is used to lazily cap the size of each
	// KVList. To get 128MB buffer, we can set the channel size to 32.
	st.kvChan = make(chan *z.Buffer, 32)

	if st.KeyToList == nil {
		st.KeyToList = st.ToList
	}

	// Picks up ranges from Badger, and sends them to rangeCh.
	go st.produceRanges(ctx)

	// errCh is buffered to NumGo so a failing producer never blocks on report;
	// only the first error is kept (see the non-blocking send below).
	errCh := make(chan error, st.NumGo) // Stores error by consumeKeys.
	var wg sync.WaitGroup
	for i := 0; i < st.NumGo; i++ {
		wg.Add(1)

		go func(threadId int) {
			defer wg.Done()
			// Picks up ranges from rangeCh, generates KV lists, and sends them to kvChan.
			if err := st.produceKVs(ctx, threadId); err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(i)
	}

	// Pick up key-values from kvChan and send to stream.
	kvErr := make(chan error, 1)
	go func() {
		// Picks up KV lists from kvChan, and sends them to Output.
		err := st.streamKVs(ctx)
		if err != nil {
			cancel() // Stop all the go routines.
		}
		kvErr <- err
	}()
	wg.Wait()        // Wait for produceKVs to be over.
	close(st.kvChan) // Now we can close kvChan.
	defer func() {
		// If due to some error, we have buffers left in kvChan, we should release them.
		for buf := range st.kvChan {
			_ = buf.Release()
		}
	}()

	select {
	case err := <-errCh: // Check error from produceKVs.
		return err
	default:
	}

	// Wait for key streaming to be over.
	err := <-kvErr
	return err
}
  402. func (db *DB) newStream() *Stream {
  403. return &Stream{
  404. db: db,
  405. NumGo: db.opt.NumGoroutines,
  406. LogPrefix: "Badger.Stream",
  407. MaxSize: maxStreamSize,
  408. }
  409. }
  410. // NewStream creates a new Stream.
  411. func (db *DB) NewStream() *Stream {
  412. if db.opt.managedTxns {
  413. panic("This API can not be called in managed mode.")
  414. }
  415. return db.newStream()
  416. }
  417. // NewStreamAt creates a new Stream at a particular timestamp. Should only be used with managed DB.
  418. func (db *DB) NewStreamAt(readTs uint64) *Stream {
  419. if !db.opt.managedTxns {
  420. panic("This API can only be called in managed mode.")
  421. }
  422. stream := db.newStream()
  423. stream.readTs = readTs
  424. return stream
  425. }
  426. func BufferToKVList(buf *z.Buffer) (*pb.KVList, error) {
  427. var list pb.KVList
  428. err := buf.SliceIterate(func(s []byte) error {
  429. kv := new(pb.KV)
  430. if err := proto.Unmarshal(s, kv); err != nil {
  431. return err
  432. }
  433. list.Kv = append(list.Kv, kv)
  434. return nil
  435. })
  436. return &list, err
  437. }
  438. func KVToBuffer(kv *pb.KV, buf *z.Buffer) {
  439. in := buf.SliceAllocate(proto.Size(kv))[:0]
  440. _, err := proto.MarshalOptions{}.MarshalAppend(in, kv)
  441. y.AssertTrue(err == nil)
  442. }