From 0fdc9bc921c7dfe4272ace83a81ac241076a3200 Mon Sep 17 00:00:00 2001 From: Alex Wang Date: Wed, 5 Nov 2025 16:57:50 -0800 Subject: [PATCH] test: Add integration tests for wait and run in child context - Add integration tests: - multiple waits - simple execution - run in child context with large data --- examples/examples-catalog.json | 33 ++++++++ .../run_in_child_context_large_data.py | 77 +++++++++++++++++++ .../src/simple_execution/simple_execution.py | 18 +++++ examples/src/wait/multiple_wait.py | 18 +++++ .../test_run_in_child_context_large_data.py | 36 +++++++++ .../simple_execution/test_simple_execution.py | 40 ++++++++++ examples/test/wait/test_multiple_wait.py | 57 ++++++++++++++ 7 files changed, 279 insertions(+) create mode 100644 examples/src/run_in_child_context/run_in_child_context_large_data.py create mode 100644 examples/src/simple_execution/simple_execution.py create mode 100644 examples/src/wait/multiple_wait.py create mode 100644 examples/test/run_in_child_context/test_run_in_child_context_large_data.py create mode 100644 examples/test/simple_execution/test_simple_execution.py create mode 100644 examples/test/wait/test_multiple_wait.py diff --git a/examples/examples-catalog.json b/examples/examples-catalog.json index 30c7e21..836bd7b 100644 --- a/examples/examples-catalog.json +++ b/examples/examples-catalog.json @@ -56,6 +56,17 @@ }, "path": "./src/wait/wait.py" }, + { + "name": "Multiple Wait", + "description": "Usage of demonstrating multiple sequential wait operations.", + "handler": "multiple_wait.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/wait/multiple_wait.py" + }, { "name": "Callback", "description": "Basic usage of context.create_callback() to create a callback for external systems", @@ -154,6 +165,28 @@ "ExecutionTimeout": 300 }, "path": "./src/wait_for_condition/wait_for_condition.py" + }, + { + "name": "Run in Child Context Large Data", + "description": "Usage of context.run_in_child_context() to execute operations in isolated contexts with large data", + "handler": "run_in_child_context_large_data.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/run_in_child_context/run_in_child_context_large_data.py" + }, + { + "name": "Simple Execution", + "description": "Simple execution without durable execution", + "handler": "simple_execution.handler", + "integration": true, + "durableConfig": { + "RetentionPeriodInDays": 7, + "ExecutionTimeout": 300 + }, + "path": "./src/simple_execution/simple_execution.py" } ] } diff --git a/examples/src/run_in_child_context/run_in_child_context_large_data.py b/examples/src/run_in_child_context/run_in_child_context_large_data.py new file mode 100644 index 0000000..cabb66e --- /dev/null +++ b/examples/src/run_in_child_context/run_in_child_context_large_data.py @@ -0,0 +1,77 @@ +"""Test runInChildContext with large data exceeding individual step limits.""" + +from typing import Any + +from aws_durable_execution_sdk_python.context import ( + DurableContext, + durable_with_child_context, +) +from aws_durable_execution_sdk_python.execution import durable_execution + + +def generate_large_string(size_in_kb: int) -> str: + """Generate a string of approximately the specified size in KB.""" + target_size = size_in_kb * 1024 # Convert KB to bytes + base_string = "A" * 1000 # 1KB string + repetitions = target_size // 1000 + remainder = target_size % 1000 + + return base_string * repetitions + "A" * remainder + + +@durable_with_child_context +def large_data_processor(child_context: DurableContext) -> dict[str, Any]: + """Process large data in child context.""" + # Generate data using a loop - each step returns ~50KB of data (under the step limit) + step_results: list[str] = [] + step_sizes: list[int] = [] + + for i in range(1, 6): # 1 to 5 + step_result: str = child_context.step( + lambda _: generate_large_string(50), # 50KB + name=f"generate-data-{i}", + ) + + step_results.append(step_result) + step_sizes.append(len(step_result)) + + # Concatenate all results - total should be ~250KB + concatenated_result = "".join(step_results) + + return { + "totalSize": len(concatenated_result), + "sizeInKB": round(len(concatenated_result) / 1024), + "data": concatenated_result, + "stepSizes": step_sizes, + } + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Handler demonstrating runInChildContext with large data.""" + # Use runInChildContext to handle large data that would exceed 256k step limit + large_data_result: dict[str, Any] = context.run_in_child_context( + large_data_processor(), name="large-data-processor" + ) + + # Add a wait after runInChildContext to test persistence across invocations + context.wait(seconds=1, name="post-processing-wait") + + # Verify the data is still intact after the wait + data_integrity_check = ( + len(large_data_result["data"]) == large_data_result["totalSize"] + and len(large_data_result["data"]) > 0 + ) + + return { + "success": True, + "message": "Successfully processed large data exceeding individual step limits using runInChildContext", + "dataIntegrityCheck": data_integrity_check, + "summary": { + "totalDataSize": large_data_result["sizeInKB"], + "stepsExecuted": 5, + "childContextUsed": True, + "waitExecuted": True, + "dataPreservedAcrossWait": data_integrity_check, + }, + } diff --git a/examples/src/simple_execution/simple_execution.py b/examples/src/simple_execution/simple_execution.py new file mode 100644 index 0000000..77cacba --- /dev/null +++ b/examples/src/simple_execution/simple_execution.py @@ -0,0 +1,18 @@ +"""Demonstrates handler execution without any durable operations.""" + +import json +import time +from typing import Any + +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution + + +@durable_execution +def handler(event: Any, _context: DurableContext) -> dict[str, Any]: + """Handler that executes without any durable operations.""" + return { + "received": json.dumps(event), + "timestamp": int(time.time() * 1000), # milliseconds since epoch + "message": "Handler completed successfully", + } diff --git a/examples/src/wait/multiple_wait.py b/examples/src/wait/multiple_wait.py new file mode 100644 index 0000000..7a13402 --- /dev/null +++ b/examples/src/wait/multiple_wait.py @@ -0,0 +1,18 @@ +"""Example demonstrating multiple sequential wait operations.""" + +from typing import Any + +from aws_durable_execution_sdk_python.context import DurableContext +from aws_durable_execution_sdk_python.execution import durable_execution + + +@durable_execution +def handler(_event: Any, context: DurableContext) -> dict[str, Any]: + """Handler demonstrating multiple sequential wait operations.""" + context.wait(seconds=5, name="wait-1") + context.wait(seconds=5, name="wait-2") + + return { + "completedWaits": 2, + "finalStep": "done", + } diff --git a/examples/test/run_in_child_context/test_run_in_child_context_large_data.py b/examples/test/run_in_child_context/test_run_in_child_context_large_data.py new file mode 100644 index 0000000..3469780 --- /dev/null +++ b/examples/test/run_in_child_context/test_run_in_child_context_large_data.py @@ -0,0 +1,36 @@ +"""Tests for run_in_child_context_large_data.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.run_in_child_context import run_in_child_context_large_data +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=run_in_child_context_large_data.handler, + lambda_function_name="run in child context large data", +) +def test_handle_large_data_exceeding_256k_limit_using_run_in_child_context( + durable_runner, +): + """Test handling large data exceeding 256k limit using runInChildContext.""" + with durable_runner: + result = durable_runner.run(input=None, timeout=30) + + result_data = deserialize_operation_payload(result.result) + + # Verify the execution succeeded + assert result.status is InvocationStatus.SUCCEEDED + assert result_data["success"] is True + + # Verify large data was processed + assert result_data["summary"]["totalDataSize"] > 240 # Should be ~250KB + assert result_data["summary"]["stepsExecuted"] == 5 + assert result_data["summary"]["childContextUsed"] is True + assert result_data["summary"]["waitExecuted"] is True + assert result_data["summary"]["dataPreservedAcrossWait"] is True + + # Verify data integrity across wait + assert result_data["dataIntegrityCheck"] is True diff --git a/examples/test/simple_execution/test_simple_execution.py b/examples/test/simple_execution/test_simple_execution.py new file mode 100644 index 0000000..740cce4 --- /dev/null +++ b/examples/test/simple_execution/test_simple_execution.py @@ -0,0 +1,40 @@ +"""Tests for simple_execution.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.simple_execution import simple_execution +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=simple_execution.handler, + lambda_function_name="simple execution", +) +def test_execute_simple_handler_without_operations(durable_runner): + """Test simple handler execution without operations.""" + test_payload = { + "userId": "test-user", + "action": "simple-execution", + } + + with durable_runner: + result = durable_runner.run(input=test_payload, timeout=10) + + result_data = deserialize_operation_payload(result.result) + + # Verify the result structure and content + assert ( + result_data["received"] + == '{"userId": "test-user", "action": "simple-execution"}' + ) + assert result_data["message"] == "Handler completed successfully" + assert isinstance(result_data["timestamp"], int) + assert result_data["timestamp"] > 0 + + # Should have no operations for simple execution + assert len(result.operations) == 0 + + # Verify no error occurred + assert result.status is InvocationStatus.SUCCEEDED diff --git a/examples/test/wait/test_multiple_wait.py b/examples/test/wait/test_multiple_wait.py new file mode 100644 index 0000000..40ecbc5 --- /dev/null +++ b/examples/test/wait/test_multiple_wait.py @@ -0,0 +1,57 @@ +"""Tests for multiple_waits.""" + +import pytest +from aws_durable_execution_sdk_python.execution import InvocationStatus + +from src.wait import multiple_wait +from test.conftest import deserialize_operation_payload + + +@pytest.mark.example +@pytest.mark.durable_execution( + handler=multiple_wait.handler, + lambda_function_name="multiple wait", +) +def test_multiple_sequential_wait_operations(durable_runner): + """Test multiple sequential wait operations.""" + with durable_runner: + result = durable_runner.run(input=None, timeout=20) + + assert result.status is InvocationStatus.SUCCEEDED + + # Verify the final result + assert deserialize_operation_payload(result.result) == { + "completedWaits": 2, + "finalStep": "done", + } + + # Verify operations were tracked + operations = [op for op in result.operations if op.operation_type.value == "WAIT"] + assert len(operations) == 2 + + # Find the wait operations by name + wait_1_ops = [ + op + for op in operations + if op.operation_type.value == "WAIT" and op.name == "wait-1" + ] + assert len(wait_1_ops) == 1 + first_wait = wait_1_ops[0] + + wait_2_ops = [ + op + for op in operations + if op.operation_type.value == "WAIT" and op.name == "wait-2" + ] + assert len(wait_2_ops) == 1 + second_wait = wait_2_ops[0] + + # Verify operation types and status + assert first_wait.operation_type.value == "WAIT" + assert first_wait.status.value == "SUCCEEDED" + assert second_wait.operation_type.value == "WAIT" + assert second_wait.status.value == "SUCCEEDED" + + # Verify wait details + assert first_wait.scheduled_end_timestamp is not None + assert second_wait.scheduled_end_timestamp is not None