From 4b3fe4b2e3bdbe2b7417a2158f4eee6c76658ebe Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Wed, 3 Dec 2025 18:07:39 +0800 Subject: [PATCH 1/2] Fix dequeue timestamp logging for pydantic models --- src/memos/mem_scheduler/base_scheduler.py | 5 +++-- .../mem_scheduler/task_schedule_modules/dispatcher.py | 8 ++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index 5720939e0..916f3bc98 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -773,7 +773,7 @@ def _message_consumer(self) -> None: for msg in messages: enqueue_ts_obj = getattr(msg, "timestamp", None) enqueue_epoch = None - if isinstance(enqueue_ts_obj, int | float): + if isinstance(enqueue_ts_obj, (int, float)): enqueue_epoch = float(enqueue_ts_obj) elif hasattr(enqueue_ts_obj, "timestamp"): dt = enqueue_ts_obj @@ -785,7 +785,8 @@ def _message_consumer(self) -> None: if enqueue_epoch is not None: queue_wait_ms = max(0.0, now - enqueue_epoch) * 1000 - msg.dequeue_ts = now + # Avoid pydantic attribute enforcement + object.__setattr__(msg, "_dequeue_ts", now) emit_monitor_event( "dequeue", msg, diff --git a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py index a2d01df6b..f1acb30cd 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py +++ b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py @@ -149,7 +149,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): enq_ts = getattr(first_msg, "timestamp", None) # Path 1: epoch seconds (preferred) - if isinstance(enq_ts, int | float): + if isinstance(enq_ts, (int, float)): enq_epoch = float(enq_ts) # Path 2: datetime -> normalize to UTC epoch @@ -166,9 +166,9 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): wait_sec = max(0.0, now - enq_epoch) self.metrics.observe_task_wait_duration(wait_sec, m.user_id, m.label) - dequeue_ts = getattr(first_msg, "dequeue_ts", None) + dequeue_ts = getattr(first_msg, "_dequeue_ts", None) start_delay_ms = None - if isinstance(dequeue_ts, int | float): + if isinstance(dequeue_ts, (int, float)): start_delay_ms = max(0.0, start_time - dequeue_ts) * 1000 emit_monitor_event( @@ -180,7 +180,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): "enqueue_ts": to_iso(enq_ts), "dequeue_ts": to_iso( datetime.fromtimestamp(dequeue_ts, tz=timezone.utc) - if isinstance(dequeue_ts, int | float) + if isinstance(dequeue_ts, (int, float)) else None ), }, From 59447aca662594340af0ad1ab5f99a7bf43c15f3 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Wed, 3 Dec 2025 18:13:20 +0800 Subject: [PATCH 2/2] Address ruff UP038 warnings in monitor events --- src/memos/mem_scheduler/base_scheduler.py | 2 +- src/memos/mem_scheduler/task_schedule_modules/dispatcher.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index 916f3bc98..62e1d0242 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -773,7 +773,7 @@ def _message_consumer(self) -> None: for msg in messages: enqueue_ts_obj = getattr(msg, "timestamp", None) enqueue_epoch = None - if isinstance(enqueue_ts_obj, (int, float)): + if isinstance(enqueue_ts_obj, int | float): enqueue_epoch = float(enqueue_ts_obj) elif hasattr(enqueue_ts_obj, "timestamp"): dt = enqueue_ts_obj diff --git a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py index f1acb30cd..ade2bbfbf 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py +++ b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py @@ -149,7 +149,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): enq_ts = getattr(first_msg, "timestamp", None) # Path 1: epoch seconds (preferred) - if isinstance(enq_ts, (int, float)): + if isinstance(enq_ts, int | float): enq_epoch = float(enq_ts) # Path 2: datetime -> normalize to UTC epoch @@ -168,7 +168,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): dequeue_ts = getattr(first_msg, "_dequeue_ts", None) start_delay_ms = None - if isinstance(dequeue_ts, (int, float)): + if isinstance(dequeue_ts, int | float): start_delay_ms = max(0.0, start_time - dequeue_ts) * 1000 emit_monitor_event( @@ -180,7 +180,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): "enqueue_ts": to_iso(enq_ts), "dequeue_ts": to_iso( datetime.fromtimestamp(dequeue_ts, tz=timezone.utc) - if isinstance(dequeue_ts, (int, float)) + if isinstance(dequeue_ts, int | float) else None ), },