-
Notifications
You must be signed in to change notification settings - Fork 1
feat: add user whitelist security module #19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -28,20 +28,27 @@ def extract_command(text: Optional[str]) -> Optional[str]: | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return command | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _load_config(config_path: Path = DEFAULT_CONFIG_PATH) -> tuple[list[str], dict[str, str]]: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Load agent/local commands from TOML config file.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _load_config(config_path: Path = DEFAULT_CONFIG_PATH) -> tuple[list[str], dict[str, str], list[int | str]]: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Load commands and security config from TOML config file. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Returns: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Tuple of (agent_commands, local_commands, user_whitelist). | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if not config_path.exists(): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return [], {} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return [], {}, ['all'] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| with config_path.open('rb') as f: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| data = tomllib.load(f) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Load agent commands | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| agent_commands = data.get('agent_commands', {}).get('commands', []) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if not isinstance(agent_commands, list): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.warning("Agent commands config is not a list; ignoring configuration") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| agent_commands = [] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| agent_commands = [cmd for cmd in agent_commands if isinstance(cmd, str)] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Load local commands | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| local_commands_raw = data.get('local_commands', {}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if not isinstance(local_commands_raw, dict): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.warning("Local commands config is not a table; ignoring configuration") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -52,10 +59,28 @@ def _load_config(config_path: Path = DEFAULT_CONFIG_PATH) -> tuple[list[str], di | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if isinstance(name, str) and isinstance(value, str) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return agent_commands, local_commands | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Load security whitelist | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| security = data.get('security', {}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| whitelist = security.get('user_whitelist', ['all']) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if not isinstance(whitelist, list): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.warning("user_whitelist is not a list; using default ['all']") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| whitelist = ['all'] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| validated = [] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for item in whitelist: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if item == 'all': | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| validated.append('all') | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| elif isinstance(item, int): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| validated.append(item) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.warning(f"Invalid whitelist entry: {item}; skipping") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| whitelist = validated if validated else ['all'] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+64
to
+77
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| whitelist = security.get('user_whitelist', ['all']) | |
| if not isinstance(whitelist, list): | |
| logger.warning("user_whitelist is not a list; using default ['all']") | |
| whitelist = ['all'] | |
| else: | |
| validated = [] | |
| for item in whitelist: | |
| if item == 'all': | |
| validated.append('all') | |
| elif isinstance(item, int): | |
| validated.append(item) | |
| else: | |
| logger.warning(f"Invalid whitelist entry: {item}; skipping") | |
| whitelist = validated if validated else ['all'] | |
| raw_whitelist = security.get('user_whitelist', None) | |
| if raw_whitelist is None: | |
| # No whitelist configured; default to allow all | |
| whitelist = ['all'] | |
| elif not isinstance(raw_whitelist, list): | |
| logger.warning("user_whitelist is not a list; using default ['all']") | |
| whitelist = ['all'] | |
| else: | |
| validated: list[int | str] = [] | |
| for item in raw_whitelist: | |
| if item == 'all': | |
| validated.append('all') | |
| elif isinstance(item, int): | |
| validated.append(item) | |
| else: | |
| logger.warning(f"Invalid whitelist entry: {item}; skipping") | |
| if not validated: | |
| # Explicit whitelist provided but no valid entries; deny all access | |
| logger.warning( | |
| "user_whitelist is empty after validation; denying all access" | |
| ) | |
| whitelist = validated |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -2,6 +2,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Receives Telegram webhook, writes to SQS, returns 200 immediately. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import asyncio | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import json | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import logging | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from typing import Any | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -11,6 +12,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from telegram import Bot, Update | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from config import Config | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from security import is_user_allowed, should_leave_group | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger = logging.getLogger() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.setLevel(logging.INFO) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -169,11 +171,35 @@ def lambda_handler(event: dict, context: Any) -> dict: | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| logger.debug('Ignoring non-update webhook') | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return {'statusCode': 200} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Handle my_chat_member event (bot added to group) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if update.my_chat_member: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if should_leave_group(update, config.user_whitelist): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| chat_id = update.my_chat_member.chat.id | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| inviter_id = update.my_chat_member.from_user.id | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| asyncio.run(bot.leave_chat(chat_id)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| asyncio.run(bot.leave_chat(chat_id)) | |
| try: | |
| asyncio.run(bot.leave_chat(chat_id)) | |
| except Exception: | |
| logger.exception( | |
| "Failed to leave unauthorized group", | |
| extra={'chat_id': chat_id, 'inviter_id': inviter_id}, | |
| ) |
Copilot
AI
Jan 11, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The security whitelist check only applies to private messages (chat.type == 'private'), but group messages are not filtered by user whitelist. This means that even if a bot leaves a group when added by an unauthorized user, if the bot is added to a group by an authorized user, all members of that group can interact with the bot regardless of whether they're in the whitelist. Consider whether this is the intended behavior or if group messages should also be filtered by the sender's user_id against the whitelist.
| # Check private message whitelist | |
| if message.chat.type == 'private': | |
| user_id = message.from_user.id if message.from_user else None | |
| if user_id and not is_user_allowed(user_id, config.user_whitelist): | |
| logger.info( | |
| f"Blocked private message from unauthorized user", | |
| extra={'user_id': user_id}, | |
| ) | |
| _send_metric('SecurityBlock.UnauthorizedPrivate') | |
| return {'statusCode': 200} | |
| # Check message whitelist (applies to both private and group chats) | |
| user_id = message.from_user.id if message.from_user else None | |
| if user_id and not is_user_allowed(user_id, config.user_whitelist): | |
| chat_type = getattr(message.chat, "type", None) | |
| metric_name = ( | |
| 'SecurityBlock.UnauthorizedPrivate' | |
| if chat_type == 'private' | |
| else 'SecurityBlock.UnauthorizedGroupMessage' | |
| ) | |
| logger.info( | |
| "Blocked message from unauthorized user", | |
| extra={ | |
| 'user_id': user_id, | |
| 'chat_id': getattr(message.chat, "id", None), | |
| 'chat_type': chat_type, | |
| }, | |
| ) | |
| _send_metric(metric_name) | |
| return {'statusCode': 200} |
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,42 @@ | ||||||||||||||
| """Security module for Telegram Bot access control.""" | ||||||||||||||
| from telegram import Update | ||||||||||||||
|
|
||||||||||||||
|
|
||||||||||||||
| def is_user_allowed(user_id: int, whitelist: list[int | str]) -> bool: | ||||||||||||||
| """Check if user is in whitelist. | ||||||||||||||
|
|
||||||||||||||
| Args: | ||||||||||||||
| user_id: Telegram user ID to check. | ||||||||||||||
| whitelist: List of allowed user IDs, or ['all'] to allow everyone. | ||||||||||||||
|
|
||||||||||||||
| Returns: | ||||||||||||||
| True if user is allowed, False otherwise. | ||||||||||||||
| """ | ||||||||||||||
|
||||||||||||||
| """ | |
| """ | |
| # Explicitly handle empty whitelist: deny access to all users. | |
| if not whitelist: | |
| return False |
Copilot
AI
Jan 11, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function only checks for status transitions from 'left' or 'kicked' to 'member' or 'administrator', but doesn't handle the case where the bot's status is upgraded from 'member' to 'administrator' by someone other than the original inviter. If an unauthorized user promotes the bot to admin in a group where it was invited by an authorized user, the bot won't leave. Consider whether permission changes should also be validated against the whitelist.
| # Bot being promoted to administrator by a (potentially) unauthorized user | |
| if old_status not in ('administrator', 'creator') and new_status == 'administrator': | |
| inviter_id = member_update.from_user.id | |
| return not is_user_allowed(inviter_id, whitelist) |
Copilot
AI
Jan 11, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new security functionality lacks test coverage. The repository has comprehensive tests for config loading (test_command_config.py), but no tests are included for:
- The is_user_allowed function with various whitelist configurations
- The should_leave_group function with different chat member status transitions
- Integration of security checks in the handler
- Edge cases like empty whitelists, mixed valid/invalid entries, etc.
Consider adding tests for the new security module to ensure the whitelist logic works correctly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The validation logic checks for the string 'all' (line 71) but doesn't validate that it's actually the string 'all' - it just checks equality. This means if someone provides user_whitelist = ["all", "ALL", "All"], only the lowercase "all" would be recognized. Consider normalizing the string to lowercase before comparison, or documenting that only lowercase "all" is valid. Similarly, the documentation in config.toml should clarify that "all" must be lowercase.