Skip to content

Commit c56ea0a

Browse files
TheodoreSpeaksSg312Theodore Li
authored andcommitted
fix(subagent, streaming) fix deploy subagent and task streamnig (#3610)
* Fix deploy subagent * fix(stream) handle task switching (#3609) * Fix task switching causing stream to abort * Process all task streams all the time * Process task streams that are in the background --------- Co-authored-by: Theodore Li <theo@sim.ai> * Always return isDeployed for undeploy chat * Fix lint --------- Co-authored-by: Siddharth Ganesan <siddharthganesan@gmail.com> Co-authored-by: Theodore Li <theo@sim.ai>
1 parent 1157a3a commit c56ea0a

File tree

6 files changed

+191
-33
lines changed

6 files changed

+191
-33
lines changed

apps/sim/app/api/mcp/copilot/route.ts

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import { type NextRequest, NextResponse } from 'next/server'
1919
import { validateOAuthAccessToken } from '@/lib/auth/oauth-token'
2020
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
2121
import { ORCHESTRATION_TIMEOUT_MS, SIM_AGENT_API_URL } from '@/lib/copilot/constants'
22+
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
2223
import { orchestrateSubagentStream } from '@/lib/copilot/orchestrator/subagent'
2324
import {
2425
executeToolServerSide,
@@ -28,6 +29,10 @@ import { DIRECT_TOOL_DEFS, SUBAGENT_TOOL_DEFS } from '@/lib/copilot/tools/mcp/de
2829
import { env } from '@/lib/core/config/env'
2930
import { RateLimiter } from '@/lib/core/rate-limiter'
3031
import { getBaseUrl } from '@/lib/core/utils/urls'
32+
import {
33+
authorizeWorkflowByWorkspacePermission,
34+
resolveWorkflowIdForUser,
35+
} from '@/lib/workflows/utils'
3136

3237
const logger = createLogger('CopilotMcpAPI')
3338
const mcpRateLimiter = new RateLimiter()
@@ -660,12 +665,110 @@ async function handleDirectToolCall(
660665
}
661666
}
662667

668+
/**
669+
* Build mode uses the main chat orchestrator with the 'fast' command instead of
670+
* the subagent endpoint. In Go, 'build' is not a registered subagent — it's a mode
671+
* (ModeFast) on the main chat processor that bypasses subagent orchestration and
672+
* executes all tools directly.
673+
*/
674+
async function handleBuildToolCall(
675+
args: Record<string, unknown>,
676+
userId: string,
677+
abortSignal?: AbortSignal
678+
): Promise<CallToolResult> {
679+
try {
680+
const requestText = (args.request as string) || JSON.stringify(args)
681+
const workflowId = args.workflowId as string | undefined
682+
683+
const resolved = workflowId
684+
? await (async () => {
685+
const authorization = await authorizeWorkflowByWorkspacePermission({
686+
workflowId,
687+
userId,
688+
action: 'read',
689+
})
690+
return authorization.allowed ? { workflowId } : null
691+
})()
692+
: await resolveWorkflowIdForUser(userId)
693+
694+
if (!resolved?.workflowId) {
695+
return {
696+
content: [
697+
{
698+
type: 'text',
699+
text: JSON.stringify(
700+
{
701+
success: false,
702+
error: 'workflowId is required for build. Call create_workflow first.',
703+
},
704+
null,
705+
2
706+
),
707+
},
708+
],
709+
isError: true,
710+
}
711+
}
712+
713+
const chatId = randomUUID()
714+
715+
const requestPayload = {
716+
message: requestText,
717+
workflowId: resolved.workflowId,
718+
userId,
719+
model: DEFAULT_COPILOT_MODEL,
720+
mode: 'agent',
721+
commands: ['fast'],
722+
messageId: randomUUID(),
723+
chatId,
724+
}
725+
726+
const result = await orchestrateCopilotStream(requestPayload, {
727+
userId,
728+
workflowId: resolved.workflowId,
729+
chatId,
730+
goRoute: '/api/mcp',
731+
autoExecuteTools: true,
732+
timeout: 300000,
733+
interactive: false,
734+
abortSignal,
735+
})
736+
737+
const responseData = {
738+
success: result.success,
739+
content: result.content,
740+
toolCalls: result.toolCalls,
741+
error: result.error,
742+
}
743+
744+
return {
745+
content: [{ type: 'text', text: JSON.stringify(responseData, null, 2) }],
746+
isError: !result.success,
747+
}
748+
} catch (error) {
749+
logger.error('Build tool call failed', { error })
750+
return {
751+
content: [
752+
{
753+
type: 'text',
754+
text: `Build failed: ${error instanceof Error ? error.message : String(error)}`,
755+
},
756+
],
757+
isError: true,
758+
}
759+
}
760+
}
761+
663762
async function handleSubagentToolCall(
664763
toolDef: (typeof SUBAGENT_TOOL_DEFS)[number],
665764
args: Record<string, unknown>,
666765
userId: string,
667766
abortSignal?: AbortSignal
668767
): Promise<CallToolResult> {
768+
if (toolDef.agentId === 'build') {
769+
return handleBuildToolCall(args, userId, abortSignal)
770+
}
771+
669772
try {
670773
const requestText =
671774
(args.request as string) ||

apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { VFS_DIR_TO_RESOURCE } from '@/lib/copilot/resource-types'
1212
import { isWorkflowToolName } from '@/lib/copilot/workflow-tools'
1313
import { getNextWorkflowColor } from '@/lib/workflows/colors'
1414
import { invalidateResourceQueries } from '@/app/workspace/[workspaceId]/home/components/mothership-view/components/resource-registry'
15+
import { deploymentKeys } from '@/hooks/queries/deployments'
1516
import {
1617
type TaskChatHistory,
1718
type TaskStoredContentBlock,
@@ -22,6 +23,7 @@ import {
2223
useChatHistory,
2324
} from '@/hooks/queries/tasks'
2425
import { getTopInsertionSortOrder } from '@/hooks/queries/utils/top-insertion-sort-order'
26+
import { workflowKeys } from '@/hooks/queries/workflows'
2527
import { useExecutionStream } from '@/hooks/use-execution-stream'
2628
import { useExecutionStore } from '@/stores/execution/store'
2729
import { useFolderStore } from '@/stores/folders/store'
@@ -74,6 +76,8 @@ const STATE_TO_STATUS: Record<string, ToolCallStatus> = {
7476
skipped: 'success',
7577
} as const
7678

79+
const DEPLOY_TOOL_NAMES = new Set(['deploy_api', 'deploy_chat', 'deploy_mcp', 'redeploy'])
80+
7781
function mapStoredBlock(block: TaskStoredContentBlock): ContentBlock {
7882
const mapped: ContentBlock = {
7983
type: block.type as ContentBlockType,
@@ -361,6 +365,15 @@ export function useChat(
361365

362366
useEffect(() => {
363367
if (!chatHistory || appliedChatIdRef.current === chatHistory.id) return
368+
369+
const activeStreamId = chatHistory.activeStreamId
370+
const snapshot = chatHistory.streamSnapshot
371+
372+
if (activeStreamId && !snapshot && !sendingRef.current) {
373+
queryClient.invalidateQueries({ queryKey: taskKeys.detail(chatHistory.id) })
374+
return
375+
}
376+
364377
appliedChatIdRef.current = chatHistory.id
365378
setMessages(chatHistory.messages.map(mapStoredMessage))
366379

@@ -374,11 +387,6 @@ export function useChat(
374387
}
375388
}
376389

377-
// Kick off stream reconnection immediately if there's an active stream.
378-
// The stream snapshot was fetched in parallel with the chat history (same
379-
// API call), so there's no extra round-trip.
380-
const activeStreamId = chatHistory.activeStreamId
381-
const snapshot = chatHistory.streamSnapshot
382390
if (activeStreamId && !sendingRef.current) {
383391
const gen = ++streamGenRef.current
384392
const abortController = new AbortController()
@@ -396,8 +404,7 @@ export function useChat(
396404
const batchEvents = snapshot?.events ?? []
397405
const streamStatus = snapshot?.status ?? ''
398406

399-
if (!snapshot || (batchEvents.length === 0 && streamStatus === 'unknown')) {
400-
// No snapshot available — stream buffer expired. Clean up.
407+
if (batchEvents.length === 0 && streamStatus === 'unknown') {
401408
const cid = chatIdRef.current
402409
if (cid) {
403410
fetch('/api/mothership/chat/stop', {
@@ -462,7 +469,7 @@ export function useChat(
462469
}
463470
reconnect()
464471
}
465-
}, [chatHistory, workspaceId])
472+
}, [chatHistory, workspaceId, queryClient])
466473

467474
useEffect(() => {
468475
if (resources.length === 0) {
@@ -687,6 +694,33 @@ export function useChat(
687694
onResourceEventRef.current?.()
688695
}
689696
}
697+
698+
if (DEPLOY_TOOL_NAMES.has(tc.name) && tc.status === 'success') {
699+
const output = tc.result?.output as Record<string, unknown> | undefined
700+
const deployedWorkflowId = (output?.workflowId as string) ?? undefined
701+
if (deployedWorkflowId && typeof output?.isDeployed === 'boolean') {
702+
const isDeployed = output.isDeployed as boolean
703+
const serverDeployedAt = output.deployedAt
704+
? new Date(output.deployedAt as string)
705+
: undefined
706+
useWorkflowRegistry
707+
.getState()
708+
.setDeploymentStatus(
709+
deployedWorkflowId,
710+
isDeployed,
711+
isDeployed ? (serverDeployedAt ?? new Date()) : undefined
712+
)
713+
queryClient.invalidateQueries({
714+
queryKey: deploymentKeys.info(deployedWorkflowId),
715+
})
716+
queryClient.invalidateQueries({
717+
queryKey: deploymentKeys.versions(deployedWorkflowId),
718+
})
719+
queryClient.invalidateQueries({
720+
queryKey: workflowKeys.list(workspaceId),
721+
})
722+
}
723+
}
690724
}
691725

692726
break
@@ -1117,11 +1151,6 @@ export function useChat(
11171151
useEffect(() => {
11181152
return () => {
11191153
streamGenRef.current++
1120-
// Only drop the browser→Sim read; the Sim→Go stream stays open
1121-
// so the backend can finish persisting. Explicit abort is only
1122-
// triggered by the stop button via /api/copilot/chat/abort.
1123-
abortControllerRef.current?.abort()
1124-
abortControllerRef.current = null
11251154
sendingRef.current = false
11261155
}
11271156
}, [])

apps/sim/lib/copilot/client-sse/handlers.ts

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -567,7 +567,6 @@ export const sseHandlers: Record<string, SSEHandler> = {
567567
}
568568
}
569569

570-
// Deploy tools: update deployment status in workflow registry
571570
if (
572571
targetState === ClientToolCallState.success &&
573572
(current.name === 'deploy_api' ||
@@ -579,21 +578,30 @@ export const sseHandlers: Record<string, SSEHandler> = {
579578
const resultPayload = asRecord(
580579
data?.result || eventData.result || eventData.data || data?.data
581580
)
582-
const input = asRecord(current.params)
583-
const workflowId =
584-
(resultPayload?.workflowId as string) ||
585-
(input?.workflowId as string) ||
586-
useWorkflowRegistry.getState().activeWorkflowId
587-
const isDeployed = resultPayload?.isDeployed !== false
588-
if (workflowId) {
589-
useWorkflowRegistry
590-
.getState()
591-
.setDeploymentStatus(workflowId, isDeployed, isDeployed ? new Date() : undefined)
592-
logger.info('[SSE] Updated deployment status from tool result', {
593-
toolName: current.name,
594-
workflowId,
595-
isDeployed,
596-
})
581+
if (typeof resultPayload?.isDeployed === 'boolean') {
582+
const input = asRecord(current.params)
583+
const workflowId =
584+
(resultPayload?.workflowId as string) ||
585+
(input?.workflowId as string) ||
586+
useWorkflowRegistry.getState().activeWorkflowId
587+
const isDeployed = resultPayload.isDeployed as boolean
588+
const serverDeployedAt = resultPayload.deployedAt
589+
? new Date(resultPayload.deployedAt as string)
590+
: undefined
591+
if (workflowId) {
592+
useWorkflowRegistry
593+
.getState()
594+
.setDeploymentStatus(
595+
workflowId,
596+
isDeployed,
597+
isDeployed ? (serverDeployedAt ?? new Date()) : undefined
598+
)
599+
logger.info('[SSE] Updated deployment status from tool result', {
600+
toolName: current.name,
601+
workflowId,
602+
isDeployed,
603+
})
604+
}
597605
}
598606
} catch (err) {
599607
logger.warn('[SSE] Failed to hydrate deployment status', {

apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,7 @@ export async function executeToolAndReport(
455455
logger.info('Tool execution succeeded', {
456456
toolCallId: toolCall.id,
457457
toolName: toolCall.name,
458+
output: result.output,
458459
})
459460
} else {
460461
logger.warn('Tool execution failed', {

apps/sim/lib/copilot/orchestrator/tool-executor/deployment-tools/deploy.ts

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,16 @@ export async function executeDeployChat(
8787
return { success: false, error: 'Unauthorized chat access' }
8888
}
8989
await db.delete(chat).where(eq(chat.id, existing[0].id))
90-
return { success: true, output: { success: true, action: 'undeploy', isDeployed: false } }
90+
return {
91+
success: true,
92+
output: {
93+
workflowId,
94+
success: true,
95+
action: 'undeploy',
96+
isDeployed: true,
97+
isChatDeployed: false,
98+
},
99+
}
91100
}
92101

93102
const { hasAccess } = await checkWorkflowAccessForChatCreation(workflowId, context.userId)
@@ -199,9 +208,11 @@ export async function executeDeployChat(
199208
return {
200209
success: true,
201210
output: {
211+
workflowId,
202212
success: true,
203213
action: 'deploy',
204214
isDeployed: true,
215+
isChatDeployed: true,
205216
identifier,
206217
chatUrl: `${baseUrl}/chat/${identifier}`,
207218
apiEndpoint: `${baseUrl}/api/workflows/${workflowId}/run`,
@@ -252,6 +263,8 @@ export async function executeDeployMcp(
252263

253264
mcpPubSub?.publishWorkflowToolsChanged({ serverId, workspaceId })
254265

266+
// Intentionally omits `isDeployed` — removing from an MCP server does not
267+
// affect the workflow's API deployment.
255268
return {
256269
success: true,
257270
output: { workflowId, serverId, action: 'undeploy', removed: true },
@@ -335,9 +348,12 @@ export async function executeDeployMcp(
335348
}
336349
}
337350

338-
export async function executeRedeploy(context: ExecutionContext): Promise<ToolCallResult> {
351+
export async function executeRedeploy(
352+
params: { workflowId?: string },
353+
context: ExecutionContext
354+
): Promise<ToolCallResult> {
339355
try {
340-
const workflowId = context.workflowId
356+
const workflowId = params.workflowId || context.workflowId
341357
if (!workflowId) {
342358
return { success: false, error: 'workflowId is required' }
343359
}
@@ -352,6 +368,7 @@ export async function executeRedeploy(context: ExecutionContext): Promise<ToolCa
352368
success: true,
353369
output: {
354370
workflowId,
371+
isDeployed: true,
355372
deployedAt: result.deployedAt || null,
356373
version: result.version,
357374
apiEndpoint: `${baseUrl}/api/workflows/${workflowId}/run`,

apps/sim/lib/copilot/orchestrator/tool-executor/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -864,7 +864,7 @@ const SIM_WORKFLOW_TOOL_HANDLERS: Record<
864864
deploy_api: (p, c) => executeDeployApi(p as DeployApiParams, c),
865865
deploy_chat: (p, c) => executeDeployChat(p as DeployChatParams, c),
866866
deploy_mcp: (p, c) => executeDeployMcp(p as DeployMcpParams, c),
867-
redeploy: (_p, c) => executeRedeploy(c),
867+
redeploy: (p, c) => executeRedeploy(p as { workflowId?: string }, c),
868868
check_deployment_status: (p, c) =>
869869
executeCheckDeploymentStatus(p as CheckDeploymentStatusParams, c),
870870
list_workspace_mcp_servers: (p, c) =>

0 commit comments

Comments
 (0)