diff --git a/.github/workflows/release-changelog.yml b/.github/workflows/release-changelog.yml deleted file mode 100644 index 8b3526a..0000000 --- a/.github/workflows/release-changelog.yml +++ /dev/null @@ -1,55 +0,0 @@ -name: Release - Update Changelog - -on: [] - -permissions: - contents: write - pull-requests: write - id-token: write - -jobs: - update-changelog: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - with: - fetch-depth: 0 - - - name: Configure AWS Credentials - uses: aws-actions/configure-aws-credentials@v4 - with: - aws-access-key-id: ${{ secrets.BEDROCK_AWS_ACCESS_KEY_ID }} - aws-secret-access-key: ${{ secrets.BEDROCK_AWS_SECRET_ACCESS_KEY }} - aws-region: us-east-1 - - - name: Update Changelog with Claude Code - uses: anthropics/claude-code-action@v1 - with: - use_bedrock: "true" - show_full_output: "false" - prompt: | - 请更新 CHANGELOG.md 文件以完成版本发布: - - 1. 当前release版本: ${{ github.event.release.tag_name || github.event.inputs.tag }} - 2. 使用 `git tag -l --sort=-version:refname` 获取前一个release标签 - 3. 使用 `git log [previous-tag]..${{ github.event.release.tag_name || github.event.inputs.tag }} --oneline` 获取此版本的所有提交 - 4. 根据提交信息总结变更(分类为 Added/Changed/Fixed/Removed) - 5. 在 CHANGELOG.md 中: - - 将 [Unreleased] 部分改为 [${{ github.event.release.tag_name || github.event.inputs.tag }}] 日期格式为 YYYY-MM-DD - - 在顶部添加新的 [Unreleased] 部分 - - 保持原有的 Markdown 格式 - - 完成后 commit 此更新。 - claude_args: | - --allowedTools "Read,Write,Edit,Bash(git:*)" - - - name: Push changelog update - run: | - git config user.name "github-actions[bot]" - git config user.email "github-actions[bot]@users.noreply.github.com" - - if git diff --quiet CHANGELOG.md; then - echo "No changes to CHANGELOG.md" - else - git push - fi diff --git a/agent-sdk-client/config.py b/agent-sdk-client/config.py index 89b008b..b421632 100644 --- a/agent-sdk-client/config.py +++ b/agent-sdk-client/config.py @@ -6,6 +6,14 @@ from pathlib import Path from typing import Optional + +@dataclass +class LocalCommand: + """Local command configuration.""" + type: str # "static" or "handler" + response: str = "" # for static type + handler: str = "" # for handler type + logger = logging.getLogger(__name__) DEFAULT_CONFIG_PATH = Path(__file__).with_name("config.toml") @@ -28,7 +36,46 @@ def extract_command(text: Optional[str]) -> Optional[str]: return command -def _load_config(config_path: Path = DEFAULT_CONFIG_PATH) -> tuple[list[str], dict[str, str], list[int | str]]: +def _parse_local_command(name: str, value) -> tuple[str, LocalCommand | None]: + """Parse a single local command entry. + + Args: + name: Command name (with or without leading slash) + value: Command value (string for legacy, dict for new format) + + Returns: + Tuple of (normalized_cmd, LocalCommand) or (normalized_cmd, None) if invalid + """ + cmd = f"/{name.lstrip('/')}" if not name.startswith('/') else name + + # Legacy format: string value = static response + if isinstance(value, str): + return cmd, LocalCommand(type="static", response=value) + + # New format: dict with type field + if isinstance(value, dict): + cmd_type = value.get('type', '') + if cmd_type == 'static': + response = value.get('response', '') + if not response: + logger.warning(f"Local command {cmd} has no response; skipping") + return cmd, None + return cmd, LocalCommand(type="static", response=response) + elif cmd_type == 'handler': + handler = value.get('handler', '') + if not handler: + logger.warning(f"Local command {cmd} has no handler; skipping") + return cmd, None + return cmd, LocalCommand(type="handler", handler=handler) + else: + logger.warning(f"Local command {cmd} has unknown type: {cmd_type}; skipping") + return cmd, None + + logger.warning(f"Local command {cmd} has invalid value type; skipping") + return cmd, None + + +def _load_config(config_path: Path = DEFAULT_CONFIG_PATH) -> tuple[list[str], dict[str, LocalCommand], list[int | str]]: """Load commands and security config from TOML config file. Returns: @@ -48,16 +95,18 @@ def _load_config(config_path: Path = DEFAULT_CONFIG_PATH) -> tuple[list[str], di agent_commands = [] agent_commands = [cmd for cmd in agent_commands if isinstance(cmd, str)] - # Load local commands + # Load local commands (supports both legacy string and new dict format) local_commands_raw = data.get('local_commands', {}) if not isinstance(local_commands_raw, dict): logger.warning("Local commands config is not a table; ignoring configuration") local_commands_raw = {} - local_commands = { - f"/{name.lstrip('/')}" if not name.startswith('/') else name: str(value) - for name, value in local_commands_raw.items() - if isinstance(name, str) and isinstance(value, str) - } + local_commands: dict[str, LocalCommand] = {} + for name, value in local_commands_raw.items(): + if not isinstance(name, str): + continue + cmd, parsed = _parse_local_command(name, value) + if parsed: + local_commands[cmd] = parsed # Load security whitelist security = data.get('security', {}) @@ -92,8 +141,9 @@ class Config: auth_token: str queue_url: str agent_commands: list[str] - local_commands: dict[str, str] + local_commands: dict[str, LocalCommand] user_whitelist: list[int | str] + telegram_webhook_secret: str = "" @classmethod def from_env(cls, config_path: Optional[Path] = None) -> 'Config': @@ -107,6 +157,7 @@ def from_env(cls, config_path: Optional[Path] = None) -> 'Config': agent_commands=agent_cmds, local_commands=local_cmds, user_whitelist=whitelist, + telegram_webhook_secret=os.getenv('TELEGRAM_WEBHOOK_SECRET', ''), ) def get_command(self, text: Optional[str]) -> Optional[str]: @@ -118,8 +169,16 @@ def is_agent_command(self, cmd: Optional[str]) -> bool: def is_local_command(self, cmd: Optional[str]) -> bool: return bool(cmd) and cmd in self.local_commands + def get_local_command(self, cmd: str) -> LocalCommand | None: + """Get local command config by command name.""" + return self.local_commands.get(cmd) + def local_response(self, cmd: str) -> str: - return self.local_commands.get(cmd, "Unsupported command.") + """Get static response for a local command (legacy compatibility).""" + local_cmd = self.local_commands.get(cmd) + if local_cmd and local_cmd.type == "static": + return local_cmd.response + return "Unsupported command." def unknown_command_message(self) -> str: parts = [] diff --git a/agent-sdk-client/config.toml b/agent-sdk-client/config.toml index 32baa66..9b063c0 100644 --- a/agent-sdk-client/config.toml +++ b/agent-sdk-client/config.toml @@ -7,8 +7,15 @@ commands = [ [local_commands] # Local-only commands handled by the client -help = "Hello World" -newchat = "创建新对话" +# Supports two formats: +# - Legacy: help = "static response text" +# - New: help = { type = "static", response = "text" } +# - Handler: newchat = { type = "handler", handler = "newchat" } + +help = { type = "static", response = "Hello World" } +newchat = { type = "handler", handler = "newchat" } +start = { type = "handler", handler = "start" } +debug = { type = "handler", handler = "debug" } [security] # User IDs allowed to add bot to groups and send private messages. diff --git a/agent-sdk-client/consumer.py b/agent-sdk-client/consumer.py index 5b7e6c8..5e630c2 100644 --- a/agent-sdk-client/consumer.py +++ b/agent-sdk-client/consumer.py @@ -4,17 +4,91 @@ """ import asyncio import json +import re from typing import Any import httpx from telegram import Bot, Update from telegram.constants import ParseMode, ChatAction -from telegram.helpers import escape_markdown +from telegramify_markdown import markdownify from telegram.error import BadRequest from config import Config +def fix_heading_bold(text: str) -> str: + """Remove bold markers from headings: ## **Title** -> ## Title. + + Only applies when heading contains **bold** markers. + """ + if re.search(r'^#{1,6}\s*\*\*', text, flags=re.MULTILINE): + return re.sub(r'^(#{1,6})\s*\*\*(.+?)\*\*\s*$', r'\1 \2', text, flags=re.MULTILINE) + return text + + +def fix_code_escaping(text: str) -> str: + """Remove escaping inside code blocks: \\| -> |, \\- -> -. + + Only applies when code blocks contain escaped characters. + """ + if '```' not in text and '`' not in text: + return text + + escaped_chars = '`|-.()+!#={}[]><_*~' + + def unescape(content: str) -> str: + for char in escaped_chars: + content = content.replace(f'\\{char}', char) + return content + + # Fix fenced code blocks + if '```' in text: + text = re.sub( + r'```(.*?)```', + lambda m: f'```{unescape(m.group(1))}```', + text, + flags=re.DOTALL + ) + # Fix inline code + if '`' in text: + text = re.sub( + r'`([^`]+)`', + lambda m: f'`{unescape(m.group(1))}`', + text + ) + return text + + +def fix_unescaped_chars(text: str) -> str: + """Escape special chars outside code blocks that markdownify missed. + + Only applies when unescaped special chars exist outside code blocks. + """ + # Extract code blocks to protect them + blocks = [] + def save(m): + blocks.append(m.group(0)) + return f'\x00{len(blocks)-1}\x00' + + protected = re.sub(r'```.*?```', save, text, flags=re.DOTALL) + protected = re.sub(r'`[^`]+`', save, protected) + + # Check if any unescaped chars exist + chars = r'-.!()+=|{}[]#>' + if not re.search(rf'(? dict: """SQS Consumer Lambda entry point.""" for record in event['Records']: @@ -43,6 +117,7 @@ async def process_message(message_data: dict) -> None: """Process single message from SQS queue.""" import logging logger = logging.getLogger() + logger.setLevel(logging.INFO) config = Config.from_env() bot = Bot(config.telegram_token) @@ -55,51 +130,6 @@ async def process_message(message_data: dict) -> None: logger.warning("Received update with no message or edited_message") return - cmd = config.get_command(message.text) - if cmd: - if config.is_local_command(cmd): - logger.info( - "Handling local command in consumer (fallback path)", - extra={'chat_id': message.chat_id, 'message_id': message.message_id}, - ) - try: - await bot.send_message( - chat_id=message.chat_id, - text=config.local_response(cmd), - message_thread_id=message.message_thread_id, - reply_to_message_id=message.message_id, - ) - except Exception: - logger.warning("Failed to send local command response", exc_info=True) - return - - if not config.is_agent_command(cmd): - # Defensive guard: producer should already block non-agent commands. - logger.info( - "Skipping non-agent command (consumer fallback)", - extra={ - 'chat_id': message.chat_id, - 'message_id': message.message_id, - }, - ) - try: - await bot.send_message( - chat_id=message.chat_id, - text=config.unknown_command_message(), - message_thread_id=message.message_thread_id, - reply_to_message_id=message.message_id, - ) - except Exception: - logger.warning("Failed to send local command response", exc_info=True) - return - - # Send typing indicator - await bot.send_chat_action( - chat_id=message.chat_id, - action=ChatAction.TYPING, - message_thread_id=message.message_thread_id, - ) - # Initialize result with default error response # This ensures result is always defined, even if Agent Server call fails result = { @@ -112,8 +142,21 @@ async def process_message(message_data: dict) -> None: user_message = message_data.get('text') or message.text thread_id = message_data.get('thread_id') or message.message_thread_id - # Call Agent Server - try: + async def keep_typing(): + """Send typing indicator every 4 seconds (Telegram typing expires after 5s).""" + while True: + try: + await bot.send_chat_action( + chat_id=message.chat_id, + action=ChatAction.TYPING, + message_thread_id=thread_id, + ) + except Exception: + pass # Ignore typing errors, don't interrupt main flow + await asyncio.sleep(4) + + async def call_agent_server(): + """Call Agent Server and return result.""" async with httpx.AsyncClient(timeout=600.0) as client: response = await client.post( config.agent_server_url, @@ -125,10 +168,16 @@ async def process_message(message_data: dict) -> None: 'user_message': user_message, 'chat_id': str(message.chat_id), 'thread_id': str(thread_id) if thread_id else None, + 'message_time': message_data.get('message_time'), }, ) response.raise_for_status() - result = response.json() + return response.json() + + # Call Agent Server with continuous typing indicator + typing_task = asyncio.create_task(keep_typing()) + try: + result = await call_agent_server() except httpx.TimeoutException: logger.warning(f"Agent Server timeout for chat_id={message.chat_id}") @@ -152,6 +201,14 @@ async def process_message(message_data: dict) -> None: logger.error(f"Failed to send error message to Telegram: {send_error}") # Don't re-raise - error message already sent to user, retrying would cause duplicate messages + finally: + # Stop typing indicator + typing_task.cancel() + try: + await typing_task + except asyncio.CancelledError: + pass + # Format response (result is guaranteed to be defined now) if result.get('is_error'): text = f"Agent error: {result.get('error_message', 'Unknown')}" @@ -162,23 +219,38 @@ async def process_message(message_data: dict) -> None: text = text[:4000] + "\n\n... (truncated)" # Send response to Telegram + # Convert standard Markdown to Telegram MarkdownV2 format + # Pipeline: fix_heading_bold -> markdownify -> fix_code_escaping -> fix_unescaped_chars + telegram_text = fix_unescaped_chars(fix_code_escaping(markdownify(fix_heading_bold(text)))) + + # Only reply_to original message if thread_id matches (not for /newchat) + reply_to_id = ( + message.message_id + if thread_id == message.message_thread_id + else None + ) + try: await bot.send_message( chat_id=message.chat_id, - text=text, + text=telegram_text, parse_mode=ParseMode.MARKDOWN_V2, message_thread_id=thread_id, - reply_to_message_id=message.message_id, + reply_to_message_id=reply_to_id, + ) + logger.info( + "Sent response to Telegram", + extra={'chat_id': message.chat_id, 'thread_id': thread_id}, ) except BadRequest as e: + logger.warning(f"BadRequest sending message: {e}") if "parse entities" in str(e).lower(): - safe_text = escape_markdown(text, version=2) + # Fallback: send as plain text without any formatting await bot.send_message( chat_id=message.chat_id, - text=safe_text, - parse_mode=ParseMode.MARKDOWN_V2, + text=text, message_thread_id=thread_id, - reply_to_message_id=message.message_id, + reply_to_message_id=reply_to_id, ) else: raise diff --git a/agent-sdk-client/handler.py b/agent-sdk-client/handler.py index 77b64f9..599de6e 100644 --- a/agent-sdk-client/handler.py +++ b/agent-sdk-client/handler.py @@ -5,6 +5,8 @@ import asyncio import json import logging +import os +import uuid from typing import Any import boto3 @@ -12,14 +14,16 @@ from telegram import Bot, Update from config import Config -from security import is_user_allowed, should_leave_group +from security import is_user_allowed, should_leave_group, verify_telegram_secret_token logger = logging.getLogger() logger.setLevel(logging.INFO) -# Reuse SQS client across invocations +# Reuse boto3 clients across invocations (Lambda container reuse) _sqs_client = None _cloudwatch_client = None +_dynamodb_resource = None +_s3_client = None def _get_sqs_client(): @@ -38,6 +42,22 @@ def _get_cloudwatch_client(): return _cloudwatch_client +def _get_dynamodb_resource(): + """Get or create DynamoDB resource singleton.""" + global _dynamodb_resource + if _dynamodb_resource is None: + _dynamodb_resource = boto3.resource('dynamodb') + return _dynamodb_resource + + +def _get_s3_client(): + """Get or create S3 client singleton.""" + global _s3_client + if _s3_client is None: + _s3_client = boto3.client('s3') + return _s3_client + + def _send_metric(metric_name: str, value: float = 1.0): """Send custom metric to CloudWatch (non-blocking).""" try: @@ -57,17 +77,28 @@ def _send_metric(metric_name: str, value: float = 1.0): def _send_to_sqs_safe(sqs, queue_url: str, message_body: dict) -> bool: - """Send message to SQS with comprehensive error handling. + """Send message to SQS FIFO queue with comprehensive error handling. + + Uses chat_id:thread_id as MessageGroupId to ensure same-session ordering. Returns: True if message sent successfully, False otherwise. """ try: + # FIFO queue requires MessageGroupId and MessageDeduplicationId + chat_id = message_body.get('chat_id') + thread_id = message_body.get('thread_id') or 'default' + message_group_id = f"{chat_id}:{thread_id}" + dedup_id = f"{chat_id}-{message_body.get('message_id')}-{uuid.uuid4().hex[:8]}" + response = sqs.send_message( - QueueUrl=queue_url, MessageBody=json.dumps(message_body) + QueueUrl=queue_url, + MessageBody=json.dumps(message_body), + MessageGroupId=message_group_id, + MessageDeduplicationId=dedup_id, ) message_id = response.get('MessageId', 'unknown') - logger.info(f"Message sent to SQS: {message_id}") + logger.info(f"Message sent to SQS: {message_id}, group: {message_group_id}") _send_metric('SQSMessageSent') return True @@ -122,73 +153,287 @@ def _send_to_sqs_safe(sqs, queue_url: str, message_body: dict) -> bool: return False -def _handle_local_command(bot: Bot, message, config: Config, cmd: str) -> bool: - """Handle local commands or unknown commands.""" - if config.is_local_command(cmd): - text = config.local_response(cmd) - else: - text = config.unknown_command_message() +# Handler type 命令处理器映射 +HANDLER_TYPE_HANDLERS = { + 'newchat': '_handle_newchat_handler', + 'start': '_handle_start_handler', + 'debug': '_handle_debug_handler', +} - try: + +def _handle_newchat_handler(bot: Bot, message, body: dict, config: Config, sqs) -> bool: + """处理 /newchat - 创建 Topic 后发 SQS 调用 Agent。 + + Returns: + True: 已完全处理 + """ + # 限制只能在 General Topic 执行 (General Topic ID 为 1 或 None) + if message.message_thread_id and message.message_thread_id != 1: asyncio.run( bot.send_message( chat_id=message.chat_id, - text=text, + text="⚠️ /newchat 只能在主频道中使用", message_thread_id=message.message_thread_id, reply_to_message_id=message.message_id, ) ) - except Exception: - logger.warning("Failed to send local command response", exc_info=True) + return True + + parts = message.text.strip().split(maxsplit=1) + prompts = parts[1] if len(parts) > 1 else '' + + if not prompts: + asyncio.run( + bot.send_message( + chat_id=message.chat_id, + text="用法: /newchat <消息内容>", + message_thread_id=message.message_thread_id, + ) + ) + return True + + asyncio.run(_handle_newchat_async(bot, message, body, config, sqs, prompts)) + return True + + +def _handle_start_handler(bot: Bot, message, body: dict, config: Config, sqs) -> bool: + """私聊 /start - 发送欢迎消息。""" + if message.chat.type != 'private': + return True + asyncio.run(bot.send_message( + chat_id=message.chat_id, + text="👋 欢迎!直接发送消息即可开始对话。", + )) + return True + + +def _handle_debug_handler(bot: Bot, message, body: dict, config: Config, sqs) -> bool: + """处理 /debug - 下载当前会话的 session 文件并发送给用户。""" + asyncio.run(_handle_debug_async(bot, message)) + return True + + +async def _handle_debug_async(bot: Bot, message) -> None: + """异步处理 /debug 命令。""" + import tempfile + from pathlib import Path + + chat_id = str(message.chat_id) + thread_id = str(message.message_thread_id) if message.message_thread_id else 'default' + + # 1. 查询 DynamoDB 获取 session_id + session_key = f"{chat_id}:{thread_id}" + session_table = os.environ.get('SESSION_TABLE') + session_bucket = os.environ.get('SESSION_BUCKET') + + if not session_table or not session_bucket: + await bot.send_message( + chat_id=message.chat_id, + text="❌ 环境变量未配置 (SESSION_TABLE/SESSION_BUCKET)", + message_thread_id=message.message_thread_id, + ) + return + + dynamodb = _get_dynamodb_resource() + table = dynamodb.Table(session_table) + + try: + response = table.get_item(Key={'session_key': session_key}) + except Exception as e: + logger.error(f"DynamoDB query failed: {e}") + await bot.send_message( + chat_id=message.chat_id, + text=f"❌ 查询会话失败: {str(e)[:100]}", + message_thread_id=message.message_thread_id, + ) + return + + if 'Item' not in response: + await bot.send_message( + chat_id=message.chat_id, + text="❌ 当前会话无历史记录", + message_thread_id=message.message_thread_id, + ) + return + + session_id = response['Item']['session_id'] + + # 2. 从 S3 下载文件 + s3 = _get_s3_client() + + files_to_send = [] + with tempfile.TemporaryDirectory() as tmpdir: + for s3_name in ['conversation.jsonl', 'debug.txt', 'todos.json']: + s3_key = f'sessions/{session_id}/{s3_name}' + local_path = Path(tmpdir) / s3_name + try: + s3.download_file(session_bucket, s3_key, str(local_path)) + files_to_send.append(local_path) + except Exception: + pass # 文件可能不存在 + + # 3. 发送文件到 Telegram + if not files_to_send: + await bot.send_message( + chat_id=message.chat_id, + text=f"❌ Session `{session_id}` 无可用文件", + parse_mode='MarkdownV2', + message_thread_id=message.message_thread_id, + ) + return + + # 转义 session_id 中的特殊字符 + escaped_session_id = session_id.replace('-', r'\-').replace('.', r'\.') + await bot.send_message( + chat_id=message.chat_id, + text=f"📦 Session: `{escaped_session_id}`", + parse_mode='MarkdownV2', + message_thread_id=message.message_thread_id, + ) + + for file_path in files_to_send: + with open(file_path, 'rb') as f: + await bot.send_document( + chat_id=message.chat_id, + document=f, + filename=file_path.name, + message_thread_id=message.message_thread_id, + ) + + +def _handle_local_command( + bot: Bot, message, body: dict, config: Config, sqs, cmd: str +) -> bool: + """处理 local command,根据配置的 type 分发。 + + Returns: + True: 已完全处理,不需要发 SQS + """ + local_cmd = config.get_local_command(cmd) + + if not local_cmd: + # 未知命令 + text = config.unknown_command_message() + try: + asyncio.run( + bot.send_message( + chat_id=message.chat_id, + text=text, + message_thread_id=message.message_thread_id, + reply_to_message_id=message.message_id, + ) + ) + except Exception: + logger.warning("Failed to send unknown command response", exc_info=True) + return True + + if local_cmd.type == 'static': + # 静态回复 + try: + asyncio.run( + bot.send_message( + chat_id=message.chat_id, + text=local_cmd.response, + message_thread_id=message.message_thread_id, + reply_to_message_id=message.message_id, + ) + ) + except Exception: + logger.warning("Failed to send static command response", exc_info=True) + + elif local_cmd.type == 'handler': + # 调用 handler 函数 + handler_name = HANDLER_TYPE_HANDLERS.get(local_cmd.handler) + if handler_name: + handler_func = globals().get(handler_name) + if handler_func: + return handler_func(bot, message, body, config, sqs) + else: + logger.error(f"Handler function {handler_name} not found") + else: + logger.error(f"Unknown handler: {local_cmd.handler}") logger.info( - 'Handled non-whitelisted command locally', + 'Handled local command', extra={ 'chat_id': message.chat_id, 'message_id': message.message_id, + 'cmd': cmd, + 'type': local_cmd.type, }, ) return True -async def _check_forum_requirements(bot: Bot, chat_id: int) -> tuple[bool, str]: - """检查群组 Topic 功能要求。 +MSG_NO_FORUM = ( + "⚠️ 群组未开启 Topics 功能\n\n" + "请按以下步骤开启:\n" + "1. 打开群组设置\n" + "2. 点击「Topics」\n" + "3. 开启 Topics 功能\n" + "4. 重新添加 Bot" +) - Returns: - (is_ok, error_message) - 如果满足要求返回 (True, ""),否则返回 (False, 错误提示) - """ +MSG_NO_PERMISSION = ( + "⚠️ Bot 缺少「管理 Topics」权限\n\n" + "请按以下步骤授权:\n" + "1. 打开群组设置 > 管理员\n" + "2. 选择此 Bot\n" + "3. 开启「Manage Topics」权限" +) + + +async def _check_forum_requirements(bot: Bot, chat_id: int) -> tuple[bool, str]: + """检查群组 Topic 功能要求。""" try: chat = await bot.get_chat(chat_id) if not chat.is_forum: - return False, ( - "⚠️ 群组未开启 Topics 功能\n\n" - "请按以下步骤开启:\n" - "1. 打开群组设置\n" - "2. 点击「Topics」\n" - "3. 开启 Topics 功能\n" - "4. 重新添加 Bot" - ) + return False, MSG_NO_FORUM me = await bot.get_me() member = await bot.get_chat_member(chat_id, me.id) if not getattr(member, 'can_manage_topics', False): - return False, ( - "⚠️ Bot 缺少「管理 Topics」权限\n\n" - "请按以下步骤授权:\n" - "1. 打开群组设置 > 管理员\n" - "2. 选择此 Bot\n" - "3. 开启「Manage Topics」权限" - ) + return False, MSG_NO_PERMISSION return True, "" except Exception as e: logger.warning(f"Failed to check forum requirements: {e}") return False, f"检查权限失败: {str(e)[:100]}" -async def _handle_newchat_command( +async def _on_bot_joined(bot: Bot, chat_id: int) -> None: + """Bot 入群时:检查 is_forum,提示授予管理员权限。""" + try: + chat = await bot.get_chat(chat_id) + if not chat.is_forum: + await bot.send_message(chat_id=chat_id, text=MSG_NO_FORUM) + _send_metric('TopicPrecheck.NoForum') + else: + await bot.send_message( + chat_id=chat_id, + text="👋 已加入群组!请将 Bot 设为管理员并授予「管理 Topics」权限。", + ) + except Exception as e: + logger.warning(f"Failed to check forum: {e}") + + +async def _on_bot_promoted(bot: Bot, chat_id: int) -> None: + """Bot 被提升为管理员时:检查权限,发送欢迎消息。""" + is_ok, error_msg = await _check_forum_requirements(bot, chat_id) + if not is_ok: + await bot.send_message(chat_id=chat_id, text=error_msg) + _send_metric('TopicPrecheck.Failed') + else: + await bot.send_message( + chat_id=chat_id, + text="👋 欢迎使用!使用 /newchat <消息> 开始新对话。", + ) + _send_metric('TopicPrecheck.Success') + + +async def _handle_newchat_async( bot: Bot, message, body: dict, config: Config, sqs, prompts: str ) -> bool: - """处理 /newchat - 创建 Topic 并发送消息到 SQS。 + """处理 /newchat 的异步部分 - 创建 Topic 并发送消息到 SQS。 Args: bot: Telegram Bot 实例 @@ -210,6 +455,23 @@ async def _handle_newchat_command( forum_topic = await bot.create_forum_topic(chat_id=chat_id, name=topic_name) new_thread_id = forum_topic.message_thread_id + # 发送确认消息到原位置(General Topic) + # Telegram 私有群 Topic 链接格式: t.me/c/// + # Topic ID 就是创建该 Topic 的服务消息 ID,所以用 thread_id 作为 message_id + internal_chat_id = str(chat_id).replace('-100', '') + topic_link = f"https://t.me/c/{internal_chat_id}/{new_thread_id}/{new_thread_id}" + + # 显示名称: 用消息前20字 + display_name = prompts[:20] + ('...' if len(prompts) > 20 else '') + + await bot.send_message( + chat_id=chat_id, + text=f'✅ 已创建新对话: {display_name}', + parse_mode='HTML', + message_thread_id=message.message_thread_id, + reply_to_message_id=message.message_id, + ) + # 使用标准 SQS 消息格式,覆盖 text 和 thread_id message_body = { 'telegram_update': body, @@ -244,6 +506,16 @@ def lambda_handler(event: dict, context: Any) -> dict: Validates Telegram message and writes to SQS queue. Returns 200 immediately without waiting for processing. """ + # Verify Telegram secret token (if configured) + headers = event.get('headers', {}) + request_token = headers.get('x-telegram-bot-api-secret-token') + expected_token = os.getenv('TELEGRAM_WEBHOOK_SECRET') + + if not verify_telegram_secret_token(request_token, expected_token): + logger.warning('Invalid or missing Telegram secret token') + _send_metric('SecurityBlock.InvalidSecretToken') + return {'statusCode': 401} + try: body = json.loads(event.get('body', '{}')) except json.JSONDecodeError: @@ -260,7 +532,6 @@ def lambda_handler(event: dict, context: Any) -> dict: logger.debug('Ignoring non-update webhook') return {'statusCode': 200} - # Handle my_chat_member event (bot added to group) if update.my_chat_member: if should_leave_group(update, config.user_whitelist): chat_id = update.my_chat_member.chat.id @@ -272,28 +543,15 @@ def lambda_handler(event: dict, context: Any) -> dict: ) _send_metric('SecurityBlock.UnauthorizedGroup') else: - # 授权群组的 Topic 预检 member_update = update.my_chat_member old_status = member_update.old_chat_member.status new_status = member_update.new_chat_member.status + chat_id = member_update.chat.id - if old_status in ('left', 'kicked') and new_status in ( - 'member', - 'administrator', - ): - chat_id = member_update.chat.id - - async def _run_topic_precheck(): - is_ok, error_msg = await _check_forum_requirements(bot, chat_id) - if not is_ok: - await bot.send_message(chat_id=chat_id, text=error_msg) - logger.warning( - "Forum requirements check failed", - extra={'chat_id': chat_id, 'error_msg': error_msg}, - ) - _send_metric('TopicPrecheck.Failed') - - asyncio.run(_run_topic_precheck()) + if old_status in ('left', 'kicked') and new_status in ('member', 'administrator'): + asyncio.run(_on_bot_joined(bot, chat_id)) + elif old_status == 'member' and new_status == 'administrator': + asyncio.run(_on_bot_promoted(bot, chat_id)) return {'statusCode': 200} message = update.message or update.edited_message @@ -316,42 +574,41 @@ async def _run_topic_precheck(): if message.chat.type in ('group', 'supergroup') and not message.chat.is_forum: return {'statusCode': 200} - cmd = config.get_command(message.text) - - # /newchat 特殊处理 - 创建 Topic 后发 SQS - if cmd == '/newchat': - # 提取 prompts:移除命令部分(包括可能的 @bot 后缀) - parts = message.text.strip().split(maxsplit=1) - prompts = parts[1] if len(parts) > 1 else '' - if not prompts: - bot.send_message( - chat_id=message.chat_id, - text="用法: /newchat <消息内容>", - message_thread_id=message.message_thread_id, - ) - return {'statusCode': 200} + # 拦截 General Topic (message_thread_id=1 或 None) 的普通消息 + if message.chat.type in ('group', 'supergroup') and message.chat.is_forum: + thread_id = message.message_thread_id + if thread_id is None or thread_id == 1: + # 仅拦截非命令消息 + if not message.text.startswith('/'): + asyncio.run(bot.send_message( + chat_id=message.chat_id, + text="⚠️ 请到具体的对话窗口中与 AI 对话,本 Topic 仅限创建新对话。\n\n使用 /newchat <消息> 创建新对话。", + message_thread_id=thread_id, + reply_to_message_id=message.message_id, + )) + return {'statusCode': 200} - sqs = _get_sqs_client() - asyncio.run(_handle_newchat_command(bot, message, body, config, sqs, prompts)) - return {'statusCode': 200} + cmd = config.get_command(message.text) + sqs = _get_sqs_client() - # 其他 local command 正常处理 + # Local command 统一处理 (包括 /newchat) if cmd and config.is_local_command(cmd): - _handle_local_command(bot, message, config, cmd) + _handle_local_command(bot, message, body, config, sqs, cmd) return {'statusCode': 200} + # 未知命令 if cmd and not config.is_agent_command(cmd): - _handle_local_command(bot, message, config, cmd) + _handle_local_command(bot, message, body, config, sqs, cmd) return {'statusCode': 200} # Write to SQS for async processing - sqs = _get_sqs_client() message_body = { 'telegram_update': body, 'chat_id': message.chat_id, 'message_id': message.message_id, 'text': message.text, 'thread_id': message.message_thread_id, + 'message_time': message.date.isoformat(), # ISO 8601格式 } success = _send_to_sqs_safe(sqs, config.queue_url, message_body) diff --git a/agent-sdk-client/requirements.txt b/agent-sdk-client/requirements.txt index ed85ad0..c6750b6 100644 --- a/agent-sdk-client/requirements.txt +++ b/agent-sdk-client/requirements.txt @@ -1,2 +1,3 @@ python-telegram-bot>=21.0 httpx>=0.27.0 +telegramify-markdown>=0.1.0 diff --git a/agent-sdk-client/security.py b/agent-sdk-client/security.py index 1478ce7..f1c4203 100644 --- a/agent-sdk-client/security.py +++ b/agent-sdk-client/security.py @@ -1,7 +1,33 @@ """Security module for Telegram Bot access control.""" +import hmac + from telegram import Update +def verify_telegram_secret_token( + request_token: str | None, expected_token: str | None +) -> bool: + """Verify X-Telegram-Bot-Api-Secret-Token header. + + Args: + request_token: Token from request header. + expected_token: Expected secret token (from env/config). + + Returns: + True if token matches or no token configured, False otherwise. + """ + # If no secret token configured, skip verification (backward compatible) + if not expected_token: + return True + + # If token configured but not provided in request, reject + if not request_token: + return False + + # Constant-time comparison to prevent timing attacks + return hmac.compare_digest(request_token, expected_token) + + def is_user_allowed(user_id: int, whitelist: list[int | str]) -> bool: """Check if user is in whitelist. diff --git a/agent-sdk-server/Dockerfile b/agent-sdk-server/Dockerfile index 43ccd94..1c306f4 100644 --- a/agent-sdk-server/Dockerfile +++ b/agent-sdk-server/Dockerfile @@ -7,11 +7,18 @@ WORKDIR ${LAMBDA_TASK_ROOT} COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv RUN ln -s /usr/local/bin/uv /usr/local/bin/uvx -# Install Claude Code CLI (npm package, requires nodejs) +# Install Node.js 20+ (required for MCP's undici dependency) +SHELL ["/bin/bash", "-o", "pipefail", "-c"] # hadolint ignore=DL3016,DL3041 -RUN dnf install -y nodejs npm && \ +RUN curl -fsSL https://rpm.nodesource.com/setup_20.x | bash - && \ + dnf install -y nodejs && \ dnf clean all && \ npm install -g @anthropic-ai/claude-code +SHELL ["/bin/sh", "-c"] + +# Fix npm cache permissions for Lambda +RUN mkdir -p /tmp/.npm && chmod -R 777 /tmp/.npm +ENV npm_config_cache=/tmp/.npm # Install Python dependencies RUN uv pip install --system boto3 claude-agent-sdk @@ -29,13 +36,14 @@ RUN chmod -R 755 /opt/claude-config/ COPY claude-config/skills/ /opt/claude-skills/ RUN chmod -R 755 /opt/claude-skills/ -# Create ~/.claude and ~/.aws directories and ensure writable -RUN mkdir -p /root/.claude/projects /root/.claude/debug /root/.claude/todos /root/.aws && \ - touch /root/.claude.json && \ - chmod -R 777 /root/.claude /root/.claude.json /root/.aws - -ENV HOME=/root -# Redirect cache to /tmp (Lambda only allows writes to /tmp) +# Lambda only allows writes to /tmp +# Set HOME=/tmp so: +# - mcp-remote writes to /tmp/.mcp-auth/ instead of /root/.mcp-auth/ +# - Claude SDK writes to /tmp/.claude/ instead of /root/.claude/ +# - AWS SDK reads from /tmp/.aws/ instead of /root/.aws/ +# Directories are created at runtime in setup_lambda_environment() +ENV HOME=/tmp ENV XDG_CACHE_HOME=/tmp/.cache +ENV npm_config_cache=/tmp/.npm CMD ["handler.lambda_handler"] diff --git a/template.yaml b/template.yaml index f8e40d3..73cc57f 100644 --- a/template.yaml +++ b/template.yaml @@ -31,6 +31,11 @@ Parameters: Default: '' Description: "(Optional) ARN for Bedrock Opus 4.5 model" NoEcho: true + TelegramWebhookSecret: + Type: String + Default: '' + Description: "(Optional) Secret token for Telegram webhook verification" + NoEcho: true Globals: Function: @@ -67,22 +72,28 @@ Resources: AttributeName: ttl Enabled: true - # SQS Task Queue + # SQS Task Queue (FIFO for session message ordering) TaskQueue: Type: AWS::SQS::Queue Properties: - QueueName: !Sub '${AWS::StackName}-TaskQueue' + QueueName: !Sub '${AWS::StackName}-TaskQueue.fifo' + FifoQueue: true + ContentBasedDeduplication: false + DeduplicationScope: messageGroup + FifoThroughputLimit: perMessageGroupId VisibilityTimeout: 900 # 15 minutes = Lambda timeout MessageRetentionPeriod: 1209600 # 14 days RedrivePolicy: deadLetterTargetArn: !GetAtt DLQueue.Arn maxReceiveCount: 3 # Retry 3 times then move to DLQ - # Dead Letter Queue + # Dead Letter Queue (FIFO to match TaskQueue) DLQueue: Type: AWS::SQS::Queue Properties: - QueueName: !Sub '${AWS::StackName}-DLQueue' + QueueName: !Sub '${AWS::StackName}-DLQueue.fifo' + FifoQueue: true + ContentBasedDeduplication: true MessageRetentionPeriod: 1209600 # 14 days # SNS Topic for alarm notifications @@ -123,7 +134,6 @@ Resources: # Bedrock model ARNs ANTHROPIC_DEFAULT_HAIKU_MODEL: !Ref BedrockHaikuModelArn ANTHROPIC_DEFAULT_SONNET_MODEL: !Ref BedrockSonnetModelArn - ANTHROPIC_DEFAULT_OPUS_4_5_MODEL: !Ref BedrockOpusModelArn ANTHROPIC_DEFAULT_OPUS_MODEL: !Ref BedrockOpusModelArn Policies: - S3CrudPolicy: @@ -147,6 +157,9 @@ Resources: AGENT_SERVER_URL: !GetAtt AgentServerFunctionUrl.FunctionUrl SDK_CLIENT_AUTH_TOKEN: !Ref AuthToken QUEUE_URL: !Ref TaskQueue + TELEGRAM_WEBHOOK_SECRET: !Ref TelegramWebhookSecret + SESSION_TABLE: !Ref SessionTable + SESSION_BUCKET: !Ref SessionBucket Policies: - SQSSendMessagePolicy: QueueName: !GetAtt TaskQueue.QueueName @@ -155,6 +168,10 @@ Resources: Action: - cloudwatch:PutMetricData Resource: '*' + - DynamoDBReadPolicy: + TableName: !Ref SessionTable + - S3ReadPolicy: + BucketName: !Ref SessionBucket Events: Webhook: Type: HttpApi