Skip to content

Commit bed1156

Browse files
committed
Fix
1 parent 2a7b07e commit bed1156

File tree

3 files changed

+71
-3
lines changed

3 files changed

+71
-3
lines changed

apps/sim/app/api/mothership/chat/route.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@ import { z } from 'zod'
77
import { getSession } from '@/lib/auth'
88
import { resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle'
99
import { buildCopilotRequestPayload } from '@/lib/copilot/chat-payload'
10-
import { createSSEStream, SSE_RESPONSE_HEADERS } from '@/lib/copilot/chat-streaming'
10+
import {
11+
createSSEStream,
12+
SSE_RESPONSE_HEADERS,
13+
waitForPendingChatStream,
14+
} from '@/lib/copilot/chat-streaming'
1115
import type { OrchestratorResult } from '@/lib/copilot/orchestrator/types'
1216
import { processContextsServer, resolveActiveResourceContext } from '@/lib/copilot/process-contents'
1317
import { createRequestTracker, createUnauthorizedResponse } from '@/lib/copilot/request-helpers'
@@ -244,6 +248,10 @@ export async function POST(req: NextRequest) {
244248
{ selectedModel: '' }
245249
)
246250

251+
if (actualChatId) {
252+
await waitForPendingChatStream(actualChatId)
253+
}
254+
247255
const stream = createSSEStream({
248256
requestPayload,
249257
userId: authenticatedUserId,

apps/sim/lib/copilot/chat-streaming.ts

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,40 @@ const logger = createLogger('CopilotChatStreaming')
2020
// reach them. Keyed by streamId, cleaned up when the stream completes.
2121
const activeStreams = new Map<string, AbortController>()
2222

23+
// Tracks in-flight streams by chatId so that a subsequent request for the
24+
// same chat can wait until the previous stream (and its onComplete / Go-side
25+
// persistence) has fully settled before forwarding to Go.
26+
const pendingChatStreams = new Map<string, { promise: Promise<void>; resolve: () => void }>()
27+
28+
function registerPendingChatStream(chatId: string): void {
29+
let resolve: () => void
30+
const promise = new Promise<void>((r) => {
31+
resolve = r
32+
})
33+
pendingChatStreams.set(chatId, { promise, resolve: resolve! })
34+
}
35+
36+
function resolvePendingChatStream(chatId: string): void {
37+
const entry = pendingChatStreams.get(chatId)
38+
if (entry) {
39+
entry.resolve()
40+
pendingChatStreams.delete(chatId)
41+
}
42+
}
43+
44+
/**
45+
* Wait for any in-flight stream on `chatId` to finish before proceeding.
46+
* Returns immediately if no stream is active. Gives up after `timeoutMs`.
47+
*/
48+
export async function waitForPendingChatStream(
49+
chatId: string,
50+
timeoutMs = 5_000
51+
): Promise<void> {
52+
const entry = pendingChatStreams.get(chatId)
53+
if (!entry) return
54+
await Promise.race([entry.promise, new Promise<void>((r) => setTimeout(r, timeoutMs))])
55+
}
56+
2357
export function abortActiveStream(streamId: string): boolean {
2458
const controller = activeStreams.get(streamId)
2559
if (!controller) return false
@@ -112,6 +146,10 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
112146
const abortController = new AbortController()
113147
activeStreams.set(streamId, abortController)
114148

149+
if (chatId) {
150+
registerPendingChatStream(chatId)
151+
}
152+
115153
return new ReadableStream({
116154
async start(controller) {
117155
const encoder = new TextEncoder()
@@ -210,6 +248,9 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
210248
})
211249
} finally {
212250
activeStreams.delete(streamId)
251+
if (chatId) {
252+
resolvePendingChatStream(chatId)
253+
}
213254
try {
214255
controller.close()
215256
} catch {
@@ -219,6 +260,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
219260
},
220261
cancel() {
221262
clientDisconnected = true
263+
abortController.abort()
222264
if (eventWriter) {
223265
eventWriter.flush().catch(() => {})
224266
}

apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -610,6 +610,24 @@ export async function executeToolAndReport(
610610
}
611611
}
612612

613+
function abortAwareSleep(ms: number, abortSignal?: AbortSignal): Promise<void> {
614+
return new Promise<void>((resolve) => {
615+
if (abortSignal?.aborted) {
616+
resolve()
617+
return
618+
}
619+
const timer = setTimeout(resolve, ms)
620+
abortSignal?.addEventListener(
621+
'abort',
622+
() => {
623+
clearTimeout(timer)
624+
resolve()
625+
},
626+
{ once: true }
627+
)
628+
})
629+
}
630+
613631
export async function waitForToolDecision(
614632
toolCallId: string,
615633
timeoutMs: number,
@@ -624,7 +642,7 @@ export async function waitForToolDecision(
624642
if (decision?.status) {
625643
return decision
626644
}
627-
await new Promise((resolve) => setTimeout(resolve, interval))
645+
await abortAwareSleep(interval, abortSignal)
628646
interval = Math.min(interval * TOOL_DECISION_POLL_BACKOFF, maxInterval)
629647
}
630648
return null
@@ -663,7 +681,7 @@ export async function waitForToolCompletion(
663681
) {
664682
return decision
665683
}
666-
await new Promise((resolve) => setTimeout(resolve, interval))
684+
await abortAwareSleep(interval, abortSignal)
667685
interval = Math.min(interval * TOOL_DECISION_POLL_BACKOFF, maxInterval)
668686
}
669687
return null

0 commit comments

Comments
 (0)