stream_writer.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package badger
  6. import (
  7. "encoding/hex"
  8. "fmt"
  9. "sync"
  10. humanize "github.com/dustin/go-humanize"
  11. "github.com/pkg/errors"
  12. "google.golang.org/protobuf/proto"
  13. "github.com/dgraph-io/badger/v4/pb"
  14. "github.com/dgraph-io/badger/v4/table"
  15. "github.com/dgraph-io/badger/v4/y"
  16. "github.com/dgraph-io/ristretto/v2/z"
  17. )
// StreamWriter is used to write data coming from multiple streams. The streams must not have any
// overlapping key ranges. Within each stream, the keys must be sorted. Badger Stream framework is
// capable of generating such an output. So, this StreamWriter can be used at the other end to build
// BadgerDB at a much faster pace by writing SSTables (and value logs) directly to LSM tree levels
// without causing any compactions at all. This is way faster than using batched writer or using
// transactions, but only applicable in situations where the keys are pre-sorted and the DB is being
// bootstrapped. Existing data would get deleted when using this writer. So, this is only useful
// when restoring from backup or replicating DB across servers.
//
// StreamWriter should not be called on in-use DB instances. It is designed only to bootstrap new
// DBs.
type StreamWriter struct {
	// writeLock guards done, maxVersion, writers and prevLevel, all of which
	// are touched by concurrent Write calls.
	writeLock sync.Mutex
	db        *DB
	// done undoes the side effects of Prepare/PrepareIncremental. It is wrapped
	// in a sync.Once so calling it multiple times is safe.
	done func()
	// throttle bounds how many tables are being flushed to disk concurrently
	// (shared by all sortedWriters).
	throttle *y.Throttle
	// maxVersion is the highest KV version seen across all Write calls; used by
	// Flush to advance the Oracle for non-managed DBs.
	maxVersion uint64
	// writers maps stream id -> its sortedWriter. An entry is set to nil once
	// that stream has been closed.
	writers map[uint32]*sortedWriter
	// prevLevel is the level existing data lives at; new tables are written at
	// prevLevel-1 (see newWriter).
	prevLevel int
}
  38. // NewStreamWriter creates a StreamWriter. Right after creating StreamWriter, Prepare must be
  39. // called. The memory usage of a StreamWriter is directly proportional to the number of streams
  40. // possible. So, efforts must be made to keep the number of streams low. Stream framework would
  41. // typically use 16 goroutines and hence create 16 streams.
  42. func (db *DB) NewStreamWriter() *StreamWriter {
  43. return &StreamWriter{
  44. db: db,
  45. // throttle shouldn't make much difference. Memory consumption is based on the number of
  46. // concurrent streams being processed.
  47. throttle: y.NewThrottle(16),
  48. writers: make(map[uint32]*sortedWriter),
  49. }
  50. }
  51. // Prepare should be called before writing any entry to StreamWriter. It deletes all data present in
  52. // existing DB, stops compactions and any writes being done by other means. Be very careful when
  53. // calling Prepare, because it could result in permanent data loss. Not calling Prepare would result
  54. // in a corrupt Badger instance. Use PrepareIncremental to do incremental stream write.
  55. func (sw *StreamWriter) Prepare() error {
  56. sw.writeLock.Lock()
  57. defer sw.writeLock.Unlock()
  58. done, err := sw.db.dropAll()
  59. // Ensure that done() is never called more than once.
  60. var once sync.Once
  61. sw.done = func() { once.Do(done) }
  62. return err
  63. }
// PrepareIncremental should be called before writing any entry to StreamWriter incrementally.
// In incremental stream write, the tables are written at one level above the current base level.
func (sw *StreamWriter) PrepareIncremental() error {
	sw.writeLock.Lock()
	defer sw.writeLock.Unlock()

	// Ensure that done() is never called more than once.
	var once sync.Once

	// prepareToDrop will stop all the incoming writes and process any pending flush tasks.
	// Before we start writing, we'll stop the compactions because no one else should be writing to
	// the same level as the stream writer is writing to.
	f, err := sw.db.prepareToDrop()
	if err != nil {
		// Even on failure, publish a done() so the caller can undo
		// prepareToDrop's side effects via Flush/Cancel.
		sw.done = func() { once.Do(f) }
		return err
	}
	sw.db.stopCompactions()
	done := func() {
		// Undo in reverse order: restart compactions, then the prepareToDrop cleanup.
		sw.db.startCompactions()
		f()
	}
	sw.done = func() { once.Do(done) }

	// Incremental writes bypass the memtables entirely, so refuse to proceed if
	// any memtable holds data that could overlap with the streamed keys.
	mts, decr := sw.db.getMemTables()
	defer decr()
	for _, m := range mts {
		if !m.sl.Empty() {
			return fmt.Errorf("Unable to do incremental writes because MemTable has data")
		}
	}

	// Find the topmost (lowest-numbered) level containing tables; new tables
	// will be written one level above it (newWriter uses prevLevel-1).
	isEmptyDB := true
	for _, level := range sw.db.Levels() {
		if level.NumTables > 0 {
			sw.prevLevel = level.Level
			isEmptyDB = false
			break
		}
	}
	if isEmptyDB {
		// If DB is empty, we should allow doing incremental stream write.
		return nil
	}
	if sw.prevLevel == 0 {
		// It seems that data is present in all levels from Lmax to L0. If we call flatten
		// on the tree, all the data will go to Lmax. All the levels above will be empty
		// after flatten call. Now, we should be able to use incremental stream writer again.
		if err := sw.db.Flatten(3); err != nil {
			return errors.Wrapf(err, "error during flatten in StreamWriter")
		}
		sw.prevLevel = len(sw.db.Levels()) - 1
	}
	return nil
}
// Write writes KVList to DB. Each KV within the list contains the stream id which StreamWriter
// would use to demux the writes. Write is thread safe and can be called concurrently by multiple
// goroutines.
func (sw *StreamWriter) Write(buf *z.Buffer) error {
	if buf.LenNoPadding() == 0 {
		return nil
	}

	// closedStreams keeps track of all streams which are going to be marked as done. We are
	// keeping track of all streams so that we can close them at the end, after inserting all
	// the valid kvs.
	closedStreams := make(map[uint32]struct{})
	streamReqs := make(map[uint32]*request)

	// First pass: decode each serialized KV and group the resulting entries
	// into one request per stream id.
	err := buf.SliceIterate(func(s []byte) error {
		var kv pb.KV
		if err := proto.Unmarshal(s, &kv); err != nil {
			return err
		}

		if kv.StreamDone {
			closedStreams[kv.StreamId] = struct{}{}
			return nil
		}

		// Panic if some kv comes after stream has been marked as closed.
		if _, ok := closedStreams[kv.StreamId]; ok {
			panic(fmt.Sprintf("write performed on closed stream: %d", kv.StreamId))
		}

		// maxVersion and prevLevel are shared across concurrent Write calls, so
		// guard them with writeLock.
		sw.writeLock.Lock()
		if sw.maxVersion < kv.Version {
			sw.maxVersion = kv.Version
		}
		if sw.prevLevel == 0 {
			// If prevLevel is 0, that means that we have not written anything yet.
			// So, we can write to the maxLevel. newWriter writes to prevLevel - 1,
			// so we can set prevLevel to len(levels).
			sw.prevLevel = len(sw.db.lc.levels)
		}
		sw.writeLock.Unlock()

		// Only the first byte of Meta/UserMeta is meaningful for an Entry.
		var meta, userMeta byte
		if len(kv.Meta) > 0 {
			meta = kv.Meta[0]
		}
		if len(kv.UserMeta) > 0 {
			userMeta = kv.UserMeta[0]
		}
		e := &Entry{
			Key:       y.KeyWithTs(kv.Key, kv.Version),
			Value:     y.Copy(kv.Value), // copy: kv's backing slice belongs to the buffer
			UserMeta:  userMeta,
			ExpiresAt: kv.ExpiresAt,
			meta:      meta,
		}
		// If the value can be collocated with the key in LSM tree, we can skip
		// writing the value to value log.
		req := streamReqs[kv.StreamId]
		if req == nil {
			req = &request{}
			streamReqs[kv.StreamId] = req
		}
		req.Entries = append(req.Entries, e)
		return nil
	})
	if err != nil {
		return err
	}

	all := make([]*request, 0, len(streamReqs))
	for _, req := range streamReqs {
		all = append(all, req)
	}

	sw.writeLock.Lock()
	defer sw.writeLock.Unlock()

	// We are writing all requests to vlog even if some request belongs to already closed stream.
	// It is safe to do because we are panicking while writing to sorted writer, which will be nil
	// for closed stream. At restart, stream writer will drop all the data in Prepare function.
	if err := sw.db.vlog.write(all); err != nil {
		return err
	}

	// Second pass: hand each stream's request to its sortedWriter, creating
	// writers lazily on first use. A nil writer means the stream was already
	// closed, which is a caller bug.
	for streamID, req := range streamReqs {
		writer, ok := sw.writers[streamID]
		if !ok {
			var err error
			writer, err = sw.newWriter(streamID)
			if err != nil {
				return y.Wrapf(err, "failed to create writer with ID %d", streamID)
			}
			sw.writers[streamID] = writer
		}
		if writer == nil {
			panic(fmt.Sprintf("write performed on closed stream: %d", streamID))
		}
		writer.reqCh <- req
	}

	// Now we can close any streams if required. We will make writer for
	// the closed streams as nil.
	for streamId := range closedStreams {
		writer, ok := sw.writers[streamId]
		if !ok {
			sw.db.opt.Warningf("Trying to close stream: %d, but no sorted "+
				"writer found for it", streamId)
			continue
		}

		// Drain the writer's goroutine, flush its final table, then tombstone
		// the map entry so later writes to this stream panic above.
		writer.closer.SignalAndWait()
		if err := writer.Done(); err != nil {
			return err
		}
		sw.writers[streamId] = nil
	}
	return nil
}
  222. // Flush is called once we are done writing all the entries. It syncs DB directories. It also
  223. // updates Oracle with maxVersion found in all entries (if DB is not managed).
  224. func (sw *StreamWriter) Flush() error {
  225. sw.writeLock.Lock()
  226. defer sw.writeLock.Unlock()
  227. defer sw.done()
  228. for _, writer := range sw.writers {
  229. if writer != nil {
  230. writer.closer.SignalAndWait()
  231. }
  232. }
  233. for _, writer := range sw.writers {
  234. if writer == nil {
  235. continue
  236. }
  237. if err := writer.Done(); err != nil {
  238. return err
  239. }
  240. }
  241. if !sw.db.opt.managedTxns {
  242. if sw.db.orc != nil {
  243. sw.db.orc.Stop()
  244. }
  245. if curMax := sw.db.orc.readTs(); curMax >= sw.maxVersion {
  246. sw.maxVersion = curMax
  247. }
  248. sw.db.orc = newOracle(sw.db.opt)
  249. sw.db.orc.nextTxnTs = sw.maxVersion
  250. sw.db.orc.txnMark.Done(sw.maxVersion)
  251. sw.db.orc.readMark.Done(sw.maxVersion)
  252. sw.db.orc.incrementNextTs()
  253. }
  254. // Wait for all files to be written.
  255. if err := sw.throttle.Finish(); err != nil {
  256. return err
  257. }
  258. // Sort tables at the end.
  259. for _, l := range sw.db.lc.levels {
  260. l.sortTables()
  261. }
  262. // Now sync the directories, so all the files are registered.
  263. if sw.db.opt.ValueDir != sw.db.opt.Dir {
  264. if err := sw.db.syncDir(sw.db.opt.ValueDir); err != nil {
  265. return err
  266. }
  267. }
  268. if err := sw.db.syncDir(sw.db.opt.Dir); err != nil {
  269. return err
  270. }
  271. return sw.db.lc.validate()
  272. }
  273. // Cancel signals all goroutines to exit. Calling defer sw.Cancel() immediately after creating a new StreamWriter
  274. // ensures that writes are unblocked even upon early return. Note that dropAll() is not called here, so any
  275. // partially written data will not be erased until a new StreamWriter is initialized.
  276. func (sw *StreamWriter) Cancel() {
  277. sw.writeLock.Lock()
  278. defer sw.writeLock.Unlock()
  279. for _, writer := range sw.writers {
  280. if writer != nil {
  281. writer.closer.Signal()
  282. }
  283. }
  284. for _, writer := range sw.writers {
  285. if writer != nil {
  286. writer.closer.Wait()
  287. }
  288. }
  289. if err := sw.throttle.Finish(); err != nil {
  290. sw.db.opt.Errorf("error in throttle.Finish: %+v", err)
  291. }
  292. // Handle Cancel() being called before Prepare().
  293. if sw.done != nil {
  294. sw.done()
  295. }
  296. }
// sortedWriter writes one stream's sorted entries into SSTables at a fixed LSM
// level. Entries arrive via reqCh and are consumed by handleRequests.
type sortedWriter struct {
	db *DB
	// throttle is shared with the owning StreamWriter; it bounds concurrent
	// table flushes.
	throttle *y.Throttle
	opts     table.Options
	// builder is the in-progress table; it becomes nil after Done/send(true).
	builder *table.Builder
	// lastKey is the last key added, used to enforce sorted order and to keep
	// versions of the same key in one table.
	lastKey  []byte
	level    int
	streamID uint32
	reqCh    chan *request
	// Have separate closer for each writer, as it can be closed at any time.
	closer *z.Closer
}
  309. func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) {
  310. bopts := buildTableOptions(sw.db)
  311. for i := 2; i < sw.db.opt.MaxLevels; i++ {
  312. bopts.TableSize *= uint64(sw.db.opt.TableSizeMultiplier)
  313. }
  314. w := &sortedWriter{
  315. db: sw.db,
  316. opts: bopts,
  317. streamID: streamID,
  318. throttle: sw.throttle,
  319. builder: table.NewTableBuilder(bopts),
  320. reqCh: make(chan *request, 3),
  321. closer: z.NewCloser(1),
  322. level: sw.prevLevel - 1, // Write at the level just above the one we were writing to.
  323. }
  324. go w.handleRequests()
  325. return w, nil
  326. }
// handleRequests runs on its own goroutine (one per sortedWriter, started by
// newWriter) and feeds each incoming request's entries into the table builder.
func (w *sortedWriter) handleRequests() {
	defer w.closer.Done()

	process := func(req *request) {
		for i, e := range req.Entries {
			// If badger is running in InMemory mode, len(req.Ptrs) == 0.
			var vs y.ValueStruct
			if e.skipVlogAndSetThreshold(w.db.valueThreshold()) {
				// Value is small enough to live inline in the LSM tree.
				vs = y.ValueStruct{
					Value:     e.Value,
					Meta:      e.meta,
					UserMeta:  e.UserMeta,
					ExpiresAt: e.ExpiresAt,
				}
			} else {
				// Value lives in the vlog; store the encoded pointer instead
				// and flag it with bitValuePointer.
				vptr := req.Ptrs[i]
				vs = y.ValueStruct{
					Value:     vptr.Encode(),
					Meta:      e.meta | bitValuePointer,
					UserMeta:  e.UserMeta,
					ExpiresAt: e.ExpiresAt,
				}
			}
			if err := w.Add(e.Key, vs); err != nil {
				panic(err)
			}
		}
	}

	for {
		select {
		case req := <-w.reqCh:
			process(req)
		case <-w.closer.HasBeenClosed():
			// Drain any queued requests before exiting. NOTE(review): closing
			// reqCh from the receiving side is safe only because senders (Write)
			// hold writeLock and nil-check the writer before sending, and
			// closing happens after SignalAndWait under the same lock.
			close(w.reqCh)
			for req := range w.reqCh {
				process(req)
			}
			return
		}
	}
}
  367. // Add adds key and vs to sortedWriter.
  368. func (w *sortedWriter) Add(key []byte, vs y.ValueStruct) error {
  369. if len(w.lastKey) > 0 && y.CompareKeys(key, w.lastKey) <= 0 {
  370. return errors.Errorf("keys not in sorted order (last key: %s, key: %s)",
  371. hex.Dump(w.lastKey), hex.Dump(key))
  372. }
  373. sameKey := y.SameKey(key, w.lastKey)
  374. // Same keys should go into the same SSTable.
  375. if !sameKey && w.builder.ReachedCapacity() {
  376. if err := w.send(false); err != nil {
  377. return err
  378. }
  379. }
  380. w.lastKey = y.SafeCopy(w.lastKey, key)
  381. var vp valuePointer
  382. if vs.Meta&bitValuePointer > 0 {
  383. vp.Decode(vs.Value)
  384. }
  385. w.builder.Add(key, vs, vp.Len)
  386. return nil
  387. }
  388. func (w *sortedWriter) send(done bool) error {
  389. if err := w.throttle.Do(); err != nil {
  390. return err
  391. }
  392. go func(builder *table.Builder) {
  393. err := w.createTable(builder)
  394. w.throttle.Done(err)
  395. }(w.builder)
  396. // If done is true, this indicates we can close the writer.
  397. // No need to allocate underlying TableBuilder now.
  398. if done {
  399. w.builder = nil
  400. return nil
  401. }
  402. w.builder = table.NewTableBuilder(w.opts)
  403. return nil
  404. }
  405. // Done is called once we are done writing all keys and valueStructs
  406. // to sortedWriter. It completes writing current SST to disk.
  407. func (w *sortedWriter) Done() error {
  408. if w.builder.Empty() {
  409. w.builder.Close()
  410. // Assign builder as nil, so that underlying memory can be garbage collected.
  411. w.builder = nil
  412. return nil
  413. }
  414. return w.send(true)
  415. }
// createTable materializes builder into an SSTable (in memory or on disk),
// records it in the MANIFEST, and attaches it to this writer's target level.
// It runs on a goroutine spawned by send; its error is reported via throttle.
func (w *sortedWriter) createTable(builder *table.Builder) error {
	defer builder.Close()

	if builder.Empty() {
		// Nothing to persist; Finish is still called to release builder state.
		builder.Finish()
		return nil
	}

	fileID := w.db.lc.reserveFileID()
	var tbl *table.Table
	if w.db.opt.InMemory {
		data := builder.Finish()
		var err error
		if tbl, err = table.OpenInMemoryTable(data, fileID, builder.Opts()); err != nil {
			return err
		}
	} else {
		var err error
		fname := table.NewFilename(fileID, w.db.opt.Dir)
		if tbl, err = table.CreateTable(fname, builder); err != nil {
			return err
		}
	}

	lc := w.db.lc
	lhandler := lc.levels[w.level]

	// Now that table can be opened successfully, let's add this to the MANIFEST.
	change := &pb.ManifestChange{
		Id:          tbl.ID(),
		KeyId:       tbl.KeyID(),
		Op:          pb.ManifestChange_CREATE,
		Level:       uint32(lhandler.level),
		Compression: uint32(tbl.CompressionType()),
	}
	if err := w.db.manifest.addChanges([]*pb.ManifestChange{change}); err != nil {
		return err
	}

	// We are not calling lhandler.replaceTables() here, as it sorts tables on every addition.
	// We can sort all tables only once during Flush() call.
	lhandler.addTable(tbl)

	// Release the ref held by OpenTable.
	_ = tbl.DecrRef()

	w.db.opt.Infof("Table created: %d at level: %d for stream: %d. Size: %s\n",
		fileID, lhandler.level, w.streamID, humanize.IBytes(uint64(tbl.Size())))
	return nil
}