From 3ba7bb39f809daed3bd0ad9c7a6e1bbb6b537f1d Mon Sep 17 00:00:00 2001 From: Your Name Date: Wed, 25 Feb 2026 19:33:06 -0600 Subject: [PATCH] fix(opencode): handle client disconnect in SSE stream writes When running opencode serve with external clients, SSE stream writes would fail with 'Unexpected EOF' when clients disconnect abruptly. These errors were not caught, causing the server to enter a corrupted state where subsequent requests would also fail. This fix wraps all writeSSE calls in try/catch to gracefully handle client disconnects by: - Cleaning up the event subscription - Clearing the heartbeat interval - Logging the disconnection This ensures the server remains stable after client disconnects. --- packages/opencode/src/server/routes/global.ts | 54 ++++++++++++------- packages/opencode/src/server/server.ts | 54 ++++++++++++------- 2 files changed, 72 insertions(+), 36 deletions(-) diff --git a/packages/opencode/src/server/routes/global.ts b/packages/opencode/src/server/routes/global.ts index 4d019f6a7ee..bc503c11fc0 100644 --- a/packages/opencode/src/server/routes/global.ts +++ b/packages/opencode/src/server/routes/global.ts @@ -69,33 +69,51 @@ export const GlobalRoutes = lazy(() => c.header("X-Accel-Buffering", "no") c.header("X-Content-Type-Options", "nosniff") return streamSSE(c, async (stream) => { - stream.writeSSE({ - data: JSON.stringify({ - payload: { - type: "server.connected", - properties: {}, - }, - }), - }) - async function handler(event: any) { - await stream.writeSSE({ - data: JSON.stringify(event), - }) - } - GlobalBus.on("event", handler) - - // Send heartbeat every 10s to prevent stalled proxy streams. - const heartbeat = setInterval(() => { + try { stream.writeSSE({ data: JSON.stringify({ payload: { - type: "server.heartbeat", + type: "server.connected", properties: {}, }, }), }) + } catch { + log.info("global event disconnected (initial write failed)") + return + } + + // Heartbeat must be declared before handler to avoid confusion + const heartbeat = setInterval(() => { + try { + stream.writeSSE({ + data: JSON.stringify({ + payload: { + type: "server.heartbeat", + properties: {}, + }, + }), + }) + } catch { + clearInterval(heartbeat) + GlobalBus.off("event", handler) + log.info("global event disconnected (heartbeat failed)") + } }, 10_000) + async function handler(event: any) { + try { + await stream.writeSSE({ + data: JSON.stringify(event), + }) + } catch { + clearInterval(heartbeat) + GlobalBus.off("event", handler) + log.info("global event disconnected (write failed)") + } + } + GlobalBus.on("event", handler) + await new Promise((resolve) => { stream.onAbort(() => { clearInterval(heartbeat) diff --git a/packages/opencode/src/server/server.ts b/packages/opencode/src/server/server.ts index 9fba9c1fe1a..239373e8df9 100644 --- a/packages/opencode/src/server/server.ts +++ b/packages/opencode/src/server/server.ts @@ -504,31 +504,49 @@ export namespace Server { c.header("X-Accel-Buffering", "no") c.header("X-Content-Type-Options", "nosniff") return streamSSE(c, async (stream) => { - stream.writeSSE({ - data: JSON.stringify({ - type: "server.connected", - properties: {}, - }), - }) - const unsub = Bus.subscribeAll(async (event) => { - await stream.writeSSE({ - data: JSON.stringify(event), - }) - if (event.type === Bus.InstanceDisposed.type) { - stream.close() - } - }) - - // Send heartbeat every 10s to prevent stalled proxy streams. - const heartbeat = setInterval(() => { + try { stream.writeSSE({ data: JSON.stringify({ - type: "server.heartbeat", + type: "server.connected", properties: {}, }), }) + } catch { + log.info("event disconnected (initial write failed)") + return + } + + // Heartbeat must be declared before subscription to avoid confusion + const heartbeat = setInterval(() => { + try { + stream.writeSSE({ + data: JSON.stringify({ + type: "server.heartbeat", + properties: {}, + }), + }) + } catch { + clearInterval(heartbeat) + unsub() + log.info("event disconnected (heartbeat failed)") + } }, 10_000) + const unsub = Bus.subscribeAll(async (event) => { + try { + await stream.writeSSE({ + data: JSON.stringify(event), + }) + if (event.type === Bus.InstanceDisposed.type) { + stream.close() + } + } catch { + clearInterval(heartbeat) + unsub() + log.info("event disconnected (write failed)") + } + }) + await new Promise((resolve) => { stream.onAbort(() => { clearInterval(heartbeat)