diff --git a/apps/sim/executor/dag/construction/edges.ts b/apps/sim/executor/dag/construction/edges.ts index 4a36e5f918..ef6c238de6 100644 --- a/apps/sim/executor/dag/construction/edges.ts +++ b/apps/sim/executor/dag/construction/edges.ts @@ -207,6 +207,7 @@ export class EdgeConstructor { for (const connection of workflow.connections) { let { source, target } = connection const originalSource = source + const originalTarget = target let sourceHandle = this.generateSourceHandle( source, target, @@ -257,14 +258,14 @@ export class EdgeConstructor { target = sentinelStartId } - if (loopSentinelStartId) { - this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle) - } - if (this.edgeCrossesLoopBoundary(source, target, blocksInLoops, dag)) { continue } + if (loopSentinelStartId && !blocksInLoops.has(originalTarget)) { + this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle) + } + if (!this.isEdgeReachable(source, target, reachableBlocks, dag)) { continue } diff --git a/apps/sim/executor/execution/edge-manager.ts b/apps/sim/executor/execution/edge-manager.ts index f0ac33fa7f..3598bed7d3 100644 --- a/apps/sim/executor/execution/edge-manager.ts +++ b/apps/sim/executor/execution/edge-manager.ts @@ -77,15 +77,16 @@ export class EdgeManager { } } - // Check if any deactivation targets that previously received an activated edge are now ready - for (const { target } of edgesToDeactivate) { - if ( - !readyNodes.includes(target) && - !activatedTargets.includes(target) && - this.nodesWithActivatedEdge.has(target) && - this.isTargetReady(target) - ) { - readyNodes.push(target) + if (output.selectedRoute !== EDGE.LOOP_EXIT && output.selectedRoute !== EDGE.PARALLEL_EXIT) { + for (const { target } of edgesToDeactivate) { + if ( + !readyNodes.includes(target) && + !activatedTargets.includes(target) && + this.nodesWithActivatedEdge.has(target) && + this.isTargetReady(target) + ) { + readyNodes.push(target) + } } } diff --git a/apps/sim/executor/execution/engine.ts b/apps/sim/executor/execution/engine.ts index 05e7e04843..86e7fd6256 100644 --- a/apps/sim/executor/execution/engine.ts +++ b/apps/sim/executor/execution/engine.ts @@ -390,6 +390,12 @@ export class ExecutionEngine { logger.info('Processing outgoing edges', { nodeId, outgoingEdgesCount: node.outgoingEdges.size, + outgoingEdges: Array.from(node.outgoingEdges.entries()).map(([id, e]) => ({ + id, + target: e.target, + sourceHandle: e.sourceHandle, + })), + output, readyNodesCount: readyNodes.length, readyNodes, }) diff --git a/apps/sim/executor/execution/state.ts b/apps/sim/executor/execution/state.ts index 7cf849c9ef..bbbc7bc42c 100644 --- a/apps/sim/executor/execution/state.ts +++ b/apps/sim/executor/execution/state.ts @@ -27,6 +27,8 @@ export interface ParallelScope { items?: any[] /** Error message if parallel validation failed (e.g., exceeded max branches) */ validationError?: string + /** Whether the parallel has an empty distribution and should be skipped */ + isEmpty?: boolean } export class ExecutionState implements BlockStateController { diff --git a/apps/sim/executor/orchestrators/loop.ts b/apps/sim/executor/orchestrators/loop.ts index b9a5bd3351..f0757e642f 100644 --- a/apps/sim/executor/orchestrators/loop.ts +++ b/apps/sim/executor/orchestrators/loop.ts @@ -386,10 +386,10 @@ export class LoopOrchestrator { return true } - // forEach: skip if items array is empty if (scope.loopType === 'forEach') { if (!scope.items || scope.items.length === 0) { - logger.info('ForEach loop has empty items, skipping loop body', { loopId }) + logger.info('ForEach loop has empty collection, skipping loop body', { loopId }) + this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME) return false } return true @@ -399,6 +399,8 @@ export class LoopOrchestrator { if (scope.loopType === 'for') { if (scope.maxIterations === 0) { logger.info('For loop has 0 iterations, skipping loop body', { loopId }) + // Set empty output for the loop + this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME) return false } return true diff --git a/apps/sim/executor/orchestrators/node.ts b/apps/sim/executor/orchestrators/node.ts index e5d7bc1a11..7ec669bd33 100644 --- a/apps/sim/executor/orchestrators/node.ts +++ b/apps/sim/executor/orchestrators/node.ts @@ -97,7 +97,7 @@ export class NodeExecutionOrchestrator { if (loopId) { const shouldExecute = await this.loopOrchestrator.evaluateInitialCondition(ctx, loopId) if (!shouldExecute) { - logger.info('While loop initial condition false, skipping loop body', { loopId }) + logger.info('Loop initial condition false, skipping loop body', { loopId }) return { sentinelStart: true, shouldExit: true, @@ -158,6 +158,17 @@ export class NodeExecutionOrchestrator { this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel) } } + + const scope = this.parallelOrchestrator.getParallelScope(ctx, parallelId) + if (scope?.isEmpty) { + logger.info('Parallel has empty distribution, skipping parallel body', { parallelId }) + return { + sentinelStart: true, + shouldExit: true, + selectedRoute: EDGE.PARALLEL_EXIT, + } + } + return { sentinelStart: true } } diff --git a/apps/sim/executor/orchestrators/parallel.ts b/apps/sim/executor/orchestrators/parallel.ts index ef17d624af..12ae70f726 100644 --- a/apps/sim/executor/orchestrators/parallel.ts +++ b/apps/sim/executor/orchestrators/parallel.ts @@ -61,11 +61,13 @@ export class ParallelOrchestrator { let items: any[] | undefined let branchCount: number + let isEmpty = false try { - const resolved = this.resolveBranchCount(ctx, parallelConfig) + const resolved = this.resolveBranchCount(ctx, parallelConfig, parallelId) branchCount = resolved.branchCount items = resolved.items + isEmpty = resolved.isEmpty ?? false } catch (error) { const errorMessage = `Parallel Items did not resolve: ${error instanceof Error ? error.message : String(error)}` logger.error(errorMessage, { parallelId, distribution: parallelConfig.distribution }) @@ -91,6 +93,34 @@ export class ParallelOrchestrator { throw new Error(branchError) } + // Handle empty distribution - skip parallel body + if (isEmpty || branchCount === 0) { + const scope: ParallelScope = { + parallelId, + totalBranches: 0, + branchOutputs: new Map(), + completedCount: 0, + totalExpectedNodes: 0, + items: [], + isEmpty: true, + } + + if (!ctx.parallelExecutions) { + ctx.parallelExecutions = new Map() + } + ctx.parallelExecutions.set(parallelId, scope) + + // Set empty output for the parallel + this.state.setBlockOutput(parallelId, { results: [] }) + + logger.info('Parallel scope initialized with empty distribution, skipping body', { + parallelId, + branchCount: 0, + }) + + return scope + } + const { entryNodes } = this.expander.expandParallel(this.dag, parallelId, branchCount, items) const scope: ParallelScope = { @@ -127,15 +157,17 @@ export class ParallelOrchestrator { private resolveBranchCount( ctx: ExecutionContext, - config: SerializedParallel - ): { branchCount: number; items?: any[] } { + config: SerializedParallel, + parallelId: string + ): { branchCount: number; items?: any[]; isEmpty?: boolean } { if (config.parallelType === 'count') { return { branchCount: config.count ?? 1 } } const items = this.resolveDistributionItems(ctx, config) if (items.length === 0) { - return { branchCount: config.count ?? 1 } + logger.info('Parallel has empty distribution, skipping parallel body', { parallelId }) + return { branchCount: 0, items: [], isEmpty: true } } return { branchCount: items.length, items }