Skip to content

Commit 2854944

Browse files
waleedlatif1claude
andcommitted
fix(executor): capture BlockExecutor locally so finally drains its own instance
Previously this.blockExecutor was overwritten on every buildExecutionPipeline call. Concurrent or re-entrant execute()/executeFromBlock() calls would have their finally block drain the wrong instance, allowing the first execution's block events to land after its terminal event. Returning { engine, blockExecutor } and capturing both locally makes the drain pinned to the same instance the engine.run() ran against. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent a544293 commit 2854944

1 file changed

Lines changed: 6 additions & 7 deletions

File tree

apps/sim/executor/execution/executor.ts

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ export class DAGExecutor {
5353
private contextExtensions: ContextExtensions
5454
private dagBuilder: DAGBuilder
5555
private execLogger: Logger
56-
private blockExecutor: BlockExecutor | null = null
5756

5857
constructor(options: DAGExecutorOptions) {
5958
this.workflow = options.workflow
@@ -80,11 +79,11 @@ export class DAGExecutor {
8079
const { context, state } = this.createExecutionContext(workflowId, triggerBlockId)
8180
context.subflowParentMap = this.buildSubflowParentMap(dag)
8281

83-
const engine = this.buildExecutionPipeline(context, dag, state)
82+
const { engine, blockExecutor } = this.buildExecutionPipeline(context, dag, state)
8483
try {
8584
return await engine.run(triggerBlockId)
8685
} finally {
87-
await this.blockExecutor?.awaitPendingCallbacks()
86+
await blockExecutor.awaitPendingCallbacks()
8887
}
8988
}
9089

@@ -211,19 +210,18 @@ export class DAGExecutor {
211210
})
212211
context.subflowParentMap = this.buildSubflowParentMap(dag)
213212

214-
const engine = this.buildExecutionPipeline(context, dag, state)
213+
const { engine, blockExecutor } = this.buildExecutionPipeline(context, dag, state)
215214
try {
216215
return await engine.run()
217216
} finally {
218-
await this.blockExecutor?.awaitPendingCallbacks()
217+
await blockExecutor.awaitPendingCallbacks()
219218
}
220219
}
221220

222221
private buildExecutionPipeline(context: ExecutionContext, dag: DAG, state: ExecutionState) {
223222
const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
224223
const allHandlers = createBlockHandlers()
225224
const blockExecutor = new BlockExecutor(allHandlers, resolver, this.contextExtensions, state)
226-
this.blockExecutor = blockExecutor
227225
const edgeManager = new EdgeManager(dag)
228226
const loopOrchestrator = new LoopOrchestrator(
229227
dag,
@@ -245,7 +243,8 @@ export class DAGExecutor {
245243
loopOrchestrator,
246244
parallelOrchestrator
247245
)
248-
return new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
246+
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
247+
return { engine, blockExecutor }
249248
}
250249

251250
private createExecutionContext(

0 commit comments

Comments
 (0)