diff --git a/src/memos/api/handlers/chat_handler.py b/src/memos/api/handlers/chat_handler.py index c9e01573a..1054644d2 100644 --- a/src/memos/api/handlers/chat_handler.py +++ b/src/memos/api/handlers/chat_handler.py @@ -894,7 +894,7 @@ def _send_message_to_scheduler( content=query, timestamp=datetime.utcnow(), ) - self.mem_scheduler.memos_message_queue.submit_messages(messages=[message_item]) + self.mem_scheduler.submit_messages(messages=[message_item]) self.logger.info(f"Sent message to scheduler with label: {label}") except Exception as e: self.logger.error(f"Failed to send message to scheduler: {e}", exc_info=True) diff --git a/src/memos/api/handlers/scheduler_handler.py b/src/memos/api/handlers/scheduler_handler.py index 4596889ac..697822a77 100644 --- a/src/memos/api/handlers/scheduler_handler.py +++ b/src/memos/api/handlers/scheduler_handler.py @@ -34,7 +34,9 @@ def handle_scheduler_status( Args: user_id: User ID to query for. status_tracker: The TaskStatusTracker instance. - task_id: Optional Task ID to query a specific task. + task_id: Optional Task ID to query. Can be either: + - business_task_id (will aggregate all related item statuses) + - item_id (will return single item status) Returns: StatusResponse with a list of task statuses. @@ -46,12 +48,22 @@ def handle_scheduler_status( try: if task_id: - task_data = status_tracker.get_task_status(task_id, user_id) - if not task_data: - raise HTTPException( - status_code=404, detail=f"Task {task_id} not found for user {user_id}" + # First try as business_task_id (aggregated query) + business_task_data = status_tracker.get_task_status_by_business_id(task_id, user_id) + if business_task_data: + response_data.append( + StatusResponseItem(task_id=task_id, status=business_task_data["status"]) + ) + else: + # Fallback: try as item_id (single item query) + item_task_data = status_tracker.get_task_status(task_id, user_id) + if not item_task_data: + raise HTTPException( + status_code=404, detail=f"Task {task_id} not found for user {user_id}" + ) + response_data.append( + StatusResponseItem(task_id=task_id, status=item_task_data["status"]) ) - response_data.append(StatusResponseItem(task_id=task_id, status=task_data["status"])) else: all_tasks = status_tracker.get_all_tasks_for_user(user_id) # The plan returns an empty list, which is good. diff --git a/src/memos/api/product_models.py b/src/memos/api/product_models.py index 961b14b6b..5aa617d6e 100644 --- a/src/memos/api/product_models.py +++ b/src/memos/api/product_models.py @@ -258,6 +258,7 @@ class MemoryCreateRequest(BaseRequest): source: str | None = Field(None, description="Source of the memory") user_profile: bool = Field(False, description="User profile memory") session_id: str | None = Field(None, description="Session id") + task_id: str | None = Field(None, description="Task ID for monitoring async tasks") class SearchRequest(BaseRequest): diff --git a/src/memos/api/routers/product_router.py b/src/memos/api/routers/product_router.py index ccacee816..71e384014 100644 --- a/src/memos/api/routers/product_router.py +++ b/src/memos/api/routers/product_router.py @@ -188,9 +188,43 @@ def get_all_memories(memory_req: GetMemoryPlaygroundRequest): @router.post("/add", summary="add a new memory", response_model=SimpleResponse) def create_memory(memory_req: MemoryCreateRequest): """Create a new memory for a specific user.""" + # Initialize status_tracker outside try block to avoid NameError in except blocks + status_tracker = None + try: time_start_add = time.time() mos_product = get_mos_product_instance() + + # Track task if task_id is provided + item_id: str | None = None + if ( + memory_req.task_id + and hasattr(mos_product, "mem_scheduler") + and mos_product.mem_scheduler + ): + from uuid import uuid4 + + from memos.mem_scheduler.utils.status_tracker import TaskStatusTracker + + item_id = str(uuid4()) # Generate a unique item_id for this submission + + # Get Redis client from scheduler + if ( + hasattr(mos_product.mem_scheduler, "redis_client") + and mos_product.mem_scheduler.redis_client + ): + status_tracker = TaskStatusTracker(mos_product.mem_scheduler.redis_client) + # Submit task with "product_add" type + status_tracker.task_submitted( + task_id=item_id, # Use generated item_id for internal tracking + user_id=memory_req.user_id, + task_type="product_add", + mem_cube_id=memory_req.mem_cube_id or memory_req.user_id, + business_task_id=memory_req.task_id, # Use memory_req.task_id as business_task_id + ) + status_tracker.task_started(item_id, memory_req.user_id) # Use item_id here + + # Execute the add operation mos_product.add( user_id=memory_req.user_id, memory_content=memory_req.memory_content, @@ -200,15 +234,27 @@ def create_memory(memory_req: MemoryCreateRequest): source=memory_req.source, user_profile=memory_req.user_profile, session_id=memory_req.session_id, + task_id=memory_req.task_id, ) + + # Mark task as completed + if status_tracker and item_id: + status_tracker.task_completed(item_id, memory_req.user_id) + logger.info( f"time add api : add time user_id: {memory_req.user_id} time is: {time.time() - time_start_add}" ) return SimpleResponse(message="Memory created successfully") except ValueError as err: + # Mark task as failed if tracking + if status_tracker and item_id: + status_tracker.task_failed(item_id, memory_req.user_id, str(err)) raise HTTPException(status_code=404, detail=str(traceback.format_exc())) from err except Exception as err: + # Mark task as failed if tracking + if status_tracker and item_id: + status_tracker.task_failed(item_id, memory_req.user_id, str(err)) logger.error(f"Failed to create memory: {traceback.format_exc()}") raise HTTPException(status_code=500, detail=str(traceback.format_exc())) from err diff --git a/src/memos/mem_os/core.py b/src/memos/mem_os/core.py index f11b3a44c..edf50feb1 100644 --- a/src/memos/mem_os/core.py +++ b/src/memos/mem_os/core.py @@ -687,6 +687,7 @@ def add( mem_cube_id: str | None = None, user_id: str | None = None, session_id: str | None = None, + task_id: str | None = None, # New: Add task_id parameter **kwargs, ) -> None: """ @@ -773,6 +774,7 @@ def process_textual_memory(): label=MEM_READ_LABEL, content=json.dumps(mem_ids), timestamp=datetime.utcnow(), + task_id=task_id, ) self.mem_scheduler.memos_message_queue.submit_messages( messages=[message_item] @@ -784,6 +786,7 @@ def process_textual_memory(): label=ADD_LABEL, content=json.dumps(mem_ids), timestamp=datetime.utcnow(), + task_id=task_id, ) self.mem_scheduler.memos_message_queue.submit_messages( messages=[message_item] diff --git a/src/memos/mem_os/product.py b/src/memos/mem_os/product.py index 9a4ab3f4d..969d42c6e 100644 --- a/src/memos/mem_os/product.py +++ b/src/memos/mem_os/product.py @@ -1499,13 +1499,20 @@ def add( source: str | None = None, user_profile: bool = False, session_id: str | None = None, + task_id: str | None = None, # Add task_id parameter ): """Add memory for a specific user.""" # Load user cubes if not already loaded self._load_user_cubes(user_id, self.default_cube_config) result = super().add( - messages, memory_content, doc_path, mem_cube_id, user_id, session_id=session_id + messages, + memory_content, + doc_path, + mem_cube_id, + user_id, + session_id=session_id, + task_id=task_id, ) if user_profile: try: diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index f641fc442..6f4bf1b88 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -582,6 +582,7 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt user_id=message.user_id, task_type=message.label, mem_cube_id=message.mem_cube_id, + business_task_id=message.task_id, # Pass business task_id if provided ) self.memos_message_queue.submit_messages(messages=messages) diff --git a/src/memos/mem_scheduler/general_modules/scheduler_logger.py b/src/memos/mem_scheduler/general_modules/scheduler_logger.py index c2a5364d7..89cd9b7ba 100644 --- a/src/memos/mem_scheduler/general_modules/scheduler_logger.py +++ b/src/memos/mem_scheduler/general_modules/scheduler_logger.py @@ -49,7 +49,7 @@ def create_autofilled_log_item( mem_cube: GeneralMemCube, ) -> ScheduleLogForWebItem: text_mem_base: TreeTextMemory = mem_cube.text_mem - current_memory_sizes = text_mem_base.get_current_memory_size() + current_memory_sizes = text_mem_base.get_current_memory_size(user_name=mem_cube_id) current_memory_sizes = { "long_term_memory_size": current_memory_sizes.get("LongTermMemory", 0), "user_memory_size": current_memory_sizes.get("UserMemory", 0), diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index ac2ea2bfa..2093083e6 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -1,6 +1,7 @@ import concurrent.futures import contextlib import json +import os import traceback from memos.configs.mem_scheduler import GeneralSchedulerConfig @@ -19,7 +20,6 @@ PREF_ADD_LABEL, QUERY_LABEL, USER_INPUT_TYPE, - WORKING_MEMORY_TYPE, ) from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem from memos.mem_scheduler.schemas.monitor_schemas import QueryMonitorItem @@ -252,6 +252,7 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: if not batch: continue + # Process each message in the batch for msg in batch: try: userinput_memory_ids = json.loads(msg.content) @@ -259,102 +260,211 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: logger.error(f"Error: {e}. Content: {msg.content}", exc_info=True) userinput_memory_ids = [] - mem_items: list[TextualMemoryItem] = [] + # Prepare data for both logging paths, fetching original content for updates + prepared_add_items = [] + prepared_update_items_with_original = [] + for memory_id in userinput_memory_ids: try: + # This mem_item represents the NEW content that was just added/processed mem_item: TextualMemoryItem = self.current_mem_cube.text_mem.get( memory_id=memory_id ) - mem_items.append(mem_item) + # Check if a memory with the same key already exists (determining if it's an update) + key = getattr( + mem_item.metadata, "key", None + ) or transform_name_to_key(name=mem_item.memory) + exists = False + original_content = None + original_item_id = None + + # Only check graph_store if a key exists and the text_mem has a graph_store + if key and hasattr(self.current_mem_cube.text_mem, "graph_store"): + candidates = ( + self.current_mem_cube.text_mem.graph_store.get_by_metadata( + [ + {"field": "key", "op": "=", "value": key}, + { + "field": "memory_type", + "op": "=", + "value": mem_item.metadata.memory_type, + }, + ] + ) + ) + if candidates: + exists = True + original_item_id = candidates[0] + # Crucial step: Fetch the original content for updates + # This `get` is for the *existing* memory that will be updated + original_mem_item = self.current_mem_cube.text_mem.get( + memory_id=original_item_id + ) + original_content = original_mem_item.memory + + if exists: + prepared_update_items_with_original.append( + { + "new_item": mem_item, + "original_content": original_content, + "original_item_id": original_item_id, + } + ) + else: + prepared_add_items.append(mem_item) + except Exception: logger.warning( - f"This MemoryItem {memory_id} has already been deleted." + f"This MemoryItem {memory_id} has already been deleted or an error occurred during preparation." ) continue - add_content: list[dict] = [] - add_meta: list[dict] = [] - update_content: list[dict] = [] - update_meta: list[dict] = [] - for mem_item in mem_items: - if mem_item.metadata.memory_type == WORKING_MEMORY_TYPE: - continue - key = getattr(mem_item.metadata, "key", None) or transform_name_to_key( - name=mem_item.memory - ) - exists = False - try: - text_mem = self.current_mem_cube.text_mem - if key and hasattr(text_mem, "graph_store"): - candidates = text_mem.graph_store.get_by_metadata( - [ - {"field": "memory", "op": "=", "value": key}, - { - "field": "memory_type", - "op": "=", - "value": mem_item.metadata.memory_type, - }, - ] - ) - exists = bool(candidates) - except Exception: - exists = False - payload = { - "content": f"{key}: {mem_item.memory}", - "ref_id": mem_item.id, - } - meta_dict = { - "ref_id": mem_item.id, - "id": mem_item.id, - "key": mem_item.metadata.key, - "memory": mem_item.memory, - "memory_type": mem_item.metadata.memory_type, - "status": mem_item.metadata.status, - "confidence": mem_item.metadata.confidence, - "tags": mem_item.metadata.tags, - "updated_at": getattr(mem_item.metadata, "updated_at", None) - or getattr(mem_item.metadata, "update_at", None), - } - if exists: - update_content.append(payload) - update_meta.append(meta_dict) - else: - add_content.append(payload) - add_meta.append(meta_dict) - - events = [] - if add_content: - events.append( - self.create_event_log( + # Conditional Logging: Knowledge Base (Cloud Service) vs. Playground/Default + is_cloud_env = ( + os.getenv("MEMSCHEDULER_RABBITMQ_EXCHANGE_NAME") + == "memos-memory-change" + ) + + if is_cloud_env: + # New: Knowledge Base Logging (Cloud Service) + kb_log_content = [] + for item in prepared_add_items: + kb_log_content.append( + { + "log_source": "KNOWLEDGE_BASE_LOG", + "trigger_source": msg.info.get("trigger_source", "Messages") + if msg.info + else "Messages", # Assuming msg.info is available and contains trigger_source + "operation": "ADD", + "memory_id": item.id, + "content": item.memory, + "original_content": None, + "source_doc_id": getattr( + item.metadata, "source_doc_id", None + ), + } + ) + for item_data in prepared_update_items_with_original: + new_item = item_data["new_item"] + kb_log_content.append( + { + "log_source": "KNOWLEDGE_BASE_LOG", + "trigger_source": msg.info.get("trigger_source", "Messages") + if msg.info + else "Messages", + "operation": "UPDATE", + "memory_id": new_item.id, + "content": new_item.memory, + "original_content": item_data[ + "original_content" + ], # Now correctly fetched + "source_doc_id": getattr( + new_item.metadata, "source_doc_id", None + ), + } + ) + + if kb_log_content: + event = self.create_event_log( + label="knowledgeBaseUpdate", + log_content=f"Knowledge Base Memory Update: {len(kb_log_content)} changes.", + user_id=msg.user_id, + mem_cube_id=msg.mem_cube_id, + mem_cube=self.current_mem_cube, + memcube_log_content=kb_log_content, + metadata=None, # Per design doc for KB logs + memory_len=len(kb_log_content), + memcube_name=self._map_memcube_name(msg.mem_cube_id), + ) + event.task_id = msg.task_id + self._submit_web_logs([event]) + else: + # Existing: Playground/Default Logging + # Reconstruct add_content/add_meta/update_content/update_meta from prepared_items + # This ensures existing logging path continues to work with pre-existing data structures + add_content_legacy: list[dict] = [] + add_meta_legacy: list[dict] = [] + update_content_legacy: list[dict] = [] + update_meta_legacy: list[dict] = [] + + for item in prepared_add_items: + key = getattr(item.metadata, "key", None) or transform_name_to_key( + name=item.memory + ) + add_content_legacy.append( + {"content": f"{key}: {item.memory}", "ref_id": item.id} + ) + add_meta_legacy.append( + { + "ref_id": item.id, + "id": item.id, + "key": item.metadata.key, + "memory": item.memory, + "memory_type": item.metadata.memory_type, + "status": item.metadata.status, + "confidence": item.metadata.confidence, + "tags": item.metadata.tags, + "updated_at": getattr(item.metadata, "updated_at", None) + or getattr(item.metadata, "update_at", None), + } + ) + + for item_data in prepared_update_items_with_original: + item = item_data["new_item"] + key = getattr(item.metadata, "key", None) or transform_name_to_key( + name=item.memory + ) + update_content_legacy.append( + {"content": f"{key}: {item.memory}", "ref_id": item.id} + ) + update_meta_legacy.append( + { + "ref_id": item.id, + "id": item.id, + "key": item.metadata.key, + "memory": item.memory, + "memory_type": item.metadata.memory_type, + "status": item.metadata.status, + "confidence": item.metadata.confidence, + "tags": item.metadata.tags, + "updated_at": getattr(item.metadata, "updated_at", None) + or getattr(item.metadata, "update_at", None), + } + ) + + events = [] + if add_content_legacy: + event = self.create_event_log( label="addMemory", from_memory_type=USER_INPUT_TYPE, to_memory_type=LONG_TERM_MEMORY_TYPE, user_id=msg.user_id, mem_cube_id=msg.mem_cube_id, mem_cube=self.current_mem_cube, - memcube_log_content=add_content, - metadata=add_meta, - memory_len=len(add_content), + memcube_log_content=add_content_legacy, + metadata=add_meta_legacy, + memory_len=len(add_content_legacy), memcube_name=self._map_memcube_name(msg.mem_cube_id), ) - ) - if update_content: - events.append( - self.create_event_log( + event.task_id = msg.task_id + events.append(event) + if update_content_legacy: + event = self.create_event_log( label="updateMemory", from_memory_type=LONG_TERM_MEMORY_TYPE, to_memory_type=LONG_TERM_MEMORY_TYPE, user_id=msg.user_id, mem_cube_id=msg.mem_cube_id, mem_cube=self.current_mem_cube, - memcube_log_content=update_content, - metadata=update_meta, - memory_len=len(update_content), + memcube_log_content=update_content_legacy, + metadata=update_meta_legacy, + memory_len=len(update_content_legacy), memcube_name=self._map_memcube_name(msg.mem_cube_id), ) - ) - if events: - self._submit_web_logs(events) + event.task_id = msg.task_id + events.append(event) + if events: + self._submit_web_logs(events) except Exception as e: logger.error(f"Error: {e}", exc_info=True) @@ -526,7 +636,7 @@ def _process_memories_with_reader( ) def _mem_reorganize_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: - logger.info(f"Messages {messages} assigned to {MEM_READ_LABEL} handler.") + logger.info(f"Messages {messages} assigned to {MEM_ORGANIZE_LABEL} handler.") def process_message(message: ScheduleMessageItem): try: @@ -542,7 +652,7 @@ def process_message(message: ScheduleMessageItem): return logger.info( - f"Processing mem_read for user_id={user_id}, mem_cube_id={mem_cube_id}, mem_ids={mem_ids}" + f"Processing mem_reorganize for user_id={user_id}, mem_cube_id={mem_cube_id}, mem_ids={mem_ids}" ) # Get the text memory from the mem_cube @@ -685,11 +795,11 @@ def process_message(message: ScheduleMessageItem): self._submit_web_logs([event]) logger.info( - f"Successfully processed mem_read for user_id={user_id}, mem_cube_id={mem_cube_id}" + f"Successfully processed mem_reorganize for user_id={user_id}, mem_cube_id={mem_cube_id}" ) except Exception as e: - logger.error(f"Error processing mem_read message: {e}", exc_info=True) + logger.error(f"Error processing mem_reorganize message: {e}", exc_info=True) with ContextThreadPoolExecutor(max_workers=min(8, len(messages))) as executor: futures = [executor.submit(process_message, msg) for msg in messages] @@ -748,7 +858,8 @@ def _process_memories_with_reorganize( except Exception: logger.error( - f"Error in _process_memories_with_reader: {traceback.format_exc()}", exc_info=True + f"Error in _process_memories_with_reorganize: {traceback.format_exc()}", + exc_info=True, ) def _pref_add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: diff --git a/src/memos/mem_scheduler/schemas/message_schemas.py b/src/memos/mem_scheduler/schemas/message_schemas.py index 2f406e216..87738671c 100644 --- a/src/memos/mem_scheduler/schemas/message_schemas.py +++ b/src/memos/mem_scheduler/schemas/message_schemas.py @@ -33,7 +33,6 @@ class ScheduleMessageItem(BaseModel, DictConversionMixin): item_id: str = Field(description="uuid", default_factory=lambda: str(uuid4())) - task_id: str | None = Field(default=None, description="Parent task ID, if applicable") redis_message_id: str = Field(default="", description="the message get from redis stream") user_id: str = Field(..., description="user id") mem_cube_id: str = Field(..., description="memcube id") @@ -48,6 +47,10 @@ class ScheduleMessageItem(BaseModel, DictConversionMixin): description="user name / display name (optional)", ) info: dict | None = Field(default=None, description="user custom info") + task_id: str | None = Field( + default=None, + description="Optional business-level task ID. Multiple items can share the same task_id.", + ) # Pydantic V2 model configuration model_config = ConfigDict( diff --git a/src/memos/mem_scheduler/utils/status_tracker.py b/src/memos/mem_scheduler/utils/status_tracker.py index 98d4c6a3f..9a8fa53df 100644 --- a/src/memos/mem_scheduler/utils/status_tracker.py +++ b/src/memos/mem_scheduler/utils/status_tracker.py @@ -19,7 +19,28 @@ def __init__(self, redis_client: "redis.Redis"): def _get_key(self, user_id: str) -> str: return f"memos:task_meta:{user_id}" - def task_submitted(self, task_id: str, user_id: str, task_type: str, mem_cube_id: str): + def _get_task_items_key(self, user_id: str, task_id: str) -> str: + """Get Redis key for task_id → [item_id] mapping.""" + return f"memos:task_items:{user_id}:{task_id}" + + def task_submitted( + self, + task_id: str, + user_id: str, + task_type: str, + mem_cube_id: str, + business_task_id: str | None = None, + ): + """ + Submit a new task for tracking. + + Args: + task_id: Internal item_id (UUID) + user_id: User identifier + task_type: Type of task (label) + mem_cube_id: Memory cube identifier + business_task_id: Optional business-level task ID (one task_id can have multiple item_ids) + """ key = self._get_key(user_id) payload = { "status": "waiting", @@ -27,6 +48,15 @@ def task_submitted(self, task_id: str, user_id: str, task_type: str, mem_cube_id "mem_cube_id": mem_cube_id, "submitted_at": datetime.now(timezone.utc).isoformat(), } + + # Add business_task_id to payload if provided + if business_task_id: + payload["business_task_id"] = business_task_id + # Add item_id to the task_id → [item_ids] set + task_items_key = self._get_task_items_key(user_id, business_task_id) + self.redis.sadd(task_items_key, task_id) + self.redis.expire(task_items_key, timedelta(days=7)) + self.redis.hset(key, task_id, json.dumps(payload)) self.redis.expire(key, timedelta(days=7)) @@ -86,3 +116,55 @@ def get_all_tasks_for_user(self, user_id: str) -> dict[str, dict]: key = self._get_key(user_id) all_tasks = self.redis.hgetall(key) return {tid: json.loads(t_data) for tid, t_data in all_tasks.items()} + + def get_task_status_by_business_id(self, business_task_id: str, user_id: str) -> dict | None: + """ + Get aggregated status for a business-level task_id. + + Args: + business_task_id: Business-level task ID + user_id: User identifier + + Returns: + Aggregated status dict with status determined by all item statuses: + - If any item is 'waiting' or 'in_progress' → 'in_progress' + - If all items are 'completed' → 'completed' + - If any item is 'failed' → 'failed' + Returns None if task_id not found. + """ + # Get all item_ids for this task_id + task_items_key = self._get_task_items_key(user_id, business_task_id) + item_ids = self.redis.smembers(task_items_key) + + if not item_ids: + return None + + # Get statuses for all items + key = self._get_key(user_id) + item_statuses = [] + for item_id in item_ids: + item_data_json = self.redis.hget(key, item_id) + if item_data_json: + item_data = json.loads(item_data_json) + item_statuses.append(item_data["status"]) + + if not item_statuses: + return None + + # Aggregate status + if "failed" in item_statuses: + aggregated_status = "failed" + elif "in_progress" in item_statuses or "waiting" in item_statuses: + aggregated_status = "in_progress" + elif all(s == "completed" for s in item_statuses): + aggregated_status = "completed" + else: + # Fallback + aggregated_status = "unknown" + + return { + "status": aggregated_status, + "business_task_id": business_task_id, + "item_count": len(item_ids), + "item_statuses": item_statuses, + } diff --git a/src/memos/multi_mem_cube/single_cube.py b/src/memos/multi_mem_cube/single_cube.py index 0e95ec5fa..92ad1a3c9 100644 --- a/src/memos/multi_mem_cube/single_cube.py +++ b/src/memos/multi_mem_cube/single_cube.py @@ -437,7 +437,7 @@ def _schedule_memory_tasks( user_name=self.cube_id, info=add_req.info, ) - self.mem_scheduler.memos_message_queue.submit_messages(messages=[message_item_read]) + self.mem_scheduler.submit_messages(messages=[message_item_read]) self.logger.info( f"[SingleCubeView] cube={self.cube_id} Submitted async MEM_READ: {json.dumps(mem_ids)}" ) @@ -458,7 +458,7 @@ def _schedule_memory_tasks( timestamp=datetime.utcnow(), user_name=self.cube_id, ) - self.mem_scheduler.memos_message_queue.submit_messages(messages=[message_item_add]) + self.mem_scheduler.submit_messages(messages=[message_item_add]) def _process_pref_mem( self, @@ -489,7 +489,6 @@ def _process_pref_mem( messages_list = [add_req.messages] message_item_pref = ScheduleMessageItem( user_id=add_req.user_id, - task_id=add_req.task_id, session_id=target_session_id, mem_cube_id=self.cube_id, mem_cube=self.naive_mem_cube, @@ -497,8 +496,10 @@ def _process_pref_mem( content=json.dumps(messages_list), timestamp=datetime.utcnow(), info=add_req.info, + user_name=self.cube_id, + task_id=add_req.task_id, ) - self.mem_scheduler.memos_message_queue.submit_messages(messages=[message_item_pref]) + self.mem_scheduler.submit_messages(messages=[message_item_pref]) self.logger.info(f"[SingleCubeView] cube={self.cube_id} Submitted PREF_ADD async") except Exception as e: self.logger.error( diff --git a/src/memos/reranker/factory.py b/src/memos/reranker/factory.py index d2c50ba5e..1440704a6 100644 --- a/src/memos/reranker/factory.py +++ b/src/memos/reranker/factory.py @@ -2,6 +2,7 @@ from __future__ import annotations import json + from typing import TYPE_CHECKING, Any # Import singleton decorator