Skip to content

Commit f9d57d3

Browse files
authored
feat(webapp): add a new backend for the realtime runs feed (#3864)
## Summary Adds a second backend for the realtime runs feed (`useRealtimeRun`, `subscribeToRunsWithTag`, `subscribeToBatch`), built to stay healthy when a single busy environment has many subscribers watching many runs at once. It is gated behind a feature flag with the existing backend as the default, so nothing changes for users until it is enabled per environment. ## Design A run change is published once, as a small self-describing record, to a single per-environment channel. Every feed is then a predicate over that one stream rather than owning a channel: - A per-instance router indexes the currently-held feeds by run, tag, and batch. When a run changes it hydrates the affected rows once and serializes them once, then fans the result to every matching feed. One hot shared tag watched by many subscribers costs a single database query and serialize, not one per subscriber. - Feeds that don't match a change are never woken, wake delivery per environment is coalesced on a leading edge (250ms default) so a burst of changes costs one wake, and cold reads coalesce onto a single short-TTL-cached resolve. - An admission gate bounds how many cold ClickHouse resolves run concurrently, so a mass reconnect across many distinct filters queues instead of stampeding the database. - Changes that land while a client is between long-polls are delivered on its next poll instead of waiting for the periodic backstop: each environment buffers its recent change records, subscriptions linger briefly after the last feed closes, and a newly-armed poll replays exactly the connection's gap. - The per-connection replay cursors behind that are shared across instances via Redis (a single timestamp each), so a poll landing on a different instance behind the load balancer still reads the connection's true gap instead of falling back to a cold resolve. Cursor reads have a bounded deadline and degrade to the cold-read path on any Redis trouble. - Tag subscriptions with multiple tags match runs carrying all of the tags, mirroring the existing backend's filter semantics, and live long-polls hold for about 20 seconds to match its cadence. - The per-environment channel supports Redis Cluster sharded pub/sub, so the wake path scales horizontally across shards by environment. - The backend reports its health through OpenTelemetry metrics (delivery lag, poll resolution paths, backstop outcomes, replay and cursor-store activity), with a provisioned Grafana dashboard for local development. Everything is behind the feature flag and tunable via env vars; the existing backend remains the default.
1 parent 6afc9bf commit f9d57d3

55 files changed

Lines changed: 6587 additions & 48 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: feature
4+
---
5+
6+
Add a new backend for the realtime runs feed (single runs, tags, and batches) that scales under high concurrency, available behind a feature flag

apps/supervisor/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
"@kubernetes/client-node": "^1.0.0",
1919
"@trigger.dev/core": "workspace:*",
2020
"dockerode": "^4.0.6",
21-
"ioredis": "^5.3.2",
21+
"ioredis": "~5.6.0",
2222
"p-limit": "^6.2.0",
2323
"prom-client": "^15.1.0",
2424
"socket.io": "4.7.4",

apps/webapp/app/entry.server.tsx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import {
2727
registerRunEngineEventBusHandlers,
2828
setupBatchQueueCallbacks,
2929
} from "./v3/runEngineHandlers.server";
30+
import { registerRunChangeNotifierHandlers } from "./services/realtime/runChangeNotifierHandlers.server";
3031
// Touch the sessions replication singleton at entry so it boots deterministically
3132
// on webapp startup. The singleton's initializer wires start (gated on
3233
// `clickhouseFactory.isReady()`) and SIGTERM/SIGINT shutdown — mirrors
@@ -269,6 +270,9 @@ process.on("uncaughtException", (error, origin) => {
269270

270271
singleton("RunEngineEventBusHandlers", registerRunEngineEventBusHandlers);
271272
singleton("SetupBatchQueueCallbacks", setupBatchQueueCallbacks);
273+
// Attach the realtime run-changed publish delegations to the engine event bus.
274+
// No-ops (registers nothing) unless REALTIME_BACKEND_NATIVE_ENABLED=1.
275+
singleton("RunChangeNotifierHandlers", registerRunChangeNotifierHandlers);
272276

273277
// Wrapped in singleton() so Remix's dev-mode CJS reloads don't append
274278
// duplicate copies of the processor — Sentry's processor list lives in

apps/webapp/app/env.server.ts

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,45 @@ const EnvironmentSchema = z
300300
.int()
301301
.default(24 * 60 * 60 * 1000), // 1 day in milliseconds
302302

303+
// Master switch for the native realtime backend; off = Electric serves everything, publishes no-op.
304+
REALTIME_BACKEND_NATIVE_ENABLED: z.string().default("0"),
305+
// Live long-poll backstop hold (ms); matches Electric's ~20s cadence.
306+
REALTIME_BACKEND_NATIVE_LIVE_POLL_TIMEOUT_MS: z.coerce.number().int().default(20_000),
307+
// Jitter ratio on the live-poll hold (0.15 = ±15%) to avoid synchronized refetch herds.
308+
REALTIME_BACKEND_NATIVE_LIVE_POLL_JITTER_RATIO: z.coerce.number().default(0.15),
309+
// Hard cap on the tag-list snapshot size.
310+
REALTIME_BACKEND_NATIVE_MAX_LIST_RESULTS: z.coerce.number().int().default(1_000),
311+
// TTL/size of the coalescing cache for the multi-run resolve+hydrate (same-filter feeds share one query).
312+
REALTIME_BACKEND_NATIVE_RUNSET_CACHE_TTL_MS: z.coerce.number().int().default(1_000),
313+
REALTIME_BACKEND_NATIVE_RUNSET_CACHE_MAX_ENTRIES: z.coerce.number().int().default(5_000),
314+
// Size/TTL of the per-handle working-set cache used to diff multi-run live polls.
315+
REALTIME_BACKEND_NATIVE_WORKING_SET_MAX_ENTRIES: z.coerce.number().int().default(10_000),
316+
REALTIME_BACKEND_NATIVE_WORKING_SET_TTL_MS: z.coerce.number().int().default(300_000),
317+
// Bucket (ms) the tag-list createdAt floor is quantized to so same-tag feeds share a cache entry; 0 disables.
318+
REALTIME_BACKEND_NATIVE_RUNSET_CREATED_AT_BUCKET_MS: z.coerce.number().int().default(60_000),
319+
// Leading-edge throttle (ms) on per-env wake delivery; 0 wakes on every change.
320+
REALTIME_BACKEND_NATIVE_ENV_WAKE_COALESCE_WINDOW_MS: z.coerce.number().int().default(250),
321+
// "1" shares per-connection replay cursors fleet-wide via Redis, so a load-balancer hop reads the connection's true inter-poll gap instead of cold-resolving.
322+
REALTIME_BACKEND_NATIVE_SHARED_REPLAY_CURSORS: z.string().default("1"),
323+
// "1" holds a multi-run live poll open on a non-matching wake instead of replying up-to-date.
324+
REALTIME_BACKEND_NATIVE_HOLD_ON_EMPTY: z.string().default("1"),
325+
// Max concurrent fresh ClickHouse resolves per instance (reconnect-stampede gate); 0 disables.
326+
REALTIME_BACKEND_NATIVE_RESOLVE_ADMISSION_LIMIT: z.coerce.number().int().default(16),
327+
// Replay window (ms) for buffered change records delivered to newly-armed feeds; 0 disables.
328+
REALTIME_BACKEND_NATIVE_REPLAY_WINDOW_MS: z.coerce.number().int().default(2_000),
329+
// Cap on buffered recent records per env (latest record per run).
330+
REALTIME_BACKEND_NATIVE_REPLAY_MAX_RUNS: z.coerce.number().int().default(512),
331+
// Keep an env subscribed + buffering this long (ms) after its last feed closes; 0 disables.
332+
REALTIME_BACKEND_NATIVE_UNSUBSCRIBE_LINGER_MS: z.coerce.number().int().default(5_000),
333+
// Fallback per-env concurrent-connection limit when the org has none configured.
334+
REALTIME_BACKEND_NATIVE_DEFAULT_CONCURRENCY_LIMIT: z.coerce.number().int().default(100_000),
335+
// TTL/size of the single-run read-through cache that collapses duplicate refetch bursts.
336+
REALTIME_BACKEND_NATIVE_RUN_CACHE_TTL_MS: z.coerce.number().int().default(250),
337+
REALTIME_BACKEND_NATIVE_RUN_CACHE_MAX_ENTRIES: z.coerce.number().int().default(5_000),
338+
// TTL/size of the per-org realtimeBackend flag cache used to pick the serving backend.
339+
REALTIME_BACKEND_FLAG_CACHE_TTL_MS: z.coerce.number().int().default(30_000),
340+
REALTIME_BACKEND_FLAG_CACHE_MAX_ENTRIES: z.coerce.number().int().default(50_000),
341+
303342
PUBSUB_REDIS_HOST: z
304343
.string()
305344
.optional()
@@ -332,6 +371,36 @@ const EnvironmentSchema = z
332371
PUBSUB_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
333372
PUBSUB_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
334373

374+
// Dedicated pub/sub Redis for the native realtime backend; falls back to PUBSUB_REDIS_* then REDIS_*.
375+
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_HOST: z
376+
.string()
377+
.optional()
378+
.transform((v) => v ?? process.env.PUBSUB_REDIS_HOST ?? process.env.REDIS_HOST),
379+
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_PORT: z.coerce
380+
.number()
381+
.optional()
382+
.transform((v) => {
383+
if (v !== undefined) return v;
384+
const raw = process.env.PUBSUB_REDIS_PORT ?? process.env.REDIS_PORT;
385+
return raw ? parseInt(raw) : undefined;
386+
}),
387+
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_USERNAME: z
388+
.string()
389+
.optional()
390+
.transform((v) => v ?? process.env.PUBSUB_REDIS_USERNAME ?? process.env.REDIS_USERNAME),
391+
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_PASSWORD: z
392+
.string()
393+
.optional()
394+
.transform((v) => v ?? process.env.PUBSUB_REDIS_PASSWORD ?? process.env.REDIS_PASSWORD),
395+
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_TLS_DISABLED: z
396+
.string()
397+
.default(process.env.PUBSUB_REDIS_TLS_DISABLED ?? process.env.REDIS_TLS_DISABLED ?? "false"),
398+
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_CLUSTER_MODE_ENABLED: z
399+
.string()
400+
.default(process.env.PUBSUB_REDIS_CLUSTER_MODE_ENABLED ?? "0"),
401+
// Use sharded pub/sub (SSUBSCRIBE/SPUBLISH) in cluster mode; "0" forces classic pub/sub.
402+
REALTIME_BACKEND_NATIVE_PUBSUB_REDIS_SHARDED_ENABLED: z.string().default("1"),
403+
335404
DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
336405
DEFAULT_ENV_EXECUTION_CONCURRENCY_BURST_FACTOR: z.coerce.number().default(1.0),
337406
DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(300),
@@ -1610,6 +1679,18 @@ const EnvironmentSchema = z
16101679
.enum(["log", "error", "warn", "info", "debug"])
16111680
.default("info"),
16121681
RUN_ENGINE_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),
1682+
// Dedicated ClickHouse pool for the native backend's tag/batch id resolution; falls back to CLICKHOUSE_URL.
1683+
REALTIME_BACKEND_NATIVE_CLICKHOUSE_URL: z
1684+
.string()
1685+
.optional()
1686+
.transform((v) => v ?? process.env.CLICKHOUSE_URL),
1687+
REALTIME_BACKEND_NATIVE_CLICKHOUSE_KEEP_ALIVE_ENABLED: z.string().default("1"),
1688+
REALTIME_BACKEND_NATIVE_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().optional(),
1689+
REALTIME_BACKEND_NATIVE_CLICKHOUSE_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
1690+
REALTIME_BACKEND_NATIVE_CLICKHOUSE_LOG_LEVEL: z
1691+
.enum(["log", "error", "warn", "info", "debug"])
1692+
.default("info"),
1693+
REALTIME_BACKEND_NATIVE_CLICKHOUSE_COMPRESSION_REQUEST: z.string().default("1"),
16131694
EVENTS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(1000),
16141695
EVENTS_CLICKHOUSE_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
16151696
METRICS_CLICKHOUSE_BATCH_SIZE: z.coerce.number().int().default(10000),

apps/webapp/app/models/runtimeEnvironment.server.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,10 +237,20 @@ export async function findEnvironmentBySlug(
237237
return environment ? toAuthenticated(environment) : null;
238238
}
239239

240+
// The authenticated environment plus the run scalars the realtime publish needs.
241+
// Both come from one taskRun read — see findEnvironmentFromRun.
242+
export type EnvironmentFromRun = {
243+
environment: AuthenticatedEnvironment;
244+
runTags: string[];
245+
batchId: string | null;
246+
};
247+
240248
export async function findEnvironmentFromRun(
241249
runId: string,
242250
tx?: PrismaClientOrTransaction
243-
): Promise<AuthenticatedEnvironment | null> {
251+
): Promise<EnvironmentFromRun | null> {
252+
// The include (no select) already pulls every taskRun scalar, so runTags/batchId
253+
// ride along for free — no extra query for the realtime publish to send a full record.
244254
const taskRun = await (tx ?? $replica).taskRun.findFirst({
245255
where: {
246256
id: runId,
@@ -249,7 +259,14 @@ export async function findEnvironmentFromRun(
249259
runtimeEnvironment: { include: authIncludeBase },
250260
},
251261
});
252-
return taskRun?.runtimeEnvironment ? toAuthenticated(taskRun.runtimeEnvironment) : null;
262+
if (!taskRun?.runtimeEnvironment) {
263+
return null;
264+
}
265+
return {
266+
environment: toAuthenticated(taskRun.runtimeEnvironment),
267+
runTags: taskRun.runTags,
268+
batchId: taskRun.batchId,
269+
};
253270
}
254271

255272
export async function createNewSession(

apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1212
import { authenticateApiRequest } from "~/services/apiAuth.server";
1313
import { logger } from "~/services/logger.server";
1414
import { updateMetadataService } from "~/services/metadata/updateMetadataInstance.server";
15+
import { publishChangeRecord } from "~/services/realtime/runChangeNotifierInstance.server";
1516
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
1617
import { ServiceValidationError } from "~/v3/services/common.server";
1718
import { applyMetadataMutationToBufferedRun } from "~/v3/mollifier/applyMetadataMutation.server";
@@ -184,7 +185,15 @@ const { action } = createActionApiRoute(
184185
return json({ error: "Internal Server Error" }, { status: 500 });
185186
}
186187
if (pgResult) {
187-
return json(pgResult, { status: 200 });
188+
// Reflect metadata.set() on a live feed before the next lifecycle event. Publish the
189+
// internal id (the router keys single-run feeds by it, not the friendly id from the URL).
190+
publishChangeRecord({
191+
runId: pgResult.runId,
192+
envId: env.id,
193+
tags: pgResult.runTags,
194+
batchId: pgResult.batchId,
195+
});
196+
return json({ metadata: pgResult.metadata }, { status: 200 });
188197
}
189198

190199
// PG miss. Target run is either buffered or genuinely absent.

apps/webapp/app/routes/api.v1.runs.$runId.tags.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
77
import { authenticateApiRequest } from "~/services/apiAuth.server";
88
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
99
import { logger } from "~/services/logger.server";
10+
import { publishChangeRecord } from "~/services/realtime/runChangeNotifierInstance.server";
1011
import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server";
1112

1213
// Pull the existing tags out of a buffer entry's serialised payload so
@@ -90,6 +91,14 @@ export async function action({ request, params }: ActionFunctionArgs) {
9091
},
9192
data: { runTags: { push: newTags } },
9293
});
94+
// Publish a run-changed record with the NEW tag set so tag feeds reindex
95+
// (no-op unless enabled).
96+
publishChangeRecord({
97+
runId: taskRun.id,
98+
envId: env.id,
99+
tags: existing.concat(newTags),
100+
batchId: taskRun.batchId,
101+
});
93102
return json({ message: `Successfully set ${newTags.length} new tags.` }, { status: 200 });
94103
},
95104
// Buffer-applied patch path. The mutateSnapshot Lua deduplicates

apps/webapp/app/routes/realtime.v1.batches.$batchId.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { z } from "zod";
22
import { $replica } from "~/db.server";
33
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
4-
import { realtimeClient } from "~/services/realtimeClientGlobal.server";
4+
import { resolveRealtimeStreamClient } from "~/services/realtime/resolveRealtimeStreamClient.server";
55
import { anyResource, createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
66

77
const ParamsSchema = z.object({
@@ -33,7 +33,10 @@ export const loader = createLoaderApiRoute(
3333
},
3434
},
3535
async ({ authentication, request, resource: batchRun, apiVersion }) => {
36-
return realtimeClient.streamBatch(
36+
// Pick the Electric proxy or the native backend per org (defaults to Electric); both implement streamBatch.
37+
const client = await resolveRealtimeStreamClient(authentication.environment);
38+
39+
return client.streamBatch(
3740
request.url,
3841
authentication.environment,
3942
batchRun.id,

apps/webapp/app/routes/realtime.v1.runs.$runId.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { json } from "@remix-run/server-runtime";
22
import { z } from "zod";
33
import { $replica } from "~/db.server";
44
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
5-
import { realtimeClient } from "~/services/realtimeClientGlobal.server";
5+
import { resolveRealtimeStreamClient } from "~/services/realtime/resolveRealtimeStreamClient.server";
66
import {
77
anyResource,
88
createLoaderApiRoute,
@@ -48,17 +48,17 @@ export const loader = createLoaderApiRoute(
4848
},
4949
},
5050
async ({ authentication, request, resource: run, apiVersion }) => {
51-
return realtimeClient.streamRun(
51+
// Pick the Electric proxy or the native backend per org (defaults to Electric); both implement streamRun.
52+
const client = await resolveRealtimeStreamClient(authentication.environment);
53+
54+
return client.streamRun(
5255
request.url,
5356
authentication.environment,
5457
run.id,
5558
apiVersion,
5659
authentication.realtime,
5760
request.headers.get("x-trigger-electric-version") ?? undefined,
58-
// Propagate abort on client disconnect so the upstream Electric long-poll
59-
// fetch is cancelled too. Without this, undici buffers from the unconsumed
60-
// upstream response body accumulate until Electric's poll timeout, causing
61-
// steady RSS growth on api (see docs/runbooks for the H1 isolation test).
61+
// Propagate abort on client disconnect so the upstream Electric long-poll is cancelled too, else undici buffers grow RSS until the poll timeout.
6262
getRequestAbortSignal()
6363
);
6464
}

apps/webapp/app/routes/realtime.v1.runs.ts

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { z } from "zod";
22
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
3-
import { realtimeClient } from "~/services/realtimeClientGlobal.server";
3+
import { resolveRealtimeStreamClient } from "~/services/realtime/resolveRealtimeStreamClient.server";
44
import {
55
anyResource,
66
createLoaderApiRoute,
@@ -25,12 +25,7 @@ export const loader = createLoaderApiRoute(
2525
authorization: {
2626
action: "read",
2727
resource: (_, __, searchParams) =>
28-
// Pre-RBAC, the resource was the searchParams object itself and
29-
// the legacy `checkAuthorization` iterated `Object.keys`, so a
30-
// JWT with type-level `read:tags` (no id) granted access to the
31-
// unfiltered runs stream. Including `{ type: "tags" }` here
32-
// preserves that — per-id `read:tags:<tag>` still grants only
33-
// when the filter includes that tag.
28+
// `{ type: "tags" }` preserves pre-RBAC type-level `read:tags` access to the unfiltered stream; per-id `read:tags:<tag>` still grants only when the filter includes that tag.
3429
anyResource([
3530
{ type: "runs" },
3631
{ type: "tags" },
@@ -39,7 +34,10 @@ export const loader = createLoaderApiRoute(
3934
},
4035
},
4136
async ({ searchParams, authentication, request, apiVersion }) => {
42-
return realtimeClient.streamRuns(
37+
// Pick the Electric proxy or the native backend per org (defaults to Electric); both implement streamRuns.
38+
const client = await resolveRealtimeStreamClient(authentication.environment);
39+
40+
return client.streamRuns(
4341
request.url,
4442
authentication.environment,
4543
searchParams,

0 commit comments

Comments
 (0)