writer.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064
  1. // Copyright 2011 The Snappy-Go Authors. All rights reserved.
  2. // Copyright (c) 2019+ Klaus Post. All rights reserved.
  3. // Use of this source code is governed by a BSD-style
  4. // license that can be found in the LICENSE file.
  5. package s2
  6. import (
  7. "crypto/rand"
  8. "encoding/binary"
  9. "errors"
  10. "fmt"
  11. "io"
  12. "runtime"
  13. "sync"
  14. "github.com/klauspost/compress/internal/race"
  15. )
// Compression levels used by Writer.level.
// iota+1 is used so the zero value is not a valid level;
// NewWriter defaults to levelFast.
const (
	levelUncompressed = iota + 1 // store blocks without compression
	levelFast                    // default compression
	levelBetter                  // better, slower compression
	levelBest                    // best, slowest compression
)
  22. // NewWriter returns a new Writer that compresses to w, using the
  23. // framing format described at
  24. // https://github.com/google/snappy/blob/master/framing_format.txt
  25. //
  26. // Users must call Close to guarantee all data has been forwarded to
  27. // the underlying io.Writer and that resources are released.
  28. // They may also call Flush zero or more times before calling Close.
  29. func NewWriter(w io.Writer, opts ...WriterOption) *Writer {
  30. w2 := Writer{
  31. blockSize: defaultBlockSize,
  32. concurrency: runtime.GOMAXPROCS(0),
  33. randSrc: rand.Reader,
  34. level: levelFast,
  35. }
  36. for _, opt := range opts {
  37. if err := opt(&w2); err != nil {
  38. w2.errState = err
  39. return &w2
  40. }
  41. }
  42. w2.obufLen = obufHeaderLen + MaxEncodedLen(w2.blockSize)
  43. w2.paramsOK = true
  44. w2.ibuf = make([]byte, 0, w2.blockSize)
  45. w2.buffers.New = func() interface{} {
  46. return make([]byte, w2.obufLen)
  47. }
  48. w2.Reset(w)
  49. return &w2
  50. }
// Writer is an io.Writer that can write Snappy-compressed bytes.
type Writer struct {
	errMu    sync.Mutex // guards errState
	errState error      // first error seen; sticky until Reset

	// ibuf is a buffer for the incoming (uncompressed) bytes.
	ibuf []byte

	blockSize     int   // max uncompressed bytes per block
	obufLen       int   // size of pooled buffers: obufHeaderLen + MaxEncodedLen(blockSize)
	concurrency   int   // number of concurrent block encoders; 1 = fully synchronous
	written       int64 // compressed bytes written to the underlying writer
	uncompWritten int64 // Bytes sent to compression
	output        chan chan result // ordered queue feeding the writer goroutine; nil when concurrency == 1
	buffers       sync.Pool        // recycled obufLen-sized scratch buffers
	pad           int              // pad output to a multiple of this on Close; values <= 1 disable padding
	writer        io.Writer        // destination stream
	randSrc       io.Reader        // source for padding bytes (crypto/rand by default)
	writerWg      sync.WaitGroup   // waits for the writer goroutine in Reset/closeIndex
	index         Index            // seek index, updated as blocks are written
	customEnc     func(dst, src []byte) int // optional user block encoder; negative result falls back

	// wroteStreamHeader is whether we have written the stream header.
	wroteStreamHeader bool
	paramsOK          bool // set by NewWriter after options validate; guards Reset
	snappy            bool // emit Snappy-compatible output instead of S2
	flushOnWrite      bool // compress on every Write call
	appendIndex       bool // append the index when the stream is closed
	bufferCB          func([]byte) // called when an EncodeBuffer input may be reused
	level             uint8        // one of the level* constants
}
// result is one encoded block queued for the writer goroutine.
type result struct {
	// b holds the finished frame bytes to write to the output.
	b []byte
	// return when writing
	ret []byte
	// Uncompressed start offset
	startOffset int64
}
  86. // err returns the previously set error.
  87. // If no error has been set it is set to err if not nil.
  88. func (w *Writer) err(err error) error {
  89. w.errMu.Lock()
  90. errSet := w.errState
  91. if errSet == nil && err != nil {
  92. w.errState = err
  93. errSet = err
  94. }
  95. w.errMu.Unlock()
  96. return errSet
  97. }
// Reset discards the writer's state and switches the Snappy writer to write to w.
// This permits reusing a Writer rather than allocating a new one.
func (w *Writer) Reset(writer io.Writer) {
	// Writers created without NewWriter (paramsOK unset) are unusable.
	if !w.paramsOK {
		return
	}
	// Close previous writer, if any.
	if w.output != nil {
		close(w.output)
		w.writerWg.Wait()
		w.output = nil
	}
	w.errState = nil
	w.ibuf = w.ibuf[:0]
	w.wroteStreamHeader = false
	w.written = 0
	w.writer = writer
	w.uncompWritten = 0
	w.index.reset(w.blockSize)
	// If we didn't get a writer, stop here.
	if writer == nil {
		return
	}
	// If no concurrency requested, don't spin up writer goroutine.
	if w.concurrency == 1 {
		return
	}
	toWrite := make(chan chan result, w.concurrency)
	w.output = toWrite
	w.writerWg.Add(1)

	// Start a writer goroutine that will write all output in order.
	go func() {
		defer w.writerWg.Done()

		// Get a queued write.
		for write := range toWrite {
			// Wait for the data to be available.
			input := <-write
			// If the block carried an EncodeBuffer input, notify its owner
			// that the buffer may now be reused.
			if input.ret != nil && w.bufferCB != nil {
				w.bufferCB(input.ret)
				input.ret = nil
			}
			in := input.b
			if len(in) > 0 {
				// Skip writing once an error has been latched.
				if w.err(nil) == nil {
					// Don't expose data from previous buffers.
					toWrite := in[:len(in):len(in)]
					// Write to output.
					n, err := writer.Write(toWrite)
					if err == nil && n != len(toWrite) {
						err = io.ErrShortBuffer
					}
					_ = w.err(err)
					// Record this block's offsets in the seek index.
					w.err(w.index.add(w.written, input.startOffset))
					w.written += int64(n)
				}
			}
			// Return correctly sized buffers to the pool.
			if cap(in) >= w.obufLen {
				w.buffers.Put(in)
			}
			// close the incoming write request.
			// This can be used for synchronizing flushes.
			close(write)
		}
	}()
}
// Write satisfies the io.Writer interface.
//
// Input is collected in w.ibuf and compressed in blockSize chunks;
// data may not reach the underlying writer until Flush or Close
// (unless WriterFlushOnWrite was set).
func (w *Writer) Write(p []byte) (nRet int, errRet error) {
	if err := w.err(nil); err != nil {
		return 0, err
	}
	if w.flushOnWrite {
		// Compress and queue each Write call immediately.
		return w.write(p)
	}
	// If we exceed the input buffer size, start writing
	for len(p) > (cap(w.ibuf)-len(w.ibuf)) && w.err(nil) == nil {
		var n int
		if len(w.ibuf) == 0 {
			// Large write, empty buffer.
			// Write directly from p to avoid copy.
			n, _ = w.write(p)
		} else {
			// Top up the buffer and compress it as one block.
			n = copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p)
			w.ibuf = w.ibuf[:len(w.ibuf)+n]
			// Any error is latched and picked up by the w.err checks below.
			w.write(w.ibuf)
			w.ibuf = w.ibuf[:0]
		}
		nRet += n
		p = p[n:]
	}
	if err := w.err(nil); err != nil {
		return nRet, err
	}
	// p should always be able to fit into w.ibuf now.
	n := copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p)
	w.ibuf = w.ibuf[:len(w.ibuf)+n]
	nRet += n
	return nRet, nil
}
// ReadFrom implements the io.ReaderFrom interface.
// Using this is typically more efficient since it avoids a memory copy.
// ReadFrom reads data from r until EOF or error.
// The return value n is the number of bytes read.
// Any error except io.EOF encountered during the read is also returned.
func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
	if err := w.err(nil); err != nil {
		return 0, err
	}
	// Queue any buffered data first so blocks stay in order.
	if len(w.ibuf) > 0 {
		err := w.AsyncFlush()
		if err != nil {
			return 0, err
		}
	}
	if br, ok := r.(byter); ok {
		// The source exposes its whole backing buffer:
		// encode it directly, without any intermediate copy.
		buf := br.Bytes()
		if err := w.EncodeBuffer(buf); err != nil {
			return 0, err
		}
		return int64(len(buf)), w.AsyncFlush()
	}
	for {
		// Read one full block straight into a pooled buffer,
		// leaving room for the chunk header at the front.
		inbuf := w.buffers.Get().([]byte)[:w.blockSize+obufHeaderLen]
		n2, err := io.ReadFull(r, inbuf[obufHeaderLen:])
		if err != nil {
			if err == io.ErrUnexpectedEOF {
				// A short final block is fine; treat it as EOF.
				err = io.EOF
			}
			if err != io.EOF {
				return n, w.err(err)
			}
		}
		if n2 == 0 {
			// Nothing read; return the unused buffer and stop.
			if cap(inbuf) >= w.obufLen {
				w.buffers.Put(inbuf)
			}
			break
		}
		n += int64(n2)
		err2 := w.writeFull(inbuf[:n2+obufHeaderLen])
		if w.err(err2) != nil {
			break
		}
		if err != nil {
			// We got EOF and wrote everything
			break
		}
	}
	return n, w.err(nil)
}
  247. // AddSkippableBlock will add a skippable block to the stream.
  248. // The ID must be 0x80-0xfe (inclusive).
  249. // Length of the skippable block must be <= 16777215 bytes.
  250. func (w *Writer) AddSkippableBlock(id uint8, data []byte) (err error) {
  251. if err := w.err(nil); err != nil {
  252. return err
  253. }
  254. if len(data) == 0 {
  255. return nil
  256. }
  257. if id < 0x80 || id > chunkTypePadding {
  258. return fmt.Errorf("invalid skippable block id %x", id)
  259. }
  260. if len(data) > maxChunkSize {
  261. return fmt.Errorf("skippable block excessed maximum size")
  262. }
  263. var header [4]byte
  264. chunkLen := len(data)
  265. header[0] = id
  266. header[1] = uint8(chunkLen >> 0)
  267. header[2] = uint8(chunkLen >> 8)
  268. header[3] = uint8(chunkLen >> 16)
  269. if w.concurrency == 1 {
  270. write := func(b []byte) error {
  271. n, err := w.writer.Write(b)
  272. if err = w.err(err); err != nil {
  273. return err
  274. }
  275. if n != len(b) {
  276. return w.err(io.ErrShortWrite)
  277. }
  278. w.written += int64(n)
  279. return w.err(nil)
  280. }
  281. if !w.wroteStreamHeader {
  282. w.wroteStreamHeader = true
  283. if w.snappy {
  284. if err := write([]byte(magicChunkSnappy)); err != nil {
  285. return err
  286. }
  287. } else {
  288. if err := write([]byte(magicChunk)); err != nil {
  289. return err
  290. }
  291. }
  292. }
  293. if err := write(header[:]); err != nil {
  294. return err
  295. }
  296. return write(data)
  297. }
  298. // Create output...
  299. if !w.wroteStreamHeader {
  300. w.wroteStreamHeader = true
  301. hWriter := make(chan result)
  302. w.output <- hWriter
  303. if w.snappy {
  304. hWriter <- result{startOffset: w.uncompWritten, b: magicChunkSnappyBytes}
  305. } else {
  306. hWriter <- result{startOffset: w.uncompWritten, b: magicChunkBytes}
  307. }
  308. }
  309. // Copy input.
  310. inbuf := w.buffers.Get().([]byte)[:4]
  311. copy(inbuf, header[:])
  312. inbuf = append(inbuf, data...)
  313. output := make(chan result, 1)
  314. // Queue output.
  315. w.output <- output
  316. output <- result{startOffset: w.uncompWritten, b: inbuf}
  317. return nil
  318. }
// EncodeBuffer will add a buffer to the stream.
// This is the fastest way to encode a stream,
// but the input buffer cannot be written to by the caller
// until Flush or Close has been called when concurrency != 1.
//
// Use the WriterBufferDone to receive a callback when the buffer is done
// Processing.
//
// Note that input is not buffered.
// This means that each write will result in discrete blocks being created.
// For buffered writes, use the regular Write function.
func (w *Writer) EncodeBuffer(buf []byte) (err error) {
	if err := w.err(nil); err != nil {
		return err
	}
	if w.flushOnWrite {
		_, err := w.write(buf)
		return err
	}
	// Flush queued data first.
	if len(w.ibuf) > 0 {
		err := w.AsyncFlush()
		if err != nil {
			return err
		}
	}
	if w.concurrency == 1 {
		// Synchronous mode finishes with the buffer immediately,
		// so the done-callback can fire right away.
		_, err := w.writeSync(buf)
		if w.bufferCB != nil {
			w.bufferCB(buf)
		}
		return err
	}
	// Spawn goroutine and write block to output channel.
	if !w.wroteStreamHeader {
		w.wroteStreamHeader = true
		hWriter := make(chan result)
		w.output <- hWriter
		if w.snappy {
			hWriter <- result{startOffset: w.uncompWritten, b: magicChunkSnappyBytes}
		} else {
			hWriter <- result{startOffset: w.uncompWritten, b: magicChunkBytes}
		}
	}

	orgBuf := buf
	for len(buf) > 0 {
		// Cut input.
		uncompressed := buf
		if len(uncompressed) > w.blockSize {
			uncompressed = uncompressed[:w.blockSize]
		}
		buf = buf[len(uncompressed):]
		// Get an output buffer.
		obuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen]
		race.WriteSlice(obuf)

		output := make(chan result)
		// Queue output now, so we keep order.
		w.output <- output
		res := result{
			startOffset: w.uncompWritten,
		}
		w.uncompWritten += int64(len(uncompressed))
		// On the final block, route the original buffer through the writer
		// goroutine so the WriterBufferDone callback fires in write order.
		if len(buf) == 0 && w.bufferCB != nil {
			res.ret = orgBuf
		}
		go func() {
			race.ReadSlice(uncompressed)

			checksum := crc(uncompressed)

			// Set to uncompressed.
			chunkType := uint8(chunkTypeUncompressedData)
			chunkLen := 4 + len(uncompressed)

			// Attempt compressing.
			n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
			n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)

			// Check if we should use this, or store as uncompressed instead.
			if n2 > 0 {
				chunkType = uint8(chunkTypeCompressedData)
				chunkLen = 4 + n + n2
				obuf = obuf[:obufHeaderLen+n+n2]
			} else {
				// copy uncompressed
				copy(obuf[obufHeaderLen:], uncompressed)
			}
			// Fill in the per-chunk header that comes before the body.
			obuf[0] = chunkType
			obuf[1] = uint8(chunkLen >> 0)
			obuf[2] = uint8(chunkLen >> 8)
			obuf[3] = uint8(chunkLen >> 16)
			obuf[4] = uint8(checksum >> 0)
			obuf[5] = uint8(checksum >> 8)
			obuf[6] = uint8(checksum >> 16)
			obuf[7] = uint8(checksum >> 24)
			// Queue final output.
			res.b = obuf
			output <- res
		}()
	}
	return nil
}
  418. func (w *Writer) encodeBlock(obuf, uncompressed []byte) int {
  419. if w.customEnc != nil {
  420. if ret := w.customEnc(obuf, uncompressed); ret >= 0 {
  421. return ret
  422. }
  423. }
  424. if w.snappy {
  425. switch w.level {
  426. case levelFast:
  427. return encodeBlockSnappy(obuf, uncompressed)
  428. case levelBetter:
  429. return encodeBlockBetterSnappy(obuf, uncompressed)
  430. case levelBest:
  431. return encodeBlockBestSnappy(obuf, uncompressed)
  432. }
  433. return 0
  434. }
  435. switch w.level {
  436. case levelFast:
  437. return encodeBlock(obuf, uncompressed)
  438. case levelBetter:
  439. return encodeBlockBetter(obuf, uncompressed)
  440. case levelBest:
  441. return encodeBlockBest(obuf, uncompressed, nil)
  442. }
  443. return 0
  444. }
// write splits p into blocks of at most blockSize bytes and queues each block
// for compression on its own goroutine. Input is copied into pooled buffers,
// so p may be reused as soon as write returns.
func (w *Writer) write(p []byte) (nRet int, errRet error) {
	if err := w.err(nil); err != nil {
		return 0, err
	}
	if w.concurrency == 1 {
		return w.writeSync(p)
	}

	// Spawn goroutine and write block to output channel.
	for len(p) > 0 {
		if !w.wroteStreamHeader {
			w.wroteStreamHeader = true
			hWriter := make(chan result)
			w.output <- hWriter
			if w.snappy {
				hWriter <- result{startOffset: w.uncompWritten, b: magicChunkSnappyBytes}
			} else {
				hWriter <- result{startOffset: w.uncompWritten, b: magicChunkBytes}
			}
		}

		var uncompressed []byte
		if len(p) > w.blockSize {
			uncompressed, p = p[:w.blockSize], p[w.blockSize:]
		} else {
			uncompressed, p = p, nil
		}

		// Copy input.
		// If the block is incompressible, this is used for the result.
		inbuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen]
		obuf := w.buffers.Get().([]byte)[:w.obufLen]
		copy(inbuf[obufHeaderLen:], uncompressed)
		uncompressed = inbuf[obufHeaderLen:]

		output := make(chan result)
		// Queue output now, so we keep order.
		w.output <- output
		res := result{
			startOffset: w.uncompWritten,
		}
		w.uncompWritten += int64(len(uncompressed))
		go func() {
			checksum := crc(uncompressed)

			// Set to uncompressed.
			chunkType := uint8(chunkTypeUncompressedData)
			chunkLen := 4 + len(uncompressed)

			// Attempt compressing.
			n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
			n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)

			// Check if we should use this, or store as uncompressed instead.
			if n2 > 0 {
				chunkType = uint8(chunkTypeCompressedData)
				chunkLen = 4 + n + n2
				obuf = obuf[:obufHeaderLen+n+n2]
			} else {
				// Use input as output.
				// The swap means the unused buffer is returned to the pool below.
				obuf, inbuf = inbuf, obuf
			}

			// Fill in the per-chunk header that comes before the body.
			obuf[0] = chunkType
			obuf[1] = uint8(chunkLen >> 0)
			obuf[2] = uint8(chunkLen >> 8)
			obuf[3] = uint8(chunkLen >> 16)
			obuf[4] = uint8(checksum >> 0)
			obuf[5] = uint8(checksum >> 8)
			obuf[6] = uint8(checksum >> 16)
			obuf[7] = uint8(checksum >> 24)

			// Queue final output.
			res.b = obuf
			output <- res

			// Put unused buffer back in pool.
			w.buffers.Put(inbuf)
		}()
		nRet += len(uncompressed)
	}
	return nRet, nil
}
// writeFull is a special version of write that will always write the full buffer.
// Data to be compressed should start at offset obufHeaderLen and fill the remainder of the buffer.
// The data will be written as a single block.
// The caller is not allowed to use inbuf after this function has been called.
func (w *Writer) writeFull(inbuf []byte) (errRet error) {
	if err := w.err(nil); err != nil {
		return err
	}
	if w.concurrency == 1 {
		_, err := w.writeSync(inbuf[obufHeaderLen:])
		// Return the caller's buffer to the pool if it is pool-sized.
		if cap(inbuf) >= w.obufLen {
			w.buffers.Put(inbuf)
		}
		return err
	}

	// Spawn goroutine and write block to output channel.
	if !w.wroteStreamHeader {
		w.wroteStreamHeader = true
		hWriter := make(chan result)
		w.output <- hWriter
		if w.snappy {
			hWriter <- result{startOffset: w.uncompWritten, b: magicChunkSnappyBytes}
		} else {
			hWriter <- result{startOffset: w.uncompWritten, b: magicChunkBytes}
		}
	}

	// Get an output buffer.
	obuf := w.buffers.Get().([]byte)[:w.obufLen]
	uncompressed := inbuf[obufHeaderLen:]

	output := make(chan result)
	// Queue output now, so we keep order.
	w.output <- output
	res := result{
		startOffset: w.uncompWritten,
	}
	w.uncompWritten += int64(len(uncompressed))
	go func() {
		checksum := crc(uncompressed)

		// Set to uncompressed.
		chunkType := uint8(chunkTypeUncompressedData)
		chunkLen := 4 + len(uncompressed)

		// Attempt compressing.
		n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
		n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)

		// Check if we should use this, or store as uncompressed instead.
		if n2 > 0 {
			chunkType = uint8(chunkTypeCompressedData)
			chunkLen = 4 + n + n2
			obuf = obuf[:obufHeaderLen+n+n2]
		} else {
			// Use input as output.
			// The swap means the unused buffer is returned to the pool below.
			obuf, inbuf = inbuf, obuf
		}

		// Fill in the per-chunk header that comes before the body.
		obuf[0] = chunkType
		obuf[1] = uint8(chunkLen >> 0)
		obuf[2] = uint8(chunkLen >> 8)
		obuf[3] = uint8(chunkLen >> 16)
		obuf[4] = uint8(checksum >> 0)
		obuf[5] = uint8(checksum >> 8)
		obuf[6] = uint8(checksum >> 16)
		obuf[7] = uint8(checksum >> 24)

		// Queue final output.
		res.b = obuf
		output <- res

		// Put unused buffer back in pool.
		w.buffers.Put(inbuf)
	}()
	return nil
}
// writeSync compresses p and writes it to the underlying writer synchronously,
// splitting it into blocks of at most blockSize bytes.
// It is the write path used when concurrency == 1 (no output goroutine).
func (w *Writer) writeSync(p []byte) (nRet int, errRet error) {
	if err := w.err(nil); err != nil {
		return 0, err
	}
	if !w.wroteStreamHeader {
		w.wroteStreamHeader = true
		var n int
		var err error
		if w.snappy {
			n, err = w.writer.Write(magicChunkSnappyBytes)
		} else {
			n, err = w.writer.Write(magicChunkBytes)
		}
		if err != nil {
			return 0, w.err(err)
		}
		// NOTE(review): uses len(magicChunk) for both variants — assumes the
		// snappy and s2 magic chunks have equal length.
		if n != len(magicChunk) {
			return 0, w.err(io.ErrShortWrite)
		}
		w.written += int64(n)
	}
	for len(p) > 0 {
		var uncompressed []byte
		if len(p) > w.blockSize {
			uncompressed, p = p[:w.blockSize], p[w.blockSize:]
		} else {
			uncompressed, p = p, nil
		}

		obuf := w.buffers.Get().([]byte)[:w.obufLen]
		checksum := crc(uncompressed)

		// Set to uncompressed.
		chunkType := uint8(chunkTypeUncompressedData)
		chunkLen := 4 + len(uncompressed)

		// Attempt compressing.
		n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
		n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)

		if n2 > 0 {
			chunkType = uint8(chunkTypeCompressedData)
			chunkLen = 4 + n + n2
			obuf = obuf[:obufHeaderLen+n+n2]
		} else {
			// Incompressible: only the 8-byte chunk header goes out here;
			// the raw bytes are written separately below.
			obuf = obuf[:8]
		}
		// Fill in the per-chunk header that comes before the body.
		obuf[0] = chunkType
		obuf[1] = uint8(chunkLen >> 0)
		obuf[2] = uint8(chunkLen >> 8)
		obuf[3] = uint8(chunkLen >> 16)
		obuf[4] = uint8(checksum >> 0)
		obuf[5] = uint8(checksum >> 8)
		obuf[6] = uint8(checksum >> 16)
		obuf[7] = uint8(checksum >> 24)

		n, err := w.writer.Write(obuf)
		if err != nil {
			return 0, w.err(err)
		}
		if n != len(obuf) {
			return 0, w.err(io.ErrShortWrite)
		}
		// Record offsets in the seek index before advancing counters.
		w.err(w.index.add(w.written, w.uncompWritten))
		w.written += int64(n)
		w.uncompWritten += int64(len(uncompressed))

		if chunkType == chunkTypeUncompressedData {
			// Write uncompressed data.
			n, err := w.writer.Write(uncompressed)
			if err != nil {
				return 0, w.err(err)
			}
			if n != len(uncompressed) {
				return 0, w.err(io.ErrShortWrite)
			}
			w.written += int64(n)
		}
		w.buffers.Put(obuf)
		// Queue final output.
		nRet += len(uncompressed)
	}
	return nRet, nil
}
  668. // AsyncFlush writes any buffered bytes to a block and starts compressing it.
  669. // It does not wait for the output has been written as Flush() does.
  670. func (w *Writer) AsyncFlush() error {
  671. if err := w.err(nil); err != nil {
  672. return err
  673. }
  674. // Queue any data still in input buffer.
  675. if len(w.ibuf) != 0 {
  676. if !w.wroteStreamHeader {
  677. _, err := w.writeSync(w.ibuf)
  678. w.ibuf = w.ibuf[:0]
  679. return w.err(err)
  680. } else {
  681. _, err := w.write(w.ibuf)
  682. w.ibuf = w.ibuf[:0]
  683. err = w.err(err)
  684. if err != nil {
  685. return err
  686. }
  687. }
  688. }
  689. return w.err(nil)
  690. }
// Flush flushes the Writer to its underlying io.Writer.
// This does not apply padding.
func (w *Writer) Flush() error {
	if err := w.AsyncFlush(); err != nil {
		return err
	}
	if w.output == nil {
		// Synchronous mode: AsyncFlush already wrote everything.
		return w.err(nil)
	}

	// Send empty buffer
	res := make(chan result)
	w.output <- res
	// Block until this has been picked up.
	res <- result{b: nil, startOffset: w.uncompWritten}
	// When it is closed, we have flushed.
	<-res
	return w.err(nil)
}
  709. // Close calls Flush and then closes the Writer.
  710. // Calling Close multiple times is ok,
  711. // but calling CloseIndex after this will make it not return the index.
  712. func (w *Writer) Close() error {
  713. _, err := w.closeIndex(w.appendIndex)
  714. return err
  715. }
  716. // CloseIndex calls Close and returns an index on first call.
  717. // This is not required if you are only adding index to a stream.
  718. func (w *Writer) CloseIndex() ([]byte, error) {
  719. return w.closeIndex(true)
  720. }
// closeIndex flushes remaining data, stops the writer goroutine, applies
// padding, and optionally writes/returns the stream index.
// The first call returns the index (when idx is true); later calls return nil.
func (w *Writer) closeIndex(idx bool) ([]byte, error) {
	err := w.Flush()
	if w.output != nil {
		close(w.output)
		w.writerWg.Wait()
		w.output = nil
	}

	var index []byte
	if w.err(err) == nil && w.writer != nil {
		// Create index.
		if idx {
			compSize := int64(-1)
			// Compressed size is only known exactly when no padding follows.
			if w.pad <= 1 {
				compSize = w.written
			}
			index = w.index.appendTo(w.ibuf[:0], w.uncompWritten, compSize)
			// Count as written for padding.
			if w.appendIndex {
				w.written += int64(len(index))
			}
		}

		if w.pad > 1 {
			tmp := w.ibuf[:0]
			if len(index) > 0 {
				// Allocate another buffer.
				// (ibuf currently backs the index, so it cannot be reused.)
				tmp = w.buffers.Get().([]byte)[:0]
				defer w.buffers.Put(tmp)
			}
			add := calcSkippableFrame(w.written, int64(w.pad))
			frame, err := skippableFrame(tmp, add, w.randSrc)
			if err = w.err(err); err != nil {
				return nil, err
			}
			n, err2 := w.writer.Write(frame)
			if err2 == nil && n != len(frame) {
				err2 = io.ErrShortWrite
			}
			_ = w.err(err2)
		}
		if len(index) > 0 && w.appendIndex {
			n, err2 := w.writer.Write(index)
			if err2 == nil && n != len(index) {
				err2 = io.ErrShortWrite
			}
			_ = w.err(err2)
		}
	}
	// Latch errClosed; if it was already latched, report success (idempotent).
	err = w.err(errClosed)
	if err == errClosed {
		return index, nil
	}
	return nil, err
}
  774. // calcSkippableFrame will return a total size to be added for written
  775. // to be divisible by multiple.
  776. // The value will always be > skippableFrameHeader.
  777. // The function will panic if written < 0 or wantMultiple <= 0.
  778. func calcSkippableFrame(written, wantMultiple int64) int {
  779. if wantMultiple <= 0 {
  780. panic("wantMultiple <= 0")
  781. }
  782. if written < 0 {
  783. panic("written < 0")
  784. }
  785. leftOver := written % wantMultiple
  786. if leftOver == 0 {
  787. return 0
  788. }
  789. toAdd := wantMultiple - leftOver
  790. for toAdd < skippableFrameHeader {
  791. toAdd += wantMultiple
  792. }
  793. return int(toAdd)
  794. }
  795. // skippableFrame will add a skippable frame with a total size of bytes.
  796. // total should be >= skippableFrameHeader and < maxBlockSize + skippableFrameHeader
  797. func skippableFrame(dst []byte, total int, r io.Reader) ([]byte, error) {
  798. if total == 0 {
  799. return dst, nil
  800. }
  801. if total < skippableFrameHeader {
  802. return dst, fmt.Errorf("s2: requested skippable frame (%d) < 4", total)
  803. }
  804. if int64(total) >= maxBlockSize+skippableFrameHeader {
  805. return dst, fmt.Errorf("s2: requested skippable frame (%d) >= max 1<<24", total)
  806. }
  807. // Chunk type 0xfe "Section 4.4 Padding (chunk type 0xfe)"
  808. dst = append(dst, chunkTypePadding)
  809. f := uint32(total - skippableFrameHeader)
  810. // Add chunk length.
  811. dst = append(dst, uint8(f), uint8(f>>8), uint8(f>>16))
  812. // Add data
  813. start := len(dst)
  814. dst = append(dst, make([]byte, f)...)
  815. _, err := io.ReadFull(r, dst[start:])
  816. return dst, err
  817. }
// errClosed is latched into the error state by closeIndex so later
// operations fail, and so repeated Close calls can be detected as benign.
var errClosed = errors.New("s2: Writer is closed")
// WriterOption is an option for creating an encoder.
// Options are applied in order by NewWriter; the first error stops
// processing and is latched into the returned Writer.
type WriterOption func(*Writer) error
// WriterConcurrency will set the concurrency,
// meaning the maximum number of encoders to run concurrently.
// The value supplied must be at least 1.
// By default this will be set to GOMAXPROCS.
func WriterConcurrency(n int) WriterOption {
	return func(w *Writer) error {
		if n <= 0 {
			return errors.New("concurrency must be at least 1")
		}
		w.concurrency = n
		return nil
	}
}
  834. // WriterAddIndex will append an index to the end of a stream
  835. // when it is closed.
  836. func WriterAddIndex() WriterOption {
  837. return func(w *Writer) error {
  838. w.appendIndex = true
  839. return nil
  840. }
  841. }
  842. // WriterBetterCompression will enable better compression.
  843. // EncodeBetter compresses better than Encode but typically with a
  844. // 10-40% speed decrease on both compression and decompression.
  845. func WriterBetterCompression() WriterOption {
  846. return func(w *Writer) error {
  847. w.level = levelBetter
  848. return nil
  849. }
  850. }
// WriterBestCompression will enable best compression.
// EncodeBest compresses better than Encode but typically with a
// big speed decrease on compression.
func WriterBestCompression() WriterOption {
	return func(w *Writer) error {
		w.level = levelBest
		return nil
	}
}
  860. // WriterUncompressed will bypass compression.
  861. // The stream will be written as uncompressed blocks only.
  862. // If concurrency is > 1 CRC and output will still be done async.
  863. func WriterUncompressed() WriterOption {
  864. return func(w *Writer) error {
  865. w.level = levelUncompressed
  866. return nil
  867. }
  868. }
// WriterBufferDone will perform a callback when EncodeBuffer has finished
// writing a buffer to the output and the buffer can safely be reused.
// If the buffer was split into several blocks, it will be sent after the last block.
// Callbacks will not be done concurrently.
func WriterBufferDone(fn func(b []byte)) WriterOption {
	return func(w *Writer) error {
		w.bufferCB = fn
		return nil
	}
}
  879. // WriterBlockSize allows to override the default block size.
  880. // Blocks will be this size or smaller.
  881. // Minimum size is 4KB and maximum size is 4MB.
  882. //
  883. // Bigger blocks may give bigger throughput on systems with many cores,
  884. // and will increase compression slightly, but it will limit the possible
  885. // concurrency for smaller payloads for both encoding and decoding.
  886. // Default block size is 1MB.
  887. //
  888. // When writing Snappy compatible output using WriterSnappyCompat,
  889. // the maximum block size is 64KB.
  890. func WriterBlockSize(n int) WriterOption {
  891. return func(w *Writer) error {
  892. if w.snappy && n > maxSnappyBlockSize || n < minBlockSize {
  893. return errors.New("s2: block size too large. Must be <= 64K and >=4KB on for snappy compatible output")
  894. }
  895. if n > maxBlockSize || n < minBlockSize {
  896. return errors.New("s2: block size too large. Must be <= 4MB and >=4KB")
  897. }
  898. w.blockSize = n
  899. return nil
  900. }
  901. }
  902. // WriterPadding will add padding to all output so the size will be a multiple of n.
  903. // This can be used to obfuscate the exact output size or make blocks of a certain size.
  904. // The contents will be a skippable frame, so it will be invisible by the decoder.
  905. // n must be > 0 and <= 4MB.
  906. // The padded area will be filled with data from crypto/rand.Reader.
  907. // The padding will be applied whenever Close is called on the writer.
  908. func WriterPadding(n int) WriterOption {
  909. return func(w *Writer) error {
  910. if n <= 0 {
  911. return fmt.Errorf("s2: padding must be at least 1")
  912. }
  913. // No need to waste our time.
  914. if n == 1 {
  915. w.pad = 0
  916. }
  917. if n > maxBlockSize {
  918. return fmt.Errorf("s2: padding must less than 4MB")
  919. }
  920. w.pad = n
  921. return nil
  922. }
  923. }
  924. // WriterPaddingSrc will get random data for padding from the supplied source.
  925. // By default crypto/rand is used.
  926. func WriterPaddingSrc(reader io.Reader) WriterOption {
  927. return func(w *Writer) error {
  928. w.randSrc = reader
  929. return nil
  930. }
  931. }
  932. // WriterSnappyCompat will write snappy compatible output.
  933. // The output can be decompressed using either snappy or s2.
  934. // If block size is more than 64KB it is set to that.
  935. func WriterSnappyCompat() WriterOption {
  936. return func(w *Writer) error {
  937. w.snappy = true
  938. if w.blockSize > 64<<10 {
  939. // We choose 8 bytes less than 64K, since that will make literal emits slightly more effective.
  940. // And allows us to skip some size checks.
  941. w.blockSize = (64 << 10) - 8
  942. }
  943. return nil
  944. }
  945. }
  946. // WriterFlushOnWrite will compress blocks on each call to the Write function.
  947. //
  948. // This is quite inefficient as blocks size will depend on the write size.
  949. //
  950. // Use WriterConcurrency(1) to also make sure that output is flushed.
  951. // When Write calls return, otherwise they will be written when compression is done.
  952. func WriterFlushOnWrite() WriterOption {
  953. return func(w *Writer) error {
  954. w.flushOnWrite = true
  955. return nil
  956. }
  957. }
// WriterCustomEncoder allows to override the encoder for blocks on the stream.
// The function must compress 'src' into 'dst' and return the bytes used in dst as an integer.
// Block size (initial varint) should not be added by the encoder.
// Returning value 0 indicates the block could not be compressed.
// Returning a negative value indicates that compression should be attempted.
// The function should expect to be called concurrently.
func WriterCustomEncoder(fn func(dst, src []byte) int) WriterOption {
	return func(w *Writer) error {
		w.customEnc = fn
		return nil
	}
}