Skip to content

Commit 2e27a6b

Browse files
committed
Refactor WebSocket state machine to use action-based dispatch
Replace direct state assignments with a pure reducer pattern: - Add StateAction type and reduceState() pure function - Add #dispatch() method that validates transitions - Remove #pendingReconnect flag - reconnect() now restarts immediately
1 parent 93355b9 commit 2e27a6b

3 files changed

Lines changed: 111 additions & 48 deletions

File tree

src/websocket/reconnectingWebSocket.ts

Lines changed: 97 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,73 @@ export enum ConnectionState {
3535
DISPOSED = "DISPOSED",
3636
}
3737

38+
/**
39+
* Actions that trigger state transitions.
40+
*/
41+
type StateAction =
42+
| { readonly type: "CONNECT" }
43+
| { readonly type: "OPEN" }
44+
| { readonly type: "SCHEDULE_RETRY" }
45+
| { readonly type: "DISCONNECT" }
46+
| { readonly type: "DISPOSE" };
47+
48+
/**
49+
* Pure reducer function for state transitions.
50+
*/
51+
function reduceState(
52+
state: ConnectionState,
53+
action: StateAction,
54+
): ConnectionState {
55+
switch (action.type) {
56+
case "CONNECT":
57+
switch (state) {
58+
case ConnectionState.IDLE:
59+
case ConnectionState.CONNECTED:
60+
case ConnectionState.AWAITING_RETRY:
61+
case ConnectionState.DISCONNECTED:
62+
return ConnectionState.CONNECTING;
63+
default:
64+
return state;
65+
}
66+
67+
case "OPEN":
68+
switch (state) {
69+
case ConnectionState.CONNECTING:
70+
return ConnectionState.CONNECTED;
71+
default:
72+
return state;
73+
}
74+
75+
case "SCHEDULE_RETRY":
76+
switch (state) {
77+
case ConnectionState.CONNECTING:
78+
case ConnectionState.CONNECTED:
79+
return ConnectionState.AWAITING_RETRY;
80+
default:
81+
return state;
82+
}
83+
84+
case "DISCONNECT":
85+
switch (state) {
86+
case ConnectionState.IDLE:
87+
case ConnectionState.CONNECTING:
88+
case ConnectionState.CONNECTED:
89+
case ConnectionState.AWAITING_RETRY:
90+
return ConnectionState.DISCONNECTED;
91+
default:
92+
return state;
93+
}
94+
95+
case "DISPOSE":
96+
switch (state) {
97+
case ConnectionState.DISPOSED:
98+
return state;
99+
default:
100+
return ConnectionState.DISPOSED;
101+
}
102+
}
103+
}
104+
38105
export type SocketFactory<TData> = () => Promise<UnidirectionalStream<TData>>;
39106

40107
export interface ReconnectingWebSocketOptions {
@@ -65,10 +132,28 @@ export class ReconnectingWebSocket<
65132
#backoffMs: number;
66133
#reconnectTimeoutId: NodeJS.Timeout | null = null;
67134
#state: ConnectionState = ConnectionState.IDLE;
68-
#pendingReconnect = false; // Queue reconnect during CONNECTING state
69135
#certRefreshAttempted = false; // Tracks if cert refresh was already attempted this connection cycle
70136
readonly #onDispose?: () => void;
71137

138+
/**
139+
* Dispatch an action to transition state. Returns true if transition is allowed.
140+
*/
141+
#dispatch(action: StateAction): boolean {
142+
const newState = reduceState(this.#state, action);
143+
if (newState === this.#state) {
144+
// Allow CONNECT from CONNECTING as a "restart" operation
145+
if (
146+
action.type === "CONNECT" &&
147+
this.#state === ConnectionState.CONNECTING
148+
) {
149+
return true;
150+
}
151+
return false;
152+
}
153+
this.#state = newState;
154+
return true;
155+
}
156+
72157
private constructor(
73158
socketFactory: SocketFactory<TData>,
74159
logger: Logger,
@@ -153,7 +238,6 @@ export class ReconnectingWebSocket<
153238
}
154239

155240
if (this.#state === ConnectionState.DISCONNECTED) {
156-
this.#state = ConnectionState.IDLE;
157241
this.#backoffMs = this.#options.initialBackoffMs;
158242
this.#certRefreshAttempted = false; // User-initiated reconnect, allow retry
159243
}
@@ -163,11 +247,6 @@ export class ReconnectingWebSocket<
163247
this.#reconnectTimeoutId = null;
164248
}
165249

166-
if (this.#state === ConnectionState.CONNECTING) {
167-
this.#pendingReconnect = true;
168-
return;
169-
}
170-
171250
// connect() handles all errors internally
172251
void this.connect();
173252
}
@@ -176,14 +255,9 @@ export class ReconnectingWebSocket<
176255
* Temporarily disconnect the socket. Can be resumed via reconnect().
177256
*/
178257
disconnect(code?: number, reason?: string): void {
179-
if (
180-
this.#state === ConnectionState.DISPOSED ||
181-
this.#state === ConnectionState.DISCONNECTED
182-
) {
258+
if (!this.#dispatch({ type: "DISCONNECT" })) {
183259
return;
184260
}
185-
186-
this.#state = ConnectionState.DISCONNECTED;
187261
this.clearCurrentSocket(code, reason);
188262
}
189263

@@ -205,15 +279,9 @@ export class ReconnectingWebSocket<
205279
}
206280

207281
private async connect(): Promise<void> {
208-
if (
209-
this.#state !== ConnectionState.IDLE &&
210-
this.#state !== ConnectionState.CONNECTED &&
211-
this.#state !== ConnectionState.AWAITING_RETRY
212-
) {
282+
if (!this.#dispatch({ type: "CONNECT" })) {
213283
return;
214284
}
215-
216-
this.#state = ConnectionState.CONNECTING;
217285
try {
218286
// Close any existing socket before creating a new one
219287
if (this.#currentSocket) {
@@ -240,7 +308,9 @@ export class ReconnectingWebSocket<
240308
return;
241309
}
242310

243-
this.#state = ConnectionState.CONNECTED;
311+
if (!this.#dispatch({ type: "OPEN" })) {
312+
return;
313+
}
244314
// Reset backoff on successful connection
245315
this.#backoffMs = this.#options.initialBackoffMs;
246316
this.#certRefreshAttempted = false;
@@ -298,25 +368,14 @@ export class ReconnectingWebSocket<
298368
});
299369
} catch (error) {
300370
await this.handleConnectionError(error);
301-
} finally {
302-
if (this.#pendingReconnect) {
303-
this.#pendingReconnect = false;
304-
this.reconnect();
305-
}
306371
}
307372
}
308373

309374
private scheduleReconnect(): void {
310-
if (
311-
this.#state === ConnectionState.DISPOSED ||
312-
this.#state === ConnectionState.DISCONNECTED ||
313-
this.#state === ConnectionState.AWAITING_RETRY
314-
) {
375+
if (!this.#dispatch({ type: "SCHEDULE_RETRY" })) {
315376
return;
316377
}
317378

318-
this.#state = ConnectionState.AWAITING_RETRY;
319-
320379
const jitter =
321380
this.#backoffMs * this.#options.jitterFactor * (Math.random() * 2 - 1);
322381
const delayMs = Math.max(0, this.#backoffMs + jitter);
@@ -401,6 +460,10 @@ export class ReconnectingWebSocket<
401460
this.#state === ConnectionState.DISPOSED ||
402461
this.#state === ConnectionState.DISCONNECTED
403462
) {
463+
this.#logger.debug(
464+
`Ignoring connection error in ${this.#state} state for ${this.#route}`,
465+
error,
466+
);
404467
return;
405468
}
406469

@@ -442,11 +505,9 @@ export class ReconnectingWebSocket<
442505
}
443506

444507
private dispose(code?: number, reason?: string): void {
445-
if (this.#state === ConnectionState.DISPOSED) {
508+
if (!this.#dispatch({ type: "DISPOSE" })) {
446509
return;
447510
}
448-
449-
this.#state = ConnectionState.DISPOSED;
450511
this.clearCurrentSocket(code, reason);
451512

452513
for (const set of Object.values(this.#eventHandlers)) {
@@ -457,9 +518,6 @@ export class ReconnectingWebSocket<
457518
}
458519

459520
private clearCurrentSocket(code?: number, reason?: string): void {
460-
// Clear pending reconnect to prevent resume
461-
this.#pendingReconnect = false;
462-
463521
if (this.#reconnectTimeoutId !== null) {
464522
clearTimeout(this.#reconnectTimeoutId);
465523
this.#reconnectTimeoutId = null;

test/unit/api/coderApi.test.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -801,6 +801,7 @@ describe("CoderApi", () => {
801801
const sockets = setupAutoOpeningWebSocket();
802802
api = createApi(CODER_URL, AXIOS_TOKEN);
803803
await api.watchAgentMetadata(AGENT_ID);
804+
await tick(); // Wait for open event to fire (socket becomes CONNECTED)
804805

805806
mockConfig.set("coder.insecure", true);
806807
await tick();
@@ -820,6 +821,7 @@ describe("CoderApi", () => {
820821
const sockets = setupAutoOpeningWebSocket();
821822
api = createApi(CODER_URL, AXIOS_TOKEN);
822823
await api.watchAgentMetadata(AGENT_ID);
824+
await tick(); // Wait for open event to fire (socket becomes CONNECTED)
823825

824826
mockConfig.set(key, value);
825827
await tick();
@@ -833,6 +835,7 @@ describe("CoderApi", () => {
833835
const sockets = setupAutoOpeningWebSocket();
834836
api = createApi(CODER_URL, AXIOS_TOKEN);
835837
await api.watchAgentMetadata(AGENT_ID);
838+
await tick(); // Wait for open event to fire (socket becomes CONNECTED)
836839

837840
api.dispose();
838841
mockConfig.set("coder.insecure", true);

test/unit/websocket/reconnectingWebSocket.test.ts

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ describe("ReconnectingWebSocket", () => {
3838

3939
await vi.advanceTimersByTimeAsync(300);
4040
expect(sockets).toHaveLength(2);
41+
sockets[1].fireOpen();
4142
expect(ws.state).toBe(ConnectionState.CONNECTED);
4243

4344
ws.close();
@@ -164,41 +165,41 @@ describe("ReconnectingWebSocket", () => {
164165
ws.close();
165166
});
166167

167-
it("queues reconnect() calls made during connection", async () => {
168+
it("reconnect() during CONNECTING immediately restarts connection", async () => {
168169
const { ws, sockets, completeConnection } =
169170
await createBlockingReconnectingWebSocket();
170171

171172
// Start first reconnect (will block on factory promise)
172173
ws.reconnect();
173174
expect(sockets).toHaveLength(2);
174175
// Call reconnect again while first reconnect is in progress
176+
// This immediately restarts (creates a new socket)
175177
ws.reconnect();
176-
// Still only 2 sockets (queued reconnect hasn't started)
177-
expect(sockets).toHaveLength(2);
178+
expect(sockets).toHaveLength(3);
178179

180+
// Complete the third socket's connection
179181
completeConnection();
180182
await Promise.resolve();
181-
// Now queued reconnect should have executed, creating third socket
182-
expect(sockets).toHaveLength(3);
183+
expect(ws.state).toBe(ConnectionState.CONNECTED);
183184

184185
ws.close();
185186
});
186187

187-
it("suspend() cancels pending reconnect queued during connection", async () => {
188+
it("disconnect() cancels in-progress reconnect and prevents new connections", async () => {
188189
const { ws, sockets, failConnection } =
189190
await createBlockingReconnectingWebSocket();
190191

191192
ws.reconnect();
192193
expect(ws.state).toBe(ConnectionState.CONNECTING);
193-
ws.reconnect(); // queued
194194
expect(sockets).toHaveLength(2);
195195

196-
// This should cancel the queued request
196+
// Disconnect while reconnect is in progress
197197
ws.disconnect();
198198
expect(ws.state).toBe(ConnectionState.DISCONNECTED);
199199
failConnection(new Error("No base URL"));
200200
await Promise.resolve();
201201

202+
// No new socket should be created after disconnect
202203
expect(sockets).toHaveLength(2);
203204
await vi.advanceTimersByTimeAsync(10000);
204205
expect(sockets).toHaveLength(2);
@@ -791,7 +792,8 @@ async function createBlockingReconnectingWebSocket(): Promise<{
791792
completeConnection: () => {
792793
const socket = sockets.at(-1)!;
793794
pendingResolve?.(socket);
794-
socket.fireOpen();
795+
// Fire open after microtask so event listener is attached
796+
queueMicrotask(() => socket.fireOpen());
795797
},
796798
failConnection: (error: Error) => pendingReject?.(error),
797799
};

0 commit comments

Comments
 (0)