Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 59 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"@fastify/compress": "^8.3.1",
"@fastify/cookie": "^11.0.2",
"@fastify/static": "^8.0.0",
"@fastify/websocket": "^11.2.0",
"@xterm/addon-fit": "^0.11.0",
"@xterm/addon-unicode11": "^0.9.0",
"@xterm/addon-webgl": "^0.19.0",
Expand All @@ -76,6 +77,7 @@
"@types/react": "^19.2.14",
"@types/uuid": "^10.0.0",
"@types/web-push": "^3.6.4",
"@types/ws": "^8.18.1",
"@vitest/coverage-v8": "^4.0.18",
"agent-browser": "^0.6.0",
"esbuild": "^0.27.3",
Expand Down
127 changes: 119 additions & 8 deletions src/web/public/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,9 @@ const _SSE_HANDLER_MAP = [
[SSE_EVENTS.SESSION_CREATED, '_onSessionCreated'],
[SSE_EVENTS.SESSION_UPDATED, '_onSessionUpdated'],
[SSE_EVENTS.SESSION_DELETED, '_onSessionDeleted'],
[SSE_EVENTS.SESSION_TERMINAL, '_onSessionTerminal'],
[SSE_EVENTS.SESSION_NEEDS_REFRESH, '_onSessionNeedsRefresh'],
[SSE_EVENTS.SESSION_CLEAR_TERMINAL, '_onSessionClearTerminal'],
[SSE_EVENTS.SESSION_TERMINAL, '_onSSETerminal'],
[SSE_EVENTS.SESSION_NEEDS_REFRESH, '_onSSENeedsRefresh'],
[SSE_EVENTS.SESSION_CLEAR_TERMINAL, '_onSSEClearTerminal'],
[SSE_EVENTS.SESSION_COMPLETION, '_onSessionCompletion'],
[SSE_EVENTS.SESSION_ERROR, '_onSessionError'],
[SSE_EVENTS.SESSION_EXIT, '_onSessionExit'],
Expand Down Expand Up @@ -361,6 +361,11 @@ class CodemanApp {
// Tracks pending hook events that need resolution (permission_prompt, elicitation_dialog, idle_prompt)
this.pendingHooks = new Map();

// WebSocket terminal I/O (low-latency bypass of HTTP POST + SSE)
this._ws = null; // WebSocket instance for active session
this._wsSessionId = null; // Session ID the WS is connected to
this._wsReady = false; // True when WS is open and ready for I/O

// Terminal write batching with DEC 2026 sync support
this.pendingWrites = [];
this.writeFrameScheduled = false;
Expand Down Expand Up @@ -1864,6 +1869,7 @@ class CodemanApp {
}

_onSessionDeleted(data) {
if (this._wsSessionId === data.id) this._disconnectWs();
this._cleanupSessionData(data.id);
if (this.activeSessionId === data.id) {
this.activeSessionId = null;
Expand All @@ -1878,6 +1884,21 @@ class CodemanApp {
if (this.sessions.size === 0) this.stopSystemStatsPolling();
}

// SSE wrappers — skip terminal events when WebSocket is delivering for this session.
// WS handler calls the underlying _onSession* methods directly.
_onSSETerminal(data) {
if (this._wsReady && this._wsSessionId === data.id) return;
this._onSessionTerminal(data);
}
_onSSENeedsRefresh(data) {
if (this._wsReady && this._wsSessionId === data?.id) return;
this._onSessionNeedsRefresh(data);
}
_onSSEClearTerminal(data) {
if (this._wsReady && this._wsSessionId === data?.id) return;
this._onSessionClearTerminal(data);
}

_onSessionTerminal(data) {
if (data.id === this.activeSessionId) {
if (data.data.length > 32768) _crashDiag.log(`TERMINAL: ${(data.data.length/1024).toFixed(0)}KB`);
Expand Down Expand Up @@ -1982,6 +2003,7 @@ class CodemanApp {
}

_onSessionExit(data) {
if (this._wsSessionId === data.id) this._disconnectWs();
const session = this.sessions.get(data.id);
if (session) {
session.status = 'stopped';
Expand Down Expand Up @@ -2835,6 +2857,72 @@ class CodemanApp {
}
}

// ═══════════════════════════════════════════════════════════════
// WebSocket Terminal I/O
// ═══════════════════════════════════════════════════════════════

/**
* Open a WebSocket for terminal I/O on the given session.
* Replaces HTTP POST input and SSE terminal output with a single
* bidirectional connection. Falls back to SSE+POST if WS fails.
*/
_connectWs(sessionId) {
this._disconnectWs();

const proto = location.protocol === 'https:' ? 'wss:' : 'ws:';
const url = `${proto}//${location.host}/ws/sessions/${sessionId}/terminal`;
const ws = new WebSocket(url);
this._ws = ws;
this._wsSessionId = sessionId;

ws.onopen = () => {
// Only mark ready if this is still the intended session
if (this._ws === ws) {
this._wsReady = true;
}
};

ws.onmessage = (event) => {
if (this._ws !== ws) return;
try {
const msg = JSON.parse(event.data);
if (msg.t === 'o') {
// Terminal output — route through the same batching pipeline as SSE
this._onSessionTerminal({ id: sessionId, data: msg.d });
} else if (msg.t === 'c') {
this._onSessionClearTerminal({ id: sessionId });
} else if (msg.t === 'r') {
this._onSessionNeedsRefresh({ id: sessionId });
}
} catch {
// Ignore malformed messages
}
};

ws.onclose = () => {
if (this._ws === ws) {
this._ws = null;
this._wsSessionId = null;
this._wsReady = false;
}
};

ws.onerror = () => {
// onclose will fire after onerror — cleanup happens there
};
}

/** Close the active WebSocket connection (if any). */
_disconnectWs() {
if (this._ws) {
this._ws.onclose = null; // Prevent re-entrant cleanup
this._ws.close();
this._ws = null;
this._wsSessionId = null;
this._wsReady = false;
}
}

/**
* Send input to server without blocking the keystroke flush cycle.
* Uses a sequential promise chain to preserve character ordering
Expand All @@ -2847,11 +2935,19 @@ class CodemanApp {
return;
}

// Chain on dispatch only — wait for the previous request to be sent before
// dispatching the next one (preserves keystroke ordering), but don't wait
// for the server's response. The server handles writeViaMux as
// fire-and-forget anyway, so the HTTP response carries no useful data
// beyond success/failure for retry purposes.
// Fast path: WebSocket — fire-and-forget, inherently ordered (single TCP stream).
if (this._wsReady && this._wsSessionId === sessionId) {
try {
this._ws.send(JSON.stringify({ t: 'i', d: input }));
this.clearPendingHooks(sessionId);
return;
} catch {
// WS send failed — fall through to HTTP POST
}
}

// Slow path: HTTP POST — chain on dispatch only, don't wait for response.
// The server handles writeViaMux as fire-and-forget anyway.
this._inputSendChain = this._inputSendChain.then(() => {
const fetchPromise = fetch(`/api/sessions/${sessionId}/input`, {
method: 'POST',
Expand Down Expand Up @@ -3632,6 +3728,9 @@ class CodemanApp {

if (selectGen !== this._selectGeneration) return; // newer tab switch won

// Close WebSocket for previous session (new one opens after buffer load)
this._disconnectWs();

// Clean up flicker filter state when switching sessions
if (this.flickerFilterTimeout) {
clearTimeout(this.flickerFilterTimeout);
Expand Down Expand Up @@ -3932,6 +4031,9 @@ class CodemanApp {
}
});

// Open WebSocket for low-latency terminal I/O (after buffer load completes)
this._connectWs(sessionId);

_crashDiag.log('FOCUS');
this.terminal.focus();
this.terminal.scrollToBottom();
Expand Down Expand Up @@ -5254,6 +5356,15 @@ class CodemanApp {
async sendResize(sessionId) {
const dims = this.getTerminalDimensions();
if (!dims) return;
// Fast path: WebSocket resize
if (this._wsReady && this._wsSessionId === sessionId) {
try {
this._ws.send(JSON.stringify({ t: 'z', c: dims.cols, r: dims.rows }));
return;
} catch {
// Fall through to HTTP POST
}
}
await fetch(`/api/sessions/${sessionId}/resize`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
Expand Down
1 change: 1 addition & 0 deletions src/web/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ export { registerSessionRoutes } from './session-routes.js';
export { registerRespawnRoutes } from './respawn-routes.js';
export { registerRalphRoutes } from './ralph-routes.js';
export { registerPlanRoutes } from './plan-routes.js';
export { registerWsRoutes } from './ws-routes.js';
Loading