diff --git a/apps/server/src/provider/Layers/ClaudeAdapter.test.ts b/apps/server/src/provider/Layers/ClaudeAdapter.test.ts index fdc6b5d789..1721669b4d 100644 --- a/apps/server/src/provider/Layers/ClaudeAdapter.test.ts +++ b/apps/server/src/provider/Layers/ClaudeAdapter.test.ts @@ -239,6 +239,8 @@ async function readFirstPromptMessage( const THREAD_ID = ThreadId.makeUnsafe("thread-claude-1"); const RESUME_THREAD_ID = ThreadId.makeUnsafe("thread-claude-resume"); +const INTERRUPTED_TOOL_RESULT_TEXT = + "The user doesn't want to proceed with this tool use. The tool use was rejected (eg. if it was a file edit, the new_string was NOT written to the file). STOP what you are doing and wait for the user to tell you how to proceed."; describe("ClaudeAdapterLive", () => { it.effect("returns validation error for non-claude provider on startSession", () => { @@ -1101,6 +1103,182 @@ describe("ClaudeAdapterLive", () => { ); }); + it.effect("does not surface ede_diagnostic-only Claude results as runtime errors", () => { + const harness = makeHarness(); + return Effect.gen(function* () { + const adapter = yield* ClaudeAdapter; + + const runtimeEventsFiber = yield* Stream.take(adapter.streamEvents, 6).pipe( + Stream.runCollect, + Effect.forkChild, + ); + + const session = yield* adapter.startSession({ + threadId: THREAD_ID, + provider: "claudeAgent", + runtimeMode: "full-access", + }); + + const turn = yield* adapter.sendTurn({ + threadId: session.threadId, + input: "hello", + attachments: [], + }); + + harness.query.emit({ + type: "result", + subtype: "error_during_execution", + is_error: false, + errors: ["[ede_diagnostic] result_type=user last_content_type=n/a stop_reason=tool_use"], + stop_reason: "tool_use", + session_id: "sdk-session-ede-diagnostic", + uuid: "result-ede-diagnostic", + } as unknown as SDKMessage); + + const runtimeEvents = Array.from(yield* Fiber.join(runtimeEventsFiber)); + assert.deepEqual( + runtimeEvents.map((event) => event.type), + [ + "session.started", + "session.configured", + "session.state.changed", + "turn.started", + "thread.started", + "turn.completed", + ], + ); + + const turnCompleted = runtimeEvents[runtimeEvents.length - 1]; + assert.equal(turnCompleted?.type, "turn.completed"); + if (turnCompleted?.type === "turn.completed") { + assert.equal(String(turnCompleted.turnId), String(turn.turnId)); + assert.equal(turnCompleted.payload.state, "completed"); + assert.isUndefined(turnCompleted.payload.errorMessage); + assert.equal(turnCompleted.payload.stopReason, "tool_use"); + } + }).pipe( + Effect.provideService(Random.Random, makeDeterministicRandomService()), + Effect.provide(harness.layer), + ); + }); + + it.effect( + "marks rejected tool results after interruptTurn as declined and completes interrupted", + () => { + const harness = makeHarness(); + return Effect.gen(function* () { + const adapter = yield* ClaudeAdapter; + + const runtimeEventsFiber = yield* Stream.take(adapter.streamEvents, 10).pipe( + Stream.runCollect, + Effect.forkChild, + ); + + const session = yield* adapter.startSession({ + threadId: THREAD_ID, + provider: "claudeAgent", + runtimeMode: "full-access", + }); + + const turn = yield* adapter.sendTurn({ + threadId: session.threadId, + input: "hello", + attachments: [], + }); + + harness.query.emit({ + type: "stream_event", + session_id: "sdk-session-interrupted-tool-result", + uuid: "stream-tool-start-interrupted", + parent_tool_use_id: null, + event: { + type: "content_block_start", + index: 1, + content_block: { + type: "tool_use", + id: "tool-bash-1", + name: "Bash", + input: { + command: "ls", + }, + }, + }, + } as unknown as SDKMessage); + + yield* adapter.interruptTurn(session.threadId, turn.turnId); + + harness.query.emit({ + type: "user", + session_id: "sdk-session-interrupted-tool-result", + uuid: "user-tool-result-interrupted", + parent_tool_use_id: null, + message: { + role: "user", + content: [ + { + type: "tool_result", + tool_use_id: "tool-bash-1", + content: INTERRUPTED_TOOL_RESULT_TEXT, + is_error: true, + }, + ], + }, + } as unknown as SDKMessage); + + harness.query.emit({ + type: "result", + subtype: "error_during_execution", + is_error: true, + errors: [INTERRUPTED_TOOL_RESULT_TEXT], + stop_reason: "tool_use", + session_id: "sdk-session-interrupted-tool-result", + uuid: "result-interrupted-tool-result", + } as unknown as SDKMessage); + + const runtimeEvents = Array.from(yield* Fiber.join(runtimeEventsFiber)); + assert.deepEqual( + runtimeEvents.map((event) => event.type), + [ + "session.started", + "session.configured", + "session.state.changed", + "turn.started", + "thread.started", + "item.started", + "item.updated", + "content.delta", + "item.completed", + "turn.completed", + ], + ); + + const toolUpdated = runtimeEvents[6]; + assert.equal(toolUpdated?.type, "item.updated"); + if (toolUpdated?.type === "item.updated") { + assert.equal(toolUpdated.payload.status, "declined"); + } + + const toolCompleted = runtimeEvents[8]; + assert.equal(toolCompleted?.type, "item.completed"); + if (toolCompleted?.type === "item.completed") { + assert.equal(toolCompleted.payload.status, "declined"); + } + + const turnCompleted = runtimeEvents[9]; + assert.equal(turnCompleted?.type, "turn.completed"); + if (turnCompleted?.type === "turn.completed") { + assert.equal(String(turnCompleted.turnId), String(turn.turnId)); + assert.equal(turnCompleted.payload.state, "interrupted"); + assert.equal(turnCompleted.payload.errorMessage, "Claude runtime interrupted."); + assert.equal(turnCompleted.payload.stopReason, "tool_use"); + } + }).pipe( + Effect.provideService(Random.Random, makeDeterministicRandomService()), + Effect.provide(harness.layer), + ); + }, + ); + it.effect("closes the session when the Claude stream aborts after a turn starts", () => { const harness = makeHarness(); return Effect.gen(function* () { @@ -1169,6 +1347,145 @@ describe("ClaudeAdapterLive", () => { ); }); + it.effect( + "treats Claude ede_diagnostic tool_use cancellation as interrupted without a runtime error", + () => { + const harness = makeHarness(); + return Effect.gen(function* () { + const services = yield* Effect.services(); + const runFork = Effect.runForkWith(services); + + const adapter = yield* ClaudeAdapter; + const runtimeEvents: Array = []; + + const runtimeEventsFiber = runFork( + Stream.runForEach(adapter.streamEvents, (event) => + Effect.sync(() => { + runtimeEvents.push(event); + }), + ), + ); + + yield* adapter.startSession({ + threadId: THREAD_ID, + provider: "claudeAgent", + runtimeMode: "full-access", + }); + + const turn = yield* adapter.sendTurn({ + threadId: THREAD_ID, + input: "hello", + attachments: [], + }); + + harness.query.fail( + new Error("[ede_diagnostic] result_type=user last_content_type=n/a stop_reason=tool_use"), + ); + + yield* Effect.yieldNow; + yield* Effect.yieldNow; + yield* Effect.yieldNow; + runtimeEventsFiber.interruptUnsafe(); + + assert.deepEqual( + runtimeEvents.map((event) => event.type), + [ + "session.started", + "session.configured", + "session.state.changed", + "turn.started", + "turn.completed", + "session.exited", + ], + ); + + const turnCompleted = runtimeEvents[4]; + assert.equal(turnCompleted?.type, "turn.completed"); + if (turnCompleted?.type === "turn.completed") { + assert.equal(String(turnCompleted.turnId), String(turn.turnId)); + assert.equal(turnCompleted.payload.state, "interrupted"); + assert.equal(turnCompleted.payload.errorMessage, "Claude runtime interrupted."); + } + + const sessionExited = runtimeEvents[5]; + assert.equal(sessionExited?.type, "session.exited"); + }).pipe( + Effect.provideService(Random.Random, makeDeterministicRandomService()), + Effect.provide(harness.layer), + ); + }, + ); + + it.effect( + "treats aborted Claude stream failures after interruptTurn as interrupted without a runtime error", + () => { + const harness = makeHarness(); + return Effect.gen(function* () { + const services = yield* Effect.services(); + const runFork = Effect.runForkWith(services); + + const adapter = yield* ClaudeAdapter; + const runtimeEvents: Array = []; + + const runtimeEventsFiber = runFork( + Stream.runForEach(adapter.streamEvents, (event) => + Effect.sync(() => { + runtimeEvents.push(event); + }), + ), + ); + + yield* adapter.startSession({ + threadId: THREAD_ID, + provider: "claudeAgent", + runtimeMode: "full-access", + }); + + const turn = yield* adapter.sendTurn({ + threadId: THREAD_ID, + input: "hello", + attachments: [], + }); + + yield* adapter.interruptTurn(THREAD_ID, turn.turnId); + harness.query.fail( + "Error: Request was aborted.\n at makeRequest (/$bunfs/root/src/entrypoints/cli.js:50:3448)\n at processTicksAndRejections (native:7:39)", + ); + + yield* Effect.yieldNow; + yield* Effect.yieldNow; + yield* Effect.yieldNow; + runtimeEventsFiber.interruptUnsafe(); + + assert.deepEqual( + runtimeEvents.map((event) => event.type), + [ + "session.started", + "session.configured", + "session.state.changed", + "turn.started", + "turn.completed", + "session.exited", + ], + ); + + const turnCompleted = runtimeEvents[4]; + assert.equal(turnCompleted?.type, "turn.completed"); + if (turnCompleted?.type === "turn.completed") { + assert.equal(String(turnCompleted.turnId), String(turn.turnId)); + assert.equal(turnCompleted.payload.state, "interrupted"); + assert.equal(turnCompleted.payload.errorMessage, "Claude runtime interrupted."); + } + + const sessionExited = runtimeEvents[5]; + assert.equal(sessionExited?.type, "session.exited"); + }).pipe( + Effect.provideService(Random.Random, makeDeterministicRandomService()), + Effect.provide(harness.layer), + ); + }, + ); + it.effect("stopSession does not throw into the SDK prompt consumer", () => { // The SDK consumes user messages via `for await (... of prompt)`. // Stopping a session must end that loop cleanly — not throw an error. @@ -1228,11 +1545,9 @@ describe("ClaudeAdapterLive", () => { runtimeEventsFiber.interruptUnsafe(); - assert.equal( + assert.isUndefined( promptConsumerError, - undefined, - `Prompt consumer should not receive a thrown error on session stop, ` + - `but got: "${promptConsumerError instanceof Error ? promptConsumerError.message : String(promptConsumerError)}"`, + "Prompt consumer should not receive a thrown error on session stop", ); }).pipe( Effect.provideService(Random.Random, makeDeterministicRandomService()), diff --git a/apps/server/src/provider/Layers/ClaudeAdapter.ts b/apps/server/src/provider/Layers/ClaudeAdapter.ts index 94f8ba0c90..836c74ef5a 100644 --- a/apps/server/src/provider/Layers/ClaudeAdapter.ts +++ b/apps/server/src/provider/Layers/ClaudeAdapter.ts @@ -55,10 +55,12 @@ import { FileSystem, Fiber, Layer, + Option, Queue, Random, Ref, Stream, + Schema, } from "effect"; import { resolveAttachmentPath } from "../../attachmentStore.ts"; @@ -145,7 +147,7 @@ interface ClaudeSessionContext { session: ProviderSession; readonly promptQueue: Queue.Queue; readonly query: ClaudeQueryRuntime; - streamFiber: Fiber.Fiber | undefined; + streamFiber: Fiber.Fiber | undefined; readonly startedAt: string; readonly basePermissionMode: PermissionMode | undefined; currentApiModelId: string | undefined; @@ -162,6 +164,7 @@ interface ClaudeSessionContext { lastKnownTokenUsage: ThreadTokenUsageSnapshot | undefined; lastAssistantUuid: string | undefined; lastThreadStartedId: string | undefined; + interruptRequested: boolean; stopped: boolean; } @@ -182,6 +185,23 @@ export interface ClaudeAdapterLiveOptions { readonly nativeEventLogger?: EventNdjsonLogger; } +class ClaudeStreamInterruptedError extends Schema.TaggedErrorClass()( + "ClaudeStreamInterruptedError", + { + message: Schema.String, + cause: Schema.optional(Schema.Defect), + }, +) {} +class ClaudeStreamFailedError extends Schema.TaggedErrorClass()( + "ClaudeStreamFailedError", + { + message: Schema.String, + cause: Schema.optional(Schema.Defect), + }, +) {} +const isClaudeStreamInterruptedError = Schema.is(ClaudeStreamInterruptedError); +type ClaudeStreamError = ClaudeStreamInterruptedError | ClaudeStreamFailedError; + function isUuid(value: string): boolean { return /^[0-9a-f]{8}-[0-9a-f]{4}-[1-8][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i.test(value); } @@ -190,29 +210,6 @@ function isSyntheticClaudeThreadId(value: string): boolean { return value.startsWith("claude-thread-"); } -function toMessage(cause: unknown, fallback: string): string { - if (cause instanceof Error && cause.message.length > 0) { - return cause.message; - } - return fallback; -} - -function toError(cause: unknown, fallback: string): Error { - return cause instanceof Error ? cause : new Error(toMessage(cause, fallback)); -} - -function normalizeClaudeStreamMessages(cause: Cause.Cause): ReadonlyArray { - const errors = Cause.prettyErrors(cause) - .map((error) => error.message.trim()) - .filter((message) => message.length > 0); - if (errors.length > 0) { - return errors; - } - - const squashed = toMessage(Cause.squash(cause), "").trim(); - return squashed.length > 0 ? [squashed] : []; -} - function getEffectiveClaudeCodeEffort( effort: ClaudeCodeEffort | null | undefined, ): Exclude | null { @@ -231,20 +228,16 @@ function isClaudeInterruptedMessage(message: string): boolean { ); } -function isClaudeInterruptedCause(cause: Cause.Cause): boolean { - return ( - Cause.hasInterruptsOnly(cause) || - normalizeClaudeStreamMessages(cause).some(isClaudeInterruptedMessage) - ); -} - -function messageFromClaudeStreamCause(cause: Cause.Cause, fallback: string): string { - return normalizeClaudeStreamMessages(cause)[0] ?? fallback; +function isClaudeDiagnosticMessage(message: string): boolean { + return message.toLowerCase().includes("[ede_diagnostic]"); } -function interruptionMessageFromClaudeCause(cause: Cause.Cause): string { - const message = messageFromClaudeStreamCause(cause, "Claude runtime interrupted."); - return isClaudeInterruptedMessage(message) ? "Claude runtime interrupted." : message; +function isClaudeInterruptedToolResultMessage(message: string): boolean { + const normalized = message.toLowerCase(); + return ( + normalized.includes("tool use was rejected") || + normalized.includes("doesn't want to proceed with this tool use") + ); } function resultErrorsText(result: SDKResultMessage): string { @@ -597,7 +590,7 @@ const buildUserMessageEffect = Effect.fn("buildUserMessageEffect")(function* ( new ProviderAdapterRequestError({ provider: PROVIDER, method: "turn/start", - detail: toMessage(cause, "Failed to read attachment file."), + detail: `Failed to read attachment file: ${cause.message}.`, cause, }), ), @@ -626,6 +619,11 @@ function turnStatusFromResult(result: SDKResultMessage): ProviderRuntimeTurnStat if (errors.includes("cancel")) { return "cancelled"; } + + if (result.is_error === false) { + return "completed"; + } + return "failed"; } @@ -813,7 +811,7 @@ function toSessionError( threadId: ThreadId, cause: unknown, ): ProviderAdapterSessionNotFoundError | ProviderAdapterSessionClosedError | undefined { - const normalized = toMessage(cause, "").toLowerCase(); + const normalized = cause instanceof Error ? cause.message.toLowerCase() : ""; if (normalized.includes("unknown session") || normalized.includes("not found")) { return new ProviderAdapterSessionNotFoundError({ provider: PROVIDER, @@ -839,7 +837,7 @@ function toRequestError(threadId: ThreadId, method: string, cause: unknown): Pro return new ProviderAdapterRequestError({ provider: PROVIDER, method, - detail: toMessage(cause, `${method} failed`), + detail: cause instanceof Error ? `${method} failed: ${cause.message}` : `${method} failed`, cause, }); } @@ -1322,6 +1320,42 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( }); }); + const handleClaudeStreamError = ( + context: ClaudeSessionContext, + cause: Cause.Cause, + ) => { + if (context.interruptRequested) { + return context.turnState + ? completeTurn(context, "interrupted", "Claude runtime interrupted.") + : Effect.void; + } + + return Option.match(Cause.findErrorOption(cause), { + onNone: () => + context.turnState + ? completeTurn(context, "failed", "Claude runtime stream failed.") + : Effect.void, + onSome: (streamError) => + isClaudeStreamInterruptedError(streamError) + ? context.turnState + ? completeTurn(context, "interrupted", "Claude runtime interrupted.") + : Effect.void + : Effect.gen(function* () { + const message = streamError.message; + if (isClaudeDiagnosticMessage(message)) { + if (context.turnState) { + yield* completeTurn(context, "interrupted", "Claude runtime interrupted."); + } + return; + } + yield* emitRuntimeError(context, message, Cause.pretty(cause)); + if (context.turnState) { + yield* completeTurn(context, "failed", message); + } + }), + }); + }; + const completeTurn = Effect.fn("completeTurn")(function* ( context: ClaudeSessionContext, status: ProviderRuntimeTurnStatus, @@ -1481,6 +1515,7 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( const updatedAt = yield* nowIso; context.turnState = undefined; + context.interruptRequested = false; context.session = { ...context.session, status: "ready", @@ -1728,7 +1763,15 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( } const [index, tool] = toolEntry; - const itemStatus = toolResult.isError ? "failed" : "completed"; + const interruptedToolResult = + context.interruptRequested && + toolResult.isError && + isClaudeInterruptedToolResultMessage(toolResult.text); + const itemStatus = interruptedToolResult + ? "declined" + : toolResult.isError + ? "failed" + : "completed"; const toolData = { toolName: tool.toolName, input: tool.input, @@ -1746,7 +1789,7 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( itemId: asRuntimeItemId(tool.itemId), payload: { itemType: tool.itemType, - status: toolResult.isError ? "failed" : "inProgress", + status: interruptedToolResult ? "declined" : toolResult.isError ? "failed" : "inProgress", title: tool.title, ...(tool.detail ? { detail: tool.detail } : {}), data: toolData, @@ -1906,8 +1949,17 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( return; } - const status = turnStatusFromResult(message); - const errorMessage = message.subtype === "success" ? undefined : message.errors[0]; + const status = + context.interruptRequested && message.subtype !== "success" + ? "interrupted" + : turnStatusFromResult(message); + // Skip [ede_diagnostic] entries (SDK-internal diagnostics, not user-facing errors). + const errorMessage = + status === "interrupted" && context.interruptRequested + ? "Claude runtime interrupted." + : message.subtype === "success" + ? undefined + : message.errors.find((entry: string) => !isClaudeDiagnosticMessage(entry)); if (status === "failed") { yield* emitRuntimeError(context, errorMessage ?? "Claude turn failed."); @@ -2226,39 +2278,40 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( } }); - const runSdkStream = (context: ClaudeSessionContext): Effect.Effect => - Stream.fromAsyncIterable(context.query, (cause) => - toError(cause, "Claude runtime stream failed."), - ).pipe( + const runSdkStream = (context: ClaudeSessionContext) => + Stream.fromAsyncIterable(context.query, (cause) => { + const message = + typeof cause === "string" && cause.trim().length > 0 + ? cause + : cause instanceof Error && cause.message.trim().length > 0 + ? cause.message + : "Claude runtime stream failed."; + return isClaudeInterruptedMessage(message) + ? new ClaudeStreamInterruptedError({ message, cause }) + : new ClaudeStreamFailedError({ message, cause }); + }).pipe( Stream.takeWhile(() => !context.stopped), Stream.runForEach((message) => handleSdkMessage(context, message)), ); const handleStreamExit = Effect.fn("handleStreamExit")(function* ( context: ClaudeSessionContext, - exit: Exit.Exit, + exit: Exit.Exit, ) { if (context.stopped) { return; } - if (Exit.isFailure(exit)) { - if (isClaudeInterruptedCause(exit.cause)) { - if (context.turnState) { - yield* completeTurn( - context, - "interrupted", - interruptionMessageFromClaudeCause(exit.cause), - ); - } - } else { - const message = messageFromClaudeStreamCause(exit.cause, "Claude runtime stream failed."); - yield* emitRuntimeError(context, message, Cause.pretty(exit.cause)); - yield* completeTurn(context, "failed", message); - } - } else if (context.turnState) { - yield* completeTurn(context, "interrupted", "Claude runtime stream ended."); - } + yield* Exit.match(exit, { + onSuccess: () => + context.turnState + ? completeTurn(context, "interrupted", "Claude runtime stream ended.") + : Effect.void, + onFailure: (cause) => + Cause.hasInterruptsOnly(cause) && context.turnState + ? completeTurn(context, "interrupted", "Claude runtime interrupted.") + : handleClaudeStreamError(context, cause), + }); yield* stopSessionInternal(context, { emitExitEvent: true, @@ -2732,7 +2785,7 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( new ProviderAdapterProcessError({ provider: PROVIDER, threadId, - detail: toMessage(cause, "Failed to start Claude runtime session."), + detail: `Failed to create Claude runtime query: ${String(cause)}.`, cause, }), }); @@ -2773,6 +2826,7 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( lastKnownTokenUsage: undefined, lastAssistantUuid: resumeState?.resumeSessionAt, lastThreadStartedId: undefined, + interruptRequested: false, stopped: false, }; yield* Ref.set(contextRef, context); @@ -2903,6 +2957,7 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( const updatedAt = yield* nowIso; context.turnState = turnState; + context.interruptRequested = false; context.session = { ...context.session, status: "running", @@ -2944,9 +2999,13 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( const interruptTurn: ClaudeAdapterShape["interruptTurn"] = Effect.fn("interruptTurn")( function* (threadId, _turnId) { const context = yield* requireSession(threadId); + context.interruptRequested = true; yield* Effect.tryPromise({ try: () => context.query.interrupt(), - catch: (cause) => toRequestError(threadId, "turn/interrupt", cause), + catch: (cause) => { + context.interruptRequested = false; + return toRequestError(threadId, "turn/interrupt", cause); + }, }); }, ); diff --git a/apps/server/src/provider/Layers/ClaudeProvider.ts b/apps/server/src/provider/Layers/ClaudeProvider.ts index 9feec28637..c339ce13b7 100644 --- a/apps/server/src/provider/Layers/ClaudeProvider.ts +++ b/apps/server/src/provider/Layers/ClaudeProvider.ts @@ -495,7 +495,7 @@ export const checkClaudeProviderStatus = Effect.fn("checkClaudeProviderStatus")( auth: { status: "unknown" }, message: isCommandMissingCause(error) ? "Claude Agent CLI (`claude`) is not installed or not on PATH." - : `Failed to execute Claude Agent CLI health check: ${error instanceof Error ? error.message : String(error)}.`, + : `Failed to execute Claude Agent CLI health check: ${error.message}.`, }, }); } @@ -579,10 +579,7 @@ export const checkClaudeProviderStatus = Effect.fn("checkClaudeProviderStatus")( version: parsedVersion, status: "warning", auth: { status: "unknown" }, - message: - error instanceof Error - ? `Could not verify Claude authentication status: ${error.message}.` - : "Could not verify Claude authentication status.", + message: `Could not verify Claude authentication status: ${error.message}.`, }, }); }