Skip to content

Commit 4996eea

Browse files
committed
Fix
1 parent d963142 commit 4996eea

File tree

3 files changed

+96
-16
lines changed

3 files changed

+96
-16
lines changed

apps/sim/executor/execution/executor.ts

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import { NodeExecutionOrchestrator } from '@/executor/orchestrators/node'
1616
import { ParallelOrchestrator } from '@/executor/orchestrators/parallel'
1717
import type { BlockState, ExecutionContext, ExecutionResult } from '@/executor/types'
1818
import {
19-
computeDirtySet,
19+
computeExecutionSets,
2020
type RunFromBlockContext,
2121
resolveContainerToSentinelStart,
2222
validateRunFromBlock,
@@ -121,14 +121,31 @@ export class DAGExecutor {
121121
throw new Error(validation.error)
122122
}
123123

124-
const dirtySet = computeDirtySet(dag, startBlockId)
124+
const { dirtySet, upstreamSet } = computeExecutionSets(dag, startBlockId)
125125
const effectiveStartBlockId = resolveContainerToSentinelStart(startBlockId, dag) ?? startBlockId
126126

127+
// Filter snapshot to only include upstream blocks - prevents references to non-upstream blocks
128+
const filteredBlockStates: Record<string, any> = {}
129+
for (const [blockId, state] of Object.entries(sourceSnapshot.blockStates)) {
130+
if (upstreamSet.has(blockId)) {
131+
filteredBlockStates[blockId] = state
132+
}
133+
}
134+
const filteredExecutedBlocks = sourceSnapshot.executedBlocks.filter((id) => upstreamSet.has(id))
135+
136+
const filteredSnapshot: SerializableExecutionState = {
137+
...sourceSnapshot,
138+
blockStates: filteredBlockStates,
139+
executedBlocks: filteredExecutedBlocks,
140+
}
141+
127142
logger.info('Executing from block', {
128143
workflowId,
129144
startBlockId,
130145
effectiveStartBlockId,
131146
dirtySetSize: dirtySet.size,
147+
upstreamSetSize: upstreamSet.size,
148+
filteredBlockStatesCount: Object.keys(filteredBlockStates).length,
132149
totalBlocks: dag.nodes.size,
133150
dirtyBlocks: Array.from(dirtySet),
134151
})
@@ -156,7 +173,7 @@ export class DAGExecutor {
156173

157174
const runFromBlockContext = { startBlockId: effectiveStartBlockId, dirtySet }
158175
const { context, state } = this.createExecutionContext(workflowId, undefined, {
159-
snapshotState: sourceSnapshot,
176+
snapshotState: filteredSnapshot,
160177
runFromBlockContext,
161178
})
162179

apps/sim/executor/utils/run-from-block.ts

Lines changed: 53 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -51,48 +51,89 @@ export interface RunFromBlockContext {
5151
}
5252

5353
/**
54-
* Computes all blocks that need re-execution when running from a specific block.
55-
* Uses BFS to find all downstream blocks reachable via outgoing edges.
54+
* Result of computing execution sets for run-from-block mode.
55+
*/
56+
export interface ExecutionSets {
57+
/** Blocks that need re-execution (start block + all downstream) */
58+
dirtySet: Set<string>
59+
/** Blocks that are upstream (ancestors) of the start block */
60+
upstreamSet: Set<string>
61+
}
62+
63+
/**
64+
* Computes both the dirty set (downstream) and upstream set in a single traversal pass.
65+
* - Dirty set: start block + all blocks reachable via outgoing edges (need re-execution)
66+
* - Upstream set: all blocks reachable via incoming edges (can be referenced)
5667
*
5768
* For loop/parallel containers, starts from the sentinel-start node and includes
5869
* the container ID itself in the dirty set.
5970
*
6071
* @param dag - The workflow DAG
6172
* @param startBlockId - The block to start execution from
62-
* @returns Set of block IDs that are "dirty" and need re-execution
73+
* @returns Object containing both dirtySet and upstreamSet
6374
*/
64-
export function computeDirtySet(dag: DAG, startBlockId: string): Set<string> {
75+
export function computeExecutionSets(dag: DAG, startBlockId: string): ExecutionSets {
6576
const dirty = new Set<string>([startBlockId])
77+
const upstream = new Set<string>()
6678
const sentinelStartId = resolveContainerToSentinelStart(startBlockId, dag)
6779
const traversalStartId = sentinelStartId ?? startBlockId
6880

6981
if (sentinelStartId) {
7082
dirty.add(sentinelStartId)
7183
}
7284

73-
const queue = [traversalStartId]
74-
75-
while (queue.length > 0) {
76-
const nodeId = queue.shift()!
85+
// BFS downstream for dirty set
86+
const downstreamQueue = [traversalStartId]
87+
while (downstreamQueue.length > 0) {
88+
const nodeId = downstreamQueue.shift()!
7789
const node = dag.nodes.get(nodeId)
7890
if (!node) continue
7991

8092
for (const [, edge] of node.outgoingEdges) {
8193
if (!dirty.has(edge.target)) {
8294
dirty.add(edge.target)
83-
queue.push(edge.target)
95+
downstreamQueue.push(edge.target)
8496
}
8597
}
8698
}
8799

88-
logger.debug('Computed dirty set', {
100+
// BFS upstream for upstream set
101+
const upstreamQueue = [traversalStartId]
102+
while (upstreamQueue.length > 0) {
103+
const nodeId = upstreamQueue.shift()!
104+
const node = dag.nodes.get(nodeId)
105+
if (!node) continue
106+
107+
for (const sourceId of node.incomingEdges) {
108+
if (!upstream.has(sourceId)) {
109+
upstream.add(sourceId)
110+
upstreamQueue.push(sourceId)
111+
}
112+
}
113+
}
114+
115+
logger.debug('Computed execution sets', {
89116
startBlockId,
90117
traversalStartId,
91118
dirtySetSize: dirty.size,
92-
dirtyBlocks: Array.from(dirty),
119+
upstreamSetSize: upstream.size,
93120
})
94121

95-
return dirty
122+
return { dirtySet: dirty, upstreamSet: upstream }
123+
}
124+
125+
/**
126+
* @deprecated Use computeExecutionSets instead for combined computation
127+
*/
128+
export function computeDirtySet(dag: DAG, startBlockId: string): Set<string> {
129+
return computeExecutionSets(dag, startBlockId).dirtySet
130+
}
131+
132+
/**
133+
* @deprecated Use computeExecutionSets instead for combined computation
134+
*/
135+
export function computeUpstreamSet(dag: DAG, blockId: string): Set<string> {
136+
return computeExecutionSets(dag, blockId).upstreamSet
96137
}
97138

98139
/**

apps/sim/lib/workflows/executor/execution-core.ts

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ import type {
2727
} from '@/executor/execution/types'
2828
import type { ExecutionResult, NormalizedBlockOutput } from '@/executor/types'
2929
import { hasExecutionResult } from '@/executor/utils/errors'
30+
import {
31+
buildParallelSentinelEndId,
32+
buildSentinelEndId,
33+
} from '@/executor/utils/subflow-utils'
3034
import { Serializer } from '@/serializer'
3135

3236
const logger = createLogger('ExecutionCore')
@@ -255,6 +259,24 @@ export async function executeWorkflowCore(
255259

256260
processedInput = input || {}
257261

262+
// Resolve stopAfterBlockId for loop/parallel containers to their sentinel-end IDs
263+
let resolvedStopAfterBlockId = stopAfterBlockId
264+
if (stopAfterBlockId) {
265+
if (serializedWorkflow.loops?.[stopAfterBlockId]) {
266+
resolvedStopAfterBlockId = buildSentinelEndId(stopAfterBlockId)
267+
logger.info(`[${requestId}] Resolved loop container to sentinel-end`, {
268+
original: stopAfterBlockId,
269+
resolved: resolvedStopAfterBlockId,
270+
})
271+
} else if (serializedWorkflow.parallels?.[stopAfterBlockId]) {
272+
resolvedStopAfterBlockId = buildParallelSentinelEndId(stopAfterBlockId)
273+
logger.info(`[${requestId}] Resolved parallel container to sentinel-end`, {
274+
original: stopAfterBlockId,
275+
resolved: resolvedStopAfterBlockId,
276+
})
277+
}
278+
}
279+
258280
// Create and execute workflow with callbacks
259281
if (resumeFromSnapshot) {
260282
logger.info(`[${requestId}] Resume execution detected`, {
@@ -305,7 +327,7 @@ export async function executeWorkflowCore(
305327
abortSignal,
306328
includeFileBase64,
307329
base64MaxBytes,
308-
stopAfterBlockId,
330+
stopAfterBlockId: resolvedStopAfterBlockId,
309331
}
310332

311333
const executorInstance = new Executor({

0 commit comments

Comments
 (0)