Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 105 additions & 1 deletion src/lib/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,17 @@ export class Container<Env = Cloudflare.Env> extends DurableObject<Env> {
// Renew the activity timeout whenever a request is proxied
this.renewActivityTimeout();
const res = await tcpPort.fetch(containerUrl, request);

// Track WebSocket connections via a proxy so we detect close
if ((res as { webSocket?: WebSocket }).webSocket) {
return this.proxyWebSocket(res);
}

// Track streaming HTTP responses so we detect when the body finishes
if (res.body) {
return this.trackResponseBody(res);
}

return res;
} catch (e) {
if (!(e instanceof Error)) {
Expand Down Expand Up @@ -782,6 +793,12 @@ export class Container<Env = Cloudflare.Env> extends DurableObject<Env> {

private sleepAfterMs = 0;

// Tracks in-flight HTTP responses and WebSocket connections in isolate memory.
// Prevents the container from sleeping while requests are still being streamed.
// If the isolate dies, the counter resets to 0 — which is correct since
// all in-flight work dies with it.
private inFlightRequestCount = 0;

// ==========================
// GENERAL HELPERS
// ==========================
Expand Down Expand Up @@ -1137,7 +1154,7 @@ export class Container<Env = Cloudflare.Env> extends DurableObject<Env> {
return;
}

if (this.isActivityExpired()) {
if (this.isActivityExpired() && this.inFlightRequestCount === 0) {
await this.onActivityExpired();
// renewActivityTimeout makes sure we don't spam calls here
this.renewActivityTimeout();
Expand Down Expand Up @@ -1279,6 +1296,93 @@ export class Container<Env = Cloudflare.Env> extends DurableObject<Env> {
return this.toSchedule(schedule);
}

// =======================================
// IN-FLIGHT REQUEST/WS TRACKING
// =======================================

/**
* Wraps an HTTP response body with a TransformStream so we can detect
* when the stream finishes. Increments the in-flight counter while the
* body is being read, and renews the activity timeout when it completes.
*
* The TransformStream is just acting as a pass-through here; we don't
* transform anything. We're only using it to get a hook into when the
* stream finishes via pipeTo().finally().
*/
private trackResponseBody(res: Response): Response {
const { readable, writable } = new TransformStream();
this.inFlightRequestCount++;
res.body!.pipeTo(writable).finally(() => {
this.inFlightRequestCount--;
this.renewActivityTimeout();
});
return new Response(readable, res);
}

/**
* Creates a WebSocketPair that proxies between the client and the container,
* so the DO can track when the connection closes and renew activity on messages.
*
* NOTE: Unlike HTTP streaming where pipeTo moves data at the runtime level,
* this proxy reads every WebSocket frame into the JS heap before forwarding it.
* There's no pipeTo equivalent for WebSockets today. If one is added to the
* runtime, we should switch to it to avoid the overhead on high-throughput
* connections.
*/
private proxyWebSocket(res: Response): Response {
const containerWs = (res as unknown as { webSocket: WebSocket }).webSocket;
const pair = new WebSocketPair();
const [client, server] = Object.values(pair);

containerWs.accept();
server.accept();

this.inFlightRequestCount++;
let closed = false;

const cleanup = (code?: number, reason?: string) => {
if (closed) return;
closed = true;
this.inFlightRequestCount--;
this.renewActivityTimeout();
try {
containerWs.close(code, reason);
} catch {}
try {
server.close(code, reason);
} catch {}
};

// Proxy messages bidirectionally, renewing activity on each frame
containerWs.addEventListener('message', (event: MessageEvent) => {
this.renewActivityTimeout();
try {
server.send(event.data as string | ArrayBuffer);
} catch {
cleanup(1011, 'Failed to forward message to client');
}
});

server.addEventListener('message', (event: MessageEvent) => {
this.renewActivityTimeout();
try {
containerWs.send(event.data as string | ArrayBuffer);
} catch {
cleanup(1011, 'Failed to forward message to container');
}
});

containerWs.addEventListener('close', (event: CloseEvent) => cleanup(event.code, event.reason));
server.addEventListener('close', (event: CloseEvent) => cleanup(event.code, event.reason));
containerWs.addEventListener('error', () => cleanup(1011, 'Container WebSocket error'));
server.addEventListener('error', () => cleanup(1011, 'Client WebSocket error'));

return new Response(null, {
status: 101,
webSocket: client,
} as ResponseInit);
}

private isActivityExpired(): boolean {
return this.sleepAfterMs <= Date.now();
}
Expand Down
Loading