Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/memos/mem_scheduler/schemas/message_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ class ScheduleLogForWebItem(BaseModel, DictConversionMixin):
status: str | None = Field(
default=None, description="Completion status of the task (e.g., 'completed', 'failed')"
)
source_doc_id: str | None = Field(default=None, description="Source document ID")

def debug_info(self) -> dict[str, Any]:
"""Return structured debug information for logging purposes."""
Expand Down
19 changes: 17 additions & 2 deletions src/memos/mem_scheduler/task_schedule_modules/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,20 @@ def _maybe_emit_task_completion(
return

# messages in one batch can belong to different business task_ids; check each
task_ids = {getattr(msg, "task_id", None) for msg in messages}
task_ids.discard(None)
task_ids = set()
task_id_to_doc_id = {}

for msg in messages:
tid = getattr(msg, "task_id", None)
if tid:
task_ids.add(tid)
# Try to capture source_doc_id for this task if we haven't already
if tid not in task_id_to_doc_id:
info = msg.info or {}
sid = info.get("source_doc_id")
if sid:
task_id_to_doc_id[tid] = sid

if not task_ids:
return

Expand All @@ -311,6 +323,7 @@ def _maybe_emit_task_completion(
return

for task_id in task_ids:
source_doc_id = task_id_to_doc_id.get(task_id)
status_data = self.status_tracker.get_task_status_by_business_id(
business_task_id=task_id, user_id=user_id
)
Expand All @@ -332,6 +345,7 @@ def _maybe_emit_task_completion(
to_memory_type="status",
log_content=f"Task {task_id} completed",
status="completed",
source_doc_id=source_doc_id,
)
self.submit_web_logs(event)

Expand All @@ -355,6 +369,7 @@ def _maybe_emit_task_completion(
to_memory_type="status",
log_content=f"Task {task_id} failed: {error_msg}",
status="failed",
source_doc_id=source_doc_id,
)
self.submit_web_logs(event)
except Exception:
Expand Down