// stream_writer.go — extraction artifacts (file-size header and fused line-number run) removed.
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package badger
  6. import (
  7. "encoding/hex"
  8. "fmt"
  9. "sync"
  10. "github.com/dustin/go-humanize"
  11. "google.golang.org/protobuf/proto"
  12. "github.com/dgraph-io/badger/v4/pb"
  13. "github.com/dgraph-io/badger/v4/table"
  14. "github.com/dgraph-io/badger/v4/y"
  15. "github.com/dgraph-io/ristretto/v2/z"
  16. )
  17. // StreamWriter is used to write data coming from multiple streams. The streams must not have any
  18. // overlapping key ranges. Within each stream, the keys must be sorted. Badger Stream framework is
  19. // capable of generating such an output. So, this StreamWriter can be used at the other end to build
  20. // BadgerDB at a much faster pace by writing SSTables (and value logs) directly to LSM tree levels
  21. // without causing any compactions at all. This is way faster than using batched writer or using
  22. // transactions, but only applicable in situations where the keys are pre-sorted and the DB is being
  23. // bootstrapped. Existing data would get deleted when using this writer. So, this is only useful
  24. // when restoring from backup or replicating DB across servers.
  25. //
  26. // StreamWriter should not be called on in-use DB instances. It is designed only to bootstrap new
  27. // DBs.
  28. type StreamWriter struct {
  29. writeLock sync.Mutex
  30. db *DB
  31. done func()
  32. throttle *y.Throttle
  33. maxVersion uint64
  34. writers map[uint32]*sortedWriter
  35. prevLevel int
  36. }
  37. // NewStreamWriter creates a StreamWriter. Right after creating StreamWriter, Prepare must be
  38. // called. The memory usage of a StreamWriter is directly proportional to the number of streams
  39. // possible. So, efforts must be made to keep the number of streams low. Stream framework would
  40. // typically use 16 goroutines and hence create 16 streams.
  41. func (db *DB) NewStreamWriter() *StreamWriter {
  42. return &StreamWriter{
  43. db: db,
  44. // throttle shouldn't make much difference. Memory consumption is based on the number of
  45. // concurrent streams being processed.
  46. throttle: y.NewThrottle(16),
  47. writers: make(map[uint32]*sortedWriter),
  48. }
  49. }
  50. // Prepare should be called before writing any entry to StreamWriter. It deletes all data present in
  51. // existing DB, stops compactions and any writes being done by other means. Be very careful when
  52. // calling Prepare, because it could result in permanent data loss. Not calling Prepare would result
  53. // in a corrupt Badger instance. Use PrepareIncremental to do incremental stream write.
  54. func (sw *StreamWriter) Prepare() error {
  55. sw.writeLock.Lock()
  56. defer sw.writeLock.Unlock()
  57. done, err := sw.db.dropAll()
  58. // Ensure that done() is never called more than once.
  59. var once sync.Once
  60. sw.done = func() { once.Do(done) }
  61. return err
  62. }
// PrepareIncremental should be called before writing any entry to StreamWriter incrementally.
// In incremental stream write, the tables are written at one level above the current base level.
// Unlike Prepare, it does not drop existing data; it only requires that the
// memtables are empty so that streamed tables cannot shadow unflushed entries.
func (sw *StreamWriter) PrepareIncremental() error {
	sw.writeLock.Lock()
	defer sw.writeLock.Unlock()

	// Ensure that done() is never called more than once.
	var once sync.Once

	// prepareToDrop will stop all the incoming writes and process any pending flush tasks.
	// Before we start writing, we'll stop the compactions because no one else should be writing to
	// the same level as the stream writer is writing to.
	f, err := sw.db.prepareToDrop()
	if err != nil {
		// Even on failure, expose f via sw.done so Cancel/Flush can release
		// whatever prepareToDrop acquired before erroring out.
		sw.done = func() { once.Do(f) }
		return err
	}
	sw.db.stopCompactions()
	done := func() {
		// Undo in reverse order of acquisition: restart compactions, then
		// release the drop state taken by prepareToDrop.
		sw.db.startCompactions()
		f()
	}
	sw.done = func() { once.Do(done) }

	// Incremental writes bypass the memtables entirely, so refuse to proceed
	// if any memtable still holds data.
	mts, decr := sw.db.getMemTables()
	defer decr()
	for _, m := range mts {
		if !m.sl.Empty() {
			return fmt.Errorf("Unable to do incremental writes because MemTable has data")
		}
	}

	// Find the topmost non-empty level; newWriter will write one level above
	// it (sw.prevLevel - 1).
	isEmptyDB := true
	for _, level := range sw.db.Levels() {
		if level.NumTables > 0 {
			sw.prevLevel = level.Level
			isEmptyDB = false
			break
		}
	}
	if isEmptyDB {
		// If DB is empty, we should allow doing incremental stream write.
		return nil
	}
	if sw.prevLevel == 0 {
		// It seems that data is present in all levels from Lmax to L0. If we call flatten
		// on the tree, all the data will go to Lmax. All the levels above will be empty
		// after flatten call. Now, we should be able to use incremental stream writer again.
		if err := sw.db.Flatten(3); err != nil {
			return fmt.Errorf("error during flatten in StreamWriter: %w", err)
		}
		sw.prevLevel = len(sw.db.Levels()) - 1
	}
	return nil
}
// Write writes KVList to DB. Each KV within the list contains the stream id which StreamWriter
// would use to demux the writes. Write is thread safe and can be called concurrently by multiple
// goroutines.
func (sw *StreamWriter) Write(buf *z.Buffer) error {
	if buf.LenNoPadding() == 0 {
		return nil
	}

	// closedStreams keeps track of all streams which are going to be marked as done. We are
	// keeping track of all streams so that we can close them at the end, after inserting all
	// the valid kvs.
	closedStreams := make(map[uint32]struct{})
	streamReqs := make(map[uint32]*request)

	// First pass: decode every KV in the buffer and group entries per stream.
	err := buf.SliceIterate(func(s []byte) error {
		var kv pb.KV
		if err := proto.Unmarshal(s, &kv); err != nil {
			return err
		}

		if kv.StreamDone {
			closedStreams[kv.StreamId] = struct{}{}
			return nil
		}

		// Panic if some kv comes after stream has been marked as closed.
		if _, ok := closedStreams[kv.StreamId]; ok {
			panic(fmt.Sprintf("write performed on closed stream: %d", kv.StreamId))
		}

		sw.writeLock.Lock()
		// Track the highest version seen so Flush can reset the oracle.
		if sw.maxVersion < kv.Version {
			sw.maxVersion = kv.Version
		}
		if sw.prevLevel == 0 {
			// If prevLevel is 0, that means that we have not written anything yet.
			// So, we can write to the maxLevel. newWriter writes to prevLevel - 1,
			// so we can set prevLevel to len(levels).
			sw.prevLevel = len(sw.db.lc.levels)
		}
		sw.writeLock.Unlock()

		// Only the first byte of Meta/UserMeta is meaningful here.
		var meta, userMeta byte
		if len(kv.Meta) > 0 {
			meta = kv.Meta[0]
		}
		if len(kv.UserMeta) > 0 {
			userMeta = kv.UserMeta[0]
		}

		// Copy the value: the slice handed to SliceIterate aliases buf's
		// internal storage and is not stable after iteration.
		e := &Entry{
			Key:       y.KeyWithTs(kv.Key, kv.Version),
			Value:     y.Copy(kv.Value),
			UserMeta:  userMeta,
			ExpiresAt: kv.ExpiresAt,
			meta:      meta,
		}
		// If the value can be collocated with the key in LSM tree, we can skip
		// writing the value to value log.
		req := streamReqs[kv.StreamId]
		if req == nil {
			req = &request{}
			streamReqs[kv.StreamId] = req
		}
		req.Entries = append(req.Entries, e)
		return nil
	})
	if err != nil {
		return err
	}

	all := make([]*request, 0, len(streamReqs))
	for _, req := range streamReqs {
		all = append(all, req)
	}

	sw.writeLock.Lock()
	defer sw.writeLock.Unlock()

	// We are writing all requests to vlog even if some request belongs to already closed stream.
	// It is safe to do because we are panicking while writing to sorted writer, which will be nil
	// for closed stream. At restart, stream writer will drop all the data in Prepare function.
	if err := sw.db.vlog.write(all); err != nil {
		return err
	}

	// Second pass: hand each stream's request to its sortedWriter, creating
	// writers lazily on first sight of a stream id.
	for streamID, req := range streamReqs {
		writer, ok := sw.writers[streamID]
		if !ok {
			var err error
			writer, err = sw.newWriter(streamID)
			if err != nil {
				return y.Wrapf(err, "failed to create writer with ID %d", streamID)
			}
			sw.writers[streamID] = writer
		}

		// A nil entry means the stream was closed earlier; writing after
		// close is a caller bug.
		if writer == nil {
			panic(fmt.Sprintf("write performed on closed stream: %d", streamID))
		}

		writer.reqCh <- req
	}

	// Now we can close any streams if required. We will make writer for
	// the closed streams as nil.
	for streamId := range closedStreams {
		writer, ok := sw.writers[streamId]
		if !ok {
			sw.db.opt.Warningf("Trying to close stream: %d, but no sorted "+
				"writer found for it", streamId)
			continue
		}

		writer.closer.SignalAndWait()
		if err := writer.Done(); err != nil {
			return err
		}

		sw.writers[streamId] = nil
	}
	return nil
}
  221. // Flush is called once we are done writing all the entries. It syncs DB directories. It also
  222. // updates Oracle with maxVersion found in all entries (if DB is not managed).
  223. func (sw *StreamWriter) Flush() error {
  224. sw.writeLock.Lock()
  225. defer sw.writeLock.Unlock()
  226. defer sw.done()
  227. for _, writer := range sw.writers {
  228. if writer != nil {
  229. writer.closer.SignalAndWait()
  230. }
  231. }
  232. for _, writer := range sw.writers {
  233. if writer == nil {
  234. continue
  235. }
  236. if err := writer.Done(); err != nil {
  237. return err
  238. }
  239. }
  240. if !sw.db.opt.managedTxns {
  241. if sw.db.orc != nil {
  242. sw.db.orc.Stop()
  243. }
  244. if curMax := sw.db.orc.readTs(); curMax >= sw.maxVersion {
  245. sw.maxVersion = curMax
  246. }
  247. sw.db.orc = newOracle(sw.db.opt)
  248. sw.db.orc.nextTxnTs = sw.maxVersion
  249. sw.db.orc.txnMark.Done(sw.maxVersion)
  250. sw.db.orc.readMark.Done(sw.maxVersion)
  251. sw.db.orc.incrementNextTs()
  252. }
  253. // Wait for all files to be written.
  254. if err := sw.throttle.Finish(); err != nil {
  255. return err
  256. }
  257. // Sort tables at the end.
  258. for _, l := range sw.db.lc.levels {
  259. l.sortTables()
  260. }
  261. // Now sync the directories, so all the files are registered.
  262. if sw.db.opt.ValueDir != sw.db.opt.Dir {
  263. if err := sw.db.syncDir(sw.db.opt.ValueDir); err != nil {
  264. return err
  265. }
  266. }
  267. if err := sw.db.syncDir(sw.db.opt.Dir); err != nil {
  268. return err
  269. }
  270. return sw.db.lc.validate()
  271. }
  272. // Cancel signals all goroutines to exit. Calling defer sw.Cancel() immediately after creating a new StreamWriter
  273. // ensures that writes are unblocked even upon early return. Note that dropAll() is not called here, so any
  274. // partially written data will not be erased until a new StreamWriter is initialized.
  275. func (sw *StreamWriter) Cancel() {
  276. sw.writeLock.Lock()
  277. defer sw.writeLock.Unlock()
  278. for _, writer := range sw.writers {
  279. if writer != nil {
  280. writer.closer.Signal()
  281. }
  282. }
  283. for _, writer := range sw.writers {
  284. if writer != nil {
  285. writer.closer.Wait()
  286. }
  287. }
  288. if err := sw.throttle.Finish(); err != nil {
  289. sw.db.opt.Errorf("error in throttle.Finish: %+v", err)
  290. }
  291. // Handle Cancel() being called before Prepare().
  292. if sw.done != nil {
  293. sw.done()
  294. }
  295. }
// sortedWriter builds SSTables for a single stream and injects them directly
// into one LSM level. One instance (and one goroutine, handleRequests) exists
// per open stream.
type sortedWriter struct {
	db       *DB
	throttle *y.Throttle   // shared with StreamWriter; bounds in-flight table builds
	opts     table.Options // table options, with TableSize scaled up in newWriter

	builder  *table.Builder // current in-progress table; nil once the writer is done
	lastKey  []byte         // last key added, used to enforce sort order and same-key grouping
	level    int            // LSM level this writer's tables are placed at
	streamID uint32
	reqCh    chan *request
	// Have separate closer for each writer, as it can be closed at any time.
	closer *z.Closer
}
  308. func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) {
  309. bopts := buildTableOptions(sw.db)
  310. for i := 2; i < sw.db.opt.MaxLevels; i++ {
  311. bopts.TableSize *= uint64(sw.db.opt.TableSizeMultiplier)
  312. }
  313. w := &sortedWriter{
  314. db: sw.db,
  315. opts: bopts,
  316. streamID: streamID,
  317. throttle: sw.throttle,
  318. builder: table.NewTableBuilder(bopts),
  319. reqCh: make(chan *request, 3),
  320. closer: z.NewCloser(1),
  321. level: sw.prevLevel - 1, // Write at the level just above the one we were writing to.
  322. }
  323. go w.handleRequests()
  324. return w, nil
  325. }
// handleRequests consumes requests from w.reqCh, converting each entry into a
// ValueStruct and feeding it to the table builder, until the closer fires.
// Runs as a dedicated goroutine started by newWriter.
func (w *sortedWriter) handleRequests() {
	defer w.closer.Done()

	process := func(req *request) {
		for i, e := range req.Entries {
			// If badger is running in InMemory mode, len(req.Ptrs) == 0.
			var vs y.ValueStruct
			if e.skipVlogAndSetThreshold(w.db.valueThreshold()) {
				// Value is small enough to collocate with the key in the LSM
				// tree; store it inline.
				vs = y.ValueStruct{
					Value:     e.Value,
					Meta:      e.meta,
					UserMeta:  e.UserMeta,
					ExpiresAt: e.ExpiresAt,
				}
			} else {
				// Value already lives in the value log (written by
				// StreamWriter.Write); store only the encoded pointer.
				vptr := req.Ptrs[i]
				vs = y.ValueStruct{
					Value:     vptr.Encode(),
					Meta:      e.meta | bitValuePointer,
					UserMeta:  e.UserMeta,
					ExpiresAt: e.ExpiresAt,
				}
			}
			// Keys within a stream must be sorted; a violation is a caller
			// bug, so Add's error is fatal here.
			if err := w.Add(e.Key, vs); err != nil {
				panic(err)
			}
		}
	}

	for {
		select {
		case req := <-w.reqCh:
			process(req)
		case <-w.closer.HasBeenClosed():
			// NOTE(review): the receiver closes reqCh here. This presumes no
			// sender is still active once the closer has been signalled —
			// Write/Flush signal under writeLock before closing — verify
			// against callers when modifying.
			close(w.reqCh)
			// Drain whatever was buffered before the close, then exit.
			for req := range w.reqCh {
				process(req)
			}
			return
		}
	}
}
  366. // Add adds key and vs to sortedWriter.
  367. func (w *sortedWriter) Add(key []byte, vs y.ValueStruct) error {
  368. if len(w.lastKey) > 0 && y.CompareKeys(key, w.lastKey) <= 0 {
  369. return fmt.Errorf("keys not in sorted order (last key: %s, key: %s)",
  370. hex.Dump(w.lastKey), hex.Dump(key))
  371. }
  372. sameKey := y.SameKey(key, w.lastKey)
  373. // Same keys should go into the same SSTable.
  374. if !sameKey && w.builder.ReachedCapacity() {
  375. if err := w.send(false); err != nil {
  376. return err
  377. }
  378. }
  379. w.lastKey = y.SafeCopy(w.lastKey, key)
  380. var vp valuePointer
  381. if vs.Meta&bitValuePointer > 0 {
  382. vp.Decode(vs.Value)
  383. }
  384. w.builder.Add(key, vs, vp.Len)
  385. return nil
  386. }
  387. func (w *sortedWriter) send(done bool) error {
  388. if err := w.throttle.Do(); err != nil {
  389. return err
  390. }
  391. go func(builder *table.Builder) {
  392. err := w.createTable(builder)
  393. w.throttle.Done(err)
  394. }(w.builder)
  395. // If done is true, this indicates we can close the writer.
  396. // No need to allocate underlying TableBuilder now.
  397. if done {
  398. w.builder = nil
  399. return nil
  400. }
  401. w.builder = table.NewTableBuilder(w.opts)
  402. return nil
  403. }
  404. // Done is called once we are done writing all keys and valueStructs
  405. // to sortedWriter. It completes writing current SST to disk.
  406. func (w *sortedWriter) Done() error {
  407. if w.builder.Empty() {
  408. w.builder.Close()
  409. // Assign builder as nil, so that underlying memory can be garbage collected.
  410. w.builder = nil
  411. return nil
  412. }
  413. return w.send(true)
  414. }
// createTable finishes builder into an SSTable (on disk, or in memory for
// InMemory mode), records it in the MANIFEST, and attaches it to this
// writer's target level. Runs on the background goroutine started by send.
func (w *sortedWriter) createTable(builder *table.Builder) error {
	defer builder.Close()
	if builder.Empty() {
		// Finish must still be called to release the builder's resources.
		builder.Finish()
		return nil
	}

	fileID := w.db.lc.reserveFileID()
	var tbl *table.Table
	if w.db.opt.InMemory {
		data := builder.Finish()
		var err error
		if tbl, err = table.OpenInMemoryTable(data, fileID, builder.Opts()); err != nil {
			return err
		}
	} else {
		var err error
		fname := table.NewFilename(fileID, w.db.opt.Dir)
		if tbl, err = table.CreateTable(fname, builder); err != nil {
			return err
		}
	}
	lc := w.db.lc

	lhandler := lc.levels[w.level]
	// Now that table can be opened successfully, let's add this to the MANIFEST.
	change := &pb.ManifestChange{
		Id:          tbl.ID(),
		KeyId:       tbl.KeyID(),
		Op:          pb.ManifestChange_CREATE,
		Level:       uint32(lhandler.level),
		Compression: uint32(tbl.CompressionType()),
	}
	if err := w.db.manifest.addChanges([]*pb.ManifestChange{change}, w.db.opt); err != nil {
		return err
	}

	// We are not calling lhandler.replaceTables() here, as it sorts tables on every addition.
	// We can sort all tables only once during Flush() call.
	lhandler.addTable(tbl)

	// Release the ref held by OpenTable. addTable holds its own reference now.
	_ = tbl.DecrRef()

	w.db.opt.Infof("Table created: %d at level: %d for stream: %d. Size: %s\n",
		fileID, lhandler.level, w.streamID, humanize.IBytes(uint64(tbl.Size())))
	return nil
}