Conversation
解决 Telegram Webhook 超时重试导致重复响应的问题: - Producer Lambda 接收 Webhook 后立即返回 200(<1s) - Consumer Lambda 从 SQS 异步消费,调用 Agent Server - 添加死信队列(DLQ)和 CloudWatch 告警
There was a problem hiding this comment.
Pull request overview
This PR refactors the SDK Client architecture from a synchronous to an asynchronous SQS-based system to solve Telegram webhook timeout issues. The change separates message reception from processing, ensuring webhooks return within 1 second to prevent Telegram's ~27-second timeout retries.
Key Changes:
- Introduced SQS queue architecture with Producer (webhook receiver) and Consumer (async processor) Lambda functions
- Added Dead Letter Queue (DLQ) with CloudWatch alarms for failure monitoring
- Split the original handler.py logic into producer (handler.py) and consumer (consumer.py) components
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 15 comments.
Show a summary per file
| File | Description |
|---|---|
| template.yaml | Added SQS TaskQueue, DLQ, Consumer Lambda, and CloudWatch alarm; updated Producer Lambda configuration with SQS policies |
| agent-sdk-client/handler.py | Refactored to producer pattern - validates webhooks and enqueues to SQS, returns 200 immediately |
| agent-sdk-client/consumer.py | New consumer Lambda - processes SQS messages, calls Agent Server, sends responses to Telegram |
| agent-sdk-client/config.py | Added queue_url configuration field |
| README.md | Updated architecture diagrams and documentation to reflect SQS async architecture |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| DLQAlarm: | ||
| Type: AWS::CloudWatch::Alarm | ||
| Properties: | ||
| AlarmName: !Sub '${AWS::StackName}-DLQ-Messages' | ||
| AlarmDescription: Alert when messages land in Dead Letter Queue | ||
| MetricName: ApproximateNumberOfMessagesVisible | ||
| Namespace: AWS/SQS | ||
| Statistic: Sum | ||
| Period: 300 | ||
| EvaluationPeriods: 1 | ||
| Threshold: 1 | ||
| ComparisonOperator: GreaterThanOrEqualToThreshold | ||
| Dimensions: | ||
| - Name: QueueName | ||
| Value: !GetAtt DLQueue.QueueName |
There was a problem hiding this comment.
The CloudWatch alarm lacks an AlarmActions configuration. The alarm will trigger but won't send notifications anywhere. Consider adding an SNS topic ARN to AlarmActions so that operators are actually notified when messages land in the DLQ.
agent-sdk-client/handler.py
Outdated
| # Write to SQS for async processing | ||
| sqs = _get_sqs_client() | ||
| sqs.send_message( | ||
| QueueUrl=config.queue_url, | ||
| MessageBody=json.dumps({ | ||
| 'telegram_update': body, | ||
| 'chat_id': message.chat_id, | ||
| 'message_id': message.message_id, | ||
| 'text': message.text, | ||
| 'thread_id': message.message_thread_id, | ||
| }), | ||
| ) |
There was a problem hiding this comment.
The SQS send_message call lacks error handling. If the SQS call fails (e.g., due to network issues, permissions, or queue not existing), the function will raise an unhandled exception. This should be caught and handled gracefully, possibly still returning a 200 status to Telegram to avoid webhook retries, while logging the error for debugging.
agent-sdk-client/consumer.py
Outdated
| except Exception as e: | ||
| await bot.send_message( | ||
| chat_id=message.chat_id, | ||
| text=f"Error: {str(e)[:200]}", | ||
| message_thread_id=message.message_thread_id, | ||
| ) | ||
| raise # Re-raise to trigger SQS retry |
There was a problem hiding this comment.
After sending an error message to the user, the exception is re-raised to trigger SQS retry. This will result in the user receiving multiple duplicate error messages (up to 3 retries). Consider only re-raising for transient errors (like TimeoutException) and not for general exceptions, or avoid sending the error message to Telegram before confirming it's non-retryable.
| from typing import Any | ||
|
|
||
| import httpx | ||
| import boto3 |
There was a problem hiding this comment.
The boto3 dependency is not included in requirements.txt. While boto3 is available in the Lambda Python runtime by default, it's a best practice to explicitly declare all dependencies in requirements.txt to ensure version consistency and make local development/testing easier.
| # Quick validation - parse update to check if it's a valid message | ||
| bot = Bot(config.telegram_token) | ||
| update = Update.de_json(body, bot) |
There was a problem hiding this comment.
The Bot object is created unnecessarily just for validation. Creating a Bot instance establishes connections and can be relatively expensive. Consider using Update.de_json with None as the bot parameter for validation, or implement a lighter validation approach that doesn't require Bot instantiation.
| ConsumerFunction: | ||
| Type: AWS::Serverless::Function | ||
| Properties: | ||
| CodeUri: ./agent-sdk-client | ||
| Handler: consumer.lambda_handler | ||
| Runtime: python3.12 | ||
| Timeout: 900 # 15 minutes for long Agent Server calls | ||
| Environment: | ||
| Variables: | ||
| TELEGRAM_BOT_TOKEN: !Ref TelegramBotToken | ||
| AGENT_SERVER_URL: !GetAtt AgentServerFunctionUrl.FunctionUrl | ||
| SDK_CLIENT_AUTH_TOKEN: !Ref AuthToken | ||
| QUEUE_URL: !Ref TaskQueue | ||
| Events: | ||
| SQSEvent: | ||
| Type: SQS | ||
| Properties: | ||
| Queue: !GetAtt TaskQueue.Arn | ||
| BatchSize: 1 # Process one message at a time | ||
| MaximumBatchingWindowInSeconds: 0 # Process immediately |
There was a problem hiding this comment.
The ConsumerFunction is missing IAM policies. The consumer needs permissions to receive and delete messages from the SQS queue. While SAM may auto-generate some policies based on the SQS event source, it's better to explicitly add SQSPollerPolicy to ensure proper permissions are granted.
Code reviewFound 2 issues:
ConsumerFunction has SQS event source but no Policies:
- SQSPollerPolicy:
QueueName: !GetAtt TaskQueue.QueueName
The previous implementation (commit ff4ca85) had DynamoDB message deduplication to prevent webhook duplicates. Current code raises exceptions on failure (L66, L74), triggering SQS retry up to 3 times. Each retry resends the Telegram message, causing user to receive duplicate error messages. Need to restore deduplication logic or use FIFO queue with deduplication enabled. 🤖 Generated with Claude Code
|
- template.yaml: 为 ConsumerFunction 添加 SQSPollerPolicy 权限 - consumer.py: 添加 response.raise_for_status() 检查 HTTP 错误状态码
1. 修复评论4 (95分) - 异常重试导致重复Telegram消息 - consumer.py: 移除一般异常的re-raise,只对TimeoutException重试 - 避免已发送错误消息后还会重试导致用户收到3次重复消息 2. 修复评论3 (92分) - SQS send_message缺少错误处理 - handler.py: 添加完整的SQS错误处理和恢复机制 - 区分可恢复(限流)和不可恢复错误(权限、队列不存在) - 发送CloudWatch自定义指标用于监控 - SQS失败仍返回200给Telegram,防止webhook重试雪崩 3. 修复评论1 (85分) - CloudWatch告警缺少AlarmActions - template.yaml: 添加SNS Topic用于告警通知 - DLQAlarm: 配置AlarmActions发送到SNS - 新增ProducerSQSErrorAlarm和ProducerLambdaErrorAlarm 其他改进: - Producer Lambda超时从30秒改为10秒(评论6: 75分) - handler.py添加CloudWatch权限策略
- consumer.py: * 添加lambda_handler的JSON解析异常处理 * 包装asyncio.run的异常捕获和重新抛出 * 初始化result默认值防止未定义错误 * 添加message为None的检查 * 增强异常处理的日志记录 * 修复Issue#12:使用 or 运算符处理空response - handler.py: * 包装asyncio.run的异常处理,返回结构化错误响应 * 添加session操作的异常处理 * 关键操作失败返回500,非关键操作log后继续 * 完整的日志记录便于调试 确保系统在异常情况下不会导致SQS消息卡死,并正确处理Agent返回空响应的场景。
Summary
Changes
架构设计
异常处理增强
Issue#12部分修复
Test Plan
Related Issues
Fixes #1
Partially fixes #12 (防止Telegram发送失败,但系统命令仍需改进)