Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 121 additions & 1 deletion src/memos/mem_scheduler/general_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,8 @@ def process_message(message: ScheduleMessageItem):
text_mem=text_mem,
user_name=user_name,
custom_tags=info.get("custom_tags", None),
task_id=message.task_id,
info=info,
)

logger.info(
Expand All @@ -529,6 +531,8 @@ def _process_memories_with_reader(
text_mem: TreeTextMemory,
user_name: str,
custom_tags: list[str] | None = None,
task_id: str | None = None,
info: dict | None = None,
) -> None:
"""
Process memories using mem_reader for enhanced memory processing.
Expand All @@ -540,6 +544,7 @@ def _process_memories_with_reader(
text_mem: Text memory instance
custom_tags: Optional list of custom tags for memory processing
"""
kb_log_content: list[dict] = []
try:
# Get the mem_reader from the parent MOSCore
if not hasattr(self, "mem_reader") or self.mem_reader is None:
Expand Down Expand Up @@ -602,6 +607,86 @@ def _process_memories_with_reader(
logger.info(
f"Added {len(enhanced_mem_ids)} enhanced memories: {enhanced_mem_ids}"
)

# LOGGING BLOCK START
# This block is replicated from _add_message_consumer to ensure consistent logging
is_cloud_env = (
os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change"
)
if is_cloud_env:
# New: Knowledge Base Logging (Cloud Service)
kb_log_content = []
for item in flattened_memories:
kb_log_content.append(
{
"log_source": "KNOWLEDGE_BASE_LOG",
"trigger_source": info.get("trigger_source", "Messages")
if info
else "Messages",
"operation": "ADD",
"memory_id": item.id,
"content": item.memory,
"original_content": None,
"source_doc_id": getattr(item.metadata, "source_doc_id", None),
}
)
if kb_log_content:
event = self.create_event_log(
label="knowledgeBaseUpdate",
log_content=f"Knowledge Base Memory Update: {len(kb_log_content)} changes.",
user_id=user_id,
mem_cube_id=mem_cube_id,
mem_cube=self.current_mem_cube,
memcube_log_content=kb_log_content,
metadata=None,
memory_len=len(kb_log_content),
memcube_name=self._map_memcube_name(mem_cube_id),
)
event.task_id = task_id
self._submit_web_logs([event])
else:
# Existing: Playground/Default Logging
add_content_legacy: list[dict] = []
add_meta_legacy: list[dict] = []
for item_id, item in zip(
enhanced_mem_ids, flattened_memories, strict=False
):
key = getattr(item.metadata, "key", None) or transform_name_to_key(
name=item.memory
)
add_content_legacy.append(
{"content": f"{key}: {item.memory}", "ref_id": item_id}
)
add_meta_legacy.append(
{
"ref_id": item_id,
"id": item_id,
"key": item.metadata.key,
"memory": item.memory,
"memory_type": item.metadata.memory_type,
"status": item.metadata.status,
"confidence": item.metadata.confidence,
"tags": item.metadata.tags,
"updated_at": getattr(item.metadata, "updated_at", None)
or getattr(item.metadata, "update_at", None),
}
)
if add_content_legacy:
event = self.create_event_log(
label="addMemory",
from_memory_type=USER_INPUT_TYPE,
to_memory_type=LONG_TERM_MEMORY_TYPE,
user_id=user_id,
mem_cube_id=mem_cube_id,
mem_cube=self.current_mem_cube,
memcube_log_content=add_content_legacy,
metadata=add_meta_legacy,
memory_len=len(add_content_legacy),
memcube_name=self._map_memcube_name(mem_cube_id),
)
event.task_id = task_id
self._submit_web_logs([event])
# LOGGING BLOCK END
else:
logger.info("No enhanced memories generated by mem_reader")
else:
Expand Down Expand Up @@ -630,10 +715,45 @@ def _process_memories_with_reader(
logger.info("Remove and Refresh Memories")
logger.debug(f"Finished add {user_id} memory: {mem_ids}")

except Exception:
except Exception as exc:
logger.error(
f"Error in _process_memories_with_reader: {traceback.format_exc()}", exc_info=True
)
with contextlib.suppress(Exception):
is_cloud_env = (
os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change"
)
if is_cloud_env:
if not kb_log_content:
trigger_source = (
info.get("trigger_source", "Messages") if info else "Messages"
)
kb_log_content = [
{
"log_source": "KNOWLEDGE_BASE_LOG",
"trigger_source": trigger_source,
"operation": "ADD",
"memory_id": mem_id,
"content": None,
"original_content": None,
"source_doc_id": None,
}
for mem_id in mem_ids
]
event = self.create_event_log(
label="knowledgeBaseUpdate",
log_content=f"Knowledge Base Memory Update failed: {exc!s}",
user_id=user_id,
mem_cube_id=mem_cube_id,
mem_cube=self.current_mem_cube,
memcube_log_content=kb_log_content,
metadata=None,
memory_len=len(kb_log_content),
memcube_name=self._map_memcube_name(mem_cube_id),
)
event.task_id = task_id
event.status = "failed"
self._submit_web_logs([event])

def _mem_reorganize_message_consumer(self, messages: list[ScheduleMessageItem]) -> None:
logger.info(f"Messages {messages} assigned to {MEM_ORGANIZE_LABEL} handler.")
Expand Down
31 changes: 1 addition & 30 deletions src/memos/mem_scheduler/task_schedule_modules/dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import concurrent
import os
import threading
import time

Expand All @@ -15,7 +14,7 @@
from memos.mem_scheduler.schemas.general_schemas import (
DEFAULT_STOP_WAIT,
)
from memos.mem_scheduler.schemas.message_schemas import ScheduleLogForWebItem, ScheduleMessageItem
from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem
from memos.mem_scheduler.schemas.task_schemas import RunningTaskItem
from memos.mem_scheduler.utils.misc_utils import group_messages_by_user_and_mem_cube
from memos.mem_scheduler.utils.status_tracker import TaskStatusTracker
Expand Down Expand Up @@ -159,20 +158,6 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
)
self.metrics.task_completed(user_id=m.user_id, task_type=m.label)

is_cloud_env = (
os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change"
)
if self.submit_web_logs and is_cloud_env:
status_log = ScheduleLogForWebItem(
user_id=task_item.user_id,
mem_cube_id=task_item.mem_cube_id,
item_id=task_item.item_id,
label=m.label,
log_content=f"Task {task_item.item_id} completed successfully for user {task_item.user_id}.",
status="completed",
)
self.submit_web_logs([status_log])

# acknowledge redis messages
if self.use_redis_queue and self.memos_message_queue is not None:
for msg in messages:
Expand Down Expand Up @@ -211,20 +196,6 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
self._completed_tasks.pop(0)
logger.error(f"Task failed: {task_item.get_execution_info()}, Error: {e}")

is_cloud_env = (
os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change"
)
if self.submit_web_logs and is_cloud_env:
status_log = ScheduleLogForWebItem(
user_id=task_item.user_id,
mem_cube_id=task_item.mem_cube_id,
item_id=task_item.item_id,
label=m.label,
log_content=f"Task {task_item.item_id} failed for user {task_item.user_id} with error: {e!s}.",
status="failed",
exception=str(e),
)
self.submit_web_logs([status_log])
raise

return wrapped_handler
Expand Down
Loading