Skip to content

feat: 重构 SDK Client 为 SQS 异步架构#11

Merged
BukeLy merged 6 commits intomainfrom
feature/sqs-async-architecture
Jan 4, 2026
Merged

feat: 重构 SDK Client 为 SQS 异步架构#11
BukeLy merged 6 commits intomainfrom
feature/sqs-async-architecture

Conversation

@BukeLy
Copy link
Owner

@BukeLy BukeLy commented Jan 4, 2026

Summary

  • 重构为 SQS 异步架构解决 Telegram 30s 超时问题
  • Producer Lambda 接收 Webhook 后写入 SQS,立即返回 200(响应时间 <1s)
  • Consumer Lambda 从 SQS 异步消费,调用 Agent Server,发送响应给 Telegram
  • 添加死信队列(DLQ)和 CloudWatch 告警用于故障监控
  • 增强异常处理和错误恢复机制,防止消息卡死
  • 部分修复 Issue#12:防止空响应导致Telegram发送失败

Changes

架构设计

  • Producer-Consumer 模式,解耦 Webhook 接收和消息处理
  • 使用 SQS 作为任务队列,支持消息重试和死信队列
  • DLQ 配置 maxReceiveCount=3,超过则进入死信队列

异常处理增强

  • Consumer层:JSON解析、消息处理、Telegram发送的异常处理
  • Server层:session操作、Agent SDK调用的异常处理
  • 初始化result默认值防止未定义错误
  • 关键操作失败返回500,非关键操作log后继续

Issue#12部分修复

  • ✅ Client层:使用`or`运算符处理空response,确保不发送空消息给Telegram
  • ❌ 上游层:Agent SDK仍返回空响应(需后续优化)
  • 用户体验:系统命令显示"No response"而非帮助信息

Test Plan

  • 验证 Producer Lambda 响应时间 < 1s
  • 验证 Consumer Lambda 正确处理 SQS 消息
  • 验证空response场景不会导致Telegram发送失败
  • 验证失败消息进入 DLQ
  • 验证 DLQ 告警触发
  • 端到端测试:发送 Telegram 消息并收到响应

Related Issues

Fixes #1
Partially fixes #12 (防止Telegram发送失败,但系统命令仍需改进)

解决 Telegram Webhook 超时重试导致重复响应的问题:
- Producer Lambda 接收 Webhook 后立即返回 200(<1s)
- Consumer Lambda 从 SQS 异步消费,调用 Agent Server
- 添加死信队列(DLQ)和 CloudWatch 告警
Copilot AI review requested due to automatic review settings January 4, 2026 12:50
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR refactors the SDK Client architecture from a synchronous to an asynchronous SQS-based system to solve Telegram webhook timeout issues. The change separates message reception from processing, ensuring webhooks return within 1 second to prevent Telegram's ~27-second timeout retries.

Key Changes:

  • Introduced SQS queue architecture with Producer (webhook receiver) and Consumer (async processor) Lambda functions
  • Added Dead Letter Queue (DLQ) with CloudWatch alarms for failure monitoring
  • Split the original handler.py logic into producer (handler.py) and consumer (consumer.py) components

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 15 comments.

Show a summary per file
File Description
template.yaml Added SQS TaskQueue, DLQ, Consumer Lambda, and CloudWatch alarm; updated Producer Lambda configuration with SQS policies
agent-sdk-client/handler.py Refactored to producer pattern - validates webhooks and enqueues to SQS, returns 200 immediately
agent-sdk-client/consumer.py New consumer Lambda - processes SQS messages, calls Agent Server, sends responses to Telegram
agent-sdk-client/config.py Added queue_url configuration field
README.md Updated architecture diagrams and documentation to reflect SQS async architecture

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +176 to +190
DLQAlarm:
Type: AWS::CloudWatch::Alarm
Properties:
AlarmName: !Sub '${AWS::StackName}-DLQ-Messages'
AlarmDescription: Alert when messages land in Dead Letter Queue
MetricName: ApproximateNumberOfMessagesVisible
Namespace: AWS/SQS
Statistic: Sum
Period: 300
EvaluationPeriods: 1
Threshold: 1
ComparisonOperator: GreaterThanOrEqualToThreshold
Dimensions:
- Name: QueueName
Value: !GetAtt DLQueue.QueueName
Copy link

Copilot AI Jan 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The CloudWatch alarm lacks an AlarmActions configuration. The alarm will trigger but won't send notifications anywhere. Consider adding an SNS topic ARN to AlarmActions so that operators are actually notified when messages land in the DLQ.

Copilot uses AI. Check for mistakes.
Comment on lines 50 to 61
# Write to SQS for async processing
sqs = _get_sqs_client()
sqs.send_message(
QueueUrl=config.queue_url,
MessageBody=json.dumps({
'telegram_update': body,
'chat_id': message.chat_id,
'message_id': message.message_id,
'text': message.text,
'thread_id': message.message_thread_id,
}),
)
Copy link

Copilot AI Jan 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SQS send_message call lacks error handling. If the SQS call fails (e.g., due to network issues, permissions, or queue not existing), the function will raise an unhandled exception. This should be caught and handled gracefully, possibly still returning a 200 status to Telegram to avoid webhook retries, while logging the error for debugging.

Copilot uses AI. Check for mistakes.
Comment on lines +68 to +74
except Exception as e:
await bot.send_message(
chat_id=message.chat_id,
text=f"Error: {str(e)[:200]}",
message_thread_id=message.message_thread_id,
)
raise # Re-raise to trigger SQS retry
Copy link

Copilot AI Jan 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After sending an error message to the user, the exception is re-raised to trigger SQS retry. This will result in the user receiving multiple duplicate error messages (up to 3 retries). Consider only re-raising for transient errors (like TimeoutException) and not for general exceptions, or avoid sending the error message to Telegram before confirming it's non-retryable.

Copilot uses AI. Check for mistakes.
from typing import Any

import httpx
import boto3
Copy link

Copilot AI Jan 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The boto3 dependency is not included in requirements.txt. While boto3 is available in the Lambda Python runtime by default, it's a best practice to explicitly declare all dependencies in requirements.txt to ensure version consistency and make local development/testing easier.

Copilot uses AI. Check for mistakes.
Comment on lines +39 to 41
# Quick validation - parse update to check if it's a valid message
bot = Bot(config.telegram_token)
update = Update.de_json(body, bot)
Copy link

Copilot AI Jan 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Bot object is created unnecessarily just for validation. Creating a Bot instance establishes connections and can be relatively expensive. Consider using Update.de_json with None as the bot parameter for validation, or implement a lighter validation approach that doesn't require Bot instantiation.

Copilot uses AI. Check for mistakes.
Comment on lines +154 to +173
ConsumerFunction:
Type: AWS::Serverless::Function
Properties:
CodeUri: ./agent-sdk-client
Handler: consumer.lambda_handler
Runtime: python3.12
Timeout: 900 # 15 minutes for long Agent Server calls
Environment:
Variables:
TELEGRAM_BOT_TOKEN: !Ref TelegramBotToken
AGENT_SERVER_URL: !GetAtt AgentServerFunctionUrl.FunctionUrl
SDK_CLIENT_AUTH_TOKEN: !Ref AuthToken
QUEUE_URL: !Ref TaskQueue
Events:
SQSEvent:
Type: SQS
Properties:
Queue: !GetAtt TaskQueue.Arn
BatchSize: 1 # Process one message at a time
MaximumBatchingWindowInSeconds: 0 # Process immediately
Copy link

Copilot AI Jan 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ConsumerFunction is missing IAM policies. The consumer needs permissions to receive and delete messages from the SQS queue. While SAM may auto-generate some policies based on the SQS event source, it's better to explicitly add SQSPollerPolicy to ensure proper permissions are granted.

Copilot uses AI. Check for mistakes.
@BukeLy
Copy link
Owner Author

BukeLy commented Jan 4, 2026

Code review

Found 2 issues:

  1. ConsumerFunction missing SQS receive permissions (AWS SAM requires explicit policy configuration)

https://github.com/BukeLy/OmniCloud-Ops-Agent/blob/8b2dc5e341ffdecd859edbb899057b87ba85a7c7/template.yaml#L154-L173

ConsumerFunction has SQS event source but no Policies field. Add:

      Policies:
        - SQSPollerPolicy:
            QueueName: !GetAtt TaskQueue.QueueName
  1. Message deduplication removed, SQS retry causes duplicate Telegram messages

https://github.com/BukeLy/OmniCloud-Ops-Agent/blob/8b2dc5e341ffdecd859edbb899057b87ba85a7c7/agent-sdk-client/consumer.py#L60-L74

The previous implementation (commit ff4ca85) had DynamoDB message deduplication to prevent webhook duplicates. Current code raises exceptions on failure (L66, L74), triggering SQS retry up to 3 times. Each retry resends the Telegram message, causing user to receive duplicate error messages. Need to restore deduplication logic or use FIFO queue with deduplication enabled.

🤖 Generated with Claude Code

  • If this code review was useful, please react with 👍. Otherwise, react with 👎.

BukeLy added 4 commits January 4, 2026 21:15
- template.yaml: 为 ConsumerFunction 添加 SQSPollerPolicy 权限
- consumer.py: 添加 response.raise_for_status() 检查 HTTP 错误状态码
1. 修复评论4 (95分) - 异常重试导致重复Telegram消息
   - consumer.py: 移除一般异常的re-raise,只对TimeoutException重试
   - 避免已发送错误消息后还会重试导致用户收到3次重复消息

2. 修复评论3 (92分) - SQS send_message缺少错误处理
   - handler.py: 添加完整的SQS错误处理和恢复机制
   - 区分可恢复(限流)和不可恢复错误(权限、队列不存在)
   - 发送CloudWatch自定义指标用于监控
   - SQS失败仍返回200给Telegram,防止webhook重试雪崩

3. 修复评论1 (85分) - CloudWatch告警缺少AlarmActions
   - template.yaml: 添加SNS Topic用于告警通知
   - DLQAlarm: 配置AlarmActions发送到SNS
   - 新增ProducerSQSErrorAlarm和ProducerLambdaErrorAlarm

其他改进:
   - Producer Lambda超时从30秒改为10秒(评论6: 75分)
   - handler.py添加CloudWatch权限策略
- consumer.py:
  * 添加lambda_handler的JSON解析异常处理
  * 包装asyncio.run的异常捕获和重新抛出
  * 初始化result默认值防止未定义错误
  * 添加message为None的检查
  * 增强异常处理的日志记录
  * 修复Issue#12:使用 or 运算符处理空response

- handler.py:
  * 包装asyncio.run的异常处理,返回结构化错误响应
  * 添加session操作的异常处理
  * 关键操作失败返回500,非关键操作log后继续
  * 完整的日志记录便于调试

确保系统在异常情况下不会导致SQS消息卡死,并正确处理Agent返回空响应的场景。
@BukeLy BukeLy merged commit 3ac0038 into main Jan 4, 2026
4 checks passed
@BukeLy BukeLy deleted the feature/sqs-async-architecture branch January 4, 2026 15:40
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

2 participants