Skip to content

Commit 1022923

Browse files
committed
fix(mollifier): keep drainer loop alive across transient redis errors
processOneFromEnv now catches buffer.pop() failures so one env's hiccup doesn't reject Promise.all and bubble up to the loop's outer catch. The polling loop itself wraps each runOnce in try/catch and backs off with capped exponential delay (up to 5s) instead of exiting permanently on the first listEnvs/pop error. Stop semantics are unchanged: only the stopping flag breaks the loop. Adds two regression tests using a stub buffer (no Redis container) so fault injection is deterministic.
1 parent d75c10a commit 1022923

3 files changed

Lines changed: 150 additions & 7 deletions

File tree

.changeset/mollifier-redis-worker-primitives.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,5 @@
33
---
44

55
Add MollifierBuffer (with `accept`, `pop`, `ack`, `requeue`, `fail`, and `evaluateTrip`) and MollifierDrainer primitives for trigger burst smoothing. `evaluateTrip` is an atomic Lua sliding-window trip evaluator used by the webapp gate to detect per-env trigger bursts. Phase 1 wires MollifierBuffer dual-write monitoring alongside the real trigger path and runs MollifierDrainer's pop/ack loop end-to-end with a no-op handler; full buffering and replayed drainer-side triggers land in later phases.
6+
7+
MollifierDrainer's polling loop now survives transient Redis errors. `processOneFromEnv` catches `buffer.pop()` failures so one env's hiccup doesn't poison the rest of the batch, and the loop wraps each `runOnce` in a try/catch with capped exponential backoff (up to 5s) instead of dying permanently on the first `listEnvs`/`pop` error.

packages/redis-worker/src/mollifier/drainer.test.ts

Lines changed: 110 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { redisTest } from "@internal/testcontainers";
2-
import { describe, expect, vi } from "vitest";
2+
import { describe, expect, it, vi } from "vitest";
33
import { Logger } from "@trigger.dev/core/logger";
44
import { MollifierBuffer } from "./buffer.js";
55
import { MollifierDrainer } from "./drainer.js";
@@ -217,6 +217,115 @@ describe("MollifierDrainer error handling", () => {
217217
});
218218
});
219219

220+
// Transient Redis errors used to permanently kill the loop because
221+
// `processOneFromEnv` didn't catch `buffer.pop()` rejections — the error
222+
// bubbled through `Promise.all` → `runOnce` → `loop`'s outer catch and
223+
// left `isRunning = false`. These tests use a stubbed buffer (no Redis
224+
// container) so we can deterministically inject failures from `listEnvs`
225+
// and `pop` without racing against a real client.
226+
describe("MollifierDrainer resilience to transient buffer errors", () => {
227+
type StubBuffer = Partial<MollifierBuffer> & { [K in keyof MollifierBuffer]?: any };
228+
229+
function makeStubBuffer(overrides: StubBuffer): MollifierBuffer {
230+
const base: StubBuffer = {
231+
listEnvs: async () => [],
232+
pop: async () => null,
233+
ack: async () => {},
234+
requeue: async () => {},
235+
fail: async () => true,
236+
getEntry: async () => null,
237+
close: async () => {},
238+
};
239+
return { ...base, ...overrides } as unknown as MollifierBuffer;
240+
}
241+
242+
it("survives a transient listEnvs failure and resumes draining", async () => {
243+
let listCalls = 0;
244+
const popped: string[] = [];
245+
const buffer = makeStubBuffer({
246+
listEnvs: async () => {
247+
listCalls += 1;
248+
if (listCalls === 1) {
249+
throw new Error("simulated redis blip");
250+
}
251+
return ["env_a"];
252+
},
253+
pop: async () => {
254+
const runId = `run_${popped.length + 1}`;
255+
if (popped.length >= 2) return null;
256+
popped.push(runId);
257+
return {
258+
runId,
259+
envId: "env_a",
260+
orgId: "org_1",
261+
payload: "{}",
262+
attempts: 0,
263+
createdAt: new Date(),
264+
} as any;
265+
},
266+
});
267+
268+
const handled: string[] = [];
269+
const drainer = new MollifierDrainer({
270+
buffer,
271+
handler: async (input) => {
272+
handled.push(input.runId);
273+
},
274+
concurrency: 1,
275+
maxAttempts: 3,
276+
isRetryable: () => false,
277+
pollIntervalMs: 20,
278+
logger: new Logger("test-drainer", "log"),
279+
});
280+
281+
drainer.start();
282+
const deadline = Date.now() + 3_000;
283+
while (handled.length < 2 && Date.now() < deadline) {
284+
await new Promise((r) => setTimeout(r, 20));
285+
}
286+
await drainer.stop({ timeoutMs: 1_000 });
287+
288+
expect(handled).toEqual(["run_1", "run_2"]);
289+
expect(listCalls).toBeGreaterThan(1);
290+
});
291+
292+
it("a pop failure for one env doesn't poison the rest of the batch", async () => {
293+
const buffer = makeStubBuffer({
294+
listEnvs: async () => ["bad", "good"],
295+
pop: async (envId: string) => {
296+
if (envId === "bad") {
297+
throw new Error("simulated pop failure on bad env");
298+
}
299+
return {
300+
runId: "run_good",
301+
envId: "good",
302+
orgId: "org_1",
303+
payload: "{}",
304+
attempts: 0,
305+
createdAt: new Date(),
306+
} as any;
307+
},
308+
});
309+
310+
const handled: string[] = [];
311+
const drainer = new MollifierDrainer({
312+
buffer,
313+
handler: async (input) => {
314+
handled.push(input.runId);
315+
},
316+
concurrency: 5,
317+
maxAttempts: 3,
318+
isRetryable: () => false,
319+
logger: new Logger("test-drainer", "log"),
320+
});
321+
322+
const result = await drainer.runOnce();
323+
expect(result.drained).toBe(1);
324+
expect(result.failed).toBe(1);
325+
expect(handled).toEqual(["run_good"]);
326+
});
327+
});
328+
220329
describe("MollifierDrainer.start/stop", () => {
221330
redisTest("start polls and processes, stop halts the loop", { timeout: 20_000 }, async ({ redisContainer }) => {
222331
const buffer = new MollifierBuffer({

packages/redis-worker/src/mollifier/drainer.ts

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,21 +90,43 @@ export class MollifierDrainer<TPayload = unknown> {
9090
}
9191
}
9292

93+
// Transient Redis errors (e.g. a connection blip in `listEnvs` or `pop`)
94+
// must not kill the polling loop permanently. We log each `runOnce`
95+
// failure, back off so we don't spin tight on a sustained outage, and
96+
// resume. The loop only exits when `stop()` flips `stopping`.
9397
private async loop(): Promise<void> {
9498
try {
99+
let consecutiveErrors = 0;
95100
while (!this.stopping) {
96-
const result = await this.runOnce();
97-
if (result.drained === 0 && result.failed === 0) {
98-
await this.delay(this.pollIntervalMs);
101+
try {
102+
const result = await this.runOnce();
103+
consecutiveErrors = 0;
104+
if (result.drained === 0 && result.failed === 0) {
105+
await this.delay(this.pollIntervalMs);
106+
}
107+
} catch (err) {
108+
consecutiveErrors += 1;
109+
this.logger.error("MollifierDrainer.runOnce failed; backing off", {
110+
err,
111+
consecutiveErrors,
112+
});
113+
await this.delay(this.backoffMs(consecutiveErrors));
99114
}
100115
}
101-
} catch (err) {
102-
this.logger.error("MollifierDrainer loop crashed", { err });
103116
} finally {
104117
this.isRunning = false;
105118
}
106119
}
107120

121+
// Exponential backoff capped at 5s. Keeps the loop responsive after a
122+
// brief blip while preventing a tight retry loop during a long Redis
123+
// outage. 1 → 200ms, 2 → 400ms, 3 → 800ms, 4 → 1.6s, 5 → 3.2s, 6+ → 5s.
124+
private backoffMs(consecutiveErrors: number): number {
125+
const base = Math.max(this.pollIntervalMs, 100);
126+
const capped = Math.min(base * 2 ** (consecutiveErrors - 1), 5_000);
127+
return capped;
128+
}
129+
108130
private delay(ms: number): Promise<void> {
109131
return new Promise((resolve) => setTimeout(resolve, ms));
110132
}
@@ -115,8 +137,18 @@ export class MollifierDrainer<TPayload = unknown> {
115137
return [...envs.slice(start), ...envs.slice(0, start)];
116138
}
117139

140+
// A `pop()` failure for one env (e.g. a Redis hiccup mid-batch) must not
141+
// poison the rest of the batch — `Promise.all` would otherwise reject and
142+
// bubble all the way to `loop()`. Catch here so the failed env is just
143+
// counted as "failed" for this tick and we move on.
118144
private async processOneFromEnv(envId: string): Promise<"drained" | "failed" | "empty"> {
119-
const entry = await this.buffer.pop(envId);
145+
let entry: BufferEntry | null;
146+
try {
147+
entry = await this.buffer.pop(envId);
148+
} catch (err) {
149+
this.logger.error("MollifierDrainer.pop failed", { envId, err });
150+
return "failed";
151+
}
120152
if (!entry) return "empty";
121153
return this.processEntry(entry);
122154
}

0 commit comments

Comments
 (0)