y.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594
  1. /*
  2. * Copyright 2017 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package y
  17. import (
  18. "bytes"
  19. "encoding/binary"
  20. stderrors "errors"
  21. "fmt"
  22. "hash/crc32"
  23. "io"
  24. "math"
  25. "os"
  26. "reflect"
  27. "strconv"
  28. "sync"
  29. "time"
  30. "unsafe"
  31. "github.com/dgraph-io/badger/v4/pb"
  32. "github.com/dgraph-io/ristretto/v2/z"
  33. )
  34. var (
  35. // ErrEOF indicates an end of file when trying to read from a memory mapped file
  36. // and encountering the end of slice.
  37. ErrEOF = stderrors.New("ErrEOF: End of file")
  38. // ErrCommitAfterFinish indicates that write batch commit was called after
  39. // finish
  40. ErrCommitAfterFinish = stderrors.New("Batch commit not permitted after finish")
  41. )
  42. type Flags int
  43. const (
  44. // Sync indicates that O_DSYNC should be set on the underlying file,
  45. // ensuring that data writes do not return until the data is flushed
  46. // to disk.
  47. Sync Flags = 1 << iota
  48. // ReadOnly opens the underlying file on a read-only basis.
  49. ReadOnly
  50. )
  51. var (
  52. // This is O_DSYNC (datasync) on platforms that support it -- see file_unix.go
  53. datasyncFileFlag = 0x0
  54. // CastagnoliCrcTable is a CRC32 polynomial table
  55. CastagnoliCrcTable = crc32.MakeTable(crc32.Castagnoli)
  56. )
  57. // OpenExistingFile opens an existing file, errors if it doesn't exist.
  58. func OpenExistingFile(filename string, flags Flags) (*os.File, error) {
  59. openFlags := os.O_RDWR
  60. if flags&ReadOnly != 0 {
  61. openFlags = os.O_RDONLY
  62. }
  63. if flags&Sync != 0 {
  64. openFlags |= datasyncFileFlag
  65. }
  66. return os.OpenFile(filename, openFlags, 0)
  67. }
  68. // CreateSyncedFile creates a new file (using O_EXCL), errors if it already existed.
  69. func CreateSyncedFile(filename string, sync bool) (*os.File, error) {
  70. flags := os.O_RDWR | os.O_CREATE | os.O_EXCL
  71. if sync {
  72. flags |= datasyncFileFlag
  73. }
  74. return os.OpenFile(filename, flags, 0600)
  75. }
  76. // OpenSyncedFile creates the file if one doesn't exist.
  77. func OpenSyncedFile(filename string, sync bool) (*os.File, error) {
  78. flags := os.O_RDWR | os.O_CREATE
  79. if sync {
  80. flags |= datasyncFileFlag
  81. }
  82. return os.OpenFile(filename, flags, 0600)
  83. }
  84. // OpenTruncFile opens the file with O_RDWR | O_CREATE | O_TRUNC
  85. func OpenTruncFile(filename string, sync bool) (*os.File, error) {
  86. flags := os.O_RDWR | os.O_CREATE | os.O_TRUNC
  87. if sync {
  88. flags |= datasyncFileFlag
  89. }
  90. return os.OpenFile(filename, flags, 0600)
  91. }
  92. // SafeCopy does append(a[:0], src...).
  93. func SafeCopy(a, src []byte) []byte {
  94. return append(a[:0], src...)
  95. }
  96. // Copy copies a byte slice and returns the copied slice.
  97. func Copy(a []byte) []byte {
  98. b := make([]byte, len(a))
  99. copy(b, a)
  100. return b
  101. }
  102. // KeyWithTs generates a new key by appending ts to key.
  103. func KeyWithTs(key []byte, ts uint64) []byte {
  104. out := make([]byte, len(key)+8)
  105. copy(out, key)
  106. binary.BigEndian.PutUint64(out[len(key):], math.MaxUint64-ts)
  107. return out
  108. }
  109. // ParseTs parses the timestamp from the key bytes.
  110. func ParseTs(key []byte) uint64 {
  111. if len(key) <= 8 {
  112. return 0
  113. }
  114. return math.MaxUint64 - binary.BigEndian.Uint64(key[len(key)-8:])
  115. }
  116. // CompareKeys checks the key without timestamp and checks the timestamp if keyNoTs
  117. // is same.
  118. // a<timestamp> would be sorted higher than aa<timestamp> if we use bytes.compare
  119. // All keys should have timestamp.
  120. func CompareKeys(key1, key2 []byte) int {
  121. if cmp := bytes.Compare(key1[:len(key1)-8], key2[:len(key2)-8]); cmp != 0 {
  122. return cmp
  123. }
  124. return bytes.Compare(key1[len(key1)-8:], key2[len(key2)-8:])
  125. }
  126. // ParseKey parses the actual key from the key bytes.
  127. func ParseKey(key []byte) []byte {
  128. if key == nil {
  129. return nil
  130. }
  131. return key[:len(key)-8]
  132. }
  133. // SameKey checks for key equality ignoring the version timestamp suffix.
  134. func SameKey(src, dst []byte) bool {
  135. if len(src) != len(dst) {
  136. return false
  137. }
  138. return bytes.Equal(ParseKey(src), ParseKey(dst))
  139. }
  140. // Slice holds a reusable buf, will reallocate if you request a larger size than ever before.
  141. // One problem is with n distinct sizes in random order it'll reallocate log(n) times.
  142. type Slice struct {
  143. buf []byte
  144. }
  145. // Resize reuses the Slice's buffer (or makes a new one) and returns a slice in that buffer of
  146. // length sz.
  147. func (s *Slice) Resize(sz int) []byte {
  148. if cap(s.buf) < sz {
  149. s.buf = make([]byte, sz)
  150. }
  151. return s.buf[0:sz]
  152. }
  153. // FixedDuration returns a string representation of the given duration with the
  154. // hours, minutes, and seconds.
  155. func FixedDuration(d time.Duration) string {
  156. str := fmt.Sprintf("%02ds", int(d.Seconds())%60)
  157. if d >= time.Minute {
  158. str = fmt.Sprintf("%02dm", int(d.Minutes())%60) + str
  159. }
  160. if d >= time.Hour {
  161. str = fmt.Sprintf("%02dh", int(d.Hours())) + str
  162. }
  163. return str
  164. }
  165. // Throttle allows a limited number of workers to run at a time. It also
  166. // provides a mechanism to check for errors encountered by workers and wait for
  167. // them to finish.
  168. type Throttle struct {
  169. once sync.Once
  170. wg sync.WaitGroup
  171. ch chan struct{}
  172. errCh chan error
  173. finishErr error
  174. }
  175. // NewThrottle creates a new throttle with a max number of workers.
  176. func NewThrottle(max int) *Throttle {
  177. return &Throttle{
  178. ch: make(chan struct{}, max),
  179. errCh: make(chan error, max),
  180. }
  181. }
  182. // Do should be called by workers before they start working. It blocks if there
  183. // are already maximum number of workers working. If it detects an error from
  184. // previously Done workers, it would return it.
  185. func (t *Throttle) Do() error {
  186. for {
  187. select {
  188. case t.ch <- struct{}{}:
  189. t.wg.Add(1)
  190. return nil
  191. case err := <-t.errCh:
  192. if err != nil {
  193. return err
  194. }
  195. }
  196. }
  197. }
  198. // Done should be called by workers when they finish working. They can also
  199. // pass the error status of work done.
  200. func (t *Throttle) Done(err error) {
  201. if err != nil {
  202. t.errCh <- err
  203. }
  204. select {
  205. case <-t.ch:
  206. default:
  207. panic("Throttle Do Done mismatch")
  208. }
  209. t.wg.Done()
  210. }
  211. // Finish waits until all workers have finished working. It would return any error passed by Done.
  212. // If Finish is called multiple time, it will wait for workers to finish only once(first time).
  213. // From next calls, it will return same error as found on first call.
  214. func (t *Throttle) Finish() error {
  215. t.once.Do(func() {
  216. t.wg.Wait()
  217. close(t.ch)
  218. close(t.errCh)
  219. for err := range t.errCh {
  220. if err != nil {
  221. t.finishErr = err
  222. return
  223. }
  224. }
  225. })
  226. return t.finishErr
  227. }
  228. // U16ToBytes converts the given Uint16 to bytes
  229. func U16ToBytes(v uint16) []byte {
  230. var uBuf [2]byte
  231. binary.BigEndian.PutUint16(uBuf[:], v)
  232. return uBuf[:]
  233. }
  234. // BytesToU16 converts the given byte slice to uint16
  235. func BytesToU16(b []byte) uint16 {
  236. return binary.BigEndian.Uint16(b)
  237. }
  238. // U32ToBytes converts the given Uint32 to bytes
  239. func U32ToBytes(v uint32) []byte {
  240. var uBuf [4]byte
  241. binary.BigEndian.PutUint32(uBuf[:], v)
  242. return uBuf[:]
  243. }
  244. // BytesToU32 converts the given byte slice to uint32
  245. func BytesToU32(b []byte) uint32 {
  246. return binary.BigEndian.Uint32(b)
  247. }
  248. // U32SliceToBytes converts the given Uint32 slice to byte slice
  249. func U32SliceToBytes(u32s []uint32) []byte {
  250. if len(u32s) == 0 {
  251. return nil
  252. }
  253. var b []byte
  254. hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b))
  255. hdr.Len = len(u32s) * 4
  256. hdr.Cap = hdr.Len
  257. hdr.Data = uintptr(unsafe.Pointer(&u32s[0]))
  258. return b
  259. }
  260. // BytesToU32Slice converts the given byte slice to uint32 slice
  261. func BytesToU32Slice(b []byte) []uint32 {
  262. if len(b) == 0 {
  263. return nil
  264. }
  265. var u32s []uint32
  266. hdr := (*reflect.SliceHeader)(unsafe.Pointer(&u32s))
  267. hdr.Len = len(b) / 4
  268. hdr.Cap = hdr.Len
  269. hdr.Data = uintptr(unsafe.Pointer(&b[0]))
  270. return u32s
  271. }
  272. // U64ToBytes converts the given Uint64 to bytes
  273. func U64ToBytes(v uint64) []byte {
  274. var uBuf [8]byte
  275. binary.BigEndian.PutUint64(uBuf[:], v)
  276. return uBuf[:]
  277. }
  278. // BytesToU64 converts the given byte slice to uint64
  279. func BytesToU64(b []byte) uint64 {
  280. return binary.BigEndian.Uint64(b)
  281. }
  282. // U64SliceToBytes converts the given Uint64 slice to byte slice
  283. func U64SliceToBytes(u64s []uint64) []byte {
  284. if len(u64s) == 0 {
  285. return nil
  286. }
  287. var b []byte
  288. hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b))
  289. hdr.Len = len(u64s) * 8
  290. hdr.Cap = hdr.Len
  291. hdr.Data = uintptr(unsafe.Pointer(&u64s[0]))
  292. return b
  293. }
  294. // BytesToU64Slice converts the given byte slice to uint64 slice
  295. func BytesToU64Slice(b []byte) []uint64 {
  296. if len(b) == 0 {
  297. return nil
  298. }
  299. var u64s []uint64
  300. hdr := (*reflect.SliceHeader)(unsafe.Pointer(&u64s))
  301. hdr.Len = len(b) / 8
  302. hdr.Cap = hdr.Len
  303. hdr.Data = uintptr(unsafe.Pointer(&b[0]))
  304. return u64s
  305. }
  306. // page struct contains one underlying buffer.
  307. type page struct {
  308. buf []byte
  309. }
  310. // PageBuffer consists of many pages. A page is a wrapper over []byte. PageBuffer can act as a
  311. // replacement of bytes.Buffer. Instead of having single underlying buffer, it has multiple
  312. // underlying buffers. Hence it avoids any copy during relocation(as happens in bytes.Buffer).
  313. // PageBuffer allocates memory in pages. Once a page is full, it will allocate page with double the
  314. // size of previous page. Its function are not thread safe.
  315. type PageBuffer struct {
  316. pages []*page
  317. length int // Length of PageBuffer.
  318. nextPageSize int // Size of next page to be allocated.
  319. }
  320. // NewPageBuffer returns a new PageBuffer with first page having size pageSize.
  321. func NewPageBuffer(pageSize int) *PageBuffer {
  322. b := &PageBuffer{}
  323. b.pages = append(b.pages, &page{buf: make([]byte, 0, pageSize)})
  324. b.nextPageSize = pageSize * 2
  325. return b
  326. }
  327. // Write writes data to PageBuffer b. It returns number of bytes written and any error encountered.
  328. func (b *PageBuffer) Write(data []byte) (int, error) {
  329. dataLen := len(data)
  330. for {
  331. cp := b.pages[len(b.pages)-1] // Current page.
  332. n := copy(cp.buf[len(cp.buf):cap(cp.buf)], data)
  333. cp.buf = cp.buf[:len(cp.buf)+n]
  334. b.length += n
  335. if len(data) == n {
  336. break
  337. }
  338. data = data[n:]
  339. b.pages = append(b.pages, &page{buf: make([]byte, 0, b.nextPageSize)})
  340. b.nextPageSize *= 2
  341. }
  342. return dataLen, nil
  343. }
  344. // WriteByte writes data byte to PageBuffer and returns any encountered error.
  345. func (b *PageBuffer) WriteByte(data byte) error {
  346. _, err := b.Write([]byte{data})
  347. return err
  348. }
  349. // Len returns length of PageBuffer.
  350. func (b *PageBuffer) Len() int {
  351. return b.length
  352. }
  353. // pageForOffset returns pageIdx and startIdx for the offset.
  354. func (b *PageBuffer) pageForOffset(offset int) (int, int) {
  355. AssertTrue(offset < b.length)
  356. var pageIdx, startIdx, sizeNow int
  357. for i := 0; i < len(b.pages); i++ {
  358. cp := b.pages[i]
  359. if sizeNow+len(cp.buf)-1 < offset {
  360. sizeNow += len(cp.buf)
  361. } else {
  362. pageIdx = i
  363. startIdx = offset - sizeNow
  364. break
  365. }
  366. }
  367. return pageIdx, startIdx
  368. }
  369. // Truncate truncates PageBuffer to length n.
  370. func (b *PageBuffer) Truncate(n int) {
  371. pageIdx, startIdx := b.pageForOffset(n)
  372. // For simplicity of the code reject extra pages. These pages can be kept.
  373. b.pages = b.pages[:pageIdx+1]
  374. cp := b.pages[len(b.pages)-1]
  375. cp.buf = cp.buf[:startIdx]
  376. b.length = n
  377. }
  378. // Bytes returns whole Buffer data as single []byte.
  379. func (b *PageBuffer) Bytes() []byte {
  380. buf := make([]byte, b.length)
  381. written := 0
  382. for i := 0; i < len(b.pages); i++ {
  383. written += copy(buf[written:], b.pages[i].buf)
  384. }
  385. return buf
  386. }
  387. // WriteTo writes whole buffer to w. It returns number of bytes written and any error encountered.
  388. func (b *PageBuffer) WriteTo(w io.Writer) (int64, error) {
  389. written := int64(0)
  390. for i := 0; i < len(b.pages); i++ {
  391. n, err := w.Write(b.pages[i].buf)
  392. written += int64(n)
  393. if err != nil {
  394. return written, err
  395. }
  396. }
  397. return written, nil
  398. }
  399. // NewReaderAt returns a reader which starts reading from offset in page buffer.
  400. func (b *PageBuffer) NewReaderAt(offset int) *PageBufferReader {
  401. pageIdx, startIdx := b.pageForOffset(offset)
  402. return &PageBufferReader{
  403. buf: b,
  404. pageIdx: pageIdx,
  405. startIdx: startIdx,
  406. }
  407. }
  408. // PageBufferReader is a reader for PageBuffer.
  409. type PageBufferReader struct {
  410. buf *PageBuffer // Underlying page buffer.
  411. pageIdx int // Idx of page from where it will start reading.
  412. startIdx int // Idx inside page - buf.pages[pageIdx] from where it will start reading.
  413. }
  414. // Read reads upto len(p) bytes. It returns number of bytes read and any error encountered.
  415. func (r *PageBufferReader) Read(p []byte) (int, error) {
  416. // Check if there is enough to Read.
  417. pc := len(r.buf.pages)
  418. read := 0
  419. for r.pageIdx < pc && read < len(p) {
  420. cp := r.buf.pages[r.pageIdx] // Current Page.
  421. endIdx := len(cp.buf) // Last Idx up to which we can read from this page.
  422. n := copy(p[read:], cp.buf[r.startIdx:endIdx])
  423. read += n
  424. r.startIdx += n
  425. // Instead of len(cp.buf), we comparing with cap(cp.buf). This ensures that we move to next
  426. // page only when we have read all data. Reading from last page is an edge case. We don't
  427. // want to move to next page until last page is full to its capacity.
  428. if r.startIdx >= cap(cp.buf) {
  429. // We should move to next page.
  430. r.pageIdx++
  431. r.startIdx = 0
  432. continue
  433. }
  434. // When last page in not full to its capacity and we have read all data up to its
  435. // length, just break out of the loop.
  436. if r.pageIdx == pc-1 {
  437. break
  438. }
  439. }
  440. if read == 0 && len(p) > 0 {
  441. return read, io.EOF
  442. }
  443. return read, nil
  444. }
  445. const kvsz = int(unsafe.Sizeof(pb.KV{}))
  446. func NewKV(alloc *z.Allocator) *pb.KV {
  447. if alloc == nil {
  448. return &pb.KV{}
  449. }
  450. b := alloc.AllocateAligned(kvsz)
  451. return (*pb.KV)(unsafe.Pointer(&b[0]))
  452. }
  453. // IBytesToString converts size in bytes to human readable format.
  454. // The code is taken from humanize library and changed to provide
  455. // value upto custom decimal precision.
  456. // IBytesToString(12312412, 1) -> 11.7 MiB
  457. func IBytesToString(size uint64, precision int) string {
  458. sizes := []string{"B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB"}
  459. base := float64(1024)
  460. if size < 10 {
  461. return fmt.Sprintf("%d B", size)
  462. }
  463. e := math.Floor(math.Log(float64(size)) / math.Log(base))
  464. suffix := sizes[int(e)]
  465. val := float64(size) / math.Pow(base, e)
  466. f := "%." + strconv.Itoa(precision) + "f %s"
  467. return fmt.Sprintf(f, val, suffix)
  468. }
  469. type RateMonitor struct {
  470. start time.Time
  471. lastSent uint64
  472. lastCapture time.Time
  473. rates []float64
  474. idx int
  475. }
  476. func NewRateMonitor(numSamples int) *RateMonitor {
  477. return &RateMonitor{
  478. start: time.Now(),
  479. rates: make([]float64, numSamples),
  480. }
  481. }
  482. const minRate = 0.0001
  483. // Capture captures the current number of sent bytes. This number should be monotonically
  484. // increasing.
  485. func (rm *RateMonitor) Capture(sent uint64) {
  486. diff := sent - rm.lastSent
  487. dur := time.Since(rm.lastCapture)
  488. rm.lastCapture, rm.lastSent = time.Now(), sent
  489. rate := float64(diff) / dur.Seconds()
  490. if rate < minRate {
  491. rate = minRate
  492. }
  493. rm.rates[rm.idx] = rate
  494. rm.idx = (rm.idx + 1) % len(rm.rates)
  495. }
  496. // Rate returns the average rate of transmission smoothed out by the number of samples.
  497. func (rm *RateMonitor) Rate() uint64 {
  498. var total float64
  499. var den float64
  500. for _, r := range rm.rates {
  501. if r < minRate {
  502. // Ignore this. We always set minRate, so this is a zero.
  503. // Typically at the start of the rate monitor, we'd have zeros.
  504. continue
  505. }
  506. total += r
  507. den += 1.0
  508. }
  509. if den < minRate {
  510. return 0
  511. }
  512. return uint64(total / den)
  513. }