| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438 |
- """Test cases for fine-grained incremental checking.
- Each test cases runs a batch build followed by one or more fine-grained
- incremental steps. We verify that each step produces the expected output.
- See the comment at the top of test-data/unit/fine-grained.test for more
- information.
- N.B.: Unlike most of the other test suites, testfinegrained does not
- rely on an alt_lib_path for finding source files. This means that they
- can test interactions with the lib_path that is built implicitly based
- on specified sources.
- """
- from __future__ import annotations
- import os
- import re
- import unittest
- from typing import Any
- import pytest
- from mypy import build
- from mypy.config_parser import parse_config_file
- from mypy.dmypy_server import Server
- from mypy.dmypy_util import DEFAULT_STATUS_FILE
- from mypy.errors import CompileError
- from mypy.find_sources import create_source_list
- from mypy.modulefinder import BuildSource
- from mypy.options import TYPE_VAR_TUPLE, UNPACK, Options
- from mypy.server.mergecheck import check_consistency
- from mypy.server.update import sort_messages_preserving_file_order
- from mypy.test.config import test_temp_dir
- from mypy.test.data import DataDrivenTestCase, DataSuite, DeleteFile, UpdateFile
- from mypy.test.helpers import (
- assert_module_equivalence,
- assert_string_arrays_equal,
- assert_target_equivalence,
- find_test_files,
- parse_options,
- perform_file_operations,
- )
- # Set to True to perform (somewhat expensive) checks for duplicate AST nodes after merge
- CHECK_CONSISTENCY = False
class FineGrainedSuite(DataSuite):
    """Data-driven suite for fine-grained incremental checking.

    Each test case runs an initial check (either a regular batch build or a
    dmypy Server check, depending on caching mode) followed by one or more
    incremental steps driven by the file operations recorded in the test case.
    The output of every step is compared against the expected output.
    """

    files = find_test_files(
        pattern="fine-grained*.test", exclude=["fine-grained-cache-incremental.test"]
    )

    # Whether to use the fine-grained cache in the testing. This is overridden
    # by a trivial subclass to produce a suite that uses the cache.
    use_cache = False

    def should_skip(self, testcase: DataDrivenTestCase) -> bool:
        """Return True if this test case should be skipped in the current mode."""
        # Decide whether to skip the test. This could have been structured
        # as a filter() classmethod also, but we want the tests reported
        # as skipped, not just elided.
        if self.use_cache:
            if testcase.only_when == "-only_when_nocache":
                return True
            # TODO: In caching mode we currently don't well support
            # starting from cached states with errors in them.
            if testcase.output and testcase.output[0] != "==":
                return True
        else:
            if testcase.only_when == "-only_when_cache":
                return True

        return False

    def run_case(self, testcase: DataDrivenTestCase) -> None:
        """Run one data-driven test case: initial build plus incremental steps."""
        if self.should_skip(testcase):
            pytest.skip()
            return

        # Write the main source file into the shared test temp directory.
        main_src = "\n".join(testcase.input)
        main_path = os.path.join(test_temp_dir, "main")
        with open(main_path, "w", encoding="utf8") as f:
            f.write(main_src)

        # Separate option sets: one for fine-grained checking via the server,
        # one for the regular batch builds used to populate the cache.
        options = self.get_options(main_src, testcase, build_cache=False)
        build_options = self.get_options(main_src, testcase, build_cache=True)
        server = Server(options, DEFAULT_STATUS_FILE)

        num_regular_incremental_steps = self.get_build_steps(main_src)
        step = 1
        sources = self.parse_sources(main_src, step, options)
        if step <= num_regular_incremental_steps:
            messages = self.build(build_options, sources)
        else:
            messages = self.run_check(server, sources)

        a = []
        if messages:
            a.extend(normalize_messages(messages))

        assert testcase.tmpdir
        a.extend(self.maybe_suggest(step, server, main_src, testcase.tmpdir.name))
        a.extend(self.maybe_inspect(step, server, main_src))

        if server.fine_grained_manager:
            if CHECK_CONSISTENCY:
                check_consistency(server.fine_grained_manager)

        # Each element of steps is the list of file operations for one
        # incremental step; the expected output separates steps with "==".
        steps = testcase.find_steps()
        all_triggered = []

        for operations in steps:
            step += 1
            output, triggered = self.perform_step(
                operations,
                server,
                options,
                build_options,
                testcase,
                main_src,
                step,
                num_regular_incremental_steps,
            )
            a.append("==")
            a.extend(output)
            all_triggered.extend(triggered)

        # Normalize paths in test output (for Windows).
        a = [line.replace("\\", "/") for line in a]

        assert_string_arrays_equal(
            testcase.output, a, f"Invalid output ({testcase.file}, line {testcase.line})"
        )

        if testcase.triggered:
            assert_string_arrays_equal(
                testcase.triggered,
                self.format_triggered(all_triggered),
                f"Invalid active triggers ({testcase.file}, line {testcase.line})",
            )

    def get_options(self, source: str, testcase: DataDrivenTestCase, build_cache: bool) -> Options:
        """Build an Options object for the test, parsed from the test source.

        With build_cache=True the options describe a regular (cache-building)
        incremental run; with build_cache=False they describe a fine-grained run.
        """
        # This handles things like '# flags: --foo'.
        options = parse_options(source, testcase, incremental_step=1)
        options.incremental = True
        options.use_builtins_fixtures = True
        options.show_traceback = True
        options.error_summary = False
        options.fine_grained_incremental = not build_cache
        options.use_fine_grained_cache = self.use_cache and not build_cache
        options.cache_fine_grained = self.use_cache
        options.local_partial_types = True
        options.enable_incomplete_feature = [TYPE_VAR_TUPLE, UNPACK]
        # Treat empty bodies safely for these test cases.
        options.allow_empty_bodies = not testcase.name.endswith("_no_empty")
        if re.search("flags:.*--follow-imports", source) is None:
            # Override the default for follow_imports
            options.follow_imports = "error"

        # Let a config file bundled with the test case override options.
        for name, _ in testcase.files:
            if "mypy.ini" in name or "pyproject.toml" in name:
                parse_config_file(options, lambda: None, name)
                break

        return options

    def run_check(self, server: Server, sources: list[BuildSource]) -> list[str]:
        """Run a check via the dmypy server and return its output lines."""
        response = server.check(sources, export_types=True, is_tty=False, terminal_width=-1)
        out = response["out"] or response["err"]
        assert isinstance(out, str)
        return out.splitlines()

    def build(self, options: Options, sources: list[BuildSource]) -> list[str]:
        """Run a regular batch build and return error messages (if any)."""
        try:
            result = build.build(sources=sources, options=options)
        except CompileError as e:
            return e.messages
        return result.errors

    def format_triggered(self, triggered: list[list[str]]) -> list[str]:
        """Format triggered targets for comparison against expected triggers.

        Triggers ending in '__>' are filtered out; step numbers start at 2
        because triggers are only recorded for incremental steps.
        """
        result = []
        for n, triggers in enumerate(triggered):
            filtered = [trigger for trigger in triggers if not trigger.endswith("__>")]
            filtered = sorted(filtered)
            result.append(("%d: %s" % (n + 2, ", ".join(filtered))).strip())
        return result

    def get_build_steps(self, program_text: str) -> int:
        """Get the number of regular incremental steps to run, from the test source"""
        if not self.use_cache:
            return 0
        m = re.search("# num_build_steps: ([0-9]+)$", program_text, flags=re.MULTILINE)
        if m is not None:
            return int(m.group(1))
        return 1

    def perform_step(
        self,
        operations: list[UpdateFile | DeleteFile],
        server: Server,
        options: Options,
        build_options: Options,
        testcase: DataDrivenTestCase,
        main_src: str,
        step: int,
        num_regular_incremental_steps: int,
    ) -> tuple[list[str], list[list[str]]]:
        """Perform one fine-grained incremental build step (after some file updates/deletions).

        Return (mypy output, triggered targets).
        """
        perform_file_operations(operations)
        sources = self.parse_sources(main_src, step, options)

        if step <= num_regular_incremental_steps:
            new_messages = self.build(build_options, sources)
        else:
            new_messages = self.run_check(server, sources)

        updated: list[str] = []
        changed: list[str] = []
        targets: list[str] = []
        triggered = []
        if server.fine_grained_manager:
            if CHECK_CONSISTENCY:
                check_consistency(server.fine_grained_manager)
            triggered.append(server.fine_grained_manager.triggered)
            updated = server.fine_grained_manager.updated_modules
            changed = [mod for mod, file in server.fine_grained_manager.changed_modules]
            targets = server.fine_grained_manager.processed_targets

        # Compare per-step stale/rechecked module sets against expectations,
        # when the test case declares them.
        expected_stale = testcase.expected_stale_modules.get(step - 1)
        if expected_stale is not None:
            assert_module_equivalence("stale" + str(step - 1), expected_stale, changed)

        expected_rechecked = testcase.expected_rechecked_modules.get(step - 1)
        if expected_rechecked is not None:
            assert_module_equivalence("rechecked" + str(step - 1), expected_rechecked, updated)

        expected = testcase.expected_fine_grained_targets.get(step)
        if expected:
            assert_target_equivalence("targets" + str(step), expected, targets)

        new_messages = normalize_messages(new_messages)

        a = new_messages
        assert testcase.tmpdir
        a.extend(self.maybe_suggest(step, server, main_src, testcase.tmpdir.name))
        a.extend(self.maybe_inspect(step, server, main_src))

        return a, triggered

    def parse_sources(
        self, program_text: str, incremental_step: int, options: Options
    ) -> list[BuildSource]:
        """Return target BuildSources for a test case.

        Normally, the unit tests will check all files included in the test
        case. This differs from how testcheck works by default, as dmypy
        doesn't currently support following imports.

        You can override this behavior and instruct the tests to check
        multiple modules by using a comment like this in the test case
        input:

          # cmd: main a.py

        You can also use `# cmdN:` to have a different cmd for incremental
        step N (2, 3, ...).
        """
        m = re.search("# cmd: mypy ([a-zA-Z0-9_./ ]+)$", program_text, flags=re.MULTILINE)
        regex = f"# cmd{incremental_step}: mypy ([a-zA-Z0-9_./ ]+)$"
        alt_m = re.search(regex, program_text, flags=re.MULTILINE)
        if alt_m is not None:
            # Optionally return a different command if in a later step
            # of incremental mode, otherwise default to reusing the
            # original cmd.
            m = alt_m

        if m:
            # The test case wants to use a non-default set of files.
            paths = [os.path.join(test_temp_dir, path) for path in m.group(1).strip().split()]
            return create_source_list(paths, options)
        else:
            base = BuildSource(os.path.join(test_temp_dir, "main"), "__main__", None)
            # Use expand_dir instead of create_source_list to avoid complaints
            # when there aren't any .py files in an increment
            return [base] + create_source_list([test_temp_dir], options, allow_empty_dir=True)

    def maybe_suggest(self, step: int, server: Server, src: str, tmp_dir: str) -> list[str]:
        """Run any '# suggestN:' commands declared for this step and return output."""
        output: list[str] = []
        targets = self.get_suggest(src, step)
        for flags, target in targets:
            json = "--json" in flags
            callsites = "--callsites" in flags
            no_any = "--no-any" in flags
            no_errors = "--no-errors" in flags
            m = re.match("--flex-any=([0-9.]+)", flags)
            flex_any = float(m.group(1)) if m else None
            m = re.match(r"--use-fixme=(\w+)", flags)
            use_fixme = m.group(1) if m else None
            m = re.match("--max-guesses=([0-9]+)", flags)
            max_guesses = int(m.group(1)) if m else None
            res: dict[str, Any] = server.cmd_suggest(
                target.strip(),
                json=json,
                no_any=no_any,
                no_errors=no_errors,
                flex_any=flex_any,
                use_fixme=use_fixme,
                callsites=callsites,
                max_guesses=max_guesses,
            )
            val = res["error"] if "error" in res else res["out"] + res["err"]
            if json:
                # JSON contains already escaped \ on Windows, so requires a bit of care.
                val = val.replace("\\\\", "\\")
                val = val.replace(os.path.realpath(tmp_dir) + os.path.sep, "")
            val = val.replace(os.path.abspath(tmp_dir) + os.path.sep, "")
            output.extend(val.strip().split("\n"))
        return normalize_messages(output)

    def maybe_inspect(self, step: int, server: Server, src: str) -> list[str]:
        """Run any '# inspectN:' commands declared for this step and return output."""
        output: list[str] = []
        targets = self.get_inspect(src, step)
        for flags, location in targets:
            m = re.match(r"--show=(\w+)", flags)
            show = m.group(1) if m else "type"
            verbosity = 0
            if "-v" in flags:
                verbosity = 1
            if "-vv" in flags:
                verbosity = 2
            m = re.match(r"--limit=([0-9]+)", flags)
            limit = int(m.group(1)) if m else 0
            include_span = "--include-span" in flags
            include_kind = "--include-kind" in flags
            include_object_attrs = "--include-object-attrs" in flags
            union_attrs = "--union-attrs" in flags
            force_reload = "--force-reload" in flags
            res: dict[str, Any] = server.cmd_inspect(
                show,
                location,
                verbosity=verbosity,
                limit=limit,
                include_span=include_span,
                include_kind=include_kind,
                include_object_attrs=include_object_attrs,
                union_attrs=union_attrs,
                force_reload=force_reload,
            )
            val = res["error"] if "error" in res else res["out"] + res["err"]
            output.extend(val.strip().split("\n"))
        return normalize_messages(output)

    def get_suggest(self, program_text: str, incremental_step: int) -> list[tuple[str, str]]:
        """Extract (flags, target) pairs from '# suggestN:' comments for a step."""
        step_bit = "1?" if incremental_step == 1 else str(incremental_step)
        regex = f"# suggest{step_bit}: (--[a-zA-Z0-9_\\-./=?^ ]+ )*([a-zA-Z0-9_.:/?^ ]+)$"
        m = re.findall(regex, program_text, flags=re.MULTILINE)
        return m

    def get_inspect(self, program_text: str, incremental_step: int) -> list[tuple[str, str]]:
        """Extract (flags, location) pairs from '# inspectN:' comments for a step."""
        step_bit = "1?" if incremental_step == 1 else str(incremental_step)
        regex = f"# inspect{step_bit}: (--[a-zA-Z0-9_\\-=?^ ]+ )*([a-zA-Z0-9_.:/?^ ]+)$"
        m = re.findall(regex, program_text, flags=re.MULTILINE)
        return m
def normalize_messages(messages: list[str]) -> list[str]:
    """Strip a leading 'tmp<sep>' path prefix from each message line."""
    prefix = re.compile("^tmp" + re.escape(os.sep))
    normalized: list[str] = []
    for line in messages:
        normalized.append(prefix.sub("", line))
    return normalized
class TestMessageSorting(unittest.TestCase):
    """Unit tests for sort_messages_preserving_file_order."""

    def test_simple_sorting(self) -> None:
        """New messages are reordered to follow the file order of the old run."""
        fresh = ['x.py:1: error: "int" not callable', 'foo/y.py:123: note: "X" not defined']
        previous = ['foo/y.py:12: note: "Y" not defined', 'x.py:8: error: "str" not callable']
        expected = list(reversed(fresh))
        assert sort_messages_preserving_file_order(fresh, previous) == expected
        assert sort_messages_preserving_file_order(list(reversed(fresh)), previous) == expected

    def test_long_form_sorting(self) -> None:
        # Multi-line errors should be sorted together and not split.
        block_x = [
            'x.py:1: error: "int" not callable',
            "and message continues (x: y)",
            " 1()",
            " ^~~",
        ]
        block_y = [
            'foo/y.py: In function "f":',
            'foo/y.py:123: note: "X" not defined',
            "and again message continues",
        ]
        previous = ['foo/y.py:12: note: "Y" not defined', 'x.py:8: error: "str" not callable']
        expected = block_y + block_x
        assert sort_messages_preserving_file_order(block_x + block_y, previous) == expected
        assert sort_messages_preserving_file_order(block_y + block_x, previous) == expected

    def test_mypy_error_prefix(self) -> None:
        # Some errors don't have a file and start with "mypy: ". These
        # shouldn't be sorted together with file-specific errors.
        file_err = 'x.py:1: error: "int" not callable'
        file_note = 'foo/y:123: note: "X" not defined'
        global_err = "mypy: Error not associated with a file"
        previous = [
            "mypy: Something wrong",
            'foo/y:12: note: "Y" not defined',
            'x.py:8: error: "str" not callable',
        ]
        expected = [file_note, file_err, global_err]
        assert sort_messages_preserving_file_order(
            [file_err, file_note, global_err], previous
        ) == expected
        assert sort_messages_preserving_file_order(
            [global_err, file_note, file_err], previous
        ) == expected

    def test_new_file_at_the_end(self) -> None:
        """Messages for files unseen in the old run are appended at the end."""
        known_x = 'x.py:1: error: "int" not callable'
        known_y = 'foo/y.py:123: note: "X" not defined'
        unseen_ab = "ab.py:3: error: Problem: error"
        unseen_aaa = "aaa:3: error: Bad"
        previous = ['foo/y.py:12: note: "Y" not defined', 'x.py:8: error: "str" not callable']
        assert sort_messages_preserving_file_order(
            [known_x, known_y, unseen_ab], previous
        ) == [known_y, known_x, unseen_ab]
        assert sort_messages_preserving_file_order(
            [unseen_ab, known_x, known_y, unseen_aaa], previous
        ) == [known_y, known_x, unseen_ab, unseen_aaa]
|