finder.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370
  1. """Functions related to finding and loading plugins."""
  2. import configparser
  3. import inspect
  4. import itertools
  5. import logging
  6. import re
  7. import sys
  8. from typing import Any
  9. from typing import Dict
  10. from typing import FrozenSet
  11. from typing import Generator
  12. from typing import Iterable
  13. from typing import List
  14. from typing import NamedTuple
  15. from typing import Optional
  16. from typing import Tuple
  17. from flake8 import utils
  18. from flake8._compat import importlib_metadata
  19. from flake8.exceptions import ExecutionError
  20. from flake8.exceptions import FailedToLoadPlugin
  21. LOG = logging.getLogger(__name__)
  22. VALID_CODE = re.compile("^[A-Z]{1,3}[0-9]{0,3}$", re.ASCII)
  23. FLAKE8_GROUPS = frozenset(("flake8.extension", "flake8.report"))
  24. BANNED_PLUGINS = {
  25. "flake8-colors": "5.0",
  26. "flake8-per-file-ignores": "3.7",
  27. }
  28. class Plugin(NamedTuple):
  29. """A plugin before loading."""
  30. package: str
  31. version: str
  32. entry_point: importlib_metadata.EntryPoint
  33. class LoadedPlugin(NamedTuple):
  34. """Represents a plugin after being imported."""
  35. plugin: Plugin
  36. obj: Any
  37. parameters: Dict[str, bool]
  38. @property
  39. def entry_name(self) -> str:
  40. """Return the name given in the packaging metadata."""
  41. return self.plugin.entry_point.name
  42. @property
  43. def display_name(self) -> str:
  44. """Return the name for use in user-facing / error messages."""
  45. return f"{self.plugin.package}[{self.entry_name}]"
  46. class Checkers(NamedTuple):
  47. """Classified plugins needed for checking."""
  48. tree: List[LoadedPlugin]
  49. logical_line: List[LoadedPlugin]
  50. physical_line: List[LoadedPlugin]
  51. class Plugins(NamedTuple):
  52. """Classified plugins."""
  53. checkers: Checkers
  54. reporters: Dict[str, LoadedPlugin]
  55. disabled: List[LoadedPlugin]
  56. def all_plugins(self) -> Generator[LoadedPlugin, None, None]:
  57. """Return an iterator over all :class:`LoadedPlugin`s."""
  58. yield from self.checkers.tree
  59. yield from self.checkers.logical_line
  60. yield from self.checkers.physical_line
  61. yield from self.reporters.values()
  62. def versions_str(self) -> str:
  63. """Return a user-displayed list of plugin versions."""
  64. return ", ".join(
  65. sorted(
  66. {
  67. f"{loaded.plugin.package}: {loaded.plugin.version}"
  68. for loaded in self.all_plugins()
  69. if loaded.plugin.package not in {"flake8", "local"}
  70. }
  71. )
  72. )
  73. class PluginOptions(NamedTuple):
  74. """Options related to plugin loading."""
  75. local_plugin_paths: Tuple[str, ...]
  76. enable_extensions: FrozenSet[str]
  77. require_plugins: FrozenSet[str]
  78. @classmethod
  79. def blank(cls) -> "PluginOptions":
  80. """Make a blank PluginOptions, mostly used for tests."""
  81. return cls(
  82. local_plugin_paths=(),
  83. enable_extensions=frozenset(),
  84. require_plugins=frozenset(),
  85. )
  86. def _parse_option(
  87. cfg: configparser.RawConfigParser,
  88. cfg_opt_name: str,
  89. opt: Optional[str],
  90. ) -> List[str]:
  91. # specified on commandline: use that
  92. if opt is not None:
  93. return utils.parse_comma_separated_list(opt)
  94. else:
  95. # ideally this would reuse our config parsing framework but we need to
  96. # parse this from preliminary options before plugins are enabled
  97. for opt_name in (cfg_opt_name, cfg_opt_name.replace("_", "-")):
  98. val = cfg.get("flake8", opt_name, fallback=None)
  99. if val is not None:
  100. return utils.parse_comma_separated_list(val)
  101. else:
  102. return []
  103. def parse_plugin_options(
  104. cfg: configparser.RawConfigParser,
  105. cfg_dir: str,
  106. *,
  107. enable_extensions: Optional[str],
  108. require_plugins: Optional[str],
  109. ) -> PluginOptions:
  110. """Parse plugin loading related options."""
  111. paths_s = cfg.get("flake8:local-plugins", "paths", fallback="").strip()
  112. paths = utils.parse_comma_separated_list(paths_s)
  113. paths = utils.normalize_paths(paths, cfg_dir)
  114. return PluginOptions(
  115. local_plugin_paths=tuple(paths),
  116. enable_extensions=frozenset(
  117. _parse_option(cfg, "enable_extensions", enable_extensions),
  118. ),
  119. require_plugins=frozenset(
  120. _parse_option(cfg, "require_plugins", require_plugins),
  121. ),
  122. )
  123. def _flake8_plugins(
  124. eps: Iterable[importlib_metadata.EntryPoint],
  125. name: str,
  126. version: str,
  127. ) -> Generator[Plugin, None, None]:
  128. pyflakes_meta = importlib_metadata.distribution("pyflakes").metadata
  129. pycodestyle_meta = importlib_metadata.distribution("pycodestyle").metadata
  130. for ep in eps:
  131. if ep.group not in FLAKE8_GROUPS:
  132. continue
  133. if ep.name == "F":
  134. yield Plugin(pyflakes_meta["name"], pyflakes_meta["version"], ep)
  135. elif ep.name in "EW":
  136. # pycodestyle provides both `E` and `W` -- but our default select
  137. # handles those
  138. # ideally pycodestyle's plugin entrypoints would exactly represent
  139. # the codes they produce...
  140. yield Plugin(
  141. pycodestyle_meta["name"], pycodestyle_meta["version"], ep
  142. )
  143. else:
  144. yield Plugin(name, version, ep)
  145. def _find_importlib_plugins() -> Generator[Plugin, None, None]:
  146. # some misconfigured pythons (RHEL) have things on `sys.path` twice
  147. seen = set()
  148. for dist in importlib_metadata.distributions():
  149. # assigned to prevent continual reparsing
  150. eps = dist.entry_points
  151. # perf: skip parsing `.metadata` (slow) if no entry points match
  152. if not any(ep.group in FLAKE8_GROUPS for ep in eps):
  153. continue
  154. # assigned to prevent continual reparsing
  155. meta = dist.metadata
  156. if meta["name"] in seen:
  157. continue
  158. else:
  159. seen.add(meta["name"])
  160. if meta["name"] in BANNED_PLUGINS:
  161. LOG.warning(
  162. "%s plugin is obsolete in flake8>=%s",
  163. meta["name"],
  164. BANNED_PLUGINS[meta["name"]],
  165. )
  166. continue
  167. elif meta["name"] == "flake8":
  168. # special case flake8 which provides plugins for pyflakes /
  169. # pycodestyle
  170. yield from _flake8_plugins(eps, meta["name"], meta["version"])
  171. continue
  172. for ep in eps:
  173. if ep.group in FLAKE8_GROUPS:
  174. yield Plugin(meta["name"], meta["version"], ep)
  175. def _find_local_plugins(
  176. cfg: configparser.RawConfigParser,
  177. ) -> Generator[Plugin, None, None]:
  178. for plugin_type in ("extension", "report"):
  179. group = f"flake8.{plugin_type}"
  180. for plugin_s in utils.parse_comma_separated_list(
  181. cfg.get("flake8:local-plugins", plugin_type, fallback="").strip(),
  182. regexp=utils.LOCAL_PLUGIN_LIST_RE,
  183. ):
  184. name, _, entry_str = plugin_s.partition("=")
  185. name, entry_str = name.strip(), entry_str.strip()
  186. ep = importlib_metadata.EntryPoint(name, entry_str, group)
  187. yield Plugin("local", "local", ep)
  188. def _check_required_plugins(
  189. plugins: List[Plugin],
  190. expected: FrozenSet[str],
  191. ) -> None:
  192. plugin_names = {
  193. utils.normalize_pypi_name(plugin.package) for plugin in plugins
  194. }
  195. expected_names = {utils.normalize_pypi_name(name) for name in expected}
  196. missing_plugins = expected_names - plugin_names
  197. if missing_plugins:
  198. raise ExecutionError(
  199. f"required plugins were not installed!\n"
  200. f"- installed: {', '.join(sorted(plugin_names))}\n"
  201. f"- expected: {', '.join(sorted(expected_names))}\n"
  202. f"- missing: {', '.join(sorted(missing_plugins))}"
  203. )
  204. def find_plugins(
  205. cfg: configparser.RawConfigParser,
  206. opts: PluginOptions,
  207. ) -> List[Plugin]:
  208. """Discovers all plugins (but does not load them)."""
  209. ret = [*_find_importlib_plugins(), *_find_local_plugins(cfg)]
  210. # for determinism, sort the list
  211. ret.sort()
  212. _check_required_plugins(ret, opts.require_plugins)
  213. return ret
  214. def _parameters_for(func: Any) -> Dict[str, bool]:
  215. """Return the parameters for the plugin.
  216. This will inspect the plugin and return either the function parameters
  217. if the plugin is a function or the parameters for ``__init__`` after
  218. ``self`` if the plugin is a class.
  219. :returns:
  220. A dictionary mapping the parameter name to whether or not it is
  221. required (a.k.a., is positional only/does not have a default).
  222. """
  223. is_class = not inspect.isfunction(func)
  224. if is_class:
  225. func = func.__init__
  226. parameters = {
  227. parameter.name: parameter.default is inspect.Parameter.empty
  228. for parameter in inspect.signature(func).parameters.values()
  229. if parameter.kind is inspect.Parameter.POSITIONAL_OR_KEYWORD
  230. }
  231. if is_class:
  232. parameters.pop("self", None)
  233. return parameters
  234. def _load_plugin(plugin: Plugin) -> LoadedPlugin:
  235. try:
  236. obj = plugin.entry_point.load()
  237. except Exception as e:
  238. raise FailedToLoadPlugin(plugin.package, e)
  239. if not callable(obj):
  240. err = TypeError("expected loaded plugin to be callable")
  241. raise FailedToLoadPlugin(plugin.package, err)
  242. return LoadedPlugin(plugin, obj, _parameters_for(obj))
  243. def _import_plugins(
  244. plugins: List[Plugin],
  245. opts: PluginOptions,
  246. ) -> List[LoadedPlugin]:
  247. sys.path.extend(opts.local_plugin_paths)
  248. return [_load_plugin(p) for p in plugins]
  249. def _classify_plugins(
  250. plugins: List[LoadedPlugin],
  251. opts: PluginOptions,
  252. ) -> Plugins:
  253. tree = []
  254. logical_line = []
  255. physical_line = []
  256. reporters = {}
  257. disabled = []
  258. for loaded in plugins:
  259. if (
  260. getattr(loaded.obj, "off_by_default", False)
  261. and loaded.plugin.entry_point.name not in opts.enable_extensions
  262. ):
  263. disabled.append(loaded)
  264. elif loaded.plugin.entry_point.group == "flake8.report":
  265. reporters[loaded.entry_name] = loaded
  266. elif "tree" in loaded.parameters:
  267. tree.append(loaded)
  268. elif "logical_line" in loaded.parameters:
  269. logical_line.append(loaded)
  270. elif "physical_line" in loaded.parameters:
  271. physical_line.append(loaded)
  272. else:
  273. raise NotImplementedError(f"what plugin type? {loaded}")
  274. for loaded in itertools.chain(tree, logical_line, physical_line):
  275. if not VALID_CODE.match(loaded.entry_name):
  276. raise ExecutionError(
  277. f"plugin code for `{loaded.display_name}` does not match "
  278. f"{VALID_CODE.pattern}"
  279. )
  280. return Plugins(
  281. checkers=Checkers(
  282. tree=tree,
  283. logical_line=logical_line,
  284. physical_line=physical_line,
  285. ),
  286. reporters=reporters,
  287. disabled=disabled,
  288. )
  289. def load_plugins(
  290. plugins: List[Plugin],
  291. opts: PluginOptions,
  292. ) -> Plugins:
  293. """Load and classify all flake8 plugins.
  294. - first: extends ``sys.path`` with ``paths`` (to import local plugins)
  295. - next: converts the ``Plugin``s to ``LoadedPlugins``
  296. - finally: classifies plugins into their specific types
  297. """
  298. return _classify_plugins(_import_plugins(plugins, opts), opts)