diff --git a/lambda-durable-hitl-python-sam/.gitignore b/lambda-durable-hitl-python-sam/.gitignore new file mode 100644 index 000000000..d1021e88d --- /dev/null +++ b/lambda-durable-hitl-python-sam/.gitignore @@ -0,0 +1,57 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual environments +venv/ +ENV/ +env/ +.venv + +# Testing +.pytest_cache/ +.coverage +htmlcov/ +.hypothesis/ +*.cover +.tox/ + +# IDEs +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# AWS SAM +.aws-sam/ +samconfig.toml + +# Logs +*.log + +# OS +.DS_Store +Thumbs.db + +# Environment variables +.env +.env.local diff --git a/lambda-durable-hitl-python-sam/README.md b/lambda-durable-hitl-python-sam/README.md new file mode 100644 index 000000000..17efa9e46 --- /dev/null +++ b/lambda-durable-hitl-python-sam/README.md @@ -0,0 +1,504 @@ +# Lambda Durable Functions with Human-in-the-Loop (HITL) Pattern + +This pattern demonstrates how to implement Lambda durable functions with Human-in-the-Loop capabilities using Python 3.13 and the AWS Durable Execution SDK. The pattern showcases how Lambda functions can pause execution, wait for human approval, and resume based on human decisions while maintaining state across the pause/resume cycle. + +## Overview + +This serverless pattern enables workflows that require human decision-making by pausing Lambda execution, notifying approvers, and resuming based on their decisions. The pattern uses Lambda durable functions to maintain execution state across pauses and resumes, making it ideal for approval workflows, content moderation, and any process requiring human judgment. 
+ +**Key Features:** +- Durable execution with checkpointed steps (no re-execution on replay) +- Polling-based approval checking with no compute charges during waits +- Automatic timeout handling for overdue approvals +- Complete audit trail in DynamoDB +- SNS notifications for approvers +- CLI tool for testing and managing approvals + +## Architecture + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ Initiator Layer │ +├─────────────────────────────────────────────────────────────────┤ +│ ┌──────────────┐ ┌──────────────────────────┐ │ +│ │ CLI Tool │ │ Test Invocation │ │ +│ │ │ │ (Manual/Automated) │ │ +│ └──────┬───────┘ └────────┬─────────────────┘ │ +│ │ │ │ +└─────────┼───────────────────────────────┼───────────────────────┘ + │ │ + │ (6) List/Submit │ (1) Invoke + │ Decision │ Workflow + │ │ +┌─────────▼───────────────────────────────▼───────────────────────┐ +│ AWS Lambda Layer │ +├──────────────────────────────────────────────────────────────────┤ +│ ┌────────────────────────────────┐ ┌──────────────────────────┐ │ +│ │ Workflow Lambda │ │ Approval API Lambda │ │ +│ │ (Durable Execution) │ │ │ │ +│ │ │ │ - Get approval request │ │ +│ │ - Create approval request │ │ - Validate status │ │ +│ │ - Pause with callback │ │ - Invoke callback │ │ +│ │ - Send notification │ │ - Update status │ │ +│ │ - Resume on callback │ │ │ │ +│ └────┬───────────────────┬───────┘ └──────────┬───────────────┘ │ +│ │ │ │ │ +└───────┼───────────────────┼──────────────────────┼───────────────────┘ + │ │ │ + │ (2) Store │ (4) Notify │ (8,10) Get/Update + │ Request │ │ Status + │ │ │ +┌───────▼───────────────────┼──────────────────────▼───────────────────┐ +│ Storage & Notification Layer │ +├───────────────────────────────────────────────────────────────────────┤ +│ ┌─────────────────────┐ ┌──────────────────────────┐ │ +│ │ DynamoDB Table │ │ SNS Topic │ │ +│ │ ApprovalRequests │ │ ApprovalNotifications │ │ +│ │ │ │ │ │ +│ │ - approval_id (PK) │ │ - 
Sends notifications │ │ +│ │ - callback_token │ │ to approvers │ │ +│ │ - document details │ │ │ │ +│ │ - status │ └────────┬─────────────────┘ │ +│ │ - timestamps │ │ │ +│ └─────────────────────┘ │ (5) Notification │ +│ │ │ +└────────────────────────────────────────────┼─────────────────────────┘ + │ + ▼ + ┌────────────────┐ + │ Approver │ + │ (Human) │ + └────────────────┘ +``` + +### Components + +1. **Workflow Lambda** (`src/workflow/`): Orchestrates the approval workflow using Lambda durable functions SDK. It creates approval requests, polls DynamoDB for decisions using durable waits (no compute charges), sends notifications, and completes when a decision is made. Uses `@durable_execution` and `@durable_step` decorators for checkpointing. + +2. **Approval API Lambda** (`src/approval_api/`): Processes approval/rejection decisions. It validates requests, updates the approval status in DynamoDB, allowing the workflow to detect the decision and resume. + +3. **Shared Module** (`src/shared/`): Contains common code used by both Lambda functions: + - `models.py`: Data models (WorkflowEvent, ApprovalRequest, WorkflowResult, Decision enum) + - `dynamodb_operations.py`: DynamoDB operations (create/update approval requests) + +4. **DynamoDB Table**: Stores approval request state including document details, status, and timestamps. Uses a Global Secondary Index (StatusIndex) for querying pending approvals. + +5. **SNS Topic**: Sends notifications to approvers when new approval requests are created, including approval details and expiration time. + +### Execution Flow + +1. Test invocation triggers Workflow Lambda with document details (asynchronous invocation required for ExecutionTimeout > 15 minutes) +2. Workflow Lambda creates approval request in DynamoDB (checkpointed step) +3. Workflow Lambda sends SNS notification to approvers (checkpointed step) +4. Workflow Lambda polls DynamoDB every 5 seconds using durable waits (no compute charges during waits) +5. 
SNS delivers notification to approver +6. Approver invokes Approval API Lambda to submit decision +7. Approval API Lambda updates status in DynamoDB +8. Workflow Lambda detects decision change during next poll +9. Workflow Lambda completes and returns result + +**Important**: Durable functions with ExecutionTimeout > 900 seconds (15 minutes) must be invoked asynchronously using `--invocation-type Event`. + +## Project Structure + +``` +lambda-durable-hitl-python-sam/ +├── src/ +│ ├── approval_api/ # Approval API Lambda function +│ │ ├── app.py # Processes approval decisions +│ │ ├── Dockerfile # Container image definition +│ │ └── requirements.txt # Function dependencies +│ ├── shared/ # Shared code between Lambda functions +│ │ ├── __init__.py +│ │ ├── models.py # Data models (WorkflowEvent, ApprovalRequest, etc.) +│ │ └── dynamodb_operations.py # DynamoDB helper functions +│ └── workflow/ # Workflow Lambda function (durable execution) +│ ├── app.py # Main workflow orchestrator +│ ├── Dockerfile # Container image definition +│ └── requirements.txt # Function dependencies (includes aws-durable-execution-sdk-python) +├── template.yaml # SAM template (infrastructure as code) +├── example-pattern.json # Pattern metadata for serverless-patterns repository +├── .gitignore # Git ignore patterns +└── README.md # This file +``` + +### Folder Purposes + +- **src/approval_api/**: Lambda function that processes approval/rejection decisions. Validates requests, updates DynamoDB status, allowing the workflow to detect decisions and complete. + +- **src/shared/**: Common code shared between both Lambda functions. Contains data models (WorkflowEvent, ApprovalRequest, WorkflowResult, Decision enum) and DynamoDB operations (create/update approval requests). Prevents code duplication. + +- **src/workflow/**: Main durable execution Lambda function that orchestrates the approval workflow. 
Uses AWS Durable Execution SDK with `@durable_execution` and `@durable_step` decorators for checkpointing. Polls DynamoDB for decisions using durable waits. + +### Key Files + +- **template.yaml**: Defines all AWS resources (Lambda functions, DynamoDB table, SNS topic, IAM roles) +- **src/workflow/app.py**: Main durable execution logic with `@durable_execution` decorator +- **src/approval_api/app.py**: Handles approval/rejection decisions +- **src/shared/**: Common code shared between Lambda functions + +## Prerequisites + +- AWS CLI configured with appropriate credentials +- AWS SAM CLI installed (version 1.100.0 or later) +- Python 3.13 or later +- Docker (for building Lambda container images) +- An AWS account with permissions to create Lambda functions, DynamoDB tables, SNS topics, and IAM roles + +## Deployment + +### Step 1: Clone and Navigate + +```bash +cd lambda-durable-hitl-python-sam +``` + +### Step 2: Build the Application + +```bash +sam build +``` + +This command builds the Lambda container images and prepares the application for deployment. + +### Step 3: Deploy to AWS + +For first-time deployment: + +```bash +sam deploy --guided +``` + +You'll be prompted for: +- Stack name (e.g., `lambda-durable-hitl-stack`) +- AWS Region (e.g., `us-east-1`) +- Confirmation for creating IAM roles +- Confirmation for deploying without authorization + +For subsequent deployments: + +```bash +sam deploy +``` + +### Step 4: Note the Outputs + +After deployment, SAM will output important values: +- `WorkflowFunctionName`: Name of the Workflow Lambda function +- `ApprovalApiFunctionName`: Name of the Approval API Lambda function +- `ApprovalsTableName`: Name of the DynamoDB table +- `SnsTopicArn`: ARN of the SNS topic + +Save these values for testing. 
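The outputs listed in Step 4 can also be collected programmatically. The following is a minimal sketch, assuming boto3 is installed and credentials are configured. The helper names `outputs_to_dict` and `fetch_stack_outputs` are hypothetical and not part of this pattern. The sample values in the demonstration are placeholders.

```python
from typing import Any, Dict


def outputs_to_dict(stack: Dict[str, Any]) -> Dict[str, str]:
    """Flatten one entry of describe_stacks()["Stacks"] into {OutputKey: OutputValue}."""
    return {o["OutputKey"]: o["OutputValue"] for o in stack.get("Outputs", [])}


def fetch_stack_outputs(stack_name: str) -> Dict[str, str]:
    """Read the outputs of a deployed stack (needs boto3 and AWS credentials)."""
    import boto3  # imported lazily so outputs_to_dict works without boto3

    cfn = boto3.client("cloudformation")
    response = cfn.describe_stacks(StackName=stack_name)
    return outputs_to_dict(response["Stacks"][0])


if __name__ == "__main__":
    # Offline demonstration with a hand-written response fragment (placeholder values).
    sample = {
        "Outputs": [
            {"OutputKey": "WorkflowFunctionName", "OutputValue": "example-workflow-fn"},
            {"OutputKey": "ApprovalsTableName", "OutputValue": "example-table"},
        ]
    }
    print(outputs_to_dict(sample))
```

Keeping the flattening step separate from the AWS call makes it possible to check the parsing logic without a deployed stack.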
+ +## Important Notes + +### Durable Function Invocation + +- **Asynchronous invocation required**: Durable functions with `ExecutionTimeout > 900 seconds` (15 minutes) MUST be invoked asynchronously using `--invocation-type Event` +- **Version requirement**: You must publish a Lambda version before invoking a durable function (cannot use `$LATEST` or unqualified ARN) +- **No synchronous response**: Asynchronous invocation returns immediately with a 202 status; check DynamoDB or CloudWatch Logs for results + +### Polling vs Callback Pattern + +This implementation uses a **polling pattern** where the workflow checks DynamoDB every 5 seconds for approval decisions: +- **Advantages**: Simpler implementation, no callback token management, easier to test +- **Durable waits**: Uses `context.wait(Duration.from_seconds(5))` which incurs no compute charges during waits +- **Checkpointing**: Each poll is a checkpointed step, so on replay it won't re-execute completed checks + +### Configuration + +The SAM template configures the following environment variables: + +**Workflow Lambda:** +- `APPROVALS_TABLE_NAME`: DynamoDB table name for approval requests +- `SNS_TOPIC_ARN`: SNS topic ARN for notifications +- `APPROVAL_TIMEOUT_SECONDS`: Default timeout in seconds (default: 300 for testing, can be increased to 3600 for production) + +**Approval API Lambda:** +- `APPROVALS_TABLE_NAME`: DynamoDB table name for approval requests + +### Customizing Timeout + +To change the default approval timeout, update the `ApprovalTimeoutSeconds` parameter in `template.yaml`: + +```yaml +Parameters: + ApprovalTimeoutSeconds: + Type: Number + Default: 3600 # 1 hour for production + Description: Default timeout for approval requests in seconds + MinValue: 60 + MaxValue: 86400 +``` + +Or pass it during deployment: + +```bash +sam deploy --parameter-overrides ApprovalTimeoutSeconds=7200 +``` + +## Testing + +### Testing with AWS CLI + +All testing can be done using standard AWS CLI commands. 
No additional tools required. + +#### Step 1: Set Environment Variables + +```bash +export AWS_DEFAULT_REGION=us-east-1 +export STACK_NAME=app-sam # Your CloudFormation stack name + +# Get function names from CloudFormation outputs +export WORKFLOW_FUNCTION=$(aws cloudformation describe-stacks \ + --stack-name $STACK_NAME \ + --query 'Stacks[0].Outputs[?OutputKey==`WorkflowFunctionName`].OutputValue' \ + --output text) + +export APPROVAL_API_FUNCTION=$(aws cloudformation describe-stacks \ + --stack-name $STACK_NAME \ + --query 'Stacks[0].Outputs[?OutputKey==`ApprovalApiFunctionName`].OutputValue' \ + --output text) +``` + +#### Step 2: Invoke the Workflow + +```bash +# Publish a Lambda version (required for durable functions) +export WORKFLOW_VERSION=$(aws lambda publish-version \ + --function-name $WORKFLOW_FUNCTION \ + --query 'Version' \ + --output text) + +# Invoke workflow asynchronously (required for ExecutionTimeout > 15 minutes) +aws lambda invoke \ + --function-name $WORKFLOW_FUNCTION:$WORKFLOW_VERSION \ + --invocation-type Event \ + --cli-binary-format raw-in-base64-out \ + --payload '{"document_id":"doc-123","document_name":"Q4 Budget Proposal","requester":"user@example.com"}' \ + response.json + +# Check response (should show StatusCode: 202) +cat response.json +``` + +#### Step 3: List Pending Approvals + +```bash +# Scan DynamoDB for pending approval requests +aws dynamodb scan \ + --table-name $STACK_NAME-ApprovalRequests \ + --filter-expression "#status = :pending" \ + --expression-attribute-names '{"#status":"status"}' \ + --expression-attribute-values '{":pending":{"S":"pending"}}' \ + --max-items 10 +``` + +Or get all items: + +```bash +aws dynamodb scan --table-name $STACK_NAME-ApprovalRequests --max-items 10 +``` + +#### Step 4: Get Approval Details + +```bash +# Get specific approval request by ID +aws dynamodb get-item \ + --table-name $STACK_NAME-ApprovalRequests \ + --key '{"approval_id":{"S":""}}' +``` + +#### Step 5: Submit Approval 
Decision + +**Approve a request:** + +```bash +aws lambda invoke \ + --function-name $APPROVAL_API_FUNCTION \ + --cli-binary-format raw-in-base64-out \ + --payload '{"action":"decide","approval_id":"","decision":"approved","approver":"test-approver","comments":"Looks good, approved"}' \ + approval_response.json + +# Check response +cat approval_response.json +``` + +**Reject a request:** + +```bash +aws lambda invoke \ + --function-name $APPROVAL_API_FUNCTION \ + --cli-binary-format raw-in-base64-out \ + --payload '{"action":"decide","approval_id":"","decision":"rejected","approver":"test-approver","comments":"Needs revision"}' \ + approval_response.json + +# Check response +cat approval_response.json +``` + +#### Step 6: Verify Workflow Completion + +```bash +# Check DynamoDB to verify status changed +aws dynamodb get-item \ + --table-name $STACK_NAME-ApprovalRequests \ + --key '{"approval_id":{"S":""}}' + +# Check CloudWatch Logs for workflow completion +aws logs tail /aws/lambda/$WORKFLOW_FUNCTION --follow +``` + +### Testing Timeout Scenarios + +To test timeout handling, invoke the workflow with a short timeout: + +```bash +# Invoke workflow with 60-second timeout +aws lambda invoke \ + --function-name $WORKFLOW_FUNCTION:$WORKFLOW_VERSION \ + --invocation-type Event \ + --cli-binary-format raw-in-base64-out \ + --payload '{"document_id":"doc-456","document_name":"Test Doc","requester":"user@example.com","timeout_seconds":60}' \ + response.json +``` + +Wait for the timeout to expire (60 seconds), then check the approval status: + +```bash +# Get approval_id from DynamoDB +aws dynamodb scan --table-name $STACK_NAME-ApprovalRequests --max-items 5 + +# Check status - should show "timeout" +aws dynamodb get-item --table-name $STACK_NAME-ApprovalRequests --key '{"approval_id":{"S":""}}' +``` + +The status should show "timeout" and the workflow will have completed automatically. 
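The timeout behaviour described above can be modelled in plain Python. This is a sketch under stated assumptions, not the workflow's actual code: `poll_for_decision`, `get_status`, `wait`, and `now` are hypothetical names, and `time.sleep` stands in for the durable wait that the README says the real workflow uses. The status strings `pending`, `approved`, and `timeout` follow the values mentioned in this README.

```python
import time
from typing import Callable


def poll_for_decision(
    get_status: Callable[[], str],
    wait: Callable[[float], None] = time.sleep,
    now: Callable[[], float] = time.monotonic,
    timeout_seconds: float = 300.0,
    interval_seconds: float = 5.0,
) -> str:
    """Return the first non-pending status, or "timeout" once the deadline passes."""
    deadline = now() + timeout_seconds
    while True:
        status = get_status()
        if status != "pending":
            return status  # a decision was recorded
        if now() >= deadline:
            return "timeout"  # nobody decided in time
        wait(interval_seconds)


if __name__ == "__main__":
    # Simulated clock so the demonstration finishes instantly.
    clock = [0.0]

    def fake_wait(seconds: float) -> None:
        clock[0] += seconds

    result = poll_for_decision(
        lambda: "pending", wait=fake_wait, now=lambda: clock[0], timeout_seconds=60
    )
    print(result)  # prints "timeout"
```

Injecting the clock and the wait function keeps the loop testable without real delays. In a durable function the wait call would be replaced by the SDK's durable wait so that no compute is billed between polls.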
+ +### Running Unit Tests + +```bash +# Install test dependencies +pip install -r tests/requirements.txt + +# Run all tests +pytest tests/unit/ + +# Run with coverage +pytest tests/unit/ --cov=src --cov-report=html +``` + +### Running Property-Based Tests + +```bash +# Run property tests +pytest tests/property/ + +# Run specific property test +pytest tests/property/test_approval_properties.py::test_unique_approval_identifiers +``` + +## Use Cases + +### Document Approval Workflow + +The primary use case demonstrated in this pattern is document approval. When a document is submitted for review: + +1. The workflow pauses and creates an approval request +2. Approvers receive a notification via SNS +3. Approvers review the document and submit their decision via CLI +4. The workflow resumes with the approval decision +5. The system records the decision, comments, and timestamp + +This pattern is ideal for: +- Contract approvals +- Policy document reviews +- Technical design document approvals +- Budget proposal reviews + +### Expense Approval System + +This pattern can be adapted for expense approval workflows: + +1. Employee submits an expense report +2. Workflow pauses and notifies the manager +3. Manager reviews and approves/rejects via CLI or web interface +4. Workflow resumes and processes the expense accordingly +5. System maintains audit trail of all decisions + +Key benefits: +- Automatic timeout for overdue approvals +- Complete audit trail in DynamoDB +- Scalable to handle high volumes of expense requests + +### Content Moderation Workflow + +For content moderation scenarios: + +1. User-generated content triggers the workflow +2. Content is flagged for human review +3. Moderator receives notification +4. Moderator reviews and approves/rejects content +5. 
Workflow resumes and publishes or removes content + +Advantages: +- Durable execution ensures no content is lost during review +- Timeout handling for content that requires urgent decisions +- Flexible decision recording with comments + +## Cleanup + +To remove all resources created by this pattern: + +```bash +sam delete +``` + +This will delete: +- Lambda functions (Workflow and Approval API) +- DynamoDB table (ApprovalRequests) +- SNS topic (ApprovalNotifications) +- IAM roles and policies +- CloudWatch log groups + +**Note**: If you've subscribed email addresses to the SNS topic, you'll need to manually unsubscribe them before deletion. + +## Security Considerations + +- IAM roles follow least privilege principle +- DynamoDB table uses encryption at rest +- Callback tokens are not exposed in logs or error messages +- API Gateway validates request parameters before processing +- All input data is validated before processing + +## Cost Considerations + +This pattern uses the following AWS services: +- **Lambda**: Pay per invocation and execution time +- **DynamoDB**: On-demand billing for read/write requests +- **SNS**: Pay per notification sent +- **CloudWatch Logs**: Pay for log storage and ingestion + +Estimated cost for 1,000 approval requests per month: < $1 USD + +## Limitations + +- Maximum durable execution timeout: 1 year (configurable via `DurableConfig.ExecutionTimeout`) +- Synchronous invocation only supported for ExecutionTimeout ≤ 900 seconds (15 minutes) +- Must publish Lambda version before invoking durable function (cannot use $LATEST) +- DynamoDB item size limit: 400 KB +- SNS message size limit: 256 KB +- Lambda container image size limit: 10 GB +- Polling interval: 5 seconds (configurable in workflow code) + +## Additional Resources + +- [Lambda Durable Functions Documentation](https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html) +- [AWS SAM Documentation](https://docs.aws.amazon.com/serverless-application-model/) +- [DynamoDB 
Best Practices](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/best-practices.html) +- [SNS Documentation](https://docs.aws.amazon.com/sns/) + +## License + +This pattern is released under the MIT-0 License. See the LICENSE file for details. diff --git a/lambda-durable-hitl-python-sam/example-pattern.json b/lambda-durable-hitl-python-sam/example-pattern.json new file mode 100644 index 000000000..16f061b7b --- /dev/null +++ b/lambda-durable-hitl-python-sam/example-pattern.json @@ -0,0 +1,86 @@ +{ + "title": "Lambda Durable Functions with Human-in-the-Loop", + "description": "Demonstrates Lambda durable functions with human approval workflow using Python 3.13, DynamoDB, and SNS", + "language": "Python", + "level": "300", + "framework": "SAM", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern demonstrates how to pause Lambda execution, wait for human approval, and resume using the Lambda durable functions SDK.", + "The Workflow Lambda creates an approval request in DynamoDB and polls for decisions using durable waits (no compute charges during waits).", + "An SNS notification is sent to approvers, who can submit their decision via the Approval API Lambda function.", + "The Workflow Lambda detects the decision during polling and resumes execution with the approval result.", + "The pattern includes timeout handling, status tracking, and a complete audit trail of all approval decisions." 
+ ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/lambda-durable-hitl-python-sam", + "templateURL": "serverless-patterns/lambda-durable-hitl-python-sam", + "projectFolder": "lambda-durable-hitl-python-sam", + "templateFile": "template.yaml" + } + }, + "resources": { + "bullets": [ + { + "text": "Lambda Durable Functions Documentation", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html" + }, + { + "text": "AWS SAM Documentation", + "link": "https://docs.aws.amazon.com/serverless-application-model/" + }, + { + "text": "DynamoDB Best Practices", + "link": "https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/best-practices.html" + }, + { + "text": "Amazon SNS Documentation", + "link": "https://docs.aws.amazon.com/sns/" + } + ] + }, + "deploy": { + "text": [ + "sam build", + "sam deploy --guided" + ] + }, + "testing": { + "text": [ + "See the README in the GitHub repo for detailed testing instructions.", + "Test the approval workflow using AWS CLI:", + "1. Publish Lambda version: aws lambda publish-version --function-name ", + "2. Invoke workflow: aws lambda invoke --function-name : --invocation-type Event --payload '{...}' response.json", + "3. List approvals: aws dynamodb scan --table-name ", + "4. 
Submit decision: aws lambda invoke --function-name --payload '{\"action\":\"decide\",...}' response.json" + ] + }, + "cleanup": { + "text": [ + "Delete the stack: sam delete" + ] + }, + "authors": [ + { + "name": "Mian Tariq", + "bio": "Cloud Solutions Architect" + } + ], + "patternArch": { + "icon1": { + "name": "lambda", + "label": "Lambda Durable Functions" + }, + "icon2": { + "name": "dynamodb", + "label": "Amazon DynamoDB" + }, + "icon3": { + "name": "sns", + "label": "Amazon SNS" + } + } +} diff --git a/lambda-durable-hitl-python-sam/src/approval_api/Dockerfile b/lambda-durable-hitl-python-sam/src/approval_api/Dockerfile new file mode 100644 index 000000000..ce9525bb7 --- /dev/null +++ b/lambda-durable-hitl-python-sam/src/approval_api/Dockerfile @@ -0,0 +1,14 @@ +FROM public.ecr.aws/lambda/python:3.13 + +# Copy requirements file +COPY approval_api/requirements.txt ${LAMBDA_TASK_ROOT}/ + +# Install dependencies +RUN pip install --no-cache-dir -r requirements.txt + +# Copy function code +COPY approval_api/app.py ${LAMBDA_TASK_ROOT}/ +COPY shared/*.py ${LAMBDA_TASK_ROOT}/ + +# Set the CMD to your handler +CMD ["app.lambda_handler"] diff --git a/lambda-durable-hitl-python-sam/src/approval_api/app.py b/lambda-durable-hitl-python-sam/src/approval_api/app.py new file mode 100644 index 000000000..d5241d23d --- /dev/null +++ b/lambda-durable-hitl-python-sam/src/approval_api/app.py @@ -0,0 +1,482 @@ +""" +Approval API Lambda Function + +This Lambda function handles approval/rejection decisions from the CLI tool. +It retrieves callback tokens from DynamoDB and resumes workflow execution. + +Primary Use Case: Document Approval Processing +---------------------------------------------- +Processes approval decisions submitted via CLI, validates the request, +and invokes the durable execution callback to resume the paused workflow. + +Additional Use Cases: +-------------------- +1. 
Expense Approval: Manager reviews expense report and approves/rejects, + this function validates the decision and resumes expense processing workflow. + +2. Content Moderation: Moderator reviews flagged content and approves/rejects, + this function processes the decision and resumes content publishing workflow. + +3. Budget Proposal Review: Finance team reviews budget proposal and approves/rejects, + this function validates the decision and resumes budget allocation workflow. + +Key Responsibilities: +- Validate approval request exists and is still pending +- Check expiration status to prevent late approvals +- Retrieve callback token from DynamoDB +- Invoke durable execution callback to resume workflow +- Update approval status with decision and comments +- Maintain complete audit trail of all decisions +""" + +import json +import logging +import os +import sys +from typing import Dict, Any, Optional, Tuple +from datetime import datetime + +# Add shared module to path +sys.path.insert(0, os.path.dirname(__file__)) + +from models import ApprovalDecisionRequest, ApprovalRequest, ApprovalStatus, Decision +from dynamodb_operations import get_approval_request, update_approval_status + +# Configure structured JSON logging +logger = logging.getLogger() +logger.setLevel(os.environ.get('LOG_LEVEL', 'INFO')) + + +def validate_input(event: Dict[str, Any]) -> Tuple[bool, Optional[str], Optional[ApprovalDecisionRequest]]: + """ + Validate input data from the Lambda event. + + Validates that required fields are present and decision values are valid. 
+ + Args: + event: Lambda event dictionary + + Returns: + Tuple of (is_valid, error_message, parsed_request) + - is_valid: True if input is valid, False otherwise + - error_message: Descriptive error message if invalid, None if valid + - parsed_request: Parsed ApprovalDecisionRequest if valid, None if invalid + """ + # Handle both API Gateway format (with body) and direct Lambda invocation + if "body" in event: + try: + body = json.loads(event["body"]) + except json.JSONDecodeError as e: + return False, f"Invalid JSON in request body: {str(e)}", None + else: + body = event + + # Validate required fields + if "approval_id" not in body: + return False, "Missing required field: approval_id", None + + if "decision" not in body: + return False, "Missing required field: decision", None + + # Validate approval_id is not empty + approval_id = body["approval_id"] + if not approval_id or not isinstance(approval_id, str) or not approval_id.strip(): + return False, "Invalid approval_id: must be a non-empty string", None + + # Validate decision value + decision_str = body["decision"] + if decision_str not in ["approved", "rejected"]: + return False, f"Invalid decision value: '{decision_str}'. Must be 'approved' or 'rejected'", None + + # Parse the request + try: + request = ApprovalDecisionRequest.from_api_event(event) + return True, None, request + except Exception as e: + return False, f"Failed to parse request: {str(e)}", None + + +def validate_approval_request(approval: ApprovalRequest) -> Tuple[bool, Optional[str]]: + """ + Validate that approval request is in a valid state for processing. + + Checks that the request exists, is pending, and has not expired. 
+ + Args: + approval: ApprovalRequest object from DynamoDB + + Returns: + Tuple of (is_valid, error_message) + - is_valid: True if request is valid, False otherwise + - error_message: Descriptive error message if invalid, None if valid + """ + # Check if request is expired + if approval.is_expired(): + logger.warning( + json.dumps({ + "message": "Approval request has expired", + "approval_id": approval.approval_id, + "expires_at": approval.expires_at.isoformat(), + "current_time": datetime.now().isoformat() + }) + ) + return False, f"Approval request has expired at {approval.expires_at.isoformat()}" + + # Check if request is still pending + if approval.status != ApprovalStatus.PENDING: + logger.warning( + json.dumps({ + "message": "Approval request is not in pending status", + "approval_id": approval.approval_id, + "current_status": approval.status.value + }) + ) + + # Provide specific error messages based on status + if approval.status == ApprovalStatus.APPROVED: + return False, "Approval request has already been approved" + elif approval.status == ApprovalStatus.REJECTED: + return False, "Approval request has already been rejected" + elif approval.status == ApprovalStatus.TIMEOUT: + return False, "Approval request has timed out" + else: + return False, f"Approval request is in invalid status: {approval.status.value}" + + return True, None + + +def invoke_durable_callback( + callback_id: str, + decision: Decision, + comments: Optional[str] = None +) -> Tuple[bool, Optional[str]]: + """ + Complete the callback to resume the workflow. + + Uses the Lambda service APIs SendDurableExecutionCallbackSuccess or + SendDurableExecutionCallbackFailure to resume the paused durable execution. 
+ + Args: + callback_id: Callback ID from the paused durable execution + decision: Approval decision (approved/rejected) + comments: Optional comments from the approver + + Returns: + Tuple of (success, error_message) + - success: True if callback completed successfully, False otherwise + - error_message: Error description if failed, None if successful + """ + import boto3 + from botocore.exceptions import ClientError + + lambda_client = boto3.client('lambda') + + # Prepare callback payload with decision and comments + callback_payload = { + "decision": decision.value, + "comments": comments + } + + logger.info( + json.dumps({ + "message": "Completing durable execution callback", + "decision": decision.value, + "has_comments": comments is not None + }) + ) + + try: + if decision == Decision.APPROVED: + # Use SendDurableExecutionCallbackSuccess for approved decisions + response = lambda_client.send_durable_execution_callback_success( + CallbackId=callback_id, + Result=json.dumps(callback_payload) + ) + + logger.info( + json.dumps({ + "message": "Callback success sent", + "decision": decision.value + }) + ) + + elif decision == Decision.REJECTED: + # Use SendDurableExecutionCallbackFailure for rejected decisions + response = lambda_client.send_durable_execution_callback_failure( + CallbackId=callback_id, + Error="ApprovalRejected", + Cause=json.dumps(callback_payload) + ) + + logger.info( + json.dumps({ + "message": "Callback failure sent", + "decision": decision.value + }) + ) + + else: + error_msg = f"Invalid decision for callback: {decision.value}" + logger.error( + json.dumps({ + "message": "Invalid decision type", + "decision": decision.value + }) + ) + return False, error_msg + + logger.info( + json.dumps({ + "message": "Durable execution callback completed successfully", + "decision": decision.value + }) + ) + + return True, None + + except ClientError as e: + error_msg = f"Failed to complete callback: {e.response['Error']['Code']} - 
{e.response['Error']['Message']}" + logger.error( + json.dumps({ + "message": "Callback completion failed", + "error": error_msg, + "error_code": e.response['Error']['Code'] + }) + ) + return False, error_msg + + except Exception as e: + error_msg = f"Unexpected error completing callback: {str(e)}" + logger.error( + json.dumps({ + "message": "Unexpected callback error", + "error": error_msg, + "error_type": type(e).__name__ + }) + ) + return False, error_msg + + +def create_error_response(status_code: int, error_code: str, error_message: str, details: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + """ + Create a standardized error response. + + Args: + status_code: HTTP status code + error_code: Error code identifier + error_message: Human-readable error message + details: Optional additional error details + + Returns: + Dict containing error response in standard format + """ + error_response = { + "error": { + "code": error_code, + "message": error_message + } + } + + if details: + error_response["error"]["details"] = details + + return { + "statusCode": status_code, + "body": json.dumps(error_response) + } + + +def create_success_response(approval_id: str, decision: Decision, message: str = "Decision processed successfully") -> Dict[str, Any]: + """ + Create a standardized success response. + + Args: + approval_id: Approval request identifier + decision: Decision that was processed + message: Success message + + Returns: + Dict containing success response + """ + return { + "statusCode": 200, + "body": json.dumps({ + "message": message, + "approval_id": approval_id, + "decision": decision.value + }) + } + + +def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]: + """ + Processes approval decisions and resumes workflow. + + This function: + 1. Validates input data + 2. Retrieves the approval request from DynamoDB + 3. Validates the request is pending and not expired + 4. Invokes the durable execution callback + 5. 
Updates the approval status in DynamoDB + + Args: + event: Lambda event with approval_id, decision, comments (optional) + context: Lambda context + + Returns: + Response with success/error status + """ + # Log invocation with structured logging + logger.info( + json.dumps({ + "message": "Approval API Lambda invoked", + "request_id": getattr(context, 'aws_request_id', None) + }) + ) + + try: + # Task 6.6: Validate input data + is_valid, error_message, decision_request = validate_input(event) + if not is_valid: + logger.warning( + json.dumps({ + "message": "Input validation failed", + "error": error_message + }) + ) + return create_error_response(400, "INVALID_REQUEST", error_message) + + logger.info( + json.dumps({ + "message": "Input validated successfully", + "approval_id": decision_request.approval_id, + "decision": decision_request.decision.value + }) + ) + + # Task 6.2: Retrieve approval request from DynamoDB + approval = get_approval_request(decision_request.approval_id) + + if approval is None: + logger.warning( + json.dumps({ + "message": "Approval request not found", + "approval_id": decision_request.approval_id + }) + ) + return create_error_response( + 404, + "APPROVAL_NOT_FOUND", + f"Approval request with ID '{decision_request.approval_id}' not found", + {"approval_id": decision_request.approval_id} + ) + + logger.info( + json.dumps({ + "message": "Approval request retrieved", + "approval_id": approval.approval_id, + "status": approval.status.value, + "expires_at": approval.expires_at.isoformat() + }) + ) + + # Task 6.2: Validate approval request state + is_valid, error_message = validate_approval_request(approval) + if not is_valid: + # Determine appropriate error code based on the error + if "expired" in error_message.lower() or "timed out" in error_message.lower(): + error_code = "APPROVAL_EXPIRED" + status_code = 409 + elif "already been" in error_message.lower(): + error_code = "APPROVAL_ALREADY_DECIDED" + status_code = 409 +
else: + error_code = "INVALID_APPROVAL_STATE" + status_code = 409 + + return create_error_response( + status_code, + error_code, + error_message, + {"approval_id": approval.approval_id, "status": approval.status.value} + ) + + # Task 6.5: Update approval status in DynamoDB BEFORE completing callback + # This ensures the decision is available when the workflow resumes + try: + updated_approval = update_approval_status( + approval_id=decision_request.approval_id, + decision=decision_request.decision, + comments=decision_request.comments, + decided_by=decision_request.decided_by + ) + + logger.info( + json.dumps({ + "message": "Approval status updated successfully", + "approval_id": updated_approval.approval_id, + "decision": updated_approval.decision.value, + "status": updated_approval.status.value + }) + ) + + except Exception as e: + logger.error( + json.dumps({ + "message": "Failed to update approval status in DynamoDB", + "approval_id": decision_request.approval_id, + "error": str(e), + "error_type": type(e).__name__ + }) + ) + return create_error_response( + 500, + "DATABASE_UPDATE_FAILED", + f"Failed to update approval status: {str(e)}", + {"approval_id": decision_request.approval_id} + ) + + # Task 6.3: Complete durable execution callback AFTER DynamoDB update + callback_success, callback_error = invoke_durable_callback( + callback_id=approval.callback_token, + decision=decision_request.decision, + comments=decision_request.comments + ) + + if not callback_success: + logger.error( + json.dumps({ + "message": "Failed to complete callback", + "approval_id": approval.approval_id, + "error": callback_error + }) + ) + return create_error_response( + 500, + "CALLBACK_COMPLETION_FAILED", + callback_error, + {"approval_id": approval.approval_id} + ) + + # Return success response + return create_success_response( + approval_id=decision_request.approval_id, + decision=decision_request.decision, + message="Decision processed successfully and workflow resumed" + ) + + 
except Exception as e: + # Handle unexpected errors + logger.error( + json.dumps({ + "message": "Unexpected error processing approval decision", + "error": str(e), + "error_type": type(e).__name__ + }) + ) + + return create_error_response( + 500, + "INTERNAL_ERROR", + f"An unexpected error occurred: {str(e)}" + ) diff --git a/lambda-durable-hitl-python-sam/src/approval_api/requirements.txt b/lambda-durable-hitl-python-sam/src/approval_api/requirements.txt new file mode 100644 index 000000000..8052b17f9 --- /dev/null +++ b/lambda-durable-hitl-python-sam/src/approval_api/requirements.txt @@ -0,0 +1,9 @@ +# AWS SDK +boto3>=1.34.0 +botocore>=1.34.0 + +# AWS Lambda durable functions SDK +aws-durable-execution-sdk-python>=0.1.0 + +# Data validation and parsing +pydantic>=2.5.0 diff --git a/lambda-durable-hitl-python-sam/src/shared/__init__.py b/lambda-durable-hitl-python-sam/src/shared/__init__.py new file mode 100644 index 000000000..4635553eb --- /dev/null +++ b/lambda-durable-hitl-python-sam/src/shared/__init__.py @@ -0,0 +1,19 @@ +"""Shared modules for Lambda functions.""" + +from .models import ( + ApprovalStatus, + Decision, + ApprovalRequest, + WorkflowEvent, + ApprovalDecisionRequest, + WorkflowResult +) + +__all__ = [ + 'ApprovalStatus', + 'Decision', + 'ApprovalRequest', + 'WorkflowEvent', + 'ApprovalDecisionRequest', + 'WorkflowResult' +] diff --git a/lambda-durable-hitl-python-sam/src/shared/dynamodb_operations.py b/lambda-durable-hitl-python-sam/src/shared/dynamodb_operations.py new file mode 100644 index 000000000..b640de478 --- /dev/null +++ b/lambda-durable-hitl-python-sam/src/shared/dynamodb_operations.py @@ -0,0 +1,273 @@ +""" +DynamoDB operations for the Lambda Durable HITL pattern. + +This module provides functions for creating, retrieving, updating, and querying +approval requests in DynamoDB with proper error handling and retry logic. 
+""" + +import os +import uuid +import time +from datetime import datetime, timedelta +from typing import List, Optional +import boto3 +from botocore.exceptions import ClientError + +try: + from .models import ApprovalRequest, ApprovalStatus, Decision +except ImportError: + from models import ApprovalRequest, ApprovalStatus, Decision + + +# Initialize DynamoDB client +dynamodb = boto3.resource('dynamodb') + + +def get_table(): + """ + Get the DynamoDB table for approval requests. + + Returns: + boto3 Table resource + """ + table_name = os.environ.get('APPROVALS_TABLE_NAME') + if not table_name: + raise ValueError("APPROVALS_TABLE_NAME environment variable not set") + return dynamodb.Table(table_name) + + +def _retry_with_backoff(operation, max_retries=3, initial_delay=0.1): + """ + Execute a DynamoDB operation with exponential backoff retry logic. + + Args: + operation: Callable that performs the DynamoDB operation + max_retries: Maximum number of retry attempts + initial_delay: Initial delay in seconds (doubles with each retry) + + Returns: + Result of the operation + + Raises: + ClientError: If operation fails after all retries + """ + delay = initial_delay + last_exception = None + + for attempt in range(max_retries): + try: + return operation() + except ClientError as e: + last_exception = e + error_code = e.response['Error']['Code'] + + # Retry on throttling and server errors + if error_code in ['ProvisionedThroughputExceededException', + 'ThrottlingException', + 'InternalServerError', + 'ServiceUnavailable']: + if attempt < max_retries - 1: + time.sleep(delay) + delay *= 2 # Exponential backoff + continue + + # Don't retry on other errors + raise + + # All retries exhausted + raise last_exception + + +def create_approval_request( + callback_token: str, + document_id: str, + document_name: str, + requester: str, + timeout_seconds: int = 3600 +) -> ApprovalRequest: + """ + Create a new approval request in DynamoDB. 
+ + Args: + callback_token: Durable execution callback token + document_id: Unique identifier for the document + document_name: Human-readable document name + requester: User who submitted the document + timeout_seconds: Timeout duration in seconds (default 1 hour) + + Returns: + ApprovalRequest: Created approval request object + + Raises: + ClientError: If DynamoDB operation fails after retries + """ + table = get_table() + + # Generate unique approval ID + approval_id = str(uuid.uuid4()) + + # Calculate timestamps + now = datetime.now() + expires_at = now + timedelta(seconds=timeout_seconds) + + # Calculate TTL (7 days after creation for automatic cleanup) + ttl = int((now + timedelta(days=7)).timestamp()) + + # Create approval request object + approval = ApprovalRequest( + approval_id=approval_id, + callback_token=callback_token, + document_id=document_id, + document_name=document_name, + requester=requester, + status=ApprovalStatus.PENDING, + created_at=now, + updated_at=now, + expires_at=expires_at, + ttl=ttl + ) + + # Store in DynamoDB with retry logic + def put_operation(): + table.put_item(Item=approval.to_dynamodb_item()) + + _retry_with_backoff(put_operation) + + return approval + + +def get_approval_request(approval_id: str) -> Optional[ApprovalRequest]: + """ + Retrieve an approval request from DynamoDB. 
+ + Args: + approval_id: Unique identifier for the approval request + + Returns: + ApprovalRequest: Retrieved approval request, or None if not found + + Raises: + ClientError: If DynamoDB operation fails after retries + """ + table = get_table() + + def get_operation(): + response = table.get_item(Key={'approval_id': approval_id}) + return response + + response = _retry_with_backoff(get_operation) + + if 'Item' not in response: + return None + + return ApprovalRequest.from_dynamodb_item(response['Item']) + + +def update_approval_status( + approval_id: str, + decision: Decision, + comments: Optional[str] = None, + decided_by: Optional[str] = None +) -> ApprovalRequest: + """ + Update the status of an approval request with a decision. + + Args: + approval_id: Unique identifier for the approval request + decision: Approval decision (approved/rejected/timeout) + comments: Optional comments from the approver + decided_by: Optional identifier of who made the decision + + Returns: + ApprovalRequest: Updated approval request object + + Raises: + ValueError: If approval request not found + ClientError: If DynamoDB operation fails after retries + """ + table = get_table() + + # Map decision to status + status_map = { + Decision.APPROVED: ApprovalStatus.APPROVED, + Decision.REJECTED: ApprovalStatus.REJECTED, + Decision.TIMEOUT: ApprovalStatus.TIMEOUT + } + status = status_map[decision] + + # Calculate timestamps + now = datetime.now() + decided_at = now + + # Build update expression + update_expression = "SET #status = :status, updated_at = :updated_at, decision = :decision, decided_at = :decided_at" + expression_attribute_names = {"#status": "status"} + expression_attribute_values = { + ":status": status.value, + ":updated_at": now.isoformat(), + ":decision": decision.value, + ":decided_at": decided_at.isoformat() + } + + if comments: + update_expression += ", comments = :comments" + expression_attribute_values[":comments"] = comments + + if decided_by: + update_expression 
+= ", decided_by = :decided_by" + expression_attribute_values[":decided_by"] = decided_by + + def update_operation(): + return table.update_item( + Key={'approval_id': approval_id}, + UpdateExpression=update_expression, + ConditionExpression='attribute_exists(approval_id)', # Fail on unknown IDs instead of upserting + ExpressionAttributeNames=expression_attribute_names, + ExpressionAttributeValues=expression_attribute_values, + ReturnValues='ALL_NEW' + ) + try: + response = _retry_with_backoff(update_operation) + except ClientError as e: + if e.response['Error']['Code'] == 'ConditionalCheckFailedException': + raise ValueError(f"Approval request with ID '{approval_id}' not found") from e + raise + return ApprovalRequest.from_dynamodb_item(response['Attributes']) + + +def query_pending_approvals(limit: int = 100) -> List[ApprovalRequest]: + """ + Query for all pending approval requests using the StatusIndex GSI. + + Args: + limit: Maximum number of results to return (default 100) + + Returns: + List[ApprovalRequest]: List of pending approval requests + + Raises: + ClientError: If DynamoDB operation fails after retries + """ + table = get_table() + + def query_operation(): + response = table.query( + IndexName='StatusIndex', + KeyConditionExpression='#status = :status', + ExpressionAttributeNames={'#status': 'status'}, + ExpressionAttributeValues={':status': ApprovalStatus.PENDING.value}, + Limit=limit, + ScanIndexForward=False # Sort by created_at descending (newest first) + ) + return response + + response = _retry_with_backoff(query_operation) + + # Convert items to ApprovalRequest objects and filter out expired ones + approvals = [] + for item in response.get('Items', []): + approval = ApprovalRequest.from_dynamodb_item(item) + if approval.is_pending(): # Only include non-expired pending approvals + approvals.append(approval) + + return approvals diff --git a/lambda-durable-hitl-python-sam/src/shared/models.py b/lambda-durable-hitl-python-sam/src/shared/models.py new file mode 100644 index 000000000..205ddf700 --- /dev/null +++ b/lambda-durable-hitl-python-sam/src/shared/models.py @@ -0,0 +1,236 @@ +""" +Data models
for the Lambda Durable HITL pattern. + +This module defines the core data structures used throughout the approval workflow, +including approval requests, workflow events, and decision requests. +""" + +from dataclasses import dataclass +from datetime import datetime +from typing import Optional +from enum import Enum +import json + + +class ApprovalStatus(Enum): + """Status values for approval requests.""" + PENDING = "pending" + APPROVED = "approved" + REJECTED = "rejected" + EXPIRED = "expired" + TIMEOUT = "timeout" + + +class Decision(Enum): + """Decision values for approval outcomes.""" + APPROVED = "approved" + REJECTED = "rejected" + TIMEOUT = "timeout" + + +@dataclass +class ApprovalRequest: + """ + Represents an approval request in the system. + + This model stores all information about a pending or completed approval, + including the callback token needed to resume workflow execution. + """ + + approval_id: str + callback_token: str + document_id: str + document_name: str + requester: str + status: ApprovalStatus + created_at: datetime + updated_at: datetime + expires_at: datetime + decision: Optional[Decision] = None + comments: Optional[str] = None + decided_by: Optional[str] = None + decided_at: Optional[datetime] = None + ttl: Optional[int] = None # Unix timestamp for DynamoDB TTL + + def to_dynamodb_item(self) -> dict: + """ + Convert to DynamoDB item format. 
+ + Returns: + dict: DynamoDB item with all fields serialized appropriately + """ + item = { + "approval_id": self.approval_id, + "callback_token": self.callback_token, + "document_id": self.document_id, + "document_name": self.document_name, + "requester": self.requester, + "status": self.status.value, + "created_at": self.created_at.isoformat(), + "updated_at": self.updated_at.isoformat(), + "expires_at": self.expires_at.isoformat(), + } + + if self.decision: + item["decision"] = self.decision.value + if self.comments: + item["comments"] = self.comments + if self.decided_by: + item["decided_by"] = self.decided_by + if self.decided_at: + item["decided_at"] = self.decided_at.isoformat() + if self.ttl: + item["ttl"] = self.ttl + + return item + + @classmethod + def from_dynamodb_item(cls, item: dict) -> 'ApprovalRequest': + """ + Create from DynamoDB item. + + Args: + item: DynamoDB item dictionary + + Returns: + ApprovalRequest: Deserialized approval request object + """ + return cls( + approval_id=item["approval_id"], + callback_token=item["callback_token"], + document_id=item["document_id"], + document_name=item["document_name"], + requester=item["requester"], + status=ApprovalStatus(item["status"]), + created_at=datetime.fromisoformat(item["created_at"]), + updated_at=datetime.fromisoformat(item["updated_at"]), + expires_at=datetime.fromisoformat(item["expires_at"]), + decision=Decision(item["decision"]) if "decision" in item else None, + comments=item.get("comments"), + decided_by=item.get("decided_by"), + decided_at=datetime.fromisoformat(item["decided_at"]) if "decided_at" in item else None, + ttl=item.get("ttl") + ) + + def is_expired(self) -> bool: + """ + Check if approval request has expired. + + Returns: + bool: True if current time is past expires_at, False otherwise + """ + return datetime.now() > self.expires_at + + def is_pending(self) -> bool: + """ + Check if approval request is still pending. 
+ + Returns: + bool: True if status is PENDING and not expired, False otherwise + """ + return self.status == ApprovalStatus.PENDING and not self.is_expired() + + +@dataclass +class WorkflowEvent: + """ + Input event for workflow lambda. + + This represents the initial request to start an approval workflow. + """ + + document_id: str + document_name: str + requester: str + timeout_seconds: Optional[int] = 3600 # Default 1 hour + + @classmethod + def from_lambda_event(cls, event: dict) -> 'WorkflowEvent': + """ + Parse from Lambda event. + + Args: + event: Lambda event dictionary + + Returns: + WorkflowEvent: Parsed workflow event object + """ + return cls( + document_id=event["document_id"], + document_name=event["document_name"], + requester=event["requester"], + timeout_seconds=event.get("timeout_seconds", 3600) + ) + + +@dataclass +class ApprovalDecisionRequest: + """ + Request to submit an approval decision. + + This represents a decision (approve/reject) submitted by a human approver. + """ + + approval_id: str + decision: Decision + comments: Optional[str] = None + decided_by: Optional[str] = None + + @classmethod + def from_api_event(cls, event: dict) -> 'ApprovalDecisionRequest': + """ + Parse from API Gateway event or Lambda invocation event. + + Args: + event: API Gateway or Lambda event dictionary + + Returns: + ApprovalDecisionRequest: Parsed decision request object + """ + # Handle both API Gateway format (with body) and direct Lambda invocation + if "body" in event: + body = json.loads(event["body"]) + else: + body = event + + return cls( + approval_id=body["approval_id"], + decision=Decision(body["decision"]), + comments=body.get("comments"), + decided_by=body.get("decided_by") + ) + + +@dataclass +class WorkflowResult: + """ + Result of workflow execution. + + This represents the final outcome of an approval workflow. 
+ """ + + approval_id: str + document_id: str + decision: Decision + comments: Optional[str] = None + decided_at: Optional[datetime] = None + + def to_dict(self) -> dict: + """ + Convert to dictionary for Lambda response. + + Returns: + dict: Serialized workflow result + """ + result = { + "approval_id": self.approval_id, + "document_id": self.document_id, + "decision": self.decision.value, + } + + if self.comments: + result["comments"] = self.comments + if self.decided_at: + result["decided_at"] = self.decided_at.isoformat() + + return result diff --git a/lambda-durable-hitl-python-sam/src/workflow/Dockerfile b/lambda-durable-hitl-python-sam/src/workflow/Dockerfile new file mode 100644 index 000000000..7165fe6d3 --- /dev/null +++ b/lambda-durable-hitl-python-sam/src/workflow/Dockerfile @@ -0,0 +1,14 @@ +FROM public.ecr.aws/lambda/python:3.13 + +# Copy requirements file +COPY workflow/requirements.txt ${LAMBDA_TASK_ROOT}/ + +# Install dependencies +RUN pip install --no-cache-dir -r requirements.txt + +# Copy function code +COPY workflow/app.py ${LAMBDA_TASK_ROOT}/ +COPY shared/*.py ${LAMBDA_TASK_ROOT}/ + +# Set the CMD to your handler +CMD ["app.lambda_handler"] diff --git a/lambda-durable-hitl-python-sam/src/workflow/app.py b/lambda-durable-hitl-python-sam/src/workflow/app.py new file mode 100644 index 000000000..4f0764b94 --- /dev/null +++ b/lambda-durable-hitl-python-sam/src/workflow/app.py @@ -0,0 +1,236 @@ +""" +Workflow Lambda Function - Durable Execution with Human-in-the-Loop + +This Lambda function orchestrates the approval workflow using Lambda durable functions. +It pauses execution, waits for human approval, and resumes based on the decision. 
+ +Key Features: +- Durable execution ensures no data loss during pause/resume +- context.step() ensures idempotent operations on replay +- Automatic timeout handling for overdue approvals +- Complete audit trail in DynamoDB +""" + +import json +import logging +import os +import sys +from typing import Dict, Any +from datetime import datetime + +# Add shared module to path +sys.path.insert(0, os.path.dirname(__file__)) + +from models import WorkflowEvent, WorkflowResult, Decision +from dynamodb_operations import create_approval_request + +# Import AWS Durable Execution SDK +from aws_durable_execution_sdk_python import ( + DurableContext, + durable_execution, +) +from aws_durable_execution_sdk_python.config import Duration, CallbackConfig + +# Configure structured JSON logging +logger = logging.getLogger() +logger.setLevel(os.environ.get('LOG_LEVEL', 'INFO')) + + +def send_approval_notification( + approval_id: str, + document_name: str, + requester: str, + expires_at: str +) -> bool: + """Send SNS notification to approvers about pending approval request.""" + import boto3 + from botocore.exceptions import ClientError + + sns_client = boto3.client('sns') + sns_topic_arn = os.environ.get('SNS_TOPIC_ARN') + + if not sns_topic_arn: + logger.error(json.dumps({ + "message": "SNS topic ARN not configured", + "approval_id": approval_id + })) + return False + + message = f"""Approval Required: {document_name} + +A document requires your approval: + +Document: {document_name} +Submitted by: {requester} +Approval ID: {approval_id} +Expires: {expires_at} + +To approve or reject, use the Approval API Lambda. 
+""" + + try: + response = sns_client.publish( + TopicArn=sns_topic_arn, + Subject=f"Approval Required: {document_name}", + Message=message, + MessageAttributes={ + 'approval_id': {'DataType': 'String', 'StringValue': approval_id}, + 'document_name': {'DataType': 'String', 'StringValue': document_name}, + 'requester': {'DataType': 'String', 'StringValue': requester} + } + ) + logger.info(json.dumps({ + "message": "SNS notification sent", + "approval_id": approval_id, + "message_id": response.get('MessageId') + })) + return True + except (ClientError, Exception) as e: + logger.error(json.dumps({ + "message": "Failed to send SNS notification", + "approval_id": approval_id, + "error": str(e) + })) + return False + + +@durable_execution +def lambda_handler(event: Dict[str, Any], context: DurableContext) -> Dict[str, Any]: + """ + Main workflow orchestrator using durable execution. + + CRITICAL: Operations that generate unique IDs (like create_approval_request) + MUST be wrapped in context.step() to ensure idempotency on replay. 
+ """ + context.logger.info(json.dumps({"message": "Workflow Lambda invoked", "event": event})) + + try: + # Parse workflow event + workflow_event = WorkflowEvent.from_lambda_event(event) + + context.logger.info(json.dumps({ + "message": "Workflow event parsed", + "document_id": workflow_event.document_id, + "document_name": workflow_event.document_name, + "requester": workflow_event.requester, + "timeout_seconds": workflow_event.timeout_seconds + })) + + # Create callback configuration with timeout + callback_config = CallbackConfig( + timeout=Duration.from_seconds(workflow_event.timeout_seconds) + ) + + # Create the callback - SDK manages idempotency for this + callback = context.create_callback( + name="approval_callback", + config=callback_config + ) + + context.logger.info(json.dumps({ + "message": "Callback created", + "callback_id": callback.callback_id, + "document_id": workflow_event.document_id + })) + + # Create approval request in DynamoDB + approval = create_approval_request( + callback_token=callback.callback_id, + document_id=workflow_event.document_id, + document_name=workflow_event.document_name, + requester=workflow_event.requester, + timeout_seconds=workflow_event.timeout_seconds + ) + + context.logger.info(json.dumps({ + "message": "Approval request created", + "approval_id": approval.approval_id, + "document_id": approval.document_id, + "expires_at": approval.expires_at.isoformat() + })) + + # Send notification to approvers + notification_sent = send_approval_notification( + approval_id=approval.approval_id, + document_name=approval.document_name, + requester=approval.requester, + expires_at=approval.expires_at.isoformat() + ) + + if not notification_sent: + context.logger.warning(json.dumps({ + "message": "Notification failed but workflow continues", + "approval_id": approval.approval_id + })) + + # Wait for callback from Approval API + context.logger.info(json.dumps({ + "message": "Waiting for approval callback", + "approval_id": 
approval.approval_id, + "document_id": workflow_event.document_id + })) + + # This blocks until callback is completed via Lambda API or timeout + decision_result = callback.result() + + context.logger.info(json.dumps({ + "message": "Callback received, workflow resuming", + "approval_id": approval.approval_id, + "document_id": workflow_event.document_id, + "decision_result": str(decision_result) + })) + + # Parse the decision from the callback result + try: + if isinstance(decision_result, str): + callback_data = json.loads(decision_result) + elif isinstance(decision_result, dict): + callback_data = decision_result + else: + callback_data = {} + + decision_str = callback_data.get("decision", "approved") + comments = callback_data.get("comments") + except (json.JSONDecodeError, TypeError): + decision_str = "approved" + comments = None + + decision = Decision(decision_str) if decision_str else Decision.APPROVED + + workflow_result = WorkflowResult( + approval_id=approval.approval_id, + document_id=approval.document_id, + decision=decision, + comments=comments, + decided_at=datetime.now() + ) + + context.logger.info(json.dumps({ + "message": "Workflow completed successfully", + "approval_id": approval.approval_id, + "document_id": workflow_result.document_id, + "decision": workflow_result.decision.value if workflow_result.decision else None + })) + + return { + "statusCode": 200, + "body": json.dumps(workflow_result.to_dict()) + } + + except KeyError as e: + error_msg = f"Missing required field in event: {str(e)}" + context.logger.error(json.dumps({ + "message": "Event validation failed", + "error": error_msg, + "event": event + })) + raise ValueError(error_msg) + + except Exception as e: + context.logger.error(json.dumps({ + "message": "Workflow execution failed", + "error": str(e), + "error_type": type(e).__name__, + "event": event + })) + raise diff --git a/lambda-durable-hitl-python-sam/src/workflow/requirements.txt 
b/lambda-durable-hitl-python-sam/src/workflow/requirements.txt new file mode 100644 index 000000000..99f850c1b --- /dev/null +++ b/lambda-durable-hitl-python-sam/src/workflow/requirements.txt @@ -0,0 +1,9 @@ +# AWS SDK +boto3>=1.34.0 +botocore>=1.34.0 + +# AWS Durable Execution SDK +aws-durable-execution-sdk-python + +# Data validation and parsing +pydantic>=2.5.0 diff --git a/lambda-durable-hitl-python-sam/template.yaml b/lambda-durable-hitl-python-sam/template.yaml new file mode 100644 index 000000000..6464193e4 --- /dev/null +++ b/lambda-durable-hitl-python-sam/template.yaml @@ -0,0 +1,237 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: > + Lambda Durable Functions with Human-in-the-Loop (HITL) Pattern + + This pattern demonstrates how to pause Lambda execution, wait for human approval, + and resume using the Lambda durable functions SDK with Python 3.13. + +Globals: + Function: + Timeout: 900 + MemorySize: 512 + Architectures: + - x86_64 + Environment: + Variables: + LOG_LEVEL: INFO + +Parameters: + ApprovalTimeoutSeconds: + Type: Number + Default: 300 + Description: Default timeout for approval requests in seconds (5 minutes) + MinValue: 60 + MaxValue: 86400 + +Resources: + # DynamoDB Table for storing approval requests + ApprovalRequestsTable: + Type: AWS::DynamoDB::Table + Properties: + TableName: !Sub '${AWS::StackName}-ApprovalRequests' + BillingMode: PAY_PER_REQUEST + AttributeDefinitions: + - AttributeName: approval_id + AttributeType: S + - AttributeName: status + AttributeType: S + - AttributeName: created_at + AttributeType: S + KeySchema: + - AttributeName: approval_id + KeyType: HASH + GlobalSecondaryIndexes: + - IndexName: StatusIndex + KeySchema: + - AttributeName: status + KeyType: HASH + - AttributeName: created_at + KeyType: RANGE + Projection: + ProjectionType: ALL + TimeToLiveSpecification: + AttributeName: ttl + Enabled: true + SSESpecification: + SSEEnabled: true + 
PointInTimeRecoverySpecification: + PointInTimeRecoveryEnabled: true + Tags: + - Key: Pattern + Value: lambda-durable-hitl + + # SNS Topic for approval notifications + ApprovalNotificationsTopic: + Type: AWS::SNS::Topic + Properties: + TopicName: !Sub '${AWS::StackName}-ApprovalNotifications' + DisplayName: Approval Notifications + Tags: + - Key: Pattern + Value: lambda-durable-hitl + + # IAM Role for Workflow Lambda + WorkflowLambdaRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: lambda.amazonaws.com + Action: sts:AssumeRole + ManagedPolicyArns: + - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole + Policies: + - PolicyName: WorkflowLambdaPolicy + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - dynamodb:PutItem + - dynamodb:GetItem + - dynamodb:UpdateItem + Resource: !GetAtt ApprovalRequestsTable.Arn + - Effect: Allow + Action: + - sns:Publish + Resource: !Ref ApprovalNotificationsTopic + - Effect: Allow + Action: + - lambda:InvokeFunction + Resource: !Sub 'arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${AWS::StackName}-*' + - Effect: Allow + Action: + - lambda:CheckpointDurableExecution + - lambda:GetDurableExecutionState + Resource: '*' + Tags: + - Key: Pattern + Value: lambda-durable-hitl + + # Workflow Lambda Function (Durable Execution) + WorkflowFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Sub '${AWS::StackName}-Workflow' + PackageType: Image + ImageUri: workflow:latest + Role: !GetAtt WorkflowLambdaRole.Arn + Timeout: 900 + MemorySize: 512 + DurableConfig: + ExecutionTimeout: 3600 + Environment: + Variables: + APPROVALS_TABLE_NAME: !Ref ApprovalRequestsTable + SNS_TOPIC_ARN: !Ref ApprovalNotificationsTopic + APPROVAL_TIMEOUT_SECONDS: !Ref ApprovalTimeoutSeconds + Tags: + Pattern: lambda-durable-hitl + Metadata: + DockerTag: latest + DockerContext: ./src + 
Dockerfile: workflow/Dockerfile + + # IAM Role for Approval API Lambda + ApprovalApiLambdaRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: lambda.amazonaws.com + Action: sts:AssumeRole + ManagedPolicyArns: + - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole + Policies: + - PolicyName: ApprovalApiLambdaPolicy + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - dynamodb:GetItem + - dynamodb:UpdateItem + - dynamodb:Query + Resource: + - !GetAtt ApprovalRequestsTable.Arn + - !Sub '${ApprovalRequestsTable.Arn}/index/*' + - Effect: Allow + Action: + - lambda:InvokeFunction + Resource: !Sub 'arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${AWS::StackName}-*' + - Effect: Allow + Action: + - lambda:SendDurableExecutionCallbackSuccess + - lambda:SendDurableExecutionCallbackFailure + Resource: '*' + Tags: + - Key: Pattern + Value: lambda-durable-hitl + + # Approval API Lambda Function + ApprovalApiFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Sub '${AWS::StackName}-ApprovalApi' + PackageType: Image + ImageUri: approval_api:latest + Role: !GetAtt ApprovalApiLambdaRole.Arn + Environment: + Variables: + APPROVALS_TABLE_NAME: !Ref ApprovalRequestsTable + Tags: + Pattern: lambda-durable-hitl + Metadata: + DockerTag: latest + DockerContext: ./src + Dockerfile: approval_api/Dockerfile + +Outputs: + ApprovalRequestsTableName: + Description: DynamoDB table name for approval requests + Value: !Ref ApprovalRequestsTable + Export: + Name: !Sub '${AWS::StackName}-ApprovalRequestsTableName' + + ApprovalNotificationsTopicArn: + Description: SNS topic ARN for approval notifications + Value: !Ref ApprovalNotificationsTopic + Export: + Name: !Sub '${AWS::StackName}-ApprovalNotificationsTopicArn' + + WorkflowFunctionArn: + Description: Workflow Lambda function ARN + Value: !GetAtt WorkflowFunction.Arn + 
Export: + Name: !Sub '${AWS::StackName}-WorkflowFunctionArn' + + WorkflowFunctionName: + Description: Workflow Lambda function name + Value: !Ref WorkflowFunction + Export: + Name: !Sub '${AWS::StackName}-WorkflowFunctionName' + + ApprovalApiFunctionArn: + Description: Approval API Lambda function ARN + Value: !GetAtt ApprovalApiFunction.Arn + Export: + Name: !Sub '${AWS::StackName}-ApprovalApiFunctionArn' + + ApprovalApiFunctionName: + Description: Approval API Lambda function name + Value: !Ref ApprovalApiFunction + Export: + Name: !Sub '${AWS::StackName}-ApprovalApiFunctionName' + + StackName: + Description: CloudFormation stack name + Value: !Ref AWS::StackName + Export: + Name: !Sub '${AWS::StackName}-StackName'
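
Reviewer note: the stack names the Approval API function `${AWS::StackName}-ApprovalApi`, and `ApprovalDecisionRequest.from_api_event` accepts a direct-invocation event with `approval_id`, `decision`, `comments`, and `decided_by`. A minimal sketch of submitting a decision from a script could look like the following. The helper names `build_decision_payload` and `submit_decision` are hypothetical and not part of this PR, and restricting `decision` to `approved`/`rejected` is an assumption based on the callback branches in the Approval API handler.

```python
import json
from typing import Any, Dict, Optional


def build_decision_payload(
    approval_id: str,
    decision: str,
    comments: Optional[str] = None,
    decided_by: Optional[str] = None,
) -> Dict[str, Any]:
    """Build a direct-invocation event for the Approval API Lambda (hypothetical helper)."""
    # Assumption: only these two values lead to a durable callback being sent.
    if decision not in ("approved", "rejected"):
        raise ValueError(f"decision must be 'approved' or 'rejected', got {decision!r}")
    payload: Dict[str, Any] = {"approval_id": approval_id, "decision": decision}
    if comments:
        payload["comments"] = comments
    if decided_by:
        payload["decided_by"] = decided_by
    return payload


def submit_decision(
    stack_name: str,
    payload: Dict[str, Any],
    lambda_client: Any = None,
) -> Dict[str, Any]:
    """Synchronously invoke <stack_name>-ApprovalApi and return its decoded response."""
    if lambda_client is None:
        import boto3  # imported lazily so the payload helper works without boto3 installed

        lambda_client = boto3.client("lambda")
    response = lambda_client.invoke(
        FunctionName=f"{stack_name}-ApprovalApi",
        InvocationType="RequestResponse",
        Payload=json.dumps(payload).encode("utf-8"),
    )
    # The handler returns {"statusCode": ..., "body": "<json string>"}.
    return json.loads(response["Payload"].read())
```

Usage would be along the lines of `submit_decision("my-stack", build_decision_payload(approval_id, "approved", decided_by="alice"))`, where `my-stack` is a placeholder for the deployed stack name and `approval_id` comes from the SNS notification or the DynamoDB table.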