manifest.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package badger
  6. import (
  7. "bufio"
  8. "bytes"
  9. "encoding/binary"
  10. "errors"
  11. "fmt"
  12. "hash/crc32"
  13. "io"
  14. "math"
  15. "os"
  16. "path/filepath"
  17. "sync"
  18. "google.golang.org/protobuf/proto"
  19. "github.com/dgraph-io/badger/v4/options"
  20. "github.com/dgraph-io/badger/v4/pb"
  21. "github.com/dgraph-io/badger/v4/y"
  22. )
  // Manifest represents the contents of the MANIFEST file in a Badger store.
  //
  // The MANIFEST file describes the startup state of the db -- all LSM files and what level they're
  // at.
  //
  // It consists of a sequence of ManifestChangeSet objects. Each of these is treated atomically,
  // and contains a sequence of ManifestChanges (file creations/deletions) which we use to
  // reconstruct the manifest at startup.
  type Manifest struct {
  	// Levels is indexed by level number; each entry holds the set of table ids at that level.
  	Levels []levelManifest
  	// Tables maps a table id to the metadata recorded for that table.
  	Tables map[uint64]TableManifest
  	// Contains total number of creation and deletion changes in the manifest -- used to compute
  	// whether it'd be useful to rewrite the manifest.
  	Creations int
  	Deletions int
  }
  39. func createManifest() Manifest {
  40. levels := make([]levelManifest, 0)
  41. return Manifest{
  42. Levels: levels,
  43. Tables: make(map[uint64]TableManifest),
  44. }
  45. }
  // levelManifest contains information about a single LSM tree level
  // in the MANIFEST file.
  type levelManifest struct {
  	Tables map[uint64]struct{} // Set of table ids present at this level.
  }
  // TableManifest contains information about a specific table
  // in the LSM tree.
  type TableManifest struct {
  	// Level is the LSM level the table was created at, as recorded in its CREATE change.
  	Level uint8
  	// KeyID is copied from the CREATE change's KeyId field.
  	// NOTE(review): presumably the id of the encryption key used for the table -- confirm.
  	KeyID uint64
  	// Compression is the compression type recorded in the table's CREATE change.
  	Compression options.CompressionType
  }
  // manifestFile holds the file pointer (and other info) about the manifest file, which is a log
  // file we append to.
  type manifestFile struct {
  	// fp is the open MANIFEST file that change sets are appended to. It is replaced whenever the
  	// manifest is rewritten.
  	fp *os.File
  	// directory is the directory that contains the MANIFEST file; rewrites are done inside it.
  	directory string
  	// The external magic number used by the application running badger.
  	externalMagic uint16
  	// We make this configurable so that unit tests can hit rewrite() code quickly
  	deletionsRewriteThreshold int
  	// Guards appends, which includes access to the manifest field.
  	appendLock sync.Mutex
  	// Used to track the current state of the manifest, used when rewriting.
  	manifest Manifest
  	// Used to indicate if badger was opened in InMemory mode. When set, close and addChanges
  	// are no-ops and fp is never touched.
  	inMemory bool
  }
  const (
  	// ManifestFilename is the filename for the manifest file.
  	ManifestFilename = "MANIFEST"
  	// manifestRewriteFilename is the temporary file a rewritten manifest is written to before it
  	// is renamed over ManifestFilename.
  	manifestRewriteFilename = "MANIFEST-REWRITE"
  	// manifestDeletionsRewriteThreshold is the default deletion count that must be exceeded
  	// before a rewrite is considered.
  	manifestDeletionsRewriteThreshold = 10000
  	// manifestDeletionsRatio is the factor by which deletions must exceed the net number of
  	// creations (creations minus deletions) before a rewrite is triggered.
  	manifestDeletionsRatio = 10
  )
  81. // asChanges returns a sequence of changes that could be used to recreate the Manifest in its
  82. // present state.
  83. func (m *Manifest) asChanges() []*pb.ManifestChange {
  84. changes := make([]*pb.ManifestChange, 0, len(m.Tables))
  85. for id, tm := range m.Tables {
  86. changes = append(changes, newCreateChange(id, int(tm.Level), tm.KeyID, tm.Compression))
  87. }
  88. return changes
  89. }
  90. func (m *Manifest) clone(opt Options) Manifest {
  91. changeSet := pb.ManifestChangeSet{Changes: m.asChanges()}
  92. ret := createManifest()
  93. y.Check(applyChangeSet(&ret, &changeSet, opt))
  94. return ret
  95. }
  96. // openOrCreateManifestFile opens a Badger manifest file if it exists, or creates one if
  97. // doesn’t exists.
  98. func openOrCreateManifestFile(opt Options) (
  99. ret *manifestFile, result Manifest, err error) {
  100. if opt.InMemory {
  101. return &manifestFile{inMemory: true}, Manifest{}, nil
  102. }
  103. return helpOpenOrCreateManifestFile(opt.Dir, opt.ReadOnly, opt.ExternalMagicVersion,
  104. manifestDeletionsRewriteThreshold, opt)
  105. }
  106. func helpOpenOrCreateManifestFile(dir string, readOnly bool, extMagic uint16,
  107. deletionsThreshold int, opt Options) (*manifestFile, Manifest, error) {
  108. path := filepath.Join(dir, ManifestFilename)
  109. var flags y.Flags
  110. if readOnly {
  111. flags |= y.ReadOnly
  112. }
  113. fp, err := y.OpenExistingFile(path, flags) // We explicitly sync in addChanges, outside the lock.
  114. if err != nil {
  115. if !os.IsNotExist(err) {
  116. return nil, Manifest{}, err
  117. }
  118. if readOnly {
  119. return nil, Manifest{}, fmt.Errorf("no manifest found, required for read-only db")
  120. }
  121. m := createManifest()
  122. fp, netCreations, err := helpRewrite(dir, &m, extMagic)
  123. if err != nil {
  124. return nil, Manifest{}, err
  125. }
  126. y.AssertTrue(netCreations == 0)
  127. mf := &manifestFile{
  128. fp: fp,
  129. directory: dir,
  130. externalMagic: extMagic,
  131. manifest: m.clone(opt),
  132. deletionsRewriteThreshold: deletionsThreshold,
  133. }
  134. return mf, m, nil
  135. }
  136. manifest, truncOffset, err := ReplayManifestFile(fp, extMagic, opt)
  137. if err != nil {
  138. _ = fp.Close()
  139. return nil, Manifest{}, err
  140. }
  141. if !readOnly {
  142. // Truncate file so we don't have a half-written entry at the end.
  143. if err := fp.Truncate(truncOffset); err != nil {
  144. _ = fp.Close()
  145. return nil, Manifest{}, err
  146. }
  147. }
  148. if _, err = fp.Seek(0, io.SeekEnd); err != nil {
  149. _ = fp.Close()
  150. return nil, Manifest{}, err
  151. }
  152. mf := &manifestFile{
  153. fp: fp,
  154. directory: dir,
  155. externalMagic: extMagic,
  156. manifest: manifest.clone(opt),
  157. deletionsRewriteThreshold: deletionsThreshold,
  158. }
  159. return mf, manifest, nil
  160. }
  161. func (mf *manifestFile) close() error {
  162. if mf.inMemory {
  163. return nil
  164. }
  165. return mf.fp.Close()
  166. }
  167. // addChanges writes a batch of changes, atomically, to the file. By "atomically" that means when
  168. // we replay the MANIFEST file, we'll either replay all the changes or none of them. (The truth of
  169. // this depends on the filesystem -- some might append garbage data if a system crash happens at
  170. // the wrong time.)
  171. func (mf *manifestFile) addChanges(changesParam []*pb.ManifestChange, opt Options) error {
  172. if mf.inMemory {
  173. return nil
  174. }
  175. changes := pb.ManifestChangeSet{Changes: changesParam}
  176. buf, err := proto.Marshal(&changes)
  177. if err != nil {
  178. return err
  179. }
  180. // Maybe we could use O_APPEND instead (on certain file systems)
  181. mf.appendLock.Lock()
  182. defer mf.appendLock.Unlock()
  183. if err := applyChangeSet(&mf.manifest, &changes, opt); err != nil {
  184. return err
  185. }
  186. // Rewrite manifest if it'd shrink by 1/10 and it's big enough to care
  187. if mf.manifest.Deletions > mf.deletionsRewriteThreshold &&
  188. mf.manifest.Deletions > manifestDeletionsRatio*(mf.manifest.Creations-mf.manifest.Deletions) {
  189. if err := mf.rewrite(); err != nil {
  190. return err
  191. }
  192. } else {
  193. var lenCrcBuf [8]byte
  194. binary.BigEndian.PutUint32(lenCrcBuf[0:4], uint32(len(buf)))
  195. binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(buf, y.CastagnoliCrcTable))
  196. buf = append(lenCrcBuf[:], buf...)
  197. if _, err := mf.fp.Write(buf); err != nil {
  198. return err
  199. }
  200. }
  201. return syncFunc(mf.fp)
  202. }
  // syncFunc calls f.Sync(). It is kept in a package-level variable to allow injection of fake
  // filesystem latency at test time.
  var syncFunc = func(f *os.File) error { return f.Sync() }
  // magicText is the 4-byte signature written at the start of the MANIFEST file and checked when
  // it is replayed. Has to be 4 bytes. The value can never change, ever, anyway.
  var magicText = [4]byte{'B', 'd', 'g', 'r'}
  // The magic version number. It is allocated 2 bytes, so its value must be <= math.MaxUint16.
  const badgerMagicVersion = 8
  209. func helpRewrite(dir string, m *Manifest, extMagic uint16) (*os.File, int, error) {
  210. rewritePath := filepath.Join(dir, manifestRewriteFilename)
  211. // We explicitly sync.
  212. fp, err := y.OpenTruncFile(rewritePath, false)
  213. if err != nil {
  214. return nil, 0, err
  215. }
  216. // magic bytes are structured as
  217. // +---------------------+-------------------------+-----------------------+
  218. // | magicText (4 bytes) | externalMagic (2 bytes) | badgerMagic (2 bytes) |
  219. // +---------------------+-------------------------+-----------------------+
  220. y.AssertTrue(badgerMagicVersion <= math.MaxUint16)
  221. buf := make([]byte, 8)
  222. copy(buf[0:4], magicText[:])
  223. binary.BigEndian.PutUint16(buf[4:6], extMagic)
  224. binary.BigEndian.PutUint16(buf[6:8], badgerMagicVersion)
  225. netCreations := len(m.Tables)
  226. changes := m.asChanges()
  227. set := pb.ManifestChangeSet{Changes: changes}
  228. changeBuf, err := proto.Marshal(&set)
  229. if err != nil {
  230. fp.Close()
  231. return nil, 0, err
  232. }
  233. var lenCrcBuf [8]byte
  234. binary.BigEndian.PutUint32(lenCrcBuf[0:4], uint32(len(changeBuf)))
  235. binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(changeBuf, y.CastagnoliCrcTable))
  236. buf = append(buf, lenCrcBuf[:]...)
  237. buf = append(buf, changeBuf...)
  238. if _, err := fp.Write(buf); err != nil {
  239. fp.Close()
  240. return nil, 0, err
  241. }
  242. if err := fp.Sync(); err != nil {
  243. fp.Close()
  244. return nil, 0, err
  245. }
  246. // In Windows the files should be closed before doing a Rename.
  247. if err = fp.Close(); err != nil {
  248. return nil, 0, err
  249. }
  250. manifestPath := filepath.Join(dir, ManifestFilename)
  251. if err := os.Rename(rewritePath, manifestPath); err != nil {
  252. return nil, 0, err
  253. }
  254. fp, err = y.OpenExistingFile(manifestPath, 0)
  255. if err != nil {
  256. return nil, 0, err
  257. }
  258. if _, err := fp.Seek(0, io.SeekEnd); err != nil {
  259. fp.Close()
  260. return nil, 0, err
  261. }
  262. if err := syncDir(dir); err != nil {
  263. fp.Close()
  264. return nil, 0, err
  265. }
  266. return fp, netCreations, nil
  267. }
  268. // Must be called while appendLock is held.
  269. func (mf *manifestFile) rewrite() error {
  270. // In Windows the files should be closed before doing a Rename.
  271. if err := mf.fp.Close(); err != nil {
  272. return err
  273. }
  274. fp, netCreations, err := helpRewrite(mf.directory, &mf.manifest, mf.externalMagic)
  275. if err != nil {
  276. return err
  277. }
  278. mf.fp = fp
  279. mf.manifest.Creations = netCreations
  280. mf.manifest.Deletions = 0
  281. return nil
  282. }
  // countingReader wraps a bufio.Reader and keeps a running total of the bytes that have been
  // handed out through Read and ReadByte.
  type countingReader struct {
  	wrapped *bufio.Reader
  	count   int64
  }

  // Read reads into p from the wrapped reader and adds the number of bytes read to the total,
  // including on calls that also return an error.
  func (r *countingReader) Read(p []byte) (int, error) {
  	read, err := r.wrapped.Read(p)
  	r.count += int64(read)
  	return read, err
  }

  // ReadByte reads one byte from the wrapped reader. The total is only advanced when the read
  // succeeds.
  func (r *countingReader) ReadByte() (byte, error) {
  	b, err := r.wrapped.ReadByte()
  	if err != nil {
  		return b, err
  	}
  	r.count++
  	return b, nil
  }
  var (
  	// errBadMagic is returned when the 8-byte magic header cannot be read in full or does not
  	// start with magicText.
  	errBadMagic = errors.New("manifest has bad magic")
  	// errBadChecksum is returned when an entry's payload does not match its stored CRC32.
  	errBadChecksum = errors.New("manifest has checksum mismatch")
  )
  303. // ReplayManifestFile reads the manifest file and constructs two manifest objects. (We need one
  304. // immutable copy and one mutable copy of the manifest. Easiest way is to construct two of them.)
  305. // Also, returns the last offset after a completely read manifest entry -- the file must be
  306. // truncated at that point before further appends are made (if there is a partial entry after
  307. // that). In normal conditions, truncOffset is the file size.
  308. func ReplayManifestFile(fp *os.File, extMagic uint16, opt Options) (Manifest, int64, error) {
  309. r := countingReader{wrapped: bufio.NewReader(fp)}
  310. var magicBuf [8]byte
  311. if _, err := io.ReadFull(&r, magicBuf[:]); err != nil {
  312. return Manifest{}, 0, errBadMagic
  313. }
  314. if !bytes.Equal(magicBuf[0:4], magicText[:]) {
  315. return Manifest{}, 0, errBadMagic
  316. }
  317. extVersion := y.BytesToU16(magicBuf[4:6])
  318. version := y.BytesToU16(magicBuf[6:8])
  319. if version != badgerMagicVersion {
  320. return Manifest{}, 0,
  321. //nolint:lll
  322. fmt.Errorf("manifest has unsupported version: %d (we support %d).\n"+
  323. "Please see https://docs.hypermode.com/badger/troubleshooting#i-see-manifest-has-unsupported-version-x-we-support-y-error"+
  324. " on how to fix this.",
  325. version, badgerMagicVersion)
  326. }
  327. if extVersion != extMagic {
  328. return Manifest{}, 0,
  329. fmt.Errorf("Cannot open DB because the external magic number doesn't match. "+
  330. "Expected: %d, version present in manifest: %d\n", extMagic, extVersion)
  331. }
  332. stat, err := fp.Stat()
  333. if err != nil {
  334. return Manifest{}, 0, err
  335. }
  336. build := createManifest()
  337. var offset int64
  338. for {
  339. offset = r.count
  340. var lenCrcBuf [8]byte
  341. _, err := io.ReadFull(&r, lenCrcBuf[:])
  342. if err != nil {
  343. if err == io.EOF || err == io.ErrUnexpectedEOF {
  344. break
  345. }
  346. return Manifest{}, 0, err
  347. }
  348. length := y.BytesToU32(lenCrcBuf[0:4])
  349. // Sanity check to ensure we don't over-allocate memory.
  350. if length > uint32(stat.Size()) {
  351. return Manifest{}, 0, fmt.Errorf(
  352. "Buffer length: %d greater than file size: %d. Manifest file might be corrupted",
  353. length, stat.Size())
  354. }
  355. var buf = make([]byte, length)
  356. if _, err := io.ReadFull(&r, buf); err != nil {
  357. if err == io.EOF || err == io.ErrUnexpectedEOF {
  358. break
  359. }
  360. return Manifest{}, 0, err
  361. }
  362. if crc32.Checksum(buf, y.CastagnoliCrcTable) != y.BytesToU32(lenCrcBuf[4:8]) {
  363. return Manifest{}, 0, errBadChecksum
  364. }
  365. var changeSet pb.ManifestChangeSet
  366. if err := proto.Unmarshal(buf, &changeSet); err != nil {
  367. return Manifest{}, 0, err
  368. }
  369. if err := applyChangeSet(&build, &changeSet, opt); err != nil {
  370. return Manifest{}, 0, err
  371. }
  372. }
  373. return build, offset, nil
  374. }
  375. func applyManifestChange(build *Manifest, tc *pb.ManifestChange, opt Options) error {
  376. switch tc.Op {
  377. case pb.ManifestChange_CREATE:
  378. if _, ok := build.Tables[tc.Id]; ok {
  379. return fmt.Errorf("MANIFEST invalid, table %d exists", tc.Id)
  380. }
  381. build.Tables[tc.Id] = TableManifest{
  382. Level: uint8(tc.Level),
  383. KeyID: tc.KeyId,
  384. Compression: options.CompressionType(tc.Compression),
  385. }
  386. for len(build.Levels) <= int(tc.Level) {
  387. build.Levels = append(build.Levels, levelManifest{make(map[uint64]struct{})})
  388. }
  389. build.Levels[tc.Level].Tables[tc.Id] = struct{}{}
  390. build.Creations++
  391. case pb.ManifestChange_DELETE:
  392. tm, ok := build.Tables[tc.Id]
  393. if !ok {
  394. opt.Warningf("MANIFEST delete: table %d has already been removed", tc.Id)
  395. for _, level := range build.Levels {
  396. delete(level.Tables, tc.Id)
  397. }
  398. } else {
  399. delete(build.Levels[tm.Level].Tables, tc.Id)
  400. delete(build.Tables, tc.Id)
  401. }
  402. build.Deletions++
  403. default:
  404. return fmt.Errorf("MANIFEST file has invalid manifestChange op")
  405. }
  406. return nil
  407. }
  408. // This is not a "recoverable" error -- opening the KV store fails because the MANIFEST file is
  409. // just plain broken.
  410. func applyChangeSet(build *Manifest, changeSet *pb.ManifestChangeSet, opt Options) error {
  411. for _, change := range changeSet.Changes {
  412. if err := applyManifestChange(build, change, opt); err != nil {
  413. return err
  414. }
  415. }
  416. return nil
  417. }
  418. func newCreateChange(
  419. id uint64, level int, keyID uint64, c options.CompressionType) *pb.ManifestChange {
  420. return &pb.ManifestChange{
  421. Id: id,
  422. Op: pb.ManifestChange_CREATE,
  423. Level: uint32(level),
  424. KeyId: keyID,
  425. // Hard coding it, since we're supporting only AES for now.
  426. EncryptionAlgo: pb.EncryptionAlgo_aes,
  427. Compression: uint32(c),
  428. }
  429. }
  430. func newDeleteChange(id uint64) *pb.ManifestChange {
  431. return &pb.ManifestChange{
  432. Id: id,
  433. Op: pb.ManifestChange_DELETE,
  434. }
  435. }