diff --git a/apps/web/src/rpc/wsConnectionState.ts b/apps/web/src/rpc/wsConnectionState.ts index 9e67f461184..959430f066c 100644 --- a/apps/web/src/rpc/wsConnectionState.ts +++ b/apps/web/src/rpc/wsConnectionState.ts @@ -1,5 +1,7 @@ import { useAtomValue } from "@effect/atom-react"; import { DEFAULT_RECONNECT_BACKOFF, getReconnectDelayMs } from "@t3tools/client-runtime"; +import * as Duration from "effect/Duration"; +import * as Option from "effect/Option"; import { Atom } from "effect/unstable/reactivity"; import { appAtomRegistry } from "./atomRegistry"; @@ -7,10 +9,14 @@ import { appAtomRegistry } from "./atomRegistry"; export type WsConnectionUiState = "connected" | "connecting" | "error" | "offline" | "reconnecting"; export type WsReconnectPhase = "attempting" | "exhausted" | "idle" | "waiting"; -export const WS_RECONNECT_INITIAL_DELAY_MS = DEFAULT_RECONNECT_BACKOFF.initialDelayMs; +export const WS_RECONNECT_INITIAL_DELAY_MS = Duration.toMillis( + Duration.fromInputUnsafe(DEFAULT_RECONNECT_BACKOFF.initialDelay), +); export const WS_RECONNECT_BACKOFF_FACTOR = DEFAULT_RECONNECT_BACKOFF.backoffFactor; -export const WS_RECONNECT_MAX_DELAY_MS = DEFAULT_RECONNECT_BACKOFF.maxDelayMs; -export const WS_RECONNECT_MAX_RETRIES = DEFAULT_RECONNECT_BACKOFF.maxRetries!; +export const WS_RECONNECT_MAX_DELAY_MS = Duration.toMillis( + Duration.fromInputUnsafe(DEFAULT_RECONNECT_BACKOFF.maxDelay), +); +export const WS_RECONNECT_MAX_RETRIES = Option.getOrThrow(DEFAULT_RECONNECT_BACKOFF.maxRetries); export const WS_RECONNECT_MAX_ATTEMPTS = WS_RECONNECT_MAX_RETRIES + 1; export interface WsConnectionStatus { diff --git a/packages/client-runtime/src/reconnectBackoff.test.ts b/packages/client-runtime/src/reconnectBackoff.test.ts index fb6bb415217..59184c01814 100644 --- a/packages/client-runtime/src/reconnectBackoff.test.ts +++ b/packages/client-runtime/src/reconnectBackoff.test.ts @@ -1,65 +1,89 @@ -import { describe, expect, it } from "vite-plus/test"; +import { assert, describe, it } from "@effect/vitest"; +import * as Duration from "effect/Duration"; +import * as Option from "effect/Option"; import { DEFAULT_RECONNECT_BACKOFF, + getReconnectDelay, getReconnectDelayMs, type ReconnectBackoffConfig, } from "./reconnectBackoff.ts"; -describe("getReconnectDelayMs", () => { +function assertDelayMs(delay: Option.Option, expectedMs: number) { + if (Option.isNone(delay)) { + assert.fail("Expected reconnect delay to be present"); + } + assert.strictEqual(Duration.toMillis(delay.value), expectedMs); +} + +describe("getReconnectDelay", () => { it("returns exponential delays with default config", () => { - expect(getReconnectDelayMs(0)).toBe(1_000); - expect(getReconnectDelayMs(1)).toBe(2_000); - expect(getReconnectDelayMs(2)).toBe(4_000); - expect(getReconnectDelayMs(3)).toBe(8_000); - expect(getReconnectDelayMs(4)).toBe(16_000); - expect(getReconnectDelayMs(5)).toBe(32_000); - expect(getReconnectDelayMs(6)).toBe(64_000); + assertDelayMs(getReconnectDelay(0), 1_000); + assertDelayMs(getReconnectDelay(1), 2_000); + assertDelayMs(getReconnectDelay(2), 4_000); + assertDelayMs(getReconnectDelay(3), 8_000); + assertDelayMs(getReconnectDelay(4), 16_000); + assertDelayMs(getReconnectDelay(5), 32_000); + assertDelayMs(getReconnectDelay(6), 64_000); }); - it("returns null when retry index exceeds maxRetries", () => { - expect(getReconnectDelayMs(7)).toBeNull(); - expect(getReconnectDelayMs(100)).toBeNull(); + it("returns none when retry index exceeds maxRetries", () => { + assert.strictEqual(Option.isNone(getReconnectDelay(7)), true); + assert.strictEqual(Option.isNone(getReconnectDelay(100)), true); }); - it("returns null for negative indices", () => { - expect(getReconnectDelayMs(-1)).toBeNull(); + it("returns none for negative indices", () => { + assert.strictEqual(Option.isNone(getReconnectDelay(-1)), true); }); - it("returns null for non-integer indices", () => { - expect(getReconnectDelayMs(1.5)).toBeNull(); + it("returns none for non-integer indices", () => { + assert.strictEqual(Option.isNone(getReconnectDelay(1.5)), true); }); - it("caps delay at maxDelayMs", () => { + it("caps delay at maxDelay", () => { const config: ReconnectBackoffConfig = { - initialDelayMs: 10_000, + initialDelay: Duration.seconds(10), backoffFactor: 10, - maxDelayMs: 30_000, - maxRetries: 5, + maxDelay: Duration.seconds(30), + maxRetries: Option.some(5), }; - expect(getReconnectDelayMs(0, config)).toBe(10_000); - expect(getReconnectDelayMs(1, config)).toBe(30_000); // 100_000 capped to 30_000 - expect(getReconnectDelayMs(2, config)).toBe(30_000); // 1_000_000 capped to 30_000 + assertDelayMs(getReconnectDelay(0, config), 10_000); + assertDelayMs(getReconnectDelay(1, config), 30_000); + assertDelayMs(getReconnectDelay(2, config), 30_000); }); - it("supports unlimited retries when maxRetries is null", () => { + it("supports unlimited retries when maxRetries is none", () => { const config: ReconnectBackoffConfig = { ...DEFAULT_RECONNECT_BACKOFF, - maxRetries: null, + maxRetries: Option.none(), }; - expect(getReconnectDelayMs(0, config)).toBe(1_000); - expect(getReconnectDelayMs(50, config)).toBe(64_000); // capped at maxDelayMs - expect(getReconnectDelayMs(100, config)).toBe(64_000); + assertDelayMs(getReconnectDelay(0, config), 1_000); + assertDelayMs(getReconnectDelay(50, config), 64_000); + assertDelayMs(getReconnectDelay(100, config), 64_000); + }); +}); + +describe("getReconnectDelayMs", () => { + it("returns millisecond values for compatibility", () => { + assert.strictEqual(getReconnectDelayMs(0), 1_000); + assert.strictEqual(getReconnectDelayMs(1), 2_000); + assert.strictEqual(getReconnectDelayMs(7), null); }); }); describe("DEFAULT_RECONNECT_BACKOFF", () => { it("has sensible defaults", () => { - expect(DEFAULT_RECONNECT_BACKOFF.initialDelayMs).toBe(1_000); - expect(DEFAULT_RECONNECT_BACKOFF.backoffFactor).toBe(2); - expect(DEFAULT_RECONNECT_BACKOFF.maxDelayMs).toBe(64_000); - expect(DEFAULT_RECONNECT_BACKOFF.maxRetries).toBe(7); + assert.strictEqual( + Duration.toMillis(Duration.fromInputUnsafe(DEFAULT_RECONNECT_BACKOFF.initialDelay)), + 1_000, + ); + assert.strictEqual(DEFAULT_RECONNECT_BACKOFF.backoffFactor, 2); + assert.strictEqual( + Duration.toMillis(Duration.fromInputUnsafe(DEFAULT_RECONNECT_BACKOFF.maxDelay)), + 64_000, + ); + assert.deepStrictEqual(DEFAULT_RECONNECT_BACKOFF.maxRetries, Option.some(7)); }); }); diff --git a/packages/client-runtime/src/reconnectBackoff.ts b/packages/client-runtime/src/reconnectBackoff.ts index 4f7ddd15a52..5a36ef0453f 100644 --- a/packages/client-runtime/src/reconnectBackoff.ts +++ b/packages/client-runtime/src/reconnectBackoff.ts @@ -1,15 +1,18 @@ +import * as Duration from "effect/Duration"; +import * as Option from "effect/Option"; + /** * Configuration for exponential reconnect backoff. */ export interface ReconnectBackoffConfig { - /** Base delay in milliseconds before the first retry. */ - readonly initialDelayMs: number; + /** Base delay before the first retry. */ + readonly initialDelay: Duration.Input; /** Multiplier applied per retry (exponential factor). */ readonly backoffFactor: number; - /** Hard upper bound on delay in milliseconds. */ - readonly maxDelayMs: number; - /** Maximum number of retries (0-based). `null` means unlimited. */ - readonly maxRetries: number | null; + /** Hard upper bound on delay. */ + readonly maxDelay: Duration.Input; + /** Maximum number of retries (0-based). `Option.none()` means unlimited. */ + readonly maxRetries: Option.Option; } /** @@ -18,30 +21,49 @@ export interface ReconnectBackoffConfig { * - 1 s initial delay, doubling each retry, capped at 64 s, up to 7 retries. */ export const DEFAULT_RECONNECT_BACKOFF: ReconnectBackoffConfig = { - initialDelayMs: 1_000, + initialDelay: Duration.seconds(1), backoffFactor: 2, - maxDelayMs: 64_000, - maxRetries: 7, + maxDelay: Duration.seconds(64), + maxRetries: Option.some(7), }; /** * Calculate the reconnect delay for a given retry index using exponential - * backoff. Returns `null` when `retryIndex` exceeds the configured maximum. + * backoff. Returns `Option.none()` when `retryIndex` exceeds the configured + * maximum. */ -export function getReconnectDelayMs( +export function getReconnectDelay( retryIndex: number, config: ReconnectBackoffConfig = DEFAULT_RECONNECT_BACKOFF, -): number | null { +): Option.Option { if (!Number.isInteger(retryIndex) || retryIndex < 0) { - return null; + return Option.none(); } - if (config.maxRetries !== null && retryIndex >= config.maxRetries) { - return null; + if (Option.isSome(config.maxRetries) && retryIndex >= config.maxRetries.value) { + return Option.none(); } - return Math.min( - Math.round(config.initialDelayMs * config.backoffFactor ** retryIndex), - config.maxDelayMs, + const initialDelayMs = Duration.toMillis(Duration.fromInputUnsafe(config.initialDelay)); + const maxDelayMs = Duration.toMillis(Duration.fromInputUnsafe(config.maxDelay)); + + return Option.some( + Duration.millis( + Math.min(Math.round(initialDelayMs * config.backoffFactor ** retryIndex), maxDelayMs), + ), ); } + +/** + * Compatibility wrapper for UI surfaces that still display reconnect delays in + * milliseconds. + */ +export function getReconnectDelayMs( + retryIndex: number, + config: ReconnectBackoffConfig = DEFAULT_RECONNECT_BACKOFF, +): number | null { + return Option.match(getReconnectDelay(retryIndex, config), { + onNone: () => null, + onSome: Duration.toMillis, + }); +} diff --git a/packages/client-runtime/src/wsRpcProtocol.ts b/packages/client-runtime/src/wsRpcProtocol.ts index 869c07f8766..ed419ae32da 100644 --- a/packages/client-runtime/src/wsRpcProtocol.ts +++ b/packages/client-runtime/src/wsRpcProtocol.ts @@ -2,13 +2,15 @@ import { WsRpcGroup } from "@t3tools/contracts"; import * as Duration from "effect/Duration"; import * as Effect from "effect/Effect"; import * as Layer from "effect/Layer"; +import * as Option from "effect/Option"; import * as Schedule from "effect/Schedule"; +import * as Schema from "effect/Schema"; import { RpcClient, RpcSerialization } from "effect/unstable/rpc"; import * as Socket from "effect/unstable/socket/Socket"; import { DEFAULT_RECONNECT_BACKOFF, - getReconnectDelayMs, + getReconnectDelay, type ReconnectBackoffConfig, } from "./reconnectBackoff.ts"; @@ -69,6 +71,10 @@ export type WsRpcProtocolClient = RpcClientFactory extends Effect.Effect ? Client : never; export type WsRpcProtocolSocketUrlProvider = string | (() => Promise); +const decodeRpcPongMessage = Schema.decodeUnknownOption( + Schema.fromJsonString(Schema.TaggedStruct("Pong", {})), +); + function formatSocketErrorMessage(error: unknown): string { if (error instanceof Error && error.message.trim().length > 0) { return error.message; @@ -234,13 +240,8 @@ export function createWsRpcProtocolLayer( { once: true }, ); socket.addEventListener("message", (event) => { - try { - const message = JSON.parse(String(event.data)) as { readonly _tag?: string }; - if (message._tag === "Pong") { - lifecycle.onHeartbeatPong(); - } - } catch { - // Ignore malformed messages here; the Effect RPC parser still owns protocol errors. + if (Option.isSome(decodeRpcPongMessage(String(event.data)))) { + lifecycle.onHeartbeatPong(); } }); socket.addEventListener( @@ -266,10 +267,17 @@ export function createWsRpcProtocolLayer( Layer.provide(trackingWebSocketConstructorLayer), ); - const baseSchedule = - backoff.maxRetries === null ? Schedule.forever : Schedule.recurs(backoff.maxRetries); + const baseSchedule = Option.match(backoff.maxRetries, { + onNone: () => Schedule.forever, + onSome: Schedule.recurs, + }); const retryPolicy = Schedule.addDelay(baseSchedule, (retryCount) => - Effect.succeed(Duration.millis(getReconnectDelayMs(retryCount, backoff) ?? 0)), + Effect.succeed( + Option.match(getReconnectDelay(retryCount, backoff), { + onNone: () => Duration.zero, + onSome: (delay) => delay, + }), + ), ); const protocolLayer = Layer.effect( RpcClient.Protocol,