diff --git a/lambda-durable-esm-and-chaining/README.md b/lambda-durable-esm-and-chaining/README.md
new file mode 100644
index 000000000..d3a45816c
--- /dev/null
+++ b/lambda-durable-esm-and-chaining/README.md
@@ -0,0 +1,144 @@
+# Event-Driven Data Pipeline with Lambda Durable Functions
+
+This serverless pattern demonstrates how to build an event-driven data processing pipeline using AWS Lambda Durable Functions with **direct SQS Event Source Mapping** and Lambda invoke chaining.
+
+## How It Works
+
+When a message arrives in the SQS queue, it triggers the durable function directly via Event Source Mapping (no intermediary Lambda needed). The durable function then orchestrates a series of specialized processing steps using Lambda invoke chaining: it first validates the incoming data, then transforms it (converting `data_source` to uppercase), and finally stores the processed results in DynamoDB. Throughout this process, the durable function automatically creates checkpoints, enabling fault-tolerant execution that can recover from failures without losing progress. The entire pipeline operates within the 15-minute ESM execution limit, making it a good fit for reliable batch processing workflows.
+
+## Architecture Overview
+
+The pattern showcases two key Durable Functions capabilities:
+1. **Direct Event Source Mapping**: SQS directly triggers the durable function (15-minute limit)
+2. **Lambda Invoke Chaining**: Orchestrates specialized processing functions
+
+![Architecture Diagram](architecture.png)
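+
+At its core, the orchestration is a short chain of `context.invoke` calls. The condensed sketch below shows the per-record flow implemented in `src/durable_pipeline/handler.py` (batch bookkeeping and error handling omitted; see the full source in this pattern):
+
+```python
+import json
+import os
+
+from aws_durable_execution_sdk_python import DurableContext, durable_execution
+
+@durable_execution
+def lambda_handler(event, context: DurableContext):
+    for record in event.get("Records", []):
+        data = json.loads(record["body"])
+        execution_id = f"esm-{record['messageId']}"
+
+        # Each context.invoke call is a checkpointed step: a retried
+        # execution resumes after the last completed invoke.
+        validation = context.invoke(
+            os.environ["VALIDATION_FUNCTION_ARN"],
+            {"data": data, "execution_id": execution_id},
+            name=f"validate-data-{record['messageId']}",
+        )
+        if not validation.get("is_valid", False):
+            continue  # skip records that fail validation
+
+        transformed = context.invoke(
+            os.environ["TRANSFORMATION_FUNCTION_ARN"],
+            {"data": data, "execution_id": execution_id},
+            name=f"transform-data-{record['messageId']}",
+        )
+        context.invoke(
+            os.environ["STORAGE_FUNCTION_ARN"],
+            {"transformed_data": transformed, "execution_id": execution_id, "original_data": data},
+            name=f"store-data-{record['messageId']}",
+        )
+```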
+
+## Key Features
+
+- **Direct ESM Integration**: No intermediary function needed
+- **15-minute execution constraint**: Demonstrates ESM time limits
+- **Fault-tolerant processing**: Automatic checkpointing and recovery
+- **Microservices coordination**: Chains specialized Lambda functions
+- **Batch processing**: Handles multiple SQS records per invocation
+- **Simple storage**: Uses DynamoDB for processed data
+
+## Important ESM Constraints
+
+⚠️ **15-Minute Execution Limit**: When using Event Source Mapping with Durable Functions, the total execution time cannot exceed 15 minutes. This budget covers:
+- All processing steps
+- All chained function invocations
+
+Long wait operations (for example, timers or callbacks) do not fit within this limit.
+
+## Use Cases
+
+- ETL pipelines with validation and transformation
+- Event-driven microservices orchestration
+- Batch processing with fault tolerance
+- Data processing workflows requiring checkpointing
+
+## Prerequisites
+
+- [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) configured with appropriate permissions
+- [AWS SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/install-sam-cli.html) latest version installed
+- [Python 3.14](https://www.python.org/downloads/release/python-3140/) runtime installed
+
+## Deployment
+
+1. **Build the application**:
+    ```bash
+    sam build
+    ```
+
+2. **Deploy to AWS**:
+    ```bash
+    sam deploy --guided
+    ```
+
+    Note the outputs after deployment:
+    - `DataProcessingQueueUrl`: Use this for `<queue-url>`
+    - `ProcessedDataTable`: Use this for `<table-name>`
+
+3. **Test the pipeline**:
+    ```bash
+    # Send a test message to SQS
+    aws sqs send-message \
+      --queue-url <queue-url> \
+      --message-body '{"data_source": "test.csv", "processing_type": "standard"}'
+    ```
+
+4. **Verify successful processing**:
+    ```bash
+    # Check if data was processed and stored in DynamoDB
+    aws dynamodb scan --table-name <table-name> --query 'Items[*]'
+    ```
+
+    **Success indicators:**
+    - You should see at least one item in the DynamoDB table
+    - Original input data: `"data_source": "test.csv"`
+    - Transformed data: `"data_source": "TEST.CSV"` (uppercase transformation applied)
+    - Execution tracking with a unique `execution_id`
+    - Timestamps showing when data was processed and stored
+
+    This confirms the entire pipeline worked: SQS → Durable Function → Validation → Transformation → Storage → DynamoDB
+
+## Components
+
+### 1. Durable Pipeline Function (`src/durable_pipeline/`)
+- **Direct SQS Event Source Mapping**: Receives SQS events directly
+- **15-minute execution limit**: Must complete all processing within ESM constraints
+- **Batch processing**: Handles multiple SQS records per invocation
+- **Lambda invoke chaining**: Orchestrates validation, transformation, and storage
+- **Automatic checkpointing**: Recovers from failures without losing progress
+
+### 2. Specialized Processing Functions
+- **Validation Function**: Simple data validation checks
+- **Transformation Function**: Basic data transformation
+- **Storage Function**: Persists processed data to DynamoDB
+
+## Monitoring
+
+- CloudWatch Logs for execution tracking
+- DynamoDB table for processed data
+- SQS DLQ for failed messages
+
+## Configuration
+
+Key environment variables:
+- `ENVIRONMENT`: Deployment environment (dev/prod)
+- `PROCESSED_DATA_TABLE`: DynamoDB table for processed data
+- `VALIDATION_FUNCTION_ARN`: ARN of the validation function
+- `TRANSFORMATION_FUNCTION_ARN`: ARN of the transformation function
+- `STORAGE_FUNCTION_ARN`: ARN of the storage function
+
+## ESM-Specific Considerations
+
+- **Execution Timeout**: Set to 900 seconds (15 minutes) maximum
+- **Batch Size**: Configured for efficient processing (5 records)
+- **Error Handling**: Uses an SQS DLQ for failed batches
+- **Efficient Processing**: Optimized for speed to stay within time limits
+
+## Error Handling
+
+- Automatic retries with exponential backoff
+- Dead Letter Queue for failed messages
+- Partial batch failure support (see the sketch below)
+- Checkpoint-based recovery
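+
+As written, the handler reports per-record status in its return value and relies on the queue's redrive policy (three receives, then DLQ) for retries. If you want SQS to retry only the records that failed, you can enable Lambda's partial batch response by adding `FunctionResponseTypes: [ReportBatchItemFailures]` to the event source mapping in `template.yaml` and returning the failed message IDs. A hypothetical variant, not wired up in this template:
+
+```python
+# Hypothetical helper, assuming ReportBatchItemFailures is enabled on the
+# event source mapping: SQS then retries only the reported message IDs.
+def to_partial_batch_response(batch_results):
+    failed = [r for r in batch_results if r["status"] in ("failed", "error")]
+    return {"batchItemFailures": [{"itemIdentifier": r["message_id"]} for r in failed]}
+```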
+
+## Cost Optimization
+
+- Pay only for active compute time
+- Efficient batch processing
+- Automatic scaling based on queue depth
+
+## Cleanup
+
+```bash
+sam delete
+```
+
+## Learn More
+
+- [AWS Lambda Durable Functions Documentation](https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html)
+- [Event Source Mappings with Durable Functions](https://docs.aws.amazon.com/lambda/latest/dg/durable-invoking-esm.html)
+- [Lambda Invoke Chaining](https://docs.aws.amazon.com/lambda/latest/dg/durable-examples.html#durable-examples-chained-invocations)
diff --git a/lambda-durable-esm-and-chaining/architecture.png b/lambda-durable-esm-and-chaining/architecture.png
new file mode 100644
index 000000000..3971b1d18
Binary files /dev/null and b/lambda-durable-esm-and-chaining/architecture.png differ
diff --git a/lambda-durable-esm-and-chaining/example-pattern.json b/lambda-durable-esm-and-chaining/example-pattern.json
new file mode 100644
index 000000000..23538ec51
--- /dev/null
+++ b/lambda-durable-esm-and-chaining/example-pattern.json
@@ -0,0 +1,67 @@
+{
+  "title": "Event-Driven Data Pipeline with Lambda Durable Functions",
+  "description": "This serverless pattern demonstrates how to build an event-driven data processing pipeline using AWS Lambda Durable Functions with direct SQS Event Source Mapping and Lambda invoke chaining.",
+  "language": "Python",
+  "level": "200",
+  "framework": "SAM",
+  "services": ["sqs", "lambda", "dynamodb"],
+  "introBox": {
+    "headline": "How it works",
+    "text": [
+      "This pattern demonstrates an event-driven data processing pipeline using AWS Lambda Durable Functions with direct SQS Event Source Mapping. When a message arrives in the SQS queue, it directly triggers the durable function (no intermediary Lambda needed). The durable function then orchestrates a series of specialized processing steps using Lambda invoke chaining: first validating the incoming data, then transforming it (converting data_source to uppercase), and finally storing the processed results in DynamoDB. Throughout this process, the durable function automatically creates checkpoints, enabling fault-tolerant execution that can recover from failures without losing progress. The entire pipeline operates within the 15-minute ESM execution limit, making it a good fit for reliable batch processing workflows."
+    ]
+  },
+  "testing": {
+    "headline": "Testing",
+    "text": [
+      "See the GitHub repo for detailed testing instructions."
+    ]
+  },
+  "cleanup": {
+    "headline": "Cleanup",
+    "text": [
+      "Delete the stack: sam delete."
+    ]
+  },
+  "deploy": {
+    "text": [
+      "sam build",
+      "sam deploy --guided"
+    ]
+  },
+  "gitHub": {
+    "template": {
+      "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/lambda-durable-esm-and-chaining",
+      "templateURL": "serverless-patterns/lambda-durable-esm-and-chaining",
+      "templateFile": "template.yaml",
+      "projectFolder": "lambda-durable-esm-and-chaining"
+    }
+  },
+  "resources": {
+    "headline": "Additional resources",
+    "bullets": [
+      {
+        "text": "AWS Lambda Durable Functions Documentation",
+        "link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html"
+      },
+      {
+        "text": "Event Source Mappings with Durable Functions",
+        "link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-invoking-esm.html"
+      },
+      {
+        "text": "Durable Functions Lambda Invoke Chaining",
+        "link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-examples.html#durable-examples-chained-invocations"
+      }
+    ]
+  },
+  "authors": [
+    {
+      "name": "Sahithi Ginjupalli",
+      "image": "https://drive.google.com/file/d/1YcKYuGz3LfzSxiwb2lWJfpyi49SbvOSr/view?usp=sharing",
+      "bio": "Cloud Engineer at AWS with a passion for diving deep into cloud and AI services to build innovative serverless applications.",
+      "linkedin": "ginjupalli-sahithi-37460a18b",
+      "twitter": ""
+    }
+  ]
+}
\ No newline at end of file
diff --git a/lambda-durable-esm-and-chaining/src/durable_pipeline/handler.py b/lambda-durable-esm-and-chaining/src/durable_pipeline/handler.py
new file mode 100644
index 000000000..ac529e02b
--- /dev/null
+++ b/lambda-durable-esm-and-chaining/src/durable_pipeline/handler.py
@@ -0,0 +1,100 @@
+import json
+import os
+from datetime import datetime, timezone
+from typing import Any, Dict
+
+from aws_durable_execution_sdk_python import DurableContext, durable_execution
+
+@durable_execution
+def lambda_handler(event: Dict[str, Any], context: DurableContext) -> Dict[str, Any]:
+    """
+    Main durable pipeline function that processes SQS events directly via ESM.
+    Demonstrates Lambda invoke chaining with checkpointing and recovery.
+    Limited to 15 minutes total execution time due to ESM constraints.
+    """
+ """ + + # Extract configuration from environment + validation_function_arn = os.environ['VALIDATION_FUNCTION_ARN'] + transformation_function_arn = os.environ['TRANSFORMATION_FUNCTION_ARN'] + storage_function_arn = os.environ['STORAGE_FUNCTION_ARN'] + processed_data_table = os.environ['PROCESSED_DATA_TABLE'] + environment = os.environ.get('ENVIRONMENT', 'dev') + + print(f"Processing SQS batch with {len(event.get('Records', []))} records") + + # Process each SQS record in the batch + batch_results = [] + + for record in event.get('Records', []): + try: + # Extract data from SQS record + message_id = record['messageId'] + data = json.loads(record['body']) + execution_name = f"{environment}-esm-{message_id}" + + print(f"Processing record: {message_id}") + + # Step 1: Validate data by invoking validation function + validation_result = context.invoke( + validation_function_arn, + {'data': data, 'execution_id': execution_name}, + name=f'validate-data-{message_id}' + ) + + if not validation_result.get('is_valid', False): + batch_results.append({ + 'message_id': message_id, + 'status': 'failed', + 'reason': 'validation_failed' + }) + continue + + # Step 2: Transform data by invoking transformation function + transformation_result = context.invoke( + transformation_function_arn, + {'data': data, 'execution_id': execution_name}, + name=f'transform-data-{message_id}' + ) + + # Step 3: Store processed data by invoking storage function + storage_result = context.invoke( + storage_function_arn, + { + 'transformed_data': transformation_result, + 'execution_id': execution_name, + 'original_data': data + }, + name=f'store-data-{message_id}' + ) + + batch_results.append({ + 'message_id': message_id, + 'status': 'completed', + 'execution_id': execution_name + }) + + except Exception as e: + print(f"Error processing record {record.get('messageId', 'unknown')}: {str(e)}") + batch_results.append({ + 'message_id': record.get('messageId', 'unknown'), + 'status': 'error', + 'error': str(e) + }) + + # Return batch processing summary + successful_records = len([r for r in batch_results if r['status'] == 'completed']) + failed_records = len([r for r in batch_results if r['status'] in ['failed', 'error']]) + + return { + 'batch_summary': { + 'total_records': len(batch_results), + 'successful_records': successful_records, + 'failed_records': failed_records + }, + 'record_results': batch_results, + 'processed_at': datetime.utcnow().isoformat() + } diff --git a/lambda-durable-esm-and-chaining/src/durable_pipeline/requirements.txt b/lambda-durable-esm-and-chaining/src/durable_pipeline/requirements.txt new file mode 100644 index 000000000..85d9afabd --- /dev/null +++ b/lambda-durable-esm-and-chaining/src/durable_pipeline/requirements.txt @@ -0,0 +1 @@ +aws-durable-execution-sdk-python diff --git a/lambda-durable-esm-and-chaining/src/storage/handler.py b/lambda-durable-esm-and-chaining/src/storage/handler.py new file mode 100644 index 000000000..e901e3c2c --- /dev/null +++ b/lambda-durable-esm-and-chaining/src/storage/handler.py @@ -0,0 +1,47 @@ +import boto3 +import os +from typing import Dict, Any +from datetime import datetime + +dynamodb = boto3.resource('dynamodb') + +def lambda_handler(event: Dict[str, Any], context) -> Dict[str, Any]: + """Simple data storage function that saves to DynamoDB""" + + transformed_data = event['transformed_data'] + execution_id = event['execution_id'] + original_data = event['original_data'] + + table_name = os.environ['PROCESSED_DATA_TABLE'] + + print(f"Storing processed data for 
+
+            # Step 2: Transform data by invoking the transformation function
+            transformation_result = context.invoke(
+                transformation_function_arn,
+                {'data': data, 'execution_id': execution_name},
+                name=f'transform-data-{message_id}'
+            )
+
+            # Step 3: Store processed data by invoking the storage function
+            context.invoke(
+                storage_function_arn,
+                {
+                    'transformed_data': transformation_result,
+                    'execution_id': execution_name,
+                    'original_data': data
+                },
+                name=f'store-data-{message_id}'
+            )
+
+            batch_results.append({
+                'message_id': message_id,
+                'status': 'completed',
+                'execution_id': execution_name
+            })
+
+        except Exception as e:
+            print(f"Error processing record {record.get('messageId', 'unknown')}: {str(e)}")
+            batch_results.append({
+                'message_id': record.get('messageId', 'unknown'),
+                'status': 'error',
+                'error': str(e)
+            })
+
+    # Return a batch processing summary
+    successful_records = len([r for r in batch_results if r['status'] == 'completed'])
+    failed_records = len([r for r in batch_results if r['status'] in ['failed', 'error']])
+
+    return {
+        'batch_summary': {
+            'total_records': len(batch_results),
+            'successful_records': successful_records,
+            'failed_records': failed_records
+        },
+        'record_results': batch_results,
+        'processed_at': datetime.now(timezone.utc).isoformat()
+    }
diff --git a/lambda-durable-esm-and-chaining/src/durable_pipeline/requirements.txt b/lambda-durable-esm-and-chaining/src/durable_pipeline/requirements.txt
new file mode 100644
index 000000000..85d9afabd
--- /dev/null
+++ b/lambda-durable-esm-and-chaining/src/durable_pipeline/requirements.txt
@@ -0,0 +1 @@
+aws-durable-execution-sdk-python
diff --git a/lambda-durable-esm-and-chaining/src/storage/handler.py b/lambda-durable-esm-and-chaining/src/storage/handler.py
new file mode 100644
index 000000000..e901e3c2c
--- /dev/null
+++ b/lambda-durable-esm-and-chaining/src/storage/handler.py
@@ -0,0 +1,47 @@
+import os
+from datetime import datetime, timezone
+from typing import Any, Dict
+
+import boto3
+
+dynamodb = boto3.resource('dynamodb')
+
+def lambda_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
+    """Simple data storage function that saves processed data to DynamoDB"""
+
+    transformed_data = event['transformed_data']
+    execution_id = event['execution_id']
+    original_data = event['original_data']
+
+    table_name = os.environ['PROCESSED_DATA_TABLE']
+
+    print(f"Storing processed data for execution: {execution_id}")
+
+    try:
+        table = dynamodb.Table(table_name)
+
+        # Store the processed data in DynamoDB
+        item = {
+            'execution_id': execution_id,
+            'original_data': original_data,
+            'transformed_data': transformed_data,
+            'stored_at': datetime.now(timezone.utc).isoformat(),
+            'data_source': original_data.get('data_source', 'unknown'),
+            'processing_type': original_data.get('processing_type', 'standard')
+        }
+
+        table.put_item(Item=item)
+
+        return {
+            'success': True,
+            'execution_id': execution_id,
+            'table_name': table_name,
+            'stored_at': item['stored_at']
+        }
+
+    except Exception as e:
+        print(f"Error storing data: {str(e)}")
+        return {
+            'success': False,
+            'execution_id': execution_id,
+            'error': str(e)
+        }
diff --git a/lambda-durable-esm-and-chaining/src/transformation/handler.py b/lambda-durable-esm-and-chaining/src/transformation/handler.py
new file mode 100644
index 000000000..2a9d018f3
--- /dev/null
+++ b/lambda-durable-esm-and-chaining/src/transformation/handler.py
@@ -0,0 +1,22 @@
+from datetime import datetime, timezone
+from typing import Any, Dict
+
+def lambda_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
+    """Simple data transformation function"""
+
+    data = event['data']
+    execution_id = event['execution_id']
+
+    print(f"Transforming data for execution: {execution_id}")
+
+    # Simple transformation: add processing metadata and uppercase data_source
+    transformed_data = {
+        'original_data': data,
+        'data_source': data.get('data_source', '').upper(),
+        'processing_type': data.get('processing_type', 'standard'),
+        'processed_at': datetime.now(timezone.utc).isoformat(),
+        'execution_id': execution_id,
+        'transformation_applied': 'uppercase_data_source'
+    }
+
+    return transformed_data
diff --git a/lambda-durable-esm-and-chaining/src/validation/handler.py b/lambda-durable-esm-and-chaining/src/validation/handler.py
new file mode 100644
index 000000000..3b97786d5
--- /dev/null
+++ b/lambda-durable-esm-and-chaining/src/validation/handler.py
@@ -0,0 +1,22 @@
+from typing import Any, Dict
+
+def lambda_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
+    """Simple data validation function"""
+
+    data = event['data']
+    execution_id = event['execution_id']
+
+    print(f"Validating data for execution: {execution_id}")
+
+    # Simple validation: check that the required fields exist and are non-empty
+    required_fields = ['data_source', 'processing_type']
+    missing_fields = [field for field in required_fields if not data.get(field)]
+
+    is_valid = len(missing_fields) == 0
+
+    return {
+        'is_valid': is_valid,
+        'execution_id': execution_id,
+        'missing_fields': missing_fields,
+        'data_source': data.get('data_source', 'unknown')
+    }
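+
+if __name__ == "__main__":
+    # Local smoke test (no AWS calls), using the same sample payload as the
+    # README's SQS test message. Run with: python handler.py
+    sample_event = {
+        "data": {"data_source": "test.csv", "processing_type": "standard"},
+        "execution_id": "local-test",
+    }
+    print(lambda_handler(sample_event, None))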
diff --git a/lambda-durable-esm-and-chaining/template.yaml b/lambda-durable-esm-and-chaining/template.yaml
new file mode 100644
index 000000000..ca11770de
--- /dev/null
+++ b/lambda-durable-esm-and-chaining/template.yaml
@@ -0,0 +1,148 @@
+AWSTemplateFormatVersion: '2010-09-09'
+Transform: AWS::Serverless-2016-10-31
+Description: 'Event-driven data pipeline with Lambda Durable Functions, SQS Event Source Mapping, and invoke chaining'
+
+Globals:
+  Function:
+    Timeout: 900
+    MemorySize: 512
+    Runtime: python3.14
+
+Parameters:
+  Environment:
+    Type: String
+    Default: dev
+    Description: Environment name
+
+Resources:
+  # SQS queue for incoming data processing requests. The visibility timeout
+  # (960s) exceeds the function timeout (900s) so messages are not redelivered
+  # while a batch is still being processed.
+  DataProcessingQueue:
+    Type: AWS::SQS::Queue
+    Properties:
+      QueueName: !Sub '${Environment}-data-processing-queue'
+      VisibilityTimeout: 960
+      MessageRetentionPeriod: 1209600
+      RedrivePolicy:
+        deadLetterTargetArn: !GetAtt DataProcessingDLQ.Arn
+        maxReceiveCount: 3
+
+  # Dead Letter Queue
+  DataProcessingDLQ:
+    Type: AWS::SQS::Queue
+    Properties:
+      QueueName: !Sub '${Environment}-data-processing-dlq'
+
+  # DynamoDB table for storing processed data
+  ProcessedDataTable:
+    Type: AWS::DynamoDB::Table
+    Properties:
+      TableName: !Sub '${Environment}-processed-data'
+      BillingMode: PAY_PER_REQUEST
+      AttributeDefinitions:
+        - AttributeName: execution_id
+          AttributeType: S
+      KeySchema:
+        - AttributeName: execution_id
+          KeyType: HASH
+
+  # Main durable function (orchestrates the pipeline with direct ESM)
+  DurableFunction:
+    Type: AWS::Serverless::Function
+    Metadata:
+      BuildMethod: python3.14
+    Properties:
+      FunctionName: !Sub '${Environment}-durable-data-pipeline'
+      Runtime: python3.14
+      Handler: handler.lambda_handler
+      CodeUri: src/durable_pipeline/
+      Timeout: 900
+      DurableConfig:
+        ExecutionTimeout: 10
+        RetentionPeriodInDays: 1
+      Policies:
+        Statement:
+          - Effect: Allow
+            Action:
+              - lambda:CheckpointDurableExecutions
+              - lambda:GetDurableExecutionState
+            Resource: !Sub 'arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${Environment}-durable-data-pipeline'
+          - Effect: Allow
+            Action:
+              - lambda:InvokeFunction
+            Resource:
+              - !GetAtt ValidationFunction.Arn
+              - !GetAtt TransformationFunction.Arn
+              - !GetAtt StorageFunction.Arn
+          - Effect: Allow
+            Action:
+              - dynamodb:GetItem
+              - dynamodb:PutItem
+              - dynamodb:UpdateItem
+              - dynamodb:DeleteItem
+              - dynamodb:Query
+              - dynamodb:Scan
+            Resource: !GetAtt ProcessedDataTable.Arn
+      AutoPublishAlias: prod
+      Events:
+        SQSEvent:
+          Type: SQS
+          Properties:
+            Queue: !GetAtt DataProcessingQueue.Arn
+            BatchSize: 5
+            MaximumBatchingWindowInSeconds: 10
+      Environment:
+        Variables:
+          VALIDATION_FUNCTION_ARN: !GetAtt ValidationFunction.Arn
+          TRANSFORMATION_FUNCTION_ARN: !GetAtt TransformationFunction.Arn
+          STORAGE_FUNCTION_ARN: !GetAtt StorageFunction.Arn
+          PROCESSED_DATA_TABLE: !Ref ProcessedDataTable
+          ENVIRONMENT: !Ref Environment
+
+  # Data Validation Function
+  ValidationFunction:
+    Type: AWS::Serverless::Function
+    Metadata:
+      BuildMethod: python3.14
+    Properties:
+      FunctionName: !Sub '${Environment}-data-validator'
+      CodeUri: src/validation/
+      Handler: handler.lambda_handler
+
+  # Data Transformation Function
+  TransformationFunction:
+    Type: AWS::Serverless::Function
+    Metadata:
+      BuildMethod: python3.14
+    Properties:
+      FunctionName: !Sub '${Environment}-data-transformer'
+      CodeUri: src/transformation/
+      Handler: handler.lambda_handler
+
+  # Data Storage Function
+  StorageFunction:
+    Type: AWS::Serverless::Function
+    Metadata:
+      BuildMethod: python3.14
+    Properties:
+      FunctionName: !Sub '${Environment}-data-storage'
+      CodeUri: src/storage/
+      Handler: handler.lambda_handler
+      Environment:
+        Variables:
+          PROCESSED_DATA_TABLE: !Ref ProcessedDataTable
+      Policies:
+        - DynamoDBCrudPolicy:
+            TableName: !Ref ProcessedDataTable
+
+Outputs:
+  DataProcessingQueueUrl:
+    Description: 'SQS queue URL for data processing requests'
+    Value: !Ref DataProcessingQueue
+
+  ProcessedDataTable:
+    Description: 'DynamoDB table for processed data'
+    Value: !Ref ProcessedDataTable
+
+  DurableFunctionArn:
+    Description: 'Durable function ARN'
+    Value: !GetAtt DurableFunction.Arn