memtable.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package badger
  6. import (
  7. "bufio"
  8. "bytes"
  9. "crypto/aes"
  10. cryptorand "crypto/rand"
  11. "encoding/binary"
  12. "fmt"
  13. "hash/crc32"
  14. "io"
  15. "os"
  16. "path/filepath"
  17. "sort"
  18. "strconv"
  19. "strings"
  20. "sync"
  21. "sync/atomic"
  22. "github.com/pkg/errors"
  23. "github.com/dgraph-io/badger/v4/pb"
  24. "github.com/dgraph-io/badger/v4/skl"
  25. "github.com/dgraph-io/badger/v4/y"
  26. "github.com/dgraph-io/ristretto/v2/z"
  27. )
// memTable structure stores a skiplist and a corresponding WAL. Writes to memTable are written
// both to the WAL and the skiplist. On a crash, the WAL is replayed to bring the skiplist back to
// its pre-crash form.
type memTable struct {
	// TODO: Give skiplist z.Calloc'd []byte.
	sl         *skl.Skiplist // In-memory sorted view of the entries.
	wal        *logFile      // Write-ahead log backing sl; nil when running in in-memory mode.
	maxVersion uint64        // Highest timestamp (y.ParseTs) seen among inserted keys.
	opt        Options
	buf        *bytes.Buffer // Scratch buffer reused when encoding WAL entries.
}
// openMemTables scans db.opt.Dir for existing ".mem" files, replays each one into
// a memtable, and appends the non-empty ones to db.imm (they are immutable from
// here on). It also sets db.nextMemFid to one past the highest fid found so new
// memtables get fresh file names.
//
// NOTE(review): the opt parameter is unused; db.opt is read instead.
func (db *DB) openMemTables(opt Options) error {
	// We don't need to open any tables in in-memory mode.
	if db.opt.InMemory {
		return nil
	}
	files, err := os.ReadDir(db.opt.Dir)
	if err != nil {
		return errFile(err, db.opt.Dir, "Unable to open mem dir.")
	}
	var fids []int
	for _, file := range files {
		if !strings.HasSuffix(file.Name(), memFileExt) {
			continue
		}
		// File names look like "00042.mem"; the numeric prefix is the fid.
		fsz := len(file.Name())
		fid, err := strconv.ParseInt(file.Name()[:fsz-len(memFileExt)], 10, 64)
		if err != nil {
			return errFile(err, file.Name(), "Unable to parse log id.")
		}
		fids = append(fids, int(fid))
	}
	// Sort in ascending order.
	sort.Slice(fids, func(i, j int) bool {
		return fids[i] < fids[j]
	})
	for _, fid := range fids {
		flags := os.O_RDWR
		if db.opt.ReadOnly {
			flags = os.O_RDONLY
		}
		mt, err := db.openMemTable(fid, flags)
		if err != nil {
			return y.Wrapf(err, "while opening fid: %d", fid)
		}
		// If this memtable is empty we don't need to add it. This is a
		// memtable that was completely truncated.
		if mt.sl.Empty() {
			mt.DecrRef()
			continue
		}
		// These should no longer be written to. So, make them part of the imm.
		db.imm = append(db.imm, mt)
	}
	if len(fids) != 0 {
		db.nextMemFid = fids[len(fids)-1]
	}
	db.nextMemFid++
	return nil
}
  88. const memFileExt string = ".mem"
// openMemTable builds a skiplist-backed memtable for the given file id and opens
// (or creates, depending on flags) its WAL. It returns the sentinel z.NewFile as
// the error when a brand-new WAL was created — callers use this to distinguish
// create from reopen. For an existing WAL the log is replayed into the skiplist
// before returning. In in-memory mode no WAL is created and z.NewFile is returned.
func (db *DB) openMemTable(fid, flags int) (*memTable, error) {
	filepath := db.mtFilePath(fid)
	s := skl.NewSkiplist(arenaSize(db.opt))
	mt := &memTable{
		sl:  s,
		opt: db.opt,
		buf: &bytes.Buffer{},
	}
	// We don't need to create the wal for the skiplist in in-memory mode so return the mt.
	if db.opt.InMemory {
		return mt, z.NewFile
	}

	mt.wal = &logFile{
		fid:      uint32(fid),
		path:     filepath,
		registry: db.registry,
		writeAt:  vlogHeaderSize,
		opt:      db.opt,
	}
	// Map the file at twice MemTableSize: the WAL carries per-entry headers and
	// CRCs on top of the raw key/value bytes.
	lerr := mt.wal.open(filepath, flags, 2*db.opt.MemTableSize)
	if lerr != z.NewFile && lerr != nil {
		return nil, y.Wrapf(lerr, "While opening memtable: %s", filepath)
	}

	// Have a callback set to delete WAL when skiplist reference count goes down to zero. That is,
	// when it gets flushed to L0.
	s.OnClose = func() {
		if err := mt.wal.Delete(); err != nil {
			db.opt.Errorf("while deleting file: %s, err: %v", filepath, err)
		}
	}

	if lerr == z.NewFile {
		// Freshly created file: nothing to replay.
		return mt, lerr
	}
	err := mt.UpdateSkipList()
	return mt, y.Wrapf(err, "while updating skiplist")
}
// newMemTable creates a brand-new mutable memtable using db.nextMemFid. Success
// is signalled by openMemTable returning the z.NewFile sentinel; any other
// outcome is an error — in particular err == nil means the file already existed,
// which must never happen for a fresh fid.
func (db *DB) newMemTable() (*memTable, error) {
	mt, err := db.openMemTable(db.nextMemFid, os.O_CREATE|os.O_RDWR)
	if err == z.NewFile {
		db.nextMemFid++
		return mt, nil
	}

	if err != nil {
		db.opt.Errorf("Got error: %v for id: %d\n", err, db.nextMemFid)
		return nil, y.Wrapf(err, "newMemTable")
	}
	// err == nil: the WAL file was opened rather than created.
	return nil, errors.Errorf("File %s already exists", mt.wal.Fd.Name())
}
  137. func (db *DB) mtFilePath(fid int) string {
  138. return filepath.Join(db.opt.Dir, fmt.Sprintf("%05d%s", fid, memFileExt))
  139. }
// SyncWAL flushes the memtable's write-ahead log to stable storage.
func (mt *memTable) SyncWAL() error {
	return mt.wal.Sync()
}
  143. func (mt *memTable) isFull() bool {
  144. if mt.sl.MemSize() >= mt.opt.MemTableSize {
  145. return true
  146. }
  147. if mt.opt.InMemory {
  148. // InMemory mode doesn't have any WAL.
  149. return false
  150. }
  151. return int64(mt.wal.writeAt) >= mt.opt.MemTableSize
  152. }
// Put writes the (key, value) pair to the WAL first and then inserts it into the
// skiplist, keeping maxVersion up to date. Transaction-finish markers
// (bitFinTxn) are persisted to the WAL only and never inserted into the
// skiplist.
func (mt *memTable) Put(key []byte, value y.ValueStruct) error {
	entry := &Entry{
		Key:       key,
		Value:     value.Value,
		UserMeta:  value.UserMeta,
		meta:      value.Meta,
		ExpiresAt: value.ExpiresAt,
	}

	// wal is nil only when badger is running in in-memory mode and we don't need the wal.
	if mt.wal != nil {
		// If WAL exceeds opt.ValueLogFileSize, we'll force flush the memTable. See logic in
		// ensureRoomForWrite.
		if err := mt.wal.writeEntry(mt.buf, entry, mt.opt); err != nil {
			return y.Wrapf(err, "cannot write entry to WAL file")
		}
	}
	// We insert the finish marker in the WAL but not in the memtable.
	if entry.meta&bitFinTxn > 0 {
		return nil
	}

	// Write to skiplist and update maxVersion encountered.
	mt.sl.Put(key, value)
	if ts := y.ParseTs(entry.Key); ts > mt.maxVersion {
		mt.maxVersion = ts
	}
	y.NumBytesWrittenToL0Add(mt.opt.MetricsEnabled, entry.estimateSizeAndSetThreshold(mt.opt.ValueThreshold))
	return nil
}
// UpdateSkipList replays the WAL into the skiplist, restoring the memtable's
// pre-crash state. After a successful replay the WAL is truncated to the last
// valid offset; in read-only mode, where truncation is impossible, a short
// replay surfaces ErrTruncateNeeded instead.
func (mt *memTable) UpdateSkipList() error {
	if mt.wal == nil || mt.sl == nil {
		return nil
	}
	endOff, err := mt.wal.iterate(true, 0, mt.replayFunction(mt.opt))
	if err != nil {
		return y.Wrapf(err, "while iterating wal: %s", mt.wal.Fd.Name())
	}
	// A valid end offset smaller than the file size means trailing bytes that
	// were not part of a complete record and must be truncated away.
	if endOff < mt.wal.size.Load() && mt.opt.ReadOnly {
		return y.Wrapf(ErrTruncateNeeded, "end offset: %d < size: %d", endOff, mt.wal.size.Load())
	}
	return mt.wal.Truncate(int64(endOff))
}
// IncrRef increases the refcount of the underlying skiplist.
func (mt *memTable) IncrRef() {
	mt.sl.IncrRef()
}
// DecrRef decrements the refcount, deallocating the Skiplist when done using it.
// When the count reaches zero the skiplist's OnClose callback (set in
// openMemTable) deletes the backing WAL file.
func (mt *memTable) DecrRef() {
	mt.sl.DecrRef()
}
  202. func (mt *memTable) replayFunction(opt Options) func(Entry, valuePointer) error {
  203. first := true
  204. return func(e Entry, _ valuePointer) error { // Function for replaying.
  205. if first {
  206. opt.Debugf("First key=%q\n", e.Key)
  207. }
  208. first = false
  209. if ts := y.ParseTs(e.Key); ts > mt.maxVersion {
  210. mt.maxVersion = ts
  211. }
  212. v := y.ValueStruct{
  213. Value: e.Value,
  214. Meta: e.meta,
  215. UserMeta: e.UserMeta,
  216. ExpiresAt: e.ExpiresAt,
  217. }
  218. // This is already encoded correctly. Value would be either a vptr, or a full value
  219. // depending upon how big the original value was. Skiplist makes a copy of the key and
  220. // value.
  221. mt.sl.Put(e.Key, v)
  222. return nil
  223. }
  224. }
// logFile is a memory-mapped, append-only log file (used both as memtable WAL
// and value log) with optional encryption of entry key/value bytes.
type logFile struct {
	*z.MmapFile
	path string
	// This is a lock on the log file. It guards the fd’s value, the file’s
	// existence and the file’s memory map.
	//
	// Use shared ownership when reading/writing the file or memory map, use
	// exclusive ownership to open/close the descriptor, unmap or remove the file.
	lock     sync.RWMutex
	fid      uint32
	size     atomic.Uint32 // Valid size of the file in bytes (maintained by open/Truncate).
	dataKey  *pb.DataKey   // nil when encryption is disabled.
	baseIV   []byte        // 12-byte base IV; combined with an entry offset to form per-entry IVs.
	registry *KeyRegistry
	writeAt  uint32 // Offset at which the next entry will be written.
	opt      Options
}
  242. func (lf *logFile) Truncate(end int64) error {
  243. if fi, err := lf.Fd.Stat(); err != nil {
  244. return fmt.Errorf("while file.stat on file: %s, error: %v\n", lf.Fd.Name(), err)
  245. } else if fi.Size() == end {
  246. return nil
  247. }
  248. y.AssertTrue(!lf.opt.ReadOnly)
  249. lf.size.Store(uint32(end))
  250. return lf.MmapFile.Truncate(end)
  251. }
// encodeEntry will encode entry to the buf
// layout of entry
// +--------+-----+-------+-------+
// | header | key | value | crc32 |
// +--------+-----+-------+-------+
//
// It returns the total number of bytes appended to buf (header + key + value +
// crc). Only the key and value bytes are encrypted (when enabled); the header is
// stored in the clear, and the CRC covers exactly the bytes written through
// writer, i.e. the header plus the possibly-encrypted key/value.
func (lf *logFile) encodeEntry(buf *bytes.Buffer, e *Entry, offset uint32) (int, error) {
	h := header{
		klen:      uint32(len(e.Key)),
		vlen:      uint32(len(e.Value)),
		expiresAt: e.ExpiresAt,
		meta:      e.meta,
		userMeta:  e.UserMeta,
	}

	// Everything written via writer lands in both buf and the CRC hash.
	hash := crc32.New(y.CastagnoliCrcTable)
	writer := io.MultiWriter(buf, hash)

	// encode header.
	var headerEnc [maxHeaderSize]byte
	sz := h.Encode(headerEnc[:])
	y.Check2(writer.Write(headerEnc[:sz]))
	// we'll encrypt only key and value.
	if lf.encryptionEnabled() {
		// TODO: no need to allocate the bytes. we can calculate the encrypted buf one by one
		// since we're using ctr mode of AES encryption. Ordering won't changed. Need some
		// refactoring in XORBlock which will work like stream cipher.
		eBuf := make([]byte, 0, len(e.Key)+len(e.Value))
		eBuf = append(eBuf, e.Key...)
		eBuf = append(eBuf, e.Value...)
		if err := y.XORBlockStream(
			writer, eBuf, lf.dataKey.Data, lf.generateIV(offset)); err != nil {
			return 0, y.Wrapf(err, "Error while encoding entry for vlog.")
		}
	} else {
		// Encryption is disabled so writing directly to the buffer.
		y.Check2(writer.Write(e.Key))
		y.Check2(writer.Write(e.Value))
	}
	// write crc32 hash. Written to buf only — the CRC does not cover itself.
	var crcBuf [crc32.Size]byte
	binary.BigEndian.PutUint32(crcBuf[:], hash.Sum32())
	y.Check2(buf.Write(crcBuf[:]))
	// return encoded length.
	return len(headerEnc[:sz]) + len(e.Key) + len(e.Value) + len(crcBuf), nil
}
// writeEntry encodes e into buf and copies the encoded bytes into the mmap'd
// file at lf.writeAt, advancing writeAt and zeroing the bytes that follow so a
// crash mid-write leaves a recognizable end-of-log sentinel for replay.
func (lf *logFile) writeEntry(buf *bytes.Buffer, e *Entry, opt Options) error {
	buf.Reset()
	plen, err := lf.encodeEntry(buf, e, lf.writeAt)
	if err != nil {
		return err
	}
	// The copy must consume the entire encoded entry; a short copy would mean
	// the mmap region is too small for this write.
	y.AssertTrue(plen == copy(lf.Data[lf.writeAt:], buf.Bytes()))
	lf.writeAt += uint32(plen)

	lf.zeroNextEntry()
	return nil
}
// decodeEntry is the inverse of encodeEntry: it decodes the header from buf,
// decrypts the key/value region if encryption is enabled, and returns an Entry.
// In the unencrypted case the Entry's Key/Value slices alias buf; in the
// encrypted case they point into a freshly allocated plaintext buffer. The
// trailing CRC is not verified here.
func (lf *logFile) decodeEntry(buf []byte, offset uint32) (*Entry, error) {
	var h header
	hlen := h.Decode(buf)
	kv := buf[hlen:]
	if lf.encryptionEnabled() {
		var err error
		// No need to worry about mmap. because, XORBlock allocates a byte array to do the
		// xor. So, the given slice is not being mutated.
		if kv, err = lf.decryptKV(kv, offset); err != nil {
			return nil, err
		}
	}
	e := &Entry{
		meta:      h.meta,
		UserMeta:  h.userMeta,
		ExpiresAt: h.expiresAt,
		offset:    offset,
		Key:       kv[:h.klen],
		Value:     kv[h.klen : h.klen+h.vlen],
	}
	return e, nil
}
// decryptKV decrypts the key+value region of an entry that was read at the
// given file offset, using an IV derived from baseIV and that offset.
func (lf *logFile) decryptKV(buf []byte, offset uint32) ([]byte, error) {
	return y.XORBlockAllocate(buf, lf.dataKey.Data, lf.generateIV(offset))
}
  331. // KeyID returns datakey's ID.
  332. func (lf *logFile) keyID() uint64 {
  333. if lf.dataKey == nil {
  334. // If there is no datakey, then we'll return 0. Which means no encryption.
  335. return 0
  336. }
  337. return lf.dataKey.KeyId
  338. }
// encryptionEnabled reports whether this log file carries a datakey and
// therefore encrypts its entry payloads.
func (lf *logFile) encryptionEnabled() bool {
	return lf.dataKey != nil
}
  342. // Acquire lock on mmap/file if you are calling this
  343. func (lf *logFile) read(p valuePointer) (buf []byte, err error) {
  344. offset := p.Offset
  345. // Do not convert size to uint32, because the lf.Data can be of size
  346. // 4GB, which overflows the uint32 during conversion to make the size 0,
  347. // causing the read to fail with ErrEOF. See issue #585.
  348. size := int64(len(lf.Data))
  349. valsz := p.Len
  350. lfsz := lf.size.Load()
  351. if int64(offset) >= size || int64(offset+valsz) > size ||
  352. // Ensure that the read is within the file's actual size. It might be possible that
  353. // the offset+valsz length is beyond the file's actual size. This could happen when
  354. // dropAll and iterations are running simultaneously.
  355. int64(offset+valsz) > int64(lfsz) {
  356. err = y.ErrEOF
  357. } else {
  358. buf = lf.Data[offset : offset+valsz]
  359. }
  360. return buf, err
  361. }
  362. // generateIV will generate IV by appending given offset with the base IV.
  363. func (lf *logFile) generateIV(offset uint32) []byte {
  364. iv := make([]byte, aes.BlockSize)
  365. // baseIV is of 12 bytes.
  366. y.AssertTrue(12 == copy(iv[:12], lf.baseIV))
  367. // remaining 4 bytes is obtained from offset.
  368. binary.BigEndian.PutUint32(iv[12:], offset)
  369. return iv
  370. }
// doneWriting finalizes a log file once writes to it are finished: it optionally
// syncs (when SyncWrites is set) and then truncates the mmap to the final write
// offset under the exclusive lock, since truncation unmaps and remaps the file.
func (lf *logFile) doneWriting(offset uint32) error {
	if lf.opt.SyncWrites {
		if err := lf.Sync(); err != nil {
			return y.Wrapf(err, "Unable to sync value log: %q", lf.path)
		}
	}

	// Before we were acquiring a lock here on lf.lock, because we were invalidating the file
	// descriptor due to reopening it as read-only. Now, we don't invalidate the fd, but unmap it,
	// truncate it and remap it. That creates a window where we have segfaults because the mmap is
	// no longer valid, while someone might be reading it. Therefore, we need a lock here again.
	lf.lock.Lock()
	defer lf.lock.Unlock()

	if err := lf.Truncate(int64(offset)); err != nil {
		return y.Wrapf(err, "Unable to truncate file: %q", lf.path)
	}

	// Previously we used to close the file after it was written and reopen it in read-only mode.
	// We no longer open files in read-only mode. We keep all vlog files open in read-write mode.
	return nil
}
// iterate iterates over log file. It doesn't allocate new memory for every kv pair.
// Therefore, the kv pair is only valid for the duration of fn call.
//
// It returns the offset just past the last fully-valid record. Transactional
// entries (bitTxn) are buffered until their bitFinTxn marker is seen, so fn is
// never called with entries of a partially-written transaction.
func (lf *logFile) iterate(readOnly bool, offset uint32, fn logEntry) (uint32, error) {
	if offset == 0 {
		// If offset is set to zero, let's advance past the encryption key header.
		offset = vlogHeaderSize
	}

	// For now, read directly from file, because it allows
	reader := bufio.NewReader(lf.NewReader(int(offset)))
	read := &safeRead{
		k:            make([]byte, 10),
		v:            make([]byte, 10),
		recordOffset: offset,
		lf:           lf,
	}

	var lastCommit uint64
	// validEndOffset is only advanced past complete standalone entries or fully
	// committed transactions; everything after it is considered garbage.
	var validEndOffset uint32 = offset

	// Buffered entries of the current (not yet finished) transaction.
	var entries []*Entry
	var vptrs []valuePointer

loop:
	for {
		e, err := read.Entry(reader)
		switch {
		// We have not reached the end of the file but the entry we read is
		// zero. This happens because we have truncated the file and
		// zero'ed it out.
		case err == io.EOF:
			break loop
		case err == io.ErrUnexpectedEOF || err == errTruncate:
			break loop
		case err != nil:
			return 0, err
		case e == nil:
			continue
		case e.isZero():
			break loop
		}

		var vp valuePointer
		vp.Len = uint32(e.hlen + len(e.Key) + len(e.Value) + crc32.Size)
		read.recordOffset += vp.Len

		vp.Offset = e.offset
		vp.Fid = lf.fid

		switch {
		case e.meta&bitTxn > 0:
			// Transactional entry: buffer until the matching bitFinTxn marker.
			txnTs := y.ParseTs(e.Key)
			if lastCommit == 0 {
				lastCommit = txnTs
			}
			if lastCommit != txnTs {
				// Timestamp changed mid-transaction: stop at the last valid offset.
				break loop
			}
			entries = append(entries, e)
			vptrs = append(vptrs, vp)

		case e.meta&bitFinTxn > 0:
			// The fin marker's value is the commit timestamp as decimal text;
			// it must match the transaction we were buffering.
			txnTs, err := strconv.ParseUint(string(e.Value), 10, 64)
			if err != nil || lastCommit != txnTs {
				break loop
			}
			// Got the end of txn. Now we can store them.
			lastCommit = 0
			validEndOffset = read.recordOffset

			for i, e := range entries {
				vp := vptrs[i]
				if err := fn(*e, vp); err != nil {
					if err == errStop {
						break
					}
					return 0, errFile(err, lf.path, "Iteration function")
				}
			}
			entries = entries[:0]
			vptrs = vptrs[:0]

		default:
			if lastCommit != 0 {
				// This is most likely an entry which was moved as part of GC.
				// We shouldn't get this entry in the middle of a transaction.
				break loop
			}
			// Standalone entry: deliver to fn immediately.
			validEndOffset = read.recordOffset

			if err := fn(*e, vp); err != nil {
				if err == errStop {
					break
				}
				return 0, errFile(err, lf.path, "Iteration function")
			}
		}
	}
	return validEndOffset, nil
}
// Zero out the next entry to deal with any crashes: a zeroed header at writeAt
// is what iterate() recognizes (via e.isZero()) as the end of the valid log.
func (lf *logFile) zeroNextEntry() {
	z.ZeroOut(lf.Data, int(lf.writeAt), int(lf.writeAt+maxHeaderSize))
}
// open mmaps the log file at path with the given flags, sizing it at fsize when
// newly created. A new file is bootstrapped (key id + base IV written) and the
// z.NewFile sentinel is returned so callers can skip replay. For an existing
// file the header is parsed and the matching datakey loaded from the registry.
func (lf *logFile) open(path string, flags int, fsize int64) error {
	mf, ferr := z.OpenMmapFile(path, flags, int(fsize))
	lf.MmapFile = mf

	if ferr == z.NewFile {
		if err := lf.bootstrap(); err != nil {
			// Bootstrap failed: remove the half-initialized file.
			os.Remove(path)
			return err
		}
		lf.size.Store(vlogHeaderSize)
	} else if ferr != nil {
		return y.Wrapf(ferr, "while opening file: %s", path)
	}
	lf.size.Store(uint32(len(lf.Data)))

	if lf.size.Load() < vlogHeaderSize {
		// Every vlog file should have at least vlogHeaderSize. If it is less than vlogHeaderSize
		// then it must have been corrupted. But no need to handle here. log replayer will truncate
		// and bootstrap the logfile. So ignoring here.
		return nil
	}

	// Copy over the encryption registry data.
	buf := make([]byte, vlogHeaderSize)

	y.AssertTruef(vlogHeaderSize == copy(buf, lf.Data),
		"Unable to copy from %s, size %d", path, lf.size.Load())
	keyID := binary.BigEndian.Uint64(buf[:8])
	// retrieve datakey.
	if dk, err := lf.registry.DataKey(keyID); err != nil {
		return y.Wrapf(err, "While opening vlog file %d", lf.fid)
	} else {
		lf.dataKey = dk
	}
	lf.baseIV = buf[8:]
	y.AssertTrue(len(lf.baseIV) == 12)

	// Preserved ferr so we can return if this was a new file.
	return ferr
}
  518. // bootstrap will initialize the log file with key id and baseIV.
  519. // The below figure shows the layout of log file.
  520. // +----------------+------------------+------------------+
  521. // | keyID(8 bytes) | baseIV(12 bytes)| entry... |
  522. // +----------------+------------------+------------------+
  523. func (lf *logFile) bootstrap() error {
  524. var err error
  525. // generate data key for the log file.
  526. var dk *pb.DataKey
  527. if dk, err = lf.registry.LatestDataKey(); err != nil {
  528. return y.Wrapf(err, "Error while retrieving datakey in logFile.bootstarp")
  529. }
  530. lf.dataKey = dk
  531. // We'll always preserve vlogHeaderSize for key id and baseIV.
  532. buf := make([]byte, vlogHeaderSize)
  533. // write key id to the buf.
  534. // key id will be zero if the logfile is in plain text.
  535. binary.BigEndian.PutUint64(buf[:8], lf.keyID())
  536. // generate base IV. It'll be used with offset of the vptr to encrypt the entry.
  537. if _, err := cryptorand.Read(buf[8:]); err != nil {
  538. return y.Wrapf(err, "Error while creating base IV, while creating logfile")
  539. }
  540. // Initialize base IV.
  541. lf.baseIV = buf[8:]
  542. y.AssertTrue(len(lf.baseIV) == 12)
  543. // Copy over to the logFile.
  544. y.AssertTrue(vlogHeaderSize == copy(lf.Data[0:], buf))
  545. // Zero out the next entry.
  546. lf.zeroNextEntry()
  547. return nil
  548. }