Skip to content

Commit 331bf2e

Browse files
committed
feat(supervisor): add BackpressureMonitor for dequeue gating
Cached, fail-open monitor that decides whether to skip dequeues based on a pluggable signal source. Disabled is a total no-op (no refresh, no reads). The hot-path read is synchronous and never performs I/O; every failure mode (source throws, returns null, or verdict goes stale) fails open.
1 parent 64151d6 commit 331bf2e

2 files changed

Lines changed: 232 additions & 0 deletions

File tree

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
2+
import { BackpressureMonitor, type BackpressureSignalSource } from "./backpressureMonitor.js";
3+
4+
function countingSource(verdict: { engaged: boolean } | null): {
5+
source: BackpressureSignalSource;
6+
reads: () => number;
7+
} {
8+
let reads = 0;
9+
return {
10+
source: {
11+
read: async () => {
12+
reads++;
13+
return verdict;
14+
},
15+
},
16+
reads: () => reads,
17+
};
18+
}
19+
20+
describe("BackpressureMonitor", () => {
21+
beforeEach(() => {
22+
vi.useFakeTimers();
23+
});
24+
25+
afterEach(() => {
26+
vi.useRealTimers();
27+
});
28+
29+
it("when disabled, never skips dequeue and never reads the signal source", () => {
30+
// Even though the source would report "engaged", a disabled monitor must be
31+
// a complete no-op: this is the backwards-compatibility guarantee.
32+
const { source, reads } = countingSource({ engaged: true });
33+
const monitor = new BackpressureMonitor({ enabled: false, source });
34+
35+
monitor.start();
36+
37+
expect(monitor.shouldSkipDequeue()).toBe(false);
38+
expect(reads()).toBe(0);
39+
40+
monitor.stop();
41+
});
42+
43+
it("when enabled and the source reports engaged, skips dequeue after a refresh", async () => {
44+
const { source } = countingSource({ engaged: true });
45+
const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 });
46+
47+
monitor.start();
48+
await vi.advanceTimersByTimeAsync(0); // flush the initial async read
49+
50+
expect(monitor.shouldSkipDequeue()).toBe(true);
51+
52+
monitor.stop();
53+
});
54+
55+
it("when enabled and the source reports clear, does not skip dequeue", async () => {
56+
const { source } = countingSource({ engaged: false });
57+
const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 });
58+
59+
monitor.start();
60+
await vi.advanceTimersByTimeAsync(0);
61+
62+
expect(monitor.shouldSkipDequeue()).toBe(false);
63+
64+
monitor.stop();
65+
});
66+
67+
it("fails open (stops skipping) when the source throws", async () => {
68+
let call = 0;
69+
const source: BackpressureSignalSource = {
70+
read: async () => {
71+
call++;
72+
if (call === 1) {
73+
return { engaged: true };
74+
}
75+
throw new Error("signal source unreachable");
76+
},
77+
};
78+
const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 });
79+
80+
monitor.start();
81+
await vi.advanceTimersByTimeAsync(0);
82+
expect(monitor.shouldSkipDequeue()).toBe(true); // engaged from the first read
83+
84+
await vi.advanceTimersByTimeAsync(1000); // next refresh throws
85+
expect(monitor.shouldSkipDequeue()).toBe(false); // fail-open: a dead source must not pin the brake
86+
87+
monitor.stop();
88+
});
89+
90+
it("fails open when the source reports unknown (null)", async () => {
91+
const { source } = countingSource(null);
92+
const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 });
93+
94+
monitor.start();
95+
await vi.advanceTimersByTimeAsync(0);
96+
97+
expect(monitor.shouldSkipDequeue()).toBe(false);
98+
99+
monitor.stop();
100+
});
101+
102+
it("fails open when the cached verdict goes stale (older than max age)", async () => {
103+
// Source stops updating (e.g. hangs) after the first read; the verdict ages out.
104+
const source: BackpressureSignalSource = {
105+
read: async () => ({ engaged: true, ts: Date.now() }),
106+
};
107+
const monitor = new BackpressureMonitor({
108+
enabled: true,
109+
source,
110+
refreshIntervalMs: 1_000_000, // effectively only the initial read fires
111+
maxVerdictAgeMs: 15_000,
112+
});
113+
114+
monitor.start();
115+
await vi.advanceTimersByTimeAsync(0);
116+
expect(monitor.shouldSkipDequeue()).toBe(true);
117+
118+
await vi.advanceTimersByTimeAsync(15_001); // verdict now older than max age
119+
expect(monitor.shouldSkipDequeue()).toBe(false);
120+
121+
monitor.stop();
122+
});
123+
124+
it("does not read the source on the hot path (reads are driven by the refresh tick)", async () => {
125+
const { source, reads } = countingSource({ engaged: true });
126+
const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 });
127+
128+
monitor.start();
129+
await vi.advanceTimersByTimeAsync(0);
130+
expect(reads()).toBe(1); // just the initial refresh
131+
132+
for (let i = 0; i < 1000; i++) {
133+
monitor.shouldSkipDequeue();
134+
}
135+
136+
expect(reads()).toBe(1); // hot-path calls performed zero I/O
137+
138+
monitor.stop();
139+
});
140+
141+
it("stops refreshing after stop()", async () => {
142+
const { source, reads } = countingSource({ engaged: true });
143+
const monitor = new BackpressureMonitor({ enabled: true, source, refreshIntervalMs: 1000 });
144+
145+
monitor.start();
146+
await vi.advanceTimersByTimeAsync(0);
147+
const readsAtStop = reads();
148+
149+
monitor.stop();
150+
await vi.advanceTimersByTimeAsync(5000);
151+
152+
expect(reads()).toBe(readsAtStop);
153+
});
154+
});
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
export type BackpressureVerdict = {
2+
engaged: boolean;
3+
/** Epoch ms the verdict was produced. Used for consumer-side staleness fail-open. */
4+
ts?: number;
5+
};
6+
7+
/**
8+
* Source of the current backpressure verdict. `read()` returns `null` when the
9+
* verdict is unknown (missing/unreadable) - the monitor treats unknown as
10+
* "not engaged" (fail-open).
11+
*/
12+
export interface BackpressureSignalSource {
13+
read(): Promise<BackpressureVerdict | null>;
14+
}
15+
16+
export type BackpressureMonitorOptions = {
17+
enabled: boolean;
18+
source: BackpressureSignalSource;
19+
refreshIntervalMs?: number;
20+
/**
21+
* If set, a cached verdict older than this is treated as unknown (fail-open).
22+
* Guards against the source silently going stale (e.g. hanging reads).
23+
*/
24+
maxVerdictAgeMs?: number;
25+
};
26+
27+
const DEFAULT_REFRESH_INTERVAL_MS = 1000;
28+
29+
export class BackpressureMonitor {
30+
private verdict: BackpressureVerdict | null = null;
31+
private timer?: ReturnType<typeof setInterval>;
32+
33+
constructor(private readonly opts: BackpressureMonitorOptions) {}
34+
35+
start(): void {
36+
if (!this.opts.enabled) {
37+
return;
38+
}
39+
40+
void this.refresh();
41+
this.timer = setInterval(
42+
() => void this.refresh(),
43+
this.opts.refreshIntervalMs ?? DEFAULT_REFRESH_INTERVAL_MS
44+
);
45+
}
46+
47+
stop(): void {
48+
if (this.timer) {
49+
clearInterval(this.timer);
50+
this.timer = undefined;
51+
}
52+
}
53+
54+
/** Hot-path read: synchronous, never performs I/O. */
55+
shouldSkipDequeue(): boolean {
56+
const verdict = this.verdict;
57+
if (verdict?.engaged !== true) {
58+
return false;
59+
}
60+
61+
const maxAge = this.opts.maxVerdictAgeMs;
62+
if (maxAge !== undefined && verdict.ts !== undefined && Date.now() - verdict.ts > maxAge) {
63+
return false;
64+
}
65+
66+
return true;
67+
}
68+
69+
private async refresh(): Promise<void> {
70+
try {
71+
this.verdict = await this.opts.source.read();
72+
} catch {
73+
// Fail-open: a dead/unreachable source must never pin the brake. Treat as
74+
// unknown (no verdict) so dequeue resumes as if backpressure were off.
75+
this.verdict = null;
76+
}
77+
}
78+
}

0 commit comments

Comments
 (0)