From 182385400a5f1271ec2b27f31f53d0af644d9ad8 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Sun, 7 Dec 2025 16:25:37 +0800 Subject: [PATCH 1/3] feat: Propagate trace_id to scheduled messages and improve context robustness - Propagate from request context to to align logs across asynchronous operations. - Update context getter functions (, , , ) to return default empty/production values instead of for improved robustness. --- src/memos/context/context.py | 10 +++++----- src/memos/mem_scheduler/base_scheduler.py | 7 +++++++ 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/memos/context/context.py b/src/memos/context/context.py index 5c8401732..f8c885ed9 100644 --- a/src/memos/context/context.py +++ b/src/memos/context/context.py @@ -119,8 +119,8 @@ def get_current_api_path() -> str | None: """ context = _request_context.get() if context: - return context.get("api_path") - return None + return context.get("api_path") or "" + return "" def get_current_env() -> str | None: @@ -129,7 +129,7 @@ def get_current_env() -> str | None: """ context = _request_context.get() if context: - return context.get("env") + return context.get("env") or "prod" return "prod" @@ -139,7 +139,7 @@ def get_current_user_type() -> str | None: """ context = _request_context.get() if context: - return context.get("user_type") + return context.get("user_type") or "opensource" return "opensource" @@ -149,7 +149,7 @@ def get_current_user_name() -> str | None: """ context = _request_context.get() if context: - return context.get("user_name") + return context.get("user_name") or "memos" return "memos" diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index 79c28c32c..58765f055 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -16,6 +16,7 @@ ContextThread, RequestContext, get_current_context, + get_current_trace_id, set_request_context, ) from memos.llms.base import BaseLLM @@ -664,10 +665,16 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt if not messages: return + current_trace_id = get_current_trace_id() + immediate_msgs: list[ScheduleMessageItem] = [] queued_msgs: list[ScheduleMessageItem] = [] for msg in messages: + # propagate request trace_id when available so monitor logs align with request logs + if current_trace_id: + msg.trace_id = current_trace_id + # basic metrics and status tracking with suppress(Exception): self.metrics.task_enqueued(user_id=msg.user_id, task_type=msg.label) From 760c881c1e146a30856bf704e673db0d5473efd7 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Sun, 7 Dec 2025 16:34:50 +0800 Subject: [PATCH 2/3] Add scheduler total duration metric and keep context defaults --- src/memos/context/context.py | 10 +++---- .../task_schedule_modules/dispatcher.py | 30 +++++++++++++++++++ 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/src/memos/context/context.py b/src/memos/context/context.py index f8c885ed9..5c8401732 100644 --- a/src/memos/context/context.py +++ b/src/memos/context/context.py @@ -119,8 +119,8 @@ def get_current_api_path() -> str | None: """ context = _request_context.get() if context: - return context.get("api_path") or "" - return "" + return context.get("api_path") + return None def get_current_env() -> str | None: @@ -129,7 +129,7 @@ def get_current_env() -> str | None: """ context = _request_context.get() if context: - return context.get("env") or "prod" + return context.get("env") return "prod" @@ -139,7 +139,7 @@ def get_current_user_type() -> str | None: """ context = _request_context.get() if context: - return context.get("user_type") or "opensource" + return context.get("user_type") return "opensource" @@ -149,7 +149,7 @@ def get_current_user_name() -> str | None: """ context = _request_context.get() if context: - return context.get("user_name") or "memos" + return context.get("user_name") return "memos" diff --git a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py index b32e4588d..581f18d1a 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py +++ b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py @@ -210,6 +210,9 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): finish_time, tz=timezone.utc ).isoformat(), "exec_duration_ms": duration * 1000, + "total_duration_ms": self._calc_total_duration_ms( + finish_time, getattr(first_msg, "timestamp", None) + ), }, ) # Redis ack is handled in finally to cover failure cases @@ -243,6 +246,9 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): "exec_duration_ms": (finish_time - start_time) * 1000, "error_type": type(e).__name__, "error_msg": str(e), + "total_duration_ms": self._calc_total_duration_ms( + finish_time, getattr(m, "timestamp", None) + ), }, ) # Mark task as failed and remove from tracking @@ -423,6 +429,30 @@ def _handle_future_result(self, future): except Exception as e: logger.error(f"Handler execution failed: {e!s}", exc_info=True) + @staticmethod + def _calc_total_duration_ms(finish_epoch: float, enqueue_ts) -> float | None: + """ + Calculate total duration from enqueue timestamp to finish time in milliseconds. + """ + try: + enq_epoch = None + + if isinstance(enqueue_ts, (int, float)): + enq_epoch = float(enqueue_ts) + elif hasattr(enqueue_ts, "timestamp"): + dt = enqueue_ts + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + enq_epoch = dt.timestamp() + + if enq_epoch is None: + return None + + total_ms = max(0.0, finish_epoch - enq_epoch) * 1000 + return total_ms + except Exception: + return None + def execute_task( self, user_id: str, From 8ad47c546f199f4d2fb0d6914ed47e2fc52266a2 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Sun, 7 Dec 2025 17:14:39 +0800 Subject: [PATCH 3/3] Fix: Ruff UP038 in dispatcher.py --- src/memos/mem_scheduler/task_schedule_modules/dispatcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py index 581f18d1a..ab67c683f 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py +++ b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py @@ -437,7 +437,7 @@ def _calc_total_duration_ms(finish_epoch: float, enqueue_ts) -> float | None: try: enq_epoch = None - if isinstance(enqueue_ts, (int, float)): + if isinstance(enqueue_ts, int | float): enq_epoch = float(enqueue_ts) elif hasattr(enqueue_ts, "timestamp"): dt = enqueue_ts