| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520 |
- /*
- * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
- * SPDX-License-Identifier: Apache-2.0
- */
- package badger
- import (
- "encoding/hex"
- "fmt"
- "sync"
- "github.com/dustin/go-humanize"
- "google.golang.org/protobuf/proto"
- "github.com/dgraph-io/badger/v4/pb"
- "github.com/dgraph-io/badger/v4/table"
- "github.com/dgraph-io/badger/v4/y"
- "github.com/dgraph-io/ristretto/v2/z"
- )
// StreamWriter is used to write data coming from multiple streams. The streams must not have any
// overlapping key ranges. Within each stream, the keys must be sorted. Badger Stream framework is
// capable of generating such an output. So, this StreamWriter can be used at the other end to build
// BadgerDB at a much faster pace by writing SSTables (and value logs) directly to LSM tree levels
// without causing any compactions at all. This is way faster than using batched writer or using
// transactions, but only applicable in situations where the keys are pre-sorted and the DB is being
// bootstrapped. Existing data would get deleted when using this writer. So, this is only useful
// when restoring from backup or replicating DB across servers.
//
// StreamWriter should not be called on in-use DB instances. It is designed only to bootstrap new
// DBs.
type StreamWriter struct {
	writeLock  sync.Mutex               // serializes Prepare/Write/Flush/Cancel and guards the fields below
	db         *DB
	done       func()                   // cleanup installed by Prepare/PrepareIncremental; wrapped in sync.Once so it runs at most once
	throttle   *y.Throttle              // bounds the number of SSTables being built concurrently (see sortedWriter.send)
	maxVersion uint64                   // highest entry version seen across all writes; pushed into the oracle by Flush
	writers    map[uint32]*sortedWriter // one writer per stream id; a nil value marks a stream that has been closed
	prevLevel  int                      // tables are written at prevLevel-1; 0 means "not yet determined" (see Write)
}
- // NewStreamWriter creates a StreamWriter. Right after creating StreamWriter, Prepare must be
- // called. The memory usage of a StreamWriter is directly proportional to the number of streams
- // possible. So, efforts must be made to keep the number of streams low. Stream framework would
- // typically use 16 goroutines and hence create 16 streams.
- func (db *DB) NewStreamWriter() *StreamWriter {
- return &StreamWriter{
- db: db,
- // throttle shouldn't make much difference. Memory consumption is based on the number of
- // concurrent streams being processed.
- throttle: y.NewThrottle(16),
- writers: make(map[uint32]*sortedWriter),
- }
- }
- // Prepare should be called before writing any entry to StreamWriter. It deletes all data present in
- // existing DB, stops compactions and any writes being done by other means. Be very careful when
- // calling Prepare, because it could result in permanent data loss. Not calling Prepare would result
- // in a corrupt Badger instance. Use PrepareIncremental to do incremental stream write.
- func (sw *StreamWriter) Prepare() error {
- sw.writeLock.Lock()
- defer sw.writeLock.Unlock()
- done, err := sw.db.dropAll()
- // Ensure that done() is never called more than once.
- var once sync.Once
- sw.done = func() { once.Do(done) }
- return err
- }
// PrepareIncremental should be called before writing any entry to StreamWriter incrementally.
// In incremental stream write, the tables are written at one level above the current base level.
func (sw *StreamWriter) PrepareIncremental() error {
	sw.writeLock.Lock()
	defer sw.writeLock.Unlock()
	// Ensure that done() is never called more than once.
	var once sync.Once
	// prepareToDrop will stop all the incoming writes and process any pending flush tasks.
	// Before we start writing, we'll stop the compactions because no one else should be writing to
	// the same level as the stream writer is writing to.
	f, err := sw.db.prepareToDrop()
	if err != nil {
		// prepareToDrop failed: still install its cleanup so Cancel/Flush can undo
		// whatever it managed to do before failing.
		sw.done = func() { once.Do(f) }
		return err
	}
	sw.db.stopCompactions()
	done := func() {
		// Restart compactions before running prepareToDrop's cleanup, mirroring the
		// setup order above in reverse.
		sw.db.startCompactions()
		f()
	}
	sw.done = func() { once.Do(done) }
	// Incremental writes require an empty memtable: any resident data could be
	// newer than the stream and would be shadowed by tables written below it.
	mts, decr := sw.db.getMemTables()
	defer decr()
	for _, m := range mts {
		if !m.sl.Empty() {
			return fmt.Errorf("Unable to do incremental writes because MemTable has data")
		}
	}
	// Find the highest (shallowest) non-empty level; new tables will be written
	// one level above it (prevLevel-1).
	isEmptyDB := true
	for _, level := range sw.db.Levels() {
		if level.NumTables > 0 {
			sw.prevLevel = level.Level
			isEmptyDB = false
			break
		}
	}
	if isEmptyDB {
		// If DB is empty, we should allow doing incremental stream write.
		return nil
	}
	if sw.prevLevel == 0 {
		// It seems that data is present in all levels from Lmax to L0. If we call flatten
		// on the tree, all the data will go to Lmax. All the levels above will be empty
		// after flatten call. Now, we should be able to use incremental stream writer again.
		if err := sw.db.Flatten(3); err != nil {
			return fmt.Errorf("error during flatten in StreamWriter: %w", err)
		}
		sw.prevLevel = len(sw.db.Levels()) - 1
	}
	return nil
}
// Write writes KVList to DB. Each KV within the list contains the stream id which StreamWriter
// would use to demux the writes. Write is thread safe and can be called concurrently by multiple
// goroutines.
func (sw *StreamWriter) Write(buf *z.Buffer) error {
	if buf.LenNoPadding() == 0 {
		return nil
	}
	// closedStreams keeps track of all streams which are going to be marked as done. We are
	// keeping track of all streams so that we can close them at the end, after inserting all
	// the valid kvs.
	closedStreams := make(map[uint32]struct{})
	streamReqs := make(map[uint32]*request)
	// First pass: decode every KV in the buffer and group entries into one
	// request per stream id. No DB state is mutated here except maxVersion
	// and prevLevel (both under writeLock).
	err := buf.SliceIterate(func(s []byte) error {
		var kv pb.KV
		if err := proto.Unmarshal(s, &kv); err != nil {
			return err
		}
		if kv.StreamDone {
			closedStreams[kv.StreamId] = struct{}{}
			return nil
		}
		// Panic if some kv comes after stream has been marked as closed.
		if _, ok := closedStreams[kv.StreamId]; ok {
			panic(fmt.Sprintf("write performed on closed stream: %d", kv.StreamId))
		}
		sw.writeLock.Lock()
		if sw.maxVersion < kv.Version {
			sw.maxVersion = kv.Version
		}
		if sw.prevLevel == 0 {
			// If prevLevel is 0, that means that we have not written anything yet.
			// So, we can write to the maxLevel. newWriter writes to prevLevel - 1,
			// so we can set prevLevel to len(levels).
			sw.prevLevel = len(sw.db.lc.levels)
		}
		sw.writeLock.Unlock()
		// Only the first byte of Meta/UserMeta is meaningful here; KV carries
		// them as byte slices but Entry stores single bytes.
		var meta, userMeta byte
		if len(kv.Meta) > 0 {
			meta = kv.Meta[0]
		}
		if len(kv.UserMeta) > 0 {
			userMeta = kv.UserMeta[0]
		}
		e := &Entry{
			Key:       y.KeyWithTs(kv.Key, kv.Version),
			Value:     y.Copy(kv.Value),
			UserMeta:  userMeta,
			ExpiresAt: kv.ExpiresAt,
			meta:      meta,
		}
		// If the value can be collocated with the key in LSM tree, we can skip
		// writing the value to value log.
		req := streamReqs[kv.StreamId]
		if req == nil {
			req = &request{}
			streamReqs[kv.StreamId] = req
		}
		req.Entries = append(req.Entries, e)
		return nil
	})
	if err != nil {
		return err
	}
	all := make([]*request, 0, len(streamReqs))
	for _, req := range streamReqs {
		all = append(all, req)
	}
	sw.writeLock.Lock()
	defer sw.writeLock.Unlock()
	// We are writing all requests to vlog even if some request belongs to already closed stream.
	// It is safe to do because we are panicking while writing to sorted writer, which will be nil
	// for closed stream. At restart, stream writer will drop all the data in Prepare function.
	if err := sw.db.vlog.write(all); err != nil {
		return err
	}
	// Second pass: dispatch each stream's request to its sortedWriter, lazily
	// creating writers for streams seen for the first time.
	for streamID, req := range streamReqs {
		writer, ok := sw.writers[streamID]
		if !ok {
			var err error
			writer, err = sw.newWriter(streamID)
			if err != nil {
				return y.Wrapf(err, "failed to create writer with ID %d", streamID)
			}
			sw.writers[streamID] = writer
		}
		// A nil entry in sw.writers means the stream was closed earlier; writing
		// to it again is a caller bug.
		if writer == nil {
			panic(fmt.Sprintf("write performed on closed stream: %d", streamID))
		}
		writer.reqCh <- req
	}
	// Now we can close any streams if required. We will make writer for
	// the closed streams as nil.
	for streamId := range closedStreams {
		writer, ok := sw.writers[streamId]
		if !ok {
			sw.db.opt.Warningf("Trying to close stream: %d, but no sorted "+
				"writer found for it", streamId)
			continue
		}
		// Wait for the writer goroutine to drain its pending requests, then
		// flush its final table.
		writer.closer.SignalAndWait()
		if err := writer.Done(); err != nil {
			return err
		}
		sw.writers[streamId] = nil
	}
	return nil
}
// Flush is called once we are done writing all the entries. It syncs DB directories. It also
// updates Oracle with maxVersion found in all entries (if DB is not managed).
func (sw *StreamWriter) Flush() error {
	sw.writeLock.Lock()
	defer sw.writeLock.Unlock()
	defer sw.done()
	// Signal every remaining writer goroutine to drain and stop...
	for _, writer := range sw.writers {
		if writer != nil {
			writer.closer.SignalAndWait()
		}
	}
	// ...then flush each writer's final table. Done in a second loop so all
	// goroutines are stopped before any builder is finalized.
	for _, writer := range sw.writers {
		if writer == nil {
			continue
		}
		if err := writer.Done(); err != nil {
			return err
		}
	}
	if !sw.db.opt.managedTxns {
		// NOTE(review): orc is nil-checked before Stop(), but readTs() below
		// dereferences it unconditionally — presumably orc is always non-nil for
		// non-managed DBs; confirm against DB initialization.
		if sw.db.orc != nil {
			sw.db.orc.Stop()
		}
		if curMax := sw.db.orc.readTs(); curMax >= sw.maxVersion {
			sw.maxVersion = curMax
		}
		// Rebuild the oracle so its timestamps reflect the highest version that
		// was streamed in.
		sw.db.orc = newOracle(sw.db.opt)
		sw.db.orc.nextTxnTs = sw.maxVersion
		sw.db.orc.txnMark.Done(sw.maxVersion)
		sw.db.orc.readMark.Done(sw.maxVersion)
		sw.db.orc.incrementNextTs()
	}
	// Wait for all files to be written.
	if err := sw.throttle.Finish(); err != nil {
		return err
	}
	// Sort tables at the end. createTable skips per-table sorting (via
	// addTable rather than replaceTables), so each level is sorted once here.
	for _, l := range sw.db.lc.levels {
		l.sortTables()
	}
	// Now sync the directories, so all the files are registered.
	if sw.db.opt.ValueDir != sw.db.opt.Dir {
		if err := sw.db.syncDir(sw.db.opt.ValueDir); err != nil {
			return err
		}
	}
	if err := sw.db.syncDir(sw.db.opt.Dir); err != nil {
		return err
	}
	return sw.db.lc.validate()
}
- // Cancel signals all goroutines to exit. Calling defer sw.Cancel() immediately after creating a new StreamWriter
- // ensures that writes are unblocked even upon early return. Note that dropAll() is not called here, so any
- // partially written data will not be erased until a new StreamWriter is initialized.
- func (sw *StreamWriter) Cancel() {
- sw.writeLock.Lock()
- defer sw.writeLock.Unlock()
- for _, writer := range sw.writers {
- if writer != nil {
- writer.closer.Signal()
- }
- }
- for _, writer := range sw.writers {
- if writer != nil {
- writer.closer.Wait()
- }
- }
- if err := sw.throttle.Finish(); err != nil {
- sw.db.opt.Errorf("error in throttle.Finish: %+v", err)
- }
- // Handle Cancel() being called before Prepare().
- if sw.done != nil {
- sw.done()
- }
- }
// sortedWriter builds SSTables for a single stream. Entries arrive in sorted
// order via reqCh and are accumulated into builder; full builders are handed
// off asynchronously (bounded by throttle) to be written as tables at level.
type sortedWriter struct {
	db       *DB
	throttle *y.Throttle   // shared with the StreamWriter; limits concurrent table builds
	opts     table.Options // options used for every table this writer creates
	builder  *table.Builder // current in-progress table; nil once the writer is done
	lastKey  []byte         // last key added; used for sort-order and same-key checks
	level    int            // LSM level this writer's tables are placed at
	streamID uint32
	reqCh    chan *request // incoming batches from StreamWriter.Write
	// Have separate closer for each writer, as it can be closed at any time.
	closer *z.Closer
}
- func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) {
- bopts := buildTableOptions(sw.db)
- for i := 2; i < sw.db.opt.MaxLevels; i++ {
- bopts.TableSize *= uint64(sw.db.opt.TableSizeMultiplier)
- }
- w := &sortedWriter{
- db: sw.db,
- opts: bopts,
- streamID: streamID,
- throttle: sw.throttle,
- builder: table.NewTableBuilder(bopts),
- reqCh: make(chan *request, 3),
- closer: z.NewCloser(1),
- level: sw.prevLevel - 1, // Write at the level just above the one we were writing to.
- }
- go w.handleRequests()
- return w, nil
- }
// handleRequests is the per-stream goroutine: it consumes requests from reqCh
// and adds each entry to the table builder until the closer is signalled, then
// drains any remaining requests and exits.
func (w *sortedWriter) handleRequests() {
	defer w.closer.Done()
	process := func(req *request) {
		for i, e := range req.Entries {
			// If badger is running in InMemory mode, len(req.Ptrs) == 0.
			var vs y.ValueStruct
			if e.skipVlogAndSetThreshold(w.db.valueThreshold()) {
				// Value is small enough to live inline in the LSM tree.
				vs = y.ValueStruct{
					Value:     e.Value,
					Meta:      e.meta,
					UserMeta:  e.UserMeta,
					ExpiresAt: e.ExpiresAt,
				}
			} else {
				// Large value: store the encoded vlog pointer instead, and mark
				// the entry so readers know to follow it.
				vptr := req.Ptrs[i]
				vs = y.ValueStruct{
					Value:     vptr.Encode(),
					Meta:      e.meta | bitValuePointer,
					UserMeta:  e.UserMeta,
					ExpiresAt: e.ExpiresAt,
				}
			}
			// Entries within a stream must be pre-sorted; Add only fails on
			// out-of-order keys, which is a caller bug here.
			if err := w.Add(e.Key, vs); err != nil {
				panic(err)
			}
		}
	}
	for {
		select {
		case req := <-w.reqCh:
			process(req)
		case <-w.closer.HasBeenClosed():
			// Closer fired: close reqCh and drain whatever is still queued
			// before returning. The sender (Write) holds writeLock while this
			// goroutine is being shut down via SignalAndWait, so no send races
			// with this close.
			close(w.reqCh)
			for req := range w.reqCh {
				process(req)
			}
			return
		}
	}
}
- // Add adds key and vs to sortedWriter.
- func (w *sortedWriter) Add(key []byte, vs y.ValueStruct) error {
- if len(w.lastKey) > 0 && y.CompareKeys(key, w.lastKey) <= 0 {
- return fmt.Errorf("keys not in sorted order (last key: %s, key: %s)",
- hex.Dump(w.lastKey), hex.Dump(key))
- }
- sameKey := y.SameKey(key, w.lastKey)
- // Same keys should go into the same SSTable.
- if !sameKey && w.builder.ReachedCapacity() {
- if err := w.send(false); err != nil {
- return err
- }
- }
- w.lastKey = y.SafeCopy(w.lastKey, key)
- var vp valuePointer
- if vs.Meta&bitValuePointer > 0 {
- vp.Decode(vs.Value)
- }
- w.builder.Add(key, vs, vp.Len)
- return nil
- }
- func (w *sortedWriter) send(done bool) error {
- if err := w.throttle.Do(); err != nil {
- return err
- }
- go func(builder *table.Builder) {
- err := w.createTable(builder)
- w.throttle.Done(err)
- }(w.builder)
- // If done is true, this indicates we can close the writer.
- // No need to allocate underlying TableBuilder now.
- if done {
- w.builder = nil
- return nil
- }
- w.builder = table.NewTableBuilder(w.opts)
- return nil
- }
- // Done is called once we are done writing all keys and valueStructs
- // to sortedWriter. It completes writing current SST to disk.
- func (w *sortedWriter) Done() error {
- if w.builder.Empty() {
- w.builder.Close()
- // Assign builder as nil, so that underlying memory can be garbage collected.
- w.builder = nil
- return nil
- }
- return w.send(true)
- }
// createTable finalizes builder into an SSTable (on disk, or in memory for
// InMemory DBs), records it in the MANIFEST, and attaches it to this writer's
// target level. Runs on a background goroutine launched by send; its error is
// reported through the throttle.
func (w *sortedWriter) createTable(builder *table.Builder) error {
	defer builder.Close()
	if builder.Empty() {
		// Finish must still be called to release builder-internal resources.
		builder.Finish()
		return nil
	}
	fileID := w.db.lc.reserveFileID()
	var tbl *table.Table
	if w.db.opt.InMemory {
		data := builder.Finish()
		var err error
		if tbl, err = table.OpenInMemoryTable(data, fileID, builder.Opts()); err != nil {
			return err
		}
	} else {
		var err error
		fname := table.NewFilename(fileID, w.db.opt.Dir)
		if tbl, err = table.CreateTable(fname, builder); err != nil {
			return err
		}
	}
	lc := w.db.lc
	lhandler := lc.levels[w.level]
	// Now that table can be opened successfully, let's add this to the MANIFEST.
	change := &pb.ManifestChange{
		Id:          tbl.ID(),
		KeyId:       tbl.KeyID(),
		Op:          pb.ManifestChange_CREATE,
		Level:       uint32(lhandler.level),
		Compression: uint32(tbl.CompressionType()),
	}
	if err := w.db.manifest.addChanges([]*pb.ManifestChange{change}, w.db.opt); err != nil {
		return err
	}
	// We are not calling lhandler.replaceTables() here, as it sorts tables on every addition.
	// We can sort all tables only once during Flush() call.
	lhandler.addTable(tbl)
	// Release the ref held by OpenTable.
	_ = tbl.DecrRef()
	w.db.opt.Infof("Table created: %d at level: %d for stream: %d. Size: %s\n",
		fileID, lhandler.level, w.streamID, humanize.IBytes(uint64(tbl.Size())))
	return nil
}
|