Skip to content

Commit 7728c64

Browse files
committed
Fix
1 parent bed1156 commit 7728c64

File tree

1 file changed

+17
-8
lines changed

1 file changed

+17
-8
lines changed

apps/sim/lib/copilot/chat-streaming.ts

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,19 @@ const logger = createLogger('CopilotChatStreaming')
2121
const activeStreams = new Map<string, AbortController>()
2222

2323
// Tracks in-flight streams by chatId so that a subsequent request for the
24-
// same chat can wait until the previous stream (and its onComplete / Go-side
25-
// persistence) has fully settled before forwarding to Go.
26-
const pendingChatStreams = new Map<string, { promise: Promise<void>; resolve: () => void }>()
24+
// same chat can force-abort the previous stream and wait for it to settle
25+
// before forwarding to Go.
26+
const pendingChatStreams = new Map<
27+
string,
28+
{ promise: Promise<void>; resolve: () => void; streamId: string }
29+
>()
2730

28-
function registerPendingChatStream(chatId: string): void {
31+
function registerPendingChatStream(chatId: string, streamId: string): void {
2932
let resolve: () => void
3033
const promise = new Promise<void>((r) => {
3134
resolve = r
3235
})
33-
pendingChatStreams.set(chatId, { promise, resolve: resolve! })
36+
pendingChatStreams.set(chatId, { promise, resolve: resolve!, streamId })
3437
}
3538

3639
function resolvePendingChatStream(chatId: string): void {
@@ -42,15 +45,21 @@ function resolvePendingChatStream(chatId: string): void {
4245
}
4346

4447
/**
45-
* Wait for any in-flight stream on `chatId` to finish before proceeding.
46-
* Returns immediately if no stream is active. Gives up after `timeoutMs`.
48+
* Abort any in-flight stream on `chatId` and wait for it to fully settle
49+
* (including onComplete and Go-side persistence). Returns immediately if
50+
* no stream is active. Gives up after `timeoutMs`.
4751
*/
4852
export async function waitForPendingChatStream(
4953
chatId: string,
5054
timeoutMs = 5_000
5155
): Promise<void> {
5256
const entry = pendingChatStreams.get(chatId)
5357
if (!entry) return
58+
59+
// Force-abort the previous stream so we don't passively wait for it to
60+
// finish naturally (which could take tens of seconds for a subagent).
61+
abortActiveStream(entry.streamId)
62+
5463
await Promise.race([entry.promise, new Promise<void>((r) => setTimeout(r, timeoutMs))])
5564
}
5665

@@ -147,7 +156,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
147156
activeStreams.set(streamId, abortController)
148157

149158
if (chatId) {
150-
registerPendingChatStream(chatId)
159+
registerPendingChatStream(chatId, streamId)
151160
}
152161

153162
return new ReadableStream({

0 commit comments

Comments
 (0)