introspection.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. from collections import namedtuple
  2. import sqlparse
  3. from MySQLdb.constants import FIELD_TYPE
  4. from django.db.backends.base.introspection import (
  5. BaseDatabaseIntrospection, FieldInfo as BaseFieldInfo, TableInfo,
  6. )
  7. from django.db.models import Index
  8. from django.utils.datastructures import OrderedSet
# Extend Django's base FieldInfo with MySQL-specific column attributes:
# 'extra' (e.g. 'auto_increment'), 'is_unsigned', and 'has_json_constraint'
# (on MariaDB, JSON columns are detected via a JSON_VALID() check constraint).
FieldInfo = namedtuple('FieldInfo', BaseFieldInfo._fields + ('extra', 'is_unsigned', 'has_json_constraint'))
# One row of the information_schema.columns query issued in
# DatabaseIntrospection.get_table_description(); field order must match the
# SELECT list there.
InfoLine = namedtuple(
    'InfoLine',
    'col_name data_type max_len num_prec num_scale extra column_default '
    'collation is_unsigned'
)
class DatabaseIntrospection(BaseDatabaseIntrospection):
    # Map MySQLdb FIELD_TYPE codes to Django model field class names.
    # get_field_type() refines these further using per-column metadata
    # (auto_increment, unsigned, JSON check constraints).
    data_types_reverse = {
        FIELD_TYPE.BLOB: 'TextField',
        FIELD_TYPE.CHAR: 'CharField',
        FIELD_TYPE.DECIMAL: 'DecimalField',
        FIELD_TYPE.NEWDECIMAL: 'DecimalField',
        FIELD_TYPE.DATE: 'DateField',
        FIELD_TYPE.DATETIME: 'DateTimeField',
        FIELD_TYPE.DOUBLE: 'FloatField',
        FIELD_TYPE.FLOAT: 'FloatField',
        FIELD_TYPE.INT24: 'IntegerField',
        FIELD_TYPE.JSON: 'JSONField',
        FIELD_TYPE.LONG: 'IntegerField',
        FIELD_TYPE.LONGLONG: 'BigIntegerField',
        FIELD_TYPE.SHORT: 'SmallIntegerField',
        FIELD_TYPE.STRING: 'CharField',
        FIELD_TYPE.TIME: 'TimeField',
        FIELD_TYPE.TIMESTAMP: 'DateTimeField',
        FIELD_TYPE.TINY: 'IntegerField',
        FIELD_TYPE.TINY_BLOB: 'TextField',
        FIELD_TYPE.MEDIUM_BLOB: 'TextField',
        FIELD_TYPE.LONG_BLOB: 'TextField',
        FIELD_TYPE.VAR_STRING: 'CharField',
    }
  39. def get_field_type(self, data_type, description):
  40. field_type = super().get_field_type(data_type, description)
  41. if 'auto_increment' in description.extra:
  42. if field_type == 'IntegerField':
  43. return 'AutoField'
  44. elif field_type == 'BigIntegerField':
  45. return 'BigAutoField'
  46. elif field_type == 'SmallIntegerField':
  47. return 'SmallAutoField'
  48. if description.is_unsigned:
  49. if field_type == 'BigIntegerField':
  50. return 'PositiveBigIntegerField'
  51. elif field_type == 'IntegerField':
  52. return 'PositiveIntegerField'
  53. elif field_type == 'SmallIntegerField':
  54. return 'PositiveSmallIntegerField'
  55. # JSON data type is an alias for LONGTEXT in MariaDB, use check
  56. # constraints clauses to introspect JSONField.
  57. if description.has_json_constraint:
  58. return 'JSONField'
  59. return field_type
  60. def get_table_list(self, cursor):
  61. """Return a list of table and view names in the current database."""
  62. cursor.execute("SHOW FULL TABLES")
  63. return [TableInfo(row[0], {'BASE TABLE': 't', 'VIEW': 'v'}.get(row[1]))
  64. for row in cursor.fetchall()]
    def get_table_description(self, cursor, table_name):
        """
        Return a description of the table with the DB-API cursor.description
        interface.
        """
        json_constraints = {}
        if self.connection.mysql_is_mariadb and self.connection.features.can_introspect_json_field:
            # JSON data type is an alias for LONGTEXT in MariaDB, select
            # JSON_VALID() constraints to introspect JSONField.
            # NOTE(review): the '+' here relies on server-side string
            # handling of the comparison expression — confirm it matches
            # MariaDB's check_clause format for the supported versions.
            cursor.execute("""
                SELECT c.constraint_name AS column_name
                FROM information_schema.check_constraints AS c
                WHERE
                    c.table_name = %s AND
                    LOWER(c.check_clause) = 'json_valid(`' + LOWER(c.constraint_name) + '`)' AND
                    c.constraint_schema = DATABASE()
            """, [table_name])
            json_constraints = {row[0] for row in cursor.fetchall()}
        # A default collation for the given table.
        cursor.execute("""
            SELECT table_collation
            FROM information_schema.tables
            WHERE table_schema = DATABASE()
            AND table_name = %s
        """, [table_name])
        row = cursor.fetchone()
        default_column_collation = row[0] if row else ''
        # information_schema database gives more accurate results for some figures:
        # - varchar length returned by cursor.description is an internal length,
        #   not visible length (#5725)
        # - precision and scale (for decimal fields) (#5014)
        # - auto_increment is not available in cursor.description
        # Columns whose collation equals the table default report a NULL
        # collation so only explicit per-column collations are surfaced.
        cursor.execute("""
            SELECT
                column_name, data_type, character_maximum_length,
                numeric_precision, numeric_scale, extra, column_default,
                CASE
                    WHEN collation_name = %s THEN NULL
                    ELSE collation_name
                END AS collation_name,
                CASE
                    WHEN column_type LIKE '%% unsigned' THEN 1
                    ELSE 0
                END AS is_unsigned
            FROM information_schema.columns
            WHERE table_name = %s AND table_schema = DATABASE()
        """, [default_column_collation, table_name])
        field_info = {line[0]: InfoLine(*line) for line in cursor.fetchall()}
        # Run a minimal SELECT purely to populate cursor.description with
        # the driver-level column metadata.
        cursor.execute("SELECT * FROM %s LIMIT 1" % self.connection.ops.quote_name(table_name))

        def to_int(i):
            # information_schema values may come back as non-int numerics;
            # preserve None (e.g. no max length for numeric columns).
            return int(i) if i is not None else i

        fields = []
        for line in cursor.description:
            info = field_info[line[0]]
            # Prefer the information_schema figures, falling back to the
            # driver-reported cursor.description values when absent.
            fields.append(FieldInfo(
                *line[:3],
                to_int(info.max_len) or line[3],
                to_int(info.num_prec) or line[4],
                to_int(info.num_scale) or line[5],
                line[6],
                info.column_default,
                info.collation,
                info.extra,
                info.is_unsigned,
                line[0] in json_constraints,
            ))
        return fields
  132. def get_sequences(self, cursor, table_name, table_fields=()):
  133. for field_info in self.get_table_description(cursor, table_name):
  134. if 'auto_increment' in field_info.extra:
  135. # MySQL allows only one auto-increment column per table.
  136. return [{'table': table_name, 'column': field_info.name}]
  137. return []
  138. def get_relations(self, cursor, table_name):
  139. """
  140. Return a dictionary of {field_name: (field_name_other_table, other_table)}
  141. representing all relationships to the given table.
  142. """
  143. constraints = self.get_key_columns(cursor, table_name)
  144. relations = {}
  145. for my_fieldname, other_table, other_field in constraints:
  146. relations[my_fieldname] = (other_field, other_table)
  147. return relations
  148. def get_key_columns(self, cursor, table_name):
  149. """
  150. Return a list of (column_name, referenced_table_name, referenced_column_name)
  151. for all key columns in the given table.
  152. """
  153. key_columns = []
  154. cursor.execute("""
  155. SELECT column_name, referenced_table_name, referenced_column_name
  156. FROM information_schema.key_column_usage
  157. WHERE table_name = %s
  158. AND table_schema = DATABASE()
  159. AND referenced_table_name IS NOT NULL
  160. AND referenced_column_name IS NOT NULL""", [table_name])
  161. key_columns.extend(cursor.fetchall())
  162. return key_columns
  163. def get_storage_engine(self, cursor, table_name):
  164. """
  165. Retrieve the storage engine for a given table. Return the default
  166. storage engine if the table doesn't exist.
  167. """
  168. cursor.execute(
  169. "SELECT engine "
  170. "FROM information_schema.tables "
  171. "WHERE table_name = %s", [table_name])
  172. result = cursor.fetchone()
  173. if not result:
  174. return self.connection.features._mysql_storage_engine
  175. return result[0]
  176. def _parse_constraint_columns(self, check_clause, columns):
  177. check_columns = OrderedSet()
  178. statement = sqlparse.parse(check_clause)[0]
  179. tokens = (token for token in statement.flatten() if not token.is_whitespace)
  180. for token in tokens:
  181. if (
  182. token.ttype == sqlparse.tokens.Name and
  183. self.connection.ops.quote_name(token.value) == token.value and
  184. token.value[1:-1] in columns
  185. ):
  186. check_columns.add(token.value[1:-1])
  187. return check_columns
    def get_constraints(self, cursor, table_name):
        """
        Retrieve any constraints or keys (unique, pk, fk, check, index) across
        one or more columns.

        Returns a dict mapping constraint name to a dict with 'columns',
        'primary_key', 'unique', 'index', 'check', 'foreign_key' (and
        'orders'/'type' where applicable).
        """
        constraints = {}
        # Get the actual constraint names and columns
        name_query = """
            SELECT kc.`constraint_name`, kc.`column_name`,
                kc.`referenced_table_name`, kc.`referenced_column_name`
            FROM information_schema.key_column_usage AS kc
            WHERE
                kc.table_schema = DATABASE() AND
                kc.table_name = %s
            ORDER BY kc.`ordinal_position`
        """
        cursor.execute(name_query, [table_name])
        for constraint, column, ref_table, ref_column in cursor.fetchall():
            if constraint not in constraints:
                constraints[constraint] = {
                    'columns': OrderedSet(),
                    'primary_key': False,
                    'unique': False,
                    'index': False,
                    'check': False,
                    'foreign_key': (ref_table, ref_column) if ref_column else None,
                }
                if self.connection.features.supports_index_column_ordering:
                    constraints[constraint]['orders'] = []
            # Columns accumulate in ordinal_position order (see ORDER BY).
            constraints[constraint]['columns'].add(column)
        # Now get the constraint types
        type_query = """
            SELECT c.constraint_name, c.constraint_type
            FROM information_schema.table_constraints AS c
            WHERE
                c.table_schema = DATABASE() AND
                c.table_name = %s
        """
        cursor.execute(type_query, [table_name])
        for constraint, kind in cursor.fetchall():
            # A primary key is implicitly unique as well.
            if kind.lower() == "primary key":
                constraints[constraint]['primary_key'] = True
                constraints[constraint]['unique'] = True
            elif kind.lower() == "unique":
                constraints[constraint]['unique'] = True
        # Add check constraints.
        if self.connection.features.can_introspect_check_constraints:
            unnamed_constraints_index = 0
            columns = {info.name for info in self.get_table_description(cursor, table_name)}
            if self.connection.mysql_is_mariadb:
                type_query = """
                    SELECT c.constraint_name, c.check_clause
                    FROM information_schema.check_constraints AS c
                    WHERE
                        c.constraint_schema = DATABASE() AND
                        c.table_name = %s
                """
            else:
                # MySQL's check_constraints table has no table_name column,
                # so join through table_constraints to scope by table.
                type_query = """
                    SELECT cc.constraint_name, cc.check_clause
                    FROM
                        information_schema.check_constraints AS cc,
                        information_schema.table_constraints AS tc
                    WHERE
                        cc.constraint_schema = DATABASE() AND
                        tc.table_schema = cc.constraint_schema AND
                        cc.constraint_name = tc.constraint_name AND
                        tc.constraint_type = 'CHECK' AND
                        tc.table_name = %s
                """
            cursor.execute(type_query, [table_name])
            for constraint, check_clause in cursor.fetchall():
                constraint_columns = self._parse_constraint_columns(check_clause, columns)
                # Ensure uniqueness of unnamed constraints. Unnamed unique
                # and check columns constraints have the same name as
                # a column.
                if set(constraint_columns) == {constraint}:
                    unnamed_constraints_index += 1
                    constraint = '__unnamed_constraint_%s__' % unnamed_constraints_index
                constraints[constraint] = {
                    'columns': constraint_columns,
                    'primary_key': False,
                    'unique': False,
                    'index': False,
                    'check': True,
                    'foreign_key': None,
                }
        # Now add in the indexes
        cursor.execute("SHOW INDEX FROM %s" % self.connection.ops.quote_name(table_name))
        # SHOW INDEX columns used: 0-5 = table, non_unique, key_name,
        # seq_in_index, column_name, collation(order); 10 = index_type.
        for table, non_unique, index, colseq, column, order, type_ in [
            x[:6] + (x[10],) for x in cursor.fetchall()
        ]:
            if index not in constraints:
                constraints[index] = {
                    'columns': OrderedSet(),
                    'primary_key': False,
                    'unique': False,
                    'check': False,
                    'foreign_key': None,
                }
                if self.connection.features.supports_index_column_ordering:
                    constraints[index]['orders'] = []
            constraints[index]['index'] = True
            # BTREE indexes use Django's default index suffix as the type;
            # other index types (e.g. hash/fulltext) keep their own name.
            constraints[index]['type'] = Index.suffix if type_ == 'BTREE' else type_.lower()
            constraints[index]['columns'].add(column)
            if self.connection.features.supports_index_column_ordering:
                constraints[index]['orders'].append('DESC' if order == 'D' else 'ASC')
        # Convert the sorted sets to lists
        for constraint in constraints.values():
            constraint['columns'] = list(constraint['columns'])
        return constraints