# creation.py (4.3 KB)
  1. import os
  2. import shutil
  3. import sys
  4. from pathlib import Path
  5. from django.db.backends.base.creation import BaseDatabaseCreation
  6. class DatabaseCreation(BaseDatabaseCreation):
  7. @staticmethod
  8. def is_in_memory_db(database_name):
  9. return not isinstance(database_name, Path) and (
  10. database_name == ':memory:' or 'mode=memory' in database_name
  11. )
  12. def _get_test_db_name(self):
  13. test_database_name = self.connection.settings_dict['TEST']['NAME'] or ':memory:'
  14. if test_database_name == ':memory:':
  15. return 'file:memorydb_%s?mode=memory&cache=shared' % self.connection.alias
  16. return test_database_name
  17. def _create_test_db(self, verbosity, autoclobber, keepdb=False):
  18. test_database_name = self._get_test_db_name()
  19. if keepdb:
  20. return test_database_name
  21. if not self.is_in_memory_db(test_database_name):
  22. # Erase the old test database
  23. if verbosity >= 1:
  24. self.log('Destroying old test database for alias %s...' % (
  25. self._get_database_display_str(verbosity, test_database_name),
  26. ))
  27. if os.access(test_database_name, os.F_OK):
  28. if not autoclobber:
  29. confirm = input(
  30. "Type 'yes' if you would like to try deleting the test "
  31. "database '%s', or 'no' to cancel: " % test_database_name
  32. )
  33. if autoclobber or confirm == 'yes':
  34. try:
  35. os.remove(test_database_name)
  36. except Exception as e:
  37. self.log('Got an error deleting the old test database: %s' % e)
  38. sys.exit(2)
  39. else:
  40. self.log('Tests cancelled.')
  41. sys.exit(1)
  42. return test_database_name
  43. def get_test_db_clone_settings(self, suffix):
  44. orig_settings_dict = self.connection.settings_dict
  45. source_database_name = orig_settings_dict['NAME']
  46. if self.is_in_memory_db(source_database_name):
  47. return orig_settings_dict
  48. else:
  49. root, ext = os.path.splitext(orig_settings_dict['NAME'])
  50. return {**orig_settings_dict, 'NAME': '{}_{}.{}'.format(root, suffix, ext)}
  51. def _clone_test_db(self, suffix, verbosity, keepdb=False):
  52. source_database_name = self.connection.settings_dict['NAME']
  53. target_database_name = self.get_test_db_clone_settings(suffix)['NAME']
  54. # Forking automatically makes a copy of an in-memory database.
  55. if not self.is_in_memory_db(source_database_name):
  56. # Erase the old test database
  57. if os.access(target_database_name, os.F_OK):
  58. if keepdb:
  59. return
  60. if verbosity >= 1:
  61. self.log('Destroying old test database for alias %s...' % (
  62. self._get_database_display_str(verbosity, target_database_name),
  63. ))
  64. try:
  65. os.remove(target_database_name)
  66. except Exception as e:
  67. self.log('Got an error deleting the old test database: %s' % e)
  68. sys.exit(2)
  69. try:
  70. shutil.copy(source_database_name, target_database_name)
  71. except Exception as e:
  72. self.log('Got an error cloning the test database: %s' % e)
  73. sys.exit(2)
  74. def _destroy_test_db(self, test_database_name, verbosity):
  75. if test_database_name and not self.is_in_memory_db(test_database_name):
  76. # Remove the SQLite database file
  77. os.remove(test_database_name)
  78. def test_db_signature(self):
  79. """
  80. Return a tuple that uniquely identifies a test database.
  81. This takes into account the special cases of ":memory:" and "" for
  82. SQLite since the databases will be distinct despite having the same
  83. TEST NAME. See https://www.sqlite.org/inmemorydb.html
  84. """
  85. test_database_name = self._get_test_db_name()
  86. sig = [self.connection.settings_dict['NAME']]
  87. if self.is_in_memory_db(test_database_name):
  88. sig.append(self.connection.alias)
  89. else:
  90. sig.append(test_database_name)
  91. return tuple(sig)