144 changes: 144 additions & 0 deletions lambda-durable-esm-and-chaining/README.md
@@ -0,0 +1,144 @@
# Event-Driven Data Pipeline with Lambda Durable Functions

This serverless pattern demonstrates how to build an event-driven data processing pipeline using AWS Lambda Durable Functions with **direct SQS Event Source Mapping** and Lambda invoke chaining.

## How It Works

This pattern demonstrates an event-driven data processing pipeline using AWS Lambda Durable Functions with direct SQS Event Source Mapping. When a message arrives in the SQS queue, it directly triggers the durable function (no intermediary Lambda needed). The durable function then orchestrates a series of specialized processing steps using Lambda invoke chaining - first validating the incoming data, then transforming it (converting data_source to uppercase), and finally storing the processed results in DynamoDB. Throughout this process, the durable function automatically creates checkpoints, enabling fault-tolerant execution that can recover from failures without losing progress. The entire pipeline operates within the 15-minute ESM execution limit, making it ideal for reliable batch processing workflows.

## Architecture Overview

The pattern showcases two key Durable Functions capabilities:
1. **Direct Event Source Mapping**: SQS directly triggers the durable function (15-minute limit)
2. **Lambda Invoke Chaining**: Orchestrates specialized processing functions

![Architecture Diagram](architecture.png)

## Key Features

- **Direct ESM Integration**: No intermediary function needed
- **15-minute execution constraint**: Demonstrates ESM time limits
- **Fault-tolerant processing**: Automatic checkpointing and recovery
- **Microservices coordination**: Chains specialized Lambda functions
- **Batch processing**: Handles multiple SQS records per invocation
- **Simple storage**: Uses DynamoDB for processed data

## Important ESM Constraints

⚠️ **15-Minute Execution Limit**: When using Event Source Mapping with Durable Functions, total execution time cannot exceed 15 minutes. This budget covers:
- All processing steps
- Chained function invocations

Long wait operations are not practical within this limit and should be avoided.

## Use Cases

- ETL pipelines with validation and transformation
- Event-driven microservices orchestration
- Batch processing with fault tolerance
- Data processing workflows requiring checkpointing

## Prerequisites

- [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) configured with appropriate permissions
- [AWS SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/install-sam-cli.html) latest version installed
- [Python 3.14](https://www.python.org/downloads/release/python-3140/) runtime installed

## Deployment

1. **Build the application**:
```bash
sam build
```

2. **Deploy to AWS**:
```bash
sam deploy --guided
```

Note the outputs after deployment:
- `DataProcessingQueueUrl`: Use this for `<QUEUE_URL>`
- `ProcessedDataTable`: Use this for `<PROCESSED_DATA_TABLE>`

3. **Test the pipeline**:
```bash
# Send a test message to SQS
aws sqs send-message \
--queue-url <QUEUE_URL> \
--message-body '{"data_source": "test.csv", "processing_type": "standard"}'
```
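
To exercise batch processing (the mapping is configured for up to 5 records per invocation; see ESM-Specific Considerations below), you can send several messages in quick succession. The file names here are illustrative:

```bash
for i in 1 2 3 4 5; do
  aws sqs send-message \
    --queue-url <QUEUE_URL> \
    --message-body "{\"data_source\": \"file-$i.csv\", \"processing_type\": \"standard\"}"
done
```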

4. **Verify successful processing**:
```bash
# Check if data was processed and stored in DynamoDB
aws dynamodb scan --table-name <PROCESSED_DATA_TABLE> --query 'Items[*]'
```

**Success indicators:**
- You should see at least one item in the DynamoDB table
- Original input data: `"data_source": "test.csv"`
- Transformed data: `"data_source": "TEST.CSV"` (uppercase transformation applied)
- Execution tracking with unique `execution_id`
- Timestamps showing when data was processed and stored

This confirms the entire pipeline worked: SQS → Durable Function → Validation → Transformation → Storage → DynamoDB
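
For reference, an item written by the storage function (`src/storage/handler.py`) looks roughly like the following. Values are illustrative, and note that the `aws dynamodb scan` output wraps each attribute in DynamoDB type descriptors such as `S` and `M`:

```json
{
  "execution_id": "dev-esm-<sqs-message-id>",
  "data_source": "test.csv",
  "processing_type": "standard",
  "original_data": {"data_source": "test.csv", "processing_type": "standard"},
  "transformed_data": {
    "data_source": "TEST.CSV",
    "processing_type": "standard",
    "transformation_applied": "uppercase_data_source",
    "processed_at": "2025-01-01T12:00:00.000000",
    "execution_id": "dev-esm-<sqs-message-id>",
    "original_data": {"data_source": "test.csv", "processing_type": "standard"}
  },
  "stored_at": "2025-01-01T12:00:00.000000"
}
```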

## Components

### 1. Durable Pipeline Function (`src/durable_pipeline/`)
- **Direct SQS Event Source Mapping**: Receives SQS events directly
- **15-minute execution limit**: Must complete all processing within ESM constraints
- **Batch processing**: Handles multiple SQS records per invocation
- **Lambda invoke chaining**: Orchestrates validation, transformation, and storage
- **Automatic checkpointing**: Recovers from failures without losing progress

### 2. Specialized Processing Functions
- **Validation Function**: Simple data validation checks (see the sketch after this list)
- **Transformation Function**: Basic data transformation
- **Storage Function**: Persists processed data to DynamoDB
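
The validation handler's source is not included in this excerpt. The orchestrator only relies on its response carrying an `is_valid` flag (see `src/durable_pipeline/handler.py`), so a minimal sketch could look like the following; the specific field checks are illustrative assumptions:

```python
from typing import Any, Dict


def lambda_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """Simple data validation function (illustrative sketch)."""
    data = event['data']
    execution_id = event['execution_id']

    # Hypothetical checks: require a data_source and a recognized processing_type
    errors = []
    if not data.get('data_source'):
        errors.append('data_source is required')
    if data.get('processing_type', 'standard') not in ('standard', 'priority'):
        errors.append('unsupported processing_type')

    return {
        'is_valid': not errors,
        'errors': errors,
        'execution_id': execution_id
    }
```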

## Monitoring

- CloudWatch Logs for execution tracking (example command below)
- DynamoDB table for processed data
- SQS DLQ for failed messages
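
For example, to follow the orchestrator's logs during a test run (substitute the deployed pipeline function's name; the log group name assumes Lambda's default `/aws/lambda/<function-name>` naming):

```bash
aws logs tail /aws/lambda/<DURABLE_PIPELINE_FUNCTION_NAME> --follow
```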

## Configuration

Key environment variables:
- `ENVIRONMENT`: Deployment environment (dev/prod)
- `PROCESSED_DATA_TABLE`: DynamoDB table for processed data
- `VALIDATION_FUNCTION_ARN`: ARN of validation function
- `TRANSFORMATION_FUNCTION_ARN`: ARN of transformation function
- `STORAGE_FUNCTION_ARN`: ARN of storage function

## ESM-Specific Considerations

- **Execution Timeout**: Set to 900 seconds (15 minutes) maximum
- **Batch Size**: Configured for 5 records per invocation (see the check below)
- **Error Handling**: Uses SQS DLQ for failed batches
- **Efficient Processing**: Optimized for speed to stay within time limits
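
To confirm the mapping's batch size and state after deployment, you can list the function's event source mappings (the function name placeholder is yours to substitute):

```bash
aws lambda list-event-source-mappings \
  --function-name <DURABLE_PIPELINE_FUNCTION_NAME> \
  --query 'EventSourceMappings[*].{UUID:UUID,BatchSize:BatchSize,State:State}'
```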

## Error Handling

- Automatic retries with exponential backoff
- Dead Letter Queue for failed messages
- Partial batch failure support (see the sketch after this list)
- Checkpoint-based recovery
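
Note that the included handler returns a processing summary rather than a partial batch response. If you enable `ReportBatchItemFailures` on the event source mapping, the handler response would instead need to list the failed message IDs in the shape Lambda expects. A minimal sketch of how `src/durable_pipeline/handler.py` could be adapted (an assumption, not part of the shipped code):

```python
from typing import Any, Dict, List


def build_partial_batch_response(batch_results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Translate per-record results into the SQS partial batch failure format."""
    return {
        'batchItemFailures': [
            {'itemIdentifier': result['message_id']}
            for result in batch_results
            if result['status'] in ('failed', 'error')
        ]
    }
```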

## Cost Optimization

- Pay only for active compute time
- Efficient batch processing
- Automatic scaling based on queue depth

## Cleanup

```bash
sam delete
```

## Learn More

- [AWS Lambda Durable Functions Documentation](https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html)
- [Event Source Mappings with Durable Functions](https://docs.aws.amazon.com/lambda/latest/dg/durable-invoking-esm.html)
- [Lambda Invoke Chaining](https://docs.aws.amazon.com/lambda/latest/dg/durable-examples.html#durable-examples-chained-invocations)
Binary file added lambda-durable-esm-and-chaining/architecture.png
67 changes: 67 additions & 0 deletions lambda-durable-esm-and-chaining/example-pattern.json
@@ -0,0 +1,67 @@
{
"title": "Event-Driven Data Pipeline with Lambda Durable Functions",
"description": "This serverless pattern demonstrates how to build an event-driven data processing pipeline using AWS Lambda Durable Functions with direct SQS Event Source Mapping and Lambda invoke chaining.",
"language": "Python",
"level": "200",
"framework": "SAM",
"services": ["sqs","lambda", "dynamoDB"],
"introBox": {
"headline": "How it works",
"text": [
"This pattern demonstrates an event-driven data processing pipeline using AWS Lambda Durable Functions with direct SQS Event Source Mapping. When a message arrives in the SQS queue, it directly triggers the durable function (no intermediary Lambda needed). The durable function then orchestrates a series of specialized processing steps using Lambda invoke chaining - first validating the incoming data, then transforming it (converting data_source to uppercase), and finally storing the processed results in DynamoDB. Throughout this process, the durable function automatically creates checkpoints, enabling fault-tolerant execution that can recover from failures without losing progress. The entire pipeline operates within the 15-minute ESM execution limit, making it ideal for reliable batch processing workflows."
]
},
"testing": {
"headline": "Testing",
"text": [
"See the GitHub repo for detailed testing instructions."
]
},
"cleanup": {
"headline": "Cleanup",
"text": [
"Delete the stack: <code>sam delete</code>."
]
},
"deploy": {
"text": [
"sam build",
"sam deploy --guided"
]
},
"gitHub": {
"template": {
"repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/lambda-durable-esm-and-chaining",
"templateURL":"serverles-patterns/lambda-durable-esm-and-chaining",
"templateFile": "template.yaml",
"projectFolder": "lambda-durable-esm-and-chaining"
}
},
"resources": {
"headline": "Additional resources",
"bullets": [
{
"text": "AWS Lambda Durable Functions Documentation",
"link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html"
},
{
"text": "Event Source Mappings with Durable Functions",
"link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-invoking-esm.html"
},
{
"text": "Durbale Function Lambda Invoke Chaining",
"link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-examples.html#durable-examples-chained-invocations"
}
]
},
"authors": [
{
"name": "Sahithi Ginjupalli",
"image": "https://drive.google.com/file/d/1YcKYuGz3LfzSxiwb2lWJfpyi49SbvOSr/view?usp=sharing",
"bio": "Cloud Engineer at AWS with a passion for diving deep into cloud and AI services to build innovative serverless applications.",
"linkedin": "ginjupalli-sahithi-37460a18b",
"twitter": ""
}
]
}

100 changes: 100 additions & 0 deletions lambda-durable-esm-and-chaining/src/durable_pipeline/handler.py
@@ -0,0 +1,100 @@
import json
import os
import boto3
from datetime import datetime
from typing import Dict, Any, List
from aws_durable_execution_sdk_python import DurableContext, durable_execution

# Initialize AWS clients
dynamodb = boto3.resource('dynamodb')
lambda_client = boto3.client('lambda')

@durable_execution
def lambda_handler(event: Dict[str, Any], context: DurableContext) -> Dict[str, Any]:
"""
Main durable pipeline function that processes SQS events directly via ESM.
Demonstrates lambda invoke chaining with checkpointing and recovery.
Limited to 15 minutes total execution time due to ESM constraints.
"""

# Extract configuration from environment
validation_function_arn = os.environ['VALIDATION_FUNCTION_ARN']
transformation_function_arn = os.environ['TRANSFORMATION_FUNCTION_ARN']
storage_function_arn = os.environ['STORAGE_FUNCTION_ARN']
processed_data_table = os.environ['PROCESSED_DATA_TABLE']
environment = os.environ.get('ENVIRONMENT', 'dev')

print(f"Processing SQS batch with {len(event.get('Records', []))} records")

# Process each SQS record in the batch
batch_results = []

for record in event.get('Records', []):
try:
# Extract data from SQS record
message_id = record['messageId']
data = json.loads(record['body'])
execution_name = f"{environment}-esm-{message_id}"

print(f"Processing record: {message_id}")

# Step 1: Validate data by invoking validation function
validation_result = context.invoke(
validation_function_arn,
{'data': data, 'execution_id': execution_name},
name=f'validate-data-{message_id}'
)

if not validation_result.get('is_valid', False):
batch_results.append({
'message_id': message_id,
'status': 'failed',
'reason': 'validation_failed'
})
continue

# Step 2: Transform data by invoking transformation function
transformation_result = context.invoke(
transformation_function_arn,
{'data': data, 'execution_id': execution_name},
name=f'transform-data-{message_id}'
)

# Step 3: Store processed data by invoking storage function
storage_result = context.invoke(
storage_function_arn,
{
'transformed_data': transformation_result,
'execution_id': execution_name,
'original_data': data
},
name=f'store-data-{message_id}'
)

batch_results.append({
'message_id': message_id,
'status': 'completed',
'execution_id': execution_name
})

except Exception as e:
print(f"Error processing record {record.get('messageId', 'unknown')}: {str(e)}")
batch_results.append({
'message_id': record.get('messageId', 'unknown'),
'status': 'error',
'error': str(e)
})

# Return batch processing summary
successful_records = len([r for r in batch_results if r['status'] == 'completed'])
failed_records = len([r for r in batch_results if r['status'] in ['failed', 'error']])

return {
'batch_summary': {
'total_records': len(batch_results),
'successful_records': successful_records,
'failed_records': failed_records
},
'record_results': batch_results,
'processed_at': datetime.utcnow().isoformat()
}
@@ -0,0 +1 @@
aws-durable-execution-sdk-python
47 changes: 47 additions & 0 deletions lambda-durable-esm-and-chaining/src/storage/handler.py
@@ -0,0 +1,47 @@
import boto3
import os
from typing import Dict, Any
from datetime import datetime

dynamodb = boto3.resource('dynamodb')

def lambda_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
"""Simple data storage function that saves to DynamoDB"""

transformed_data = event['transformed_data']
execution_id = event['execution_id']
original_data = event['original_data']

table_name = os.environ['PROCESSED_DATA_TABLE']

print(f"Storing processed data for execution: {execution_id}")

try:
table = dynamodb.Table(table_name)

# Store processed data in DynamoDB
item = {
'execution_id': execution_id,
'original_data': original_data,
'transformed_data': transformed_data,
'stored_at': datetime.utcnow().isoformat(),
'data_source': original_data.get('data_source', 'unknown'),
'processing_type': original_data.get('processing_type', 'standard')
}

table.put_item(Item=item)

return {
'success': True,
'execution_id': execution_id,
'table_name': table_name,
'stored_at': datetime.utcnow().isoformat()
}

except Exception as e:
print(f"Error storing data: {str(e)}")
return {
'success': False,
'execution_id': execution_id,
'error': str(e)
}
22 changes: 22 additions & 0 deletions lambda-durable-esm-and-chaining/src/transformation/handler.py
@@ -0,0 +1,22 @@
from typing import Dict, Any
from datetime import datetime

def lambda_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
"""Simple data transformation function"""

data = event['data']
execution_id = event['execution_id']

print(f"Transforming data for execution: {execution_id}")

# Simple transformation: add processing metadata and uppercase data_source
transformed_data = {
'original_data': data,
'data_source': data.get('data_source', '').upper(),
'processing_type': data.get('processing_type', 'standard'),
'processed_at': datetime.utcnow().isoformat(),
'execution_id': execution_id,
'transformation_applied': 'uppercase_data_source'
}

return transformed_data