| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- // Code generated by go run gen.go; DO NOT EDIT.
- package async
- // UnboundedFuncChan is a channel with an unbounded buffer for caching
- // Func objects. A channel must be closed via Close method.
- type UnboundedFuncChan struct {
- in, out chan func()
- close chan struct{}
- q []func()
- }
- // NewUnboundedFuncChan returns a unbounded channel with unlimited capacity.
- func NewUnboundedFuncChan() *UnboundedFuncChan {
- ch := &UnboundedFuncChan{
- // The size of Func is less than 16 bytes, we use 16 to fit
- // a CPU cache line (L2, 256 Bytes), which may reduce cache misses.
- in: make(chan func(), 16),
- out: make(chan func(), 16),
- close: make(chan struct{}),
- }
- go ch.processing()
- return ch
- }
- // In returns the send channel of the given channel, which can be used to
- // send values to the channel.
- func (ch *UnboundedFuncChan) In() chan<- func() { return ch.in }
- // Out returns the receive channel of the given channel, which can be used
- // to receive values from the channel.
- func (ch *UnboundedFuncChan) Out() <-chan func() { return ch.out }
- // Close closes the channel.
- func (ch *UnboundedFuncChan) Close() { ch.close <- struct{}{} }
- func (ch *UnboundedFuncChan) processing() {
- // This is a preallocation of the internal unbounded buffer.
- // The size is randomly picked. But if one changes the size, the
- // reallocation size at the subsequent for loop should also be
- // changed too. Furthermore, there is no memory leak since the
- // queue is garbage collected.
- ch.q = make([]func(), 0, 1<<10)
- for {
- select {
- case e, ok := <-ch.in:
- if !ok {
- // We don't want the input channel be accidentally closed
- // via close() instead of Close(). If that happens, it is
- // a misuse, do a panic as warning.
- panic("async: misuse of unbounded channel, In() was closed")
- }
- ch.q = append(ch.q, e)
- case <-ch.close:
- ch.closed()
- return
- }
- for len(ch.q) > 0 {
- select {
- case ch.out <- ch.q[0]:
- ch.q[0] = nil // de-reference earlier to help GC
- ch.q = ch.q[1:]
- case e, ok := <-ch.in:
- if !ok {
- // We don't want the input channel be accidentally closed
- // via close() instead of Close(). If that happens, it is
- // a misuse, do a panic as warning.
- panic("async: misuse of unbounded channel, In() was closed")
- }
- ch.q = append(ch.q, e)
- case <-ch.close:
- ch.closed()
- return
- }
- }
- // If the remaining capacity is too small, we prefer to
- // reallocate the entire buffer.
- if cap(ch.q) < 1<<5 {
- ch.q = make([]func(), 0, 1<<10)
- }
- }
- }
- func (ch *UnboundedFuncChan) closed() {
- close(ch.in)
- for e := range ch.in {
- ch.q = append(ch.q, e)
- }
- for len(ch.q) > 0 {
- select {
- case ch.out <- ch.q[0]:
- ch.q[0] = nil // de-reference earlier to help GC
- ch.q = ch.q[1:]
- default:
- }
- }
- close(ch.out)
- close(ch.close)
- }
|