_utils.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. from __future__ import annotations
  2. import re
  3. from collections.abc import Mapping
  4. from datetime import date
  5. from datetime import datetime
  6. from datetime import time
  7. from datetime import timedelta
  8. from datetime import timezone
  9. from typing import Collection
  10. from tomlkit._compat import decode
  11. RFC_3339_LOOSE = re.compile(
  12. "^"
  13. r"(([0-9]+)-(\d{2})-(\d{2}))?" # Date
  14. "("
  15. "([Tt ])?" # Separator
  16. r"(\d{2}):(\d{2}):(\d{2})(\.([0-9]+))?" # Time
  17. r"(([Zz])|([\+|\-]([01][0-9]|2[0-3]):([0-5][0-9])))?" # Timezone
  18. ")?"
  19. "$"
  20. )
  21. RFC_3339_DATETIME = re.compile(
  22. "^"
  23. "([0-9]+)-(0[1-9]|1[012])-(0[1-9]|[12][0-9]|3[01])" # Date
  24. "[Tt ]" # Separator
  25. r"([01][0-9]|2[0-3]):([0-5][0-9]):([0-5][0-9]|60)(\.([0-9]+))?" # Time
  26. r"(([Zz])|([\+|\-]([01][0-9]|2[0-3]):([0-5][0-9])))?" # Timezone
  27. "$"
  28. )
  29. RFC_3339_DATE = re.compile("^([0-9]+)-(0[1-9]|1[012])-(0[1-9]|[12][0-9]|3[01])$")
  30. RFC_3339_TIME = re.compile(
  31. r"^([01][0-9]|2[0-3]):([0-5][0-9]):([0-5][0-9]|60)(\.([0-9]+))?$"
  32. )
  33. _utc = timezone(timedelta(), "UTC")
  34. def parse_rfc3339(string: str) -> datetime | date | time:
  35. m = RFC_3339_DATETIME.match(string)
  36. if m:
  37. year = int(m.group(1))
  38. month = int(m.group(2))
  39. day = int(m.group(3))
  40. hour = int(m.group(4))
  41. minute = int(m.group(5))
  42. second = int(m.group(6))
  43. microsecond = 0
  44. if m.group(7):
  45. microsecond = int((f"{m.group(8):<06s}")[:6])
  46. if m.group(9):
  47. # Timezone
  48. tz = m.group(9)
  49. if tz.upper() == "Z":
  50. tzinfo = _utc
  51. else:
  52. sign = m.group(11)[0]
  53. hour_offset, minute_offset = int(m.group(12)), int(m.group(13))
  54. offset = timedelta(seconds=hour_offset * 3600 + minute_offset * 60)
  55. if sign == "-":
  56. offset = -offset
  57. tzinfo = timezone(offset, f"{sign}{m.group(12)}:{m.group(13)}")
  58. return datetime(
  59. year, month, day, hour, minute, second, microsecond, tzinfo=tzinfo
  60. )
  61. else:
  62. return datetime(year, month, day, hour, minute, second, microsecond)
  63. m = RFC_3339_DATE.match(string)
  64. if m:
  65. year = int(m.group(1))
  66. month = int(m.group(2))
  67. day = int(m.group(3))
  68. return date(year, month, day)
  69. m = RFC_3339_TIME.match(string)
  70. if m:
  71. hour = int(m.group(1))
  72. minute = int(m.group(2))
  73. second = int(m.group(3))
  74. microsecond = 0
  75. if m.group(4):
  76. microsecond = int((f"{m.group(5):<06s}")[:6])
  77. return time(hour, minute, second, microsecond)
  78. raise ValueError("Invalid RFC 339 string")
  79. # https://toml.io/en/v1.0.0#string
  80. CONTROL_CHARS = frozenset(chr(c) for c in range(0x20)) | {chr(0x7F)}
  81. _escaped = {
  82. "b": "\b",
  83. "t": "\t",
  84. "n": "\n",
  85. "f": "\f",
  86. "r": "\r",
  87. '"': '"',
  88. "\\": "\\",
  89. }
  90. _compact_escapes = {
  91. **{v: f"\\{k}" for k, v in _escaped.items()},
  92. '"""': '""\\"',
  93. }
  94. _basic_escapes = CONTROL_CHARS | {'"', "\\"}
  95. def _unicode_escape(seq: str) -> str:
  96. return "".join(f"\\u{ord(c):04x}" for c in seq)
  97. def escape_string(s: str, escape_sequences: Collection[str] = _basic_escapes) -> str:
  98. s = decode(s)
  99. res = []
  100. start = 0
  101. def flush(inc=1):
  102. if start != i:
  103. res.append(s[start:i])
  104. return i + inc
  105. found_sequences = {seq for seq in escape_sequences if seq in s}
  106. i = 0
  107. while i < len(s):
  108. for seq in found_sequences:
  109. seq_len = len(seq)
  110. if s[i:].startswith(seq):
  111. start = flush(seq_len)
  112. res.append(_compact_escapes.get(seq) or _unicode_escape(seq))
  113. i += seq_len - 1 # fast-forward escape sequence
  114. i += 1
  115. flush()
  116. return "".join(res)
  117. def merge_dicts(d1: dict, d2: dict) -> dict:
  118. for k, v in d2.items():
  119. if k in d1 and isinstance(d1[k], dict) and isinstance(v, Mapping):
  120. merge_dicts(d1[k], v)
  121. else:
  122. d1[k] = d2[k]