Optimize re-sync content #7754
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Optimized sync with content caching in `QueryExistingContents` and query fix in `QueryExistingArtifacts`. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,8 +30,38 @@ class QueryExistingContents(Stage): | |
|
|
||
| This stage drains all available items from `self._in_q` and batches everything into one large | ||
| call to the db for efficiency. | ||
|
|
||
| When `repo_version` is provided, content from that version is cached in memory on first | ||
| access (per content type) so that repeat syncs can resolve most items without per-batch | ||
| database queries. | ||
|
|
||
| By default, the cache and DB queries load only the natural key fields (plus `pk`). | ||
| Plugins that need additional fields in later pipeline stages can pass | ||
| `extra_fields` - a mapping of content model class to a list of extra field names | ||
| to include. | ||
| """ | ||
|
|
||
| def __init__(self, repo_version=None, extra_fields=None, *args, **kwargs): | ||
| super().__init__(*args, **kwargs) | ||
| self._repo_version = repo_version | ||
| self._extra_fields = extra_fields or {} | ||
| self._content_cache = {} | ||
|
|
||
| def _fields_for_type(self, model_type): | ||
| base = ("pk",) + model_type._sanitized_natural_key_fields() | ||
| extra = self._extra_fields.get(model_type, ()) | ||
| return base + tuple(f for f in extra if f not in base) | ||
|
|
||
| def _ensure_type_cache(self, model_type): | ||
| if model_type not in self._content_cache and self._repo_version is not None: | ||
| fields = self._fields_for_type(model_type) | ||
| cache = {} | ||
| for content in self._repo_version.get_content( | ||
| model_type.objects.only(*fields) | ||
| ).iterator(): | ||
| cache[content.natural_key()] = content | ||
| self._content_cache[model_type] = cache | ||
|
|
||
| async def run(self): | ||
| """ | ||
| The coroutine for this stage. | ||
|
|
@@ -42,25 +72,46 @@ async def run(self): | |
| async for batch in self.batches(): | ||
| content_q_by_type = defaultdict(lambda: Q(pk__in=[])) | ||
| d_content_by_nat_key = defaultdict(list) | ||
| cache_hits_by_type = defaultdict(set) | ||
|
|
||
| for d_content in batch: | ||
| if d_content.content._state.adding: | ||
|
Contributor
Theoretically content could be passed through already saved, in which case I think we're probably not
Member
Author
I am not sure I understand this. Why do we need to call
Contributor
Yes, it basically resets the orphan cleanup protection timer.
||
| model_type = type(d_content.content) | ||
| unit_q = d_content.content.q() | ||
| content_q_by_type[model_type] = content_q_by_type[model_type] | unit_q | ||
| d_content_by_nat_key[d_content.content.natural_key()].append(d_content) | ||
|
|
||
| nat_key = d_content.content.natural_key() | ||
| await sync_to_async(self._ensure_type_cache)(model_type) | ||
| cached = self._content_cache.get(model_type, {}).get(nat_key) | ||
| if cached is not None: | ||
| d_content.content = cached | ||
| cache_hits_by_type[model_type].add(cached.pk) | ||
| else: | ||
| unit_q = d_content.content.q() | ||
| content_q_by_type[model_type] = content_q_by_type[model_type] | unit_q | ||
| d_content_by_nat_key[nat_key].append(d_content) | ||
|
|
||
| db_results_by_type = defaultdict(list) | ||
| for model_type, content_q in content_q_by_type.items(): | ||
| try: | ||
| await sync_to_async(model_type.objects.filter(content_q).touch)() | ||
| except AttributeError: | ||
| raise TypeError( | ||
| "Plugins which declare custom ORM managers on their content classes " | ||
| "should have those managers inherit from " | ||
| "pulpcore.plugin.models.ContentManager." | ||
| ) | ||
| fields = self._fields_for_type(model_type) | ||
|
Contributor
Could do

    deferred_fields = self._deferred_fields_for_model(model_type)  # which is a simpler function to implement
    async for result in sync_to_async_iterable(
        model_type.objects.filter(content_q).defer(*deferred_fields).iterator()
Contributor
Might want to double check what happens when passing
||
| async for result in sync_to_async_iterable( | ||
| model_type.objects.filter(content_q).iterator() | ||
| model_type.objects.filter(content_q).only(*fields).iterator() | ||
| ): | ||
| db_results_by_type[model_type].append(result) | ||
|
|
||
| all_types = set(cache_hits_by_type.keys()) | set(db_results_by_type.keys()) | ||
| for model_type in all_types: | ||
| pks = cache_hits_by_type.get(model_type, set()) | ||
| pks = pks | {r.pk for r in db_results_by_type.get(model_type, [])} | ||
| if pks: | ||
| try: | ||
| await sync_to_async(model_type.objects.filter(pk__in=pks).touch)() | ||
| except AttributeError: | ||
| raise TypeError( | ||
| "Plugins which declare custom ORM managers on their content classes " | ||
| "should have those managers inherit from " | ||
| "pulpcore.plugin.models.ContentManager." | ||
| ) | ||
|
|
||
| for model_type, results in db_results_by_type.items(): | ||
| for result in results: | ||
|
Contributor
A little worried that the additional loops here would outweigh the reduced number of queries.
||
| for d_content in d_content_by_nat_key[result.natural_key()]: | ||
| d_content.content = result | ||
|
|
||
|
|
||
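As a rough illustration of the cache-first lookup in `run()` above, here is a minimal, Django-free sketch. `partition_batch` and `FakeContent` are hypothetical names invented for this example and are not part of pulpcore; the sketch only mirrors the idea of resolving items from an in-memory natural-key cache and leaving the misses for a database query.

```python
from collections import defaultdict


def partition_batch(batch, cache_by_type):
    """Split a batch into cache hits and cache misses.

    Assumed shapes (for illustration only):
    - every item exposes `natural_key()`
    - `cache_by_type` maps a type to {natural_key: cached_object}
    """
    hits_by_type = defaultdict(set)        # type -> pks resolved from the cache
    misses_by_nat_key = defaultdict(list)  # natural key -> items still needing a DB lookup
    for item in batch:
        model_type = type(item)
        nat_key = item.natural_key()
        cached = cache_by_type.get(model_type, {}).get(nat_key)
        if cached is not None:
            hits_by_type[model_type].add(cached.pk)
        else:
            misses_by_nat_key[nat_key].append(item)
    return hits_by_type, misses_by_nat_key


class FakeContent:
    """Stand-in for a content model; not a real pulpcore class."""

    def __init__(self, name, pk=None):
        self.name = name
        self.pk = pk

    def natural_key(self):
        return (self.name,)


cache = {FakeContent: {("a",): FakeContent("a", pk=1)}}
hits, misses = partition_batch([FakeContent("a"), FakeContent("b")], cache)
```

Only the misses would need a per-batch query; the hits still contribute their `pk` values so they can be touched afterwards, as the diff does.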
So, there are two ways to implement this behavior:

- `.only()`
- `.defer()`

Thinking about this a bit more, `deferred_fields` might have been the right idea, just the wrong implementation.

It's really only a couple of fields that are known to be really expensive, and it's probably easier and safer for plugins to just opt out of those specific fields (if the sync pipeline doesn't need them) than to rely on a heuristic like "plugins probably only need the natural key fields". That heuristic might require a bit of an audit to make sure it's actually true; otherwise it might cause performance regressions.

So instead of trying to calculate the list of extra fields, it might be simpler to just take a list of deferred fields and pass it directly to `.defer()` (not `.only()`).

The main concern would be: if a plugin turned on caching without opting into some of this, it might increase memory consumption during the sync, because the cache is immediately pre-populated and held for the entire sync. I don't know that that would be a concern for any plugin that doesn't already define its own pipeline, though (those plugins would need to switch this on deliberately anyway).
What do you think?
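A minimal sketch of the opt-out idea described above, assuming the stage accepted a `deferred_fields` mapping of model class to field names. The mapping, `deferred_fields_for`, and `apply_deferred` are hypothetical names for this example and are not part of the PR; the only real API relied on is Django's `QuerySet.defer()`.

```python
def deferred_fields_for(model_type, deferred_fields=None):
    """Return the field names a plugin opted out of for `model_type`.

    `deferred_fields` is an assumed mapping: model class -> iterable of names.
    Types without an entry defer nothing, so all fields are loaded.
    """
    mapping = deferred_fields or {}
    return tuple(mapping.get(model_type, ()))


def apply_deferred(queryset, model_type, deferred_fields=None):
    """Pass the opted-out fields straight to `.defer()`, if there are any."""
    to_defer = deferred_fields_for(model_type, deferred_fields)
    # QuerySet.defer() is chainable; skip the call when nothing is deferred.
    return queryset.defer(*to_defer) if to_defer else queryset
```

Compared with computing a list for `.only()`, this keeps the default behavior unchanged (every field is loaded) and lets a plugin name just the expensive fields it does not need.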
could do something like