diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index d945db671..1752edd56 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -844,6 +844,9 @@ def _submit_web_logs( f"[DIAGNOSTIC] base_scheduler._submit_web_logs called. Message to publish: {message.model_dump_json(indent=2)}" ) if self.rabbitmq_config is None: + logger.info( + "[DIAGNOSTIC] base_scheduler._submit_web_logs: RabbitMQ config not loaded; skipping publish." + ) return if isinstance(messages, ScheduleLogForWebItem): @@ -859,9 +862,11 @@ def _submit_web_logs( message_info = message.debug_info() logger.debug(f"Submitted Scheduling log for web: {message_info}") - if self.is_rabbitmq_connected(): - logger.info(f"Submitted Scheduling log to rabbitmq: {message_info}") - self.rabbitmq_publish_message(message=message.to_dict()) + # Always call publish; the publisher now caches when offline and flushes after reconnect + logger.info( + f"[DIAGNOSTIC] base_scheduler._submit_web_logs: enqueue publish {message_info}" + ) + self.rabbitmq_publish_message(message=message.to_dict()) logger.debug( f"{len(messages)} submitted. {self._web_log_message_queue.qsize()} in queue. 
additional_log_info: {additional_log_info}" ) diff --git a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py index a711e4bc4..9c85a4872 100644 --- a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py +++ b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py @@ -5,6 +5,7 @@ import time from pathlib import Path +from queue import Empty from memos.configs.mem_scheduler import AuthConfig, RabbitMQConfig from memos.context.context import ContextThread @@ -44,6 +45,11 @@ def __init__(self): self.rabbitmq_message_cache = AutoDroppingQueue( maxsize=self.rabbitmq_message_cache_max_size ) + # Pending outgoing messages to avoid loss when connection is not ready + self.rabbitmq_publish_cache_max_size = 50 + self.rabbitmq_publish_cache = AutoDroppingQueue( + maxsize=self.rabbitmq_publish_cache_max_size + ) self.rabbitmq_connection_attempts = 3 # Max retry attempts on connection failure self.rabbitmq_retry_delay = 5 # Delay (seconds) between retries self.rabbitmq_heartbeat = 60 # Heartbeat interval (seconds) for connectio @@ -53,7 +59,9 @@ def __init__(self): # Thread management self._rabbitmq_io_loop_thread = None # For IOLoop execution self._rabbitmq_stop_flag = False # Graceful shutdown flag - self._rabbitmq_lock = threading.Lock() # Ensure thread safety + # Use RLock because publishing may trigger initialization, which also grabs the lock. + self._rabbitmq_lock = threading.RLock() + self._rabbitmq_initializing = False # Avoid duplicate concurrent initializations def is_rabbitmq_connected(self) -> bool: """Check if RabbitMQ connection is alive""" @@ -70,11 +78,22 @@ def initialize_rabbitmq( """ Establish connection to RabbitMQ using pika. """ + with self._rabbitmq_lock: + if self._rabbitmq_initializing: + logger.info( + "[DIAGNOSTIC] initialize_rabbitmq: initialization already in progress; skipping duplicate call." 
+ ) + return + self._rabbitmq_initializing = True try: # Skip remote initialization in CI/pytest unless explicitly enabled enable_env = os.getenv("MEMOS_ENABLE_RABBITMQ", "").lower() == "true" in_ci = os.getenv("CI", "").lower() == "true" in_pytest = os.getenv("PYTEST_CURRENT_TEST") is not None + logger.info( + f"[DIAGNOSTIC] initialize_rabbitmq called. in_ci={in_ci}, in_pytest={in_pytest}, " + f"MEMOS_ENABLE_RABBITMQ={enable_env}, config_path={config_path}" + ) if (in_ci or in_pytest) and not enable_env: logger.info( "Skipping RabbitMQ initialization in CI/test environment. Set MEMOS_ENABLE_RABBITMQ=true to enable." @@ -131,6 +150,9 @@ def initialize_rabbitmq( logger.info("RabbitMQ connection process started") except Exception: logger.error("Fail to initialize auth_config", exc_info=True) + finally: + with self._rabbitmq_lock: + self._rabbitmq_initializing = False def get_rabbitmq_queue_size(self) -> int: """Get the current number of messages in the queue. @@ -197,7 +219,7 @@ def get_rabbitmq_connection_param(self): # Connection lifecycle callbacks def on_rabbitmq_connection_open(self, connection): """Called when connection is established.""" - logger.debug("Connection opened") + logger.info("[DIAGNOSTIC] RabbitMQ connection opened") connection.channel(on_open_callback=self.on_rabbitmq_channel_open) def on_rabbitmq_connection_error(self, connection, error): @@ -215,7 +237,7 @@ def on_rabbitmq_connection_closed(self, connection, reason): def on_rabbitmq_channel_open(self, channel): """Called when channel is ready.""" self.rabbitmq_channel = channel - logger.debug("Channel opened") + logger.info("[DIAGNOSTIC] RabbitMQ channel opened") # Setup exchange and queue channel.exchange_declare( @@ -243,6 +265,8 @@ def on_rabbitmq_queue_declared(self, frame): def on_rabbitmq_bind_ok(self, frame): """Final setup step when bind is complete.""" logger.info("RabbitMQ setup completed") + # Flush any cached publish messages now that connection is ready + 
self._flush_cached_publish_messages() def on_rabbitmq_message(self, channel, method, properties, body): """Handle incoming messages. Only for test.""" @@ -311,8 +335,21 @@ def rabbitmq_publish_message(self, message: dict): logger.info(f" - Message Content: {json.dumps(message, indent=2)}") with self._rabbitmq_lock: + logger.info( + f"[DIAGNOSTIC] rabbitmq_service.rabbitmq_publish_message invoked. " + f"is_connected={self.is_rabbitmq_connected()}, exchange={exchange_name}, " + f"routing_key='{routing_key}', label={label}" + ) if not self.is_rabbitmq_connected(): - logger.error("Cannot publish - no active connection") + logger.error( + "[DIAGNOSTIC] Cannot publish - no active connection. Caching message for retry. " + f"connection_exists={bool(self.rabbitmq_connection)}, " + f"channel_exists={bool(self.rabbitmq_channel)}, " + f"config_loaded={self.rabbitmq_config is not None}" + ) + self.rabbitmq_publish_cache.put(message) + # Best-effort to connect + self.initialize_rabbitmq(config=self.rabbitmq_config) return False logger.info( @@ -332,6 +369,8 @@ def rabbitmq_publish_message(self, message: dict): return True except Exception as e: logger.error(f"Failed to publish message: {e}") + # Cache message for retry on next connection + self.rabbitmq_publish_cache.put(message) self.rabbit_reconnect() return False @@ -379,3 +418,37 @@ def rabbitmq_close(self): logger.warning("IOLoop thread did not terminate cleanly") logger.info("RabbitMQ connection closed") + + def _flush_cached_publish_messages(self): + """Flush cached outgoing messages once connection is available.""" + if self.rabbitmq_publish_cache.empty(): + return + + if not self.is_rabbitmq_connected(): + logger.info( + "[DIAGNOSTIC] _flush_cached_publish_messages: connection still down; " + f"pending={self.rabbitmq_publish_cache.qsize()}" + ) + return + + drained: list[dict] = [] + while True: + try: + drained.append(self.rabbitmq_publish_cache.get_nowait()) + except Empty: + break + + if not drained: + return + + 
logger.info(
+            f"[DIAGNOSTIC] Flushing {len(drained)} cached RabbitMQ messages after reconnect."
+        )
+        for idx, cached_msg in enumerate(drained):
+            if not self.rabbitmq_publish_message(cached_msg):
+                # The failed message was re-cached inside publish; also re-queue the
+                # untried remainder so no cached message is lost, then stop the loop.
+                for remaining in drained[idx + 1 :]:
+                    self.rabbitmq_publish_cache.put(remaining)
+                logger.error(
+                    "[DIAGNOSTIC] Flush interrupted; cached messages re-queued for next attempt."
+                )
+                break