tree.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424
  1. # tree.py
  2. # Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
  3. #
  4. # This module is part of GitPython and is released under
  5. # the BSD License: http://www.opensource.org/licenses/bsd-license.php
  6. from git.util import IterableList, join_path
  7. import git.diff as git_diff
  8. from git.util import to_bin_sha
  9. from . import util
  10. from .base import IndexObject, IndexObjUnion
  11. from .blob import Blob
  12. from .submodule.base import Submodule
  13. from .fun import tree_entries_from_data, tree_to_stream
  14. # typing -------------------------------------------------
  15. from typing import (
  16. Any,
  17. Callable,
  18. Dict,
  19. Iterable,
  20. Iterator,
  21. List,
  22. Tuple,
  23. Type,
  24. Union,
  25. cast,
  26. TYPE_CHECKING,
  27. )
  28. from git.types import PathLike, Literal
  29. if TYPE_CHECKING:
  30. from git.repo import Repo
  31. from io import BytesIO
  32. TreeCacheTup = Tuple[bytes, int, str]
  33. TraversedTreeTup = Union[Tuple[Union["Tree", None], IndexObjUnion, Tuple["Submodule", "Submodule"]]]
# def is_tree_cache(inp: Tuple[bytes, int, str]) -> TypeGuard[TreeCacheTup]:
#     return isinstance(inp[0], bytes) and isinstance(inp[1], int) and isinstance(inp[2], str)
  36. # --------------------------------------------------------
  37. cmp: Callable[[str, str], int] = lambda a, b: (a > b) - (a < b)
  38. __all__ = ("TreeModifier", "Tree")
  39. def git_cmp(t1: TreeCacheTup, t2: TreeCacheTup) -> int:
  40. a, b = t1[2], t2[2]
  41. # assert isinstance(a, str) and isinstance(b, str)
  42. len_a, len_b = len(a), len(b)
  43. min_len = min(len_a, len_b)
  44. min_cmp = cmp(a[:min_len], b[:min_len])
  45. if min_cmp:
  46. return min_cmp
  47. return len_a - len_b
  48. def merge_sort(a: List[TreeCacheTup], cmp: Callable[[TreeCacheTup, TreeCacheTup], int]) -> None:
  49. if len(a) < 2:
  50. return None
  51. mid = len(a) // 2
  52. lefthalf = a[:mid]
  53. righthalf = a[mid:]
  54. merge_sort(lefthalf, cmp)
  55. merge_sort(righthalf, cmp)
  56. i = 0
  57. j = 0
  58. k = 0
  59. while i < len(lefthalf) and j < len(righthalf):
  60. if cmp(lefthalf[i], righthalf[j]) <= 0:
  61. a[k] = lefthalf[i]
  62. i = i + 1
  63. else:
  64. a[k] = righthalf[j]
  65. j = j + 1
  66. k = k + 1
  67. while i < len(lefthalf):
  68. a[k] = lefthalf[i]
  69. i = i + 1
  70. k = k + 1
  71. while j < len(righthalf):
  72. a[k] = righthalf[j]
  73. j = j + 1
  74. k = k + 1
  75. class TreeModifier(object):
  76. """A utility class providing methods to alter the underlying cache in a list-like fashion.
  77. Once all adjustments are complete, the _cache, which really is a reference to
  78. the cache of a tree, will be sorted. Assuring it will be in a serializable state"""
  79. __slots__ = "_cache"
  80. def __init__(self, cache: List[TreeCacheTup]) -> None:
  81. self._cache = cache
  82. def _index_by_name(self, name: str) -> int:
  83. """:return: index of an item with name, or -1 if not found"""
  84. for i, t in enumerate(self._cache):
  85. if t[2] == name:
  86. return i
  87. # END found item
  88. # END for each item in cache
  89. return -1
  90. # { Interface
  91. def set_done(self) -> "TreeModifier":
  92. """Call this method once you are done modifying the tree information.
  93. It may be called several times, but be aware that each call will cause
  94. a sort operation
  95. :return self:"""
  96. merge_sort(self._cache, git_cmp)
  97. return self
  98. # } END interface
  99. # { Mutators
  100. def add(self, sha: bytes, mode: int, name: str, force: bool = False) -> "TreeModifier":
  101. """Add the given item to the tree. If an item with the given name already
  102. exists, nothing will be done, but a ValueError will be raised if the
  103. sha and mode of the existing item do not match the one you add, unless
  104. force is True
  105. :param sha: The 20 or 40 byte sha of the item to add
  106. :param mode: int representing the stat compatible mode of the item
  107. :param force: If True, an item with your name and information will overwrite
  108. any existing item with the same name, no matter which information it has
  109. :return: self"""
  110. if "/" in name:
  111. raise ValueError("Name must not contain '/' characters")
  112. if (mode >> 12) not in Tree._map_id_to_type:
  113. raise ValueError("Invalid object type according to mode %o" % mode)
  114. sha = to_bin_sha(sha)
  115. index = self._index_by_name(name)
  116. item = (sha, mode, name)
  117. # assert is_tree_cache(item)
  118. if index == -1:
  119. self._cache.append(item)
  120. else:
  121. if force:
  122. self._cache[index] = item
  123. else:
  124. ex_item = self._cache[index]
  125. if ex_item[0] != sha or ex_item[1] != mode:
  126. raise ValueError("Item %r existed with different properties" % name)
  127. # END handle mismatch
  128. # END handle force
  129. # END handle name exists
  130. return self
  131. def add_unchecked(self, binsha: bytes, mode: int, name: str) -> None:
  132. """Add the given item to the tree, its correctness is assumed, which
  133. puts the caller into responsibility to assure the input is correct.
  134. For more information on the parameters, see ``add``
  135. :param binsha: 20 byte binary sha"""
  136. assert isinstance(binsha, bytes) and isinstance(mode, int) and isinstance(name, str)
  137. tree_cache = (binsha, mode, name)
  138. self._cache.append(tree_cache)
  139. def __delitem__(self, name: str) -> None:
  140. """Deletes an item with the given name if it exists"""
  141. index = self._index_by_name(name)
  142. if index > -1:
  143. del self._cache[index]
  144. # } END mutators
  145. class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
  146. """Tree objects represent an ordered list of Blobs and other Trees.
  147. ``Tree as a list``::
  148. Access a specific blob using the
  149. tree['filename'] notation.
  150. You may as well access by index
  151. blob = tree[0]
  152. """
  153. type: Literal["tree"] = "tree"
  154. __slots__ = "_cache"
  155. # actual integer ids for comparison
  156. commit_id = 0o16 # equals stat.S_IFDIR | stat.S_IFLNK - a directory link
  157. blob_id = 0o10
  158. symlink_id = 0o12
  159. tree_id = 0o04
  160. _map_id_to_type: Dict[int, Type[IndexObjUnion]] = {
  161. commit_id: Submodule,
  162. blob_id: Blob,
  163. symlink_id: Blob
  164. # tree id added once Tree is defined
  165. }
  166. def __init__(
  167. self,
  168. repo: "Repo",
  169. binsha: bytes,
  170. mode: int = tree_id << 12,
  171. path: Union[PathLike, None] = None,
  172. ):
  173. super(Tree, self).__init__(repo, binsha, mode, path)
  174. @classmethod
  175. def _get_intermediate_items(
  176. cls,
  177. index_object: IndexObjUnion,
  178. ) -> Union[Tuple["Tree", ...], Tuple[()]]:
  179. if index_object.type == "tree":
  180. return tuple(index_object._iter_convert_to_object(index_object._cache))
  181. return ()
  182. def _set_cache_(self, attr: str) -> None:
  183. if attr == "_cache":
  184. # Set the data when we need it
  185. ostream = self.repo.odb.stream(self.binsha)
  186. self._cache: List[TreeCacheTup] = tree_entries_from_data(ostream.read())
  187. else:
  188. super(Tree, self)._set_cache_(attr)
  189. # END handle attribute
  190. def _iter_convert_to_object(self, iterable: Iterable[TreeCacheTup]) -> Iterator[IndexObjUnion]:
  191. """Iterable yields tuples of (binsha, mode, name), which will be converted
  192. to the respective object representation"""
  193. for binsha, mode, name in iterable:
  194. path = join_path(self.path, name)
  195. try:
  196. yield self._map_id_to_type[mode >> 12](self.repo, binsha, mode, path)
  197. except KeyError as e:
  198. raise TypeError("Unknown mode %o found in tree data for path '%s'" % (mode, path)) from e
  199. # END for each item
  200. def join(self, file: str) -> IndexObjUnion:
  201. """Find the named object in this tree's contents
  202. :return: ``git.Blob`` or ``git.Tree`` or ``git.Submodule``
  203. :raise KeyError: if given file or tree does not exist in tree"""
  204. msg = "Blob or Tree named %r not found"
  205. if "/" in file:
  206. tree = self
  207. item = self
  208. tokens = file.split("/")
  209. for i, token in enumerate(tokens):
  210. item = tree[token]
  211. if item.type == "tree":
  212. tree = item
  213. else:
  214. # safety assertion - blobs are at the end of the path
  215. if i != len(tokens) - 1:
  216. raise KeyError(msg % file)
  217. return item
  218. # END handle item type
  219. # END for each token of split path
  220. if item == self:
  221. raise KeyError(msg % file)
  222. return item
  223. else:
  224. for info in self._cache:
  225. if info[2] == file: # [2] == name
  226. return self._map_id_to_type[info[1] >> 12](
  227. self.repo, info[0], info[1], join_path(self.path, info[2])
  228. )
  229. # END for each obj
  230. raise KeyError(msg % file)
  231. # END handle long paths
  232. def __truediv__(self, file: str) -> IndexObjUnion:
  233. """For PY3 only"""
  234. return self.join(file)
  235. @property
  236. def trees(self) -> List["Tree"]:
  237. """:return: list(Tree, ...) list of trees directly below this tree"""
  238. return [i for i in self if i.type == "tree"]
  239. @property
  240. def blobs(self) -> List[Blob]:
  241. """:return: list(Blob, ...) list of blobs directly below this tree"""
  242. return [i for i in self if i.type == "blob"]
  243. @property
  244. def cache(self) -> TreeModifier:
  245. """
  246. :return: An object allowing to modify the internal cache. This can be used
  247. to change the tree's contents. When done, make sure you call ``set_done``
  248. on the tree modifier, or serialization behaviour will be incorrect.
  249. See the ``TreeModifier`` for more information on how to alter the cache"""
  250. return TreeModifier(self._cache)
  251. def traverse(
  252. self, # type: ignore[override]
  253. predicate: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: True,
  254. prune: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: False,
  255. depth: int = -1,
  256. branch_first: bool = True,
  257. visit_once: bool = False,
  258. ignore_self: int = 1,
  259. as_edge: bool = False,
  260. ) -> Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]]:
  261. """For documentation, see util.Traversable._traverse()
  262. Trees are set to visit_once = False to gain more performance in the traversal"""
  263. # """
  264. # # To typecheck instead of using cast.
  265. # import itertools
  266. # def is_tree_traversed(inp: Tuple) -> TypeGuard[Tuple[Iterator[Union['Tree', 'Blob', 'Submodule']]]]:
  267. # return all(isinstance(x, (Blob, Tree, Submodule)) for x in inp[1])
  268. # ret = super(Tree, self).traverse(predicate, prune, depth, branch_first, visit_once, ignore_self)
  269. # ret_tup = itertools.tee(ret, 2)
  270. # assert is_tree_traversed(ret_tup), f"Type is {[type(x) for x in list(ret_tup[0])]}"
  271. # return ret_tup[0]"""
  272. return cast(
  273. Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]],
  274. super(Tree, self)._traverse(
  275. predicate,
  276. prune,
  277. depth, # type: ignore
  278. branch_first,
  279. visit_once,
  280. ignore_self,
  281. ),
  282. )
  283. def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList[IndexObjUnion]:
  284. """
  285. :return: IterableList with the results of the traversal as produced by
  286. traverse()
  287. Tree -> IterableList[Union['Submodule', 'Tree', 'Blob']]
  288. """
  289. return super(Tree, self)._list_traverse(*args, **kwargs)
  290. # List protocol
  291. def __getslice__(self, i: int, j: int) -> List[IndexObjUnion]:
  292. return list(self._iter_convert_to_object(self._cache[i:j]))
  293. def __iter__(self) -> Iterator[IndexObjUnion]:
  294. return self._iter_convert_to_object(self._cache)
  295. def __len__(self) -> int:
  296. return len(self._cache)
  297. def __getitem__(self, item: Union[str, int, slice]) -> IndexObjUnion:
  298. if isinstance(item, int):
  299. info = self._cache[item]
  300. return self._map_id_to_type[info[1] >> 12](self.repo, info[0], info[1], join_path(self.path, info[2]))
  301. if isinstance(item, str):
  302. # compatibility
  303. return self.join(item)
  304. # END index is basestring
  305. raise TypeError("Invalid index type: %r" % item)
  306. def __contains__(self, item: Union[IndexObjUnion, PathLike]) -> bool:
  307. if isinstance(item, IndexObject):
  308. for info in self._cache:
  309. if item.binsha == info[0]:
  310. return True
  311. # END compare sha
  312. # END for each entry
  313. # END handle item is index object
  314. # compatibility
  315. # treat item as repo-relative path
  316. else:
  317. path = self.path
  318. for info in self._cache:
  319. if item == join_path(path, info[2]):
  320. return True
  321. # END for each item
  322. return False
  323. def __reversed__(self) -> Iterator[IndexObjUnion]:
  324. return reversed(self._iter_convert_to_object(self._cache)) # type: ignore
  325. def _serialize(self, stream: "BytesIO") -> "Tree":
  326. """Serialize this tree into the stream. Please note that we will assume
  327. our tree data to be in a sorted state. If this is not the case, serialization
  328. will not generate a correct tree representation as these are assumed to be sorted
  329. by algorithms"""
  330. tree_to_stream(self._cache, stream.write)
  331. return self
  332. def _deserialize(self, stream: "BytesIO") -> "Tree":
  333. self._cache = tree_entries_from_data(stream.read())
  334. return self
  335. # END tree
  336. # finalize map definition
  337. Tree._map_id_to_type[Tree.tree_id] = Tree
  338. #