From cdd0f75cd5d8c7ca85bb6c6351957124640b6712 Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan <33737564+Sg312@users.noreply.github.com> Date: Tue, 17 Mar 2026 16:19:47 -0700 Subject: [PATCH 1/6] fix(mothership): fix mothership file uploads (#3640) * Fix files * Fix * Fix --- .../[workspaceId]/home/hooks/use-chat.ts | 2 +- .../contexts/workspace/workspace-file-manager.ts | 16 ++++++++++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index 299c8f0f852..74312caab0b 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -132,7 +132,7 @@ function toDisplayAttachment(f: TaskStoredFileAttachment): ChatMessageAttachment media_type: f.media_type, size: f.size, previewUrl: f.media_type.startsWith('image/') - ? `/api/files/serve/${encodeURIComponent(f.key)}?context=copilot` + ? `/api/files/serve/${encodeURIComponent(f.key)}?context=mothership` : undefined, } } diff --git a/apps/sim/lib/uploads/contexts/workspace/workspace-file-manager.ts b/apps/sim/lib/uploads/contexts/workspace/workspace-file-manager.ts index 9c3dd16ec03..4d18638d3ff 100644 --- a/apps/sim/lib/uploads/contexts/workspace/workspace-file-manager.ts +++ b/apps/sim/lib/uploads/contexts/workspace/workspace-file-manager.ts @@ -209,8 +209,9 @@ export async function uploadWorkspaceFile( /** * Track a file that was already uploaded to workspace S3 as a chat-scoped upload. - * Creates a workspaceFiles record with context='mothership' and the given chatId. - * No S3 operations -- the file is already in storage from the presigned/upload step. + * Links the existing workspaceFiles metadata record (created by the storage service + * during upload) to the chat by setting chatId and context='mothership'. + * Falls back to inserting a new record if none exists for the key. */ export async function trackChatUpload( workspaceId: string, @@ -221,6 +222,17 @@ export async function trackChatUpload( contentType: string, size: number ): Promise { + const updated = await db + .update(workspaceFiles) + .set({ chatId, context: 'mothership' }) + .where(and(eq(workspaceFiles.key, s3Key), eq(workspaceFiles.workspaceId, workspaceId), isNull(workspaceFiles.deletedAt))) + .returning({ id: workspaceFiles.id }) + + if (updated.length > 0) { + logger.info(`Linked existing file record to chat: ${fileName} for chat ${chatId}`) + return + } + const fileId = `wf_${Date.now()}_${Math.random().toString(36).substring(2, 9)}` await db.insert(workspaceFiles).values({ From 75a3e2c3a813390aa3e6086723a78169d0336336 Mon Sep 17 00:00:00 2001 From: Waleed Date: Tue, 17 Mar 2026 16:20:06 -0700 Subject: [PATCH 2/6] fix(workspace): prevent stale placeholder data from corrupting workflow registry on switch --- apps/sim/hooks/queries/workflows.ts | 22 +++++++++++++++++---- apps/sim/hooks/queries/workspace.ts | 11 +++-------- apps/sim/lib/copilot/client-sse/handlers.ts | 4 +++- 3 files changed, 24 insertions(+), 13 deletions(-) diff --git a/apps/sim/hooks/queries/workflows.ts b/apps/sim/hooks/queries/workflows.ts index 97d3df0b2b5..cc9e7b0b696 100644 --- a/apps/sim/hooks/queries/workflows.ts +++ b/apps/sim/hooks/queries/workflows.ts @@ -113,10 +113,15 @@ export function useWorkflows( }) useEffect(() => { - if (syncRegistry && scope === 'active' && workspaceId && query.status === 'pending') { + if ( + syncRegistry && + scope === 'active' && + workspaceId && + (query.status === 'pending' || query.isPlaceholderData) + ) { beginMetadataLoad(workspaceId) } - }, [syncRegistry, scope, workspaceId, query.status, beginMetadataLoad]) + }, [syncRegistry, scope, workspaceId, query.status, query.isPlaceholderData, beginMetadataLoad]) useEffect(() => { if ( @@ -124,11 +129,20 @@ export function useWorkflows( scope === 'active' && workspaceId && query.status === 'success' && - query.data + query.data && + !query.isPlaceholderData ) { completeMetadataLoad(workspaceId, query.data) } - }, [syncRegistry, scope, workspaceId, query.status, query.data, completeMetadataLoad]) + }, [ + syncRegistry, + scope, + workspaceId, + query.status, + query.data, + query.isPlaceholderData, + completeMetadataLoad, + ]) useEffect(() => { if (syncRegistry && scope === 'active' && workspaceId && query.status === 'error') { diff --git a/apps/sim/hooks/queries/workspace.ts b/apps/sim/hooks/queries/workspace.ts index a3eb1bf5e6d..b4722aca3af 100644 --- a/apps/sim/hooks/queries/workspace.ts +++ b/apps/sim/hooks/queries/workspace.ts @@ -86,14 +86,9 @@ export function useCreateWorkspace() { const data = await response.json() return data.workspace as Workspace }, - onSuccess: (data) => { - queryClient.invalidateQueries({ queryKey: workspaceKeys.all }) - if (data?.id) { - queryClient.removeQueries({ queryKey: workspaceKeys.detail(data.id) }) - queryClient.removeQueries({ queryKey: workspaceKeys.settings(data.id) }) - queryClient.removeQueries({ queryKey: workspaceKeys.permissions(data.id) }) - queryClient.removeQueries({ queryKey: workspaceKeys.members(data.id) }) - } + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: workspaceKeys.lists() }) + queryClient.invalidateQueries({ queryKey: workspaceKeys.adminLists() }) }, }) } diff --git a/apps/sim/lib/copilot/client-sse/handlers.ts b/apps/sim/lib/copilot/client-sse/handlers.ts index 0330853d69c..7705e1cf3dd 100644 --- a/apps/sim/lib/copilot/client-sse/handlers.ts +++ b/apps/sim/lib/copilot/client-sse/handlers.ts @@ -3,6 +3,7 @@ import { STREAM_STORAGE_KEY } from '@/lib/copilot/constants' import { asRecord } from '@/lib/copilot/orchestrator/sse/utils' import type { SSEEvent } from '@/lib/copilot/orchestrator/types' import { + abortAllInProgressTools, isBackgroundState, isRejectedState, isReviewState, @@ -956,7 +957,7 @@ export const sseHandlers: Record = { })) context.streamComplete = true }, - stream_end: (_data, context, _get, set) => { + stream_end: (_data, context, get, set) => { if (context.pendingContent) { if (context.isInThinkingBlock && context.currentThinkingBlock) { appendThinkingContent(context, context.pendingContent) @@ -967,6 +968,7 @@ export const sseHandlers: Record = { } finalizeThinkingBlock(context) updateStreamingMessage(set, context) + abortAllInProgressTools(set, get) }, default: () => {}, } From c9f082da1a9802840c2249ddfe5c78ca6d4d5441 Mon Sep 17 00:00:00 2001 From: Waleed Date: Tue, 17 Mar 2026 17:12:34 -0700 Subject: [PATCH 3/6] feat(csp): allow chat UI to be embedded in iframes (#3643) * feat(csp): allow chat UI to be embedded in iframes Mirror the existing form embed CSP pattern for chat pages: add getChatEmbedCSPPolicy() with frame-ancestors *, configure /chat/:path* headers in next.config.ts without X-Frame-Options, and early-return in proxy.ts so chat routes skip the strict runtime CSP. Co-Authored-By: Claude Opus 4.6 * refactor(csp): extract shared getEmbedCSPPolicy helper Deduplicate getChatEmbedCSPPolicy and getFormEmbedCSPPolicy into a shared private helper to prevent future divergence. Co-Authored-By: Claude Opus 4.6 --------- Co-authored-by: Claude Opus 4.6 --- apps/sim/lib/core/security/csp.ts | 21 +++++++++++++++++---- apps/sim/next.config.ts | 25 ++++++++++++++++++++++--- apps/sim/proxy.ts | 7 ++----- 3 files changed, 41 insertions(+), 12 deletions(-) diff --git a/apps/sim/lib/core/security/csp.ts b/apps/sim/lib/core/security/csp.ts index b1148998dd4..c5dde9ef41d 100644 --- a/apps/sim/lib/core/security/csp.ts +++ b/apps/sim/lib/core/security/csp.ts @@ -202,15 +202,28 @@ export function getWorkflowExecutionCSPPolicy(): string { } /** - * CSP for embeddable form pages + * Shared CSP for embeddable pages (chat, forms) * Allows embedding in iframes from any origin while maintaining other security policies */ -export function getFormEmbedCSPPolicy(): string { - const basePolicy = buildCSPString({ +function getEmbedCSPPolicy(): string { + return buildCSPString({ ...buildTimeCSPDirectives, 'frame-ancestors': ['*'], }) - return basePolicy +} + +/** + * CSP for embeddable chat pages + */ +export function getChatEmbedCSPPolicy(): string { + return getEmbedCSPPolicy() +} + +/** + * CSP for embeddable form pages + */ +export function getFormEmbedCSPPolicy(): string { + return getEmbedCSPPolicy() } /** diff --git a/apps/sim/next.config.ts b/apps/sim/next.config.ts index e5975cc6e9a..1b23b9ec93d 100644 --- a/apps/sim/next.config.ts +++ b/apps/sim/next.config.ts @@ -2,6 +2,7 @@ import type { NextConfig } from 'next' import { env, getEnv, isTruthy } from './lib/core/config/env' import { isDev } from './lib/core/config/feature-flags' import { + getChatEmbedCSPPolicy, getFormEmbedCSPPolicy, getMainCSPPolicy, getWorkflowExecutionCSPPolicy, @@ -255,6 +256,24 @@ const nextConfig: NextConfig = { }, ], }, + // Chat pages - allow iframe embedding from any origin + { + source: '/chat/:path*', + headers: [ + { + key: 'X-Content-Type-Options', + value: 'nosniff', + }, + // No X-Frame-Options to allow iframe embedding + { + key: 'Content-Security-Policy', + value: getChatEmbedCSPPolicy(), + }, + // Permissive CORS for chat requests from embedded chats + { key: 'Cross-Origin-Embedder-Policy', value: 'unsafe-none' }, + { key: 'Cross-Origin-Opener-Policy', value: 'unsafe-none' }, + ], + }, // Form pages - allow iframe embedding from any origin { source: '/form/:path*', @@ -284,10 +303,10 @@ const nextConfig: NextConfig = { ], }, // Apply security headers to routes not handled by middleware runtime CSP - // Middleware handles: /, /workspace/*, /chat/* - // Exclude form routes which have their own permissive headers + // Middleware handles: /, /workspace/* + // Exclude chat and form routes which have their own permissive embed headers { - source: '/((?!workspace|chat$|form).*)', + source: '/((?!workspace|chat|form).*)', headers: [ { key: 'X-Content-Type-Options', diff --git a/apps/sim/proxy.ts b/apps/sim/proxy.ts index 36ada3484f1..acd93ebfa12 100644 --- a/apps/sim/proxy.ts +++ b/apps/sim/proxy.ts @@ -155,6 +155,7 @@ export async function proxy(request: NextRequest) { return response } + // Chat pages are publicly accessible embeds — CSP is set in next.config.ts headers if (url.pathname.startsWith('/chat/')) { return NextResponse.next() } @@ -188,11 +189,7 @@ export async function proxy(request: NextRequest) { const response = NextResponse.next() response.headers.set('Vary', 'User-Agent') - if ( - url.pathname.startsWith('/workspace') || - url.pathname.startsWith('/chat') || - url.pathname === '/' - ) { + if (url.pathname.startsWith('/workspace') || url.pathname === '/') { response.headers.set('Content-Security-Policy', generateRuntimeCSP()) } From 67478bbc80382b636f4221d2f5dd754827465105 Mon Sep 17 00:00:00 2001 From: PlaneInABottle Date: Wed, 18 Mar 2026 03:24:40 +0300 Subject: [PATCH 4/6] fix(logs): add durable execution diagnostics foundation (#3564) * fix(logs): persist execution diagnostics markers Store last-started and last-completed block markers with finalization metadata so later read surfaces can explain how a run ended without reconstructing executor state. * fix(executor): preserve durable diagnostics ordering Await only the persistence needed to keep diagnostics durable before terminal completion while keeping callback failures from changing execution behavior. * fix(logs): preserve fallback diagnostics semantics Keep successful fallback output and accumulated cost intact while tightening progress-write draining and deduplicating trace span counting for diagnostics helpers. * fix(api): restore async execute route test mock Add the missing AuthType export to the hybrid auth mock so the async execution route test exercises the 202 queueing path instead of crashing with a 500 in CI. * fix(executor): align async block error handling * fix(logs): tighten marker ordering scope Allow same-millisecond marker writes to replace prior markers and drop the unused diagnostics read helper so this PR stays focused on persistence rather than unread foundation code. * fix(logs): remove unused finalization type guard Drop the unused helper so this PR only ships the persistence-side status types it actually uses. * fix(executor): await subflow diagnostics callbacks Ensure empty-subflow and subflow-error lifecycle callbacks participate in progress-write draining before terminal finalization while still swallowing callback failures. --------- Co-authored-by: test Co-authored-by: Vikhyath Mondreti --- apps/sim/executor/execution/block-executor.ts | 86 +++--- apps/sim/executor/orchestrators/loop.ts | 65 +++-- apps/sim/executor/orchestrators/node.ts | 24 +- .../executor/orchestrators/parallel.test.ts | 142 +++++++++ apps/sim/executor/orchestrators/parallel.ts | 54 ++-- apps/sim/executor/utils/subflow-utils.ts | 89 ++++-- apps/sim/lib/logs/execution/logger.test.ts | 25 +- apps/sim/lib/logs/execution/logger.ts | 57 +++- .../logs/execution/logging-session.test.ts | 271 +++++++++++++++++- .../sim/lib/logs/execution/logging-session.ts | 265 +++++++++++++++-- apps/sim/lib/logs/types.ts | 35 +++ .../workflows/executor/execution-core.test.ts | 192 ++++++++++++- .../lib/workflows/executor/execution-core.ts | 119 ++++++-- 13 files changed, 1217 insertions(+), 207 deletions(-) create mode 100644 apps/sim/executor/orchestrators/parallel.test.ts diff --git a/apps/sim/executor/execution/block-executor.ts b/apps/sim/executor/execution/block-executor.ts index 9c54d2bd993..5680ee4e3cf 100644 --- a/apps/sim/executor/execution/block-executor.ts +++ b/apps/sim/executor/execution/block-executor.ts @@ -77,7 +77,7 @@ export class BlockExecutor { if (!isSentinel) { blockLog = this.createBlockLog(ctx, node.id, block, node) ctx.blockLogs.push(blockLog) - this.callOnBlockStart(ctx, node, block, blockLog.executionOrder) + await this.callOnBlockStart(ctx, node, block, blockLog.executionOrder) } const startTime = performance.now() @@ -105,7 +105,7 @@ export class BlockExecutor { } } catch (error) { cleanupSelfReference?.() - return this.handleBlockError( + return await this.handleBlockError( error, ctx, node, @@ -179,7 +179,7 @@ export class BlockExecutor { const displayOutput = filterOutputForLog(block.metadata?.id || '', normalizedOutput, { block, }) - this.callOnBlockComplete( + await this.callOnBlockComplete( ctx, node, block, @@ -195,7 +195,7 @@ export class BlockExecutor { return normalizedOutput } catch (error) { - return this.handleBlockError( + return await this.handleBlockError( error, ctx, node, @@ -226,7 +226,7 @@ export class BlockExecutor { return this.blockHandlers.find((h) => h.canHandle(block)) } - private handleBlockError( + private async handleBlockError( error: unknown, ctx: ExecutionContext, node: DAGNode, @@ -236,7 +236,7 @@ export class BlockExecutor { resolvedInputs: Record, isSentinel: boolean, phase: 'input_resolution' | 'execution' - ): NormalizedBlockOutput { + ): Promise { const duration = performance.now() - startTime const errorMessage = normalizeError(error) const hasResolvedInputs = @@ -287,7 +287,7 @@ export class BlockExecutor { ? error.childWorkflowInstanceId : undefined const displayOutput = filterOutputForLog(block.metadata?.id || '', errorOutput, { block }) - this.callOnBlockComplete( + await this.callOnBlockComplete( ctx, node, block, @@ -439,12 +439,12 @@ export class BlockExecutor { return redactApiKeys(result) } - private callOnBlockStart( + private async callOnBlockStart( ctx: ExecutionContext, node: DAGNode, block: SerializedBlock, executionOrder: number - ): void { + ): Promise { const blockId = node.metadata?.originalBlockId ?? node.id const blockName = block.metadata?.name ?? blockId const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE @@ -452,18 +452,26 @@ export class BlockExecutor { const iterationContext = getIterationContext(ctx, node?.metadata) if (this.contextExtensions.onBlockStart) { - this.contextExtensions.onBlockStart( - blockId, - blockName, - blockType, - executionOrder, - iterationContext, - ctx.childWorkflowContext - ) + try { + await this.contextExtensions.onBlockStart( + blockId, + blockName, + blockType, + executionOrder, + iterationContext, + ctx.childWorkflowContext + ) + } catch (error) { + logger.warn('Block start callback failed', { + blockId, + blockType, + error: error instanceof Error ? error.message : String(error), + }) + } } } - private callOnBlockComplete( + private async callOnBlockComplete( ctx: ExecutionContext, node: DAGNode, block: SerializedBlock, @@ -474,7 +482,7 @@ export class BlockExecutor { executionOrder: number, endedAt: string, childWorkflowInstanceId?: string - ): void { + ): Promise { const blockId = node.metadata?.originalBlockId ?? node.id const blockName = block.metadata?.name ?? blockId const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE @@ -482,22 +490,30 @@ export class BlockExecutor { const iterationContext = getIterationContext(ctx, node?.metadata) if (this.contextExtensions.onBlockComplete) { - this.contextExtensions.onBlockComplete( - blockId, - blockName, - blockType, - { - input, - output, - executionTime: duration, - startedAt, - executionOrder, - endedAt, - childWorkflowInstanceId, - }, - iterationContext, - ctx.childWorkflowContext - ) + try { + await this.contextExtensions.onBlockComplete( + blockId, + blockName, + blockType, + { + input, + output, + executionTime: duration, + startedAt, + executionOrder, + endedAt, + childWorkflowInstanceId, + }, + iterationContext, + ctx.childWorkflowContext + ) + } catch (error) { + logger.warn('Block completion callback failed', { + blockId, + blockType, + error: error instanceof Error ? error.message : String(error), + }) + } } } diff --git a/apps/sim/executor/orchestrators/loop.ts b/apps/sim/executor/orchestrators/loop.ts index 039203c068a..0ea42a137e8 100644 --- a/apps/sim/executor/orchestrators/loop.ts +++ b/apps/sim/executor/orchestrators/loop.ts @@ -51,7 +51,7 @@ export class LoopOrchestrator { private edgeManager: EdgeManager | null = null ) {} - initializeLoopScope(ctx: ExecutionContext, loopId: string): LoopScope { + async initializeLoopScope(ctx: ExecutionContext, loopId: string): Promise { const loopConfig = this.dag.loopConfigs.get(loopId) as SerializedLoop | undefined if (!loopConfig) { throw new Error(`Loop config not found: ${loopId}`) @@ -76,7 +76,7 @@ export class LoopOrchestrator { ) if (iterationError) { logger.error(iterationError, { loopId, requestedIterations }) - this.addLoopErrorLog(ctx, loopId, loopType, iterationError, { + await this.addLoopErrorLog(ctx, loopId, loopType, iterationError, { iterations: requestedIterations, }) scope.maxIterations = 0 @@ -99,7 +99,7 @@ export class LoopOrchestrator { } catch (error) { const errorMessage = `ForEach loop resolution failed: ${error instanceof Error ? error.message : String(error)}` logger.error(errorMessage, { loopId, forEachItems: loopConfig.forEachItems }) - this.addLoopErrorLog(ctx, loopId, loopType, errorMessage, { + await this.addLoopErrorLog(ctx, loopId, loopType, errorMessage, { forEachItems: loopConfig.forEachItems, }) scope.items = [] @@ -117,7 +117,7 @@ export class LoopOrchestrator { ) if (sizeError) { logger.error(sizeError, { loopId, collectionSize: items.length }) - this.addLoopErrorLog(ctx, loopId, loopType, sizeError, { + await this.addLoopErrorLog(ctx, loopId, loopType, sizeError, { forEachItems: loopConfig.forEachItems, collectionSize: items.length, }) @@ -155,7 +155,7 @@ export class LoopOrchestrator { ) if (iterationError) { logger.error(iterationError, { loopId, requestedIterations }) - this.addLoopErrorLog(ctx, loopId, loopType, iterationError, { + await this.addLoopErrorLog(ctx, loopId, loopType, iterationError, { iterations: requestedIterations, }) scope.maxIterations = 0 @@ -182,14 +182,14 @@ export class LoopOrchestrator { return scope } - private addLoopErrorLog( + private async addLoopErrorLog( ctx: ExecutionContext, loopId: string, loopType: string, errorMessage: string, inputData?: any - ): void { - addSubflowErrorLog( + ): Promise { + await addSubflowErrorLog( ctx, loopId, 'loop', @@ -238,7 +238,7 @@ export class LoopOrchestrator { } if (isCancelled) { logger.info('Loop execution cancelled', { loopId, iteration: scope.iteration }) - return this.createExitResult(ctx, loopId, scope) + return await this.createExitResult(ctx, loopId, scope) } const iterationResults: NormalizedBlockOutput[] = [] @@ -253,7 +253,7 @@ export class LoopOrchestrator { scope.currentIterationOutputs.clear() if (!(await this.evaluateCondition(ctx, scope, scope.iteration + 1))) { - return this.createExitResult(ctx, loopId, scope) + return await this.createExitResult(ctx, loopId, scope) } scope.iteration++ @@ -269,11 +269,11 @@ export class LoopOrchestrator { } } - private createExitResult( + private async createExitResult( ctx: ExecutionContext, loopId: string, scope: LoopScope - ): LoopContinuationResult { + ): Promise { const results = scope.allIterationOutputs const output = { results } this.state.setBlockOutput(loopId, output, DEFAULTS.EXECUTION_TIME) @@ -282,19 +282,26 @@ export class LoopOrchestrator { const now = new Date().toISOString() const iterationContext = buildContainerIterationContext(ctx, loopId) - this.contextExtensions.onBlockComplete( - loopId, - 'Loop', - 'loop', - { - output, - executionTime: DEFAULTS.EXECUTION_TIME, - startedAt: now, - executionOrder: getNextExecutionOrder(ctx), - endedAt: now, - }, - iterationContext - ) + try { + await this.contextExtensions.onBlockComplete( + loopId, + 'Loop', + 'loop', + { + output, + executionTime: DEFAULTS.EXECUTION_TIME, + startedAt: now, + executionOrder: getNextExecutionOrder(ctx), + endedAt: now, + }, + iterationContext + ) + } catch (error) { + logger.warn('Loop completion callback failed', { + loopId, + error: error instanceof Error ? error.message : String(error), + }) + } } return { @@ -597,7 +604,7 @@ export class LoopOrchestrator { if (!scope.items || scope.items.length === 0) { logger.info('ForEach loop has empty collection, skipping loop body', { loopId }) this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME) - emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions) + await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions) return false } return true @@ -607,7 +614,7 @@ export class LoopOrchestrator { if (scope.maxIterations === 0) { logger.info('For loop has 0 iterations, skipping loop body', { loopId }) this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME) - emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions) + await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions) return false } return true @@ -621,7 +628,7 @@ export class LoopOrchestrator { if (!scope.condition) { logger.warn('No condition defined for while loop', { loopId }) this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME) - emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions) + await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions) return false } @@ -634,7 +641,7 @@ export class LoopOrchestrator { if (!result) { this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME) - emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions) + await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions) } return result diff --git a/apps/sim/executor/orchestrators/node.ts b/apps/sim/executor/orchestrators/node.ts index 862f7c1a2e9..7ae57a555c9 100644 --- a/apps/sim/executor/orchestrators/node.ts +++ b/apps/sim/executor/orchestrators/node.ts @@ -53,14 +53,14 @@ export class NodeExecutionOrchestrator { const loopId = node.metadata.loopId if (loopId && !this.loopOrchestrator.getLoopScope(ctx, loopId)) { - this.loopOrchestrator.initializeLoopScope(ctx, loopId) + await this.loopOrchestrator.initializeLoopScope(ctx, loopId) } const parallelId = node.metadata.parallelId if (parallelId && !this.parallelOrchestrator.getParallelScope(ctx, parallelId)) { const parallelConfig = this.dag.parallelConfigs.get(parallelId) const nodesInParallel = parallelConfig?.nodes?.length || 1 - this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel) + await this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel) } if (node.metadata.isSentinel) { @@ -92,7 +92,7 @@ export class NodeExecutionOrchestrator { const isParallelSentinel = node.metadata.isParallelSentinel if (isParallelSentinel) { - return this.handleParallelSentinel(ctx, node, sentinelType, parallelId) + return await this.handleParallelSentinel(ctx, node, sentinelType, parallelId) } switch (sentinelType) { @@ -142,12 +142,12 @@ export class NodeExecutionOrchestrator { } } - private handleParallelSentinel( + private async handleParallelSentinel( ctx: ExecutionContext, node: DAGNode, sentinelType: string | undefined, parallelId: string | undefined - ): NormalizedBlockOutput { + ): Promise { if (!parallelId) { logger.warn('Parallel sentinel called without parallelId') return {} @@ -158,7 +158,7 @@ export class NodeExecutionOrchestrator { const parallelConfig = this.dag.parallelConfigs.get(parallelId) if (parallelConfig) { const nodesInParallel = parallelConfig.nodes?.length || 1 - this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel) + await this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel) } } @@ -176,7 +176,7 @@ export class NodeExecutionOrchestrator { } if (sentinelType === 'end') { - const result = this.parallelOrchestrator.aggregateParallelResults(ctx, parallelId) + const result = await this.parallelOrchestrator.aggregateParallelResults(ctx, parallelId) return { results: result.results || [], sentinelEnd: true, @@ -210,7 +210,7 @@ export class NodeExecutionOrchestrator { } else if (isParallelBranch) { const parallelId = this.findParallelIdForNode(node.id) if (parallelId) { - this.handleParallelNodeCompletion(ctx, node, output, parallelId) + await this.handleParallelNodeCompletion(ctx, node, output, parallelId) } else { this.handleRegularNodeCompletion(ctx, node, output) } @@ -229,17 +229,17 @@ export class NodeExecutionOrchestrator { this.state.setBlockOutput(node.id, output) } - private handleParallelNodeCompletion( + private async handleParallelNodeCompletion( ctx: ExecutionContext, node: DAGNode, output: NormalizedBlockOutput, parallelId: string - ): void { + ): Promise { const scope = this.parallelOrchestrator.getParallelScope(ctx, parallelId) if (!scope) { const parallelConfig = this.dag.parallelConfigs.get(parallelId) const nodesInParallel = parallelConfig?.nodes?.length || 1 - this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel) + await this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel) } const allComplete = this.parallelOrchestrator.handleParallelBranchCompletion( ctx, @@ -248,7 +248,7 @@ export class NodeExecutionOrchestrator { output ) if (allComplete) { - this.parallelOrchestrator.aggregateParallelResults(ctx, parallelId) + await this.parallelOrchestrator.aggregateParallelResults(ctx, parallelId) } this.state.setBlockOutput(node.id, output) diff --git a/apps/sim/executor/orchestrators/parallel.test.ts b/apps/sim/executor/orchestrators/parallel.test.ts new file mode 100644 index 00000000000..34b2c011f9e --- /dev/null +++ b/apps/sim/executor/orchestrators/parallel.test.ts @@ -0,0 +1,142 @@ +/** + * @vitest-environment node + */ +import { beforeEach, describe, expect, it, vi } from 'vitest' +import type { DAG } from '@/executor/dag/builder' +import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types' +import { ParallelOrchestrator } from '@/executor/orchestrators/parallel' +import type { ExecutionContext } from '@/executor/types' + +vi.mock('@sim/logger', () => ({ + createLogger: () => ({ + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }), +})) + +function createDag(): DAG { + return { + nodes: new Map(), + loopConfigs: new Map(), + parallelConfigs: new Map([ + [ + 'parallel-1', + { + id: 'parallel-1', + nodes: ['task-1'], + distribution: [], + parallelType: 'collection', + }, + ], + ]), + } +} + +function createState(): BlockStateWriter { + return { + setBlockOutput: vi.fn(), + setBlockState: vi.fn(), + deleteBlockState: vi.fn(), + unmarkExecuted: vi.fn(), + } +} + +function createContext(overrides: Partial = {}): ExecutionContext { + return { + workflowId: 'workflow-1', + workspaceId: 'workspace-1', + executionId: 'execution-1', + userId: 'user-1', + blockStates: new Map(), + executedBlocks: new Set(), + blockLogs: [], + metadata: { duration: 0 }, + environmentVariables: {}, + decisions: { + router: new Map(), + condition: new Map(), + }, + completedLoops: new Set(), + activeExecutionPath: new Set(), + workflow: { + version: '1', + blocks: [ + { + id: 'parallel-1', + position: { x: 0, y: 0 }, + config: { tool: '', params: {} }, + inputs: {}, + outputs: {}, + metadata: { id: 'parallel', name: 'Parallel 1' }, + enabled: true, + }, + ], + connections: [], + loops: {}, + parallels: {}, + }, + ...overrides, + } +} + +describe('ParallelOrchestrator', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + it('awaits empty-subflow lifecycle callbacks before returning the empty scope', async () => { + let releaseStart: (() => void) | undefined + const onBlockStart = vi.fn( + () => + new Promise((resolve) => { + releaseStart = resolve + }) + ) + const onBlockComplete = vi.fn() + const contextExtensions: ContextExtensions = { + onBlockStart, + onBlockComplete, + } + const orchestrator = new ParallelOrchestrator( + createDag(), + createState(), + null, + contextExtensions + ) + const ctx = createContext() + + const initializePromise = orchestrator.initializeParallelScope(ctx, 'parallel-1', 1) + await Promise.resolve() + + expect(onBlockStart).toHaveBeenCalledTimes(1) + expect(onBlockComplete).not.toHaveBeenCalled() + + releaseStart?.() + const scope = await initializePromise + + expect(onBlockComplete).toHaveBeenCalledTimes(1) + expect(scope.isEmpty).toBe(true) + }) + + it('swallows helper callback failures on empty parallel paths', async () => { + const contextExtensions: ContextExtensions = { + onBlockStart: vi.fn().mockRejectedValue(new Error('start failed')), + onBlockComplete: vi.fn().mockRejectedValue(new Error('complete failed')), + } + const orchestrator = new ParallelOrchestrator( + createDag(), + createState(), + null, + contextExtensions + ) + + await expect( + orchestrator.initializeParallelScope(createContext(), 'parallel-1', 1) + ).resolves.toMatchObject({ + parallelId: 'parallel-1', + isEmpty: true, + }) + }) +}) diff --git a/apps/sim/executor/orchestrators/parallel.ts b/apps/sim/executor/orchestrators/parallel.ts index ed098066bc0..23f7dede964 100644 --- a/apps/sim/executor/orchestrators/parallel.ts +++ b/apps/sim/executor/orchestrators/parallel.ts @@ -47,11 +47,11 @@ export class ParallelOrchestrator { private contextExtensions: ContextExtensions | null = null ) {} - initializeParallelScope( + async initializeParallelScope( ctx: ExecutionContext, parallelId: string, terminalNodesCount = 1 - ): ParallelScope { + ): Promise { const parallelConfig = this.dag.parallelConfigs.get(parallelId) if (!parallelConfig) { throw new Error(`Parallel config not found: ${parallelId}`) @@ -69,7 +69,7 @@ export class ParallelOrchestrator { } catch (error) { const errorMessage = `Parallel Items did not resolve: ${error instanceof Error ? error.message : String(error)}` logger.error(errorMessage, { parallelId, distribution: parallelConfig.distribution }) - this.addParallelErrorLog(ctx, parallelId, errorMessage, { + await this.addParallelErrorLog(ctx, parallelId, errorMessage, { distribution: parallelConfig.distribution, }) this.setErrorScope(ctx, parallelId, errorMessage) @@ -83,7 +83,7 @@ export class ParallelOrchestrator { ) if (branchError) { logger.error(branchError, { parallelId, branchCount }) - this.addParallelErrorLog(ctx, parallelId, branchError, { + await this.addParallelErrorLog(ctx, parallelId, branchError, { distribution: parallelConfig.distribution, branchCount, }) @@ -109,7 +109,7 @@ export class ParallelOrchestrator { this.state.setBlockOutput(parallelId, { results: [] }) - emitEmptySubflowEvents(ctx, parallelId, 'parallel', this.contextExtensions) + await emitEmptySubflowEvents(ctx, parallelId, 'parallel', this.contextExtensions) logger.info('Parallel scope initialized with empty distribution, skipping body', { parallelId, @@ -220,13 +220,13 @@ export class ParallelOrchestrator { return { branchCount: items.length, items } } - private addParallelErrorLog( + private async addParallelErrorLog( ctx: ExecutionContext, parallelId: string, errorMessage: string, inputData?: any - ): void { - addSubflowErrorLog( + ): Promise { + await addSubflowErrorLog( ctx, parallelId, 'parallel', @@ -291,7 +291,10 @@ export class ParallelOrchestrator { return allComplete } - aggregateParallelResults(ctx: ExecutionContext, parallelId: string): ParallelAggregationResult { + async aggregateParallelResults( + ctx: ExecutionContext, + parallelId: string + ): Promise { const scope = ctx.parallelExecutions?.get(parallelId) if (!scope) { logger.error('Parallel scope not found for aggregation', { parallelId }) @@ -316,19 +319,26 @@ export class ParallelOrchestrator { const now = new Date().toISOString() const iterationContext = buildContainerIterationContext(ctx, parallelId) - this.contextExtensions.onBlockComplete( - parallelId, - 'Parallel', - 'parallel', - { - output, - executionTime: 0, - startedAt: now, - executionOrder: getNextExecutionOrder(ctx), - endedAt: now, - }, - iterationContext - ) + try { + await this.contextExtensions.onBlockComplete( + parallelId, + 'Parallel', + 'parallel', + { + output, + executionTime: 0, + startedAt: now, + executionOrder: getNextExecutionOrder(ctx), + endedAt: now, + }, + iterationContext + ) + } catch (error) { + logger.warn('Parallel completion callback failed', { + parallelId, + error: error instanceof Error ? error.message : String(error), + }) + } } return { diff --git a/apps/sim/executor/utils/subflow-utils.ts b/apps/sim/executor/utils/subflow-utils.ts index 977be2788c9..98391442a78 100644 --- a/apps/sim/executor/utils/subflow-utils.ts +++ b/apps/sim/executor/utils/subflow-utils.ts @@ -1,9 +1,12 @@ +import { createLogger } from '@sim/logger' import { DEFAULTS, LOOP, PARALLEL, REFERENCE } from '@/executor/constants' import type { ContextExtensions } from '@/executor/execution/types' import { type BlockLog, type ExecutionContext, getNextExecutionOrder } from '@/executor/types' import { buildContainerIterationContext } from '@/executor/utils/iteration-context' import type { VariableResolver } from '@/executor/variables/resolver' +const logger = createLogger('SubflowUtils') + const BRANCH_PATTERN = new RegExp(`${PARALLEL.BRANCH.PREFIX}\\d+${PARALLEL.BRANCH.SUFFIX}$`) const BRANCH_INDEX_PATTERN = new RegExp(`${PARALLEL.BRANCH.PREFIX}(\\d+)${PARALLEL.BRANCH.SUFFIX}$`) const LOOP_SENTINEL_START_PATTERN = new RegExp( @@ -265,14 +268,14 @@ export function resolveArrayInput( /** * Creates and logs an error for a subflow (loop or parallel). */ -export function addSubflowErrorLog( +export async function addSubflowErrorLog( ctx: ExecutionContext, blockId: string, blockType: 'loop' | 'parallel', errorMessage: string, inputData: Record, contextExtensions: ContextExtensions | null -): void { +): Promise { const now = new Date().toISOString() const execOrder = getNextExecutionOrder(ctx) @@ -296,18 +299,34 @@ export function addSubflowErrorLog( ctx.blockLogs.push(blockLog) if (contextExtensions?.onBlockStart) { - contextExtensions.onBlockStart(blockId, blockName, blockType, execOrder) + try { + await contextExtensions.onBlockStart(blockId, blockName, blockType, execOrder) + } catch (error) { + logger.warn('Subflow error start callback failed', { + blockId, + blockType, + error: error instanceof Error ? error.message : String(error), + }) + } } if (contextExtensions?.onBlockComplete) { - contextExtensions.onBlockComplete(blockId, blockName, blockType, { - input: inputData, - output: { error: errorMessage }, - executionTime: 0, - startedAt: now, - executionOrder: execOrder, - endedAt: now, - }) + try { + await contextExtensions.onBlockComplete(blockId, blockName, blockType, { + input: inputData, + output: { error: errorMessage }, + executionTime: 0, + startedAt: now, + executionOrder: execOrder, + endedAt: now, + }) + } catch (error) { + logger.warn('Subflow error completion callback failed', { + blockId, + blockType, + error: error instanceof Error ? error.message : String(error), + }) + } } } @@ -316,12 +335,12 @@ export function addSubflowErrorLog( * empty collection or false initial condition. This ensures the container block * appears in terminal logs, execution snapshots, and edge highlighting. */ -export function emitEmptySubflowEvents( +export async function emitEmptySubflowEvents( ctx: ExecutionContext, blockId: string, blockType: 'loop' | 'parallel', contextExtensions: ContextExtensions | null -): void { +): Promise { const now = new Date().toISOString() const executionOrder = getNextExecutionOrder(ctx) const output = { results: [] } @@ -342,22 +361,38 @@ export function emitEmptySubflowEvents( }) if (contextExtensions?.onBlockStart) { - contextExtensions.onBlockStart(blockId, blockName, blockType, executionOrder) + try { + await contextExtensions.onBlockStart(blockId, blockName, blockType, executionOrder) + } catch (error) { + logger.warn('Empty subflow start callback failed', { + blockId, + blockType, + error: error instanceof Error ? error.message : String(error), + }) + } } if (contextExtensions?.onBlockComplete) { - contextExtensions.onBlockComplete( - blockId, - blockName, - blockType, - { - output, - executionTime: DEFAULTS.EXECUTION_TIME, - startedAt: now, - executionOrder, - endedAt: now, - }, - iterationContext - ) + try { + await contextExtensions.onBlockComplete( + blockId, + blockName, + blockType, + { + output, + executionTime: DEFAULTS.EXECUTION_TIME, + startedAt: now, + executionOrder, + endedAt: now, + }, + iterationContext + ) + } catch (error) { + logger.warn('Empty subflow completion callback failed', { + blockId, + blockType, + error: error instanceof Error ? error.message : String(error), + }) + } } } diff --git a/apps/sim/lib/logs/execution/logger.test.ts b/apps/sim/lib/logs/execution/logger.test.ts index a1bd9962d8d..7cfdb6ea9e6 100644 --- a/apps/sim/lib/logs/execution/logger.test.ts +++ b/apps/sim/lib/logs/execution/logger.test.ts @@ -1,6 +1,6 @@ import { databaseMock, loggerMock } from '@sim/testing' import { beforeEach, describe, expect, test, vi } from 'vitest' -import { ExecutionLogger } from './logger' +import { ExecutionLogger } from '@/lib/logs/execution/logger' vi.mock('@sim/db', () => databaseMock) @@ -112,7 +112,7 @@ describe('ExecutionLogger', () => { expect(typeof logger.getWorkflowExecution).toBe('function') }) - test('preserves start correlation data when execution completes', () => { + test('preserves correlation and diagnostics when execution completes', () => { const loggerInstance = new ExecutionLogger() as any const completedData = loggerInstance.buildCompletedExecutionData({ @@ -140,9 +140,24 @@ describe('ExecutionLogger', () => { }, }, }, + lastStartedBlock: { + blockId: 'block-start', + blockName: 'Start', + blockType: 'agent', + startedAt: '2025-01-01T00:00:00.000Z', + }, + lastCompletedBlock: { + blockId: 'block-end', + blockName: 'Finish', + blockType: 'api', + endedAt: '2025-01-01T00:00:05.000Z', + success: true, + }, }, traceSpans: [], finalOutput: { ok: true }, + finalizationPath: 'completed', + completionFailure: 'fallback failure', executionCost: { tokens: { input: 0, output: 0, total: 0 }, models: {}, @@ -161,6 +176,12 @@ describe('ExecutionLogger', () => { }) expect(completedData.correlation).toEqual(completedData.trigger?.data?.correlation) expect(completedData.finalOutput).toEqual({ ok: true }) + expect(completedData.lastStartedBlock?.blockId).toBe('block-start') + expect(completedData.lastCompletedBlock?.blockId).toBe('block-end') + expect(completedData.finalizationPath).toBe('completed') + expect(completedData.completionFailure).toBe('fallback failure') + expect(completedData.hasTraceSpans).toBe(false) + expect(completedData.traceSpanCount).toBe(0) }) }) diff --git a/apps/sim/lib/logs/execution/logger.ts b/apps/sim/lib/logs/execution/logger.ts index 64e0df01175..a7c8458ac78 100644 --- a/apps/sim/lib/logs/execution/logger.ts +++ b/apps/sim/lib/logs/execution/logger.ts @@ -27,6 +27,7 @@ import { snapshotService } from '@/lib/logs/execution/snapshot/service' import type { BlockOutputData, ExecutionEnvironment, + ExecutionFinalizationPath, ExecutionTrigger, ExecutionLoggerService as IExecutionLoggerService, TraceSpan, @@ -49,11 +50,21 @@ export interface ToolCall { const logger = createLogger('ExecutionLogger') +function countTraceSpans(traceSpans?: TraceSpan[]): number { + if (!Array.isArray(traceSpans) || traceSpans.length === 0) { + return 0 + } + + return traceSpans.reduce((count, span) => count + 1 + countTraceSpans(span.children), 0) +} + export class ExecutionLogger implements IExecutionLoggerService { private buildCompletedExecutionData(params: { existingExecutionData?: WorkflowExecutionLog['executionData'] traceSpans?: TraceSpan[] finalOutput: BlockOutputData + finalizationPath?: ExecutionFinalizationPath + completionFailure?: string executionCost: { tokens: { input: number @@ -64,7 +75,16 @@ export class ExecutionLogger implements IExecutionLoggerService { } executionState?: SerializableExecutionState }): WorkflowExecutionLog['executionData'] { - const { existingExecutionData, traceSpans, finalOutput, executionCost, executionState } = params + const { + existingExecutionData, + traceSpans, + finalOutput, + finalizationPath, + completionFailure, + executionCost, + executionState, + } = params + const traceSpanCount = countTraceSpans(traceSpans) return { ...(existingExecutionData?.environment @@ -78,6 +98,17 @@ export class ExecutionLogger implements IExecutionLoggerService { existingExecutionData?.trigger?.data?.correlation, } : {}), + ...(existingExecutionData?.error ? { error: existingExecutionData.error } : {}), + ...(existingExecutionData?.lastStartedBlock + ? { lastStartedBlock: existingExecutionData.lastStartedBlock } + : {}), + ...(existingExecutionData?.lastCompletedBlock + ? { lastCompletedBlock: existingExecutionData.lastCompletedBlock } + : {}), + ...(completionFailure ? { completionFailure } : {}), + ...(finalizationPath ? { finalizationPath } : {}), + hasTraceSpans: traceSpanCount > 0, + traceSpanCount, traceSpans, finalOutput, tokens: { @@ -173,6 +204,8 @@ export class ExecutionLogger implements IExecutionLoggerService { environment, trigger, ...(trigger.data?.correlation ? { correlation: trigger.data.correlation } : {}), + hasTraceSpans: false, + traceSpanCount: 0, }, cost: { total: BASE_EXECUTION_CHARGE, @@ -232,6 +265,8 @@ export class ExecutionLogger implements IExecutionLoggerService { traceSpans?: TraceSpan[] workflowInput?: any executionState?: SerializableExecutionState + finalizationPath?: ExecutionFinalizationPath + completionFailure?: string isResume?: boolean level?: 'info' | 'error' status?: 'completed' | 'failed' | 'cancelled' | 'pending' @@ -245,6 +280,8 @@ export class ExecutionLogger implements IExecutionLoggerService { traceSpans, workflowInput, executionState, + finalizationPath, + completionFailure, isResume, level: levelOverride, status: statusOverride, @@ -315,6 +352,16 @@ export class ExecutionLogger implements IExecutionLoggerService { ? Math.max(0, Math.round(rawDurationMs)) : 0 + const completedExecutionData = this.buildCompletedExecutionData({ + existingExecutionData, + traceSpans: redactedTraceSpans, + finalOutput: redactedFinalOutput, + finalizationPath, + completionFailure, + executionCost, + executionState, + }) + const [updatedLog] = await db .update(workflowExecutionLogs) .set({ @@ -323,13 +370,7 @@ export class ExecutionLogger implements IExecutionLoggerService { endedAt: new Date(endedAt), totalDurationMs: totalDuration, files: executionFiles.length > 0 ? executionFiles : null, - executionData: this.buildCompletedExecutionData({ - existingExecutionData, - traceSpans: redactedTraceSpans, - finalOutput: redactedFinalOutput, - executionCost, - executionState, - }), + executionData: completedExecutionData, cost: executionCost, }) .where(eq(workflowExecutionLogs.executionId, executionId)) diff --git a/apps/sim/lib/logs/execution/logging-session.test.ts b/apps/sim/lib/logs/execution/logging-session.test.ts index 2f9bd2370f2..a44bc5e72d5 100644 --- a/apps/sim/lib/logs/execution/logging-session.test.ts +++ b/apps/sim/lib/logs/execution/logging-session.test.ts @@ -1,11 +1,48 @@ import { beforeEach, describe, expect, it, vi } from 'vitest' +const dbMocks = vi.hoisted(() => { + const selectLimit = vi.fn() + const selectWhere = vi.fn() + const selectFrom = vi.fn() + const select = vi.fn() + const updateWhere = vi.fn() + const updateSet = vi.fn() + const update = vi.fn() + const execute = vi.fn() + const eq = vi.fn() + const sql = vi.fn((strings: TemplateStringsArray, ...values: unknown[]) => ({ strings, values })) + + select.mockReturnValue({ from: selectFrom }) + selectFrom.mockReturnValue({ where: selectWhere }) + selectWhere.mockReturnValue({ limit: selectLimit }) + + update.mockReturnValue({ set: updateSet }) + updateSet.mockReturnValue({ where: updateWhere }) + + return { + select, + selectFrom, + selectWhere, + selectLimit, + update, + updateSet, + updateWhere, + execute, + eq, + sql, + } +}) + const { completeWorkflowExecutionMock } = vi.hoisted(() => ({ completeWorkflowExecutionMock: vi.fn(), })) vi.mock('@sim/db', () => ({ - db: {}, + db: { + select: dbMocks.select, + update: dbMocks.update, + execute: dbMocks.execute, + }, })) vi.mock('@sim/db/schema', () => ({ @@ -22,8 +59,8 @@ vi.mock('@sim/logger', () => ({ })) vi.mock('drizzle-orm', () => ({ - eq: vi.fn(), - sql: vi.fn(), + eq: dbMocks.eq, + sql: dbMocks.sql, })) vi.mock('@/lib/logs/execution/logger', () => ({ @@ -56,6 +93,9 @@ import { LoggingSession } from './logging-session' describe('LoggingSession completion retries', () => { beforeEach(() => { vi.clearAllMocks() + dbMocks.selectLimit.mockResolvedValue([{ executionData: {} }]) + dbMocks.updateWhere.mockResolvedValue(undefined) + dbMocks.execute.mockResolvedValue(undefined) }) it('keeps completion best-effort when a later error completion retries after full completion and fallback both fail', async () => { @@ -86,7 +126,6 @@ describe('LoggingSession completion retries', () => { .mockRejectedValueOnce(new Error('cost only failed')) await expect(session.safeComplete({ finalOutput: { ok: true } })).resolves.toBeUndefined() - await expect(session.safeComplete({ finalOutput: { ok: true } })).resolves.toBeUndefined() expect(completeWorkflowExecutionMock).toHaveBeenCalledTimes(2) @@ -118,6 +157,64 @@ describe('LoggingSession completion retries', () => { expect(session.hasCompleted()).toBe(true) }) + it('preserves successful final output during fallback completion', async () => { + const session = new LoggingSession('workflow-1', 'execution-5', 'api', 'req-1') + + completeWorkflowExecutionMock + .mockRejectedValueOnce(new Error('success finalize failed')) + .mockResolvedValueOnce({}) + + await expect( + session.safeComplete({ finalOutput: { ok: true, stage: 'done' } }) + ).resolves.toBeUndefined() + + expect(completeWorkflowExecutionMock).toHaveBeenLastCalledWith( + expect.objectContaining({ + executionId: 'execution-5', + finalOutput: { ok: true, stage: 'done' }, + finalizationPath: 'fallback_completed', + }) + ) + }) + + it('preserves accumulated cost during fallback completion', async () => { + const session = new LoggingSession('workflow-1', 'execution-6', 'api', 'req-1') as any + + session.accumulatedCost = { + total: 12, + input: 5, + output: 7, + tokens: { input: 11, output: 13, total: 24 }, + models: { + 'test-model': { + input: 5, + output: 7, + total: 12, + tokens: { input: 11, output: 13, total: 24 }, + }, + }, + } + session.costFlushed = true + + completeWorkflowExecutionMock + .mockRejectedValueOnce(new Error('success finalize failed')) + .mockResolvedValueOnce({}) + + await expect(session.safeComplete({ finalOutput: { ok: true } })).resolves.toBeUndefined() + + expect(completeWorkflowExecutionMock).toHaveBeenLastCalledWith( + expect.objectContaining({ + executionId: 'execution-6', + costSummary: expect.objectContaining({ + totalCost: 12, + totalInputCost: 5, + totalOutputCost: 7, + totalTokens: 24, + }), + }) + ) + }) + it('persists failed error semantics when completeWithError receives non-error trace spans', async () => { const session = new LoggingSession('workflow-1', 'execution-4', 'api', 'req-1') const traceSpans = [ @@ -148,6 +245,8 @@ describe('LoggingSession completion retries', () => { traceSpans, level: 'error', status: 'failed', + finalizationPath: 'force_failed', + completionFailure: 'persist me as failed', }) ) }) @@ -196,4 +295,168 @@ describe('LoggingSession completion retries', () => { expect(session.hasCompleted()).toBe(true) expect(completeWorkflowExecutionMock).toHaveBeenCalledTimes(2) }) + + it('persists last started block independently from cost accumulation', async () => { + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') + + await session.onBlockStart('block-1', 'Fetch', 'api', '2025-01-01T00:00:00.000Z') + + expect(dbMocks.select).not.toHaveBeenCalled() + expect(dbMocks.execute).toHaveBeenCalledTimes(1) + }) + + it('enforces started marker monotonicity in the database write path', async () => { + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') + + await session.onBlockStart('block-1', 'Fetch', 'api', '2025-01-01T00:00:00.000Z') + + expect(dbMocks.sql).toHaveBeenCalled() + expect(dbMocks.execute).toHaveBeenCalledTimes(1) + }) + + it('allows same-millisecond started markers to replace the prior marker', async () => { + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') + + await session.onBlockStart('block-1', 'Fetch', 'api', '2025-01-01T00:00:00.000Z') + + const queryCall = dbMocks.sql.mock.calls.at(-1) + expect(queryCall).toBeDefined() + + const [query] = queryCall! + expect(Array.from(query).join(' ')).toContain('<=') + }) + + it('persists last completed block for zero-cost outputs', async () => { + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') + + await session.onBlockComplete('block-2', 'Transform', 'function', { + endedAt: '2025-01-01T00:00:01.000Z', + output: { value: true }, + }) + + expect(dbMocks.select).not.toHaveBeenCalled() + expect(dbMocks.execute).toHaveBeenCalledTimes(1) + }) + + it('allows same-millisecond completed markers to replace the prior marker', async () => { + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') + + await session.onBlockComplete('block-2', 'Transform', 'function', { + endedAt: '2025-01-01T00:00:01.000Z', + output: { value: true }, + }) + + const queryCall = dbMocks.sql.mock.calls.at(-1) + expect(queryCall).toBeDefined() + + const [query] = queryCall! + expect(Array.from(query).join(' ')).toContain('<=') + }) + + it('drains pending lifecycle writes before terminal completion', async () => { + let releasePersist: (() => void) | undefined + const persistPromise = new Promise((resolve) => { + releasePersist = resolve + }) + + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') as any + session.persistLastStartedBlock = vi.fn(() => persistPromise) + session.complete = vi.fn().mockResolvedValue(undefined) + + const startPromise = session.onBlockStart('block-1', 'Fetch', 'api', '2025-01-01T00:00:00.000Z') + const completionPromise = session.safeComplete({ finalOutput: { ok: true } }) + + await Promise.resolve() + + expect(session.complete).not.toHaveBeenCalled() + + releasePersist?.() + + await startPromise + await completionPromise + + expect(session.persistLastStartedBlock).toHaveBeenCalledTimes(1) + expect(session.complete).toHaveBeenCalledTimes(1) + }) + + it('drains fire-and-forget cost flushes before terminal completion', async () => { + let releaseFlush: (() => void) | undefined + const flushPromise = new Promise((resolve) => { + releaseFlush = resolve + }) + + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') as any + session.flushAccumulatedCost = vi.fn(() => flushPromise) + session.complete = vi.fn().mockResolvedValue(undefined) + + await session.onBlockComplete('block-2', 'Transform', 'function', { + endedAt: '2025-01-01T00:00:01.000Z', + output: { value: true }, + cost: { total: 1, input: 1, output: 0 }, + tokens: { input: 1, output: 0, total: 1 }, + model: 'test-model', + }) + + const completionPromise = session.safeComplete({ finalOutput: { ok: true } }) + + await Promise.resolve() + + expect(session.complete).not.toHaveBeenCalled() + + releaseFlush?.() + + await completionPromise + + expect(session.flushAccumulatedCost).toHaveBeenCalledTimes(1) + expect(session.complete).toHaveBeenCalledTimes(1) + }) + + it('keeps draining when new progress writes arrive during drain', async () => { + let releaseFirst: (() => void) | undefined + let releaseSecond: (() => void) | undefined + const firstPromise = new Promise((resolve) => { + releaseFirst = resolve + }) + const secondPromise = new Promise((resolve) => { + releaseSecond = resolve + }) + + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') as any + + void session.trackProgressWrite(firstPromise) + + const drainPromise = session.drainPendingProgressWrites() + + await Promise.resolve() + + void session.trackProgressWrite(secondPromise) + releaseFirst?.() + + await Promise.resolve() + + let drained = false + void drainPromise.then(() => { + drained = true + }) + + await Promise.resolve() + expect(drained).toBe(false) + + releaseSecond?.() + await drainPromise + + expect(session.pendingProgressWrites.size).toBe(0) + }) + + it('marks pause completion as terminal and prevents duplicate pause finalization', async () => { + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') as any + session.completeExecutionWithFinalization = vi.fn().mockResolvedValue(undefined) + + await session.completeWithPause({ workflowInput: { ok: true } }) + await session.completeWithPause({ workflowInput: { ok: true } }) + + expect(session.completeExecutionWithFinalization).toHaveBeenCalledTimes(1) + expect(session.completed).toBe(true) + expect(session.completing).toBe(true) + }) }) diff --git a/apps/sim/lib/logs/execution/logging-session.ts b/apps/sim/lib/logs/execution/logging-session.ts index 4cd4220d5bf..7fb72bc1d73 100644 --- a/apps/sim/lib/logs/execution/logging-session.ts +++ b/apps/sim/lib/logs/execution/logging-session.ts @@ -13,6 +13,9 @@ import { } from '@/lib/logs/execution/logging-factory' import type { ExecutionEnvironment, + ExecutionFinalizationPath, + ExecutionLastCompletedBlock, + ExecutionLastStartedBlock, ExecutionTrigger, TraceSpan, WorkflowState, @@ -23,6 +26,46 @@ type TriggerData = Record & { correlation?: NonNullable['correlation'] } +function buildStartedMarkerPersistenceQuery(params: { + executionId: string + marker: ExecutionLastStartedBlock +}) { + const markerJson = JSON.stringify(params.marker) + + return sql`UPDATE workflow_execution_logs + SET execution_data = jsonb_set( + COALESCE(execution_data, '{}'::jsonb), + '{lastStartedBlock}', + ${markerJson}::jsonb, + true + ) + WHERE execution_id = ${params.executionId} + AND COALESCE( + jsonb_extract_path_text(COALESCE(execution_data, '{}'::jsonb), 'lastStartedBlock', 'startedAt'), + '' + ) <= ${params.marker.startedAt}` +} + +function buildCompletedMarkerPersistenceQuery(params: { + executionId: string + marker: ExecutionLastCompletedBlock +}) { + const markerJson = JSON.stringify(params.marker) + + return sql`UPDATE workflow_execution_logs + SET execution_data = jsonb_set( + COALESCE(execution_data, '{}'::jsonb), + '{lastCompletedBlock}', + ${markerJson}::jsonb, + true + ) + WHERE execution_id = ${params.executionId} + AND COALESCE( + jsonb_extract_path_text(COALESCE(execution_data, '{}'::jsonb), 'lastCompletedBlock', 'endedAt'), + '' + ) <= ${params.marker.endedAt}` +} + const logger = createLogger('LoggingSession') type CompletionAttempt = 'complete' | 'error' | 'cancelled' | 'paused' @@ -109,6 +152,7 @@ export class LoggingSession { tokens: { input: 0, output: 0, total: 0 }, models: {}, } + private pendingProgressWrites = new Set>() private costFlushed = false private postExecutionPromise: Promise | null = null @@ -124,12 +168,132 @@ export class LoggingSession { this.requestId = requestId } + async onBlockStart( + blockId: string, + blockName: string, + blockType: string, + startedAt: string + ): Promise { + await this.trackProgressWrite( + this.persistLastStartedBlock({ + blockId, + blockName, + blockType, + startedAt, + }) + ) + } + + private async persistLastStartedBlock(marker: ExecutionLastStartedBlock): Promise { + try { + await db.execute( + buildStartedMarkerPersistenceQuery({ + executionId: this.executionId, + marker, + }) + ) + } catch (error) { + logger.error(`Failed to persist last started block for execution ${this.executionId}:`, { + error: error instanceof Error ? error.message : String(error), + }) + } + } + + private async persistLastCompletedBlock(marker: ExecutionLastCompletedBlock): Promise { + try { + await db.execute( + buildCompletedMarkerPersistenceQuery({ + executionId: this.executionId, + marker, + }) + ) + } catch (error) { + logger.error(`Failed to persist last completed block for execution ${this.executionId}:`, { + error: error instanceof Error ? error.message : String(error), + }) + } + } + + private async trackProgressWrite(writePromise: Promise): Promise { + this.pendingProgressWrites.add(writePromise) + + try { + await writePromise + } finally { + this.pendingProgressWrites.delete(writePromise) + } + } + + private async drainPendingProgressWrites(): Promise { + while (this.pendingProgressWrites.size > 0) { + await Promise.allSettled(Array.from(this.pendingProgressWrites)) + } + } + + private async completeExecutionWithFinalization(params: { + endedAt: string + totalDurationMs: number + costSummary: { + totalCost: number + totalInputCost: number + totalOutputCost: number + totalTokens: number + totalPromptTokens: number + totalCompletionTokens: number + baseExecutionCharge: number + modelCost: number + models: Record< + string, + { + input: number + output: number + total: number + tokens: { input: number; output: number; total: number } + } + > + } + finalOutput: Record + traceSpans: TraceSpan[] + workflowInput?: unknown + executionState?: SerializableExecutionState + finalizationPath: ExecutionFinalizationPath + completionFailure?: string + level?: 'info' | 'error' + status?: 'completed' | 'failed' | 'cancelled' | 'pending' + }): Promise { + await executionLogger.completeWorkflowExecution({ + executionId: this.executionId, + endedAt: params.endedAt, + totalDurationMs: params.totalDurationMs, + costSummary: params.costSummary, + finalOutput: params.finalOutput, + traceSpans: params.traceSpans, + workflowInput: params.workflowInput, + executionState: params.executionState, + finalizationPath: params.finalizationPath, + completionFailure: params.completionFailure, + isResume: this.isResume, + level: params.level, + status: params.status, + }) + } + async onBlockComplete( blockId: string, blockName: string, blockType: string, output: any ): Promise { + await this.trackProgressWrite( + this.persistLastCompletedBlock({ + blockId, + blockName, + blockType, + endedAt: output?.endedAt || new Date().toISOString(), + success: !output?.output?.error, + }) + ) + if (!output?.cost || typeof output.cost.total !== 'number' || output.cost.total <= 0) { return } @@ -165,7 +329,7 @@ export class LoggingSession { } } - await this.flushAccumulatedCost() + void this.trackProgressWrite(this.flushAccumulatedCost()) } private async flushAccumulatedCost(): Promise { @@ -276,8 +440,7 @@ export class LoggingSession { const endTime = endedAt || new Date().toISOString() const duration = totalDurationMs || 0 - await executionLogger.completeWorkflowExecution({ - executionId: this.executionId, + await this.completeExecutionWithFinalization({ endedAt: endTime, totalDurationMs: duration, costSummary, @@ -285,7 +448,7 @@ export class LoggingSession { traceSpans: traceSpans || [], workflowInput, executionState, - isResume: this.isResume, + finalizationPath: 'completed', }) this.completed = true @@ -403,8 +566,7 @@ export class LoggingSession { const spans = hasProvidedSpans ? traceSpans : [errorSpan] - await executionLogger.completeWorkflowExecution({ - executionId: this.executionId, + await this.completeExecutionWithFinalization({ endedAt: endTime.toISOString(), totalDurationMs: Math.max(1, durationMs), costSummary, @@ -412,6 +574,8 @@ export class LoggingSession { traceSpans: spans, level: 'error', status: 'failed', + finalizationPath: 'force_failed', + completionFailure: message, }) this.completed = true @@ -490,13 +654,13 @@ export class LoggingSession { models: {}, } - await executionLogger.completeWorkflowExecution({ - executionId: this.executionId, + await this.completeExecutionWithFinalization({ endedAt: endTime.toISOString(), totalDurationMs: Math.max(1, durationMs), costSummary, finalOutput: { cancelled: true }, traceSpans: traceSpans || [], + finalizationPath: 'cancelled', status: 'cancelled', }) @@ -577,14 +741,14 @@ export class LoggingSession { models: {}, } - await executionLogger.completeWorkflowExecution({ - executionId: this.executionId, + await this.completeExecutionWithFinalization({ endedAt: endTime.toISOString(), totalDurationMs: Math.max(1, durationMs), costSummary, finalOutput: { paused: true }, traceSpans: traceSpans || [], workflowInput, + finalizationPath: 'paused', status: 'pending', }) @@ -746,7 +910,6 @@ export class LoggingSession { this.completionAttemptFailed = true throw error }) - return this.completionPromise } @@ -756,6 +919,7 @@ export class LoggingSession { private async _safeCompleteImpl(params: SessionCompleteParams = {}): Promise { try { + await this.drainPendingProgressWrites() await this.complete(params) } catch (error) { const errorMsg = error instanceof Error ? error.message : String(error) @@ -769,6 +933,8 @@ export class LoggingSession { totalDurationMs: params.totalDurationMs, errorMessage: `Failed to store trace spans: ${errorMsg}`, isError: false, + finalizationPath: 'fallback_completed', + finalOutput: params.finalOutput || {}, }) } } @@ -779,6 +945,7 @@ export class LoggingSession { private async _safeCompleteWithErrorImpl(params?: SessionErrorCompleteParams): Promise { try { + await this.drainPendingProgressWrites() await this.completeWithError(params) } catch (error) { const errorMsg = error instanceof Error ? error.message : String(error) @@ -793,6 +960,10 @@ export class LoggingSession { errorMessage: params?.error?.message || `Execution failed to store trace spans: ${errorMsg}`, isError: true, + finalizationPath: 'force_failed', + finalOutput: { + error: params?.error?.message || `Execution failed to store trace spans: ${errorMsg}`, + }, status: 'failed', }) } @@ -806,6 +977,7 @@ export class LoggingSession { private async _safeCompleteWithCancellationImpl(params?: SessionCancelledParams): Promise { try { + await this.drainPendingProgressWrites() await this.completeWithCancellation(params) } catch (error) { const errorMsg = error instanceof Error ? error.message : String(error) @@ -819,6 +991,8 @@ export class LoggingSession { totalDurationMs: params?.totalDurationMs, errorMessage: 'Execution was cancelled', isError: false, + finalizationPath: 'cancelled', + finalOutput: { cancelled: true }, status: 'cancelled', }) } @@ -830,6 +1004,7 @@ export class LoggingSession { private async _safeCompleteWithPauseImpl(params?: SessionPausedParams): Promise { try { + await this.drainPendingProgressWrites() await this.completeWithPause(params) } catch (error) { const errorMsg = error instanceof Error ? error.message : String(error) @@ -843,6 +1018,8 @@ export class LoggingSession { totalDurationMs: params?.totalDurationMs, errorMessage: 'Execution paused but failed to store full trace spans', isError: false, + finalizationPath: 'paused', + finalOutput: { paused: true }, status: 'pending', }) } @@ -867,12 +1044,16 @@ export class LoggingSession { status: 'failed', executionData: sql`jsonb_set( jsonb_set( - COALESCE(execution_data, '{}'::jsonb), - ARRAY['error'], - to_jsonb(${message}::text) + jsonb_set( + COALESCE(execution_data, '{}'::jsonb), + ARRAY['error'], + to_jsonb(${message}::text) + ), + ARRAY['finalOutput'], + jsonb_build_object('error', ${message}::text) ), - ARRAY['finalOutput'], - jsonb_build_object('error', ${message}::text) + ARRAY['finalizationPath'], + to_jsonb('force_failed'::text) )`, }) .where(eq(workflowExecutionLogs.executionId, executionId)) @@ -891,6 +1072,8 @@ export class LoggingSession { totalDurationMs?: number errorMessage: string isError: boolean + finalizationPath: ExecutionFinalizationPath + finalOutput?: Record status?: 'completed' | 'failed' | 'cancelled' | 'pending' }): Promise { if (this.completed || this.completing) { @@ -903,28 +1086,48 @@ export class LoggingSession { ) try { - const costSummary = params.traceSpans?.length - ? calculateCostSummary(params.traceSpans) - : { - totalCost: BASE_EXECUTION_CHARGE, - totalInputCost: 0, - totalOutputCost: 0, - totalTokens: 0, - totalPromptTokens: 0, - totalCompletionTokens: 0, + const hasAccumulatedCost = + this.costFlushed || + this.accumulatedCost.total > BASE_EXECUTION_CHARGE || + this.accumulatedCost.tokens.total > 0 || + Object.keys(this.accumulatedCost.models).length > 0 + + const costSummary = hasAccumulatedCost + ? { + totalCost: this.accumulatedCost.total, + totalInputCost: this.accumulatedCost.input, + totalOutputCost: this.accumulatedCost.output, + totalTokens: this.accumulatedCost.tokens.total, + totalPromptTokens: this.accumulatedCost.tokens.input, + totalCompletionTokens: this.accumulatedCost.tokens.output, baseExecutionCharge: BASE_EXECUTION_CHARGE, - modelCost: 0, - models: {}, + modelCost: Math.max(0, this.accumulatedCost.total - BASE_EXECUTION_CHARGE), + models: this.accumulatedCost.models, } + : params.traceSpans?.length + ? calculateCostSummary(params.traceSpans) + : { + totalCost: BASE_EXECUTION_CHARGE, + totalInputCost: 0, + totalOutputCost: 0, + totalTokens: 0, + totalPromptTokens: 0, + totalCompletionTokens: 0, + baseExecutionCharge: BASE_EXECUTION_CHARGE, + modelCost: 0, + models: {}, + } - await executionLogger.completeWorkflowExecution({ - executionId: this.executionId, + const finalOutput = params.finalOutput || { _fallback: true, error: params.errorMessage } + + await this.completeExecutionWithFinalization({ endedAt: params.endedAt || new Date().toISOString(), totalDurationMs: params.totalDurationMs || 0, costSummary, - finalOutput: { _fallback: true, error: params.errorMessage }, + finalOutput, traceSpans: [], - isResume: this.isResume, + finalizationPath: params.finalizationPath, + completionFailure: params.errorMessage, level: params.isError ? 'error' : 'info', status: params.status, }) diff --git a/apps/sim/lib/logs/types.ts b/apps/sim/lib/logs/types.ts index c785c3037db..b9402583704 100644 --- a/apps/sim/lib/logs/types.ts +++ b/apps/sim/lib/logs/types.ts @@ -71,6 +71,31 @@ export interface ExecutionStatus { durationMs?: number } +export const EXECUTION_FINALIZATION_PATHS = [ + 'completed', + 'fallback_completed', + 'force_failed', + 'cancelled', + 'paused', +] as const + +export type ExecutionFinalizationPath = (typeof EXECUTION_FINALIZATION_PATHS)[number] + +export interface ExecutionLastStartedBlock { + blockId: string + blockName: string + blockType: string + startedAt: string +} + +export interface ExecutionLastCompletedBlock { + blockId: string + blockName: string + blockType: string + endedAt: string + success: boolean +} + export interface WorkflowExecutionSnapshot { id: string workflowId: string | null @@ -105,6 +130,13 @@ export interface WorkflowExecutionLog { environment?: ExecutionEnvironment trigger?: ExecutionTrigger correlation?: AsyncExecutionCorrelation + error?: string + lastStartedBlock?: ExecutionLastStartedBlock + lastCompletedBlock?: ExecutionLastCompletedBlock + hasTraceSpans?: boolean + traceSpanCount?: number + completionFailure?: string + finalizationPath?: ExecutionFinalizationPath traceSpans?: TraceSpan[] tokens?: { input?: number; output?: number; total?: number } models?: Record< @@ -378,6 +410,9 @@ export interface ExecutionLoggerService { finalOutput: BlockOutputData traceSpans?: TraceSpan[] workflowInput?: any + executionState?: SerializableExecutionState + finalizationPath?: ExecutionFinalizationPath + completionFailure?: string isResume?: boolean level?: 'info' | 'error' status?: 'completed' | 'failed' | 'cancelled' | 'pending' diff --git a/apps/sim/lib/workflows/executor/execution-core.test.ts b/apps/sim/lib/workflows/executor/execution-core.test.ts index bbb702a11ae..048ba62492a 100644 --- a/apps/sim/lib/workflows/executor/execution-core.test.ts +++ b/apps/sim/lib/workflows/executor/execution-core.test.ts @@ -16,6 +16,8 @@ const { buildTraceSpansMock, serializeWorkflowMock, executorExecuteMock, + onBlockStartPersistenceMock, + executorConstructorMock, } = vi.hoisted(() => ({ loadWorkflowFromNormalizedTablesMock: vi.fn(), loadDeployedWorkflowStateMock: vi.fn(), @@ -32,6 +34,8 @@ const { buildTraceSpansMock: vi.fn(), serializeWorkflowMock: vi.fn(), executorExecuteMock: vi.fn(), + onBlockStartPersistenceMock: vi.fn(), + executorConstructorMock: vi.fn(), })) vi.mock('@sim/logger', () => ({ @@ -79,10 +83,13 @@ vi.mock('@/lib/workflows/utils', () => ({ })) vi.mock('@/executor', () => ({ - Executor: vi.fn().mockImplementation(() => ({ - execute: executorExecuteMock, - executeFromBlock: executorExecuteMock, - })), + Executor: vi.fn().mockImplementation((args) => { + executorConstructorMock(args) + return { + execute: executorExecuteMock, + executeFromBlock: executorExecuteMock, + } + }), })) vi.mock('@/serializer', () => ({ @@ -105,6 +112,8 @@ describe('executeWorkflowCore terminal finalization sequencing', () => { safeCompleteWithCancellation: safeCompleteWithCancellationMock, safeCompleteWithPause: safeCompleteWithPauseMock, hasCompleted: hasCompletedMock, + onBlockStart: onBlockStartPersistenceMock, + onBlockComplete: vi.fn(), setPostExecutionPromise: vi.fn(), waitForPostExecution: vi.fn().mockResolvedValue(undefined), } @@ -176,10 +185,72 @@ describe('executeWorkflowCore terminal finalization sequencing', () => { safeCompleteWithCancellationMock.mockResolvedValue(undefined) safeCompleteWithPauseMock.mockResolvedValue(undefined) hasCompletedMock.mockReturnValue(true) + onBlockStartPersistenceMock.mockResolvedValue(undefined) updateWorkflowRunCountsMock.mockResolvedValue(undefined) clearExecutionCancellationMock.mockResolvedValue(undefined) }) + it('routes onBlockStart through logging session persistence path', async () => { + executorExecuteMock.mockResolvedValue({ + success: true, + status: 'completed', + output: { done: true }, + logs: [], + metadata: { duration: 123, startTime: 'start', endTime: 'end' }, + }) + + await executeWorkflowCore({ + snapshot: createSnapshot() as any, + callbacks: { + onBlockStart: async (blockId) => { + expect(blockId).toBe('block-1') + }, + }, + loggingSession: loggingSession as any, + }) + + const contextExtensions = executorConstructorMock.mock.calls[0]?.[0]?.contextExtensions + await contextExtensions.onBlockStart('block-1', 'Fetch', 'api', 1) + + expect(onBlockStartPersistenceMock).toHaveBeenCalledWith( + 'block-1', + 'Fetch', + 'api', + expect.any(String) + ) + }) + + it('does not await user block start callback after persistence completes', async () => { + let releaseCallback: (() => void) | undefined + const callbackPromise = new Promise((resolve) => { + releaseCallback = resolve + }) + + executorExecuteMock.mockResolvedValue({ + success: true, + status: 'completed', + output: { done: true }, + logs: [], + metadata: { duration: 123, startTime: 'start', endTime: 'end' }, + }) + + await executeWorkflowCore({ + snapshot: createSnapshot() as any, + callbacks: { + onBlockStart: vi.fn(() => callbackPromise), + }, + loggingSession: loggingSession as any, + }) + + const contextExtensions = executorConstructorMock.mock.calls[0]?.[0]?.contextExtensions + + await expect( + contextExtensions.onBlockStart('block-1', 'Fetch', 'api', 1) + ).resolves.toBeUndefined() + + releaseCallback?.() + }) + it('awaits terminal completion before updating run counts and returning', async () => { const callOrder: string[] = [] @@ -222,7 +293,57 @@ describe('executeWorkflowCore terminal finalization sequencing', () => { ]) }) - it('clears cancellation even when success finalization throws', async () => { + it('awaits wrapped lifecycle persistence before terminal finalization returns', async () => { + let releaseBlockStart: (() => void) | undefined + const blockStartPromise = new Promise((resolve) => { + releaseBlockStart = resolve + }) + const callOrder: string[] = [] + + onBlockStartPersistenceMock.mockImplementation(async () => { + callOrder.push('persist:start') + await blockStartPromise + callOrder.push('persist:end') + }) + + safeCompleteMock.mockImplementation(async () => { + callOrder.push('safeComplete') + }) + + executorExecuteMock.mockImplementation(async () => { + const contextExtensions = executorConstructorMock.mock.calls[0]?.[0]?.contextExtensions + const startLifecycle = contextExtensions.onBlockStart('block-1', 'Fetch', 'api', 1) + await Promise.resolve() + callOrder.push('executor:before-release') + releaseBlockStart?.() + await startLifecycle + callOrder.push('executor:after-start') + + return { + success: true, + status: 'completed', + output: { done: true }, + logs: [], + metadata: { duration: 123, startTime: 'start', endTime: 'end' }, + } + }) + + await executeWorkflowCore({ + snapshot: createSnapshot() as any, + callbacks: {}, + loggingSession: loggingSession as any, + }) + + expect(callOrder).toEqual([ + 'persist:start', + 'executor:before-release', + 'persist:end', + 'executor:after-start', + 'safeComplete', + ]) + }) + + it('preserves successful execution when success finalization throws', async () => { executorExecuteMock.mockResolvedValue({ success: true, status: 'completed', @@ -244,7 +365,7 @@ describe('executeWorkflowCore terminal finalization sequencing', () => { expect(result.status).toBe('completed') expect(clearExecutionCancellationMock).toHaveBeenCalledWith('execution-1') - expect(updateWorkflowRunCountsMock).not.toHaveBeenCalled() + expect(updateWorkflowRunCountsMock).toHaveBeenCalledWith('workflow-1') }) it('routes cancelled executions through safeCompleteWithCancellation', async () => { @@ -304,6 +425,61 @@ describe('executeWorkflowCore terminal finalization sequencing', () => { expect(updateWorkflowRunCountsMock).not.toHaveBeenCalled() }) + it('swallows wrapped block start callback failures without breaking execution', async () => { + onBlockStartPersistenceMock.mockRejectedValue(new Error('start persistence failed')) + + executorExecuteMock.mockImplementation(async () => { + const contextExtensions = executorConstructorMock.mock.calls[0]?.[0]?.contextExtensions + await contextExtensions.onBlockStart('block-1', 'Fetch', 'api', 1) + + return { + success: true, + status: 'completed', + output: { done: true }, + logs: [], + metadata: { duration: 123, startTime: 'start', endTime: 'end' }, + } + }) + + const result = await executeWorkflowCore({ + snapshot: createSnapshot() as any, + callbacks: {}, + loggingSession: loggingSession as any, + }) + + expect(result.status).toBe('completed') + expect(safeCompleteMock).toHaveBeenCalledTimes(1) + }) + + it('swallows wrapped block complete callback failures without blocking completion', async () => { + executorExecuteMock.mockResolvedValue({ + success: true, + status: 'completed', + output: { done: true }, + logs: [], + metadata: { duration: 123, startTime: 'start', endTime: 'end' }, + }) + + await executeWorkflowCore({ + snapshot: createSnapshot() as any, + callbacks: { + onBlockComplete: vi.fn().mockRejectedValue(new Error('complete callback failed')), + }, + loggingSession: loggingSession as any, + }) + + const contextExtensions = executorConstructorMock.mock.calls[0]?.[0]?.contextExtensions + + await expect( + contextExtensions.onBlockComplete('block-1', 'Fetch', 'api', { + output: { ok: true }, + executionTime: 1, + startedAt: 'start', + endedAt: 'end', + }) + ).resolves.toBeUndefined() + }) + it('finalizes errors before rethrowing and marks them as core-finalized', async () => { const error = new Error('engine failed') const executionResult = { @@ -445,7 +621,7 @@ describe('executeWorkflowCore terminal finalization sequencing', () => { expect(wasExecutionFinalizedByCore('engine failed', 'execution-a')).toBe(true) }) - it('logs error without rejecting when success finalization rejects', async () => { + it('does not replace a successful outcome when success finalization rejects', async () => { executorExecuteMock.mockResolvedValue({ success: true, status: 'completed', @@ -464,7 +640,7 @@ describe('executeWorkflowCore terminal finalization sequencing', () => { await loggingSession.setPostExecutionPromise.mock.calls[0][0] - expect(result.status).toBe('completed') + expect(result).toMatchObject({ status: 'completed', success: true }) expect(clearExecutionCancellationMock).toHaveBeenCalledWith('execution-1') expect(safeCompleteWithErrorMock).not.toHaveBeenCalled() }) diff --git a/apps/sim/lib/workflows/executor/execution-core.ts b/apps/sim/lib/workflows/executor/execution-core.ts index 264cc249e37..aa96e5668a5 100644 --- a/apps/sim/lib/workflows/executor/execution-core.ts +++ b/apps/sim/lib/workflows/executor/execution-core.ts @@ -179,33 +179,41 @@ async function finalizeExecutionOutcome(params: { const endedAt = new Date().toISOString() try { - if (result.status === 'cancelled') { - await loggingSession.safeCompleteWithCancellation({ - endedAt, - totalDurationMs: totalDuration || 0, - traceSpans: traceSpans || [], - }) - return - } + try { + if (result.status === 'cancelled') { + await loggingSession.safeCompleteWithCancellation({ + endedAt, + totalDurationMs: totalDuration || 0, + traceSpans: traceSpans || [], + }) + return + } + + if (result.status === 'paused') { + await loggingSession.safeCompleteWithPause({ + endedAt, + totalDurationMs: totalDuration || 0, + traceSpans: traceSpans || [], + workflowInput, + }) + return + } - if (result.status === 'paused') { - await loggingSession.safeCompleteWithPause({ + await loggingSession.safeComplete({ endedAt, totalDurationMs: totalDuration || 0, + finalOutput: result.output || {}, traceSpans: traceSpans || [], workflowInput, + executionState: result.executionState, + }) + } catch (error) { + logger.warn(`[${requestId}] Post-execution finalization failed`, { + executionId, + status: result.status, + error, }) - return } - - await loggingSession.safeComplete({ - endedAt, - totalDurationMs: totalDuration || 0, - finalOutput: result.output || {}, - traceSpans: traceSpans || [], - workflowInput, - executionState: result.executionState, - }) } finally { await clearExecutionCancellationSafely(executionId, requestId) } @@ -424,16 +432,69 @@ export async function executeWorkflowCore( iterationContext?: IterationContext, childWorkflowContext?: ChildWorkflowContext ) => { - await loggingSession.onBlockComplete(blockId, blockName, blockType, output) - if (onBlockComplete) { - await onBlockComplete( + try { + await loggingSession.onBlockComplete(blockId, blockName, blockType, output) + if (onBlockComplete) { + void onBlockComplete( + blockId, + blockName, + blockType, + output, + iterationContext, + childWorkflowContext + ).catch((error) => { + logger.warn(`[${requestId}] Block completion callback failed`, { + executionId, + blockId, + blockType, + error, + }) + }) + } + } catch (error) { + logger.warn(`[${requestId}] Block completion persistence failed`, { + executionId, + blockId, + blockType, + error, + }) + } + } + + const wrappedOnBlockStart = async ( + blockId: string, + blockName: string, + blockType: string, + executionOrder: number, + iterationContext?: IterationContext, + childWorkflowContext?: ChildWorkflowContext + ) => { + try { + await loggingSession.onBlockStart(blockId, blockName, blockType, new Date().toISOString()) + if (onBlockStart) { + void onBlockStart( + blockId, + blockName, + blockType, + executionOrder, + iterationContext, + childWorkflowContext + ).catch((error) => { + logger.warn(`[${requestId}] Block start callback failed`, { + executionId, + blockId, + blockType, + error, + }) + }) + } + } catch (error) { + logger.warn(`[${requestId}] Block start persistence failed`, { + executionId, blockId, - blockName, blockType, - output, - iterationContext, - childWorkflowContext - ) + error, + }) } } @@ -445,7 +506,7 @@ export async function executeWorkflowCore( userId, isDeployedContext: !metadata.isClientSession, enforceCredentialAccess: metadata.enforceCredentialAccess ?? false, - onBlockStart, + onBlockStart: wrappedOnBlockStart, onBlockComplete: wrappedOnBlockComplete, onStream, resumeFromSnapshot, From 83fc33a1cfdf9d2626e846eeed41cd68ee0c0d43 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Tue, 17 Mar 2026 17:38:30 -0700 Subject: [PATCH 5/6] feat(admin): add user search by email and ID, remove table border - Replace Load Users button with a live search input; query fires on any input - Email search uses listUsers with contains operator - User ID search (UUID format) uses admin.getUser directly for exact lookup - Remove outer border on user table that rendered white in dark mode - Reset pagination to page 0 on new search Co-Authored-By: Claude Sonnet 4.6 --- .../settings/components/admin/admin.tsx | 32 ++++----- apps/sim/hooks/queries/admin-users.ts | 65 ++++++++++++++----- 2 files changed, 60 insertions(+), 37 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/settings/components/admin/admin.tsx b/apps/sim/app/workspace/[workspaceId]/settings/components/admin/admin.tsx index 2e856c39f9c..2d02f7b1e25 100644 --- a/apps/sim/app/workspace/[workspaceId]/settings/components/admin/admin.tsx +++ b/apps/sim/app/workspace/[workspaceId]/settings/components/admin/admin.tsx @@ -31,7 +31,7 @@ export function Admin() { const [workflowId, setWorkflowId] = useState('') const [usersOffset, setUsersOffset] = useState(0) - const [usersEnabled, setUsersEnabled] = useState(false) + const [searchQuery, setSearchQuery] = useState('') const [banUserId, setBanUserId] = useState(null) const [banReason, setBanReason] = useState('') @@ -39,8 +39,7 @@ export function Admin() { data: usersData, isLoading: usersLoading, error: usersError, - refetch: refetchUsers, - } = useAdminUsers(usersOffset, PAGE_SIZE, usersEnabled) + } = useAdminUsers(usersOffset, PAGE_SIZE, searchQuery) const totalPages = useMemo( () => Math.ceil((usersData?.total ?? 0) / PAGE_SIZE), @@ -62,14 +61,6 @@ export function Admin() { ) } - const handleLoadUsers = () => { - if (usersEnabled) { - refetchUsers() - } else { - setUsersEnabled(true) - } - } - const pendingUserIds = useMemo(() => { const ids = new Set() if (setUserRole.isPending && (setUserRole.variables as { userId?: string })?.userId) @@ -136,12 +127,15 @@ export function Admin() {
-
-

User Management

- -
+

User Management

+ { + setSearchQuery(e.target.value) + setUsersOffset(0) + }} + placeholder='Search by email or paste a user ID...' + /> {usersError && (

@@ -166,7 +160,7 @@ export function Admin() { {usersData && ( <> -

+
Name Email @@ -176,7 +170,7 @@ export function Admin() {
{usersData.users.length === 0 && ( -
+
No users found.
)} diff --git a/apps/sim/hooks/queries/admin-users.ts b/apps/sim/hooks/queries/admin-users.ts index 69b74dfbaa4..6f9f6bba736 100644 --- a/apps/sim/hooks/queries/admin-users.ts +++ b/apps/sim/hooks/queries/admin-users.ts @@ -7,7 +7,8 @@ const logger = createLogger('AdminUsersQuery') export const adminUserKeys = { all: ['adminUsers'] as const, lists: () => [...adminUserKeys.all, 'list'] as const, - list: (offset: number, limit: number) => [...adminUserKeys.lists(), offset, limit] as const, + list: (offset: number, limit: number, searchQuery: string) => + [...adminUserKeys.lists(), offset, limit, searchQuery] as const, } interface AdminUser { @@ -24,31 +25,59 @@ interface AdminUsersResponse { total: number } -async function fetchAdminUsers(offset: number, limit: number): Promise { +const UUID_REGEX = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i + +function mapUser(u: { + id: string + name: string + email: string + role?: string | null + banned?: boolean | null + banReason?: string | null +}): AdminUser { + return { + id: u.id, + name: u.name || '', + email: u.email, + role: u.role ?? 'user', + banned: u.banned ?? false, + banReason: u.banReason ?? null, + } +} + +async function fetchAdminUsers( + offset: number, + limit: number, + searchQuery: string +): Promise { + if (UUID_REGEX.test(searchQuery.trim())) { + const { data, error } = await client.admin.getUser({ query: { id: searchQuery.trim() } }) + if (error) throw new Error(error.message ?? 'Failed to fetch user') + if (!data) return { users: [], total: 0 } + return { users: [mapUser(data)], total: 1 } + } + const { data, error } = await client.admin.listUsers({ - query: { limit, offset }, + query: { + limit, + offset, + searchField: 'email', + searchValue: searchQuery, + searchOperator: 'contains', + }, }) - if (error) { - throw new Error(error.message ?? 'Failed to fetch users') - } + if (error) throw new Error(error.message ?? 'Failed to fetch users') return { - users: (data?.users ?? []).map((u) => ({ - id: u.id, - name: u.name || '', - email: u.email, - role: u.role ?? 'user', - banned: u.banned ?? false, - banReason: u.banReason ?? null, - })), + users: (data?.users ?? []).map(mapUser), total: data?.total ?? 0, } } -export function useAdminUsers(offset: number, limit: number, enabled: boolean) { +export function useAdminUsers(offset: number, limit: number, searchQuery: string) { return useQuery({ - queryKey: adminUserKeys.list(offset, limit), - queryFn: () => fetchAdminUsers(offset, limit), - enabled, + queryKey: adminUserKeys.list(offset, limit, searchQuery), + queryFn: () => fetchAdminUsers(offset, limit, searchQuery), + enabled: searchQuery.length > 0, staleTime: 30 * 1000, placeholderData: keepPreviousData, }) From 8eb5e23532618c86aabc106b25dfc1882ab3fad3 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Tue, 17 Mar 2026 17:46:32 -0700 Subject: [PATCH 6/6] fix(admin): replace live search with explicit search button - Split searchInput (controlled input) from searchQuery (committed value) so the hook only fires on Search click or Enter, not every keystroke - Gate table render on searchQuery.length > 0 to prevent stale results showing after input is cleared Co-Authored-By: Claude Sonnet 4.6 --- .../settings/components/admin/admin.tsx | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/settings/components/admin/admin.tsx b/apps/sim/app/workspace/[workspaceId]/settings/components/admin/admin.tsx index 2d02f7b1e25..5161bb3c5d9 100644 --- a/apps/sim/app/workspace/[workspaceId]/settings/components/admin/admin.tsx +++ b/apps/sim/app/workspace/[workspaceId]/settings/components/admin/admin.tsx @@ -31,6 +31,7 @@ export function Admin() { const [workflowId, setWorkflowId] = useState('') const [usersOffset, setUsersOffset] = useState(0) + const [searchInput, setSearchInput] = useState('') const [searchQuery, setSearchQuery] = useState('') const [banUserId, setBanUserId] = useState(null) const [banReason, setBanReason] = useState('') @@ -41,6 +42,11 @@ export function Admin() { error: usersError, } = useAdminUsers(usersOffset, PAGE_SIZE, searchQuery) + const handleSearch = () => { + setUsersOffset(0) + setSearchQuery(searchInput.trim()) + } + const totalPages = useMemo( () => Math.ceil((usersData?.total ?? 0) / PAGE_SIZE), [usersData?.total] @@ -128,14 +134,17 @@ export function Admin() {

User Management

- { - setSearchQuery(e.target.value) - setUsersOffset(0) - }} - placeholder='Search by email or paste a user ID...' - /> +
+ setSearchInput(e.target.value)} + onKeyDown={(e) => e.key === 'Enter' && handleSearch()} + placeholder='Search by email or paste a user ID...' + /> + +
{usersError && (

@@ -158,7 +167,7 @@ export function Admin() {

)} - {usersData && ( + {searchQuery.length > 0 && usersData && ( <>