From 53a131c459e695a957c18de6073371144e241813 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Tue, 16 Dec 2025 11:40:40 +0800 Subject: [PATCH 1/3] Handle RabbitMQ publish when offline and avoid duplicate init --- src/memos/mem_scheduler/base_scheduler.py | 9 ++- .../webservice_modules/rabbitmq_service.py | 78 ++++++++++++++++++- 2 files changed, 80 insertions(+), 7 deletions(-) diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index d945db671..6ee450384 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -844,6 +844,9 @@ def _submit_web_logs( f"[DIAGNOSTIC] base_scheduler._submit_web_logs called. Message to publish: {message.model_dump_json(indent=2)}" ) if self.rabbitmq_config is None: + logger.info( + "[DIAGNOSTIC] base_scheduler._submit_web_logs: RabbitMQ config not loaded; skipping publish." + ) return if isinstance(messages, ScheduleLogForWebItem): @@ -859,9 +862,9 @@ def _submit_web_logs( message_info = message.debug_info() logger.debug(f"Submitted Scheduling log for web: {message_info}") - if self.is_rabbitmq_connected(): - logger.info(f"Submitted Scheduling log to rabbitmq: {message_info}") - self.rabbitmq_publish_message(message=message.to_dict()) + # Always call publish; the publisher now caches when offline and flushes after reconnect + logger.info(f"[DIAGNOSTIC] base_scheduler._submit_web_logs: enqueue publish {message_info}") + self.rabbitmq_publish_message(message=message.to_dict()) logger.debug( f"{len(messages)} submitted. {self._web_log_message_queue.qsize()} in queue. additional_log_info: {additional_log_info}" ) diff --git a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py index a711e4bc4..981556ed6 100644 --- a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py +++ b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py @@ -4,6 +4,7 @@ import threading import time +from queue import Empty from pathlib import Path from memos.configs.mem_scheduler import AuthConfig, RabbitMQConfig @@ -44,6 +45,11 @@ def __init__(self): self.rabbitmq_message_cache = AutoDroppingQueue( maxsize=self.rabbitmq_message_cache_max_size ) + # Pending outgoing messages to avoid loss when connection is not ready + self.rabbitmq_publish_cache_max_size = 50 + self.rabbitmq_publish_cache = AutoDroppingQueue( + maxsize=self.rabbitmq_publish_cache_max_size + ) self.rabbitmq_connection_attempts = 3 # Max retry attempts on connection failure self.rabbitmq_retry_delay = 5 # Delay (seconds) between retries self.rabbitmq_heartbeat = 60 # Heartbeat interval (seconds) for connectio @@ -54,6 +60,7 @@ def __init__(self): self._rabbitmq_io_loop_thread = None # For IOLoop execution self._rabbitmq_stop_flag = False # Graceful shutdown flag self._rabbitmq_lock = threading.Lock() # Ensure thread safety + self._rabbitmq_initializing = False # Avoid duplicate concurrent initializations def is_rabbitmq_connected(self) -> bool: """Check if RabbitMQ connection is alive""" @@ -70,11 +77,20 @@ def initialize_rabbitmq( """ Establish connection to RabbitMQ using pika. """ + with self._rabbitmq_lock: + if self._rabbitmq_initializing: + logger.info("[DIAGNOSTIC] initialize_rabbitmq: initialization already in progress; skipping duplicate call.") + return + self._rabbitmq_initializing = True try: # Skip remote initialization in CI/pytest unless explicitly enabled enable_env = os.getenv("MEMOS_ENABLE_RABBITMQ", "").lower() == "true" in_ci = os.getenv("CI", "").lower() == "true" in_pytest = os.getenv("PYTEST_CURRENT_TEST") is not None + logger.info( + f"[DIAGNOSTIC] initialize_rabbitmq called. in_ci={in_ci}, in_pytest={in_pytest}, " + f"MEMOS_ENABLE_RABBITMQ={enable_env}, config_path={config_path}" + ) if (in_ci or in_pytest) and not enable_env: logger.info( "Skipping RabbitMQ initialization in CI/test environment. Set MEMOS_ENABLE_RABBITMQ=true to enable." @@ -131,6 +147,9 @@ def initialize_rabbitmq( logger.info("RabbitMQ connection process started") except Exception: logger.error("Fail to initialize auth_config", exc_info=True) + finally: + with self._rabbitmq_lock: + self._rabbitmq_initializing = False def get_rabbitmq_queue_size(self) -> int: """Get the current number of messages in the queue. @@ -197,7 +216,7 @@ def get_rabbitmq_connection_param(self): # Connection lifecycle callbacks def on_rabbitmq_connection_open(self, connection): """Called when connection is established.""" - logger.debug("Connection opened") + logger.info("[DIAGNOSTIC] RabbitMQ connection opened") connection.channel(on_open_callback=self.on_rabbitmq_channel_open) def on_rabbitmq_connection_error(self, connection, error): @@ -215,7 +234,7 @@ def on_rabbitmq_connection_closed(self, connection, reason): def on_rabbitmq_channel_open(self, channel): """Called when channel is ready.""" self.rabbitmq_channel = channel - logger.debug("Channel opened") + logger.info("[DIAGNOSTIC] RabbitMQ channel opened") # Setup exchange and queue channel.exchange_declare( @@ -243,6 +262,8 @@ def on_rabbitmq_queue_declared(self, frame): def on_rabbitmq_bind_ok(self, frame): """Final setup step when bind is complete.""" logger.info("RabbitMQ setup completed") + # Flush any cached publish messages now that connection is ready + self._flush_cached_publish_messages() def on_rabbitmq_message(self, channel, method, properties, body): """Handle incoming messages. Only for test.""" @@ -311,11 +332,24 @@ def rabbitmq_publish_message(self, message: dict): logger.info(f" - Message Content: {json.dumps(message, indent=2)}") with self._rabbitmq_lock: + logger.info( + f"[DIAGNOSTIC] rabbitmq_service.rabbitmq_publish_message invoked. " + f"is_connected={self.is_rabbitmq_connected()}, exchange={exchange_name}, " + f"routing_key='{routing_key}', label={label}" + ) if not self.is_rabbitmq_connected(): - logger.error("Cannot publish - no active connection") + logger.error( + "[DIAGNOSTIC] Cannot publish - no active connection. Caching message for retry. " + f"connection_exists={bool(self.rabbitmq_connection)}, " + f"channel_exists={bool(self.rabbitmq_channel)}, " + f"config_loaded={self.rabbitmq_config is not None}" + ) + self.rabbitmq_publish_cache.put(message) + # Best-effort to connect + self.initialize_rabbitmq(config=self.rabbitmq_config) return False - logger.info( + logger.warning( f"[DIAGNOSTIC] rabbitmq_service.rabbitmq_publish_message: Attempting to publish message. Exchange: {exchange_name}, Routing Key: {routing_key}, Message Content: {json.dumps(message, indent=2, ensure_ascii=False)}" ) try: @@ -332,6 +366,8 @@ def rabbitmq_publish_message(self, message: dict): return True except Exception as e: logger.error(f"Failed to publish message: {e}") + # Cache message for retry on next connection + self.rabbitmq_publish_cache.put(message) self.rabbit_reconnect() return False @@ -379,3 +415,37 @@ def rabbitmq_close(self): logger.warning("IOLoop thread did not terminate cleanly") logger.info("RabbitMQ connection closed") + + def _flush_cached_publish_messages(self): + """Flush cached outgoing messages once connection is available.""" + if self.rabbitmq_publish_cache.empty(): + return + + if not self.is_rabbitmq_connected(): + logger.info( + "[DIAGNOSTIC] _flush_cached_publish_messages: connection still down; " + f"pending={self.rabbitmq_publish_cache.qsize()}" + ) + return + + drained: list[dict] = [] + while True: + try: + drained.append(self.rabbitmq_publish_cache.get_nowait()) + except Empty: + break + + if not drained: + return + + logger.info( + f"[DIAGNOSTIC] Flushing {len(drained)} cached RabbitMQ messages after reconnect." + ) + for cached_msg in drained: + success = self.rabbitmq_publish_message(cached_msg) + if not success: + # Message already re-cached inside publish; avoid tight loop + logger.error( + "[DIAGNOSTIC] Failed to flush cached message; re-queued for next attempt." + ) + break From c31f3e3242b468777a28358b6f9e8ac3ebbdf970 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Tue, 16 Dec 2025 11:43:24 +0800 Subject: [PATCH 2/3] Apply ruff check/format --- src/memos/mem_scheduler/base_scheduler.py | 4 +++- .../mem_scheduler/webservice_modules/rabbitmq_service.py | 8 +++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index 6ee450384..1752edd56 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -863,7 +863,9 @@ def _submit_web_logs( logger.debug(f"Submitted Scheduling log for web: {message_info}") # Always call publish; the publisher now caches when offline and flushes after reconnect - logger.info(f"[DIAGNOSTIC] base_scheduler._submit_web_logs: enqueue publish {message_info}") + logger.info( + f"[DIAGNOSTIC] base_scheduler._submit_web_logs: enqueue publish {message_info}" + ) self.rabbitmq_publish_message(message=message.to_dict()) logger.debug( f"{len(messages)} submitted. {self._web_log_message_queue.qsize()} in queue. additional_log_info: {additional_log_info}" diff --git a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py index 981556ed6..b58e84798 100644 --- a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py +++ b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py @@ -4,8 +4,8 @@ import threading import time -from queue import Empty from pathlib import Path +from queue import Empty from memos.configs.mem_scheduler import AuthConfig, RabbitMQConfig from memos.context.context import ContextThread @@ -79,7 +79,9 @@ def initialize_rabbitmq( """ with self._rabbitmq_lock: if self._rabbitmq_initializing: - logger.info("[DIAGNOSTIC] initialize_rabbitmq: initialization already in progress; skipping duplicate call.") + logger.info( + "[DIAGNOSTIC] initialize_rabbitmq: initialization already in progress; skipping duplicate call." + ) return self._rabbitmq_initializing = True try: @@ -349,7 +351,7 @@ def rabbitmq_publish_message(self, message: dict): self.initialize_rabbitmq(config=self.rabbitmq_config) return False - logger.warning( + logger.info( f"[DIAGNOSTIC] rabbitmq_service.rabbitmq_publish_message: Attempting to publish message. Exchange: {exchange_name}, Routing Key: {routing_key}, Message Content: {json.dumps(message, indent=2, ensure_ascii=False)}" ) try: From 8ceb59ffdd0d08a302d51093f54566355485a160 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Wed, 17 Dec 2025 10:56:12 +0800 Subject: [PATCH 3/3] Fix RabbitMQ publish cache deadlock --- src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py index b58e84798..9c85a4872 100644 --- a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py +++ b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py @@ -59,7 +59,8 @@ def __init__(self): # Thread management self._rabbitmq_io_loop_thread = None # For IOLoop execution self._rabbitmq_stop_flag = False # Graceful shutdown flag - self._rabbitmq_lock = threading.Lock() # Ensure thread safety + # Use RLock because publishing may trigger initialization, which also grabs the lock. + self._rabbitmq_lock = threading.RLock() self._rabbitmq_initializing = False # Avoid duplicate concurrent initializations def is_rabbitmq_connected(self) -> bool: