fun.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. """Module with functions which are supposed to be as fast as possible"""
  2. from stat import S_ISDIR
  3. from git.compat import safe_decode, defenc
  4. # typing ----------------------------------------------
  5. from typing import (
  6. Callable,
  7. List,
  8. MutableSequence,
  9. Sequence,
  10. Tuple,
  11. TYPE_CHECKING,
  12. Union,
  13. overload,
  14. )
  15. if TYPE_CHECKING:
  16. from _typeshed import ReadableBuffer
  17. from git import GitCmdObjectDB
# A single tree entry as handled by the functions in this module:
# (binsha, mode, name).
EntryTup = Tuple[bytes, int, str]  # same as TreeCacheTup in tree.py
# A slot that either holds a tree entry or None (no entry / already consumed).
EntryTupOrNone = Union[EntryTup, None]

# ---------------------------------------------------

# Public names of this module; the underscore-prefixed helpers are not exported.
__all__ = (
    "tree_to_stream",
    "tree_entries_from_data",
    "traverse_trees_recursive",
    "traverse_tree_recursive",
)
  27. def tree_to_stream(entries: Sequence[EntryTup], write: Callable[["ReadableBuffer"], Union[int, None]]) -> None:
  28. """Write the give list of entries into a stream using its write method
  29. :param entries: **sorted** list of tuples with (binsha, mode, name)
  30. :param write: write method which takes a data string"""
  31. ord_zero = ord("0")
  32. bit_mask = 7 # 3 bits set
  33. for binsha, mode, name in entries:
  34. mode_str = b""
  35. for i in range(6):
  36. mode_str = bytes([((mode >> (i * 3)) & bit_mask) + ord_zero]) + mode_str
  37. # END for each 8 octal value
  38. # git slices away the first octal if its zero
  39. if mode_str[0] == ord_zero:
  40. mode_str = mode_str[1:]
  41. # END save a byte
  42. # here it comes: if the name is actually unicode, the replacement below
  43. # will not work as the binsha is not part of the ascii unicode encoding -
  44. # hence we must convert to an utf8 string for it to work properly.
  45. # According to my tests, this is exactly what git does, that is it just
  46. # takes the input literally, which appears to be utf8 on linux.
  47. if isinstance(name, str):
  48. name_bytes = name.encode(defenc)
  49. else:
  50. name_bytes = name # type: ignore[unreachable] # check runtime types - is always str?
  51. write(b"".join((mode_str, b" ", name_bytes, b"\0", binsha)))
  52. # END for each item
  53. def tree_entries_from_data(data: bytes) -> List[EntryTup]:
  54. """Reads the binary representation of a tree and returns tuples of Tree items
  55. :param data: data block with tree data (as bytes)
  56. :return: list(tuple(binsha, mode, tree_relative_path), ...)"""
  57. ord_zero = ord("0")
  58. space_ord = ord(" ")
  59. len_data = len(data)
  60. i = 0
  61. out = []
  62. while i < len_data:
  63. mode = 0
  64. # read mode
  65. # Some git versions truncate the leading 0, some don't
  66. # The type will be extracted from the mode later
  67. while data[i] != space_ord:
  68. # move existing mode integer up one level being 3 bits
  69. # and add the actual ordinal value of the character
  70. mode = (mode << 3) + (data[i] - ord_zero)
  71. i += 1
  72. # END while reading mode
  73. # byte is space now, skip it
  74. i += 1
  75. # parse name, it is NULL separated
  76. ns = i
  77. while data[i] != 0:
  78. i += 1
  79. # END while not reached NULL
  80. # default encoding for strings in git is utf8
  81. # Only use the respective unicode object if the byte stream was encoded
  82. name_bytes = data[ns:i]
  83. name = safe_decode(name_bytes)
  84. # byte is NULL, get next 20
  85. i += 1
  86. sha = data[i : i + 20]
  87. i = i + 20
  88. out.append((sha, mode, name))
  89. # END for each byte in data stream
  90. return out
  91. def _find_by_name(tree_data: MutableSequence[EntryTupOrNone], name: str, is_dir: bool, start_at: int) -> EntryTupOrNone:
  92. """return data entry matching the given name and tree mode
  93. or None.
  94. Before the item is returned, the respective data item is set
  95. None in the tree_data list to mark it done"""
  96. try:
  97. item = tree_data[start_at]
  98. if item and item[2] == name and S_ISDIR(item[1]) == is_dir:
  99. tree_data[start_at] = None
  100. return item
  101. except IndexError:
  102. pass
  103. # END exception handling
  104. for index, item in enumerate(tree_data):
  105. if item and item[2] == name and S_ISDIR(item[1]) == is_dir:
  106. tree_data[index] = None
  107. return item
  108. # END if item matches
  109. # END for each item
  110. return None
  111. @overload
  112. def _to_full_path(item: None, path_prefix: str) -> None:
  113. ...
  114. @overload
  115. def _to_full_path(item: EntryTup, path_prefix: str) -> EntryTup:
  116. ...
  117. def _to_full_path(item: EntryTupOrNone, path_prefix: str) -> EntryTupOrNone:
  118. """Rebuild entry with given path prefix"""
  119. if not item:
  120. return item
  121. return (item[0], item[1], path_prefix + item[2])
  122. def traverse_trees_recursive(
  123. odb: "GitCmdObjectDB", tree_shas: Sequence[Union[bytes, None]], path_prefix: str
  124. ) -> List[Tuple[EntryTupOrNone, ...]]:
  125. """
  126. :return: list of list with entries according to the given binary tree-shas.
  127. The result is encoded in a list
  128. of n tuple|None per blob/commit, (n == len(tree_shas)), where
  129. * [0] == 20 byte sha
  130. * [1] == mode as int
  131. * [2] == path relative to working tree root
  132. The entry tuple is None if the respective blob/commit did not
  133. exist in the given tree.
  134. :param tree_shas: iterable of shas pointing to trees. All trees must
  135. be on the same level. A tree-sha may be None in which case None
  136. :param path_prefix: a prefix to be added to the returned paths on this level,
  137. set it '' for the first iteration
  138. :note: The ordering of the returned items will be partially lost"""
  139. trees_data: List[List[EntryTupOrNone]] = []
  140. nt = len(tree_shas)
  141. for tree_sha in tree_shas:
  142. if tree_sha is None:
  143. data: List[EntryTupOrNone] = []
  144. else:
  145. # make new list for typing as list invariant
  146. data = list(tree_entries_from_data(odb.stream(tree_sha).read()))
  147. # END handle muted trees
  148. trees_data.append(data)
  149. # END for each sha to get data for
  150. out: List[Tuple[EntryTupOrNone, ...]] = []
  151. # find all matching entries and recursively process them together if the match
  152. # is a tree. If the match is a non-tree item, put it into the result.
  153. # Processed items will be set None
  154. for ti, tree_data in enumerate(trees_data):
  155. for ii, item in enumerate(tree_data):
  156. if not item:
  157. continue
  158. # END skip already done items
  159. entries: List[EntryTupOrNone]
  160. entries = [None for _ in range(nt)]
  161. entries[ti] = item
  162. _sha, mode, name = item
  163. is_dir = S_ISDIR(mode) # type mode bits
  164. # find this item in all other tree data items
  165. # wrap around, but stop one before our current index, hence
  166. # ti+nt, not ti+1+nt
  167. for tio in range(ti + 1, ti + nt):
  168. tio = tio % nt
  169. entries[tio] = _find_by_name(trees_data[tio], name, is_dir, ii)
  170. # END for each other item data
  171. # if we are a directory, enter recursion
  172. if is_dir:
  173. out.extend(
  174. traverse_trees_recursive(
  175. odb,
  176. [((ei and ei[0]) or None) for ei in entries],
  177. path_prefix + name + "/",
  178. )
  179. )
  180. else:
  181. out.append(tuple(_to_full_path(e, path_prefix) for e in entries))
  182. # END handle recursion
  183. # finally mark it done
  184. tree_data[ii] = None
  185. # END for each item
  186. # we are done with one tree, set all its data empty
  187. del tree_data[:]
  188. # END for each tree_data chunk
  189. return out
  190. def traverse_tree_recursive(odb: "GitCmdObjectDB", tree_sha: bytes, path_prefix: str) -> List[EntryTup]:
  191. """
  192. :return: list of entries of the tree pointed to by the binary tree_sha. An entry
  193. has the following format:
  194. * [0] 20 byte sha
  195. * [1] mode as int
  196. * [2] path relative to the repository
  197. :param path_prefix: prefix to prepend to the front of all returned paths"""
  198. entries = []
  199. data = tree_entries_from_data(odb.stream(tree_sha).read())
  200. # unpacking/packing is faster than accessing individual items
  201. for sha, mode, name in data:
  202. if S_ISDIR(mode):
  203. entries.extend(traverse_tree_recursive(odb, sha, path_prefix + name + "/"))
  204. else:
  205. entries.append((sha, mode, path_prefix + name))
  206. # END for each item
  207. return entries