@@ -40,15 +40,31 @@ export type EnvChangeRouterOptions = {
4040 hydrator : RowHydrator ;
4141 /** Observability: a hydrate-by-id batch ran (count = runs hydrated this tick). */
4242 onHydrate ?: ( runCount : number ) => void ;
43+ /** How far back (ms) a newly-armed feed replays buffered records. 0 disables replay. */
44+ replayWindowMs ?: number ;
45+ /** Cap on buffered recent records per env (latest record per run). */
46+ replayMaxRunsPerEnv ?: number ;
47+ /** How long (ms) to keep an env subscribed + buffering after its last feed closes. 0 disables. */
48+ unsubscribeLingerMs ?: number ;
49+ /** Observability: a replay scan found candidates and delivered rows (or none survived). */
50+ onReplay ?: ( result : "delivered" | "empty" ) => void ;
4351} ;
4452
/** Replay look-back (ms) used when `options.replayWindowMs` is not provided. */
const DEFAULT_REPLAY_WINDOW_MS = 2000;
/** Per-env buffer cap (runs) used when `options.replayMaxRunsPerEnv` is not provided. */
const DEFAULT_REPLAY_MAX_RUNS_PER_ENV = 512;
/** Post-close linger (ms) used when `options.unsubscribeLingerMs` is not provided. */
const DEFAULT_UNSUBSCRIBE_LINGER_MS = 5000;
56+
/**
 * The handle `register` returns; a feed holds it for the duration of one long-poll.
 */
export type FeedRegistration = {
  /**
   * Waits for the next batch that matches this feed, or for the timeout / abort signal,
   * whichever comes first. Matched runs arrive hydrated and serialized for this feed's
   * columns. Only one wait may be active at a time.
   */
  waitForMatch(signal: AbortSignal | undefined, timeoutMs: number): Promise<WaitResult>;
  /**
   * Deregisters the feed and settles any pending wait with reason "abort". When the last
   * feed of the env leaves, the env's teardown is scheduled: immediate when the unsubscribe
   * linger is disabled, otherwise after the linger elapses with no new feed.
   */
  close(): void;
  /**
   * Computed once at registration. True when replay is disabled, or when this instance's
   * env subscription started at or before the caller's replay-since time AND that time is
   * still inside the replay window. False means a change in the caller's inter-poll gap
   * may have been missed (hop / cold start), so the caller should resolve once instead of
   * holding blind.
   */
  gapCovered: boolean;
};
5369
5470type Feed = {
@@ -57,6 +73,8 @@ type Feed = {
5773 columnSig : string ;
5874 /** The currently-waiting poll's resolver (null between polls). */
5975 resolve : ( ( result : WaitResult ) => void ) | null ;
76+ /** Buffered records at or before this timestamp have been replayed (or predate this feed). */
77+ replayCursorMs : number ;
6078} ;
6179
6280type EnvState = {
@@ -67,6 +85,12 @@ type EnvState = {
6785 byBatchId : Map < string , Set < Feed > > ;
6886 /** All tag feeds, for routing partial records (no tags) as hydrate-to-classify candidates. */
6987 tagFeeds : Set < Feed > ;
88+ /** When this env's channel subscription started (for the gap-coverage check). */
89+ subscribedAtMs : number ;
90+ /** Latest record per run, insertion-ordered, for replaying inter-poll gaps to newly-armed feeds. */
91+ recent : Map < string , { record : ChangeRecord ; receivedAtMs : number } > ;
92+ /** Pending teardown while the env lingers with zero feeds. */
93+ lingerTimer ?: ReturnType < typeof setTimeout > ;
7094} ;
7195
7296function addToIndex ( index : Map < string , Set < Feed > > , key : string , feed : Feed ) {
@@ -93,13 +117,29 @@ export class EnvChangeRouter {
93117
94118 constructor ( private readonly options : EnvChangeRouterOptions ) { }
95119
96- register ( environmentId : string , filter : FeedFilter , skipColumns : string [ ] ) : FeedRegistration {
120+ register (
121+ environmentId : string ,
122+ filter : FeedFilter ,
123+ skipColumns : string [ ] ,
124+ opts ?: {
125+ /** When the caller last received data for this connection. Bounds the replay to the
126+ * true inter-poll gap; older than the window can't be proven covered. */
127+ replaySinceMs ?: number ;
128+ }
129+ ) : FeedRegistration {
97130 const env = this . #ensureEnv( environmentId ) ;
131+ const replayWindowMs = this . options . replayWindowMs ?? DEFAULT_REPLAY_WINDOW_MS ;
132+ const now = Date . now ( ) ;
133+ const windowFloorMs = now - replayWindowMs ;
134+ const sinceMs = opts ?. replaySinceMs ?? windowFloorMs ;
98135 const feed : Feed = {
99136 filter,
100137 skipColumns,
101138 columnSig : skipColumns . length > 0 ? [ ...skipColumns ] . sort ( ) . join ( "," ) : "" ,
102139 resolve : null ,
140+ // First arm replays the caller's inter-poll gap; later arms only what arrived since.
141+ // The buffer only spans the window, so never rewind past it.
142+ replayCursorMs : Math . max ( sinceMs , windowFloorMs ) ,
103143 } ;
104144
105145 env . feeds . add ( feed ) ;
@@ -129,6 +169,16 @@ export class EnvChangeRouter {
129169 onAbort = ( ) => settle ( { reason : "abort" , rows : [ ] } ) ;
130170 signal . addEventListener ( "abort" , onAbort , { once : true } ) ;
131171 }
172+ // Deliver any buffered records this feed hasn't seen (catches changes that
173+ // landed while the caller was between polls).
174+ if ( replayWindowMs > 0 && env . recent . size > 0 ) {
175+ this . #replayRecent( environmentId , env , feed ) . catch ( ( error ) => {
176+ logger . error ( "[envChangeRouter] failed to replay buffered records" , {
177+ environmentId,
178+ error,
179+ } ) ;
180+ } ) ;
181+ }
132182 } ) ;
133183
134184 const close = ( ) => {
@@ -141,12 +191,18 @@ export class EnvChangeRouter {
141191 feed . resolve ?.( { reason : "abort" , rows : [ ] } ) ;
142192 feed . resolve = null ;
143193 if ( env . feeds . size === 0 ) {
144- this . #envs. delete ( environmentId ) ;
145- env . unsubscribe ( ) ;
194+ this . #scheduleEnvTeardown( environmentId , env ) ;
146195 }
147196 } ;
148197
149- return { waitForMatch, close } ;
198+ return {
199+ waitForMatch,
200+ close,
201+ // Covered when this instance was already subscribed (and buffering) at the gap's
202+ // start, and the gap fits inside the buffer's window.
203+ gapCovered :
204+ replayWindowMs <= 0 || ( env . subscribedAtMs <= sinceMs && sinceMs >= windowFloorMs ) ,
205+ } ;
150206 }
151207
152208 /** Distinct environments currently routed (for metrics). */
@@ -157,6 +213,11 @@ export class EnvChangeRouter {
157213 #ensureEnv( environmentId : string ) : EnvState {
158214 const existing = this . #envs. get ( environmentId ) ;
159215 if ( existing ) {
216+ // A pending teardown is cancelled by new interest; the buffer survives the gap.
217+ if ( existing . lingerTimer ) {
218+ clearTimeout ( existing . lingerTimer ) ;
219+ existing . lingerTimer = undefined ;
220+ }
160221 return existing ;
161222 }
162223 const env : EnvState = {
@@ -166,9 +227,12 @@ export class EnvChangeRouter {
166227 byTag : new Map ( ) ,
167228 byBatchId : new Map ( ) ,
168229 tagFeeds : new Set ( ) ,
230+ subscribedAtMs : Date . now ( ) ,
231+ recent : new Map ( ) ,
169232 } ;
170233 this . #envs. set ( environmentId , env ) ;
171234 env . unsubscribe = this . options . source . subscribeToEnv ( environmentId , ( records ) => {
235+ this . #bufferRecent( env , records ) ;
172236 // Fire-and-forget; catch hydrate failures here (unhandled rejection exits the process) — waiters time out into the backstop.
173237 this . #onBatch( environmentId , env , records ) . catch ( ( error ) => {
174238 logger . error ( "[envChangeRouter] failed to route a change batch" , {
@@ -180,6 +244,105 @@ export class EnvChangeRouter {
180244 return env ;
181245 }
182246
/**
 * Tears the env down once its last feed has closed.
 *
 * With a positive linger the env stays subscribed and buffering for `unsubscribeLingerMs`
 * (default DEFAULT_UNSUBSCRIBE_LINGER_MS) so a client's next poll, or an instance hop that
 * lands back here, can still replay the gap. With a linger of 0 or less the teardown runs
 * immediately.
 *
 * @param environmentId Key of the env in the router's env map.
 * @param env The env state whose subscription should be released.
 */
#scheduleEnvTeardown(environmentId: string, env: EnvState) {
  const teardown = () => {
    env.lingerTimer = undefined;
    // Only evict the entry this env still owns. A late call on an env that was already
    // torn down must not delete a newer env registered under the same id, nor call
    // unsubscribe a second time.
    if (this.#envs.get(environmentId) !== env) {
      return;
    }
    this.#envs.delete(environmentId);
    env.unsubscribe();
  };

  // Any previously scheduled teardown is superseded by this call.
  if (env.lingerTimer) {
    clearTimeout(env.lingerTimer);
    env.lingerTimer = undefined;
  }

  const lingerMs = this.options.unsubscribeLingerMs ?? DEFAULT_UNSUBSCRIBE_LINGER_MS;
  if (lingerMs <= 0) {
    teardown();
    return;
  }

  const timer = setTimeout(() => {
    if (env.feeds.size === 0) {
      teardown();
    } else {
      // Feeds came back without cancelling the timer; just drop the spent handle.
      env.lingerTimer = undefined;
    }
  }, lingerMs);
  // Where the runtime's timer handle supports it, don't let the linger keep the process alive.
  timer.unref?.();
  env.lingerTimer = timer;
}
267+
/**
 * Records the newest change per run in `env.recent`, then trims the buffer.
 *
 * Each record is removed and re-inserted so the map's insertion order is also age order
 * (oldest first). Trimming walks from the oldest entry and stops at the first one that is
 * both inside the replay window and leaves the buffer within its run cap. Does nothing
 * when the replay window is 0 or less.
 */
#bufferRecent(env: EnvState, records: ChangeRecord[]) {
  const replayWindowMs = this.options.replayWindowMs ?? DEFAULT_REPLAY_WINDOW_MS;
  if (replayWindowMs <= 0) {
    return;
  }

  const capacity = this.options.replayMaxRunsPerEnv ?? DEFAULT_REPLAY_MAX_RUNS_PER_ENV;
  const receivedAtMs = Date.now();
  const buffer = env.recent;

  for (const record of records) {
    // Delete first so the re-insert moves the run to the newest position.
    buffer.delete(record.runId);
    buffer.set(record.runId, { record, receivedAtMs });
  }

  const oldestAllowedMs = receivedAtMs - replayWindowMs;
  for (const [runId, buffered] of buffer) {
    const insideWindow = buffered.receivedAtMs >= oldestAllowedMs;
    const withinCapacity = buffer.size <= capacity;
    if (insideWindow && withinCapacity) {
      // Everything after this entry is newer, so nothing else needs pruning.
      break;
    }
    buffer.delete(runId);
  }
}
288+
/**
 * Decides whether a buffered record is a replay candidate for the given feed, based on
 * the feed's filter kind (intended to mirror the routing done in #onBatch).
 *
 * - "run": the record's run id equals the filter's.
 * - "batch": the record has a batch id and it equals the filter's.
 * - "tag": the record carries no tags (a partial record, classified later by hydrating it)
 *   or shares at least one tag with the filter.
 */
#recordMatchesFeed(record: ChangeRecord, feed: Feed): boolean {
  const filter = feed.filter;
  switch (filter.kind) {
    case "run":
      return record.runId === filter.runId;
    case "batch":
      return record.batchId != null && record.batchId === filter.batchId;
    case "tag": {
      if (record.tags === undefined) {
        // No tags on the record: treat it as a candidate and let hydration classify it.
        return true;
      }
      const wanted = filter.tags;
      return record.tags.some((candidate) => wanted.includes(candidate));
    }
  }
}
306+
/**
 * Delivers buffered records newer than the feed's replay cursor through the normal
 * hydrate -> serialize -> settle pipeline. Already-seen rows diff to nothing downstream,
 * so replaying a record more than once is harmless.
 *
 * The cursor is advanced before scanning, so an overlapping arm does not rescan the same
 * records while a hydrate is in flight. It is rewound if the candidates end up neither
 * delivered nor filtered out (hydrate rejected, or no waiter left to receive the rows),
 * so the next arm retries them instead of silently dropping the gap.
 *
 * @param environmentId Env the run ids are hydrated for.
 * @param env Env state holding the `recent` buffer.
 * @param feed Feed whose cursor, filter and pending resolver are used.
 * @throws Whatever `hydrator.hydrateByIds` rejects with, after the cursor is rewound.
 */
async #replayRecent(environmentId: string, env: EnvState, feed: Feed) {
  const previousCursorMs = feed.replayCursorMs;
  feed.replayCursorMs = Date.now();

  const runIds: string[] = [];
  for (const [runId, entry] of env.recent) {
    if (entry.receivedAtMs > previousCursorMs && this.#recordMatchesFeed(entry.record, feed)) {
      runIds.push(runId);
    }
  }
  if (runIds.length === 0) {
    return;
  }

  // True once the candidates were delivered, or hydrated and legitimately discarded.
  let consumed = false;
  try {
    if (!feed.resolve) {
      return;
    }

    const hydrated = await this.options.hydrator.hydrateByIds(
      environmentId,
      runIds,
      feed.skipColumns
    );
    this.options.onHydrate?.(hydrated.length);

    const rows: MatchedRow[] = [];
    for (const row of hydrated) {
      // Tag feeds re-check the hydrated row: partial records were admitted unclassified.
      if (feed.filter.kind === "tag" && !this.#tagRowMatches(row, feed.filter)) {
        continue;
      }
      rows.push({ row, value: serializeRunRow(row, feed.skipColumns) });
    }

    if (rows.length === 0) {
      // Nothing survived filtering; these records are done for this feed.
      consumed = true;
      this.options.onReplay?.("empty");
      return;
    }
    if (!feed.resolve) {
      // The wait settled while hydrating; leave the rows for the next arm.
      this.options.onReplay?.("empty");
      return;
    }

    this.options.onReplay?.("delivered");
    feed.resolve({ reason: "notify", rows });
    consumed = true;
  } finally {
    if (!consumed) {
      // Never move the cursor forward here: only undo this call's advance.
      feed.replayCursorMs = Math.min(feed.replayCursorMs, previousCursorMs);
    }
  }
}
345+
183346 #indexFeed( env : EnvState , feed : Feed ) {
184347 switch ( feed . filter . kind ) {
185348 case "run" :
0 commit comments