Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions src/memos/mem_scheduler/base_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -844,9 +844,6 @@ def _submit_web_logs(
f"[DIAGNOSTIC] base_scheduler._submit_web_logs called. Message to publish: {message.model_dump_json(indent=2)}"
)
if self.rabbitmq_config is None:
logger.info(
"[DIAGNOSTIC] base_scheduler._submit_web_logs: RabbitMQ config not loaded; skipping publish."
)
return

if isinstance(messages, ScheduleLogForWebItem):
Expand All @@ -862,11 +859,9 @@ def _submit_web_logs(
message_info = message.debug_info()
logger.debug(f"Submitted Scheduling log for web: {message_info}")

# Always call publish; the publisher now caches when offline and flushes after reconnect
logger.info(
f"[DIAGNOSTIC] base_scheduler._submit_web_logs: enqueue publish {message_info}"
)
self.rabbitmq_publish_message(message=message.to_dict())
if self.is_rabbitmq_connected():
logger.info(f"Submitted Scheduling log to rabbitmq: {message_info}")
self.rabbitmq_publish_message(message=message.to_dict())
logger.debug(
f"{len(messages)} submitted. {self._web_log_message_queue.qsize()} in queue. additional_log_info: {additional_log_info}"
)
Expand Down
78 changes: 3 additions & 75 deletions src/memos/mem_scheduler/webservice_modules/rabbitmq_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import time

from pathlib import Path
from queue import Empty

from memos.configs.mem_scheduler import AuthConfig, RabbitMQConfig
from memos.context.context import ContextThread
Expand Down Expand Up @@ -45,11 +44,6 @@ def __init__(self):
self.rabbitmq_message_cache = AutoDroppingQueue(
maxsize=self.rabbitmq_message_cache_max_size
)
# Pending outgoing messages to avoid loss when connection is not ready
self.rabbitmq_publish_cache_max_size = 50
self.rabbitmq_publish_cache = AutoDroppingQueue(
maxsize=self.rabbitmq_publish_cache_max_size
)
self.rabbitmq_connection_attempts = 3 # Max retry attempts on connection failure
self.rabbitmq_retry_delay = 5 # Delay (seconds) between retries
self.rabbitmq_heartbeat = 60  # Heartbeat interval (seconds) for connection
Expand All @@ -60,7 +54,6 @@ def __init__(self):
self._rabbitmq_io_loop_thread = None # For IOLoop execution
self._rabbitmq_stop_flag = False # Graceful shutdown flag
self._rabbitmq_lock = threading.Lock() # Ensure thread safety
self._rabbitmq_initializing = False # Avoid duplicate concurrent initializations

def is_rabbitmq_connected(self) -> bool:
"""Check if RabbitMQ connection is alive"""
Expand All @@ -77,22 +70,11 @@ def initialize_rabbitmq(
"""
Establish connection to RabbitMQ using pika.
"""
with self._rabbitmq_lock:
if self._rabbitmq_initializing:
logger.info(
"[DIAGNOSTIC] initialize_rabbitmq: initialization already in progress; skipping duplicate call."
)
return
self._rabbitmq_initializing = True
try:
# Skip remote initialization in CI/pytest unless explicitly enabled
enable_env = os.getenv("MEMOS_ENABLE_RABBITMQ", "").lower() == "true"
in_ci = os.getenv("CI", "").lower() == "true"
in_pytest = os.getenv("PYTEST_CURRENT_TEST") is not None
logger.info(
f"[DIAGNOSTIC] initialize_rabbitmq called. in_ci={in_ci}, in_pytest={in_pytest}, "
f"MEMOS_ENABLE_RABBITMQ={enable_env}, config_path={config_path}"
)
if (in_ci or in_pytest) and not enable_env:
logger.info(
"Skipping RabbitMQ initialization in CI/test environment. Set MEMOS_ENABLE_RABBITMQ=true to enable."
Expand Down Expand Up @@ -149,9 +131,6 @@ def initialize_rabbitmq(
logger.info("RabbitMQ connection process started")
except Exception:
logger.error("Fail to initialize auth_config", exc_info=True)
finally:
with self._rabbitmq_lock:
self._rabbitmq_initializing = False

def get_rabbitmq_queue_size(self) -> int:
"""Get the current number of messages in the queue.
Expand Down Expand Up @@ -218,7 +197,7 @@ def get_rabbitmq_connection_param(self):
# Connection lifecycle callbacks
def on_rabbitmq_connection_open(self, connection):
"""Called when connection is established."""
logger.info("[DIAGNOSTIC] RabbitMQ connection opened")
logger.debug("Connection opened")
connection.channel(on_open_callback=self.on_rabbitmq_channel_open)

def on_rabbitmq_connection_error(self, connection, error):
Expand All @@ -236,7 +215,7 @@ def on_rabbitmq_connection_closed(self, connection, reason):
def on_rabbitmq_channel_open(self, channel):
"""Called when channel is ready."""
self.rabbitmq_channel = channel
logger.info("[DIAGNOSTIC] RabbitMQ channel opened")
logger.debug("Channel opened")

# Setup exchange and queue
channel.exchange_declare(
Expand Down Expand Up @@ -264,8 +243,6 @@ def on_rabbitmq_queue_declared(self, frame):
def on_rabbitmq_bind_ok(self, frame):
"""Final setup step when bind is complete."""
logger.info("RabbitMQ setup completed")
# Flush any cached publish messages now that connection is ready
self._flush_cached_publish_messages()

def on_rabbitmq_message(self, channel, method, properties, body):
"""Handle incoming messages. Only for test."""
Expand Down Expand Up @@ -334,21 +311,8 @@ def rabbitmq_publish_message(self, message: dict):
logger.info(f" - Message Content: {json.dumps(message, indent=2)}")

with self._rabbitmq_lock:
logger.info(
f"[DIAGNOSTIC] rabbitmq_service.rabbitmq_publish_message invoked. "
f"is_connected={self.is_rabbitmq_connected()}, exchange={exchange_name}, "
f"routing_key='{routing_key}', label={label}"
)
if not self.is_rabbitmq_connected():
logger.error(
"[DIAGNOSTIC] Cannot publish - no active connection. Caching message for retry. "
f"connection_exists={bool(self.rabbitmq_connection)}, "
f"channel_exists={bool(self.rabbitmq_channel)}, "
f"config_loaded={self.rabbitmq_config is not None}"
)
self.rabbitmq_publish_cache.put(message)
# Best-effort to connect
self.initialize_rabbitmq(config=self.rabbitmq_config)
logger.error("Cannot publish - no active connection")
return False

logger.info(
Expand All @@ -368,8 +332,6 @@ def rabbitmq_publish_message(self, message: dict):
return True
except Exception as e:
logger.error(f"Failed to publish message: {e}")
# Cache message for retry on next connection
self.rabbitmq_publish_cache.put(message)
self.rabbit_reconnect()
return False

Expand Down Expand Up @@ -417,37 +379,3 @@ def rabbitmq_close(self):
logger.warning("IOLoop thread did not terminate cleanly")

logger.info("RabbitMQ connection closed")

def _flush_cached_publish_messages(self):
"""Flush cached outgoing messages once connection is available."""
if self.rabbitmq_publish_cache.empty():
return

if not self.is_rabbitmq_connected():
logger.info(
"[DIAGNOSTIC] _flush_cached_publish_messages: connection still down; "
f"pending={self.rabbitmq_publish_cache.qsize()}"
)
return

drained: list[dict] = []
while True:
try:
drained.append(self.rabbitmq_publish_cache.get_nowait())
except Empty:
break

if not drained:
return

logger.info(
f"[DIAGNOSTIC] Flushing {len(drained)} cached RabbitMQ messages after reconnect."
)
for cached_msg in drained:
success = self.rabbitmq_publish_message(cached_msg)
if not success:
# Message already re-cached inside publish; avoid tight loop
logger.error(
"[DIAGNOSTIC] Failed to flush cached message; re-queued for next attempt."
)
break