Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions src/memos/mem_scheduler/general_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,12 +595,8 @@ def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) ->
f"Successfully processed feedback for user_id={user_id}, mem_cube_id={mem_cube_id}"
)

should_send_log = (
self.rabbitmq_config is not None
and hasattr(self.rabbitmq_config, "exchange_type")
and self.rabbitmq_config.exchange_type == "direct"
)
if should_send_log:
is_cloud_env = os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change"
if is_cloud_env:
record = feedback_result.get("record") if isinstance(feedback_result, dict) else {}
add_records = record.get("add") if isinstance(record, dict) else []
update_records = record.get("update") if isinstance(record, dict) else []
Expand Down Expand Up @@ -724,6 +720,11 @@ def _extract_fields(mem_item):
task_id,
stack_info=True,
)
else:
logger.info(
"Skipping web log for feedback. Not in a cloud environment (is_cloud_env=%s)",
is_cloud_env,
)

except Exception as e:
logger.error(f"Error processing feedbackMemory message: {e}", exc_info=True)
Expand Down Expand Up @@ -1324,12 +1325,10 @@ def process_message(message: ScheduleMessageItem):

# Create and submit log for web display
# Only send logs if RabbitMQ is configured with direct exchange (cloud service scenario)
should_send_log = (
self.rabbitmq_config is not None
and hasattr(self.rabbitmq_config, "exchange_type")
and self.rabbitmq_config.exchange_type == "direct"
is_cloud_env = (
os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change"
)
if pref_ids and should_send_log:
if pref_ids and is_cloud_env:
pref_content = []
pref_meta = []
for i, pref_mem_item in enumerate(pref_memories):
Expand Down Expand Up @@ -1365,6 +1364,12 @@ def process_message(message: ScheduleMessageItem):
)
event.task_id = message.task_id
self._submit_web_logs([event])
else:
logger.info(
"Skipping web log for pref_add. pref_ids_count=%s is_cloud_env=%s",
len(pref_ids) if pref_ids else 0,
is_cloud_env,
)

except Exception as e:
logger.error(f"Error processing pref_add message: {e}", exc_info=True)
Expand Down
Loading