diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index 9b62d97be71..24e9038dd55 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -919,8 +919,7 @@ async function handleExecutePost( blockType: string, executionOrder: number, iterationContext?: IterationContext, - childWorkflowContext?: ChildWorkflowContext, - blockExecutionId?: string + childWorkflowContext?: ChildWorkflowContext ) => { reqLogger.info('onBlockStart called', { blockId, blockName, blockType }) sendEvent({ @@ -946,7 +945,6 @@ async function handleExecutePost( childWorkflowBlockId: childWorkflowContext.parentBlockId, childWorkflowName: childWorkflowContext.workflowName, }), - ...(blockExecutionId && { blockExecutionId }), }, }) } @@ -957,8 +955,7 @@ async function handleExecutePost( blockType: string, callbackData: any, iterationContext?: IterationContext, - childWorkflowContext?: ChildWorkflowContext, - blockExecutionId?: string + childWorkflowContext?: ChildWorkflowContext ) => { const hasError = callbackData.output?.error const childWorkflowData = childWorkflowContext @@ -972,11 +969,6 @@ async function handleExecutePost( ? { childWorkflowInstanceId: callbackData.childWorkflowInstanceId } : {} - const resolvedBlockExecutionId = blockExecutionId ?? callbackData.blockExecutionId - const blockExecData = resolvedBlockExecutionId - ? { blockExecutionId: resolvedBlockExecutionId } - : {} - if (hasError) { reqLogger.info('onBlockComplete (error) called', { blockId, @@ -1010,7 +1002,6 @@ async function handleExecutePost( }), ...childWorkflowData, ...instanceData, - ...blockExecData, }, }) } else { @@ -1045,7 +1036,6 @@ async function handleExecutePost( }), ...childWorkflowData, ...instanceData, - ...blockExecData, }, }) } @@ -1175,6 +1165,7 @@ async function handleExecutePost( data: { error: timeoutErrorMessage, duration: result.metadata?.duration || 0, + finalBlockLogs: result.logs, }, }) finalMetaStatus = 'error' @@ -1188,6 +1179,7 @@ async function handleExecutePost( workflowId, data: { duration: result.metadata?.duration || 0, + finalBlockLogs: result.logs, }, }) finalMetaStatus = 'cancelled' @@ -1228,6 +1220,7 @@ async function handleExecutePost( duration: result.metadata?.duration || 0, startTime: result.metadata?.startTime || startTime.toISOString(), endTime: result.metadata?.endTime || new Date().toISOString(), + finalBlockLogs: result.logs, }, }) } @@ -1252,6 +1245,7 @@ async function handleExecutePost( data: { error: executionResult?.error || errorMessage, duration: executionResult?.metadata?.duration || 0, + finalBlockLogs: executionResult?.logs, }, }) finalMetaStatus = 'error' diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts index 7dcb0a7529c..f51ed9d8b13 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts @@ -25,6 +25,7 @@ import { addHttpErrorConsoleEntry, type BlockEventHandlerConfig, createBlockEventHandlers, + reconcileFinalBlockLogs, addExecutionErrorConsoleEntry as sharedAddExecutionErrorConsoleEntry, handleExecutionCancelledConsole as sharedHandleExecutionCancelledConsole, handleExecutionErrorConsole as sharedHandleExecutionErrorConsole, @@ -230,25 +231,31 @@ export function useWorkflowExecution() { durationMs?: number blockLogs: BlockLog[] isPreExecutionError?: boolean + finalBlockLogs?: BlockLog[] }) => { if (!params.workflowId) return sharedHandleExecutionErrorConsole( - { addConsole, updateConsole }, + { addConsole, updateConsole, cancelRunningEntries }, { ...params, workflowId: params.workflowId } ) }, - [addConsole, updateConsole] + [addConsole, cancelRunningEntries, updateConsole] ) const handleExecutionCancelledConsole = useCallback( - (params: { workflowId?: string; executionId?: string; durationMs?: number }) => { + (params: { + workflowId?: string + executionId?: string + durationMs?: number + finalBlockLogs?: BlockLog[] + }) => { if (!params.workflowId) return sharedHandleExecutionCancelledConsole( - { addConsole, updateConsole }, + { addConsole, updateConsole, cancelRunningEntries }, { ...params, workflowId: params.workflowId } ) }, - [addConsole, updateConsole] + [addConsole, cancelRunningEntries, updateConsole] ) const buildBlockEventHandlers = useCallback( @@ -1030,6 +1037,7 @@ export function useWorkflowExecution() { accumulatedBlockLogs, accumulatedBlockStates, executedBlockIds, + includeStartConsoleEntry: true, onBlockCompleteCallback: onBlockComplete, }) @@ -1123,6 +1131,13 @@ export function useWorkflowExecution() { if (activeWorkflowId) { setCurrentExecutionId(activeWorkflowId, null) + reconcileFinalBlockLogs( + updateConsole, + activeWorkflowId, + executionIdRef.current, + data.finalBlockLogs + ) + cancelRunningEntries(activeWorkflowId) } executionResult = { @@ -1232,6 +1247,7 @@ export function useWorkflowExecution() { durationMs: data.duration, blockLogs: accumulatedBlockLogs, isPreExecutionError, + finalBlockLogs: data.finalBlockLogs, }) if (activeWorkflowId && !isExecutingFromChat) { @@ -1258,6 +1274,7 @@ export function useWorkflowExecution() { workflowId: activeWorkflowId, executionId: executionIdRef.current, durationMs: data?.duration, + finalBlockLogs: data?.finalBlockLogs, }) if (activeWorkflowId && !isExecutingFromChat) { @@ -1674,6 +1691,7 @@ export function useWorkflowExecution() { accumulatedBlockLogs, accumulatedBlockStates, executedBlockIds, + includeStartConsoleEntry: true, }) await executionStream.executeFromBlock({ @@ -1692,6 +1710,14 @@ export function useWorkflowExecution() { onBlockChildWorkflowStarted: blockHandlers.onBlockChildWorkflowStarted, onExecutionCompleted: (data) => { + reconcileFinalBlockLogs( + updateConsole, + workflowId, + executionIdRef.current, + data.finalBlockLogs + ) + cancelRunningEntries(workflowId) + if (data.success) { executedBlockIds.add(blockId) @@ -1743,6 +1769,7 @@ export function useWorkflowExecution() { error: data.error, durationMs: data.duration, blockLogs: accumulatedBlockLogs, + finalBlockLogs: data.finalBlockLogs, }) setCurrentExecutionId(workflowId, null) @@ -1755,6 +1782,7 @@ export function useWorkflowExecution() { workflowId, executionId: executionIdRef.current, durationMs: data?.duration, + finalBlockLogs: data?.finalBlockLogs, }) setCurrentExecutionId(workflowId, null) @@ -1901,6 +1929,7 @@ export function useWorkflowExecution() { accumulatedBlockLogs, accumulatedBlockStates, executedBlockIds, + includeStartConsoleEntry: true, }) const capturedExecutionId = executionId @@ -1967,7 +1996,7 @@ export function useWorkflowExecution() { onBlockCompleted: wrapHandler(handlers.onBlockCompleted), onBlockError: wrapHandler(handlers.onBlockError), onBlockChildWorkflowStarted: wrapHandler(handlers.onBlockChildWorkflowStarted), - onExecutionCompleted: () => { + onExecutionCompleted: (data) => { reconnectionComplete = true activeReconnections.delete(reconnectWorkflowId) if (!activated) { @@ -1981,6 +2010,13 @@ export function useWorkflowExecution() { setCurrentExecutionId(reconnectWorkflowId, null) setIsExecuting(reconnectWorkflowId, false) setActiveBlocks(reconnectWorkflowId, new Set()) + reconcileFinalBlockLogs( + updateConsole, + reconnectWorkflowId, + capturedExecutionId, + data?.finalBlockLogs + ) + cancelRunningEntries(reconnectWorkflowId) }, onExecutionError: (data) => { reconnectionComplete = true @@ -2001,6 +2037,7 @@ export function useWorkflowExecution() { executionId: capturedExecutionId, error: data.error, blockLogs: accumulatedBlockLogs, + finalBlockLogs: data.finalBlockLogs, }) }, onExecutionCancelled: (data) => { @@ -2021,6 +2058,7 @@ export function useWorkflowExecution() { workflowId: reconnectWorkflowId, executionId: capturedExecutionId, durationMs: data?.duration, + finalBlockLogs: data?.finalBlockLogs, }) }, }, diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.test.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.test.ts index a05b215c2a0..13840aad4cd 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.test.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.test.ts @@ -1,12 +1,14 @@ /** * @vitest-environment node */ -import { resetTerminalConsoleMock } from '@sim/testing' +import { resetTerminalConsoleMock, terminalConsoleMockFns } from '@sim/testing' import { beforeEach, describe, expect, it, vi } from 'vitest' import { addExecutionErrorConsoleEntry, handleExecutionErrorConsole, + reconcileFinalBlockLogs, } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils' +import type { BlockLog } from '@/executor/types' describe('workflow-execution-utils', () => { beforeEach(() => { @@ -55,6 +57,52 @@ describe('workflow-execution-utils', () => { expect(addConsole).not.toHaveBeenCalled() }) + it('skips when console store already has a block-level error for this execution (Fix D)', () => { + terminalConsoleMockFns.mockAddConsole({ + workflowId: 'wf-1', + blockId: 'fetchAshbyData', + blockName: 'fetchAshbyData', + blockType: 'function', + executionId: 'exec-1', + executionOrder: 1, + success: false, + error: 'Failed to parse response as JSON', + }) + + const addConsole = vi.fn() + addExecutionErrorConsoleEntry(addConsole, { + workflowId: 'wf-1', + executionId: 'exec-1', + error: 'Run failed', + blockLogs: [], + }) + + expect(addConsole).not.toHaveBeenCalled() + }) + + it('still adds when only existing entries are themselves Run Error rows', () => { + terminalConsoleMockFns.mockAddConsole({ + workflowId: 'wf-1', + blockId: 'execution-error', + blockName: 'Run Error', + blockType: 'error', + executionId: 'exec-1', + executionOrder: Number.MAX_SAFE_INTEGER, + success: false, + error: 'previous unrelated error', + }) + + const addConsole = vi.fn() + addExecutionErrorConsoleEntry(addConsole, { + workflowId: 'wf-1', + executionId: 'exec-1', + error: 'New run failed', + blockLogs: [], + }) + + expect(addConsole).toHaveBeenCalledTimes(1) + }) + it('uses Timeout Error label when error indicates a timeout', () => { const addConsole = vi.fn() addExecutionErrorConsoleEntry(addConsole, { @@ -83,12 +131,121 @@ describe('workflow-execution-utils', () => { }) }) + describe('reconcileFinalBlockLogs', () => { + const makeLog = (over: Partial): BlockLog => ({ + blockId: 'b1', + blockName: 'Function', + blockType: 'function', + startedAt: new Date().toISOString(), + endedAt: new Date().toISOString(), + durationMs: 50, + success: true, + executionOrder: 1, + ...over, + }) + + it('flips a still-running entry to the server-reported success state', () => { + terminalConsoleMockFns.mockAddConsole({ + workflowId: 'wf-1', + blockId: 'kb-1', + blockName: 'Knowledge 1', + blockType: 'knowledge', + executionId: 'exec-1', + executionOrder: 2, + isRunning: true, + }) + + const updateConsole = vi.fn() + reconcileFinalBlockLogs(updateConsole, 'wf-1', 'exec-1', [ + makeLog({ + blockId: 'kb-1', + blockName: 'Knowledge 1', + blockType: 'knowledge', + executionOrder: 2, + success: true, + output: { items: [] }, + }), + ]) + + expect(updateConsole).toHaveBeenCalledTimes(1) + const [blockId, update, executionId] = updateConsole.mock.calls[0] + expect(blockId).toBe('kb-1') + expect(executionId).toBe('exec-1') + expect(update).toMatchObject({ + success: true, + isRunning: false, + replaceOutput: { items: [] }, + }) + }) + + it('flips a still-running entry to the server-reported error state (Bug 1 reconciliation)', () => { + terminalConsoleMockFns.mockAddConsole({ + workflowId: 'wf-1', + blockId: 'fn-1', + blockName: 'Function', + blockType: 'function', + executionId: 'exec-1', + executionOrder: 3, + isRunning: true, + }) + + const updateConsole = vi.fn() + reconcileFinalBlockLogs(updateConsole, 'wf-1', 'exec-1', [ + makeLog({ + blockId: 'fn-1', + executionOrder: 3, + success: false, + error: 'JSON parse failed', + }), + ]) + + expect(updateConsole).toHaveBeenCalledTimes(1) + expect(updateConsole.mock.calls[0][1]).toMatchObject({ + success: false, + error: 'JSON parse failed', + isRunning: false, + }) + }) + + it('skips entries that are not running', () => { + terminalConsoleMockFns.mockAddConsole({ + workflowId: 'wf-1', + blockId: 'fn-1', + blockName: 'Function', + blockType: 'function', + executionId: 'exec-1', + executionOrder: 1, + isRunning: false, + success: true, + }) + + const updateConsole = vi.fn() + reconcileFinalBlockLogs(updateConsole, 'wf-1', 'exec-1', [makeLog({ blockId: 'fn-1' })]) + + expect(updateConsole).not.toHaveBeenCalled() + }) + + it('is a no-op when finalBlockLogs is empty or executionId is missing', () => { + const updateConsole = vi.fn() + reconcileFinalBlockLogs(updateConsole, 'wf-1', 'exec-1', []) + reconcileFinalBlockLogs(updateConsole, 'wf-1', undefined, [makeLog({})]) + expect(updateConsole).not.toHaveBeenCalled() + }) + }) + describe('handleExecutionErrorConsole', () => { - it('adds a synthetic Run Error entry when no block-level error covers it', () => { - const addConsole = vi.fn() + it('cancels running entries before adding the synthetic entry', () => { + const calls: string[] = [] + const addConsole = vi.fn(() => { + calls.push('add') + return undefined + }) + const cancelRunningEntries = vi.fn(() => { + calls.push('cancel') + }) handleExecutionErrorConsole( - { addConsole, updateConsole: vi.fn() }, + { addConsole, updateConsole: vi.fn(), cancelRunningEntries }, { workflowId: 'wf-1', executionId: 'exec-1', @@ -97,36 +254,57 @@ describe('workflow-execution-utils', () => { } ) - expect(addConsole).toHaveBeenCalledTimes(1) - expect(addConsole.mock.calls[0][0].blockName).toBe('Run Error') + expect(calls[0]).toBe('cancel') + expect(calls).toContain('add') }) - it('skips the synthetic entry when a block-level error already covers the failure', () => { - const addConsole = vi.fn() + it('reconciles finalBlockLogs before sweeping running entries (Fix C)', () => { + terminalConsoleMockFns.mockAddConsole({ + workflowId: 'wf-1', + blockId: 'kb-1', + blockName: 'Knowledge 1', + blockType: 'knowledge', + executionId: 'exec-1', + executionOrder: 1, + isRunning: true, + }) + + const calls: string[] = [] + const addConsole = vi.fn(() => { + calls.push('add') + return undefined + }) + const cancelRunningEntries = vi.fn(() => { + calls.push('cancel') + }) + const updateConsole = vi.fn(() => { + calls.push('update') + }) handleExecutionErrorConsole( - { addConsole, updateConsole: vi.fn() }, + { addConsole, updateConsole, cancelRunningEntries }, { workflowId: 'wf-1', executionId: 'exec-1', error: 'boom', - blockLogs: [ + blockLogs: [], + finalBlockLogs: [ { - blockId: 'fn-1', - blockName: 'Function', - blockType: 'function', - success: false, - error: 'JSON parse failed', + blockId: 'kb-1', + blockName: 'Knowledge 1', + blockType: 'knowledge', startedAt: new Date().toISOString(), endedAt: new Date().toISOString(), durationMs: 10, + success: true, executionOrder: 1, } as any, ], } ) - expect(addConsole).not.toHaveBeenCalled() + expect(updateConsole).toHaveBeenCalledTimes(1) + expect(calls).toEqual(['update', 'cancel', 'add']) }) }) }) diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts index 41f4038fa96..4872ab7c156 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts @@ -106,6 +106,7 @@ export interface BlockEventHandlerConfig { accumulatedBlockLogs: BlockLog[] accumulatedBlockStates: Map executedBlockIds: Set + includeStartConsoleEntry: boolean onBlockCompleteCallback?: (blockId: string, output: unknown) => Promise } @@ -134,6 +135,7 @@ export function createBlockEventHandlers( accumulatedBlockLogs, accumulatedBlockStates, executedBlockIds, + includeStartConsoleEntry, onBlockCompleteCallback, } = config @@ -174,7 +176,6 @@ export function createBlockEventHandlers( ...('childWorkflowInstanceId' in data && { childWorkflowInstanceId: data.childWorkflowInstanceId, }), - ...(data.blockExecutionId && { blockExecutionId: data.blockExecutionId }), }) const createBlockLogEntry = ( @@ -235,7 +236,7 @@ export function createBlockEventHandlers( if (isStaleExecution()) return updateActiveBlocks(data.blockId, true) - if (!workflowId) return + if (!includeStartConsoleEntry || !workflowId) return const startedAt = new Date().toISOString() addConsole({ @@ -280,12 +281,15 @@ export function createBlockEventHandlers( const output = data.output as Record | undefined const isEmptySubflow = Array.isArray(output?.results) && output.results.length === 0 if (!isEmptySubflow) { - updateConsoleEntry(data) + if (includeStartConsoleEntry) { + updateConsoleEntry(data) + } return } } accumulatedBlockLogs.push(createBlockLogEntry(data, { success: true, output: data.output })) + updateConsoleEntry(data) if (onBlockCompleteCallback) { @@ -349,6 +353,7 @@ export function createBlockEventHandlers( } type AddConsoleFn = (entry: Omit) => ConsoleEntry | undefined +type CancelRunningEntriesFn = (workflowId: string) => void type UpdateConsoleFn = ( blockId: string, update: string | ConsoleUpdate, @@ -357,10 +362,49 @@ type UpdateConsoleFn = ( /** * Bundle of console-store actions used by the execution-level handlers. + * Mirrors the deps-object pattern established by `createBlockEventHandlers`. */ export interface ExecutionConsoleDeps { addConsole: AddConsoleFn updateConsole: UpdateConsoleFn + cancelRunningEntries: CancelRunningEntriesFn +} + +/** + * Reconciles still-running console entries with the server's authoritative + * `finalBlockLogs` so that any block whose terminal `block:completed`/`block:error` + * SSE event was lost gets the correct success/error state instead of being + * swept to "canceled". + */ +export function reconcileFinalBlockLogs( + updateConsole: UpdateConsoleFn, + workflowId: string, + executionId: string | undefined, + finalBlockLogs: BlockLog[] | undefined +): void { + if (!finalBlockLogs?.length || !executionId) return + for (const log of finalBlockLogs) { + const entries = useTerminalConsoleStore.getState().getWorkflowEntries(workflowId) + const running = entries.find( + (e) => e.blockId === log.blockId && e.executionId === executionId && e.isRunning + ) + if (!running) continue + updateConsole( + log.blockId, + { + executionOrder: log.executionOrder, + replaceOutput: (log.output ?? {}) as Record, + ...(log.input ? { input: log.input } : {}), + success: log.success, + ...(log.error ? { error: log.error } : {}), + durationMs: log.durationMs, + startedAt: log.startedAt, + endedAt: log.endedAt, + isRunning: false, + }, + executionId + ) + } } export interface ExecutionTimingFields { @@ -388,6 +432,8 @@ export interface ExecutionErrorConsoleParams { durationMs?: number blockLogs: BlockLog[] isPreExecutionError?: boolean + /** Server's authoritative per-block terminal states, used to reconcile lost SSE events. */ + finalBlockLogs?: BlockLog[] } /** @@ -398,7 +444,19 @@ export function addExecutionErrorConsoleEntry( addConsole: AddConsoleFn, params: ExecutionErrorConsoleParams ): void { - const hasBlockError = params.blockLogs.some((log) => log.error) + const hasBlockErrorInLogs = params.blockLogs.some((log) => log.error) + const hasBlockErrorInConsole = useTerminalConsoleStore + .getState() + .getWorkflowEntries(params.workflowId) + .some( + (entry) => + entry.executionId === params.executionId && + entry.error != null && + entry.error !== '' && + entry.blockType !== 'error' && + entry.blockType !== 'validation' + ) + const hasBlockError = hasBlockErrorInLogs || hasBlockErrorInConsole const isPreExecutionError = params.isPreExecutionError ?? false if (!isPreExecutionError && hasBlockError) return @@ -428,12 +486,21 @@ export function addExecutionErrorConsoleEntry( } /** - * Adds an execution-level error console entry when no block-level error already covers it. + * Reconciles `finalBlockLogs` against still-running entries, sweeps any + * remaining running entries to canceled, and adds an execution-level error + * console entry when no block-level error already covers it. */ export function handleExecutionErrorConsole( deps: ExecutionConsoleDeps, params: ExecutionErrorConsoleParams ): void { + reconcileFinalBlockLogs( + deps.updateConsole, + params.workflowId, + params.executionId, + params.finalBlockLogs + ) + deps.cancelRunningEntries(params.workflowId) addExecutionErrorConsoleEntry(deps.addConsole, params) } @@ -474,6 +541,8 @@ export interface CancelledConsoleParams { workflowId: string executionId?: string durationMs?: number + /** Server's authoritative per-block terminal states, used to reconcile lost SSE events. */ + finalBlockLogs?: BlockLog[] } /** @@ -502,12 +571,21 @@ export function addCancelledConsoleEntry( } /** - * Adds the execution-level cancellation console entry. + * Reconciles `finalBlockLogs` against still-running entries, sweeps any + * remaining running entries to canceled, and adds the execution-level + * cancellation console entry. */ export function handleExecutionCancelledConsole( deps: ExecutionConsoleDeps, params: CancelledConsoleParams ): void { + reconcileFinalBlockLogs( + deps.updateConsole, + params.workflowId, + params.executionId, + params.finalBlockLogs + ) + deps.cancelRunningEntries(params.workflowId) addCancelledConsoleEntry(deps.addConsole, params) } @@ -544,7 +622,7 @@ export async function executeWorkflowWithFullLogging( } const executionId = options.executionId || generateId() - const { addConsole, updateConsole } = useTerminalConsoleStore.getState() + const { addConsole, updateConsole, cancelRunningEntries } = useTerminalConsoleStore.getState() const { setActiveBlocks, setBlockRunStatus, setEdgeRunStatus, setCurrentExecutionId } = useExecutionStore.getState() const wfId = targetWorkflowId @@ -565,6 +643,7 @@ export async function executeWorkflowWithFullLogging( accumulatedBlockLogs, accumulatedBlockStates: new Map(), executedBlockIds: new Set(), + includeStartConsoleEntry: true, onBlockCompleteCallback: options.onBlockComplete, }, { addConsole, updateConsole, setActiveBlocks, setBlockRunStatus, setEdgeRunStatus } @@ -652,6 +731,8 @@ export async function executeWorkflowWithFullLogging( onExecutionCompleted: (data) => { setCurrentExecutionId(wfId, null) + reconcileFinalBlockLogs(updateConsole, wfId, executionIdRef.current, data.finalBlockLogs) + cancelRunningEntries(wfId) executionResult = { success: data.success, output: data.output, @@ -674,11 +755,12 @@ export async function executeWorkflowWithFullLogging( } handleExecutionCancelledConsole( - { addConsole, updateConsole }, + { addConsole, updateConsole, cancelRunningEntries }, { workflowId: wfId, executionId: executionIdRef.current, durationMs: data?.duration, + finalBlockLogs: data?.finalBlockLogs, } ) }, @@ -695,7 +777,7 @@ export async function executeWorkflowWithFullLogging( } handleExecutionErrorConsole( - { addConsole, updateConsole }, + { addConsole, updateConsole, cancelRunningEntries }, { workflowId: wfId, executionId: executionIdRef.current, @@ -703,6 +785,7 @@ export async function executeWorkflowWithFullLogging( durationMs: data.duration || 0, blockLogs: accumulatedBlockLogs, isPreExecutionError: accumulatedBlockLogs.length === 0, + finalBlockLogs: data.finalBlockLogs, } ) }, diff --git a/apps/sim/executor/execution/block-executor.ts b/apps/sim/executor/execution/block-executor.ts index a5ffef197ab..340b2aab01a 100644 --- a/apps/sim/executor/execution/block-executor.ts +++ b/apps/sim/executor/execution/block-executor.ts @@ -1,6 +1,5 @@ import { createLogger, type Logger } from '@sim/logger' import { toError } from '@sim/utils/errors' -import { generateId } from '@sim/utils/id' import { redactApiKeys } from '@/lib/core/security/redaction' import { getBaseUrl } from '@/lib/core/utils/urls' import { @@ -53,7 +52,6 @@ const logger = createLogger('BlockExecutor') export class BlockExecutor { private execLogger: Logger - private pendingCallbacks: Set> = new Set() constructor( private blockHandlers: BlockHandler[], @@ -97,13 +95,7 @@ export class BlockExecutor { if (!isSentinel) { blockLog = this.createBlockLog(ctx, node.id, block, node, startedAt) ctx.blockLogs.push(blockLog) - this.fireBlockStartCallback( - ctx, - node, - block, - blockLog.executionOrder, - blockLog.blockExecutionId - ) + this.fireBlockStartCallback(ctx, node, block, blockLog.executionOrder) } let resolvedInputs: Record = {} @@ -215,8 +207,7 @@ export class BlockExecutor { blockLog.startedAt, blockLog.executionOrder, blockLog.endedAt, - childWorkflowInstanceId, - blockLog.blockExecutionId + childWorkflowInstanceId ) } @@ -325,8 +316,7 @@ export class BlockExecutor { blockLog.startedAt, blockLog.executionOrder, blockLog.endedAt, - childWorkflowInstanceId, - blockLog.blockExecutionId + childWorkflowInstanceId ) } @@ -400,7 +390,6 @@ export class BlockExecutor { return { blockId, - blockExecutionId: generateId(), blockName, blockType: block.metadata?.id ?? DEFAULTS.BLOCK_TYPE, startedAt, @@ -478,8 +467,7 @@ export class BlockExecutor { ctx: ExecutionContext, node: DAGNode, block: SerializedBlock, - executionOrder: number, - blockExecutionId: string | undefined + executionOrder: number ): void { if (!this.contextExtensions.onBlockStart) return @@ -488,15 +476,14 @@ export class BlockExecutor { const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE const iterationContext = getIterationContext(ctx, node?.metadata) - const promise = this.contextExtensions + void this.contextExtensions .onBlockStart( blockId, blockName, blockType, executionOrder, iterationContext, - ctx.childWorkflowContext, - blockExecutionId + ctx.childWorkflowContext ) .catch((error) => { this.execLogger.warn('Block start callback failed', { @@ -505,7 +492,6 @@ export class BlockExecutor { error: toError(error).message, }) }) - this.trackCallback(promise) } /** @@ -523,8 +509,7 @@ export class BlockExecutor { startedAt: string, executionOrder: number, endedAt: string, - childWorkflowInstanceId?: string, - blockExecutionId?: string + childWorkflowInstanceId?: string ): void { if (!this.contextExtensions.onBlockComplete) return @@ -533,7 +518,7 @@ export class BlockExecutor { const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE const iterationContext = getIterationContext(ctx, node?.metadata) - const promise = this.contextExtensions + void this.contextExtensions .onBlockComplete( blockId, blockName, @@ -546,7 +531,6 @@ export class BlockExecutor { executionOrder, endedAt, childWorkflowInstanceId, - blockExecutionId, }, iterationContext, ctx.childWorkflowContext @@ -558,24 +542,6 @@ export class BlockExecutor { error: toError(error).message, }) }) - this.trackCallback(promise) - } - - private trackCallback(promise: Promise): void { - this.pendingCallbacks.add(promise) - promise.finally(() => { - this.pendingCallbacks.delete(promise) - }) - } - - /** - * Resolves once every in-flight `onBlockStart` / `onBlockComplete` callback has settled. - * Drained at terminal-event boundaries so block events land before `execution:*` events. - */ - async awaitPendingCallbacks(): Promise { - while (this.pendingCallbacks.size > 0) { - await Promise.allSettled([...this.pendingCallbacks]) - } } private preparePauseResumeSelfReference( diff --git a/apps/sim/executor/execution/executor.ts b/apps/sim/executor/execution/executor.ts index b05fce36a29..8e3a8c8c8c9 100644 --- a/apps/sim/executor/execution/executor.ts +++ b/apps/sim/executor/execution/executor.ts @@ -79,12 +79,8 @@ export class DAGExecutor { const { context, state } = this.createExecutionContext(workflowId, triggerBlockId) context.subflowParentMap = this.buildSubflowParentMap(dag) - const { engine, blockExecutor } = this.buildExecutionPipeline(context, dag, state) - try { - return await engine.run(triggerBlockId) - } finally { - await blockExecutor.awaitPendingCallbacks() - } + const engine = this.buildExecutionPipeline(context, dag, state) + return await engine.run(triggerBlockId) } async continueExecution( @@ -210,12 +206,8 @@ export class DAGExecutor { }) context.subflowParentMap = this.buildSubflowParentMap(dag) - const { engine, blockExecutor } = this.buildExecutionPipeline(context, dag, state) - try { - return await engine.run() - } finally { - await blockExecutor.awaitPendingCallbacks() - } + const engine = this.buildExecutionPipeline(context, dag, state) + return await engine.run() } private buildExecutionPipeline(context: ExecutionContext, dag: DAG, state: ExecutionState) { @@ -243,8 +235,7 @@ export class DAGExecutor { loopOrchestrator, parallelOrchestrator ) - const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) - return { engine, blockExecutor } + return new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator) } private createExecutionContext( diff --git a/apps/sim/executor/execution/types.ts b/apps/sim/executor/execution/types.ts index 8bbddf5f8a7..3c5130d8220 100644 --- a/apps/sim/executor/execution/types.ts +++ b/apps/sim/executor/execution/types.ts @@ -118,8 +118,7 @@ export interface ExecutionCallbacks { blockType: string, executionOrder: number, iterationContext?: IterationContext, - childWorkflowContext?: ChildWorkflowContext, - blockExecutionId?: string + childWorkflowContext?: ChildWorkflowContext ) => Promise onBlockComplete?: ( blockId: string, @@ -127,8 +126,7 @@ export interface ExecutionCallbacks { blockType: string, output: any, iterationContext?: IterationContext, - childWorkflowContext?: ChildWorkflowContext, - blockExecutionId?: string + childWorkflowContext?: ChildWorkflowContext ) => Promise /** Fires immediately after instanceId is generated, before child execution begins. */ onChildWorkflowInstanceReady?: ( @@ -174,8 +172,7 @@ export interface ContextExtensions { blockType: string, executionOrder: number, iterationContext?: IterationContext, - childWorkflowContext?: ChildWorkflowContext, - blockExecutionId?: string + childWorkflowContext?: ChildWorkflowContext ) => Promise onBlockComplete?: ( blockId: string, @@ -190,8 +187,6 @@ export interface ContextExtensions { endedAt: string /** Per-invocation unique ID linking this workflow block execution to its child block events. */ childWorkflowInstanceId?: string - /** Per-invocation unique ID for this block execution (distinct across loop/parallel iterations). */ - blockExecutionId?: string }, iterationContext?: IterationContext, childWorkflowContext?: ChildWorkflowContext diff --git a/apps/sim/executor/types.ts b/apps/sim/executor/types.ts index 657b536e392..7d844257d1e 100644 --- a/apps/sim/executor/types.ts +++ b/apps/sim/executor/types.ts @@ -208,12 +208,6 @@ export interface NormalizedBlockOutput { export interface BlockLog { blockId: string - /** - * Unique per-invocation ID. Same `blockId` can appear multiple times across loop/parallel - * iterations and across runs; `blockExecutionId` is unique for each individual execution - * and survives across `block:started` → `block:completed | block:error`. - */ - blockExecutionId?: string blockName?: string blockType?: string startedAt: string diff --git a/apps/sim/lib/workflows/executor/execution-core.test.ts b/apps/sim/lib/workflows/executor/execution-core.test.ts index 6e756be204f..da8092d3366 100644 --- a/apps/sim/lib/workflows/executor/execution-core.test.ts +++ b/apps/sim/lib/workflows/executor/execution-core.test.ts @@ -239,6 +239,37 @@ describe('executeWorkflowCore terminal finalization sequencing', () => { expect(findStartBlockMock).toHaveBeenCalledWith(expect.anything(), 'external', false) }) + it('does not await user block start callback after persistence completes', async () => { + let releaseCallback: (() => void) | undefined + const callbackPromise = new Promise((resolve) => { + releaseCallback = resolve + }) + + executorExecuteMock.mockResolvedValue({ + success: true, + status: 'completed', + output: { done: true }, + logs: [], + metadata: { duration: 123, startTime: 'start', endTime: 'end' }, + }) + + await executeWorkflowCore({ + snapshot: createSnapshot() as any, + callbacks: { + onBlockStart: vi.fn(() => callbackPromise), + }, + loggingSession: loggingSession as any, + }) + + const contextExtensions = executorConstructorMock.mock.calls[0]?.[0]?.contextExtensions + + await expect( + contextExtensions.onBlockStart('block-1', 'Fetch', 'api', 1) + ).resolves.toBeUndefined() + + releaseCallback?.() + }) + it('awaits terminal completion before updating run counts and returning', async () => { const callOrder: string[] = [] diff --git a/apps/sim/lib/workflows/executor/execution-core.ts b/apps/sim/lib/workflows/executor/execution-core.ts index 14e11d9f114..65a45e781b6 100644 --- a/apps/sim/lib/workflows/executor/execution-core.ts +++ b/apps/sim/lib/workflows/executor/execution-core.ts @@ -435,7 +435,6 @@ export async function executeWorkflowCore( executionTime: number startedAt: string endedAt: string - blockExecutionId?: string }, iterationContext?: IterationContext, childWorkflowContext?: ChildWorkflowContext @@ -443,14 +442,13 @@ export async function executeWorkflowCore( try { await loggingSession.onBlockComplete(blockId, blockName, blockType, output) if (onBlockComplete) { - await onBlockComplete( + void onBlockComplete( blockId, blockName, blockType, output, iterationContext, - childWorkflowContext, - output.blockExecutionId + childWorkflowContext ).catch((error) => { logger.warn(`[${requestId}] Block completion callback failed`, { executionId, @@ -476,20 +474,18 @@ export async function executeWorkflowCore( blockType: string, executionOrder: number, iterationContext?: IterationContext, - childWorkflowContext?: ChildWorkflowContext, - blockExecutionId?: string + childWorkflowContext?: ChildWorkflowContext ) => { try { await loggingSession.onBlockStart(blockId, blockName, blockType, new Date().toISOString()) if (onBlockStart) { - await onBlockStart( + void onBlockStart( blockId, blockName, blockType, executionOrder, iterationContext, - childWorkflowContext, - blockExecutionId + childWorkflowContext ).catch((error) => { logger.warn(`[${requestId}] Block start callback failed`, { executionId, diff --git a/apps/sim/lib/workflows/executor/execution-events.ts b/apps/sim/lib/workflows/executor/execution-events.ts index c76908ff932..b8089ea2146 100644 --- a/apps/sim/lib/workflows/executor/execution-events.ts +++ b/apps/sim/lib/workflows/executor/execution-events.ts @@ -3,6 +3,7 @@ import type { IterationContext, ParentIteration, } from '@/executor/execution/types' +import type { BlockLog } from '@/executor/types' import type { SubflowType } from '@/stores/workflows/workflow/types' export type ExecutionEventType = @@ -51,6 +52,8 @@ export interface ExecutionCompletedEvent extends BaseExecutionEvent { duration: number startTime: string endTime: string + /** Authoritative per-block terminal states from the server's blockLogs. */ + finalBlockLogs?: BlockLog[] } } @@ -77,6 +80,8 @@ export interface ExecutionErrorEvent extends BaseExecutionEvent { data: { error: string duration: number + /** Authoritative per-block terminal states from the server's blockLogs. */ + finalBlockLogs?: BlockLog[] } } @@ -85,6 +90,8 @@ export interface ExecutionCancelledEvent extends BaseExecutionEvent { workflowId: string data: { duration: number + /** Authoritative per-block terminal states from the server's blockLogs. */ + finalBlockLogs?: BlockLog[] } } @@ -106,8 +113,6 @@ export interface BlockStartedEvent extends BaseExecutionEvent { parentIterations?: ParentIteration[] childWorkflowBlockId?: string childWorkflowName?: string - /** Per-invocation unique ID for this block execution (distinct across loop/parallel iterations). */ - blockExecutionId?: string } } @@ -136,8 +141,6 @@ export interface BlockCompletedEvent extends BaseExecutionEvent { childWorkflowName?: string /** Per-invocation unique ID for correlating child block events with this workflow block. */ childWorkflowInstanceId?: string - /** Per-invocation unique ID for this block execution (distinct across loop/parallel iterations). */ - blockExecutionId?: string } } @@ -166,8 +169,6 @@ export interface BlockErrorEvent extends BaseExecutionEvent { childWorkflowName?: string /** Per-invocation unique ID for correlating child block events with this workflow block. */ childWorkflowInstanceId?: string - /** Per-invocation unique ID for this block execution (distinct across loop/parallel iterations). */ - blockExecutionId?: string } } @@ -284,8 +285,7 @@ export function createExecutionCallbacks(options: { blockType: string, executionOrder: number, iterationContext?: IterationContext, - childWorkflowContext?: ChildWorkflowContext, - blockExecutionId?: string + childWorkflowContext?: ChildWorkflowContext ) => { await sendBufferedEvent({ type: 'block:started', @@ -310,7 +310,6 @@ export function createExecutionCallbacks(options: { childWorkflowBlockId: childWorkflowContext.parentBlockId, childWorkflowName: childWorkflowContext.workflowName, }), - ...(blockExecutionId && { blockExecutionId }), }, }) } @@ -327,11 +326,9 @@ export function createExecutionCallbacks(options: { executionOrder: number endedAt: string childWorkflowInstanceId?: string - blockExecutionId?: string }, iterationContext?: IterationContext, - childWorkflowContext?: ChildWorkflowContext, - blockExecutionId?: string + childWorkflowContext?: ChildWorkflowContext ) => { const hasError = callbackData.output?.error const iterationData = iterationContext @@ -356,11 +353,6 @@ export function createExecutionCallbacks(options: { ? { childWorkflowInstanceId: callbackData.childWorkflowInstanceId } : {} - const blockExecData = - blockExecutionId || callbackData.blockExecutionId - ? { blockExecutionId: blockExecutionId ?? callbackData.blockExecutionId } - : {} - if (hasError) { await sendBufferedEvent({ type: 'block:error', @@ -380,7 +372,6 @@ export function createExecutionCallbacks(options: { ...iterationData, ...childWorkflowData, ...instanceData, - ...blockExecData, }, }) } else { @@ -402,7 +393,6 @@ export function createExecutionCallbacks(options: { ...iterationData, ...childWorkflowData, ...instanceData, - ...blockExecData, }, }) } diff --git a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts index 98626dd16eb..010ac6532ff 100644 --- a/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts +++ b/apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts @@ -23,6 +23,7 @@ import type { SerializedSnapshot, StreamingExecution, } from '@/executor/types' +import { hasExecutionResult } from '@/executor/utils/errors' import { filterOutputForLog } from '@/executor/utils/output-filter' import type { SerializedConnection } from '@/serializer/types' @@ -1039,6 +1040,7 @@ export class PauseResumeManager { data: { error: timeoutErrorMessage, duration: result.metadata?.duration || 0, + finalBlockLogs: result.logs, }, } as ExecutionEvent) finalMetaStatus = 'error' @@ -1048,7 +1050,10 @@ export class PauseResumeManager { timestamp: new Date().toISOString(), executionId: resumeExecutionId, workflowId, - data: { duration: result.metadata?.duration || 0 }, + data: { + duration: result.metadata?.duration || 0, + finalBlockLogs: result.logs, + }, } as ExecutionEvent) finalMetaStatus = 'cancelled' } else if (result.status === 'paused') { @@ -1077,11 +1082,13 @@ export class PauseResumeManager { duration: result.metadata?.duration || 0, startTime: result.metadata?.startTime || new Date().toISOString(), endTime: result.metadata?.endTime || new Date().toISOString(), + finalBlockLogs: result.logs, }, } as ExecutionEvent) finalMetaStatus = 'complete' } } catch (execError) { + const execErrorResult = hasExecutionResult(execError) ? execError.executionResult : undefined writeBufferedEvent({ type: 'execution:error', timestamp: new Date().toISOString(), @@ -1090,6 +1097,7 @@ export class PauseResumeManager { data: { error: toError(execError).message, duration: 0, + finalBlockLogs: execErrorResult?.logs, }, } as ExecutionEvent) finalMetaStatus = 'error' diff --git a/apps/sim/lib/workflows/executor/queued-workflow-execution.ts b/apps/sim/lib/workflows/executor/queued-workflow-execution.ts index 4d9e3dfaa36..2dc0fc85318 100644 --- a/apps/sim/lib/workflows/executor/queued-workflow-execution.ts +++ b/apps/sim/lib/workflows/executor/queued-workflow-execution.ts @@ -169,6 +169,7 @@ export async function executeQueuedWorkflowJob( data: { error: timeoutErrorMessage, duration: result.metadata?.duration || 0, + finalBlockLogs: result.logs, }, }) @@ -214,6 +215,7 @@ export async function executeQueuedWorkflowJob( workflowId, data: { duration: result.metadata?.duration || 0, + finalBlockLogs: result.logs, }, }) await setExecutionMeta(executionId, { status: 'cancelled' }) @@ -243,6 +245,7 @@ export async function executeQueuedWorkflowJob( duration: result.metadata?.duration || 0, startTime: result.metadata?.startTime || metadata.startTime, endTime: result.metadata?.endTime || new Date().toISOString(), + finalBlockLogs: result.logs, }, }) await setExecutionMeta(executionId, { status: 'complete' }) @@ -289,6 +292,8 @@ export async function executeQueuedWorkflowJob( }) } + const executionResult = hasExecutionResult(error) ? error.executionResult : undefined + if (eventWriter) { await eventWriter.write({ type: 'execution:error', @@ -298,13 +303,12 @@ export async function executeQueuedWorkflowJob( data: { error: toError(error).message, duration: 0, + finalBlockLogs: executionResult?.logs, }, }) await setExecutionMeta(executionId, { status: 'error' }) } - const executionResult = hasExecutionResult(error) ? error.executionResult : undefined - return buildResult( 'failed', { diff --git a/apps/sim/stores/terminal/console/store.test.ts b/apps/sim/stores/terminal/console/store.test.ts index 9c4cf267d1b..da0606eaf48 100644 --- a/apps/sim/stores/terminal/console/store.test.ts +++ b/apps/sim/stores/terminal/console/store.test.ts @@ -1,7 +1,6 @@ /** * @vitest-environment node */ -import { createLogger } from '@sim/logger' import { beforeEach, describe, expect, it, vi } from 'vitest' vi.unmock('@/stores/terminal') @@ -9,25 +8,15 @@ vi.unmock('@/stores/terminal/console/store') import { useTerminalConsoleStore } from '@/stores/terminal/console/store' -const storeLoggerCallIdx = vi - .mocked(createLogger) - .mock.calls.findIndex((call) => call[0] === 'TerminalConsoleStore') -const storeLogger = - storeLoggerCallIdx >= 0 - ? vi.mocked(createLogger).mock.results[storeLoggerCallIdx]?.value - : undefined - describe('terminal console store', () => { beforeEach(() => { useTerminalConsoleStore.setState({ workflowEntries: {}, entryIdsByBlockExecution: {}, - entryIdByBlockExecutionId: {}, entryLocationById: {}, isOpen: false, _hasHydrated: true, }) - storeLogger?.warn.mockClear() }) it('normalizes oversized payloads when adding console entries', () => { @@ -128,148 +117,6 @@ describe('terminal console store', () => { expect(after.getWorkflowEntries('wf-1')[0].output).toMatchObject({ status: 'updated' }) }) - describe('blockExecutionId keying', () => { - it('updates an entry via the primary index without firing legacy warn', () => { - useTerminalConsoleStore.getState().addConsole({ - workflowId: 'wf-1', - blockId: 'block-1', - blockName: 'Function', - blockType: 'function', - executionId: 'exec-1', - blockExecutionId: 'bex-1', - executionOrder: 1, - isRunning: true, - }) - - useTerminalConsoleStore.getState().updateConsole( - 'block-1', - { - executionOrder: 1, - blockExecutionId: 'bex-1', - success: true, - replaceOutput: { status: 'done' }, - }, - 'exec-1' - ) - - const [entry] = useTerminalConsoleStore.getState().getWorkflowEntries('wf-1') - expect(entry.success).toBe(true) - expect(entry.output).toMatchObject({ status: 'done' }) - expect(storeLogger?.warn).not.toHaveBeenCalled() - }) - - it('falls back to legacy keying and warns when blockExecutionId is unknown', () => { - useTerminalConsoleStore.getState().addConsole({ - workflowId: 'wf-1', - blockId: 'block-1', - blockName: 'Function', - blockType: 'function', - executionId: 'exec-1', - executionOrder: 1, - isRunning: true, - }) - - useTerminalConsoleStore.getState().updateConsole( - 'block-1', - { - executionOrder: 1, - blockExecutionId: 'bex-unknown', - success: true, - replaceOutput: { status: 'done' }, - }, - 'exec-1' - ) - - const [entry] = useTerminalConsoleStore.getState().getWorkflowEntries('wf-1') - expect(entry.success).toBe(true) - expect(storeLogger?.warn).toHaveBeenCalledWith( - 'updateConsole used legacy keying (hydrated or cross-deploy entry)', - expect.objectContaining({ blockExecutionId: 'bex-unknown', blockId: 'block-1' }) - ) - }) - - it('uses legacy keying without warning when no blockExecutionId is provided', () => { - useTerminalConsoleStore.getState().addConsole({ - workflowId: 'wf-1', - blockId: 'block-1', - blockName: 'Function', - blockType: 'function', - executionId: 'exec-1', - executionOrder: 1, - isRunning: true, - }) - - useTerminalConsoleStore - .getState() - .updateConsole( - 'block-1', - { executionOrder: 1, success: true, replaceOutput: { status: 'done' } }, - 'exec-1' - ) - - const [entry] = useTerminalConsoleStore.getState().getWorkflowEntries('wf-1') - expect(entry.success).toBe(true) - expect(storeLogger?.warn).not.toHaveBeenCalled() - }) - }) - - describe('addConsole idempotency', () => { - it('returns the existing entry when called twice with the same blockExecutionId', () => { - const first = useTerminalConsoleStore.getState().addConsole({ - workflowId: 'wf-1', - blockId: 'block-1', - blockName: 'Function', - blockType: 'function', - executionId: 'exec-1', - blockExecutionId: 'bex-1', - executionOrder: 1, - isRunning: true, - }) - - const second = useTerminalConsoleStore.getState().addConsole({ - workflowId: 'wf-1', - blockId: 'block-1', - blockName: 'Function', - blockType: 'function', - executionId: 'exec-1', - blockExecutionId: 'bex-1', - executionOrder: 1, - isRunning: true, - }) - - const entries = useTerminalConsoleStore.getState().getWorkflowEntries('wf-1') - expect(entries).toHaveLength(1) - expect(second?.id).toBe(first?.id) - }) - - it('creates distinct entries for different blockExecutionIds (loop iterations)', () => { - useTerminalConsoleStore.getState().addConsole({ - workflowId: 'wf-1', - blockId: 'block-1', - blockName: 'Function', - blockType: 'function', - executionId: 'exec-1', - blockExecutionId: 'bex-iter-1', - executionOrder: 1, - isRunning: true, - }) - - useTerminalConsoleStore.getState().addConsole({ - workflowId: 'wf-1', - blockId: 'block-1', - blockName: 'Function', - blockType: 'function', - executionId: 'exec-1', - blockExecutionId: 'bex-iter-2', - executionOrder: 2, - isRunning: true, - }) - - const entries = useTerminalConsoleStore.getState().getWorkflowEntries('wf-1') - expect(entries).toHaveLength(2) - }) - }) - describe('cancelRunningEntries', () => { it('flips a plain running entry to canceled', () => { useTerminalConsoleStore.getState().addConsole({ diff --git a/apps/sim/stores/terminal/console/store.ts b/apps/sim/stores/terminal/console/store.ts index d7c5f0e8962..d6d8414a16a 100644 --- a/apps/sim/stores/terminal/console/store.ts +++ b/apps/sim/stores/terminal/console/store.ts @@ -123,14 +123,10 @@ function removeWorkflowIndexes( workflowId: string, entries: ConsoleEntry[], entryIdsByBlockExecution: Record, - entryIdByBlockExecutionId: Record, entryLocationById: Record ): void { for (const entry of entries) { delete entryLocationById[entry.id] - if (entry.blockExecutionId && entryIdByBlockExecutionId[entry.blockExecutionId] === entry.id) { - delete entryIdByBlockExecutionId[entry.blockExecutionId] - } const blockExecutionKey = getBlockExecutionKey(entry.blockId, entry.executionId) const existingIds = entryIdsByBlockExecution[blockExecutionKey] if (!existingIds) { @@ -150,14 +146,10 @@ function indexWorkflowEntries( workflowId: string, entries: ConsoleEntry[], entryIdsByBlockExecution: Record, - entryIdByBlockExecutionId: Record, entryLocationById: Record ): void { entries.forEach((entry, index) => { entryLocationById[entry.id] = { workflowId, index } - if (entry.blockExecutionId) { - entryIdByBlockExecutionId[entry.blockExecutionId] = entry.id - } const blockExecutionKey = getBlockExecutionKey(entry.blockId, entry.executionId) const existingIds = entryIdsByBlockExecution[blockExecutionKey] if (existingIds) { @@ -170,58 +162,35 @@ function indexWorkflowEntries( function rebuildWorkflowStateMaps(workflowEntries: Record) { const entryIdsByBlockExecution: Record = {} - const entryIdByBlockExecutionId: Record = {} const entryLocationById: Record = {} Object.entries(workflowEntries).forEach(([workflowId, entries]) => { - indexWorkflowEntries( - workflowId, - entries, - entryIdsByBlockExecution, - entryIdByBlockExecutionId, - entryLocationById - ) + indexWorkflowEntries(workflowId, entries, entryIdsByBlockExecution, entryLocationById) }) - return { entryIdsByBlockExecution, entryIdByBlockExecutionId, entryLocationById } + return { entryIdsByBlockExecution, entryLocationById } } function replaceWorkflowEntries( state: ConsoleStore, workflowId: string, nextEntries: ConsoleEntry[] -): Pick< - ConsoleStore, - 'workflowEntries' | 'entryIdsByBlockExecution' | 'entryIdByBlockExecutionId' | 'entryLocationById' -> { +): Pick { const workflowEntries = cloneWorkflowEntries(state.workflowEntries) const entryIdsByBlockExecution = { ...state.entryIdsByBlockExecution } - const entryIdByBlockExecutionId = { ...state.entryIdByBlockExecutionId } const entryLocationById = { ...state.entryLocationById } const previousEntries = workflowEntries[workflowId] ?? EMPTY_CONSOLE_ENTRIES - removeWorkflowIndexes( - workflowId, - previousEntries, - entryIdsByBlockExecution, - entryIdByBlockExecutionId, - entryLocationById - ) + removeWorkflowIndexes(workflowId, previousEntries, entryIdsByBlockExecution, entryLocationById) if (nextEntries.length === 0) { delete workflowEntries[workflowId] } else { workflowEntries[workflowId] = nextEntries - indexWorkflowEntries( - workflowId, - nextEntries, - entryIdsByBlockExecution, - entryIdByBlockExecutionId, - entryLocationById - ) + indexWorkflowEntries(workflowId, nextEntries, entryIdsByBlockExecution, entryLocationById) } - return { workflowEntries, entryIdsByBlockExecution, entryIdByBlockExecutionId, entryLocationById } + return { workflowEntries, entryIdsByBlockExecution, entryLocationById } } function appendWorkflowEntry( @@ -229,38 +198,24 @@ function appendWorkflowEntry( workflowId: string, newEntry: ConsoleEntry, trimmedEntries: ConsoleEntry[] -): Pick< - ConsoleStore, - 'workflowEntries' | 'entryIdsByBlockExecution' | 'entryIdByBlockExecutionId' | 'entryLocationById' -> { +): Pick { const workflowEntries = cloneWorkflowEntries(state.workflowEntries) const previousEntries = workflowEntries[workflowId] ?? EMPTY_CONSOLE_ENTRIES workflowEntries[workflowId] = trimmedEntries const entryLocationById = { ...state.entryLocationById } const entryIdsByBlockExecution = { ...state.entryIdsByBlockExecution } - const entryIdByBlockExecutionId = { ...state.entryIdByBlockExecutionId } const survivingIds = new Set(trimmedEntries.map((e) => e.id)) const droppedEntries = previousEntries.filter((e) => !survivingIds.has(e.id)) if (droppedEntries.length > 0) { - removeWorkflowIndexes( - workflowId, - droppedEntries, - entryIdsByBlockExecution, - entryIdByBlockExecutionId, - entryLocationById - ) + removeWorkflowIndexes(workflowId, droppedEntries, entryIdsByBlockExecution, entryLocationById) } trimmedEntries.forEach((entry, index) => { entryLocationById[entry.id] = { workflowId, index } }) - if (newEntry.blockExecutionId) { - entryIdByBlockExecutionId[newEntry.blockExecutionId] = newEntry.id - } - const blockExecutionKey = getBlockExecutionKey(newEntry.blockId, newEntry.executionId) const existingIds = entryIdsByBlockExecution[blockExecutionKey] if (existingIds) { @@ -271,7 +226,7 @@ function appendWorkflowEntry( entryIdsByBlockExecution[blockExecutionKey] = [newEntry.id] } - return { workflowEntries, entryIdsByBlockExecution, entryIdByBlockExecutionId, entryLocationById } + return { workflowEntries, entryIdsByBlockExecution, entryLocationById } } interface NotifyBlockErrorParams { @@ -314,7 +269,6 @@ export const useTerminalConsoleStore = create()( devtools((set, get) => ({ workflowEntries: {}, entryIdsByBlockExecution: {}, - entryIdByBlockExecutionId: {}, entryLocationById: {}, isOpen: false, _hasHydrated: false, @@ -324,16 +278,6 @@ export const useTerminalConsoleStore = create()( return get().getWorkflowEntries(entry.workflowId)[0] as ConsoleEntry | undefined } - if (entry.blockExecutionId) { - const existingId = get().entryIdByBlockExecutionId[entry.blockExecutionId] - if (existingId) { - const location = get().entryLocationById[existingId] - if (location) { - return get().workflowEntries[location.workflowId]?.[location.index] - } - } - } - const redactedEntry = { ...entry } if ( !isStreamingOutput(entry.output) && @@ -497,23 +441,11 @@ export const useTerminalConsoleStore = create()( updateConsole: (blockId: string, update: string | ConsoleUpdate, executionId?: string) => { set((state) => { - const blockExecutionId = typeof update === 'object' ? update.blockExecutionId : undefined - const directId = blockExecutionId - ? state.entryIdByBlockExecutionId[blockExecutionId] - : undefined - const candidateIds = directId - ? [directId] - : (state.entryIdsByBlockExecution[getBlockExecutionKey(blockId, executionId)] ?? []) + const candidateIds = + state.entryIdsByBlockExecution[getBlockExecutionKey(blockId, executionId)] ?? [] if (candidateIds.length === 0) { return state } - if (blockExecutionId && !directId) { - logger.warn('updateConsole used legacy keying (hydrated or cross-deploy entry)', { - blockExecutionId, - blockId, - executionId, - }) - } const workflowId = state.entryLocationById[candidateIds[0]]?.workflowId if (!workflowId) { @@ -530,7 +462,7 @@ export const useTerminalConsoleStore = create()( const source = nextEntries ?? currentEntries const entry = source[location.index] if (!entry || entry.id !== candidateId) continue - if (!directId && !matchesEntryForUpdate(entry, blockId, executionId, update)) continue + if (!matchesEntryForUpdate(entry, blockId, executionId, update)) continue if (!nextEntries) { nextEntries = [...currentEntries] @@ -638,10 +570,6 @@ export const useTerminalConsoleStore = create()( updatedEntry.childWorkflowInstanceId = update.childWorkflowInstanceId } - if (update.blockExecutionId !== undefined) { - updatedEntry.blockExecutionId = update.blockExecutionId - } - nextEntries[location.index] = updatedEntry } diff --git a/apps/sim/stores/terminal/console/types.ts b/apps/sim/stores/terminal/console/types.ts index 2515c1987e5..8aa342d7a11 100644 --- a/apps/sim/stores/terminal/console/types.ts +++ b/apps/sim/stores/terminal/console/types.ts @@ -32,8 +32,6 @@ export interface ConsoleEntry { childWorkflowName?: string /** Per-invocation unique ID linking this workflow block to its child block events */ childWorkflowInstanceId?: string - /** Per-invocation unique ID for this block execution (distinct across loop/parallel iterations) */ - blockExecutionId?: string } export interface ConsoleUpdate { @@ -58,7 +56,6 @@ export interface ConsoleUpdate { childWorkflowBlockId?: string childWorkflowName?: string childWorkflowInstanceId?: string - blockExecutionId?: string } export interface ConsoleEntryLocation { @@ -69,7 +66,6 @@ export interface ConsoleEntryLocation { export interface ConsoleStore { workflowEntries: Record entryIdsByBlockExecution: Record - entryIdByBlockExecutionId: Record entryLocationById: Record isOpen: boolean addConsole: (entry: Omit) => ConsoleEntry | undefined