value.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198
  1. /*
  2. * Copyright 2017 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package badger
  17. import (
  18. "bytes"
  19. "context"
  20. stderrors "errors"
  21. "fmt"
  22. "hash"
  23. "hash/crc32"
  24. "io"
  25. "math"
  26. "os"
  27. "sort"
  28. "strconv"
  29. "strings"
  30. "sync"
  31. "sync/atomic"
  32. "github.com/pkg/errors"
  33. otrace "go.opencensus.io/trace"
  34. "github.com/dgraph-io/badger/v4/y"
  35. "github.com/dgraph-io/ristretto/v2/z"
  36. )
  // maxVlogFileSize caps the size of a single vlog file. Offsets into a vlog file are stored as
  // uint32, so a file can never be allowed to grow past math.MaxUint32 bytes.
  var maxVlogFileSize = uint32(math.MaxUint32)
  // Bit flags that are combined into an entry's meta byte. bitDelete is what lets us distinguish
  // between a key that has never been seen and a key that has been explicitly deleted.
  const (
  	bitDelete byte = 1 << 0 // Set if the key has been deleted.
  	bitValuePointer byte = 1 << 1 // Set if the value is NOT stored directly next to key.
  	bitDiscardEarlierVersions byte = 1 << 2 // Set if earlier versions can be discarded.
  	// Set if item shouldn't be discarded via compactions (used by merge operator)
  	bitMergeEntry byte = 1 << 3
  	// The MSB 2 bits are for transactions. Both are cleared before an entry is encoded into
  	// the value log (see valueLog.write) and when an entry is re-inserted by valueLog.rewrite.
  	bitTxn byte = 1 << 6 // Set if the entry is part of a txn.
  	bitFinTxn byte = 1 << 7 // Set if the entry is to indicate end of txn in value log.

  	// mi is one mebibyte (1 << 20). It is unused in this part of the file, hence the nolint.
  	mi int64 = 1 << 20 //nolint:unused

  	// Size of the vlog header in bytes: 8 bytes of keyID followed by 12 bytes of baseIV.
  	// +----------------+------------------+
  	// | keyID(8 bytes) | baseIV(12 bytes)|
  	// +----------------+------------------+
  	vlogHeaderSize = 20
  )
  // Sentinel errors used while reading value log files.
  var (
  	// errStop is a sentinel for ending an iteration early. Its users are not in this part of
  	// the file.
  	errStop = stderrors.New("Stop iteration")

  	// errTruncate is returned by safeRead.Entry when a record has an oversized key, is cut
  	// short by EOF, or fails its CRC check.
  	errTruncate = stderrors.New("Do truncate")
  )
  // logEntry is a callback that receives one decoded Entry together with a valuePointer and
  // returns an error. Callbacks of this shape are handed to logFile.iterate by rewrite and open.
  type logEntry func(e Entry, vp valuePointer) error
  // safeRead carries the state used by Entry to decode records from a log file.
  type safeRead struct {
  	k []byte // Scratch key buffer; Entry only grows it when the key does not fit.
  	v []byte // Scratch value buffer; Entry only grows it when the value does not fit.
  	// recordOffset is copied into the decoded Entry's offset and passed to decryptKV.
  	recordOffset uint32
  	lf *logFile // Log file being read; consulted for encryption state and decryption.
  }
  // hashReader implements the io.Reader and io.ByteReader interfaces. It also keeps track of the
  // number of bytes read. The hashReader writes to h (hash) what it reads from r.
  type hashReader struct {
  	r io.Reader // Underlying source of bytes.
  	h hash.Hash32 // Running checksum over everything read through this reader.
  	bytesRead int // Number of bytes read.
  }
  74. func newHashReader(r io.Reader) *hashReader {
  75. hash := crc32.New(y.CastagnoliCrcTable)
  76. return &hashReader{
  77. r: r,
  78. h: hash,
  79. }
  80. }
  81. // Read reads len(p) bytes from the reader. Returns the number of bytes read, error on failure.
  82. func (t *hashReader) Read(p []byte) (int, error) {
  83. n, err := t.r.Read(p)
  84. if err != nil {
  85. return n, err
  86. }
  87. t.bytesRead += n
  88. return t.h.Write(p[:n])
  89. }
  90. // ReadByte reads exactly one byte from the reader. Returns error on failure.
  91. func (t *hashReader) ReadByte() (byte, error) {
  92. b := make([]byte, 1)
  93. _, err := t.Read(b)
  94. return b[0], err
  95. }
  96. // Sum32 returns the sum32 of the underlying hash.
  97. func (t *hashReader) Sum32() uint32 {
  98. return t.h.Sum32()
  99. }
  100. // Entry reads an entry from the provided reader. It also validates the checksum for every entry
  101. // read. Returns error on failure.
  102. func (r *safeRead) Entry(reader io.Reader) (*Entry, error) {
  103. tee := newHashReader(reader)
  104. var h header
  105. hlen, err := h.DecodeFrom(tee)
  106. if err != nil {
  107. return nil, err
  108. }
  109. if h.klen > uint32(1<<16) { // Key length must be below uint16.
  110. return nil, errTruncate
  111. }
  112. kl := int(h.klen)
  113. if cap(r.k) < kl {
  114. r.k = make([]byte, 2*kl)
  115. }
  116. vl := int(h.vlen)
  117. if cap(r.v) < vl {
  118. r.v = make([]byte, 2*vl)
  119. }
  120. e := &Entry{}
  121. e.offset = r.recordOffset
  122. e.hlen = hlen
  123. buf := make([]byte, h.klen+h.vlen)
  124. if _, err := io.ReadFull(tee, buf[:]); err != nil {
  125. if err == io.EOF {
  126. err = errTruncate
  127. }
  128. return nil, err
  129. }
  130. if r.lf.encryptionEnabled() {
  131. if buf, err = r.lf.decryptKV(buf[:], r.recordOffset); err != nil {
  132. return nil, err
  133. }
  134. }
  135. e.Key = buf[:h.klen]
  136. e.Value = buf[h.klen:]
  137. var crcBuf [crc32.Size]byte
  138. if _, err := io.ReadFull(reader, crcBuf[:]); err != nil {
  139. if err == io.EOF {
  140. err = errTruncate
  141. }
  142. return nil, err
  143. }
  144. crc := y.BytesToU32(crcBuf[:])
  145. if crc != tee.Sum32() {
  146. return nil, errTruncate
  147. }
  148. e.meta = h.meta
  149. e.UserMeta = h.userMeta
  150. e.ExpiresAt = h.expiresAt
  151. return e, nil
  152. }
  153. func (vlog *valueLog) rewrite(f *logFile) error {
  154. vlog.filesLock.RLock()
  155. for _, fid := range vlog.filesToBeDeleted {
  156. if fid == f.fid {
  157. vlog.filesLock.RUnlock()
  158. return errors.Errorf("value log file already marked for deletion fid: %d", fid)
  159. }
  160. }
  161. maxFid := vlog.maxFid
  162. y.AssertTruef(f.fid < maxFid, "fid to move: %d. Current max fid: %d", f.fid, maxFid)
  163. vlog.filesLock.RUnlock()
  164. vlog.opt.Infof("Rewriting fid: %d", f.fid)
  165. wb := make([]*Entry, 0, 1000)
  166. var size int64
  167. y.AssertTrue(vlog.db != nil)
  168. var count, moved int
  169. fe := func(e Entry) error {
  170. count++
  171. if count%100000 == 0 {
  172. vlog.opt.Debugf("Processing entry %d", count)
  173. }
  174. vs, err := vlog.db.get(e.Key)
  175. if err != nil {
  176. return err
  177. }
  178. if discardEntry(e, vs, vlog.db) {
  179. return nil
  180. }
  181. // Value is still present in value log.
  182. if len(vs.Value) == 0 {
  183. return errors.Errorf("Empty value: %+v", vs)
  184. }
  185. var vp valuePointer
  186. vp.Decode(vs.Value)
  187. // If the entry found from the LSM Tree points to a newer vlog file, don't do anything.
  188. if vp.Fid > f.fid {
  189. return nil
  190. }
  191. // If the entry found from the LSM Tree points to an offset greater than the one
  192. // read from vlog, don't do anything.
  193. if vp.Offset > e.offset {
  194. return nil
  195. }
  196. // If the entry read from LSM Tree and vlog file point to the same vlog file and offset,
  197. // insert them back into the DB.
  198. // NOTE: It might be possible that the entry read from the LSM Tree points to
  199. // an older vlog file. See the comments in the else part.
  200. if vp.Fid == f.fid && vp.Offset == e.offset {
  201. moved++
  202. // This new entry only contains the key, and a pointer to the value.
  203. ne := new(Entry)
  204. // Remove only the bitValuePointer and transaction markers. We
  205. // should keep the other bits.
  206. ne.meta = e.meta &^ (bitValuePointer | bitTxn | bitFinTxn)
  207. ne.UserMeta = e.UserMeta
  208. ne.ExpiresAt = e.ExpiresAt
  209. ne.Key = append([]byte{}, e.Key...)
  210. ne.Value = append([]byte{}, e.Value...)
  211. es := ne.estimateSizeAndSetThreshold(vlog.db.valueThreshold())
  212. // Consider size of value as well while considering the total size
  213. // of the batch. There have been reports of high memory usage in
  214. // rewrite because we don't consider the value size. See #1292.
  215. es += int64(len(e.Value))
  216. // Ensure length and size of wb is within transaction limits.
  217. if int64(len(wb)+1) >= vlog.opt.maxBatchCount ||
  218. size+es >= vlog.opt.maxBatchSize {
  219. if err := vlog.db.batchSet(wb); err != nil {
  220. return err
  221. }
  222. size = 0
  223. wb = wb[:0]
  224. }
  225. wb = append(wb, ne)
  226. size += es
  227. } else { //nolint:staticcheck
  228. // It might be possible that the entry read from LSM Tree points to
  229. // an older vlog file. This can happen in the following situation.
  230. // Assume DB is opened with
  231. // numberOfVersionsToKeep=1
  232. //
  233. // Now, if we have ONLY one key in the system "FOO" which has been
  234. // updated 3 times and the same key has been garbage collected 3
  235. // times, we'll have 3 versions of the movekey
  236. // for the same key "FOO".
  237. //
  238. // NOTE: moveKeyi is the gc'ed version of the original key with version i
  239. // We're calling the gc'ed keys as moveKey to simplify the
  240. // explanantion. We used to add move keys but we no longer do that.
  241. //
  242. // Assume we have 3 move keys in L0.
  243. // - moveKey1 (points to vlog file 10),
  244. // - moveKey2 (points to vlog file 14) and
  245. // - moveKey3 (points to vlog file 15).
  246. //
  247. // Also, assume there is another move key "moveKey1" (points to
  248. // vlog file 6) (this is also a move Key for key "FOO" ) on upper
  249. // levels (let's say 3). The move key "moveKey1" on level 0 was
  250. // inserted because vlog file 6 was GCed.
  251. //
  252. // Here's what the arrangement looks like
  253. // L0 => (moveKey1 => vlog10), (moveKey2 => vlog14), (moveKey3 => vlog15)
  254. // L1 => ....
  255. // L2 => ....
  256. // L3 => (moveKey1 => vlog6)
  257. //
  258. // When L0 compaction runs, it keeps only moveKey3 because the number of versions
  259. // to keep is set to 1. (we've dropped moveKey1's latest version)
  260. //
  261. // The new arrangement of keys is
  262. // L0 => ....
  263. // L1 => (moveKey3 => vlog15)
  264. // L2 => ....
  265. // L3 => (moveKey1 => vlog6)
  266. //
  267. // Now if we try to GC vlog file 10, the entry read from vlog file
  268. // will point to vlog10 but the entry read from LSM Tree will point
  269. // to vlog6. The move key read from LSM tree will point to vlog6
  270. // because we've asked for version 1 of the move key.
  271. //
  272. // This might seem like an issue but it's not really an issue
  273. // because the user has set the number of versions to keep to 1 and
  274. // the latest version of moveKey points to the correct vlog file
  275. // and offset. The stale move key on L3 will be eventually dropped
  276. // by compaction because there is a newer versions in the upper
  277. // levels.
  278. }
  279. return nil
  280. }
  281. _, err := f.iterate(vlog.opt.ReadOnly, 0, func(e Entry, vp valuePointer) error {
  282. return fe(e)
  283. })
  284. if err != nil {
  285. return err
  286. }
  287. batchSize := 1024
  288. var loops int
  289. for i := 0; i < len(wb); {
  290. loops++
  291. if batchSize == 0 {
  292. vlog.db.opt.Warningf("We shouldn't reach batch size of zero.")
  293. return ErrNoRewrite
  294. }
  295. end := i + batchSize
  296. if end > len(wb) {
  297. end = len(wb)
  298. }
  299. if err := vlog.db.batchSet(wb[i:end]); err != nil {
  300. if err == ErrTxnTooBig {
  301. // Decrease the batch size to half.
  302. batchSize = batchSize / 2
  303. continue
  304. }
  305. return err
  306. }
  307. i += batchSize
  308. }
  309. vlog.opt.Infof("Processed %d entries in %d loops", len(wb), loops)
  310. vlog.opt.Infof("Total entries: %d. Moved: %d", count, moved)
  311. vlog.opt.Infof("Removing fid: %d", f.fid)
  312. var deleteFileNow bool
  313. // Entries written to LSM. Remove the older file now.
  314. {
  315. vlog.filesLock.Lock()
  316. // Just a sanity-check.
  317. if _, ok := vlog.filesMap[f.fid]; !ok {
  318. vlog.filesLock.Unlock()
  319. return errors.Errorf("Unable to find fid: %d", f.fid)
  320. }
  321. if vlog.iteratorCount() == 0 {
  322. delete(vlog.filesMap, f.fid)
  323. deleteFileNow = true
  324. } else {
  325. vlog.filesToBeDeleted = append(vlog.filesToBeDeleted, f.fid)
  326. }
  327. vlog.filesLock.Unlock()
  328. }
  329. if deleteFileNow {
  330. if err := vlog.deleteLogFile(f); err != nil {
  331. return err
  332. }
  333. }
  334. return nil
  335. }
  // incrIteratorCount atomically adds one to the number of active iterators.
  func (vlog *valueLog) incrIteratorCount() {
  	vlog.numActiveIterators.Add(1)
  }
  339. func (vlog *valueLog) iteratorCount() int {
  340. return int(vlog.numActiveIterators.Load())
  341. }
  342. func (vlog *valueLog) decrIteratorCount() error {
  343. num := vlog.numActiveIterators.Add(-1)
  344. if num != 0 {
  345. return nil
  346. }
  347. vlog.filesLock.Lock()
  348. lfs := make([]*logFile, 0, len(vlog.filesToBeDeleted))
  349. for _, id := range vlog.filesToBeDeleted {
  350. lfs = append(lfs, vlog.filesMap[id])
  351. delete(vlog.filesMap, id)
  352. }
  353. vlog.filesToBeDeleted = nil
  354. vlog.filesLock.Unlock()
  355. for _, lf := range lfs {
  356. if err := vlog.deleteLogFile(lf); err != nil {
  357. return err
  358. }
  359. }
  360. return nil
  361. }
  362. func (vlog *valueLog) deleteLogFile(lf *logFile) error {
  363. if lf == nil {
  364. return nil
  365. }
  366. lf.lock.Lock()
  367. defer lf.lock.Unlock()
  368. // Delete fid from discard stats as well.
  369. vlog.discardStats.Update(lf.fid, -1)
  370. return lf.Delete()
  371. }
  372. func (vlog *valueLog) dropAll() (int, error) {
  373. // If db is opened in InMemory mode, we don't need to do anything since there are no vlog files.
  374. if vlog.db.opt.InMemory {
  375. return 0, nil
  376. }
  377. // We don't want to block dropAll on any pending transactions. So, don't worry about iterator
  378. // count.
  379. var count int
  380. deleteAll := func() error {
  381. vlog.filesLock.Lock()
  382. defer vlog.filesLock.Unlock()
  383. for _, lf := range vlog.filesMap {
  384. if err := vlog.deleteLogFile(lf); err != nil {
  385. return err
  386. }
  387. count++
  388. }
  389. vlog.filesMap = make(map[uint32]*logFile)
  390. vlog.maxFid = 0
  391. return nil
  392. }
  393. if err := deleteAll(); err != nil {
  394. return count, err
  395. }
  396. vlog.db.opt.Infof("Value logs deleted. Creating value log file: 1")
  397. if _, err := vlog.createVlogFile(); err != nil { // Called while writes are stopped.
  398. return count, err
  399. }
  400. return count, nil
  401. }
  402. func (db *DB) valueThreshold() int64 {
  403. return db.threshold.valueThreshold.Load()
  404. }
  // valueLog tracks the set of value log files of a DB and the write position in the newest one.
  type valueLog struct {
  	dirPath string // Directory holding the .vlog files; set from opt.ValueDir in init.

  	// guards our view of which files exist, which to be deleted, how many active iterators
  	filesLock sync.RWMutex
  	filesMap map[uint32]*logFile // Known log files, keyed by fid.
  	maxFid uint32 // Highest fid in use; the file with this fid is the one written to.
  	filesToBeDeleted []uint32 // Fids rewritten while iterators were active; deleted later.
  	// A refcount of iterators -- when this hits zero, we can delete the filesToBeDeleted.
  	numActiveIterators atomic.Int32

  	db *DB
  	// Offset at which the next record is written in the current file. Reset to
  	// vlogHeaderSize whenever a new file is created.
  	writableLogOffset atomic.Uint32 // read by read, written by write
  	// Entries written to the current file; reset on file creation and compared against
  	// opt.ValueLogMaxEntries to decide on rotation.
  	numEntriesWritten uint32
  	opt Options

  	garbageCh chan struct{} // Created with capacity 1 so that only one GC runs at a time.
  	discardStats *discardStats
  }
  // vlogFilePath builds the path of the value log file with the given fid inside dirPath: the
  // directory, the OS path separator, then the fid zero-padded to at least six digits followed
  // by ".vlog".
  func vlogFilePath(dirPath string, fid uint32) string {
  	name := fmt.Sprintf("%06d.vlog", fid)
  	return dirPath + string(os.PathSeparator) + name
  }
  424. func (vlog *valueLog) fpath(fid uint32) string {
  425. return vlogFilePath(vlog.dirPath, fid)
  426. }
  427. func (vlog *valueLog) populateFilesMap() error {
  428. vlog.filesMap = make(map[uint32]*logFile)
  429. files, err := os.ReadDir(vlog.dirPath)
  430. if err != nil {
  431. return errFile(err, vlog.dirPath, "Unable to open log dir.")
  432. }
  433. found := make(map[uint64]struct{})
  434. for _, file := range files {
  435. if !strings.HasSuffix(file.Name(), ".vlog") {
  436. continue
  437. }
  438. fsz := len(file.Name())
  439. fid, err := strconv.ParseUint(file.Name()[:fsz-5], 10, 32)
  440. if err != nil {
  441. return errFile(err, file.Name(), "Unable to parse log id.")
  442. }
  443. if _, ok := found[fid]; ok {
  444. return errFile(err, file.Name(), "Duplicate file found. Please delete one.")
  445. }
  446. found[fid] = struct{}{}
  447. lf := &logFile{
  448. fid: uint32(fid),
  449. path: vlog.fpath(uint32(fid)),
  450. registry: vlog.db.registry,
  451. }
  452. vlog.filesMap[uint32(fid)] = lf
  453. if vlog.maxFid < uint32(fid) {
  454. vlog.maxFid = uint32(fid)
  455. }
  456. }
  457. return nil
  458. }
  459. func (vlog *valueLog) createVlogFile() (*logFile, error) {
  460. fid := vlog.maxFid + 1
  461. path := vlog.fpath(fid)
  462. lf := &logFile{
  463. fid: fid,
  464. path: path,
  465. registry: vlog.db.registry,
  466. writeAt: vlogHeaderSize,
  467. opt: vlog.opt,
  468. }
  469. err := lf.open(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 2*vlog.opt.ValueLogFileSize)
  470. if err != z.NewFile && err != nil {
  471. return nil, err
  472. }
  473. vlog.filesLock.Lock()
  474. vlog.filesMap[fid] = lf
  475. y.AssertTrue(vlog.maxFid < fid)
  476. vlog.maxFid = fid
  477. // writableLogOffset is only written by write func, by read by Read func.
  478. // To avoid a race condition, all reads and updates to this variable must be
  479. // done via atomics.
  480. vlog.writableLogOffset.Store(vlogHeaderSize)
  481. vlog.numEntriesWritten = 0
  482. vlog.filesLock.Unlock()
  483. return lf, nil
  484. }
  // errFile builds an error of the form "<msg>. Path=<path>. Error=<err>".
  //
  // A non-nil err is wrapped with %w, so callers can still reach the underlying cause through
  // errors.Is / errors.As / errors.Unwrap; the message text is the same as with %v. A nil err is
  // formatted with %v, which prints "<nil>", because %w does not accept a nil operand.
  func errFile(err error, path string, msg string) error {
  	if err == nil {
  		return fmt.Errorf("%s. Path=%s. Error=%v", msg, path, err)
  	}
  	return fmt.Errorf("%s. Path=%s. Error=%w", msg, path, err)
  }
  488. // init initializes the value log struct. This initialization needs to happen
  489. // before compactions start.
  490. func (vlog *valueLog) init(db *DB) {
  491. vlog.opt = db.opt
  492. vlog.db = db
  493. // We don't need to open any vlog files or collect stats for GC if DB is opened
  494. // in InMemory mode. InMemory mode doesn't create any files/directories on disk.
  495. if vlog.opt.InMemory {
  496. return
  497. }
  498. vlog.dirPath = vlog.opt.ValueDir
  499. vlog.garbageCh = make(chan struct{}, 1) // Only allow one GC at a time.
  500. lf, err := InitDiscardStats(vlog.opt)
  501. y.Check(err)
  502. vlog.discardStats = lf
  503. // See TestPersistLFDiscardStats for purpose of statement below.
  504. db.logToSyncChan(endVLogInitMsg)
  505. }
  506. func (vlog *valueLog) open(db *DB) error {
  507. // We don't need to open any vlog files or collect stats for GC if DB is opened
  508. // in InMemory mode. InMemory mode doesn't create any files/directories on disk.
  509. if db.opt.InMemory {
  510. return nil
  511. }
  512. if err := vlog.populateFilesMap(); err != nil {
  513. return err
  514. }
  515. // If no files are found, then create a new file.
  516. if len(vlog.filesMap) == 0 {
  517. if vlog.opt.ReadOnly {
  518. return nil
  519. }
  520. _, err := vlog.createVlogFile()
  521. return y.Wrapf(err, "Error while creating log file in valueLog.open")
  522. }
  523. fids := vlog.sortedFids()
  524. for _, fid := range fids {
  525. lf, ok := vlog.filesMap[fid]
  526. y.AssertTrue(ok)
  527. // Just open in RDWR mode. This should not create a new log file.
  528. lf.opt = vlog.opt
  529. if err := lf.open(vlog.fpath(fid), os.O_RDWR,
  530. 2*vlog.opt.ValueLogFileSize); err != nil {
  531. return y.Wrapf(err, "Open existing file: %q", lf.path)
  532. }
  533. // We shouldn't delete the maxFid file.
  534. if lf.size.Load() == vlogHeaderSize && fid != vlog.maxFid {
  535. vlog.opt.Infof("Deleting empty file: %s", lf.path)
  536. if err := lf.Delete(); err != nil {
  537. return y.Wrapf(err, "while trying to delete empty file: %s", lf.path)
  538. }
  539. delete(vlog.filesMap, fid)
  540. }
  541. }
  542. if vlog.opt.ReadOnly {
  543. return nil
  544. }
  545. // Now we can read the latest value log file, and see if it needs truncation. We could
  546. // technically do this over all the value log files, but that would mean slowing down the value
  547. // log open.
  548. last, ok := vlog.filesMap[vlog.maxFid]
  549. y.AssertTrue(ok)
  550. lastOff, err := last.iterate(vlog.opt.ReadOnly, vlogHeaderSize,
  551. func(_ Entry, vp valuePointer) error {
  552. return nil
  553. })
  554. if err != nil {
  555. return y.Wrapf(err, "while iterating over: %s", last.path)
  556. }
  557. if err := last.Truncate(int64(lastOff)); err != nil {
  558. return y.Wrapf(err, "while truncating last value log file: %s", last.path)
  559. }
  560. // Don't write to the old log file. Always create a new one.
  561. if _, err := vlog.createVlogFile(); err != nil {
  562. return y.Wrapf(err, "Error while creating log file in valueLog.open")
  563. }
  564. return nil
  565. }
  566. func (vlog *valueLog) Close() error {
  567. if vlog == nil || vlog.db == nil || vlog.db.opt.InMemory {
  568. return nil
  569. }
  570. vlog.opt.Debugf("Stopping garbage collection of values.")
  571. var err error
  572. for id, lf := range vlog.filesMap {
  573. lf.lock.Lock() // We won’t release the lock.
  574. offset := int64(-1)
  575. if !vlog.opt.ReadOnly && id == vlog.maxFid {
  576. offset = int64(vlog.woffset())
  577. }
  578. if terr := lf.Close(offset); terr != nil && err == nil {
  579. err = terr
  580. }
  581. }
  582. if vlog.discardStats != nil {
  583. vlog.db.captureDiscardStats()
  584. if terr := vlog.discardStats.Close(-1); terr != nil && err == nil {
  585. err = terr
  586. }
  587. }
  588. return err
  589. }
  590. // sortedFids returns the file id's not pending deletion, sorted. Assumes we have shared access to
  591. // filesMap.
  592. func (vlog *valueLog) sortedFids() []uint32 {
  593. toBeDeleted := make(map[uint32]struct{})
  594. for _, fid := range vlog.filesToBeDeleted {
  595. toBeDeleted[fid] = struct{}{}
  596. }
  597. ret := make([]uint32, 0, len(vlog.filesMap))
  598. for fid := range vlog.filesMap {
  599. if _, ok := toBeDeleted[fid]; !ok {
  600. ret = append(ret, fid)
  601. }
  602. }
  603. sort.Slice(ret, func(i, j int) bool {
  604. return ret[i] < ret[j]
  605. })
  606. return ret
  607. }
  // request bundles a batch of entries to write together with the results of writing them.
  type request struct {
  	// Input values
  	Entries []*Entry
  	// Output values and wait group stuff below
  	Ptrs []valuePointer // Filled by valueLog.write with one pointer per entry, in order.
  	Wg sync.WaitGroup // Wait blocks on this before reading Err.
  	Err error // Result returned by Wait.
  	ref atomic.Int32 // Reference count; DecrRef returns the request to requestPool at zero.
  }
  617. func (req *request) reset() {
  618. req.Entries = req.Entries[:0]
  619. req.Ptrs = req.Ptrs[:0]
  620. req.Wg = sync.WaitGroup{}
  621. req.Err = nil
  622. req.ref.Store(0)
  623. }
  // IncrRef atomically adds one to the request's reference count.
  func (req *request) IncrRef() {
  	req.ref.Add(1)
  }
  627. func (req *request) DecrRef() {
  628. nRef := req.ref.Add(-1)
  629. if nRef > 0 {
  630. return
  631. }
  632. req.Entries = nil
  633. requestPool.Put(req)
  634. }
  635. func (req *request) Wait() error {
  636. req.Wg.Wait()
  637. err := req.Err
  638. req.DecrRef() // DecrRef after writing to DB.
  639. return err
  640. }
  641. type requests []*request
  642. func (reqs requests) DecrRef() {
  643. for _, req := range reqs {
  644. req.DecrRef()
  645. }
  646. }
  647. func (reqs requests) IncrRef() {
  648. for _, req := range reqs {
  649. req.IncrRef()
  650. }
  651. }
  652. // sync function syncs content of latest value log file to disk. Syncing of value log directory is
  653. // not required here as it happens every time a value log file rotation happens(check createVlogFile
  654. // function). During rotation, previous value log file also gets synced to disk. It only syncs file
  655. // if fid >= vlog.maxFid. In some cases such as replay(while opening db), it might be called with
  656. // fid < vlog.maxFid. To sync irrespective of file id just call it with math.MaxUint32.
  657. func (vlog *valueLog) sync() error {
  658. if vlog.opt.SyncWrites || vlog.opt.InMemory {
  659. return nil
  660. }
  661. vlog.filesLock.RLock()
  662. maxFid := vlog.maxFid
  663. curlf := vlog.filesMap[maxFid]
  664. // Sometimes it is possible that vlog.maxFid has been increased but file creation
  665. // with same id is still in progress and this function is called. In those cases
  666. // entry for the file might not be present in vlog.filesMap.
  667. if curlf == nil {
  668. vlog.filesLock.RUnlock()
  669. return nil
  670. }
  671. curlf.lock.RLock()
  672. vlog.filesLock.RUnlock()
  673. err := curlf.Sync()
  674. curlf.lock.RUnlock()
  675. return err
  676. }
  677. func (vlog *valueLog) woffset() uint32 {
  678. return vlog.writableLogOffset.Load()
  679. }
  680. // validateWrites will check whether the given requests can fit into 4GB vlog file.
  681. // NOTE: 4GB is the maximum size we can create for vlog because value pointer offset is of type
  682. // uint32. If we create more than 4GB, it will overflow uint32. So, limiting the size to 4GB.
  683. func (vlog *valueLog) validateWrites(reqs []*request) error {
  684. vlogOffset := uint64(vlog.woffset())
  685. for _, req := range reqs {
  686. // calculate size of the request.
  687. size := estimateRequestSize(req)
  688. estimatedVlogOffset := vlogOffset + size
  689. if estimatedVlogOffset > uint64(maxVlogFileSize) {
  690. return errors.Errorf("Request size offset %d is bigger than maximum offset %d",
  691. estimatedVlogOffset, maxVlogFileSize)
  692. }
  693. if estimatedVlogOffset >= uint64(vlog.opt.ValueLogFileSize) {
  694. // We'll create a new vlog file if the estimated offset is greater or equal to
  695. // max vlog size. So, resetting the vlogOffset.
  696. vlogOffset = 0
  697. continue
  698. }
  699. // Estimated vlog offset will become current vlog offset if the vlog is not rotated.
  700. vlogOffset = estimatedVlogOffset
  701. }
  702. return nil
  703. }
  704. // estimateRequestSize returns the size that needed to be written for the given request.
  705. func estimateRequestSize(req *request) uint64 {
  706. size := uint64(0)
  707. for _, e := range req.Entries {
  708. size += uint64(maxHeaderSize + len(e.Key) + len(e.Value) + crc32.Size)
  709. }
  710. return size
  711. }
  712. // write is thread-unsafe by design and should not be called concurrently.
  713. func (vlog *valueLog) write(reqs []*request) error {
  714. if vlog.db.opt.InMemory {
  715. return nil
  716. }
  717. // Validate writes before writing to vlog. Because, we don't want to partially write and return
  718. // an error.
  719. if err := vlog.validateWrites(reqs); err != nil {
  720. return y.Wrapf(err, "while validating writes")
  721. }
  722. vlog.filesLock.RLock()
  723. maxFid := vlog.maxFid
  724. curlf := vlog.filesMap[maxFid]
  725. vlog.filesLock.RUnlock()
  726. defer func() {
  727. if vlog.opt.SyncWrites {
  728. if err := curlf.Sync(); err != nil {
  729. vlog.opt.Errorf("Error while curlf sync: %v\n", err)
  730. }
  731. }
  732. }()
  733. write := func(buf *bytes.Buffer) error {
  734. if buf.Len() == 0 {
  735. return nil
  736. }
  737. n := uint32(buf.Len())
  738. endOffset := vlog.writableLogOffset.Add(n)
  739. // Increase the file size if we cannot accommodate this entry.
  740. // [Aman] Should this be >= or just >? Doesn't make sense to extend the file if it big enough already.
  741. if int(endOffset) >= len(curlf.Data) {
  742. if err := curlf.Truncate(int64(endOffset)); err != nil {
  743. return err
  744. }
  745. }
  746. start := int(endOffset - n)
  747. y.AssertTrue(copy(curlf.Data[start:], buf.Bytes()) == int(n))
  748. curlf.size.Store(endOffset)
  749. return nil
  750. }
  751. toDisk := func() error {
  752. if vlog.woffset() > uint32(vlog.opt.ValueLogFileSize) ||
  753. vlog.numEntriesWritten > vlog.opt.ValueLogMaxEntries {
  754. if err := curlf.doneWriting(vlog.woffset()); err != nil {
  755. return err
  756. }
  757. newlf, err := vlog.createVlogFile()
  758. if err != nil {
  759. return err
  760. }
  761. curlf = newlf
  762. }
  763. return nil
  764. }
  765. buf := new(bytes.Buffer)
  766. for i := range reqs {
  767. b := reqs[i]
  768. b.Ptrs = b.Ptrs[:0]
  769. var written, bytesWritten int
  770. valueSizes := make([]int64, 0, len(b.Entries))
  771. for j := range b.Entries {
  772. buf.Reset()
  773. e := b.Entries[j]
  774. valueSizes = append(valueSizes, int64(len(e.Value)))
  775. if e.skipVlogAndSetThreshold(vlog.db.valueThreshold()) {
  776. b.Ptrs = append(b.Ptrs, valuePointer{})
  777. continue
  778. }
  779. var p valuePointer
  780. p.Fid = curlf.fid
  781. p.Offset = vlog.woffset()
  782. // We should not store transaction marks in the vlog file because it will never have all
  783. // the entries in a transaction. If we store entries with transaction marks then value
  784. // GC will not be able to iterate on the entire vlog file.
  785. // But, we still want the entry to stay intact for the memTable WAL. So, store the meta
  786. // in a temporary variable and reassign it after writing to the value log.
  787. tmpMeta := e.meta
  788. e.meta = e.meta &^ (bitTxn | bitFinTxn)
  789. plen, err := curlf.encodeEntry(buf, e, p.Offset) // Now encode the entry into buffer.
  790. if err != nil {
  791. return err
  792. }
  793. // Restore the meta.
  794. e.meta = tmpMeta
  795. p.Len = uint32(plen)
  796. b.Ptrs = append(b.Ptrs, p)
  797. if err := write(buf); err != nil {
  798. return err
  799. }
  800. written++
  801. bytesWritten += buf.Len()
  802. // No need to flush anything, we write to file directly via mmap.
  803. }
  804. y.NumWritesVlogAdd(vlog.opt.MetricsEnabled, int64(written))
  805. y.NumBytesWrittenVlogAdd(vlog.opt.MetricsEnabled, int64(bytesWritten))
  806. vlog.numEntriesWritten += uint32(written)
  807. vlog.db.threshold.update(valueSizes)
  808. // We write to disk here so that all entries that are part of the same transaction are
  809. // written to the same vlog file.
  810. if err := toDisk(); err != nil {
  811. return err
  812. }
  813. }
  814. return toDisk()
  815. }
  816. // Gets the logFile and acquires and RLock() for the mmap. You must call RUnlock on the file
  817. // (if non-nil)
  818. func (vlog *valueLog) getFileRLocked(vp valuePointer) (*logFile, error) {
  819. vlog.filesLock.RLock()
  820. defer vlog.filesLock.RUnlock()
  821. ret, ok := vlog.filesMap[vp.Fid]
  822. if !ok {
  823. // log file has gone away, we can't do anything. Return.
  824. return nil, errors.Errorf("file with ID: %d not found", vp.Fid)
  825. }
  826. // Check for valid offset if we are reading from writable log.
  827. maxFid := vlog.maxFid
  828. // In read-only mode we don't need to check for writable offset as we are not writing anything.
  829. // Moreover, this offset is not set in readonly mode.
  830. if !vlog.opt.ReadOnly && vp.Fid == maxFid {
  831. currentOffset := vlog.woffset()
  832. if vp.Offset >= currentOffset {
  833. return nil, errors.Errorf(
  834. "Invalid value pointer offset: %d greater than current offset: %d",
  835. vp.Offset, currentOffset)
  836. }
  837. }
  838. ret.lock.RLock()
  839. return ret, nil
  840. }
  841. // Read reads the value log at a given location.
  842. // TODO: Make this read private.
  843. func (vlog *valueLog) Read(vp valuePointer, _ *y.Slice) ([]byte, func(), error) {
  844. buf, lf, err := vlog.readValueBytes(vp)
  845. // log file is locked so, decide whether to lock immediately or let the caller to
  846. // unlock it, after caller uses it.
  847. cb := vlog.getUnlockCallback(lf)
  848. if err != nil {
  849. return nil, cb, err
  850. }
  851. if vlog.opt.VerifyValueChecksum {
  852. hash := crc32.New(y.CastagnoliCrcTable)
  853. if _, err := hash.Write(buf[:len(buf)-crc32.Size]); err != nil {
  854. runCallback(cb)
  855. return nil, nil, y.Wrapf(err, "failed to write hash for vp %+v", vp)
  856. }
  857. // Fetch checksum from the end of the buffer.
  858. checksum := buf[len(buf)-crc32.Size:]
  859. if hash.Sum32() != y.BytesToU32(checksum) {
  860. runCallback(cb)
  861. return nil, nil, y.Wrapf(y.ErrChecksumMismatch, "value corrupted for vp: %+v", vp)
  862. }
  863. }
  864. var h header
  865. headerLen := h.Decode(buf)
  866. kv := buf[headerLen:]
  867. if lf.encryptionEnabled() {
  868. kv, err = lf.decryptKV(kv, vp.Offset)
  869. if err != nil {
  870. return nil, cb, err
  871. }
  872. }
  873. if uint32(len(kv)) < h.klen+h.vlen {
  874. vlog.db.opt.Errorf("Invalid read: vp: %+v", vp)
  875. return nil, nil, errors.Errorf("Invalid read: Len: %d read at:[%d:%d]",
  876. len(kv), h.klen, h.klen+h.vlen)
  877. }
  878. return kv[h.klen : h.klen+h.vlen], cb, nil
  879. }
  880. // getUnlockCallback will returns a function which unlock the logfile if the logfile is mmaped.
  881. // otherwise, it unlock the logfile and return nil.
  882. func (vlog *valueLog) getUnlockCallback(lf *logFile) func() {
  883. if lf == nil {
  884. return nil
  885. }
  886. return lf.lock.RUnlock
  887. }
  888. // readValueBytes return vlog entry slice and read locked log file. Caller should take care of
  889. // logFile unlocking.
  890. func (vlog *valueLog) readValueBytes(vp valuePointer) ([]byte, *logFile, error) {
  891. lf, err := vlog.getFileRLocked(vp)
  892. if err != nil {
  893. return nil, nil, err
  894. }
  895. buf, err := lf.read(vp)
  896. y.NumReadsVlogAdd(vlog.db.opt.MetricsEnabled, 1)
  897. y.NumBytesReadsVlogAdd(vlog.db.opt.MetricsEnabled, int64(len(buf)))
  898. return buf, lf, err
  899. }
  900. func (vlog *valueLog) pickLog(discardRatio float64) *logFile {
  901. vlog.filesLock.RLock()
  902. defer vlog.filesLock.RUnlock()
  903. LOOP:
  904. // Pick a candidate that contains the largest amount of discardable data
  905. fid, discard := vlog.discardStats.MaxDiscard()
  906. // MaxDiscard will return fid=0 if it doesn't have any discard data. The
  907. // vlog files start from 1.
  908. if fid == 0 {
  909. vlog.opt.Debugf("No file with discard stats")
  910. return nil
  911. }
  912. lf, ok := vlog.filesMap[fid]
  913. // This file was deleted but it's discard stats increased because of compactions. The file
  914. // doesn't exist so we don't need to do anything. Skip it and retry.
  915. if !ok {
  916. vlog.discardStats.Update(fid, -1)
  917. goto LOOP
  918. }
  919. // We have a valid file.
  920. fi, err := lf.Fd.Stat()
  921. if err != nil {
  922. vlog.opt.Errorf("Unable to get stats for value log fid: %d err: %+v", fi, err)
  923. return nil
  924. }
  925. if thr := discardRatio * float64(fi.Size()); float64(discard) < thr {
  926. vlog.opt.Debugf("Discard: %d less than threshold: %.0f for file: %s",
  927. discard, thr, fi.Name())
  928. return nil
  929. }
  930. if fid < vlog.maxFid {
  931. vlog.opt.Infof("Found value log max discard fid: %d discard: %d\n", fid, discard)
  932. lf, ok := vlog.filesMap[fid]
  933. y.AssertTrue(ok)
  934. return lf
  935. }
  936. // Don't randomly pick any value log file.
  937. return nil
  938. }
  939. func discardEntry(e Entry, vs y.ValueStruct, db *DB) bool {
  940. if vs.Version != y.ParseTs(e.Key) {
  941. // Version not found. Discard.
  942. return true
  943. }
  944. if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
  945. return true
  946. }
  947. if (vs.Meta & bitValuePointer) == 0 {
  948. // Key also stores the value in LSM. Discard.
  949. return true
  950. }
  951. if (vs.Meta & bitFinTxn) > 0 {
  952. // Just a txn finish entry. Discard.
  953. return true
  954. }
  955. return false
  956. }
  957. func (vlog *valueLog) doRunGC(lf *logFile) error {
  958. _, span := otrace.StartSpan(context.Background(), "Badger.GC")
  959. span.Annotatef(nil, "GC rewrite for: %v", lf.path)
  960. defer span.End()
  961. if err := vlog.rewrite(lf); err != nil {
  962. return err
  963. }
  964. // Remove the file from discardStats.
  965. vlog.discardStats.Update(lf.fid, -1)
  966. return nil
  967. }
// waitOnGC blocks until lc has been closed and then sends on vlog.garbageCh. runGC
// sends on the same channel before doing any work and receives from it when done, so
// this send waits for a GC in progress and afterwards keeps the channel occupied.
// lc.Done is called when waitOnGC returns.
func (vlog *valueLog) waitOnGC(lc *z.Closer) {
	defer lc.Done()
	<-lc.HasBeenClosed() // Wait for lc to be closed.
	// Block any GC in progress to finish, and don't allow any more writes to runGC by filling up
	// the channel of size 1.
	vlog.garbageCh <- struct{}{}
}
  975. func (vlog *valueLog) runGC(discardRatio float64) error {
  976. select {
  977. case vlog.garbageCh <- struct{}{}:
  978. // Pick a log file for GC.
  979. defer func() {
  980. <-vlog.garbageCh
  981. }()
  982. lf := vlog.pickLog(discardRatio)
  983. if lf == nil {
  984. return ErrNoRewrite
  985. }
  986. return vlog.doRunGC(lf)
  987. default:
  988. return ErrRejected
  989. }
  990. }
  991. func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) {
  992. if vlog.opt.InMemory {
  993. return
  994. }
  995. for fid, discard := range stats {
  996. vlog.discardStats.Update(fid, discard)
  997. }
  998. // The following is to coordinate with some test cases where we want to
  999. // verify that at least one iteration of updateDiscardStats has been completed.
  1000. vlog.db.logToSyncChan(updateDiscardStatsMsg)
  1001. }
// vlogThreshold keeps a histogram of observed value sizes and the value threshold
// that listenForValueThresholdUpdate derives from it.
type vlogThreshold struct {
	// logger, when non-nil, is used to log threshold changes.
	logger Logger
	// percentile is the argument passed to vlMetrics.Percentile when the threshold
	// is recomputed.
	percentile float64
	// valueThreshold holds the current threshold; it is accessed atomically.
	valueThreshold atomic.Int64
	// valueCh carries batches of value sizes from update to
	// listenForValueThresholdUpdate.
	valueCh chan []int64
	// clearCh tells listenForValueThresholdUpdate to clear vlMetrics.
	clearCh chan bool
	// closer is used to stop listenForValueThresholdUpdate (see close).
	closer *z.Closer
	// Metrics contains a running log of statistics like amount of data stored etc.
	vlMetrics *z.HistogramData
}
  1012. func initVlogThreshold(opt *Options) *vlogThreshold {
  1013. getBounds := func() []float64 {
  1014. mxbd := opt.maxValueThreshold
  1015. mnbd := float64(opt.ValueThreshold)
  1016. y.AssertTruef(mxbd >= mnbd, "maximum threshold bound is less than the min threshold")
  1017. size := math.Min(mxbd-mnbd+1, 1024.0)
  1018. bdstp := (mxbd - mnbd) / size
  1019. bounds := make([]float64, int64(size))
  1020. for i := range bounds {
  1021. if i == 0 {
  1022. bounds[0] = mnbd
  1023. continue
  1024. }
  1025. if i == int(size-1) {
  1026. bounds[i] = mxbd
  1027. continue
  1028. }
  1029. bounds[i] = bounds[i-1] + bdstp
  1030. }
  1031. return bounds
  1032. }
  1033. lt := &vlogThreshold{
  1034. logger: opt.Logger,
  1035. percentile: opt.VLogPercentile,
  1036. valueCh: make(chan []int64, 1000),
  1037. clearCh: make(chan bool, 1),
  1038. closer: z.NewCloser(1),
  1039. vlMetrics: z.NewHistogramData(getBounds()),
  1040. }
  1041. lt.valueThreshold.Store(opt.ValueThreshold)
  1042. return lt
  1043. }
// Clear resets the threshold to opt.ValueThreshold and then sends on clearCh, which
// makes listenForValueThresholdUpdate clear the histogram. The send blocks while
// clearCh has no free buffer slot.
func (v *vlogThreshold) Clear(opt Options) {
	// Reset the threshold first, then ask the listener to drop the collected metrics.
	v.valueThreshold.Store(opt.ValueThreshold)
	v.clearCh <- true
}
// update hands a batch of value sizes to listenForValueThresholdUpdate via valueCh.
// The send blocks while valueCh has no free buffer slot.
func (v *vlogThreshold) update(sizes []int64) {
	v.valueCh <- sizes
}
// close calls SignalAndWait on the closer. listenForValueThresholdUpdate returns
// once it observes closer.HasBeenClosed and calls closer.Done on its way out.
// NOTE(review): presumably SignalAndWait blocks until that Done call; confirm
// against z.Closer.
func (v *vlogThreshold) close() {
	v.closer.SignalAndWait()
}
  1054. func (v *vlogThreshold) listenForValueThresholdUpdate() {
  1055. defer v.closer.Done()
  1056. for {
  1057. select {
  1058. case <-v.closer.HasBeenClosed():
  1059. return
  1060. case val := <-v.valueCh:
  1061. for _, e := range val {
  1062. v.vlMetrics.Update(e)
  1063. }
  1064. // we are making it to get Options.VlogPercentile so that values with sizes
  1065. // in range of Options.VlogPercentile will make it to the LSM tree and rest to the
  1066. // value log file.
  1067. p := int64(v.vlMetrics.Percentile(v.percentile))
  1068. if v.valueThreshold.Load() != p {
  1069. if v.logger != nil {
  1070. v.logger.Infof("updating value of threshold to: %d", p)
  1071. }
  1072. v.valueThreshold.Store(p)
  1073. }
  1074. case <-v.clearCh:
  1075. v.vlMetrics.Clear()
  1076. }
  1077. }
  1078. }