Skip to content

Commit 67478bb

Browse files
PlaneInABottletesticecrasher321
authored
fix(logs): add durable execution diagnostics foundation (#3564)
* fix(logs): persist execution diagnostics markers Store last-started and last-completed block markers with finalization metadata so later read surfaces can explain how a run ended without reconstructing executor state. * fix(executor): preserve durable diagnostics ordering Await only the persistence needed to keep diagnostics durable before terminal completion while keeping callback failures from changing execution behavior. * fix(logs): preserve fallback diagnostics semantics Keep successful fallback output and accumulated cost intact while tightening progress-write draining and deduplicating trace span counting for diagnostics helpers. * fix(api): restore async execute route test mock Add the missing AuthType export to the hybrid auth mock so the async execution route test exercises the 202 queueing path instead of crashing with a 500 in CI. * fix(executor): align async block error handling * fix(logs): tighten marker ordering scope Allow same-millisecond marker writes to replace prior markers and drop the unused diagnostics read helper so this PR stays focused on persistence rather than unread foundation code. * fix(logs): remove unused finalization type guard Drop the unused helper so this PR only ships the persistence-side status types it actually uses. * fix(executor): await subflow diagnostics callbacks Ensure empty-subflow and subflow-error lifecycle callbacks participate in progress-write draining before terminal finalization while still swallowing callback failures. --------- Co-authored-by: test <test@example.com> Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>
1 parent c9f082d commit 67478bb

File tree

13 files changed

+1217
-207
lines changed

13 files changed

+1217
-207
lines changed

apps/sim/executor/execution/block-executor.ts

Lines changed: 51 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ export class BlockExecutor {
7777
if (!isSentinel) {
7878
blockLog = this.createBlockLog(ctx, node.id, block, node)
7979
ctx.blockLogs.push(blockLog)
80-
this.callOnBlockStart(ctx, node, block, blockLog.executionOrder)
80+
await this.callOnBlockStart(ctx, node, block, blockLog.executionOrder)
8181
}
8282

8383
const startTime = performance.now()
@@ -105,7 +105,7 @@ export class BlockExecutor {
105105
}
106106
} catch (error) {
107107
cleanupSelfReference?.()
108-
return this.handleBlockError(
108+
return await this.handleBlockError(
109109
error,
110110
ctx,
111111
node,
@@ -179,7 +179,7 @@ export class BlockExecutor {
179179
const displayOutput = filterOutputForLog(block.metadata?.id || '', normalizedOutput, {
180180
block,
181181
})
182-
this.callOnBlockComplete(
182+
await this.callOnBlockComplete(
183183
ctx,
184184
node,
185185
block,
@@ -195,7 +195,7 @@ export class BlockExecutor {
195195

196196
return normalizedOutput
197197
} catch (error) {
198-
return this.handleBlockError(
198+
return await this.handleBlockError(
199199
error,
200200
ctx,
201201
node,
@@ -226,7 +226,7 @@ export class BlockExecutor {
226226
return this.blockHandlers.find((h) => h.canHandle(block))
227227
}
228228

229-
private handleBlockError(
229+
private async handleBlockError(
230230
error: unknown,
231231
ctx: ExecutionContext,
232232
node: DAGNode,
@@ -236,7 +236,7 @@ export class BlockExecutor {
236236
resolvedInputs: Record<string, any>,
237237
isSentinel: boolean,
238238
phase: 'input_resolution' | 'execution'
239-
): NormalizedBlockOutput {
239+
): Promise<NormalizedBlockOutput> {
240240
const duration = performance.now() - startTime
241241
const errorMessage = normalizeError(error)
242242
const hasResolvedInputs =
@@ -287,7 +287,7 @@ export class BlockExecutor {
287287
? error.childWorkflowInstanceId
288288
: undefined
289289
const displayOutput = filterOutputForLog(block.metadata?.id || '', errorOutput, { block })
290-
this.callOnBlockComplete(
290+
await this.callOnBlockComplete(
291291
ctx,
292292
node,
293293
block,
@@ -439,31 +439,39 @@ export class BlockExecutor {
439439
return redactApiKeys(result)
440440
}
441441

442-
private callOnBlockStart(
442+
private async callOnBlockStart(
443443
ctx: ExecutionContext,
444444
node: DAGNode,
445445
block: SerializedBlock,
446446
executionOrder: number
447-
): void {
447+
): Promise<void> {
448448
const blockId = node.metadata?.originalBlockId ?? node.id
449449
const blockName = block.metadata?.name ?? blockId
450450
const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE
451451

452452
const iterationContext = getIterationContext(ctx, node?.metadata)
453453

454454
if (this.contextExtensions.onBlockStart) {
455-
this.contextExtensions.onBlockStart(
456-
blockId,
457-
blockName,
458-
blockType,
459-
executionOrder,
460-
iterationContext,
461-
ctx.childWorkflowContext
462-
)
455+
try {
456+
await this.contextExtensions.onBlockStart(
457+
blockId,
458+
blockName,
459+
blockType,
460+
executionOrder,
461+
iterationContext,
462+
ctx.childWorkflowContext
463+
)
464+
} catch (error) {
465+
logger.warn('Block start callback failed', {
466+
blockId,
467+
blockType,
468+
error: error instanceof Error ? error.message : String(error),
469+
})
470+
}
463471
}
464472
}
465473

466-
private callOnBlockComplete(
474+
private async callOnBlockComplete(
467475
ctx: ExecutionContext,
468476
node: DAGNode,
469477
block: SerializedBlock,
@@ -474,30 +482,38 @@ export class BlockExecutor {
474482
executionOrder: number,
475483
endedAt: string,
476484
childWorkflowInstanceId?: string
477-
): void {
485+
): Promise<void> {
478486
const blockId = node.metadata?.originalBlockId ?? node.id
479487
const blockName = block.metadata?.name ?? blockId
480488
const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE
481489

482490
const iterationContext = getIterationContext(ctx, node?.metadata)
483491

484492
if (this.contextExtensions.onBlockComplete) {
485-
this.contextExtensions.onBlockComplete(
486-
blockId,
487-
blockName,
488-
blockType,
489-
{
490-
input,
491-
output,
492-
executionTime: duration,
493-
startedAt,
494-
executionOrder,
495-
endedAt,
496-
childWorkflowInstanceId,
497-
},
498-
iterationContext,
499-
ctx.childWorkflowContext
500-
)
493+
try {
494+
await this.contextExtensions.onBlockComplete(
495+
blockId,
496+
blockName,
497+
blockType,
498+
{
499+
input,
500+
output,
501+
executionTime: duration,
502+
startedAt,
503+
executionOrder,
504+
endedAt,
505+
childWorkflowInstanceId,
506+
},
507+
iterationContext,
508+
ctx.childWorkflowContext
509+
)
510+
} catch (error) {
511+
logger.warn('Block completion callback failed', {
512+
blockId,
513+
blockType,
514+
error: error instanceof Error ? error.message : String(error),
515+
})
516+
}
501517
}
502518
}
503519

apps/sim/executor/orchestrators/loop.ts

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ export class LoopOrchestrator {
5151
private edgeManager: EdgeManager | null = null
5252
) {}
5353

54-
initializeLoopScope(ctx: ExecutionContext, loopId: string): LoopScope {
54+
async initializeLoopScope(ctx: ExecutionContext, loopId: string): Promise<LoopScope> {
5555
const loopConfig = this.dag.loopConfigs.get(loopId) as SerializedLoop | undefined
5656
if (!loopConfig) {
5757
throw new Error(`Loop config not found: ${loopId}`)
@@ -76,7 +76,7 @@ export class LoopOrchestrator {
7676
)
7777
if (iterationError) {
7878
logger.error(iterationError, { loopId, requestedIterations })
79-
this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
79+
await this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
8080
iterations: requestedIterations,
8181
})
8282
scope.maxIterations = 0
@@ -99,7 +99,7 @@ export class LoopOrchestrator {
9999
} catch (error) {
100100
const errorMessage = `ForEach loop resolution failed: ${error instanceof Error ? error.message : String(error)}`
101101
logger.error(errorMessage, { loopId, forEachItems: loopConfig.forEachItems })
102-
this.addLoopErrorLog(ctx, loopId, loopType, errorMessage, {
102+
await this.addLoopErrorLog(ctx, loopId, loopType, errorMessage, {
103103
forEachItems: loopConfig.forEachItems,
104104
})
105105
scope.items = []
@@ -117,7 +117,7 @@ export class LoopOrchestrator {
117117
)
118118
if (sizeError) {
119119
logger.error(sizeError, { loopId, collectionSize: items.length })
120-
this.addLoopErrorLog(ctx, loopId, loopType, sizeError, {
120+
await this.addLoopErrorLog(ctx, loopId, loopType, sizeError, {
121121
forEachItems: loopConfig.forEachItems,
122122
collectionSize: items.length,
123123
})
@@ -155,7 +155,7 @@ export class LoopOrchestrator {
155155
)
156156
if (iterationError) {
157157
logger.error(iterationError, { loopId, requestedIterations })
158-
this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
158+
await this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
159159
iterations: requestedIterations,
160160
})
161161
scope.maxIterations = 0
@@ -182,14 +182,14 @@ export class LoopOrchestrator {
182182
return scope
183183
}
184184

185-
private addLoopErrorLog(
185+
private async addLoopErrorLog(
186186
ctx: ExecutionContext,
187187
loopId: string,
188188
loopType: string,
189189
errorMessage: string,
190190
inputData?: any
191-
): void {
192-
addSubflowErrorLog(
191+
): Promise<void> {
192+
await addSubflowErrorLog(
193193
ctx,
194194
loopId,
195195
'loop',
@@ -238,7 +238,7 @@ export class LoopOrchestrator {
238238
}
239239
if (isCancelled) {
240240
logger.info('Loop execution cancelled', { loopId, iteration: scope.iteration })
241-
return this.createExitResult(ctx, loopId, scope)
241+
return await this.createExitResult(ctx, loopId, scope)
242242
}
243243

244244
const iterationResults: NormalizedBlockOutput[] = []
@@ -253,7 +253,7 @@ export class LoopOrchestrator {
253253
scope.currentIterationOutputs.clear()
254254

255255
if (!(await this.evaluateCondition(ctx, scope, scope.iteration + 1))) {
256-
return this.createExitResult(ctx, loopId, scope)
256+
return await this.createExitResult(ctx, loopId, scope)
257257
}
258258

259259
scope.iteration++
@@ -269,11 +269,11 @@ export class LoopOrchestrator {
269269
}
270270
}
271271

272-
private createExitResult(
272+
private async createExitResult(
273273
ctx: ExecutionContext,
274274
loopId: string,
275275
scope: LoopScope
276-
): LoopContinuationResult {
276+
): Promise<LoopContinuationResult> {
277277
const results = scope.allIterationOutputs
278278
const output = { results }
279279
this.state.setBlockOutput(loopId, output, DEFAULTS.EXECUTION_TIME)
@@ -282,19 +282,26 @@ export class LoopOrchestrator {
282282
const now = new Date().toISOString()
283283
const iterationContext = buildContainerIterationContext(ctx, loopId)
284284

285-
this.contextExtensions.onBlockComplete(
286-
loopId,
287-
'Loop',
288-
'loop',
289-
{
290-
output,
291-
executionTime: DEFAULTS.EXECUTION_TIME,
292-
startedAt: now,
293-
executionOrder: getNextExecutionOrder(ctx),
294-
endedAt: now,
295-
},
296-
iterationContext
297-
)
285+
try {
286+
await this.contextExtensions.onBlockComplete(
287+
loopId,
288+
'Loop',
289+
'loop',
290+
{
291+
output,
292+
executionTime: DEFAULTS.EXECUTION_TIME,
293+
startedAt: now,
294+
executionOrder: getNextExecutionOrder(ctx),
295+
endedAt: now,
296+
},
297+
iterationContext
298+
)
299+
} catch (error) {
300+
logger.warn('Loop completion callback failed', {
301+
loopId,
302+
error: error instanceof Error ? error.message : String(error),
303+
})
304+
}
298305
}
299306

300307
return {
@@ -597,7 +604,7 @@ export class LoopOrchestrator {
597604
if (!scope.items || scope.items.length === 0) {
598605
logger.info('ForEach loop has empty collection, skipping loop body', { loopId })
599606
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
600-
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
607+
await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
601608
return false
602609
}
603610
return true
@@ -607,7 +614,7 @@ export class LoopOrchestrator {
607614
if (scope.maxIterations === 0) {
608615
logger.info('For loop has 0 iterations, skipping loop body', { loopId })
609616
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
610-
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
617+
await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
611618
return false
612619
}
613620
return true
@@ -621,7 +628,7 @@ export class LoopOrchestrator {
621628
if (!scope.condition) {
622629
logger.warn('No condition defined for while loop', { loopId })
623630
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
624-
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
631+
await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
625632
return false
626633
}
627634

@@ -634,7 +641,7 @@ export class LoopOrchestrator {
634641

635642
if (!result) {
636643
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
637-
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
644+
await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
638645
}
639646

640647
return result

0 commit comments

Comments
 (0)