// allocator.go
  1. /*
  2. * Copyright 2020 Dgraph Labs, Inc. and Contributors
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package z
  17. import (
  18. "bytes"
  19. "fmt"
  20. "math"
  21. "math/bits"
  22. "math/rand"
  23. "strings"
  24. "sync"
  25. "sync/atomic"
  26. "time"
  27. "unsafe"
  28. "github.com/dustin/go-humanize"
  29. )
// Allocator amortizes the cost of small allocations by allocating memory in
// bigger chunks. Internally it uses z.Calloc to allocate memory. Once
// allocated, the memory is not moved, so it is safe to use the allocated bytes
// to unsafe cast them to Go struct pointers. Maintaining a freelist is slow.
// Instead, Allocator only allocates memory, with the idea that finally we
// would just release the entire Allocator.
type Allocator struct {
	sync.Mutex               // Guards the slow path that appends a new buffer.
	compIdx uint64           // Stores bufIdx in 32 MSBs and posIdx in 32 LSBs.
	buffers [][]byte         // Chunks memory is handed out from; filled front to back.
	Ref     uint64           // Unique id; used to find this allocator via AllocatorFrom.
	Tag     string           // Human-readable label used for accounting in Allocators().
}
// allocs keeps references to all Allocators, so we can safely discard them later.
var allocsMu *sync.Mutex         // Guards allocs; created in init().
var allocRef uint64              // Counter for Refs, seeded with a random high 16 bits per process.
var allocs map[uint64]*Allocator // Registry of live allocators, keyed by Ref.
var calculatedLog2 []int         // Precomputed int(math.Log2(i)) for i in [1, 1024]; see log2().
  48. func init() {
  49. allocsMu = new(sync.Mutex)
  50. allocs = make(map[uint64]*Allocator)
  51. // Set up a unique Ref per process.
  52. allocRef = uint64(rand.Int63n(1<<16)) << 48
  53. calculatedLog2 = make([]int, 1025)
  54. for i := 1; i <= 1024; i++ {
  55. calculatedLog2[i] = int(math.Log2(float64(i)))
  56. }
  57. }
  58. // NewAllocator creates an allocator starting with the given size.
  59. func NewAllocator(sz int, tag string) *Allocator {
  60. ref := atomic.AddUint64(&allocRef, 1)
  61. // We should not allow a zero sized page because addBufferWithMinSize
  62. // will run into an infinite loop trying to double the pagesize.
  63. if sz < 512 {
  64. sz = 512
  65. }
  66. a := &Allocator{
  67. Ref: ref,
  68. buffers: make([][]byte, 64),
  69. Tag: tag,
  70. }
  71. l2 := uint64(log2(sz))
  72. if bits.OnesCount64(uint64(sz)) > 1 {
  73. l2 += 1
  74. }
  75. a.buffers[0] = Calloc(1<<l2, a.Tag)
  76. allocsMu.Lock()
  77. allocs[ref] = a
  78. allocsMu.Unlock()
  79. return a
  80. }
// Reset rewinds the allocator to the start of its first buffer so previously
// handed-out memory can be reused. The underlying buffers are kept allocated.
func (a *Allocator) Reset() {
	atomic.StoreUint64(&a.compIdx, 0)
}
  84. func Allocators() string {
  85. allocsMu.Lock()
  86. tags := make(map[string]uint64)
  87. num := make(map[string]int)
  88. for _, ac := range allocs {
  89. tags[ac.Tag] += ac.Allocated()
  90. num[ac.Tag] += 1
  91. }
  92. var buf bytes.Buffer
  93. for tag, sz := range tags {
  94. fmt.Fprintf(&buf, "Tag: %s Num: %d Size: %s . ", tag, num[tag], humanize.IBytes(sz))
  95. }
  96. allocsMu.Unlock()
  97. return buf.String()
  98. }
  99. func (a *Allocator) String() string {
  100. var s strings.Builder
  101. s.WriteString(fmt.Sprintf("Allocator: %x\n", a.Ref))
  102. var cum int
  103. for i, b := range a.buffers {
  104. cum += len(b)
  105. if len(b) == 0 {
  106. break
  107. }
  108. s.WriteString(fmt.Sprintf("idx: %d len: %d cum: %d\n", i, len(b), cum))
  109. }
  110. pos := atomic.LoadUint64(&a.compIdx)
  111. bi, pi := parse(pos)
  112. s.WriteString(fmt.Sprintf("bi: %d pi: %d\n", bi, pi))
  113. s.WriteString(fmt.Sprintf("Size: %d\n", a.Size()))
  114. return s.String()
  115. }
  116. // AllocatorFrom would return the allocator corresponding to the ref.
  117. func AllocatorFrom(ref uint64) *Allocator {
  118. allocsMu.Lock()
  119. a := allocs[ref]
  120. allocsMu.Unlock()
  121. return a
  122. }
  123. func parse(pos uint64) (bufIdx, posIdx int) {
  124. return int(pos >> 32), int(pos & 0xFFFFFFFF)
  125. }
  126. // Size returns the size of the allocations so far.
  127. func (a *Allocator) Size() int {
  128. pos := atomic.LoadUint64(&a.compIdx)
  129. bi, pi := parse(pos)
  130. var sz int
  131. for i, b := range a.buffers {
  132. if i < bi {
  133. sz += len(b)
  134. continue
  135. }
  136. sz += pi
  137. return sz
  138. }
  139. panic("Size should not reach here")
  140. }
  141. func log2(sz int) int {
  142. if sz < len(calculatedLog2) {
  143. return calculatedLog2[sz]
  144. }
  145. pow := 10
  146. sz >>= 10
  147. for sz > 1 {
  148. sz >>= 1
  149. pow++
  150. }
  151. return pow
  152. }
  153. func (a *Allocator) Allocated() uint64 {
  154. var alloc int
  155. for _, b := range a.buffers {
  156. alloc += cap(b)
  157. }
  158. return uint64(alloc)
  159. }
  160. func (a *Allocator) TrimTo(max int) {
  161. var alloc int
  162. for i, b := range a.buffers {
  163. if len(b) == 0 {
  164. break
  165. }
  166. alloc += len(b)
  167. if alloc < max {
  168. continue
  169. }
  170. Free(b)
  171. a.buffers[i] = nil
  172. }
  173. }
  174. // Release would release the memory back. Remember to make this call to avoid memory leaks.
  175. func (a *Allocator) Release() {
  176. if a == nil {
  177. return
  178. }
  179. var alloc int
  180. for _, b := range a.buffers {
  181. if len(b) == 0 {
  182. break
  183. }
  184. alloc += len(b)
  185. Free(b)
  186. }
  187. allocsMu.Lock()
  188. delete(allocs, a.Ref)
  189. allocsMu.Unlock()
  190. }
// maxAlloc caps both a single allocation and any one internal buffer at 1 GiB.
const maxAlloc = 1 << 30

// MaxAlloc returns the largest size a single Allocate call can serve.
func (a *Allocator) MaxAlloc() int {
	return maxAlloc
}
  195. const nodeAlign = unsafe.Sizeof(uint64(0)) - 1
  196. func (a *Allocator) AllocateAligned(sz int) []byte {
  197. tsz := sz + int(nodeAlign)
  198. out := a.Allocate(tsz)
  199. // We are reusing allocators. In that case, it's important to zero out the memory allocated
  200. // here. We don't always zero it out (in Allocate), because other functions would be immediately
  201. // overwriting the allocated slices anyway (see Copy).
  202. ZeroOut(out, 0, len(out))
  203. addr := uintptr(unsafe.Pointer(&out[0]))
  204. aligned := (addr + nodeAlign) & ^nodeAlign
  205. start := int(aligned - addr)
  206. return out[start : start+sz]
  207. }
  208. func (a *Allocator) Copy(buf []byte) []byte {
  209. if a == nil {
  210. return append([]byte{}, buf...)
  211. }
  212. out := a.Allocate(len(buf))
  213. copy(out, buf)
  214. return out
  215. }
  216. func (a *Allocator) addBufferAt(bufIdx, minSz int) {
  217. for {
  218. if bufIdx >= len(a.buffers) {
  219. panic(fmt.Sprintf("Allocator can not allocate more than %d buffers", len(a.buffers)))
  220. }
  221. if len(a.buffers[bufIdx]) == 0 {
  222. break
  223. }
  224. if minSz <= len(a.buffers[bufIdx]) {
  225. // No need to do anything. We already have a buffer which can satisfy minSz.
  226. return
  227. }
  228. bufIdx++
  229. }
  230. assert(bufIdx > 0)
  231. // We need to allocate a new buffer.
  232. // Make pageSize double of the last allocation.
  233. pageSize := 2 * len(a.buffers[bufIdx-1])
  234. // Ensure pageSize is bigger than sz.
  235. for pageSize < minSz {
  236. pageSize *= 2
  237. }
  238. // If bigger than maxAlloc, trim to maxAlloc.
  239. if pageSize > maxAlloc {
  240. pageSize = maxAlloc
  241. }
  242. buf := Calloc(pageSize, a.Tag)
  243. assert(len(a.buffers[bufIdx]) == 0)
  244. a.buffers[bufIdx] = buf
  245. }
  246. func (a *Allocator) Allocate(sz int) []byte {
  247. if a == nil {
  248. return make([]byte, sz)
  249. }
  250. if sz > maxAlloc {
  251. panic(fmt.Sprintf("Unable to allocate more than %d\n", maxAlloc))
  252. }
  253. if sz == 0 {
  254. return nil
  255. }
  256. for {
  257. pos := atomic.AddUint64(&a.compIdx, uint64(sz))
  258. bufIdx, posIdx := parse(pos)
  259. buf := a.buffers[bufIdx]
  260. if posIdx > len(buf) {
  261. a.Lock()
  262. newPos := atomic.LoadUint64(&a.compIdx)
  263. newBufIdx, _ := parse(newPos)
  264. if newBufIdx != bufIdx {
  265. a.Unlock()
  266. continue
  267. }
  268. a.addBufferAt(bufIdx+1, sz)
  269. atomic.StoreUint64(&a.compIdx, uint64((bufIdx+1)<<32))
  270. a.Unlock()
  271. // We added a new buffer. Let's acquire slice the right way by going back to the top.
  272. continue
  273. }
  274. data := buf[posIdx-sz : posIdx]
  275. return data
  276. }
  277. }
// AllocatorPool keeps a bounded set of reusable Allocators and runs a
// background goroutine that releases idle ones over time.
type AllocatorPool struct {
	numGets int64           // Incremented on every Get; used to detect idleness.
	allocCh chan *Allocator // Buffered channel holding pooled allocators.
	closer  *Closer         // Coordinates shutdown of freeupAllocators.
}
  283. func NewAllocatorPool(sz int) *AllocatorPool {
  284. a := &AllocatorPool{
  285. allocCh: make(chan *Allocator, sz),
  286. closer: NewCloser(1),
  287. }
  288. go a.freeupAllocators()
  289. return a
  290. }
  291. func (p *AllocatorPool) Get(sz int, tag string) *Allocator {
  292. if p == nil {
  293. return NewAllocator(sz, tag)
  294. }
  295. atomic.AddInt64(&p.numGets, 1)
  296. select {
  297. case alloc := <-p.allocCh:
  298. alloc.Reset()
  299. alloc.Tag = tag
  300. return alloc
  301. default:
  302. return NewAllocator(sz, tag)
  303. }
  304. }
  305. func (p *AllocatorPool) Return(a *Allocator) {
  306. if a == nil {
  307. return
  308. }
  309. if p == nil {
  310. a.Release()
  311. return
  312. }
  313. a.TrimTo(400 << 20)
  314. select {
  315. case p.allocCh <- a:
  316. return
  317. default:
  318. a.Release()
  319. }
  320. }
// Release shuts the pool down: it signals the background goroutine to release
// every pooled allocator and waits until that is done. Safe on a nil pool.
func (p *AllocatorPool) Release() {
	if p == nil {
		return
	}
	p.closer.SignalAndWait()
}
// freeupAllocators runs in the background: on every 2-second tick during
// which no Get calls were observed, it releases one pooled allocator; on
// shutdown it drains and releases the whole pool.
func (p *AllocatorPool) freeupAllocators() {
	defer p.closer.Done()
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	// releaseOne releases a single pooled allocator, if any is waiting.
	releaseOne := func() bool {
		select {
		case alloc := <-p.allocCh:
			alloc.Release()
			return true
		default:
			return false
		}
	}
	// last holds the numGets value observed on the previous tick.
	var last int64
	for {
		select {
		case <-p.closer.HasBeenClosed():
			// NOTE(review): closing allocCh here means a Return racing with
			// pool Release would panic on send-to-closed-channel; presumably
			// callers must not Return after Release — confirm that contract.
			close(p.allocCh)
			for alloc := range p.allocCh {
				alloc.Release()
			}
			return
		case <-ticker.C:
			gets := atomic.LoadInt64(&p.numGets)
			if gets != last {
				// Some retrievals were made since the last time. So, let's avoid doing a release.
				last = gets
				continue
			}
			releaseOne()
		}
	}
}