Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 85 additions & 31 deletions src/memos/mem_scheduler/general_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,62 +600,116 @@ def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) ->
and hasattr(self.rabbitmq_config, "exchange_type")
and self.rabbitmq_config.exchange_type == "direct"
)
if feedback_result and should_send_log:
feedback_content = []
for mem_item in feedback_result:
# Safely access attributes, assuming mem_item could be dict or object
if should_send_log:
record = feedback_result.get("record") if isinstance(feedback_result, dict) else {}
add_records = record.get("add") if isinstance(record, dict) else []
update_records = record.get("update") if isinstance(record, dict) else []

def _extract_fields(mem_item):
mem_id = (
getattr(mem_item, "id", None) or mem_item.get("id")
if isinstance(mem_item, dict)
else None
getattr(mem_item, "id", None)
if not isinstance(mem_item, dict)
else mem_item.get("id")
)
mem_memory = (
getattr(mem_item, "memory", None) or mem_item.get("memory")
if isinstance(mem_item, dict)
else None
getattr(mem_item, "memory", None)
if not isinstance(mem_item, dict)
else mem_item.get("memory") or mem_item.get("text")
)
if mem_memory is None and isinstance(mem_item, dict):
mem_memory = mem_item.get("text")
original_content = (
getattr(mem_item, "origin_memory", None)
if not isinstance(mem_item, dict)
else mem_item.get("origin_memory")
or mem_item.get("old_memory")
or mem_item.get("original_content")
)
return mem_id, mem_memory, original_content

kb_log_content: list[dict] = []

for mem_item in add_records or []:
mem_id, mem_memory, _ = _extract_fields(mem_item)
if mem_id and mem_memory:
kb_log_content.append(
{
"log_source": "KNOWLEDGE_BASE_LOG",
"trigger_source": "Feedback",
"operation": "ADD",
"memory_id": mem_id,
"content": mem_memory,
"original_content": None,
"source_doc_id": None,
}
)
else:
logger.warning(
"Skipping malformed feedback add item. user_id=%s mem_cube_id=%s task_id=%s item=%s",
user_id,
mem_cube_id,
task_id,
mem_item,
stack_info=True,
)

for mem_item in update_records or []:
mem_id, mem_memory, original_content = _extract_fields(mem_item)
if mem_id and mem_memory:
feedback_content.append(
kb_log_content.append(
{
"log_source": "KNOWLEDGE_BASE_LOG",
"trigger_source": "Feedback",
"operation": "UPDATE",
"memory_id": mem_id,
"content": mem_memory,
"id": mem_id,
"original_content": original_content,
"source_doc_id": None,
}
)
else:
logger.warning(
"Skipping malformed mem_item in feedback_result. user_id=%s mem_cube_id=%s task_id=%s item=%s",
"Skipping malformed feedback update item. user_id=%s mem_cube_id=%s task_id=%s item=%s",
user_id,
mem_cube_id,
task_id,
mem_item,
stack_info=True,
)

if not feedback_content:
if kb_log_content:
logger.info(
"[DIAGNOSTIC] general_scheduler._mem_feedback_message_consumer: Creating knowledgeBaseUpdate event for feedback. user_id=%s mem_cube_id=%s task_id=%s items=%s",
user_id,
mem_cube_id,
task_id,
len(kb_log_content),
)
event = self.create_event_log(
label="knowledgeBaseUpdate",
from_memory_type=USER_INPUT_TYPE,
to_memory_type=LONG_TERM_MEMORY_TYPE,
user_id=user_id,
mem_cube_id=mem_cube_id,
mem_cube=mem_cube,
memcube_log_content=kb_log_content,
metadata=None,
memory_len=len(kb_log_content),
memcube_name=self._map_memcube_name(mem_cube_id),
)
event.log_content = (
f"Knowledge Base Memory Update: {len(kb_log_content)} changes."
)
event.task_id = task_id
self._submit_web_logs([event])
else:
logger.warning(
"No valid feedback content generated from feedback_result. user_id=%s mem_cube_id=%s task_id=%s",
"No valid feedback content generated for web log. user_id=%s mem_cube_id=%s task_id=%s",
user_id,
mem_cube_id,
task_id,
stack_info=True,
)
return

event = self.create_event_log(
label="feedbackMemory",
from_memory_type=USER_INPUT_TYPE,
to_memory_type=LONG_TERM_MEMORY_TYPE,
user_id=user_id,
mem_cube_id=mem_cube_id,
mem_cube=mem_cube,
memcube_log_content=feedback_content,
metadata=[],
memory_len=len(feedback_content),
memcube_name=self._map_memcube_name(mem_cube_id),
)
event.task_id = task_id
self._submit_web_logs([event])

except Exception as e:
logger.error(f"Error processing feedbackMemory message: {e}", exc_info=True)
Expand Down
Loading