pipeconns.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. package fasthttputil
  2. import (
  3. "errors"
  4. "io"
  5. "net"
  6. "sync"
  7. "time"
  8. )
  9. // NewPipeConns returns new bi-directional connection pipe.
  10. //
  11. // PipeConns is NOT safe for concurrent use by multiple goroutines!
  12. func NewPipeConns() *PipeConns {
  13. ch1 := make(chan *byteBuffer, 4)
  14. ch2 := make(chan *byteBuffer, 4)
  15. pc := &PipeConns{
  16. stopCh: make(chan struct{}),
  17. }
  18. pc.c1.rCh = ch1
  19. pc.c1.wCh = ch2
  20. pc.c2.rCh = ch2
  21. pc.c2.wCh = ch1
  22. pc.c1.pc = pc
  23. pc.c2.pc = pc
  24. return pc
  25. }
  26. // PipeConns provides bi-directional connection pipe,
  27. // which use in-process memory as a transport.
  28. //
  29. // PipeConns must be created by calling NewPipeConns.
  30. //
  31. // PipeConns has the following additional features comparing to connections
  32. // returned from net.Pipe():
  33. //
  34. // - It is faster.
  35. // - It buffers Write calls, so there is no need to have concurrent goroutine
  36. // calling Read in order to unblock each Write call.
  37. // - It supports read and write deadlines.
  38. //
  39. // PipeConns is NOT safe for concurrent use by multiple goroutines!
  40. type PipeConns struct {
  41. stopCh chan struct{}
  42. c1 pipeConn
  43. c2 pipeConn
  44. stopChLock sync.Mutex
  45. }
  46. // SetAddresses sets the local and remote addresses for the connection.
  47. func (pc *PipeConns) SetAddresses(localAddr1, remoteAddr1, localAddr2, remoteAddr2 net.Addr) {
  48. pc.c1.addrLock.Lock()
  49. defer pc.c1.addrLock.Unlock()
  50. pc.c2.addrLock.Lock()
  51. defer pc.c2.addrLock.Unlock()
  52. pc.c1.localAddr = localAddr1
  53. pc.c1.remoteAddr = remoteAddr1
  54. pc.c2.localAddr = localAddr2
  55. pc.c2.remoteAddr = remoteAddr2
  56. }
  57. // Conn1 returns the first end of bi-directional pipe.
  58. //
  59. // Data written to Conn1 may be read from Conn2.
  60. // Data written to Conn2 may be read from Conn1.
  61. func (pc *PipeConns) Conn1() net.Conn {
  62. return &pc.c1
  63. }
  64. // Conn2 returns the second end of bi-directional pipe.
  65. //
  66. // Data written to Conn2 may be read from Conn1.
  67. // Data written to Conn1 may be read from Conn2.
  68. func (pc *PipeConns) Conn2() net.Conn {
  69. return &pc.c2
  70. }
  71. // Close closes pipe connections.
  72. func (pc *PipeConns) Close() error {
  73. pc.stopChLock.Lock()
  74. select {
  75. case <-pc.stopCh:
  76. default:
  77. close(pc.stopCh)
  78. }
  79. pc.stopChLock.Unlock()
  80. return nil
  81. }
  82. type pipeConn struct {
  83. localAddr net.Addr
  84. remoteAddr net.Addr
  85. b *byteBuffer
  86. rCh chan *byteBuffer
  87. wCh chan *byteBuffer
  88. pc *PipeConns
  89. readDeadlineTimer *time.Timer
  90. writeDeadlineTimer *time.Timer
  91. readDeadlineCh <-chan time.Time
  92. writeDeadlineCh <-chan time.Time
  93. bb []byte
  94. addrLock sync.RWMutex
  95. readDeadlineChLock sync.Mutex
  96. }
  97. func (c *pipeConn) Write(p []byte) (int, error) {
  98. b := acquireByteBuffer()
  99. b.b = append(b.b[:0], p...)
  100. select {
  101. case <-c.pc.stopCh:
  102. releaseByteBuffer(b)
  103. return 0, errConnectionClosed
  104. default:
  105. }
  106. select {
  107. case c.wCh <- b:
  108. default:
  109. select {
  110. case c.wCh <- b:
  111. case <-c.writeDeadlineCh:
  112. c.writeDeadlineCh = closedDeadlineCh
  113. return 0, ErrTimeout
  114. case <-c.pc.stopCh:
  115. releaseByteBuffer(b)
  116. return 0, errConnectionClosed
  117. }
  118. }
  119. return len(p), nil
  120. }
  121. func (c *pipeConn) WriteString(s string) (int, error) {
  122. return c.Write(s2b(s))
  123. }
  124. func (c *pipeConn) Read(p []byte) (int, error) {
  125. mayBlock := true
  126. nn := 0
  127. for len(p) > 0 {
  128. n, err := c.read(p, mayBlock)
  129. nn += n
  130. if err != nil {
  131. if !mayBlock && err == errWouldBlock {
  132. err = nil
  133. }
  134. return nn, err
  135. }
  136. p = p[n:]
  137. mayBlock = false
  138. }
  139. return nn, nil
  140. }
  141. func (c *pipeConn) read(p []byte, mayBlock bool) (int, error) {
  142. if len(c.bb) == 0 {
  143. if err := c.readNextByteBuffer(mayBlock); err != nil {
  144. return 0, err
  145. }
  146. }
  147. n := copy(p, c.bb)
  148. c.bb = c.bb[n:]
  149. return n, nil
  150. }
  151. func (c *pipeConn) readNextByteBuffer(mayBlock bool) error {
  152. releaseByteBuffer(c.b)
  153. c.b = nil
  154. select {
  155. case c.b = <-c.rCh:
  156. default:
  157. if !mayBlock {
  158. return errWouldBlock
  159. }
  160. c.readDeadlineChLock.Lock()
  161. readDeadlineCh := c.readDeadlineCh
  162. c.readDeadlineChLock.Unlock()
  163. select {
  164. case c.b = <-c.rCh:
  165. case <-readDeadlineCh:
  166. c.readDeadlineChLock.Lock()
  167. c.readDeadlineCh = closedDeadlineCh
  168. c.readDeadlineChLock.Unlock()
  169. // rCh may contain data when deadline is reached.
  170. // Read the data before returning ErrTimeout.
  171. select {
  172. case c.b = <-c.rCh:
  173. default:
  174. return ErrTimeout
  175. }
  176. case <-c.pc.stopCh:
  177. // rCh may contain data when stopCh is closed.
  178. // Read the data before returning EOF.
  179. select {
  180. case c.b = <-c.rCh:
  181. default:
  182. return io.EOF
  183. }
  184. }
  185. }
  186. c.bb = c.b.b
  187. return nil
  188. }
  189. var (
  190. errWouldBlock = errors.New("would block")
  191. errConnectionClosed = errors.New("connection closed")
  192. )
  193. type timeoutError struct{}
  194. func (e *timeoutError) Error() string {
  195. return "timeout"
  196. }
  197. // Only implement the Timeout() function of the net.Error interface.
  198. // This allows for checks like:
  199. //
  200. // if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {
  201. func (e *timeoutError) Timeout() bool {
  202. return true
  203. }
  204. // ErrTimeout is returned from Read() or Write() on timeout.
  205. var ErrTimeout = &timeoutError{}
  206. func (c *pipeConn) Close() error {
  207. return c.pc.Close()
  208. }
  209. func (c *pipeConn) LocalAddr() net.Addr {
  210. c.addrLock.RLock()
  211. defer c.addrLock.RUnlock()
  212. if c.localAddr != nil {
  213. return c.localAddr
  214. }
  215. return pipeAddr(0)
  216. }
  217. func (c *pipeConn) RemoteAddr() net.Addr {
  218. c.addrLock.RLock()
  219. defer c.addrLock.RUnlock()
  220. if c.remoteAddr != nil {
  221. return c.remoteAddr
  222. }
  223. return pipeAddr(0)
  224. }
  225. func (c *pipeConn) SetDeadline(deadline time.Time) error {
  226. c.SetReadDeadline(deadline) //nolint:errcheck
  227. c.SetWriteDeadline(deadline) //nolint:errcheck
  228. return nil
  229. }
  230. func (c *pipeConn) SetReadDeadline(deadline time.Time) error {
  231. if c.readDeadlineTimer == nil {
  232. c.readDeadlineTimer = time.NewTimer(time.Hour)
  233. }
  234. readDeadlineCh := updateTimer(c.readDeadlineTimer, deadline)
  235. c.readDeadlineChLock.Lock()
  236. c.readDeadlineCh = readDeadlineCh
  237. c.readDeadlineChLock.Unlock()
  238. return nil
  239. }
  240. func (c *pipeConn) SetWriteDeadline(deadline time.Time) error {
  241. if c.writeDeadlineTimer == nil {
  242. c.writeDeadlineTimer = time.NewTimer(time.Hour)
  243. }
  244. c.writeDeadlineCh = updateTimer(c.writeDeadlineTimer, deadline)
  245. return nil
  246. }
  247. func updateTimer(t *time.Timer, deadline time.Time) <-chan time.Time {
  248. if !t.Stop() {
  249. select {
  250. case <-t.C:
  251. default:
  252. }
  253. }
  254. if deadline.IsZero() {
  255. return nil
  256. }
  257. d := time.Until(deadline)
  258. if d <= 0 {
  259. return closedDeadlineCh
  260. }
  261. t.Reset(d)
  262. return t.C
  263. }
  264. var closedDeadlineCh = func() <-chan time.Time {
  265. ch := make(chan time.Time)
  266. close(ch)
  267. return ch
  268. }()
  269. type pipeAddr int
  270. func (pipeAddr) Network() string {
  271. return "pipe"
  272. }
  273. func (pipeAddr) String() string {
  274. return "pipe"
  275. }
  276. type byteBuffer struct {
  277. b []byte
  278. }
  279. func acquireByteBuffer() *byteBuffer {
  280. return byteBufferPool.Get().(*byteBuffer)
  281. }
  282. func releaseByteBuffer(b *byteBuffer) {
  283. if b != nil {
  284. byteBufferPool.Put(b)
  285. }
  286. }
  287. var byteBufferPool = &sync.Pool{
  288. New: func() any {
  289. return &byteBuffer{
  290. b: make([]byte, 1024),
  291. }
  292. },
  293. }