Skip to content

Commit 7316482

Browse files
TheodoreSpeaksTheodore Li
andauthored
fix(stream) handle task switching (#3609)
* Fix task switching causing stream to abort * Process all task streams all the time * Process task streams that are in the background --------- Co-authored-by: Theodore Li <theo@sim.ai>
1 parent 6d74f7b commit 7316482

File tree

3 files changed

+73
-30
lines changed

3 files changed

+73
-30
lines changed

apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { VFS_DIR_TO_RESOURCE } from '@/lib/copilot/resource-types'
1212
import { isWorkflowToolName } from '@/lib/copilot/workflow-tools'
1313
import { getNextWorkflowColor } from '@/lib/workflows/colors'
1414
import { invalidateResourceQueries } from '@/app/workspace/[workspaceId]/home/components/mothership-view/components/resource-registry'
15+
import { deploymentKeys } from '@/hooks/queries/deployments'
1516
import {
1617
type TaskChatHistory,
1718
type TaskStoredContentBlock,
@@ -21,6 +22,7 @@ import {
2122
taskKeys,
2223
useChatHistory,
2324
} from '@/hooks/queries/tasks'
25+
import { workflowKeys } from '@/hooks/queries/workflows'
2426
import { getTopInsertionSortOrder } from '@/hooks/queries/utils/top-insertion-sort-order'
2527
import { useExecutionStream } from '@/hooks/use-execution-stream'
2628
import { useExecutionStore } from '@/stores/execution/store'
@@ -74,6 +76,8 @@ const STATE_TO_STATUS: Record<string, ToolCallStatus> = {
7476
skipped: 'success',
7577
} as const
7678

79+
const DEPLOY_TOOL_NAMES = new Set(['deploy_api', 'deploy_chat', 'deploy_mcp', 'redeploy'])
80+
7781
function mapStoredBlock(block: TaskStoredContentBlock): ContentBlock {
7882
const mapped: ContentBlock = {
7983
type: block.type as ContentBlockType,
@@ -361,6 +365,15 @@ export function useChat(
361365

362366
useEffect(() => {
363367
if (!chatHistory || appliedChatIdRef.current === chatHistory.id) return
368+
369+
const activeStreamId = chatHistory.activeStreamId
370+
const snapshot = chatHistory.streamSnapshot
371+
372+
if (activeStreamId && !snapshot && !sendingRef.current) {
373+
queryClient.invalidateQueries({ queryKey: taskKeys.detail(chatHistory.id) })
374+
return
375+
}
376+
364377
appliedChatIdRef.current = chatHistory.id
365378
setMessages(chatHistory.messages.map(mapStoredMessage))
366379

@@ -374,11 +387,6 @@ export function useChat(
374387
}
375388
}
376389

377-
// Kick off stream reconnection immediately if there's an active stream.
378-
// The stream snapshot was fetched in parallel with the chat history (same
379-
// API call), so there's no extra round-trip.
380-
const activeStreamId = chatHistory.activeStreamId
381-
const snapshot = chatHistory.streamSnapshot
382390
if (activeStreamId && !sendingRef.current) {
383391
const gen = ++streamGenRef.current
384392
const abortController = new AbortController()
@@ -396,8 +404,7 @@ export function useChat(
396404
const batchEvents = snapshot?.events ?? []
397405
const streamStatus = snapshot?.status ?? ''
398406

399-
if (!snapshot || (batchEvents.length === 0 && streamStatus === 'unknown')) {
400-
// No snapshot available — stream buffer expired. Clean up.
407+
if (batchEvents.length === 0 && streamStatus === 'unknown') {
401408
const cid = chatIdRef.current
402409
if (cid) {
403410
fetch('/api/mothership/chat/stop', {
@@ -462,7 +469,7 @@ export function useChat(
462469
}
463470
reconnect()
464471
}
465-
}, [chatHistory, workspaceId])
472+
}, [chatHistory, workspaceId, queryClient])
466473

467474
useEffect(() => {
468475
if (resources.length === 0) {
@@ -686,6 +693,33 @@ export function useChat(
686693
onResourceEventRef.current?.()
687694
}
688695
}
696+
697+
if (DEPLOY_TOOL_NAMES.has(tc.name) && tc.status === 'success') {
698+
const output = tc.result?.output as Record<string, unknown> | undefined
699+
const deployedWorkflowId = (output?.workflowId as string) ?? undefined
700+
if (deployedWorkflowId && typeof output?.isDeployed === 'boolean') {
701+
const isDeployed = output.isDeployed as boolean
702+
const serverDeployedAt = output.deployedAt
703+
? new Date(output.deployedAt as string)
704+
: undefined
705+
useWorkflowRegistry
706+
.getState()
707+
.setDeploymentStatus(
708+
deployedWorkflowId,
709+
isDeployed,
710+
isDeployed ? (serverDeployedAt ?? new Date()) : undefined
711+
)
712+
queryClient.invalidateQueries({
713+
queryKey: deploymentKeys.info(deployedWorkflowId),
714+
})
715+
queryClient.invalidateQueries({
716+
queryKey: deploymentKeys.versions(deployedWorkflowId),
717+
})
718+
queryClient.invalidateQueries({
719+
queryKey: workflowKeys.list(workspaceId),
720+
})
721+
}
722+
}
689723
}
690724

691725
break
@@ -1116,11 +1150,6 @@ export function useChat(
11161150
useEffect(() => {
11171151
return () => {
11181152
streamGenRef.current++
1119-
// Only drop the browser→Sim read; the Sim→Go stream stays open
1120-
// so the backend can finish persisting. Explicit abort is only
1121-
// triggered by the stop button via /api/copilot/chat/abort.
1122-
abortControllerRef.current?.abort()
1123-
abortControllerRef.current = null
11241153
sendingRef.current = false
11251154
}
11261155
}, [])

apps/sim/lib/copilot/client-sse/handlers.ts

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -567,7 +567,6 @@ export const sseHandlers: Record<string, SSEHandler> = {
567567
}
568568
}
569569

570-
// Deploy tools: update deployment status in workflow registry
571570
if (
572571
targetState === ClientToolCallState.success &&
573572
(current.name === 'deploy_api' ||
@@ -579,21 +578,30 @@ export const sseHandlers: Record<string, SSEHandler> = {
579578
const resultPayload = asRecord(
580579
data?.result || eventData.result || eventData.data || data?.data
581580
)
582-
const input = asRecord(current.params)
583-
const workflowId =
584-
(resultPayload?.workflowId as string) ||
585-
(input?.workflowId as string) ||
586-
useWorkflowRegistry.getState().activeWorkflowId
587-
const isDeployed = resultPayload?.isDeployed !== false
588-
if (workflowId) {
589-
useWorkflowRegistry
590-
.getState()
591-
.setDeploymentStatus(workflowId, isDeployed, isDeployed ? new Date() : undefined)
592-
logger.info('[SSE] Updated deployment status from tool result', {
593-
toolName: current.name,
594-
workflowId,
595-
isDeployed,
596-
})
581+
if (typeof resultPayload?.isDeployed === 'boolean') {
582+
const input = asRecord(current.params)
583+
const workflowId =
584+
(resultPayload?.workflowId as string) ||
585+
(input?.workflowId as string) ||
586+
useWorkflowRegistry.getState().activeWorkflowId
587+
const isDeployed = resultPayload.isDeployed as boolean
588+
const serverDeployedAt = resultPayload.deployedAt
589+
? new Date(resultPayload.deployedAt as string)
590+
: undefined
591+
if (workflowId) {
592+
useWorkflowRegistry
593+
.getState()
594+
.setDeploymentStatus(
595+
workflowId,
596+
isDeployed,
597+
isDeployed ? (serverDeployedAt ?? new Date()) : undefined
598+
)
599+
logger.info('[SSE] Updated deployment status from tool result', {
600+
toolName: current.name,
601+
workflowId,
602+
isDeployed,
603+
})
604+
}
597605
}
598606
} catch (err) {
599607
logger.warn('[SSE] Failed to hydrate deployment status', {

apps/sim/lib/copilot/orchestrator/tool-executor/deployment-tools/deploy.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,10 @@ export async function executeDeployChat(
8787
return { success: false, error: 'Unauthorized chat access' }
8888
}
8989
await db.delete(chat).where(eq(chat.id, existing[0].id))
90-
return { success: true, output: { success: true, action: 'undeploy', isDeployed: false } }
90+
return {
91+
success: true,
92+
output: { workflowId, success: true, action: 'undeploy', isChatDeployed: false },
93+
}
9194
}
9295

9396
const { hasAccess } = await checkWorkflowAccessForChatCreation(workflowId, context.userId)
@@ -199,9 +202,11 @@ export async function executeDeployChat(
199202
return {
200203
success: true,
201204
output: {
205+
workflowId,
202206
success: true,
203207
action: 'deploy',
204208
isDeployed: true,
209+
isChatDeployed: true,
205210
identifier,
206211
chatUrl: `${baseUrl}/chat/${identifier}`,
207212
apiEndpoint: `${baseUrl}/api/workflows/${workflowId}/run`,
@@ -355,6 +360,7 @@ export async function executeRedeploy(
355360
success: true,
356361
output: {
357362
workflowId,
363+
isDeployed: true,
358364
deployedAt: result.deployedAt || null,
359365
version: result.version,
360366
apiEndpoint: `${baseUrl}/api/workflows/${workflowId}/run`,

0 commit comments

Comments
 (0)