110 changes: 110 additions & 0 deletions examples/examples-catalog.json
@@ -187,6 +187,116 @@
"ExecutionTimeout": 300
},
"path": "./src/simple_execution/simple_execution.py"
},
{
"name": "Map with Max Concurrency",
"description": "Map operation with maxConcurrency limit",
"handler": "map_with_max_concurrency.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/map/map_with_max_concurrency.py"
},
{
"name": "Map with Min Successful",
"description": "Map operation with min_successful completion config",
"handler": "map_with_min_successful.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/map/map_with_min_successful.py"
},
{
"name": "Map with Failure Tolerance",
"description": "Map operation with failure tolerance",
"handler": "map_with_failure_tolerance.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/map/map_with_failure_tolerance.py"
},
{
"name": "Parallel with Max Concurrency",
"description": "Parallel operation with maxConcurrency limit",
"handler": "parallel_with_max_concurrency.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/parallel/parallel_with_max_concurrency.py"
},
{
"name": "Parallel with Wait",
"description": "Parallel operation with wait operations in branches",
"handler": "parallel_with_wait.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/parallel/parallel_with_wait.py"
},
{
"name": "Parallel with Failure Tolerance",
"description": "Parallel operation with failure tolerance",
"handler": "parallel_with_failure_tolerance.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/parallel/parallel_with_failure_tolerance.py"
},
{
"name": "Map with Custom SerDes",
"description": "Map operation with custom item-level serialization",
"handler": "map_with_custom_serdes.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/map/map_with_custom_serdes.py"
},
{
"name": "Map with Batch SerDes",
"description": "Map operation with custom batch-level serialization",
"handler": "map_with_batch_serdes.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/map/map_with_batch_serdes.py"
},
{
"name": "Parallel with Custom SerDes",
"description": "Parallel operation with custom item-level serialization",
"handler": "parallel_with_custom_serdes.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/parallel/parallel_with_custom_serdes.py"
},
{
"name": "Parallel with Batch SerDes",
"description": "Parallel operation with custom batch-level serialization",
"handler": "parallel_with_batch_serdes.handler",
"integration": true,
"durableConfig": {
"RetentionPeriodInDays": 7,
"ExecutionTimeout": 300
},
"path": "./src/parallel/parallel_with_batch_serdes.py"
}
]
}
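For orientation, each catalog entry pairs a handler with its durable configuration and an integration flag. A minimal sketch of how such a catalog might be consumed (a hypothetical loader, not part of this PR; the top-level key holding the array is assumed to be "examples", which the hunk does not show):

import json

# Hypothetical loader: list the integration-test examples from the catalog.
with open("examples/examples-catalog.json") as f:
    catalog = json.load(f)

for example in catalog["examples"]:  # assumed top-level key, not shown in the hunk
    if example.get("integration"):
        print(f"{example['name']}: {example['handler']} ({example['path']})")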
25 changes: 12 additions & 13 deletions examples/src/map/map_operations.py
@@ -1,24 +1,23 @@
"""Example demonstrating map-like operations for processing collections durably."""
"""Example demonstrating map operations for processing collections durably."""

from typing import Any

from aws_durable_execution_sdk_python.config import MapConfig
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution


def square(x: int) -> int:
return x * x


@durable_execution
def handler(_event: Any, context: DurableContext) -> list[int]:
"""Process a list of items using map-like operations."""
"""Process a list of items using context.map()."""
items = [1, 2, 3, 4, 5]

# Process each item as a separate durable step
results = []
for i, item in enumerate(items):
result = context.step(lambda _, x=item: square(x), name=f"square_{i}")
results.append(result)

return results
# Use context.map() to process items concurrently and extract results immediately
return context.map(
inputs=items,
func=lambda ctx, item, index, _: ctx.step(
lambda _: item * 2, name=f"map_item_{index}"
),
name="map_operation",
config=MapConfig(max_concurrency=2),
).get_results()
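Durability aside, the rewritten handler computes the same values as a plain list comprehension. A sketch for intuition only; the real call also checkpoints each step and caps concurrency at 2:

# What the map above computes, minus durability and concurrency control:
items = [1, 2, 3, 4, 5]
results = [item * 2 for item in items]  # [2, 4, 6, 8, 10]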
96 changes: 96 additions & 0 deletions examples/src/map/map_with_batch_serdes.py
@@ -0,0 +1,96 @@
"""Example demonstrating map with batch-level serdes."""

import json
from typing import Any

from aws_durable_execution_sdk_python.concurrency.models import (
BatchItem,
BatchItemStatus,
BatchResult,
CompletionReason,
)
from aws_durable_execution_sdk_python.config import MapConfig
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution
from aws_durable_execution_sdk_python.lambda_service import ErrorObject
from aws_durable_execution_sdk_python.serdes import JsonSerDes, SerDes, SerDesContext


class CustomBatchSerDes(SerDes[BatchResult]):
"""Custom serializer for the entire BatchResult."""

def serialize(self, value: BatchResult, _: SerDesContext) -> str:
# Serialize BatchResult with custom metadata

wrapped = {
"batch_metadata": {
"serializer": "CustomBatchSerDes",
"version": "2.0",
"total_items": len(value.get_results()),
},
"success_count": value.success_count,
"failure_count": value.failure_count,
"results": value.get_results(),
"errors": [e.to_dict() if e else None for e in value.get_errors()],
}
return json.dumps(wrapped)

def deserialize(self, payload: str, _: SerDesContext) -> BatchResult:
wrapped = json.loads(payload)
batch_items = []
results = wrapped["results"]
errors = wrapped["errors"]

for i, result in enumerate(results):
error = errors[i] if i < len(errors) else None
if error:
batch_items.append(
BatchItem(
index=i,
status=BatchItemStatus.FAILED,
result=None,
error=ErrorObject.from_dict(error) if error else None,
)
)
else:
batch_items.append(
BatchItem(
index=i,
status=BatchItemStatus.SUCCEEDED,
result=result,
error=None,
)
)

# Infer completion reason (assume ALL_COMPLETED if all succeeded)
completion_reason = (
CompletionReason.ALL_COMPLETED
if wrapped["failure_count"] == 0
else CompletionReason.FAILURE_TOLERANCE_EXCEEDED
)

return BatchResult(all=batch_items, completion_reason=completion_reason)


@durable_execution
def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
"""Process items with custom batch-level serialization."""
items = [10, 20, 30, 40]

# Use custom serdes for the entire BatchResult, default JSON for individual items
config = MapConfig(serdes=CustomBatchSerDes(), item_serdes=JsonSerDes())

results = context.map(
inputs=items,
func=lambda ctx, item, index, _: ctx.step(
lambda _: item * 2, name=f"double_{index}"
),
name="map_with_batch_serdes",
config=config,
)

return {
"success_count": results.success_count,
"results": results.get_results(),
"sum": sum(results.get_results()),
}
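A minimal standalone round trip of CustomBatchSerDes, assuming BatchResult can be constructed directly (as deserialize above already does) and exposes success_count and failure_count computed from its items; the serdes ignores its SerDesContext argument, so None is passed purely for illustration:

serdes = CustomBatchSerDes()
original = BatchResult(
    all=[
        BatchItem(index=0, status=BatchItemStatus.SUCCEEDED, result=20, error=None),
        BatchItem(index=1, status=BatchItemStatus.SUCCEEDED, result=40, error=None),
    ],
    completion_reason=CompletionReason.ALL_COMPLETED,
)
payload = serdes.serialize(original, None)    # JSON string with "batch_metadata"
restored = serdes.deserialize(payload, None)  # rebuilt BatchResult, 2 succeeded, 0 failed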
63 changes: 63 additions & 0 deletions examples/src/map/map_with_custom_serdes.py
@@ -0,0 +1,63 @@
"""Example demonstrating map with custom serdes."""

import json
from typing import Any

from aws_durable_execution_sdk_python.config import MapConfig
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution
from aws_durable_execution_sdk_python.serdes import SerDes, SerDesContext


class CustomItemSerDes(SerDes[dict[str, Any]]):
"""Custom serializer for individual items that adds metadata."""

def serialize(self, value: dict[str, Any], _: SerDesContext) -> str:
# Add custom metadata during serialization
wrapped = {"data": value, "serialized_by": "CustomItemSerDes", "version": "1.0"}

return json.dumps(wrapped)

def deserialize(self, payload: str, _: SerDesContext) -> dict[str, Any]:
wrapped = json.loads(payload)
# Extract the original data
return wrapped["data"]


@durable_execution
def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
"""Process items with custom item serialization.

This example demonstrates using item_serdes to customize serialization
of individual item results, while using default serialization for the
overall BatchResult.
"""
items = [
{"id": 1, "name": "item1"},
{"id": 2, "name": "item2"},
{"id": 3, "name": "item3"},
]

# Use custom serdes for individual items only
# The BatchResult will use default JSON serialization
config = MapConfig(item_serdes=CustomItemSerDes())

results = context.map(
inputs=items,
func=lambda ctx, item, index, _: ctx.step(
lambda _: {
"processed": item["name"],
"index": index,
"doubled_id": item["id"] * 2,
},
name=f"process_{index}",
),
name="map_with_custom_serdes",
config=config,
)

return {
"success_count": results.success_count,
"results": results.get_results(),
"processed_names": [r["processed"] for r in results.get_results()],
}
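A quick standalone round trip of CustomItemSerDes; as above, the serdes ignores its SerDesContext argument, so passing None here is an illustration-only shortcut:

serdes = CustomItemSerDes()
payload = serdes.serialize({"id": 1, "name": "item1"}, None)
# payload wraps the value: {"data": {...}, "serialized_by": "CustomItemSerDes", "version": "1.0"}
assert serdes.deserialize(payload, None) == {"id": 1, "name": "item1"}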
53 changes: 53 additions & 0 deletions examples/src/map/map_with_failure_tolerance.py
@@ -0,0 +1,53 @@
"""Example demonstrating map with failure tolerance."""

from typing import Any

from aws_durable_execution_sdk_python.config import (
CompletionConfig,
MapConfig,
StepConfig,
)
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution
from aws_durable_execution_sdk_python.retries import RetryStrategyConfig


@durable_execution
def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
"""Process items with failure tolerance."""
items = list(range(1, 11)) # [1, 2, 3, ..., 10]

# Tolerate up to 3 failures
config = MapConfig(
max_concurrency=5,
completion_config=CompletionConfig(tolerated_failure_count=3),
)

# Disable retries so failures happen immediately
step_config = StepConfig(retry_strategy=RetryStrategyConfig(max_attempts=1))

results = context.map(
inputs=items,
func=lambda ctx, item, index, _: ctx.step(
lambda _: _process_with_failures(item),
name=f"item_{index}",
config=step_config,
),
name="map_with_tolerance",
config=config,
)

return {
"success_count": results.success_count,
"failure_count": results.failure_count,
"succeeded": [item.result for item in results.succeeded()],
"failed_count": len(results.failed()),
"completion_reason": results.completion_reason.value,
}


def _process_with_failures(item: int) -> int:
"""Process item - fails for items 3, 6, 9."""
if item % 3 == 0:
raise ValueError(f"Item {item} failed")
return item * 2
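As a sanity check on the tolerance math: of items 1 through 10, exactly the multiples of 3 fail, which matches the configured tolerated_failure_count of 3:

# Standalone check of the failure pattern above (no SDK required):
items = list(range(1, 11))
failing = [i for i in items if i % 3 == 0]  # [3, 6, 9]
assert len(failing) == 3  # equals tolerated_failure_count=3, so still tolerated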
23 changes: 23 additions & 0 deletions examples/src/map/map_with_max_concurrency.py
@@ -0,0 +1,23 @@
"""Example demonstrating map with maxConcurrency limit."""

from typing import Any

from aws_durable_execution_sdk_python.config import MapConfig
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution


@durable_execution
def handler(_event: Any, context: DurableContext) -> list[int]:
"""Process items with concurrency limit of 3."""
items = list(range(1, 11)) # [1, 2, 3, ..., 10]

# Extract results immediately to avoid BatchResult serialization
return context.map(
inputs=items,
func=lambda ctx, item, index, _: ctx.step(
lambda _: item * 3, name=f"process_{index}"
),
name="map_with_concurrency",
config=MapConfig(max_concurrency=3),
).get_results()
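Assuming every step succeeds and get_results() preserves input order, the handler above should return each item tripled; the concurrency cap changes scheduling, not the results:

# Expected return value of the handler above:
expected = [item * 3 for item in range(1, 11)]  # [3, 6, 9, ..., 30]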