diff --git a/apps/sim/app/api/mothership/chat/route.ts b/apps/sim/app/api/mothership/chat/route.ts index eee1f4c2e0..18b66b5577 100644 --- a/apps/sim/app/api/mothership/chat/route.ts +++ b/apps/sim/app/api/mothership/chat/route.ts @@ -7,7 +7,11 @@ import { z } from 'zod' import { getSession } from '@/lib/auth' import { resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle' import { buildCopilotRequestPayload } from '@/lib/copilot/chat-payload' -import { createSSEStream, SSE_RESPONSE_HEADERS } from '@/lib/copilot/chat-streaming' +import { + createSSEStream, + SSE_RESPONSE_HEADERS, + waitForPendingChatStream, +} from '@/lib/copilot/chat-streaming' import type { OrchestratorResult } from '@/lib/copilot/orchestrator/types' import { processContextsServer, resolveActiveResourceContext } from '@/lib/copilot/process-contents' import { createRequestTracker, createUnauthorizedResponse } from '@/lib/copilot/request-helpers' @@ -244,6 +248,10 @@ export async function POST(req: NextRequest) { { selectedModel: '' } ) + if (actualChatId) { + await waitForPendingChatStream(actualChatId) + } + const stream = createSSEStream({ requestPayload, userId: authenticatedUserId, diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index 2aef0e02c2..299c8f0f85 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -1088,6 +1088,14 @@ export function useChat( }) const stopGeneration = useCallback(async () => { + if (sendingRef.current && !chatIdRef.current) { + const start = Date.now() + while (!chatIdRef.current && sendingRef.current && Date.now() - start < 3000) { + await new Promise((r) => setTimeout(r, 50)) + } + if (!chatIdRef.current) return + } + if (sendingRef.current) { await persistPartialResponse() } diff --git a/apps/sim/lib/copilot/chat-streaming.ts b/apps/sim/lib/copilot/chat-streaming.ts index ea266eb933..2f4e6d1251 100644 --- a/apps/sim/lib/copilot/chat-streaming.ts +++ b/apps/sim/lib/copilot/chat-streaming.ts @@ -20,6 +20,49 @@ const logger = createLogger('CopilotChatStreaming') // reach them. Keyed by streamId, cleaned up when the stream completes. const activeStreams = new Map() +// Tracks in-flight streams by chatId so that a subsequent request for the +// same chat can force-abort the previous stream and wait for it to settle +// before forwarding to Go. +const pendingChatStreams = new Map< + string, + { promise: Promise; resolve: () => void; streamId: string } +>() + +function registerPendingChatStream(chatId: string, streamId: string): void { + if (pendingChatStreams.has(chatId)) { + logger.warn(`registerPendingChatStream: overwriting existing entry for chatId ${chatId}`) + } + let resolve!: () => void + const promise = new Promise((r) => { + resolve = r + }) + pendingChatStreams.set(chatId, { promise, resolve, streamId }) +} + +function resolvePendingChatStream(chatId: string, streamId: string): void { + const entry = pendingChatStreams.get(chatId) + if (entry && entry.streamId === streamId) { + entry.resolve() + pendingChatStreams.delete(chatId) + } +} + +/** + * Abort any in-flight stream on `chatId` and wait for it to fully settle + * (including onComplete and Go-side persistence). Returns immediately if + * no stream is active. Gives up after `timeoutMs`. + */ +export async function waitForPendingChatStream(chatId: string, timeoutMs = 5_000): Promise { + const entry = pendingChatStreams.get(chatId) + if (!entry) return + + // Force-abort the previous stream so we don't passively wait for it to + // finish naturally (which could take tens of seconds for a subagent). + abortActiveStream(entry.streamId) + + await Promise.race([entry.promise, new Promise((r) => setTimeout(r, timeoutMs))]) +} + export function abortActiveStream(streamId: string): boolean { const controller = activeStreams.get(streamId) if (!controller) return false @@ -112,6 +155,10 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS const abortController = new AbortController() activeStreams.set(streamId, abortController) + if (chatId) { + registerPendingChatStream(chatId, streamId) + } + return new ReadableStream({ async start(controller) { const encoder = new TextEncoder() @@ -210,6 +257,9 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS }) } finally { activeStreams.delete(streamId) + if (chatId) { + resolvePendingChatStream(chatId, streamId) + } try { controller.close() } catch { diff --git a/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts b/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts index 7fe32e635e..44157f42a6 100644 --- a/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts +++ b/apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts @@ -610,6 +610,24 @@ export async function executeToolAndReport( } } +function abortAwareSleep(ms: number, abortSignal?: AbortSignal): Promise { + return new Promise((resolve) => { + if (abortSignal?.aborted) { + resolve() + return + } + const timer = setTimeout(resolve, ms) + abortSignal?.addEventListener( + 'abort', + () => { + clearTimeout(timer) + resolve() + }, + { once: true } + ) + }) +} + export async function waitForToolDecision( toolCallId: string, timeoutMs: number, @@ -624,7 +642,7 @@ export async function waitForToolDecision( if (decision?.status) { return decision } - await new Promise((resolve) => setTimeout(resolve, interval)) + await abortAwareSleep(interval, abortSignal) interval = Math.min(interval * TOOL_DECISION_POLL_BACKOFF, maxInterval) } return null @@ -663,7 +681,7 @@ export async function waitForToolCompletion( ) { return decision } - await new Promise((resolve) => setTimeout(resolve, interval)) + await abortAwareSleep(interval, abortSignal) interval = Math.min(interval * TOOL_DECISION_POLL_BACKOFF, maxInterval) } return null