From 2954e55f979dfcde679b0256baf0e6e86fc59056 Mon Sep 17 00:00:00 2001 From: vipin gupta Date: Mon, 3 Nov 2025 12:30:44 +0000 Subject: [PATCH] test: add concurrency integ tests --- examples/examples-catalog.json | 110 ++++++++++++++++++ examples/src/map/map_operations.py | 25 ++-- examples/src/map/map_with_batch_serdes.py | 96 +++++++++++++++ examples/src/map/map_with_custom_serdes.py | 63 ++++++++++ .../src/map/map_with_failure_tolerance.py | 53 +++++++++ examples/src/map/map_with_max_concurrency.py | 23 ++++ examples/src/map/map_with_min_successful.py | 43 +++++++ examples/src/parallel/parallel.py | 23 ++-- .../parallel/parallel_with_batch_serdes.py | 97 +++++++++++++++ .../parallel/parallel_with_custom_serdes.py | 60 ++++++++++ .../parallel_with_failure_tolerance.py | 59 ++++++++++ .../parallel/parallel_with_max_concurrency.py | 25 ++++ examples/src/parallel/parallel_with_wait.py | 23 ++++ examples/template.yaml | 90 ++++++++++++++ examples/test/map/test_map_operations.py | 32 +++-- .../test/map/test_map_with_batch_serdes.py | 43 +++++++ .../test/map/test_map_with_custom_serdes.py | 48 ++++++++ .../map/test_map_with_failure_tolerance.py | 52 +++++++++ .../test/map/test_map_with_max_concurrency.py | 37 ++++++ .../test/map/test_map_with_min_successful.py | 70 +++++++++++ examples/test/parallel/test_parallel.py | 27 +++-- .../test_parallel_with_batch_serdes.py | 43 +++++++ .../test_parallel_with_custom_serdes.py | 46 ++++++++ .../test_parallel_with_failure_tolerance.py | 49 ++++++++ .../test_parallel_with_max_concurrency.py | 36 ++++++ .../test/parallel/test_parallel_with_wait.py | 47 ++++++++ 26 files changed, 1277 insertions(+), 43 deletions(-) create mode 100644 examples/src/map/map_with_batch_serdes.py create mode 100644 examples/src/map/map_with_custom_serdes.py create mode 100644 examples/src/map/map_with_failure_tolerance.py create mode 100644 examples/src/map/map_with_max_concurrency.py create mode 100644 examples/src/map/map_with_min_successful.py create mode 100644 examples/src/parallel/parallel_with_batch_serdes.py create mode 100644 examples/src/parallel/parallel_with_custom_serdes.py create mode 100644 examples/src/parallel/parallel_with_failure_tolerance.py create mode 100644 examples/src/parallel/parallel_with_max_concurrency.py create mode 100644 examples/src/parallel/parallel_with_wait.py create mode 100644 examples/test/map/test_map_with_batch_serdes.py create mode 100644 examples/test/map/test_map_with_custom_serdes.py create mode 100644 examples/test/map/test_map_with_failure_tolerance.py create mode 100644 examples/test/map/test_map_with_max_concurrency.py create mode 100644 examples/test/map/test_map_with_min_successful.py create mode 100644 examples/test/parallel/test_parallel_with_batch_serdes.py create mode 100644 examples/test/parallel/test_parallel_with_custom_serdes.py create mode 100644 examples/test/parallel/test_parallel_with_failure_tolerance.py create mode 100644 examples/test/parallel/test_parallel_with_max_concurrency.py create mode 100644 examples/test/parallel/test_parallel_with_wait.py diff --git a/examples/examples-catalog.json b/examples/examples-catalog.json index 836bd7b..5ee192d 100644 --- a/examples/examples-catalog.json +++ b/examples/examples-catalog.json @@ -187,6 +187,116 @@ "ExecutionTimeout": 300 }, "path": "./src/simple_execution/simple_execution.py" + }, + { + "name": "Map with Max Concurrency", + "description": "Map operation with maxConcurrency limit", + "handler": "map_with_max_concurrency.handler", + "integration": true, + 
"durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/map/map_with_max_concurrency.py" + }, + { + "name": "Map with Min Successful", + "description": "Map operation with min_successful completion config", + "handler": "map_with_min_successful.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/map/map_with_min_successful.py" + }, + { + "name": "Map with Failure Tolerance", + "description": "Map operation with failure tolerance", + "handler": "map_with_failure_tolerance.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/map/map_with_failure_tolerance.py" + }, + { + "name": "Parallel with Max Concurrency", + "description": "Parallel operation with maxConcurrency limit", + "handler": "parallel_with_max_concurrency.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/parallel/parallel_with_max_concurrency.py" + }, + { + "name": "Parallel with Wait", + "description": "Parallel operation with wait operations in branches", + "handler": "parallel_with_wait.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/parallel/parallel_with_wait.py" + }, + { + "name": "Parallel with Failure Tolerance", + "description": "Parallel operation with failure tolerance", + "handler": "parallel_with_failure_tolerance.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/parallel/parallel_with_failure_tolerance.py" + }, + { + "name": "Map with Custom SerDes", + "description": "Map operation with custom item-level serialization", + "handler": "map_with_custom_serdes.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/map/map_with_custom_serdes.py" + }, + { + "name": "Map with Batch SerDes", + "description": "Map operation with custom batch-level serialization", + "handler": "map_with_batch_serdes.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/map/map_with_batch_serdes.py" + }, + { + "name": "Parallel with Custom SerDes", + "description": "Parallel operation with custom item-level serialization", + "handler": "parallel_with_custom_serdes.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/parallel/parallel_with_custom_serdes.py" + }, + { + "name": "Parallel with Batch SerDes", + "description": "Parallel operation with custom batch-level serialization", + "handler": "parallel_with_batch_serdes.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/parallel/parallel_with_batch_serdes.py" } ] } diff --git a/examples/src/map/map_operations.py b/examples/src/map/map_operations.py index 7d4563b..a1ed45c 100644 --- a/examples/src/map/map_operations.py +++ b/examples/src/map/map_operations.py @@ -1,24 +1,23 @@ -"""Example demonstrating map-like operations for processing collections durably.""" +"""Example demonstrating map operations for processing collections durably.""" from typing import Any +from aws_durable_execution_sdk_python.config import MapConfig from 
aws_durable_execution_sdk_python.context import DurableContext from aws_durable_execution_sdk_python.execution import durable_execution -def square(x: int) -> int: - return x * x - - @durable_execution def handler(_event: Any, context: DurableContext) -> list[int]: - """Process a list of items using map-like operations.""" + """Process a list of items using context.map().""" items = [1, 2, 3, 4, 5] - # Process each item as a separate durable step - results = [] - for i, item in enumerate(items): - result = context.step(lambda _, x=item: square(x), name=f"square_{i}") - results.append(result) - - return results + # Use context.map() to process items concurrently and extract results immediately + return context.map( + inputs=items, + func=lambda ctx, item, index, _: ctx.step( + lambda _: item * 2, name=f"map_item_{index}" + ), + name="map_operation", + config=MapConfig(max_concurrency=2), + ).get_results() diff --git a/examples/src/map/map_with_batch_serdes.py b/examples/src/map/map_with_batch_serdes.py new file mode 100644 index 0000000..798adfa --- /dev/null +++ b/examples/src/map/map_with_batch_serdes.py @@ -0,0 +1,96 @@ +"""Example demonstrating map with batch-level serdes.""" + +import json +from typing import Any + +from aws_durable_execution_sdk_python.concurrency.models import ( + BatchItem, + BatchItemStatus, + BatchResult, + CompletionReason, +) +from aws_durable_execution_sdk_python.config import MapConfig +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.lambda_service import ErrorObject +from aws_durable_execution_sdk_python.serdes import JsonSerDes, SerDes, SerDesContext + + +class CustomBatchSerDes(SerDes[BatchResult]): + """Custom serializer for the entire BatchResult.""" + + def serialize(self, value: BatchResult, _: SerDesContext) -> str: + # Serialize BatchResult with custom metadata + + wrapped = { + "batch_metadata": { + "serializer": "CustomBatchSerDes", + "version": "2.0", + "total_items": len(value.get_results()), + }, + "success_count": value.success_count, + "failure_count": value.failure_count, + "results": value.get_results(), + "errors": [e.to_dict() if e else None for e in value.get_errors()], + } + return json.dumps(wrapped) + + def deserialize(self, payload: str, _: SerDesContext) -> BatchResult: + wrapped = json.loads(payload) + batch_items = [] + results = wrapped["results"] + errors = wrapped["errors"] + + for i, result in enumerate(results): + error = errors[i] if i < len(errors) else None + if error: + batch_items.append( + BatchItem( + index=i, + status=BatchItemStatus.FAILED, + result=None, + error=ErrorObject.from_dict(error) if error else None, + ) + ) + else: + batch_items.append( + BatchItem( + index=i, + status=BatchItemStatus.SUCCEEDED, + result=result, + error=None, + ) + ) + + # Infer completion reason (assume ALL_COMPLETED if all succeeded) + completion_reason = ( + CompletionReason.ALL_COMPLETED + if wrapped["failure_count"] == 0 + else CompletionReason.FAILURE_TOLERANCE_EXCEEDED + ) + + return BatchResult(all=batch_items, completion_reason=completion_reason) + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Process items with custom batch-level serialization.""" + items = [10, 20, 30, 40] + + # Use custom serdes for the entire BatchResult, default JSON for individual items + config = MapConfig(serdes=CustomBatchSerDes(), item_serdes=JsonSerDes()) + + results = 
context.map( + inputs=items, + func=lambda ctx, item, index, _: ctx.step( + lambda _: item * 2, name=f"double_{index}" + ), + name="map_with_batch_serdes", + config=config, + ) + + return { + "success_count": results.success_count, + "results": results.get_results(), + "sum": sum(results.get_results()), + } diff --git a/examples/src/map/map_with_custom_serdes.py b/examples/src/map/map_with_custom_serdes.py new file mode 100644 index 0000000..5feebb3 --- /dev/null +++ b/examples/src/map/map_with_custom_serdes.py @@ -0,0 +1,63 @@ +"""Example demonstrating map with custom serdes.""" + +import json +from typing import Any + +from aws_durable_execution_sdk_python.config import MapConfig +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.serdes import SerDes, SerDesContext + + +class CustomItemSerDes(SerDes[dict[str, Any]]): + """Custom serializer for individual items that adds metadata.""" + + def serialize(self, value: dict[str, Any], _: SerDesContext) -> str: + # Add custom metadata during serialization + wrapped = {"data": value, "serialized_by": "CustomItemSerDes", "version": "1.0"} + + return json.dumps(wrapped) + + def deserialize(self, payload: str, _: SerDesContext) -> dict[str, Any]: + wrapped = json.loads(payload) + # Extract the original data + return wrapped["data"] + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Process items with custom item serialization. + + This example demonstrates using item_serdes to customize serialization + of individual item results, while using default serialization for the + overall BatchResult. + """ + items = [ + {"id": 1, "name": "item1"}, + {"id": 2, "name": "item2"}, + {"id": 3, "name": "item3"}, + ] + + # Use custom serdes for individual items only + # The BatchResult will use default JSON serialization + config = MapConfig(item_serdes=CustomItemSerDes()) + + results = context.map( + inputs=items, + func=lambda ctx, item, index, _: ctx.step( + lambda _: { + "processed": item["name"], + "index": index, + "doubled_id": item["id"] * 2, + }, + name=f"process_{index}", + ), + name="map_with_custom_serdes", + config=config, + ) + + return { + "success_count": results.success_count, + "results": results.get_results(), + "processed_names": [r["processed"] for r in results.get_results()], + } diff --git a/examples/src/map/map_with_failure_tolerance.py b/examples/src/map/map_with_failure_tolerance.py new file mode 100644 index 0000000..dc01d15 --- /dev/null +++ b/examples/src/map/map_with_failure_tolerance.py @@ -0,0 +1,53 @@ +"""Example demonstrating map with failure tolerance.""" + +from typing import Any + +from aws_durable_execution_sdk_python.config import ( + CompletionConfig, + MapConfig, + StepConfig, +) +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.retries import RetryStrategyConfig + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Process items with failure tolerance.""" + items = list(range(1, 11)) # [1, 2, 3, ..., 10] + + # Tolerate up to 3 failures + config = MapConfig( + max_concurrency=5, + completion_config=CompletionConfig(tolerated_failure_count=3), + ) + + # Disable retries so failures happen immediately + step_config = 
StepConfig(retry_strategy=RetryStrategyConfig(max_attempts=1)) + + results = context.map( + inputs=items, + func=lambda ctx, item, index, _: ctx.step( + lambda _: _process_with_failures(item), + name=f"item_{index}", + config=step_config, + ), + name="map_with_tolerance", + config=config, + ) + + return { + "success_count": results.success_count, + "failure_count": results.failure_count, + "succeeded": [item.result for item in results.succeeded()], + "failed_count": len(results.failed()), + "completion_reason": results.completion_reason.value, + } + + +def _process_with_failures(item: int) -> int: + """Process item - fails for items 3, 6, 9.""" + if item % 3 == 0: + raise ValueError(f"Item {item} failed") + return item * 2 diff --git a/examples/src/map/map_with_max_concurrency.py b/examples/src/map/map_with_max_concurrency.py new file mode 100644 index 0000000..6289b3f --- /dev/null +++ b/examples/src/map/map_with_max_concurrency.py @@ -0,0 +1,23 @@ +"""Example demonstrating map with maxConcurrency limit.""" + +from typing import Any + +from aws_durable_execution_sdk_python.config import MapConfig +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> list[int]: + """Process items with concurrency limit of 3.""" + items = list(range(1, 11)) # [1, 2, 3, ..., 10] + + # Extract results immediately to avoid BatchResult serialization + return context.map( + inputs=items, + func=lambda ctx, item, index, _: ctx.step( + lambda _: item * 3, name=f"process_{index}" + ), + name="map_with_concurrency", + config=MapConfig(max_concurrency=3), + ).get_results() diff --git a/examples/src/map/map_with_min_successful.py b/examples/src/map/map_with_min_successful.py new file mode 100644 index 0000000..cc0fe5c --- /dev/null +++ b/examples/src/map/map_with_min_successful.py @@ -0,0 +1,43 @@ +"""Example demonstrating map with min_successful completion config.""" + +from typing import Any + +from aws_durable_execution_sdk_python.config import CompletionConfig, MapConfig +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Process items with min_successful threshold.""" + items = list(range(1, 11)) # [1, 2, 3, ..., 10] + + # Configure to complete when 6 items succeed + config = MapConfig( + max_concurrency=5, + completion_config=CompletionConfig(min_successful=6), + ) + + results = context.map( + inputs=items, + func=lambda ctx, item, index, _: ctx.step( + lambda _: _process_item(item), name=f"item_{index}" + ), + name="map_min_successful", + config=config, + ) + + return { + "success_count": results.success_count, + "failure_count": results.failure_count, + "total_count": results.total_count, + "results": results.get_results(), + "completion_reason": results.completion_reason.value, + } + + +def _process_item(item: int) -> int: + """Process item - fails for items 7, 8, 9.""" + if item in [7, 8, 9]: + raise ValueError(f"Item {item} failed") + return item * 2 diff --git a/examples/src/parallel/parallel.py b/examples/src/parallel/parallel.py index 58015d8..205f52d 100644 --- a/examples/src/parallel/parallel.py +++ b/examples/src/parallel/parallel.py @@ -1,17 +1,26 @@ -"""Example demonstrating parallel-like operations for concurrent execution.""" +"""Example 
demonstrating parallel operations for concurrent execution.""" from typing import Any +from aws_durable_execution_sdk_python.config import ParallelConfig from aws_durable_execution_sdk_python.context import DurableContext from aws_durable_execution_sdk_python.execution import durable_execution @durable_execution def handler(_event: Any, context: DurableContext) -> list[str]: - # Execute multiple operations - task1 = context.step(lambda _: "Task 1 complete", name="task1") - task2 = context.step(lambda _: "Task 2 complete", name="task2") - task3 = context.step(lambda _: "Task 3 complete", name="task3") + """Execute multiple operations in parallel using context.parallel().""" - # All tasks execute concurrently and results are collected - return [task1, task2, task3] + # Use context.parallel() to execute functions concurrently and extract results immediately + return context.parallel( + functions=[ + lambda ctx: ctx.step(lambda _: "task 1 completed", name="task1"), + lambda ctx: ctx.step(lambda _: "task 2 completed", name="task2"), + lambda ctx: ( + ctx.wait(1, name="wait_in_task3"), + "task 3 completed after wait", + )[1], + ], + name="parallel_operation", + config=ParallelConfig(max_concurrency=2), + ).get_results() diff --git a/examples/src/parallel/parallel_with_batch_serdes.py b/examples/src/parallel/parallel_with_batch_serdes.py new file mode 100644 index 0000000..84014e0 --- /dev/null +++ b/examples/src/parallel/parallel_with_batch_serdes.py @@ -0,0 +1,97 @@ +"""Example demonstrating parallel with batch-level serdes.""" + +import json +from typing import Any + +from aws_durable_execution_sdk_python.concurrency.models import ( + BatchItem, + BatchItemStatus, + BatchResult, + CompletionReason, +) +from aws_durable_execution_sdk_python.config import ParallelConfig +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.lambda_service import ErrorObject +from aws_durable_execution_sdk_python.serdes import JsonSerDes, SerDes, SerDesContext + + +class CustomBatchSerDes(SerDes[BatchResult]): + """Custom serializer for the entire BatchResult.""" + + def serialize(self, value: BatchResult, _: SerDesContext) -> str: + wrapped = { + "batch_metadata": { + "serializer": "CustomBatchSerDes", + "version": "2.0", + "total_branches": len(value.get_results()), + }, + "success_count": value.success_count, + "failure_count": value.failure_count, + "results": value.get_results(), + "errors": [e.to_dict() if e else None for e in value.get_errors()], + } + return json.dumps(wrapped) + + def deserialize(self, payload: str, _: SerDesContext) -> BatchResult: + wrapped = json.loads(payload) + # Reconstruct BatchResult from wrapped data + # Need to rebuild BatchItem list from results and errors + + batch_items = [] + results = wrapped["results"] + errors = wrapped["errors"] + + for i, result in enumerate(results): + error = errors[i] if i < len(errors) else None + if error: + batch_items.append( + BatchItem( + index=i, + status=BatchItemStatus.FAILED, + result=None, + error=ErrorObject.from_dict(error) if error else None, + ) + ) + else: + batch_items.append( + BatchItem( + index=i, + status=BatchItemStatus.SUCCEEDED, + result=result, + error=None, + ) + ) + + # Infer completion reason (assume ALL_COMPLETED if all succeeded) + completion_reason = ( + CompletionReason.ALL_COMPLETED + if wrapped["failure_count"] == 0 + else CompletionReason.FAILURE_TOLERANCE_EXCEEDED + ) + + return 
BatchResult(all=batch_items, completion_reason=completion_reason) + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Execute parallel tasks with custom batch-level serialization.""" + + # Use custom serdes for the entire BatchResult, default JSON for individual functions + config = ParallelConfig(serdes=CustomBatchSerDes(), item_serdes=JsonSerDes()) + + results = context.parallel( + functions=[ + lambda ctx: ctx.step(lambda _: 100, name="branch1"), + lambda ctx: ctx.step(lambda _: 200, name="branch2"), + lambda ctx: ctx.step(lambda _: 300, name="branch3"), + ], + name="parallel_with_batch_serdes", + config=config, + ) + + return { + "success_count": results.success_count, + "results": results.get_results(), + "total": sum(results.get_results()), + } diff --git a/examples/src/parallel/parallel_with_custom_serdes.py b/examples/src/parallel/parallel_with_custom_serdes.py new file mode 100644 index 0000000..ec694d8 --- /dev/null +++ b/examples/src/parallel/parallel_with_custom_serdes.py @@ -0,0 +1,60 @@ +"""Example demonstrating parallel with custom serdes.""" + +import json +from typing import Any + +from aws_durable_execution_sdk_python.config import ParallelConfig +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.serdes import SerDes, SerDesContext + + +class CustomItemSerDes(SerDes[dict[str, Any]]): + """Custom serializer for individual items that adds metadata.""" + + def serialize(self, value: dict[str, Any], _: SerDesContext) -> str: + # Add custom metadata during serialization + wrapped = {"data": value, "serialized_by": "CustomItemSerDes"} + + return json.dumps(wrapped) + + def deserialize(self, payload: str, _: SerDesContext) -> dict[str, Any]: + wrapped = json.loads(payload) + # Extract the original data + return wrapped["data"] + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Execute parallel tasks with custom item serialization. + + This example demonstrates using item_serdes to customize serialization + of individual function results, while using default serialization for the + overall BatchResult. 
+ """ + + # Use custom serdes for individual function results only + # The BatchResult will use default JSON serialization + config = ParallelConfig(item_serdes=CustomItemSerDes()) + + results = context.parallel( + functions=[ + lambda ctx: ctx.step( + lambda _: {"task": "task1", "value": 100}, name="task1" + ), + lambda ctx: ctx.step( + lambda _: {"task": "task2", "value": 200}, name="task2" + ), + lambda ctx: ctx.step( + lambda _: {"task": "task3", "value": 300}, name="task3" + ), + ], + name="parallel_with_custom_serdes", + config=config, + ) + + return { + "success_count": results.success_count, + "results": results.get_results(), + "total_value": sum(r["value"] for r in results.get_results()), + } diff --git a/examples/src/parallel/parallel_with_failure_tolerance.py b/examples/src/parallel/parallel_with_failure_tolerance.py new file mode 100644 index 0000000..12327b9 --- /dev/null +++ b/examples/src/parallel/parallel_with_failure_tolerance.py @@ -0,0 +1,59 @@ +"""Example demonstrating parallel with failure tolerance.""" + +from typing import Any + +from aws_durable_execution_sdk_python.config import ( + CompletionConfig, + ParallelConfig, + StepConfig, +) +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.retries import RetryStrategyConfig + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Execute tasks with failure tolerance.""" + + # Tolerate up to 2 failures + config = ParallelConfig( + completion_config=CompletionConfig(tolerated_failure_count=2) + ) + + # Disable retries so failures happen immediately + step_config = StepConfig(retry_strategy=RetryStrategyConfig(max_attempts=1)) + + results = context.parallel( + functions=[ + lambda ctx: ctx.step( + lambda _: "success 1", name="task1", config=step_config + ), + lambda ctx: ctx.step( + lambda _: _failing_task(2), name="task2", config=step_config + ), + lambda ctx: ctx.step( + lambda _: "success 3", name="task3", config=step_config + ), + lambda ctx: ctx.step( + lambda _: _failing_task(4), name="task4", config=step_config + ), + lambda ctx: ctx.step( + lambda _: "success 5", name="task5", config=step_config + ), + ], + name="parallel_with_tolerance", + config=config, + ) + + return { + "success_count": results.success_count, + "failure_count": results.failure_count, + "succeeded": results.get_results(), + "completion_reason": results.completion_reason.value, + } + + +def _failing_task(task_num: int) -> str: + """Task that always fails.""" + raise ValueError(f"Task {task_num} failed") diff --git a/examples/src/parallel/parallel_with_max_concurrency.py b/examples/src/parallel/parallel_with_max_concurrency.py new file mode 100644 index 0000000..a5b6e52 --- /dev/null +++ b/examples/src/parallel/parallel_with_max_concurrency.py @@ -0,0 +1,25 @@ +"""Example demonstrating parallel with maxConcurrency limit.""" + +from typing import Any + +from aws_durable_execution_sdk_python.config import ParallelConfig +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> list[str]: + """Execute 5 tasks with concurrency limit of 2.""" + + # Extract results immediately to avoid BatchResult serialization + return context.parallel( + functions=[ + lambda ctx: ctx.step(lambda _: "task 1", name="task1"), + lambda ctx: 
ctx.step(lambda _: "task 2", name="task2"), + lambda ctx: ctx.step(lambda _: "task 3", name="task3"), + lambda ctx: ctx.step(lambda _: "task 4", name="task4"), + lambda ctx: ctx.step(lambda _: "task 5", name="task5"), + ], + name="parallel_with_concurrency", + config=ParallelConfig(max_concurrency=2), + ).get_results() diff --git a/examples/src/parallel/parallel_with_wait.py b/examples/src/parallel/parallel_with_wait.py new file mode 100644 index 0000000..746a0b0 --- /dev/null +++ b/examples/src/parallel/parallel_with_wait.py @@ -0,0 +1,23 @@ +"""Example demonstrating parallel with wait operations.""" + +from typing import Any + +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> str: + """Execute parallel waits.""" + + # Call get_results() to extract data and avoid BatchResult serialization + context.parallel( + functions=[ + lambda ctx: ctx.wait(1, name="wait_1_second"), + lambda ctx: ctx.wait(2, name="wait_2_seconds"), + lambda ctx: ctx.wait(5, name="wait_5_seconds"), + ], + name="parallel_waits", + ).get_results() + + return "Completed waits" diff --git a/examples/template.yaml b/examples/template.yaml index 82d87b6..5559af9 100644 --- a/examples/template.yaml +++ b/examples/template.yaml @@ -146,3 +146,93 @@ Resources: DurableConfig: RetentionPeriodInDays: 7 ExecutionTimeout: 300 + MapWithMaxConcurrency: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: map_with_max_concurrency.handler + Description: Map operation with maxConcurrency limit + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + MapWithMinSuccessful: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: map_with_min_successful.handler + Description: Map operation with min_successful completion config + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + MapWithFailureTolerance: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: map_with_failure_tolerance.handler + Description: Map operation with failure tolerance + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + ParallelWithMaxConcurrency: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: parallel_with_max_concurrency.handler + Description: Parallel operation with maxConcurrency limit + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + ParallelWithWait: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: parallel_with_wait.handler + Description: Parallel operation with wait operations in branches + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + ParallelWithFailureTolerance: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: parallel_with_failure_tolerance.handler + Description: Parallel operation with failure tolerance + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + MapWithCustomSerDes: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: map_with_custom_serdes.handler + Description: Map operation with custom item-level serialization + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + MapWithBatchSerDes: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: map_with_batch_serdes.handler + Description: Map operation with custom batch-level serialization + DurableConfig: + 
RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + ParallelWithCustomSerDes: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: parallel_with_custom_serdes.handler + Description: Parallel operation with custom item-level serialization + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + ParallelWithBatchSerDes: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: parallel_with_batch_serdes.handler + Description: Parallel operation with custom batch-level serialization + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 diff --git a/examples/test/map/test_map_operations.py b/examples/test/map/test_map_operations.py index 28b1e94..da8dc93 100644 --- a/examples/test/map/test_map_operations.py +++ b/examples/test/map/test_map_operations.py @@ -2,7 +2,10 @@ import pytest from aws_durable_execution_sdk_python.execution import InvocationStatus -from aws_durable_execution_sdk_python.lambda_service import OperationType +from aws_durable_execution_sdk_python.lambda_service import ( + OperationStatus, + OperationType, +) from src.map import map_operations from test.conftest import deserialize_operation_payload @@ -14,19 +17,26 @@ lambda_function_name="map operations", ) def test_map_operations(durable_runner): - """Test map_operations example.""" + """Test map_operations example using context.map().""" with durable_runner: result = durable_runner.run(input="test", timeout=10) assert result.status is InvocationStatus.SUCCEEDED - assert deserialize_operation_payload(result.result) == [1, 4, 9, 16, 25] + assert deserialize_operation_payload(result.result) == [2, 4, 6, 8, 10] + + # Get the map operation (CONTEXT type with MAP subtype) + map_op = result.get_context("map_operation") + assert map_op is not None + assert map_op.status is OperationStatus.SUCCEEDED + + # Verify all five child operations exist + assert len(map_op.child_operations) == 5 - # Verify all five step operations exist - step_ops = [ - op for op in result.operations if op.operation_type == OperationType.STEP - ] - assert len(step_ops) == 5 + # Verify child operation names (SDK uses map-item-* format) + child_names = {op.name for op in map_op.child_operations} + expected_names = {f"map-item-{i}" for i in range(5)} + assert child_names == expected_names - step_names = {op.name for op in step_ops} - expected_names = {f"square_{i}" for i in range(5)} - assert step_names == expected_names + # Verify all children succeeded + for child in map_op.child_operations: + assert child.status is OperationStatus.SUCCEEDED diff --git a/examples/test/map/test_map_with_batch_serdes.py b/examples/test/map/test_map_with_batch_serdes.py new file mode 100644 index 0000000..b30a9bd --- /dev/null +++ b/examples/test/map/test_map_with_batch_serdes.py @@ -0,0 +1,43 @@ +"""Tests for map with batch-level serdes.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus +from aws_durable_execution_sdk_python.lambda_service import OperationStatus +from src.map import map_with_batch_serdes +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=map_with_batch_serdes.handler, + lambda_function_name="Map with Batch SerDes", +) +def test_map_with_batch_serdes(durable_runner): + """Test map with custom batch-level serialization.""" + with durable_runner: + result = durable_runner.run(input="test", timeout=10) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = 
deserialize_operation_payload(result.result) + + # Verify all items were processed + assert result_data["success_count"] == 4 + + # Verify results + results = result_data["results"] + assert len(results) == 4 + assert results == [20, 40, 60, 80] # [10*2, 20*2, 30*2, 40*2] + + # Verify sum + assert result_data["sum"] == 200 + + # Get the map operation + map_op = result.get_context("map_with_batch_serdes") + assert map_op is not None + assert map_op.status is OperationStatus.SUCCEEDED + + # Verify all 4 child operations exist and succeeded + assert len(map_op.child_operations) == 4 + for child in map_op.child_operations: + assert child.status is OperationStatus.SUCCEEDED diff --git a/examples/test/map/test_map_with_custom_serdes.py b/examples/test/map/test_map_with_custom_serdes.py new file mode 100644 index 0000000..c0d3d79 --- /dev/null +++ b/examples/test/map/test_map_with_custom_serdes.py @@ -0,0 +1,48 @@ +"""Tests for map with custom serdes.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus +from aws_durable_execution_sdk_python.lambda_service import OperationStatus +from src.map import map_with_custom_serdes +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=map_with_custom_serdes.handler, + lambda_function_name="Map with Custom SerDes", +) +def test_map_with_custom_serdes(durable_runner): + """Test map with custom item serialization.""" + with durable_runner: + result = durable_runner.run(input="test", timeout=10) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = deserialize_operation_payload(result.result) + + # Verify all items were processed + assert result_data["success_count"] == 3 + + # Verify results were properly deserialized + results = result_data["results"] + assert len(results) == 3 + + # Verify the custom serdes worked (data was serialized and deserialized correctly) + processed_names = result_data["processed_names"] + assert processed_names == ["item1", "item2", "item3"] + + # Verify processing logic worked correctly + for i, r in enumerate(results): + assert r["index"] == i + assert r["doubled_id"] == (i + 1) * 2 # IDs are 1, 2, 3 + + # Get the map operation + map_op = result.get_context("map_with_custom_serdes") + assert map_op is not None + assert map_op.status is OperationStatus.SUCCEEDED + + # Verify all 3 child operations exist and succeeded + assert len(map_op.child_operations) == 3 + for child in map_op.child_operations: + assert child.status is OperationStatus.SUCCEEDED diff --git a/examples/test/map/test_map_with_failure_tolerance.py b/examples/test/map/test_map_with_failure_tolerance.py new file mode 100644 index 0000000..4cf06d1 --- /dev/null +++ b/examples/test/map/test_map_with_failure_tolerance.py @@ -0,0 +1,52 @@ +"""Tests for map with failure tolerance.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus +from aws_durable_execution_sdk_python.lambda_service import OperationStatus +from src.map import map_with_failure_tolerance +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=map_with_failure_tolerance.handler, + lambda_function_name="Map with Failure Tolerance", +) +def test_map_with_failure_tolerance(durable_runner): + """Test map with failure tolerance.""" + with durable_runner: + result = durable_runner.run(input="test", timeout=10) + + assert result.status is InvocationStatus.SUCCEEDED + + 
result_data = deserialize_operation_payload(result.result) + + # Should have 7 successes and 3 failures (items 3, 6, 9 fail) + assert result_data["success_count"] == 7 + assert result_data["failure_count"] == 3 + assert result_data["failed_count"] == 3 + + # Verify successful results (items 1,2,4,5,7,8,10 multiplied by 2) + expected_results = [2, 4, 8, 10, 14, 16, 20] + assert set(result_data["succeeded"]) == set(expected_results) + + assert result_data["completion_reason"] == "ALL_COMPLETED" + + # Get the map operation + map_op = result.get_context("map_with_tolerance") + assert map_op is not None + assert map_op.status is OperationStatus.SUCCEEDED + + # Verify all 10 child operations exist + assert len(map_op.child_operations) == 10 + + # Count successes and failures + succeeded = [ + op for op in map_op.child_operations if op.status is OperationStatus.SUCCEEDED + ] + failed = [ + op for op in map_op.child_operations if op.status is OperationStatus.FAILED + ] + + assert len(succeeded) == 7 + assert len(failed) == 3 diff --git a/examples/test/map/test_map_with_max_concurrency.py b/examples/test/map/test_map_with_max_concurrency.py new file mode 100644 index 0000000..3b6d505 --- /dev/null +++ b/examples/test/map/test_map_with_max_concurrency.py @@ -0,0 +1,37 @@ +"""Tests for map with maxConcurrency.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus +from aws_durable_execution_sdk_python.lambda_service import OperationStatus +from src.map import map_with_max_concurrency +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=map_with_max_concurrency.handler, + lambda_function_name="Map with Max Concurrency", +) +def test_map_with_max_concurrency(durable_runner): + """Test map with maxConcurrency limit.""" + with durable_runner: + result = durable_runner.run(input="test", timeout=10) + + assert result.status is InvocationStatus.SUCCEEDED + + results_list = deserialize_operation_payload(result.result) + assert len(results_list) == 10 + # Items 1-10 multiplied by 3 + assert results_list == [3, 6, 9, 12, 15, 18, 21, 24, 27, 30] + + # Get the map operation + map_op = result.get_context("map_with_concurrency") + assert map_op is not None + assert map_op.status is OperationStatus.SUCCEEDED + + # Verify all 10 child operations exist + assert len(map_op.child_operations) == 10 + + # Verify all children succeeded + for child in map_op.child_operations: + assert child.status is OperationStatus.SUCCEEDED diff --git a/examples/test/map/test_map_with_min_successful.py b/examples/test/map/test_map_with_min_successful.py new file mode 100644 index 0000000..c3a2177 --- /dev/null +++ b/examples/test/map/test_map_with_min_successful.py @@ -0,0 +1,70 @@ +"""Tests for map with min_successful.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus +from aws_durable_execution_sdk_python.lambda_service import OperationStatus +from src.map import map_with_min_successful +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=map_with_min_successful.handler, + lambda_function_name="Map with Min Successful", +) +def test_map_with_min_successful(durable_runner): + """Test map with min_successful threshold.""" + with durable_runner: + result = durable_runner.run(input="test", timeout=10) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = 
deserialize_operation_payload(result.result) + + # With min_successful=6, the operation completes after reaching 6 successes + # Due to concurrency (max_concurrency=5), extra items may complete before the threshold check + # Items 1-6 and item 10 succeed; items 7-9 raise errors if they run + # Depending on timing, we get 6 or 7 successes + assert result_data["success_count"] >= 6 + assert result_data["success_count"] <= 7 + + # Operation stops once min_successful is reached, so items 7-9 (which + # raise errors) never settle into a FAILED state + assert result_data["failure_count"] == 0 + assert result_data["total_count"] == 10 + + # Verify we got the expected successful results + # Items 1-6 always succeed (2, 4, 6, 8, 10, 12) + # Item 10 might also succeed (20) depending on timing + assert len(result_data["results"]) == result_data["success_count"] + for result_val in result_data["results"]: + assert result_val % 2 == 0  # All results should be even (item * 2) + assert 2 <= result_val <= 20  # Range: items 1-10 * 2 + assert result_val not in [14, 16, 18]  # Items 7-9 should not be present + + # Completion reason should be MIN_SUCCESSFUL_REACHED + assert result_data["completion_reason"] == "MIN_SUCCESSFUL_REACHED" + + # Get the map operation + map_op = result.get_context("map_min_successful") + assert map_op is not None + assert map_op.status is OperationStatus.SUCCEEDED + + # All 10 operations may be started, but only some complete before min_successful + assert len(map_op.child_operations) == 10 + + # Count operations by status + succeeded = [ + op for op in map_op.child_operations if op.status is OperationStatus.SUCCEEDED + ] + failed = [ + op for op in map_op.child_operations if op.status is OperationStatus.FAILED + ] + started = [ + op for op in map_op.child_operations if op.status is OperationStatus.STARTED + ] + + # Should have 6-7 successes, 0 failures, and remaining in STARTED state + assert len(succeeded) == result_data["success_count"] + assert len(failed) == 0 + assert len(started) == 10 - result_data["success_count"] diff --git a/examples/test/parallel/test_parallel.py b/examples/test/parallel/test_parallel.py index cce24ac..184e854 100644 --- a/examples/test/parallel/test_parallel.py +++ b/examples/test/parallel/test_parallel.py @@ -2,7 +2,7 @@ import pytest from aws_durable_execution_sdk_python.execution import InvocationStatus -from aws_durable_execution_sdk_python.lambda_service import OperationType +from aws_durable_execution_sdk_python.lambda_service import OperationStatus from src.parallel import parallel from test.conftest import deserialize_operation_payload @@ -14,22 +14,25 @@ lambda_function_name="Parallel Operations", ) def test_parallel(durable_runner): - """Test parallel example.""" + """Test parallel example using context.parallel().""" with durable_runner: result = durable_runner.run(input="test", timeout=10) assert result.status is InvocationStatus.SUCCEEDED assert deserialize_operation_payload(result.result) == [ - "Task 1 complete", - "Task 2 complete", - "Task 3 complete", + "task 1 completed", + "task 2 completed", + "task 3 completed after wait", ] - # Verify all three step operations exist - step_ops = [ - op for op in result.operations if op.operation_type == OperationType.STEP - ] - assert len(step_ops) == 3 + # Get the parallel operation (CONTEXT type with PARALLEL subtype) + parallel_op = result.get_context("parallel_operation") + assert parallel_op is not None + assert parallel_op.status is OperationStatus.SUCCEEDED + + # Verify all three child operations exist + assert 
len(parallel_op.child_operations) == 3 - step_names = {op.name for op in step_ops} - assert step_names == {"task1", "task2", "task3"} + # Verify all children succeeded + for child in parallel_op.child_operations: + assert child.status is OperationStatus.SUCCEEDED diff --git a/examples/test/parallel/test_parallel_with_batch_serdes.py b/examples/test/parallel/test_parallel_with_batch_serdes.py new file mode 100644 index 0000000..069428b --- /dev/null +++ b/examples/test/parallel/test_parallel_with_batch_serdes.py @@ -0,0 +1,43 @@ +"""Tests for parallel with batch-level serdes.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus +from aws_durable_execution_sdk_python.lambda_service import OperationStatus +from src.parallel import parallel_with_batch_serdes +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=parallel_with_batch_serdes.handler, + lambda_function_name="Parallel with Batch SerDes", +) +def test_parallel_with_batch_serdes(durable_runner): + """Test parallel with custom batch-level serialization.""" + with durable_runner: + result = durable_runner.run(input="test", timeout=10) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = deserialize_operation_payload(result.result) + + # Verify all branches succeeded + assert result_data["success_count"] == 3 + + # Verify results + results = result_data["results"] + assert len(results) == 3 + assert results == [100, 200, 300] + + # Verify total + assert result_data["total"] == 600 + + # Get the parallel operation + parallel_op = result.get_context("parallel_with_batch_serdes") + assert parallel_op is not None + assert parallel_op.status is OperationStatus.SUCCEEDED + + # Verify all 3 child operations exist and succeeded + assert len(parallel_op.child_operations) == 3 + for child in parallel_op.child_operations: + assert child.status is OperationStatus.SUCCEEDED diff --git a/examples/test/parallel/test_parallel_with_custom_serdes.py b/examples/test/parallel/test_parallel_with_custom_serdes.py new file mode 100644 index 0000000..548dd5f --- /dev/null +++ b/examples/test/parallel/test_parallel_with_custom_serdes.py @@ -0,0 +1,46 @@ +"""Tests for parallel with custom serdes.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus +from aws_durable_execution_sdk_python.lambda_service import OperationStatus +from src.parallel import parallel_with_custom_serdes +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=parallel_with_custom_serdes.handler, + lambda_function_name="Parallel with Custom SerDes", +) +def test_parallel_with_custom_serdes(durable_runner): + """Test parallel with custom item serialization.""" + with durable_runner: + result = durable_runner.run(input="test", timeout=10) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = deserialize_operation_payload(result.result) + + # Verify all tasks succeeded + assert result_data["success_count"] == 3 + + # Verify results were properly deserialized + results = result_data["results"] + assert len(results) == 3 + + # Verify the custom serdes worked (data was serialized and deserialized correctly) + task_names = {r["task"] for r in results} + assert task_names == {"task1", "task2", "task3"} + + # Verify values were preserved through serialization + assert result_data["total_value"] == 600 # 100 + 200 + 300 + + # Get the parallel 
operation + parallel_op = result.get_context("parallel_with_custom_serdes") + assert parallel_op is not None + assert parallel_op.status is OperationStatus.SUCCEEDED + + # Verify all 3 child operations exist and succeeded + assert len(parallel_op.child_operations) == 3 + for child in parallel_op.child_operations: + assert child.status is OperationStatus.SUCCEEDED diff --git a/examples/test/parallel/test_parallel_with_failure_tolerance.py b/examples/test/parallel/test_parallel_with_failure_tolerance.py new file mode 100644 index 0000000..275e27b --- /dev/null +++ b/examples/test/parallel/test_parallel_with_failure_tolerance.py @@ -0,0 +1,49 @@ +"""Tests for parallel with failure tolerance.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus +from aws_durable_execution_sdk_python.lambda_service import OperationStatus +from src.parallel import parallel_with_failure_tolerance +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=parallel_with_failure_tolerance.handler, + lambda_function_name="Parallel with Failure Tolerance", +) +def test_parallel_with_failure_tolerance(durable_runner): + """Test parallel with failure tolerance.""" + with durable_runner: + result = durable_runner.run(input="test", timeout=10) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = deserialize_operation_payload(result.result) + + # Should have 3 successes and 2 failures + assert result_data["success_count"] == 3 + assert result_data["failure_count"] == 2 + assert set(result_data["succeeded"]) == {"success 1", "success 3", "success 5"} + assert result_data["completion_reason"] == "ALL_COMPLETED" + + # Get the parallel operation + parallel_op = result.get_context("parallel_with_tolerance") + assert parallel_op is not None + assert parallel_op.status is OperationStatus.SUCCEEDED + + # Verify all 5 child operations exist + assert len(parallel_op.child_operations) == 5 + + # Count successes and failures + succeeded = [ + op + for op in parallel_op.child_operations + if op.status is OperationStatus.SUCCEEDED + ] + failed = [ + op for op in parallel_op.child_operations if op.status is OperationStatus.FAILED + ] + + assert len(succeeded) == 3 + assert len(failed) == 2 diff --git a/examples/test/parallel/test_parallel_with_max_concurrency.py b/examples/test/parallel/test_parallel_with_max_concurrency.py new file mode 100644 index 0000000..ce65bde --- /dev/null +++ b/examples/test/parallel/test_parallel_with_max_concurrency.py @@ -0,0 +1,36 @@ +"""Tests for parallel with maxConcurrency.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus +from aws_durable_execution_sdk_python.lambda_service import OperationStatus +from src.parallel import parallel_with_max_concurrency +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=parallel_with_max_concurrency.handler, + lambda_function_name="Parallel with Max Concurrency", +) +def test_parallel_with_max_concurrency(durable_runner): + """Test parallel with maxConcurrency limit.""" + with durable_runner: + result = durable_runner.run(input="test", timeout=10) + + assert result.status is InvocationStatus.SUCCEEDED + + results_list = deserialize_operation_payload(result.result) + assert len(results_list) == 5 + assert set(results_list) == {"task 1", "task 2", "task 3", "task 4", "task 5"} + + # Get the parallel operation + parallel_op = 
result.get_context("parallel_with_concurrency") + assert parallel_op is not None + assert parallel_op.status is OperationStatus.SUCCEEDED + + # Verify all 5 child operations exist + assert len(parallel_op.child_operations) == 5 + + # Verify all children succeeded + for child in parallel_op.child_operations: + assert child.status is OperationStatus.SUCCEEDED diff --git a/examples/test/parallel/test_parallel_with_wait.py b/examples/test/parallel/test_parallel_with_wait.py new file mode 100644 index 0000000..b1b9a15 --- /dev/null +++ b/examples/test/parallel/test_parallel_with_wait.py @@ -0,0 +1,47 @@ +"""Tests for parallel with wait operations.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus +from aws_durable_execution_sdk_python.lambda_service import ( + OperationStatus, + OperationType, +) +from src.parallel import parallel_with_wait +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=parallel_with_wait.handler, + lambda_function_name="Parallel with Wait", +) +def test_parallel_with_wait(durable_runner): + """Test parallel with wait operations.""" + with durable_runner: + result = durable_runner.run(input="test", timeout=10) + + assert result.status is InvocationStatus.SUCCEEDED + assert deserialize_operation_payload(result.result) == "Completed waits" + + # Get the parallel operation + parallel_op = result.get_context("parallel_waits") + assert parallel_op is not None + assert parallel_op.status is OperationStatus.SUCCEEDED + + # Verify all 3 child operations exist + assert len(parallel_op.child_operations) == 3 + + # Each child should have a wait operation + wait_names = set() + for child in parallel_op.child_operations: + # Find wait operations in child + wait_ops = [ + op + for op in child.child_operations + if op.operation_type == OperationType.WAIT + ] + assert len(wait_ops) == 1 + wait_names.add(wait_ops[0].name) + + # Verify all expected wait operations exist + assert wait_names == {"wait_1_second", "wait_2_seconds", "wait_5_seconds"}