diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index 299c8f0f852..74312caab0b 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -132,7 +132,7 @@ function toDisplayAttachment(f: TaskStoredFileAttachment): ChatMessageAttachment media_type: f.media_type, size: f.size, previewUrl: f.media_type.startsWith('image/') - ? `/api/files/serve/${encodeURIComponent(f.key)}?context=copilot` + ? `/api/files/serve/${encodeURIComponent(f.key)}?context=mothership` : undefined, } } diff --git a/apps/sim/app/workspace/[workspaceId]/settings/components/admin/admin.tsx b/apps/sim/app/workspace/[workspaceId]/settings/components/admin/admin.tsx index 2e856c39f9c..5161bb3c5d9 100644 --- a/apps/sim/app/workspace/[workspaceId]/settings/components/admin/admin.tsx +++ b/apps/sim/app/workspace/[workspaceId]/settings/components/admin/admin.tsx @@ -31,7 +31,8 @@ export function Admin() { const [workflowId, setWorkflowId] = useState('') const [usersOffset, setUsersOffset] = useState(0) - const [usersEnabled, setUsersEnabled] = useState(false) + const [searchInput, setSearchInput] = useState('') + const [searchQuery, setSearchQuery] = useState('') const [banUserId, setBanUserId] = useState(null) const [banReason, setBanReason] = useState('') @@ -39,8 +40,12 @@ export function Admin() { data: usersData, isLoading: usersLoading, error: usersError, - refetch: refetchUsers, - } = useAdminUsers(usersOffset, PAGE_SIZE, usersEnabled) + } = useAdminUsers(usersOffset, PAGE_SIZE, searchQuery) + + const handleSearch = () => { + setUsersOffset(0) + setSearchQuery(searchInput.trim()) + } const totalPages = useMemo( () => Math.ceil((usersData?.total ?? 0) / PAGE_SIZE), @@ -62,14 +67,6 @@ export function Admin() { ) } - const handleLoadUsers = () => { - if (usersEnabled) { - refetchUsers() - } else { - setUsersEnabled(true) - } - } - const pendingUserIds = useMemo(() => { const ids = new Set() if (setUserRole.isPending && (setUserRole.variables as { userId?: string })?.userId) @@ -136,10 +133,16 @@ export function Admin() {
-
-

User Management

-
@@ -164,9 +167,9 @@ export function Admin() {
)} - {usersData && ( + {searchQuery.length > 0 && usersData && ( <> -
+
Name Email @@ -176,7 +179,7 @@ export function Admin() {
{usersData.users.length === 0 && ( -
+
No users found.
)} diff --git a/apps/sim/executor/execution/block-executor.ts b/apps/sim/executor/execution/block-executor.ts index 9c54d2bd993..5680ee4e3cf 100644 --- a/apps/sim/executor/execution/block-executor.ts +++ b/apps/sim/executor/execution/block-executor.ts @@ -77,7 +77,7 @@ export class BlockExecutor { if (!isSentinel) { blockLog = this.createBlockLog(ctx, node.id, block, node) ctx.blockLogs.push(blockLog) - this.callOnBlockStart(ctx, node, block, blockLog.executionOrder) + await this.callOnBlockStart(ctx, node, block, blockLog.executionOrder) } const startTime = performance.now() @@ -105,7 +105,7 @@ export class BlockExecutor { } } catch (error) { cleanupSelfReference?.() - return this.handleBlockError( + return await this.handleBlockError( error, ctx, node, @@ -179,7 +179,7 @@ export class BlockExecutor { const displayOutput = filterOutputForLog(block.metadata?.id || '', normalizedOutput, { block, }) - this.callOnBlockComplete( + await this.callOnBlockComplete( ctx, node, block, @@ -195,7 +195,7 @@ export class BlockExecutor { return normalizedOutput } catch (error) { - return this.handleBlockError( + return await this.handleBlockError( error, ctx, node, @@ -226,7 +226,7 @@ export class BlockExecutor { return this.blockHandlers.find((h) => h.canHandle(block)) } - private handleBlockError( + private async handleBlockError( error: unknown, ctx: ExecutionContext, node: DAGNode, @@ -236,7 +236,7 @@ export class BlockExecutor { resolvedInputs: Record, isSentinel: boolean, phase: 'input_resolution' | 'execution' - ): NormalizedBlockOutput { + ): Promise { const duration = performance.now() - startTime const errorMessage = normalizeError(error) const hasResolvedInputs = @@ -287,7 +287,7 @@ export class BlockExecutor { ? error.childWorkflowInstanceId : undefined const displayOutput = filterOutputForLog(block.metadata?.id || '', errorOutput, { block }) - this.callOnBlockComplete( + await this.callOnBlockComplete( ctx, node, block, @@ -439,12 +439,12 @@ export class BlockExecutor { return redactApiKeys(result) } - private callOnBlockStart( + private async callOnBlockStart( ctx: ExecutionContext, node: DAGNode, block: SerializedBlock, executionOrder: number - ): void { + ): Promise { const blockId = node.metadata?.originalBlockId ?? node.id const blockName = block.metadata?.name ?? blockId const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE @@ -452,18 +452,26 @@ export class BlockExecutor { const iterationContext = getIterationContext(ctx, node?.metadata) if (this.contextExtensions.onBlockStart) { - this.contextExtensions.onBlockStart( - blockId, - blockName, - blockType, - executionOrder, - iterationContext, - ctx.childWorkflowContext - ) + try { + await this.contextExtensions.onBlockStart( + blockId, + blockName, + blockType, + executionOrder, + iterationContext, + ctx.childWorkflowContext + ) + } catch (error) { + logger.warn('Block start callback failed', { + blockId, + blockType, + error: error instanceof Error ? error.message : String(error), + }) + } } } - private callOnBlockComplete( + private async callOnBlockComplete( ctx: ExecutionContext, node: DAGNode, block: SerializedBlock, @@ -474,7 +482,7 @@ export class BlockExecutor { executionOrder: number, endedAt: string, childWorkflowInstanceId?: string - ): void { + ): Promise { const blockId = node.metadata?.originalBlockId ?? node.id const blockName = block.metadata?.name ?? blockId const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE @@ -482,22 +490,30 @@ export class BlockExecutor { const iterationContext = getIterationContext(ctx, node?.metadata) if (this.contextExtensions.onBlockComplete) { - this.contextExtensions.onBlockComplete( - blockId, - blockName, - blockType, - { - input, - output, - executionTime: duration, - startedAt, - executionOrder, - endedAt, - childWorkflowInstanceId, - }, - iterationContext, - ctx.childWorkflowContext - ) + try { + await this.contextExtensions.onBlockComplete( + blockId, + blockName, + blockType, + { + input, + output, + executionTime: duration, + startedAt, + executionOrder, + endedAt, + childWorkflowInstanceId, + }, + iterationContext, + ctx.childWorkflowContext + ) + } catch (error) { + logger.warn('Block completion callback failed', { + blockId, + blockType, + error: error instanceof Error ? error.message : String(error), + }) + } } } diff --git a/apps/sim/executor/orchestrators/loop.ts b/apps/sim/executor/orchestrators/loop.ts index 039203c068a..0ea42a137e8 100644 --- a/apps/sim/executor/orchestrators/loop.ts +++ b/apps/sim/executor/orchestrators/loop.ts @@ -51,7 +51,7 @@ export class LoopOrchestrator { private edgeManager: EdgeManager | null = null ) {} - initializeLoopScope(ctx: ExecutionContext, loopId: string): LoopScope { + async initializeLoopScope(ctx: ExecutionContext, loopId: string): Promise { const loopConfig = this.dag.loopConfigs.get(loopId) as SerializedLoop | undefined if (!loopConfig) { throw new Error(`Loop config not found: ${loopId}`) @@ -76,7 +76,7 @@ export class LoopOrchestrator { ) if (iterationError) { logger.error(iterationError, { loopId, requestedIterations }) - this.addLoopErrorLog(ctx, loopId, loopType, iterationError, { + await this.addLoopErrorLog(ctx, loopId, loopType, iterationError, { iterations: requestedIterations, }) scope.maxIterations = 0 @@ -99,7 +99,7 @@ export class LoopOrchestrator { } catch (error) { const errorMessage = `ForEach loop resolution failed: ${error instanceof Error ? error.message : String(error)}` logger.error(errorMessage, { loopId, forEachItems: loopConfig.forEachItems }) - this.addLoopErrorLog(ctx, loopId, loopType, errorMessage, { + await this.addLoopErrorLog(ctx, loopId, loopType, errorMessage, { forEachItems: loopConfig.forEachItems, }) scope.items = [] @@ -117,7 +117,7 @@ export class LoopOrchestrator { ) if (sizeError) { logger.error(sizeError, { loopId, collectionSize: items.length }) - this.addLoopErrorLog(ctx, loopId, loopType, sizeError, { + await this.addLoopErrorLog(ctx, loopId, loopType, sizeError, { forEachItems: loopConfig.forEachItems, collectionSize: items.length, }) @@ -155,7 +155,7 @@ export class LoopOrchestrator { ) if (iterationError) { logger.error(iterationError, { loopId, requestedIterations }) - this.addLoopErrorLog(ctx, loopId, loopType, iterationError, { + await this.addLoopErrorLog(ctx, loopId, loopType, iterationError, { iterations: requestedIterations, }) scope.maxIterations = 0 @@ -182,14 +182,14 @@ export class LoopOrchestrator { return scope } - private addLoopErrorLog( + private async addLoopErrorLog( ctx: ExecutionContext, loopId: string, loopType: string, errorMessage: string, inputData?: any - ): void { - addSubflowErrorLog( + ): Promise { + await addSubflowErrorLog( ctx, loopId, 'loop', @@ -238,7 +238,7 @@ export class LoopOrchestrator { } if (isCancelled) { logger.info('Loop execution cancelled', { loopId, iteration: scope.iteration }) - return this.createExitResult(ctx, loopId, scope) + return await this.createExitResult(ctx, loopId, scope) } const iterationResults: NormalizedBlockOutput[] = [] @@ -253,7 +253,7 @@ export class LoopOrchestrator { scope.currentIterationOutputs.clear() if (!(await this.evaluateCondition(ctx, scope, scope.iteration + 1))) { - return this.createExitResult(ctx, loopId, scope) + return await this.createExitResult(ctx, loopId, scope) } scope.iteration++ @@ -269,11 +269,11 @@ export class LoopOrchestrator { } } - private createExitResult( + private async createExitResult( ctx: ExecutionContext, loopId: string, scope: LoopScope - ): LoopContinuationResult { + ): Promise { const results = scope.allIterationOutputs const output = { results } this.state.setBlockOutput(loopId, output, DEFAULTS.EXECUTION_TIME) @@ -282,19 +282,26 @@ export class LoopOrchestrator { const now = new Date().toISOString() const iterationContext = buildContainerIterationContext(ctx, loopId) - this.contextExtensions.onBlockComplete( - loopId, - 'Loop', - 'loop', - { - output, - executionTime: DEFAULTS.EXECUTION_TIME, - startedAt: now, - executionOrder: getNextExecutionOrder(ctx), - endedAt: now, - }, - iterationContext - ) + try { + await this.contextExtensions.onBlockComplete( + loopId, + 'Loop', + 'loop', + { + output, + executionTime: DEFAULTS.EXECUTION_TIME, + startedAt: now, + executionOrder: getNextExecutionOrder(ctx), + endedAt: now, + }, + iterationContext + ) + } catch (error) { + logger.warn('Loop completion callback failed', { + loopId, + error: error instanceof Error ? error.message : String(error), + }) + } } return { @@ -597,7 +604,7 @@ export class LoopOrchestrator { if (!scope.items || scope.items.length === 0) { logger.info('ForEach loop has empty collection, skipping loop body', { loopId }) this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME) - emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions) + await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions) return false } return true @@ -607,7 +614,7 @@ export class LoopOrchestrator { if (scope.maxIterations === 0) { logger.info('For loop has 0 iterations, skipping loop body', { loopId }) this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME) - emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions) + await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions) return false } return true @@ -621,7 +628,7 @@ export class LoopOrchestrator { if (!scope.condition) { logger.warn('No condition defined for while loop', { loopId }) this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME) - emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions) + await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions) return false } @@ -634,7 +641,7 @@ export class LoopOrchestrator { if (!result) { this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME) - emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions) + await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions) } return result diff --git a/apps/sim/executor/orchestrators/node.ts b/apps/sim/executor/orchestrators/node.ts index 862f7c1a2e9..7ae57a555c9 100644 --- a/apps/sim/executor/orchestrators/node.ts +++ b/apps/sim/executor/orchestrators/node.ts @@ -53,14 +53,14 @@ export class NodeExecutionOrchestrator { const loopId = node.metadata.loopId if (loopId && !this.loopOrchestrator.getLoopScope(ctx, loopId)) { - this.loopOrchestrator.initializeLoopScope(ctx, loopId) + await this.loopOrchestrator.initializeLoopScope(ctx, loopId) } const parallelId = node.metadata.parallelId if (parallelId && !this.parallelOrchestrator.getParallelScope(ctx, parallelId)) { const parallelConfig = this.dag.parallelConfigs.get(parallelId) const nodesInParallel = parallelConfig?.nodes?.length || 1 - this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel) + await this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel) } if (node.metadata.isSentinel) { @@ -92,7 +92,7 @@ export class NodeExecutionOrchestrator { const isParallelSentinel = node.metadata.isParallelSentinel if (isParallelSentinel) { - return this.handleParallelSentinel(ctx, node, sentinelType, parallelId) + return await this.handleParallelSentinel(ctx, node, sentinelType, parallelId) } switch (sentinelType) { @@ -142,12 +142,12 @@ export class NodeExecutionOrchestrator { } } - private handleParallelSentinel( + private async handleParallelSentinel( ctx: ExecutionContext, node: DAGNode, sentinelType: string | undefined, parallelId: string | undefined - ): NormalizedBlockOutput { + ): Promise { if (!parallelId) { logger.warn('Parallel sentinel called without parallelId') return {} @@ -158,7 +158,7 @@ export class NodeExecutionOrchestrator { const parallelConfig = this.dag.parallelConfigs.get(parallelId) if (parallelConfig) { const nodesInParallel = parallelConfig.nodes?.length || 1 - this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel) + await this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel) } } @@ -176,7 +176,7 @@ export class NodeExecutionOrchestrator { } if (sentinelType === 'end') { - const result = this.parallelOrchestrator.aggregateParallelResults(ctx, parallelId) + const result = await this.parallelOrchestrator.aggregateParallelResults(ctx, parallelId) return { results: result.results || [], sentinelEnd: true, @@ -210,7 +210,7 @@ export class NodeExecutionOrchestrator { } else if (isParallelBranch) { const parallelId = this.findParallelIdForNode(node.id) if (parallelId) { - this.handleParallelNodeCompletion(ctx, node, output, parallelId) + await this.handleParallelNodeCompletion(ctx, node, output, parallelId) } else { this.handleRegularNodeCompletion(ctx, node, output) } @@ -229,17 +229,17 @@ export class NodeExecutionOrchestrator { this.state.setBlockOutput(node.id, output) } - private handleParallelNodeCompletion( + private async handleParallelNodeCompletion( ctx: ExecutionContext, node: DAGNode, output: NormalizedBlockOutput, parallelId: string - ): void { + ): Promise { const scope = this.parallelOrchestrator.getParallelScope(ctx, parallelId) if (!scope) { const parallelConfig = this.dag.parallelConfigs.get(parallelId) const nodesInParallel = parallelConfig?.nodes?.length || 1 - this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel) + await this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel) } const allComplete = this.parallelOrchestrator.handleParallelBranchCompletion( ctx, @@ -248,7 +248,7 @@ export class NodeExecutionOrchestrator { output ) if (allComplete) { - this.parallelOrchestrator.aggregateParallelResults(ctx, parallelId) + await this.parallelOrchestrator.aggregateParallelResults(ctx, parallelId) } this.state.setBlockOutput(node.id, output) diff --git a/apps/sim/executor/orchestrators/parallel.test.ts b/apps/sim/executor/orchestrators/parallel.test.ts new file mode 100644 index 00000000000..34b2c011f9e --- /dev/null +++ b/apps/sim/executor/orchestrators/parallel.test.ts @@ -0,0 +1,142 @@ +/** + * @vitest-environment node + */ +import { beforeEach, describe, expect, it, vi } from 'vitest' +import type { DAG } from '@/executor/dag/builder' +import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types' +import { ParallelOrchestrator } from '@/executor/orchestrators/parallel' +import type { ExecutionContext } from '@/executor/types' + +vi.mock('@sim/logger', () => ({ + createLogger: () => ({ + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }), +})) + +function createDag(): DAG { + return { + nodes: new Map(), + loopConfigs: new Map(), + parallelConfigs: new Map([ + [ + 'parallel-1', + { + id: 'parallel-1', + nodes: ['task-1'], + distribution: [], + parallelType: 'collection', + }, + ], + ]), + } +} + +function createState(): BlockStateWriter { + return { + setBlockOutput: vi.fn(), + setBlockState: vi.fn(), + deleteBlockState: vi.fn(), + unmarkExecuted: vi.fn(), + } +} + +function createContext(overrides: Partial = {}): ExecutionContext { + return { + workflowId: 'workflow-1', + workspaceId: 'workspace-1', + executionId: 'execution-1', + userId: 'user-1', + blockStates: new Map(), + executedBlocks: new Set(), + blockLogs: [], + metadata: { duration: 0 }, + environmentVariables: {}, + decisions: { + router: new Map(), + condition: new Map(), + }, + completedLoops: new Set(), + activeExecutionPath: new Set(), + workflow: { + version: '1', + blocks: [ + { + id: 'parallel-1', + position: { x: 0, y: 0 }, + config: { tool: '', params: {} }, + inputs: {}, + outputs: {}, + metadata: { id: 'parallel', name: 'Parallel 1' }, + enabled: true, + }, + ], + connections: [], + loops: {}, + parallels: {}, + }, + ...overrides, + } +} + +describe('ParallelOrchestrator', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + it('awaits empty-subflow lifecycle callbacks before returning the empty scope', async () => { + let releaseStart: (() => void) | undefined + const onBlockStart = vi.fn( + () => + new Promise((resolve) => { + releaseStart = resolve + }) + ) + const onBlockComplete = vi.fn() + const contextExtensions: ContextExtensions = { + onBlockStart, + onBlockComplete, + } + const orchestrator = new ParallelOrchestrator( + createDag(), + createState(), + null, + contextExtensions + ) + const ctx = createContext() + + const initializePromise = orchestrator.initializeParallelScope(ctx, 'parallel-1', 1) + await Promise.resolve() + + expect(onBlockStart).toHaveBeenCalledTimes(1) + expect(onBlockComplete).not.toHaveBeenCalled() + + releaseStart?.() + const scope = await initializePromise + + expect(onBlockComplete).toHaveBeenCalledTimes(1) + expect(scope.isEmpty).toBe(true) + }) + + it('swallows helper callback failures on empty parallel paths', async () => { + const contextExtensions: ContextExtensions = { + onBlockStart: vi.fn().mockRejectedValue(new Error('start failed')), + onBlockComplete: vi.fn().mockRejectedValue(new Error('complete failed')), + } + const orchestrator = new ParallelOrchestrator( + createDag(), + createState(), + null, + contextExtensions + ) + + await expect( + orchestrator.initializeParallelScope(createContext(), 'parallel-1', 1) + ).resolves.toMatchObject({ + parallelId: 'parallel-1', + isEmpty: true, + }) + }) +}) diff --git a/apps/sim/executor/orchestrators/parallel.ts b/apps/sim/executor/orchestrators/parallel.ts index ed098066bc0..23f7dede964 100644 --- a/apps/sim/executor/orchestrators/parallel.ts +++ b/apps/sim/executor/orchestrators/parallel.ts @@ -47,11 +47,11 @@ export class ParallelOrchestrator { private contextExtensions: ContextExtensions | null = null ) {} - initializeParallelScope( + async initializeParallelScope( ctx: ExecutionContext, parallelId: string, terminalNodesCount = 1 - ): ParallelScope { + ): Promise { const parallelConfig = this.dag.parallelConfigs.get(parallelId) if (!parallelConfig) { throw new Error(`Parallel config not found: ${parallelId}`) @@ -69,7 +69,7 @@ export class ParallelOrchestrator { } catch (error) { const errorMessage = `Parallel Items did not resolve: ${error instanceof Error ? error.message : String(error)}` logger.error(errorMessage, { parallelId, distribution: parallelConfig.distribution }) - this.addParallelErrorLog(ctx, parallelId, errorMessage, { + await this.addParallelErrorLog(ctx, parallelId, errorMessage, { distribution: parallelConfig.distribution, }) this.setErrorScope(ctx, parallelId, errorMessage) @@ -83,7 +83,7 @@ export class ParallelOrchestrator { ) if (branchError) { logger.error(branchError, { parallelId, branchCount }) - this.addParallelErrorLog(ctx, parallelId, branchError, { + await this.addParallelErrorLog(ctx, parallelId, branchError, { distribution: parallelConfig.distribution, branchCount, }) @@ -109,7 +109,7 @@ export class ParallelOrchestrator { this.state.setBlockOutput(parallelId, { results: [] }) - emitEmptySubflowEvents(ctx, parallelId, 'parallel', this.contextExtensions) + await emitEmptySubflowEvents(ctx, parallelId, 'parallel', this.contextExtensions) logger.info('Parallel scope initialized with empty distribution, skipping body', { parallelId, @@ -220,13 +220,13 @@ export class ParallelOrchestrator { return { branchCount: items.length, items } } - private addParallelErrorLog( + private async addParallelErrorLog( ctx: ExecutionContext, parallelId: string, errorMessage: string, inputData?: any - ): void { - addSubflowErrorLog( + ): Promise { + await addSubflowErrorLog( ctx, parallelId, 'parallel', @@ -291,7 +291,10 @@ export class ParallelOrchestrator { return allComplete } - aggregateParallelResults(ctx: ExecutionContext, parallelId: string): ParallelAggregationResult { + async aggregateParallelResults( + ctx: ExecutionContext, + parallelId: string + ): Promise { const scope = ctx.parallelExecutions?.get(parallelId) if (!scope) { logger.error('Parallel scope not found for aggregation', { parallelId }) @@ -316,19 +319,26 @@ export class ParallelOrchestrator { const now = new Date().toISOString() const iterationContext = buildContainerIterationContext(ctx, parallelId) - this.contextExtensions.onBlockComplete( - parallelId, - 'Parallel', - 'parallel', - { - output, - executionTime: 0, - startedAt: now, - executionOrder: getNextExecutionOrder(ctx), - endedAt: now, - }, - iterationContext - ) + try { + await this.contextExtensions.onBlockComplete( + parallelId, + 'Parallel', + 'parallel', + { + output, + executionTime: 0, + startedAt: now, + executionOrder: getNextExecutionOrder(ctx), + endedAt: now, + }, + iterationContext + ) + } catch (error) { + logger.warn('Parallel completion callback failed', { + parallelId, + error: error instanceof Error ? error.message : String(error), + }) + } } return { diff --git a/apps/sim/executor/utils/subflow-utils.ts b/apps/sim/executor/utils/subflow-utils.ts index 977be2788c9..98391442a78 100644 --- a/apps/sim/executor/utils/subflow-utils.ts +++ b/apps/sim/executor/utils/subflow-utils.ts @@ -1,9 +1,12 @@ +import { createLogger } from '@sim/logger' import { DEFAULTS, LOOP, PARALLEL, REFERENCE } from '@/executor/constants' import type { ContextExtensions } from '@/executor/execution/types' import { type BlockLog, type ExecutionContext, getNextExecutionOrder } from '@/executor/types' import { buildContainerIterationContext } from '@/executor/utils/iteration-context' import type { VariableResolver } from '@/executor/variables/resolver' +const logger = createLogger('SubflowUtils') + const BRANCH_PATTERN = new RegExp(`${PARALLEL.BRANCH.PREFIX}\\d+${PARALLEL.BRANCH.SUFFIX}$`) const BRANCH_INDEX_PATTERN = new RegExp(`${PARALLEL.BRANCH.PREFIX}(\\d+)${PARALLEL.BRANCH.SUFFIX}$`) const LOOP_SENTINEL_START_PATTERN = new RegExp( @@ -265,14 +268,14 @@ export function resolveArrayInput( /** * Creates and logs an error for a subflow (loop or parallel). */ -export function addSubflowErrorLog( +export async function addSubflowErrorLog( ctx: ExecutionContext, blockId: string, blockType: 'loop' | 'parallel', errorMessage: string, inputData: Record, contextExtensions: ContextExtensions | null -): void { +): Promise { const now = new Date().toISOString() const execOrder = getNextExecutionOrder(ctx) @@ -296,18 +299,34 @@ export function addSubflowErrorLog( ctx.blockLogs.push(blockLog) if (contextExtensions?.onBlockStart) { - contextExtensions.onBlockStart(blockId, blockName, blockType, execOrder) + try { + await contextExtensions.onBlockStart(blockId, blockName, blockType, execOrder) + } catch (error) { + logger.warn('Subflow error start callback failed', { + blockId, + blockType, + error: error instanceof Error ? error.message : String(error), + }) + } } if (contextExtensions?.onBlockComplete) { - contextExtensions.onBlockComplete(blockId, blockName, blockType, { - input: inputData, - output: { error: errorMessage }, - executionTime: 0, - startedAt: now, - executionOrder: execOrder, - endedAt: now, - }) + try { + await contextExtensions.onBlockComplete(blockId, blockName, blockType, { + input: inputData, + output: { error: errorMessage }, + executionTime: 0, + startedAt: now, + executionOrder: execOrder, + endedAt: now, + }) + } catch (error) { + logger.warn('Subflow error completion callback failed', { + blockId, + blockType, + error: error instanceof Error ? error.message : String(error), + }) + } } } @@ -316,12 +335,12 @@ export function addSubflowErrorLog( * empty collection or false initial condition. This ensures the container block * appears in terminal logs, execution snapshots, and edge highlighting. */ -export function emitEmptySubflowEvents( +export async function emitEmptySubflowEvents( ctx: ExecutionContext, blockId: string, blockType: 'loop' | 'parallel', contextExtensions: ContextExtensions | null -): void { +): Promise { const now = new Date().toISOString() const executionOrder = getNextExecutionOrder(ctx) const output = { results: [] } @@ -342,22 +361,38 @@ export function emitEmptySubflowEvents( }) if (contextExtensions?.onBlockStart) { - contextExtensions.onBlockStart(blockId, blockName, blockType, executionOrder) + try { + await contextExtensions.onBlockStart(blockId, blockName, blockType, executionOrder) + } catch (error) { + logger.warn('Empty subflow start callback failed', { + blockId, + blockType, + error: error instanceof Error ? error.message : String(error), + }) + } } if (contextExtensions?.onBlockComplete) { - contextExtensions.onBlockComplete( - blockId, - blockName, - blockType, - { - output, - executionTime: DEFAULTS.EXECUTION_TIME, - startedAt: now, - executionOrder, - endedAt: now, - }, - iterationContext - ) + try { + await contextExtensions.onBlockComplete( + blockId, + blockName, + blockType, + { + output, + executionTime: DEFAULTS.EXECUTION_TIME, + startedAt: now, + executionOrder, + endedAt: now, + }, + iterationContext + ) + } catch (error) { + logger.warn('Empty subflow completion callback failed', { + blockId, + blockType, + error: error instanceof Error ? error.message : String(error), + }) + } } } diff --git a/apps/sim/hooks/queries/admin-users.ts b/apps/sim/hooks/queries/admin-users.ts index 69b74dfbaa4..6f9f6bba736 100644 --- a/apps/sim/hooks/queries/admin-users.ts +++ b/apps/sim/hooks/queries/admin-users.ts @@ -7,7 +7,8 @@ const logger = createLogger('AdminUsersQuery') export const adminUserKeys = { all: ['adminUsers'] as const, lists: () => [...adminUserKeys.all, 'list'] as const, - list: (offset: number, limit: number) => [...adminUserKeys.lists(), offset, limit] as const, + list: (offset: number, limit: number, searchQuery: string) => + [...adminUserKeys.lists(), offset, limit, searchQuery] as const, } interface AdminUser { @@ -24,31 +25,59 @@ interface AdminUsersResponse { total: number } -async function fetchAdminUsers(offset: number, limit: number): Promise { +const UUID_REGEX = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i + +function mapUser(u: { + id: string + name: string + email: string + role?: string | null + banned?: boolean | null + banReason?: string | null +}): AdminUser { + return { + id: u.id, + name: u.name || '', + email: u.email, + role: u.role ?? 'user', + banned: u.banned ?? false, + banReason: u.banReason ?? null, + } +} + +async function fetchAdminUsers( + offset: number, + limit: number, + searchQuery: string +): Promise { + if (UUID_REGEX.test(searchQuery.trim())) { + const { data, error } = await client.admin.getUser({ query: { id: searchQuery.trim() } }) + if (error) throw new Error(error.message ?? 'Failed to fetch user') + if (!data) return { users: [], total: 0 } + return { users: [mapUser(data)], total: 1 } + } + const { data, error } = await client.admin.listUsers({ - query: { limit, offset }, + query: { + limit, + offset, + searchField: 'email', + searchValue: searchQuery, + searchOperator: 'contains', + }, }) - if (error) { - throw new Error(error.message ?? 'Failed to fetch users') - } + if (error) throw new Error(error.message ?? 'Failed to fetch users') return { - users: (data?.users ?? []).map((u) => ({ - id: u.id, - name: u.name || '', - email: u.email, - role: u.role ?? 'user', - banned: u.banned ?? false, - banReason: u.banReason ?? null, - })), + users: (data?.users ?? []).map(mapUser), total: data?.total ?? 0, } } -export function useAdminUsers(offset: number, limit: number, enabled: boolean) { +export function useAdminUsers(offset: number, limit: number, searchQuery: string) { return useQuery({ - queryKey: adminUserKeys.list(offset, limit), - queryFn: () => fetchAdminUsers(offset, limit), - enabled, + queryKey: adminUserKeys.list(offset, limit, searchQuery), + queryFn: () => fetchAdminUsers(offset, limit, searchQuery), + enabled: searchQuery.length > 0, staleTime: 30 * 1000, placeholderData: keepPreviousData, }) diff --git a/apps/sim/hooks/queries/workflows.ts b/apps/sim/hooks/queries/workflows.ts index 97d3df0b2b5..cc9e7b0b696 100644 --- a/apps/sim/hooks/queries/workflows.ts +++ b/apps/sim/hooks/queries/workflows.ts @@ -113,10 +113,15 @@ export function useWorkflows( }) useEffect(() => { - if (syncRegistry && scope === 'active' && workspaceId && query.status === 'pending') { + if ( + syncRegistry && + scope === 'active' && + workspaceId && + (query.status === 'pending' || query.isPlaceholderData) + ) { beginMetadataLoad(workspaceId) } - }, [syncRegistry, scope, workspaceId, query.status, beginMetadataLoad]) + }, [syncRegistry, scope, workspaceId, query.status, query.isPlaceholderData, beginMetadataLoad]) useEffect(() => { if ( @@ -124,11 +129,20 @@ export function useWorkflows( scope === 'active' && workspaceId && query.status === 'success' && - query.data + query.data && + !query.isPlaceholderData ) { completeMetadataLoad(workspaceId, query.data) } - }, [syncRegistry, scope, workspaceId, query.status, query.data, completeMetadataLoad]) + }, [ + syncRegistry, + scope, + workspaceId, + query.status, + query.data, + query.isPlaceholderData, + completeMetadataLoad, + ]) useEffect(() => { if (syncRegistry && scope === 'active' && workspaceId && query.status === 'error') { diff --git a/apps/sim/hooks/queries/workspace.ts b/apps/sim/hooks/queries/workspace.ts index a3eb1bf5e6d..b4722aca3af 100644 --- a/apps/sim/hooks/queries/workspace.ts +++ b/apps/sim/hooks/queries/workspace.ts @@ -86,14 +86,9 @@ export function useCreateWorkspace() { const data = await response.json() return data.workspace as Workspace }, - onSuccess: (data) => { - queryClient.invalidateQueries({ queryKey: workspaceKeys.all }) - if (data?.id) { - queryClient.removeQueries({ queryKey: workspaceKeys.detail(data.id) }) - queryClient.removeQueries({ queryKey: workspaceKeys.settings(data.id) }) - queryClient.removeQueries({ queryKey: workspaceKeys.permissions(data.id) }) - queryClient.removeQueries({ queryKey: workspaceKeys.members(data.id) }) - } + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: workspaceKeys.lists() }) + queryClient.invalidateQueries({ queryKey: workspaceKeys.adminLists() }) }, }) } diff --git a/apps/sim/lib/copilot/client-sse/handlers.ts b/apps/sim/lib/copilot/client-sse/handlers.ts index 0330853d69c..7705e1cf3dd 100644 --- a/apps/sim/lib/copilot/client-sse/handlers.ts +++ b/apps/sim/lib/copilot/client-sse/handlers.ts @@ -3,6 +3,7 @@ import { STREAM_STORAGE_KEY } from '@/lib/copilot/constants' import { asRecord } from '@/lib/copilot/orchestrator/sse/utils' import type { SSEEvent } from '@/lib/copilot/orchestrator/types' import { + abortAllInProgressTools, isBackgroundState, isRejectedState, isReviewState, @@ -956,7 +957,7 @@ export const sseHandlers: Record = { })) context.streamComplete = true }, - stream_end: (_data, context, _get, set) => { + stream_end: (_data, context, get, set) => { if (context.pendingContent) { if (context.isInThinkingBlock && context.currentThinkingBlock) { appendThinkingContent(context, context.pendingContent) @@ -967,6 +968,7 @@ export const sseHandlers: Record = { } finalizeThinkingBlock(context) updateStreamingMessage(set, context) + abortAllInProgressTools(set, get) }, default: () => {}, } diff --git a/apps/sim/lib/core/security/csp.ts b/apps/sim/lib/core/security/csp.ts index b1148998dd4..c5dde9ef41d 100644 --- a/apps/sim/lib/core/security/csp.ts +++ b/apps/sim/lib/core/security/csp.ts @@ -202,15 +202,28 @@ export function getWorkflowExecutionCSPPolicy(): string { } /** - * CSP for embeddable form pages + * Shared CSP for embeddable pages (chat, forms) * Allows embedding in iframes from any origin while maintaining other security policies */ -export function getFormEmbedCSPPolicy(): string { - const basePolicy = buildCSPString({ +function getEmbedCSPPolicy(): string { + return buildCSPString({ ...buildTimeCSPDirectives, 'frame-ancestors': ['*'], }) - return basePolicy +} + +/** + * CSP for embeddable chat pages + */ +export function getChatEmbedCSPPolicy(): string { + return getEmbedCSPPolicy() +} + +/** + * CSP for embeddable form pages + */ +export function getFormEmbedCSPPolicy(): string { + return getEmbedCSPPolicy() } /** diff --git a/apps/sim/lib/logs/execution/logger.test.ts b/apps/sim/lib/logs/execution/logger.test.ts index a1bd9962d8d..7cfdb6ea9e6 100644 --- a/apps/sim/lib/logs/execution/logger.test.ts +++ b/apps/sim/lib/logs/execution/logger.test.ts @@ -1,6 +1,6 @@ import { databaseMock, loggerMock } from '@sim/testing' import { beforeEach, describe, expect, test, vi } from 'vitest' -import { ExecutionLogger } from './logger' +import { ExecutionLogger } from '@/lib/logs/execution/logger' vi.mock('@sim/db', () => databaseMock) @@ -112,7 +112,7 @@ describe('ExecutionLogger', () => { expect(typeof logger.getWorkflowExecution).toBe('function') }) - test('preserves start correlation data when execution completes', () => { + test('preserves correlation and diagnostics when execution completes', () => { const loggerInstance = new ExecutionLogger() as any const completedData = loggerInstance.buildCompletedExecutionData({ @@ -140,9 +140,24 @@ describe('ExecutionLogger', () => { }, }, }, + lastStartedBlock: { + blockId: 'block-start', + blockName: 'Start', + blockType: 'agent', + startedAt: '2025-01-01T00:00:00.000Z', + }, + lastCompletedBlock: { + blockId: 'block-end', + blockName: 'Finish', + blockType: 'api', + endedAt: '2025-01-01T00:00:05.000Z', + success: true, + }, }, traceSpans: [], finalOutput: { ok: true }, + finalizationPath: 'completed', + completionFailure: 'fallback failure', executionCost: { tokens: { input: 0, output: 0, total: 0 }, models: {}, @@ -161,6 +176,12 @@ describe('ExecutionLogger', () => { }) expect(completedData.correlation).toEqual(completedData.trigger?.data?.correlation) expect(completedData.finalOutput).toEqual({ ok: true }) + expect(completedData.lastStartedBlock?.blockId).toBe('block-start') + expect(completedData.lastCompletedBlock?.blockId).toBe('block-end') + expect(completedData.finalizationPath).toBe('completed') + expect(completedData.completionFailure).toBe('fallback failure') + expect(completedData.hasTraceSpans).toBe(false) + expect(completedData.traceSpanCount).toBe(0) }) }) diff --git a/apps/sim/lib/logs/execution/logger.ts b/apps/sim/lib/logs/execution/logger.ts index 64e0df01175..a7c8458ac78 100644 --- a/apps/sim/lib/logs/execution/logger.ts +++ b/apps/sim/lib/logs/execution/logger.ts @@ -27,6 +27,7 @@ import { snapshotService } from '@/lib/logs/execution/snapshot/service' import type { BlockOutputData, ExecutionEnvironment, + ExecutionFinalizationPath, ExecutionTrigger, ExecutionLoggerService as IExecutionLoggerService, TraceSpan, @@ -49,11 +50,21 @@ export interface ToolCall { const logger = createLogger('ExecutionLogger') +function countTraceSpans(traceSpans?: TraceSpan[]): number { + if (!Array.isArray(traceSpans) || traceSpans.length === 0) { + return 0 + } + + return traceSpans.reduce((count, span) => count + 1 + countTraceSpans(span.children), 0) +} + export class ExecutionLogger implements IExecutionLoggerService { private buildCompletedExecutionData(params: { existingExecutionData?: WorkflowExecutionLog['executionData'] traceSpans?: TraceSpan[] finalOutput: BlockOutputData + finalizationPath?: ExecutionFinalizationPath + completionFailure?: string executionCost: { tokens: { input: number @@ -64,7 +75,16 @@ export class ExecutionLogger implements IExecutionLoggerService { } executionState?: SerializableExecutionState }): WorkflowExecutionLog['executionData'] { - const { existingExecutionData, traceSpans, finalOutput, executionCost, executionState } = params + const { + existingExecutionData, + traceSpans, + finalOutput, + finalizationPath, + completionFailure, + executionCost, + executionState, + } = params + const traceSpanCount = countTraceSpans(traceSpans) return { ...(existingExecutionData?.environment @@ -78,6 +98,17 @@ export class ExecutionLogger implements IExecutionLoggerService { existingExecutionData?.trigger?.data?.correlation, } : {}), + ...(existingExecutionData?.error ? { error: existingExecutionData.error } : {}), + ...(existingExecutionData?.lastStartedBlock + ? { lastStartedBlock: existingExecutionData.lastStartedBlock } + : {}), + ...(existingExecutionData?.lastCompletedBlock + ? { lastCompletedBlock: existingExecutionData.lastCompletedBlock } + : {}), + ...(completionFailure ? { completionFailure } : {}), + ...(finalizationPath ? { finalizationPath } : {}), + hasTraceSpans: traceSpanCount > 0, + traceSpanCount, traceSpans, finalOutput, tokens: { @@ -173,6 +204,8 @@ export class ExecutionLogger implements IExecutionLoggerService { environment, trigger, ...(trigger.data?.correlation ? { correlation: trigger.data.correlation } : {}), + hasTraceSpans: false, + traceSpanCount: 0, }, cost: { total: BASE_EXECUTION_CHARGE, @@ -232,6 +265,8 @@ export class ExecutionLogger implements IExecutionLoggerService { traceSpans?: TraceSpan[] workflowInput?: any executionState?: SerializableExecutionState + finalizationPath?: ExecutionFinalizationPath + completionFailure?: string isResume?: boolean level?: 'info' | 'error' status?: 'completed' | 'failed' | 'cancelled' | 'pending' @@ -245,6 +280,8 @@ export class ExecutionLogger implements IExecutionLoggerService { traceSpans, workflowInput, executionState, + finalizationPath, + completionFailure, isResume, level: levelOverride, status: statusOverride, @@ -315,6 +352,16 @@ export class ExecutionLogger implements IExecutionLoggerService { ? Math.max(0, Math.round(rawDurationMs)) : 0 + const completedExecutionData = this.buildCompletedExecutionData({ + existingExecutionData, + traceSpans: redactedTraceSpans, + finalOutput: redactedFinalOutput, + finalizationPath, + completionFailure, + executionCost, + executionState, + }) + const [updatedLog] = await db .update(workflowExecutionLogs) .set({ @@ -323,13 +370,7 @@ export class ExecutionLogger implements IExecutionLoggerService { endedAt: new Date(endedAt), totalDurationMs: totalDuration, files: executionFiles.length > 0 ? executionFiles : null, - executionData: this.buildCompletedExecutionData({ - existingExecutionData, - traceSpans: redactedTraceSpans, - finalOutput: redactedFinalOutput, - executionCost, - executionState, - }), + executionData: completedExecutionData, cost: executionCost, }) .where(eq(workflowExecutionLogs.executionId, executionId)) diff --git a/apps/sim/lib/logs/execution/logging-session.test.ts b/apps/sim/lib/logs/execution/logging-session.test.ts index 2f9bd2370f2..a44bc5e72d5 100644 --- a/apps/sim/lib/logs/execution/logging-session.test.ts +++ b/apps/sim/lib/logs/execution/logging-session.test.ts @@ -1,11 +1,48 @@ import { beforeEach, describe, expect, it, vi } from 'vitest' +const dbMocks = vi.hoisted(() => { + const selectLimit = vi.fn() + const selectWhere = vi.fn() + const selectFrom = vi.fn() + const select = vi.fn() + const updateWhere = vi.fn() + const updateSet = vi.fn() + const update = vi.fn() + const execute = vi.fn() + const eq = vi.fn() + const sql = vi.fn((strings: TemplateStringsArray, ...values: unknown[]) => ({ strings, values })) + + select.mockReturnValue({ from: selectFrom }) + selectFrom.mockReturnValue({ where: selectWhere }) + selectWhere.mockReturnValue({ limit: selectLimit }) + + update.mockReturnValue({ set: updateSet }) + updateSet.mockReturnValue({ where: updateWhere }) + + return { + select, + selectFrom, + selectWhere, + selectLimit, + update, + updateSet, + updateWhere, + execute, + eq, + sql, + } +}) + const { completeWorkflowExecutionMock } = vi.hoisted(() => ({ completeWorkflowExecutionMock: vi.fn(), })) vi.mock('@sim/db', () => ({ - db: {}, + db: { + select: dbMocks.select, + update: dbMocks.update, + execute: dbMocks.execute, + }, })) vi.mock('@sim/db/schema', () => ({ @@ -22,8 +59,8 @@ vi.mock('@sim/logger', () => ({ })) vi.mock('drizzle-orm', () => ({ - eq: vi.fn(), - sql: vi.fn(), + eq: dbMocks.eq, + sql: dbMocks.sql, })) vi.mock('@/lib/logs/execution/logger', () => ({ @@ -56,6 +93,9 @@ import { LoggingSession } from './logging-session' describe('LoggingSession completion retries', () => { beforeEach(() => { vi.clearAllMocks() + dbMocks.selectLimit.mockResolvedValue([{ executionData: {} }]) + dbMocks.updateWhere.mockResolvedValue(undefined) + dbMocks.execute.mockResolvedValue(undefined) }) it('keeps completion best-effort when a later error completion retries after full completion and fallback both fail', async () => { @@ -86,7 +126,6 @@ describe('LoggingSession completion retries', () => { .mockRejectedValueOnce(new Error('cost only failed')) await expect(session.safeComplete({ finalOutput: { ok: true } })).resolves.toBeUndefined() - await expect(session.safeComplete({ finalOutput: { ok: true } })).resolves.toBeUndefined() expect(completeWorkflowExecutionMock).toHaveBeenCalledTimes(2) @@ -118,6 +157,64 @@ describe('LoggingSession completion retries', () => { expect(session.hasCompleted()).toBe(true) }) + it('preserves successful final output during fallback completion', async () => { + const session = new LoggingSession('workflow-1', 'execution-5', 'api', 'req-1') + + completeWorkflowExecutionMock + .mockRejectedValueOnce(new Error('success finalize failed')) + .mockResolvedValueOnce({}) + + await expect( + session.safeComplete({ finalOutput: { ok: true, stage: 'done' } }) + ).resolves.toBeUndefined() + + expect(completeWorkflowExecutionMock).toHaveBeenLastCalledWith( + expect.objectContaining({ + executionId: 'execution-5', + finalOutput: { ok: true, stage: 'done' }, + finalizationPath: 'fallback_completed', + }) + ) + }) + + it('preserves accumulated cost during fallback completion', async () => { + const session = new LoggingSession('workflow-1', 'execution-6', 'api', 'req-1') as any + + session.accumulatedCost = { + total: 12, + input: 5, + output: 7, + tokens: { input: 11, output: 13, total: 24 }, + models: { + 'test-model': { + input: 5, + output: 7, + total: 12, + tokens: { input: 11, output: 13, total: 24 }, + }, + }, + } + session.costFlushed = true + + completeWorkflowExecutionMock + .mockRejectedValueOnce(new Error('success finalize failed')) + .mockResolvedValueOnce({}) + + await expect(session.safeComplete({ finalOutput: { ok: true } })).resolves.toBeUndefined() + + expect(completeWorkflowExecutionMock).toHaveBeenLastCalledWith( + expect.objectContaining({ + executionId: 'execution-6', + costSummary: expect.objectContaining({ + totalCost: 12, + totalInputCost: 5, + totalOutputCost: 7, + totalTokens: 24, + }), + }) + ) + }) + it('persists failed error semantics when completeWithError receives non-error trace spans', async () => { const session = new LoggingSession('workflow-1', 'execution-4', 'api', 'req-1') const traceSpans = [ @@ -148,6 +245,8 @@ describe('LoggingSession completion retries', () => { traceSpans, level: 'error', status: 'failed', + finalizationPath: 'force_failed', + completionFailure: 'persist me as failed', }) ) }) @@ -196,4 +295,168 @@ describe('LoggingSession completion retries', () => { expect(session.hasCompleted()).toBe(true) expect(completeWorkflowExecutionMock).toHaveBeenCalledTimes(2) }) + + it('persists last started block independently from cost accumulation', async () => { + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') + + await session.onBlockStart('block-1', 'Fetch', 'api', '2025-01-01T00:00:00.000Z') + + expect(dbMocks.select).not.toHaveBeenCalled() + expect(dbMocks.execute).toHaveBeenCalledTimes(1) + }) + + it('enforces started marker monotonicity in the database write path', async () => { + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') + + await session.onBlockStart('block-1', 'Fetch', 'api', '2025-01-01T00:00:00.000Z') + + expect(dbMocks.sql).toHaveBeenCalled() + expect(dbMocks.execute).toHaveBeenCalledTimes(1) + }) + + it('allows same-millisecond started markers to replace the prior marker', async () => { + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') + + await session.onBlockStart('block-1', 'Fetch', 'api', '2025-01-01T00:00:00.000Z') + + const queryCall = dbMocks.sql.mock.calls.at(-1) + expect(queryCall).toBeDefined() + + const [query] = queryCall! + expect(Array.from(query).join(' ')).toContain('<=') + }) + + it('persists last completed block for zero-cost outputs', async () => { + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') + + await session.onBlockComplete('block-2', 'Transform', 'function', { + endedAt: '2025-01-01T00:00:01.000Z', + output: { value: true }, + }) + + expect(dbMocks.select).not.toHaveBeenCalled() + expect(dbMocks.execute).toHaveBeenCalledTimes(1) + }) + + it('allows same-millisecond completed markers to replace the prior marker', async () => { + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') + + await session.onBlockComplete('block-2', 'Transform', 'function', { + endedAt: '2025-01-01T00:00:01.000Z', + output: { value: true }, + }) + + const queryCall = dbMocks.sql.mock.calls.at(-1) + expect(queryCall).toBeDefined() + + const [query] = queryCall! + expect(Array.from(query).join(' ')).toContain('<=') + }) + + it('drains pending lifecycle writes before terminal completion', async () => { + let releasePersist: (() => void) | undefined + const persistPromise = new Promise((resolve) => { + releasePersist = resolve + }) + + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') as any + session.persistLastStartedBlock = vi.fn(() => persistPromise) + session.complete = vi.fn().mockResolvedValue(undefined) + + const startPromise = session.onBlockStart('block-1', 'Fetch', 'api', '2025-01-01T00:00:00.000Z') + const completionPromise = session.safeComplete({ finalOutput: { ok: true } }) + + await Promise.resolve() + + expect(session.complete).not.toHaveBeenCalled() + + releasePersist?.() + + await startPromise + await completionPromise + + expect(session.persistLastStartedBlock).toHaveBeenCalledTimes(1) + expect(session.complete).toHaveBeenCalledTimes(1) + }) + + it('drains fire-and-forget cost flushes before terminal completion', async () => { + let releaseFlush: (() => void) | undefined + const flushPromise = new Promise((resolve) => { + releaseFlush = resolve + }) + + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') as any + session.flushAccumulatedCost = vi.fn(() => flushPromise) + session.complete = vi.fn().mockResolvedValue(undefined) + + await session.onBlockComplete('block-2', 'Transform', 'function', { + endedAt: '2025-01-01T00:00:01.000Z', + output: { value: true }, + cost: { total: 1, input: 1, output: 0 }, + tokens: { input: 1, output: 0, total: 1 }, + model: 'test-model', + }) + + const completionPromise = session.safeComplete({ finalOutput: { ok: true } }) + + await Promise.resolve() + + expect(session.complete).not.toHaveBeenCalled() + + releaseFlush?.() + + await completionPromise + + expect(session.flushAccumulatedCost).toHaveBeenCalledTimes(1) + expect(session.complete).toHaveBeenCalledTimes(1) + }) + + it('keeps draining when new progress writes arrive during drain', async () => { + let releaseFirst: (() => void) | undefined + let releaseSecond: (() => void) | undefined + const firstPromise = new Promise((resolve) => { + releaseFirst = resolve + }) + const secondPromise = new Promise((resolve) => { + releaseSecond = resolve + }) + + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') as any + + void session.trackProgressWrite(firstPromise) + + const drainPromise = session.drainPendingProgressWrites() + + await Promise.resolve() + + void session.trackProgressWrite(secondPromise) + releaseFirst?.() + + await Promise.resolve() + + let drained = false + void drainPromise.then(() => { + drained = true + }) + + await Promise.resolve() + expect(drained).toBe(false) + + releaseSecond?.() + await drainPromise + + expect(session.pendingProgressWrites.size).toBe(0) + }) + + it('marks pause completion as terminal and prevents duplicate pause finalization', async () => { + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') as any + session.completeExecutionWithFinalization = vi.fn().mockResolvedValue(undefined) + + await session.completeWithPause({ workflowInput: { ok: true } }) + await session.completeWithPause({ workflowInput: { ok: true } }) + + expect(session.completeExecutionWithFinalization).toHaveBeenCalledTimes(1) + expect(session.completed).toBe(true) + expect(session.completing).toBe(true) + }) }) diff --git a/apps/sim/lib/logs/execution/logging-session.ts b/apps/sim/lib/logs/execution/logging-session.ts index 4cd4220d5bf..7fb72bc1d73 100644 --- a/apps/sim/lib/logs/execution/logging-session.ts +++ b/apps/sim/lib/logs/execution/logging-session.ts @@ -13,6 +13,9 @@ import { } from '@/lib/logs/execution/logging-factory' import type { ExecutionEnvironment, + ExecutionFinalizationPath, + ExecutionLastCompletedBlock, + ExecutionLastStartedBlock, ExecutionTrigger, TraceSpan, WorkflowState, @@ -23,6 +26,46 @@ type TriggerData = Record & { correlation?: NonNullable['correlation'] } +function buildStartedMarkerPersistenceQuery(params: { + executionId: string + marker: ExecutionLastStartedBlock +}) { + const markerJson = JSON.stringify(params.marker) + + return sql`UPDATE workflow_execution_logs + SET execution_data = jsonb_set( + COALESCE(execution_data, '{}'::jsonb), + '{lastStartedBlock}', + ${markerJson}::jsonb, + true + ) + WHERE execution_id = ${params.executionId} + AND COALESCE( + jsonb_extract_path_text(COALESCE(execution_data, '{}'::jsonb), 'lastStartedBlock', 'startedAt'), + '' + ) <= ${params.marker.startedAt}` +} + +function buildCompletedMarkerPersistenceQuery(params: { + executionId: string + marker: ExecutionLastCompletedBlock +}) { + const markerJson = JSON.stringify(params.marker) + + return sql`UPDATE workflow_execution_logs + SET execution_data = jsonb_set( + COALESCE(execution_data, '{}'::jsonb), + '{lastCompletedBlock}', + ${markerJson}::jsonb, + true + ) + WHERE execution_id = ${params.executionId} + AND COALESCE( + jsonb_extract_path_text(COALESCE(execution_data, '{}'::jsonb), 'lastCompletedBlock', 'endedAt'), + '' + ) <= ${params.marker.endedAt}` +} + const logger = createLogger('LoggingSession') type CompletionAttempt = 'complete' | 'error' | 'cancelled' | 'paused' @@ -109,6 +152,7 @@ export class LoggingSession { tokens: { input: 0, output: 0, total: 0 }, models: {}, } + private pendingProgressWrites = new Set>() private costFlushed = false private postExecutionPromise: Promise | null = null @@ -124,12 +168,132 @@ export class LoggingSession { this.requestId = requestId } + async onBlockStart( + blockId: string, + blockName: string, + blockType: string, + startedAt: string + ): Promise { + await this.trackProgressWrite( + this.persistLastStartedBlock({ + blockId, + blockName, + blockType, + startedAt, + }) + ) + } + + private async persistLastStartedBlock(marker: ExecutionLastStartedBlock): Promise { + try { + await db.execute( + buildStartedMarkerPersistenceQuery({ + executionId: this.executionId, + marker, + }) + ) + } catch (error) { + logger.error(`Failed to persist last started block for execution ${this.executionId}:`, { + error: error instanceof Error ? error.message : String(error), + }) + } + } + + private async persistLastCompletedBlock(marker: ExecutionLastCompletedBlock): Promise { + try { + await db.execute( + buildCompletedMarkerPersistenceQuery({ + executionId: this.executionId, + marker, + }) + ) + } catch (error) { + logger.error(`Failed to persist last completed block for execution ${this.executionId}:`, { + error: error instanceof Error ? error.message : String(error), + }) + } + } + + private async trackProgressWrite(writePromise: Promise): Promise { + this.pendingProgressWrites.add(writePromise) + + try { + await writePromise + } finally { + this.pendingProgressWrites.delete(writePromise) + } + } + + private async drainPendingProgressWrites(): Promise { + while (this.pendingProgressWrites.size > 0) { + await Promise.allSettled(Array.from(this.pendingProgressWrites)) + } + } + + private async completeExecutionWithFinalization(params: { + endedAt: string + totalDurationMs: number + costSummary: { + totalCost: number + totalInputCost: number + totalOutputCost: number + totalTokens: number + totalPromptTokens: number + totalCompletionTokens: number + baseExecutionCharge: number + modelCost: number + models: Record< + string, + { + input: number + output: number + total: number + tokens: { input: number; output: number; total: number } + } + > + } + finalOutput: Record + traceSpans: TraceSpan[] + workflowInput?: unknown + executionState?: SerializableExecutionState + finalizationPath: ExecutionFinalizationPath + completionFailure?: string + level?: 'info' | 'error' + status?: 'completed' | 'failed' | 'cancelled' | 'pending' + }): Promise { + await executionLogger.completeWorkflowExecution({ + executionId: this.executionId, + endedAt: params.endedAt, + totalDurationMs: params.totalDurationMs, + costSummary: params.costSummary, + finalOutput: params.finalOutput, + traceSpans: params.traceSpans, + workflowInput: params.workflowInput, + executionState: params.executionState, + finalizationPath: params.finalizationPath, + completionFailure: params.completionFailure, + isResume: this.isResume, + level: params.level, + status: params.status, + }) + } + async onBlockComplete( blockId: string, blockName: string, blockType: string, output: any ): Promise { + await this.trackProgressWrite( + this.persistLastCompletedBlock({ + blockId, + blockName, + blockType, + endedAt: output?.endedAt || new Date().toISOString(), + success: !output?.output?.error, + }) + ) + if (!output?.cost || typeof output.cost.total !== 'number' || output.cost.total <= 0) { return } @@ -165,7 +329,7 @@ export class LoggingSession { } } - await this.flushAccumulatedCost() + void this.trackProgressWrite(this.flushAccumulatedCost()) } private async flushAccumulatedCost(): Promise { @@ -276,8 +440,7 @@ export class LoggingSession { const endTime = endedAt || new Date().toISOString() const duration = totalDurationMs || 0 - await executionLogger.completeWorkflowExecution({ - executionId: this.executionId, + await this.completeExecutionWithFinalization({ endedAt: endTime, totalDurationMs: duration, costSummary, @@ -285,7 +448,7 @@ export class LoggingSession { traceSpans: traceSpans || [], workflowInput, executionState, - isResume: this.isResume, + finalizationPath: 'completed', }) this.completed = true @@ -403,8 +566,7 @@ export class LoggingSession { const spans = hasProvidedSpans ? traceSpans : [errorSpan] - await executionLogger.completeWorkflowExecution({ - executionId: this.executionId, + await this.completeExecutionWithFinalization({ endedAt: endTime.toISOString(), totalDurationMs: Math.max(1, durationMs), costSummary, @@ -412,6 +574,8 @@ export class LoggingSession { traceSpans: spans, level: 'error', status: 'failed', + finalizationPath: 'force_failed', + completionFailure: message, }) this.completed = true @@ -490,13 +654,13 @@ export class LoggingSession { models: {}, } - await executionLogger.completeWorkflowExecution({ - executionId: this.executionId, + await this.completeExecutionWithFinalization({ endedAt: endTime.toISOString(), totalDurationMs: Math.max(1, durationMs), costSummary, finalOutput: { cancelled: true }, traceSpans: traceSpans || [], + finalizationPath: 'cancelled', status: 'cancelled', }) @@ -577,14 +741,14 @@ export class LoggingSession { models: {}, } - await executionLogger.completeWorkflowExecution({ - executionId: this.executionId, + await this.completeExecutionWithFinalization({ endedAt: endTime.toISOString(), totalDurationMs: Math.max(1, durationMs), costSummary, finalOutput: { paused: true }, traceSpans: traceSpans || [], workflowInput, + finalizationPath: 'paused', status: 'pending', }) @@ -746,7 +910,6 @@ export class LoggingSession { this.completionAttemptFailed = true throw error }) - return this.completionPromise } @@ -756,6 +919,7 @@ export class LoggingSession { private async _safeCompleteImpl(params: SessionCompleteParams = {}): Promise { try { + await this.drainPendingProgressWrites() await this.complete(params) } catch (error) { const errorMsg = error instanceof Error ? error.message : String(error) @@ -769,6 +933,8 @@ export class LoggingSession { totalDurationMs: params.totalDurationMs, errorMessage: `Failed to store trace spans: ${errorMsg}`, isError: false, + finalizationPath: 'fallback_completed', + finalOutput: params.finalOutput || {}, }) } } @@ -779,6 +945,7 @@ export class LoggingSession { private async _safeCompleteWithErrorImpl(params?: SessionErrorCompleteParams): Promise { try { + await this.drainPendingProgressWrites() await this.completeWithError(params) } catch (error) { const errorMsg = error instanceof Error ? error.message : String(error) @@ -793,6 +960,10 @@ export class LoggingSession { errorMessage: params?.error?.message || `Execution failed to store trace spans: ${errorMsg}`, isError: true, + finalizationPath: 'force_failed', + finalOutput: { + error: params?.error?.message || `Execution failed to store trace spans: ${errorMsg}`, + }, status: 'failed', }) } @@ -806,6 +977,7 @@ export class LoggingSession { private async _safeCompleteWithCancellationImpl(params?: SessionCancelledParams): Promise { try { + await this.drainPendingProgressWrites() await this.completeWithCancellation(params) } catch (error) { const errorMsg = error instanceof Error ? error.message : String(error) @@ -819,6 +991,8 @@ export class LoggingSession { totalDurationMs: params?.totalDurationMs, errorMessage: 'Execution was cancelled', isError: false, + finalizationPath: 'cancelled', + finalOutput: { cancelled: true }, status: 'cancelled', }) } @@ -830,6 +1004,7 @@ export class LoggingSession { private async _safeCompleteWithPauseImpl(params?: SessionPausedParams): Promise { try { + await this.drainPendingProgressWrites() await this.completeWithPause(params) } catch (error) { const errorMsg = error instanceof Error ? error.message : String(error) @@ -843,6 +1018,8 @@ export class LoggingSession { totalDurationMs: params?.totalDurationMs, errorMessage: 'Execution paused but failed to store full trace spans', isError: false, + finalizationPath: 'paused', + finalOutput: { paused: true }, status: 'pending', }) } @@ -867,12 +1044,16 @@ export class LoggingSession { status: 'failed', executionData: sql`jsonb_set( jsonb_set( - COALESCE(execution_data, '{}'::jsonb), - ARRAY['error'], - to_jsonb(${message}::text) + jsonb_set( + COALESCE(execution_data, '{}'::jsonb), + ARRAY['error'], + to_jsonb(${message}::text) + ), + ARRAY['finalOutput'], + jsonb_build_object('error', ${message}::text) ), - ARRAY['finalOutput'], - jsonb_build_object('error', ${message}::text) + ARRAY['finalizationPath'], + to_jsonb('force_failed'::text) )`, }) .where(eq(workflowExecutionLogs.executionId, executionId)) @@ -891,6 +1072,8 @@ export class LoggingSession { totalDurationMs?: number errorMessage: string isError: boolean + finalizationPath: ExecutionFinalizationPath + finalOutput?: Record status?: 'completed' | 'failed' | 'cancelled' | 'pending' }): Promise { if (this.completed || this.completing) { @@ -903,28 +1086,48 @@ export class LoggingSession { ) try { - const costSummary = params.traceSpans?.length - ? calculateCostSummary(params.traceSpans) - : { - totalCost: BASE_EXECUTION_CHARGE, - totalInputCost: 0, - totalOutputCost: 0, - totalTokens: 0, - totalPromptTokens: 0, - totalCompletionTokens: 0, + const hasAccumulatedCost = + this.costFlushed || + this.accumulatedCost.total > BASE_EXECUTION_CHARGE || + this.accumulatedCost.tokens.total > 0 || + Object.keys(this.accumulatedCost.models).length > 0 + + const costSummary = hasAccumulatedCost + ? { + totalCost: this.accumulatedCost.total, + totalInputCost: this.accumulatedCost.input, + totalOutputCost: this.accumulatedCost.output, + totalTokens: this.accumulatedCost.tokens.total, + totalPromptTokens: this.accumulatedCost.tokens.input, + totalCompletionTokens: this.accumulatedCost.tokens.output, baseExecutionCharge: BASE_EXECUTION_CHARGE, - modelCost: 0, - models: {}, + modelCost: Math.max(0, this.accumulatedCost.total - BASE_EXECUTION_CHARGE), + models: this.accumulatedCost.models, } + : params.traceSpans?.length + ? calculateCostSummary(params.traceSpans) + : { + totalCost: BASE_EXECUTION_CHARGE, + totalInputCost: 0, + totalOutputCost: 0, + totalTokens: 0, + totalPromptTokens: 0, + totalCompletionTokens: 0, + baseExecutionCharge: BASE_EXECUTION_CHARGE, + modelCost: 0, + models: {}, + } - await executionLogger.completeWorkflowExecution({ - executionId: this.executionId, + const finalOutput = params.finalOutput || { _fallback: true, error: params.errorMessage } + + await this.completeExecutionWithFinalization({ endedAt: params.endedAt || new Date().toISOString(), totalDurationMs: params.totalDurationMs || 0, costSummary, - finalOutput: { _fallback: true, error: params.errorMessage }, + finalOutput, traceSpans: [], - isResume: this.isResume, + finalizationPath: params.finalizationPath, + completionFailure: params.errorMessage, level: params.isError ? 'error' : 'info', status: params.status, }) diff --git a/apps/sim/lib/logs/types.ts b/apps/sim/lib/logs/types.ts index c785c3037db..b9402583704 100644 --- a/apps/sim/lib/logs/types.ts +++ b/apps/sim/lib/logs/types.ts @@ -71,6 +71,31 @@ export interface ExecutionStatus { durationMs?: number } +export const EXECUTION_FINALIZATION_PATHS = [ + 'completed', + 'fallback_completed', + 'force_failed', + 'cancelled', + 'paused', +] as const + +export type ExecutionFinalizationPath = (typeof EXECUTION_FINALIZATION_PATHS)[number] + +export interface ExecutionLastStartedBlock { + blockId: string + blockName: string + blockType: string + startedAt: string +} + +export interface ExecutionLastCompletedBlock { + blockId: string + blockName: string + blockType: string + endedAt: string + success: boolean +} + export interface WorkflowExecutionSnapshot { id: string workflowId: string | null @@ -105,6 +130,13 @@ export interface WorkflowExecutionLog { environment?: ExecutionEnvironment trigger?: ExecutionTrigger correlation?: AsyncExecutionCorrelation + error?: string + lastStartedBlock?: ExecutionLastStartedBlock + lastCompletedBlock?: ExecutionLastCompletedBlock + hasTraceSpans?: boolean + traceSpanCount?: number + completionFailure?: string + finalizationPath?: ExecutionFinalizationPath traceSpans?: TraceSpan[] tokens?: { input?: number; output?: number; total?: number } models?: Record< @@ -378,6 +410,9 @@ export interface ExecutionLoggerService { finalOutput: BlockOutputData traceSpans?: TraceSpan[] workflowInput?: any + executionState?: SerializableExecutionState + finalizationPath?: ExecutionFinalizationPath + completionFailure?: string isResume?: boolean level?: 'info' | 'error' status?: 'completed' | 'failed' | 'cancelled' | 'pending' diff --git a/apps/sim/lib/uploads/contexts/workspace/workspace-file-manager.ts b/apps/sim/lib/uploads/contexts/workspace/workspace-file-manager.ts index 9c3dd16ec03..4d18638d3ff 100644 --- a/apps/sim/lib/uploads/contexts/workspace/workspace-file-manager.ts +++ b/apps/sim/lib/uploads/contexts/workspace/workspace-file-manager.ts @@ -209,8 +209,9 @@ export async function uploadWorkspaceFile( /** * Track a file that was already uploaded to workspace S3 as a chat-scoped upload. - * Creates a workspaceFiles record with context='mothership' and the given chatId. - * No S3 operations -- the file is already in storage from the presigned/upload step. + * Links the existing workspaceFiles metadata record (created by the storage service + * during upload) to the chat by setting chatId and context='mothership'. + * Falls back to inserting a new record if none exists for the key. */ export async function trackChatUpload( workspaceId: string, @@ -221,6 +222,17 @@ export async function trackChatUpload( contentType: string, size: number ): Promise { + const updated = await db + .update(workspaceFiles) + .set({ chatId, context: 'mothership' }) + .where(and(eq(workspaceFiles.key, s3Key), eq(workspaceFiles.workspaceId, workspaceId), isNull(workspaceFiles.deletedAt))) + .returning({ id: workspaceFiles.id }) + + if (updated.length > 0) { + logger.info(`Linked existing file record to chat: ${fileName} for chat ${chatId}`) + return + } + const fileId = `wf_${Date.now()}_${Math.random().toString(36).substring(2, 9)}` await db.insert(workspaceFiles).values({ diff --git a/apps/sim/lib/workflows/executor/execution-core.test.ts b/apps/sim/lib/workflows/executor/execution-core.test.ts index bbb702a11ae..048ba62492a 100644 --- a/apps/sim/lib/workflows/executor/execution-core.test.ts +++ b/apps/sim/lib/workflows/executor/execution-core.test.ts @@ -16,6 +16,8 @@ const { buildTraceSpansMock, serializeWorkflowMock, executorExecuteMock, + onBlockStartPersistenceMock, + executorConstructorMock, } = vi.hoisted(() => ({ loadWorkflowFromNormalizedTablesMock: vi.fn(), loadDeployedWorkflowStateMock: vi.fn(), @@ -32,6 +34,8 @@ const { buildTraceSpansMock: vi.fn(), serializeWorkflowMock: vi.fn(), executorExecuteMock: vi.fn(), + onBlockStartPersistenceMock: vi.fn(), + executorConstructorMock: vi.fn(), })) vi.mock('@sim/logger', () => ({ @@ -79,10 +83,13 @@ vi.mock('@/lib/workflows/utils', () => ({ })) vi.mock('@/executor', () => ({ - Executor: vi.fn().mockImplementation(() => ({ - execute: executorExecuteMock, - executeFromBlock: executorExecuteMock, - })), + Executor: vi.fn().mockImplementation((args) => { + executorConstructorMock(args) + return { + execute: executorExecuteMock, + executeFromBlock: executorExecuteMock, + } + }), })) vi.mock('@/serializer', () => ({ @@ -105,6 +112,8 @@ describe('executeWorkflowCore terminal finalization sequencing', () => { safeCompleteWithCancellation: safeCompleteWithCancellationMock, safeCompleteWithPause: safeCompleteWithPauseMock, hasCompleted: hasCompletedMock, + onBlockStart: onBlockStartPersistenceMock, + onBlockComplete: vi.fn(), setPostExecutionPromise: vi.fn(), waitForPostExecution: vi.fn().mockResolvedValue(undefined), } @@ -176,10 +185,72 @@ describe('executeWorkflowCore terminal finalization sequencing', () => { safeCompleteWithCancellationMock.mockResolvedValue(undefined) safeCompleteWithPauseMock.mockResolvedValue(undefined) hasCompletedMock.mockReturnValue(true) + onBlockStartPersistenceMock.mockResolvedValue(undefined) updateWorkflowRunCountsMock.mockResolvedValue(undefined) clearExecutionCancellationMock.mockResolvedValue(undefined) }) + it('routes onBlockStart through logging session persistence path', async () => { + executorExecuteMock.mockResolvedValue({ + success: true, + status: 'completed', + output: { done: true }, + logs: [], + metadata: { duration: 123, startTime: 'start', endTime: 'end' }, + }) + + await executeWorkflowCore({ + snapshot: createSnapshot() as any, + callbacks: { + onBlockStart: async (blockId) => { + expect(blockId).toBe('block-1') + }, + }, + loggingSession: loggingSession as any, + }) + + const contextExtensions = executorConstructorMock.mock.calls[0]?.[0]?.contextExtensions + await contextExtensions.onBlockStart('block-1', 'Fetch', 'api', 1) + + expect(onBlockStartPersistenceMock).toHaveBeenCalledWith( + 'block-1', + 'Fetch', + 'api', + expect.any(String) + ) + }) + + it('does not await user block start callback after persistence completes', async () => { + let releaseCallback: (() => void) | undefined + const callbackPromise = new Promise((resolve) => { + releaseCallback = resolve + }) + + executorExecuteMock.mockResolvedValue({ + success: true, + status: 'completed', + output: { done: true }, + logs: [], + metadata: { duration: 123, startTime: 'start', endTime: 'end' }, + }) + + await executeWorkflowCore({ + snapshot: createSnapshot() as any, + callbacks: { + onBlockStart: vi.fn(() => callbackPromise), + }, + loggingSession: loggingSession as any, + }) + + const contextExtensions = executorConstructorMock.mock.calls[0]?.[0]?.contextExtensions + + await expect( + contextExtensions.onBlockStart('block-1', 'Fetch', 'api', 1) + ).resolves.toBeUndefined() + + releaseCallback?.() + }) + it('awaits terminal completion before updating run counts and returning', async () => { const callOrder: string[] = [] @@ -222,7 +293,57 @@ describe('executeWorkflowCore terminal finalization sequencing', () => { ]) }) - it('clears cancellation even when success finalization throws', async () => { + it('awaits wrapped lifecycle persistence before terminal finalization returns', async () => { + let releaseBlockStart: (() => void) | undefined + const blockStartPromise = new Promise((resolve) => { + releaseBlockStart = resolve + }) + const callOrder: string[] = [] + + onBlockStartPersistenceMock.mockImplementation(async () => { + callOrder.push('persist:start') + await blockStartPromise + callOrder.push('persist:end') + }) + + safeCompleteMock.mockImplementation(async () => { + callOrder.push('safeComplete') + }) + + executorExecuteMock.mockImplementation(async () => { + const contextExtensions = executorConstructorMock.mock.calls[0]?.[0]?.contextExtensions + const startLifecycle = contextExtensions.onBlockStart('block-1', 'Fetch', 'api', 1) + await Promise.resolve() + callOrder.push('executor:before-release') + releaseBlockStart?.() + await startLifecycle + callOrder.push('executor:after-start') + + return { + success: true, + status: 'completed', + output: { done: true }, + logs: [], + metadata: { duration: 123, startTime: 'start', endTime: 'end' }, + } + }) + + await executeWorkflowCore({ + snapshot: createSnapshot() as any, + callbacks: {}, + loggingSession: loggingSession as any, + }) + + expect(callOrder).toEqual([ + 'persist:start', + 'executor:before-release', + 'persist:end', + 'executor:after-start', + 'safeComplete', + ]) + }) + + it('preserves successful execution when success finalization throws', async () => { executorExecuteMock.mockResolvedValue({ success: true, status: 'completed', @@ -244,7 +365,7 @@ describe('executeWorkflowCore terminal finalization sequencing', () => { expect(result.status).toBe('completed') expect(clearExecutionCancellationMock).toHaveBeenCalledWith('execution-1') - expect(updateWorkflowRunCountsMock).not.toHaveBeenCalled() + expect(updateWorkflowRunCountsMock).toHaveBeenCalledWith('workflow-1') }) it('routes cancelled executions through safeCompleteWithCancellation', async () => { @@ -304,6 +425,61 @@ describe('executeWorkflowCore terminal finalization sequencing', () => { expect(updateWorkflowRunCountsMock).not.toHaveBeenCalled() }) + it('swallows wrapped block start callback failures without breaking execution', async () => { + onBlockStartPersistenceMock.mockRejectedValue(new Error('start persistence failed')) + + executorExecuteMock.mockImplementation(async () => { + const contextExtensions = executorConstructorMock.mock.calls[0]?.[0]?.contextExtensions + await contextExtensions.onBlockStart('block-1', 'Fetch', 'api', 1) + + return { + success: true, + status: 'completed', + output: { done: true }, + logs: [], + metadata: { duration: 123, startTime: 'start', endTime: 'end' }, + } + }) + + const result = await executeWorkflowCore({ + snapshot: createSnapshot() as any, + callbacks: {}, + loggingSession: loggingSession as any, + }) + + expect(result.status).toBe('completed') + expect(safeCompleteMock).toHaveBeenCalledTimes(1) + }) + + it('swallows wrapped block complete callback failures without blocking completion', async () => { + executorExecuteMock.mockResolvedValue({ + success: true, + status: 'completed', + output: { done: true }, + logs: [], + metadata: { duration: 123, startTime: 'start', endTime: 'end' }, + }) + + await executeWorkflowCore({ + snapshot: createSnapshot() as any, + callbacks: { + onBlockComplete: vi.fn().mockRejectedValue(new Error('complete callback failed')), + }, + loggingSession: loggingSession as any, + }) + + const contextExtensions = executorConstructorMock.mock.calls[0]?.[0]?.contextExtensions + + await expect( + contextExtensions.onBlockComplete('block-1', 'Fetch', 'api', { + output: { ok: true }, + executionTime: 1, + startedAt: 'start', + endedAt: 'end', + }) + ).resolves.toBeUndefined() + }) + it('finalizes errors before rethrowing and marks them as core-finalized', async () => { const error = new Error('engine failed') const executionResult = { @@ -445,7 +621,7 @@ describe('executeWorkflowCore terminal finalization sequencing', () => { expect(wasExecutionFinalizedByCore('engine failed', 'execution-a')).toBe(true) }) - it('logs error without rejecting when success finalization rejects', async () => { + it('does not replace a successful outcome when success finalization rejects', async () => { executorExecuteMock.mockResolvedValue({ success: true, status: 'completed', @@ -464,7 +640,7 @@ describe('executeWorkflowCore terminal finalization sequencing', () => { await loggingSession.setPostExecutionPromise.mock.calls[0][0] - expect(result.status).toBe('completed') + expect(result).toMatchObject({ status: 'completed', success: true }) expect(clearExecutionCancellationMock).toHaveBeenCalledWith('execution-1') expect(safeCompleteWithErrorMock).not.toHaveBeenCalled() }) diff --git a/apps/sim/lib/workflows/executor/execution-core.ts b/apps/sim/lib/workflows/executor/execution-core.ts index 264cc249e37..aa96e5668a5 100644 --- a/apps/sim/lib/workflows/executor/execution-core.ts +++ b/apps/sim/lib/workflows/executor/execution-core.ts @@ -179,33 +179,41 @@ async function finalizeExecutionOutcome(params: { const endedAt = new Date().toISOString() try { - if (result.status === 'cancelled') { - await loggingSession.safeCompleteWithCancellation({ - endedAt, - totalDurationMs: totalDuration || 0, - traceSpans: traceSpans || [], - }) - return - } + try { + if (result.status === 'cancelled') { + await loggingSession.safeCompleteWithCancellation({ + endedAt, + totalDurationMs: totalDuration || 0, + traceSpans: traceSpans || [], + }) + return + } + + if (result.status === 'paused') { + await loggingSession.safeCompleteWithPause({ + endedAt, + totalDurationMs: totalDuration || 0, + traceSpans: traceSpans || [], + workflowInput, + }) + return + } - if (result.status === 'paused') { - await loggingSession.safeCompleteWithPause({ + await loggingSession.safeComplete({ endedAt, totalDurationMs: totalDuration || 0, + finalOutput: result.output || {}, traceSpans: traceSpans || [], workflowInput, + executionState: result.executionState, + }) + } catch (error) { + logger.warn(`[${requestId}] Post-execution finalization failed`, { + executionId, + status: result.status, + error, }) - return } - - await loggingSession.safeComplete({ - endedAt, - totalDurationMs: totalDuration || 0, - finalOutput: result.output || {}, - traceSpans: traceSpans || [], - workflowInput, - executionState: result.executionState, - }) } finally { await clearExecutionCancellationSafely(executionId, requestId) } @@ -424,16 +432,69 @@ export async function executeWorkflowCore( iterationContext?: IterationContext, childWorkflowContext?: ChildWorkflowContext ) => { - await loggingSession.onBlockComplete(blockId, blockName, blockType, output) - if (onBlockComplete) { - await onBlockComplete( + try { + await loggingSession.onBlockComplete(blockId, blockName, blockType, output) + if (onBlockComplete) { + void onBlockComplete( + blockId, + blockName, + blockType, + output, + iterationContext, + childWorkflowContext + ).catch((error) => { + logger.warn(`[${requestId}] Block completion callback failed`, { + executionId, + blockId, + blockType, + error, + }) + }) + } + } catch (error) { + logger.warn(`[${requestId}] Block completion persistence failed`, { + executionId, + blockId, + blockType, + error, + }) + } + } + + const wrappedOnBlockStart = async ( + blockId: string, + blockName: string, + blockType: string, + executionOrder: number, + iterationContext?: IterationContext, + childWorkflowContext?: ChildWorkflowContext + ) => { + try { + await loggingSession.onBlockStart(blockId, blockName, blockType, new Date().toISOString()) + if (onBlockStart) { + void onBlockStart( + blockId, + blockName, + blockType, + executionOrder, + iterationContext, + childWorkflowContext + ).catch((error) => { + logger.warn(`[${requestId}] Block start callback failed`, { + executionId, + blockId, + blockType, + error, + }) + }) + } + } catch (error) { + logger.warn(`[${requestId}] Block start persistence failed`, { + executionId, blockId, - blockName, blockType, - output, - iterationContext, - childWorkflowContext - ) + error, + }) } } @@ -445,7 +506,7 @@ export async function executeWorkflowCore( userId, isDeployedContext: !metadata.isClientSession, enforceCredentialAccess: metadata.enforceCredentialAccess ?? false, - onBlockStart, + onBlockStart: wrappedOnBlockStart, onBlockComplete: wrappedOnBlockComplete, onStream, resumeFromSnapshot, diff --git a/apps/sim/next.config.ts b/apps/sim/next.config.ts index e5975cc6e9a..1b23b9ec93d 100644 --- a/apps/sim/next.config.ts +++ b/apps/sim/next.config.ts @@ -2,6 +2,7 @@ import type { NextConfig } from 'next' import { env, getEnv, isTruthy } from './lib/core/config/env' import { isDev } from './lib/core/config/feature-flags' import { + getChatEmbedCSPPolicy, getFormEmbedCSPPolicy, getMainCSPPolicy, getWorkflowExecutionCSPPolicy, @@ -255,6 +256,24 @@ const nextConfig: NextConfig = { }, ], }, + // Chat pages - allow iframe embedding from any origin + { + source: '/chat/:path*', + headers: [ + { + key: 'X-Content-Type-Options', + value: 'nosniff', + }, + // No X-Frame-Options to allow iframe embedding + { + key: 'Content-Security-Policy', + value: getChatEmbedCSPPolicy(), + }, + // Permissive CORS for chat requests from embedded chats + { key: 'Cross-Origin-Embedder-Policy', value: 'unsafe-none' }, + { key: 'Cross-Origin-Opener-Policy', value: 'unsafe-none' }, + ], + }, // Form pages - allow iframe embedding from any origin { source: '/form/:path*', @@ -284,10 +303,10 @@ const nextConfig: NextConfig = { ], }, // Apply security headers to routes not handled by middleware runtime CSP - // Middleware handles: /, /workspace/*, /chat/* - // Exclude form routes which have their own permissive headers + // Middleware handles: /, /workspace/* + // Exclude chat and form routes which have their own permissive embed headers { - source: '/((?!workspace|chat$|form).*)', + source: '/((?!workspace|chat|form).*)', headers: [ { key: 'X-Content-Type-Options', diff --git a/apps/sim/proxy.ts b/apps/sim/proxy.ts index 36ada3484f1..acd93ebfa12 100644 --- a/apps/sim/proxy.ts +++ b/apps/sim/proxy.ts @@ -155,6 +155,7 @@ export async function proxy(request: NextRequest) { return response } + // Chat pages are publicly accessible embeds — CSP is set in next.config.ts headers if (url.pathname.startsWith('/chat/')) { return NextResponse.next() } @@ -188,11 +189,7 @@ export async function proxy(request: NextRequest) { const response = NextResponse.next() response.headers.set('Vary', 'User-Agent') - if ( - url.pathname.startsWith('/workspace') || - url.pathname.startsWith('/chat') || - url.pathname === '/' - ) { + if (url.pathname.startsWith('/workspace') || url.pathname === '/') { response.headers.set('Content-Security-Policy', generateRuntimeCSP()) }