Skip to content

Commit 19e3d89

Browse files
author
Theodore Li
committed
Merge branch 'staging' into fix/mothership-wf-validation-error
2 parents 52a1880 + 67478bb commit 19e3d89

File tree

16 files changed

+1258
-219
lines changed

16 files changed

+1258
-219
lines changed

apps/sim/executor/execution/block-executor.ts

Lines changed: 51 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ export class BlockExecutor {
7777
if (!isSentinel) {
7878
blockLog = this.createBlockLog(ctx, node.id, block, node)
7979
ctx.blockLogs.push(blockLog)
80-
this.callOnBlockStart(ctx, node, block, blockLog.executionOrder)
80+
await this.callOnBlockStart(ctx, node, block, blockLog.executionOrder)
8181
}
8282

8383
const startTime = performance.now()
@@ -105,7 +105,7 @@ export class BlockExecutor {
105105
}
106106
} catch (error) {
107107
cleanupSelfReference?.()
108-
return this.handleBlockError(
108+
return await this.handleBlockError(
109109
error,
110110
ctx,
111111
node,
@@ -179,7 +179,7 @@ export class BlockExecutor {
179179
const displayOutput = filterOutputForLog(block.metadata?.id || '', normalizedOutput, {
180180
block,
181181
})
182-
this.callOnBlockComplete(
182+
await this.callOnBlockComplete(
183183
ctx,
184184
node,
185185
block,
@@ -195,7 +195,7 @@ export class BlockExecutor {
195195

196196
return normalizedOutput
197197
} catch (error) {
198-
return this.handleBlockError(
198+
return await this.handleBlockError(
199199
error,
200200
ctx,
201201
node,
@@ -226,7 +226,7 @@ export class BlockExecutor {
226226
return this.blockHandlers.find((h) => h.canHandle(block))
227227
}
228228

229-
private handleBlockError(
229+
private async handleBlockError(
230230
error: unknown,
231231
ctx: ExecutionContext,
232232
node: DAGNode,
@@ -236,7 +236,7 @@ export class BlockExecutor {
236236
resolvedInputs: Record<string, any>,
237237
isSentinel: boolean,
238238
phase: 'input_resolution' | 'execution'
239-
): NormalizedBlockOutput {
239+
): Promise<NormalizedBlockOutput> {
240240
const duration = performance.now() - startTime
241241
const errorMessage = normalizeError(error)
242242
const hasResolvedInputs =
@@ -287,7 +287,7 @@ export class BlockExecutor {
287287
? error.childWorkflowInstanceId
288288
: undefined
289289
const displayOutput = filterOutputForLog(block.metadata?.id || '', errorOutput, { block })
290-
this.callOnBlockComplete(
290+
await this.callOnBlockComplete(
291291
ctx,
292292
node,
293293
block,
@@ -439,31 +439,39 @@ export class BlockExecutor {
439439
return redactApiKeys(result)
440440
}
441441

442-
private callOnBlockStart(
442+
private async callOnBlockStart(
443443
ctx: ExecutionContext,
444444
node: DAGNode,
445445
block: SerializedBlock,
446446
executionOrder: number
447-
): void {
447+
): Promise<void> {
448448
const blockId = node.metadata?.originalBlockId ?? node.id
449449
const blockName = block.metadata?.name ?? blockId
450450
const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE
451451

452452
const iterationContext = getIterationContext(ctx, node?.metadata)
453453

454454
if (this.contextExtensions.onBlockStart) {
455-
this.contextExtensions.onBlockStart(
456-
blockId,
457-
blockName,
458-
blockType,
459-
executionOrder,
460-
iterationContext,
461-
ctx.childWorkflowContext
462-
)
455+
try {
456+
await this.contextExtensions.onBlockStart(
457+
blockId,
458+
blockName,
459+
blockType,
460+
executionOrder,
461+
iterationContext,
462+
ctx.childWorkflowContext
463+
)
464+
} catch (error) {
465+
logger.warn('Block start callback failed', {
466+
blockId,
467+
blockType,
468+
error: error instanceof Error ? error.message : String(error),
469+
})
470+
}
463471
}
464472
}
465473

466-
private callOnBlockComplete(
474+
private async callOnBlockComplete(
467475
ctx: ExecutionContext,
468476
node: DAGNode,
469477
block: SerializedBlock,
@@ -474,30 +482,38 @@ export class BlockExecutor {
474482
executionOrder: number,
475483
endedAt: string,
476484
childWorkflowInstanceId?: string
477-
): void {
485+
): Promise<void> {
478486
const blockId = node.metadata?.originalBlockId ?? node.id
479487
const blockName = block.metadata?.name ?? blockId
480488
const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE
481489

482490
const iterationContext = getIterationContext(ctx, node?.metadata)
483491

484492
if (this.contextExtensions.onBlockComplete) {
485-
this.contextExtensions.onBlockComplete(
486-
blockId,
487-
blockName,
488-
blockType,
489-
{
490-
input,
491-
output,
492-
executionTime: duration,
493-
startedAt,
494-
executionOrder,
495-
endedAt,
496-
childWorkflowInstanceId,
497-
},
498-
iterationContext,
499-
ctx.childWorkflowContext
500-
)
493+
try {
494+
await this.contextExtensions.onBlockComplete(
495+
blockId,
496+
blockName,
497+
blockType,
498+
{
499+
input,
500+
output,
501+
executionTime: duration,
502+
startedAt,
503+
executionOrder,
504+
endedAt,
505+
childWorkflowInstanceId,
506+
},
507+
iterationContext,
508+
ctx.childWorkflowContext
509+
)
510+
} catch (error) {
511+
logger.warn('Block completion callback failed', {
512+
blockId,
513+
blockType,
514+
error: error instanceof Error ? error.message : String(error),
515+
})
516+
}
501517
}
502518
}
503519

apps/sim/executor/orchestrators/loop.ts

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ export class LoopOrchestrator {
5151
private edgeManager: EdgeManager | null = null
5252
) {}
5353

54-
initializeLoopScope(ctx: ExecutionContext, loopId: string): LoopScope {
54+
async initializeLoopScope(ctx: ExecutionContext, loopId: string): Promise<LoopScope> {
5555
const loopConfig = this.dag.loopConfigs.get(loopId) as SerializedLoop | undefined
5656
if (!loopConfig) {
5757
throw new Error(`Loop config not found: ${loopId}`)
@@ -76,7 +76,7 @@ export class LoopOrchestrator {
7676
)
7777
if (iterationError) {
7878
logger.error(iterationError, { loopId, requestedIterations })
79-
this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
79+
await this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
8080
iterations: requestedIterations,
8181
})
8282
scope.maxIterations = 0
@@ -99,7 +99,7 @@ export class LoopOrchestrator {
9999
} catch (error) {
100100
const errorMessage = `ForEach loop resolution failed: ${error instanceof Error ? error.message : String(error)}`
101101
logger.error(errorMessage, { loopId, forEachItems: loopConfig.forEachItems })
102-
this.addLoopErrorLog(ctx, loopId, loopType, errorMessage, {
102+
await this.addLoopErrorLog(ctx, loopId, loopType, errorMessage, {
103103
forEachItems: loopConfig.forEachItems,
104104
})
105105
scope.items = []
@@ -117,7 +117,7 @@ export class LoopOrchestrator {
117117
)
118118
if (sizeError) {
119119
logger.error(sizeError, { loopId, collectionSize: items.length })
120-
this.addLoopErrorLog(ctx, loopId, loopType, sizeError, {
120+
await this.addLoopErrorLog(ctx, loopId, loopType, sizeError, {
121121
forEachItems: loopConfig.forEachItems,
122122
collectionSize: items.length,
123123
})
@@ -155,7 +155,7 @@ export class LoopOrchestrator {
155155
)
156156
if (iterationError) {
157157
logger.error(iterationError, { loopId, requestedIterations })
158-
this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
158+
await this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
159159
iterations: requestedIterations,
160160
})
161161
scope.maxIterations = 0
@@ -182,14 +182,14 @@ export class LoopOrchestrator {
182182
return scope
183183
}
184184

185-
private addLoopErrorLog(
185+
private async addLoopErrorLog(
186186
ctx: ExecutionContext,
187187
loopId: string,
188188
loopType: string,
189189
errorMessage: string,
190190
inputData?: any
191-
): void {
192-
addSubflowErrorLog(
191+
): Promise<void> {
192+
await addSubflowErrorLog(
193193
ctx,
194194
loopId,
195195
'loop',
@@ -238,7 +238,7 @@ export class LoopOrchestrator {
238238
}
239239
if (isCancelled) {
240240
logger.info('Loop execution cancelled', { loopId, iteration: scope.iteration })
241-
return this.createExitResult(ctx, loopId, scope)
241+
return await this.createExitResult(ctx, loopId, scope)
242242
}
243243

244244
const iterationResults: NormalizedBlockOutput[] = []
@@ -253,7 +253,7 @@ export class LoopOrchestrator {
253253
scope.currentIterationOutputs.clear()
254254

255255
if (!(await this.evaluateCondition(ctx, scope, scope.iteration + 1))) {
256-
return this.createExitResult(ctx, loopId, scope)
256+
return await this.createExitResult(ctx, loopId, scope)
257257
}
258258

259259
scope.iteration++
@@ -269,11 +269,11 @@ export class LoopOrchestrator {
269269
}
270270
}
271271

272-
private createExitResult(
272+
private async createExitResult(
273273
ctx: ExecutionContext,
274274
loopId: string,
275275
scope: LoopScope
276-
): LoopContinuationResult {
276+
): Promise<LoopContinuationResult> {
277277
const results = scope.allIterationOutputs
278278
const output = { results }
279279
this.state.setBlockOutput(loopId, output, DEFAULTS.EXECUTION_TIME)
@@ -282,19 +282,26 @@ export class LoopOrchestrator {
282282
const now = new Date().toISOString()
283283
const iterationContext = buildContainerIterationContext(ctx, loopId)
284284

285-
this.contextExtensions.onBlockComplete(
286-
loopId,
287-
'Loop',
288-
'loop',
289-
{
290-
output,
291-
executionTime: DEFAULTS.EXECUTION_TIME,
292-
startedAt: now,
293-
executionOrder: getNextExecutionOrder(ctx),
294-
endedAt: now,
295-
},
296-
iterationContext
297-
)
285+
try {
286+
await this.contextExtensions.onBlockComplete(
287+
loopId,
288+
'Loop',
289+
'loop',
290+
{
291+
output,
292+
executionTime: DEFAULTS.EXECUTION_TIME,
293+
startedAt: now,
294+
executionOrder: getNextExecutionOrder(ctx),
295+
endedAt: now,
296+
},
297+
iterationContext
298+
)
299+
} catch (error) {
300+
logger.warn('Loop completion callback failed', {
301+
loopId,
302+
error: error instanceof Error ? error.message : String(error),
303+
})
304+
}
298305
}
299306

300307
return {
@@ -597,7 +604,7 @@ export class LoopOrchestrator {
597604
if (!scope.items || scope.items.length === 0) {
598605
logger.info('ForEach loop has empty collection, skipping loop body', { loopId })
599606
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
600-
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
607+
await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
601608
return false
602609
}
603610
return true
@@ -607,7 +614,7 @@ export class LoopOrchestrator {
607614
if (scope.maxIterations === 0) {
608615
logger.info('For loop has 0 iterations, skipping loop body', { loopId })
609616
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
610-
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
617+
await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
611618
return false
612619
}
613620
return true
@@ -621,7 +628,7 @@ export class LoopOrchestrator {
621628
if (!scope.condition) {
622629
logger.warn('No condition defined for while loop', { loopId })
623630
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
624-
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
631+
await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
625632
return false
626633
}
627634

@@ -634,7 +641,7 @@ export class LoopOrchestrator {
634641

635642
if (!result) {
636643
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
637-
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
644+
await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
638645
}
639646

640647
return result

0 commit comments

Comments
 (0)