Skip to content
86 changes: 51 additions & 35 deletions apps/sim/executor/execution/block-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export class BlockExecutor {
if (!isSentinel) {
blockLog = this.createBlockLog(ctx, node.id, block, node)
ctx.blockLogs.push(blockLog)
this.callOnBlockStart(ctx, node, block, blockLog.executionOrder)
await this.callOnBlockStart(ctx, node, block, blockLog.executionOrder)
}

const startTime = performance.now()
Expand Down Expand Up @@ -105,7 +105,7 @@ export class BlockExecutor {
}
} catch (error) {
cleanupSelfReference?.()
return this.handleBlockError(
return await this.handleBlockError(
error,
ctx,
node,
Expand Down Expand Up @@ -179,7 +179,7 @@ export class BlockExecutor {
const displayOutput = filterOutputForLog(block.metadata?.id || '', normalizedOutput, {
block,
})
this.callOnBlockComplete(
await this.callOnBlockComplete(
ctx,
node,
block,
Expand All @@ -195,7 +195,7 @@ export class BlockExecutor {

return normalizedOutput
} catch (error) {
return this.handleBlockError(
return await this.handleBlockError(
error,
ctx,
node,
Expand Down Expand Up @@ -226,7 +226,7 @@ export class BlockExecutor {
return this.blockHandlers.find((h) => h.canHandle(block))
}

private handleBlockError(
private async handleBlockError(
error: unknown,
ctx: ExecutionContext,
node: DAGNode,
Expand All @@ -236,7 +236,7 @@ export class BlockExecutor {
resolvedInputs: Record<string, any>,
isSentinel: boolean,
phase: 'input_resolution' | 'execution'
): NormalizedBlockOutput {
): Promise<NormalizedBlockOutput> {
const duration = performance.now() - startTime
const errorMessage = normalizeError(error)
const hasResolvedInputs =
Expand Down Expand Up @@ -287,7 +287,7 @@ export class BlockExecutor {
? error.childWorkflowInstanceId
: undefined
const displayOutput = filterOutputForLog(block.metadata?.id || '', errorOutput, { block })
this.callOnBlockComplete(
await this.callOnBlockComplete(
ctx,
node,
block,
Expand Down Expand Up @@ -439,31 +439,39 @@ export class BlockExecutor {
return redactApiKeys(result)
}

private callOnBlockStart(
private async callOnBlockStart(
ctx: ExecutionContext,
node: DAGNode,
block: SerializedBlock,
executionOrder: number
): void {
): Promise<void> {
const blockId = node.metadata?.originalBlockId ?? node.id
const blockName = block.metadata?.name ?? blockId
const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE

const iterationContext = getIterationContext(ctx, node?.metadata)

if (this.contextExtensions.onBlockStart) {
this.contextExtensions.onBlockStart(
blockId,
blockName,
blockType,
executionOrder,
iterationContext,
ctx.childWorkflowContext
)
try {
await this.contextExtensions.onBlockStart(
blockId,
blockName,
blockType,
executionOrder,
iterationContext,
ctx.childWorkflowContext
)
} catch (error) {
logger.warn('Block start callback failed', {
blockId,
blockType,
error: error instanceof Error ? error.message : String(error),
})
}
}
}

private callOnBlockComplete(
private async callOnBlockComplete(
ctx: ExecutionContext,
node: DAGNode,
block: SerializedBlock,
Expand All @@ -474,30 +482,38 @@ export class BlockExecutor {
executionOrder: number,
endedAt: string,
childWorkflowInstanceId?: string
): void {
): Promise<void> {
const blockId = node.metadata?.originalBlockId ?? node.id
const blockName = block.metadata?.name ?? blockId
const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE

const iterationContext = getIterationContext(ctx, node?.metadata)

if (this.contextExtensions.onBlockComplete) {
this.contextExtensions.onBlockComplete(
blockId,
blockName,
blockType,
{
input,
output,
executionTime: duration,
startedAt,
executionOrder,
endedAt,
childWorkflowInstanceId,
},
iterationContext,
ctx.childWorkflowContext
)
try {
await this.contextExtensions.onBlockComplete(
blockId,
blockName,
blockType,
{
input,
output,
executionTime: duration,
startedAt,
executionOrder,
endedAt,
childWorkflowInstanceId,
},
iterationContext,
ctx.childWorkflowContext
)
} catch (error) {
logger.warn('Block completion callback failed', {
blockId,
blockType,
error: error instanceof Error ? error.message : String(error),
})
}
}
}

Expand Down
65 changes: 36 additions & 29 deletions apps/sim/executor/orchestrators/loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export class LoopOrchestrator {
private edgeManager: EdgeManager | null = null
) {}

initializeLoopScope(ctx: ExecutionContext, loopId: string): LoopScope {
async initializeLoopScope(ctx: ExecutionContext, loopId: string): Promise<LoopScope> {
const loopConfig = this.dag.loopConfigs.get(loopId) as SerializedLoop | undefined
if (!loopConfig) {
throw new Error(`Loop config not found: ${loopId}`)
Expand All @@ -76,7 +76,7 @@ export class LoopOrchestrator {
)
if (iterationError) {
logger.error(iterationError, { loopId, requestedIterations })
this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
await this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
iterations: requestedIterations,
})
scope.maxIterations = 0
Expand All @@ -99,7 +99,7 @@ export class LoopOrchestrator {
} catch (error) {
const errorMessage = `ForEach loop resolution failed: ${error instanceof Error ? error.message : String(error)}`
logger.error(errorMessage, { loopId, forEachItems: loopConfig.forEachItems })
this.addLoopErrorLog(ctx, loopId, loopType, errorMessage, {
await this.addLoopErrorLog(ctx, loopId, loopType, errorMessage, {
forEachItems: loopConfig.forEachItems,
})
scope.items = []
Expand All @@ -117,7 +117,7 @@ export class LoopOrchestrator {
)
if (sizeError) {
logger.error(sizeError, { loopId, collectionSize: items.length })
this.addLoopErrorLog(ctx, loopId, loopType, sizeError, {
await this.addLoopErrorLog(ctx, loopId, loopType, sizeError, {
forEachItems: loopConfig.forEachItems,
collectionSize: items.length,
})
Expand Down Expand Up @@ -155,7 +155,7 @@ export class LoopOrchestrator {
)
if (iterationError) {
logger.error(iterationError, { loopId, requestedIterations })
this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
await this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
iterations: requestedIterations,
})
scope.maxIterations = 0
Expand All @@ -182,14 +182,14 @@ export class LoopOrchestrator {
return scope
}

private addLoopErrorLog(
private async addLoopErrorLog(
ctx: ExecutionContext,
loopId: string,
loopType: string,
errorMessage: string,
inputData?: any
): void {
addSubflowErrorLog(
): Promise<void> {
await addSubflowErrorLog(
ctx,
loopId,
'loop',
Expand Down Expand Up @@ -238,7 +238,7 @@ export class LoopOrchestrator {
}
if (isCancelled) {
logger.info('Loop execution cancelled', { loopId, iteration: scope.iteration })
return this.createExitResult(ctx, loopId, scope)
return await this.createExitResult(ctx, loopId, scope)
}

const iterationResults: NormalizedBlockOutput[] = []
Expand All @@ -253,7 +253,7 @@ export class LoopOrchestrator {
scope.currentIterationOutputs.clear()

if (!(await this.evaluateCondition(ctx, scope, scope.iteration + 1))) {
return this.createExitResult(ctx, loopId, scope)
return await this.createExitResult(ctx, loopId, scope)
}

scope.iteration++
Expand All @@ -269,11 +269,11 @@ export class LoopOrchestrator {
}
}

private createExitResult(
private async createExitResult(
ctx: ExecutionContext,
loopId: string,
scope: LoopScope
): LoopContinuationResult {
): Promise<LoopContinuationResult> {
const results = scope.allIterationOutputs
const output = { results }
this.state.setBlockOutput(loopId, output, DEFAULTS.EXECUTION_TIME)
Expand All @@ -282,19 +282,26 @@ export class LoopOrchestrator {
const now = new Date().toISOString()
const iterationContext = buildContainerIterationContext(ctx, loopId)

this.contextExtensions.onBlockComplete(
loopId,
'Loop',
'loop',
{
output,
executionTime: DEFAULTS.EXECUTION_TIME,
startedAt: now,
executionOrder: getNextExecutionOrder(ctx),
endedAt: now,
},
iterationContext
)
try {
await this.contextExtensions.onBlockComplete(
loopId,
'Loop',
'loop',
{
output,
executionTime: DEFAULTS.EXECUTION_TIME,
startedAt: now,
executionOrder: getNextExecutionOrder(ctx),
endedAt: now,
},
iterationContext
)
} catch (error) {
logger.warn('Loop completion callback failed', {
loopId,
error: error instanceof Error ? error.message : String(error),
})
}
}

return {
Expand Down Expand Up @@ -597,7 +604,7 @@ export class LoopOrchestrator {
if (!scope.items || scope.items.length === 0) {
logger.info('ForEach loop has empty collection, skipping loop body', { loopId })
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
return false
}
return true
Expand All @@ -607,7 +614,7 @@ export class LoopOrchestrator {
if (scope.maxIterations === 0) {
logger.info('For loop has 0 iterations, skipping loop body', { loopId })
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
return false
}
return true
Expand All @@ -621,7 +628,7 @@ export class LoopOrchestrator {
if (!scope.condition) {
logger.warn('No condition defined for while loop', { loopId })
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
return false
}

Expand All @@ -634,7 +641,7 @@ export class LoopOrchestrator {

if (!result) {
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
}

return result
Expand Down
Loading
Loading