-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Expand file tree
/
Copy pathjsonResponseStreamableHttp.ts
More file actions
165 lines (146 loc) · 5.6 KB
/
jsonResponseStreamableHttp.ts
File metadata and controls
165 lines (146 loc) · 5.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import { randomUUID } from 'node:crypto';
import { createMcpExpressApp } from '@modelcontextprotocol/express';
import { NodeStreamableHTTPServerTransport } from '@modelcontextprotocol/node';
import type { CallToolResult } from '@modelcontextprotocol/server';
import { isInitializeRequest, McpServer } from '@modelcontextprotocol/server';
import type { Request, Response } from 'express';
import * as z from 'zod/v4';
/**
 * Builds a new MCP server instance for one client session.
 *
 * The server declares the `logging` capability and registers two tools:
 *  - `greet`: returns a single text greeting for the supplied name.
 *  - `multi-greet`: emits three log messages through the request context, pausing
 *    one second between them, and then returns a single text greeting.
 *
 * @returns A configured server that has not yet been connected to a transport.
 */
const getServer = (): McpServer => {
    // Wraps a string in the single-text-item result shape both tools return.
    const textResult = (text: string): CallToolResult => ({
        content: [{ type: 'text', text }]
    });

    // Resolves after `ms` milliseconds; used to space out the multi-greet log messages.
    const pause = (ms: number): Promise<void> =>
        new Promise<void>(resolve => {
            setTimeout(resolve, ms);
        });

    // Each tool gets its own schema instance: an object with one required string field, `name`.
    const nameInput = () =>
        z.object({
            name: z.string().describe('Name to greet')
        });

    const mcpServer = new McpServer(
        { name: 'json-response-streamable-http-server', version: '1.0.0' },
        { capabilities: { logging: {} } }
    );

    // Tool 1: immediate greeting
    mcpServer.registerTool(
        'greet',
        {
            description: 'A simple greeting tool',
            inputSchema: nameInput()
        },
        async ({ name }): Promise<CallToolResult> => textResult(`Hello, ${name}!`)
    );

    // Tool 2: greeting preceded by log messages that are separated by one-second pauses
    mcpServer.registerTool(
        'multi-greet',
        {
            description: 'A tool that sends different greetings with delays between them',
            inputSchema: nameInput()
        },
        async ({ name }, ctx): Promise<CallToolResult> => {
            await ctx.mcpReq.log('debug', `Starting multi-greet for ${name}`);

            await pause(1000); // one second before the first "greeting" log message
            await ctx.mcpReq.log('info', `Sending first greeting to ${name}`);

            await pause(1000); // one more second before the second "greeting" log message
            await ctx.mcpReq.log('info', `Sending second greeting to ${name}`);

            return textResult(`Good morning, ${name}!`);
        }
    );

    return mcpServer;
};
// Application object on which the /mcp routes below are registered
const app = createMcpExpressApp();

// Active transports, looked up by the session ID sent in the `mcp-session-id` header
const transports: Record<string, NodeStreamableHTTPServerTransport> = {};
/**
 * POST /mcp — handles every JSON-RPC message a client sends.
 *
 * Routing:
 *  - A request whose `mcp-session-id` header names a stored session is passed to that
 *    session's existing transport.
 *  - A request without a session ID whose body is an initialize request creates a new
 *    transport in JSON response mode, connects it to a fresh server, and is then handled
 *    by that transport. The transport is stored once its session ID has been generated.
 *  - Anything else is rejected with HTTP 400 and a JSON-RPC error (code -32000).
 *
 * Any exception is logged and answered with HTTP 500 / JSON-RPC error -32603, unless
 * response headers have already been sent.
 */
app.post('/mcp', async (req: Request, res: Response) => {
    console.log('Received MCP request:', req.body);
    try {
        // req.get() is case-insensitive and already typed string | undefined, so no cast is needed.
        const sessionId = req.get('mcp-session-id');

        // The header is untrusted input and `transports` is a plain object: only accept keys
        // that were actually stored. A bare `transports[sessionId]` would also match inherited
        // properties such as "constructor" or "__proto__" and end in a 500 instead of a 400.
        const existingTransport =
            sessionId && Object.prototype.hasOwnProperty.call(transports, sessionId) ? transports[sessionId] : undefined;

        let transport: NodeStreamableHTTPServerTransport;
        if (existingTransport) {
            // Known session - reuse its transport; it is already connected to a server
            transport = existingTransport;
        } else if (!sessionId && isInitializeRequest(req.body)) {
            // New initialization request - use JSON response mode
            transport = new NodeStreamableHTTPServerTransport({
                sessionIdGenerator: () => randomUUID(),
                enableJsonResponse: true,
                onsessioninitialized: newSessionId => {
                    // Store the transport as soon as the session ID exists, so that follow-up
                    // requests carrying that ID can find it.
                    console.log(`Session initialized with ID: ${newSessionId}`);
                    transports[newSessionId] = transport;
                }
            });

            // The transport must be connected to an MCP server BEFORE it handles the request
            await getServer().connect(transport);
        } else {
            // Invalid request - unknown session ID, or no session ID and not an initialize request
            res.status(400).json({
                jsonrpc: '2.0',
                error: {
                    code: -32_000,
                    message: 'Bad Request: No valid session ID provided'
                },
                id: null
            });
            return;
        }

        // Single dispatch point for both reused and freshly created transports
        await transport.handleRequest(req, res, req.body);
    } catch (error) {
        console.error('Error handling MCP request:', error);
        // Only send an error body if nothing has been written to the response yet
        if (!res.headersSent) {
            res.status(500).json({
                jsonrpc: '2.0',
                error: {
                    code: -32_603,
                    message: 'Internal server error'
                },
                id: null
            });
        }
    }
});
/**
 * GET /mcp — the spec uses GET for SSE streams, which this deliberately minimal
 * example does not offer. Per the spec the server must then answer
 * 405 Method Not Allowed; the `Allow` header advertises POST as the supported method.
 */
app.get('/mcp', async (_req: Request, res: Response) => {
    res.set('Allow', 'POST');
    res.status(405).send('Method Not Allowed');
});
// Port the server listens on
const PORT = 3000;

// Start the server. The callback receives an error when startup failed; in that case
// the failure is logged and the process exits with status 1.
app.listen(PORT, listenError => {
    if (!listenError) {
        console.log(`MCP Streamable HTTP Server listening on port ${PORT}`);
        return;
    }

    console.error('Failed to start server:', listenError);
    // eslint-disable-next-line unicorn/no-process-exit
    process.exit(1);
});
// Server shutdown: when the process receives SIGINT, log a message and exit with status 0
process.on('SIGINT', async function handleSigint(): Promise<void> {
    console.log('Shutting down server...');
    process.exit(0);
});