diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..1405592 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,16 @@ +# Changelog + +## [Unreleased] + +### Changed +- **Architecture change: from synchronous to asynchronous processing** + - Refactored the SDK Client into an SQS-based async architecture + - Producer Lambda (SdkClientFunction): receives the Telegram webhook, returns 200 immediately, and writes the message to the SQS queue + - Consumer Lambda (ConsumerFunction): processes SQS messages asynchronously, calls the Agent Server, and returns the result to Telegram + - Fixes the Telegram 30-second webhook timeout issue and supports long-running tasks + +### Added +- SQS task queue (TaskQueue) and dead letter queue (DLQueue) +- SNS alarm topic (AlarmTopic) for CloudWatch notifications +- CloudWatch alarms and custom metric monitoring +- DynamoDB session table for multi-turn conversation state management diff --git a/README.md b/README.md index abdcb64..b7158a6 100644 --- a/README.md +++ b/README.md @@ -17,14 +17,16 @@ A Serverless AI Agent system built on Claude Agent SDK, implementing stateful co ## Architecture ``` -Telegram User → Bot API → API Gateway → sdk-client Lambda - ↓ - API Gateway → agent-container Lambda - ↓ - DynamoDB (Session mapping) + S3 (Session files) + Bedrock (Claude) +Telegram User → Bot API → API Gateway → Producer Lambda → SQS Queue → Consumer Lambda + ↓ ↓ + Return 200 agent-server Lambda + immediately ↓ + DynamoDB (Session mapping) + S3 (Session files) + Bedrock (Claude) ``` -**Core Design**: Uses the Hybrid Sessions pattern recommended by Claude Agent SDK +**Core Design**: +- Uses the Hybrid Sessions pattern recommended by Claude Agent SDK +- **SQS Async Architecture**: Producer returns 200 immediately to Telegram, Consumer processes requests asynchronously ## Features @@ -34,6 +36,7 @@ Telegram User → Bot API → API Gateway → sdk-client Lambda - **Skills Support**: Reusable skill modules with hello-world example - **MCP Integration**: Support for HTTP and local command-based MCP servers - **Auto Cleanup**: 25-day TTL + S3 lifecycle management +- **SQS Queue**: Async processing + auto retry + dead letter queue - **Quick Start**: Provides example Skill/SubAgent/MCP configurations for adding other components ## Project Structure @@ -51,7 +54,9 @@ Telegram User → Bot API → API Gateway → sdk-client
Lambda │ └── system_prompt.md # System Prompt │ ├── agent-sdk-client/ # Telegram Client (ZIP Deployment) -│ └── handler.py # Webhook Handler +│ ├── handler.py # Producer: Webhook receiver, writes to SQS +│ ├── consumer.py # Consumer: SQS consumer, calls Agent +│ └── config.py # Configuration management │ ├── docs/ # Documentation │ └── anthropic-agent-sdk-official/ # SDK Official Docs Reference @@ -93,16 +98,32 @@ sam deploy --guided | `BEDROCK_SECRET_ACCESS_KEY` | Bedrock secret key | | `SDK_CLIENT_AUTH_TOKEN` | Internal authentication token | | `TELEGRAM_BOT_TOKEN` | Telegram Bot Token | +| `QUEUE_URL` | SQS queue URL (auto-created) | ## Tech Stack - **Runtime**: Python 3.12 + Claude Agent SDK - **Computing**: AWS Lambda (ARM64) - **Storage**: S3 + DynamoDB +- **Message Queue**: AWS SQS (Standard Queue + DLQ) - **AI**: Claude via Amazon Bedrock - **Orchestration**: AWS SAM - **Integration**: Telegram Bot API + MCP +## SQS Async Architecture + +**Problem Solved**: Telegram Webhook times out and retries after ~27s, while Agent processing may take 30-70s, causing duplicate responses. + +**Solution**: +1. Producer Lambda receives Webhook, writes to SQS, returns 200 immediately (<1s) +2. Consumer Lambda consumes from SQS, calls Agent Server, sends response to Telegram +3. 
Retry 3 times on failure, then move to dead letter queue (DLQ) + +**Queue Configuration**: +- VisibilityTimeout: 900s (= Lambda timeout) +- maxReceiveCount: 3 (retry 3 times) +- DLQ Alarm: CloudWatch alarm triggers when messages enter DLQ + ## Session Management **Lifecycle**: @@ -181,14 +202,16 @@ MIT ## Architecture ``` -Telegram User → Bot API → API Gateway → sdk-client Lambda - ↓ - API Gateway → agent-container Lambda - ↓ - DynamoDB (Session mapping) + S3 (Session files) + Bedrock (Claude) +Telegram User → Bot API → API Gateway → Producer Lambda → SQS Queue → Consumer Lambda + ↓ ↓ + Return 200 immediately agent-server Lambda + ↓ + DynamoDB (Session mapping) + S3 (Session files) + Bedrock (Claude) ``` -**Core Design**: Uses the Hybrid Sessions pattern officially recommended by Claude Agent SDK +**Core Design**: +- Uses the Hybrid Sessions pattern officially recommended by Claude Agent SDK +- **SQS Async Architecture**: Producer returns 200 immediately to Telegram, Consumer processes requests asynchronously ## Features @@ -198,6 +221,7 @@ Telegram User → Bot API → API Gateway → sdk-client Lambda - **Skills Support**: Reusable skill modules with hello-world example - **MCP Integration**: Support for HTTP and local command-based MCP servers - **Auto Cleanup**: 25-day TTL + S3 lifecycle management +- **SQS Queue**: Async processing + auto retry + dead letter queue - **Quick Start**: Provides example Skill/SubAgent/MCP configurations; other components can be added by following the examples ## Project Structure @@ -215,7 +239,9 @@ Telegram User → Bot API → API Gateway → sdk-client Lambda │ └── system_prompt.md # System Prompt │ ├── agent-sdk-client/ # Telegram Client (ZIP Deployment) -│ └── handler.py # Webhook Handler +│ ├── handler.py # Producer: Webhook receiver, writes to SQS +│ ├── consumer.py # Consumer: SQS consumer, calls Agent +│ └── config.py # Configuration management │ ├── docs/ # Documentation │ └── anthropic-agent-sdk-official/ # SDK Official Docs Reference @@ -257,16 +283,32 @@ sam deploy --guided | `BEDROCK_SECRET_ACCESS_KEY` | Bedrock secret key | | `SDK_CLIENT_AUTH_TOKEN` | Internal authentication token | | `TELEGRAM_BOT_TOKEN` | Telegram Bot Token | +| `QUEUE_URL` | SQS queue URL (auto-created) | ## Tech Stack - **Runtime**: Python 3.12 + Claude Agent SDK - **Computing**: AWS Lambda (ARM64) - **Storage**: S3 + DynamoDB +- **Message Queue**: AWS SQS (Standard Queue + DLQ) - **AI**: Claude via Amazon Bedrock - **Orchestration**: AWS SAM - **Integration**: Telegram Bot API + MCP +## SQS Async Architecture + +**Problem Solved**: Telegram Webhook times out and retries after ~27s, while
Agent processing may take 30-70s, causing duplicate responses. + +**Solution**: +1. Producer Lambda receives Webhook, writes to SQS, returns 200 immediately (<1s) +2. Consumer Lambda consumes from SQS, calls Agent Server, sends response to Telegram +3. Retry 3 times on failure, then move to dead letter queue (DLQ) + +**Queue Configuration**: +- VisibilityTimeout: 900s (= Lambda timeout) +- maxReceiveCount: 3 (retry 3 times) +- DLQ Alarm: CloudWatch alarm triggers when messages enter DLQ + ## Session Management **Lifecycle**: diff --git a/agent-sdk-client/config.py b/agent-sdk-client/config.py index 7b67d68..791b468 100644 --- a/agent-sdk-client/config.py +++ b/agent-sdk-client/config.py @@ -10,6 +10,7 @@ class Config: telegram_token: str agent_server_url: str auth_token: str + queue_url: str @classmethod def from_env(cls) -> 'Config': @@ -18,4 +19,5 @@ def from_env(cls) -> 'Config': telegram_token=os.getenv('TELEGRAM_BOT_TOKEN', ''), agent_server_url=os.getenv('AGENT_SERVER_URL', ''), auth_token=os.getenv('SDK_CLIENT_AUTH_TOKEN', 'default-token'), + queue_url=os.getenv('QUEUE_URL', ''), ) diff --git a/agent-sdk-client/consumer.py b/agent-sdk-client/consumer.py new file mode 100644 index 0000000..673bb6c --- /dev/null +++ b/agent-sdk-client/consumer.py @@ -0,0 +1,142 @@ +"""Lambda handler for SQS Consumer. + +Processes messages from SQS queue, calls Agent Server, sends response to Telegram.
+""" +import asyncio +import json +from typing import Any + +import httpx +from telegram import Bot, Update +from telegram.constants import ParseMode, ChatAction +from telegram.helpers import escape_markdown +from telegram.error import BadRequest + +from config import Config + + +def lambda_handler(event: dict, context: Any) -> dict: + """SQS Consumer Lambda entry point.""" + for record in event['Records']: + try: + message_data = json.loads(record['body']) + except json.JSONDecodeError as e: + # Invalid message format - log and skip + import logging + logger = logging.getLogger() + logger.error(f"Failed to parse SQS message: {e}") + continue + + try: + asyncio.run(process_message(message_data)) + except Exception as e: + # Log and let SQS retry on failure + import logging + logger = logging.getLogger() + logger.exception(f"Failed to process message: {e}") + raise # Re-raise to fail the batch item + + return {'statusCode': 200} + + +async def process_message(message_data: dict) -> None: + """Process single message from SQS queue.""" + import logging + logger = logging.getLogger() + + config = Config.from_env() + bot = Bot(config.telegram_token) + + # Reconstruct Update object from stored data + update = Update.de_json(message_data['telegram_update'], bot) + message = update.message or update.edited_message + + if not message: + logger.warning("Received update with no message or edited_message") + return + + # Send typing indicator + await bot.send_chat_action( + chat_id=message.chat_id, + action=ChatAction.TYPING, + message_thread_id=message.message_thread_id, + ) + + # Initialize result with default error response + # This ensures result is always defined, even if Agent Server call fails + result = { + 'response': '', + 'is_error': True, + 'error_message': 'Failed to get response from Agent Server' + } + + # Call Agent Server + try: + async with httpx.AsyncClient(timeout=600.0) as client: + response = await client.post( + config.agent_server_url, + headers={ + 
'Authorization': f'Bearer {config.auth_token}', + 'Content-Type': 'application/json', + }, + json={ + 'user_message': message.text, + 'chat_id': str(message.chat_id), + 'thread_id': str(message.message_thread_id) if message.message_thread_id else None, + }, + ) + response.raise_for_status() + result = response.json() + + except httpx.TimeoutException: + logger.warning(f"Agent Server timeout for chat_id={message.chat_id}") + await bot.send_message( + chat_id=message.chat_id, + text="Request timed out.", + message_thread_id=message.message_thread_id, + ) + raise # Re-raise to trigger SQS retry for transient errors + + except Exception as e: + logger.exception(f"Agent Server error for chat_id={message.chat_id}") + error_text = f"Error: {str(e)[:200]}" + try: + await bot.send_message( + chat_id=message.chat_id, + text=error_text, + message_thread_id=message.message_thread_id, + ) + except Exception as send_error: + logger.error(f"Failed to send error message to Telegram: {send_error}") + # Don't re-raise - error message already sent to user, retrying would cause duplicate messages + + # Format response (result is guaranteed to be defined now) + if result.get('is_error'): + text = f"Agent error: {result.get('error_message', 'Unknown')}" + else: + text = result.get('response') or 'No response' + + if len(text) > 4000: + text = text[:4000] + "\n\n... 
(truncated)" + + # Send response to Telegram + try: + await bot.send_message( + chat_id=message.chat_id, + text=text, + parse_mode=ParseMode.MARKDOWN_V2, + message_thread_id=message.message_thread_id, + reply_to_message_id=message.message_id, + ) + except BadRequest as e: + if "parse entities" in str(e).lower(): + safe_text = escape_markdown(text, version=2) + await bot.send_message( + chat_id=message.chat_id, + text=safe_text, + parse_mode=ParseMode.MARKDOWN_V2, + message_thread_id=message.message_thread_id, + reply_to_message_id=message.message_id, + ) + else: + raise diff --git a/agent-sdk-client/handler.py b/agent-sdk-client/handler.py index 435a6f5..f681358 100644 --- a/agent-sdk-client/handler.py +++ b/agent-sdk-client/handler.py @@ -1,102 +1,173 @@ -"""Lambda handler for sdk-client. +"""Lambda handler for sdk-client (Producer). -Receives Telegram webhook, calls agent-server, sends response back. +Receives Telegram webhook, writes to SQS, returns 200 immediately. """ -import asyncio import json +import logging from typing import Any -import httpx +import boto3 +from botocore.exceptions import ClientError from telegram import Bot, Update -from telegram.constants import ParseMode, ChatAction -from telegram.helpers import escape_markdown -from telegram.error import BadRequest from config import Config +logger = logging.getLogger() +logger.setLevel(logging.INFO) -def lambda_handler(event: dict, context: Any) -> dict: - """Lambda entry point.""" - try: - body = json.loads(event.get('body', '{}')) - except json.JSONDecodeError: - return {'statusCode': 200} +# Reuse SQS client across invocations +_sqs_client = None +_cloudwatch_client = None - asyncio.run(process_webhook(body)) - return {'statusCode': 200} +def _get_sqs_client(): + """Get or create SQS client singleton.""" + global _sqs_client + if _sqs_client is None: + _sqs_client = boto3.client('sqs') + return _sqs_client -async def process_webhook(body: dict) -> None: - """Process Telegram webhook payload.""" - 
config = Config.from_env() - bot = Bot(config.telegram_token) - update = Update.de_json(body, bot) - if not update: - return +def _get_cloudwatch_client(): + """Get or create CloudWatch client singleton.""" + global _cloudwatch_client + if _cloudwatch_client is None: + _cloudwatch_client = boto3.client('cloudwatch') + return _cloudwatch_client + + +def _send_metric(metric_name: str, value: float = 1.0): + """Send custom metric to CloudWatch (best-effort; errors are logged, not raised).""" + try: + cloudwatch = _get_cloudwatch_client() + cloudwatch.put_metric_data( + Namespace='OmniCloudAgent/Producer', + MetricData=[ + { + 'MetricName': metric_name, + 'Value': value, + 'Unit': 'Count', + } + ], + ) + except Exception as e: + logger.warning(f"Failed to send CloudWatch metric: {e}") - message = update.message or update.edited_message - if not message or not message.text: - return - await bot.send_chat_action( - chat_id=message.chat_id, - action=ChatAction.TYPING, - message_thread_id=message.message_thread_id, - ) +def _send_to_sqs_safe(sqs, queue_url: str, message_body: dict) -> bool: + """Send message to SQS with comprehensive error handling. + Returns: + True if message sent successfully, False otherwise.
+ """ try: - async with httpx.AsyncClient(timeout=600.0) as client: - response = await client.post( - config.agent_server_url, - headers={ - 'Authorization': f'Bearer {config.auth_token}', - 'Content-Type': 'application/json', - }, - json={ - 'user_message': message.text, - 'chat_id': str(message.chat_id), - 'thread_id': str(message.message_thread_id) if message.message_thread_id else None, - }, + response = sqs.send_message( + QueueUrl=queue_url, MessageBody=json.dumps(message_body) + ) + message_id = response.get('MessageId', 'unknown') + logger.info(f"Message sent to SQS: {message_id}") + _send_metric('SQSMessageSent') + return True + + except sqs.exceptions.QueueDoesNotExist: + logger.error( + f"CRITICAL: Queue does not exist: {queue_url}", + extra={'queue_url': queue_url}, + ) + _send_metric('SQSError.QueueNotFound') + return False + + except ClientError as e: + error_code = e.response.get('Error', {}).get('Code', 'Unknown') + error_msg = e.response.get('Error', {}).get('Message', '') + + if error_code in ('AccessDenied', 'AccessDeniedException'): + logger.error( + f"CRITICAL: IAM permission denied for SQS: {error_msg}", + extra={'error_code': error_code, 'queue_url': queue_url}, + ) + _send_metric('SQSError.AccessDenied') + + elif error_code in ('ThrottlingException', 'RequestThrottled'): + logger.warning( + f"SQS throttled (message not enqueued): {error_msg}", + extra={'error_code': error_code}, ) - result = response.json() + _send_metric('SQSError.Throttled') + + elif error_code == 'InvalidParameterValue': + logger.error( + f"CRITICAL: Invalid SQS parameter: {error_msg}", + extra={'error_code': error_code, 'message_body': message_body}, + ) + _send_metric('SQSError.InvalidParameter') + + else: + logger.error( + f"SQS ClientError [{error_code}]: {error_msg}", + extra={'error_code': error_code, 'error_msg': error_msg}, + ) + _send_metric(f'SQSError.{error_code}') + + return False - except httpx.TimeoutException: - await
bot.send_message(chat_id=message.chat_id, text="Request timed out.", - message_thread_id=message.message_thread_id) - return except Exception as e: - await bot.send_message(chat_id=message.chat_id, text=f"Error: {str(e)[:200]}", - message_thread_id=message.message_thread_id) - return + logger.exception( + f"Unexpected error sending to SQS: {e}", + extra={'exception_type': type(e).__name__}, + ) + _send_metric('SQSError.Unexpected') + return False - if result.get('is_error'): - text = f"Agent error: {result.get('error_message', 'Unknown')}" - else: - text = result.get('response', 'No response') - if len(text) > 4000: - text = text[:4000] + "\n\n... (truncated)" +def lambda_handler(event: dict, context: Any) -> dict: + """Lambda entry point - Producer. - # Try MARKDOWN_V2 first, fallback with escape on parse errors + Validates Telegram message and writes to SQS queue. + Returns 200 immediately without waiting for processing. + """ try: - await bot.send_message( - chat_id=message.chat_id, - text=text, - parse_mode=ParseMode.MARKDOWN_V2, - message_thread_id=message.message_thread_id, - reply_to_message_id=message.message_id, + body = json.loads(event.get('body', '{}')) + except json.JSONDecodeError: + logger.warning('Invalid JSON in webhook body') + return {'statusCode': 200} + + config = Config.from_env() + + # Quick validation - parse update to check if it's a valid message + bot = Bot(config.telegram_token) + update = Update.de_json(body, bot) + + if not update: + logger.debug('Ignoring non-update webhook') + return {'statusCode': 200} + + message = update.message or update.edited_message + if not message or not message.text: + logger.debug('Ignoring webhook without text message') + return {'statusCode': 200} + + # Write to SQS for async processing + sqs = _get_sqs_client() + message_body = { + 'telegram_update': body, + 'chat_id': message.chat_id, + 'message_id': message.message_id, + 'text': message.text, + 'thread_id': message.message_thread_id, + } + + success = 
_send_to_sqs_safe(sqs, config.queue_url, message_body) + + # Return 200 immediately - processing happens async in consumer + # Note: Even if SQS fails, we return 200 to prevent Telegram webhook retries + if not success: + logger.error( + 'Failed to send message to SQS but returning 200 to Telegram', + extra={ + 'chat_id': message.chat_id, + 'message_id': message.message_id, + }, ) - except BadRequest as e: - if "parse entities" in str(e).lower(): - print(f"[MARKDOWN_V2] Parse error: {e}, retrying with escaped text") - safe_text = escape_markdown(text, version=2) - await bot.send_message( - chat_id=message.chat_id, - text=safe_text, - parse_mode=ParseMode.MARKDOWN_V2, - message_thread_id=message.message_thread_id, - reply_to_message_id=message.message_id, - ) - else: - raise + + return {'statusCode': 200} diff --git a/agent-sdk-server/handler.py b/agent-sdk-server/handler.py index f90622c..b83fbf3 100644 --- a/agent-sdk-server/handler.py +++ b/agent-sdk-server/handler.py @@ -4,12 +4,16 @@ """ import asyncio import json +import logging from typing import Any from config import Config from session_store import SessionStore from agent_session import process_message +logger = logging.getLogger() +logger.setLevel(logging.INFO) + def lambda_handler(event: dict, context: Any) -> dict: """Lambda entry point.
@@ -76,26 +80,56 @@ def lambda_handler(event: dict, context: Any) -> dict: # Download session files if resuming if session_id: - store.download_session_files(session_id) + try: + store.download_session_files(session_id) + except Exception as e: + logger.error(f"Failed to download session files for session_id={session_id}: {e}") + return { + 'statusCode': 500, + 'body': json.dumps({'error': 'Failed to download session'}) + } # Process message with Agent SDK - result = asyncio.run(process_message( - user_message=user_message, - session_id=session_id, - model=model, - )) + try: + result = asyncio.run(process_message( + user_message=user_message, + session_id=session_id, + model=model, + )) + except Exception as e: + logger.exception(f"Agent SDK processing failed for chat_id={chat_id}") + return { + 'statusCode': 500, + 'headers': {'Content-Type': 'application/json'}, + 'body': json.dumps({ + 'response': '', + 'session_id': session_id, + 'cost_usd': 0.0, + 'num_turns': 0, + 'is_error': True, + 'error_message': f'Agent processing error: {str(e)[:200]}' + }) + } # Get session_id from result (SDK generates it for new sessions) result_session_id = result.get('session_id', '') # Save session mapping if new session if result_session_id and result_session_id != session_id: - store.save_session_id(chat_id, thread_id, result_session_id) + try: + store.save_session_id(chat_id, thread_id, result_session_id) + except Exception as e: + logger.error(f"Failed to save session mapping for chat_id={chat_id}: {e}") + # Continue anyway - session can still work with S3 files # Upload session files if result_session_id: - store.upload_session_files(result_session_id) - store.update_session_timestamp(chat_id, thread_id) + try: + store.upload_session_files(result_session_id) + store.update_session_timestamp(chat_id, thread_id) + except Exception as e: + logger.error(f"Failed to upload session files for session_id={result_session_id}: {e}") + # Continue anyway - result is already available to 
return return { 'statusCode': 200, diff --git a/template.yaml b/template.yaml index d8fb472..f8e40d3 100644 --- a/template.yaml +++ b/template.yaml @@ -67,6 +67,31 @@ Resources: AttributeName: ttl Enabled: true + # SQS Task Queue + TaskQueue: + Type: AWS::SQS::Queue + Properties: + QueueName: !Sub '${AWS::StackName}-TaskQueue' + VisibilityTimeout: 900 # 15 minutes = Lambda timeout + MessageRetentionPeriod: 1209600 # 14 days + RedrivePolicy: + deadLetterTargetArn: !GetAtt DLQueue.Arn + maxReceiveCount: 3 # Retry 3 times then move to DLQ + + # Dead Letter Queue + DLQueue: + Type: AWS::SQS::Queue + Properties: + QueueName: !Sub '${AWS::StackName}-DLQueue' + MessageRetentionPeriod: 1209600 # 14 days + + # SNS Topic for alarm notifications + AlarmTopic: + Type: AWS::SNS::Topic + Properties: + TopicName: !Sub '${AWS::StackName}-Alarms' + DisplayName: OmniCloud Agent Alarms + # Agent Server Lambda (Container) AgentServerFunction: Type: AWS::Serverless::Function @@ -108,18 +133,28 @@ Resources: FunctionUrlConfig: AuthType: NONE - # SDK Client Lambda (ZIP) + # SDK Client Lambda - Producer (writes to SQS, returns 200 immediately) SdkClientFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./agent-sdk-client Handler: handler.lambda_handler Runtime: python3.12 + Timeout: 10 # Producer should complete quickly (<1s for message validation + SQS write) Environment: Variables: TELEGRAM_BOT_TOKEN: !Ref TelegramBotToken AGENT_SERVER_URL: !GetAtt AgentServerFunctionUrl.FunctionUrl SDK_CLIENT_AUTH_TOKEN: !Ref AuthToken + QUEUE_URL: !Ref TaskQueue + Policies: + - SQSSendMessagePolicy: + QueueName: !GetAtt TaskQueue.QueueName + - Statement: + - Effect: Allow + Action: + - cloudwatch:PutMetricData + Resource: '*' Events: Webhook: Type: HttpApi @@ -127,6 +162,89 @@ Resources: Path: /webhook Method: POST + # Consumer Lambda - processes messages from SQS + ConsumerFunction: + Type: AWS::Serverless::Function + Properties: + CodeUri: ./agent-sdk-client + Handler: 
consumer.lambda_handler + Runtime: python3.12 + Timeout: 900 # 15 minutes for long Agent Server calls + Environment: + Variables: + TELEGRAM_BOT_TOKEN: !Ref TelegramBotToken + AGENT_SERVER_URL: !GetAtt AgentServerFunctionUrl.FunctionUrl + SDK_CLIENT_AUTH_TOKEN: !Ref AuthToken + QUEUE_URL: !Ref TaskQueue + SESSION_TABLE: !Ref SessionTable + Policies: + - SQSPollerPolicy: + QueueName: !GetAtt TaskQueue.QueueName + - DynamoDBCrudPolicy: + TableName: !Ref SessionTable + Events: + SQSEvent: + Type: SQS + Properties: + Queue: !GetAtt TaskQueue.Arn + BatchSize: 1 # Process one message at a time + MaximumBatchingWindowInSeconds: 0 # Process immediately + + # DLQ Alarm - alert when messages land in DLQ + DLQAlarm: + Type: AWS::CloudWatch::Alarm + Properties: + AlarmName: !Sub '${AWS::StackName}-DLQ-Messages' + AlarmDescription: Alert when messages land in Dead Letter Queue + MetricName: ApproximateNumberOfMessagesVisible + Namespace: AWS/SQS + Statistic: Sum + Period: 300 + EvaluationPeriods: 1 + Threshold: 1 + ComparisonOperator: GreaterThanOrEqualToThreshold + AlarmActions: + - !Ref AlarmTopic + Dimensions: + - Name: QueueName + Value: !GetAtt DLQueue.QueueName + + # Producer SQS send error alarm + ProducerSQSErrorAlarm: + Type: AWS::CloudWatch::Alarm + Properties: + AlarmName: !Sub '${AWS::StackName}-Producer-SQS-Errors' + AlarmDescription: Alert when Producer fails to send messages to SQS + MetricName: SQSError.QueueNotFound + Namespace: OmniCloudAgent/Producer + Statistic: Sum + Period: 60 + EvaluationPeriods: 1 + Threshold: 1 + ComparisonOperator: GreaterThanOrEqualToThreshold + AlarmActions: + - !Ref AlarmTopic + TreatMissingData: notBreaching + + # Producer Lambda error alarm + ProducerLambdaErrorAlarm: + Type: AWS::CloudWatch::Alarm + Properties: + AlarmName: !Sub '${AWS::StackName}-Producer-Lambda-Errors' + AlarmDescription: Alert on Producer Lambda exceptions + MetricName: Errors + Namespace: AWS/Lambda + Statistic: Sum + Period: 60 + EvaluationPeriods: 1 + 
Threshold: 1 + ComparisonOperator: GreaterThanOrEqualToThreshold + AlarmActions: + - !Ref AlarmTopic + Dimensions: + - Name: FunctionName + Value: !Ref SdkClientFunction + Outputs: WebhookUrl: Description: Telegram Webhook URL @@ -134,3 +252,14 @@ Outputs: AgentServerUrl: Description: Agent Server Function URL Value: !GetAtt AgentServerFunctionUrl.FunctionUrl + TaskQueueUrl: + Description: Task Queue URL + Value: !Ref TaskQueue + DLQueueUrl: + Description: Dead Letter Queue URL + Value: !Ref DLQueue + AlarmTopicArn: + Description: SNS Topic for CloudWatch Alarm notifications + Value: !Ref AlarmTopic + Export: + Name: !Sub '${AWS::StackName}-AlarmTopic'
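
The producer-to-consumer hand-off in this change can be sketched without any AWS dependency. This is a minimal illustration, not the repository's code: `build_envelope` and `parse_record` are hypothetical helper names, the update is treated as a plain dict rather than a `telegram.Update`, and `sample_update` is a hand-written stand-in for a Telegram payload. It mirrors the envelope fields the producer writes and the `record['body']` parsing the consumer performs.

```python
import json
from typing import Optional


def build_envelope(update: dict) -> Optional[str]:
    """Hypothetical helper: build the SQS message body the producer would send.

    Returns None when the update carries no text message, mirroring the
    producer's early return for updates it ignores.
    """
    message = update.get("message") or update.get("edited_message")
    if not message or not message.get("text"):
        return None
    return json.dumps({
        "telegram_update": update,  # raw update, replayed by the consumer
        "chat_id": message["chat"]["id"],
        "message_id": message["message_id"],
        "text": message["text"],
        "thread_id": message.get("message_thread_id"),
    })


def parse_record(record: dict) -> Optional[dict]:
    """Hypothetical helper: decode one SQS record body as JSON."""
    try:
        return json.loads(record["body"])
    except json.JSONDecodeError:
        return None  # malformed body: skip it rather than fail the invocation


# Hand-written sample payload (assumption, not captured from Telegram)
sample_update = {
    "update_id": 1,
    "message": {"message_id": 7, "chat": {"id": 42}, "text": "hello"},
}
body = build_envelope(sample_update)
envelope = parse_record({"body": body})
```

Carrying the raw update inside the envelope is what lets the consumer rebuild the message with `Update.de_json`, at the cost of storing the text twice in each SQS message.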