Skip to content

Commit cf2f1ab

Browse files
fix(executor): condition inside parallel (#3094)
* fix(executor): condition inside parallel * remove comments
1 parent 4109fee commit cf2f1ab

File tree

3 files changed

+299
-9
lines changed

3 files changed

+299
-9
lines changed

apps/sim/executor/handlers/condition/condition-handler.test.ts

Lines changed: 246 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,8 @@ describe('ConditionBlockHandler', () => {
322322

323323
await handler.execute(mockContext, mockBlock, inputs)
324324

325-
expect(mockCollectBlockData).toHaveBeenCalledWith(mockContext)
325+
// collectBlockData is now called with the current node ID for parallel branch context
326+
expect(mockCollectBlockData).toHaveBeenCalledWith(mockContext, mockBlock.id)
326327
})
327328

328329
it('should handle function_execute tool failure', async () => {
@@ -620,4 +621,248 @@ describe('ConditionBlockHandler', () => {
620621
expect(mockContext.decisions.condition.has(mockBlock.id)).toBe(false)
621622
})
622623
})
624+
625+
describe('Parallel branch handling', () => {
626+
it('should resolve connections and block data correctly when inside a parallel branch', async () => {
627+
// Simulate a condition block inside a parallel branch
628+
// Virtual block ID uses subscript notation: blockId₍branchIndex₎
629+
const parallelConditionBlock: SerializedBlock = {
630+
id: 'cond-block-1₍0₎', // Virtual ID for branch 0
631+
metadata: { id: 'condition', name: 'Condition' },
632+
position: { x: 0, y: 0 },
633+
config: {},
634+
}
635+
636+
// Source block also has a virtual ID in the same branch
637+
const sourceBlockVirtualId = 'agent-block-1₍0₎'
638+
639+
// Set up workflow with connections using BASE block IDs (as they are in the workflow definition)
640+
const parallelWorkflow: SerializedWorkflow = {
641+
blocks: [
642+
{
643+
id: 'agent-block-1',
644+
metadata: { id: 'agent', name: 'Agent' },
645+
position: { x: 0, y: 0 },
646+
config: {},
647+
},
648+
{
649+
id: 'cond-block-1',
650+
metadata: { id: 'condition', name: 'Condition' },
651+
position: { x: 100, y: 0 },
652+
config: {},
653+
},
654+
{
655+
id: 'target-block-1',
656+
metadata: { id: 'api', name: 'Target' },
657+
position: { x: 200, y: 0 },
658+
config: {},
659+
},
660+
],
661+
connections: [
662+
// Connections use base IDs, not virtual IDs
663+
{ source: 'agent-block-1', target: 'cond-block-1' },
664+
{ source: 'cond-block-1', target: 'target-block-1', sourceHandle: 'condition-cond1' },
665+
],
666+
loops: [],
667+
parallels: [],
668+
}
669+
670+
// Block states use virtual IDs (as outputs are stored per-branch)
671+
const parallelBlockStates = new Map<string, BlockState>([
672+
[
673+
sourceBlockVirtualId,
674+
{ output: { response: 'hello from branch 0', success: true }, executed: true },
675+
],
676+
])
677+
678+
const parallelContext: ExecutionContext = {
679+
workflowId: 'test-workflow-id',
680+
workspaceId: 'test-workspace-id',
681+
workflow: parallelWorkflow,
682+
blockStates: parallelBlockStates,
683+
blockLogs: [],
684+
completedBlocks: new Set(),
685+
decisions: {
686+
router: new Map(),
687+
condition: new Map(),
688+
},
689+
environmentVariables: {},
690+
workflowVariables: {},
691+
}
692+
693+
const conditions = [
694+
{ id: 'cond1', title: 'if', value: 'context.response === "hello from branch 0"' },
695+
{ id: 'else1', title: 'else', value: '' },
696+
]
697+
const inputs = { conditions: JSON.stringify(conditions) }
698+
699+
const result = await handler.execute(parallelContext, parallelConditionBlock, inputs)
700+
701+
// The condition should evaluate to true because:
702+
// 1. Connection lookup uses base ID 'cond-block-1' (extracted from 'cond-block-1₍0₎')
703+
// 2. Source block output is found at virtual ID 'agent-block-1₍0₎' (same branch)
704+
// 3. The evaluation context contains { response: 'hello from branch 0' }
705+
expect((result as any).conditionResult).toBe(true)
706+
expect((result as any).selectedOption).toBe('cond1')
707+
expect((result as any).selectedPath).toEqual({
708+
blockId: 'target-block-1',
709+
blockType: 'api',
710+
blockTitle: 'Target',
711+
})
712+
})
713+
714+
it('should find correct source block output in parallel branch context', async () => {
715+
// Test that when multiple branches exist, the correct branch output is used
716+
const parallelConditionBlock: SerializedBlock = {
717+
id: 'cond-block-1₍1₎', // Virtual ID for branch 1
718+
metadata: { id: 'condition', name: 'Condition' },
719+
position: { x: 0, y: 0 },
720+
config: {},
721+
}
722+
723+
const parallelWorkflow: SerializedWorkflow = {
724+
blocks: [
725+
{
726+
id: 'agent-block-1',
727+
metadata: { id: 'agent', name: 'Agent' },
728+
position: { x: 0, y: 0 },
729+
config: {},
730+
},
731+
{
732+
id: 'cond-block-1',
733+
metadata: { id: 'condition', name: 'Condition' },
734+
position: { x: 100, y: 0 },
735+
config: {},
736+
},
737+
{
738+
id: 'target-block-1',
739+
metadata: { id: 'api', name: 'Target' },
740+
position: { x: 200, y: 0 },
741+
config: {},
742+
},
743+
],
744+
connections: [
745+
{ source: 'agent-block-1', target: 'cond-block-1' },
746+
{ source: 'cond-block-1', target: 'target-block-1', sourceHandle: 'condition-cond1' },
747+
],
748+
loops: [],
749+
parallels: [],
750+
}
751+
752+
// Multiple branches have executed - each has different output
753+
const parallelBlockStates = new Map<string, BlockState>([
754+
['agent-block-1₍0₎', { output: { value: 10 }, executed: true }],
755+
['agent-block-1₍1₎', { output: { value: 25 }, executed: true }], // Branch 1 has value 25
756+
['agent-block-1₍2₎', { output: { value: 5 }, executed: true }],
757+
])
758+
759+
const parallelContext: ExecutionContext = {
760+
workflowId: 'test-workflow-id',
761+
workspaceId: 'test-workspace-id',
762+
workflow: parallelWorkflow,
763+
blockStates: parallelBlockStates,
764+
blockLogs: [],
765+
completedBlocks: new Set(),
766+
decisions: {
767+
router: new Map(),
768+
condition: new Map(),
769+
},
770+
environmentVariables: {},
771+
workflowVariables: {},
772+
}
773+
774+
// Condition checks if value > 20 - should be true for branch 1 (value=25)
775+
const conditions = [
776+
{ id: 'cond1', title: 'if', value: 'context.value > 20' },
777+
{ id: 'else1', title: 'else', value: '' },
778+
]
779+
const inputs = { conditions: JSON.stringify(conditions) }
780+
781+
const result = await handler.execute(parallelContext, parallelConditionBlock, inputs)
782+
783+
// Should evaluate using branch 1's data (value=25), not branch 0 (value=10) or branch 2 (value=5)
784+
expect((result as any).conditionResult).toBe(true)
785+
expect((result as any).selectedOption).toBe('cond1')
786+
})
787+
788+
it('should fall back to else when condition is false in parallel branch', async () => {
789+
const parallelConditionBlock: SerializedBlock = {
790+
id: 'cond-block-1₍2₎', // Virtual ID for branch 2
791+
metadata: { id: 'condition', name: 'Condition' },
792+
position: { x: 0, y: 0 },
793+
config: {},
794+
}
795+
796+
const parallelWorkflow: SerializedWorkflow = {
797+
blocks: [
798+
{
799+
id: 'agent-block-1',
800+
metadata: { id: 'agent', name: 'Agent' },
801+
position: { x: 0, y: 0 },
802+
config: {},
803+
},
804+
{
805+
id: 'cond-block-1',
806+
metadata: { id: 'condition', name: 'Condition' },
807+
position: { x: 100, y: 0 },
808+
config: {},
809+
},
810+
{
811+
id: 'target-true',
812+
metadata: { id: 'api', name: 'True Path' },
813+
position: { x: 200, y: 0 },
814+
config: {},
815+
},
816+
{
817+
id: 'target-false',
818+
metadata: { id: 'api', name: 'False Path' },
819+
position: { x: 200, y: 100 },
820+
config: {},
821+
},
822+
],
823+
connections: [
824+
{ source: 'agent-block-1', target: 'cond-block-1' },
825+
{ source: 'cond-block-1', target: 'target-true', sourceHandle: 'condition-cond1' },
826+
{ source: 'cond-block-1', target: 'target-false', sourceHandle: 'condition-else1' },
827+
],
828+
loops: [],
829+
parallels: [],
830+
}
831+
832+
const parallelBlockStates = new Map<string, BlockState>([
833+
['agent-block-1₍0₎', { output: { value: 100 }, executed: true }],
834+
['agent-block-1₍1₎', { output: { value: 50 }, executed: true }],
835+
['agent-block-1₍2₎', { output: { value: 5 }, executed: true }], // Branch 2 has value 5
836+
])
837+
838+
const parallelContext: ExecutionContext = {
839+
workflowId: 'test-workflow-id',
840+
workspaceId: 'test-workspace-id',
841+
workflow: parallelWorkflow,
842+
blockStates: parallelBlockStates,
843+
blockLogs: [],
844+
completedBlocks: new Set(),
845+
decisions: {
846+
router: new Map(),
847+
condition: new Map(),
848+
},
849+
environmentVariables: {},
850+
workflowVariables: {},
851+
}
852+
853+
// Condition checks if value > 20 - should be false for branch 2 (value=5)
854+
const conditions = [
855+
{ id: 'cond1', title: 'if', value: 'context.value > 20' },
856+
{ id: 'else1', title: 'else', value: '' },
857+
]
858+
const inputs = { conditions: JSON.stringify(conditions) }
859+
860+
const result = await handler.execute(parallelContext, parallelConditionBlock, inputs)
861+
862+
// Should fall back to else path because branch 2's value (5) is not > 20
863+
expect((result as any).conditionResult).toBe(true)
864+
expect((result as any).selectedOption).toBe('else1')
865+
expect((result as any).selectedPath.blockId).toBe('target-false')
866+
})
867+
})
623868
})

apps/sim/executor/handlers/condition/condition-handler.ts

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,12 @@ import type { BlockOutput } from '@/blocks/types'
33
import { BlockType, CONDITION, DEFAULTS, EDGE } from '@/executor/constants'
44
import type { BlockHandler, ExecutionContext } from '@/executor/types'
55
import { collectBlockData } from '@/executor/utils/block-data'
6+
import {
7+
buildBranchNodeId,
8+
extractBaseBlockId,
9+
extractBranchIndex,
10+
isBranchNodeId,
11+
} from '@/executor/utils/subflow-utils'
612
import type { SerializedBlock } from '@/serializer/types'
713
import { executeTool } from '@/tools'
814

@@ -18,15 +24,16 @@ const CONDITION_TIMEOUT_MS = 5000
1824
export async function evaluateConditionExpression(
1925
ctx: ExecutionContext,
2026
conditionExpression: string,
21-
providedEvalContext?: Record<string, any>
27+
providedEvalContext?: Record<string, any>,
28+
currentNodeId?: string
2229
): Promise<boolean> {
2330
const evalContext = providedEvalContext || {}
2431

2532
try {
2633
const contextSetup = `const context = ${JSON.stringify(evalContext)};`
2734
const code = `${contextSetup}\nreturn Boolean(${conditionExpression})`
2835

29-
const { blockData, blockNameMapping, blockOutputSchemas } = collectBlockData(ctx)
36+
const { blockData, blockNameMapping, blockOutputSchemas } = collectBlockData(ctx, currentNodeId)
3037

3138
const result = await executeTool(
3239
'function_execute',
@@ -83,21 +90,36 @@ export class ConditionBlockHandler implements BlockHandler {
8390
): Promise<BlockOutput> {
8491
const conditions = this.parseConditions(inputs.conditions)
8592

86-
const sourceBlockId = ctx.workflow?.connections.find((conn) => conn.target === block.id)?.source
93+
const baseBlockId = extractBaseBlockId(block.id)
94+
const branchIndex = isBranchNodeId(block.id) ? extractBranchIndex(block.id) : null
95+
96+
const sourceConnection = ctx.workflow?.connections.find((conn) => conn.target === baseBlockId)
97+
let sourceBlockId = sourceConnection?.source
98+
99+
if (sourceBlockId && branchIndex !== null) {
100+
const virtualSourceId = buildBranchNodeId(sourceBlockId, branchIndex)
101+
if (ctx.blockStates.has(virtualSourceId)) {
102+
sourceBlockId = virtualSourceId
103+
}
104+
}
105+
87106
const evalContext = this.buildEvaluationContext(ctx, sourceBlockId)
88107
const rawSourceOutput = sourceBlockId ? ctx.blockStates.get(sourceBlockId)?.output : null
89108

90109
// Filter out _pauseMetadata from source output to prevent the engine from
91110
// thinking this block is pausing (it was already resumed by the HITL block)
92111
const sourceOutput = this.filterPauseMetadata(rawSourceOutput)
93112

94-
const outgoingConnections = ctx.workflow?.connections.filter((conn) => conn.source === block.id)
113+
const outgoingConnections = ctx.workflow?.connections.filter(
114+
(conn) => conn.source === baseBlockId
115+
)
95116

96117
const { selectedConnection, selectedCondition } = await this.evaluateConditions(
97118
conditions,
98119
outgoingConnections || [],
99120
evalContext,
100-
ctx
121+
ctx,
122+
block.id
101123
)
102124

103125
if (!selectedConnection || !selectedCondition) {
@@ -170,7 +192,8 @@ export class ConditionBlockHandler implements BlockHandler {
170192
conditions: Array<{ id: string; title: string; value: string }>,
171193
outgoingConnections: Array<{ source: string; target: string; sourceHandle?: string }>,
172194
evalContext: Record<string, any>,
173-
ctx: ExecutionContext
195+
ctx: ExecutionContext,
196+
currentNodeId?: string
174197
): Promise<{
175198
selectedConnection: { target: string; sourceHandle?: string } | null
176199
selectedCondition: { id: string; title: string; value: string } | null
@@ -189,7 +212,8 @@ export class ConditionBlockHandler implements BlockHandler {
189212
const conditionMet = await evaluateConditionExpression(
190213
ctx,
191214
conditionValueString,
192-
evalContext
215+
evalContext,
216+
currentNodeId
193217
)
194218

195219
if (conditionMet) {

apps/sim/executor/utils/block-data.ts

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@ import { normalizeInputFormatValue } from '@/lib/workflows/input-format'
22
import { isTriggerBehavior, normalizeName } from '@/executor/constants'
33
import type { ExecutionContext } from '@/executor/types'
44
import type { OutputSchema } from '@/executor/utils/block-reference'
5+
import {
6+
extractBaseBlockId,
7+
extractBranchIndex,
8+
isBranchNodeId,
9+
} from '@/executor/utils/subflow-utils'
510
import type { SerializedBlock } from '@/serializer/types'
611
import type { ToolConfig } from '@/tools/types'
712
import { getTool } from '@/tools/utils'
@@ -86,14 +91,30 @@ export function getBlockSchema(
8691
return undefined
8792
}
8893

89-
export function collectBlockData(ctx: ExecutionContext): BlockDataCollection {
94+
export function collectBlockData(
95+
ctx: ExecutionContext,
96+
currentNodeId?: string
97+
): BlockDataCollection {
9098
const blockData: Record<string, unknown> = {}
9199
const blockNameMapping: Record<string, string> = {}
92100
const blockOutputSchemas: Record<string, OutputSchema> = {}
93101

102+
const branchIndex =
103+
currentNodeId && isBranchNodeId(currentNodeId) ? extractBranchIndex(currentNodeId) : null
104+
94105
for (const [id, state] of ctx.blockStates.entries()) {
95106
if (state.output !== undefined) {
96107
blockData[id] = state.output
108+
109+
if (branchIndex !== null && isBranchNodeId(id)) {
110+
const stateBranchIndex = extractBranchIndex(id)
111+
if (stateBranchIndex === branchIndex) {
112+
const baseId = extractBaseBlockId(id)
113+
if (blockData[baseId] === undefined) {
114+
blockData[baseId] = state.output
115+
}
116+
}
117+
}
97118
}
98119
}
99120

0 commit comments

Comments
 (0)