decoder.go 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957
  1. // Copyright 2019+ Klaus Post. All rights reserved.
  2. // License information can be found in the LICENSE file.
  3. // Based on work by Yann Collet, released under BSD License.
  4. package zstd
  5. import (
  6. "context"
  7. "encoding/binary"
  8. "io"
  9. "sync"
  10. "github.com/klauspost/compress/zstd/internal/xxhash"
  11. )
// Decoder provides decoding of zstandard streams.
// The decoder has been designed to operate without allocations after a warmup.
// This means that you should store the decoder for best performance.
// To re-use a stream decoder, use the Reset(r io.Reader) error to switch to another stream.
// A decoder can safely be re-used even if the previous stream failed.
// To release the resources, you must call the Close() function on a decoder.
type Decoder struct {
	// o holds the options applied by NewReader and ResetWithOptions.
	o decoderOptions

	// Unreferenced decoders, ready for use.
	// Block decoders are taken from and handed back to this channel by both
	// the stream paths and DecodeAll.
	decoders chan *blockDec

	// Current read position used for Reader functionality.
	current decoderState

	// sync stream decoding
	syncStream struct {
		// decodedFrame counts the bytes produced so far for the current frame.
		decodedFrame uint64
		// br wraps the input reader handed to startSyncDecoder.
		br readerWrapper
		// enabled is set by startSyncDecoder and makes nextBlock decode inline.
		enabled bool
		// inFrame is true while the current frame still has blocks pending.
		inFrame bool
		// dstBuf is the output buffer kept between Reset calls that take the
		// in-memory DecodeAll shortcut.
		dstBuf []byte
	}

	// frame is the frame decoder used by the streaming paths; Reset creates
	// it lazily.
	frame *frameDec

	// streamWg is the waitgroup for all streams
	streamWg sync.WaitGroup
}
// decoderState is used for maintaining state when the decoder
// is used for streaming.
type decoderState struct {
	// current block being written to stream.
	// The embedded fields (d, b, err) are accessed directly as
	// current.d, current.b and current.err.
	decodeOutput

	// output in order to be written to stream.
	output chan decodeOutput

	// cancel remaining output.
	cancel context.CancelFunc

	// crc of current frame
	crc *xxhash.Digest

	// flushed is true when there is no pending stream output left to drain;
	// drainOutput returns early when it is set.
	flushed bool
}
  49. var (
  50. // Check the interfaces we want to support.
  51. _ = io.WriterTo(&Decoder{})
  52. _ = io.Reader(&Decoder{})
  53. )
  54. // NewReader creates a new decoder.
  55. // A nil Reader can be provided in which case Reset can be used to start a decode.
  56. //
  57. // A Decoder can be used in two modes:
  58. //
  59. // 1) As a stream, or
  60. // 2) For stateless decoding using DecodeAll.
  61. //
  62. // Only a single stream can be decoded concurrently, but the same decoder
  63. // can run multiple concurrent stateless decodes. It is even possible to
  64. // use stateless decodes while a stream is being decoded.
  65. //
  66. // The Reset function can be used to initiate a new stream, which will considerably
  67. // reduce the allocations normally caused by NewReader.
  68. func NewReader(r io.Reader, opts ...DOption) (*Decoder, error) {
  69. initPredefined()
  70. var d Decoder
  71. d.o.setDefault()
  72. for _, o := range opts {
  73. err := o(&d.o)
  74. if err != nil {
  75. return nil, err
  76. }
  77. }
  78. d.current.crc = xxhash.New()
  79. d.current.flushed = true
  80. if r == nil {
  81. d.current.err = ErrDecoderNilInput
  82. }
  83. // Initialize dict map if needed.
  84. if d.o.dicts == nil {
  85. d.o.dicts = make(map[uint32]*dict)
  86. }
  87. // Create decoders
  88. d.decoders = make(chan *blockDec, d.o.concurrent)
  89. for i := 0; i < d.o.concurrent; i++ {
  90. dec := newBlockDec(d.o.lowMem)
  91. dec.localFrame = newFrameDec(d.o)
  92. d.decoders <- dec
  93. }
  94. if r == nil {
  95. return &d, nil
  96. }
  97. return &d, d.Reset(r)
  98. }
  99. // Read bytes from the decompressed stream into p.
  100. // Returns the number of bytes read and any error that occurred.
  101. // When the stream is done, io.EOF will be returned.
  102. func (d *Decoder) Read(p []byte) (int, error) {
  103. var n int
  104. for {
  105. if len(d.current.b) > 0 {
  106. filled := copy(p, d.current.b)
  107. p = p[filled:]
  108. d.current.b = d.current.b[filled:]
  109. n += filled
  110. }
  111. if len(p) == 0 {
  112. break
  113. }
  114. if len(d.current.b) == 0 {
  115. // We have an error and no more data
  116. if d.current.err != nil {
  117. break
  118. }
  119. if !d.nextBlock(n == 0) {
  120. return n, d.current.err
  121. }
  122. }
  123. }
  124. if len(d.current.b) > 0 {
  125. if debugDecoder {
  126. println("returning", n, "still bytes left:", len(d.current.b))
  127. }
  128. // Only return error at end of block
  129. return n, nil
  130. }
  131. if d.current.err != nil {
  132. d.drainOutput()
  133. }
  134. if debugDecoder {
  135. println("returning", n, d.current.err, len(d.decoders))
  136. }
  137. return n, d.current.err
  138. }
  139. // Reset will reset the decoder the supplied stream after the current has finished processing.
  140. // Note that this functionality cannot be used after Close has been called.
  141. // Reset can be called with a nil reader to release references to the previous reader.
  142. // After being called with a nil reader, no other operations than Reset or DecodeAll or Close
  143. // should be used.
  144. func (d *Decoder) Reset(r io.Reader) error {
  145. if d.current.err == ErrDecoderClosed {
  146. return d.current.err
  147. }
  148. d.drainOutput()
  149. d.syncStream.br.r = nil
  150. if r == nil {
  151. d.current.err = ErrDecoderNilInput
  152. if len(d.current.b) > 0 {
  153. d.current.b = d.current.b[:0]
  154. }
  155. d.current.flushed = true
  156. return nil
  157. }
  158. // If bytes buffer and < 5MB, do sync decoding anyway.
  159. if bb, ok := r.(byter); ok && bb.Len() < d.o.decodeBufsBelow && !d.o.limitToCap {
  160. bb2 := bb
  161. if debugDecoder {
  162. println("*bytes.Buffer detected, doing sync decode, len:", bb.Len())
  163. }
  164. b := bb2.Bytes()
  165. var dst []byte
  166. if cap(d.syncStream.dstBuf) > 0 {
  167. dst = d.syncStream.dstBuf[:0]
  168. }
  169. dst, err := d.DecodeAll(b, dst)
  170. if err == nil {
  171. err = io.EOF
  172. }
  173. // Save output buffer
  174. d.syncStream.dstBuf = dst
  175. d.current.b = dst
  176. d.current.err = err
  177. d.current.flushed = true
  178. if debugDecoder {
  179. println("sync decode to", len(dst), "bytes, err:", err)
  180. }
  181. return nil
  182. }
  183. // Remove current block.
  184. d.stashDecoder()
  185. d.current.decodeOutput = decodeOutput{}
  186. d.current.err = nil
  187. d.current.flushed = false
  188. d.current.d = nil
  189. d.syncStream.dstBuf = nil
  190. // Ensure no-one else is still running...
  191. d.streamWg.Wait()
  192. if d.frame == nil {
  193. d.frame = newFrameDec(d.o)
  194. }
  195. if d.o.concurrent == 1 {
  196. return d.startSyncDecoder(r)
  197. }
  198. d.current.output = make(chan decodeOutput, d.o.concurrent)
  199. ctx, cancel := context.WithCancel(context.Background())
  200. d.current.cancel = cancel
  201. d.streamWg.Add(1)
  202. go d.startStreamDecoder(ctx, r, d.current.output)
  203. return nil
  204. }
  205. // ResetWithOptions will reset the decoder and apply the given options
  206. // for the next stream or DecodeAll operation.
  207. // Options are applied on top of the existing options.
  208. // Some options cannot be changed on reset and will return an error.
  209. func (d *Decoder) ResetWithOptions(r io.Reader, opts ...DOption) error {
  210. d.o.resetOpt = true
  211. defer func() { d.o.resetOpt = false }()
  212. for _, o := range opts {
  213. if err := o(&d.o); err != nil {
  214. return err
  215. }
  216. }
  217. return d.Reset(r)
  218. }
  219. // drainOutput will drain the output until errEndOfStream is sent.
  220. func (d *Decoder) drainOutput() {
  221. if d.current.cancel != nil {
  222. if debugDecoder {
  223. println("cancelling current")
  224. }
  225. d.current.cancel()
  226. d.current.cancel = nil
  227. }
  228. if d.current.d != nil {
  229. if debugDecoder {
  230. printf("re-adding current decoder %p, decoders: %d", d.current.d, len(d.decoders))
  231. }
  232. d.decoders <- d.current.d
  233. d.current.d = nil
  234. d.current.b = nil
  235. }
  236. if d.current.output == nil || d.current.flushed {
  237. println("current already flushed")
  238. return
  239. }
  240. for v := range d.current.output {
  241. if v.d != nil {
  242. if debugDecoder {
  243. printf("re-adding decoder %p", v.d)
  244. }
  245. d.decoders <- v.d
  246. }
  247. }
  248. d.current.output = nil
  249. d.current.flushed = true
  250. }
  251. // WriteTo writes data to w until there's no more data to write or when an error occurs.
  252. // The return value n is the number of bytes written.
  253. // Any error encountered during the write is also returned.
  254. func (d *Decoder) WriteTo(w io.Writer) (int64, error) {
  255. var n int64
  256. for {
  257. if len(d.current.b) > 0 {
  258. n2, err2 := w.Write(d.current.b)
  259. n += int64(n2)
  260. if err2 != nil && (d.current.err == nil || d.current.err == io.EOF) {
  261. d.current.err = err2
  262. } else if n2 != len(d.current.b) {
  263. d.current.err = io.ErrShortWrite
  264. }
  265. }
  266. if d.current.err != nil {
  267. break
  268. }
  269. d.nextBlock(true)
  270. }
  271. err := d.current.err
  272. if err != nil {
  273. d.drainOutput()
  274. }
  275. if err == io.EOF {
  276. err = nil
  277. }
  278. return n, err
  279. }
  280. // DecodeAll allows stateless decoding of a blob of bytes.
  281. // Output will be appended to dst, so if the destination size is known
  282. // you can pre-allocate the destination slice to avoid allocations.
  283. // DecodeAll can be used concurrently.
  284. // The Decoder concurrency limits will be respected.
  285. func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
  286. if d.decoders == nil {
  287. return dst, ErrDecoderClosed
  288. }
  289. // Grab a block decoder and frame decoder.
  290. block := <-d.decoders
  291. frame := block.localFrame
  292. initialSize := len(dst)
  293. defer func() {
  294. if debugDecoder {
  295. printf("re-adding decoder: %p", block)
  296. }
  297. frame.rawInput = nil
  298. frame.bBuf = nil
  299. if frame.history.decoders.br != nil {
  300. frame.history.decoders.br.in = nil
  301. frame.history.decoders.br.cursor = 0
  302. }
  303. d.decoders <- block
  304. }()
  305. frame.bBuf = input
  306. for {
  307. frame.history.reset()
  308. err := frame.reset(&frame.bBuf)
  309. if err != nil {
  310. if err == io.EOF {
  311. if debugDecoder {
  312. println("frame reset return EOF")
  313. }
  314. return dst, nil
  315. }
  316. return dst, err
  317. }
  318. if err = d.setDict(frame); err != nil {
  319. return nil, err
  320. }
  321. if frame.WindowSize > d.o.maxWindowSize {
  322. if debugDecoder {
  323. println("window size exceeded:", frame.WindowSize, ">", d.o.maxWindowSize)
  324. }
  325. return dst, ErrWindowSizeExceeded
  326. }
  327. if frame.FrameContentSize != fcsUnknown {
  328. if frame.FrameContentSize > d.o.maxDecodedSize-uint64(len(dst)-initialSize) {
  329. if debugDecoder {
  330. println("decoder size exceeded; fcs:", frame.FrameContentSize, "> mcs:", d.o.maxDecodedSize-uint64(len(dst)-initialSize), "len:", len(dst))
  331. }
  332. return dst, ErrDecoderSizeExceeded
  333. }
  334. if d.o.limitToCap && frame.FrameContentSize > uint64(cap(dst)-len(dst)) {
  335. if debugDecoder {
  336. println("decoder size exceeded; fcs:", frame.FrameContentSize, "> (cap-len)", cap(dst)-len(dst))
  337. }
  338. return dst, ErrDecoderSizeExceeded
  339. }
  340. if cap(dst)-len(dst) < int(frame.FrameContentSize) {
  341. dst2 := make([]byte, len(dst), len(dst)+int(frame.FrameContentSize)+compressedBlockOverAlloc)
  342. copy(dst2, dst)
  343. dst = dst2
  344. }
  345. }
  346. if cap(dst) == 0 && !d.o.limitToCap {
  347. // Allocate len(input) * 2 by default if nothing is provided
  348. // and we didn't get frame content size.
  349. size := min(
  350. // Cap to 1 MB.
  351. len(input)*2, 1<<20)
  352. if uint64(size) > d.o.maxDecodedSize {
  353. size = int(d.o.maxDecodedSize)
  354. }
  355. dst = make([]byte, 0, size)
  356. }
  357. dst, err = frame.runDecoder(dst, block)
  358. if err != nil {
  359. return dst, err
  360. }
  361. if uint64(len(dst)-initialSize) > d.o.maxDecodedSize {
  362. return dst, ErrDecoderSizeExceeded
  363. }
  364. if len(frame.bBuf) == 0 {
  365. if debugDecoder {
  366. println("frame dbuf empty")
  367. }
  368. break
  369. }
  370. }
  371. return dst, nil
  372. }
  373. // nextBlock returns the next block.
  374. // If an error occurs d.err will be set.
  375. // Optionally the function can block for new output.
  376. // If non-blocking mode is used the returned boolean will be false
  377. // if no data was available without blocking.
  378. func (d *Decoder) nextBlock(blocking bool) (ok bool) {
  379. if d.current.err != nil {
  380. // Keep error state.
  381. return false
  382. }
  383. d.current.b = d.current.b[:0]
  384. // SYNC:
  385. if d.syncStream.enabled {
  386. if !blocking {
  387. return false
  388. }
  389. ok = d.nextBlockSync()
  390. if !ok {
  391. d.stashDecoder()
  392. }
  393. return ok
  394. }
  395. //ASYNC:
  396. d.stashDecoder()
  397. if blocking {
  398. d.current.decodeOutput, ok = <-d.current.output
  399. } else {
  400. select {
  401. case d.current.decodeOutput, ok = <-d.current.output:
  402. default:
  403. return false
  404. }
  405. }
  406. if !ok {
  407. // This should not happen, so signal error state...
  408. d.current.err = io.ErrUnexpectedEOF
  409. return false
  410. }
  411. next := d.current.decodeOutput
  412. if next.d != nil && next.d.async.newHist != nil {
  413. d.current.crc.Reset()
  414. }
  415. if debugDecoder {
  416. var tmp [4]byte
  417. binary.LittleEndian.PutUint32(tmp[:], uint32(xxhash.Sum64(next.b)))
  418. println("got", len(d.current.b), "bytes, error:", d.current.err, "data crc:", tmp)
  419. }
  420. if d.o.ignoreChecksum {
  421. return true
  422. }
  423. if len(next.b) > 0 {
  424. d.current.crc.Write(next.b)
  425. }
  426. if next.err == nil && next.d != nil && next.d.hasCRC {
  427. got := uint32(d.current.crc.Sum64())
  428. if got != next.d.checkCRC {
  429. if debugDecoder {
  430. printf("CRC Check Failed: %08x (got) != %08x (on stream)\n", got, next.d.checkCRC)
  431. }
  432. d.current.err = ErrCRCMismatch
  433. } else {
  434. if debugDecoder {
  435. printf("CRC ok %08x\n", got)
  436. }
  437. }
  438. }
  439. return true
  440. }
// nextBlockSync decodes blocks inline from d.syncStream.br until one yields
// output, leaving that output in d.current.b.
// It returns false with d.current.err set when reading, decoding or
// validation fails.
func (d *Decoder) nextBlockSync() (ok bool) {
	// Take a block decoder from the pool unless one is already held.
	if d.current.d == nil {
		d.current.d = <-d.decoders
	}
	// Keep going while blocks produce no output.
	for len(d.current.b) == 0 {
		if !d.syncStream.inFrame {
			// Start of a new frame: read its header and select the dictionary.
			d.frame.history.reset()
			d.current.err = d.frame.reset(&d.syncStream.br)
			if d.current.err == nil {
				d.current.err = d.setDict(d.frame)
			}
			if d.current.err != nil {
				return false
			}
			if d.frame.WindowSize > d.o.maxDecodedSize || d.frame.WindowSize > d.o.maxWindowSize {
				d.current.err = ErrDecoderSizeExceeded
				return false
			}

			d.syncStream.decodedFrame = 0
			d.syncStream.inFrame = true
		}

		d.current.err = d.frame.next(d.current.d)
		if d.current.err != nil {
			return false
		}
		d.frame.history.ensureBlock()
		if debugDecoder {
			println("History trimmed:", len(d.frame.history.b), "decoded already:", d.syncStream.decodedFrame)
		}
		// The block output is whatever gets added to the history beyond this point.
		histBefore := len(d.frame.history.b)
		d.current.err = d.current.d.decodeBuf(&d.frame.history)

		if d.current.err != nil {
			println("error after:", d.current.err)
			return false
		}
		d.current.b = d.frame.history.b[histBefore:]
		if debugDecoder {
			println("history after:", len(d.frame.history.b))
		}

		// Check frame size (before CRC)
		// NOTE(review): there is no fcsUnknown guard here; presumably
		// fcsUnknown is the largest uint64 so this cannot trigger - confirm.
		d.syncStream.decodedFrame += uint64(len(d.current.b))
		if d.syncStream.decodedFrame > d.frame.FrameContentSize {
			if debugDecoder {
				printf("DecodedFrame (%d) > FrameContentSize (%d)\n", d.syncStream.decodedFrame, d.frame.FrameContentSize)
			}
			d.current.err = ErrFrameSizeExceeded
			return false
		}

		// Check FCS
		// On the last block the total must match the announced size exactly.
		if d.current.d.Last && d.frame.FrameContentSize != fcsUnknown && d.syncStream.decodedFrame != d.frame.FrameContentSize {
			if debugDecoder {
				printf("DecodedFrame (%d) != FrameContentSize (%d)\n", d.syncStream.decodedFrame, d.frame.FrameContentSize)
			}
			d.current.err = ErrFrameSizeMismatch
			return false
		}

		// Update/Check CRC
		if d.frame.HasCheckSum {
			if !d.o.ignoreChecksum {
				d.frame.crc.Write(d.current.b)
			}
			if d.current.d.Last {
				// When checksums are ignored, consumeCRC is called instead of checkCRC.
				if !d.o.ignoreChecksum {
					d.current.err = d.frame.checkCRC()
				} else {
					d.current.err = d.frame.consumeCRC()
				}
				if d.current.err != nil {
					println("CRC error:", d.current.err)
					return false
				}
			}
		}
		// After the last block the next iteration starts a new frame.
		d.syncStream.inFrame = !d.current.d.Last
	}
	return true
}
  518. func (d *Decoder) stashDecoder() {
  519. if d.current.d != nil {
  520. if debugDecoder {
  521. printf("re-adding current decoder %p", d.current.d)
  522. }
  523. d.decoders <- d.current.d
  524. d.current.d = nil
  525. }
  526. }
  527. // Close will release all resources.
  528. // It is NOT possible to reuse the decoder after this.
  529. func (d *Decoder) Close() {
  530. if d.current.err == ErrDecoderClosed {
  531. return
  532. }
  533. d.drainOutput()
  534. if d.current.cancel != nil {
  535. d.current.cancel()
  536. d.streamWg.Wait()
  537. d.current.cancel = nil
  538. }
  539. if d.decoders != nil {
  540. close(d.decoders)
  541. for dec := range d.decoders {
  542. dec.Close()
  543. }
  544. d.decoders = nil
  545. }
  546. if d.current.d != nil {
  547. d.current.d.Close()
  548. d.current.d = nil
  549. }
  550. d.current.err = ErrDecoderClosed
  551. }
  552. // IOReadCloser returns the decoder as an io.ReadCloser for convenience.
  553. // Any changes to the decoder will be reflected, so the returned ReadCloser
  554. // can be reused along with the decoder.
  555. // io.WriterTo is also supported by the returned ReadCloser.
  556. func (d *Decoder) IOReadCloser() io.ReadCloser {
  557. return closeWrapper{d: d}
  558. }
// closeWrapper wraps a Decoder so that Close returns an error, which is what
// io.ReadCloser requires.
type closeWrapper struct {
	// d is the wrapped decoder; all calls are forwarded to it.
	d *Decoder
}
  563. // WriteTo forwards WriteTo calls to the decoder.
  564. func (c closeWrapper) WriteTo(w io.Writer) (n int64, err error) {
  565. return c.d.WriteTo(w)
  566. }
  567. // Read forwards read calls to the decoder.
  568. func (c closeWrapper) Read(p []byte) (n int, err error) {
  569. return c.d.Read(p)
  570. }
  571. // Close closes the decoder.
  572. func (c closeWrapper) Close() error {
  573. c.d.Close()
  574. return nil
  575. }
// decodeOutput is one unit of stream output: the bytes of a decoded block,
// the block decoder that produced them and the error state, if any.
type decodeOutput struct {
	// d is the block decoder to hand back to the pool once b has been consumed.
	d *blockDec
	// b holds the decoded bytes.
	b []byte
	// err is the error to report after b has been delivered.
	err error
}
  581. func (d *Decoder) startSyncDecoder(r io.Reader) error {
  582. d.frame.history.reset()
  583. d.syncStream.br = readerWrapper{r: r}
  584. d.syncStream.inFrame = false
  585. d.syncStream.enabled = true
  586. d.syncStream.decodedFrame = 0
  587. return nil
  588. }
// startStreamDecoder runs the asynchronous decode pipeline for one stream.
//
// It reads frames and blocks from r on the calling goroutine and spawns two
// further goroutines, connected by channels:
//
//	0: Read frames and decode block literals (this goroutine).
//	1: Decode sequences.
//	2: Execute sequences, send to output.
//
// Results are sent on output in order. output is closed once all stages have
// finished, and d.streamWg is released when the function returns.
// Cancelling ctx stops stage 0 from picking up further block decoders.
func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output chan decodeOutput) {
	defer d.streamWg.Done()
	br := readerWrapper{r: r}

	// Channels connecting the stages: 0 -> seqDecode -> 1 -> seqExecute -> 2.
	var seqDecode = make(chan *blockDec, d.o.concurrent)
	var seqExecute = make(chan *blockDec, d.o.concurrent)

	// Async 1: Decode sequences...
	go func() {
		var hist history
		var hasErr bool

		for block := range seqDecode {
			// After an error, blocks are only passed along so they reach
			// the output stage.
			if hasErr {
				if block != nil {
					seqExecute <- block
				}
				continue
			}
			if block.async.newHist != nil {
				if debugDecoder {
					println("Async 1: new history, recent:", block.async.newHist.recentOffsets)
				}
				hist.reset()
				hist.decoders = block.async.newHist.decoders
				hist.recentOffsets = block.async.newHist.recentOffsets
				hist.windowSize = block.async.newHist.windowSize
				if block.async.newHist.dict != nil {
					hist.setDict(block.async.newHist.dict)
				}
			}
			// Only compressed blocks carry sequences; everything else is forwarded as is.
			if block.err != nil || block.Type != blockTypeCompressed {
				hasErr = block.err != nil
				seqExecute <- block
				continue
			}

			hist.decoders.literals = block.async.literals
			block.err = block.prepareSequences(block.async.seqData, &hist)
			if debugDecoder && block.err != nil {
				println("prepareSequences returned:", block.err)
			}
			hasErr = block.err != nil
			if block.err == nil {
				block.err = block.decodeSequences(&hist)
				if debugDecoder && block.err != nil {
					println("decodeSequences returned:", block.err)
				}
				hasErr = block.err != nil
				// block.async.sequence = hist.decoders.seq[:hist.decoders.nSeqs]
				block.async.seqSize = hist.decoders.seqSize
			}
			seqExecute <- block
		}
		// Input exhausted: signal the execute stage.
		close(seqExecute)
		hist.reset()
	}()

	var wg sync.WaitGroup
	wg.Add(1)

	// Async 2: Execute sequences...
	// frameHistCache carries the history buffer of d.frame into the stage
	// and back again once the stage has finished.
	frameHistCache := d.frame.history.b
	go func() {
		var hist history
		var decodedFrame uint64
		var fcs uint64
		var hasErr bool
		for block := range seqExecute {
			out := decodeOutput{err: block.err, d: block}
			// Failed blocks, and all blocks after a failure, are sent to
			// the output without being executed.
			if block.err != nil || hasErr {
				hasErr = true
				output <- out
				continue
			}
			if block.async.newHist != nil {
				if debugDecoder {
					println("Async 2: new history")
				}
				hist.reset()
				hist.windowSize = block.async.newHist.windowSize
				hist.allocFrameBuffer = block.async.newHist.allocFrameBuffer
				if block.async.newHist.dict != nil {
					hist.setDict(block.async.newHist.dict)
				}

				// Prefer the cached frame buffer over a new allocation when
				// the current buffer is too small.
				if cap(hist.b) < hist.allocFrameBuffer {
					if cap(frameHistCache) >= hist.allocFrameBuffer {
						hist.b = frameHistCache
					} else {
						hist.b = make([]byte, 0, hist.allocFrameBuffer)
						println("Alloc history sized", hist.allocFrameBuffer)
					}
				}
				hist.b = hist.b[:0]
				fcs = block.async.fcs
				decodedFrame = 0
			}
			do := decodeOutput{err: block.err, d: block}
			switch block.Type {
			case blockTypeRLE:
				if debugDecoder {
					println("add rle block length:", block.RLESize)
				}

				if cap(block.dst) < int(block.RLESize) {
					if block.lowMem {
						block.dst = make([]byte, block.RLESize)
					} else {
						block.dst = make([]byte, maxCompressedBlockSize)
					}
				}
				block.dst = block.dst[:block.RLESize]
				// Fill the output with the single repeated byte.
				v := block.data[0]
				for i := range block.dst {
					block.dst[i] = v
				}
				hist.append(block.dst)
				do.b = block.dst
			case blockTypeRaw:
				if debugDecoder {
					println("add raw block length:", len(block.data))
				}
				hist.append(block.data)
				do.b = block.data
			case blockTypeCompressed:
				if debugDecoder {
					println("execute with history length:", len(hist.b), "window:", hist.windowSize)
				}
				hist.decoders.seqSize = block.async.seqSize
				hist.decoders.literals = block.async.literals
				do.err = block.executeSequences(&hist)
				hasErr = do.err != nil
				if debugDecoder && hasErr {
					println("executeSequences returned:", do.err)
				}
				do.b = block.dst
			}
			if !hasErr {
				// Validate the running output size against the frame content size.
				decodedFrame += uint64(len(do.b))
				if decodedFrame > fcs {
					println("fcs exceeded", block.Last, fcs, decodedFrame)
					do.err = ErrFrameSizeExceeded
					hasErr = true
				} else if block.Last && fcs != fcsUnknown && decodedFrame != fcs {
					do.err = ErrFrameSizeMismatch
					hasErr = true
				} else {
					if debugDecoder {
						println("fcs ok", block.Last, fcs, decodedFrame)
					}
				}
			}
			output <- do
		}
		close(output)
		// Hand the history buffer back before signalling completion; the
		// spawning goroutine reads it after wg.Wait.
		frameHistCache = hist.b
		wg.Done()
		if debugDecoder {
			println("decoder goroutines finished")
		}
		hist.reset()
	}()

	// Stage 0: read frames and blocks, decode literals.
	var hist history
decodeStream:
	for {
		var hasErr bool
		hist.reset()
		// decodeBlock decodes the literals of a compressed block and sends
		// the block to the sequence decoder. After an error, blocks are
		// passed on untouched.
		decodeBlock := func(block *blockDec) {
			if hasErr {
				if block != nil {
					seqDecode <- block
				}
				return
			}
			if block.err != nil || block.Type != blockTypeCompressed {
				hasErr = block.err != nil
				seqDecode <- block
				return
			}

			remain, err := block.decodeLiterals(block.data, &hist)
			block.err = err
			hasErr = block.err != nil
			if err == nil {
				block.async.literals = hist.decoders.literals
				block.async.seqData = remain
			} else if debugDecoder {
				println("decodeLiterals error:", err)
			}
			seqDecode <- block
		}
		frame := d.frame
		if debugDecoder {
			println("New frame...")
		}
		var historySent bool
		frame.history.reset()
		err := frame.reset(&br)
		if debugDecoder && err != nil {
			println("Frame decoder returned", err)
		}
		if err == nil {
			err = d.setDict(frame)
		}
		if err == nil && d.frame.WindowSize > d.o.maxWindowSize {
			if debugDecoder {
				println("decoder size exceeded, fws:", d.frame.WindowSize, "> mws:", d.o.maxWindowSize)
			}

			err = ErrDecoderSizeExceeded
		}
		if err != nil {
			// Frame-level errors travel through the pipeline on a block
			// decoder so they arrive at the output in order.
			select {
			case <-ctx.Done():
			case dec := <-d.decoders:
				dec.sendErr(err)
				decodeBlock(dec)
			}
			break decodeStream
		}

		// Go through all blocks of the frame.
		for {
			var dec *blockDec
			select {
			case <-ctx.Done():
				break decodeStream
			case dec = <-d.decoders:
				// Once we have a decoder, we MUST return it.
			}
			err := frame.next(dec)
			if !historySent {
				// The first block of a frame carries a copy of the frame
				// history to the later stages.
				h := frame.history
				if debugDecoder {
					println("Alloc History:", h.allocFrameBuffer)
				}
				hist.reset()
				if h.dict != nil {
					hist.setDict(h.dict)
				}
				dec.async.newHist = &h
				dec.async.fcs = frame.FrameContentSize
				historySent = true
			} else {
				dec.async.newHist = nil
			}
			if debugDecoder && err != nil {
				println("next block returned error:", err)
			}
			dec.err = err
			dec.hasCRC = false
			if dec.Last && frame.HasCheckSum && err == nil {
				// Read the 4 checksum bytes following the last block so the
				// reader side can verify them.
				crc, err := frame.rawInput.readSmall(4)
				if len(crc) < 4 {
					if err == nil {
						err = io.ErrUnexpectedEOF

					}
					println("CRC missing?", err)
					dec.err = err
				} else {
					dec.checkCRC = binary.LittleEndian.Uint32(crc)
					dec.hasCRC = true
					if debugDecoder {
						printf("found crc to check: %08x\n", dec.checkCRC)
					}
				}
			}
			// Copy what is needed from dec before handing it to the next stage.
			err = dec.err
			last := dec.Last
			decodeBlock(dec)
			if err != nil {
				break decodeStream
			}
			if last {
				break
			}
		}
	}
	// No more input: close the first channel and wait for the execute stage.
	close(seqDecode)
	wg.Wait()
	hist.reset()
	// Keep the history buffer for the next stream.
	d.frame.history.b = frameHistCache
}
  868. func (d *Decoder) setDict(frame *frameDec) (err error) {
  869. dict, ok := d.o.dicts[frame.DictionaryID]
  870. if ok {
  871. if debugDecoder {
  872. println("setting dict", frame.DictionaryID)
  873. }
  874. frame.history.setDict(dict)
  875. } else if frame.DictionaryID != 0 {
  876. // A zero or missing dictionary id is ambiguous:
  877. // either dictionary zero, or no dictionary. In particular,
  878. // zstd --patch-from uses this id for the source file,
  879. // so only return an error if the dictionary id is not zero.
  880. err = ErrUnknownDictionary
  881. }
  882. return err
  883. }