# parallel.py
  1. # Licensed under the GPL: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html
  2. # For details: https://github.com/PyCQA/pylint/blob/main/LICENSE
  3. # Copyright (c) https://github.com/PyCQA/pylint/blob/main/CONTRIBUTORS.txt
  4. from __future__ import annotations
  5. import functools
  6. import warnings
  7. from collections import defaultdict
  8. from collections.abc import Iterable, Sequence
  9. from typing import TYPE_CHECKING, Any
  10. import dill
  11. from pylint import reporters
  12. from pylint.lint.utils import _augment_sys_path
  13. from pylint.message import Message
  14. from pylint.typing import FileItem
  15. from pylint.utils import LinterStats, merge_stats
  16. try:
  17. import multiprocessing
  18. except ImportError:
  19. multiprocessing = None # type: ignore[assignment]
  20. try:
  21. from concurrent.futures import ProcessPoolExecutor
  22. except ImportError:
  23. ProcessPoolExecutor = None # type: ignore[assignment,misc]
  24. if TYPE_CHECKING:
  25. from pylint.lint import PyLinter
  26. # PyLinter object used by worker processes when checking files using parallel mode
  27. # should only be used by the worker processes
  28. _worker_linter: PyLinter | None = None
  29. def _worker_initialize(
  30. linter: bytes, extra_packages_paths: Sequence[str] | None = None
  31. ) -> None:
  32. """Function called to initialize a worker for a Process within a concurrent Pool.
  33. :param linter: A linter-class (PyLinter) instance pickled with dill
  34. :param extra_packages_paths: Extra entries to be added to sys.path
  35. """
  36. global _worker_linter # pylint: disable=global-statement
  37. _worker_linter = dill.loads(linter)
  38. assert _worker_linter
  39. # On the worker process side the messages are just collected and passed back to
  40. # parent process as _worker_check_file function's return value
  41. _worker_linter.set_reporter(reporters.CollectingReporter())
  42. _worker_linter.open()
  43. if extra_packages_paths:
  44. _augment_sys_path(extra_packages_paths)
  45. def _worker_check_single_file(
  46. file_item: FileItem,
  47. ) -> tuple[
  48. int,
  49. # TODO: 3.0: Make this only str after deprecation has been removed
  50. str | None,
  51. str,
  52. str | None,
  53. list[Message],
  54. LinterStats,
  55. int,
  56. defaultdict[str, list[Any]],
  57. ]:
  58. if not _worker_linter:
  59. raise RuntimeError("Worker linter not yet initialised")
  60. _worker_linter.open()
  61. _worker_linter.check_single_file_item(file_item)
  62. mapreduce_data = defaultdict(list)
  63. for checker in _worker_linter.get_checkers():
  64. data = checker.get_map_data()
  65. if data is not None:
  66. mapreduce_data[checker.name].append(data)
  67. msgs = _worker_linter.reporter.messages
  68. assert isinstance(_worker_linter.reporter, reporters.CollectingReporter)
  69. _worker_linter.reporter.reset()
  70. if _worker_linter.current_name is None:
  71. warnings.warn(
  72. (
  73. "In pylint 3.0 the current_name attribute of the linter object should be a string. "
  74. "If unknown it should be initialized as an empty string."
  75. ),
  76. DeprecationWarning,
  77. )
  78. return (
  79. id(multiprocessing.current_process()),
  80. _worker_linter.current_name,
  81. file_item.filepath,
  82. _worker_linter.file_state.base_name,
  83. msgs,
  84. _worker_linter.stats,
  85. _worker_linter.msg_status,
  86. mapreduce_data,
  87. )
  88. def _merge_mapreduce_data(
  89. linter: PyLinter,
  90. all_mapreduce_data: defaultdict[int, list[defaultdict[str, list[Any]]]],
  91. ) -> None:
  92. """Merges map/reduce data across workers, invoking relevant APIs on checkers."""
  93. # First collate the data and prepare it, so we can send it to the checkers for
  94. # validation. The intent here is to collect all the mapreduce data for all checker-
  95. # runs across processes - that will then be passed to a static method on the
  96. # checkers to be reduced and further processed.
  97. collated_map_reduce_data: defaultdict[str, list[Any]] = defaultdict(list)
  98. for linter_data in all_mapreduce_data.values():
  99. for run_data in linter_data:
  100. for checker_name, data in run_data.items():
  101. collated_map_reduce_data[checker_name].extend(data)
  102. # Send the data to checkers that support/require consolidated data
  103. original_checkers = linter.get_checkers()
  104. for checker in original_checkers:
  105. if checker.name in collated_map_reduce_data:
  106. # Assume that if the check has returned map/reduce data that it has the
  107. # reducer function
  108. checker.reduce_map_data(linter, collated_map_reduce_data[checker.name])
  109. def check_parallel(
  110. linter: PyLinter,
  111. jobs: int,
  112. files: Iterable[FileItem],
  113. extra_packages_paths: Sequence[str] | None = None,
  114. ) -> None:
  115. """Use the given linter to lint the files with given amount of workers (jobs).
  116. This splits the work filestream-by-filestream. If you need to do work across
  117. multiple files, as in the similarity-checker, then implement the map/reduce mixin functionality.
  118. """
  119. # The linter is inherited by all the pool's workers, i.e. the linter
  120. # is identical to the linter object here. This is required so that
  121. # a custom PyLinter object can be used.
  122. initializer = functools.partial(
  123. _worker_initialize, extra_packages_paths=extra_packages_paths
  124. )
  125. with ProcessPoolExecutor(
  126. max_workers=jobs, initializer=initializer, initargs=(dill.dumps(linter),)
  127. ) as executor:
  128. linter.open()
  129. all_stats = []
  130. all_mapreduce_data: defaultdict[
  131. int, list[defaultdict[str, list[Any]]]
  132. ] = defaultdict(list)
  133. # Maps each file to be worked on by a single _worker_check_single_file() call,
  134. # collecting any map/reduce data by checker module so that we can 'reduce' it
  135. # later.
  136. for (
  137. worker_idx, # used to merge map/reduce data across workers
  138. module,
  139. file_path,
  140. base_name,
  141. messages,
  142. stats,
  143. msg_status,
  144. mapreduce_data,
  145. ) in executor.map(_worker_check_single_file, files):
  146. linter.file_state.base_name = base_name
  147. linter.file_state._is_base_filestate = False
  148. linter.set_current_module(module, file_path)
  149. for msg in messages:
  150. linter.reporter.handle_message(msg)
  151. all_stats.append(stats)
  152. all_mapreduce_data[worker_idx].append(mapreduce_data)
  153. linter.msg_status |= msg_status
  154. _merge_mapreduce_data(linter, all_mapreduce_data)
  155. linter.stats = merge_stats([linter.stats] + all_stats)