Skip to content

Commit 43f9119

Browse files
icecrasher321Vikhyath Mondreti
andauthored
improvement(workflow-state): split workflow state into separate tables (#511)
* new tables to track workflow state * fix lint * refactor into separate tables * fix typing * fix lint * add tests * fix lint * add correct foreign key constraint * add self ref * remove unused checks * fix types * fix type --------- Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-Air.attlocal.net>
1 parent 1100bfd commit 43f9119

File tree

9 files changed

+4945
-5
lines changed

9 files changed

+4945
-5
lines changed

apps/sim/app/api/workflows/[id]/route.ts

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { and, eq } from 'drizzle-orm'
22
import { type NextRequest, NextResponse } from 'next/server'
33
import { getSession } from '@/lib/auth'
44
import { createLogger } from '@/lib/logs/console-logger'
5+
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers'
56
import { db } from '@/db'
67
import { workflow, workspaceMember } from '@/db/schema'
78

@@ -10,6 +11,7 @@ const logger = createLogger('WorkflowByIdAPI')
1011
/**
1112
* GET /api/workflows/[id]
1213
* Fetch a single workflow by ID
14+
* Uses hybrid approach: try normalized tables first, fallback to JSON blob
1315
*/
1416
export async function GET(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
1517
const requestId = crypto.randomUUID().slice(0, 8)
@@ -69,10 +71,43 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
6971
return NextResponse.json({ error: 'Access denied' }, { status: 403 })
7072
}
7173

74+
// Try to load from normalized tables first
75+
const normalizedData = await loadWorkflowFromNormalizedTables(workflowId)
76+
77+
const finalWorkflowData = { ...workflowData }
78+
79+
if (normalizedData) {
80+
// Use normalized table data - reconstruct complete state object
81+
// First get any existing state properties, then override with normalized data
82+
const existingState =
83+
workflowData.state && typeof workflowData.state === 'object' ? workflowData.state : {}
84+
85+
finalWorkflowData.state = {
86+
// Default values for expected properties
87+
deploymentStatuses: {},
88+
hasActiveSchedule: false,
89+
hasActiveWebhook: false,
90+
// Preserve any existing state properties
91+
...existingState,
92+
// Override with normalized data (this takes precedence)
93+
blocks: normalizedData.blocks,
94+
edges: normalizedData.edges,
95+
loops: normalizedData.loops,
96+
parallels: normalizedData.parallels,
97+
lastSaved: Date.now(),
98+
isDeployed: workflowData.isDeployed || false,
99+
deployedAt: workflowData.deployedAt,
100+
}
101+
logger.info(`[${requestId}] Loaded workflow ${workflowId} from normalized tables`)
102+
} else {
103+
// Fallback to JSON blob
104+
logger.info(`[${requestId}] Using JSON blob for workflow ${workflowId}`)
105+
}
106+
72107
const elapsed = Date.now() - startTime
73108
logger.info(`[${requestId}] Successfully fetched workflow ${workflowId} in ${elapsed}ms`)
74109

75-
return NextResponse.json({ data: workflowData }, { status: 200 })
110+
return NextResponse.json({ data: finalWorkflowData }, { status: 200 })
76111
} catch (error: any) {
77112
const elapsed = Date.now() - startTime
78113
logger.error(`[${requestId}] Error fetching workflow ${workflowId} after ${elapsed}ms`, error)

apps/sim/app/api/workflows/sync/route.ts

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { type NextRequest, NextResponse } from 'next/server'
33
import { z } from 'zod'
44
import { getSession } from '@/lib/auth'
55
import { createLogger } from '@/lib/logs/console-logger'
6+
import { saveWorkflowToNormalizedTables } from '@/lib/workflows/db-helpers'
67
import { db } from '@/db'
78
import { workflow, workspace, workspaceMember } from '@/db/schema'
89

@@ -386,6 +387,34 @@ export async function POST(req: NextRequest) {
386387
// Ensure the workflow has the correct workspaceId
387388
const effectiveWorkspaceId = clientWorkflow.workspaceId || workspaceId
388389

390+
// Save to normalized tables for all workflows (hybrid approach)
391+
const normalizedResult = await saveWorkflowToNormalizedTables(id, {
392+
blocks: clientWorkflow.state.blocks || {},
393+
edges: clientWorkflow.state.edges || [],
394+
loops: clientWorkflow.state.loops || {},
395+
parallels: clientWorkflow.state.parallels || {},
396+
lastSaved: clientWorkflow.state.lastSaved,
397+
isDeployed: clientWorkflow.state.isDeployed,
398+
deployedAt: clientWorkflow.state.deployedAt,
399+
deploymentStatuses: (clientWorkflow.state as any).deploymentStatuses || {},
400+
hasActiveSchedule: (clientWorkflow.state as any).hasActiveSchedule,
401+
hasActiveWebhook: (clientWorkflow.state as any).hasActiveWebhook,
402+
})
403+
404+
// Use the JSON blob from normalized save for compatibility, or fallback to original state
405+
const stateToSave =
406+
normalizedResult.success && normalizedResult.jsonBlob
407+
? normalizedResult.jsonBlob
408+
: clientWorkflow.state
409+
410+
if (normalizedResult.success) {
411+
logger.info(`[${requestId}] Saved workflow ${id} to normalized tables`)
412+
} else {
413+
logger.warn(
414+
`[${requestId}] Failed to save workflow ${id} to normalized tables: ${normalizedResult.error}`
415+
)
416+
}
417+
389418
if (!dbWorkflow) {
390419
// New workflow - create (state is required by schema)
391420
operations.push(
@@ -397,7 +426,7 @@ export async function POST(req: NextRequest) {
397426
name: clientWorkflow.name,
398427
description: clientWorkflow.description,
399428
color: clientWorkflow.color,
400-
state: clientWorkflow.state,
429+
state: stateToSave,
401430
marketplaceData: clientWorkflow.marketplaceData || null,
402431
lastSynced: now,
403432
createdAt: now,
@@ -451,8 +480,8 @@ export async function POST(req: NextRequest) {
451480
}
452481

453482
// Always update state since we only sync the active workflow with valid state
454-
if (JSON.stringify(dbWorkflow.state) !== JSON.stringify(clientWorkflow.state)) {
455-
updateData.state = clientWorkflow.state
483+
if (JSON.stringify(dbWorkflow.state) !== JSON.stringify(stateToSave)) {
484+
updateData.state = stateToSave
456485
needsUpdate = true
457486
}
458487

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
CREATE TABLE "workflow_blocks" (
2+
"id" text PRIMARY KEY NOT NULL,
3+
"workflow_id" text NOT NULL,
4+
"type" text NOT NULL,
5+
"name" text NOT NULL,
6+
"position_x" integer NOT NULL,
7+
"position_y" integer NOT NULL,
8+
"enabled" boolean DEFAULT true NOT NULL,
9+
"horizontal_handles" boolean DEFAULT true NOT NULL,
10+
"is_wide" boolean DEFAULT false NOT NULL,
11+
"height" integer DEFAULT 0 NOT NULL,
12+
"sub_blocks" jsonb DEFAULT '{}' NOT NULL,
13+
"outputs" jsonb DEFAULT '{}' NOT NULL,
14+
"data" jsonb DEFAULT '{}',
15+
"parent_id" text,
16+
"extent" text,
17+
"created_at" timestamp DEFAULT now() NOT NULL,
18+
"updated_at" timestamp DEFAULT now() NOT NULL
19+
);
20+
--> statement-breakpoint
21+
CREATE TABLE "workflow_edges" (
22+
"id" text PRIMARY KEY NOT NULL,
23+
"workflow_id" text NOT NULL,
24+
"source_block_id" text NOT NULL,
25+
"target_block_id" text NOT NULL,
26+
"source_handle" text,
27+
"target_handle" text,
28+
"created_at" timestamp DEFAULT now() NOT NULL
29+
);
30+
--> statement-breakpoint
31+
CREATE TABLE "workflow_subflows" (
32+
"id" text PRIMARY KEY NOT NULL,
33+
"workflow_id" text NOT NULL,
34+
"type" text NOT NULL,
35+
"config" jsonb DEFAULT '{}' NOT NULL,
36+
"created_at" timestamp DEFAULT now() NOT NULL,
37+
"updated_at" timestamp DEFAULT now() NOT NULL
38+
);
39+
--> statement-breakpoint
40+
ALTER TABLE "workflow_blocks" ADD CONSTRAINT "workflow_blocks_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
41+
ALTER TABLE "workflow_blocks" ADD CONSTRAINT "workflow_blocks_parent_id_workflow_blocks_id_fk" FOREIGN KEY ("parent_id") REFERENCES "public"."workflow_blocks"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
42+
ALTER TABLE "workflow_edges" ADD CONSTRAINT "workflow_edges_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
43+
ALTER TABLE "workflow_edges" ADD CONSTRAINT "workflow_edges_source_block_id_workflow_blocks_id_fk" FOREIGN KEY ("source_block_id") REFERENCES "public"."workflow_blocks"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
44+
ALTER TABLE "workflow_edges" ADD CONSTRAINT "workflow_edges_target_block_id_workflow_blocks_id_fk" FOREIGN KEY ("target_block_id") REFERENCES "public"."workflow_blocks"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
45+
ALTER TABLE "workflow_subflows" ADD CONSTRAINT "workflow_subflows_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
46+
CREATE INDEX "workflow_blocks_workflow_id_idx" ON "workflow_blocks" USING btree ("workflow_id");--> statement-breakpoint
47+
CREATE INDEX "workflow_blocks_parent_id_idx" ON "workflow_blocks" USING btree ("parent_id");--> statement-breakpoint
48+
CREATE INDEX "workflow_blocks_workflow_parent_idx" ON "workflow_blocks" USING btree ("workflow_id","parent_id");--> statement-breakpoint
49+
CREATE INDEX "workflow_blocks_workflow_type_idx" ON "workflow_blocks" USING btree ("workflow_id","type");--> statement-breakpoint
50+
CREATE INDEX "workflow_edges_workflow_id_idx" ON "workflow_edges" USING btree ("workflow_id");--> statement-breakpoint
51+
CREATE INDEX "workflow_edges_source_block_idx" ON "workflow_edges" USING btree ("source_block_id");--> statement-breakpoint
52+
CREATE INDEX "workflow_edges_target_block_idx" ON "workflow_edges" USING btree ("target_block_id");--> statement-breakpoint
53+
CREATE INDEX "workflow_edges_workflow_source_idx" ON "workflow_edges" USING btree ("workflow_id","source_block_id");--> statement-breakpoint
54+
CREATE INDEX "workflow_edges_workflow_target_idx" ON "workflow_edges" USING btree ("workflow_id","target_block_id");--> statement-breakpoint
55+
CREATE INDEX "workflow_edges_source_block_fk_idx" ON "workflow_edges" USING btree ("source_block_id");--> statement-breakpoint
56+
CREATE INDEX "workflow_edges_target_block_fk_idx" ON "workflow_edges" USING btree ("target_block_id");--> statement-breakpoint
57+
CREATE INDEX "workflow_subflows_workflow_id_idx" ON "workflow_subflows" USING btree ("workflow_id");--> statement-breakpoint
58+
CREATE INDEX "workflow_subflows_workflow_type_idx" ON "workflow_subflows" USING btree ("workflow_id","type");

0 commit comments

Comments
 (0)