| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309 |
- """Interface for accessing the file system with automatic caching.
- The idea is to cache the results of any file system state reads during
- a single transaction. This has two main benefits:
- * This avoids redundant syscalls, as we won't perform the same OS
- operations multiple times.
- * This makes it easier to reason about concurrent FS updates, as different
- operations targeting the same paths can't report different state during
- a transaction.
- Note that this only deals with reading state, not writing.
- Properties maintained by the API:
- * The contents of the file are always from the same or later time compared
- to the reported mtime of the file, even if mtime is queried after reading
- a file.
- * Repeating an operation produces the same result as the first one during
- a transaction.
- * Call flush() to start a new transaction (flush the caches).
- The API is a bit limited. It's easy to add new cached operations, however.
- You should perform all file system reads through the API to actually take
- advantage of the benefits.
- """
- from __future__ import annotations
- import os
- import stat
- from mypy_extensions import mypyc_attr
- from mypy.util import hash_digest
@mypyc_attr(allow_interpreted_subclasses=True)  # for tests
class FileSystemCache:
    """Read-only view of the file system that is stable within a transaction.

    Results (and errors) of stat, listdir and read operations are memoized
    until flush() is called, so that repeated queries for the same path
    give the same answer and do not hit the operating system again.
    """

    def __init__(self) -> None:
        # The package root is not flushed with the caches.
        # It is set by set_package_root() below.
        self.package_root: list[str] = []
        self.flush()

    def set_package_root(self, package_root: list[str]) -> None:
        """Set the package roots consulted by init_under_package_root()."""
        self.package_root = package_root

    def flush(self) -> None:
        """Start another transaction and empty all caches."""
        # Successful stat() results, including synthesized __init__.py entries.
        self.stat_cache: dict[str, os.stat_result] = {}
        # Copies of the errors raised by failed os.stat() calls.
        self.stat_error_cache: dict[str, OSError] = {}
        # Directory listings and listing errors, keyed by normalized path.
        self.listdir_cache: dict[str, list[str]] = {}
        self.listdir_error_cache: dict[str, OSError] = {}
        # Memoized answers of isfile_case() and exists_case().
        self.isfile_case_cache: dict[str, bool] = {}
        self.exists_case_cache: dict[str, bool] = {}
        # File contents, read errors and content hashes.
        self.read_cache: dict[str, bytes] = {}
        self.read_error_cache: dict[str, Exception] = {}
        self.hash_cache: dict[str, str] = {}
        # Normalized directories for which a fake __init__.py was created.
        self.fake_package_cache: set[str] = set()

    def stat(self, path: str) -> os.stat_result:
        """Return the (cached) os.stat() result for path.

        If os.stat() fails and path is an __init__.py under a package root
        (see init_under_package_root()), a fake stat result is synthesized
        instead of failing.

        Raises:
            OSError: if the path cannot be stat'ed. The failure is cached,
                and later calls raise a fresh copy of the cached error.
        """
        if path in self.stat_cache:
            return self.stat_cache[path]
        if path in self.stat_error_cache:
            raise copy_os_error(self.stat_error_cache[path])
        try:
            st = os.stat(path)
        except OSError as err:
            if self.init_under_package_root(path):
                try:
                    return self._fake_init(path)
                except OSError:
                    # Fall through and report the original stat failure.
                    pass
            # Take a copy to get rid of associated traceback and frame objects.
            # Just assigning to __traceback__ doesn't free them.
            self.stat_error_cache[path] = copy_os_error(err)
            raise err
        self.stat_cache[path] = st
        return st

    def init_under_package_root(self, path: str) -> bool:
        """Is this path an __init__.py under a package root?

        This is used to detect packages that don't contain __init__.py
        files, which is needed to support Bazel.  The function should
        only be called for non-existing files.

        It will return True if it refers to a __init__.py file that
        Bazel would create, so that at runtime Python would think the
        directory containing it is a package.  For this to work you
        must pass one or more package roots using the --package-root
        flag.

        As an exceptional case, any directory that is a package root
        itself will not be considered to contain a __init__.py file.
        This is different from the rules Bazel itself applies, but is
        necessary for mypy to properly distinguish packages from other
        directories.

        See https://docs.bazel.build/versions/master/be/python.html,
        where this behavior is described under legacy_create_init.
        """
        if not self.package_root:
            return False
        dirname, basename = os.path.split(path)
        if basename != "__init__.py":
            return False
        if not os.path.basename(dirname).isidentifier():
            # Can't put an __init__.py in a place that's not an identifier
            return False
        try:
            st = self.stat(dirname)
        except OSError:
            return False
        else:
            if not stat.S_ISDIR(st.st_mode):
                return False
        ok = False
        _drive, path = os.path.splitdrive(path)  # Ignore Windows drive name
        if os.path.isabs(path):
            path = os.path.relpath(path)
        path = os.path.normpath(path)
        for root in self.package_root:
            if path.startswith(root):
                if path == root + basename:
                    # A package root itself is never a package.
                    ok = False
                    break
                else:
                    ok = True
        return ok

    def _fake_init(self, path: str) -> os.stat_result:
        """Prime the cache with a fake __init__.py file.

        This makes code that looks for path believe an empty file by
        that name exists.  Should only be called after
        init_under_package_root() returns True.

        Raises:
            OSError: if the containing directory cannot be stat'ed.
        """
        dirname, basename = os.path.split(path)
        assert basename == "__init__.py", path
        assert not os.path.exists(path), path  # Not cached!
        dirname = os.path.normpath(dirname)
        st = self.stat(dirname)  # May raise OSError
        # Get stat result as a list so we can modify it.
        seq: list[float] = list(st)
        # Pretend to be an empty, read-only regular file; the remaining
        # fields are inherited from the directory's stat result.
        seq[stat.ST_MODE] = stat.S_IFREG | 0o444
        seq[stat.ST_INO] = 1
        seq[stat.ST_NLINK] = 1
        seq[stat.ST_SIZE] = 0
        st = os.stat_result(seq)
        self.stat_cache[path] = st
        # Make listdir() and read() also pretend this file exists.
        self.fake_package_cache.add(dirname)
        return st

    def listdir(self, path: str) -> list[str]:
        """Return the (cached) directory listing of path.

        The path is normalized before lookup. A fake "__init__.py" entry is
        included for directories registered by _fake_init(). The returned
        list is the cached object itself, not a copy.

        Raises:
            OSError: if the directory cannot be listed. The failure is
                cached, and later calls raise a fresh copy of it.
        """
        path = os.path.normpath(path)
        if path in self.listdir_cache:
            res = self.listdir_cache[path]
            # Check the fake cache.
            if path in self.fake_package_cache and "__init__.py" not in res:
                res.append("__init__.py")  # Updates the result as well as the cache
            return res
        if path in self.listdir_error_cache:
            raise copy_os_error(self.listdir_error_cache[path])
        try:
            results = os.listdir(path)
        except OSError as err:
            # Like above, take a copy to reduce memory use.
            self.listdir_error_cache[path] = copy_os_error(err)
            raise err
        self.listdir_cache[path] = results
        # Check the fake cache.
        if path in self.fake_package_cache and "__init__.py" not in results:
            results.append("__init__.py")
        return results

    def isfile(self, path: str) -> bool:
        """Return whether path can be stat'ed and is a regular file."""
        try:
            st = self.stat(path)
        except OSError:
            return False
        return stat.S_ISREG(st.st_mode)

    def isfile_case(self, path: str, prefix: str) -> bool:
        """Return whether path exists and is a file.

        On case-insensitive filesystems (like Mac or Windows) this returns
        False if the case of path's last component does not exactly match
        the case found in the filesystem.

        We check also the case of other path components up to prefix.
        For example, if path is 'user-stubs/pack/mod.pyi' and prefix is 'user-stubs',
        we check that the case of 'pack' and 'mod.pyi' matches exactly, 'user-stubs' will be
        case insensitive on case insensitive filesystems.

        The caller must ensure that prefix is a valid file system prefix of path.
        """
        if not self.isfile(path):
            # Fast path
            return False
        if path in self.isfile_case_cache:
            return self.isfile_case_cache[path]
        head, tail = os.path.split(path)
        if not tail:
            self.isfile_case_cache[path] = False
            return False
        try:
            names = self.listdir(head)
            # This allows one to check file name case sensitively in
            # case-insensitive filesystems.
            res = tail in names
        except OSError:
            res = False
        if res:
            # Also recursively check the other path components in case sensitive way.
            res = self.exists_case(head, prefix)
        self.isfile_case_cache[path] = res
        return res

    def exists_case(self, path: str, prefix: str) -> bool:
        """Return whether path exists - checking path components in case sensitive
        fashion, up to prefix.
        """
        if path in self.exists_case_cache:
            return self.exists_case_cache[path]
        head, tail = os.path.split(path)
        if not head.startswith(prefix) or not tail:
            # Only perform the check for paths under prefix.
            self.exists_case_cache[path] = True
            return True
        try:
            names = self.listdir(head)
            # This allows one to check file name case sensitively in
            # case-insensitive filesystems.
            res = tail in names
        except OSError:
            res = False
        if res:
            # Also recursively check other path components.
            res = self.exists_case(head, prefix)
        self.exists_case_cache[path] = res
        return res

    def isdir(self, path: str) -> bool:
        """Return whether path can be stat'ed and is a directory."""
        try:
            st = self.stat(path)
        except OSError:
            return False
        return stat.S_ISDIR(st.st_mode)

    def exists(self, path: str) -> bool:
        """Return whether path can be stat'ed.

        Any OSError from stat() counts as "does not exist", not only
        FileNotFoundError. A path such as 'file.txt/child' fails with
        NotADirectoryError, and an unreadable parent fails with
        PermissionError; a boolean predicate should not let these escape.
        This matches isfile() and isdir().
        """
        try:
            self.stat(path)
        except OSError:
            return False
        return True

    def read(self, path: str) -> bytes:
        """Return the (cached) contents of the file at path.

        A fake __init__.py created by _fake_init() reads as empty. The hash
        of the contents is stored in the hash cache as a side effect.

        Raises:
            OSError: if the path cannot be stat'ed or read. Read failures
                are cached and the cached exception is raised again on
                later calls.
        """
        if path in self.read_cache:
            return self.read_cache[path]
        if path in self.read_error_cache:
            raise self.read_error_cache[path]

        # Need to stat first so that the contents of file are from no
        # earlier instant than the mtime reported by self.stat().
        self.stat(path)

        dirname, basename = os.path.split(path)
        dirname = os.path.normpath(dirname)
        # Check the fake cache.
        if basename == "__init__.py" and dirname in self.fake_package_cache:
            data = b""
        else:
            try:
                with open(path, "rb") as f:
                    data = f.read()
            except OSError as err:
                self.read_error_cache[path] = err
                raise

        self.read_cache[path] = data
        self.hash_cache[path] = hash_digest(data)
        return data

    def hash_digest(self, path: str) -> str:
        """Return the hash of the file's contents, reading the file if needed.

        Raises:
            OSError: propagated from read() if the file cannot be read.
        """
        if path not in self.hash_cache:
            self.read(path)
        return self.hash_cache[path]

    def samefile(self, f1: str, f2: str) -> bool:
        """Return whether the cached stat results of f1 and f2 denote the same file.

        Raises:
            OSError: if either path cannot be stat'ed.
        """
        s1 = self.stat(f1)
        s2 = self.stat(f2)
        return os.path.samestat(s1, s2)
def copy_os_error(e: OSError) -> OSError:
    """Return a fresh OSError carrying the same details as e.

    The duplicate is built from e.args and then given e's errno, strerror
    and filename; filename2 is copied only when it is truthy. The new
    object is never raised here, so it does not share e's traceback.
    """
    duplicate = OSError(*e.args)
    for field in ("errno", "strerror", "filename"):
        setattr(duplicate, field, getattr(e, field))
    second_name = e.filename2
    if second_name:
        duplicate.filename2 = second_name
    return duplicate
|