builder.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package table
  6. import (
  7. "crypto/aes"
  8. "errors"
  9. "math"
  10. "runtime"
  11. "sync"
  12. "sync/atomic"
  13. "unsafe"
  14. fbs "github.com/google/flatbuffers/go"
  15. "github.com/klauspost/compress/s2"
  16. "google.golang.org/protobuf/proto"
  17. "github.com/dgraph-io/badger/v4/fb"
  18. "github.com/dgraph-io/badger/v4/options"
  19. "github.com/dgraph-io/badger/v4/pb"
  20. "github.com/dgraph-io/badger/v4/y"
  21. "github.com/dgraph-io/ristretto/v2/z"
  22. )
const (
	// KB and MB are byte-size multipliers used for sizing buffers and tables.
	KB = 1024
	MB = KB * 1024

	// When a block is encrypted, its length increases. We add 256 bytes of padding to
	// handle cases when block size increases. This is an approximate number.
	padding = 256
)
// header is the per-entry header written before each entry's key diff in a
// block. It records how much of the block's base key the entry shares.
type header struct {
	overlap uint16 // Overlap with base key.
	diff    uint16 // Length of the diff.
}

// headerSize is the encoded size of header: two uint16 fields, 4 bytes.
const headerSize = uint16(unsafe.Sizeof(header{}))
// Encode encodes the header into its 4-byte wire form by reinterpreting the
// struct's memory directly (native byte order; Decode mirrors this).
func (h header) Encode() []byte {
	var b [4]byte
	*(*header)(unsafe.Pointer(&b[0])) = h
	return b[:]
}
// Decode decodes the header from the first headerSize bytes of buf.
func (h *header) Decode(buf []byte) {
	// Copy over data from buf into h. Using *h=unsafe.pointer(...) leads to
	// pointer alignment issues. See https://github.com/hypermodeinc/badger/issues/1096
	// and comment https://github.com/hypermodeinc/badger/pull/1097#pullrequestreview-307361714
	copy(((*[headerSize]byte)(unsafe.Pointer(h))[:]), buf[:headerSize])
}
// bblock represents a block that is being compressed/encrypted in the background.
type bblock struct {
	data         []byte   // Backing buffer, allocated on the builder's allocator.
	baseKey      []byte   // Base key for the current block.
	entryOffsets []uint32 // Offsets of entries present in current block.
	end          int      // Points to the end offset of the block.
}
// Builder is used in building a table.
type Builder struct {
	// Typically tens or hundreds of meg. This is for one single file.
	alloc            *z.Allocator
	curBlock         *bblock       // Block currently being filled by addHelper.
	compressedSize   atomic.Uint32 // Running total of block sizes after compression/encryption.
	uncompressedSize atomic.Uint32 // Running total of block sizes before compression.

	lenOffsets    uint32   // Estimated index size contributed by finished blocks (see finishBlock).
	keyHashes     []uint32 // Used for building the bloomfilter.
	opts          *Options
	maxVersion    uint64 // Highest version (key timestamp) seen among added keys.
	onDiskSize    uint32 // Estimated on-disk size: vlog lengths plus block data (see addHelper/buildIndex).
	staleDataSize int    // Rough bytes of stale data; used to prioritize this table for compaction.

	// Used to concurrently compress/encrypt blocks.
	wg        sync.WaitGroup
	blockChan chan *bblock
	blockList []*bblock
}
  73. func (b *Builder) allocate(need int) []byte {
  74. bb := b.curBlock
  75. if len(bb.data[bb.end:]) < need {
  76. // We need to reallocate. 1GB is the max size that the allocator can allocate.
  77. // While reallocating, if doubling exceeds that limit, then put the upper bound on it.
  78. sz := 2 * len(bb.data)
  79. if sz > (1 << 30) {
  80. sz = 1 << 30
  81. }
  82. if bb.end+need > sz {
  83. sz = bb.end + need
  84. }
  85. tmp := b.alloc.Allocate(sz)
  86. copy(tmp, bb.data)
  87. bb.data = tmp
  88. }
  89. bb.end += need
  90. return bb.data[bb.end-need : bb.end]
  91. }
  92. // append appends to curBlock.data
  93. func (b *Builder) append(data []byte) {
  94. dst := b.allocate(len(data))
  95. y.AssertTrue(len(data) == copy(dst, data))
  96. }
  97. const maxAllocatorInitialSz = 256 << 20
  98. // NewTableBuilder makes a new TableBuilder.
  99. func NewTableBuilder(opts Options) *Builder {
  100. sz := 2 * int(opts.TableSize)
  101. if sz > maxAllocatorInitialSz {
  102. sz = maxAllocatorInitialSz
  103. }
  104. b := &Builder{
  105. alloc: opts.AllocPool.Get(sz, "TableBuilder"),
  106. opts: &opts,
  107. }
  108. b.alloc.Tag = "Builder"
  109. b.curBlock = &bblock{
  110. data: b.alloc.Allocate(opts.BlockSize + padding),
  111. }
  112. b.opts.tableCapacity = uint64(float64(b.opts.TableSize) * 0.95)
  113. // If encryption or compression is not enabled, do not start compression/encryption goroutines
  114. // and write directly to the buffer.
  115. if b.opts.Compression == options.None && b.opts.DataKey == nil {
  116. return b
  117. }
  118. count := 2 * runtime.NumCPU()
  119. b.blockChan = make(chan *bblock, count*2)
  120. b.wg.Add(count)
  121. for i := 0; i < count; i++ {
  122. go b.handleBlock()
  123. }
  124. return b
  125. }
  126. func maxEncodedLen(ctype options.CompressionType, sz int) int {
  127. switch ctype {
  128. case options.Snappy:
  129. return s2.MaxEncodedLen(sz)
  130. case options.ZSTD:
  131. return y.ZSTDCompressBound(sz)
  132. }
  133. return sz
  134. }
// handleBlock is the body of each background worker goroutine: it receives
// sealed blocks from blockChan, compresses and/or encrypts them onto the
// allocator, and accumulates compressedSize. Workers exit when blockChan is
// closed (see Done); wg tracks their completion.
func (b *Builder) handleBlock() {
	defer b.wg.Done()

	doCompress := b.opts.Compression != options.None
	for item := range b.blockChan {
		// Extract the block.
		blockBuf := item.data[:item.end]
		// Compress the block.
		if doCompress {
			out, err := b.compressData(blockBuf)
			y.Check(err)
			blockBuf = out
		}
		if b.shouldEncrypt() {
			out, err := b.encrypt(blockBuf)
			y.Check(y.Wrapf(err, "Error while encrypting block in table builder."))
			blockBuf = out
		}

		// BlockBuf should always less than or equal to allocated space. If the blockBuf is greater
		// than allocated space that means the data from this block cannot be stored in its
		// existing location.
		allocatedSpace := maxEncodedLen(b.opts.Compression, (item.end)) + padding + 1
		y.AssertTrue(len(blockBuf) <= allocatedSpace)

		// blockBuf was allocated on allocator. So, we don't need to copy it over.
		item.data = blockBuf
		item.end = len(blockBuf)
		b.compressedSize.Add(uint32(len(blockBuf)))
	}
}
// Close closes the TableBuilder, returning its allocator to the pool.
func (b *Builder) Close() {
	b.opts.AllocPool.Return(b.alloc)
}
// Empty returns whether it's empty, i.e. no keys have been added yet.
func (b *Builder) Empty() bool { return len(b.keyHashes) == 0 }
  169. // keyDiff returns a suffix of newKey that is different from b.baseKey.
  170. func (b *Builder) keyDiff(newKey []byte) []byte {
  171. var i int
  172. for i = 0; i < len(newKey) && i < len(b.curBlock.baseKey); i++ {
  173. if newKey[i] != b.curBlock.baseKey[i] {
  174. break
  175. }
  176. }
  177. return newKey[i:]
  178. }
// addHelper appends entry (key, v) to the current block: it records the key
// hash for the bloom filter, tracks the max version, and writes the entry as
// header + key-diff + encoded value. vpLen is this entry's contribution to
// the value log, which is counted toward onDiskSize.
func (b *Builder) addHelper(key []byte, v y.ValueStruct, vpLen uint32) {
	b.keyHashes = append(b.keyHashes, y.Hash(y.ParseKey(key)))

	// Track the highest version (parsed from the key's timestamp suffix).
	if version := y.ParseTs(key); version > b.maxVersion {
		b.maxVersion = version
	}

	// diffKey stores the difference of key with baseKey.
	var diffKey []byte
	if len(b.curBlock.baseKey) == 0 {
		// Make a copy. Builder should not keep references. Otherwise, caller has to be very careful
		// and will have to make copies of keys every time they add to builder, which is even worse.
		b.curBlock.baseKey = append(b.curBlock.baseKey[:0], key...)
		diffKey = key
	} else {
		diffKey = b.keyDiff(key)
	}

	// Both the overlap and diff lengths must fit in the header's uint16 fields.
	y.AssertTrue(len(key)-len(diffKey) <= math.MaxUint16)
	y.AssertTrue(len(diffKey) <= math.MaxUint16)

	h := header{
		overlap: uint16(len(key) - len(diffKey)),
		diff:    uint16(len(diffKey)),
	}

	// store current entry's offset
	b.curBlock.entryOffsets = append(b.curBlock.entryOffsets, uint32(b.curBlock.end))

	// Layout: header, diffKey, value.
	b.append(h.Encode())
	b.append(diffKey)

	dst := b.allocate(int(v.EncodedSize()))
	v.Encode(dst)

	// Add the vpLen to the onDisk size. We'll add the size of the block to
	// onDisk size in Finish() function.
	b.onDiskSize += vpLen
}
/*
Structure of Block.
+-------------------+---------------------+--------------------+--------------+------------------+
| Entry1            | Entry2              | Entry3             | Entry4       | Entry5           |
+-------------------+---------------------+--------------------+--------------+------------------+
| Entry6            | ...                 | ...                | ...          | EntryN           |
+-------------------+---------------------+--------------------+--------------+------------------+
| Block Meta(contains list of offsets used| Block Meta Size    | Block        | Checksum Size    |
| to perform binary search in the block)  | (4 Bytes)          | Checksum     | (4 Bytes)        |
+-----------------------------------------+--------------------+--------------+------------------+
*/
// In case the data is encrypted, the "IV" is added to the end of the block.

// finishBlock seals the current block: it appends the entry offsets, their
// count, the block checksum and its length, records the block in blockList,
// and — when compression/encryption is enabled — hands the block to a
// background worker via blockChan.
func (b *Builder) finishBlock() {
	if len(b.curBlock.entryOffsets) == 0 {
		// Nothing was written into this block; nothing to seal.
		return
	}
	// Append the entryOffsets and its length.
	b.append(y.U32SliceToBytes(b.curBlock.entryOffsets))
	b.append(y.U32ToBytes(uint32(len(b.curBlock.entryOffsets))))

	// The checksum covers everything written so far, including the offsets.
	checksum := b.calculateChecksum(b.curBlock.data[:b.curBlock.end])

	// Append the block checksum and its length.
	b.append(checksum)
	b.append(y.U32ToBytes(uint32(len(checksum))))

	b.blockList = append(b.blockList, b.curBlock)
	b.uncompressedSize.Add(uint32(b.curBlock.end))

	// Add length of baseKey (rounded to next multiple of 4 because of alignment).
	// Add another 40 Bytes, these additional 40 bytes consists of
	// 12 bytes of metadata of flatbuffer
	// 8 bytes for Key in flat buffer
	// 8 bytes for offset
	// 8 bytes for the len
	// 4 bytes for the size of slice while SliceAllocate
	b.lenOffsets += uint32(int(math.Ceil(float64(len(b.curBlock.baseKey))/4))*4) + 40

	// If compression/encryption is enabled, we need to send the block to the blockChan.
	if b.blockChan != nil {
		b.blockChan <- b.curBlock
	}
}
  249. func (b *Builder) shouldFinishBlock(key []byte, value y.ValueStruct) bool {
  250. // If there is no entry till now, we will return false.
  251. if len(b.curBlock.entryOffsets) <= 0 {
  252. return false
  253. }
  254. // Integer overflow check for statements below.
  255. y.AssertTrue((uint32(len(b.curBlock.entryOffsets))+1)*4+4+8+4 < math.MaxUint32)
  256. // We should include current entry also in size, that's why +1 to len(b.entryOffsets).
  257. entriesOffsetsSize := uint32((len(b.curBlock.entryOffsets)+1)*4 +
  258. 4 + // size of list
  259. 8 + // Sum64 in checksum proto
  260. 4) // checksum length
  261. estimatedSize := uint32(b.curBlock.end) + uint32(6 /*header size for entry*/) +
  262. uint32(len(key)) + value.EncodedSize() + entriesOffsetsSize
  263. if b.shouldEncrypt() {
  264. // IV is added at the end of the block, while encrypting.
  265. // So, size of IV is added to estimatedSize.
  266. estimatedSize += aes.BlockSize
  267. }
  268. // Integer overflow check for table size.
  269. y.AssertTrue(uint64(b.curBlock.end)+uint64(estimatedSize) < math.MaxUint32)
  270. return estimatedSize > uint32(b.opts.BlockSize)
  271. }
// AddStaleKey is same as Add function but it also increments the internal
// staleDataSize counter. This value will be used to prioritize this table for
// compaction.
func (b *Builder) AddStaleKey(key []byte, v y.ValueStruct, valueLen uint32) {
	// Rough estimate based on how much space it will occupy in the SST.
	b.staleDataSize += len(key) + len(v.Value) + 4 /* entry offset */ + 4 /* header size */
	b.addInternal(key, v, valueLen, true)
}
// Add adds a key-value pair to the block. valueLen is this entry's value log
// contribution, counted toward the table's on-disk size (see addHelper).
func (b *Builder) Add(key []byte, value y.ValueStruct, valueLen uint32) {
	b.addInternal(key, value, valueLen, false)
}
  284. func (b *Builder) addInternal(key []byte, value y.ValueStruct, valueLen uint32, isStale bool) {
  285. if b.shouldFinishBlock(key, value) {
  286. if isStale {
  287. // This key will be added to tableIndex and it is stale.
  288. b.staleDataSize += len(key) + 4 /* len */ + 4 /* offset */
  289. }
  290. b.finishBlock()
  291. // Create a new block and start writing.
  292. b.curBlock = &bblock{
  293. data: b.alloc.Allocate(b.opts.BlockSize + padding),
  294. }
  295. }
  296. b.addHelper(key, value, valueLen)
  297. }
  298. // TODO: vvv this was the comment on ReachedCapacity.
  299. // FinalSize returns the *rough* final size of the array, counting the header which is
  300. // not yet written.
  301. // TODO: Look into why there is a discrepancy. I suspect it is because of Write(empty, empty)
  302. // at the end. The diff can vary.
  303. // ReachedCapacity returns true if we... roughly (?) reached capacity?
  304. func (b *Builder) ReachedCapacity() bool {
  305. // If encryption/compression is enabled then use the compresssed size.
  306. sumBlockSizes := b.compressedSize.Load()
  307. if b.opts.Compression == options.None && b.opts.DataKey == nil {
  308. sumBlockSizes = b.uncompressedSize.Load()
  309. }
  310. blocksSize := sumBlockSizes + // actual length of current buffer
  311. uint32(len(b.curBlock.entryOffsets)*4) + // all entry offsets size
  312. 4 + // count of all entry offsets
  313. 8 + // checksum bytes
  314. 4 // checksum length
  315. estimateSz := blocksSize +
  316. 4 + // Index length
  317. b.lenOffsets
  318. return uint64(estimateSz) > b.opts.tableCapacity
  319. }
  320. // Finish finishes the table by appending the index.
  321. /*
  322. The table structure looks like
  323. +---------+------------+-----------+---------------+
  324. | Block 1 | Block 2 | Block 3 | Block 4 |
  325. +---------+------------+-----------+---------------+
  326. | Block 5 | Block 6 | Block ... | Block N |
  327. +---------+------------+-----------+---------------+
  328. | Index | Index Size | Checksum | Checksum Size |
  329. +---------+------------+-----------+---------------+
  330. */
  331. // In case the data is encrypted, the "IV" is added to the end of the index.
  332. func (b *Builder) Finish() []byte {
  333. bd := b.Done()
  334. buf := make([]byte, bd.Size)
  335. written := bd.Copy(buf)
  336. y.AssertTrue(written == len(buf))
  337. return buf
  338. }
// buildData holds the finished pieces of a table produced by Done: the sealed
// blocks, the serialized index and its checksum, the total serialized Size in
// bytes, and the allocator that owns the backing memory.
type buildData struct {
	blockList []*bblock
	index     []byte
	checksum  []byte
	Size      int
	alloc     *z.Allocator
}
  346. func (bd *buildData) Copy(dst []byte) int {
  347. var written int
  348. for _, bl := range bd.blockList {
  349. written += copy(dst[written:], bl.data[:bl.end])
  350. }
  351. written += copy(dst[written:], bd.index)
  352. written += copy(dst[written:], y.U32ToBytes(uint32(len(bd.index))))
  353. written += copy(dst[written:], bd.checksum)
  354. written += copy(dst[written:], y.U32ToBytes(uint32(len(bd.checksum))))
  355. return written
  356. }
// Done finalizes the table: it seals the last block, drains the background
// compression/encryption workers, builds the flatbuffer index (optionally
// encrypted) and its checksum, and returns everything needed to serialize
// the table via buildData.Copy.
func (b *Builder) Done() buildData {
	b.finishBlock() // This will never start a new block.
	if b.blockChan != nil {
		// Closing the channel lets the handleBlock workers drain and exit.
		close(b.blockChan)
	}
	// Wait for block handler to finish.
	b.wg.Wait()

	if len(b.blockList) == 0 {
		// Nothing was ever added; return a zero buildData.
		return buildData{}
	}
	bd := buildData{
		blockList: b.blockList,
		alloc:     b.alloc,
	}

	// Build the bloom filter from collected key hashes, if enabled.
	var f y.Filter
	if b.opts.BloomFalsePositive > 0 {
		bits := y.BloomBitsPerKey(len(b.keyHashes), b.opts.BloomFalsePositive)
		f = y.NewFilter(b.keyHashes, bits)
	}
	index, dataSize := b.buildIndex(f)

	// The index itself is encrypted when a data key is present.
	var err error
	if b.shouldEncrypt() {
		index, err = b.encrypt(index)
		y.Check(err)
	}
	checksum := b.calculateChecksum(index)

	bd.index = index
	bd.checksum = checksum
	// Total size: block data + index + checksum + 4B index len + 4B checksum len.
	bd.Size = int(dataSize) + len(index) + len(checksum) + 4 + 4
	return bd
}
  388. func (b *Builder) calculateChecksum(data []byte) []byte {
  389. // Build checksum for the index.
  390. checksum := pb.Checksum{
  391. // TODO: The checksum type should be configurable from the
  392. // options.
  393. // We chose to use CRC32 as the default option because
  394. // it performed better compared to xxHash64.
  395. // See the BenchmarkChecksum in table_test.go file
  396. // Size => 1024 B 2048 B
  397. // CRC32 => 63.7 ns/op 112 ns/op
  398. // xxHash64 => 87.5 ns/op 158 ns/op
  399. Sum: y.CalculateChecksum(data, pb.Checksum_CRC32C),
  400. Algo: pb.Checksum_CRC32C,
  401. }
  402. // Write checksum to the file.
  403. chksum, err := proto.Marshal(&checksum)
  404. y.Check(err)
  405. // Write checksum size.
  406. return chksum
  407. }
// DataKey returns datakey of the builder (nil when encryption is disabled).
func (b *Builder) DataKey() *pb.DataKey {
	return b.opts.DataKey
}
// Opts returns the options this builder was constructed with.
func (b *Builder) Opts() *Options {
	return b.opts
}
  415. // encrypt will encrypt the given data and appends IV to the end of the encrypted data.
  416. // This should be only called only after checking shouldEncrypt method.
  417. func (b *Builder) encrypt(data []byte) ([]byte, error) {
  418. iv, err := y.GenerateIV()
  419. if err != nil {
  420. return data, y.Wrapf(err, "Error while generating IV in Builder.encrypt")
  421. }
  422. needSz := len(data) + len(iv)
  423. dst := b.alloc.Allocate(needSz)
  424. if err = y.XORBlock(dst[:len(data)], data, b.DataKey().Data, iv); err != nil {
  425. return data, y.Wrapf(err, "Error while encrypting in Builder.encrypt")
  426. }
  427. y.AssertTrue(len(iv) == copy(dst[len(data):], iv))
  428. return dst, nil
  429. }
// shouldEncrypt tells us whether to encrypt the data or not.
// We encrypt only if the data key exists. Otherwise, not.
func (b *Builder) shouldEncrypt() bool {
	return b.opts.DataKey != nil
}
  435. // compressData compresses the given data.
  436. func (b *Builder) compressData(data []byte) ([]byte, error) {
  437. switch b.opts.Compression {
  438. case options.None:
  439. return data, nil
  440. case options.Snappy:
  441. sz := s2.MaxEncodedLen(len(data))
  442. dst := b.alloc.Allocate(sz)
  443. return s2.EncodeSnappy(dst, data), nil
  444. case options.ZSTD:
  445. sz := y.ZSTDCompressBound(len(data))
  446. dst := b.alloc.Allocate(sz)
  447. return y.ZSTDCompress(dst, data, b.opts.ZSTDCompressionLevel)
  448. }
  449. return nil, errors.New("Unsupported compression type")
  450. }
// buildIndex builds the table's flatbuffer index — block offsets, bloom
// filter, max version, sizes and key count — and returns the finished buffer
// along with the total size of all block data.
func (b *Builder) buildIndex(bloom []byte) ([]byte, uint32) {
	builder := fbs.NewBuilder(3 << 20)

	boList, dataSize := b.writeBlockOffsets(builder)
	// Write block offset vector to the idxBuilder.
	fb.TableIndexStartOffsetsVector(builder, len(boList))

	// Write individual block offsets in reverse order to work around how Flatbuffers expects it.
	for i := len(boList) - 1; i >= 0; i-- {
		builder.PrependUOffsetT(boList[i])
	}
	boEnd := builder.EndVector(len(boList))

	var bfoff fbs.UOffsetT
	// Write the bloom filter.
	if len(bloom) > 0 {
		bfoff = builder.CreateByteVector(bloom)
	}
	// All block data counts toward the table's on-disk size.
	b.onDiskSize += dataSize

	fb.TableIndexStart(builder)
	fb.TableIndexAddOffsets(builder, boEnd)
	fb.TableIndexAddBloomFilter(builder, bfoff)
	fb.TableIndexAddMaxVersion(builder, b.maxVersion)
	fb.TableIndexAddUncompressedSize(builder, b.uncompressedSize.Load())
	fb.TableIndexAddKeyCount(builder, uint32(len(b.keyHashes)))
	fb.TableIndexAddOnDiskSize(builder, b.onDiskSize)
	fb.TableIndexAddStaleDataSize(builder, uint32(b.staleDataSize))
	builder.Finish(fb.TableIndexEnd(builder))

	buf := builder.FinishedBytes()
	index := fb.GetRootAsTableIndex(buf, 0)
	// Mutate the ondisk size to include the size of the index as well.
	y.AssertTrue(index.MutateOnDiskSize(index.OnDiskSize() + uint32(len(buf))))
	return buf, dataSize
}
  482. // writeBlockOffsets writes all the blockOffets in b.offsets and returns the
  483. // offsets for the newly written items.
  484. func (b *Builder) writeBlockOffsets(builder *fbs.Builder) ([]fbs.UOffsetT, uint32) {
  485. var startOffset uint32
  486. var uoffs []fbs.UOffsetT
  487. for _, bl := range b.blockList {
  488. uoff := b.writeBlockOffset(builder, bl, startOffset)
  489. uoffs = append(uoffs, uoff)
  490. startOffset += uint32(bl.end)
  491. }
  492. return uoffs, startOffset
  493. }
  494. // writeBlockOffset writes the given key,offset,len triple to the indexBuilder.
  495. // It returns the offset of the newly written blockoffset.
  496. func (b *Builder) writeBlockOffset(
  497. builder *fbs.Builder, bl *bblock, startOffset uint32) fbs.UOffsetT {
  498. // Write the key to the buffer.
  499. k := builder.CreateByteVector(bl.baseKey)
  500. // Build the blockOffset.
  501. fb.BlockOffsetStart(builder)
  502. fb.BlockOffsetAddKey(builder, k)
  503. fb.BlockOffsetAddOffset(builder, startOffset)
  504. fb.BlockOffsetAddLen(builder, uint32(bl.end))
  505. return fb.BlockOffsetEnd(builder)
  506. }