| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243 |
- import time
- from contextlib import contextmanager
- from contextlib import nullcontext
- import attr
- import pytest
- from _pytest._code import ExceptionInfo
- from _pytest.capture import CaptureFixture
- from _pytest.capture import FDCapture
- from _pytest.capture import SysCapture
- from _pytest.outcomes import OutcomeException
- from _pytest.reports import TestReport
- from _pytest.runner import CallInfo
- from _pytest.runner import check_interactive_exception
- from _pytest.unittest import TestCaseFunction
@attr.s
class SubTestContext:
    """Identity of one subtest: the optional message and its parameters."""

    # Optional message passed as subtests.test(msg=...); None when absent.
    msg = attr.ib()
    # Mapping of keyword parameters identifying this subtest invocation.
    kwargs = attr.ib()
@attr.s(init=False)
class SubTestReport(TestReport):
    """A TestReport that additionally carries a SubTestContext.

    ``init=False`` because instances are built by upgrading an existing
    TestReport (see ``_from_test_report``) rather than constructed directly.
    """

    context = attr.ib()

    @property
    def head_line(self):
        """Headline used in failure summaries: '<domain> <subtest description>'."""
        _, _, domain = self.location
        return f"{domain} {self.sub_test_description()}"

    def sub_test_description(self):
        """Render this subtest's identity, e.g. ``[msg] (i=1, j=2)``."""
        fragments = []
        message = self.context.msg
        if isinstance(message, str):
            fragments.append(f"[{message}]")
        if self.context.kwargs:
            # Sort for a deterministic description regardless of kwargs order.
            rendered = ", ".join(
                f"{name}={value!r}" for name, value in sorted(self.context.kwargs.items())
            )
            fragments.append(f"({rendered})")
        if not fragments:
            return "(<subtest>)"
        return " ".join(fragments)

    def _to_json(self):
        """Serialize for pytest-xdist: stash the context under a private key."""
        serialized = super()._to_json()
        # The raw attrs field is not JSON-friendly; replace it with a dict.
        del serialized["context"]
        serialized["_report_type"] = "SubTestReport"
        serialized["_subtest.context"] = attr.asdict(self.context)
        return serialized

    @classmethod
    def _from_json(cls, reportdict):
        """Inverse of ``_to_json``: rebuild the report and its context."""
        report = super()._from_json(reportdict)
        ctx = reportdict["_subtest.context"]
        report.context = SubTestContext(msg=ctx["msg"], kwargs=ctx["kwargs"])
        return report

    @classmethod
    def _from_test_report(cls, test_report):
        """Upgrade a plain TestReport into a SubTestReport (context set later)."""
        return super()._from_json(test_report._to_json())
def _addSubTest(self, test_case, test, exc_info):
    """Patched onto TestCaseFunction: report a finished unittest subtest.

    Called by unittest's TestResult protocol; only failing subtests
    (``exc_info`` not None) produce a report.
    """
    if exc_info is None:
        return
    message = test._message if isinstance(test._message, str) else None
    call = make_call_info(
        ExceptionInfo(exc_info, _ispytest=True),
        # unittest gives no timing information for subtests.
        start=0,
        stop=0,
        duration=0,
        when="call",
    )
    report = self.ihook.pytest_runtest_makereport(item=self, call=call)
    sub_report = SubTestReport._from_test_report(report)
    sub_report.context = SubTestContext(message, dict(test.params))
    self.ihook.pytest_runtest_logreport(report=sub_report)
    if check_interactive_exception(call, sub_report):
        self.ihook.pytest_exception_interact(node=self, call=call, report=sub_report)
def pytest_configure(config):
    """Install subtest support onto pytest's unittest integration.

    unittest's TestResult protocol looks up ``addSubTest`` and ``failfast``
    on the result object, which in pytest is the TestCaseFunction item.
    """
    TestCaseFunction.failfast = False
    TestCaseFunction.addSubTest = _addSubTest
def pytest_unconfigure():
    """Remove the attributes installed by ``pytest_configure``.

    Bug fix: the original tested ``hasattr(TestCaseFunction, "_addSubTest")``
    (leading underscore), but ``pytest_configure`` installs the attribute as
    ``addSubTest`` — so the guard was always False and the patched method was
    never removed. Check the attribute that is actually set.
    """
    if hasattr(TestCaseFunction, "addSubTest"):
        del TestCaseFunction.addSubTest
    if hasattr(TestCaseFunction, "failfast"):
        del TestCaseFunction.failfast
@pytest.fixture
def subtests(request):
    """Fixture yielding a SubTests helper bound to the current test item."""
    capture_manager = request.node.config.pluginmanager.get_plugin("capturemanager")
    # Suspend global capturing while reports are emitted; fall back to a
    # no-op context manager when the capture plugin is disabled.
    suspend_ctx = (
        capture_manager.global_and_fixture_disabled
        if capture_manager is not None
        else nullcontext
    )
    yield SubTests(request.node.ihook, suspend_ctx, request)
@attr.s
class SubTests:
    """Helper object yielded by the ``subtests`` fixture.

    Its ``test()`` context manager runs the enclosed block as an independent
    sub-test, capturing output and reporting the outcome through pytest's
    hook system without aborting the enclosing test function.
    """

    # Hook relay used to create and log reports.
    ihook = attr.ib()
    # Context-manager factory that suspends global capture while reporting.
    suspend_capture_ctx = attr.ib()
    # The fixture request this helper is bound to.
    request = attr.ib()

    @property
    def item(self):
        """The pytest item (node) the fixture was requested for."""
        return self.request.node

    @contextmanager
    def _capturing_output(self):
        """Capture stdout/stderr around one subtest.

        Starts a private CaptureFixture matching the session's ``--capture``
        mode ("sys" or "fd"), unless a capsys/capfd fixture is already active
        — in that case capturing is left to that fixture. Yields a Captured
        object whose ``out``/``err`` are filled in on exit.
        """
        option = self.request.config.getoption("capture", None)

        # capsys or capfd are active, subtest should not capture
        capman = self.request.config.pluginmanager.getplugin("capturemanager")
        capture_fixture_active = getattr(capman, "_capture_fixture", None)

        if option == "sys" and not capture_fixture_active:
            with ignore_pytest_private_warning():
                fixture = CaptureFixture(SysCapture, self.request)
        elif option == "fd" and not capture_fixture_active:
            with ignore_pytest_private_warning():
                fixture = CaptureFixture(FDCapture, self.request)
        else:
            fixture = None

        if fixture is not None:
            fixture._start()

        captured = Captured()
        try:
            yield captured
        finally:
            # Collect output even when the subtest body raised.
            if fixture is not None:
                out, err = fixture.readouterr()
                fixture.close()
                captured.out = out
                captured.err = err

    @contextmanager
    def test(self, msg=None, **kwargs):
        """Run the enclosed block as a sub-test named by *msg* and *kwargs*.

        Exceptions raised inside the block (including pytest outcome
        exceptions such as ``pytest.fail``) are swallowed here and turned
        into a SubTestReport, so later subtests still execute.
        """
        start = time.time()
        precise_start = time.perf_counter()
        exc_info = None

        with self._capturing_output() as captured:
            try:
                yield
            except (Exception, OutcomeException):
                exc_info = ExceptionInfo.from_current()

        precise_stop = time.perf_counter()
        duration = precise_stop - precise_start
        stop = time.time()

        call_info = make_call_info(
            exc_info, start=start, stop=stop, duration=duration, when="call"
        )
        report = self.ihook.pytest_runtest_makereport(item=self.item, call=call_info)
        sub_report = SubTestReport._from_test_report(report)
        sub_report.context = SubTestContext(msg, kwargs.copy())

        captured.update_report(sub_report)

        # Suspend global capturing so report output reaches the terminal.
        with self.suspend_capture_ctx():
            self.ihook.pytest_runtest_logreport(report=sub_report)

        if check_interactive_exception(call_info, sub_report):
            self.ihook.pytest_exception_interact(
                node=self.item, call=call_info, report=sub_report
            )
def make_call_info(exc_info, *, start, stop, duration, when):
    """Build a pytest CallInfo describing a single (sub)test call phase.

    ``result`` is always None: when the subtest failed, *exc_info* carries
    the outcome instead.
    """
    timing = {"start": start, "stop": stop, "duration": duration}
    return CallInfo(None, exc_info, when=when, _ispytest=True, **timing)
@contextmanager
def ignore_pytest_private_warning():
    """Silence pytest's "private class/function used" deprecation warning.

    Needed because the plugin instantiates CaptureFixture / SysCapture /
    FDCapture directly, which pytest flags as private API.
    """
    import warnings

    with warnings.catch_warnings():
        # action, message, category — passed positionally.
        warnings.filterwarnings(
            "ignore",
            "A private pytest class or function was used.",
            pytest.PytestDeprecationWarning,
        )
        yield
@attr.s
class Captured:
    """Output captured during one subtest (filled in by _capturing_output)."""

    out = attr.ib(default="", type=str)
    err = attr.ib(default="", type=str)

    def update_report(self, report):
        """Attach non-empty captured streams as sections on *report*."""
        for title, content in (
            ("Captured stdout call", self.out),
            ("Captured stderr call", self.err),
        ):
            if content:
                report.sections.append((title, content))
def pytest_report_to_serializable(report):
    """Serialize SubTestReport instances for xdist; ignore other reports."""
    if not isinstance(report, SubTestReport):
        return None
    return report._to_json()
def pytest_report_from_serializable(data):
    """Rebuild a SubTestReport from data produced by the hook above."""
    report_type = data.get("_report_type")
    if report_type == "SubTestReport":
        return SubTestReport._from_json(data)
    return None
@pytest.hookimpl(tryfirst=True)
def pytest_report_teststatus(report):
    """Map subtest outcomes to (category, short letter, verbose word)."""
    if report.when != "call" or not isinstance(report, SubTestReport):
        return None
    if hasattr(report, "wasxfail"):
        # xfail-marked subtests keep pytest's default status handling.
        return None
    outcome = report.outcome
    if report.passed:
        return f"subtests {outcome}", ",", "SUBPASS"
    if report.skipped:
        return outcome, "-", "SUBSKIP"
    if outcome == "failed":
        return outcome, "u", "SUBFAIL"
    return None
|