allocator.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. /*
  2. * SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
  3. * SPDX-License-Identifier: Apache-2.0
  4. */
  5. package z
  6. import (
  7. "bytes"
  8. "fmt"
  9. "math"
  10. "math/bits"
  11. "math/rand"
  12. "strings"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. "unsafe"
  17. "github.com/dustin/go-humanize"
  18. )
// Allocator amortizes the cost of small allocations by allocating memory in
// bigger chunks. Internally it uses z.Calloc to allocate memory. Once
// allocated, the memory is not moved, so it is safe to use the allocated bytes
// to unsafe cast them to Go struct pointers. Maintaining a freelist is slow.
// Instead, Allocator only allocates memory, with the idea that finally we
// would just release the entire Allocator.
type Allocator struct {
	sync.Mutex // serializes buffer growth in Allocate/addBufferAt
	compIdx uint64 // Stores bufIdx in 32 MSBs and posIdx in 32 LSBs.
	// buffers holds the allocated chunks, filled in order; each new chunk
	// starts at double the previous one's size (see addBufferAt).
	buffers [][]byte
	// Ref uniquely identifies this Allocator in the global allocs registry.
	Ref uint64
	// Tag labels this allocator's memory for reporting (see Allocators).
	Tag string
}
// allocs keeps references to all Allocators, so we can safely discard them later.
var allocsMu *sync.Mutex // guards allocs
var allocRef uint64      // monotonically increasing Ref counter; seeded per process in init
var allocs map[uint64]*Allocator
var calculatedLog2 []int // precomputed int(math.Log2(i)) for i in [1, 1024]; index 0 unused (zero)
  37. func init() {
  38. allocsMu = new(sync.Mutex)
  39. allocs = make(map[uint64]*Allocator)
  40. // Set up a unique Ref per process.
  41. allocRef = uint64(rand.Int63n(1<<16)) << 48
  42. calculatedLog2 = make([]int, 1025)
  43. for i := 1; i <= 1024; i++ {
  44. calculatedLog2[i] = int(math.Log2(float64(i)))
  45. }
  46. }
  47. // NewAllocator creates an allocator starting with the given size.
  48. func NewAllocator(sz int, tag string) *Allocator {
  49. ref := atomic.AddUint64(&allocRef, 1)
  50. // We should not allow a zero sized page because addBufferWithMinSize
  51. // will run into an infinite loop trying to double the pagesize.
  52. if sz < 512 {
  53. sz = 512
  54. }
  55. a := &Allocator{
  56. Ref: ref,
  57. buffers: make([][]byte, 64),
  58. Tag: tag,
  59. }
  60. l2 := uint64(log2(sz))
  61. if bits.OnesCount64(uint64(sz)) > 1 {
  62. l2 += 1
  63. }
  64. a.buffers[0] = Calloc(1<<l2, a.Tag)
  65. allocsMu.Lock()
  66. allocs[ref] = a
  67. allocsMu.Unlock()
  68. return a
  69. }
// Reset rewinds the allocator to position zero so its existing buffers can be
// reused. It does not free any memory.
func (a *Allocator) Reset() {
	atomic.StoreUint64(&a.compIdx, 0)
}
  73. func Allocators() string {
  74. allocsMu.Lock()
  75. tags := make(map[string]uint64)
  76. num := make(map[string]int)
  77. for _, ac := range allocs {
  78. tags[ac.Tag] += ac.Allocated()
  79. num[ac.Tag] += 1
  80. }
  81. var buf bytes.Buffer
  82. for tag, sz := range tags {
  83. fmt.Fprintf(&buf, "Tag: %s Num: %d Size: %s . ", tag, num[tag], humanize.IBytes(sz))
  84. }
  85. allocsMu.Unlock()
  86. return buf.String()
  87. }
  88. func (a *Allocator) String() string {
  89. var s strings.Builder
  90. s.WriteString(fmt.Sprintf("Allocator: %x\n", a.Ref))
  91. var cum int
  92. for i, b := range a.buffers {
  93. cum += len(b)
  94. if len(b) == 0 {
  95. break
  96. }
  97. s.WriteString(fmt.Sprintf("idx: %d len: %d cum: %d\n", i, len(b), cum))
  98. }
  99. pos := atomic.LoadUint64(&a.compIdx)
  100. bi, pi := parse(pos)
  101. s.WriteString(fmt.Sprintf("bi: %d pi: %d\n", bi, pi))
  102. s.WriteString(fmt.Sprintf("Size: %d\n", a.Size()))
  103. return s.String()
  104. }
  105. // AllocatorFrom would return the allocator corresponding to the ref.
  106. func AllocatorFrom(ref uint64) *Allocator {
  107. allocsMu.Lock()
  108. a := allocs[ref]
  109. allocsMu.Unlock()
  110. return a
  111. }
  112. func parse(pos uint64) (bufIdx, posIdx int) {
  113. return int(pos >> 32), int(pos & 0xFFFFFFFF)
  114. }
  115. // Size returns the size of the allocations so far.
  116. func (a *Allocator) Size() int {
  117. pos := atomic.LoadUint64(&a.compIdx)
  118. bi, pi := parse(pos)
  119. var sz int
  120. for i, b := range a.buffers {
  121. if i < bi {
  122. sz += len(b)
  123. continue
  124. }
  125. sz += pi
  126. return sz
  127. }
  128. panic("Size should not reach here")
  129. }
  130. func log2(sz int) int {
  131. if sz < len(calculatedLog2) {
  132. return calculatedLog2[sz]
  133. }
  134. pow := 10
  135. sz >>= 10
  136. for sz > 1 {
  137. sz >>= 1
  138. pow++
  139. }
  140. return pow
  141. }
  142. func (a *Allocator) Allocated() uint64 {
  143. var alloc int
  144. for _, b := range a.buffers {
  145. alloc += cap(b)
  146. }
  147. return uint64(alloc)
  148. }
  149. func (a *Allocator) TrimTo(max int) {
  150. var alloc int
  151. for i, b := range a.buffers {
  152. if len(b) == 0 {
  153. break
  154. }
  155. alloc += len(b)
  156. if alloc < max {
  157. continue
  158. }
  159. Free(b)
  160. a.buffers[i] = nil
  161. }
  162. }
  163. // Release would release the memory back. Remember to make this call to avoid memory leaks.
  164. func (a *Allocator) Release() {
  165. if a == nil {
  166. return
  167. }
  168. var alloc int
  169. for _, b := range a.buffers {
  170. if len(b) == 0 {
  171. break
  172. }
  173. alloc += len(b)
  174. Free(b)
  175. }
  176. allocsMu.Lock()
  177. delete(allocs, a.Ref)
  178. allocsMu.Unlock()
  179. }
// maxAlloc caps both a single Allocate request and the page size of any
// buffer at 1 GB (see Allocate and addBufferAt).
const maxAlloc = 1 << 30

// MaxAlloc returns the largest allocation size supported by Allocate.
func (a *Allocator) MaxAlloc() int {
	return maxAlloc
}
  184. const nodeAlign = unsafe.Sizeof(uint64(0)) - 1
  185. func (a *Allocator) AllocateAligned(sz int) []byte {
  186. tsz := sz + int(nodeAlign)
  187. out := a.Allocate(tsz)
  188. // We are reusing allocators. In that case, it's important to zero out the memory allocated
  189. // here. We don't always zero it out (in Allocate), because other functions would be immediately
  190. // overwriting the allocated slices anyway (see Copy).
  191. ZeroOut(out, 0, len(out))
  192. addr := uintptr(unsafe.Pointer(&out[0]))
  193. aligned := (addr + nodeAlign) & ^nodeAlign
  194. start := int(aligned - addr)
  195. return out[start : start+sz]
  196. }
  197. func (a *Allocator) Copy(buf []byte) []byte {
  198. if a == nil {
  199. return append([]byte{}, buf...)
  200. }
  201. out := a.Allocate(len(buf))
  202. copy(out, buf)
  203. return out
  204. }
// addBufferAt ensures some buffer at index >= bufIdx can hold minSz bytes,
// allocating a new buffer if none exists yet. It must be called with the
// Allocator's mutex held (see Allocate).
func (a *Allocator) addBufferAt(bufIdx, minSz int) {
	// Walk forward to the first empty slot, returning early if an existing
	// buffer along the way is already big enough.
	for {
		if bufIdx >= len(a.buffers) {
			panic(fmt.Sprintf("Allocator can not allocate more than %d buffers", len(a.buffers)))
		}
		if len(a.buffers[bufIdx]) == 0 {
			break
		}
		if minSz <= len(a.buffers[bufIdx]) {
			// No need to do anything. We already have a buffer which can satisfy minSz.
			return
		}
		bufIdx++
	}
	// Slot 0 is always filled by NewAllocator, so there is a predecessor to size from.
	assert(bufIdx > 0)
	// We need to allocate a new buffer.
	// Make pageSize double of the last allocation.
	pageSize := 2 * len(a.buffers[bufIdx-1])
	// Ensure pageSize is bigger than sz.
	for pageSize < minSz {
		pageSize *= 2
	}
	// If bigger than maxAlloc, trim to maxAlloc.
	if pageSize > maxAlloc {
		pageSize = maxAlloc
	}
	buf := Calloc(pageSize, a.Tag)
	// The slot we picked must still be empty — we hold the lock.
	assert(len(a.buffers[bufIdx]) == 0)
	a.buffers[bufIdx] = buf
}
// Allocate returns a []byte of the given size, bump-allocated from the
// current buffer via an atomic add on compIdx. It is safe for concurrent use.
// A nil allocator falls back to a plain make; sz == 0 returns nil; sz beyond
// maxAlloc panics.
func (a *Allocator) Allocate(sz int) []byte {
	if a == nil {
		return make([]byte, sz)
	}
	if sz > maxAlloc {
		panic(fmt.Sprintf("Unable to allocate more than %d\n", maxAlloc))
	}
	if sz == 0 {
		return nil
	}
	for {
		// Optimistically claim sz bytes; pos is the END of our claimed region
		// within the buffer indexed by the high 32 bits.
		pos := atomic.AddUint64(&a.compIdx, uint64(sz))
		bufIdx, posIdx := parse(pos)
		buf := a.buffers[bufIdx]
		if posIdx > len(buf) {
			// Our claim overflowed the current buffer; serialize buffer
			// growth under the lock.
			a.Lock()
			newPos := atomic.LoadUint64(&a.compIdx)
			newBufIdx, _ := parse(newPos)
			if newBufIdx != bufIdx {
				// Another goroutine already advanced to a new buffer; retry.
				a.Unlock()
				continue
			}
			a.addBufferAt(bufIdx+1, sz)
			// Move compIdx to the start (posIdx = 0) of the next buffer.
			// NOTE(review): (bufIdx+1)<<32 is an int shift — assumes 64-bit
			// int; confirm 32-bit platforms are out of scope.
			atomic.StoreUint64(&a.compIdx, uint64((bufIdx+1)<<32))
			a.Unlock()
			// We added a new buffer. Let's acquire slice the right way by going back to the top.
			continue
		}
		data := buf[posIdx-sz : posIdx]
		return data
	}
}
// AllocatorPool is a pool of reusable Allocators. A background goroutine
// (freeupAllocators) gradually releases idle allocators when the pool sees no
// Get traffic.
type AllocatorPool struct {
	numGets int64           // count of Get calls; read by the background releaser to detect activity
	allocCh chan *Allocator // buffered channel holding idle allocators
	closer  *Closer         // signals and waits for freeupAllocators to exit
}
  272. func NewAllocatorPool(sz int) *AllocatorPool {
  273. a := &AllocatorPool{
  274. allocCh: make(chan *Allocator, sz),
  275. closer: NewCloser(1),
  276. }
  277. go a.freeupAllocators()
  278. return a
  279. }
  280. func (p *AllocatorPool) Get(sz int, tag string) *Allocator {
  281. if p == nil {
  282. return NewAllocator(sz, tag)
  283. }
  284. atomic.AddInt64(&p.numGets, 1)
  285. select {
  286. case alloc := <-p.allocCh:
  287. alloc.Reset()
  288. alloc.Tag = tag
  289. return alloc
  290. default:
  291. return NewAllocator(sz, tag)
  292. }
  293. }
  294. func (p *AllocatorPool) Return(a *Allocator) {
  295. if a == nil {
  296. return
  297. }
  298. if p == nil {
  299. a.Release()
  300. return
  301. }
  302. a.TrimTo(400 << 20)
  303. select {
  304. case p.allocCh <- a:
  305. return
  306. default:
  307. a.Release()
  308. }
  309. }
// Release shuts down the pool: it signals the background goroutine to drain
// and free all pooled allocators, and blocks until that finishes. Safe to
// call on a nil pool.
func (p *AllocatorPool) Release() {
	if p == nil {
		return
	}
	p.closer.SignalAndWait()
}
// freeupAllocators runs in the background: every two seconds, if no Get has
// happened since the previous tick, it releases one idle allocator. When the
// pool's closer is signaled, it drains and releases everything, then exits.
func (p *AllocatorPool) freeupAllocators() {
	defer p.closer.Done()
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	// releaseOne frees a single idle allocator, if any; reports whether it did.
	releaseOne := func() bool {
		select {
		case alloc := <-p.allocCh:
			alloc.Release()
			return true
		default:
			return false
		}
	}
	var last int64
	for {
		select {
		case <-p.closer.HasBeenClosed():
			// NOTE(review): closing allocCh here assumes no Return() can run
			// concurrently after the close signal — a late send on the closed
			// channel would panic. Verify callers stop Returning first.
			close(p.allocCh)
			for alloc := range p.allocCh {
				alloc.Release()
			}
			return
		case <-ticker.C:
			gets := atomic.LoadInt64(&p.numGets)
			if gets != last {
				// Some retrievals were made since the last time. So, let's avoid doing a release.
				last = gets
				continue
			}
			releaseOne()
		}
	}
}