diff --git a/apps/server/integration/TestProviderAdapter.integration.ts b/apps/server/integration/TestProviderAdapter.integration.ts index f497395307..c47767c9a0 100644 --- a/apps/server/integration/TestProviderAdapter.integration.ts +++ b/apps/server/integration/TestProviderAdapter.integration.ts @@ -472,6 +472,15 @@ export const makeTestProviderAdapterHarness = (options?: MakeTestProviderAdapter sessions.clear(); }); + const compactThread: ProviderAdapterShape["compactThread"] = (threadId) => + readThread(threadId).pipe( + Effect.map((snapshot) => { + const latestTurn = snapshot.turns.at(-1); + const latestItem = latestTurn?.items.at(-1); + return typeof latestItem === "string" ? latestItem : null; + }), + ); + const adapter: ProviderAdapterShape = { provider, capabilities: { @@ -486,6 +495,7 @@ export const makeTestProviderAdapterHarness = (options?: MakeTestProviderAdapter listSessions, hasSession, readThread, + compactThread, rollbackThread, stopAll, streamEvents: Stream.fromQueue(runtimeEvents), diff --git a/apps/server/src/codexAppServerManager.ts b/apps/server/src/codexAppServerManager.ts index 230ba8e364..1f8c743791 100644 --- a/apps/server/src/codexAppServerManager.ts +++ b/apps/server/src/codexAppServerManager.ts @@ -138,6 +138,7 @@ export interface CodexThreadSnapshot { } const CODEX_VERSION_CHECK_TIMEOUT_MS = 4_000; +const CODEX_THREAD_COMPACTION_TIMEOUT_MS = 60_000; const ANSI_ESCAPE_CHAR = String.fromCharCode(27); const ANSI_ESCAPE_REGEX = new RegExp(`${ANSI_ESCAPE_CHAR}\\[[0-9;]*m`, "g"); @@ -795,6 +796,60 @@ export class CodexAppServerManager extends EventEmitter { + const context = this.requireSession(threadId); + const providerThreadId = readResumeThreadId({ + threadId: context.session.threadId, + runtimeMode: context.session.runtimeMode, + resumeCursor: context.session.resumeCursor, + }); + if (!providerThreadId) { + throw new Error("Session is missing a provider resume thread id."); + } + + const compactionWait = { + cleanup: () => undefined, + }; + const waitForCompaction = new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + compactionWait.cleanup(); + reject(new Error("Timed out waiting for Codex thread compaction.")); + }, CODEX_THREAD_COMPACTION_TIMEOUT_MS); + + const onEvent = (event: ProviderEvent) => { + if ( + event.provider !== "codex" || + event.threadId !== context.session.threadId || + event.kind !== "notification" + ) { + return; + } + if (event.method === "thread/compacted") { + compactionWait.cleanup(); + resolve(); + } + }; + + compactionWait.cleanup = () => { + clearTimeout(timeout); + this.off("event", onEvent); + }; + + this.on("event", onEvent); + }); + + try { + await this.sendRequest(context, "thread/compact/start", { + threadId: providerThreadId, + }); + await waitForCompaction; + } catch (error) { + compactionWait.cleanup(); + throw error; + } + return this.readThread(threadId); + } + async rollbackThread(threadId: ThreadId, numTurns: number): Promise { const context = this.requireSession(threadId); const providerThreadId = readResumeThreadId({ diff --git a/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts b/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts index 12e11450dd..7b3a85b151 100644 --- a/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts +++ b/apps/server/src/orchestration/Layers/CheckpointReactor.test.ts @@ -95,8 +95,10 @@ function createProviderServiceHarness( respondToRequest: () => unsupported(), respondToUserInput: () => unsupported(), stopSession: () => unsupported(), + stopSessionForProvider: () => unsupported(), listSessions, getCapabilities: () => Effect.succeed({ sessionModelSwitch: "in-session" }), + compactThread: () => unsupported(), rollbackConversation, get streamEvents() { return Stream.fromPubSub(runtimeEventPubSub); diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts index ded64fb9e6..d0156509fe 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts @@ -106,7 +106,7 @@ describe("ProviderCommandReactor", () => { const runtimeEventPubSub = Effect.runSync(PubSub.unbounded()); let nextSessionIndex = 1; const runtimeSessions: Array = []; - const modelSelection = input?.threadModelSelection ?? { + const defaultModelSelection = input?.threadModelSelection ?? { provider: "codex", model: "gpt-5-codex", }; @@ -123,8 +123,25 @@ describe("ProviderCommandReactor", () => { typeof input.threadId === "string" ? ThreadId.make(input.threadId) : ThreadId.make(`thread-${sessionIndex}`); + const provider = + typeof input === "object" && + input !== null && + "provider" in input && + (input.provider === "codex" || input.provider === "claudeAgent") + ? input.provider + : defaultModelSelection.provider; + const model = + typeof input === "object" && + input !== null && + "modelSelection" in input && + typeof input.modelSelection === "object" && + input.modelSelection !== null && + "model" in input.modelSelection && + typeof input.modelSelection.model === "string" + ? input.modelSelection.model + : defaultModelSelection.model; const session: ProviderSession = { - provider: modelSelection.provider, + provider, status: "ready" as const, runtimeMode: typeof input === "object" && @@ -133,13 +150,21 @@ describe("ProviderCommandReactor", () => { (input.runtimeMode === "approval-required" || input.runtimeMode === "full-access") ? input.runtimeMode : "full-access", - ...(modelSelection.model !== undefined ? { model: modelSelection.model } : {}), + ...(model ? { model } : {}), threadId, resumeCursor: resumeCursor ?? { opaque: `resume-${sessionIndex}` }, createdAt: now, updatedAt: now, }; - runtimeSessions.push(session); + const existingIndex = runtimeSessions.findIndex( + (runtimeSession) => + runtimeSession.threadId === threadId && runtimeSession.provider === provider, + ); + if (existingIndex >= 0) { + runtimeSessions.splice(existingIndex, 1, session); + } else { + runtimeSessions.push(session); + } return Effect.succeed(session); }); const sendTurn = vi.fn((_: unknown) => @@ -151,6 +176,13 @@ describe("ProviderCommandReactor", () => { const interruptTurn = vi.fn((_: unknown) => Effect.void); const respondToRequest = vi.fn(() => Effect.void); const respondToUserInput = vi.fn(() => Effect.void); + const compactThread = vi.fn((input: unknown) => + Effect.succeed( + typeof input === "object" && input !== null && "threadId" in input + ? `Summary for ${String(input.threadId)}` + : "Summary", + ), + ); const stopSession = vi.fn((input: unknown) => Effect.sync(() => { const threadId = @@ -160,7 +192,18 @@ describe("ProviderCommandReactor", () => { if (!threadId) { return; } - const index = runtimeSessions.findIndex((session) => session.threadId === threadId); + const provider = + typeof input === "object" && + input !== null && + "provider" in input && + (input.provider === "codex" || input.provider === "claudeAgent") + ? input.provider + : undefined; + const index = runtimeSessions.findIndex( + (session) => + session.threadId === threadId && + (provider === undefined || session.provider === provider), + ); if (index >= 0) { runtimeSessions.splice(index, 1); } @@ -201,7 +244,9 @@ describe("ProviderCommandReactor", () => { interruptTurn: interruptTurn as ProviderServiceShape["interruptTurn"], respondToRequest: respondToRequest as ProviderServiceShape["respondToRequest"], respondToUserInput: respondToUserInput as ProviderServiceShape["respondToUserInput"], + compactThread: compactThread as ProviderServiceShape["compactThread"], stopSession: stopSession as ProviderServiceShape["stopSession"], + stopSessionForProvider: stopSession as ProviderServiceShape["stopSessionForProvider"], listSessions: () => Effect.succeed(runtimeSessions), getCapabilities: (_provider) => Effect.succeed({ @@ -250,7 +295,7 @@ describe("ProviderCommandReactor", () => { projectId: asProjectId("project-1"), title: "Provider Project", workspaceRoot: "/tmp/provider-project", - defaultModelSelection: modelSelection, + defaultModelSelection: defaultModelSelection, createdAt: now, }), ); @@ -261,7 +306,7 @@ describe("ProviderCommandReactor", () => { threadId: ThreadId.make("thread-1"), projectId: asProjectId("project-1"), title: "Thread", - modelSelection: modelSelection, + modelSelection: defaultModelSelection, interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, runtimeMode: "approval-required", branch: null, @@ -277,6 +322,7 @@ describe("ProviderCommandReactor", () => { interruptTurn, respondToRequest, respondToUserInput, + compactThread, stopSession, renameBranch, generateBranchName, @@ -716,7 +762,7 @@ describe("ProviderCommandReactor", () => { }); }); - it("rejects a first turn when requested provider conflicts with the thread model", async () => { + it("starts the requested provider on the first turn even when the thread default differs", async () => { const harness = await createHarness({ threadModelSelection: { provider: "codex", model: "gpt-5-codex" }, }); @@ -743,29 +789,28 @@ describe("ProviderCommandReactor", () => { }), ); - await waitFor(async () => { - const readModel = await Effect.runPromise(harness.engine.getReadModel()); - const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); - return ( - thread?.activities.some((activity) => activity.kind === "provider.turn.start.failed") ?? - false - ); - }); - - expect(harness.startSession).not.toHaveBeenCalled(); - expect(harness.sendTurn).not.toHaveBeenCalled(); + await waitFor(() => harness.startSession.mock.calls.length === 1); + await waitFor(() => harness.sendTurn.mock.calls.length === 1); const readModel = await Effect.runPromise(harness.engine.getReadModel()); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); - expect(thread?.session).toBeNull(); - expect( - thread?.activities.find((activity) => activity.kind === "provider.turn.start.failed"), - ).toMatchObject({ - summary: "Provider turn start failed", - payload: { - detail: expect.stringContaining("cannot switch to 'claudeAgent'"), + expect(harness.compactThread).not.toHaveBeenCalled(); + expect(harness.startSession.mock.calls[0]?.[1]).toMatchObject({ + provider: "claudeAgent", + modelSelection: { + provider: "claudeAgent", + model: "claude-opus-4-6", }, }); + expect(harness.sendTurn.mock.calls[0]?.[0]).toMatchObject({ + threadId: ThreadId.make("thread-1"), + input: "hello claude", + modelSelection: { + provider: "claudeAgent", + model: "claude-opus-4-6", + }, + }); + expect(thread?.session?.providerName).toBe("claudeAgent"); }); it("preserves the active session model when in-session model switching is unsupported", async () => { @@ -1066,7 +1111,7 @@ describe("ProviderCommandReactor", () => { }); }); - it("rejects provider changes after a thread is already bound to a session provider", async () => { + it("compacts and hands off context when switching providers mid-thread", async () => { const harness = await createHarness(); const now = new Date().toISOString(); @@ -1111,31 +1156,69 @@ describe("ProviderCommandReactor", () => { }), ); - await waitFor(async () => { - const readModel = await Effect.runPromise(harness.engine.getReadModel()); - const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); - return ( - thread?.activities.some((activity) => activity.kind === "provider.turn.start.failed") ?? - false - ); - }); + await waitFor(() => harness.startSession.mock.calls.length === 2); + await waitFor(() => harness.sendTurn.mock.calls.length === 2); - expect(harness.startSession.mock.calls.length).toBe(1); - expect(harness.sendTurn.mock.calls.length).toBe(1); - expect(harness.stopSession.mock.calls.length).toBe(0); + expect(harness.compactThread.mock.calls.length).toBe(1); + expect(harness.compactThread.mock.calls[0]?.[0]).toMatchObject({ + threadId: ThreadId.make("thread-1"), + }); + expect(harness.startSession.mock.calls[1]?.[1]).toMatchObject({ + provider: "claudeAgent", + modelSelection: { + provider: "claudeAgent", + model: "claude-opus-4-6", + }, + }); + expect(harness.sendTurn.mock.calls[1]?.[0]).toMatchObject({ + threadId: ThreadId.make("thread-1"), + modelSelection: { + provider: "claudeAgent", + model: "claude-opus-4-6", + }, + }); + const switchedTurnInput = harness.sendTurn.mock.calls[1]?.[0] as { input?: string } | undefined; + expect(switchedTurnInput?.input).toContain(""); + expect(switchedTurnInput?.input).toContain("Summary for thread-1"); + expect(switchedTurnInput?.input).toContain("second"); + expect(harness.stopSession.mock.calls.length).toBe(1); + expect(harness.stopSession.mock.calls[0]?.[0]).toMatchObject({ + threadId: ThreadId.make("thread-1"), + provider: "codex", + }); const readModel = await Effect.runPromise(harness.engine.getReadModel()); const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1")); expect(thread?.session?.threadId).toBe("thread-1"); - expect(thread?.session?.providerName).toBe("codex"); + expect(thread?.session?.providerName).toBe("claudeAgent"); expect(thread?.session?.runtimeMode).toBe("approval-required"); expect( - thread?.activities.find((activity) => activity.kind === "provider.turn.start.failed"), - ).toMatchObject({ - payload: { - detail: expect.stringContaining("cannot switch to 'claudeAgent'"), - }, - }); + thread?.activities.filter((activity) => activity.kind.startsWith("provider.handoff.")), + ).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + kind: "provider.handoff.compacting", + summary: "Handing off thread", + turnId: null, + payload: expect.objectContaining({ + sourceProvider: "codex", + targetProvider: "claudeAgent", + }), + }), + expect.objectContaining({ + kind: "provider.handoff.completed", + summary: "Handed off thread", + turnId: null, + payload: expect.objectContaining({ + sourceProvider: "codex", + targetProvider: "claudeAgent", + }), + }), + ]), + ); + expect( + thread?.activities.some((activity) => activity.kind === "provider.turn.start.failed"), + ).toBe(false); }); it("does not stop the active session when restart fails before rebind", async () => { diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts index a1a69f0efa..a38c7b57fc 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts @@ -18,6 +18,7 @@ import { resolveThreadWorkspaceCwd } from "../../checkpointing/Utils.ts"; import { GitCore } from "../../git/Services/GitCore.ts"; import { increment, orchestrationEventsProcessedTotal } from "../../observability/Metrics.ts"; import { ProviderAdapterRequestError, ProviderServiceError } from "../../provider/Errors.ts"; +import { buildProviderSwitchHandoffInput } from "../../provider/handoffSummary.ts"; import { TextGeneration } from "../../git/Services/TextGeneration.ts"; import { ProviderService } from "../../provider/Services/ProviderService.ts"; import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts"; @@ -40,6 +41,12 @@ type ProviderIntentEvent = Extract< } >; +interface ProviderSwitchHandoff { + readonly sourceProvider: ProviderKind; + readonly targetProvider: ProviderKind; + readonly compactSummary: string | null; +} + function toNonEmptyProviderInput(value: string | undefined): string | undefined { const normalized = value?.trim(); return normalized && normalized.length > 0 ? normalized : undefined; @@ -200,6 +207,34 @@ const make = Effect.gen(function* () { createdAt: input.createdAt, }); + const appendProviderHandoffActivity = (input: { + readonly threadId: ThreadId; + readonly kind: "provider.handoff.compacting" | "provider.handoff.completed"; + readonly summary: string; + readonly sourceProvider: ProviderKind; + readonly targetProvider: ProviderKind; + readonly turnId: TurnId | null; + readonly createdAt: string; + }) => + orchestrationEngine.dispatch({ + type: "thread.activity.append", + commandId: serverCommandId("provider-handoff-activity"), + threadId: input.threadId, + activity: { + id: EventId.make(crypto.randomUUID()), + tone: "info", + kind: input.kind, + summary: input.summary, + payload: { + sourceProvider: input.sourceProvider, + targetProvider: input.targetProvider, + }, + turnId: input.turnId, + createdAt: input.createdAt, + }, + createdAt: input.createdAt, + }); + const setThreadSession = (input: { readonly threadId: ThreadId; readonly session: OrchestrationSession; @@ -223,6 +258,7 @@ const make = Effect.gen(function* () { createdAt: string, options?: { readonly modelSelection?: ModelSelection; + readonly turnId?: TurnId | null; }, ) { const readModel = yield* orchestrationEngine.getReadModel(); @@ -238,19 +274,8 @@ const make = Effect.gen(function* () { ? thread.session.providerName : undefined; const requestedModelSelection = options?.modelSelection; - const threadProvider: ProviderKind = currentProvider ?? thread.modelSelection.provider; - if ( - requestedModelSelection !== undefined && - requestedModelSelection.provider !== threadProvider - ) { - return yield* new ProviderAdapterRequestError({ - provider: threadProvider, - method: "thread.turn.start", - detail: `Thread '${threadId}' is bound to provider '${threadProvider}' and cannot switch to '${requestedModelSelection.provider}'.`, - }); - } - const preferredProvider: ProviderKind = currentProvider ?? threadProvider; const desiredModelSelection = requestedModelSelection ?? thread.modelSelection; + const desiredProvider: ProviderKind = desiredModelSelection.provider; const effectiveCwd = resolveThreadWorkspaceCwd({ thread, projects: readModel.projects, @@ -267,7 +292,7 @@ const make = Effect.gen(function* () { }) => providerService.startSession(threadId, { threadId, - ...(preferredProvider ? { provider: preferredProvider } : {}), + provider: input?.provider ?? desiredProvider, ...(effectiveCwd ? { cwd: effectiveCwd } : {}), modelSelection: desiredModelSelection, ...(input?.resumeCursor !== undefined ? { resumeCursor: input.resumeCursor } : {}), @@ -290,6 +315,21 @@ const make = Effect.gen(function* () { createdAt, }); + const startAndBindProviderSession = (input?: { + readonly resumeCursor?: unknown; + readonly provider?: ProviderKind; + }) => + startProviderSession(input).pipe( + Effect.tap((session) => + Effect.logInfo("provider command reactor restarted provider session", { + threadId, + provider: session.provider, + runtimeMode: session.runtimeMode, + }), + ), + Effect.tap(bindSessionToThread), + ); + const existingSessionThreadId = thread.session && thread.session.status !== "stopped" ? thread.id : null; if (existingSessionThreadId) { @@ -318,7 +358,10 @@ const make = Effect.gen(function* () { !shouldRestartForModelChange && !shouldRestartForModelSelectionChange ) { - return existingSessionThreadId; + return { + threadId: existingSessionThreadId, + handoff: null, + } as const; } const resumeCursor = @@ -339,23 +382,74 @@ const make = Effect.gen(function* () { shouldRestartForModelSelectionChange, hasResumeCursor: resumeCursor !== undefined, }); - const restartedSession = yield* startProviderSession( - resumeCursor !== undefined ? { resumeCursor } : undefined, + if (providerChanged && currentProvider) { + yield* appendProviderHandoffActivity({ + threadId, + kind: "provider.handoff.compacting", + summary: "Handing off thread", + sourceProvider: currentProvider, + targetProvider: desiredProvider, + turnId: options?.turnId ?? null, + createdAt, + }); + const compactSummary = yield* providerService.compactThread({ + threadId: existingSessionThreadId, + }); + const restartedSession = yield* startAndBindProviderSession({ + provider: desiredProvider, + }); + yield* providerService + .stopSessionForProvider({ + threadId: existingSessionThreadId, + provider: currentProvider, + }) + .pipe( + Effect.catchCause((cause) => + Effect.logWarning( + "provider command reactor failed to stop replaced provider session", + { + threadId, + provider: currentProvider, + cause: Cause.pretty(cause), + }, + ), + ), + ); + yield* appendProviderHandoffActivity({ + threadId, + kind: "provider.handoff.completed", + summary: "Handed off thread", + sourceProvider: currentProvider, + targetProvider: desiredProvider, + turnId: options?.turnId ?? null, + createdAt, + }); + return { + threadId: restartedSession.threadId, + handoff: { + sourceProvider: currentProvider, + targetProvider: desiredProvider, + compactSummary, + } satisfies ProviderSwitchHandoff, + } as const; + } + + const restartedSession = yield* startAndBindProviderSession( + resumeCursor !== undefined ? { resumeCursor, provider: desiredProvider } : undefined, ); - yield* Effect.logInfo("provider command reactor restarted provider session", { - threadId, - previousSessionId: existingSessionThreadId, - restartedSessionThreadId: restartedSession.threadId, - provider: restartedSession.provider, - runtimeMode: restartedSession.runtimeMode, - }); - yield* bindSessionToThread(restartedSession); - return restartedSession.threadId; + return { + threadId: restartedSession.threadId, + handoff: null, + } as const; } - const startedSession = yield* startProviderSession(undefined); - yield* bindSessionToThread(startedSession); - return startedSession.threadId; + const startedSession = yield* startAndBindProviderSession({ + provider: desiredProvider, + }); + return { + threadId: startedSession.threadId, + handoff: null, + } as const; }); const sendTurnForThread = Effect.fn("sendTurnForThread")(function* (input: { @@ -370,15 +464,23 @@ const make = Effect.gen(function* () { if (!thread) { return; } - yield* ensureSessionForThread( - input.threadId, - input.createdAt, - input.modelSelection !== undefined ? { modelSelection: input.modelSelection } : {}, - ); + const ensuredSession = yield* ensureSessionForThread(input.threadId, input.createdAt, { + ...(input.modelSelection !== undefined ? { modelSelection: input.modelSelection } : {}), + turnId: thread.latestTurn?.turnId ?? null, + }); if (input.modelSelection !== undefined) { threadModelSelections.set(input.threadId, input.modelSelection); } - const normalizedInput = toNonEmptyProviderInput(input.messageText); + const normalizedInput = ensuredSession.handoff + ? toNonEmptyProviderInput( + buildProviderSwitchHandoffInput({ + sourceProvider: ensuredSession.handoff.sourceProvider, + targetProvider: ensuredSession.handoff.targetProvider, + compactSummary: ensuredSession.handoff.compactSummary, + userMessage: input.messageText, + }), + ) + : toNonEmptyProviderInput(input.messageText); const normalizedAttachments = input.attachments ?? []; const activeSession = yield* providerService .listSessions() diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index 2a330a36b5..2eea2f5d12 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -96,8 +96,10 @@ function createProviderServiceHarness() { respondToRequest: () => unsupported(), respondToUserInput: () => unsupported(), stopSession: () => unsupported(), + stopSessionForProvider: () => unsupported(), listSessions: () => Effect.succeed([...runtimeSessions]), getCapabilities: () => Effect.succeed({ sessionModelSwitch: "in-session" }), + compactThread: () => unsupported(), rollbackConversation: () => unsupported(), get streamEvents() { return Stream.fromPubSub(runtimeEventPubSub); diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index c1241241cc..2661011f3c 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -882,6 +882,7 @@ const make = Effect.fn("make")(function* () { const now = event.createdAt; const eventTurnId = toTurnId(event.turnId); const activeTurnId = thread.session?.activeTurnId ?? null; + const sessionProviderName = thread.session?.providerName ?? null; const conflictsWithActiveTurn = activeTurnId !== null && eventTurnId !== undefined && !sameId(activeTurnId, eventTurnId); @@ -891,6 +892,17 @@ const make = Effect.fn("make")(function* () { if (!STRICT_PROVIDER_LIFECYCLE_GUARD) { return true; } + if (sessionProviderName !== null && sessionProviderName !== event.provider) { + switch (event.type) { + case "session.started": + case "session.state.changed": + case "session.exited": + case "thread.started": + case "turn.started": + case "turn.completed": + return false; + } + } switch (event.type) { case "session.exited": return true; @@ -1144,7 +1156,8 @@ const make = Effect.fn("make")(function* () { const shouldApplyRuntimeError = !STRICT_PROVIDER_LIFECYCLE_GUARD ? true - : activeTurnId === null || eventTurnId === undefined || sameId(activeTurnId, eventTurnId); + : (sessionProviderName === null || sessionProviderName === event.provider) && + (activeTurnId === null || eventTurnId === undefined || sameId(activeTurnId, eventTurnId)); if (shouldApplyRuntimeError) { yield* orchestrationEngine.dispatch({ diff --git a/apps/server/src/provider/Layers/ClaudeAdapter.ts b/apps/server/src/provider/Layers/ClaudeAdapter.ts index fb32da78c5..33e84c6d1c 100644 --- a/apps/server/src/provider/Layers/ClaudeAdapter.ts +++ b/apps/server/src/provider/Layers/ClaudeAdapter.ts @@ -8,6 +8,7 @@ */ import { type CanUseTool, + type HookCallback, query, type Options as ClaudeQueryOptions, type PermissionMode, @@ -77,6 +78,8 @@ import { ClaudeAdapter, type ClaudeAdapterShape } from "../Services/ClaudeAdapte import { type EventNdjsonLogger, makeEventNdjsonLogger } from "./EventNdjsonLogger.ts"; const PROVIDER = "claudeAgent" as const; +const CLAUDE_COMPACT_COMMAND = "/compact"; +const CLAUDE_COMPACT_TIMEOUT_MS = 60_000; type ClaudeTextStreamKind = Extract; type ClaudeToolResultStreamKind = Extract< RuntimeContentStreamKind, @@ -130,6 +133,10 @@ interface PendingUserInput { readonly answers: Deferred.Deferred; } +interface PendingCompaction { + readonly summary: Deferred.Deferred; +} + interface ToolInFlight { readonly itemId: string; readonly itemType: CanonicalItemType; @@ -157,6 +164,7 @@ interface ClaudeSessionContext { items: Array; }>; readonly inFlightTools: Map; + pendingCompaction: PendingCompaction | undefined; turnState: ClaudeTurnState | undefined; lastKnownContextWindow: number | undefined; lastKnownTokenUsage: ThreadTokenUsageSnapshot | undefined; @@ -2297,6 +2305,11 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( yield* completeTurn(context, "interrupted", "Session stopped."); } + if (context.pendingCompaction) { + yield* Deferred.succeed(context.pendingCompaction.summary, null).pipe(Effect.ignore); + context.pendingCompaction = undefined; + } + yield* Queue.shutdown(context.promptQueue); const streamFiber = context.streamFiber; @@ -2399,6 +2412,15 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( const inFlightTools = new Map(); const contextRef = yield* Ref.make(undefined); + const postCompactHook: HookCallback = async (hookInput) => { + if (hookInput.hook_event_name === "PostCompact") { + const context = await runPromise(Ref.get(contextRef)); + if (context?.pendingCompaction) { + runFork(Deferred.succeed(context.pendingCompaction.summary, hookInput.compact_summary)); + } + } + return { continue: true }; + }; /** * Handle AskUserQuestion tool calls by emitting a `user-input.requested` @@ -2720,6 +2742,9 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( canUseTool, env: process.env, ...(input.cwd ? { additionalDirectories: [input.cwd] } : {}), + hooks: { + PostCompact: [{ hooks: [postCompactHook] }], + }, }; const queryRuntime = yield* Effect.try({ @@ -2768,6 +2793,7 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( pendingUserInputs, turns: [], inFlightTools, + pendingCompaction: undefined, turnState: undefined, lastKnownContextWindow: undefined, lastKnownTokenUsage: undefined, @@ -2958,6 +2984,52 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( }, ); + const compactThread: ClaudeAdapterShape["compactThread"] = Effect.fn("compactThread")( + function* (threadId) { + const context = yield* requireSession(threadId); + if (context.pendingCompaction) { + return yield* new ProviderAdapterRequestError({ + provider: PROVIDER, + method: "thread/compact/start", + detail: "Claude thread compaction is already in progress.", + }); + } + + const summary = yield* Deferred.make(); + context.pendingCompaction = { summary }; + + yield* Queue.offer(context.promptQueue, { + type: "message", + message: buildUserMessage({ + sdkContent: [{ type: "text", text: CLAUDE_COMPACT_COMMAND }], + }), + }).pipe(Effect.mapError((cause) => toRequestError(threadId, "thread/compact/start", cause))); + + return yield* Deferred.await(summary).pipe( + Effect.raceFirst( + Effect.sleep(CLAUDE_COMPACT_TIMEOUT_MS).pipe( + Effect.flatMap(() => + Effect.fail( + new ProviderAdapterRequestError({ + provider: PROVIDER, + method: "thread/compact/start", + detail: "Timed out waiting for Claude thread compaction.", + }), + ), + ), + ), + ), + Effect.ensuring( + Effect.sync(() => { + if (context.pendingCompaction?.summary === summary) { + context.pendingCompaction = undefined; + } + }), + ), + ); + }, + ); + const rollbackThread: ClaudeAdapterShape["rollbackThread"] = Effect.fn("rollbackThread")( function* (threadId, numTurns) { const context = yield* requireSession(threadId); @@ -3050,6 +3122,7 @@ const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( sendTurn, interruptTurn, readThread, + compactThread, rollbackThread, respondToRequest, respondToUserInput, diff --git a/apps/server/src/provider/Layers/CodexAdapter.test.ts b/apps/server/src/provider/Layers/CodexAdapter.test.ts index c4ee33b776..acd30c43e4 100644 --- a/apps/server/src/provider/Layers/CodexAdapter.test.ts +++ b/apps/server/src/provider/Layers/CodexAdapter.test.ts @@ -65,6 +65,11 @@ class FakeCodexManager extends CodexAppServerManager { turns: [], })); + public compactThreadImpl = vi.fn(async (_threadId: ThreadId) => ({ + threadId: asThreadId("thread-1"), + turns: [], + })); + public rollbackThreadImpl = vi.fn(async (_threadId: ThreadId, _numTurns: number) => ({ threadId: asThreadId("thread-1"), turns: [], @@ -104,6 +109,10 @@ class FakeCodexManager extends CodexAppServerManager { return this.readThreadImpl(threadId); } + override compactThread(threadId: ThreadId) { + return this.compactThreadImpl(threadId); + } + override rollbackThread(threadId: ThreadId, numTurns: number) { return this.rollbackThreadImpl(threadId, numTurns); } diff --git a/apps/server/src/provider/Layers/CodexAdapter.ts b/apps/server/src/provider/Layers/CodexAdapter.ts index 60de91b79a..de72fe54b7 100644 --- a/apps/server/src/provider/Layers/CodexAdapter.ts +++ b/apps/server/src/provider/Layers/CodexAdapter.ts @@ -40,6 +40,7 @@ import { import { resolveAttachmentPath } from "../../attachmentStore.ts"; import { ServerConfig } from "../../config.ts"; import { ServerSettingsService } from "../../serverSettings.ts"; +import { extractCompactionSummaryFromSnapshot } from "../handoffSummary.ts"; import { type EventNdjsonLogger, makeEventNdjsonLogger } from "./EventNdjsonLogger.ts"; const PROVIDER = "codex" as const; @@ -1514,6 +1515,12 @@ const makeCodexAdapter = Effect.fn("makeCodexAdapter")(function* ( })), ); + const compactThread: CodexAdapterShape["compactThread"] = (threadId) => + Effect.tryPromise({ + try: () => manager.compactThread(threadId), + catch: (cause) => toRequestError(threadId, "thread/compact/start", cause), + }).pipe(Effect.map((snapshot) => extractCompactionSummaryFromSnapshot(snapshot))); + const rollbackThread: CodexAdapterShape["rollbackThread"] = (threadId, numTurns) => { if (!Number.isInteger(numTurns) || numTurns < 1) { return Effect.fail( @@ -1619,6 +1626,7 @@ const makeCodexAdapter = Effect.fn("makeCodexAdapter")(function* ( sendTurn, interruptTurn, readThread, + compactThread, rollbackThread, respondToRequest, respondToUserInput, diff --git a/apps/server/src/provider/Layers/ProviderAdapterRegistry.test.ts b/apps/server/src/provider/Layers/ProviderAdapterRegistry.test.ts index db0293f0fe..000ae6b65e 100644 --- a/apps/server/src/provider/Layers/ProviderAdapterRegistry.test.ts +++ b/apps/server/src/provider/Layers/ProviderAdapterRegistry.test.ts @@ -23,6 +23,7 @@ const fakeCodexAdapter: CodexAdapterShape = { listSessions: vi.fn(), hasSession: vi.fn(), readThread: vi.fn(), + compactThread: vi.fn(), rollbackThread: vi.fn(), stopAll: vi.fn(), streamEvents: Stream.empty, @@ -40,6 +41,7 @@ const fakeClaudeAdapter: ClaudeAdapterShape = { listSessions: vi.fn(), hasSession: vi.fn(), readThread: vi.fn(), + compactThread: vi.fn(), rollbackThread: vi.fn(), stopAll: vi.fn(), streamEvents: Stream.empty, diff --git a/apps/server/src/provider/Layers/ProviderService.test.ts b/apps/server/src/provider/Layers/ProviderService.test.ts index 56f9f8d65c..4bb1f6dc89 100644 --- a/apps/server/src/provider/Layers/ProviderService.test.ts +++ b/apps/server/src/provider/Layers/ProviderService.test.ts @@ -160,6 +160,11 @@ function makeFakeCodexAdapter(provider: ProviderKind = "codex") { }), ); + const compactThread = vi.fn( + (threadId: ThreadId): Effect.Effect => + Effect.succeed(`Summary for ${String(threadId)}`), + ); + const rollbackThread = vi.fn( ( threadId: ThreadId, @@ -189,6 +194,7 @@ function makeFakeCodexAdapter(provider: ProviderKind = "codex") { listSessions, hasSession, readThread, + compactThread, rollbackThread, stopAll, get streamEvents() { @@ -224,6 +230,7 @@ function makeFakeCodexAdapter(provider: ProviderKind = "codex") { listSessions, hasSession, readThread, + compactThread, rollbackThread, stopAll, }; @@ -694,6 +701,61 @@ routing.layer("ProviderServiceLive routing", (it) => { }), ); + it.effect("routes native thread compaction through the bound adapter", () => + Effect.gen(function* () { + const provider = yield* ProviderService; + + const session = yield* provider.startSession(asThreadId("thread-compact"), { + provider: "codex", + threadId: asThreadId("thread-compact"), + cwd: "/tmp/project-compact", + runtimeMode: "full-access", + }); + + routing.codex.compactThread.mockClear(); + const summary = yield* provider.compactThread({ + threadId: session.threadId, + }); + + assert.equal(summary, `Summary for ${String(session.threadId)}`); + assert.deepEqual(routing.codex.compactThread.mock.calls, [[session.threadId]]); + }), + ); + + it.effect("stops a specific provider session without clearing a different active binding", () => + Effect.gen(function* () { + const provider = yield* ProviderService; + const directory = yield* ProviderSessionDirectory; + + const claudeSession = yield* provider.startSession(asThreadId("thread-switch"), { + provider: "claudeAgent", + threadId: asThreadId("thread-switch"), + cwd: "/tmp/project-switch", + runtimeMode: "full-access", + }); + + yield* routing.codex.startSession({ + provider: "codex", + threadId: claudeSession.threadId, + cwd: "/tmp/project-switch", + runtimeMode: "full-access", + }); + routing.codex.stopSession.mockClear(); + + yield* provider.stopSessionForProvider({ + threadId: claudeSession.threadId, + provider: "codex", + }); + + assert.deepEqual(routing.codex.stopSession.mock.calls, [[claudeSession.threadId]]); + const binding = yield* directory.getBinding(claudeSession.threadId); + assert.equal(Option.isSome(binding), true); + if (Option.isSome(binding)) { + assert.equal(binding.value.provider, "claudeAgent"); + } + }), + ); + it.effect("recovers stale claudeAgent sessions for sendTurn using persisted cwd", () => Effect.gen(function* () { const provider = yield* ProviderService; diff --git a/apps/server/src/provider/Layers/ProviderService.ts b/apps/server/src/provider/Layers/ProviderService.ts index 85fe9fbc32..917c3bcafa 100644 --- a/apps/server/src/provider/Layers/ProviderService.ts +++ b/apps/server/src/provider/Layers/ProviderService.ts @@ -13,6 +13,7 @@ import { ModelSelection, NonNegativeInt, ThreadId, + ProviderKind, ProviderInterruptTurnInput, ProviderRespondToRequestInput, ProviderRespondToUserInputInput, @@ -54,6 +55,13 @@ const ProviderRollbackConversationInput = Schema.Struct({ threadId: ThreadId, numTurns: NonNegativeInt, }); +const ProviderCompactThreadInput = Schema.Struct({ + threadId: ThreadId, +}); +const ProviderStopSessionForProviderInput = Schema.Struct({ + threadId: ThreadId, + provider: ProviderKind, +}); function toValidationError( operation: string, @@ -297,6 +305,23 @@ const makeProviderService = Effect.fn("makeProviderService")(function* ( return { adapter: recovered.adapter, threadId: input.threadId, isActive: true } as const; }); + const stopSessionForProviderInternal = Effect.fn("stopSessionForProviderInternal")( + function* (input: { readonly threadId: ThreadId; readonly provider: ProviderKind }) { + const adapter = yield* registry.getByProvider(input.provider); + const hasSession = yield* adapter.hasSession(input.threadId); + if (hasSession) { + yield* adapter.stopSession(input.threadId); + } + const binding = Option.getOrUndefined(yield* directory.getBinding(input.threadId)); + if (binding?.provider === input.provider) { + yield* directory.remove(input.threadId); + } + yield* analytics.record("provider.session.stopped", { + provider: input.provider, + }); + }, + ); + const startSession: ProviderServiceShape["startSession"] = Effect.fn("startSession")( function* (threadId, rawInput) { const parsed = yield* decodeInputOrValidationError({ @@ -579,11 +604,8 @@ const makeProviderService = Effect.fn("makeProviderService")(function* ( "provider.kind": routed.adapter.provider, "provider.thread_id": input.threadId, }); - if (routed.isActive) { - yield* routed.adapter.stopSession(routed.threadId); - } - yield* directory.remove(input.threadId); - yield* analytics.record("provider.session.stopped", { + yield* stopSessionForProviderInternal({ + threadId: routed.threadId, provider: routed.adapter.provider, }); }).pipe( @@ -598,6 +620,17 @@ const makeProviderService = Effect.fn("makeProviderService")(function* ( }, ); + const stopSessionForProvider: ProviderServiceShape["stopSessionForProvider"] = Effect.fn( + "stopSessionForProvider", + )(function* (rawInput) { + const input = yield* decodeInputOrValidationError({ + operation: "ProviderService.stopSessionForProvider", + schema: ProviderStopSessionForProviderInput, + payload: rawInput, + }); + return yield* stopSessionForProviderInternal(input); + }); + const listSessions: ProviderServiceShape["listSessions"] = Effect.fn("listSessions")( function* () { const sessionsByProvider = yield* Effect.forEach(adapters, (adapter) => @@ -649,6 +682,22 @@ const makeProviderService = Effect.fn("makeProviderService")(function* ( const getCapabilities: ProviderServiceShape["getCapabilities"] = (provider) => registry.getByProvider(provider).pipe(Effect.map((adapter) => adapter.capabilities)); + const compactThread: ProviderServiceShape["compactThread"] = Effect.fn("compactThread")( + function* (rawInput) { + const input = yield* decodeInputOrValidationError({ + operation: "ProviderService.compactThread", + schema: ProviderCompactThreadInput, + payload: rawInput, + }); + const routed = yield* resolveRoutableSession({ + threadId: input.threadId, + operation: "ProviderService.compactThread", + allowRecovery: true, + }); + return yield* routed.adapter.compactThread(routed.threadId); + }, + ); + const rollbackConversation: ProviderServiceShape["rollbackConversation"] = Effect.fn( "rollbackConversation", )(function* (rawInput) { @@ -737,8 +786,10 @@ const makeProviderService = Effect.fn("makeProviderService")(function* ( respondToRequest, respondToUserInput, stopSession, + stopSessionForProvider, listSessions, getCapabilities, + compactThread, rollbackConversation, // Each access creates a fresh PubSub subscription so that multiple // consumers (ProviderRuntimeIngestion, CheckpointReactor, etc.) each diff --git a/apps/server/src/provider/Services/ProviderAdapter.ts b/apps/server/src/provider/Services/ProviderAdapter.ts index 38a05f7574..8aad260699 100644 --- a/apps/server/src/provider/Services/ProviderAdapter.ts +++ b/apps/server/src/provider/Services/ProviderAdapter.ts @@ -106,6 +106,11 @@ export interface ProviderAdapterShape { */ readonly readThread: (threadId: ThreadId) => Effect.Effect; + /** + * Trigger native thread compaction and return the provider's compacted summary when available. + */ + readonly compactThread: (threadId: ThreadId) => Effect.Effect; + /** * Roll back a provider thread by N turns. */ diff --git a/apps/server/src/provider/Services/ProviderService.ts b/apps/server/src/provider/Services/ProviderService.ts index 1e461fcd1c..2969522b3d 100644 --- a/apps/server/src/provider/Services/ProviderService.ts +++ b/apps/server/src/provider/Services/ProviderService.ts @@ -77,6 +77,14 @@ export interface ProviderServiceShape { input: ProviderStopSessionInput, ) => Effect.Effect; + /** + * Stop a specific provider session for a thread without routing through the active binding. + */ + readonly stopSessionForProvider: (input: { + readonly threadId: ThreadId; + readonly provider: ProviderKind; + }) => Effect.Effect; + /** * List active provider sessions. * @@ -91,6 +99,13 @@ export interface ProviderServiceShape { provider: ProviderKind, ) => Effect.Effect; + /** + * Trigger native compaction for the currently bound provider thread. + */ + readonly compactThread: (input: { + readonly threadId: ThreadId; + }) => Effect.Effect; + /** * Roll back provider conversation state by a number of turns. */ diff --git a/apps/server/src/provider/handoffSummary.ts b/apps/server/src/provider/handoffSummary.ts new file mode 100644 index 0000000000..38fe8f4f95 --- /dev/null +++ b/apps/server/src/provider/handoffSummary.ts @@ -0,0 +1,133 @@ +import type { ProviderKind } from "@t3tools/contracts"; + +const COMPACTION_TYPE_SNIPPETS = ["compact", "summary"]; +const PREFERRED_TEXT_KEYS = [ + "compact_summary", + "compactSummary", + "summary", + "text", + "content", + "message", + "body", +] as const; +const IGNORED_OBJECT_KEYS = new Set([ + "id", + "uuid", + "type", + "kind", + "status", + "createdAt", + "updatedAt", + "timestamp", +]); + +function normalizeText(value: string): string { + return value.replace(/\r\n/g, "\n").trim(); +} + +function normalizeType(value: unknown): string { + if (typeof value !== "string") { + return ""; + } + return value + .replace(/([a-z0-9])([A-Z])/g, "$1 $2") + .replace(/[._/-]/g, " ") + .replace(/\s+/g, " ") + .trim() + .toLowerCase(); +} + +function extractStrings(value: unknown, depth = 0): string[] { + if (depth > 5 || value === null || value === undefined) { + return []; + } + if (typeof value === "string") { + const normalized = normalizeText(value); + return normalized.length > 0 ? [normalized] : []; + } + if (Array.isArray(value)) { + return value.flatMap((entry) => extractStrings(entry, depth + 1)); + } + if (typeof value !== "object") { + return []; + } + + const record = value as Record; + const preferred = PREFERRED_TEXT_KEYS.flatMap((key) => extractStrings(record[key], depth + 1)); + if (preferred.length > 0) { + return preferred; + } + + return Object.entries(record).flatMap(([key, entry]) => + IGNORED_OBJECT_KEYS.has(key) ? [] : extractStrings(entry, depth + 1), + ); +} + +function looksLikeCompactionItem(value: unknown): boolean { + if (!value || typeof value !== "object") { + return false; + } + const record = value as Record; + const type = normalizeType(record.type ?? record.kind); + if (COMPACTION_TYPE_SNIPPETS.some((snippet) => type.includes(snippet))) { + return true; + } + return ( + typeof record.compact_summary === "string" || + typeof record.compactSummary === "string" || + typeof record.summary === "string" + ); +} + +export function extractCompactionSummaryFromSnapshot(snapshot: { + readonly turns: ReadonlyArray<{ + readonly items: ReadonlyArray; + }>; +}): string | null { + const segments: string[] = []; + for (const turn of snapshot.turns.toReversed()) { + for (const item of turn.items.toReversed()) { + if (!looksLikeCompactionItem(item)) { + continue; + } + for (const segment of extractStrings(item)) { + if (!segments.includes(segment)) { + segments.push(segment); + } + } + if (segments.length > 0) { + const summary = segments.join("\n\n").trim(); + return summary.length > 0 ? summary : null; + } + } + } + return null; +} + +function providerLabel(provider: ProviderKind): string { + return provider === "claudeAgent" ? "Claude" : "Codex"; +} + +export function buildProviderSwitchHandoffInput(input: { + readonly sourceProvider: ProviderKind; + readonly targetProvider: ProviderKind; + readonly compactSummary: string | null; + readonly userMessage: string; +}): string { + const sections = [ + "", + `The conversation is continuing after a provider switch from ${providerLabel(input.sourceProvider)} to ${providerLabel(input.targetProvider)}.`, + input.compactSummary + ? `Native compaction summary from ${providerLabel(input.sourceProvider)}:\n${normalizeText(input.compactSummary)}` + : `${providerLabel(input.sourceProvider)} compacted the thread before the switch, but did not expose a textual summary.`, + "Carry forward the decisions and unfinished work in that summary when answering the user's next message.", + "", + ]; + + const trimmedUserMessage = input.userMessage.trim(); + if (trimmedUserMessage.length > 0) { + sections.push("", trimmedUserMessage); + } + + return sections.join("\n"); +} diff --git a/apps/web/src/components/ChatView.logic.test.ts b/apps/web/src/components/ChatView.logic.test.ts index 1b96265a40..5e27cd6b6b 100644 --- a/apps/web/src/components/ChatView.logic.test.ts +++ b/apps/web/src/components/ChatView.logic.test.ts @@ -9,6 +9,7 @@ import { buildExpiredTerminalContextToastCopy, createLocalDispatchSnapshot, deriveComposerSendState, + deriveLockedProvider, hasServerAcknowledgedLocalDispatch, reconcileMountedTerminalThreadIds, shouldWriteThreadErrorToCurrentServerThread, @@ -193,6 +194,68 @@ describe("shouldWriteThreadErrorToCurrentServerThread", () => { }); }); +describe("deriveLockedProvider", () => { + it("unlocks provider selection after a started thread is idle", () => { + expect( + deriveLockedProvider({ + thread: { + ...makeThread(), + session: { + provider: "codex", + status: "ready", + createdAt: "2026-03-29T00:00:00.000Z", + updatedAt: "2026-03-29T00:00:10.000Z", + orchestrationStatus: "idle", + }, + }, + threadProvider: "codex", + }), + ).toBeNull(); + }); + + it("keeps provider selection locked while the active session is running", () => { + expect( + deriveLockedProvider({ + thread: { + ...makeThread(), + session: { + provider: "codex", + status: "running", + activeTurnId: TurnId.make("turn-running"), + createdAt: "2026-03-29T00:00:00.000Z", + updatedAt: "2026-03-29T00:00:10.000Z", + orchestrationStatus: "running", + }, + }, + threadProvider: "codex", + }), + ).toBe("codex"); + }); + + it("locks to the target provider once a running handoff updates the thread selection", () => { + expect( + deriveLockedProvider({ + thread: { + ...makeThread(), + modelSelection: { + provider: "claudeAgent", + model: "claude-opus-4-6", + }, + session: { + provider: "codex", + status: "running", + activeTurnId: TurnId.make("turn-running"), + createdAt: "2026-03-29T00:00:00.000Z", + updatedAt: "2026-03-29T00:00:10.000Z", + orchestrationStatus: "running", + }, + }, + threadProvider: "claudeAgent", + }), + ).toBe("claudeAgent"); + }); +}); + const makeThread = (input?: { id?: ThreadId; latestTurn?: { diff --git a/apps/web/src/components/ChatView.logic.ts b/apps/web/src/components/ChatView.logic.ts index a753a71b39..7c39ef1206 100644 --- a/apps/web/src/components/ChatView.logic.ts +++ b/apps/web/src/components/ChatView.logic.ts @@ -228,13 +228,21 @@ export function threadHasStarted(thread: Thread | null | undefined): boolean { export function deriveLockedProvider(input: { thread: Thread | null | undefined; - selectedProvider: ProviderKind | null; threadProvider: ProviderKind | null; }): ProviderKind | null { if (!threadHasStarted(input.thread)) { return null; } - return input.thread?.session?.provider ?? input.threadProvider ?? input.selectedProvider ?? null; + const session = input.thread?.session; + if (!session) { + return null; + } + if (session.status !== "running" && session.status !== "connecting") { + return null; + } + return input.threadProvider && input.threadProvider !== session.provider + ? input.threadProvider + : session.provider; } export async function waitForStartedServerThread( diff --git a/apps/web/src/components/ChatView.tsx b/apps/web/src/components/ChatView.tsx index 44ad594ff9..4411ee1d9e 100644 --- a/apps/web/src/components/ChatView.tsx +++ b/apps/web/src/components/ChatView.tsx @@ -1024,7 +1024,6 @@ export default function ChatView(props: ChatViewProps) { activeThread?.modelSelection.provider ?? activeProject?.defaultModelSelection?.provider ?? null; const lockedProvider = deriveLockedProvider({ thread: activeThread, - selectedProvider: selectedProviderByThreadId, threadProvider, }); const primaryServerConfig = useServerConfig(); diff --git a/apps/web/src/components/chat/MessagesTimeline.test.tsx b/apps/web/src/components/chat/MessagesTimeline.test.tsx index c4bca4b4f0..8affaacfa6 100644 --- a/apps/web/src/components/chat/MessagesTimeline.test.tsx +++ b/apps/web/src/components/chat/MessagesTimeline.test.tsx @@ -148,4 +148,57 @@ describe("MessagesTimeline", () => { expect(markup).toContain("Context compacted"); expect(markup).toContain("Work log"); }); + + it("renders provider handoff entries as transition banners", async () => { + const { MessagesTimeline } = await import("./MessagesTimeline"); + const markup = renderToStaticMarkup( + {}} + changedFilesExpandedByTurnId={{}} + onSetChangedFilesExpanded={() => {}} + onOpenTurnDiff={() => {}} + revertTurnCountByUserMessageId={new Map()} + onRevertUserMessage={() => {}} + isRevertingCheckpoint={false} + onImageExpand={() => {}} + activeThreadEnvironmentId={ACTIVE_THREAD_ENVIRONMENT_ID} + markdownCwd={undefined} + resolvedTheme="light" + timestampFormat="locale" + workspaceRoot={undefined} + />, + ); + + expect(markup).toContain("Handing off thread"); + expect(markup).toContain("Codex to Claude"); + expect(markup).not.toContain("Work log"); + }); }); diff --git a/apps/web/src/components/chat/MessagesTimeline.tsx b/apps/web/src/components/chat/MessagesTimeline.tsx index 085131eaa4..fa8d67e7d4 100644 --- a/apps/web/src/components/chat/MessagesTimeline.tsx +++ b/apps/web/src/components/chat/MessagesTimeline.tsx @@ -20,6 +20,7 @@ import { type TurnDiffSummary } from "../../types"; import { summarizeTurnDiffStats } from "../../lib/turnDiffTree"; import ChatMarkdown from "../ChatMarkdown"; import { + ArrowRightIcon, BotIcon, CheckIcon, CircleAlertIcon, @@ -56,6 +57,7 @@ import { import { cn } from "~/lib/utils"; import { type TimestampFormat } from "@t3tools/contracts/settings"; import { formatTimestamp } from "../../timestampFormat"; +import { PROVIDER_ICON_BY_PROVIDER, providerIconClassName } from "./ProviderModelPicker"; import { buildInlineTerminalContextText, formatInlineTerminalContextLabel, @@ -321,11 +323,20 @@ export const MessagesTimeline = memo(function MessagesTimeline({ : groupedEntries; const hiddenCount = groupedEntries.length - visibleEntries.length; const onlyToolEntries = groupedEntries.every((entry) => entry.tone === "tool"); - const showHeader = hasOverflow || !onlyToolEntries; + const onlyProviderHandoffEntries = groupedEntries.every( + (entry) => entry.providerHandoff !== undefined, + ); + const showHeader = !onlyProviderHandoffEntries && (hasOverflow || !onlyToolEntries); const groupLabel = onlyToolEntries ? "Tool calls" : "Work log"; return ( -
+
{showHeader && (

@@ -344,7 +355,11 @@ export const MessagesTimeline = memo(function MessagesTimeline({ )}

{visibleEntries.map((workEntry) => ( - + ))}
@@ -845,8 +860,17 @@ function toolWorkEntryHeading(workEntry: TimelineWorkEntry): string { const SimpleWorkEntryRow = memo(function SimpleWorkEntryRow(props: { workEntry: TimelineWorkEntry; + standaloneProviderHandoff?: boolean; }) { const { workEntry } = props; + if (workEntry.providerHandoff) { + return ( + + ); + } const iconConfig = workToneIcon(workEntry.tone); const EntryIcon = workEntryIcon(workEntry); const heading = toolWorkEntryHeading(workEntry); @@ -928,3 +952,63 @@ const SimpleWorkEntryRow = memo(function SimpleWorkEntryRow(props: {
); }); + +const ProviderHandoffWorkEntryRow = memo(function ProviderHandoffWorkEntryRow(props: { + workEntry: TimelineWorkEntry; + standalone: boolean; +}) { + const { workEntry } = props; + const providerHandoff = workEntry.providerHandoff; + if (!providerHandoff) { + return null; + } + const { sourceProvider, targetProvider, state } = providerHandoff; + const SourceProviderIcon = PROVIDER_ICON_BY_PROVIDER[sourceProvider]; + const TargetProviderIcon = PROVIDER_ICON_BY_PROVIDER[targetProvider]; + + const pill = ( +
+ + + + {sourceProvider === "codex" ? "Codex" : "Claude"} to{" "} + {targetProvider === "codex" ? "Codex" : "Claude"} + + + {workEntry.label} + + + {state === "compacting" ? ( +
+ ); + + if (!props.standalone) { + return
{pill}
; + } + + return ( +
+ + {pill} + +
+ ); +}); diff --git a/apps/web/src/components/chat/ProviderModelPicker.tsx b/apps/web/src/components/chat/ProviderModelPicker.tsx index 01fa37516e..a709a58453 100644 --- a/apps/web/src/components/chat/ProviderModelPicker.tsx +++ b/apps/web/src/components/chat/ProviderModelPicker.tsx @@ -30,7 +30,7 @@ function isAvailableProviderOption(option: (typeof PROVIDER_OPTIONS)[number]): o return option.available; } -const PROVIDER_ICON_BY_PROVIDER: Record = { +export const PROVIDER_ICON_BY_PROVIDER: Record = { codex: OpenAI, claudeAgent: ClaudeAI, cursor: CursorIcon, @@ -43,7 +43,7 @@ const COMING_SOON_PROVIDER_OPTIONS = [ { id: "gemini", label: "Gemini", icon: Gemini }, ] as const; -function providerIconClassName( +export function providerIconClassName( provider: ProviderKind | ProviderPickerKind, fallbackClassName: string, ): string { diff --git a/apps/web/src/composerDraftStore.test.ts b/apps/web/src/composerDraftStore.test.ts index 2169bbf858..b43f920fef 100644 --- a/apps/web/src/composerDraftStore.test.ts +++ b/apps/web/src/composerDraftStore.test.ts @@ -6,8 +6,10 @@ import { } from "@t3tools/client-runtime"; import * as Schema from "effect/Schema"; import { + DEFAULT_UNIFIED_SETTINGS, EnvironmentId, ProjectId, + type ServerProvider, ThreadId, type ModelSelection, type ProviderModelOptions, @@ -16,6 +18,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { COMPOSER_DRAFT_STORAGE_KEY, + deriveEffectiveComposerModelState, finalizePromotedDraftThreadByRef, markPromotedDraftThread, markPromotedDraftThreadByRef, @@ -109,6 +112,44 @@ function providerModelOptions(options: ProviderModelOptions): ProviderModelOptio const TEST_ENVIRONMENT_ID = EnvironmentId.make("environment-local"); const OTHER_TEST_ENVIRONMENT_ID = EnvironmentId.make("environment-remote"); const LEGACY_TEST_ENVIRONMENT_ID = EnvironmentId.make("__legacy__"); +const TEST_PROVIDERS: ServerProvider[] = [ + { + provider: "codex", + enabled: true, + installed: true, + version: "1.0.0", + status: "ready", + auth: { status: "authenticated" }, + checkedAt: "2026-04-10T00:00:00.000Z", + models: [ + { slug: "gpt-5.4", name: "GPT-5.4", isCustom: false, capabilities: null }, + { slug: "gpt-5.4-mini", name: "GPT-5.4 Mini", isCustom: false, capabilities: null }, + ], + }, + { + provider: "claudeAgent", + enabled: true, + installed: true, + version: "1.0.0", + status: "ready", + auth: { status: "authenticated" }, + checkedAt: "2026-04-10T00:00:00.000Z", + models: [ + { + slug: "claude-opus-4-6", + name: "Claude Opus 4.6", + isCustom: false, + capabilities: null, + }, + { + slug: "claude-sonnet-4-6", + name: "Claude Sonnet 4.6", + isCustom: false, + capabilities: null, + }, + ], + }, +]; function threadKeyFor( threadId: ThreadId, @@ -481,6 +522,28 @@ describe("composerDraftStore terminal contexts", () => { }); }); +describe("deriveEffectiveComposerModelState", () => { + it("keeps the locked provider on its own model while another provider is pending", () => { + const state = deriveEffectiveComposerModelState({ + draft: null, + providers: TEST_PROVIDERS, + selectedProvider: "codex", + threadModelSelection: { + provider: "claudeAgent", + model: "claude-opus-4-6", + }, + projectModelSelection: { + provider: "codex", + model: "gpt-5.4", + }, + settings: DEFAULT_UNIFIED_SETTINGS, + }); + + expect(state.selectedModel).toBe("gpt-5.4"); + expect(state.modelOptions).toBeNull(); + }); +}); + describe("composerDraftStore project draft thread mapping", () => { const projectId = ProjectId.make("project-a"); const otherProjectId = ProjectId.make("project-b"); diff --git a/apps/web/src/composerDraftStore.ts b/apps/web/src/composerDraftStore.ts index 231cf06566..aef0747037 100644 --- a/apps/web/src/composerDraftStore.ts +++ b/apps/web/src/composerDraftStore.ts @@ -747,9 +747,17 @@ export function deriveEffectiveComposerModelState(input: { projectModelSelection: ModelSelection | null | undefined; settings: UnifiedSettings; }): EffectiveComposerModelState { + const threadModelSelectionForProvider = + input.threadModelSelection?.provider === input.selectedProvider + ? input.threadModelSelection + : null; + const projectModelSelectionForProvider = + input.projectModelSelection?.provider === input.selectedProvider + ? input.projectModelSelection + : null; const baseModel = normalizeModelSlug( - input.threadModelSelection?.model ?? input.projectModelSelection?.model, + threadModelSelectionForProvider?.model ?? projectModelSelectionForProvider?.model, input.selectedProvider, ) ?? getDefaultServerModel(input.providers, input.selectedProvider); const activeSelection = input.draft?.modelSelectionByProvider?.[input.selectedProvider]; @@ -763,8 +771,8 @@ export function deriveEffectiveComposerModelState(input: { : baseModel; const modelOptions = modelSelectionByProviderToOptions(input.draft?.modelSelectionByProvider) ?? - providerModelOptionsFromSelection(input.threadModelSelection) ?? - providerModelOptionsFromSelection(input.projectModelSelection) ?? + providerModelOptionsFromSelection(threadModelSelectionForProvider) ?? + providerModelOptionsFromSelection(projectModelSelectionForProvider) ?? null; return { diff --git a/apps/web/src/session-logic.test.ts b/apps/web/src/session-logic.test.ts index d3fd304349..35082d9cca 100644 --- a/apps/web/src/session-logic.test.ts +++ b/apps/web/src/session-logic.test.ts @@ -1136,6 +1136,75 @@ describe("deriveWorkLogEntries context window handling", () => { expect(entries).toHaveLength(1); expect(entries[0]?.label).toBe("Context compacted"); }); + + it("preserves provider handoff metadata for dedicated UI treatment", () => { + const [entry] = deriveWorkLogEntries( + [ + makeActivity({ + id: "provider-handoff-1", + turnId: "turn-1", + kind: "provider.handoff.compacting", + summary: "Handing off thread", + tone: "info", + payload: { + sourceProvider: "codex", + targetProvider: "claudeAgent", + }, + }), + ], + TurnId.make("turn-1"), + ); + + expect(entry).toMatchObject({ + label: "Handing off thread", + providerHandoff: { + sourceProvider: "codex", + targetProvider: "claudeAgent", + state: "compacting", + }, + }); + }); + + it("replaces the compacting handoff entry with the completed entry", () => { + const entries = deriveWorkLogEntries( + [ + makeActivity({ + id: "provider-handoff-1", + turnId: "turn-1", + kind: "provider.handoff.compacting", + summary: "Handing off thread", + tone: "info", + payload: { + sourceProvider: "codex", + targetProvider: "claudeAgent", + }, + }), + makeActivity({ + id: "provider-handoff-2", + turnId: "turn-1", + kind: "provider.handoff.completed", + summary: "Handed off thread", + tone: "info", + payload: { + sourceProvider: "codex", + targetProvider: "claudeAgent", + }, + }), + ], + TurnId.make("turn-1"), + ); + + expect(entries).toHaveLength(1); + expect(entries[0]).toMatchObject({ + id: "provider-handoff-2", + label: "Handed off thread", + providerHandoff: { + sourceProvider: "codex", + targetProvider: "claudeAgent", + state: "completed", + }, + }); + }); }); describe("hasToolActivityForTurn", () => { diff --git a/apps/web/src/session-logic.ts b/apps/web/src/session-logic.ts index 54e84c883a..8c54e074be 100644 --- a/apps/web/src/session-logic.ts +++ b/apps/web/src/session-logic.ts @@ -44,6 +44,11 @@ export interface WorkLogEntry { toolTitle?: string; itemType?: ToolLifecycleItemType; requestKind?: PendingApproval["requestKind"]; + providerHandoff?: { + sourceProvider: ProviderKind; + targetProvider: ProviderKind; + state: "compacting" | "completed"; + }; } interface DerivedWorkLogEntry extends WorkLogEntry { @@ -462,7 +467,9 @@ export function deriveWorkLogEntries( ): WorkLogEntry[] { const ordered = [...activities].toSorted(compareActivitiesByOrder); const entries = ordered - .filter((activity) => (latestTurnId ? activity.turnId === latestTurnId : true)) + .filter((activity) => + latestTurnId ? activity.turnId === latestTurnId || isProviderHandoffActivity(activity) : true, + ) .filter((activity) => activity.kind !== "tool.started") .filter((activity) => activity.kind !== "task.started" && activity.kind !== "task.completed") .filter((activity) => activity.kind !== "context-window.updated") @@ -474,6 +481,13 @@ export function deriveWorkLogEntries( ); } +function isProviderHandoffActivity(activity: OrchestrationThreadActivity): boolean { + return ( + activity.kind === "provider.handoff.compacting" || + activity.kind === "provider.handoff.completed" + ); +} + function isPlanBoundaryToolActivity(activity: OrchestrationThreadActivity): boolean { if (activity.kind !== "tool.updated" && activity.kind !== "tool.completed") { return false; @@ -501,6 +515,7 @@ function toDerivedWorkLogEntry(activity: OrchestrationThreadActivity): DerivedWo tone: activity.tone === "approval" ? "info" : activity.tone, activityKind: activity.kind, }; + const providerHandoff = extractProviderHandoff(activity.kind, payload); const itemType = extractWorkLogItemType(payload); const requestKind = extractWorkLogRequestKind(payload); if (payload && typeof payload.detail === "string" && payload.detail.length > 0) { @@ -527,6 +542,9 @@ function toDerivedWorkLogEntry(activity: OrchestrationThreadActivity): DerivedWo if (requestKind) { entry.requestKind = requestKind; } + if (providerHandoff) { + entry.providerHandoff = providerHandoff; + } const collapseKey = deriveToolLifecycleCollapseKey(entry); if (collapseKey) { entry.collapseKey = collapseKey; @@ -534,12 +552,47 @@ function toDerivedWorkLogEntry(activity: OrchestrationThreadActivity): DerivedWo return entry; } +function extractProviderHandoff( + activityKind: OrchestrationThreadActivity["kind"], + payload: Record | null, +): WorkLogEntry["providerHandoff"] | undefined { + const state = + activityKind === "provider.handoff.compacting" + ? "compacting" + : activityKind === "provider.handoff.completed" + ? "completed" + : null; + if (!state) { + return undefined; + } + const sourceProvider = + payload?.sourceProvider === "codex" || payload?.sourceProvider === "claudeAgent" + ? payload.sourceProvider + : null; + const targetProvider = + payload?.targetProvider === "codex" || payload?.targetProvider === "claudeAgent" + ? payload.targetProvider + : null; + if (!sourceProvider || !targetProvider) { + return undefined; + } + return { + sourceProvider, + targetProvider, + state, + }; +} + function collapseDerivedWorkLogEntries( entries: ReadonlyArray, ): DerivedWorkLogEntry[] { const collapsed: DerivedWorkLogEntry[] = []; for (const entry of entries) { const previous = collapsed.at(-1); + if (previous && shouldCollapseProviderHandoffEntries(previous, entry)) { + collapsed[collapsed.length - 1] = entry; + continue; + } if (previous && shouldCollapseToolLifecycleEntries(previous, entry)) { collapsed[collapsed.length - 1] = mergeDerivedWorkLogEntries(previous, entry); continue; @@ -549,6 +602,24 @@ function collapseDerivedWorkLogEntries( return collapsed; } +function shouldCollapseProviderHandoffEntries( + previous: DerivedWorkLogEntry, + next: DerivedWorkLogEntry, +): boolean { + const previousHandoff = previous.providerHandoff; + const nextHandoff = next.providerHandoff; + if (!previousHandoff || !nextHandoff) { + return false; + } + if (previousHandoff.state !== "compacting") { + return false; + } + return ( + previousHandoff.sourceProvider === nextHandoff.sourceProvider && + previousHandoff.targetProvider === nextHandoff.targetProvider + ); +} + function shouldCollapseToolLifecycleEntries( previous: DerivedWorkLogEntry, next: DerivedWorkLogEntry,