6 changes: 6 additions & 0 deletions .server-changes/replication-error-recovery.md
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Runs and sessions replication services now auto-recover from stream errors (e.g. after a Postgres failover) instead of silently leaving replication stopped. Behaviour is configurable per service — reconnect (default), exit so a process supervisor can restart the host, or log.
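
For orientation, here is a minimal sketch (not part of the PR) of how a service derives its strategy from these settings. The variable names are the ones added to env.server.ts below; the reconnect tuning is shared between the runs and sessions services, and the import paths are illustrative.

```ts
// Sketch only: mirrors the mapping done in runsReplicationInstance.server.ts further
// down in this diff. The sessions instance does the same with SESSION_REPLICATION_*.
import { env } from "~/env.server";
import { strategyFromEnv } from "~/services/replicationErrorRecovery.server";

const runsErrorRecovery = strategyFromEnv({
  strategy: env.RUN_REPLICATION_ERROR_STRATEGY, // "reconnect" (default) | "exit" | "log"
  reconnectInitialDelayMs: env.REPLICATION_RECONNECT_INITIAL_DELAY_MS, // default 1_000
  reconnectMaxDelayMs: env.REPLICATION_RECONNECT_MAX_DELAY_MS, // default 60_000
  reconnectMaxAttempts: env.REPLICATION_RECONNECT_MAX_ATTEMPTS, // 0 = unlimited (default)
  exitDelayMs: env.RUN_REPLICATION_EXIT_DELAY_MS, // default 5_000
  exitCode: env.RUN_REPLICATION_EXIT_CODE, // default 1
});
```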
23 changes: 23 additions & 0 deletions apps/webapp/app/env.server.ts
@@ -1330,6 +1330,16 @@ const EnvironmentSchema = z
RUN_REPLICATION_INSERT_STRATEGY: z.enum(["insert", "insert_async"]).default("insert"),
RUN_REPLICATION_DISABLE_PAYLOAD_INSERT: z.string().default("0"),
RUN_REPLICATION_DISABLE_ERROR_FINGERPRINTING: z.string().default("0"),
// What to do when the runs replication client errors (e.g. after a
// Postgres failover). `reconnect` (default) re-subscribes in-process with
// exponential backoff; `exit` exits the process so a supervisor restarts
// it; `log` preserves the old no-op behaviour. Reconnect tuning is
// shared across both replication services via REPLICATION_RECONNECT_*.
RUN_REPLICATION_ERROR_STRATEGY: z
.enum(["reconnect", "exit", "log"])
.default("reconnect"),
RUN_REPLICATION_EXIT_DELAY_MS: z.coerce.number().int().min(0).default(5_000),
RUN_REPLICATION_EXIT_CODE: z.coerce.number().int().min(0).max(255).default(1),

// Session replication (Postgres → ClickHouse sessions_v1). Shares Redis
// with the runs replicator for leader locking but has its own slot and
@@ -1362,6 +1372,19 @@ const EnvironmentSchema = z
SESSION_REPLICATION_INSERT_MAX_RETRIES: z.coerce.number().int().default(3),
SESSION_REPLICATION_INSERT_BASE_DELAY_MS: z.coerce.number().int().default(100),
SESSION_REPLICATION_INSERT_MAX_DELAY_MS: z.coerce.number().int().default(2000),
// Error recovery — same semantics as RUN_REPLICATION_ERROR_STRATEGY.
SESSION_REPLICATION_ERROR_STRATEGY: z
.enum(["reconnect", "exit", "log"])
.default("reconnect"),
SESSION_REPLICATION_EXIT_DELAY_MS: z.coerce.number().int().min(0).default(5_000),
SESSION_REPLICATION_EXIT_CODE: z.coerce.number().int().min(0).max(255).default(1),

// Reconnect tuning shared across both replication services. Only
// applies when error strategy is `reconnect`. Max attempts of 0 means
// unlimited (default).
REPLICATION_RECONNECT_INITIAL_DELAY_MS: z.coerce.number().int().min(0).default(1_000),
REPLICATION_RECONNECT_MAX_DELAY_MS: z.coerce.number().int().min(0).default(60_000),
REPLICATION_RECONNECT_MAX_ATTEMPTS: z.coerce.number().int().min(0).default(0),

// Clickhouse
CLICKHOUSE_URL: z.string(),
207 changes: 207 additions & 0 deletions apps/webapp/app/services/replicationErrorRecovery.server.ts
@@ -0,0 +1,207 @@
import { Logger } from "@trigger.dev/core/logger";

// When the LogicalReplicationClient's WAL stream errors (e.g. after a
// Postgres failover) it calls stop() on itself and stays stopped. The host
// service has to decide how to recover. Three strategies are available:
//
// - "reconnect" — re-subscribe in-process with exponential backoff. Default;
// works without a process supervisor.
// - "exit" — exit the process so an external supervisor (Docker
// restart=always, ECS, systemd, k8s, ...) replaces it. Recommended when a
// supervisor is present because it gets a clean slate every time.
// - "log" — preserve the historical no-op behaviour. Useful for
// debugging or in test environments where you want to observe the
// silent-death failure mode.
export type ReplicationErrorRecoveryStrategy =
| {
type: "reconnect";
initialDelayMs?: number;
maxDelayMs?: number;
// 0 (or undefined) means retry forever.
maxAttempts?: number;
}
| {
type: "exit";
exitDelayMs?: number;
exitCode?: number;
}
| { type: "log" };

export type ReplicationErrorRecoveryDeps = {
strategy: ReplicationErrorRecoveryStrategy;
logger: Logger;
// Re-subscribe the underlying replication client. Implementations should
// call client.subscribe(...) and resolve once the stream is started.
reconnect: () => Promise<void>;
// True once the host service has begun graceful shutdown — recovery
// suppresses all work in that state.
isShuttingDown: () => boolean;
};

export type ReplicationErrorRecovery = {
// Called from the replication client's "error" event handler.
handle(error: unknown): void;
// Called from the replication client's "start" event handler. Resets the
// reconnect attempt counter so the next failure starts from initialDelayMs.
notifyStreamStarted(): void;
// Called from the replication client's "leaderElection" event handler with
// isLeader=false. Only the reconnect strategy acts on this; exit and log
// strategies treat losing the lock as a normal multi-instance state (an
// "exit" instance would otherwise restart-loop whenever a peer holds it).
notifyLeaderElectionLost(error: unknown): void;
// Cancel any pending reconnect/exit timer. Called from shutdown().
dispose(): void;
};

export function createReplicationErrorRecovery(
deps: ReplicationErrorRecoveryDeps
): ReplicationErrorRecovery {
const { strategy, logger, reconnect, isShuttingDown } = deps;
let attempt = 0;
let pendingReconnect: NodeJS.Timeout | null = null;
let pendingExit: NodeJS.Timeout | null = null;
let exiting = false;

function scheduleReconnect(error: unknown): void {
if (strategy.type !== "reconnect") return;
if (pendingReconnect) return;

attempt += 1;
const maxAttempts = strategy.maxAttempts ?? 0;
if (maxAttempts > 0 && attempt > maxAttempts) {
logger.error("Replication reconnect exceeded maxAttempts; giving up", {
attempt,
maxAttempts,
error,
});
return;
}

const initialDelay = strategy.initialDelayMs ?? 1_000;
const maxDelay = strategy.maxDelayMs ?? 60_000;
const delay = Math.min(initialDelay * Math.pow(2, attempt - 1), maxDelay);
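// For example, with the defaults (initialDelayMs 1_000, maxDelayMs 60_000) attempts
// are spaced 1s, 2s, 4s, 8s, 16s, 32s, then 60s for every later retry.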

logger.error("Replication stream lost — scheduling reconnect", {
attempt,
delayMs: delay,
error,
});

pendingReconnect = setTimeout(async () => {
pendingReconnect = null;
if (isShuttingDown()) return;

try {
await reconnect();
// Success path is handled by notifyStreamStarted, which fires from
// the replication client's "start" event after the stream is live.
} catch (err) {
// subscribe() can throw without first emitting an "error" event —
// notably when the initial pg client.connect() fails because Postgres
// is still unreachable mid-failover. Schedule the next attempt
// ourselves so recovery doesn't silently stop. If subscribe() did
// also emit an "error" event, handle() will call scheduleReconnect()
// first; the guard on pendingReconnect makes this idempotent.
logger.error("Replication reconnect attempt failed", {
attempt,
error: err,
});
scheduleReconnect(err);
}
}, delay);
}

function scheduleExit(): void {
if (strategy.type !== "exit") return;
if (exiting) return;
exiting = true;

const delay = strategy.exitDelayMs ?? 5_000;
const code = strategy.exitCode ?? 1;

logger.error("Fatal replication error — exiting to let process supervisor restart", {
exitCode: code,
exitDelayMs: delay,
});

pendingExit = setTimeout(() => {
// eslint-disable-next-line no-process-exit
process.exit(code);
}, delay);
// Don't hold a clean shutdown back on this timer.
pendingExit.unref();
}

return {
handle(error) {
if (isShuttingDown()) return;
switch (strategy.type) {
case "log":
return;
case "exit":
return scheduleExit();
case "reconnect":
return scheduleReconnect(error);
}
},
notifyStreamStarted() {
if (attempt > 0) {
logger.info("Replication reconnect succeeded", { attempt });
attempt = 0;
}
},
notifyLeaderElectionLost(error) {
if (isShuttingDown()) return;
// Only the reconnect strategy should react. For exit, losing the
// lock to a peer would otherwise trigger a restart loop. For log,
// we keep historical no-op semantics.
if (strategy.type !== "reconnect") return;
scheduleReconnect(error);
},
dispose() {
if (pendingReconnect) {
clearTimeout(pendingReconnect);
pendingReconnect = null;
}
if (pendingExit) {
clearTimeout(pendingExit);
pendingExit = null;
}
},
};
}

// Shape of the env-driven configuration object the instance bootstrap files
// build from process.env. Kept separate from the strategy union above so the
// instance code can pass a single object regardless of which strategy is set.
export type ReplicationErrorRecoveryEnv = {
strategy: "reconnect" | "exit" | "log";
reconnectInitialDelayMs?: number;
reconnectMaxDelayMs?: number;
reconnectMaxAttempts?: number;
exitDelayMs?: number;
exitCode?: number;
};

export function strategyFromEnv(
env: ReplicationErrorRecoveryEnv
): ReplicationErrorRecoveryStrategy {
switch (env.strategy) {
case "exit":
return {
type: "exit",
exitDelayMs: env.exitDelayMs,
exitCode: env.exitCode,
};
case "log":
return { type: "log" };
case "reconnect":
default:
return {
type: "reconnect",
initialDelayMs: env.reconnectInitialDelayMs,
maxDelayMs: env.reconnectMaxDelayMs,
maxAttempts: env.reconnectMaxAttempts,
};
}
}
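
To show how the pieces above fit together, here is a minimal usage sketch, not part of the PR: client, logger, lastAckedLsn and shuttingDown are placeholders, and the wiring mirrors what runsReplicationService.server.ts does further down in this diff.

```ts
// Sketch only: event wiring as a host service would do it.
import { EventEmitter } from "node:events";
import { Logger } from "@trigger.dev/core/logger";
import {
  createReplicationErrorRecovery,
  strategyFromEnv,
} from "./replicationErrorRecovery.server";

// Placeholders standing in for the host service's state and replication client.
declare const client: { events: EventEmitter; subscribe(lsn?: string): Promise<void> };
declare const logger: Logger;
declare const lastAckedLsn: string | null;
declare const shuttingDown: boolean;

const recovery = createReplicationErrorRecovery({
  strategy: strategyFromEnv({ strategy: "reconnect" }),
  logger,
  // Re-subscribe from the last acknowledged LSN; resolves once the stream is started.
  reconnect: async () => {
    await client.subscribe(lastAckedLsn ?? undefined);
  },
  isShuttingDown: () => shuttingDown,
});

client.events.on("error", (error) => recovery.handle(error));
client.events.on("start", () => recovery.notifyStreamStarted());
client.events.on("leaderElection", (isLeader: boolean) => {
  if (!isLeader) {
    recovery.notifyLeaderElectionLost(new Error("Failed to acquire replication leader lock"));
  }
});

// During graceful shutdown, cancel any pending reconnect/exit timer.
recovery.dispose();
```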
9 changes: 9 additions & 0 deletions apps/webapp/app/services/runsReplicationInstance.server.ts
@@ -3,6 +3,7 @@ import invariant from "tiny-invariant";
import { env } from "~/env.server";
import { singleton } from "~/utils/singleton";
import { meter, provider } from "~/v3/tracer.server";
import { strategyFromEnv } from "./replicationErrorRecovery.server";
import { RunsReplicationService } from "./runsReplicationService.server";
import { signalsEmitter } from "./signals.server";

@@ -69,6 +70,14 @@ function initializeRunsReplicationInstance() {
insertStrategy: env.RUN_REPLICATION_INSERT_STRATEGY,
disablePayloadInsert: env.RUN_REPLICATION_DISABLE_PAYLOAD_INSERT === "1",
disableErrorFingerprinting: env.RUN_REPLICATION_DISABLE_ERROR_FINGERPRINTING === "1",
errorRecovery: strategyFromEnv({
strategy: env.RUN_REPLICATION_ERROR_STRATEGY,
reconnectInitialDelayMs: env.REPLICATION_RECONNECT_INITIAL_DELAY_MS,
reconnectMaxDelayMs: env.REPLICATION_RECONNECT_MAX_DELAY_MS,
reconnectMaxAttempts: env.REPLICATION_RECONNECT_MAX_ATTEMPTS,
exitDelayMs: env.RUN_REPLICATION_EXIT_DELAY_MS,
exitCode: env.RUN_REPLICATION_EXIT_CODE,
}),
});

if (env.RUN_REPLICATION_ENABLED === "1") {
🚩 Initial start() failure from pg connection error bypasses error recovery

If start()'s internal subscribe() call throws because client.connect() fails (at internal-packages/replication/src/client.ts:284), the error propagates to the instance's .catch() handler at runsReplicationInstance.server.ts:89-93, which only logs. No error event is emitted by the LogicalReplicationClient for connect() failures, so the error recovery never triggers and the service is left dead. This is a pre-existing gap: the PR's error recovery correctly handles mid-stream failures and leader election losses, but the initial connection failure path remains unprotected. The reconnect strategy does handle this case during subsequent reconnect attempts (the catch block at replicationErrorRecovery.server.ts:98-110 re-schedules), so only the very first start() call is affected.

(Refers to lines 83-93)
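
One hedged way to close that gap without new events would be to retry the very first start() in the instance file itself. The function name, attempt count and delays below are illustrative, not part of the PR:

```ts
// Sketch only: retry the initial start() in the instance bootstrap instead of just logging.
async function startRunsReplicationWithRetry(
  service: { start(): Promise<void> },
  maxAttempts = 5
): Promise<void> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await service.start();
      return;
    } catch (error) {
      console.error("Runs replication start attempt failed", { attempt, error });
      // Same backoff shape the recovery module uses: 1s doubling per attempt, capped at 60s.
      await new Promise((resolve) =>
        setTimeout(resolve, Math.min(1_000 * 2 ** (attempt - 1), 60_000))
      );
    }
  }
  throw new Error("Runs replication failed to start after retries");
}
```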

31 changes: 31 additions & 0 deletions apps/webapp/app/services/runsReplicationService.server.ts
@@ -29,6 +29,11 @@ import EventEmitter from "node:events";
import pLimit from "p-limit";
import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings";
import { calculateErrorFingerprint } from "~/utils/errorFingerprinting";
import {
createReplicationErrorRecovery,
type ReplicationErrorRecovery,
type ReplicationErrorRecoveryStrategy,
} from "./replicationErrorRecovery.server";

interface TransactionEvent<T = any> {
tag: "insert" | "update" | "delete";
@@ -73,6 +78,9 @@ export type RunsReplicationServiceOptions = {
insertMaxDelayMs?: number;
disablePayloadInsert?: boolean;
disableErrorFingerprinting?: boolean;
// What to do when the replication client errors (e.g. after a Postgres
// failover). Defaults to in-process reconnect with exponential backoff.
errorRecovery?: ReplicationErrorRecoveryStrategy;
};

type PostgresTaskRun = TaskRun & { masterQueue: string };
@@ -119,6 +127,7 @@ export class RunsReplicationService {
private _insertStrategy: "insert" | "insert_async";
private _disablePayloadInsert: boolean;
private _disableErrorFingerprinting: boolean;
private _errorRecovery: ReplicationErrorRecovery;

// Metrics
private _replicationLagHistogram: Histogram;
@@ -250,14 +259,25 @@
}
});

this._errorRecovery = createReplicationErrorRecovery({
strategy: options.errorRecovery ?? { type: "reconnect" },
logger: this.logger,
reconnect: async () => {
await this._replicationClient.subscribe(this._latestCommitEndLsn ?? undefined);
},
isShuttingDown: () => this._isShuttingDown || this._isShutDownComplete,
});

this._replicationClient.events.on("error", (error) => {
this.logger.error("Replication client error", {
error,
});
this._errorRecovery.handle(error);
});

this._replicationClient.events.on("start", () => {
this.logger.info("Replication client started");
this._errorRecovery.notifyStreamStarted();
});

this._replicationClient.events.on("acknowledge", ({ lsn }) => {
@@ -266,6 +286,16 @@

this._replicationClient.events.on("leaderElection", (isLeader) => {
this.logger.info("Leader election", { isLeader });
if (!isLeader) {
// Failed leader election doesn't throw or emit an "error" event —
// subscribe() just emits leaderElection(false), calls stop(), and
// returns. Route through a dedicated handler so only the reconnect
// strategy acts; the exit strategy must not restart-loop when
// another instance holds the lock.
this._errorRecovery.notifyLeaderElectionLost(
new Error("Failed to acquire replication leader lock")
);
}
});

// Initialize retry configuration
@@ -278,6 +308,7 @@
if (this._isShuttingDown) return;

this._isShuttingDown = true;
this._errorRecovery.dispose();

this.logger.info("Initiating shutdown of runs replication service");

apps/webapp/app/services/sessionsReplicationInstance.server.ts
@@ -3,6 +3,7 @@ import invariant from "tiny-invariant";
import { env } from "~/env.server";
import { singleton } from "~/utils/singleton";
import { meter, provider } from "~/v3/tracer.server";
import { strategyFromEnv } from "./replicationErrorRecovery.server";
import { SessionsReplicationService } from "./sessionsReplicationService.server";

export const sessionsReplicationInstance = singleton(
@@ -66,6 +67,14 @@ function initializeSessionsReplicationInstance() {
insertBaseDelayMs: env.SESSION_REPLICATION_INSERT_BASE_DELAY_MS,
insertMaxDelayMs: env.SESSION_REPLICATION_INSERT_MAX_DELAY_MS,
insertStrategy: env.SESSION_REPLICATION_INSERT_STRATEGY,
errorRecovery: strategyFromEnv({
strategy: env.SESSION_REPLICATION_ERROR_STRATEGY,
reconnectInitialDelayMs: env.REPLICATION_RECONNECT_INITIAL_DELAY_MS,
reconnectMaxDelayMs: env.REPLICATION_RECONNECT_MAX_DELAY_MS,
reconnectMaxAttempts: env.REPLICATION_RECONNECT_MAX_ATTEMPTS,
exitDelayMs: env.SESSION_REPLICATION_EXIT_DELAY_MS,
exitCode: env.SESSION_REPLICATION_EXIT_CODE,
}),
});
Comment on lines +70 to 78
🚩 Sessions replication service is created with error recovery but never started in the instance file

The sessionsReplicationInstance.server.ts creates the SessionsReplicationService with full error recovery configuration but never calls service.start() — unlike runsReplicationInstance.server.ts:83-93 which conditionally starts and registers signal handlers. The sessions instance is reference-held in adminWorker.server.ts:12 (void sessionsReplicationInstance) with a comment claiming it "subscribes to the logical replication slot, wires signal handlers" — but neither happens. The error recovery added by this PR would only become effective if/when start() is called externally. This is a pre-existing issue, not introduced by this PR.
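
For comparison, a minimal sketch of the conditional start the runs instance performs. SESSION_REPLICATION_ENABLED is a hypothetical flag (the runs instance gates on RUN_REPLICATION_ENABLED === "1"), and signal-handler registration is omitted because its exact shape is not shown in this diff:

```ts
// Sketch only: conditional start mirroring runsReplicationInstance.server.ts.
// SESSION_REPLICATION_ENABLED is hypothetical; substitute whatever flag the webapp adopts.
if (process.env.SESSION_REPLICATION_ENABLED === "1") {
  service.start().catch((error) => {
    console.error("Failed to start sessions replication service", { error });
  });
}
```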


return service;