reader.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. // The `fwd` package provides a buffered reader
  2. // and writer. Each has methods that help improve
  3. // the encoding/decoding performance of some binary
  4. // protocols.
  5. //
  6. // The `fwd.Writer` and `fwd.Reader` types provide similar
  7. // functionality to their counterparts in `bufio`, plus
  8. // a few extra utility methods that simplify read-ahead
  9. // and write-ahead. I wrote this package to improve serialization
  10. // performance for http://github.com/tinylib/msgp,
  11. // where it provided about a 2x speedup over `bufio` for certain
  12. // workloads. However, care must be taken to understand the semantics of the
  13. // extra methods provided by this package, as they allow
  14. // the user to access and manipulate the buffer memory
  15. // directly.
  16. //
  17. // The extra methods for `fwd.Reader` are `Peek`, `Skip`
  18. // and `Next`. `(*fwd.Reader).Peek`, unlike `(*bufio.Reader).Peek`,
  19. // will re-allocate the read buffer in order to accommodate arbitrarily
  20. // large read-ahead. `(*fwd.Reader).Skip` skips the next `n` bytes
  21. // in the stream, and uses the `io.Seeker` interface if the underlying
  22. // stream implements it. `(*fwd.Reader).Next` returns a slice pointing
  23. // to the next `n` bytes in the read buffer (like `Peek`), but also
  24. // increments the read position. This allows users to process streams
  25. // in arbitrary block sizes without having to manage appropriately-sized
  26. // slices. Additionally, obviating the need to copy the data from the
  27. // buffer to another location in memory can improve performance dramatically
  28. // in CPU-bound applications.
  29. //
  30. // `fwd.Writer` has only one extra method, `(*fwd.Writer).Next`, which
  31. // returns a slice pointing to the next `n` bytes of the writer, and increments
  32. // the write position by the length of the returned slice. This allows users
  33. // to write directly to the end of the buffer.
  34. package fwd
  35. import "io"
  36. const (
  37. // DefaultReaderSize is the default size of the read buffer
  38. DefaultReaderSize = 2048
  39. // minimum read buffer; straight from bufio
  40. minReaderSize = 16
  41. )
  42. // NewReader returns a new *Reader that reads from 'r'
  43. func NewReader(r io.Reader) *Reader {
  44. return NewReaderSize(r, DefaultReaderSize)
  45. }
  46. // NewReaderSize returns a new *Reader that
  47. // reads from 'r' and has a buffer size 'n'.
  48. func NewReaderSize(r io.Reader, n int) *Reader {
  49. buf := make([]byte, 0, max(n, minReaderSize))
  50. return NewReaderBuf(r, buf)
  51. }
  52. // NewReaderBuf returns a new *Reader that
  53. // reads from 'r' and uses 'buf' as a buffer.
  54. // 'buf' is not used when has smaller capacity than 16,
  55. // custom buffer is allocated instead.
  56. func NewReaderBuf(r io.Reader, buf []byte) *Reader {
  57. if cap(buf) < minReaderSize {
  58. buf = make([]byte, 0, minReaderSize)
  59. }
  60. buf = buf[:0]
  61. rd := &Reader{
  62. r: r,
  63. data: buf,
  64. }
  65. if s, ok := r.(io.Seeker); ok {
  66. rd.rs = s
  67. }
  68. return rd
  69. }
  70. // Reader is a buffered look-ahead reader
  71. type Reader struct {
  72. r io.Reader // underlying reader
  73. // data[n:len(data)] is buffered data; data[len(data):cap(data)] is free buffer space
  74. data []byte // data
  75. n int // read offset
  76. state error // last read error
  77. // if the reader past to NewReader was
  78. // also an io.Seeker, this is non-nil
  79. rs io.Seeker
  80. }
  81. // Reset resets the underlying reader
  82. // and the read buffer.
  83. func (r *Reader) Reset(rd io.Reader) {
  84. r.r = rd
  85. r.data = r.data[0:0]
  86. r.n = 0
  87. r.state = nil
  88. if s, ok := rd.(io.Seeker); ok {
  89. r.rs = s
  90. } else {
  91. r.rs = nil
  92. }
  93. }
  94. // more() does one read on the underlying reader
  95. func (r *Reader) more() {
  96. // move data backwards so that
  97. // the read offset is 0; this way
  98. // we can supply the maximum number of
  99. // bytes to the reader
  100. if r.n != 0 {
  101. if r.n < len(r.data) {
  102. r.data = r.data[:copy(r.data[0:], r.data[r.n:])]
  103. } else {
  104. r.data = r.data[:0]
  105. }
  106. r.n = 0
  107. }
  108. var a int
  109. a, r.state = r.r.Read(r.data[len(r.data):cap(r.data)])
  110. if a == 0 && r.state == nil {
  111. r.state = io.ErrNoProgress
  112. return
  113. } else if a > 0 && r.state == io.EOF {
  114. // discard the io.EOF if we read more than 0 bytes.
  115. // the next call to Read should return io.EOF again.
  116. r.state = nil
  117. }
  118. r.data = r.data[:len(r.data)+a]
  119. }
  120. // pop error
  121. func (r *Reader) err() (e error) {
  122. e, r.state = r.state, nil
  123. return
  124. }
  125. // pop error; EOF -> io.ErrUnexpectedEOF
  126. func (r *Reader) noEOF() (e error) {
  127. e, r.state = r.state, nil
  128. if e == io.EOF {
  129. e = io.ErrUnexpectedEOF
  130. }
  131. return
  132. }
  133. // buffered bytes
  134. func (r *Reader) buffered() int { return len(r.data) - r.n }
  135. // Buffered returns the number of bytes currently in the buffer
  136. func (r *Reader) Buffered() int { return len(r.data) - r.n }
  137. // BufferSize returns the total size of the buffer
  138. func (r *Reader) BufferSize() int { return cap(r.data) }
  139. // Peek returns the next 'n' buffered bytes,
  140. // reading from the underlying reader if necessary.
  141. // It will only return a slice shorter than 'n' bytes
  142. // if it also returns an error. Peek does not advance
  143. // the reader. EOF errors are *not* returned as
  144. // io.ErrUnexpectedEOF.
  145. func (r *Reader) Peek(n int) ([]byte, error) {
  146. // in the degenerate case,
  147. // we may need to realloc
  148. // (the caller asked for more
  149. // bytes than the size of the buffer)
  150. if cap(r.data) < n {
  151. old := r.data[r.n:]
  152. r.data = make([]byte, n+r.buffered())
  153. r.data = r.data[:copy(r.data, old)]
  154. r.n = 0
  155. }
  156. // keep filling until
  157. // we hit an error or
  158. // read enough bytes
  159. for r.buffered() < n && r.state == nil {
  160. r.more()
  161. }
  162. // we must have hit an error
  163. if r.buffered() < n {
  164. return r.data[r.n:], r.err()
  165. }
  166. return r.data[r.n : r.n+n], nil
  167. }
  168. // Skip moves the reader forward 'n' bytes.
  169. // Returns the number of bytes skipped and any
  170. // errors encountered. It is analogous to Seek(n, 1).
  171. // If the underlying reader implements io.Seeker, then
  172. // that method will be used to skip forward.
  173. //
  174. // If the reader encounters
  175. // an EOF before skipping 'n' bytes, it
  176. // returns io.ErrUnexpectedEOF. If the
  177. // underlying reader implements io.Seeker, then
  178. // those rules apply instead. (Many implementations
  179. // will not return `io.EOF` until the next call
  180. // to Read.)
  181. func (r *Reader) Skip(n int) (int, error) {
  182. // fast path
  183. if r.buffered() >= n {
  184. r.n += n
  185. return n, nil
  186. }
  187. // use seeker implementation
  188. // if we can
  189. if r.rs != nil {
  190. return r.skipSeek(n)
  191. }
  192. // loop on filling
  193. // and then erasing
  194. o := n
  195. for r.buffered() < n && r.state == nil {
  196. r.more()
  197. // we can skip forward
  198. // up to r.buffered() bytes
  199. step := min(r.buffered(), n)
  200. r.n += step
  201. n -= step
  202. }
  203. // at this point, n should be
  204. // 0 if everything went smoothly
  205. return o - n, r.noEOF()
  206. }
  207. // Next returns the next 'n' bytes in the stream.
  208. // Unlike Peek, Next advances the reader position.
  209. // The returned bytes point to the same
  210. // data as the buffer, so the slice is
  211. // only valid until the next reader method call.
  212. // An EOF is considered an unexpected error.
  213. // If an the returned slice is less than the
  214. // length asked for, an error will be returned,
  215. // and the reader position will not be incremented.
  216. func (r *Reader) Next(n int) ([]byte, error) {
  217. // in case the buffer is too small
  218. if cap(r.data) < n {
  219. old := r.data[r.n:]
  220. r.data = make([]byte, n+r.buffered())
  221. r.data = r.data[:copy(r.data, old)]
  222. r.n = 0
  223. }
  224. // fill at least 'n' bytes
  225. for r.buffered() < n && r.state == nil {
  226. r.more()
  227. }
  228. if r.buffered() < n {
  229. return r.data[r.n:], r.noEOF()
  230. }
  231. out := r.data[r.n : r.n+n]
  232. r.n += n
  233. return out, nil
  234. }
  235. // skipSeek uses the io.Seeker to seek forward.
  236. // only call this function when n > r.buffered()
  237. func (r *Reader) skipSeek(n int) (int, error) {
  238. o := r.buffered()
  239. // first, clear buffer
  240. n -= o
  241. r.n = 0
  242. r.data = r.data[:0]
  243. // then seek forward remaning bytes
  244. i, err := r.rs.Seek(int64(n), 1)
  245. return int(i) + o, err
  246. }
  247. // Read implements `io.Reader`
  248. func (r *Reader) Read(b []byte) (int, error) {
  249. // if we have data in the buffer, just
  250. // return that.
  251. if r.buffered() != 0 {
  252. x := copy(b, r.data[r.n:])
  253. r.n += x
  254. return x, nil
  255. }
  256. var n int
  257. // we have no buffered data; determine
  258. // whether or not to buffer or call
  259. // the underlying reader directly
  260. if len(b) >= cap(r.data) {
  261. n, r.state = r.r.Read(b)
  262. } else {
  263. r.more()
  264. n = copy(b, r.data)
  265. r.n = n
  266. }
  267. if n == 0 {
  268. return 0, r.err()
  269. }
  270. return n, nil
  271. }
  272. // ReadFull attempts to read len(b) bytes into
  273. // 'b'. It returns the number of bytes read into
  274. // 'b', and an error if it does not return len(b).
  275. // EOF is considered an unexpected error.
  276. func (r *Reader) ReadFull(b []byte) (int, error) {
  277. var n int // read into b
  278. var nn int // scratch
  279. l := len(b)
  280. // either read buffered data,
  281. // or read directly for the underlying
  282. // buffer, or fetch more buffered data.
  283. for n < l && r.state == nil {
  284. if r.buffered() != 0 {
  285. nn = copy(b[n:], r.data[r.n:])
  286. n += nn
  287. r.n += nn
  288. } else if l-n > cap(r.data) {
  289. nn, r.state = r.r.Read(b[n:])
  290. n += nn
  291. } else {
  292. r.more()
  293. }
  294. }
  295. if n < l {
  296. return n, r.noEOF()
  297. }
  298. return n, nil
  299. }
  300. // ReadByte implements `io.ByteReader`
  301. func (r *Reader) ReadByte() (byte, error) {
  302. for r.buffered() < 1 && r.state == nil {
  303. r.more()
  304. }
  305. if r.buffered() < 1 {
  306. return 0, r.err()
  307. }
  308. b := r.data[r.n]
  309. r.n++
  310. return b, nil
  311. }
  312. // WriteTo implements `io.WriterTo`
  313. func (r *Reader) WriteTo(w io.Writer) (int64, error) {
  314. var (
  315. i int64
  316. ii int
  317. err error
  318. )
  319. // first, clear buffer
  320. if r.buffered() > 0 {
  321. ii, err = w.Write(r.data[r.n:])
  322. i += int64(ii)
  323. if err != nil {
  324. return i, err
  325. }
  326. r.data = r.data[0:0]
  327. r.n = 0
  328. }
  329. for r.state == nil {
  330. // here we just do
  331. // 1:1 reads and writes
  332. r.more()
  333. if r.buffered() > 0 {
  334. ii, err = w.Write(r.data)
  335. i += int64(ii)
  336. if err != nil {
  337. return i, err
  338. }
  339. r.data = r.data[0:0]
  340. r.n = 0
  341. }
  342. }
  343. if r.state != io.EOF {
  344. return i, r.err()
  345. }
  346. return i, nil
  347. }
  348. func min(a int, b int) int {
  349. if a < b {
  350. return a
  351. }
  352. return b
  353. }
  354. func max(a int, b int) int {
  355. if a < b {
  356. return b
  357. }
  358. return a
  359. }