buffer.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package z
  6. import (
  7. "encoding/binary"
  8. "errors"
  9. "fmt"
  10. "log"
  11. "os"
  12. "sort"
  13. "sync/atomic"
  14. )
  15. const (
  16. defaultCapacity = 64
  17. defaultTag = "buffer"
  18. )
  19. // Buffer is equivalent of bytes.Buffer without the ability to read. It is NOT thread-safe.
  20. //
  21. // In UseCalloc mode, z.Calloc is used to allocate memory, which depending upon how the code is
  22. // compiled could use jemalloc for allocations.
  23. //
  24. // In UseMmap mode, Buffer uses file mmap to allocate memory. This allows us to store big data
  25. // structures without using physical memory.
  26. //
  27. // MaxSize can be set to limit the memory usage.
  28. type Buffer struct {
  29. padding uint64 // number of starting bytes used for padding
  30. offset uint64 // used length of the buffer
  31. buf []byte // backing slice for the buffer
  32. bufType BufferType // type of the underlying buffer
  33. curSz int // capacity of the buffer
  34. maxSz int // causes a panic if the buffer grows beyond this size
  35. mmapFile *MmapFile // optional mmap backing for the buffer
  36. autoMmapAfter int // Calloc falls back to an mmaped tmpfile after crossing this size
  37. autoMmapDir string // directory for autoMmap to create a tempfile in
  38. persistent bool // when enabled, Release will not delete the underlying mmap file
  39. tag string // used for jemalloc stats
  40. }
  41. func NewBuffer(capacity int, tag string) *Buffer {
  42. if capacity < defaultCapacity {
  43. capacity = defaultCapacity
  44. }
  45. if tag == "" {
  46. tag = defaultTag
  47. }
  48. return &Buffer{
  49. buf: Calloc(capacity, tag),
  50. bufType: UseCalloc,
  51. curSz: capacity,
  52. offset: 8,
  53. padding: 8,
  54. tag: tag,
  55. }
  56. }
  57. // It is the caller's responsibility to set offset after this, because Buffer
  58. // doesn't remember what it was.
  59. func NewBufferPersistent(path string, capacity int) (*Buffer, error) {
  60. file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666)
  61. if err != nil {
  62. return nil, err
  63. }
  64. buffer, err := newBufferFile(file, capacity)
  65. if err != nil {
  66. return nil, err
  67. }
  68. buffer.persistent = true
  69. return buffer, nil
  70. }
  71. func NewBufferTmp(dir string, capacity int) (*Buffer, error) {
  72. if dir == "" {
  73. dir = tmpDir
  74. }
  75. file, err := os.CreateTemp(dir, "buffer")
  76. if err != nil {
  77. return nil, err
  78. }
  79. return newBufferFile(file, capacity)
  80. }
  81. func newBufferFile(file *os.File, capacity int) (*Buffer, error) {
  82. if capacity < defaultCapacity {
  83. capacity = defaultCapacity
  84. }
  85. mmapFile, err := OpenMmapFileUsing(file, capacity, true)
  86. if err != nil && err != NewFile {
  87. return nil, err
  88. }
  89. buf := &Buffer{
  90. buf: mmapFile.Data,
  91. bufType: UseMmap,
  92. curSz: len(mmapFile.Data),
  93. mmapFile: mmapFile,
  94. offset: 8,
  95. padding: 8,
  96. }
  97. return buf, nil
  98. }
  99. func NewBufferSlice(slice []byte) *Buffer {
  100. return &Buffer{
  101. offset: uint64(len(slice)),
  102. buf: slice,
  103. bufType: UseInvalid,
  104. }
  105. }
  106. func (b *Buffer) WithAutoMmap(threshold int, path string) *Buffer {
  107. if b.bufType != UseCalloc {
  108. panic("can only autoMmap with UseCalloc")
  109. }
  110. b.autoMmapAfter = threshold
  111. if path == "" {
  112. b.autoMmapDir = tmpDir
  113. } else {
  114. b.autoMmapDir = path
  115. }
  116. return b
  117. }
  118. func (b *Buffer) WithMaxSize(size int) *Buffer {
  119. b.maxSz = size
  120. return b
  121. }
  122. func (b *Buffer) IsEmpty() bool {
  123. return int(b.offset) == b.StartOffset()
  124. }
  125. // LenWithPadding would return the number of bytes written to the buffer so far
  126. // plus the padding at the start of the buffer.
  127. func (b *Buffer) LenWithPadding() int {
  128. return int(atomic.LoadUint64(&b.offset))
  129. }
  130. // LenNoPadding would return the number of bytes written to the buffer so far
  131. // (without the padding).
  132. func (b *Buffer) LenNoPadding() int {
  133. return int(atomic.LoadUint64(&b.offset) - b.padding)
  134. }
  135. // Bytes would return all the written bytes as a slice.
  136. func (b *Buffer) Bytes() []byte {
  137. off := atomic.LoadUint64(&b.offset)
  138. return b.buf[b.padding:off]
  139. }
  140. // Grow would grow the buffer to have at least n more bytes. In case the buffer is at capacity, it
  141. // would reallocate twice the size of current capacity + n, to ensure n bytes can be written to the
  142. // buffer without further allocation. In UseMmap mode, this might result in underlying file
  143. // expansion.
  144. func (b *Buffer) Grow(n int) {
  145. if b.buf == nil {
  146. panic("z.Buffer needs to be initialized before using")
  147. }
  148. if b.maxSz > 0 && int(b.offset)+n > b.maxSz {
  149. err := fmt.Errorf(
  150. "z.Buffer max size exceeded: %d offset: %d grow: %d", b.maxSz, b.offset, n)
  151. panic(err)
  152. }
  153. if int(b.offset)+n < b.curSz {
  154. return
  155. }
  156. // Calculate new capacity.
  157. growBy := b.curSz + n
  158. // Don't allocate more than 1GB at a time.
  159. if growBy > 1<<30 {
  160. growBy = 1 << 30
  161. }
  162. // Allocate at least n, even if it exceeds the 1GB limit above.
  163. if n > growBy {
  164. growBy = n
  165. }
  166. b.curSz += growBy
  167. switch b.bufType {
  168. case UseCalloc:
  169. // If autoMmap gets triggered, copy the slice over to an mmaped file.
  170. if b.autoMmapAfter > 0 && b.curSz > b.autoMmapAfter {
  171. b.bufType = UseMmap
  172. file, err := os.CreateTemp(b.autoMmapDir, "")
  173. if err != nil {
  174. panic(err)
  175. }
  176. mmapFile, err := OpenMmapFileUsing(file, b.curSz, true)
  177. if err != nil && err != NewFile {
  178. panic(err)
  179. }
  180. assert(int(b.offset) == copy(mmapFile.Data, b.buf[:b.offset]))
  181. Free(b.buf)
  182. b.mmapFile = mmapFile
  183. b.buf = mmapFile.Data
  184. break
  185. }
  186. // Else, reallocate the slice.
  187. newBuf := Calloc(b.curSz, b.tag)
  188. assert(int(b.offset) == copy(newBuf, b.buf[:b.offset]))
  189. Free(b.buf)
  190. b.buf = newBuf
  191. case UseMmap:
  192. // Truncate and remap the underlying file.
  193. if err := b.mmapFile.Truncate(int64(b.curSz)); err != nil {
  194. err = errors.Join(err,
  195. fmt.Errorf("while trying to truncate file: %s to size: %d", b.mmapFile.Fd.Name(), b.curSz))
  196. panic(err)
  197. }
  198. b.buf = b.mmapFile.Data
  199. default:
  200. panic("can only use Grow on UseCalloc and UseMmap buffers")
  201. }
  202. }
  203. // Allocate is a way to get a slice of size n back from the buffer. This slice can be directly
  204. // written to. Warning: Allocate is not thread-safe. The byte slice returned MUST be used before
  205. // further calls to Buffer.
  206. func (b *Buffer) Allocate(n int) []byte {
  207. b.Grow(n)
  208. off := b.offset
  209. b.offset += uint64(n)
  210. return b.buf[off:int(b.offset)]
  211. }
  212. // AllocateOffset works the same way as allocate, but instead of returning a byte slice, it returns
  213. // the offset of the allocation.
  214. func (b *Buffer) AllocateOffset(n int) int {
  215. b.Grow(n)
  216. b.offset += uint64(n)
  217. return int(b.offset) - n
  218. }
  219. func (b *Buffer) writeLen(sz int) {
  220. buf := b.Allocate(8)
  221. binary.BigEndian.PutUint64(buf, uint64(sz))
  222. }
  223. // SliceAllocate would encode the size provided into the buffer, followed by a call to Allocate,
  224. // hence returning the slice of size sz. This can be used to allocate a lot of small buffers into
  225. // this big buffer.
  226. // Note that SliceAllocate should NOT be mixed with normal calls to Write.
  227. func (b *Buffer) SliceAllocate(sz int) []byte {
  228. b.Grow(8 + sz)
  229. b.writeLen(sz)
  230. return b.Allocate(sz)
  231. }
  232. func (b *Buffer) StartOffset() int {
  233. return int(b.padding)
  234. }
  235. func (b *Buffer) WriteSlice(slice []byte) {
  236. dst := b.SliceAllocate(len(slice))
  237. assert(len(slice) == copy(dst, slice))
  238. }
  239. func (b *Buffer) SliceIterate(f func(slice []byte) error) error {
  240. if b.IsEmpty() {
  241. return nil
  242. }
  243. next := b.StartOffset()
  244. var slice []byte
  245. for next >= 0 {
  246. slice, next = b.Slice(next)
  247. if len(slice) == 0 {
  248. continue
  249. }
  250. if err := f(slice); err != nil {
  251. return err
  252. }
  253. }
  254. return nil
  255. }
  256. const (
  257. UseCalloc BufferType = iota
  258. UseMmap
  259. UseInvalid
  260. )
  261. type BufferType int
  262. func (t BufferType) String() string {
  263. switch t {
  264. case UseCalloc:
  265. return "UseCalloc"
  266. case UseMmap:
  267. return "UseMmap"
  268. default:
  269. return "UseInvalid"
  270. }
  271. }
  272. type LessFunc func(a, b []byte) bool
  273. type sortHelper struct {
  274. offsets []int
  275. b *Buffer
  276. tmp *Buffer
  277. less LessFunc
  278. small []int
  279. }
  280. func (s *sortHelper) sortSmall(start, end int) {
  281. s.tmp.Reset()
  282. s.small = s.small[:0]
  283. next := start
  284. for next >= 0 && next < end {
  285. s.small = append(s.small, next)
  286. _, next = s.b.Slice(next)
  287. }
  288. // We are sorting the slices pointed to by s.small offsets, but only moving the offsets around.
  289. sort.Slice(s.small, func(i, j int) bool {
  290. left, _ := s.b.Slice(s.small[i])
  291. right, _ := s.b.Slice(s.small[j])
  292. return s.less(left, right)
  293. })
  294. // Now we iterate over the s.small offsets and copy over the slices. The result is now in order.
  295. for _, off := range s.small {
  296. _, _ = s.tmp.Write(rawSlice(s.b.buf[off:]))
  297. }
  298. assert(end-start == copy(s.b.buf[start:end], s.tmp.Bytes()))
  299. }
  300. func assert(b bool) {
  301. if !b {
  302. log.Fatalf("%+v", errors.New("Assertion failure"))
  303. }
  304. }
  305. func check(err error) {
  306. if err != nil {
  307. log.Fatalf("%+v", err)
  308. }
  309. }
  310. func check2(_ interface{}, err error) {
  311. check(err)
  312. }
  313. func (s *sortHelper) merge(left, right []byte, start, end int) {
  314. if len(left) == 0 || len(right) == 0 {
  315. return
  316. }
  317. s.tmp.Reset()
  318. check2(s.tmp.Write(left))
  319. left = s.tmp.Bytes()
  320. var ls, rs []byte
  321. copyLeft := func() {
  322. assert(len(ls) == copy(s.b.buf[start:], ls))
  323. left = left[len(ls):]
  324. start += len(ls)
  325. }
  326. copyRight := func() {
  327. assert(len(rs) == copy(s.b.buf[start:], rs))
  328. right = right[len(rs):]
  329. start += len(rs)
  330. }
  331. for start < end {
  332. if len(left) == 0 {
  333. assert(len(right) == copy(s.b.buf[start:end], right))
  334. return
  335. }
  336. if len(right) == 0 {
  337. assert(len(left) == copy(s.b.buf[start:end], left))
  338. return
  339. }
  340. ls = rawSlice(left)
  341. rs = rawSlice(right)
  342. // We skip the first 4 bytes in the rawSlice, because that stores the length.
  343. if s.less(ls[8:], rs[8:]) {
  344. copyLeft()
  345. } else {
  346. copyRight()
  347. }
  348. }
  349. }
  350. func (s *sortHelper) sort(lo, hi int) []byte {
  351. assert(lo <= hi)
  352. mid := lo + (hi-lo)/2
  353. loff, hoff := s.offsets[lo], s.offsets[hi]
  354. if lo == mid {
  355. // No need to sort, just return the buffer.
  356. return s.b.buf[loff:hoff]
  357. }
  358. // lo, mid would sort from [offset[lo], offset[mid]) .
  359. left := s.sort(lo, mid)
  360. // Typically we'd use mid+1, but here mid represents an offset in the buffer. Each offset
  361. // contains a thousand entries. So, if we do mid+1, we'd skip over those entries.
  362. right := s.sort(mid, hi)
  363. s.merge(left, right, loff, hoff)
  364. return s.b.buf[loff:hoff]
  365. }
  366. // SortSlice is like SortSliceBetween but sorting over the entire buffer.
  367. func (b *Buffer) SortSlice(less func(left, right []byte) bool) {
  368. b.SortSliceBetween(b.StartOffset(), int(b.offset), less)
  369. }
  370. func (b *Buffer) SortSliceBetween(start, end int, less LessFunc) {
  371. if start >= end {
  372. return
  373. }
  374. if start == 0 {
  375. panic("start can never be zero")
  376. }
  377. var offsets []int
  378. next, count := start, 0
  379. for next >= 0 && next < end {
  380. if count%1024 == 0 {
  381. offsets = append(offsets, next)
  382. }
  383. _, next = b.Slice(next)
  384. count++
  385. }
  386. assert(len(offsets) > 0)
  387. if offsets[len(offsets)-1] != end {
  388. offsets = append(offsets, end)
  389. }
  390. szTmp := int(float64((end-start)/2) * 1.1)
  391. s := &sortHelper{
  392. offsets: offsets,
  393. b: b,
  394. less: less,
  395. small: make([]int, 0, 1024),
  396. tmp: NewBuffer(szTmp, b.tag),
  397. }
  398. defer func() { _ = s.tmp.Release() }()
  399. left := offsets[0]
  400. for _, off := range offsets[1:] {
  401. s.sortSmall(left, off)
  402. left = off
  403. }
  404. s.sort(0, len(offsets)-1)
  405. }
  406. func rawSlice(buf []byte) []byte {
  407. sz := binary.BigEndian.Uint64(buf)
  408. return buf[:8+int(sz)]
  409. }
  410. // Slice would return the slice written at offset.
  411. func (b *Buffer) Slice(offset int) ([]byte, int) {
  412. if offset >= int(b.offset) {
  413. return nil, -1
  414. }
  415. sz := binary.BigEndian.Uint64(b.buf[offset:])
  416. start := offset + 8
  417. next := start + int(sz)
  418. res := b.buf[start:next]
  419. if next >= int(b.offset) {
  420. next = -1
  421. }
  422. return res, next
  423. }
  424. // SliceOffsets is an expensive function. Use sparingly.
  425. func (b *Buffer) SliceOffsets() []int {
  426. next := b.StartOffset()
  427. var offsets []int
  428. for next >= 0 {
  429. offsets = append(offsets, next)
  430. _, next = b.Slice(next)
  431. }
  432. return offsets
  433. }
  434. func (b *Buffer) Data(offset int) []byte {
  435. if offset > b.curSz {
  436. panic("offset beyond current size")
  437. }
  438. return b.buf[offset:b.curSz]
  439. }
  440. // Write would write p bytes to the buffer.
  441. func (b *Buffer) Write(p []byte) (n int, err error) {
  442. n = len(p)
  443. b.Grow(n)
  444. assert(n == copy(b.buf[b.offset:], p))
  445. b.offset += uint64(n)
  446. return n, nil
  447. }
  448. // Reset would reset the buffer to be reused.
  449. func (b *Buffer) Reset() {
  450. b.offset = uint64(b.StartOffset())
  451. }
  452. // Release would free up the memory allocated by the buffer. Once the usage of buffer is done, it is
  453. // important to call Release, otherwise a memory leak can happen.
  454. func (b *Buffer) Release() error {
  455. if b == nil {
  456. return nil
  457. }
  458. switch b.bufType {
  459. case UseCalloc:
  460. Free(b.buf)
  461. case UseMmap:
  462. if b.mmapFile == nil {
  463. return nil
  464. }
  465. path := b.mmapFile.Fd.Name()
  466. if err := b.mmapFile.Close(-1); err != nil {
  467. return errors.Join(err, fmt.Errorf("while closing file: %s", path))
  468. }
  469. if !b.persistent {
  470. if err := os.Remove(path); err != nil {
  471. return errors.Join(err, fmt.Errorf("while deleting file %s", path))
  472. }
  473. }
  474. }
  475. return nil
  476. }