stream.py 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730
  1. # Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
  2. #
  3. # This module is part of GitDB and is released under
  4. # the New BSD License: http://www.opensource.org/licenses/bsd-license.php
  5. from io import BytesIO
  6. import mmap
  7. import os
  8. import sys
  9. import zlib
  10. from gitdb.fun import (
  11. msb_size,
  12. stream_copy,
  13. apply_delta_data,
  14. connect_deltas,
  15. delta_types
  16. )
  17. from gitdb.util import (
  18. allocate_memory,
  19. LazyMixin,
  20. make_sha,
  21. write,
  22. close,
  23. )
  24. from gitdb.const import NULL_BYTE, BYTE_SPACE
  25. from gitdb.utils.encoding import force_bytes
  26. has_perf_mod = False
  27. try:
  28. from gitdb_speedups._perf import apply_delta as c_apply_delta
  29. has_perf_mod = True
  30. except ImportError:
  31. pass
  32. __all__ = ('DecompressMemMapReader', 'FDCompressedSha1Writer', 'DeltaApplyReader',
  33. 'Sha1Writer', 'FlexibleSha1Writer', 'ZippedStoreShaWriter', 'FDCompressedSha1Writer',
  34. 'FDStream', 'NullStream')
  35. #{ RO Streams
class DecompressMemMapReader(LazyMixin):

    """Reads data in chunks from a memory map and decompresses it. The client sees
    only the uncompressed data, respective file-like read calls are handling on-demand
    buffered decompression accordingly

    A constraint on the total size of bytes is activated, simulating
    a logical file within a possibly larger physical memory area

    To read efficiently, you clearly don't want to read individual bytes, instead,
    read a few kilobytes at least.

    **Note:** The chunk-size should be carefully selected as it will involve quite a bit
    of string copying due to the way the zlib is implemented. Its very wasteful,
    hence we try to find a good tradeoff between allocation time and number of
    times we actually allocate. An own zlib implementation would be good here
    to better support streamed reading - it would only need to keep the mmap
    and decompress it into chunks, that's all ... """

    __slots__ = ('_m', '_zip', '_buf', '_buflen', '_br', '_cws', '_cwe', '_s', '_close',
                 '_cbr', '_phi')

    max_read_size = 512 * 1024  # currently unused

    def __init__(self, m, close_on_deletion, size=None):
        """Initialize with mmap for stream reading

        :param m: must be content data - use new if you have object data and no size"""
        self._m = m
        self._zip = zlib.decompressobj()
        self._buf = None                    # buffer of decompressed bytes
        self._buflen = 0                    # length of bytes in buffer
        if size is not None:
            self._s = size                  # size of uncompressed data to read in total
        self._br = 0                        # num uncompressed bytes read
        self._cws = 0                       # start byte of compression window
        self._cwe = 0                       # end byte of compression window
        self._cbr = 0                       # number of compressed bytes read
        self._phi = False                   # is True if we parsed the header info
        self._close = close_on_deletion     # close the memmap on deletion ?

    def _set_cache_(self, attr):
        # LazyMixin hook: only fires for '_s', i.e. when the constructor was
        # given no size - that is our marker that the stream still starts with
        # an object header which must be parsed first.
        assert attr == '_s'
        # only happens for size, which is a marker to indicate we still
        # have to parse the header from the stream
        self._parse_header_info()

    def __del__(self):
        # best-effort release of the underlying map (no-op unless close_on_deletion)
        self.close()

    def _parse_header_info(self):
        """If this stream contains object data, parse the header info and skip the
        stream to a point where each read will yield object content

        :return: parsed type_string, size"""
        # read header
        # should really be enough, cgit uses 8192 I believe
        # And for good reason !! This needs to be that high for the header to be read correctly in all cases
        maxb = 8192
        self._s = maxb
        hdr = self.read(maxb)
        hdrend = hdr.find(NULL_BYTE)
        # header layout is b"<type> <size>\0" - split on the space
        typ, size = hdr[:hdrend].split(BYTE_SPACE)
        size = int(size)
        self._s = size

        # adjust internal state to match actual header length that we ignore
        # The buffer will be depleted first on future reads
        self._br = 0
        hdrend += 1  # skip the NULL byte itself
        self._buf = BytesIO(hdr[hdrend:])
        self._buflen = len(hdr) - hdrend

        self._phi = True

        return typ, size

    #{ Interface

    @classmethod
    def new(self, m, close_on_deletion=False):
        """Create a new DecompressMemMapReader instance for acting as a read-only stream
        This method parses the object header from m and returns the parsed
        type and size, as well as the created stream instance.

        :param m: memory map on which to operate. It must be object data ( header + contents )
        :param close_on_deletion: if True, the memory map will be closed once we are
            being deleted"""
        # size=0 forces _set_cache_ NOT to trigger; we parse the header ourselves
        inst = DecompressMemMapReader(m, close_on_deletion, 0)
        typ, size = inst._parse_header_info()
        return typ, size, inst

    def data(self):
        """:return: random access compatible data we are working on"""
        return self._m

    def close(self):
        """Close our underlying stream of compressed bytes if this was allowed during initialization

        :return: True if we closed the underlying stream
        :note: can be called safely
        """
        if self._close:
            if hasattr(self._m, 'close'):
                self._m.close()
            self._close = False
        # END handle resource freeing

    def compressed_bytes_read(self):
        """
        :return: number of compressed bytes read. This includes the bytes it
            took to decompress the header ( if there was one )"""
        # ABSTRACT: When decompressing a byte stream, it can be that the first
        # x bytes which were requested match the first x bytes in the loosely
        # compressed datastream. This is the worst-case assumption that the reader
        # does, it assumes that it will get at least X bytes from X compressed bytes
        # in call cases.
        # The caveat is that the object, according to our known uncompressed size,
        # is already complete, but there are still some bytes left in the compressed
        # stream that contribute to the amount of compressed bytes.
        # How can we know that we are truly done, and have read all bytes we need
        # to read ?
        # Without help, we cannot know, as we need to obtain the status of the
        # decompression. If it is not finished, we need to decompress more data
        # until it is finished, to yield the actual number of compressed bytes
        # belonging to the decompressed object
        # We are using a custom zlib module for this, if its not present,
        # we try to put in additional bytes up for decompression if feasible
        # and check for the unused_data.

        # Only scrub the stream forward if we are officially done with the
        # bytes we were to have.
        if self._br == self._s and not self._zip.unused_data:
            # manipulate the bytes-read to allow our own read method to continue
            # but keep the window at its current position
            self._br = 0
            if hasattr(self._zip, 'status'):
                while self._zip.status == zlib.Z_OK:
                    self.read(mmap.PAGESIZE)
                # END scrub-loop custom zlib
            else:
                # pass in additional pages, until we have unused data
                while not self._zip.unused_data and self._cbr != len(self._m):
                    self.read(mmap.PAGESIZE)
                # END scrub-loop default zlib
            # END handle stream scrubbing

            # reset bytes read, just to be sure
            self._br = self._s
        # END handle stream scrubbing

        # unused data ends up in the unconsumed tail, which was removed
        # from the count already
        return self._cbr

    #} END interface

    def seek(self, offset, whence=getattr(os, 'SEEK_SET', 0)):
        """Allows to reset the stream to restart reading

        :raise ValueError: If offset and whence are not 0"""
        if offset != 0 or whence != getattr(os, 'SEEK_SET', 0):
            raise ValueError("Can only seek to position 0")
        # END handle offset

        self._zip = zlib.decompressobj()
        self._br = self._cws = self._cwe = self._cbr = 0
        if self._phi:
            self._phi = False
            del(self._s)        # trigger header parsing on first access
        # END skip header

    def read(self, size=-1):
        """Read up to `size` uncompressed bytes; with size < 1, read everything
        remaining up to the known uncompressed size of the stream."""
        if size < 1:
            size = self._s - self._br
        else:
            size = min(size, self._s - self._br)
        # END clamp size

        if size == 0:
            return b''
        # END handle depletion

        # deplete the buffer, then just continue using the decompress object
        # which has an own buffer. We just need this to transparently parse the
        # header from the zlib stream
        dat = b''
        if self._buf:
            if self._buflen >= size:
                # have enough data
                dat = self._buf.read(size)
                self._buflen -= size
                self._br += size
                return dat
            else:
                dat = self._buf.read()        # ouch, duplicates data
                size -= self._buflen
                self._br += self._buflen

                self._buflen = 0
                self._buf = None
            # END handle buffer len
        # END handle buffer

        # decompress some data
        # Abstract: zlib needs to operate on chunks of our memory map ( which may
        # be large ), as it will otherwise and always fill in the 'unconsumed_tail'
        # attribute which possible reads our whole map to the end, forcing
        # everything to be read from disk even though just a portion was requested.
        # As this would be a nogo, we workaround it by passing only chunks of data,
        # moving the window into the memory map along as we decompress, which keeps
        # the tail smaller than our chunk-size. This causes 'only' the chunk to be
        # copied once, and another copy of a part of it when it creates the unconsumed
        # tail. We have to use it to hand in the appropriate amount of bytes during
        # the next read.
        tail = self._zip.unconsumed_tail
        if tail:
            # move the window, make it as large as size demands. For code-clarity,
            # we just take the chunk from our map again instead of reusing the unconsumed
            # tail. The latter one would safe some memory copying, but we could end up
            # with not getting enough data uncompressed, so we had to sort that out as well.
            # Now we just assume the worst case, hence the data is uncompressed and the window
            # needs to be as large as the uncompressed bytes we want to read.
            self._cws = self._cwe - len(tail)
            self._cwe = self._cws + size
        else:
            cws = self._cws
            self._cws = self._cwe
            self._cwe = cws + size
        # END handle tail

        # if window is too small, make it larger so zip can decompress something
        if self._cwe - self._cws < 8:
            self._cwe = self._cws + 8
        # END adjust winsize

        # takes a slice, but doesn't copy the data, it says ...
        indata = self._m[self._cws:self._cwe]

        # get the actual window end to be sure we don't use it for computations
        self._cwe = self._cws + len(indata)
        dcompdat = self._zip.decompress(indata, size)
        # update the amount of compressed bytes read
        # We feed possibly overlapping chunks, which is why the unconsumed tail
        # has to be taken into consideration, as well as the unused data
        # if we hit the end of the stream
        # NOTE: Behavior changed in PY2.7 onward, which requires special handling to make the tests work properly.
        # They are thorough, and I assume it is truly working.
        # Why is this logic as convoluted as it is ? Please look at the table in
        # https://github.com/gitpython-developers/gitdb/issues/19 to learn about the test-results.
        # Basically, on py2.6, you want to use branch 1, whereas on all other python version, the second branch
        # will be the one that works.
        # However, the zlib VERSIONs as well as the platform check is used to further match the entries in the
        # table in the github issue. This is it ... it was the only way I could make this work everywhere.
        # IT's CERTAINLY GOING TO BITE US IN THE FUTURE ... .
        if zlib.ZLIB_VERSION in ('1.2.7', '1.2.5') and not sys.platform == 'darwin':
            unused_datalen = len(self._zip.unconsumed_tail)
        else:
            unused_datalen = len(self._zip.unconsumed_tail) + len(self._zip.unused_data)
        # # end handle very special case ...

        self._cbr += len(indata) - unused_datalen
        self._br += len(dcompdat)

        if dat:
            dcompdat = dat + dcompdat
        # END prepend our cached data

        # it can happen, depending on the compression, that we get less bytes
        # than ordered as it needs the final portion of the data as well.
        # Recursively resolve that.
        # Note: dcompdat can be empty even though we still appear to have bytes
        # to read, if we are called by compressed_bytes_read - it manipulates
        # us to empty the stream
        if dcompdat and (len(dcompdat) - len(dat)) < size and self._br < self._s:
            dcompdat += self.read(size - len(dcompdat))
        # END handle special case
        return dcompdat
class DeltaApplyReader(LazyMixin):

    """A reader which dynamically applies pack deltas to a base object, keeping the
    memory demands to a minimum.

    The size of the final object is only obtainable once all deltas have been
    applied, unless it is retrieved from a pack index.

    The uncompressed Delta has the following layout (MSB being a most significant
    bit encoded dynamic size):

    * MSB Source Size - the size of the base against which the delta was created
    * MSB Target Size - the size of the resulting data after the delta was applied
    * A list of one byte commands (cmd) which are followed by a specific protocol:

     * cmd & 0x80 - copy delta_data[offset:offset+size]

      * Followed by an encoded offset into the delta data
      * Followed by an encoded size of the chunk to copy

     * cmd & 0x7f - insert

      * insert cmd bytes from the delta buffer into the output stream

     * cmd == 0 - invalid operation ( or error in delta stream )
    """
    __slots__ = (
        "_bstream",     # base stream to which to apply the deltas
        "_dstreams",    # tuple of delta stream readers
        "_mm_target",   # memory map of the delta-applied data
        "_size",        # actual number of bytes in _mm_target
        "_br"           # number of bytes read
    )

    #{ Configuration
    k_max_memory_move = 250 * 1000 * 1000
    #} END configuration

    def __init__(self, stream_list):
        """Initialize this instance with a list of streams, the first stream being
        the delta to apply on top of all following deltas, the last stream being the
        base object onto which to apply the deltas"""
        assert len(stream_list) > 1, "Need at least one delta and one base stream"

        self._bstream = stream_list[-1]
        self._dstreams = tuple(stream_list[:-1])
        self._br = 0

    def _set_cache_too_slow_without_c(self, attr):
        # Lazy computation of '_mm_target' / '_size' using the aggregated-delta
        # strategy (used when the C extension is available).
        # the direct algorithm is fastest and most direct if there is only one
        # delta. Also, the extra overhead might not be worth it for items smaller
        # than X - definitely the case in python, every function call costs
        # huge amounts of time
        # if len(self._dstreams) * self._bstream.size < self.k_max_memory_move:
        if len(self._dstreams) == 1:
            return self._set_cache_brute_(attr)

        # Aggregate all deltas into one delta in reverse order. Hence we take
        # the last delta, and reverse-merge its ancestor delta, until we receive
        # the final delta data stream.
        dcl = connect_deltas(self._dstreams)

        # call len directly, as the (optional) c version doesn't implement the sequence
        # protocol
        if dcl.rbound() == 0:
            self._size = 0
            self._mm_target = allocate_memory(0)
            return
        # END handle empty list

        self._size = dcl.rbound()
        self._mm_target = allocate_memory(self._size)

        bbuf = allocate_memory(self._bstream.size)
        stream_copy(self._bstream.read, bbuf.write, self._bstream.size, 256 * mmap.PAGESIZE)

        # APPLY CHUNKS
        write = self._mm_target.write
        dcl.apply(bbuf, write)

        self._mm_target.seek(0)

    def _set_cache_brute_(self, attr):
        """If we are here, we apply the actual deltas"""
        # TODO: There should be a special case if there is only one stream
        # Then the default-git algorithm should perform a tad faster, as the
        # delta is not peaked into, causing less overhead.
        buffer_info_list = list()
        max_target_size = 0
        for dstream in self._dstreams:
            buf = dstream.read(512)         # read the header information + X
            offset, src_size = msb_size(buf)
            offset, target_size = msb_size(buf, offset)
            buffer_info_list.append((buf[offset:], offset, src_size, target_size))
            max_target_size = max(max_target_size, target_size)
        # END for each delta stream

        # sanity check - the first delta to apply should have the same source
        # size as our actual base stream
        base_size = self._bstream.size
        target_size = max_target_size

        # if we have more than 1 delta to apply, we will swap buffers, hence we must
        # assure that all buffers we use are large enough to hold all the results
        if len(self._dstreams) > 1:
            base_size = target_size = max(base_size, max_target_size)
        # END adjust buffer sizes

        # Allocate private memory map big enough to hold the first base buffer
        # We need random access to it
        bbuf = allocate_memory(base_size)
        stream_copy(self._bstream.read, bbuf.write, base_size, 256 * mmap.PAGESIZE)

        # allocate memory map large enough for the largest (intermediate) target
        # We will use it as scratch space for all delta ops. If the final
        # target buffer is smaller than our allocated space, we just use parts
        # of it upon return.
        tbuf = allocate_memory(target_size)

        # for each delta to apply, memory map the decompressed delta and
        # work on the op-codes to reconstruct everything.
        # For the actual copying, we use a seek and write pattern of buffer
        # slices.
        final_target_size = None
        for (dbuf, offset, src_size, target_size), dstream in zip(reversed(buffer_info_list), reversed(self._dstreams)):
            # allocate a buffer to hold all delta data - fill in the data for
            # fast access. We do this as we know that reading individual bytes
            # from our stream would be slower than necessary ( although possible )
            # The dbuf buffer contains commands after the first two MSB sizes, the
            # offset specifies the amount of bytes read to get the sizes.
            ddata = allocate_memory(dstream.size - offset)
            ddata.write(dbuf)
            # read the rest from the stream. The size we give is larger than necessary
            stream_copy(dstream.read, ddata.write, dstream.size, 256 * mmap.PAGESIZE)

            #######################################################################
            if 'c_apply_delta' in globals():
                c_apply_delta(bbuf, ddata, tbuf)
            else:
                apply_delta_data(bbuf, src_size, ddata, len(ddata), tbuf.write)
            #######################################################################

            # finally, swap out source and target buffers. The target is now the
            # base for the next delta to apply
            bbuf, tbuf = tbuf, bbuf
            bbuf.seek(0)
            tbuf.seek(0)
            final_target_size = target_size
        # END for each delta to apply

        # its already seeked to 0, constrain it to the actual size
        # NOTE: in the end of the loop, it swaps buffers, hence our target buffer
        # is not tbuf, but bbuf !
        self._mm_target = bbuf
        self._size = final_target_size

    #{ Configuration
    if not has_perf_mod:
        _set_cache_ = _set_cache_brute_
    else:
        _set_cache_ = _set_cache_too_slow_without_c
    #} END configuration

    def read(self, count=0):
        """Read up to `count` bytes of the delta-applied result; count < 1 (or
        larger than what remains) reads everything left."""
        bl = self._size - self._br      # bytes left
        if count < 1 or count > bl:
            count = bl
        # NOTE: we could check for certain size limits, and possibly
        # return buffers instead of strings to prevent byte copying
        data = self._mm_target.read(count)
        self._br += len(data)
        return data

    def seek(self, offset, whence=getattr(os, 'SEEK_SET', 0)):
        """Allows to reset the stream to restart reading

        :raise ValueError: If offset and whence are not 0"""
        if offset != 0 or whence != getattr(os, 'SEEK_SET', 0):
            raise ValueError("Can only seek to position 0")
        # END handle offset
        self._br = 0
        self._mm_target.seek(0)

    #{ Interface

    @classmethod
    def new(cls, stream_list):
        """
        Convert the given list of streams into a stream which resolves deltas
        when reading from it.

        :param stream_list: two or more stream objects, first stream is a Delta
            to the object that you want to resolve, followed by N additional delta
            streams. The list's last stream must be a non-delta stream.

        :return: Non-Delta OPackStream object whose stream can be used to obtain
            the decompressed resolved data
        :raise ValueError: if the stream list cannot be handled"""
        if len(stream_list) < 2:
            raise ValueError("Need at least two streams")
        # END single object special handling

        if stream_list[-1].type_id in delta_types:
            raise ValueError(
                "Cannot resolve deltas if there is no base object stream, last one was type: %s" % stream_list[-1].type)
        # END check stream

        return cls(stream_list)

    #} END interface

    #{ OInfo like Interface

    @property
    def type(self):
        return self._bstream.type

    @property
    def type_id(self):
        return self._bstream.type_id

    @property
    def size(self):
        """:return: number of uncompressed bytes in the stream"""
        return self._size

    #} END oinfo like interface
  457. #} END RO streams
  458. #{ W Streams
  459. class Sha1Writer:
  460. """Simple stream writer which produces a sha whenever you like as it degests
  461. everything it is supposed to write"""
  462. __slots__ = "sha1"
  463. def __init__(self):
  464. self.sha1 = make_sha()
  465. #{ Stream Interface
  466. def write(self, data):
  467. """:raise IOError: If not all bytes could be written
  468. :param data: byte object
  469. :return: length of incoming data"""
  470. self.sha1.update(data)
  471. return len(data)
  472. # END stream interface
  473. #{ Interface
  474. def sha(self, as_hex=False):
  475. """:return: sha so far
  476. :param as_hex: if True, sha will be hex-encoded, binary otherwise"""
  477. if as_hex:
  478. return self.sha1.hexdigest()
  479. return self.sha1.digest()
  480. #} END interface
  481. class FlexibleSha1Writer(Sha1Writer):
  482. """Writer producing a sha1 while passing on the written bytes to the given
  483. write function"""
  484. __slots__ = 'writer'
  485. def __init__(self, writer):
  486. Sha1Writer.__init__(self)
  487. self.writer = writer
  488. def write(self, data):
  489. Sha1Writer.write(self, data)
  490. self.writer(data)
  491. class ZippedStoreShaWriter(Sha1Writer):
  492. """Remembers everything someone writes to it and generates a sha"""
  493. __slots__ = ('buf', 'zip')
  494. def __init__(self):
  495. Sha1Writer.__init__(self)
  496. self.buf = BytesIO()
  497. self.zip = zlib.compressobj(zlib.Z_BEST_SPEED)
  498. def __getattr__(self, attr):
  499. return getattr(self.buf, attr)
  500. def write(self, data):
  501. alen = Sha1Writer.write(self, data)
  502. self.buf.write(self.zip.compress(data))
  503. return alen
  504. def close(self):
  505. self.buf.write(self.zip.flush())
  506. def seek(self, offset, whence=getattr(os, 'SEEK_SET', 0)):
  507. """Seeking currently only supports to rewind written data
  508. Multiple writes are not supported"""
  509. if offset != 0 or whence != getattr(os, 'SEEK_SET', 0):
  510. raise ValueError("Can only seek to position 0")
  511. # END handle offset
  512. self.buf.seek(0)
  513. def getvalue(self):
  514. """:return: string value from the current stream position to the end"""
  515. return self.buf.getvalue()
  516. class FDCompressedSha1Writer(Sha1Writer):
  517. """Digests data written to it, making the sha available, then compress the
  518. data and write it to the file descriptor
  519. **Note:** operates on raw file descriptors
  520. **Note:** for this to work, you have to use the close-method of this instance"""
  521. __slots__ = ("fd", "sha1", "zip")
  522. # default exception
  523. exc = IOError("Failed to write all bytes to filedescriptor")
  524. def __init__(self, fd):
  525. super().__init__()
  526. self.fd = fd
  527. self.zip = zlib.compressobj(zlib.Z_BEST_SPEED)
  528. #{ Stream Interface
  529. def write(self, data):
  530. """:raise IOError: If not all bytes could be written
  531. :return: length of incoming data"""
  532. self.sha1.update(data)
  533. cdata = self.zip.compress(data)
  534. bytes_written = write(self.fd, cdata)
  535. if bytes_written != len(cdata):
  536. raise self.exc
  537. return len(data)
  538. def close(self):
  539. remainder = self.zip.flush()
  540. if write(self.fd, remainder) != len(remainder):
  541. raise self.exc
  542. return close(self.fd)
  543. #} END stream interface
  544. class FDStream:
  545. """A simple wrapper providing the most basic functions on a file descriptor
  546. with the fileobject interface. Cannot use os.fdopen as the resulting stream
  547. takes ownership"""
  548. __slots__ = ("_fd", '_pos')
  549. def __init__(self, fd):
  550. self._fd = fd
  551. self._pos = 0
  552. def write(self, data):
  553. self._pos += len(data)
  554. os.write(self._fd, data)
  555. def read(self, count=0):
  556. if count == 0:
  557. count = os.path.getsize(self._filepath)
  558. # END handle read everything
  559. bytes = os.read(self._fd, count)
  560. self._pos += len(bytes)
  561. return bytes
  562. def fileno(self):
  563. return self._fd
  564. def tell(self):
  565. return self._pos
  566. def close(self):
  567. close(self._fd)
  568. class NullStream:
  569. """A stream that does nothing but providing a stream interface.
  570. Use it like /dev/null"""
  571. __slots__ = tuple()
  572. def read(self, size=0):
  573. return ''
  574. def close(self):
  575. pass
  576. def write(self, data):
  577. return len(data)
  578. #} END W streams