Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/7558.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Optimized sync by caching content in `QueryExistingContents` and fixed the query in `QueryExistingArtifacts`.
12 changes: 5 additions & 7 deletions pulpcore/plugin/stages/artifact_stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,17 +200,15 @@ async def run(self):
"pulp_domain": self.domain,
}
existing_artifacts_qs = Artifact.objects.filter(**query_params)
existing_artifacts = sync_to_async_iterable(existing_artifacts_qs)
await sync_to_async(existing_artifacts_qs.touch)()
existing_by_digest = {}
async for result in sync_to_async_iterable(existing_artifacts_qs):
existing_by_digest[getattr(result, digest_type)] = result
for d_content in batch:
for d_artifact in d_content.d_artifacts:
artifact_digest = getattr(d_artifact.artifact, digest_type)
if artifact_digest:
async for result in existing_artifacts:
result_digest = getattr(result, digest_type)
if result_digest == artifact_digest:
d_artifact.artifact = result
break
if artifact_digest and artifact_digest in existing_by_digest:
d_artifact.artifact = existing_by_digest[artifact_digest]
for d_content in batch:
await self.put(d_content)

Expand Down
77 changes: 64 additions & 13 deletions pulpcore/plugin/stages/content_stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,38 @@ class QueryExistingContents(Stage):

This stage drains all available items from `self._in_q` and batches everything into one large
call to the db for efficiency.

When `repo_version` is provided, content from that version is cached in memory on first
access (per content type) so that repeat syncs can resolve most items without per-batch
database queries.

By default, the cache and DB queries load only the natural key fields (plus `pk`).
Plugins that need additional fields in later pipeline stages can pass
`extra_fields` - a mapping of content model class to a list of extra field names
to include.
"""

def __init__(self, repo_version=None, extra_fields=None, *args, **kwargs):
    """
    Initialize the stage with an optional repository version and extra field names.

    Args:
        repo_version: Optional repository version stored on the stage for content caching.
        extra_fields: Optional mapping of content model class to a list of extra field
            names to load. A falsy value is replaced by an empty dict.
        args: Positional arguments forwarded unchanged to the parent initializer.
        kwargs: Keyword arguments forwarded unchanged to the parent initializer.
    """
    super().__init__(*args, **kwargs)
    # Per-model-type cache; starts empty.
    self._content_cache = {}
    self._extra_fields = extra_fields if extra_fields else {}
    self._repo_version = repo_version

def _fields_for_type(self, model_type):
base = ("pk",) + model_type._sanitized_natural_key_fields()
extra = self._extra_fields.get(model_type, ())
return base + tuple(f for f in extra if f not in base)
Copy link
Copy Markdown
Contributor

@dralley dralley Jun 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, there are two ways to implement this behavior:

  1. Creating a list of fields to pass to .only()
  2. Creating a list of fields to pass to .defer()

Thinking about this a bit more, deferred_fields might have been the right idea, just the wrong implementation.

It's really only a couple of fields that are known to be really expensive, and it's probably easier and safer for plugins to just opt out of those specific fields (if the sync pipeline doesn't need them) than to just use a heuristic like "plugins probably only need the natural key fields", which might require doing a bit of an audit to make sure that's actually true, otherwise it might actually cause performance regressions.

So instead of trying to calculate the list of extra fields, it might be simpler to just take a list of deferred fields and pass it directly to .defer() (not .only())

The main concern would be: if a plugin turned on caching without opting into some of this, it might increase memory consumption during the sync due to the cache being immediately pre-populated and held for the entire sync. I don't know that that would be a concern for any plugin that doesn't already define their own pipeline though (who would need to switch this on deliberately anyway)

What do you think?

Copy link
Copy Markdown
Contributor

@dralley dralley Jun 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could do something like

    def _deferred_fields_for_type(self, model_type):
        self._deferred_fields[model_type] if self._deferred_fields else []


def _ensure_type_cache(self, model_type):
if model_type not in self._content_cache and self._repo_version is not None:
fields = self._fields_for_type(model_type)
cache = {}
for content in self._repo_version.get_content(
model_type.objects.only(*fields)
).iterator():
cache[content.natural_key()] = content
self._content_cache[model_type] = cache

async def run(self):
"""
The coroutine for this stage.
Expand All @@ -42,25 +72,46 @@ async def run(self):
async for batch in self.batches():
content_q_by_type = defaultdict(lambda: Q(pk__in=[]))
d_content_by_nat_key = defaultdict(list)
cache_hits_by_type = defaultdict(set)

for d_content in batch:
if d_content.content._state.adding:
Copy link
Copy Markdown
Contributor

@dralley dralley Jun 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Theoretically content could be passed through already saved, in which case I think we're probably not `touch`ing it. That might be an existing bug, though not a particularly serious one.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I understand this. Why do we need to call touch when content already exists? Is it because of orphan clean up?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it basically resets the orphan cleanup protection timer.

model_type = type(d_content.content)
unit_q = d_content.content.q()
content_q_by_type[model_type] = content_q_by_type[model_type] | unit_q
d_content_by_nat_key[d_content.content.natural_key()].append(d_content)

nat_key = d_content.content.natural_key()
await sync_to_async(self._ensure_type_cache)(model_type)
cached = self._content_cache.get(model_type, {}).get(nat_key)
if cached is not None:
d_content.content = cached
cache_hits_by_type[model_type].add(cached.pk)
else:
unit_q = d_content.content.q()
content_q_by_type[model_type] = content_q_by_type[model_type] | unit_q
d_content_by_nat_key[nat_key].append(d_content)

db_results_by_type = defaultdict(list)
for model_type, content_q in content_q_by_type.items():
try:
await sync_to_async(model_type.objects.filter(content_q).touch)()
except AttributeError:
raise TypeError(
"Plugins which declare custom ORM managers on their content classes "
"should have those managers inherit from "
"pulpcore.plugin.models.ContentManager."
)
fields = self._fields_for_type(model_type)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could do

                deferred_fields = self._deferred_fields_for_type(model_type)  # which is a simpler function to implement
                async for result in sync_to_async_iterable(
                    model_type.objects.filter(content_q).defer(*deferred_fields).iterator()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might want to double check what happens when passing .defer() an empty set of arguments though

async for result in sync_to_async_iterable(
model_type.objects.filter(content_q).iterator()
model_type.objects.filter(content_q).only(*fields).iterator()
):
db_results_by_type[model_type].append(result)

all_types = set(cache_hits_by_type.keys()) | set(db_results_by_type.keys())
for model_type in all_types:
pks = cache_hits_by_type.get(model_type, set())
pks = pks | {r.pk for r in db_results_by_type.get(model_type, [])}
if pks:
try:
await sync_to_async(model_type.objects.filter(pk__in=pks).touch)()
except AttributeError:
raise TypeError(
"Plugins which declare custom ORM managers on their content classes "
"should have those managers inherit from "
"pulpcore.plugin.models.ContentManager."
)

for model_type, results in db_results_by_type.items():
for result in results:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A little worried that the additional loops here would outweigh the reduced # of queries

for d_content in d_content_by_nat_key[result.natural_key()]:
d_content.content = result

Expand Down
2 changes: 1 addition & 1 deletion pulpcore/plugin/stages/declarative_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def pipeline_stages(self, new_version):
[
ArtifactDownloader(resource_budget=resource_budget),
ArtifactSaver(resource_budget=resource_budget),
QueryExistingContents(),
QueryExistingContents(new_version),
ContentSaver(),
RemoteArtifactSaver(),
ResolveContentFutures(),
Expand Down
Loading