diff --git a/README.rst b/README.rst index d42c88dc..0b0cdc5b 100644 --- a/README.rst +++ b/README.rst @@ -565,6 +565,10 @@ e.g.: Enable this option if the bot API is running in ``--local`` mode and is using the same file system with ETM. +- ``topic_group`` *(str)* [Default: ``null``] + + Send message to this topic group, per chat per topic + Network configuration: timeout tweaks ------------------------------------- diff --git a/efb_telegram_master/__init__.py b/efb_telegram_master/__init__.py index 8331ac8a..6c3f3a18 100644 --- a/efb_telegram_master/__init__.py +++ b/efb_telegram_master/__init__.py @@ -128,6 +128,7 @@ def __init__(self, instance_id: InstanceID = None): self.commands: CommandsManager = CommandsManager(self) self.chat_binding: ChatBindingManager = ChatBindingManager(self) self.slave_messages: SlaveMessageProcessor = SlaveMessageProcessor(self) + self.topic_group: Optional[TelegramChatID] = TelegramChatID(self.flag('topic_group')) if not self.flag('auto_locale'): self.translator = translation("efb_telegram_master", @@ -204,14 +205,47 @@ def info(self, update: Update, context: CallbackContext): assert isinstance(update, Update) assert isinstance(update.effective_message, Message) if update.effective_message.chat.type != telegram.Chat.PRIVATE: # Group message - msg = self.info_group(update) + if update.effective_chat.is_forum: + msg = self.info_topic(update) + else: + msg = self.info_group(update) elif update.effective_message.forward_from_chat and \ update.effective_message.forward_from_chat.type == 'channel': # Forwarded channel command. msg = self.info_channel(update) else: # Talking to the bot. msg = self.info_general() - update.effective_message.reply_text(msg) + if len(msg) > 4095: + for x in range(0, len(msg), 4095): + update.effective_message.reply_text(msg[x:x+4095]) + else: + update.effective_message.reply_text(msg) + + def info_topic(self, update: Update): + """Generate string for chat linking info of a topic.""" + assert isinstance(update, Update) + assert isinstance(update.effective_message, Message) + + links = self.db.get_topic_slaves(topic_chat_id=update.effective_message.chat_id) + thread_id = update.effective_message.message_thread_id + if thread_id: + chat = None + for (dest, topic_id) in links: + if topic_id == thread_id: + chat = dest + break + if chat is None: + return "This chat is not managed by this bot" + else: + links = [chat] + else: + links = [c for c, t in links] + + msg = self._("The topic {topic_name} ({topic_id}) is linked to:").format( + topic_name=update.effective_message.chat.title, + topic_id=update.effective_message.chat_id) + msg += self.build_link_chats_info_str(links) + return msg def info_general(self): """Generate string for information of the current running EFB instance.""" @@ -322,9 +356,10 @@ def start(self, update: Update, context: CallbackContext): assert isinstance(update.effective_message, telegram.Message) assert isinstance(update.effective_chat, telegram.Chat) if context.args: # Group binding command - if update.effective_message.chat.type != telegram.Chat.PRIVATE or \ + if (update.effective_message.chat.type != telegram.Chat.PRIVATE and update.effective_chat.id != self.topic_group) or \ (update.effective_message.forward_from_chat and - update.effective_message.forward_from_chat.type == telegram.Chat.CHANNEL): + update.effective_message.forward_from_chat.type == telegram.Chat.CHANNEL and + update.effective_message.forward_from_chat.id != self.topic_group): self.chat_binding.link_chat(update, context.args) else: self.bot_manager.send_message(update.effective_chat.id, diff --git a/efb_telegram_master/bot_manager.py b/efb_telegram_master/bot_manager.py index 116633c7..97558d16 100644 --- a/efb_telegram_master/bot_manager.py +++ b/efb_telegram_master/bot_manager.py @@ -10,7 +10,7 @@ import telegram.constants import telegram.error from retrying import retry -from telegram import Update, InputFile, User, File +from telegram import Update, InputFile, User, File, ForumTopic from telegram.ext import CallbackContext, Filters, MessageHandler, Updater, Dispatcher from .locale_handler import LocaleHandler @@ -122,6 +122,28 @@ def caption_affix(self, *args, **kwargs): return caption_affix + @classmethod + def retry_on_topic_closed(cls, fn: Callable): + @wraps(fn) + def wrap(self: 'TelegramBotManager', *args, **kwargs): + try: + return fn(self, *args, **kwargs) + except telegram.error.BadRequest as e: + if "Topic_closed" in e.message: + if 'chat_id' in kwargs: + chat_id = kwargs['chat_id'] + else: + chat_id = args[0] + message_thread_id = kwargs.get('message_thread_id', None) + self.reopen_forum_topic( + chat_id=chat_id, + message_thread_id=message_thread_id + ) + return fn(self, *args, **kwargs) + else: + raise e + return wrap + @classmethod def retry_on_chat_migration(cls, fn: Callable): @wraps(fn) @@ -182,6 +204,7 @@ def __init__(self, channel: 'TelegramChannel'): @Decorators.retry_on_timeout @Decorators.retry_on_chat_migration + @Decorators.retry_on_topic_closed def send_message(self, *args, prefix: str = '', suffix: str = '', **kwargs): """ Send text message. @@ -232,6 +255,7 @@ def send_message(self, *args, prefix: str = '', suffix: str = '', **kwargs): @Decorators.retry_on_timeout @Decorators.retry_on_chat_migration + @Decorators.retry_on_topic_closed def edit_message_text(self, prefix='', suffix='', **kwargs): """ Edit text message. @@ -313,6 +337,7 @@ def _bot_edit_message_text_fallback(self, *args, **kwargs): @Decorators.retry_on_timeout @Decorators.caption_affix_decorator @Decorators.retry_on_chat_migration + @Decorators.retry_on_topic_closed def send_audio(self, *args, **kwargs): """ Send an audio file. @@ -337,6 +362,7 @@ def send_audio(self, *args, **kwargs): @Decorators.retry_on_timeout @Decorators.caption_affix_decorator @Decorators.retry_on_chat_migration + @Decorators.retry_on_topic_closed def send_voice(self, *args, **kwargs): """ Send an voice message. @@ -361,6 +387,7 @@ def send_voice(self, *args, **kwargs): @Decorators.retry_on_timeout @Decorators.caption_affix_decorator @Decorators.retry_on_chat_migration + @Decorators.retry_on_topic_closed def send_video(self, *args, **kwargs): """ Send an voice message. @@ -385,6 +412,7 @@ def send_video(self, *args, **kwargs): @Decorators.retry_on_timeout @Decorators.caption_affix_decorator @Decorators.retry_on_chat_migration + @Decorators.retry_on_topic_closed def send_document(self, *args, **kwargs): """ Send a document. @@ -404,6 +432,7 @@ def send_document(self, *args, **kwargs): @Decorators.retry_on_timeout @Decorators.caption_affix_decorator @Decorators.retry_on_chat_migration + @Decorators.retry_on_topic_closed def send_animation(self, *args, **kwargs): """ Send a document. @@ -423,6 +452,7 @@ def send_animation(self, *args, **kwargs): @Decorators.retry_on_timeout @Decorators.caption_affix_decorator @Decorators.retry_on_chat_migration + @Decorators.retry_on_topic_closed def send_photo(self, *args, **kwargs): """ Send a document. @@ -444,31 +474,40 @@ def send_photo(self, *args, **kwargs): @Decorators.retry_on_timeout @Decorators.retry_on_chat_migration + @Decorators.retry_on_topic_closed def send_chat_action(self, *args, **kwargs): + message_thread_id = kwargs.pop('message_thread_id', None) + if message_thread_id != None: + kwargs['api_kwargs'] = { "message_thread_id": message_thread_id} return self.updater.bot.send_chat_action(*args, **kwargs) @Decorators.retry_on_timeout @Decorators.retry_on_chat_migration + @Decorators.retry_on_topic_closed def edit_message_reply_markup(self, *args, **kwargs): return self.updater.bot.edit_message_reply_markup(*args, **kwargs) @Decorators.retry_on_timeout @Decorators.retry_on_chat_migration + @Decorators.retry_on_topic_closed def send_location(self, *args, **kwargs): return self.updater.bot.send_location(*args, **kwargs) @Decorators.retry_on_timeout @Decorators.retry_on_chat_migration + @Decorators.retry_on_topic_closed def send_venue(self, *args, **kwargs): return self.updater.bot.send_venue(*args, **kwargs) @Decorators.retry_on_timeout @Decorators.retry_on_chat_migration + @Decorators.retry_on_topic_closed def send_sticker(self, *args, **kwargs): return self.updater.bot.send_sticker(*args, **kwargs) @Decorators.retry_on_timeout @Decorators.retry_on_chat_migration + @Decorators.retry_on_topic_closed def get_me(self, *args, **kwargs): return self.updater.bot.get_me(*args, **kwargs) @@ -485,11 +524,13 @@ def session_expired(self, update: Update, context: CallbackContext): @Decorators.retry_on_timeout @Decorators.caption_affix_decorator @Decorators.retry_on_chat_migration + @Decorators.retry_on_topic_closed def edit_message_caption(self, *args, **kwargs): return self.updater.bot.edit_message_caption(*args, **kwargs) @Decorators.retry_on_timeout @Decorators.retry_on_chat_migration + @Decorators.retry_on_topic_closed def edit_message_media(self, *args, **kwargs): return self.updater.bot.edit_message_media(*args, **kwargs) @@ -505,16 +546,19 @@ def reply_error(self, update, errmsg): @Decorators.retry_on_timeout @Decorators.retry_on_chat_migration + @Decorators.retry_on_topic_closed def get_file(self, file_id: str) -> File: return self.updater.bot.get_file(file_id) @Decorators.retry_on_timeout @Decorators.retry_on_chat_migration + @Decorators.retry_on_topic_closed def delete_message(self, chat_id, message_id): return self.updater.bot.delete_message(chat_id, message_id) @Decorators.retry_on_timeout @Decorators.retry_on_chat_migration + @Decorators.retry_on_topic_closed def answer_callback_query(self, *args, prefix="", suffix="", text=None, message_id=None, **kwargs): if text is None: @@ -545,16 +589,41 @@ def answer_callback_query(self, *args, prefix="", suffix="", text=None, @Decorators.retry_on_timeout @Decorators.retry_on_chat_migration + @Decorators.retry_on_topic_closed + def get_chat_info(self, *args, **kwargs): + return self.updater.bot.get_chat(*args, **kwargs) + + @Decorators.retry_on_timeout + @Decorators.retry_on_chat_migration + def create_forum_topic(self, *args, **kwargs) -> ForumTopic: + return self.updater.bot.create_forum_topic(*args, **kwargs) + + @Decorators.retry_on_timeout + @Decorators.retry_on_chat_migration + @Decorators.retry_on_topic_closed + def edit_forum_topic(self, *args, **kwargs): + return self.updater.bot.edit_forum_topic(*args, **kwargs) + + @Decorators.retry_on_timeout + @Decorators.retry_on_chat_migration + def reopen_forum_topic(self, *args, **kwargs) -> bool: + return self.updater.bot.reopen_forum_topic(*args, **kwargs) + + @Decorators.retry_on_timeout + @Decorators.retry_on_chat_migration + @Decorators.retry_on_topic_closed def set_chat_title(self, *args, **kwargs): return self.updater.bot.set_chat_title(*args, **kwargs) @Decorators.retry_on_timeout @Decorators.retry_on_chat_migration + @Decorators.retry_on_topic_closed def set_chat_photo(self, *args, **kwargs): return self.updater.bot.set_chat_photo(*args, **kwargs) @Decorators.retry_on_timeout @Decorators.retry_on_chat_migration + @Decorators.retry_on_topic_closed def set_chat_description(self, *args, **kwargs): return self.updater.bot.set_chat_description(*args, **kwargs) diff --git a/efb_telegram_master/chat_binding.py b/efb_telegram_master/chat_binding.py index b6accf56..60ec2851 100644 --- a/efb_telegram_master/chat_binding.py +++ b/efb_telegram_master/chat_binding.py @@ -5,6 +5,7 @@ import logging import re import urllib.parse +import threading from contextlib import suppress from typing import Tuple, Dict, Optional, List, TYPE_CHECKING, IO, Union, Pattern @@ -27,7 +28,7 @@ from .locale_mixin import LocaleMixin from .message import ETMMsg from .msg_type import TGMsgType -from .utils import EFBChannelChatIDStr, TelegramChatID, TelegramMessageID, TgChatMsgIDStr +from .utils import EFBChannelChatIDStr, TelegramChatID, TelegramMessageID, TgChatMsgIDStr, TelegramTopicID if TYPE_CHECKING: from . import TelegramChannel @@ -97,6 +98,7 @@ def __init__(self, channel: 'TelegramChannel'): self.bot: 'TelegramBotManager' = channel.bot_manager self.db: 'DatabaseManager' = channel.db self.chat_manager: 'ChatObjectCacheManager' = channel.chat_manager + self._topic_mutex = threading.Lock() # Link handler non_edit_filter = Filters.update.message | Filters.update.channel_post @@ -148,6 +150,7 @@ def __init__(self, channel: 'TelegramChannel'): # Update group title and profile picture self.bot.dispatcher.add_handler(CommandHandler('update_info', self.update_group_info)) + self.bot.dispatcher.add_handler(CommandHandler('init_topics', self.topic_migration)) self.bot.dispatcher.add_handler( MessageHandler(Filters.status_update.migrate, self.chat_migration)) @@ -223,6 +226,22 @@ def link_chat_show_list(self, update: Update, context: CallbackContext): self.link_handler.conversations[storage_id] = Flags.LINK_EXEC self.msg_storage[storage_id] = ChatListStorage([chat]) return self.build_link_action_message(chat, tg_chat_id, tg_msg_id) + if message.message_thread_id: + topic = message.message_thread_id + if topic: + slave_origin_uid = self.db.get_topic_slave( + topic_chat_id=TelegramChatID(message.chat_id), + message_thread_id=topic + ) + if slave_origin_uid: + channel_id, chat_id, _ = utils.chat_id_str_to_id(slave_origin_uid) + chat: ETMChatType = self.chat_manager.get_chat(channel_id, chat_id, build_dummy=True) + tg_chat_id = TelegramChatID(message.chat_id) + tg_msg_id = TelegramMessageID(message.reply_text(self._("Processing...")).message_id) + storage_id: Tuple[TelegramChatID, TelegramMessageID] = (tg_chat_id, tg_msg_id) + self.link_handler.conversations[storage_id] = Flags.LINK_EXEC + self.msg_storage[storage_id] = ChatListStorage([chat]) + return self.build_link_action_message(chat, tg_chat_id, tg_msg_id) if message.chat.type != telegram.Chat.PRIVATE: links = self.db.get_chat_assoc( @@ -475,7 +494,7 @@ def build_link_action_message(self, chat: ETMChatType, def link_chat_exec(self, update: Update, context: CallbackContext) -> int: """ - Action to link a chat. Triggered by callback message with status `Flags.EXEC_LINK`. + Action to link a chat. Triggered by callback message with status `Flags.LINK_EXEC`. """ assert isinstance(update, Update) assert update.effective_chat @@ -546,6 +565,7 @@ def link_chat(self, update: Update, args: Optional[List[str]]): chat: ETMChatType = data.chats[0] chat_display_name = chat.full_name slave_channel, slave_chat_uid = chat.module_id, chat.uid + chat_uid = utils.chat_id_to_str(slave_channel, slave_chat_uid) try: coordinator.get_module_by_id(slave_channel) except NameError: @@ -558,14 +578,28 @@ def link_chat(self, update: Update, args: Optional[List[str]]): # Use channel ID if command is forwarded from a channel. forwarded_chat = update.effective_message.forward_from_chat if forwarded_chat and forwarded_chat.type == telegram.Chat.CHANNEL: - tg_chat_to_link = forwarded_chat.id + tg_chat_to_link = forwarded_chat else: - tg_chat_to_link = update.effective_chat.id + tg_chat_to_link = update.effective_chat txt = self._('Trying to link chat {0}...').format(chat_display_name) - msg = self.bot.send_message(tg_chat_to_link, text=txt) + msg = self.bot.send_message(tg_chat_to_link.id, text=txt) - chat.link(self.channel.channel_id, ChatID(str(tg_chat_to_link)), self.channel.flag("multiple_slave_chats")) + chat.link(self.channel.channel_id, ChatID(str(tg_chat_to_link.id)), self.channel.flag("multiple_slave_chats")) + self.db.remove_topic_assoc( + slave_uid=chat_uid, + ) + + if tg_chat_to_link.is_forum: + thread_id = self.create_topic(slave_uid=chat_uid, telegram_chat_id=TelegramChatID(tg_chat_to_link.id)) + if not thread_id: + msg.reply_text( + self._( + "Failed to create topic for {name} in the group.\n" + "Please make sure the bot has the right.\n" + "You can send /init_topics to create again." + ).format(name=chat_display_name), + reply_to_message_id=msg.message_id) txt = self._("Chat {0} is now linked.").format(chat_display_name) self.bot.edit_message_text(text=txt, chat_id=msg.chat.id, message_id=msg.message_id) @@ -866,13 +900,18 @@ def update_group_info(self, update: Update, context: CallbackContext): if update.effective_chat.type == telegram.Chat.PRIVATE: return self.bot.reply_error(update, self._('Send /update_info to a group where this bot is a group admin ' 'to update group title, description and profile picture.')) + + if update.effective_chat.is_forum: + return self.update_thread_info(update, context) + forwarded_from_chat = update.effective_message.forward_from_chat if forwarded_from_chat and forwarded_from_chat.type == telegram.Chat.CHANNEL: - tg_chat = forwarded_from_chat.id + tg_chat = forwarded_from_chat else: - tg_chat = update.effective_chat.id + tg_chat = update.effective_chat + chats = self.db.get_chat_assoc(master_uid=utils.chat_id_to_str(channel=self.channel, - chat_uid=ChatID(str(tg_chat)))) + chat_uid=ChatID(str(tg_chat.id)))) if len(chats) != 1: return self.bot.reply_error(update, self.ngettext('This only works in a group linked with one chat. ' 'Currently {0} chat linked to this group.', @@ -890,7 +929,7 @@ def update_group_info(self, update: Update, context: CallbackContext): try: chat = self.chat_manager.update_chat_obj(channel.get_chat(chat_uid), full_update=True) - self.bot.set_chat_title(tg_chat, self.truncate_ellipsis(chat.chat_title, self.MAX_LEN_CHAT_TITLE)) + self.bot.set_chat_title(tg_chat.id, self.truncate_ellipsis(chat.chat_title, self.MAX_LEN_CHAT_TITLE)) # Update remote group members list to Telegram group description if available desc = chat.description @@ -905,7 +944,7 @@ def update_group_info(self, update: Update, context: CallbackContext): if desc: try: self.bot.set_chat_description( - tg_chat, self.truncate_ellipsis(desc, self.MAX_LEN_CHAT_DESC)) + tg_chat.id, self.truncate_ellipsis(desc, self.MAX_LEN_CHAT_DESC)) except BadRequest as e: if "Chat description is not modified" in e.message: pass @@ -930,7 +969,7 @@ def update_group_info(self, update: Update, context: CallbackContext): picture.seek(0) - self.bot.set_chat_photo(tg_chat, pic_resized or picture) + self.bot.set_chat_photo(tg_chat.id, pic_resized or picture) update.effective_message.reply_text(self._('Chat details updated.')) except EFBChatNotFound: self.logger.exception("Chat linked (%s) is not found in the slave channel " @@ -955,6 +994,82 @@ def update_group_info(self, update: Update, context: CallbackContext): if pic_resized and getattr(pic_resized, 'close', None): pic_resized.close() + def update_thread_info(self, update: Update, context: CallbackContext): + assert isinstance(update, Update) + assert update.effective_message + assert update.effective_chat + + try: + thread_id = update.effective_message.message_thread_id + if thread_id: + slave_origin_uid = self.db.get_topic_slave( + topic_chat_id=TelegramChatID(update.effective_message.chat_id), + message_thread_id=thread_id + ) + if not slave_origin_uid: + return self.bot.reply_error(update, self._("This chat is not managed by this bot. Update failed")) + channel_id, chat_id, _ = utils.chat_id_str_to_id(slave_origin_uid) + etm_chat: ETMChatType = self.chat_manager.get_chat(channel_id, chat_id, build_dummy=True) + self.bot.edit_forum_topic( + chat_id=update.effective_chat.id, + message_thread_id=thread_id, + name=self.truncate_ellipsis(etm_chat.chat_title, self.MAX_LEN_CHAT_TITLE), + icon_custom_emoji_id="" # param required by telegram + ) + update.effective_message.reply_text(self._('Chat details updated.')) + except EFBChatNotFound: + self.logger.exception("Chat linked (%s) is not found in the slave channel " + "(%s).", channel_id, chat_uid) + return self.bot.reply_error(update, self._("Chat linked ({chat_uid}) is not found in the slave channel " + "({channel_name}, {channel_id}).") + .format(channel_name=channel.channel_name, channel_id=channel_id, + chat_uid=chat_uid)) + except TelegramError as e: + if e.message == "Topic_not_modified": + update.effective_message.reply_text(self._('Chat details updated.')) + else: + self.logger.exception("Error occurred while update chat details.") + return self.bot.reply_error(update, self._('Error occurred while update chat details.\n' + '{0}'.format(e.message))) + except EFBOperationNotSupported: + return self.bot.reply_error(update, self._('No profile picture provided from this chat.')) + except Exception as e: + self.logger.exception("Unknown error caught when querying chat.") + return self.bot.reply_error(update, self._('Error occurred while update chat details. \n' + '{0}'.format(e))) + + def topic_migration(self, update: Update, context: CallbackContext): + assert isinstance(update, Update) + assert update.effective_message + + message = update.effective_message + chats = self.db.get_chat_assoc(master_uid=utils.chat_id_to_str(self.channel.channel_id, ChatID(str(message.chat.id)))) + for i in chats: + self.create_topic(slave_uid=i, telegram_chat_id=TelegramChatID(message.chat.id)) + + def create_topic(self, slave_uid: EFBChannelChatIDStr, telegram_chat_id: TelegramChatID) -> TelegramTopicID: + thread_id = self.db.get_topic_thread_id(slave_uid=slave_uid, topic_chat_id=telegram_chat_id) + if not thread_id: + with self._topic_mutex: + thread_id = self.db.get_topic_thread_id(slave_uid=slave_uid, topic_chat_id=telegram_chat_id) + if not thread_id: + channel_id, chat_id, _ = utils.chat_id_str_to_id(slave_uid) + chat: ETMChatType = self.chat_manager.get_chat(channel_id, chat_id, build_dummy=True) + try: + topic = self.bot.create_forum_topic( + chat_id=telegram_chat_id, + name=chat.chat_title + ) + thread_id = topic.message_thread_id + self.db.add_topic_assoc( + topic_chat_id=telegram_chat_id, + message_thread_id=topic.message_thread_id, + slave_uid=slave_uid, + ) + except Exception as e: + self.logger.info('Failed to create topic, Reason: %s', e) + return thread_id + def chat_migration(self, update: Update, context: CallbackContext): """Triggered by any message update with either ``migrate_from_chat_id`` or ``migrate_to_chat_id`` @@ -970,6 +1085,8 @@ def chat_migration(self, update: Update, context: CallbackContext): elif message.migrate_to_chat_id is not None: from_id = ChatID(str(message.chat.id)) to_id = ChatID(str(message.migrate_to_chat_id)) + if str(message.migrate_to_chat_id).startswith('-100') and self.bot.get_chat_info(message.migrate_to_chat_id).is_forum: + self.topic_migration(update, context) else: # Per ptb filter specs, this part of code should not be reached. return diff --git a/efb_telegram_master/db.py b/efb_telegram_master/db.py index 0a3a0669..c3f719a0 100644 --- a/efb_telegram_master/db.py +++ b/efb_telegram_master/db.py @@ -22,7 +22,7 @@ from .message import ETMMsg from .msg_type import TGMsgType from .utils import TelegramChatID, EFBChannelChatIDStr, TgChatMsgIDStr, message_id_to_str, \ - chat_id_to_str, OldMsgID, chat_id_str_to_id, TelegramMessageID + chat_id_to_str, OldMsgID, chat_id_str_to_id, TelegramMessageID, TelegramTopicID if TYPE_CHECKING: from . import TelegramChannel @@ -55,6 +55,11 @@ class Meta: database = database +class TopicAssoc(BaseModel): + topic_chat_id = TextField() + message_thread_id = TextField() + slave_uid = TextField() + class ChatAssoc(BaseModel): master_uid = TextField() slave_uid = TextField() @@ -184,7 +189,7 @@ def __init__(self, channel: 'TelegramChannel'): self.logger.debug("Database loaded.") self.logger.debug("Checking database migration...") - if not ChatAssoc.table_exists(): + if not ChatAssoc.table_exists() or not TopicAssoc.table_exists(): self._create() else: msg_log_columns = {i.name for i in database.get_columns("msglog")} @@ -207,7 +212,7 @@ def _create(): """ Initializing tables. """ - database.create_tables([ChatAssoc, MsgLog, SlaveChatInfo]) + database.create_tables([ChatAssoc, MsgLog, SlaveChatInfo, TopicAssoc]) @staticmethod def _migrate(i: int): @@ -374,6 +379,120 @@ def get_chat_assoc(master_uid: Optional[EFBChannelChatIDStr] = None, except DoesNotExist: return [] + def add_topic_assoc(self, topic_chat_id: TelegramChatID, + message_thread_id: EFBChannelChatIDStr, + slave_uid: EFBChannelChatIDStr, ): + """ + Add topic associations (topic links). + One Master channel with many Slave channel. + + Args: + topic_chat_id (TelegramChatID): The topic group chat ID + message_thread_id (EFBChannelChatIDStr): The topic thread ID + slave_uid (EFBChannelChatIDStr): Slave channel UID ("%(channel_id)s.%(chat_id)s") + """ + return TopicAssoc.create(topic_chat_id=topic_chat_id, message_thread_id=message_thread_id, slave_uid=slave_uid) + + @staticmethod + def get_topic_thread_id(slave_uid: EFBChannelChatIDStr, topic_chat_id: TelegramChatID=None) -> Optional[TelegramTopicID]: + """ + Get topic association (topic link) information. + Only one parameter is to be provided. + + Args: + topic_chat_id (TelegramChatID): The topic UID + slave_uid (EFBChannelChatIDStr): Slave channel UID ("%(channel_id)s.%(chat_id)s") + + Returns: + The message thread_id + """ + try: + if topic_chat_id: + assoc = TopicAssoc.select(TopicAssoc.message_thread_id)\ + .where(TopicAssoc.slave_uid == slave_uid, TopicAssoc.topic_chat_id == topic_chat_id)\ + .order_by(TopicAssoc.id.desc()).first() + else: + assoc = TopicAssoc.select(TopicAssoc.message_thread_id)\ + .where(TopicAssoc.slave_uid == slave_uid)\ + .order_by(TopicAssoc.id.desc()).first() + if assoc: + return TelegramTopicID(int(assoc.message_thread_id)) + except DoesNotExist: + return None + + @staticmethod + def get_topic_slave(topic_chat_id: TelegramChatID, + message_thread_id: Optional[TelegramTopicID] = None, + ) -> Optional[EFBChannelChatIDStr]: + """ + Get topic association (topic link) information. + Only one parameter is to be provided. + + Args: + topic_chat_id (TelegramChatID): The topic chat UID + message_thread_id (TelegramTopicID): The message thread ID + + Returns: + Slave channel UID ("%(channel_id)s.%(chat_id)s") + """ + try: + if message_thread_id: + return TopicAssoc.select(TopicAssoc.slave_uid)\ + .where(TopicAssoc.message_thread_id == message_thread_id, TopicAssoc.topic_chat_id == topic_chat_id).first().slave_uid + else: + return TopicAssoc.select(TopicAssoc.slave_uid)\ + .where(TopicAssoc.topic_chat_id == topic_chat_id).first().slave_uid + except DoesNotExist: + return None + except AttributeError: + return None + + @staticmethod + def get_topic_slaves(topic_chat_id: TelegramChatID) -> Optional[List[Tuple[EFBChannelChatIDStr, TelegramTopicID]]]: + """ + Get topic association (topic link) information. + Only one parameter is to be provided. + + Args: + topic_chat_id (TelegramChatID): The topic UID + + Returns: + List[Tuple[EFBChannelChatIDStr, TelegramTopicID]]: A list of tuples containing slave channel UID and message thread ID + """ + try: + query = TopicAssoc.select(TopicAssoc.slave_uid, TopicAssoc.message_thread_id)\ + .where(TopicAssoc.topic_chat_id == topic_chat_id).order_by(TopicAssoc.id.desc()) + return [(row.slave_uid, int(row.message_thread_id)) for row in query] + except DoesNotExist: + return None + except AttributeError: + return None + + @staticmethod + def remove_topic_assoc(topic_chat_id: Optional[TelegramChatID] = None, + message_thread_id: Optional[EFBChannelChatIDStr] = None, + slave_uid: Optional[EFBChannelChatIDStr] = None): + """ + Remove topic association (topic link). + + Args: + topic_chat_id (TelegramChatID): The topic group chat ID + message_thread_id (EFBChannelChatIDStr): The topic thread ID + slave_uid (EFBChannelChatIDStr): Slave channel UID ("%(channel_id)s.%(chat_id)s") + """ + try: + if bool(topic_chat_id and message_thread_id) == bool(slave_uid): + raise ValueError("Please provide either topic_chat_id and message_thread_id or slave_uid.") + elif topic_chat_id and message_thread_id: + return TopicAssoc.delete().where( + (TopicAssoc.topic_chat_id == str(topic_chat_id)) & + (TopicAssoc.message_thread_id == str(message_thread_id)) + ).execute() + elif slave_uid: + return TopicAssoc.delete().where(TopicAssoc.slave_uid == slave_uid).execute() + except DoesNotExist: + return 0 + def add_or_update_message_log(self, msg: ETMMsg, master_message: Message, diff --git a/efb_telegram_master/master_message.py b/efb_telegram_master/master_message.py index 726f7bc0..c2e22630 100644 --- a/efb_telegram_master/master_message.py +++ b/efb_telegram_master/master_message.py @@ -158,9 +158,38 @@ def msg(self, update: Update, context: CallbackContext): if destination is None: destination = self.get_singly_linked_chat_id_str(update.effective_chat) if destination: - # if the chat is singly-linked quote = message.reply_to_message is not None self.logger.debug("[%s] Chat %s is singly-linked to %s", mid, message.chat, destination) + if message.chat.is_forum: + ideal_thread_id = self.db.get_topic_thread_id(slave_uid=destination, topic_chat_id=update.effective_chat.id) + if ideal_thread_id and ideal_thread_id != message.message_thread_id: + self.logger.debug("[%s] Chat %s is singly-linked to %s, but the thread ID is not matching.", mid, message.chat, destination) + destination = None + quote = False + return + + if destination is None: + if message.chat.is_forum: + topic_destinations = self.db.get_topic_slaves(topic_chat_id=message.chat.id) + thread_id = message.message_thread_id + if thread_id: + for (dest, topic_id) in topic_destinations: + if topic_id == thread_id: + self.logger.debug("[%s] Chat %s is singly-linked to %s in topic %s", mid, message.chat, dest, topic_id) + destination = dest + quote = message.reply_to_message.message_id != message.reply_to_message.message_thread_id + if not quote: + message.reply_to_message = None + break + if destination is None: + self.logger.debug("[%s] Ignored message as it's a topic which wasn't created by this bot", mid) + return + else: + self.logger.debug("[%s] Chat %s is a forum, but no thread ID is found.", mid, message.chat) + destinations = self.db.get_chat_assoc(master_uid=utils.chat_id_to_str(self.channel_id, ChatID(str(message.chat.id)))) + if len(destinations) == len(topic_destinations): + self.logger.debug("[%s] Chat %s is a forum, and all destinations are in topics. The new message is not in any topic, so ignore it.", mid, message.chat) + return if destination is None: # not singly linked quote = False diff --git a/efb_telegram_master/slave_message.py b/efb_telegram_master/slave_message.py index 8c4e9eeb..f8d45004 100644 --- a/efb_telegram_master/slave_message.py +++ b/efb_telegram_master/slave_message.py @@ -5,8 +5,10 @@ import logging import os import tempfile +import threading import traceback import urllib.parse +from collections import defaultdict from pathlib import Path from typing import Tuple, Optional, TYPE_CHECKING, List, IO, Union @@ -34,7 +36,7 @@ from .locale_mixin import LocaleMixin from .message import ETMMsg from .msg_type import get_msg_type -from .utils import TelegramChatID, TelegramMessageID, OldMsgID +from .utils import TelegramChatID, TelegramTopicID, TelegramMessageID, OldMsgID if TYPE_CHECKING: from . import TelegramChannel @@ -90,7 +92,7 @@ def send_message(self, msg: Message) -> Message: xid = msg.uid self.logger.debug("[%s] Slave message delivered to ETM.\n%s", xid, msg) - msg_template, tg_dest = self.get_slave_msg_dest(msg) + msg_template, (tg_dest, thread_id) = self.get_slave_msg_dest(msg) silent = self.is_silent(msg) if silent is None: @@ -117,14 +119,16 @@ def send_message(self, msg: Message) -> Message: 'but it does not exist in database. Sending new message instead.', msg.uid) - self.dispatch_message(msg, msg_template, old_msg_id, tg_dest, silent) + self.dispatch_message(msg, msg_template, old_msg_id, tg_dest, thread_id, silent) except Exception as e: self.logger.error("Error occurred while processing message from slave channel.\nMessage: %s\n%s\n%s", repr(msg), repr(e), traceback.format_exc()) return msg def dispatch_message(self, msg: Message, msg_template: str, - old_msg_id: Optional[OldMsgID], tg_dest: TelegramChatID, + old_msg_id: Optional[OldMsgID], + tg_dest: TelegramChatID, + thread_id: Optional[TelegramTopicID], silent: bool = False): """Dispatch with header, destination and Telegram message ID and destinations.""" @@ -143,6 +147,8 @@ def dispatch_message(self, msg: Message, msg_template: str, else: self.logger.debug("[%s] Target message has database entry: %s.", msg.uid, log) target_msg = utils.message_id_str_to_id(log.master_msg_id) + # Assuming target_msg = (chat_id, message_id). Thread ID might need separate handling/DB storage. + # We only check if the reply target is in the same main chat. Replying across topics is allowed by Telegram. if not target_msg or target_msg[0] != int(tg_dest): self.logger.error('[%s] Trying to reply to a message not from this chat. ' 'Message destination: %s. Target message: %s.', @@ -168,46 +174,47 @@ def dispatch_message(self, msg: Message, msg_template: str, # Type dispatching if msg.type == MsgType.Text: - tg_msg = self.slave_message_text(msg, tg_dest, msg_template, reactions, old_msg_id, target_msg_id, + tg_msg = self.slave_message_text(msg, tg_dest, thread_id, msg_template, reactions, old_msg_id, target_msg_id, reply_markup, silent) elif msg.type == MsgType.Link: - tg_msg = self.slave_message_link(msg, tg_dest, msg_template, reactions, old_msg_id, target_msg_id, + tg_msg = self.slave_message_link(msg, tg_dest, thread_id, msg_template, reactions, old_msg_id, target_msg_id, reply_markup, silent) elif msg.type == MsgType.Sticker: - tg_msg = self.slave_message_sticker(msg, tg_dest, msg_template, reactions, old_msg_id, target_msg_id, + tg_msg = self.slave_message_sticker(msg, tg_dest, thread_id, msg_template, reactions, old_msg_id, target_msg_id, reply_markup, silent) elif msg.type == MsgType.Image: if self.flag("send_image_as_file"): - tg_msg = self.slave_message_file(msg, tg_dest, msg_template, reactions, old_msg_id, target_msg_id, + tg_msg = self.slave_message_file(msg, tg_dest, thread_id, msg_template, reactions, old_msg_id, target_msg_id, reply_markup, silent) else: - tg_msg = self.slave_message_image(msg, tg_dest, msg_template, reactions, old_msg_id, target_msg_id, + tg_msg = self.slave_message_image(msg, tg_dest, thread_id, msg_template, reactions, old_msg_id, target_msg_id, reply_markup, silent) elif msg.type == MsgType.Animation: - tg_msg = self.slave_message_animation(msg, tg_dest, msg_template, reactions, old_msg_id, target_msg_id, + tg_msg = self.slave_message_animation(msg, tg_dest, thread_id, msg_template, reactions, old_msg_id, target_msg_id, reply_markup, silent) elif msg.type == MsgType.File: - tg_msg = self.slave_message_file(msg, tg_dest, msg_template, reactions, old_msg_id, target_msg_id, + tg_msg = self.slave_message_file(msg, tg_dest, thread_id, msg_template, reactions, old_msg_id, target_msg_id, reply_markup, silent) elif msg.type == MsgType.Voice: - tg_msg = self.slave_message_voice(msg, tg_dest, msg_template, reactions, old_msg_id, target_msg_id, + tg_msg = self.slave_message_voice(msg, tg_dest, thread_id, msg_template, reactions, old_msg_id, target_msg_id, reply_markup, silent) elif msg.type == MsgType.Location: - tg_msg = self.slave_message_location(msg, tg_dest, msg_template, reactions, old_msg_id, target_msg_id, + tg_msg = self.slave_message_location(msg, tg_dest, thread_id, msg_template, reactions, old_msg_id, target_msg_id, reply_markup, silent) elif msg.type == MsgType.Video: - tg_msg = self.slave_message_video(msg, tg_dest, msg_template, reactions, old_msg_id, target_msg_id, + tg_msg = self.slave_message_video(msg, tg_dest, thread_id, msg_template, reactions, old_msg_id, target_msg_id, reply_markup, silent) elif msg.type == MsgType.Status: # Status messages are not to be recorded in databases - return self.slave_message_status(msg, tg_dest) + return self.slave_message_status(msg, tg_dest, thread_id) elif msg.type == MsgType.Unsupported: - tg_msg = self.slave_message_unsupported(msg, tg_dest, msg_template, reactions, old_msg_id, + tg_msg = self.slave_message_unsupported(msg, tg_dest, thread_id, msg_template, reactions, old_msg_id, target_msg_id, reply_markup, silent) else: - self.bot.send_chat_action(tg_dest, ChatAction.TYPING) + self.bot.send_chat_action(tg_dest, ChatAction.TYPING, message_thread_id=thread_id) tg_msg = self.bot.send_message(tg_dest, prefix=msg_template, suffix=reactions, disable_notification=silent, + message_thread_id=thread_id, text=self._('Unknown type of message "{0}". (UT01)') .format(msg.type.name)) @@ -225,20 +232,23 @@ def dispatch_message(self, msg: Message, msg_template: str, self.db.add_or_update_message_log(etm_msg, tg_msg, old_msg_id) # self.logger.debug("[%s] Message inserted/updated to the database.", xid) - def get_slave_msg_dest(self, msg: Message) -> Tuple[str, Optional[TelegramChatID]]: + def get_slave_msg_dest(self, msg: Message) -> Tuple[str, Tuple[Optional[TelegramChatID], Optional[TelegramTopicID]]]: """Get the Telegram destination of a message with its header. Returns: msg_template (str): header of the message. - tg_dest (Optional[str]): Telegram destination chat, None if muted. + (Optional[TelegramChatID], Optional[TelegramTopicID]): Telegram destination chat ID and thread ID, None if muted. """ xid = msg.uid - msg.chat = self.chat_manager.update_chat_obj(msg.chat) + chat = self.chat_manager.update_chat_obj(msg.chat) + msg.chat = chat msg.author = self.chat_manager.get_or_enrol_member(msg.chat, msg.author) chat_uid = utils.chat_id_to_str(chat=msg.chat) tg_chats = self.db.get_chat_assoc(slave_uid=chat_uid) tg_chat = None + tg_dest: Optional[TelegramChatID] = None + thread_id: Optional[TelegramTopicID] = None if tg_chats: tg_chat = tg_chats[0] @@ -254,11 +264,20 @@ def get_slave_msg_dest(self, msg: Message) -> Tuple[str, Optional[TelegramChatID # Generate chat text template & Decide type target tg_dest = TelegramChatID(self.channel.config['admins'][0]) - - if tg_chat: # if this chat is linked + + if tg_chat: tg_dest = TelegramChatID(int(utils.chat_id_str_to_id(tg_chat)[1])) - else: + if self.channel.topic_group: + if not isinstance(chat, SystemChat): + tg_dest = TelegramChatID(int(utils.chat_id_str_to_id(tg_chat)[1]) if tg_chat else self.channel.topic_group) + master_chat_info = self.bot.get_chat_info(tg_dest) + if master_chat_info.is_forum: + thread_id = self.channel.chat_binding.create_topic(slave_uid=chat_uid, telegram_chat_id=tg_dest) + + if not tg_chat: singly_linked = False + if thread_id: + singly_linked = True msg_template = self.generate_message_template(msg, singly_linked) self.logger.debug("[%s] Message is sent to Telegram chat %s, with header \"%s\".", @@ -267,7 +286,8 @@ def get_slave_msg_dest(self, msg: Message) -> Tuple[str, Optional[TelegramChatID if self.chat_dest_cache.get(str(tg_dest)) != chat_uid: self.chat_dest_cache.remove(str(tg_dest)) - return msg_template, tg_dest + return msg_template, (tg_dest, thread_id) + def html_substitutions(self, msg: Message) -> str: """Build a Telegram-flavored HTML string for message text substitutions.""" @@ -294,7 +314,8 @@ def html_substitutions(self, msg: Message) -> str: return html.escape(text) return text - def slave_message_text(self, msg: Message, tg_dest: TelegramChatID, msg_template: str, reactions: str, + def slave_message_text(self, msg: Message, tg_dest: TelegramChatID, + thread_id: Optional[TelegramTopicID], msg_template: str, reactions: str, old_msg_id: OldMsgID = None, target_msg_id: Optional[TelegramMessageID] = None, reply_markup: Optional[ReplyMarkup] = None, @@ -303,20 +324,20 @@ def slave_message_text(self, msg: Message, tg_dest: TelegramChatID, msg_template Send message as text to Telegram. Args: - msg: Message - tg_dest: Telegram Chat ID + msg (Message): Message + tg_dest (TelegramChatID): Telegram Chat ID + thread_id (Optional[TelegramTopicID]): Telegram Thread ID msg_template: Header of the message reactions: Footer of the message old_msg_id: Telegram message ID to edit target_msg_id: Telegram message ID to reply to reply_markup: Reply markup to be added to the message silent: Silent notification of the message when sending - Returns: The telegram bot message object sent """ self.logger.debug("[%s] Sending as a text message.", msg.uid) - self.bot.send_chat_action(tg_dest, ChatAction.TYPING) + self.bot.send_chat_action(tg_dest, ChatAction.TYPING, message_thread_id=thread_id) text = self.html_substitutions(msg) @@ -325,6 +346,7 @@ def slave_message_text(self, msg: Message, tg_dest: TelegramChatID, msg_template text=text, prefix=msg_template, suffix=reactions, parse_mode='HTML', reply_to_message_id=target_msg_id, + message_thread_id=thread_id, reply_markup=reply_markup, disable_notification=silent) else: @@ -338,12 +360,13 @@ def slave_message_text(self, msg: Message, tg_dest: TelegramChatID, msg_template self.logger.debug("[%s] Processed and sent as text message", msg.uid) return tg_msg - def slave_message_link(self, msg: Message, tg_dest: TelegramChatID, msg_template: str, reactions: str, + def slave_message_link(self, msg: Message, tg_dest: TelegramChatID, + thread_id: Optional[TelegramTopicID], msg_template: str, reactions: str, old_msg_id: OldMsgID = None, target_msg_id: Optional[TelegramMessageID] = None, reply_markup: Optional[ReplyMarkup] = None, silent: bool = False) -> telegram.Message: - self.bot.send_chat_action(tg_dest, ChatAction.TYPING) + self.bot.send_chat_action(tg_dest, ChatAction.TYPING, message_thread_id=thread_id) assert isinstance(msg.attributes, LinkAttribute) attributes: LinkAttribute = msg.attributes @@ -368,6 +391,7 @@ def slave_message_link(self, msg: Message, tg_dest: TelegramChatID, msg_template prefix=msg_template, suffix=reactions, parse_mode="HTML", reply_to_message_id=target_msg_id, + message_thread_id=thread_id, reply_markup=reply_markup, disable_notification=silent) @@ -381,13 +405,14 @@ def slave_message_link(self, msg: Message, tg_dest: TelegramChatID, msg_template IMG_SIZE_MAX_RATIO = 10 """Threshold of aspect ratio (longer side to shorter side) to send as file, used alone.""" - def slave_message_image(self, msg: Message, tg_dest: TelegramChatID, msg_template: str, reactions: str, + def slave_message_image(self, msg: Message, tg_dest: TelegramChatID, + thread_id: Optional[TelegramTopicID], msg_template: str, reactions: str, old_msg_id: OldMsgID = None, target_msg_id: Optional[TelegramMessageID] = None, reply_markup: Optional[ReplyMarkup] = None, silent: bool = False) -> telegram.Message: assert msg.file - self.bot.send_chat_action(tg_dest, ChatAction.UPLOAD_PHOTO) + self.bot.send_chat_action(tg_dest, ChatAction.UPLOAD_PHOTO, message_thread_id=thread_id) self.logger.debug("[%s] Message is of %s type; Path: %s; MIME: %s", msg.uid, msg.type, msg.path, msg.mime) if msg.path: self.logger.debug("[%s] Size of %s is %s.", msg.uid, msg.path, os.stat(msg.path).st_size) @@ -442,7 +467,8 @@ def slave_message_image(self, msg: Message, tg_dest: TelegramChatID, msg_templat edit_media = False self.bot.send_message(chat_id=old_msg_id[0], reply_to_message_id=old_msg_id[1], text=file_too_large) else: - message = self.bot.send_message(chat_id=tg_dest, reply_to_message_id=target_msg_id, text=text, + message = self.bot.send_message(chat_id=tg_dest, reply_to_message_id=target_msg_id, + message_thread_id=thread_id, text=text, parse_mode="HTML", reply_markup=reply_markup, disable_notification=silent, prefix=msg_template, suffix=reactions) message.reply_text(file_too_large) @@ -458,22 +484,28 @@ def slave_message_image(self, msg: Message, tg_dest: TelegramChatID, msg_templat media = InputMediaDocument(file) else: media = InputMediaPhoto(file) - self.bot.edit_message_media(chat_id=old_msg_id[0], message_id=old_msg_id[1], media=media) + self.bot.edit_message_media(chat_id=old_msg_id[0], message_id=old_msg_id[1], media=media, + reply_markup=reply_markup) return self.bot.edit_message_caption(chat_id=old_msg_id[0], message_id=old_msg_id[1], reply_markup=reply_markup, prefix=msg_template, suffix=reactions, caption=text, parse_mode="HTML") - except telegram.error.BadRequest: - # Send as an reply if cannot edit previous message. - if old_msg_id[0] == str(target_msg_id): - target_msg_id = target_msg_id or old_msg_id[1] + except telegram.error.BadRequest as e: + self.logger.warning("[%s] Failed to edit media/caption (BadRequest: %s). Sending new message instead.", msg.uid, e) + # Send as a reply if cannot edit previous message. + # Check if the target is within the same chat_id (thread_id doesn't matter for this check) + if old_msg_id[0] == str(tg_dest): + target_msg_id = target_msg_id or old_msg_id[1] # Reply to the original message msg.file.seek(0) + # Fall through to send a new message + # Sending new message (either initially or as fallback from edit) if send_as_file: assert msg.path file = self.process_file_obj(msg.file, msg.path) return self.bot.send_document(tg_dest, file, prefix=msg_template, suffix=reactions, caption=text, parse_mode="HTML", filename=msg.filename, reply_to_message_id=target_msg_id, + message_thread_id=thread_id, reply_markup=reply_markup, disable_notification=silent) else: @@ -483,28 +515,32 @@ def slave_message_image(self, msg: Message, tg_dest: TelegramChatID, msg_templat return self.bot.send_photo(tg_dest, file, prefix=msg_template, suffix=reactions, caption=text, parse_mode="HTML", reply_to_message_id=target_msg_id, + message_thread_id=thread_id, reply_markup=reply_markup, disable_notification=silent) except telegram.error.BadRequest as e: self.logger.error('[%s] Failed to send it as image, sending as document. Reason: %s', msg.uid, e) assert msg.path + msg.file.seek(0) # Rewind file pointer file = self.process_file_obj(msg.file, msg.path) return self.bot.send_document(tg_dest, file, prefix=msg_template, suffix=reactions, caption=text, parse_mode="HTML", filename=msg.filename, reply_to_message_id=target_msg_id, + message_thread_id=thread_id, reply_markup=reply_markup, disable_notification=silent) finally: if msg.file: msg.file.close() - def slave_message_animation(self, msg: Message, tg_dest: TelegramChatID, msg_template: str, reactions: str, + def slave_message_animation(self, msg: Message, tg_dest: TelegramChatID, + thread_id: Optional[TelegramTopicID], msg_template: str, reactions: str, old_msg_id: OldMsgID = None, target_msg_id: Optional[TelegramMessageID] = None, reply_markup: Optional[ReplyMarkup] = None, silent: bool = None) -> telegram.Message: - self.bot.send_chat_action(tg_dest, ChatAction.UPLOAD_PHOTO) + self.bot.send_chat_action(tg_dest, ChatAction.UPLOAD_PHOTO, message_thread_id=thread_id) # UPLOAD_VIDEO_NOTE might be better? self.logger.debug("[%s] Message is an Animation; Path: %s; MIME: %s", msg.uid, msg.path, msg.mime) if msg.path: @@ -524,7 +560,8 @@ def slave_message_animation(self, msg: Message, tg_dest: TelegramChatID, msg_tem edit_media = False self.bot.send_message(chat_id=old_msg_id[0], reply_to_message_id=old_msg_id[1], text=file_too_large) else: - message = self.bot.send_message(chat_id=tg_dest, reply_to_message_id=target_msg_id, text=text, + message = self.bot.send_message(chat_id=tg_dest, reply_to_message_id=target_msg_id, + message_thread_id=thread_id, text=text, parse_mode="HTML", reply_markup=reply_markup, disable_notification=silent, prefix=msg_template, suffix=reactions) @@ -535,7 +572,8 @@ def slave_message_animation(self, msg: Message, tg_dest: TelegramChatID, msg_tem if edit_media: assert msg.file and msg.path file = self.process_file_obj(msg.file, msg.path) - self.bot.edit_message_media(chat_id=old_msg_id[0], message_id=old_msg_id[1], media=InputMediaAnimation(file)) + self.bot.edit_message_media(chat_id=old_msg_id[0], message_id=old_msg_id[1], media=InputMediaAnimation(file), + reply_markup=reply_markup) return self.bot.edit_message_caption(chat_id=old_msg_id[0], message_id=old_msg_id[1], prefix=msg_template, suffix=reactions, reply_markup=reply_markup, @@ -548,19 +586,21 @@ def slave_message_animation(self, msg: Message, tg_dest: TelegramChatID, msg_tem prefix=msg_template, suffix=reactions, caption=text, parse_mode="HTML", reply_to_message_id=target_msg_id, + message_thread_id=thread_id, reply_markup=reply_markup, disable_notification=silent) finally: if msg.file is not None: msg.file.close() - def slave_message_sticker(self, msg: Message, tg_dest: TelegramChatID, msg_template: str, reactions: str, + def slave_message_sticker(self, msg: Message, tg_dest: TelegramChatID, + thread_id: Optional[TelegramTopicID], msg_template: str, reactions: str, old_msg_id: OldMsgID = None, target_msg_id: Optional[TelegramMessageID] = None, reply_markup: Optional[InlineKeyboardMarkup] = None, silent: bool = False) -> telegram.Message: - self.bot.send_chat_action(tg_dest, ChatAction.UPLOAD_PHOTO) + self.bot.send_chat_action(tg_dest, ChatAction.UPLOAD_PHOTO, message_thread_id=thread_id) sticker_reply_markup = self.build_chat_info_inline_keyboard(msg, msg_template, reactions, reply_markup) @@ -569,11 +609,17 @@ def slave_message_sticker(self, msg: Message, tg_dest: TelegramChatID, msg_templ self.logger.debug("[%s] Size of %s is %s.", msg.uid, msg.path, os.stat(msg.path).st_size) try: + # If only media changed (e.g., replaced sticker), send new one replying to old. + # Telegram doesn't support editing sticker media directly. if msg.edit_media and old_msg_id is not None: - target_msg_id = old_msg_id[1] - old_msg_id = None + if old_msg_id[0] == str(tg_dest): + target_msg_id = old_msg_id[1] # Set reply target to the message being "edited" + old_msg_id = None # Force sending a new message + + # If not editing media, but have old_msg_id, try editing reply_markup (e.g., for reactions) if old_msg_id and not msg.edit_media: try: + # Editing reply markup doesn't involve thread_id return self.bot.edit_message_reply_markup(chat_id=old_msg_id[0], message_id=old_msg_id[1], reply_markup=sticker_reply_markup) except TelegramError: @@ -582,6 +628,7 @@ def slave_message_sticker(self, msg: Message, tg_dest: TelegramChatID, msg_templ reply_markup=reply_markup, disable_notification=silent) + # Sending a new sticker (initial send or edit_media fallback) else: webp_img = None @@ -591,7 +638,9 @@ def slave_message_sticker(self, msg: Message, tg_dest: TelegramChatID, msg_templ self.bot.send_message(chat_id=old_msg_id[0], reply_to_message_id=old_msg_id[1], text=file_too_large) else: + # Send placeholder text first message = self.bot.send_message(chat_id=tg_dest, reply_to_message_id=target_msg_id, + message_thread_id=thread_id, text=self.html_substitutions(msg), parse_mode="HTML", reply_markup=reply_markup, disable_notification=silent, @@ -606,12 +655,15 @@ def slave_message_sticker(self, msg: Message, tg_dest: TelegramChatID, msg_templ webp_img.seek(0) file = self.process_file_obj(webp_img, webp_img.name) return self.bot.send_sticker(tg_dest, file, reply_markup=sticker_reply_markup, + message_thread_id=thread_id, reply_to_message_id=target_msg_id, disable_notification=silent) except IOError: + self.logger.warning("[%s] Failed to convert image to webp sticker, sending as document.", msg.uid) assert msg.file and msg.path file = self.process_file_obj(msg.file, msg.path) return self.bot.send_document(tg_dest, file, prefix=msg_template, suffix=reactions, + message_thread_id=thread_id, caption=msg.text, filename=msg.filename, reply_to_message_id=target_msg_id, reply_markup=reply_markup, @@ -638,16 +690,18 @@ def build_chat_info_inline_keyboard(msg: Message, msg_template: str, reactions: description.append([InlineKeyboardButton(msg.text, callback_data="void")]) if reactions: description.append([InlineKeyboardButton(reactions, callback_data="void")]) - sticker_reply_markup = reply_markup or InlineKeyboardMarkup([]) - sticker_reply_markup.inline_keyboard = description + sticker_reply_markup.inline_keyboard - return sticker_reply_markup + effective_reply_markup = reply_markup if isinstance(reply_markup, InlineKeyboardMarkup) else InlineKeyboardMarkup([]) + effective_reply_markup.inline_keyboard = description + effective_reply_markup.inline_keyboard + return effective_reply_markup + - def slave_message_file(self, msg: Message, tg_dest: TelegramChatID, msg_template: str, reactions: str, + def slave_message_file(self, msg: Message, tg_dest: TelegramChatID, + thread_id: Optional[TelegramTopicID], msg_template: str, reactions: str, old_msg_id: OldMsgID = None, target_msg_id: Optional[TelegramMessageID] = None, reply_markup: Optional[ReplyMarkup] = None, silent: bool = False) -> telegram.Message: - self.bot.send_chat_action(tg_dest, ChatAction.UPLOAD_DOCUMENT) + self.bot.send_chat_action(tg_dest, ChatAction.UPLOAD_DOCUMENT, message_thread_id=thread_id) if msg.filename is None and msg.path is not None: file_name = os.path.basename(msg.path) @@ -682,7 +736,8 @@ def slave_message_file(self, msg: Message, tg_dest: TelegramChatID, msg_template edit_media = False self.bot.send_message(chat_id=old_msg_id[0], reply_to_message_id=old_msg_id[1], text=file_too_large) else: - message = self.bot.send_message(chat_id=tg_dest, reply_to_message_id=target_msg_id, text=text, + message = self.bot.send_message(chat_id=tg_dest, reply_to_message_id=target_msg_id, + message_thread_id=thread_id, text=text, parse_mode="HTML", reply_markup=reply_markup, disable_notification=silent, prefix=msg_template, suffix=reactions) @@ -704,18 +759,20 @@ def slave_message_file(self, msg: Message, tg_dest: TelegramChatID, msg_template prefix=msg_template, suffix=reactions, caption=text, parse_mode="HTML", filename=file_name, reply_to_message_id=target_msg_id, + message_thread_id=thread_id, reply_markup=reply_markup, disable_notification=silent) finally: if msg.file is not None: msg.file.close() - def slave_message_voice(self, msg: Message, tg_dest: TelegramChatID, msg_template: str, reactions: str, + def slave_message_voice(self, msg: Message, tg_dest: TelegramChatID, + thread_id: Optional[TelegramTopicID], msg_template: str, reactions: str, old_msg_id: OldMsgID = None, target_msg_id: Optional[TelegramMessageID] = None, reply_markup: Optional[ReplyMarkup] = None, silent: bool = False) -> telegram.Message: - self.bot.send_chat_action(tg_dest, ChatAction.RECORD_AUDIO) + self.bot.send_chat_action(tg_dest, ChatAction.RECORD_AUDIO, message_thread_id=thread_id) if msg.text: text = self.html_substitutions(msg) else: @@ -730,7 +787,8 @@ def slave_message_voice(self, msg: Message, tg_dest: TelegramChatID, msg_templat edit_media = False self.bot.send_message(chat_id=old_msg_id[0], reply_to_message_id=old_msg_id[1], text=file_too_large) else: - message = self.bot.send_message(chat_id=tg_dest, reply_to_message_id=target_msg_id, text=text, + message = self.bot.send_message(chat_id=tg_dest, reply_to_message_id=target_msg_id, + message_thread_id=thread_id, text=text, parse_mode="HTML", reply_markup=reply_markup, disable_notification=silent, prefix=msg_template, suffix=reactions) @@ -743,6 +801,7 @@ def slave_message_voice(self, msg: Message, tg_dest: TelegramChatID, msg_templat msg_template += " " + self._("[Edited]") if str(tg_dest) == old_msg_id[0]: target_msg_id = target_msg_id or old_msg_id[1] + old_msg_id = None # Force sending new message below else: return self.bot.edit_message_caption(chat_id=old_msg_id[0], message_id=old_msg_id[1], reply_markup=reply_markup, prefix=msg_template, @@ -755,19 +814,22 @@ def slave_message_voice(self, msg: Message, tg_dest: TelegramChatID, msg_templat tg_msg = self.bot.send_voice(tg_dest, file, prefix=msg_template, suffix=reactions, caption=text, parse_mode="HTML", reply_to_message_id=target_msg_id, reply_markup=reply_markup, + message_thread_id=thread_id, disable_notification=silent) return tg_msg finally: if msg.file is not None: msg.file.close() - def slave_message_location(self, msg: Message, tg_dest: TelegramChatID, msg_template: str, reactions: str, + def slave_message_location(self, msg: Message, tg_dest: TelegramChatID, + thread_id: Optional[TelegramTopicID], msg_template: str, reactions: str, old_msg_id: OldMsgID = None, target_msg_id: Optional[TelegramMessageID] = None, reply_markup: Optional[InlineKeyboardMarkup] = None, silent: bool = False) -> telegram.Message: - # TODO: Move msg_template to caption during MTProto migration (if we ever had a chance to do that). - self.bot.send_chat_action(tg_dest, ChatAction.FIND_LOCATION) + # Location messages cannot be edited in content by bots. + # If an edit request comes, send a new message replying to the old one. + self.bot.send_chat_action(tg_dest, ChatAction.FIND_LOCATION, message_thread_id=thread_id) assert (isinstance(msg.attributes, LocationAttribute)) attributes: LocationAttribute = msg.attributes self.logger.info("[%s] Sending as a Telegram venue.\nlat: %s, long: %s\ntitle: %s\naddress: %s", @@ -787,15 +849,17 @@ def slave_message_location(self, msg: Message, tg_dest: TelegramChatID, msg_temp # TODO: Use live location if possible? Lift live location messages to EFB Framework? return self.bot.send_location(tg_dest, latitude=attributes.latitude, longitude=attributes.longitude, reply_to_message_id=target_msg_id, + message_thread_id=thread_id, reply_markup=location_reply_markup, disable_notification=silent) - def slave_message_video(self, msg: Message, tg_dest: TelegramChatID, msg_template: str, reactions: str, + def slave_message_video(self, msg: Message, tg_dest: TelegramChatID, + thread_id: Optional[TelegramTopicID], msg_template: str, reactions: str, old_msg_id: OldMsgID = None, target_msg_id: Optional[TelegramMessageID] = None, reply_markup: Optional[ReplyMarkup] = None, silent: bool = False) -> telegram.Message: - self.bot.send_chat_action(tg_dest, ChatAction.UPLOAD_VIDEO) + self.bot.send_chat_action(tg_dest, ChatAction.UPLOAD_VIDEO, message_thread_id=thread_id) if msg.text: text = self.html_substitutions(msg) elif msg_template: @@ -803,7 +867,7 @@ def slave_message_video(self, msg: Message, tg_dest: TelegramChatID, msg_templat if placeholder_flag == "emoji": text = "🎥" elif placeholder_flag == "text": - text = self._("Sent a file.") + text = self._("Sent a video.") else: text = "" else: @@ -817,7 +881,8 @@ def slave_message_video(self, msg: Message, tg_dest: TelegramChatID, msg_templat edit_media = False self.bot.send_message(chat_id=old_msg_id[0], reply_to_message_id=old_msg_id[1], text=file_too_large) else: - message = self.bot.send_message(chat_id=tg_dest, reply_to_message_id=target_msg_id, text=text, + message = self.bot.send_message(chat_id=tg_dest, reply_to_message_id=target_msg_id, + message_thread_id=thread_id, text=text, parse_mode="HTML", reply_markup=reply_markup, disable_notification=silent, prefix=msg_template, suffix=reactions) @@ -828,7 +893,8 @@ def slave_message_video(self, msg: Message, tg_dest: TelegramChatID, msg_templat if edit_media: assert msg.file is not None and msg.path is not None file = self.process_file_obj(msg.file, msg.path) - self.bot.edit_message_media(chat_id=old_msg_id[0], message_id=old_msg_id[1], media=InputMediaVideo(file)) + self.bot.edit_message_media(chat_id=old_msg_id[0], message_id=old_msg_id[1], media=InputMediaVideo(file), + reply_markup=reply_markup) return self.bot.edit_message_caption(chat_id=old_msg_id[0], message_id=old_msg_id[1], reply_markup=reply_markup, prefix=msg_template, suffix=reactions, caption=text, parse_mode="HTML") assert msg.file is not None and msg.path is not None @@ -836,20 +902,22 @@ def slave_message_video(self, msg: Message, tg_dest: TelegramChatID, msg_templat return self.bot.send_video(tg_dest, file, prefix=msg_template, suffix=reactions, caption=text, parse_mode="HTML", reply_to_message_id=target_msg_id, + message_thread_id=thread_id, reply_markup=reply_markup, disable_notification=silent) finally: if msg.file is not None: msg.file.close() - def slave_message_unsupported(self, msg: Message, tg_dest: TelegramChatID, msg_template: str, reactions: str, + def slave_message_unsupported(self, msg: Message, tg_dest: TelegramChatID, + thread_id: Optional[TelegramTopicID], msg_template: str, reactions: str, old_msg_id: OldMsgID = None, target_msg_id: Optional[TelegramMessageID] = None, reply_markup: Optional[ReplyMarkup] = None, silent: bool = False) -> telegram.Message: self.logger.debug("[%s] Sending as an unsupported message.", msg.uid) - self.bot.send_chat_action(tg_dest, ChatAction.TYPING) - + # Note: send_chat_action for unsupported might need adjustment if PTB changes behavior + self.bot.send_chat_action(tg_dest, ChatAction.TYPING, message_thread_id=thread_id) if msg.text: text = self.html_substitutions(msg) else: @@ -860,33 +928,34 @@ def slave_message_unsupported(self, msg: Message, tg_dest: TelegramChatID, msg_t text=text, parse_mode="HTML", prefix=msg_template + " " + self._("(unsupported)"), suffix=reactions, - reply_to_message_id=target_msg_id, reply_markup=reply_markup, + reply_to_message_id=target_msg_id, message_thread_id=thread_id, reply_markup=reply_markup, disable_notification=silent) else: - # Cannot change reply_to_message_id when editing a message + # Cannot change reply_to_message_id or thread_id when editing a message tg_msg = self.bot.edit_message_text(chat_id=old_msg_id[0], message_id=old_msg_id[1], text=text, parse_mode="HTML", - prefix=msg_template + " " + self._("(unsupported)"), + prefix=msg_template + " " + self._("(unsupported) [Edited]"), # Mark as edited suffix=reactions, reply_markup=reply_markup) self.logger.debug("[%s] Processed and sent as text message", msg.uid) return tg_msg - def slave_message_status(self, msg: Message, tg_dest: TelegramChatID): + def slave_message_status(self, msg: Message, tg_dest: TelegramChatID, + thread_id: Optional[TelegramTopicID]): attributes = msg.attributes assert isinstance(attributes, StatusAttribute) if attributes.status_type is StatusAttribute.Types.TYPING: - self.bot.send_chat_action(tg_dest, ChatAction.TYPING) + self.bot.send_chat_action(tg_dest, ChatAction.TYPING, message_thread_id=thread_id) elif attributes.status_type is StatusAttribute.Types.UPLOADING_VOICE: - self.bot.send_chat_action(tg_dest, ChatAction.RECORD_AUDIO) + self.bot.send_chat_action(tg_dest, ChatAction.RECORD_AUDIO, message_thread_id=thread_id) elif attributes.status_type is StatusAttribute.Types.UPLOADING_IMAGE: - self.bot.send_chat_action(tg_dest, ChatAction.UPLOAD_PHOTO) + self.bot.send_chat_action(tg_dest, ChatAction.UPLOAD_PHOTO, message_thread_id=thread_id) elif attributes.status_type is StatusAttribute.Types.UPLOADING_VIDEO: - self.bot.send_chat_action(tg_dest, ChatAction.UPLOAD_VIDEO) + self.bot.send_chat_action(tg_dest, ChatAction.UPLOAD_VIDEO, message_thread_id=thread_id) elif attributes.status_type is StatusAttribute.Types.UPLOADING_FILE: - self.bot.send_chat_action(tg_dest, ChatAction.UPLOAD_DOCUMENT) + self.bot.send_chat_action(tg_dest, ChatAction.UPLOAD_DOCUMENT, message_thread_id=thread_id) def send_status(self, status: Status): if isinstance(status, ChatUpdates): @@ -919,11 +988,13 @@ def send_status(self, status: Status): if not self.channel.flag('prevent_message_removal'): self.bot.delete_message(*old_msg_id) return - except TelegramError: + except TelegramError as e: + self.logger.warning("Failed to delete message %s.%s: %s. Sending notification instead.", *old_msg_id, e) pass self.bot.send_message(chat_id=old_msg_id[0], text=self._("Message is removed in remote chat."), - reply_to_message_id=old_msg_id[1]) + reply_to_message_id=old_msg_id[1], + disable_notification=True) # Probably silent notification else: self.logger.info('Was supposed to delete a message, ' 'but it does not exist in database: %s', status) @@ -953,7 +1024,8 @@ def update_reactions(self, status: MessageReactionsUpdate): old_msg: ETMMsg = old_msg_db.build_etm_msg(chat_manager=self.chat_manager) old_msg.reactions = status.reactions - old_msg.edit = True + old_msg.edit = True # Mark as edit so dispatch knows it's an update + old_msg.edit_media = False # Ensure media is not considered edited msg_template, _ = self.get_slave_msg_dest(old_msg) effective_msg = old_msg_db.master_msg_id_alt or old_msg_db.master_msg_id diff --git a/efb_telegram_master/utils.py b/efb_telegram_master/utils.py index f6b9cc77..3438c08b 100644 --- a/efb_telegram_master/utils.py +++ b/efb_telegram_master/utils.py @@ -27,6 +27,7 @@ TelegramChatID = NewType('TelegramChatID', int) +TelegramTopicID = NewType('TelegramTopicID', int) TelegramMessageID = NewType('TelegramMessageID', int) TgChatMsgIDStr = NewType('TgChatMsgIDStr', str) EFBChannelChatIDStr = NewType('EFBChannelChatIDStr', str) @@ -55,6 +56,7 @@ class ExperimentalFlagsManager(LocaleMixin): "api_base_url": None, "api_base_file_url": None, "local_tdlib_api": False, + "topic_group": None, } def __init__(self, channel: 'TelegramChannel'): diff --git a/setup.py b/setup.py index 05a0f81f..30cd61ec 100644 --- a/setup.py +++ b/setup.py @@ -47,7 +47,7 @@ tests_require=tests_require, install_requires=[ "ehforwarderbot>=2.0.0", - "python-telegram-bot~=13.11", + "python-telegram-bot~=13.15", "python-magic", "ffmpeg-python", "peewee",