diff --git a/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py b/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py
index 6913429c3..ed8171ade 100644
--- a/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py
+++ b/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py
@@ -424,9 +424,14 @@ def ack_message(
         redis_message_id,
         message: ScheduleMessageItem | None,
     ) -> None:
-        stream_key = self.get_stream_key(
-            user_id=user_id, mem_cube_id=mem_cube_id, task_label=task_label
-        )
+        # Prefer the stream key recorded on the message itself (the stream it
+        # was actually read from); fall back to recomputing it from the ids.
+        if message and hasattr(message, "stream_key") and message.stream_key:
+            stream_key = message.stream_key
+        else:
+            stream_key = self.get_stream_key(
+                user_id=user_id, mem_cube_id=mem_cube_id, task_label=task_label
+            )
         # No-op if not connected or message doesn't come from Redis
         if not self._redis_conn:
             logger.debug(
@@ -574,36 +579,26 @@ def _read_new_messages_batch(
         try:
             res_list = pipe.execute()
         except Exception as e:
-            logger.error(f"Pipeline xreadgroup failed: {e}")
-            # Fallback to sequential non-blocking reads
-            res_list = []
-            for stream_key in stream_keys:
-                try:
-                    res = self._redis_conn.xreadgroup(
-                        self.consumer_group,
-                        self.consumer_name,
-                        {stream_key: ">"},
-                        count=stream_quotas.get(stream_key),
-                        block=None,
-                    )
-                except Exception as read_err:
-                    err_msg = str(read_err).lower()
-                    if "nogroup" in err_msg or "no such key" in err_msg:
-                        self._ensure_consumer_group(stream_key=stream_key)
-                        try:
-                            res = self._redis_conn.xreadgroup(
-                                self.consumer_group,
-                                self.consumer_name,
-                                {stream_key: ">"},
-                                count=stream_quotas.get(stream_key),
-                                block=None,
-                            )
-                        except Exception:
-                            res = []
-                    else:
-                        logger.error(f"{read_err}", stack_info=True)
-                        res = []
-                res_list.append(res)
+            err_msg = str(e).lower()
+            if "nogroup" in err_msg or "no such key" in err_msg:
+                # Fallback to sequential non-blocking reads
+                res_list = []
+                for stream_key in stream_keys:
+                    try:
+                        self._ensure_consumer_group(stream_key=stream_key)
+                        res = self._redis_conn.xreadgroup(
+                            self.consumer_group,
+                            self.consumer_name,
+                            {stream_key: ">"},
+                            count=stream_quotas.get(stream_key),
+                            block=None,
+                        )
+                        res_list.append(res)
+                    except Exception:
+                        res_list.append([])
+            else:
+                logger.error(f"Pipeline xreadgroup failed: {e}")
+                res_list = []
 
     out: dict[str, list[tuple[str, list[tuple[str, dict]]]]] = {}
     for stream_key, res in zip(stream_keys, res_list, strict=False):
@@ -707,48 +702,32 @@ def _batch_claim_pending_messages(
         try:
             results = pipe.execute()
         except Exception as e:
-            logger.error(f"Pipeline xautoclaim failed: {e}")
-            # Fallback: attempt sequential xautoclaim for robustness
-            results = []
-            for stream_key, need_count, label in claims_spec:
-                try:
-                    res = self._redis_conn.xautoclaim(
-                        name=stream_key,
-                        groupname=self.consumer_group,
-                        consumername=self.consumer_name,
-                        min_idle_time=self.orchestrator.get_task_idle_min(task_label=label),
-                        start_id="0-0",
-                        count=need_count,
-                        justid=False,
-                    )
-                    results.append(res)
-                except Exception as se:
-                    err_msg = str(se).lower()
-                    if "nogroup" in err_msg or "no such key" in err_msg:
-                        logger.warning(
-                            f"Sequential xautoclaim failed for '{stream_key}': {se}. Retrying with _ensure_consumer_group."
-                        )
-                        with contextlib.suppress(Exception):
-                            self._ensure_consumer_group(stream_key=stream_key)
-                        try:
-                            res = self._redis_conn.xautoclaim(
-                                name=stream_key,
-                                groupname=self.consumer_group,
-                                consumername=self.consumer_name,
-                                min_idle_time=self.orchestrator.get_task_idle_min(task_label=label),
-                                start_id="0-0",
-                                count=need_count,
-                                justid=False,
-                            )
-                            results.append(res)
-                        except Exception as retry_err:
-                            logger.warning(
-                                f"Retry sequential xautoclaim failed for '{stream_key}': {retry_err}"
-                            )
-                            results.append(None)
-                    else:
-                        logger.warning(f"Sequential xautoclaim failed for '{stream_key}': {se}")
-                        results.append(None)
+            err_msg = str(e).lower()
+            if "nogroup" in err_msg or "no such key" in err_msg:
+                # Fallback: attempt sequential xautoclaim for robustness.
+                # results must be (re)initialized here: pipe.execute() raised,
+                # so the assignment in the try block above never happened.
+                results = []
+                for stream_key, need_count, label in claims_spec:
+                    try:
+                        self._ensure_consumer_group(stream_key=stream_key)
+                        res = self._redis_conn.xautoclaim(
+                            name=stream_key,
+                            groupname=self.consumer_group,
+                            consumername=self.consumer_name,
+                            min_idle_time=self.orchestrator.get_task_idle_min(task_label=label),
+                            start_id="0-0",
+                            count=need_count,
+                            justid=False,
+                        )
+                        results.append(res)
+                    except Exception:
+                        # Placeholder keeps results aligned one-to-one with
+                        # claims_spec for the zip() below.
+                        results.append(None)
+            else:
+                logger.error(f"Pipeline xautoclaim failed: {e}")
+                results = []
 
         claimed_pairs: list[tuple[str, list[tuple[str, dict]]]] = []
         for (stream_key, _need_count, _label), claimed_result in zip(