From ef375d015c0101e6055fb5af1bd8fd076a3584b2 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Thu, 12 Mar 2026 21:28:51 -0700 Subject: [PATCH 1/6] feat(rivetkit): add workflow try helpers and visualizer support --- .../actors/workflow/workflow-example-data.ts | 127 ++ .../workflow/workflow-to-xyflow.test.ts | 42 + .../actors/workflow/workflow-to-xyflow.ts | 1101 ++++++++++++----- .../actors/workflow/workflow-types.ts | 4 +- .../actors/workflow/workflow-visualizer.tsx | 9 +- .../actors/workflow/xyflow-nodes.stories.tsx | 21 + .../actors/workflow/xyflow-nodes.tsx | 142 ++- .../fixtures/driver-test-suite/registry.ts | 2 + .../fixtures/driver-test-suite/workflow.ts | 56 + .../driver-test-suite/tests/actor-workflow.ts | 24 + .../packages/rivetkit/src/workflow/context.ts | 61 + .../packages/rivetkit/src/workflow/mod.ts | 8 + .../packages/workflow-engine/QUICKSTART.md | 30 + .../workflow-engine/docs/control-flow.md | 33 + .../packages/workflow-engine/src/context.ts | 667 ++++++++-- .../packages/workflow-engine/src/index.ts | 8 + .../packages/workflow-engine/src/types.ts | 53 + .../workflow-engine/tests/join.test.ts | 60 + .../workflow-engine/tests/race.test.ts | 69 ++ .../workflow-engine/tests/try.test.ts | 342 +++++ website/src/content/docs/actors/workflows.mdx | 70 ++ 21 files changed, 2556 insertions(+), 373 deletions(-) create mode 100644 frontend/src/components/actors/workflow/workflow-to-xyflow.test.ts create mode 100644 rivetkit-typescript/packages/workflow-engine/tests/try.test.ts diff --git a/frontend/src/components/actors/workflow/workflow-example-data.ts b/frontend/src/components/actors/workflow/workflow-example-data.ts index 53710cda70..88da299695 100644 --- a/frontend/src/components/actors/workflow/workflow-example-data.ts +++ b/frontend/src/components/actors/workflow/workflow-example-data.ts @@ -1178,6 +1178,133 @@ export const inProgressWorkflow: WorkflowHistory = { ], }; +// Workflow with tryStep and try block recovery +export const tryWorkflow: WorkflowHistory = { + workflowId: "try-workflow-001", + state: "completed", + nameRegistry: [ + "prepare", + "charge-card", + "payment-flow", + "parallel-verification", + "fraud", + "score-order", + "inventory", + "reserve-stock", + "finalize", + ], + input: { orderId: "ord_123", amount: 4200 }, + output: { status: "manual-review", recovered: true }, + history: [ + { + key: "prepare", + entry: { + id: "1", + location: [0], + kind: { + type: "step", + data: { output: { ready: true } }, + }, + dirty: false, + status: "completed", + startedAt: 1700000450000, + completedAt: 1700000450045, + }, + }, + { + key: "charge-card", + entry: { + id: "2", + location: [1], + kind: { + type: "step", + data: { error: "card declined" }, + }, + dirty: false, + status: "failed", + startedAt: 1700000450100, + completedAt: 1700000450280, + retryCount: 0, + error: "card declined", + }, + }, + { + key: "payment-flow/parallel-verification", + entry: { + id: "3", + location: [2, 3], + kind: { + type: "join", + data: { + branches: { + fraud: { + status: "completed", + output: { risk: "low" }, + }, + inventory: { + status: "failed", + error: "inventory mismatch", + }, + }, + }, + }, + dirty: false, + status: "failed", + startedAt: 1700000450400, + completedAt: 1700000450850, + error: "join failed", + }, + }, + { + key: "payment-flow/parallel-verification/fraud/score-order", + entry: { + id: "4", + location: [2, 3, 4, 5], + kind: { + type: "step", + data: { output: { score: 0.08 } }, + }, + dirty: false, + status: "completed", + startedAt: 1700000450410, + completedAt: 1700000450620, + }, + }, + { + key: "payment-flow/parallel-verification/inventory/reserve-stock", + entry: { + id: "5", + location: [2, 3, 6, 7], + kind: { + type: "step", + data: { error: "inventory mismatch" }, + }, + dirty: false, + status: "failed", + startedAt: 1700000450415, + completedAt: 1700000450790, + retryCount: 1, + error: "inventory mismatch", + }, + }, + { + key: "finalize", + entry: { + id: "6", + location: [8], + kind: { + type: "step", + data: { output: { queuedManualReview: true } }, + }, + dirty: false, + status: "completed", + startedAt: 1700000450960, + completedAt: 1700000451010, + }, + }, + ], +}; + // Workflow with retrying step export const retryWorkflow: WorkflowHistory = { workflowId: "retry-workflow-001", diff --git a/frontend/src/components/actors/workflow/workflow-to-xyflow.test.ts b/frontend/src/components/actors/workflow/workflow-to-xyflow.test.ts new file mode 100644 index 0000000000..7ae11d2374 --- /dev/null +++ b/frontend/src/components/actors/workflow/workflow-to-xyflow.test.ts @@ -0,0 +1,42 @@ +import { describe, expect, it } from "vitest"; +import { tryWorkflow } from "./workflow-example-data"; +import { workflowHistoryToXYFlow } from "./workflow-to-xyflow"; + +describe("workflowHistoryToXYFlow", () => { + it("renders synthetic try scopes instead of dropping nested try entries", () => { + const { nodes } = workflowHistoryToXYFlow(tryWorkflow); + + expect( + nodes.some( + (node) => + node.type === "tryGroup" && + node.data?.label === "payment-flow", + ), + ).toBe(true); + + expect( + nodes.some( + (node) => + node.type === "workflow" && + node.data?.label === "parallel-verification", + ), + ).toBe(true); + }); + + it("marks handled step failures so tryStep is visually distinct", () => { + const { nodes } = workflowHistoryToXYFlow(tryWorkflow); + const chargeCardNode = nodes.find( + (node) => + node.type === "workflow" && node.data?.label === "charge-card", + ); + const reserveStockNode = nodes.find( + (node) => + node.type === "workflow" && + node.data?.label === "reserve-stock", + ); + + expect(chargeCardNode?.data?.handledFailure).toBe(true); + expect(chargeCardNode?.data?.summary).toBe("handled error"); + expect(reserveStockNode?.data?.handledFailure).toBe(true); + }); +}); diff --git a/frontend/src/components/actors/workflow/workflow-to-xyflow.ts b/frontend/src/components/actors/workflow/workflow-to-xyflow.ts index 9bb45168d9..496bab233c 100644 --- a/frontend/src/components/actors/workflow/workflow-to-xyflow.ts +++ b/frontend/src/components/actors/workflow/workflow-to-xyflow.ts @@ -5,9 +5,9 @@ import type { ExtendedEntryType, HistoryItem, JoinEntry, - LoopEntry, - LoopIterationMarker, + Location, MessageEntry, + PathSegment, RaceEntry, RemovedEntry, SleepEntry, @@ -25,24 +25,24 @@ import { NODE_WIDTH, TERMINATION_NODE_SIZE, type TerminationNodeData, + type TryGroupNodeData, type WorkflowNodeData, } from "./xyflow-nodes"; -// ─── Constants ─────────────────────────────────────────────── - const NODE_GAP_Y = 48; const BRANCH_GAP_X = 60; const GROUP_WIDTH = NODE_WIDTH + 2 * LOOP_PADDING_X; - -// ─── Node types ────────────────────────────────────────────── +const TERMINATION_GAP = 24; type XYNode = Node; type XYLoopGroupNode = Node; +type XYTryGroupNode = Node; type XYBranchGroupNode = Node; type XYTerminationNode = Node; type AnyXYNode = | XYNode | XYLoopGroupNode + | XYTryGroupNode | XYBranchGroupNode | XYTerminationNode; @@ -56,6 +56,7 @@ type WorkflowNodeInput = { summary?: string; entryType: EntryKindType | "input" | "output"; status: EntryStatus; + handledFailure?: boolean; duration?: number; retryCount?: number; error?: string; @@ -66,17 +67,68 @@ type WorkflowNodeInput = { name?: string; }; -// ─── Helpers ───────────────────────────────────────────────── +type Bounds = { + minX: number; + minY: number; + maxX: number; + maxY: number; +}; + +interface LayoutFragment { + nodes: AnyXYNode[]; + edges: Edge[]; + bounds: Bounds | null; + firstTargetId?: string; + firstStartedAt?: number; + outgoingSources: string[]; + lastCompletedAt?: number; +} + +interface RenderTreeBase { + id: string; + key: string; + name: string; + location: Location; + children: RenderTreeNode[]; +} + +interface RenderTreeEntryNode extends RenderTreeBase { + kind: "entry"; + item: HistoryItem; +} + +interface RenderTreeTryNode extends RenderTreeBase { + kind: "try"; +} + +type RenderTreeNode = RenderTreeEntryNode | RenderTreeTryNode; + +interface LayoutContext { + workflow: WorkflowHistory; + hasTryAncestor: boolean; +} + +interface TreeStats { + handledFailureNames: string[]; + unhandledFailureNames: string[]; + hasRunning: boolean; + hasPending: boolean; +} function getDisplayName(key: string): string { const parts = key.split("/"); - return parts[parts.length - 1].replace(/^~\d+\//, ""); + return parts[parts.length - 1]?.replace(/^~\d+\//, "") ?? key; } -function getEntrySummary(type: ExtendedEntryType, data: unknown): string { +function getEntrySummary( + type: ExtendedEntryType, + data: unknown, + options?: { handledFailure?: boolean }, +): string { switch (type) { case "step": { const d = data as StepEntry; + if (options?.handledFailure) return "handled error"; if (d.error) return "error"; if (d.output === true) return "success"; if (typeof d.output === "number") return String(d.output); @@ -87,13 +139,13 @@ function getEntrySummary(type: ExtendedEntryType, data: unknown): string { case "message": return (data as MessageEntry).name.split(":").pop() || "received"; case "loop": - return `${(data as LoopEntry).iteration} iterations`; + return `${(data as { iteration: number }).iteration} iterations`; case "rollback_checkpoint": return "checkpoint"; case "join": { const d = data as JoinEntry; const done = Object.values(d.branches).filter( - (b) => b.status === "completed", + (branch) => branch.status === "completed", ).length; return `${done}/${Object.keys(d.branches).length} done`; } @@ -112,31 +164,49 @@ function getEntrySummary(type: ExtendedEntryType, data: unknown): string { } return String(d.value); } + case "try": + return "protected scope"; default: return ""; } } -/** Extract common node properties from a HistoryItem. */ -function itemToNodeData(item: HistoryItem) { - const { - startedAt, - completedAt, - kind, - status: rawStatus, - retryCount, - error, - } = item.entry; +function deriveEntryStatus(item: HistoryItem): EntryStatus { + const rawStatus = item.entry.status; + if (rawStatus) { + return rawStatus; + } + if (item.entry.completedAt != null) { + return "completed"; + } + if (item.entry.startedAt != null) { + return "running"; + } + return "pending"; +} + +function itemToNodeData( + item: HistoryItem, + _workflow: WorkflowHistory, + options?: { handledFailure?: boolean }, +) { + const { startedAt, completedAt, kind, retryCount, error } = item.entry; const duration = - startedAt && completedAt ? completedAt - startedAt : undefined; - const status: EntryStatus = - rawStatus || - (completedAt ? "completed" : startedAt ? "running" : "completed"); + startedAt != null && completedAt != null + ? completedAt - startedAt + : undefined; + const handledFailure = + options?.handledFailure && + (kind.type === "step" || deriveEntryStatus(item) === "failed"); + return { name: getDisplayName(item.key), - summary: getEntrySummary(kind.type, kind.data), + summary: getEntrySummary(kind.type, kind.data, { + handledFailure, + }), entryType: kind.type, - status, + status: deriveEntryStatus(item), + handledFailure, duration, startedAt, completedAt, @@ -147,27 +217,58 @@ function itemToNodeData(item: HistoryItem) { }; } -/** Sort history items by their last location segment. */ -function sortByLocation(items: HistoryItem[]) { - items.sort((a, b) => { - const aLoc = a.entry.location[a.entry.location.length - 1] as number; - const bLoc = b.entry.location[b.entry.location.length - 1] as number; - return aLoc - bLoc; - }); +function comparePathSegments(a: PathSegment, b: PathSegment): number { + if (typeof a === "number" && typeof b === "number") { + return a - b; + } + if (typeof a === "number") { + return -1; + } + if (typeof b === "number") { + return 1; + } + if (a.loop !== b.loop) { + return a.loop - b.loop; + } + return a.iteration - b.iteration; } -/** Calculate the height of a group container given a child count. */ -function groupHeight(childCount: number): number { - if (childCount <= 0) return LOOP_HEADER_HEIGHT + LOOP_PADDING_BOTTOM; - return ( - LOOP_HEADER_HEIGHT + - childCount * NODE_HEIGHT + - (childCount - 1) * NODE_GAP_Y + - LOOP_PADDING_BOTTOM - ); +function compareLocations(a: Location, b: Location): number { + const shared = Math.min(a.length, b.length); + for (let i = 0; i < shared; i++) { + const diff = comparePathSegments(a[i], b[i]); + if (diff !== 0) { + return diff; + } + } + return a.length - b.length; +} + +function serializePathSegment(segment: PathSegment): string { + if (typeof segment === "number") { + return `n${segment}`; + } + return `l${segment.loop}:${segment.iteration}`; +} + +function serializeLocation(location: Location): string { + return location.map(serializePathSegment).join("|"); +} + +function buildSyntheticKey( + location: Location, + nameRegistry: readonly string[], +): string { + return location + .map((segment) => { + if (typeof segment === "number") { + return nameRegistry[segment] ?? `unknown-${segment}`; + } + return `~${segment.iteration}`; + }) + .join("/"); } -/** Create a workflow node at the given position. */ function makeNode( id: string, x: number, @@ -184,6 +285,7 @@ function makeNode( summary: data.summary ?? "", entryType: data.entryType, status: data.status, + handledFailure: data.handledFailure, duration: data.duration, retryCount: data.retryCount, error: data.error, @@ -195,328 +297,751 @@ function makeNode( }; } -/** Create a child node inside a parent group. */ -function makeChildNode( - id: string, - parentId: string, - y: number, - data: Parameters[3], -): XYNode { - const node = makeNode(id, LOOP_PADDING_X, y, data); - (node as XYNode & { parentId: string }).parentId = parentId; - (node as XYNode & { extent: string }).extent = "parent"; - return node; +function measureNode(node: AnyXYNode): { width: number; height: number } { + const measuredWidth = node.measured?.width; + const measuredHeight = node.measured?.height; + const styleWidth = + typeof node.style?.width === "number" ? node.style.width : undefined; + const styleHeight = + typeof node.style?.height === "number" ? node.style.height : undefined; + + return { + width: measuredWidth ?? styleWidth ?? NODE_WIDTH, + height: measuredHeight ?? styleHeight ?? NODE_HEIGHT, + }; } -// ─── Main transform ────────────────────────────────────────── +function getNodeBounds(node: AnyXYNode): Bounds { + const { width, height } = measureNode(node); + return { + minX: node.position.x, + minY: node.position.y, + maxX: node.position.x + width, + maxY: node.position.y + height, + }; +} -export function workflowHistoryToXYFlow( - history: WorkflowHistory, -): LayoutResult { - const nodes: AnyXYNode[] = []; - const edges: Edge[] = []; +function mergeBounds(a: Bounds | null, b: Bounds | null): Bounds | null { + if (!a) return b; + if (!b) return a; + return { + minX: Math.min(a.minX, b.minX), + minY: Math.min(a.minY, b.minY), + maxX: Math.max(a.maxX, b.maxX), + maxY: Math.max(a.maxY, b.maxY), + }; +} + +function translateBounds( + bounds: Bounds | null, + dx: number, + dy: number, +): Bounds | null { + if (!bounds) { + return null; + } + return { + minX: bounds.minX + dx, + minY: bounds.minY + dy, + maxX: bounds.maxX + dx, + maxY: bounds.maxY + dy, + }; +} + +function translateFragment( + fragment: LayoutFragment, + dx: number, + dy: number, +): LayoutFragment { + return { + ...fragment, + nodes: fragment.nodes.map((node) => ({ + ...node, + position: { + x: node.position.x + dx, + y: node.position.y + dy, + }, + })), + bounds: translateBounds(fragment.bounds, dx, dy), + }; +} + +function gapLabel( + completedAt: number | undefined, + startedAt: number | undefined, +): Pick | undefined { + if (completedAt == null || startedAt == null || startedAt <= completedAt) { + return undefined; + } + + return { + label: formatDuration(startedAt - completedAt), + style: { stroke: "hsl(var(--muted-foreground))" }, + labelStyle: { + fill: "hsl(var(--muted-foreground))", + fontSize: 10, + }, + labelBgStyle: { + fill: "hsl(var(--background))", + fillOpacity: 0.8, + }, + }; +} + +function isTerminalFailureStatus(status: EntryStatus): boolean { + return status === "failed" || status === "retrying"; +} + +function shouldMarkHandledFailure( + node: RenderTreeEntryNode, + context: LayoutContext, +): boolean { + if (node.item.entry.kind.type !== "step") { + return false; + } + + return ( + isTerminalFailureStatus(deriveEntryStatus(node.item)) && + (context.hasTryAncestor || context.workflow.state !== "failed") + ); +} + +function collectTreeStats( + nodes: readonly RenderTreeNode[], + workflow: WorkflowHistory, + hasTryAncestor: boolean, +): TreeStats { + const stats: TreeStats = { + handledFailureNames: [], + unhandledFailureNames: [], + hasRunning: false, + hasPending: false, + }; + + for (const node of nodes) { + if (node.kind === "try") { + const childStats = collectTreeStats(node.children, workflow, true); + stats.handledFailureNames.push(...childStats.handledFailureNames); + stats.unhandledFailureNames.push( + ...childStats.unhandledFailureNames, + ); + stats.hasRunning ||= childStats.hasRunning; + stats.hasPending ||= childStats.hasPending; + continue; + } + + const status = deriveEntryStatus(node.item); + if (status === "running") { + stats.hasRunning = true; + } + if (status === "pending") { + stats.hasPending = true; + } + if (isTerminalFailureStatus(status)) { + const isHandled = hasTryAncestor || workflow.state !== "failed"; + if (isHandled) { + stats.handledFailureNames.push(node.name); + } else { + stats.unhandledFailureNames.push(node.name); + } + } + + const childStats = collectTreeStats( + node.children, + workflow, + hasTryAncestor, + ); + stats.handledFailureNames.push(...childStats.handledFailureNames); + stats.unhandledFailureNames.push(...childStats.unhandledFailureNames); + stats.hasRunning ||= childStats.hasRunning; + stats.hasPending ||= childStats.hasPending; + } + + return stats; +} + +function summarizeTryGroup( + node: RenderTreeTryNode, + workflow: WorkflowHistory, +): { summary: string; handledFailureCount: number } { + const stats = collectTreeStats(node.children, workflow, true); + if (stats.handledFailureNames.length === 1) { + return { + summary: `caught ${stats.handledFailureNames[0]}`, + handledFailureCount: 1, + }; + } + if (stats.handledFailureNames.length > 1) { + return { + summary: `caught ${stats.handledFailureNames.length} failures`, + handledFailureCount: stats.handledFailureNames.length, + }; + } + if (stats.unhandledFailureNames.length > 0) { + return { + summary: + stats.unhandledFailureNames.length === 1 + ? `failed at ${stats.unhandledFailureNames[0]}` + : `${stats.unhandledFailureNames.length} failures`, + handledFailureCount: 0, + }; + } + if (stats.hasRunning) { + return { summary: "running", handledFailureCount: 0 }; + } + if (stats.hasPending) { + return { summary: "pending", handledFailureCount: 0 }; + } + return { summary: "completed", handledFailureCount: 0 }; +} + +function findBranchName( + location: Location, + containerLocation: Location, + nameRegistry: readonly string[], +): string | null { + const segment = location[containerLocation.length]; + if (typeof segment !== "number") { + return null; + } + return nameRegistry[segment] ?? `unknown-${segment}`; +} + +function buildRenderTree(workflow: WorkflowHistory): RenderTreeNode[] { + const concreteNodes = new Map(); + for (const item of workflow.history) { + concreteNodes.set(serializeLocation(item.entry.location), { + id: `entry:${item.entry.id}`, + kind: "entry", + item, + key: item.key, + name: getDisplayName(item.key), + location: item.entry.location, + children: [], + }); + } + + const renderNodes = new Map(concreteNodes); + + for (const item of workflow.history) { + for (let i = 1; i < item.entry.location.length; i++) { + const prefix = item.entry.location.slice(0, i); + const prefixKey = serializeLocation(prefix); + if (renderNodes.has(prefixKey)) { + continue; + } + + const last = prefix[prefix.length - 1]; + if (typeof last !== "number") { + continue; + } + + const parentPrefix = prefix.slice(0, -1); + const parentConcrete = concreteNodes.get( + serializeLocation(parentPrefix), + ); + if ( + parentConcrete && + (parentConcrete.item.entry.kind.type === "join" || + parentConcrete.item.entry.kind.type === "race") + ) { + continue; + } + + renderNodes.set(prefixKey, { + id: `try:${prefixKey}`, + kind: "try", + key: buildSyntheticKey(prefix, workflow.nameRegistry), + name: workflow.nameRegistry[last] ?? `unknown-${last}`, + location: prefix, + children: [], + }); + } + } + + const rootChildren: RenderTreeNode[] = []; + const orderedNodes = Array.from(renderNodes.values()).sort((a, b) => { + if (a.location.length !== b.location.length) { + return a.location.length - b.location.length; + } + return compareLocations(a.location, b.location); + }); - // Partition items into top-level and nested (grouped by parent key). - const topLevel: HistoryItem[] = []; - const nestedByParent = new Map(); + for (const node of orderedNodes) { + let parentNode: RenderTreeNode | undefined; + for (let i = node.location.length - 1; i > 0; i--) { + parentNode = renderNodes.get( + serializeLocation(node.location.slice(0, i)), + ); + if (parentNode) { + break; + } + } - for (const item of history.history) { - const loc = item.entry.location; - if (loc.length === 1 && typeof loc[0] === "number") { - topLevel.push(item); + if (parentNode) { + parentNode.children.push(node); } else { - const parentKey = item.key.split("/")[0]; - const siblings = nestedByParent.get(parentKey) ?? []; - siblings.push(item); - nestedByParent.set(parentKey, siblings); + rootChildren.push(node); } } - sortByLocation(topLevel); + const sortTree = (nodes: RenderTreeNode[]) => { + nodes.sort((a, b) => compareLocations(a.location, b.location)); + for (const node of nodes) { + sortTree(node.children); + } + }; + sortTree(rootChildren); + + return rootChildren; +} - // Cursor state for sequential layout. +function layoutSequence( + items: readonly RenderTreeNode[], + context: LayoutContext, +): LayoutFragment { + const nodes: AnyXYNode[] = []; + const edges: Edge[] = []; + let bounds: Bounds | null = null; let currentY = 0; + let firstTargetId: string | undefined; + let firstStartedAt: number | undefined; let prevNodeId: string | null = null; let prevCompletedAt: number | undefined; let pendingBranchSources: string[] = []; + let outgoingSources: string[] = []; + let lastCompletedAt: number | undefined; - /** Connect one or more predecessors to a target node, with optional gap labels. */ - function connectTo(targetId: string, targetStartedAt?: number) { + const connectTo = (targetId: string, targetStartedAt?: number) => { + if (!firstTargetId) { + firstTargetId = targetId; + firstStartedAt = targetStartedAt; + } if (prevNodeId) { - const gap = - prevCompletedAt && - targetStartedAt && - targetStartedAt > prevCompletedAt - ? formatDuration(targetStartedAt - prevCompletedAt) - : undefined; edges.push({ id: `e-${prevNodeId}-${targetId}`, source: prevNodeId, target: targetId, - ...(gap && { - label: gap, - style: { stroke: "hsl(var(--muted-foreground))" }, - labelStyle: { - fill: "hsl(var(--muted-foreground))", - fontSize: 10, - }, - labelBgStyle: { - fill: "hsl(var(--background))", - fillOpacity: 0.8, - }, - }), + ...gapLabel(prevCompletedAt, targetStartedAt), }); } - for (const srcId of pendingBranchSources) { + for (const source of pendingBranchSources) { edges.push({ - id: `e-${srcId}-${targetId}`, - source: srcId, + id: `e-${source}-${targetId}`, + source, target: targetId, }); } pendingBranchSources = []; - } - - /** Place a sequential node, connect it, and advance the cursor. */ - function addSequentialNode( - id: string, - data: Parameters[3], - startedAt?: number, - ) { - nodes.push(makeNode(id, 0, currentY, data)); - connectTo(id, startedAt); - prevNodeId = id; - prevCompletedAt = data.completedAt; - currentY += NODE_HEIGHT + NODE_GAP_Y; - } + }; - /** Chain a list of child nodes inside a parent group, connecting them sequentially. Returns the last child id. */ - function addChildChain( - items: HistoryItem[], - parentId: string, - startY: number, - ): { lastChildId: string | null; endY: number } { - let y = startY; - let lastId: string | null = null; - - for (const item of items) { - const d = itemToNodeData(item); - const id = `child-${item.entry.id}`; - nodes.push(makeChildNode(id, parentId, y, { ...d, label: d.name })); - - if (lastId) { - edges.push({ - id: `e-${lastId}-${id}`, - source: lastId, - target: id, - }); - } - lastId = id; - y += NODE_HEIGHT + NODE_GAP_Y; + const applyContinuation = ( + nextSources: string[], + completedAt: number | undefined, + ) => { + if (nextSources.length === 1) { + [prevNodeId] = nextSources; + pendingBranchSources = []; + } else if (nextSources.length > 1) { + prevNodeId = null; + pendingBranchSources = [...nextSources]; + } else { + prevNodeId = null; + pendingBranchSources = []; } + outgoingSources = [...nextSources]; + prevCompletedAt = completedAt; + lastCompletedAt = completedAt; + }; - return { lastChildId: lastId, endY: y }; - } - - // ── Input meta node ── + for (const item of items) { + if (item.kind === "try") { + const childFragment = layoutSequence(item.children, { + ...context, + hasTryAncestor: true, + }); + const childWidth = childFragment.bounds + ? childFragment.bounds.maxX - childFragment.bounds.minX + : NODE_WIDTH; + const groupWidth = Math.max( + GROUP_WIDTH, + childWidth + 2 * LOOP_PADDING_X, + ); + const groupX = NODE_WIDTH / 2 - groupWidth / 2; + const translatedChildren = translateFragment( + childFragment, + groupX + LOOP_PADDING_X - (childFragment.bounds?.minX ?? 0), + currentY + + LOOP_HEADER_HEIGHT - + (childFragment.bounds?.minY ?? 0), + ); + const groupHeight = translatedChildren.bounds + ? translatedChildren.bounds.maxY - + currentY + + LOOP_PADDING_BOTTOM + : LOOP_HEADER_HEIGHT + LOOP_PADDING_BOTTOM; + const summary = summarizeTryGroup(item, context.workflow); + const groupNode: XYTryGroupNode = { + id: item.id, + type: "tryGroup", + position: { x: groupX, y: currentY }, + measured: { width: groupWidth, height: groupHeight }, + style: { width: groupWidth, height: groupHeight }, + data: { + label: item.name, + summary: summary.summary, + handledFailureCount: summary.handledFailureCount, + }, + }; - if (history.input !== undefined) { - addSequentialNode("meta-input", { - label: "Input", - summary: getEntrySummary("input", { value: history.input }), - entryType: "input", - status: "completed", - nodeKey: "input", - rawData: { value: history.input }, - }); - } + connectTo(item.id, translatedChildren.firstStartedAt); + nodes.push(groupNode, ...translatedChildren.nodes); + edges.push(...translatedChildren.edges); + bounds = mergeBounds(bounds, getNodeBounds(groupNode)); + bounds = mergeBounds(bounds, translatedChildren.bounds); - // ── Main loop over top-level items ── + currentY += groupHeight + NODE_GAP_Y; + applyContinuation( + translatedChildren.outgoingSources.length > 0 + ? translatedChildren.outgoingSources + : [item.id], + translatedChildren.lastCompletedAt, + ); + continue; + } - for (const item of topLevel) { - const entryType = item.entry.kind.type; - const d = itemToNodeData(item); + const entry = item.item.entry; + const nodeData = itemToNodeData(item.item, context.workflow, { + handledFailure: shouldMarkHandledFailure(item, context), + }); - if (entryType === "loop") { - const loopId = `loop-${item.entry.id}`; - const children = collectLoopChildren( - nestedByParent.get(item.key) ?? [], + if (entry.kind.type === "loop") { + const childFragment = layoutSequence(item.children, context); + const childWidth = childFragment.bounds + ? childFragment.bounds.maxX - childFragment.bounds.minX + : NODE_WIDTH; + const groupWidth = Math.max( + GROUP_WIDTH, + childWidth + 2 * LOOP_PADDING_X, ); - const height = groupHeight(children.length); - - nodes.push({ - id: loopId, - type: "loopGroup", - position: { x: -LOOP_PADDING_X, y: currentY }, - measured: { width: GROUP_WIDTH, height }, - style: { width: GROUP_WIDTH, height }, - data: { label: d.name, summary: d.summary }, - } as XYLoopGroupNode); - - connectTo(loopId, d.startedAt); - - const { lastChildId } = addChildChain( - children, - loopId, - LOOP_HEADER_HEIGHT, + const groupX = NODE_WIDTH / 2 - groupWidth / 2; + const translatedChildren = translateFragment( + childFragment, + groupX + LOOP_PADDING_X - (childFragment.bounds?.minX ?? 0), + currentY + + LOOP_HEADER_HEIGHT - + (childFragment.bounds?.minY ?? 0), ); + const groupHeight = translatedChildren.bounds + ? translatedChildren.bounds.maxY - + currentY + + LOOP_PADDING_BOTTOM + : LOOP_HEADER_HEIGHT + LOOP_PADDING_BOTTOM; + const groupId = `loop-${entry.id}`; + const groupNode: XYLoopGroupNode = { + id: groupId, + type: "loopGroup", + position: { x: groupX, y: currentY }, + measured: { width: groupWidth, height: groupHeight }, + style: { width: groupWidth, height: groupHeight }, + data: { + label: item.name, + summary: nodeData.summary, + }, + }; - currentY += height + NODE_GAP_Y; - prevNodeId = lastChildId ?? loopId; - prevCompletedAt = d.completedAt; - } else if (entryType === "join" || entryType === "race") { - // Header node for the join/race. - const headerId = `header-${item.entry.id}`; - addSequentialNode(headerId, { ...d, label: d.name }); + connectTo(groupId, nodeData.startedAt); + nodes.push(groupNode, ...translatedChildren.nodes); + edges.push(...translatedChildren.edges); + bounds = mergeBounds(bounds, getNodeBounds(groupNode)); + bounds = mergeBounds(bounds, translatedChildren.bounds); - const branchData = item.entry.kind.data as JoinEntry | RaceEntry; - const branchNames = Object.keys(branchData.branches); - const nested = nestedByParent.get(item.key) ?? []; + currentY += groupHeight + NODE_GAP_Y; + applyContinuation( + translatedChildren.outgoingSources.length > 0 + ? translatedChildren.outgoingSources + : [groupId], + nodeData.completedAt ?? translatedChildren.lastCompletedAt, + ); + continue; + } - // Build per-branch info. - const TERMINATION_GAP = 24; + if (entry.kind.type === "join" || entry.kind.type === "race") { + const headerId = `header-${entry.id}`; + const headerNode = makeNode(headerId, 0, currentY, { + ...nodeData, + label: item.name, + }); + connectTo(headerId, nodeData.startedAt); + nodes.push(headerNode); + bounds = mergeBounds(bounds, getNodeBounds(headerNode)); - const branches = branchNames.map((name) => { - const branchItems = nested.filter((ni) => - ni.key.includes(`/${name}/`), + const branchData = entry.kind.data as JoinEntry | RaceEntry; + const branchNames = Object.keys(branchData.branches); + const branchStartY = currentY + NODE_HEIGHT + NODE_GAP_Y; + const branchLayouts = branchNames.map((name) => { + const branchItems = item.children.filter( + (child) => + findBranchName( + child.location, + entry.location, + context.workflow.nameRegistry, + ) === name, ); - sortByLocation(branchItems); - const status = branchData.branches[name].status; - const isFailed = status === "failed" || status === "cancelled"; + const fragment = layoutSequence(branchItems, context); + const fragmentWidth = fragment.bounds + ? fragment.bounds.maxX - fragment.bounds.minX + : NODE_WIDTH; + const requiredWidth = Math.max( + GROUP_WIDTH, + fragmentWidth + 2 * LOOP_PADDING_X, + ); + const requiredHeight = fragment.bounds + ? fragment.bounds.maxY - + fragment.bounds.minY + + LOOP_HEADER_HEIGHT + + LOOP_PADDING_BOTTOM + : LOOP_HEADER_HEIGHT + LOOP_PADDING_BOTTOM; + return { name, - items: branchItems, - status, - isFailed, - height: groupHeight(branchItems.length), + fragment, + status: branchData.branches[name]?.status ?? "pending", + isFailed: + branchData.branches[name]?.status === "failed" || + branchData.branches[name]?.status === "cancelled", + requiredWidth, + requiredHeight, }; }); - const maxHeight = Math.max(...branches.map((b) => b.height)); + const branchWidth = Math.max( + GROUP_WIDTH, + ...branchLayouts.map((layout) => layout.requiredWidth), + ); + const branchHeight = Math.max( + LOOP_HEADER_HEIGHT + LOOP_PADDING_BOTTOM, + ...branchLayouts.map((layout) => layout.requiredHeight), + ); const totalWidth = - branches.length * GROUP_WIDTH + - (branches.length - 1) * BRANCH_GAP_X; - const startX = -totalWidth / 2 + GROUP_WIDTH / 2 - LOOP_PADDING_X; - const branchStartY = currentY; - const branchGroupIds: string[] = []; - - for (let i = 0; i < branches.length; i++) { - const branch = branches[i]; - const branchX = startX + i * (GROUP_WIDTH + BRANCH_GAP_X); - const groupId = `branchgroup-${item.entry.id}-${branch.name}`; - - nodes.push({ + branchLayouts.length * branchWidth + + Math.max(0, branchLayouts.length - 1) * BRANCH_GAP_X; + const startX = NODE_WIDTH / 2 - totalWidth / 2; + const nextSources: string[] = []; + + for (let i = 0; i < branchLayouts.length; i++) { + const branch = branchLayouts[i]; + const branchX = startX + i * (branchWidth + BRANCH_GAP_X); + const groupId = `branchgroup-${entry.id}-${branch.name}`; + const translatedChildren = translateFragment( + branch.fragment, + branchX + + LOOP_PADDING_X - + (branch.fragment.bounds?.minX ?? 0), + branchStartY + + LOOP_HEADER_HEIGHT - + (branch.fragment.bounds?.minY ?? 0), + ); + const groupNode: XYBranchGroupNode = { id: groupId, type: "branchGroup", position: { x: branchX, y: branchStartY }, - measured: { width: GROUP_WIDTH, height: maxHeight }, - style: { width: GROUP_WIDTH, height: maxHeight }, + measured: { width: branchWidth, height: branchHeight }, + style: { width: branchWidth, height: branchHeight }, data: { label: branch.name, - entryType: entryType as "join" | "race", + entryType: entry.kind.type, branchStatus: branch.status, }, - } as XYBranchGroupNode); + }; + nodes.push(groupNode, ...translatedChildren.nodes); edges.push({ id: `e-${headerId}-${groupId}`, source: headerId, target: groupId, }); - - addChildChain(branch.items, groupId, LOOP_HEADER_HEIGHT); + edges.push(...translatedChildren.edges); + bounds = mergeBounds(bounds, getNodeBounds(groupNode)); + bounds = mergeBounds(bounds, translatedChildren.bounds); if (!branch.isFailed) { - branchGroupIds.push(groupId); + nextSources.push(groupId); } } - // Place termination nodes below failed/cancelled branch groups. - const hasTerminations = branches.some((b) => b.isFailed); - const termY = branchStartY + maxHeight + TERMINATION_GAP; - - for (let i = 0; i < branches.length; i++) { - const branch = branches[i]; - if (!branch.isFailed) continue; - - const branchX = startX + i * (GROUP_WIDTH + BRANCH_GAP_X); - const groupId = `branchgroup-${item.entry.id}-${branch.name}`; - const termId = `term-${item.entry.id}-${branch.name}`; - const termX = - branchX + GROUP_WIDTH / 2 - TERMINATION_NODE_SIZE / 2; - - nodes.push({ - id: termId, - type: "termination", - position: { x: termX, y: termY }, - measured: { - width: TERMINATION_NODE_SIZE, - height: TERMINATION_NODE_SIZE, - }, - data: {}, - } as XYTerminationNode); + const hasTerminations = branchLayouts.some( + (branch) => branch.isFailed, + ); + const terminationY = branchStartY + branchHeight + TERMINATION_GAP; - edges.push({ - id: `e-${groupId}-${termId}`, - source: groupId, - target: termId, - }); + if (hasTerminations) { + for (let i = 0; i < branchLayouts.length; i++) { + const branch = branchLayouts[i]; + if (!branch.isFailed) { + continue; + } + + const branchX = startX + i * (branchWidth + BRANCH_GAP_X); + const termId = `term-${entry.id}-${branch.name}`; + const termNode: XYTerminationNode = { + id: termId, + type: "termination", + position: { + x: + branchX + + branchWidth / 2 - + TERMINATION_NODE_SIZE / 2, + y: terminationY, + }, + measured: { + width: TERMINATION_NODE_SIZE, + height: TERMINATION_NODE_SIZE, + }, + data: {}, + }; + + nodes.push(termNode); + edges.push({ + id: `e-branchgroup-${entry.id}-${branch.name}-${termId}`, + source: `branchgroup-${entry.id}-${branch.name}`, + target: termId, + }); + bounds = mergeBounds(bounds, getNodeBounds(termNode)); + } } - currentY = hasTerminations - ? termY + TERMINATION_NODE_SIZE + NODE_GAP_Y - : branchStartY + maxHeight + NODE_GAP_Y; - prevNodeId = null; - prevCompletedAt = d.completedAt; - pendingBranchSources = branchGroupIds; - } else { - addSequentialNode(`node-${item.entry.id}`, { ...d, label: d.name }); + currentY = + (hasTerminations + ? terminationY + TERMINATION_NODE_SIZE + : branchStartY + branchHeight) + NODE_GAP_Y; + applyContinuation(nextSources, nodeData.completedAt); + continue; } - } - // ── Output meta node ── + const nodeId = `node-${entry.id}`; + const node = makeNode(nodeId, 0, currentY, { + ...nodeData, + label: item.name, + }); - if (history.output !== undefined && history.state === "completed") { - const id = "meta-output"; - nodes.push( - makeNode(id, 0, currentY, { - label: "Output", - summary: getEntrySummary("output", { value: history.output }), - entryType: "output", - status: "completed", - nodeKey: "output", - rawData: { value: history.output }, - }), - ); - connectTo(id); + connectTo(nodeId, nodeData.startedAt); + nodes.push(node); + bounds = mergeBounds(bounds, getNodeBounds(node)); + currentY += NODE_HEIGHT + NODE_GAP_Y; + applyContinuation([nodeId], nodeData.completedAt); } - return { nodes, edges }; + return { + nodes, + edges, + bounds, + firstTargetId, + firstStartedAt, + outgoingSources, + lastCompletedAt, + }; } -// ─── Loop child collection ─────────────────────────────────── +export function workflowHistoryToXYFlow( + history: WorkflowHistory, +): LayoutResult { + const rootItems = buildRenderTree(history); + const rootFragment = layoutSequence(rootItems, { + workflow: history, + hasTryAncestor: false, + }); + + const nodes: AnyXYNode[] = []; + const edges: Edge[] = []; + let currentY = 0; + let prevNodeId: string | null = null; + let prevCompletedAt: number | undefined; + let pendingBranchSources: string[] = []; -/** Collect loop children from nested items, flattened across iterations and sorted. */ -function collectLoopChildren(items: HistoryItem[]): HistoryItem[] { - const iterationMap = new Map(); - for (const item of items) { - const marker = item.entry.location.find( - (s): s is LoopIterationMarker => - typeof s === "object" && "iteration" in s, - ); - if (marker) { - const list = iterationMap.get(marker.iteration) ?? []; - list.push(item); - iterationMap.set(marker.iteration, list); + const connectTo = (targetId: string, targetStartedAt?: number) => { + if (prevNodeId) { + edges.push({ + id: `e-${prevNodeId}-${targetId}`, + source: prevNodeId, + target: targetId, + ...gapLabel(prevCompletedAt, targetStartedAt), + }); } + for (const source of pendingBranchSources) { + edges.push({ + id: `e-${source}-${targetId}`, + source, + target: targetId, + }); + } + pendingBranchSources = []; + }; + + const applyContinuation = ( + nextSources: string[], + completedAt: number | undefined, + ) => { + if (nextSources.length === 1) { + [prevNodeId] = nextSources; + pendingBranchSources = []; + } else if (nextSources.length > 1) { + prevNodeId = null; + pendingBranchSources = [...nextSources]; + } else { + prevNodeId = null; + pendingBranchSources = []; + } + prevCompletedAt = completedAt; + }; + + if (history.input !== undefined) { + const inputNode = makeNode("meta-input", 0, currentY, { + label: "Input", + summary: getEntrySummary("input", { value: history.input }), + entryType: "input", + status: "completed", + nodeKey: "input", + rawData: { value: history.input }, + }); + nodes.push(inputNode); + currentY += NODE_HEIGHT + NODE_GAP_Y; + applyContinuation(["meta-input"], undefined); + } + + if (rootFragment.nodes.length > 0) { + const translatedRoot = translateFragment(rootFragment, 0, currentY); + if (translatedRoot.firstTargetId) { + connectTo( + translatedRoot.firstTargetId, + translatedRoot.firstStartedAt, + ); + } + nodes.push(...translatedRoot.nodes); + edges.push(...translatedRoot.edges); + currentY += translatedRoot.bounds + ? translatedRoot.bounds.maxY - + translatedRoot.bounds.minY + + NODE_GAP_Y + : 0; + applyContinuation( + translatedRoot.outgoingSources, + translatedRoot.lastCompletedAt, + ); } - const result: HistoryItem[] = []; - const sortedKeys = Array.from(iterationMap.keys()).sort((a, b) => a - b); - for (const key of sortedKeys) { - const iterItems = iterationMap.get(key) ?? []; - sortByLocation(iterItems); - result.push(...iterItems); + if (history.output !== undefined && history.state === "completed") { + const outputNode = makeNode("meta-output", 0, currentY, { + label: "Output", + summary: getEntrySummary("output", { value: history.output }), + entryType: "output", + status: "completed", + nodeKey: "output", + rawData: { value: history.output }, + }); + connectTo("meta-output"); + nodes.push(outputNode); } - return result; + + return { nodes, edges }; } diff --git a/frontend/src/components/actors/workflow/workflow-types.ts b/frontend/src/components/actors/workflow/workflow-types.ts index 95ade7c026..70f60531fa 100644 --- a/frontend/src/components/actors/workflow/workflow-types.ts +++ b/frontend/src/components/actors/workflow/workflow-types.ts @@ -100,8 +100,8 @@ export type EntryStatus = | "failed" | "retrying"; -// Extended type for visualization (includes meta nodes) -export type ExtendedEntryType = EntryKindType | "input" | "output"; +// Extended type for visualization (includes meta nodes and synthetic groups) +export type ExtendedEntryType = EntryKindType | "input" | "output" | "try"; export interface Entry { id: string; diff --git a/frontend/src/components/actors/workflow/workflow-visualizer.tsx b/frontend/src/components/actors/workflow/workflow-visualizer.tsx index 43194e9261..106471aded 100644 --- a/frontend/src/components/actors/workflow/workflow-visualizer.tsx +++ b/frontend/src/components/actors/workflow/workflow-visualizer.tsx @@ -13,7 +13,8 @@ import { import { useCallback, useMemo, useState } from "react"; import "@xyflow/react/dist/style.css"; import { faXmark, Icon } from "@rivet-gg/icons"; -import { cn, DiscreteCopyButton } from "@/components"; +import { DiscreteCopyButton } from "../../copy-area"; +import { cn } from "../../lib/utils"; import { ActorObjectInspector } from "../console/actor-inspector"; import { workflowHistoryToXYFlow } from "./workflow-to-xyflow"; import type { WorkflowHistory } from "./workflow-types"; @@ -33,6 +34,7 @@ type MetaExtendedEntryType = | "rollback_checkpoint" | "join" | "race" + | "try" | "removed" | "input" | "output"; @@ -158,6 +160,11 @@ export function WorkflowVisualizer({ {selectedNode.retryCount} retry(s) )} + {selectedNode.handledFailure && ( +
+ handled +
+ )}