Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 117 additions & 15 deletions src/memos/mem_scheduler/general_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,63 @@ def send_add_log_messages_to_local_env(
if events:
self._submit_web_logs(events, additional_log_info="send_add_log_messages_to_cloud_env")

def send_add_log_messages_to_cloud_env(
self, msg: ScheduleMessageItem, prepared_add_items, prepared_update_items_with_original
):
"""
Cloud logging path for add/update events.
"""
kb_log_content: list[dict] = []
info = msg.info or {}
# Process added items
for item in prepared_add_items:
kb_log_content.append(
{
"log_source": "KNOWLEDGE_BASE_LOG",
"trigger_source": info.get("trigger_source", "Messages"),
"operation": "ADD",
"memory_id": item.id,
"content": item.memory,
"original_content": None,
"source_doc_id": getattr(item.metadata, "source_doc_id", None),
}
)

# Process updated items
for item_data in prepared_update_items_with_original:
item = item_data["new_item"]
kb_log_content.append(
{
"log_source": "KNOWLEDGE_BASE_LOG",
"trigger_source": info.get("trigger_source", "Messages"),
"operation": "UPDATE",
"memory_id": item.id,
"content": item.memory,
"original_content": item_data.get("original_content"),
"source_doc_id": getattr(item.metadata, "source_doc_id", None),
}
)

if kb_log_content:
logger.info(
f"[DIAGNOSTIC] general_scheduler.send_add_log_messages_to_cloud_env: Creating event log for KB update. Label: knowledgeBaseUpdate, user_id: {msg.user_id}, mem_cube_id: {msg.mem_cube_id}, task_id: {msg.task_id}. KB content: {json.dumps(kb_log_content, indent=2)}"
)
event = self.create_event_log(
label="knowledgeBaseUpdate",
from_memory_type=USER_INPUT_TYPE,
to_memory_type=LONG_TERM_MEMORY_TYPE,
user_id=msg.user_id,
mem_cube_id=msg.mem_cube_id,
mem_cube=self.current_mem_cube,
memcube_log_content=kb_log_content,
metadata=None,
memory_len=len(kb_log_content),
memcube_name=self._map_memcube_name(msg.mem_cube_id),
)
event.log_content = f"Knowledge Base Memory Update: {len(kb_log_content)} changes."
event.task_id = msg.task_id
self._submit_web_logs([event])

def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
logger.info(f"Messages {messages} assigned to {ADD_LABEL} handler.")
# Process the query in a session turn
Expand Down Expand Up @@ -502,28 +559,40 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:

def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
try:
if not messages:
return
message = messages[0]
mem_cube = self.current_mem_cube

user_id = message.user_id
mem_cube_id = message.mem_cube_id
content = message.content

feedback_data = json.loads(content)
try:
feedback_data = json.loads(content) if isinstance(content, str) else content
if not isinstance(feedback_data, dict):
logger.error(
f"Failed to decode feedback_data or it is not a dict: {feedback_data}"
)
return
except json.JSONDecodeError:
logger.error(f"Invalid JSON content for feedback message: {content}", exc_info=True)
return

task_id = feedback_data.get("task_id") or message.task_id
feedback_result = self.feedback_server.process_feedback(
user_id=user_id,
user_name=mem_cube_id,
session_id=feedback_data["session_id"],
chat_history=feedback_data["history"],
retrieved_memory_ids=feedback_data["retrieved_memory_ids"],
feedback_content=feedback_data["feedback_content"],
feedback_time=feedback_data["feedback_time"],
task_id=feedback_data["task_id"],
session_id=feedback_data.get("session_id"),
chat_history=feedback_data.get("history", []),
retrieved_memory_ids=feedback_data.get("retrieved_memory_ids", []),
feedback_content=feedback_data.get("feedback_content"),
feedback_time=feedback_data.get("feedback_time"),
task_id=task_id,
)

logger.info(
f"Successfully feedback memories for user_id={user_id}, mem_cube_id={mem_cube_id}"
f"Successfully processed feedback for user_id={user_id}, mem_cube_id={mem_cube_id}"
)

should_send_log = (
Expand All @@ -533,13 +602,46 @@ def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) ->
)
if feedback_result and should_send_log:
feedback_content = []
for _i, mem_item in enumerate(feedback_result):
feedback_content.append(
{
"content": mem_item.memory,
"id": mem_item["id"],
}
for mem_item in feedback_result:
# Safely access attributes, assuming mem_item could be dict or object
mem_id = (
getattr(mem_item, "id", None) or mem_item.get("id")
if isinstance(mem_item, dict)
else None
)
mem_memory = (
getattr(mem_item, "memory", None) or mem_item.get("memory")
if isinstance(mem_item, dict)
else None
)

if mem_id and mem_memory:
feedback_content.append(
{
"content": mem_memory,
"id": mem_id,
}
)
else:
logger.warning(
"Skipping malformed mem_item in feedback_result. user_id=%s mem_cube_id=%s task_id=%s item=%s",
user_id,
mem_cube_id,
task_id,
mem_item,
stack_info=True,
)

if not feedback_content:
logger.warning(
"No valid feedback content generated from feedback_result. user_id=%s mem_cube_id=%s task_id=%s",
user_id,
mem_cube_id,
task_id,
stack_info=True,
)
return

event = self.create_event_log(
label="feedbackMemory",
from_memory_type=USER_INPUT_TYPE,
Expand All @@ -552,7 +654,7 @@ def _mem_feedback_message_consumer(self, messages: list[ScheduleMessageItem]) ->
memory_len=len(feedback_content),
memcube_name=self._map_memcube_name(mem_cube_id),
)
event.task_id = message.task_id
event.task_id = task_id
self._submit_web_logs([event])

except Exception as e:
Expand Down
Loading