From 0a5e0a142f58f5e511b579b1cc9f9c73ba59108b Mon Sep 17 00:00:00 2001 From: Sohan Maheshwar <1119120+sohanmaheshwar@users.noreply.github.com> Date: Thu, 28 May 2026 10:29:18 +0200 Subject: [PATCH 01/14] infra: replace Weaviate with Milvus standalone in docker-compose --- agentic-rag-authorization/docker-compose.yml | 71 +++++++++++++++----- 1 file changed, 54 insertions(+), 17 deletions(-) diff --git a/agentic-rag-authorization/docker-compose.yml b/agentic-rag-authorization/docker-compose.yml index df61d9b..c176a69 100644 --- a/agentic-rag-authorization/docker-compose.yml +++ b/agentic-rag-authorization/docker-compose.yml @@ -1,23 +1,58 @@ ---- -version: '3.8' +version: '3.5' services: - weaviate: - image: "cr.weaviate.io/semitechnologies/weaviate:latest" - ports: - - "8080:8080" # HTTP and gRPC both use this port + etcd: + container_name: milvus-etcd + image: quay.io/coreos/etcd:v3.5.5 + environment: + - ETCD_AUTO_COMPACTION_MODE=revision + - ETCD_AUTO_COMPACTION_RETENTION=1000 + - ETCD_QUOTA_BACKEND_BYTES=4294967296 + - ETCD_SNAPSHOT_COUNT=50000 + volumes: + - etcd_data:/etcd + command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd + healthcheck: + test: ["CMD", "etcdctl", "endpoint", "health"] + interval: 30s + timeout: 20s + retries: 3 + + minio: + container_name: milvus-minio + image: minio/minio:RELEASE.2023-03-13T19-46-17Z environment: - QUERY_DEFAULTS_LIMIT: 25 - AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true' - PERSISTENCE_DATA_PATH: '/var/lib/weaviate' - DEFAULT_VECTORIZER_MODULE: 'text2vec-openai' - ENABLE_MODULES: 'text2vec-openai' - CLUSTER_HOSTNAME: 'node1' - OPENAI_APIKEY: "${OPENAI_API_KEY}" + MINIO_ACCESS_KEY: minioadmin + MINIO_SECRET_KEY: minioadmin volumes: - - "weaviate_data:/var/lib/weaviate" - env_file: - - ".env" + - minio_data:/minio_data + command: minio server /minio_data --console-address ":9001" + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"] + interval: 30s + timeout: 20s + retries: 3 + + standalone: + container_name: milvus-standalone + image: milvusdb/milvus:v2.4.6 + command: ["milvus", "run", "standalone"] + environment: + ETCD_ENDPOINTS: etcd:2379 + MINIO_ADDRESS: minio:9000 + volumes: + - milvus_data:/var/lib/milvus + ports: + - "19530:19530" + - "9091:9091" + depends_on: + - etcd + - minio + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9091/healthz"] + interval: 30s + timeout: 20s + retries: 3 spicedb: image: "authzed/spicedb:latest" @@ -28,4 +63,6 @@ services: SPICEDB_DATASTORE_ENGINE: "memory" volumes: - weaviate_data: + etcd_data: + minio_data: + milvus_data: From 7ced47195bad048b13adde9b9c103aeda0bae14b Mon Sep 17 00:00:00 2001 From: Sohan Maheshwar <1119120+sohanmaheshwar@users.noreply.github.com> Date: Thu, 28 May 2026 10:50:51 +0200 Subject: [PATCH 02/14] deps: replace weaviate-client with pymilvus --- agentic-rag-authorization/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agentic-rag-authorization/requirements.txt b/agentic-rag-authorization/requirements.txt index c40bc86..f5401bd 100644 --- a/agentic-rag-authorization/requirements.txt +++ b/agentic-rag-authorization/requirements.txt @@ -3,7 +3,7 @@ langchain>=0.1.0 langchain-openai>=0.1.0 langgraph>=0.0.20 langchain-spicedb>=0.2.0 -weaviate-client>=3.26.0,<4.0 # v3 for REST API stability (no gRPC issues) +pymilvus>=2.4.0 authzed>=0.7.0 python-dotenv>=1.0.0 grpcio>=1.50.0 From 6062c63d120dd80b1b467454d159170c4a731782 Mon Sep 17 00:00:00 2001 From: Sohan Maheshwar <1119120+sohanmaheshwar@users.noreply.github.com> Date: Thu, 28 May 2026 12:10:39 +0200 Subject: [PATCH 03/14] feat: replace Weaviate config with Milvus config vars --- .../agentic_rag/config.py | 10 +++--- .../tests/test_config.py | 35 +++++++++++++++++++ 2 files changed, 40 insertions(+), 5 deletions(-) create mode 100644 agentic-rag-authorization/tests/test_config.py diff --git a/agentic-rag-authorization/agentic_rag/config.py b/agentic-rag-authorization/agentic_rag/config.py index 63bbab8..cc24fba 100644 --- a/agentic-rag-authorization/agentic_rag/config.py +++ b/agentic-rag-authorization/agentic_rag/config.py @@ -13,9 +13,9 @@ class Config: """Configuration for agentic RAG system.""" - # Weaviate - weaviate_url: str - weaviate_api_key: Optional[str] + # Milvus + milvus_uri: str + milvus_token: str # SpiceDB spicedb_endpoint: str @@ -34,8 +34,8 @@ class Config: def from_env(cls): """Load configuration from environment variables.""" return cls( - weaviate_url=os.getenv("WEAVIATE_URL", "http://localhost:8080"), - weaviate_api_key=os.getenv("WEAVIATE_API_KEY"), + milvus_uri=os.getenv("MILVUS_URI", "http://localhost:19530"), + milvus_token=os.getenv("MILVUS_TOKEN", ""), spicedb_endpoint=os.getenv("SPICEDB_ENDPOINT", "localhost:50051"), spicedb_token=os.getenv("SPICEDB_TOKEN", "devtoken"), openai_api_key=os.getenv("OPENAI_API_KEY", ""), diff --git a/agentic-rag-authorization/tests/test_config.py b/agentic-rag-authorization/tests/test_config.py new file mode 100644 index 0000000..fb1ebd0 --- /dev/null +++ b/agentic-rag-authorization/tests/test_config.py @@ -0,0 +1,35 @@ +"""Tests for config loading.""" + +import os +from unittest.mock import patch +from agentic_rag.config import Config + + +def test_config_loads_milvus_uri(): + env = { + "MILVUS_URI": "http://milvus-host:19530", + "OPENAI_API_KEY": "sk-test", + "SPICEDB_TOKEN": "tok", + "SPICEDB_ENDPOINT": "localhost:50051", + } + with patch.dict(os.environ, env, clear=True): + config = Config.from_env() + assert config.milvus_uri == "http://milvus-host:19530" + + +def test_config_milvus_defaults(): + env = { + "OPENAI_API_KEY": "sk-test", + "SPICEDB_TOKEN": "tok", + "SPICEDB_ENDPOINT": "localhost:50051", + } + with patch.dict(os.environ, env, clear=True): + config = Config.from_env() + assert config.milvus_uri == "http://localhost:19530" + assert config.milvus_token == "" + + +def test_config_has_no_weaviate_fields(): + config = Config.from_env() + assert not hasattr(config, "weaviate_url") + assert not hasattr(config, "weaviate_api_key") From 8915a3cfc7e4dfc16f934ec46d53e5bc72950119 Mon Sep 17 00:00:00 2001 From: Sohan Maheshwar <1119120+sohanmaheshwar@users.noreply.github.com> Date: Thu, 28 May 2026 12:31:55 +0200 Subject: [PATCH 04/14] chore: remove unused Optional import from config --- agentic-rag-authorization/agentic_rag/config.py | 1 - 1 file changed, 1 deletion(-) diff --git a/agentic-rag-authorization/agentic_rag/config.py b/agentic-rag-authorization/agentic_rag/config.py index cc24fba..749c2f7 100644 --- a/agentic-rag-authorization/agentic_rag/config.py +++ b/agentic-rag-authorization/agentic_rag/config.py @@ -2,7 +2,6 @@ from dataclasses import dataclass from functools import lru_cache -from typing import Optional import os from dotenv import load_dotenv From 7672adefbb1b9c1edfbc884731677b1780906812 Mon Sep 17 00:00:00 2001 From: Sohan Maheshwar <1119120+sohanmaheshwar@users.noreply.github.com> Date: Thu, 28 May 2026 12:33:29 +0200 Subject: [PATCH 05/14] feat: add MilvusClient singleton --- .../agentic_rag/milvus_client.py | 26 +++++++++++++++++++ .../tests/test_milvus_client.py | 25 ++++++++++++++++++ 2 files changed, 51 insertions(+) create mode 100644 agentic-rag-authorization/agentic_rag/milvus_client.py create mode 100644 agentic-rag-authorization/tests/test_milvus_client.py diff --git a/agentic-rag-authorization/agentic_rag/milvus_client.py b/agentic-rag-authorization/agentic_rag/milvus_client.py new file mode 100644 index 0000000..31b3dbd --- /dev/null +++ b/agentic-rag-authorization/agentic_rag/milvus_client.py @@ -0,0 +1,26 @@ +"""Milvus client connection pooling.""" + +from pymilvus import MilvusClient +from threading import Lock +from typing import Optional + +_milvus_client: Optional[MilvusClient] = None +_milvus_lock = Lock() + + +def get_milvus_client(uri: str) -> MilvusClient: + """Get or create reusable MilvusClient (singleton, thread-safe).""" + global _milvus_client + if _milvus_client is not None: + return _milvus_client + with _milvus_lock: + if _milvus_client is None: + _milvus_client = MilvusClient(uri=uri) + return _milvus_client + + +def reset_milvus_client(): + """Reset singleton (useful for testing).""" + global _milvus_client + with _milvus_lock: + _milvus_client = None diff --git a/agentic-rag-authorization/tests/test_milvus_client.py b/agentic-rag-authorization/tests/test_milvus_client.py new file mode 100644 index 0000000..b4c9928 --- /dev/null +++ b/agentic-rag-authorization/tests/test_milvus_client.py @@ -0,0 +1,25 @@ +"""Unit tests for Milvus client singleton.""" + +from unittest.mock import patch, MagicMock + + +def test_get_milvus_client_returns_singleton(): + from agentic_rag.milvus_client import get_milvus_client, reset_milvus_client + reset_milvus_client() + with patch("agentic_rag.milvus_client.MilvusClient") as mock_cls: + mock_cls.return_value = MagicMock() + client1 = get_milvus_client("http://localhost:19530") + client2 = get_milvus_client("http://localhost:19530") + assert client1 is client2 + mock_cls.assert_called_once_with(uri="http://localhost:19530") + + +def test_reset_clears_singleton(): + from agentic_rag.milvus_client import get_milvus_client, reset_milvus_client + reset_milvus_client() + with patch("agentic_rag.milvus_client.MilvusClient") as mock_cls: + mock_cls.return_value = MagicMock() + get_milvus_client("http://localhost:19530") + reset_milvus_client() + get_milvus_client("http://localhost:19530") + assert mock_cls.call_count == 2 From 4953cb265fc86cd810febd1acd7254408f601075 Mon Sep 17 00:00:00 2001 From: Sohan Maheshwar <1119120+sohanmaheshwar@users.noreply.github.com> Date: Thu, 28 May 2026 12:35:59 +0200 Subject: [PATCH 06/14] feat: replace Weaviate BM25 retrieval with Milvus semantic vector search --- .../agentic_rag/nodes/retrieval_node.py | 59 ++++++------- .../tests/test_retrieval_node.py | 87 +++++++++++++++++++ 2 files changed, 112 insertions(+), 34 deletions(-) create mode 100644 agentic-rag-authorization/tests/test_retrieval_node.py diff --git a/agentic-rag-authorization/agentic_rag/nodes/retrieval_node.py b/agentic-rag-authorization/agentic_rag/nodes/retrieval_node.py index 6c83f42..df075e5 100644 --- a/agentic-rag-authorization/agentic_rag/nodes/retrieval_node.py +++ b/agentic-rag-authorization/agentic_rag/nodes/retrieval_node.py @@ -1,60 +1,55 @@ -"""Retrieval node - retrieve documents from Weaviate.""" +"""Retrieval node - retrieve documents from Milvus using semantic vector search.""" +import openai from langchain_core.messages import SystemMessage from langchain_core.documents import Document from ..state import AgenticRAGState from ..config import get_config from ..logging_config import get_logger -from ..weaviate_client import get_weaviate_client +from ..milvus_client import get_milvus_client from ..node_helpers import log_node_execution logger = get_logger("nodes.retrieval") -def retrieval_node(state: AgenticRAGState) -> dict: - """Retrieve documents from Weaviate based on query. +def _embed(text: str, api_key: str) -> list[float]: + client = openai.OpenAI(api_key=api_key) + response = client.embeddings.create(model="text-embedding-3-small", input=text) + return response.data[0].embedding + - This node performs keyword search in Weaviate to find - relevant documents. Authorization happens in the next node. - """ +def retrieval_node(state: AgenticRAGState) -> dict: + """Retrieve documents from Milvus based on semantic similarity to the query.""" config = get_config() with log_node_execution( logger, "retrieval", - { - "query": state["query"], - "subject_id": state["subject_id"], - } + {"query": state["query"], "subject_id": state["subject_id"]}, ): try: - # Get or create Weaviate client (reused across requests) - weaviate_client = get_weaviate_client(config.weaviate_url) + milvus_client = get_milvus_client(config.milvus_uri) + query_embedding = _embed(state["query"], config.openai_api_key) - # Perform BM25 keyword search using v3 API - response = ( - weaviate_client.query.get("Documents", ["doc_id", "title", "content", "department", "classification"]) - .with_bm25(query=state["query"]) - .with_limit(5) - .do() + results = milvus_client.search( + collection_name="Documents", + data=[query_embedding], + limit=5, + output_fields=["doc_id", "title", "content", "department", "classification"], ) - # Extract results - results = response.get("data", {}).get("Get", {}).get("Documents", []) - - # Convert to LangChain Documents documents = [ Document( - page_content=result["content"], + page_content=hit["entity"]["content"], metadata={ - "doc_id": result["doc_id"], - "title": result["title"], - "department": result["department"], - "classification": result["classification"], + "doc_id": hit["entity"]["doc_id"], + "title": hit["entity"]["title"], + "department": hit["entity"]["department"], + "classification": hit["entity"]["classification"], }, ) - for result in results + for hit in results[0] ] logger.info( @@ -69,9 +64,7 @@ def retrieval_node(state: AgenticRAGState) -> dict: "retrieved_documents": documents, "retrieval_attempt": state["retrieval_attempt"] + 1, "messages": [ - SystemMessage( - content=f"Retrieved {len(documents)} documents from Weaviate" - ) + SystemMessage(content=f"Retrieved {len(documents)} documents from Milvus") ], } @@ -85,8 +78,6 @@ def retrieval_node(state: AgenticRAGState) -> dict: }, exc_info=True, ) - - # Fail gracefully - return empty results return { "retrieved_documents": [], "retrieval_attempt": state["retrieval_attempt"] + 1, diff --git a/agentic-rag-authorization/tests/test_retrieval_node.py b/agentic-rag-authorization/tests/test_retrieval_node.py new file mode 100644 index 0000000..5276e58 --- /dev/null +++ b/agentic-rag-authorization/tests/test_retrieval_node.py @@ -0,0 +1,87 @@ +"""Unit tests for retrieval node.""" + +import pytest +from unittest.mock import patch, MagicMock +from langchain_core.documents import Document + + +@pytest.fixture +def sample_state(): + return { + "query": "What are engineering best practices?", + "subject_id": "alice", + "max_attempts": 1, + "retrieval_attempt": 0, + "messages": [], + "reasoning": [], + "retrieved_documents": [], + "authorized_documents": [], + "denied_count": 0, + "authorization_passed": False, + "answer": "", + } + + +def _make_hit(doc_id="doc-001", title="Test Doc", content="Content", dept="engineering", classification="internal"): + return { + "entity": { + "doc_id": doc_id, + "title": title, + "content": content, + "department": dept, + "classification": classification, + } + } + + +def test_retrieval_node_returns_documents(sample_state): + from agentic_rag.nodes.retrieval_node import retrieval_node + + with patch("agentic_rag.nodes.retrieval_node.get_milvus_client") as mock_get_client, \ + patch("agentic_rag.nodes.retrieval_node._embed") as mock_embed: + mock_embed.return_value = [0.1] * 1536 + mock_client = MagicMock() + mock_client.search.return_value = [[_make_hit()]] + mock_get_client.return_value = mock_client + + result = retrieval_node(sample_state) + + assert len(result["retrieved_documents"]) == 1 + doc = result["retrieved_documents"][0] + assert isinstance(doc, Document) + assert doc.metadata["doc_id"] == "doc-001" + assert doc.metadata["title"] == "Test Doc" + assert doc.metadata["department"] == "engineering" + assert doc.metadata["classification"] == "internal" + assert doc.page_content == "Content" + assert result["retrieval_attempt"] == 1 + + +def test_retrieval_node_increments_attempt_on_failure(sample_state): + from agentic_rag.nodes.retrieval_node import retrieval_node + + with patch("agentic_rag.nodes.retrieval_node.get_milvus_client") as mock_get_client, \ + patch("agentic_rag.nodes.retrieval_node._embed") as mock_embed: + mock_embed.side_effect = RuntimeError("OpenAI unavailable") + mock_get_client.return_value = MagicMock() + + result = retrieval_node(sample_state) + + assert result["retrieved_documents"] == [] + assert result["retrieval_attempt"] == 1 + + +def test_embed_calls_openai(): + from agentic_rag.nodes.retrieval_node import _embed + + mock_response = MagicMock() + mock_response.data = [MagicMock(embedding=[0.5] * 1536)] + + with patch("agentic_rag.nodes.retrieval_node.openai.OpenAI") as mock_oai: + mock_oai.return_value.embeddings.create.return_value = mock_response + result = _embed("hello world", "sk-test") + + assert result == [0.5] * 1536 + mock_oai.return_value.embeddings.create.assert_called_once_with( + model="text-embedding-3-small", input="hello world" + ) From f837f45ac82543930e766ed42a586d5e3b089f49 Mon Sep 17 00:00:00 2001 From: Sohan Maheshwar <1119120+sohanmaheshwar@users.noreply.github.com> Date: Thu, 28 May 2026 12:38:24 +0200 Subject: [PATCH 07/14] feat: replace setup_weaviate with setup_milvus using OpenAI embeddings --- .../examples/setup_environment.py | 173 ++++++++---------- 1 file changed, 76 insertions(+), 97 deletions(-) diff --git a/agentic-rag-authorization/examples/setup_environment.py b/agentic-rag-authorization/examples/setup_environment.py index 8d2cc0e..a9e798c 100644 --- a/agentic-rag-authorization/examples/setup_environment.py +++ b/agentic-rag-authorization/examples/setup_environment.py @@ -1,16 +1,15 @@ -"""Initialize Weaviate and SpiceDB with sample data.""" +"""Initialize Milvus and SpiceDB with sample data.""" import sys import os from dotenv import load_dotenv -# Load environment variables load_dotenv() -# Add parent directory to path so we can import from agentic_rag sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -import weaviate +import openai +from pymilvus import MilvusClient, DataType from authzed.api.v1 import ( WriteSchemaRequest, WriteRelationshipsRequest, @@ -20,9 +19,7 @@ SubjectReference, ) from agentic_rag.grpc_helpers import create_insecure_spicedb_client -import json -# Add scripts directory to path for document parser sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'scripts')) from parse_documents import load_all_documents @@ -33,7 +30,6 @@ def setup_spicedb(): client = create_insecure_spicedb_client("localhost:50051", "devtoken") - # Load schema schema_path = os.path.join(os.path.dirname(__file__), "..", "data", "schema.zed") with open(schema_path) as f: schema = f.read() @@ -41,57 +37,43 @@ def setup_spicedb(): client.WriteSchema(WriteSchemaRequest(schema=schema)) print(" ✅ Schema loaded") - # Load documents to get all doc_ids documents = load_all_documents() - # Create user-department relationships updates = [ - # Alice is in engineering department RelationshipUpdate( operation=RelationshipUpdate.Operation.OPERATION_TOUCH, relationship=Relationship( - resource=ObjectReference( - object_type="department", object_id="engineering" - ), + resource=ObjectReference(object_type="department", object_id="engineering"), relation="member", subject=SubjectReference( object=ObjectReference(object_type="user", object_id="alice") ), ), ), - # Bob is in sales department RelationshipUpdate( operation=RelationshipUpdate.Operation.OPERATION_TOUCH, relationship=Relationship( - resource=ObjectReference( - object_type="department", object_id="sales" - ), + resource=ObjectReference(object_type="department", object_id="sales"), relation="member", subject=SubjectReference( object=ObjectReference(object_type="user", object_id="bob") ), ), ), - # HR manager is in HR department RelationshipUpdate( operation=RelationshipUpdate.Operation.OPERATION_TOUCH, relationship=Relationship( - resource=ObjectReference( - object_type="department", object_id="hr" - ), + resource=ObjectReference(object_type="department", object_id="hr"), relation="member", subject=SubjectReference( object=ObjectReference(object_type="user", object_id="hr_manager") ), ), ), - # Finance manager is in finance department RelationshipUpdate( operation=RelationshipUpdate.Operation.OPERATION_TOUCH, relationship=Relationship( - resource=ObjectReference( - object_type="department", object_id="finance" - ), + resource=ObjectReference(object_type="department", object_id="finance"), relation="member", subject=SubjectReference( object=ObjectReference(object_type="user", object_id="finance_manager") @@ -100,12 +82,10 @@ def setup_spicedb(): ), ] - # Create department-based document permissions for doc in documents: doc_id = doc['doc_id'] dept = doc['department'] - # Public documents: accessible to all users if dept == "public": for user in ["alice", "bob", "hr_manager", "finance_manager"]: updates.append( @@ -121,7 +101,6 @@ def setup_spicedb(): ) ) else: - # Department documents: accessible to department members updates.append( RelationshipUpdate( operation=RelationshipUpdate.Operation.OPERATION_TOUCH, @@ -129,21 +108,17 @@ def setup_spicedb(): resource=ObjectReference(object_type="document", object_id=doc_id), relation="viewer", subject=SubjectReference( - object=ObjectReference( - object_type="department", - object_id=dept, - ), + object=ObjectReference(object_type="department", object_id=dept), optional_relation="member", ), ), ) ) - # Cross-department documents cross_dept_docs = [ - ("engineering-architecture-001", "sales"), # Tech sales need architecture docs - ("sales-guide-005", "engineering"), # Engineering needs to understand product positioning - ("hr-policy-001", "finance"), # Finance needs HR policies for budget planning + ("engineering-architecture-001", "sales"), + ("sales-guide-005", "engineering"), + ("hr-policy-001", "finance"), ] for doc_id, additional_dept in cross_dept_docs: @@ -154,21 +129,17 @@ def setup_spicedb(): resource=ObjectReference(object_type="document", object_id=doc_id), relation="viewer", subject=SubjectReference( - object=ObjectReference( - object_type="department", - object_id=additional_dept, - ), + object=ObjectReference(object_type="department", object_id=additional_dept), optional_relation="member", ), ), ) ) - # Individual user exceptions individual_exceptions = [ - ("alice", "sales-proposal-001"), # Alice needs to see a technical sales proposal - ("finance_manager", "hr-policy-002"), # Finance manager needs HR compensation policy - ("bob", "engineering-guide-006"), # Bob needs technical documentation for sales + ("alice", "sales-proposal-001"), + ("finance_manager", "hr-policy-002"), + ("bob", "engineering-guide-006"), ] for user, doc_id in individual_exceptions: @@ -199,62 +170,70 @@ def setup_spicedb(): print(f" - Public access: 5 documents accessible to all users") -def setup_weaviate(): - """Setup Weaviate with sample documents.""" - print("\nSetting up Weaviate...") - - # Connect to Weaviate v3 (REST API) - client = weaviate.Client("http://127.0.0.1:8080") - - try: - # Check if class exists and delete it - try: - client.schema.delete_class("Documents") - print(" ✅ Deleted existing Documents class") - except: - pass - - # Create schema using v3 API (no vectorizer since we're using BM25 keyword search) - schema = { - "class": "Documents", - "vectorizer": "none", # Disable vectorization for BM25 keyword search - "properties": [ - {"name": "doc_id", "dataType": ["text"]}, - {"name": "title", "dataType": ["text"]}, - {"name": "content", "dataType": ["text"]}, - {"name": "department", "dataType": ["text"]}, - {"name": "classification", "dataType": ["text"]}, - ], - } - client.schema.create_class(schema) - print(" ✅ Documents class created") - - # Load documents from .txt files - documents = load_all_documents() - print(f" ✅ Loaded {len(documents)} documents from data/documents/") - - # Insert documents using v3 API - with client.batch as batch: - for doc in documents: - batch.add_data_object( - data_object=doc, - class_name="Documents", - ) +def setup_milvus(): + """Setup Milvus with sample documents using OpenAI semantic embeddings.""" + print("\nSetting up Milvus...") + + milvus_uri = os.getenv("MILVUS_URI", "http://localhost:19530") + openai_api_key = os.getenv("OPENAI_API_KEY", "") - print(f" ✅ Inserted {len(documents)} documents") - print(" Document Distribution:") + client = MilvusClient(uri=milvus_uri) + oai_client = openai.OpenAI(api_key=openai_api_key) - # Count by department - dept_counts = {} - for doc in documents: - dept = doc['department'] - dept_counts[dept] = dept_counts.get(dept, 0) + 1 + if client.has_collection("Documents"): + client.drop_collection("Documents") + print(" ✅ Dropped existing Documents collection") - for dept, count in sorted(dept_counts.items()): - print(f" - {dept}: {count} documents") + schema = client.create_schema(auto_id=False, enable_dynamic_field=False) + schema.add_field("doc_id", DataType.VARCHAR, max_length=256, is_primary=True) + schema.add_field("title", DataType.VARCHAR, max_length=512) + schema.add_field("content", DataType.VARCHAR, max_length=65535) + schema.add_field("department", DataType.VARCHAR, max_length=128) + schema.add_field("classification", DataType.VARCHAR, max_length=128) + schema.add_field("embedding", DataType.FLOAT_VECTOR, dim=1536) + + index_params = client.prepare_index_params() + index_params.add_index( + field_name="embedding", + metric_type="COSINE", + index_type="IVF_FLAT", + params={"nlist": 128}, + ) + + client.create_collection("Documents", schema=schema, index_params=index_params) + print(" ✅ Documents collection created") + + documents = load_all_documents() + print(f" ✅ Loaded {len(documents)} documents from data/documents/") + + rows = [] + for i, doc in enumerate(documents): + response = oai_client.embeddings.create( + model="text-embedding-3-small", + input=doc["content"], + ) + rows.append({ + "doc_id": doc["doc_id"], + "title": doc["title"], + "content": doc["content"], + "department": doc["department"], + "classification": doc["classification"], + "embedding": response.data[0].embedding, + }) + if (i + 1) % 10 == 0: + print(f" Embedded {i + 1}/{len(documents)} documents...") + + client.insert("Documents", rows) + print(f" ✅ Inserted {len(rows)} documents with embeddings") + + dept_counts = {} + for doc in documents: + dept = doc['department'] + dept_counts[dept] = dept_counts.get(dept, 0) + 1 - finally: - pass # v3 client doesn't need explicit close + print(" Document Distribution:") + for dept, count in sorted(dept_counts.items()): + print(f" - {dept}: {count} documents") def main(): @@ -264,7 +243,7 @@ def main(): print("=" * 60) setup_spicedb() - setup_weaviate() + setup_milvus() print("\n" + "=" * 60) print("✅ Setup complete!") From 7f78f524c1faa4dcc2e6fca56d0170716a713bc8 Mon Sep 17 00:00:00 2001 From: Sohan Maheshwar <1119120+sohanmaheshwar@users.noreply.github.com> Date: Thu, 28 May 2026 12:39:29 +0200 Subject: [PATCH 08/14] chore: update conftest and health check for Milvus --- agentic-rag-authorization/api/routes.py | 2 +- agentic-rag-authorization/tests/conftest.py | 10 ++++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/agentic-rag-authorization/api/routes.py b/agentic-rag-authorization/api/routes.py index 707c3cb..59a033b 100644 --- a/agentic-rag-authorization/api/routes.py +++ b/agentic-rag-authorization/api/routes.py @@ -42,7 +42,7 @@ async def health_check(): return { "status": "healthy", "services": { - "weaviate": "connected", + "milvus": "connected", "spicedb": "connected", "openai": "configured", }, diff --git a/agentic-rag-authorization/tests/conftest.py b/agentic-rag-authorization/tests/conftest.py index 1b1c084..52109b6 100644 --- a/agentic-rag-authorization/tests/conftest.py +++ b/agentic-rag-authorization/tests/conftest.py @@ -1,16 +1,14 @@ """Test fixtures for agentic RAG tests.""" import pytest -import weaviate +from pymilvus import MilvusClient from agentic_rag.grpc_helpers import create_insecure_spicedb_client @pytest.fixture -def weaviate_client(): - """Create Weaviate client for tests.""" - client = weaviate.connect_to_local() - yield client - client.close() +def milvus_client(): + """Create MilvusClient for tests.""" + return MilvusClient(uri="http://localhost:19530") @pytest.fixture From ff89e5255d432c2a32f3e13f81c32472627daac5 Mon Sep 17 00:00:00 2001 From: Sohan Maheshwar <1119120+sohanmaheshwar@users.noreply.github.com> Date: Thu, 28 May 2026 12:40:21 +0200 Subject: [PATCH 09/14] chore: remove Weaviate client module --- .../agentic_rag/__init__.py | 2 +- .../agentic_rag/weaviate_client.py | 48 ------------------- 2 files changed, 1 insertion(+), 49 deletions(-) delete mode 100644 agentic-rag-authorization/agentic_rag/weaviate_client.py diff --git a/agentic-rag-authorization/agentic_rag/__init__.py b/agentic-rag-authorization/agentic_rag/__init__.py index 0531d2b..f12f9b8 100644 --- a/agentic-rag-authorization/agentic_rag/__init__.py +++ b/agentic-rag-authorization/agentic_rag/__init__.py @@ -1,4 +1,4 @@ -"""Agentic RAG with fine-grained authorization using Weaviate and SpiceDB.""" +"""Agentic RAG with fine-grained authorization using Milvus and SpiceDB.""" __version__ = "0.1.0" diff --git a/agentic-rag-authorization/agentic_rag/weaviate_client.py b/agentic-rag-authorization/agentic_rag/weaviate_client.py deleted file mode 100644 index 0c11fda..0000000 --- a/agentic-rag-authorization/agentic_rag/weaviate_client.py +++ /dev/null @@ -1,48 +0,0 @@ -"""Weaviate client connection pooling.""" - -import weaviate -from threading import Lock -from typing import Optional - -# Global singleton for Weaviate client with thread-safe initialization -_weaviate_client: Optional[weaviate.Client] = None -_weaviate_lock = Lock() - - -def get_weaviate_client(url: str) -> weaviate.Client: - """ - Get or create reusable Weaviate client (singleton, thread-safe). - - This function provides connection pooling for Weaviate by maintaining - a single client instance across requests, eliminating connection overhead. - - Args: - url: The Weaviate URL (e.g., "http://localhost:8080") - - Returns: - weaviate.Client configured for the given URL - """ - global _weaviate_client - - # Fast path: client already exists - if _weaviate_client is not None: - return _weaviate_client - - # Slow path: create new client with thread-safe lock - with _weaviate_lock: - # Double-check after acquiring lock - if _weaviate_client is None: - _weaviate_client = weaviate.Client(url) - - return _weaviate_client - - -def reset_weaviate_client(): - """ - Reset singleton (useful for testing). - - This allows tests to clear the cached client and create a fresh one. - """ - global _weaviate_client - with _weaviate_lock: - _weaviate_client = None From 7717ca31c80bd39e6777f86baf0d36d53e5dc097 Mon Sep 17 00:00:00 2001 From: Sohan Maheshwar <1119120+sohanmaheshwar@users.noreply.github.com> Date: Thu, 28 May 2026 13:24:36 +0200 Subject: [PATCH 10/14] fix: address post-review issues - wire token, add anns_field, fix run_ui, cleanup --- .../agentic_rag/graph.py | 2 +- .../agentic_rag/logging_config.py | 2 +- .../agentic_rag/milvus_client.py | 4 +-- .../agentic_rag/nodes/retrieval_node.py | 3 ++- agentic-rag-authorization/api/routes.py | 2 +- agentic-rag-authorization/docker-compose.yml | 8 +++--- agentic-rag-authorization/run_ui.py | 25 +++++++++++-------- .../test_improvements.py | 12 ++++----- .../tests/test_milvus_client.py | 2 +- 9 files changed, 34 insertions(+), 26 deletions(-) diff --git a/agentic-rag-authorization/agentic_rag/graph.py b/agentic-rag-authorization/agentic_rag/graph.py index eb980bf..01f0ed1 100644 --- a/agentic-rag-authorization/agentic_rag/graph.py +++ b/agentic-rag-authorization/agentic_rag/graph.py @@ -55,7 +55,7 @@ def build_agentic_rag_graph(): """Build the agentic RAG graph with deterministic authorization. Simplified Flow: - 1. Retrieval: Fetch documents from Weaviate + 1. Retrieval: Fetch documents from Milvus 2. Authorization: Deterministic permission check (security boundary) 3. Conditional: - If authorized docs exist: Generate answer diff --git a/agentic-rag-authorization/agentic_rag/logging_config.py b/agentic-rag-authorization/agentic_rag/logging_config.py index a7dbdfd..c901a3a 100644 --- a/agentic-rag-authorization/agentic_rag/logging_config.py +++ b/agentic-rag-authorization/agentic_rag/logging_config.py @@ -104,7 +104,7 @@ def setup_logging(level: str = "INFO") -> None: logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("httpcore").setLevel(logging.WARNING) logging.getLogger("openai").setLevel(logging.WARNING) - logging.getLogger("weaviate").setLevel(logging.WARNING) + logging.getLogger("pymilvus").setLevel(logging.WARNING) logging.getLogger("grpc").setLevel(logging.WARNING) diff --git a/agentic-rag-authorization/agentic_rag/milvus_client.py b/agentic-rag-authorization/agentic_rag/milvus_client.py index 31b3dbd..44f14f6 100644 --- a/agentic-rag-authorization/agentic_rag/milvus_client.py +++ b/agentic-rag-authorization/agentic_rag/milvus_client.py @@ -8,14 +8,14 @@ _milvus_lock = Lock() -def get_milvus_client(uri: str) -> MilvusClient: +def get_milvus_client(uri: str, token: str = "") -> MilvusClient: """Get or create reusable MilvusClient (singleton, thread-safe).""" global _milvus_client if _milvus_client is not None: return _milvus_client with _milvus_lock: if _milvus_client is None: - _milvus_client = MilvusClient(uri=uri) + _milvus_client = MilvusClient(uri=uri, token=token) return _milvus_client diff --git a/agentic-rag-authorization/agentic_rag/nodes/retrieval_node.py b/agentic-rag-authorization/agentic_rag/nodes/retrieval_node.py index df075e5..83e3ba4 100644 --- a/agentic-rag-authorization/agentic_rag/nodes/retrieval_node.py +++ b/agentic-rag-authorization/agentic_rag/nodes/retrieval_node.py @@ -29,12 +29,13 @@ def retrieval_node(state: AgenticRAGState) -> dict: {"query": state["query"], "subject_id": state["subject_id"]}, ): try: - milvus_client = get_milvus_client(config.milvus_uri) + milvus_client = get_milvus_client(config.milvus_uri, config.milvus_token) query_embedding = _embed(state["query"], config.openai_api_key) results = milvus_client.search( collection_name="Documents", data=[query_embedding], + anns_field="embedding", limit=5, output_fields=["doc_id", "title", "content", "department", "classification"], ) diff --git a/agentic-rag-authorization/api/routes.py b/agentic-rag-authorization/api/routes.py index 59a033b..e18ab32 100644 --- a/agentic-rag-authorization/api/routes.py +++ b/agentic-rag-authorization/api/routes.py @@ -37,7 +37,7 @@ async def get_users(): @router.get("/health") async def health_check(): """Check health of backend services.""" - # TODO: Actually check Weaviate + SpiceDB connectivity + # TODO: Actually check Milvus + SpiceDB connectivity # For now, return optimistic health status return { "status": "healthy", diff --git a/agentic-rag-authorization/docker-compose.yml b/agentic-rag-authorization/docker-compose.yml index c176a69..07e2f18 100644 --- a/agentic-rag-authorization/docker-compose.yml +++ b/agentic-rag-authorization/docker-compose.yml @@ -1,4 +1,4 @@ -version: '3.5' +version: '3.8' services: etcd: @@ -46,8 +46,10 @@ services: - "19530:19530" - "9091:9091" depends_on: - - etcd - - minio + etcd: + condition: service_healthy + minio: + condition: service_healthy healthcheck: test: ["CMD", "curl", "-f", "http://localhost:9091/healthz"] interval: 30s diff --git a/agentic-rag-authorization/run_ui.py b/agentic-rag-authorization/run_ui.py index a04b7c7..80d1f98 100755 --- a/agentic-rag-authorization/run_ui.py +++ b/agentic-rag-authorization/run_ui.py @@ -18,16 +18,16 @@ def check_services(): print(" Copy .env.example to .env and configure it") return False - # Check Weaviate + # Check Milvus try: from agentic_rag.config import get_config - from agentic_rag.weaviate_client import get_weaviate_client + from agentic_rag.milvus_client import get_milvus_client config = get_config() - weaviate_client = get_weaviate_client(config.weaviate_url) - print(" ✅ Weaviate connected") + milvus_client = get_milvus_client(config.milvus_uri, config.milvus_token) + print(" ✅ Milvus connected") except Exception as e: - print(f" ❌ Weaviate not available: {e}") + print(f" ❌ Milvus not available: {e}") print(" Run: docker-compose up -d") return False @@ -51,12 +51,17 @@ def check_services(): # Check if documents are loaded try: - result = weaviate_client.query.get("Documents", ["doc_id"]).with_limit(1).do() - doc_count = len(result.get("data", {}).get("Get", {}).get("Documents", [])) - if doc_count > 0: - print(" ✅ Documents loaded in Weaviate") + if milvus_client.has_collection("Documents"): + stats = milvus_client.get_collection_stats("Documents") + row_count = int(stats.get("row_count", 0)) + if row_count > 0: + print(" ✅ Documents loaded in Milvus") + else: + print(" ⚠️ No documents found in Milvus") + print(" Run: python examples/setup_environment.py") + return False else: - print(" ⚠️ No documents found in Weaviate") + print(" ⚠️ Documents collection does not exist in Milvus") print(" Run: python examples/setup_environment.py") return False except Exception as e: diff --git a/agentic-rag-authorization/test_improvements.py b/agentic-rag-authorization/test_improvements.py index 9ba9dd7..0849ecd 100644 --- a/agentic-rag-authorization/test_improvements.py +++ b/agentic-rag-authorization/test_improvements.py @@ -65,19 +65,19 @@ def test_connection_pooling(): reset_spicedb_client, _spicedb_client, ) - from agentic_rag.weaviate_client import ( - get_weaviate_client, - reset_weaviate_client, - _weaviate_client, + from agentic_rag.milvus_client import ( + get_milvus_client, + reset_milvus_client, + _milvus_client, ) # Verify reset functions exist assert callable(reset_spicedb_client) - assert callable(reset_weaviate_client) + assert callable(reset_milvus_client) print("✅ Connection pooling functions defined correctly") print(" - get_spicedb_client() available") - print(" - get_weaviate_client() available") + print(" - get_milvus_client() available") print(" - reset_*_client() functions available") return True except (ImportError, AssertionError) as e: diff --git a/agentic-rag-authorization/tests/test_milvus_client.py b/agentic-rag-authorization/tests/test_milvus_client.py index b4c9928..f489833 100644 --- a/agentic-rag-authorization/tests/test_milvus_client.py +++ b/agentic-rag-authorization/tests/test_milvus_client.py @@ -11,7 +11,7 @@ def test_get_milvus_client_returns_singleton(): client1 = get_milvus_client("http://localhost:19530") client2 = get_milvus_client("http://localhost:19530") assert client1 is client2 - mock_cls.assert_called_once_with(uri="http://localhost:19530") + mock_cls.assert_called_once_with(uri="http://localhost:19530", token="") def test_reset_clears_singleton(): From d03b9d5c0463102dc5b42d69f4370ce2cc5e8d56 Mon Sep 17 00:00:00 2001 From: Sohan Maheshwar <1119120+sohanmaheshwar@users.noreply.github.com> Date: Thu, 28 May 2026 13:45:36 +0200 Subject: [PATCH 11/14] fix: use query() instead of get_collection_stats() for document check in run_ui --- agentic-rag-authorization/run_ui.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/agentic-rag-authorization/run_ui.py b/agentic-rag-authorization/run_ui.py index 80d1f98..79cbeea 100755 --- a/agentic-rag-authorization/run_ui.py +++ b/agentic-rag-authorization/run_ui.py @@ -52,9 +52,13 @@ def check_services(): # Check if documents are loaded try: if milvus_client.has_collection("Documents"): - stats = milvus_client.get_collection_stats("Documents") - row_count = int(stats.get("row_count", 0)) - if row_count > 0: + results = milvus_client.query( + collection_name="Documents", + filter='doc_id != ""', + output_fields=["doc_id"], + limit=1, + ) + if results: print(" ✅ Documents loaded in Milvus") else: print(" ⚠️ No documents found in Milvus") From c9ea43b5fb357afe6537bd9c14bb9cc124ee464a Mon Sep 17 00:00:00 2001 From: Sohan Maheshwar <1119120+sohanmaheshwar@users.noreply.github.com> Date: Thu, 28 May 2026 14:48:25 +0200 Subject: [PATCH 12/14] docs: reauthor README for Milvus migration; wire MAX_RETRIEVAL_ATTEMPTS through to UI - Replace all Weaviate references with Milvus (semantic vector search, text-embedding-3-small, MILVUS_URI/MILVUS_TOKEN config vars) - Update project structure, schema, state flow diagram, and Learn More links - api/models.py: max_attempts default now reads from get_config() instead of hardcoded 1 - ui/index.html: stop sending hardcoded max_attempts=1 so server default applies --- agentic-rag-authorization/README.md | 84 ++++++++++++------------- agentic-rag-authorization/api/models.py | 3 +- agentic-rag-authorization/ui/index.html | 2 +- 3 files changed, 43 insertions(+), 46 deletions(-) diff --git a/agentic-rag-authorization/README.md b/agentic-rag-authorization/README.md index 4598b4c..da57b3b 100644 --- a/agentic-rag-authorization/README.md +++ b/agentic-rag-authorization/README.md @@ -1,9 +1,8 @@ # Agentic RAG with Fine-Grained Authorization +This repository demonstrates how to combine agentic behavior with deterministic fine-grained authorization using LangGraph, SpiceDB, and Milvus. You'll learn to build RAG systems where a user can only see information from the documents they have access to. -This repository demonstrates how to combine agentic behavior with deterministic fine-grained authorization using LangGraph, SpiceDB, and Weaviate. You'll learn to build RAG systems where a user can view information only based on the documents they have access to. - -This project uses the [LangChain SpiceDB](https://pypi.org/project/langchain-spicedb/) library +This project uses the [LangChain SpiceDB](https://pypi.org/project/langchain-spicedb/) library.  @@ -17,10 +16,10 @@ This project uses the [LangChain SpiceDB](https://pypi.org/project/langchain-spi This repo demonstrates: -1. **Fine-grained authorization in RAG** - How to enforce document-level permissions with SpiceDB to ensure the user only information based on what they have access to -2. **Security architecture** - Deterministic authorization boundary that cannot be bypassed +1. **Fine-grained authorization in RAG** - How to enforce document-level permissions with SpiceDB so users only see what they're allowed to see +2. **Security architecture** - A deterministic authorization boundary that cannot be bypassed by the agent 3. **Production features** - Structured logging, connection pooling, batch operations, error handling -4. **Real-world complexity** - 50 documents, 4 permission patterns with hierarchies. +4. **Real-world complexity** - 50 documents, 4 permission patterns with hierarchies Note: Despite the "agentic RAG" name, the default mode is intentionally simple and deterministic (3 nodes: retrieve → authorize → generate). This provides fast, predictable behavior suitable for most use cases. @@ -31,14 +30,14 @@ Traditional RAG retrieves documents by semantic similarity without considering p 1. **Security risk**: Users might see documents they shouldn't access 2. **Poor UX**: Silent failures when documents are denied, with no explanation -Read the [OWASP Top 10 for LLM](https://owasp.org/www-project-top-10-for-large-language-model-applications/) and [OWASP Top 10 Risks to Web Apps](https://owasp.org/Top10/2025/A01_2025-Broken_Access_Control/) for more information on why access control matters. +Read the [OWASP Top 10 for LLM](https://owasp.org/www-project-top-10-for-large-language-model-applications/) and [OWASP Top 10 Risks to Web Apps](https://owasp.org/Top10/2025/A01_2025-Broken_Access_Control/) for more on why access control matters. ## The Solution This implementation shows how to combine: -- **Retrieval-first approach**: Direct semantic/keyword search without upfront planning overhead +- **Retrieval-first approach**: Semantic vector search without upfront planning overhead - **Deterministic security**: SpiceDB authorization that cannot be bypassed -- **Transparency**: Users understand what they can/can't access and why +- **Transparency**: Users understand what they can and can't access, and why ``` Traditional RAG: Query → Retrieve → Generate @@ -123,29 +122,30 @@ pip install -r requirements.txt # Includes fastapi and uvicorn python3 run_ui.py ``` -The `setup-environment.py` file sets up Weaviate as the vector DB and SpiceDB with sample documents and department-based access control for the agentic RAG system. - -We're creating a schema and writing relationships for a hierarchical permission model with users assigned to departments, department-wide document access, 3 cross-department collaboration grants, and 3 individual user exceptions. +The `setup_environment.py` script sets up Milvus as the vector database and SpiceDB with sample documents and department-based access control. It embeds all 50 documents using OpenAI's `text-embedding-3-small` and inserts them into Milvus, then writes a hierarchical permission model to SpiceDB: users assigned to departments, department-wide document access, 3 cross-department collaboration grants, and 3 individual user exceptions. The UI launcher will: -- Verify documents are loaded -- Starts the FastAPI server +- Verify documents are loaded in Milvus +- Start the FastAPI server - Open your browser to http://localhost:8000 -Here are few sample prompts you can run: +Here are a few sample prompts to try: -Choose "Bob" from "Sales" as the user and the query as "What are the company handbook guidelines?" +Choose "Bob" from "Sales" as the user and run the query "What are the company handbook guidelines?" -You should see: +You should see: +``` 📊 Retrieved: 5 ✅ Authorized: 3 ❌ Denied: 2 +``` -Now run the same query as the "HR Manager". You should see: +Now run the same query as "HR Manager": +``` 📊 Retrieved: 5 ✅ Authorized: 5 ❌ Denied: 0 - +``` ### Manual Start @@ -162,7 +162,7 @@ open http://localhost:8000 ## Run Without UI -``` +```bash # Initialize data python3 examples/setup_environment.py @@ -182,8 +182,11 @@ definition department { } definition document { + relation owner: user relation viewer: user | department#member - permission view = viewer + + permission view = viewer + owner + permission edit = owner } ``` @@ -198,7 +201,7 @@ definition document { ``` User Query ↓ -Retrieval Node ← Weaviate BM25 keyword search +Retrieval Node ← Milvus semantic vector search (text-embedding-3-small) ↓ Authorization Node ← SpiceDB filters (SECURITY BOUNDARY - cannot be bypassed) ↓ @@ -225,7 +228,7 @@ Reasoning Node ← LLM decides: retry with different query, or give up? Generation Node ← explains the denial ``` -For example, if Bob (sales) asks about "microservices architecture" and the first retrieval returns only engineering-only docs, the reasoning node might try a broader query that surfaces a shared architecture doc Bob can actually access. +For example, if Bob (sales) asks about "microservices architecture" and the first retrieval returns only engineering-restricted docs, the reasoning node might try a broader query that surfaces a shared architecture doc Bob can actually access. Enable it by setting `MAX_RETRIEVAL_ATTEMPTS` in `.env` (or passing `max_attempts` directly): @@ -241,31 +244,30 @@ result = run_agentic_rag(query="...", subject_id="bob", max_attempts=3) ### 3. Security Guarantees -- **Authorization always runs**: Hardcoded in LangGraph workflow, agent cannot skip -- **Deterministic checks**: SpiceDB enforces permissions (no LLM involved) +- **Authorization always runs**: Hardcoded in the LangGraph workflow — the agent cannot skip it +- **Deterministic checks**: SpiceDB enforces permissions (no LLM involved in the decision) - **Fail closed**: Access denied unless explicitly granted - **Observable**: Full audit trail in state ## Project Structure ``` -agentic-rag-weaviate/ +agentic-rag-authorization/ ├── agentic_rag/ │ ├── graph.py # LangGraph state machine │ ├── state.py # State schema │ ├── config.py # Configuration management │ ├── nodes/ -│ │ ├── retrieval_node.py # Weaviate BM25 search +│ │ ├── retrieval_node.py # Milvus semantic vector search │ │ ├── authorization_node.py # SpiceDB filtering (security boundary) -│ │ ├── reasoning_node.py # Optional: adaptive retry logic -│ │ └── generation_node.py # Final answer with context -│ ├── authorization_helpers.py # Batch permission checking -│ ├── weaviate_client.py # Connection pooling for Weaviate +│ │ ├── reasoning_node.py # Optional: adaptive retry logic +│ │ └── generation_node.py # Final answer with context +│ ├── milvus_client.py # Connection pooling for Milvus │ ├── grpc_helpers.py # Connection pooling for SpiceDB │ ├── logging_config.py # Structured JSON logging │ └── validation.py # Input validation and sanitization ├── examples/ -│ ├── setup_environment.py # Initialize data (loads 50 documents) +│ ├── setup_environment.py # Initialize data (embeds and loads 50 documents) │ └── basic_example.py # 8 demo scenarios ├── scripts/ │ ├── generate_documents.py # Generate 50 .txt files @@ -275,7 +277,7 @@ agentic-rag-weaviate/ │ ├── documents/ # 50 .txt files (5 departments) │ ├── schema.zed # SpiceDB permission schema │ └── PERMISSIONS.md # Permission matrix -└── docker-compose.yml # Weaviate + SpiceDB +└── docker-compose.yml # Milvus + SpiceDB ``` ## Configuration @@ -287,10 +289,11 @@ Environment variables (`.env`): OPENAI_API_KEY=sk-... # Optional (defaults shown) -WEAVIATE_URL=http://localhost:8080 +MILVUS_URI=http://localhost:19530 +MILVUS_TOKEN= SPICEDB_ENDPOINT=localhost:50051 SPICEDB_TOKEN=devtoken -MAX_RETRIEVAL_ATTEMPTS=3 +MAX_RETRIEVAL_ATTEMPTS=1 ``` ## Dataset Overview @@ -318,14 +321,6 @@ The `examples/basic_example.py` demonstrates 8 scenarios: 7. **HR Department** - hr_manager queries HR policies 8. **Transparent Explanations** - Agent explains why access was denied -## Contributing & Extending - -See [CONTRIBUTING.md](CONTRIBUTING.md) for: -- Development setup -- Adding documents and permissions -- Customizing agent behavior -- Extending the system - ## Testing ```bash @@ -339,8 +334,9 @@ pytest tests/test_basic_flow.py::test_authorized_access ## Learn More - **SpiceDB**: https://authzed.com/docs -- **Weaviate**: https://weaviate.io/developers/weaviate +- **Milvus**: https://milvus.io/docs - **LangGraph**: https://langchain-ai.github.io/langgraph/ +- **langchain-spicedb**: https://github.com/authzed/langchain-spicedb ## License diff --git a/agentic-rag-authorization/api/models.py b/agentic-rag-authorization/api/models.py index bb07511..34cf2e8 100644 --- a/agentic-rag-authorization/api/models.py +++ b/agentic-rag-authorization/api/models.py @@ -2,6 +2,7 @@ from typing import List, Optional from pydantic import BaseModel, Field +from agentic_rag.config import get_config class QueryRequest(BaseModel): @@ -9,7 +10,7 @@ class QueryRequest(BaseModel): query: str = Field(..., min_length=1, max_length=1000) subject_id: str - max_attempts: int = Field(default=1, ge=1, le=5) + max_attempts: int = Field(default_factory=lambda: get_config().max_retrieval_attempts, ge=1, le=5) class DocumentSummary(BaseModel): diff --git a/agentic-rag-authorization/ui/index.html b/agentic-rag-authorization/ui/index.html index 6098623..c198201 100644 --- a/agentic-rag-authorization/ui/index.html +++ b/agentic-rag-authorization/ui/index.html @@ -403,7 +403,7 @@