Skip to content

Commit 092525e

Browse files
authored
fix(mothership): abort streamlining (#3734)
* Fixes * Address bugbot * Fixes * Fix * Fixes * Fix lint * Fixes * Fixes * Truncate log
1 parent 8eb45e3 commit 092525e

File tree

20 files changed

+685
-139
lines changed

20 files changed

+685
-139
lines changed

apps/sim/app/api/copilot/chat/abort/route.ts

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
import { NextResponse } from 'next/server'
2-
import { abortActiveStream } from '@/lib/copilot/chat-streaming'
2+
import { getLatestRunForStream } from '@/lib/copilot/async-runs/repository'
3+
import { abortActiveStream, waitForPendingChatStream } from '@/lib/copilot/chat-streaming'
4+
import { SIM_AGENT_API_URL } from '@/lib/copilot/constants'
35
import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request-helpers'
6+
import { env } from '@/lib/core/config/env'
7+
8+
const GO_EXPLICIT_ABORT_TIMEOUT_MS = 3000
49

510
export async function POST(request: Request) {
611
const { userId: authenticatedUserId, isAuthenticated } =
@@ -12,11 +17,48 @@ export async function POST(request: Request) {
1217

1318
const body = await request.json().catch(() => ({}))
1419
const streamId = typeof body.streamId === 'string' ? body.streamId : ''
20+
let chatId = typeof body.chatId === 'string' ? body.chatId : ''
1521

1622
if (!streamId) {
1723
return NextResponse.json({ error: 'streamId is required' }, { status: 400 })
1824
}
1925

20-
const aborted = abortActiveStream(streamId)
26+
if (!chatId) {
27+
const run = await getLatestRunForStream(streamId, authenticatedUserId).catch(() => null)
28+
if (run?.chatId) {
29+
chatId = run.chatId
30+
}
31+
}
32+
33+
try {
34+
const headers: Record<string, string> = { 'Content-Type': 'application/json' }
35+
if (env.COPILOT_API_KEY) {
36+
headers['x-api-key'] = env.COPILOT_API_KEY
37+
}
38+
const controller = new AbortController()
39+
const timeout = setTimeout(() => controller.abort(), GO_EXPLICIT_ABORT_TIMEOUT_MS)
40+
const response = await fetch(`${SIM_AGENT_API_URL}/api/streams/explicit-abort`, {
41+
method: 'POST',
42+
headers,
43+
signal: controller.signal,
44+
body: JSON.stringify({
45+
messageId: streamId,
46+
userId: authenticatedUserId,
47+
...(chatId ? { chatId } : {}),
48+
}),
49+
}).finally(() => clearTimeout(timeout))
50+
if (!response.ok) {
51+
throw new Error(`Explicit abort marker request failed: ${response.status}`)
52+
}
53+
} catch {
54+
// best effort: local abort should still proceed even if Go marker fails
55+
}
56+
57+
const aborted = await abortActiveStream(streamId)
58+
if (chatId) {
59+
await waitForPendingChatStream(chatId, GO_EXPLICIT_ABORT_TIMEOUT_MS + 1000, streamId).catch(
60+
() => false
61+
)
62+
}
2163
return NextResponse.json({ aborted })
2264
}

apps/sim/app/api/copilot/chat/route.ts

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,17 @@ import { getSession } from '@/lib/auth'
88
import { getAccessibleCopilotChat, resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle'
99
import { buildCopilotRequestPayload } from '@/lib/copilot/chat-payload'
1010
import {
11+
acquirePendingChatStream,
1112
createSSEStream,
13+
releasePendingChatStream,
1214
requestChatTitle,
1315
SSE_RESPONSE_HEADERS,
1416
} from '@/lib/copilot/chat-streaming'
1517
import { COPILOT_REQUEST_MODES } from '@/lib/copilot/models'
1618
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
1719
import { getStreamMeta, readStreamEvents } from '@/lib/copilot/orchestrator/stream/buffer'
1820
import type { OrchestratorResult } from '@/lib/copilot/orchestrator/types'
21+
import { resolveActiveResourceContext } from '@/lib/copilot/process-contents'
1922
import {
2023
authenticateCopilotRequestSessionOnly,
2124
createBadRequestResponse,
@@ -44,6 +47,13 @@ const FileAttachmentSchema = z.object({
4447
size: z.number(),
4548
})
4649

50+
const ResourceAttachmentSchema = z.object({
51+
type: z.enum(['workflow', 'table', 'file', 'knowledgebase']),
52+
id: z.string().min(1),
53+
title: z.string().optional(),
54+
active: z.boolean().optional(),
55+
})
56+
4757
const ChatMessageSchema = z.object({
4858
message: z.string().min(1, 'Message is required'),
4959
userMessageId: z.string().optional(),
@@ -58,6 +68,7 @@ const ChatMessageSchema = z.object({
5868
stream: z.boolean().optional().default(true),
5969
implicitFeedback: z.string().optional(),
6070
fileAttachments: z.array(FileAttachmentSchema).optional(),
71+
resourceAttachments: z.array(ResourceAttachmentSchema).optional(),
6172
provider: z.string().optional(),
6273
contexts: z
6374
.array(
@@ -98,6 +109,10 @@ const ChatMessageSchema = z.object({
98109
*/
99110
export async function POST(req: NextRequest) {
100111
const tracker = createRequestTracker()
112+
let actualChatId: string | undefined
113+
let pendingChatStreamAcquired = false
114+
let pendingChatStreamHandedOff = false
115+
let pendingChatStreamID: string | undefined
101116

102117
try {
103118
// Get session to access user information including name
@@ -124,6 +139,7 @@ export async function POST(req: NextRequest) {
124139
stream,
125140
implicitFeedback,
126141
fileAttachments,
142+
resourceAttachments,
127143
provider,
128144
contexts,
129145
commands,
@@ -189,7 +205,7 @@ export async function POST(req: NextRequest) {
189205

190206
let currentChat: any = null
191207
let conversationHistory: any[] = []
192-
let actualChatId = chatId
208+
actualChatId = chatId
193209
const selectedModel = model || 'claude-opus-4-6'
194210

195211
if (chatId || createNewChat) {
@@ -241,6 +257,39 @@ export async function POST(req: NextRequest) {
241257
}
242258
}
243259

260+
if (
261+
Array.isArray(resourceAttachments) &&
262+
resourceAttachments.length > 0 &&
263+
resolvedWorkspaceId
264+
) {
265+
const results = await Promise.allSettled(
266+
resourceAttachments.map(async (r) => {
267+
const ctx = await resolveActiveResourceContext(
268+
r.type,
269+
r.id,
270+
resolvedWorkspaceId!,
271+
authenticatedUserId,
272+
actualChatId
273+
)
274+
if (!ctx) return null
275+
return {
276+
...ctx,
277+
tag: r.active ? '@active_tab' : '@open_tab',
278+
}
279+
})
280+
)
281+
for (const result of results) {
282+
if (result.status === 'fulfilled' && result.value) {
283+
agentContexts.push(result.value)
284+
} else if (result.status === 'rejected') {
285+
logger.error(
286+
`[${tracker.requestId}] Failed to resolve resource attachment`,
287+
result.reason
288+
)
289+
}
290+
}
291+
}
292+
244293
const effectiveMode = mode === 'agent' ? 'build' : mode
245294

246295
const userPermission = resolvedWorkspaceId
@@ -291,6 +340,21 @@ export async function POST(req: NextRequest) {
291340
})
292341
} catch {}
293342

343+
if (stream && actualChatId) {
344+
const acquired = await acquirePendingChatStream(actualChatId, userMessageIdToUse)
345+
if (!acquired) {
346+
return NextResponse.json(
347+
{
348+
error:
349+
'A response is already in progress for this chat. Wait for it to finish or use Stop.',
350+
},
351+
{ status: 409 }
352+
)
353+
}
354+
pendingChatStreamAcquired = true
355+
pendingChatStreamID = userMessageIdToUse
356+
}
357+
294358
if (actualChatId) {
295359
const userMsg = {
296360
id: userMessageIdToUse,
@@ -337,6 +401,7 @@ export async function POST(req: NextRequest) {
337401
titleProvider: provider,
338402
requestId: tracker.requestId,
339403
workspaceId: resolvedWorkspaceId,
404+
pendingChatStreamAlreadyRegistered: Boolean(actualChatId && stream),
340405
orchestrateOptions: {
341406
userId: authenticatedUserId,
342407
workflowId,
@@ -348,6 +413,7 @@ export async function POST(req: NextRequest) {
348413
interactive: true,
349414
onComplete: async (result: OrchestratorResult) => {
350415
if (!actualChatId) return
416+
if (!result.success) return
351417

352418
const assistantMessage: Record<string, unknown> = {
353419
id: crypto.randomUUID(),
@@ -423,6 +489,7 @@ export async function POST(req: NextRequest) {
423489
},
424490
},
425491
})
492+
pendingChatStreamHandedOff = true
426493

427494
return new Response(sseStream, { headers: SSE_RESPONSE_HEADERS })
428495
}
@@ -528,6 +595,14 @@ export async function POST(req: NextRequest) {
528595
},
529596
})
530597
} catch (error) {
598+
if (
599+
actualChatId &&
600+
pendingChatStreamAcquired &&
601+
!pendingChatStreamHandedOff &&
602+
pendingChatStreamID
603+
) {
604+
await releasePendingChatStream(actualChatId, pendingChatStreamID).catch(() => {})
605+
}
531606
const duration = tracker.getDuration()
532607

533608
if (error instanceof z.ZodError) {

apps/sim/app/api/mothership/chat/route.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ import { getSession } from '@/lib/auth'
88
import { resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle'
99
import { buildCopilotRequestPayload } from '@/lib/copilot/chat-payload'
1010
import {
11+
acquirePendingChatStream,
1112
createSSEStream,
1213
SSE_RESPONSE_HEADERS,
13-
waitForPendingChatStream,
1414
} from '@/lib/copilot/chat-streaming'
1515
import type { OrchestratorResult } from '@/lib/copilot/orchestrator/types'
1616
import { processContextsServer, resolveActiveResourceContext } from '@/lib/copilot/process-contents'
@@ -253,7 +253,16 @@ export async function POST(req: NextRequest) {
253253
)
254254

255255
if (actualChatId) {
256-
await waitForPendingChatStream(actualChatId)
256+
const acquired = await acquirePendingChatStream(actualChatId, userMessageId)
257+
if (!acquired) {
258+
return NextResponse.json(
259+
{
260+
error:
261+
'A response is already in progress for this chat. Wait for it to finish or use Stop.',
262+
},
263+
{ status: 409 }
264+
)
265+
}
257266
}
258267

259268
const executionId = crypto.randomUUID()
@@ -271,6 +280,7 @@ export async function POST(req: NextRequest) {
271280
titleModel: 'claude-opus-4-6',
272281
requestId: tracker.requestId,
273282
workspaceId,
283+
pendingChatStreamAlreadyRegistered: Boolean(actualChatId),
274284
orchestrateOptions: {
275285
userId: authenticatedUserId,
276286
workspaceId,
@@ -282,6 +292,7 @@ export async function POST(req: NextRequest) {
282292
interactive: true,
283293
onComplete: async (result: OrchestratorResult) => {
284294
if (!actualChatId) return
295+
if (!result.success) return
285296

286297
const assistantMessage: Record<string, unknown> = {
287298
id: crypto.randomUUID(),

0 commit comments

Comments
 (0)