Skip to content

Commit 4611cb4

Browse files
committed
fix(webapp): wake unfiltered runs feeds on the native realtime backend
Runs subscriptions with no tag filter were never indexed for change routing, so they only received updates from the periodic backstop poll instead of sub-second change notifications. Route every change record to zero-filter feeds and apply the same rule in the replay path.
1 parent ff0e9ef commit 4611cb4

2 files changed

Lines changed: 36 additions & 3 deletions

File tree

apps/webapp/app/services/realtime/envChangeRouter.server.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ type EnvState = {
8888
byBatchId: Map<string, Set<Feed>>;
8989
/** All tag feeds, for routing partial records (no tags) as hydrate-to-classify candidates. */
9090
tagFeeds: Set<Feed>;
91+
/** Tag feeds with no tag filter — they match every record but are unreachable via byTag. */
92+
unfilteredTagFeeds: Set<Feed>;
9193
/** When this env's channel subscription started (for the gap-coverage check). */
9294
subscribedAtMs: number;
9395
/** Latest record per run, insertion-ordered, for replaying inter-poll gaps to newly-armed feeds. */
@@ -241,6 +243,7 @@ export class EnvChangeRouter {
241243
byTag: new Map(),
242244
byBatchId: new Map(),
243245
tagFeeds: new Set(),
246+
unfilteredTagFeeds: new Set(),
244247
subscribedAtMs: Date.now(),
245248
recent: new Map(),
246249
};
@@ -309,11 +312,11 @@ export class EnvChangeRouter {
309312
case "batch":
310313
return record.batchId != null && record.batchId === feed.filter.batchId;
311314
case "tag": {
312-
// Partial record (no tags) = hydrate-to-classify candidate, like the live path.
313-
if (record.tags === undefined) {
315+
const tags = feed.filter.tags;
316+
// Unfiltered feed matches everything; partial record (no tags) = hydrate-to-classify.
317+
if (tags.length === 0 || record.tags === undefined) {
314318
return true;
315319
}
316-
const tags = feed.filter.tags;
317320
return record.tags.some((tag) => tags.includes(tag));
318321
}
319322
}
@@ -368,6 +371,9 @@ export class EnvChangeRouter {
368371
break;
369372
case "tag":
370373
env.tagFeeds.add(feed);
374+
if (feed.filter.tags.length === 0) {
375+
env.unfilteredTagFeeds.add(feed);
376+
}
371377
for (const tag of feed.filter.tags) {
372378
addToIndex(env.byTag, tag, feed);
373379
}
@@ -385,6 +391,7 @@ export class EnvChangeRouter {
385391
break;
386392
case "tag":
387393
env.tagFeeds.delete(feed);
394+
env.unfilteredTagFeeds.delete(feed);
388395
for (const tag of feed.filter.tags) {
389396
removeFromIndex(env.byTag, tag, feed);
390397
}
@@ -436,6 +443,8 @@ export class EnvChangeRouter {
436443
addMatch(feed, record.runId);
437444
}
438445
}
446+
// Unfiltered tag feeds match every record but live outside the index.
447+
for (const feed of env.unfilteredTagFeeds) addMatch(feed, record.runId);
439448
} else {
440449
// Partial record (no membership data): route to every tag feed as a candidate to
441450
// hydrate-and-classify (rare; the publish side emits full records in practice).

apps/webapp/test/realtime/envChangeRouter.test.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,30 @@ describe("EnvChangeRouter", () => {
8686
reg.close();
8787
});
8888

89+
it("wakes an unfiltered tag feed (no tags) for every full record, live and via replay", async () => {
90+
const rows = new Map([["r1", row("r1", { tags: ["a"] })]]);
91+
const { router, src } = makeRouter(rows);
92+
93+
// Live path: a full record (tags defined) must reach the zero-filter feed even
94+
// though it can never appear in the byTag index.
95+
const reg = router.register("env_1", { kind: "tag", tags: [] }, []);
96+
const wait = reg.waitForMatch(undefined, 1_000);
97+
src.push("env_1", [record("r1", { tags: ["a"] })]);
98+
const live = await wait;
99+
expect(live.reason).toBe("notify");
100+
expect(live.rows.map((m) => m.row.id)).toEqual(["r1"]);
101+
reg.close();
102+
103+
// Replay path: the buffered record matches an unfiltered feed registered after the push.
104+
const late = router.register("env_1", { kind: "tag", tags: [] }, [], {
105+
replaySinceMs: Date.now() - 1_000,
106+
});
107+
const replayed = await late.waitForMatch(undefined, 1_000);
108+
expect(replayed.reason).toBe("notify");
109+
expect(replayed.rows.map((m) => m.row.id)).toEqual(["r1"]);
110+
late.close();
111+
});
112+
89113
it("batch-hydrates ONCE and shares the serialized value across feeds matching the same run", async () => {
90114
const rows = new Map([["r1", row("r1", { tags: ["a"] })]]);
91115
const { router, src, hydrateSpy } = makeRouter(rows);

0 commit comments

Comments
 (0)