diff --git a/examples/examples-catalog.json b/examples/examples-catalog.json index 8ae76b1..1c1aedd 100644 --- a/examples/examples-catalog.json +++ b/examples/examples-catalog.json @@ -510,6 +510,39 @@ "ExecutionTimeout": 300 }, "path": "./src/callback/callback_serdes.py" + }, + { + "name": "No Replay Execution", + "description": "Execution with simples steps and without replay", + "handler": "no_replay_execution.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/no_replay_execution/no_replay_execution.py" + }, + { + "name": "Run In Child Context With Failing Step", + "description": "Demonstrates runInChildContext with a failing step followed by a successful wait", + "handler": "run_in_child_context_step_failure.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/run_in_child_context/run_in_child_context_step_failure.py" + }, + { + "name": "Comprehensive Operations", + "description": "Complex multi-operation example demonstrating all major operations", + "handler": "comprehensive_operations.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/comprehensive_operations/comprehensive_operations.py" } ] } diff --git a/examples/src/comprehensive_operations/comprehensive_operations.py b/examples/src/comprehensive_operations/comprehensive_operations.py new file mode 100644 index 0000000..9f4c0bb --- /dev/null +++ b/examples/src/comprehensive_operations/comprehensive_operations.py @@ -0,0 +1,51 @@ +"""Complex multi-operation example demonstrating all major operations.""" + +from typing import Any + +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.config import Duration + + +@durable_execution +def handler(event: dict[str, Any], context: DurableContext) -> dict[str, Any]: + """Comprehensive example demonstrating all major durable operations.""" + print(f"Starting comprehensive operations example with event: {event}") + + # Step 1: ctx.step - Simple step that returns a result + step1_result: str = context.step( + lambda _: "Step 1 completed successfully", + name="step1", + ) + + # Step 2: ctx.wait - Wait for 1 second + context.wait(Duration.from_seconds(1)) + + # Step 3: ctx.map - Map with 5 iterations returning numbers 1 to 5 + map_input = [1, 2, 3, 4, 5] + + map_results = context.map( + inputs=map_input, + func=lambda ctx, item, index, _: ctx.step( + lambda _: item, name=f"map-step-{index}" + ), + name="map-numbers", + ).to_dict() + + # Step 4: ctx.parallel - 3 branches, each returning a fruit name + + parallel_results = context.parallel( + functions=[ + lambda ctx: ctx.step(lambda _: "apple", name="fruit-step-1"), + lambda ctx: ctx.step(lambda _: "banana", name="fruit-step-2"), + lambda ctx: ctx.step(lambda _: "orange", name="fruit-step-3"), + ] + ).to_dict() + + # Final result combining all operations + return { + "step1": step1_result, + "waitCompleted": True, + "mapResults": map_results, + "parallelResults": parallel_results, + } diff --git a/examples/src/no_replay_execution/no_replay_execution.py b/examples/src/no_replay_execution/no_replay_execution.py new file mode 100644 index 0000000..a6eb646 --- /dev/null +++ b/examples/src/no_replay_execution/no_replay_execution.py @@ -0,0 +1,15 @@ +"""Demonstrates step execution tracking when no replay occurs.""" + +from typing import Any + +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, bool]: + """Handler demonstrating step execution without replay.""" + context.step(lambda _: "user-1", name="fetch-user-1") + context.step(lambda _: "user-2", name="fetch-user-2") + + return {"completed": True} diff --git a/examples/src/run_in_child_context/run_in_child_context_step_failure.py b/examples/src/run_in_child_context/run_in_child_context_step_failure.py new file mode 100644 index 0000000..c55c52e --- /dev/null +++ b/examples/src/run_in_child_context/run_in_child_context_step_failure.py @@ -0,0 +1,50 @@ +"""Demonstrates runInChildContext with a failing step followed by a successful wait.""" + +from typing import Any + +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution +from aws_durable_execution_sdk_python.config import StepConfig, Duration +from aws_durable_execution_sdk_python.retries import ( + RetryStrategyConfig, + create_retry_strategy, +) + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, bool]: + """Handler demonstrating runInChildContext with failing step.""" + + def child_with_failure(ctx: DurableContext) -> None: + """Child context with a failing step.""" + + retry_config = RetryStrategyConfig( + max_attempts=3, + initial_delay=Duration.from_seconds(1), + max_delay=Duration.from_seconds(10), + backoff_rate=2.0, + ) + step_config = StepConfig(retry_strategy=create_retry_strategy(retry_config)) + + def failing_step(_: DurableContext) -> None: + """Step that always fails.""" + raise Exception("Step failed in child context") + + ctx.step( + failing_step, + name="failing-step", + config=step_config, + ) + + try: + context.run_in_child_context( + child_with_failure, + name="child-with-failure", + ) + except Exception as error: + # Catch and ignore child context and step errors + result = {"success": True, "error": str(error)} + + context.wait(Duration.from_seconds(1), name="wait-after-failure") + + return result diff --git a/examples/template.yaml b/examples/template.yaml index 7046ee4..92264c2 100644 --- a/examples/template.yaml +++ b/examples/template.yaml @@ -625,3 +625,43 @@ Resources: DurableConfig: RetentionPeriodInDays: 7 ExecutionTimeout: 300 + NoReplayExecution: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: no_replay_execution.handler + Description: Execution with simples steps and without replay + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + RunInChildContextStepFailure: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: run_in_child_context_step_failure.handler + Description: Demonstrates runInChildContext with a failing step followed by + a successful wait + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 + ComprehensiveOperations: + Type: AWS::Serverless::Function + Properties: + CodeUri: build/ + Handler: comprehensive_operations.handler + Description: Complex multi-operation example demonstrating all major operations + Role: + Fn::GetAtt: + - DurableFunctionRole + - Arn + DurableConfig: + RetentionPeriodInDays: 7 + ExecutionTimeout: 300 diff --git a/examples/test/comprehensive_operations/test_comprehensive_operations.py b/examples/test/comprehensive_operations/test_comprehensive_operations.py new file mode 100644 index 0000000..4b84cc6 --- /dev/null +++ b/examples/test/comprehensive_operations/test_comprehensive_operations.py @@ -0,0 +1,94 @@ +"""Tests for comprehensive_operations.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.comprehensive_operations import comprehensive_operations +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=comprehensive_operations.handler, + lambda_function_name="Comprehensive Operations", +) +def test_execute_all_operations_successfully(durable_runner): + """Test that all operations execute successfully.""" + with durable_runner: + result = durable_runner.run(input={"message": "test"}, timeout=30) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = deserialize_operation_payload(result.result) + + assert result_data["step1"] == "Step 1 completed successfully" + assert result_data["waitCompleted"] is True + + # verify map results + map_results = result_data["mapResults"] + assert len(map_results["all"]) == 5 + assert [item["result"] for item in map_results["all"]] == [1, 2, 3, 4, 5] + assert map_results["completionReason"] == "ALL_COMPLETED" + + # verify parallel results + parallel_results = result_data["parallelResults"] + assert len(parallel_results["all"]) == 3 + assert [item["result"] for item in parallel_results["all"]] == [ + "apple", + "banana", + "orange", + ] + assert parallel_results["completionReason"] == "ALL_COMPLETED" + + # Get all operations including nested ones + all_ops = result.get_all_operations() + + # Verify step1 operation + step1_ops = [ + op for op in all_ops if op.operation_type.value == "STEP" and op.name == "step1" + ] + assert len(step1_ops) == 1 + step1_op = step1_ops[0] + assert ( + deserialize_operation_payload(step1_op.result) + == "Step 1 completed successfully" + ) + + # Verify wait operation (should be at index 1) + wait_op = result.operations[1] + assert wait_op.operation_type.value == "WAIT" + + # Verify individual map step operations exist with correct names + for i in range(5): + map_step_ops = [ + op + for op in all_ops + if op.operation_type.value == "STEP" and op.name == f"map-step-{i}" + ] + assert len(map_step_ops) == 1 + assert deserialize_operation_payload(map_step_ops[0].result) == i + 1 + + # Verify individual parallel step operations exist + fruit_step_1_ops = [ + op + for op in all_ops + if op.operation_type.value == "STEP" and op.name == "fruit-step-1" + ] + assert len(fruit_step_1_ops) == 1 + assert deserialize_operation_payload(fruit_step_1_ops[0].result) == "apple" + + fruit_step_2_ops = [ + op + for op in all_ops + if op.operation_type.value == "STEP" and op.name == "fruit-step-2" + ] + assert len(fruit_step_2_ops) == 1 + assert deserialize_operation_payload(fruit_step_2_ops[0].result) == "banana" + + fruit_step_3_ops = [ + op + for op in all_ops + if op.operation_type.value == "STEP" and op.name == "fruit-step-3" + ] + assert len(fruit_step_3_ops) == 1 + assert deserialize_operation_payload(fruit_step_3_ops[0].result) == "orange" diff --git a/examples/test/no_replay_execution/test_no_replay_execution.py b/examples/test/no_replay_execution/test_no_replay_execution.py new file mode 100644 index 0000000..934e107 --- /dev/null +++ b/examples/test/no_replay_execution/test_no_replay_execution.py @@ -0,0 +1,52 @@ +"""Tests for no_replay_execution.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.no_replay_execution import no_replay_execution +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=no_replay_execution.handler, + lambda_function_name="No Replay Execution", +) +def test_handle_step_operations_when_no_replay_occurs(durable_runner): + """Test step operations when no replay occurs.""" + with durable_runner: + result = durable_runner.run(input=None, timeout=10) + + assert result.status is InvocationStatus.SUCCEEDED + + # Verify final result + assert deserialize_operation_payload(result.result) == {"completed": True} + + # Get step operations + user1_step_ops = [ + op + for op in result.operations + if op.operation_type.value == "STEP" and op.name == "fetch-user-1" + ] + assert len(user1_step_ops) == 1 + user1_step = user1_step_ops[0] + + user2_step_ops = [ + op + for op in result.operations + if op.operation_type.value == "STEP" and op.name == "fetch-user-2" + ] + assert len(user2_step_ops) == 1 + user2_step = user2_step_ops[0] + + # Verify first-time execution tracking (no replay) + assert user1_step.operation_type.value == "STEP" + assert user1_step.status.value == "SUCCEEDED" + assert deserialize_operation_payload(user1_step.result) == "user-1" + + assert user2_step.operation_type.value == "STEP" + assert user2_step.status.value == "SUCCEEDED" + assert deserialize_operation_payload(user2_step.result) == "user-2" + + # Verify both operations tracked + assert len(result.operations) == 2 diff --git a/examples/test/run_in_child_context/test_run_in_child_context_step_failure.py b/examples/test/run_in_child_context/test_run_in_child_context_step_failure.py new file mode 100644 index 0000000..52c1b8c --- /dev/null +++ b/examples/test/run_in_child_context/test_run_in_child_context_step_failure.py @@ -0,0 +1,23 @@ +"""Tests for run_in_child_context_failing_step.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.run_in_child_context import run_in_child_context_step_failure +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=run_in_child_context_step_failure.handler, + lambda_function_name="Run In Child Context With Failing Step", +) +def test_succeed_despite_failing_step_in_child_context(durable_runner): + """Test that execution succeeds despite failing step in child context.""" + with durable_runner: + result = durable_runner.run(input=None, timeout=30) + + assert result.status is InvocationStatus.SUCCEEDED + + result_data = deserialize_operation_payload(result.result) + assert result_data == {"success": True, "error": "Step failed in child context"}