memtable.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package badger
  6. import (
  7. "bufio"
  8. "bytes"
  9. "crypto/aes"
  10. cryptorand "crypto/rand"
  11. "encoding/binary"
  12. "fmt"
  13. "hash/crc32"
  14. "io"
  15. "os"
  16. "path/filepath"
  17. "sort"
  18. "strconv"
  19. "strings"
  20. "sync"
  21. "sync/atomic"
  22. "github.com/dgraph-io/badger/v4/pb"
  23. "github.com/dgraph-io/badger/v4/skl"
  24. "github.com/dgraph-io/badger/v4/y"
  25. "github.com/dgraph-io/ristretto/v2/z"
  26. )
// memTable structure stores a skiplist and a corresponding WAL. Writes to memTable are written
// both to the WAL and the skiplist. On a crash, the WAL is replayed to bring the skiplist back to
// its pre-crash form.
type memTable struct {
	// TODO: Give skiplist z.Calloc'd []byte.
	sl         *skl.Skiplist // in-memory sorted view of the entries
	wal        *logFile      // write-ahead log backing sl; nil when running in in-memory mode
	maxVersion uint64        // highest commit timestamp seen among keys put into sl
	opt        Options
	buf        *bytes.Buffer // scratch buffer reused when encoding entries for the WAL
}
  38. func (db *DB) openMemTables(opt Options) error {
  39. // We don't need to open any tables in in-memory mode.
  40. if db.opt.InMemory {
  41. return nil
  42. }
  43. files, err := os.ReadDir(db.opt.Dir)
  44. if err != nil {
  45. return errFile(err, db.opt.Dir, "Unable to open mem dir.")
  46. }
  47. var fids []int
  48. for _, file := range files {
  49. if !strings.HasSuffix(file.Name(), memFileExt) {
  50. continue
  51. }
  52. fsz := len(file.Name())
  53. fid, err := strconv.ParseInt(file.Name()[:fsz-len(memFileExt)], 10, 64)
  54. if err != nil {
  55. return errFile(err, file.Name(), "Unable to parse log id.")
  56. }
  57. fids = append(fids, int(fid))
  58. }
  59. // Sort in ascending order.
  60. sort.Slice(fids, func(i, j int) bool {
  61. return fids[i] < fids[j]
  62. })
  63. for _, fid := range fids {
  64. flags := os.O_RDWR
  65. if db.opt.ReadOnly {
  66. flags = os.O_RDONLY
  67. }
  68. mt, err := db.openMemTable(fid, flags)
  69. if err != nil {
  70. return y.Wrapf(err, "while opening fid: %d", fid)
  71. }
  72. // If this memtable is empty we don't need to add it. This is a
  73. // memtable that was completely truncated.
  74. if mt.sl.Empty() {
  75. mt.DecrRef()
  76. continue
  77. }
  78. // These should no longer be written to. So, make them part of the imm.
  79. db.imm = append(db.imm, mt)
  80. }
  81. if len(fids) != 0 {
  82. db.nextMemFid = fids[len(fids)-1]
  83. }
  84. db.nextMemFid++
  85. return nil
  86. }
  87. const memFileExt string = ".mem"
// openMemTable opens (or creates, depending on flags) the memtable with the
// given file id. It returns z.NewFile as the error when a fresh WAL file was
// created; callers use that sentinel to distinguish new tables from replayed
// ones. In in-memory mode no WAL is created and z.NewFile is returned directly.
func (db *DB) openMemTable(fid, flags int) (*memTable, error) {
	filepath := db.mtFilePath(fid)
	s := skl.NewSkiplist(arenaSize(db.opt))
	mt := &memTable{
		sl:  s,
		opt: db.opt,
		buf: &bytes.Buffer{},
	}
	// We don't need to create the wal for the skiplist in in-memory mode so return the mt.
	if db.opt.InMemory {
		return mt, z.NewFile
	}

	mt.wal = &logFile{
		fid:      uint32(fid),
		path:     filepath,
		registry: db.registry,
		writeAt:  vlogHeaderSize,
		opt:      db.opt,
	}
	// Map the file at twice the memtable size to leave headroom for WAL overhead.
	lerr := mt.wal.open(filepath, flags, 2*db.opt.MemTableSize)
	if lerr != z.NewFile && lerr != nil {
		return nil, y.Wrapf(lerr, "While opening memtable: %s", filepath)
	}

	// Have a callback set to delete WAL when skiplist reference count goes down to zero. That is,
	// when it gets flushed to L0.
	s.OnClose = func() {
		if err := mt.wal.Delete(); err != nil {
			db.opt.Errorf("while deleting file: %s, err: %v", filepath, err)
		}
	}

	if lerr == z.NewFile {
		// Fresh WAL: nothing to replay. Propagate the sentinel to the caller.
		return mt, lerr
	}
	// Existing WAL: replay it into the skiplist.
	err := mt.UpdateSkipList()
	return mt, y.Wrapf(err, "while updating skiplist")
}
  124. func (db *DB) newMemTable() (*memTable, error) {
  125. mt, err := db.openMemTable(db.nextMemFid, os.O_CREATE|os.O_RDWR)
  126. if err == z.NewFile {
  127. db.nextMemFid++
  128. return mt, nil
  129. }
  130. if err != nil {
  131. db.opt.Errorf("Got error: %v for id: %d\n", err, db.nextMemFid)
  132. return nil, y.Wrapf(err, "newMemTable")
  133. }
  134. return nil, fmt.Errorf("File %s already exists", mt.wal.Fd.Name())
  135. }
  136. func (db *DB) mtFilePath(fid int) string {
  137. return filepath.Join(db.opt.Dir, fmt.Sprintf("%05d%s", fid, memFileExt))
  138. }
// SyncWAL syncs the memtable's write-ahead log file.
func (mt *memTable) SyncWAL() error {
	return mt.wal.Sync()
}
  142. func (mt *memTable) isFull() bool {
  143. if mt.sl.MemSize() >= mt.opt.MemTableSize {
  144. return true
  145. }
  146. if mt.opt.InMemory {
  147. // InMemory mode doesn't have any WAL.
  148. return false
  149. }
  150. return int64(mt.wal.writeAt) >= mt.opt.MemTableSize
  151. }
// Put writes the key/value to the WAL (when one exists) and then inserts it
// into the skiplist, updating maxVersion with the key's parsed timestamp.
// Transaction finish markers (bitFinTxn) are persisted to the WAL only; they
// are never inserted into the skiplist.
func (mt *memTable) Put(key []byte, value y.ValueStruct) error {
	entry := &Entry{
		Key:       key,
		Value:     value.Value,
		UserMeta:  value.UserMeta,
		meta:      value.Meta,
		ExpiresAt: value.ExpiresAt,
	}

	// wal is nil only when badger in running in in-memory mode and we don't need the wal.
	if mt.wal != nil {
		// If WAL exceeds opt.ValueLogFileSize, we'll force flush the memTable. See logic in
		// ensureRoomForWrite.
		if err := mt.wal.writeEntry(mt.buf, entry, mt.opt); err != nil {
			return y.Wrapf(err, "cannot write entry to WAL file")
		}
	}
	// We insert the finish marker in the WAL but not in the memtable.
	if entry.meta&bitFinTxn > 0 {
		return nil
	}

	// Write to skiplist and update maxVersion encountered.
	mt.sl.Put(key, value)
	if ts := y.ParseTs(entry.Key); ts > mt.maxVersion {
		mt.maxVersion = ts
	}
	y.NumBytesWrittenToL0Add(mt.opt.MetricsEnabled, entry.estimateSizeAndSetThreshold(mt.opt.ValueThreshold))
	return nil
}
// UpdateSkipList replays the WAL into the skiplist and then truncates the WAL
// at the last valid offset. In read-only mode a WAL whose valid data ends
// before its size is reported as ErrTruncateNeeded, since the file cannot be
// modified.
func (mt *memTable) UpdateSkipList() error {
	if mt.wal == nil || mt.sl == nil {
		return nil
	}
	endOff, err := mt.wal.iterate(true, 0, mt.replayFunction(mt.opt))
	if err != nil {
		return y.Wrapf(err, "while iterating wal: %s", mt.wal.Fd.Name())
	}
	if endOff < mt.wal.size.Load() && mt.opt.ReadOnly {
		return y.Wrapf(ErrTruncateNeeded, "end offset: %d < size: %d", endOff, mt.wal.size.Load())
	}
	// Drop any partially-written or zeroed-out tail past the last valid entry.
	return mt.wal.Truncate(int64(endOff))
}
// IncrRef increases the refcount of the underlying skiplist.
func (mt *memTable) IncrRef() {
	mt.sl.IncrRef()
}
// DecrRef decrements the refcount, deallocating the Skiplist when done using it.
// Via the skiplist's OnClose callback this also deletes the backing WAL file.
func (mt *memTable) DecrRef() {
	mt.sl.DecrRef()
}
  201. func (mt *memTable) replayFunction(opt Options) func(Entry, valuePointer) error {
  202. first := true
  203. return func(e Entry, _ valuePointer) error { // Function for replaying.
  204. if first {
  205. opt.Debugf("First key=%q\n", e.Key)
  206. }
  207. first = false
  208. if ts := y.ParseTs(e.Key); ts > mt.maxVersion {
  209. mt.maxVersion = ts
  210. }
  211. v := y.ValueStruct{
  212. Value: e.Value,
  213. Meta: e.meta,
  214. UserMeta: e.UserMeta,
  215. ExpiresAt: e.ExpiresAt,
  216. }
  217. // This is already encoded correctly. Value would be either a vptr, or a full value
  218. // depending upon how big the original value was. Skiplist makes a copy of the key and
  219. // value.
  220. mt.sl.Put(e.Key, v)
  221. return nil
  222. }
  223. }
// logFile is a memory-mapped append-only log (WAL / value log) file.
type logFile struct {
	*z.MmapFile
	path string
	// This is a lock on the log file. It guards the fd’s value, the file’s
	// existence and the file’s memory map.
	//
	// Use shared ownership when reading/writing the file or memory map, use
	// exclusive ownership to open/close the descriptor, unmap or remove the file.
	lock     sync.RWMutex
	fid      uint32
	size     atomic.Uint32 // current logical file size; set on open and Truncate
	dataKey  *pb.DataKey   // encryption key for this file; nil means unencrypted
	baseIV   []byte        // 12-byte base IV, combined with an offset to build per-entry IVs
	registry *KeyRegistry  // source of data keys during open/bootstrap
	writeAt  uint32        // next write offset within the mmap
	opt      Options
}
  241. func (lf *logFile) Truncate(end int64) error {
  242. if fi, err := lf.Fd.Stat(); err != nil {
  243. return fmt.Errorf("while file.stat on file: %s, error: %v\n", lf.Fd.Name(), err)
  244. } else if fi.Size() == end {
  245. return nil
  246. }
  247. y.AssertTrue(!lf.opt.ReadOnly)
  248. lf.size.Store(uint32(end))
  249. return lf.MmapFile.Truncate(end)
  250. }
// encodeEntry will encode entry to the buf
// layout of entry
// +--------+-----+-------+-------+
// | header | key | value | crc32 |
// +--------+-----+-------+-------+
//
// When encryption is enabled only the key and value bytes are encrypted (CTR
// mode, IV derived from the given file offset); the header stays plain. The
// CRC covers the bytes exactly as written, i.e. header plus the (possibly
// encrypted) key/value. Returns the total encoded length in bytes.
func (lf *logFile) encodeEntry(buf *bytes.Buffer, e *Entry, offset uint32) (int, error) {
	h := header{
		klen:      uint32(len(e.Key)),
		vlen:      uint32(len(e.Value)),
		expiresAt: e.ExpiresAt,
		meta:      e.meta,
		userMeta:  e.UserMeta,
	}

	hash := crc32.New(y.CastagnoliCrcTable)
	// Everything written through writer also feeds the CRC.
	writer := io.MultiWriter(buf, hash)

	// encode header.
	var headerEnc [maxHeaderSize]byte
	sz := h.Encode(headerEnc[:])
	y.Check2(writer.Write(headerEnc[:sz]))
	// we'll encrypt only key and value.
	if lf.encryptionEnabled() {
		// TODO: no need to allocate the bytes. we can calculate the encrypted buf one by one
		// since we're using ctr mode of AES encryption. Ordering won't changed. Need some
		// refactoring in XORBlock which will work like stream cipher.
		eBuf := make([]byte, 0, len(e.Key)+len(e.Value))
		eBuf = append(eBuf, e.Key...)
		eBuf = append(eBuf, e.Value...)
		if err := y.XORBlockStream(
			writer, eBuf, lf.dataKey.Data, lf.generateIV(offset)); err != nil {
			return 0, y.Wrapf(err, "Error while encoding entry for vlog.")
		}
	} else {
		// Encryption is disabled so writing directly to the buffer.
		y.Check2(writer.Write(e.Key))
		y.Check2(writer.Write(e.Value))
	}
	// write crc32 hash.
	var crcBuf [crc32.Size]byte
	binary.BigEndian.PutUint32(crcBuf[:], hash.Sum32())
	y.Check2(buf.Write(crcBuf[:]))
	// return encoded length.
	return len(headerEnc[:sz]) + len(e.Key) + len(e.Value) + len(crcBuf), nil
}
// writeEntry encodes e into buf (resetting it first) and copies the encoded
// bytes into the mmap at the current write offset, then advances writeAt and
// zeroes the next header slot so replay can detect the end of valid data
// after a crash.
func (lf *logFile) writeEntry(buf *bytes.Buffer, e *Entry, opt Options) error {
	buf.Reset()
	plen, err := lf.encodeEntry(buf, e, lf.writeAt)
	if err != nil {
		return err
	}
	// The copy must consume the entire encoded entry.
	y.AssertTrue(plen == copy(lf.Data[lf.writeAt:], buf.Bytes()))
	lf.writeAt += uint32(plen)

	lf.zeroNextEntry()
	return nil
}
// decodeEntry decodes a single entry from buf, decrypting the key/value bytes
// first when encryption is enabled. offset is the entry's position in the
// file; it is recorded on the Entry and used to regenerate the encryption IV.
// NOTE: on the unencrypted path the returned Entry's Key/Value alias buf.
func (lf *logFile) decodeEntry(buf []byte, offset uint32) (*Entry, error) {
	var h header
	hlen := h.Decode(buf)
	kv := buf[hlen:]
	if lf.encryptionEnabled() {
		var err error
		// No need to worry about mmap. because, XORBlock allocates a byte array to do the
		// xor. So, the given slice is not being mutated.
		if kv, err = lf.decryptKV(kv, offset); err != nil {
			return nil, err
		}
	}
	e := &Entry{
		meta:      h.meta,
		UserMeta:  h.userMeta,
		ExpiresAt: h.expiresAt,
		offset:    offset,
		Key:       kv[:h.klen],
		Value:     kv[h.klen : h.klen+h.vlen],
	}
	return e, nil
}
// decryptKV decrypts the key/value portion of an entry that was read at the
// given file offset, using the file's data key and an IV derived from offset.
func (lf *logFile) decryptKV(buf []byte, offset uint32) ([]byte, error) {
	return y.XORBlockAllocate(buf, lf.dataKey.Data, lf.generateIV(offset))
}
  330. // KeyID returns datakey's ID.
  331. func (lf *logFile) keyID() uint64 {
  332. if lf.dataKey == nil {
  333. // If there is no datakey, then we'll return 0. Which means no encryption.
  334. return 0
  335. }
  336. return lf.dataKey.KeyId
  337. }
// encryptionEnabled reports whether this log file has a data key attached,
// i.e. whether its key/value bytes are encrypted.
func (lf *logFile) encryptionEnabled() bool {
	return lf.dataKey != nil
}
  341. // Acquire lock on mmap/file if you are calling this
  342. func (lf *logFile) read(p valuePointer) (buf []byte, err error) {
  343. offset := p.Offset
  344. // Do not convert size to uint32, because the lf.Data can be of size
  345. // 4GB, which overflows the uint32 during conversion to make the size 0,
  346. // causing the read to fail with ErrEOF. See issue #585.
  347. size := int64(len(lf.Data))
  348. valsz := p.Len
  349. lfsz := lf.size.Load()
  350. if int64(offset) >= size || int64(offset+valsz) > size ||
  351. // Ensure that the read is within the file's actual size. It might be possible that
  352. // the offset+valsz length is beyond the file's actual size. This could happen when
  353. // dropAll and iterations are running simultaneously.
  354. int64(offset+valsz) > int64(lfsz) {
  355. err = y.ErrEOF
  356. } else {
  357. buf = lf.Data[offset : offset+valsz]
  358. }
  359. return buf, err
  360. }
  361. // generateIV will generate IV by appending given offset with the base IV.
  362. func (lf *logFile) generateIV(offset uint32) []byte {
  363. iv := make([]byte, aes.BlockSize)
  364. // baseIV is of 12 bytes.
  365. y.AssertTrue(12 == copy(iv[:12], lf.baseIV))
  366. // remaining 4 bytes is obtained from offset.
  367. binary.BigEndian.PutUint32(iv[12:], offset)
  368. return iv
  369. }
// doneWriting finalizes a fully-written log file: it syncs the data when
// SyncWrites is enabled and truncates the file to offset under the exclusive
// lock, since truncation remaps the mmap.
func (lf *logFile) doneWriting(offset uint32) error {
	if lf.opt.SyncWrites {
		if err := lf.Sync(); err != nil {
			return y.Wrapf(err, "Unable to sync value log: %q", lf.path)
		}
	}

	// Before we were acquiring a lock here on lf.lock, because we were invalidating the file
	// descriptor due to reopening it as read-only. Now, we don't invalidate the fd, but unmap it,
	// truncate it and remap it. That creates a window where we have segfaults because the mmap is
	// no longer valid, while someone might be reading it. Therefore, we need a lock here again.
	lf.lock.Lock()
	defer lf.lock.Unlock()

	if err := lf.Truncate(int64(offset)); err != nil {
		return y.Wrapf(err, "Unable to truncate file: %q", lf.path)
	}

	// Previously we used to close the file after it was written and reopen it in read-only mode.
	// We no longer open files in read-only mode. We keep all vlog files open in read-write mode.
	return nil
}
// iterate iterates over log file. It doesn't not allocate new memory for every kv pair.
// Therefore, the kv pair is only valid for the duration of fn call.
//
// Entries belonging to a transaction (bitTxn) are buffered until the matching
// finish marker (bitFinTxn) is seen, and only then passed to fn — so an
// incomplete transaction at the tail of the file is silently dropped. The
// returned offset marks the end of the last fully-valid record or
// transaction; callers truncate the file there.
//
// NOTE(review): the readOnly parameter is not referenced in this body.
func (lf *logFile) iterate(readOnly bool, offset uint32, fn logEntry) (uint32, error) {
	if offset == 0 {
		// If offset is set to zero, let's advance past the encryption key header.
		offset = vlogHeaderSize
	}

	// For now, read directly from file, because it allows
	reader := bufio.NewReader(lf.NewReader(int(offset)))
	read := &safeRead{
		k:            make([]byte, 10),
		v:            make([]byte, 10),
		recordOffset: offset,
		lf:           lf,
	}

	var lastCommit uint64 // commit ts of the transaction currently being buffered; 0 = none
	var validEndOffset uint32 = offset
	var entries []*Entry // buffered entries of the in-flight transaction
	var vptrs []valuePointer

loop:
	for {
		e, err := read.Entry(reader)
		switch {
		// We have not reached the end of the file but the entry we read is
		// zero. This happens because we have truncated the file and
		// zero'ed it out.
		case err == io.EOF:
			break loop
		case err == io.ErrUnexpectedEOF || err == errTruncate:
			break loop
		case err != nil:
			return 0, err
		case e == nil:
			continue
		case e.isZero():
			break loop
		}

		var vp valuePointer
		// Length on disk: header + key + value + trailing crc32.
		vp.Len = uint32(e.hlen + len(e.Key) + len(e.Value) + crc32.Size)
		read.recordOffset += vp.Len

		vp.Offset = e.offset
		vp.Fid = lf.fid

		switch {
		case e.meta&bitTxn > 0:
			// Transactional entry: buffer it until the finish marker arrives.
			txnTs := y.ParseTs(e.Key)
			if lastCommit == 0 {
				lastCommit = txnTs
			}
			if lastCommit != txnTs {
				// Timestamp changed mid-transaction; the data is broken. Stop.
				break loop
			}
			entries = append(entries, e)
			vptrs = append(vptrs, vp)

		case e.meta&bitFinTxn > 0:
			txnTs, err := strconv.ParseUint(string(e.Value), 10, 64)
			if err != nil || lastCommit != txnTs {
				break loop
			}
			// Got the end of txn. Now we can store them.
			lastCommit = 0
			validEndOffset = read.recordOffset

			for i, e := range entries {
				vp := vptrs[i]
				if err := fn(*e, vp); err != nil {
					if err == errStop {
						break
					}
					return 0, errFile(err, lf.path, "Iteration function")
				}
			}
			entries = entries[:0]
			vptrs = vptrs[:0]

		default:
			if lastCommit != 0 {
				// This is most likely an entry which was moved as part of GC.
				// We shouldn't get this entry in the middle of a transaction.
				break loop
			}
			// Standalone entry: hand it to fn immediately.
			validEndOffset = read.recordOffset

			if err := fn(*e, vp); err != nil {
				if err == errStop {
					break
				}
				return 0, errFile(err, lf.path, "Iteration function")
			}
		}
	}
	return validEndOffset, nil
}
// Zero out the next entry to deal with any crashes: a zeroed header at the
// current write offset is how iterate detects the end of valid data (via
// e.isZero()) during replay.
func (lf *logFile) zeroNextEntry() {
	z.ZeroOut(lf.Data, int(lf.writeAt), int(lf.writeAt+maxHeaderSize))
}
// open memory-maps the log file at path with the given flags, sizing new
// files at fsize. A brand-new file is bootstrapped (key id + base IV header)
// and z.NewFile is returned so the caller knows there is nothing to replay.
// For existing files the data key and base IV are loaded from the header;
// files shorter than the header are left for the replayer to truncate and
// re-bootstrap.
func (lf *logFile) open(path string, flags int, fsize int64) error {
	mf, ferr := z.OpenMmapFile(path, flags, int(fsize))
	lf.MmapFile = mf

	if ferr == z.NewFile {
		if err := lf.bootstrap(); err != nil {
			// Bootstrap failed; don't leave a half-initialized file behind.
			os.Remove(path)
			return err
		}
		lf.size.Store(vlogHeaderSize)
	} else if ferr != nil {
		return y.Wrapf(ferr, "while opening file: %s", path)
	}
	lf.size.Store(uint32(len(lf.Data)))

	if lf.size.Load() < vlogHeaderSize {
		// Every vlog file should have at least vlogHeaderSize. If it is less than vlogHeaderSize
		// then it must have been corrupted. But no need to handle here. log replayer will truncate
		// and bootstrap the logfile. So ignoring here.
		return nil
	}

	// Copy over the encryption registry data.
	buf := make([]byte, vlogHeaderSize)
	y.AssertTruef(vlogHeaderSize == copy(buf, lf.Data),
		"Unable to copy from %s, size %d", path, lf.size.Load())

	keyID := binary.BigEndian.Uint64(buf[:8])
	// retrieve datakey.
	if dk, err := lf.registry.DataKey(keyID); err != nil {
		return y.Wrapf(err, "While opening vlog file %d", lf.fid)
	} else {
		lf.dataKey = dk
	}
	lf.baseIV = buf[8:]
	y.AssertTrue(len(lf.baseIV) == 12)

	// Preserved ferr so we can return if this was a new file.
	return ferr
}
  517. // bootstrap will initialize the log file with key id and baseIV.
  518. // The below figure shows the layout of log file.
  519. // +----------------+------------------+------------------+
  520. // | keyID(8 bytes) | baseIV(12 bytes)| entry... |
  521. // +----------------+------------------+------------------+
  522. func (lf *logFile) bootstrap() error {
  523. var err error
  524. // generate data key for the log file.
  525. var dk *pb.DataKey
  526. if dk, err = lf.registry.LatestDataKey(); err != nil {
  527. return y.Wrapf(err, "Error while retrieving datakey in logFile.bootstarp")
  528. }
  529. lf.dataKey = dk
  530. // We'll always preserve vlogHeaderSize for key id and baseIV.
  531. buf := make([]byte, vlogHeaderSize)
  532. // write key id to the buf.
  533. // key id will be zero if the logfile is in plain text.
  534. binary.BigEndian.PutUint64(buf[:8], lf.keyID())
  535. // generate base IV. It'll be used with offset of the vptr to encrypt the entry.
  536. if _, err := cryptorand.Read(buf[8:]); err != nil {
  537. return y.Wrapf(err, "Error while creating base IV, while creating logfile")
  538. }
  539. // Initialize base IV.
  540. lf.baseIV = buf[8:]
  541. y.AssertTrue(len(lf.baseIV) == 12)
  542. // Copy over to the logFile.
  543. y.AssertTrue(vlogHeaderSize == copy(lf.Data[0:], buf))
  544. // Zero out the next entry.
  545. lf.zeroNextEntry()
  546. return nil
  547. }