diff --git a/packages/opencode/src/server/routes/global.ts b/packages/opencode/src/server/routes/global.ts index 4d019f6a7ee..bc503c11fc0 100644 --- a/packages/opencode/src/server/routes/global.ts +++ b/packages/opencode/src/server/routes/global.ts @@ -69,33 +69,51 @@ export const GlobalRoutes = lazy(() => c.header("X-Accel-Buffering", "no") c.header("X-Content-Type-Options", "nosniff") return streamSSE(c, async (stream) => { - stream.writeSSE({ - data: JSON.stringify({ - payload: { - type: "server.connected", - properties: {}, - }, - }), - }) - async function handler(event: any) { - await stream.writeSSE({ - data: JSON.stringify(event), - }) - } - GlobalBus.on("event", handler) - - // Send heartbeat every 10s to prevent stalled proxy streams. - const heartbeat = setInterval(() => { + try { stream.writeSSE({ data: JSON.stringify({ payload: { - type: "server.heartbeat", + type: "server.connected", properties: {}, }, }), }) + } catch { + log.info("global event disconnected (initial write failed)") + return + } + + // Heartbeat must be declared before handler to avoid confusion + const heartbeat = setInterval(() => { + try { + stream.writeSSE({ + data: JSON.stringify({ + payload: { + type: "server.heartbeat", + properties: {}, + }, + }), + }) + } catch { + clearInterval(heartbeat) + GlobalBus.off("event", handler) + log.info("global event disconnected (heartbeat failed)") + } }, 10_000) + async function handler(event: any) { + try { + await stream.writeSSE({ + data: JSON.stringify(event), + }) + } catch { + clearInterval(heartbeat) + GlobalBus.off("event", handler) + log.info("global event disconnected (write failed)") + } + } + GlobalBus.on("event", handler) + await new Promise((resolve) => { stream.onAbort(() => { clearInterval(heartbeat) diff --git a/packages/opencode/src/server/server.ts b/packages/opencode/src/server/server.ts index 9fba9c1fe1a..239373e8df9 100644 --- a/packages/opencode/src/server/server.ts +++ b/packages/opencode/src/server/server.ts @@ -504,31 +504,49 @@ export namespace Server { c.header("X-Accel-Buffering", "no") c.header("X-Content-Type-Options", "nosniff") return streamSSE(c, async (stream) => { - stream.writeSSE({ - data: JSON.stringify({ - type: "server.connected", - properties: {}, - }), - }) - const unsub = Bus.subscribeAll(async (event) => { - await stream.writeSSE({ - data: JSON.stringify(event), - }) - if (event.type === Bus.InstanceDisposed.type) { - stream.close() - } - }) - - // Send heartbeat every 10s to prevent stalled proxy streams. - const heartbeat = setInterval(() => { + try { stream.writeSSE({ data: JSON.stringify({ - type: "server.heartbeat", + type: "server.connected", properties: {}, }), }) + } catch { + log.info("event disconnected (initial write failed)") + return + } + + // Heartbeat must be declared before subscription to avoid confusion + const heartbeat = setInterval(() => { + try { + stream.writeSSE({ + data: JSON.stringify({ + type: "server.heartbeat", + properties: {}, + }), + }) + } catch { + clearInterval(heartbeat) + unsub() + log.info("event disconnected (heartbeat failed)") + } }, 10_000) + const unsub = Bus.subscribeAll(async (event) => { + try { + await stream.writeSSE({ + data: JSON.stringify(event), + }) + if (event.type === Bus.InstanceDisposed.type) { + stream.close() + } + } catch { + clearInterval(heartbeat) + unsub() + log.info("event disconnected (write failed)") + } + }) + await new Promise((resolve) => { stream.onAbort(() => { clearInterval(heartbeat)