Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 156 additions & 10 deletions packages/engine/src/services/streamingEncoder.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,20 @@ const baseOptions: StreamingEncoderOptions = {
useGpu: false,
};

async function resolveWithin<T>(promise: Promise<T>, ms = 100): Promise<T | "timeout"> {
let timeout: ReturnType<typeof setTimeout> | undefined;
try {
return await Promise.race([
promise,
new Promise<"timeout">((resolve) => {
timeout = setTimeout(() => resolve("timeout"), ms);
}),
]);
} finally {
if (timeout) clearTimeout(timeout);
}
}

describe("spawnStreamingEncoder lifecycle and cleanup", () => {
afterEach(() => {
vi.resetModules();
Expand Down Expand Up @@ -556,7 +570,7 @@ describe("spawnStreamingEncoder lifecycle and cleanup", () => {
const dir = mkdtempSync(join(tmpdir(), "se-writefail-"));
const encoder = await spawnStreamingEncoder(join(dir, "out.mp4"), baseOptions);

expect(encoder.writeFrame(Buffer.from([0]))).toBe(true);
expect(await encoder.writeFrame(Buffer.from([0]))).toBe(true);

const proc = calls[0]!.proc;
await new Promise<void>((resolve) => {
Expand All @@ -566,7 +580,136 @@ describe("spawnStreamingEncoder lifecycle and cleanup", () => {
});
});

expect(encoder.writeFrame(Buffer.from([0]))).toBe(false);
expect(await encoder.writeFrame(Buffer.from([0]))).toBe(false);
});

it("writeFrame waits for stdin drain when FFmpeg applies back-pressure", async () => {
const { spawn, calls } = createSpawnSpy();
vi.resetModules();
vi.doMock("child_process", () => ({ spawn }));

const { spawnStreamingEncoder } = await import("./streamingEncoder.js");
const dir = mkdtempSync(join(tmpdir(), "se-drain-"));
const encoder = await spawnStreamingEncoder(join(dir, "out.mp4"), baseOptions);

const proc = calls[0]!.proc;
proc.stdin.write = (_chunk: Buffer): boolean => false;

const writeResult = encoder.writeFrame(Buffer.from([1])) as unknown;
expect(writeResult).toBeInstanceOf(Promise);

const writePromise = writeResult as Promise<boolean>;
let settled = false;
void writePromise.then(() => {
settled = true;
});

await Promise.resolve();
expect(settled).toBe(false);
expect(proc.stdin.listenerCount("drain")).toBe(1);

proc.stdin.emit("drain");

await expect(writePromise).resolves.toBe(true);
expect(settled).toBe(true);
expect(proc.stdin.listenerCount("drain")).toBe(0);

process.nextTick(() => proc.emit("close", 0));
await encoder.close();
});

it("does not accumulate process close listeners across repeated back-pressured writes", async () => {
const { spawn, calls } = createSpawnSpy();
vi.resetModules();
vi.doMock("child_process", () => ({ spawn }));

const { spawnStreamingEncoder } = await import("./streamingEncoder.js");
const dir = mkdtempSync(join(tmpdir(), "se-drain-listeners-"));
const encoder = await spawnStreamingEncoder(join(dir, "out.mp4"), baseOptions);

const proc = calls[0]!.proc;
const baselineCloseListeners = proc.listenerCount("close");
const baselineDrainListeners = proc.stdin.listenerCount("drain");
proc.stdin.write = (_chunk: Buffer): boolean => false;

for (let i = 0; i < 12; i++) {
const writePromise = encoder.writeFrame(Buffer.from([i]));

await Promise.resolve();
expect(proc.stdin.listenerCount("drain")).toBe(baselineDrainListeners + 1);
expect(proc.listenerCount("close")).toBe(baselineCloseListeners + 1);

proc.stdin.emit("drain");

await expect(writePromise).resolves.toBe(true);
expect(proc.stdin.listenerCount("drain")).toBe(baselineDrainListeners);
expect(proc.listenerCount("close")).toBe(baselineCloseListeners);
}

process.nextTick(() => proc.emit("close", 0));
await encoder.close();
});

it("writeFrame resolves false instead of hanging when FFmpeg exits before drain", async () => {
const { spawn, calls } = createSpawnSpy();
vi.resetModules();
vi.doMock("child_process", () => ({ spawn }));

const { spawnStreamingEncoder } = await import("./streamingEncoder.js");
const dir = mkdtempSync(join(tmpdir(), "se-drain-exit-"));
const encoder = await spawnStreamingEncoder(join(dir, "out.mp4"), baseOptions);

const proc = calls[0]!.proc;
proc.stdin.write = (_chunk: Buffer): boolean => false;

const writeResult = encoder.writeFrame(Buffer.from([1])) as unknown;
expect(writeResult).toBeInstanceOf(Promise);

const writePromise = writeResult as Promise<boolean>;
let settled = false;
void writePromise.then(() => {
settled = true;
});

await Promise.resolve();
expect(settled).toBe(false);
expect(proc.stdin.listenerCount("drain")).toBe(1);

proc.emit("close", 1);

await expect(writePromise).resolves.toBe(false);
expect(settled).toBe(true);
expect(proc.stdin.listenerCount("drain")).toBe(0);

const result = await encoder.close();
expect(result.success).toBe(false);
});

it("writeFrame resolves false when close fires after write returns false before await attaches listeners", async () => {
const { spawn, calls } = createSpawnSpy();
vi.resetModules();
vi.doMock("child_process", () => ({ spawn }));

const { spawnStreamingEncoder } = await import("./streamingEncoder.js");
const dir = mkdtempSync(join(tmpdir(), "se-drain-already-closed-"));
const encoder = await spawnStreamingEncoder(join(dir, "out.mp4"), baseOptions);

const proc = calls[0]!.proc;
const baselineCloseListeners = proc.listenerCount("close");
proc.stdin.write = (_chunk: Buffer): boolean => {
proc.emit("close", 1);
return false;
};

const writePromise = encoder.writeFrame(Buffer.from([1]));

await expect(resolveWithin(writePromise)).resolves.toBe(false);
expect(encoder.getExitStatus()).toBe("error");
expect(proc.stdin.listenerCount("drain")).toBe(0);
expect(proc.listenerCount("close")).toBe(baselineCloseListeners);

const result = await encoder.close();
expect(result.success).toBe(false);
});

it("close() removes the abort listener so a post-close abort does not re-kill ffmpeg", async () => {
Expand Down Expand Up @@ -613,7 +756,7 @@ describe("spawnStreamingEncoder lifecycle and cleanup", () => {
// progressing" capture the encoder must still be alive. The old total-
// render timeout would have fired SIGTERM at ~1000ms.
for (let i = 0; i < 9; i++) {
encoder.writeFrame(Buffer.from([i]));
await encoder.writeFrame(Buffer.from([i]));
vi.advanceTimersByTime(900);
}
expect(proc.kill).not.toHaveBeenCalled();
Expand Down Expand Up @@ -647,14 +790,17 @@ describe("spawnStreamingEncoder lifecycle and cleanup", () => {
const proc = calls[0]!.proc;
proc.stdin.write = (_chunk: Buffer) => false;

// Pump 9 frames at 900ms intervals — all returning false. The reset
// should NOT fire (every write was buffered, not accepted), so the
// 1000ms timer (last reset on spawn) elapses near the start.
for (let i = 0; i < 9; i++) {
encoder.writeFrame(Buffer.from([i]));
vi.advanceTimersByTime(900);
}
// A buffered write should remain pending and must NOT reset the timer.
// The 1000ms timer (last reset on spawn) therefore elapses while the
// caller is correctly back-pressured on the first frame.
const writePromise = encoder.writeFrame(Buffer.from([0]));
await Promise.resolve();

vi.advanceTimersByTime(1100);
expect(proc.kill).toHaveBeenCalledWith("SIGTERM");

proc.emit("close", null);
await expect(writePromise).resolves.toBe(false);
} finally {
vi.useRealTimers();
}
Expand Down
86 changes: 70 additions & 16 deletions packages/engine/src/services/streamingEncoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
* 1. Frame reorder buffer – ensures out-of-order parallel workers feed
* frames to FFmpeg stdin in sequential order.
* 2. Streaming FFmpeg encoder – spawns FFmpeg with `-f image2pipe` and
* exposes a `writeFrame(buffer)` + `close()` API.
* exposes an async `writeFrame(buffer)` + `close()` API.
*/

import { spawn, type ChildProcess } from "child_process";
import { once } from "events";
import { trackChildProcess } from "../utils/processTracker.js";
import { existsSync, mkdirSync, statSync } from "fs";
import { dirname } from "path";
Expand Down Expand Up @@ -126,7 +127,14 @@ export interface StreamingEncoderResult {
}

export interface StreamingEncoder {
writeFrame: (buffer: Buffer) => boolean;
/**
* Write one frame to FFmpeg stdin, awaiting `drain` when the pipe is full
* so back-pressure propagates to the caller. Resolves `false` when FFmpeg
* is already gone. Callers must serialize calls — one in-flight writeFrame
* per encoder (the frame reorder buffer provides this ordering); concurrent
* calls would interleave frame bytes on the pipe and race the drain wait.
*/
writeFrame: (buffer: Buffer) => Promise<boolean>;
close: () => Promise<StreamingEncoderResult>;
getExitStatus: () => "running" | "success" | "error";
}
Expand Down Expand Up @@ -448,9 +456,45 @@ export async function spawnStreamingEncoder(
};
resetTimer();

const waitForDrainOrExit = async (
stdin: NonNullable<ChildProcess["stdin"]>,
): Promise<"drain" | "exit"> => {
// Back-pressure can hit once per frame. Do not race `exitPromise.then(...)`
// here: V8 retains `.then` reaction-list entries on an unsettled promise,
// so a one-hour 30fps render under steady back-pressure can accumulate
// ~108K closures + AbortControllers. Use one-shot listeners for this write
// instead, then abort them in finally. `close` is the event that flips
// `exitStatus`; re-check after listener attachment so a close emitted
// between `stdin.write(false)` and this await cannot hang forever.
const abortController = new AbortController();
try {
const drainPromise = once(stdin, "drain", { signal: abortController.signal }).then(
() => "drain" as const,
);
const closePromise = once(ffmpeg, "close", { signal: abortController.signal }).then(
() => "exit" as const,
);
const racePromise = Promise.race([drainPromise, closePromise]).catch((err: unknown) => {
if (err instanceof Error && err.name === "AbortError") {
return "exit" as const;
}
throw err;
});

if (exitStatus !== "running") {
return "exit";
}

return await racePromise;
} finally {
abortController.abort();
}
};

const encoder: StreamingEncoder = {
writeFrame: (buffer: Buffer): boolean => {
if (exitStatus !== "running" || !ffmpeg.stdin || ffmpeg.stdin.destroyed) {
writeFrame: async (buffer: Buffer): Promise<boolean> => {
const stdin = ffmpeg.stdin;
if (exitStatus !== "running" || !stdin || stdin.destroyed) {
return false;
}
// Copy the buffer before writing — Node streams hold a reference to the
Expand All @@ -459,18 +503,28 @@ export async function spawnStreamingEncoder(
// so without this copy the pipe would read partially-overwritten data
// and flicker.
const copy = Buffer.from(buffer);
const accepted = ffmpeg.stdin.write(copy);
// Reset inactivity timer ONLY on `accepted === true`. `true` means the
// write went through to the kernel pipe without buffering in Node —
// proof FFmpeg is actually consuming. `false` means Node's writable
// stream had to buffer (FFmpeg hasn't drained the pipe yet); we deliberately
// don't reset on `false` so a hung FFmpeg with a still-producing Chrome
// can't keep us alive forever while Node's stdin buffer grows to OOM. In
// steady state with a slower-but-alive FFmpeg, writes alternate between
// true and false as the buffer drains and refills; the trues are enough
// to keep the heartbeat ticking.
if (accepted) resetTimer();
return accepted;
const accepted = stdin.write(copy);
// Reset inactivity timer immediately ONLY on `accepted === true`. `true`
// means the write went through to the kernel pipe without buffering in
// Node — proof FFmpeg is actually consuming. `false` means Node's writable
// stream had to buffer (FFmpeg hasn't drained the pipe yet); we await
// `drain` before letting callers produce the next frame, and only reset
// after drain proves consumption. We deliberately don't reset before
// drain so a hung FFmpeg with a still-producing Chrome can't keep us
// alive forever while Node's stdin buffer grows to OOM. If FFmpeg exits
// before draining, waitForDrainOrExit returns "exit", removes its
// one-shot listeners, and callers see `false` instead of hanging.
if (accepted) {
resetTimer();
return true;
}

const drainResult = await waitForDrainOrExit(stdin);
if (drainResult !== "drain" || exitStatus !== "running") {
return false;
}
resetTimer();
return true;
},

close: async (): Promise<StreamingEncoderResult> => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,19 @@ export async function captureTransitionFrameOnWorker(
}
}

// ─── Streaming-encoder write guard ──────────────────────────────────────────

/**
* Streaming-encoder writes report `false` when FFmpeg is already gone.
* Continuing to capture into a dead encoder wastes the rest of the render,
* so every frame loop stops with a frame-indexed error instead.
*/
export function ensureFrameWritten(frameWritten: boolean, frameIndex: number): void {
if (!frameWritten) {
throw new Error(`Streaming encoder exited before frame ${frameIndex} was written`);
}
}

// ─── HDR video raw-frame cleanup (sequential path only) ────────────────────

export function cleanupEndedHdrVideos(args: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import {
type LayeredTransitionBuffers,
captureTransitionFrameOnWorker,
distributeLayeredHybridFrameRanges,
ensureFrameWritten,
partitionTransitionFrames,
} from "./captureHdrFrameShared.js";
import { updateJobStatus } from "../shared.js";
Expand Down Expand Up @@ -185,7 +186,7 @@ export async function runHybridLayeredFrameLoop(input: HybridLoopInput): Promise
const writeEncoded = async (frameIdx: number, buf: Buffer): Promise<void> => {
await reorderBuffer.waitForFrame(frameIdx);
const writeStart = Date.now();
hdrEncoder.writeFrame(buf);
ensureFrameWritten(await hdrEncoder.writeFrame(buf), frameIdx);
addHdrTiming(hdrPerf, "encoderWriteMs", writeStart);
reorderBuffer.advanceTo(frameIdx + 1);
framesWritten += 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import { writeFileExclusiveSync } from "../shared.js";
import {
captureSceneIntoBuffer,
cleanupEndedHdrVideos,
ensureFrameWritten,
type LayeredTransitionBuffers,
} from "./captureHdrFrameShared.js";
import { updateJobStatus } from "../shared.js";
Expand Down Expand Up @@ -189,7 +190,7 @@ export async function runSequentialLayeredFrameLoop(input: SequentialLoopInput):
);
addHdrTiming(hdrPerf, "transitionCompositeMs", transitionTimingStart);
timingStart = Date.now();
hdrEncoder.writeFrame(transitionBuffers.output);
ensureFrameWritten(await hdrEncoder.writeFrame(transitionBuffers.output), i);
addHdrTiming(hdrPerf, "encoderWriteMs", timingStart);
} else {
if (hdrPerf) hdrPerf.normalFrames += 1;
Expand All @@ -206,7 +207,7 @@ export async function runSequentialLayeredFrameLoop(input: SequentialLoopInput):
);
}
timingStart = Date.now();
hdrEncoder.writeFrame(normalCanvas);
ensureFrameWritten(await hdrEncoder.writeFrame(normalCanvas), i);
addHdrTiming(hdrPerf, "encoderWriteMs", timingStart);
}

Expand Down
Loading
Loading