Skip to content

Commit 46c123e

Browse files
committed
test: add concurrency integ tests
1 parent 402a348 commit 46c123e

26 files changed

+1224
-43
lines changed

examples/examples-catalog.json

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,116 @@
154154
"ExecutionTimeout": 300
155155
},
156156
"path": "./src/wait_for_condition.py"
157+
},
158+
{
159+
"name": "Map with Max Concurrency",
160+
"description": "Map operation with maxConcurrency limit",
161+
"handler": "map_with_max_concurrency.handler",
162+
"integration": true,
163+
"durableConfig": {
164+
"RetentionPeriodInDays": 7,
165+
"ExecutionTimeout": 300
166+
},
167+
"path": "./src/map_with_max_concurrency.py"
168+
},
169+
{
170+
"name": "Map with Min Successful",
171+
"description": "Map operation with min_successful completion config",
172+
"handler": "map_with_min_successful.handler",
173+
"integration": true,
174+
"durableConfig": {
175+
"RetentionPeriodInDays": 7,
176+
"ExecutionTimeout": 300
177+
},
178+
"path": "./src/map_with_min_successful.py"
179+
},
180+
{
181+
"name": "Map with Failure Tolerance",
182+
"description": "Map operation with failure tolerance",
183+
"handler": "map_with_failure_tolerance.handler",
184+
"integration": true,
185+
"durableConfig": {
186+
"RetentionPeriodInDays": 7,
187+
"ExecutionTimeout": 300
188+
},
189+
"path": "./src/map_with_failure_tolerance.py"
190+
},
191+
{
192+
"name": "Parallel with Max Concurrency",
193+
"description": "Parallel operation with maxConcurrency limit",
194+
"handler": "parallel_with_max_concurrency.handler",
195+
"integration": true,
196+
"durableConfig": {
197+
"RetentionPeriodInDays": 7,
198+
"ExecutionTimeout": 300
199+
},
200+
"path": "./src/parallel_with_max_concurrency.py"
201+
},
202+
{
203+
"name": "Parallel with Wait",
204+
"description": "Parallel operation with wait operations in branches",
205+
"handler": "parallel_with_wait.handler",
206+
"integration": true,
207+
"durableConfig": {
208+
"RetentionPeriodInDays": 7,
209+
"ExecutionTimeout": 300
210+
},
211+
"path": "./src/parallel_with_wait.py"
212+
},
213+
{
214+
"name": "Parallel with Failure Tolerance",
215+
"description": "Parallel operation with failure tolerance",
216+
"handler": "parallel_with_failure_tolerance.handler",
217+
"integration": true,
218+
"durableConfig": {
219+
"RetentionPeriodInDays": 7,
220+
"ExecutionTimeout": 300
221+
},
222+
"path": "./src/parallel_with_failure_tolerance.py"
223+
},
224+
{
225+
"name": "Map with Custom SerDes",
226+
"description": "Map operation with custom item-level serialization",
227+
"handler": "map_with_custom_serdes.handler",
228+
"integration": true,
229+
"durableConfig": {
230+
"RetentionPeriodInDays": 7,
231+
"ExecutionTimeout": 300
232+
},
233+
"path": "./src/map_with_custom_serdes.py"
234+
},
235+
{
236+
"name": "Map with Batch SerDes",
237+
"description": "Map operation with custom batch-level serialization",
238+
"handler": "map_with_batch_serdes.handler",
239+
"integration": true,
240+
"durableConfig": {
241+
"RetentionPeriodInDays": 7,
242+
"ExecutionTimeout": 300
243+
},
244+
"path": "./src/map_with_batch_serdes.py"
245+
},
246+
{
247+
"name": "Parallel with Custom SerDes",
248+
"description": "Parallel operation with custom item-level serialization",
249+
"handler": "parallel_with_custom_serdes.handler",
250+
"integration": true,
251+
"durableConfig": {
252+
"RetentionPeriodInDays": 7,
253+
"ExecutionTimeout": 300
254+
},
255+
"path": "./src/parallel_with_custom_serdes.py"
256+
},
257+
{
258+
"name": "Parallel with Batch SerDes",
259+
"description": "Parallel operation with custom batch-level serialization",
260+
"handler": "parallel_with_batch_serdes.handler",
261+
"integration": true,
262+
"durableConfig": {
263+
"RetentionPeriodInDays": 7,
264+
"ExecutionTimeout": 300
265+
},
266+
"path": "./src/parallel_with_batch_serdes.py"
157267
}
158268
]
159269
}

examples/src/map_operations.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,23 @@
1-
"""Example demonstrating map-like operations for processing collections durably."""
1+
"""Example demonstrating map operations for processing collections durably."""
22

33
from typing import Any
44

5+
from aws_durable_execution_sdk_python.config import MapConfig
56
from aws_durable_execution_sdk_python.context import DurableContext
67
from aws_durable_execution_sdk_python.execution import durable_execution
78

89

9-
def square(x: int) -> int:
10-
return x * x
11-
12-
1310
@durable_execution
def handler(_event: Any, context: DurableContext) -> list[int]:
    """Double each of five items concurrently via context.map()."""
    items = [1, 2, 3, 4, 5]

    def double_item(ctx, item, index, _):
        # Each element runs as its own named durable step.
        return ctx.step(lambda _: item * 2, name=f"map_item_{index}")

    batch = context.map(
        inputs=items,
        func=double_item,
        name="map_operation",
        config=MapConfig(max_concurrency=2),
    )
    # Return the plain list immediately so the BatchResult itself is never serialized.
    return batch.get_results()
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
"""Example demonstrating map with batch-level serdes."""
2+
3+
import json
4+
from typing import Any
5+
6+
from aws_durable_execution_sdk_python.config import MapConfig
7+
from aws_durable_execution_sdk_python.context import DurableContext
8+
from aws_durable_execution_sdk_python.execution import durable_execution
9+
from aws_durable_execution_sdk_python.lambda_service import ErrorObject
10+
from aws_durable_execution_sdk_python.serdes import JsonSerDes, SerDes, SerDesContext
11+
from aws_durable_execution_sdk_python.types import BatchResult
12+
13+
14+
class CustomBatchSerDes(SerDes[BatchResult]):
    """Custom serializer that wraps an entire BatchResult in a metadata envelope."""

    def serialize(self, value: BatchResult, _: SerDesContext) -> str:
        """Return the BatchResult as JSON wrapped with batch-level metadata."""
        item_results = value.get_results()
        envelope = {
            "batch_metadata": {
                "serializer": "CustomBatchSerDes",
                "version": "2.0",
                "total_items": len(item_results),
            },
            "success_count": value.success_count,
            "failure_count": value.failure_count,
            "results": item_results,
            # Errors may be None per slot; only real errors are dict-encoded.
            "errors": [err.to_dict() if err else None for err in value.get_errors()],
        }
        return json.dumps(envelope)

    def deserialize(self, payload: str, _: SerDesContext) -> BatchResult:
        """Rebuild a BatchResult from the wrapped JSON envelope."""
        envelope = json.loads(payload)
        restored_errors = [
            ErrorObject.from_dict(raw) if raw else None for raw in envelope["errors"]
        ]
        return BatchResult(
            success_count=envelope["success_count"],
            failure_count=envelope["failure_count"],
            results=envelope["results"],
            errors=restored_errors,
        )
42+
43+
44+
@durable_execution
def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
    """Double four items, serializing the aggregate BatchResult with a custom serdes."""
    inputs = [10, 20, 30, 40]

    def double_item(ctx, item, index, _):
        # One named durable step per item; items themselves use default JSON serdes.
        return ctx.step(lambda _: item * 2, name=f"double_{index}")

    # Custom serdes applies to the whole BatchResult; JsonSerDes to each item.
    batch = context.map(
        inputs=inputs,
        func=double_item,
        name="map_with_batch_serdes",
        config=MapConfig(serdes=CustomBatchSerDes(), item_serdes=JsonSerDes()),
    )

    doubled = batch.get_results()
    return {
        "success_count": batch.success_count,
        "results": doubled,
        "sum": sum(doubled),
    }
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
"""Example demonstrating map with custom serdes."""
2+
3+
import json
4+
from typing import Any
5+
6+
from aws_durable_execution_sdk_python.config import MapConfig
7+
from aws_durable_execution_sdk_python.context import DurableContext
8+
from aws_durable_execution_sdk_python.execution import durable_execution
9+
from aws_durable_execution_sdk_python.serdes import SerDes, SerDesContext
10+
11+
12+
class CustomItemSerDes(SerDes[dict[str, Any]]):
    """Custom per-item serializer that wraps each value with provenance metadata."""

    def serialize(self, value: dict[str, Any], _: SerDesContext) -> str:
        """Envelope the item payload with serializer name and version."""
        envelope = {
            "data": value,
            "serialized_by": "CustomItemSerDes",
            "version": "1.0",
        }
        return json.dumps(envelope)

    def deserialize(self, payload: str, _: SerDesContext) -> dict[str, Any]:
        """Unwrap the envelope and return the original item payload."""
        return json.loads(payload)["data"]
25+
26+
27+
@durable_execution
def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
    """Process items with custom item serialization.

    Each per-item step result is persisted through CustomItemSerDes, while
    the aggregate BatchResult keeps the default JSON serialization.
    """
    items = [
        {"id": 1, "name": "item1"},
        {"id": 2, "name": "item2"},
        {"id": 3, "name": "item3"},
    ]

    def process_item(ctx, item, index, _):
        # Step result is a dict; it round-trips through the custom item serdes.
        return ctx.step(
            lambda _: {
                "processed": item["name"],
                "index": index,
                "doubled_id": item["id"] * 2,
            },
            name=f"process_{index}",
        )

    batch = context.map(
        inputs=items,
        func=process_item,
        name="map_with_custom_serdes",
        config=MapConfig(item_serdes=CustomItemSerDes()),
    )

    processed = batch.get_results()
    return {
        "success_count": batch.success_count,
        "results": processed,
        "processed_names": [entry["processed"] for entry in processed],
    }
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
"""Example demonstrating map with failure tolerance."""
2+
3+
from typing import Any
4+
5+
from aws_durable_execution_sdk_python.config import (
6+
CompletionConfig,
7+
MapConfig,
8+
StepConfig,
9+
)
10+
from aws_durable_execution_sdk_python.context import DurableContext
11+
from aws_durable_execution_sdk_python.execution import durable_execution
12+
from aws_durable_execution_sdk_python.retries import RetryStrategyConfig
13+
14+
15+
@durable_execution
def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
    """Map over ten items, tolerating up to three per-item failures."""
    inputs = list(range(1, 11))  # 1..10; multiples of 3 are designed to fail

    # Single-attempt steps so each designed failure surfaces immediately.
    single_attempt = StepConfig(retry_strategy=RetryStrategyConfig(max_attempts=1))

    def run_item(ctx, item, index, _):
        return ctx.step(
            lambda _: _process_with_failures(item),
            name=f"item_{index}",
            config=single_attempt,
        )

    # Up to 5 items in flight; the batch still succeeds with <= 3 failures.
    batch = context.map(
        inputs=inputs,
        func=run_item,
        name="map_with_tolerance",
        config=MapConfig(
            max_concurrency=5,
            completion_config=CompletionConfig(tolerated_failure_count=3),
        ),
    )

    return {
        "success_count": batch.success_count,
        "failure_count": batch.failure_count,
        "succeeded": [entry.result for entry in batch.succeeded()],
        "failed_count": len(batch.failed()),
        "completion_reason": batch.completion_reason.value,
    }
47+
48+
49+
def _process_with_failures(item: int) -> int:
50+
"""Process item - fails for items 3, 6, 9."""
51+
if item % 3 == 0:
52+
raise ValueError(f"Item {item} failed")
53+
return item * 2
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
"""Example demonstrating map with maxConcurrency limit."""
2+
3+
from typing import Any
4+
5+
from aws_durable_execution_sdk_python.config import MapConfig
6+
from aws_durable_execution_sdk_python.context import DurableContext
7+
from aws_durable_execution_sdk_python.execution import durable_execution
8+
9+
10+
@durable_execution
def handler(_event: Any, context: DurableContext) -> list[int]:
    """Triple each of ten items via context.map(), capped at 3 concurrent branches."""
    inputs = list(range(1, 11))  # 1..10 inclusive

    def triple_item(ctx, item, index, _):
        # Each item becomes its own named durable step.
        return ctx.step(lambda _: item * 3, name=f"process_{index}")

    batch = context.map(
        inputs=inputs,
        func=triple_item,
        name="map_with_concurrency",
        config=MapConfig(max_concurrency=3),
    )
    # Return the plain result list so the BatchResult itself is never serialized.
    return batch.get_results()

0 commit comments

Comments
 (0)