buffer.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546
  1. /*
  2. * Copyright 2020 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package z
  17. import (
  18. "encoding/binary"
  19. "fmt"
  20. "log"
  21. "os"
  22. "sort"
  23. "sync/atomic"
  24. "github.com/pkg/errors"
  25. )
  // Defaults applied by the Buffer constructors.
  const (
  	// defaultCapacity is the minimum capacity, in bytes, that the constructors allocate;
  	// smaller requests are raised to this value.
  	defaultCapacity = 64

  	// defaultTag is the allocation tag used when the caller passes an empty one.
  	defaultTag = "buffer"
  )
  30. // Buffer is equivalent of bytes.Buffer without the ability to read. It is NOT thread-safe.
  31. //
  32. // In UseCalloc mode, z.Calloc is used to allocate memory, which depending upon how the code is
  33. // compiled could use jemalloc for allocations.
  34. //
  35. // In UseMmap mode, Buffer uses file mmap to allocate memory. This allows us to store big data
  36. // structures without using physical memory.
  37. //
  38. // MaxSize can be set to limit the memory usage.
  39. type Buffer struct {
  40. padding uint64 // number of starting bytes used for padding
  41. offset uint64 // used length of the buffer
  42. buf []byte // backing slice for the buffer
  43. bufType BufferType // type of the underlying buffer
  44. curSz int // capacity of the buffer
  45. maxSz int // causes a panic if the buffer grows beyond this size
  46. mmapFile *MmapFile // optional mmap backing for the buffer
  47. autoMmapAfter int // Calloc falls back to an mmaped tmpfile after crossing this size
  48. autoMmapDir string // directory for autoMmap to create a tempfile in
  49. persistent bool // when enabled, Release will not delete the underlying mmap file
  50. tag string // used for jemalloc stats
  51. }
  52. func NewBuffer(capacity int, tag string) *Buffer {
  53. if capacity < defaultCapacity {
  54. capacity = defaultCapacity
  55. }
  56. if tag == "" {
  57. tag = defaultTag
  58. }
  59. return &Buffer{
  60. buf: Calloc(capacity, tag),
  61. bufType: UseCalloc,
  62. curSz: capacity,
  63. offset: 8,
  64. padding: 8,
  65. tag: tag,
  66. }
  67. }
  68. // It is the caller's responsibility to set offset after this, because Buffer
  69. // doesn't remember what it was.
  70. func NewBufferPersistent(path string, capacity int) (*Buffer, error) {
  71. file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666)
  72. if err != nil {
  73. return nil, err
  74. }
  75. buffer, err := newBufferFile(file, capacity)
  76. if err != nil {
  77. return nil, err
  78. }
  79. buffer.persistent = true
  80. return buffer, nil
  81. }
  82. func NewBufferTmp(dir string, capacity int) (*Buffer, error) {
  83. if dir == "" {
  84. dir = tmpDir
  85. }
  86. file, err := os.CreateTemp(dir, "buffer")
  87. if err != nil {
  88. return nil, err
  89. }
  90. return newBufferFile(file, capacity)
  91. }
  92. func newBufferFile(file *os.File, capacity int) (*Buffer, error) {
  93. if capacity < defaultCapacity {
  94. capacity = defaultCapacity
  95. }
  96. mmapFile, err := OpenMmapFileUsing(file, capacity, true)
  97. if err != nil && err != NewFile {
  98. return nil, err
  99. }
  100. buf := &Buffer{
  101. buf: mmapFile.Data,
  102. bufType: UseMmap,
  103. curSz: len(mmapFile.Data),
  104. mmapFile: mmapFile,
  105. offset: 8,
  106. padding: 8,
  107. }
  108. return buf, nil
  109. }
  110. func NewBufferSlice(slice []byte) *Buffer {
  111. return &Buffer{
  112. offset: uint64(len(slice)),
  113. buf: slice,
  114. bufType: UseInvalid,
  115. }
  116. }
  117. func (b *Buffer) WithAutoMmap(threshold int, path string) *Buffer {
  118. if b.bufType != UseCalloc {
  119. panic("can only autoMmap with UseCalloc")
  120. }
  121. b.autoMmapAfter = threshold
  122. if path == "" {
  123. b.autoMmapDir = tmpDir
  124. } else {
  125. b.autoMmapDir = path
  126. }
  127. return b
  128. }
  129. func (b *Buffer) WithMaxSize(size int) *Buffer {
  130. b.maxSz = size
  131. return b
  132. }
  133. func (b *Buffer) IsEmpty() bool {
  134. return int(b.offset) == b.StartOffset()
  135. }
  136. // LenWithPadding would return the number of bytes written to the buffer so far
  137. // plus the padding at the start of the buffer.
  138. func (b *Buffer) LenWithPadding() int {
  139. return int(atomic.LoadUint64(&b.offset))
  140. }
  141. // LenNoPadding would return the number of bytes written to the buffer so far
  142. // (without the padding).
  143. func (b *Buffer) LenNoPadding() int {
  144. return int(atomic.LoadUint64(&b.offset) - b.padding)
  145. }
  146. // Bytes would return all the written bytes as a slice.
  147. func (b *Buffer) Bytes() []byte {
  148. off := atomic.LoadUint64(&b.offset)
  149. return b.buf[b.padding:off]
  150. }
  151. // Grow would grow the buffer to have at least n more bytes. In case the buffer is at capacity, it
  152. // would reallocate twice the size of current capacity + n, to ensure n bytes can be written to the
  153. // buffer without further allocation. In UseMmap mode, this might result in underlying file
  154. // expansion.
  155. func (b *Buffer) Grow(n int) {
  156. if b.buf == nil {
  157. panic("z.Buffer needs to be initialized before using")
  158. }
  159. if b.maxSz > 0 && int(b.offset)+n > b.maxSz {
  160. err := fmt.Errorf(
  161. "z.Buffer max size exceeded: %d offset: %d grow: %d", b.maxSz, b.offset, n)
  162. panic(err)
  163. }
  164. if int(b.offset)+n < b.curSz {
  165. return
  166. }
  167. // Calculate new capacity.
  168. growBy := b.curSz + n
  169. // Don't allocate more than 1GB at a time.
  170. if growBy > 1<<30 {
  171. growBy = 1 << 30
  172. }
  173. // Allocate at least n, even if it exceeds the 1GB limit above.
  174. if n > growBy {
  175. growBy = n
  176. }
  177. b.curSz += growBy
  178. switch b.bufType {
  179. case UseCalloc:
  180. // If autoMmap gets triggered, copy the slice over to an mmaped file.
  181. if b.autoMmapAfter > 0 && b.curSz > b.autoMmapAfter {
  182. b.bufType = UseMmap
  183. file, err := os.CreateTemp(b.autoMmapDir, "")
  184. if err != nil {
  185. panic(err)
  186. }
  187. mmapFile, err := OpenMmapFileUsing(file, b.curSz, true)
  188. if err != nil && err != NewFile {
  189. panic(err)
  190. }
  191. assert(int(b.offset) == copy(mmapFile.Data, b.buf[:b.offset]))
  192. Free(b.buf)
  193. b.mmapFile = mmapFile
  194. b.buf = mmapFile.Data
  195. break
  196. }
  197. // Else, reallocate the slice.
  198. newBuf := Calloc(b.curSz, b.tag)
  199. assert(int(b.offset) == copy(newBuf, b.buf[:b.offset]))
  200. Free(b.buf)
  201. b.buf = newBuf
  202. case UseMmap:
  203. // Truncate and remap the underlying file.
  204. if err := b.mmapFile.Truncate(int64(b.curSz)); err != nil {
  205. err = errors.Wrapf(err,
  206. "while trying to truncate file: %s to size: %d", b.mmapFile.Fd.Name(), b.curSz)
  207. panic(err)
  208. }
  209. b.buf = b.mmapFile.Data
  210. default:
  211. panic("can only use Grow on UseCalloc and UseMmap buffers")
  212. }
  213. }
  214. // Allocate is a way to get a slice of size n back from the buffer. This slice can be directly
  215. // written to. Warning: Allocate is not thread-safe. The byte slice returned MUST be used before
  216. // further calls to Buffer.
  217. func (b *Buffer) Allocate(n int) []byte {
  218. b.Grow(n)
  219. off := b.offset
  220. b.offset += uint64(n)
  221. return b.buf[off:int(b.offset)]
  222. }
  223. // AllocateOffset works the same way as allocate, but instead of returning a byte slice, it returns
  224. // the offset of the allocation.
  225. func (b *Buffer) AllocateOffset(n int) int {
  226. b.Grow(n)
  227. b.offset += uint64(n)
  228. return int(b.offset) - n
  229. }
  230. func (b *Buffer) writeLen(sz int) {
  231. buf := b.Allocate(8)
  232. binary.BigEndian.PutUint64(buf, uint64(sz))
  233. }
  234. // SliceAllocate would encode the size provided into the buffer, followed by a call to Allocate,
  235. // hence returning the slice of size sz. This can be used to allocate a lot of small buffers into
  236. // this big buffer.
  237. // Note that SliceAllocate should NOT be mixed with normal calls to Write.
  238. func (b *Buffer) SliceAllocate(sz int) []byte {
  239. b.Grow(8 + sz)
  240. b.writeLen(sz)
  241. return b.Allocate(sz)
  242. }
  243. func (b *Buffer) StartOffset() int {
  244. return int(b.padding)
  245. }
  246. func (b *Buffer) WriteSlice(slice []byte) {
  247. dst := b.SliceAllocate(len(slice))
  248. assert(len(slice) == copy(dst, slice))
  249. }
  250. func (b *Buffer) SliceIterate(f func(slice []byte) error) error {
  251. if b.IsEmpty() {
  252. return nil
  253. }
  254. next := b.StartOffset()
  255. var slice []byte
  256. for next >= 0 {
  257. slice, next = b.Slice(next)
  258. if len(slice) == 0 {
  259. continue
  260. }
  261. if err := f(slice); err != nil {
  262. return err
  263. }
  264. }
  265. return nil
  266. }
  // BufferType identifies how a Buffer's backing memory was obtained.
  type BufferType int

  const (
  	// UseCalloc marks a Buffer backed by memory from Calloc.
  	UseCalloc BufferType = iota
  	// UseMmap marks a Buffer backed by a memory-mapped file.
  	UseMmap
  	// UseInvalid marks a Buffer that is neither of the above, e.g. one wrapping a
  	// caller-provided slice.
  	UseInvalid
  )

  // String returns the name of the buffer type. Any value other than UseCalloc and UseMmap is
  // reported as "UseInvalid".
  func (t BufferType) String() string {
  	if t == UseCalloc {
  		return "UseCalloc"
  	}
  	if t == UseMmap {
  		return "UseMmap"
  	}
  	return "UseInvalid"
  }
  // LessFunc reports whether entry a must sort before entry b. It is the comparator used by
  // the Buffer sorting routines.
  type LessFunc func(a, b []byte) bool
  284. type sortHelper struct {
  285. offsets []int
  286. b *Buffer
  287. tmp *Buffer
  288. less LessFunc
  289. small []int
  290. }
  291. func (s *sortHelper) sortSmall(start, end int) {
  292. s.tmp.Reset()
  293. s.small = s.small[:0]
  294. next := start
  295. for next >= 0 && next < end {
  296. s.small = append(s.small, next)
  297. _, next = s.b.Slice(next)
  298. }
  299. // We are sorting the slices pointed to by s.small offsets, but only moving the offsets around.
  300. sort.Slice(s.small, func(i, j int) bool {
  301. left, _ := s.b.Slice(s.small[i])
  302. right, _ := s.b.Slice(s.small[j])
  303. return s.less(left, right)
  304. })
  305. // Now we iterate over the s.small offsets and copy over the slices. The result is now in order.
  306. for _, off := range s.small {
  307. _, _ = s.tmp.Write(rawSlice(s.b.buf[off:]))
  308. }
  309. assert(end-start == copy(s.b.buf[start:end], s.tmp.Bytes()))
  310. }
  311. func assert(b bool) {
  312. if !b {
  313. log.Fatalf("%+v", errors.Errorf("Assertion failure"))
  314. }
  315. }
  // check terminates the program via log.Fatalf, logging err in %+v form, when err is non-nil.
  // It does nothing for a nil error.
  func check(err error) {
  	if err == nil {
  		return
  	}
  	log.Fatalf("%+v", err)
  }
  321. func check2(_ interface{}, err error) {
  322. check(err)
  323. }
  324. func (s *sortHelper) merge(left, right []byte, start, end int) {
  325. if len(left) == 0 || len(right) == 0 {
  326. return
  327. }
  328. s.tmp.Reset()
  329. check2(s.tmp.Write(left))
  330. left = s.tmp.Bytes()
  331. var ls, rs []byte
  332. copyLeft := func() {
  333. assert(len(ls) == copy(s.b.buf[start:], ls))
  334. left = left[len(ls):]
  335. start += len(ls)
  336. }
  337. copyRight := func() {
  338. assert(len(rs) == copy(s.b.buf[start:], rs))
  339. right = right[len(rs):]
  340. start += len(rs)
  341. }
  342. for start < end {
  343. if len(left) == 0 {
  344. assert(len(right) == copy(s.b.buf[start:end], right))
  345. return
  346. }
  347. if len(right) == 0 {
  348. assert(len(left) == copy(s.b.buf[start:end], left))
  349. return
  350. }
  351. ls = rawSlice(left)
  352. rs = rawSlice(right)
  353. // We skip the first 4 bytes in the rawSlice, because that stores the length.
  354. if s.less(ls[8:], rs[8:]) {
  355. copyLeft()
  356. } else {
  357. copyRight()
  358. }
  359. }
  360. }
  361. func (s *sortHelper) sort(lo, hi int) []byte {
  362. assert(lo <= hi)
  363. mid := lo + (hi-lo)/2
  364. loff, hoff := s.offsets[lo], s.offsets[hi]
  365. if lo == mid {
  366. // No need to sort, just return the buffer.
  367. return s.b.buf[loff:hoff]
  368. }
  369. // lo, mid would sort from [offset[lo], offset[mid]) .
  370. left := s.sort(lo, mid)
  371. // Typically we'd use mid+1, but here mid represents an offset in the buffer. Each offset
  372. // contains a thousand entries. So, if we do mid+1, we'd skip over those entries.
  373. right := s.sort(mid, hi)
  374. s.merge(left, right, loff, hoff)
  375. return s.b.buf[loff:hoff]
  376. }
  377. // SortSlice is like SortSliceBetween but sorting over the entire buffer.
  378. func (b *Buffer) SortSlice(less func(left, right []byte) bool) {
  379. b.SortSliceBetween(b.StartOffset(), int(b.offset), less)
  380. }
  381. func (b *Buffer) SortSliceBetween(start, end int, less LessFunc) {
  382. if start >= end {
  383. return
  384. }
  385. if start == 0 {
  386. panic("start can never be zero")
  387. }
  388. var offsets []int
  389. next, count := start, 0
  390. for next >= 0 && next < end {
  391. if count%1024 == 0 {
  392. offsets = append(offsets, next)
  393. }
  394. _, next = b.Slice(next)
  395. count++
  396. }
  397. assert(len(offsets) > 0)
  398. if offsets[len(offsets)-1] != end {
  399. offsets = append(offsets, end)
  400. }
  401. szTmp := int(float64((end-start)/2) * 1.1)
  402. s := &sortHelper{
  403. offsets: offsets,
  404. b: b,
  405. less: less,
  406. small: make([]int, 0, 1024),
  407. tmp: NewBuffer(szTmp, b.tag),
  408. }
  409. defer func() { _ = s.tmp.Release() }()
  410. left := offsets[0]
  411. for _, off := range offsets[1:] {
  412. s.sortSmall(left, off)
  413. left = off
  414. }
  415. s.sort(0, len(offsets)-1)
  416. }
  // rawSlice returns the first length-prefixed entry in buf, header included: it reads the
  // 8-byte big-endian length at the start of buf and returns those 8 bytes plus that many
  // payload bytes. The result aliases buf.
  func rawSlice(buf []byte) []byte {
  	payload := int(binary.BigEndian.Uint64(buf))
  	return buf[:8+payload]
  }
  421. // Slice would return the slice written at offset.
  422. func (b *Buffer) Slice(offset int) ([]byte, int) {
  423. if offset >= int(b.offset) {
  424. return nil, -1
  425. }
  426. sz := binary.BigEndian.Uint64(b.buf[offset:])
  427. start := offset + 8
  428. next := start + int(sz)
  429. res := b.buf[start:next]
  430. if next >= int(b.offset) {
  431. next = -1
  432. }
  433. return res, next
  434. }
  435. // SliceOffsets is an expensive function. Use sparingly.
  436. func (b *Buffer) SliceOffsets() []int {
  437. next := b.StartOffset()
  438. var offsets []int
  439. for next >= 0 {
  440. offsets = append(offsets, next)
  441. _, next = b.Slice(next)
  442. }
  443. return offsets
  444. }
  445. func (b *Buffer) Data(offset int) []byte {
  446. if offset > b.curSz {
  447. panic("offset beyond current size")
  448. }
  449. return b.buf[offset:b.curSz]
  450. }
  451. // Write would write p bytes to the buffer.
  452. func (b *Buffer) Write(p []byte) (n int, err error) {
  453. n = len(p)
  454. b.Grow(n)
  455. assert(n == copy(b.buf[b.offset:], p))
  456. b.offset += uint64(n)
  457. return n, nil
  458. }
  459. // Reset would reset the buffer to be reused.
  460. func (b *Buffer) Reset() {
  461. b.offset = uint64(b.StartOffset())
  462. }
  463. // Release would free up the memory allocated by the buffer. Once the usage of buffer is done, it is
  464. // important to call Release, otherwise a memory leak can happen.
  465. func (b *Buffer) Release() error {
  466. if b == nil {
  467. return nil
  468. }
  469. switch b.bufType {
  470. case UseCalloc:
  471. Free(b.buf)
  472. case UseMmap:
  473. if b.mmapFile == nil {
  474. return nil
  475. }
  476. path := b.mmapFile.Fd.Name()
  477. if err := b.mmapFile.Close(-1); err != nil {
  478. return errors.Wrapf(err, "while closing file: %s", path)
  479. }
  480. if !b.persistent {
  481. if err := os.Remove(path); err != nil {
  482. return errors.Wrapf(err, "while deleting file %s", path)
  483. }
  484. }
  485. }
  486. return nil
  487. }