From 071089d1b4212f8eba09320d9e215a0173e8cf16 Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Mon, 8 Dec 2025 19:28:18 +0800 Subject: [PATCH 1/2] feat(monitor): add event_duration_ms and total_duration_ms to MONITOR_EVENT logs - Add duration tracking for enqueue, dequeue, start, and finish events - Handle both standard and retry/timeout scenarios - Preserve existing log fields for backward compatibility --- src/memos/mem_scheduler/base_scheduler.py | 12 +++++++++++- .../task_schedule_modules/dispatcher.py | 4 ++++ .../task_schedule_modules/task_queue.py | 16 ++++++++++++++-- 3 files changed, 29 insertions(+), 3 deletions(-) diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index 8f8ac8b3b..d628b10a8 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -716,7 +716,13 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt # emit enqueue events for consistency for m in immediate_msgs: emit_monitor_event( - "enqueue", m, {"enqueue_ts": to_iso(getattr(m, "timestamp", None))} + "enqueue", + m, + { + "enqueue_ts": to_iso(getattr(m, "timestamp", None)), + "event_duration_ms": 0, + "total_duration_ms": 0, + }, ) # simulate dequeue for immediately dispatched messages so monitor logs stay complete @@ -745,6 +751,8 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt "enqueue_ts": to_iso(enqueue_ts_obj), "dequeue_ts": datetime.fromtimestamp(now, tz=timezone.utc).isoformat(), "queue_wait_ms": queue_wait_ms, + "event_duration_ms": queue_wait_ms, + "total_duration_ms": queue_wait_ms, }, ) self.metrics.task_dequeued(user_id=m.user_id, task_type=m.label) @@ -923,6 +931,8 @@ def _message_consumer(self) -> None: now, tz=timezone.utc ).isoformat(), "queue_wait_ms": queue_wait_ms, + "event_duration_ms": queue_wait_ms, + "total_duration_ms": queue_wait_ms, }, ) self.metrics.task_dequeued(user_id=msg.user_id, task_type=msg.label) diff --git a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py index 928b2f5bd..44365e8ab 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py +++ b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py @@ -185,6 +185,8 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): if isinstance(dequeue_ts, int | float) else None ), + "event_duration_ms": start_delay_ms, + "total_duration_ms": wait_sec * 1000, }, ) @@ -210,6 +212,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): finish_time, tz=timezone.utc ).isoformat(), "exec_duration_ms": duration * 1000, + "event_duration_ms": duration * 1000, "total_duration_ms": self._calc_total_duration_ms( finish_time, getattr(first_msg, "timestamp", None) ), @@ -244,6 +247,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): finish_time, tz=timezone.utc ).isoformat(), "exec_duration_ms": (finish_time - start_time) * 1000, + "event_duration_ms": (finish_time - start_time) * 1000, "error_type": type(e).__name__, "error_msg": str(e), "total_duration_ms": self._calc_total_duration_ms( diff --git a/src/memos/mem_scheduler/task_schedule_modules/task_queue.py b/src/memos/mem_scheduler/task_schedule_modules/task_queue.py index 7dc19d01d..030b2aff0 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/task_queue.py +++ b/src/memos/mem_scheduler/task_schedule_modules/task_queue.py @@ -94,7 +94,11 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt logger.error("Submit empty") elif len(messages) == 1: enqueue_ts = to_iso(getattr(messages[0], "timestamp", None)) - emit_monitor_event("enqueue", messages[0], {"enqueue_ts": enqueue_ts}) + emit_monitor_event( + "enqueue", + messages[0], + {"enqueue_ts": enqueue_ts, "event_duration_ms": 0, "total_duration_ms": 0}, + ) self.memos_message_queue.put(messages[0]) else: user_cube_groups = group_messages_by_user_and_mem_cube(messages) @@ -118,7 +122,15 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt continue enqueue_ts = to_iso(getattr(message, "timestamp", None)) - emit_monitor_event("enqueue", message, {"enqueue_ts": enqueue_ts}) + emit_monitor_event( + "enqueue", + message, + { + "enqueue_ts": enqueue_ts, + "event_duration_ms": 0, + "total_duration_ms": 0, + }, + ) self.memos_message_queue.put(message) logger.info( f"Submitted message to local queue: {message.label} - {message.content}" From 571994c4f8239eb2e9307c1bd42e932dfecb2aac Mon Sep 17 00:00:00 2001 From: "glin1993@outlook.com" <> Date: Mon, 8 Dec 2025 19:52:21 +0800 Subject: [PATCH 2/2] fix(monitor): improve duration calculation accuracy and robustness - Use start_time for start event duration calculation to ensure consistency - Add timestamp backfill for single message submission in task queue - Ensure robust handling of missing timestamps --- src/memos/mem_scheduler/task_schedule_modules/dispatcher.py | 2 +- src/memos/mem_scheduler/task_schedule_modules/task_queue.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py index 44365e8ab..e3ce0d4e9 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py +++ b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py @@ -186,7 +186,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): else None ), "event_duration_ms": start_delay_ms, - "total_duration_ms": wait_sec * 1000, + "total_duration_ms": self._calc_total_duration_ms(start_time, enq_ts), }, ) diff --git a/src/memos/mem_scheduler/task_schedule_modules/task_queue.py b/src/memos/mem_scheduler/task_schedule_modules/task_queue.py index 030b2aff0..a01bc3fce 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/task_queue.py +++ b/src/memos/mem_scheduler/task_schedule_modules/task_queue.py @@ -93,6 +93,8 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt if len(messages) < 1: logger.error("Submit empty") elif len(messages) == 1: + if getattr(messages[0], "timestamp", None) is None: + messages[0].timestamp = get_utc_now() enqueue_ts = to_iso(getattr(messages[0], "timestamp", None)) emit_monitor_event( "enqueue",