From f67d856b3c5a69cda13a55981439219c4f6b2a6b Mon Sep 17 00:00:00 2001 From: BukeLy Date: Sun, 4 Jan 2026 20:50:16 +0800 Subject: [PATCH 1/5] =?UTF-8?q?feat:=20=E9=87=8D=E6=9E=84=20SDK=20Client?= =?UTF-8?q?=20=E4=B8=BA=20SQS=20=E5=BC=82=E6=AD=A5=E6=9E=B6=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 解决 Telegram Webhook 超时重试导致重复响应的问题: - Producer Lambda 接收 Webhook 后立即返回 200(<1s) - Consumer Lambda 从 SQS 异步消费,调用 Agent Server - 添加死信队列(DLQ)和 CloudWatch 告警 --- README.md | 35 ++++++++--- agent-sdk-client/config.py | 2 + agent-sdk-client/consumer.py | 105 ++++++++++++++++++++++++++++++++ agent-sdk-client/handler.py | 114 ++++++++++++----------------------- template.yaml | 70 ++++++++++++++++++++- 5 files changed, 242 insertions(+), 84 deletions(-) create mode 100644 agent-sdk-client/consumer.py diff --git a/README.md b/README.md index 838a127..597b5eb 100644 --- a/README.md +++ b/README.md @@ -5,14 +5,16 @@ ## 架构 ``` -Telegram User → Bot API → API Gateway → sdk-client Lambda - ↓ - API Gateway → agent-container Lambda - ↓ - DynamoDB (Session映射) + S3 (Session文件) + Bedrock (Claude) +Telegram User → Bot API → API Gateway → Producer Lambda → SQS Queue → Consumer Lambda + ↓ ↓ + 立即返回 200 agent-server Lambda + ↓ + DynamoDB (Session映射) + S3 (Session文件) + Bedrock (Claude) ``` -**核心设计**:采用 Claude Agent SDK 官方推荐的 Hybrid Sessions 模式 +**核心设计**: +- 采用 Claude Agent SDK 官方推荐的 Hybrid Sessions 模式 +- **SQS 异步架构**:Producer 立即返回 200 给 Telegram,Consumer 异步处理请求 ## 特性 @@ -22,6 +24,7 @@ Telegram User → Bot API → API Gateway → sdk-client Lambda - **Skills 支持**:可复用的技能模块 - **MCP 集成**:支持 HTTP 和本地命令类型的 MCP 服务器 - **自动清理**:25天 TTL + S3 生命周期管理 +- **SQS 队列**:异步处理 + 自动重试 + 死信队列 ## 项目结构 @@ -38,7 +41,9 @@ Telegram User → Bot API → API Gateway → sdk-client Lambda │ └── system_prompt.md # 系统提示 │ ├── agent-sdk-client/ # Telegram客户端 (ZIP部署) -│ └── handler.py # Webhook处理 +│ ├── handler.py # Producer: Webhook接收,写入SQS +│ ├── consumer.py # Consumer: SQS消费,调用Agent +│ └── config.py # 配置管理 │ ├── docs/ # 文档 │ └── anthropic-agent-sdk-official/ # SDK官方文档参考 @@ -80,16 +85,32 @@ sam deploy --guided | `BEDROCK_SECRET_ACCESS_KEY` | Bedrock密钥 | | `SDK_CLIENT_AUTH_TOKEN` | 内部认证Token | | `TELEGRAM_BOT_TOKEN` | Telegram Bot Token | +| `QUEUE_URL` | SQS队列URL(自动创建) | ## 技术栈 - **Runtime**: Python 3.12 + Claude Agent SDK - **计算**: AWS Lambda (ARM64) - **存储**: S3 + DynamoDB +- **消息队列**: AWS SQS (Standard Queue + DLQ) - **AI**: Claude via Amazon Bedrock - **编排**: AWS SAM - **集成**: Telegram Bot API + MCP +## SQS 异步架构 + +**解决的问题**:Telegram Webhook 在 ~27s 后超时重试,而 Agent 处理可能需要 30-70s,导致重复响应。 + +**解决方案**: +1. Producer Lambda 接收 Webhook,写入 SQS,立即返回 200(<1s) +2. Consumer Lambda 从 SQS 消费,调用 Agent Server,发送响应给 Telegram +3. 失败重试 3 次,最终失败进入死信队列(DLQ) + +**队列配置**: +- VisibilityTimeout: 900s(= Lambda 超时) +- maxReceiveCount: 3(重试 3 次) +- DLQ 告警:消息进入 DLQ 时触发 CloudWatch 告警 + ## Session 管理 **生命周期**: diff --git a/agent-sdk-client/config.py b/agent-sdk-client/config.py index 7b67d68..791b468 100644 --- a/agent-sdk-client/config.py +++ b/agent-sdk-client/config.py @@ -10,6 +10,7 @@ class Config: telegram_token: str agent_server_url: str auth_token: str + queue_url: str @classmethod def from_env(cls) -> 'Config': @@ -18,4 +19,5 @@ def from_env(cls) -> 'Config': telegram_token=os.getenv('TELEGRAM_BOT_TOKEN', ''), agent_server_url=os.getenv('AGENT_SERVER_URL', ''), auth_token=os.getenv('SDK_CLIENT_AUTH_TOKEN', 'default-token'), + queue_url=os.getenv('QUEUE_URL', ''), ) diff --git a/agent-sdk-client/consumer.py b/agent-sdk-client/consumer.py new file mode 100644 index 0000000..d074599 --- /dev/null +++ b/agent-sdk-client/consumer.py @@ -0,0 +1,105 @@ +"""Lambda handler for SQS Consumer. + +Processes messages from SQS queue, calls Agent Server, sends response to Telegram. +""" +import asyncio +import json +from typing import Any + +import httpx +from telegram import Bot, Update +from telegram.constants import ParseMode, ChatAction +from telegram.helpers import escape_markdown +from telegram.error import BadRequest + +from config import Config + + +def lambda_handler(event: dict, context: Any) -> dict: + """SQS Consumer Lambda entry point.""" + for record in event['Records']: + message_data = json.loads(record['body']) + asyncio.run(process_message(message_data)) + + return {'statusCode': 200} + + +async def process_message(message_data: dict) -> None: + """Process single message from SQS queue.""" + config = Config.from_env() + bot = Bot(config.telegram_token) + + # Reconstruct Update object from stored data + update = Update.de_json(message_data['telegram_update'], bot) + message = update.message or update.edited_message + + # Send typing indicator + await bot.send_chat_action( + chat_id=message.chat_id, + action=ChatAction.TYPING, + message_thread_id=message.message_thread_id, + ) + + # Call Agent Server + try: + async with httpx.AsyncClient(timeout=600.0) as client: + response = await client.post( + config.agent_server_url, + headers={ + 'Authorization': f'Bearer {config.auth_token}', + 'Content-Type': 'application/json', + }, + json={ + 'user_message': message.text, + 'chat_id': str(message.chat_id), + 'thread_id': str(message.message_thread_id) if message.message_thread_id else None, + }, + ) + result = response.json() + + except httpx.TimeoutException: + await bot.send_message( + chat_id=message.chat_id, + text="Request timed out.", + message_thread_id=message.message_thread_id, + ) + raise # Re-raise to trigger SQS retry + + except Exception as e: + await bot.send_message( + chat_id=message.chat_id, + text=f"Error: {str(e)[:200]}", + message_thread_id=message.message_thread_id, + ) + raise # Re-raise to trigger SQS retry + + # Format response + if result.get('is_error'): + text = f"Agent error: {result.get('error_message', 'Unknown')}" + else: + text = result.get('response', 'No response') + + if len(text) > 4000: + text = text[:4000] + "\n\n... (truncated)" + + # Send response to Telegram + try: + await bot.send_message( + chat_id=message.chat_id, + text=text, + parse_mode=ParseMode.MARKDOWN_V2, + message_thread_id=message.message_thread_id, + reply_to_message_id=message.message_id, + ) + except BadRequest as e: + if "parse entities" in str(e).lower(): + safe_text = escape_markdown(text, version=2) + await bot.send_message( + chat_id=message.chat_id, + text=safe_text, + parse_mode=ParseMode.MARKDOWN_V2, + message_thread_id=message.message_thread_id, + reply_to_message_id=message.message_id, + ) + else: + raise diff --git a/agent-sdk-client/handler.py b/agent-sdk-client/handler.py index 435a6f5..ddd3f3b 100644 --- a/agent-sdk-client/handler.py +++ b/agent-sdk-client/handler.py @@ -1,102 +1,64 @@ -"""Lambda handler for sdk-client. +"""Lambda handler for sdk-client (Producer). -Receives Telegram webhook, calls agent-server, sends response back. +Receives Telegram webhook, writes to SQS, returns 200 immediately. """ -import asyncio import json +import os from typing import Any -import httpx +import boto3 from telegram import Bot, Update -from telegram.constants import ParseMode, ChatAction -from telegram.helpers import escape_markdown -from telegram.error import BadRequest from config import Config +# Reuse SQS client across invocations +_sqs_client = None + + +def _get_sqs_client(): + """Get or create SQS client singleton.""" + global _sqs_client + if _sqs_client is None: + _sqs_client = boto3.client('sqs') + return _sqs_client + def lambda_handler(event: dict, context: Any) -> dict: - """Lambda entry point.""" + """Lambda entry point - Producer. + + Validates Telegram message and writes to SQS queue. + Returns 200 immediately without waiting for processing. + """ try: body = json.loads(event.get('body', '{}')) except json.JSONDecodeError: return {'statusCode': 200} - asyncio.run(process_webhook(body)) - return {'statusCode': 200} - - -async def process_webhook(body: dict) -> None: - """Process Telegram webhook payload.""" config = Config.from_env() - bot = Bot(config.telegram_token) + # Quick validation - parse update to check if it's a valid message + bot = Bot(config.telegram_token) update = Update.de_json(body, bot) + if not update: - return + return {'statusCode': 200} message = update.message or update.edited_message if not message or not message.text: - return + return {'statusCode': 200} - await bot.send_chat_action( - chat_id=message.chat_id, - action=ChatAction.TYPING, - message_thread_id=message.message_thread_id, + # Write to SQS for async processing + sqs = _get_sqs_client() + sqs.send_message( + QueueUrl=config.queue_url, + MessageBody=json.dumps({ + 'telegram_update': body, + 'chat_id': message.chat_id, + 'message_id': message.message_id, + 'text': message.text, + 'thread_id': message.message_thread_id, + }), ) - try: - async with httpx.AsyncClient(timeout=600.0) as client: - response = await client.post( - config.agent_server_url, - headers={ - 'Authorization': f'Bearer {config.auth_token}', - 'Content-Type': 'application/json', - }, - json={ - 'user_message': message.text, - 'chat_id': str(message.chat_id), - 'thread_id': str(message.message_thread_id) if message.message_thread_id else None, - }, - ) - result = response.json() - - except httpx.TimeoutException: - await bot.send_message(chat_id=message.chat_id, text="Request timed out.", - message_thread_id=message.message_thread_id) - return - except Exception as e: - await bot.send_message(chat_id=message.chat_id, text=f"Error: {str(e)[:200]}", - message_thread_id=message.message_thread_id) - return - - if result.get('is_error'): - text = f"Agent error: {result.get('error_message', 'Unknown')}" - else: - text = result.get('response', 'No response') - - if len(text) > 4000: - text = text[:4000] + "\n\n... (truncated)" - - # Try MARKDOWN_V2 first, fallback with escape on parse errors - try: - await bot.send_message( - chat_id=message.chat_id, - text=text, - parse_mode=ParseMode.MARKDOWN_V2, - message_thread_id=message.message_thread_id, - reply_to_message_id=message.message_id, - ) - except BadRequest as e: - if "parse entities" in str(e).lower(): - print(f"[MARKDOWN_V2] Parse error: {e}, retrying with escaped text") - safe_text = escape_markdown(text, version=2) - await bot.send_message( - chat_id=message.chat_id, - text=safe_text, - parse_mode=ParseMode.MARKDOWN_V2, - message_thread_id=message.message_thread_id, - reply_to_message_id=message.message_id, - ) - else: - raise + # Return 200 immediately - processing happens async in consumer + return {'statusCode': 200} diff --git a/template.yaml b/template.yaml index d8fb472..6efcca8 100644 --- a/template.yaml +++ b/template.yaml @@ -67,6 +67,24 @@ Resources: AttributeName: ttl Enabled: true + # SQS Task Queue + TaskQueue: + Type: AWS::SQS::Queue + Properties: + QueueName: !Sub '${AWS::StackName}-TaskQueue' + VisibilityTimeout: 900 # 15 minutes = Lambda timeout + MessageRetentionPeriod: 1209600 # 14 days + RedrivePolicy: + deadLetterTargetArn: !GetAtt DLQueue.Arn + maxReceiveCount: 3 # Retry 3 times then move to DLQ + + # Dead Letter Queue + DLQueue: + Type: AWS::SQS::Queue + Properties: + QueueName: !Sub '${AWS::StackName}-DLQueue' + MessageRetentionPeriod: 1209600 # 14 days + # Agent Server Lambda (Container) AgentServerFunction: Type: AWS::Serverless::Function @@ -108,18 +126,23 @@ Resources: FunctionUrlConfig: AuthType: NONE - # SDK Client Lambda (ZIP) + # SDK Client Lambda - Producer (writes to SQS, returns 200 immediately) SdkClientFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./agent-sdk-client Handler: handler.lambda_handler Runtime: python3.12 + Timeout: 30 # Producer should complete quickly Environment: Variables: TELEGRAM_BOT_TOKEN: !Ref TelegramBotToken AGENT_SERVER_URL: !GetAtt AgentServerFunctionUrl.FunctionUrl SDK_CLIENT_AUTH_TOKEN: !Ref AuthToken + QUEUE_URL: !Ref TaskQueue + Policies: + - SQSSendMessagePolicy: + QueueName: !GetAtt TaskQueue.QueueName Events: Webhook: Type: HttpApi @@ -127,6 +150,45 @@ Resources: Path: /webhook Method: POST + # Consumer Lambda - processes messages from SQS + ConsumerFunction: + Type: AWS::Serverless::Function + Properties: + CodeUri: ./agent-sdk-client + Handler: consumer.lambda_handler + Runtime: python3.12 + Timeout: 900 # 15 minutes for long Agent Server calls + Environment: + Variables: + TELEGRAM_BOT_TOKEN: !Ref TelegramBotToken + AGENT_SERVER_URL: !GetAtt AgentServerFunctionUrl.FunctionUrl + SDK_CLIENT_AUTH_TOKEN: !Ref AuthToken + QUEUE_URL: !Ref TaskQueue + Events: + SQSEvent: + Type: SQS + Properties: + Queue: !GetAtt TaskQueue.Arn + BatchSize: 1 # Process one message at a time + MaximumBatchingWindowInSeconds: 0 # Process immediately + + # DLQ Alarm - alert when messages land in DLQ + DLQAlarm: + Type: AWS::CloudWatch::Alarm + Properties: + AlarmName: !Sub '${AWS::StackName}-DLQ-Messages' + AlarmDescription: Alert when messages land in Dead Letter Queue + MetricName: ApproximateNumberOfMessagesVisible + Namespace: AWS/SQS + Statistic: Sum + Period: 300 + EvaluationPeriods: 1 + Threshold: 1 + ComparisonOperator: GreaterThanOrEqualToThreshold + Dimensions: + - Name: QueueName + Value: !GetAtt DLQueue.QueueName + Outputs: WebhookUrl: Description: Telegram Webhook URL @@ -134,3 +196,9 @@ Outputs: AgentServerUrl: Description: Agent Server Function URL Value: !GetAtt AgentServerFunctionUrl.FunctionUrl + TaskQueueUrl: + Description: Task Queue URL + Value: !Ref TaskQueue + DLQueueUrl: + Description: Dead Letter Queue URL + Value: !Ref DLQueue From ee12d84c352dddf2bc8c8c24622d540a81bdecbe Mon Sep 17 00:00:00 2001 From: BukeLy Date: Sun, 4 Jan 2026 21:15:22 +0800 Subject: [PATCH 2/5] =?UTF-8?q?fix:=20=E6=B7=BB=E5=8A=A0=20ConsumerFunctio?= =?UTF-8?q?n=20=E7=9A=84=20SQS=20=E6=9D=83=E9=99=90=E7=AD=96=E7=95=A5?= =?UTF-8?q?=E5=B9=B6=E5=A2=9E=E5=8A=A0=20HTTP=20=E7=8A=B6=E6=80=81?= =?UTF-8?q?=E7=A0=81=E6=A3=80=E6=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - template.yaml: 为 ConsumerFunction 添加 SQSPollerPolicy 权限 - consumer.py: 添加 response.raise_for_status() 检查 HTTP 错误状态码 --- agent-sdk-client/consumer.py | 1 + template.yaml | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/agent-sdk-client/consumer.py b/agent-sdk-client/consumer.py index d074599..4548df6 100644 --- a/agent-sdk-client/consumer.py +++ b/agent-sdk-client/consumer.py @@ -55,6 +55,7 @@ async def process_message(message_data: dict) -> None: 'thread_id': str(message.message_thread_id) if message.message_thread_id else None, }, ) + response.raise_for_status() result = response.json() except httpx.TimeoutException: diff --git a/template.yaml b/template.yaml index 6efcca8..8285abc 100644 --- a/template.yaml +++ b/template.yaml @@ -164,6 +164,12 @@ Resources: AGENT_SERVER_URL: !GetAtt AgentServerFunctionUrl.FunctionUrl SDK_CLIENT_AUTH_TOKEN: !Ref AuthToken QUEUE_URL: !Ref TaskQueue + SESSION_TABLE: !Ref SessionTable + Policies: + - SQSPollerPolicy: + QueueName: !GetAtt TaskQueue.QueueName + - DynamoDBCrudPolicy: + TableName: !Ref SessionTable Events: SQSEvent: Type: SQS From e869aeec1ba205b45aa8ed0d652cb618149e9f9f Mon Sep 17 00:00:00 2001 From: BukeLy Date: Sun, 4 Jan 2026 21:37:12 +0800 Subject: [PATCH 3/5] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8Dcode=20review?= =?UTF-8?q?=E7=9A=843=E4=B8=AA=E9=AB=98=E4=BC=98=E5=85=88=E7=BA=A7?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 修复评论4 (95分) - 异常重试导致重复Telegram消息 - consumer.py: 移除一般异常的re-raise,只对TimeoutException重试 - 避免已发送错误消息后还会重试导致用户收到3次重复消息 2. 修复评论3 (92分) - SQS send_message缺少错误处理 - handler.py: 添加完整的SQS错误处理和恢复机制 - 区分可恢复(限流)和不可恢复错误(权限、队列不存在) - 发送CloudWatch自定义指标用于监控 - SQS失败仍返回200给Telegram,防止webhook重试雪崩 3. 修复评论1 (85分) - CloudWatch告警缺少AlarmActions - template.yaml: 添加SNS Topic用于告警通知 - DLQAlarm: 配置AlarmActions发送到SNS - 新增ProducerSQSErrorAlarm和ProducerLambdaErrorAlarm 其他改进: - Producer Lambda超时从30秒改为10秒(评论6: 75分) - handler.py添加CloudWatch权限策略 --- agent-sdk-client/consumer.py | 5 +- agent-sdk-client/handler.py | 131 ++++++++++++++++++++++++++++++++--- template.yaml | 57 ++++++++++++++- 3 files changed, 179 insertions(+), 14 deletions(-) diff --git a/agent-sdk-client/consumer.py b/agent-sdk-client/consumer.py index 4548df6..c75bed9 100644 --- a/agent-sdk-client/consumer.py +++ b/agent-sdk-client/consumer.py @@ -64,7 +64,7 @@ async def process_message(message_data: dict) -> None: text="Request timed out.", message_thread_id=message.message_thread_id, ) - raise # Re-raise to trigger SQS retry + raise # Re-raise to trigger SQS retry for transient errors except Exception as e: await bot.send_message( @@ -72,7 +72,8 @@ async def process_message(message_data: dict) -> None: text=f"Error: {str(e)[:200]}", message_thread_id=message.message_thread_id, ) - raise # Re-raise to trigger SQS retry + # Don't re-raise for general exceptions - error message already sent + # to user, retrying would cause duplicate messages # Format response if result.get('is_error'): diff --git a/agent-sdk-client/handler.py b/agent-sdk-client/handler.py index ddd3f3b..f681358 100644 --- a/agent-sdk-client/handler.py +++ b/agent-sdk-client/handler.py @@ -3,16 +3,21 @@ Receives Telegram webhook, writes to SQS, returns 200 immediately. """ import json -import os +import logging from typing import Any import boto3 +from botocore.exceptions import ClientError from telegram import Bot, Update from config import Config +logger = logging.getLogger() +logger.setLevel(logging.INFO) + # Reuse SQS client across invocations _sqs_client = None +_cloudwatch_client = None def _get_sqs_client(): @@ -23,6 +28,98 @@ def _get_sqs_client(): return _sqs_client +def _get_cloudwatch_client(): + """Get or create CloudWatch client singleton.""" + global _cloudwatch_client + if _cloudwatch_client is None: + _cloudwatch_client = boto3.client('cloudwatch') + return _cloudwatch_client + + +def _send_metric(metric_name: str, value: float = 1.0): + """Send custom metric to CloudWatch (non-blocking).""" + try: + cloudwatch = _get_cloudwatch_client() + cloudwatch.put_metric_data( + Namespace='OmniCloudAgent/Producer', + MetricData=[ + { + 'MetricName': metric_name, + 'Value': value, + 'Unit': 'Count', + } + ], + ) + except Exception as e: + logger.warning(f"Failed to send CloudWatch metric: {e}") + + +def _send_to_sqs_safe(sqs, queue_url: str, message_body: dict) -> bool: + """Send message to SQS with comprehensive error handling. + + Returns: + True if message sent successfully, False otherwise. + """ + try: + response = sqs.send_message( + QueueUrl=queue_url, MessageBody=json.dumps(message_body) + ) + message_id = response.get('MessageId', 'unknown') + logger.info(f"Message sent to SQS: {message_id}") + _send_metric('SQSMessageSent') + return True + + except sqs.exceptions.QueueDoesNotExist: + logger.error( + f"CRITICAL: Queue does not exist: {queue_url}", + extra={'queue_url': queue_url}, + ) + _send_metric('SQSError.QueueNotFound') + return False + + except ClientError as e: + error_code = e.response.get('Error', {}).get('Code', 'Unknown') + error_msg = e.response.get('Error', {}).get('Message', '') + + if error_code in ('AccessDenied', 'AccessDeniedException'): + logger.error( + f"CRITICAL: IAM permission denied for SQS: {error_msg}", + extra={'error_code': error_code, 'queue_url': queue_url}, + ) + _send_metric('SQSError.AccessDenied') + + elif error_code in ('ThrottlingException', 'RequestThrottled'): + logger.warning( + f"SQS throttled (will be retried by consumer): {error_msg}", + extra={'error_code': error_code}, + ) + _send_metric('SQSError.Throttled') + + elif error_code == 'InvalidParameterValue': + logger.error( + f"CRITICAL: Invalid SQS parameter: {error_msg}", + extra={'error_code': error_code, 'message_body': message_body}, + ) + _send_metric('SQSError.InvalidParameter') + + else: + logger.error( + f"SQS ClientError [{error_code}]: {error_msg}", + extra={'error_code': error_code, 'error_msg': error_msg}, + ) + _send_metric(f'SQSError.{error_code}') + + return False + + except Exception as e: + logger.exception( + f"Unexpected error sending to SQS: {e}", + extra={'exception_type': type(e).__name__}, + ) + _send_metric('SQSError.Unexpected') + return False + + def lambda_handler(event: dict, context: Any) -> dict: """Lambda entry point - Producer. @@ -32,6 +129,7 @@ def lambda_handler(event: dict, context: Any) -> dict: try: body = json.loads(event.get('body', '{}')) except json.JSONDecodeError: + logger.warning('Invalid JSON in webhook body') return {'statusCode': 200} config = Config.from_env() @@ -41,24 +139,35 @@ def lambda_handler(event: dict, context: Any) -> dict: update = Update.de_json(body, bot) if not update: + logger.debug('Ignoring non-update webhook') return {'statusCode': 200} message = update.message or update.edited_message if not message or not message.text: + logger.debug('Ignoring webhook without text message') return {'statusCode': 200} # Write to SQS for async processing sqs = _get_sqs_client() - sqs.send_message( - QueueUrl=config.queue_url, - MessageBody=json.dumps({ - 'telegram_update': body, - 'chat_id': message.chat_id, - 'message_id': message.message_id, - 'text': message.text, - 'thread_id': message.message_thread_id, - }), - ) + message_body = { + 'telegram_update': body, + 'chat_id': message.chat_id, + 'message_id': message.message_id, + 'text': message.text, + 'thread_id': message.message_thread_id, + } + + success = _send_to_sqs_safe(sqs, config.queue_url, message_body) # Return 200 immediately - processing happens async in consumer + # Note: Even if SQS fails, we return 200 to prevent Telegram webhook retries + if not success: + logger.error( + f'Failed to send message to SQS but returning 200 to Telegram', + extra={ + 'chat_id': message.chat_id, + 'message_id': message.message_id, + }, + ) + return {'statusCode': 200} diff --git a/template.yaml b/template.yaml index 8285abc..f8e40d3 100644 --- a/template.yaml +++ b/template.yaml @@ -85,6 +85,13 @@ Resources: QueueName: !Sub '${AWS::StackName}-DLQueue' MessageRetentionPeriod: 1209600 # 14 days + # SNS Topic for alarm notifications + AlarmTopic: + Type: AWS::SNS::Topic + Properties: + TopicName: !Sub '${AWS::StackName}-Alarms' + DisplayName: OmniCloud Agent Alarms + # Agent Server Lambda (Container) AgentServerFunction: Type: AWS::Serverless::Function @@ -133,7 +140,7 @@ Resources: CodeUri: ./agent-sdk-client Handler: handler.lambda_handler Runtime: python3.12 - Timeout: 30 # Producer should complete quickly + Timeout: 10 # Producer should complete quickly (<1s for message validation + SQS write) Environment: Variables: TELEGRAM_BOT_TOKEN: !Ref TelegramBotToken @@ -143,6 +150,11 @@ Resources: Policies: - SQSSendMessagePolicy: QueueName: !GetAtt TaskQueue.QueueName + - Statement: + - Effect: Allow + Action: + - cloudwatch:PutMetricData + Resource: '*' Events: Webhook: Type: HttpApi @@ -191,10 +203,48 @@ Resources: EvaluationPeriods: 1 Threshold: 1 ComparisonOperator: GreaterThanOrEqualToThreshold + AlarmActions: + - !Ref AlarmTopic Dimensions: - Name: QueueName Value: !GetAtt DLQueue.QueueName + # Producer SQS send error alarm + ProducerSQSErrorAlarm: + Type: AWS::CloudWatch::Alarm + Properties: + AlarmName: !Sub '${AWS::StackName}-Producer-SQS-Errors' + AlarmDescription: Alert when Producer fails to send messages to SQS + MetricName: SQSError.QueueNotFound + Namespace: OmniCloudAgent/Producer + Statistic: Sum + Period: 60 + EvaluationPeriods: 1 + Threshold: 1 + ComparisonOperator: GreaterThanOrEqualToThreshold + AlarmActions: + - !Ref AlarmTopic + TreatMissingData: notBreaching + + # Producer Lambda error alarm + ProducerLambdaErrorAlarm: + Type: AWS::CloudWatch::Alarm + Properties: + AlarmName: !Sub '${AWS::StackName}-Producer-Lambda-Errors' + AlarmDescription: Alert on Producer Lambda exceptions + MetricName: Errors + Namespace: AWS/Lambda + Statistic: Sum + Period: 60 + EvaluationPeriods: 1 + Threshold: 1 + ComparisonOperator: GreaterThanOrEqualToThreshold + AlarmActions: + - !Ref AlarmTopic + Dimensions: + - Name: FunctionName + Value: !Ref SdkClientFunction + Outputs: WebhookUrl: Description: Telegram Webhook URL @@ -208,3 +258,8 @@ Outputs: DLQueueUrl: Description: Dead Letter Queue URL Value: !Ref DLQueue + AlarmTopicArn: + Description: SNS Topic for CloudWatch Alarm notifications + Value: !Ref AlarmTopic + Export: + Name: !Sub '${AWS::StackName}-AlarmTopic' From d18e3bd699076c4cd7fa007f09338d4f9f62637c Mon Sep 17 00:00:00 2001 From: BukeLy Date: Sun, 4 Jan 2026 22:24:20 +0800 Subject: [PATCH 4/5] =?UTF-8?q?docs:=20=E6=B7=BB=E5=8A=A0=E6=9E=B6?= =?UTF-8?q?=E6=9E=84=E8=B0=83=E6=95=B4=E7=9A=84CHANGELOG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGELOG.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..1405592 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,16 @@ +# Changelog + +## [Unreleased] + +### Changed +- **架构调整:从同步到异步处理模式** + - 重构SDK Client为SQS异步架构 + - Producer Lambda(SdkClientFunction):接收Telegram webhook,立即返回200,消息写入SQS队列 + - Consumer Lambda(ConsumerFunction):异步处理SQS消息,调用Agent Server,返回结果给Telegram + - 解决Telegram 30秒webhook超时问题,支持长运行任务 + +### Added +- SQS任务队列(TaskQueue)和死信队列(DLQueue) +- SNS告警主题(AlarmTopic)用于CloudWatch通知 +- CloudWatch告警和自定义指标监控 +- DynamoDB会话表用于多轮对话状态管理 From e6e64cbfe31120a5e8937c246a5ca4f65ca831ae Mon Sep 17 00:00:00 2001 From: BukeLy Date: Sun, 4 Jan 2026 23:32:58 +0800 Subject: [PATCH 5/5] =?UTF-8?q?fix:=20=E5=A2=9E=E5=BC=BA=E5=BC=82=E5=B8=B8?= =?UTF-8?q?=E5=A4=84=E7=90=86=E5=92=8CIssue#12=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - consumer.py: * 添加lambda_handler的JSON解析异常处理 * 包装asyncio.run的异常捕获和重新抛出 * 初始化result默认值防止未定义错误 * 添加message为None的检查 * 增强异常处理的日志记录 * 修复Issue#12:使用 or 运算符处理空response - handler.py: * 包装asyncio.run的异常处理,返回结构化错误响应 * 添加session操作的异常处理 * 关键操作失败返回500,非关键操作log后继续 * 完整的日志记录便于调试 确保系统在异常情况下不会导致SQS消息卡死,并正确处理Agent返回空响应的场景。 --- agent-sdk-client/consumer.py | 57 +++++++++++++++++++++++++++++------- agent-sdk-server/handler.py | 52 ++++++++++++++++++++++++++------ 2 files changed, 89 insertions(+), 20 deletions(-) diff --git a/agent-sdk-client/consumer.py b/agent-sdk-client/consumer.py index c75bed9..673bb6c 100644 --- a/agent-sdk-client/consumer.py +++ b/agent-sdk-client/consumer.py @@ -18,14 +18,32 @@ def lambda_handler(event: dict, context: Any) -> dict: """SQS Consumer Lambda entry point.""" for record in event['Records']: - message_data = json.loads(record['body']) - asyncio.run(process_message(message_data)) + try: + message_data = json.loads(record['body']) + except json.JSONDecodeError as e: + # Invalid message format - log and skip + import logging + logger = logging.getLogger() + logger.error(f"Failed to parse SQS message: {e}") + continue + + try: + asyncio.run(process_message(message_data)) + except Exception as e: + # Log and let SQS retry on failure + import logging + logger = logging.getLogger() + logger.exception(f"Failed to process message: {e}") + raise # Re-raise to fail the batch item return {'statusCode': 200} async def process_message(message_data: dict) -> None: """Process single message from SQS queue.""" + import logging + logger = logging.getLogger() + config = Config.from_env() bot = Bot(config.telegram_token) @@ -33,6 +51,10 @@ async def process_message(message_data: dict) -> None: update = Update.de_json(message_data['telegram_update'], bot) message = update.message or update.edited_message + if not message: + logger.warning("Received update with no message or edited_message") + return + # Send typing indicator await bot.send_chat_action( chat_id=message.chat_id, @@ -40,6 +62,14 @@ async def process_message(message_data: dict) -> None: message_thread_id=message.message_thread_id, ) + # Initialize result with default error response + # This ensures result is always defined, even if Agent Server call fails + result = { + 'response': '', + 'is_error': True, + 'error_message': 'Failed to get response from Agent Server' + } + # Call Agent Server try: async with httpx.AsyncClient(timeout=600.0) as client: @@ -59,6 +89,7 @@ async def process_message(message_data: dict) -> None: result = response.json() except httpx.TimeoutException: + logger.warning(f"Agent Server timeout for chat_id={message.chat_id}") await bot.send_message( chat_id=message.chat_id, text="Request timed out.", @@ -67,19 +98,23 @@ async def process_message(message_data: dict) -> None: raise # Re-raise to trigger SQS retry for transient errors except Exception as e: - await bot.send_message( - chat_id=message.chat_id, - text=f"Error: {str(e)[:200]}", - message_thread_id=message.message_thread_id, - ) - # Don't re-raise for general exceptions - error message already sent - # to user, retrying would cause duplicate messages + logger.exception(f"Agent Server error for chat_id={message.chat_id}") + error_text = f"Error: {str(e)[:200]}" + try: + await bot.send_message( + chat_id=message.chat_id, + text=error_text, + message_thread_id=message.message_thread_id, + ) + except Exception as send_error: + logger.error(f"Failed to send error message to Telegram: {send_error}") + # Don't re-raise - error message already sent to user, retrying would cause duplicate messages - # Format response + # Format response (result is guaranteed to be defined now) if result.get('is_error'): text = f"Agent error: {result.get('error_message', 'Unknown')}" else: - text = result.get('response', 'No response') + text = result.get('response') or 'No response' if len(text) > 4000: text = text[:4000] + "\n\n... (truncated)" diff --git a/agent-sdk-server/handler.py b/agent-sdk-server/handler.py index f90622c..b83fbf3 100644 --- a/agent-sdk-server/handler.py +++ b/agent-sdk-server/handler.py @@ -4,12 +4,16 @@ """ import asyncio import json +import logging from typing import Any from config import Config from session_store import SessionStore from agent_session import process_message +logger = logging.getLogger() +logger.setLevel(logging.INFO) + def lambda_handler(event: dict, context: Any) -> dict: """Lambda entry point. @@ -76,26 +80,56 @@ def lambda_handler(event: dict, context: Any) -> dict: # Download session files if resuming if session_id: - store.download_session_files(session_id) + try: + store.download_session_files(session_id) + except Exception as e: + logger.error(f"Failed to download session files for session_id={session_id}: {e}") + return { + 'statusCode': 500, + 'body': json.dumps({'error': 'Failed to download session'}) + } # Process message with Agent SDK - result = asyncio.run(process_message( - user_message=user_message, - session_id=session_id, - model=model, - )) + try: + result = asyncio.run(process_message( + user_message=user_message, + session_id=session_id, + model=model, + )) + except Exception as e: + logger.exception(f"Agent SDK processing failed for chat_id={chat_id}") + return { + 'statusCode': 500, + 'headers': {'Content-Type': 'application/json'}, + 'body': json.dumps({ + 'response': '', + 'session_id': session_id, + 'cost_usd': 0.0, + 'num_turns': 0, + 'is_error': True, + 'error_message': f'Agent processing error: {str(e)[:200]}' + }) + } # Get session_id from result (SDK generates it for new sessions) result_session_id = result.get('session_id', '') # Save session mapping if new session if result_session_id and result_session_id != session_id: - store.save_session_id(chat_id, thread_id, result_session_id) + try: + store.save_session_id(chat_id, thread_id, result_session_id) + except Exception as e: + logger.error(f"Failed to save session mapping for chat_id={chat_id}: {e}") + # Continue anyway - session can still work with S3 files # Upload session files if result_session_id: - store.upload_session_files(result_session_id) - store.update_session_timestamp(chat_id, thread_id) + try: + store.upload_session_files(result_session_id) + store.update_session_timestamp(chat_id, thread_id) + except Exception as e: + logger.error(f"Failed to upload session files for session_id={result_session_id}: {e}") + # Continue anyway - result is already available to return return { 'statusCode': 200,