diff --git a/package-lock.json b/package-lock.json index 205ad02..260f021 100644 --- a/package-lock.json +++ b/package-lock.json @@ -17,6 +17,7 @@ "@fastify/compress": "^8.3.1", "@fastify/cookie": "^11.0.2", "@fastify/static": "^8.0.0", + "@fastify/websocket": "^11.2.0", "@xterm/addon-fit": "^0.11.0", "@xterm/addon-unicode11": "^0.9.0", "@xterm/addon-webgl": "^0.19.0", @@ -45,6 +46,7 @@ "@types/react": "^19.2.14", "@types/uuid": "^10.0.0", "@types/web-push": "^3.6.4", + "@types/ws": "^8.18.1", "@vitest/coverage-v8": "^4.0.18", "agent-browser": "^0.6.0", "esbuild": "^0.27.3", @@ -945,6 +947,53 @@ "glob": "^11.0.0" } }, + "node_modules/@fastify/websocket": { + "version": "11.2.0", + "resolved": "https://registry.npmjs.org/@fastify/websocket/-/websocket-11.2.0.tgz", + "integrity": "sha512-3HrDPbAG1CzUCqnslgJxppvzaAZffieOVbLp1DAy1huCSynUWPifSvfdEDUR8HlJLp3sp1A36uOM2tJogADS8w==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/fastify" + }, + { + "type": "opencollective", + "url": "https://opencollective.com/fastify" + } + ], + "license": "MIT", + "dependencies": { + "duplexify": "^4.1.3", + "fastify-plugin": "^5.0.0", + "ws": "^8.16.0" + } + }, + "node_modules/@fastify/websocket/node_modules/duplexify": { + "version": "4.1.3", + "resolved": "https://registry.npmjs.org/duplexify/-/duplexify-4.1.3.tgz", + "integrity": "sha512-M3BmBhwJRZsSx38lZyhE53Csddgzl5R7xGJNk7CVddZD6CcmwMCH8J+7AprIrQKH7TonKxaCjcv27Qmf+sQ+oA==", + "license": "MIT", + "dependencies": { + "end-of-stream": "^1.4.1", + "inherits": "^2.0.3", + "readable-stream": "^3.1.1", + "stream-shift": "^1.0.2" + } + }, + "node_modules/@fastify/websocket/node_modules/readable-stream": { + "version": "3.6.2", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-3.6.2.tgz", + "integrity": "sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==", + "license": "MIT", + "dependencies": { + "inherits": "^2.0.3", + "string_decoder": "^1.1.1", + "util-deprecate": "^1.0.1" + }, + "engines": { + "node": ">= 6" + } + }, "node_modules/@humanfs/core": { "version": "0.19.1", "dev": true, @@ -2118,6 +2167,16 @@ "@types/node": "*" } }, + "node_modules/@types/ws": { + "version": "8.18.1", + "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.18.1.tgz", + "integrity": "sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/yauzl": { "version": "2.10.3", "dev": true, @@ -8508,7 +8567,6 @@ }, "node_modules/ws": { "version": "8.19.0", - "dev": true, "license": "MIT", "engines": { "node": ">=10.0.0" diff --git a/package.json b/package.json index 48bc09b..4df62a4 100644 --- a/package.json +++ b/package.json @@ -51,6 +51,7 @@ "@fastify/compress": "^8.3.1", "@fastify/cookie": "^11.0.2", "@fastify/static": "^8.0.0", + "@fastify/websocket": "^11.2.0", "@xterm/addon-fit": "^0.11.0", "@xterm/addon-unicode11": "^0.9.0", "@xterm/addon-webgl": "^0.19.0", @@ -76,6 +77,7 @@ "@types/react": "^19.2.14", "@types/uuid": "^10.0.0", "@types/web-push": "^3.6.4", + "@types/ws": "^8.18.1", "@vitest/coverage-v8": "^4.0.18", "agent-browser": "^0.6.0", "esbuild": "^0.27.3", diff --git a/src/web/public/app.js b/src/web/public/app.js index 8479d36..ab908cf 100644 --- a/src/web/public/app.js +++ b/src/web/public/app.js @@ -172,9 +172,9 @@ const _SSE_HANDLER_MAP = [ [SSE_EVENTS.SESSION_CREATED, '_onSessionCreated'], [SSE_EVENTS.SESSION_UPDATED, '_onSessionUpdated'], [SSE_EVENTS.SESSION_DELETED, '_onSessionDeleted'], - [SSE_EVENTS.SESSION_TERMINAL, '_onSessionTerminal'], - [SSE_EVENTS.SESSION_NEEDS_REFRESH, '_onSessionNeedsRefresh'], - [SSE_EVENTS.SESSION_CLEAR_TERMINAL, '_onSessionClearTerminal'], + [SSE_EVENTS.SESSION_TERMINAL, '_onSSETerminal'], + [SSE_EVENTS.SESSION_NEEDS_REFRESH, '_onSSENeedsRefresh'], + [SSE_EVENTS.SESSION_CLEAR_TERMINAL, '_onSSEClearTerminal'], [SSE_EVENTS.SESSION_COMPLETION, '_onSessionCompletion'], [SSE_EVENTS.SESSION_ERROR, '_onSessionError'], [SSE_EVENTS.SESSION_EXIT, '_onSessionExit'], @@ -361,6 +361,11 @@ class CodemanApp { // Tracks pending hook events that need resolution (permission_prompt, elicitation_dialog, idle_prompt) this.pendingHooks = new Map(); + // WebSocket terminal I/O (low-latency bypass of HTTP POST + SSE) + this._ws = null; // WebSocket instance for active session + this._wsSessionId = null; // Session ID the WS is connected to + this._wsReady = false; // True when WS is open and ready for I/O + // Terminal write batching with DEC 2026 sync support this.pendingWrites = []; this.writeFrameScheduled = false; @@ -1864,6 +1869,7 @@ class CodemanApp { } _onSessionDeleted(data) { + if (this._wsSessionId === data.id) this._disconnectWs(); this._cleanupSessionData(data.id); if (this.activeSessionId === data.id) { this.activeSessionId = null; @@ -1878,6 +1884,21 @@ class CodemanApp { if (this.sessions.size === 0) this.stopSystemStatsPolling(); } + // SSE wrappers — skip terminal events when WebSocket is delivering for this session. + // WS handler calls the underlying _onSession* methods directly. + _onSSETerminal(data) { + if (this._wsReady && this._wsSessionId === data.id) return; + this._onSessionTerminal(data); + } + _onSSENeedsRefresh(data) { + if (this._wsReady && this._wsSessionId === data?.id) return; + this._onSessionNeedsRefresh(data); + } + _onSSEClearTerminal(data) { + if (this._wsReady && this._wsSessionId === data?.id) return; + this._onSessionClearTerminal(data); + } + _onSessionTerminal(data) { if (data.id === this.activeSessionId) { if (data.data.length > 32768) _crashDiag.log(`TERMINAL: ${(data.data.length/1024).toFixed(0)}KB`); @@ -1982,6 +2003,7 @@ class CodemanApp { } _onSessionExit(data) { + if (this._wsSessionId === data.id) this._disconnectWs(); const session = this.sessions.get(data.id); if (session) { session.status = 'stopped'; @@ -2835,6 +2857,72 @@ class CodemanApp { } } + // ═══════════════════════════════════════════════════════════════ + // WebSocket Terminal I/O + // ═══════════════════════════════════════════════════════════════ + + /** + * Open a WebSocket for terminal I/O on the given session. + * Replaces HTTP POST input and SSE terminal output with a single + * bidirectional connection. Falls back to SSE+POST if WS fails. + */ + _connectWs(sessionId) { + this._disconnectWs(); + + const proto = location.protocol === 'https:' ? 'wss:' : 'ws:'; + const url = `${proto}//${location.host}/ws/sessions/${sessionId}/terminal`; + const ws = new WebSocket(url); + this._ws = ws; + this._wsSessionId = sessionId; + + ws.onopen = () => { + // Only mark ready if this is still the intended session + if (this._ws === ws) { + this._wsReady = true; + } + }; + + ws.onmessage = (event) => { + if (this._ws !== ws) return; + try { + const msg = JSON.parse(event.data); + if (msg.t === 'o') { + // Terminal output — route through the same batching pipeline as SSE + this._onSessionTerminal({ id: sessionId, data: msg.d }); + } else if (msg.t === 'c') { + this._onSessionClearTerminal({ id: sessionId }); + } else if (msg.t === 'r') { + this._onSessionNeedsRefresh({ id: sessionId }); + } + } catch { + // Ignore malformed messages + } + }; + + ws.onclose = () => { + if (this._ws === ws) { + this._ws = null; + this._wsSessionId = null; + this._wsReady = false; + } + }; + + ws.onerror = () => { + // onclose will fire after onerror — cleanup happens there + }; + } + + /** Close the active WebSocket connection (if any). */ + _disconnectWs() { + if (this._ws) { + this._ws.onclose = null; // Prevent re-entrant cleanup + this._ws.close(); + this._ws = null; + this._wsSessionId = null; + this._wsReady = false; + } + } + /** * Send input to server without blocking the keystroke flush cycle. * Uses a sequential promise chain to preserve character ordering @@ -2847,11 +2935,19 @@ class CodemanApp { return; } - // Chain on dispatch only — wait for the previous request to be sent before - // dispatching the next one (preserves keystroke ordering), but don't wait - // for the server's response. The server handles writeViaMux as - // fire-and-forget anyway, so the HTTP response carries no useful data - // beyond success/failure for retry purposes. + // Fast path: WebSocket — fire-and-forget, inherently ordered (single TCP stream). + if (this._wsReady && this._wsSessionId === sessionId) { + try { + this._ws.send(JSON.stringify({ t: 'i', d: input })); + this.clearPendingHooks(sessionId); + return; + } catch { + // WS send failed — fall through to HTTP POST + } + } + + // Slow path: HTTP POST — chain on dispatch only, don't wait for response. + // The server handles writeViaMux as fire-and-forget anyway. this._inputSendChain = this._inputSendChain.then(() => { const fetchPromise = fetch(`/api/sessions/${sessionId}/input`, { method: 'POST', @@ -3632,6 +3728,9 @@ class CodemanApp { if (selectGen !== this._selectGeneration) return; // newer tab switch won + // Close WebSocket for previous session (new one opens after buffer load) + this._disconnectWs(); + // Clean up flicker filter state when switching sessions if (this.flickerFilterTimeout) { clearTimeout(this.flickerFilterTimeout); @@ -3932,6 +4031,9 @@ class CodemanApp { } }); + // Open WebSocket for low-latency terminal I/O (after buffer load completes) + this._connectWs(sessionId); + _crashDiag.log('FOCUS'); this.terminal.focus(); this.terminal.scrollToBottom(); @@ -5254,6 +5356,15 @@ class CodemanApp { async sendResize(sessionId) { const dims = this.getTerminalDimensions(); if (!dims) return; + // Fast path: WebSocket resize + if (this._wsReady && this._wsSessionId === sessionId) { + try { + this._ws.send(JSON.stringify({ t: 'z', c: dims.cols, r: dims.rows })); + return; + } catch { + // Fall through to HTTP POST + } + } await fetch(`/api/sessions/${sessionId}/resize`, { method: 'POST', headers: { 'Content-Type': 'application/json' }, diff --git a/src/web/routes/index.ts b/src/web/routes/index.ts index 26248e2..791fde2 100644 --- a/src/web/routes/index.ts +++ b/src/web/routes/index.ts @@ -14,3 +14,4 @@ export { registerSessionRoutes } from './session-routes.js'; export { registerRespawnRoutes } from './respawn-routes.js'; export { registerRalphRoutes } from './ralph-routes.js'; export { registerPlanRoutes } from './plan-routes.js'; +export { registerWsRoutes } from './ws-routes.js'; diff --git a/src/web/routes/ws-routes.ts b/src/web/routes/ws-routes.ts new file mode 100644 index 0000000..eca9778 --- /dev/null +++ b/src/web/routes/ws-routes.ts @@ -0,0 +1,136 @@ +/** + * @fileoverview WebSocket terminal I/O route. + * + * Provides a low-latency bidirectional channel for terminal input/output, + * bypassing the HTTP POST + SSE path that adds per-request middleware overhead. + * Auth is checked once on the WebSocket upgrade handshake (cookies are included + * automatically by the browser). After upgrade, the connection is raw — no + * per-message middleware processing. + * + * Additive: the existing HTTP POST /api/sessions/:id/input and SSE session:terminal + * paths remain fully functional. The frontend opts into WS when available and + * falls back transparently. + * + * Terminal output is micro-batched at 8ms to group Ink's rapid cursor-up redraws + * into single frames, preventing flicker from split ANSI sequences. This matches + * the SSE path's server-side batching (16-50ms) but at a shorter interval since + * WS has no Traefik buffering overhead. + * + * Protocol (all JSON text frames): + * Server -> Client: + * {"t":"o","d":"..."} — terminal output + * {"t":"c"} — clear terminal + * {"t":"r"} — needs refresh (reload buffer) + * Client -> Server: + * {"t":"i","d":"..."} — input (keystroke or paste) + * {"t":"z","c":N,"r":N} — resize terminal + */ + +import { FastifyInstance } from 'fastify'; +import type { WebSocket } from 'ws'; +import type { SessionPort } from '../ports/session-port.js'; +import { MAX_INPUT_LENGTH } from '../../config/terminal-limits.js'; + +/** Micro-batch interval for terminal output (ms). Short enough for low latency, + * long enough to group Ink's rapid cursor-up redraw sequences into single frames. */ +const WS_BATCH_INTERVAL_MS = 8; + +/** Flush immediately when batch exceeds this size (bytes) for responsiveness. */ +const WS_BATCH_FLUSH_THRESHOLD = 16384; + +/** DEC 2026 synchronized update markers. Wrapping output in these tells xterm.js + * to buffer all content and render atomically in a single frame — eliminates + * flicker from cursor-up redraws that Ink sends without its own sync markers + * (DA capability negotiation fails through the PTY→server→WS proxy chain). */ +const DEC_2026_START = '\x1b[?2026h'; +const DEC_2026_END = '\x1b[?2026l'; + +export function registerWsRoutes(app: FastifyInstance, ctx: SessionPort): void { + app.get<{ Params: { id: string } }>('/ws/sessions/:id/terminal', { websocket: true }, (socket: WebSocket, req) => { + const { id } = req.params; + const session = ctx.sessions.get(id); + + if (!session) { + socket.close(4004, 'Session not found'); + return; + } + + // Per-connection micro-batch state + let batchChunks: string[] = []; + let batchSize = 0; + let batchTimer: ReturnType | null = null; + + const flushBatch = () => { + batchTimer = null; + if (batchChunks.length === 0 || socket.readyState !== 1) { + batchChunks = []; + batchSize = 0; + return; + } + const data = batchChunks.join(''); + batchChunks = []; + batchSize = 0; + socket.send(`{"t":"o","d":${JSON.stringify(DEC_2026_START + data + DEC_2026_END)}}`); + }; + + // Attach message handler synchronously BEFORE any async work + // (@fastify/websocket requirement to avoid dropped messages). + socket.on('message', (raw) => { + try { + const msg = JSON.parse(String(raw)); + if (msg.t === 'i' && typeof msg.d === 'string') { + if (msg.d.length > MAX_INPUT_LENGTH) return; + session.write(msg.d); + } else if (msg.t === 'z' && typeof msg.c === 'number' && typeof msg.r === 'number') { + session.resize(msg.c, msg.r); + } + } catch { + // Ignore malformed messages + } + }); + + // Terminal output -> micro-batched WS send + const onTerminal = (data: string) => { + batchChunks.push(data); + batchSize += data.length; + + // Flush immediately for large batches (responsiveness during bulk output) + if (batchSize > WS_BATCH_FLUSH_THRESHOLD) { + if (batchTimer) { + clearTimeout(batchTimer); + } + flushBatch(); + return; + } + + // Start timer if not already running + if (!batchTimer) { + batchTimer = setTimeout(flushBatch, WS_BATCH_INTERVAL_MS); + } + }; + + const onClearTerminal = () => { + if (socket.readyState === 1) { + socket.send('{"t":"c"}'); + } + }; + + const onNeedsRefresh = () => { + if (socket.readyState === 1) { + socket.send('{"t":"r"}'); + } + }; + + session.on('terminal', onTerminal); + session.on('clearTerminal', onClearTerminal); + session.on('needsRefresh', onNeedsRefresh); + + socket.on('close', () => { + if (batchTimer) clearTimeout(batchTimer); + batchChunks = []; + session.off('terminal', onTerminal); + session.off('clearTerminal', onClearTerminal); + session.off('needsRefresh', onNeedsRefresh); + }); + }); +} diff --git a/src/web/server.ts b/src/web/server.ts index ccbd857..e65d80a 100644 --- a/src/web/server.ts +++ b/src/web/server.ts @@ -31,6 +31,7 @@ import Fastify, { FastifyInstance, FastifyReply } from 'fastify'; import fastifyCompress from '@fastify/compress'; import fastifyCookie from '@fastify/cookie'; import fastifyStatic from '@fastify/static'; +import fastifyWebsocket from '@fastify/websocket'; import { join, dirname } from 'node:path'; import { fileURLToPath } from 'node:url'; import { existsSync, mkdirSync, readFileSync, chmodSync } from 'node:fs'; @@ -103,6 +104,7 @@ import { registerRespawnRoutes, registerRalphRoutes, registerPlanRoutes, + registerWsRoutes, } from './routes/index.js'; const __dirname = dirname(fileURLToPath(import.meta.url)); @@ -563,6 +565,9 @@ export class WebServer extends EventEmitter { this.qrAuthFailures = authState.qrAuthFailures; } + // WebSocket support (terminal I/O — low-latency bidirectional channel) + await this.app.register(fastifyWebsocket); + // Security headers + CORS registerSecurityHeaders(this.app, this.https); // Service worker must never be cached — browsers check for SW updates on navigation @@ -700,6 +705,7 @@ export class WebServer extends EventEmitter { registerRespawnRoutes(this.app, ctx); registerRalphRoutes(this.app, ctx); registerPlanRoutes(this.app, ctx); + registerWsRoutes(this.app, ctx); } /**