Skip to content

Commit 92d0841

Browse files
committed
fix(redis-worker): clear MollifierDrainer.stop() timeout timer when loop wins the race
The Promise.race between this.loopPromise and this.delay(timeoutMs) discarded the timeout's underlying setTimeout handle whenever the loop branch won. The discarded timer was still ref'd by libuv and pinned the Node event loop alive for the remainder of `timeoutMs` — exactly the shutdown slack the timeout was supposed to bound. Inline the timer in stop() with a captured handle and clearTimeout() it in a finally block, so every exit path (loop-won, timeout-won, throw) releases the ref. The in-loop delay() calls are unchanged — they're awaited normally and their timers fire-and-clear themselves.
1 parent ee474b5 commit 92d0841

1 file changed

Lines changed: 23 additions & 9 deletions

File tree

packages/redis-worker/src/mollifier/drainer.ts

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -130,16 +130,30 @@ export class MollifierDrainer<TPayload = unknown> {
130130
await this.loopPromise;
131131
return;
132132
}
133+
// Hold the timer handle so we can clearTimeout() it after the race.
134+
// Without this, when the loop wins the race, the discarded timer is
135+
// still ref'd and pins the Node event loop for up to `timeoutMs`,
136+
// delaying process shutdown by exactly the slack we were trying to
137+
// bound. try/finally clears the handle in every exit path (loop-won,
138+
// timeout-won, or exception).
133139
const timeoutSentinel = Symbol("mollifier.stop.timeout");
134-
const winner = await Promise.race([
135-
this.loopPromise.then(() => "done" as const),
136-
this.delay(options.timeoutMs).then(() => timeoutSentinel),
137-
]);
138-
if (winner === timeoutSentinel) {
139-
this.logger.warn(
140-
"MollifierDrainer.stop: deadline exceeded; returning while loop iteration is in flight",
141-
{ timeoutMs: options.timeoutMs },
142-
);
140+
let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
141+
const timeoutPromise = new Promise<typeof timeoutSentinel>((resolve) => {
142+
timeoutHandle = setTimeout(() => resolve(timeoutSentinel), options.timeoutMs);
143+
});
144+
try {
145+
const winner = await Promise.race([
146+
this.loopPromise.then(() => "done" as const),
147+
timeoutPromise,
148+
]);
149+
if (winner === timeoutSentinel) {
150+
this.logger.warn(
151+
"MollifierDrainer.stop: deadline exceeded; returning while loop iteration is in flight",
152+
{ timeoutMs: options.timeoutMs },
153+
);
154+
}
155+
} finally {
156+
if (timeoutHandle) clearTimeout(timeoutHandle);
143157
}
144158
}
145159

0 commit comments

Comments
 (0)