From 99a9a711a5803a982ac5f7ca2f6e636c2cb2a95d Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Wed, 3 Dec 2025 21:55:13 +0800 Subject: [PATCH] fix(scheduler): Correctly process feedback logs by checking for 'text' key --- src/memos/mem_scheduler/general_scheduler.py | 116 ++++++++++++++----- 1 file changed, 85 insertions(+), 31 deletions(-) diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index 5848fe176..a2e4f5d4e 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -600,31 +600,76 @@ def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) -> and hasattr(self.rabbitmq_config, "exchange_type") and self.rabbitmq_config.exchange_type == "direct" ) - if feedback_result and should_send_log: - feedback_content = [] - for mem_item in feedback_result: - # Safely access attributes, assuming mem_item could be dict or object + if should_send_log: + record = feedback_result.get("record") if isinstance(feedback_result, dict) else {} + add_records = record.get("add") if isinstance(record, dict) else [] + update_records = record.get("update") if isinstance(record, dict) else [] + + def _extract_fields(mem_item): mem_id = ( - getattr(mem_item, "id", None) or mem_item.get("id") - if isinstance(mem_item, dict) - else None + getattr(mem_item, "id", None) + if not isinstance(mem_item, dict) + else mem_item.get("id") ) mem_memory = ( - getattr(mem_item, "memory", None) or mem_item.get("memory") - if isinstance(mem_item, dict) - else None + getattr(mem_item, "memory", None) + if not isinstance(mem_item, dict) + else mem_item.get("memory") or mem_item.get("text") + ) + if mem_memory is None and isinstance(mem_item, dict): + mem_memory = mem_item.get("text") + original_content = ( + getattr(mem_item, "origin_memory", None) + if not isinstance(mem_item, dict) + else mem_item.get("origin_memory") + or mem_item.get("old_memory") + or mem_item.get("original_content") ) + return mem_id, mem_memory, original_content + + kb_log_content: list[dict] = [] + + for mem_item in add_records or []: + mem_id, mem_memory, _ = _extract_fields(mem_item) + if mem_id and mem_memory: + kb_log_content.append( + { + "log_source": "KNOWLEDGE_BASE_LOG", + "trigger_source": "Feedback", + "operation": "ADD", + "memory_id": mem_id, + "content": mem_memory, + "original_content": None, + "source_doc_id": None, + } + ) + else: + logger.warning( + "Skipping malformed feedback add item. user_id=%s mem_cube_id=%s task_id=%s item=%s", + user_id, + mem_cube_id, + task_id, + mem_item, + stack_info=True, + ) + for mem_item in update_records or []: + mem_id, mem_memory, original_content = _extract_fields(mem_item) if mem_id and mem_memory: - feedback_content.append( + kb_log_content.append( { + "log_source": "KNOWLEDGE_BASE_LOG", + "trigger_source": "Feedback", + "operation": "UPDATE", + "memory_id": mem_id, "content": mem_memory, - "id": mem_id, + "original_content": original_content, + "source_doc_id": None, } ) else: logger.warning( - "Skipping malformed mem_item in feedback_result. user_id=%s mem_cube_id=%s task_id=%s item=%s", + "Skipping malformed feedback update item. user_id=%s mem_cube_id=%s task_id=%s item=%s", user_id, mem_cube_id, task_id, @@ -632,30 +677,39 @@ def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) -> stack_info=True, ) - if not feedback_content: + if kb_log_content: + logger.info( + "[DIAGNOSTIC] general_scheduler._mem_feedback_message_consumer: Creating knowledgeBaseUpdate event for feedback. user_id=%s mem_cube_id=%s task_id=%s items=%s", + user_id, + mem_cube_id, + task_id, + len(kb_log_content), + ) + event = self.create_event_log( + label="knowledgeBaseUpdate", + from_memory_type=USER_INPUT_TYPE, + to_memory_type=LONG_TERM_MEMORY_TYPE, + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=mem_cube, + memcube_log_content=kb_log_content, + metadata=None, + memory_len=len(kb_log_content), + memcube_name=self._map_memcube_name(mem_cube_id), + ) + event.log_content = ( + f"Knowledge Base Memory Update: {len(kb_log_content)} changes." + ) + event.task_id = task_id + self._submit_web_logs([event]) + else: logger.warning( - "No valid feedback content generated from feedback_result. user_id=%s mem_cube_id=%s task_id=%s", + "No valid feedback content generated for web log. user_id=%s mem_cube_id=%s task_id=%s", user_id, mem_cube_id, task_id, stack_info=True, ) - return - - event = self.create_event_log( - label="feedbackMemory", - from_memory_type=USER_INPUT_TYPE, - to_memory_type=LONG_TERM_MEMORY_TYPE, - user_id=user_id, - mem_cube_id=mem_cube_id, - mem_cube=mem_cube, - memcube_log_content=feedback_content, - metadata=[], - memory_len=len(feedback_content), - memcube_name=self._map_memcube_name(mem_cube_id), - ) - event.task_id = task_id - self._submit_web_logs([event]) except Exception as e: logger.error(f"Error processing feedbackMemory message: {e}", exc_info=True)