- /*
- * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
- * SPDX-License-Identifier: Apache-2.0
- */
- package badger
- import (
- "bytes"
- "context"
- stderrors "errors"
- "fmt"
- "hash"
- "hash/crc32"
- "io"
- "math"
- "os"
- "sort"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "github.com/pkg/errors"
- "go.opentelemetry.io/otel"
- "go.opentelemetry.io/otel/attribute"
- "github.com/dgraph-io/badger/v4/y"
- "github.com/dgraph-io/ristretto/v2/z"
- )
- // maxVlogFileSize is the maximum size of a vlog file that can be created. The vlog offset is a
- // uint32, so we cap the size at math.MaxUint32.
- var maxVlogFileSize uint32 = math.MaxUint32
- // The following bit flags are stored in an entry's meta byte. bitDelete helps us distinguish
- // between a key that has never been seen and a key that has been explicitly deleted.
- const (
- bitDelete byte = 1 << 0 // Set if the key has been deleted.
- bitValuePointer byte = 1 << 1 // Set if the value is NOT stored directly next to key.
- bitDiscardEarlierVersions byte = 1 << 2 // Set if earlier versions can be discarded.
- // Set if item shouldn't be discarded via compactions (used by merge operator)
- bitMergeEntry byte = 1 << 3
- // The MSB 2 bits are for transactions.
- bitTxn byte = 1 << 6 // Set if the entry is part of a txn.
- bitFinTxn byte = 1 << 7 // Set if the entry is to indicate end of txn in value log.
- mi int64 = 1 << 20 //nolint:unused
- // size of vlog header.
- // +----------------+------------------+
- // | keyID(8 bytes) | baseIV(12 bytes) |
- // +----------------+------------------+
- vlogHeaderSize = 20
- )
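- // A minimal illustration (an editorial sketch, not part of the package API) of how these meta
- // bits are combined and tested elsewhere in this file, e.g. in rewrite and discardEntry:
- //
- //   meta := bitValuePointer | bitTxn      // value lives in the vlog; entry is part of a txn
- //   meta = meta &^ (bitTxn | bitFinTxn)   // strip txn markers before rewriting to the vlog
- //   if meta&bitValuePointer == 0 {
- //       // the value is stored inline in the LSM tree, not in the value log
- //   }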
- var errStop = stderrors.New("Stop iteration")
- var errTruncate = stderrors.New("Do truncate")
- type logEntry func(e Entry, vp valuePointer) error
- type safeRead struct {
- k []byte
- v []byte
- recordOffset uint32
- lf *logFile
- }
- // hashReader implements the io.Reader and io.ByteReader interfaces. It also keeps track of the
- // number of bytes read. The hashReader writes to h (hash) what it reads from r.
- type hashReader struct {
- r io.Reader
- h hash.Hash32
- bytesRead int // Number of bytes read.
- }
- func newHashReader(r io.Reader) *hashReader {
- hash := crc32.New(y.CastagnoliCrcTable)
- return &hashReader{
- r: r,
- h: hash,
- }
- }
- // Read reads up to len(p) bytes from the underlying reader. Returns the number of bytes read and an error on failure.
- func (t *hashReader) Read(p []byte) (int, error) {
- n, err := t.r.Read(p)
- if err != nil {
- return n, err
- }
- t.bytesRead += n
- return t.h.Write(p[:n])
- }
- // ReadByte reads exactly one byte from the reader. Returns error on failure.
- func (t *hashReader) ReadByte() (byte, error) {
- b := make([]byte, 1)
- _, err := t.Read(b)
- return b[0], err
- }
- // Sum32 returns the sum32 of the underlying hash.
- func (t *hashReader) Sum32() uint32 {
- return t.h.Sum32()
- }
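- // Within this file, hashReader is used as a tee: every byte read from the underlying reader is
- // also fed into the CRC32 (Castagnoli) hash, so that after decoding a record the caller can
- // compare Sum32() against the checksum stored on disk (see safeRead.Entry below).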
- // Entry reads an entry from the provided reader. It also validates the checksum for every entry
- // read. Returns error on failure.
- func (r *safeRead) Entry(reader io.Reader) (*Entry, error) {
- tee := newHashReader(reader)
- var h header
- hlen, err := h.DecodeFrom(tee)
- if err != nil {
- return nil, err
- }
- if h.klen > uint32(1<<16) { // Sanity check: key length should never exceed 1<<16.
- return nil, errTruncate
- }
- kl := int(h.klen)
- if cap(r.k) < kl {
- r.k = make([]byte, 2*kl)
- }
- vl := int(h.vlen)
- if cap(r.v) < vl {
- r.v = make([]byte, 2*vl)
- }
- e := &Entry{}
- e.offset = r.recordOffset
- e.hlen = hlen
- buf := make([]byte, h.klen+h.vlen)
- if _, err := io.ReadFull(tee, buf[:]); err != nil {
- if err == io.EOF {
- err = errTruncate
- }
- return nil, err
- }
- if r.lf.encryptionEnabled() {
- if buf, err = r.lf.decryptKV(buf[:], r.recordOffset); err != nil {
- return nil, err
- }
- }
- e.Key = buf[:h.klen]
- e.Value = buf[h.klen:]
- var crcBuf [crc32.Size]byte
- if _, err := io.ReadFull(reader, crcBuf[:]); err != nil {
- if err == io.EOF {
- err = errTruncate
- }
- return nil, err
- }
- crc := y.BytesToU32(crcBuf[:])
- if crc != tee.Sum32() {
- return nil, errTruncate
- }
- e.meta = h.meta
- e.UserMeta = h.userMeta
- e.ExpiresAt = h.expiresAt
- return e, nil
- }
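- // For reference, the on-disk record layout that safeRead.Entry expects (a sketch derived from
- // the read path above, not a normative spec) is:
- //
- //   +--------+------------+--------------+-----------------+
- //   | header | key (klen) | value (vlen) | crc32 (4 bytes) |
- //   +--------+------------+--------------+-----------------+
- //
- // The checksum covers the header, key and value, which is why the header and the key/value
- // bytes are read through the hashReader before the final crc comparison.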
- func (vlog *valueLog) rewrite(f *logFile) error {
- vlog.filesLock.RLock()
- for _, fid := range vlog.filesToBeDeleted {
- if fid == f.fid {
- vlog.filesLock.RUnlock()
- return errors.Errorf("value log file already marked for deletion fid: %d", fid)
- }
- }
- maxFid := vlog.maxFid
- y.AssertTruef(f.fid < maxFid, "fid to move: %d. Current max fid: %d", f.fid, maxFid)
- vlog.filesLock.RUnlock()
- vlog.opt.Infof("Rewriting fid: %d", f.fid)
- wb := make([]*Entry, 0, 1000)
- var size int64
- y.AssertTrue(vlog.db != nil)
- var count, moved int
- fe := func(e Entry) error {
- count++
- if count%100000 == 0 {
- vlog.opt.Debugf("Processing entry %d", count)
- }
- vs, err := vlog.db.get(e.Key)
- if err != nil {
- return err
- }
- if discardEntry(e, vs, vlog.db) {
- return nil
- }
- // Value is still present in value log.
- if len(vs.Value) == 0 {
- return errors.Errorf("Empty value: %+v", vs)
- }
- var vp valuePointer
- vp.Decode(vs.Value)
- // If the entry found from the LSM Tree points to a newer vlog file, don't do anything.
- if vp.Fid > f.fid {
- return nil
- }
- // If the entry found from the LSM Tree points to an offset greater than the one
- // read from vlog, don't do anything.
- if vp.Offset > e.offset {
- return nil
- }
- // If the entry read from LSM Tree and vlog file point to the same vlog file and offset,
- // insert them back into the DB.
- // NOTE: It might be possible that the entry read from the LSM Tree points to
- // an older vlog file. See the comments in the else part.
- if vp.Fid == f.fid && vp.Offset == e.offset {
- moved++
- // This new entry only contains the key, and a pointer to the value.
- ne := new(Entry)
- // Remove only the bitValuePointer and transaction markers. We
- // should keep the other bits.
- ne.meta = e.meta &^ (bitValuePointer | bitTxn | bitFinTxn)
- ne.UserMeta = e.UserMeta
- ne.ExpiresAt = e.ExpiresAt
- ne.Key = append([]byte{}, e.Key...)
- ne.Value = append([]byte{}, e.Value...)
- es := ne.estimateSizeAndSetThreshold(vlog.db.valueThreshold())
- // Consider size of value as well while considering the total size
- // of the batch. There have been reports of high memory usage in
- // rewrite because we don't consider the value size. See #1292.
- es += int64(len(e.Value))
- // Ensure length and size of wb is within transaction limits.
- if int64(len(wb)+1) >= vlog.opt.maxBatchCount ||
- size+es >= vlog.opt.maxBatchSize {
- if err := vlog.db.batchSet(wb); err != nil {
- return err
- }
- size = 0
- wb = wb[:0]
- }
- wb = append(wb, ne)
- size += es
- } else { //nolint:staticcheck
- // It might be possible that the entry read from LSM Tree points to
- // an older vlog file. This can happen in the following situation.
- // Assume DB is opened with
- // numberOfVersionsToKeep=1
- //
- // Now, if we have ONLY one key in the system "FOO" which has been
- // updated 3 times and the same key has been garbage collected 3
- // times, we'll have 3 versions of the movekey
- // for the same key "FOO".
- //
- // NOTE: moveKeyi is the gc'ed version of the original key with version i.
- // We refer to the gc'ed keys as moveKeys to simplify the
- // explanation. We used to add move keys, but we no longer do that.
- //
- // Assume we have 3 move keys in L0.
- // - moveKey1 (points to vlog file 10),
- // - moveKey2 (points to vlog file 14) and
- // - moveKey3 (points to vlog file 15).
- //
- // Also, assume there is another move key "moveKey1" (points to
- // vlog file 6) (this is also a move Key for key "FOO" ) on upper
- // levels (let's say 3). The move key "moveKey1" on level 0 was
- // inserted because vlog file 6 was GCed.
- //
- // Here's what the arrangement looks like
- // L0 => (moveKey1 => vlog10), (moveKey2 => vlog14), (moveKey3 => vlog15)
- // L1 => ....
- // L2 => ....
- // L3 => (moveKey1 => vlog6)
- //
- // When L0 compaction runs, it keeps only moveKey3 because the number of versions
- // to keep is set to 1. (we've dropped moveKey1's latest version)
- //
- // The new arrangement of keys is
- // L0 => ....
- // L1 => (moveKey3 => vlog15)
- // L2 => ....
- // L3 => (moveKey1 => vlog6)
- //
- // Now if we try to GC vlog file 10, the entry read from vlog file
- // will point to vlog10 but the entry read from LSM Tree will point
- // to vlog6. The move key read from LSM tree will point to vlog6
- // because we've asked for version 1 of the move key.
- //
- // This might seem like an issue but it's not really an issue
- // because the user has set the number of versions to keep to 1 and
- // the latest version of moveKey points to the correct vlog file
- // and offset. The stale move key on L3 will eventually be dropped
- // by compaction because there is a newer version in the upper
- // levels.
- }
- return nil
- }
- _, err := f.iterate(vlog.opt.ReadOnly, 0, func(e Entry, vp valuePointer) error {
- return fe(e)
- })
- if err != nil {
- return err
- }
- batchSize := 1024
- var loops int
- for i := 0; i < len(wb); {
- loops++
- if batchSize == 0 {
- vlog.db.opt.Warningf("We shouldn't reach batch size of zero.")
- return ErrNoRewrite
- }
- end := i + batchSize
- if end > len(wb) {
- end = len(wb)
- }
- if err := vlog.db.batchSet(wb[i:end]); err != nil {
- if err == ErrTxnTooBig {
- // Decrease the batch size to half.
- batchSize = batchSize / 2
- continue
- }
- return err
- }
- i += batchSize
- }
- vlog.opt.Infof("Processed %d entries in %d loops", len(wb), loops)
- vlog.opt.Infof("Total entries: %d. Moved: %d", count, moved)
- vlog.opt.Infof("Removing fid: %d", f.fid)
- var deleteFileNow bool
- // Entries written to LSM. Remove the older file now.
- {
- vlog.filesLock.Lock()
- // Just a sanity-check.
- if _, ok := vlog.filesMap[f.fid]; !ok {
- vlog.filesLock.Unlock()
- return errors.Errorf("Unable to find fid: %d", f.fid)
- }
- if vlog.iteratorCount() == 0 {
- delete(vlog.filesMap, f.fid)
- deleteFileNow = true
- } else {
- vlog.filesToBeDeleted = append(vlog.filesToBeDeleted, f.fid)
- }
- vlog.filesLock.Unlock()
- }
- if deleteFileNow {
- if err := vlog.deleteLogFile(f); err != nil {
- return err
- }
- }
- return nil
- }
- func (vlog *valueLog) incrIteratorCount() {
- vlog.numActiveIterators.Add(1)
- }
- func (vlog *valueLog) iteratorCount() int {
- return int(vlog.numActiveIterators.Load())
- }
- func (vlog *valueLog) decrIteratorCount() error {
- num := vlog.numActiveIterators.Add(-1)
- if num != 0 {
- return nil
- }
- vlog.filesLock.Lock()
- lfs := make([]*logFile, 0, len(vlog.filesToBeDeleted))
- for _, id := range vlog.filesToBeDeleted {
- lfs = append(lfs, vlog.filesMap[id])
- delete(vlog.filesMap, id)
- }
- vlog.filesToBeDeleted = nil
- vlog.filesLock.Unlock()
- for _, lf := range lfs {
- if err := vlog.deleteLogFile(lf); err != nil {
- return err
- }
- }
- return nil
- }
- func (vlog *valueLog) deleteLogFile(lf *logFile) error {
- if lf == nil {
- return nil
- }
- lf.lock.Lock()
- defer lf.lock.Unlock()
- // Delete fid from discard stats as well.
- vlog.discardStats.Update(lf.fid, -1)
- return lf.Delete()
- }
- func (vlog *valueLog) dropAll() (int, error) {
- // If db is opened in InMemory mode, we don't need to do anything since there are no vlog files.
- if vlog.db.opt.InMemory {
- return 0, nil
- }
- // We don't want to block dropAll on any pending transactions. So, don't worry about iterator
- // count.
- var count int
- deleteAll := func() error {
- vlog.filesLock.Lock()
- defer vlog.filesLock.Unlock()
- for _, lf := range vlog.filesMap {
- if err := vlog.deleteLogFile(lf); err != nil {
- return err
- }
- count++
- }
- vlog.filesMap = make(map[uint32]*logFile)
- vlog.maxFid = 0
- return nil
- }
- if err := deleteAll(); err != nil {
- return count, err
- }
- vlog.db.opt.Infof("Value logs deleted. Creating value log file: 1")
- if _, err := vlog.createVlogFile(); err != nil { // Called while writes are stopped.
- return count, err
- }
- return count, nil
- }
- func (db *DB) valueThreshold() int64 {
- return db.threshold.valueThreshold.Load()
- }
- type valueLog struct {
- dirPath string
- // guards our view of which files exist, which are to be deleted, and how many active iterators there are
- filesLock sync.RWMutex
- filesMap map[uint32]*logFile
- maxFid uint32
- filesToBeDeleted []uint32
- // A refcount of iterators -- when this hits zero, we can delete the filesToBeDeleted.
- numActiveIterators atomic.Int32
- db *DB
- writableLogOffset atomic.Uint32 // read by read, written by write
- numEntriesWritten uint32
- opt Options
- garbageCh chan struct{}
- discardStats *discardStats
- }
- func vlogFilePath(dirPath string, fid uint32) string {
- return fmt.Sprintf("%s%s%06d.vlog", dirPath, string(os.PathSeparator), fid)
- }
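- // For example, vlogFilePath("/data/badger", 12) produces "/data/badger/000012.vlog" on a system
- // whose path separator is '/'.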
- func (vlog *valueLog) fpath(fid uint32) string {
- return vlogFilePath(vlog.dirPath, fid)
- }
- func (vlog *valueLog) populateFilesMap() error {
- vlog.filesMap = make(map[uint32]*logFile)
- files, err := os.ReadDir(vlog.dirPath)
- if err != nil {
- return errFile(err, vlog.dirPath, "Unable to open log dir.")
- }
- found := make(map[uint64]struct{})
- for _, file := range files {
- if !strings.HasSuffix(file.Name(), ".vlog") {
- continue
- }
- fsz := len(file.Name())
- fid, err := strconv.ParseUint(file.Name()[:fsz-5], 10, 32)
- if err != nil {
- return errFile(err, file.Name(), "Unable to parse log id.")
- }
- if _, ok := found[fid]; ok {
- return errFile(err, file.Name(), "Duplicate file found. Please delete one.")
- }
- found[fid] = struct{}{}
- lf := &logFile{
- fid: uint32(fid),
- path: vlog.fpath(uint32(fid)),
- registry: vlog.db.registry,
- }
- vlog.filesMap[uint32(fid)] = lf
- if vlog.maxFid < uint32(fid) {
- vlog.maxFid = uint32(fid)
- }
- }
- return nil
- }
- func (vlog *valueLog) createVlogFile() (*logFile, error) {
- fid := vlog.maxFid + 1
- path := vlog.fpath(fid)
- lf := &logFile{
- fid: fid,
- path: path,
- registry: vlog.db.registry,
- writeAt: vlogHeaderSize,
- opt: vlog.opt,
- }
- err := lf.open(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 2*vlog.opt.ValueLogFileSize)
- if err != z.NewFile && err != nil {
- return nil, err
- }
- vlog.filesLock.Lock()
- vlog.filesMap[fid] = lf
- y.AssertTrue(vlog.maxFid < fid)
- vlog.maxFid = fid
- // writableLogOffset is only written by the write func and read by the Read func.
- // To avoid a race condition, all reads and updates to this variable must be
- // done via atomics.
- vlog.writableLogOffset.Store(vlogHeaderSize)
- vlog.numEntriesWritten = 0
- vlog.filesLock.Unlock()
- return lf, nil
- }
- func errFile(err error, path string, msg string) error {
- return fmt.Errorf("%s. Path=%s. Error=%v", msg, path, err)
- }
- // init initializes the value log struct. This initialization needs to happen
- // before compactions start.
- func (vlog *valueLog) init(db *DB) {
- vlog.opt = db.opt
- vlog.db = db
- // We don't need to open any vlog files or collect stats for GC if DB is opened
- // in InMemory mode. InMemory mode doesn't create any files/directories on disk.
- if vlog.opt.InMemory {
- return
- }
- vlog.dirPath = vlog.opt.ValueDir
- vlog.garbageCh = make(chan struct{}, 1) // Only allow one GC at a time.
- lf, err := InitDiscardStats(vlog.opt)
- y.Check(err)
- vlog.discardStats = lf
- // See TestPersistLFDiscardStats for purpose of statement below.
- db.logToSyncChan(endVLogInitMsg)
- }
- func (vlog *valueLog) open(db *DB) error {
- // We don't need to open any vlog files or collect stats for GC if DB is opened
- // in InMemory mode. InMemory mode doesn't create any files/directories on disk.
- if db.opt.InMemory {
- return nil
- }
- if err := vlog.populateFilesMap(); err != nil {
- return err
- }
- // If no files are found, then create a new file.
- if len(vlog.filesMap) == 0 {
- if vlog.opt.ReadOnly {
- return nil
- }
- _, err := vlog.createVlogFile()
- return y.Wrapf(err, "Error while creating log file in valueLog.open")
- }
- fids := vlog.sortedFids()
- for _, fid := range fids {
- lf, ok := vlog.filesMap[fid]
- y.AssertTrue(ok)
- // Just open in RDWR mode. This should not create a new log file.
- lf.opt = vlog.opt
- if err := lf.open(vlog.fpath(fid), os.O_RDWR,
- 2*vlog.opt.ValueLogFileSize); err != nil {
- return y.Wrapf(err, "Open existing file: %q", lf.path)
- }
- // We shouldn't delete the maxFid file.
- if lf.size.Load() == vlogHeaderSize && fid != vlog.maxFid {
- vlog.opt.Infof("Deleting empty file: %s", lf.path)
- if err := lf.Delete(); err != nil {
- return y.Wrapf(err, "while trying to delete empty file: %s", lf.path)
- }
- delete(vlog.filesMap, fid)
- }
- }
- if vlog.opt.ReadOnly {
- return nil
- }
- // Now we can read the latest value log file, and see if it needs truncation. We could
- // technically do this over all the value log files, but that would mean slowing down the value
- // log open.
- last, ok := vlog.filesMap[vlog.maxFid]
- y.AssertTrue(ok)
- lastOff, err := last.iterate(vlog.opt.ReadOnly, vlogHeaderSize,
- func(_ Entry, vp valuePointer) error {
- return nil
- })
- if err != nil {
- return y.Wrapf(err, "while iterating over: %s", last.path)
- }
- if err := last.Truncate(int64(lastOff)); err != nil {
- return y.Wrapf(err, "while truncating last value log file: %s", last.path)
- }
- // Don't write to the old log file. Always create a new one.
- if _, err := vlog.createVlogFile(); err != nil {
- return y.Wrapf(err, "Error while creating log file in valueLog.open")
- }
- return nil
- }
- func (vlog *valueLog) Close() error {
- if vlog == nil || vlog.db == nil || vlog.db.opt.InMemory {
- return nil
- }
- vlog.opt.Debugf("Stopping garbage collection of values.")
- var err error
- for id, lf := range vlog.filesMap {
- lf.lock.Lock() // We won’t release the lock.
- offset := int64(-1)
- if !vlog.opt.ReadOnly && id == vlog.maxFid {
- offset = int64(vlog.woffset())
- }
- if terr := lf.Close(offset); terr != nil && err == nil {
- err = terr
- }
- }
- if vlog.discardStats != nil {
- vlog.db.captureDiscardStats()
- if terr := vlog.discardStats.Close(-1); terr != nil && err == nil {
- err = terr
- }
- }
- return err
- }
- // sortedFids returns the file IDs that are not pending deletion, sorted in ascending order.
- // Assumes we have shared access to filesMap.
- func (vlog *valueLog) sortedFids() []uint32 {
- toBeDeleted := make(map[uint32]struct{})
- for _, fid := range vlog.filesToBeDeleted {
- toBeDeleted[fid] = struct{}{}
- }
- ret := make([]uint32, 0, len(vlog.filesMap))
- for fid := range vlog.filesMap {
- if _, ok := toBeDeleted[fid]; !ok {
- ret = append(ret, fid)
- }
- }
- sort.Slice(ret, func(i, j int) bool {
- return ret[i] < ret[j]
- })
- return ret
- }
- type request struct {
- // Input values
- Entries []*Entry
- // Output values and wait group stuff below
- Ptrs []valuePointer
- Wg sync.WaitGroup
- Err error
- ref atomic.Int32
- }
- func (req *request) reset() {
- req.Entries = req.Entries[:0]
- req.Ptrs = req.Ptrs[:0]
- req.Wg = sync.WaitGroup{}
- req.Err = nil
- req.ref.Store(0)
- }
- func (req *request) IncrRef() {
- req.ref.Add(1)
- }
- func (req *request) DecrRef() {
- nRef := req.ref.Add(-1)
- if nRef > 0 {
- return
- }
- req.Entries = nil
- requestPool.Put(req)
- }
- func (req *request) Wait() error {
- req.Wg.Wait()
- err := req.Err
- req.DecrRef() // DecrRef after writing to DB.
- return err
- }
- type requests []*request
- func (reqs requests) DecrRef() {
- for _, req := range reqs {
- req.DecrRef()
- }
- }
- func (reqs requests) IncrRef() {
- for _, req := range reqs {
- req.IncrRef()
- }
- }
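- // A rough sketch of the intended request lifecycle, assuming requestPool is the sync.Pool used
- // by DecrRef above (defined elsewhere in the package):
- //
- //   req := requestPool.Get().(*request)
- //   req.reset()
- //   req.IncrRef()     // hold a reference while the write is in flight
- //   req.Wg.Add(1)     // the write path calls req.Wg.Done() once it has set req.Ptrs / req.Err
- //   // ... hand req to the write path ...
- //   err := req.Wait() // waits for Done, reads req.Err, then drops the reference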
- // sync syncs the content of the latest value log file to disk. Syncing of the value log
- // directory is not required here, as it happens every time a value log file rotation happens
- // (see the createVlogFile function). During rotation, the previous value log file also gets
- // synced to disk.
- func (vlog *valueLog) sync() error {
- if vlog.opt.SyncWrites || vlog.opt.InMemory {
- return nil
- }
- vlog.filesLock.RLock()
- maxFid := vlog.maxFid
- curlf := vlog.filesMap[maxFid]
- // Sometimes it is possible that vlog.maxFid has been increased but file creation
- // with same id is still in progress and this function is called. In those cases
- // entry for the file might not be present in vlog.filesMap.
- if curlf == nil {
- vlog.filesLock.RUnlock()
- return nil
- }
- curlf.lock.RLock()
- vlog.filesLock.RUnlock()
- err := curlf.Sync()
- curlf.lock.RUnlock()
- return err
- }
- func (vlog *valueLog) woffset() uint32 {
- return vlog.writableLogOffset.Load()
- }
- // validateWrites checks whether the given requests can fit into a 4GB vlog file.
- // NOTE: 4GB is the maximum size a vlog file can have because the value pointer offset is of type
- // uint32. Anything larger would overflow uint32, so we limit the size to 4GB.
- func (vlog *valueLog) validateWrites(reqs []*request) error {
- vlogOffset := uint64(vlog.woffset())
- for _, req := range reqs {
- // calculate size of the request.
- size := estimateRequestSize(req)
- estimatedVlogOffset := vlogOffset + size
- if estimatedVlogOffset > uint64(maxVlogFileSize) {
- return errors.Errorf("Request size offset %d is bigger than maximum offset %d",
- estimatedVlogOffset, maxVlogFileSize)
- }
- if estimatedVlogOffset >= uint64(vlog.opt.ValueLogFileSize) {
- // We'll create a new vlog file if the estimated offset is greater or equal to
- // max vlog size. So, resetting the vlogOffset.
- vlogOffset = 0
- continue
- }
- // Estimated vlog offset will become current vlog offset if the vlog is not rotated.
- vlogOffset = estimatedVlogOffset
- }
- return nil
- }
- // estimateRequestSize returns the size that needs to be written for the given request.
- func estimateRequestSize(req *request) uint64 {
- size := uint64(0)
- for _, e := range req.Entries {
- size += uint64(maxHeaderSize + len(e.Key) + len(e.Value) + crc32.Size)
- }
- return size
- }
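- // A rough worked example (treating maxHeaderSize, defined elsewhere, as the upper bound on an
- // encoded header): a request with two entries, each with a 10-byte key and a 100-byte value, is
- // estimated at 2*(maxHeaderSize+10+100+crc32.Size) bytes. validateWrites adds this estimate to
- // the current writable offset and rejects the batch if the result would overflow the uint32
- // offset space of a single vlog file.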
- // write is thread-unsafe by design and should not be called concurrently.
- func (vlog *valueLog) write(reqs []*request) error {
- if vlog.db.opt.InMemory {
- return nil
- }
- // Validate writes before writing to the vlog, because we don't want to write partially and
- // then return an error.
- if err := vlog.validateWrites(reqs); err != nil {
- return y.Wrapf(err, "while validating writes")
- }
- vlog.filesLock.RLock()
- maxFid := vlog.maxFid
- curlf := vlog.filesMap[maxFid]
- vlog.filesLock.RUnlock()
- defer func() {
- if vlog.opt.SyncWrites {
- if err := curlf.Sync(); err != nil {
- vlog.opt.Errorf("Error while curlf sync: %v\n", err)
- }
- }
- }()
- write := func(buf *bytes.Buffer) error {
- if buf.Len() == 0 {
- return nil
- }
- n := uint32(buf.Len())
- endOffset := vlog.writableLogOffset.Add(n)
- // Increase the file size if we cannot accommodate this entry.
- // [Aman] Should this be >= or just >? It doesn't make sense to extend the file if it's already big enough.
- if int(endOffset) >= len(curlf.Data) {
- if err := curlf.Truncate(int64(endOffset)); err != nil {
- return err
- }
- }
- start := int(endOffset - n)
- y.AssertTrue(copy(curlf.Data[start:], buf.Bytes()) == int(n))
- curlf.size.Store(endOffset)
- return nil
- }
- toDisk := func() error {
- if vlog.woffset() > uint32(vlog.opt.ValueLogFileSize) ||
- vlog.numEntriesWritten > vlog.opt.ValueLogMaxEntries {
- if err := curlf.doneWriting(vlog.woffset()); err != nil {
- return err
- }
- newlf, err := vlog.createVlogFile()
- if err != nil {
- return err
- }
- curlf = newlf
- }
- return nil
- }
- buf := new(bytes.Buffer)
- for i := range reqs {
- b := reqs[i]
- b.Ptrs = b.Ptrs[:0]
- var written, bytesWritten int
- valueSizes := make([]int64, 0, len(b.Entries))
- for j := range b.Entries {
- buf.Reset()
- e := b.Entries[j]
- valueSizes = append(valueSizes, int64(len(e.Value)))
- if e.skipVlogAndSetThreshold(vlog.db.valueThreshold()) {
- b.Ptrs = append(b.Ptrs, valuePointer{})
- continue
- }
- var p valuePointer
- p.Fid = curlf.fid
- p.Offset = vlog.woffset()
- // We should not store transaction marks in the vlog file because it will never have all
- // the entries in a transaction. If we store entries with transaction marks then value
- // GC will not be able to iterate on the entire vlog file.
- // But, we still want the entry to stay intact for the memTable WAL. So, store the meta
- // in a temporary variable and reassign it after writing to the value log.
- tmpMeta := e.meta
- e.meta = e.meta &^ (bitTxn | bitFinTxn)
- plen, err := curlf.encodeEntry(buf, e, p.Offset) // Now encode the entry into buffer.
- if err != nil {
- return err
- }
- // Restore the meta.
- e.meta = tmpMeta
- p.Len = uint32(plen)
- b.Ptrs = append(b.Ptrs, p)
- if err := write(buf); err != nil {
- return err
- }
- written++
- bytesWritten += buf.Len()
- // No need to flush anything, we write to file directly via mmap.
- }
- y.NumWritesVlogAdd(vlog.opt.MetricsEnabled, int64(written))
- y.NumBytesWrittenVlogAdd(vlog.opt.MetricsEnabled, int64(bytesWritten))
- vlog.numEntriesWritten += uint32(written)
- vlog.db.threshold.update(valueSizes)
- // We write to disk here so that all entries that are part of the same transaction are
- // written to the same vlog file.
- if err := toDisk(); err != nil {
- return err
- }
- }
- return toDisk()
- }
- // getFileRLocked gets the logFile and acquires an RLock() on it for the mmap. You must call
- // RUnlock on the returned file (if non-nil).
- func (vlog *valueLog) getFileRLocked(vp valuePointer) (*logFile, error) {
- vlog.filesLock.RLock()
- defer vlog.filesLock.RUnlock()
- ret, ok := vlog.filesMap[vp.Fid]
- if !ok {
- // log file has gone away, we can't do anything. Return.
- return nil, errors.Errorf("file with ID: %d not found", vp.Fid)
- }
- // Check for valid offset if we are reading from writable log.
- maxFid := vlog.maxFid
- // In read-only mode we don't need to check for writable offset as we are not writing anything.
- // Moreover, this offset is not set in readonly mode.
- if !vlog.opt.ReadOnly && vp.Fid == maxFid {
- currentOffset := vlog.woffset()
- if vp.Offset >= currentOffset {
- return nil, errors.Errorf(
- "Invalid value pointer offset: %d greater than current offset: %d",
- vp.Offset, currentOffset)
- }
- }
- ret.lock.RLock()
- return ret, nil
- }
- // Read reads the value log at a given location.
- // TODO: Make this read private.
- func (vlog *valueLog) Read(vp valuePointer, _ *y.Slice) ([]byte, func(), error) {
- buf, lf, err := vlog.readValueBytes(vp)
- // The log file is read-locked. Get the unlock callback so we can either run it ourselves on
- // the error paths below, or return it for the caller to run once it is done with the buffer.
- cb := vlog.getUnlockCallback(lf)
- if err != nil {
- return nil, cb, err
- }
- if vlog.opt.VerifyValueChecksum {
- hash := crc32.New(y.CastagnoliCrcTable)
- if _, err := hash.Write(buf[:len(buf)-crc32.Size]); err != nil {
- runCallback(cb)
- return nil, nil, y.Wrapf(err, "failed to write hash for vp %+v", vp)
- }
- // Fetch checksum from the end of the buffer.
- checksum := buf[len(buf)-crc32.Size:]
- if hash.Sum32() != y.BytesToU32(checksum) {
- runCallback(cb)
- return nil, nil, y.Wrapf(y.ErrChecksumMismatch, "value corrupted for vp: %+v", vp)
- }
- }
- var h header
- headerLen := h.Decode(buf)
- kv := buf[headerLen:]
- if lf.encryptionEnabled() {
- kv, err = lf.decryptKV(kv, vp.Offset)
- if err != nil {
- return nil, cb, err
- }
- }
- if uint32(len(kv)) < h.klen+h.vlen {
- vlog.db.opt.Errorf("Invalid read: vp: %+v", vp)
- return nil, nil, errors.Errorf("Invalid read: Len: %d read at:[%d:%d]",
- len(kv), h.klen, h.klen+h.vlen)
- }
- return kv[h.klen : h.klen+h.vlen], cb, nil
- }
- // getUnlockCallback returns a function that releases the read lock on the given log file.
- // If the log file is nil, it returns nil.
- func (vlog *valueLog) getUnlockCallback(lf *logFile) func() {
- if lf == nil {
- return nil
- }
- return lf.lock.RUnlock
- }
- // readValueBytes returns the vlog entry slice and the read-locked log file. The caller is
- // responsible for unlocking the logFile.
- func (vlog *valueLog) readValueBytes(vp valuePointer) ([]byte, *logFile, error) {
- lf, err := vlog.getFileRLocked(vp)
- if err != nil {
- return nil, nil, err
- }
- buf, err := lf.read(vp)
- y.NumReadsVlogAdd(vlog.db.opt.MetricsEnabled, 1)
- y.NumBytesReadsVlogAdd(vlog.db.opt.MetricsEnabled, int64(len(buf)))
- return buf, lf, err
- }
- func (vlog *valueLog) pickLog(discardRatio float64) *logFile {
- vlog.filesLock.RLock()
- defer vlog.filesLock.RUnlock()
- LOOP:
- // Pick a candidate that contains the largest amount of discardable data
- fid, discard := vlog.discardStats.MaxDiscard()
- // MaxDiscard will return fid=0 if it doesn't have any discard data. The
- // vlog files start from 1.
- if fid == 0 {
- vlog.opt.Debugf("No file with discard stats")
- return nil
- }
- lf, ok := vlog.filesMap[fid]
- // This file was deleted, but its discard stats increased because of compactions. The file
- // doesn't exist, so we don't need to do anything. Skip it and retry.
- if !ok {
- vlog.discardStats.Update(fid, -1)
- goto LOOP
- }
- // We have a valid file.
- fi, err := lf.Fd.Stat()
- if err != nil {
- vlog.opt.Errorf("Unable to get stats for value log fid: %d err: %+v", fi, err)
- return nil
- }
- if thr := discardRatio * float64(fi.Size()); float64(discard) < thr {
- vlog.opt.Debugf("Discard: %d less than threshold: %.0f for file: %s",
- discard, thr, fi.Name())
- return nil
- }
- if fid < vlog.maxFid {
- vlog.opt.Infof("Found value log max discard fid: %d discard: %d\n", fid, discard)
- lf, ok := vlog.filesMap[fid]
- y.AssertTrue(ok)
- return lf
- }
- // Don't randomly pick any value log file.
- return nil
- }
- func discardEntry(e Entry, vs y.ValueStruct, db *DB) bool {
- if vs.Version != y.ParseTs(e.Key) {
- // Version not found. Discard.
- return true
- }
- if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
- return true
- }
- if (vs.Meta & bitValuePointer) == 0 {
- // Key also stores the value in LSM. Discard.
- return true
- }
- if (vs.Meta & bitFinTxn) > 0 {
- // Just a txn finish entry. Discard.
- return true
- }
- return false
- }
- func (vlog *valueLog) doRunGC(lf *logFile) error {
- _, span := otel.Tracer("").Start(context.TODO(), "Badger.GC")
- span.SetAttributes(attribute.String("GC rewrite for", lf.path))
- defer span.End()
- if err := vlog.rewrite(lf); err != nil {
- return err
- }
- // Remove the file from discardStats.
- vlog.discardStats.Update(lf.fid, -1)
- return nil
- }
- func (vlog *valueLog) waitOnGC(lc *z.Closer) {
- defer lc.Done()
- <-lc.HasBeenClosed() // Wait for lc to be closed.
- // Wait for any GC in progress to finish, and don't allow any more GC runs by filling up
- // the channel of size 1.
- vlog.garbageCh <- struct{}{}
- }
- func (vlog *valueLog) runGC(discardRatio float64) error {
- select {
- case vlog.garbageCh <- struct{}{}:
- // Pick a log file for GC.
- defer func() {
- <-vlog.garbageCh
- }()
- lf := vlog.pickLog(discardRatio)
- if lf == nil {
- return ErrNoRewrite
- }
- return vlog.doRunGC(lf)
- default:
- return ErrRejected
- }
- }
- func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) {
- if vlog.opt.InMemory {
- return
- }
- for fid, discard := range stats {
- vlog.discardStats.Update(fid, discard)
- }
- // The following is to coordinate with some test cases where we want to
- // verify that at least one iteration of updateDiscardStats has been completed.
- vlog.db.logToSyncChan(updateDiscardStatsMsg)
- }
- type vlogThreshold struct {
- logger Logger
- percentile float64
- valueThreshold atomic.Int64
- valueCh chan []int64
- clearCh chan bool
- closer *z.Closer
- // Metrics contains a running log of statistics like amount of data stored etc.
- vlMetrics *z.HistogramData
- }
- func initVlogThreshold(opt *Options) *vlogThreshold {
- getBounds := func() []float64 {
- mxbd := opt.maxValueThreshold
- mnbd := float64(opt.ValueThreshold)
- y.AssertTruef(mxbd >= mnbd, "maximum threshold bound is less than the min threshold")
- size := math.Min(mxbd-mnbd+1, 1024.0)
- bdstp := (mxbd - mnbd) / size
- bounds := make([]float64, int64(size))
- for i := range bounds {
- if i == 0 {
- bounds[0] = mnbd
- continue
- }
- if i == int(size-1) {
- bounds[i] = mxbd
- continue
- }
- bounds[i] = bounds[i-1] + bdstp
- }
- return bounds
- }
- lt := &vlogThreshold{
- logger: opt.Logger,
- percentile: opt.VLogPercentile,
- valueCh: make(chan []int64, 1000),
- clearCh: make(chan bool, 1),
- closer: z.NewCloser(1),
- vlMetrics: z.NewHistogramData(getBounds()),
- }
- lt.valueThreshold.Store(opt.ValueThreshold)
- return lt
- }
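- // A hedged worked example of getBounds above: with a hypothetical ValueThreshold of 64 bytes and
- // maxValueThreshold of 1024, size = min(1024-64+1, 1024) = 961 buckets and the step is
- // (1024-64)/961 ≈ 1, so the histogram tracks value sizes at roughly one-byte granularity between
- // the two bounds.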
- func (v *vlogThreshold) Clear(opt Options) {
- v.valueThreshold.Store(opt.ValueThreshold)
- v.clearCh <- true
- }
- func (v *vlogThreshold) update(sizes []int64) {
- v.valueCh <- sizes
- }
- func (v *vlogThreshold) close() {
- v.closer.SignalAndWait()
- }
- func (v *vlogThreshold) listenForValueThresholdUpdate() {
- defer v.closer.Done()
- for {
- select {
- case <-v.closer.HasBeenClosed():
- return
- case val := <-v.valueCh:
- for _, e := range val {
- v.vlMetrics.Update(e)
- }
- // We recompute the threshold at Options.VLogPercentile so that values whose sizes fall
- // within that percentile make it to the LSM tree, and the rest go to the value
- // log file.
- p := int64(v.vlMetrics.Percentile(v.percentile))
- if v.valueThreshold.Load() != p {
- if v.logger != nil {
- v.logger.Infof("updating value of threshold to: %d", p)
- }
- v.valueThreshold.Store(p)
- }
- case <-v.clearCh:
- v.vlMetrics.Clear()
- }
- }
- }
|