diff --git a/monai/transforms/compose.py b/monai/transforms/compose.py index 95653ffbd4..1767ed4a20 100644 --- a/monai/transforms/compose.py +++ b/monai/transforms/compose.py @@ -255,6 +255,90 @@ def __init__( self.set_random_state(seed=get_seed()) self.overrides = overrides + # Automatically assign group ID to child transforms for inversion tracking + self._set_transform_groups() + + def _set_transform_groups(self): + """ + Automatically set group IDs on child transforms for inversion tracking. + + This allows Invertd to identify which transforms belong to this + ``Compose`` instance, including wrapped transforms (for example, + array transforms inside dictionary transforms). + + Args: + None. + + Returns: + None. + """ + from monai.transforms.inverse import TraceableTransform + + group_id = str(id(self)) + visited = set() # Track visited objects to avoid infinite recursion + + def set_group_recursive(obj, gid, allow_compose: bool = False): + """ + Recursively set a group ID on a transform and its wrapped transforms. + + Args: + obj: Transform instance to process. + gid: Group identifier to assign. + allow_compose: Whether to set group on ``Compose`` instances. + ``Compose`` internals are not traversed to preserve nested + pipeline boundaries. + + Returns: + None. + """ + if obj is None or isinstance(obj, (bool, int, float, str, bytes)): + return + + # Avoid infinite recursion + obj_id = id(obj) + if obj_id in visited: + return + visited.add(obj_id) + + if isinstance(obj, Compose): + if allow_compose: + obj._group = gid + return + + if isinstance(obj, TraceableTransform): + obj._group = gid + + if isinstance(obj, Mapping): + for attr in obj.values(): + set_group_recursive(attr, gid) + return + + if isinstance(obj, (list, tuple, set)): + for attr in obj: + set_group_recursive(attr, gid) + return + + attrs: list[Any] = [] + if hasattr(obj, "__dict__"): + attrs.extend(vars(obj).values()) + + slots = getattr(type(obj), "__slots__", ()) + if isinstance(slots, str): + slots = (slots,) + for slot in slots: + if slot.startswith("__"): + continue + try: + attrs.append(getattr(obj, slot)) + except AttributeError: + continue + + for attr in attrs: + set_group_recursive(attr, gid) + + for transform in self.transforms: + set_group_recursive(transform, group_id, allow_compose=True) + @LazyTransform.lazy.setter # type: ignore def lazy(self, val: bool): self._lazy = val diff --git a/monai/transforms/inverse.py b/monai/transforms/inverse.py index 2f57f4614a..d7cdedc0ef 100644 --- a/monai/transforms/inverse.py +++ b/monai/transforms/inverse.py @@ -82,6 +82,10 @@ def _init_trace_threadlocal(self): if not hasattr(self._tracing, "value"): self._tracing.value = MONAIEnvVars.trace_transform() != "0" + # Initialize group identifier (set by Compose for automatic group tracking) + if not hasattr(self, "_group"): + self._group: str | None = None + def __getstate__(self): """When pickling, remove the `_tracing` member from the output, if present, since it's not picklable.""" _dict = dict(getattr(self, "__dict__", {})) # this makes __dict__ always present in the unpickled object @@ -119,13 +123,22 @@ def get_transform_info(self) -> dict: """ Return a dictionary with the relevant information pertaining to an applied transform. """ + # Ensure _group is initialized + self._init_trace_threadlocal() + vals = ( self.__class__.__name__, id(self), self.tracing, self._do_transform if hasattr(self, "_do_transform") else True, ) - return dict(zip(self.transform_info_keys(), vals)) + info = dict(zip(self.transform_info_keys(), vals)) + + # Add group if set (automatically set by Compose) + if self._group is not None: + info[TraceKeys.GROUP] = self._group + + return info def push_transform(self, data, *args, **kwargs): """ diff --git a/monai/transforms/post/dictionary.py b/monai/transforms/post/dictionary.py index 65fdd22b22..e51fc7af37 100644 --- a/monai/transforms/post/dictionary.py +++ b/monai/transforms/post/dictionary.py @@ -48,7 +48,7 @@ from monai.transforms.transform import MapTransform from monai.transforms.utility.array import ToTensor from monai.transforms.utils import allow_missing_keys_mode, convert_applied_interp_mode -from monai.utils import PostFix, convert_to_tensor, ensure_tuple, ensure_tuple_rep +from monai.utils import PostFix, TraceKeys, convert_to_tensor, ensure_tuple, ensure_tuple_rep from monai.utils.type_conversion import convert_to_dst_type __all__ = [ @@ -859,6 +859,27 @@ def __init__( self.post_func = ensure_tuple_rep(post_func, len(self.keys)) self._totensor = ToTensor() + def _filter_transforms_by_group(self, all_transforms: list[dict]) -> list[dict]: + """Filter applied operations to only include transforms from the target pipeline. + + Uses automatic group tracking where ``Compose`` assigns its ID to child transforms. + + Args: + all_transforms: Full list of applied transform metadata dictionaries. + + Returns: + Subset whose ``TraceKeys.GROUP`` matches ``str(id(self.transform))``, or the original + list when no match is found for backward compatibility. + """ + # Get the group ID of the transform (Compose instance) + target_group = str(id(self.transform)) + + # Filter transforms that match the target group + filtered = [xform for xform in all_transforms if xform.get(TraceKeys.GROUP) == target_group] + + # If no transforms match (backward compatibility), return all transforms + return filtered if filtered else all_transforms + def __call__(self, data: Mapping[Hashable, Any]) -> dict[Hashable, Any]: d = dict(data) for ( @@ -894,10 +915,13 @@ def __call__(self, data: Mapping[Hashable, Any]) -> dict[Hashable, Any]: orig_meta_key = orig_meta_key or f"{orig_key}_{meta_key_postfix}" if orig_key in d and isinstance(d[orig_key], MetaTensor): - transform_info = d[orig_key].applied_operations + all_transforms = d[orig_key].applied_operations meta_info = d[orig_key].meta + + # Automatically filter by Compose instance group ID + transform_info = self._filter_transforms_by_group(all_transforms) else: - transform_info = d[InvertibleTransform.trace_key(orig_key)] + transform_info = self._filter_transforms_by_group(d[InvertibleTransform.trace_key(orig_key)]) meta_info = d.get(orig_meta_key, {}) if nearest_interp: transform_info = convert_applied_interp_mode( diff --git a/monai/utils/enums.py b/monai/utils/enums.py index f5bb6c4c5b..52d9eed5f5 100644 --- a/monai/utils/enums.py +++ b/monai/utils/enums.py @@ -334,6 +334,7 @@ class TraceKeys(StrEnum): TRACING: str = "tracing" STATUSES: str = "statuses" LAZY: str = "lazy" + GROUP: str = "group" class TraceStatusKeys(StrEnum): diff --git a/tests/transforms/compose/test_compose.py b/tests/transforms/compose/test_compose.py index 96c6d4606f..132aaeabb5 100644 --- a/tests/transforms/compose/test_compose.py +++ b/tests/transforms/compose/test_compose.py @@ -268,6 +268,43 @@ def test_data_loader_2(self): self.assertAlmostEqual(out_1.cpu().item(), 0.28602141572) set_determinism(None) + def test_set_transform_groups_on_wrapped_transform_attributes(self): + class _IdentityInvertible(mt.InvertibleTransform): + def __call__(self, data): + return data + + def inverse(self, data): + return data + + class _WrapperWithTransform: + def __init__(self): + self.transform = _IdentityInvertible() + + def __call__(self, data): + return self.transform(data) + + class _WrapperWithTransforms: + def __init__(self): + self.transforms = [_IdentityInvertible(), {"inner": _IdentityInvertible()}] + + def __call__(self, data): + for transform in self.transforms: + if isinstance(transform, dict): + for nested_transform in transform.values(): + data = nested_transform(data) + else: + data = transform(data) + return data + + wrapped_transform = _WrapperWithTransform() + wrapped_transforms = _WrapperWithTransforms() + composed = mt.Compose([wrapped_transform, wrapped_transforms]) + expected_group = str(id(composed)) + + self.assertEqual(getattr(wrapped_transform.transform, "_group", None), expected_group) + self.assertEqual(getattr(wrapped_transforms.transforms[0], "_group", None), expected_group) + self.assertEqual(getattr(wrapped_transforms.transforms[1]["inner"], "_group", None), expected_group) + def test_flatten_and_len(self): x = mt.EnsureChannelFirst(channel_dim="no_channel") t1 = mt.Compose([x, x, x, x, mt.Compose([mt.Compose([x, x]), x, x])]) diff --git a/tests/transforms/inverse/test_invertd.py b/tests/transforms/inverse/test_invertd.py index 2b5e9da85d..abe826ebd8 100644 --- a/tests/transforms/inverse/test_invertd.py +++ b/tests/transforms/inverse/test_invertd.py @@ -17,7 +17,7 @@ import numpy as np import torch -from monai.data import DataLoader, Dataset, create_test_image_3d, decollate_batch +from monai.data import DataLoader, Dataset, MetaTensor, create_test_image_2d, create_test_image_3d, decollate_batch from monai.transforms import ( CastToTyped, Compose, @@ -36,7 +36,10 @@ ScaleIntensityd, Spacingd, ) -from monai.utils import set_determinism +from monai.transforms.inverse import InvertibleTransform +from monai.transforms.transform import MapTransform +from monai.transforms.utility.dictionary import Lambdad +from monai.utils import TraceKeys, set_determinism from tests.test_utils import assert_allclose, make_nifti_image KEYS = ["image", "label"] @@ -137,6 +140,236 @@ def test_invert(self): set_determinism(seed=None) + def test_invertd_with_postprocessing_transforms(self): + """Test that Invertd ignores postprocessing transforms using automatic group tracking. + + This is a regression test for the issue where Invertd would fail when + postprocessing contains invertible transforms before Invertd is called. + The fix uses automatic group tracking where Compose assigns its ID to child transforms. + """ + img, _ = create_test_image_2d(60, 60, 2, 10, num_seg_classes=2) + img = MetaTensor(img, meta={"original_channel_dim": float("nan"), "pixdim": [1.0, 1.0, 1.0]}) + key = "image" + + # Preprocessing pipeline + preprocessing = Compose([EnsureChannelFirstd(key), Spacingd(key, pixdim=[2.0, 2.0])]) + + # Postprocessing with Lambdad before Invertd + # Previously this would raise RuntimeError about transform ID mismatch + postprocessing = Compose( + [ + Lambdad(key, func=lambda x: x), # Should be ignored during inversion + Invertd(key, transform=preprocessing, orig_keys=key), + ] + ) + + # Apply transforms + item = {key: img} + pre = preprocessing(item) + + # This should NOT raise an error (was failing before the fix). + # Any exception here means the bug is not fixed. + post = postprocessing(pre) + self.assertIsNotNone(post) + self.assertIn(key, post) + + def test_invertd_multiple_pipelines(self): + """Test that Invertd correctly handles multiple independent preprocessing pipelines.""" + img1, _ = create_test_image_2d(60, 60, 2, 10, num_seg_classes=2) + img1 = MetaTensor(img1, meta={"original_channel_dim": float("nan"), "pixdim": [1.0, 1.0, 1.0]}) + img2, _ = create_test_image_2d(60, 60, 2, 10, num_seg_classes=2) + img2 = MetaTensor(img2, meta={"original_channel_dim": float("nan"), "pixdim": [1.0, 1.0, 1.0]}) + + # Two different preprocessing pipelines + preprocessing1 = Compose([EnsureChannelFirstd("image1"), Spacingd("image1", pixdim=[2.0, 2.0])]) + + preprocessing2 = Compose([EnsureChannelFirstd("image2"), Spacingd("image2", pixdim=[1.5, 1.5])]) + + # Postprocessing that inverts both + postprocessing = Compose( + [ + Lambdad(["image1", "image2"], func=lambda x: x), + Invertd("image1", transform=preprocessing1, orig_keys="image1"), + Invertd("image2", transform=preprocessing2, orig_keys="image2"), + ] + ) + + # Apply transforms + item = {"image1": img1, "image2": img2} + pre1 = preprocessing1(item) + pre2 = preprocessing2(pre1) + + # Should not raise error - each Invertd should only invert its own pipeline + post = postprocessing(pre2) + self.assertIn("image1", post) + self.assertIn("image2", post) + + def test_invertd_multiple_postprocessing_transforms(self): + """Test Invertd with multiple invertible transforms in postprocessing before Invertd.""" + img, _ = create_test_image_2d(60, 60, 2, 10, num_seg_classes=2) + img = MetaTensor(img, meta={"original_channel_dim": float("nan"), "pixdim": [1.0, 1.0, 1.0]}) + key = "image" + + preprocessing = Compose([EnsureChannelFirstd(key), Spacingd(key, pixdim=[2.0, 2.0])]) + + # Multiple transforms in postprocessing before Invertd + postprocessing = Compose( + [ + Lambdad(key, func=lambda x: x * 2), + Lambdad(key, func=lambda x: x + 1), + Lambdad(key, func=lambda x: x - 1), + Invertd(key, transform=preprocessing, orig_keys=key), + ] + ) + + item = {key: img} + pre = preprocessing(item) + post = postprocessing(pre) + + self.assertIsNotNone(post) + self.assertIn(key, post) + + def test_invertd_group_isolation(self): + """Test that groups correctly isolate transforms from different Compose instances.""" + img, _ = create_test_image_2d(60, 60, 2, 10, num_seg_classes=2) + img = MetaTensor(img, meta={"original_channel_dim": float("nan"), "pixdim": [1.0, 1.0, 1.0]}) + key = "image" + + # First preprocessing + preprocessing1 = Compose([EnsureChannelFirstd(key), Spacingd(key, pixdim=[2.0, 2.0])]) + + # Second preprocessing (different pipeline) + preprocessing2 = Compose([Spacingd(key, pixdim=[1.5, 1.5])]) + + item = {key: img} + pre1 = preprocessing1(item) + + # Verify group IDs are in applied_operations + self.assertTrue(len(pre1[key].applied_operations) > 0) + group1 = pre1[key].applied_operations[0].get("group") + self.assertIsNotNone(group1) + self.assertEqual(group1, str(id(preprocessing1))) + + # Apply second preprocessing + pre2 = preprocessing2(pre1) + self.assertTupleEqual(pre2[key].shape, (1, 40, 40)) + + # Should have operations from both pipelines with different groups + groups = [op.get("group") for op in pre2[key].applied_operations] + preprocessing1_group = str(id(preprocessing1)) + preprocessing2_group = str(id(preprocessing2)) + self.assertIn(preprocessing1_group, groups) + self.assertIn(preprocessing2_group, groups) + self.assertEqual(groups.count(preprocessing1_group), 1) + self.assertEqual(groups.count(preprocessing2_group), 1) + + # Inverting preprocessing1 should only invert its transforms + inverter = Invertd(key, transform=preprocessing1, orig_keys=key) + inverted = inverter(pre2) + self.assertIsNotNone(inverted) + self.assertTupleEqual(inverted[key].shape, (1, 60, 60)) + + def test_invertd_filters_trace_key_transforms_by_group(self): + """Test group filtering when Invertd reads transforms from ``trace_key``.""" + + class _IdentityMapInvertible(MapTransform, InvertibleTransform): + def __init__(self, keys): + super().__init__(keys) + + def __call__(self, data): + return dict(data) + + def inverse(self, data): + return dict(data) + + key = "image" + target_transform = _IdentityMapInvertible(key) + target_group = str(id(target_transform)) + item = { + key: torch.zeros((1, 8, 8), dtype=torch.float32), + InvertibleTransform.trace_key(key): [ + {TraceKeys.GROUP: target_group}, + {TraceKeys.GROUP: "other-group"}, + ], + } + + inverter = Invertd(key, transform=target_transform, orig_keys=key, nearest_interp=False) + inverted = inverter(item) + + trace_key = InvertibleTransform.trace_key(key) + self.assertEqual(len(inverted[trace_key]), 1) + self.assertEqual(inverted[trace_key][0].get(TraceKeys.GROUP), target_group) + + def test_compose_inverse_with_groups(self): + """Test that Compose.inverse() works correctly with automatic group tracking.""" + img, _ = create_test_image_2d(60, 60, 2, 10, num_seg_classes=2) + img = MetaTensor(img, meta={"original_channel_dim": float("nan"), "pixdim": [1.0, 1.0, 1.0]}) + key = "image" + + # Create a preprocessing pipeline + preprocessing = Compose([EnsureChannelFirstd(key), Spacingd(key, pixdim=[2.0, 2.0])]) + + # Apply preprocessing + item = {key: img} + pre = preprocessing(item) + + # Call inverse() directly on the Compose object + inverted = preprocessing.inverse(pre) + + # Should successfully invert + self.assertIsNotNone(inverted) + self.assertIn(key, inverted) + # Shape should be restored after inversion + self.assertEqual(inverted[key].shape[1:], img.shape) + + def test_compose_inverse_with_postprocessing_groups(self): + """Test Compose.inverse() when data has been through multiple pipelines with different groups.""" + img, _ = create_test_image_2d(60, 60, 2, 10, num_seg_classes=2) + img = MetaTensor(img, meta={"original_channel_dim": float("nan"), "pixdim": [1.0, 1.0, 1.0]}) + key = "image" + + # Preprocessing pipeline + preprocessing = Compose([EnsureChannelFirstd(key), Spacingd(key, pixdim=[2.0, 2.0])]) + + # Postprocessing pipeline (different group) + postprocessing = Compose([Lambdad(key, func=lambda x: x * 2)]) + + # Apply both pipelines + item = {key: img} + pre = preprocessing(item) + post = postprocessing(pre) + + # Now call inverse() directly on preprocessing + # This tests that inverse() can handle data that has transforms from multiple groups + # This WILL fail because applied_operations contains postprocessing transforms + # and inverse() doesn't do group filtering (only Invertd does) + with self.assertRaises(RuntimeError): + preprocessing.inverse(post) + + def test_mixed_invertd_and_compose_inverse(self): + """Test mixing Invertd (with group filtering) and Compose.inverse() (without filtering).""" + img, _ = create_test_image_2d(60, 60, 2, 10, num_seg_classes=2) + img = MetaTensor(img, meta={"original_channel_dim": float("nan"), "pixdim": [1.0, 1.0, 1.0]}) + key = "image" + + # First pipeline + pipeline1 = Compose([EnsureChannelFirstd(key), Spacingd(key, pixdim=[2.0, 2.0])]) + + # Apply first pipeline + item = {key: img} + result1 = pipeline1(item) + + # Use Compose.inverse() directly - should work fine + inverted1 = pipeline1.inverse(result1) + self.assertIsNotNone(inverted1) + self.assertEqual(inverted1[key].shape[1:], img.shape) + + # Now apply pipeline again and use Invertd + result2 = pipeline1(item) + inverter = Invertd(key, transform=pipeline1, orig_keys=key) + inverted2 = inverter(result2) + self.assertIsNotNone(inverted2) + if __name__ == "__main__": unittest.main()