From 6d757996e7f3e780c481660af012ea17ed1eccea Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Thu, 4 Dec 2025 15:02:17 +0800 Subject: [PATCH] feat(scheduler): Unify web log submission checks and add debug logs --- src/memos/mem_scheduler/general_scheduler.py | 27 ++++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index ad34530bc..a5e80370f 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -595,12 +595,8 @@ def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) -> f"Successfully processed feedback for user_id={user_id}, mem_cube_id={mem_cube_id}" ) - should_send_log = ( - self.rabbitmq_config is not None - and hasattr(self.rabbitmq_config, "exchange_type") - and self.rabbitmq_config.exchange_type == "direct" - ) - if should_send_log: + is_cloud_env = os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change" + if is_cloud_env: record = feedback_result.get("record") if isinstance(feedback_result, dict) else {} add_records = record.get("add") if isinstance(record, dict) else [] update_records = record.get("update") if isinstance(record, dict) else [] @@ -724,6 +720,11 @@ def _extract_fields(mem_item): task_id, stack_info=True, ) + else: + logger.info( + "Skipping web log for feedback. Not in a cloud environment (is_cloud_env=%s)", + is_cloud_env, + ) except Exception as e: logger.error(f"Error processing feedbackMemory message: {e}", exc_info=True) @@ -1324,12 +1325,10 @@ def process_message(message: ScheduleMessageItem): # Create and submit log for web display # Only send logs if RabbitMQ is configured with direct exchange (cloud service scenario) - should_send_log = ( - self.rabbitmq_config is not None - and hasattr(self.rabbitmq_config, "exchange_type") - and self.rabbitmq_config.exchange_type == "direct" + is_cloud_env = ( + os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change" ) - if pref_ids and should_send_log: + if pref_ids and is_cloud_env: pref_content = [] pref_meta = [] for i, pref_mem_item in enumerate(pref_memories): @@ -1365,6 +1364,12 @@ def process_message(message: ScheduleMessageItem): ) event.task_id = message.task_id self._submit_web_logs([event]) + else: + logger.info( + "Skipping web log for pref_add. pref_ids_count=%s is_cloud_env=%s", + len(pref_ids) if pref_ids else 0, + is_cloud_env, + ) except Exception as e: logger.error(f"Error processing pref_add message: {e}", exc_info=True)