diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index 1752edd56..d945db671 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -844,9 +844,6 @@ def _submit_web_logs( f"[DIAGNOSTIC] base_scheduler._submit_web_logs called. Message to publish: {message.model_dump_json(indent=2)}" ) if self.rabbitmq_config is None: - logger.info( - "[DIAGNOSTIC] base_scheduler._submit_web_logs: RabbitMQ config not loaded; skipping publish." - ) return if isinstance(messages, ScheduleLogForWebItem): @@ -862,11 +859,9 @@ def _submit_web_logs( message_info = message.debug_info() logger.debug(f"Submitted Scheduling log for web: {message_info}") - # Always call publish; the publisher now caches when offline and flushes after reconnect - logger.info( - f"[DIAGNOSTIC] base_scheduler._submit_web_logs: enqueue publish {message_info}" - ) - self.rabbitmq_publish_message(message=message.to_dict()) + if self.is_rabbitmq_connected(): + logger.info(f"Submitted Scheduling log to rabbitmq: {message_info}") + self.rabbitmq_publish_message(message=message.to_dict()) logger.debug( f"{len(messages)} submitted. {self._web_log_message_queue.qsize()} in queue. additional_log_info: {additional_log_info}" ) diff --git a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py index b58e84798..a711e4bc4 100644 --- a/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py +++ b/src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py @@ -5,7 +5,6 @@ import time from pathlib import Path -from queue import Empty from memos.configs.mem_scheduler import AuthConfig, RabbitMQConfig from memos.context.context import ContextThread @@ -45,11 +44,6 @@ def __init__(self): self.rabbitmq_message_cache = AutoDroppingQueue( maxsize=self.rabbitmq_message_cache_max_size ) - # Pending outgoing messages to avoid loss when connection is not ready - self.rabbitmq_publish_cache_max_size = 50 - self.rabbitmq_publish_cache = AutoDroppingQueue( - maxsize=self.rabbitmq_publish_cache_max_size - ) self.rabbitmq_connection_attempts = 3 # Max retry attempts on connection failure self.rabbitmq_retry_delay = 5 # Delay (seconds) between retries self.rabbitmq_heartbeat = 60 # Heartbeat interval (seconds) for connectio @@ -60,7 +54,6 @@ def __init__(self): self._rabbitmq_io_loop_thread = None # For IOLoop execution self._rabbitmq_stop_flag = False # Graceful shutdown flag self._rabbitmq_lock = threading.Lock() # Ensure thread safety - self._rabbitmq_initializing = False # Avoid duplicate concurrent initializations def is_rabbitmq_connected(self) -> bool: """Check if RabbitMQ connection is alive""" @@ -77,22 +70,11 @@ def initialize_rabbitmq( """ Establish connection to RabbitMQ using pika. """ - with self._rabbitmq_lock: - if self._rabbitmq_initializing: - logger.info( - "[DIAGNOSTIC] initialize_rabbitmq: initialization already in progress; skipping duplicate call." - ) - return - self._rabbitmq_initializing = True try: # Skip remote initialization in CI/pytest unless explicitly enabled enable_env = os.getenv("MEMOS_ENABLE_RABBITMQ", "").lower() == "true" in_ci = os.getenv("CI", "").lower() == "true" in_pytest = os.getenv("PYTEST_CURRENT_TEST") is not None - logger.info( - f"[DIAGNOSTIC] initialize_rabbitmq called. in_ci={in_ci}, in_pytest={in_pytest}, " - f"MEMOS_ENABLE_RABBITMQ={enable_env}, config_path={config_path}" - ) if (in_ci or in_pytest) and not enable_env: logger.info( "Skipping RabbitMQ initialization in CI/test environment. Set MEMOS_ENABLE_RABBITMQ=true to enable." @@ -149,9 +131,6 @@ def initialize_rabbitmq( logger.info("RabbitMQ connection process started") except Exception: logger.error("Fail to initialize auth_config", exc_info=True) - finally: - with self._rabbitmq_lock: - self._rabbitmq_initializing = False def get_rabbitmq_queue_size(self) -> int: """Get the current number of messages in the queue. @@ -218,7 +197,7 @@ def get_rabbitmq_connection_param(self): # Connection lifecycle callbacks def on_rabbitmq_connection_open(self, connection): """Called when connection is established.""" - logger.info("[DIAGNOSTIC] RabbitMQ connection opened") + logger.debug("Connection opened") connection.channel(on_open_callback=self.on_rabbitmq_channel_open) def on_rabbitmq_connection_error(self, connection, error): @@ -236,7 +215,7 @@ def on_rabbitmq_connection_closed(self, connection, reason): def on_rabbitmq_channel_open(self, channel): """Called when channel is ready.""" self.rabbitmq_channel = channel - logger.info("[DIAGNOSTIC] RabbitMQ channel opened") + logger.debug("Channel opened") # Setup exchange and queue channel.exchange_declare( @@ -264,8 +243,6 @@ def on_rabbitmq_queue_declared(self, frame): def on_rabbitmq_bind_ok(self, frame): """Final setup step when bind is complete.""" logger.info("RabbitMQ setup completed") - # Flush any cached publish messages now that connection is ready - self._flush_cached_publish_messages() def on_rabbitmq_message(self, channel, method, properties, body): """Handle incoming messages. Only for test.""" @@ -334,21 +311,8 @@ def rabbitmq_publish_message(self, message: dict): logger.info(f" - Message Content: {json.dumps(message, indent=2)}") with self._rabbitmq_lock: - logger.info( - f"[DIAGNOSTIC] rabbitmq_service.rabbitmq_publish_message invoked. " - f"is_connected={self.is_rabbitmq_connected()}, exchange={exchange_name}, " - f"routing_key='{routing_key}', label={label}" - ) if not self.is_rabbitmq_connected(): - logger.error( - "[DIAGNOSTIC] Cannot publish - no active connection. Caching message for retry. " - f"connection_exists={bool(self.rabbitmq_connection)}, " - f"channel_exists={bool(self.rabbitmq_channel)}, " - f"config_loaded={self.rabbitmq_config is not None}" - ) - self.rabbitmq_publish_cache.put(message) - # Best-effort to connect - self.initialize_rabbitmq(config=self.rabbitmq_config) + logger.error("Cannot publish - no active connection") return False logger.info( @@ -368,8 +332,6 @@ def rabbitmq_publish_message(self, message: dict): return True except Exception as e: logger.error(f"Failed to publish message: {e}") - # Cache message for retry on next connection - self.rabbitmq_publish_cache.put(message) self.rabbit_reconnect() return False @@ -417,37 +379,3 @@ def rabbitmq_close(self): logger.warning("IOLoop thread did not terminate cleanly") logger.info("RabbitMQ connection closed") - - def _flush_cached_publish_messages(self): - """Flush cached outgoing messages once connection is available.""" - if self.rabbitmq_publish_cache.empty(): - return - - if not self.is_rabbitmq_connected(): - logger.info( - "[DIAGNOSTIC] _flush_cached_publish_messages: connection still down; " - f"pending={self.rabbitmq_publish_cache.qsize()}" - ) - return - - drained: list[dict] = [] - while True: - try: - drained.append(self.rabbitmq_publish_cache.get_nowait()) - except Empty: - break - - if not drained: - return - - logger.info( - f"[DIAGNOSTIC] Flushing {len(drained)} cached RabbitMQ messages after reconnect." - ) - for cached_msg in drained: - success = self.rabbitmq_publish_message(cached_msg) - if not success: - # Message already re-cached inside publish; avoid tight loop - logger.error( - "[DIAGNOSTIC] Failed to flush cached message; re-queued for next attempt." - ) - break