Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions examples/examples-catalog.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,17 @@
},
"path": "./src/wait/wait.py"
},
{
"name": "Multiple Wait",
"description": "Usage of demonstrating multiple sequential wait operations.",
"handler": "multiple_wait.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/wait/multiple_wait.py"
},
{
"name": "Callback",
"description": "Basic usage of context.create_callback() to create a callback for external systems",
Expand Down Expand Up @@ -154,6 +165,28 @@
"ExecutionTimeout": 300
},
"path": "./src/wait_for_condition/wait_for_condition.py"
},
{
"name": "Run in Child Context Large Data",
"description": "Usage of context.run_in_child_context() to execute operations in isolated contexts with large data",
"handler": "run_in_child_context_large_data.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/run_in_child_context/run_in_child_context_large_data.py"
},
{
"name": "Simple Execution",
"description": "Simple execution without durable execution",
"handler": "simple_execution.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/simple_execution/simple_execution.py"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""Test runInChildContext with large data exceeding individual step limits."""

from typing import Any

from aws_durable_execution_sdk_python.context import (
DurableContext,
durable_with_child_context,
)
from aws_durable_execution_sdk_python.execution import durable_execution


def generate_large_string(size_in_kb: int) -> str:
"""Generate a string of approximately the specified size in KB."""
target_size = size_in_kb * 1024 # Convert KB to bytes
base_string = "A" * 1000 # 1KB string
repetitions = target_size // 1000
remainder = target_size % 1000

return base_string * repetitions + "A" * remainder


@durable_with_child_context
def large_data_processor(child_context: DurableContext) -> dict[str, Any]:
"""Process large data in child context."""
# Generate data using a loop - each step returns ~50KB of data (under the step limit)
step_results: list[str] = []
step_sizes: list[int] = []

for i in range(1, 6): # 1 to 5
step_result: str = child_context.step(
lambda _: generate_large_string(50), # 50KB
name=f"generate-data-{i}",
)

step_results.append(step_result)
step_sizes.append(len(step_result))

# Concatenate all results - total should be ~250KB
concatenated_result = "".join(step_results)

return {
"totalSize": len(concatenated_result),
"sizeInKB": round(len(concatenated_result) / 1024),
"data": concatenated_result,
"stepSizes": step_sizes,
}


@durable_execution
def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
"""Handler demonstrating runInChildContext with large data."""
# Use runInChildContext to handle large data that would exceed 256k step limit
large_data_result: dict[str, Any] = context.run_in_child_context(
large_data_processor(), name="large-data-processor"
)

# Add a wait after runInChildContext to test persistence across invocations
context.wait(seconds=1, name="post-processing-wait")

# Verify the data is still intact after the wait
data_integrity_check = (
len(large_data_result["data"]) == large_data_result["totalSize"]
and len(large_data_result["data"]) > 0
)

return {
"success": True,
"message": "Successfully processed large data exceeding individual step limits using runInChildContext",
"dataIntegrityCheck": data_integrity_check,
"summary": {
"totalDataSize": large_data_result["sizeInKB"],
"stepsExecuted": 5,
"childContextUsed": True,
"waitExecuted": True,
"dataPreservedAcrossWait": data_integrity_check,
},
}
18 changes: 18 additions & 0 deletions examples/src/simple_execution/simple_execution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""Demonstrates handler execution without any durable operations."""

import json
import time
from typing import Any

from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution


@durable_execution
def handler(event: Any, _context: DurableContext) -> dict[str, Any]:
"""Handler that executes without any durable operations."""
return {
"received": json.dumps(event),
"timestamp": int(time.time() * 1000), # milliseconds since epoch
"message": "Handler completed successfully",
}
18 changes: 18 additions & 0 deletions examples/src/wait/multiple_wait.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""Example demonstrating multiple sequential wait operations."""

from typing import Any

from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution


@durable_execution
def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
"""Handler demonstrating multiple sequential wait operations."""
context.wait(seconds=5, name="wait-1")
context.wait(seconds=5, name="wait-2")

return {
"completedWaits": 2,
"finalStep": "done",
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Tests for run_in_child_context_large_data."""

import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus

from src.run_in_child_context import run_in_child_context_large_data
from test.conftest import deserialize_operation_payload


@pytest.mark.example
@pytest.mark.durable_execution(
handler=run_in_child_context_large_data.handler,
lambda_function_name="run in child context large data",
)
def test_handle_large_data_exceeding_256k_limit_using_run_in_child_context(
durable_runner,
):
"""Test handling large data exceeding 256k limit using runInChildContext."""
with durable_runner:
result = durable_runner.run(input=None, timeout=30)

result_data = deserialize_operation_payload(result.result)

# Verify the execution succeeded
assert result.status is InvocationStatus.SUCCEEDED
assert result_data["success"] is True

# Verify large data was processed
assert result_data["summary"]["totalDataSize"] > 240 # Should be ~250KB
assert result_data["summary"]["stepsExecuted"] == 5
assert result_data["summary"]["childContextUsed"] is True
assert result_data["summary"]["waitExecuted"] is True
assert result_data["summary"]["dataPreservedAcrossWait"] is True

# Verify data integrity across wait
assert result_data["dataIntegrityCheck"] is True
40 changes: 40 additions & 0 deletions examples/test/simple_execution/test_simple_execution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""Tests for simple_execution."""

import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus

from src.simple_execution import simple_execution
from test.conftest import deserialize_operation_payload


@pytest.mark.example
@pytest.mark.durable_execution(
handler=simple_execution.handler,
lambda_function_name="simple execution",
)
def test_execute_simple_handler_without_operations(durable_runner):
"""Test simple handler execution without operations."""
test_payload = {
"userId": "test-user",
"action": "simple-execution",
}

with durable_runner:
result = durable_runner.run(input=test_payload, timeout=10)

result_data = deserialize_operation_payload(result.result)

# Verify the result structure and content
assert (
result_data["received"]
== '{"userId": "test-user", "action": "simple-execution"}'
)
assert result_data["message"] == "Handler completed successfully"
assert isinstance(result_data["timestamp"], int)
assert result_data["timestamp"] > 0

# Should have no operations for simple execution
assert len(result.operations) == 0

# Verify no error occurred
assert result.status is InvocationStatus.SUCCEEDED
57 changes: 57 additions & 0 deletions examples/test/wait/test_multiple_wait.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""Tests for multiple_waits."""

import pytest
from aws_durable_execution_sdk_python.execution import InvocationStatus

from src.wait import multiple_wait
from test.conftest import deserialize_operation_payload


@pytest.mark.example
@pytest.mark.durable_execution(
handler=multiple_wait.handler,
lambda_function_name="multiple wait",
)
def test_multiple_sequential_wait_operations(durable_runner):
"""Test multiple sequential wait operations."""
with durable_runner:
result = durable_runner.run(input=None, timeout=20)

assert result.status is InvocationStatus.SUCCEEDED

# Verify the final result
assert deserialize_operation_payload(result.result) == {
"completedWaits": 2,
"finalStep": "done",
}

# Verify operations were tracked
operations = [op for op in result.operations if op.operation_type.value == "WAIT"]
assert len(operations) == 2

# Find the wait operations by name
wait_1_ops = [
op
for op in operations
if op.operation_type.value == "WAIT" and op.name == "wait-1"
]
assert len(wait_1_ops) == 1
first_wait = wait_1_ops[0]

wait_2_ops = [
op
for op in operations
if op.operation_type.value == "WAIT" and op.name == "wait-2"
]
assert len(wait_2_ops) == 1
second_wait = wait_2_ops[0]

# Verify operation types and status
assert first_wait.operation_type.value == "WAIT"
assert first_wait.status.value == "SUCCEEDED"
assert second_wait.operation_type.value == "WAIT"
assert second_wait.status.value == "SUCCEEDED"

# Verify wait details
assert first_wait.scheduled_end_timestamp is not None
assert second_wait.scheduled_end_timestamp is not None
Loading