| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- // Code generated by go run gen.go; DO NOT EDIT.
- package async
- // UnboundedInterfaceChan is a channel with an unbounded buffer for caching
- // Interface objects. A channel must be closed via Close method.
- type UnboundedInterfaceChan struct {
- in, out chan interface{}
- close chan struct{}
- q []interface{}
- }
- // NewUnboundedInterfaceChan returns a unbounded channel with unlimited capacity.
- func NewUnboundedInterfaceChan() *UnboundedInterfaceChan {
- ch := &UnboundedInterfaceChan{
- // The size of Interface is less than 16 bytes, we use 16 to fit
- // a CPU cache line (L2, 256 Bytes), which may reduce cache misses.
- in: make(chan interface{}, 16),
- out: make(chan interface{}, 16),
- close: make(chan struct{}),
- }
- go ch.processing()
- return ch
- }
- // In returns the send channel of the given channel, which can be used to
- // send values to the channel.
- func (ch *UnboundedInterfaceChan) In() chan<- interface{} { return ch.in }
- // Out returns the receive channel of the given channel, which can be used
- // to receive values from the channel.
- func (ch *UnboundedInterfaceChan) Out() <-chan interface{} { return ch.out }
- // Close closes the channel.
- func (ch *UnboundedInterfaceChan) Close() { ch.close <- struct{}{} }
- func (ch *UnboundedInterfaceChan) processing() {
- // This is a preallocation of the internal unbounded buffer.
- // The size is randomly picked. But if one changes the size, the
- // reallocation size at the subsequent for loop should also be
- // changed too. Furthermore, there is no memory leak since the
- // queue is garbage collected.
- ch.q = make([]interface{}, 0, 1<<10)
- for {
- select {
- case e, ok := <-ch.in:
- if !ok {
- // We don't want the input channel be accidentally closed
- // via close() instead of Close(). If that happens, it is
- // a misuse, do a panic as warning.
- panic("async: misuse of unbounded channel, In() was closed")
- }
- ch.q = append(ch.q, e)
- case <-ch.close:
- ch.closed()
- return
- }
- for len(ch.q) > 0 {
- select {
- case ch.out <- ch.q[0]:
- ch.q[0] = nil // de-reference earlier to help GC
- ch.q = ch.q[1:]
- case e, ok := <-ch.in:
- if !ok {
- // We don't want the input channel be accidentally closed
- // via close() instead of Close(). If that happens, it is
- // a misuse, do a panic as warning.
- panic("async: misuse of unbounded channel, In() was closed")
- }
- ch.q = append(ch.q, e)
- case <-ch.close:
- ch.closed()
- return
- }
- }
- // If the remaining capacity is too small, we prefer to
- // reallocate the entire buffer.
- if cap(ch.q) < 1<<5 {
- ch.q = make([]interface{}, 0, 1<<10)
- }
- }
- }
- func (ch *UnboundedInterfaceChan) closed() {
- close(ch.in)
- for e := range ch.in {
- ch.q = append(ch.q, e)
- }
- for len(ch.q) > 0 {
- select {
- case ch.out <- ch.q[0]:
- ch.q[0] = nil // de-reference earlier to help GC
- ch.q = ch.q[1:]
- default:
- }
- }
- close(ch.out)
- close(ch.close)
- }
|