diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index 8f8ac8b3b..d628b10a8 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -716,7 +716,13 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt # emit enqueue events for consistency for m in immediate_msgs: emit_monitor_event( - "enqueue", m, {"enqueue_ts": to_iso(getattr(m, "timestamp", None))} + "enqueue", + m, + { + "enqueue_ts": to_iso(getattr(m, "timestamp", None)), + "event_duration_ms": 0, + "total_duration_ms": 0, + }, ) # simulate dequeue for immediately dispatched messages so monitor logs stay complete @@ -745,6 +751,8 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt "enqueue_ts": to_iso(enqueue_ts_obj), "dequeue_ts": datetime.fromtimestamp(now, tz=timezone.utc).isoformat(), "queue_wait_ms": queue_wait_ms, + "event_duration_ms": queue_wait_ms, + "total_duration_ms": queue_wait_ms, }, ) self.metrics.task_dequeued(user_id=m.user_id, task_type=m.label) @@ -923,6 +931,8 @@ def _message_consumer(self) -> None: now, tz=timezone.utc ).isoformat(), "queue_wait_ms": queue_wait_ms, + "event_duration_ms": queue_wait_ms, + "total_duration_ms": queue_wait_ms, }, ) self.metrics.task_dequeued(user_id=msg.user_id, task_type=msg.label) diff --git a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py index 928b2f5bd..e3ce0d4e9 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py +++ b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py @@ -185,6 +185,8 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): if isinstance(dequeue_ts, int | float) else None ), + "event_duration_ms": start_delay_ms, + "total_duration_ms": self._calc_total_duration_ms(start_time, enq_ts), }, ) @@ -210,6 +212,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): finish_time, tz=timezone.utc ).isoformat(), "exec_duration_ms": duration * 1000, + "event_duration_ms": duration * 1000, "total_duration_ms": self._calc_total_duration_ms( finish_time, getattr(first_msg, "timestamp", None) ), @@ -244,6 +247,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): finish_time, tz=timezone.utc ).isoformat(), "exec_duration_ms": (finish_time - start_time) * 1000, + "event_duration_ms": (finish_time - start_time) * 1000, "error_type": type(e).__name__, "error_msg": str(e), "total_duration_ms": self._calc_total_duration_ms( diff --git a/src/memos/mem_scheduler/task_schedule_modules/task_queue.py b/src/memos/mem_scheduler/task_schedule_modules/task_queue.py index 7dc19d01d..a01bc3fce 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/task_queue.py +++ b/src/memos/mem_scheduler/task_schedule_modules/task_queue.py @@ -93,8 +93,14 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt if len(messages) < 1: logger.error("Submit empty") elif len(messages) == 1: + if getattr(messages[0], "timestamp", None) is None: + messages[0].timestamp = get_utc_now() enqueue_ts = to_iso(getattr(messages[0], "timestamp", None)) - emit_monitor_event("enqueue", messages[0], {"enqueue_ts": enqueue_ts}) + emit_monitor_event( + "enqueue", + messages[0], + {"enqueue_ts": enqueue_ts, "event_duration_ms": 0, "total_duration_ms": 0}, + ) self.memos_message_queue.put(messages[0]) else: user_cube_groups = group_messages_by_user_and_mem_cube(messages) @@ -118,7 +124,15 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt continue enqueue_ts = to_iso(getattr(message, "timestamp", None)) - emit_monitor_event("enqueue", message, {"enqueue_ts": enqueue_ts}) + emit_monitor_event( + "enqueue", + message, + { + "enqueue_ts": enqueue_ts, + "event_duration_ms": 0, + "total_duration_ms": 0, + }, + ) self.memos_message_queue.put(message) logger.info( f"Submitted message to local queue: {message.label} - {message.content}"