Conversation
记录 SDK 中 plugin 机制的实现细节
- Add security.py with user whitelist validation - Support my_chat_member event to auto-leave unauthorized groups - Block private messages from non-whitelisted users - Configure via [security] section in config.toml - Default: user_whitelist = ["all"] (allow everyone)
There was a problem hiding this comment.
Pull request overview
This pull request adds a user whitelist security module to control access to a Telegram bot. The feature allows bot administrators to specify which Telegram user IDs can add the bot to groups and send private messages, with a default of allowing everyone for backward compatibility.
Changes:
- New security module with whitelist validation functions
- Configuration system extended to load user whitelist from TOML
- Handler updated to block unauthorized private messages and auto-leave groups added by unauthorized users
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| docs/authropic-agent-sdk-official/plugin-in-sdk.md | Complete documentation file about plugin loading - appears unrelated to security feature |
| agent-sdk-client/security.py | New module implementing whitelist checks and group leave logic |
| agent-sdk-client/config.py | Extended to load and validate user_whitelist from config file with fallback defaults |
| agent-sdk-client/config.toml | Added [security] section with user_whitelist configuration and documentation |
| agent-sdk-client/handler.py | Integrated security checks for private messages and my_chat_member events |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| whitelist = security.get('user_whitelist', ['all']) | ||
| if not isinstance(whitelist, list): | ||
| logger.warning("user_whitelist is not a list; using default ['all']") | ||
| whitelist = ['all'] | ||
| else: | ||
| validated = [] | ||
| for item in whitelist: | ||
| if item == 'all': | ||
| validated.append('all') | ||
| elif isinstance(item, int): | ||
| validated.append(item) | ||
| else: | ||
| logger.warning(f"Invalid whitelist entry: {item}; skipping") | ||
| whitelist = validated if validated else ['all'] |
There was a problem hiding this comment.
The whitelist validation logic has an edge case issue. If a user provides a whitelist that contains only invalid entries (e.g., strings other than 'all', floats, etc.), all entries will be skipped, resulting in an empty validated list. The code then falls back to ['all'], which effectively disables security even though the user explicitly configured a whitelist. This could be a security issue as it silently fails open rather than closed. Consider either rejecting the configuration entirely with an error, or keeping the empty list to deny all access when all entries are invalid.
| whitelist = security.get('user_whitelist', ['all']) | |
| if not isinstance(whitelist, list): | |
| logger.warning("user_whitelist is not a list; using default ['all']") | |
| whitelist = ['all'] | |
| else: | |
| validated = [] | |
| for item in whitelist: | |
| if item == 'all': | |
| validated.append('all') | |
| elif isinstance(item, int): | |
| validated.append(item) | |
| else: | |
| logger.warning(f"Invalid whitelist entry: {item}; skipping") | |
| whitelist = validated if validated else ['all'] | |
| raw_whitelist = security.get('user_whitelist', None) | |
| if raw_whitelist is None: | |
| # No whitelist configured; default to allow all | |
| whitelist = ['all'] | |
| elif not isinstance(raw_whitelist, list): | |
| logger.warning("user_whitelist is not a list; using default ['all']") | |
| whitelist = ['all'] | |
| else: | |
| validated: list[int | str] = [] | |
| for item in raw_whitelist: | |
| if item == 'all': | |
| validated.append('all') | |
| elif isinstance(item, int): | |
| validated.append(item) | |
| else: | |
| logger.warning(f"Invalid whitelist entry: {item}; skipping") | |
| if not validated: | |
| # Explicit whitelist provided but no valid entries; deny all access | |
| logger.warning( | |
| "user_whitelist is empty after validation; denying all access" | |
| ) | |
| whitelist = validated |
| if old_status in ('left', 'kicked') and new_status in ('member', 'administrator'): | ||
| inviter_id = member_update.from_user.id | ||
| return not is_user_allowed(inviter_id, whitelist) | ||
|
|
There was a problem hiding this comment.
The function only checks for status transitions from 'left' or 'kicked' to 'member' or 'administrator', but doesn't handle the case where the bot's status is upgraded from 'member' to 'administrator' by someone other than the original inviter. If an unauthorized user promotes the bot to admin in a group where it was invited by an authorized user, the bot won't leave. Consider whether permission changes should also be validated against the whitelist.
| # Bot being promoted to administrator by a (potentially) unauthorized user | |
| if old_status not in ('administrator', 'creator') and new_status == 'administrator': | |
| inviter_id = member_update.from_user.id | |
| return not is_user_allowed(inviter_id, whitelist) |
| if should_leave_group(update, config.user_whitelist): | ||
| chat_id = update.my_chat_member.chat.id | ||
| inviter_id = update.my_chat_member.from_user.id | ||
| asyncio.run(bot.leave_chat(chat_id)) |
There was a problem hiding this comment.
Using asyncio.run() in a Lambda handler that's already in a synchronous context can create problems. The bot.leave_chat() call is async, but wrapping it in asyncio.run() creates a new event loop each time. This works but is inefficient. Additionally, there's no error handling if the leave_chat operation fails (e.g., network issues, bot lacks permissions). Consider either:
- Wrapping the entire operation in a try-except block to handle potential exceptions
- Using a synchronous approach if the telegram library supports it
- At minimum, catching and logging any exceptions from the leave_chat call to prevent the Lambda from failing
| asyncio.run(bot.leave_chat(chat_id)) | |
| try: | |
| asyncio.run(bot.leave_chat(chat_id)) | |
| except Exception: | |
| logger.exception( | |
| "Failed to leave unauthorized group", | |
| extra={'chat_id': chat_id, 'inviter_id': inviter_id}, | |
| ) |
| # Check private message whitelist | ||
| if message.chat.type == 'private': | ||
| user_id = message.from_user.id if message.from_user else None | ||
| if user_id and not is_user_allowed(user_id, config.user_whitelist): | ||
| logger.info( | ||
| f"Blocked private message from unauthorized user", | ||
| extra={'user_id': user_id}, | ||
| ) | ||
| _send_metric('SecurityBlock.UnauthorizedPrivate') | ||
| return {'statusCode': 200} |
There was a problem hiding this comment.
The security whitelist check only applies to private messages (chat.type == 'private'), but group messages are not filtered by user whitelist. This means that even if a bot leaves a group when added by an unauthorized user, if the bot is added to a group by an authorized user, all members of that group can interact with the bot regardless of whether they're in the whitelist. Consider whether this is the intended behavior or if group messages should also be filtered by the sender's user_id against the whitelist.
| # Check private message whitelist | |
| if message.chat.type == 'private': | |
| user_id = message.from_user.id if message.from_user else None | |
| if user_id and not is_user_allowed(user_id, config.user_whitelist): | |
| logger.info( | |
| f"Blocked private message from unauthorized user", | |
| extra={'user_id': user_id}, | |
| ) | |
| _send_metric('SecurityBlock.UnauthorizedPrivate') | |
| return {'statusCode': 200} | |
| # Check message whitelist (applies to both private and group chats) | |
| user_id = message.from_user.id if message.from_user else None | |
| if user_id and not is_user_allowed(user_id, config.user_whitelist): | |
| chat_type = getattr(message.chat, "type", None) | |
| metric_name = ( | |
| 'SecurityBlock.UnauthorizedPrivate' | |
| if chat_type == 'private' | |
| else 'SecurityBlock.UnauthorizedGroupMessage' | |
| ) | |
| logger.info( | |
| "Blocked message from unauthorized user", | |
| extra={ | |
| 'user_id': user_id, | |
| 'chat_id': getattr(message.chat, "id", None), | |
| 'chat_type': chat_type, | |
| }, | |
| ) | |
| _send_metric(metric_name) | |
| return {'statusCode': 200} |
| else: | ||
| validated = [] | ||
| for item in whitelist: | ||
| if item == 'all': |
There was a problem hiding this comment.
The validation logic checks for the string 'all' (line 71) but doesn't validate that it's actually the string 'all' - it just checks equality. This means if someone provides user_whitelist = ["all", "ALL", "All"], only the lowercase "all" would be recognized. Consider normalizing the string to lowercase before comparison, or documenting that only lowercase "all" is valid. Similarly, the documentation in config.toml should clarify that "all" must be lowercase.
| if item == 'all': | |
| if isinstance(item, str) and item.lower() == 'all': |
|
|
||
| Returns: | ||
| True if user is allowed, False otherwise. | ||
| """ |
There was a problem hiding this comment.
The function doesn't explicitly handle the case of an empty whitelist ([]). If the whitelist is empty (which shouldn't happen due to the fallback to ['all'] in config loading, but could happen if called programmatically), the function would return False for all users, which is safe but might be unexpected. Consider adding explicit handling or documentation for this edge case.
| """ | |
| """ | |
| # Explicitly handle empty whitelist: deny access to all users. | |
| if not whitelist: | |
| return False |
| """Security module for Telegram Bot access control.""" | ||
| from telegram import Update | ||
|
|
||
|
|
||
| def is_user_allowed(user_id: int, whitelist: list[int | str]) -> bool: | ||
| """Check if user is in whitelist. | ||
|
|
||
| Args: | ||
| user_id: Telegram user ID to check. | ||
| whitelist: List of allowed user IDs, or ['all'] to allow everyone. | ||
|
|
||
| Returns: | ||
| True if user is allowed, False otherwise. | ||
| """ | ||
| if 'all' in whitelist: | ||
| return True | ||
| return user_id in whitelist | ||
|
|
||
|
|
||
| def should_leave_group(update: Update, whitelist: list[int | str]) -> bool: | ||
| """Check if bot should leave a group based on who added it. | ||
|
|
||
| Args: | ||
| update: Telegram Update object with my_chat_member event. | ||
| whitelist: List of allowed user IDs who can add bot to groups. | ||
|
|
||
| Returns: | ||
| True if bot should leave (added by unauthorized user), False otherwise. | ||
| """ | ||
| if not update.my_chat_member: | ||
| return False | ||
|
|
||
| member_update = update.my_chat_member | ||
| old_status = member_update.old_chat_member.status | ||
| new_status = member_update.new_chat_member.status | ||
|
|
||
| # Bot being added to group (status changed from left/kicked to member/administrator) | ||
| if old_status in ('left', 'kicked') and new_status in ('member', 'administrator'): | ||
| inviter_id = member_update.from_user.id | ||
| return not is_user_allowed(inviter_id, whitelist) | ||
|
|
||
| return False |
There was a problem hiding this comment.
The new security functionality lacks test coverage. The repository has comprehensive tests for config loading (test_command_config.py), but no tests are included for:
- The is_user_allowed function with various whitelist configurations
- The should_leave_group function with different chat member status transitions
- Integration of security checks in the handler
- Edge cases like empty whitelists, mixed valid/invalid entries, etc.
Consider adding tests for the new security module to ensure the whitelist logic works correctly.
|
@copilot open a new pull request to apply changes based on the comments in this thread |
Summary
security.pymodule for Telegram Bot access controlmy_chat_memberevent to auto-leave groups when added by unauthorized users[security]section inconfig.tomlFeatures
user_whitelist: List of allowed Telegram user IDs["all"]allows everyone (backward compatible)user_whitelist = [123456789, 987654321]Changes
agent-sdk-client/security.py- New security moduleagent-sdk-client/config.py- Add whitelist loadingagent-sdk-client/config.toml- Add security sectionagent-sdk-client/handler.py- Add security checksTest plan
["all"](should work for everyone)