From 0397fdf8365a284ef4057699edd15348f396181e Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 1 May 2026 23:12:08 -0700 Subject: [PATCH 1/5] Revert "improvement(executor): correctness-by-construction for workflow logs (#4382)" This reverts commit add55b4ffa938231893ced19979c364c9f1269a8. --- .../app/api/workflows/[id]/execute/route.ts | 17 +- .../hooks/use-workflow-execution.ts | 28 ++- .../utils/workflow-execution-utils.test.ts | 210 ++++++++++++++++-- .../utils/workflow-execution-utils.ts | 177 ++++++++++++++- apps/sim/executor/execution/block-executor.ts | 50 +---- apps/sim/executor/execution/executor.ts | 19 +- apps/sim/executor/execution/types.ts | 11 +- apps/sim/executor/types.ts | 6 - .../workflows/executor/execution-core.test.ts | 31 +++ .../lib/workflows/executor/execution-core.ts | 14 +- .../workflows/executor/execution-events.ts | 26 +-- .../sim/stores/terminal/console/store.test.ts | 153 ------------- apps/sim/stores/terminal/console/store.ts | 96 +------- apps/sim/stores/terminal/console/types.ts | 4 - 14 files changed, 458 insertions(+), 384 deletions(-) diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index 9b62d97be71..90e4d15b848 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -919,8 +919,7 @@ async function handleExecutePost( blockType: string, executionOrder: number, iterationContext?: IterationContext, - childWorkflowContext?: ChildWorkflowContext, - blockExecutionId?: string + childWorkflowContext?: ChildWorkflowContext ) => { reqLogger.info('onBlockStart called', { blockId, blockName, blockType }) sendEvent({ @@ -946,7 +945,6 @@ async function handleExecutePost( childWorkflowBlockId: childWorkflowContext.parentBlockId, childWorkflowName: childWorkflowContext.workflowName, }), - ...(blockExecutionId && { blockExecutionId }), }, }) } @@ -957,8 +955,7 @@ async function handleExecutePost( blockType: string, callbackData: any, iterationContext?: IterationContext, - childWorkflowContext?: ChildWorkflowContext, - blockExecutionId?: string + childWorkflowContext?: ChildWorkflowContext ) => { const hasError = callbackData.output?.error const childWorkflowData = childWorkflowContext @@ -972,11 +969,6 @@ async function handleExecutePost( ? { childWorkflowInstanceId: callbackData.childWorkflowInstanceId } : {} - const resolvedBlockExecutionId = blockExecutionId ?? callbackData.blockExecutionId - const blockExecData = resolvedBlockExecutionId - ? { blockExecutionId: resolvedBlockExecutionId } - : {} - if (hasError) { reqLogger.info('onBlockComplete (error) called', { blockId, @@ -1010,7 +1002,6 @@ async function handleExecutePost( }), ...childWorkflowData, ...instanceData, - ...blockExecData, }, }) } else { @@ -1045,7 +1036,6 @@ async function handleExecutePost( }), ...childWorkflowData, ...instanceData, - ...blockExecData, }, }) } @@ -1175,6 +1165,7 @@ async function handleExecutePost( data: { error: timeoutErrorMessage, duration: result.metadata?.duration || 0, + finalBlockLogs: result.logs, }, }) finalMetaStatus = 'error' @@ -1188,6 +1179,7 @@ async function handleExecutePost( workflowId, data: { duration: result.metadata?.duration || 0, + finalBlockLogs: result.logs, }, }) finalMetaStatus = 'cancelled' @@ -1252,6 +1244,7 @@ async function handleExecutePost( data: { error: executionResult?.error || errorMessage, duration: executionResult?.metadata?.duration || 0, + finalBlockLogs: executionResult?.logs, }, }) finalMetaStatus = 'error' diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts index 7dcb0a7529c..de67aed44cb 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts @@ -230,25 +230,31 @@ export function useWorkflowExecution() { durationMs?: number blockLogs: BlockLog[] isPreExecutionError?: boolean + finalBlockLogs?: BlockLog[] }) => { if (!params.workflowId) return sharedHandleExecutionErrorConsole( - { addConsole, updateConsole }, + { addConsole, updateConsole, cancelRunningEntries }, { ...params, workflowId: params.workflowId } ) }, - [addConsole, updateConsole] + [addConsole, cancelRunningEntries, updateConsole] ) const handleExecutionCancelledConsole = useCallback( - (params: { workflowId?: string; executionId?: string; durationMs?: number }) => { + (params: { + workflowId?: string + executionId?: string + durationMs?: number + finalBlockLogs?: BlockLog[] + }) => { if (!params.workflowId) return sharedHandleExecutionCancelledConsole( - { addConsole, updateConsole }, + { addConsole, updateConsole, cancelRunningEntries }, { ...params, workflowId: params.workflowId } ) }, - [addConsole, updateConsole] + [addConsole, cancelRunningEntries, updateConsole] ) const buildBlockEventHandlers = useCallback( @@ -1030,6 +1036,8 @@ export function useWorkflowExecution() { accumulatedBlockLogs, accumulatedBlockStates, executedBlockIds, + consoleMode: 'update', + includeStartConsoleEntry: true, onBlockCompleteCallback: onBlockComplete, }) @@ -1232,6 +1240,7 @@ export function useWorkflowExecution() { durationMs: data.duration, blockLogs: accumulatedBlockLogs, isPreExecutionError, + finalBlockLogs: data.finalBlockLogs, }) if (activeWorkflowId && !isExecutingFromChat) { @@ -1258,6 +1267,7 @@ export function useWorkflowExecution() { workflowId: activeWorkflowId, executionId: executionIdRef.current, durationMs: data?.duration, + finalBlockLogs: data?.finalBlockLogs, }) if (activeWorkflowId && !isExecutingFromChat) { @@ -1674,6 +1684,8 @@ export function useWorkflowExecution() { accumulatedBlockLogs, accumulatedBlockStates, executedBlockIds, + consoleMode: 'update', + includeStartConsoleEntry: true, }) await executionStream.executeFromBlock({ @@ -1743,6 +1755,7 @@ export function useWorkflowExecution() { error: data.error, durationMs: data.duration, blockLogs: accumulatedBlockLogs, + finalBlockLogs: data.finalBlockLogs, }) setCurrentExecutionId(workflowId, null) @@ -1755,6 +1768,7 @@ export function useWorkflowExecution() { workflowId, executionId: executionIdRef.current, durationMs: data?.duration, + finalBlockLogs: data?.finalBlockLogs, }) setCurrentExecutionId(workflowId, null) @@ -1901,6 +1915,8 @@ export function useWorkflowExecution() { accumulatedBlockLogs, accumulatedBlockStates, executedBlockIds, + consoleMode: 'update', + includeStartConsoleEntry: true, }) const capturedExecutionId = executionId @@ -2001,6 +2017,7 @@ export function useWorkflowExecution() { executionId: capturedExecutionId, error: data.error, blockLogs: accumulatedBlockLogs, + finalBlockLogs: data.finalBlockLogs, }) }, onExecutionCancelled: (data) => { @@ -2021,6 +2038,7 @@ export function useWorkflowExecution() { workflowId: reconnectWorkflowId, executionId: capturedExecutionId, durationMs: data?.duration, + finalBlockLogs: data?.finalBlockLogs, }) }, }, diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.test.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.test.ts index a05b215c2a0..13840aad4cd 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.test.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.test.ts @@ -1,12 +1,14 @@ /** * @vitest-environment node */ -import { resetTerminalConsoleMock } from '@sim/testing' +import { resetTerminalConsoleMock, terminalConsoleMockFns } from '@sim/testing' import { beforeEach, describe, expect, it, vi } from 'vitest' import { addExecutionErrorConsoleEntry, handleExecutionErrorConsole, + reconcileFinalBlockLogs, } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils' +import type { BlockLog } from '@/executor/types' describe('workflow-execution-utils', () => { beforeEach(() => { @@ -55,6 +57,52 @@ describe('workflow-execution-utils', () => { expect(addConsole).not.toHaveBeenCalled() }) + it('skips when console store already has a block-level error for this execution (Fix D)', () => { + terminalConsoleMockFns.mockAddConsole({ + workflowId: 'wf-1', + blockId: 'fetchAshbyData', + blockName: 'fetchAshbyData', + blockType: 'function', + executionId: 'exec-1', + executionOrder: 1, + success: false, + error: 'Failed to parse response as JSON', + }) + + const addConsole = vi.fn() + addExecutionErrorConsoleEntry(addConsole, { + workflowId: 'wf-1', + executionId: 'exec-1', + error: 'Run failed', + blockLogs: [], + }) + + expect(addConsole).not.toHaveBeenCalled() + }) + + it('still adds when only existing entries are themselves Run Error rows', () => { + terminalConsoleMockFns.mockAddConsole({ + workflowId: 'wf-1', + blockId: 'execution-error', + blockName: 'Run Error', + blockType: 'error', + executionId: 'exec-1', + executionOrder: Number.MAX_SAFE_INTEGER, + success: false, + error: 'previous unrelated error', + }) + + const addConsole = vi.fn() + addExecutionErrorConsoleEntry(addConsole, { + workflowId: 'wf-1', + executionId: 'exec-1', + error: 'New run failed', + blockLogs: [], + }) + + expect(addConsole).toHaveBeenCalledTimes(1) + }) + it('uses Timeout Error label when error indicates a timeout', () => { const addConsole = vi.fn() addExecutionErrorConsoleEntry(addConsole, { @@ -83,12 +131,121 @@ describe('workflow-execution-utils', () => { }) }) + describe('reconcileFinalBlockLogs', () => { + const makeLog = (over: Partial): BlockLog => ({ + blockId: 'b1', + blockName: 'Function', + blockType: 'function', + startedAt: new Date().toISOString(), + endedAt: new Date().toISOString(), + durationMs: 50, + success: true, + executionOrder: 1, + ...over, + }) + + it('flips a still-running entry to the server-reported success state', () => { + terminalConsoleMockFns.mockAddConsole({ + workflowId: 'wf-1', + blockId: 'kb-1', + blockName: 'Knowledge 1', + blockType: 'knowledge', + executionId: 'exec-1', + executionOrder: 2, + isRunning: true, + }) + + const updateConsole = vi.fn() + reconcileFinalBlockLogs(updateConsole, 'wf-1', 'exec-1', [ + makeLog({ + blockId: 'kb-1', + blockName: 'Knowledge 1', + blockType: 'knowledge', + executionOrder: 2, + success: true, + output: { items: [] }, + }), + ]) + + expect(updateConsole).toHaveBeenCalledTimes(1) + const [blockId, update, executionId] = updateConsole.mock.calls[0] + expect(blockId).toBe('kb-1') + expect(executionId).toBe('exec-1') + expect(update).toMatchObject({ + success: true, + isRunning: false, + replaceOutput: { items: [] }, + }) + }) + + it('flips a still-running entry to the server-reported error state (Bug 1 reconciliation)', () => { + terminalConsoleMockFns.mockAddConsole({ + workflowId: 'wf-1', + blockId: 'fn-1', + blockName: 'Function', + blockType: 'function', + executionId: 'exec-1', + executionOrder: 3, + isRunning: true, + }) + + const updateConsole = vi.fn() + reconcileFinalBlockLogs(updateConsole, 'wf-1', 'exec-1', [ + makeLog({ + blockId: 'fn-1', + executionOrder: 3, + success: false, + error: 'JSON parse failed', + }), + ]) + + expect(updateConsole).toHaveBeenCalledTimes(1) + expect(updateConsole.mock.calls[0][1]).toMatchObject({ + success: false, + error: 'JSON parse failed', + isRunning: false, + }) + }) + + it('skips entries that are not running', () => { + terminalConsoleMockFns.mockAddConsole({ + workflowId: 'wf-1', + blockId: 'fn-1', + blockName: 'Function', + blockType: 'function', + executionId: 'exec-1', + executionOrder: 1, + isRunning: false, + success: true, + }) + + const updateConsole = vi.fn() + reconcileFinalBlockLogs(updateConsole, 'wf-1', 'exec-1', [makeLog({ blockId: 'fn-1' })]) + + expect(updateConsole).not.toHaveBeenCalled() + }) + + it('is a no-op when finalBlockLogs is empty or executionId is missing', () => { + const updateConsole = vi.fn() + reconcileFinalBlockLogs(updateConsole, 'wf-1', 'exec-1', []) + reconcileFinalBlockLogs(updateConsole, 'wf-1', undefined, [makeLog({})]) + expect(updateConsole).not.toHaveBeenCalled() + }) + }) + describe('handleExecutionErrorConsole', () => { - it('adds a synthetic Run Error entry when no block-level error covers it', () => { - const addConsole = vi.fn() + it('cancels running entries before adding the synthetic entry', () => { + const calls: string[] = [] + const addConsole = vi.fn(() => { + calls.push('add') + return undefined + }) + const cancelRunningEntries = vi.fn(() => { + calls.push('cancel') + }) handleExecutionErrorConsole( - { addConsole, updateConsole: vi.fn() }, + { addConsole, updateConsole: vi.fn(), cancelRunningEntries }, { workflowId: 'wf-1', executionId: 'exec-1', @@ -97,36 +254,57 @@ describe('workflow-execution-utils', () => { } ) - expect(addConsole).toHaveBeenCalledTimes(1) - expect(addConsole.mock.calls[0][0].blockName).toBe('Run Error') + expect(calls[0]).toBe('cancel') + expect(calls).toContain('add') }) - it('skips the synthetic entry when a block-level error already covers the failure', () => { - const addConsole = vi.fn() + it('reconciles finalBlockLogs before sweeping running entries (Fix C)', () => { + terminalConsoleMockFns.mockAddConsole({ + workflowId: 'wf-1', + blockId: 'kb-1', + blockName: 'Knowledge 1', + blockType: 'knowledge', + executionId: 'exec-1', + executionOrder: 1, + isRunning: true, + }) + + const calls: string[] = [] + const addConsole = vi.fn(() => { + calls.push('add') + return undefined + }) + const cancelRunningEntries = vi.fn(() => { + calls.push('cancel') + }) + const updateConsole = vi.fn(() => { + calls.push('update') + }) handleExecutionErrorConsole( - { addConsole, updateConsole: vi.fn() }, + { addConsole, updateConsole, cancelRunningEntries }, { workflowId: 'wf-1', executionId: 'exec-1', error: 'boom', - blockLogs: [ + blockLogs: [], + finalBlockLogs: [ { - blockId: 'fn-1', - blockName: 'Function', - blockType: 'function', - success: false, - error: 'JSON parse failed', + blockId: 'kb-1', + blockName: 'Knowledge 1', + blockType: 'knowledge', startedAt: new Date().toISOString(), endedAt: new Date().toISOString(), durationMs: 10, + success: true, executionOrder: 1, } as any, ], } ) - expect(addConsole).not.toHaveBeenCalled() + expect(updateConsole).toHaveBeenCalledTimes(1) + expect(calls).toEqual(['update', 'cancel', 'add']) }) }) }) diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts index 41f4038fa96..34d1a0a90db 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts @@ -5,7 +5,13 @@ import type { BlockErrorData, BlockStartedData, } from '@/lib/workflows/executor/execution-events' -import type { BlockLog, BlockState, ExecutionResult, StreamingExecution } from '@/executor/types' +import type { + BlockLog, + BlockState, + ExecutionResult, + NormalizedBlockOutput, + StreamingExecution, +} from '@/executor/types' import { stripCloneSuffixes } from '@/executor/utils/subflow-utils' import { processSSEStream } from '@/hooks/use-execution-stream' @@ -106,6 +112,8 @@ export interface BlockEventHandlerConfig { accumulatedBlockLogs: BlockLog[] accumulatedBlockStates: Map executedBlockIds: Set + consoleMode: 'update' | 'add' + includeStartConsoleEntry: boolean onBlockCompleteCallback?: (blockId: string, output: unknown) => Promise } @@ -134,6 +142,8 @@ export function createBlockEventHandlers( accumulatedBlockLogs, accumulatedBlockStates, executedBlockIds, + consoleMode, + includeStartConsoleEntry, onBlockCompleteCallback, } = config @@ -174,7 +184,6 @@ export function createBlockEventHandlers( ...('childWorkflowInstanceId' in data && { childWorkflowInstanceId: data.childWorkflowInstanceId, }), - ...(data.blockExecutionId && { blockExecutionId: data.blockExecutionId }), }) const createBlockLogEntry = ( @@ -194,6 +203,61 @@ export function createBlockEventHandlers( endedAt: data.endedAt, }) + const addConsoleEntry = (data: BlockCompletedData, output: NormalizedBlockOutput) => { + if (!workflowId) return + addConsole({ + input: data.input || {}, + output, + success: true, + durationMs: data.durationMs, + startedAt: data.startedAt, + executionOrder: data.executionOrder, + endedAt: data.endedAt, + workflowId, + blockId: data.blockId, + executionId: executionIdRef.current, + blockName: data.blockName || 'Unknown Block', + blockType: data.blockType || 'unknown', + ...extractIterationFields(data), + }) + } + + const addConsoleErrorEntry = (data: BlockErrorData) => { + if (!workflowId) return + + const existingRunningEntry = useTerminalConsoleStore + .getState() + .getWorkflowEntries(workflowId) + .some( + (entry) => + entry.blockId === data.blockId && + entry.executionId === executionIdRef.current && + entry.isRunning + ) + + if (existingRunningEntry) { + updateConsoleErrorEntry(data) + return + } + + addConsole({ + input: data.input || {}, + output: {}, + success: false, + error: data.error, + durationMs: data.durationMs, + startedAt: data.startedAt, + executionOrder: data.executionOrder, + endedAt: data.endedAt, + workflowId, + blockId: data.blockId, + executionId: executionIdRef.current, + blockName: data.blockName || 'Unknown Block', + blockType: data.blockType || 'unknown', + ...extractIterationFields(data), + }) + } + const updateConsoleEntry = (data: BlockCompletedData) => { updateConsole( data.blockId, @@ -235,7 +299,7 @@ export function createBlockEventHandlers( if (isStaleExecution()) return updateActiveBlocks(data.blockId, true) - if (!workflowId) return + if (!includeStartConsoleEntry || !workflowId) return const startedAt = new Date().toISOString() addConsole({ @@ -280,13 +344,20 @@ export function createBlockEventHandlers( const output = data.output as Record | undefined const isEmptySubflow = Array.isArray(output?.results) && output.results.length === 0 if (!isEmptySubflow) { - updateConsoleEntry(data) + if (includeStartConsoleEntry) { + updateConsoleEntry(data) + } return } } accumulatedBlockLogs.push(createBlockLogEntry(data, { success: true, output: data.output })) - updateConsoleEntry(data) + + if (consoleMode === 'update') { + updateConsoleEntry(data) + } else { + addConsoleEntry(data, data.output as NormalizedBlockOutput) + } if (onBlockCompleteCallback) { onBlockCompleteCallback(data.blockId, data.output).catch((error) => { @@ -320,7 +391,11 @@ export function createBlockEventHandlers( createBlockLogEntry(data, { success: false, output: {}, error: data.error }) ) - updateConsoleErrorEntry(data) + if (consoleMode === 'update') { + updateConsoleErrorEntry(data) + } else { + addConsoleErrorEntry(data) + } } const onBlockChildWorkflowStarted = (data: { @@ -349,6 +424,7 @@ export function createBlockEventHandlers( } type AddConsoleFn = (entry: Omit) => ConsoleEntry | undefined +type CancelRunningEntriesFn = (workflowId: string) => void type UpdateConsoleFn = ( blockId: string, update: string | ConsoleUpdate, @@ -357,10 +433,49 @@ type UpdateConsoleFn = ( /** * Bundle of console-store actions used by the execution-level handlers. + * Mirrors the deps-object pattern established by `createBlockEventHandlers`. */ export interface ExecutionConsoleDeps { addConsole: AddConsoleFn updateConsole: UpdateConsoleFn + cancelRunningEntries: CancelRunningEntriesFn +} + +/** + * Reconciles still-running console entries with the server's authoritative + * `finalBlockLogs` so that any block whose terminal `block:completed`/`block:error` + * SSE event was lost gets the correct success/error state instead of being + * swept to "canceled". + */ +export function reconcileFinalBlockLogs( + updateConsole: UpdateConsoleFn, + workflowId: string, + executionId: string | undefined, + finalBlockLogs: BlockLog[] | undefined +): void { + if (!finalBlockLogs?.length || !executionId) return + const entries = useTerminalConsoleStore.getState().getWorkflowEntries(workflowId) + for (const log of finalBlockLogs) { + const running = entries.find( + (e) => e.blockId === log.blockId && e.executionId === executionId && e.isRunning + ) + if (!running) continue + updateConsole( + log.blockId, + { + executionOrder: log.executionOrder, + replaceOutput: (log.output ?? {}) as Record, + ...(log.input ? { input: log.input } : {}), + success: log.success, + ...(log.error ? { error: log.error } : {}), + durationMs: log.durationMs, + startedAt: log.startedAt, + endedAt: log.endedAt, + isRunning: false, + }, + executionId + ) + } } export interface ExecutionTimingFields { @@ -388,6 +503,8 @@ export interface ExecutionErrorConsoleParams { durationMs?: number blockLogs: BlockLog[] isPreExecutionError?: boolean + /** Server's authoritative per-block terminal states, used to reconcile lost SSE events. */ + finalBlockLogs?: BlockLog[] } /** @@ -398,7 +515,19 @@ export function addExecutionErrorConsoleEntry( addConsole: AddConsoleFn, params: ExecutionErrorConsoleParams ): void { - const hasBlockError = params.blockLogs.some((log) => log.error) + const hasBlockErrorInLogs = params.blockLogs.some((log) => log.error) + const hasBlockErrorInConsole = useTerminalConsoleStore + .getState() + .getWorkflowEntries(params.workflowId) + .some( + (entry) => + entry.executionId === params.executionId && + entry.error != null && + entry.error !== '' && + entry.blockType !== 'error' && + entry.blockType !== 'validation' + ) + const hasBlockError = hasBlockErrorInLogs || hasBlockErrorInConsole const isPreExecutionError = params.isPreExecutionError ?? false if (!isPreExecutionError && hasBlockError) return @@ -428,12 +557,21 @@ export function addExecutionErrorConsoleEntry( } /** - * Adds an execution-level error console entry when no block-level error already covers it. + * Reconciles `finalBlockLogs` against still-running entries, sweeps any + * remaining running entries to canceled, and adds an execution-level error + * console entry when no block-level error already covers it. */ export function handleExecutionErrorConsole( deps: ExecutionConsoleDeps, params: ExecutionErrorConsoleParams ): void { + reconcileFinalBlockLogs( + deps.updateConsole, + params.workflowId, + params.executionId, + params.finalBlockLogs + ) + deps.cancelRunningEntries(params.workflowId) addExecutionErrorConsoleEntry(deps.addConsole, params) } @@ -474,6 +612,8 @@ export interface CancelledConsoleParams { workflowId: string executionId?: string durationMs?: number + /** Server's authoritative per-block terminal states, used to reconcile lost SSE events. */ + finalBlockLogs?: BlockLog[] } /** @@ -502,12 +642,21 @@ export function addCancelledConsoleEntry( } /** - * Adds the execution-level cancellation console entry. + * Reconciles `finalBlockLogs` against still-running entries, sweeps any + * remaining running entries to canceled, and adds the execution-level + * cancellation console entry. */ export function handleExecutionCancelledConsole( deps: ExecutionConsoleDeps, params: CancelledConsoleParams ): void { + reconcileFinalBlockLogs( + deps.updateConsole, + params.workflowId, + params.executionId, + params.finalBlockLogs + ) + deps.cancelRunningEntries(params.workflowId) addCancelledConsoleEntry(deps.addConsole, params) } @@ -544,7 +693,7 @@ export async function executeWorkflowWithFullLogging( } const executionId = options.executionId || generateId() - const { addConsole, updateConsole } = useTerminalConsoleStore.getState() + const { addConsole, updateConsole, cancelRunningEntries } = useTerminalConsoleStore.getState() const { setActiveBlocks, setBlockRunStatus, setEdgeRunStatus, setCurrentExecutionId } = useExecutionStore.getState() const wfId = targetWorkflowId @@ -565,6 +714,8 @@ export async function executeWorkflowWithFullLogging( accumulatedBlockLogs, accumulatedBlockStates: new Map(), executedBlockIds: new Set(), + consoleMode: 'update', + includeStartConsoleEntry: true, onBlockCompleteCallback: options.onBlockComplete, }, { addConsole, updateConsole, setActiveBlocks, setBlockRunStatus, setEdgeRunStatus } @@ -674,11 +825,12 @@ export async function executeWorkflowWithFullLogging( } handleExecutionCancelledConsole( - { addConsole, updateConsole }, + { addConsole, updateConsole, cancelRunningEntries }, { workflowId: wfId, executionId: executionIdRef.current, durationMs: data?.duration, + finalBlockLogs: data?.finalBlockLogs, } ) }, @@ -695,7 +847,7 @@ export async function executeWorkflowWithFullLogging( } handleExecutionErrorConsole( - { addConsole, updateConsole }, + { addConsole, updateConsole, cancelRunningEntries }, { workflowId: wfId, executionId: executionIdRef.current, @@ -703,6 +855,7 @@ export async function executeWorkflowWithFullLogging( durationMs: data.duration || 0, blockLogs: accumulatedBlockLogs, isPreExecutionError: accumulatedBlockLogs.length === 0, + finalBlockLogs: data.finalBlockLogs, } ) }, diff --git a/apps/sim/executor/execution/block-executor.ts b/apps/sim/executor/execution/block-executor.ts index a5ffef197ab..340b2aab01a 100644 --- a/apps/sim/executor/execution/block-executor.ts +++ b/apps/sim/executor/execution/block-executor.ts @@ -1,6 +1,5 @@ import { createLogger, type Logger } from '@sim/logger' import { toError } from '@sim/utils/errors' -import { generateId } from '@sim/utils/id' import { redactApiKeys } from '@/lib/core/security/redaction' import { getBaseUrl } from '@/lib/core/utils/urls' import { @@ -53,7 +52,6 @@ const logger = createLogger('BlockExecutor') export class BlockExecutor { private execLogger: Logger - private pendingCallbacks: Set> = new Set() constructor( private blockHandlers: BlockHandler[], @@ -97,13 +95,7 @@ export class BlockExecutor { if (!isSentinel) { blockLog = this.createBlockLog(ctx, node.id, block, node, startedAt) ctx.blockLogs.push(blockLog) - this.fireBlockStartCallback( - ctx, - node, - block, - blockLog.executionOrder, - blockLog.blockExecutionId - ) + this.fireBlockStartCallback(ctx, node, block, blockLog.executionOrder) } let resolvedInputs: Record = {} @@ -215,8 +207,7 @@ export class BlockExecutor { blockLog.startedAt, blockLog.executionOrder, blockLog.endedAt, - childWorkflowInstanceId, - blockLog.blockExecutionId + childWorkflowInstanceId ) } @@ -325,8 +316,7 @@ export class BlockExecutor { blockLog.startedAt, blockLog.executionOrder, blockLog.endedAt, - childWorkflowInstanceId, - blockLog.blockExecutionId + childWorkflowInstanceId ) } @@ -400,7 +390,6 @@ export class BlockExecutor { return { blockId, - blockExecutionId: generateId(), blockName, blockType: block.metadata?.id ?? DEFAULTS.BLOCK_TYPE, startedAt, @@ -478,8 +467,7 @@ export class BlockExecutor { ctx: ExecutionContext, node: DAGNode, block: SerializedBlock, - executionOrder: number, - blockExecutionId: string | undefined + executionOrder: number ): void { if (!this.contextExtensions.onBlockStart) return @@ -488,15 +476,14 @@ export class BlockExecutor { const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE const iterationContext = getIterationContext(ctx, node?.metadata) - const promise = this.contextExtensions + void this.contextExtensions .onBlockStart( blockId, blockName, blockType, executionOrder, iterationContext, - ctx.childWorkflowContext, - blockExecutionId + ctx.childWorkflowContext ) .catch((error) => { this.execLogger.warn('Block start callback failed', { @@ -505,7 +492,6 @@ export class BlockExecutor { error: toError(error).message, }) }) - this.trackCallback(promise) } /** @@ -523,8 +509,7 @@ export class BlockExecutor { startedAt: string, executionOrder: number, endedAt: string, - childWorkflowInstanceId?: string, - blockExecutionId?: string + childWorkflowInstanceId?: string ): void { if (!this.contextExtensions.onBlockComplete) return @@ -533,7 +518,7 @@ export class BlockExecutor { const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE const iterationContext = getIterationContext(ctx, node?.metadata) - const promise = this.contextExtensions + void this.contextExtensions .onBlockComplete( blockId, blockName, @@ -546,7 +531,6 @@ export class BlockExecutor { executionOrder, endedAt, childWorkflowInstanceId, - blockExecutionId, }, iterationContext, ctx.childWorkflowContext @@ -558,24 +542,6 @@ export class BlockExecutor { error: toError(error).message, }) }) - this.trackCallback(promise) - } - - private trackCallback(promise: Promise): void { - this.pendingCallbacks.add(promise) - promise.finally(() => { - this.pendingCallbacks.delete(promise) - }) - } - - /** - * Resolves once every in-flight `onBlockStart` / `onBlockComplete` callback has settled. - * Drained at terminal-event boundaries so block events land before `execution:*` events. - */ - async awaitPendingCallbacks(): Promise { - while (this.pendingCallbacks.size > 0) { - await Promise.allSettled([...this.pendingCallbacks]) - } } private preparePauseResumeSelfReference( diff --git a/apps/sim/executor/execution/executor.ts b/apps/sim/executor/execution/executor.ts index b05fce36a29..8e3a8c8c8c9 100644 --- a/apps/sim/executor/execution/executor.ts +++ b/apps/sim/executor/execution/executor.ts @@ -79,12 +79,8 @@ export class DAGExecutor { const { context, state } = this.createExecutionContext(workflowId, triggerBlockId) context.subflowParentMap = this.buildSubflowParentMap(dag) - const { engine, blockExecutor } = this.buildExecutionPipeline(context, dag, state) - try { - return await engine.run(triggerBlockId) - } finally { - await blockExecutor.awaitPendingCallbacks() - } + const engine = this.buildExecutionPipeline(context, dag, state) + return await engine.run(triggerBlockId) } async continueExecution( @@ -210,12 +206,8 @@ export class DAGExecutor { }) context.subflowParentMap = this.buildSubflowParentMap(dag) - const { engine, blockExecutor } = this.buildExecutionPipeline(context, dag, state) - try { - return await engine.run() - } finally { - await blockExecutor.awaitPendingCallbacks() - } + const engine = this.buildExecutionPipeline(context, dag, state) + return await engine.run() } private buildExecutionPipeline(context: ExecutionContext, dag: DAG, state: ExecutionState) { @@ -243,8 +235,7 @@ export class DAGExecutor { loopOrchestrator, parallelOrchestrator ) - const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) - return { engine, blockExecutor } + return new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) } private createExecutionContext( diff --git a/apps/sim/executor/execution/types.ts b/apps/sim/executor/execution/types.ts index 8bbddf5f8a7..3c5130d8220 100644 --- a/apps/sim/executor/execution/types.ts +++ b/apps/sim/executor/execution/types.ts @@ -118,8 +118,7 @@ export interface ExecutionCallbacks { blockType: string, executionOrder: number, iterationContext?: IterationContext, - childWorkflowContext?: ChildWorkflowContext, - blockExecutionId?: string + childWorkflowContext?: ChildWorkflowContext ) => Promise onBlockComplete?: ( blockId: string, @@ -127,8 +126,7 @@ export interface ExecutionCallbacks { blockType: string, output: any, iterationContext?: IterationContext, - childWorkflowContext?: ChildWorkflowContext, - blockExecutionId?: string + childWorkflowContext?: ChildWorkflowContext ) => Promise /** Fires immediately after instanceId is generated, before child execution begins. */ onChildWorkflowInstanceReady?: ( @@ -174,8 +172,7 @@ export interface ContextExtensions { blockType: string, executionOrder: number, iterationContext?: IterationContext, - childWorkflowContext?: ChildWorkflowContext, - blockExecutionId?: string + childWorkflowContext?: ChildWorkflowContext ) => Promise onBlockComplete?: ( blockId: string, @@ -190,8 +187,6 @@ export interface ContextExtensions { endedAt: string /** Per-invocation unique ID linking this workflow block execution to its child block events. */ childWorkflowInstanceId?: string - /** Per-invocation unique ID for this block execution (distinct across loop/parallel iterations). */ - blockExecutionId?: string }, iterationContext?: IterationContext, childWorkflowContext?: ChildWorkflowContext diff --git a/apps/sim/executor/types.ts b/apps/sim/executor/types.ts index 657b536e392..7d844257d1e 100644 --- a/apps/sim/executor/types.ts +++ b/apps/sim/executor/types.ts @@ -208,12 +208,6 @@ export interface NormalizedBlockOutput { export interface BlockLog { blockId: string - /** - * Unique per-invocation ID. Same `blockId` can appear multiple times across loop/parallel - * iterations and across runs; `blockExecutionId` is unique for each individual execution - * and survives across `block:started` → `block:completed | block:error`. - */ - blockExecutionId?: string blockName?: string blockType?: string startedAt: string diff --git a/apps/sim/lib/workflows/executor/execution-core.test.ts b/apps/sim/lib/workflows/executor/execution-core.test.ts index 6e756be204f..da8092d3366 100644 --- a/apps/sim/lib/workflows/executor/execution-core.test.ts +++ b/apps/sim/lib/workflows/executor/execution-core.test.ts @@ -239,6 +239,37 @@ describe('executeWorkflowCore terminal finalization sequencing', () => { expect(findStartBlockMock).toHaveBeenCalledWith(expect.anything(), 'external', false) }) + it('does not await user block start callback after persistence completes', async () => { + let releaseCallback: (() => void) | undefined + const callbackPromise = new Promise((resolve) => { + releaseCallback = resolve + }) + + executorExecuteMock.mockResolvedValue({ + success: true, + status: 'completed', + output: { done: true }, + logs: [], + metadata: { duration: 123, startTime: 'start', endTime: 'end' }, + }) + + await executeWorkflowCore({ + snapshot: createSnapshot() as any, + callbacks: { + onBlockStart: vi.fn(() => callbackPromise), + }, + loggingSession: loggingSession as any, + }) + + const contextExtensions = executorConstructorMock.mock.calls[0]?.[0]?.contextExtensions + + await expect( + contextExtensions.onBlockStart('block-1', 'Fetch', 'api', 1) + ).resolves.toBeUndefined() + + releaseCallback?.() + }) + it('awaits terminal completion before updating run counts and returning', async () => { const callOrder: string[] = [] diff --git a/apps/sim/lib/workflows/executor/execution-core.ts b/apps/sim/lib/workflows/executor/execution-core.ts index 14e11d9f114..65a45e781b6 100644 --- a/apps/sim/lib/workflows/executor/execution-core.ts +++ b/apps/sim/lib/workflows/executor/execution-core.ts @@ -435,7 +435,6 @@ export async function executeWorkflowCore( executionTime: number startedAt: string endedAt: string - blockExecutionId?: string }, iterationContext?: IterationContext, childWorkflowContext?: ChildWorkflowContext @@ -443,14 +442,13 @@ export async function executeWorkflowCore( try { await loggingSession.onBlockComplete(blockId, blockName, blockType, output) if (onBlockComplete) { - await onBlockComplete( + void onBlockComplete( blockId, blockName, blockType, output, iterationContext, - childWorkflowContext, - output.blockExecutionId + childWorkflowContext ).catch((error) => { logger.warn(`[${requestId}] Block completion callback failed`, { executionId, @@ -476,20 +474,18 @@ export async function executeWorkflowCore( blockType: string, executionOrder: number, iterationContext?: IterationContext, - childWorkflowContext?: ChildWorkflowContext, - blockExecutionId?: string + childWorkflowContext?: ChildWorkflowContext ) => { try { await loggingSession.onBlockStart(blockId, blockName, blockType, new Date().toISOString()) if (onBlockStart) { - await onBlockStart( + void onBlockStart( blockId, blockName, blockType, executionOrder, iterationContext, - childWorkflowContext, - blockExecutionId + childWorkflowContext ).catch((error) => { logger.warn(`[${requestId}] Block start callback failed`, { executionId, diff --git a/apps/sim/lib/workflows/executor/execution-events.ts b/apps/sim/lib/workflows/executor/execution-events.ts index c76908ff932..20cce3c5bac 100644 --- a/apps/sim/lib/workflows/executor/execution-events.ts +++ b/apps/sim/lib/workflows/executor/execution-events.ts @@ -3,6 +3,7 @@ import type { IterationContext, ParentIteration, } from '@/executor/execution/types' +import type { BlockLog } from '@/executor/types' import type { SubflowType } from '@/stores/workflows/workflow/types' export type ExecutionEventType = @@ -77,6 +78,8 @@ export interface ExecutionErrorEvent extends BaseExecutionEvent { data: { error: string duration: number + /** Authoritative per-block terminal states from the server's blockLogs. */ + finalBlockLogs?: BlockLog[] } } @@ -85,6 +88,8 @@ export interface ExecutionCancelledEvent extends BaseExecutionEvent { workflowId: string data: { duration: number + /** Authoritative per-block terminal states from the server's blockLogs. */ + finalBlockLogs?: BlockLog[] } } @@ -106,8 +111,6 @@ export interface BlockStartedEvent extends BaseExecutionEvent { parentIterations?: ParentIteration[] childWorkflowBlockId?: string childWorkflowName?: string - /** Per-invocation unique ID for this block execution (distinct across loop/parallel iterations). */ - blockExecutionId?: string } } @@ -136,8 +139,6 @@ export interface BlockCompletedEvent extends BaseExecutionEvent { childWorkflowName?: string /** Per-invocation unique ID for correlating child block events with this workflow block. */ childWorkflowInstanceId?: string - /** Per-invocation unique ID for this block execution (distinct across loop/parallel iterations). */ - blockExecutionId?: string } } @@ -166,8 +167,6 @@ export interface BlockErrorEvent extends BaseExecutionEvent { childWorkflowName?: string /** Per-invocation unique ID for correlating child block events with this workflow block. */ childWorkflowInstanceId?: string - /** Per-invocation unique ID for this block execution (distinct across loop/parallel iterations). */ - blockExecutionId?: string } } @@ -284,8 +283,7 @@ export function createExecutionCallbacks(options: { blockType: string, executionOrder: number, iterationContext?: IterationContext, - childWorkflowContext?: ChildWorkflowContext, - blockExecutionId?: string + childWorkflowContext?: ChildWorkflowContext ) => { await sendBufferedEvent({ type: 'block:started', @@ -310,7 +308,6 @@ export function createExecutionCallbacks(options: { childWorkflowBlockId: childWorkflowContext.parentBlockId, childWorkflowName: childWorkflowContext.workflowName, }), - ...(blockExecutionId && { blockExecutionId }), }, }) } @@ -327,11 +324,9 @@ export function createExecutionCallbacks(options: { executionOrder: number endedAt: string childWorkflowInstanceId?: string - blockExecutionId?: string }, iterationContext?: IterationContext, - childWorkflowContext?: ChildWorkflowContext, - blockExecutionId?: string + childWorkflowContext?: ChildWorkflowContext ) => { const hasError = callbackData.output?.error const iterationData = iterationContext @@ -356,11 +351,6 @@ export function createExecutionCallbacks(options: { ? { childWorkflowInstanceId: callbackData.childWorkflowInstanceId } : {} - const blockExecData = - blockExecutionId || callbackData.blockExecutionId - ? { blockExecutionId: blockExecutionId ?? callbackData.blockExecutionId } - : {} - if (hasError) { await sendBufferedEvent({ type: 'block:error', @@ -380,7 +370,6 @@ export function createExecutionCallbacks(options: { ...iterationData, ...childWorkflowData, ...instanceData, - ...blockExecData, }, }) } else { @@ -402,7 +391,6 @@ export function createExecutionCallbacks(options: { ...iterationData, ...childWorkflowData, ...instanceData, - ...blockExecData, }, }) } diff --git a/apps/sim/stores/terminal/console/store.test.ts b/apps/sim/stores/terminal/console/store.test.ts index 9c4cf267d1b..da0606eaf48 100644 --- a/apps/sim/stores/terminal/console/store.test.ts +++ b/apps/sim/stores/terminal/console/store.test.ts @@ -1,7 +1,6 @@ /** * @vitest-environment node */ -import { createLogger } from '@sim/logger' import { beforeEach, describe, expect, it, vi } from 'vitest' vi.unmock('@/stores/terminal') @@ -9,25 +8,15 @@ vi.unmock('@/stores/terminal/console/store') import { useTerminalConsoleStore } from '@/stores/terminal/console/store' -const storeLoggerCallIdx = vi - .mocked(createLogger) - .mock.calls.findIndex((call) => call[0] === 'TerminalConsoleStore') -const storeLogger = - storeLoggerCallIdx >= 0 - ? vi.mocked(createLogger).mock.results[storeLoggerCallIdx]?.value - : undefined - describe('terminal console store', () => { beforeEach(() => { useTerminalConsoleStore.setState({ workflowEntries: {}, entryIdsByBlockExecution: {}, - entryIdByBlockExecutionId: {}, entryLocationById: {}, isOpen: false, _hasHydrated: true, }) - storeLogger?.warn.mockClear() }) it('normalizes oversized payloads when adding console entries', () => { @@ -128,148 +117,6 @@ describe('terminal console store', () => { expect(after.getWorkflowEntries('wf-1')[0].output).toMatchObject({ status: 'updated' }) }) - describe('blockExecutionId keying', () => { - it('updates an entry via the primary index without firing legacy warn', () => { - useTerminalConsoleStore.getState().addConsole({ - workflowId: 'wf-1', - blockId: 'block-1', - blockName: 'Function', - blockType: 'function', - executionId: 'exec-1', - blockExecutionId: 'bex-1', - executionOrder: 1, - isRunning: true, - }) - - useTerminalConsoleStore.getState().updateConsole( - 'block-1', - { - executionOrder: 1, - blockExecutionId: 'bex-1', - success: true, - replaceOutput: { status: 'done' }, - }, - 'exec-1' - ) - - const [entry] = useTerminalConsoleStore.getState().getWorkflowEntries('wf-1') - expect(entry.success).toBe(true) - expect(entry.output).toMatchObject({ status: 'done' }) - expect(storeLogger?.warn).not.toHaveBeenCalled() - }) - - it('falls back to legacy keying and warns when blockExecutionId is unknown', () => { - useTerminalConsoleStore.getState().addConsole({ - workflowId: 'wf-1', - blockId: 'block-1', - blockName: 'Function', - blockType: 'function', - executionId: 'exec-1', - executionOrder: 1, - isRunning: true, - }) - - useTerminalConsoleStore.getState().updateConsole( - 'block-1', - { - executionOrder: 1, - blockExecutionId: 'bex-unknown', - success: true, - replaceOutput: { status: 'done' }, - }, - 'exec-1' - ) - - const [entry] = useTerminalConsoleStore.getState().getWorkflowEntries('wf-1') - expect(entry.success).toBe(true) - expect(storeLogger?.warn).toHaveBeenCalledWith( - 'updateConsole used legacy keying (hydrated or cross-deploy entry)', - expect.objectContaining({ blockExecutionId: 'bex-unknown', blockId: 'block-1' }) - ) - }) - - it('uses legacy keying without warning when no blockExecutionId is provided', () => { - useTerminalConsoleStore.getState().addConsole({ - workflowId: 'wf-1', - blockId: 'block-1', - blockName: 'Function', - blockType: 'function', - executionId: 'exec-1', - executionOrder: 1, - isRunning: true, - }) - - useTerminalConsoleStore - .getState() - .updateConsole( - 'block-1', - { executionOrder: 1, success: true, replaceOutput: { status: 'done' } }, - 'exec-1' - ) - - const [entry] = useTerminalConsoleStore.getState().getWorkflowEntries('wf-1') - expect(entry.success).toBe(true) - expect(storeLogger?.warn).not.toHaveBeenCalled() - }) - }) - - describe('addConsole idempotency', () => { - it('returns the existing entry when called twice with the same blockExecutionId', () => { - const first = useTerminalConsoleStore.getState().addConsole({ - workflowId: 'wf-1', - blockId: 'block-1', - blockName: 'Function', - blockType: 'function', - executionId: 'exec-1', - blockExecutionId: 'bex-1', - executionOrder: 1, - isRunning: true, - }) - - const second = useTerminalConsoleStore.getState().addConsole({ - workflowId: 'wf-1', - blockId: 'block-1', - blockName: 'Function', - blockType: 'function', - executionId: 'exec-1', - blockExecutionId: 'bex-1', - executionOrder: 1, - isRunning: true, - }) - - const entries = useTerminalConsoleStore.getState().getWorkflowEntries('wf-1') - expect(entries).toHaveLength(1) - expect(second?.id).toBe(first?.id) - }) - - it('creates distinct entries for different blockExecutionIds (loop iterations)', () => { - useTerminalConsoleStore.getState().addConsole({ - workflowId: 'wf-1', - blockId: 'block-1', - blockName: 'Function', - blockType: 'function', - executionId: 'exec-1', - blockExecutionId: 'bex-iter-1', - executionOrder: 1, - isRunning: true, - }) - - useTerminalConsoleStore.getState().addConsole({ - workflowId: 'wf-1', - blockId: 'block-1', - blockName: 'Function', - blockType: 'function', - executionId: 'exec-1', - blockExecutionId: 'bex-iter-2', - executionOrder: 2, - isRunning: true, - }) - - const entries = useTerminalConsoleStore.getState().getWorkflowEntries('wf-1') - expect(entries).toHaveLength(2) - }) - }) - describe('cancelRunningEntries', () => { it('flips a plain running entry to canceled', () => { useTerminalConsoleStore.getState().addConsole({ diff --git a/apps/sim/stores/terminal/console/store.ts b/apps/sim/stores/terminal/console/store.ts index d7c5f0e8962..d6d8414a16a 100644 --- a/apps/sim/stores/terminal/console/store.ts +++ b/apps/sim/stores/terminal/console/store.ts @@ -123,14 +123,10 @@ function removeWorkflowIndexes( workflowId: string, entries: ConsoleEntry[], entryIdsByBlockExecution: Record, - entryIdByBlockExecutionId: Record, entryLocationById: Record ): void { for (const entry of entries) { delete entryLocationById[entry.id] - if (entry.blockExecutionId && entryIdByBlockExecutionId[entry.blockExecutionId] === entry.id) { - delete entryIdByBlockExecutionId[entry.blockExecutionId] - } const blockExecutionKey = getBlockExecutionKey(entry.blockId, entry.executionId) const existingIds = entryIdsByBlockExecution[blockExecutionKey] if (!existingIds) { @@ -150,14 +146,10 @@ function indexWorkflowEntries( workflowId: string, entries: ConsoleEntry[], entryIdsByBlockExecution: Record, - entryIdByBlockExecutionId: Record, entryLocationById: Record ): void { entries.forEach((entry, index) => { entryLocationById[entry.id] = { workflowId, index } - if (entry.blockExecutionId) { - entryIdByBlockExecutionId[entry.blockExecutionId] = entry.id - } const blockExecutionKey = getBlockExecutionKey(entry.blockId, entry.executionId) const existingIds = entryIdsByBlockExecution[blockExecutionKey] if (existingIds) { @@ -170,58 +162,35 @@ function indexWorkflowEntries( function rebuildWorkflowStateMaps(workflowEntries: Record) { const entryIdsByBlockExecution: Record = {} - const entryIdByBlockExecutionId: Record = {} const entryLocationById: Record = {} Object.entries(workflowEntries).forEach(([workflowId, entries]) => { - indexWorkflowEntries( - workflowId, - entries, - entryIdsByBlockExecution, - entryIdByBlockExecutionId, - entryLocationById - ) + indexWorkflowEntries(workflowId, entries, entryIdsByBlockExecution, entryLocationById) }) - return { entryIdsByBlockExecution, entryIdByBlockExecutionId, entryLocationById } + return { entryIdsByBlockExecution, entryLocationById } } function replaceWorkflowEntries( state: ConsoleStore, workflowId: string, nextEntries: ConsoleEntry[] -): Pick< - ConsoleStore, - 'workflowEntries' | 'entryIdsByBlockExecution' | 'entryIdByBlockExecutionId' | 'entryLocationById' -> { +): Pick { const workflowEntries = cloneWorkflowEntries(state.workflowEntries) const entryIdsByBlockExecution = { ...state.entryIdsByBlockExecution } - const entryIdByBlockExecutionId = { ...state.entryIdByBlockExecutionId } const entryLocationById = { ...state.entryLocationById } const previousEntries = workflowEntries[workflowId] ?? EMPTY_CONSOLE_ENTRIES - removeWorkflowIndexes( - workflowId, - previousEntries, - entryIdsByBlockExecution, - entryIdByBlockExecutionId, - entryLocationById - ) + removeWorkflowIndexes(workflowId, previousEntries, entryIdsByBlockExecution, entryLocationById) if (nextEntries.length === 0) { delete workflowEntries[workflowId] } else { workflowEntries[workflowId] = nextEntries - indexWorkflowEntries( - workflowId, - nextEntries, - entryIdsByBlockExecution, - entryIdByBlockExecutionId, - entryLocationById - ) + indexWorkflowEntries(workflowId, nextEntries, entryIdsByBlockExecution, entryLocationById) } - return { workflowEntries, entryIdsByBlockExecution, entryIdByBlockExecutionId, entryLocationById } + return { workflowEntries, entryIdsByBlockExecution, entryLocationById } } function appendWorkflowEntry( @@ -229,38 +198,24 @@ function appendWorkflowEntry( workflowId: string, newEntry: ConsoleEntry, trimmedEntries: ConsoleEntry[] -): Pick< - ConsoleStore, - 'workflowEntries' | 'entryIdsByBlockExecution' | 'entryIdByBlockExecutionId' | 'entryLocationById' -> { +): Pick { const workflowEntries = cloneWorkflowEntries(state.workflowEntries) const previousEntries = workflowEntries[workflowId] ?? EMPTY_CONSOLE_ENTRIES workflowEntries[workflowId] = trimmedEntries const entryLocationById = { ...state.entryLocationById } const entryIdsByBlockExecution = { ...state.entryIdsByBlockExecution } - const entryIdByBlockExecutionId = { ...state.entryIdByBlockExecutionId } const survivingIds = new Set(trimmedEntries.map((e) => e.id)) const droppedEntries = previousEntries.filter((e) => !survivingIds.has(e.id)) if (droppedEntries.length > 0) { - removeWorkflowIndexes( - workflowId, - droppedEntries, - entryIdsByBlockExecution, - entryIdByBlockExecutionId, - entryLocationById - ) + removeWorkflowIndexes(workflowId, droppedEntries, entryIdsByBlockExecution, entryLocationById) } trimmedEntries.forEach((entry, index) => { entryLocationById[entry.id] = { workflowId, index } }) - if (newEntry.blockExecutionId) { - entryIdByBlockExecutionId[newEntry.blockExecutionId] = newEntry.id - } - const blockExecutionKey = getBlockExecutionKey(newEntry.blockId, newEntry.executionId) const existingIds = entryIdsByBlockExecution[blockExecutionKey] if (existingIds) { @@ -271,7 +226,7 @@ function appendWorkflowEntry( entryIdsByBlockExecution[blockExecutionKey] = [newEntry.id] } - return { workflowEntries, entryIdsByBlockExecution, entryIdByBlockExecutionId, entryLocationById } + return { workflowEntries, entryIdsByBlockExecution, entryLocationById } } interface NotifyBlockErrorParams { @@ -314,7 +269,6 @@ export const useTerminalConsoleStore = create()( devtools((set, get) => ({ workflowEntries: {}, entryIdsByBlockExecution: {}, - entryIdByBlockExecutionId: {}, entryLocationById: {}, isOpen: false, _hasHydrated: false, @@ -324,16 +278,6 @@ export const useTerminalConsoleStore = create()( return get().getWorkflowEntries(entry.workflowId)[0] as ConsoleEntry | undefined } - if (entry.blockExecutionId) { - const existingId = get().entryIdByBlockExecutionId[entry.blockExecutionId] - if (existingId) { - const location = get().entryLocationById[existingId] - if (location) { - return get().workflowEntries[location.workflowId]?.[location.index] - } - } - } - const redactedEntry = { ...entry } if ( !isStreamingOutput(entry.output) && @@ -497,23 +441,11 @@ export const useTerminalConsoleStore = create()( updateConsole: (blockId: string, update: string | ConsoleUpdate, executionId?: string) => { set((state) => { - const blockExecutionId = typeof update === 'object' ? update.blockExecutionId : undefined - const directId = blockExecutionId - ? state.entryIdByBlockExecutionId[blockExecutionId] - : undefined - const candidateIds = directId - ? [directId] - : (state.entryIdsByBlockExecution[getBlockExecutionKey(blockId, executionId)] ?? []) + const candidateIds = + state.entryIdsByBlockExecution[getBlockExecutionKey(blockId, executionId)] ?? [] if (candidateIds.length === 0) { return state } - if (blockExecutionId && !directId) { - logger.warn('updateConsole used legacy keying (hydrated or cross-deploy entry)', { - blockExecutionId, - blockId, - executionId, - }) - } const workflowId = state.entryLocationById[candidateIds[0]]?.workflowId if (!workflowId) { @@ -530,7 +462,7 @@ export const useTerminalConsoleStore = create()( const source = nextEntries ?? currentEntries const entry = source[location.index] if (!entry || entry.id !== candidateId) continue - if (!directId && !matchesEntryForUpdate(entry, blockId, executionId, update)) continue + if (!matchesEntryForUpdate(entry, blockId, executionId, update)) continue if (!nextEntries) { nextEntries = [...currentEntries] @@ -638,10 +570,6 @@ export const useTerminalConsoleStore = create()( updatedEntry.childWorkflowInstanceId = update.childWorkflowInstanceId } - if (update.blockExecutionId !== undefined) { - updatedEntry.blockExecutionId = update.blockExecutionId - } - nextEntries[location.index] = updatedEntry } diff --git a/apps/sim/stores/terminal/console/types.ts b/apps/sim/stores/terminal/console/types.ts index 2515c1987e5..8aa342d7a11 100644 --- a/apps/sim/stores/terminal/console/types.ts +++ b/apps/sim/stores/terminal/console/types.ts @@ -32,8 +32,6 @@ export interface ConsoleEntry { childWorkflowName?: string /** Per-invocation unique ID linking this workflow block to its child block events */ childWorkflowInstanceId?: string - /** Per-invocation unique ID for this block execution (distinct across loop/parallel iterations) */ - blockExecutionId?: string } export interface ConsoleUpdate { @@ -58,7 +56,6 @@ export interface ConsoleUpdate { childWorkflowBlockId?: string childWorkflowName?: string childWorkflowInstanceId?: string - blockExecutionId?: string } export interface ConsoleEntryLocation { @@ -69,7 +66,6 @@ export interface ConsoleEntryLocation { export interface ConsoleStore { workflowEntries: Record entryIdsByBlockExecution: Record - entryIdByBlockExecutionId: Record entryLocationById: Record isOpen: boolean addConsole: (entry: Omit) => ConsoleEntry | undefined From 25f46e3f637b558b98bddf0aacd069327c8ab5a4 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 1 May 2026 23:20:03 -0700 Subject: [PATCH 2/5] fix(terminal): re-read entries inside reconcileFinalBlockLogs loop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit For workflows with loop or parallel iterations, finalBlockLogs can contain multiple terminal logs sharing the same blockId. The prior code captured entries once before the loop, so entries.find always matched the same first running entry — later iterations stayed isRunning: true and got swept to "cancelled" instead of showing their actual terminal state. Re-read the snapshot per iteration so each updateConsole's isRunning: false flush narrows the next match. --- .../w/[workflowId]/utils/workflow-execution-utils.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts index 34d1a0a90db..e5a339a277c 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts @@ -454,8 +454,8 @@ export function reconcileFinalBlockLogs( finalBlockLogs: BlockLog[] | undefined ): void { if (!finalBlockLogs?.length || !executionId) return - const entries = useTerminalConsoleStore.getState().getWorkflowEntries(workflowId) for (const log of finalBlockLogs) { + const entries = useTerminalConsoleStore.getState().getWorkflowEntries(workflowId) const running = entries.find( (e) => e.blockId === log.blockId && e.executionId === executionId && e.isRunning ) From f396975879569b755bc8e52e967426b061ffc5d2 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 1 May 2026 23:22:28 -0700 Subject: [PATCH 3/5] chore(terminal): remove dead consoleMode 'add' branch and helpers Every callsite passes consoleMode: 'update'. Deletes the field, the unreachable 'add' branches in onBlockCompleted/onBlockError, and the addConsoleEntry/addConsoleErrorEntry helpers that only the dead branch invoked. --- .../hooks/use-workflow-execution.ts | 3 - .../utils/workflow-execution-utils.ts | 78 +------------------ 2 files changed, 3 insertions(+), 78 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts index de67aed44cb..b3ec9345453 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts @@ -1036,7 +1036,6 @@ export function useWorkflowExecution() { accumulatedBlockLogs, accumulatedBlockStates, executedBlockIds, - consoleMode: 'update', includeStartConsoleEntry: true, onBlockCompleteCallback: onBlockComplete, }) @@ -1684,7 +1683,6 @@ export function useWorkflowExecution() { accumulatedBlockLogs, accumulatedBlockStates, executedBlockIds, - consoleMode: 'update', includeStartConsoleEntry: true, }) @@ -1915,7 +1913,6 @@ export function useWorkflowExecution() { accumulatedBlockLogs, accumulatedBlockStates, executedBlockIds, - consoleMode: 'update', includeStartConsoleEntry: true, }) diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts index e5a339a277c..e4cae66b163 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts @@ -5,13 +5,7 @@ import type { BlockErrorData, BlockStartedData, } from '@/lib/workflows/executor/execution-events' -import type { - BlockLog, - BlockState, - ExecutionResult, - NormalizedBlockOutput, - StreamingExecution, -} from '@/executor/types' +import type { BlockLog, BlockState, ExecutionResult, StreamingExecution } from '@/executor/types' import { stripCloneSuffixes } from '@/executor/utils/subflow-utils' import { processSSEStream } from '@/hooks/use-execution-stream' @@ -112,7 +106,6 @@ export interface BlockEventHandlerConfig { accumulatedBlockLogs: BlockLog[] accumulatedBlockStates: Map executedBlockIds: Set - consoleMode: 'update' | 'add' includeStartConsoleEntry: boolean onBlockCompleteCallback?: (blockId: string, output: unknown) => Promise } @@ -142,7 +135,6 @@ export function createBlockEventHandlers( accumulatedBlockLogs, accumulatedBlockStates, executedBlockIds, - consoleMode, includeStartConsoleEntry, onBlockCompleteCallback, } = config @@ -203,61 +195,6 @@ export function createBlockEventHandlers( endedAt: data.endedAt, }) - const addConsoleEntry = (data: BlockCompletedData, output: NormalizedBlockOutput) => { - if (!workflowId) return - addConsole({ - input: data.input || {}, - output, - success: true, - durationMs: data.durationMs, - startedAt: data.startedAt, - executionOrder: data.executionOrder, - endedAt: data.endedAt, - workflowId, - blockId: data.blockId, - executionId: executionIdRef.current, - blockName: data.blockName || 'Unknown Block', - blockType: data.blockType || 'unknown', - ...extractIterationFields(data), - }) - } - - const addConsoleErrorEntry = (data: BlockErrorData) => { - if (!workflowId) return - - const existingRunningEntry = useTerminalConsoleStore - .getState() - .getWorkflowEntries(workflowId) - .some( - (entry) => - entry.blockId === data.blockId && - entry.executionId === executionIdRef.current && - entry.isRunning - ) - - if (existingRunningEntry) { - updateConsoleErrorEntry(data) - return - } - - addConsole({ - input: data.input || {}, - output: {}, - success: false, - error: data.error, - durationMs: data.durationMs, - startedAt: data.startedAt, - executionOrder: data.executionOrder, - endedAt: data.endedAt, - workflowId, - blockId: data.blockId, - executionId: executionIdRef.current, - blockName: data.blockName || 'Unknown Block', - blockType: data.blockType || 'unknown', - ...extractIterationFields(data), - }) - } - const updateConsoleEntry = (data: BlockCompletedData) => { updateConsole( data.blockId, @@ -353,11 +290,7 @@ export function createBlockEventHandlers( accumulatedBlockLogs.push(createBlockLogEntry(data, { success: true, output: data.output })) - if (consoleMode === 'update') { - updateConsoleEntry(data) - } else { - addConsoleEntry(data, data.output as NormalizedBlockOutput) - } + updateConsoleEntry(data) if (onBlockCompleteCallback) { onBlockCompleteCallback(data.blockId, data.output).catch((error) => { @@ -391,11 +324,7 @@ export function createBlockEventHandlers( createBlockLogEntry(data, { success: false, output: {}, error: data.error }) ) - if (consoleMode === 'update') { - updateConsoleErrorEntry(data) - } else { - addConsoleErrorEntry(data) - } + updateConsoleErrorEntry(data) } const onBlockChildWorkflowStarted = (data: { @@ -714,7 +643,6 @@ export async function executeWorkflowWithFullLogging( accumulatedBlockLogs, accumulatedBlockStates: new Map(), executedBlockIds: new Set(), - consoleMode: 'update', includeStartConsoleEntry: true, onBlockCompleteCallback: options.onBlockComplete, }, From 910a51a4d3eef3aa66b980ddb09ec6c80260c568 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 1 May 2026 23:40:36 -0700 Subject: [PATCH 4/5] fix(executor): reconcile finalBlockLogs on execution:completed block:completed callbacks fire-and-forget; the last block's event can arrive after execution:completed, leaving its console entry stuck isRunning. Mirror the error/cancelled paths: emit finalBlockLogs on execution:completed and reconcile + sweep on the client. --- .../app/api/workflows/[id]/execute/route.ts | 1 + .../hooks/use-workflow-execution.ts | 25 ++++++++++++++++++- .../utils/workflow-execution-utils.ts | 2 ++ .../workflows/executor/execution-events.ts | 2 ++ .../executor/human-in-the-loop-manager.ts | 1 + .../executor/queued-workflow-execution.ts | 1 + 6 files changed, 31 insertions(+), 1 deletion(-) diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index 90e4d15b848..24e9038dd55 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -1220,6 +1220,7 @@ async function handleExecutePost( duration: result.metadata?.duration || 0, startTime: result.metadata?.startTime || startTime.toISOString(), endTime: result.metadata?.endTime || new Date().toISOString(), + finalBlockLogs: result.logs, }, }) } diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts index b3ec9345453..f51ed9d8b13 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts @@ -25,6 +25,7 @@ import { addHttpErrorConsoleEntry, type BlockEventHandlerConfig, createBlockEventHandlers, + reconcileFinalBlockLogs, addExecutionErrorConsoleEntry as sharedAddExecutionErrorConsoleEntry, handleExecutionCancelledConsole as sharedHandleExecutionCancelledConsole, handleExecutionErrorConsole as sharedHandleExecutionErrorConsole, @@ -1130,6 +1131,13 @@ export function useWorkflowExecution() { if (activeWorkflowId) { setCurrentExecutionId(activeWorkflowId, null) + reconcileFinalBlockLogs( + updateConsole, + activeWorkflowId, + executionIdRef.current, + data.finalBlockLogs + ) + cancelRunningEntries(activeWorkflowId) } executionResult = { @@ -1702,6 +1710,14 @@ export function useWorkflowExecution() { onBlockChildWorkflowStarted: blockHandlers.onBlockChildWorkflowStarted, onExecutionCompleted: (data) => { + reconcileFinalBlockLogs( + updateConsole, + workflowId, + executionIdRef.current, + data.finalBlockLogs + ) + cancelRunningEntries(workflowId) + if (data.success) { executedBlockIds.add(blockId) @@ -1980,7 +1996,7 @@ export function useWorkflowExecution() { onBlockCompleted: wrapHandler(handlers.onBlockCompleted), onBlockError: wrapHandler(handlers.onBlockError), onBlockChildWorkflowStarted: wrapHandler(handlers.onBlockChildWorkflowStarted), - onExecutionCompleted: () => { + onExecutionCompleted: (data) => { reconnectionComplete = true activeReconnections.delete(reconnectWorkflowId) if (!activated) { @@ -1994,6 +2010,13 @@ export function useWorkflowExecution() { setCurrentExecutionId(reconnectWorkflowId, null) setIsExecuting(reconnectWorkflowId, false) setActiveBlocks(reconnectWorkflowId, new Set()) + reconcileFinalBlockLogs( + updateConsole, + reconnectWorkflowId, + capturedExecutionId, + data?.finalBlockLogs + ) + cancelRunningEntries(reconnectWorkflowId) }, onExecutionError: (data) => { reconnectionComplete = true diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts index e4cae66b163..4872ab7c156 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts @@ -731,6 +731,8 @@ export async function executeWorkflowWithFullLogging( onExecutionCompleted: (data) => { setCurrentExecutionId(wfId, null) + reconcileFinalBlockLogs(updateConsole, wfId, executionIdRef.current, data.finalBlockLogs) + cancelRunningEntries(wfId) executionResult = { success: data.success, output: data.output, diff --git a/apps/sim/lib/workflows/executor/execution-events.ts b/apps/sim/lib/workflows/executor/execution-events.ts index 20cce3c5bac..b8089ea2146 100644 --- a/apps/sim/lib/workflows/executor/execution-events.ts +++ b/apps/sim/lib/workflows/executor/execution-events.ts @@ -52,6 +52,8 @@ export interface ExecutionCompletedEvent extends BaseExecutionEvent { duration: number startTime: string endTime: string + /** Authoritative per-block terminal states from the server's blockLogs. */ + finalBlockLogs?: BlockLog[] } } diff --git a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts index 98626dd16eb..48e5c9ae708 100644 --- a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts +++ b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts @@ -1077,6 +1077,7 @@ export class PauseResumeManager { duration: result.metadata?.duration || 0, startTime: result.metadata?.startTime || new Date().toISOString(), endTime: result.metadata?.endTime || new Date().toISOString(), + finalBlockLogs: result.logs, }, } as ExecutionEvent) finalMetaStatus = 'complete' diff --git a/apps/sim/lib/workflows/executor/queued-workflow-execution.ts b/apps/sim/lib/workflows/executor/queued-workflow-execution.ts index 4d9e3dfaa36..43102ffcb4b 100644 --- a/apps/sim/lib/workflows/executor/queued-workflow-execution.ts +++ b/apps/sim/lib/workflows/executor/queued-workflow-execution.ts @@ -243,6 +243,7 @@ export async function executeQueuedWorkflowJob( duration: result.metadata?.duration || 0, startTime: result.metadata?.startTime || metadata.startTime, endTime: result.metadata?.endTime || new Date().toISOString(), + finalBlockLogs: result.logs, }, }) await setExecutionMeta(executionId, { status: 'complete' }) From 7819cfc0f584e58a1101ed6719ececd62aab0358 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Fri, 1 May 2026 23:58:50 -0700 Subject: [PATCH 5/5] fix(executor): emit finalBlockLogs on queued/HITL error and cancel paths queued-workflow-execution and human-in-the-loop-manager only attached finalBlockLogs to execution:completed. Without it on cancel/error, reconcileFinalBlockLogs is a no-op and the client sweep flips already-completed blocks to canceled. Mirror the route.ts pattern. --- .../lib/workflows/executor/human-in-the-loop-manager.ts | 9 ++++++++- .../lib/workflows/executor/queued-workflow-execution.ts | 7 +++++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts index 48e5c9ae708..010ac6532ff 100644 --- a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts +++ b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts @@ -23,6 +23,7 @@ import type { SerializedSnapshot, StreamingExecution, } from '@/executor/types' +import { hasExecutionResult } from '@/executor/utils/errors' import { filterOutputForLog } from '@/executor/utils/output-filter' import type { SerializedConnection } from '@/serializer/types' @@ -1039,6 +1040,7 @@ export class PauseResumeManager { data: { error: timeoutErrorMessage, duration: result.metadata?.duration || 0, + finalBlockLogs: result.logs, }, } as ExecutionEvent) finalMetaStatus = 'error' @@ -1048,7 +1050,10 @@ export class PauseResumeManager { timestamp: new Date().toISOString(), executionId: resumeExecutionId, workflowId, - data: { duration: result.metadata?.duration || 0 }, + data: { + duration: result.metadata?.duration || 0, + finalBlockLogs: result.logs, + }, } as ExecutionEvent) finalMetaStatus = 'cancelled' } else if (result.status === 'paused') { @@ -1083,6 +1088,7 @@ export class PauseResumeManager { finalMetaStatus = 'complete' } } catch (execError) { + const execErrorResult = hasExecutionResult(execError) ? execError.executionResult : undefined writeBufferedEvent({ type: 'execution:error', timestamp: new Date().toISOString(), @@ -1091,6 +1097,7 @@ export class PauseResumeManager { data: { error: toError(execError).message, duration: 0, + finalBlockLogs: execErrorResult?.logs, }, } as ExecutionEvent) finalMetaStatus = 'error' diff --git a/apps/sim/lib/workflows/executor/queued-workflow-execution.ts b/apps/sim/lib/workflows/executor/queued-workflow-execution.ts index 43102ffcb4b..2dc0fc85318 100644 --- a/apps/sim/lib/workflows/executor/queued-workflow-execution.ts +++ b/apps/sim/lib/workflows/executor/queued-workflow-execution.ts @@ -169,6 +169,7 @@ export async function executeQueuedWorkflowJob( data: { error: timeoutErrorMessage, duration: result.metadata?.duration || 0, + finalBlockLogs: result.logs, }, }) @@ -214,6 +215,7 @@ export async function executeQueuedWorkflowJob( workflowId, data: { duration: result.metadata?.duration || 0, + finalBlockLogs: result.logs, }, }) await setExecutionMeta(executionId, { status: 'cancelled' }) @@ -290,6 +292,8 @@ export async function executeQueuedWorkflowJob( }) } + const executionResult = hasExecutionResult(error) ? error.executionResult : undefined + if (eventWriter) { await eventWriter.write({ type: 'execution:error', @@ -299,13 +303,12 @@ export async function executeQueuedWorkflowJob( data: { error: toError(error).message, duration: 0, + finalBlockLogs: executionResult?.logs, }, }) await setExecutionMeta(executionId, { status: 'error' }) } - const executionResult = hasExecutionResult(error) ? error.executionResult : undefined - return buildResult( 'failed', {