This document provides a comprehensive overview of the LangGraph.js AI Agent Template architecture, designed to help developers understand the system's design patterns and extend functionality.
- System Overview
- Core Components
- Data Flow
- Database Schema
- Agent Workflow
- MCP Integration
- Tool Approval Process
- File Upload & Storage
- Streaming Architecture
- Error Handling
- Performance Considerations
- Observability
```
┌─────────────────────────────────────────────────────────────────┐
│                        Browser (Client)                         │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐    │
│  │    Chat UI      │ │   Settings UI   │ │   Thread List   │    │
│  │   Components    │ │  (MCP Config)   │ │     Sidebar     │    │
│  └─────────────────┘ └─────────────────┘ └─────────────────┘    │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐    │
│  │   React Query   │ │   Context API   │ │  Custom Hooks   │    │
│  │  (State Mgmt)   │ │   (UI State)    │ │  (Data Logic)   │    │
│  └─────────────────┘ └─────────────────┘ └─────────────────┘    │
└─────────────────────────────────────────────────────────────────┘
                               │
                            HTTP/SSE
                               │
┌─────────────────────────────────────────────────────────────────┐
│                         Next.js Server                          │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐    │
│  │   API Routes    │ │  Agent Service  │ │  Chat Service   │    │
│  │   (REST/SSE)    │ │   (Streaming)   │ │    (Utils)      │    │
│  └─────────────────┘ └─────────────────┘ └─────────────────┘    │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐    │
│  │  Agent Builder  │ │   MCP Client    │ │   Memory Mgmt   │    │
│  │   (LangGraph)   │ │     (Tools)     │ │    (History)    │    │
│  └─────────────────┘ └─────────────────┘ └─────────────────┘    │
└─────────────────────────────────────────────────────────────────┘
                               │
                        Database/Network
                               │
┌─────────────────────────────────────────────────────────────────────────────┐
│                              External Systems                               │
├─────────────────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐    │
│  │ PostgreSQL  │ │OpenAI/Google│ │ MCP Servers │ │ MinIO/S3 (Storage)  │    │
│  │(Persistence)│ │ (LLM APIs)  │ │   (Tools)   │ │   (File Uploads)    │    │
│  └─────────────┘ └─────────────┘ └─────────────┘ └─────────────────────┘    │
└─────────────────────────────────────────────────────────────────────────────┘
```
- Next.js 15: App Router with Server Components
- React 19: Latest features including Suspense and concurrent rendering
- TypeScript: Strict mode for type safety
- Tailwind CSS: Utility-first styling
- shadcn/ui: Accessible component library
- React Query (TanStack Query): Server state management
- Node.js: JavaScript runtime
- Prisma ORM: Type-safe database access
- PostgreSQL: Primary database
- Server-Sent Events: Real-time streaming
- MinIO/S3: Object storage for file uploads
- LangGraph.js: Agent orchestration framework
- LangChain: LLM abstraction and tools
- OpenAI/Google: Language model providers
- Model Context Protocol: Dynamic tool integration
The heart of the AI agent system, responsible for creating and configuring LangGraph StateGraphs.
```typescript
export class AgentBuilder {
  private toolNode: ToolNode;
  private readonly model: BaseChatModel;
  private tools: DynamicTool[];
  private systemPrompt: string;
  private approveAllTools: boolean;
  private checkpointer?: BaseCheckpointSaver;

  build() {
    const stateGraph = new StateGraph(MessagesAnnotation);
    stateGraph
      .addNode("agent", this.callModel.bind(this))
      .addNode("tools", this.toolNode)
      .addNode("tool_approval", this.approveToolCall.bind(this))
      .addEdge(START, "agent")
      .addConditionalEdges("agent", this.shouldApproveTool.bind(this))
      .addEdge("tools", "agent");
    return stateGraph.compile({ checkpointer: this.checkpointer });
  }
}
```

Key Responsibilities:
- StateGraph construction with human-in-the-loop pattern
- Tool binding and approval workflow
- Model configuration and prompt management
- Checkpointer integration for persistence
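The conditional edge wired by `addConditionalEdges("agent", ...)` decides where control flows after each model turn. The following is a minimal sketch of that routing logic, assuming a simplified message shape; the actual `shouldApproveTool` method reads the last message from LangGraph state, but the three outcomes are the ones the graph above defines:

```typescript
// Hypothetical sketch of the routing used by the conditional edge.
// The message shape is simplified for illustration; only the three
// target node names come from the StateGraph built above.
type ToolCall = { id: string; name: string; args: Record<string, unknown> };
interface AIMessageLike {
  tool_calls?: ToolCall[];
}

function shouldApproveTool(
  lastMessage: AIMessageLike,
  approveAllTools: boolean,
): "tools" | "tool_approval" | "__end__" {
  const toolCalls = lastMessage.tool_calls ?? [];
  if (toolCalls.length === 0) return "__end__"; // plain text answer: finish
  if (approveAllTools) return "tools"; // auto-approve: execute immediately
  return "tool_approval"; // pause for human review
}
```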
Manages dynamic tool loading from Model Context Protocol servers.
```typescript
export async function createMCPClient(): Promise<MultiServerMCPClient | null> {
  const mcpServers = await getMCPServerConfigs(); // From database
  if (Object.keys(mcpServers).length === 0) {
    return null;
  }
  const client = new MultiServerMCPClient({
    mcpServers: mcpServers,
    throwOnLoadError: false,
    prefixToolNameWithServerName: true, // Prevent conflicts
  });
  return client;
}
```

Key Features:
- Database-driven MCP server configuration
- Support for stdio and HTTP transports
- Tool name prefixing for conflict prevention
- Graceful error handling for failed servers
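`getMCPServerConfigs()` has to translate `MCPServer` rows into the keyed config object the client expects. A sketch of that mapping, assuming field names from the Prisma model below; the exact config shape consumed by `MultiServerMCPClient` is an assumption:

```typescript
// Hypothetical mapping from MCPServer database rows to a client config.
// Field names mirror the MCPServer model; the output shape is assumed.
interface MCPServerRow {
  name: string;
  type: "stdio" | "http";
  enabled: boolean;
  command?: string | null;
  args?: string[] | null;
  env?: Record<string, string> | null;
  url?: string | null;
  headers?: Record<string, string> | null;
}

function toMCPConfig(rows: MCPServerRow[]): Record<string, object> {
  const config: Record<string, object> = {};
  // Only enabled servers are exposed to the agent
  for (const row of rows.filter((r) => r.enabled)) {
    config[row.name] =
      row.type === "stdio"
        ? { transport: "stdio", command: row.command, args: row.args ?? [], env: row.env ?? {} }
        : { transport: "http", url: row.url, headers: row.headers ?? {} };
  }
  return config;
}
```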
Handles real-time streaming of agent responses via Server-Sent Events.
```typescript
export async function streamResponse({
  threadId,
  userText,
  opts,
}: {
  threadId: string;
  userText: string;
  opts?: MessageOptions;
}) {
  // Ensure thread exists
  await ensureThread(threadId, userText);

  // Handle tool approval vs normal input
  const inputs = opts?.allowTool
    ? new Command({ resume: { action: opts.allowTool === "allow" ? "continue" : "update" } })
    : { messages: [new HumanMessage(userText)] };

  const agent = await ensureAgent({
    model: opts?.model,
    tools: opts?.tools,
    approveAllTools: opts?.approveAllTools,
  });

  // Stream with checkpointer for persistence
  const iterable = await agent.stream(inputs, {
    streamMode: ["updates"],
    configurable: { thread_id: threadId },
  });

  // Process and yield streaming chunks
  async function* generator(): AsyncGenerator<MessageResponse, void, unknown> {
    for await (const chunk of iterable) {
      // Process chunk and yield MessageResponse
    }
  }
  return generator();
}
```

React hook providing chat functionality with optimistic UI updates.
```typescript
export function useChatThread({ threadId }: UseChatThreadOptions) {
  const queryClient = useQueryClient();
  const streamRef = useRef<EventSource | null>(null);

  const sendMessage = useCallback(
    async (text: string, opts?: MessageOptions) => {
      // Optimistic UI: Add user message immediately
      const userMessage: MessageResponse = {
        type: "human",
        data: { id: `temp-${Date.now()}`, content: text },
      };
      queryClient.setQueryData(["messages", threadId], (old: MessageResponse[] = []) => [
        ...old,
        userMessage,
      ]);
      // Stream agent response
      await handleStreamResponse({ threadId, text, opts });
    },
    [threadId, queryClient, handleStreamResponse],
  );

  return {
    messages,
    sendMessage,
    approveToolExecution,
    // ... other methods
  };
}
```

```
User Input → Optimistic UI → API Route → Agent Service → LangGraph Agent
     ↓                                                          │
React Query ←─ SSE Stream ←─ Stream Response ←─ Agent Stream ←──┘
     ↓
 UI Update
```
1. **User Input**
   - User types message in `MessageInput` component
   - `useChatThread.sendMessage()` called
2. **Optimistic UI Update**
   - User message immediately added to React Query cache
   - UI updates instantly for responsive feel
3. **API Request**
   - SSE connection opened to `/api/agent/stream`
   - Request includes thread ID, message content, and options
4. **Agent Processing**
   - `streamResponse()` ensures thread exists in database
   - Agent created with current MCP tools and configuration
   - LangGraph begins processing with checkpointer for persistence
5. **Tool Approval (if needed)**
   - Agent pauses at tool calls if approval required
   - Tool details sent via SSE to frontend
   - User approves/denies via UI
   - Resume command sent to continue processing
6. **Streaming Response**
   - Agent response streamed chunk-by-chunk via SSE
   - Frontend accumulates chunks by message ID
   - React Query cache updated in real-time
7. **Persistence**
   - All messages stored in LangGraph checkpointer
   - Thread metadata updated in PostgreSQL
   - MCP server configurations persisted
```
┌─────────────────┐      ┌─────────────────┐
│     Thread      │      │    MCPServer    │
├─────────────────┤      ├─────────────────┤
│ id: String (PK) │      │ id: String (PK) │
│ title: String   │      │ name: String    │
│ createdAt: Date │      │ type: Enum      │
│ updatedAt: Date │      │ enabled: Bool   │
└─────────────────┘      │ command: String?│
                         │ args: Json?     │
                         │ env: Json?      │
                         │ url: String?    │
                         │ headers: Json?  │
                         │ createdAt: Date │
                         │ updatedAt: Date │
                         └─────────────────┘

┌─────────────────────────┐
│  LangGraph Checkpoints  │
│ (managed by framework)  │
├─────────────────────────┤
│ thread_id: String       │
│ checkpoint_id: String   │
│                         │
└─────────────────────────┘
```
```prisma
model Thread {
  id        String   @id @default(uuid())
  title     String
  createdAt DateTime @default(now())
  updatedAt DateTime @updatedAt
}
```

Purpose: Minimal metadata for conversation threads. The actual conversation history is stored in LangGraph checkpoints for efficient state management.
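Since the `Thread` row carries only a title, `ensureThread(threadId, userText)` presumably derives that title from the first user message. A plausible helper for that derivation; the function name and truncation length are assumptions, not taken from the template:

```typescript
// Hypothetical title derivation for a new Thread row, as
// ensureThread(threadId, userText) might do. maxLength is illustrative.
function deriveThreadTitle(userText: string, maxLength = 50): string {
  // Use only the first line of the message as the candidate title
  const firstLine = userText.trim().split("\n")[0];
  if (firstLine.length <= maxLength) return firstLine;
  // Truncate long titles with an ellipsis so the sidebar stays readable
  return firstLine.slice(0, maxLength - 1) + "…";
}
```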
```prisma
model MCPServer {
  id      String        @id @default(uuid())
  name    String        @unique
  type    MCPServerType // stdio | http
  enabled Boolean       @default(true)

  // For stdio servers
  command String?
  args    Json?
  env     Json?

  // For http servers
  url     Json?
  headers Json?

  createdAt DateTime @default(now())
  updatedAt DateTime @updatedAt
}
```

Purpose: Dynamic configuration of MCP servers. Supports both stdio (command-line) and HTTP-based servers with flexible JSON configuration.
```
        START
          │
          ▼
     ┌──────────┐
     │  agent   │ ──► Call language model with tools
     └──────────┘
          │
          ▼
    Should approve
        tool?
      ┌─────────┐
      │   Yes   │ ──► ┌─────────────┐
      └─────────┘     │tool_approval│ ──► Human review
                      └─────────────┘
      ┌─────────┐            │
      │   No    │            ▼
      └─────────┘       ┌─────────┐
          │             │  tools  │ ──► Execute tools
          ▼             └─────────┘
         END                 │
                             ▼
                       Back to agent
```
- Input: Current conversation state
- Process:
- Add system prompt to message history
- Bind available tools to language model
- Generate response with potential tool calls
- Output: AI message (text and/or tool calls)
- Input: AI message with tool calls
- Process:
- Check if `approveAllTools` is enabled
- If not, interrupt with tool details for human review
- Wait for user decision (allow/deny/modify)
- Output: Command to continue to tools or return to agent
- Input: Approved tool calls
- Process: Execute tools via MCP clients
- Output: Tool results as messages
```typescript
const humanReview = interrupt<
  { question: string; toolCall: ToolCall },
  { action: string; data: string | MessageContentComplex[] }
>({
  question: "Is this correct?",
  toolCall: toolCall,
});

switch (humanReview.action) {
  case "continue":
    return new Command({ goto: "tools" });
  case "update":
    return new Command({
      goto: "tools",
      update: { messages: [updatedMessage] },
    });
  case "feedback":
    return new Command({
      goto: "agent",
      update: { messages: [toolMessage] },
    });
}
```

Database `MCPServer` → `getMCPServerConfigs()` → `MultiServerMCPClient` → Agent Tools
```json
{
  "name": "filesystem",
  "type": "stdio",
  "command": "npx",
  "args": ["@modelcontextprotocol/server-filesystem", "/allowed/path"],
  "env": { "LOG_LEVEL": "info" }
}
```

```json
{
  "name": "web-search",
  "type": "http",
  "url": "https://api.example.com/mcp",
  "headers": {
    "Authorization": "Bearer token",
    "Content-Type": "application/json"
  }
}
```

- Database Query: Fetch enabled MCP servers
- Client Creation: Initialize MultiServerMCPClient
- Tool Discovery: Get available tools from each server
- Name Prefixing: Add server name prefix to prevent conflicts
- Agent Binding: Bind tools to language model
HTTP MCP servers may require OAuth 2.0 authentication. See OAuth Documentation for the complete flow and implementation details.
The application supports multimodal AI conversations through file uploads. Files are stored in S3-compatible storage (MinIO for development) and processed for AI consumption.
```
User → MessageInput → Upload API → MinIO/S3 → File Metadata
                                                    ↓
Agent Request ← processAttachmentsForAI ← Download & Convert to Base64
```
| Type | Extensions | Max Size | AI Processing |
|---|---|---|---|
| Images | PNG, JPEG | 5MB | Base64 data URL |
| Documents | | 10MB | Base64 data URL |
| Text | MD, TXT | 2MB | UTF-8 text extraction |
Handles file validation and storage:
- Validates MIME type and file size
- Handles `application/octet-stream` for text files by extension
- Uploads to MinIO/S3 with unique keys
- Returns file metadata (URL, key, name, type, size)
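A sketch of what the validation step could look like, using the limits from the table above; the helper name, return shape, and exact extension lists (e.g. whether `.jpg` is accepted alongside `.jpeg`) are assumptions:

```typescript
// Hypothetical upload validation mirroring the table above.
// Size limits come from the table; names and shapes are illustrative.
const RULES: Record<string, { maxBytes: number; extensions: string[] }> = {
  image: { maxBytes: 5 * 1024 * 1024, extensions: [".png", ".jpg", ".jpeg"] },
  text: { maxBytes: 2 * 1024 * 1024, extensions: [".md", ".txt"] },
};

function validateUpload(fileName: string, sizeBytes: number): { ok: boolean; reason?: string } {
  const dot = fileName.lastIndexOf(".");
  if (dot === -1) return { ok: false, reason: "Missing file extension" };
  const ext = fileName.slice(dot).toLowerCase();

  // Find the rule whose extension list covers this file
  const rule = Object.values(RULES).find((r) => r.extensions.includes(ext));
  if (!rule) return { ok: false, reason: `Unsupported extension: ${ext}` };
  if (sizeBytes > rule.maxBytes) return { ok: false, reason: "File too large" };
  return { ok: true };
}
```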
- s3-client.ts: AWS SDK S3 client configuration
- upload.ts: Upload functions with multipart support for large files
- validation.ts: File type and size validation rules
- content.ts: File processing for AI (base64 conversion, text extraction)
```typescript
if (opts?.attachments && opts.attachments.length > 0) {
  const attachmentContents = await processAttachmentsForAI(opts.attachments);
  messageContent = [{ type: "text", text: userText }, ...attachmentContents];
}
```

```
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│  MessageInput   │────►│   Upload API    │────►│    MinIO/S3     │
│  (File Select)  │     │  (Validation)   │     │   (Storage)     │
└─────────────────┘     └─────────────────┘     └─────────────────┘
                                                        │
                                                        ▼
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   LangChain     │◄────│ processAttach-  │◄────│   Download &    │
│  HumanMessage   │     │  mentsForAI()   │     │ Base64 Convert  │
└─────────────────┘     └─────────────────┘     └─────────────────┘
```
The storage layer uses AWS SDK v3, which works with any S3-compatible service. To switch from MinIO to production storage (AWS S3, Cloudflare R2, etc.), update the environment variables - no code changes required.
Tool Call Detected → Approval UI Rendered → User Decision → Command Sent → Agent Resumes
- Action: Execute tool with original parameters
- Implementation: `new Command({ goto: "tools" })`
- Result: Tool runs and agent continues with results
- Action: Skip tool execution
- Implementation: Return to agent with denial message
- Result: Agent continues without tool results
- Action: Edit tool parameters before execution
- Implementation: Update message with new parameters
- Result: Tool runs with modified inputs
```typescript
const approveToolExecution = useCallback(
  async (toolCallId: string, action: "allow" | "deny") => {
    await handleStreamResponse({
      threadId,
      text: "",
      opts: { allowTool: action },
    });
  },
  [threadId, handleStreamResponse],
);
```

Client Request → API Route → Agent Stream → SSE Response → Client Handler
```typescript
export async function POST(request: Request) {
  const stream = new ReadableStream({
    async start(controller) {
      try {
        const responseGenerator = streamResponse(params);
        for await (const messageResponse of responseGenerator) {
          const data = JSON.stringify(messageResponse);
          controller.enqueue(new TextEncoder().encode(`data: ${data}\n\n`));
        }
        controller.enqueue(new TextEncoder().encode(`event: done\ndata: {}\n\n`));
      } catch (error) {
        // Narrow the unknown catch value before reading .message
        const message = error instanceof Error ? error.message : String(error);
        controller.enqueue(
          new TextEncoder().encode(`event: error\ndata: ${JSON.stringify({ message })}\n\n`),
        );
      } finally {
        controller.close();
      }
    },
  });
  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    },
  });
}
```

```typescript
stream.onmessage = (event: MessageEvent) => {
  const messageResponse = JSON.parse(event.data) as MessageResponse;
  const data = messageResponse.data as AIMessageData;

  // First chunk: create new message
  if (!currentMessageRef.current || currentMessageRef.current.data.id !== data.id) {
    currentMessageRef.current = messageResponse;
    queryClient.setQueryData(["messages", threadId], (old: MessageResponse[] = []) => [
      ...old,
      currentMessageRef.current!,
    ]);
  } else {
    // Subsequent chunks: accumulate content
    const currentData = currentMessageRef.current.data as AIMessageData;
    const newContent = currentData.content + data.content;
    currentMessageRef.current = {
      ...currentMessageRef.current,
      data: { ...currentData, content: newContent },
    };
    // Update React Query cache
    queryClient.setQueryData(["messages", threadId], (old: MessageResponse[] = []) => {
      const idx = old.findIndex((m) => m.data?.id === currentMessageRef.current!.data.id);
      if (idx === -1) return old;
      const clone = [...old];
      clone[idx] = currentMessageRef.current!;
      return clone;
    });
  }
};
```

```typescript
type MessageResponse =
  | { type: "human"; data: HumanMessageData }
  | { type: "ai"; data: AIMessageData }
  | { type: "tool"; data: ToolMessageData }
  | { type: "error"; data: ErrorMessageData };

interface AIMessageData {
  id: string;
  content: string;
  tool_calls?: ToolCall[];
  additional_kwargs?: Record<string, unknown>;
  response_metadata?: Record<string, unknown>;
}
```

- Causes: Connection failures, timeouts
- Handling: Retry with exponential backoff
- UI: Error message with retry button
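A sketch of the retry-with-exponential-backoff strategy mentioned above; the helper name, attempt count, and delays are illustrative, not taken from the template:

```typescript
// Hypothetical retry helper with exponential backoff.
// Defaults (3 attempts, 500ms base) are assumptions for illustration.
async function withRetry<T>(
  fn: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 500,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      if (attempt === maxAttempts - 1) break; // no sleep after the final failure
      const delayMs = baseDelayMs * 2 ** attempt; // 500ms, 1s, 2s, ...
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
  }
  throw lastError;
}
```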
- Causes: Invalid API keys, expired tokens
- Handling: Clear invalid credentials, prompt for re-auth
- UI: Settings panel with credential update
- Causes: Server unavailable, configuration issues
- Handling: Graceful degradation, disable failed servers
- UI: Server status indicators in settings
- Causes: Invalid parameters, permission issues
- Handling: Return error to agent for recovery
- UI: Error display in tool result
```typescript
// Stream error handling
stream.addEventListener("error", async (ev: Event) => {
  try {
    const dataText = (ev as MessageEvent<string>)?.data;
    const message = extractErrorMessage(dataText);
    // Surface error in chat
    const errorMsg: MessageResponse = {
      type: "error",
      data: { id: `err-${Date.now()}`, content: `⚠️ ${message}` },
    };
    queryClient.setQueryData(["messages", threadId], (old: MessageResponse[] = []) => [
      ...old,
      errorMsg,
    ]);
  } finally {
    // Always cleanup
    setIsSending(false);
    currentMessageRef.current = null;
    stream.close();
    streamRef.current = null;
  }
});
```

- Strategy: Stale-while-revalidate
- Cache Time: 5 minutes for message history
- Background Refetch: On window focus
- Usage: Memoize expensive renders
- Example: Message list virtualization for long conversations
- Route-based: Automatic with Next.js App Router
- Component-based: Dynamic imports for heavy components
```sql
-- Thread lookup optimization
CREATE INDEX idx_thread_updated_at ON "Thread" ("updatedAt" DESC);

-- MCP server query optimization
CREATE INDEX idx_mcpserver_enabled ON "MCPServer" ("enabled") WHERE enabled = true;
```

- Database: Prisma connection pooling
- MCP Servers: Reuse client connections
- Chunking: Optimal chunk sizes for SSE
- Backpressure: Handle slow clients gracefully
```typescript
useEffect(
  () => () => {
    if (streamRef.current) {
      try {
        streamRef.current.close();
      } catch {}
    }
  },
  [],
);
```

- Automatic: Old checkpoints cleaned by framework
- Configuration: Retention policies via checkpointer settings
```typescript
logger.info("Agent processing started", {
  threadId,
  model: opts?.model,
  toolCount: tools.length,
  approveAllTools: opts?.approveAllTools,
});
```

- Client: Error boundaries with error reporting
- Server: Centralized error logging with context
- Response Time: Track agent processing duration
- Tool Usage: Monitor MCP server performance
- Stream Health: SSE connection success rates
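Tracking agent processing duration can be as simple as wrapping the streamed call in a timing helper. This is a hypothetical sketch, not the template's instrumentation; the wrapper name and log format are made up:

```typescript
// Hypothetical duration-tracking wrapper for async operations such as
// agent streaming. Logging via console.log stands in for the logger above.
async function timed<T>(label: string, fn: () => Promise<T>): Promise<T> {
  const start = Date.now();
  try {
    return await fn();
  } finally {
    // Log even when fn throws, so failures are measured too
    console.log(`${label} took ${Date.now() - start}ms`);
  }
}
```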
```typescript
export async function healthCheck() {
  try {
    await prisma.$queryRaw`SELECT 1`;
    return { status: "healthy", database: "connected" };
  } catch (error) {
    return { status: "unhealthy", database: "disconnected", error };
  }
}
```

```typescript
export async function checkMCPServers() {
  const servers = await prisma.mCPServer.findMany({ where: { enabled: true } });
  const statuses = await Promise.allSettled(servers.map((server) => testMCPConnection(server)));
  return statuses.map((status, i) => ({
    server: servers[i].name,
    status: status.status,
    error: status.status === "rejected" ? status.reason : null,
  }));
}
```

This architecture is designed for scalability, maintainability, and extensibility. The modular design allows easy addition of new features while maintaining clean separation of concerns, and the comprehensive error handling and performance optimizations support a robust, production-ready system.