diff --git a/src/memos/mem_scheduler/general_scheduler.py b/src/memos/mem_scheduler/general_scheduler.py index 86718ec82..25d0461c2 100644 --- a/src/memos/mem_scheduler/general_scheduler.py +++ b/src/memos/mem_scheduler/general_scheduler.py @@ -168,6 +168,17 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: # Process each message in the batch for msg in batch: + if ( + not (msg.user_name or "").strip() + or (msg.user_name or "").strip() == "memosdefault" + ): + logger.warning( + "[AddConsumer] Dropping message with empty or default user_name; user_id=%s mem_cube_id=%s content=%s", + msg.user_id, + msg.mem_cube_id, + msg.content, + ) + continue prepared_add_items, prepared_update_items_with_original = ( self.log_add_messages(msg=msg) ) @@ -212,6 +223,17 @@ def _query_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: continue try: for msg in batch: + if ( + not (msg.user_name or "").strip() + or (msg.user_name or "").strip() == "memosdefault" + ): + logger.warning( + "[QueryConsumer] Dropping message with empty or default user_name; user_id=%s mem_cube_id=%s content=%s", + msg.user_id, + msg.mem_cube_id, + msg.content, + ) + continue event = self.create_event_log( label="addMessage", from_memory_type=USER_INPUT_TYPE, @@ -257,6 +279,17 @@ def _answer_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: continue try: for msg in batch: + if ( + not (msg.user_name or "").strip() + or (msg.user_name or "").strip() == "memosdefault" + ): + logger.warning( + "[AnswerConsumer] Dropping message with empty or default user_name; user_id=%s mem_cube_id=%s content=%s", + msg.user_id, + msg.mem_cube_id, + msg.content, + ) + continue event = self.create_event_log( label="addMessage", from_memory_type=USER_INPUT_TYPE, @@ -536,6 +569,17 @@ def _add_message_consumer(self, messages: list[ScheduleMessageItem]) -> None: # Process each message in the batch for msg in batch: + if ( + not (msg.user_name or "").strip() + or (msg.user_name or "").strip() == "memosdefault" + ): + logger.warning( + "[AddConsumer] Dropping message with empty or default user_name; user_id=%s mem_cube_id=%s content=%s", + msg.user_id, + msg.mem_cube_id, + msg.content, + ) + continue prepared_add_items, prepared_update_items_with_original = ( self.log_add_messages(msg=msg) ) @@ -738,7 +782,15 @@ def process_message(message: ScheduleMessageItem): return content = message.content - user_name = message.user_name + user_name = (message.user_name or "").strip() + if not user_name or user_name == "memosdefault": + logger.warning( + "[MemRead] Dropping message with empty or default user_name; user_id=%s mem_cube_id=%s mem_ids=%s", + user_id, + mem_cube_id, + content, + ) + return info = message.info or {} # Parse the memory IDs from content @@ -1041,7 +1093,15 @@ def process_message(message: ScheduleMessageItem): ) return content = message.content - user_name = message.user_name + user_name = (message.user_name or "").strip() + if not user_name or user_name == "memosdefault": + logger.warning( + "[MemReorganize] Dropping message with empty or default user_name; user_id=%s mem_cube_id=%s mem_ids=%s", + user_id, + mem_cube_id, + content, + ) + return # Parse the memory IDs from content mem_ids = json.loads(content) if isinstance(content, str) else content @@ -1264,6 +1324,15 @@ def _pref_add_message_consumer(self, messages: list[ScheduleMessageItem]) -> Non def process_message(message: ScheduleMessageItem): try: + user_name = (message.user_name or "").strip() + if not user_name or user_name == "memosdefault": + logger.warning( + "[PrefAdd] Dropping message with empty or default user_name; user_id=%s mem_cube_id=%s content=%s", + message.user_id, + message.mem_cube_id, + message.content, + ) + return mem_cube = self.current_mem_cube if mem_cube is None: logger.warning(