Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Changelog

## [Unreleased]

### Changed
- **架构调整:从同步到异步处理模式**
- 重构SDK Client为SQS异步架构
- Producer Lambda(SdkClientFunction):接收Telegram webhook,立即返回200,消息写入SQS队列
- Consumer Lambda(ConsumerFunction):异步处理SQS消息,调用Agent Server,返回结果给Telegram
- 解决Telegram 30秒webhook超时问题,支持长运行任务

### Added
- SQS任务队列(TaskQueue)和死信队列(DLQueue)
- SNS告警主题(AlarmTopic)用于CloudWatch通知
- CloudWatch告警和自定义指标监控
- DynamoDB会话表用于多轮对话状态管理
70 changes: 56 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ A Serverless AI Agent system built on Claude Agent SDK, implementing stateful co
## Architecture

```
Telegram User → Bot API → API Gateway → sdk-client Lambda
API Gateway → agent-container Lambda
DynamoDB (Session mapping) + S3 (Session files) + Bedrock (Claude)
Telegram User → Bot API → API Gateway → Producer Lambda → SQS Queue → Consumer Lambda
Return 200 agent-server Lambda
immediately
DynamoDB (Session mapping) + S3 (Session files) + Bedrock (Claude)
```

**Core Design**: Uses the Hybrid Sessions pattern recommended by Claude Agent SDK
**Core Design**:
- Uses the Hybrid Sessions pattern recommended by Claude Agent SDK
- **SQS Async Architecture**: Producer returns 200 immediately to Telegram, Consumer processes requests asynchronously

## Features

Expand All @@ -34,6 +36,7 @@ Telegram User → Bot API → API Gateway → sdk-client Lambda
- **Skills Support**: Reusable skill modules with hello-world example
- **MCP Integration**: Support for HTTP and local command-based MCP servers
- **Auto Cleanup**: 25-day TTL + S3 lifecycle management
- **SQS Queue**: Async processing + auto retry + dead letter queue
- **Quick Start**: Provides example Skill/SubAgent/MCP configurations for adding other components

## Project Structure
Expand All @@ -51,7 +54,9 @@ Telegram User → Bot API → API Gateway → sdk-client Lambda
│ └── system_prompt.md # System Prompt
├── agent-sdk-client/ # Telegram Client (ZIP Deployment)
│ └── handler.py # Webhook Handler
│ ├── handler.py # Producer: Webhook receiver, writes to SQS
│ ├── consumer.py # Consumer: SQS consumer, calls Agent
│ └── config.py # Configuration management
├── docs/ # Documentation
│ └── anthropic-agent-sdk-official/ # SDK Official Docs Reference
Expand Down Expand Up @@ -93,16 +98,32 @@ sam deploy --guided
| `BEDROCK_SECRET_ACCESS_KEY` | Bedrock secret key |
| `SDK_CLIENT_AUTH_TOKEN` | Internal authentication token |
| `TELEGRAM_BOT_TOKEN` | Telegram Bot Token |
| `QUEUE_URL` | SQS queue URL (auto-created) |

## Tech Stack

- **Runtime**: Python 3.12 + Claude Agent SDK
- **Computing**: AWS Lambda (ARM64)
- **Storage**: S3 + DynamoDB
- **Message Queue**: AWS SQS (Standard Queue + DLQ)
- **AI**: Claude via Amazon Bedrock
- **Orchestration**: AWS SAM
- **Integration**: Telegram Bot API + MCP

## SQS Async Architecture

**Problem Solved**: Telegram Webhook times out and retries after ~27s, while Agent processing may take 30-70s, causing duplicate responses.

**Solution**:
1. Producer Lambda receives Webhook, writes to SQS, returns 200 immediately (<1s)
2. Consumer Lambda consumes from SQS, calls Agent Server, sends response to Telegram
3. Retry 3 times on failure, then move to dead letter queue (DLQ)

**Queue Configuration**:
- VisibilityTimeout: 900s (= Lambda timeout)
- maxReceiveCount: 3 (retry 3 times)
- DLQ Alarm: CloudWatch alarm triggers when messages enter DLQ

## Session Management

**Lifecycle**:
Expand Down Expand Up @@ -181,14 +202,16 @@ MIT
## 架构

```
Telegram User → Bot API → API Gateway → sdk-client Lambda
API Gateway → agent-container Lambda
DynamoDB (Session映射) + S3 (Session文件) + Bedrock (Claude)
Telegram User → Bot API → API Gateway → Producer Lambda → SQS Queue → Consumer Lambda
立即返回 200 agent-server Lambda
DynamoDB (Session映射) + S3 (Session文件) + Bedrock (Claude)
```

**核心设计**:采用 Claude Agent SDK 官方推荐的 Hybrid Sessions 模式
**核心设计**:
- 采用 Claude Agent SDK 官方推荐的 Hybrid Sessions 模式
- **SQS 异步架构**:Producer 立即返回 200 给 Telegram,Consumer 异步处理请求

## 特性

Expand All @@ -198,6 +221,7 @@ Telegram User → Bot API → API Gateway → sdk-client Lambda
- **Skills 支持**:可复用的技能模块,包含 hello-world 示例
- **MCP 集成**:支持 HTTP 和本地命令类型的 MCP 服务器
- **自动清理**:25天 TTL + S3 生命周期管理
- **SQS 队列**:异步处理 + 自动重试 + 死信队列
- **快速开始**:提供示例 Skill/SubAgent/MCP 配置,可按照示例添加其他组件

## 项目结构
Expand All @@ -215,7 +239,9 @@ Telegram User → Bot API → API Gateway → sdk-client Lambda
│ └── system_prompt.md # 系统提示
├── agent-sdk-client/ # Telegram客户端 (ZIP部署)
│ └── handler.py # Webhook处理
│ ├── handler.py # Producer: Webhook接收,写入SQS
│ ├── consumer.py # Consumer: SQS消费,调用Agent
│ └── config.py # 配置管理
├── docs/ # 文档
│ └── anthropic-agent-sdk-official/ # SDK官方文档参考
Expand Down Expand Up @@ -257,16 +283,32 @@ sam deploy --guided
| `BEDROCK_SECRET_ACCESS_KEY` | Bedrock密钥 |
| `SDK_CLIENT_AUTH_TOKEN` | 内部认证Token |
| `TELEGRAM_BOT_TOKEN` | Telegram Bot Token |
| `QUEUE_URL` | SQS队列URL(自动创建) |

## 技术栈

- **Runtime**: Python 3.12 + Claude Agent SDK
- **计算**: AWS Lambda (ARM64)
- **存储**: S3 + DynamoDB
- **消息队列**: AWS SQS (Standard Queue + DLQ)
- **AI**: Claude via Amazon Bedrock
- **编排**: AWS SAM
- **集成**: Telegram Bot API + MCP

## SQS 异步架构

**解决的问题**:Telegram Webhook 在 ~27s 后超时重试,而 Agent 处理可能需要 30-70s,导致重复响应。

**解决方案**:
1. Producer Lambda 接收 Webhook,写入 SQS,立即返回 200(<1s)
2. Consumer Lambda 从 SQS 消费,调用 Agent Server,发送响应给 Telegram
3. 失败重试 3 次,最终失败进入死信队列(DLQ)

**队列配置**:
- VisibilityTimeout: 900s(= Lambda 超时)
- maxReceiveCount: 3(重试 3 次)
- DLQ 告警:消息进入 DLQ 时触发 CloudWatch 告警

## Session 管理

**生命周期**:
Expand Down
2 changes: 2 additions & 0 deletions agent-sdk-client/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class Config:
telegram_token: str
agent_server_url: str
auth_token: str
queue_url: str

@classmethod
def from_env(cls) -> 'Config':
Expand All @@ -18,4 +19,5 @@ def from_env(cls) -> 'Config':
telegram_token=os.getenv('TELEGRAM_BOT_TOKEN', ''),
agent_server_url=os.getenv('AGENT_SERVER_URL', ''),
auth_token=os.getenv('SDK_CLIENT_AUTH_TOKEN', 'default-token'),
queue_url=os.getenv('QUEUE_URL', ''),
)
142 changes: 142 additions & 0 deletions agent-sdk-client/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
"""Lambda handler for SQS Consumer.

Processes messages from SQS queue, calls Agent Server, sends response to Telegram.
"""
import asyncio
import json
from typing import Any

import httpx
from telegram import Bot, Update
from telegram.constants import ParseMode, ChatAction
from telegram.helpers import escape_markdown
from telegram.error import BadRequest

from config import Config


def lambda_handler(event: dict, context: Any) -> dict:
"""SQS Consumer Lambda entry point."""
for record in event['Records']:
try:
message_data = json.loads(record['body'])
except json.JSONDecodeError as e:
# Invalid message format - log and skip
import logging
logger = logging.getLogger()
logger.error(f"Failed to parse SQS message: {e}")
continue

try:
asyncio.run(process_message(message_data))
except Exception as e:
# Log and let SQS retry on failure
import logging
logger = logging.getLogger()
logger.exception(f"Failed to process message: {e}")
raise # Re-raise to fail the batch item

return {'statusCode': 200}


async def process_message(message_data: dict) -> None:
"""Process single message from SQS queue."""
import logging
logger = logging.getLogger()

config = Config.from_env()
bot = Bot(config.telegram_token)

# Reconstruct Update object from stored data
update = Update.de_json(message_data['telegram_update'], bot)
message = update.message or update.edited_message
Comment on lines +50 to +52
Copy link

Copilot AI Jan 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The message object reconstruction assumes that message will always exist after calling Update.de_json. However, there's no validation to ensure the message exists or has text before accessing properties like message.chat_id. This could lead to AttributeError if the stored telegram_update is for an unsupported update type. Add validation similar to the producer's validation logic.

Copilot uses AI. Check for mistakes.

if not message:
logger.warning("Received update with no message or edited_message")
return

# Send typing indicator
await bot.send_chat_action(
chat_id=message.chat_id,
action=ChatAction.TYPING,
message_thread_id=message.message_thread_id,
)

# Initialize result with default error response
# This ensures result is always defined, even if Agent Server call fails
result = {
'response': '',
'is_error': True,
'error_message': 'Failed to get response from Agent Server'
}

# Call Agent Server
try:
async with httpx.AsyncClient(timeout=600.0) as client:
response = await client.post(
config.agent_server_url,
headers={
'Authorization': f'Bearer {config.auth_token}',
'Content-Type': 'application/json',
},
json={
'user_message': message.text,
'chat_id': str(message.chat_id),
'thread_id': str(message.message_thread_id) if message.message_thread_id else None,
},
)
response.raise_for_status()
result = response.json()

except httpx.TimeoutException:
logger.warning(f"Agent Server timeout for chat_id={message.chat_id}")
await bot.send_message(
chat_id=message.chat_id,
text="Request timed out.",
message_thread_id=message.message_thread_id,
)
raise # Re-raise to trigger SQS retry for transient errors

except Exception as e:
logger.exception(f"Agent Server error for chat_id={message.chat_id}")
error_text = f"Error: {str(e)[:200]}"
try:
await bot.send_message(
chat_id=message.chat_id,
text=error_text,
message_thread_id=message.message_thread_id,
)
except Exception as send_error:
logger.error(f"Failed to send error message to Telegram: {send_error}")
# Don't re-raise - error message already sent to user, retrying would cause duplicate messages

# Format response (result is guaranteed to be defined now)
if result.get('is_error'):
text = f"Agent error: {result.get('error_message', 'Unknown')}"
else:
text = result.get('response') or 'No response'

if len(text) > 4000:
text = text[:4000] + "\n\n... (truncated)"

# Send response to Telegram
try:
await bot.send_message(
chat_id=message.chat_id,
text=text,
parse_mode=ParseMode.MARKDOWN_V2,
message_thread_id=message.message_thread_id,
reply_to_message_id=message.message_id,
)
except BadRequest as e:
if "parse entities" in str(e).lower():
safe_text = escape_markdown(text, version=2)
await bot.send_message(
chat_id=message.chat_id,
text=safe_text,
parse_mode=ParseMode.MARKDOWN_V2,
message_thread_id=message.message_thread_id,
reply_to_message_id=message.message_id,
)
else:
raise
Loading