Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion src/web/public/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ class CodemanApp {
this.totalTokens = 0;
this.globalStats = null; // Global token/cost stats across all sessions
this.eventSource = null;
// Stable per-page client ID — lets the server target this connection
// for live filter updates (POST /api/events/subscribe) without forcing
// an SSE reconnect on session switches.
this._clientId = (typeof crypto !== 'undefined' && crypto.randomUUID)
? crypto.randomUUID()
: 'c-' + Math.random().toString(36).slice(2) + Date.now().toString(36);
this.terminal = null;
this.fitAddon = null;
this.activeSessionId = null;
Expand Down Expand Up @@ -696,6 +702,28 @@ class CodemanApp {
// SSE Connection
// ═══════════════════════════════════════════════════════════════

/**
* POST a live subscription update so the server filters terminal events
* to the given session(s) for this client. Fire-and-forget — failures
* are non-fatal because we'll still get every event we don't want
* (just at higher cost), and the next reconnect carries the filter via
* the SSE query string.
*/
_updateSseSubscription(sessionId) {
try {
const body = JSON.stringify({
clientId: this._clientId,
sessions: sessionId ? [sessionId] : null,
});
fetch('/api/events/subscribe', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body,
keepalive: true,
}).catch(() => { /* non-fatal */ });
} catch { /* non-fatal */ }
}

connectSSE() {
// Check if browser is offline
if (!navigator.onLine) {
Expand Down Expand Up @@ -725,7 +753,13 @@ class CodemanApp {
this.setConnectionStatus('reconnecting');
}

this.eventSource = new EventSource('/api/events');
// Build URL with stable client ID and (if known) the active-session
// filter so the server only streams session:terminal events for the
// session we're rendering. Lifecycle/metadata events are sent globally
// regardless of filter (server side).
const _sseParams = new URLSearchParams({ clientId: this._clientId });
if (this.activeSessionId) _sseParams.set('sessions', this.activeSessionId);
this.eventSource = new EventSource(`/api/events?${_sseParams.toString()}`);

// Store all event listeners for cleanup on reconnect
const listeners = [];
Expand Down Expand Up @@ -2424,6 +2458,12 @@ class CodemanApp {
this._cleanupPreviousSession(sessionId);
this.activeSessionId = sessionId;
try { localStorage.setItem('codeman-active-session', sessionId); } catch {}
// Narrow SSE filter to the active session — server stops streaming
// session:terminal events for other sessions to this client. Cuts
// SSE traffic ~Nx for N concurrent sessions. Fire-and-forget; on the
// rare race where server doesn't know our clientId yet, the next
// selectSession or reconnect catches up.
this._updateSseSubscription(sessionId);
this.hideWelcome();
// Clear idle hooks on view, but keep action hooks until user interacts
this.clearPendingHooks(sessionId, 'idle_prompt');
Expand Down
25 changes: 21 additions & 4 deletions src/web/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -579,9 +579,11 @@ export class WebServer extends EventEmitter {
}

// Parse optional session subscription filter from query parameter.
// /api/events?sessions=id1,id2 — client only receives events for those sessions.
// /api/events (no param) — client receives all events (backwards-compatible).
const query = req.query as { sessions?: string };
// /api/events?sessions=id1,id2 — client only receives session:terminal
// events for those sessions (other events broadcast to all clients).
// /api/events?clientId=<uuid> — enables live filter updates via
// POST /api/events/subscribe without reconnecting.
const query = req.query as { sessions?: string; clientId?: string };
let sessionFilter: Set<string> | null = null;
if (query.sessions) {
const ids = query.sessions
Expand All @@ -592,6 +594,7 @@ export class WebServer extends EventEmitter {
sessionFilter = new Set(ids);
}
}
const clientId = typeof query.clientId === 'string' && query.clientId ? query.clientId : undefined;

reply.raw.writeHead(200, {
'Content-Type': 'text/event-stream',
Expand All @@ -603,7 +606,7 @@ export class WebServer extends EventEmitter {
// Track tunnel clients — cloudflared proxies locally so req.ip is always
// 127.0.0.1; detect tunnel traffic via Cf-Connecting-Ip header instead.
const isRemote = !!req.headers['cf-connecting-ip'];
this.sse.addClient(reply, sessionFilter, isRemote);
this.sse.addClient(reply, sessionFilter, isRemote, clientId);

// Send initial state
// Use light state for SSE init to avoid sending 2MB+ terminal buffers
Expand All @@ -618,6 +621,20 @@ export class WebServer extends EventEmitter {
});
});

// Live subscription update — change a connected client's session filter
// without forcing an SSE reconnect. Body: { clientId, sessions: string[] | null }
// Empty/null sessions array = remove filter (receive all session:terminal events).
this.app.post('/api/events/subscribe', (req, reply) => {
const body = (req.body || {}) as { clientId?: string; sessions?: string[] | null };
if (!body.clientId || typeof body.clientId !== 'string') {
reply.code(400).send({ error: 'clientId required' });
return;
}
const sessions = Array.isArray(body.sessions) ? body.sessions.filter((s) => typeof s === 'string') : null;
const updated = this.sse.updateClientFilter(body.clientId, sessions);
reply.code(updated ? 204 : 404).send();
});

// Global error handler for structured errors thrown by findSessionOrFail
this.app.setErrorHandler((error, _req, reply) => {
const statusCode = (error as { statusCode?: number }).statusCode ?? 500;
Expand Down
63 changes: 37 additions & 26 deletions src/web/sse-stream-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ export class SseStreamManager {
* or `null` meaning "receive all events" (backwards-compatible default).
*/
private sseClients: Map<FastifyReply, Set<string> | null> = new Map();
/** Optional client-supplied IDs → reply, for live filter updates without reconnecting */
private sseClientsById: Map<string, FastifyReply> = new Map();
/** SSE clients connecting from non-localhost (i.e. through tunnel) */
private remoteSseClients: Set<FastifyReply> = new Set();
/** Clients with backpressure — skip writes until 'drain' fires */
Expand Down Expand Up @@ -103,17 +105,43 @@ export class SseStreamManager {
this._isTunnelActive = active;
}

addClient(reply: FastifyReply, sessionFilter: Set<string> | null, isRemote: boolean): void {
addClient(reply: FastifyReply, sessionFilter: Set<string> | null, isRemote: boolean, clientId?: string): void {
this.sseClients.set(reply, sessionFilter);
if (isRemote) {
this.remoteSseClients.add(reply);
}
if (clientId) {
// If a previous reply registered the same id (reconnect), drop the old one.
const prev = this.sseClientsById.get(clientId);
if (prev && prev !== reply) {
this.sseClients.delete(prev);
this.remoteSseClients.delete(prev);
this.backpressuredClients.delete(prev);
}
this.sseClientsById.set(clientId, reply);
}
}

removeClient(reply: FastifyReply): void {
this.sseClients.delete(reply);
this.remoteSseClients.delete(reply);
this.backpressuredClients.delete(reply);
// Clear any clientId mappings pointing at this reply
for (const [id, r] of this.sseClientsById) {
if (r === reply) this.sseClientsById.delete(id);
}
}

/**
* Update an existing client's session subscription filter without forcing
* an SSE reconnect. Returns true if the client was found and updated.
*/
updateClientFilter(clientId: string, sessions: string[] | null): boolean {
const reply = this.sseClientsById.get(clientId);
if (!reply || !this.sseClients.has(reply)) return false;
const filter = sessions && sessions.length > 0 ? new Set(sessions) : null;
this.sseClients.set(reply, filter);
return true;
}

/** Send a single SSE event to a specific client. */
Expand Down Expand Up @@ -188,35 +216,18 @@ export class SseStreamManager {
console.error(`[Server] Failed to serialize SSE event "${event}":`, err);
return;
}
// Extract sessionId from event data for subscription filtering.
const eventSessionId = this.extractSessionId(event, data);

for (const [client, filter] of this.sseClients) {
// No filter (null) = receive everything. Otherwise, skip if event is
// session-scoped and the session isn't in the client's subscription set.
if (filter && eventSessionId && !filter.has(eventSessionId)) continue;
// Subscription filtering is intentionally NOT applied here. The
// `?sessions=` filter is intended to suppress only the high-volume
// terminal stream — lifecycle/metadata events (session:created,
// session:updated, ralph:*, hook:*, etc.) are needed for correct UI
// state across all sessions even when the client subscribes to a single
// active session's terminal output. Terminal events bypass this method
// entirely (see flushSessionTerminalBatch — it applies the filter).
for (const [client] of this.sseClients) {
this.sendSSEPreformatted(client, message);
}
}

/**
* Extract the session ID from an event's data payload for subscription filtering.
* Returns the sessionId string if the event is session-scoped, or null for global events.
*/
private extractSessionId(event: string, data: unknown): string | null {
if (data == null || typeof data !== 'object') return null;
const record = data as Record<string, unknown>;

// Most session-scoped events use `sessionId`
if (typeof record.sessionId === 'string') return record.sessionId;

// Session lifecycle events (session:*) use `id` from the session state object
if (typeof record.id === 'string' && event.startsWith('session:')) return record.id;

// No session ID found — treat as global event (sent to all clients)
return null;
}

// ========== Terminal Data Batching ==========

// Batch terminal data for better performance (60fps)
Expand Down
55 changes: 28 additions & 27 deletions test/operation-lightspeed.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
*
* Covers:
* - SSE subscription filter edge cases (empty params, whitespace, duplicates)
* - extractSessionId logic (sessionId vs id field, global events)
* - Lifecycle-event broadcast contract (session:*, case:* fan out to all clients;
* only session:terminal is filtered by subscription)
* - Tab switching: terminal buffer loading, session creation + switch
* - Terminal data cap / backpressure recovery
* - Lazy teammate terminal lifecycle
Expand Down Expand Up @@ -254,11 +255,11 @@ describe('Operation Lightspeed', () => {
});

// ═══════════════════════════════════════════════════════════════
// extractSessionId — Event Classification
// Lifecycle Event Broadcast — Event Classification
// ═══════════════════════════════════════════════════════════════

describe('extractSessionId via SSE Filtering', () => {
it('should route session:updated events by id field', async () => {
describe('Lifecycle Event Broadcast Contract', () => {
it('should deliver session:updated events to all clients regardless of filter', async () => {
// Create two sessions
const session1 = await createSession(baseUrl);
const session2 = await createSession(baseUrl);
Expand Down Expand Up @@ -310,21 +311,23 @@ describe('Operation Lightspeed', () => {

const events = parseSSEEvents(receivedData);

// Should receive session:updated for session1 only
// New contract: session:updated is a lifecycle event that broadcasts to ALL clients.
// The subscription filter only applies to session:terminal.
const updatedEvents = events.filter((e) => e.event === 'session:updated');
const session1Updated = updatedEvents.find((e) => (e.data as any).id === session1);
const session2Updated = updatedEvents.find((e) => (e.data as any).id === session2);

expect(session1Updated).toBeDefined();
expect(session2Updated).toBeUndefined();
expect(session2Updated).toBeDefined();

// Cleanup
await deleteSession(baseUrl, session1);
await deleteSession(baseUrl, session2);
});

it('should filter session:deleted by session ID (sessionId extraction from id field)', async () => {
// Tests extractSessionId's fallback path: session:* events use `id` not `sessionId`
it('should deliver session:deleted events to all clients regardless of filter', async () => {
// New contract: lifecycle events (session:*) broadcast to every connected client;
// the per-client filter no longer gates them. Only session:terminal is filtered.
const target = await createSession(baseUrl);
const other = await createSession(baseUrl);

Expand Down Expand Up @@ -367,13 +370,12 @@ describe('Operation Lightspeed', () => {

const events = parseSSEEvents(receivedData);

// Target deletion should arrive (extractSessionId matches `id` field for session:* events)
// Both deletions arrive regardless of the per-client filter
const targetDeleted = events.find((e) => e.event === 'session:deleted' && (e.data as any).id === target);
expect(targetDeleted).toBeDefined();

// Other deletion should NOT arrive
const otherDeleted = events.find((e) => e.event === 'session:deleted' && (e.data as any).id === other);
expect(otherDeleted).toBeUndefined();
expect(otherDeleted).toBeDefined();
});
});

Expand Down Expand Up @@ -488,13 +490,13 @@ describe('Operation Lightspeed', () => {
expect(events.find((e) => e.event === 'init')).toBeDefined();
});

it('should handle multiple SSE clients with different filters', async () => {
it('should fan lifecycle events out to all SSE clients regardless of filter', async () => {
const session1 = await createSession(baseUrl);
const session2 = await createSession(baseUrl);

// Client A: subscribes to session1
// Client B: subscribes to session2
// Client C: no filter (all events)
// Client A: subscribes to session1, Client B: subscribes to session2, Client C: no filter.
// Under the broadcast contract, all three see every session:deleted event — the filter
// only narrows session:terminal traffic.
const controllerA = new AbortController();
const controllerB = new AbortController();
const controllerC = new AbortController();
Expand Down Expand Up @@ -585,15 +587,13 @@ describe('Operation Lightspeed', () => {
const eventsB = parseSSEEvents(dataB);
const eventsC = parseSSEEvents(dataC);

// Client A: sees session1 deleted, not session2
// Every client sees both deletions — lifecycle events are not filter-gated.
expect(eventsA.find((e) => e.event === 'session:deleted' && (e.data as any).id === session1)).toBeDefined();
expect(eventsA.find((e) => e.event === 'session:deleted' && (e.data as any).id === session2)).toBeUndefined();
expect(eventsA.find((e) => e.event === 'session:deleted' && (e.data as any).id === session2)).toBeDefined();

// Client B: sees session2 deleted, not session1
expect(eventsB.find((e) => e.event === 'session:deleted' && (e.data as any).id === session1)).toBeDefined();
expect(eventsB.find((e) => e.event === 'session:deleted' && (e.data as any).id === session2)).toBeDefined();
expect(eventsB.find((e) => e.event === 'session:deleted' && (e.data as any).id === session1)).toBeUndefined();

// Client C: sees both
expect(eventsC.find((e) => e.event === 'session:deleted' && (e.data as any).id === session1)).toBeDefined();
expect(eventsC.find((e) => e.event === 'session:deleted' && (e.data as any).id === session2)).toBeDefined();
});
Expand Down Expand Up @@ -991,13 +991,13 @@ describe('Operation Lightspeed', () => {
});

// ═══════════════════════════════════════════════════════════════
// extractSessionId — Additional Edge Cases
// Lifecycle Event Broadcast — Additional Edge Cases
// ═══════════════════════════════════════════════════════════════

describe('extractSessionId — Edge Cases via SSE', () => {
describe('Lifecycle Event Broadcast — Edge Cases via SSE', () => {
it('should treat non-session: events with id field as global (not filtered)', async () => {
// Events like case:created have an `id` field but aren't session:* events.
// extractSessionId should NOT use the `id` field for non-session:* events.
// Under the broadcast contract they reach every connected client.
const controller = new AbortController();
let receivedData = '';

Expand Down Expand Up @@ -1052,8 +1052,9 @@ describe('Operation Lightspeed', () => {
}
});

it('should deliver session:created for a newly created session to unfiltered client but not mismatched filter', async () => {
// session:created uses `id` field and starts with `session:` — extractSessionId should match it
it('should deliver session:created to every client, even those with a mismatched filter', async () => {
// Under the broadcast contract, lifecycle events ignore the per-client filter.
// A client subscribed only to `existing` still receives `session:created` for `newSession`.
const existing = await createSession(baseUrl);

// Subscribe to existing session only
Expand Down Expand Up @@ -1091,9 +1092,9 @@ describe('Operation Lightspeed', () => {
}

const events = parseSSEEvents(receivedData);
// session:created for newSession should be filtered OUT (id doesn't match our filter)
// session:created reaches the filtered client even though its id doesn't match the filter.
const createdEvent = events.find((e) => e.event === 'session:created' && (e.data as any).id === newSession);
expect(createdEvent).toBeUndefined();
expect(createdEvent).toBeDefined();

await Promise.all([deleteSession(baseUrl, existing), deleteSession(baseUrl, newSession)]);
});
Expand Down