pipeconns.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. package fasthttputil
  2. import (
  3. "errors"
  4. "io"
  5. "net"
  6. "sync"
  7. "time"
  8. )
  9. // NewPipeConns returns new bi-directional connection pipe.
  10. //
  11. // PipeConns is NOT safe for concurrent use by multiple goroutines!
  12. func NewPipeConns() *PipeConns {
  13. ch1 := make(chan *byteBuffer, 4)
  14. ch2 := make(chan *byteBuffer, 4)
  15. pc := &PipeConns{
  16. stopCh: make(chan struct{}),
  17. }
  18. pc.c1.rCh = ch1
  19. pc.c1.wCh = ch2
  20. pc.c2.rCh = ch2
  21. pc.c2.wCh = ch1
  22. pc.c1.pc = pc
  23. pc.c2.pc = pc
  24. return pc
  25. }
  26. // PipeConns provides bi-directional connection pipe,
  27. // which use in-process memory as a transport.
  28. //
  29. // PipeConns must be created by calling NewPipeConns.
  30. //
  31. // PipeConns has the following additional features comparing to connections
  32. // returned from net.Pipe():
  33. //
  34. // - It is faster.
  35. // - It buffers Write calls, so there is no need to have concurrent goroutine
  36. // calling Read in order to unblock each Write call.
  37. // - It supports read and write deadlines.
  38. //
  39. // PipeConns is NOT safe for concurrent use by multiple goroutines!
  40. type PipeConns struct {
  41. c1 pipeConn
  42. c2 pipeConn
  43. stopCh chan struct{}
  44. stopChLock sync.Mutex
  45. }
  46. // Conn1 returns the first end of bi-directional pipe.
  47. //
  48. // Data written to Conn1 may be read from Conn2.
  49. // Data written to Conn2 may be read from Conn1.
  50. func (pc *PipeConns) Conn1() net.Conn {
  51. return &pc.c1
  52. }
  53. // Conn2 returns the second end of bi-directional pipe.
  54. //
  55. // Data written to Conn2 may be read from Conn1.
  56. // Data written to Conn1 may be read from Conn2.
  57. func (pc *PipeConns) Conn2() net.Conn {
  58. return &pc.c2
  59. }
  60. // Close closes pipe connections.
  61. func (pc *PipeConns) Close() error {
  62. pc.stopChLock.Lock()
  63. select {
  64. case <-pc.stopCh:
  65. default:
  66. close(pc.stopCh)
  67. }
  68. pc.stopChLock.Unlock()
  69. return nil
  70. }
  71. type pipeConn struct {
  72. b *byteBuffer
  73. bb []byte
  74. rCh chan *byteBuffer
  75. wCh chan *byteBuffer
  76. pc *PipeConns
  77. readDeadlineTimer *time.Timer
  78. writeDeadlineTimer *time.Timer
  79. readDeadlineCh <-chan time.Time
  80. writeDeadlineCh <-chan time.Time
  81. readDeadlineChLock sync.Mutex
  82. }
  83. func (c *pipeConn) Write(p []byte) (int, error) {
  84. b := acquireByteBuffer()
  85. b.b = append(b.b[:0], p...)
  86. select {
  87. case <-c.pc.stopCh:
  88. releaseByteBuffer(b)
  89. return 0, errConnectionClosed
  90. default:
  91. }
  92. select {
  93. case c.wCh <- b:
  94. default:
  95. select {
  96. case c.wCh <- b:
  97. case <-c.writeDeadlineCh:
  98. c.writeDeadlineCh = closedDeadlineCh
  99. return 0, ErrTimeout
  100. case <-c.pc.stopCh:
  101. releaseByteBuffer(b)
  102. return 0, errConnectionClosed
  103. }
  104. }
  105. return len(p), nil
  106. }
  107. func (c *pipeConn) Read(p []byte) (int, error) {
  108. mayBlock := true
  109. nn := 0
  110. for len(p) > 0 {
  111. n, err := c.read(p, mayBlock)
  112. nn += n
  113. if err != nil {
  114. if !mayBlock && err == errWouldBlock {
  115. err = nil
  116. }
  117. return nn, err
  118. }
  119. p = p[n:]
  120. mayBlock = false
  121. }
  122. return nn, nil
  123. }
  124. func (c *pipeConn) read(p []byte, mayBlock bool) (int, error) {
  125. if len(c.bb) == 0 {
  126. if err := c.readNextByteBuffer(mayBlock); err != nil {
  127. return 0, err
  128. }
  129. }
  130. n := copy(p, c.bb)
  131. c.bb = c.bb[n:]
  132. return n, nil
  133. }
  134. func (c *pipeConn) readNextByteBuffer(mayBlock bool) error {
  135. releaseByteBuffer(c.b)
  136. c.b = nil
  137. select {
  138. case c.b = <-c.rCh:
  139. default:
  140. if !mayBlock {
  141. return errWouldBlock
  142. }
  143. c.readDeadlineChLock.Lock()
  144. readDeadlineCh := c.readDeadlineCh
  145. c.readDeadlineChLock.Unlock()
  146. select {
  147. case c.b = <-c.rCh:
  148. case <-readDeadlineCh:
  149. c.readDeadlineChLock.Lock()
  150. c.readDeadlineCh = closedDeadlineCh
  151. c.readDeadlineChLock.Unlock()
  152. // rCh may contain data when deadline is reached.
  153. // Read the data before returning ErrTimeout.
  154. select {
  155. case c.b = <-c.rCh:
  156. default:
  157. return ErrTimeout
  158. }
  159. case <-c.pc.stopCh:
  160. // rCh may contain data when stopCh is closed.
  161. // Read the data before returning EOF.
  162. select {
  163. case c.b = <-c.rCh:
  164. default:
  165. return io.EOF
  166. }
  167. }
  168. }
  169. c.bb = c.b.b
  170. return nil
  171. }
  172. var (
  173. errWouldBlock = errors.New("would block")
  174. errConnectionClosed = errors.New("connection closed")
  175. )
  176. type timeoutError struct {
  177. }
  178. func (e *timeoutError) Error() string {
  179. return "timeout"
  180. }
  181. // Only implement the Timeout() function of the net.Error interface.
  182. // This allows for checks like:
  183. //
  184. // if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {
  185. func (e *timeoutError) Timeout() bool {
  186. return true
  187. }
  188. var (
  189. // ErrTimeout is returned from Read() or Write() on timeout.
  190. ErrTimeout = &timeoutError{}
  191. )
  192. func (c *pipeConn) Close() error {
  193. return c.pc.Close()
  194. }
  195. func (c *pipeConn) LocalAddr() net.Addr {
  196. return pipeAddr(0)
  197. }
  198. func (c *pipeConn) RemoteAddr() net.Addr {
  199. return pipeAddr(0)
  200. }
  201. func (c *pipeConn) SetDeadline(deadline time.Time) error {
  202. c.SetReadDeadline(deadline) //nolint:errcheck
  203. c.SetWriteDeadline(deadline) //nolint:errcheck
  204. return nil
  205. }
  206. func (c *pipeConn) SetReadDeadline(deadline time.Time) error {
  207. if c.readDeadlineTimer == nil {
  208. c.readDeadlineTimer = time.NewTimer(time.Hour)
  209. }
  210. readDeadlineCh := updateTimer(c.readDeadlineTimer, deadline)
  211. c.readDeadlineChLock.Lock()
  212. c.readDeadlineCh = readDeadlineCh
  213. c.readDeadlineChLock.Unlock()
  214. return nil
  215. }
  216. func (c *pipeConn) SetWriteDeadline(deadline time.Time) error {
  217. if c.writeDeadlineTimer == nil {
  218. c.writeDeadlineTimer = time.NewTimer(time.Hour)
  219. }
  220. c.writeDeadlineCh = updateTimer(c.writeDeadlineTimer, deadline)
  221. return nil
  222. }
  223. func updateTimer(t *time.Timer, deadline time.Time) <-chan time.Time {
  224. if !t.Stop() {
  225. select {
  226. case <-t.C:
  227. default:
  228. }
  229. }
  230. if deadline.IsZero() {
  231. return nil
  232. }
  233. d := -time.Since(deadline)
  234. if d <= 0 {
  235. return closedDeadlineCh
  236. }
  237. t.Reset(d)
  238. return t.C
  239. }
  240. var closedDeadlineCh = func() <-chan time.Time {
  241. ch := make(chan time.Time)
  242. close(ch)
  243. return ch
  244. }()
  245. type pipeAddr int
  246. func (pipeAddr) Network() string {
  247. return "pipe"
  248. }
  249. func (pipeAddr) String() string {
  250. return "pipe"
  251. }
  252. type byteBuffer struct {
  253. b []byte
  254. }
  255. func acquireByteBuffer() *byteBuffer {
  256. return byteBufferPool.Get().(*byteBuffer)
  257. }
  258. func releaseByteBuffer(b *byteBuffer) {
  259. if b != nil {
  260. byteBufferPool.Put(b)
  261. }
  262. }
  263. var byteBufferPool = &sync.Pool{
  264. New: func() interface{} {
  265. return &byteBuffer{
  266. b: make([]byte, 1024),
  267. }
  268. },
  269. }