  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package table
  6. import (
  7. "bytes"
  8. "crypto/aes"
  9. "encoding/binary"
  10. "errors"
  11. "fmt"
  12. "math"
  13. "os"
  14. "path/filepath"
  15. "strconv"
  16. "strings"
  17. "sync"
  18. "sync/atomic"
  19. "time"
  20. "unsafe"
  21. "github.com/klauspost/compress/snappy"
  22. "github.com/klauspost/compress/zstd"
  23. "google.golang.org/protobuf/proto"
  24. "github.com/dgraph-io/badger/v4/fb"
  25. "github.com/dgraph-io/badger/v4/options"
  26. "github.com/dgraph-io/badger/v4/pb"
  27. "github.com/dgraph-io/badger/v4/y"
  28. "github.com/dgraph-io/ristretto/v2"
  29. "github.com/dgraph-io/ristretto/v2/z"
  30. )
// fileSuffix is the file extension used for SSTable files on disk.
const fileSuffix = ".sst"

// intSize is the in-memory size of an int, used when accounting for a
// Block's memory footprint in Block.size.
const intSize = int(unsafe.Sizeof(int(0)))
// Options contains configurable options for Table/Builder.
type Options struct {
	// Options for Opening/Building Table.

	// Open tables in read only mode.
	ReadOnly       bool
	MetricsEnabled bool

	// Maximum size of the table.
	TableSize     uint64
	tableCapacity uint64 // 0.9x TableSize.

	// ChkMode is the checksum verification mode for Table.
	ChkMode options.ChecksumVerificationMode

	// Options for Table builder.

	// BloomFalsePositive is the false positive probability of the bloom filter.
	BloomFalsePositive float64

	// BlockSize is the size of each block inside SSTable in bytes.
	BlockSize int

	// DataKey is the key used to decrypt the encrypted text.
	DataKey *pb.DataKey

	// Compression indicates the compression algorithm used for block compression.
	Compression options.CompressionType

	// BlockCache is used to cache decompressed and decrypted blocks.
	BlockCache *ristretto.Cache[[]byte, *Block]
	// IndexCache caches decoded table indexes keyed by table ID. It is
	// mandatory for encrypted tables (see fetchIndex, which panics without it).
	IndexCache *ristretto.Cache[uint64, *fb.TableIndex]
	// AllocPool is a pool of allocators; it is not referenced in this file —
	// presumably consumed by the Builder.
	AllocPool *z.AllocatorPool

	// ZSTDCompressionLevel is the ZSTD compression level used for compressing blocks.
	ZSTDCompressionLevel int
}
// TableInterface is useful for testing.
type TableInterface interface {
	// Smallest returns the smallest key stored in the table.
	Smallest() []byte
	// Biggest returns the biggest key stored in the table.
	Biggest() []byte
	// DoesNotHave reports whether the given key hash is definitely absent.
	DoesNotHave(hash uint32) bool
	// MaxVersion returns the maximum version across all keys in the table.
	MaxVersion() uint64
}
// Table represents a loaded table file with the info we have about it.
type Table struct {
	sync.Mutex
	*z.MmapFile

	tableSize int // Initialized in OpenTable, using fd.Stat().

	_index *fb.TableIndex // Nil if encryption is enabled. Use fetchIndex to access.
	_cheap *cheapIndex    // Always-available summary of the index; set in initIndex.
	ref    atomic.Int32   // For file garbage collection; DecrRef to zero deletes the file.

	// The following are initialized once and const.
	smallest, biggest []byte // Smallest and largest keys (with timestamps).
	id                uint64 // file id, part of filename
	Checksum          []byte
	CreatedAt         time.Time
	indexStart        int  // Byte offset of the (possibly encrypted) index in the file.
	indexLen          int  // Length of the index region in bytes.
	hasBloomFilter    bool // True when the index carries a non-empty bloom filter.

	IsInmemory bool // Set to true if the table is on level 0 and opened in memory.
	opt        *Options
}
// cheapIndex is a small, always-decrypted summary of the table index. It is
// populated once in initIndex so that hot accessors do not need to fetch
// (and possibly decrypt) the full flatbuffers index on every call.
type cheapIndex struct {
	MaxVersion        uint64
	KeyCount          uint32
	UncompressedSize  uint32
	OnDiskSize        uint32
	BloomFilterLength int
	OffsetsLength     int
}
// cheapIndex returns the always-in-memory summary of the table index.
func (t *Table) cheapIndex() *cheapIndex {
	return t._cheap
}

// offsetsLength returns the number of block offsets in the table index.
func (t *Table) offsetsLength() int { return t.cheapIndex().OffsetsLength }

// MaxVersion returns the maximum version across all keys stored in this table.
func (t *Table) MaxVersion() uint64 { return t.cheapIndex().MaxVersion }

// BloomFilterSize returns the size of the bloom filter in bytes stored in memory.
func (t *Table) BloomFilterSize() int { return t.cheapIndex().BloomFilterLength }

// UncompressedSize is the size of the uncompressed data stored in this file.
func (t *Table) UncompressedSize() uint32 { return t.cheapIndex().UncompressedSize }

// KeyCount is the total number of keys in this table.
func (t *Table) KeyCount() uint32 { return t.cheapIndex().KeyCount }

// OnDiskSize returns the total size of key-values stored in this table (including the
// disk space occupied on the value log).
func (t *Table) OnDiskSize() uint32 { return t.cheapIndex().OnDiskSize }

// CompressionType returns the compression algorithm used for block compression.
func (t *Table) CompressionType() options.CompressionType {
	return t.opt.Compression
}
// IncrRef increments the refcount (having to do with whether the file should be deleted).
func (t *Table) IncrRef() {
	t.ref.Add(1)
}
// DecrRef decrements the refcount and possibly deletes the table file once the
// count reaches zero.
func (t *Table) DecrRef() error {
	newRef := t.ref.Add(-1)
	if newRef == 0 {
		// We can safely delete this file, because for all the current files, we always have
		// at least one reference pointing to them.

		// Delete all blocks from the cache.
		// NOTE(review): t.opt.BlockCache is not nil-checked here, unlike in
		// block(); this relies on the cache's methods tolerating a nil
		// receiver — confirm before swapping cache implementations.
		for i := 0; i < t.offsetsLength(); i++ {
			t.opt.BlockCache.Del(t.blockCacheKey(i))
		}
		// Delete is promoted from the embedded *z.MmapFile and removes the
		// underlying file.
		if err := t.Delete(); err != nil {
			return err
		}
	}
	return nil
}
// BlockEvictHandler is used to reuse the byte slice stored in the block on cache
// eviction: it releases the reference that the cache held on the block.
func BlockEvictHandler(b *Block) {
	b.decrRef()
}
// Block is one decoded (decrypted and decompressed) SSTable block, together
// with the metadata parsed from its trailer in Table.block.
type Block struct {
	offset            int      // byte offset of this block within the table file
	data              []byte   // block contents, truncated to exclude checksum + checksum length
	checksum          []byte   // serialized pb.Checksum read from the block trailer
	entriesIndexStart int      // start index of entryOffsets list
	entryOffsets      []uint32 // used to binary search an entry in the block.
	chkLen            int      // checksum length.
	freeMe            bool     // used to determine if the block's data should be freed/reused.
	ref               atomic.Int32
}

// NumBlocks counts blocks currently alive in memory (incremented in
// Table.block, decremented when a block's refcount hits zero).
var NumBlocks atomic.Int32
  148. // incrRef increments the ref of a block and return a bool indicating if the
  149. // increment was successful. A true value indicates that the block can be used.
  150. func (b *Block) incrRef() bool {
  151. for {
  152. // We can't blindly add 1 to ref. We need to check whether it has
  153. // reached zero first, because if it did, then we should absolutely not
  154. // use this block.
  155. ref := b.ref.Load()
  156. // The ref would not be equal to 0 unless the existing
  157. // block get evicted before this line. If the ref is zero, it means that
  158. // the block is already added the the blockPool and cannot be used
  159. // anymore. The ref of a new block is 1 so the following condition will
  160. // be true only if the block got reused before we could increment its
  161. // ref.
  162. if ref == 0 {
  163. return false
  164. }
  165. // Increment the ref only if it is not zero and has not changed between
  166. // the time we read it and we're updating it.
  167. //
  168. if b.ref.CompareAndSwap(ref, ref+1) {
  169. return true
  170. }
  171. }
  172. }
// decrRef drops one reference from the block. Once the count reaches zero the
// block's memory is released. Calling decrRef on a nil block is a no-op.
func (b *Block) decrRef() {
	if b == nil {
		return
	}

	// Free the []byte only if the block owns it (freeMe). When a block is
	// reusable, a new []byte was allocated for decompression/decryption and
	// can be released here.
	// In case of an uncompressed block, the []byte is a reference to the
	// table.mmap []byte slice. Any attempt to write data to the mmap []byte
	// will lead to SEGFAULT — which is why freeMe stays false in that case.
	if b.ref.Add(-1) == 0 {
		if b.freeMe {
			z.Free(b.data)
		}
		NumBlocks.Add(-1)
		// blockPool.Put(&b.data)
	}
	y.AssertTrue(b.ref.Load() >= 0)
}
  192. func (b *Block) size() int64 {
  193. return int64(3*intSize /* Size of the offset, entriesIndexStart and chkLen */ +
  194. cap(b.data) + cap(b.checksum) + cap(b.entryOffsets)*4)
  195. }
  196. func (b *Block) verifyCheckSum() error {
  197. cs := &pb.Checksum{}
  198. if err := proto.Unmarshal(b.checksum, cs); err != nil {
  199. return y.Wrapf(err, "unable to unmarshal checksum for block")
  200. }
  201. return y.VerifyChecksum(b.data, cs)
  202. }
  203. func CreateTable(fname string, builder *Builder) (*Table, error) {
  204. bd := builder.Done()
  205. mf, err := z.OpenMmapFile(fname, os.O_CREATE|os.O_RDWR|os.O_EXCL, bd.Size)
  206. if err == z.NewFile {
  207. // Expected.
  208. } else if err != nil {
  209. return nil, y.Wrapf(err, "while creating table: %s", fname)
  210. } else {
  211. return nil, fmt.Errorf("file already exists: %s", fname)
  212. }
  213. written := bd.Copy(mf.Data)
  214. y.AssertTrue(written == len(mf.Data))
  215. if err := z.Msync(mf.Data); err != nil {
  216. return nil, y.Wrapf(err, "while calling msync on %s", fname)
  217. }
  218. return OpenTable(mf, *builder.opts)
  219. }
  220. // OpenTable assumes file has only one table and opens it. Takes ownership of fd upon function
  221. // entry. Returns a table with one reference count on it (decrementing which may delete the file!
  222. // -- consider t.Close() instead). The fd has to writeable because we call Truncate on it before
  223. // deleting. Checksum for all blocks of table is verified based on value of chkMode.
  224. func OpenTable(mf *z.MmapFile, opts Options) (*Table, error) {
  225. // BlockSize is used to compute the approximate size of the decompressed
  226. // block. It should not be zero if the table is compressed.
  227. if opts.BlockSize == 0 && opts.Compression != options.None {
  228. return nil, errors.New("Block size cannot be zero")
  229. }
  230. fileInfo, err := mf.Fd.Stat()
  231. if err != nil {
  232. mf.Close(-1)
  233. return nil, y.Wrap(err, "")
  234. }
  235. filename := fileInfo.Name()
  236. id, ok := ParseFileID(filename)
  237. if !ok {
  238. mf.Close(-1)
  239. return nil, fmt.Errorf("Invalid filename: %s", filename)
  240. }
  241. t := &Table{
  242. MmapFile: mf,
  243. id: id,
  244. opt: &opts,
  245. IsInmemory: false,
  246. tableSize: int(fileInfo.Size()),
  247. CreatedAt: fileInfo.ModTime(),
  248. }
  249. // Caller is given one reference.
  250. t.ref.Store(1)
  251. if err := t.initBiggestAndSmallest(); err != nil {
  252. return nil, y.Wrapf(err, "failed to initialize table")
  253. }
  254. if opts.ChkMode == options.OnTableRead || opts.ChkMode == options.OnTableAndBlockRead {
  255. if err := t.VerifyChecksum(); err != nil {
  256. mf.Close(-1)
  257. return nil, y.Wrapf(err, "failed to verify checksum")
  258. }
  259. }
  260. return t, nil
  261. }
// OpenInMemoryTable is similar to OpenTable but it opens a new table from the provided data.
// OpenInMemoryTable is used for L0 tables.
func OpenInMemoryTable(data []byte, id uint64, opt *Options) (*Table, error) {
	// Wrap the raw bytes in an MmapFile with no backing file descriptor.
	mf := &z.MmapFile{
		Data: data,
		Fd:   nil,
	}
	t := &Table{
		MmapFile:   mf,
		opt:        opt,
		tableSize:  len(data),
		IsInmemory: true,
		id:         id, // It is important that each table gets a unique ID.
	}
	// Caller is given one reference.
	t.ref.Store(1)
	if err := t.initBiggestAndSmallest(); err != nil {
		return nil, err
	}
	return t, nil
}
  283. func (t *Table) initBiggestAndSmallest() error {
  284. // This defer will help gathering debugging info incase initIndex crashes.
  285. defer func() {
  286. if r := recover(); r != nil {
  287. // Use defer for printing info because there may be an intermediate panic.
  288. var debugBuf bytes.Buffer
  289. defer func() {
  290. panic(fmt.Sprintf("%s\n== Recovered ==\n", debugBuf.String()))
  291. }()
  292. // Get the count of null bytes at the end of file. This is to make sure if there was an
  293. // issue with mmap sync or file copy.
  294. count := 0
  295. for i := len(t.Data) - 1; i >= 0; i-- {
  296. if t.Data[i] != 0 {
  297. break
  298. }
  299. count++
  300. }
  301. fmt.Fprintf(&debugBuf, "\n== Recovering from initIndex crash ==\n")
  302. fmt.Fprintf(&debugBuf, "File Info: [ID: %d, Size: %d, Zeros: %d]\n",
  303. t.id, t.tableSize, count)
  304. fmt.Fprintf(&debugBuf, "isEnrypted: %v ", t.shouldDecrypt())
  305. readPos := t.tableSize
  306. // Read checksum size.
  307. readPos -= 4
  308. buf := t.readNoFail(readPos, 4)
  309. checksumLen := int(y.BytesToU32(buf))
  310. fmt.Fprintf(&debugBuf, "checksumLen: %d ", checksumLen)
  311. // Read checksum.
  312. checksum := &pb.Checksum{}
  313. readPos -= checksumLen
  314. buf = t.readNoFail(readPos, checksumLen)
  315. _ = proto.Unmarshal(buf, checksum)
  316. fmt.Fprintf(&debugBuf, "checksum: %+v ", checksum)
  317. // Read index size from the footer.
  318. readPos -= 4
  319. buf = t.readNoFail(readPos, 4)
  320. indexLen := int(y.BytesToU32(buf))
  321. fmt.Fprintf(&debugBuf, "indexLen: %d ", indexLen)
  322. // Read index.
  323. readPos -= t.indexLen
  324. t.indexStart = readPos
  325. indexData := t.readNoFail(readPos, t.indexLen)
  326. fmt.Fprintf(&debugBuf, "index: %v ", indexData)
  327. }
  328. }()
  329. var err error
  330. var ko *fb.BlockOffset
  331. if ko, err = t.initIndex(); err != nil {
  332. return y.Wrapf(err, "failed to read index.")
  333. }
  334. t.smallest = y.Copy(ko.KeyBytes())
  335. it2 := t.NewIterator(REVERSED | NOCACHE)
  336. defer it2.Close()
  337. it2.Rewind()
  338. if !it2.Valid() {
  339. return y.Wrapf(it2.err, "failed to initialize biggest for table %s", t.Filename())
  340. }
  341. t.biggest = y.Copy(it2.Key())
  342. return nil
  343. }
// read returns sz bytes of the table starting at offset off, backed by the
// embedded MmapFile's Bytes.
func (t *Table) read(off, sz int) ([]byte, error) {
	return t.Bytes(off, sz)
}
// readNoFail is like read but panics (via y.Check) if the read fails.
func (t *Table) readNoFail(off, sz int) []byte {
	res, err := t.read(off, sz)
	y.Check(err)
	return res
}
// initIndex reads the index, populates the necessary table fields, and returns
// the first block offset.
//
// File layout, reconstructed from the reads below (from the end of the file):
//
//	... | index (indexLen bytes) | indexLen (4B) | checksum | checksumLen (4B)
func (t *Table) initIndex() (*fb.BlockOffset, error) {
	readPos := t.tableSize

	// Read checksum len from the last 4 bytes.
	readPos -= 4
	buf := t.readNoFail(readPos, 4)
	checksumLen := int(y.BytesToU32(buf))
	if checksumLen < 0 {
		return nil, errors.New("checksum length less than zero. Data corrupted")
	}

	// Read checksum.
	expectedChk := &pb.Checksum{}
	readPos -= checksumLen
	buf = t.readNoFail(readPos, checksumLen)
	if err := proto.Unmarshal(buf, expectedChk); err != nil {
		return nil, err
	}

	// Read index size from the footer.
	readPos -= 4
	buf = t.readNoFail(readPos, 4)
	t.indexLen = int(y.BytesToU32(buf))

	// Read index and verify it against the checksum stored in the footer.
	readPos -= t.indexLen
	t.indexStart = readPos
	data := t.readNoFail(readPos, t.indexLen)
	if err := y.VerifyChecksum(data, expectedChk); err != nil {
		return nil, y.Wrapf(err, "failed to verify checksum for table: %s", t.Filename())
	}
	index, err := t.readTableIndex()
	if err != nil {
		return nil, err
	}
	if !t.shouldDecrypt() {
		// If there's no encryption, this points to the mmap'ed buffer.
		t._index = index
	}
	// Cache a cheap summary so hot accessors never need to re-fetch (and
	// possibly decrypt) the full index.
	t._cheap = &cheapIndex{
		MaxVersion:        index.MaxVersion(),
		KeyCount:          index.KeyCount(),
		UncompressedSize:  index.UncompressedSize(),
		OnDiskSize:        index.OnDiskSize(),
		OffsetsLength:     index.OffsetsLength(),
		BloomFilterLength: index.BloomFilterLength(),
	}

	t.hasBloomFilter = len(index.BloomFilterBytes()) > 0

	// The first block offset carries the table's smallest key (see
	// initBiggestAndSmallest).
	var bo fb.BlockOffset
	y.AssertTrue(index.Offsets(&bo, 0))
	return &bo, nil
}
  402. // KeySplits splits the table into at least n ranges based on the block offsets.
  403. func (t *Table) KeySplits(n int, prefix []byte) []string {
  404. if n == 0 {
  405. return nil
  406. }
  407. oLen := t.offsetsLength()
  408. jump := oLen / n
  409. if jump == 0 {
  410. jump = 1
  411. }
  412. var bo fb.BlockOffset
  413. var res []string
  414. for i := 0; i < oLen; i += jump {
  415. if i >= oLen {
  416. i = oLen - 1
  417. }
  418. y.AssertTrue(t.offsets(&bo, i))
  419. if bytes.HasPrefix(bo.KeyBytes(), prefix) {
  420. res = append(res, string(bo.KeyBytes()))
  421. }
  422. }
  423. return res
  424. }
// fetchIndex returns the decoded table index. For unencrypted tables this is
// the index decoded once in initIndex (backed by the mmap'ed buffer). For
// encrypted tables the index must be decrypted on access, so it is served
// from — and stored into — the IndexCache, which is mandatory in that case.
func (t *Table) fetchIndex() *fb.TableIndex {
	if !t.shouldDecrypt() {
		return t._index
	}

	if t.opt.IndexCache == nil {
		panic("Index Cache must be set for encrypted workloads")
	}
	if val, ok := t.opt.IndexCache.Get(t.indexKey()); ok && val != nil {
		return val
	}

	index, err := t.readTableIndex()
	// A failure to read/decrypt the index here is fatal (y.Check panics).
	y.Check(err)
	t.opt.IndexCache.Set(t.indexKey(), index, int64(t.indexLen))
	return index
}
// offsets writes the i'th block offset from the table index into ko; the
// returned bool is the flatbuffers success flag (false when i is out of range).
func (t *Table) offsets(ko *fb.BlockOffset, i int) bool {
	return t.fetchIndex().Offsets(ko, i)
}
// block function returns a new block. Each block holds a ref and the byte
// slice stored in the block will be reused when the ref becomes zero. The
// caller should release the block by calling block.decrRef() on it.
func (t *Table) block(idx int, useCache bool) (*Block, error) {
	y.AssertTruef(idx >= 0, "idx=%d", idx)
	if idx >= t.offsetsLength() {
		return nil, errors.New("block out of index")
	}
	if t.opt.BlockCache != nil {
		key := t.blockCacheKey(idx)
		blk, ok := t.opt.BlockCache.Get(key)
		if ok && blk != nil {
			// Use the block only if the increment was successful. The block
			// could get evicted from the cache between the Get() call and the
			// incrRef() call.
			if blk.incrRef() {
				return blk, nil
			}
		}
	}

	// Cache miss: read the raw block from the file.
	var ko fb.BlockOffset
	y.AssertTrue(t.offsets(&ko, idx))
	blk := &Block{offset: int(ko.Offset())}
	blk.ref.Store(1)
	defer blk.decrRef() // Deal with any errors, where blk would not be returned.
	NumBlocks.Add(1)

	var err error
	if blk.data, err = t.read(blk.offset, int(ko.Len())); err != nil {
		return nil, y.Wrapf(err,
			"failed to read from file: %s at offset: %d, len: %d",
			t.Fd.Name(), blk.offset, ko.Len())
	}

	if t.shouldDecrypt() {
		// Decrypt the block if it is encrypted.
		if blk.data, err = t.decrypt(blk.data, true); err != nil {
			return nil, err
		}
		// blk.data is allocated via Calloc. So, do free.
		blk.freeMe = true
	}

	if err = t.decompress(blk); err != nil {
		return nil, y.Wrapf(err,
			"failed to decode compressed data in file: %s at offset: %d, len: %d",
			t.Fd.Name(), blk.offset, ko.Len())
	}

	// Read meta data related to block. Trailer layout of blk.data:
	//   ... entries ... | entryOffsets (4B each) | numEntries (4B) | checksum | chkLen (4B)
	readPos := len(blk.data) - 4 // First read checksum length.
	blk.chkLen = int(y.BytesToU32(blk.data[readPos : readPos+4]))

	// Checksum length greater than block size could happen if the table was compressed and
	// it was opened with an incorrect compression algorithm (or the data was corrupted).
	if blk.chkLen > len(blk.data) {
		return nil, errors.New("invalid checksum length. Either the data is " +
			"corrupted or the table options are incorrectly set")
	}

	// Read checksum and store it.
	readPos -= blk.chkLen
	blk.checksum = blk.data[readPos : readPos+blk.chkLen]
	// Move back and read numEntries in the block.
	readPos -= 4
	numEntries := int(y.BytesToU32(blk.data[readPos : readPos+4]))
	// NOTE(review): a corrupt numEntries can make entriesIndexStart negative
	// and panic on the slice expression below — confirm callers only feed
	// checksummed/trusted files before relying on this with untrusted input.
	entriesIndexStart := readPos - (numEntries * 4)
	entriesIndexEnd := entriesIndexStart + numEntries*4

	blk.entryOffsets = y.BytesToU32Slice(blk.data[entriesIndexStart:entriesIndexEnd])
	blk.entriesIndexStart = entriesIndexStart

	// Drop checksum and checksum length.
	// The checksum is calculated for actual data + entry index + index length.
	blk.data = blk.data[:readPos+4]

	// Verify checksum if the verification mode is OnBlockRead or OnTableAndBlockRead.
	if t.opt.ChkMode == options.OnBlockRead || t.opt.ChkMode == options.OnTableAndBlockRead {
		if err = blk.verifyCheckSum(); err != nil {
			return nil, err
		}
	}

	// Take the reference handed to the caller; it balances the deferred
	// decrRef above.
	blk.incrRef()
	if useCache && t.opt.BlockCache != nil {
		key := t.blockCacheKey(idx)
		// incrRef should never return false here because we're calling it on a
		// new block with ref=1.
		y.AssertTrue(blk.incrRef())

		// Decrement the block ref if we could not insert it in the cache.
		if !t.opt.BlockCache.Set(key, blk, blk.size()) {
			blk.decrRef()
		}
		// We have added an OnReject func in our cache, which gets called in case the block is not
		// admitted to the cache. So, every block would be accounted for.
	}
	return blk, nil
}
  531. // blockCacheKey is used to store blocks in the block cache.
  532. func (t *Table) blockCacheKey(idx int) []byte {
  533. y.AssertTrue(t.id < math.MaxUint32)
  534. y.AssertTrue(uint32(idx) < math.MaxUint32)
  535. buf := make([]byte, 8)
  536. // Assume t.ID does not overflow uint32.
  537. binary.BigEndian.PutUint32(buf[:4], uint32(t.ID()))
  538. binary.BigEndian.PutUint32(buf[4:], uint32(idx))
  539. return buf
  540. }
// indexKey returns the cache key for block offsets. blockOffsets
// are stored in the index cache, keyed by the table's ID.
func (t *Table) indexKey() uint64 {
	return t.id
}
// IndexSize is the size of table index in bytes.
func (t *Table) IndexSize() int {
	return t.indexLen
}

// Size is its file size in bytes.
func (t *Table) Size() int64 { return int64(t.tableSize) }

// StaleDataSize is the amount of stale data (that can be dropped by a compaction) in this SST.
func (t *Table) StaleDataSize() uint32 { return t.fetchIndex().StaleDataSize() }

// Smallest is its smallest key, or nil if there are none.
func (t *Table) Smallest() []byte { return t.smallest }

// Biggest is its biggest key, or nil if there are none.
func (t *Table) Biggest() []byte { return t.biggest }

// Filename returns the name of the underlying file.
func (t *Table) Filename() string { return t.Fd.Name() }

// ID is the table's ID number (used to make the file name).
func (t *Table) ID() uint64 { return t.id }
  562. // DoesNotHave returns true if and only if the table does not have the key hash.
  563. // It does a bloom filter lookup.
  564. func (t *Table) DoesNotHave(hash uint32) bool {
  565. if !t.hasBloomFilter {
  566. return false
  567. }
  568. y.NumLSMBloomHitsAdd(t.opt.MetricsEnabled, "DoesNotHave_ALL", 1)
  569. index := t.fetchIndex()
  570. bf := index.BloomFilterBytes()
  571. mayContain := y.Filter(bf).MayContain(hash)
  572. if !mayContain {
  573. y.NumLSMBloomHitsAdd(t.opt.MetricsEnabled, "DoesNotHave_HIT", 1)
  574. }
  575. return !mayContain
  576. }
// readTableIndex reads the table index region from the sst, decrypting it when
// the table is encrypted, and returns it as a flatbuffers TableIndex.
func (t *Table) readTableIndex() (*fb.TableIndex, error) {
	data := t.readNoFail(t.indexStart, t.indexLen)
	var err error
	// Decrypt the table index if it is encrypted.
	if t.shouldDecrypt() {
		if data, err = t.decrypt(data, false); err != nil {
			return nil, y.Wrapf(err,
				"Error while decrypting table index for the table %d in readTableIndex", t.id)
		}
	}
	return fb.GetRootAsTableIndex(data, 0), nil
}
  590. // VerifyChecksum verifies checksum for all blocks of table. This function is called by
  591. // OpenTable() function. This function is also called inside levelsController.VerifyChecksum().
  592. func (t *Table) VerifyChecksum() error {
  593. ti := t.fetchIndex()
  594. for i := 0; i < ti.OffsetsLength(); i++ {
  595. b, err := t.block(i, true)
  596. if err != nil {
  597. return y.Wrapf(err, "checksum validation failed for table: %s, block: %d, offset:%d",
  598. t.Filename(), i, b.offset)
  599. }
  600. // We should not call incrRef here, because the block already has one ref when created.
  601. defer b.decrRef()
  602. // OnBlockRead or OnTableAndBlockRead, we don't need to call verify checksum
  603. // on block, verification would be done while reading block itself.
  604. if !(t.opt.ChkMode == options.OnBlockRead || t.opt.ChkMode == options.OnTableAndBlockRead) {
  605. if err = b.verifyCheckSum(); err != nil {
  606. return y.Wrapf(err,
  607. "checksum validation failed for table: %s, block: %d, offset:%d",
  608. t.Filename(), i, b.offset)
  609. }
  610. }
  611. }
  612. return nil
  613. }
// shouldDecrypt tells whether to decrypt or not. We decrypt only if the datakey
// exists for the table.
func (t *Table) shouldDecrypt() bool {
	return t.opt.DataKey != nil
}
  619. // KeyID returns data key id.
  620. func (t *Table) KeyID() uint64 {
  621. if t.opt.DataKey != nil {
  622. return t.opt.DataKey.KeyId
  623. }
  624. // By default it's 0, if it is plain text.
  625. return 0
  626. }
// decrypt decrypts the given data. It should be called only after checking
// shouldDecrypt. When viaCalloc is true the output buffer is allocated with
// z.Calloc and the caller is responsible for freeing it (cf. Block.freeMe);
// otherwise a garbage-collected slice is used.
func (t *Table) decrypt(data []byte, viaCalloc bool) ([]byte, error) {
	// Last BlockSize bytes of the data is the IV.
	iv := data[len(data)-aes.BlockSize:]
	// Rest all bytes are data.
	data = data[:len(data)-aes.BlockSize]

	var dst []byte
	if viaCalloc {
		dst = z.Calloc(len(data), "Table.Decrypt")
	} else {
		dst = make([]byte, len(data))
	}
	// XOR-based decryption via y.XORBlock, keyed by the table's DataKey with
	// the IV read above — see the y package for the exact cipher mode.
	if err := y.XORBlock(dst, data, t.opt.DataKey.Data, iv); err != nil {
		return nil, y.Wrapf(err, "while decrypt")
	}
	return dst, nil
}
  644. // ParseFileID reads the file id out of a filename.
  645. func ParseFileID(name string) (uint64, bool) {
  646. name = filepath.Base(name)
  647. if !strings.HasSuffix(name, fileSuffix) {
  648. return 0, false
  649. }
  650. // suffix := name[len(fileSuffix):]
  651. name = strings.TrimSuffix(name, fileSuffix)
  652. id, err := strconv.Atoi(name)
  653. if err != nil {
  654. return 0, false
  655. }
  656. y.AssertTrue(id >= 0)
  657. return uint64(id), true
  658. }
  659. // IDToFilename does the inverse of ParseFileID
  660. func IDToFilename(id uint64) string {
  661. return fmt.Sprintf("%06d", id) + fileSuffix
  662. }
// NewFilename should be named TableFilepath -- it combines the dir with the ID
// to make a table filepath.
func NewFilename(id uint64, dir string) string {
	return filepath.Join(dir, IDToFilename(id))
}
// decompress decompresses the data stored in a block, replacing b.data with
// the decompressed bytes and releasing the previous buffer when the block
// owned it (b.freeMe).
func (t *Table) decompress(b *Block) error {
	var dst []byte
	var err error

	// Point to the original b.data so it can be freed once decompressed.
	src := b.data
	switch t.opt.Compression {
	case options.None:
		// Nothing to be done here.
		return nil
	case options.Snappy:
		// Size dst from the snappy header when possible. The err shadowed in
		// this if-statement is intentional: a header error just means we fall
		// back to a guessed size.
		if sz, err := snappy.DecodedLen(b.data); err == nil {
			dst = z.Calloc(sz, "Table.Decompress")
		} else {
			dst = z.Calloc(len(b.data)*4, "Table.Decompress") // Take a guess.
		}
		b.data, err = snappy.Decode(dst, b.data)
		if err != nil {
			z.Free(dst)
			return y.Wrap(err, "failed to decompress")
		}
	case options.ZSTD:
		sz := int(float64(t.opt.BlockSize) * 1.2)
		// Get frame content size from header, using it only when it is
		// plausibly sized (less than twice the configured block size).
		var hdr zstd.Header
		if err := hdr.Decode(b.data); err == nil && hdr.HasFCS && hdr.FrameContentSize < uint64(t.opt.BlockSize*2) {
			sz = int(hdr.FrameContentSize)
		}
		dst = z.Calloc(sz, "Table.Decompress")
		b.data, err = y.ZSTDDecompress(dst, b.data)
		if err != nil {
			z.Free(dst)
			return y.Wrap(err, "failed to decompress")
		}
	default:
		return errors.New("Unsupported compression type")
	}

	// src was Calloc'd on the decryption path (freeMe set in Table.block); it
	// is no longer needed once its contents have been decompressed out.
	if b.freeMe {
		z.Free(src)
		b.freeMe = false
	}

	// If the decoder allocated its own output buffer (ours was too small),
	// free our dst; otherwise b.data aliases dst and the block now owns it.
	if len(b.data) > 0 && len(dst) > 0 && &dst[0] != &b.data[0] {
		z.Free(dst)
	} else {
		b.freeMe = true
	}
	return nil
}