reader.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075
  1. // Copyright 2011 The Snappy-Go Authors. All rights reserved.
  2. // Copyright (c) 2019+ Klaus Post. All rights reserved.
  3. // Use of this source code is governed by a BSD-style
  4. // license that can be found in the LICENSE file.
  5. package s2
  6. import (
  7. "errors"
  8. "fmt"
  9. "io"
  10. "io/ioutil"
  11. "math"
  12. "runtime"
  13. "sync"
  14. )
  15. // ErrCantSeek is returned if the stream cannot be seeked.
  16. type ErrCantSeek struct {
  17. Reason string
  18. }
  19. // Error returns the error as string.
  20. func (e ErrCantSeek) Error() string {
  21. return fmt.Sprintf("s2: Can't seek because %s", e.Reason)
  22. }
  23. // NewReader returns a new Reader that decompresses from r, using the framing
  24. // format described at
  25. // https://github.com/google/snappy/blob/master/framing_format.txt with S2 changes.
  26. func NewReader(r io.Reader, opts ...ReaderOption) *Reader {
  27. nr := Reader{
  28. r: r,
  29. maxBlock: maxBlockSize,
  30. }
  31. for _, opt := range opts {
  32. if err := opt(&nr); err != nil {
  33. nr.err = err
  34. return &nr
  35. }
  36. }
  37. nr.maxBufSize = MaxEncodedLen(nr.maxBlock) + checksumSize
  38. if nr.lazyBuf > 0 {
  39. nr.buf = make([]byte, MaxEncodedLen(nr.lazyBuf)+checksumSize)
  40. } else {
  41. nr.buf = make([]byte, MaxEncodedLen(defaultBlockSize)+checksumSize)
  42. }
  43. nr.readHeader = nr.ignoreStreamID
  44. nr.paramsOK = true
  45. return &nr
  46. }
  47. // ReaderOption is an option for creating a decoder.
  48. type ReaderOption func(*Reader) error
  49. // ReaderMaxBlockSize allows to control allocations if the stream
  50. // has been compressed with a smaller WriterBlockSize, or with the default 1MB.
  51. // Blocks must be this size or smaller to decompress,
  52. // otherwise the decoder will return ErrUnsupported.
  53. //
  54. // For streams compressed with Snappy this can safely be set to 64KB (64 << 10).
  55. //
  56. // Default is the maximum limit of 4MB.
  57. func ReaderMaxBlockSize(blockSize int) ReaderOption {
  58. return func(r *Reader) error {
  59. if blockSize > maxBlockSize || blockSize <= 0 {
  60. return errors.New("s2: block size too large. Must be <= 4MB and > 0")
  61. }
  62. if r.lazyBuf == 0 && blockSize < defaultBlockSize {
  63. r.lazyBuf = blockSize
  64. }
  65. r.maxBlock = blockSize
  66. return nil
  67. }
  68. }
  69. // ReaderAllocBlock allows to control upfront stream allocations
  70. // and not allocate for frames bigger than this initially.
  71. // If frames bigger than this is seen a bigger buffer will be allocated.
  72. //
  73. // Default is 1MB, which is default output size.
  74. func ReaderAllocBlock(blockSize int) ReaderOption {
  75. return func(r *Reader) error {
  76. if blockSize > maxBlockSize || blockSize < 1024 {
  77. return errors.New("s2: invalid ReaderAllocBlock. Must be <= 4MB and >= 1024")
  78. }
  79. r.lazyBuf = blockSize
  80. return nil
  81. }
  82. }
  83. // ReaderIgnoreStreamIdentifier will make the reader skip the expected
  84. // stream identifier at the beginning of the stream.
  85. // This can be used when serving a stream that has been forwarded to a specific point.
  86. func ReaderIgnoreStreamIdentifier() ReaderOption {
  87. return func(r *Reader) error {
  88. r.ignoreStreamID = true
  89. return nil
  90. }
  91. }
  92. // ReaderSkippableCB will register a callback for chuncks with the specified ID.
  93. // ID must be a Reserved skippable chunks ID, 0x80-0xfd (inclusive).
  94. // For each chunk with the ID, the callback is called with the content.
  95. // Any returned non-nil error will abort decompression.
  96. // Only one callback per ID is supported, latest sent will be used.
  97. // You can peek the stream, triggering the callback, by doing a Read with a 0
  98. // byte buffer.
  99. func ReaderSkippableCB(id uint8, fn func(r io.Reader) error) ReaderOption {
  100. return func(r *Reader) error {
  101. if id < 0x80 || id > 0xfd {
  102. return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfd (inclusive)")
  103. }
  104. r.skippableCB[id-0x80] = fn
  105. return nil
  106. }
  107. }
  108. // ReaderIgnoreCRC will make the reader skip CRC calculation and checks.
  109. func ReaderIgnoreCRC() ReaderOption {
  110. return func(r *Reader) error {
  111. r.ignoreCRC = true
  112. return nil
  113. }
  114. }
  115. // Reader is an io.Reader that can read Snappy-compressed bytes.
  116. type Reader struct {
  117. r io.Reader
  118. err error
  119. decoded []byte
  120. buf []byte
  121. skippableCB [0xff - 0x80]func(r io.Reader) error
  122. blockStart int64 // Uncompressed offset at start of current.
  123. index *Index
  124. // decoded[i:j] contains decoded bytes that have not yet been passed on.
  125. i, j int
  126. // maximum block size allowed.
  127. maxBlock int
  128. // maximum expected buffer size.
  129. maxBufSize int
  130. // alloc a buffer this size if > 0.
  131. lazyBuf int
  132. readHeader bool
  133. paramsOK bool
  134. snappyFrame bool
  135. ignoreStreamID bool
  136. ignoreCRC bool
  137. }
  138. // GetBufferCapacity returns the capacity of the internal buffer.
  139. // This might be useful to know when reusing the same reader in combination
  140. // with the lazy buffer option.
  141. func (r *Reader) GetBufferCapacity() int {
  142. return cap(r.buf)
  143. }
  144. // ensureBufferSize will ensure that the buffer can take at least n bytes.
  145. // If false is returned the buffer exceeds maximum allowed size.
  146. func (r *Reader) ensureBufferSize(n int) bool {
  147. if n > r.maxBufSize {
  148. r.err = ErrCorrupt
  149. return false
  150. }
  151. if cap(r.buf) >= n {
  152. return true
  153. }
  154. // Realloc buffer.
  155. r.buf = make([]byte, n)
  156. return true
  157. }
  158. // Reset discards any buffered data, resets all state, and switches the Snappy
  159. // reader to read from r. This permits reusing a Reader rather than allocating
  160. // a new one.
  161. func (r *Reader) Reset(reader io.Reader) {
  162. if !r.paramsOK {
  163. return
  164. }
  165. r.index = nil
  166. r.r = reader
  167. r.err = nil
  168. r.i = 0
  169. r.j = 0
  170. r.blockStart = 0
  171. r.readHeader = r.ignoreStreamID
  172. }
  173. func (r *Reader) readFull(p []byte, allowEOF bool) (ok bool) {
  174. if _, r.err = io.ReadFull(r.r, p); r.err != nil {
  175. if r.err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) {
  176. r.err = ErrCorrupt
  177. }
  178. return false
  179. }
  180. return true
  181. }
  182. // skippable will skip n bytes.
  183. // If the supplied reader supports seeking that is used.
  184. // tmp is used as a temporary buffer for reading.
  185. // The supplied slice does not need to be the size of the read.
  186. func (r *Reader) skippable(tmp []byte, n int, allowEOF bool, id uint8) (ok bool) {
  187. if id < 0x80 {
  188. r.err = fmt.Errorf("internal error: skippable id < 0x80")
  189. return false
  190. }
  191. if fn := r.skippableCB[id-0x80]; fn != nil {
  192. rd := io.LimitReader(r.r, int64(n))
  193. r.err = fn(rd)
  194. if r.err != nil {
  195. return false
  196. }
  197. _, r.err = io.CopyBuffer(ioutil.Discard, rd, tmp)
  198. return r.err == nil
  199. }
  200. if rs, ok := r.r.(io.ReadSeeker); ok {
  201. _, err := rs.Seek(int64(n), io.SeekCurrent)
  202. if err == nil {
  203. return true
  204. }
  205. if err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) {
  206. r.err = ErrCorrupt
  207. return false
  208. }
  209. }
  210. for n > 0 {
  211. if n < len(tmp) {
  212. tmp = tmp[:n]
  213. }
  214. if _, r.err = io.ReadFull(r.r, tmp); r.err != nil {
  215. if r.err == io.ErrUnexpectedEOF || (r.err == io.EOF && !allowEOF) {
  216. r.err = ErrCorrupt
  217. }
  218. return false
  219. }
  220. n -= len(tmp)
  221. }
  222. return true
  223. }
// Read satisfies the io.Reader interface.
//
// Output is served from the current decoded block (r.decoded[r.i:r.j]). When
// that is exhausted, chunks are read from the underlying reader until one
// yields data. Compressed and uncompressed data chunks are size-checked and,
// unless ReaderIgnoreCRC was given, CRC-checked before becoming the new current
// block. Stream identifier, padding and skippable chunks are consumed without
// producing output. Because of this, a Read with an empty p can still consume
// input and trigger skippable-chunk callbacks.
//
// Every error is stored in r.err, so once Read fails all later calls return
// the same error. An end of input exactly at a chunk boundary is reported as
// io.EOF.
func (r *Reader) Read(p []byte) (int, error) {
	if r.err != nil {
		return 0, r.err
	}
	for {
		// Hand out already decoded bytes before touching the input.
		if r.i < r.j {
			n := copy(p, r.decoded[r.i:r.j])
			r.i += n
			return n, nil
		}
		// Chunk header: 1 byte chunk type followed by a 3-byte little-endian
		// length. EOF is allowed here; it is the normal end of the stream.
		if !r.readFull(r.buf[:4], true) {
			return 0, r.err
		}
		chunkType := r.buf[0]
		if !r.readHeader {
			// The first chunk must be the stream identifier, unless
			// ReaderIgnoreStreamIdentifier pre-set readHeader.
			if chunkType != chunkTypeStreamIdentifier {
				r.err = ErrCorrupt
				return 0, r.err
			}
			r.readHeader = true
		}
		chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16

		// The chunk types are specified at
		// https://github.com/google/snappy/blob/master/framing_format.txt
		switch chunkType {
		case chunkTypeCompressedData:
			// The previous block has been fully consumed; advance the
			// uncompressed offset to the start of the block about to be decoded.
			r.blockStart += int64(r.j)
			// Section 4.2. Compressed data (chunk type 0x00).
			if chunkLen < checksumSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if !r.ensureBufferSize(chunkLen) {
				if r.err == nil {
					r.err = ErrUnsupported
				}
				return 0, r.err
			}
			buf := r.buf[:chunkLen]
			if !r.readFull(buf, false) {
				return 0, r.err
			}
			// The chunk body starts with a little-endian checksum that is
			// compared against crc() of the decoded data below.
			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
			buf = buf[checksumSize:]

			n, err := DecodedLen(buf)
			if err != nil {
				r.err = err
				return 0, r.err
			}
			// Streams identified as Snappy may not exceed maxSnappyBlockSize.
			if r.snappyFrame && n > maxSnappyBlockSize {
				r.err = ErrCorrupt
				return 0, r.err
			}

			// Grow the output buffer only when needed, rejecting blocks
			// larger than maxBlock.
			if n > len(r.decoded) {
				if n > r.maxBlock {
					r.err = ErrCorrupt
					return 0, r.err
				}
				r.decoded = make([]byte, n)
			}
			if _, err := Decode(r.decoded, buf); err != nil {
				r.err = err
				return 0, r.err
			}
			if !r.ignoreCRC && crc(r.decoded[:n]) != checksum {
				r.err = ErrCRC
				return 0, r.err
			}
			r.i, r.j = 0, n
			continue
		case chunkTypeUncompressedData:
			r.blockStart += int64(r.j)
			// Section 4.3. Uncompressed data (chunk type 0x01).
			if chunkLen < checksumSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if !r.ensureBufferSize(chunkLen) {
				if r.err == nil {
					r.err = ErrUnsupported
				}
				return 0, r.err
			}
			buf := r.buf[:checksumSize]
			if !r.readFull(buf, false) {
				return 0, r.err
			}
			checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
			// Read directly into r.decoded instead of via r.buf.
			n := chunkLen - checksumSize
			if r.snappyFrame && n > maxSnappyBlockSize {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if n > len(r.decoded) {
				if n > r.maxBlock {
					r.err = ErrCorrupt
					return 0, r.err
				}
				r.decoded = make([]byte, n)
			}
			if !r.readFull(r.decoded[:n], false) {
				return 0, r.err
			}
			if !r.ignoreCRC && crc(r.decoded[:n]) != checksum {
				r.err = ErrCRC
				return 0, r.err
			}
			r.i, r.j = 0, n
			continue
		case chunkTypeStreamIdentifier:
			// Section 4.1. Stream identifier (chunk type 0xff).
			if chunkLen != len(magicBody) {
				r.err = ErrCorrupt
				return 0, r.err
			}
			if !r.readFull(r.buf[:len(magicBody)], false) {
				return 0, r.err
			}
			// Accept either magicBody or magicBodySnappy, and remember which
			// one was seen so the Snappy block size limit can be enforced.
			if string(r.buf[:len(magicBody)]) != magicBody {
				if string(r.buf[:len(magicBody)]) != magicBodySnappy {
					r.err = ErrCorrupt
					return 0, r.err
				} else {
					r.snappyFrame = true
				}
			} else {
				r.snappyFrame = false
			}
			continue
		}

		if chunkType <= 0x7f {
			// Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
			// fmt.Printf("ERR chunktype: 0x%x\n", chunkType)
			r.err = ErrUnsupported
			return 0, r.err
		}
		// Section 4.4 Padding (chunk type 0xfe).
		// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
		if chunkLen > maxChunkSize {
			// fmt.Printf("ERR chunkLen: 0x%x\n", chunkLen)
			r.err = ErrUnsupported
			return 0, r.err
		}

		// fmt.Printf("skippable: ID: 0x%x, len: 0x%x\n", chunkType, chunkLen)
		// Consume the chunk, invoking a registered callback for its ID if any.
		if !r.skippable(r.buf, chunkLen, false, chunkType) {
			return 0, r.err
		}
	}
}
  375. // DecodeConcurrent will decode the full stream to w.
  376. // This function should not be combined with reading, seeking or other operations.
  377. // Up to 'concurrent' goroutines will be used.
  378. // If <= 0, runtime.NumCPU will be used.
  379. // On success the number of bytes decompressed nil and is returned.
  380. // This is mainly intended for bigger streams.
  381. func (r *Reader) DecodeConcurrent(w io.Writer, concurrent int) (written int64, err error) {
  382. if r.i > 0 || r.j > 0 || r.blockStart > 0 {
  383. return 0, errors.New("DecodeConcurrent called after ")
  384. }
  385. if concurrent <= 0 {
  386. concurrent = runtime.NumCPU()
  387. }
  388. // Write to output
  389. var errMu sync.Mutex
  390. var aErr error
  391. setErr := func(e error) (ok bool) {
  392. errMu.Lock()
  393. defer errMu.Unlock()
  394. if e == nil {
  395. return aErr == nil
  396. }
  397. if aErr == nil {
  398. aErr = e
  399. }
  400. return false
  401. }
  402. hasErr := func() (ok bool) {
  403. errMu.Lock()
  404. v := aErr != nil
  405. errMu.Unlock()
  406. return v
  407. }
  408. var aWritten int64
  409. toRead := make(chan []byte, concurrent)
  410. writtenBlocks := make(chan []byte, concurrent)
  411. queue := make(chan chan []byte, concurrent)
  412. reUse := make(chan chan []byte, concurrent)
  413. for i := 0; i < concurrent; i++ {
  414. toRead <- make([]byte, 0, r.maxBufSize)
  415. writtenBlocks <- make([]byte, 0, r.maxBufSize)
  416. reUse <- make(chan []byte, 1)
  417. }
  418. // Writer
  419. var wg sync.WaitGroup
  420. wg.Add(1)
  421. go func() {
  422. defer wg.Done()
  423. for toWrite := range queue {
  424. entry := <-toWrite
  425. reUse <- toWrite
  426. if hasErr() || entry == nil {
  427. if entry != nil {
  428. writtenBlocks <- entry
  429. }
  430. continue
  431. }
  432. if hasErr() {
  433. writtenBlocks <- entry
  434. continue
  435. }
  436. n, err := w.Write(entry)
  437. want := len(entry)
  438. writtenBlocks <- entry
  439. if err != nil {
  440. setErr(err)
  441. continue
  442. }
  443. if n != want {
  444. setErr(io.ErrShortWrite)
  445. continue
  446. }
  447. aWritten += int64(n)
  448. }
  449. }()
  450. defer func() {
  451. if r.err != nil {
  452. setErr(r.err)
  453. } else if err != nil {
  454. setErr(err)
  455. }
  456. close(queue)
  457. wg.Wait()
  458. if err == nil {
  459. err = aErr
  460. }
  461. written = aWritten
  462. }()
  463. // Reader
  464. for !hasErr() {
  465. if !r.readFull(r.buf[:4], true) {
  466. if r.err == io.EOF {
  467. r.err = nil
  468. }
  469. return 0, r.err
  470. }
  471. chunkType := r.buf[0]
  472. if !r.readHeader {
  473. if chunkType != chunkTypeStreamIdentifier {
  474. r.err = ErrCorrupt
  475. return 0, r.err
  476. }
  477. r.readHeader = true
  478. }
  479. chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16
  480. // The chunk types are specified at
  481. // https://github.com/google/snappy/blob/master/framing_format.txt
  482. switch chunkType {
  483. case chunkTypeCompressedData:
  484. r.blockStart += int64(r.j)
  485. // Section 4.2. Compressed data (chunk type 0x00).
  486. if chunkLen < checksumSize {
  487. r.err = ErrCorrupt
  488. return 0, r.err
  489. }
  490. if chunkLen > r.maxBufSize {
  491. r.err = ErrCorrupt
  492. return 0, r.err
  493. }
  494. orgBuf := <-toRead
  495. buf := orgBuf[:chunkLen]
  496. if !r.readFull(buf, false) {
  497. return 0, r.err
  498. }
  499. checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
  500. buf = buf[checksumSize:]
  501. n, err := DecodedLen(buf)
  502. if err != nil {
  503. r.err = err
  504. return 0, r.err
  505. }
  506. if r.snappyFrame && n > maxSnappyBlockSize {
  507. r.err = ErrCorrupt
  508. return 0, r.err
  509. }
  510. if n > r.maxBlock {
  511. r.err = ErrCorrupt
  512. return 0, r.err
  513. }
  514. wg.Add(1)
  515. decoded := <-writtenBlocks
  516. entry := <-reUse
  517. queue <- entry
  518. go func() {
  519. defer wg.Done()
  520. decoded = decoded[:n]
  521. _, err := Decode(decoded, buf)
  522. toRead <- orgBuf
  523. if err != nil {
  524. writtenBlocks <- decoded
  525. setErr(err)
  526. entry <- nil
  527. return
  528. }
  529. if !r.ignoreCRC && crc(decoded) != checksum {
  530. writtenBlocks <- decoded
  531. setErr(ErrCRC)
  532. entry <- nil
  533. return
  534. }
  535. entry <- decoded
  536. }()
  537. continue
  538. case chunkTypeUncompressedData:
  539. // Section 4.3. Uncompressed data (chunk type 0x01).
  540. if chunkLen < checksumSize {
  541. r.err = ErrCorrupt
  542. return 0, r.err
  543. }
  544. if chunkLen > r.maxBufSize {
  545. r.err = ErrCorrupt
  546. return 0, r.err
  547. }
  548. // Grab write buffer
  549. orgBuf := <-writtenBlocks
  550. buf := orgBuf[:checksumSize]
  551. if !r.readFull(buf, false) {
  552. return 0, r.err
  553. }
  554. checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
  555. // Read content.
  556. n := chunkLen - checksumSize
  557. if r.snappyFrame && n > maxSnappyBlockSize {
  558. r.err = ErrCorrupt
  559. return 0, r.err
  560. }
  561. if n > r.maxBlock {
  562. r.err = ErrCorrupt
  563. return 0, r.err
  564. }
  565. // Read uncompressed
  566. buf = orgBuf[:n]
  567. if !r.readFull(buf, false) {
  568. return 0, r.err
  569. }
  570. if !r.ignoreCRC && crc(buf) != checksum {
  571. r.err = ErrCRC
  572. return 0, r.err
  573. }
  574. entry := <-reUse
  575. queue <- entry
  576. entry <- buf
  577. continue
  578. case chunkTypeStreamIdentifier:
  579. // Section 4.1. Stream identifier (chunk type 0xff).
  580. if chunkLen != len(magicBody) {
  581. r.err = ErrCorrupt
  582. return 0, r.err
  583. }
  584. if !r.readFull(r.buf[:len(magicBody)], false) {
  585. return 0, r.err
  586. }
  587. if string(r.buf[:len(magicBody)]) != magicBody {
  588. if string(r.buf[:len(magicBody)]) != magicBodySnappy {
  589. r.err = ErrCorrupt
  590. return 0, r.err
  591. } else {
  592. r.snappyFrame = true
  593. }
  594. } else {
  595. r.snappyFrame = false
  596. }
  597. continue
  598. }
  599. if chunkType <= 0x7f {
  600. // Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
  601. // fmt.Printf("ERR chunktype: 0x%x\n", chunkType)
  602. r.err = ErrUnsupported
  603. return 0, r.err
  604. }
  605. // Section 4.4 Padding (chunk type 0xfe).
  606. // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
  607. if chunkLen > maxChunkSize {
  608. // fmt.Printf("ERR chunkLen: 0x%x\n", chunkLen)
  609. r.err = ErrUnsupported
  610. return 0, r.err
  611. }
  612. // fmt.Printf("skippable: ID: 0x%x, len: 0x%x\n", chunkType, chunkLen)
  613. if !r.skippable(r.buf, chunkLen, false, chunkType) {
  614. return 0, r.err
  615. }
  616. }
  617. return 0, r.err
  618. }
  619. // Skip will skip n bytes forward in the decompressed output.
  620. // For larger skips this consumes less CPU and is faster than reading output and discarding it.
  621. // CRC is not checked on skipped blocks.
  622. // io.ErrUnexpectedEOF is returned if the stream ends before all bytes have been skipped.
  623. // If a decoding error is encountered subsequent calls to Read will also fail.
  624. func (r *Reader) Skip(n int64) error {
  625. if n < 0 {
  626. return errors.New("attempted negative skip")
  627. }
  628. if r.err != nil {
  629. return r.err
  630. }
  631. for n > 0 {
  632. if r.i < r.j {
  633. // Skip in buffer.
  634. // decoded[i:j] contains decoded bytes that have not yet been passed on.
  635. left := int64(r.j - r.i)
  636. if left >= n {
  637. tmp := int64(r.i) + n
  638. if tmp > math.MaxInt32 {
  639. return errors.New("s2: internal overflow in skip")
  640. }
  641. r.i = int(tmp)
  642. return nil
  643. }
  644. n -= int64(r.j - r.i)
  645. r.i = r.j
  646. }
  647. // Buffer empty; read blocks until we have content.
  648. if !r.readFull(r.buf[:4], true) {
  649. if r.err == io.EOF {
  650. r.err = io.ErrUnexpectedEOF
  651. }
  652. return r.err
  653. }
  654. chunkType := r.buf[0]
  655. if !r.readHeader {
  656. if chunkType != chunkTypeStreamIdentifier {
  657. r.err = ErrCorrupt
  658. return r.err
  659. }
  660. r.readHeader = true
  661. }
  662. chunkLen := int(r.buf[1]) | int(r.buf[2])<<8 | int(r.buf[3])<<16
  663. // The chunk types are specified at
  664. // https://github.com/google/snappy/blob/master/framing_format.txt
  665. switch chunkType {
  666. case chunkTypeCompressedData:
  667. r.blockStart += int64(r.j)
  668. // Section 4.2. Compressed data (chunk type 0x00).
  669. if chunkLen < checksumSize {
  670. r.err = ErrCorrupt
  671. return r.err
  672. }
  673. if !r.ensureBufferSize(chunkLen) {
  674. if r.err == nil {
  675. r.err = ErrUnsupported
  676. }
  677. return r.err
  678. }
  679. buf := r.buf[:chunkLen]
  680. if !r.readFull(buf, false) {
  681. return r.err
  682. }
  683. checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
  684. buf = buf[checksumSize:]
  685. dLen, err := DecodedLen(buf)
  686. if err != nil {
  687. r.err = err
  688. return r.err
  689. }
  690. if dLen > r.maxBlock {
  691. r.err = ErrCorrupt
  692. return r.err
  693. }
  694. // Check if destination is within this block
  695. if int64(dLen) > n {
  696. if len(r.decoded) < dLen {
  697. r.decoded = make([]byte, dLen)
  698. }
  699. if _, err := Decode(r.decoded, buf); err != nil {
  700. r.err = err
  701. return r.err
  702. }
  703. if crc(r.decoded[:dLen]) != checksum {
  704. r.err = ErrCorrupt
  705. return r.err
  706. }
  707. } else {
  708. // Skip block completely
  709. n -= int64(dLen)
  710. r.blockStart += int64(dLen)
  711. dLen = 0
  712. }
  713. r.i, r.j = 0, dLen
  714. continue
  715. case chunkTypeUncompressedData:
  716. r.blockStart += int64(r.j)
  717. // Section 4.3. Uncompressed data (chunk type 0x01).
  718. if chunkLen < checksumSize {
  719. r.err = ErrCorrupt
  720. return r.err
  721. }
  722. if !r.ensureBufferSize(chunkLen) {
  723. if r.err != nil {
  724. r.err = ErrUnsupported
  725. }
  726. return r.err
  727. }
  728. buf := r.buf[:checksumSize]
  729. if !r.readFull(buf, false) {
  730. return r.err
  731. }
  732. checksum := uint32(buf[0]) | uint32(buf[1])<<8 | uint32(buf[2])<<16 | uint32(buf[3])<<24
  733. // Read directly into r.decoded instead of via r.buf.
  734. n2 := chunkLen - checksumSize
  735. if n2 > len(r.decoded) {
  736. if n2 > r.maxBlock {
  737. r.err = ErrCorrupt
  738. return r.err
  739. }
  740. r.decoded = make([]byte, n2)
  741. }
  742. if !r.readFull(r.decoded[:n2], false) {
  743. return r.err
  744. }
  745. if int64(n2) < n {
  746. if crc(r.decoded[:n2]) != checksum {
  747. r.err = ErrCorrupt
  748. return r.err
  749. }
  750. }
  751. r.i, r.j = 0, n2
  752. continue
  753. case chunkTypeStreamIdentifier:
  754. // Section 4.1. Stream identifier (chunk type 0xff).
  755. if chunkLen != len(magicBody) {
  756. r.err = ErrCorrupt
  757. return r.err
  758. }
  759. if !r.readFull(r.buf[:len(magicBody)], false) {
  760. return r.err
  761. }
  762. if string(r.buf[:len(magicBody)]) != magicBody {
  763. if string(r.buf[:len(magicBody)]) != magicBodySnappy {
  764. r.err = ErrCorrupt
  765. return r.err
  766. }
  767. }
  768. continue
  769. }
  770. if chunkType <= 0x7f {
  771. // Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
  772. r.err = ErrUnsupported
  773. return r.err
  774. }
  775. if chunkLen > maxChunkSize {
  776. r.err = ErrUnsupported
  777. return r.err
  778. }
  779. // Section 4.4 Padding (chunk type 0xfe).
  780. // Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
  781. if !r.skippable(r.buf, chunkLen, false, chunkType) {
  782. return r.err
  783. }
  784. }
  785. return nil
  786. }
  787. // ReadSeeker provides random or forward seeking in compressed content.
  788. // See Reader.ReadSeeker
  789. type ReadSeeker struct {
  790. *Reader
  791. readAtMu sync.Mutex
  792. }
  793. // ReadSeeker will return an io.ReadSeeker and io.ReaderAt
  794. // compatible version of the reader.
  795. // If 'random' is specified the returned io.Seeker can be used for
  796. // random seeking, otherwise only forward seeking is supported.
  797. // Enabling random seeking requires the original input to support
  798. // the io.Seeker interface.
  799. // A custom index can be specified which will be used if supplied.
  800. // When using a custom index, it will not be read from the input stream.
  801. // The ReadAt position will affect regular reads and the current position of Seek.
  802. // So using Read after ReadAt will continue from where the ReadAt stopped.
  803. // No functions should be used concurrently.
  804. // The returned ReadSeeker contains a shallow reference to the existing Reader,
  805. // meaning changes performed to one is reflected in the other.
  806. func (r *Reader) ReadSeeker(random bool, index []byte) (*ReadSeeker, error) {
  807. // Read index if provided.
  808. if len(index) != 0 {
  809. if r.index == nil {
  810. r.index = &Index{}
  811. }
  812. if _, err := r.index.Load(index); err != nil {
  813. return nil, ErrCantSeek{Reason: "loading index returned: " + err.Error()}
  814. }
  815. }
  816. // Check if input is seekable
  817. rs, ok := r.r.(io.ReadSeeker)
  818. if !ok {
  819. if !random {
  820. return &ReadSeeker{Reader: r}, nil
  821. }
  822. return nil, ErrCantSeek{Reason: "input stream isn't seekable"}
  823. }
  824. if r.index != nil {
  825. // Seekable and index, ok...
  826. return &ReadSeeker{Reader: r}, nil
  827. }
  828. // Load from stream.
  829. r.index = &Index{}
  830. // Read current position.
  831. pos, err := rs.Seek(0, io.SeekCurrent)
  832. if err != nil {
  833. return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
  834. }
  835. err = r.index.LoadStream(rs)
  836. if err != nil {
  837. if err == ErrUnsupported {
  838. // If we don't require random seeking, reset input and return.
  839. if !random {
  840. _, err = rs.Seek(pos, io.SeekStart)
  841. if err != nil {
  842. return nil, ErrCantSeek{Reason: "resetting stream returned: " + err.Error()}
  843. }
  844. r.index = nil
  845. return &ReadSeeker{Reader: r}, nil
  846. }
  847. return nil, ErrCantSeek{Reason: "input stream does not contain an index"}
  848. }
  849. return nil, ErrCantSeek{Reason: "reading index returned: " + err.Error()}
  850. }
  851. // reset position.
  852. _, err = rs.Seek(pos, io.SeekStart)
  853. if err != nil {
  854. return nil, ErrCantSeek{Reason: "seeking input returned: " + err.Error()}
  855. }
  856. return &ReadSeeker{Reader: r}, nil
  857. }
  858. // Seek allows seeking in compressed data.
  859. func (r *ReadSeeker) Seek(offset int64, whence int) (int64, error) {
  860. if r.err != nil {
  861. if !errors.Is(r.err, io.EOF) {
  862. return 0, r.err
  863. }
  864. // Reset on EOF
  865. r.err = nil
  866. }
  867. // Calculate absolute offset.
  868. absOffset := offset
  869. switch whence {
  870. case io.SeekStart:
  871. case io.SeekCurrent:
  872. absOffset = r.blockStart + int64(r.i) + offset
  873. case io.SeekEnd:
  874. if r.index == nil {
  875. return 0, ErrUnsupported
  876. }
  877. absOffset = r.index.TotalUncompressed + offset
  878. default:
  879. r.err = ErrUnsupported
  880. return 0, r.err
  881. }
  882. if absOffset < 0 {
  883. return 0, errors.New("seek before start of file")
  884. }
  885. if !r.readHeader {
  886. // Make sure we read the header.
  887. _, r.err = r.Read([]byte{})
  888. if r.err != nil {
  889. return 0, r.err
  890. }
  891. }
  892. // If we are inside current block no need to seek.
  893. // This includes no offset changes.
  894. if absOffset >= r.blockStart && absOffset < r.blockStart+int64(r.j) {
  895. r.i = int(absOffset - r.blockStart)
  896. return r.blockStart + int64(r.i), nil
  897. }
  898. rs, ok := r.r.(io.ReadSeeker)
  899. if r.index == nil || !ok {
  900. currOffset := r.blockStart + int64(r.i)
  901. if absOffset >= currOffset {
  902. err := r.Skip(absOffset - currOffset)
  903. return r.blockStart + int64(r.i), err
  904. }
  905. return 0, ErrUnsupported
  906. }
  907. // We can seek and we have an index.
  908. c, u, err := r.index.Find(absOffset)
  909. if err != nil {
  910. return r.blockStart + int64(r.i), err
  911. }
  912. // Seek to next block
  913. _, err = rs.Seek(c, io.SeekStart)
  914. if err != nil {
  915. return 0, err
  916. }
  917. r.i = r.j // Remove rest of current block.
  918. r.blockStart = u - int64(r.j) // Adjust current block start for accounting.
  919. if u < absOffset {
  920. // Forward inside block
  921. return absOffset, r.Skip(absOffset - u)
  922. }
  923. if u > absOffset {
  924. return 0, fmt.Errorf("s2 seek: (internal error) u (%d) > absOffset (%d)", u, absOffset)
  925. }
  926. return absOffset, nil
  927. }
  928. // ReadAt reads len(p) bytes into p starting at offset off in the
  929. // underlying input source. It returns the number of bytes
  930. // read (0 <= n <= len(p)) and any error encountered.
  931. //
  932. // When ReadAt returns n < len(p), it returns a non-nil error
  933. // explaining why more bytes were not returned. In this respect,
  934. // ReadAt is stricter than Read.
  935. //
  936. // Even if ReadAt returns n < len(p), it may use all of p as scratch
  937. // space during the call. If some data is available but not len(p) bytes,
  938. // ReadAt blocks until either all the data is available or an error occurs.
  939. // In this respect ReadAt is different from Read.
  940. //
  941. // If the n = len(p) bytes returned by ReadAt are at the end of the
  942. // input source, ReadAt may return either err == EOF or err == nil.
  943. //
  944. // If ReadAt is reading from an input source with a seek offset,
  945. // ReadAt should not affect nor be affected by the underlying
  946. // seek offset.
  947. //
  948. // Clients of ReadAt can execute parallel ReadAt calls on the
  949. // same input source. This is however not recommended.
  950. func (r *ReadSeeker) ReadAt(p []byte, offset int64) (int, error) {
  951. r.readAtMu.Lock()
  952. defer r.readAtMu.Unlock()
  953. _, err := r.Seek(offset, io.SeekStart)
  954. if err != nil {
  955. return 0, err
  956. }
  957. n := 0
  958. for n < len(p) {
  959. n2, err := r.Read(p[n:])
  960. if err != nil {
  961. // This will include io.EOF
  962. return n + n2, err
  963. }
  964. n += n2
  965. }
  966. return n, nil
  967. }
  968. // ReadByte satisfies the io.ByteReader interface.
  969. func (r *Reader) ReadByte() (byte, error) {
  970. if r.err != nil {
  971. return 0, r.err
  972. }
  973. if r.i < r.j {
  974. c := r.decoded[r.i]
  975. r.i++
  976. return c, nil
  977. }
  978. var tmp [1]byte
  979. for i := 0; i < 10; i++ {
  980. n, err := r.Read(tmp[:])
  981. if err != nil {
  982. return 0, err
  983. }
  984. if n == 1 {
  985. return tmp[0], nil
  986. }
  987. }
  988. return 0, io.ErrNoProgress
  989. }
  990. // SkippableCB will register a callback for chunks with the specified ID.
  991. // ID must be a Reserved skippable chunks ID, 0x80-0xfd (inclusive).
  992. // For each chunk with the ID, the callback is called with the content.
  993. // Any returned non-nil error will abort decompression.
  994. // Only one callback per ID is supported, latest sent will be used.
  995. // Sending a nil function will disable previous callbacks.
  996. // You can peek the stream, triggering the callback, by doing a Read with a 0
  997. // byte buffer.
  998. func (r *Reader) SkippableCB(id uint8, fn func(r io.Reader) error) error {
  999. if id < 0x80 || id >= chunkTypePadding {
  1000. return fmt.Errorf("ReaderSkippableCB: Invalid id provided, must be 0x80-0xfe (inclusive)")
  1001. }
  1002. r.skippableCB[id-0x80] = fn
  1003. return nil
  1004. }