diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index 2093083e6..c3dba6d8c 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -504,6 +504,8 @@ def process_message(message: ScheduleMessageItem): text_mem=text_mem, user_name=user_name, custom_tags=info.get("custom_tags", None), + task_id=message.task_id, + info=info, ) logger.info( @@ -529,6 +531,8 @@ def _process_memories_with_reader( text_mem: TreeTextMemory, user_name: str, custom_tags: list[str] | None = None, + task_id: str | None = None, + info: dict | None = None, ) -> None: """ Process memories using mem_reader for enhanced memory processing. @@ -540,6 +544,7 @@ def _process_memories_with_reader( text_mem: Text memory instance custom_tags: Optional list of custom tags for memory processing """ + kb_log_content: list[dict] = [] try: # Get the mem_reader from the parent MOSCore if not hasattr(self, "mem_reader") or self.mem_reader is None: @@ -602,6 +607,86 @@ def _process_memories_with_reader( logger.info( f"Added {len(enhanced_mem_ids)} enhanced memories: {enhanced_mem_ids}" ) + + # LOGGING BLOCK START + # This block is replicated from _add_message_consumer to ensure consistent logging + is_cloud_env = ( + os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change" + ) + if is_cloud_env: + # New: Knowledge Base Logging (Cloud Service) + kb_log_content = [] + for item in flattened_memories: + kb_log_content.append( + { + "log_source": "KNOWLEDGE_BASE_LOG", + "trigger_source": info.get("trigger_source", "Messages") + if info + else "Messages", + "operation": "ADD", + "memory_id": item.id, + "content": item.memory, + "original_content": None, + "source_doc_id": getattr(item.metadata, "source_doc_id", None), + } + ) + if kb_log_content: + event = self.create_event_log( + label="knowledgeBaseUpdate", + log_content=f"Knowledge Base Memory Update: {len(kb_log_content)} changes.", + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=self.current_mem_cube, + memcube_log_content=kb_log_content, + metadata=None, + memory_len=len(kb_log_content), + memcube_name=self._map_memcube_name(mem_cube_id), + ) + event.task_id = task_id + self._submit_web_logs([event]) + else: + # Existing: Playground/Default Logging + add_content_legacy: list[dict] = [] + add_meta_legacy: list[dict] = [] + for item_id, item in zip( + enhanced_mem_ids, flattened_memories, strict=False + ): + key = getattr(item.metadata, "key", None) or transform_name_to_key( + name=item.memory + ) + add_content_legacy.append( + {"content": f"{key}: {item.memory}", "ref_id": item_id} + ) + add_meta_legacy.append( + { + "ref_id": item_id, + "id": item_id, + "key": item.metadata.key, + "memory": item.memory, + "memory_type": item.metadata.memory_type, + "status": item.metadata.status, + "confidence": item.metadata.confidence, + "tags": item.metadata.tags, + "updated_at": getattr(item.metadata, "updated_at", None) + or getattr(item.metadata, "update_at", None), + } + ) + if add_content_legacy: + event = self.create_event_log( + label="addMemory", + from_memory_type=USER_INPUT_TYPE, + to_memory_type=LONG_TERM_MEMORY_TYPE, + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=self.current_mem_cube, + memcube_log_content=add_content_legacy, + metadata=add_meta_legacy, + memory_len=len(add_content_legacy), + memcube_name=self._map_memcube_name(mem_cube_id), + ) + event.task_id = task_id + self._submit_web_logs([event]) + # LOGGING BLOCK END else: logger.info("No enhanced memories generated by mem_reader") else: @@ -630,10 +715,45 @@ def _process_memories_with_reader( logger.info("Remove and Refresh Memories") logger.debug(f"Finished add {user_id} memory: {mem_ids}") - except Exception: + except Exception as exc: logger.error( f"Error in _process_memories_with_reader: {traceback.format_exc()}", exc_info=True ) + with contextlib.suppress(Exception): + is_cloud_env = ( + os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change" + ) + if is_cloud_env: + if not kb_log_content: + trigger_source = ( + info.get("trigger_source", "Messages") if info else "Messages" + ) + kb_log_content = [ + { + "log_source": "KNOWLEDGE_BASE_LOG", + "trigger_source": trigger_source, + "operation": "ADD", + "memory_id": mem_id, + "content": None, + "original_content": None, + "source_doc_id": None, + } + for mem_id in mem_ids + ] + event = self.create_event_log( + label="knowledgeBaseUpdate", + log_content=f"Knowledge Base Memory Update failed: {exc!s}", + user_id=user_id, + mem_cube_id=mem_cube_id, + mem_cube=self.current_mem_cube, + memcube_log_content=kb_log_content, + metadata=None, + memory_len=len(kb_log_content), + memcube_name=self._map_memcube_name(mem_cube_id), + ) + event.task_id = task_id + event.status = "failed" + self._submit_web_logs([event]) def _mem_reorganize_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: logger.info(f"Messages {messages} assigned to {MEM_ORGANIZE_LABEL} handler.") diff --git a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py index df3e2055e..c361a77a2 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py +++ b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py @@ -1,5 +1,4 @@ import concurrent -import os import threading import time @@ -15,7 +14,7 @@ from memos.mem_scheduler.schemas.general_schemas import ( DEFAULT_STOP_WAIT, ) -from memos.mem_scheduler.schemas.message_schemas import ScheduleLogForWebItem, ScheduleMessageItem +from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem from memos.mem_scheduler.schemas.task_schemas import RunningTaskItem from memos.mem_scheduler.utils.misc_utils import group_messages_by_user_and_mem_cube from memos.mem_scheduler.utils.status_tracker import TaskStatusTracker @@ -159,20 +158,6 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): ) self.metrics.task_completed(user_id=m.user_id, task_type=m.label) - is_cloud_env = ( - os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change" - ) - if self.submit_web_logs and is_cloud_env: - status_log = ScheduleLogForWebItem( - user_id=task_item.user_id, - mem_cube_id=task_item.mem_cube_id, - item_id=task_item.item_id, - label=m.label, - log_content=f"Task {task_item.item_id} completed successfully for user {task_item.user_id}.", - status="completed", - ) - self.submit_web_logs([status_log]) - # acknowledge redis messages if self.use_redis_queue and self.memos_message_queue is not None: for msg in messages: @@ -211,20 +196,6 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): self._completed_tasks.pop(0) logger.error(f"Task failed: {task_item.get_execution_info()}, Error: {e}") - is_cloud_env = ( - os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") == "memos-memory-change" - ) - if self.submit_web_logs and is_cloud_env: - status_log = ScheduleLogForWebItem( - user_id=task_item.user_id, - mem_cube_id=task_item.mem_cube_id, - item_id=task_item.item_id, - label=m.label, - log_content=f"Task {task_item.item_id} failed for user {task_item.user_id} with error: {e!s}.", - status="failed", - exception=str(e), - ) - self.submit_web_logs([status_log]) raise return wrapped_handler