From cdd0f75cd5d8c7ca85bb6c6351957124640b6712 Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan <33737564+Sg312@users.noreply.github.com> Date: Tue, 17 Mar 2026 16:19:47 -0700 Subject: [PATCH 01/11] fix(mothership): fix mothership file uploads (#3640) * Fix files * Fix * Fix --- .../[workspaceId]/home/hooks/use-chat.ts | 2 +- .../contexts/workspace/workspace-file-manager.ts | 16 ++++++++++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index 299c8f0f852..74312caab0b 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -132,7 +132,7 @@ function toDisplayAttachment(f: TaskStoredFileAttachment): ChatMessageAttachment media_type: f.media_type, size: f.size, previewUrl: f.media_type.startsWith('image/') - ? `/api/files/serve/${encodeURIComponent(f.key)}?context=copilot` + ? `/api/files/serve/${encodeURIComponent(f.key)}?context=mothership` : undefined, } } diff --git a/apps/sim/lib/uploads/contexts/workspace/workspace-file-manager.ts b/apps/sim/lib/uploads/contexts/workspace/workspace-file-manager.ts index 9c3dd16ec03..4d18638d3ff 100644 --- a/apps/sim/lib/uploads/contexts/workspace/workspace-file-manager.ts +++ b/apps/sim/lib/uploads/contexts/workspace/workspace-file-manager.ts @@ -209,8 +209,9 @@ export async function uploadWorkspaceFile( /** * Track a file that was already uploaded to workspace S3 as a chat-scoped upload. - * Creates a workspaceFiles record with context='mothership' and the given chatId. - * No S3 operations -- the file is already in storage from the presigned/upload step. + * Links the existing workspaceFiles metadata record (created by the storage service + * during upload) to the chat by setting chatId and context='mothership'. + * Falls back to inserting a new record if none exists for the key. */ export async function trackChatUpload( workspaceId: string, @@ -221,6 +222,17 @@ export async function trackChatUpload( contentType: string, size: number ): Promise { + const updated = await db + .update(workspaceFiles) + .set({ chatId, context: 'mothership' }) + .where(and(eq(workspaceFiles.key, s3Key), eq(workspaceFiles.workspaceId, workspaceId), isNull(workspaceFiles.deletedAt))) + .returning({ id: workspaceFiles.id }) + + if (updated.length > 0) { + logger.info(`Linked existing file record to chat: ${fileName} for chat ${chatId}`) + return + } + const fileId = `wf_${Date.now()}_${Math.random().toString(36).substring(2, 9)}` await db.insert(workspaceFiles).values({ From 75a3e2c3a813390aa3e6086723a78169d0336336 Mon Sep 17 00:00:00 2001 From: Waleed Date: Tue, 17 Mar 2026 16:20:06 -0700 Subject: [PATCH 02/11] fix(workspace): prevent stale placeholder data from corrupting workflow registry on switch --- apps/sim/hooks/queries/workflows.ts | 22 +++++++++++++++++---- apps/sim/hooks/queries/workspace.ts | 11 +++-------- apps/sim/lib/copilot/client-sse/handlers.ts | 4 +++- 3 files changed, 24 insertions(+), 13 deletions(-) diff --git a/apps/sim/hooks/queries/workflows.ts b/apps/sim/hooks/queries/workflows.ts index 97d3df0b2b5..cc9e7b0b696 100644 --- a/apps/sim/hooks/queries/workflows.ts +++ b/apps/sim/hooks/queries/workflows.ts @@ -113,10 +113,15 @@ export function useWorkflows( }) useEffect(() => { - if (syncRegistry && scope === 'active' && workspaceId && query.status === 'pending') { + if ( + syncRegistry && + scope === 'active' && + workspaceId && + (query.status === 'pending' || query.isPlaceholderData) + ) { beginMetadataLoad(workspaceId) } - }, [syncRegistry, scope, workspaceId, query.status, beginMetadataLoad]) + }, [syncRegistry, scope, workspaceId, query.status, query.isPlaceholderData, beginMetadataLoad]) useEffect(() => { if ( @@ -124,11 +129,20 @@ export function useWorkflows( scope === 'active' && workspaceId && query.status === 'success' && - query.data + query.data && + !query.isPlaceholderData ) { completeMetadataLoad(workspaceId, query.data) } - }, [syncRegistry, scope, workspaceId, query.status, query.data, completeMetadataLoad]) + }, [ + syncRegistry, + scope, + workspaceId, + query.status, + query.data, + query.isPlaceholderData, + completeMetadataLoad, + ]) useEffect(() => { if (syncRegistry && scope === 'active' && workspaceId && query.status === 'error') { diff --git a/apps/sim/hooks/queries/workspace.ts b/apps/sim/hooks/queries/workspace.ts index a3eb1bf5e6d..b4722aca3af 100644 --- a/apps/sim/hooks/queries/workspace.ts +++ b/apps/sim/hooks/queries/workspace.ts @@ -86,14 +86,9 @@ export function useCreateWorkspace() { const data = await response.json() return data.workspace as Workspace }, - onSuccess: (data) => { - queryClient.invalidateQueries({ queryKey: workspaceKeys.all }) - if (data?.id) { - queryClient.removeQueries({ queryKey: workspaceKeys.detail(data.id) }) - queryClient.removeQueries({ queryKey: workspaceKeys.settings(data.id) }) - queryClient.removeQueries({ queryKey: workspaceKeys.permissions(data.id) }) - queryClient.removeQueries({ queryKey: workspaceKeys.members(data.id) }) - } + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: workspaceKeys.lists() }) + queryClient.invalidateQueries({ queryKey: workspaceKeys.adminLists() }) }, }) } diff --git a/apps/sim/lib/copilot/client-sse/handlers.ts b/apps/sim/lib/copilot/client-sse/handlers.ts index 0330853d69c..7705e1cf3dd 100644 --- a/apps/sim/lib/copilot/client-sse/handlers.ts +++ b/apps/sim/lib/copilot/client-sse/handlers.ts @@ -3,6 +3,7 @@ import { STREAM_STORAGE_KEY } from '@/lib/copilot/constants' import { asRecord } from '@/lib/copilot/orchestrator/sse/utils' import type { SSEEvent } from '@/lib/copilot/orchestrator/types' import { + abortAllInProgressTools, isBackgroundState, isRejectedState, isReviewState, @@ -956,7 +957,7 @@ export const sseHandlers: Record = { })) context.streamComplete = true }, - stream_end: (_data, context, _get, set) => { + stream_end: (_data, context, get, set) => { if (context.pendingContent) { if (context.isInThinkingBlock && context.currentThinkingBlock) { appendThinkingContent(context, context.pendingContent) @@ -967,6 +968,7 @@ export const sseHandlers: Record = { } finalizeThinkingBlock(context) updateStreamingMessage(set, context) + abortAllInProgressTools(set, get) }, default: () => {}, } From c9f082da1a9802840c2249ddfe5c78ca6d4d5441 Mon Sep 17 00:00:00 2001 From: Waleed Date: Tue, 17 Mar 2026 17:12:34 -0700 Subject: [PATCH 03/11] feat(csp): allow chat UI to be embedded in iframes (#3643) * feat(csp): allow chat UI to be embedded in iframes Mirror the existing form embed CSP pattern for chat pages: add getChatEmbedCSPPolicy() with frame-ancestors *, configure /chat/:path* headers in next.config.ts without X-Frame-Options, and early-return in proxy.ts so chat routes skip the strict runtime CSP. Co-Authored-By: Claude Opus 4.6 * refactor(csp): extract shared getEmbedCSPPolicy helper Deduplicate getChatEmbedCSPPolicy and getFormEmbedCSPPolicy into a shared private helper to prevent future divergence. Co-Authored-By: Claude Opus 4.6 --------- Co-authored-by: Claude Opus 4.6 --- apps/sim/lib/core/security/csp.ts | 21 +++++++++++++++++---- apps/sim/next.config.ts | 25 ++++++++++++++++++++++--- apps/sim/proxy.ts | 7 ++----- 3 files changed, 41 insertions(+), 12 deletions(-) diff --git a/apps/sim/lib/core/security/csp.ts b/apps/sim/lib/core/security/csp.ts index b1148998dd4..c5dde9ef41d 100644 --- a/apps/sim/lib/core/security/csp.ts +++ b/apps/sim/lib/core/security/csp.ts @@ -202,15 +202,28 @@ export function getWorkflowExecutionCSPPolicy(): string { } /** - * CSP for embeddable form pages + * Shared CSP for embeddable pages (chat, forms) * Allows embedding in iframes from any origin while maintaining other security policies */ -export function getFormEmbedCSPPolicy(): string { - const basePolicy = buildCSPString({ +function getEmbedCSPPolicy(): string { + return buildCSPString({ ...buildTimeCSPDirectives, 'frame-ancestors': ['*'], }) - return basePolicy +} + +/** + * CSP for embeddable chat pages + */ +export function getChatEmbedCSPPolicy(): string { + return getEmbedCSPPolicy() +} + +/** + * CSP for embeddable form pages + */ +export function getFormEmbedCSPPolicy(): string { + return getEmbedCSPPolicy() } /** diff --git a/apps/sim/next.config.ts b/apps/sim/next.config.ts index e5975cc6e9a..1b23b9ec93d 100644 --- a/apps/sim/next.config.ts +++ b/apps/sim/next.config.ts @@ -2,6 +2,7 @@ import type { NextConfig } from 'next' import { env, getEnv, isTruthy } from './lib/core/config/env' import { isDev } from './lib/core/config/feature-flags' import { + getChatEmbedCSPPolicy, getFormEmbedCSPPolicy, getMainCSPPolicy, getWorkflowExecutionCSPPolicy, @@ -255,6 +256,24 @@ const nextConfig: NextConfig = { }, ], }, + // Chat pages - allow iframe embedding from any origin + { + source: '/chat/:path*', + headers: [ + { + key: 'X-Content-Type-Options', + value: 'nosniff', + }, + // No X-Frame-Options to allow iframe embedding + { + key: 'Content-Security-Policy', + value: getChatEmbedCSPPolicy(), + }, + // Permissive CORS for chat requests from embedded chats + { key: 'Cross-Origin-Embedder-Policy', value: 'unsafe-none' }, + { key: 'Cross-Origin-Opener-Policy', value: 'unsafe-none' }, + ], + }, // Form pages - allow iframe embedding from any origin { source: '/form/:path*', @@ -284,10 +303,10 @@ const nextConfig: NextConfig = { ], }, // Apply security headers to routes not handled by middleware runtime CSP - // Middleware handles: /, /workspace/*, /chat/* - // Exclude form routes which have their own permissive headers + // Middleware handles: /, /workspace/* + // Exclude chat and form routes which have their own permissive embed headers { - source: '/((?!workspace|chat$|form).*)', + source: '/((?!workspace|chat|form).*)', headers: [ { key: 'X-Content-Type-Options', diff --git a/apps/sim/proxy.ts b/apps/sim/proxy.ts index 36ada3484f1..acd93ebfa12 100644 --- a/apps/sim/proxy.ts +++ b/apps/sim/proxy.ts @@ -155,6 +155,7 @@ export async function proxy(request: NextRequest) { return response } + // Chat pages are publicly accessible embeds — CSP is set in next.config.ts headers if (url.pathname.startsWith('/chat/')) { return NextResponse.next() } @@ -188,11 +189,7 @@ export async function proxy(request: NextRequest) { const response = NextResponse.next() response.headers.set('Vary', 'User-Agent') - if ( - url.pathname.startsWith('/workspace') || - url.pathname.startsWith('/chat') || - url.pathname === '/' - ) { + if (url.pathname.startsWith('/workspace') || url.pathname === '/') { response.headers.set('Content-Security-Policy', generateRuntimeCSP()) } From 67478bbc80382b636f4221d2f5dd754827465105 Mon Sep 17 00:00:00 2001 From: PlaneInABottle Date: Wed, 18 Mar 2026 03:24:40 +0300 Subject: [PATCH 04/11] fix(logs): add durable execution diagnostics foundation (#3564) * fix(logs): persist execution diagnostics markers Store last-started and last-completed block markers with finalization metadata so later read surfaces can explain how a run ended without reconstructing executor state. * fix(executor): preserve durable diagnostics ordering Await only the persistence needed to keep diagnostics durable before terminal completion while keeping callback failures from changing execution behavior. * fix(logs): preserve fallback diagnostics semantics Keep successful fallback output and accumulated cost intact while tightening progress-write draining and deduplicating trace span counting for diagnostics helpers. * fix(api): restore async execute route test mock Add the missing AuthType export to the hybrid auth mock so the async execution route test exercises the 202 queueing path instead of crashing with a 500 in CI. * fix(executor): align async block error handling * fix(logs): tighten marker ordering scope Allow same-millisecond marker writes to replace prior markers and drop the unused diagnostics read helper so this PR stays focused on persistence rather than unread foundation code. * fix(logs): remove unused finalization type guard Drop the unused helper so this PR only ships the persistence-side status types it actually uses. * fix(executor): await subflow diagnostics callbacks Ensure empty-subflow and subflow-error lifecycle callbacks participate in progress-write draining before terminal finalization while still swallowing callback failures. --------- Co-authored-by: test Co-authored-by: Vikhyath Mondreti --- apps/sim/executor/execution/block-executor.ts | 86 +++--- apps/sim/executor/orchestrators/loop.ts | 65 +++-- apps/sim/executor/orchestrators/node.ts | 24 +- .../executor/orchestrators/parallel.test.ts | 142 +++++++++ apps/sim/executor/orchestrators/parallel.ts | 54 ++-- apps/sim/executor/utils/subflow-utils.ts | 89 ++++-- apps/sim/lib/logs/execution/logger.test.ts | 25 +- apps/sim/lib/logs/execution/logger.ts | 57 +++- .../logs/execution/logging-session.test.ts | 271 +++++++++++++++++- .../sim/lib/logs/execution/logging-session.ts | 265 +++++++++++++++-- apps/sim/lib/logs/types.ts | 35 +++ .../workflows/executor/execution-core.test.ts | 192 ++++++++++++- .../lib/workflows/executor/execution-core.ts | 119 ++++++-- 13 files changed, 1217 insertions(+), 207 deletions(-) create mode 100644 apps/sim/executor/orchestrators/parallel.test.ts diff --git a/apps/sim/executor/execution/block-executor.ts b/apps/sim/executor/execution/block-executor.ts index 9c54d2bd993..5680ee4e3cf 100644 --- a/apps/sim/executor/execution/block-executor.ts +++ b/apps/sim/executor/execution/block-executor.ts @@ -77,7 +77,7 @@ export class BlockExecutor { if (!isSentinel) { blockLog = this.createBlockLog(ctx, node.id, block, node) ctx.blockLogs.push(blockLog) - this.callOnBlockStart(ctx, node, block, blockLog.executionOrder) + await this.callOnBlockStart(ctx, node, block, blockLog.executionOrder) } const startTime = performance.now() @@ -105,7 +105,7 @@ export class BlockExecutor { } } catch (error) { cleanupSelfReference?.() - return this.handleBlockError( + return await this.handleBlockError( error, ctx, node, @@ -179,7 +179,7 @@ export class BlockExecutor { const displayOutput = filterOutputForLog(block.metadata?.id || '', normalizedOutput, { block, }) - this.callOnBlockComplete( + await this.callOnBlockComplete( ctx, node, block, @@ -195,7 +195,7 @@ export class BlockExecutor { return normalizedOutput } catch (error) { - return this.handleBlockError( + return await this.handleBlockError( error, ctx, node, @@ -226,7 +226,7 @@ export class BlockExecutor { return this.blockHandlers.find((h) => h.canHandle(block)) } - private handleBlockError( + private async handleBlockError( error: unknown, ctx: ExecutionContext, node: DAGNode, @@ -236,7 +236,7 @@ export class BlockExecutor { resolvedInputs: Record, isSentinel: boolean, phase: 'input_resolution' | 'execution' - ): NormalizedBlockOutput { + ): Promise { const duration = performance.now() - startTime const errorMessage = normalizeError(error) const hasResolvedInputs = @@ -287,7 +287,7 @@ export class BlockExecutor { ? error.childWorkflowInstanceId : undefined const displayOutput = filterOutputForLog(block.metadata?.id || '', errorOutput, { block }) - this.callOnBlockComplete( + await this.callOnBlockComplete( ctx, node, block, @@ -439,12 +439,12 @@ export class BlockExecutor { return redactApiKeys(result) } - private callOnBlockStart( + private async callOnBlockStart( ctx: ExecutionContext, node: DAGNode, block: SerializedBlock, executionOrder: number - ): void { + ): Promise { const blockId = node.metadata?.originalBlockId ?? node.id const blockName = block.metadata?.name ?? blockId const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE @@ -452,18 +452,26 @@ export class BlockExecutor { const iterationContext = getIterationContext(ctx, node?.metadata) if (this.contextExtensions.onBlockStart) { - this.contextExtensions.onBlockStart( - blockId, - blockName, - blockType, - executionOrder, - iterationContext, - ctx.childWorkflowContext - ) + try { + await this.contextExtensions.onBlockStart( + blockId, + blockName, + blockType, + executionOrder, + iterationContext, + ctx.childWorkflowContext + ) + } catch (error) { + logger.warn('Block start callback failed', { + blockId, + blockType, + error: error instanceof Error ? error.message : String(error), + }) + } } } - private callOnBlockComplete( + private async callOnBlockComplete( ctx: ExecutionContext, node: DAGNode, block: SerializedBlock, @@ -474,7 +482,7 @@ export class BlockExecutor { executionOrder: number, endedAt: string, childWorkflowInstanceId?: string - ): void { + ): Promise { const blockId = node.metadata?.originalBlockId ?? node.id const blockName = block.metadata?.name ?? blockId const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE @@ -482,22 +490,30 @@ export class BlockExecutor { const iterationContext = getIterationContext(ctx, node?.metadata) if (this.contextExtensions.onBlockComplete) { - this.contextExtensions.onBlockComplete( - blockId, - blockName, - blockType, - { - input, - output, - executionTime: duration, - startedAt, - executionOrder, - endedAt, - childWorkflowInstanceId, - }, - iterationContext, - ctx.childWorkflowContext - ) + try { + await this.contextExtensions.onBlockComplete( + blockId, + blockName, + blockType, + { + input, + output, + executionTime: duration, + startedAt, + executionOrder, + endedAt, + childWorkflowInstanceId, + }, + iterationContext, + ctx.childWorkflowContext + ) + } catch (error) { + logger.warn('Block completion callback failed', { + blockId, + blockType, + error: error instanceof Error ? error.message : String(error), + }) + } } } diff --git a/apps/sim/executor/orchestrators/loop.ts b/apps/sim/executor/orchestrators/loop.ts index 039203c068a..0ea42a137e8 100644 --- a/apps/sim/executor/orchestrators/loop.ts +++ b/apps/sim/executor/orchestrators/loop.ts @@ -51,7 +51,7 @@ export class LoopOrchestrator { private edgeManager: EdgeManager | null = null ) {} - initializeLoopScope(ctx: ExecutionContext, loopId: string): LoopScope { + async initializeLoopScope(ctx: ExecutionContext, loopId: string): Promise { const loopConfig = this.dag.loopConfigs.get(loopId) as SerializedLoop | undefined if (!loopConfig) { throw new Error(`Loop config not found: ${loopId}`) @@ -76,7 +76,7 @@ export class LoopOrchestrator { ) if (iterationError) { logger.error(iterationError, { loopId, requestedIterations }) - this.addLoopErrorLog(ctx, loopId, loopType, iterationError, { + await this.addLoopErrorLog(ctx, loopId, loopType, iterationError, { iterations: requestedIterations, }) scope.maxIterations = 0 @@ -99,7 +99,7 @@ export class LoopOrchestrator { } catch (error) { const errorMessage = `ForEach loop resolution failed: ${error instanceof Error ? error.message : String(error)}` logger.error(errorMessage, { loopId, forEachItems: loopConfig.forEachItems }) - this.addLoopErrorLog(ctx, loopId, loopType, errorMessage, { + await this.addLoopErrorLog(ctx, loopId, loopType, errorMessage, { forEachItems: loopConfig.forEachItems, }) scope.items = [] @@ -117,7 +117,7 @@ export class LoopOrchestrator { ) if (sizeError) { logger.error(sizeError, { loopId, collectionSize: items.length }) - this.addLoopErrorLog(ctx, loopId, loopType, sizeError, { + await this.addLoopErrorLog(ctx, loopId, loopType, sizeError, { forEachItems: loopConfig.forEachItems, collectionSize: items.length, }) @@ -155,7 +155,7 @@ export class LoopOrchestrator { ) if (iterationError) { logger.error(iterationError, { loopId, requestedIterations }) - this.addLoopErrorLog(ctx, loopId, loopType, iterationError, { + await this.addLoopErrorLog(ctx, loopId, loopType, iterationError, { iterations: requestedIterations, }) scope.maxIterations = 0 @@ -182,14 +182,14 @@ export class LoopOrchestrator { return scope } - private addLoopErrorLog( + private async addLoopErrorLog( ctx: ExecutionContext, loopId: string, loopType: string, errorMessage: string, inputData?: any - ): void { - addSubflowErrorLog( + ): Promise { + await addSubflowErrorLog( ctx, loopId, 'loop', @@ -238,7 +238,7 @@ export class LoopOrchestrator { } if (isCancelled) { logger.info('Loop execution cancelled', { loopId, iteration: scope.iteration }) - return this.createExitResult(ctx, loopId, scope) + return await this.createExitResult(ctx, loopId, scope) } const iterationResults: NormalizedBlockOutput[] = [] @@ -253,7 +253,7 @@ export class LoopOrchestrator { scope.currentIterationOutputs.clear() if (!(await this.evaluateCondition(ctx, scope, scope.iteration + 1))) { - return this.createExitResult(ctx, loopId, scope) + return await this.createExitResult(ctx, loopId, scope) } scope.iteration++ @@ -269,11 +269,11 @@ export class LoopOrchestrator { } } - private createExitResult( + private async createExitResult( ctx: ExecutionContext, loopId: string, scope: LoopScope - ): LoopContinuationResult { + ): Promise { const results = scope.allIterationOutputs const output = { results } this.state.setBlockOutput(loopId, output, DEFAULTS.EXECUTION_TIME) @@ -282,19 +282,26 @@ export class LoopOrchestrator { const now = new Date().toISOString() const iterationContext = buildContainerIterationContext(ctx, loopId) - this.contextExtensions.onBlockComplete( - loopId, - 'Loop', - 'loop', - { - output, - executionTime: DEFAULTS.EXECUTION_TIME, - startedAt: now, - executionOrder: getNextExecutionOrder(ctx), - endedAt: now, - }, - iterationContext - ) + try { + await this.contextExtensions.onBlockComplete( + loopId, + 'Loop', + 'loop', + { + output, + executionTime: DEFAULTS.EXECUTION_TIME, + startedAt: now, + executionOrder: getNextExecutionOrder(ctx), + endedAt: now, + }, + iterationContext + ) + } catch (error) { + logger.warn('Loop completion callback failed', { + loopId, + error: error instanceof Error ? error.message : String(error), + }) + } } return { @@ -597,7 +604,7 @@ export class LoopOrchestrator { if (!scope.items || scope.items.length === 0) { logger.info('ForEach loop has empty collection, skipping loop body', { loopId }) this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME) - emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions) + await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions) return false } return true @@ -607,7 +614,7 @@ export class LoopOrchestrator { if (scope.maxIterations === 0) { logger.info('For loop has 0 iterations, skipping loop body', { loopId }) this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME) - emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions) + await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions) return false } return true @@ -621,7 +628,7 @@ export class LoopOrchestrator { if (!scope.condition) { logger.warn('No condition defined for while loop', { loopId }) this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME) - emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions) + await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions) return false } @@ -634,7 +641,7 @@ export class LoopOrchestrator { if (!result) { this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME) - emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions) + await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions) } return result diff --git a/apps/sim/executor/orchestrators/node.ts b/apps/sim/executor/orchestrators/node.ts index 862f7c1a2e9..7ae57a555c9 100644 --- a/apps/sim/executor/orchestrators/node.ts +++ b/apps/sim/executor/orchestrators/node.ts @@ -53,14 +53,14 @@ export class NodeExecutionOrchestrator { const loopId = node.metadata.loopId if (loopId && !this.loopOrchestrator.getLoopScope(ctx, loopId)) { - this.loopOrchestrator.initializeLoopScope(ctx, loopId) + await this.loopOrchestrator.initializeLoopScope(ctx, loopId) } const parallelId = node.metadata.parallelId if (parallelId && !this.parallelOrchestrator.getParallelScope(ctx, parallelId)) { const parallelConfig = this.dag.parallelConfigs.get(parallelId) const nodesInParallel = parallelConfig?.nodes?.length || 1 - this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel) + await this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel) } if (node.metadata.isSentinel) { @@ -92,7 +92,7 @@ export class NodeExecutionOrchestrator { const isParallelSentinel = node.metadata.isParallelSentinel if (isParallelSentinel) { - return this.handleParallelSentinel(ctx, node, sentinelType, parallelId) + return await this.handleParallelSentinel(ctx, node, sentinelType, parallelId) } switch (sentinelType) { @@ -142,12 +142,12 @@ export class NodeExecutionOrchestrator { } } - private handleParallelSentinel( + private async handleParallelSentinel( ctx: ExecutionContext, node: DAGNode, sentinelType: string | undefined, parallelId: string | undefined - ): NormalizedBlockOutput { + ): Promise { if (!parallelId) { logger.warn('Parallel sentinel called without parallelId') return {} @@ -158,7 +158,7 @@ export class NodeExecutionOrchestrator { const parallelConfig = this.dag.parallelConfigs.get(parallelId) if (parallelConfig) { const nodesInParallel = parallelConfig.nodes?.length || 1 - this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel) + await this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel) } } @@ -176,7 +176,7 @@ export class NodeExecutionOrchestrator { } if (sentinelType === 'end') { - const result = this.parallelOrchestrator.aggregateParallelResults(ctx, parallelId) + const result = await this.parallelOrchestrator.aggregateParallelResults(ctx, parallelId) return { results: result.results || [], sentinelEnd: true, @@ -210,7 +210,7 @@ export class NodeExecutionOrchestrator { } else if (isParallelBranch) { const parallelId = this.findParallelIdForNode(node.id) if (parallelId) { - this.handleParallelNodeCompletion(ctx, node, output, parallelId) + await this.handleParallelNodeCompletion(ctx, node, output, parallelId) } else { this.handleRegularNodeCompletion(ctx, node, output) } @@ -229,17 +229,17 @@ export class NodeExecutionOrchestrator { this.state.setBlockOutput(node.id, output) } - private handleParallelNodeCompletion( + private async handleParallelNodeCompletion( ctx: ExecutionContext, node: DAGNode, output: NormalizedBlockOutput, parallelId: string - ): void { + ): Promise { const scope = this.parallelOrchestrator.getParallelScope(ctx, parallelId) if (!scope) { const parallelConfig = this.dag.parallelConfigs.get(parallelId) const nodesInParallel = parallelConfig?.nodes?.length || 1 - this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel) + await this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel) } const allComplete = this.parallelOrchestrator.handleParallelBranchCompletion( ctx, @@ -248,7 +248,7 @@ export class NodeExecutionOrchestrator { output ) if (allComplete) { - this.parallelOrchestrator.aggregateParallelResults(ctx, parallelId) + await this.parallelOrchestrator.aggregateParallelResults(ctx, parallelId) } this.state.setBlockOutput(node.id, output) diff --git a/apps/sim/executor/orchestrators/parallel.test.ts b/apps/sim/executor/orchestrators/parallel.test.ts new file mode 100644 index 00000000000..34b2c011f9e --- /dev/null +++ b/apps/sim/executor/orchestrators/parallel.test.ts @@ -0,0 +1,142 @@ +/** + * @vitest-environment node + */ +import { beforeEach, describe, expect, it, vi } from 'vitest' +import type { DAG } from '@/executor/dag/builder' +import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types' +import { ParallelOrchestrator } from '@/executor/orchestrators/parallel' +import type { ExecutionContext } from '@/executor/types' + +vi.mock('@sim/logger', () => ({ + createLogger: () => ({ + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }), +})) + +function createDag(): DAG { + return { + nodes: new Map(), + loopConfigs: new Map(), + parallelConfigs: new Map([ + [ + 'parallel-1', + { + id: 'parallel-1', + nodes: ['task-1'], + distribution: [], + parallelType: 'collection', + }, + ], + ]), + } +} + +function createState(): BlockStateWriter { + return { + setBlockOutput: vi.fn(), + setBlockState: vi.fn(), + deleteBlockState: vi.fn(), + unmarkExecuted: vi.fn(), + } +} + +function createContext(overrides: Partial = {}): ExecutionContext { + return { + workflowId: 'workflow-1', + workspaceId: 'workspace-1', + executionId: 'execution-1', + userId: 'user-1', + blockStates: new Map(), + executedBlocks: new Set(), + blockLogs: [], + metadata: { duration: 0 }, + environmentVariables: {}, + decisions: { + router: new Map(), + condition: new Map(), + }, + completedLoops: new Set(), + activeExecutionPath: new Set(), + workflow: { + version: '1', + blocks: [ + { + id: 'parallel-1', + position: { x: 0, y: 0 }, + config: { tool: '', params: {} }, + inputs: {}, + outputs: {}, + metadata: { id: 'parallel', name: 'Parallel 1' }, + enabled: true, + }, + ], + connections: [], + loops: {}, + parallels: {}, + }, + ...overrides, + } +} + +describe('ParallelOrchestrator', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + it('awaits empty-subflow lifecycle callbacks before returning the empty scope', async () => { + let releaseStart: (() => void) | undefined + const onBlockStart = vi.fn( + () => + new Promise((resolve) => { + releaseStart = resolve + }) + ) + const onBlockComplete = vi.fn() + const contextExtensions: ContextExtensions = { + onBlockStart, + onBlockComplete, + } + const orchestrator = new ParallelOrchestrator( + createDag(), + createState(), + null, + contextExtensions + ) + const ctx = createContext() + + const initializePromise = orchestrator.initializeParallelScope(ctx, 'parallel-1', 1) + await Promise.resolve() + + expect(onBlockStart).toHaveBeenCalledTimes(1) + expect(onBlockComplete).not.toHaveBeenCalled() + + releaseStart?.() + const scope = await initializePromise + + expect(onBlockComplete).toHaveBeenCalledTimes(1) + expect(scope.isEmpty).toBe(true) + }) + + it('swallows helper callback failures on empty parallel paths', async () => { + const contextExtensions: ContextExtensions = { + onBlockStart: vi.fn().mockRejectedValue(new Error('start failed')), + onBlockComplete: vi.fn().mockRejectedValue(new Error('complete failed')), + } + const orchestrator = new ParallelOrchestrator( + createDag(), + createState(), + null, + contextExtensions + ) + + await expect( + orchestrator.initializeParallelScope(createContext(), 'parallel-1', 1) + ).resolves.toMatchObject({ + parallelId: 'parallel-1', + isEmpty: true, + }) + }) +}) diff --git a/apps/sim/executor/orchestrators/parallel.ts b/apps/sim/executor/orchestrators/parallel.ts index ed098066bc0..23f7dede964 100644 --- a/apps/sim/executor/orchestrators/parallel.ts +++ b/apps/sim/executor/orchestrators/parallel.ts @@ -47,11 +47,11 @@ export class ParallelOrchestrator { private contextExtensions: ContextExtensions | null = null ) {} - initializeParallelScope( + async initializeParallelScope( ctx: ExecutionContext, parallelId: string, terminalNodesCount = 1 - ): ParallelScope { + ): Promise { const parallelConfig = this.dag.parallelConfigs.get(parallelId) if (!parallelConfig) { throw new Error(`Parallel config not found: ${parallelId}`) @@ -69,7 +69,7 @@ export class ParallelOrchestrator { } catch (error) { const errorMessage = `Parallel Items did not resolve: ${error instanceof Error ? error.message : String(error)}` logger.error(errorMessage, { parallelId, distribution: parallelConfig.distribution }) - this.addParallelErrorLog(ctx, parallelId, errorMessage, { + await this.addParallelErrorLog(ctx, parallelId, errorMessage, { distribution: parallelConfig.distribution, }) this.setErrorScope(ctx, parallelId, errorMessage) @@ -83,7 +83,7 @@ export class ParallelOrchestrator { ) if (branchError) { logger.error(branchError, { parallelId, branchCount }) - this.addParallelErrorLog(ctx, parallelId, branchError, { + await this.addParallelErrorLog(ctx, parallelId, branchError, { distribution: parallelConfig.distribution, branchCount, }) @@ -109,7 +109,7 @@ export class ParallelOrchestrator { this.state.setBlockOutput(parallelId, { results: [] }) - emitEmptySubflowEvents(ctx, parallelId, 'parallel', this.contextExtensions) + await emitEmptySubflowEvents(ctx, parallelId, 'parallel', this.contextExtensions) logger.info('Parallel scope initialized with empty distribution, skipping body', { parallelId, @@ -220,13 +220,13 @@ export class ParallelOrchestrator { return { branchCount: items.length, items } } - private addParallelErrorLog( + private async addParallelErrorLog( ctx: ExecutionContext, parallelId: string, errorMessage: string, inputData?: any - ): void { - addSubflowErrorLog( + ): Promise { + await addSubflowErrorLog( ctx, parallelId, 'parallel', @@ -291,7 +291,10 @@ export class ParallelOrchestrator { return allComplete } - aggregateParallelResults(ctx: ExecutionContext, parallelId: string): ParallelAggregationResult { + async aggregateParallelResults( + ctx: ExecutionContext, + parallelId: string + ): Promise { const scope = ctx.parallelExecutions?.get(parallelId) if (!scope) { logger.error('Parallel scope not found for aggregation', { parallelId }) @@ -316,19 +319,26 @@ export class ParallelOrchestrator { const now = new Date().toISOString() const iterationContext = buildContainerIterationContext(ctx, parallelId) - this.contextExtensions.onBlockComplete( - parallelId, - 'Parallel', - 'parallel', - { - output, - executionTime: 0, - startedAt: now, - executionOrder: getNextExecutionOrder(ctx), - endedAt: now, - }, - iterationContext - ) + try { + await this.contextExtensions.onBlockComplete( + parallelId, + 'Parallel', + 'parallel', + { + output, + executionTime: 0, + startedAt: now, + executionOrder: getNextExecutionOrder(ctx), + endedAt: now, + }, + iterationContext + ) + } catch (error) { + logger.warn('Parallel completion callback failed', { + parallelId, + error: error instanceof Error ? error.message : String(error), + }) + } } return { diff --git a/apps/sim/executor/utils/subflow-utils.ts b/apps/sim/executor/utils/subflow-utils.ts index 977be2788c9..98391442a78 100644 --- a/apps/sim/executor/utils/subflow-utils.ts +++ b/apps/sim/executor/utils/subflow-utils.ts @@ -1,9 +1,12 @@ +import { createLogger } from '@sim/logger' import { DEFAULTS, LOOP, PARALLEL, REFERENCE } from '@/executor/constants' import type { ContextExtensions } from '@/executor/execution/types' import { type BlockLog, type ExecutionContext, getNextExecutionOrder } from '@/executor/types' import { buildContainerIterationContext } from '@/executor/utils/iteration-context' import type { VariableResolver } from '@/executor/variables/resolver' +const logger = createLogger('SubflowUtils') + const BRANCH_PATTERN = new RegExp(`${PARALLEL.BRANCH.PREFIX}\\d+${PARALLEL.BRANCH.SUFFIX}$`) const BRANCH_INDEX_PATTERN = new RegExp(`${PARALLEL.BRANCH.PREFIX}(\\d+)${PARALLEL.BRANCH.SUFFIX}$`) const LOOP_SENTINEL_START_PATTERN = new RegExp( @@ -265,14 +268,14 @@ export function resolveArrayInput( /** * Creates and logs an error for a subflow (loop or parallel). */ -export function addSubflowErrorLog( +export async function addSubflowErrorLog( ctx: ExecutionContext, blockId: string, blockType: 'loop' | 'parallel', errorMessage: string, inputData: Record, contextExtensions: ContextExtensions | null -): void { +): Promise { const now = new Date().toISOString() const execOrder = getNextExecutionOrder(ctx) @@ -296,18 +299,34 @@ export function addSubflowErrorLog( ctx.blockLogs.push(blockLog) if (contextExtensions?.onBlockStart) { - contextExtensions.onBlockStart(blockId, blockName, blockType, execOrder) + try { + await contextExtensions.onBlockStart(blockId, blockName, blockType, execOrder) + } catch (error) { + logger.warn('Subflow error start callback failed', { + blockId, + blockType, + error: error instanceof Error ? error.message : String(error), + }) + } } if (contextExtensions?.onBlockComplete) { - contextExtensions.onBlockComplete(blockId, blockName, blockType, { - input: inputData, - output: { error: errorMessage }, - executionTime: 0, - startedAt: now, - executionOrder: execOrder, - endedAt: now, - }) + try { + await contextExtensions.onBlockComplete(blockId, blockName, blockType, { + input: inputData, + output: { error: errorMessage }, + executionTime: 0, + startedAt: now, + executionOrder: execOrder, + endedAt: now, + }) + } catch (error) { + logger.warn('Subflow error completion callback failed', { + blockId, + blockType, + error: error instanceof Error ? error.message : String(error), + }) + } } } @@ -316,12 +335,12 @@ export function addSubflowErrorLog( * empty collection or false initial condition. This ensures the container block * appears in terminal logs, execution snapshots, and edge highlighting. */ -export function emitEmptySubflowEvents( +export async function emitEmptySubflowEvents( ctx: ExecutionContext, blockId: string, blockType: 'loop' | 'parallel', contextExtensions: ContextExtensions | null -): void { +): Promise { const now = new Date().toISOString() const executionOrder = getNextExecutionOrder(ctx) const output = { results: [] } @@ -342,22 +361,38 @@ export function emitEmptySubflowEvents( }) if (contextExtensions?.onBlockStart) { - contextExtensions.onBlockStart(blockId, blockName, blockType, executionOrder) + try { + await contextExtensions.onBlockStart(blockId, blockName, blockType, executionOrder) + } catch (error) { + logger.warn('Empty subflow start callback failed', { + blockId, + blockType, + error: error instanceof Error ? error.message : String(error), + }) + } } if (contextExtensions?.onBlockComplete) { - contextExtensions.onBlockComplete( - blockId, - blockName, - blockType, - { - output, - executionTime: DEFAULTS.EXECUTION_TIME, - startedAt: now, - executionOrder, - endedAt: now, - }, - iterationContext - ) + try { + await contextExtensions.onBlockComplete( + blockId, + blockName, + blockType, + { + output, + executionTime: DEFAULTS.EXECUTION_TIME, + startedAt: now, + executionOrder, + endedAt: now, + }, + iterationContext + ) + } catch (error) { + logger.warn('Empty subflow completion callback failed', { + blockId, + blockType, + error: error instanceof Error ? error.message : String(error), + }) + } } } diff --git a/apps/sim/lib/logs/execution/logger.test.ts b/apps/sim/lib/logs/execution/logger.test.ts index a1bd9962d8d..7cfdb6ea9e6 100644 --- a/apps/sim/lib/logs/execution/logger.test.ts +++ b/apps/sim/lib/logs/execution/logger.test.ts @@ -1,6 +1,6 @@ import { databaseMock, loggerMock } from '@sim/testing' import { beforeEach, describe, expect, test, vi } from 'vitest' -import { ExecutionLogger } from './logger' +import { ExecutionLogger } from '@/lib/logs/execution/logger' vi.mock('@sim/db', () => databaseMock) @@ -112,7 +112,7 @@ describe('ExecutionLogger', () => { expect(typeof logger.getWorkflowExecution).toBe('function') }) - test('preserves start correlation data when execution completes', () => { + test('preserves correlation and diagnostics when execution completes', () => { const loggerInstance = new ExecutionLogger() as any const completedData = loggerInstance.buildCompletedExecutionData({ @@ -140,9 +140,24 @@ describe('ExecutionLogger', () => { }, }, }, + lastStartedBlock: { + blockId: 'block-start', + blockName: 'Start', + blockType: 'agent', + startedAt: '2025-01-01T00:00:00.000Z', + }, + lastCompletedBlock: { + blockId: 'block-end', + blockName: 'Finish', + blockType: 'api', + endedAt: '2025-01-01T00:00:05.000Z', + success: true, + }, }, traceSpans: [], finalOutput: { ok: true }, + finalizationPath: 'completed', + completionFailure: 'fallback failure', executionCost: { tokens: { input: 0, output: 0, total: 0 }, models: {}, @@ -161,6 +176,12 @@ describe('ExecutionLogger', () => { }) expect(completedData.correlation).toEqual(completedData.trigger?.data?.correlation) expect(completedData.finalOutput).toEqual({ ok: true }) + expect(completedData.lastStartedBlock?.blockId).toBe('block-start') + expect(completedData.lastCompletedBlock?.blockId).toBe('block-end') + expect(completedData.finalizationPath).toBe('completed') + expect(completedData.completionFailure).toBe('fallback failure') + expect(completedData.hasTraceSpans).toBe(false) + expect(completedData.traceSpanCount).toBe(0) }) }) diff --git a/apps/sim/lib/logs/execution/logger.ts b/apps/sim/lib/logs/execution/logger.ts index 64e0df01175..a7c8458ac78 100644 --- a/apps/sim/lib/logs/execution/logger.ts +++ b/apps/sim/lib/logs/execution/logger.ts @@ -27,6 +27,7 @@ import { snapshotService } from '@/lib/logs/execution/snapshot/service' import type { BlockOutputData, ExecutionEnvironment, + ExecutionFinalizationPath, ExecutionTrigger, ExecutionLoggerService as IExecutionLoggerService, TraceSpan, @@ -49,11 +50,21 @@ export interface ToolCall { const logger = createLogger('ExecutionLogger') +function countTraceSpans(traceSpans?: TraceSpan[]): number { + if (!Array.isArray(traceSpans) || traceSpans.length === 0) { + return 0 + } + + return traceSpans.reduce((count, span) => count + 1 + countTraceSpans(span.children), 0) +} + export class ExecutionLogger implements IExecutionLoggerService { private buildCompletedExecutionData(params: { existingExecutionData?: WorkflowExecutionLog['executionData'] traceSpans?: TraceSpan[] finalOutput: BlockOutputData + finalizationPath?: ExecutionFinalizationPath + completionFailure?: string executionCost: { tokens: { input: number @@ -64,7 +75,16 @@ export class ExecutionLogger implements IExecutionLoggerService { } executionState?: SerializableExecutionState }): WorkflowExecutionLog['executionData'] { - const { existingExecutionData, traceSpans, finalOutput, executionCost, executionState } = params + const { + existingExecutionData, + traceSpans, + finalOutput, + finalizationPath, + completionFailure, + executionCost, + executionState, + } = params + const traceSpanCount = countTraceSpans(traceSpans) return { ...(existingExecutionData?.environment @@ -78,6 +98,17 @@ export class ExecutionLogger implements IExecutionLoggerService { existingExecutionData?.trigger?.data?.correlation, } : {}), + ...(existingExecutionData?.error ? { error: existingExecutionData.error } : {}), + ...(existingExecutionData?.lastStartedBlock + ? { lastStartedBlock: existingExecutionData.lastStartedBlock } + : {}), + ...(existingExecutionData?.lastCompletedBlock + ? { lastCompletedBlock: existingExecutionData.lastCompletedBlock } + : {}), + ...(completionFailure ? { completionFailure } : {}), + ...(finalizationPath ? { finalizationPath } : {}), + hasTraceSpans: traceSpanCount > 0, + traceSpanCount, traceSpans, finalOutput, tokens: { @@ -173,6 +204,8 @@ export class ExecutionLogger implements IExecutionLoggerService { environment, trigger, ...(trigger.data?.correlation ? { correlation: trigger.data.correlation } : {}), + hasTraceSpans: false, + traceSpanCount: 0, }, cost: { total: BASE_EXECUTION_CHARGE, @@ -232,6 +265,8 @@ export class ExecutionLogger implements IExecutionLoggerService { traceSpans?: TraceSpan[] workflowInput?: any executionState?: SerializableExecutionState + finalizationPath?: ExecutionFinalizationPath + completionFailure?: string isResume?: boolean level?: 'info' | 'error' status?: 'completed' | 'failed' | 'cancelled' | 'pending' @@ -245,6 +280,8 @@ export class ExecutionLogger implements IExecutionLoggerService { traceSpans, workflowInput, executionState, + finalizationPath, + completionFailure, isResume, level: levelOverride, status: statusOverride, @@ -315,6 +352,16 @@ export class ExecutionLogger implements IExecutionLoggerService { ? Math.max(0, Math.round(rawDurationMs)) : 0 + const completedExecutionData = this.buildCompletedExecutionData({ + existingExecutionData, + traceSpans: redactedTraceSpans, + finalOutput: redactedFinalOutput, + finalizationPath, + completionFailure, + executionCost, + executionState, + }) + const [updatedLog] = await db .update(workflowExecutionLogs) .set({ @@ -323,13 +370,7 @@ export class ExecutionLogger implements IExecutionLoggerService { endedAt: new Date(endedAt), totalDurationMs: totalDuration, files: executionFiles.length > 0 ? executionFiles : null, - executionData: this.buildCompletedExecutionData({ - existingExecutionData, - traceSpans: redactedTraceSpans, - finalOutput: redactedFinalOutput, - executionCost, - executionState, - }), + executionData: completedExecutionData, cost: executionCost, }) .where(eq(workflowExecutionLogs.executionId, executionId)) diff --git a/apps/sim/lib/logs/execution/logging-session.test.ts b/apps/sim/lib/logs/execution/logging-session.test.ts index 2f9bd2370f2..a44bc5e72d5 100644 --- a/apps/sim/lib/logs/execution/logging-session.test.ts +++ b/apps/sim/lib/logs/execution/logging-session.test.ts @@ -1,11 +1,48 @@ import { beforeEach, describe, expect, it, vi } from 'vitest' +const dbMocks = vi.hoisted(() => { + const selectLimit = vi.fn() + const selectWhere = vi.fn() + const selectFrom = vi.fn() + const select = vi.fn() + const updateWhere = vi.fn() + const updateSet = vi.fn() + const update = vi.fn() + const execute = vi.fn() + const eq = vi.fn() + const sql = vi.fn((strings: TemplateStringsArray, ...values: unknown[]) => ({ strings, values })) + + select.mockReturnValue({ from: selectFrom }) + selectFrom.mockReturnValue({ where: selectWhere }) + selectWhere.mockReturnValue({ limit: selectLimit }) + + update.mockReturnValue({ set: updateSet }) + updateSet.mockReturnValue({ where: updateWhere }) + + return { + select, + selectFrom, + selectWhere, + selectLimit, + update, + updateSet, + updateWhere, + execute, + eq, + sql, + } +}) + const { completeWorkflowExecutionMock } = vi.hoisted(() => ({ completeWorkflowExecutionMock: vi.fn(), })) vi.mock('@sim/db', () => ({ - db: {}, + db: { + select: dbMocks.select, + update: dbMocks.update, + execute: dbMocks.execute, + }, })) vi.mock('@sim/db/schema', () => ({ @@ -22,8 +59,8 @@ vi.mock('@sim/logger', () => ({ })) vi.mock('drizzle-orm', () => ({ - eq: vi.fn(), - sql: vi.fn(), + eq: dbMocks.eq, + sql: dbMocks.sql, })) vi.mock('@/lib/logs/execution/logger', () => ({ @@ -56,6 +93,9 @@ import { LoggingSession } from './logging-session' describe('LoggingSession completion retries', () => { beforeEach(() => { vi.clearAllMocks() + dbMocks.selectLimit.mockResolvedValue([{ executionData: {} }]) + dbMocks.updateWhere.mockResolvedValue(undefined) + dbMocks.execute.mockResolvedValue(undefined) }) it('keeps completion best-effort when a later error completion retries after full completion and fallback both fail', async () => { @@ -86,7 +126,6 @@ describe('LoggingSession completion retries', () => { .mockRejectedValueOnce(new Error('cost only failed')) await expect(session.safeComplete({ finalOutput: { ok: true } })).resolves.toBeUndefined() - await expect(session.safeComplete({ finalOutput: { ok: true } })).resolves.toBeUndefined() expect(completeWorkflowExecutionMock).toHaveBeenCalledTimes(2) @@ -118,6 +157,64 @@ describe('LoggingSession completion retries', () => { expect(session.hasCompleted()).toBe(true) }) + it('preserves successful final output during fallback completion', async () => { + const session = new LoggingSession('workflow-1', 'execution-5', 'api', 'req-1') + + completeWorkflowExecutionMock + .mockRejectedValueOnce(new Error('success finalize failed')) + .mockResolvedValueOnce({}) + + await expect( + session.safeComplete({ finalOutput: { ok: true, stage: 'done' } }) + ).resolves.toBeUndefined() + + expect(completeWorkflowExecutionMock).toHaveBeenLastCalledWith( + expect.objectContaining({ + executionId: 'execution-5', + finalOutput: { ok: true, stage: 'done' }, + finalizationPath: 'fallback_completed', + }) + ) + }) + + it('preserves accumulated cost during fallback completion', async () => { + const session = new LoggingSession('workflow-1', 'execution-6', 'api', 'req-1') as any + + session.accumulatedCost = { + total: 12, + input: 5, + output: 7, + tokens: { input: 11, output: 13, total: 24 }, + models: { + 'test-model': { + input: 5, + output: 7, + total: 12, + tokens: { input: 11, output: 13, total: 24 }, + }, + }, + } + session.costFlushed = true + + completeWorkflowExecutionMock + .mockRejectedValueOnce(new Error('success finalize failed')) + .mockResolvedValueOnce({}) + + await expect(session.safeComplete({ finalOutput: { ok: true } })).resolves.toBeUndefined() + + expect(completeWorkflowExecutionMock).toHaveBeenLastCalledWith( + expect.objectContaining({ + executionId: 'execution-6', + costSummary: expect.objectContaining({ + totalCost: 12, + totalInputCost: 5, + totalOutputCost: 7, + totalTokens: 24, + }), + }) + ) + }) + it('persists failed error semantics when completeWithError receives non-error trace spans', async () => { const session = new LoggingSession('workflow-1', 'execution-4', 'api', 'req-1') const traceSpans = [ @@ -148,6 +245,8 @@ describe('LoggingSession completion retries', () => { traceSpans, level: 'error', status: 'failed', + finalizationPath: 'force_failed', + completionFailure: 'persist me as failed', }) ) }) @@ -196,4 +295,168 @@ describe('LoggingSession completion retries', () => { expect(session.hasCompleted()).toBe(true) expect(completeWorkflowExecutionMock).toHaveBeenCalledTimes(2) }) + + it('persists last started block independently from cost accumulation', async () => { + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') + + await session.onBlockStart('block-1', 'Fetch', 'api', '2025-01-01T00:00:00.000Z') + + expect(dbMocks.select).not.toHaveBeenCalled() + expect(dbMocks.execute).toHaveBeenCalledTimes(1) + }) + + it('enforces started marker monotonicity in the database write path', async () => { + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') + + await session.onBlockStart('block-1', 'Fetch', 'api', '2025-01-01T00:00:00.000Z') + + expect(dbMocks.sql).toHaveBeenCalled() + expect(dbMocks.execute).toHaveBeenCalledTimes(1) + }) + + it('allows same-millisecond started markers to replace the prior marker', async () => { + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') + + await session.onBlockStart('block-1', 'Fetch', 'api', '2025-01-01T00:00:00.000Z') + + const queryCall = dbMocks.sql.mock.calls.at(-1) + expect(queryCall).toBeDefined() + + const [query] = queryCall! + expect(Array.from(query).join(' ')).toContain('<=') + }) + + it('persists last completed block for zero-cost outputs', async () => { + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') + + await session.onBlockComplete('block-2', 'Transform', 'function', { + endedAt: '2025-01-01T00:00:01.000Z', + output: { value: true }, + }) + + expect(dbMocks.select).not.toHaveBeenCalled() + expect(dbMocks.execute).toHaveBeenCalledTimes(1) + }) + + it('allows same-millisecond completed markers to replace the prior marker', async () => { + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') + + await session.onBlockComplete('block-2', 'Transform', 'function', { + endedAt: '2025-01-01T00:00:01.000Z', + output: { value: true }, + }) + + const queryCall = dbMocks.sql.mock.calls.at(-1) + expect(queryCall).toBeDefined() + + const [query] = queryCall! + expect(Array.from(query).join(' ')).toContain('<=') + }) + + it('drains pending lifecycle writes before terminal completion', async () => { + let releasePersist: (() => void) | undefined + const persistPromise = new Promise((resolve) => { + releasePersist = resolve + }) + + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') as any + session.persistLastStartedBlock = vi.fn(() => persistPromise) + session.complete = vi.fn().mockResolvedValue(undefined) + + const startPromise = session.onBlockStart('block-1', 'Fetch', 'api', '2025-01-01T00:00:00.000Z') + const completionPromise = session.safeComplete({ finalOutput: { ok: true } }) + + await Promise.resolve() + + expect(session.complete).not.toHaveBeenCalled() + + releasePersist?.() + + await startPromise + await completionPromise + + expect(session.persistLastStartedBlock).toHaveBeenCalledTimes(1) + expect(session.complete).toHaveBeenCalledTimes(1) + }) + + it('drains fire-and-forget cost flushes before terminal completion', async () => { + let releaseFlush: (() => void) | undefined + const flushPromise = new Promise((resolve) => { + releaseFlush = resolve + }) + + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') as any + session.flushAccumulatedCost = vi.fn(() => flushPromise) + session.complete = vi.fn().mockResolvedValue(undefined) + + await session.onBlockComplete('block-2', 'Transform', 'function', { + endedAt: '2025-01-01T00:00:01.000Z', + output: { value: true }, + cost: { total: 1, input: 1, output: 0 }, + tokens: { input: 1, output: 0, total: 1 }, + model: 'test-model', + }) + + const completionPromise = session.safeComplete({ finalOutput: { ok: true } }) + + await Promise.resolve() + + expect(session.complete).not.toHaveBeenCalled() + + releaseFlush?.() + + await completionPromise + + expect(session.flushAccumulatedCost).toHaveBeenCalledTimes(1) + expect(session.complete).toHaveBeenCalledTimes(1) + }) + + it('keeps draining when new progress writes arrive during drain', async () => { + let releaseFirst: (() => void) | undefined + let releaseSecond: (() => void) | undefined + const firstPromise = new Promise((resolve) => { + releaseFirst = resolve + }) + const secondPromise = new Promise((resolve) => { + releaseSecond = resolve + }) + + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') as any + + void session.trackProgressWrite(firstPromise) + + const drainPromise = session.drainPendingProgressWrites() + + await Promise.resolve() + + void session.trackProgressWrite(secondPromise) + releaseFirst?.() + + await Promise.resolve() + + let drained = false + void drainPromise.then(() => { + drained = true + }) + + await Promise.resolve() + expect(drained).toBe(false) + + releaseSecond?.() + await drainPromise + + expect(session.pendingProgressWrites.size).toBe(0) + }) + + it('marks pause completion as terminal and prevents duplicate pause finalization', async () => { + const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') as any + session.completeExecutionWithFinalization = vi.fn().mockResolvedValue(undefined) + + await session.completeWithPause({ workflowInput: { ok: true } }) + await session.completeWithPause({ workflowInput: { ok: true } }) + + expect(session.completeExecutionWithFinalization).toHaveBeenCalledTimes(1) + expect(session.completed).toBe(true) + expect(session.completing).toBe(true) + }) }) diff --git a/apps/sim/lib/logs/execution/logging-session.ts b/apps/sim/lib/logs/execution/logging-session.ts index 4cd4220d5bf..7fb72bc1d73 100644 --- a/apps/sim/lib/logs/execution/logging-session.ts +++ b/apps/sim/lib/logs/execution/logging-session.ts @@ -13,6 +13,9 @@ import { } from '@/lib/logs/execution/logging-factory' import type { ExecutionEnvironment, + ExecutionFinalizationPath, + ExecutionLastCompletedBlock, + ExecutionLastStartedBlock, ExecutionTrigger, TraceSpan, WorkflowState, @@ -23,6 +26,46 @@ type TriggerData = Record & { correlation?: NonNullable['correlation'] } +function buildStartedMarkerPersistenceQuery(params: { + executionId: string + marker: ExecutionLastStartedBlock +}) { + const markerJson = JSON.stringify(params.marker) + + return sql`UPDATE workflow_execution_logs + SET execution_data = jsonb_set( + COALESCE(execution_data, '{}'::jsonb), + '{lastStartedBlock}', + ${markerJson}::jsonb, + true + ) + WHERE execution_id = ${params.executionId} + AND COALESCE( + jsonb_extract_path_text(COALESCE(execution_data, '{}'::jsonb), 'lastStartedBlock', 'startedAt'), + '' + ) <= ${params.marker.startedAt}` +} + +function buildCompletedMarkerPersistenceQuery(params: { + executionId: string + marker: ExecutionLastCompletedBlock +}) { + const markerJson = JSON.stringify(params.marker) + + return sql`UPDATE workflow_execution_logs + SET execution_data = jsonb_set( + COALESCE(execution_data, '{}'::jsonb), + '{lastCompletedBlock}', + ${markerJson}::jsonb, + true + ) + WHERE execution_id = ${params.executionId} + AND COALESCE( + jsonb_extract_path_text(COALESCE(execution_data, '{}'::jsonb), 'lastCompletedBlock', 'endedAt'), + '' + ) <= ${params.marker.endedAt}` +} + const logger = createLogger('LoggingSession') type CompletionAttempt = 'complete' | 'error' | 'cancelled' | 'paused' @@ -109,6 +152,7 @@ export class LoggingSession { tokens: { input: 0, output: 0, total: 0 }, models: {}, } + private pendingProgressWrites = new Set>() private costFlushed = false private postExecutionPromise: Promise | null = null @@ -124,12 +168,132 @@ export class LoggingSession { this.requestId = requestId } + async onBlockStart( + blockId: string, + blockName: string, + blockType: string, + startedAt: string + ): Promise { + await this.trackProgressWrite( + this.persistLastStartedBlock({ + blockId, + blockName, + blockType, + startedAt, + }) + ) + } + + private async persistLastStartedBlock(marker: ExecutionLastStartedBlock): Promise { + try { + await db.execute( + buildStartedMarkerPersistenceQuery({ + executionId: this.executionId, + marker, + }) + ) + } catch (error) { + logger.error(`Failed to persist last started block for execution ${this.executionId}:`, { + error: error instanceof Error ? error.message : String(error), + }) + } + } + + private async persistLastCompletedBlock(marker: ExecutionLastCompletedBlock): Promise { + try { + await db.execute( + buildCompletedMarkerPersistenceQuery({ + executionId: this.executionId, + marker, + }) + ) + } catch (error) { + logger.error(`Failed to persist last completed block for execution ${this.executionId}:`, { + error: error instanceof Error ? error.message : String(error), + }) + } + } + + private async trackProgressWrite(writePromise: Promise): Promise { + this.pendingProgressWrites.add(writePromise) + + try { + await writePromise + } finally { + this.pendingProgressWrites.delete(writePromise) + } + } + + private async drainPendingProgressWrites(): Promise { + while (this.pendingProgressWrites.size > 0) { + await Promise.allSettled(Array.from(this.pendingProgressWrites)) + } + } + + private async completeExecutionWithFinalization(params: { + endedAt: string + totalDurationMs: number + costSummary: { + totalCost: number + totalInputCost: number + totalOutputCost: number + totalTokens: number + totalPromptTokens: number + totalCompletionTokens: number + baseExecutionCharge: number + modelCost: number + models: Record< + string, + { + input: number + output: number + total: number + tokens: { input: number; output: number; total: number } + } + > + } + finalOutput: Record + traceSpans: TraceSpan[] + workflowInput?: unknown + executionState?: SerializableExecutionState + finalizationPath: ExecutionFinalizationPath + completionFailure?: string + level?: 'info' | 'error' + status?: 'completed' | 'failed' | 'cancelled' | 'pending' + }): Promise { + await executionLogger.completeWorkflowExecution({ + executionId: this.executionId, + endedAt: params.endedAt, + totalDurationMs: params.totalDurationMs, + costSummary: params.costSummary, + finalOutput: params.finalOutput, + traceSpans: params.traceSpans, + workflowInput: params.workflowInput, + executionState: params.executionState, + finalizationPath: params.finalizationPath, + completionFailure: params.completionFailure, + isResume: this.isResume, + level: params.level, + status: params.status, + }) + } + async onBlockComplete( blockId: string, blockName: string, blockType: string, output: any ): Promise { + await this.trackProgressWrite( + this.persistLastCompletedBlock({ + blockId, + blockName, + blockType, + endedAt: output?.endedAt || new Date().toISOString(), + success: !output?.output?.error, + }) + ) + if (!output?.cost || typeof output.cost.total !== 'number' || output.cost.total <= 0) { return } @@ -165,7 +329,7 @@ export class LoggingSession { } } - await this.flushAccumulatedCost() + void this.trackProgressWrite(this.flushAccumulatedCost()) } private async flushAccumulatedCost(): Promise { @@ -276,8 +440,7 @@ export class LoggingSession { const endTime = endedAt || new Date().toISOString() const duration = totalDurationMs || 0 - await executionLogger.completeWorkflowExecution({ - executionId: this.executionId, + await this.completeExecutionWithFinalization({ endedAt: endTime, totalDurationMs: duration, costSummary, @@ -285,7 +448,7 @@ export class LoggingSession { traceSpans: traceSpans || [], workflowInput, executionState, - isResume: this.isResume, + finalizationPath: 'completed', }) this.completed = true @@ -403,8 +566,7 @@ export class LoggingSession { const spans = hasProvidedSpans ? traceSpans : [errorSpan] - await executionLogger.completeWorkflowExecution({ - executionId: this.executionId, + await this.completeExecutionWithFinalization({ endedAt: endTime.toISOString(), totalDurationMs: Math.max(1, durationMs), costSummary, @@ -412,6 +574,8 @@ export class LoggingSession { traceSpans: spans, level: 'error', status: 'failed', + finalizationPath: 'force_failed', + completionFailure: message, }) this.completed = true @@ -490,13 +654,13 @@ export class LoggingSession { models: {}, } - await executionLogger.completeWorkflowExecution({ - executionId: this.executionId, + await this.completeExecutionWithFinalization({ endedAt: endTime.toISOString(), totalDurationMs: Math.max(1, durationMs), costSummary, finalOutput: { cancelled: true }, traceSpans: traceSpans || [], + finalizationPath: 'cancelled', status: 'cancelled', }) @@ -577,14 +741,14 @@ export class LoggingSession { models: {}, } - await executionLogger.completeWorkflowExecution({ - executionId: this.executionId, + await this.completeExecutionWithFinalization({ endedAt: endTime.toISOString(), totalDurationMs: Math.max(1, durationMs), costSummary, finalOutput: { paused: true }, traceSpans: traceSpans || [], workflowInput, + finalizationPath: 'paused', status: 'pending', }) @@ -746,7 +910,6 @@ export class LoggingSession { this.completionAttemptFailed = true throw error }) - return this.completionPromise } @@ -756,6 +919,7 @@ export class LoggingSession { private async _safeCompleteImpl(params: SessionCompleteParams = {}): Promise { try { + await this.drainPendingProgressWrites() await this.complete(params) } catch (error) { const errorMsg = error instanceof Error ? error.message : String(error) @@ -769,6 +933,8 @@ export class LoggingSession { totalDurationMs: params.totalDurationMs, errorMessage: `Failed to store trace spans: ${errorMsg}`, isError: false, + finalizationPath: 'fallback_completed', + finalOutput: params.finalOutput || {}, }) } } @@ -779,6 +945,7 @@ export class LoggingSession { private async _safeCompleteWithErrorImpl(params?: SessionErrorCompleteParams): Promise { try { + await this.drainPendingProgressWrites() await this.completeWithError(params) } catch (error) { const errorMsg = error instanceof Error ? error.message : String(error) @@ -793,6 +960,10 @@ export class LoggingSession { errorMessage: params?.error?.message || `Execution failed to store trace spans: ${errorMsg}`, isError: true, + finalizationPath: 'force_failed', + finalOutput: { + error: params?.error?.message || `Execution failed to store trace spans: ${errorMsg}`, + }, status: 'failed', }) } @@ -806,6 +977,7 @@ export class LoggingSession { private async _safeCompleteWithCancellationImpl(params?: SessionCancelledParams): Promise { try { + await this.drainPendingProgressWrites() await this.completeWithCancellation(params) } catch (error) { const errorMsg = error instanceof Error ? error.message : String(error) @@ -819,6 +991,8 @@ export class LoggingSession { totalDurationMs: params?.totalDurationMs, errorMessage: 'Execution was cancelled', isError: false, + finalizationPath: 'cancelled', + finalOutput: { cancelled: true }, status: 'cancelled', }) } @@ -830,6 +1004,7 @@ export class LoggingSession { private async _safeCompleteWithPauseImpl(params?: SessionPausedParams): Promise { try { + await this.drainPendingProgressWrites() await this.completeWithPause(params) } catch (error) { const errorMsg = error instanceof Error ? error.message : String(error) @@ -843,6 +1018,8 @@ export class LoggingSession { totalDurationMs: params?.totalDurationMs, errorMessage: 'Execution paused but failed to store full trace spans', isError: false, + finalizationPath: 'paused', + finalOutput: { paused: true }, status: 'pending', }) } @@ -867,12 +1044,16 @@ export class LoggingSession { status: 'failed', executionData: sql`jsonb_set( jsonb_set( - COALESCE(execution_data, '{}'::jsonb), - ARRAY['error'], - to_jsonb(${message}::text) + jsonb_set( + COALESCE(execution_data, '{}'::jsonb), + ARRAY['error'], + to_jsonb(${message}::text) + ), + ARRAY['finalOutput'], + jsonb_build_object('error', ${message}::text) ), - ARRAY['finalOutput'], - jsonb_build_object('error', ${message}::text) + ARRAY['finalizationPath'], + to_jsonb('force_failed'::text) )`, }) .where(eq(workflowExecutionLogs.executionId, executionId)) @@ -891,6 +1072,8 @@ export class LoggingSession { totalDurationMs?: number errorMessage: string isError: boolean + finalizationPath: ExecutionFinalizationPath + finalOutput?: Record status?: 'completed' | 'failed' | 'cancelled' | 'pending' }): Promise { if (this.completed || this.completing) { @@ -903,28 +1086,48 @@ export class LoggingSession { ) try { - const costSummary = params.traceSpans?.length - ? calculateCostSummary(params.traceSpans) - : { - totalCost: BASE_EXECUTION_CHARGE, - totalInputCost: 0, - totalOutputCost: 0, - totalTokens: 0, - totalPromptTokens: 0, - totalCompletionTokens: 0, + const hasAccumulatedCost = + this.costFlushed || + this.accumulatedCost.total > BASE_EXECUTION_CHARGE || + this.accumulatedCost.tokens.total > 0 || + Object.keys(this.accumulatedCost.models).length > 0 + + const costSummary = hasAccumulatedCost + ? { + totalCost: this.accumulatedCost.total, + totalInputCost: this.accumulatedCost.input, + totalOutputCost: this.accumulatedCost.output, + totalTokens: this.accumulatedCost.tokens.total, + totalPromptTokens: this.accumulatedCost.tokens.input, + totalCompletionTokens: this.accumulatedCost.tokens.output, baseExecutionCharge: BASE_EXECUTION_CHARGE, - modelCost: 0, - models: {}, + modelCost: Math.max(0, this.accumulatedCost.total - BASE_EXECUTION_CHARGE), + models: this.accumulatedCost.models, } + : params.traceSpans?.length + ? calculateCostSummary(params.traceSpans) + : { + totalCost: BASE_EXECUTION_CHARGE, + totalInputCost: 0, + totalOutputCost: 0, + totalTokens: 0, + totalPromptTokens: 0, + totalCompletionTokens: 0, + baseExecutionCharge: BASE_EXECUTION_CHARGE, + modelCost: 0, + models: {}, + } - await executionLogger.completeWorkflowExecution({ - executionId: this.executionId, + const finalOutput = params.finalOutput || { _fallback: true, error: params.errorMessage } + + await this.completeExecutionWithFinalization({ endedAt: params.endedAt || new Date().toISOString(), totalDurationMs: params.totalDurationMs || 0, costSummary, - finalOutput: { _fallback: true, error: params.errorMessage }, + finalOutput, traceSpans: [], - isResume: this.isResume, + finalizationPath: params.finalizationPath, + completionFailure: params.errorMessage, level: params.isError ? 'error' : 'info', status: params.status, }) diff --git a/apps/sim/lib/logs/types.ts b/apps/sim/lib/logs/types.ts index c785c3037db..b9402583704 100644 --- a/apps/sim/lib/logs/types.ts +++ b/apps/sim/lib/logs/types.ts @@ -71,6 +71,31 @@ export interface ExecutionStatus { durationMs?: number } +export const EXECUTION_FINALIZATION_PATHS = [ + 'completed', + 'fallback_completed', + 'force_failed', + 'cancelled', + 'paused', +] as const + +export type ExecutionFinalizationPath = (typeof EXECUTION_FINALIZATION_PATHS)[number] + +export interface ExecutionLastStartedBlock { + blockId: string + blockName: string + blockType: string + startedAt: string +} + +export interface ExecutionLastCompletedBlock { + blockId: string + blockName: string + blockType: string + endedAt: string + success: boolean +} + export interface WorkflowExecutionSnapshot { id: string workflowId: string | null @@ -105,6 +130,13 @@ export interface WorkflowExecutionLog { environment?: ExecutionEnvironment trigger?: ExecutionTrigger correlation?: AsyncExecutionCorrelation + error?: string + lastStartedBlock?: ExecutionLastStartedBlock + lastCompletedBlock?: ExecutionLastCompletedBlock + hasTraceSpans?: boolean + traceSpanCount?: number + completionFailure?: string + finalizationPath?: ExecutionFinalizationPath traceSpans?: TraceSpan[] tokens?: { input?: number; output?: number; total?: number } models?: Record< @@ -378,6 +410,9 @@ export interface ExecutionLoggerService { finalOutput: BlockOutputData traceSpans?: TraceSpan[] workflowInput?: any + executionState?: SerializableExecutionState + finalizationPath?: ExecutionFinalizationPath + completionFailure?: string isResume?: boolean level?: 'info' | 'error' status?: 'completed' | 'failed' | 'cancelled' | 'pending' diff --git a/apps/sim/lib/workflows/executor/execution-core.test.ts b/apps/sim/lib/workflows/executor/execution-core.test.ts index bbb702a11ae..048ba62492a 100644 --- a/apps/sim/lib/workflows/executor/execution-core.test.ts +++ b/apps/sim/lib/workflows/executor/execution-core.test.ts @@ -16,6 +16,8 @@ const { buildTraceSpansMock, serializeWorkflowMock, executorExecuteMock, + onBlockStartPersistenceMock, + executorConstructorMock, } = vi.hoisted(() => ({ loadWorkflowFromNormalizedTablesMock: vi.fn(), loadDeployedWorkflowStateMock: vi.fn(), @@ -32,6 +34,8 @@ const { buildTraceSpansMock: vi.fn(), serializeWorkflowMock: vi.fn(), executorExecuteMock: vi.fn(), + onBlockStartPersistenceMock: vi.fn(), + executorConstructorMock: vi.fn(), })) vi.mock('@sim/logger', () => ({ @@ -79,10 +83,13 @@ vi.mock('@/lib/workflows/utils', () => ({ })) vi.mock('@/executor', () => ({ - Executor: vi.fn().mockImplementation(() => ({ - execute: executorExecuteMock, - executeFromBlock: executorExecuteMock, - })), + Executor: vi.fn().mockImplementation((args) => { + executorConstructorMock(args) + return { + execute: executorExecuteMock, + executeFromBlock: executorExecuteMock, + } + }), })) vi.mock('@/serializer', () => ({ @@ -105,6 +112,8 @@ describe('executeWorkflowCore terminal finalization sequencing', () => { safeCompleteWithCancellation: safeCompleteWithCancellationMock, safeCompleteWithPause: safeCompleteWithPauseMock, hasCompleted: hasCompletedMock, + onBlockStart: onBlockStartPersistenceMock, + onBlockComplete: vi.fn(), setPostExecutionPromise: vi.fn(), waitForPostExecution: vi.fn().mockResolvedValue(undefined), } @@ -176,10 +185,72 @@ describe('executeWorkflowCore terminal finalization sequencing', () => { safeCompleteWithCancellationMock.mockResolvedValue(undefined) safeCompleteWithPauseMock.mockResolvedValue(undefined) hasCompletedMock.mockReturnValue(true) + onBlockStartPersistenceMock.mockResolvedValue(undefined) updateWorkflowRunCountsMock.mockResolvedValue(undefined) clearExecutionCancellationMock.mockResolvedValue(undefined) }) + it('routes onBlockStart through logging session persistence path', async () => { + executorExecuteMock.mockResolvedValue({ + success: true, + status: 'completed', + output: { done: true }, + logs: [], + metadata: { duration: 123, startTime: 'start', endTime: 'end' }, + }) + + await executeWorkflowCore({ + snapshot: createSnapshot() as any, + callbacks: { + onBlockStart: async (blockId) => { + expect(blockId).toBe('block-1') + }, + }, + loggingSession: loggingSession as any, + }) + + const contextExtensions = executorConstructorMock.mock.calls[0]?.[0]?.contextExtensions + await contextExtensions.onBlockStart('block-1', 'Fetch', 'api', 1) + + expect(onBlockStartPersistenceMock).toHaveBeenCalledWith( + 'block-1', + 'Fetch', + 'api', + expect.any(String) + ) + }) + + it('does not await user block start callback after persistence completes', async () => { + let releaseCallback: (() => void) | undefined + const callbackPromise = new Promise((resolve) => { + releaseCallback = resolve + }) + + executorExecuteMock.mockResolvedValue({ + success: true, + status: 'completed', + output: { done: true }, + logs: [], + metadata: { duration: 123, startTime: 'start', endTime: 'end' }, + }) + + await executeWorkflowCore({ + snapshot: createSnapshot() as any, + callbacks: { + onBlockStart: vi.fn(() => callbackPromise), + }, + loggingSession: loggingSession as any, + }) + + const contextExtensions = executorConstructorMock.mock.calls[0]?.[0]?.contextExtensions + + await expect( + contextExtensions.onBlockStart('block-1', 'Fetch', 'api', 1) + ).resolves.toBeUndefined() + + releaseCallback?.() + }) + it('awaits terminal completion before updating run counts and returning', async () => { const callOrder: string[] = [] @@ -222,7 +293,57 @@ describe('executeWorkflowCore terminal finalization sequencing', () => { ]) }) - it('clears cancellation even when success finalization throws', async () => { + it('awaits wrapped lifecycle persistence before terminal finalization returns', async () => { + let releaseBlockStart: (() => void) | undefined + const blockStartPromise = new Promise((resolve) => { + releaseBlockStart = resolve + }) + const callOrder: string[] = [] + + onBlockStartPersistenceMock.mockImplementation(async () => { + callOrder.push('persist:start') + await blockStartPromise + callOrder.push('persist:end') + }) + + safeCompleteMock.mockImplementation(async () => { + callOrder.push('safeComplete') + }) + + executorExecuteMock.mockImplementation(async () => { + const contextExtensions = executorConstructorMock.mock.calls[0]?.[0]?.contextExtensions + const startLifecycle = contextExtensions.onBlockStart('block-1', 'Fetch', 'api', 1) + await Promise.resolve() + callOrder.push('executor:before-release') + releaseBlockStart?.() + await startLifecycle + callOrder.push('executor:after-start') + + return { + success: true, + status: 'completed', + output: { done: true }, + logs: [], + metadata: { duration: 123, startTime: 'start', endTime: 'end' }, + } + }) + + await executeWorkflowCore({ + snapshot: createSnapshot() as any, + callbacks: {}, + loggingSession: loggingSession as any, + }) + + expect(callOrder).toEqual([ + 'persist:start', + 'executor:before-release', + 'persist:end', + 'executor:after-start', + 'safeComplete', + ]) + }) + + it('preserves successful execution when success finalization throws', async () => { executorExecuteMock.mockResolvedValue({ success: true, status: 'completed', @@ -244,7 +365,7 @@ describe('executeWorkflowCore terminal finalization sequencing', () => { expect(result.status).toBe('completed') expect(clearExecutionCancellationMock).toHaveBeenCalledWith('execution-1') - expect(updateWorkflowRunCountsMock).not.toHaveBeenCalled() + expect(updateWorkflowRunCountsMock).toHaveBeenCalledWith('workflow-1') }) it('routes cancelled executions through safeCompleteWithCancellation', async () => { @@ -304,6 +425,61 @@ describe('executeWorkflowCore terminal finalization sequencing', () => { expect(updateWorkflowRunCountsMock).not.toHaveBeenCalled() }) + it('swallows wrapped block start callback failures without breaking execution', async () => { + onBlockStartPersistenceMock.mockRejectedValue(new Error('start persistence failed')) + + executorExecuteMock.mockImplementation(async () => { + const contextExtensions = executorConstructorMock.mock.calls[0]?.[0]?.contextExtensions + await contextExtensions.onBlockStart('block-1', 'Fetch', 'api', 1) + + return { + success: true, + status: 'completed', + output: { done: true }, + logs: [], + metadata: { duration: 123, startTime: 'start', endTime: 'end' }, + } + }) + + const result = await executeWorkflowCore({ + snapshot: createSnapshot() as any, + callbacks: {}, + loggingSession: loggingSession as any, + }) + + expect(result.status).toBe('completed') + expect(safeCompleteMock).toHaveBeenCalledTimes(1) + }) + + it('swallows wrapped block complete callback failures without blocking completion', async () => { + executorExecuteMock.mockResolvedValue({ + success: true, + status: 'completed', + output: { done: true }, + logs: [], + metadata: { duration: 123, startTime: 'start', endTime: 'end' }, + }) + + await executeWorkflowCore({ + snapshot: createSnapshot() as any, + callbacks: { + onBlockComplete: vi.fn().mockRejectedValue(new Error('complete callback failed')), + }, + loggingSession: loggingSession as any, + }) + + const contextExtensions = executorConstructorMock.mock.calls[0]?.[0]?.contextExtensions + + await expect( + contextExtensions.onBlockComplete('block-1', 'Fetch', 'api', { + output: { ok: true }, + executionTime: 1, + startedAt: 'start', + endedAt: 'end', + }) + ).resolves.toBeUndefined() + }) + it('finalizes errors before rethrowing and marks them as core-finalized', async () => { const error = new Error('engine failed') const executionResult = { @@ -445,7 +621,7 @@ describe('executeWorkflowCore terminal finalization sequencing', () => { expect(wasExecutionFinalizedByCore('engine failed', 'execution-a')).toBe(true) }) - it('logs error without rejecting when success finalization rejects', async () => { + it('does not replace a successful outcome when success finalization rejects', async () => { executorExecuteMock.mockResolvedValue({ success: true, status: 'completed', @@ -464,7 +640,7 @@ describe('executeWorkflowCore terminal finalization sequencing', () => { await loggingSession.setPostExecutionPromise.mock.calls[0][0] - expect(result.status).toBe('completed') + expect(result).toMatchObject({ status: 'completed', success: true }) expect(clearExecutionCancellationMock).toHaveBeenCalledWith('execution-1') expect(safeCompleteWithErrorMock).not.toHaveBeenCalled() }) diff --git a/apps/sim/lib/workflows/executor/execution-core.ts b/apps/sim/lib/workflows/executor/execution-core.ts index 264cc249e37..aa96e5668a5 100644 --- a/apps/sim/lib/workflows/executor/execution-core.ts +++ b/apps/sim/lib/workflows/executor/execution-core.ts @@ -179,33 +179,41 @@ async function finalizeExecutionOutcome(params: { const endedAt = new Date().toISOString() try { - if (result.status === 'cancelled') { - await loggingSession.safeCompleteWithCancellation({ - endedAt, - totalDurationMs: totalDuration || 0, - traceSpans: traceSpans || [], - }) - return - } + try { + if (result.status === 'cancelled') { + await loggingSession.safeCompleteWithCancellation({ + endedAt, + totalDurationMs: totalDuration || 0, + traceSpans: traceSpans || [], + }) + return + } + + if (result.status === 'paused') { + await loggingSession.safeCompleteWithPause({ + endedAt, + totalDurationMs: totalDuration || 0, + traceSpans: traceSpans || [], + workflowInput, + }) + return + } - if (result.status === 'paused') { - await loggingSession.safeCompleteWithPause({ + await loggingSession.safeComplete({ endedAt, totalDurationMs: totalDuration || 0, + finalOutput: result.output || {}, traceSpans: traceSpans || [], workflowInput, + executionState: result.executionState, + }) + } catch (error) { + logger.warn(`[${requestId}] Post-execution finalization failed`, { + executionId, + status: result.status, + error, }) - return } - - await loggingSession.safeComplete({ - endedAt, - totalDurationMs: totalDuration || 0, - finalOutput: result.output || {}, - traceSpans: traceSpans || [], - workflowInput, - executionState: result.executionState, - }) } finally { await clearExecutionCancellationSafely(executionId, requestId) } @@ -424,16 +432,69 @@ export async function executeWorkflowCore( iterationContext?: IterationContext, childWorkflowContext?: ChildWorkflowContext ) => { - await loggingSession.onBlockComplete(blockId, blockName, blockType, output) - if (onBlockComplete) { - await onBlockComplete( + try { + await loggingSession.onBlockComplete(blockId, blockName, blockType, output) + if (onBlockComplete) { + void onBlockComplete( + blockId, + blockName, + blockType, + output, + iterationContext, + childWorkflowContext + ).catch((error) => { + logger.warn(`[${requestId}] Block completion callback failed`, { + executionId, + blockId, + blockType, + error, + }) + }) + } + } catch (error) { + logger.warn(`[${requestId}] Block completion persistence failed`, { + executionId, + blockId, + blockType, + error, + }) + } + } + + const wrappedOnBlockStart = async ( + blockId: string, + blockName: string, + blockType: string, + executionOrder: number, + iterationContext?: IterationContext, + childWorkflowContext?: ChildWorkflowContext + ) => { + try { + await loggingSession.onBlockStart(blockId, blockName, blockType, new Date().toISOString()) + if (onBlockStart) { + void onBlockStart( + blockId, + blockName, + blockType, + executionOrder, + iterationContext, + childWorkflowContext + ).catch((error) => { + logger.warn(`[${requestId}] Block start callback failed`, { + executionId, + blockId, + blockType, + error, + }) + }) + } + } catch (error) { + logger.warn(`[${requestId}] Block start persistence failed`, { + executionId, blockId, - blockName, blockType, - output, - iterationContext, - childWorkflowContext - ) + error, + }) } } @@ -445,7 +506,7 @@ export async function executeWorkflowCore( userId, isDeployedContext: !metadata.isClientSession, enforceCredentialAccess: metadata.enforceCredentialAccess ?? false, - onBlockStart, + onBlockStart: wrappedOnBlockStart, onBlockComplete: wrappedOnBlockComplete, onStream, resumeFromSnapshot, From 2bc11a70bac99376dedbad3e354f971f0a3a55b3 Mon Sep 17 00:00:00 2001 From: Waleed Date: Tue, 17 Mar 2026 18:00:55 -0700 Subject: [PATCH 05/11] waleedlatif1/hangzhou v2 (#3647) * feat(admin): add user search by email and ID, remove table border - Replace Load Users button with a live search input; query fires on any input - Email search uses listUsers with contains operator - User ID search (UUID format) uses admin.getUser directly for exact lookup - Remove outer border on user table that rendered white in dark mode - Reset pagination to page 0 on new search Co-Authored-By: Claude Sonnet 4.6 * fix(admin): replace live search with explicit search button - Split searchInput (controlled input) from searchQuery (committed value) so the hook only fires on Search click or Enter, not every keystroke - Gate table render on searchQuery.length > 0 to prevent stale results showing after input is cleared Co-Authored-By: Claude Sonnet 4.6 --------- Co-authored-by: Claude Sonnet 4.6 --- .../settings/components/admin/admin.tsx | 39 ++++++----- apps/sim/hooks/queries/admin-users.ts | 65 ++++++++++++++----- 2 files changed, 68 insertions(+), 36 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/settings/components/admin/admin.tsx b/apps/sim/app/workspace/[workspaceId]/settings/components/admin/admin.tsx index 2e856c39f9c..5161bb3c5d9 100644 --- a/apps/sim/app/workspace/[workspaceId]/settings/components/admin/admin.tsx +++ b/apps/sim/app/workspace/[workspaceId]/settings/components/admin/admin.tsx @@ -31,7 +31,8 @@ export function Admin() { const [workflowId, setWorkflowId] = useState('') const [usersOffset, setUsersOffset] = useState(0) - const [usersEnabled, setUsersEnabled] = useState(false) + const [searchInput, setSearchInput] = useState('') + const [searchQuery, setSearchQuery] = useState('') const [banUserId, setBanUserId] = useState(null) const [banReason, setBanReason] = useState('') @@ -39,8 +40,12 @@ export function Admin() { data: usersData, isLoading: usersLoading, error: usersError, - refetch: refetchUsers, - } = useAdminUsers(usersOffset, PAGE_SIZE, usersEnabled) + } = useAdminUsers(usersOffset, PAGE_SIZE, searchQuery) + + const handleSearch = () => { + setUsersOffset(0) + setSearchQuery(searchInput.trim()) + } const totalPages = useMemo( () => Math.ceil((usersData?.total ?? 0) / PAGE_SIZE), @@ -62,14 +67,6 @@ export function Admin() { ) } - const handleLoadUsers = () => { - if (usersEnabled) { - refetchUsers() - } else { - setUsersEnabled(true) - } - } - const pendingUserIds = useMemo(() => { const ids = new Set() if (setUserRole.isPending && (setUserRole.variables as { userId?: string })?.userId) @@ -136,10 +133,16 @@ export function Admin() {
-
-

User Management

-
@@ -164,9 +167,9 @@ export function Admin() {
)} - {usersData && ( + {searchQuery.length > 0 && usersData && ( <> -
+
Name Email @@ -176,7 +179,7 @@ export function Admin() {
{usersData.users.length === 0 && ( -
+
No users found.
)} diff --git a/apps/sim/hooks/queries/admin-users.ts b/apps/sim/hooks/queries/admin-users.ts index 69b74dfbaa4..6f9f6bba736 100644 --- a/apps/sim/hooks/queries/admin-users.ts +++ b/apps/sim/hooks/queries/admin-users.ts @@ -7,7 +7,8 @@ const logger = createLogger('AdminUsersQuery') export const adminUserKeys = { all: ['adminUsers'] as const, lists: () => [...adminUserKeys.all, 'list'] as const, - list: (offset: number, limit: number) => [...adminUserKeys.lists(), offset, limit] as const, + list: (offset: number, limit: number, searchQuery: string) => + [...adminUserKeys.lists(), offset, limit, searchQuery] as const, } interface AdminUser { @@ -24,31 +25,59 @@ interface AdminUsersResponse { total: number } -async function fetchAdminUsers(offset: number, limit: number): Promise { +const UUID_REGEX = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i + +function mapUser(u: { + id: string + name: string + email: string + role?: string | null + banned?: boolean | null + banReason?: string | null +}): AdminUser { + return { + id: u.id, + name: u.name || '', + email: u.email, + role: u.role ?? 'user', + banned: u.banned ?? false, + banReason: u.banReason ?? null, + } +} + +async function fetchAdminUsers( + offset: number, + limit: number, + searchQuery: string +): Promise { + if (UUID_REGEX.test(searchQuery.trim())) { + const { data, error } = await client.admin.getUser({ query: { id: searchQuery.trim() } }) + if (error) throw new Error(error.message ?? 'Failed to fetch user') + if (!data) return { users: [], total: 0 } + return { users: [mapUser(data)], total: 1 } + } + const { data, error } = await client.admin.listUsers({ - query: { limit, offset }, + query: { + limit, + offset, + searchField: 'email', + searchValue: searchQuery, + searchOperator: 'contains', + }, }) - if (error) { - throw new Error(error.message ?? 'Failed to fetch users') - } + if (error) throw new Error(error.message ?? 'Failed to fetch users') return { - users: (data?.users ?? []).map((u) => ({ - id: u.id, - name: u.name || '', - email: u.email, - role: u.role ?? 'user', - banned: u.banned ?? false, - banReason: u.banReason ?? null, - })), + users: (data?.users ?? []).map(mapUser), total: data?.total ?? 0, } } -export function useAdminUsers(offset: number, limit: number, enabled: boolean) { +export function useAdminUsers(offset: number, limit: number, searchQuery: string) { return useQuery({ - queryKey: adminUserKeys.list(offset, limit), - queryFn: () => fetchAdminUsers(offset, limit), - enabled, + queryKey: adminUserKeys.list(offset, limit, searchQuery), + queryFn: () => fetchAdminUsers(offset, limit, searchQuery), + enabled: searchQuery.length > 0, staleTime: 30 * 1000, placeholderData: keepPreviousData, }) From 5f89c7140caed7ce5d84327a20ec3ea57791ae1b Mon Sep 17 00:00:00 2001 From: Waleed Date: Tue, 17 Mar 2026 18:14:29 -0700 Subject: [PATCH 06/11] feat(knowledge): add upsert document operation (#3644) * feat(knowledge): add upsert document operation to Knowledge block Add a "Create or Update" (upsert) document capability that finds an existing document by ID or filename, deletes it if found, then creates a new document and queues re-processing. Includes new tool, API route, block wiring, and typed interfaces. Co-Authored-By: Claude Opus 4.6 * fix(knowledge): address review comments on upsert document - Reorder create-then-delete to prevent data loss if creation fails - Move Zod validation before workflow authorization for validated input - Fix btoa stack overflow for large content using loop-based encoding Co-Authored-By: Claude Opus 4.6 * fix(knowledge): guard against empty createDocumentRecords result Add safety check before accessing firstDocument to prevent TypeError and data loss if createDocumentRecords unexpectedly returns empty. Co-Authored-By: Claude Opus 4.6 * fix(knowledge): prevent documentId fallthrough and use byte-count limit - Use if/else so filename lookup only runs when no documentId is provided, preventing stale IDs from silently replacing unrelated documents - Check utf8 byte length instead of character count for 1MB size limit, correctly handling multi-byte characters (CJK, emoji) Co-Authored-By: Claude Opus 4.6 * fix(knowledge): rollback on delete failure, deduplicate sub-block IDs - Add compensating rollback: if deleteDocument throws after create succeeds, clean up the new record to prevent orphaned pending docs - Merge duplicate name/content sub-blocks into single entries with array conditions, matching the documentTags pattern Co-Authored-By: Claude Opus 4.6 * lint * lint Co-Authored-By: Claude Opus 4.6 --------- Co-authored-by: Claude Opus 4.6 --- .../knowledge/[id]/documents/upsert/route.ts | 248 ++++++++++++++++++ apps/sim/blocks/blocks/knowledge.ts | 25 +- apps/sim/tools/knowledge/index.ts | 2 + apps/sim/tools/knowledge/types.ts | 30 +++ apps/sim/tools/knowledge/upsert_document.ts | 189 +++++++++++++ apps/sim/tools/registry.ts | 2 + 6 files changed, 492 insertions(+), 4 deletions(-) create mode 100644 apps/sim/app/api/knowledge/[id]/documents/upsert/route.ts create mode 100644 apps/sim/tools/knowledge/upsert_document.ts diff --git a/apps/sim/app/api/knowledge/[id]/documents/upsert/route.ts b/apps/sim/app/api/knowledge/[id]/documents/upsert/route.ts new file mode 100644 index 00000000000..2499006ed35 --- /dev/null +++ b/apps/sim/app/api/knowledge/[id]/documents/upsert/route.ts @@ -0,0 +1,248 @@ +import { randomUUID } from 'crypto' +import { db } from '@sim/db' +import { document } from '@sim/db/schema' +import { createLogger } from '@sim/logger' +import { and, eq, isNull } from 'drizzle-orm' +import { type NextRequest, NextResponse } from 'next/server' +import { z } from 'zod' +import { AuditAction, AuditResourceType, recordAudit } from '@/lib/audit/log' +import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid' +import { + createDocumentRecords, + deleteDocument, + getProcessingConfig, + processDocumentsWithQueue, +} from '@/lib/knowledge/documents/service' +import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils' +import { checkKnowledgeBaseWriteAccess } from '@/app/api/knowledge/utils' + +const logger = createLogger('DocumentUpsertAPI') + +const UpsertDocumentSchema = z.object({ + documentId: z.string().optional(), + filename: z.string().min(1, 'Filename is required'), + fileUrl: z.string().min(1, 'File URL is required'), + fileSize: z.number().min(1, 'File size must be greater than 0'), + mimeType: z.string().min(1, 'MIME type is required'), + documentTagsData: z.string().optional(), + processingOptions: z.object({ + chunkSize: z.number().min(100).max(4000), + minCharactersPerChunk: z.number().min(1).max(2000), + recipe: z.string(), + lang: z.string(), + chunkOverlap: z.number().min(0).max(500), + }), + workflowId: z.string().optional(), +}) + +export async function POST(req: NextRequest, { params }: { params: Promise<{ id: string }> }) { + const requestId = randomUUID().slice(0, 8) + const { id: knowledgeBaseId } = await params + + try { + const body = await req.json() + + logger.info(`[${requestId}] Knowledge base document upsert request`, { + knowledgeBaseId, + hasDocumentId: !!body.documentId, + filename: body.filename, + }) + + const auth = await checkSessionOrInternalAuth(req, { requireWorkflowId: false }) + if (!auth.success || !auth.userId) { + logger.warn(`[${requestId}] Authentication failed: ${auth.error || 'Unauthorized'}`) + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + const userId = auth.userId + + const validatedData = UpsertDocumentSchema.parse(body) + + if (validatedData.workflowId) { + const authorization = await authorizeWorkflowByWorkspacePermission({ + workflowId: validatedData.workflowId, + userId, + action: 'write', + }) + if (!authorization.allowed) { + return NextResponse.json( + { error: authorization.message || 'Access denied' }, + { status: authorization.status } + ) + } + } + + const accessCheck = await checkKnowledgeBaseWriteAccess(knowledgeBaseId, userId) + + if (!accessCheck.hasAccess) { + if ('notFound' in accessCheck && accessCheck.notFound) { + logger.warn(`[${requestId}] Knowledge base not found: ${knowledgeBaseId}`) + return NextResponse.json({ error: 'Knowledge base not found' }, { status: 404 }) + } + logger.warn( + `[${requestId}] User ${userId} attempted to upsert document in unauthorized knowledge base ${knowledgeBaseId}` + ) + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + let existingDocumentId: string | null = null + let isUpdate = false + + if (validatedData.documentId) { + const existingDoc = await db + .select({ id: document.id }) + .from(document) + .where( + and( + eq(document.id, validatedData.documentId), + eq(document.knowledgeBaseId, knowledgeBaseId), + isNull(document.deletedAt) + ) + ) + .limit(1) + + if (existingDoc.length > 0) { + existingDocumentId = existingDoc[0].id + } + } else { + const docsByFilename = await db + .select({ id: document.id }) + .from(document) + .where( + and( + eq(document.filename, validatedData.filename), + eq(document.knowledgeBaseId, knowledgeBaseId), + isNull(document.deletedAt) + ) + ) + .limit(1) + + if (docsByFilename.length > 0) { + existingDocumentId = docsByFilename[0].id + } + } + + if (existingDocumentId) { + isUpdate = true + logger.info( + `[${requestId}] Found existing document ${existingDocumentId}, creating replacement before deleting old` + ) + } + + const createdDocuments = await createDocumentRecords( + [ + { + filename: validatedData.filename, + fileUrl: validatedData.fileUrl, + fileSize: validatedData.fileSize, + mimeType: validatedData.mimeType, + ...(validatedData.documentTagsData && { + documentTagsData: validatedData.documentTagsData, + }), + }, + ], + knowledgeBaseId, + requestId + ) + + const firstDocument = createdDocuments[0] + if (!firstDocument) { + logger.error(`[${requestId}] createDocumentRecords returned empty array unexpectedly`) + return NextResponse.json({ error: 'Failed to create document record' }, { status: 500 }) + } + + if (existingDocumentId) { + try { + await deleteDocument(existingDocumentId, requestId) + } catch (deleteError) { + logger.error( + `[${requestId}] Failed to delete old document ${existingDocumentId}, rolling back new record`, + deleteError + ) + await deleteDocument(firstDocument.documentId, requestId).catch(() => {}) + return NextResponse.json({ error: 'Failed to replace existing document' }, { status: 500 }) + } + } + + processDocumentsWithQueue( + createdDocuments, + knowledgeBaseId, + validatedData.processingOptions, + requestId + ).catch((error: unknown) => { + logger.error(`[${requestId}] Critical error in document processing pipeline:`, error) + }) + + try { + const { PlatformEvents } = await import('@/lib/core/telemetry') + PlatformEvents.knowledgeBaseDocumentsUploaded({ + knowledgeBaseId, + documentsCount: 1, + uploadType: 'single', + chunkSize: validatedData.processingOptions.chunkSize, + recipe: validatedData.processingOptions.recipe, + }) + } catch (_e) { + // Silently fail + } + + recordAudit({ + workspaceId: accessCheck.knowledgeBase?.workspaceId ?? null, + actorId: userId, + actorName: auth.userName, + actorEmail: auth.userEmail, + action: isUpdate ? AuditAction.DOCUMENT_UPDATED : AuditAction.DOCUMENT_UPLOADED, + resourceType: AuditResourceType.DOCUMENT, + resourceId: knowledgeBaseId, + resourceName: validatedData.filename, + description: isUpdate + ? `Upserted (replaced) document "${validatedData.filename}" in knowledge base "${knowledgeBaseId}"` + : `Upserted (created) document "${validatedData.filename}" in knowledge base "${knowledgeBaseId}"`, + metadata: { + fileName: validatedData.filename, + previousDocumentId: existingDocumentId, + isUpdate, + }, + request: req, + }) + + return NextResponse.json({ + success: true, + data: { + documentsCreated: [ + { + documentId: firstDocument.documentId, + filename: firstDocument.filename, + status: 'pending', + }, + ], + isUpdate, + previousDocumentId: existingDocumentId, + processingMethod: 'background', + processingConfig: { + maxConcurrentDocuments: getProcessingConfig().maxConcurrentDocuments, + batchSize: getProcessingConfig().batchSize, + }, + }, + }) + } catch (error) { + if (error instanceof z.ZodError) { + logger.warn(`[${requestId}] Invalid upsert request data`, { errors: error.errors }) + return NextResponse.json( + { error: 'Invalid request data', details: error.errors }, + { status: 400 } + ) + } + + logger.error(`[${requestId}] Error upserting document`, error) + + const errorMessage = error instanceof Error ? error.message : 'Failed to upsert document' + const isStorageLimitError = + errorMessage.includes('Storage limit exceeded') || errorMessage.includes('storage limit') + const isMissingKnowledgeBase = errorMessage === 'Knowledge base not found' + + return NextResponse.json( + { error: errorMessage }, + { status: isMissingKnowledgeBase ? 404 : isStorageLimitError ? 413 : 500 } + ) + } +} diff --git a/apps/sim/blocks/blocks/knowledge.ts b/apps/sim/blocks/blocks/knowledge.ts index aaa2e78e565..f488c49db74 100644 --- a/apps/sim/blocks/blocks/knowledge.ts +++ b/apps/sim/blocks/blocks/knowledge.ts @@ -29,6 +29,7 @@ export const KnowledgeBlock: BlockConfig = { { label: 'List Documents', id: 'list_documents' }, { label: 'Get Document', id: 'get_document' }, { label: 'Create Document', id: 'create_document' }, + { label: 'Upsert Document', id: 'upsert_document' }, { label: 'Delete Document', id: 'delete_document' }, { label: 'List Chunks', id: 'list_chunks' }, { label: 'Upload Chunk', id: 'upload_chunk' }, @@ -175,14 +176,14 @@ export const KnowledgeBlock: BlockConfig = { condition: { field: 'operation', value: 'upload_chunk' }, }, - // --- Create Document --- + // --- Create Document / Upsert Document --- { id: 'name', title: 'Document Name', type: 'short-input', placeholder: 'Enter document name', required: true, - condition: { field: 'operation', value: 'create_document' }, + condition: { field: 'operation', value: ['create_document', 'upsert_document'] }, }, { id: 'content', @@ -191,14 +192,21 @@ export const KnowledgeBlock: BlockConfig = { placeholder: 'Enter the document content', rows: 6, required: true, - condition: { field: 'operation', value: 'create_document' }, + condition: { field: 'operation', value: ['create_document', 'upsert_document'] }, + }, + { + id: 'upsertDocumentId', + title: 'Document ID (Optional)', + type: 'short-input', + placeholder: 'Enter existing document ID to update (or leave empty to match by name)', + condition: { field: 'operation', value: 'upsert_document' }, }, { id: 'documentTags', title: 'Document Tags', type: 'document-tag-entry', dependsOn: ['knowledgeBaseSelector'], - condition: { field: 'operation', value: 'create_document' }, + condition: { field: 'operation', value: ['create_document', 'upsert_document'] }, }, // --- Update Chunk / Delete Chunk --- @@ -264,6 +272,7 @@ export const KnowledgeBlock: BlockConfig = { 'knowledge_search', 'knowledge_upload_chunk', 'knowledge_create_document', + 'knowledge_upsert_document', 'knowledge_list_tags', 'knowledge_list_documents', 'knowledge_get_document', @@ -284,6 +293,8 @@ export const KnowledgeBlock: BlockConfig = { return 'knowledge_upload_chunk' case 'create_document': return 'knowledge_create_document' + case 'upsert_document': + return 'knowledge_upsert_document' case 'list_tags': return 'knowledge_list_tags' case 'list_documents': @@ -355,6 +366,11 @@ export const KnowledgeBlock: BlockConfig = { if (params.chunkEnabledFilter) params.enabled = params.chunkEnabledFilter } + // Map upsert sub-block field to tool param + if (params.operation === 'upsert_document' && params.upsertDocumentId) { + params.documentId = String(params.upsertDocumentId).trim() + } + // Convert enabled dropdown string to boolean for update_chunk if (params.operation === 'update_chunk' && typeof params.enabled === 'string') { params.enabled = params.enabled === 'true' @@ -382,6 +398,7 @@ export const KnowledgeBlock: BlockConfig = { documentTags: { type: 'string', description: 'Document tags' }, chunkSearch: { type: 'string', description: 'Search filter for chunks' }, chunkEnabledFilter: { type: 'string', description: 'Filter chunks by enabled status' }, + upsertDocumentId: { type: 'string', description: 'Document ID for upsert operation' }, connectorId: { type: 'string', description: 'Connector identifier' }, }, outputs: { diff --git a/apps/sim/tools/knowledge/index.ts b/apps/sim/tools/knowledge/index.ts index 8fafa6a603c..0c0edd54448 100644 --- a/apps/sim/tools/knowledge/index.ts +++ b/apps/sim/tools/knowledge/index.ts @@ -11,6 +11,7 @@ import { knowledgeSearchTool } from '@/tools/knowledge/search' import { knowledgeTriggerSyncTool } from '@/tools/knowledge/trigger_sync' import { knowledgeUpdateChunkTool } from '@/tools/knowledge/update_chunk' import { knowledgeUploadChunkTool } from '@/tools/knowledge/upload_chunk' +import { knowledgeUpsertDocumentTool } from '@/tools/knowledge/upsert_document' export { knowledgeSearchTool, @@ -26,4 +27,5 @@ export { knowledgeListConnectorsTool, knowledgeGetConnectorTool, knowledgeTriggerSyncTool, + knowledgeUpsertDocumentTool, } diff --git a/apps/sim/tools/knowledge/types.ts b/apps/sim/tools/knowledge/types.ts index 09a4f8695a2..49fb6d8c338 100644 --- a/apps/sim/tools/knowledge/types.ts +++ b/apps/sim/tools/knowledge/types.ts @@ -286,3 +286,33 @@ export interface KnowledgeTriggerSyncResponse { } error?: string } + +export interface KnowledgeUpsertDocumentParams { + knowledgeBaseId: string + name: string + content: string + documentId?: string + documentTags?: Record + _context?: { workflowId?: string } +} + +export interface KnowledgeUpsertDocumentResult { + documentId: string + documentName: string + type: string + enabled: boolean + isUpdate: boolean + previousDocumentId: string | null + createdAt: string + updatedAt: string +} + +export interface KnowledgeUpsertDocumentResponse { + success: boolean + output: { + data: KnowledgeUpsertDocumentResult + message: string + documentId: string + } + error?: string +} diff --git a/apps/sim/tools/knowledge/upsert_document.ts b/apps/sim/tools/knowledge/upsert_document.ts new file mode 100644 index 00000000000..0314350a0db --- /dev/null +++ b/apps/sim/tools/knowledge/upsert_document.ts @@ -0,0 +1,189 @@ +import type { + KnowledgeUpsertDocumentParams, + KnowledgeUpsertDocumentResponse, +} from '@/tools/knowledge/types' +import { enrichKBTagsSchema } from '@/tools/schema-enrichers' +import { formatDocumentTagsForAPI, parseDocumentTags } from '@/tools/shared/tags' +import type { ToolConfig } from '@/tools/types' + +export const knowledgeUpsertDocumentTool: ToolConfig< + KnowledgeUpsertDocumentParams, + KnowledgeUpsertDocumentResponse +> = { + id: 'knowledge_upsert_document', + name: 'Knowledge Upsert Document', + description: + 'Create or update a document in a knowledge base. If a document with the given ID or filename already exists, it will be replaced with the new content.', + version: '1.0.0', + + params: { + knowledgeBaseId: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'ID of the knowledge base containing the document', + }, + documentId: { + type: 'string', + required: false, + visibility: 'user-or-llm', + description: + 'Optional ID of an existing document to update. If not provided, lookup is done by filename.', + }, + name: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Name of the document', + }, + content: { + type: 'string', + required: true, + visibility: 'user-or-llm', + description: 'Content of the document', + }, + documentTags: { + type: 'json', + required: false, + visibility: 'user-or-llm', + description: 'Document tags', + }, + }, + + schemaEnrichment: { + documentTags: { + dependsOn: 'knowledgeBaseId', + enrichSchema: enrichKBTagsSchema, + }, + }, + + request: { + url: (params) => `/api/knowledge/${params.knowledgeBaseId}/documents/upsert`, + method: 'POST', + headers: () => ({ + 'Content-Type': 'application/json', + }), + body: (params) => { + const workflowId = params._context?.workflowId + const textContent = params.content?.trim() + const documentName = params.name?.trim() + + if (!documentName || documentName.length === 0) { + throw new Error('Document name is required') + } + if (documentName.length > 255) { + throw new Error('Document name must be 255 characters or less') + } + if (!textContent || textContent.length < 1) { + throw new Error('Document content cannot be empty') + } + const utf8Bytes = new TextEncoder().encode(textContent) + const contentBytes = utf8Bytes.length + + if (contentBytes > 1_000_000) { + throw new Error('Document content exceeds maximum size of 1MB') + } + let base64Content: string + if (typeof Buffer !== 'undefined') { + base64Content = Buffer.from(textContent, 'utf8').toString('base64') + } else { + let binary = '' + for (let i = 0; i < utf8Bytes.length; i++) { + binary += String.fromCharCode(utf8Bytes[i]) + } + base64Content = btoa(binary) + } + + const dataUri = `data:text/plain;base64,${base64Content}` + + const parsedTags = parseDocumentTags(params.documentTags) + const tagData = formatDocumentTagsForAPI(parsedTags) + + const filename = documentName.endsWith('.txt') ? documentName : `${documentName}.txt` + + const requestBody: Record = { + filename, + fileUrl: dataUri, + fileSize: contentBytes, + mimeType: 'text/plain', + ...tagData, + processingOptions: { + chunkSize: 1024, + minCharactersPerChunk: 1, + chunkOverlap: 200, + recipe: 'default', + lang: 'en', + }, + ...(workflowId && { workflowId }), + } + + if (params.documentId && String(params.documentId).trim().length > 0) { + requestBody.documentId = String(params.documentId).trim() + } + + return requestBody + }, + }, + + transformResponse: async (response): Promise => { + const result = await response.json() + const data = result.data ?? result + const documentsCreated = data.documentsCreated ?? [] + const firstDocument = documentsCreated[0] + const isUpdate = data.isUpdate ?? false + const previousDocumentId = data.previousDocumentId ?? null + const documentId = firstDocument?.documentId ?? firstDocument?.id ?? '' + + return { + success: true, + output: { + message: isUpdate + ? 'Successfully updated document in knowledge base' + : 'Successfully created document in knowledge base', + documentId, + data: { + documentId, + documentName: firstDocument?.filename ?? 'Unknown', + type: 'document', + enabled: true, + isUpdate, + previousDocumentId, + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }, + }, + } + }, + + outputs: { + data: { + type: 'object', + description: 'Information about the upserted document', + properties: { + documentId: { type: 'string', description: 'Document ID' }, + documentName: { type: 'string', description: 'Document name' }, + type: { type: 'string', description: 'Document type' }, + enabled: { type: 'boolean', description: 'Whether the document is enabled' }, + isUpdate: { + type: 'boolean', + description: 'Whether an existing document was replaced', + }, + previousDocumentId: { + type: 'string', + description: 'ID of the document that was replaced, if any', + optional: true, + }, + createdAt: { type: 'string', description: 'Creation timestamp' }, + updatedAt: { type: 'string', description: 'Last update timestamp' }, + }, + }, + message: { + type: 'string', + description: 'Success or error message describing the operation result', + }, + documentId: { + type: 'string', + description: 'ID of the upserted document', + }, + }, +} diff --git a/apps/sim/tools/registry.ts b/apps/sim/tools/registry.ts index f14ff024ebf..4c7f2af33ad 100644 --- a/apps/sim/tools/registry.ts +++ b/apps/sim/tools/registry.ts @@ -1195,6 +1195,7 @@ import { knowledgeTriggerSyncTool, knowledgeUpdateChunkTool, knowledgeUploadChunkTool, + knowledgeUpsertDocumentTool, } from '@/tools/knowledge' import { langsmithCreateRunsBatchTool, langsmithCreateRunTool } from '@/tools/langsmith' import { lemlistGetActivitiesTool, lemlistGetLeadTool, lemlistSendEmailTool } from '@/tools/lemlist' @@ -3703,6 +3704,7 @@ export const tools: Record = { knowledge_list_connectors: knowledgeListConnectorsTool, knowledge_get_connector: knowledgeGetConnectorTool, knowledge_trigger_sync: knowledgeTriggerSyncTool, + knowledge_upsert_document: knowledgeUpsertDocumentTool, search_tool: searchTool, elevenlabs_tts: elevenLabsTtsTool, fathom_list_meetings: fathomListMeetingsTool, From 168cd585cb1785eb3dd9a2f031d191ebde37cb25 Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan <33737564+Sg312@users.noreply.github.com> Date: Tue, 17 Mar 2026 18:24:27 -0700 Subject: [PATCH 07/11] feat(mothership): request ids (#3645) * Include rid * Persist rid * fix ui * address comments * update types --------- Co-authored-by: Vikhyath Mondreti --- apps/sim/app/api/mothership/chat/route.ts | 1 + .../[workspaceId]/components/index.ts | 1 + .../components/message-actions/index.ts | 1 + .../message-actions/message-actions.tsx | 84 +++++++++++++++++++ .../app/workspace/[workspaceId]/home/home.tsx | 8 +- .../[workspaceId]/home/hooks/use-chat.ts | 21 ++++- .../app/workspace/[workspaceId]/home/types.ts | 2 + .../copilot-message/copilot-message.tsx | 10 ++- apps/sim/hooks/queries/tasks.ts | 1 + apps/sim/lib/copilot/client-sse/handlers.ts | 9 ++ apps/sim/lib/copilot/client-sse/types.ts | 1 + .../sim/lib/copilot/messages/serialization.ts | 4 + apps/sim/lib/copilot/orchestrator/index.ts | 1 + .../orchestrator/sse/handlers/handlers.ts | 6 ++ apps/sim/lib/copilot/orchestrator/types.ts | 3 + apps/sim/stores/panel/copilot/store.ts | 2 + apps/sim/stores/panel/copilot/types.ts | 1 + 17 files changed, 151 insertions(+), 5 deletions(-) create mode 100644 apps/sim/app/workspace/[workspaceId]/components/message-actions/index.ts create mode 100644 apps/sim/app/workspace/[workspaceId]/components/message-actions/message-actions.tsx diff --git a/apps/sim/app/api/mothership/chat/route.ts b/apps/sim/app/api/mothership/chat/route.ts index 18b66b5577c..c1478c172fb 100644 --- a/apps/sim/app/api/mothership/chat/route.ts +++ b/apps/sim/app/api/mothership/chat/route.ts @@ -279,6 +279,7 @@ export async function POST(req: NextRequest) { role: 'assistant' as const, content: result.content, timestamp: new Date().toISOString(), + ...(result.requestId ? { requestId: result.requestId } : {}), } if (result.toolCalls.length > 0) { assistantMessage.toolCalls = result.toolCalls diff --git a/apps/sim/app/workspace/[workspaceId]/components/index.ts b/apps/sim/app/workspace/[workspaceId]/components/index.ts index 4a08e4f6c79..28ae1e475a5 100644 --- a/apps/sim/app/workspace/[workspaceId]/components/index.ts +++ b/apps/sim/app/workspace/[workspaceId]/components/index.ts @@ -1,5 +1,6 @@ export { ErrorState, type ErrorStateProps } from './error' export { InlineRenameInput } from './inline-rename-input' +export { MessageActions } from './message-actions' export { ownerCell } from './resource/components/owner-cell/owner-cell' export type { BreadcrumbEditing, diff --git a/apps/sim/app/workspace/[workspaceId]/components/message-actions/index.ts b/apps/sim/app/workspace/[workspaceId]/components/message-actions/index.ts new file mode 100644 index 00000000000..906e30d5a6f --- /dev/null +++ b/apps/sim/app/workspace/[workspaceId]/components/message-actions/index.ts @@ -0,0 +1 @@ +export { MessageActions } from './message-actions' diff --git a/apps/sim/app/workspace/[workspaceId]/components/message-actions/message-actions.tsx b/apps/sim/app/workspace/[workspaceId]/components/message-actions/message-actions.tsx new file mode 100644 index 00000000000..9d86664c812 --- /dev/null +++ b/apps/sim/app/workspace/[workspaceId]/components/message-actions/message-actions.tsx @@ -0,0 +1,84 @@ +'use client' + +import { useCallback, useEffect, useRef, useState } from 'react' +import { Check, Copy, Ellipsis, Hash } from 'lucide-react' +import { + DropdownMenu, + DropdownMenuContent, + DropdownMenuItem, + DropdownMenuTrigger, +} from '@/components/emcn' + +interface MessageActionsProps { + content: string + requestId?: string +} + +export function MessageActions({ content, requestId }: MessageActionsProps) { + const [copied, setCopied] = useState<'message' | 'request' | null>(null) + const resetTimeoutRef = useRef(null) + + useEffect(() => { + return () => { + if (resetTimeoutRef.current !== null) { + window.clearTimeout(resetTimeoutRef.current) + } + } + }, []) + + const copyToClipboard = useCallback(async (text: string, type: 'message' | 'request') => { + try { + await navigator.clipboard.writeText(text) + setCopied(type) + if (resetTimeoutRef.current !== null) { + window.clearTimeout(resetTimeoutRef.current) + } + resetTimeoutRef.current = window.setTimeout(() => setCopied(null), 1500) + } catch { + return + } + }, []) + + if (!content && !requestId) { + return null + } + + return ( + + + + + + { + event.stopPropagation() + void copyToClipboard(content, 'message') + }} + > + {copied === 'message' ? : } + Copy Message + + { + event.stopPropagation() + if (requestId) { + void copyToClipboard(requestId, 'request') + } + }} + > + {copied === 'request' ? : } + Copy Request ID + + + + ) +} diff --git a/apps/sim/app/workspace/[workspaceId]/home/home.tsx b/apps/sim/app/workspace/[workspaceId]/home/home.tsx index 8bc0dac39b2..b238e60ca82 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/home.tsx +++ b/apps/sim/app/workspace/[workspaceId]/home/home.tsx @@ -13,6 +13,7 @@ import { LandingWorkflowSeedStorage, } from '@/lib/core/utils/browser-storage' import { persistImportedWorkflow } from '@/lib/workflows/operations/import-export' +import { MessageActions } from '@/app/workspace/[workspaceId]/components' import { useChatHistory, useMarkTaskRead } from '@/hooks/queries/tasks' import type { ChatContext } from '@/stores/panel' import { useSidebarStore } from '@/stores/sidebar/store' @@ -414,7 +415,12 @@ export function Home({ chatId }: HomeProps = {}) { const isLastMessage = index === messages.length - 1 return ( -
+
+ {!isThisStreaming && (msg.content || msg.contentBlocks?.length) && ( +
+ +
+ )} 0 @@ -509,6 +510,7 @@ export function useChat( let activeSubagent: string | undefined let runningText = '' let lastContentSource: 'main' | 'subagent' | null = null + let streamRequestId: string | undefined streamingContentRef.current = '' streamingBlocksRef.current = [] @@ -526,14 +528,21 @@ export function useChat( const flush = () => { if (isStale()) return streamingBlocksRef.current = [...blocks] - const snapshot = { content: runningText, contentBlocks: [...blocks] } + const snapshot: Partial = { + content: runningText, + contentBlocks: [...blocks], + } + if (streamRequestId) snapshot.requestId = streamRequestId setMessages((prev) => { if (expectedGen !== undefined && streamGenRef.current !== expectedGen) return prev const idx = prev.findIndex((m) => m.id === assistantId) if (idx >= 0) { return prev.map((m) => (m.id === assistantId ? { ...m, ...snapshot } : m)) } - return [...prev, { id: assistantId, role: 'assistant' as const, ...snapshot }] + return [ + ...prev, + { id: assistantId, role: 'assistant' as const, content: '', ...snapshot }, + ] }) } @@ -597,6 +606,14 @@ export function useChat( } break } + case 'request_id': { + const rid = typeof parsed.data === 'string' ? parsed.data : undefined + if (rid) { + streamRequestId = rid + flush() + } + break + } case 'content': { const chunk = typeof parsed.data === 'string' ? parsed.data : (parsed.content ?? '') if (chunk) { diff --git a/apps/sim/app/workspace/[workspaceId]/home/types.ts b/apps/sim/app/workspace/[workspaceId]/home/types.ts index 1ed8d0ac65e..4a827c97aa1 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/types.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/types.ts @@ -33,6 +33,7 @@ export interface QueuedMessage { */ export type SSEEventType = | 'chat_id' + | 'request_id' | 'title_updated' | 'content' | 'reasoning' // openai reasoning - render as thinking text @@ -199,6 +200,7 @@ export interface ChatMessage { contentBlocks?: ContentBlock[] attachments?: ChatMessageAttachment[] contexts?: ChatMessageContext[] + requestId?: string } export const SUBAGENT_LABELS: Record = { diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/components/copilot-message/copilot-message.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/components/copilot-message/copilot-message.tsx index 187ff159480..0fc34449dec 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/components/copilot-message/copilot-message.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/components/copilot-message/copilot-message.tsx @@ -3,6 +3,7 @@ import { type FC, memo, useCallback, useMemo, useRef, useState } from 'react' import { RotateCcw } from 'lucide-react' import { Button } from '@/components/emcn' +import { MessageActions } from '@/app/workspace/[workspaceId]/components' import { OptionsSelector, parseSpecialTags, @@ -409,10 +410,15 @@ const CopilotMessage: FC = memo( if (isAssistant) { return (
-
+ {!isStreaming && (message.content || message.contentBlocks?.length) && ( +
+ +
+ )} +
{/* Content blocks in chronological order */} {memoizedContentBlocks || (isStreaming &&
)} diff --git a/apps/sim/hooks/queries/tasks.ts b/apps/sim/hooks/queries/tasks.ts index 198d59701c2..184859f2526 100644 --- a/apps/sim/hooks/queries/tasks.ts +++ b/apps/sim/hooks/queries/tasks.ts @@ -54,6 +54,7 @@ export interface TaskStoredMessage { id: string role: 'user' | 'assistant' content: string + requestId?: string toolCalls?: TaskStoredToolCall[] contentBlocks?: TaskStoredContentBlock[] fileAttachments?: TaskStoredFileAttachment[] diff --git a/apps/sim/lib/copilot/client-sse/handlers.ts b/apps/sim/lib/copilot/client-sse/handlers.ts index 7705e1cf3dd..f929d1057b5 100644 --- a/apps/sim/lib/copilot/client-sse/handlers.ts +++ b/apps/sim/lib/copilot/client-sse/handlers.ts @@ -92,6 +92,7 @@ export function flushStreamingUpdates(set: StoreSet) { if (update) { return { ...msg, + requestId: update.requestId ?? msg.requestId, content: '', contentBlocks: update.contentBlocks.length > 0 @@ -129,6 +130,7 @@ export function updateStreamingMessage(set: StoreSet, context: ClientStreamingCo const newMessages = [...messages] newMessages[messages.length - 1] = { ...lastMessage, + requestId: lastMessageUpdate.requestId ?? lastMessage.requestId, content: '', contentBlocks: lastMessageUpdate.contentBlocks.length > 0 @@ -143,6 +145,7 @@ export function updateStreamingMessage(set: StoreSet, context: ClientStreamingCo if (update) { return { ...msg, + requestId: update.requestId ?? msg.requestId, content: '', contentBlocks: update.contentBlocks.length > 0 @@ -429,6 +432,12 @@ export const sseHandlers: Record = { writeActiveStreamToStorage(updatedStream) } }, + request_id: (data, context) => { + const requestId = typeof data.data === 'string' ? data.data : undefined + if (requestId) { + context.requestId = requestId + } + }, title_updated: (_data, _context, get, set) => { const title = _data.title if (!title) return diff --git a/apps/sim/lib/copilot/client-sse/types.ts b/apps/sim/lib/copilot/client-sse/types.ts index 95fa8f077e2..66c18030e75 100644 --- a/apps/sim/lib/copilot/client-sse/types.ts +++ b/apps/sim/lib/copilot/client-sse/types.ts @@ -22,6 +22,7 @@ export interface ClientContentBlock { export interface StreamingContext { messageId: string + requestId?: string accumulatedContent: string contentBlocks: ClientContentBlock[] currentTextBlock: ClientContentBlock | null diff --git a/apps/sim/lib/copilot/messages/serialization.ts b/apps/sim/lib/copilot/messages/serialization.ts index 4a970cc92fe..89a30466806 100644 --- a/apps/sim/lib/copilot/messages/serialization.ts +++ b/apps/sim/lib/copilot/messages/serialization.ts @@ -141,6 +141,10 @@ export function serializeMessagesForDB( timestamp, } + if (msg.requestId) { + serialized.requestId = msg.requestId + } + if (Array.isArray(msg.contentBlocks) && msg.contentBlocks.length > 0) { serialized.contentBlocks = deepClone(msg.contentBlocks) } diff --git a/apps/sim/lib/copilot/orchestrator/index.ts b/apps/sim/lib/copilot/orchestrator/index.ts index 5e07bbf38a1..c29fae00550 100644 --- a/apps/sim/lib/copilot/orchestrator/index.ts +++ b/apps/sim/lib/copilot/orchestrator/index.ts @@ -76,6 +76,7 @@ export async function orchestrateCopilotStream( contentBlocks: context.contentBlocks, toolCalls: buildToolCallSummaries(context), chatId: context.chatId, + requestId: context.requestId, errors: context.errors.length ? context.errors : undefined, usage: context.usage, cost: context.cost, diff --git a/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts b/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts index d0431b59cd5..a20ccd40996 100644 --- a/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts +++ b/apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts @@ -187,6 +187,12 @@ export const sseHandlers: Record = { execContext.chatId = chatId } }, + request_id: (event, context) => { + const rid = typeof event.data === 'string' ? event.data : undefined + if (rid) { + context.requestId = rid + } + }, title_updated: () => {}, tool_result: (event, context) => { const data = getEventData(event) diff --git a/apps/sim/lib/copilot/orchestrator/types.ts b/apps/sim/lib/copilot/orchestrator/types.ts index 130666c81e6..e79d0a65360 100644 --- a/apps/sim/lib/copilot/orchestrator/types.ts +++ b/apps/sim/lib/copilot/orchestrator/types.ts @@ -2,6 +2,7 @@ import type { MothershipResource } from '@/lib/copilot/resource-types' export type SSEEventType = | 'chat_id' + | 'request_id' | 'title_updated' | 'content' | 'reasoning' @@ -88,6 +89,7 @@ export interface ContentBlock { export interface StreamingContext { chatId?: string + requestId?: string messageId: string accumulatedContent: string contentBlocks: ContentBlock[] @@ -154,6 +156,7 @@ export interface OrchestratorResult { contentBlocks: ContentBlock[] toolCalls: ToolCallSummary[] chatId?: string + requestId?: string error?: string errors?: string[] usage?: { prompt: number; completion: number } diff --git a/apps/sim/stores/panel/copilot/store.ts b/apps/sim/stores/panel/copilot/store.ts index dba7db4ac42..a2c3249441f 100644 --- a/apps/sim/stores/panel/copilot/store.ts +++ b/apps/sim/stores/panel/copilot/store.ts @@ -224,6 +224,7 @@ function replaceTextBlocks(blocks: ClientContentBlock[], text: string): ClientCo function createClientStreamingContext(messageId: string): ClientStreamingContext { return { messageId, + requestId: undefined, accumulatedContent: '', contentBlocks: [], currentTextBlock: null, @@ -2043,6 +2044,7 @@ export const useCopilotStore = create()( msg.id === assistantMessageId ? { ...msg, + requestId: context.requestId ?? msg.requestId, content: finalContentWithOptions, contentBlocks: sanitizedContentBlocks, } diff --git a/apps/sim/stores/panel/copilot/types.ts b/apps/sim/stores/panel/copilot/types.ts index dde7e3fc552..798c71e1545 100644 --- a/apps/sim/stores/panel/copilot/types.ts +++ b/apps/sim/stores/panel/copilot/types.ts @@ -70,6 +70,7 @@ export interface CopilotMessage { role: 'user' | 'assistant' | 'system' content: string timestamp: string + requestId?: string citations?: { id: number; title: string; url: string; similarity?: number }[] toolCalls?: CopilotToolCall[] contentBlocks?: ClientContentBlock[] From 28de28899a33df670add0aa08d0a1ec766720057 Mon Sep 17 00:00:00 2001 From: Waleed Date: Tue, 17 Mar 2026 20:38:16 -0700 Subject: [PATCH 08/11] improvement(landing): added enterprise section (#3637) * improvement(landing): added enterprise section * make components interactive * added more things to pricing sheet * remove debug log * fix(landing): remove dead DotGrid component and fix enterprise CTA to use Link Co-Authored-By: Claude Sonnet 4.6 --------- Co-authored-by: Claude Sonnet 4.6 --- .../components/enterprise/enterprise.tsx | 476 +++++++++++++++++- .../features/components/features-preview.tsx | 312 +++++++++--- .../app/(home)/components/pricing/pricing.tsx | 37 +- apps/sim/app/(home)/landing.tsx | 6 +- 4 files changed, 730 insertions(+), 101 deletions(-) diff --git a/apps/sim/app/(home)/components/enterprise/enterprise.tsx b/apps/sim/app/(home)/components/enterprise/enterprise.tsx index 1d07bd58a38..ca967b13b2c 100644 --- a/apps/sim/app/(home)/components/enterprise/enterprise.tsx +++ b/apps/sim/app/(home)/components/enterprise/enterprise.tsx @@ -4,14 +4,484 @@ * SEO: * - `
`. * - `

` for the section title. - * - Compliance certs (SOC2, HIPAA) as visible `` text. + * - Compliance certs (SOC 2, HIPAA) as visible `` text. * - Enterprise CTA links to contact form via `` with `rel="noopener noreferrer"`. * * GEO: - * - Entity-rich: "Sim is SOC2 and HIPAA compliant" — not "We are compliant." + * - Entity-rich: "Sim is SOC 2 and HIPAA compliant" — not "We are compliant." * - `
    ` checklist of features (SSO, RBAC, audit logs, SLA, on-premise deployment) * as an atomic answer block for "What enterprise features does Sim offer?". */ +'use client' + +import { useEffect, useRef, useState } from 'react' +import { AnimatePresence, motion } from 'framer-motion' +import Image from 'next/image' +import Link from 'next/link' +import { Badge, ChevronDown } from '@/components/emcn' +import { Lock } from '@/components/emcn/icons' +import { GithubIcon } from '@/components/icons' + +/** Consistent color per actor — same pattern as Collaboration section cursors. */ +const ACTOR_COLORS: Record = { + 'Sarah K.': '#2ABBF8', + 'Sid G.': '#33C482', + 'Theo L.': '#FA4EDF', + 'Abhay K.': '#FFCC02', + 'Danny S.': '#FF6B35', +} + +/** Left accent bar opacity by recency — newest is brightest. */ +const ACCENT_OPACITIES = [0.75, 0.45, 0.28, 0.15, 0.07] as const + +/** Human-readable label per resource type. */ +const RESOURCE_TYPE_LABEL: Record = { + workflow: 'Workflow', + member: 'Member', + byok_key: 'BYOK Key', + api_key: 'API Key', + permission_group: 'Permission Group', + credential_set: 'Credential Set', + knowledge_base: 'Knowledge Base', + environment: 'Environment', + mcp_server: 'MCP Server', + file: 'File', + webhook: 'Webhook', + chat: 'Chat', + table: 'Table', + folder: 'Folder', + document: 'Document', +} + +interface LogEntry { + id: number + actor: string + /** Matches the `description` field stored by recordAudit() */ + description: string + resourceType: string + /** Unix ms timestamp of when this entry was "received" */ + insertedAt: number +} + +function formatTimeAgo(insertedAt: number): string { + const elapsed = Date.now() - insertedAt + if (elapsed < 8_000) return 'just now' + if (elapsed < 60_000) return `${Math.floor(elapsed / 1000)}s ago` + return `${Math.floor(elapsed / 60_000)}m ago` +} + +/** + * Entry templates using real description strings from the actual recordAudit() + * calls across the codebase (e.g. `Added BYOK key for openai`, + * `Invited alex@acme.com to workspace as member`). + */ +const ENTRY_TEMPLATES: Omit[] = [ + { actor: 'Sarah K.', description: 'Deployed workflow "Email Triage"', resourceType: 'workflow' }, + { + actor: 'Sid G.', + description: 'Invited alex@acme.com to workspace as member', + resourceType: 'member', + }, + { actor: 'Theo L.', description: 'Added BYOK key for openai', resourceType: 'byok_key' }, + { actor: 'Sarah K.', description: 'Created workflow "Invoice Parser"', resourceType: 'workflow' }, + { + actor: 'Abhay K.', + description: 'Created permission group "Engineering"', + resourceType: 'permission_group', + }, + { actor: 'Danny S.', description: 'Created API key "Production Key"', resourceType: 'api_key' }, + { + actor: 'Theo L.', + description: 'Changed permissions for sam@acme.com to editor', + resourceType: 'member', + }, + { actor: 'Sarah K.', description: 'Uploaded file "Q3_Report.pdf"', resourceType: 'file' }, + { + actor: 'Sid G.', + description: 'Created credential set "Prod Keys"', + resourceType: 'credential_set', + }, + { + actor: 'Abhay K.', + description: 'Created knowledge base "Internal Docs"', + resourceType: 'knowledge_base', + }, + { actor: 'Danny S.', description: 'Updated environment variables', resourceType: 'environment' }, + { + actor: 'Sarah K.', + description: 'Added tool "search_web" to MCP server', + resourceType: 'mcp_server', + }, + { actor: 'Sid G.', description: 'Created webhook "Stripe Payment"', resourceType: 'webhook' }, + { actor: 'Theo L.', description: 'Deployed chat "Support Assistant"', resourceType: 'chat' }, + { actor: 'Abhay K.', description: 'Created table "Lead Tracker"', resourceType: 'table' }, + { actor: 'Danny S.', description: 'Revoked API key "Staging Key"', resourceType: 'api_key' }, + { + actor: 'Sarah K.', + description: 'Duplicated workflow "Data Enrichment"', + resourceType: 'workflow', + }, + { + actor: 'Sid G.', + description: 'Removed member theo@acme.com from workspace', + resourceType: 'member', + }, + { + actor: 'Theo L.', + description: 'Updated knowledge base "Product Docs"', + resourceType: 'knowledge_base', + }, + { actor: 'Abhay K.', description: 'Created folder "Finance Workflows"', resourceType: 'folder' }, + { + actor: 'Danny S.', + description: 'Uploaded document "onboarding-guide.pdf"', + resourceType: 'document', + }, + { + actor: 'Sarah K.', + description: 'Updated credential set "Prod Keys"', + resourceType: 'credential_set', + }, + { + actor: 'Sid G.', + description: 'Added member abhay@acme.com to permission group "Engineering"', + resourceType: 'permission_group', + }, + { actor: 'Theo L.', description: 'Locked workflow "Customer Sync"', resourceType: 'workflow' }, +] + +const INITIAL_OFFSETS_MS = [0, 20_000, 75_000, 240_000, 540_000] + +const MARQUEE_KEYFRAMES = ` + @keyframes marquee { + 0% { transform: translateX(0); } + 100% { transform: translateX(-25%); } + } + @media (prefers-reduced-motion: reduce) { + @keyframes marquee { 0%, 100% { transform: none; } } + } +` + +const FEATURE_TAGS = [ + 'Access Control', + 'Self-Hosting', + 'Bring Your Own Key', + 'Credential Sharing', + 'Custom Limits', + 'Admin API', + 'White Labeling', + 'Dedicated Support', + '99.99% Uptime SLA', + 'Workflow Versioning', + 'On-Premise', + 'Organizations', + 'Workspace Export', + 'Audit Logs', +] as const + +interface AuditRowProps { + entry: LogEntry + index: number +} + +function AuditRow({ entry, index }: AuditRowProps) { + const color = ACTOR_COLORS[entry.actor] ?? '#F6F6F6' + const accentOpacity = ACCENT_OPACITIES[index] ?? 0.04 + const timeAgo = formatTimeAgo(entry.insertedAt) + const resourceLabel = RESOURCE_TYPE_LABEL[entry.resourceType] + + return ( +
    + {/* Left accent bar — brightness encodes recency */} + + ) +} + +function AuditLogPreview() { + const counterRef = useRef(ENTRY_TEMPLATES.length) + const templateIndexRef = useRef(5 % ENTRY_TEMPLATES.length) + + const now = Date.now() + const [entries, setEntries] = useState(() => + ENTRY_TEMPLATES.slice(0, 5).map((t, i) => ({ + ...t, + id: i, + insertedAt: now - INITIAL_OFFSETS_MS[i], + })) + ) + const [, tick] = useState(0) + + useEffect(() => { + const addInterval = setInterval(() => { + const template = ENTRY_TEMPLATES[templateIndexRef.current] + templateIndexRef.current = (templateIndexRef.current + 1) % ENTRY_TEMPLATES.length + + setEntries((prev) => [ + { ...template, id: counterRef.current++, insertedAt: Date.now() }, + ...prev.slice(0, 4), + ]) + }, 2600) + + // Refresh time labels every 5s so "just now" ages to "Xs ago" + const tickInterval = setInterval(() => tick((n) => n + 1), 5_000) + + return () => { + clearInterval(addInterval) + clearInterval(tickInterval) + } + }, []) + + return ( +
    + {/* Header */} +
    +
    + {/* Pulsing live indicator */} + + + + + + Audit Log + +
    +
    + + Export + + + Filter + +
    +
    + + {/* Log entries — new items push existing ones down */} +
    + + {entries.map((entry, index) => ( + + + + ))} + +
    +
    + ) +} + +function TrustStrip() { + return ( +
    + {/* SOC 2 + HIPAA combined */} + + SOC 2 Type II +
    + + SOC 2 & HIPAA + + + Type II · PHI protected → + +
    + + + {/* Open Source — center */} + +
    + +
    +
    + + Open Source + + + View on GitHub → + +
    + + + {/* SSO */} +
    +
    + +
    +
    + + SSO & SCIM + + + Okta, Azure AD, Google + +
    +
    +
    + ) +} + export default function Enterprise() { - return null + return ( +
    +
    +
    + + Enterprise + + +

    + Enterprise features for +
    + fast, scalable workflows +

    +
    + +
    + + + + {/* Scrolling feature ticker */} +
    +