async.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. # Licensed under the GPL: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html
  2. # For details: https://github.com/PyCQA/pylint/blob/main/LICENSE
  3. # Copyright (c) https://github.com/PyCQA/pylint/blob/main/CONTRIBUTORS.txt
  4. """Checker for anything related to the async protocol (PEP 492)."""
  5. from __future__ import annotations
  6. import sys
  7. from typing import TYPE_CHECKING
  8. import astroid
  9. from astroid import nodes, util
  10. from pylint import checkers
  11. from pylint.checkers import utils as checker_utils
  12. from pylint.checkers.utils import decorated_with
  13. if TYPE_CHECKING:
  14. from pylint.lint import PyLinter
  15. class AsyncChecker(checkers.BaseChecker):
  16. name = "async"
  17. msgs = {
  18. "E1700": (
  19. "Yield inside async function",
  20. "yield-inside-async-function",
  21. "Used when an `yield` or `yield from` statement is "
  22. "found inside an async function.",
  23. {"minversion": (3, 5)},
  24. ),
  25. "E1701": (
  26. "Async context manager '%s' doesn't implement __aenter__ and __aexit__.",
  27. "not-async-context-manager",
  28. "Used when an async context manager is used with an object "
  29. "that does not implement the async context management protocol.",
  30. {"minversion": (3, 5)},
  31. ),
  32. }
  33. def open(self) -> None:
  34. self._mixin_class_rgx = self.linter.config.mixin_class_rgx
  35. self._async_generators = ["contextlib.asynccontextmanager"]
  36. @checker_utils.only_required_for_messages("yield-inside-async-function")
  37. def visit_asyncfunctiondef(self, node: nodes.AsyncFunctionDef) -> None:
  38. for child in node.nodes_of_class(nodes.Yield):
  39. if child.scope() is node and (
  40. sys.version_info[:2] == (3, 5) or isinstance(child, nodes.YieldFrom)
  41. ):
  42. self.add_message("yield-inside-async-function", node=child)
  43. @checker_utils.only_required_for_messages("not-async-context-manager")
  44. def visit_asyncwith(self, node: nodes.AsyncWith) -> None:
  45. for ctx_mgr, _ in node.items:
  46. inferred = checker_utils.safe_infer(ctx_mgr)
  47. if inferred is None or isinstance(inferred, util.UninferableBase):
  48. continue
  49. if isinstance(inferred, nodes.AsyncFunctionDef):
  50. # Check if we are dealing with a function decorated
  51. # with contextlib.asynccontextmanager.
  52. if decorated_with(inferred, self._async_generators):
  53. continue
  54. elif isinstance(inferred, astroid.bases.AsyncGenerator):
  55. # Check if we are dealing with a function decorated
  56. # with contextlib.asynccontextmanager.
  57. if decorated_with(inferred.parent, self._async_generators):
  58. continue
  59. else:
  60. try:
  61. inferred.getattr("__aenter__")
  62. inferred.getattr("__aexit__")
  63. except astroid.exceptions.NotFoundError:
  64. if isinstance(inferred, astroid.Instance):
  65. # If we do not know the bases of this class,
  66. # just skip it.
  67. if not checker_utils.has_known_bases(inferred):
  68. continue
  69. # Ignore mixin classes if they match the rgx option.
  70. if (
  71. "not-async-context-manager"
  72. in self.linter.config.ignored_checks_for_mixins
  73. and self._mixin_class_rgx.match(inferred.name)
  74. ):
  75. continue
  76. else:
  77. continue
  78. self.add_message(
  79. "not-async-context-manager", node=node, args=(inferred.name,)
  80. )
  81. def register(linter: PyLinter) -> None:
  82. linter.register_checker(AsyncChecker(linter))