cancelreader.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package cancelreader
  2. import (
  3. "fmt"
  4. "io"
  5. "sync"
  6. )
  7. // ErrCanceled gets returned when trying to read from a canceled reader.
  8. var ErrCanceled = fmt.Errorf("read canceled")
  9. // CancelReader is a io.Reader whose Read() calls can be canceled without data
  10. // being consumed. The cancelReader has to be closed.
  11. type CancelReader interface {
  12. io.ReadCloser
  13. // Cancel cancels ongoing and future reads an returns true if it succeeded.
  14. Cancel() bool
  15. }
  16. // File represents an input/output resource with a file descriptor.
  17. type File interface {
  18. io.ReadWriteCloser
  19. // Fd returns its file descriptor
  20. Fd() uintptr
  21. // Name returns its file name.
  22. Name() string
  23. }
  24. // fallbackCancelReader implements cancelReader but does not actually support
  25. // cancelation during an ongoing Read() call. Thus, Cancel() always returns
  26. // false. However, after calling Cancel(), new Read() calls immediately return
  27. // errCanceled and don't consume any data anymore.
  28. type fallbackCancelReader struct {
  29. r io.Reader
  30. cancelMixin
  31. }
  32. // newFallbackCancelReader is a fallback for NewReader that cannot actually
  33. // cancel an ongoing read but will immediately return on future reads if it has
  34. // been canceled.
  35. func newFallbackCancelReader(reader io.Reader) (CancelReader, error) {
  36. return &fallbackCancelReader{r: reader}, nil
  37. }
  38. func (r *fallbackCancelReader) Read(data []byte) (int, error) {
  39. if r.isCanceled() {
  40. return 0, ErrCanceled
  41. }
  42. n, err := r.r.Read(data)
  43. /*
  44. If the underlying reader is a blocking reader (e.g. an open connection),
  45. it might happen that 1 goroutine cancels the reader while its stuck in
  46. the read call waiting for something.
  47. If that happens, we should still cancel the read.
  48. */
  49. if r.isCanceled() {
  50. return 0, ErrCanceled
  51. }
  52. return n, err // nolint: wrapcheck
  53. }
  54. func (r *fallbackCancelReader) Cancel() bool {
  55. r.setCanceled()
  56. return false
  57. }
  58. func (r *fallbackCancelReader) Close() error {
  59. return nil
  60. }
  61. // cancelMixin represents a goroutine-safe cancelation status.
  62. type cancelMixin struct {
  63. unsafeCanceled bool
  64. lock sync.Mutex
  65. }
  66. func (c *cancelMixin) isCanceled() bool {
  67. c.lock.Lock()
  68. defer c.lock.Unlock()
  69. return c.unsafeCanceled
  70. }
  71. func (c *cancelMixin) setCanceled() {
  72. c.lock.Lock()
  73. defer c.lock.Unlock()
  74. c.unsafeCanceled = true
  75. }