value.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package badger
  6. import (
  7. "bytes"
  8. "context"
  9. "errors"
  10. "fmt"
  11. "hash"
  12. "hash/crc32"
  13. "io"
  14. "math"
  15. "os"
  16. "sort"
  17. "strconv"
  18. "strings"
  19. "sync"
  20. "sync/atomic"
  21. "go.opentelemetry.io/otel"
  22. "go.opentelemetry.io/otel/attribute"
  23. "github.com/dgraph-io/badger/v4/y"
  24. "github.com/dgraph-io/ristretto/v2/z"
  25. )
// maxVlogFileSize is the maximum size of the vlog file which can be created. Vlog Offset is of
// uint32, so limiting at max uint32.
var maxVlogFileSize uint32 = math.MaxUint32

// Values have their first byte being byteData or byteDelete. This helps us distinguish between
// a key that has never been seen and a key that has been explicitly deleted.
const (
	bitDelete                 byte = 1 << 0 // Set if the key has been deleted.
	bitValuePointer           byte = 1 << 1 // Set if the value is NOT stored directly next to key.
	bitDiscardEarlierVersions byte = 1 << 2 // Set if earlier versions can be discarded.
	// Set if item shouldn't be discarded via compactions (used by merge operator)
	bitMergeEntry byte = 1 << 3
	// The MSB 2 bits are for transactions.
	bitTxn    byte = 1 << 6 // Set if the entry is part of a txn.
	bitFinTxn byte = 1 << 7 // Set if the entry is to indicate end of txn in value log.

	mi int64 = 1 << 20 //nolint:unused

	// size of vlog header.
	// +----------------+------------------+
	// | keyID(8 bytes) | baseIV(12 bytes)|
	// +----------------+------------------+
	vlogHeaderSize = 20
)

// errStop is returned by iteration callbacks to halt iteration early without
// reporting a failure.
var errStop = errors.New("Stop iteration")

// errTruncate signals that the value log is corrupt or incomplete from this
// point on and should be truncated here.
var errTruncate = errors.New("Do truncate")

// logEntry is the callback invoked for each entry found while iterating a
// value log file.
type logEntry func(e Entry, vp valuePointer) error
// safeRead holds reusable scratch state for decoding entries from a log file.
// Reusing k and v across reads avoids allocating fresh buffers per entry.
type safeRead struct {
	k []byte // scratch buffer for the key
	v []byte // scratch buffer for the value

	recordOffset uint32   // offset of the entry currently being decoded
	lf           *logFile // log file being read from
}
// hashReader implements io.Reader, io.ByteReader interfaces. It also keeps track of the number
// bytes read. The hashReader writes to h (hash) what it reads from r.
type hashReader struct {
	r         io.Reader
	h         hash.Hash32
	bytesRead int // Number of bytes read.
}
  63. func newHashReader(r io.Reader) *hashReader {
  64. hash := crc32.New(y.CastagnoliCrcTable)
  65. return &hashReader{
  66. r: r,
  67. h: hash,
  68. }
  69. }
// Read reads len(p) bytes from the reader. Returns the number of bytes read, error on failure.
func (t *hashReader) Read(p []byte) (int, error) {
	n, err := t.r.Read(p)
	if err != nil {
		return n, err
	}
	t.bytesRead += n
	// Feed exactly the bytes read into the checksum. Per the hash.Hash
	// contract, Write never returns an error, so this effectively returns (n, nil).
	return t.h.Write(p[:n])
}
  79. // ReadByte reads exactly one byte from the reader. Returns error on failure.
  80. func (t *hashReader) ReadByte() (byte, error) {
  81. b := make([]byte, 1)
  82. _, err := t.Read(b)
  83. return b[0], err
  84. }
// Sum32 returns the sum32 of the underlying hash, i.e. the checksum of all
// bytes read through this reader so far.
func (t *hashReader) Sum32() uint32 {
	return t.h.Sum32()
}
// Entry reads an entry from the provided reader. It also validates the checksum for every entry
// read. Returns error on failure.
//
// Layout on disk is: header | key | value | crc32. Header, key and value are
// checksummed; the trailing crc32 itself is not, so it is read from the raw
// reader rather than through the hashing tee.
func (r *safeRead) Entry(reader io.Reader) (*Entry, error) {
	// Everything read through tee is folded into the running checksum.
	tee := newHashReader(reader)
	var h header
	hlen, err := h.DecodeFrom(tee)
	if err != nil {
		return nil, err
	}
	if h.klen > uint32(1<<16) { // Key length must be below uint16.
		return nil, errTruncate
	}
	kl := int(h.klen)
	if cap(r.k) < kl {
		// Grow with headroom (2x) so subsequent entries can reuse the buffer.
		r.k = make([]byte, 2*kl)
	}
	vl := int(h.vlen)
	if cap(r.v) < vl {
		r.v = make([]byte, 2*vl)
	}

	e := &Entry{}
	e.offset = r.recordOffset
	e.hlen = hlen
	buf := make([]byte, h.klen+h.vlen)
	if _, err := io.ReadFull(tee, buf[:]); err != nil {
		if err == io.EOF {
			// A partial entry at the tail means the log was cut off mid-write.
			err = errTruncate
		}
		return nil, err
	}
	if r.lf.encryptionEnabled() {
		if buf, err = r.lf.decryptKV(buf[:], r.recordOffset); err != nil {
			return nil, err
		}
	}
	e.Key = buf[:h.klen]
	e.Value = buf[h.klen:]
	// The stored checksum trails the entry and is NOT part of the checksummed
	// data, hence it is read from reader, not tee.
	var crcBuf [crc32.Size]byte
	if _, err := io.ReadFull(reader, crcBuf[:]); err != nil {
		if err == io.EOF {
			err = errTruncate
		}
		return nil, err
	}
	crc := y.BytesToU32(crcBuf[:])
	if crc != tee.Sum32() {
		return nil, errTruncate
	}
	e.meta = h.meta
	e.UserMeta = h.userMeta
	e.ExpiresAt = h.expiresAt
	return e, nil
}
// rewrite garbage-collects value log file f. Every entry in f that is still
// the live version according to the LSM tree is batched and re-written into
// the DB (which lands it in the current head vlog file); everything else is
// dropped. Afterwards f is deleted — immediately if no iterators are active,
// otherwise it is queued on filesToBeDeleted. f must not already be marked
// for deletion and must not be the active (max fid) file.
func (vlog *valueLog) rewrite(f *logFile) error {
	vlog.filesLock.RLock()
	for _, fid := range vlog.filesToBeDeleted {
		if fid == f.fid {
			vlog.filesLock.RUnlock()
			return fmt.Errorf("value log file already marked for deletion fid: %d", fid)
		}
	}
	maxFid := vlog.maxFid
	y.AssertTruef(f.fid < maxFid, "fid to move: %d. Current max fid: %d", f.fid, maxFid)
	vlog.filesLock.RUnlock()

	vlog.opt.Infof("Rewriting fid: %d", f.fid)
	wb := make([]*Entry, 0, 1000)
	var size int64

	y.AssertTrue(vlog.db != nil)
	var count, moved int
	// fe decides, for one entry read out of f, whether it is still live and
	// (if so) stages it in wb for re-insertion into the DB.
	fe := func(e Entry) error {
		count++
		if count%100000 == 0 {
			vlog.opt.Debugf("Processing entry %d", count)
		}

		vs, err := vlog.db.get(e.Key)
		if err != nil {
			return err
		}
		if discardEntry(e, vs, vlog.db) {
			return nil
		}

		// Value is still present in value log.
		if len(vs.Value) == 0 {
			return fmt.Errorf("Empty value: %+v", vs)
		}
		var vp valuePointer
		vp.Decode(vs.Value)

		// If the entry found from the LSM Tree points to a newer vlog file, don't do anything.
		if vp.Fid > f.fid {
			return nil
		}
		// If the entry found from the LSM Tree points to an offset greater than the one
		// read from vlog, don't do anything.
		if vp.Offset > e.offset {
			return nil
		}
		// If the entry read from LSM Tree and vlog file point to the same vlog file and offset,
		// insert them back into the DB.
		// NOTE: It might be possible that the entry read from the LSM Tree points to
		// an older vlog file. See the comments in the else part.
		if vp.Fid == f.fid && vp.Offset == e.offset {
			moved++
			// This new entry only contains the key, and a pointer to the value.
			ne := new(Entry)
			// Remove only the bitValuePointer and transaction markers. We
			// should keep the other bits.
			ne.meta = e.meta &^ (bitValuePointer | bitTxn | bitFinTxn)
			ne.UserMeta = e.UserMeta
			ne.ExpiresAt = e.ExpiresAt
			ne.Key = append([]byte{}, e.Key...)
			ne.Value = append([]byte{}, e.Value...)
			es := ne.estimateSizeAndSetThreshold(vlog.db.valueThreshold())
			// Consider size of value as well while considering the total size
			// of the batch. There have been reports of high memory usage in
			// rewrite because we don't consider the value size. See #1292.
			es += int64(len(e.Value))

			// Ensure length and size of wb is within transaction limits.
			if int64(len(wb)+1) >= vlog.opt.maxBatchCount ||
				size+es >= vlog.opt.maxBatchSize {
				if err := vlog.db.batchSet(wb); err != nil {
					return err
				}
				size = 0
				wb = wb[:0]
			}
			wb = append(wb, ne)
			size += es
		} else { //nolint:staticcheck
			// It might be possible that the entry read from LSM Tree points to
			// an older vlog file. This can happen in the following situation.
			// Assume DB is opened with
			// numberOfVersionsToKeep=1
			//
			// Now, if we have ONLY one key in the system "FOO" which has been
			// updated 3 times and the same key has been garbage collected 3
			// times, we'll have 3 versions of the movekey
			// for the same key "FOO".
			//
			// NOTE: moveKeyi is the gc'ed version of the original key with version i
			// We're calling the gc'ed keys as moveKey to simplify the
			// explanantion. We used to add move keys but we no longer do that.
			//
			// Assume we have 3 move keys in L0.
			// - moveKey1 (points to vlog file 10),
			// - moveKey2 (points to vlog file 14) and
			// - moveKey3 (points to vlog file 15).
			//
			// Also, assume there is another move key "moveKey1" (points to
			// vlog file 6) (this is also a move Key for key "FOO" ) on upper
			// levels (let's say 3). The move key "moveKey1" on level 0 was
			// inserted because vlog file 6 was GCed.
			//
			// Here's what the arrangement looks like
			// L0 => (moveKey1 => vlog10), (moveKey2 => vlog14), (moveKey3 => vlog15)
			// L1 => ....
			// L2 => ....
			// L3 => (moveKey1 => vlog6)
			//
			// When L0 compaction runs, it keeps only moveKey3 because the number of versions
			// to keep is set to 1. (we've dropped moveKey1's latest version)
			//
			// The new arrangement of keys is
			// L0 => ....
			// L1 => (moveKey3 => vlog15)
			// L2 => ....
			// L3 => (moveKey1 => vlog6)
			//
			// Now if we try to GC vlog file 10, the entry read from vlog file
			// will point to vlog10 but the entry read from LSM Tree will point
			// to vlog6. The move key read from LSM tree will point to vlog6
			// because we've asked for version 1 of the move key.
			//
			// This might seem like an issue but it's not really an issue
			// because the user has set the number of versions to keep to 1 and
			// the latest version of moveKey points to the correct vlog file
			// and offset. The stale move key on L3 will be eventually dropped
			// by compaction because there is a newer versions in the upper
			// levels.
		}
		return nil
	}

	_, err := f.iterate(vlog.opt.ReadOnly, 0, func(e Entry, vp valuePointer) error {
		return fe(e)
	})
	if err != nil {
		return err
	}

	// Flush whatever is still staged in wb, halving the batch size whenever
	// the transaction is too big.
	batchSize := 1024
	var loops int
	for i := 0; i < len(wb); {
		loops++
		if batchSize == 0 {
			vlog.db.opt.Warningf("We shouldn't reach batch size of zero.")
			return ErrNoRewrite
		}
		end := i + batchSize
		if end > len(wb) {
			end = len(wb)
		}
		if err := vlog.db.batchSet(wb[i:end]); err != nil {
			if err == ErrTxnTooBig {
				// Decrease the batch size to half.
				batchSize = batchSize / 2
				continue
			}
			return err
		}
		i += batchSize
	}
	vlog.opt.Infof("Processed %d entries in %d loops", len(wb), loops)
	vlog.opt.Infof("Total entries: %d. Moved: %d", count, moved)
	vlog.opt.Infof("Removing fid: %d", f.fid)
	var deleteFileNow bool
	// Entries written to LSM. Remove the older file now.
	{
		vlog.filesLock.Lock()
		// Just a sanity-check.
		if _, ok := vlog.filesMap[f.fid]; !ok {
			vlog.filesLock.Unlock()
			return fmt.Errorf("Unable to find fid: %d", f.fid)
		}
		if vlog.iteratorCount() == 0 {
			delete(vlog.filesMap, f.fid)
			deleteFileNow = true
		} else {
			// Active iterators may still read from f; defer the delete until
			// the last one finishes (see decrIteratorCount).
			vlog.filesToBeDeleted = append(vlog.filesToBeDeleted, f.fid)
		}
		vlog.filesLock.Unlock()
	}

	if deleteFileNow {
		if err := vlog.deleteLogFile(f); err != nil {
			return err
		}
	}

	return nil
}
// incrIteratorCount registers one more active iterator over the value log.
// While any iterator is active, GC'ed files are only queued for deletion
// (see rewrite / decrIteratorCount).
func (vlog *valueLog) incrIteratorCount() {
	vlog.numActiveIterators.Add(1)
}
// iteratorCount returns the number of currently active iterators.
func (vlog *valueLog) iteratorCount() int {
	return int(vlog.numActiveIterators.Load())
}
// decrIteratorCount decrements the active-iterator count and, when it reaches
// zero, physically deletes the log files that were queued for deletion while
// iterators were still open.
func (vlog *valueLog) decrIteratorCount() error {
	num := vlog.numActiveIterators.Add(-1)
	if num != 0 {
		return nil
	}

	vlog.filesLock.Lock()
	lfs := make([]*logFile, 0, len(vlog.filesToBeDeleted))
	for _, id := range vlog.filesToBeDeleted {
		lfs = append(lfs, vlog.filesMap[id])
		delete(vlog.filesMap, id)
	}
	vlog.filesToBeDeleted = nil
	vlog.filesLock.Unlock()

	// Delete outside filesLock to keep the critical section short.
	for _, lf := range lfs {
		if err := vlog.deleteLogFile(lf); err != nil {
			return err
		}
	}
	return nil
}
// deleteLogFile drops lf's discard-stats entry and deletes the file from
// disk, holding lf's write lock for the duration. Calling with nil is a no-op.
func (vlog *valueLog) deleteLogFile(lf *logFile) error {
	if lf == nil {
		return nil
	}
	lf.lock.Lock()
	defer lf.lock.Unlock()
	// Delete fid from discard stats as well.
	vlog.discardStats.Update(lf.fid, -1)
	return lf.Delete()
}
// dropAll deletes every value log file and then creates a fresh one to write
// to, returning the number of files removed. It is called while writes are
// stopped, and deliberately ignores the active-iterator count.
func (vlog *valueLog) dropAll() (int, error) {
	// If db is opened in InMemory mode, we don't need to do anything since there are no vlog files.
	if vlog.db.opt.InMemory {
		return 0, nil
	}
	// We don't want to block dropAll on any pending transactions. So, don't worry about iterator
	// count.
	var count int
	deleteAll := func() error {
		vlog.filesLock.Lock()
		defer vlog.filesLock.Unlock()
		for _, lf := range vlog.filesMap {
			if err := vlog.deleteLogFile(lf); err != nil {
				return err
			}
			count++
		}
		// Reset the map and fid counter so the next file starts at fid 1.
		vlog.filesMap = make(map[uint32]*logFile)
		vlog.maxFid = 0
		return nil
	}
	if err := deleteAll(); err != nil {
		return count, err
	}

	vlog.db.opt.Infof("Value logs deleted. Creating value log file: 1")
	if _, err := vlog.createVlogFile(); err != nil { // Called while writes are stopped.
		return count, err
	}
	return count, nil
}
// valueThreshold returns the current (adaptive) size threshold used to decide
// whether a value goes to the value log or stays in the LSM tree (see
// skipVlogAndSetThreshold usage in write).
func (db *DB) valueThreshold() int64 {
	return db.threshold.valueThreshold.Load()
}
// valueLog manages the set of on-disk value log files, keyed by file id.
// Large values live here; the LSM tree stores a valuePointer that locates
// them in one of these files.
type valueLog struct {
	dirPath string

	// guards our view of which files exist, which to be deleted, how many active iterators
	filesLock        sync.RWMutex
	filesMap         map[uint32]*logFile
	maxFid           uint32
	filesToBeDeleted []uint32
	// A refcount of iterators -- when this hits zero, we can delete the filesToBeDeleted.
	numActiveIterators atomic.Int32

	db                *DB
	writableLogOffset atomic.Uint32 // read by read, written by write
	numEntriesWritten uint32        // entries written to the current head file; reset on rotation
	opt               Options

	garbageCh    chan struct{} // capacity 1: allows only a single GC at a time (set up in init)
	discardStats *discardStats
}
  410. func vlogFilePath(dirPath string, fid uint32) string {
  411. return fmt.Sprintf("%s%s%06d.vlog", dirPath, string(os.PathSeparator), fid)
  412. }
// fpath returns the full on-disk path for the vlog file with the given id.
func (vlog *valueLog) fpath(fid uint32) string {
	return vlogFilePath(vlog.dirPath, fid)
}
  416. func (vlog *valueLog) populateFilesMap() error {
  417. vlog.filesMap = make(map[uint32]*logFile)
  418. files, err := os.ReadDir(vlog.dirPath)
  419. if err != nil {
  420. return errFile(err, vlog.dirPath, "Unable to open log dir.")
  421. }
  422. found := make(map[uint64]struct{})
  423. for _, file := range files {
  424. if !strings.HasSuffix(file.Name(), ".vlog") {
  425. continue
  426. }
  427. fsz := len(file.Name())
  428. fid, err := strconv.ParseUint(file.Name()[:fsz-5], 10, 32)
  429. if err != nil {
  430. return errFile(err, file.Name(), "Unable to parse log id.")
  431. }
  432. if _, ok := found[fid]; ok {
  433. return errFile(err, file.Name(), "Duplicate file found. Please delete one.")
  434. }
  435. found[fid] = struct{}{}
  436. lf := &logFile{
  437. fid: uint32(fid),
  438. path: vlog.fpath(uint32(fid)),
  439. registry: vlog.db.registry,
  440. }
  441. vlog.filesMap[uint32(fid)] = lf
  442. if vlog.maxFid < uint32(fid) {
  443. vlog.maxFid = uint32(fid)
  444. }
  445. }
  446. return nil
  447. }
// createVlogFile creates and opens a brand-new value log file with id
// maxFid+1, registers it in filesMap, bumps maxFid, and resets the writable
// offset and entry counter to just past the vlog header.
func (vlog *valueLog) createVlogFile() (*logFile, error) {
	fid := vlog.maxFid + 1
	path := vlog.fpath(fid)
	lf := &logFile{
		fid:      fid,
		path:     path,
		registry: vlog.db.registry,
		writeAt:  vlogHeaderSize,
		opt:      vlog.opt,
	}
	// z.NewFile just indicates a fresh file was created, which is expected
	// here; anything else is a real failure.
	err := lf.open(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 2*vlog.opt.ValueLogFileSize)
	if err != z.NewFile && err != nil {
		return nil, err
	}

	vlog.filesLock.Lock()
	vlog.filesMap[fid] = lf
	y.AssertTrue(vlog.maxFid < fid)
	vlog.maxFid = fid
	// writableLogOffset is only written by write func, by read by Read func.
	// To avoid a race condition, all reads and updates to this variable must be
	// done via atomics.
	vlog.writableLogOffset.Store(vlogHeaderSize)
	vlog.numEntriesWritten = 0
	vlog.filesLock.Unlock()

	return lf, nil
}
  474. func errFile(err error, path string, msg string) error {
  475. return fmt.Errorf("%s. Path=%s. Error=%v", msg, path, err)
  476. }
// init initializes the value log struct. This initialization needs to happen
// before compactions start.
func (vlog *valueLog) init(db *DB) {
	vlog.opt = db.opt
	vlog.db = db
	// We don't need to open any vlog files or collect stats for GC if DB is opened
	// in InMemory mode. InMemory mode doesn't create any files/directories on disk.
	if vlog.opt.InMemory {
		return
	}
	vlog.dirPath = vlog.opt.ValueDir

	vlog.garbageCh = make(chan struct{}, 1) // Only allow one GC at a time.
	lf, err := InitDiscardStats(vlog.opt)
	y.Check(err)
	vlog.discardStats = lf
	// See TestPersistLFDiscardStats for purpose of statement below.
	db.logToSyncChan(endVLogInitMsg)
}
// open discovers and opens all existing value log files (creating one when
// none exist), deletes empty non-head files, truncates the head file past its
// last valid entry, and finally rotates to a fresh file for writing. Most of
// this is skipped in in-memory and read-only modes.
func (vlog *valueLog) open(db *DB) error {
	// We don't need to open any vlog files or collect stats for GC if DB is opened
	// in InMemory mode. InMemory mode doesn't create any files/directories on disk.
	if db.opt.InMemory {
		return nil
	}

	if err := vlog.populateFilesMap(); err != nil {
		return err
	}
	// If no files are found, then create a new file.
	if len(vlog.filesMap) == 0 {
		if vlog.opt.ReadOnly {
			return nil
		}
		_, err := vlog.createVlogFile()
		return y.Wrapf(err, "Error while creating log file in valueLog.open")
	}
	fids := vlog.sortedFids()
	for _, fid := range fids {
		lf, ok := vlog.filesMap[fid]
		y.AssertTrue(ok)

		// Just open in RDWR mode. This should not create a new log file.
		lf.opt = vlog.opt
		if err := lf.open(vlog.fpath(fid), os.O_RDWR,
			2*vlog.opt.ValueLogFileSize); err != nil {
			return y.Wrapf(err, "Open existing file: %q", lf.path)
		}
		// We shouldn't delete the maxFid file.
		if lf.size.Load() == vlogHeaderSize && fid != vlog.maxFid {
			vlog.opt.Infof("Deleting empty file: %s", lf.path)
			if err := lf.Delete(); err != nil {
				return y.Wrapf(err, "while trying to delete empty file: %s", lf.path)
			}
			delete(vlog.filesMap, fid)
		}
	}

	if vlog.opt.ReadOnly {
		return nil
	}
	// Now we can read the latest value log file, and see if it needs truncation. We could
	// technically do this over all the value log files, but that would mean slowing down the value
	// log open.
	last, ok := vlog.filesMap[vlog.maxFid]
	y.AssertTrue(ok)
	// Iterate just to find the offset of the last valid entry.
	lastOff, err := last.iterate(vlog.opt.ReadOnly, vlogHeaderSize,
		func(_ Entry, vp valuePointer) error {
			return nil
		})
	if err != nil {
		return y.Wrapf(err, "while iterating over: %s", last.path)
	}
	if err := last.Truncate(int64(lastOff)); err != nil {
		return y.Wrapf(err, "while truncating last value log file: %s", last.path)
	}

	// Don't write to the old log file. Always create a new one.
	if _, err := vlog.createVlogFile(); err != nil {
		return y.Wrapf(err, "Error while creating log file in valueLog.open")
	}
	return nil
}
// Close closes every open value log file — truncating the active one down to
// its write offset — and persists discard stats. It is a no-op for nil or
// in-memory DBs. The first error encountered is returned, but all files are
// still closed.
func (vlog *valueLog) Close() error {
	if vlog == nil || vlog.db == nil || vlog.db.opt.InMemory {
		return nil
	}

	vlog.opt.Debugf("Stopping garbage collection of values.")
	var err error
	for id, lf := range vlog.filesMap {
		lf.lock.Lock() // We won’t release the lock.
		// -1 means "don't truncate"; the active file is truncated to exactly
		// what was written.
		offset := int64(-1)
		if !vlog.opt.ReadOnly && id == vlog.maxFid {
			offset = int64(vlog.woffset())
		}
		if terr := lf.Close(offset); terr != nil && err == nil {
			err = terr
		}
	}
	if vlog.discardStats != nil {
		vlog.db.captureDiscardStats()
		if terr := vlog.discardStats.Close(-1); terr != nil && err == nil {
			err = terr
		}
	}
	return err
}
  579. // sortedFids returns the file id's not pending deletion, sorted. Assumes we have shared access to
  580. // filesMap.
  581. func (vlog *valueLog) sortedFids() []uint32 {
  582. toBeDeleted := make(map[uint32]struct{})
  583. for _, fid := range vlog.filesToBeDeleted {
  584. toBeDeleted[fid] = struct{}{}
  585. }
  586. ret := make([]uint32, 0, len(vlog.filesMap))
  587. for fid := range vlog.filesMap {
  588. if _, ok := toBeDeleted[fid]; !ok {
  589. ret = append(ret, fid)
  590. }
  591. }
  592. sort.Slice(ret, func(i, j int) bool {
  593. return ret[i] < ret[j]
  594. })
  595. return ret
  596. }
// request is one batch of entries queued for writing to the value log. It is
// pooled (see requestPool in DecrRef) and refcounted so it can be safely
// recycled once both the writer and any waiters are done with it.
type request struct {
	// Input values
	Entries []*Entry
	// Output values and wait group stuff below
	Ptrs []valuePointer // one pointer per entry; zero-valued if the entry stayed in the LSM tree
	Wg   sync.WaitGroup
	Err  error
	ref  atomic.Int32
}
// reset clears the request for reuse from the pool, keeping the allocated
// capacity of the Entries and Ptrs slices.
func (req *request) reset() {
	req.Entries = req.Entries[:0]
	req.Ptrs = req.Ptrs[:0]
	req.Wg = sync.WaitGroup{}
	req.Err = nil
	req.ref.Store(0)
}
// IncrRef adds one reference to the request.
func (req *request) IncrRef() {
	req.ref.Add(1)
}
// DecrRef drops one reference. The final reference releases the Entries slice
// and returns the request to the pool.
func (req *request) DecrRef() {
	nRef := req.ref.Add(-1)
	if nRef > 0 {
		return
	}
	req.Entries = nil
	requestPool.Put(req)
}
// Wait blocks until the write handling this request finishes, then returns
// the request's error. The waiter's reference is released before returning,
// so req must not be used afterwards.
func (req *request) Wait() error {
	req.Wg.Wait()
	err := req.Err
	req.DecrRef() // DecrRef after writing to DB.
	return err
}
// requests is a batch of write requests whose refcounts are managed together.
type requests []*request

// DecrRef drops one reference from every request in the batch.
func (reqs requests) DecrRef() {
	for _, req := range reqs {
		req.DecrRef()
	}
}

// IncrRef adds one reference to every request in the batch.
func (reqs requests) IncrRef() {
	for _, req := range reqs {
		req.IncrRef()
	}
}
// sync function syncs content of latest value log file to disk. Syncing of value log directory is
// not required here as it happens every time a value log file rotation happens(check createVlogFile
// function). During rotation, previous value log file also gets synced to disk. It only syncs file
// if fid >= vlog.maxFid. In some cases such as replay(while opening db), it might be called with
// fid < vlog.maxFid. To sync irrespective of file id just call it with math.MaxUint32.
func (vlog *valueLog) sync() error {
	// With SyncWrites, every write syncs as it happens (see the defer in
	// write); in-memory mode has no files at all. Nothing to do either way.
	if vlog.opt.SyncWrites || vlog.opt.InMemory {
		return nil
	}

	vlog.filesLock.RLock()
	maxFid := vlog.maxFid
	curlf := vlog.filesMap[maxFid]
	// Sometimes it is possible that vlog.maxFid has been increased but file creation
	// with same id is still in progress and this function is called. In those cases
	// entry for the file might not be present in vlog.filesMap.
	if curlf == nil {
		vlog.filesLock.RUnlock()
		return nil
	}
	// Acquire the file's own lock before releasing filesLock, so the file
	// cannot be closed or deleted while we sync it.
	curlf.lock.RLock()
	vlog.filesLock.RUnlock()

	err := curlf.Sync()
	curlf.lock.RUnlock()
	return err
}
// woffset returns the current writable offset within the active vlog file.
func (vlog *valueLog) woffset() uint32 {
	return vlog.writableLogOffset.Load()
}
  669. // validateWrites will check whether the given requests can fit into 4GB vlog file.
  670. // NOTE: 4GB is the maximum size we can create for vlog because value pointer offset is of type
  671. // uint32. If we create more than 4GB, it will overflow uint32. So, limiting the size to 4GB.
  672. func (vlog *valueLog) validateWrites(reqs []*request) error {
  673. vlogOffset := uint64(vlog.woffset())
  674. for _, req := range reqs {
  675. // calculate size of the request.
  676. size := estimateRequestSize(req)
  677. estimatedVlogOffset := vlogOffset + size
  678. if estimatedVlogOffset > uint64(maxVlogFileSize) {
  679. return fmt.Errorf("Request size offset %d is bigger than maximum offset %d",
  680. estimatedVlogOffset, maxVlogFileSize)
  681. }
  682. if estimatedVlogOffset >= uint64(vlog.opt.ValueLogFileSize) {
  683. // We'll create a new vlog file if the estimated offset is greater or equal to
  684. // max vlog size. So, resetting the vlogOffset.
  685. vlogOffset = 0
  686. continue
  687. }
  688. // Estimated vlog offset will become current vlog offset if the vlog is not rotated.
  689. vlogOffset = estimatedVlogOffset
  690. }
  691. return nil
  692. }
  693. // estimateRequestSize returns the size that needed to be written for the given request.
  694. func estimateRequestSize(req *request) uint64 {
  695. size := uint64(0)
  696. for _, e := range req.Entries {
  697. size += uint64(maxHeaderSize + len(e.Key) + len(e.Value) + crc32.Size)
  698. }
  699. return size
  700. }
// write is thread-unsafe by design and should not be called concurrently.
//
// write appends every entry of every request to the active vlog file through
// its mmap'ed buffer, filling req.Ptrs with one valuePointer per entry (the
// zero pointer for values small enough to stay in the LSM tree). The log is
// rotated between requests when size/entry limits are crossed, so all entries
// of one request land in the same file.
func (vlog *valueLog) write(reqs []*request) error {
	if vlog.db.opt.InMemory {
		return nil
	}
	// Validate writes before writing to vlog. Because, we don't want to partially write and return
	// an error.
	if err := vlog.validateWrites(reqs); err != nil {
		return y.Wrapf(err, "while validating writes")
	}

	vlog.filesLock.RLock()
	maxFid := vlog.maxFid
	curlf := vlog.filesMap[maxFid]
	vlog.filesLock.RUnlock()

	defer func() {
		if vlog.opt.SyncWrites {
			if err := curlf.Sync(); err != nil {
				vlog.opt.Errorf("Error while curlf sync: %v\n", err)
			}
		}
	}()

	// write copies the encoded bytes in buf into the mmap'ed file, growing
	// the file if needed, and advances writableLogOffset atomically.
	write := func(buf *bytes.Buffer) error {
		if buf.Len() == 0 {
			return nil
		}

		n := uint32(buf.Len())
		endOffset := vlog.writableLogOffset.Add(n)
		// Increase the file size if we cannot accommodate this entry.
		// [Aman] Should this be >= or just >? Doesn't make sense to extend the file if it big enough already.
		if int(endOffset) >= len(curlf.Data) {
			if err := curlf.Truncate(int64(endOffset)); err != nil {
				return err
			}
		}

		start := int(endOffset - n)
		y.AssertTrue(copy(curlf.Data[start:], buf.Bytes()) == int(n))

		curlf.size.Store(endOffset)
		return nil
	}

	// toDisk rotates to a fresh vlog file once the current one exceeds the
	// configured size or entry-count limits.
	toDisk := func() error {
		if vlog.woffset() > uint32(vlog.opt.ValueLogFileSize) ||
			vlog.numEntriesWritten > vlog.opt.ValueLogMaxEntries {
			if err := curlf.doneWriting(vlog.woffset()); err != nil {
				return err
			}

			newlf, err := vlog.createVlogFile()
			if err != nil {
				return err
			}
			curlf = newlf
		}
		return nil
	}

	buf := new(bytes.Buffer)
	for i := range reqs {
		b := reqs[i]
		b.Ptrs = b.Ptrs[:0]
		var written, bytesWritten int
		valueSizes := make([]int64, 0, len(b.Entries))
		for j := range b.Entries {
			buf.Reset()

			e := b.Entries[j]
			valueSizes = append(valueSizes, int64(len(e.Value)))
			if e.skipVlogAndSetThreshold(vlog.db.valueThreshold()) {
				// Small value: it stays in the LSM tree; record a zero pointer.
				b.Ptrs = append(b.Ptrs, valuePointer{})
				continue
			}

			var p valuePointer
			p.Fid = curlf.fid
			p.Offset = vlog.woffset()

			// We should not store transaction marks in the vlog file because it will never have all
			// the entries in a transaction. If we store entries with transaction marks then value
			// GC will not be able to iterate on the entire vlog file.
			// But, we still want the entry to stay intact for the memTable WAL. So, store the meta
			// in a temporary variable and reassign it after writing to the value log.
			tmpMeta := e.meta
			e.meta = e.meta &^ (bitTxn | bitFinTxn)

			plen, err := curlf.encodeEntry(buf, e, p.Offset) // Now encode the entry into buffer.
			if err != nil {
				return err
			}

			// Restore the meta.
			e.meta = tmpMeta

			p.Len = uint32(plen)
			b.Ptrs = append(b.Ptrs, p)
			if err := write(buf); err != nil {
				return err
			}
			written++
			bytesWritten += buf.Len()
			// No need to flush anything, we write to file directly via mmap.
		}
		y.NumWritesVlogAdd(vlog.opt.MetricsEnabled, int64(written))
		y.NumBytesWrittenVlogAdd(vlog.opt.MetricsEnabled, int64(bytesWritten))

		vlog.numEntriesWritten += uint32(written)
		vlog.db.threshold.update(valueSizes)
		// We write to disk here so that all entries that are part of the same transaction are
		// written to the same vlog file.
		if err := toDisk(); err != nil {
			return err
		}
	}

	return toDisk()
}
  805. // Gets the logFile and acquires and RLock() for the mmap. You must call RUnlock on the file
  806. // (if non-nil)
  807. func (vlog *valueLog) getFileRLocked(vp valuePointer) (*logFile, error) {
  808. vlog.filesLock.RLock()
  809. defer vlog.filesLock.RUnlock()
  810. ret, ok := vlog.filesMap[vp.Fid]
  811. if !ok {
  812. // log file has gone away, we can't do anything. Return.
  813. return nil, fmt.Errorf("file with ID: %d not found", vp.Fid)
  814. }
  815. // Check for valid offset if we are reading from writable log.
  816. maxFid := vlog.maxFid
  817. // In read-only mode we don't need to check for writable offset as we are not writing anything.
  818. // Moreover, this offset is not set in readonly mode.
  819. if !vlog.opt.ReadOnly && vp.Fid == maxFid {
  820. currentOffset := vlog.woffset()
  821. if vp.Offset >= currentOffset {
  822. return nil, fmt.Errorf(
  823. "Invalid value pointer offset: %d greater than current offset: %d",
  824. vp.Offset, currentOffset)
  825. }
  826. }
  827. ret.lock.RLock()
  828. return ret, nil
  829. }
  830. // Read reads the value log at a given location.
  831. // TODO: Make this read private.
  832. func (vlog *valueLog) Read(vp valuePointer, _ *y.Slice) ([]byte, func(), error) {
  833. buf, lf, err := vlog.readValueBytes(vp)
  834. // log file is locked so, decide whether to lock immediately or let the caller to
  835. // unlock it, after caller uses it.
  836. cb := vlog.getUnlockCallback(lf)
  837. if err != nil {
  838. return nil, cb, err
  839. }
  840. if vlog.opt.VerifyValueChecksum {
  841. hash := crc32.New(y.CastagnoliCrcTable)
  842. if _, err := hash.Write(buf[:len(buf)-crc32.Size]); err != nil {
  843. runCallback(cb)
  844. return nil, nil, y.Wrapf(err, "failed to write hash for vp %+v", vp)
  845. }
  846. // Fetch checksum from the end of the buffer.
  847. checksum := buf[len(buf)-crc32.Size:]
  848. if hash.Sum32() != y.BytesToU32(checksum) {
  849. runCallback(cb)
  850. return nil, nil, y.Wrapf(y.ErrChecksumMismatch, "value corrupted for vp: %+v", vp)
  851. }
  852. }
  853. var h header
  854. headerLen := h.Decode(buf)
  855. kv := buf[headerLen:]
  856. if lf.encryptionEnabled() {
  857. kv, err = lf.decryptKV(kv, vp.Offset)
  858. if err != nil {
  859. return nil, cb, err
  860. }
  861. }
  862. if uint32(len(kv)) < h.klen+h.vlen {
  863. vlog.db.opt.Errorf("Invalid read: vp: %+v", vp)
  864. return nil, nil, fmt.Errorf("Invalid read: Len: %d read at:[%d:%d]",
  865. len(kv), h.klen, h.klen+h.vlen)
  866. }
  867. return kv[h.klen : h.klen+h.vlen], cb, nil
  868. }
  869. // getUnlockCallback will returns a function which unlock the logfile if the logfile is mmaped.
  870. // otherwise, it unlock the logfile and return nil.
  871. func (vlog *valueLog) getUnlockCallback(lf *logFile) func() {
  872. if lf == nil {
  873. return nil
  874. }
  875. return lf.lock.RUnlock
  876. }
  877. // readValueBytes return vlog entry slice and read locked log file. Caller should take care of
  878. // logFile unlocking.
  879. func (vlog *valueLog) readValueBytes(vp valuePointer) ([]byte, *logFile, error) {
  880. lf, err := vlog.getFileRLocked(vp)
  881. if err != nil {
  882. return nil, nil, err
  883. }
  884. buf, err := lf.read(vp)
  885. y.NumReadsVlogAdd(vlog.db.opt.MetricsEnabled, 1)
  886. y.NumBytesReadsVlogAdd(vlog.db.opt.MetricsEnabled, int64(len(buf)))
  887. return buf, lf, err
  888. }
  889. func (vlog *valueLog) pickLog(discardRatio float64) *logFile {
  890. vlog.filesLock.RLock()
  891. defer vlog.filesLock.RUnlock()
  892. LOOP:
  893. // Pick a candidate that contains the largest amount of discardable data
  894. fid, discard := vlog.discardStats.MaxDiscard()
  895. // MaxDiscard will return fid=0 if it doesn't have any discard data. The
  896. // vlog files start from 1.
  897. if fid == 0 {
  898. vlog.opt.Debugf("No file with discard stats")
  899. return nil
  900. }
  901. lf, ok := vlog.filesMap[fid]
  902. // This file was deleted but it's discard stats increased because of compactions. The file
  903. // doesn't exist so we don't need to do anything. Skip it and retry.
  904. if !ok {
  905. vlog.discardStats.Update(fid, -1)
  906. goto LOOP
  907. }
  908. // We have a valid file.
  909. fi, err := lf.Fd.Stat()
  910. if err != nil {
  911. vlog.opt.Errorf("Unable to get stats for value log fid: %d err: %+v", fi, err)
  912. return nil
  913. }
  914. if thr := discardRatio * float64(fi.Size()); float64(discard) < thr {
  915. vlog.opt.Debugf("Discard: %d less than threshold: %.0f for file: %s",
  916. discard, thr, fi.Name())
  917. return nil
  918. }
  919. if fid < vlog.maxFid {
  920. vlog.opt.Infof("Found value log max discard fid: %d discard: %d\n", fid, discard)
  921. lf, ok := vlog.filesMap[fid]
  922. y.AssertTrue(ok)
  923. return lf
  924. }
  925. // Don't randomly pick any value log file.
  926. return nil
  927. }
  928. func discardEntry(e Entry, vs y.ValueStruct, db *DB) bool {
  929. if vs.Version != y.ParseTs(e.Key) {
  930. // Version not found. Discard.
  931. return true
  932. }
  933. if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
  934. return true
  935. }
  936. if (vs.Meta & bitValuePointer) == 0 {
  937. // Key also stores the value in LSM. Discard.
  938. return true
  939. }
  940. if (vs.Meta & bitFinTxn) > 0 {
  941. // Just a txn finish entry. Discard.
  942. return true
  943. }
  944. return false
  945. }
  946. func (vlog *valueLog) doRunGC(lf *logFile) error {
  947. _, span := otel.Tracer("").Start(context.TODO(), "Badger.GC")
  948. span.SetAttributes(attribute.String("GC rewrite for", lf.path))
  949. defer span.End()
  950. if err := vlog.rewrite(lf); err != nil {
  951. return err
  952. }
  953. // Remove the file from discardStats.
  954. vlog.discardStats.Update(lf.fid, -1)
  955. return nil
  956. }
  957. func (vlog *valueLog) waitOnGC(lc *z.Closer) {
  958. defer lc.Done()
  959. <-lc.HasBeenClosed() // Wait for lc to be closed.
  960. // Block any GC in progress to finish, and don't allow any more writes to runGC by filling up
  961. // the channel of size 1.
  962. vlog.garbageCh <- struct{}{}
  963. }
  964. func (vlog *valueLog) runGC(discardRatio float64) error {
  965. select {
  966. case vlog.garbageCh <- struct{}{}:
  967. // Pick a log file for GC.
  968. defer func() {
  969. <-vlog.garbageCh
  970. }()
  971. lf := vlog.pickLog(discardRatio)
  972. if lf == nil {
  973. return ErrNoRewrite
  974. }
  975. return vlog.doRunGC(lf)
  976. default:
  977. return ErrRejected
  978. }
  979. }
  980. func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) {
  981. if vlog.opt.InMemory {
  982. return
  983. }
  984. for fid, discard := range stats {
  985. vlog.discardStats.Update(fid, discard)
  986. }
  987. // The following is to coordinate with some test cases where we want to
  988. // verify that at least one iteration of updateDiscardStats has been completed.
  989. vlog.db.logToSyncChan(updateDiscardStatsMsg)
  990. }
// vlogThreshold dynamically tunes the size threshold that decides whether a
// value is stored inline in the LSM tree or in the value log.
type vlogThreshold struct {
	logger Logger
	// percentile is the target percentile of observed value sizes
	// (Options.VLogPercentile) used when recomputing the threshold.
	percentile float64
	// valueThreshold holds the current dynamic threshold.
	valueThreshold atomic.Int64
	// valueCh receives batches of value sizes observed during writes; consumed
	// by listenForValueThresholdUpdate.
	valueCh chan []int64
	// clearCh signals the listener goroutine to reset the size histogram.
	clearCh chan bool
	// closer coordinates shutdown of the listener goroutine.
	closer *z.Closer
	// Metrics contains a running log of statistics like amount of data stored etc.
	vlMetrics *z.HistogramData
}
  1001. func initVlogThreshold(opt *Options) *vlogThreshold {
  1002. getBounds := func() []float64 {
  1003. mxbd := opt.maxValueThreshold
  1004. mnbd := float64(opt.ValueThreshold)
  1005. y.AssertTruef(mxbd >= mnbd, "maximum threshold bound is less than the min threshold")
  1006. size := math.Min(mxbd-mnbd+1, 1024.0)
  1007. bdstp := (mxbd - mnbd) / size
  1008. bounds := make([]float64, int64(size))
  1009. for i := range bounds {
  1010. if i == 0 {
  1011. bounds[0] = mnbd
  1012. continue
  1013. }
  1014. if i == int(size-1) {
  1015. bounds[i] = mxbd
  1016. continue
  1017. }
  1018. bounds[i] = bounds[i-1] + bdstp
  1019. }
  1020. return bounds
  1021. }
  1022. lt := &vlogThreshold{
  1023. logger: opt.Logger,
  1024. percentile: opt.VLogPercentile,
  1025. valueCh: make(chan []int64, 1000),
  1026. clearCh: make(chan bool, 1),
  1027. closer: z.NewCloser(1),
  1028. vlMetrics: z.NewHistogramData(getBounds()),
  1029. }
  1030. lt.valueThreshold.Store(opt.ValueThreshold)
  1031. return lt
  1032. }
// Clear resets the threshold back to the configured Options.ValueThreshold and
// signals the listener goroutine to wipe the size histogram.
func (v *vlogThreshold) Clear(opt Options) {
	v.valueThreshold.Store(opt.ValueThreshold)
	v.clearCh <- true
}
// update feeds a batch of observed value sizes to the listener goroutine,
// which folds them into the histogram. Blocks if valueCh is full.
func (v *vlogThreshold) update(sizes []int64) {
	v.valueCh <- sizes
}
// close signals the listener goroutine to stop and waits until it has exited.
func (v *vlogThreshold) close() {
	v.closer.SignalAndWait()
}
  1043. func (v *vlogThreshold) listenForValueThresholdUpdate() {
  1044. defer v.closer.Done()
  1045. for {
  1046. select {
  1047. case <-v.closer.HasBeenClosed():
  1048. return
  1049. case val := <-v.valueCh:
  1050. for _, e := range val {
  1051. v.vlMetrics.Update(e)
  1052. }
  1053. // we are making it to get Options.VlogPercentile so that values with sizes
  1054. // in range of Options.VlogPercentile will make it to the LSM tree and rest to the
  1055. // value log file.
  1056. p := int64(v.vlMetrics.Percentile(v.percentile))
  1057. if v.valueThreshold.Load() != p {
  1058. if v.logger != nil {
  1059. v.logger.Infof("updating value of threshold to: %d", p)
  1060. }
  1061. v.valueThreshold.Store(p)
  1062. }
  1063. case <-v.clearCh:
  1064. v.vlMetrics.Clear()
  1065. }
  1066. }
  1067. }