@@ -6,6 +6,11 @@
 from pulpcore.app.files import validate_file_paths
 from pulpcore.app.models import Content, ContentArtifact
 from pulpcore.app.util import batch_qs
+from pulpcore.exceptions import DuplicateContentInRepositoryError
+from collections import defaultdict
+from django_guid import get_guid
+from typing import NamedTuple
+from uuid import UUID
 
 
 _logger = logging.getLogger(__name__)
@@ -78,35 +83,59 @@ def validate_duplicate_content(version): |
     Uses repo_key_fields to determine if content is duplicated.
 
     Raises:
-        ValueError: If repo version has duplicate content.
+        DuplicateContentInRepositoryError: If repo version has duplicate content.
     """
-    error_messages = []
-
     for type_obj in version.repository.CONTENT_TYPES:
         if type_obj.repo_key_fields == ():
             continue
-
         pulp_type = type_obj.get_pulp_type()
-        repo_key_fields = type_obj.repo_key_fields
-        new_content_total = type_obj.objects.filter(
-            pk__in=version.content.filter(pulp_type=pulp_type)
-        ).count()
-        unique_new_content_total = (
-            type_obj.objects.filter(pk__in=version.content.filter(pulp_type=pulp_type))
-            .distinct(*repo_key_fields)
-            .count()
-        )
-
-        if unique_new_content_total < new_content_total:
-            error_messages.append(
-                _(
-                    "More than one {pulp_type} content with the duplicate values for {fields}."
-                ).format(pulp_type=pulp_type, fields=", ".join(repo_key_fields))
-            )
-    if error_messages:
-        raise ValueError(
-            _("Cannot create repository version. {msg}").format(msg=", ".join(error_messages))
-        )
+        unique_keys = type_obj.repo_key_fields
+        content_qs = type_obj.objects.filter(pk__in=version.content.filter(pulp_type=pulp_type))
+        type_dup_count = count_duplicates(content_qs, unique_keys)
+        dup_count += type_dup_count
+        if type_dup_count > 0:
+            # At this point the task has already failed, so we'll pay extra queries
+            # to collect the duplicates and provide more useful logs.
+            for duplicate in collect_duplicates(content_qs, unique_keys):
+                keyset_value = duplicate.keyset_value
+                duplicate_pks = duplicate.duplicate_pks
+                _logger.info(f"Duplicates found: {pulp_type=}; {keyset_value=}; {duplicate_pks=}")
+    if dup_count > 0:
+        raise DuplicateContentInRepositoryError(dup_count, correlation_id)
+
+
+class DuplicateEntry(NamedTuple):
+    keyset_value: tuple[str, ...]
+    duplicate_pks: list[UUID]
+
+
+def count_duplicates(content_qs, unique_keys: tuple[str, ...]) -> int:
+    new_content_total = content_qs.count()
+    unique_new_content_total = content_qs.distinct(*unique_keys).count()
+    return new_content_total - unique_new_content_total
+
+
+def collect_duplicates(content_qs, unique_keys: tuple[str, ...]) -> list[DuplicateEntry]:
+    last_keyset = None
+    last_pk = None
+    keyset_to_contents = defaultdict(list)
+    content_qs = content_qs.values_list(*unique_keys, "pk")
+    # Rows are ordered by the key fields, so duplicates show up as consecutive values.
+    for values in content_qs.order_by(*unique_keys).iterator():
+        keyset_value = values[:-1]
+        pk = str(values[-1])
+        if keyset_value == last_keyset:
+            dup_pk_list = keyset_to_contents[keyset_value]
+            # the previous duplicate didn't know it was a duplicate
+            if len(dup_pk_list) == 0:
+                dup_pk_list.append(last_pk)
+            dup_pk_list.append(pk)
+        last_keyset = keyset_value
+        last_pk = pk
+    duplicate_entries = []
+    for keyset_value, pk_list in keyset_to_contents.items():
+        duplicate_entries.append(DuplicateEntry(duplicate_pks=pk_list, keyset_value=keyset_value))
+    return duplicate_entries
 
 
 def validate_version_paths(version):
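The core of `collect_duplicates` is a single ordered pass: because the rows are sorted by the repo key fields, rows sharing the same key tuple arrive consecutively, so only the previous row needs to be remembered. The snippet below is a minimal, standalone sketch of that grouping idea over plain tuples, not part of the change itself; the key field values and pk strings are made up for illustration and no Django ORM is involved.

```python
from collections import defaultdict

# Stand-in for content_qs.values_list(<key fields>, "pk"), already ordered by the key fields.
# Each tuple is (key fields..., pk); the values below are purely illustrative.
rows = [
    ("a.txt", "pk-1"),
    ("a.txt", "pk-2"),  # duplicate of a.txt
    ("b.txt", "pk-3"),
    ("c.txt", "pk-4"),
    ("c.txt", "pk-5"),  # duplicate of c.txt
    ("c.txt", "pk-6"),  # another duplicate of c.txt
]


def group_duplicates(rows):
    """Single pass over key-ordered rows: equal key tuples sit next to each other."""
    keyset_to_pks = defaultdict(list)
    last_keyset, last_pk = None, None
    for *keyset, pk in rows:
        keyset = tuple(keyset)
        if keyset == last_keyset:
            pks = keyset_to_pks[keyset]
            if not pks:
                # first time this group is noticed: pull in the previous row as well
                pks.append(last_pk)
            pks.append(pk)
        last_keyset, last_pk = keyset, pk
    return dict(keyset_to_pks)


print(group_duplicates(rows))
# {('a.txt',): ['pk-1', 'pk-2'], ('c.txt',): ['pk-4', 'pk-5', 'pk-6']}
```

The count-based check agrees with this grouping: `count_duplicates` is total rows minus distinct key tuples, which for the sample data is 6 - 3 = 3, the same as the number of "extra" members across the two duplicate groups.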