reader.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445
  1. // Package fwd provides a buffered reader
  2. // and writer. Each has methods that help improve
  3. // the encoding/decoding performance of some binary
  4. // protocols.
  5. //
// The [Writer] and [Reader] types provide similar
  7. // functionality to their counterparts in [bufio], plus
  8. // a few extra utility methods that simplify read-ahead
  9. // and write-ahead. I wrote this package to improve serialization
  10. // performance for http://github.com/tinylib/msgp,
  11. // where it provided about a 2x speedup over `bufio` for certain
  12. // workloads. However, care must be taken to understand the semantics of the
  13. // extra methods provided by this package, as they allow
  14. // the user to access and manipulate the buffer memory
  15. // directly.
  16. //
  17. // The extra methods for [Reader] are [Reader.Peek], [Reader.Skip]
  18. // and [Reader.Next]. (*fwd.Reader).Peek, unlike (*bufio.Reader).Peek,
  19. // will re-allocate the read buffer in order to accommodate arbitrarily
  20. // large read-ahead. (*fwd.Reader).Skip skips the next 'n' bytes
  21. // in the stream, and uses the [io.Seeker] interface if the underlying
  22. // stream implements it. (*fwd.Reader).Next returns a slice pointing
  23. // to the next 'n' bytes in the read buffer (like Reader.Peek), but also
  24. // increments the read position. This allows users to process streams
  25. // in arbitrary block sizes without having to manage appropriately-sized
  26. // slices. Additionally, obviating the need to copy the data from the
  27. // buffer to another location in memory can improve performance dramatically
  28. // in CPU-bound applications.
  29. //
  30. // [Writer] only has one extra method, which is (*fwd.Writer).Next, which
  31. // returns a slice pointing to the next 'n' bytes of the writer, and increments
  32. // the write position by the length of the returned slice. This allows users
  33. // to write directly to the end of the buffer.
  34. package fwd
  35. import (
  36. "io"
  37. "os"
  38. )
const (
	// DefaultReaderSize is the default size, in bytes, of the read buffer.
	// It is the buffer size NewReader passes to NewReaderSize.
	DefaultReaderSize = 2048

	// minReaderSize is the minimum read buffer capacity; the value comes
	// straight from bufio. Smaller requested sizes are raised to this value.
	minReaderSize = 16
)
  45. // NewReader returns a new *Reader that reads from 'r'
  46. func NewReader(r io.Reader) *Reader {
  47. return NewReaderSize(r, DefaultReaderSize)
  48. }
  49. // NewReaderSize returns a new *Reader that
  50. // reads from 'r' and has a buffer size 'n'.
  51. func NewReaderSize(r io.Reader, n int) *Reader {
  52. buf := make([]byte, 0, max(n, minReaderSize))
  53. return NewReaderBuf(r, buf)
  54. }
  55. // NewReaderBuf returns a new *Reader that
  56. // reads from 'r' and uses 'buf' as a buffer.
  57. // 'buf' is not used when has smaller capacity than 16,
  58. // custom buffer is allocated instead.
  59. func NewReaderBuf(r io.Reader, buf []byte) *Reader {
  60. if cap(buf) < minReaderSize {
  61. buf = make([]byte, 0, minReaderSize)
  62. }
  63. buf = buf[:0]
  64. rd := &Reader{
  65. r: r,
  66. data: buf,
  67. }
  68. if s, ok := r.(io.Seeker); ok {
  69. rd.rs = s
  70. }
  71. return rd
  72. }
// Reader is a buffered look-ahead reader.
type Reader struct {
	r io.Reader // underlying reader

	// data[n:len(data)] is buffered data; data[len(data):cap(data)] is free buffer space
	data        []byte // data
	n           int    // read offset
	inputOffset int64  // offset in the input stream
	state       error  // last read error

	// if the reader passed to NewReader was
	// also an io.Seeker, this is non-nil
	rs io.Seeker
}
  85. // Reset resets the underlying reader
  86. // and the read buffer.
  87. func (r *Reader) Reset(rd io.Reader) {
  88. r.r = rd
  89. r.data = r.data[0:0]
  90. r.n = 0
  91. r.inputOffset = 0
  92. r.state = nil
  93. if s, ok := rd.(io.Seeker); ok {
  94. r.rs = s
  95. } else {
  96. r.rs = nil
  97. }
  98. }
  99. // more() does one read on the underlying reader
  100. func (r *Reader) more() {
  101. // move data backwards so that
  102. // the read offset is 0; this way
  103. // we can supply the maximum number of
  104. // bytes to the reader
  105. if r.n != 0 {
  106. if r.n < len(r.data) {
  107. r.data = r.data[:copy(r.data[0:], r.data[r.n:])]
  108. } else {
  109. r.data = r.data[:0]
  110. }
  111. r.n = 0
  112. }
  113. var a int
  114. a, r.state = r.r.Read(r.data[len(r.data):cap(r.data)])
  115. if a == 0 && r.state == nil {
  116. r.state = io.ErrNoProgress
  117. return
  118. } else if a > 0 && r.state == io.EOF {
  119. // discard the io.EOF if we read more than 0 bytes.
  120. // the next call to Read should return io.EOF again.
  121. r.state = nil
  122. } else if r.state != nil {
  123. return
  124. }
  125. r.data = r.data[:len(r.data)+a]
  126. }
  127. // pop error
  128. func (r *Reader) err() (e error) {
  129. e, r.state = r.state, nil
  130. return
  131. }
  132. // pop error; EOF -> io.ErrUnexpectedEOF
  133. func (r *Reader) noEOF() (e error) {
  134. e, r.state = r.state, nil
  135. if e == io.EOF {
  136. e = io.ErrUnexpectedEOF
  137. }
  138. return
  139. }
  140. // buffered bytes
  141. func (r *Reader) buffered() int { return len(r.data) - r.n }
  142. // Buffered returns the number of bytes currently in the buffer
  143. func (r *Reader) Buffered() int { return len(r.data) - r.n }
  144. // BufferSize returns the total size of the buffer
  145. func (r *Reader) BufferSize() int { return cap(r.data) }
  146. // InputOffset returns the input stream byte offset of the current reader position
  147. func (r *Reader) InputOffset() int64 { return r.inputOffset }
  148. // Peek returns the next 'n' buffered bytes,
  149. // reading from the underlying reader if necessary.
  150. // It will only return a slice shorter than 'n' bytes
  151. // if it also returns an error. Peek does not advance
  152. // the reader. EOF errors are *not* returned as
  153. // io.ErrUnexpectedEOF.
  154. func (r *Reader) Peek(n int) ([]byte, error) {
  155. // in the degenerate case,
  156. // we may need to realloc
  157. // (the caller asked for more
  158. // bytes than the size of the buffer)
  159. if cap(r.data) < n {
  160. old := r.data[r.n:]
  161. r.data = make([]byte, n+r.buffered())
  162. r.data = r.data[:copy(r.data, old)]
  163. r.n = 0
  164. }
  165. // keep filling until
  166. // we hit an error or
  167. // read enough bytes
  168. for r.buffered() < n && r.state == nil {
  169. r.more()
  170. }
  171. // we must have hit an error
  172. if r.buffered() < n {
  173. return r.data[r.n:], r.err()
  174. }
  175. return r.data[r.n : r.n+n], nil
  176. }
  177. func (r *Reader) PeekByte() (b byte, err error) {
  178. if len(r.data)-r.n >= 1 {
  179. b = r.data[r.n]
  180. } else {
  181. b, err = r.peekByte()
  182. }
  183. return
  184. }
  185. func (r *Reader) peekByte() (byte, error) {
  186. const n = 1
  187. if cap(r.data) < n {
  188. old := r.data[r.n:]
  189. r.data = make([]byte, n+r.buffered())
  190. r.data = r.data[:copy(r.data, old)]
  191. r.n = 0
  192. }
  193. // keep filling until
  194. // we hit an error or
  195. // read enough bytes
  196. for r.buffered() < n && r.state == nil {
  197. r.more()
  198. }
  199. // we must have hit an error
  200. if r.buffered() < n {
  201. return 0, r.err()
  202. }
  203. return r.data[r.n], nil
  204. }
  205. // discard(n) discards up to 'n' buffered bytes, and
  206. // and returns the number of bytes discarded
  207. func (r *Reader) discard(n int) int {
  208. inbuf := r.buffered()
  209. if inbuf <= n {
  210. r.n = 0
  211. r.inputOffset += int64(inbuf)
  212. r.data = r.data[:0]
  213. return inbuf
  214. }
  215. r.n += n
  216. r.inputOffset += int64(n)
  217. return n
  218. }
  219. // Skip moves the reader forward 'n' bytes.
  220. // Returns the number of bytes skipped and any
  221. // errors encountered. It is analogous to Seek(n, 1).
  222. // If the underlying reader implements io.Seeker, then
  223. // that method will be used to skip forward.
  224. //
  225. // If the reader encounters
  226. // an EOF before skipping 'n' bytes, it
  227. // returns [io.ErrUnexpectedEOF]. If the
  228. // underlying reader implements [io.Seeker], then
  229. // those rules apply instead. (Many implementations
  230. // will not return [io.EOF] until the next call
  231. // to Read).
  232. func (r *Reader) Skip(n int) (int, error) {
  233. if n < 0 {
  234. return 0, os.ErrInvalid
  235. }
  236. // discard some or all of the current buffer
  237. skipped := r.discard(n)
  238. // if we can Seek() through the remaining bytes, do that
  239. if n > skipped && r.rs != nil {
  240. nn, err := r.rs.Seek(int64(n-skipped), 1)
  241. r.inputOffset += nn
  242. return int(nn) + skipped, err
  243. }
  244. // otherwise, keep filling the buffer
  245. // and discarding it up to 'n'
  246. for skipped < n && r.state == nil {
  247. r.more()
  248. skipped += r.discard(n - skipped)
  249. }
  250. return skipped, r.noEOF()
  251. }
  252. // Next returns the next 'n' bytes in the stream.
  253. // Unlike Peek, Next advances the reader position.
  254. // The returned bytes point to the same
  255. // data as the buffer, so the slice is
  256. // only valid until the next reader method call.
  257. // An EOF is considered an unexpected error.
  258. // If an the returned slice is less than the
  259. // length asked for, an error will be returned,
  260. // and the reader position will not be incremented.
  261. func (r *Reader) Next(n int) (b []byte, err error) {
  262. if r.state == nil && len(r.data)-r.n >= n {
  263. b = r.data[r.n : r.n+n]
  264. r.n += n
  265. r.inputOffset += int64(n)
  266. } else {
  267. b, err = r.next(n)
  268. }
  269. return
  270. }
  271. func (r *Reader) next(n int) ([]byte, error) {
  272. // in case the buffer is too small
  273. if cap(r.data) < n {
  274. old := r.data[r.n:]
  275. r.data = make([]byte, n+r.buffered())
  276. r.data = r.data[:copy(r.data, old)]
  277. r.n = 0
  278. }
  279. // fill at least 'n' bytes
  280. for r.buffered() < n && r.state == nil {
  281. r.more()
  282. }
  283. if r.buffered() < n {
  284. return r.data[r.n:], r.noEOF()
  285. }
  286. out := r.data[r.n : r.n+n]
  287. r.n += n
  288. r.inputOffset += int64(n)
  289. return out, nil
  290. }
  291. // Read implements [io.Reader].
  292. func (r *Reader) Read(b []byte) (int, error) {
  293. // if we have data in the buffer, just
  294. // return that.
  295. if r.buffered() != 0 {
  296. x := copy(b, r.data[r.n:])
  297. r.n += x
  298. r.inputOffset += int64(x)
  299. return x, nil
  300. }
  301. var n int
  302. // we have no buffered data; determine
  303. // whether or not to buffer or call
  304. // the underlying reader directly
  305. if len(b) >= cap(r.data) {
  306. n, r.state = r.r.Read(b)
  307. } else {
  308. r.more()
  309. n = copy(b, r.data)
  310. r.n = n
  311. }
  312. if n == 0 {
  313. return 0, r.err()
  314. }
  315. r.inputOffset += int64(n)
  316. return n, nil
  317. }
  318. // ReadFull attempts to read len(b) bytes into
  319. // 'b'. It returns the number of bytes read into
  320. // 'b', and an error if it does not return len(b).
  321. // EOF is considered an unexpected error.
  322. func (r *Reader) ReadFull(b []byte) (int, error) {
  323. var n int // read into b
  324. var nn int // scratch
  325. l := len(b)
  326. // either read buffered data,
  327. // or read directly for the underlying
  328. // buffer, or fetch more buffered data.
  329. for n < l && r.state == nil {
  330. if r.buffered() != 0 {
  331. nn = copy(b[n:], r.data[r.n:])
  332. n += nn
  333. r.n += nn
  334. r.inputOffset += int64(nn)
  335. } else if l-n > cap(r.data) {
  336. nn, r.state = r.r.Read(b[n:])
  337. n += nn
  338. r.inputOffset += int64(nn)
  339. } else {
  340. r.more()
  341. }
  342. }
  343. if n < l {
  344. return n, r.noEOF()
  345. }
  346. return n, nil
  347. }
  348. // ReadByte implements [io.ByteReader].
  349. func (r *Reader) ReadByte() (byte, error) {
  350. for r.buffered() < 1 && r.state == nil {
  351. r.more()
  352. }
  353. if r.buffered() < 1 {
  354. return 0, r.err()
  355. }
  356. b := r.data[r.n]
  357. r.n++
  358. r.inputOffset++
  359. return b, nil
  360. }
  361. // WriteTo implements [io.WriterTo].
  362. func (r *Reader) WriteTo(w io.Writer) (int64, error) {
  363. var (
  364. i int64
  365. ii int
  366. err error
  367. )
  368. // first, clear buffer
  369. if r.buffered() > 0 {
  370. ii, err = w.Write(r.data[r.n:])
  371. i += int64(ii)
  372. if err != nil {
  373. return i, err
  374. }
  375. r.data = r.data[0:0]
  376. r.n = 0
  377. r.inputOffset += int64(ii)
  378. }
  379. for r.state == nil {
  380. // here we just do
  381. // 1:1 reads and writes
  382. r.more()
  383. if r.buffered() > 0 {
  384. ii, err = w.Write(r.data)
  385. i += int64(ii)
  386. if err != nil {
  387. return i, err
  388. }
  389. r.data = r.data[0:0]
  390. r.n = 0
  391. r.inputOffset += int64(ii)
  392. }
  393. }
  394. if r.state != io.EOF {
  395. return i, r.err()
  396. }
  397. return i, nil
  398. }
  399. func max(a int, b int) int {
  400. if a < b {
  401. return b
  402. }
  403. return a
  404. }