testipc.py 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. from __future__ import annotations
  2. import sys
  3. import time
  4. from multiprocessing import Process, Queue
  5. from unittest import TestCase, main
  6. import pytest
  7. from mypy.ipc import IPCClient, IPCServer
  8. CONNECTION_NAME = "dmypy-test-ipc"
  9. def server(msg: str, q: Queue[str]) -> None:
  10. server = IPCServer(CONNECTION_NAME)
  11. q.put(server.connection_name)
  12. data = b""
  13. while not data:
  14. with server:
  15. server.write(msg.encode())
  16. data = server.read()
  17. server.cleanup()
  18. class IPCTests(TestCase):
  19. def test_transaction_large(self) -> None:
  20. queue: Queue[str] = Queue()
  21. msg = "t" * 200000 # longer than the max read size of 100_000
  22. p = Process(target=server, args=(msg, queue), daemon=True)
  23. p.start()
  24. connection_name = queue.get()
  25. with IPCClient(connection_name, timeout=1) as client:
  26. assert client.read() == msg.encode()
  27. client.write(b"test")
  28. queue.close()
  29. queue.join_thread()
  30. p.join()
  31. def test_connect_twice(self) -> None:
  32. queue: Queue[str] = Queue()
  33. msg = "this is a test message"
  34. p = Process(target=server, args=(msg, queue), daemon=True)
  35. p.start()
  36. connection_name = queue.get()
  37. with IPCClient(connection_name, timeout=1) as client:
  38. assert client.read() == msg.encode()
  39. client.write(b"") # don't let the server hang up yet, we want to connect again.
  40. with IPCClient(connection_name, timeout=1) as client:
  41. assert client.read() == msg.encode()
  42. client.write(b"test")
  43. queue.close()
  44. queue.join_thread()
  45. p.join()
  46. assert p.exitcode == 0
  47. # Run test_connect_twice a lot, in the hopes of finding issues.
  48. # This is really slow, so it is skipped, but can be enabled if
  49. # needed to debug IPC issues.
  50. @pytest.mark.skip
  51. def test_connect_alot(self) -> None:
  52. t0 = time.time()
  53. for i in range(1000):
  54. try:
  55. print(i, "start")
  56. self.test_connect_twice()
  57. finally:
  58. t1 = time.time()
  59. print(i, t1 - t0)
  60. sys.stdout.flush()
  61. t0 = t1
  62. if __name__ == "__main__":
  63. main()