From 094e3f0ae001c777dcdb21d30b7f49143f9c0d84 Mon Sep 17 00:00:00 2001 From: "jinli.yl" Date: Tue, 16 Dec 2025 00:01:03 +0800 Subject: [PATCH 1/8] feat(vector): support filtering by multiple unique_ids in vector stores --- .../vector_store/pgvector_vector_store.py | 21 +++++++--- .../core/vector_store/qdrant_vector_store.py | 21 +++++++++- tests/test_memory_vector_store.py | 38 +++++++++++++++++++ 3 files changed, 73 insertions(+), 7 deletions(-) diff --git a/flowllm/core/vector_store/pgvector_vector_store.py b/flowllm/core/vector_store/pgvector_vector_store.py index fa6b30b..497a6df 100644 --- a/flowllm/core/vector_store/pgvector_vector_store.py +++ b/flowllm/core/vector_store/pgvector_vector_store.py @@ -168,12 +168,23 @@ def _build_sql_filters( # Handle special keys that are stored as direct columns if key == "unique_id": # unique_id is a direct column, not in metadata JSONB - if use_async: - conditions.append(f"unique_id = ${param_idx}") + # Support both single value and list of values + if isinstance(filter_value, list): + if use_async: + placeholders = ", ".join(f"${param_idx + i}" for i in range(len(filter_value))) + conditions.append(f"unique_id IN ({placeholders})") + else: + placeholders = ", ".join(["%s"] * len(filter_value)) + conditions.append(f"unique_id IN ({placeholders})") + params.extend([str(v) for v in filter_value]) + param_idx += len(filter_value) else: - conditions.append("unique_id = %s") - params.append(str(filter_value)) - param_idx += 1 + if use_async: + conditions.append(f"unique_id = ${param_idx}") + else: + conditions.append("unique_id = %s") + params.append(str(filter_value)) + param_idx += 1 continue # Strip "metadata." prefix if present (since we're already accessing metadata column) diff --git a/flowllm/core/vector_store/qdrant_vector_store.py b/flowllm/core/vector_store/qdrant_vector_store.py index 07126aa..04d5500 100644 --- a/flowllm/core/vector_store/qdrant_vector_store.py +++ b/flowllm/core/vector_store/qdrant_vector_store.py @@ -190,9 +190,26 @@ def _build_qdrant_filters(filter_dict: Optional[Dict[str, Any]] = None): for key, filter_value in filter_dict.items(): # Handle special keys that are stored at payload root level if key == "unique_id": - qdrant_key = "original_id" + # unique_id is stored as original_id in Qdrant payload + # Support both single value and list of values + if isinstance(filter_value, list): + conditions.append( + FieldCondition( + key="original_id", + match=MatchAny(any=filter_value), + ), + ) + else: + conditions.append( + FieldCondition( + key="original_id", + match=MatchValue(value=filter_value), + ), + ) + continue + # Handle nested keys by prefixing with metadata. 
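For reference, a minimal standalone sketch of the IN-clause construction that this patch adds to `_build_sql_filters`, assuming the same `params`/`param_idx` bookkeeping (asyncpg-style `$n` placeholders for async, psycopg-style `%s` for sync); `build_unique_id_condition` is an illustrative helper, not part of the patch. Note that an empty list would render `unique_id IN ()`, which PostgreSQL rejects, so callers are assumed to pass non-empty lists:

```python
# Illustrative sketch only: mirrors the scalar/list branching in the patch.
from typing import Any, List, Tuple


def build_unique_id_condition(
    filter_value: Any, param_idx: int, params: List[str], use_async: bool
) -> Tuple[str, int]:
    """Return (SQL condition, next param index) for a unique_id filter."""
    if isinstance(filter_value, list):
        if use_async:
            placeholders = ", ".join(f"${param_idx + i}" for i in range(len(filter_value)))
        else:
            placeholders = ", ".join(["%s"] * len(filter_value))
        params.extend(str(v) for v in filter_value)
        return f"unique_id IN ({placeholders})", param_idx + len(filter_value)
    placeholder = f"${param_idx}" if use_async else "%s"
    params.append(str(filter_value))
    return f"unique_id = {placeholder}", param_idx + 1


params: List[str] = []
cond, next_idx = build_unique_id_condition(["n1", "n2"], 1, params, use_async=True)
print(cond, params, next_idx)  # unique_id IN ($1, $2) ['n1', 'n2'] 3
```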
- elif not key.startswith("metadata."): + if not key.startswith("metadata."): qdrant_key = f"metadata.{key}" else: qdrant_key = key diff --git a/tests/test_memory_vector_store.py b/tests/test_memory_vector_store.py index a385a85..e141aa6 100644 --- a/tests/test_memory_vector_store.py +++ b/tests/test_memory_vector_store.py @@ -317,6 +317,24 @@ def test_search_with_id(self, workspace_id: str): assert results[0].unique_id == target_unique_id, f"Result should have unique_id={target_unique_id}" logger.info(f"Found node: {results[0].model_dump(exclude={'vector'})}") + def test_search_with_id_list(self, workspace_id: str): + """Test vector search by multiple unique_ids with list filter.""" + logger.info("=" * 20 + " SEARCH BY ID LIST TEST " + "=" * 20) + target_unique_ids = [f"{self.workspace_prefix}_node1", f"{self.workspace_prefix}_node2"] + filter_dict = {"unique_id": target_unique_ids} + results = self.client.search( + "", + workspace_id=workspace_id, + top_k=5, + filter_dict=filter_dict, + ) + logger.info(f"Search by ID list returned {len(results)} results") + assert len(results) == 2, f"Should return exactly 2 results, got {len(results)}" + result_ids = {r.unique_id for r in results} + assert result_ids == set(target_unique_ids), f"Result unique_ids should match {target_unique_ids}" + for r in results: + logger.info(f"Found node: {r.model_dump(exclude={'vector'})}") + def test_update(self, workspace_id: str): """Test node update (insert with existing unique_id).""" logger.info("=" * 20 + " UPDATE TEST " + "=" * 20) @@ -447,6 +465,7 @@ def run_all_tests(self): self.test_search(workspace_id) self.test_search_with_filter(workspace_id) self.test_search_with_id(workspace_id) + self.test_search_with_id_list(workspace_id) self.test_update(workspace_id) self.test_delete(workspace_id) self.test_list_workspace_nodes(workspace_id) @@ -546,6 +565,24 @@ async def test_search_with_id(self, workspace_id: str): assert results[0].unique_id == target_unique_id, f"Result should have unique_id={target_unique_id}" logger.info(f"Found node: {results[0].model_dump(exclude={'vector'})}") + async def test_search_with_id_list(self, workspace_id: str): + """Test async vector search by multiple unique_ids with list filter.""" + logger.info("ASYNC - " + "=" * 20 + " SEARCH BY ID LIST TEST " + "=" * 20) + target_unique_ids = [f"{self.workspace_prefix}_node1", f"{self.workspace_prefix}_node2"] + filter_dict = {"unique_id": target_unique_ids} + results = await self.client.async_search( + "", + workspace_id=workspace_id, + top_k=5, + filter_dict=filter_dict, + ) + logger.info(f"Search by ID list returned {len(results)} results") + assert len(results) == 2, f"Should return exactly 2 results, got {len(results)}" + result_ids = {r.unique_id for r in results} + assert result_ids == set(target_unique_ids), f"Result unique_ids should match {target_unique_ids}" + for r in results: + logger.info(f"Found node: {r.model_dump(exclude={'vector'})}") + async def test_update(self, workspace_id: str): """Test async node update (insert with existing unique_id).""" logger.info("ASYNC - " + "=" * 20 + " UPDATE TEST " + "=" * 20) @@ -678,6 +715,7 @@ async def _run_all_tests_async(self): await self.test_search(workspace_id) await self.test_search_with_filter(workspace_id) await self.test_search_with_id(workspace_id) + await self.test_search_with_id_list(workspace_id) await self.test_update(workspace_id) await self.test_delete(workspace_id) await self.test_list_workspace_nodes(workspace_id) From df20afbbe6b25c282f1c91a503b09dfa0e670231 Mon 
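Similarly, a hedged sketch of the filter object the Qdrant branch above builds; it assumes the `qdrant_client.models` import path, where `MatchAny` is the multi-value counterpart of `MatchValue`:

```python
from qdrant_client.models import FieldCondition, Filter, MatchAny, MatchValue


def unique_id_condition(filter_value):
    """Map a unique_id filter value to a condition on the original_id payload key."""
    if isinstance(filter_value, list):
        return FieldCondition(key="original_id", match=MatchAny(any=filter_value))
    return FieldCondition(key="original_id", match=MatchValue(value=filter_value))


# A list value becomes an OR over the given ids; conditions under `must` are ANDed.
flt = Filter(must=[unique_id_condition(["node_1", "node_2"])])
```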
Sep 17 00:00:00 2001 From: "jinli.yl" Date: Tue, 16 Dec 2025 00:33:50 +0800 Subject: [PATCH 2/8] refactor(core): remove mandatory file_path requirement - Removed file_path attribute from multiple Op classes - Updated base class to auto-detect subclass file path using inspect - Simplified prompt path resolution logic - Updated documentation to reflect removal of file_path requirement - Affected classes: QAOp, ChatOp, EchoOp, StreamChatOp, MockSearchOp - Affected file tool operations: edit, exit_plan_mode, glob, grep, ls - Affected file tool operations: read_file, read_many_files, rip_grep, shell - Affected file tool operations: smart_edit, task, write_file, write_todos - Removed redundant file_path declarations across extensions - Improved automatic prompt file discovery mechanism --- docs/zh/guide/async_op_llm_guide.md | 3 --- docs/zh/guide/cmd_service_guide.md | 2 -- docs/zh/guide/http_service_guide.md | 2 -- docs/zh/guide/http_stream_guide.md | 2 -- docs/zh/guide/mcp_service_guide.md | 1 - flowllm/core/op/base_op.py | 7 ++++--- flowllm/extensions/file_tool/edit_op.py | 2 -- flowllm/extensions/file_tool/exit_plan_mode_op.py | 2 -- flowllm/extensions/file_tool/glob_op.py | 2 -- flowllm/extensions/file_tool/grep_op.py | 2 -- flowllm/extensions/file_tool/ls_op.py | 2 -- flowllm/extensions/file_tool/read_file_op.py | 2 -- flowllm/extensions/file_tool/read_many_files_op.py | 2 -- flowllm/extensions/file_tool/rip_grep_op.py | 2 -- flowllm/extensions/file_tool/shell_op.py | 2 -- flowllm/extensions/file_tool/smart_edit_op.py | 2 -- flowllm/extensions/file_tool/task_op.py | 2 -- flowllm/extensions/file_tool/write_file_op.py | 2 -- flowllm/extensions/file_tool/write_todos_op.py | 2 -- flowllm/gallery/chat_op.py | 2 -- flowllm/gallery/think_tool_op.py | 2 -- 21 files changed, 4 insertions(+), 43 deletions(-) diff --git a/docs/zh/guide/async_op_llm_guide.md b/docs/zh/guide/async_op_llm_guide.md index 237a023..c06348a 100644 --- a/docs/zh/guide/async_op_llm_guide.md +++ b/docs/zh/guide/async_op_llm_guide.md @@ -50,8 +50,6 @@ import json @C.register_op() class QAOp(BaseAsyncOp): - file_path: str = __file__ # 必须设置,用于自动查找 prompt 文件 - async def async_execute(self): """执行问答逻辑""" # 1. 读取输入 @@ -156,7 +154,6 @@ if __name__ == "__main__": 6. **调用 LLM**:使用 `await self.llm.achat(messages=messages, ...)` 7. **处理响应**:使用 `callback_fn` 处理或转换响应,返回处理后的结果 8. **应用上下文**:必须在 `FlowLLMApp()` 上下文里调用 -9. 
**file_path**:Op 类中必须设置 `file_path = __file__`,用于自动查找 prompt 文件 --- diff --git a/docs/zh/guide/cmd_service_guide.md b/docs/zh/guide/cmd_service_guide.md index 780a93c..f60806d 100644 --- a/docs/zh/guide/cmd_service_guide.md +++ b/docs/zh/guide/cmd_service_guide.md @@ -15,8 +15,6 @@ from flowllm.core.op import BaseAsyncOp @C.register_op() class EchoOp(BaseAsyncOp): - file_path: str = __file__ - async def async_execute(self): text = self.context.get("text", "") self.context.response.answer = f"echo: {text}" diff --git a/docs/zh/guide/http_service_guide.md b/docs/zh/guide/http_service_guide.md index 1a1c299..23821d4 100644 --- a/docs/zh/guide/http_service_guide.md +++ b/docs/zh/guide/http_service_guide.md @@ -14,8 +14,6 @@ from flowllm.core.op import BaseAsyncOp @C.register_op() class EchoOp(BaseAsyncOp): - file_path: str = __file__ - async def async_execute(self): text = self.context.get("text", "") self.context.response.answer = f"echo: {text}" diff --git a/docs/zh/guide/http_stream_guide.md b/docs/zh/guide/http_stream_guide.md index 450e0e7..90fa212 100644 --- a/docs/zh/guide/http_stream_guide.md +++ b/docs/zh/guide/http_stream_guide.md @@ -16,8 +16,6 @@ from flowllm.core.schema import FlowStreamChunk, Message @C.register_op() class StreamChatOp(BaseAsyncOp): - file_path: str = __file__ - async def async_execute(self): messages = self.context.messages system_prompt = self.context.system_prompt diff --git a/docs/zh/guide/mcp_service_guide.md b/docs/zh/guide/mcp_service_guide.md index 19aa517..663e75d 100644 --- a/docs/zh/guide/mcp_service_guide.md +++ b/docs/zh/guide/mcp_service_guide.md @@ -15,7 +15,6 @@ from flowllm.core.op import BaseAsyncOp @C.register_op() class MockSearchOp(BaseAsyncOp): """Mock search operation that uses LLM to generate realistic search results.""" - file_path: str = __file__ async def async_execute(self): query = self.context.query diff --git a/flowllm/core/op/base_op.py b/flowllm/core/op/base_op.py index 0d6f40b..aa658ef 100644 --- a/flowllm/core/op/base_op.py +++ b/flowllm/core/op/base_op.py @@ -6,6 +6,7 @@ """ import copy +import inspect from abc import ABC, abstractmethod from pathlib import Path from typing import Callable, List, Union @@ -78,8 +79,6 @@ def execute(self): ``` """ - file_path: str = __file__ - def __new__(cls, *args, **kwargs): """Create a new instance and save initialization arguments for copying. @@ -149,7 +148,9 @@ def __init__( self.raise_exception: bool = raise_exception self.enable_multithread: bool = enable_multithread self.language: str = language or C.language - default_prompt_path: str = self.file_path.replace("op.py", "prompt.yaml") + + subclass_file_path: str = inspect.getfile(self.__class__) + default_prompt_path: str = subclass_file_path.replace("op.py", "prompt.yaml") self.prompt_path: Path = Path(prompt_path if prompt_path else default_prompt_path) self.prompt = PromptHandler(language=self.language).load_prompt_by_file(self.prompt_path) self._llm: BaseLLM | str = llm diff --git a/flowllm/extensions/file_tool/edit_op.py b/flowllm/extensions/file_tool/edit_op.py index 81bab3e..c6ab4e7 100644 --- a/flowllm/extensions/file_tool/edit_op.py +++ b/flowllm/extensions/file_tool/edit_op.py @@ -21,8 +21,6 @@ class EditOp(BaseAsyncToolOp): is specified. Supports creating new files when old_string is empty. 
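The mechanism behind all the removals in this patch: `BaseOp.__init__` now derives the prompt path from the file that defines the concrete subclass via `inspect.getfile`, so subclasses no longer declare `file_path = __file__`. A minimal sketch of that resolution (class and file names are illustrative):

```python
import inspect
from pathlib import Path


class BaseOp:
    def resolve_prompt_path(self) -> Path:
        # inspect.getfile resolves the file that defines self's *class*, so a
        # subclass living in .../gallery/chat_op.py maps to
        # .../gallery/chat_prompt.yaml with no per-subclass declaration.
        # (For a file not ending in "op.py" the replace is a no-op.)
        subclass_file = inspect.getfile(self.__class__)
        return Path(subclass_file.replace("op.py", "prompt.yaml"))


class ChatOp(BaseOp):  # defined in chat_op.py in the real codebase
    pass


print(ChatOp().resolve_prompt_path())  # ends with .../chat_prompt.yaml
```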
""" - file_path = __file__ - def __init__(self, **kwargs): kwargs.setdefault("raise_exception", False) super().__init__(**kwargs) diff --git a/flowllm/extensions/file_tool/exit_plan_mode_op.py b/flowllm/extensions/file_tool/exit_plan_mode_op.py index 30a46ba..939b57f 100644 --- a/flowllm/extensions/file_tool/exit_plan_mode_op.py +++ b/flowllm/extensions/file_tool/exit_plan_mode_op.py @@ -17,8 +17,6 @@ class ExitPlanModeOp(BaseAsyncToolOp): to the user for approval before proceeding with implementation. """ - file_path = __file__ - def __init__(self, **kwargs): kwargs.setdefault("raise_exception", False) super().__init__(**kwargs) diff --git a/flowllm/extensions/file_tool/glob_op.py b/flowllm/extensions/file_tool/glob_op.py index 6f1aeb5..3f7fc78 100644 --- a/flowllm/extensions/file_tool/glob_op.py +++ b/flowllm/extensions/file_tool/glob_op.py @@ -28,8 +28,6 @@ class GlobOp(BaseAsyncToolOp): Supports gitignore patterns for filtering files. """ - file_path = __file__ - def __init__(self, gitignore_patterns: List[str] = None, **kwargs): kwargs.setdefault("raise_exception", False) super().__init__(**kwargs) diff --git a/flowllm/extensions/file_tool/grep_op.py b/flowllm/extensions/file_tool/grep_op.py index bcc0eb9..1e18013 100644 --- a/flowllm/extensions/file_tool/grep_op.py +++ b/flowllm/extensions/file_tool/grep_op.py @@ -25,8 +25,6 @@ class GrepOp(BaseAsyncToolOp): Supports glob pattern filtering and result limiting. """ - file_path = __file__ - def __init__(self, **kwargs): kwargs.setdefault("raise_exception", False) super().__init__(**kwargs) diff --git a/flowllm/extensions/file_tool/ls_op.py b/flowllm/extensions/file_tool/ls_op.py index bee9898..57d84d7 100644 --- a/flowllm/extensions/file_tool/ls_op.py +++ b/flowllm/extensions/file_tool/ls_op.py @@ -22,8 +22,6 @@ class LSOp(BaseAsyncToolOp): matching provided glob patterns. """ - file_path = __file__ - def __init__(self, **kwargs): kwargs.setdefault("raise_exception", False) super().__init__(**kwargs) diff --git a/flowllm/extensions/file_tool/read_file_op.py b/flowllm/extensions/file_tool/read_file_op.py index 46c9d51..dc0a752 100644 --- a/flowllm/extensions/file_tool/read_file_op.py +++ b/flowllm/extensions/file_tool/read_file_op.py @@ -20,8 +20,6 @@ class ReadFileOp(BaseAsyncToolOp): For text files, it can read specific line ranges using offset and limit. """ - file_path = __file__ - def __init__(self, **kwargs): kwargs.setdefault("raise_exception", False) super().__init__(**kwargs) diff --git a/flowllm/extensions/file_tool/read_many_files_op.py b/flowllm/extensions/file_tool/read_many_files_op.py index 8b998cd..7f3a199 100644 --- a/flowllm/extensions/file_tool/read_many_files_op.py +++ b/flowllm/extensions/file_tool/read_many_files_op.py @@ -24,8 +24,6 @@ class ReadManyFilesOp(BaseAsyncToolOp): and concatenates them with separators. """ - file_path = __file__ - def __init__(self, **kwargs): kwargs.setdefault("raise_exception", False) super().__init__(**kwargs) diff --git a/flowllm/extensions/file_tool/rip_grep_op.py b/flowllm/extensions/file_tool/rip_grep_op.py index 79592ae..84967af 100644 --- a/flowllm/extensions/file_tool/rip_grep_op.py +++ b/flowllm/extensions/file_tool/rip_grep_op.py @@ -22,8 +22,6 @@ class RipGrepOp(BaseAsyncToolOp): Supports glob pattern filtering and result limiting. 
""" - file_path = __file__ - def __init__(self, **kwargs): kwargs.setdefault("raise_exception", False) super().__init__(**kwargs) diff --git a/flowllm/extensions/file_tool/shell_op.py b/flowllm/extensions/file_tool/shell_op.py index 5b71c8e..4fee738 100644 --- a/flowllm/extensions/file_tool/shell_op.py +++ b/flowllm/extensions/file_tool/shell_op.py @@ -23,8 +23,6 @@ class ShellOp(BaseAsyncToolOp): Supports both foreground and background execution. """ - file_path = __file__ - def __init__(self, **kwargs): kwargs.setdefault("raise_exception", False) super().__init__(**kwargs) diff --git a/flowllm/extensions/file_tool/smart_edit_op.py b/flowllm/extensions/file_tool/smart_edit_op.py index 5076fcf..b57f762 100644 --- a/flowllm/extensions/file_tool/smart_edit_op.py +++ b/flowllm/extensions/file_tool/smart_edit_op.py @@ -24,8 +24,6 @@ class SmartEditOp(BaseAsyncToolOp): exact matching, flexible matching (ignoring indentation), and regex-based matching. """ - file_path = __file__ - def __init__(self, **kwargs): kwargs.setdefault("raise_exception", False) super().__init__(**kwargs) diff --git a/flowllm/extensions/file_tool/task_op.py b/flowllm/extensions/file_tool/task_op.py index 434c160..577f9e6 100644 --- a/flowllm/extensions/file_tool/task_op.py +++ b/flowllm/extensions/file_tool/task_op.py @@ -16,8 +16,6 @@ class TaskOp(BaseAsyncToolOp): This operation delegates tasks to specialized subagents for autonomous execution. """ - file_path = __file__ - def __init__(self, **kwargs): kwargs.setdefault("raise_exception", False) super().__init__(**kwargs) diff --git a/flowllm/extensions/file_tool/write_file_op.py b/flowllm/extensions/file_tool/write_file_op.py index 977cfdd..7457a13 100644 --- a/flowllm/extensions/file_tool/write_file_op.py +++ b/flowllm/extensions/file_tool/write_file_op.py @@ -20,8 +20,6 @@ class WriteFileOp(BaseAsyncToolOp): it will be created. If parent directories don't exist, they will be created automatically. """ - file_path = __file__ - def __init__(self, **kwargs): kwargs.setdefault("raise_exception", False) super().__init__(**kwargs) diff --git a/flowllm/extensions/file_tool/write_todos_op.py b/flowllm/extensions/file_tool/write_todos_op.py index d041fbc..f652e63 100644 --- a/flowllm/extensions/file_tool/write_todos_op.py +++ b/flowllm/extensions/file_tool/write_todos_op.py @@ -21,8 +21,6 @@ class WriteTodosOp(BaseAsyncToolOp): through different statuses: pending, in_progress, completed, cancelled. """ - file_path = __file__ - def __init__(self, **kwargs): kwargs.setdefault("raise_exception", False) super().__init__(**kwargs) diff --git a/flowllm/gallery/chat_op.py b/flowllm/gallery/chat_op.py index ab309a6..7b701aa 100644 --- a/flowllm/gallery/chat_op.py +++ b/flowllm/gallery/chat_op.py @@ -17,8 +17,6 @@ class ChatOp(BaseAsyncOp): and system_prompt to be present in the context. 
""" - file_path: str = __file__ - def __init__( self, llm: str = "qwen3_30b_instruct", diff --git a/flowllm/gallery/think_tool_op.py b/flowllm/gallery/think_tool_op.py index b933e03..c8c5423 100644 --- a/flowllm/gallery/think_tool_op.py +++ b/flowllm/gallery/think_tool_op.py @@ -9,8 +9,6 @@ class ThinkToolOp(BaseAsyncToolOp): """Utility operation that prompts the model for explicit reflection text.""" - file_path = __file__ - def __init__(self, add_output_reflection: bool = False, **kwargs): super().__init__(**kwargs) self.add_output_reflection: bool = add_output_reflection From fd809f7fddad1205e9b501d6eeae9a15e29e9aa3 Mon Sep 17 00:00:00 2001 From: "jinli.yl" Date: Tue, 16 Dec 2025 00:36:18 +0800 Subject: [PATCH 3/8] chore(config): update pre-commit config to disable R0915 --- .pre-commit-config.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 7067f85..fdc156e 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -81,6 +81,7 @@ repos: --disable=C3001, --disable=R1702, --disable=R0912, + --disable=R0915, --max-line-length=120, --max-statements=75, ] From 3219ab015444ca79080c8cdcadd50a96be1bef63 Mon Sep 17 00:00:00 2001 From: "jinli.yl" Date: Tue, 16 Dec 2025 18:35:23 +0800 Subject: [PATCH 4/8] feat(message): add format_message method to Message class --- flowllm/core/schema/message.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/flowllm/core/schema/message.py b/flowllm/core/schema/message.py index b62b42b..471c120 100644 --- a/flowllm/core/schema/message.py +++ b/flowllm/core/schema/message.py @@ -95,6 +95,38 @@ def simple_dump(self, add_reasoning: bool = True) -> dict: return result + def format_message( + self, + i: int | None = None, + add_time_created: bool = False, + use_name_first: bool = False, + add_reasoning_content: bool = True, + add_tool_calls: bool = True + ) -> str: + content = "" + if i is not None: + content += f"round{i} " + + if add_time_created: + content += f"[{self.time_created}] " + + if use_name_first: + content += f"{self.name or self.role.value}:\n" + else: + content += f"{self.role.value}:\n" + + if add_reasoning_content and self.reasoning_content: + content += self.reasoning_content + "\n" + + if self.content: + content += self.content + "\n" + + if add_tool_calls and self.tool_calls: + for tool_call in self.tool_calls: + content += f" - tool_call={tool_call.name} params={tool_call.arguments}\n" + + return content.strip() + class Trajectory(BaseModel): """Represents a conversation trajectory with messages and optional scoring.""" From c2ee276ecc07da0cfb65723ac26de8ace128cb43 Mon Sep 17 00:00:00 2001 From: "jinli.yl" Date: Tue, 16 Dec 2025 20:30:30 +0800 Subject: [PATCH 5/8] test(memory): enhance vector store tests with metadata and multi-key filters --- tests/test_memory_vector_store.py | 138 ++++++++++++++++++++++++++++++ 1 file changed, 138 insertions(+) diff --git a/tests/test_memory_vector_store.py b/tests/test_memory_vector_store.py index e141aa6..8092d23 100644 --- a/tests/test_memory_vector_store.py +++ b/tests/test_memory_vector_store.py @@ -153,6 +153,12 @@ def create_sample_nodes(workspace_id: str, prefix: str = "") -> List[VectorNode] metadata={ "node_type": "tech", "category": "AI", + "source": "research", + "priority": "high", + "year": 2023, + "department": "engineering", + "language": "english", + "status": "published", }, ), VectorNode( @@ -162,6 +168,12 @@ def create_sample_nodes(workspace_id: str, prefix: str = "") -> 
List[VectorNode] metadata={ "node_type": "tech", "category": "ML", + "source": "research", + "priority": "high", + "year": 2022, + "department": "engineering", + "language": "english", + "status": "published", }, ), VectorNode( @@ -171,6 +183,12 @@ def create_sample_nodes(workspace_id: str, prefix: str = "") -> List[VectorNode] metadata={ "node_type": "tech_new", "category": "ML", + "source": "blog", + "priority": "medium", + "year": 2024, + "department": "marketing", + "language": "chinese", + "status": "draft", }, ), VectorNode( @@ -180,6 +198,12 @@ def create_sample_nodes(workspace_id: str, prefix: str = "") -> List[VectorNode] metadata={ "node_type": "food", "category": "preference", + "source": "personal", + "priority": "low", + "year": 2023, + "department": "lifestyle", + "language": "english", + "status": "published", }, ), VectorNode( @@ -189,6 +213,12 @@ def create_sample_nodes(workspace_id: str, prefix: str = "") -> List[VectorNode] metadata={ "node_type": "tech", "category": "DL", + "source": "research", + "priority": "high", + "year": 2024, + "department": "engineering", + "language": "chinese", + "status": "review", }, ), ] @@ -286,6 +316,7 @@ def test_search(self, workspace_id: str): def test_search_with_filter(self, workspace_id: str): """Test vector search with filter.""" logger.info("=" * 20 + " FILTER SEARCH TEST " + "=" * 20) + # Test 1: Filter by node_type only filter_dict = {"metadata.node_type": ["tech", "tech_new"]} results = self.client.search( "What is artificial intelligence?", @@ -301,6 +332,57 @@ def test_search_with_filter(self, workspace_id: str): "tech_new", ], "All results should have node_type in [tech, tech_new]" + # Test 2: Filter by both node_type and source (multiple metadata keys) + logger.info("=" * 20 + " MULTI-KEY FILTER SEARCH TEST " + "=" * 20) + filter_dict_multi = { + "metadata.node_type": ["tech", "tech_new"], + "metadata.source": "research", + } + results_multi = self.client.search( + "What is artificial intelligence?", + workspace_id=workspace_id, + top_k=5, + filter_dict=filter_dict_multi, + ) + logger.info( + f"Multi-key filtered search returned {len(results_multi)} results " + f"(node_type in [tech, tech_new] AND source=research)" + ) + for i, r in enumerate(results_multi, 1): + logger.info(f"Multi-key Filtered Result {i}: {r.model_dump(exclude={'vector'})}") + assert r.metadata.get("node_type") in [ + "tech", + "tech_new", + ], "All results should have node_type in [tech, tech_new]" + assert r.metadata.get("source") == "research", "All results should have source=research" + + # Test 3: Filter by both node_type and language with list values for both + logger.info("=" * 20 + " MULTI-KEY LIST FILTER SEARCH TEST " + "=" * 20) + filter_dict_multi_list = { + "metadata.node_type": ["tech", "tech_new"], + "metadata.language": ["english", "chinese"], + } + results_multi_list = self.client.search( + "What is artificial intelligence?", + workspace_id=workspace_id, + top_k=5, + filter_dict=filter_dict_multi_list, + ) + logger.info( + f"Multi-key list filtered search returned {len(results_multi_list)} results " + f"(node_type in [tech, tech_new] AND language in [english, chinese])" + ) + for i, r in enumerate(results_multi_list, 1): + logger.info(f"Multi-key List Filtered Result {i}: {r.model_dump(exclude={'vector'})}") + assert r.metadata.get("node_type") in [ + "tech", + "tech_new", + ], "All results should have node_type in [tech, tech_new]" + assert r.metadata.get("language") in [ + "english", + "chinese", + ], "All results should have language in 
[english, chinese]" + def test_search_with_id(self, workspace_id: str): """Test vector search by unique_id with empty query.""" logger.info("=" * 20 + " SEARCH BY ID TEST " + "=" * 20) @@ -534,6 +616,7 @@ async def test_search(self, workspace_id: str): async def test_search_with_filter(self, workspace_id: str): """Test async vector search with filter.""" logger.info("ASYNC - " + "=" * 20 + " FILTER SEARCH TEST " + "=" * 20) + # Test 1: Filter by node_type only filter_dict = {"metadata.node_type": ["tech", "tech_new"]} results = await self.client.async_search( "What is artificial intelligence?", @@ -549,6 +632,57 @@ async def test_search_with_filter(self, workspace_id: str): "tech_new", ], "All results should have node_type in [tech, tech_new]" + # Test 2: Filter by both node_type and source (multiple metadata keys) + logger.info("ASYNC - " + "=" * 20 + " MULTI-KEY FILTER SEARCH TEST " + "=" * 20) + filter_dict_multi = { + "metadata.node_type": ["tech", "tech_new"], + "metadata.source": "research", + } + results_multi = await self.client.async_search( + "What is artificial intelligence?", + workspace_id=workspace_id, + top_k=5, + filter_dict=filter_dict_multi, + ) + logger.info( + f"Multi-key filtered search returned {len(results_multi)} results " + f"(node_type in [tech, tech_new] AND source=research)" + ) + for i, r in enumerate(results_multi, 1): + logger.info(f"Multi-key Filtered Result {i}: {r.model_dump(exclude={'vector'})}") + assert r.metadata.get("node_type") in [ + "tech", + "tech_new", + ], "All results should have node_type in [tech, tech_new]" + assert r.metadata.get("source") == "research", "All results should have source=research" + + # Test 3: Filter by both node_type and language with list values for both + logger.info("ASYNC - " + "=" * 20 + " MULTI-KEY LIST FILTER SEARCH TEST " + "=" * 20) + filter_dict_multi_list = { + "metadata.node_type": ["tech", "tech_new"], + "metadata.language": ["english", "chinese"], + } + results_multi_list = await self.client.async_search( + "What is artificial intelligence?", + workspace_id=workspace_id, + top_k=5, + filter_dict=filter_dict_multi_list, + ) + logger.info( + f"Multi-key list filtered search returned {len(results_multi_list)} results " + f"(node_type in [tech, tech_new] AND language in [english, chinese])" + ) + for i, r in enumerate(results_multi_list, 1): + logger.info(f"Multi-key List Filtered Result {i}: {r.model_dump(exclude={'vector'})}") + assert r.metadata.get("node_type") in [ + "tech", + "tech_new", + ], "All results should have node_type in [tech, tech_new]" + assert r.metadata.get("language") in [ + "english", + "chinese", + ], "All results should have language in [english, chinese]" + async def test_search_with_id(self, workspace_id: str): """Test async vector search by unique_id with empty query.""" logger.info("ASYNC - " + "=" * 20 + " SEARCH BY ID TEST " + "=" * 20) @@ -823,12 +957,16 @@ def print_usage(): if args[0] == "--all": # Test all vector stores run_tests(store_types=valid_store_types) + # Clean up test files after testing + delete_test_files() elif args[0] == "delete": # Delete all test-generated files delete_test_files() elif args[0] in valid_store_types: # Test specific vector store run_tests(store_types=[args[0]]) + # Clean up test files after testing + delete_test_files() else: print(f"Unknown argument: {args[0]}") print_usage() From 1a2078a71b289d2e75310723f4d0e999d26c9018 Mon Sep 17 00:00:00 2001 From: "jinli.yl" Date: Wed, 17 Dec 2025 17:37:34 +0800 Subject: [PATCH 6/8] feat(logging): improve 
logging for chat methods --- flowllm/core/llm/lite_llm.py | 14 ++++++++++++-- flowllm/core/llm/openai_compatible_llm.py | 14 ++++++++++++-- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/flowllm/core/llm/lite_llm.py b/flowllm/core/llm/lite_llm.py index 24f2962..c7f275f 100644 --- a/flowllm/core/llm/lite_llm.py +++ b/flowllm/core/llm/lite_llm.py @@ -110,7 +110,12 @@ def stream_chat( **self.kwargs, **kwargs, } - log_kwargs = {k: v for k, v in chat_kwargs.items() if k != "messages"} + log_kwargs: Dict[str, object] = {} + for k, v in chat_kwargs.items(): + if k in ["messages", "tools"]: + log_kwargs[k] = len(v) if v is not None else 0 + else: + log_kwargs[k] = v logger.info(f"LiteLLM.stream_chat: {log_kwargs}") for i in range(self.max_retries): @@ -217,7 +222,12 @@ async def astream_chat( **self.kwargs, **kwargs, } - log_kwargs = {k: v for k, v in chat_kwargs.items() if k != "messages"} + log_kwargs: Dict[str, object] = {} + for k, v in chat_kwargs.items(): + if k in ["messages", "tools"]: + log_kwargs[k] = len(v) if v is not None else 0 + else: + log_kwargs[k] = v logger.info(f"LiteLLM.astream_chat: {log_kwargs}") for i in range(self.max_retries): diff --git a/flowllm/core/llm/openai_compatible_llm.py b/flowllm/core/llm/openai_compatible_llm.py index 22ec71e..51c1816 100644 --- a/flowllm/core/llm/openai_compatible_llm.py +++ b/flowllm/core/llm/openai_compatible_llm.py @@ -106,7 +106,12 @@ def stream_chat( **self.kwargs, **kwargs, } - log_kwargs = {k: v for k, v in chat_kwargs.items() if k != "messages"} + log_kwargs: Dict[str, object] = {} + for k, v in chat_kwargs.items(): + if k in ["messages", "tools"]: + log_kwargs[k] = len(v) if v is not None else 0 + else: + log_kwargs[k] = v logger.info(f"OpenAICompatibleLLM.stream_chat: {log_kwargs}") for i in range(self.max_retries): @@ -208,7 +213,12 @@ async def astream_chat( **self.kwargs, **kwargs, } - log_kwargs = {k: v for k, v in chat_kwargs.items() if k != "messages"} + log_kwargs: Dict[str, object] = {} + for k, v in chat_kwargs.items(): + if k in ["messages", "tools"]: + log_kwargs[k] = len(v) if v is not None else 0 + else: + log_kwargs[k] = v logger.info(f"OpenAICompatibleLLM.astream_chat: {log_kwargs}") for i in range(self.max_retries): From 56ae13992816f11ee8b2a4bc4d713d693286a57d Mon Sep 17 00:00:00 2001 From: "jinli.yl" Date: Thu, 18 Dec 2025 15:23:17 +0800 Subject: [PATCH 7/8] feat(storage): update cache handler to accept Path objects --- flowllm/core/storage/cache_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowllm/core/storage/cache_handler.py b/flowllm/core/storage/cache_handler.py index aad6b3e..d7ceee1 100644 --- a/flowllm/core/storage/cache_handler.py +++ b/flowllm/core/storage/cache_handler.py @@ -23,7 +23,7 @@ class CacheHandler: - Recording and managing update timestamps """ - def __init__(self, cache_dir: str = "cache"): + def __init__(self, cache_dir: Union[str, Path] = "cache"): self.cache_dir = Path(cache_dir) self.cache_dir.mkdir(parents=True, exist_ok=True) self.metadata_file = self.cache_dir / "metadata.json" From b1cb7643ac24ba9ceea0da4631840ec2f5ab9b04 Mon Sep 17 00:00:00 2001 From: "jinli.yl" Date: Mon, 22 Dec 2025 00:30:38 +0800 Subject: [PATCH 8/8] refactor(core): remove deprecated save_load_cache method --- docs/todo.md | 9 ++++ flowllm/core/op/base_op.py | 63 +-------------------------- flowllm/core/storage/cache_handler.py | 17 ++++++++ 3 files changed, 28 insertions(+), 61 deletions(-) create mode 100644 docs/todo.md diff --git a/docs/todo.md b/docs/todo.md new 
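The logging change in patch 6 is the same summarization repeated in four call sites (stream and async-stream for both clients): bulky `messages`/`tools` payloads are logged as counts instead of full dumps. A standalone sketch of that helper, with illustrative values; extracting it would avoid the duplication:

```python
from typing import Dict


def summarize_chat_kwargs(chat_kwargs: dict) -> Dict[str, object]:
    """Replace bulky 'messages'/'tools' payloads with their lengths for logging."""
    log_kwargs: Dict[str, object] = {}
    for k, v in chat_kwargs.items():
        if k in ("messages", "tools"):
            log_kwargs[k] = len(v) if v is not None else 0
        else:
            log_kwargs[k] = v
    return log_kwargs


print(summarize_chat_kwargs({"model": "qwen3", "messages": [{"role": "user"}], "tools": None}))
# {'model': 'qwen3', 'messages': 1, 'tools': 0}
```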
file mode 100644 index 0000000..975254a --- /dev/null +++ b/docs/todo.md @@ -0,0 +1,9 @@ +1. Remove the overly complex save_load_cache +2. Remove the input and output parameters from tools +3. Create dedicated test functions + +4. Add a dict method to base_context +5. token before self.llm +6. Add the ability to reset the cache path + +7. Improve the documentation \ No newline at end of file diff --git a/flowllm/core/op/base_op.py b/flowllm/core/op/base_op.py index aa658ef..c80728b 100644 --- a/flowllm/core/op/base_op.py +++ b/flowllm/core/op/base_op.py @@ -189,46 +189,12 @@ def cache(self): self._cache = CacheHandler(self.cache_path.format(op_name=self.short_name)) return self._cache - def save_load_cache(self, key: str, fn: Callable, **kwargs): - """Save or load from cache. - - If caching is enabled, checks cache first. If not found, executes the - function and saves the result. Otherwise, executes the function directly. - - Args: - key: Cache key for storing/retrieving the result - fn: Function to execute if cache miss - **kwargs: Additional arguments for cache load operation - - Returns: - Cached result if available, otherwise result from function execution - """ - if self.enable_cache: - result = self.cache.load(key, **kwargs) - if result is None: - result = fn() - self.cache.save(key, result, expire_hours=self.cache_expire_hours) - else: - logger.info(f"load {key} from cache") - else: - result = fn() - - return result - def before_execute(self): """Hook method called before execute(). Override in subclasses. This method is called automatically by `call()` before executing the main `execute()` method. Use this to perform any setup, validation, or preprocessing needed before execution. - - Example: - ```python - def before_execute(self): - # Validate inputs - if not self.context.get("input"): - raise ValueError("Input is required") - ``` """ def after_execute(self): @@ -237,22 +203,11 @@ def after_execute(self): This method is called automatically by `call()` after successfully executing the main `execute()` method. Use this to perform any cleanup, post-processing, or result transformation. - - Example: - ```python - def after_execute(self): - # Post-process results - if self.context.response: - self.context.response.answer = self.context.response.answer.upper() - ``` """ @abstractmethod def execute(self): """Main execution method. Must be implemented in subclasses. - - Returns: - Execution result """ def default_execute(self, e: Exception = None, **kwargs): @@ -261,24 +216,10 @@ def default_execute(self, e: Exception = None, **kwargs): """Execute default logic for the operation. This method is called when `execute()` fails and `raise_exception` is False. It provides a fallback mechanism to return a default result instead of raising an exception. - - Args: - e: The exception that was raised during execution (if any) - **kwargs: Additional keyword arguments - - Returns: - Default execution result - - Example: - ```python - def default_execute(self, e: Exception = None, **kwargs): - logger.warning(f"Execution failed: {e}, returning default result") - return {"status": "error", "message": str(e)} - ``` """ @staticmethod - def build_context(context: FlowContext = None, **kwargs): + def build_context(context: FlowContext = None, **kwargs) -> FlowContext: """Build or update a flow context. Args: @@ -593,7 +534,7 @@ def vector_store(self) -> BaseVectorStore: return self._vector_store @property - def service_config_metadata(self) -> dict: + def service_metadata(self) -> dict: """Get the service config metadata for this operation.
Returns: diff --git a/flowllm/core/storage/cache_handler.py b/flowllm/core/storage/cache_handler.py index d7ceee1..510601c 100644 --- a/flowllm/core/storage/cache_handler.py +++ b/flowllm/core/storage/cache_handler.py @@ -30,6 +30,23 @@ def __init__(self, cache_dir: Union[str, Path] = "cache"): self.metadata = {} self._load_metadata() + def set_cache_dir(self, cache_dir: Union[str, Path]): + """ + Set a new cache directory and reload metadata + + Args: + cache_dir: New cache directory path + """ + new_cache_dir = Path(cache_dir) + new_cache_dir.mkdir(parents=True, exist_ok=True) + + self.cache_dir = new_cache_dir + self.metadata_file = self.cache_dir / "metadata.json" + self.metadata = {} + self._load_metadata() + + logger.info(f"Cache directory changed to: {self.cache_dir}") + def _load_metadata(self): """Load metadata""" if self.metadata_file.exists():
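A short usage sketch tying patches 7 and 8 together: `CacheHandler` now accepts `Path` objects directly, and `set_cache_dir` re-points an existing handler at runtime (the paths below are illustrative):

```python
from pathlib import Path

from flowllm.core.storage.cache_handler import CacheHandler

handler = CacheHandler(cache_dir=Path("cache/my_op"))  # Path accepted since patch 7
# Re-point the handler; the new directory is created and metadata.json reloaded.
handler.set_cache_dir(Path("cache/my_op_v2"))
```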