diff --git a/src/memos/mem_feedback/feedback.py b/src/memos/mem_feedback/feedback.py index 8df04333c..e0fd6cc77 100644 --- a/src/memos/mem_feedback/feedback.py +++ b/src/memos/mem_feedback/feedback.py @@ -143,7 +143,17 @@ def _pure_add(self, user_name: str, feedback_content: str, feedback_time: str, i return { "record": { "add": [ - {"id": _id, "text": added_mem.memory} + { + "id": _id, + "text": added_mem.memory, + "source_doc_id": ( + added_mem.metadata.file_ids[0] + if hasattr(added_mem.metadata, "file_ids") + and isinstance(added_mem.metadata.file_ids, list) + and added_mem.metadata.file_ids + else None + ), + } for _id, added_mem in zip(added_ids, to_add_memories, strict=False) ], "update": [], @@ -230,7 +240,17 @@ def _single_add_operation( ) logger.info(f"[Memory Feedback ADD] memory id: {added_ids!s}") - return {"id": added_ids[0], "text": to_add_memory.memory} + return { + "id": added_ids[0], + "text": to_add_memory.memory, + "source_doc_id": ( + to_add_memory.metadata.file_ids[0] + if hasattr(to_add_memory.metadata, "file_ids") + and isinstance(to_add_memory.metadata.file_ids, list) + and to_add_memory.metadata.file_ids + else None + ), + } def _single_update_operation( self, @@ -239,11 +259,22 @@ def _single_update_operation( user_id: str, user_name: str, async_mode: str = "sync", + operation: dict | None = None, ) -> dict: """ Individual update operations """ memory_type = old_memory_item.metadata.memory_type + source_doc_id = ( + old_memory_item.metadata.file_ids[0] + if hasattr(old_memory_item.metadata, "file_ids") + and isinstance(old_memory_item.metadata.file_ids, list) + and old_memory_item.metadata.file_ids + else None + ) + if operation and "text" in operation and operation["text"]: + new_memory_item.memory = operation["text"] + if memory_type == "WorkingMemory": fields = { "memory": new_memory_item.memory, @@ -274,6 +305,7 @@ def _single_update_operation( return { "id": item_id, "text": new_memory_item.memory, + "source_doc_id": source_doc_id, "archived_id": old_memory_item.id, "origin_memory": old_memory_item.memory, } @@ -417,6 +449,7 @@ def semantics_feedback( memory_item, user_id, user_name, + operation=op, ) future_to_op[future] = ("update", op) diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index bd7fb202d..6256467ba 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -644,8 +644,8 @@ def _extract_fields(mem_item): or mem_item.get("original_content") ) source_doc_id = None - if "archived_id" in mem_item: - source_doc_id = mem_item.get("archived_id") + if isinstance(mem_item, dict): + source_doc_id = mem_item.get("source_doc_id", None) return mem_id, mem_memory, original_content, source_doc_id @@ -699,6 +699,7 @@ def _extract_fields(mem_item): stack_info=True, ) + logger.info(f"[Feedback Scheduler] kb_log_content: {kb_log_content!s}") if kb_log_content: logger.info( "[DIAGNOSTIC] general_scheduler._mem_feedback_message_consumer: Creating knowledgeBaseUpdate event for feedback. user_id=%s mem_cube_id=%s task_id=%s items=%s",