chan_interface.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. // Code generated by go run gen.go; DO NOT EDIT.
  2. package async
  3. // UnboundedInterfaceChan is a channel with an unbounded buffer for caching
  4. // Interface objects. A channel must be closed via Close method.
  5. type UnboundedInterfaceChan struct {
  6. in, out chan interface{}
  7. close chan struct{}
  8. q []interface{}
  9. }
  10. // NewUnboundedInterfaceChan returns a unbounded channel with unlimited capacity.
  11. func NewUnboundedInterfaceChan() *UnboundedInterfaceChan {
  12. ch := &UnboundedInterfaceChan{
  13. // The size of Interface is less than 16 bytes, we use 16 to fit
  14. // a CPU cache line (L2, 256 Bytes), which may reduce cache misses.
  15. in: make(chan interface{}, 16),
  16. out: make(chan interface{}, 16),
  17. close: make(chan struct{}),
  18. }
  19. go ch.processing()
  20. return ch
  21. }
  22. // In returns the send channel of the given channel, which can be used to
  23. // send values to the channel.
  24. func (ch *UnboundedInterfaceChan) In() chan<- interface{} { return ch.in }
  25. // Out returns the receive channel of the given channel, which can be used
  26. // to receive values from the channel.
  27. func (ch *UnboundedInterfaceChan) Out() <-chan interface{} { return ch.out }
  28. // Close closes the channel.
  29. func (ch *UnboundedInterfaceChan) Close() { ch.close <- struct{}{} }
  30. func (ch *UnboundedInterfaceChan) processing() {
  31. // This is a preallocation of the internal unbounded buffer.
  32. // The size is randomly picked. But if one changes the size, the
  33. // reallocation size at the subsequent for loop should also be
  34. // changed too. Furthermore, there is no memory leak since the
  35. // queue is garbage collected.
  36. ch.q = make([]interface{}, 0, 1<<10)
  37. for {
  38. select {
  39. case e, ok := <-ch.in:
  40. if !ok {
  41. // We don't want the input channel be accidentally closed
  42. // via close() instead of Close(). If that happens, it is
  43. // a misuse, do a panic as warning.
  44. panic("async: misuse of unbounded channel, In() was closed")
  45. }
  46. ch.q = append(ch.q, e)
  47. case <-ch.close:
  48. ch.closed()
  49. return
  50. }
  51. for len(ch.q) > 0 {
  52. select {
  53. case ch.out <- ch.q[0]:
  54. ch.q[0] = nil // de-reference earlier to help GC
  55. ch.q = ch.q[1:]
  56. case e, ok := <-ch.in:
  57. if !ok {
  58. // We don't want the input channel be accidentally closed
  59. // via close() instead of Close(). If that happens, it is
  60. // a misuse, do a panic as warning.
  61. panic("async: misuse of unbounded channel, In() was closed")
  62. }
  63. ch.q = append(ch.q, e)
  64. case <-ch.close:
  65. ch.closed()
  66. return
  67. }
  68. }
  69. // If the remaining capacity is too small, we prefer to
  70. // reallocate the entire buffer.
  71. if cap(ch.q) < 1<<5 {
  72. ch.q = make([]interface{}, 0, 1<<10)
  73. }
  74. }
  75. }
  76. func (ch *UnboundedInterfaceChan) closed() {
  77. close(ch.in)
  78. for e := range ch.in {
  79. ch.q = append(ch.q, e)
  80. }
  81. for len(ch.q) > 0 {
  82. select {
  83. case ch.out <- ch.q[0]:
  84. ch.q[0] = nil // de-reference earlier to help GC
  85. ch.q = ch.q[1:]
  86. default:
  87. }
  88. }
  89. close(ch.out)
  90. close(ch.close)
  91. }