Skip to content

Commit a2eaf3e

Browse files
committed
fix(webapp): drop bogus isStopped check, route leader-lock failure through handle()
The previous post-subscribe() isStopped check was always true on the happy path: subscribe() calls stop() up front (setting _isStopped=true) and only resets the flag inside the replicationStart event, which fires asynchronously after subscribe() returns. As a result, the check threw on every successful reconnect, the catch rescheduled, the next attempt tore down the just-built client, and the cycle repeated. Replication briefly worked between teardowns, which is why the integration test passed. Replace the check with the correct nudge: listen for the leaderElection event and call the recovery handler when isLeader=false. A failed leader election is the only subscribe() exit path that neither throws nor emits an "error" event (the other silent-return paths emit "error" first via createPublication/createSlot failures).
1 parent 1e6617c commit a2eaf3e

2 files changed

Lines changed: 15 additions & 12 deletions

File tree

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -264,13 +264,6 @@ export class RunsReplicationService {
264264
logger: this.logger,
265265
reconnect: async () => {
266266
await this._replicationClient.subscribe(this._latestCommitEndLsn ?? undefined);
267-
if (this._replicationClient.isStopped) {
268-
// subscribe() can resolve without throwing or emitting an "error"
269-
// event when leader-lock acquisition fails (see LogicalReplication-
270-
// Client.subscribe leader-election branch). Throw here so the
271-
// recovery handler reschedules the next attempt.
272-
throw new Error("Replication client stopped after subscribe()");
273-
}
274267
},
275268
isShuttingDown: () => this._isShuttingDown || this._isShutDownComplete,
276269
});
@@ -293,6 +286,15 @@ export class RunsReplicationService {
293286

294287
this._replicationClient.events.on("leaderElection", (isLeader) => {
295288
this.logger.info("Leader election", { isLeader });
289+
if (!isLeader) {
290+
// Failed leader election doesn't throw or emit an "error" event —
291+
// subscribe() just emits leaderElection(false), calls stop(), and
292+
// returns. Nudge the recovery handler so reconnect doesn't silently
293+
// stall when another instance holds the lock.
294+
this._errorRecovery.handle(
295+
new Error("Failed to acquire replication leader lock")
296+
);
297+
}
296298
});
297299

298300
// Initialize retry configuration

apps/webapp/app/services/sessionsReplicationService.server.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -245,11 +245,6 @@ export class SessionsReplicationService {
245245
logger: this.logger,
246246
reconnect: async () => {
247247
await this._replicationClient.subscribe(this._latestCommitEndLsn ?? undefined);
248-
if (this._replicationClient.isStopped) {
249-
// See RunsReplicationService for the rationale: subscribe() can
250-
// resolve without throwing when leader-lock acquisition fails.
251-
throw new Error("Replication client stopped after subscribe()");
252-
}
253248
},
254249
isShuttingDown: () => this._isShuttingDown || this._isShutDownComplete,
255250
});
@@ -272,6 +267,12 @@ export class SessionsReplicationService {
272267

273268
this._replicationClient.events.on("leaderElection", (isLeader) => {
274269
this.logger.info("Leader election", { isLeader });
270+
if (!isLeader) {
271+
// See RunsReplicationService for the rationale.
272+
this._errorRecovery.handle(
273+
new Error("Failed to acquire replication leader lock")
274+
);
275+
}
275276
});
276277

277278
// Initialize retry configuration

0 commit comments

Comments
 (0)