| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822 |
- """Utilities for processing .test files containing test case descriptions."""
- from __future__ import annotations
- import os
- import os.path
- import posixpath
- import re
- import shutil
- import sys
- import tempfile
- from abc import abstractmethod
- from dataclasses import dataclass
- from pathlib import Path
- from typing import Any, Final, Iterator, NamedTuple, NoReturn, Pattern, Union
- from typing_extensions import TypeAlias as _TypeAlias
- import pytest
- from mypy import defaults
- from mypy.test.config import PREFIX, test_data_prefix, test_temp_dir
# Normalized form of PREFIX (imported from mypy.test.config). expand_variables()
# substitutes this value for the "<ROOT>" placeholder in test data.
root_dir = os.path.normpath(PREFIX)

# Debuggers that we support for debugging mypyc run tests
# implementation of using each of these debuggers is in test_run.py
# These are also the accepted values of the --mypyc-debug option below.
# TODO: support more debuggers
SUPPORTED_DEBUGGERS: Final = ["gdb", "lldb"]
# File modify/create operation for an incremental step: `content` is the file
# text recorded for `target_path` (the test-data path with its ".N" step suffix
# removed), and `module` is the module name derived from that path.
class UpdateFile(NamedTuple):
    module: str
    content: str
    target_path: str
# File delete operation for an incremental step: `path` is the file/directory
# named by a [delete path.N] section, and `module` is derived from that path.
class DeleteFile(NamedTuple):
    module: str
    path: str
# Either kind of per-step file operation; this is the element type of the
# lists returned by DataDrivenTestCase.find_steps().
FileOperation: _TypeAlias = Union[UpdateFile, DeleteFile]
- def _file_arg_to_module(filename: str) -> str:
- filename, _ = os.path.splitext(filename)
- parts = filename.split("/") # not os.sep since it comes from test data
- if parts[-1] == "__init__":
- parts.pop()
- return ".".join(parts)
def parse_test_case(case: DataDrivenTestCase) -> None:
    """Parse and prepare a single case from suite with test case descriptions.

    This method is part of the setup phase, just before the test case is run.

    The raw text in case.data is split into sections by parse_test_data().
    The first section is the main program; every later section is dispatched
    on its header id. The results are stored as attributes on `case`
    (input, output, output2, files, output_files, deleted_paths, ...).

    Invalid section headers, malformed arguments, a missing required output
    section, or duplicated file names abort the test through pytest.fail().
    """
    test_items = parse_test_data(case.data, case.name)
    base_path = case.suite.base_path
    # Paths of extra files are joined either with the native separator or
    # always with "/", depending on the suite.
    if case.suite.native_sep:
        join = os.path.join
    else:
        join = posixpath.join

    # Stays True until an applicable [out...] section is seen.
    out_section_missing = case.suite.required_out_section
    normalize_output = True

    files: list[tuple[str, str]] = []  # path and contents
    output_files: list[tuple[str, str | Pattern[str]]] = []  # output path and contents
    output: list[str] = []  # Regular output errors
    output2: dict[int, list[str]] = {}  # Output errors for incremental, runs 2+
    deleted_paths: dict[int, set[str]] = {}  # from run number of paths
    stale_modules: dict[int, set[str]] = {}  # from run number to module names
    rechecked_modules: dict[int, set[str]] = {}  # from run number module names
    triggered: list[str] = []  # Active triggers (one line per incremental step)
    targets: dict[int, list[str]] = {}  # Fine-grained targets (per fine-grained update)
    test_modules: list[str] = []  # Modules which are deemed "test" (vs "fixture")

    def _case_fail(msg: str) -> NoReturn:
        # Report a failure located at the first line of the test case.
        pytest.fail(f"{case.file}:{case.line}: {msg}", pytrace=False)

    # Process the parsed items. Each item has a header of form [id args],
    # optionally followed by lines of text.
    # `item` is pre-bound so that it is defined after the loop even when
    # there are no sections besides the main program.
    item = first_item = test_items[0]
    test_modules.append("__main__")
    for item in test_items[1:]:

        def _item_fail(msg: str) -> NoReturn:
            # item.line counts within the line list built by parse_test_data(),
            # which puts the [case ...] header on line 2; hence the -2.
            item_abs_line = case.line + item.line - 2
            pytest.fail(f"{case.file}:{item_abs_line}: {msg}", pytrace=False)

        if item.id in {"file", "fixture", "outfile", "outfile-re"}:
            # Record an extra file needed for the test case.
            assert item.arg is not None
            contents = expand_variables("\n".join(item.data))
            path = join(base_path, item.arg)
            if item.id != "fixture":
                test_modules.append(_file_arg_to_module(item.arg))
            if item.id in {"file", "fixture"}:
                files.append((path, contents))
            elif item.id == "outfile-re":
                # Expected output file contents given as a regular expression.
                output_files.append((path, re.compile(contents.rstrip(), re.S)))
            elif item.id == "outfile":
                output_files.append((path, contents))
        elif item.id == "builtins":
            # Use an alternative stub file for the builtins module.
            assert item.arg is not None
            mpath = join(os.path.dirname(case.file), item.arg)
            with open(mpath, encoding="utf8") as f:
                files.append((join(base_path, "builtins.pyi"), f.read()))
        elif item.id == "typing":
            # Use an alternative stub file for the typing module.
            assert item.arg is not None
            src_path = join(os.path.dirname(case.file), item.arg)
            with open(src_path, encoding="utf8") as f:
                files.append((join(base_path, "typing.pyi"), f.read()))
        elif item.id == "_typeshed":
            # Use an alternative stub file for the _typeshed module.
            assert item.arg is not None
            src_path = join(os.path.dirname(case.file), item.arg)
            with open(src_path, encoding="utf8") as f:
                files.append((join(base_path, "_typeshed.pyi"), f.read()))
        elif re.match(r"stale[0-9]*$", item.id):
            # [stale] is shorthand for [stale1]; the argument is a comma-separated module list.
            passnum = 1 if item.id == "stale" else int(item.id[len("stale") :])
            assert passnum > 0
            modules = set() if item.arg is None else {t.strip() for t in item.arg.split(",")}
            stale_modules[passnum] = modules
        elif re.match(r"rechecked[0-9]*$", item.id):
            # [rechecked] is shorthand for [rechecked1].
            passnum = 1 if item.id == "rechecked" else int(item.id[len("rechecked") :])
            assert passnum > 0
            modules = set() if item.arg is None else {t.strip() for t in item.arg.split(",")}
            rechecked_modules[passnum] = modules
        elif re.match(r"targets[0-9]*$", item.id):
            # [targets] is shorthand for [targets1]; order of the listed targets is kept.
            passnum = 1 if item.id == "targets" else int(item.id[len("targets") :])
            assert passnum > 0
            reprocessed = [] if item.arg is None else [t.strip() for t in item.arg.split(",")]
            targets[passnum] = reprocessed
        elif item.id == "delete":
            # File/directory to delete during a multi-step test case
            assert item.arg is not None
            # The argument has the form "<path>.<step number>".
            m = re.match(r"(.*)\.([0-9]+)$", item.arg)
            if m is None:
                _item_fail(f"Invalid delete section {item.arg!r}")
            num = int(m.group(2))
            if num < 2:
                _item_fail(f"Can't delete during step {num}")
            full = join(base_path, m.group(1))
            deleted_paths.setdefault(num, set()).add(full)
        elif re.match(r"out[0-9]*$", item.id):
            if item.arg is None:
                args = []
            else:
                args = item.arg.split(",")

            # Whether this output section applies to the running interpreter.
            version_check = True
            for arg in args:
                if arg == "skip-path-normalization":
                    normalize_output = False
                if arg.startswith("version"):
                    # "version" is 7 characters; the next two are the comparison operator.
                    compare_op = arg[7:9]
                    if compare_op not in {">=", "=="}:
                        _item_fail("Only >= and == version checks are currently supported")
                    version_str = arg[9:]
                    try:
                        version = tuple(int(x) for x in version_str.split("."))
                    except ValueError:
                        _item_fail(f"{version_str!r} is not a valid python version")
                    if compare_op == ">=":
                        if version <= defaults.PYTHON3_VERSION:
                            _item_fail(
                                f"{arg} always true since minimum runtime version is {defaults.PYTHON3_VERSION}"
                            )
                        version_check = sys.version_info >= version
                    elif compare_op == "==":
                        if version < defaults.PYTHON3_VERSION:
                            _item_fail(
                                f"{arg} always false since minimum runtime version is {defaults.PYTHON3_VERSION}"
                            )
                        # Only two- or three-component versions are accepted here.
                        if not 1 < len(version) < 4:
                            _item_fail(
                                f'Only minor or patch version checks are currently supported with "==": {version_str!r}'
                            )
                        version_check = sys.version_info[: len(version)] == version
            if version_check:
                tmp_output = [expand_variables(line) for line in item.data]
                if os.path.sep == "\\" and normalize_output:
                    tmp_output = [fix_win_path(line) for line in tmp_output]
                if item.id == "out" or item.id == "out1":
                    output = tmp_output
                else:
                    passnum = int(item.id[len("out") :])
                    assert passnum > 1
                    output2[passnum] = tmp_output
                out_section_missing = False
        elif item.id == "triggered" and item.arg is None:
            triggered = item.data
        else:
            section_str = item.id + (f" {item.arg}" if item.arg else "")
            _item_fail(f"Invalid section header [{section_str}] in case {case.name!r}")

    if out_section_missing:
        _case_fail(f"Required output section not found in case {case.name!r}")

    for passnum in stale_modules.keys():
        if passnum not in rechecked_modules:
            # If the set of rechecked modules isn't specified, make it the same as the set
            # of modules with a stale public interface.
            rechecked_modules[passnum] = stale_modules[passnum]

        if (
            passnum in stale_modules
            and passnum in rechecked_modules
            and not stale_modules[passnum].issubset(rechecked_modules[passnum])
        ):
            _case_fail(f"Stale modules after pass {passnum} must be a subset of rechecked modules")

    # Messages from the [out] section come first; entries from this index on
    # are the ones generated from inline "# E:"-style comments.
    output_inline_start = len(output)
    input = first_item.data
    expand_errors(input, output, "main")
    for file_path, contents in files:
        expand_errors(contents.split("\n"), output, file_path)

    # Reject test cases that define the same file path more than once.
    seen_files = set()
    for file, _ in files:
        if file in seen_files:
            _case_fail(f"Duplicated filename {file}. Did you include it multiple times?")

        seen_files.add(file)

    case.input = input
    case.output = output
    case.output_inline_start = output_inline_start
    case.output2 = output2
    # `item` is the last section processed (or the main program if it is the only one).
    case.last_line = case.line + item.line + len(item.data) - 2
    case.files = files
    case.output_files = output_files
    case.expected_stale_modules = stale_modules
    case.expected_rechecked_modules = rechecked_modules
    case.deleted_paths = deleted_paths
    case.triggered = triggered or []
    case.normalize_output = normalize_output
    case.expected_fine_grained_targets = targets
    case.test_modules = test_modules
class DataDrivenTestCase(pytest.Item):
    """Holds parsed data-driven test cases, and handles directory setup and teardown."""

    # Override parent member type
    parent: DataSuiteCollector

    input: list[str]
    output: list[str]  # Output for the first pass
    output_inline_start: int
    output2: dict[int, list[str]]  # Output for runs 2+, indexed by run number

    # full path of test suite
    file = ""
    line = 0

    # (file path, file content) tuples
    files: list[tuple[str, str]]
    # Modules which are to be considered "test" rather than "fixture"
    test_modules: list[str]
    expected_stale_modules: dict[int, set[str]]
    expected_rechecked_modules: dict[int, set[str]]
    expected_fine_grained_targets: dict[int, list[str]]

    # Whether or not we should normalize the output to standardize things like
    # forward vs backward slashes in file paths for Windows vs Linux.
    normalize_output = True

    # Extra attributes used by some tests.
    last_line: int
    output_files: list[tuple[str, str | Pattern[str]]]  # Path and contents for output files
    deleted_paths: dict[int, set[str]]  # Mapping run number -> paths
    triggered: list[str]  # Active triggers (one line per incremental step)

    def __init__(
        self,
        parent: DataSuiteCollector,
        suite: DataSuite,
        file: str,
        name: str,
        writescache: bool,
        only_when: str,
        platform: str | None,
        skip: bool,
        xfail: bool,
        data: str,
        line: int,
    ) -> None:
        """Store the raw, unparsed test case; parsing is deferred to setup().

        `data` is the text following the [case ...] header and `line` is the
        line number of that header in `file`. A case restricted to a platform
        other than the current one is marked as skipped.
        """
        super().__init__(name, parent)
        self.suite = suite
        self.file = file
        self.writescache = writescache
        self.only_when = only_when
        if (platform == "windows" and sys.platform != "win32") or (
            platform == "posix" and sys.platform == "win32"
        ):
            skip = True
        self.skip = skip
        self.xfail = xfail
        self.data = data
        self.line = line
        # Both are set in setup() and reset in teardown().
        self.old_cwd: str | None = None
        self.tmpdir: tempfile.TemporaryDirectory[str] | None = None

    def runtest(self) -> None:
        """Run this case through its suite's run_case(), honoring skip/xfail.

        If the run raises and --save-failures-to is given, the temporary
        directory is copied there before the exception is re-raised.
        """
        if self.skip:
            pytest.skip()
        # TODO: add a better error message for when someone uses skip and xfail at the same time
        elif self.xfail:
            self.add_marker(pytest.mark.xfail)
        parent = self.getparent(DataSuiteCollector)
        assert parent is not None, "Should not happen"
        # parent.obj is the DataSuite subclass; calling it creates a suite instance.
        suite = parent.obj()
        suite.setup()
        try:
            suite.run_case(self)
        except Exception:
            # As a debugging aid, support copying the contents of the tmp directory somewhere
            save_dir: str | None = self.config.getoption("--save-failures-to", None)
            if save_dir:
                assert self.tmpdir is not None
                target_dir = os.path.join(save_dir, os.path.basename(self.tmpdir.name))
                print(f"Copying data from test {self.name} to {target_dir}")
                # setup() changed the working directory, so resolve relative
                # targets against the directory we started in.
                if not os.path.isabs(target_dir):
                    assert self.old_cwd
                    target_dir = os.path.join(self.old_cwd, target_dir)
                shutil.copytree(self.tmpdir.name, target_dir)
            raise

    def setup(self) -> None:
        """Parse the case, enter a fresh temporary directory and write initial files.

        Files whose path ends in ".N" are not written; they are recorded as
        UpdateFile operations for step N. Deletions are recorded as DeleteFile
        operations. The result is stored in self.steps.
        """
        parse_test_case(case=self)
        self.old_cwd = os.getcwd()
        self.tmpdir = tempfile.TemporaryDirectory(prefix="mypy-test-")
        os.chdir(self.tmpdir.name)
        os.mkdir(test_temp_dir)

        # Precalculate steps for find_steps()
        steps: dict[int, list[FileOperation]] = {}

        for path, content in self.files:
            m = re.match(r".*\.([0-9]+)$", path)
            if m:
                # Skip writing subsequent incremental steps - rather
                # store them as operations.
                num = int(m.group(1))
                assert num >= 2
                target_path = re.sub(r"\.[0-9]+$", "", path)
                module = module_from_path(target_path)
                operation = UpdateFile(module, content, target_path)
                steps.setdefault(num, []).append(operation)
            else:
                # Write the first incremental steps
                dir = os.path.dirname(path)
                os.makedirs(dir, exist_ok=True)
                with open(path, "w", encoding="utf8") as f:
                    f.write(content)

        for num, paths in self.deleted_paths.items():
            assert num >= 2
            for path in paths:
                module = module_from_path(path)
                steps.setdefault(num, []).append(DeleteFile(module, path))
        # With no operations at all there is still one (empty) entry, for step 2.
        max_step = max(steps) if steps else 2
        self.steps = [steps.get(num, []) for num in range(2, max_step + 1)]

    def teardown(self) -> None:
        """Restore the original working directory and remove the temporary directory."""
        if self.old_cwd is not None:
            os.chdir(self.old_cwd)
        if self.tmpdir is not None:
            try:
                self.tmpdir.cleanup()
            except OSError:
                # Cleanup is best-effort; a failure to remove files is ignored.
                pass
        self.old_cwd = None
        self.tmpdir = None

    def reportinfo(self) -> tuple[str, int, str]:
        """Return (data file, line of the case header, test name)."""
        return self.file, self.line, self.name

    def repr_failure(
        self, excinfo: pytest.ExceptionInfo[BaseException], style: Any | None = None
    ) -> str:
        """Format a failure, prefixed with the data file and line of this case."""
        excrepr: object
        if isinstance(excinfo.value, SystemExit):
            # We assume that before doing exit() (which raises SystemExit) we've printed
            # enough context about what happened so that a stack trace is not useful.
            # In particular, uncaught exceptions during semantic analysis or type checking
            # call exit() and they already print out a stack trace.
            excrepr = excinfo.exconly()
        elif isinstance(excinfo.value, pytest.fail.Exception) and not excinfo.value.pytrace:
            # pytest.fail(..., pytrace=False) asked for the message only.
            excrepr = excinfo.exconly()
        else:
            excinfo.traceback = self.parent._traceback_filter(excinfo)
            excrepr = excinfo.getrepr(style="short")

        return f"data: {self.file}:{self.line}:\n{excrepr}"

    def find_steps(self) -> list[list[FileOperation]]:
        """Return a list of descriptions of file operations for each incremental step.

        The first list item corresponds to the first incremental step, the second for the
        second step, etc. Each operation can either be a file modification/creation (UpdateFile)
        or deletion (DeleteFile).

        Defaults to having two steps if there aren't any operations.
        """
        return self.steps
def module_from_path(path: str) -> str:
    """Derive a dotted module name from a file path inside the test directory.

    The ".py"/".pyi" extension and the first path component are dropped,
    the remaining components are joined with dots, and a trailing
    ".__init__" is removed.
    """
    stem = re.sub(r"\.pyi?$", "", path)
    # We can have a mix of Unix-style and Windows-style separators.
    components = re.split(r"[/\\]", stem)[1:]
    dotted = ".".join(components)
    return re.sub(r"\.__init__$", "", dotted)
@dataclass
class TestItem:
    """Parsed test case item.

    An item is of the form
      [id arg]
      .. data ..
    """

    id: str
    arg: str | None
    # Processed, collapsed text data
    data: list[str]
    # Start line: 1-based, inclusive, relative to testcase
    line: int
    # End line: 1-based, exclusive, relative to testcase; not same as `line + len(test_item.data)` due to collapsing
    end_line: int

    @property
    def trimmed_newlines(self) -> int:  # compensates for strip_list
        """Number of lines in the item's span that are not present in `data`."""
        span = self.end_line - self.line
        return span - len(self.data)
def parse_test_data(raw_data: str, name: str) -> list[TestItem]:
    """Parse a list of lines that represent a sequence of test items.

    A synthetic empty line and a "[case <name>]" header are put in front of
    raw_data, so the first returned item is always the case itself. Lines
    starting with "--" are comments and are dropped, except that a line
    starting with "----" is kept with its first two dashes removed. A line
    starting with "\\[" is kept with the backslash removed.
    """
    lines = ["", "[case " + name + "]", *raw_data.split("\n")]
    items: list[TestItem] = []
    body: list[str] = []
    section_id: str | None = None
    section_arg: str | None = None
    section_start = 0

    for index, line in enumerate(lines):
        stripped = line.strip()
        if line.startswith("[") and stripped.endswith("]"):
            # A new header closes the section collected so far.
            if section_id:
                body = strip_list(collapse_line_continuation(body))
                items.append(TestItem(section_id, section_arg, body, section_start + 1, index))

            section_start = index
            # Split "[id arg]" at the first space; arg is None without a space.
            section_id, space, remainder = stripped[1:-1].partition(" ")
            section_arg = remainder if space else None
            body = []
        elif line.startswith("\\["):
            body.append(line[1:])
        elif line.startswith("----"):
            body.append(line[2:])
        elif not line.startswith("--"):
            body.append(line)

    # Process the last item.
    if section_id:
        body = strip_list(collapse_line_continuation(body))
        items.append(TestItem(section_id, section_arg, body, section_start + 1, len(lines) - 1))

    return items
def strip_list(l: list[str]) -> list[str]:
    """Return a stripped copy of l.

    Trailing whitespace is removed from every line, and empty lines at the
    end of the list are dropped. The input list is not modified.
    """
    stripped = [re.sub(r"\s+$", "", line) for line in l]
    # Find the end of the list once trailing empty lines are excluded.
    end = len(stripped)
    while end > 0 and stripped[end - 1] == "":
        end -= 1
    return stripped[:end]
def collapse_line_continuation(l: list[str]) -> list[str]:
    """Join lines that end with a backslash with the line that follows them.

    The trailing backslash is removed, and leading spaces of the continuation
    line are dropped before it is appended to the previous line.
    """
    merged: list[str] = []
    joining = False
    for raw in l:
        piece = re.sub(r"\\$", "", raw)
        if not joining:
            merged.append(piece)
        else:
            merged[-1] = merged[-1] + re.sub("^ +", "", piece)
        # Whether the next line continues this one depends on the unmodified line.
        joining = raw.endswith("\\")
    return merged
def expand_variables(s: str) -> str:
    """Return s with every "<ROOT>" placeholder replaced by root_dir."""
    expanded = s.replace("<ROOT>", root_dir)
    return expanded
def expand_errors(input: list[str], output: list[str], fnam: str) -> None:
    """Transform comments such as '# E: message' or
    '# E:3: message' in input.

    The result is lines like 'fnam:line: error: message'.

    Args:
        input: Source lines to scan; line numbers in the result are 1-based
            positions in this list.
        output: List that the generated messages are appended to, in order
            of appearance.
        fnam: File name to put in front of each generated message.

    The severity letters E, N and W map to "error", "note" and "warning".
    An escaped "\\#" inside a message is turned back into "#".
    """
    severities = {"E": "error", "N": "note", "W": "warning"}
    # Compiled once up front; the optional "col" group holds the column number.
    comment_pattern = re.compile(r"^([ENW]):((?P<col>\d+):)? (?P<message>.*)$")

    for lineno, line in enumerate(input, start=1):
        # The first in the split things isn't a comment
        for possible_err_comment in line.split(" # ")[1:]:
            m = comment_pattern.search(possible_err_comment.strip())
            if m is None:
                continue
            severity = severities[m.group(1)]
            col = m.group("col")
            message = m.group("message").replace("\\#", "#")  # adds back escaped # character
            if col is None:
                output.append(f"{fnam}:{lineno}: {severity}: {message}")
            else:
                output.append(f"{fnam}:{lineno}:{col}: {severity}: {message}")
def fix_win_path(line: str) -> str:
    r"""Changes Windows paths to Linux paths in error messages.

    E.g. foo\bar.py -> foo/bar.py.

    Occurrences of root_dir are rewritten with forward slashes first. If the
    line then has the shape "<file>:[<line>:] <message>", backslashes in the
    file part are replaced as well; otherwise the line is returned as is.
    """
    line = line.replace(root_dir, root_dir.replace("\\", "/"))
    parsed = re.match(r"^([\S/]+):(\d+:)?(\s+.*)", line)
    if parsed is None:
        return line
    filename, lineno, message = parsed.groups()
    posix_name = filename.replace("\\", "/")
    # The line-number group is optional and is None when absent.
    return f"{posix_name}:{lineno or ''}{message}"
def fix_cobertura_filename(line: str) -> str:
    r"""Changes filename paths to Linux paths in Cobertura output files.

    E.g. filename="pkg\subpkg\a.py" -> filename="pkg/subpkg/a.py".

    Lines without a <class ... filename="..."> attribute are returned unchanged.
    """
    found = re.search(r'<class .* filename="(?P<filename>.*?)"', line)
    if found is None:
        return line
    # Replace only the attribute value and keep the text on both sides of it.
    begin, end = found.span("filename")
    normalized = found.group("filename").replace("\\", "/")
    return line[:begin] + normalized + line[end:]
- ##
- #
- # pytest setup
- #
- ##
# This function name is special to pytest. See
# https://docs.pytest.org/en/latest/reference.html#initialization-hooks
def pytest_addoption(parser: Any) -> None:
    """Register the mypy-specific command line options in the "mypy" option group."""
    mypy_group = parser.getgroup("mypy")
    # Options are registered in the order listed here.
    option_specs: list[tuple[str, dict[str, Any]]] = [
        (
            "--update-data",
            {
                "action": "store_true",
                "default": False,
                "help": "Update test data to reflect actual output (supported only for certain tests)",
            },
        ),
        (
            "--save-failures-to",
            {
                "default": None,
                "help": "Copy the temp directories from failing tests to a target directory",
            },
        ),
        (
            "--mypy-verbose",
            {"action": "count", "help": "Set the verbose flag when creating mypy Options"},
        ),
        (
            "--mypyc-showc",
            {
                "action": "store_true",
                "default": False,
                "help": "Display C code on mypyc test failures",
            },
        ),
        (
            "--mypyc-debug",
            {
                "default": None,
                "dest": "debugger",
                "choices": SUPPORTED_DEBUGGERS,
                "help": "Run the first mypyc run test with the specified debugger",
            },
        ),
    ]
    for flag, settings in option_specs:
        mypy_group.addoption(flag, **settings)
def pytest_configure(config: pytest.Config) -> None:
    """Reject --update-data when more than one test process is requested."""
    if not config.getoption("--update-data"):
        return
    # --numprocesses may be unknown to pytest, hence the explicit default.
    if config.getoption("--numprocesses", default=1) > 1:
        raise pytest.UsageError(
            "--update-data incompatible with parallelized tests; re-run with -n 1"
        )
# This function name is special to pytest. See
# https://doc.pytest.org/en/latest/how-to/writing_plugins.html#collection-hooks
def pytest_pycollect_makeitem(collector: Any, name: str, obj: object) -> Any | None:
    """Called by pytest on each object in modules configured in conftest.py files.

    collector is pytest.Collector, returns Optional[pytest.Class]
    """
    if not isinstance(obj, type):
        return None
    # Only classes derived from DataSuite contain test cases, not the DataSuite class itself
    if obj is DataSuite or not issubclass(obj, DataSuite):
        return None
    # Non-None result means this obj is a test case.
    # The collect method of the returned DataSuiteCollector instance will be called later,
    # with self.obj being obj.
    return DataSuiteCollector.from_parent(  # type: ignore[no-untyped-call]
        parent=collector, name=name
    )
# Structure of the id in a "[case <id>]" header: the test name followed by
# optional flag suffixes, which must appear in exactly this order.
# split_test_cases() applies it with fullmatch() and reads the named groups.
_case_name_pattern = re.compile(
    r"(?P<name>[a-zA-Z_0-9]+)"
    r"(?P<writescache>-writescache)?"
    r"(?P<only_when>-only_when_cache|-only_when_nocache)?"
    r"(-(?P<platform>posix|windows))?"
    r"(?P<skip>-skip)?"
    r"(?P<xfail>-xfail)?"
)
def split_test_cases(
    parent: DataFileCollector, suite: DataSuite, file: str
) -> Iterator[DataDrivenTestCase]:
    """Iterate over raw test cases in file, at collection time, ignoring sub items.

    The collection phase is slow, so any heavy processing should be deferred to after
    uninteresting tests are filtered (when using -k PATTERN switch).

    Raises RuntimeError for a malformed case id or a test name that already
    occurred earlier in the same file.
    """
    with open(file, encoding="utf-8") as stream:
        text = stream.read()

    # With one capturing group, re.split returns the text before the first
    # case followed by alternating case ids and case bodies.
    chunks = iter(
        re.split(r"^\[case ([^]+)]+)\][ \t]*$\n", text, flags=re.DOTALL | re.MULTILINE)
    )
    line_no = next(chunks).count("\n") + 1
    seen_names = set()
    for case_id, body in zip(chunks, chunks):
        matched = _case_name_pattern.fullmatch(case_id)
        if matched is None:
            raise RuntimeError(f"Invalid testcase id {case_id!r}")
        name = matched.group("name")
        if name in seen_names:
            raise RuntimeError(
                f'Found a duplicate test name "{name}" in {parent.name} on line {line_no}'
            )
        yield DataDrivenTestCase.from_parent(
            parent=parent,
            suite=suite,
            file=file,
            name=add_test_name_suffix(name, suite.test_name_suffix),
            writescache=bool(matched.group("writescache")),
            only_when=matched.group("only_when"),
            platform=matched.group("platform"),
            skip=bool(matched.group("skip")),
            xfail=bool(matched.group("xfail")),
            data=body,
            line=line_no,
        )
        # The header line itself plus every line of the body.
        line_no += body.count("\n") + 1

        # Record existing tests to prevent duplicates:
        seen_names.add(name)
class DataSuiteCollector(pytest.Class):
    """Collector for one DataSuite subclass; yields a collector per data file."""

    def collect(self) -> Iterator[DataFileCollector]:
        """Called by pytest on each of the object returned from pytest_pycollect_makeitem"""
        # obj is the object for which pytest_pycollect_makeitem returned self.
        suite: DataSuite = self.obj

        prefix_ok = os.path.isdir(suite.data_prefix)
        assert prefix_ok, f"Test data prefix ({suite.data_prefix}) not set correctly"

        yield from (
            DataFileCollector.from_parent(parent=self, name=data_file)
            for data_file in suite.files
        )
# A pending replacement of a line range in a data file. Fixes are queued with
# DataFileCollector.enqueue_fix() and applied by DataFileCollector._apply_fixes(),
# which replaces lines [lineno, end_lineno) with `lines`.
class DataFileFix(NamedTuple):
    lineno: int  # 1-offset, inclusive
    end_lineno: int  # 1-offset, exclusive
    lines: list[str]
class DataFileCollector(pytest.Collector):
    """Represents a single `.test` data driven test file.

    More context: https://github.com/python/mypy/issues/11662

    Besides collecting the test cases of the file, the collector gathers
    DataFileFix edits queued during the run and writes them back to the
    data file on teardown.
    """

    parent: DataSuiteCollector

    _fixes: list[DataFileFix]

    @classmethod  # We have to fight with pytest here:
    def from_parent(
        cls, parent: DataSuiteCollector, *, name: str  # type: ignore[override]
    ) -> DataFileCollector:
        """Create a collector for the data file `name` under `parent`."""
        collector = super().from_parent(parent, name=name)
        assert isinstance(collector, DataFileCollector)
        return collector

    def collect(self) -> Iterator[DataDrivenTestCase]:
        """Yield the test cases found in this data file."""
        yield from split_test_cases(
            parent=self,
            suite=self.parent.obj,
            file=os.path.join(self.parent.obj.data_prefix, self.name),
        )

    def setup(self) -> None:
        """Start with an empty queue of fixes."""
        super().setup()
        self._fixes = []

    def teardown(self) -> None:
        """Write queued fixes back to the data file."""
        super().teardown()
        self._apply_fixes()

    def enqueue_fix(self, fix: DataFileFix) -> None:
        """Queue a line-range replacement to be applied on teardown."""
        self._fixes.append(fix)

    def _apply_fixes(self) -> None:
        """Apply all queued fixes to the data file in a single rewrite.

        Does nothing (and does not touch the file) when no fix was queued.
        """
        if not self._fixes:
            return
        data_path = Path(self.parent.obj.data_prefix) / self.name
        # Use UTF-8 explicitly, matching how split_test_cases() reads the same
        # file; the locale default could mis-decode or corrupt non-ASCII data.
        lines = data_path.read_text(encoding="utf-8").split("\n")
        # start from end to prevent line offsets from shifting as we update
        for fix in sorted(self._fixes, reverse=True):
            lines[fix.lineno - 1 : fix.end_lineno - 1] = fix.lines
        data_path.write_text("\n".join(lines), encoding="utf-8")
def add_test_name_suffix(name: str, suffix: str) -> str:
    """Append a suite-specific suffix to a test name.

    If the name ends with a magic suffix of the form "-foobar" (used for
    things like "-skip"), the new suffix is placed in front of it.
    """
    # Find magic suffix of form "-foobar" (used for things like "-skip").
    found = re.search(r"-[-A-Za-z0-9]+$", name)
    if found is None:
        return name + suffix

    # Insert suite-specific test name suffix before the magic suffix
    # which must be the last thing in the test case name since we
    # are using endswith() checks.
    magic_suffix = found.group(0)
    stem = name[: len(name) - len(magic_suffix)]
    return f"{stem}{suffix}{magic_suffix}"
def is_incremental(testcase: DataDrivenTestCase) -> bool:
    """Return True if "incremental" occurs in the test name (any case) or in the file path."""
    marker = "incremental"
    if marker in testcase.name.lower():
        return True
    # The file path is matched case-sensitively.
    return marker in testcase.file
def has_stable_flags(testcase: DataDrivenTestCase) -> bool:
    """Return False if the test case changes flags or mypy.ini in a later step.

    That is the case when an input line starts with "# flags2:" .. "# flags9:"
    or when one of the extra files is named "mypy.ini.<something>".
    """
    for line in testcase.input:
        if re.match(r"# flags[2-9]:", line):
            return False
    return not any(
        os.path.basename(filename).startswith("mypy.ini.") for filename, _ in testcase.files
    )
class DataSuite:
    """Base class for suites whose test cases come from `.test` data files.

    Subclasses list their data files in `files` and implement run_case().
    """

    # option fields - class variables
    # Names of the data files, looked up under data_prefix.
    files: list[str]

    # Directory that paths of [file ...] sections are joined onto.
    base_path = test_temp_dir

    # Allow external users of the test code to override the data prefix
    data_prefix = test_data_prefix

    # If True, parsing a case without an applicable [out] section fails.
    required_out_section = False

    # If True, file paths are joined with os.path.join instead of posixpath.join.
    native_sep = False

    # Name suffix automatically added to each test case in the suite (can be
    # used to distinguish test cases in suites that share data files)
    test_name_suffix = ""

    def setup(self) -> None:
        """Setup fixtures (ad-hoc)"""

    @abstractmethod
    def run_case(self, testcase: DataDrivenTestCase) -> None:
        """Run a single test case; must be overridden by subclasses."""
        raise NotImplementedError
|