Skip to content

Commit 6e565a7

Browse files
FullyTyped / Astraea Quinn S
authored and committed
feat(serdes): Add itemSerdes and pass it when executing children
Changes:
- Adds an item_serdes field to the map and parallel configs. It is passed on to run_in_child_context via Executor.execute_item and is used to serialize the individual items of a batch.
- Passes MapConfig.serdes and ParallelConfig.serdes at the top-level scope (the outermost run_in_child_context), which invokes the closure that then builds the executor.
1 parent bf4b002 commit 6e565a7

File tree

6 files changed

+70
-16
lines changed

6 files changed

+70
-16
lines changed

src/aws_durable_execution_sdk_python/concurrency.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,7 @@ def __init__(
566566
sub_type_iteration: OperationSubType,
567567
name_prefix: str,
568568
serdes: SerDes | None,
569+
item_serdes: SerDes | None = None,
569570
summary_generator: SummaryGenerator | None = None,
570571
):
571572
"""Initialize ConcurrentExecutor.
@@ -604,6 +605,7 @@ def __init__(
604605
)
605606
self.executables_with_state: list[ExecutableWithState] = []
606607
self.serdes = serdes
608+
self.item_serdes = item_serdes
607609

608610
@abstractmethod
609611
def execute_item(
@@ -797,7 +799,7 @@ def execute_in_child_context(child_context: DurableContext) -> ResultType:
797799
execute_in_child_context,
798800
f"{self.name_prefix}{executable.index}",
799801
ChildConfig(
800-
serdes=self.serdes,
802+
serdes=self.item_serdes or self.serdes,
801803
sub_type=self.sub_type_iteration,
802804
summary_generator=self.summary_generator,
803805
),

src/aws_durable_execution_sdk_python/config.py

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,21 @@ class ParallelConfig:
125125
Default is CompletionConfig.all_successful() which requires all branches
126126
to succeed. Other options include first_successful() and all_completed().
127127
128-
serdes: Custom serialization/deserialization configuration for parallel results.
129-
If None, uses the default serializer. This allows custom handling of
130-
complex result types or optimization for large result sets.
128+
serdes: Custom serialization/deserialization configuration for BatchResult.
129+
Applied at the handler level to serialize the entire BatchResult object.
130+
If None, uses the default JSON serializer for BatchResult.
131+
132+
Backward Compatibility: If only 'serdes' is provided (no item_serdes),
133+
it will be used for both individual functions AND BatchResult serialization
134+
to maintain existing behavior.
135+
136+
item_serdes: Custom serialization/deserialization configuration for individual functions.
137+
Applied to each function's result as tasks complete in child contexts.
138+
If None, falls back to 'serdes'; if that is also None, uses the default JSON serializer for individual function results.
139+
140+
When both 'serdes' and 'item_serdes' are provided:
141+
- item_serdes: Used for individual function results in child contexts
142+
- serdes: Used for the entire BatchResult at handler level
131143
132144
summary_generator: Function to generate compact summaries for large results (>256KB).
133145
When the serialized result exceeds CHECKPOINT_SIZE_LIMIT, this generator
@@ -150,6 +162,7 @@ class ParallelConfig:
150162
default_factory=CompletionConfig.all_successful
151163
)
152164
serdes: SerDes | None = None
165+
item_serdes: SerDes | None = None
153166
summary_generator: SummaryGenerator | None = None
154167

155168

@@ -181,9 +194,21 @@ class ChildConfig(Generic[T]):
181194
matching the TypeScript ChildConfig interface behavior.
182195
183196
Args:
184-
serdes: Custom serialization/deserialization configuration for the child context data.
185-
If None, uses the default serializer. This allows different serialization
186-
strategies for child operations vs parent operations.
197+
serdes: Custom serialization/deserialization configuration for BatchResult.
198+
Applied at the handler level to serialize the entire BatchResult object.
199+
If None, uses the default JSON serializer for BatchResult.
200+
201+
Backward Compatibility: If only 'serdes' is provided (no item_serdes),
202+
it will be used for both individual items AND BatchResult serialization
203+
to maintain existing behavior.
204+
205+
item_serdes: Custom serialization/deserialization configuration for individual items.
206+
Applied to each item's result as tasks complete in child contexts.
207+
If None, uses the default JSON serializer for individual items.
208+
209+
When both 'serdes' and 'item_serdes' are provided:
210+
- item_serdes: Used for individual item results in child contexts
211+
- serdes: Used for the entire BatchResult at handler level
187212
188213
sub_type: Operation subtype identifier used for tracking and debugging.
189214
Examples: OperationSubType.MAP_ITERATION, OperationSubType.PARALLEL_BRANCH.
@@ -208,6 +233,7 @@ class ChildConfig(Generic[T]):
208233

209234
# checkpoint_mode: CheckpointMode = CheckpointMode.CHECKPOINT_AT_START_AND_FINISH
210235
serdes: SerDes | None = None
236+
item_serdes: SerDes | None = None
211237
sub_type: OperationSubType | None = None
212238
summary_generator: SummaryGenerator | None = None
213239

@@ -273,9 +299,21 @@ class MapConfig:
273299
Default allows any number of failures. Use CompletionConfig.all_successful()
274300
to require all items to succeed.
275301
276-
serdes: Custom serialization/deserialization configuration for map results.
277-
If None, uses the default serializer. This allows custom handling of
278-
complex item types or optimization for large result collections.
302+
serdes: Custom serialization/deserialization configuration for BatchResult.
303+
Applied at the handler level to serialize the entire BatchResult object.
304+
If None, uses the default JSON serializer for BatchResult.
305+
306+
Backward Compatibility: If only 'serdes' is provided (no item_serdes),
307+
it will be used for both individual items AND BatchResult serialization
308+
to maintain existing behavior.
309+
310+
item_serdes: Custom serialization/deserialization configuration for individual items.
311+
Applied to each item's result as tasks complete in child contexts.
312+
If None, falls back to 'serdes'; if that is also None, uses the default JSON serializer for individual items.
313+
314+
When both 'serdes' and 'item_serdes' are provided:
315+
- item_serdes: Used for individual item results in child contexts
316+
- serdes: Used for the entire BatchResult at handler level
279317
280318
summary_generator: Function to generate compact summaries for large results (>256KB).
281319
When the serialized result exceeds CHECKPOINT_SIZE_LIMIT, this generator
@@ -298,6 +336,7 @@ class MapConfig:
298336
item_batcher: ItemBatcher = field(default_factory=ItemBatcher)
299337
completion_config: CompletionConfig = field(default_factory=CompletionConfig)
300338
serdes: SerDes | None = None
339+
item_serdes: SerDes | None = None
301340
summary_generator: SummaryGenerator | None = None
302341

303342

src/aws_durable_execution_sdk_python/context.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ def map(
303303
"""Execute a callable for each item in parallel."""
304304
map_name: str | None = self._resolve_step_name(name, func)
305305

306-
def map_in_child_context(child_context):
306+
def map_in_child_context(child_context) -> BatchResult[R]:
307307
return map_handler(
308308
items=inputs,
309309
func=func,
@@ -315,7 +315,10 @@ def map_in_child_context(child_context):
315315
return self.run_in_child_context(
316316
func=map_in_child_context,
317317
name=map_name,
318-
config=ChildConfig(sub_type=OperationSubType.MAP),
318+
config=ChildConfig(
319+
sub_type=OperationSubType.MAP,
320+
serdes=config.serdes if config is not None else None,
321+
),
319322
)
320323

321324
def parallel(
@@ -337,7 +340,10 @@ def parallel_in_child_context(child_context) -> BatchResult[T]:
337340
return self.run_in_child_context(
338341
func=parallel_in_child_context,
339342
name=name,
340-
config=ChildConfig(sub_type=OperationSubType.PARALLEL),
343+
config=ChildConfig(
344+
sub_type=OperationSubType.PARALLEL,
345+
serdes=config.serdes if config is not None else None,
346+
),
341347
)
342348

343349
def run_in_child_context(

src/aws_durable_execution_sdk_python/operation/map.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
from aws_durable_execution_sdk_python.state import ExecutionState
2222
from aws_durable_execution_sdk_python.types import DurableContext, SummaryGenerator
2323

24-
2524
logger = logging.getLogger(__name__)
2625

2726
# Input item type
@@ -42,6 +41,7 @@ def __init__(
4241
name_prefix: str,
4342
serdes: SerDes | None,
4443
summary_generator: SummaryGenerator | None = None,
44+
item_serdes: SerDes | None = None,
4545
):
4646
super().__init__(
4747
executables=executables,
@@ -52,6 +52,7 @@ def __init__(
5252
name_prefix=name_prefix,
5353
serdes=serdes,
5454
summary_generator=summary_generator,
55+
item_serdes=item_serdes,
5556
)
5657
self.items = items
5758

src/aws_durable_execution_sdk_python/operation/parallel.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ def __init__(
3535
name_prefix: str,
3636
serdes: SerDes | None,
3737
summary_generator: SummaryGenerator | None = None,
38+
item_serdes: SerDes | None = None,
3839
):
3940
super().__init__(
4041
executables=executables,
@@ -45,6 +46,7 @@ def __init__(
4546
name_prefix=name_prefix,
4647
serdes=serdes,
4748
summary_generator=summary_generator,
49+
item_serdes=item_serdes,
4850
)
4951

5052
@classmethod
@@ -98,7 +100,7 @@ def parallel_handler(
98100

99101

100102
class ParallelSummaryGenerator:
101-
def __call__(self, result: BatchResult[R]) -> str:
103+
def __call__(self, result: BatchResult) -> str:
102104
fields = {
103105
"totalCount": result.total_count,
104106
"successCount": result.success_count,

tests/operation/child_test.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Unit tests for child handler."""
22

33
import json
4+
from typing import cast
45
from unittest.mock import Mock
56

67
import pytest
@@ -16,6 +17,7 @@
1617
)
1718
from aws_durable_execution_sdk_python.operation.child import child_handler
1819
from aws_durable_execution_sdk_python.state import ExecutionState
20+
from aws_durable_execution_sdk_python.types import SummaryGenerator
1921
from tests.serdes_test import CustomDictSerDes
2022

2123

@@ -383,7 +385,9 @@ def test_child_handler_large_payload_with_summary_generator() -> None:
383385
def my_summary(result: str) -> str:
384386
return "summary"
385387

386-
child_config: ChildConfig = ChildConfig[str](summary_generator=my_summary)
388+
child_config: ChildConfig = ChildConfig[str](
389+
summary_generator=cast(SummaryGenerator, my_summary)
390+
)
387391

388392
actual_result = child_handler(
389393
mock_callable,

0 commit comments

Comments
 (0)