From 8b5f476e299f60256fb74d3b6cc76cbb57a97ef1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miguel=20=C3=81ngel?= Date: Fri, 12 Jun 2026 10:46:16 -0400 Subject: [PATCH] fix(engine): real back-pressure in StreamingEncoder.writeFrame MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit writeFrame returned the stdin.write boolean synchronously; when FFmpeg encoded slower than workers captured, Node's writable buffer grew without bound (multi-worker worst case ~80GB over a 1h render) until the kernel OOM-killed the process. writeFrame is now async: a buffered write awaits the drain event before resolving, so back-pressure propagates through the frame reorder buffer to the capture loops and in-flight frames stay bounded. Inactivity-timer semantics are preserved: no reset before drain, so a hung FFmpeg still trips SIGTERM. The drain wait races one-shot drain/close listeners (aborted in a finally) rather than chaining onto the shared exit promise — V8 retains reaction-list entries on unsettled promises, so per-frame .then chains would accumulate ~108K closures over a 1h back-pressured render. An exit-status re-check after listener attachment closes the close-before-attach hang window. All five writeFrame call sites (streaming stage and HDR loops) check the result via a shared ensureFrameWritten guard and stop the render with a frame-indexed error when the encoder is gone instead of discarding the boolean. The MULTI_WORKER_MAX_DURATION_SECONDS cap can be relaxed in a follow-up now that buffering is bounded. Fixes #1353 --- .../src/services/streamingEncoder.test.ts | 166 ++++++++++++++++-- .../engine/src/services/streamingEncoder.ts | 86 +++++++-- .../render/stages/captureHdrFrameShared.ts | 13 ++ .../render/stages/captureHdrHybridLoop.ts | 3 +- .../render/stages/captureHdrSequentialLoop.ts | 5 +- .../render/stages/captureStreamingStage.ts | 5 +- 6 files changed, 247 insertions(+), 31 deletions(-) diff --git a/packages/engine/src/services/streamingEncoder.test.ts b/packages/engine/src/services/streamingEncoder.test.ts index b5c61a9b2..a193f653d 100644 --- a/packages/engine/src/services/streamingEncoder.test.ts +++ b/packages/engine/src/services/streamingEncoder.test.ts @@ -418,6 +418,20 @@ const baseOptions: StreamingEncoderOptions = { useGpu: false, }; +async function resolveWithin(promise: Promise, ms = 100): Promise { + let timeout: ReturnType | undefined; + try { + return await Promise.race([ + promise, + new Promise<"timeout">((resolve) => { + timeout = setTimeout(() => resolve("timeout"), ms); + }), + ]); + } finally { + if (timeout) clearTimeout(timeout); + } +} + describe("spawnStreamingEncoder lifecycle and cleanup", () => { afterEach(() => { vi.resetModules(); @@ -556,7 +570,7 @@ describe("spawnStreamingEncoder lifecycle and cleanup", () => { const dir = mkdtempSync(join(tmpdir(), "se-writefail-")); const encoder = await spawnStreamingEncoder(join(dir, "out.mp4"), baseOptions); - expect(encoder.writeFrame(Buffer.from([0]))).toBe(true); + expect(await encoder.writeFrame(Buffer.from([0]))).toBe(true); const proc = calls[0]!.proc; await new Promise((resolve) => { @@ -566,7 +580,136 @@ describe("spawnStreamingEncoder lifecycle and cleanup", () => { }); }); - expect(encoder.writeFrame(Buffer.from([0]))).toBe(false); + expect(await encoder.writeFrame(Buffer.from([0]))).toBe(false); + }); + + it("writeFrame waits for stdin drain when FFmpeg applies back-pressure", async () => { + const { spawn, calls } = createSpawnSpy(); + vi.resetModules(); + vi.doMock("child_process", () => ({ spawn })); + + const { spawnStreamingEncoder } = await import("./streamingEncoder.js"); + const dir = mkdtempSync(join(tmpdir(), "se-drain-")); + const encoder = await spawnStreamingEncoder(join(dir, "out.mp4"), baseOptions); + + const proc = calls[0]!.proc; + proc.stdin.write = (_chunk: Buffer): boolean => false; + + const writeResult = encoder.writeFrame(Buffer.from([1])) as unknown; + expect(writeResult).toBeInstanceOf(Promise); + + const writePromise = writeResult as Promise; + let settled = false; + void writePromise.then(() => { + settled = true; + }); + + await Promise.resolve(); + expect(settled).toBe(false); + expect(proc.stdin.listenerCount("drain")).toBe(1); + + proc.stdin.emit("drain"); + + await expect(writePromise).resolves.toBe(true); + expect(settled).toBe(true); + expect(proc.stdin.listenerCount("drain")).toBe(0); + + process.nextTick(() => proc.emit("close", 0)); + await encoder.close(); + }); + + it("does not accumulate process close listeners across repeated back-pressured writes", async () => { + const { spawn, calls } = createSpawnSpy(); + vi.resetModules(); + vi.doMock("child_process", () => ({ spawn })); + + const { spawnStreamingEncoder } = await import("./streamingEncoder.js"); + const dir = mkdtempSync(join(tmpdir(), "se-drain-listeners-")); + const encoder = await spawnStreamingEncoder(join(dir, "out.mp4"), baseOptions); + + const proc = calls[0]!.proc; + const baselineCloseListeners = proc.listenerCount("close"); + const baselineDrainListeners = proc.stdin.listenerCount("drain"); + proc.stdin.write = (_chunk: Buffer): boolean => false; + + for (let i = 0; i < 12; i++) { + const writePromise = encoder.writeFrame(Buffer.from([i])); + + await Promise.resolve(); + expect(proc.stdin.listenerCount("drain")).toBe(baselineDrainListeners + 1); + expect(proc.listenerCount("close")).toBe(baselineCloseListeners + 1); + + proc.stdin.emit("drain"); + + await expect(writePromise).resolves.toBe(true); + expect(proc.stdin.listenerCount("drain")).toBe(baselineDrainListeners); + expect(proc.listenerCount("close")).toBe(baselineCloseListeners); + } + + process.nextTick(() => proc.emit("close", 0)); + await encoder.close(); + }); + + it("writeFrame resolves false instead of hanging when FFmpeg exits before drain", async () => { + const { spawn, calls } = createSpawnSpy(); + vi.resetModules(); + vi.doMock("child_process", () => ({ spawn })); + + const { spawnStreamingEncoder } = await import("./streamingEncoder.js"); + const dir = mkdtempSync(join(tmpdir(), "se-drain-exit-")); + const encoder = await spawnStreamingEncoder(join(dir, "out.mp4"), baseOptions); + + const proc = calls[0]!.proc; + proc.stdin.write = (_chunk: Buffer): boolean => false; + + const writeResult = encoder.writeFrame(Buffer.from([1])) as unknown; + expect(writeResult).toBeInstanceOf(Promise); + + const writePromise = writeResult as Promise; + let settled = false; + void writePromise.then(() => { + settled = true; + }); + + await Promise.resolve(); + expect(settled).toBe(false); + expect(proc.stdin.listenerCount("drain")).toBe(1); + + proc.emit("close", 1); + + await expect(writePromise).resolves.toBe(false); + expect(settled).toBe(true); + expect(proc.stdin.listenerCount("drain")).toBe(0); + + const result = await encoder.close(); + expect(result.success).toBe(false); + }); + + it("writeFrame resolves false when close fires after write returns false before await attaches listeners", async () => { + const { spawn, calls } = createSpawnSpy(); + vi.resetModules(); + vi.doMock("child_process", () => ({ spawn })); + + const { spawnStreamingEncoder } = await import("./streamingEncoder.js"); + const dir = mkdtempSync(join(tmpdir(), "se-drain-already-closed-")); + const encoder = await spawnStreamingEncoder(join(dir, "out.mp4"), baseOptions); + + const proc = calls[0]!.proc; + const baselineCloseListeners = proc.listenerCount("close"); + proc.stdin.write = (_chunk: Buffer): boolean => { + proc.emit("close", 1); + return false; + }; + + const writePromise = encoder.writeFrame(Buffer.from([1])); + + await expect(resolveWithin(writePromise)).resolves.toBe(false); + expect(encoder.getExitStatus()).toBe("error"); + expect(proc.stdin.listenerCount("drain")).toBe(0); + expect(proc.listenerCount("close")).toBe(baselineCloseListeners); + + const result = await encoder.close(); + expect(result.success).toBe(false); }); it("close() removes the abort listener so a post-close abort does not re-kill ffmpeg", async () => { @@ -613,7 +756,7 @@ describe("spawnStreamingEncoder lifecycle and cleanup", () => { // progressing" capture the encoder must still be alive. The old total- // render timeout would have fired SIGTERM at ~1000ms. for (let i = 0; i < 9; i++) { - encoder.writeFrame(Buffer.from([i])); + await encoder.writeFrame(Buffer.from([i])); vi.advanceTimersByTime(900); } expect(proc.kill).not.toHaveBeenCalled(); @@ -647,14 +790,17 @@ describe("spawnStreamingEncoder lifecycle and cleanup", () => { const proc = calls[0]!.proc; proc.stdin.write = (_chunk: Buffer) => false; - // Pump 9 frames at 900ms intervals — all returning false. The reset - // should NOT fire (every write was buffered, not accepted), so the - // 1000ms timer (last reset on spawn) elapses near the start. - for (let i = 0; i < 9; i++) { - encoder.writeFrame(Buffer.from([i])); - vi.advanceTimersByTime(900); - } + // A buffered write should remain pending and must NOT reset the timer. + // The 1000ms timer (last reset on spawn) therefore elapses while the + // caller is correctly back-pressured on the first frame. + const writePromise = encoder.writeFrame(Buffer.from([0])); + await Promise.resolve(); + + vi.advanceTimersByTime(1100); expect(proc.kill).toHaveBeenCalledWith("SIGTERM"); + + proc.emit("close", null); + await expect(writePromise).resolves.toBe(false); } finally { vi.useRealTimers(); } diff --git a/packages/engine/src/services/streamingEncoder.ts b/packages/engine/src/services/streamingEncoder.ts index 8dcd7219c..ebb6f1a8f 100644 --- a/packages/engine/src/services/streamingEncoder.ts +++ b/packages/engine/src/services/streamingEncoder.ts @@ -10,10 +10,11 @@ * 1. Frame reorder buffer – ensures out-of-order parallel workers feed * frames to FFmpeg stdin in sequential order. * 2. Streaming FFmpeg encoder – spawns FFmpeg with `-f image2pipe` and - * exposes a `writeFrame(buffer)` + `close()` API. + * exposes an async `writeFrame(buffer)` + `close()` API. */ import { spawn, type ChildProcess } from "child_process"; +import { once } from "events"; import { trackChildProcess } from "../utils/processTracker.js"; import { existsSync, mkdirSync, statSync } from "fs"; import { dirname } from "path"; @@ -126,7 +127,14 @@ export interface StreamingEncoderResult { } export interface StreamingEncoder { - writeFrame: (buffer: Buffer) => boolean; + /** + * Write one frame to FFmpeg stdin, awaiting `drain` when the pipe is full + * so back-pressure propagates to the caller. Resolves `false` when FFmpeg + * is already gone. Callers must serialize calls — one in-flight writeFrame + * per encoder (the frame reorder buffer provides this ordering); concurrent + * calls would interleave frame bytes on the pipe and race the drain wait. + */ + writeFrame: (buffer: Buffer) => Promise; close: () => Promise; getExitStatus: () => "running" | "success" | "error"; } @@ -448,9 +456,45 @@ export async function spawnStreamingEncoder( }; resetTimer(); + const waitForDrainOrExit = async ( + stdin: NonNullable, + ): Promise<"drain" | "exit"> => { + // Back-pressure can hit once per frame. Do not race `exitPromise.then(...)` + // here: V8 retains `.then` reaction-list entries on an unsettled promise, + // so a one-hour 30fps render under steady back-pressure can accumulate + // ~108K closures + AbortControllers. Use one-shot listeners for this write + // instead, then abort them in finally. `close` is the event that flips + // `exitStatus`; re-check after listener attachment so a close emitted + // between `stdin.write(false)` and this await cannot hang forever. + const abortController = new AbortController(); + try { + const drainPromise = once(stdin, "drain", { signal: abortController.signal }).then( + () => "drain" as const, + ); + const closePromise = once(ffmpeg, "close", { signal: abortController.signal }).then( + () => "exit" as const, + ); + const racePromise = Promise.race([drainPromise, closePromise]).catch((err: unknown) => { + if (err instanceof Error && err.name === "AbortError") { + return "exit" as const; + } + throw err; + }); + + if (exitStatus !== "running") { + return "exit"; + } + + return await racePromise; + } finally { + abortController.abort(); + } + }; + const encoder: StreamingEncoder = { - writeFrame: (buffer: Buffer): boolean => { - if (exitStatus !== "running" || !ffmpeg.stdin || ffmpeg.stdin.destroyed) { + writeFrame: async (buffer: Buffer): Promise => { + const stdin = ffmpeg.stdin; + if (exitStatus !== "running" || !stdin || stdin.destroyed) { return false; } // Copy the buffer before writing — Node streams hold a reference to the @@ -459,18 +503,28 @@ export async function spawnStreamingEncoder( // so without this copy the pipe would read partially-overwritten data // and flicker. const copy = Buffer.from(buffer); - const accepted = ffmpeg.stdin.write(copy); - // Reset inactivity timer ONLY on `accepted === true`. `true` means the - // write went through to the kernel pipe without buffering in Node — - // proof FFmpeg is actually consuming. `false` means Node's writable - // stream had to buffer (FFmpeg hasn't drained the pipe yet); we deliberately - // don't reset on `false` so a hung FFmpeg with a still-producing Chrome - // can't keep us alive forever while Node's stdin buffer grows to OOM. In - // steady state with a slower-but-alive FFmpeg, writes alternate between - // true and false as the buffer drains and refills; the trues are enough - // to keep the heartbeat ticking. - if (accepted) resetTimer(); - return accepted; + const accepted = stdin.write(copy); + // Reset inactivity timer immediately ONLY on `accepted === true`. `true` + // means the write went through to the kernel pipe without buffering in + // Node — proof FFmpeg is actually consuming. `false` means Node's writable + // stream had to buffer (FFmpeg hasn't drained the pipe yet); we await + // `drain` before letting callers produce the next frame, and only reset + // after drain proves consumption. We deliberately don't reset before + // drain so a hung FFmpeg with a still-producing Chrome can't keep us + // alive forever while Node's stdin buffer grows to OOM. If FFmpeg exits + // before draining, waitForDrainOrExit returns "exit", removes its + // one-shot listeners, and callers see `false` instead of hanging. + if (accepted) { + resetTimer(); + return true; + } + + const drainResult = await waitForDrainOrExit(stdin); + if (drainResult !== "drain" || exitStatus !== "running") { + return false; + } + resetTimer(); + return true; }, close: async (): Promise => { diff --git a/packages/producer/src/services/render/stages/captureHdrFrameShared.ts b/packages/producer/src/services/render/stages/captureHdrFrameShared.ts index 862141733..8c2b923cf 100644 --- a/packages/producer/src/services/render/stages/captureHdrFrameShared.ts +++ b/packages/producer/src/services/render/stages/captureHdrFrameShared.ts @@ -301,6 +301,19 @@ export async function captureTransitionFrameOnWorker( } } +// ─── Streaming-encoder write guard ────────────────────────────────────────── + +/** + * Streaming-encoder writes report `false` when FFmpeg is already gone. + * Continuing to capture into a dead encoder wastes the rest of the render, + * so every frame loop stops with a frame-indexed error instead. + */ +export function ensureFrameWritten(frameWritten: boolean, frameIndex: number): void { + if (!frameWritten) { + throw new Error(`Streaming encoder exited before frame ${frameIndex} was written`); + } +} + // ─── HDR video raw-frame cleanup (sequential path only) ──────────────────── export function cleanupEndedHdrVideos(args: { diff --git a/packages/producer/src/services/render/stages/captureHdrHybridLoop.ts b/packages/producer/src/services/render/stages/captureHdrHybridLoop.ts index 3b3e8644c..7b0fbaeea 100644 --- a/packages/producer/src/services/render/stages/captureHdrHybridLoop.ts +++ b/packages/producer/src/services/render/stages/captureHdrHybridLoop.ts @@ -54,6 +54,7 @@ import { type LayeredTransitionBuffers, captureTransitionFrameOnWorker, distributeLayeredHybridFrameRanges, + ensureFrameWritten, partitionTransitionFrames, } from "./captureHdrFrameShared.js"; import { updateJobStatus } from "../shared.js"; @@ -185,7 +186,7 @@ export async function runHybridLayeredFrameLoop(input: HybridLoopInput): Promise const writeEncoded = async (frameIdx: number, buf: Buffer): Promise => { await reorderBuffer.waitForFrame(frameIdx); const writeStart = Date.now(); - hdrEncoder.writeFrame(buf); + ensureFrameWritten(await hdrEncoder.writeFrame(buf), frameIdx); addHdrTiming(hdrPerf, "encoderWriteMs", writeStart); reorderBuffer.advanceTo(frameIdx + 1); framesWritten += 1; diff --git a/packages/producer/src/services/render/stages/captureHdrSequentialLoop.ts b/packages/producer/src/services/render/stages/captureHdrSequentialLoop.ts index b372359b2..883cd2945 100644 --- a/packages/producer/src/services/render/stages/captureHdrSequentialLoop.ts +++ b/packages/producer/src/services/render/stages/captureHdrSequentialLoop.ts @@ -34,6 +34,7 @@ import { writeFileExclusiveSync } from "../shared.js"; import { captureSceneIntoBuffer, cleanupEndedHdrVideos, + ensureFrameWritten, type LayeredTransitionBuffers, } from "./captureHdrFrameShared.js"; import { updateJobStatus } from "../shared.js"; @@ -189,7 +190,7 @@ export async function runSequentialLayeredFrameLoop(input: SequentialLoopInput): ); addHdrTiming(hdrPerf, "transitionCompositeMs", transitionTimingStart); timingStart = Date.now(); - hdrEncoder.writeFrame(transitionBuffers.output); + ensureFrameWritten(await hdrEncoder.writeFrame(transitionBuffers.output), i); addHdrTiming(hdrPerf, "encoderWriteMs", timingStart); } else { if (hdrPerf) hdrPerf.normalFrames += 1; @@ -206,7 +207,7 @@ export async function runSequentialLayeredFrameLoop(input: SequentialLoopInput): ); } timingStart = Date.now(); - hdrEncoder.writeFrame(normalCanvas); + ensureFrameWritten(await hdrEncoder.writeFrame(normalCanvas), i); addHdrTiming(hdrPerf, "encoderWriteMs", timingStart); } diff --git a/packages/producer/src/services/render/stages/captureStreamingStage.ts b/packages/producer/src/services/render/stages/captureStreamingStage.ts index ac68348e8..fca554558 100644 --- a/packages/producer/src/services/render/stages/captureStreamingStage.ts +++ b/packages/producer/src/services/render/stages/captureStreamingStage.ts @@ -60,6 +60,7 @@ import type { FileServerHandle } from "../../fileServer.js"; import type { ProducerLogger } from "../../../logger.js"; import type { ProgressCallback, RenderJob } from "../../renderOrchestrator.js"; import { wrapCaptureStageError } from "../captureStageError.js"; +import { ensureFrameWritten } from "./captureHdrFrameShared.js"; import { updateJobStatus } from "../shared.js"; /** @@ -195,7 +196,7 @@ export async function runCaptureStreamingStage( const onFrameBuffer = async (frameIndex: number, buffer: Buffer): Promise => { await reorderBuffer.waitForFrame(frameIndex); - currentEncoder.writeFrame(buffer); + ensureFrameWritten(await currentEncoder.writeFrame(buffer), frameIndex); reorderBuffer.advanceTo(frameIndex + 1); }; @@ -263,7 +264,7 @@ export async function runCaptureStreamingStage( const time = (i * job.config.fps.den) / job.config.fps.num; const { buffer } = await captureFrameToBuffer(session, i, time); await reorderBuffer.waitForFrame(i); - currentEncoder.writeFrame(buffer); + ensureFrameWritten(await currentEncoder.writeFrame(buffer), i); reorderBuffer.advanceTo(i + 1); job.framesRendered = i + 1;