From 624b4e3cbd3b56b9a4c2a1abdcd83cbe3ff4c576 Mon Sep 17 00:00:00 2001 From: Jitka Halova Date: Fri, 29 May 2026 18:38:09 +0200 Subject: [PATCH 1/4] Optimize re-sync content Assisted By: Claude Opus 4.6 --- pulpcore/plugin/stages/content_stages.py | 49 +++++++++++++++++-- pulpcore/plugin/stages/declarative_version.py | 2 +- 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/pulpcore/plugin/stages/content_stages.py b/pulpcore/plugin/stages/content_stages.py index 78962e2bc1f..1a436a8dd01 100644 --- a/pulpcore/plugin/stages/content_stages.py +++ b/pulpcore/plugin/stages/content_stages.py @@ -32,6 +32,27 @@ class QueryExistingContents(Stage): call to the db for efficiency. """ + def __init__(self, repo_version=None, deferred_fields=None, *args, **kwargs): + super().__init__(*args, **kwargs) + self._repo_version = repo_version + self._deferred_fields = deferred_fields or {} + self._content_cache = {} + + def _fields_for_type(self, model_type): + base = ("pk",) + model_type._sanitized_natural_key_fields() + extra = self._deferred_fields.get(model_type, ()) + return base + tuple(f for f in extra if f not in base) + + def _ensure_type_cache(self, model_type): + if model_type not in self._content_cache and self._repo_version is not None: + fields = self._fields_for_type(model_type) + cache = {} + for content in self._repo_version.get_content( + model_type.objects.only(*fields) + ).iterator(): + cache[content.natural_key()] = content + self._content_cache[model_type] = cache + async def run(self): """ The coroutine for this stage. @@ -42,12 +63,31 @@ async def run(self): async for batch in self.batches(): content_q_by_type = defaultdict(lambda: Q(pk__in=[])) d_content_by_nat_key = defaultdict(list) + cache_hits_by_type = defaultdict(lambda: Q(pk__in=[])) + for d_content in batch: if d_content.content._state.adding: model_type = type(d_content.content) - unit_q = d_content.content.q() - content_q_by_type[model_type] = content_q_by_type[model_type] | unit_q - d_content_by_nat_key[d_content.content.natural_key()].append(d_content) + nat_key = d_content.content.natural_key() + await sync_to_async(self._ensure_type_cache)(model_type) + cached = self._content_cache.get(model_type, {}).get(nat_key) + if cached is not None: + d_content.content = cached + cache_hits_by_type[model_type] |= Q(pk=cached.pk) + else: + unit_q = d_content.content.q() + content_q_by_type[model_type] = content_q_by_type[model_type] | unit_q + d_content_by_nat_key[nat_key].append(d_content) + + for model_type, hit_q in cache_hits_by_type.items(): + try: + await sync_to_async(model_type.objects.filter(hit_q).touch)() + except AttributeError: + raise TypeError( + "Plugins which declare custom ORM managers on their content classes " + "should have those managers inherit from " + "pulpcore.plugin.models.ContentManager." + ) for model_type, content_q in content_q_by_type.items(): try: @@ -58,8 +98,9 @@ async def run(self): "should have those managers inherit from " "pulpcore.plugin.models.ContentManager." ) + fields = self._fields_for_type(model_type) async for result in sync_to_async_iterable( - model_type.objects.filter(content_q).iterator() + model_type.objects.filter(content_q).only(*fields).iterator() ): for d_content in d_content_by_nat_key[result.natural_key()]: d_content.content = result diff --git a/pulpcore/plugin/stages/declarative_version.py b/pulpcore/plugin/stages/declarative_version.py index 22cd54173ca..0e91071ca47 100644 --- a/pulpcore/plugin/stages/declarative_version.py +++ b/pulpcore/plugin/stages/declarative_version.py @@ -142,7 +142,7 @@ def pipeline_stages(self, new_version): [ ArtifactDownloader(resource_budget=resource_budget), ArtifactSaver(resource_budget=resource_budget), - QueryExistingContents(), + QueryExistingContents(new_version), ContentSaver(), RemoteArtifactSaver(), ResolveContentFutures(), From 5daddf20c493b27f8735e023c94362ae288d8ba1 Mon Sep 17 00:00:00 2001 From: Jitka Halova Date: Mon, 1 Jun 2026 13:20:41 +0200 Subject: [PATCH 2/4] wip - fix review findings --- pulpcore/plugin/stages/content_stages.py | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/pulpcore/plugin/stages/content_stages.py b/pulpcore/plugin/stages/content_stages.py index 1a436a8dd01..bab7d28b5c8 100644 --- a/pulpcore/plugin/stages/content_stages.py +++ b/pulpcore/plugin/stages/content_stages.py @@ -30,17 +30,26 @@ class QueryExistingContents(Stage): This stage drains all available items from `self._in_q` and batches everything into one large call to the db for efficiency. + + When `repo_version` is provided, content from that version is cached in memory on first + access (per content type) so that repeat syncs can resolve most items without per-batch + database queries. + + By default, the cache and DB queries load only the natural key fields (plus `pk`). + Plugins that need additional fields in later pipeline stages can pass + `extra_fields` - a mapping of content model class to a list of extra field names + to include. """ - def __init__(self, repo_version=None, deferred_fields=None, *args, **kwargs): + def __init__(self, repo_version=None, extra_fields=None, *args, **kwargs): super().__init__(*args, **kwargs) self._repo_version = repo_version - self._deferred_fields = deferred_fields or {} + self._extra_fields = extra_fields or {} self._content_cache = {} def _fields_for_type(self, model_type): base = ("pk",) + model_type._sanitized_natural_key_fields() - extra = self._deferred_fields.get(model_type, ()) + extra = self._extra_fields.get(model_type, ()) return base + tuple(f for f in extra if f not in base) def _ensure_type_cache(self, model_type): @@ -63,7 +72,7 @@ async def run(self): async for batch in self.batches(): content_q_by_type = defaultdict(lambda: Q(pk__in=[])) d_content_by_nat_key = defaultdict(list) - cache_hits_by_type = defaultdict(lambda: Q(pk__in=[])) + cache_hits_by_type = defaultdict(set) for d_content in batch: if d_content.content._state.adding: @@ -73,15 +82,15 @@ async def run(self): cached = self._content_cache.get(model_type, {}).get(nat_key) if cached is not None: d_content.content = cached - cache_hits_by_type[model_type] |= Q(pk=cached.pk) + cache_hits_by_type[model_type].add(cached.pk) else: unit_q = d_content.content.q() content_q_by_type[model_type] = content_q_by_type[model_type] | unit_q d_content_by_nat_key[nat_key].append(d_content) - for model_type, hit_q in cache_hits_by_type.items(): + for model_type, hit_pks in cache_hits_by_type.items(): try: - await sync_to_async(model_type.objects.filter(hit_q).touch)() + await sync_to_async(model_type.objects.filter(pk__in=hit_pks).touch)() except AttributeError: raise TypeError( "Plugins which declare custom ORM managers on their content classes " From a98bf7f7dbaf43df68b349358a497f7f31d154c6 Mon Sep 17 00:00:00 2001 From: Jitka Halova Date: Tue, 2 Jun 2026 16:49:31 +0200 Subject: [PATCH 3/4] Optimize QueryExistingArtifacts --- pulpcore/plugin/stages/artifact_stages.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/pulpcore/plugin/stages/artifact_stages.py b/pulpcore/plugin/stages/artifact_stages.py index bfda6d12061..92cca86f556 100644 --- a/pulpcore/plugin/stages/artifact_stages.py +++ b/pulpcore/plugin/stages/artifact_stages.py @@ -200,17 +200,15 @@ async def run(self): "pulp_domain": self.domain, } existing_artifacts_qs = Artifact.objects.filter(**query_params) - existing_artifacts = sync_to_async_iterable(existing_artifacts_qs) await sync_to_async(existing_artifacts_qs.touch)() + existing_by_digest = {} + async for result in sync_to_async_iterable(existing_artifacts_qs): + existing_by_digest[getattr(result, digest_type)] = result for d_content in batch: for d_artifact in d_content.d_artifacts: artifact_digest = getattr(d_artifact.artifact, digest_type) - if artifact_digest: - async for result in existing_artifacts: - result_digest = getattr(result, digest_type) - if result_digest == artifact_digest: - d_artifact.artifact = result - break + if artifact_digest and artifact_digest in existing_by_digest: + d_artifact.artifact = existing_by_digest[artifact_digest] for d_content in batch: await self.put(d_content) From c4c68af0eebcdc9e45a574ffac8c71c14ae2dfda Mon Sep 17 00:00:00 2001 From: Jitka Halova Date: Tue, 2 Jun 2026 17:32:43 +0200 Subject: [PATCH 4/4] wip - reduce redundant queries closes #7558 --- CHANGES/7558.bugfix | 1 + pulpcore/plugin/stages/content_stages.py | 37 ++++++++++++------------ 2 files changed, 20 insertions(+), 18 deletions(-) create mode 100644 CHANGES/7558.bugfix diff --git a/CHANGES/7558.bugfix b/CHANGES/7558.bugfix new file mode 100644 index 00000000000..0aa60de66ce --- /dev/null +++ b/CHANGES/7558.bugfix @@ -0,0 +1 @@ +Optimized sync with content caching in `QueryExistingContents` and query fix in `QueryExistingArtifacts`. diff --git a/pulpcore/plugin/stages/content_stages.py b/pulpcore/plugin/stages/content_stages.py index bab7d28b5c8..d30df481f69 100644 --- a/pulpcore/plugin/stages/content_stages.py +++ b/pulpcore/plugin/stages/content_stages.py @@ -88,29 +88,30 @@ async def run(self): content_q_by_type[model_type] = content_q_by_type[model_type] | unit_q d_content_by_nat_key[nat_key].append(d_content) - for model_type, hit_pks in cache_hits_by_type.items(): - try: - await sync_to_async(model_type.objects.filter(pk__in=hit_pks).touch)() - except AttributeError: - raise TypeError( - "Plugins which declare custom ORM managers on their content classes " - "should have those managers inherit from " - "pulpcore.plugin.models.ContentManager." - ) - + db_results_by_type = defaultdict(list) for model_type, content_q in content_q_by_type.items(): - try: - await sync_to_async(model_type.objects.filter(content_q).touch)() - except AttributeError: - raise TypeError( - "Plugins which declare custom ORM managers on their content classes " - "should have those managers inherit from " - "pulpcore.plugin.models.ContentManager." - ) fields = self._fields_for_type(model_type) async for result in sync_to_async_iterable( model_type.objects.filter(content_q).only(*fields).iterator() ): + db_results_by_type[model_type].append(result) + + all_types = set(cache_hits_by_type.keys()) | set(db_results_by_type.keys()) + for model_type in all_types: + pks = cache_hits_by_type.get(model_type, set()) + pks = pks | {r.pk for r in db_results_by_type.get(model_type, [])} + if pks: + try: + await sync_to_async(model_type.objects.filter(pk__in=pks).touch)() + except AttributeError: + raise TypeError( + "Plugins which declare custom ORM managers on their content classes " + "should have those managers inherit from " + "pulpcore.plugin.models.ContentManager." + ) + + for model_type, results in db_results_by_type.items(): + for result in results: for d_content in d_content_by_nat_key[result.natural_key()]: d_content.content = result