From 623da17ac14502fbdc5553cf272bea58d8df895b Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Tue, 2 Dec 2025 19:12:05 +0800 Subject: [PATCH 1/7] feat: timer add log args --- src/memos/llms/openai.py | 2 +- src/memos/utils.py | 39 +++++++++++++++++++++++++++++++-------- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/src/memos/llms/openai.py b/src/memos/llms/openai.py index 19d7a60fe..c45038e9d 100644 --- a/src/memos/llms/openai.py +++ b/src/memos/llms/openai.py @@ -28,7 +28,7 @@ def __init__(self, config: OpenAILLMConfig): ) logger.info("OpenAI LLM instance initialized") - @timed(log=True, log_prefix="OpenAI LLM") + @timed(log=True, log_prefix="OpenAI LLM", log_args=["model_name_or_path"]) def generate(self, messages: MessageList, **kwargs) -> str: """Generate a response from OpenAI LLM, optionally overriding generation params.""" response = self.client.chat.completions.create( diff --git a/src/memos/utils.py b/src/memos/utils.py index 4b1a59834..a330f0e79 100644 --- a/src/memos/utils.py +++ b/src/memos/utils.py @@ -1,3 +1,4 @@ +import functools import time from memos.log import get_logger @@ -6,20 +7,42 @@ logger = get_logger(__name__) -def timed(func=None, *, log=True, log_prefix=""): - """Decorator to measure and optionally log time of retrieval steps. - - Can be used as @timed or @timed(log=True) +def timed(func=None, *, log=True, log_prefix="", log_args=None): + """ + Parameters: + - log: enable timing logs (default True) + - log_prefix: prefix; falls back to function name + - log_args: names to include in logs (str or list/tuple of str). + Value priority: kwargs → args[0].config. (if available). + Non-string items are ignored. + + Examples: + - @timed(log=True, log_prefix="OpenAI LLM", log_args=["model_name_or_path", "temperature"]) + - @timed(log=True, log_prefix="OpenAI LLM", log_args=["temperature"]) + - @timed() # defaults """ def decorator(fn): + @functools.wraps(fn) def wrapper(*args, **kwargs): start = time.perf_counter() result = fn(*args, **kwargs) - elapsed = time.perf_counter() - start - elapsed_ms = elapsed * 1000.0 - if log: - logger.info(f"[TIMER] {log_prefix or fn.__name__} took {elapsed_ms:.0f} ms") + elapsed_ms = (time.perf_counter() - start) * 1000.0 + ctx_str = "" + + if log is not True: + return result + + if log_args: + ctx_parts = [] + for key in log_args: + val = kwargs.get(key) + ctx_parts.append(f"{key}={val}") + ctx_str = f" [{', '.join(ctx_parts)}]" + logger.info( + f"[TIMER] {log_prefix or fn.__name__} took {elapsed_ms:.0f} ms, args: {ctx_str}" + ) + return result return wrapper From d3d7602e028edf0fbae4ac7ee0a7b5e9ea0ce97a Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Tue, 2 Dec 2025 19:15:12 +0800 Subject: [PATCH 2/7] feat: timer add log args --- src/memos/utils.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/memos/utils.py b/src/memos/utils.py index a330f0e79..bdde53904 100644 --- a/src/memos/utils.py +++ b/src/memos/utils.py @@ -1,4 +1,3 @@ -import functools import time from memos.log import get_logger @@ -23,7 +22,6 @@ def timed(func=None, *, log=True, log_prefix="", log_args=None): """ def decorator(fn): - @functools.wraps(fn) def wrapper(*args, **kwargs): start = time.perf_counter() result = fn(*args, **kwargs) From 51ec395fda1fe327243fd88786d69028168b7b5a Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Tue, 2 Dec 2025 19:56:19 +0800 Subject: [PATCH 3/7] feat: timer add log args --- src/memos/embedders/universal_api.py | 6 +++++- src/memos/reranker/http_bge.py | 4 +++- src/memos/utils.py | 17 ++++++++++++----- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/src/memos/embedders/universal_api.py b/src/memos/embedders/universal_api.py index f39ffaa58..79a5d9ea6 100644 --- a/src/memos/embedders/universal_api.py +++ b/src/memos/embedders/universal_api.py @@ -30,7 +30,11 @@ def __init__(self, config: UniversalAPIEmbedderConfig): else: raise ValueError(f"Embeddings unsupported provider: {self.provider}") - @timed(log=True, log_prefix="model_timed_embedding") + @timed( + log=True, + log_prefix="model_timed_embedding", + log_extra_args={"model_name_or_path": "text-embedding-3-large"}, + ) def embed(self, texts: list[str]) -> list[list[float]]: if self.provider == "openai" or self.provider == "azure": try: diff --git a/src/memos/reranker/http_bge.py b/src/memos/reranker/http_bge.py index 764b53032..29f41e38f 100644 --- a/src/memos/reranker/http_bge.py +++ b/src/memos/reranker/http_bge.py @@ -119,7 +119,9 @@ def __init__( self.warn_unknown_filter_keys = bool(warn_unknown_filter_keys) self._warned_missing_keys: set[str] = set() - @timed(log=True, log_prefix="model_timed_rerank") + @timed( + log=True, log_prefix="model_timed_rerank", log_extra_args={"model_name_or_path": "reranker"} + ) def rerank( self, query: str, diff --git a/src/memos/utils.py b/src/memos/utils.py index bdde53904..6671d88b7 100644 --- a/src/memos/utils.py +++ b/src/memos/utils.py @@ -6,7 +6,7 @@ logger = get_logger(__name__) -def timed(func=None, *, log=True, log_prefix="", log_args=None): +def timed(func=None, *, log=True, log_prefix="", log_args=None, log_extra_args=None): """ Parameters: - log: enable timing logs (default True) @@ -27,19 +27,26 @@ def wrapper(*args, **kwargs): result = fn(*args, **kwargs) elapsed_ms = (time.perf_counter() - start) * 1000.0 ctx_str = "" + ctx_parts = [] if log is not True: return result if log_args: - ctx_parts = [] for key in log_args: val = kwargs.get(key) ctx_parts.append(f"{key}={val}") ctx_str = f" [{', '.join(ctx_parts)}]" - logger.info( - f"[TIMER] {log_prefix or fn.__name__} took {elapsed_ms:.0f} ms, args: {ctx_str}" - ) + + if log_extra_args: + ctx_parts.extend([f"{key}={val}" for key, val in log_extra_args.items()]) + + if ctx_parts: + ctx_str = f" [{', '.join(ctx_parts)}]" + + logger.info( + f"[TIMER] {log_prefix or fn.__name__} took {elapsed_ms:.0f} ms, args: {ctx_str}" + ) return result From 9f67440b46ef9e3073f2d468e52819b0ef531d41 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Wed, 3 Dec 2025 17:06:34 +0800 Subject: [PATCH 4/7] feat: add openai model log --- src/memos/llms/openai.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/memos/llms/openai.py b/src/memos/llms/openai.py index c45038e9d..f4ebf45c7 100644 --- a/src/memos/llms/openai.py +++ b/src/memos/llms/openai.py @@ -55,7 +55,7 @@ def generate(self, messages: MessageList, **kwargs) -> str: return reasoning_content + response_content return response_content - @timed(log=True, log_prefix="OpenAI LLM") + @timed(log=True, log_prefix="OpenAI LLM", log_args=["model_name_or_path"]) def generate_stream(self, messages: MessageList, **kwargs) -> Generator[str, None, None]: """Stream response from OpenAI LLM with optional reasoning support.""" if kwargs.get("tools"): From 934062300085314083c0e391096327ad7f0a8e4c Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Thu, 4 Dec 2025 11:03:21 +0800 Subject: [PATCH 5/7] feat: add timed_with_status --- src/memos/utils.py | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/src/memos/utils.py b/src/memos/utils.py index 6671d88b7..a320ed1cd 100644 --- a/src/memos/utils.py +++ b/src/memos/utils.py @@ -6,19 +6,13 @@ logger = get_logger(__name__) -def timed(func=None, *, log=True, log_prefix="", log_args=None, log_extra_args=None): +def timed_with_status(func=None, *, log=True, log_prefix="", log_args=None, log_extra_args=None): """ Parameters: - log: enable timing logs (default True) - log_prefix: prefix; falls back to function name - log_args: names to include in logs (str or list/tuple of str). - Value priority: kwargs → args[0].config. (if available). - Non-string items are ignored. - - Examples: - - @timed(log=True, log_prefix="OpenAI LLM", log_args=["model_name_or_path", "temperature"]) - - @timed(log=True, log_prefix="OpenAI LLM", log_args=["temperature"]) - - @timed() # defaults + - log_extra_args: extra arguments to include in logs (dict). """ def decorator(fn): @@ -52,6 +46,27 @@ def wrapper(*args, **kwargs): return wrapper + if func is None: + return decorator + return decorator(func) + + +def timed(func=None, *, log=True): + def decorator(fn): + def wrapper(*args, **kwargs): + start = time.perf_counter() + result = fn(*args, **kwargs) + elapsed_ms = (time.perf_counter() - start) * 1000.0 + + if log is not True: + return result + + logger.info(f"[TIMER] {fn.__name__} took {elapsed_ms:.0f} ms") + + return result + + return wrapper + # Handle both @timed and @timed(log=True) cases if func is None: return decorator From 854741d0eaea90e01dfa966cd0987a90e8e9ff23 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Thu, 4 Dec 2025 14:37:04 +0800 Subject: [PATCH 6/7] feat: add openai model log --- src/memos/embedders/universal_api.py | 5 +- src/memos/llms/openai.py | 6 +- src/memos/reranker/http_bge.py | 106 +++++++++++++-------------- src/memos/utils.py | 69 ++++++++++++----- 4 files changed, 105 insertions(+), 81 deletions(-) diff --git a/src/memos/embedders/universal_api.py b/src/memos/embedders/universal_api.py index 79a5d9ea6..886d091d1 100644 --- a/src/memos/embedders/universal_api.py +++ b/src/memos/embedders/universal_api.py @@ -4,7 +4,7 @@ from memos.configs.embedder import UniversalAPIEmbedderConfig from memos.embedders.base import BaseEmbedder from memos.log import get_logger -from memos.utils import timed +from memos.utils import timed_with_status logger = get_logger(__name__) @@ -30,8 +30,7 @@ def __init__(self, config: UniversalAPIEmbedderConfig): else: raise ValueError(f"Embeddings unsupported provider: {self.provider}") - @timed( - log=True, + @timed_with_status( log_prefix="model_timed_embedding", log_extra_args={"model_name_or_path": "text-embedding-3-large"}, ) diff --git a/src/memos/llms/openai.py b/src/memos/llms/openai.py index f4ebf45c7..3259edc1b 100644 --- a/src/memos/llms/openai.py +++ b/src/memos/llms/openai.py @@ -12,7 +12,7 @@ from memos.llms.utils import remove_thinking_tags from memos.log import get_logger from memos.types import MessageList -from memos.utils import timed +from memos.utils import timed_with_status logger = get_logger(__name__) @@ -28,7 +28,7 @@ def __init__(self, config: OpenAILLMConfig): ) logger.info("OpenAI LLM instance initialized") - @timed(log=True, log_prefix="OpenAI LLM", log_args=["model_name_or_path"]) + @timed_with_status(log_prefix="OpenAI LLM", log_args=["model_name_or_path"]) def generate(self, messages: MessageList, **kwargs) -> str: """Generate a response from OpenAI LLM, optionally overriding generation params.""" response = self.client.chat.completions.create( @@ -55,7 +55,7 @@ def generate(self, messages: MessageList, **kwargs) -> str: return reasoning_content + response_content return response_content - @timed(log=True, log_prefix="OpenAI LLM", log_args=["model_name_or_path"]) + @timed_with_status(log=True, log_prefix="OpenAI LLM", log_args=["model_name_or_path"]) def generate_stream(self, messages: MessageList, **kwargs) -> Generator[str, None, None]: """Stream response from OpenAI LLM with optional reasoning support.""" if kwargs.get("tools"): diff --git a/src/memos/reranker/http_bge.py b/src/memos/reranker/http_bge.py index 29f41e38f..4e9054f1e 100644 --- a/src/memos/reranker/http_bge.py +++ b/src/memos/reranker/http_bge.py @@ -9,7 +9,7 @@ import requests from memos.log import get_logger -from memos.utils import timed +from memos.utils import timed_with_status from .base import BaseReranker from .concat import concat_original_source @@ -119,8 +119,12 @@ def __init__( self.warn_unknown_filter_keys = bool(warn_unknown_filter_keys) self._warned_missing_keys: set[str] = set() - @timed( - log=True, log_prefix="model_timed_rerank", log_extra_args={"model_name_or_path": "reranker"} + @timed_with_status( + log_prefix="model_timed_rerank", + log_extra_args={"model_name_or_path": "reranker"}, + fallback=lambda exc, self, query, graph_results, top_k, *a, **kw: [ + (item, 0.0) for item in graph_results[:top_k] + ], ) def rerank( self, @@ -150,6 +154,7 @@ def rerank( list[tuple[TextualMemoryItem, float]] Re-ranked items with scores, sorted descending by score. """ + if not graph_results: return [] @@ -173,63 +178,54 @@ def rerank( headers = {"Content-Type": "application/json", **self.headers_extra} payload = {"model": self.model, "query": query, "documents": documents} - try: - # Make the HTTP request to the reranker service - resp = requests.post( - self.reranker_url, headers=headers, json=payload, timeout=self.timeout - ) - resp.raise_for_status() - data = resp.json() - - scored_items: list[tuple[TextualMemoryItem, float]] = [] - - if "results" in data: - # Format: - # dict("results": [{"index": int, "relevance_score": float}, - # ...]) - rows = data.get("results", []) - for r in rows: - idx = r.get("index") - # The returned index refers to 'documents' (i.e., our 'pairs' order), - # so we must map it back to the original graph_results index. - if isinstance(idx, int) and 0 <= idx < len(graph_results): - raw_score = float(r.get("relevance_score", r.get("score", 0.0))) - item = graph_results[idx] - # generic boost - score = self._apply_boost_generic(item, raw_score, search_priority) - scored_items.append((item, score)) - - scored_items.sort(key=lambda x: x[1], reverse=True) - return scored_items[: min(top_k, len(scored_items))] - - elif "data" in data: - # Format: {"data": [{"score": float}, ...]} aligned by list order - rows = data.get("data", []) - # Build a list of scores aligned with our 'documents' (pairs) - score_list = [float(r.get("score", 0.0)) for r in rows] - - if len(score_list) < len(graph_results): - score_list += [0.0] * (len(graph_results) - len(score_list)) - elif len(score_list) > len(graph_results): - score_list = score_list[: len(graph_results)] - - scored_items = [] - for item, raw_score in zip(graph_results, score_list, strict=False): + # Make the HTTP request to the reranker service + resp = requests.post(self.reranker_url, headers=headers, json=payload, timeout=self.timeout) + resp.raise_for_status() + data = resp.json() + + scored_items: list[tuple[TextualMemoryItem, float]] = [] + + if "results" in data: + # Format: + # dict("results": [{"index": int, "relevance_score": float}, + # ...]) + rows = data.get("results", []) + for r in rows: + idx = r.get("index") + # The returned index refers to 'documents' (i.e., our 'pairs' order), + # so we must map it back to the original graph_results index. + if isinstance(idx, int) and 0 <= idx < len(graph_results): + raw_score = float(r.get("relevance_score", r.get("score", 0.0))) + item = graph_results[idx] + # generic boost score = self._apply_boost_generic(item, raw_score, search_priority) scored_items.append((item, score)) - scored_items.sort(key=lambda x: x[1], reverse=True) - return scored_items[: min(top_k, len(scored_items))] + scored_items.sort(key=lambda x: x[1], reverse=True) + return scored_items[: min(top_k, len(scored_items))] + + elif "data" in data: + # Format: {"data": [{"score": float}, ...]} aligned by list order + rows = data.get("data", []) + # Build a list of scores aligned with our 'documents' (pairs) + score_list = [float(r.get("score", 0.0)) for r in rows] + + if len(score_list) < len(graph_results): + score_list += [0.0] * (len(graph_results) - len(score_list)) + elif len(score_list) > len(graph_results): + score_list = score_list[: len(graph_results)] - else: - # Unexpected response schema: return a 0.0-scored fallback of the first top_k valid docs - # Note: we use 'pairs' to keep alignment with valid (string) docs. - return [(item, 0.0) for item in graph_results[:top_k]] + scored_items = [] + for item, raw_score in zip(graph_results, score_list, strict=False): + score = self._apply_boost_generic(item, raw_score, search_priority) + scored_items.append((item, score)) - except Exception as e: - # Network error, timeout, JSON decode error, etc. - # Degrade gracefully by returning first top_k valid docs with 0.0 score. - logger.error(f"[HTTPBGEReranker] request failed: {e}") + scored_items.sort(key=lambda x: x[1], reverse=True) + return scored_items[: min(top_k, len(scored_items))] + + else: + # Unexpected response schema: return a 0.0-scored fallback of the first top_k valid docs + # Note: we use 'pairs' to keep alignment with valid (string) docs. return [(item, 0.0) for item in graph_results[:top_k]] def _get_attr_or_key(self, obj: Any, key: str) -> Any: diff --git a/src/memos/utils.py b/src/memos/utils.py index a320ed1cd..a29eaf99d 100644 --- a/src/memos/utils.py +++ b/src/memos/utils.py @@ -1,3 +1,4 @@ +import functools import time from memos.log import get_logger @@ -6,7 +7,14 @@ logger = get_logger(__name__) -def timed_with_status(func=None, *, log=True, log_prefix="", log_args=None, log_extra_args=None): +def timed_with_status( + func=None, + *, + log_prefix="", + log_args=None, + log_extra_args=None, + fallback=None, +): """ Parameters: - log: enable timing logs (default True) @@ -15,34 +23,55 @@ def timed_with_status(func=None, *, log=True, log_prefix="", log_args=None, log_ - log_extra_args: extra arguments to include in logs (dict). """ + if isinstance(log_args, str): + effective_log_args = [log_args] + else: + effective_log_args = list(log_args) if log_args else [] + def decorator(fn): + @functools.wraps(fn) def wrapper(*args, **kwargs): start = time.perf_counter() - result = fn(*args, **kwargs) - elapsed_ms = (time.perf_counter() - start) * 1000.0 - ctx_str = "" - ctx_parts = [] + exc_type = None + result = None + success_flag = False - if log is not True: + try: + result = fn(*args, **kwargs) + success_flag = True return result - - if log_args: - for key in log_args: + except Exception as e: + exc_type = type(e) + success_flag = False + + if fallback is not None and callable(fallback): + result = fallback(e, *args, **kwargs) + return result + finally: + elapsed_ms = (time.perf_counter() - start) * 1000.0 + + ctx_parts = [] + for key in effective_log_args: val = kwargs.get(key) ctx_parts.append(f"{key}={val}") - ctx_str = f" [{', '.join(ctx_parts)}]" - if log_extra_args: - ctx_parts.extend([f"{key}={val}" for key, val in log_extra_args.items()]) + if log_extra_args: + ctx_parts.extend(f"{key}={val}" for key, val in log_extra_args.items()) - if ctx_parts: - ctx_str = f" [{', '.join(ctx_parts)}]" + ctx_str = f" [{', '.join(ctx_parts)}]" if ctx_parts else "" - logger.info( - f"[TIMER] {log_prefix or fn.__name__} took {elapsed_ms:.0f} ms, args: {ctx_str}" - ) + status = "SUCCESS" if success_flag else "FAILED" + status_info = f", status: {status}" - return result + if not success_flag and exc_type is not None: + status_info += f", error: {exc_type.__name__}" + + msg = ( + f"[TIMER_WITH_STATUS] {log_prefix or fn.__name__} " + f"took {elapsed_ms:.0f} ms{status_info}, args: {ctx_str}" + ) + + logger.info(msg) return wrapper @@ -51,7 +80,7 @@ def wrapper(*args, **kwargs): return decorator(func) -def timed(func=None, *, log=True): +def timed(func=None, *, log=True, log_prefix=""): def decorator(fn): def wrapper(*args, **kwargs): start = time.perf_counter() @@ -61,7 +90,7 @@ def wrapper(*args, **kwargs): if log is not True: return result - logger.info(f"[TIMER] {fn.__name__} took {elapsed_ms:.0f} ms") + logger.info(f"[TIMER] {log_prefix or fn.__name__} took {elapsed_ms:.0f} ms") return result From 687350d2ee5f080d7b82b5ad831ea3a7be6c9231 Mon Sep 17 00:00:00 2001 From: harvey_xiang Date: Thu, 4 Dec 2025 14:45:21 +0800 Subject: [PATCH 7/7] fix: conflict --- src/memos/llms/openai.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/memos/llms/openai.py b/src/memos/llms/openai.py index 3259edc1b..35a9c7117 100644 --- a/src/memos/llms/openai.py +++ b/src/memos/llms/openai.py @@ -55,7 +55,7 @@ def generate(self, messages: MessageList, **kwargs) -> str: return reasoning_content + response_content return response_content - @timed_with_status(log=True, log_prefix="OpenAI LLM", log_args=["model_name_or_path"]) + @timed_with_status(log_prefix="OpenAI LLM", log_args=["model_name_or_path"]) def generate_stream(self, messages: MessageList, **kwargs) -> Generator[str, None, None]: """Stream response from OpenAI LLM with optional reasoning support.""" if kwargs.get("tools"):