brain_subprocess.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
# Licensed under the LGPL: https://www.gnu.org/licenses/old-licenses/lgpl-2.1.en.html
# For details: https://github.com/PyCQA/astroid/blob/main/LICENSE
# Copyright (c) https://github.com/PyCQA/astroid/blob/main/CONTRIBUTORS.txt
  4. import textwrap
  5. from astroid.brain.helpers import register_module_extender
  6. from astroid.builder import parse
  7. from astroid.const import PY39_PLUS, PY310_PLUS, PY311_PLUS
  8. from astroid.manager import AstroidManager
  9. def _subprocess_transform():
  10. communicate = (bytes("string", "ascii"), bytes("string", "ascii"))
  11. communicate_signature = "def communicate(self, input=None, timeout=None)"
  12. args = """\
  13. self, args, bufsize=-1, executable=None, stdin=None, stdout=None, stderr=None,
  14. preexec_fn=None, close_fds=True, shell=False, cwd=None, env=None,
  15. universal_newlines=None, startupinfo=None, creationflags=0, restore_signals=True,
  16. start_new_session=False, pass_fds=(), *, encoding=None, errors=None, text=None"""
  17. if PY39_PLUS:
  18. args += ", user=None, group=None, extra_groups=None, umask=-1"
  19. if PY310_PLUS:
  20. args += ", pipesize=-1"
  21. if PY311_PLUS:
  22. args += ", process_group=None"
  23. init = f"""
  24. def __init__({args}):
  25. pass"""
  26. wait_signature = "def wait(self, timeout=None)"
  27. ctx_manager = """
  28. def __enter__(self): return self
  29. def __exit__(self, *args): pass
  30. """
  31. py3_args = "args = []"
  32. check_output_signature = """
  33. check_output(
  34. args, *,
  35. stdin=None,
  36. stderr=None,
  37. shell=False,
  38. cwd=None,
  39. encoding=None,
  40. errors=None,
  41. universal_newlines=False,
  42. timeout=None,
  43. env=None,
  44. text=None,
  45. restore_signals=True,
  46. preexec_fn=None,
  47. pass_fds=(),
  48. input=None,
  49. bufsize=0,
  50. executable=None,
  51. close_fds=False,
  52. startupinfo=None,
  53. creationflags=0,
  54. start_new_session=False
  55. ):
  56. """.strip()
  57. code = textwrap.dedent(
  58. f"""
  59. def {check_output_signature}
  60. if universal_newlines:
  61. return ""
  62. return b""
  63. class Popen(object):
  64. returncode = pid = 0
  65. stdin = stdout = stderr = file()
  66. {py3_args}
  67. {communicate_signature}:
  68. return {communicate!r}
  69. {wait_signature}:
  70. return self.returncode
  71. def poll(self):
  72. return self.returncode
  73. def send_signal(self, signal):
  74. pass
  75. def terminate(self):
  76. pass
  77. def kill(self):
  78. pass
  79. {ctx_manager}
  80. """
  81. )
  82. if PY39_PLUS:
  83. code += """
  84. @classmethod
  85. def __class_getitem__(cls, item):
  86. pass
  87. """
  88. init_lines = textwrap.dedent(init).splitlines()
  89. indented_init = "\n".join(" " * 4 + line for line in init_lines)
  90. code += indented_init
  91. return parse(code)
  92. register_module_extender(AstroidManager(), "subprocess", _subprocess_transform)