y.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package y
  6. import (
  7. "bytes"
  8. "encoding/binary"
  9. stderrors "errors"
  10. "fmt"
  11. "hash/crc32"
  12. "io"
  13. "math"
  14. "os"
  15. "reflect"
  16. "strconv"
  17. "sync"
  18. "time"
  19. "unsafe"
  20. "github.com/dgraph-io/badger/v4/pb"
  21. "github.com/dgraph-io/ristretto/v2/z"
  22. )
var (
	// ErrEOF indicates an end of file when trying to read from a memory mapped file
	// and encountering the end of slice. It is distinct from io.EOF so callers can
	// tell a mapped-region overrun apart from ordinary reader exhaustion.
	ErrEOF = stderrors.New("ErrEOF: End of file")

	// ErrCommitAfterFinish indicates that write batch commit was called after
	// finish
	ErrCommitAfterFinish = stderrors.New("Batch commit not permitted after finish")
)
// Flags is a bitmask of open-mode options accepted by the file-opening
// helpers in this package (see OpenExistingFile).
type Flags int

const (
	// Sync indicates that O_DSYNC should be set on the underlying file,
	// ensuring that data writes do not return until the data is flushed
	// to disk.
	Sync Flags = 1 << iota
	// ReadOnly opens the underlying file on a read-only basis.
	ReadOnly
)
var (
	// This is O_DSYNC (datasync) on platforms that support it -- see file_unix.go
	// On platforms without it this stays 0x0, making the Sync flag a no-op.
	datasyncFileFlag = 0x0

	// CastagnoliCrcTable is a CRC32 polynomial table
	CastagnoliCrcTable = crc32.MakeTable(crc32.Castagnoli)
)
  46. // OpenExistingFile opens an existing file, errors if it doesn't exist.
  47. func OpenExistingFile(filename string, flags Flags) (*os.File, error) {
  48. openFlags := os.O_RDWR
  49. if flags&ReadOnly != 0 {
  50. openFlags = os.O_RDONLY
  51. }
  52. if flags&Sync != 0 {
  53. openFlags |= datasyncFileFlag
  54. }
  55. return os.OpenFile(filename, openFlags, 0)
  56. }
  57. // CreateSyncedFile creates a new file (using O_EXCL), errors if it already existed.
  58. func CreateSyncedFile(filename string, sync bool) (*os.File, error) {
  59. flags := os.O_RDWR | os.O_CREATE | os.O_EXCL
  60. if sync {
  61. flags |= datasyncFileFlag
  62. }
  63. return os.OpenFile(filename, flags, 0600)
  64. }
  65. // OpenSyncedFile creates the file if one doesn't exist.
  66. func OpenSyncedFile(filename string, sync bool) (*os.File, error) {
  67. flags := os.O_RDWR | os.O_CREATE
  68. if sync {
  69. flags |= datasyncFileFlag
  70. }
  71. return os.OpenFile(filename, flags, 0600)
  72. }
  73. // OpenTruncFile opens the file with O_RDWR | O_CREATE | O_TRUNC
  74. func OpenTruncFile(filename string, sync bool) (*os.File, error) {
  75. flags := os.O_RDWR | os.O_CREATE | os.O_TRUNC
  76. if sync {
  77. flags |= datasyncFileFlag
  78. }
  79. return os.OpenFile(filename, flags, 0600)
  80. }
  81. // SafeCopy does append(a[:0], src...).
  82. func SafeCopy(a, src []byte) []byte {
  83. return append(a[:0], src...)
  84. }
  85. // Copy copies a byte slice and returns the copied slice.
  86. func Copy(a []byte) []byte {
  87. b := make([]byte, len(a))
  88. copy(b, a)
  89. return b
  90. }
  91. // KeyWithTs generates a new key by appending ts to key.
  92. func KeyWithTs(key []byte, ts uint64) []byte {
  93. out := make([]byte, len(key)+8)
  94. copy(out, key)
  95. binary.BigEndian.PutUint64(out[len(key):], math.MaxUint64-ts)
  96. return out
  97. }
  98. // ParseTs parses the timestamp from the key bytes.
  99. func ParseTs(key []byte) uint64 {
  100. if len(key) <= 8 {
  101. return 0
  102. }
  103. return math.MaxUint64 - binary.BigEndian.Uint64(key[len(key)-8:])
  104. }
  105. // CompareKeys checks the key without timestamp and checks the timestamp if keyNoTs
  106. // is same.
  107. // a<timestamp> would be sorted higher than aa<timestamp> if we use bytes.compare
  108. // All keys should have timestamp.
  109. func CompareKeys(key1, key2 []byte) int {
  110. if cmp := bytes.Compare(key1[:len(key1)-8], key2[:len(key2)-8]); cmp != 0 {
  111. return cmp
  112. }
  113. return bytes.Compare(key1[len(key1)-8:], key2[len(key2)-8:])
  114. }
  115. // ParseKey parses the actual key from the key bytes.
  116. func ParseKey(key []byte) []byte {
  117. if key == nil {
  118. return nil
  119. }
  120. return key[:len(key)-8]
  121. }
  122. // SameKey checks for key equality ignoring the version timestamp suffix.
  123. func SameKey(src, dst []byte) bool {
  124. if len(src) != len(dst) {
  125. return false
  126. }
  127. return bytes.Equal(ParseKey(src), ParseKey(dst))
  128. }
  129. // Slice holds a reusable buf, will reallocate if you request a larger size than ever before.
  130. // One problem is with n distinct sizes in random order it'll reallocate log(n) times.
  131. type Slice struct {
  132. buf []byte
  133. }
  134. // Resize reuses the Slice's buffer (or makes a new one) and returns a slice in that buffer of
  135. // length sz.
  136. func (s *Slice) Resize(sz int) []byte {
  137. if cap(s.buf) < sz {
  138. s.buf = make([]byte, sz)
  139. }
  140. return s.buf[0:sz]
  141. }
  142. // FixedDuration returns a string representation of the given duration with the
  143. // hours, minutes, and seconds.
  144. func FixedDuration(d time.Duration) string {
  145. str := fmt.Sprintf("%02ds", int(d.Seconds())%60)
  146. if d >= time.Minute {
  147. str = fmt.Sprintf("%02dm", int(d.Minutes())%60) + str
  148. }
  149. if d >= time.Hour {
  150. str = fmt.Sprintf("%02dh", int(d.Hours())) + str
  151. }
  152. return str
  153. }
// Throttle allows a limited number of workers to run at a time. It also
// provides a mechanism to check for errors encountered by workers and wait for
// them to finish.
type Throttle struct {
	once      sync.Once      // guards the one-time drain performed by Finish
	wg        sync.WaitGroup // tracks workers admitted by Do
	ch        chan struct{}  // counting semaphore; capacity = max concurrent workers
	errCh     chan error     // non-nil errors reported via Done
	finishErr error          // first error found by Finish; returned on every later call
}
  164. // NewThrottle creates a new throttle with a max number of workers.
  165. func NewThrottle(max int) *Throttle {
  166. return &Throttle{
  167. ch: make(chan struct{}, max),
  168. errCh: make(chan error, max),
  169. }
  170. }
// Do should be called by workers before they start working. It blocks if there
// are already maximum number of workers working. If it detects an error from
// previously Done workers, it would return it.
func (t *Throttle) Do() error {
	for {
		select {
		case t.ch <- struct{}{}:
			// Acquired a worker slot; register with the WaitGroup so that
			// Finish can wait for this worker to call Done.
			t.wg.Add(1)
			return nil
		case err := <-t.errCh:
			// A previously Done worker reported a failure; surface it
			// instead of admitting more work. (Done only sends non-nil
			// errors, but a nil here simply re-enters the loop.)
			if err != nil {
				return err
			}
		}
	}
}
// Done should be called by workers when they finish working. They can also
// pass the error status of work done.
func (t *Throttle) Done(err error) {
	if err != nil {
		// errCh has capacity max, so this send cannot block as long as
		// Do/Done calls are paired.
		t.errCh <- err
	}
	select {
	case <-t.ch:
		// Released one worker slot.
	default:
		// Nothing to receive: Done was called without a matching Do.
		panic("Throttle Do Done mismatch")
	}
	t.wg.Done()
}
// Finish waits until all workers have finished working. It would return any error passed by Done.
// If Finish is called multiple time, it will wait for workers to finish only once(first time).
// From next calls, it will return same error as found on first call.
func (t *Throttle) Finish() error {
	t.once.Do(func() {
		// All Done calls have happened once Wait returns, so it is safe to
		// close both channels and drain the buffered errors.
		t.wg.Wait()
		close(t.ch)
		close(t.errCh)
		// Keep only the first non-nil error; later ones are discarded.
		for err := range t.errCh {
			if err != nil {
				t.finishErr = err
				return
			}
		}
	})
	return t.finishErr
}
  217. // U16ToBytes converts the given Uint16 to bytes
  218. func U16ToBytes(v uint16) []byte {
  219. var uBuf [2]byte
  220. binary.BigEndian.PutUint16(uBuf[:], v)
  221. return uBuf[:]
  222. }
  223. // BytesToU16 converts the given byte slice to uint16
  224. func BytesToU16(b []byte) uint16 {
  225. return binary.BigEndian.Uint16(b)
  226. }
  227. // U32ToBytes converts the given Uint32 to bytes
  228. func U32ToBytes(v uint32) []byte {
  229. var uBuf [4]byte
  230. binary.BigEndian.PutUint32(uBuf[:], v)
  231. return uBuf[:]
  232. }
  233. // BytesToU32 converts the given byte slice to uint32
  234. func BytesToU32(b []byte) uint32 {
  235. return binary.BigEndian.Uint32(b)
  236. }
  237. // U32SliceToBytes converts the given Uint32 slice to byte slice
  238. func U32SliceToBytes(u32s []uint32) []byte {
  239. if len(u32s) == 0 {
  240. return nil
  241. }
  242. var b []byte
  243. hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b))
  244. hdr.Len = len(u32s) * 4
  245. hdr.Cap = hdr.Len
  246. hdr.Data = uintptr(unsafe.Pointer(&u32s[0]))
  247. return b
  248. }
  249. // BytesToU32Slice converts the given byte slice to uint32 slice
  250. func BytesToU32Slice(b []byte) []uint32 {
  251. if len(b) == 0 {
  252. return nil
  253. }
  254. var u32s []uint32
  255. hdr := (*reflect.SliceHeader)(unsafe.Pointer(&u32s))
  256. hdr.Len = len(b) / 4
  257. hdr.Cap = hdr.Len
  258. hdr.Data = uintptr(unsafe.Pointer(&b[0]))
  259. return u32s
  260. }
  261. // U64ToBytes converts the given Uint64 to bytes
  262. func U64ToBytes(v uint64) []byte {
  263. var uBuf [8]byte
  264. binary.BigEndian.PutUint64(uBuf[:], v)
  265. return uBuf[:]
  266. }
  267. // BytesToU64 converts the given byte slice to uint64
  268. func BytesToU64(b []byte) uint64 {
  269. return binary.BigEndian.Uint64(b)
  270. }
  271. // U64SliceToBytes converts the given Uint64 slice to byte slice
  272. func U64SliceToBytes(u64s []uint64) []byte {
  273. if len(u64s) == 0 {
  274. return nil
  275. }
  276. var b []byte
  277. hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b))
  278. hdr.Len = len(u64s) * 8
  279. hdr.Cap = hdr.Len
  280. hdr.Data = uintptr(unsafe.Pointer(&u64s[0]))
  281. return b
  282. }
  283. // BytesToU64Slice converts the given byte slice to uint64 slice
  284. func BytesToU64Slice(b []byte) []uint64 {
  285. if len(b) == 0 {
  286. return nil
  287. }
  288. var u64s []uint64
  289. hdr := (*reflect.SliceHeader)(unsafe.Pointer(&u64s))
  290. hdr.Len = len(b) / 8
  291. hdr.Cap = hdr.Len
  292. hdr.Data = uintptr(unsafe.Pointer(&b[0]))
  293. return u64s
  294. }
// page struct contains one underlying buffer.
// len(buf) is the bytes written so far; cap(buf) is the page's fixed size.
type page struct {
	buf []byte
}
// PageBuffer consists of many pages. A page is a wrapper over []byte. PageBuffer can act as a
// replacement of bytes.Buffer. Instead of having single underlying buffer, it has multiple
// underlying buffers. Hence it avoids any copy during relocation(as happens in bytes.Buffer).
// PageBuffer allocates memory in pages. Once a page is full, it will allocate page with double the
// size of previous page. Its function are not thread safe.
type PageBuffer struct {
	pages []*page // ordered pages; only the last one accepts writes

	length       int // Length of PageBuffer.
	nextPageSize int // Size of next page to be allocated.
}
  309. // NewPageBuffer returns a new PageBuffer with first page having size pageSize.
  310. func NewPageBuffer(pageSize int) *PageBuffer {
  311. b := &PageBuffer{}
  312. b.pages = append(b.pages, &page{buf: make([]byte, 0, pageSize)})
  313. b.nextPageSize = pageSize * 2
  314. return b
  315. }
  316. // Write writes data to PageBuffer b. It returns number of bytes written and any error encountered.
  317. func (b *PageBuffer) Write(data []byte) (int, error) {
  318. dataLen := len(data)
  319. for {
  320. cp := b.pages[len(b.pages)-1] // Current page.
  321. n := copy(cp.buf[len(cp.buf):cap(cp.buf)], data)
  322. cp.buf = cp.buf[:len(cp.buf)+n]
  323. b.length += n
  324. if len(data) == n {
  325. break
  326. }
  327. data = data[n:]
  328. b.pages = append(b.pages, &page{buf: make([]byte, 0, b.nextPageSize)})
  329. b.nextPageSize *= 2
  330. }
  331. return dataLen, nil
  332. }
// WriteByte writes data byte to PageBuffer and returns any encountered error.
// (Write never fails, so the returned error is always nil.)
func (b *PageBuffer) WriteByte(data byte) error {
	_, err := b.Write([]byte{data})
	return err
}
// Len returns length of PageBuffer, i.e. the total number of bytes written
// across all pages.
func (b *PageBuffer) Len() int {
	return b.length
}
  342. // pageForOffset returns pageIdx and startIdx for the offset.
  343. func (b *PageBuffer) pageForOffset(offset int) (int, int) {
  344. AssertTrue(offset < b.length)
  345. var pageIdx, startIdx, sizeNow int
  346. for i := 0; i < len(b.pages); i++ {
  347. cp := b.pages[i]
  348. if sizeNow+len(cp.buf)-1 < offset {
  349. sizeNow += len(cp.buf)
  350. } else {
  351. pageIdx = i
  352. startIdx = offset - sizeNow
  353. break
  354. }
  355. }
  356. return pageIdx, startIdx
  357. }
  358. // Truncate truncates PageBuffer to length n.
  359. func (b *PageBuffer) Truncate(n int) {
  360. pageIdx, startIdx := b.pageForOffset(n)
  361. // For simplicity of the code reject extra pages. These pages can be kept.
  362. b.pages = b.pages[:pageIdx+1]
  363. cp := b.pages[len(b.pages)-1]
  364. cp.buf = cp.buf[:startIdx]
  365. b.length = n
  366. }
  367. // Bytes returns whole Buffer data as single []byte.
  368. func (b *PageBuffer) Bytes() []byte {
  369. buf := make([]byte, b.length)
  370. written := 0
  371. for i := 0; i < len(b.pages); i++ {
  372. written += copy(buf[written:], b.pages[i].buf)
  373. }
  374. return buf
  375. }
  376. // WriteTo writes whole buffer to w. It returns number of bytes written and any error encountered.
  377. func (b *PageBuffer) WriteTo(w io.Writer) (int64, error) {
  378. written := int64(0)
  379. for i := 0; i < len(b.pages); i++ {
  380. n, err := w.Write(b.pages[i].buf)
  381. written += int64(n)
  382. if err != nil {
  383. return written, err
  384. }
  385. }
  386. return written, nil
  387. }
  388. // NewReaderAt returns a reader which starts reading from offset in page buffer.
  389. func (b *PageBuffer) NewReaderAt(offset int) *PageBufferReader {
  390. pageIdx, startIdx := b.pageForOffset(offset)
  391. return &PageBufferReader{
  392. buf: b,
  393. pageIdx: pageIdx,
  394. startIdx: startIdx,
  395. }
  396. }
// PageBufferReader is a reader for PageBuffer.
// It tracks a cursor (pageIdx, startIdx) into the underlying pages.
type PageBufferReader struct {
	buf      *PageBuffer // Underlying page buffer.
	pageIdx  int         // Idx of page from where it will start reading.
	startIdx int         // Idx inside page - buf.pages[pageIdx] from where it will start reading.
}
// Read reads upto len(p) bytes. It returns number of bytes read and any error encountered.
// io.EOF is returned only when no bytes at all could be read into a non-empty p.
func (r *PageBufferReader) Read(p []byte) (int, error) {
	// Check if there is enough to Read.
	pc := len(r.buf.pages)

	read := 0
	for r.pageIdx < pc && read < len(p) {
		cp := r.buf.pages[r.pageIdx] // Current Page.
		endIdx := len(cp.buf)        // Last Idx up to which we can read from this page.

		n := copy(p[read:], cp.buf[r.startIdx:endIdx])
		read += n
		r.startIdx += n

		// Instead of len(cp.buf), we comparing with cap(cp.buf). This ensures that we move to next
		// page only when we have read all data. Reading from last page is an edge case. We don't
		// want to move to next page until last page is full to its capacity.
		if r.startIdx >= cap(cp.buf) {
			// We should move to next page.
			r.pageIdx++
			r.startIdx = 0
			continue
		}

		// When last page in not full to its capacity and we have read all data up to its
		// length, just break out of the loop.
		if r.pageIdx == pc-1 {
			break
		}
	}

	// Nothing was read into a non-empty destination: the cursor is at the end.
	if read == 0 && len(p) > 0 {
		return read, io.EOF
	}

	return read, nil
}
// kvsz is the in-memory size of a pb.KV, used to size arena allocations below.
const kvsz = int(unsafe.Sizeof(pb.KV{}))

// NewKV returns a zeroed pb.KV, taken from the allocator's arena when alloc
// is non-nil, otherwise from the Go heap.
func NewKV(alloc *z.Allocator) *pb.KV {
	if alloc == nil {
		return &pb.KV{}
	}
	// Arena-backed allocation: reinterpret the returned bytes as a pb.KV.
	// NOTE(review): this assumes AllocateAligned returns zeroed,
	// suitably-aligned memory of at least kvsz bytes — confirm against the
	// ristretto/z allocator's contract.
	b := alloc.AllocateAligned(kvsz)
	return (*pb.KV)(unsafe.Pointer(&b[0]))
}
  442. // IBytesToString converts size in bytes to human readable format.
  443. // The code is taken from humanize library and changed to provide
  444. // value upto custom decimal precision.
  445. // IBytesToString(12312412, 1) -> 11.7 MiB
  446. func IBytesToString(size uint64, precision int) string {
  447. sizes := []string{"B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB"}
  448. base := float64(1024)
  449. if size < 10 {
  450. return fmt.Sprintf("%d B", size)
  451. }
  452. e := math.Floor(math.Log(float64(size)) / math.Log(base))
  453. suffix := sizes[int(e)]
  454. val := float64(size) / math.Pow(base, e)
  455. f := "%." + strconv.Itoa(precision) + "f %s"
  456. return fmt.Sprintf(f, val, suffix)
  457. }
// RateMonitor keeps a ring of recent transfer-rate samples so Rate can
// report a smoothed bytes-per-second figure.
type RateMonitor struct {
	start       time.Time // when monitoring began
	lastSent    uint64    // byte count at the previous Capture call
	lastCapture time.Time // time of the previous Capture call
	rates       []float64 // ring buffer of rate samples (bytes/sec)
	idx         int       // next slot in rates to overwrite
}
  465. func NewRateMonitor(numSamples int) *RateMonitor {
  466. return &RateMonitor{
  467. start: time.Now(),
  468. rates: make([]float64, numSamples),
  469. }
  470. }
  471. const minRate = 0.0001
  472. // Capture captures the current number of sent bytes. This number should be monotonically
  473. // increasing.
  474. func (rm *RateMonitor) Capture(sent uint64) {
  475. diff := sent - rm.lastSent
  476. dur := time.Since(rm.lastCapture)
  477. rm.lastCapture, rm.lastSent = time.Now(), sent
  478. rate := float64(diff) / dur.Seconds()
  479. if rate < minRate {
  480. rate = minRate
  481. }
  482. rm.rates[rm.idx] = rate
  483. rm.idx = (rm.idx + 1) % len(rm.rates)
  484. }
  485. // Rate returns the average rate of transmission smoothed out by the number of samples.
  486. func (rm *RateMonitor) Rate() uint64 {
  487. var total float64
  488. var den float64
  489. for _, r := range rm.rates {
  490. if r < minRate {
  491. // Ignore this. We always set minRate, so this is a zero.
  492. // Typically at the start of the rate monitor, we'd have zeros.
  493. continue
  494. }
  495. total += r
  496. den += 1.0
  497. }
  498. if den < minRate {
  499. return 0
  500. }
  501. return uint64(total / den)
  502. }