Skip to content

Commit 337a4f0

Browse files
committed
fix(webapp): emit realtime backend metrics through OpenTelemetry
The realtime backend's metrics were registered with the in-process Prometheus registry, which is no longer how the webapp ships metrics. They now emit through the OpenTelemetry meter (realtime_native.*, realtime_notifier.*, realtime_shadow.*) so they flow through the internal metrics exporter like the rest of the webapp's instrumentation. Gauges become observable gauges sampled at export time; names move from prom-style _total suffixes to the meter's dot-namespaced convention. Histograms no longer declare explicit bucket boundaries at the call site (they now carry only a description and a unit), so their bucketing is left to however the meter/exporter is configured.
1 parent 1b82875 commit 337a4f0

3 files changed

Lines changed: 115 additions & 153 deletions

File tree

apps/webapp/app/services/realtime/nativeRealtimeClientInstance.server.ts

Lines changed: 84 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
import { Counter, Gauge, Histogram } from "prom-client";
1+
import { getMeter } from "@internal/tracing";
22
import { $replica } from "~/db.server";
33
import { env } from "~/env.server";
4-
import { metricsRegister } from "~/metrics.server";
54
import { singleton } from "~/utils/singleton";
65
import { getCachedLimit } from "../platform.v3.server";
76
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server";
@@ -15,87 +14,67 @@ import { RunHydrator } from "./runReader.server";
1514
// Process-singleton wiring for the native realtime client; only constructed when a
1615
// request actually routes to it, so a disabled webapp never instantiates it.
1716
function initializeNativeRealtimeClient(): NativeRealtimeClient {
18-
const wakeups = new Counter({
19-
name: "realtime_native_wakeups_total",
20-
help: "Live realtime wakeups by reason. A rising 'timeout' share suggests a write site is missing its publishChangeRecord delegate.",
21-
labelNames: ["reason"] as const,
22-
registers: [metricsRegister],
17+
const meter = getMeter("realtime-native");
18+
19+
const wakeups = meter.createCounter("realtime_native.wakeups", {
20+
description:
21+
"Live realtime wakeups by reason. A rising 'timeout' share suggests a write site is missing its publishChangeRecord delegate.",
2322
});
2423

25-
const runSetResolves = new Counter({
26-
name: "realtime_native_runset_resolve_total",
27-
help: "Multi-run (tag-list/batch) resolve+hydrate outcomes. 'hit'/'coalesced' vs 'miss' shows how effectively concurrent same-filter feeds share a single ClickHouse + Postgres query under an env-wide wake.",
28-
labelNames: ["result"] as const,
29-
registers: [metricsRegister],
24+
const runSetResolves = meter.createCounter("realtime_native.runset_resolves", {
25+
description:
26+
"Multi-run (tag-list/batch) resolve+hydrate outcomes. 'hit'/'coalesced' vs 'miss' shows how effectively concurrent same-filter feeds share a single ClickHouse + Postgres query.",
3027
});
3128

32-
const runSetQueryMs = new Histogram({
33-
name: "realtime_native_runset_query_ms",
34-
help: "Latency of the multi-run resolve (ClickHouse) and hydrate (Postgres) stages.",
35-
labelNames: ["stage"] as const,
36-
buckets: [1, 5, 10, 25, 50, 100, 250, 500, 1_000, 2_500, 5_000],
37-
registers: [metricsRegister],
29+
const runSetQueryMs = meter.createHistogram("realtime_native.runset_query_ms", {
30+
description: "Latency of the multi-run resolve (ClickHouse) and hydrate (Postgres) stages.",
31+
unit: "ms",
3832
});
3933

40-
const livePollPaths = new Counter({
41-
name: "realtime_native_live_poll_total",
42-
help: "How live polls resolved. 'fast-hydrate' = the router woke the feed with matched runs hydrated by id (no ClickHouse); 'full-resolve' = the backstop timeout did a ClickHouse resolve. A high fast-path share is the local-membership routing working.",
43-
labelNames: ["path"] as const,
44-
registers: [metricsRegister],
34+
const livePollPaths = meter.createCounter("realtime_native.live_polls", {
35+
description:
36+
"How live polls resolved. 'fast-hydrate' = router wake with rows hydrated by id (no ClickHouse); 'full-resolve' = backstop; 'cold-resolve' = fresh env subscription probed once.",
4537
});
4638

47-
const routerHydrates = new Counter({
48-
name: "realtime_native_router_hydrated_runs_total",
49-
help: "Runs hydrated by the EnvChangeRouter's batch-hydrate (one query per column set per wake, shared across all feeds matching the same run — the hot-shared-tag fan-out collapse).",
50-
registers: [metricsRegister],
39+
const routerHydrates = meter.createCounter("realtime_native.router_hydrated_runs", {
40+
description:
41+
"Runs hydrated by the EnvChangeRouter's batch-hydrate (one query per column set per wake, shared across all feeds matching the same run).",
5142
});
5243

53-
const resolveAdmissionWaits = new Counter({
54-
name: "realtime_native_resolve_admission_waits_total",
55-
help: "Fresh ClickHouse resolves that had to queue for an admission permit. A rising count means a distinct-filter reconnect stampede is being throttled (the gate is doing its job).",
56-
registers: [metricsRegister],
44+
const resolveAdmissionWaits = meter.createCounter("realtime_native.resolve_admission_waits", {
45+
description:
46+
"Fresh ClickHouse resolves that had to queue for an admission permit. A rising count means a distinct-filter reconnect stampede is being throttled (the gate is doing its job).",
5747
});
5848

59-
const replays = new Counter({
60-
name: "realtime_native_replays_total",
61-
help: "Buffered change records replayed to a newly-armed feed (inter-poll gap recovery). 'delivered' = rows reached the feed; 'empty' = candidates hydrated but none survived the filter/diff.",
62-
labelNames: ["result"] as const,
63-
registers: [metricsRegister],
49+
const replays = meter.createCounter("realtime_native.replays", {
50+
description:
51+
"Buffered change records replayed to a newly-armed feed (inter-poll gap recovery). 'delivered' = rows reached the feed; 'empty' = candidates hydrated but none survived the filter/diff.",
6452
});
6553

66-
const deliveryLagMs = new Histogram({
67-
name: "realtime_native_delivery_lag_ms",
68-
help: "Live emissions: now minus the newest emitted row's updatedAt (PG clock vs app clock, so approximate). The end-to-end delivery SLI — a p99 near the backstop hold means wakes are being missed.",
69-
labelNames: ["path"] as const,
70-
buckets: [5, 10, 25, 50, 100, 250, 500, 1_000, 2_500, 5_000, 10_000, 30_000],
71-
registers: [metricsRegister],
54+
const replayEvictions = meter.createCounter("realtime_native.replay_evictions", {
55+
description:
56+
"Replay-buffer evictions. 'window' expiry is normal; 'cap' means an env churns more runs inside the window than the buffer holds (replay guarantee degrading — retune the knobs).",
7257
});
7358

74-
const emittedRows = new Histogram({
75-
name: "realtime_native_emitted_rows",
76-
help: "Rows per live emission. Deltas should be small; a fat tail means working-set/offset-floor fallbacks are re-emitting full sets.",
77-
buckets: [1, 2, 5, 10, 25, 50, 100, 250, 1_000],
78-
registers: [metricsRegister],
59+
const deliveryLagMs = meter.createHistogram("realtime_native.delivery_lag_ms", {
60+
description:
61+
"Live emissions: now minus the newest emitted row's updatedAt (PG clock vs app clock, so approximate). The end-to-end delivery SLI — a p99 near the backstop hold means wakes are being missed.",
62+
unit: "ms",
7963
});
8064

81-
const backstops = new Counter({
82-
name: "realtime_native_backstop_total",
83-
help: "Backstop full resolves by outcome. 'empty' is normal idle behavior; sustained 'delivered' means the notify/replay path missed changes — alert on it.",
84-
labelNames: ["result"] as const,
85-
registers: [metricsRegister],
65+
const emittedRows = meter.createHistogram("realtime_native.emitted_rows", {
66+
description:
67+
"Rows per live emission. Deltas should be small; a fat tail means working-set/offset-floor fallbacks are re-emitting full sets.",
68+
unit: "rows",
8669
});
8770

88-
const concurrencyRejections = new Counter({
89-
name: "realtime_native_concurrency_rejections_total",
90-
help: "Polls rejected (429) by the per-env concurrency limiter.",
91-
registers: [metricsRegister],
71+
const backstops = meter.createCounter("realtime_native.backstops", {
72+
description:
73+
"Backstop full resolves by outcome. 'empty' is normal idle behavior; sustained 'delivered' means the notify/replay path missed changes — alert on it.",
9274
});
9375

94-
const replayEvictions = new Counter({
95-
name: "realtime_native_replay_evictions_total",
96-
help: "Replay-buffer evictions. 'window' expiry is normal; 'cap' means an env churns more runs inside the window than the buffer holds (replay guarantee degrading — retune the knobs).",
97-
labelNames: ["reason"] as const,
98-
registers: [metricsRegister],
76+
const concurrencyRejections = meter.createCounter("realtime_native.concurrency_rejections", {
77+
description: "Polls rejected (429) by the per-env concurrency limiter.",
9978
});
10079

10180
const limiter = new RealtimeConcurrencyLimiter({
@@ -120,12 +99,12 @@ function initializeNativeRealtimeClient(): NativeRealtimeClient {
12099
const router = new EnvChangeRouter({
121100
source: getRunChangeNotifier(),
122101
hydrator: runReader,
123-
onHydrate: (runCount) => routerHydrates.inc(runCount),
102+
onHydrate: (runCount) => routerHydrates.add(runCount),
124103
replayWindowMs: env.REALTIME_BACKEND_NATIVE_REPLAY_WINDOW_MS,
125104
replayMaxRunsPerEnv: env.REALTIME_BACKEND_NATIVE_REPLAY_MAX_RUNS,
126105
unsubscribeLingerMs: env.REALTIME_BACKEND_NATIVE_UNSUBSCRIBE_LINGER_MS,
127-
onReplay: (result) => replays.inc({ result }),
128-
onReplayEviction: (reason) => replayEvictions.inc({ reason }),
106+
onReplay: (result) => replays.add(1, { result }),
107+
onReplayEviction: (reason) => replayEvictions.add(1, { reason }),
129108
});
130109

131110
const client = new NativeRealtimeClient({
@@ -159,58 +138,50 @@ function initializeNativeRealtimeClient(): NativeRealtimeClient {
159138
runSetCreatedAtBucketMs: env.REALTIME_BACKEND_NATIVE_RUNSET_CREATED_AT_BUCKET_MS,
160139
holdOnEmpty: env.REALTIME_BACKEND_NATIVE_HOLD_ON_EMPTY === "1",
161140
resolveAdmissionLimit: env.REALTIME_BACKEND_NATIVE_RESOLVE_ADMISSION_LIMIT,
162-
onWakeup: (reason) => wakeups.inc({ reason }),
163-
onLivePollPath: (path) => livePollPaths.inc({ path }),
164-
onRunSetResolve: (result) => runSetResolves.inc({ result }),
165-
onRunSetQuery: (stage, ms) => runSetQueryMs.observe({ stage }, ms),
166-
onResolveAdmissionWait: () => resolveAdmissionWaits.inc(),
141+
onWakeup: (reason) => wakeups.add(1, { reason }),
142+
onLivePollPath: (path) => livePollPaths.add(1, { path }),
143+
onRunSetResolve: (result) => runSetResolves.add(1, { result }),
144+
onRunSetQuery: (stage, ms) => runSetQueryMs.record(ms, { stage }),
145+
onResolveAdmissionWait: () => resolveAdmissionWaits.add(1),
167146
onEmit: (path, lagMs, rowCount) => {
168-
deliveryLagMs.observe({ path }, Math.max(lagMs, 0));
169-
emittedRows.observe(rowCount);
170-
},
171-
onBackstopResult: (result) => backstops.inc({ result }),
172-
onConcurrencyRejected: () => concurrencyRejections.inc(),
173-
});
174-
175-
new Gauge({
176-
name: "realtime_native_working_set_size",
177-
help: "Entries in the per-handle working-set cache (one per active multi-run feed session).",
178-
registers: [metricsRegister],
179-
collect() {
180-
this.set(client.workingSetCacheSize);
181-
},
182-
});
183-
184-
new Gauge({
185-
name: "realtime_native_resolve_admission_in_use",
186-
help: "Fresh ClickHouse resolves currently holding an admission permit (live concurrency against the gate's limit).",
187-
registers: [metricsRegister],
188-
collect() {
189-
this.set(client.resolveAdmissionInUse);
147+
deliveryLagMs.record(Math.max(lagMs, 0), { path });
148+
emittedRows.record(rowCount);
190149
},
191-
});
192-
193-
new Gauge({
194-
name: "realtime_native_held_feeds",
195-
help: "Long-polls currently held, by feed kind — the system's capacity unit.",
196-
labelNames: ["kind"] as const,
197-
registers: [metricsRegister],
198-
collect() {
150+
onBackstopResult: (result) => backstops.add(1, { result }),
151+
onConcurrencyRejected: () => concurrencyRejections.add(1),
152+
});
153+
154+
meter
155+
.createObservableGauge("realtime_native.working_set_size", {
156+
description:
157+
"Entries in the per-handle working-set cache (one per active multi-run feed session).",
158+
})
159+
.addCallback((result) => result.observe(client.workingSetCacheSize));
160+
161+
meter
162+
.createObservableGauge("realtime_native.resolve_admission_in_use", {
163+
description:
164+
"Fresh ClickHouse resolves currently holding an admission permit (live concurrency against the gate's limit).",
165+
})
166+
.addCallback((result) => result.observe(client.resolveAdmissionInUse));
167+
168+
meter
169+
.createObservableGauge("realtime_native.held_feeds", {
170+
description: "Long-polls currently held, by feed kind — the system's capacity unit.",
171+
})
172+
.addCallback((result) => {
199173
const counts = router.heldFeedCounts;
200-
this.set({ kind: "run" }, counts.run);
201-
this.set({ kind: "tag" }, counts.tag);
202-
this.set({ kind: "batch" }, counts.batch);
203-
},
204-
});
205-
206-
new Gauge({
207-
name: "realtime_native_active_envs",
208-
help: "Environments currently routed on this instance (held feeds + lingering subscriptions).",
209-
registers: [metricsRegister],
210-
collect() {
211-
this.set(router.activeEnvCount);
212-
},
213-
});
174+
result.observe(counts.run, { kind: "run" });
175+
result.observe(counts.tag, { kind: "tag" });
176+
result.observe(counts.batch, { kind: "batch" });
177+
});
178+
179+
meter
180+
.createObservableGauge("realtime_native.active_envs", {
181+
description:
182+
"Environments currently routed on this instance (held feeds + lingering subscriptions).",
183+
})
184+
.addCallback((result) => result.observe(router.activeEnvCount));
214185

215186
return client;
216187
}

apps/webapp/app/services/realtime/runChangeNotifierInstance.server.ts

Lines changed: 19 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
import { Counter, Gauge } from "prom-client";
1+
import { getMeter } from "@internal/tracing";
22
import { env } from "~/env.server";
3-
import { metricsRegister } from "~/metrics.server";
43
import { singleton } from "~/utils/singleton";
54
import { RunChangeNotifier, type ChangeRecordInput } from "./runChangeNotifier.server";
65

@@ -16,23 +15,20 @@ function initializeRunChangeNotifier(): RunChangeNotifier {
1615
// broadcast every message to every node, so this is what actually shards load.
1716
const shardedPubSub = clusterMode && env.REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_SHARDED_ENABLED === "1";
1817

19-
const publishes = new Counter({
20-
name: "realtime_run_change_notifier_publishes_total",
21-
help: "Change-record publishes by outcome. Failures are the leading indicator that feeds are degrading to their backstops (pub/sub Redis trouble).",
22-
labelNames: ["result"] as const,
23-
registers: [metricsRegister],
18+
const meter = getMeter("realtime-notifier");
19+
20+
const publishes = meter.createCounter("realtime_notifier.publishes", {
21+
description:
22+
"Change-record publishes by outcome. Failures are the leading indicator that feeds are degrading to their backstops (pub/sub Redis trouble).",
2423
});
2524

26-
const received = new Counter({
27-
name: "realtime_run_change_notifier_messages_received_total",
28-
help: "Raw channel messages received by this instance's subscriber, pre-coalesce.",
29-
registers: [metricsRegister],
25+
const received = meter.createCounter("realtime_notifier.messages_received", {
26+
description: "Raw channel messages received by this instance's subscriber, pre-coalesce.",
3027
});
3128

32-
const delivered = new Counter({
33-
name: "realtime_run_change_notifier_batches_delivered_total",
34-
help: "Coalesced batches delivered to listeners. received/batches = the coalesce ratio (how hard a busy env is being collapsed).",
35-
registers: [metricsRegister],
29+
const delivered = meter.createCounter("realtime_notifier.batches_delivered", {
30+
description:
31+
"Coalesced batches delivered to listeners. received/batches = the coalesce ratio (how hard a busy env is being collapsed).",
3632
});
3733

3834
const notifier = new RunChangeNotifier({
@@ -48,19 +44,16 @@ function initializeRunChangeNotifier(): RunChangeNotifier {
4844
},
4945
envWakeCoalesceWindowMs: env.REALTIME_BACKEND_NATIVE_ENV_WAKE_COALESCE_WINDOW_MS,
5046
shardedPubSub,
51-
onPublishResult: (ok) => publishes.inc({ result: ok ? "ok" : "error" }),
52-
onMessageReceived: () => received.inc(),
53-
onBatchDelivered: () => delivered.inc(),
47+
onPublishResult: (ok) => publishes.add(1, { result: ok ? "ok" : "error" }),
48+
onMessageReceived: () => received.add(1),
49+
onBatchDelivered: () => delivered.add(1),
5450
});
5551

56-
new Gauge({
57-
name: "realtime_run_change_notifier_active_subscriptions",
58-
help: "Distinct runs currently subscribed for realtime change notifications",
59-
collect() {
60-
this.set(notifier.activeSubscriptionCount);
61-
},
62-
registers: [metricsRegister],
63-
});
52+
meter
53+
.createObservableGauge("realtime_notifier.active_subscriptions", {
54+
description: "Distinct env channels currently subscribed for realtime change notifications.",
55+
})
56+
.addCallback((result) => result.observe(notifier.activeSubscriptionCount));
6457

6558
return notifier;
6659
}

apps/webapp/app/services/realtime/shadowRealtimeClientInstance.server.ts

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
import { Counter } from "prom-client";
1+
import { getMeter } from "@internal/tracing";
22
import { $replica } from "~/db.server";
33
import { env } from "~/env.server";
4-
import { metricsRegister } from "~/metrics.server";
54
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server";
65
import { singleton } from "~/utils/singleton";
76
import { realtimeClient } from "../realtimeClientGlobal.server";
@@ -15,11 +14,9 @@ import { ShadowRealtimeClient } from "./shadowRealtimeClient.server";
1514
* when an org's `realtimeBackend` flag is set to "shadow".
1615
*/
1716
function initializeShadowRealtimeClient(): ShadowRealtimeClient {
18-
const compares = new Counter({
19-
name: "realtime_shadow_compare_total",
20-
help: "Dual-run shadow-compare outcomes (Electric vs native). kind=serialization|membership, result=match|diverge|skew.",
21-
labelNames: ["feed", "kind", "result"] as const,
22-
registers: [metricsRegister],
17+
const compares = getMeter("realtime-shadow").createCounter("realtime_shadow.compares", {
18+
description:
19+
"Dual-run shadow-compare outcomes (Electric vs native). kind=serialization|membership, result=match|diverge|skew.",
2320
});
2421

2522
const comparator = new RealtimeShadowComparator({
@@ -39,19 +36,20 @@ function initializeShadowRealtimeClient(): ShadowRealtimeClient {
3936
onOutcome: (outcome) => {
4037
const { feed } = outcome;
4138
if (outcome.serializationMatched) {
42-
compares.inc({ feed, kind: "serialization", result: "match" }, outcome.serializationMatched);
39+
compares.add(outcome.serializationMatched, { feed, kind: "serialization", result: "match" });
4340
}
4441
if (outcome.serializationDiverged) {
45-
compares.inc(
46-
{ feed, kind: "serialization", result: "diverge" },
47-
outcome.serializationDiverged
48-
);
42+
compares.add(outcome.serializationDiverged, {
43+
feed,
44+
kind: "serialization",
45+
result: "diverge",
46+
});
4947
}
5048
if (outcome.serializationSkew) {
51-
compares.inc({ feed, kind: "serialization", result: "skew" }, outcome.serializationSkew);
49+
compares.add(outcome.serializationSkew, { feed, kind: "serialization", result: "skew" });
5250
}
5351
if (outcome.membershipMatch !== undefined) {
54-
compares.inc({
52+
compares.add(1, {
5553
feed,
5654
kind: "membership",
5755
result: outcome.membershipMatch ? "match" : "diverge",

0 commit comments

Comments
 (0)