From 0a5e0a142f58f5e511b579b1cc9f9c73ba59108b Mon Sep 17 00:00:00 2001 From: Sohan Maheshwar <1119120+sohanmaheshwar@users.noreply.github.com> Date: Thu, 28 May 2026 10:29:18 +0200 Subject: [PATCH 01/14] infra: replace Weaviate with Milvus standalone in docker-compose --- agentic-rag-authorization/docker-compose.yml | 71 +++++++++++++++----- 1 file changed, 54 insertions(+), 17 deletions(-) diff --git a/agentic-rag-authorization/docker-compose.yml b/agentic-rag-authorization/docker-compose.yml index df61d9b..c176a69 100644 --- a/agentic-rag-authorization/docker-compose.yml +++ b/agentic-rag-authorization/docker-compose.yml @@ -1,23 +1,58 @@ ---- -version: '3.8' +version: '3.5' services: - weaviate: - image: "cr.weaviate.io/semitechnologies/weaviate:latest" - ports: - - "8080:8080" # HTTP and gRPC both use this port + etcd: + container_name: milvus-etcd + image: quay.io/coreos/etcd:v3.5.5 + environment: + - ETCD_AUTO_COMPACTION_MODE=revision + - ETCD_AUTO_COMPACTION_RETENTION=1000 + - ETCD_QUOTA_BACKEND_BYTES=4294967296 + - ETCD_SNAPSHOT_COUNT=50000 + volumes: + - etcd_data:/etcd + command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd + healthcheck: + test: ["CMD", "etcdctl", "endpoint", "health"] + interval: 30s + timeout: 20s + retries: 3 + + minio: + container_name: milvus-minio + image: minio/minio:RELEASE.2023-03-13T19-46-17Z environment: - QUERY_DEFAULTS_LIMIT: 25 - AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true' - PERSISTENCE_DATA_PATH: '/var/lib/weaviate' - DEFAULT_VECTORIZER_MODULE: 'text2vec-openai' - ENABLE_MODULES: 'text2vec-openai' - CLUSTER_HOSTNAME: 'node1' - OPENAI_APIKEY: "${OPENAI_API_KEY}" + MINIO_ACCESS_KEY: minioadmin + MINIO_SECRET_KEY: minioadmin volumes: - - "weaviate_data:/var/lib/weaviate" - env_file: - - ".env" + - minio_data:/minio_data + command: minio server /minio_data --console-address ":9001" + healthcheck: + test: ["CMD", "curl", "-f", 
"http://localhost:9000/minio/health/live"] + interval: 30s + timeout: 20s + retries: 3 + + standalone: + container_name: milvus-standalone + image: milvusdb/milvus:v2.4.6 + command: ["milvus", "run", "standalone"] + environment: + ETCD_ENDPOINTS: etcd:2379 + MINIO_ADDRESS: minio:9000 + volumes: + - milvus_data:/var/lib/milvus + ports: + - "19530:19530" + - "9091:9091" + depends_on: + - etcd + - minio + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9091/healthz"] + interval: 30s + timeout: 20s + retries: 3 spicedb: image: "authzed/spicedb:latest" @@ -28,4 +63,6 @@ services: SPICEDB_DATASTORE_ENGINE: "memory" volumes: - weaviate_data: + etcd_data: + minio_data: + milvus_data: From 7ced47195bad048b13adde9b9c103aeda0bae14b Mon Sep 17 00:00:00 2001 From: Sohan Maheshwar <1119120+sohanmaheshwar@users.noreply.github.com> Date: Thu, 28 May 2026 10:50:51 +0200 Subject: [PATCH 02/14] deps: replace weaviate-client with pymilvus --- agentic-rag-authorization/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agentic-rag-authorization/requirements.txt b/agentic-rag-authorization/requirements.txt index c40bc86..f5401bd 100644 --- a/agentic-rag-authorization/requirements.txt +++ b/agentic-rag-authorization/requirements.txt @@ -3,7 +3,7 @@ langchain>=0.1.0 langchain-openai>=0.1.0 langgraph>=0.0.20 langchain-spicedb>=0.2.0 -weaviate-client>=3.26.0,<4.0 # v3 for REST API stability (no gRPC issues) +pymilvus>=2.4.0 authzed>=0.7.0 python-dotenv>=1.0.0 grpcio>=1.50.0 From 6062c63d120dd80b1b467454d159170c4a731782 Mon Sep 17 00:00:00 2001 From: Sohan Maheshwar <1119120+sohanmaheshwar@users.noreply.github.com> Date: Thu, 28 May 2026 12:10:39 +0200 Subject: [PATCH 03/14] feat: replace Weaviate config with Milvus config vars --- .../agentic_rag/config.py | 10 +++--- .../tests/test_config.py | 35 +++++++++++++++++++ 2 files changed, 40 insertions(+), 5 deletions(-) create mode 100644 agentic-rag-authorization/tests/test_config.py diff --git 
a/agentic-rag-authorization/agentic_rag/config.py b/agentic-rag-authorization/agentic_rag/config.py index 63bbab8..cc24fba 100644 --- a/agentic-rag-authorization/agentic_rag/config.py +++ b/agentic-rag-authorization/agentic_rag/config.py @@ -13,9 +13,9 @@ class Config: """Configuration for agentic RAG system.""" - # Weaviate - weaviate_url: str - weaviate_api_key: Optional[str] + # Milvus + milvus_uri: str + milvus_token: str # SpiceDB spicedb_endpoint: str @@ -34,8 +34,8 @@ class Config: def from_env(cls): """Load configuration from environment variables.""" return cls( - weaviate_url=os.getenv("WEAVIATE_URL", "http://localhost:8080"), - weaviate_api_key=os.getenv("WEAVIATE_API_KEY"), + milvus_uri=os.getenv("MILVUS_URI", "http://localhost:19530"), + milvus_token=os.getenv("MILVUS_TOKEN", ""), spicedb_endpoint=os.getenv("SPICEDB_ENDPOINT", "localhost:50051"), spicedb_token=os.getenv("SPICEDB_TOKEN", "devtoken"), openai_api_key=os.getenv("OPENAI_API_KEY", ""), diff --git a/agentic-rag-authorization/tests/test_config.py b/agentic-rag-authorization/tests/test_config.py new file mode 100644 index 0000000..fb1ebd0 --- /dev/null +++ b/agentic-rag-authorization/tests/test_config.py @@ -0,0 +1,35 @@ +"""Tests for config loading.""" + +import os +from unittest.mock import patch +from agentic_rag.config import Config + + +def test_config_loads_milvus_uri(): + env = { + "MILVUS_URI": "http://milvus-host:19530", + "OPENAI_API_KEY": "sk-test", + "SPICEDB_TOKEN": "tok", + "SPICEDB_ENDPOINT": "localhost:50051", + } + with patch.dict(os.environ, env, clear=True): + config = Config.from_env() + assert config.milvus_uri == "http://milvus-host:19530" + + +def test_config_milvus_defaults(): + env = { + "OPENAI_API_KEY": "sk-test", + "SPICEDB_TOKEN": "tok", + "SPICEDB_ENDPOINT": "localhost:50051", + } + with patch.dict(os.environ, env, clear=True): + config = Config.from_env() + assert config.milvus_uri == "http://localhost:19530" + assert config.milvus_token == "" + + +def 
test_config_has_no_weaviate_fields(): + config = Config.from_env() + assert not hasattr(config, "weaviate_url") + assert not hasattr(config, "weaviate_api_key") From 8915a3cfc7e4dfc16f934ec46d53e5bc72950119 Mon Sep 17 00:00:00 2001 From: Sohan Maheshwar <1119120+sohanmaheshwar@users.noreply.github.com> Date: Thu, 28 May 2026 12:31:55 +0200 Subject: [PATCH 04/14] chore: remove unused Optional import from config --- agentic-rag-authorization/agentic_rag/config.py | 1 - 1 file changed, 1 deletion(-) diff --git a/agentic-rag-authorization/agentic_rag/config.py b/agentic-rag-authorization/agentic_rag/config.py index cc24fba..749c2f7 100644 --- a/agentic-rag-authorization/agentic_rag/config.py +++ b/agentic-rag-authorization/agentic_rag/config.py @@ -2,7 +2,6 @@ from dataclasses import dataclass from functools import lru_cache -from typing import Optional import os from dotenv import load_dotenv From 7672adefbb1b9c1edfbc884731677b1780906812 Mon Sep 17 00:00:00 2001 From: Sohan Maheshwar <1119120+sohanmaheshwar@users.noreply.github.com> Date: Thu, 28 May 2026 12:33:29 +0200 Subject: [PATCH 05/14] feat: add MilvusClient singleton --- .../agentic_rag/milvus_client.py | 26 +++++++++++++++++++ .../tests/test_milvus_client.py | 25 ++++++++++++++++++ 2 files changed, 51 insertions(+) create mode 100644 agentic-rag-authorization/agentic_rag/milvus_client.py create mode 100644 agentic-rag-authorization/tests/test_milvus_client.py diff --git a/agentic-rag-authorization/agentic_rag/milvus_client.py b/agentic-rag-authorization/agentic_rag/milvus_client.py new file mode 100644 index 0000000..31b3dbd --- /dev/null +++ b/agentic-rag-authorization/agentic_rag/milvus_client.py @@ -0,0 +1,26 @@ +"""Milvus client connection pooling.""" + +from pymilvus import MilvusClient +from threading import Lock +from typing import Optional + +_milvus_client: Optional[MilvusClient] = None +_milvus_lock = Lock() + + +def get_milvus_client(uri: str) -> MilvusClient: + """Get or create reusable 
MilvusClient (singleton, thread-safe).""" + global _milvus_client + if _milvus_client is not None: + return _milvus_client + with _milvus_lock: + if _milvus_client is None: + _milvus_client = MilvusClient(uri=uri) + return _milvus_client + + +def reset_milvus_client(): + """Reset singleton (useful for testing).""" + global _milvus_client + with _milvus_lock: + _milvus_client = None diff --git a/agentic-rag-authorization/tests/test_milvus_client.py b/agentic-rag-authorization/tests/test_milvus_client.py new file mode 100644 index 0000000..b4c9928 --- /dev/null +++ b/agentic-rag-authorization/tests/test_milvus_client.py @@ -0,0 +1,25 @@ +"""Unit tests for Milvus client singleton.""" + +from unittest.mock import patch, MagicMock + + +def test_get_milvus_client_returns_singleton(): + from agentic_rag.milvus_client import get_milvus_client, reset_milvus_client + reset_milvus_client() + with patch("agentic_rag.milvus_client.MilvusClient") as mock_cls: + mock_cls.return_value = MagicMock() + client1 = get_milvus_client("http://localhost:19530") + client2 = get_milvus_client("http://localhost:19530") + assert client1 is client2 + mock_cls.assert_called_once_with(uri="http://localhost:19530") + + +def test_reset_clears_singleton(): + from agentic_rag.milvus_client import get_milvus_client, reset_milvus_client + reset_milvus_client() + with patch("agentic_rag.milvus_client.MilvusClient") as mock_cls: + mock_cls.return_value = MagicMock() + get_milvus_client("http://localhost:19530") + reset_milvus_client() + get_milvus_client("http://localhost:19530") + assert mock_cls.call_count == 2 From 4953cb265fc86cd810febd1acd7254408f601075 Mon Sep 17 00:00:00 2001 From: Sohan Maheshwar <1119120+sohanmaheshwar@users.noreply.github.com> Date: Thu, 28 May 2026 12:35:59 +0200 Subject: [PATCH 06/14] feat: replace Weaviate BM25 retrieval with Milvus semantic vector search --- .../agentic_rag/nodes/retrieval_node.py | 59 ++++++------- .../tests/test_retrieval_node.py | 87 +++++++++++++++++++ 
2 files changed, 112 insertions(+), 34 deletions(-) create mode 100644 agentic-rag-authorization/tests/test_retrieval_node.py diff --git a/agentic-rag-authorization/agentic_rag/nodes/retrieval_node.py b/agentic-rag-authorization/agentic_rag/nodes/retrieval_node.py index 6c83f42..df075e5 100644 --- a/agentic-rag-authorization/agentic_rag/nodes/retrieval_node.py +++ b/agentic-rag-authorization/agentic_rag/nodes/retrieval_node.py @@ -1,60 +1,55 @@ -"""Retrieval node - retrieve documents from Weaviate.""" +"""Retrieval node - retrieve documents from Milvus using semantic vector search.""" +import openai from langchain_core.messages import SystemMessage from langchain_core.documents import Document from ..state import AgenticRAGState from ..config import get_config from ..logging_config import get_logger -from ..weaviate_client import get_weaviate_client +from ..milvus_client import get_milvus_client from ..node_helpers import log_node_execution logger = get_logger("nodes.retrieval") -def retrieval_node(state: AgenticRAGState) -> dict: - """Retrieve documents from Weaviate based on query. +def _embed(text: str, api_key: str) -> list[float]: + client = openai.OpenAI(api_key=api_key) + response = client.embeddings.create(model="text-embedding-3-small", input=text) + return response.data[0].embedding + - This node performs keyword search in Weaviate to find - relevant documents. Authorization happens in the next node. 
- """ +def retrieval_node(state: AgenticRAGState) -> dict: + """Retrieve documents from Milvus based on semantic similarity to the query.""" config = get_config() with log_node_execution( logger, "retrieval", - { - "query": state["query"], - "subject_id": state["subject_id"], - } + {"query": state["query"], "subject_id": state["subject_id"]}, ): try: - # Get or create Weaviate client (reused across requests) - weaviate_client = get_weaviate_client(config.weaviate_url) + milvus_client = get_milvus_client(config.milvus_uri) + query_embedding = _embed(state["query"], config.openai_api_key) - # Perform BM25 keyword search using v3 API - response = ( - weaviate_client.query.get("Documents", ["doc_id", "title", "content", "department", "classification"]) - .with_bm25(query=state["query"]) - .with_limit(5) - .do() + results = milvus_client.search( + collection_name="Documents", + data=[query_embedding], + limit=5, + output_fields=["doc_id", "title", "content", "department", "classification"], ) - # Extract results - results = response.get("data", {}).get("Get", {}).get("Documents", []) - - # Convert to LangChain Documents documents = [ Document( - page_content=result["content"], + page_content=hit["entity"]["content"], metadata={ - "doc_id": result["doc_id"], - "title": result["title"], - "department": result["department"], - "classification": result["classification"], + "doc_id": hit["entity"]["doc_id"], + "title": hit["entity"]["title"], + "department": hit["entity"]["department"], + "classification": hit["entity"]["classification"], }, ) - for result in results + for hit in results[0] ] logger.info( @@ -69,9 +64,7 @@ def retrieval_node(state: AgenticRAGState) -> dict: "retrieved_documents": documents, "retrieval_attempt": state["retrieval_attempt"] + 1, "messages": [ - SystemMessage( - content=f"Retrieved {len(documents)} documents from Weaviate" - ) + SystemMessage(content=f"Retrieved {len(documents)} documents from Milvus") ], } @@ -85,8 +78,6 @@ def 
retrieval_node(state: AgenticRAGState) -> dict: }, exc_info=True, ) - - # Fail gracefully - return empty results return { "retrieved_documents": [], "retrieval_attempt": state["retrieval_attempt"] + 1, diff --git a/agentic-rag-authorization/tests/test_retrieval_node.py b/agentic-rag-authorization/tests/test_retrieval_node.py new file mode 100644 index 0000000..5276e58 --- /dev/null +++ b/agentic-rag-authorization/tests/test_retrieval_node.py @@ -0,0 +1,87 @@ +"""Unit tests for retrieval node.""" + +import pytest +from unittest.mock import patch, MagicMock +from langchain_core.documents import Document + + +@pytest.fixture +def sample_state(): + return { + "query": "What are engineering best practices?", + "subject_id": "alice", + "max_attempts": 1, + "retrieval_attempt": 0, + "messages": [], + "reasoning": [], + "retrieved_documents": [], + "authorized_documents": [], + "denied_count": 0, + "authorization_passed": False, + "answer": "", + } + + +def _make_hit(doc_id="doc-001", title="Test Doc", content="Content", dept="engineering", classification="internal"): + return { + "entity": { + "doc_id": doc_id, + "title": title, + "content": content, + "department": dept, + "classification": classification, + } + } + + +def test_retrieval_node_returns_documents(sample_state): + from agentic_rag.nodes.retrieval_node import retrieval_node + + with patch("agentic_rag.nodes.retrieval_node.get_milvus_client") as mock_get_client, \ + patch("agentic_rag.nodes.retrieval_node._embed") as mock_embed: + mock_embed.return_value = [0.1] * 1536 + mock_client = MagicMock() + mock_client.search.return_value = [[_make_hit()]] + mock_get_client.return_value = mock_client + + result = retrieval_node(sample_state) + + assert len(result["retrieved_documents"]) == 1 + doc = result["retrieved_documents"][0] + assert isinstance(doc, Document) + assert doc.metadata["doc_id"] == "doc-001" + assert doc.metadata["title"] == "Test Doc" + assert doc.metadata["department"] == "engineering" + assert 
doc.metadata["classification"] == "internal" + assert doc.page_content == "Content" + assert result["retrieval_attempt"] == 1 + + +def test_retrieval_node_increments_attempt_on_failure(sample_state): + from agentic_rag.nodes.retrieval_node import retrieval_node + + with patch("agentic_rag.nodes.retrieval_node.get_milvus_client") as mock_get_client, \ + patch("agentic_rag.nodes.retrieval_node._embed") as mock_embed: + mock_embed.side_effect = RuntimeError("OpenAI unavailable") + mock_get_client.return_value = MagicMock() + + result = retrieval_node(sample_state) + + assert result["retrieved_documents"] == [] + assert result["retrieval_attempt"] == 1 + + +def test_embed_calls_openai(): + from agentic_rag.nodes.retrieval_node import _embed + + mock_response = MagicMock() + mock_response.data = [MagicMock(embedding=[0.5] * 1536)] + + with patch("agentic_rag.nodes.retrieval_node.openai.OpenAI") as mock_oai: + mock_oai.return_value.embeddings.create.return_value = mock_response + result = _embed("hello world", "sk-test") + + assert result == [0.5] * 1536 + mock_oai.return_value.embeddings.create.assert_called_once_with( + model="text-embedding-3-small", input="hello world" + ) From f837f45ac82543930e766ed42a586d5e3b089f49 Mon Sep 17 00:00:00 2001 From: Sohan Maheshwar <1119120+sohanmaheshwar@users.noreply.github.com> Date: Thu, 28 May 2026 12:38:24 +0200 Subject: [PATCH 07/14] feat: replace setup_weaviate with setup_milvus using OpenAI embeddings --- .../examples/setup_environment.py | 173 ++++++++---------- 1 file changed, 76 insertions(+), 97 deletions(-) diff --git a/agentic-rag-authorization/examples/setup_environment.py b/agentic-rag-authorization/examples/setup_environment.py index 8d2cc0e..a9e798c 100644 --- a/agentic-rag-authorization/examples/setup_environment.py +++ b/agentic-rag-authorization/examples/setup_environment.py @@ -1,16 +1,15 @@ -"""Initialize Weaviate and SpiceDB with sample data.""" +"""Initialize Milvus and SpiceDB with sample data.""" import sys 
import os from dotenv import load_dotenv -# Load environment variables load_dotenv() -# Add parent directory to path so we can import from agentic_rag sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -import weaviate +import openai +from pymilvus import MilvusClient, DataType from authzed.api.v1 import ( WriteSchemaRequest, WriteRelationshipsRequest, @@ -20,9 +19,7 @@ SubjectReference, ) from agentic_rag.grpc_helpers import create_insecure_spicedb_client -import json -# Add scripts directory to path for document parser sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'scripts')) from parse_documents import load_all_documents @@ -33,7 +30,6 @@ def setup_spicedb(): client = create_insecure_spicedb_client("localhost:50051", "devtoken") - # Load schema schema_path = os.path.join(os.path.dirname(__file__), "..", "data", "schema.zed") with open(schema_path) as f: schema = f.read() @@ -41,57 +37,43 @@ def setup_spicedb(): client.WriteSchema(WriteSchemaRequest(schema=schema)) print(" ✅ Schema loaded") - # Load documents to get all doc_ids documents = load_all_documents() - # Create user-department relationships updates = [ - # Alice is in engineering department RelationshipUpdate( operation=RelationshipUpdate.Operation.OPERATION_TOUCH, relationship=Relationship( - resource=ObjectReference( - object_type="department", object_id="engineering" - ), + resource=ObjectReference(object_type="department", object_id="engineering"), relation="member", subject=SubjectReference( object=ObjectReference(object_type="user", object_id="alice") ), ), ), - # Bob is in sales department RelationshipUpdate( operation=RelationshipUpdate.Operation.OPERATION_TOUCH, relationship=Relationship( - resource=ObjectReference( - object_type="department", object_id="sales" - ), + resource=ObjectReference(object_type="department", object_id="sales"), relation="member", subject=SubjectReference( 
object=ObjectReference(object_type="user", object_id="bob") ), ), ), - # HR manager is in HR department RelationshipUpdate( operation=RelationshipUpdate.Operation.OPERATION_TOUCH, relationship=Relationship( - resource=ObjectReference( - object_type="department", object_id="hr" - ), + resource=ObjectReference(object_type="department", object_id="hr"), relation="member", subject=SubjectReference( object=ObjectReference(object_type="user", object_id="hr_manager") ), ), ), - # Finance manager is in finance department RelationshipUpdate( operation=RelationshipUpdate.Operation.OPERATION_TOUCH, relationship=Relationship( - resource=ObjectReference( - object_type="department", object_id="finance" - ), + resource=ObjectReference(object_type="department", object_id="finance"), relation="member", subject=SubjectReference( object=ObjectReference(object_type="user", object_id="finance_manager") @@ -100,12 +82,10 @@ def setup_spicedb(): ), ] - # Create department-based document permissions for doc in documents: doc_id = doc['doc_id'] dept = doc['department'] - # Public documents: accessible to all users if dept == "public": for user in ["alice", "bob", "hr_manager", "finance_manager"]: updates.append( @@ -121,7 +101,6 @@ def setup_spicedb(): ) ) else: - # Department documents: accessible to department members updates.append( RelationshipUpdate( operation=RelationshipUpdate.Operation.OPERATION_TOUCH, @@ -129,21 +108,17 @@ def setup_spicedb(): resource=ObjectReference(object_type="document", object_id=doc_id), relation="viewer", subject=SubjectReference( - object=ObjectReference( - object_type="department", - object_id=dept, - ), + object=ObjectReference(object_type="department", object_id=dept), optional_relation="member", ), ), ) ) - # Cross-department documents cross_dept_docs = [ - ("engineering-architecture-001", "sales"), # Tech sales need architecture docs - ("sales-guide-005", "engineering"), # Engineering needs to understand product positioning - ("hr-policy-001", 
"finance"), # Finance needs HR policies for budget planning + ("engineering-architecture-001", "sales"), + ("sales-guide-005", "engineering"), + ("hr-policy-001", "finance"), ] for doc_id, additional_dept in cross_dept_docs: @@ -154,21 +129,17 @@ def setup_spicedb(): resource=ObjectReference(object_type="document", object_id=doc_id), relation="viewer", subject=SubjectReference( - object=ObjectReference( - object_type="department", - object_id=additional_dept, - ), + object=ObjectReference(object_type="department", object_id=additional_dept), optional_relation="member", ), ), ) ) - # Individual user exceptions individual_exceptions = [ - ("alice", "sales-proposal-001"), # Alice needs to see a technical sales proposal - ("finance_manager", "hr-policy-002"), # Finance manager needs HR compensation policy - ("bob", "engineering-guide-006"), # Bob needs technical documentation for sales + ("alice", "sales-proposal-001"), + ("finance_manager", "hr-policy-002"), + ("bob", "engineering-guide-006"), ] for user, doc_id in individual_exceptions: @@ -199,62 +170,70 @@ def setup_spicedb(): print(f" - Public access: 5 documents accessible to all users") -def setup_weaviate(): - """Setup Weaviate with sample documents.""" - print("\nSetting up Weaviate...") - - # Connect to Weaviate v3 (REST API) - client = weaviate.Client("http://127.0.0.1:8080") - - try: - # Check if class exists and delete it - try: - client.schema.delete_class("Documents") - print(" ✅ Deleted existing Documents class") - except: - pass - - # Create schema using v3 API (no vectorizer since we're using BM25 keyword search) - schema = { - "class": "Documents", - "vectorizer": "none", # Disable vectorization for BM25 keyword search - "properties": [ - {"name": "doc_id", "dataType": ["text"]}, - {"name": "title", "dataType": ["text"]}, - {"name": "content", "dataType": ["text"]}, - {"name": "department", "dataType": ["text"]}, - {"name": "classification", "dataType": ["text"]}, - ], - } - 
client.schema.create_class(schema) - print(" ✅ Documents class created") - - # Load documents from .txt files - documents = load_all_documents() - print(f" ✅ Loaded {len(documents)} documents from data/documents/") - - # Insert documents using v3 API - with client.batch as batch: - for doc in documents: - batch.add_data_object( - data_object=doc, - class_name="Documents", - ) +def setup_milvus(): + """Setup Milvus with sample documents using OpenAI semantic embeddings.""" + print("\nSetting up Milvus...") + + milvus_uri = os.getenv("MILVUS_URI", "http://localhost:19530") + openai_api_key = os.getenv("OPENAI_API_KEY", "") - print(f" ✅ Inserted {len(documents)} documents") - print(" Document Distribution:") + client = MilvusClient(uri=milvus_uri) + oai_client = openai.OpenAI(api_key=openai_api_key) - # Count by department - dept_counts = {} - for doc in documents: - dept = doc['department'] - dept_counts[dept] = dept_counts.get(dept, 0) + 1 + if client.has_collection("Documents"): + client.drop_collection("Documents") + print(" ✅ Dropped existing Documents collection") - for dept, count in sorted(dept_counts.items()): - print(f" - {dept}: {count} documents") + schema = client.create_schema(auto_id=False, enable_dynamic_field=False) + schema.add_field("doc_id", DataType.VARCHAR, max_length=256, is_primary=True) + schema.add_field("title", DataType.VARCHAR, max_length=512) + schema.add_field("content", DataType.VARCHAR, max_length=65535) + schema.add_field("department", DataType.VARCHAR, max_length=128) + schema.add_field("classification", DataType.VARCHAR, max_length=128) + schema.add_field("embedding", DataType.FLOAT_VECTOR, dim=1536) + + index_params = client.prepare_index_params() + index_params.add_index( + field_name="embedding", + metric_type="COSINE", + index_type="IVF_FLAT", + params={"nlist": 128}, + ) + + client.create_collection("Documents", schema=schema, index_params=index_params) + print(" ✅ Documents collection created") + + documents = 
load_all_documents() + print(f" ✅ Loaded {len(documents)} documents from data/documents/") + + rows = [] + for i, doc in enumerate(documents): + response = oai_client.embeddings.create( + model="text-embedding-3-small", + input=doc["content"], + ) + rows.append({ + "doc_id": doc["doc_id"], + "title": doc["title"], + "content": doc["content"], + "department": doc["department"], + "classification": doc["classification"], + "embedding": response.data[0].embedding, + }) + if (i + 1) % 10 == 0: + print(f" Embedded {i + 1}/{len(documents)} documents...") + + client.insert("Documents", rows) + print(f" ✅ Inserted {len(rows)} documents with embeddings") + + dept_counts = {} + for doc in documents: + dept = doc['department'] + dept_counts[dept] = dept_counts.get(dept, 0) + 1 - finally: - pass # v3 client doesn't need explicit close + print(" Document Distribution:") + for dept, count in sorted(dept_counts.items()): + print(f" - {dept}: {count} documents") def main(): @@ -264,7 +243,7 @@ def main(): print("=" * 60) setup_spicedb() - setup_weaviate() + setup_milvus() print("\n" + "=" * 60) print("✅ Setup complete!") From 7f78f524c1faa4dcc2e6fca56d0170716a713bc8 Mon Sep 17 00:00:00 2001 From: Sohan Maheshwar <1119120+sohanmaheshwar@users.noreply.github.com> Date: Thu, 28 May 2026 12:39:29 +0200 Subject: [PATCH 08/14] chore: update conftest and health check for Milvus --- agentic-rag-authorization/api/routes.py | 2 +- agentic-rag-authorization/tests/conftest.py | 10 ++++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/agentic-rag-authorization/api/routes.py b/agentic-rag-authorization/api/routes.py index 707c3cb..59a033b 100644 --- a/agentic-rag-authorization/api/routes.py +++ b/agentic-rag-authorization/api/routes.py @@ -42,7 +42,7 @@ async def health_check(): return { "status": "healthy", "services": { - "weaviate": "connected", + "milvus": "connected", "spicedb": "connected", "openai": "configured", }, diff --git 
a/agentic-rag-authorization/tests/conftest.py b/agentic-rag-authorization/tests/conftest.py index 1b1c084..52109b6 100644 --- a/agentic-rag-authorization/tests/conftest.py +++ b/agentic-rag-authorization/tests/conftest.py @@ -1,16 +1,14 @@ """Test fixtures for agentic RAG tests.""" import pytest -import weaviate +from pymilvus import MilvusClient from agentic_rag.grpc_helpers import create_insecure_spicedb_client @pytest.fixture -def weaviate_client(): - """Create Weaviate client for tests.""" - client = weaviate.connect_to_local() - yield client - client.close() +def milvus_client(): + """Create MilvusClient for tests.""" + return MilvusClient(uri="http://localhost:19530") @pytest.fixture From ff89e5255d432c2a32f3e13f81c32472627daac5 Mon Sep 17 00:00:00 2001 From: Sohan Maheshwar <1119120+sohanmaheshwar@users.noreply.github.com> Date: Thu, 28 May 2026 12:40:21 +0200 Subject: [PATCH 09/14] chore: remove Weaviate client module --- .../agentic_rag/__init__.py | 2 +- .../agentic_rag/weaviate_client.py | 48 ------------------- 2 files changed, 1 insertion(+), 49 deletions(-) delete mode 100644 agentic-rag-authorization/agentic_rag/weaviate_client.py diff --git a/agentic-rag-authorization/agentic_rag/__init__.py b/agentic-rag-authorization/agentic_rag/__init__.py index 0531d2b..f12f9b8 100644 --- a/agentic-rag-authorization/agentic_rag/__init__.py +++ b/agentic-rag-authorization/agentic_rag/__init__.py @@ -1,4 +1,4 @@ -"""Agentic RAG with fine-grained authorization using Weaviate and SpiceDB.""" +"""Agentic RAG with fine-grained authorization using Milvus and SpiceDB.""" __version__ = "0.1.0" diff --git a/agentic-rag-authorization/agentic_rag/weaviate_client.py b/agentic-rag-authorization/agentic_rag/weaviate_client.py deleted file mode 100644 index 0c11fda..0000000 --- a/agentic-rag-authorization/agentic_rag/weaviate_client.py +++ /dev/null @@ -1,48 +0,0 @@ -"""Weaviate client connection pooling.""" - -import weaviate -from threading import Lock -from typing import 
Optional - -# Global singleton for Weaviate client with thread-safe initialization -_weaviate_client: Optional[weaviate.Client] = None -_weaviate_lock = Lock() - - -def get_weaviate_client(url: str) -> weaviate.Client: - """ - Get or create reusable Weaviate client (singleton, thread-safe). - - This function provides connection pooling for Weaviate by maintaining - a single client instance across requests, eliminating connection overhead. - - Args: - url: The Weaviate URL (e.g., "http://localhost:8080") - - Returns: - weaviate.Client configured for the given URL - """ - global _weaviate_client - - # Fast path: client already exists - if _weaviate_client is not None: - return _weaviate_client - - # Slow path: create new client with thread-safe lock - with _weaviate_lock: - # Double-check after acquiring lock - if _weaviate_client is None: - _weaviate_client = weaviate.Client(url) - - return _weaviate_client - - -def reset_weaviate_client(): - """ - Reset singleton (useful for testing). - - This allows tests to clear the cached client and create a fresh one. 
- """ - global _weaviate_client - with _weaviate_lock: - _weaviate_client = None From 7717ca31c80bd39e6777f86baf0d36d53e5dc097 Mon Sep 17 00:00:00 2001 From: Sohan Maheshwar <1119120+sohanmaheshwar@users.noreply.github.com> Date: Thu, 28 May 2026 13:24:36 +0200 Subject: [PATCH 10/14] fix: address post-review issues - wire token, add anns_field, fix run_ui, cleanup --- .../agentic_rag/graph.py | 2 +- .../agentic_rag/logging_config.py | 2 +- .../agentic_rag/milvus_client.py | 4 +-- .../agentic_rag/nodes/retrieval_node.py | 3 ++- agentic-rag-authorization/api/routes.py | 2 +- agentic-rag-authorization/docker-compose.yml | 8 +++--- agentic-rag-authorization/run_ui.py | 25 +++++++++++-------- .../test_improvements.py | 12 ++++----- .../tests/test_milvus_client.py | 2 +- 9 files changed, 34 insertions(+), 26 deletions(-) diff --git a/agentic-rag-authorization/agentic_rag/graph.py b/agentic-rag-authorization/agentic_rag/graph.py index eb980bf..01f0ed1 100644 --- a/agentic-rag-authorization/agentic_rag/graph.py +++ b/agentic-rag-authorization/agentic_rag/graph.py @@ -55,7 +55,7 @@ def build_agentic_rag_graph(): """Build the agentic RAG graph with deterministic authorization. Simplified Flow: - 1. Retrieval: Fetch documents from Weaviate + 1. Retrieval: Fetch documents from Milvus 2. Authorization: Deterministic permission check (security boundary) 3. 
Conditional: - If authorized docs exist: Generate answer diff --git a/agentic-rag-authorization/agentic_rag/logging_config.py b/agentic-rag-authorization/agentic_rag/logging_config.py index a7dbdfd..c901a3a 100644 --- a/agentic-rag-authorization/agentic_rag/logging_config.py +++ b/agentic-rag-authorization/agentic_rag/logging_config.py @@ -104,7 +104,7 @@ def setup_logging(level: str = "INFO") -> None: logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("httpcore").setLevel(logging.WARNING) logging.getLogger("openai").setLevel(logging.WARNING) - logging.getLogger("weaviate").setLevel(logging.WARNING) + logging.getLogger("pymilvus").setLevel(logging.WARNING) logging.getLogger("grpc").setLevel(logging.WARNING) diff --git a/agentic-rag-authorization/agentic_rag/milvus_client.py b/agentic-rag-authorization/agentic_rag/milvus_client.py index 31b3dbd..44f14f6 100644 --- a/agentic-rag-authorization/agentic_rag/milvus_client.py +++ b/agentic-rag-authorization/agentic_rag/milvus_client.py @@ -8,14 +8,14 @@ _milvus_lock = Lock() -def get_milvus_client(uri: str) -> MilvusClient: +def get_milvus_client(uri: str, token: str = "") -> MilvusClient: """Get or create reusable MilvusClient (singleton, thread-safe).""" global _milvus_client if _milvus_client is not None: return _milvus_client with _milvus_lock: if _milvus_client is None: - _milvus_client = MilvusClient(uri=uri) + _milvus_client = MilvusClient(uri=uri, token=token) return _milvus_client diff --git a/agentic-rag-authorization/agentic_rag/nodes/retrieval_node.py b/agentic-rag-authorization/agentic_rag/nodes/retrieval_node.py index df075e5..83e3ba4 100644 --- a/agentic-rag-authorization/agentic_rag/nodes/retrieval_node.py +++ b/agentic-rag-authorization/agentic_rag/nodes/retrieval_node.py @@ -29,12 +29,13 @@ def retrieval_node(state: AgenticRAGState) -> dict: {"query": state["query"], "subject_id": state["subject_id"]}, ): try: - milvus_client = get_milvus_client(config.milvus_uri) + milvus_client = 
get_milvus_client(config.milvus_uri, config.milvus_token) query_embedding = _embed(state["query"], config.openai_api_key) results = milvus_client.search( collection_name="Documents", data=[query_embedding], + anns_field="embedding", limit=5, output_fields=["doc_id", "title", "content", "department", "classification"], ) diff --git a/agentic-rag-authorization/api/routes.py b/agentic-rag-authorization/api/routes.py index 59a033b..e18ab32 100644 --- a/agentic-rag-authorization/api/routes.py +++ b/agentic-rag-authorization/api/routes.py @@ -37,7 +37,7 @@ async def get_users(): @router.get("/health") async def health_check(): """Check health of backend services.""" - # TODO: Actually check Weaviate + SpiceDB connectivity + # TODO: Actually check Milvus + SpiceDB connectivity # For now, return optimistic health status return { "status": "healthy", diff --git a/agentic-rag-authorization/docker-compose.yml b/agentic-rag-authorization/docker-compose.yml index c176a69..07e2f18 100644 --- a/agentic-rag-authorization/docker-compose.yml +++ b/agentic-rag-authorization/docker-compose.yml @@ -1,4 +1,4 @@ -version: '3.5' +version: '3.8' services: etcd: @@ -46,8 +46,10 @@ services: - "19530:19530" - "9091:9091" depends_on: - - etcd - - minio + etcd: + condition: service_healthy + minio: + condition: service_healthy healthcheck: test: ["CMD", "curl", "-f", "http://localhost:9091/healthz"] interval: 30s diff --git a/agentic-rag-authorization/run_ui.py b/agentic-rag-authorization/run_ui.py index a04b7c7..80d1f98 100755 --- a/agentic-rag-authorization/run_ui.py +++ b/agentic-rag-authorization/run_ui.py @@ -18,16 +18,16 @@ def check_services(): print(" Copy .env.example to .env and configure it") return False - # Check Weaviate + # Check Milvus try: from agentic_rag.config import get_config - from agentic_rag.weaviate_client import get_weaviate_client + from agentic_rag.milvus_client import get_milvus_client config = get_config() - weaviate_client = 
get_weaviate_client(config.weaviate_url) - print(" ✅ Weaviate connected") + milvus_client = get_milvus_client(config.milvus_uri, config.milvus_token) + print(" ✅ Milvus connected") except Exception as e: - print(f" ❌ Weaviate not available: {e}") + print(f" ❌ Milvus not available: {e}") print(" Run: docker-compose up -d") return False @@ -51,12 +51,17 @@ def check_services(): # Check if documents are loaded try: - result = weaviate_client.query.get("Documents", ["doc_id"]).with_limit(1).do() - doc_count = len(result.get("data", {}).get("Get", {}).get("Documents", [])) - if doc_count > 0: - print(" ✅ Documents loaded in Weaviate") + if milvus_client.has_collection("Documents"): + stats = milvus_client.get_collection_stats("Documents") + row_count = int(stats.get("row_count", 0)) + if row_count > 0: + print(" ✅ Documents loaded in Milvus") + else: + print(" ⚠️ No documents found in Milvus") + print(" Run: python examples/setup_environment.py") + return False else: - print(" ⚠️ No documents found in Weaviate") + print(" ⚠️ Documents collection does not exist in Milvus") print(" Run: python examples/setup_environment.py") return False except Exception as e: diff --git a/agentic-rag-authorization/test_improvements.py b/agentic-rag-authorization/test_improvements.py index 9ba9dd7..0849ecd 100644 --- a/agentic-rag-authorization/test_improvements.py +++ b/agentic-rag-authorization/test_improvements.py @@ -65,19 +65,19 @@ def test_connection_pooling(): reset_spicedb_client, _spicedb_client, ) - from agentic_rag.weaviate_client import ( - get_weaviate_client, - reset_weaviate_client, - _weaviate_client, + from agentic_rag.milvus_client import ( + get_milvus_client, + reset_milvus_client, + _milvus_client, ) # Verify reset functions exist assert callable(reset_spicedb_client) - assert callable(reset_weaviate_client) + assert callable(reset_milvus_client) print("✅ Connection pooling functions defined correctly") print(" - get_spicedb_client() available") - print(" - 
get_weaviate_client() available") + print(" - get_milvus_client() available") print(" - reset_*_client() functions available") return True except (ImportError, AssertionError) as e: diff --git a/agentic-rag-authorization/tests/test_milvus_client.py b/agentic-rag-authorization/tests/test_milvus_client.py index b4c9928..f489833 100644 --- a/agentic-rag-authorization/tests/test_milvus_client.py +++ b/agentic-rag-authorization/tests/test_milvus_client.py @@ -11,7 +11,7 @@ def test_get_milvus_client_returns_singleton(): client1 = get_milvus_client("http://localhost:19530") client2 = get_milvus_client("http://localhost:19530") assert client1 is client2 - mock_cls.assert_called_once_with(uri="http://localhost:19530") + mock_cls.assert_called_once_with(uri="http://localhost:19530", token="") def test_reset_clears_singleton(): From d03b9d5c0463102dc5b42d69f4370ce2cc5e8d56 Mon Sep 17 00:00:00 2001 From: Sohan Maheshwar <1119120+sohanmaheshwar@users.noreply.github.com> Date: Thu, 28 May 2026 13:45:36 +0200 Subject: [PATCH 11/14] fix: use query() instead of get_collection_stats() for document check in run_ui --- agentic-rag-authorization/run_ui.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/agentic-rag-authorization/run_ui.py b/agentic-rag-authorization/run_ui.py index 80d1f98..79cbeea 100755 --- a/agentic-rag-authorization/run_ui.py +++ b/agentic-rag-authorization/run_ui.py @@ -52,9 +52,13 @@ def check_services(): # Check if documents are loaded try: if milvus_client.has_collection("Documents"): - stats = milvus_client.get_collection_stats("Documents") - row_count = int(stats.get("row_count", 0)) - if row_count > 0: + results = milvus_client.query( + collection_name="Documents", + filter='doc_id != ""', + output_fields=["doc_id"], + limit=1, + ) + if results: print(" ✅ Documents loaded in Milvus") else: print(" ⚠️ No documents found in Milvus") From c9ea43b5fb357afe6537bd9c14bb9cc124ee464a Mon Sep 17 00:00:00 2001 From: Sohan Maheshwar 
<1119120+sohanmaheshwar@users.noreply.github.com> Date: Thu, 28 May 2026 14:48:25 +0200 Subject: [PATCH 12/14] docs: reauthor README for Milvus migration; wire MAX_RETRIEVAL_ATTEMPTS through to UI - Replace all Weaviate references with Milvus (semantic vector search, text-embedding-3-small, MILVUS_URI/MILVUS_TOKEN config vars) - Update project structure, schema, state flow diagram, and Learn More links - api/models.py: max_attempts default now reads from get_config() instead of hardcoded 1 - ui/index.html: stop sending hardcoded max_attempts=1 so server default applies --- agentic-rag-authorization/README.md | 84 ++++++++++++------------- agentic-rag-authorization/api/models.py | 3 +- agentic-rag-authorization/ui/index.html | 2 +- 3 files changed, 43 insertions(+), 46 deletions(-) diff --git a/agentic-rag-authorization/README.md b/agentic-rag-authorization/README.md index 4598b4c..da57b3b 100644 --- a/agentic-rag-authorization/README.md +++ b/agentic-rag-authorization/README.md @@ -1,9 +1,8 @@ # Agentic RAG with Fine-Grained Authorization +This repository demonstrates how to combine agentic behavior with deterministic fine-grained authorization using LangGraph, SpiceDB, and Milvus. You'll learn to build RAG systems where a user can only see information from the documents they have access to. -This repository demonstrates how to combine agentic behavior with deterministic fine-grained authorization using LangGraph, SpiceDB, and Weaviate. You'll learn to build RAG systems where a user can view information only based on the documents they have access to. - -This project uses the [LangChain SpiceDB](https://pypi.org/project/langchain-spicedb/) library +This project uses the [LangChain SpiceDB](https://pypi.org/project/langchain-spicedb/) library. ![screengrab](agentic-rag.gif) @@ -17,10 +16,10 @@ This project uses the [LangChain SpiceDB](https://pypi.org/project/langchain-spi This repo demonstrates: -1. 
**Fine-grained authorization in RAG** - How to enforce document-level permissions with SpiceDB to ensure the user only information based on what they have access to -2. **Security architecture** - Deterministic authorization boundary that cannot be bypassed +1. **Fine-grained authorization in RAG** - How to enforce document-level permissions with SpiceDB so users only see what they're allowed to see +2. **Security architecture** - A deterministic authorization boundary that cannot be bypassed by the agent 3. **Production features** - Structured logging, connection pooling, batch operations, error handling -4. **Real-world complexity** - 50 documents, 4 permission patterns with hierarchies. +4. **Real-world complexity** - 50 documents, 4 permission patterns with hierarchies Note: Despite the "agentic RAG" name, the default mode is intentionally simple and deterministic (3 nodes: retrieve → authorize → generate). This provides fast, predictable behavior suitable for most use cases. @@ -31,14 +30,14 @@ Traditional RAG retrieves documents by semantic similarity without considering p 1. **Security risk**: Users might see documents they shouldn't access 2. **Poor UX**: Silent failures when documents are denied, with no explanation -Read the [OWASP Top 10 for LLM](https://owasp.org/www-project-top-10-for-large-language-model-applications/) and [OWASP Top 10 Risks to Web Apps](https://owasp.org/Top10/2025/A01_2025-Broken_Access_Control/) for more information on why access control matters. +Read the [OWASP Top 10 for LLM](https://owasp.org/www-project-top-10-for-large-language-model-applications/) and [OWASP Top 10 Risks to Web Apps](https://owasp.org/Top10/2025/A01_2025-Broken_Access_Control/) for more on why access control matters. 
## The Solution This implementation shows how to combine: -- **Retrieval-first approach**: Direct semantic/keyword search without upfront planning overhead +- **Retrieval-first approach**: Semantic vector search without upfront planning overhead - **Deterministic security**: SpiceDB authorization that cannot be bypassed -- **Transparency**: Users understand what they can/can't access and why +- **Transparency**: Users understand what they can and can't access, and why ``` Traditional RAG: Query → Retrieve → Generate @@ -123,29 +122,30 @@ pip install -r requirements.txt # Includes fastapi and uvicorn python3 run_ui.py ``` -The `setup-environment.py` file sets up Weaviate as the vector DB and SpiceDB with sample documents and department-based access control for the agentic RAG system. - -We're creating a schema and writing relationships for a hierarchical permission model with users assigned to departments, department-wide document access, 3 cross-department collaboration grants, and 3 individual user exceptions. +The `setup_environment.py` script sets up Milvus as the vector database and SpiceDB with sample documents and department-based access control. It embeds all 50 documents using OpenAI's `text-embedding-3-small` and inserts them into Milvus, then writes a hierarchical permission model to SpiceDB: users assigned to departments, department-wide document access, 3 cross-department collaboration grants, and 3 individual user exceptions. The UI launcher will: -- Verify documents are loaded -- Starts the FastAPI server +- Verify documents are loaded in Milvus +- Start the FastAPI server - Open your browser to http://localhost:8000 -Here are few sample prompts you can run: +Here are a few sample prompts to try: -Choose "Bob" from "Sales" as the user and the query as "What are the company handbook guidelines?" +Choose "Bob" from "Sales" as the user and run the query "What are the company handbook guidelines?" 
-You should see: +You should see: +``` 📊 Retrieved: 5 ✅ Authorized: 3 ❌ Denied: 2 +``` -Now run the same query as the "HR Manager". You should see: +Now run the same query as "HR Manager": +``` 📊 Retrieved: 5 ✅ Authorized: 5 ❌ Denied: 0 - +``` ### Manual Start @@ -162,7 +162,7 @@ open http://localhost:8000 ## Run Without UI -``` +```bash # Initialize data python3 examples/setup_environment.py @@ -182,8 +182,11 @@ definition department { } definition document { + relation owner: user relation viewer: user | department#member - permission view = viewer + + permission view = viewer + owner + permission edit = owner } ``` @@ -198,7 +201,7 @@ definition document { ``` User Query ↓ -Retrieval Node ← Weaviate BM25 keyword search +Retrieval Node ← Milvus semantic vector search (text-embedding-3-small) ↓ Authorization Node ← SpiceDB filters (SECURITY BOUNDARY - cannot be bypassed) ↓ @@ -225,7 +228,7 @@ Reasoning Node ← LLM decides: retry with different query, or give up? Generation Node ← explains the denial ``` -For example, if Bob (sales) asks about "microservices architecture" and the first retrieval returns only engineering-only docs, the reasoning node might try a broader query that surfaces a shared architecture doc Bob can actually access. +For example, if Bob (sales) asks about "microservices architecture" and the first retrieval returns only engineering-restricted docs, the reasoning node might try a broader query that surfaces a shared architecture doc Bob can actually access. Enable it by setting `MAX_RETRIEVAL_ATTEMPTS` in `.env` (or passing `max_attempts` directly): @@ -241,31 +244,30 @@ result = run_agentic_rag(query="...", subject_id="bob", max_attempts=3) ### 3. 
Security Guarantees -- **Authorization always runs**: Hardcoded in LangGraph workflow, agent cannot skip -- **Deterministic checks**: SpiceDB enforces permissions (no LLM involved) +- **Authorization always runs**: Hardcoded in the LangGraph workflow — the agent cannot skip it +- **Deterministic checks**: SpiceDB enforces permissions (no LLM involved in the decision) - **Fail closed**: Access denied unless explicitly granted - **Observable**: Full audit trail in state ## Project Structure ``` -agentic-rag-weaviate/ +agentic-rag-authorization/ ├── agentic_rag/ │ ├── graph.py # LangGraph state machine │ ├── state.py # State schema │ ├── config.py # Configuration management │ ├── nodes/ -│ │ ├── retrieval_node.py # Weaviate BM25 search +│ │ ├── retrieval_node.py # Milvus semantic vector search │ │ ├── authorization_node.py # SpiceDB filtering (security boundary) -│ │ ├── reasoning_node.py # Optional: adaptive retry logic -│ │ └── generation_node.py # Final answer with context -│ ├── authorization_helpers.py # Batch permission checking -│ ├── weaviate_client.py # Connection pooling for Weaviate +│ │ ├── reasoning_node.py # Optional: adaptive retry logic +│ │ └── generation_node.py # Final answer with context +│ ├── milvus_client.py # Connection pooling for Milvus │ ├── grpc_helpers.py # Connection pooling for SpiceDB │ ├── logging_config.py # Structured JSON logging │ └── validation.py # Input validation and sanitization ├── examples/ -│ ├── setup_environment.py # Initialize data (loads 50 documents) +│ ├── setup_environment.py # Initialize data (embeds and loads 50 documents) │ └── basic_example.py # 8 demo scenarios ├── scripts/ │ ├── generate_documents.py # Generate 50 .txt files @@ -275,7 +277,7 @@ agentic-rag-weaviate/ │ ├── documents/ # 50 .txt files (5 departments) │ ├── schema.zed # SpiceDB permission schema │ └── PERMISSIONS.md # Permission matrix -└── docker-compose.yml # Weaviate + SpiceDB +└── docker-compose.yml # Milvus + SpiceDB ``` ## Configuration @@ 
-287,10 +289,11 @@ Environment variables (`.env`): OPENAI_API_KEY=sk-... # Optional (defaults shown) -WEAVIATE_URL=http://localhost:8080 +MILVUS_URI=http://localhost:19530 +MILVUS_TOKEN= SPICEDB_ENDPOINT=localhost:50051 SPICEDB_TOKEN=devtoken -MAX_RETRIEVAL_ATTEMPTS=3 +MAX_RETRIEVAL_ATTEMPTS=1 ``` ## Dataset Overview @@ -318,14 +321,6 @@ The `examples/basic_example.py` demonstrates 8 scenarios: 7. **HR Department** - hr_manager queries HR policies 8. **Transparent Explanations** - Agent explains why access was denied -## Contributing & Extending - -See [CONTRIBUTING.md](CONTRIBUTING.md) for: -- Development setup -- Adding documents and permissions -- Customizing agent behavior -- Extending the system - ## Testing ```bash @@ -339,8 +334,9 @@ pytest tests/test_basic_flow.py::test_authorized_access ## Learn More - **SpiceDB**: https://authzed.com/docs -- **Weaviate**: https://weaviate.io/developers/weaviate +- **Milvus**: https://milvus.io/docs - **LangGraph**: https://langchain-ai.github.io/langgraph/ +- **langchain-spicedb**: https://github.com/authzed/langchain-spicedb ## License diff --git a/agentic-rag-authorization/api/models.py b/agentic-rag-authorization/api/models.py index bb07511..34cf2e8 100644 --- a/agentic-rag-authorization/api/models.py +++ b/agentic-rag-authorization/api/models.py @@ -2,6 +2,7 @@ from typing import List, Optional from pydantic import BaseModel, Field +from agentic_rag.config import get_config class QueryRequest(BaseModel): @@ -9,7 +10,7 @@ class QueryRequest(BaseModel): query: str = Field(..., min_length=1, max_length=1000) subject_id: str - max_attempts: int = Field(default=1, ge=1, le=5) + max_attempts: int = Field(default_factory=lambda: get_config().max_retrieval_attempts, ge=1, le=5) class DocumentSummary(BaseModel): diff --git a/agentic-rag-authorization/ui/index.html b/agentic-rag-authorization/ui/index.html index 6098623..c198201 100644 --- a/agentic-rag-authorization/ui/index.html +++ b/agentic-rag-authorization/ui/index.html 
@@ -403,7 +403,7 @@

Error

const response = await fetch(`${API_BASE}/query`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ query, subject_id, max_attempts: 1 }) + body: JSON.stringify({ query, subject_id }) }); const result = await response.json(); From c5ba13c5855c32ed185f3d8636ace9c39bf1ef94 Mon Sep 17 00:00:00 2001 From: Sohan Maheshwar <1119120+sohanmaheshwar@users.noreply.github.com> Date: Fri, 29 May 2026 11:44:33 +0200 Subject: [PATCH 13/14] docs: add TL;DR section, Milvus link, and cross-link to Weaviate version --- agentic-rag-authorization/README.md | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/agentic-rag-authorization/README.md b/agentic-rag-authorization/README.md index da57b3b..cdabbdd 100644 --- a/agentic-rag-authorization/README.md +++ b/agentic-rag-authorization/README.md @@ -1,11 +1,24 @@ # Agentic RAG with Fine-Grained Authorization -This repository demonstrates how to combine agentic behavior with deterministic fine-grained authorization using LangGraph, SpiceDB, and Milvus. You'll learn to build RAG systems where a user can only see information from the documents they have access to. +> **Also available:** [Weaviate version](https://github.com/authzed/examples/tree/weaviate/agentic-rag-authorization) (BM25 keyword search) + +This repository demonstrates how to combine agentic behavior with deterministic fine-grained authorization using LangGraph, SpiceDB, and [Milvus](https://github.com/milvus-io/milvus). You'll learn to build RAG systems where a user can only see information from the documents they have access to. This project uses the [LangChain SpiceDB](https://pypi.org/project/langchain-spicedb/) library. ![screengrab](agentic-rag.gif) + +## TL;DR (human-written) + +RAG systems typically focus on the retrieval mechanisms, but don't have fine-grained access control to check if the information retrieved is accessible to the user asking the query. 
This demo shows the setup for a prod-like Agentic RAG. It has a corpus of 50 documents with complex sharing requirements that span individuals, departments, and exceptions. + +The two takeaways from this demo are: + +1. Using ReBAC makes it simple to model complex hierarchical permissions. The complexity increases in the context of RAG and AI Applications as there are 10x more principals, so traditional authorization methods such as RBAC fall flat. + +2. Never ever let an AI Agent *decide* if it needs to check for authorization. Gen AI is inherently probabilistic so you have to ensure that permission checks are deterministic and cannot be skipped. + ## Documentation Navigation - **[README.md](README.md)** (you are here) - Overview, quick start, core concepts @@ -21,7 +34,7 @@ This repo demonstrates: 3. **Production features** - Structured logging, connection pooling, batch operations, error handling 4. **Real-world complexity** - 50 documents, 4 permission patterns with hierarchies -Note: Despite the "agentic RAG" name, the default mode is intentionally simple and deterministic (3 nodes: retrieve → authorize → generate). This provides fast, predictable behavior suitable for most use cases. +Note: Despite the "agentic RAG" name, the default mode is intentionally simple and deterministic (3 nodes: retrieve → authorize → generate). This provides fast, predictable behavior suitable for most use cases. There is a `MAX_RETRIEVAL_ATTEMPTS` option where the AI Agent can reason if it has to retrieve more data.
## The Problem This Solves From 23b45862b8d1b6d63e04a4ecc0e993e36f82242f Mon Sep 17 00:00:00 2001 From: Sohan Maheshwar <1119120+sohanmaheshwar@users.noreply.github.com> Date: Fri, 29 May 2026 11:48:31 +0200 Subject: [PATCH 14/14] fix: quote all string values in docker-compose.yml to satisfy yamllint --- agentic-rag-authorization/docker-compose.yml | 59 ++++++++++---------- 1 file changed, 30 insertions(+), 29 deletions(-) diff --git a/agentic-rag-authorization/docker-compose.yml b/agentic-rag-authorization/docker-compose.yml index 07e2f18..a23002f 100644 --- a/agentic-rag-authorization/docker-compose.yml +++ b/agentic-rag-authorization/docker-compose.yml @@ -1,64 +1,65 @@ -version: '3.8' +--- +version: "3.8" services: etcd: - container_name: milvus-etcd - image: quay.io/coreos/etcd:v3.5.5 + container_name: "milvus-etcd" + image: "quay.io/coreos/etcd:v3.5.5" environment: - - ETCD_AUTO_COMPACTION_MODE=revision - - ETCD_AUTO_COMPACTION_RETENTION=1000 - - ETCD_QUOTA_BACKEND_BYTES=4294967296 - - ETCD_SNAPSHOT_COUNT=50000 + - "ETCD_AUTO_COMPACTION_MODE=revision" + - "ETCD_AUTO_COMPACTION_RETENTION=1000" + - "ETCD_QUOTA_BACKEND_BYTES=4294967296" + - "ETCD_SNAPSHOT_COUNT=50000" volumes: - - etcd_data:/etcd - command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd + - "etcd_data:/etcd" + command: "etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd" healthcheck: test: ["CMD", "etcdctl", "endpoint", "health"] - interval: 30s - timeout: 20s + interval: "30s" + timeout: "20s" retries: 3 minio: - container_name: milvus-minio - image: minio/minio:RELEASE.2023-03-13T19-46-17Z + container_name: "milvus-minio" + image: "minio/minio:RELEASE.2023-03-13T19-46-17Z" environment: - MINIO_ACCESS_KEY: minioadmin - MINIO_SECRET_KEY: minioadmin + MINIO_ACCESS_KEY: "minioadmin" + MINIO_SECRET_KEY: "minioadmin" volumes: - - minio_data:/minio_data - command: minio server 
/minio_data --console-address ":9001" + - "minio_data:/minio_data" + command: "minio server /minio_data --console-address :9001" healthcheck: test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"] - interval: 30s - timeout: 20s + interval: "30s" + timeout: "20s" retries: 3 standalone: - container_name: milvus-standalone - image: milvusdb/milvus:v2.4.6 + container_name: "milvus-standalone" + image: "milvusdb/milvus:v2.4.6" command: ["milvus", "run", "standalone"] environment: - ETCD_ENDPOINTS: etcd:2379 - MINIO_ADDRESS: minio:9000 + ETCD_ENDPOINTS: "etcd:2379" + MINIO_ADDRESS: "minio:9000" volumes: - - milvus_data:/var/lib/milvus + - "milvus_data:/var/lib/milvus" ports: - "19530:19530" - "9091:9091" depends_on: etcd: - condition: service_healthy + condition: "service_healthy" minio: - condition: service_healthy + condition: "service_healthy" healthcheck: test: ["CMD", "curl", "-f", "http://localhost:9091/healthz"] - interval: 30s - timeout: 20s + interval: "30s" + timeout: "20s" retries: 3 spicedb: image: "authzed/spicedb:latest" - command: "serve --grpc-preshared-key \"devtoken\"" + command: 'serve --grpc-preshared-key "devtoken"' ports: - "50051:50051" environment: