Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion apps/sim/app/api/mothership/chat/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ import { z } from 'zod'
import { getSession } from '@/lib/auth'
import { resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle'
import { buildCopilotRequestPayload } from '@/lib/copilot/chat-payload'
import { createSSEStream, SSE_RESPONSE_HEADERS } from '@/lib/copilot/chat-streaming'
import {
createSSEStream,
SSE_RESPONSE_HEADERS,
waitForPendingChatStream,
} from '@/lib/copilot/chat-streaming'
import type { OrchestratorResult } from '@/lib/copilot/orchestrator/types'
import { processContextsServer, resolveActiveResourceContext } from '@/lib/copilot/process-contents'
import { createRequestTracker, createUnauthorizedResponse } from '@/lib/copilot/request-helpers'
Expand Down Expand Up @@ -244,6 +248,10 @@ export async function POST(req: NextRequest) {
{ selectedModel: '' }
)

if (actualChatId) {
await waitForPendingChatStream(actualChatId)
}

const stream = createSSEStream({
requestPayload,
userId: authenticatedUserId,
Expand Down
8 changes: 8 additions & 0 deletions apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,14 @@ export function useChat(
})

const stopGeneration = useCallback(async () => {
if (sendingRef.current && !chatIdRef.current) {
const start = Date.now()
while (!chatIdRef.current && sendingRef.current && Date.now() - start < 3000) {
await new Promise((r) => setTimeout(r, 50))
}
if (!chatIdRef.current) return
}

if (sendingRef.current) {
await persistPartialResponse()
}
Expand Down
50 changes: 50 additions & 0 deletions apps/sim/lib/copilot/chat-streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,49 @@ const logger = createLogger('CopilotChatStreaming')
// reach them. Keyed by streamId, cleaned up when the stream completes.
const activeStreams = new Map<string, AbortController>()

// Tracks in-flight streams by chatId so that a subsequent request for the
// same chat can force-abort the previous stream and wait for it to settle
// before forwarding to Go.
const pendingChatStreams = new Map<
string,
{ promise: Promise<void>; resolve: () => void; streamId: string }
>()

function registerPendingChatStream(chatId: string, streamId: string): void {
if (pendingChatStreams.has(chatId)) {
logger.warn(`registerPendingChatStream: overwriting existing entry for chatId ${chatId}`)
}
let resolve!: () => void
const promise = new Promise<void>((r) => {
resolve = r
})
pendingChatStreams.set(chatId, { promise, resolve, streamId })
}

function resolvePendingChatStream(chatId: string, streamId: string): void {
const entry = pendingChatStreams.get(chatId)
if (entry && entry.streamId === streamId) {
entry.resolve()
pendingChatStreams.delete(chatId)
}
}

/**
* Abort any in-flight stream on `chatId` and wait for it to fully settle
* (including onComplete and Go-side persistence). Returns immediately if
* no stream is active. Gives up after `timeoutMs`.
*/
export async function waitForPendingChatStream(chatId: string, timeoutMs = 5_000): Promise<void> {
const entry = pendingChatStreams.get(chatId)
if (!entry) return

// Force-abort the previous stream so we don't passively wait for it to
// finish naturally (which could take tens of seconds for a subagent).
abortActiveStream(entry.streamId)

await Promise.race([entry.promise, new Promise<void>((r) => setTimeout(r, timeoutMs))])
}

export function abortActiveStream(streamId: string): boolean {
const controller = activeStreams.get(streamId)
if (!controller) return false
Expand Down Expand Up @@ -112,6 +155,10 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
const abortController = new AbortController()
activeStreams.set(streamId, abortController)

if (chatId) {
registerPendingChatStream(chatId, streamId)
}

return new ReadableStream({
async start(controller) {
const encoder = new TextEncoder()
Expand Down Expand Up @@ -210,6 +257,9 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
})
} finally {
activeStreams.delete(streamId)
if (chatId) {
resolvePendingChatStream(chatId, streamId)
}
try {
controller.close()
} catch {
Expand Down
22 changes: 20 additions & 2 deletions apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,24 @@ export async function executeToolAndReport(
}
}

function abortAwareSleep(ms: number, abortSignal?: AbortSignal): Promise<void> {
return new Promise<void>((resolve) => {
if (abortSignal?.aborted) {
resolve()
return
}
const timer = setTimeout(resolve, ms)
abortSignal?.addEventListener(
'abort',
() => {
clearTimeout(timer)
resolve()
},
{ once: true }
)
})
}

export async function waitForToolDecision(
toolCallId: string,
timeoutMs: number,
Expand All @@ -624,7 +642,7 @@ export async function waitForToolDecision(
if (decision?.status) {
return decision
}
await new Promise((resolve) => setTimeout(resolve, interval))
await abortAwareSleep(interval, abortSignal)
interval = Math.min(interval * TOOL_DECISION_POLL_BACKOFF, maxInterval)
}
return null
Expand Down Expand Up @@ -663,7 +681,7 @@ export async function waitForToolCompletion(
) {
return decision
}
await new Promise((resolve) => setTimeout(resolve, interval))
await abortAwareSleep(interval, abortSignal)
interval = Math.min(interval * TOOL_DECISION_POLL_BACKOFF, maxInterval)
}
return null
Expand Down
Loading