context.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. """Manage resources."""
  2. import ast
  3. import os.path as op
  4. import re
  5. from argparse import Namespace
  6. from copy import copy
  7. from functools import lru_cache
  8. from pathlib import Path
  9. from tempfile import NamedTemporaryFile, mkdtemp
  10. from typing import Dict, List, Set
  11. from pylama.errors import Error
  12. from pylama.utils import read
  13. # Parse modeline
  14. MODELINE_RE = re.compile(
  15. r"^\s*#\s+(?:pylama:)\s*((?:[\w_]*=[^:\n\s]+:?)+)", re.I | re.M
  16. ).search
  17. SKIP_PATTERN = re.compile(r"# *noqa\b", re.I).search
  18. class RunContext: # pylint: disable=R0902
  19. """Manage resources."""
  20. __slots__ = (
  21. "errors",
  22. "options",
  23. "skip",
  24. "ignore",
  25. "select",
  26. "linters",
  27. "linters_params",
  28. "filename",
  29. "_ast",
  30. "_from_stdin",
  31. "_source",
  32. "_tempfile",
  33. "_lines",
  34. )
  35. def __init__(self, filename: str, source: str = None, options: Namespace = None):
  36. """Initialize the class."""
  37. self.errors: List[Error] = []
  38. self.options = options
  39. self.skip = False
  40. self.ignore = set()
  41. self.select = set()
  42. self.linters = []
  43. self.linters_params = {}
  44. self._ast = None
  45. self._from_stdin = source is not None
  46. self._source = source
  47. self._tempfile = None
  48. self._lines = None
  49. if options:
  50. if options.abspath:
  51. filename = op.abspath(filename)
  52. self.skip = options.skip and any(
  53. ptrn.match(filename) for ptrn in options.skip
  54. )
  55. self.linters.extend(options.linters)
  56. self.ignore |= options.ignore
  57. self.select |= options.select
  58. self.linters_params.update(options.linters_params)
  59. for mask in options.file_params:
  60. if mask.match(filename):
  61. fparams = options.file_params[mask]
  62. self.update_params(**fparams)
  63. self.filename = filename
  64. # Read/parse modeline
  65. if not self.skip:
  66. modeline = MODELINE_RE(self.source)
  67. if modeline:
  68. values = modeline.group(1).split(":")
  69. self.update_params(**dict(v.split("=", 1) for v in values)) # type: ignore
  70. def __enter__(self):
  71. """Enter to context."""
  72. return self
  73. def __exit__(self, etype, evalue, _):
  74. """Exit from the context."""
  75. if self._tempfile is not None:
  76. tmpfile = Path(self._tempfile)
  77. tmpfile.unlink()
  78. tmpfile.parent.rmdir()
  79. if evalue is not None:
  80. if etype is IOError:
  81. self.push(text=f"{evalue}", number="E001")
  82. elif etype is UnicodeDecodeError:
  83. self.push(text=f"UnicodeError: {self.filename}", number="E001")
  84. elif etype is SyntaxError:
  85. self.push(
  86. lnum=evalue.lineno,
  87. col=evalue.offset,
  88. text=f"SyntaxError: {evalue.args[0]}",
  89. )
  90. else:
  91. self.push(lnum=1, col=1, text=str(evalue))
  92. return False
  93. return True
  94. @property
  95. def source(self):
  96. """Get the current source code."""
  97. if self._source is None:
  98. self._source = read(self.filename)
  99. return self._source
  100. @property
  101. def lines(self):
  102. """Split source to lines."""
  103. if self._lines is None:
  104. self._lines = self.source.splitlines(True)
  105. return self._lines
  106. @property
  107. def ast(self):
  108. """Get the AST for the source."""
  109. if self._ast is None:
  110. self._ast = compile(self.source, self.filename, "exec", ast.PyCF_ONLY_AST)
  111. return self._ast
  112. @property
  113. def temp_filename(self):
  114. """Get a filename for run external command."""
  115. if not self._from_stdin:
  116. return self.filename
  117. if self._tempfile is None:
  118. file = NamedTemporaryFile( # noqa
  119. "w",
  120. encoding="utf8",
  121. suffix=".py",
  122. dir=mkdtemp(prefix="pylama_"),
  123. delete=False,
  124. )
  125. file.write(self.source)
  126. file.close()
  127. self._tempfile = file.name
  128. return self._tempfile
  129. def update_params(self, ignore=None, select=None, linters=None, skip=None, **_):
  130. """Update general params (from file configs or modeline)."""
  131. if select:
  132. self.select |= set(select.split(","))
  133. if ignore:
  134. self.ignore |= set(ignore.split(","))
  135. if linters:
  136. self.linters = linters.split(",")
  137. if skip is not None:
  138. self.skip = bool(int(skip))
  139. @lru_cache(42)
  140. def get_params(self, name: str) -> Dict:
  141. """Get params for a linter with the given name."""
  142. lparams = copy(self.linters_params.get(name, {}))
  143. for key in ("ignore", "select"):
  144. if key in lparams and not isinstance(lparams[key], set):
  145. lparams[key] = set(lparams[key].split(","))
  146. return lparams
  147. @lru_cache(42)
  148. def get_filter(self, name: str, key: str) -> Set:
  149. """Get select/ignore from linter params."""
  150. lparams = self.get_params(name)
  151. return lparams.get(key, set())
  152. def push(self, filtrate: bool = True, **params):
  153. """Record an error."""
  154. err = Error(filename=self.filename, **params)
  155. number = err.number
  156. if len(self.lines) >= err.lnum and SKIP_PATTERN(self.lines[err.lnum - 1]):
  157. return None
  158. if filtrate:
  159. for rule in self.select | self.get_filter(err.source, "select"):
  160. if number.startswith(rule):
  161. return self.errors.append(err)
  162. for rule in self.ignore | self.get_filter(err.source, "ignore"):
  163. if number.startswith(rule):
  164. return None
  165. return self.errors.append(err)