// standaloneSseWithGetStreamableHttp.ts
// Example MCP server over Streamable HTTP: POST /mcp carries client requests and GET /mcp opens a session's SSE stream.
import { randomUUID } from 'node:crypto';
import { createMcpExpressApp } from '@modelcontextprotocol/express';
import { NodeStreamableHTTPServerTransport } from '@modelcontextprotocol/node';
import type { ReadResourceResult } from '@modelcontextprotocol/server';
import { isInitializeRequest, McpServer } from '@modelcontextprotocol/server';
import type { Request, Response } from 'express';
/**
 * Registers a plain-text resource called `name` on `server`.
 *
 * The resource URI is `https://mcp-example.com/dynamic/` followed by the
 * URI-encoded name; every read of the resource answers with `content`.
 *
 * @param server - Server instance the resource is registered on.
 * @param name - Resource name, also used (encoded) as the last URI segment.
 * @param content - Text returned whenever the resource is read.
 */
const addResource = (server: McpServer, name: string, content: string) => {
    const resourceUri = `https://mcp-example.com/dynamic/${encodeURIComponent(name)}`;
    const metadata = { mimeType: 'text/plain', description: `Dynamic resource: ${name}` };

    // Read callback: a single text entry bound to this resource's URI
    const readResource = async (): Promise<ReadResourceResult> => ({
        contents: [{ uri: resourceUri, text: content }]
    });

    server.registerResource(name, resourceUri, metadata, readResource);
};
/**
 * Builds a brand-new MCP server, pre-populated with one example resource.
 *
 * A separate instance is created for every client connection so that no
 * state is shared between clients.
 *
 * @returns The newly created server; it is not connected to a transport here.
 */
const getServer = () => {
    const serverInfo = {
        name: 'resource-list-changed-notification-server',
        version: '1.0.0'
    };
    const freshServer = new McpServer(serverInfo);
    addResource(freshServer, 'example-resource', 'Initial content for example-resource');
    return freshServer;
};
// Store transports and their associated servers by session ID.
// Both maps are created without a prototype: session IDs are read from the client-supplied
// `mcp-session-id` header, and on a plain `{}` an ID such as "constructor" or "toString"
// would resolve to an inherited Object.prototype member and be mistaken for a live session.
const transports: { [sessionId: string]: NodeStreamableHTTPServerTransport } = Object.create(null);
const servers: { [sessionId: string]: McpServer } = Object.create(null);
// Testing aid: every 5 seconds, register one new resource (named by a random UUID,
// the same name for all sessions in a given tick) on every active server instance.
// The timer handle is kept so it can be cleared on shutdown.
const resourceChangeInterval = setInterval(() => {
    const resourceName = randomUUID();
    const resourceText = `Content for ${resourceName}`;
    for (const activeServer of Object.values(servers)) {
        addResource(activeServer, resourceName, resourceText);
    }
}, 5000);
// Express application that the /mcp routes are registered on.
// NOTE(review): the POST handler reads req.body as an already-parsed message, so
// createMcpExpressApp presumably installs JSON body parsing — confirm against its docs.
const app = createMcpExpressApp();
/**
 * POST /mcp — receives client messages.
 *
 * - A request whose `mcp-session-id` header matches a stored transport is handed to that transport.
 * - A request with no session ID whose body is an initialize request gets a fresh server and
 *   transport; both are stored once the transport reports the session as initialized.
 * - Anything else is answered with HTTP 400 and a JSON-RPC error (code -32000).
 *
 * Any exception is logged and, unless response headers were already sent, answered with
 * HTTP 500 and a JSON-RPC error (code -32603).
 */
app.post('/mcp', async (req: Request, res: Response) => {
    console.log('Received MCP request:', req.body);
    try {
        const sessionId = req.headers['mcp-session-id'] as string | undefined;

        // Known session: reuse the transport already bound to it
        const knownTransport = sessionId ? transports[sessionId] : undefined;
        if (knownTransport) {
            await knownTransport.handleRequest(req, res, req.body);
            return;
        }

        // Only a session-less initialize request may open a new session
        if (sessionId || !isInitializeRequest(req.body)) {
            res.status(400).json({
                jsonrpc: '2.0',
                error: {
                    code: -32_000,
                    message: 'Bad Request: No valid session ID provided'
                },
                id: null
            });
            return;
        }

        // New client: give it its own server instance and transport
        const server = getServer();
        const transport: NodeStreamableHTTPServerTransport = new NodeStreamableHTTPServerTransport({
            sessionIdGenerator: () => randomUUID(),
            onsessioninitialized: newSessionId => {
                // Registering inside this callback (rather than after handleRequest) avoids a race
                // where follow-up requests arrive before the session has been stored
                console.log(`Session initialized with ID: ${newSessionId}`);
                transports[newSessionId] = transport;
                servers[newSessionId] = server;
            }
        });

        // Drop the session from both maps once its transport closes
        transport.onclose = () => {
            const closedId = transport.sessionId;
            if (!closedId) {
                return;
            }
            delete transports[closedId];
            delete servers[closedId];
        };

        await server.connect(transport);
        // Handling the initialize request is what triggers onsessioninitialized above
        await transport.handleRequest(req, res, req.body);
    } catch (error) {
        console.error('Error handling MCP request:', error);
        if (res.headersSent) {
            return;
        }
        res.status(500).json({
            jsonrpc: '2.0',
            error: {
                code: -32_603,
                message: 'Internal server error'
            },
            id: null
        });
    }
});
/**
 * GET /mcp — opens the SSE stream for an existing session, using the transport's
 * built-in GET support.
 *
 * Responds with HTTP 400 when the `mcp-session-id` header is missing or does not name a
 * stored session. A failure while the transport handles the request is logged; it is
 * answered with HTTP 500 if no headers were sent yet, otherwise the partially written
 * response is ended.
 */
app.get('/mcp', async (req: Request, res: Response) => {
    const sessionId = req.headers['mcp-session-id'] as string | undefined;

    // Own-property lookup only: the ID comes from a request header, so inherited keys such as
    // "constructor" must not be treated as a session
    const transport =
        sessionId && Object.prototype.hasOwnProperty.call(transports, sessionId) ? transports[sessionId] : undefined;
    if (!transport) {
        res.status(400).send('Invalid or missing session ID');
        return;
    }

    console.log(`Establishing SSE stream for session ${sessionId}`);
    try {
        await transport.handleRequest(req, res);
    } catch (error) {
        // Mirror the POST handler: never let a transport failure escape the route handler
        console.error('Error handling MCP SSE request:', error);
        if (!res.headersSent) {
            res.status(500).send('Internal server error');
        } else if (!res.writableEnded) {
            // Stream already started: close it so the client is not left waiting
            res.end();
        }
    }
});
// Bind the HTTP listener. A startup error is logged and ends the process with exit code 1;
// otherwise the listening port is announced.
const PORT = 3000;
app.listen(PORT, error => {
    if (!error) {
        console.log(`Server listening on port ${PORT}`);
        return;
    }
    console.error('Failed to start server:', error);
    // eslint-disable-next-line unicorn/no-process-exit
    process.exit(1);
});
// Shutdown on SIGINT: stop the resource timer, close every stored transport one at a
// time, then exit with code 0.
process.on('SIGINT', async () => {
    // Closes one session's transport and removes it from both maps.
    // A failure is logged and swallowed so the remaining sessions are still closed.
    const closeSession = async (sessionId: string): Promise<void> => {
        try {
            console.log(`Closing transport for session ${sessionId}`);
            const openTransport = transports[sessionId]!;
            await openTransport.close();
            delete transports[sessionId];
            delete servers[sessionId];
        } catch (closeError) {
            console.error(`Error closing transport for session ${sessionId}:`, closeError);
        }
    };

    console.log('Shutting down server...');
    clearInterval(resourceChangeInterval);

    for (const sessionId in transports) {
        await closeSession(sessionId);
    }

    console.log('Server shutdown complete');
    process.exit(0);
});