From fd94cb6c8a811ea3ee34a6170f9402fe15bd40a0 Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Mon, 18 May 2026 23:04:54 +0200 Subject: [PATCH] fix: Event ordering Closes #143 TIL a lot about how events are queued due to "syntactic sugar" that has more effects than I thought! The change is able to get this test passing though! --- src/acp.test.ts | 163 +++++++++++++++++++++ src/acp.ts | 376 +++++++++++++++++++++++++----------------------- 2 files changed, 356 insertions(+), 183 deletions(-) diff --git a/src/acp.test.ts b/src/acp.test.ts index aef9feb..d5039d9 100644 --- a/src/acp.test.ts +++ b/src/acp.test.ts @@ -649,6 +649,147 @@ describe("Connection", () => { expect(events).toEqual(["NewSessionResponse", "SessionNotification"]); }); + it("processes notification after response when both arrive in the same chunk", async () => { + const events: string[] = []; + const { + promise: sessionNotification, + resolve: resolveSessionNotification, + } = Promise.withResolvers(); + + class TestClient implements Client { + async writeTextFile( + _: WriteTextFileRequest, + ): Promise { + return {}; + } + async readTextFile( + _: ReadTextFileRequest, + ): Promise { + return { content: "test" }; + } + async requestPermission( + _: RequestPermissionRequest, + ): Promise { + return { + outcome: { + outcome: "selected", + optionId: "allow", + }, + }; + } + async sessionUpdate(_: SessionNotification): Promise { + events.push("SessionNotification"); + resolveSessionNotification(); + } + } + + const connection = new ClientSideConnection( + () => new TestClient(), + ndJsonStream(clientToAgent.writable, agentToClient.readable), + ); + + const newSessionResponse = connection + .newSession({ cwd: "/test", mcpServers: [] }) + .then((result) => { + events.push("NewSessionResponse"); + return result; + }); + + const requestReader = clientToAgent.readable.getReader(); + const { value: requestChunk } = await requestReader.read(); + requestReader.releaseLock(); + const { id: requestId } = JSON.parse( + new TextDecoder().decode(requestChunk), + ); + + const sessionId = "test-session"; + const writer = agentToClient.writable.getWriter(); + await writer.write( + new TextEncoder().encode( + JSON.stringify({ + jsonrpc: "2.0", + id: requestId, + result: { sessionId }, + }) + + "\n" + + JSON.stringify({ + jsonrpc: "2.0", + method: "session/update", + params: { + sessionId, + update: { + sessionUpdate: "available_commands_update", + availableCommands: [], + }, + }, + }) + + "\n", + ), + ); + writer.releaseLock(); + + await newSessionResponse; + await sessionNotification; + + expect(events).toEqual(["NewSessionResponse", "SessionNotification"]); + }); + + it("normalizes null results for known empty object responses", async () => { + class TestClient implements Client { + async writeTextFile( + _: WriteTextFileRequest, + ): Promise { + return {}; + } + async readTextFile( + _: ReadTextFileRequest, + ): Promise { + return { content: "test" }; + } + async requestPermission( + _: RequestPermissionRequest, + ): Promise { + return { + outcome: { + outcome: "selected", + optionId: "allow", + }, + }; + } + async sessionUpdate(_: SessionNotification): Promise {} + } + + const connection = new ClientSideConnection( + () => new TestClient(), + ndJsonStream(clientToAgent.writable, agentToClient.readable), + ); + + const authenticateResponse = connection.authenticate({ + methodId: "test", + }); + + const requestReader = clientToAgent.readable.getReader(); + const { value: requestChunk } = await requestReader.read(); + requestReader.releaseLock(); + const { id: requestId } = JSON.parse( + new TextDecoder().decode(requestChunk), + ); + + const writer = agentToClient.writable.getWriter(); + await writer.write( + new TextEncoder().encode( + JSON.stringify({ + jsonrpc: "2.0", + id: requestId, + result: null, + }) + "\n", + ), + ); + writer.releaseLock(); + + await expect(authenticateResponse).resolves.toEqual({}); + }); + it("handles initialize method", async () => { // Create client class TestClient implements Client { @@ -1307,6 +1448,28 @@ describe("Connection", () => { ).rejects.toThrow("ACP connection closed"); }); + it("rejects requests issued after the connection closes with a falsy reason", async () => { + const connection = new ClientSideConnection(() => new MinimalTestClient(), { + readable: new ReadableStream({ + start(controller) { + controller.error(0); + }, + }), + writable: new WritableStream({ + async write() { + // no-op + }, + }), + }); + + await connection.closed; + expect(connection.signal.aborted).toBe(true); + + await expect( + connection.newSession({ cwd: "/test", mcpServers: [] }), + ).rejects.toBe(0); + }); + it("supports removing signal event listeners", async () => { const closeLog: string[] = []; diff --git a/src/acp.ts b/src/acp.ts index 6eb9c5a..9f361cd 100644 --- a/src/acp.ts +++ b/src/acp.ts @@ -20,6 +20,16 @@ type ConnectionPendingResponse = { reject: (error: unknown) => void; }; +function emptyObjectResponse(response: T | null | undefined): T { + return response ?? ({} as T); +} + +function rejectedPromise(error: unknown): Promise { + const promise = Promise.reject(error); + promise.catch(() => {}); + return promise; +} + /** * An agent-side connection to a client. * @@ -281,8 +291,8 @@ export class AgentSideConnection { * * See protocol docs: [Agent Reports Output](https://agentclientprotocol.com/protocol/prompt-turn#3-agent-reports-output) */ - async sessionUpdate(params: schema.SessionNotification): Promise { - return await this.connection.sendNotification( + sessionUpdate(params: schema.SessionNotification): Promise { + return this.connection.sendNotification( schema.CLIENT_METHODS.session_update, params, ); @@ -300,10 +310,10 @@ export class AgentSideConnection { * * See protocol docs: [Requesting Permission](https://agentclientprotocol.com/protocol/tool-calls#requesting-permission) */ - async requestPermission( + requestPermission( params: schema.RequestPermissionRequest, ): Promise { - return await this.connection.sendRequest( + return this.connection.sendRequest( schema.CLIENT_METHODS.session_request_permission, params, ); @@ -317,10 +327,10 @@ export class AgentSideConnection { * * See protocol docs: [Client](https://agentclientprotocol.com/protocol/overview#client) */ - async readTextFile( + readTextFile( params: schema.ReadTextFileRequest, ): Promise { - return await this.connection.sendRequest( + return this.connection.sendRequest( schema.CLIENT_METHODS.fs_read_text_file, params, ); @@ -334,15 +344,13 @@ export class AgentSideConnection { * * See protocol docs: [Client](https://agentclientprotocol.com/protocol/overview#client) */ - async writeTextFile( + writeTextFile( params: schema.WriteTextFileRequest, ): Promise { - return ( - (await this.connection.sendRequest( - schema.CLIENT_METHODS.fs_write_text_file, - params, - )) ?? {} - ); + return this.connection.sendRequest< + schema.WriteTextFileRequest, + schema.WriteTextFileResponse + >(schema.CLIENT_METHODS.fs_write_text_file, params, emptyObjectResponse); } /** @@ -357,18 +365,22 @@ export class AgentSideConnection { * @param params - The terminal creation parameters * @returns A handle to control and monitor the terminal */ - async createTerminal( + createTerminal( params: schema.CreateTerminalRequest, ): Promise { - const response = await this.connection.sendRequest< + return this.connection.sendRequest< schema.CreateTerminalRequest, - schema.CreateTerminalResponse - >(schema.CLIENT_METHODS.terminal_create, params); - - return new TerminalHandle( - response.terminalId, - params.sessionId, - this.connection, + schema.CreateTerminalResponse, + TerminalHandle + >( + schema.CLIENT_METHODS.terminal_create, + params, + (response) => + new TerminalHandle( + response.terminalId, + params.sessionId, + this.connection, + ), ); } @@ -381,10 +393,10 @@ export class AgentSideConnection { * * @experimental */ - async unstable_createElicitation( + unstable_createElicitation( params: schema.CreateElicitationRequest, ): Promise { - return await this.connection.sendRequest( + return this.connection.sendRequest( schema.CLIENT_METHODS.elicitation_create, params, ); @@ -399,10 +411,10 @@ export class AgentSideConnection { * * @experimental */ - async unstable_completeElicitation( + unstable_completeElicitation( params: schema.CompleteElicitationNotification, ): Promise { - return await this.connection.sendNotification( + return this.connection.sendNotification( schema.CLIENT_METHODS.elicitation_complete, params, ); @@ -413,11 +425,11 @@ export class AgentSideConnection { * * Allows the Agent to send an arbitrary request that is not part of the ACP spec. */ - async extMethod( + extMethod( method: string, params: Record, ): Promise> { - return await this.connection.sendRequest(method, params); + return this.connection.sendRequest(method, params); } /** @@ -425,11 +437,11 @@ export class AgentSideConnection { * * Allows the Agent to send an arbitrary notification that is not part of the ACP spec. */ - async extNotification( + extNotification( method: string, params: Record, ): Promise { - return await this.connection.sendNotification(method, params); + return this.connection.sendNotification(method, params); } /** @@ -516,21 +528,18 @@ export class TerminalHandle { /** * Gets the current terminal output without waiting for the command to exit. */ - async currentOutput(): Promise { - return await this.connection.sendRequest( - schema.CLIENT_METHODS.terminal_output, - { - sessionId: this.sessionId, - terminalId: this.id, - }, - ); + currentOutput(): Promise { + return this.connection.sendRequest(schema.CLIENT_METHODS.terminal_output, { + sessionId: this.sessionId, + terminalId: this.id, + }); } /** * Waits for the terminal command to complete and returns its exit status. */ - async waitForExit(): Promise { - return await this.connection.sendRequest( + waitForExit(): Promise { + return this.connection.sendRequest( schema.CLIENT_METHODS.terminal_wait_for_exit, { sessionId: this.sessionId, @@ -549,12 +558,17 @@ export class TerminalHandle { * * Useful for implementing timeouts or cancellation. */ - async kill(): Promise { - return ( - (await this.connection.sendRequest(schema.CLIENT_METHODS.terminal_kill, { + kill(): Promise { + return this.connection.sendRequest< + schema.KillTerminalRequest, + schema.KillTerminalResponse + >( + schema.CLIENT_METHODS.terminal_kill, + { sessionId: this.sessionId, terminalId: this.id, - })) ?? {} + }, + emptyObjectResponse, ); } @@ -570,15 +584,17 @@ export class TerminalHandle { * * **Important:** Always call this method when done with the terminal. */ - async release(): Promise { - return ( - (await this.connection.sendRequest( - schema.CLIENT_METHODS.terminal_release, - { - sessionId: this.sessionId, - terminalId: this.id, - }, - )) ?? {} + release(): Promise { + return this.connection.sendRequest< + schema.ReleaseTerminalRequest, + schema.ReleaseTerminalResponse | void + >( + schema.CLIENT_METHODS.terminal_release, + { + sessionId: this.sessionId, + terminalId: this.id, + }, + emptyObjectResponse, ); } @@ -622,7 +638,8 @@ export class ClientSideConnection implements Agent { switch (method) { case schema.CLIENT_METHODS.fs_write_text_file: { const validatedParams = validate.zWriteTextFileRequest.parse(params); - return client.writeTextFile?.(validatedParams); + const result = await client.writeTextFile?.(validatedParams); + return result ?? {}; } case schema.CLIENT_METHODS.fs_read_text_file: { const validatedParams = validate.zReadTextFileRequest.parse(params); @@ -718,13 +735,10 @@ export class ClientSideConnection implements Agent { * * See protocol docs: [Initialization](https://agentclientprotocol.com/protocol/initialization) */ - async initialize( + initialize( params: schema.InitializeRequest, ): Promise { - return await this.connection.sendRequest( - schema.AGENT_METHODS.initialize, - params, - ); + return this.connection.sendRequest(schema.AGENT_METHODS.initialize, params); } /** @@ -744,10 +758,10 @@ export class ClientSideConnection implements Agent { * * See protocol docs: [Session Setup](https://agentclientprotocol.com/protocol/session-setup) */ - async newSession( + newSession( params: schema.NewSessionRequest, ): Promise { - return await this.connection.sendRequest( + return this.connection.sendRequest( schema.AGENT_METHODS.session_new, params, ); @@ -768,15 +782,13 @@ export class ClientSideConnection implements Agent { * * See protocol docs: [Loading Sessions](https://agentclientprotocol.com/protocol/session-setup#loading-sessions) */ - async loadSession( + loadSession( params: schema.LoadSessionRequest, ): Promise { - return ( - (await this.connection.sendRequest( - schema.AGENT_METHODS.session_load, - params, - )) ?? {} - ); + return this.connection.sendRequest< + schema.LoadSessionRequest, + schema.LoadSessionResponse + >(schema.AGENT_METHODS.session_load, params, emptyObjectResponse); } /** @@ -796,10 +808,10 @@ export class ClientSideConnection implements Agent { * * @experimental */ - async unstable_forkSession( + unstable_forkSession( params: schema.ForkSessionRequest, ): Promise { - return await this.connection.sendRequest( + return this.connection.sendRequest( schema.AGENT_METHODS.session_fork, params, ); @@ -814,10 +826,10 @@ export class ClientSideConnection implements Agent { * title, and last update time. Supports filtering by working directory, * `additionalDirectories`, and cursor-based pagination. */ - async listSessions( + listSessions( params: schema.ListSessionsRequest, ): Promise { - return await this.connection.sendRequest( + return this.connection.sendRequest( schema.AGENT_METHODS.session_list, params, ); @@ -834,15 +846,13 @@ export class ClientSideConnection implements Agent { * * @experimental */ - async unstable_deleteSession( + unstable_deleteSession( params: schema.DeleteSessionRequest, ): Promise { - return ( - (await this.connection.sendRequest( - schema.AGENT_METHODS.session_delete, - params, - )) ?? {} - ); + return this.connection.sendRequest< + schema.DeleteSessionRequest, + schema.DeleteSessionResponse + >(schema.AGENT_METHODS.session_delete, params, emptyObjectResponse); } /** @@ -856,10 +866,10 @@ export class ClientSideConnection implements Agent { * The request may include `additionalDirectories` to set the complete list of * additional workspace roots for the resumed session. */ - async resumeSession( + resumeSession( params: schema.ResumeSessionRequest, ): Promise { - return await this.connection.sendRequest( + return this.connection.sendRequest( schema.AGENT_METHODS.session_resume, params, ); @@ -873,10 +883,10 @@ export class ClientSideConnection implements Agent { * The agent must cancel any ongoing work (as if `session/cancel` was called) * and then free up any resources associated with the session. */ - async closeSession( + closeSession( params: schema.CloseSessionRequest, ): Promise { - return await this.connection.sendRequest( + return this.connection.sendRequest( schema.AGENT_METHODS.session_close, params, ); @@ -897,15 +907,13 @@ export class ClientSideConnection implements Agent { * * See protocol docs: [Session Modes](https://agentclientprotocol.com/protocol/session-modes) */ - async setSessionMode( + setSessionMode( params: schema.SetSessionModeRequest, ): Promise { - return ( - (await this.connection.sendRequest( - schema.AGENT_METHODS.session_set_mode, - params, - )) ?? {} - ); + return this.connection.sendRequest< + schema.SetSessionModeRequest, + schema.SetSessionModeResponse + >(schema.AGENT_METHODS.session_set_mode, params, emptyObjectResponse); } /** @@ -917,15 +925,13 @@ export class ClientSideConnection implements Agent { * * @experimental */ - async unstable_setSessionModel( + unstable_setSessionModel( params: schema.SetSessionModelRequest, ): Promise { - return ( - (await this.connection.sendRequest( - schema.AGENT_METHODS.session_set_model, - params, - )) ?? {} - ); + return this.connection.sendRequest< + schema.SetSessionModelRequest, + schema.SetSessionModelResponse + >(schema.AGENT_METHODS.session_set_model, params, emptyObjectResponse); } /** @@ -934,10 +940,10 @@ export class ClientSideConnection implements Agent { * The response contains the full set of configuration options and their current values, * as changing one option may affect the available values or state of other options. */ - async setSessionConfigOption( + setSessionConfigOption( params: schema.SetSessionConfigOptionRequest, ): Promise { - return await this.connection.sendRequest( + return this.connection.sendRequest( schema.AGENT_METHODS.session_set_config_option, params, ); @@ -954,15 +960,13 @@ export class ClientSideConnection implements Agent { * * See protocol docs: [Initialization](https://agentclientprotocol.com/protocol/initialization) */ - async authenticate( + authenticate( params: schema.AuthenticateRequest, ): Promise { - return ( - (await this.connection.sendRequest( - schema.AGENT_METHODS.authenticate, - params, - )) ?? {} - ); + return this.connection.sendRequest< + schema.AuthenticateRequest, + schema.AuthenticateResponse + >(schema.AGENT_METHODS.authenticate, params, emptyObjectResponse); } /** @@ -976,10 +980,10 @@ export class ClientSideConnection implements Agent { * * @experimental */ - async unstable_listProviders( + unstable_listProviders( params: schema.ListProvidersRequest, ): Promise { - return await this.connection.sendRequest( + return this.connection.sendRequest( schema.AGENT_METHODS.providers_list, params, ); @@ -996,15 +1000,13 @@ export class ClientSideConnection implements Agent { * * @experimental */ - async unstable_setProvider( + unstable_setProvider( params: schema.SetProvidersRequest, ): Promise { - return ( - (await this.connection.sendRequest( - schema.AGENT_METHODS.providers_set, - params, - )) ?? {} - ); + return this.connection.sendRequest< + schema.SetProvidersRequest, + schema.SetProvidersResponse + >(schema.AGENT_METHODS.providers_set, params, emptyObjectResponse); } /** @@ -1018,15 +1020,13 @@ export class ClientSideConnection implements Agent { * * @experimental */ - async unstable_disableProvider( + unstable_disableProvider( params: schema.DisableProvidersRequest, ): Promise { - return ( - (await this.connection.sendRequest( - schema.AGENT_METHODS.providers_disable, - params, - )) ?? {} - ); + return this.connection.sendRequest< + schema.DisableProvidersRequest, + schema.DisableProvidersResponse + >(schema.AGENT_METHODS.providers_disable, params, emptyObjectResponse); } /** @@ -1036,15 +1036,13 @@ export class ClientSideConnection implements Agent { * * @experimental */ - async unstable_logout( + unstable_logout( params: schema.LogoutRequest, ): Promise { - return ( - (await this.connection.sendRequest( - schema.AGENT_METHODS.logout, - params, - )) ?? {} - ); + return this.connection.sendRequest< + schema.LogoutRequest, + schema.LogoutResponse + >(schema.AGENT_METHODS.logout, params, emptyObjectResponse); } /** @@ -1060,8 +1058,8 @@ export class ClientSideConnection implements Agent { * * See protocol docs: [Prompt Turn](https://agentclientprotocol.com/protocol/prompt-turn) */ - async prompt(params: schema.PromptRequest): Promise { - return await this.connection.sendRequest( + prompt(params: schema.PromptRequest): Promise { + return this.connection.sendRequest( schema.AGENT_METHODS.session_prompt, params, ); @@ -1080,8 +1078,8 @@ export class ClientSideConnection implements Agent { * * See protocol docs: [Cancellation](https://agentclientprotocol.com/protocol/prompt-turn#cancellation) */ - async cancel(params: schema.CancelNotification): Promise { - return await this.connection.sendNotification( + cancel(params: schema.CancelNotification): Promise { + return this.connection.sendNotification( schema.AGENT_METHODS.session_cancel, params, ); @@ -1094,13 +1092,10 @@ export class ClientSideConnection implements Agent { * * @experimental */ - async unstable_startNes( + unstable_startNes( params: schema.StartNesRequest, ): Promise { - return await this.connection.sendRequest( - schema.AGENT_METHODS.nes_start, - params, - ); + return this.connection.sendRequest(schema.AGENT_METHODS.nes_start, params); } /** @@ -1110,10 +1105,10 @@ export class ClientSideConnection implements Agent { * * @experimental */ - async unstable_suggestNes( + unstable_suggestNes( params: schema.SuggestNesRequest, ): Promise { - return await this.connection.sendRequest( + return this.connection.sendRequest( schema.AGENT_METHODS.nes_suggest, params, ); @@ -1126,15 +1121,13 @@ export class ClientSideConnection implements Agent { * * @experimental */ - async unstable_closeNes( + unstable_closeNes( params: schema.CloseNesRequest, ): Promise { - return ( - (await this.connection.sendRequest( - schema.AGENT_METHODS.nes_close, - params, - )) ?? {} - ); + return this.connection.sendRequest< + schema.CloseNesRequest, + schema.CloseNesResponse + >(schema.AGENT_METHODS.nes_close, params, emptyObjectResponse); } /** @@ -1144,10 +1137,10 @@ export class ClientSideConnection implements Agent { * * @experimental */ - async unstable_didOpenDocument( + unstable_didOpenDocument( params: schema.DidOpenDocumentNotification, ): Promise { - return await this.connection.sendNotification( + return this.connection.sendNotification( schema.AGENT_METHODS.document_did_open, params, ); @@ -1160,10 +1153,10 @@ export class ClientSideConnection implements Agent { * * @experimental */ - async unstable_didChangeDocument( + unstable_didChangeDocument( params: schema.DidChangeDocumentNotification, ): Promise { - return await this.connection.sendNotification( + return this.connection.sendNotification( schema.AGENT_METHODS.document_did_change, params, ); @@ -1176,10 +1169,10 @@ export class ClientSideConnection implements Agent { * * @experimental */ - async unstable_didCloseDocument( + unstable_didCloseDocument( params: schema.DidCloseDocumentNotification, ): Promise { - return await this.connection.sendNotification( + return this.connection.sendNotification( schema.AGENT_METHODS.document_did_close, params, ); @@ -1192,10 +1185,10 @@ export class ClientSideConnection implements Agent { * * @experimental */ - async unstable_didSaveDocument( + unstable_didSaveDocument( params: schema.DidSaveDocumentNotification, ): Promise { - return await this.connection.sendNotification( + return this.connection.sendNotification( schema.AGENT_METHODS.document_did_save, params, ); @@ -1208,10 +1201,10 @@ export class ClientSideConnection implements Agent { * * @experimental */ - async unstable_didFocusDocument( + unstable_didFocusDocument( params: schema.DidFocusDocumentNotification, ): Promise { - return await this.connection.sendNotification( + return this.connection.sendNotification( schema.AGENT_METHODS.document_did_focus, params, ); @@ -1224,10 +1217,8 @@ export class ClientSideConnection implements Agent { * * @experimental */ - async unstable_acceptNes( - params: schema.AcceptNesNotification, - ): Promise { - return await this.connection.sendNotification( + unstable_acceptNes(params: schema.AcceptNesNotification): Promise { + return this.connection.sendNotification( schema.AGENT_METHODS.nes_accept, params, ); @@ -1240,10 +1231,8 @@ export class ClientSideConnection implements Agent { * * @experimental */ - async unstable_rejectNes( - params: schema.RejectNesNotification, - ): Promise { - return await this.connection.sendNotification( + unstable_rejectNes(params: schema.RejectNesNotification): Promise { + return this.connection.sendNotification( schema.AGENT_METHODS.nes_reject, params, ); @@ -1254,11 +1243,11 @@ export class ClientSideConnection implements Agent { * * Allows the Client to send an arbitrary request that is not part of the ACP spec. */ - async extMethod( + extMethod( method: string, params: Record, ): Promise> { - return await this.connection.sendRequest(method, params); + return this.connection.sendRequest(method, params); } /** @@ -1266,11 +1255,11 @@ export class ClientSideConnection implements Agent { * * Allows the Client to send an arbitrary notification that is not part of the ACP spec. */ - async extNotification( + extNotification( method: string, params: Record, ): Promise { - return await this.connection.sendNotification(method, params); + return this.connection.sendNotification(method, params); } /** @@ -1569,11 +1558,31 @@ class Connection { } } - sendRequest(method: string, params?: Req): Promise { - this.throwIfClosed(); + sendRequest( + method: string, + params?: Req, + mapResponse?: (response: Resp) => Output, + ): Promise { + if (this.abortController.signal.aborted) { + return rejectedPromise(this.closedReason()); + } + const id = this.nextRequestId++; - const responsePromise = new Promise((resolve, reject) => { - this.pendingResponses.set(id, { resolve, reject }); + const responsePromise = new Promise((resolve, reject) => { + this.pendingResponses.set(id, { + resolve: (response) => { + try { + resolve( + mapResponse + ? mapResponse(response as Resp) + : (response as Output), + ); + } catch (error) { + reject(error); + } + }, + reject, + }); }); // If the transport fails (or receive observes stream closure) during the // `sendMessage` below, close() will reject `responsePromise` @@ -1584,20 +1593,21 @@ class Connection { // rejection is still delivered to the caller via `return responsePromise`. responsePromise.catch(() => {}); void this.sendMessage({ jsonrpc: "2.0", id, method, params }); - return responsePromise as Promise; + return responsePromise; } - async sendNotification(method: string, params?: N): Promise { - this.throwIfClosed(); - await this.sendMessage({ jsonrpc: "2.0", method, params }); - } - - private throwIfClosed() { + sendNotification(method: string, params?: N): Promise { if (this.abortController.signal.aborted) { - throw ( - this.abortController.signal.reason ?? new Error("ACP connection closed") - ); + return rejectedPromise(this.closedReason()); } + + return this.sendMessage({ jsonrpc: "2.0", method, params }); + } + + private closedReason(): unknown { + return ( + this.abortController.signal.reason ?? new Error("ACP connection closed") + ); } private async sendMessage(message: AnyMessage) {