Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions agent-sdk-client/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ commands = [
[local_commands]
# Local-only commands handled by the client
help = "Hello World"
newchat = "创建新对话"
Copy link

Copilot AI Jan 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The config.toml adds /newchat as a local command with response "创建新对话", but /newchat is not actually a local command - it requires async processing and SQS messaging. This is inconsistent with the implementation in handler.py where /newchat is handled specially before the local command check. The entry in local_commands may cause confusion or incorrect behavior if config.is_local_command() is called for /newchat. Consider removing this entry from local_commands or documenting why it's included.

Suggested change
newchat = "创建新对话"

Copilot uses AI. Check for mistakes.

[security]
# User IDs allowed to add bot to groups and send private messages.
Expand Down
16 changes: 10 additions & 6 deletions agent-sdk-client/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ async def process_message(message_data: dict) -> None:
'error_message': 'Failed to get response from Agent Server'
}

# Use message_data fields for SQS message (allows handler to override text/thread_id)
user_message = message_data.get('text') or message.text
thread_id = message_data.get('thread_id') or message.message_thread_id

# Call Agent Server
try:
async with httpx.AsyncClient(timeout=600.0) as client:
Expand All @@ -118,9 +122,9 @@ async def process_message(message_data: dict) -> None:
'Content-Type': 'application/json',
},
json={
'user_message': message.text,
'user_message': user_message,
'chat_id': str(message.chat_id),
'thread_id': str(message.message_thread_id) if message.message_thread_id else None,
'thread_id': str(thread_id) if thread_id else None,
},
)
response.raise_for_status()
Expand All @@ -131,7 +135,7 @@ async def process_message(message_data: dict) -> None:
await bot.send_message(
chat_id=message.chat_id,
text="Request timed out.",
message_thread_id=message.message_thread_id,
message_thread_id=thread_id,
)
raise # Re-raise to trigger SQS retry for transient errors

Expand All @@ -142,7 +146,7 @@ async def process_message(message_data: dict) -> None:
await bot.send_message(
chat_id=message.chat_id,
text=error_text,
message_thread_id=message.message_thread_id,
message_thread_id=thread_id,
)
except Exception as send_error:
logger.error(f"Failed to send error message to Telegram: {send_error}")
Expand All @@ -163,7 +167,7 @@ async def process_message(message_data: dict) -> None:
chat_id=message.chat_id,
text=text,
parse_mode=ParseMode.MARKDOWN_V2,
message_thread_id=message.message_thread_id,
message_thread_id=thread_id,
reply_to_message_id=message.message_id,
)
except BadRequest as e:
Expand All @@ -173,7 +177,7 @@ async def process_message(message_data: dict) -> None:
chat_id=message.chat_id,
text=safe_text,
parse_mode=ParseMode.MARKDOWN_V2,
message_thread_id=message.message_thread_id,
message_thread_id=thread_id,
reply_to_message_id=message.message_id,
)
else:
Expand Down
133 changes: 133 additions & 0 deletions agent-sdk-client/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,93 @@ def _handle_local_command(bot: Bot, message, config: Config, cmd: str) -> bool:
return True


async def _check_forum_requirements(bot: Bot, chat_id: int) -> tuple[bool, str]:
"""检查群组 Topic 功能要求。

Returns:
(is_ok, error_message) - 如果满足要求返回 (True, ""),否则返回 (False, 错误提示)
"""
try:
chat = await bot.get_chat(chat_id)
if not chat.is_forum:
return False, (
"⚠️ 群组未开启 Topics 功能\n\n"
"请按以下步骤开启:\n"
"1. 打开群组设置\n"
"2. 点击「Topics」\n"
"3. 开启 Topics 功能\n"
"4. 重新添加 Bot"
)

me = await bot.get_me()
member = await bot.get_chat_member(chat_id, me.id)
if not getattr(member, 'can_manage_topics', False):
return False, (
"⚠️ Bot 缺少「管理 Topics」权限\n\n"
"请按以下步骤授权:\n"
"1. 打开群组设置 > 管理员\n"
"2. 选择此 Bot\n"
"3. 开启「Manage Topics」权限"
)
return True, ""
except Exception as e:
logger.warning(f"Failed to check forum requirements: {e}")
return False, f"检查权限失败: {str(e)[:100]}"


async def _handle_newchat_command(
bot: Bot, message, body: dict, config: Config, sqs, prompts: str
) -> bool:
"""处理 /newchat - 创建 Topic 并发送消息到 SQS。

Args:
bot: Telegram Bot 实例
message: Telegram Message 对象
body: 原始 webhook body (用于构造 SQS 消息)
config: 配置对象
sqs: SQS 客户端
prompts: 用户输入的消息内容

Returns:
True 如果成功,False 如果失败
"""
from datetime import datetime

chat_id = message.chat_id
topic_name = f"Chat {datetime.now().strftime('%m/%d %H:%M')}"

try:
forum_topic = await bot.create_forum_topic(chat_id=chat_id, name=topic_name)
new_thread_id = forum_topic.message_thread_id

# 使用标准 SQS 消息格式,覆盖 text 和 thread_id
message_body = {
'telegram_update': body,
'chat_id': chat_id,
'message_id': message.message_id,
'text': prompts,
'thread_id': new_thread_id,
}

success = _send_to_sqs_safe(sqs, config.queue_url, message_body)
if not success:
await bot.send_message(
chat_id=chat_id,
text="发送消息失败,请重试",
message_thread_id=new_thread_id,
)
return success

except Exception as e:
logger.warning(f"Failed to create forum topic: {e}")
await bot.send_message(
chat_id=chat_id,
text=f"创建 Topic 失败: {str(e)[:100]}",
message_thread_id=message.message_thread_id,
)
return False


def lambda_handler(event: dict, context: Any) -> dict:
"""Lambda entry point - Producer.

Expand Down Expand Up @@ -182,6 +269,29 @@ def lambda_handler(event: dict, context: Any) -> dict:
extra={'chat_id': chat_id, 'inviter_id': inviter_id},
)
_send_metric('SecurityBlock.UnauthorizedGroup')
else:
# 授权群组的 Topic 预检
member_update = update.my_chat_member
old_status = member_update.old_chat_member.status
new_status = member_update.new_chat_member.status

if old_status in ('left', 'kicked') and new_status in (
'member',
'administrator',
):
chat_id = member_update.chat.id

async def _run_topic_precheck():
is_ok, error_msg = await _check_forum_requirements(bot, chat_id)
if not is_ok:
await bot.send_message(chat_id=chat_id, text=error_msg)
logger.warning(
"Forum requirements check failed",
extra={'chat_id': chat_id, 'error_msg': error_msg},
)
_send_metric('TopicPrecheck.Failed')

asyncio.run(_run_topic_precheck())
return {'statusCode': 200}

message = update.message or update.edited_message
Expand All @@ -200,7 +310,30 @@ def lambda_handler(event: dict, context: Any) -> dict:
_send_metric('SecurityBlock.UnauthorizedPrivate')
return {'statusCode': 200}

# 群组消息:非 Forum 直接忽略(用户入群时已收到预检提示)
if message.chat.type in ('group', 'supergroup') and not message.chat.is_forum:
return {'statusCode': 200}

cmd = config.get_command(message.text)

# /newchat 特殊处理 - 创建 Topic 后发 SQS
if cmd == '/newchat':
# 提取 prompts:移除命令部分(包括可能的 @bot 后缀)
parts = message.text.strip().split(maxsplit=1)
prompts = parts[1] if len(parts) > 1 else ''
if not prompts:
bot.send_message(
chat_id=message.chat_id,
text="用法: /newchat <消息内容>",
message_thread_id=message.message_thread_id,
)
return {'statusCode': 200}

sqs = _get_sqs_client()
asyncio.run(_handle_newchat_command(bot, message, body, config, sqs, prompts))
return {'statusCode': 200}

# 其他 local command 正常处理
if cmd and config.is_local_command(cmd):
_handle_local_command(bot, message, config, cmd)
return {'statusCode': 200}
Expand Down
76 changes: 76 additions & 0 deletions docs/forum-group-security.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Forum 群组安全设计

## 背景

Bot 设计上依赖 Telegram Forum (Topics) 功能,需要确保:
1. Bot 只在满足条件的群组中工作
2. 用户得到清晰的配置指引

## 安全策略

### 1. 用户白名单 (最高优先级)

非白名单用户拉 Bot 进群 → 直接退群,不做任何处理。

```python
if should_leave_group(update, config.user_whitelist):
bot.leave_chat(chat_id)
return
```

### 2. 入群预检

白名单用户拉 Bot 进群时,检查:
- `chat.is_forum` - 群组是否开启 Topics
- `can_manage_topics` - Bot 是否有 Topic 管理权限

预检失败时发送详细配置指引,但不退群(给用户配置时间)。

### 3. 消息过滤 (统一入口)

非 Forum 群组的消息在 producer 入口处直接忽略:

```python
# 群组消息:非 Forum 直接忽略
if message.chat.type in ('group', 'supergroup') and not message.chat.is_forum:
return {'statusCode': 200}
```

**优点**:
- 一行代码,所有命令无需单独检查
- 私聊不受影响
- 用户已在入群时收到预检提示

## 处理流程

```
Bot 被添加到群组
┌─────────────────────┐
│ 白名单检查 │
└─────────────────────┘
通过? ──No──→ 退群
↓ Yes
┌─────────────────────┐
│ Topic 预检 │
│ - is_forum? │
│ - can_manage_topics?│
└─────────────────────┘
通过? ──No──→ 发送配置指引
↓ Yes
正常工作

---

收到群组消息
┌─────────────────────┐
│ is_forum 检查 │
└─────────────────────┘
是 Forum? ──No──→ 静默忽略
↓ Yes
正常处理命令
```