pytest_subtests.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. import time
  2. from contextlib import contextmanager
  3. from contextlib import nullcontext
  4. import attr
  5. import pytest
  6. from _pytest._code import ExceptionInfo
  7. from _pytest.capture import CaptureFixture
  8. from _pytest.capture import FDCapture
  9. from _pytest.capture import SysCapture
  10. from _pytest.outcomes import OutcomeException
  11. from _pytest.reports import TestReport
  12. from _pytest.runner import CallInfo
  13. from _pytest.runner import check_interactive_exception
  14. from _pytest.unittest import TestCaseFunction
  15. @attr.s
  16. class SubTestContext:
  17. msg = attr.ib()
  18. kwargs = attr.ib()
  19. @attr.s(init=False)
  20. class SubTestReport(TestReport):
  21. context = attr.ib()
  22. @property
  23. def head_line(self):
  24. _, _, domain = self.location
  25. return f"{domain} {self.sub_test_description()}"
  26. def sub_test_description(self):
  27. parts = []
  28. if isinstance(self.context.msg, str):
  29. parts.append(f"[{self.context.msg}]")
  30. if self.context.kwargs:
  31. params_desc = ", ".join(
  32. f"{k}={v!r}" for (k, v) in sorted(self.context.kwargs.items())
  33. )
  34. parts.append(f"({params_desc})")
  35. return " ".join(parts) or "(<subtest>)"
  36. def _to_json(self):
  37. data = super()._to_json()
  38. del data["context"]
  39. data["_report_type"] = "SubTestReport"
  40. data["_subtest.context"] = attr.asdict(self.context)
  41. return data
  42. @classmethod
  43. def _from_json(cls, reportdict):
  44. report = super()._from_json(reportdict)
  45. context_data = reportdict["_subtest.context"]
  46. report.context = SubTestContext(
  47. msg=context_data["msg"], kwargs=context_data["kwargs"]
  48. )
  49. return report
  50. @classmethod
  51. def _from_test_report(cls, test_report):
  52. return super()._from_json(test_report._to_json())
  53. def _addSubTest(self, test_case, test, exc_info):
  54. if exc_info is not None:
  55. msg = test._message if isinstance(test._message, str) else None
  56. call_info = make_call_info(
  57. ExceptionInfo(exc_info, _ispytest=True),
  58. start=0,
  59. stop=0,
  60. duration=0,
  61. when="call",
  62. )
  63. report = self.ihook.pytest_runtest_makereport(item=self, call=call_info)
  64. sub_report = SubTestReport._from_test_report(report)
  65. sub_report.context = SubTestContext(msg, dict(test.params))
  66. self.ihook.pytest_runtest_logreport(report=sub_report)
  67. if check_interactive_exception(call_info, sub_report):
  68. self.ihook.pytest_exception_interact(
  69. node=self, call=call_info, report=sub_report
  70. )
  71. def pytest_configure(config):
  72. TestCaseFunction.addSubTest = _addSubTest
  73. TestCaseFunction.failfast = False
  74. def pytest_unconfigure():
  75. if hasattr(TestCaseFunction, "_addSubTest"):
  76. del TestCaseFunction.addSubTest
  77. if hasattr(TestCaseFunction, "failfast"):
  78. del TestCaseFunction.failfast
  79. @pytest.fixture
  80. def subtests(request):
  81. capmam = request.node.config.pluginmanager.get_plugin("capturemanager")
  82. if capmam is not None:
  83. suspend_capture_ctx = capmam.global_and_fixture_disabled
  84. else:
  85. suspend_capture_ctx = nullcontext
  86. yield SubTests(request.node.ihook, suspend_capture_ctx, request)
  87. @attr.s
  88. class SubTests:
  89. ihook = attr.ib()
  90. suspend_capture_ctx = attr.ib()
  91. request = attr.ib()
  92. @property
  93. def item(self):
  94. return self.request.node
  95. @contextmanager
  96. def _capturing_output(self):
  97. option = self.request.config.getoption("capture", None)
  98. # capsys or capfd are active, subtest should not capture
  99. capman = self.request.config.pluginmanager.getplugin("capturemanager")
  100. capture_fixture_active = getattr(capman, "_capture_fixture", None)
  101. if option == "sys" and not capture_fixture_active:
  102. with ignore_pytest_private_warning():
  103. fixture = CaptureFixture(SysCapture, self.request)
  104. elif option == "fd" and not capture_fixture_active:
  105. with ignore_pytest_private_warning():
  106. fixture = CaptureFixture(FDCapture, self.request)
  107. else:
  108. fixture = None
  109. if fixture is not None:
  110. fixture._start()
  111. captured = Captured()
  112. try:
  113. yield captured
  114. finally:
  115. if fixture is not None:
  116. out, err = fixture.readouterr()
  117. fixture.close()
  118. captured.out = out
  119. captured.err = err
  120. @contextmanager
  121. def test(self, msg=None, **kwargs):
  122. start = time.time()
  123. precise_start = time.perf_counter()
  124. exc_info = None
  125. with self._capturing_output() as captured:
  126. try:
  127. yield
  128. except (Exception, OutcomeException):
  129. exc_info = ExceptionInfo.from_current()
  130. precise_stop = time.perf_counter()
  131. duration = precise_stop - precise_start
  132. stop = time.time()
  133. call_info = make_call_info(
  134. exc_info, start=start, stop=stop, duration=duration, when="call"
  135. )
  136. report = self.ihook.pytest_runtest_makereport(item=self.item, call=call_info)
  137. sub_report = SubTestReport._from_test_report(report)
  138. sub_report.context = SubTestContext(msg, kwargs.copy())
  139. captured.update_report(sub_report)
  140. with self.suspend_capture_ctx():
  141. self.ihook.pytest_runtest_logreport(report=sub_report)
  142. if check_interactive_exception(call_info, sub_report):
  143. self.ihook.pytest_exception_interact(
  144. node=self.item, call=call_info, report=sub_report
  145. )
  146. def make_call_info(exc_info, *, start, stop, duration, when):
  147. return CallInfo(
  148. None,
  149. exc_info,
  150. start=start,
  151. stop=stop,
  152. duration=duration,
  153. when=when,
  154. _ispytest=True,
  155. )
  156. @contextmanager
  157. def ignore_pytest_private_warning():
  158. import warnings
  159. with warnings.catch_warnings():
  160. warnings.filterwarnings(
  161. "ignore",
  162. "A private pytest class or function was used.",
  163. category=pytest.PytestDeprecationWarning,
  164. )
  165. yield
  166. @attr.s
  167. class Captured:
  168. out = attr.ib(default="", type=str)
  169. err = attr.ib(default="", type=str)
  170. def update_report(self, report):
  171. if self.out:
  172. report.sections.append(("Captured stdout call", self.out))
  173. if self.err:
  174. report.sections.append(("Captured stderr call", self.err))
  175. def pytest_report_to_serializable(report):
  176. if isinstance(report, SubTestReport):
  177. return report._to_json()
  178. def pytest_report_from_serializable(data):
  179. if data.get("_report_type") == "SubTestReport":
  180. return SubTestReport._from_json(data)
  181. @pytest.hookimpl(tryfirst=True)
  182. def pytest_report_teststatus(report):
  183. if report.when != "call" or not isinstance(report, SubTestReport):
  184. return
  185. if hasattr(report, "wasxfail"):
  186. return None
  187. outcome = report.outcome
  188. if report.passed:
  189. return f"subtests {outcome}", ",", "SUBPASS"
  190. elif report.skipped:
  191. return outcome, "-", "SUBSKIP"
  192. elif outcome == "failed":
  193. return outcome, "u", "SUBFAIL"