-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Expand file tree
/
Copy pathssePollingExample.ts
More file actions
135 lines (113 loc) · 4.89 KB
/
ssePollingExample.ts
File metadata and controls
135 lines (113 loc) · 4.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/**
* SSE Polling Example Server (SEP-1699)
*
* This example demonstrates server-initiated SSE stream disconnection
* and client reconnection with Last-Event-ID for resumability.
*
* Key features:
* - Configures `retryInterval` to tell clients how long to wait before reconnecting
* - Uses `eventStore` to persist events for replay after reconnection
* - Uses `ctx.http?.closeSSE()` callback to gracefully disconnect clients mid-operation
*
* Run with: pnpm tsx src/ssePollingExample.ts
* Test with: curl or the MCP Inspector
*/
import { randomUUID } from 'node:crypto';
import { createMcpExpressApp } from '@modelcontextprotocol/express';
import { NodeStreamableHTTPServerTransport } from '@modelcontextprotocol/node';
import type { CallToolResult } from '@modelcontextprotocol/server';
import { McpServer } from '@modelcontextprotocol/server';
import cors from 'cors';
import type { Request, Response } from 'express';
import { InMemoryEventStore } from './inMemoryEventStore.js';
// Create a fresh MCP server per client connection to avoid shared state between clients
const getServer = () => {
const server = new McpServer(
{
name: 'sse-polling-example',
version: '1.0.0'
},
{
capabilities: { logging: {} }
}
);
// Register a long-running tool that demonstrates server-initiated disconnect
server.registerTool(
'long-task',
{
description: 'A long-running task that sends progress updates. Server will disconnect mid-task to demonstrate polling.'
},
async (ctx): Promise<CallToolResult> => {
const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
console.log(`[${ctx.sessionId}] Starting long-task...`);
// Send first progress notification
await ctx.mcpReq.log('info', 'Progress: 25% - Starting work...');
await sleep(1000);
// Send second progress notification
await ctx.mcpReq.log('info', 'Progress: 50% - Halfway there...');
await sleep(1000);
// Server decides to disconnect the client to free resources
// Client will reconnect via GET with Last-Event-ID after the transport's retryInterval
// Use ctx.http?.closeSSE callback - available when eventStore is configured
if (ctx.http?.closeSSE) {
console.log(`[${ctx.sessionId}] Closing SSE stream to trigger client polling...`);
ctx.http?.closeSSE();
}
// Continue processing while client is disconnected
// Events are stored in eventStore and will be replayed on reconnect
await sleep(500);
await ctx.mcpReq.log('info', 'Progress: 75% - Almost done (sent while client disconnected)...');
await sleep(500);
await ctx.mcpReq.log('info', 'Progress: 100% - Complete!');
console.log(`[${ctx.sessionId}] Task complete`);
return {
content: [
{
type: 'text',
text: 'Long task completed successfully!'
}
]
};
}
);
return server;
};
// Set up Express app
const app = createMcpExpressApp();
app.use(cors());
// Create event store for resumability
const eventStore = new InMemoryEventStore();
// Track transports by session ID for session reuse
const transports = new Map<string, NodeStreamableHTTPServerTransport>();
// Handle all MCP requests
app.all('/mcp', async (req: Request, res: Response) => {
const sessionId = req.headers['mcp-session-id'] as string | undefined;
// Reuse existing transport or create new one
let transport = sessionId ? transports.get(sessionId) : undefined;
if (!transport) {
transport = new NodeStreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
eventStore,
retryInterval: 2000, // Default retry interval for priming events
onsessioninitialized: id => {
console.log(`[${id}] Session initialized`);
transports.set(id, transport!);
}
});
// Connect a fresh MCP server to the transport
const server = getServer();
await server.connect(transport);
}
await transport.handleRequest(req, res, req.body);
});
// Start the server
const PORT = 3001;
app.listen(PORT, () => {
console.log(`SSE Polling Example Server running on http://localhost:${PORT}/mcp`);
console.log('');
console.log('This server demonstrates SEP-1699 SSE polling:');
console.log('- retryInterval: 2000ms (client waits 2s before reconnecting)');
console.log('- eventStore: InMemoryEventStore (events are persisted for replay)');
console.log('');
console.log('Try calling the "long-task" tool to see server-initiated disconnect in action.');
});