Skip to content

Commit 8f9a539

Browse files
jdrew82 and claude committed
feat: Add bulk sync operations and advanced filtering capabilities
Add 9 new features organized in 3 phases:

Phase 1 - Diff-side Filtering:
- Diff.filter()/exclude() for post-diff, pre-sync manipulation
- model_types parameter to scope diffs/syncs to specific model types
- sync_attrs/exclude_attrs for attribute-level diff control
- filters parameter for per-model-type query predicates

Phase 2 - Sync-side Enhancements:
- sync_filter callback to approve/reject individual CRUD operations
- Structured operations summary passed to sync_complete()

Phase 3 - Bulk & Parallel Execution:
- Bulk CRUD methods (create_bulk/update_bulk/delete_bulk) on DiffSyncModel
- Store-level bulk methods (add_bulk/update_bulk/remove_bulk)
- Thread-safe LocalStore with locking for concurrent access
- concurrent flag for parallel sync of independent subtrees

All new parameters default to None/False for full backwards compatibility.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent a3a733b commit 8f9a539

File tree

7 files changed

+1205
-66
lines changed

7 files changed

+1205
-66
lines changed

diffsync/__init__.py

Lines changed: 144 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,51 @@ def delete(self) -> Optional[Self]:
306306
"""
307307
return self.delete_base()
308308

309+
@classmethod
310+
def create_bulk(cls, adapter: "Adapter", objects: List[Dict]) -> List[Optional[Self]]:
311+
"""Bulk create multiple instances. Override for batch creation (e.g. single API call).
312+
313+
The default implementation loops over individual create() calls.
314+
315+
Args:
316+
adapter: The master data store for other DiffSyncModel instances
317+
objects: List of dicts, each with "ids" and "attrs" keys
318+
319+
Returns:
320+
List of created DiffSyncModel instances (or None for failed creations)
321+
"""
322+
return [cls.create(adapter=adapter, ids=obj["ids"], attrs=obj["attrs"]) for obj in objects]
323+
324+
@classmethod
325+
def update_bulk(cls, adapter: "Adapter", objects: List[Tuple["DiffSyncModel", Dict]]) -> List[Optional[Self]]:
326+
"""Bulk update multiple instances. Override for batch updates (e.g. single API call).
327+
328+
The default implementation loops over individual update() calls.
329+
330+
Args:
331+
adapter: The master data store for other DiffSyncModel instances
332+
objects: List of (existing_model, attrs_to_update) tuples
333+
334+
Returns:
335+
List of updated DiffSyncModel instances (or None for failed updates)
336+
"""
337+
return [model.update(attrs=attrs) for model, attrs in objects]
338+
339+
@classmethod
340+
def delete_bulk(cls, adapter: "Adapter", objects: List["DiffSyncModel"]) -> List[Optional[Self]]:
341+
"""Bulk delete multiple instances. Override for batch deletion (e.g. single API call).
342+
343+
The default implementation loops over individual delete() calls.
344+
345+
Args:
346+
adapter: The master data store for other DiffSyncModel instances
347+
objects: List of model instances to delete
348+
349+
Returns:
350+
List of deleted DiffSyncModel instances (or None for failed deletions)
351+
"""
352+
return [model.delete() for model in objects]
353+
309354
@classmethod
310355
def get_type(cls) -> StrType:
311356
"""Return the type AKA modelname of the object or the class.
@@ -581,6 +626,14 @@ def sync_from( # pylint: disable=too-many-arguments, too-many-positional-argume
581626
flags: DiffSyncFlags = DiffSyncFlags.NONE,
582627
callback: Optional[Callable[[StrType, int, int], None]] = None,
583628
diff: Optional[Diff] = None,
629+
model_types: Optional[Set[StrType]] = None,
630+
filters: Optional[Dict[StrType, Callable]] = None,
631+
sync_attrs: Optional[Dict[StrType, Set[StrType]]] = None,
632+
exclude_attrs: Optional[Dict[StrType, Set[StrType]]] = None,
633+
sync_filter: Optional[Callable[[StrType, StrType, Dict, Dict], bool]] = None,
634+
batch_size: Optional[int] = None,
635+
concurrent: bool = False,
636+
max_workers: Optional[int] = None,
584637
) -> Diff:
585638
"""Synchronize data from the given source DiffSync object into the current DiffSync object.
586639
@@ -591,6 +644,14 @@ def sync_from( # pylint: disable=too-many-arguments, too-many-positional-argume
591644
callback: Function with parameters (stage, current, total), to be called at intervals as the calculation of
592645
the diff and subsequent sync proceed.
593646
diff: An existing diff to be used rather than generating a completely new diff.
647+
model_types: Optional set of model type names to restrict the sync to.
648+
filters: Optional dict of {model_type: predicate_callable} to filter which objects are synced.
649+
sync_attrs: Optional dict of {model_type: set_of_attr_names} to whitelist attributes for syncing.
650+
exclude_attrs: Optional dict of {model_type: set_of_attr_names} to exclude attributes from syncing.
651+
sync_filter: Optional callback (action, model_type, ids, attrs) -> bool to approve/reject each operation.
652+
batch_size: Optional chunk size for batched sync execution.
653+
concurrent: If True, sync independent top-level subtrees in parallel.
654+
max_workers: Maximum number of threads for concurrent sync.
594655
595656
Returns:
596657
Diff between origin object and source
@@ -605,17 +666,35 @@ def sync_from( # pylint: disable=too-many-arguments, too-many-positional-argume
605666

606667
# Generate the diff if an existing diff was not provided
607668
if not diff:
608-
diff = self.diff_from(source, diff_class=diff_class, flags=flags, callback=callback)
669+
diff = self.diff_from(
670+
source,
671+
diff_class=diff_class,
672+
flags=flags,
673+
callback=callback,
674+
model_types=model_types,
675+
filters=filters,
676+
sync_attrs=sync_attrs,
677+
exclude_attrs=exclude_attrs,
678+
)
609679
syncer = DiffSyncSyncer(
610680
diff=diff,
611681
src_diffsync=source,
612682
dst_diffsync=self,
613683
flags=flags,
614684
callback=callback,
685+
sync_filter=sync_filter,
686+
batch_size=batch_size,
687+
concurrent=concurrent,
688+
max_workers=max_workers,
615689
)
616690
result = syncer.perform_sync()
617691
if result:
618-
self.sync_complete(source, diff, flags, syncer.base_logger)
692+
# Feature 4: Pass operations summary to sync_complete
693+
try:
694+
self.sync_complete(source, diff, flags, syncer.base_logger, operations=syncer.operations)
695+
except TypeError:
696+
# Backwards compatibility: existing subclass overrides may not accept operations kwarg
697+
self.sync_complete(source, diff, flags, syncer.base_logger)
619698

620699
return diff
621700

@@ -626,6 +705,14 @@ def sync_to( # pylint: disable=too-many-arguments, too-many-positional-argument
626705
flags: DiffSyncFlags = DiffSyncFlags.NONE,
627706
callback: Optional[Callable[[StrType, int, int], None]] = None,
628707
diff: Optional[Diff] = None,
708+
model_types: Optional[Set[StrType]] = None,
709+
filters: Optional[Dict[StrType, Callable]] = None,
710+
sync_attrs: Optional[Dict[StrType, Set[StrType]]] = None,
711+
exclude_attrs: Optional[Dict[StrType, Set[StrType]]] = None,
712+
sync_filter: Optional[Callable[[StrType, StrType, Dict, Dict], bool]] = None,
713+
batch_size: Optional[int] = None,
714+
concurrent: bool = False,
715+
max_workers: Optional[int] = None,
629716
) -> Diff:
630717
"""Synchronize data from the current DiffSync object into the given target DiffSync object.
631718
@@ -636,20 +723,40 @@ def sync_to( # pylint: disable=too-many-arguments, too-many-positional-argument
636723
callback: Function with parameters (stage, current, total), to be called at intervals as the calculation of
637724
the diff and subsequent sync proceed.
638725
diff: An existing diff that will be used when determining what needs to be synced.
726+
model_types: Optional set of model type names to restrict the sync to.
727+
filters: Optional dict of {model_type: predicate_callable} to filter which objects are synced.
728+
sync_attrs: Optional dict of {model_type: set_of_attr_names} to whitelist attributes for syncing.
729+
exclude_attrs: Optional dict of {model_type: set_of_attr_names} to exclude attributes from syncing.
730+
sync_filter: Optional callback (action, model_type, ids, attrs) -> bool to approve/reject each operation.
639731
640732
Returns:
641733
Diff between origin object and target
642734
Raises:
643735
DiffClassMismatch: The provided diff's class does not match the diff_class
644736
"""
645-
return target.sync_from(self, diff_class=diff_class, flags=flags, callback=callback, diff=diff)
737+
return target.sync_from(
738+
self,
739+
diff_class=diff_class,
740+
flags=flags,
741+
callback=callback,
742+
diff=diff,
743+
model_types=model_types,
744+
filters=filters,
745+
sync_attrs=sync_attrs,
746+
exclude_attrs=exclude_attrs,
747+
sync_filter=sync_filter,
748+
batch_size=batch_size,
749+
concurrent=concurrent,
750+
max_workers=max_workers,
751+
)
646752

647753
def sync_complete(
648754
self,
649755
source: "Adapter",
650756
diff: Diff,
651757
flags: DiffSyncFlags = DiffSyncFlags.NONE,
652758
logger: Optional[structlog.BoundLogger] = None,
759+
operations: Optional[Dict[StrType, Dict[StrType, List[Dict]]]] = None,
653760
) -> None:
654761
"""Callback triggered after a `sync_from` operation has completed and updated the model data of this instance.
655762
@@ -664,18 +771,24 @@ def sync_complete(
664771
diff: The Diff calculated prior to the sync operation.
665772
flags: Any flags that influenced the sync.
666773
logger: Logging context for the sync.
774+
operations: Structured summary of all CRUD operations performed during sync.
775+
Format: {"model_type": {"create": [{"ids": {...}, "attrs": {...}, "model": ...}], "update": [...], "delete": [...]}}
667776
"""
668777

669778
# ------------------------------------------------------------------------------
670779
# Diff calculation and construction
671780
# ------------------------------------------------------------------------------
672781

673-
def diff_from(
782+
def diff_from( # pylint: disable=too-many-arguments, too-many-positional-arguments
674783
self,
675784
source: "Adapter",
676785
diff_class: Type[Diff] = Diff,
677786
flags: DiffSyncFlags = DiffSyncFlags.NONE,
678787
callback: Optional[Callable[[StrType, int, int], None]] = None,
788+
model_types: Optional[Set[StrType]] = None,
789+
filters: Optional[Dict[StrType, Callable]] = None,
790+
sync_attrs: Optional[Dict[StrType, Set[StrType]]] = None,
791+
exclude_attrs: Optional[Dict[StrType, Set[StrType]]] = None,
679792
) -> Diff:
680793
"""Generate a Diff describing the difference from the other DiffSync to this one.
681794
@@ -685,22 +798,34 @@ def diff_from(
685798
flags: Flags influencing the behavior of this diff operation.
686799
callback: Function with parameters (stage, current, total), to be called at intervals as the
687800
calculation of the diff proceeds.
801+
model_types: Optional set of model type names to restrict the diff to.
802+
filters: Optional dict of {model_type: predicate_callable} to filter which objects are diffed.
803+
sync_attrs: Optional dict of {model_type: set_of_attr_names} to whitelist attributes for diffing.
804+
exclude_attrs: Optional dict of {model_type: set_of_attr_names} to exclude attributes from diffing.
688805
"""
689806
differ = DiffSyncDiffer(
690807
src_diffsync=source,
691808
dst_diffsync=self,
692809
flags=flags,
693810
diff_class=diff_class,
694811
callback=callback,
812+
model_types=model_types,
813+
filters=filters,
814+
sync_attrs=sync_attrs,
815+
exclude_attrs=exclude_attrs,
695816
)
696817
return differ.calculate_diffs()
697818

698-
def diff_to(
819+
def diff_to( # pylint: disable=too-many-arguments, too-many-positional-arguments
699820
self,
700821
target: "Adapter",
701822
diff_class: Type[Diff] = Diff,
702823
flags: DiffSyncFlags = DiffSyncFlags.NONE,
703824
callback: Optional[Callable[[StrType, int, int], None]] = None,
825+
model_types: Optional[Set[StrType]] = None,
826+
filters: Optional[Dict[StrType, Callable]] = None,
827+
sync_attrs: Optional[Dict[StrType, Set[StrType]]] = None,
828+
exclude_attrs: Optional[Dict[StrType, Set[StrType]]] = None,
704829
) -> Diff:
705830
"""Generate a Diff describing the difference from this DiffSync to another one.
706831
@@ -710,8 +835,21 @@ def diff_to(
710835
flags: Flags influencing the behavior of this diff operation.
711836
callback: Function with parameters (stage, current, total), to be called at intervals as the
712837
calculation of the diff proceeds.
838+
model_types: Optional set of model type names to restrict the diff to.
839+
filters: Optional dict of {model_type: predicate_callable} to filter which objects are diffed.
840+
sync_attrs: Optional dict of {model_type: set_of_attr_names} to whitelist attributes for diffing.
841+
exclude_attrs: Optional dict of {model_type: set_of_attr_names} to exclude attributes from diffing.
713842
"""
714-
return target.diff_from(self, diff_class=diff_class, flags=flags, callback=callback)
843+
return target.diff_from(
844+
self,
845+
diff_class=diff_class,
846+
flags=flags,
847+
callback=callback,
848+
model_types=model_types,
849+
filters=filters,
850+
sync_attrs=sync_attrs,
851+
exclude_attrs=exclude_attrs,
852+
)
715853

716854
# ------------------------------------------------------------------------------
717855
# Object Storage Management

0 commit comments

Comments
 (0)