fun.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. """Package with general repository related functions"""
  2. from __future__ import annotations
  3. import os
  4. import stat
  5. from pathlib import Path
  6. from string import digits
  7. from git.exc import WorkTreeRepositoryUnsupported
  8. from git.objects import Object
  9. from git.refs import SymbolicReference
  10. from git.util import hex_to_bin, bin_to_hex, cygpath
  11. from gitdb.exc import (
  12. BadObject,
  13. BadName,
  14. )
  15. import os.path as osp
  16. from git.cmd import Git
  17. # Typing ----------------------------------------------------------------------
  18. from typing import Union, Optional, cast, TYPE_CHECKING
  19. from git.types import Commit_ish
  20. if TYPE_CHECKING:
  21. from git.types import PathLike
  22. from .base import Repo
  23. from git.db import GitCmdObjectDB
  24. from git.refs.reference import Reference
  25. from git.objects import Commit, TagObject, Blob, Tree
  26. from git.refs.tag import Tag
  27. # ----------------------------------------------------------------------------
  28. __all__ = (
  29. "rev_parse",
  30. "is_git_dir",
  31. "touch",
  32. "find_submodule_git_dir",
  33. "name_to_object",
  34. "short_to_long",
  35. "deref_tag",
  36. "to_commit",
  37. "find_worktree_git_dir",
  38. )
  39. def touch(filename: str) -> str:
  40. with open(filename, "ab"):
  41. pass
  42. return filename
  43. def is_git_dir(d: "PathLike") -> bool:
  44. """This is taken from the git setup.c:is_git_directory
  45. function.
  46. @throws WorkTreeRepositoryUnsupported if it sees a worktree directory. It's quite hacky to do that here,
  47. but at least clearly indicates that we don't support it.
  48. There is the unlikely danger to throw if we see directories which just look like a worktree dir,
  49. but are none."""
  50. if osp.isdir(d):
  51. if (osp.isdir(osp.join(d, "objects")) or "GIT_OBJECT_DIRECTORY" in os.environ) and osp.isdir(
  52. osp.join(d, "refs")
  53. ):
  54. headref = osp.join(d, "HEAD")
  55. return osp.isfile(headref) or (osp.islink(headref) and os.readlink(headref).startswith("refs"))
  56. elif (
  57. osp.isfile(osp.join(d, "gitdir"))
  58. and osp.isfile(osp.join(d, "commondir"))
  59. and osp.isfile(osp.join(d, "gitfile"))
  60. ):
  61. raise WorkTreeRepositoryUnsupported(d)
  62. return False
  63. def find_worktree_git_dir(dotgit: "PathLike") -> Optional[str]:
  64. """Search for a gitdir for this worktree."""
  65. try:
  66. statbuf = os.stat(dotgit)
  67. except OSError:
  68. return None
  69. if not stat.S_ISREG(statbuf.st_mode):
  70. return None
  71. try:
  72. lines = Path(dotgit).read_text().splitlines()
  73. for key, value in [line.strip().split(": ") for line in lines]:
  74. if key == "gitdir":
  75. return value
  76. except ValueError:
  77. pass
  78. return None
  79. def find_submodule_git_dir(d: "PathLike") -> Optional["PathLike"]:
  80. """Search for a submodule repo."""
  81. if is_git_dir(d):
  82. return d
  83. try:
  84. with open(d) as fp:
  85. content = fp.read().rstrip()
  86. except IOError:
  87. # it's probably not a file
  88. pass
  89. else:
  90. if content.startswith("gitdir: "):
  91. path = content[8:]
  92. if Git.is_cygwin():
  93. ## Cygwin creates submodules prefixed with `/cygdrive/...` suffixes.
  94. # Cygwin git understands Cygwin paths much better than Windows ones
  95. # Also the Cygwin tests are assuming Cygwin paths.
  96. path = cygpath(path)
  97. if not osp.isabs(path):
  98. path = osp.normpath(osp.join(osp.dirname(d), path))
  99. return find_submodule_git_dir(path)
  100. # end handle exception
  101. return None
  102. def short_to_long(odb: "GitCmdObjectDB", hexsha: str) -> Optional[bytes]:
  103. """:return: long hexadecimal sha1 from the given less-than-40 byte hexsha
  104. or None if no candidate could be found.
  105. :param hexsha: hexsha with less than 40 byte"""
  106. try:
  107. return bin_to_hex(odb.partial_to_complete_sha_hex(hexsha))
  108. except BadObject:
  109. return None
  110. # END exception handling
  111. def name_to_object(
  112. repo: "Repo", name: str, return_ref: bool = False
  113. ) -> Union[SymbolicReference, "Commit", "TagObject", "Blob", "Tree"]:
  114. """
  115. :return: object specified by the given name, hexshas ( short and long )
  116. as well as references are supported
  117. :param return_ref: if name specifies a reference, we will return the reference
  118. instead of the object. Otherwise it will raise BadObject or BadName
  119. """
  120. hexsha: Union[None, str, bytes] = None
  121. # is it a hexsha ? Try the most common ones, which is 7 to 40
  122. if repo.re_hexsha_shortened.match(name):
  123. if len(name) != 40:
  124. # find long sha for short sha
  125. hexsha = short_to_long(repo.odb, name)
  126. else:
  127. hexsha = name
  128. # END handle short shas
  129. # END find sha if it matches
  130. # if we couldn't find an object for what seemed to be a short hexsha
  131. # try to find it as reference anyway, it could be named 'aaa' for instance
  132. if hexsha is None:
  133. for base in (
  134. "%s",
  135. "refs/%s",
  136. "refs/tags/%s",
  137. "refs/heads/%s",
  138. "refs/remotes/%s",
  139. "refs/remotes/%s/HEAD",
  140. ):
  141. try:
  142. hexsha = SymbolicReference.dereference_recursive(repo, base % name)
  143. if return_ref:
  144. return SymbolicReference(repo, base % name)
  145. # END handle symbolic ref
  146. break
  147. except ValueError:
  148. pass
  149. # END for each base
  150. # END handle hexsha
  151. # didn't find any ref, this is an error
  152. if return_ref:
  153. raise BadObject("Couldn't find reference named %r" % name)
  154. # END handle return ref
  155. # tried everything ? fail
  156. if hexsha is None:
  157. raise BadName(name)
  158. # END assert hexsha was found
  159. return Object.new_from_sha(repo, hex_to_bin(hexsha))
  160. def deref_tag(tag: "Tag") -> "TagObject":
  161. """Recursively dereference a tag and return the resulting object"""
  162. while True:
  163. try:
  164. tag = tag.object
  165. except AttributeError:
  166. break
  167. # END dereference tag
  168. return tag
  169. def to_commit(obj: Object) -> Union["Commit", "TagObject"]:
  170. """Convert the given object to a commit if possible and return it"""
  171. if obj.type == "tag":
  172. obj = deref_tag(obj)
  173. if obj.type != "commit":
  174. raise ValueError("Cannot convert object %r to type commit" % obj)
  175. # END verify type
  176. return obj
  177. def rev_parse(repo: "Repo", rev: str) -> Union["Commit", "Tag", "Tree", "Blob"]:
  178. """
  179. :return: Object at the given revision, either Commit, Tag, Tree or Blob
  180. :param rev: git-rev-parse compatible revision specification as string, please see
  181. http://www.kernel.org/pub/software/scm/git/docs/git-rev-parse.html
  182. for details
  183. :raise BadObject: if the given revision could not be found
  184. :raise ValueError: If rev couldn't be parsed
  185. :raise IndexError: If invalid reflog index is specified"""
  186. # colon search mode ?
  187. if rev.startswith(":/"):
  188. # colon search mode
  189. raise NotImplementedError("commit by message search ( regex )")
  190. # END handle search
  191. obj: Union[Commit_ish, "Reference", None] = None
  192. ref = None
  193. output_type = "commit"
  194. start = 0
  195. parsed_to = 0
  196. lr = len(rev)
  197. while start < lr:
  198. if rev[start] not in "^~:@":
  199. start += 1
  200. continue
  201. # END handle start
  202. token = rev[start]
  203. if obj is None:
  204. # token is a rev name
  205. if start == 0:
  206. ref = repo.head.ref
  207. else:
  208. if token == "@":
  209. ref = cast("Reference", name_to_object(repo, rev[:start], return_ref=True))
  210. else:
  211. obj = cast(Commit_ish, name_to_object(repo, rev[:start]))
  212. # END handle token
  213. # END handle refname
  214. else:
  215. assert obj is not None
  216. if ref is not None:
  217. obj = cast("Commit", ref.commit)
  218. # END handle ref
  219. # END initialize obj on first token
  220. start += 1
  221. # try to parse {type}
  222. if start < lr and rev[start] == "{":
  223. end = rev.find("}", start)
  224. if end == -1:
  225. raise ValueError("Missing closing brace to define type in %s" % rev)
  226. output_type = rev[start + 1 : end] # exclude brace
  227. # handle type
  228. if output_type == "commit":
  229. pass # default
  230. elif output_type == "tree":
  231. try:
  232. obj = cast(Commit_ish, obj)
  233. obj = to_commit(obj).tree
  234. except (AttributeError, ValueError):
  235. pass # error raised later
  236. # END exception handling
  237. elif output_type in ("", "blob"):
  238. obj = cast("TagObject", obj)
  239. if obj and obj.type == "tag":
  240. obj = deref_tag(obj)
  241. else:
  242. # cannot do anything for non-tags
  243. pass
  244. # END handle tag
  245. elif token == "@":
  246. # try single int
  247. assert ref is not None, "Require Reference to access reflog"
  248. revlog_index = None
  249. try:
  250. # transform reversed index into the format of our revlog
  251. revlog_index = -(int(output_type) + 1)
  252. except ValueError as e:
  253. # TODO: Try to parse the other date options, using parse_date
  254. # maybe
  255. raise NotImplementedError("Support for additional @{...} modes not implemented") from e
  256. # END handle revlog index
  257. try:
  258. entry = ref.log_entry(revlog_index)
  259. except IndexError as e:
  260. raise IndexError("Invalid revlog index: %i" % revlog_index) from e
  261. # END handle index out of bound
  262. obj = Object.new_from_sha(repo, hex_to_bin(entry.newhexsha))
  263. # make it pass the following checks
  264. output_type = ""
  265. else:
  266. raise ValueError("Invalid output type: %s ( in %s )" % (output_type, rev))
  267. # END handle output type
  268. # empty output types don't require any specific type, its just about dereferencing tags
  269. if output_type and obj and obj.type != output_type:
  270. raise ValueError("Could not accommodate requested object type %r, got %s" % (output_type, obj.type))
  271. # END verify output type
  272. start = end + 1 # skip brace
  273. parsed_to = start
  274. continue
  275. # END parse type
  276. # try to parse a number
  277. num = 0
  278. if token != ":":
  279. found_digit = False
  280. while start < lr:
  281. if rev[start] in digits:
  282. num = num * 10 + int(rev[start])
  283. start += 1
  284. found_digit = True
  285. else:
  286. break
  287. # END handle number
  288. # END number parse loop
  289. # no explicit number given, 1 is the default
  290. # It could be 0 though
  291. if not found_digit:
  292. num = 1
  293. # END set default num
  294. # END number parsing only if non-blob mode
  295. parsed_to = start
  296. # handle hierarchy walk
  297. try:
  298. obj = cast(Commit_ish, obj)
  299. if token == "~":
  300. obj = to_commit(obj)
  301. for _ in range(num):
  302. obj = obj.parents[0]
  303. # END for each history item to walk
  304. elif token == "^":
  305. obj = to_commit(obj)
  306. # must be n'th parent
  307. if num:
  308. obj = obj.parents[num - 1]
  309. elif token == ":":
  310. if obj.type != "tree":
  311. obj = obj.tree
  312. # END get tree type
  313. obj = obj[rev[start:]]
  314. parsed_to = lr
  315. else:
  316. raise ValueError("Invalid token: %r" % token)
  317. # END end handle tag
  318. except (IndexError, AttributeError) as e:
  319. raise BadName(
  320. f"Invalid revision spec '{rev}' - not enough " f"parent commits to reach '{token}{int(num)}'"
  321. ) from e
  322. # END exception handling
  323. # END parse loop
  324. # still no obj ? Its probably a simple name
  325. if obj is None:
  326. obj = cast(Commit_ish, name_to_object(repo, rev))
  327. parsed_to = lr
  328. # END handle simple name
  329. if obj is None:
  330. raise ValueError("Revision specifier could not be parsed: %s" % rev)
  331. if parsed_to != lr:
  332. raise ValueError("Didn't consume complete rev spec %s, consumed part: %s" % (rev, rev[:parsed_to]))
  333. return obj