Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
247 changes: 246 additions & 1 deletion apps/sim/executor/handlers/condition/condition-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,8 @@ describe('ConditionBlockHandler', () => {

await handler.execute(mockContext, mockBlock, inputs)

expect(mockCollectBlockData).toHaveBeenCalledWith(mockContext)
// collectBlockData is now called with the current node ID for parallel branch context
expect(mockCollectBlockData).toHaveBeenCalledWith(mockContext, mockBlock.id)
})

it('should handle function_execute tool failure', async () => {
Expand Down Expand Up @@ -620,4 +621,248 @@ describe('ConditionBlockHandler', () => {
expect(mockContext.decisions.condition.has(mockBlock.id)).toBe(false)
})
})

describe('Parallel branch handling', () => {
it('should resolve connections and block data correctly when inside a parallel branch', async () => {
// Simulate a condition block inside a parallel branch
// Virtual block ID uses subscript notation: blockId₍branchIndex₎
const parallelConditionBlock: SerializedBlock = {
id: 'cond-block-1₍0₎', // Virtual ID for branch 0
metadata: { id: 'condition', name: 'Condition' },
position: { x: 0, y: 0 },
config: {},
}

// Source block also has a virtual ID in the same branch
const sourceBlockVirtualId = 'agent-block-1₍0₎'

// Set up workflow with connections using BASE block IDs (as they are in the workflow definition)
const parallelWorkflow: SerializedWorkflow = {
blocks: [
{
id: 'agent-block-1',
metadata: { id: 'agent', name: 'Agent' },
position: { x: 0, y: 0 },
config: {},
},
{
id: 'cond-block-1',
metadata: { id: 'condition', name: 'Condition' },
position: { x: 100, y: 0 },
config: {},
},
{
id: 'target-block-1',
metadata: { id: 'api', name: 'Target' },
position: { x: 200, y: 0 },
config: {},
},
],
connections: [
// Connections use base IDs, not virtual IDs
{ source: 'agent-block-1', target: 'cond-block-1' },
{ source: 'cond-block-1', target: 'target-block-1', sourceHandle: 'condition-cond1' },
],
loops: [],
parallels: [],
}

// Block states use virtual IDs (as outputs are stored per-branch)
const parallelBlockStates = new Map<string, BlockState>([
[
sourceBlockVirtualId,
{ output: { response: 'hello from branch 0', success: true }, executed: true },
],
])

const parallelContext: ExecutionContext = {
workflowId: 'test-workflow-id',
workspaceId: 'test-workspace-id',
workflow: parallelWorkflow,
blockStates: parallelBlockStates,
blockLogs: [],
completedBlocks: new Set(),
decisions: {
router: new Map(),
condition: new Map(),
},
environmentVariables: {},
workflowVariables: {},
}

const conditions = [
{ id: 'cond1', title: 'if', value: 'context.response === "hello from branch 0"' },
{ id: 'else1', title: 'else', value: '' },
]
const inputs = { conditions: JSON.stringify(conditions) }

const result = await handler.execute(parallelContext, parallelConditionBlock, inputs)

// The condition should evaluate to true because:
// 1. Connection lookup uses base ID 'cond-block-1' (extracted from 'cond-block-1₍0₎')
// 2. Source block output is found at virtual ID 'agent-block-1₍0₎' (same branch)
// 3. The evaluation context contains { response: 'hello from branch 0' }
expect((result as any).conditionResult).toBe(true)
expect((result as any).selectedOption).toBe('cond1')
expect((result as any).selectedPath).toEqual({
blockId: 'target-block-1',
blockType: 'api',
blockTitle: 'Target',
})
})

it('should find correct source block output in parallel branch context', async () => {
// Test that when multiple branches exist, the correct branch output is used
const parallelConditionBlock: SerializedBlock = {
id: 'cond-block-1₍1₎', // Virtual ID for branch 1
metadata: { id: 'condition', name: 'Condition' },
position: { x: 0, y: 0 },
config: {},
}

const parallelWorkflow: SerializedWorkflow = {
blocks: [
{
id: 'agent-block-1',
metadata: { id: 'agent', name: 'Agent' },
position: { x: 0, y: 0 },
config: {},
},
{
id: 'cond-block-1',
metadata: { id: 'condition', name: 'Condition' },
position: { x: 100, y: 0 },
config: {},
},
{
id: 'target-block-1',
metadata: { id: 'api', name: 'Target' },
position: { x: 200, y: 0 },
config: {},
},
],
connections: [
{ source: 'agent-block-1', target: 'cond-block-1' },
{ source: 'cond-block-1', target: 'target-block-1', sourceHandle: 'condition-cond1' },
],
loops: [],
parallels: [],
}

// Multiple branches have executed - each has different output
const parallelBlockStates = new Map<string, BlockState>([
['agent-block-1₍0₎', { output: { value: 10 }, executed: true }],
['agent-block-1₍1₎', { output: { value: 25 }, executed: true }], // Branch 1 has value 25
['agent-block-1₍2₎', { output: { value: 5 }, executed: true }],
])

const parallelContext: ExecutionContext = {
workflowId: 'test-workflow-id',
workspaceId: 'test-workspace-id',
workflow: parallelWorkflow,
blockStates: parallelBlockStates,
blockLogs: [],
completedBlocks: new Set(),
decisions: {
router: new Map(),
condition: new Map(),
},
environmentVariables: {},
workflowVariables: {},
}

// Condition checks if value > 20 - should be true for branch 1 (value=25)
const conditions = [
{ id: 'cond1', title: 'if', value: 'context.value > 20' },
{ id: 'else1', title: 'else', value: '' },
]
const inputs = { conditions: JSON.stringify(conditions) }

const result = await handler.execute(parallelContext, parallelConditionBlock, inputs)

// Should evaluate using branch 1's data (value=25), not branch 0 (value=10) or branch 2 (value=5)
expect((result as any).conditionResult).toBe(true)
expect((result as any).selectedOption).toBe('cond1')
})

it('should fall back to else when condition is false in parallel branch', async () => {
const parallelConditionBlock: SerializedBlock = {
id: 'cond-block-1₍2₎', // Virtual ID for branch 2
metadata: { id: 'condition', name: 'Condition' },
position: { x: 0, y: 0 },
config: {},
}

const parallelWorkflow: SerializedWorkflow = {
blocks: [
{
id: 'agent-block-1',
metadata: { id: 'agent', name: 'Agent' },
position: { x: 0, y: 0 },
config: {},
},
{
id: 'cond-block-1',
metadata: { id: 'condition', name: 'Condition' },
position: { x: 100, y: 0 },
config: {},
},
{
id: 'target-true',
metadata: { id: 'api', name: 'True Path' },
position: { x: 200, y: 0 },
config: {},
},
{
id: 'target-false',
metadata: { id: 'api', name: 'False Path' },
position: { x: 200, y: 100 },
config: {},
},
],
connections: [
{ source: 'agent-block-1', target: 'cond-block-1' },
{ source: 'cond-block-1', target: 'target-true', sourceHandle: 'condition-cond1' },
{ source: 'cond-block-1', target: 'target-false', sourceHandle: 'condition-else1' },
],
loops: [],
parallels: [],
}

const parallelBlockStates = new Map<string, BlockState>([
['agent-block-1₍0₎', { output: { value: 100 }, executed: true }],
['agent-block-1₍1₎', { output: { value: 50 }, executed: true }],
['agent-block-1₍2₎', { output: { value: 5 }, executed: true }], // Branch 2 has value 5
])

const parallelContext: ExecutionContext = {
workflowId: 'test-workflow-id',
workspaceId: 'test-workspace-id',
workflow: parallelWorkflow,
blockStates: parallelBlockStates,
blockLogs: [],
completedBlocks: new Set(),
decisions: {
router: new Map(),
condition: new Map(),
},
environmentVariables: {},
workflowVariables: {},
}

// Condition checks if value > 20 - should be false for branch 2 (value=5)
const conditions = [
{ id: 'cond1', title: 'if', value: 'context.value > 20' },
{ id: 'else1', title: 'else', value: '' },
]
const inputs = { conditions: JSON.stringify(conditions) }

const result = await handler.execute(parallelContext, parallelConditionBlock, inputs)

// Should fall back to else path because branch 2's value (5) is not > 20
expect((result as any).conditionResult).toBe(true)
expect((result as any).selectedOption).toBe('else1')
expect((result as any).selectedPath.blockId).toBe('target-false')
})
})
})
38 changes: 31 additions & 7 deletions apps/sim/executor/handlers/condition/condition-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ import type { BlockOutput } from '@/blocks/types'
import { BlockType, CONDITION, DEFAULTS, EDGE } from '@/executor/constants'
import type { BlockHandler, ExecutionContext } from '@/executor/types'
import { collectBlockData } from '@/executor/utils/block-data'
import {
buildBranchNodeId,
extractBaseBlockId,
extractBranchIndex,
isBranchNodeId,
} from '@/executor/utils/subflow-utils'
import type { SerializedBlock } from '@/serializer/types'
import { executeTool } from '@/tools'

Expand All @@ -18,15 +24,16 @@ const CONDITION_TIMEOUT_MS = 5000
export async function evaluateConditionExpression(
ctx: ExecutionContext,
conditionExpression: string,
providedEvalContext?: Record<string, any>
providedEvalContext?: Record<string, any>,
currentNodeId?: string
): Promise<boolean> {
const evalContext = providedEvalContext || {}

try {
const contextSetup = `const context = ${JSON.stringify(evalContext)};`
const code = `${contextSetup}\nreturn Boolean(${conditionExpression})`

const { blockData, blockNameMapping, blockOutputSchemas } = collectBlockData(ctx)
const { blockData, blockNameMapping, blockOutputSchemas } = collectBlockData(ctx, currentNodeId)

const result = await executeTool(
'function_execute',
Expand Down Expand Up @@ -83,21 +90,36 @@ export class ConditionBlockHandler implements BlockHandler {
): Promise<BlockOutput> {
const conditions = this.parseConditions(inputs.conditions)

const sourceBlockId = ctx.workflow?.connections.find((conn) => conn.target === block.id)?.source
const baseBlockId = extractBaseBlockId(block.id)
const branchIndex = isBranchNodeId(block.id) ? extractBranchIndex(block.id) : null

const sourceConnection = ctx.workflow?.connections.find((conn) => conn.target === baseBlockId)
let sourceBlockId = sourceConnection?.source

if (sourceBlockId && branchIndex !== null) {
const virtualSourceId = buildBranchNodeId(sourceBlockId, branchIndex)
if (ctx.blockStates.has(virtualSourceId)) {
sourceBlockId = virtualSourceId
}
}

const evalContext = this.buildEvaluationContext(ctx, sourceBlockId)
const rawSourceOutput = sourceBlockId ? ctx.blockStates.get(sourceBlockId)?.output : null

// Filter out _pauseMetadata from source output to prevent the engine from
// thinking this block is pausing (it was already resumed by the HITL block)
const sourceOutput = this.filterPauseMetadata(rawSourceOutput)

const outgoingConnections = ctx.workflow?.connections.filter((conn) => conn.source === block.id)
const outgoingConnections = ctx.workflow?.connections.filter(
(conn) => conn.source === baseBlockId
)

const { selectedConnection, selectedCondition } = await this.evaluateConditions(
conditions,
outgoingConnections || [],
evalContext,
ctx
ctx,
block.id
)

if (!selectedConnection || !selectedCondition) {
Expand Down Expand Up @@ -170,7 +192,8 @@ export class ConditionBlockHandler implements BlockHandler {
conditions: Array<{ id: string; title: string; value: string }>,
outgoingConnections: Array<{ source: string; target: string; sourceHandle?: string }>,
evalContext: Record<string, any>,
ctx: ExecutionContext
ctx: ExecutionContext,
currentNodeId?: string
): Promise<{
selectedConnection: { target: string; sourceHandle?: string } | null
selectedCondition: { id: string; title: string; value: string } | null
Expand All @@ -189,7 +212,8 @@ export class ConditionBlockHandler implements BlockHandler {
const conditionMet = await evaluateConditionExpression(
ctx,
conditionValueString,
evalContext
evalContext,
currentNodeId
)

if (conditionMet) {
Expand Down
23 changes: 22 additions & 1 deletion apps/sim/executor/utils/block-data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ import { normalizeInputFormatValue } from '@/lib/workflows/input-format'
import { isTriggerBehavior, normalizeName } from '@/executor/constants'
import type { ExecutionContext } from '@/executor/types'
import type { OutputSchema } from '@/executor/utils/block-reference'
import {
extractBaseBlockId,
extractBranchIndex,
isBranchNodeId,
} from '@/executor/utils/subflow-utils'
import type { SerializedBlock } from '@/serializer/types'
import type { ToolConfig } from '@/tools/types'
import { getTool } from '@/tools/utils'
Expand Down Expand Up @@ -86,14 +91,30 @@ export function getBlockSchema(
return undefined
}

export function collectBlockData(ctx: ExecutionContext): BlockDataCollection {
export function collectBlockData(
ctx: ExecutionContext,
currentNodeId?: string
): BlockDataCollection {
const blockData: Record<string, unknown> = {}
const blockNameMapping: Record<string, string> = {}
const blockOutputSchemas: Record<string, OutputSchema> = {}

const branchIndex =
currentNodeId && isBranchNodeId(currentNodeId) ? extractBranchIndex(currentNodeId) : null

for (const [id, state] of ctx.blockStates.entries()) {
if (state.output !== undefined) {
blockData[id] = state.output

if (branchIndex !== null && isBranchNodeId(id)) {
const stateBranchIndex = extractBranchIndex(id)
if (stateBranchIndex === branchIndex) {
const baseId = extractBaseBlockId(id)
if (blockData[baseId] === undefined) {
blockData[baseId] = state.output
}
}
}
}
}

Expand Down