finder.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. """Functions related to finding and loading plugins."""
  2. from __future__ import annotations
  3. import configparser
  4. import importlib.metadata
  5. import inspect
  6. import itertools
  7. import logging
  8. import sys
  9. from typing import Any
  10. from typing import Generator
  11. from typing import Iterable
  12. from typing import NamedTuple
  13. from flake8 import utils
  14. from flake8.defaults import VALID_CODE_PREFIX
  15. from flake8.exceptions import ExecutionError
  16. from flake8.exceptions import FailedToLoadPlugin
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Entry point groups which are treated as flake8 plugins during discovery.
FLAKE8_GROUPS = frozenset(("flake8.extension", "flake8.report"))

# Distributions which are skipped during discovery, mapped to the flake8
# version that is reported in the "obsolete" warning logged for them.
BANNED_PLUGINS = {
    "flake8-colors": "5.0",
    "flake8-per-file-ignores": "3.7",
}
class Plugin(NamedTuple):
    """A plugin before loading."""

    # name of the distribution providing the plugin (the literal ``"local"``
    # for plugins declared in the ``flake8:local-plugins`` config section)
    package: str
    # version of that distribution (also ``"local"`` for local plugins)
    version: str
    # entry point which is loaded later on to obtain the plugin object
    entry_point: importlib.metadata.EntryPoint
  28. class LoadedPlugin(NamedTuple):
  29. """Represents a plugin after being imported."""
  30. plugin: Plugin
  31. obj: Any
  32. parameters: dict[str, bool]
  33. @property
  34. def entry_name(self) -> str:
  35. """Return the name given in the packaging metadata."""
  36. return self.plugin.entry_point.name
  37. @property
  38. def display_name(self) -> str:
  39. """Return the name for use in user-facing / error messages."""
  40. return f"{self.plugin.package}[{self.entry_name}]"
class Checkers(NamedTuple):
    """Classified plugins needed for checking."""

    # plugins which accept a ``tree`` parameter
    tree: list[LoadedPlugin]
    # plugins which accept a ``logical_line`` parameter
    logical_line: list[LoadedPlugin]
    # plugins which accept a ``physical_line`` parameter
    physical_line: list[LoadedPlugin]
  46. class Plugins(NamedTuple):
  47. """Classified plugins."""
  48. checkers: Checkers
  49. reporters: dict[str, LoadedPlugin]
  50. disabled: list[LoadedPlugin]
  51. def all_plugins(self) -> Generator[LoadedPlugin, None, None]:
  52. """Return an iterator over all :class:`LoadedPlugin`s."""
  53. yield from self.checkers.tree
  54. yield from self.checkers.logical_line
  55. yield from self.checkers.physical_line
  56. yield from self.reporters.values()
  57. def versions_str(self) -> str:
  58. """Return a user-displayed list of plugin versions."""
  59. return ", ".join(
  60. sorted(
  61. {
  62. f"{loaded.plugin.package}: {loaded.plugin.version}"
  63. for loaded in self.all_plugins()
  64. if loaded.plugin.package not in {"flake8", "local"}
  65. }
  66. )
  67. )
  68. class PluginOptions(NamedTuple):
  69. """Options related to plugin loading."""
  70. local_plugin_paths: tuple[str, ...]
  71. enable_extensions: frozenset[str]
  72. require_plugins: frozenset[str]
  73. @classmethod
  74. def blank(cls) -> PluginOptions:
  75. """Make a blank PluginOptions, mostly used for tests."""
  76. return cls(
  77. local_plugin_paths=(),
  78. enable_extensions=frozenset(),
  79. require_plugins=frozenset(),
  80. )
  81. def _parse_option(
  82. cfg: configparser.RawConfigParser,
  83. cfg_opt_name: str,
  84. opt: str | None,
  85. ) -> list[str]:
  86. # specified on commandline: use that
  87. if opt is not None:
  88. return utils.parse_comma_separated_list(opt)
  89. else:
  90. # ideally this would reuse our config parsing framework but we need to
  91. # parse this from preliminary options before plugins are enabled
  92. for opt_name in (cfg_opt_name, cfg_opt_name.replace("_", "-")):
  93. val = cfg.get("flake8", opt_name, fallback=None)
  94. if val is not None:
  95. return utils.parse_comma_separated_list(val)
  96. else:
  97. return []
  98. def parse_plugin_options(
  99. cfg: configparser.RawConfigParser,
  100. cfg_dir: str,
  101. *,
  102. enable_extensions: str | None,
  103. require_plugins: str | None,
  104. ) -> PluginOptions:
  105. """Parse plugin loading related options."""
  106. paths_s = cfg.get("flake8:local-plugins", "paths", fallback="").strip()
  107. paths = utils.parse_comma_separated_list(paths_s)
  108. paths = utils.normalize_paths(paths, cfg_dir)
  109. return PluginOptions(
  110. local_plugin_paths=tuple(paths),
  111. enable_extensions=frozenset(
  112. _parse_option(cfg, "enable_extensions", enable_extensions),
  113. ),
  114. require_plugins=frozenset(
  115. _parse_option(cfg, "require_plugins", require_plugins),
  116. ),
  117. )
  118. def _flake8_plugins(
  119. eps: Iterable[importlib.metadata.EntryPoint],
  120. name: str,
  121. version: str,
  122. ) -> Generator[Plugin, None, None]:
  123. pyflakes_meta = importlib.metadata.distribution("pyflakes").metadata
  124. pycodestyle_meta = importlib.metadata.distribution("pycodestyle").metadata
  125. for ep in eps:
  126. if ep.group not in FLAKE8_GROUPS:
  127. continue
  128. if ep.name == "F":
  129. yield Plugin(pyflakes_meta["name"], pyflakes_meta["version"], ep)
  130. elif ep.name in "EW":
  131. # pycodestyle provides both `E` and `W` -- but our default select
  132. # handles those
  133. # ideally pycodestyle's plugin entrypoints would exactly represent
  134. # the codes they produce...
  135. yield Plugin(
  136. pycodestyle_meta["name"], pycodestyle_meta["version"], ep
  137. )
  138. else:
  139. yield Plugin(name, version, ep)
  140. def _find_importlib_plugins() -> Generator[Plugin, None, None]:
  141. # some misconfigured pythons (RHEL) have things on `sys.path` twice
  142. seen = set()
  143. for dist in importlib.metadata.distributions():
  144. # assigned to prevent continual reparsing
  145. eps = dist.entry_points
  146. # perf: skip parsing `.metadata` (slow) if no entry points match
  147. if not any(ep.group in FLAKE8_GROUPS for ep in eps):
  148. continue
  149. # assigned to prevent continual reparsing
  150. meta = dist.metadata
  151. if meta["name"] in seen:
  152. continue
  153. else:
  154. seen.add(meta["name"])
  155. if meta["name"] in BANNED_PLUGINS:
  156. LOG.warning(
  157. "%s plugin is obsolete in flake8>=%s",
  158. meta["name"],
  159. BANNED_PLUGINS[meta["name"]],
  160. )
  161. continue
  162. elif meta["name"] == "flake8":
  163. # special case flake8 which provides plugins for pyflakes /
  164. # pycodestyle
  165. yield from _flake8_plugins(eps, meta["name"], meta["version"])
  166. continue
  167. for ep in eps:
  168. if ep.group in FLAKE8_GROUPS:
  169. yield Plugin(meta["name"], meta["version"], ep)
  170. def _find_local_plugins(
  171. cfg: configparser.RawConfigParser,
  172. ) -> Generator[Plugin, None, None]:
  173. for plugin_type in ("extension", "report"):
  174. group = f"flake8.{plugin_type}"
  175. for plugin_s in utils.parse_comma_separated_list(
  176. cfg.get("flake8:local-plugins", plugin_type, fallback="").strip(),
  177. regexp=utils.LOCAL_PLUGIN_LIST_RE,
  178. ):
  179. name, _, entry_str = plugin_s.partition("=")
  180. name, entry_str = name.strip(), entry_str.strip()
  181. ep = importlib.metadata.EntryPoint(name, entry_str, group)
  182. yield Plugin("local", "local", ep)
  183. def _check_required_plugins(
  184. plugins: list[Plugin],
  185. expected: frozenset[str],
  186. ) -> None:
  187. plugin_names = {
  188. utils.normalize_pypi_name(plugin.package) for plugin in plugins
  189. }
  190. expected_names = {utils.normalize_pypi_name(name) for name in expected}
  191. missing_plugins = expected_names - plugin_names
  192. if missing_plugins:
  193. raise ExecutionError(
  194. f"required plugins were not installed!\n"
  195. f"- installed: {', '.join(sorted(plugin_names))}\n"
  196. f"- expected: {', '.join(sorted(expected_names))}\n"
  197. f"- missing: {', '.join(sorted(missing_plugins))}"
  198. )
  199. def find_plugins(
  200. cfg: configparser.RawConfigParser,
  201. opts: PluginOptions,
  202. ) -> list[Plugin]:
  203. """Discovers all plugins (but does not load them)."""
  204. ret = [*_find_importlib_plugins(), *_find_local_plugins(cfg)]
  205. # for determinism, sort the list
  206. ret.sort()
  207. _check_required_plugins(ret, opts.require_plugins)
  208. return ret
  209. def _parameters_for(func: Any) -> dict[str, bool]:
  210. """Return the parameters for the plugin.
  211. This will inspect the plugin and return either the function parameters
  212. if the plugin is a function or the parameters for ``__init__`` after
  213. ``self`` if the plugin is a class.
  214. :returns:
  215. A dictionary mapping the parameter name to whether or not it is
  216. required (a.k.a., is positional only/does not have a default).
  217. """
  218. is_class = not inspect.isfunction(func)
  219. if is_class:
  220. func = func.__init__
  221. parameters = {
  222. parameter.name: parameter.default is inspect.Parameter.empty
  223. for parameter in inspect.signature(func).parameters.values()
  224. if parameter.kind is inspect.Parameter.POSITIONAL_OR_KEYWORD
  225. }
  226. if is_class:
  227. parameters.pop("self", None)
  228. return parameters
  229. def _load_plugin(plugin: Plugin) -> LoadedPlugin:
  230. try:
  231. obj = plugin.entry_point.load()
  232. except Exception as e:
  233. raise FailedToLoadPlugin(plugin.package, e)
  234. if not callable(obj):
  235. err = TypeError("expected loaded plugin to be callable")
  236. raise FailedToLoadPlugin(plugin.package, err)
  237. return LoadedPlugin(plugin, obj, _parameters_for(obj))
  238. def _import_plugins(
  239. plugins: list[Plugin],
  240. opts: PluginOptions,
  241. ) -> list[LoadedPlugin]:
  242. sys.path.extend(opts.local_plugin_paths)
  243. return [_load_plugin(p) for p in plugins]
  244. def _classify_plugins(
  245. plugins: list[LoadedPlugin],
  246. opts: PluginOptions,
  247. ) -> Plugins:
  248. tree = []
  249. logical_line = []
  250. physical_line = []
  251. reporters = {}
  252. disabled = []
  253. for loaded in plugins:
  254. if (
  255. getattr(loaded.obj, "off_by_default", False)
  256. and loaded.plugin.entry_point.name not in opts.enable_extensions
  257. ):
  258. disabled.append(loaded)
  259. elif loaded.plugin.entry_point.group == "flake8.report":
  260. reporters[loaded.entry_name] = loaded
  261. elif "tree" in loaded.parameters:
  262. tree.append(loaded)
  263. elif "logical_line" in loaded.parameters:
  264. logical_line.append(loaded)
  265. elif "physical_line" in loaded.parameters:
  266. physical_line.append(loaded)
  267. else:
  268. raise NotImplementedError(f"what plugin type? {loaded}")
  269. for loaded in itertools.chain(tree, logical_line, physical_line):
  270. if not VALID_CODE_PREFIX.match(loaded.entry_name):
  271. raise ExecutionError(
  272. f"plugin code for `{loaded.display_name}` does not match "
  273. f"{VALID_CODE_PREFIX.pattern}"
  274. )
  275. return Plugins(
  276. checkers=Checkers(
  277. tree=tree,
  278. logical_line=logical_line,
  279. physical_line=physical_line,
  280. ),
  281. reporters=reporters,
  282. disabled=disabled,
  283. )
  284. def load_plugins(
  285. plugins: list[Plugin],
  286. opts: PluginOptions,
  287. ) -> Plugins:
  288. """Load and classify all flake8 plugins.
  289. - first: extends ``sys.path`` with ``paths`` (to import local plugins)
  290. - next: converts the ``Plugin``s to ``LoadedPlugins``
  291. - finally: classifies plugins into their specific types
  292. """
  293. return _classify_plugins(_import_plugins(plugins, opts), opts)