decoder.go 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949
  1. // Copyright 2019+ Klaus Post. All rights reserved.
  2. // License information can be found in the LICENSE file.
  3. // Based on work by Yann Collet, released under BSD License.
  4. package zstd
  5. import (
  6. "context"
  7. "encoding/binary"
  8. "io"
  9. "sync"
  10. "github.com/klauspost/compress/zstd/internal/xxhash"
  11. )
// Decoder provides decoding of zstandard streams.
// The decoder has been designed to operate without allocations after a warmup.
// This means that you should store the decoder for best performance.
// To re-use a stream decoder, use the Reset(r io.Reader) error to switch to another stream.
// A decoder can safely be re-used even if the previous stream failed.
// To release the resources, you must call the Close() function on a decoder.
type Decoder struct {
	// o holds the options the decoder was created with (see NewReader).
	o decoderOptions

	// Unreferenced decoders, ready for use.
	// Buffered with o.concurrent entries by NewReader; closed and set to nil by Close.
	decoders chan *blockDec

	// Current read position used for Reader functionality.
	current decoderState

	// sync stream decoding
	syncStream struct {
		// decodedFrame counts the bytes decoded so far in the current frame;
		// nextBlockSync compares it with the frame content size.
		decodedFrame uint64
		// br wraps the reader given to Reset.
		br readerWrapper
		// enabled is set by startSyncDecoder, used when o.concurrent == 1.
		enabled bool
		// inFrame is true while more blocks of the current frame remain.
		inFrame bool
		// dstBuf keeps the output buffer of the in-memory (byter) path in
		// Reset so it can be reused by the next Reset.
		dstBuf []byte
	}

	// frame is the frame decoder used for streaming; created lazily by Reset.
	frame *frameDec

	// Custom dictionaries.
	// Keyed by dictionary ID; populated from the options in NewReader.
	dicts map[uint32]*dict

	// streamWg is the waitgroup for all streams
	streamWg sync.WaitGroup
}
// decoderState is used for maintaining state when the decoder
// is used for streaming.
type decoderState struct {
	// current block being written to stream.
	// The embedded b holds the pending output; the embedded err is sticky:
	// once set, nextBlock produces nothing more until Reset.
	decodeOutput

	// output in order to be written to stream.
	// Only created by Reset for the asynchronous decoder (o.concurrent > 1).
	output chan decodeOutput

	// cancel remaining output.
	// Cancels the context of the asynchronous stream goroutine.
	cancel context.CancelFunc

	// crc of current frame
	crc *xxhash.Digest

	// flushed is true when there is no asynchronous output left to drain;
	// drainOutput returns early when it is set.
	flushed bool
}
  51. var (
  52. // Check the interfaces we want to support.
  53. _ = io.WriterTo(&Decoder{})
  54. _ = io.Reader(&Decoder{})
  55. )
  56. // NewReader creates a new decoder.
  57. // A nil Reader can be provided in which case Reset can be used to start a decode.
  58. //
  59. // A Decoder can be used in two modes:
  60. //
  61. // 1) As a stream, or
  62. // 2) For stateless decoding using DecodeAll.
  63. //
  64. // Only a single stream can be decoded concurrently, but the same decoder
  65. // can run multiple concurrent stateless decodes. It is even possible to
  66. // use stateless decodes while a stream is being decoded.
  67. //
  68. // The Reset function can be used to initiate a new stream, which will considerably
  69. // reduce the allocations normally caused by NewReader.
  70. func NewReader(r io.Reader, opts ...DOption) (*Decoder, error) {
  71. initPredefined()
  72. var d Decoder
  73. d.o.setDefault()
  74. for _, o := range opts {
  75. err := o(&d.o)
  76. if err != nil {
  77. return nil, err
  78. }
  79. }
  80. d.current.crc = xxhash.New()
  81. d.current.flushed = true
  82. if r == nil {
  83. d.current.err = ErrDecoderNilInput
  84. }
  85. // Transfer option dicts.
  86. d.dicts = make(map[uint32]*dict, len(d.o.dicts))
  87. for _, dc := range d.o.dicts {
  88. d.dicts[dc.id] = dc
  89. }
  90. d.o.dicts = nil
  91. // Create decoders
  92. d.decoders = make(chan *blockDec, d.o.concurrent)
  93. for i := 0; i < d.o.concurrent; i++ {
  94. dec := newBlockDec(d.o.lowMem)
  95. dec.localFrame = newFrameDec(d.o)
  96. d.decoders <- dec
  97. }
  98. if r == nil {
  99. return &d, nil
  100. }
  101. return &d, d.Reset(r)
  102. }
  103. // Read bytes from the decompressed stream into p.
  104. // Returns the number of bytes read and any error that occurred.
  105. // When the stream is done, io.EOF will be returned.
  106. func (d *Decoder) Read(p []byte) (int, error) {
  107. var n int
  108. for {
  109. if len(d.current.b) > 0 {
  110. filled := copy(p, d.current.b)
  111. p = p[filled:]
  112. d.current.b = d.current.b[filled:]
  113. n += filled
  114. }
  115. if len(p) == 0 {
  116. break
  117. }
  118. if len(d.current.b) == 0 {
  119. // We have an error and no more data
  120. if d.current.err != nil {
  121. break
  122. }
  123. if !d.nextBlock(n == 0) {
  124. return n, d.current.err
  125. }
  126. }
  127. }
  128. if len(d.current.b) > 0 {
  129. if debugDecoder {
  130. println("returning", n, "still bytes left:", len(d.current.b))
  131. }
  132. // Only return error at end of block
  133. return n, nil
  134. }
  135. if d.current.err != nil {
  136. d.drainOutput()
  137. }
  138. if debugDecoder {
  139. println("returning", n, d.current.err, len(d.decoders))
  140. }
  141. return n, d.current.err
  142. }
  143. // Reset will reset the decoder the supplied stream after the current has finished processing.
  144. // Note that this functionality cannot be used after Close has been called.
  145. // Reset can be called with a nil reader to release references to the previous reader.
  146. // After being called with a nil reader, no other operations than Reset or DecodeAll or Close
  147. // should be used.
  148. func (d *Decoder) Reset(r io.Reader) error {
  149. if d.current.err == ErrDecoderClosed {
  150. return d.current.err
  151. }
  152. d.drainOutput()
  153. d.syncStream.br.r = nil
  154. if r == nil {
  155. d.current.err = ErrDecoderNilInput
  156. if len(d.current.b) > 0 {
  157. d.current.b = d.current.b[:0]
  158. }
  159. d.current.flushed = true
  160. return nil
  161. }
  162. // If bytes buffer and < 5MB, do sync decoding anyway.
  163. if bb, ok := r.(byter); ok && bb.Len() < d.o.decodeBufsBelow && !d.o.limitToCap {
  164. bb2 := bb
  165. if debugDecoder {
  166. println("*bytes.Buffer detected, doing sync decode, len:", bb.Len())
  167. }
  168. b := bb2.Bytes()
  169. var dst []byte
  170. if cap(d.syncStream.dstBuf) > 0 {
  171. dst = d.syncStream.dstBuf[:0]
  172. }
  173. dst, err := d.DecodeAll(b, dst)
  174. if err == nil {
  175. err = io.EOF
  176. }
  177. // Save output buffer
  178. d.syncStream.dstBuf = dst
  179. d.current.b = dst
  180. d.current.err = err
  181. d.current.flushed = true
  182. if debugDecoder {
  183. println("sync decode to", len(dst), "bytes, err:", err)
  184. }
  185. return nil
  186. }
  187. // Remove current block.
  188. d.stashDecoder()
  189. d.current.decodeOutput = decodeOutput{}
  190. d.current.err = nil
  191. d.current.flushed = false
  192. d.current.d = nil
  193. d.syncStream.dstBuf = nil
  194. // Ensure no-one else is still running...
  195. d.streamWg.Wait()
  196. if d.frame == nil {
  197. d.frame = newFrameDec(d.o)
  198. }
  199. if d.o.concurrent == 1 {
  200. return d.startSyncDecoder(r)
  201. }
  202. d.current.output = make(chan decodeOutput, d.o.concurrent)
  203. ctx, cancel := context.WithCancel(context.Background())
  204. d.current.cancel = cancel
  205. d.streamWg.Add(1)
  206. go d.startStreamDecoder(ctx, r, d.current.output)
  207. return nil
  208. }
  209. // drainOutput will drain the output until errEndOfStream is sent.
  210. func (d *Decoder) drainOutput() {
  211. if d.current.cancel != nil {
  212. if debugDecoder {
  213. println("cancelling current")
  214. }
  215. d.current.cancel()
  216. d.current.cancel = nil
  217. }
  218. if d.current.d != nil {
  219. if debugDecoder {
  220. printf("re-adding current decoder %p, decoders: %d", d.current.d, len(d.decoders))
  221. }
  222. d.decoders <- d.current.d
  223. d.current.d = nil
  224. d.current.b = nil
  225. }
  226. if d.current.output == nil || d.current.flushed {
  227. println("current already flushed")
  228. return
  229. }
  230. for v := range d.current.output {
  231. if v.d != nil {
  232. if debugDecoder {
  233. printf("re-adding decoder %p", v.d)
  234. }
  235. d.decoders <- v.d
  236. }
  237. }
  238. d.current.output = nil
  239. d.current.flushed = true
  240. }
  241. // WriteTo writes data to w until there's no more data to write or when an error occurs.
  242. // The return value n is the number of bytes written.
  243. // Any error encountered during the write is also returned.
  244. func (d *Decoder) WriteTo(w io.Writer) (int64, error) {
  245. var n int64
  246. for {
  247. if len(d.current.b) > 0 {
  248. n2, err2 := w.Write(d.current.b)
  249. n += int64(n2)
  250. if err2 != nil && (d.current.err == nil || d.current.err == io.EOF) {
  251. d.current.err = err2
  252. } else if n2 != len(d.current.b) {
  253. d.current.err = io.ErrShortWrite
  254. }
  255. }
  256. if d.current.err != nil {
  257. break
  258. }
  259. d.nextBlock(true)
  260. }
  261. err := d.current.err
  262. if err != nil {
  263. d.drainOutput()
  264. }
  265. if err == io.EOF {
  266. err = nil
  267. }
  268. return n, err
  269. }
  270. // DecodeAll allows stateless decoding of a blob of bytes.
  271. // Output will be appended to dst, so if the destination size is known
  272. // you can pre-allocate the destination slice to avoid allocations.
  273. // DecodeAll can be used concurrently.
  274. // The Decoder concurrency limits will be respected.
  275. func (d *Decoder) DecodeAll(input, dst []byte) ([]byte, error) {
  276. if d.decoders == nil {
  277. return dst, ErrDecoderClosed
  278. }
  279. // Grab a block decoder and frame decoder.
  280. block := <-d.decoders
  281. frame := block.localFrame
  282. initialSize := len(dst)
  283. defer func() {
  284. if debugDecoder {
  285. printf("re-adding decoder: %p", block)
  286. }
  287. frame.rawInput = nil
  288. frame.bBuf = nil
  289. if frame.history.decoders.br != nil {
  290. frame.history.decoders.br.in = nil
  291. frame.history.decoders.br.cursor = 0
  292. }
  293. d.decoders <- block
  294. }()
  295. frame.bBuf = input
  296. for {
  297. frame.history.reset()
  298. err := frame.reset(&frame.bBuf)
  299. if err != nil {
  300. if err == io.EOF {
  301. if debugDecoder {
  302. println("frame reset return EOF")
  303. }
  304. return dst, nil
  305. }
  306. return dst, err
  307. }
  308. if err = d.setDict(frame); err != nil {
  309. return nil, err
  310. }
  311. if frame.WindowSize > d.o.maxWindowSize {
  312. if debugDecoder {
  313. println("window size exceeded:", frame.WindowSize, ">", d.o.maxWindowSize)
  314. }
  315. return dst, ErrWindowSizeExceeded
  316. }
  317. if frame.FrameContentSize != fcsUnknown {
  318. if frame.FrameContentSize > d.o.maxDecodedSize-uint64(len(dst)-initialSize) {
  319. if debugDecoder {
  320. println("decoder size exceeded; fcs:", frame.FrameContentSize, "> mcs:", d.o.maxDecodedSize-uint64(len(dst)-initialSize), "len:", len(dst))
  321. }
  322. return dst, ErrDecoderSizeExceeded
  323. }
  324. if d.o.limitToCap && frame.FrameContentSize > uint64(cap(dst)-len(dst)) {
  325. if debugDecoder {
  326. println("decoder size exceeded; fcs:", frame.FrameContentSize, "> (cap-len)", cap(dst)-len(dst))
  327. }
  328. return dst, ErrDecoderSizeExceeded
  329. }
  330. if cap(dst)-len(dst) < int(frame.FrameContentSize) {
  331. dst2 := make([]byte, len(dst), len(dst)+int(frame.FrameContentSize)+compressedBlockOverAlloc)
  332. copy(dst2, dst)
  333. dst = dst2
  334. }
  335. }
  336. if cap(dst) == 0 && !d.o.limitToCap {
  337. // Allocate len(input) * 2 by default if nothing is provided
  338. // and we didn't get frame content size.
  339. size := len(input) * 2
  340. // Cap to 1 MB.
  341. if size > 1<<20 {
  342. size = 1 << 20
  343. }
  344. if uint64(size) > d.o.maxDecodedSize {
  345. size = int(d.o.maxDecodedSize)
  346. }
  347. dst = make([]byte, 0, size)
  348. }
  349. dst, err = frame.runDecoder(dst, block)
  350. if err != nil {
  351. return dst, err
  352. }
  353. if uint64(len(dst)-initialSize) > d.o.maxDecodedSize {
  354. return dst, ErrDecoderSizeExceeded
  355. }
  356. if len(frame.bBuf) == 0 {
  357. if debugDecoder {
  358. println("frame dbuf empty")
  359. }
  360. break
  361. }
  362. }
  363. return dst, nil
  364. }
  365. // nextBlock returns the next block.
  366. // If an error occurs d.err will be set.
  367. // Optionally the function can block for new output.
  368. // If non-blocking mode is used the returned boolean will be false
  369. // if no data was available without blocking.
  370. func (d *Decoder) nextBlock(blocking bool) (ok bool) {
  371. if d.current.err != nil {
  372. // Keep error state.
  373. return false
  374. }
  375. d.current.b = d.current.b[:0]
  376. // SYNC:
  377. if d.syncStream.enabled {
  378. if !blocking {
  379. return false
  380. }
  381. ok = d.nextBlockSync()
  382. if !ok {
  383. d.stashDecoder()
  384. }
  385. return ok
  386. }
  387. //ASYNC:
  388. d.stashDecoder()
  389. if blocking {
  390. d.current.decodeOutput, ok = <-d.current.output
  391. } else {
  392. select {
  393. case d.current.decodeOutput, ok = <-d.current.output:
  394. default:
  395. return false
  396. }
  397. }
  398. if !ok {
  399. // This should not happen, so signal error state...
  400. d.current.err = io.ErrUnexpectedEOF
  401. return false
  402. }
  403. next := d.current.decodeOutput
  404. if next.d != nil && next.d.async.newHist != nil {
  405. d.current.crc.Reset()
  406. }
  407. if debugDecoder {
  408. var tmp [4]byte
  409. binary.LittleEndian.PutUint32(tmp[:], uint32(xxhash.Sum64(next.b)))
  410. println("got", len(d.current.b), "bytes, error:", d.current.err, "data crc:", tmp)
  411. }
  412. if d.o.ignoreChecksum {
  413. return true
  414. }
  415. if len(next.b) > 0 {
  416. d.current.crc.Write(next.b)
  417. }
  418. if next.err == nil && next.d != nil && next.d.hasCRC {
  419. got := uint32(d.current.crc.Sum64())
  420. if got != next.d.checkCRC {
  421. if debugDecoder {
  422. printf("CRC Check Failed: %08x (got) != %08x (on stream)\n", got, next.d.checkCRC)
  423. }
  424. d.current.err = ErrCRCMismatch
  425. } else {
  426. if debugDecoder {
  427. printf("CRC ok %08x\n", got)
  428. }
  429. }
  430. }
  431. return true
  432. }
// nextBlockSync decodes the next block of the stream on the calling
// goroutine (synchronous mode). It keeps decoding until a block yields
// output, which is left in d.current.b, and returns true.
// On any failure the error is stored in d.current.err and false is returned.
func (d *Decoder) nextBlockSync() (ok bool) {
	// The block decoder stays in d.current.d across calls; it is handed back
	// to the pool by stashDecoder or drainOutput.
	if d.current.d == nil {
		d.current.d = <-d.decoders
	}
	for len(d.current.b) == 0 {
		if !d.syncStream.inFrame {
			// Start of a new frame: read its header and apply its dictionary.
			d.frame.history.reset()
			d.current.err = d.frame.reset(&d.syncStream.br)
			if d.current.err == nil {
				d.current.err = d.setDict(d.frame)
			}
			if d.current.err != nil {
				return false
			}
			// Reject frames whose window exceeds either configured limit.
			if d.frame.WindowSize > d.o.maxDecodedSize || d.frame.WindowSize > d.o.maxWindowSize {
				d.current.err = ErrDecoderSizeExceeded
				return false
			}

			d.syncStream.decodedFrame = 0
			d.syncStream.inFrame = true
		}
		d.current.err = d.frame.next(d.current.d)
		if d.current.err != nil {
			return false
		}
		d.frame.history.ensureBlock()
		if debugDecoder {
			println("History trimmed:", len(d.frame.history.b), "decoded already:", d.syncStream.decodedFrame)
		}
		// The block is decoded into the frame history; the new output is
		// everything appended after histBefore.
		histBefore := len(d.frame.history.b)
		d.current.err = d.current.d.decodeBuf(&d.frame.history)

		if d.current.err != nil {
			println("error after:", d.current.err)
			return false
		}
		d.current.b = d.frame.history.b[histBefore:]
		if debugDecoder {
			println("history after:", len(d.frame.history.b))
		}

		// Check frame size (before CRC)
		// NOTE(review): relies on fcsUnknown being large enough (presumably the
		// maximum uint64) that unknown sizes never trip this check — confirm.
		d.syncStream.decodedFrame += uint64(len(d.current.b))
		if d.syncStream.decodedFrame > d.frame.FrameContentSize {
			if debugDecoder {
				printf("DecodedFrame (%d) > FrameContentSize (%d)\n", d.syncStream.decodedFrame, d.frame.FrameContentSize)
			}
			d.current.err = ErrFrameSizeExceeded
			return false
		}

		// Check FCS
		// On the last block of a frame with a known size, the total must match exactly.
		if d.current.d.Last && d.frame.FrameContentSize != fcsUnknown && d.syncStream.decodedFrame != d.frame.FrameContentSize {
			if debugDecoder {
				printf("DecodedFrame (%d) != FrameContentSize (%d)\n", d.syncStream.decodedFrame, d.frame.FrameContentSize)
			}
			d.current.err = ErrFrameSizeMismatch
			return false
		}

		// Update/Check CRC
		// When checksums are ignored, consumeCRC is called instead of
		// checkCRC, presumably to read past the checksum bytes so the input
		// stays aligned for the next frame.
		if d.frame.HasCheckSum {
			if !d.o.ignoreChecksum {
				d.frame.crc.Write(d.current.b)
			}
			if d.current.d.Last {
				if !d.o.ignoreChecksum {
					d.current.err = d.frame.checkCRC()
				} else {
					d.current.err = d.frame.consumeCRC()
				}
				if d.current.err != nil {
					println("CRC error:", d.current.err)
					return false
				}
			}
		}
		// After the last block, the next call starts a new frame.
		d.syncStream.inFrame = !d.current.d.Last
	}
	return true
}
  510. func (d *Decoder) stashDecoder() {
  511. if d.current.d != nil {
  512. if debugDecoder {
  513. printf("re-adding current decoder %p", d.current.d)
  514. }
  515. d.decoders <- d.current.d
  516. d.current.d = nil
  517. }
  518. }
  519. // Close will release all resources.
  520. // It is NOT possible to reuse the decoder after this.
  521. func (d *Decoder) Close() {
  522. if d.current.err == ErrDecoderClosed {
  523. return
  524. }
  525. d.drainOutput()
  526. if d.current.cancel != nil {
  527. d.current.cancel()
  528. d.streamWg.Wait()
  529. d.current.cancel = nil
  530. }
  531. if d.decoders != nil {
  532. close(d.decoders)
  533. for dec := range d.decoders {
  534. dec.Close()
  535. }
  536. d.decoders = nil
  537. }
  538. if d.current.d != nil {
  539. d.current.d.Close()
  540. d.current.d = nil
  541. }
  542. d.current.err = ErrDecoderClosed
  543. }
  544. // IOReadCloser returns the decoder as an io.ReadCloser for convenience.
  545. // Any changes to the decoder will be reflected, so the returned ReadCloser
  546. // can be reused along with the decoder.
  547. // io.WriterTo is also supported by the returned ReadCloser.
  548. func (d *Decoder) IOReadCloser() io.ReadCloser {
  549. return closeWrapper{d: d}
  550. }
  551. // closeWrapper wraps a function call as a closer.
  552. type closeWrapper struct {
  553. d *Decoder
  554. }
  555. // WriteTo forwards WriteTo calls to the decoder.
  556. func (c closeWrapper) WriteTo(w io.Writer) (n int64, err error) {
  557. return c.d.WriteTo(w)
  558. }
  559. // Read forwards read calls to the decoder.
  560. func (c closeWrapper) Read(p []byte) (n int, err error) {
  561. return c.d.Read(p)
  562. }
  563. // Close closes the decoder.
  564. func (c closeWrapper) Close() error {
  565. c.d.Close()
  566. return nil
  567. }
// decodeOutput is one unit of decoded output handed from the stream decoder
// to Read/WriteTo via nextBlock.
type decodeOutput struct {
	// d is the block decoder that produced b; stashDecoder/drainOutput
	// return it to the decoder pool.
	d *blockDec
	// b is the decoded data; it may alias buffers owned by d.
	b []byte
	// err is the error encountered while producing this block, if any.
	err error
}
  573. func (d *Decoder) startSyncDecoder(r io.Reader) error {
  574. d.frame.history.reset()
  575. d.syncStream.br = readerWrapper{r: r}
  576. d.syncStream.inFrame = false
  577. d.syncStream.enabled = true
  578. d.syncStream.decodedFrame = 0
  579. return nil
  580. }
// Create Decoder:
// ASYNC:
// Spawn 3 go routines.
// 0: Read frames and decode block literals.
// 1: Decode sequences.
// 2: Execute sequences, send to output.
//
// Stage 0 runs on the goroutine calling startStreamDecoder (started by
// Reset); stages 1 and 2 are spawned here. Blocks flow
// stage 0 -> seqDecode -> stage 1 -> seqExecute -> stage 2 -> output,
// and stage 2 closes output when done. Cancelling ctx stops stage 0 from
// taking further block decoders. d.streamWg.Done runs after stage 2 has
// finished and the frame history buffer has been handed back to d.frame.
func (d *Decoder) startStreamDecoder(ctx context.Context, r io.Reader, output chan decodeOutput) {
	defer d.streamWg.Done()
	br := readerWrapper{r: r}

	// Channels between the pipeline stages, each buffered to o.concurrent blocks.
	var seqDecode = make(chan *blockDec, d.o.concurrent)
	var seqExecute = make(chan *blockDec, d.o.concurrent)

	// Async 1: Decode sequences...
	go func() {
		var hist history
		var hasErr bool

		for block := range seqDecode {
			// After an error, blocks are only forwarded, so every block still
			// reaches the output (where its decoder is returned to the pool).
			if hasErr {
				if block != nil {
					seqExecute <- block
				}
				continue
			}
			// The first block of each frame carries a snapshot of the frame history.
			if block.async.newHist != nil {
				if debugDecoder {
					println("Async 1: new history, recent:", block.async.newHist.recentOffsets)
				}
				hist.reset()
				hist.decoders = block.async.newHist.decoders
				hist.recentOffsets = block.async.newHist.recentOffsets
				hist.windowSize = block.async.newHist.windowSize
				if block.async.newHist.dict != nil {
					hist.setDict(block.async.newHist.dict)
				}
			}
			// Only compressed blocks have sequences to decode.
			if block.err != nil || block.Type != blockTypeCompressed {
				hasErr = block.err != nil
				seqExecute <- block
				continue
			}

			hist.decoders.literals = block.async.literals
			block.err = block.prepareSequences(block.async.seqData, &hist)
			if debugDecoder && block.err != nil {
				println("prepareSequences returned:", block.err)
			}
			hasErr = block.err != nil
			if block.err == nil {
				block.err = block.decodeSequences(&hist)
				if debugDecoder && block.err != nil {
					println("decodeSequences returned:", block.err)
				}
				hasErr = block.err != nil
				//				block.async.sequence = hist.decoders.seq[:hist.decoders.nSeqs]
				block.async.seqSize = hist.decoders.seqSize
			}
			seqExecute <- block
		}
		close(seqExecute)
		hist.reset()
	}()

	var wg sync.WaitGroup
	wg.Add(1)

	// Async 3: Execute sequences...
	// frameHistCache lends d.frame's history buffer to this goroutine; the
	// buffer in use at the end is written back to d.frame after wg.Wait below.
	frameHistCache := d.frame.history.b
	go func() {
		var hist history
		var decodedFrame uint64
		var fcs uint64
		var hasErr bool
		for block := range seqExecute {
			out := decodeOutput{err: block.err, d: block}
			if block.err != nil || hasErr {
				hasErr = true
				output <- out
				continue
			}
			if block.async.newHist != nil {
				if debugDecoder {
					println("Async 2: new history")
				}
				hist.reset()
				hist.windowSize = block.async.newHist.windowSize
				hist.allocFrameBuffer = block.async.newHist.allocFrameBuffer
				if block.async.newHist.dict != nil {
					hist.setDict(block.async.newHist.dict)
				}

				// Reuse the cached buffer when it is large enough, else allocate.
				if cap(hist.b) < hist.allocFrameBuffer {
					if cap(frameHistCache) >= hist.allocFrameBuffer {
						hist.b = frameHistCache
					} else {
						hist.b = make([]byte, 0, hist.allocFrameBuffer)
						println("Alloc history sized", hist.allocFrameBuffer)
					}
				}
				hist.b = hist.b[:0]
				fcs = block.async.fcs
				decodedFrame = 0
			}
			do := decodeOutput{err: block.err, d: block}
			switch block.Type {
			case blockTypeRLE:
				if debugDecoder {
					println("add rle block length:", block.RLESize)
				}

				// In lowMem mode allocate exactly RLESize; otherwise allocate a
				// full maxCompressedBlockSize buffer so it can be reused.
				if cap(block.dst) < int(block.RLESize) {
					if block.lowMem {
						block.dst = make([]byte, block.RLESize)
					} else {
						block.dst = make([]byte, maxCompressedBlockSize)
					}
				}
				block.dst = block.dst[:block.RLESize]
				v := block.data[0]
				for i := range block.dst {
					block.dst[i] = v
				}
				hist.append(block.dst)
				do.b = block.dst
			case blockTypeRaw:
				if debugDecoder {
					println("add raw block length:", len(block.data))
				}
				hist.append(block.data)
				do.b = block.data
			case blockTypeCompressed:
				if debugDecoder {
					println("execute with history length:", len(hist.b), "window:", hist.windowSize)
				}
				hist.decoders.seqSize = block.async.seqSize
				hist.decoders.literals = block.async.literals
				do.err = block.executeSequences(&hist)
				hasErr = do.err != nil
				if debugDecoder && hasErr {
					println("executeSequences returned:", do.err)
				}
				do.b = block.dst
			}
			// Validate the running frame size against the frame content size.
			if !hasErr {
				decodedFrame += uint64(len(do.b))
				if decodedFrame > fcs {
					println("fcs exceeded", block.Last, fcs, decodedFrame)
					do.err = ErrFrameSizeExceeded
					hasErr = true
				} else if block.Last && fcs != fcsUnknown && decodedFrame != fcs {
					do.err = ErrFrameSizeMismatch
					hasErr = true
				} else {
					if debugDecoder {
						println("fcs ok", block.Last, fcs, decodedFrame)
					}
				}
			}
			output <- do
		}
		close(output)
		frameHistCache = hist.b
		wg.Done()
		if debugDecoder {
			println("decoder goroutines finished")
		}
		hist.reset()
	}()

	// Stage 0 (this goroutine): read frames and blocks and decode literals.
	var hist history
decodeStream:
	for {
		var hasErr bool
		hist.reset()
		// decodeBlock decodes the literals of a compressed block and passes
		// the block on to stage 1. Once an error is seen, blocks are forwarded untouched.
		decodeBlock := func(block *blockDec) {
			if hasErr {
				if block != nil {
					seqDecode <- block
				}
				return
			}
			if block.err != nil || block.Type != blockTypeCompressed {
				hasErr = block.err != nil
				seqDecode <- block
				return
			}

			remain, err := block.decodeLiterals(block.data, &hist)
			block.err = err
			hasErr = block.err != nil
			if err == nil {
				block.async.literals = hist.decoders.literals
				block.async.seqData = remain
			} else if debugDecoder {
				println("decodeLiterals error:", err)
			}
			seqDecode <- block
		}
		frame := d.frame
		if debugDecoder {
			println("New frame...")
		}
		var historySent bool
		frame.history.reset()
		err := frame.reset(&br)
		if debugDecoder && err != nil {
			println("Frame decoder returned", err)
		}
		if err == nil {
			err = d.setDict(frame)
		}
		if err == nil && d.frame.WindowSize > d.o.maxWindowSize {
			if debugDecoder {
				println("decoder size exceeded, fws:", d.frame.WindowSize, "> mws:", d.o.maxWindowSize)
			}

			err = ErrDecoderSizeExceeded
		}
		if err != nil {
			// Deliver the error through the pipeline on a block decoder so
			// the reader sees it, unless the stream was cancelled.
			select {
			case <-ctx.Done():
			case dec := <-d.decoders:
				dec.sendErr(err)
				decodeBlock(dec)
			}
			break decodeStream
		}

		// Go through all blocks of the frame.
		for {
			var dec *blockDec
			select {
			case <-ctx.Done():
				break decodeStream
			case dec = <-d.decoders:
				// Once we have a decoder, we MUST return it.
			}
			err := frame.next(dec)
			// The first block of each frame carries a copy of the frame
			// history struct (h; its slices still share backing arrays) and
			// the frame content size to the later stages.
			if !historySent {
				h := frame.history
				if debugDecoder {
					println("Alloc History:", h.allocFrameBuffer)
				}
				hist.reset()
				if h.dict != nil {
					hist.setDict(h.dict)
				}
				dec.async.newHist = &h
				dec.async.fcs = frame.FrameContentSize
				historySent = true
			} else {
				dec.async.newHist = nil
			}
			if debugDecoder && err != nil {
				println("next block returned error:", err)
			}
			dec.err = err
			dec.hasCRC = false
			// After the last block of a checksummed frame, read the 4-byte
			// checksum; nextBlock verifies it against the running CRC.
			if dec.Last && frame.HasCheckSum && err == nil {
				crc, err := frame.rawInput.readSmall(4)
				if len(crc) < 4 {
					if err == nil {
						err = io.ErrUnexpectedEOF

					}
					println("CRC missing?", err)
					dec.err = err
				} else {
					dec.checkCRC = binary.LittleEndian.Uint32(crc)
					dec.hasCRC = true
					if debugDecoder {
						printf("found crc to check: %08x\n", dec.checkCRC)
					}
				}
			}
			// Read err and Last before decodeBlock: it hands dec to another goroutine.
			err = dec.err
			last := dec.Last
			decodeBlock(dec)
			if err != nil {
				break decodeStream
			}
			if last {
				break
			}
		}
	}
	close(seqDecode)
	wg.Wait()
	hist.reset()
	d.frame.history.b = frameHistCache
}
  860. func (d *Decoder) setDict(frame *frameDec) (err error) {
  861. dict, ok := d.dicts[frame.DictionaryID]
  862. if ok {
  863. if debugDecoder {
  864. println("setting dict", frame.DictionaryID)
  865. }
  866. frame.history.setDict(dict)
  867. } else if frame.DictionaryID != 0 {
  868. // A zero or missing dictionary id is ambiguous:
  869. // either dictionary zero, or no dictionary. In particular,
  870. // zstd --patch-from uses this id for the source file,
  871. // so only return an error if the dictionary id is not zero.
  872. err = ErrUnknownDictionary
  873. }
  874. return err
  875. }