97 changes: 53 additions & 44 deletions agentic-rag-authorization/README.md
@@ -1,12 +1,24 @@
# Agentic RAG with Fine-Grained Authorization

> **Also available:** [Weaviate version](https://github.com/authzed/examples/tree/weaviate/agentic-rag-authorization) (BM25 keyword search)

This repository demonstrates how to combine agentic behavior with deterministic fine-grained authorization using LangGraph, SpiceDB, and Weaviate. You'll learn to build RAG systems where a user can view information only based on the documents they have access to.
This repository demonstrates how to combine agentic behavior with deterministic fine-grained authorization using LangGraph, SpiceDB, and [Milvus](https://github.com/milvus-io/milvus). You'll learn to build RAG systems where a user can only see information from the documents they have access to.

This project uses the [LangChain SpiceDB](https://pypi.org/project/langchain-spicedb/) library
This project uses the [LangChain SpiceDB](https://pypi.org/project/langchain-spicedb/) library.

![screengrab](agentic-rag.gif)


## TL;DR (human-written)

RAG systems typically focus on retrieval mechanisms, but lack fine-grained access control to check whether the retrieved information is accessible to the user asking the query. This demo shows the setup for a prod-like Agentic RAG. It has a corpus of 50 documents with complex sharing requirements that span individuals, departments, and exceptions.

The two takeaways from this demo are:

1. Using ReBAC makes it simple to model complex hierarchical permissions. The complexity increases in the context of RAG and AI Applications as there are 10x more principals, so traditional authorization methods such as RBAC fall flat.

2. Never ever let an AI Agent *decide* if it needs to check for authorization. Gen AI is inherently probabilistic so you have to ensure that permission checks are deterministic and cannot be skipped.

## Documentation Navigation

- **[README.md](README.md)** (you are here) - Overview, quick start, core concepts
@@ -17,12 +29,12 @@ This project uses the [LangChain SpiceDB](https://pypi.org/project/langchain-spi

This repo demonstrates:

1. **Fine-grained authorization in RAG** - How to enforce document-level permissions with SpiceDB to ensure the user only information based on what they have access to
2. **Security architecture** - Deterministic authorization boundary that cannot be bypassed
1. **Fine-grained authorization in RAG** - How to enforce document-level permissions with SpiceDB so users only see what they're allowed to see
2. **Security architecture** - A deterministic authorization boundary that cannot be bypassed by the agent
3. **Production features** - Structured logging, connection pooling, batch operations, error handling
4. **Real-world complexity** - 50 documents, 4 permission patterns with hierarchies.
4. **Real-world complexity** - 50 documents, 4 permission patterns with hierarchies

Note: Despite the "agentic RAG" name, the default mode is intentionally simple and deterministic (3 nodes: retrieve → authorize → generate). This provides fast, predictable behavior suitable for most use cases.
Note: Despite the "agentic RAG" name, the default mode is intentionally simple and deterministic (3 nodes: retrieve → authorize → generate). This provides fast, predictable behavior suitable for most use cases. There is a `MAX_RETRIEVAL_ATTEMPTS` option that lets the AI Agent reason about whether it has to retrieve more data.

## The Problem This Solves

@@ -31,14 +43,14 @@ Traditional RAG retrieves documents by semantic similarity without considering p
1. **Security risk**: Users might see documents they shouldn't access
2. **Poor UX**: Silent failures when documents are denied, with no explanation

Read the [OWASP Top 10 for LLM](https://owasp.org/www-project-top-10-for-large-language-model-applications/) and [OWASP Top 10 Risks to Web Apps](https://owasp.org/Top10/2025/A01_2025-Broken_Access_Control/) for more information on why access control matters.
Read the [OWASP Top 10 for LLM](https://owasp.org/www-project-top-10-for-large-language-model-applications/) and [OWASP Top 10 Risks to Web Apps](https://owasp.org/Top10/2025/A01_2025-Broken_Access_Control/) for more on why access control matters.

## The Solution

This implementation shows how to combine:
- **Retrieval-first approach**: Direct semantic/keyword search without upfront planning overhead
- **Retrieval-first approach**: Semantic vector search without upfront planning overhead
- **Deterministic security**: SpiceDB authorization that cannot be bypassed
- **Transparency**: Users understand what they can/can't access and why
- **Transparency**: Users understand what they can and can't access, and why

```
Traditional RAG: Query → Retrieve → Generate
@@ -123,29 +135,30 @@ pip install -r requirements.txt # Includes fastapi and uvicorn
python3 run_ui.py
```

The `setup-environment.py` file sets up Weaviate as the vector DB and SpiceDB with sample documents and department-based access control for the agentic RAG system.

We're creating a schema and writing relationships for a hierarchical permission model with users assigned to departments, department-wide document access, 3 cross-department collaboration grants, and 3 individual user exceptions.
The `setup_environment.py` script sets up Milvus as the vector database and SpiceDB with sample documents and department-based access control. It embeds all 50 documents using OpenAI's `text-embedding-3-small` and inserts them into Milvus, then writes a hierarchical permission model to SpiceDB: users assigned to departments, department-wide document access, 3 cross-department collaboration grants, and 3 individual user exceptions.

The UI launcher will:
- Verify documents are loaded
- Starts the FastAPI server
- Verify documents are loaded in Milvus
- Start the FastAPI server
- Open your browser to http://localhost:8000

Here are few sample prompts you can run:
Here are a few sample prompts to try:

Choose "Bob" from "Sales" as the user and the query as "What are the company handbook guidelines?"
Choose "Bob" from "Sales" as the user and run the query "What are the company handbook guidelines?"

You should see:
You should see:
```
📊 Retrieved: 5
✅ Authorized: 3
❌ Denied: 2
```

Now run the same query as the "HR Manager". You should see:
Now run the same query as "HR Manager":
```
📊 Retrieved: 5
✅ Authorized: 5
❌ Denied: 0

```

### Manual Start

@@ -162,7 +175,7 @@ open http://localhost:8000

## Run Without UI

```
```bash
# Initialize data
python3 examples/setup_environment.py

@@ -182,8 +195,11 @@ definition department {
}

definition document {
relation owner: user
relation viewer: user | department#member
permission view = viewer

permission view = viewer + owner
permission edit = owner
}
```
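
As a rough illustration of how the updated `view` and `edit` permissions resolve, the sketch below models relationships as plain Python sets. The relationship data, the `expand` helper, and the user and document names are all hypothetical. SpiceDB evaluates the schema server-side, so this is only a mental model, not how its API is called.

```python
from typing import Dict, Set, Tuple

# Hypothetical relationship data, for illustration only.
# Key: (resource, relation). Value: subjects; "type:id#relation" is a userset.
RELATIONSHIPS: Dict[Tuple[str, str], Set[str]] = {
    ("document:roadmap", "owner"): {"user:alice"},
    ("document:roadmap", "viewer"): {"user:bob", "department:sales#member"},
    ("department:sales", "member"): {"user:carol"},
}


def expand(resource: str, relation: str) -> Set[str]:
    """Resolve a relation to concrete users, following usersets."""
    users: Set[str] = set()
    for subject in RELATIONSHIPS.get((resource, relation), set()):
        if "#" in subject:
            # e.g. "department:sales#member" -> every member of that department
            obj, rel = subject.split("#", 1)
            users |= expand(obj, rel)
        else:
            users.add(subject)
    return users


def can_view(user: str, document: str) -> bool:
    """permission view = viewer + owner (union of both relations)."""
    return user in expand(document, "viewer") | expand(document, "owner")


def can_edit(user: str, document: str) -> bool:
    """permission edit = owner"""
    return user in expand(document, "owner")
```

In this model, adding `owner` to the union means an owner can view a document without also being written as a `viewer`, while `edit` stays restricted to owners.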

@@ -198,7 +214,7 @@ definition document {
```
User Query
Retrieval Node ← Weaviate BM25 keyword search
Retrieval Node ← Milvus semantic vector search (text-embedding-3-small)
Authorization Node ← SpiceDB filters (SECURITY BOUNDARY - cannot be bypassed)
@@ -225,7 +241,7 @@ Reasoning Node ← LLM decides: retry with different query, or give up?
Generation Node ← explains the denial
```

For example, if Bob (sales) asks about "microservices architecture" and the first retrieval returns only engineering-only docs, the reasoning node might try a broader query that surfaces a shared architecture doc Bob can actually access.
For example, if Bob (sales) asks about "microservices architecture" and the first retrieval returns only engineering-restricted docs, the reasoning node might try a broader query that surfaces a shared architecture doc Bob can actually access.
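
The retry behavior described above might look roughly like the loop below. This is a minimal sketch, not the repository's implementation: `run_with_retries`, the stand-in `retrieve`, `authorize`, and `rewrite` functions, and the document IDs are all hypothetical. The point it illustrates is that authorization runs on every attempt, and the reasoning step only ever changes the query.

```python
from typing import Callable, Dict, List, Set


def run_with_retries(
    query: str,
    subject_id: str,
    retrieve: Callable[[str], List[str]],
    authorize: Callable[[str, List[str]], List[str]],
    rewrite: Callable[[str], str],
    max_attempts: int = 1,
) -> Dict[str, object]:
    """Retrieve, authorize, and retry with a rewritten query if nothing is allowed."""
    trail: List[Dict[str, object]] = []
    current = query
    for attempt in range(1, max_attempts + 1):
        retrieved = retrieve(current)
        # Authorization is unconditional: no branch skips it.
        authorized = authorize(subject_id, retrieved)
        trail.append({"attempt": attempt, "query": current,
                      "retrieved": len(retrieved), "authorized": len(authorized)})
        if authorized:
            return {"documents": authorized, "trail": trail}
        current = rewrite(current)  # stand-in for the LLM reasoning step
    return {"documents": [], "trail": trail}


# Hypothetical stand-ins for the vector search, permission check, and LLM.
INDEX: Dict[str, List[str]] = {
    "microservices architecture": ["eng-001", "eng-002"],
    "architecture overview": ["shared-001", "eng-001"],
}
ALLOWED: Dict[str, Set[str]] = {"bob": {"shared-001"}}


def retrieve(query: str) -> List[str]:
    return INDEX.get(query, [])


def authorize(subject_id: str, doc_ids: List[str]) -> List[str]:
    return [d for d in doc_ids if d in ALLOWED.get(subject_id, set())]


def rewrite(query: str) -> str:
    return "architecture overview"


result = run_with_retries("microservices architecture", "bob",
                          retrieve, authorize, rewrite, max_attempts=3)
single = run_with_retries("microservices architecture", "bob",
                          retrieve, authorize, rewrite, max_attempts=1)
```

With `max_attempts=1` the loop reduces to the default retrieve → authorize → generate path, so the adaptive mode adds retries without changing where the permission check sits.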

Enable it by setting `MAX_RETRIEVAL_ATTEMPTS` in `.env` (or passing `max_attempts` directly):

@@ -241,31 +257,30 @@ result = run_agentic_rag(query="...", subject_id="bob", max_attempts=3)

### 3. Security Guarantees

- **Authorization always runs**: Hardcoded in LangGraph workflow, agent cannot skip
- **Deterministic checks**: SpiceDB enforces permissions (no LLM involved)
- **Authorization always runs**: Hardcoded in the LangGraph workflow — the agent cannot skip it
- **Deterministic checks**: SpiceDB enforces permissions (no LLM involved in the decision)
- **Fail closed**: Access denied unless explicitly granted
- **Observable**: Full audit trail in state

## Project Structure

```
agentic-rag-weaviate/
agentic-rag-authorization/
├── agentic_rag/
│ ├── graph.py # LangGraph state machine
│ ├── state.py # State schema
│ ├── config.py # Configuration management
│ ├── nodes/
│ │ ├── retrieval_node.py # Weaviate BM25 search
│ │ ├── retrieval_node.py # Milvus semantic vector search
│ │ ├── authorization_node.py # SpiceDB filtering (security boundary)
│ │ ├── reasoning_node.py # Optional: adaptive retry logic
│ │ └── generation_node.py # Final answer with context
│ ├── authorization_helpers.py # Batch permission checking
│ ├── weaviate_client.py # Connection pooling for Weaviate
│ │ ├── reasoning_node.py # Optional: adaptive retry logic
│ │ └── generation_node.py # Final answer with context
│ ├── milvus_client.py # Connection pooling for Milvus
│ ├── grpc_helpers.py # Connection pooling for SpiceDB
│ ├── logging_config.py # Structured JSON logging
│ └── validation.py # Input validation and sanitization
├── examples/
│ ├── setup_environment.py # Initialize data (loads 50 documents)
│ ├── setup_environment.py # Initialize data (embeds and loads 50 documents)
│ └── basic_example.py # 8 demo scenarios
├── scripts/
│ ├── generate_documents.py # Generate 50 .txt files
@@ -275,7 +290,7 @@ agentic-rag-weaviate/
│ ├── documents/ # 50 .txt files (5 departments)
│ ├── schema.zed # SpiceDB permission schema
│ └── PERMISSIONS.md # Permission matrix
└── docker-compose.yml # Weaviate + SpiceDB
└── docker-compose.yml # Milvus + SpiceDB
```

## Configuration
@@ -287,10 +302,11 @@ Environment variables (`.env`):
OPENAI_API_KEY=sk-...

# Optional (defaults shown)
WEAVIATE_URL=http://localhost:8080
MILVUS_URI=http://localhost:19530
MILVUS_TOKEN=
SPICEDB_ENDPOINT=localhost:50051
SPICEDB_TOKEN=devtoken
MAX_RETRIEVAL_ATTEMPTS=3
MAX_RETRIEVAL_ATTEMPTS=1
```

## Dataset Overview
@@ -318,14 +334,6 @@ The `examples/basic_example.py` demonstrates 8 scenarios:
7. **HR Department** - hr_manager queries HR policies
8. **Transparent Explanations** - Agent explains why access was denied

## Contributing & Extending

See [CONTRIBUTING.md](CONTRIBUTING.md) for:
- Development setup
- Adding documents and permissions
- Customizing agent behavior
- Extending the system

## Testing

```bash
@@ -339,8 +347,9 @@ pytest tests/test_basic_flow.py::test_authorized_access
## Learn More

- **SpiceDB**: https://authzed.com/docs
- **Weaviate**: https://weaviate.io/developers/weaviate
- **Milvus**: https://milvus.io/docs
- **LangGraph**: https://langchain-ai.github.io/langgraph/
- **langchain-spicedb**: https://github.com/authzed/langchain-spicedb

## License

2 changes: 1 addition & 1 deletion agentic-rag-authorization/agentic_rag/__init__.py
@@ -1,4 +1,4 @@
"""Agentic RAG with fine-grained authorization using Weaviate and SpiceDB."""
"""Agentic RAG with fine-grained authorization using Milvus and SpiceDB."""

__version__ = "0.1.0"

11 changes: 5 additions & 6 deletions agentic-rag-authorization/agentic_rag/config.py
@@ -2,7 +2,6 @@

from dataclasses import dataclass
from functools import lru_cache
from typing import Optional
import os
from dotenv import load_dotenv

@@ -13,9 +12,9 @@
class Config:
    """Configuration for agentic RAG system."""

    # Weaviate
    weaviate_url: str
    weaviate_api_key: Optional[str]
    # Milvus
    milvus_uri: str
    milvus_token: str

    # SpiceDB
    spicedb_endpoint: str
@@ -34,8 +33,8 @@ class Config:
    def from_env(cls):
        """Load configuration from environment variables."""
        return cls(
            weaviate_url=os.getenv("WEAVIATE_URL", "http://localhost:8080"),
            weaviate_api_key=os.getenv("WEAVIATE_API_KEY"),
            milvus_uri=os.getenv("MILVUS_URI", "http://localhost:19530"),
            milvus_token=os.getenv("MILVUS_TOKEN", ""),
            spicedb_endpoint=os.getenv("SPICEDB_ENDPOINT", "localhost:50051"),
            spicedb_token=os.getenv("SPICEDB_TOKEN", "devtoken"),
            openai_api_key=os.getenv("OPENAI_API_KEY", ""),
2 changes: 1 addition & 1 deletion agentic-rag-authorization/agentic_rag/graph.py
@@ -55,7 +55,7 @@ def build_agentic_rag_graph():
    """Build the agentic RAG graph with deterministic authorization.

    Simplified Flow:
    1. Retrieval: Fetch documents from Weaviate
    1. Retrieval: Fetch documents from Milvus
    2. Authorization: Deterministic permission check (security boundary)
    3. Conditional:
       - If authorized docs exist: Generate answer
Expand Down
2 changes: 1 addition & 1 deletion agentic-rag-authorization/agentic_rag/logging_config.py
@@ -104,7 +104,7 @@ def setup_logging(level: str = "INFO") -> None:
    logging.getLogger("httpx").setLevel(logging.WARNING)
    logging.getLogger("httpcore").setLevel(logging.WARNING)
    logging.getLogger("openai").setLevel(logging.WARNING)
    logging.getLogger("weaviate").setLevel(logging.WARNING)
    logging.getLogger("pymilvus").setLevel(logging.WARNING)
    logging.getLogger("grpc").setLevel(logging.WARNING)


26 changes: 26 additions & 0 deletions agentic-rag-authorization/agentic_rag/milvus_client.py
@@ -0,0 +1,26 @@
"""Milvus client connection pooling."""

from pymilvus import MilvusClient
from threading import Lock
from typing import Optional

_milvus_client: Optional[MilvusClient] = None
_milvus_lock = Lock()


def get_milvus_client(uri: str, token: str = "") -> MilvusClient:
    """Get or create reusable MilvusClient (singleton, thread-safe)."""
    global _milvus_client
    if _milvus_client is not None:
        return _milvus_client
    with _milvus_lock:
        if _milvus_client is None:
            _milvus_client = MilvusClient(uri=uri, token=token)
    return _milvus_client


def reset_milvus_client():
    """Reset singleton (useful for testing)."""
    global _milvus_client
    with _milvus_lock:
        _milvus_client = None
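
The double-checked locking pattern in this file can be exercised without a running Milvus server by swapping in a stand-in class. The sketch below is an assumption-laden illustration: `FakeClient`, `get_client`, and `reset_client` are hypothetical names that mirror the structure above, and no `pymilvus` call is made.

```python
from threading import Lock, Thread
from typing import List, Optional


class FakeClient:
    """Hypothetical stand-in for MilvusClient; counts how often it is built."""

    instances_created = 0

    def __init__(self, uri: str, token: str = "") -> None:
        FakeClient.instances_created += 1
        self.uri = uri
        self.token = token


_client: Optional[FakeClient] = None
_lock = Lock()


def get_client(uri: str, token: str = "") -> FakeClient:
    """Same shape as get_milvus_client: fast path, then a locked re-check."""
    global _client
    if _client is not None:  # fast path, no lock taken
        return _client
    with _lock:
        if _client is None:  # re-check: another thread may have won the race
            _client = FakeClient(uri=uri, token=token)
    return _client


def reset_client() -> None:
    global _client
    with _lock:
        _client = None


# Eight threads ask for a client at once; all should share one instance.
seen: List[FakeClient] = []
threads = [
    Thread(target=lambda: seen.append(get_client("http://localhost:19530")))
    for _ in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# A later call with a different URI still returns the first client.
later = get_client("http://other-host:19530")
```

One consequence of this structure is that `uri` and `token` only take effect on the first call; callers that need a different endpoint would have to reset the singleton first.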