
Commit 5abdb88

FullyTyped authored and Astraea Quinn S committed
feat: add summary generators for map/parallel operations
Implement summary generators to handle large BatchResult payloads (>256KB) in map and parallel operations, matching TypeScript SDK behavior. When results exceed the checkpoint size limit, operations use ReplayChildren mode with compact JSON summaries instead of full result serialization.

Key changes:

- Add SummaryGenerator protocol and default implementations
- Extend ParallelConfig/MapConfig with summary_generator field
- Update ConcurrentExecutor to pass summary generators to child contexts
- Implement replay_children logic based on payload size, not generator presence
- Add test coverage for small/large payload scenarios

This enables efficient handling of large concurrent operation results while maintaining deterministic replay behavior and preventing checkpoint size violations.
1 parent 3cde4ac commit 5abdb88
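A usage sketch of the new configuration surface, assembled from names that appear in the diffs below (ParallelConfig.summary_generator, CompletionConfig.all_successful, and BatchResult's success_count/failure_count); it is illustrative only, not code from this commit:

# Sketch only: shows how a caller could supply a custom summary generator via the
# ParallelConfig field added in this commit. Names are taken from the diffs below.
import json

from aws_durable_execution_sdk_python.config import CompletionConfig, ParallelConfig


def branch_summary(result) -> str:
    # Compact JSON checkpointed in place of the full BatchResult when the
    # serialized payload exceeds the 256KB checkpoint limit.
    return json.dumps(
        {
            "type": "ParallelResult",
            "successCount": result.success_count,
            "failureCount": result.failure_count,
        }
    )


config = ParallelConfig(
    max_concurrency=8,
    completion_config=CompletionConfig.all_successful(),
    summary_generator=branch_summary,
)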

File tree

11 files changed: +753 −24 lines changed


src/aws_durable_execution_sdk_python/concurrency.py

Lines changed: 17 additions & 2 deletions
@@ -29,7 +29,7 @@
 from aws_durable_execution_sdk_python.lambda_service import OperationSubType
 from aws_durable_execution_sdk_python.serdes import SerDes
 from aws_durable_execution_sdk_python.state import ExecutionState
-from aws_durable_execution_sdk_python.types import DurableContext
+from aws_durable_execution_sdk_python.types import DurableContext, SummaryGenerator
 
 
 logger = logging.getLogger(__name__)
@@ -566,13 +566,24 @@ def __init__(
         sub_type_iteration: OperationSubType,
         name_prefix: str,
         serdes: SerDes | None,
+        summary_generator: SummaryGenerator | None = None,
     ):
+        """Initialize ConcurrentExecutor.
+
+        Args:
+            summary_generator: Optional function to generate compact summaries for large results.
+                When the serialized result exceeds 256KB, this generator creates a JSON summary
+                instead of checkpointing the full result. Used by map/parallel operations to
+                handle large BatchResult payloads efficiently. Matches TypeScript behavior in
+                run-in-child-context-handler.ts.
+        """
         self.executables = executables
         self.max_concurrency = max_concurrency
         self.completion_config = completion_config
         self.sub_type_top = sub_type_top
         self.sub_type_iteration = sub_type_iteration
         self.name_prefix = name_prefix
+        self.summary_generator = summary_generator
 
         # Event-driven state tracking for when the executor is done
         self._completion_event = threading.Event()
@@ -785,7 +796,11 @@ def execute_in_child_context(child_context: DurableContext) -> ResultType:
         return run_in_child_context(
             execute_in_child_context,
             f"{self.name_prefix}{executable.index}",
-            ChildConfig(serdes=self.serdes, sub_type=self.sub_type_iteration),
+            ChildConfig(
+                serdes=self.serdes,
+                sub_type=self.sub_type_iteration,
+                summary_generator=self.summary_generator,
+            ),
         )
 
 

src/aws_durable_execution_sdk_python/config.py

Lines changed: 169 additions & 2 deletions
@@ -19,6 +19,7 @@
 
 from aws_durable_execution_sdk_python.lambda_service import OperationSubType
 from aws_durable_execution_sdk_python.serdes import SerDes
+from aws_durable_execution_sdk_python.types import SummaryGenerator
 
 
 Numeric = int | float  # deliberately leaving off complex
@@ -39,6 +40,38 @@ class TerminationMode(Enum):
 
 @dataclass(frozen=True)
 class CompletionConfig:
+    """Configuration for determining when parallel/map operations complete.
+
+    This class defines the success/failure criteria for operations that process
+    multiple items or branches concurrently.
+
+    Args:
+        min_successful: Minimum number of successful completions required.
+            If None, no minimum is enforced. Use this to implement "at least N
+            must succeed" semantics.
+
+        tolerated_failure_count: Maximum number of failures allowed before
+            the operation is considered failed. If None, no limit on failure count.
+            Use this to implement "fail fast after N failures" semantics.
+
+        tolerated_failure_percentage: Maximum percentage of failures allowed
+            (0.0 to 100.0). If None, no percentage limit is enforced.
+            Use this to implement "fail if more than X% fail" semantics.
+
+    Note:
+        The operation completes when any of the completion criteria are met:
+        - Enough successes (min_successful reached)
+        - Too many failures (tolerated limits exceeded)
+        - All items/branches completed
+
+    Example:
+        # Succeed if at least 3 succeed, fail if more than 2 fail
+        config = CompletionConfig(
+            min_successful=3,
+            tolerated_failure_count=2
+        )
+    """
+
     min_successful: int | None = None
     tolerated_failure_count: int | None = None
     tolerated_failure_percentage: int | float | None = None
@@ -77,11 +110,47 @@ def all_successful():
 
 @dataclass(frozen=True)
 class ParallelConfig:
+    """Configuration options for parallel execution operations.
+
+    This class configures how parallel operations are executed, including
+    concurrency limits, completion criteria, and serialization behavior.
+
+    Args:
+        max_concurrency: Maximum number of parallel branches to execute concurrently.
+            If None, no limit is imposed and all branches run concurrently.
+            Use this to control resource usage and prevent overwhelming the system.
+
+        completion_config: Defines when the parallel operation should complete.
+            Controls success/failure criteria for the overall parallel operation.
+            Default is CompletionConfig.all_successful(), which requires all branches
+            to succeed. Other options include first_successful() and all_completed().
+
+        serdes: Custom serialization/deserialization configuration for parallel results.
+            If None, uses the default serializer. This allows custom handling of
+            complex result types or optimization for large result sets.
+
+        summary_generator: Function to generate compact summaries for large results (>256KB).
+            When the serialized result exceeds CHECKPOINT_SIZE_LIMIT, this generator
+            creates a JSON summary instead of checkpointing the full result. The operation
+            is marked with ReplayChildren=true to reconstruct the full result during replay.
+
+            Used internally by map/parallel operations to handle large BatchResult payloads.
+            Signature: (result: T) -> str
+
+    Example:
+        # Run at most 3 branches concurrently, succeed if any one succeeds
+        config = ParallelConfig(
+            max_concurrency=3,
+            completion_config=CompletionConfig.first_successful()
+        )
+    """
+
     max_concurrency: int | None = None
     completion_config: CompletionConfig = field(
         default_factory=CompletionConfig.all_successful
     )
     serdes: SerDes | None = None
+    summary_generator: SummaryGenerator | None = None
 
 
 class StepSemantics(Enum):
@@ -106,12 +175,41 @@ class CheckpointMode(Enum):
 
 @dataclass(frozen=True)
 class ChildConfig(Generic[T]):
-    """Options when running inside a child context."""
+    """Configuration options for child context operations.
+
+    This class configures how child contexts are executed and checkpointed,
+    matching the TypeScript ChildConfig interface behavior.
+
+    Args:
+        serdes: Custom serialization/deserialization configuration for the child context data.
+            If None, uses the default serializer. This allows different serialization
+            strategies for child operations vs parent operations.
+
+        sub_type: Operation subtype identifier used for tracking and debugging.
+            Examples: OperationSubType.MAP_ITERATION, OperationSubType.PARALLEL_BRANCH.
+            Used internally by the execution engine for operation classification.
+
+        summary_generator: Function to generate compact summaries for large results (>256KB).
+            When the serialized result exceeds CHECKPOINT_SIZE_LIMIT, this generator
+            creates a JSON summary instead of checkpointing the full result. The operation
+            is marked with ReplayChildren=true to reconstruct the full result during replay.
+
+            Used internally by map/parallel operations to handle large BatchResult payloads.
+            Signature: (result: T) -> str
+
+    Note:
+        The checkpoint_mode field is commented out because it is not currently implemented.
+        When implemented, it will control when checkpoints are created:
+        - CHECKPOINT_AT_START_AND_FINISH: Checkpoint at both start and completion (default)
+        - CHECKPOINT_AT_FINISH: Only checkpoint when the operation completes
+        - NO_CHECKPOINT: No automatic checkpointing
+
+    See TypeScript reference: aws-durable-execution-sdk-js/src/types/index.ts
+    """
 
     # checkpoint_mode: CheckpointMode = CheckpointMode.CHECKPOINT_AT_START_AND_FINISH
     serdes: SerDes | None = None
     sub_type: OperationSubType | None = None
-    summary_generator: Callable[[T], str] | None = None
+    summary_generator: SummaryGenerator | None = None
 
 
 class ItemsPerBatchUnit(Enum):
@@ -121,17 +219,86 @@ class ItemsPerBatchUnit(Enum):
 
 @dataclass(frozen=True)
 class ItemBatcher(Generic[T]):
+    """Configuration for batching items in map operations.
+
+    This class defines how individual items should be grouped together into batches
+    for more efficient processing in map operations.
+
+    Args:
+        max_items_per_batch: Maximum number of items to include in a single batch.
+            If 0 (default), no item count limit is applied. Use this to control
+            batch size when processing many small items.
+
+        max_item_bytes_per_batch: Maximum total size in bytes for items in a batch.
+            If 0 (default), no size limit is applied. Use this to control memory
+            usage when processing large items or when items vary significantly in size.
+
+        batch_input: Additional data to include with each batch.
+            This data is passed to the processing function along with the batched items.
+            Useful for providing context or configuration that applies to all items
+            in the batch.
+
+    Example:
+        # Batch up to 100 items or 1MB, whichever comes first
+        batcher = ItemBatcher(
+            max_items_per_batch=100,
+            max_item_bytes_per_batch=1024*1024,
+            batch_input={"processing_mode": "fast"}
+        )
+    """
+
     max_items_per_batch: int = 0
     max_item_bytes_per_batch: int | float = 0
     batch_input: T | None = None
 
 
 @dataclass(frozen=True)
 class MapConfig:
+    """Configuration options for map operations over collections.
+
+    This class configures how map operations process collections of items,
+    including concurrency, batching, completion criteria, and serialization.
+
+    Args:
+        max_concurrency: Maximum number of items to process concurrently.
+            If None, no limit is imposed and all items are processed concurrently.
+            Use this to control resource usage when processing large collections.
+
+        item_batcher: Configuration for batching multiple items together for processing.
+            Allows grouping items by count or size to optimize processing efficiency.
+            Default is no batching (each item is processed individually).
+
+        completion_config: Defines when the map operation should complete.
+            Controls success/failure criteria for the overall map operation.
+            Default allows any number of failures. Use CompletionConfig.all_successful()
+            to require all items to succeed.
+
+        serdes: Custom serialization/deserialization configuration for map results.
+            If None, uses the default serializer. This allows custom handling of
+            complex item types or optimization for large result collections.
+
+        summary_generator: Function to generate compact summaries for large results (>256KB).
+            When the serialized result exceeds CHECKPOINT_SIZE_LIMIT, this generator
+            creates a JSON summary instead of checkpointing the full result. The operation
+            is marked with ReplayChildren=true to reconstruct the full result during replay.
+
+            Used internally by map/parallel operations to handle large BatchResult payloads.
+            Signature: (result: T) -> str
+
+    Example:
+        # Process 5 items at a time, batch by count, require all to succeed
+        config = MapConfig(
+            max_concurrency=5,
+            item_batcher=ItemBatcher(max_items_per_batch=10),
+            completion_config=CompletionConfig.all_successful()
+        )
+    """
+
     max_concurrency: int | None = None
     item_batcher: ItemBatcher = field(default_factory=ItemBatcher)
     completion_config: CompletionConfig = field(default_factory=CompletionConfig)
     serdes: SerDes | None = None
+    summary_generator: SummaryGenerator | None = None
 
 
 @dataclass
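types.py is not part of this commit, so the SummaryGenerator imported above is not shown. Judging from its use sites (ChildConfig/ParallelConfig/MapConfig fields typed SummaryGenerator | None, and MapSummaryGenerator implementing __call__(result) -> str below), it is presumably a simple callable Protocol along these lines; a guess, not the actual definition:

# Hypothetical sketch of SummaryGenerator from aws_durable_execution_sdk_python/types.py;
# the real definition lives in a file not included in this diff.
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SummaryGenerator(Protocol):
    """Callable that renders a large result as a compact JSON summary string."""

    def __call__(self, result: Any) -> str: ...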

src/aws_durable_execution_sdk_python/context.py

Lines changed: 1 addition & 1 deletion
@@ -326,7 +326,7 @@ def parallel(
     ) -> BatchResult[T]:
         """Execute multiple callables in parallel."""
 
-        def parallel_in_child_context(child_context):
+        def parallel_in_child_context(child_context) -> BatchResult[T]:
             return parallel_handler(
                 callables=functions,
                 config=config,

src/aws_durable_execution_sdk_python/operation/child.py

Lines changed: 15 additions & 1 deletion
@@ -88,14 +88,28 @@ def child_handler(
         operation_id=operation_identifier.operation_id,
         durable_execution_arn=state.durable_execution_arn,
     )
+    # Summary Generator Logic:
+    # When the serialized result exceeds 256KB, we use ReplayChildren mode to avoid
+    # checkpointing large payloads. Instead, we checkpoint a compact summary and mark
+    # the operation for replay. This matches the TypeScript implementation behavior.
+    #
+    # See TypeScript reference:
+    # - aws-durable-execution-sdk-js/src/handlers/run-in-child-context-handler/run-in-child-context-handler.ts (lines ~200-220)
+    #
+    # The summary generator creates a JSON summary with metadata (type, counts, status)
+    # instead of the full BatchResult. During replay, the child context is re-executed
+    # to reconstruct the full result rather than deserializing from the checkpoint.
     replay_children: bool = False
     if len(serialized_result) > CHECKPOINT_SIZE_LIMIT:
         logger.debug(
-            "Large payload detected, using ReplayChildren mode: id: %s, name: %s",
+            "Large payload detected, using ReplayChildren mode: id: %s, name: %s, payload_size: %d, limit: %d",
            operation_identifier.operation_id,
            operation_identifier.name,
+            len(serialized_result),
+            CHECKPOINT_SIZE_LIMIT,
        )
        replay_children = True
+        # Use summary generator if provided, otherwise use empty string (matches TypeScript)
        serialized_result = (
            config.summary_generator(raw_result) if config.summary_generator else ""
        )
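The decision above, restated as a standalone runnable sketch: the constant value and the sample payload are assumptions (the SDK defines its own CHECKPOINT_SIZE_LIMIT), and the lambda stands in for config.summary_generator.

# Self-contained restatement of the size check in child_handler above; the constant
# and the sample payload are assumed values, not taken from the SDK.
import json

CHECKPOINT_SIZE_LIMIT = 256 * 1024  # assumed 256KB limit


def checkpoint_payload(serialized_result: str, summary_generator=None) -> tuple[str, bool]:
    """Return (payload_to_checkpoint, replay_children)."""
    if len(serialized_result) > CHECKPOINT_SIZE_LIMIT:
        # Large payload: checkpoint only a compact summary and replay children on recovery.
        summary = summary_generator(serialized_result) if summary_generator else ""
        return summary, True
    return serialized_result, False


big = json.dumps({"results": ["x" * 100] * 5000})  # roughly 500KB once serialized
payload, replay = checkpoint_payload(
    big, lambda r: json.dumps({"type": "MapResult", "size": len(r)})
)
assert replay is True and len(payload) < 100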

src/aws_durable_execution_sdk_python/operation/map.py

Lines changed: 27 additions & 2 deletions
@@ -2,6 +2,7 @@
 
 from __future__ import annotations
 
+import json
 import logging
 from collections.abc import Callable, Sequence
 from typing import TYPE_CHECKING, Generic, TypeVar
@@ -18,7 +19,7 @@
 from aws_durable_execution_sdk_python.config import ChildConfig
 from aws_durable_execution_sdk_python.serdes import SerDes
 from aws_durable_execution_sdk_python.state import ExecutionState
-from aws_durable_execution_sdk_python.types import DurableContext
+from aws_durable_execution_sdk_python.types import DurableContext, SummaryGenerator
 
 
 logger = logging.getLogger(__name__)
@@ -40,6 +41,7 @@ def __init__(
         iteration_sub_type: OperationSubType,
         name_prefix: str,
         serdes: SerDes | None,
+        summary_generator: SummaryGenerator | None = None,
     ):
         super().__init__(
             executables=executables,
@@ -49,6 +51,7 @@
             sub_type_iteration=iteration_sub_type,
             name_prefix=name_prefix,
             serdes=serdes,
+            summary_generator=summary_generator,
         )
         self.items = items
 
@@ -73,6 +76,7 @@ def from_items(
             iteration_sub_type=OperationSubType.MAP_ITERATION,
             name_prefix="map-item-",
             serdes=config.serdes,
+            summary_generator=config.summary_generator,
         )
 
     def execute_item(self, child_context, executable: Executable[Callable]) -> R:
@@ -93,7 +97,28 @@ def map_handler(
     ],
 ) -> BatchResult[R]:
     """Execute a callable for each item in parallel."""
+    # Summary Generator Construction (matches TypeScript implementation):
+    # Construct the summary generator at the handler level, just like TypeScript does in map-handler.ts.
+    # This matches the pattern where handlers are responsible for configuring operation-specific behavior.
+    #
+    # See TypeScript reference: aws-durable-execution-sdk-js/src/handlers/map-handler/map-handler.ts (~line 79)
+
     executor: MapExecutor[T, R] = MapExecutor.from_items(
-        items=items, func=func, config=config or MapConfig()
+        items=items,
+        func=func,
+        config=config or MapConfig(summary_generator=MapSummaryGenerator()),
     )
     return executor.execute(execution_state, run_in_child_context)
+
+
+class MapSummaryGenerator:
+    def __call__(self, result: BatchResult) -> str:
+        fields = {
+            "totalCount": result.total_count,
+            "successCount": result.success_count,
+            "failureCount": result.failure_count,
+            "completionReason": result.completion_reason.value,
+            "status": result.status.value,
+            "type": "MapResult",
+        }
+        return json.dumps(fields)
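For a sense of what the generator above emits, a small illustration; the BatchResult is faked with SimpleNamespace because its real constructor and exact enum values (e.g. "SUCCEEDED") are not shown in this diff:

# Illustration only: drives MapSummaryGenerator with a duck-typed stand-in for
# BatchResult; the enum value strings here are assumptions.
from types import SimpleNamespace

from aws_durable_execution_sdk_python.operation.map import MapSummaryGenerator

fake_result = SimpleNamespace(
    total_count=10_000,
    success_count=9_998,
    failure_count=2,
    completion_reason=SimpleNamespace(value="ALL_COMPLETED"),
    status=SimpleNamespace(value="SUCCEEDED"),
)

summary = MapSummaryGenerator()(fake_result)
print(summary)
# {"totalCount": 10000, "successCount": 9998, "failureCount": 2, ...} -- a few hundred
# bytes at most, regardless of how large the full serialized BatchResult would be.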
