Skip to content

Commit 211a7ac

Browse files
fix(child-workflow): nested spans handoff (#2966)
* fix(child-workflow): nested spans handoff * remove overly defensive programming * update type check * type more code * remove more dead code * address bugbot comments
1 parent 0f9b6ad commit 211a7ac

File tree

12 files changed

+138
-78
lines changed

12 files changed

+138
-78
lines changed

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import { normalizeName } from '@/executor/constants'
3030
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
3131
import type { ExecutionMetadata, IterationContext } from '@/executor/execution/types'
3232
import type { NormalizedBlockOutput, StreamingExecution } from '@/executor/types'
33+
import { hasExecutionResult } from '@/executor/utils/errors'
3334
import { Serializer } from '@/serializer'
3435
import { CORE_TRIGGER_TYPES, type CoreTriggerType } from '@/stores/logs/filters/types'
3536

@@ -467,17 +468,17 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
467468
}
468469

469470
return NextResponse.json(filteredResult)
470-
} catch (error: any) {
471-
const errorMessage = error.message || 'Unknown error'
471+
} catch (error: unknown) {
472+
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
472473
logger.error(`[${requestId}] Non-SSE execution failed: ${errorMessage}`)
473474

474-
const executionResult = error.executionResult
475+
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
475476

476477
return NextResponse.json(
477478
{
478479
success: false,
479480
output: executionResult?.output,
480-
error: executionResult?.error || error.message || 'Execution failed',
481+
error: executionResult?.error || errorMessage || 'Execution failed',
481482
metadata: executionResult?.metadata
482483
? {
483484
duration: executionResult.metadata.duration,
@@ -788,11 +789,11 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
788789

789790
// Cleanup base64 cache for this execution
790791
await cleanupExecutionBase64Cache(executionId)
791-
} catch (error: any) {
792-
const errorMessage = error.message || 'Unknown error'
792+
} catch (error: unknown) {
793+
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
793794
logger.error(`[${requestId}] SSE execution failed: ${errorMessage}`)
794795

795-
const executionResult = error.executionResult
796+
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
796797

797798
sendEvent({
798799
type: 'execution:error',

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import {
1616
} from '@/lib/workflows/triggers/triggers'
1717
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
1818
import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types'
19+
import { hasExecutionResult } from '@/executor/utils/errors'
1920
import { coerceValue } from '@/executor/utils/start-block'
2021
import { subscriptionKeys } from '@/hooks/queries/subscription'
2122
import { useExecutionStream } from '@/hooks/use-execution-stream'
@@ -76,17 +77,6 @@ function normalizeErrorMessage(error: unknown): string {
7677
return WORKFLOW_EXECUTION_FAILURE_MESSAGE
7778
}
7879

79-
function isExecutionResult(value: unknown): value is ExecutionResult {
80-
if (!isRecord(value)) return false
81-
return typeof value.success === 'boolean' && isRecord(value.output)
82-
}
83-
84-
function extractExecutionResult(error: unknown): ExecutionResult | null {
85-
if (!isRecord(error)) return null
86-
const candidate = error.executionResult
87-
return isExecutionResult(candidate) ? candidate : null
88-
}
89-
9080
export function useWorkflowExecution() {
9181
const queryClient = useQueryClient()
9282
const currentWorkflow = useCurrentWorkflow()
@@ -1138,11 +1128,11 @@ export function useWorkflowExecution() {
11381128

11391129
const handleExecutionError = (error: unknown, options?: { executionId?: string }) => {
11401130
const normalizedMessage = normalizeErrorMessage(error)
1141-
const executionResultFromError = extractExecutionResult(error)
11421131

11431132
let errorResult: ExecutionResult
11441133

1145-
if (executionResultFromError) {
1134+
if (hasExecutionResult(error)) {
1135+
const executionResultFromError = error.executionResult
11461136
const logs = Array.isArray(executionResultFromError.logs) ? executionResultFromError.logs : []
11471137

11481138
errorResult = {

apps/sim/background/schedule-execution.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import {
2121
} from '@/lib/workflows/schedules/utils'
2222
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
2323
import type { ExecutionMetadata } from '@/executor/execution/types'
24-
import type { ExecutionResult } from '@/executor/types'
24+
import { hasExecutionResult } from '@/executor/utils/errors'
2525
import { MAX_CONSECUTIVE_FAILURES } from '@/triggers/constants'
2626

2727
const logger = createLogger('TriggerScheduleExecution')
@@ -231,8 +231,7 @@ async function runWorkflowExecution({
231231
} catch (error: unknown) {
232232
logger.error(`[${requestId}] Early failure in scheduled workflow ${payload.workflowId}`, error)
233233

234-
const errorWithResult = error as { executionResult?: ExecutionResult }
235-
const executionResult = errorWithResult?.executionResult
234+
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
236235
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
237236

238237
await loggingSession.safeCompleteWithError({

apps/sim/background/webhook-execution.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import { loadDeployedWorkflowState } from '@/lib/workflows/persistence/utils'
1616
import { getWorkflowById } from '@/lib/workflows/utils'
1717
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
1818
import type { ExecutionMetadata } from '@/executor/execution/types'
19-
import type { ExecutionResult } from '@/executor/types'
19+
import { hasExecutionResult } from '@/executor/utils/errors'
2020
import { safeAssign } from '@/tools/safe-assign'
2121
import { getTrigger, isTriggerValid } from '@/triggers'
2222

@@ -578,12 +578,13 @@ async function executeWebhookJobInternal(
578578
deploymentVersionId,
579579
})
580580

581-
const errorWithResult = error as { executionResult?: ExecutionResult }
582-
const executionResult = errorWithResult?.executionResult || {
583-
success: false,
584-
output: {},
585-
logs: [],
586-
}
581+
const executionResult = hasExecutionResult(error)
582+
? error.executionResult
583+
: {
584+
success: false,
585+
output: {},
586+
logs: [],
587+
}
587588
const { traceSpans } = buildTraceSpans(executionResult)
588589

589590
await loggingSession.safeCompleteWithError({

apps/sim/background/workflow-execution.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-m
99
import { getWorkflowById } from '@/lib/workflows/utils'
1010
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
1111
import type { ExecutionMetadata } from '@/executor/execution/types'
12-
import type { ExecutionResult } from '@/executor/types'
12+
import { hasExecutionResult } from '@/executor/utils/errors'
1313
import type { CoreTriggerType } from '@/stores/logs/filters/types'
1414

1515
const logger = createLogger('TriggerWorkflowExecution')
@@ -160,8 +160,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
160160
executionId,
161161
})
162162

163-
const errorWithResult = error as { executionResult?: ExecutionResult }
164-
const executionResult = errorWithResult?.executionResult
163+
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
165164
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
166165

167166
await loggingSession.safeCompleteWithError({
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import type { TraceSpan } from '@/lib/logs/types'
2+
import type { ExecutionResult } from '@/executor/types'
3+
4+
interface ChildWorkflowErrorOptions {
5+
message: string
6+
childWorkflowName: string
7+
childTraceSpans?: TraceSpan[]
8+
executionResult?: ExecutionResult
9+
cause?: Error
10+
}
11+
12+
/**
13+
* Error raised when a child workflow execution fails.
14+
*/
15+
export class ChildWorkflowError extends Error {
16+
readonly childTraceSpans: TraceSpan[]
17+
readonly childWorkflowName: string
18+
readonly executionResult?: ExecutionResult
19+
20+
constructor(options: ChildWorkflowErrorOptions) {
21+
super(options.message, { cause: options.cause })
22+
this.name = 'ChildWorkflowError'
23+
this.childWorkflowName = options.childWorkflowName
24+
this.childTraceSpans = options.childTraceSpans ?? []
25+
this.executionResult = options.executionResult
26+
}
27+
28+
static isChildWorkflowError(error: unknown): error is ChildWorkflowError {
29+
return error instanceof ChildWorkflowError
30+
}
31+
}

apps/sim/executor/execution/block-executor.ts

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import {
1313
isSentinelBlockType,
1414
} from '@/executor/constants'
1515
import type { DAGNode } from '@/executor/dag/builder'
16+
import { ChildWorkflowError } from '@/executor/errors/child-workflow-error'
1617
import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types'
1718
import {
1819
generatePauseContextId,
@@ -213,24 +214,26 @@ export class BlockExecutor {
213214
? resolvedInputs
214215
: ((block.config?.params as Record<string, any> | undefined) ?? {})
215216

216-
if (blockLog) {
217-
blockLog.endedAt = new Date().toISOString()
218-
blockLog.durationMs = duration
219-
blockLog.success = false
220-
blockLog.error = errorMessage
221-
blockLog.input = input
222-
}
223-
224217
const errorOutput: NormalizedBlockOutput = {
225218
error: errorMessage,
226219
}
227220

228-
if (error && typeof error === 'object' && 'childTraceSpans' in error) {
229-
errorOutput.childTraceSpans = (error as any).childTraceSpans
221+
if (ChildWorkflowError.isChildWorkflowError(error)) {
222+
errorOutput.childTraceSpans = error.childTraceSpans
223+
errorOutput.childWorkflowName = error.childWorkflowName
230224
}
231225

232226
this.state.setBlockOutput(node.id, errorOutput, duration)
233227

228+
if (blockLog) {
229+
blockLog.endedAt = new Date().toISOString()
230+
blockLog.durationMs = duration
231+
blockLog.success = false
232+
blockLog.error = errorMessage
233+
blockLog.input = input
234+
blockLog.output = this.filterOutputForLog(block, errorOutput)
235+
}
236+
234237
logger.error(
235238
phase === 'input_resolution' ? 'Failed to resolve block inputs' : 'Block execution failed',
236239
{

apps/sim/executor/execution/engine.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import type {
1313
PausePoint,
1414
ResumeStatus,
1515
} from '@/executor/types'
16-
import { normalizeError } from '@/executor/utils/errors'
16+
import { attachExecutionResult, normalizeError } from '@/executor/utils/errors'
1717

1818
const logger = createLogger('ExecutionEngine')
1919

@@ -170,8 +170,8 @@ export class ExecutionEngine {
170170
metadata: this.context.metadata,
171171
}
172172

173-
if (error && typeof error === 'object') {
174-
;(error as any).executionResult = executionResult
173+
if (error instanceof Error) {
174+
attachExecutionResult(error, executionResult)
175175
}
176176
throw error
177177
}

apps/sim/executor/handlers/workflow/workflow-handler.ts

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@ import type { TraceSpan } from '@/lib/logs/types'
44
import type { BlockOutput } from '@/blocks/types'
55
import { Executor } from '@/executor'
66
import { BlockType, DEFAULTS, HTTP } from '@/executor/constants'
7+
import { ChildWorkflowError } from '@/executor/errors/child-workflow-error'
78
import type {
89
BlockHandler,
910
ExecutionContext,
1011
ExecutionResult,
1112
StreamingExecution,
1213
} from '@/executor/types'
14+
import { hasExecutionResult } from '@/executor/utils/errors'
1315
import { buildAPIUrl, buildAuthHeaders } from '@/executor/utils/http'
1416
import { parseJSON } from '@/executor/utils/json'
1517
import { lazyCleanupInputMapping } from '@/executor/utils/lazy-cleanup'
@@ -137,39 +139,39 @@ export class WorkflowBlockHandler implements BlockHandler {
137139
)
138140

139141
return mappedResult
140-
} catch (error: any) {
142+
} catch (error: unknown) {
141143
logger.error(`Error executing child workflow ${workflowId}:`, error)
142144

143145
const { workflows } = useWorkflowRegistry.getState()
144146
const workflowMetadata = workflows[workflowId]
145147
const childWorkflowName = workflowMetadata?.name || workflowId
146148

147-
const originalError = error.message || 'Unknown error'
148-
const wrappedError = new Error(
149-
`Error in child workflow "${childWorkflowName}": ${originalError}`
150-
)
149+
const originalError = error instanceof Error ? error.message : 'Unknown error'
150+
let childTraceSpans: WorkflowTraceSpan[] = []
151+
let executionResult: ExecutionResult | undefined
151152

152-
if (error.executionResult?.logs) {
153-
const executionResult = error.executionResult as ExecutionResult
153+
if (hasExecutionResult(error) && error.executionResult.logs) {
154+
executionResult = error.executionResult
154155

155156
logger.info(`Extracting child trace spans from error.executionResult`, {
156157
hasLogs: (executionResult.logs?.length ?? 0) > 0,
157158
logCount: executionResult.logs?.length ?? 0,
158159
})
159160

160-
const childTraceSpans = this.captureChildWorkflowLogs(
161-
executionResult,
162-
childWorkflowName,
163-
ctx
164-
)
161+
childTraceSpans = this.captureChildWorkflowLogs(executionResult, childWorkflowName, ctx)
165162

166163
logger.info(`Captured ${childTraceSpans.length} child trace spans from failed execution`)
167-
;(wrappedError as any).childTraceSpans = childTraceSpans
168-
} else if (error.childTraceSpans && Array.isArray(error.childTraceSpans)) {
169-
;(wrappedError as any).childTraceSpans = error.childTraceSpans
164+
} else if (ChildWorkflowError.isChildWorkflowError(error)) {
165+
childTraceSpans = error.childTraceSpans
170166
}
171167

172-
throw wrappedError
168+
throw new ChildWorkflowError({
169+
message: `Error in child workflow "${childWorkflowName}": ${originalError}`,
170+
childWorkflowName,
171+
childTraceSpans,
172+
executionResult,
173+
cause: error instanceof Error ? error : undefined,
174+
})
173175
}
174176
}
175177

@@ -441,11 +443,11 @@ export class WorkflowBlockHandler implements BlockHandler {
441443

442444
if (!success) {
443445
logger.warn(`Child workflow ${childWorkflowName} failed`)
444-
const error = new Error(
445-
`Error in child workflow "${childWorkflowName}": ${childResult.error || 'Child workflow execution failed'}`
446-
)
447-
;(error as any).childTraceSpans = childTraceSpans || []
448-
throw error
446+
throw new ChildWorkflowError({
447+
message: `Error in child workflow "${childWorkflowName}": ${childResult.error || 'Child workflow execution failed'}`,
448+
childWorkflowName,
449+
childTraceSpans: childTraceSpans || [],
450+
})
449451
}
450452

451453
return {

apps/sim/executor/utils/errors.ts

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,39 @@
1-
import type { ExecutionContext } from '@/executor/types'
1+
import type { ExecutionContext, ExecutionResult } from '@/executor/types'
22
import type { SerializedBlock } from '@/serializer/types'
33

4+
/**
5+
* Interface for errors that carry an ExecutionResult.
6+
* Used when workflow execution fails and we want to preserve partial results.
7+
*/
8+
export interface ErrorWithExecutionResult extends Error {
9+
executionResult: ExecutionResult
10+
}
11+
12+
/**
13+
* Type guard to check if an error carries an ExecutionResult.
14+
* Validates that executionResult has required fields (success, output).
15+
*/
16+
export function hasExecutionResult(error: unknown): error is ErrorWithExecutionResult {
17+
if (
18+
!(error instanceof Error) ||
19+
!('executionResult' in error) ||
20+
error.executionResult == null ||
21+
typeof error.executionResult !== 'object'
22+
) {
23+
return false
24+
}
25+
26+
const result = error.executionResult as Record<string, unknown>
27+
return typeof result.success === 'boolean' && result.output != null
28+
}
29+
30+
/**
31+
* Attaches an ExecutionResult to an error for propagation to parent workflows.
32+
*/
33+
export function attachExecutionResult(error: Error, executionResult: ExecutionResult): void {
34+
Object.assign(error, { executionResult })
35+
}
36+
437
export interface BlockExecutionErrorDetails {
538
block: SerializedBlock
639
error: Error | string

0 commit comments

Comments
 (0)