| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- """Basic introspection of modules."""
- from __future__ import annotations
- import importlib
- import inspect
- import os
- import pkgutil
- import queue
- import sys
- from multiprocessing import Process, Queue
- from types import ModuleType
class ModuleProperties:
    """Plain record of the facts gathered about one imported module/package."""

    # Note that all __init__ args must have default values
    def __init__(
        self,
        name: str = "",
        file: str | None = None,
        path: list[str] | None = None,
        all: list[str] | None = None,
        is_c_module: bool = False,
        subpackages: list[str] | None = None,
    ) -> None:
        # Values mirroring the module's dunder attributes.
        self.name = name  # __name__
        self.file = file  # __file__
        self.path = path  # __path__
        self.all = all  # __all__
        # Remaining introspection results.
        self.is_c_module = is_c_module
        # A missing (or empty) argument is stored as a fresh empty list.
        self.subpackages = subpackages if subpackages else []
def is_c_module(module: ModuleType) -> bool:
    """Return True if *module* has no ``__file__`` or its file is a binary extension.

    A module counts as a C module when its ``__file__`` entry is missing or None,
    or when the file name ends in ``.so``, ``.pyd`` or ``.dll``.
    """
    filename = module.__dict__.get("__file__")
    if filename is None:
        # Could be a namespace package. These must be handled through
        # introspection, since there is no source file.
        return True
    _, extension = os.path.splitext(filename)
    return extension in (".so", ".pyd", ".dll")
class InspectError(Exception):
    """Raised when a module or package cannot be introspected."""
def get_package_properties(package_id: str) -> ModuleProperties:
    """Import *package_id* and collect its properties via runtime introspection.

    Returns a ModuleProperties holding the module's name, file, path, ``__all__``,
    whether it is a C module, and the qualified names of its subpackages.

    Raises InspectError if importing the target raises anything at all.
    """
    try:
        module = importlib.import_module(package_id)
    except BaseException as exc:
        raise InspectError(str(exc)) from exc

    mod_name = getattr(module, "__name__", package_id)
    mod_file = getattr(module, "__file__", None)

    # Only a genuine list is accepted as a package search path.
    raw_path = getattr(module, "__path__", None)
    search_path: list[str] | None = raw_path if isinstance(raw_path, list) else None

    exported = getattr(module, "__all__", None)
    if exported is not None:
        try:
            exported = list(exported)
        except Exception:
            # __all__ is not iterable; treat it as absent.
            exported = None

    c_module = is_c_module(module)

    subpackages: list[str] = []
    if search_path is not None:
        # A regular package: walk its search path, ignoring import errors on the way.
        walker = pkgutil.walk_packages(
            search_path, prefix=module.__name__ + ".", onerror=lambda r: None
        )
        subpackages = [qualified_name for _, qualified_name, _ in walker]
    elif c_module:
        # Object has no path but is a C extension module: find its sub-packages
        # with the inspect module, keeping only members that are modules whose
        # own __name__ is "<package>.<attribute>".
        for attr, member in inspect.getmembers(module):
            if not inspect.ismodule(member):
                continue
            member_name = member.__name__
            qualified = module.__name__ + "." + attr
            if member_name == qualified:
                subpackages.append(qualified)
    # Otherwise it's a module inside a package. There's nothing else to walk/yield.

    return ModuleProperties(
        name=mod_name,
        file=mod_file,
        path=search_path,
        all=exported,
        is_c_module=c_module,
        subpackages=subpackages,
    )
def worker(tasks: Queue[str], results: Queue[str | ModuleProperties], sys_path: list[str]) -> None:
    """Run the request/reply loop of a worker introspection process.

    Replaces ``sys.path`` with *sys_path*, then loops forever: takes a module name
    from *tasks* and puts either its ModuleProperties or, when introspection fails
    with InspectError, the error message string on *results*.
    """
    sys.path = sys_path
    while True:
        module_id = tasks.get()
        outcome: str | ModuleProperties
        try:
            outcome = get_package_properties(module_id)
        except InspectError as err:
            outcome = str(err)
        results.put(outcome)
class ModuleInspect:
    """Perform runtime introspection of modules in a separate process.

    Reuse the process for multiple modules for efficiency. However, if there is an
    error, retry using a fresh process to avoid cross-contamination of state between
    modules.

    We use a separate process to isolate us from many side effects. For example, the
    import of a module may kill the current process, and we want to recover from that.

    Always use in a with statement for proper clean-up:

      with ModuleInspect() as m:
          p = m.get_package_properties('urllib.parse')
    """

    def __init__(self) -> None:
        self._start()

    def _start(self) -> None:
        """Create fresh task/result queues and launch a new worker process."""
        self.tasks: Queue[str] = Queue()
        self.results: Queue[ModuleProperties | str] = Queue()
        self.proc = Process(target=worker, args=(self.tasks, self.results, sys.path))
        self.proc.start()
        self.counter = 0  # Number of successful roundtrips

    def close(self) -> None:
        """Free any resources used.

        Terminates the worker process and waits for it to exit so that it is
        reaped instead of lingering as a zombie.
        """
        self.proc.terminate()
        # An imported module may have installed a SIGTERM handler in the worker, so
        # don't wait forever: fall back to a hard kill if it is still running.
        self.proc.join(timeout=1.0)
        if self.proc.is_alive():
            self.proc.kill()
            self.proc.join()

    def get_package_properties(self, package_id: str) -> ModuleProperties:
        """Return some properties of a module/package using runtime introspection.

        Raise InspectError if the target couldn't be imported.
        Raise RuntimeError if the worker process does not answer in time.
        """
        self.tasks.put(package_id)
        try:
            res = self._get_from_queue()
        except RuntimeError:
            # The worker is still busy with this request. Replace it so that its
            # late reply cannot be mistaken for the answer to a later request.
            self.close()
            self._start()
            raise
        if res is None:
            # The process died; recover and report error.
            self._start()
            raise InspectError(f"Process died when importing {package_id!r}")
        if isinstance(res, str):
            # Error importing module
            if self.counter > 0:
                # Also try with a fresh process. Maybe one of the previous imports has
                # corrupted some global state. _start() resets the counter, so this
                # retry happens at most once.
                self.close()
                self._start()
                return self.get_package_properties(package_id)
            raise InspectError(res)
        self.counter += 1
        return res

    def _get_from_queue(self) -> ModuleProperties | str | None:
        """Get value from the queue.

        Return the value read from the queue, or None if the process unexpectedly died.
        Raise RuntimeError if nothing arrives within the polling budget.
        """
        max_iter = 600
        # Poll in short slices so a dead worker is noticed promptly.
        for _ in range(max_iter):
            try:
                return self.results.get(timeout=0.05)
            except queue.Empty:
                if not self.proc.is_alive():
                    return None
        raise RuntimeError("Timeout waiting for subprocess")

    def __enter__(self) -> ModuleInspect:
        return self

    def __exit__(self, *args: object) -> None:
        self.close()
|