chan_struct.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. package async
  2. // UnboundedStructChan is a channel with an unbounded buffer for caching
  3. // struct{} objects. This implementation is a specialized version that
  4. // optimizes for struct{} objects than other types. A channel must be
  5. // closed via Close method.
  6. type UnboundedStructChan struct {
  7. in, out, close chan struct{}
  8. n uint64
  9. }
  10. // NewUnboundedStructChan returns a unbounded channel with unlimited capacity.
  11. func NewUnboundedStructChan() *UnboundedStructChan {
  12. ch := &UnboundedStructChan{
  13. // The size of Struct is less than 16 bytes, we use 16 to fit
  14. // a CPU cache line (L2, 256 Bytes), which may reduce cache misses.
  15. in: make(chan struct{}, 16),
  16. out: make(chan struct{}, 16),
  17. close: make(chan struct{}),
  18. }
  19. go ch.processing()
  20. return ch
  21. }
  22. // In returns a send-only channel that can be used to send values
  23. // to the channel.
  24. func (ch *UnboundedStructChan) In() chan<- struct{} { return ch.in }
  25. // Out returns a receive-only channel that can be used to receive
  26. // values from the channel.
  27. func (ch *UnboundedStructChan) Out() <-chan struct{} { return ch.out }
  28. // Close closes the channel.
  29. func (ch *UnboundedStructChan) Close() { ch.close <- struct{}{} }
  30. func (ch *UnboundedStructChan) processing() {
  31. for {
  32. select {
  33. case _, ok := <-ch.in:
  34. if !ok {
  35. // We don't want the input channel be accidentally closed
  36. // via close() instead of Close(). If that happens, it is
  37. // a misuse, do a panic as warning.
  38. panic("async: misuse of unbounded channel, In() was closed")
  39. }
  40. ch.n++
  41. case <-ch.close:
  42. ch.closed()
  43. return
  44. }
  45. for ch.n > 0 {
  46. select {
  47. case ch.out <- struct{}{}:
  48. ch.n--
  49. case _, ok := <-ch.in:
  50. if !ok {
  51. // We don't want the input channel be accidentally closed
  52. // via close() instead of Close(). If that happens, it is
  53. // a misuse, do a panic as warning.
  54. panic("async: misuse of unbounded channel, In() was closed")
  55. }
  56. ch.n++
  57. case <-ch.close:
  58. ch.closed()
  59. return
  60. }
  61. }
  62. }
  63. }
  64. func (ch *UnboundedStructChan) closed() {
  65. close(ch.in)
  66. for range ch.in {
  67. ch.n++
  68. }
  69. for ; ch.n > 0; ch.n-- {
  70. select {
  71. case ch.out <- struct{}{}:
  72. default:
  73. }
  74. }
  75. close(ch.out)
  76. close(ch.close)
  77. }