Skip to content

Commit 852dc93

Browse files
Sg312icecrasher321
andauthored
fix(mothership): tool durability (#3731)
* Durability * Go check * Fix * add pptxgen setup to dockerfile * Update tools * Fix * Fix aborts and gen viz --------- Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>
1 parent 5e53757 commit 852dc93

File tree

13 files changed

+330
-61
lines changed

13 files changed

+330
-61
lines changed

apps/sim/app/api/workspaces/[id]/pptx/preview/route.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,12 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
2727
return NextResponse.json({ error: 'Insufficient permissions' }, { status: 403 })
2828
}
2929

30-
const body = await req.json()
30+
let body: unknown
31+
try {
32+
body = await req.json()
33+
} catch {
34+
return NextResponse.json({ error: 'Invalid or missing JSON body' }, { status: 400 })
35+
}
3136
const { code } = body as { code?: string }
3237

3338
if (typeof code !== 'string' || code.trim().length === 0) {

apps/sim/app/workspace/[workspaceId]/files/components/file-viewer/file-viewer.tsx

Lines changed: 59 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -546,50 +546,80 @@ function PptxPreview({
546546
const [rendering, setRendering] = useState(false)
547547
const [renderError, setRenderError] = useState<string | null>(null)
548548

549+
// Streaming preview: only re-triggers when the streaming source code or
550+
// workspace changes. Isolated from fileData/dataUpdatedAt so that file-list
551+
// refreshes don't abort the in-flight compilation request.
549552
useEffect(() => {
553+
if (streamingContent === undefined) return
554+
550555
let cancelled = false
551556
const controller = new AbortController()
552-
let debounceTimer: ReturnType<typeof setTimeout> | null = null
553557

554-
async function render() {
558+
const debounceTimer = setTimeout(async () => {
555559
if (cancelled) return
556560
try {
557561
setRendering(true)
558562
setRenderError(null)
559563

560-
if (streamingContent !== undefined) {
561-
const response = await fetch(`/api/workspaces/${workspaceId}/pptx/preview`, {
562-
method: 'POST',
563-
headers: { 'Content-Type': 'application/json' },
564-
body: JSON.stringify({ code: streamingContent }),
565-
signal: controller.signal,
566-
})
567-
if (!response.ok) {
568-
const err = await response.json().catch(() => ({ error: 'Preview failed' }))
569-
throw new Error(err.error || 'Preview failed')
570-
}
571-
if (cancelled) return
572-
const arrayBuffer = await response.arrayBuffer()
573-
if (cancelled) return
574-
const data = new Uint8Array(arrayBuffer)
575-
const images: string[] = []
576-
await renderPptxSlides(
577-
data,
578-
(src) => {
579-
images.push(src)
580-
if (!cancelled) setSlides([...images])
581-
},
582-
() => cancelled
583-
)
584-
return
564+
const response = await fetch(`/api/workspaces/${workspaceId}/pptx/preview`, {
565+
method: 'POST',
566+
headers: { 'Content-Type': 'application/json' },
567+
body: JSON.stringify({ code: streamingContent }),
568+
signal: controller.signal,
569+
})
570+
if (!response.ok) {
571+
const err = await response.json().catch(() => ({ error: 'Preview failed' }))
572+
throw new Error(err.error || 'Preview failed')
573+
}
574+
if (cancelled) return
575+
const arrayBuffer = await response.arrayBuffer()
576+
if (cancelled) return
577+
const data = new Uint8Array(arrayBuffer)
578+
const images: string[] = []
579+
await renderPptxSlides(
580+
data,
581+
(src) => {
582+
images.push(src)
583+
if (!cancelled) setSlides([...images])
584+
},
585+
() => cancelled
586+
)
587+
} catch (err) {
588+
if (!cancelled && !(err instanceof DOMException && err.name === 'AbortError')) {
589+
const msg = err instanceof Error ? err.message : 'Failed to render presentation'
590+
logger.error('PPTX render failed', { error: msg })
591+
setRenderError(msg)
585592
}
593+
} finally {
594+
if (!cancelled) setRendering(false)
595+
}
596+
}, 500)
597+
598+
return () => {
599+
cancelled = true
600+
clearTimeout(debounceTimer)
601+
controller.abort()
602+
}
603+
}, [streamingContent, workspaceId])
604+
605+
// Non-streaming render: uses the fetched binary directly on the client.
606+
// Skipped while streaming is active so it doesn't interfere.
607+
useEffect(() => {
608+
if (streamingContent !== undefined) return
609+
610+
let cancelled = false
586611

612+
async function render() {
613+
if (cancelled) return
614+
try {
587615
if (cached) {
588616
setSlides(cached)
589617
return
590618
}
591619

592620
if (!fileData) return
621+
setRendering(true)
622+
setRenderError(null)
593623
setSlides([])
594624
const data = new Uint8Array(fileData)
595625
const images: string[] = []
@@ -605,7 +635,7 @@ function PptxPreview({
605635
pptxCacheSet(cacheKey, images)
606636
}
607637
} catch (err) {
608-
if (!cancelled && !(err instanceof DOMException && err.name === 'AbortError')) {
638+
if (!cancelled) {
609639
const msg = err instanceof Error ? err.message : 'Failed to render presentation'
610640
logger.error('PPTX render failed', { error: msg })
611641
setRenderError(msg)
@@ -615,18 +645,10 @@ function PptxPreview({
615645
}
616646
}
617647

618-
// Debounce streaming renders so rapid SSE updates don't spawn a subprocess
619-
// per event. Non-streaming renders (file load / cache) run immediately.
620-
if (streamingContent !== undefined) {
621-
debounceTimer = setTimeout(render, 500)
622-
} else {
623-
render()
624-
}
648+
render()
625649

626650
return () => {
627651
cancelled = true
628-
if (debounceTimer) clearTimeout(debounceTimer)
629-
controller.abort()
630652
}
631653
}, [fileData, dataUpdatedAt, streamingContent, cacheKey, workspaceId])
632654

apps/sim/lib/copilot/orchestrator/index.ts

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,16 @@ function didAsyncToolSucceed(input: {
3333
durableError?: string | null
3434
completion?: { status: string } | undefined
3535
toolStateSuccess?: boolean | undefined
36+
toolStateStatus?: string | undefined
3637
}) {
37-
const { durableStatus, durableResult, durableError, completion, toolStateSuccess } = input
38+
const {
39+
durableStatus,
40+
durableResult,
41+
durableError,
42+
completion,
43+
toolStateSuccess,
44+
toolStateStatus,
45+
} = input
3846

3947
if (durableStatus === ASYNC_TOOL_STATUS.completed) {
4048
return true
@@ -50,6 +58,9 @@ function didAsyncToolSucceed(input: {
5058
})
5159
}
5260

61+
if (toolStateStatus === 'success') return true
62+
if (toolStateStatus === 'error' || toolStateStatus === 'cancelled') return false
63+
5364
return completion?.status === 'success' || toolStateSuccess === true
5465
}
5566

@@ -212,17 +223,29 @@ export async function orchestrateCopilotStream(
212223
})
213224
continue
214225
}
226+
const toolState = context.toolCalls.get(toolCallId)
227+
if (!durableRow && !localPendingPromise && toolState) {
228+
logger.info('Including Go-handled tool in resume payload (no Sim-side row)', {
229+
toolCallId,
230+
toolName: toolState.name,
231+
status: toolState.status,
232+
runId: continuation.runId,
233+
})
234+
claimableToolCallIds.push(toolCallId)
235+
continue
236+
}
215237
logger.warn('Skipping already-claimed or missing async tool resume', {
216238
toolCallId,
217239
runId: continuation.runId,
218240
})
219241
}
220242

243+
if (localPendingPromises.length > 0) {
244+
await Promise.allSettled(localPendingPromises)
245+
continue
246+
}
247+
221248
if (claimableToolCallIds.length === 0) {
222-
if (localPendingPromises.length > 0) {
223-
await Promise.allSettled(localPendingPromises)
224-
continue
225-
}
226249
logger.warn('Skipping async resume because no tool calls were claimable', {
227250
checkpointId: continuation.checkpointId,
228251
runId: continuation.runId,
@@ -257,6 +280,7 @@ export async function orchestrateCopilotStream(
257280
durableError: durable?.error,
258281
completion,
259282
toolStateSuccess: toolState?.result?.success,
283+
toolStateStatus: toolState?.status,
260284
})
261285
const data =
262286
durableResult ||

apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.test.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,39 @@ describe('sse-handlers tool lifecycle', () => {
155155
expect(updated?.status).toBe('cancelled')
156156
})
157157

158+
it('does not replace an in-flight pending promise on duplicate tool_call', async () => {
159+
let resolveTool: ((value: { success: boolean; output: { ok: boolean } }) => void) | undefined
160+
executeToolServerSide.mockImplementationOnce(
161+
() =>
162+
new Promise((resolve) => {
163+
resolveTool = resolve
164+
})
165+
)
166+
markToolComplete.mockResolvedValueOnce(true)
167+
168+
const event = {
169+
type: 'tool_call',
170+
data: { id: 'tool-inflight', name: 'read', arguments: { workflowId: 'workflow-1' } },
171+
}
172+
173+
await sseHandlers.tool_call(event as any, context, execContext, { interactive: false })
174+
await new Promise((resolve) => setTimeout(resolve, 0))
175+
176+
const firstPromise = context.pendingToolPromises.get('tool-inflight')
177+
expect(firstPromise).toBeDefined()
178+
179+
await sseHandlers.tool_call(event as any, context, execContext, { interactive: false })
180+
181+
expect(executeToolServerSide).toHaveBeenCalledTimes(1)
182+
expect(context.pendingToolPromises.get('tool-inflight')).toBe(firstPromise)
183+
184+
resolveTool?.({ success: true, output: { ok: true } })
185+
await new Promise((resolve) => setTimeout(resolve, 0))
186+
187+
expect(context.pendingToolPromises.has('tool-inflight')).toBe(false)
188+
expect(markToolComplete).toHaveBeenCalledTimes(1)
189+
})
190+
158191
it('still executes the tool when async row upsert fails', async () => {
159192
upsertAsyncToolCall.mockRejectedValueOnce(new Error('db down'))
160193
executeToolServerSide.mockResolvedValueOnce({ success: true, output: { ok: true } })

apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,19 @@ import { executeToolAndReport, waitForToolCompletion } from './tool-execution'
2424

2525
const logger = createLogger('CopilotSseHandlers')
2626

27+
function registerPendingToolPromise(
28+
context: StreamingContext,
29+
toolCallId: string,
30+
pendingPromise: Promise<{ status: string; message?: string; data?: Record<string, unknown> }>
31+
) {
32+
context.pendingToolPromises.set(toolCallId, pendingPromise)
33+
pendingPromise.finally(() => {
34+
if (context.pendingToolPromises.get(toolCallId) === pendingPromise) {
35+
context.pendingToolPromises.delete(toolCallId)
36+
}
37+
})
38+
}
39+
2740
/**
2841
* When the Sim→Go stream is aborted, avoid starting server-side tool work and
2942
* unblock the Go async waiter with a terminal 499 completion.
@@ -327,6 +340,9 @@ export const sseHandlers: Record<string, SSEHandler> = {
327340

328341
if (isPartial) return
329342
if (wasToolResultSeen(toolCallId)) return
343+
if (context.pendingToolPromises.has(toolCallId) || existing?.status === 'executing') {
344+
return
345+
}
330346

331347
const toolCall = context.toolCalls.get(toolCallId)
332348
if (!toolCall) return
@@ -375,10 +391,7 @@ export const sseHandlers: Record<string, SSEHandler> = {
375391
data: { error: err instanceof Error ? err.message : String(err) },
376392
}
377393
})
378-
context.pendingToolPromises.set(toolCallId, pendingPromise)
379-
pendingPromise.finally(() => {
380-
context.pendingToolPromises.delete(toolCallId)
381-
})
394+
registerPendingToolPromise(context, toolCallId, pendingPromise)
382395
}
383396

384397
if (options.interactive === false) {
@@ -574,6 +587,9 @@ export const subAgentHandlers: Record<string, SSEHandler> = {
574587
}
575588

576589
if (isPartial) return
590+
if (context.pendingToolPromises.has(toolCallId) || existing?.status === 'executing') {
591+
return
592+
}
577593

578594
const { clientExecutable, internal } = getEventUI(event)
579595

@@ -614,10 +630,7 @@ export const subAgentHandlers: Record<string, SSEHandler> = {
614630
data: { error: err instanceof Error ? err.message : String(err) },
615631
}
616632
})
617-
context.pendingToolPromises.set(toolCallId, pendingPromise)
618-
pendingPromise.finally(() => {
619-
context.pendingToolPromises.delete(toolCallId)
620-
})
633+
registerPendingToolPromise(context, toolCallId, pendingPromise)
621634
}
622635

623636
if (options.interactive === false) {

0 commit comments

Comments
 (0)