index.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602
  1. // Copyright (c) 2022+ Klaus Post. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package s2
  5. import (
  6. "bytes"
  7. "encoding/binary"
  8. "encoding/json"
  9. "fmt"
  10. "io"
  11. "sort"
  12. )
const (
	// S2IndexHeader is the magic written at the start of an index payload.
	S2IndexHeader = "s2idx\x00"
	// S2IndexTrailer is the magic written at the end of an index payload
	// (the header reversed), so the index can be located from the end of a stream.
	S2IndexTrailer = "\x00xdi2s"
	// maxIndexEntries limits how many offset pairs an index may contain.
	maxIndexEntries = 1 << 16
	// If distance is less than this, we do not add the entry.
	minIndexDist = 1 << 20
)
// Index represents an S2/Snappy index.
type Index struct {
	TotalUncompressed int64 // Total Uncompressed size if known. Will be -1 if unknown.
	TotalCompressed   int64 // Total Compressed size if known. Will be -1 if unknown.
	// info holds pairs of compressed/uncompressed offsets,
	// appended in increasing offset order.
	info []struct {
		compressedOffset   int64
		uncompressedOffset int64
	}
	// estBlockUncomp is the estimated uncompressed size per block,
	// used as the prediction when delta-encoding offsets.
	estBlockUncomp int64
}
  30. func (i *Index) reset(maxBlock int) {
  31. i.estBlockUncomp = int64(maxBlock)
  32. i.TotalCompressed = -1
  33. i.TotalUncompressed = -1
  34. if len(i.info) > 0 {
  35. i.info = i.info[:0]
  36. }
  37. }
  38. // allocInfos will allocate an empty slice of infos.
  39. func (i *Index) allocInfos(n int) {
  40. if n > maxIndexEntries {
  41. panic("n > maxIndexEntries")
  42. }
  43. i.info = make([]struct {
  44. compressedOffset int64
  45. uncompressedOffset int64
  46. }, 0, n)
  47. }
  48. // add an uncompressed and compressed pair.
  49. // Entries must be sent in order.
  50. func (i *Index) add(compressedOffset, uncompressedOffset int64) error {
  51. if i == nil {
  52. return nil
  53. }
  54. lastIdx := len(i.info) - 1
  55. if lastIdx >= 0 {
  56. latest := i.info[lastIdx]
  57. if latest.uncompressedOffset == uncompressedOffset {
  58. // Uncompressed didn't change, don't add entry,
  59. // but update start index.
  60. latest.compressedOffset = compressedOffset
  61. i.info[lastIdx] = latest
  62. return nil
  63. }
  64. if latest.uncompressedOffset > uncompressedOffset {
  65. return fmt.Errorf("internal error: Earlier uncompressed received (%d > %d)", latest.uncompressedOffset, uncompressedOffset)
  66. }
  67. if latest.compressedOffset > compressedOffset {
  68. return fmt.Errorf("internal error: Earlier compressed received (%d > %d)", latest.uncompressedOffset, uncompressedOffset)
  69. }
  70. if latest.uncompressedOffset+minIndexDist > uncompressedOffset {
  71. // Only add entry if distance is large enough.
  72. return nil
  73. }
  74. }
  75. i.info = append(i.info, struct {
  76. compressedOffset int64
  77. uncompressedOffset int64
  78. }{compressedOffset: compressedOffset, uncompressedOffset: uncompressedOffset})
  79. return nil
  80. }
  81. // Find the offset at or before the wanted (uncompressed) offset.
  82. // If offset is 0 or positive it is the offset from the beginning of the file.
  83. // If the uncompressed size is known, the offset must be within the file.
  84. // If an offset outside the file is requested io.ErrUnexpectedEOF is returned.
  85. // If the offset is negative, it is interpreted as the distance from the end of the file,
  86. // where -1 represents the last byte.
  87. // If offset from the end of the file is requested, but size is unknown,
  88. // ErrUnsupported will be returned.
  89. func (i *Index) Find(offset int64) (compressedOff, uncompressedOff int64, err error) {
  90. if i.TotalUncompressed < 0 {
  91. return 0, 0, ErrCorrupt
  92. }
  93. if offset < 0 {
  94. offset = i.TotalUncompressed + offset
  95. if offset < 0 {
  96. return 0, 0, io.ErrUnexpectedEOF
  97. }
  98. }
  99. if offset > i.TotalUncompressed {
  100. return 0, 0, io.ErrUnexpectedEOF
  101. }
  102. if len(i.info) > 200 {
  103. n := sort.Search(len(i.info), func(n int) bool {
  104. return i.info[n].uncompressedOffset > offset
  105. })
  106. if n == 0 {
  107. n = 1
  108. }
  109. return i.info[n-1].compressedOffset, i.info[n-1].uncompressedOffset, nil
  110. }
  111. for _, info := range i.info {
  112. if info.uncompressedOffset > offset {
  113. break
  114. }
  115. compressedOff = info.compressedOffset
  116. uncompressedOff = info.uncompressedOffset
  117. }
  118. return compressedOff, uncompressedOff, nil
  119. }
  120. // reduce to stay below maxIndexEntries
  121. func (i *Index) reduce() {
  122. if len(i.info) < maxIndexEntries && i.estBlockUncomp >= minIndexDist {
  123. return
  124. }
  125. // Algorithm, keep 1, remove removeN entries...
  126. removeN := (len(i.info) + 1) / maxIndexEntries
  127. src := i.info
  128. j := 0
  129. // Each block should be at least 1MB, but don't reduce below 1000 entries.
  130. for i.estBlockUncomp*(int64(removeN)+1) < minIndexDist && len(i.info)/(removeN+1) > 1000 {
  131. removeN++
  132. }
  133. for idx := 0; idx < len(src); idx++ {
  134. i.info[j] = src[idx]
  135. j++
  136. idx += removeN
  137. }
  138. i.info = i.info[:j]
  139. // Update maxblock estimate.
  140. i.estBlockUncomp += i.estBlockUncomp * int64(removeN)
  141. }
// appendTo serializes the index as a skippable frame and appends it to b,
// returning the extended slice. uncompTotal/compTotal are the stream totals
// to record. Offsets are varint delta-encoded against predictions to keep
// the index compact; the frame ends with a fixed-size total length and the
// trailer magic so it can be located from the end of a stream.
func (i *Index) appendTo(b []byte, uncompTotal, compTotal int64) []byte {
	i.reduce()
	var tmp [binary.MaxVarintLen64]byte
	initSize := len(b)
	// We make the start a skippable header+size.
	// Length bytes are zero for now and patched at the end.
	b = append(b, ChunkTypeIndex, 0, 0, 0)
	b = append(b, []byte(S2IndexHeader)...)
	// Total Uncompressed size
	n := binary.PutVarint(tmp[:], uncompTotal)
	b = append(b, tmp[:n]...)
	// Total Compressed size
	n = binary.PutVarint(tmp[:], compTotal)
	b = append(b, tmp[:n]...)
	// Put EstBlockUncomp size
	n = binary.PutVarint(tmp[:], i.estBlockUncomp)
	b = append(b, tmp[:n]...)
	// Put length
	n = binary.PutVarint(tmp[:], int64(len(i.info)))
	b = append(b, tmp[:n]...)
	// Check if we should add uncompressed offsets.
	// They can be omitted when every entry is exactly estBlockUncomp
	// past the previous one (starting at 0).
	var hasUncompressed byte
	for idx, info := range i.info {
		if idx == 0 {
			if info.uncompressedOffset != 0 {
				hasUncompressed = 1
				break
			}
			continue
		}
		if info.uncompressedOffset != i.info[idx-1].uncompressedOffset+i.estBlockUncomp {
			hasUncompressed = 1
			break
		}
	}
	b = append(b, hasUncompressed)
	// Add each entry
	if hasUncompressed == 1 {
		for idx, info := range i.info {
			uOff := info.uncompressedOffset
			if idx > 0 {
				prev := i.info[idx-1]
				// Store only the difference from the predicted offset
				// (previous entry + estimated block size).
				uOff -= prev.uncompressedOffset + (i.estBlockUncomp)
			}
			n = binary.PutVarint(tmp[:], uOff)
			b = append(b, tmp[:n]...)
		}
	}
	// Initial compressed size estimate.
	cPredict := i.estBlockUncomp / 2
	for idx, info := range i.info {
		cOff := info.compressedOffset
		if idx > 0 {
			prev := i.info[idx-1]
			cOff -= prev.compressedOffset + cPredict
			// Update compressed size prediction, with half the error.
			cPredict += cOff / 2
		}
		n = binary.PutVarint(tmp[:], cOff)
		b = append(b, tmp[:n]...)
	}
	// Add Total Size.
	// Stored as fixed size for easier reading.
	binary.LittleEndian.PutUint32(tmp[:], uint32(len(b)-initSize+4+len(S2IndexTrailer)))
	b = append(b, tmp[:4]...)
	// Trailer
	b = append(b, []byte(S2IndexTrailer)...)
	// Update size: patch the 3 length bytes of the skippable chunk header.
	chunkLen := len(b) - initSize - skippableFrameHeader
	b[initSize+1] = uint8(chunkLen >> 0)
	b[initSize+2] = uint8(chunkLen >> 8)
	b[initSize+3] = uint8(chunkLen >> 16)
	//fmt.Printf("chunklen: 0x%x Uncomp:%d, Comp:%d\n", chunkLen, uncompTotal, compTotal)
	return b
}
// Load a binary index.
// A zero value Index can be used or a previous one can be reused.
// b must start at the index chunk header; the remaining bytes after the
// index are returned. Returns ErrCorrupt/ErrUnsupported/io.ErrUnexpectedEOF
// on malformed input. Decoding mirrors the encoding in appendTo.
func (i *Index) Load(b []byte) ([]byte, error) {
	if len(b) <= 4+len(S2IndexHeader)+len(S2IndexTrailer) {
		return b, io.ErrUnexpectedEOF
	}
	if b[0] != ChunkTypeIndex {
		return b, ErrCorrupt
	}
	// 3-byte little-endian skippable chunk length.
	chunkLen := int(b[1]) | int(b[2])<<8 | int(b[3])<<16
	b = b[4:]
	// Validate we have enough...
	if len(b) < chunkLen {
		return b, io.ErrUnexpectedEOF
	}
	if !bytes.Equal(b[:len(S2IndexHeader)], []byte(S2IndexHeader)) {
		return b, ErrUnsupported
	}
	b = b[len(S2IndexHeader):]
	// Total Uncompressed
	if v, n := binary.Varint(b); n <= 0 || v < 0 {
		return b, ErrCorrupt
	} else {
		i.TotalUncompressed = v
		b = b[n:]
	}
	// Total Compressed (may be -1 for unknown, so no sign check).
	if v, n := binary.Varint(b); n <= 0 {
		return b, ErrCorrupt
	} else {
		i.TotalCompressed = v
		b = b[n:]
	}
	// Read EstBlockUncomp
	if v, n := binary.Varint(b); n <= 0 {
		return b, ErrCorrupt
	} else {
		if v < 0 {
			return b, ErrCorrupt
		}
		i.estBlockUncomp = v
		b = b[n:]
	}
	// Number of entries that follow; bounded by maxIndexEntries.
	var entries int
	if v, n := binary.Varint(b); n <= 0 {
		return b, ErrCorrupt
	} else {
		if v < 0 || v > maxIndexEntries {
			return b, ErrCorrupt
		}
		entries = int(v)
		b = b[n:]
	}
	// Reuse existing storage when possible.
	if cap(i.info) < entries {
		i.allocInfos(entries)
	}
	i.info = i.info[:entries]
	if len(b) < 1 {
		return b, io.ErrUnexpectedEOF
	}
	// Flag byte: 1 if explicit uncompressed deltas follow, 0 if offsets
	// are implied by estBlockUncomp. Any other value is corrupt.
	hasUncompressed := b[0]
	b = b[1:]
	if hasUncompressed&1 != hasUncompressed {
		return b, ErrCorrupt
	}
	// Add each uncompressed entry
	for idx := range i.info {
		var uOff int64
		if hasUncompressed != 0 {
			// Load delta
			if v, n := binary.Varint(b); n <= 0 {
				return b, ErrCorrupt
			} else {
				uOff = v
				b = b[n:]
			}
		}
		if idx > 0 {
			// Reconstruct from prediction: previous offset + block estimate.
			prev := i.info[idx-1].uncompressedOffset
			uOff += prev + (i.estBlockUncomp)
			// Offsets must be strictly increasing.
			if uOff <= prev {
				return b, ErrCorrupt
			}
		}
		if uOff < 0 {
			return b, ErrCorrupt
		}
		i.info[idx].uncompressedOffset = uOff
	}
	// Initial compressed size estimate.
	cPredict := i.estBlockUncomp / 2
	// Add each compressed entry
	for idx := range i.info {
		var cOff int64
		if v, n := binary.Varint(b); n <= 0 {
			return b, ErrCorrupt
		} else {
			cOff = v
			b = b[n:]
		}
		if idx > 0 {
			// Update compressed size prediction, with half the error.
			cPredictNew := cPredict + cOff/2
			prev := i.info[idx-1].compressedOffset
			cOff += prev + cPredict
			// Offsets must be strictly increasing.
			if cOff <= prev {
				return b, ErrCorrupt
			}
			cPredict = cPredictNew
		}
		if cOff < 0 {
			return b, ErrCorrupt
		}
		i.info[idx].compressedOffset = cOff
	}
	if len(b) < 4+len(S2IndexTrailer) {
		return b, io.ErrUnexpectedEOF
	}
	// Skip size...
	b = b[4:]
	// Check trailer...
	if !bytes.Equal(b[:len(S2IndexTrailer)], []byte(S2IndexTrailer)) {
		return b, ErrCorrupt
	}
	return b[len(S2IndexTrailer):], nil
}
  342. // LoadStream will load an index from the end of the supplied stream.
  343. // ErrUnsupported will be returned if the signature cannot be found.
  344. // ErrCorrupt will be returned if unexpected values are found.
  345. // io.ErrUnexpectedEOF is returned if there are too few bytes.
  346. // IO errors are returned as-is.
  347. func (i *Index) LoadStream(rs io.ReadSeeker) error {
  348. // Go to end.
  349. _, err := rs.Seek(-10, io.SeekEnd)
  350. if err != nil {
  351. return err
  352. }
  353. var tmp [10]byte
  354. _, err = io.ReadFull(rs, tmp[:])
  355. if err != nil {
  356. return err
  357. }
  358. // Check trailer...
  359. if !bytes.Equal(tmp[4:4+len(S2IndexTrailer)], []byte(S2IndexTrailer)) {
  360. return ErrUnsupported
  361. }
  362. sz := binary.LittleEndian.Uint32(tmp[:4])
  363. if sz > maxChunkSize+skippableFrameHeader {
  364. return ErrCorrupt
  365. }
  366. _, err = rs.Seek(-int64(sz), io.SeekEnd)
  367. if err != nil {
  368. return err
  369. }
  370. // Read index.
  371. buf := make([]byte, sz)
  372. _, err = io.ReadFull(rs, buf)
  373. if err != nil {
  374. return err
  375. }
  376. _, err = i.Load(buf)
  377. return err
  378. }
// IndexStream will return an index for a stream.
// The stream structure will be checked, but
// data within blocks is not verified.
// The returned index can either be appended to the end of the stream
// or stored separately.
// Returns ErrCorrupt for malformed framing, ErrUnsupported for
// unskippable reserved chunks, or the underlying IO error.
func IndexStream(r io.Reader) ([]byte, error) {
	var i Index
	var buf [maxChunkSize]byte
	var readHeader bool
	for {
		// Read the 4-byte chunk header (type + 3-byte length).
		_, err := io.ReadFull(r, buf[:4])
		if err != nil {
			if err == io.EOF {
				// Clean end of stream: serialize what we have.
				return i.appendTo(nil, i.TotalUncompressed, i.TotalCompressed), nil
			}
			return nil, err
		}
		// Start of this chunk.
		startChunk := i.TotalCompressed
		i.TotalCompressed += 4
		chunkType := buf[0]
		// The very first chunk must be the stream identifier.
		if !readHeader {
			if chunkType != chunkTypeStreamIdentifier {
				return nil, ErrCorrupt
			}
			readHeader = true
		}
		chunkLen := int(buf[1]) | int(buf[2])<<8 | int(buf[3])<<16
		if chunkLen < checksumSize {
			return nil, ErrCorrupt
		}
		i.TotalCompressed += int64(chunkLen)
		_, err = io.ReadFull(r, buf[:chunkLen])
		if err != nil {
			return nil, io.ErrUnexpectedEOF
		}
		// The chunk types are specified at
		// https://github.com/google/snappy/blob/master/framing_format.txt
		switch chunkType {
		case chunkTypeCompressedData:
			// Section 4.2. Compressed data (chunk type 0x00).
			// Skip checksum.
			dLen, err := DecodedLen(buf[checksumSize:])
			if err != nil {
				return nil, err
			}
			if dLen > maxBlockSize {
				return nil, ErrCorrupt
			}
			if i.estBlockUncomp == 0 {
				// Use first block for estimate...
				i.estBlockUncomp = int64(dLen)
			}
			// Record the block start; add skips entries that are too close.
			err = i.add(startChunk, i.TotalUncompressed)
			if err != nil {
				return nil, err
			}
			i.TotalUncompressed += int64(dLen)
			continue
		case chunkTypeUncompressedData:
			// Uncompressed data: payload length minus the checksum.
			n2 := chunkLen - checksumSize
			if n2 > maxBlockSize {
				return nil, ErrCorrupt
			}
			if i.estBlockUncomp == 0 {
				// Use first block for estimate...
				i.estBlockUncomp = int64(n2)
			}
			err = i.add(startChunk, i.TotalUncompressed)
			if err != nil {
				return nil, err
			}
			i.TotalUncompressed += int64(n2)
			continue
		case chunkTypeStreamIdentifier:
			// Section 4.1. Stream identifier (chunk type 0xff).
			if chunkLen != len(magicBody) {
				return nil, ErrCorrupt
			}
			// Accept both the S2 and Snappy magic bodies.
			if string(buf[:len(magicBody)]) != magicBody {
				if string(buf[:len(magicBody)]) != magicBodySnappy {
					return nil, ErrCorrupt
				}
			}
			continue
		}
		if chunkType <= 0x7f {
			// Section 4.5. Reserved unskippable chunks (chunk types 0x02-0x7f).
			return nil, ErrUnsupported
		}
		if chunkLen > maxChunkSize {
			return nil, ErrUnsupported
		}
		// Section 4.4 Padding (chunk type 0xfe).
		// Section 4.6. Reserved skippable chunks (chunk types 0x80-0xfd).
	}
}
  476. // JSON returns the index as JSON text.
  477. func (i *Index) JSON() []byte {
  478. type offset struct {
  479. CompressedOffset int64 `json:"compressed"`
  480. UncompressedOffset int64 `json:"uncompressed"`
  481. }
  482. x := struct {
  483. TotalUncompressed int64 `json:"total_uncompressed"` // Total Uncompressed size if known. Will be -1 if unknown.
  484. TotalCompressed int64 `json:"total_compressed"` // Total Compressed size if known. Will be -1 if unknown.
  485. Offsets []offset `json:"offsets"`
  486. EstBlockUncomp int64 `json:"est_block_uncompressed"`
  487. }{
  488. TotalUncompressed: i.TotalUncompressed,
  489. TotalCompressed: i.TotalCompressed,
  490. EstBlockUncomp: i.estBlockUncomp,
  491. }
  492. for _, v := range i.info {
  493. x.Offsets = append(x.Offsets, offset{CompressedOffset: v.compressedOffset, UncompressedOffset: v.uncompressedOffset})
  494. }
  495. b, _ := json.MarshalIndent(x, "", " ")
  496. return b
  497. }
  498. // RemoveIndexHeaders will trim all headers and trailers from a given index.
  499. // This is expected to save 20 bytes.
  500. // These can be restored using RestoreIndexHeaders.
  501. // This removes a layer of security, but is the most compact representation.
  502. // Returns nil if headers contains errors.
  503. // The returned slice references the provided slice.
  504. func RemoveIndexHeaders(b []byte) []byte {
  505. const save = 4 + len(S2IndexHeader) + len(S2IndexTrailer) + 4
  506. if len(b) <= save {
  507. return nil
  508. }
  509. if b[0] != ChunkTypeIndex {
  510. return nil
  511. }
  512. chunkLen := int(b[1]) | int(b[2])<<8 | int(b[3])<<16
  513. b = b[4:]
  514. // Validate we have enough...
  515. if len(b) < chunkLen {
  516. return nil
  517. }
  518. b = b[:chunkLen]
  519. if !bytes.Equal(b[:len(S2IndexHeader)], []byte(S2IndexHeader)) {
  520. return nil
  521. }
  522. b = b[len(S2IndexHeader):]
  523. if !bytes.HasSuffix(b, []byte(S2IndexTrailer)) {
  524. return nil
  525. }
  526. b = bytes.TrimSuffix(b, []byte(S2IndexTrailer))
  527. if len(b) < 4 {
  528. return nil
  529. }
  530. return b[:len(b)-4]
  531. }
  532. // RestoreIndexHeaders will index restore headers removed by RemoveIndexHeaders.
  533. // No error checking is performed on the input.
  534. // If a 0 length slice is sent, it is returned without modification.
  535. func RestoreIndexHeaders(in []byte) []byte {
  536. if len(in) == 0 {
  537. return in
  538. }
  539. b := make([]byte, 0, 4+len(S2IndexHeader)+len(in)+len(S2IndexTrailer)+4)
  540. b = append(b, ChunkTypeIndex, 0, 0, 0)
  541. b = append(b, []byte(S2IndexHeader)...)
  542. b = append(b, in...)
  543. var tmp [4]byte
  544. binary.LittleEndian.PutUint32(tmp[:], uint32(len(b)+4+len(S2IndexTrailer)))
  545. b = append(b, tmp[:4]...)
  546. // Trailer
  547. b = append(b, []byte(S2IndexTrailer)...)
  548. chunkLen := len(b) - skippableFrameHeader
  549. b[1] = uint8(chunkLen >> 0)
  550. b[2] = uint8(chunkLen >> 8)
  551. b[3] = uint8(chunkLen >> 16)
  552. return b
  553. }