# brain_multiprocessing.py
# Licensed under the LGPL: https://www.gnu.org/licenses/old-licenses/lgpl-2.1.en.html
# For details: https://github.com/PyCQA/astroid/blob/main/LICENSE
# Copyright (c) https://github.com/PyCQA/astroid/blob/main/CONTRIBUTORS.txt
from astroid.bases import BoundMethod
from astroid.brain.helpers import register_module_extender
from astroid.builder import parse
from astroid.exceptions import InferenceError
from astroid.manager import AstroidManager
from astroid.nodes.scoped_nodes import FunctionDef


  10. def _multiprocessing_transform():
  11. module = parse(
  12. """
  13. from multiprocessing.managers import SyncManager
  14. def Manager():
  15. return SyncManager()
  16. """
  17. )
  18. # Multiprocessing uses a getattr lookup inside contexts,
  19. # in order to get the attributes they need. Since it's extremely
  20. # dynamic, we use this approach to fake it.
  21. node = parse(
  22. """
  23. from multiprocessing.context import DefaultContext, BaseContext
  24. default = DefaultContext()
  25. base = BaseContext()
  26. """
  27. )
  28. try:
  29. context = next(node["default"].infer())
  30. base = next(node["base"].infer())
  31. except (InferenceError, StopIteration):
  32. return module
  33. for node in (context, base):
  34. for key, value in node.locals.items():
  35. if key.startswith("_"):
  36. continue
  37. value = value[0]
  38. if isinstance(value, FunctionDef):
  39. # We need to rebound this, since otherwise
  40. # it will have an extra argument (self).
  41. value = BoundMethod(value, node)
  42. module[key] = value
  43. return module
  44. def _multiprocessing_managers_transform():
  45. return parse(
  46. """
  47. import array
  48. import threading
  49. import multiprocessing.pool as pool
  50. import queue
  51. class Namespace(object):
  52. pass
  53. class Value(object):
  54. def __init__(self, typecode, value, lock=True):
  55. self._typecode = typecode
  56. self._value = value
  57. def get(self):
  58. return self._value
  59. def set(self, value):
  60. self._value = value
  61. def __repr__(self):
  62. return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value)
  63. value = property(get, set)
  64. def Array(typecode, sequence, lock=True):
  65. return array.array(typecode, sequence)
  66. class SyncManager(object):
  67. Queue = JoinableQueue = queue.Queue
  68. Event = threading.Event
  69. RLock = threading.RLock
  70. Lock = threading.Lock
  71. BoundedSemaphore = threading.BoundedSemaphore
  72. Condition = threading.Condition
  73. Barrier = threading.Barrier
  74. Pool = pool.Pool
  75. list = list
  76. dict = dict
  77. Value = Value
  78. Array = Array
  79. Namespace = Namespace
  80. __enter__ = lambda self: self
  81. __exit__ = lambda *args: args
  82. def start(self, initializer=None, initargs=None):
  83. pass
  84. def shutdown(self):
  85. pass
  86. """
  87. )
  88. register_module_extender(
  89. AstroidManager(), "multiprocessing.managers", _multiprocessing_managers_transform
  90. )
  91. register_module_extender(
  92. AstroidManager(), "multiprocessing", _multiprocessing_transform
  93. )