Skip to content

Commit 1b82875

Browse files
committed
feat(webapp): add delivery-lag and health metrics to the realtime backend
New metrics covering the questions an operator asks during rollout: an end-to-end delivery-lag histogram per emission path, a backstop outcome split (sustained delivered counts mean live wakes are being missed), publish success/failure and coalesce-ratio counters on the pub/sub side, held-feed and active-environment gauges as the capacity signal, concurrency-limiter rejections, replay-buffer evictions split by cause, and a rows-per-emission histogram that exposes full-set re-emission regressions.
1 parent 46a0cbb commit 1b82875

7 files changed

Lines changed: 162 additions & 11 deletions

apps/webapp/app/services/realtime/envChangeRouter.server.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ export type EnvChangeRouterOptions = {
4848
unsubscribeLingerMs?: number;
4949
/** Observability: a replay scan found candidates and delivered rows (or none survived). */
5050
onReplay?: (result: "delivered" | "empty") => void;
51+
/** Observability: a buffered record was evicted. `cap` evictions mean the env churns more
52+
* runs inside the window than the buffer holds (the replay guarantee is degrading). */
53+
onReplayEviction?: (reason: "cap" | "window") => void;
5154
};
5255

5356
const DEFAULT_REPLAY_WINDOW_MS = 2_000;
@@ -210,6 +213,17 @@ export class EnvChangeRouter {
210213
return this.#envs.size;
211214
}
212215

216+
/**
 * Currently-held feeds by kind (for metrics) — the system's capacity unit.
 *
 * Walks every environment state in `#envs` and each entry of its `feeds` collection,
 * tallying the feed under its `filter.kind`. A fresh counts object is built on every
 * access, so the cost is proportional to the total number of feeds held.
 *
 * @returns The number of feeds per kind (`run`, `tag`, `batch`); all zero when none are held.
 */
217+
get heldFeedCounts(): { run: number; tag: number; batch: number } {
218+
const counts = { run: 0, tag: 0, batch: 0 };
219+
for (const env of this.#envs.values()) {
220+
for (const feed of env.feeds) {
221+
counts[feed.filter.kind]++;
222+
}
223+
}
224+
return counts;
225+
}
226+
213227
#ensureEnv(environmentId: string): EnvState {
214228
const existing = this.#envs.get(environmentId);
215229
if (existing) {
@@ -282,6 +296,7 @@ export class EnvChangeRouter {
282296
if (entry.receivedAtMs >= cutoff && env.recent.size <= maxRuns) {
283297
break;
284298
}
299+
this.options.onReplayEviction?.(entry.receivedAtMs < cutoff ? "window" : "cap");
285300
env.recent.delete(runId);
286301
}
287302
}

apps/webapp/app/services/realtime/nativeRealtimeClient.server.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,14 @@ export type NativeRealtimeClientOptions = {
116116
onRunSetQuery?: (stage: "resolve" | "hydrate", ms: number) => void;
117117
/** Observability hook: a fresh resolve waited `ms` for an admission permit (only when the gate engaged). */
118118
onResolveAdmissionWait?: (ms: number) => void;
119+
/** Observability hook: a live emission left the server. `lagMs` is the current time minus the newest
120+
* emitted row's updatedAt (the end-to-end delivery SLI); `rowCount` is the number of rows in the emitted delta. */
121+
onEmit?: (path: LivePollPath, lagMs: number, rowCount: number) => void;
122+
/** Observability hook: a backstop resolve found missed changes (delivered) or nothing
123+
* (empty). Sustained `delivered` means the notify/replay path is leaking. */
124+
onBackstopResult?: (result: "delivered" | "empty") => void;
125+
/** Observability hook: a poll was rejected by the per-env concurrency limiter (429). */
126+
onConcurrencyRejected?: () => void;
119127
};
120128

121129
const DEFAULT_CONCURRENCY_LIMIT = 100_000;
@@ -414,6 +422,7 @@ export class NativeRealtimeClient implements RealtimeStreamClient {
414422
const probed = await this.options.runReader.getRunById(environment.id, runId);
415423
if (probed && probed.updatedAt.getTime() > lastSeenMs) {
416424
const seq = this.#nextSeq();
425+
this.options.onEmit?.("cold-resolve", Date.now() - probed.updatedAt.getTime(), 1);
417426
return this.#buildResponse(
418427
buildUpdateBody(probed, skipColumns),
419428
apiVersion,
@@ -450,6 +459,7 @@ export class NativeRealtimeClient implements RealtimeStreamClient {
450459
const updatedAtMs = matched.row.updatedAt.getTime();
451460
if (updatedAtMs > lastSeenMs) {
452461
const seq = this.#nextSeq();
462+
this.options.onEmit?.("fast-hydrate", Date.now() - updatedAtMs, 1);
453463
return this.#buildResponse(
454464
buildRowsBodyFromSerialized([
455465
{ runId: matched.row.id, value: matched.value, operation: "update" },
@@ -469,6 +479,8 @@ export class NativeRealtimeClient implements RealtimeStreamClient {
469479
const row = await this.options.runReader.getRunById(environment.id, runId);
470480
const seq = this.#nextSeq();
471481
if (row && row.updatedAt.getTime() > lastSeenMs) {
482+
this.options.onBackstopResult?.("delivered");
483+
this.options.onEmit?.("full-resolve", Date.now() - row.updatedAt.getTime(), 1);
472484
return this.#buildResponse(
473485
buildUpdateBody(row, skipColumns),
474486
apiVersion,
@@ -480,6 +492,7 @@ export class NativeRealtimeClient implements RealtimeStreamClient {
480492
}
481493
);
482494
}
495+
this.options.onBackstopResult?.("empty");
483496
return this.#buildResponse(buildUpToDateBody(), apiVersion, clientVersion, {
484497
offset,
485498
handle,
@@ -601,6 +614,7 @@ export class NativeRealtimeClient implements RealtimeStreamClient {
601614
this.#workingSetCache.set(workingSetKey, touched);
602615
prevSeen = touched;
603616
if (changes.length > 0) {
617+
this.options.onEmit?.("cold-resolve", Date.now() - maxUpdatedAt, changes.length);
604618
return emitFromRows(changes, maxUpdatedAt);
605619
}
606620
continue; // nothing was missed — hold as usual
@@ -633,6 +647,7 @@ export class NativeRealtimeClient implements RealtimeStreamClient {
633647
prevSeen = merged;
634648

635649
if (changes.length > 0) {
650+
this.options.onEmit?.("fast-hydrate", Date.now() - maxUpdatedAt, changes.length);
636651
return emitFromSerialized(changes, maxUpdatedAt);
637652
}
638653
// Matched but no row advanced (already seen). Keep holding.
@@ -655,10 +670,13 @@ export class NativeRealtimeClient implements RealtimeStreamClient {
655670
prevSeen = touched;
656671

657672
if (changes.length > 0) {
673+
this.options.onBackstopResult?.("delivered");
674+
this.options.onEmit?.("full-resolve", Date.now() - maxUpdatedAt, changes.length);
658675
return emitFromRows(changes, maxUpdatedAt);
659676
}
660677
// Empty backstop diff: the timeout returns up-to-date. (holdOnEmpty never reaches
661678
// here on a notify — those are handled in the fast path above.)
679+
this.options.onBackstopResult?.("empty");
662680
return emitUpToDate(maxUpdatedAt);
663681
}
664682
} finally {
@@ -946,6 +964,7 @@ export class NativeRealtimeClient implements RealtimeStreamClient {
946964
);
947965

948966
if (!canProceed) {
967+
this.options.onConcurrencyRejected?.();
949968
return json({ error: "Too many concurrent requests" }, { status: 429 });
950969
}
951970

apps/webapp/app/services/realtime/nativeRealtimeClientInstance.server.ts

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,41 @@ function initializeNativeRealtimeClient(): NativeRealtimeClient {
6363
registers: [metricsRegister],
6464
});
6565

66+
const deliveryLagMs = new Histogram({
67+
name: "realtime_native_delivery_lag_ms",
68+
help: "Live emissions: now minus the newest emitted row's updatedAt (PG clock vs app clock, so approximate). The end-to-end delivery SLI — a p99 near the backstop hold means wakes are being missed.",
69+
labelNames: ["path"] as const,
70+
buckets: [5, 10, 25, 50, 100, 250, 500, 1_000, 2_500, 5_000, 10_000, 30_000],
71+
registers: [metricsRegister],
72+
});
73+
74+
const emittedRows = new Histogram({
75+
name: "realtime_native_emitted_rows",
76+
help: "Rows per live emission. Deltas should be small; a fat tail means working-set/offset-floor fallbacks are re-emitting full sets.",
77+
buckets: [1, 2, 5, 10, 25, 50, 100, 250, 1_000],
78+
registers: [metricsRegister],
79+
});
80+
81+
const backstops = new Counter({
82+
name: "realtime_native_backstop_total",
83+
help: "Backstop full resolves by outcome. 'empty' is normal idle behavior; sustained 'delivered' means the notify/replay path missed changes — alert on it.",
84+
labelNames: ["result"] as const,
85+
registers: [metricsRegister],
86+
});
87+
88+
const concurrencyRejections = new Counter({
89+
name: "realtime_native_concurrency_rejections_total",
90+
help: "Polls rejected (429) by the per-env concurrency limiter.",
91+
registers: [metricsRegister],
92+
});
93+
94+
const replayEvictions = new Counter({
95+
name: "realtime_native_replay_evictions_total",
96+
help: "Replay-buffer evictions. 'window' expiry is normal; 'cap' means an env churns more runs inside the window than the buffer holds (replay guarantee degrading — retune the knobs).",
97+
labelNames: ["reason"] as const,
98+
registers: [metricsRegister],
99+
});
100+
66101
const limiter = new RealtimeConcurrencyLimiter({
67102
keyPrefix: "tr:realtime:native:concurrency",
68103
redis: {
@@ -90,6 +125,7 @@ function initializeNativeRealtimeClient(): NativeRealtimeClient {
90125
replayMaxRunsPerEnv: env.REALTIME_BACKEND_NATIVE_REPLAY_MAX_RUNS,
91126
unsubscribeLingerMs: env.REALTIME_BACKEND_NATIVE_UNSUBSCRIBE_LINGER_MS,
92127
onReplay: (result) => replays.inc({ result }),
128+
onReplayEviction: (reason) => replayEvictions.inc({ reason }),
93129
});
94130

95131
const client = new NativeRealtimeClient({
@@ -128,6 +164,12 @@ function initializeNativeRealtimeClient(): NativeRealtimeClient {
128164
onRunSetResolve: (result) => runSetResolves.inc({ result }),
129165
onRunSetQuery: (stage, ms) => runSetQueryMs.observe({ stage }, ms),
130166
onResolveAdmissionWait: () => resolveAdmissionWaits.inc(),
167+
onEmit: (path, lagMs, rowCount) => {
168+
deliveryLagMs.observe({ path }, Math.max(lagMs, 0));
169+
emittedRows.observe(rowCount);
170+
},
171+
onBackstopResult: (result) => backstops.inc({ result }),
172+
onConcurrencyRejected: () => concurrencyRejections.inc(),
131173
});
132174

133175
new Gauge({
@@ -148,6 +190,28 @@ function initializeNativeRealtimeClient(): NativeRealtimeClient {
148190
},
149191
});
150192

193+
new Gauge({
194+
name: "realtime_native_held_feeds",
195+
help: "Long-polls currently held, by feed kind — the system's capacity unit.",
196+
labelNames: ["kind"] as const,
197+
registers: [metricsRegister],
198+
collect() {
199+
const counts = router.heldFeedCounts;
200+
this.set({ kind: "run" }, counts.run);
201+
this.set({ kind: "tag" }, counts.tag);
202+
this.set({ kind: "batch" }, counts.batch);
203+
},
204+
});
205+
206+
new Gauge({
207+
name: "realtime_native_active_envs",
208+
help: "Environments currently routed on this instance (held feeds + lingering subscriptions).",
209+
registers: [metricsRegister],
210+
collect() {
211+
this.set(router.activeEnvCount);
212+
},
213+
});
214+
151215
return client;
152216
}
153217

apps/webapp/app/services/realtime/runChangeNotifier.server.ts

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ export type RunChangeNotifierOptions = {
6060
envWakeCoalesceWindowMs?: number;
6161
/** Use Redis sharded pub/sub (SSUBSCRIBE/SPUBLISH); cluster-only and requires `clusterOptions.shardedSubscribers`. Defaults to false (classic). */
6262
shardedPubSub?: boolean;
63+
/** Observability hook: a publish settled (ok) or failed (the leading degradation signal). */
64+
onPublishResult?: (ok: boolean) => void;
65+
/** Observability hook: a raw channel message arrived (pre-coalesce). */
66+
onMessageReceived?: () => void;
67+
/** Observability hook: a coalesced batch was delivered to listeners (records per batch). */
68+
onBatchDelivered?: (recordCount: number) => void;
6369
};
6470

6571
const DEFAULT_CHANNEL_PREFIX = "realtime:";
@@ -115,15 +121,22 @@ export class RunChangeNotifier {
115121
const result = this.#sharded
116122
? publisher.spublish(channel, payload)
117123
: publisher.publish(channel, payload);
118-
if (typeof (result as Promise<number>)?.catch === "function") {
119-
(result as Promise<number>).catch((error) => {
120-
logger.error("[runChangeNotifier] Failed to publish run-changed notification", {
121-
error,
122-
channel,
123-
});
124-
});
124+
if (typeof (result as Promise<number>)?.then === "function") {
125+
(result as Promise<number>).then(
126+
() => this.options.onPublishResult?.(true),
127+
(error) => {
128+
this.options.onPublishResult?.(false);
129+
logger.error("[runChangeNotifier] Failed to publish run-changed notification", {
130+
error,
131+
channel,
132+
});
133+
}
134+
);
135+
} else {
136+
this.options.onPublishResult?.(true);
125137
}
126138
} catch (error) {
139+
this.options.onPublishResult?.(false);
127140
logger.error("[runChangeNotifier] Failed to publish run-changed notification", {
128141
error,
129142
channel,
@@ -241,6 +254,7 @@ export class RunChangeNotifier {
241254
}
242255

243256
#onMessage(channel: string, message: string) {
257+
this.options.onMessageReceived?.();
244258
// Accumulate the decoded record (deduped by runId) before delivering, so a coalesced
245259
// wake carries every run that moved during the window.
246260
this.#addPending(channel, decodeChangeRecord(message));
@@ -274,6 +288,7 @@ export class RunChangeNotifier {
274288
if (!listeners || batch.length === 0) {
275289
return;
276290
}
291+
this.options.onBatchDelivered?.(batch.length);
277292
for (const onBatch of [...listeners]) {
278293
onBatch(batch);
279294
}

apps/webapp/app/services/realtime/runChangeNotifierInstance.server.ts

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Gauge } from "prom-client";
1+
import { Counter, Gauge } from "prom-client";
22
import { env } from "~/env.server";
33
import { metricsRegister } from "~/metrics.server";
44
import { singleton } from "~/utils/singleton";
@@ -16,6 +16,25 @@ function initializeRunChangeNotifier(): RunChangeNotifier {
1616
// broadcast every message to every node, so this is what actually shards load.
1717
const shardedPubSub = clusterMode && env.REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_SHARDED_ENABLED === "1";
1818

19+
const publishes = new Counter({
20+
name: "realtime_run_change_notifier_publishes_total",
21+
help: "Change-record publishes by outcome. Failures are the leading indicator that feeds are degrading to their backstops (pub/sub Redis trouble).",
22+
labelNames: ["result"] as const,
23+
registers: [metricsRegister],
24+
});
25+
26+
const received = new Counter({
27+
name: "realtime_run_change_notifier_messages_received_total",
28+
help: "Raw channel messages received by this instance's subscriber, pre-coalesce.",
29+
registers: [metricsRegister],
30+
});
31+
32+
const delivered = new Counter({
33+
name: "realtime_run_change_notifier_batches_delivered_total",
34+
help: "Coalesced batches delivered to listeners. received/batches = the coalesce ratio (how hard a busy env is being collapsed).",
35+
registers: [metricsRegister],
36+
});
37+
1938
const notifier = new RunChangeNotifier({
2039
redis: {
2140
host: env.REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_HOST,
@@ -29,6 +48,9 @@ function initializeRunChangeNotifier(): RunChangeNotifier {
2948
},
3049
envWakeCoalesceWindowMs: env.REALTIME_BACKEND_NATIVE_ENV_WAKE_COALESCE_WINDOW_MS,
3150
shardedPubSub,
51+
onPublishResult: (ok) => publishes.inc({ result: ok ? "ok" : "error" }),
52+
onMessageReceived: () => received.inc(),
53+
onBatchDelivered: () => delivered.inc(),
3254
});
3355

3456
new Gauge({

apps/webapp/test/realtime/envChangeRouter.test.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,11 @@ describe("EnvChangeRouter", () => {
317317
["r2", row("r2")],
318318
["r3", row("r3")],
319319
]);
320-
const { router, src, hydrateSpy } = makeRouter(rows, { replayMaxRunsPerEnv: 2 });
320+
const evictions: string[] = [];
321+
const { router, src, hydrateSpy } = makeRouter(rows, {
322+
replayMaxRunsPerEnv: 2,
323+
onReplayEviction: (reason: string) => evictions.push(reason),
324+
});
321325
const reg = router.register("env_1", { kind: "batch", batchId: "batch_1" }, []);
322326
src.push("env_1", [
323327
record("r1", { batchId: "batch_1" }),
@@ -329,6 +333,7 @@ describe("EnvChangeRouter", () => {
329333
expect(result.reason).toBe("notify");
330334
// r1 was evicted by the cap; only the newest two replay.
331335
expect(hydrateSpy).toHaveBeenCalledWith("env_1", ["r2", "r3"], []);
336+
expect(evictions).toEqual(["cap"]);
332337
reg.close();
333338
});
334339
});

apps/webapp/test/realtime/nativeHoldOnEmpty.test.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,10 @@ const isUpToDate = (body: Awaited<ReturnType<typeof bodyOf>>) =>
120120

121121
describe("NativeRealtimeClient multi-run live path over the router", () => {
122122
it("a matching change hydrates by id (no ClickHouse) and returns a delta", async () => {
123-
const { client, src, hydrateSpy, resolveSpy, setRows } = makeClient();
123+
const emits: Array<[string, number, number]> = [];
124+
const { client, src, hydrateSpy, resolveSpy, setRows } = makeClient({
125+
onEmit: (path: string, lagMs: number, rows: number) => emits.push([path, lagMs, rows]),
126+
});
124127
setRows([row("run_1", FLOOR_MS + 5_000, { tags: ["t"] })]);
125128

126129
const responsePromise = liveRuns(client);
@@ -132,6 +135,9 @@ describe("NativeRealtimeClient multi-run live path over the router", () => {
132135
expect(hasRowOp(await bodyOf(res))).toBe(true);
133136
expect(resolveSpy).not.toHaveBeenCalled(); // ClickHouse skipped
134137
expect(hydrateSpy).toHaveBeenCalledWith("env_1", ["run_1"], expect.anything());
138+
expect(emits).toHaveLength(1);
139+
expect(emits[0][0]).toBe("fast-hydrate");
140+
expect(emits[0][2]).toBe(1); // one delta row
135141
});
136142

137143
it("a change that doesn't match the filter never wakes the feed (no CH, no PG); a later match does", async () => {
@@ -175,11 +181,16 @@ describe("NativeRealtimeClient multi-run live path over the router", () => {
175181
});
176182

177183
it("the backstop timeout does a full ClickHouse resolve and returns up-to-date", async () => {
178-
const { client, resolveSpy } = makeClient({ livePollTimeoutMs: 50 });
184+
const backstopResults: string[] = [];
185+
const { client, resolveSpy } = makeClient({
186+
livePollTimeoutMs: 50,
187+
onBackstopResult: (r: string) => backstopResults.push(r),
188+
});
179189
const res = await liveRuns(client); // never pushed -> backstop fires
180190
expect(res.status).toBe(200);
181191
expect(isUpToDate(await bodyOf(res))).toBe(true);
182192
expect(resolveSpy).toHaveBeenCalled();
193+
expect(backstopResults).toEqual(["empty"]);
183194
});
184195

185196
it("a cold env registration resolves immediately instead of holding blind", async () => {

0 commit comments

Comments
 (0)