Skip to content

Commit 101fcec

Browse files
authored
fix(mothership): stream management (#3623)
* Fix * Fix * Fix * Fix * Fix lint * Fix
1 parent 1873f2d commit 101fcec

File tree

4 files changed

+87
-3
lines changed

4 files changed

+87
-3
lines changed

apps/sim/app/api/mothership/chat/route.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@ import { z } from 'zod'
77
import { getSession } from '@/lib/auth'
88
import { resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle'
99
import { buildCopilotRequestPayload } from '@/lib/copilot/chat-payload'
10-
import { createSSEStream, SSE_RESPONSE_HEADERS } from '@/lib/copilot/chat-streaming'
10+
import {
11+
createSSEStream,
12+
SSE_RESPONSE_HEADERS,
13+
waitForPendingChatStream,
14+
} from '@/lib/copilot/chat-streaming'
1115
import type { OrchestratorResult } from '@/lib/copilot/orchestrator/types'
1216
import { processContextsServer, resolveActiveResourceContext } from '@/lib/copilot/process-contents'
1317
import { createRequestTracker, createUnauthorizedResponse } from '@/lib/copilot/request-helpers'
@@ -244,6 +248,10 @@ export async function POST(req: NextRequest) {
244248
{ selectedModel: '' }
245249
)
246250

251+
if (actualChatId) {
252+
await waitForPendingChatStream(actualChatId)
253+
}
254+
247255
const stream = createSSEStream({
248256
requestPayload,
249257
userId: authenticatedUserId,

apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1088,6 +1088,14 @@ export function useChat(
10881088
})
10891089

10901090
const stopGeneration = useCallback(async () => {
1091+
if (sendingRef.current && !chatIdRef.current) {
1092+
const start = Date.now()
1093+
while (!chatIdRef.current && sendingRef.current && Date.now() - start < 3000) {
1094+
await new Promise((r) => setTimeout(r, 50))
1095+
}
1096+
if (!chatIdRef.current) return
1097+
}
1098+
10911099
if (sendingRef.current) {
10921100
await persistPartialResponse()
10931101
}

apps/sim/lib/copilot/chat-streaming.ts

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,49 @@ const logger = createLogger('CopilotChatStreaming')
2020
// reach them. Keyed by streamId, cleaned up when the stream completes.
2121
const activeStreams = new Map<string, AbortController>()
2222

23+
// Tracks in-flight streams by chatId so that a subsequent request for the
24+
// same chat can force-abort the previous stream and wait for it to settle
25+
// before forwarding to Go.
26+
const pendingChatStreams = new Map<
27+
string,
28+
{ promise: Promise<void>; resolve: () => void; streamId: string }
29+
>()
30+
31+
function registerPendingChatStream(chatId: string, streamId: string): void {
32+
if (pendingChatStreams.has(chatId)) {
33+
logger.warn(`registerPendingChatStream: overwriting existing entry for chatId ${chatId}`)
34+
}
35+
let resolve!: () => void
36+
const promise = new Promise<void>((r) => {
37+
resolve = r
38+
})
39+
pendingChatStreams.set(chatId, { promise, resolve, streamId })
40+
}
41+
42+
function resolvePendingChatStream(chatId: string, streamId: string): void {
43+
const entry = pendingChatStreams.get(chatId)
44+
if (entry && entry.streamId === streamId) {
45+
entry.resolve()
46+
pendingChatStreams.delete(chatId)
47+
}
48+
}
49+
50+
/**
51+
* Abort any in-flight stream on `chatId` and wait for it to fully settle
52+
* (including onComplete and Go-side persistence). Returns immediately if
53+
* no stream is active. Gives up after `timeoutMs`.
54+
*/
55+
export async function waitForPendingChatStream(chatId: string, timeoutMs = 5_000): Promise<void> {
56+
const entry = pendingChatStreams.get(chatId)
57+
if (!entry) return
58+
59+
// Force-abort the previous stream so we don't passively wait for it to
60+
// finish naturally (which could take tens of seconds for a subagent).
61+
abortActiveStream(entry.streamId)
62+
63+
await Promise.race([entry.promise, new Promise<void>((r) => setTimeout(r, timeoutMs))])
64+
}
65+
2366
export function abortActiveStream(streamId: string): boolean {
2467
const controller = activeStreams.get(streamId)
2568
if (!controller) return false
@@ -112,6 +155,10 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
112155
const abortController = new AbortController()
113156
activeStreams.set(streamId, abortController)
114157

158+
if (chatId) {
159+
registerPendingChatStream(chatId, streamId)
160+
}
161+
115162
return new ReadableStream({
116163
async start(controller) {
117164
const encoder = new TextEncoder()
@@ -210,6 +257,9 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
210257
})
211258
} finally {
212259
activeStreams.delete(streamId)
260+
if (chatId) {
261+
resolvePendingChatStream(chatId, streamId)
262+
}
213263
try {
214264
controller.close()
215265
} catch {

apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -610,6 +610,24 @@ export async function executeToolAndReport(
610610
}
611611
}
612612

613+
function abortAwareSleep(ms: number, abortSignal?: AbortSignal): Promise<void> {
614+
return new Promise<void>((resolve) => {
615+
if (abortSignal?.aborted) {
616+
resolve()
617+
return
618+
}
619+
const timer = setTimeout(resolve, ms)
620+
abortSignal?.addEventListener(
621+
'abort',
622+
() => {
623+
clearTimeout(timer)
624+
resolve()
625+
},
626+
{ once: true }
627+
)
628+
})
629+
}
630+
613631
export async function waitForToolDecision(
614632
toolCallId: string,
615633
timeoutMs: number,
@@ -624,7 +642,7 @@ export async function waitForToolDecision(
624642
if (decision?.status) {
625643
return decision
626644
}
627-
await new Promise((resolve) => setTimeout(resolve, interval))
645+
await abortAwareSleep(interval, abortSignal)
628646
interval = Math.min(interval * TOOL_DECISION_POLL_BACKOFF, maxInterval)
629647
}
630648
return null
@@ -663,7 +681,7 @@ export async function waitForToolCompletion(
663681
) {
664682
return decision
665683
}
666-
await new Promise((resolve) => setTimeout(resolve, interval))
684+
await abortAwareSleep(interval, abortSignal)
667685
interval = Math.min(interval * TOOL_DECISION_POLL_BACKOFF, maxInterval)
668686
}
669687
return null

0 commit comments

Comments
 (0)