Skip to content

Commit c3513c2

Browse files
committed
hitl gaps
1 parent 79be435 commit c3513c2

File tree

10 files changed

+286
-127
lines changed

10 files changed

+286
-127
lines changed

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -713,14 +713,25 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
713713
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
714714
executionId,
715715
})
716+
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
716717
} else {
717-
await PauseResumeManager.persistPauseResult({
718-
workflowId,
719-
executionId,
720-
pausePoints: result.pausePoints || [],
721-
snapshotSeed: result.snapshotSeed,
722-
executorUserId: result.metadata?.userId,
723-
})
718+
try {
719+
await PauseResumeManager.persistPauseResult({
720+
workflowId,
721+
executionId,
722+
pausePoints: result.pausePoints || [],
723+
snapshotSeed: result.snapshotSeed,
724+
executorUserId: result.metadata?.userId,
725+
})
726+
} catch (pauseError) {
727+
logger.error(`[${requestId}] Failed to persist pause result`, {
728+
executionId,
729+
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
730+
})
731+
await loggingSession.markAsFailed(
732+
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
733+
)
734+
}
724735
}
725736
} else {
726737
await PauseResumeManager.processQueuedResumes(executionId)

apps/sim/background/schedule-execution.ts

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -285,14 +285,25 @@ async function runWorkflowExecution({
285285
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
286286
executionId,
287287
})
288+
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
288289
} else {
289-
await PauseResumeManager.persistPauseResult({
290-
workflowId: payload.workflowId,
291-
executionId,
292-
pausePoints: executionResult.pausePoints || [],
293-
snapshotSeed: executionResult.snapshotSeed,
294-
executorUserId: executionResult.metadata?.userId,
295-
})
290+
try {
291+
await PauseResumeManager.persistPauseResult({
292+
workflowId: payload.workflowId,
293+
executionId,
294+
pausePoints: executionResult.pausePoints || [],
295+
snapshotSeed: executionResult.snapshotSeed,
296+
executorUserId: executionResult.metadata?.userId,
297+
})
298+
} catch (pauseError) {
299+
logger.error(`[${requestId}] Failed to persist pause result`, {
300+
executionId,
301+
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
302+
})
303+
await loggingSession.markAsFailed(
304+
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
305+
)
306+
}
296307
}
297308
} else {
298309
await PauseResumeManager.processQueuedResumes(executionId)

apps/sim/background/webhook-execution.ts

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -268,14 +268,25 @@ async function executeWebhookJobInternal(
268268
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
269269
executionId,
270270
})
271+
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
271272
} else {
272-
await PauseResumeManager.persistPauseResult({
273-
workflowId: payload.workflowId,
274-
executionId,
275-
pausePoints: executionResult.pausePoints || [],
276-
snapshotSeed: executionResult.snapshotSeed,
277-
executorUserId: executionResult.metadata?.userId,
278-
})
273+
try {
274+
await PauseResumeManager.persistPauseResult({
275+
workflowId: payload.workflowId,
276+
executionId,
277+
pausePoints: executionResult.pausePoints || [],
278+
snapshotSeed: executionResult.snapshotSeed,
279+
executorUserId: executionResult.metadata?.userId,
280+
})
281+
} catch (pauseError) {
282+
logger.error(`[${requestId}] Failed to persist pause result`, {
283+
executionId,
284+
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
285+
})
286+
await loggingSession.markAsFailed(
287+
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
288+
)
289+
}
279290
}
280291
} else {
281292
await PauseResumeManager.processQueuedResumes(executionId)
@@ -509,14 +520,25 @@ async function executeWebhookJobInternal(
509520
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
510521
executionId,
511522
})
523+
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
512524
} else {
513-
await PauseResumeManager.persistPauseResult({
514-
workflowId: payload.workflowId,
515-
executionId,
516-
pausePoints: executionResult.pausePoints || [],
517-
snapshotSeed: executionResult.snapshotSeed,
518-
executorUserId: executionResult.metadata?.userId,
519-
})
525+
try {
526+
await PauseResumeManager.persistPauseResult({
527+
workflowId: payload.workflowId,
528+
executionId,
529+
pausePoints: executionResult.pausePoints || [],
530+
snapshotSeed: executionResult.snapshotSeed,
531+
executorUserId: executionResult.metadata?.userId,
532+
})
533+
} catch (pauseError) {
534+
logger.error(`[${requestId}] Failed to persist pause result`, {
535+
executionId,
536+
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
537+
})
538+
await loggingSession.markAsFailed(
539+
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
540+
)
541+
}
520542
}
521543
} else {
522544
await PauseResumeManager.processQueuedResumes(executionId)

apps/sim/background/workflow-execution.ts

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -112,14 +112,25 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
112112
logger.error(`[${requestId}] Missing snapshot seed for paused execution`, {
113113
executionId,
114114
})
115+
await loggingSession.markAsFailed('Missing snapshot seed for paused execution')
115116
} else {
116-
await PauseResumeManager.persistPauseResult({
117-
workflowId,
118-
executionId,
119-
pausePoints: result.pausePoints || [],
120-
snapshotSeed: result.snapshotSeed,
121-
executorUserId: result.metadata?.userId,
122-
})
117+
try {
118+
await PauseResumeManager.persistPauseResult({
119+
workflowId,
120+
executionId,
121+
pausePoints: result.pausePoints || [],
122+
snapshotSeed: result.snapshotSeed,
123+
executorUserId: result.metadata?.userId,
124+
})
125+
} catch (pauseError) {
126+
logger.error(`[${requestId}] Failed to persist pause result`, {
127+
executionId,
128+
error: pauseError instanceof Error ? pauseError.message : String(pauseError),
129+
})
130+
await loggingSession.markAsFailed(
131+
`Failed to persist pause state: ${pauseError instanceof Error ? pauseError.message : String(pauseError)}`
132+
)
133+
}
123134
}
124135
} else {
125136
await PauseResumeManager.processQueuedResumes(executionId)

apps/sim/lib/logs/execution/logger.ts

Lines changed: 20 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -47,34 +47,6 @@ export interface ToolCall {
4747
const logger = createLogger('ExecutionLogger')
4848

4949
export class ExecutionLogger implements IExecutionLoggerService {
50-
private mergeCostModels(
51-
existing: Record<string, any>,
52-
additional: Record<string, any>
53-
): Record<string, any> {
54-
const merged = { ...existing }
55-
for (const [model, costs] of Object.entries(additional)) {
56-
if (merged[model]) {
57-
merged[model] = {
58-
input: (merged[model].input || 0) + (costs.input || 0),
59-
output: (merged[model].output || 0) + (costs.output || 0),
60-
total: (merged[model].total || 0) + (costs.total || 0),
61-
tokens: {
62-
input:
63-
(merged[model].tokens?.input || merged[model].tokens?.prompt || 0) +
64-
(costs.tokens?.input || costs.tokens?.prompt || 0),
65-
output:
66-
(merged[model].tokens?.output || merged[model].tokens?.completion || 0) +
67-
(costs.tokens?.output || costs.tokens?.completion || 0),
68-
total: (merged[model].tokens?.total || 0) + (costs.tokens?.total || 0),
69-
},
70-
}
71-
} else {
72-
merged[model] = costs
73-
}
74-
}
75-
return merged
76-
}
77-
7850
async startWorkflowExecution(params: {
7951
workflowId: string
8052
workspaceId: string
@@ -209,7 +181,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
209181
workflowInput?: any
210182
isResume?: boolean
211183
level?: 'info' | 'error'
212-
status?: 'completed' | 'failed' | 'cancelled'
184+
status?: 'completed' | 'failed' | 'cancelled' | 'pending'
213185
}): Promise<WorkflowExecutionLog> {
214186
const {
215187
executionId,
@@ -268,43 +240,19 @@ export class ExecutionLogger implements IExecutionLoggerService {
268240
const redactedTraceSpans = redactApiKeys(filteredTraceSpans)
269241
const redactedFinalOutput = redactApiKeys(filteredFinalOutput)
270242

271-
// Merge costs if resuming
272-
const existingCost = isResume && existingLog?.cost ? existingLog.cost : null
273-
const mergedCost = existingCost
274-
? {
275-
// For resume, add only the model costs, NOT the base execution charge again
276-
total: (existingCost.total || 0) + costSummary.modelCost,
277-
input: (existingCost.input || 0) + costSummary.totalInputCost,
278-
output: (existingCost.output || 0) + costSummary.totalOutputCost,
279-
tokens: {
280-
input:
281-
(existingCost.tokens?.input || existingCost.tokens?.prompt || 0) +
282-
costSummary.totalPromptTokens,
283-
output:
284-
(existingCost.tokens?.output || existingCost.tokens?.completion || 0) +
285-
costSummary.totalCompletionTokens,
286-
total: (existingCost.tokens?.total || 0) + costSummary.totalTokens,
287-
},
288-
models: this.mergeCostModels(existingCost.models || {}, costSummary.models),
289-
}
290-
: {
291-
total: costSummary.totalCost,
292-
input: costSummary.totalInputCost,
293-
output: costSummary.totalOutputCost,
294-
tokens: {
295-
input: costSummary.totalPromptTokens,
296-
output: costSummary.totalCompletionTokens,
297-
total: costSummary.totalTokens,
298-
},
299-
models: costSummary.models,
300-
}
301-
302-
// Merge files if resuming
303-
const existingFiles = isResume && existingLog?.files ? existingLog.files : []
304-
const mergedFiles = [...existingFiles, ...executionFiles]
243+
const executionCost = {
244+
total: costSummary.totalCost,
245+
input: costSummary.totalInputCost,
246+
output: costSummary.totalOutputCost,
247+
tokens: {
248+
input: costSummary.totalPromptTokens,
249+
output: costSummary.totalCompletionTokens,
250+
total: costSummary.totalTokens,
251+
},
252+
models: costSummary.models,
253+
}
305254

306-
// Calculate the actual total duration for resume executions
307-
const actualTotalDuration =
255+
const totalDuration =
308256
isResume && existingLog?.startedAt
309257
? new Date(endedAt).getTime() - new Date(existingLog.startedAt).getTime()
310258
: totalDurationMs
@@ -315,19 +263,19 @@ export class ExecutionLogger implements IExecutionLoggerService {
315263
level,
316264
status,
317265
endedAt: new Date(endedAt),
318-
totalDurationMs: actualTotalDuration,
319-
files: mergedFiles.length > 0 ? mergedFiles : null,
266+
totalDurationMs: totalDuration,
267+
files: executionFiles.length > 0 ? executionFiles : null,
320268
executionData: {
321269
traceSpans: redactedTraceSpans,
322270
finalOutput: redactedFinalOutput,
323271
tokens: {
324-
input: mergedCost.tokens.input,
325-
output: mergedCost.tokens.output,
326-
total: mergedCost.tokens.total,
272+
input: executionCost.tokens.input,
273+
output: executionCost.tokens.output,
274+
total: executionCost.tokens.total,
327275
},
328-
models: mergedCost.models,
276+
models: executionCost.models,
329277
},
330-
cost: mergedCost,
278+
cost: executionCost,
331279
})
332280
.where(eq(workflowExecutionLogs.executionId, executionId))
333281
.returning()

0 commit comments

Comments
 (0)