introspection.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470
  1. import re
  2. from collections import namedtuple
  3. import sqlparse
  4. from django.db.backends.base.introspection import (
  5. BaseDatabaseIntrospection, FieldInfo as BaseFieldInfo, TableInfo,
  6. )
  7. from django.db.models import Index
  8. from django.utils.regex_helper import _lazy_re_compile
# Extend the base FieldInfo tuple with two extra trailing fields, 'pk' and
# 'has_json_constraint'.
FieldInfo = namedtuple('FieldInfo', BaseFieldInfo._fields + ('pk', 'has_json_constraint'))
# Matches a type name that consists only of "char(N)" or "varchar(N)"
# (surrounding/inner whitespace allowed) and captures the digits N.
field_size_re = _lazy_re_compile(r'^\s*(?:var)?char\s*\(\s*(\d+)\s*\)\s*$')
  11. def get_field_size(name):
  12. """ Extract the size number from a "varchar(11)" type name """
  13. m = field_size_re.search(name)
  14. return int(m[1]) if m else None
  15. # This light wrapper "fakes" a dictionary interface, because some SQLite data
  16. # types include variables in them -- e.g. "varchar(30)" -- and can't be matched
  17. # as a simple dictionary lookup.
  18. class FlexibleFieldLookupDict:
  19. # Maps SQL types to Django Field types. Some of the SQL types have multiple
  20. # entries here because SQLite allows for anything and doesn't normalize the
  21. # field type; it uses whatever was given.
  22. base_data_types_reverse = {
  23. 'bool': 'BooleanField',
  24. 'boolean': 'BooleanField',
  25. 'smallint': 'SmallIntegerField',
  26. 'smallint unsigned': 'PositiveSmallIntegerField',
  27. 'smallinteger': 'SmallIntegerField',
  28. 'int': 'IntegerField',
  29. 'integer': 'IntegerField',
  30. 'bigint': 'BigIntegerField',
  31. 'integer unsigned': 'PositiveIntegerField',
  32. 'bigint unsigned': 'PositiveBigIntegerField',
  33. 'decimal': 'DecimalField',
  34. 'real': 'FloatField',
  35. 'text': 'TextField',
  36. 'char': 'CharField',
  37. 'varchar': 'CharField',
  38. 'blob': 'BinaryField',
  39. 'date': 'DateField',
  40. 'datetime': 'DateTimeField',
  41. 'time': 'TimeField',
  42. }
  43. def __getitem__(self, key):
  44. key = key.lower().split('(', 1)[0].strip()
  45. return self.base_data_types_reverse[key]
class DatabaseIntrospection(BaseDatabaseIntrospection):
    """Introspection helpers that read schema details from SQLite.

    The methods query ``sqlite_master`` and SQLite ``PRAGMA`` statements, and
    parse the stored ``CREATE`` statements where needed.
    """
    # Dict-like object mapping SQL type names to Django field class names.
    data_types_reverse = FlexibleFieldLookupDict()
  48. def get_field_type(self, data_type, description):
  49. field_type = super().get_field_type(data_type, description)
  50. if description.pk and field_type in {'BigIntegerField', 'IntegerField', 'SmallIntegerField'}:
  51. # No support for BigAutoField or SmallAutoField as SQLite treats
  52. # all integer primary keys as signed 64-bit integers.
  53. return 'AutoField'
  54. if description.has_json_constraint:
  55. return 'JSONField'
  56. return field_type
  57. def get_table_list(self, cursor):
  58. """Return a list of table and view names in the current database."""
  59. # Skip the sqlite_sequence system table used for autoincrement key
  60. # generation.
  61. cursor.execute("""
  62. SELECT name, type FROM sqlite_master
  63. WHERE type in ('table', 'view') AND NOT name='sqlite_sequence'
  64. ORDER BY name""")
  65. return [TableInfo(row[0], row[1][0]) for row in cursor.fetchall()]
  66. def get_table_description(self, cursor, table_name):
  67. """
  68. Return a description of the table with the DB-API cursor.description
  69. interface.
  70. """
  71. cursor.execute('PRAGMA table_info(%s)' % self.connection.ops.quote_name(table_name))
  72. table_info = cursor.fetchall()
  73. collations = self._get_column_collations(cursor, table_name)
  74. json_columns = set()
  75. if self.connection.features.can_introspect_json_field:
  76. for line in table_info:
  77. column = line[1]
  78. json_constraint_sql = '%%json_valid("%s")%%' % column
  79. has_json_constraint = cursor.execute("""
  80. SELECT sql
  81. FROM sqlite_master
  82. WHERE
  83. type = 'table' AND
  84. name = %s AND
  85. sql LIKE %s
  86. """, [table_name, json_constraint_sql]).fetchone()
  87. if has_json_constraint:
  88. json_columns.add(column)
  89. return [
  90. FieldInfo(
  91. name, data_type, None, get_field_size(data_type), None, None,
  92. not notnull, default, collations.get(name), pk == 1, name in json_columns
  93. )
  94. for cid, name, data_type, notnull, default, pk in table_info
  95. ]
  96. def get_sequences(self, cursor, table_name, table_fields=()):
  97. pk_col = self.get_primary_key_column(cursor, table_name)
  98. return [{'table': table_name, 'column': pk_col}]
  99. def get_relations(self, cursor, table_name):
  100. """
  101. Return a dictionary of {field_name: (field_name_other_table, other_table)}
  102. representing all relationships to the given table.
  103. """
  104. # Dictionary of relations to return
  105. relations = {}
  106. # Schema for this table
  107. cursor.execute(
  108. "SELECT sql, type FROM sqlite_master "
  109. "WHERE tbl_name = %s AND type IN ('table', 'view')",
  110. [table_name]
  111. )
  112. create_sql, table_type = cursor.fetchone()
  113. if table_type == 'view':
  114. # It might be a view, then no results will be returned
  115. return relations
  116. results = create_sql[create_sql.index('(') + 1:create_sql.rindex(')')]
  117. # Walk through and look for references to other tables. SQLite doesn't
  118. # really have enforced references, but since it echoes out the SQL used
  119. # to create the table we can look for REFERENCES statements used there.
  120. for field_desc in results.split(','):
  121. field_desc = field_desc.strip()
  122. if field_desc.startswith("UNIQUE"):
  123. continue
  124. m = re.search(r'references (\S*) ?\(["|]?(.*)["|]?\)', field_desc, re.I)
  125. if not m:
  126. continue
  127. table, column = [s.strip('"') for s in m.groups()]
  128. if field_desc.startswith("FOREIGN KEY"):
  129. # Find name of the target FK field
  130. m = re.match(r'FOREIGN KEY\s*\(([^\)]*)\).*', field_desc, re.I)
  131. field_name = m[1].strip('"')
  132. else:
  133. field_name = field_desc.split()[0].strip('"')
  134. cursor.execute("SELECT sql FROM sqlite_master WHERE tbl_name = %s", [table])
  135. result = cursor.fetchall()[0]
  136. other_table_results = result[0].strip()
  137. li, ri = other_table_results.index('('), other_table_results.rindex(')')
  138. other_table_results = other_table_results[li + 1:ri]
  139. for other_desc in other_table_results.split(','):
  140. other_desc = other_desc.strip()
  141. if other_desc.startswith('UNIQUE'):
  142. continue
  143. other_name = other_desc.split(' ', 1)[0].strip('"')
  144. if other_name == column:
  145. relations[field_name] = (other_name, table)
  146. break
  147. return relations
  148. def get_key_columns(self, cursor, table_name):
  149. """
  150. Return a list of (column_name, referenced_table_name, referenced_column_name)
  151. for all key columns in given table.
  152. """
  153. key_columns = []
  154. # Schema for this table
  155. cursor.execute("SELECT sql FROM sqlite_master WHERE tbl_name = %s AND type = %s", [table_name, "table"])
  156. results = cursor.fetchone()[0].strip()
  157. results = results[results.index('(') + 1:results.rindex(')')]
  158. # Walk through and look for references to other tables. SQLite doesn't
  159. # really have enforced references, but since it echoes out the SQL used
  160. # to create the table we can look for REFERENCES statements used there.
  161. for field_index, field_desc in enumerate(results.split(',')):
  162. field_desc = field_desc.strip()
  163. if field_desc.startswith("UNIQUE"):
  164. continue
  165. m = re.search(r'"(.*)".*references (.*) \(["|](.*)["|]\)', field_desc, re.I)
  166. if not m:
  167. continue
  168. # This will append (column_name, referenced_table_name, referenced_column_name) to key_columns
  169. key_columns.append(tuple(s.strip('"') for s in m.groups()))
  170. return key_columns
  171. def get_primary_key_column(self, cursor, table_name):
  172. """Return the column name of the primary key for the given table."""
  173. # Don't use PRAGMA because that causes issues with some transactions
  174. cursor.execute(
  175. "SELECT sql, type FROM sqlite_master "
  176. "WHERE tbl_name = %s AND type IN ('table', 'view')",
  177. [table_name]
  178. )
  179. row = cursor.fetchone()
  180. if row is None:
  181. raise ValueError("Table %s does not exist" % table_name)
  182. create_sql, table_type = row
  183. if table_type == 'view':
  184. # Views don't have a primary key.
  185. return None
  186. fields_sql = create_sql[create_sql.index('(') + 1:create_sql.rindex(')')]
  187. for field_desc in fields_sql.split(','):
  188. field_desc = field_desc.strip()
  189. m = re.match(r'(?:(?:["`\[])(.*)(?:["`\]])|(\w+)).*PRIMARY KEY.*', field_desc)
  190. if m:
  191. return m[1] if m[1] else m[2]
  192. return None
  193. def _get_foreign_key_constraints(self, cursor, table_name):
  194. constraints = {}
  195. cursor.execute('PRAGMA foreign_key_list(%s)' % self.connection.ops.quote_name(table_name))
  196. for row in cursor.fetchall():
  197. # Remaining on_update/on_delete/match values are of no interest.
  198. id_, _, table, from_, to = row[:5]
  199. constraints['fk_%d' % id_] = {
  200. 'columns': [from_],
  201. 'primary_key': False,
  202. 'unique': False,
  203. 'foreign_key': (table, to),
  204. 'check': False,
  205. 'index': False,
  206. }
  207. return constraints
    def _parse_column_or_constraint_definition(self, tokens, columns):
        """Consume one column or constraint definition from ``tokens``.

        ``tokens`` is an iterator of sqlparse tokens; it is advanced up to and
        including the token that ends the definition (a top-level "," or the
        ")" that closes the surrounding list). ``columns`` is the collection
        of column names that may be reported for a CHECK constraint.

        Return a 4-tuple ``(constraint_name, unique_constraint,
        check_constraint, token)``. ``constraint_name`` is None unless the
        definition starts with CONSTRAINT. The two constraint entries are
        dicts, or None when no columns were collected for them. ``token`` is
        the last token read, or None if ``tokens`` yielded nothing.
        """
        token = None
        # None until the first token has been inspected, then truthy/falsy
        # depending on whether that token was the CONSTRAINT keyword.
        is_constraint_definition = None
        field_name = None
        constraint_name = None
        unique = False
        unique_columns = []
        check = False
        check_columns = []
        # Parenthesis nesting depth relative to the start of this definition.
        braces_deep = 0
        for token in tokens:
            if token.match(sqlparse.tokens.Punctuation, '('):
                braces_deep += 1
            elif token.match(sqlparse.tokens.Punctuation, ')'):
                braces_deep -= 1
                if braces_deep < 0:
                    # End of columns and constraints for table definition.
                    break
            elif braces_deep == 0 and token.match(sqlparse.tokens.Punctuation, ','):
                # End of current column or constraint definition.
                break
            # Detect column or constraint definition by first token.
            if is_constraint_definition is None:
                is_constraint_definition = token.match(sqlparse.tokens.Keyword, 'CONSTRAINT')
                if is_constraint_definition:
                    continue
            if is_constraint_definition:
                # Detect constraint name by second token.
                if constraint_name is None:
                    if token.ttype in (sqlparse.tokens.Name, sqlparse.tokens.Keyword):
                        constraint_name = token.value
                    elif token.ttype == sqlparse.tokens.Literal.String.Symbol:
                        # Drop the first and last character of the token text
                        # (presumably the surrounding quotes).
                        constraint_name = token.value[1:-1]
                # Start constraint columns parsing after UNIQUE keyword.
                if token.match(sqlparse.tokens.Keyword, 'UNIQUE'):
                    unique = True
                    # Depth at which UNIQUE appeared; columns are collected
                    # only while the depth differs from this value.
                    unique_braces_deep = braces_deep
                elif unique:
                    if unique_braces_deep == braces_deep:
                        if unique_columns:
                            # Stop constraint parsing.
                            unique = False
                        continue
                    if token.ttype in (sqlparse.tokens.Name, sqlparse.tokens.Keyword):
                        unique_columns.append(token.value)
                    elif token.ttype == sqlparse.tokens.Literal.String.Symbol:
                        unique_columns.append(token.value[1:-1])
            else:
                # Detect field name by first token.
                if field_name is None:
                    if token.ttype in (sqlparse.tokens.Name, sqlparse.tokens.Keyword):
                        field_name = token.value
                    elif token.ttype == sqlparse.tokens.Literal.String.Symbol:
                        field_name = token.value[1:-1]
                # UNIQUE inside a column definition applies to that column.
                if token.match(sqlparse.tokens.Keyword, 'UNIQUE'):
                    unique_columns = [field_name]
            # Start constraint columns parsing after CHECK keyword.
            if token.match(sqlparse.tokens.Keyword, 'CHECK'):
                check = True
                check_braces_deep = braces_deep
            elif check:
                if check_braces_deep == braces_deep:
                    if check_columns:
                        # Stop constraint parsing.
                        check = False
                    continue
                # Inside the CHECK expression only names found in ``columns``
                # are recorded.
                if token.ttype in (sqlparse.tokens.Name, sqlparse.tokens.Keyword):
                    if token.value in columns:
                        check_columns.append(token.value)
                elif token.ttype == sqlparse.tokens.Literal.String.Symbol:
                    if token.value[1:-1] in columns:
                        check_columns.append(token.value[1:-1])
        unique_constraint = {
            'unique': True,
            'columns': unique_columns,
            'primary_key': False,
            'foreign_key': None,
            'check': False,
            'index': False,
        } if unique_columns else None
        check_constraint = {
            'check': True,
            'columns': check_columns,
            'primary_key': False,
            'unique': False,
            'foreign_key': None,
            'index': False,
        } if check_columns else None
        return constraint_name, unique_constraint, check_constraint, token
  297. def _parse_table_constraints(self, sql, columns):
  298. # Check constraint parsing is based of SQLite syntax diagram.
  299. # https://www.sqlite.org/syntaxdiagrams.html#table-constraint
  300. statement = sqlparse.parse(sql)[0]
  301. constraints = {}
  302. unnamed_constrains_index = 0
  303. tokens = (token for token in statement.flatten() if not token.is_whitespace)
  304. # Go to columns and constraint definition
  305. for token in tokens:
  306. if token.match(sqlparse.tokens.Punctuation, '('):
  307. break
  308. # Parse columns and constraint definition
  309. while True:
  310. constraint_name, unique, check, end_token = self._parse_column_or_constraint_definition(tokens, columns)
  311. if unique:
  312. if constraint_name:
  313. constraints[constraint_name] = unique
  314. else:
  315. unnamed_constrains_index += 1
  316. constraints['__unnamed_constraint_%s__' % unnamed_constrains_index] = unique
  317. if check:
  318. if constraint_name:
  319. constraints[constraint_name] = check
  320. else:
  321. unnamed_constrains_index += 1
  322. constraints['__unnamed_constraint_%s__' % unnamed_constrains_index] = check
  323. if end_token.match(sqlparse.tokens.Punctuation, ')'):
  324. break
  325. return constraints
  326. def get_constraints(self, cursor, table_name):
  327. """
  328. Retrieve any constraints or keys (unique, pk, fk, check, index) across
  329. one or more columns.
  330. """
  331. constraints = {}
  332. # Find inline check constraints.
  333. try:
  334. table_schema = cursor.execute(
  335. "SELECT sql FROM sqlite_master WHERE type='table' and name=%s" % (
  336. self.connection.ops.quote_name(table_name),
  337. )
  338. ).fetchone()[0]
  339. except TypeError:
  340. # table_name is a view.
  341. pass
  342. else:
  343. columns = {info.name for info in self.get_table_description(cursor, table_name)}
  344. constraints.update(self._parse_table_constraints(table_schema, columns))
  345. # Get the index info
  346. cursor.execute("PRAGMA index_list(%s)" % self.connection.ops.quote_name(table_name))
  347. for row in cursor.fetchall():
  348. # SQLite 3.8.9+ has 5 columns, however older versions only give 3
  349. # columns. Discard last 2 columns if there.
  350. number, index, unique = row[:3]
  351. cursor.execute(
  352. "SELECT sql FROM sqlite_master "
  353. "WHERE type='index' AND name=%s" % self.connection.ops.quote_name(index)
  354. )
  355. # There's at most one row.
  356. sql, = cursor.fetchone() or (None,)
  357. # Inline constraints are already detected in
  358. # _parse_table_constraints(). The reasons to avoid fetching inline
  359. # constraints from `PRAGMA index_list` are:
  360. # - Inline constraints can have a different name and information
  361. # than what `PRAGMA index_list` gives.
  362. # - Not all inline constraints may appear in `PRAGMA index_list`.
  363. if not sql:
  364. # An inline constraint
  365. continue
  366. # Get the index info for that index
  367. cursor.execute('PRAGMA index_info(%s)' % self.connection.ops.quote_name(index))
  368. for index_rank, column_rank, column in cursor.fetchall():
  369. if index not in constraints:
  370. constraints[index] = {
  371. "columns": [],
  372. "primary_key": False,
  373. "unique": bool(unique),
  374. "foreign_key": None,
  375. "check": False,
  376. "index": True,
  377. }
  378. constraints[index]['columns'].append(column)
  379. # Add type and column orders for indexes
  380. if constraints[index]['index']:
  381. # SQLite doesn't support any index type other than b-tree
  382. constraints[index]['type'] = Index.suffix
  383. orders = self._get_index_columns_orders(sql)
  384. if orders is not None:
  385. constraints[index]['orders'] = orders
  386. # Get the PK
  387. pk_column = self.get_primary_key_column(cursor, table_name)
  388. if pk_column:
  389. # SQLite doesn't actually give a name to the PK constraint,
  390. # so we invent one. This is fine, as the SQLite backend never
  391. # deletes PK constraints by name, as you can't delete constraints
  392. # in SQLite; we remake the table with a new PK instead.
  393. constraints["__primary__"] = {
  394. "columns": [pk_column],
  395. "primary_key": True,
  396. "unique": False, # It's not actually a unique constraint.
  397. "foreign_key": None,
  398. "check": False,
  399. "index": False,
  400. }
  401. constraints.update(self._get_foreign_key_constraints(cursor, table_name))
  402. return constraints
  403. def _get_index_columns_orders(self, sql):
  404. tokens = sqlparse.parse(sql)[0]
  405. for token in tokens:
  406. if isinstance(token, sqlparse.sql.Parenthesis):
  407. columns = str(token).strip('()').split(', ')
  408. return ['DESC' if info.endswith('DESC') else 'ASC' for info in columns]
  409. return None
  410. def _get_column_collations(self, cursor, table_name):
  411. row = cursor.execute("""
  412. SELECT sql
  413. FROM sqlite_master
  414. WHERE type = 'table' AND name = %s
  415. """, [table_name]).fetchone()
  416. if not row:
  417. return {}
  418. sql = row[0]
  419. columns = str(sqlparse.parse(sql)[0][-1]).strip('()').split(', ')
  420. collations = {}
  421. for column in columns:
  422. tokens = column[1:].split()
  423. column_name = tokens[0].strip('"')
  424. for index, token in enumerate(tokens):
  425. if token == 'COLLATE':
  426. collation = tokens[index + 1]
  427. break
  428. else:
  429. collation = None
  430. collations[column_name] = collation
  431. return collations