value.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package badger
  6. import (
  7. "bytes"
  8. "context"
  9. stderrors "errors"
  10. "fmt"
  11. "hash"
  12. "hash/crc32"
  13. "io"
  14. "math"
  15. "os"
  16. "sort"
  17. "strconv"
  18. "strings"
  19. "sync"
  20. "sync/atomic"
  21. "github.com/pkg/errors"
  22. "go.opentelemetry.io/otel"
  23. "go.opentelemetry.io/otel/attribute"
  24. "github.com/dgraph-io/badger/v4/y"
  25. "github.com/dgraph-io/ristretto/v2/z"
  26. )
// maxVlogFileSize is the maximum size of the vlog file which can be created. Vlog Offset is of
// uint32, so limiting at max uint32.
var maxVlogFileSize uint32 = math.MaxUint32

// Values have their first byte being byteData or byteDelete. This helps us distinguish between
// a key that has never been seen and a key that has been explicitly deleted.
const (
	bitDelete                 byte = 1 << 0 // Set if the key has been deleted.
	bitValuePointer           byte = 1 << 1 // Set if the value is NOT stored directly next to key.
	bitDiscardEarlierVersions byte = 1 << 2 // Set if earlier versions can be discarded.
	// Set if item shouldn't be discarded via compactions (used by merge operator)
	bitMergeEntry byte = 1 << 3
	// The MSB 2 bits are for transactions.
	bitTxn    byte = 1 << 6 // Set if the entry is part of a txn.
	bitFinTxn byte = 1 << 7 // Set if the entry is to indicate end of txn in value log.

	mi int64 = 1 << 20 //nolint:unused

	// size of vlog header.
	// +----------------+------------------+
	// | keyID(8 bytes) | baseIV(12 bytes)|
	// +----------------+------------------+
	vlogHeaderSize = 20
)

// errStop is a sentinel returned by iteration callbacks to halt iteration early.
var errStop = stderrors.New("Stop iteration")

// errTruncate signals that the log is corrupt or partially written at the
// current offset and should be truncated there.
var errTruncate = stderrors.New("Do truncate")

// logEntry is the callback invoked for every entry read while iterating a log file.
type logEntry func(e Entry, vp valuePointer) error
// safeRead holds reusable scratch buffers and positional state for decoding
// entries out of a log file, so repeated reads can reuse allocations.
type safeRead struct {
	k []byte // scratch buffer sized for keys
	v []byte // scratch buffer sized for values

	recordOffset uint32   // offset of the entry currently being read
	lf           *logFile // log file being read from
}
// hashReader implements io.Reader, io.ByteReader interfaces. It also keeps track of the number
// bytes read. The hashReader writes to h (hash) what it reads from r.
type hashReader struct {
	r         io.Reader   // underlying source
	h         hash.Hash32 // running checksum over everything read so far
	bytesRead int         // Number of bytes read.
}
  64. func newHashReader(r io.Reader) *hashReader {
  65. hash := crc32.New(y.CastagnoliCrcTable)
  66. return &hashReader{
  67. r: r,
  68. h: hash,
  69. }
  70. }
  71. // Read reads len(p) bytes from the reader. Returns the number of bytes read, error on failure.
  72. func (t *hashReader) Read(p []byte) (int, error) {
  73. n, err := t.r.Read(p)
  74. if err != nil {
  75. return n, err
  76. }
  77. t.bytesRead += n
  78. return t.h.Write(p[:n])
  79. }
  80. // ReadByte reads exactly one byte from the reader. Returns error on failure.
  81. func (t *hashReader) ReadByte() (byte, error) {
  82. b := make([]byte, 1)
  83. _, err := t.Read(b)
  84. return b[0], err
  85. }
// Sum32 returns the checksum of all bytes read through this reader so far.
func (t *hashReader) Sum32() uint32 {
	return t.h.Sum32()
}
// Entry reads an entry from the provided reader. It also validates the checksum for every entry
// read. Returns error on failure.
func (r *safeRead) Entry(reader io.Reader) (*Entry, error) {
	// Everything read through tee is also folded into a CRC32, which is later
	// compared against the checksum stored after the entry.
	tee := newHashReader(reader)
	var h header
	hlen, err := h.DecodeFrom(tee)
	if err != nil {
		return nil, err
	}
	if h.klen > uint32(1<<16) { // Key length must be below uint16.
		return nil, errTruncate
	}
	kl := int(h.klen)
	if cap(r.k) < kl {
		r.k = make([]byte, 2*kl)
	}
	vl := int(h.vlen)
	if cap(r.v) < vl {
		r.v = make([]byte, 2*vl)
	}
	// NOTE(review): r.k and r.v are grown above, but the code below reads into
	// a freshly allocated buf, so the scratch buffers appear unused here —
	// confirm whether they are still needed.

	e := &Entry{}
	e.offset = r.recordOffset
	e.hlen = hlen
	buf := make([]byte, h.klen+h.vlen)
	if _, err := io.ReadFull(tee, buf[:]); err != nil {
		if err == io.EOF {
			// A partial entry at the tail means the log should be truncated here.
			err = errTruncate
		}
		return nil, err
	}
	if r.lf.encryptionEnabled() {
		if buf, err = r.lf.decryptKV(buf[:], r.recordOffset); err != nil {
			return nil, err
		}
	}
	// Key and value share the same backing buffer.
	e.Key = buf[:h.klen]
	e.Value = buf[h.klen:]
	// The stored checksum is read from the raw reader (not tee) so it is not
	// folded into the hash it is being compared against.
	var crcBuf [crc32.Size]byte
	if _, err := io.ReadFull(reader, crcBuf[:]); err != nil {
		if err == io.EOF {
			err = errTruncate
		}
		return nil, err
	}
	crc := y.BytesToU32(crcBuf[:])
	if crc != tee.Sum32() {
		return nil, errTruncate
	}
	e.meta = h.meta
	e.UserMeta = h.userMeta
	e.ExpiresAt = h.expiresAt
	return e, nil
}
// rewrite garbage-collects f: it iterates over every entry in the file,
// re-inserts into the LSM tree the entries whose latest version still points
// into f at the same offset, and then deletes the file — immediately if no
// iterators are active, otherwise by queueing it on filesToBeDeleted.
func (vlog *valueLog) rewrite(f *logFile) error {
	vlog.filesLock.RLock()
	// A file already queued for deletion must not be rewritten again.
	for _, fid := range vlog.filesToBeDeleted {
		if fid == f.fid {
			vlog.filesLock.RUnlock()
			return errors.Errorf("value log file already marked for deletion fid: %d", fid)
		}
	}
	maxFid := vlog.maxFid
	// Only non-current files may be rewritten; the maxFid file is still being written to.
	y.AssertTruef(f.fid < maxFid, "fid to move: %d. Current max fid: %d", f.fid, maxFid)
	vlog.filesLock.RUnlock()

	vlog.opt.Infof("Rewriting fid: %d", f.fid)
	wb := make([]*Entry, 0, 1000)
	var size int64

	y.AssertTrue(vlog.db != nil)
	var count, moved int
	// fe inspects one entry read from f and, when its latest version in the
	// LSM tree still points at this exact file/offset, batches a re-insert
	// into wb (flushing wb when it grows past the transaction limits).
	fe := func(e Entry) error {
		count++
		if count%100000 == 0 {
			vlog.opt.Debugf("Processing entry %d", count)
		}

		vs, err := vlog.db.get(e.Key)
		if err != nil {
			return err
		}
		if discardEntry(e, vs, vlog.db) {
			return nil
		}

		// Value is still present in value log.
		if len(vs.Value) == 0 {
			return errors.Errorf("Empty value: %+v", vs)
		}
		var vp valuePointer
		vp.Decode(vs.Value)

		// If the entry found from the LSM Tree points to a newer vlog file, don't do anything.
		if vp.Fid > f.fid {
			return nil
		}
		// If the entry found from the LSM Tree points to an offset greater than the one
		// read from vlog, don't do anything.
		if vp.Offset > e.offset {
			return nil
		}
		// If the entry read from LSM Tree and vlog file point to the same vlog file and offset,
		// insert them back into the DB.
		// NOTE: It might be possible that the entry read from the LSM Tree points to
		// an older vlog file. See the comments in the else part.
		if vp.Fid == f.fid && vp.Offset == e.offset {
			moved++
			// This new entry only contains the key, and a pointer to the value.
			ne := new(Entry)
			// Remove only the bitValuePointer and transaction markers. We
			// should keep the other bits.
			ne.meta = e.meta &^ (bitValuePointer | bitTxn | bitFinTxn)
			ne.UserMeta = e.UserMeta
			ne.ExpiresAt = e.ExpiresAt
			ne.Key = append([]byte{}, e.Key...)
			ne.Value = append([]byte{}, e.Value...)
			es := ne.estimateSizeAndSetThreshold(vlog.db.valueThreshold())
			// Consider size of value as well while considering the total size
			// of the batch. There have been reports of high memory usage in
			// rewrite because we don't consider the value size. See #1292.
			es += int64(len(e.Value))

			// Ensure length and size of wb is within transaction limits.
			if int64(len(wb)+1) >= vlog.opt.maxBatchCount ||
				size+es >= vlog.opt.maxBatchSize {
				if err := vlog.db.batchSet(wb); err != nil {
					return err
				}
				size = 0
				wb = wb[:0]
			}
			wb = append(wb, ne)
			size += es
		} else { //nolint:staticcheck
			// It might be possible that the entry read from LSM Tree points to
			// an older vlog file. This can happen in the following situation.
			// Assume DB is opened with
			// numberOfVersionsToKeep=1
			//
			// Now, if we have ONLY one key in the system "FOO" which has been
			// updated 3 times and the same key has been garbage collected 3
			// times, we'll have 3 versions of the movekey
			// for the same key "FOO".
			//
			// NOTE: moveKeyi is the gc'ed version of the original key with version i
			// We're calling the gc'ed keys as moveKey to simplify the
			// explanantion. We used to add move keys but we no longer do that.
			//
			// Assume we have 3 move keys in L0.
			// - moveKey1 (points to vlog file 10),
			// - moveKey2 (points to vlog file 14) and
			// - moveKey3 (points to vlog file 15).
			//
			// Also, assume there is another move key "moveKey1" (points to
			// vlog file 6) (this is also a move Key for key "FOO" ) on upper
			// levels (let's say 3). The move key "moveKey1" on level 0 was
			// inserted because vlog file 6 was GCed.
			//
			// Here's what the arrangement looks like
			// L0 => (moveKey1 => vlog10), (moveKey2 => vlog14), (moveKey3 => vlog15)
			// L1 => ....
			// L2 => ....
			// L3 => (moveKey1 => vlog6)
			//
			// When L0 compaction runs, it keeps only moveKey3 because the number of versions
			// to keep is set to 1. (we've dropped moveKey1's latest version)
			//
			// The new arrangement of keys is
			// L0 => ....
			// L1 => (moveKey3 => vlog15)
			// L2 => ....
			// L3 => (moveKey1 => vlog6)
			//
			// Now if we try to GC vlog file 10, the entry read from vlog file
			// will point to vlog10 but the entry read from LSM Tree will point
			// to vlog6. The move key read from LSM tree will point to vlog6
			// because we've asked for version 1 of the move key.
			//
			// This might seem like an issue but it's not really an issue
			// because the user has set the number of versions to keep to 1 and
			// the latest version of moveKey points to the correct vlog file
			// and offset. The stale move key on L3 will be eventually dropped
			// by compaction because there is a newer versions in the upper
			// levels.
		}
		return nil
	}

	_, err := f.iterate(vlog.opt.ReadOnly, 0, func(e Entry, vp valuePointer) error {
		return fe(e)
	})
	if err != nil {
		return err
	}

	// Flush whatever remains in wb, halving the batch size whenever the
	// transaction is reported too big.
	batchSize := 1024
	var loops int
	for i := 0; i < len(wb); {
		loops++
		if batchSize == 0 {
			vlog.db.opt.Warningf("We shouldn't reach batch size of zero.")
			return ErrNoRewrite
		}
		end := i + batchSize
		if end > len(wb) {
			end = len(wb)
		}
		if err := vlog.db.batchSet(wb[i:end]); err != nil {
			if err == ErrTxnTooBig {
				// Decrease the batch size to half.
				batchSize = batchSize / 2
				continue
			}
			return err
		}
		i += batchSize
	}
	vlog.opt.Infof("Processed %d entries in %d loops", len(wb), loops)
	vlog.opt.Infof("Total entries: %d. Moved: %d", count, moved)
	vlog.opt.Infof("Removing fid: %d", f.fid)
	var deleteFileNow bool
	// Entries written to LSM. Remove the older file now.
	{
		vlog.filesLock.Lock()
		// Just a sanity-check.
		if _, ok := vlog.filesMap[f.fid]; !ok {
			vlog.filesLock.Unlock()
			return errors.Errorf("Unable to find fid: %d", f.fid)
		}
		if vlog.iteratorCount() == 0 {
			delete(vlog.filesMap, f.fid)
			deleteFileNow = true
		} else {
			// Active iterators may still read from f; defer the delete until
			// the iterator refcount drops to zero (see decrIteratorCount).
			vlog.filesToBeDeleted = append(vlog.filesToBeDeleted, f.fid)
		}
		vlog.filesLock.Unlock()
	}

	if deleteFileNow {
		if err := vlog.deleteLogFile(f); err != nil {
			return err
		}
	}

	return nil
}
// incrIteratorCount registers one more active iterator over the value log.
// While the count is non-zero, GC'ed files are queued instead of deleted.
func (vlog *valueLog) incrIteratorCount() {
	vlog.numActiveIterators.Add(1)
}
// iteratorCount returns the number of currently active iterators.
func (vlog *valueLog) iteratorCount() int {
	return int(vlog.numActiveIterators.Load())
}
  332. func (vlog *valueLog) decrIteratorCount() error {
  333. num := vlog.numActiveIterators.Add(-1)
  334. if num != 0 {
  335. return nil
  336. }
  337. vlog.filesLock.Lock()
  338. lfs := make([]*logFile, 0, len(vlog.filesToBeDeleted))
  339. for _, id := range vlog.filesToBeDeleted {
  340. lfs = append(lfs, vlog.filesMap[id])
  341. delete(vlog.filesMap, id)
  342. }
  343. vlog.filesToBeDeleted = nil
  344. vlog.filesLock.Unlock()
  345. for _, lf := range lfs {
  346. if err := vlog.deleteLogFile(lf); err != nil {
  347. return err
  348. }
  349. }
  350. return nil
  351. }
  352. func (vlog *valueLog) deleteLogFile(lf *logFile) error {
  353. if lf == nil {
  354. return nil
  355. }
  356. lf.lock.Lock()
  357. defer lf.lock.Unlock()
  358. // Delete fid from discard stats as well.
  359. vlog.discardStats.Update(lf.fid, -1)
  360. return lf.Delete()
  361. }
  362. func (vlog *valueLog) dropAll() (int, error) {
  363. // If db is opened in InMemory mode, we don't need to do anything since there are no vlog files.
  364. if vlog.db.opt.InMemory {
  365. return 0, nil
  366. }
  367. // We don't want to block dropAll on any pending transactions. So, don't worry about iterator
  368. // count.
  369. var count int
  370. deleteAll := func() error {
  371. vlog.filesLock.Lock()
  372. defer vlog.filesLock.Unlock()
  373. for _, lf := range vlog.filesMap {
  374. if err := vlog.deleteLogFile(lf); err != nil {
  375. return err
  376. }
  377. count++
  378. }
  379. vlog.filesMap = make(map[uint32]*logFile)
  380. vlog.maxFid = 0
  381. return nil
  382. }
  383. if err := deleteAll(); err != nil {
  384. return count, err
  385. }
  386. vlog.db.opt.Infof("Value logs deleted. Creating value log file: 1")
  387. if _, err := vlog.createVlogFile(); err != nil { // Called while writes are stopped.
  388. return count, err
  389. }
  390. return count, nil
  391. }
// valueThreshold returns the current size threshold (in bytes) used to decide
// whether a value is stored in the LSM tree or in the value log.
func (db *DB) valueThreshold() int64 {
	return db.threshold.valueThreshold.Load()
}
// valueLog manages the set of on-disk value log files: creation, writes,
// rotation, garbage collection, and deletion.
type valueLog struct {
	dirPath string // directory containing the *.vlog files

	// guards our view of which files exist, which to be deleted, how many active iterators
	filesLock        sync.RWMutex
	filesMap         map[uint32]*logFile
	maxFid           uint32   // highest file id created so far
	filesToBeDeleted []uint32 // GC'ed files waiting for active iterators to finish
	// A refcount of iterators -- when this hits zero, we can delete the filesToBeDeleted.
	numActiveIterators atomic.Int32

	db                *DB
	writableLogOffset atomic.Uint32 // read by read, written by write
	numEntriesWritten uint32        // entries in the current file; used to trigger rotation
	opt               Options

	garbageCh    chan struct{} // capacity 1: only one GC may run at a time (see init)
	discardStats *discardStats // per-fid discard statistics (see deleteLogFile, Close)
}
  411. func vlogFilePath(dirPath string, fid uint32) string {
  412. return fmt.Sprintf("%s%s%06d.vlog", dirPath, string(os.PathSeparator), fid)
  413. }
// fpath returns the on-disk path for the value log file with the given id.
func (vlog *valueLog) fpath(fid uint32) string {
	return vlogFilePath(vlog.dirPath, fid)
}
  417. func (vlog *valueLog) populateFilesMap() error {
  418. vlog.filesMap = make(map[uint32]*logFile)
  419. files, err := os.ReadDir(vlog.dirPath)
  420. if err != nil {
  421. return errFile(err, vlog.dirPath, "Unable to open log dir.")
  422. }
  423. found := make(map[uint64]struct{})
  424. for _, file := range files {
  425. if !strings.HasSuffix(file.Name(), ".vlog") {
  426. continue
  427. }
  428. fsz := len(file.Name())
  429. fid, err := strconv.ParseUint(file.Name()[:fsz-5], 10, 32)
  430. if err != nil {
  431. return errFile(err, file.Name(), "Unable to parse log id.")
  432. }
  433. if _, ok := found[fid]; ok {
  434. return errFile(err, file.Name(), "Duplicate file found. Please delete one.")
  435. }
  436. found[fid] = struct{}{}
  437. lf := &logFile{
  438. fid: uint32(fid),
  439. path: vlog.fpath(uint32(fid)),
  440. registry: vlog.db.registry,
  441. }
  442. vlog.filesMap[uint32(fid)] = lf
  443. if vlog.maxFid < uint32(fid) {
  444. vlog.maxFid = uint32(fid)
  445. }
  446. }
  447. return nil
  448. }
// createVlogFile creates the next value log file (maxFid+1), registers it in
// filesMap, and resets the writable offset and entry counter. Callers must
// ensure no concurrent writes are in flight.
func (vlog *valueLog) createVlogFile() (*logFile, error) {
	fid := vlog.maxFid + 1
	path := vlog.fpath(fid)
	lf := &logFile{
		fid:      fid,
		path:     path,
		registry: vlog.db.registry,
		writeAt:  vlogHeaderSize,
		opt:      vlog.opt,
	}
	// O_EXCL ensures we never silently reuse an existing file for a new fid.
	err := lf.open(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 2*vlog.opt.ValueLogFileSize)
	if err != z.NewFile && err != nil {
		return nil, err
	}

	vlog.filesLock.Lock()
	vlog.filesMap[fid] = lf
	y.AssertTrue(vlog.maxFid < fid)
	vlog.maxFid = fid
	// writableLogOffset is only written by write func, by read by Read func.
	// To avoid a race condition, all reads and updates to this variable must be
	// done via atomics.
	vlog.writableLogOffset.Store(vlogHeaderSize)
	vlog.numEntriesWritten = 0
	vlog.filesLock.Unlock()

	return lf, nil
}
  475. func errFile(err error, path string, msg string) error {
  476. return fmt.Errorf("%s. Path=%s. Error=%v", msg, path, err)
  477. }
// init initializes the value log struct. This initialization needs to happen
// before compactions start.
func (vlog *valueLog) init(db *DB) {
	vlog.opt = db.opt
	vlog.db = db
	// We don't need to open any vlog files or collect stats for GC if DB is opened
	// in InMemory mode. InMemory mode doesn't create any files/directories on disk.
	if vlog.opt.InMemory {
		return
	}
	vlog.dirPath = vlog.opt.ValueDir

	vlog.garbageCh = make(chan struct{}, 1) // Only allow one GC at a time.
	lf, err := InitDiscardStats(vlog.opt)
	y.Check(err)
	vlog.discardStats = lf
	// See TestPersistLFDiscardStats for purpose of statement below.
	db.logToSyncChan(endVLogInitMsg)
}
// open loads existing value log files from disk, deletes empty non-latest
// files, truncates the latest file past any partially-written entry, and (in
// read-write mode) creates a brand-new file for subsequent writes.
func (vlog *valueLog) open(db *DB) error {
	// We don't need to open any vlog files or collect stats for GC if DB is opened
	// in InMemory mode. InMemory mode doesn't create any files/directories on disk.
	if db.opt.InMemory {
		return nil
	}

	if err := vlog.populateFilesMap(); err != nil {
		return err
	}
	// If no files are found, then create a new file.
	if len(vlog.filesMap) == 0 {
		if vlog.opt.ReadOnly {
			return nil
		}
		_, err := vlog.createVlogFile()
		return y.Wrapf(err, "Error while creating log file in valueLog.open")
	}
	fids := vlog.sortedFids()
	for _, fid := range fids {
		lf, ok := vlog.filesMap[fid]
		y.AssertTrue(ok)

		// Just open in RDWR mode. This should not create a new log file.
		lf.opt = vlog.opt
		if err := lf.open(vlog.fpath(fid), os.O_RDWR,
			2*vlog.opt.ValueLogFileSize); err != nil {
			return y.Wrapf(err, "Open existing file: %q", lf.path)
		}
		// A file containing only the header holds no entries; delete it.
		// We shouldn't delete the maxFid file.
		if lf.size.Load() == vlogHeaderSize && fid != vlog.maxFid {
			vlog.opt.Infof("Deleting empty file: %s", lf.path)
			if err := lf.Delete(); err != nil {
				return y.Wrapf(err, "while trying to delete empty file: %s", lf.path)
			}
			delete(vlog.filesMap, fid)
		}
	}

	if vlog.opt.ReadOnly {
		return nil
	}

	// Now we can read the latest value log file, and see if it needs truncation. We could
	// technically do this over all the value log files, but that would mean slowing down the value
	// log open.
	last, ok := vlog.filesMap[vlog.maxFid]
	y.AssertTrue(ok)
	// Iterating validates the entries; lastOff ends up just past the last
	// fully-readable entry.
	lastOff, err := last.iterate(vlog.opt.ReadOnly, vlogHeaderSize,
		func(_ Entry, vp valuePointer) error {
			return nil
		})
	if err != nil {
		return y.Wrapf(err, "while iterating over: %s", last.path)
	}
	if err := last.Truncate(int64(lastOff)); err != nil {
		return y.Wrapf(err, "while truncating last value log file: %s", last.path)
	}

	// Don't write to the old log file. Always create a new one.
	if _, err := vlog.createVlogFile(); err != nil {
		return y.Wrapf(err, "Error while creating log file in valueLog.open")
	}
	return nil
}
// Close closes every open value log file and then the discard stats. The
// currently-writable file is truncated to the written offset first. The first
// error encountered is returned, but all files are still closed.
func (vlog *valueLog) Close() error {
	if vlog == nil || vlog.db == nil || vlog.db.opt.InMemory {
		return nil
	}

	vlog.opt.Debugf("Stopping garbage collection of values.")
	var err error
	// NOTE(review): filesMap is iterated without holding filesLock here —
	// presumably all writers/GC have stopped by the time Close runs; confirm.
	for id, lf := range vlog.filesMap {
		lf.lock.Lock() // We won’t release the lock.
		offset := int64(-1)
		// Only the latest (writable) file is truncated down to the written
		// offset; -1 leaves all other files untouched.
		if !vlog.opt.ReadOnly && id == vlog.maxFid {
			offset = int64(vlog.woffset())
		}
		if terr := lf.Close(offset); terr != nil && err == nil {
			err = terr
		}
	}
	if vlog.discardStats != nil {
		vlog.db.captureDiscardStats()
		if terr := vlog.discardStats.Close(-1); terr != nil && err == nil {
			err = terr
		}
	}
	return err
}
  580. // sortedFids returns the file id's not pending deletion, sorted. Assumes we have shared access to
  581. // filesMap.
  582. func (vlog *valueLog) sortedFids() []uint32 {
  583. toBeDeleted := make(map[uint32]struct{})
  584. for _, fid := range vlog.filesToBeDeleted {
  585. toBeDeleted[fid] = struct{}{}
  586. }
  587. ret := make([]uint32, 0, len(vlog.filesMap))
  588. for fid := range vlog.filesMap {
  589. if _, ok := toBeDeleted[fid]; !ok {
  590. ret = append(ret, fid)
  591. }
  592. }
  593. sort.Slice(ret, func(i, j int) bool {
  594. return ret[i] < ret[j]
  595. })
  596. return ret
  597. }
// request is a batch of entries to be written to the value log together with
// the resulting value pointers. Requests are refcounted and recycled through
// requestPool (see DecrRef).
type request struct {
	// Input values
	Entries []*Entry
	// Output values and wait group stuff below
	Ptrs []valuePointer
	Wg   sync.WaitGroup
	Err  error
	ref  atomic.Int32
}
// reset clears the request for reuse from the pool. The Entries and Ptrs
// slices keep their backing arrays so they can be refilled without
// reallocating.
func (req *request) reset() {
	req.Entries = req.Entries[:0]
	req.Ptrs = req.Ptrs[:0]
	req.Wg = sync.WaitGroup{}
	req.Err = nil
	req.ref.Store(0)
}
// IncrRef increments the request's reference count by one.
func (req *request) IncrRef() {
	req.ref.Add(1)
}
// DecrRef decrements the reference count; when it reaches zero the entries
// are released and the request is returned to requestPool for reuse.
func (req *request) DecrRef() {
	nRef := req.ref.Add(-1)
	if nRef > 0 {
		return
	}
	req.Entries = nil
	requestPool.Put(req)
}
// Wait blocks until the request's WaitGroup completes, captures the resulting
// error, and then drops this caller's reference.
func (req *request) Wait() error {
	req.Wg.Wait()
	err := req.Err
	req.DecrRef() // DecrRef after writing to DB.
	return err
}
  631. type requests []*request
  632. func (reqs requests) DecrRef() {
  633. for _, req := range reqs {
  634. req.DecrRef()
  635. }
  636. }
  637. func (reqs requests) IncrRef() {
  638. for _, req := range reqs {
  639. req.IncrRef()
  640. }
  641. }
// sync syncs the content of the latest value log file to disk. Syncing of the
// value log directory is not required here as it happens every time a value
// log file rotation happens (check createVlogFile function). During rotation,
// the previous value log file also gets synced to disk.
func (vlog *valueLog) sync() error {
	// With SyncWrites set, every write is already synced on the write path
	// (see the defer in valueLog.write); in-memory mode has no files at all.
	if vlog.opt.SyncWrites || vlog.opt.InMemory {
		return nil
	}

	vlog.filesLock.RLock()
	maxFid := vlog.maxFid
	curlf := vlog.filesMap[maxFid]
	// Sometimes it is possible that vlog.maxFid has been increased but file creation
	// with same id is still in progress and this function is called. In those cases
	// entry for the file might not be present in vlog.filesMap.
	if curlf == nil {
		vlog.filesLock.RUnlock()
		return nil
	}
	// Take the file's own read lock before releasing filesLock, so the file
	// cannot be closed or deleted while Sync is in progress.
	curlf.lock.RLock()
	vlog.filesLock.RUnlock()

	err := curlf.Sync()
	curlf.lock.RUnlock()
	return err
}
// woffset returns the current writable offset within the latest vlog file.
func (vlog *valueLog) woffset() uint32 {
	return vlog.writableLogOffset.Load()
}
  670. // validateWrites will check whether the given requests can fit into 4GB vlog file.
  671. // NOTE: 4GB is the maximum size we can create for vlog because value pointer offset is of type
  672. // uint32. If we create more than 4GB, it will overflow uint32. So, limiting the size to 4GB.
  673. func (vlog *valueLog) validateWrites(reqs []*request) error {
  674. vlogOffset := uint64(vlog.woffset())
  675. for _, req := range reqs {
  676. // calculate size of the request.
  677. size := estimateRequestSize(req)
  678. estimatedVlogOffset := vlogOffset + size
  679. if estimatedVlogOffset > uint64(maxVlogFileSize) {
  680. return errors.Errorf("Request size offset %d is bigger than maximum offset %d",
  681. estimatedVlogOffset, maxVlogFileSize)
  682. }
  683. if estimatedVlogOffset >= uint64(vlog.opt.ValueLogFileSize) {
  684. // We'll create a new vlog file if the estimated offset is greater or equal to
  685. // max vlog size. So, resetting the vlogOffset.
  686. vlogOffset = 0
  687. continue
  688. }
  689. // Estimated vlog offset will become current vlog offset if the vlog is not rotated.
  690. vlogOffset = estimatedVlogOffset
  691. }
  692. return nil
  693. }
  694. // estimateRequestSize returns the size that needed to be written for the given request.
  695. func estimateRequestSize(req *request) uint64 {
  696. size := uint64(0)
  697. for _, e := range req.Entries {
  698. size += uint64(maxHeaderSize + len(e.Key) + len(e.Value) + crc32.Size)
  699. }
  700. return size
  701. }
// write is thread-unsafe by design and should not be called concurrently.
//
// write appends all entries of reqs to the current value log file via its
// mmap, fills req.Ptrs with a valuePointer per entry (a zero pointer for
// values small enough to stay in the LSM tree), and rotates to a new file
// whenever size or entry-count limits are exceeded.
func (vlog *valueLog) write(reqs []*request) error {
	if vlog.db.opt.InMemory {
		return nil
	}
	// Validate writes before writing to vlog. Because, we don't want to partially write and return
	// an error.
	if err := vlog.validateWrites(reqs); err != nil {
		return y.Wrapf(err, "while validating writes")
	}

	vlog.filesLock.RLock()
	maxFid := vlog.maxFid
	curlf := vlog.filesMap[maxFid]
	vlog.filesLock.RUnlock()

	defer func() {
		// With SyncWrites, every batch is fsynced before returning.
		if vlog.opt.SyncWrites {
			if err := curlf.Sync(); err != nil {
				vlog.opt.Errorf("Error while curlf sync: %v\n", err)
			}
		}
	}()

	// write copies buf into the mmap'd file at the current writable offset,
	// growing the file first when the data would not fit.
	write := func(buf *bytes.Buffer) error {
		if buf.Len() == 0 {
			return nil
		}

		n := uint32(buf.Len())
		endOffset := vlog.writableLogOffset.Add(n)
		// Increase the file size if we cannot accommodate this entry.
		// [Aman] Should this be >= or just >? Doesn't make sense to extend the file if it big enough already.
		if int(endOffset) >= len(curlf.Data) {
			if err := curlf.Truncate(int64(endOffset)); err != nil {
				return err
			}
		}

		start := int(endOffset - n)
		y.AssertTrue(copy(curlf.Data[start:], buf.Bytes()) == int(n))
		curlf.size.Store(endOffset)
		return nil
	}

	// toDisk rotates to a fresh vlog file once the current one exceeds its
	// size or entry-count limits.
	toDisk := func() error {
		if vlog.woffset() > uint32(vlog.opt.ValueLogFileSize) ||
			vlog.numEntriesWritten > vlog.opt.ValueLogMaxEntries {
			if err := curlf.doneWriting(vlog.woffset()); err != nil {
				return err
			}

			newlf, err := vlog.createVlogFile()
			if err != nil {
				return err
			}
			curlf = newlf
		}
		return nil
	}

	buf := new(bytes.Buffer)
	for i := range reqs {
		b := reqs[i]
		b.Ptrs = b.Ptrs[:0]
		var written, bytesWritten int
		valueSizes := make([]int64, 0, len(b.Entries))
		for j := range b.Entries {
			buf.Reset()

			e := b.Entries[j]
			valueSizes = append(valueSizes, int64(len(e.Value)))
			if e.skipVlogAndSetThreshold(vlog.db.valueThreshold()) {
				// Value is small enough to live in the LSM tree; a zero
				// pointer marks that case for the caller.
				b.Ptrs = append(b.Ptrs, valuePointer{})
				continue
			}
			var p valuePointer

			p.Fid = curlf.fid
			p.Offset = vlog.woffset()

			// We should not store transaction marks in the vlog file because it will never have all
			// the entries in a transaction. If we store entries with transaction marks then value
			// GC will not be able to iterate on the entire vlog file.
			// But, we still want the entry to stay intact for the memTable WAL. So, store the meta
			// in a temporary variable and reassign it after writing to the value log.
			tmpMeta := e.meta
			e.meta = e.meta &^ (bitTxn | bitFinTxn)
			plen, err := curlf.encodeEntry(buf, e, p.Offset) // Now encode the entry into buffer.
			if err != nil {
				return err
			}
			// Restore the meta.
			e.meta = tmpMeta

			p.Len = uint32(plen)
			b.Ptrs = append(b.Ptrs, p)
			if err := write(buf); err != nil {
				return err
			}
			written++
			bytesWritten += buf.Len()
			// No need to flush anything, we write to file directly via mmap.
		}
		y.NumWritesVlogAdd(vlog.opt.MetricsEnabled, int64(written))
		y.NumBytesWrittenVlogAdd(vlog.opt.MetricsEnabled, int64(bytesWritten))

		vlog.numEntriesWritten += uint32(written)
		vlog.db.threshold.update(valueSizes)
		// We write to disk here so that all entries that are part of the same transaction are
		// written to the same vlog file.
		if err := toDisk(); err != nil {
			return err
		}
	}
	return toDisk()
}
  806. // Gets the logFile and acquires and RLock() for the mmap. You must call RUnlock on the file
  807. // (if non-nil)
  808. func (vlog *valueLog) getFileRLocked(vp valuePointer) (*logFile, error) {
  809. vlog.filesLock.RLock()
  810. defer vlog.filesLock.RUnlock()
  811. ret, ok := vlog.filesMap[vp.Fid]
  812. if !ok {
  813. // log file has gone away, we can't do anything. Return.
  814. return nil, errors.Errorf("file with ID: %d not found", vp.Fid)
  815. }
  816. // Check for valid offset if we are reading from writable log.
  817. maxFid := vlog.maxFid
  818. // In read-only mode we don't need to check for writable offset as we are not writing anything.
  819. // Moreover, this offset is not set in readonly mode.
  820. if !vlog.opt.ReadOnly && vp.Fid == maxFid {
  821. currentOffset := vlog.woffset()
  822. if vp.Offset >= currentOffset {
  823. return nil, errors.Errorf(
  824. "Invalid value pointer offset: %d greater than current offset: %d",
  825. vp.Offset, currentOffset)
  826. }
  827. }
  828. ret.lock.RLock()
  829. return ret, nil
  830. }
  831. // Read reads the value log at a given location.
  832. // TODO: Make this read private.
  833. func (vlog *valueLog) Read(vp valuePointer, _ *y.Slice) ([]byte, func(), error) {
  834. buf, lf, err := vlog.readValueBytes(vp)
  835. // log file is locked so, decide whether to lock immediately or let the caller to
  836. // unlock it, after caller uses it.
  837. cb := vlog.getUnlockCallback(lf)
  838. if err != nil {
  839. return nil, cb, err
  840. }
  841. if vlog.opt.VerifyValueChecksum {
  842. hash := crc32.New(y.CastagnoliCrcTable)
  843. if _, err := hash.Write(buf[:len(buf)-crc32.Size]); err != nil {
  844. runCallback(cb)
  845. return nil, nil, y.Wrapf(err, "failed to write hash for vp %+v", vp)
  846. }
  847. // Fetch checksum from the end of the buffer.
  848. checksum := buf[len(buf)-crc32.Size:]
  849. if hash.Sum32() != y.BytesToU32(checksum) {
  850. runCallback(cb)
  851. return nil, nil, y.Wrapf(y.ErrChecksumMismatch, "value corrupted for vp: %+v", vp)
  852. }
  853. }
  854. var h header
  855. headerLen := h.Decode(buf)
  856. kv := buf[headerLen:]
  857. if lf.encryptionEnabled() {
  858. kv, err = lf.decryptKV(kv, vp.Offset)
  859. if err != nil {
  860. return nil, cb, err
  861. }
  862. }
  863. if uint32(len(kv)) < h.klen+h.vlen {
  864. vlog.db.opt.Errorf("Invalid read: vp: %+v", vp)
  865. return nil, nil, errors.Errorf("Invalid read: Len: %d read at:[%d:%d]",
  866. len(kv), h.klen, h.klen+h.vlen)
  867. }
  868. return kv[h.klen : h.klen+h.vlen], cb, nil
  869. }
  870. // getUnlockCallback will returns a function which unlock the logfile if the logfile is mmaped.
  871. // otherwise, it unlock the logfile and return nil.
  872. func (vlog *valueLog) getUnlockCallback(lf *logFile) func() {
  873. if lf == nil {
  874. return nil
  875. }
  876. return lf.lock.RUnlock
  877. }
  878. // readValueBytes return vlog entry slice and read locked log file. Caller should take care of
  879. // logFile unlocking.
  880. func (vlog *valueLog) readValueBytes(vp valuePointer) ([]byte, *logFile, error) {
  881. lf, err := vlog.getFileRLocked(vp)
  882. if err != nil {
  883. return nil, nil, err
  884. }
  885. buf, err := lf.read(vp)
  886. y.NumReadsVlogAdd(vlog.db.opt.MetricsEnabled, 1)
  887. y.NumBytesReadsVlogAdd(vlog.db.opt.MetricsEnabled, int64(len(buf)))
  888. return buf, lf, err
  889. }
  890. func (vlog *valueLog) pickLog(discardRatio float64) *logFile {
  891. vlog.filesLock.RLock()
  892. defer vlog.filesLock.RUnlock()
  893. LOOP:
  894. // Pick a candidate that contains the largest amount of discardable data
  895. fid, discard := vlog.discardStats.MaxDiscard()
  896. // MaxDiscard will return fid=0 if it doesn't have any discard data. The
  897. // vlog files start from 1.
  898. if fid == 0 {
  899. vlog.opt.Debugf("No file with discard stats")
  900. return nil
  901. }
  902. lf, ok := vlog.filesMap[fid]
  903. // This file was deleted but it's discard stats increased because of compactions. The file
  904. // doesn't exist so we don't need to do anything. Skip it and retry.
  905. if !ok {
  906. vlog.discardStats.Update(fid, -1)
  907. goto LOOP
  908. }
  909. // We have a valid file.
  910. fi, err := lf.Fd.Stat()
  911. if err != nil {
  912. vlog.opt.Errorf("Unable to get stats for value log fid: %d err: %+v", fi, err)
  913. return nil
  914. }
  915. if thr := discardRatio * float64(fi.Size()); float64(discard) < thr {
  916. vlog.opt.Debugf("Discard: %d less than threshold: %.0f for file: %s",
  917. discard, thr, fi.Name())
  918. return nil
  919. }
  920. if fid < vlog.maxFid {
  921. vlog.opt.Infof("Found value log max discard fid: %d discard: %d\n", fid, discard)
  922. lf, ok := vlog.filesMap[fid]
  923. y.AssertTrue(ok)
  924. return lf
  925. }
  926. // Don't randomly pick any value log file.
  927. return nil
  928. }
  929. func discardEntry(e Entry, vs y.ValueStruct, db *DB) bool {
  930. if vs.Version != y.ParseTs(e.Key) {
  931. // Version not found. Discard.
  932. return true
  933. }
  934. if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
  935. return true
  936. }
  937. if (vs.Meta & bitValuePointer) == 0 {
  938. // Key also stores the value in LSM. Discard.
  939. return true
  940. }
  941. if (vs.Meta & bitFinTxn) > 0 {
  942. // Just a txn finish entry. Discard.
  943. return true
  944. }
  945. return false
  946. }
  947. func (vlog *valueLog) doRunGC(lf *logFile) error {
  948. _, span := otel.Tracer("").Start(context.TODO(), "Badger.GC")
  949. span.SetAttributes(attribute.String("GC rewrite for", lf.path))
  950. defer span.End()
  951. if err := vlog.rewrite(lf); err != nil {
  952. return err
  953. }
  954. // Remove the file from discardStats.
  955. vlog.discardStats.Update(lf.fid, -1)
  956. return nil
  957. }
  958. func (vlog *valueLog) waitOnGC(lc *z.Closer) {
  959. defer lc.Done()
  960. <-lc.HasBeenClosed() // Wait for lc to be closed.
  961. // Block any GC in progress to finish, and don't allow any more writes to runGC by filling up
  962. // the channel of size 1.
  963. vlog.garbageCh <- struct{}{}
  964. }
  965. func (vlog *valueLog) runGC(discardRatio float64) error {
  966. select {
  967. case vlog.garbageCh <- struct{}{}:
  968. // Pick a log file for GC.
  969. defer func() {
  970. <-vlog.garbageCh
  971. }()
  972. lf := vlog.pickLog(discardRatio)
  973. if lf == nil {
  974. return ErrNoRewrite
  975. }
  976. return vlog.doRunGC(lf)
  977. default:
  978. return ErrRejected
  979. }
  980. }
  981. func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) {
  982. if vlog.opt.InMemory {
  983. return
  984. }
  985. for fid, discard := range stats {
  986. vlog.discardStats.Update(fid, discard)
  987. }
  988. // The following is to coordinate with some test cases where we want to
  989. // verify that at least one iteration of updateDiscardStats has been completed.
  990. vlog.db.logToSyncChan(updateDiscardStatsMsg)
  991. }
// vlogThreshold dynamically tunes the size threshold deciding whether a value
// is stored inline in the LSM tree or out in the value log. Observed value
// sizes stream in over valueCh and are folded into vlMetrics; the threshold
// is re-derived as the configured percentile of that histogram.
type vlogThreshold struct {
	logger         Logger
	percentile     float64      // Options.VLogPercentile (set in initVlogThreshold).
	valueThreshold atomic.Int64 // current threshold; seeded from Options.ValueThreshold.
	valueCh        chan []int64 // batches of observed value sizes (buffered, cap 1000).
	clearCh        chan bool    // signal to reset vlMetrics (see Clear).
	closer         *z.Closer    // stops the listener goroutine.
	// Metrics contains a running log of statistics like amount of data stored etc.
	vlMetrics *z.HistogramData
}
  1002. func initVlogThreshold(opt *Options) *vlogThreshold {
  1003. getBounds := func() []float64 {
  1004. mxbd := opt.maxValueThreshold
  1005. mnbd := float64(opt.ValueThreshold)
  1006. y.AssertTruef(mxbd >= mnbd, "maximum threshold bound is less than the min threshold")
  1007. size := math.Min(mxbd-mnbd+1, 1024.0)
  1008. bdstp := (mxbd - mnbd) / size
  1009. bounds := make([]float64, int64(size))
  1010. for i := range bounds {
  1011. if i == 0 {
  1012. bounds[0] = mnbd
  1013. continue
  1014. }
  1015. if i == int(size-1) {
  1016. bounds[i] = mxbd
  1017. continue
  1018. }
  1019. bounds[i] = bounds[i-1] + bdstp
  1020. }
  1021. return bounds
  1022. }
  1023. lt := &vlogThreshold{
  1024. logger: opt.Logger,
  1025. percentile: opt.VLogPercentile,
  1026. valueCh: make(chan []int64, 1000),
  1027. clearCh: make(chan bool, 1),
  1028. closer: z.NewCloser(1),
  1029. vlMetrics: z.NewHistogramData(getBounds()),
  1030. }
  1031. lt.valueThreshold.Store(opt.ValueThreshold)
  1032. return lt
  1033. }
// Clear resets the threshold back to opt.ValueThreshold and signals the
// listener goroutine to wipe the accumulated size histogram.
func (v *vlogThreshold) Clear(opt Options) {
	v.valueThreshold.Store(opt.ValueThreshold)
	v.clearCh <- true
}
  1038. func (v *vlogThreshold) update(sizes []int64) {
  1039. v.valueCh <- sizes
  1040. }
// close signals the threshold listener goroutine to stop and waits until it
// has exited.
func (v *vlogThreshold) close() {
	v.closer.SignalAndWait()
}
  1044. func (v *vlogThreshold) listenForValueThresholdUpdate() {
  1045. defer v.closer.Done()
  1046. for {
  1047. select {
  1048. case <-v.closer.HasBeenClosed():
  1049. return
  1050. case val := <-v.valueCh:
  1051. for _, e := range val {
  1052. v.vlMetrics.Update(e)
  1053. }
  1054. // we are making it to get Options.VlogPercentile so that values with sizes
  1055. // in range of Options.VlogPercentile will make it to the LSM tree and rest to the
  1056. // value log file.
  1057. p := int64(v.vlMetrics.Percentile(v.percentile))
  1058. if v.valueThreshold.Load() != p {
  1059. if v.logger != nil {
  1060. v.logger.Infof("updating value of threshold to: %d", p)
  1061. }
  1062. v.valueThreshold.Store(p)
  1063. }
  1064. case <-v.clearCh:
  1065. v.vlMetrics.Clear()
  1066. }
  1067. }
  1068. }