table.go 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846
  1. /*
  2. * Copyright 2017 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package table
  17. import (
  18. "bytes"
  19. "crypto/aes"
  20. "encoding/binary"
  21. "fmt"
  22. "math"
  23. "os"
  24. "path/filepath"
  25. "strconv"
  26. "strings"
  27. "sync"
  28. "sync/atomic"
  29. "time"
  30. "unsafe"
  31. "github.com/klauspost/compress/snappy"
  32. "github.com/klauspost/compress/zstd"
  33. "github.com/pkg/errors"
  34. "google.golang.org/protobuf/proto"
  35. "github.com/dgraph-io/badger/v4/fb"
  36. "github.com/dgraph-io/badger/v4/options"
  37. "github.com/dgraph-io/badger/v4/pb"
  38. "github.com/dgraph-io/badger/v4/y"
  39. "github.com/dgraph-io/ristretto/v2"
  40. "github.com/dgraph-io/ristretto/v2/z"
  41. )
  42. const fileSuffix = ".sst"
  43. const intSize = int(unsafe.Sizeof(int(0)))
// Options contains configurable options for Table/Builder.
type Options struct {
	// Options for Opening/Building Table.

	// Open tables in read only mode.
	ReadOnly       bool
	MetricsEnabled bool

	// Maximum size of the table.
	TableSize     uint64
	tableCapacity uint64 // 0.9x TableSize.

	// ChkMode is the checksum verification mode for Table.
	ChkMode options.ChecksumVerificationMode

	// Options for Table builder.

	// BloomFalsePositive is the false positive probability of bloom filter.
	BloomFalsePositive float64

	// BlockSize is the size of each block inside SSTable in bytes.
	BlockSize int

	// DataKey is the key used to decrypt the encrypted text.
	DataKey *pb.DataKey

	// Compression indicates the compression algorithm used for block compression.
	Compression options.CompressionType

	// Block cache is used to cache decompressed and decrypted blocks.
	BlockCache *ristretto.Cache[[]byte, *Block]
	// IndexCache caches decoded table indexes keyed by table ID. It is
	// mandatory for encrypted tables (fetchIndex panics without it).
	IndexCache *ristretto.Cache[uint64, *fb.TableIndex]
	// AllocPool provides reusable allocators; presumably used by the table
	// builder — confirm against builder.go.
	AllocPool *z.AllocatorPool

	// ZSTDCompressionLevel is the ZSTD compression level used for compressing blocks.
	ZSTDCompressionLevel int
}
// TableInterface is useful for testing. It exposes the read-only subset of
// Table's API that consumers typically need to mock.
type TableInterface interface {
	Smallest() []byte
	Biggest() []byte
	DoesNotHave(hash uint32) bool
	MaxVersion() uint64
}
// Table represents a loaded table file with the info we have about it.
type Table struct {
	sync.Mutex
	*z.MmapFile

	tableSize int // Initialized in OpenTable, using fd.Stat().

	_index *fb.TableIndex // Nil if encryption is enabled. Use fetchIndex to access.
	_cheap *cheapIndex    // Always-in-memory summary of the index; see cheapIndex().
	ref    atomic.Int32   // For file garbage collection; the file is deleted when this hits zero.

	// The following are initialized once and const.
	smallest, biggest []byte // Smallest and largest keys (with timestamps).
	id                uint64 // file id, part of filename
	Checksum          []byte
	CreatedAt         time.Time
	indexStart        int // byte offset of the serialized index within the file.
	indexLen          int // byte length of the serialized index.
	hasBloomFilter    bool

	IsInmemory bool // Set to true if the table is on level 0 and opened in memory.
	opt        *Options
}
// cheapIndex holds the handful of index fields that are accessed constantly
// (e.g. by compaction heuristics). It is populated once in initIndex so these
// values remain available even when the full flatbuffer index lives only in
// the index cache (encrypted workloads).
type cheapIndex struct {
	MaxVersion        uint64
	KeyCount          uint32
	UncompressedSize  uint32
	OnDiskSize        uint32
	BloomFilterLength int
	OffsetsLength     int
}
// cheapIndex returns the always-in-memory summary of the table index,
// populated once by initIndex.
func (t *Table) cheapIndex() *cheapIndex {
	return t._cheap
}

// offsetsLength returns the number of block offsets stored in the index.
func (t *Table) offsetsLength() int { return t.cheapIndex().OffsetsLength }

// MaxVersion returns the maximum version across all keys stored in this table.
func (t *Table) MaxVersion() uint64 { return t.cheapIndex().MaxVersion }

// BloomFilterSize returns the size of the bloom filter in bytes stored in memory.
func (t *Table) BloomFilterSize() int { return t.cheapIndex().BloomFilterLength }

// UncompressedSize is the size uncompressed data stored in this file.
func (t *Table) UncompressedSize() uint32 { return t.cheapIndex().UncompressedSize }

// KeyCount is the total number of keys in this table.
func (t *Table) KeyCount() uint32 { return t.cheapIndex().KeyCount }

// OnDiskSize returns the total size of key-values stored in this table (including the
// disk space occupied on the value log).
func (t *Table) OnDiskSize() uint32 { return t.cheapIndex().OnDiskSize }

// CompressionType returns the compression algorithm used for block compression.
func (t *Table) CompressionType() options.CompressionType {
	return t.opt.Compression
}
// IncrRef increments the refcount (having to do with whether the file should be deleted)
func (t *Table) IncrRef() {
	t.ref.Add(1)
}

// DecrRef decrements the refcount and possibly deletes the table. When the
// count reaches zero the table's blocks are evicted from the block cache and
// the underlying file is deleted.
func (t *Table) DecrRef() error {
	newRef := t.ref.Add(-1)
	if newRef == 0 {
		// We can safely delete this file, because for all the current files, we always have
		// at least one reference pointing to them.

		// Delete all blocks from the cache.
		// NOTE(review): t.opt.BlockCache may be nil (block() guards for it);
		// this relies on ristretto's Cache methods being nil-safe — confirm
		// against the vendored ristretto version.
		for i := 0; i < t.offsetsLength(); i++ {
			t.opt.BlockCache.Del(t.blockCacheKey(i))
		}
		if err := t.Delete(); err != nil {
			return err
		}
	}
	return nil
}
// BlockEvictHandler is used to reuse the byte slice stored in the block on cache eviction.
// It is registered with the block cache so an evicted block drops the reference
// the cache held on it.
func BlockEvictHandler(b *Block) {
	b.decrRef()
}
// Block is one decoded SSTable block: its payload plus the metadata parsed
// from the block trailer in Table.block().
type Block struct {
	offset            int      // offset of this block within the table file.
	data              []byte   // block payload: entries + entry-offsets index.
	checksum          []byte   // serialized pb.Checksum covering data.
	entriesIndexStart int      // start index of entryOffsets list
	entryOffsets      []uint32 // used to binary search an entry in the block.
	chkLen            int      // checksum length.
	freeMe            bool     // used to determine if the block's buffer should be freed (z.Free) on release.
	ref               atomic.Int32
}

// NumBlocks counts blocks currently alive in memory (incremented in
// Table.block, decremented when a block's refcount reaches zero).
var NumBlocks atomic.Int32
  159. // incrRef increments the ref of a block and return a bool indicating if the
  160. // increment was successful. A true value indicates that the block can be used.
  161. func (b *Block) incrRef() bool {
  162. for {
  163. // We can't blindly add 1 to ref. We need to check whether it has
  164. // reached zero first, because if it did, then we should absolutely not
  165. // use this block.
  166. ref := b.ref.Load()
  167. // The ref would not be equal to 0 unless the existing
  168. // block get evicted before this line. If the ref is zero, it means that
  169. // the block is already added the the blockPool and cannot be used
  170. // anymore. The ref of a new block is 1 so the following condition will
  171. // be true only if the block got reused before we could increment its
  172. // ref.
  173. if ref == 0 {
  174. return false
  175. }
  176. // Increment the ref only if it is not zero and has not changed between
  177. // the time we read it and we're updating it.
  178. //
  179. if b.ref.CompareAndSwap(ref, ref+1) {
  180. return true
  181. }
  182. }
  183. }
// decrRef releases one reference on the block; when the count reaches zero
// the block's buffer may be freed. Safe to call on a nil *Block.
func (b *Block) decrRef() {
	if b == nil {
		return
	}

	// Insert the []byte into pool only if the block is reusable. When a block
	// is reusable a new []byte is used for decompression and this []byte can
	// be reused.
	// In case of an uncompressed block, the []byte is a reference to the
	// table.mmap []byte slice. Any attempt to write data to the mmap []byte
	// will lead to SEGFAULT.
	if b.ref.Add(-1) == 0 {
		if b.freeMe {
			// Buffer was allocated via z.Calloc (decrypt/decompress path).
			z.Free(b.data)
		}
		NumBlocks.Add(-1)
		// blockPool.Put(&b.data)
	}
	// Refcount must never go negative; that would indicate a double release.
	y.AssertTrue(b.ref.Load() >= 0)
}
  203. func (b *Block) size() int64 {
  204. return int64(3*intSize /* Size of the offset, entriesIndexStart and chkLen */ +
  205. cap(b.data) + cap(b.checksum) + cap(b.entryOffsets)*4)
  206. }
  207. func (b *Block) verifyCheckSum() error {
  208. cs := &pb.Checksum{}
  209. if err := proto.Unmarshal(b.checksum, cs); err != nil {
  210. return y.Wrapf(err, "unable to unmarshal checksum for block")
  211. }
  212. return y.VerifyChecksum(b.data, cs)
  213. }
  214. func CreateTable(fname string, builder *Builder) (*Table, error) {
  215. bd := builder.Done()
  216. mf, err := z.OpenMmapFile(fname, os.O_CREATE|os.O_RDWR|os.O_EXCL, bd.Size)
  217. if err == z.NewFile {
  218. // Expected.
  219. } else if err != nil {
  220. return nil, y.Wrapf(err, "while creating table: %s", fname)
  221. } else {
  222. return nil, errors.Errorf("file already exists: %s", fname)
  223. }
  224. written := bd.Copy(mf.Data)
  225. y.AssertTrue(written == len(mf.Data))
  226. if err := z.Msync(mf.Data); err != nil {
  227. return nil, y.Wrapf(err, "while calling msync on %s", fname)
  228. }
  229. return OpenTable(mf, *builder.opts)
  230. }
  231. // OpenTable assumes file has only one table and opens it. Takes ownership of fd upon function
  232. // entry. Returns a table with one reference count on it (decrementing which may delete the file!
  233. // -- consider t.Close() instead). The fd has to writeable because we call Truncate on it before
  234. // deleting. Checksum for all blocks of table is verified based on value of chkMode.
  235. func OpenTable(mf *z.MmapFile, opts Options) (*Table, error) {
  236. // BlockSize is used to compute the approximate size of the decompressed
  237. // block. It should not be zero if the table is compressed.
  238. if opts.BlockSize == 0 && opts.Compression != options.None {
  239. return nil, errors.New("Block size cannot be zero")
  240. }
  241. fileInfo, err := mf.Fd.Stat()
  242. if err != nil {
  243. mf.Close(-1)
  244. return nil, y.Wrap(err, "")
  245. }
  246. filename := fileInfo.Name()
  247. id, ok := ParseFileID(filename)
  248. if !ok {
  249. mf.Close(-1)
  250. return nil, errors.Errorf("Invalid filename: %s", filename)
  251. }
  252. t := &Table{
  253. MmapFile: mf,
  254. id: id,
  255. opt: &opts,
  256. IsInmemory: false,
  257. tableSize: int(fileInfo.Size()),
  258. CreatedAt: fileInfo.ModTime(),
  259. }
  260. // Caller is given one reference.
  261. t.ref.Store(1)
  262. if err := t.initBiggestAndSmallest(); err != nil {
  263. return nil, y.Wrapf(err, "failed to initialize table")
  264. }
  265. if opts.ChkMode == options.OnTableRead || opts.ChkMode == options.OnTableAndBlockRead {
  266. if err := t.VerifyChecksum(); err != nil {
  267. mf.Close(-1)
  268. return nil, y.Wrapf(err, "failed to verify checksum")
  269. }
  270. }
  271. return t, nil
  272. }
// OpenInMemoryTable is similar to OpenTable but it opens a new table from the provided data.
// OpenInMemoryTable is used for L0 tables.
func OpenInMemoryTable(data []byte, id uint64, opt *Options) (*Table, error) {
	// Wrap the raw bytes in an MmapFile with no backing fd.
	mf := &z.MmapFile{
		Data: data,
		Fd:   nil,
	}
	t := &Table{
		MmapFile:   mf,
		opt:        opt,
		tableSize:  len(data),
		IsInmemory: true,
		id:         id, // It is important that each table gets a unique ID.
	}

	// Caller is given one reference.
	t.ref.Store(1)
	if err := t.initBiggestAndSmallest(); err != nil {
		return nil, err
	}
	return t, nil
}
  294. func (t *Table) initBiggestAndSmallest() error {
  295. // This defer will help gathering debugging info incase initIndex crashes.
  296. defer func() {
  297. if r := recover(); r != nil {
  298. // Use defer for printing info because there may be an intermediate panic.
  299. var debugBuf bytes.Buffer
  300. defer func() {
  301. panic(fmt.Sprintf("%s\n== Recovered ==\n", debugBuf.String()))
  302. }()
  303. // Get the count of null bytes at the end of file. This is to make sure if there was an
  304. // issue with mmap sync or file copy.
  305. count := 0
  306. for i := len(t.Data) - 1; i >= 0; i-- {
  307. if t.Data[i] != 0 {
  308. break
  309. }
  310. count++
  311. }
  312. fmt.Fprintf(&debugBuf, "\n== Recovering from initIndex crash ==\n")
  313. fmt.Fprintf(&debugBuf, "File Info: [ID: %d, Size: %d, Zeros: %d]\n",
  314. t.id, t.tableSize, count)
  315. fmt.Fprintf(&debugBuf, "isEnrypted: %v ", t.shouldDecrypt())
  316. readPos := t.tableSize
  317. // Read checksum size.
  318. readPos -= 4
  319. buf := t.readNoFail(readPos, 4)
  320. checksumLen := int(y.BytesToU32(buf))
  321. fmt.Fprintf(&debugBuf, "checksumLen: %d ", checksumLen)
  322. // Read checksum.
  323. checksum := &pb.Checksum{}
  324. readPos -= checksumLen
  325. buf = t.readNoFail(readPos, checksumLen)
  326. _ = proto.Unmarshal(buf, checksum)
  327. fmt.Fprintf(&debugBuf, "checksum: %+v ", checksum)
  328. // Read index size from the footer.
  329. readPos -= 4
  330. buf = t.readNoFail(readPos, 4)
  331. indexLen := int(y.BytesToU32(buf))
  332. fmt.Fprintf(&debugBuf, "indexLen: %d ", indexLen)
  333. // Read index.
  334. readPos -= t.indexLen
  335. t.indexStart = readPos
  336. indexData := t.readNoFail(readPos, t.indexLen)
  337. fmt.Fprintf(&debugBuf, "index: %v ", indexData)
  338. }
  339. }()
  340. var err error
  341. var ko *fb.BlockOffset
  342. if ko, err = t.initIndex(); err != nil {
  343. return y.Wrapf(err, "failed to read index.")
  344. }
  345. t.smallest = y.Copy(ko.KeyBytes())
  346. it2 := t.NewIterator(REVERSED | NOCACHE)
  347. defer it2.Close()
  348. it2.Rewind()
  349. if !it2.Valid() {
  350. return y.Wrapf(it2.err, "failed to initialize biggest for table %s", t.Filename())
  351. }
  352. t.biggest = y.Copy(it2.Key())
  353. return nil
  354. }
// read returns sz bytes of the underlying (mmap'd) data starting at off.
func (t *Table) read(off, sz int) ([]byte, error) {
	return t.Bytes(off, sz)
}

// readNoFail is like read but panics (via y.Check) on error. Used where a
// failed read indicates unrecoverable corruption.
func (t *Table) readNoFail(off, sz int) []byte {
	res, err := t.read(off, sz)
	y.Check(err)
	return res
}
// initIndex reads the index and populate the necessary table fields and returns
// first block offset.
//
// File layout read here, from the end of the file backwards:
//
//	... blocks ... | index | index len (4B) | checksum | checksum len (4B)
func (t *Table) initIndex() (*fb.BlockOffset, error) {
	readPos := t.tableSize

	// Read checksum len from the last 4 bytes.
	readPos -= 4
	buf := t.readNoFail(readPos, 4)
	checksumLen := int(y.BytesToU32(buf))
	if checksumLen < 0 {
		return nil, errors.New("checksum length less than zero. Data corrupted")
	}

	// Read checksum.
	expectedChk := &pb.Checksum{}
	readPos -= checksumLen
	buf = t.readNoFail(readPos, checksumLen)
	if err := proto.Unmarshal(buf, expectedChk); err != nil {
		return nil, err
	}

	// Read index size from the footer.
	readPos -= 4
	buf = t.readNoFail(readPos, 4)
	t.indexLen = int(y.BytesToU32(buf))

	// Read index and verify it against the stored checksum before decoding.
	readPos -= t.indexLen
	t.indexStart = readPos
	data := t.readNoFail(readPos, t.indexLen)
	if err := y.VerifyChecksum(data, expectedChk); err != nil {
		return nil, y.Wrapf(err, "failed to verify checksum for table: %s", t.Filename())
	}
	index, err := t.readTableIndex()
	if err != nil {
		return nil, err
	}
	if !t.shouldDecrypt() {
		// If there's no encryption, this points to the mmap'ed buffer.
		t._index = index
	}
	// Cache the frequently-read index fields so they stay available even
	// when the full index is only held in the index cache (encrypted case).
	t._cheap = &cheapIndex{
		MaxVersion:        index.MaxVersion(),
		KeyCount:          index.KeyCount(),
		UncompressedSize:  index.UncompressedSize(),
		OnDiskSize:        index.OnDiskSize(),
		OffsetsLength:     index.OffsetsLength(),
		BloomFilterLength: index.BloomFilterLength(),
	}

	t.hasBloomFilter = len(index.BloomFilterBytes()) > 0

	var bo fb.BlockOffset
	// Every table has at least one block offset; its key is the smallest key.
	y.AssertTrue(index.Offsets(&bo, 0))
	return &bo, nil
}
  413. // KeySplits splits the table into at least n ranges based on the block offsets.
  414. func (t *Table) KeySplits(n int, prefix []byte) []string {
  415. if n == 0 {
  416. return nil
  417. }
  418. oLen := t.offsetsLength()
  419. jump := oLen / n
  420. if jump == 0 {
  421. jump = 1
  422. }
  423. var bo fb.BlockOffset
  424. var res []string
  425. for i := 0; i < oLen; i += jump {
  426. if i >= oLen {
  427. i = oLen - 1
  428. }
  429. y.AssertTrue(t.offsets(&bo, i))
  430. if bytes.HasPrefix(bo.KeyBytes(), prefix) {
  431. res = append(res, string(bo.KeyBytes()))
  432. }
  433. }
  434. return res
  435. }
// fetchIndex returns the table index. For unencrypted tables this is the
// mmap-backed _index; for encrypted tables the decoded index is served from
// (and, on miss, decrypted and inserted into) the index cache.
func (t *Table) fetchIndex() *fb.TableIndex {
	if !t.shouldDecrypt() {
		return t._index
	}

	if t.opt.IndexCache == nil {
		panic("Index Cache must be set for encrypted workloads")
	}
	if val, ok := t.opt.IndexCache.Get(t.indexKey()); ok && val != nil {
		return val
	}

	index, err := t.readTableIndex()
	y.Check(err)
	t.opt.IndexCache.Set(t.indexKey(), index, int64(t.indexLen))
	return index
}

// offsets writes the i'th block offset from the index into ko, returning
// false when i is out of range.
func (t *Table) offsets(ko *fb.BlockOffset, i int) bool {
	return t.fetchIndex().Offsets(ko, i)
}
// block function return a new block. Each block holds a ref and the byte
// slice stored in the block will be reused when the ref becomes zero. The
// caller should release the block by calling block.decrRef() on it.
//
// The block trailer is laid out as:
//
//	entries | entry offsets | numEntries (4B) | checksum | checksum len (4B)
func (t *Table) block(idx int, useCache bool) (*Block, error) {
	y.AssertTruef(idx >= 0, "idx=%d", idx)
	if idx >= t.offsetsLength() {
		return nil, errors.New("block out of index")
	}
	// Fast path: serve from the block cache if present.
	if t.opt.BlockCache != nil {
		key := t.blockCacheKey(idx)
		blk, ok := t.opt.BlockCache.Get(key)
		if ok && blk != nil {
			// Use the block only if the increment was successful. The block
			// could get evicted from the cache between the Get() call and the
			// incrRef() call.
			if blk.incrRef() {
				return blk, nil
			}
		}
	}

	var ko fb.BlockOffset
	y.AssertTrue(t.offsets(&ko, idx))
	blk := &Block{offset: int(ko.Offset())}
	blk.ref.Store(1)
	defer blk.decrRef() // Deal with any errors, where blk would not be returned.
	NumBlocks.Add(1)

	var err error
	if blk.data, err = t.read(blk.offset, int(ko.Len())); err != nil {
		return nil, y.Wrapf(err,
			"failed to read from file: %s at offset: %d, len: %d",
			t.Fd.Name(), blk.offset, ko.Len())
	}

	if t.shouldDecrypt() {
		// Decrypt the block if it is encrypted.
		if blk.data, err = t.decrypt(blk.data, true); err != nil {
			return nil, err
		}
		// blk.data is allocated via Calloc. So, do free.
		blk.freeMe = true
	}

	if err = t.decompress(blk); err != nil {
		return nil, y.Wrapf(err,
			"failed to decode compressed data in file: %s at offset: %d, len: %d",
			t.Fd.Name(), blk.offset, ko.Len())
	}

	// Read meta data related to block, walking the trailer backwards.
	readPos := len(blk.data) - 4 // First read checksum length.
	blk.chkLen = int(y.BytesToU32(blk.data[readPos : readPos+4]))

	// Checksum length greater than block size could happen if the table was compressed and
	// it was opened with an incorrect compression algorithm (or the data was corrupted).
	if blk.chkLen > len(blk.data) {
		return nil, errors.New("invalid checksum length. Either the data is " +
			"corrupted or the table options are incorrectly set")
	}

	// Read checksum and store it
	readPos -= blk.chkLen
	blk.checksum = blk.data[readPos : readPos+blk.chkLen]
	// Move back and read numEntries in the block.
	readPos -= 4
	numEntries := int(y.BytesToU32(blk.data[readPos : readPos+4]))
	entriesIndexStart := readPos - (numEntries * 4)
	entriesIndexEnd := entriesIndexStart + numEntries*4

	blk.entryOffsets = y.BytesToU32Slice(blk.data[entriesIndexStart:entriesIndexEnd])

	blk.entriesIndexStart = entriesIndexStart

	// Drop checksum and checksum length.
	// The checksum is calculated for actual data + entry index + index length
	blk.data = blk.data[:readPos+4]

	// Verify checksum on if checksum verification mode is OnRead on OnStartAndRead.
	if t.opt.ChkMode == options.OnBlockRead || t.opt.ChkMode == options.OnTableAndBlockRead {
		if err = blk.verifyCheckSum(); err != nil {
			return nil, err
		}
	}

	// Take the caller's reference; the deferred decrRef above drops the
	// constructor's reference.
	blk.incrRef()
	if useCache && t.opt.BlockCache != nil {
		key := t.blockCacheKey(idx)
		// incrRef should never return false here because we're calling it on a
		// new block with ref=1.
		y.AssertTrue(blk.incrRef())

		// Decrement the block ref if we could not insert it in the cache.
		if !t.opt.BlockCache.Set(key, blk, blk.size()) {
			blk.decrRef()
		}
		// We have added an OnReject func in our cache, which gets called in case the block is not
		// admitted to the cache. So, every block would be accounted for.
	}
	return blk, nil
}
  542. // blockCacheKey is used to store blocks in the block cache.
  543. func (t *Table) blockCacheKey(idx int) []byte {
  544. y.AssertTrue(t.id < math.MaxUint32)
  545. y.AssertTrue(uint32(idx) < math.MaxUint32)
  546. buf := make([]byte, 8)
  547. // Assume t.ID does not overflow uint32.
  548. binary.BigEndian.PutUint32(buf[:4], uint32(t.ID()))
  549. binary.BigEndian.PutUint32(buf[4:], uint32(idx))
  550. return buf
  551. }
// indexKey returns the cache key for block offsets. blockOffsets
// are stored in the index cache, keyed by table ID.
func (t *Table) indexKey() uint64 {
	return t.id
}

// IndexSize is the size of table index in bytes.
func (t *Table) IndexSize() int {
	return t.indexLen
}

// Size is its file size in bytes
func (t *Table) Size() int64 { return int64(t.tableSize) }

// StaleDataSize is the amount of stale data (that can be dropped by a compaction) in this SST.
func (t *Table) StaleDataSize() uint32 { return t.fetchIndex().StaleDataSize() }

// Smallest is its smallest key, or nil if there are none
func (t *Table) Smallest() []byte { return t.smallest }

// Biggest is its biggest key, or nil if there are none
func (t *Table) Biggest() []byte { return t.biggest }

// Filename is NOT the file name. Just kidding, it is.
func (t *Table) Filename() string { return t.Fd.Name() }

// ID is the table's ID number (used to make the file name).
func (t *Table) ID() uint64 { return t.id }
  573. // DoesNotHave returns true if and only if the table does not have the key hash.
  574. // It does a bloom filter lookup.
  575. func (t *Table) DoesNotHave(hash uint32) bool {
  576. if !t.hasBloomFilter {
  577. return false
  578. }
  579. y.NumLSMBloomHitsAdd(t.opt.MetricsEnabled, "DoesNotHave_ALL", 1)
  580. index := t.fetchIndex()
  581. bf := index.BloomFilterBytes()
  582. mayContain := y.Filter(bf).MayContain(hash)
  583. if !mayContain {
  584. y.NumLSMBloomHitsAdd(t.opt.MetricsEnabled, "DoesNotHave_HIT", 1)
  585. }
  586. return !mayContain
  587. }
// readTableIndex reads table index from the sst and returns its pb format.
// The raw index bytes are decrypted first when a data key is set.
func (t *Table) readTableIndex() (*fb.TableIndex, error) {
	data := t.readNoFail(t.indexStart, t.indexLen)
	var err error
	// Decrypt the table index if it is encrypted.
	if t.shouldDecrypt() {
		if data, err = t.decrypt(data, false); err != nil {
			return nil, y.Wrapf(err,
				"Error while decrypting table index for the table %d in readTableIndex", t.id)
		}
	}
	return fb.GetRootAsTableIndex(data, 0), nil
}
  601. // VerifyChecksum verifies checksum for all blocks of table. This function is called by
  602. // OpenTable() function. This function is also called inside levelsController.VerifyChecksum().
  603. func (t *Table) VerifyChecksum() error {
  604. ti := t.fetchIndex()
  605. for i := 0; i < ti.OffsetsLength(); i++ {
  606. b, err := t.block(i, true)
  607. if err != nil {
  608. return y.Wrapf(err, "checksum validation failed for table: %s, block: %d, offset:%d",
  609. t.Filename(), i, b.offset)
  610. }
  611. // We should not call incrRef here, because the block already has one ref when created.
  612. defer b.decrRef()
  613. // OnBlockRead or OnTableAndBlockRead, we don't need to call verify checksum
  614. // on block, verification would be done while reading block itself.
  615. if !(t.opt.ChkMode == options.OnBlockRead || t.opt.ChkMode == options.OnTableAndBlockRead) {
  616. if err = b.verifyCheckSum(); err != nil {
  617. return y.Wrapf(err,
  618. "checksum validation failed for table: %s, block: %d, offset:%d",
  619. t.Filename(), i, b.offset)
  620. }
  621. }
  622. }
  623. return nil
  624. }
// shouldDecrypt tells whether to decrypt or not. We decrypt only if the datakey exist
// for the table.
func (t *Table) shouldDecrypt() bool {
	return t.opt.DataKey != nil
}

// KeyID returns data key id.
func (t *Table) KeyID() uint64 {
	if t.opt.DataKey != nil {
		return t.opt.DataKey.KeyId
	}
	// By default it's 0, if it is plain text.
	return 0
}
  638. // decrypt decrypts the given data. It should be called only after checking shouldDecrypt.
  639. func (t *Table) decrypt(data []byte, viaCalloc bool) ([]byte, error) {
  640. // Last BlockSize bytes of the data is the IV.
  641. iv := data[len(data)-aes.BlockSize:]
  642. // Rest all bytes are data.
  643. data = data[:len(data)-aes.BlockSize]
  644. var dst []byte
  645. if viaCalloc {
  646. dst = z.Calloc(len(data), "Table.Decrypt")
  647. } else {
  648. dst = make([]byte, len(data))
  649. }
  650. if err := y.XORBlock(dst, data, t.opt.DataKey.Data, iv); err != nil {
  651. return nil, y.Wrapf(err, "while decrypt")
  652. }
  653. return dst, nil
  654. }
  655. // ParseFileID reads the file id out of a filename.
  656. func ParseFileID(name string) (uint64, bool) {
  657. name = filepath.Base(name)
  658. if !strings.HasSuffix(name, fileSuffix) {
  659. return 0, false
  660. }
  661. // suffix := name[len(fileSuffix):]
  662. name = strings.TrimSuffix(name, fileSuffix)
  663. id, err := strconv.Atoi(name)
  664. if err != nil {
  665. return 0, false
  666. }
  667. y.AssertTrue(id >= 0)
  668. return uint64(id), true
  669. }
  670. // IDToFilename does the inverse of ParseFileID
  671. func IDToFilename(id uint64) string {
  672. return fmt.Sprintf("%06d", id) + fileSuffix
  673. }
  674. // NewFilename should be named TableFilepath -- it combines the dir with the ID to make a table
  675. // filepath.
  676. func NewFilename(id uint64, dir string) string {
  677. return filepath.Join(dir, IDToFilename(id))
  678. }
// decompress decompresses the data stored in a block, replacing b.data with
// the decompressed payload. It also manages buffer ownership: b.freeMe tells
// decrRef whether b.data must later be returned via z.Free.
func (t *Table) decompress(b *Block) error {
	var dst []byte
	var err error

	// Point to the original b.data
	src := b.data

	switch t.opt.Compression {
	case options.None:
		// Nothing to be done here.
		return nil
	case options.Snappy:
		// Size the destination from the snappy header when possible.
		if sz, err := snappy.DecodedLen(b.data); err == nil {
			dst = z.Calloc(sz, "Table.Decompress")
		} else {
			dst = z.Calloc(len(b.data)*4, "Table.Decompress") // Take a guess.
		}
		b.data, err = snappy.Decode(dst, b.data)
		if err != nil {
			z.Free(dst)
			return y.Wrap(err, "failed to decompress")
		}
	case options.ZSTD:
		sz := int(float64(t.opt.BlockSize) * 1.2)
		// Get frame content size from header.
		var hdr zstd.Header
		if err := hdr.Decode(b.data); err == nil && hdr.HasFCS && hdr.FrameContentSize < uint64(t.opt.BlockSize*2) {
			sz = int(hdr.FrameContentSize)
		}
		dst = z.Calloc(sz, "Table.Decompress")
		b.data, err = y.ZSTDDecompress(dst, b.data)
		if err != nil {
			z.Free(dst)
			return y.Wrap(err, "failed to decompress")
		}
	default:
		return errors.New("Unsupported compression type")
	}

	// If we allocated the (encrypted) source buffer ourselves, it has now
	// been decoded into a new buffer and can be released.
	if b.freeMe {
		z.Free(src)
		b.freeMe = false
	}

	// The decoder may have returned a buffer other than dst (e.g. dst was too
	// small). Free dst if it was not the buffer actually used; otherwise
	// b.data is our Calloc'd buffer and must be freed on release.
	if len(b.data) > 0 && len(dst) > 0 && &dst[0] != &b.data[0] {
		z.Free(dst)
	} else {
		b.freeMe = true
	}
	return nil
}