From da50222b7cf70da153e8b83a25b66c513ae9937a Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Wed, 6 May 2026 09:44:24 -0700 Subject: [PATCH] improvement(func-exec): normalize inputs to match schema --- apps/sim/executor/execution/block-executor.ts | 3 +- apps/sim/executor/execution/executor.ts | 8 +- .../executor/execution/snapshot-serializer.ts | 4 +- apps/sim/executor/execution/snapshot.test.ts | 30 +++++++ apps/sim/executor/execution/snapshot.ts | 30 +++++-- .../executor/handlers/agent/agent-handler.ts | 9 +- .../handlers/condition/condition-handler.ts | 5 +- .../function/function-handler.test.ts | 22 +++++ .../handlers/function/function-handler.ts | 12 ++- apps/sim/lib/core/utils/arrays.test.ts | 13 +++ apps/sim/lib/core/utils/arrays.ts | 10 +++ apps/sim/lib/core/utils/records.test.ts | 64 +++++++++++++ apps/sim/lib/core/utils/records.ts | 90 +++++++++++++++++++ .../lib/workflows/executor/execution-core.ts | 21 ++--- .../executor/queued-workflow-execution.ts | 2 +- apps/sim/providers/utils.ts | 19 +++- apps/sim/tools/function/execute.ts | 18 ++-- apps/sim/tools/utils.ts | 15 ++-- 18 files changed, 325 insertions(+), 50 deletions(-) create mode 100644 apps/sim/executor/execution/snapshot.test.ts create mode 100644 apps/sim/lib/core/utils/arrays.test.ts create mode 100644 apps/sim/lib/core/utils/arrays.ts create mode 100644 apps/sim/lib/core/utils/records.test.ts create mode 100644 apps/sim/lib/core/utils/records.ts diff --git a/apps/sim/executor/execution/block-executor.ts b/apps/sim/executor/execution/block-executor.ts index 73cdcb8d674..3803e53ffd6 100644 --- a/apps/sim/executor/execution/block-executor.ts +++ b/apps/sim/executor/execution/block-executor.ts @@ -1,6 +1,7 @@ import { createLogger, type Logger } from '@sim/logger' import { toError } from '@sim/utils/errors' import { redactApiKeys } from '@/lib/core/security/redaction' +import { normalizeStringArray } from '@/lib/core/utils/arrays' import { getBaseUrl } from '@/lib/core/utils/urls' import { containsUserFileWithMetadata, @@ -164,7 +165,7 @@ export class BlockExecutor { block, streamingExec, resolvedInputs, - ctx.selectedOutputs ?? [] + normalizeStringArray(ctx.selectedOutputs) ) } diff --git a/apps/sim/executor/execution/executor.ts b/apps/sim/executor/execution/executor.ts index 8e3a8c8c8c9..a141e017fb1 100644 --- a/apps/sim/executor/execution/executor.ts +++ b/apps/sim/executor/execution/executor.ts @@ -1,4 +1,6 @@ import { createLogger, type Logger } from '@sim/logger' +import { normalizeStringArray } from '@/lib/core/utils/arrays' +import { normalizeStringRecord, normalizeWorkflowVariables } from '@/lib/core/utils/records' import { StartBlockPath } from '@/lib/workflows/triggers/triggers' import type { DAG } from '@/executor/dag/builder' import { DAGBuilder } from '@/executor/dag/builder' @@ -56,9 +58,9 @@ export class DAGExecutor { constructor(options: DAGExecutorOptions) { this.workflow = options.workflow - this.environmentVariables = options.envVarValues ?? {} + this.environmentVariables = normalizeStringRecord(options.envVarValues) this.workflowInput = options.workflowInput ?? {} - this.workflowVariables = options.workflowVariables ?? {} + this.workflowVariables = normalizeWorkflowVariables(options.workflowVariables) this.contextExtensions = options.contextExtensions ?? {} this.dagBuilder = new DAGBuilder() this.execLogger = logger.withMetadata({ @@ -325,7 +327,7 @@ export class DAGExecutor { : new Set(), workflow: this.workflow, stream: this.contextExtensions.stream ?? false, - selectedOutputs: this.contextExtensions.selectedOutputs ?? [], + selectedOutputs: normalizeStringArray(this.contextExtensions.selectedOutputs), edges: this.contextExtensions.edges ?? [], onStream: this.contextExtensions.onStream, onBlockStart: this.contextExtensions.onBlockStart, diff --git a/apps/sim/executor/execution/snapshot-serializer.ts b/apps/sim/executor/execution/snapshot-serializer.ts index 052d4b284b3..76c2a3dba5f 100644 --- a/apps/sim/executor/execution/snapshot-serializer.ts +++ b/apps/sim/executor/execution/snapshot-serializer.ts @@ -119,8 +119,8 @@ export function serializePauseSnapshot( executionMetadata, context.workflow, {}, - context.workflowVariables ?? {}, - context.selectedOutputs ?? [], + context.workflowVariables, + context.selectedOutputs, state ) diff --git a/apps/sim/executor/execution/snapshot.test.ts b/apps/sim/executor/execution/snapshot.test.ts new file mode 100644 index 00000000000..23c7d110cdf --- /dev/null +++ b/apps/sim/executor/execution/snapshot.test.ts @@ -0,0 +1,30 @@ +import { describe, expect, it } from 'vitest' +import { ExecutionSnapshot } from '@/executor/execution/snapshot' +import type { ExecutionMetadata } from '@/executor/execution/types' + +const metadata: ExecutionMetadata = { + requestId: 'request-1', + executionId: 'execution-1', + workflowId: 'workflow-1', + workspaceId: 'workspace-1', + userId: 'user-1', + triggerType: 'manual', + startTime: '2026-05-06T00:00:00.000Z', +} + +describe('ExecutionSnapshot', () => { + it('normalizes untyped persisted execution state at construction', () => { + const variable = { id: 'var-1', name: 'brand', type: 'plain', value: 'myfitness' } + + const snapshot = new ExecutionSnapshot( + metadata, + { blocks: [] }, + {}, + [variable], + ['agent.content', 123, 'function.result'] + ) + + expect(snapshot.workflowVariables).toEqual({ 'var-1': variable }) + expect(snapshot.selectedOutputs).toEqual(['agent.content', 'function.result']) + }) +}) diff --git a/apps/sim/executor/execution/snapshot.ts b/apps/sim/executor/execution/snapshot.ts index afe9bf52d7f..6e372f97d73 100644 --- a/apps/sim/executor/execution/snapshot.ts +++ b/apps/sim/executor/execution/snapshot.ts @@ -1,14 +1,30 @@ +import { normalizeStringArray } from '@/lib/core/utils/arrays' +import { normalizeWorkflowVariables } from '@/lib/core/utils/records' import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types' export class ExecutionSnapshot { + public readonly metadata: ExecutionMetadata + public readonly workflow: any + public readonly input: any + public readonly workflowVariables: Record + public readonly selectedOutputs: string[] + public readonly state?: SerializableExecutionState + constructor( - public readonly metadata: ExecutionMetadata, - public readonly workflow: any, - public readonly input: any, - public readonly workflowVariables: Record, - public readonly selectedOutputs: string[] = [], - public readonly state?: SerializableExecutionState - ) {} + metadata: ExecutionMetadata, + workflow: any, + input: any, + workflowVariables: unknown, + selectedOutputs: unknown = [], + state?: SerializableExecutionState + ) { + this.metadata = metadata + this.workflow = workflow + this.input = input + this.workflowVariables = normalizeWorkflowVariables(workflowVariables) + this.selectedOutputs = normalizeStringArray(selectedOutputs) + this.state = state + } toJSON(): string { return JSON.stringify({ diff --git a/apps/sim/executor/handlers/agent/agent-handler.ts b/apps/sim/executor/handlers/agent/agent-handler.ts index b0384a0b7c4..e09b75387f7 100644 --- a/apps/sim/executor/handlers/agent/agent-handler.ts +++ b/apps/sim/executor/handlers/agent/agent-handler.ts @@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger' import { toError } from '@sim/utils/errors' import { sleep } from '@sim/utils/helpers' import { and, eq, inArray, isNull } from 'drizzle-orm' +import { normalizeStringRecord, normalizeWorkflowVariables } from '@/lib/core/utils/records' import { createMcpToolId } from '@/lib/mcp/utils' import { getCustomToolById } from '@/lib/workflows/custom-tools/operations' import { getAllBlocks } from '@/blocks' @@ -815,8 +816,8 @@ export class AgentBlockHandler implements BlockHandler { userId: ctx.userId, stream: streaming, messages: messages?.map(({ executionId, ...msg }) => msg), - environmentVariables: ctx.environmentVariables || {}, - workflowVariables: ctx.workflowVariables || {}, + environmentVariables: normalizeStringRecord(ctx.environmentVariables), + workflowVariables: normalizeWorkflowVariables(ctx.workflowVariables), blockData, blockNameMapping, reasoningEffort: inputs.reasoningEffort, @@ -885,8 +886,8 @@ export class AgentBlockHandler implements BlockHandler { userId: ctx.userId, stream: providerRequest.stream, messages: 'messages' in providerRequest ? providerRequest.messages : undefined, - environmentVariables: ctx.environmentVariables || {}, - workflowVariables: ctx.workflowVariables || {}, + environmentVariables: normalizeStringRecord(ctx.environmentVariables), + workflowVariables: normalizeWorkflowVariables(ctx.workflowVariables), blockData, blockNameMapping, isDeployedContext: ctx.isDeployedContext, diff --git a/apps/sim/executor/handlers/condition/condition-handler.ts b/apps/sim/executor/handlers/condition/condition-handler.ts index 60ad9f99860..9fd4dd50f3c 100644 --- a/apps/sim/executor/handlers/condition/condition-handler.ts +++ b/apps/sim/executor/handlers/condition/condition-handler.ts @@ -1,4 +1,5 @@ import { createLogger } from '@sim/logger' +import { normalizeStringRecord, normalizeWorkflowVariables } from '@/lib/core/utils/records' import type { BlockOutput } from '@/blocks/types' import { BlockType, CONDITION, DEFAULTS, EDGE } from '@/executor/constants' import type { BlockHandler, ExecutionContext } from '@/executor/types' @@ -40,8 +41,8 @@ export async function evaluateConditionExpression( { code, timeout: CONDITION_TIMEOUT_MS, - envVars: ctx.environmentVariables || {}, - workflowVariables: ctx.workflowVariables || {}, + envVars: normalizeStringRecord(ctx.environmentVariables), + workflowVariables: normalizeWorkflowVariables(ctx.workflowVariables), blockData, blockNameMapping, blockOutputSchemas, diff --git a/apps/sim/executor/handlers/function/function-handler.test.ts b/apps/sim/executor/handlers/function/function-handler.test.ts index 384540cc7b2..d7a4e19929f 100644 --- a/apps/sim/executor/handlers/function/function-handler.test.ts +++ b/apps/sim/executor/handlers/function/function-handler.test.ts @@ -196,6 +196,28 @@ describe('FunctionBlockHandler', () => { ) }) + it('should normalize malformed execution context records before calling function_execute', async () => { + const legacyVariable = { id: 'var-1', name: 'brand', type: 'plain', value: 'myfitness' } + mockContext.workflowVariables = [legacyVariable] as unknown as Record + mockContext.environmentVariables = ['invalid-env'] as unknown as Record + + await handler.execute(mockContext, mockBlock, { + code: 'return "myfitness"', + [FUNCTION_BLOCK_CONTEXT_VARS_KEY]: ['invalid-context'], + }) + + expect(mockExecuteTool).toHaveBeenCalledWith( + 'function_execute', + expect.objectContaining({ + envVars: {}, + workflowVariables: { 'var-1': legacyVariable }, + contextVariables: {}, + }), + false, + mockContext + ) + }) + it('should handle tool error with no specific message', async () => { const inputs = { code: 'some code' } const errorResult = { success: false } diff --git a/apps/sim/executor/handlers/function/function-handler.ts b/apps/sim/executor/handlers/function/function-handler.ts index c008a8d07ea..5cd01e1aa1f 100644 --- a/apps/sim/executor/handlers/function/function-handler.ts +++ b/apps/sim/executor/handlers/function/function-handler.ts @@ -1,3 +1,8 @@ +import { + normalizeRecord, + normalizeStringRecord, + normalizeWorkflowVariables, +} from '@/lib/core/utils/records' import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/execution/constants' import { DEFAULT_CODE_LANGUAGE } from '@/lib/execution/languages' import { BlockType } from '@/executor/constants' @@ -26,8 +31,7 @@ export class FunctionBlockHandler implements BlockHandler { const { blockData, blockNameMapping, blockOutputSchemas } = collectBlockData(ctx) - const contextVariables = - (inputs[FUNCTION_BLOCK_CONTEXT_VARS_KEY] as Record | undefined) ?? {} + const contextVariables = normalizeRecord(inputs[FUNCTION_BLOCK_CONTEXT_VARS_KEY]) const result = await executeTool( 'function_execute', @@ -35,8 +39,8 @@ export class FunctionBlockHandler implements BlockHandler { code: codeContent, language: inputs.language || DEFAULT_CODE_LANGUAGE, timeout: inputs.timeout || DEFAULT_EXECUTION_TIMEOUT_MS, - envVars: ctx.environmentVariables || {}, - workflowVariables: ctx.workflowVariables || {}, + envVars: normalizeStringRecord(ctx.environmentVariables), + workflowVariables: normalizeWorkflowVariables(ctx.workflowVariables), blockData, blockNameMapping, blockOutputSchemas, diff --git a/apps/sim/lib/core/utils/arrays.test.ts b/apps/sim/lib/core/utils/arrays.test.ts new file mode 100644 index 00000000000..54062fa7f8d --- /dev/null +++ b/apps/sim/lib/core/utils/arrays.test.ts @@ -0,0 +1,13 @@ +import { describe, expect, it } from 'vitest' +import { normalizeStringArray } from '@/lib/core/utils/arrays' + +describe('array normalization utilities', () => { + it('normalizes string arrays loaded from untyped state', () => { + expect(normalizeStringArray(['output-1', 2, 'output-2', null])).toEqual([ + 'output-1', + 'output-2', + ]) + expect(normalizeStringArray('output-1')).toEqual([]) + expect(normalizeStringArray(undefined)).toEqual([]) + }) +}) diff --git a/apps/sim/lib/core/utils/arrays.ts b/apps/sim/lib/core/utils/arrays.ts new file mode 100644 index 00000000000..ffc270d6081 --- /dev/null +++ b/apps/sim/lib/core/utils/arrays.ts @@ -0,0 +1,10 @@ +/** + * Normalizes optional string-list values loaded from untyped persisted state. + */ +export function normalizeStringArray(value: unknown): string[] { + if (!Array.isArray(value)) { + return [] + } + + return value.filter((item): item is string => typeof item === 'string') +} diff --git a/apps/sim/lib/core/utils/records.test.ts b/apps/sim/lib/core/utils/records.test.ts new file mode 100644 index 00000000000..97a46f0e5dc --- /dev/null +++ b/apps/sim/lib/core/utils/records.test.ts @@ -0,0 +1,64 @@ +import { describe, expect, it } from 'vitest' +import { + isPlainRecord, + normalizeRecord, + normalizeRecordMap, + normalizeStringRecord, + normalizeWorkflowVariables, +} from '@/lib/core/utils/records' + +describe('record normalization utilities', () => { + it('identifies plain records without accepting arrays or null', () => { + expect(isPlainRecord({})).toBe(true) + expect(isPlainRecord(Object.create(null))).toBe(true) + expect(isPlainRecord([])).toBe(false) + expect(isPlainRecord(null)).toBe(false) + }) + + it('normalizes unknown values to object records', () => { + expect(normalizeRecord({ value: 1 })).toEqual({ value: 1 }) + expect(normalizeRecord([])).toEqual({}) + expect(normalizeRecord('not-a-record')).toEqual({}) + }) + + it('normalizes string records for environment-like values', () => { + expect( + normalizeStringRecord({ + TOKEN: 'secret', + RETRIES: 3, + ENABLED: true, + EMPTY: null, + }) + ).toEqual({ + TOKEN: 'secret', + RETRIES: '3', + ENABLED: 'true', + }) + expect(normalizeStringRecord([])).toEqual({}) + }) + + it('normalizes record maps by dropping malformed entries', () => { + expect( + normalizeRecordMap({ + valid: { type: 'string' }, + invalid: [], + }) + ).toEqual({ + valid: { type: 'string' }, + }) + }) + + it('normalizes legacy workflow variable arrays into records', () => { + const variableWithId = { id: 'var-1', name: 'brand', type: 'plain', value: 'myfitness' } + const variableWithName = { name: 'channel', type: 'plain', value: 'whatsapp' } + + expect(normalizeWorkflowVariables([variableWithId, variableWithName, []])).toEqual({ + 'var-1': variableWithId, + channel: variableWithName, + }) + expect(normalizeWorkflowVariables({ existing: variableWithId })).toEqual({ + existing: variableWithId, + }) + expect(normalizeWorkflowVariables('not-a-record')).toEqual({}) + }) +}) diff --git a/apps/sim/lib/core/utils/records.ts b/apps/sim/lib/core/utils/records.ts new file mode 100644 index 00000000000..f5e0c4842ca --- /dev/null +++ b/apps/sim/lib/core/utils/records.ts @@ -0,0 +1,90 @@ +export type UnknownRecord = Record +export type StringRecord = Record + +/** + * Returns true only for object-map values, excluding arrays and null. + */ +export function isPlainRecord(value: unknown): value is UnknownRecord { + if (typeof value !== 'object' || value === null || Array.isArray(value)) { + return false + } + + const prototype = Object.getPrototypeOf(value) + return prototype === Object.prototype || prototype === null +} + +/** + * Normalizes optional execution context maps to the record shape expected by + * internal API contracts. + */ +export function normalizeRecord(value: unknown): UnknownRecord { + return isPlainRecord(value) ? value : {} +} + +/** + * Normalizes environment-like maps to string values, matching process/env + * semantics at execution boundaries. + */ +export function normalizeStringRecord(value: unknown): StringRecord { + if (!isPlainRecord(value)) { + return {} + } + + const normalized: StringRecord = {} + for (const [key, entryValue] of Object.entries(value)) { + if (entryValue === undefined || entryValue === null) { + continue + } + normalized[key] = typeof entryValue === 'string' ? entryValue : String(entryValue) + } + return normalized +} + +/** + * Normalizes record-of-record maps such as block output schema maps. + */ +export function normalizeRecordMap(value: unknown): Record { + if (!isPlainRecord(value)) { + return {} + } + + const normalized: Record = {} + for (const [key, entryValue] of Object.entries(value)) { + if (isPlainRecord(entryValue)) { + normalized[key] = entryValue + } + } + return normalized +} + +/** + * Workflow variables are stored as a record in current state, while some + * legacy and imported snapshots can carry an array of variable objects. + */ +export function normalizeWorkflowVariables(value: unknown): UnknownRecord { + if (isPlainRecord(value)) { + return value + } + + if (!Array.isArray(value)) { + return {} + } + + const normalized: UnknownRecord = {} + for (const variable of value) { + if (!isPlainRecord(variable)) { + continue + } + + const id = typeof variable.id === 'string' && variable.id.trim() ? variable.id : undefined + const name = + typeof variable.name === 'string' && variable.name.trim() ? variable.name : undefined + const key = id ?? name + + if (key) { + normalized[key] = variable + } + } + + return normalized +} diff --git a/apps/sim/lib/workflows/executor/execution-core.ts b/apps/sim/lib/workflows/executor/execution-core.ts index aec5f956c50..22b58c5e707 100644 --- a/apps/sim/lib/workflows/executor/execution-core.ts +++ b/apps/sim/lib/workflows/executor/execution-core.ts @@ -7,6 +7,7 @@ import { createLogger } from '@sim/logger' import { mergeSubblockStateWithValues } from '@sim/workflow-persistence/subblocks' import type { Edge } from 'reactflow' import { z } from 'zod' +import { isPlainRecord } from '@/lib/core/utils/records' import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils' import { clearExecutionCancellation } from '@/lib/execution/cancellation' import type { LoggingSession } from '@/lib/logs/execution/logging-session' @@ -581,6 +582,16 @@ export async function executeWorkflowCore( callChain: metadata.callChain, } + for (const variable of Object.values(workflowVariables)) { + if ( + isPlainRecord(variable) && + variable.value !== undefined && + typeof variable.type === 'string' + ) { + variable.value = parseVariableValueByType(variable.value, variable.type) + } + } + const executorInstance = new Executor({ workflow: serializedWorkflow, envVarValues: decryptedEnvVars, @@ -589,16 +600,6 @@ export async function executeWorkflowCore( contextExtensions, }) - // Convert initial workflow variables to their native types - if (workflowVariables) { - for (const [varId, variable] of Object.entries(workflowVariables)) { - const v = variable as { value?: unknown; type?: string } - if (v.value !== undefined && v.type) { - v.value = parseVariableValueByType(v.value, v.type) - } - } - } - const result = runFromBlock ? ((await executorInstance.executeFromBlock( workflowId, diff --git a/apps/sim/lib/workflows/executor/queued-workflow-execution.ts b/apps/sim/lib/workflows/executor/queued-workflow-execution.ts index 06a851a3b53..0106e823195 100644 --- a/apps/sim/lib/workflows/executor/queued-workflow-execution.ts +++ b/apps/sim/lib/workflows/executor/queued-workflow-execution.ts @@ -138,7 +138,7 @@ export async function executeQueuedWorkflowJob( payload.workflow, payload.input, payload.variables, - payload.selectedOutputs ?? [] + payload.selectedOutputs ) let callbacks = {} diff --git a/apps/sim/providers/utils.ts b/apps/sim/providers/utils.ts index 0d5b7be022f..780776aed36 100644 --- a/apps/sim/providers/utils.ts +++ b/apps/sim/providers/utils.ts @@ -5,6 +5,11 @@ import type { CompletionUsage } from 'openai/resources/completions' import { dollarsToCredits } from '@/lib/billing/credits/conversion' import { env } from '@/lib/core/config/env' import { getBlacklistedProvidersFromEnv, isHosted } from '@/lib/core/config/feature-flags' +import { + normalizeRecord, + normalizeStringRecord, + normalizeWorkflowVariables, +} from '@/lib/core/utils/records' import { buildCanonicalIndex, type CanonicalGroup, @@ -1166,10 +1171,16 @@ export function prepareToolExecution( }, } : {}), - ...(request.environmentVariables ? { envVars: request.environmentVariables } : {}), - ...(request.workflowVariables ? { workflowVariables: request.workflowVariables } : {}), - ...(request.blockData ? { blockData: request.blockData } : {}), - ...(request.blockNameMapping ? { blockNameMapping: request.blockNameMapping } : {}), + ...(request.environmentVariables + ? { envVars: normalizeStringRecord(request.environmentVariables) } + : {}), + ...(request.workflowVariables + ? { workflowVariables: normalizeWorkflowVariables(request.workflowVariables) } + : {}), + ...(request.blockData ? { blockData: normalizeRecord(request.blockData) } : {}), + ...(request.blockNameMapping + ? { blockNameMapping: normalizeStringRecord(request.blockNameMapping) } + : {}), ...(tool.parameters ? { _toolSchema: tool.parameters } : {}), } diff --git a/apps/sim/tools/function/execute.ts b/apps/sim/tools/function/execute.ts index 59873843cbb..7ee26f5c4b3 100644 --- a/apps/sim/tools/function/execute.ts +++ b/apps/sim/tools/function/execute.ts @@ -1,3 +1,9 @@ +import { + normalizeRecord, + normalizeRecordMap, + normalizeStringRecord, + normalizeWorkflowVariables, +} from '@/lib/core/utils/records' import { DEFAULT_EXECUTION_TIMEOUT_MS } from '@/lib/execution/constants' import { DEFAULT_CODE_LANGUAGE } from '@/lib/execution/languages' import type { CodeExecutionInput, CodeExecutionOutput } from '@/tools/function/types' @@ -123,12 +129,12 @@ export const functionExecuteTool: ToolConfig