main.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529
  1. from datetime import datetime, timedelta
  2. from django import forms
  3. from django.conf import settings
  4. from django.contrib import messages
  5. from django.contrib.admin import FieldListFilter
  6. from django.contrib.admin.exceptions import (
  7. DisallowedModelAdminLookup, DisallowedModelAdminToField,
  8. )
  9. from django.contrib.admin.options import (
  10. IS_POPUP_VAR, TO_FIELD_VAR, IncorrectLookupParameters,
  11. )
  12. from django.contrib.admin.utils import (
  13. get_fields_from_path, lookup_needs_distinct, prepare_lookup_value, quote,
  14. )
  15. from django.core.exceptions import (
  16. FieldDoesNotExist, ImproperlyConfigured, SuspiciousOperation,
  17. )
  18. from django.core.paginator import InvalidPage
  19. from django.db.models import Exists, F, Field, ManyToOneRel, OrderBy, OuterRef
  20. from django.db.models.expressions import Combinable
  21. from django.urls import reverse
  22. from django.utils.http import urlencode
  23. from django.utils.timezone import make_aware
  24. from django.utils.translation import gettext
# Changelist settings
# Names of the query-string parameters the changelist reads from request.GET.
ALL_VAR = 'all'          # presence switches ChangeList.show_all on
ORDER_VAR = 'o'          # dot-separated column indexes, '-' prefix for descending
ORDER_TYPE_VAR = 'ot'    # not read in this module; still listed in IGNORED_PARAMS
PAGE_VAR = 'p'           # requested page number
SEARCH_VAR = 'q'         # search term validated by ChangeListSearchForm
ERROR_FLAG = 'e'         # dropped from ChangeList.params in ChangeList.__init__
# Parameters that ChangeList.get_filters_params() strips before the remaining
# query-string items are treated as filter lookups. PAGE_VAR and ERROR_FLAG
# are not listed here because ChangeList.__init__ already removes them from
# ChangeList.params.
IGNORED_PARAMS = (
    ALL_VAR, ORDER_VAR, ORDER_TYPE_VAR, SEARCH_VAR, IS_POPUP_VAR, TO_FIELD_VAR)
  34. class ChangeListSearchForm(forms.Form):
  35. def __init__(self, *args, **kwargs):
  36. super().__init__(*args, **kwargs)
  37. # Populate "fields" dynamically because SEARCH_VAR is a variable:
  38. self.fields = {
  39. SEARCH_VAR: forms.CharField(required=False, strip=False),
  40. }
  41. class ChangeList:
  42. search_form_class = ChangeListSearchForm
  43. def __init__(self, request, model, list_display, list_display_links,
  44. list_filter, date_hierarchy, search_fields, list_select_related,
  45. list_per_page, list_max_show_all, list_editable, model_admin, sortable_by):
  46. self.model = model
  47. self.opts = model._meta
  48. self.lookup_opts = self.opts
  49. self.root_queryset = model_admin.get_queryset(request)
  50. self.list_display = list_display
  51. self.list_display_links = list_display_links
  52. self.list_filter = list_filter
  53. self.has_filters = None
  54. self.has_active_filters = None
  55. self.clear_all_filters_qs = None
  56. self.date_hierarchy = date_hierarchy
  57. self.search_fields = search_fields
  58. self.list_select_related = list_select_related
  59. self.list_per_page = list_per_page
  60. self.list_max_show_all = list_max_show_all
  61. self.model_admin = model_admin
  62. self.preserved_filters = model_admin.get_preserved_filters(request)
  63. self.sortable_by = sortable_by
  64. # Get search parameters from the query string.
  65. _search_form = self.search_form_class(request.GET)
  66. if not _search_form.is_valid():
  67. for error in _search_form.errors.values():
  68. messages.error(request, ', '.join(error))
  69. self.query = _search_form.cleaned_data.get(SEARCH_VAR) or ''
  70. try:
  71. self.page_num = int(request.GET.get(PAGE_VAR, 1))
  72. except ValueError:
  73. self.page_num = 1
  74. self.show_all = ALL_VAR in request.GET
  75. self.is_popup = IS_POPUP_VAR in request.GET
  76. to_field = request.GET.get(TO_FIELD_VAR)
  77. if to_field and not model_admin.to_field_allowed(request, to_field):
  78. raise DisallowedModelAdminToField("The field %s cannot be referenced." % to_field)
  79. self.to_field = to_field
  80. self.params = dict(request.GET.items())
  81. if PAGE_VAR in self.params:
  82. del self.params[PAGE_VAR]
  83. if ERROR_FLAG in self.params:
  84. del self.params[ERROR_FLAG]
  85. if self.is_popup:
  86. self.list_editable = ()
  87. else:
  88. self.list_editable = list_editable
  89. self.queryset = self.get_queryset(request)
  90. self.get_results(request)
  91. if self.is_popup:
  92. title = gettext('Select %s')
  93. elif self.model_admin.has_change_permission(request):
  94. title = gettext('Select %s to change')
  95. else:
  96. title = gettext('Select %s to view')
  97. self.title = title % self.opts.verbose_name
  98. self.pk_attname = self.lookup_opts.pk.attname
  99. def get_filters_params(self, params=None):
  100. """
  101. Return all params except IGNORED_PARAMS.
  102. """
  103. params = params or self.params
  104. lookup_params = params.copy() # a dictionary of the query string
  105. # Remove all the parameters that are globally and systematically
  106. # ignored.
  107. for ignored in IGNORED_PARAMS:
  108. if ignored in lookup_params:
  109. del lookup_params[ignored]
  110. return lookup_params
  111. def get_filters(self, request):
  112. lookup_params = self.get_filters_params()
  113. may_have_duplicates = False
  114. has_active_filters = False
  115. for key, value in lookup_params.items():
  116. if not self.model_admin.lookup_allowed(key, value):
  117. raise DisallowedModelAdminLookup("Filtering by %s not allowed" % key)
  118. filter_specs = []
  119. for list_filter in self.list_filter:
  120. lookup_params_count = len(lookup_params)
  121. if callable(list_filter):
  122. # This is simply a custom list filter class.
  123. spec = list_filter(request, lookup_params, self.model, self.model_admin)
  124. else:
  125. field_path = None
  126. if isinstance(list_filter, (tuple, list)):
  127. # This is a custom FieldListFilter class for a given field.
  128. field, field_list_filter_class = list_filter
  129. else:
  130. # This is simply a field name, so use the default
  131. # FieldListFilter class that has been registered for the
  132. # type of the given field.
  133. field, field_list_filter_class = list_filter, FieldListFilter.create
  134. if not isinstance(field, Field):
  135. field_path = field
  136. field = get_fields_from_path(self.model, field_path)[-1]
  137. spec = field_list_filter_class(
  138. field, request, lookup_params,
  139. self.model, self.model_admin, field_path=field_path,
  140. )
  141. # field_list_filter_class removes any lookup_params it
  142. # processes. If that happened, check if distinct() is needed to
  143. # remove duplicate results.
  144. if lookup_params_count > len(lookup_params):
  145. may_have_duplicates |= lookup_needs_distinct(self.lookup_opts, field_path)
  146. if spec and spec.has_output():
  147. filter_specs.append(spec)
  148. if lookup_params_count > len(lookup_params):
  149. has_active_filters = True
  150. if self.date_hierarchy:
  151. # Create bounded lookup parameters so that the query is more
  152. # efficient.
  153. year = lookup_params.pop('%s__year' % self.date_hierarchy, None)
  154. if year is not None:
  155. month = lookup_params.pop('%s__month' % self.date_hierarchy, None)
  156. day = lookup_params.pop('%s__day' % self.date_hierarchy, None)
  157. try:
  158. from_date = datetime(
  159. int(year),
  160. int(month if month is not None else 1),
  161. int(day if day is not None else 1),
  162. )
  163. except ValueError as e:
  164. raise IncorrectLookupParameters(e) from e
  165. if day:
  166. to_date = from_date + timedelta(days=1)
  167. elif month:
  168. # In this branch, from_date will always be the first of a
  169. # month, so advancing 32 days gives the next month.
  170. to_date = (from_date + timedelta(days=32)).replace(day=1)
  171. else:
  172. to_date = from_date.replace(year=from_date.year + 1)
  173. if settings.USE_TZ:
  174. from_date = make_aware(from_date)
  175. to_date = make_aware(to_date)
  176. lookup_params.update({
  177. '%s__gte' % self.date_hierarchy: from_date,
  178. '%s__lt' % self.date_hierarchy: to_date,
  179. })
  180. # At this point, all the parameters used by the various ListFilters
  181. # have been removed from lookup_params, which now only contains other
  182. # parameters passed via the query string. We now loop through the
  183. # remaining parameters both to ensure that all the parameters are valid
  184. # fields and to determine if at least one of them needs distinct(). If
  185. # the lookup parameters aren't real fields, then bail out.
  186. try:
  187. for key, value in lookup_params.items():
  188. lookup_params[key] = prepare_lookup_value(key, value)
  189. may_have_duplicates |= lookup_needs_distinct(self.lookup_opts, key)
  190. return (
  191. filter_specs, bool(filter_specs), lookup_params, may_have_duplicates,
  192. has_active_filters,
  193. )
  194. except FieldDoesNotExist as e:
  195. raise IncorrectLookupParameters(e) from e
  196. def get_query_string(self, new_params=None, remove=None):
  197. if new_params is None:
  198. new_params = {}
  199. if remove is None:
  200. remove = []
  201. p = self.params.copy()
  202. for r in remove:
  203. for k in list(p):
  204. if k.startswith(r):
  205. del p[k]
  206. for k, v in new_params.items():
  207. if v is None:
  208. if k in p:
  209. del p[k]
  210. else:
  211. p[k] = v
  212. return '?%s' % urlencode(sorted(p.items()))
  213. def get_results(self, request):
  214. paginator = self.model_admin.get_paginator(request, self.queryset, self.list_per_page)
  215. # Get the number of objects, with admin filters applied.
  216. result_count = paginator.count
  217. # Get the total number of objects, with no admin filters applied.
  218. if self.model_admin.show_full_result_count:
  219. full_result_count = self.root_queryset.count()
  220. else:
  221. full_result_count = None
  222. can_show_all = result_count <= self.list_max_show_all
  223. multi_page = result_count > self.list_per_page
  224. # Get the list of objects to display on this page.
  225. if (self.show_all and can_show_all) or not multi_page:
  226. result_list = self.queryset._clone()
  227. else:
  228. try:
  229. result_list = paginator.page(self.page_num).object_list
  230. except InvalidPage:
  231. raise IncorrectLookupParameters
  232. self.result_count = result_count
  233. self.show_full_result_count = self.model_admin.show_full_result_count
  234. # Admin actions are shown if there is at least one entry
  235. # or if entries are not counted because show_full_result_count is disabled
  236. self.show_admin_actions = not self.show_full_result_count or bool(full_result_count)
  237. self.full_result_count = full_result_count
  238. self.result_list = result_list
  239. self.can_show_all = can_show_all
  240. self.multi_page = multi_page
  241. self.paginator = paginator
  242. def _get_default_ordering(self):
  243. ordering = []
  244. if self.model_admin.ordering:
  245. ordering = self.model_admin.ordering
  246. elif self.lookup_opts.ordering:
  247. ordering = self.lookup_opts.ordering
  248. return ordering
  249. def get_ordering_field(self, field_name):
  250. """
  251. Return the proper model field name corresponding to the given
  252. field_name to use for ordering. field_name may either be the name of a
  253. proper model field or the name of a method (on the admin or model) or a
  254. callable with the 'admin_order_field' attribute. Return None if no
  255. proper model field name can be matched.
  256. """
  257. try:
  258. field = self.lookup_opts.get_field(field_name)
  259. return field.name
  260. except FieldDoesNotExist:
  261. # See whether field_name is a name of a non-field
  262. # that allows sorting.
  263. if callable(field_name):
  264. attr = field_name
  265. elif hasattr(self.model_admin, field_name):
  266. attr = getattr(self.model_admin, field_name)
  267. else:
  268. attr = getattr(self.model, field_name)
  269. if isinstance(attr, property) and hasattr(attr, 'fget'):
  270. attr = attr.fget
  271. return getattr(attr, 'admin_order_field', None)
  272. def get_ordering(self, request, queryset):
  273. """
  274. Return the list of ordering fields for the change list.
  275. First check the get_ordering() method in model admin, then check
  276. the object's default ordering. Then, any manually-specified ordering
  277. from the query string overrides anything. Finally, a deterministic
  278. order is guaranteed by calling _get_deterministic_ordering() with the
  279. constructed ordering.
  280. """
  281. params = self.params
  282. ordering = list(self.model_admin.get_ordering(request) or self._get_default_ordering())
  283. if ORDER_VAR in params:
  284. # Clear ordering and used params
  285. ordering = []
  286. order_params = params[ORDER_VAR].split('.')
  287. for p in order_params:
  288. try:
  289. none, pfx, idx = p.rpartition('-')
  290. field_name = self.list_display[int(idx)]
  291. order_field = self.get_ordering_field(field_name)
  292. if not order_field:
  293. continue # No 'admin_order_field', skip it
  294. if isinstance(order_field, OrderBy):
  295. if pfx == '-':
  296. order_field = order_field.copy()
  297. order_field.reverse_ordering()
  298. ordering.append(order_field)
  299. elif hasattr(order_field, 'resolve_expression'):
  300. # order_field is an expression.
  301. ordering.append(order_field.desc() if pfx == '-' else order_field.asc())
  302. # reverse order if order_field has already "-" as prefix
  303. elif order_field.startswith('-') and pfx == '-':
  304. ordering.append(order_field[1:])
  305. else:
  306. ordering.append(pfx + order_field)
  307. except (IndexError, ValueError):
  308. continue # Invalid ordering specified, skip it.
  309. # Add the given query's ordering fields, if any.
  310. ordering.extend(queryset.query.order_by)
  311. return self._get_deterministic_ordering(ordering)
    def _get_deterministic_ordering(self, ordering):
        """
        Ensure a deterministic order across all database backends. Search for a
        single field or unique together set of fields providing a total
        ordering. If these are missing, augment the ordering with a descending
        primary key.

        ``ordering`` may contain field-name strings (optionally prefixed with
        "-"), F() expressions and OrderBy() expressions. A new list is
        returned; the argument itself is not modified.
        """
        ordering = list(ordering)
        # attnames seen in the ordering that don't provide a total ordering
        # on their own; they are checked against composite constraints below.
        ordering_fields = set()
        # 'pk' plus every unique, non-nullable field: ordering by any one of
        # these is already total.
        total_ordering_fields = {'pk'} | {
            field.attname for field in self.lookup_opts.fields
            if field.unique and not field.null
        }
        for part in ordering:
            # Search for single field providing a total ordering.
            field_name = None
            if isinstance(part, str):
                field_name = part.lstrip('-')
            elif isinstance(part, F):
                field_name = part.name
            elif isinstance(part, OrderBy) and isinstance(part.expression, F):
                field_name = part.expression.name
            if field_name:
                # Normalize attname references by using get_field().
                try:
                    field = self.lookup_opts.get_field(field_name)
                except FieldDoesNotExist:
                    # Could be "?" for random ordering or a related field
                    # lookup. Skip this part of introspection for now.
                    continue
                # Ordering by a related field name orders by the referenced
                # model's ordering. Skip this part of introspection for now.
                if field.remote_field and field_name == field.name:
                    continue
                # A single totally-ordering field was found: leaving the loop
                # with break skips the for/else fallback below.
                if field.attname in total_ordering_fields:
                    break
                ordering_fields.add(field.attname)
        else:
            # No single total ordering field, try unique_together and total
            # unique constraints.
            constraint_field_names = (
                *self.lookup_opts.unique_together,
                *(
                    constraint.fields
                    for constraint in self.lookup_opts.total_unique_constraints
                ),
            )
            for field_names in constraint_field_names:
                # Normalize attname references by using get_field().
                fields = [self.lookup_opts.get_field(field_name) for field_name in field_names]
                # Composite unique constraints containing a nullable column
                # cannot ensure total ordering.
                if any(field.null for field in fields):
                    continue
                # Every field of this constraint is part of the ordering, so
                # the ordering is total.
                if ordering_fields.issuperset(field.attname for field in fields):
                    break
            else:
                # If no set of unique fields is present in the ordering, rely
                # on the primary key to provide total ordering.
                ordering.append('-pk')
        return ordering
  373. def get_ordering_field_columns(self):
  374. """
  375. Return a dictionary of ordering field column numbers and asc/desc.
  376. """
  377. # We must cope with more than one column having the same underlying sort
  378. # field, so we base things on column numbers.
  379. ordering = self._get_default_ordering()
  380. ordering_fields = {}
  381. if ORDER_VAR not in self.params:
  382. # for ordering specified on ModelAdmin or model Meta, we don't know
  383. # the right column numbers absolutely, because there might be more
  384. # than one column associated with that ordering, so we guess.
  385. for field in ordering:
  386. if isinstance(field, (Combinable, OrderBy)):
  387. if not isinstance(field, OrderBy):
  388. field = field.asc()
  389. if isinstance(field.expression, F):
  390. order_type = 'desc' if field.descending else 'asc'
  391. field = field.expression.name
  392. else:
  393. continue
  394. elif field.startswith('-'):
  395. field = field[1:]
  396. order_type = 'desc'
  397. else:
  398. order_type = 'asc'
  399. for index, attr in enumerate(self.list_display):
  400. if self.get_ordering_field(attr) == field:
  401. ordering_fields[index] = order_type
  402. break
  403. else:
  404. for p in self.params[ORDER_VAR].split('.'):
  405. none, pfx, idx = p.rpartition('-')
  406. try:
  407. idx = int(idx)
  408. except ValueError:
  409. continue # skip it
  410. ordering_fields[idx] = 'desc' if pfx == '-' else 'asc'
  411. return ordering_fields
  412. def get_queryset(self, request):
  413. # First, we collect all the declared list filters.
  414. (
  415. self.filter_specs,
  416. self.has_filters,
  417. remaining_lookup_params,
  418. filters_may_have_duplicates,
  419. self.has_active_filters,
  420. ) = self.get_filters(request)
  421. # Then, we let every list filter modify the queryset to its liking.
  422. qs = self.root_queryset
  423. for filter_spec in self.filter_specs:
  424. new_qs = filter_spec.queryset(request, qs)
  425. if new_qs is not None:
  426. qs = new_qs
  427. try:
  428. # Finally, we apply the remaining lookup parameters from the query
  429. # string (i.e. those that haven't already been processed by the
  430. # filters).
  431. qs = qs.filter(**remaining_lookup_params)
  432. except (SuspiciousOperation, ImproperlyConfigured):
  433. # Allow certain types of errors to be re-raised as-is so that the
  434. # caller can treat them in a special way.
  435. raise
  436. except Exception as e:
  437. # Every other error is caught with a naked except, because we don't
  438. # have any other way of validating lookup parameters. They might be
  439. # invalid if the keyword arguments are incorrect, or if the values
  440. # are not in the correct type, so we might get FieldError,
  441. # ValueError, ValidationError, or ?.
  442. raise IncorrectLookupParameters(e)
  443. # Apply search results
  444. qs, search_may_have_duplicates = self.model_admin.get_search_results(
  445. request, qs, self.query,
  446. )
  447. # Set query string for clearing all filters.
  448. self.clear_all_filters_qs = self.get_query_string(
  449. new_params=remaining_lookup_params,
  450. remove=self.get_filters_params(),
  451. )
  452. # Remove duplicates from results, if necessary
  453. if filters_may_have_duplicates | search_may_have_duplicates:
  454. qs = qs.filter(pk=OuterRef('pk'))
  455. qs = self.root_queryset.filter(Exists(qs))
  456. # Set ordering.
  457. ordering = self.get_ordering(request, qs)
  458. qs = qs.order_by(*ordering)
  459. if not qs.query.select_related:
  460. qs = self.apply_select_related(qs)
  461. return qs
  462. def apply_select_related(self, qs):
  463. if self.list_select_related is True:
  464. return qs.select_related()
  465. if self.list_select_related is False:
  466. if self.has_related_field_in_list_display():
  467. return qs.select_related()
  468. if self.list_select_related:
  469. return qs.select_related(*self.list_select_related)
  470. return qs
  471. def has_related_field_in_list_display(self):
  472. for field_name in self.list_display:
  473. try:
  474. field = self.lookup_opts.get_field(field_name)
  475. except FieldDoesNotExist:
  476. pass
  477. else:
  478. if isinstance(field.remote_field, ManyToOneRel):
  479. # <FK>_id field names don't require a join.
  480. if field_name != field.get_attname():
  481. return True
  482. return False
  483. def url_for_result(self, result):
  484. pk = getattr(result, self.pk_attname)
  485. return reverse('admin:%s_%s_change' % (self.opts.app_label,
  486. self.opts.model_name),
  487. args=(quote(pk),),
  488. current_app=self.model_admin.admin_site.name)