diff --git a/src/memos/mem_scheduler/schemas/message_schemas.py b/src/memos/mem_scheduler/schemas/message_schemas.py index 8b74995d4..db28f3d71 100644 --- a/src/memos/mem_scheduler/schemas/message_schemas.py +++ b/src/memos/mem_scheduler/schemas/message_schemas.py @@ -157,6 +157,7 @@ class ScheduleLogForWebItem(BaseModel, DictConversionMixin): status: str | None = Field( default=None, description="Completion status of the task (e.g., 'completed', 'failed')" ) + source_doc_id: str | None = Field(default=None, description="Source document ID") def debug_info(self) -> dict[str, Any]: """Return structured debug information for logging purposes.""" diff --git a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py index ca6798726..c4e4a66bd 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py +++ b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py @@ -295,8 +295,20 @@ def _maybe_emit_task_completion( return # messages in one batch can belong to different business task_ids; check each - task_ids = {getattr(msg, "task_id", None) for msg in messages} - task_ids.discard(None) + task_ids = set() + task_id_to_doc_id = {} + + for msg in messages: + tid = getattr(msg, "task_id", None) + if tid: + task_ids.add(tid) + # Try to capture source_doc_id for this task if we haven't already + if tid not in task_id_to_doc_id: + info = msg.info or {} + sid = info.get("source_doc_id") + if sid: + task_id_to_doc_id[tid] = sid + if not task_ids: return @@ -311,6 +323,7 @@ def _maybe_emit_task_completion( return for task_id in task_ids: + source_doc_id = task_id_to_doc_id.get(task_id) status_data = self.status_tracker.get_task_status_by_business_id( business_task_id=task_id, user_id=user_id ) @@ -332,6 +345,7 @@ def _maybe_emit_task_completion( to_memory_type="status", log_content=f"Task {task_id} completed", status="completed", + source_doc_id=source_doc_id, ) self.submit_web_logs(event) @@ -355,6 +369,7 @@ def _maybe_emit_task_completion( to_memory_type="status", log_content=f"Task {task_id} failed: {error_msg}", status="failed", + source_doc_id=source_doc_id, ) self.submit_web_logs(event) except Exception: