diff --git a/agent-sdk-client/config.toml b/agent-sdk-client/config.toml index 49d677a..32baa66 100644 --- a/agent-sdk-client/config.toml +++ b/agent-sdk-client/config.toml @@ -8,6 +8,7 @@ commands = [ [local_commands] # Local-only commands handled by the client help = "Hello World" +newchat = "创建新对话" [security] # User IDs allowed to add bot to groups and send private messages. diff --git a/agent-sdk-client/consumer.py b/agent-sdk-client/consumer.py index 52faf6a..5b7e6c8 100644 --- a/agent-sdk-client/consumer.py +++ b/agent-sdk-client/consumer.py @@ -108,6 +108,10 @@ async def process_message(message_data: dict) -> None: 'error_message': 'Failed to get response from Agent Server' } + # Use message_data fields for SQS message (allows handler to override text/thread_id) + user_message = message_data.get('text') or message.text + thread_id = message_data.get('thread_id') or message.message_thread_id + # Call Agent Server try: async with httpx.AsyncClient(timeout=600.0) as client: @@ -118,9 +122,9 @@ async def process_message(message_data: dict) -> None: 'Content-Type': 'application/json', }, json={ - 'user_message': message.text, + 'user_message': user_message, 'chat_id': str(message.chat_id), - 'thread_id': str(message.message_thread_id) if message.message_thread_id else None, + 'thread_id': str(thread_id) if thread_id else None, }, ) response.raise_for_status() @@ -131,7 +135,7 @@ async def process_message(message_data: dict) -> None: await bot.send_message( chat_id=message.chat_id, text="Request timed out.", - message_thread_id=message.message_thread_id, + message_thread_id=thread_id, ) raise # Re-raise to trigger SQS retry for transient errors @@ -142,7 +146,7 @@ async def process_message(message_data: dict) -> None: await bot.send_message( chat_id=message.chat_id, text=error_text, - message_thread_id=message.message_thread_id, + message_thread_id=thread_id, ) except Exception as send_error: logger.error(f"Failed to send error message to Telegram: {send_error}") @@ -163,7 +167,7 @@ async def process_message(message_data: dict) -> None: chat_id=message.chat_id, text=text, parse_mode=ParseMode.MARKDOWN_V2, - message_thread_id=message.message_thread_id, + message_thread_id=thread_id, reply_to_message_id=message.message_id, ) except BadRequest as e: @@ -173,7 +177,7 @@ async def process_message(message_data: dict) -> None: chat_id=message.chat_id, text=safe_text, parse_mode=ParseMode.MARKDOWN_V2, - message_thread_id=message.message_thread_id, + message_thread_id=thread_id, reply_to_message_id=message.message_id, ) else: diff --git a/agent-sdk-client/handler.py b/agent-sdk-client/handler.py index b598fc8..4757ec2 100644 --- a/agent-sdk-client/handler.py +++ b/agent-sdk-client/handler.py @@ -149,6 +149,93 @@ def _handle_local_command(bot: Bot, message, config: Config, cmd: str) -> bool: return True +async def _check_forum_requirements(bot: Bot, chat_id: int) -> tuple[bool, str]: + """检查群组 Topic 功能要求。 + + Returns: + (is_ok, error_message) - 如果满足要求返回 (True, ""),否则返回 (False, 错误提示) + """ + try: + chat = await bot.get_chat(chat_id) + if not chat.is_forum: + return False, ( + "⚠️ 群组未开启 Topics 功能\n\n" + "请按以下步骤开启:\n" + "1. 打开群组设置\n" + "2. 点击「Topics」\n" + "3. 开启 Topics 功能\n" + "4. 重新添加 Bot" + ) + + me = await bot.get_me() + member = await bot.get_chat_member(chat_id, me.id) + if not getattr(member, 'can_manage_topics', False): + return False, ( + "⚠️ Bot 缺少「管理 Topics」权限\n\n" + "请按以下步骤授权:\n" + "1. 打开群组设置 > 管理员\n" + "2. 选择此 Bot\n" + "3. 开启「Manage Topics」权限" + ) + return True, "" + except Exception as e: + logger.warning(f"Failed to check forum requirements: {e}") + return False, f"检查权限失败: {str(e)[:100]}" + + +async def _handle_newchat_command( + bot: Bot, message, body: dict, config: Config, sqs, prompts: str +) -> bool: + """处理 /newchat - 创建 Topic 并发送消息到 SQS。 + + Args: + bot: Telegram Bot 实例 + message: Telegram Message 对象 + body: 原始 webhook body (用于构造 SQS 消息) + config: 配置对象 + sqs: SQS 客户端 + prompts: 用户输入的消息内容 + + Returns: + True 如果成功,False 如果失败 + """ + from datetime import datetime + + chat_id = message.chat_id + topic_name = f"Chat {datetime.now().strftime('%m/%d %H:%M')}" + + try: + forum_topic = await bot.create_forum_topic(chat_id=chat_id, name=topic_name) + new_thread_id = forum_topic.message_thread_id + + # 使用标准 SQS 消息格式,覆盖 text 和 thread_id + message_body = { + 'telegram_update': body, + 'chat_id': chat_id, + 'message_id': message.message_id, + 'text': prompts, + 'thread_id': new_thread_id, + } + + success = _send_to_sqs_safe(sqs, config.queue_url, message_body) + if not success: + await bot.send_message( + chat_id=chat_id, + text="发送消息失败,请重试", + message_thread_id=new_thread_id, + ) + return success + + except Exception as e: + logger.warning(f"Failed to create forum topic: {e}") + await bot.send_message( + chat_id=chat_id, + text=f"创建 Topic 失败: {str(e)[:100]}", + message_thread_id=message.message_thread_id, + ) + return False + + def lambda_handler(event: dict, context: Any) -> dict: """Lambda entry point - Producer. @@ -182,6 +269,29 @@ def lambda_handler(event: dict, context: Any) -> dict: extra={'chat_id': chat_id, 'inviter_id': inviter_id}, ) _send_metric('SecurityBlock.UnauthorizedGroup') + else: + # 授权群组的 Topic 预检 + member_update = update.my_chat_member + old_status = member_update.old_chat_member.status + new_status = member_update.new_chat_member.status + + if old_status in ('left', 'kicked') and new_status in ( + 'member', + 'administrator', + ): + chat_id = member_update.chat.id + + async def _run_topic_precheck(): + is_ok, error_msg = await _check_forum_requirements(bot, chat_id) + if not is_ok: + await bot.send_message(chat_id=chat_id, text=error_msg) + logger.warning( + "Forum requirements check failed", + extra={'chat_id': chat_id, 'error_msg': error_msg}, + ) + _send_metric('TopicPrecheck.Failed') + + asyncio.run(_run_topic_precheck()) return {'statusCode': 200} message = update.message or update.edited_message @@ -200,7 +310,30 @@ def lambda_handler(event: dict, context: Any) -> dict: _send_metric('SecurityBlock.UnauthorizedPrivate') return {'statusCode': 200} + # 群组消息:非 Forum 直接忽略(用户入群时已收到预检提示) + if message.chat.type in ('group', 'supergroup') and not message.chat.is_forum: + return {'statusCode': 200} + cmd = config.get_command(message.text) + + # /newchat 特殊处理 - 创建 Topic 后发 SQS + if cmd == '/newchat': + # 提取 prompts:移除命令部分(包括可能的 @bot 后缀) + parts = message.text.strip().split(maxsplit=1) + prompts = parts[1] if len(parts) > 1 else '' + if not prompts: + bot.send_message( + chat_id=message.chat_id, + text="用法: /newchat <消息内容>", + message_thread_id=message.message_thread_id, + ) + return {'statusCode': 200} + + sqs = _get_sqs_client() + asyncio.run(_handle_newchat_command(bot, message, body, config, sqs, prompts)) + return {'statusCode': 200} + + # 其他 local command 正常处理 if cmd and config.is_local_command(cmd): _handle_local_command(bot, message, config, cmd) return {'statusCode': 200} diff --git a/docs/forum-group-security.md b/docs/forum-group-security.md new file mode 100644 index 0000000..c0e089c --- /dev/null +++ b/docs/forum-group-security.md @@ -0,0 +1,76 @@ +# Forum 群组安全设计 + +## 背景 + +Bot 设计上依赖 Telegram Forum (Topics) 功能,需要确保: +1. Bot 只在满足条件的群组中工作 +2. 用户得到清晰的配置指引 + +## 安全策略 + +### 1. 用户白名单 (最高优先级) + +非白名单用户拉 Bot 进群 → 直接退群,不做任何处理。 + +```python +if should_leave_group(update, config.user_whitelist): + bot.leave_chat(chat_id) + return +``` + +### 2. 入群预检 + +白名单用户拉 Bot 进群时,检查: +- `chat.is_forum` - 群组是否开启 Topics +- `can_manage_topics` - Bot 是否有 Topic 管理权限 + +预检失败时发送详细配置指引,但不退群(给用户配置时间)。 + +### 3. 消息过滤 (统一入口) + +非 Forum 群组的消息在 producer 入口处直接忽略: + +```python +# 群组消息:非 Forum 直接忽略 +if message.chat.type in ('group', 'supergroup') and not message.chat.is_forum: + return {'statusCode': 200} +``` + +**优点**: +- 一行代码,所有命令无需单独检查 +- 私聊不受影响 +- 用户已在入群时收到预检提示 + +## 处理流程 + +``` +Bot 被添加到群组 + ↓ +┌─────────────────────┐ +│ 白名单检查 │ +└─────────────────────┘ + ↓ + 通过? ──No──→ 退群 + ↓ Yes +┌─────────────────────┐ +│ Topic 预检 │ +│ - is_forum? │ +│ - can_manage_topics?│ +└─────────────────────┘ + ↓ + 通过? ──No──→ 发送配置指引 + ↓ Yes + 正常工作 + +--- + +收到群组消息 + ↓ +┌─────────────────────┐ +│ is_forum 检查 │ +└─────────────────────┘ + ↓ + 是 Forum? ──No──→ 静默忽略 + ↓ Yes + 正常处理命令 +```