diff --git a/.github/workflows/version-check.yml b/.github/workflows/version-check.yml deleted file mode 100644 index b327649fc5..0000000000 --- a/.github/workflows/version-check.yml +++ /dev/null @@ -1,30 +0,0 @@ -name: Version Consistency Check - -on: - push: - branches: - - main - pull_request: - release: - types: [published] - -jobs: - github: - name: Check GitHub server version consistency - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - - name: Check version consistency - run: | - PACKAGE_VERSION=$(node -p "require('./src/github/package.json').version") - TS_VERSION=$(grep -o '".*"' ./src/github/common/version.ts | tr -d '"') - - if [ "$PACKAGE_VERSION" != "$TS_VERSION" ]; then - echo "::error::Version mismatch detected!" - echo "::error::package.json version: $PACKAGE_VERSION" - echo "::error::version.ts version: $TS_VERSION" - exit 1 - else - echo "✅ Versions match: $PACKAGE_VERSION" - fi diff --git a/src/everything/sse.ts b/src/everything/sse.ts index 01794cddd2..a657af75be 100644 --- a/src/everything/sse.ts +++ b/src/everything/sse.ts @@ -6,36 +6,48 @@ console.error('Starting SSE server...'); const app = express(); -const { server, cleanup } = createServer(); - -let transport: SSEServerTransport; +const transports: Map = new Map(); app.get("/sse", async (req, res) => { - console.error("Received connection"); - transport = new SSEServerTransport("/message", res); - await server.connect(transport); - - server.onclose = async () => { - await cleanup(); - await server.close(); - }; + let transport: SSEServerTransport; + const { server, cleanup } = createServer(); + + if (req?.query?.sessionId) { + const sessionId = (req?.query?.sessionId as string); + transport = transports.get(sessionId) as SSEServerTransport; + console.error("Client Reconnecting? This shouldn't happen; when client has a sessionId, GET /sse should not be called again.", transport.sessionId); + } else { + // Create and store transport for new session + transport = new SSEServerTransport("/message", res); + transports.set(transport.sessionId, transport); + + // Connect server to transport + await server.connect(transport); + console.error("Client Connected: ", transport.sessionId); + + // Handle close of connection + server.onclose = async () => { + console.error("Client Disconnected: ", transport.sessionId); + transports.delete(transport.sessionId); + await cleanup(); + }; + + } }); app.post("/message", async (req, res) => { - console.error("Received message"); - - await transport.handlePostMessage(req, res); -}); - -process.on("SIGINT", async () => { - await cleanup(); - await server.close(); - process.exit(0); + const sessionId = (req?.query?.sessionId as string); + const transport = transports.get(sessionId); + if (transport) { + console.error("Client Message from", sessionId); + await transport.handlePostMessage(req, res); + } else { + console.error(`No transport found for sessionId ${sessionId}`) + } }); const PORT = process.env.PORT || 3001; app.listen(PORT, () => { console.error(`Server is running on port ${PORT}`); }); - diff --git a/src/everything/streamableHttp.ts b/src/everything/streamableHttp.ts index 6f07c4810b..f748fd2ade 100644 --- a/src/everything/streamableHttp.ts +++ b/src/everything/streamableHttp.ts @@ -8,9 +8,7 @@ console.error('Starting Streamable HTTP server...'); const app = express(); -const { server, cleanup } = createServer(); - -const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {}; +const transports: Map = new Map(); app.post('/mcp', async (req: Request, res: Response) => { console.error('Received MCP POST request'); @@ -19,10 +17,13 @@ app.post('/mcp', async (req: Request, res: Response) => { const sessionId = req.headers['mcp-session-id'] as string | undefined; let transport: StreamableHTTPServerTransport; - if (sessionId && transports[sessionId]) { + if (sessionId && transports.has(sessionId)) { // Reuse existing transport - transport = transports[sessionId]; + transport = transports.get(sessionId)!; } else if (!sessionId) { + + const { server, cleanup } = createServer(); + // New initialization request const eventStore = new InMemoryEventStore(); transport = new StreamableHTTPServerTransport({ @@ -32,16 +33,18 @@ app.post('/mcp', async (req: Request, res: Response) => { // Store the transport by session ID when session is initialized // This avoids race conditions where requests might come in before the session is stored console.error(`Session initialized with ID: ${sessionId}`); - transports[sessionId] = transport; + transports.set(sessionId, transport); } }); + // Set up onclose handler to clean up transport when closed - transport.onclose = () => { + server.onclose = async () => { const sid = transport.sessionId; - if (sid && transports[sid]) { + if (sid && transports.has(sid)) { console.error(`Transport closed for session ${sid}, removing from transports map`); - delete transports[sid]; + transports.delete(sid); + await cleanup(); } }; @@ -87,7 +90,7 @@ app.post('/mcp', async (req: Request, res: Response) => { app.get('/mcp', async (req: Request, res: Response) => { console.error('Received MCP GET request'); const sessionId = req.headers['mcp-session-id'] as string | undefined; - if (!sessionId || !transports[sessionId]) { + if (!sessionId || !transports.has(sessionId)) { res.status(400).json({ jsonrpc: '2.0', error: { @@ -107,14 +110,14 @@ app.get('/mcp', async (req: Request, res: Response) => { console.error(`Establishing new SSE stream for session ${sessionId}`); } - const transport = transports[sessionId]; - await transport.handleRequest(req, res); + const transport = transports.get(sessionId); + await transport!.handleRequest(req, res); }); // Handle DELETE requests for session termination (according to MCP spec) app.delete('/mcp', async (req: Request, res: Response) => { const sessionId = req.headers['mcp-session-id'] as string | undefined; - if (!sessionId || !transports[sessionId]) { + if (!sessionId || !transports.has(sessionId)) { res.status(400).json({ jsonrpc: '2.0', error: { @@ -129,8 +132,8 @@ app.delete('/mcp', async (req: Request, res: Response) => { console.error(`Received session termination request for session ${sessionId}`); try { - const transport = transports[sessionId]; - await transport.handleRequest(req, res); + const transport = transports.get(sessionId); + await transport!.handleRequest(req, res); } catch (error) { console.error('Error handling session termination:', error); if (!res.headersSent) { @@ -161,14 +164,13 @@ process.on('SIGINT', async () => { for (const sessionId in transports) { try { console.error(`Closing transport for session ${sessionId}`); - await transports[sessionId].close(); - delete transports[sessionId]; + await transports.get(sessionId)!.close(); + transports.delete(sessionId); } catch (error) { console.error(`Error closing transport for session ${sessionId}:`, error); } } - await cleanup(); - await server.close(); + console.error('Server shutdown complete'); process.exit(0); });