Skip to content

Commit 9796ce3

Browse files
reorganize db to treat each column separately
1 parent 9bdd42e commit 9796ce3

28 files changed

Lines changed: 17871 additions & 1022 deletions

File tree

apps/sim/app/api/table/[tableId]/cancel-runs/route.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { z } from 'zod'
44
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
55
import { generateRequestId } from '@/lib/core/utils/request'
66
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
7-
import { cancelWorkflowColumnRuns } from '@/lib/table/workflow-columns'
7+
import { cancelWorkflowGroupRuns } from '@/lib/table/workflow-columns'
88
import { accessError, checkAccess } from '@/app/api/table/utils'
99

1010
const logger = createLogger('TableCancelRunsAPI')
@@ -50,7 +50,7 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
5050
return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 })
5151
}
5252

53-
const cancelled = await cancelWorkflowColumnRuns(
53+
const cancelled = await cancelWorkflowGroupRuns(
5454
tableId,
5555
validated.scope === 'row' ? validated.rowId : undefined
5656
)

apps/sim/app/api/table/[tableId]/columns/route.ts

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import {
1010
renameColumn,
1111
updateColumnConstraints,
1212
updateColumnType,
13-
updateColumnWorkflowConfig,
1413
} from '@/lib/table'
1514
import {
1615
accessError,
@@ -119,12 +118,6 @@ export const PATCH = withRouteHandler(
119118
}
120119

121120
if (updates.type) {
122-
if (updates.type === 'workflow' && !updates.workflowConfig) {
123-
return NextResponse.json(
124-
{ error: 'workflowConfig is required when setting type to workflow' },
125-
{ status: 400 }
126-
)
127-
}
128121
updatedTable = await updateColumnType(
129122
{ tableId, columnName: updates.name ?? validated.columnName, newType: updates.type },
130123
requestId
@@ -143,17 +136,6 @@ export const PATCH = withRouteHandler(
143136
)
144137
}
145138

146-
if (updates.workflowConfig) {
147-
updatedTable = await updateColumnWorkflowConfig(
148-
{
149-
tableId,
150-
columnName: updates.name ?? validated.columnName,
151-
workflowConfig: updates.workflowConfig,
152-
},
153-
requestId
154-
)
155-
}
156-
157139
if (!updatedTable) {
158140
return NextResponse.json({ error: 'No updates specified' }, { status: 400 })
159141
}

apps/sim/app/api/table/[tableId]/columns/[columnName]/run/route.ts renamed to apps/sim/app/api/table/[tableId]/groups/[groupId]/run/route.ts

Lines changed: 30 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -9,30 +9,31 @@ import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
99
import { generateRequestId } from '@/lib/core/utils/request'
1010
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
1111
import { batchUpdateRows } from '@/lib/table'
12-
import type { RowData, TableRow, WorkflowCellValue } from '@/lib/table'
13-
import { areWorkflowColumnDepsSatisfied } from '@/lib/table/workflow-columns'
12+
import type { RowData, RowExecutionMetadata, RowExecutions, TableRow } from '@/lib/table'
13+
import { areGroupDepsSatisfied } from '@/lib/table/workflow-columns'
1414
import { accessError, checkAccess } from '@/app/api/table/utils'
1515

16-
const logger = createLogger('TableRunColumnAPI')
16+
const logger = createLogger('TableRunGroupAPI')
1717

1818
const RunSchema = z.object({
1919
workspaceId: z.string().min(1, 'Workspace ID is required'),
2020
})
2121

2222
interface RouteParams {
23-
params: Promise<{ tableId: string; columnName: string }>
23+
params: Promise<{ tableId: string; groupId: string }>
2424
}
2525

2626
/**
27-
* POST /api/table/[tableId]/columns/[columnName]/run
27+
* POST /api/table/[tableId]/groups/[groupId]/run
2828
*
29-
* Manually triggers a workflow column run for every row in the table. Each
30-
* cell is force-reset to `pending`, which fires the scheduler and enqueues
31-
* a per-cell trigger.dev job.
29+
* Manually triggers the workflow group for every eligible row in the table.
30+
* Each eligible row's `executions[groupId]` is reset to `pending` so the
31+
* scheduler picks it up and enqueues a per-cell trigger.dev job. Rows whose
32+
* deps aren't satisfied or whose group is already running are skipped.
3233
*/
3334
export const POST = withRouteHandler(async (request: NextRequest, { params }: RouteParams) => {
3435
const requestId = generateRequestId()
35-
const { tableId, columnName } = await params
36+
const { tableId, groupId } = await params
3637

3738
try {
3839
const authResult = await checkSessionOrInternalAuth(request, { requireWorkflowId: false })
@@ -51,22 +52,17 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
5152
return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 })
5253
}
5354

54-
const column = table.schema.columns.find((c) => c.name === columnName)
55-
if (!column || column.type !== 'workflow' || !column.workflowConfig?.workflowId) {
56-
return NextResponse.json(
57-
{ error: 'Column is not a configured workflow column' },
58-
{ status: 400 }
59-
)
55+
const group = (table.schema.workflowGroups ?? []).find((g) => g.id === groupId)
56+
if (!group) {
57+
return NextResponse.json({ error: 'Workflow group not found' }, { status: 404 })
6058
}
6159

62-
const workflowId = column.workflowConfig.workflowId
63-
const columnIndex = table.schema.columns.findIndex((c) => c.name === columnName)
64-
6560
const allRows = await db
6661
.select({
6762
id: userTableRows.id,
6863
position: userTableRows.position,
6964
data: userTableRows.data,
65+
executions: userTableRows.executions,
7066
createdAt: userTableRows.createdAt,
7167
updatedAt: userTableRows.updatedAt,
7268
})
@@ -83,22 +79,24 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
8379
return NextResponse.json({ success: true, data: { triggered: 0 } })
8480
}
8581

86-
// Only target rows whose deps are satisfied AND aren't already running.
87-
// Forcing every row through `pending` would leave dep-unsatisfied rows
88-
// stuck pending forever (the scheduler's eligibility predicate filters
89-
// them out), and would re-issue runs that are already in flight.
82+
// Only target rows whose deps are satisfied AND whose group isn't running.
83+
// Force-resetting every row would leave dep-unsatisfied rows stuck `pending`
84+
// forever (the scheduler's eligibility check filters them out anyway), and
85+
// would re-issue runs already in flight.
9086
const eligibleRows = allRows.filter((r) => {
9187
const tableRow: TableRow = {
9288
id: r.id,
9389
data: r.data as RowData,
90+
executions: (r.executions as RowExecutions) ?? {},
9491
position: r.position,
9592
createdAt: r.createdAt,
9693
updatedAt: r.updatedAt,
9794
}
98-
const cell = (r.data as RowData)[columnName] as WorkflowCellValue | null | undefined
99-
if (cell?.status === 'running') return false
95+
const exec = tableRow.executions[groupId]
96+
if (exec?.status === 'running') return false
97+
if (exec?.status === 'pending' && exec?.jobId) return false
10098
try {
101-
return areWorkflowColumnDepsSatisfied(column, columnIndex, tableRow, table.schema)
99+
return areGroupDepsSatisfied(group, tableRow)
102100
} catch {
103101
return false
104102
}
@@ -109,17 +107,17 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
109107
}
110108

111109
const updates = eligibleRows.map((r) => {
112-
const pendingCell: WorkflowCellValue = {
110+
const pendingExec: RowExecutionMetadata = {
111+
status: 'pending',
113112
executionId: generateId(),
114113
jobId: null,
115-
workflowId,
116-
status: 'pending',
117-
output: null,
114+
workflowId: group.workflowId,
118115
error: null,
119116
}
120117
return {
121118
rowId: r.id,
122-
data: { [columnName]: pendingCell as unknown as RowData[string] } as RowData,
119+
data: {} as RowData,
120+
executionsPatch: { [groupId]: pendingExec },
123121
}
124122
})
125123

@@ -144,7 +142,7 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
144142
{ status: 400 }
145143
)
146144
}
147-
logger.error(`run-column failed for ${tableId}/${columnName}:`, error)
148-
return NextResponse.json({ error: 'Failed to run column' }, { status: 500 })
145+
logger.error(`run-group failed for ${tableId}/${groupId}:`, error)
146+
return NextResponse.json({ error: 'Failed to run group' }, { status: 500 })
149147
}
150148
})
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
import { createLogger } from '@sim/logger'
2+
import { type NextRequest, NextResponse } from 'next/server'
3+
import { z } from 'zod'
4+
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
5+
import { generateRequestId } from '@/lib/core/utils/request'
6+
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
7+
import { addWorkflowGroup, deleteWorkflowGroup, updateWorkflowGroup } from '@/lib/table/service'
8+
import {
9+
accessError,
10+
AddWorkflowGroupSchema,
11+
checkAccess,
12+
DeleteWorkflowGroupSchema,
13+
normalizeColumn,
14+
UpdateWorkflowGroupSchema,
15+
} from '@/app/api/table/utils'
16+
17+
const logger = createLogger('TableWorkflowGroupsAPI')
18+
19+
interface RouteParams {
20+
params: Promise<{ tableId: string }>
21+
}
22+
23+
/** POST /api/table/[tableId]/groups — create a workflow group + its output columns. */
24+
export const POST = withRouteHandler(async (request: NextRequest, { params }: RouteParams) => {
25+
const requestId = generateRequestId()
26+
const { tableId } = await params
27+
try {
28+
const authResult = await checkSessionOrInternalAuth(request, { requireWorkflowId: false })
29+
if (!authResult.success || !authResult.userId) {
30+
return NextResponse.json({ error: 'Authentication required' }, { status: 401 })
31+
}
32+
const body = await request.json()
33+
const validated = AddWorkflowGroupSchema.parse(body)
34+
const result = await checkAccess(tableId, authResult.userId, 'write')
35+
if (!result.ok) return accessError(result, requestId, tableId)
36+
if (result.table.workspaceId !== validated.workspaceId) {
37+
return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 })
38+
}
39+
const updatedTable = await addWorkflowGroup(
40+
{ tableId, group: validated.group, outputColumns: validated.outputColumns },
41+
requestId
42+
)
43+
return NextResponse.json({
44+
success: true,
45+
data: {
46+
columns: updatedTable.schema.columns.map(normalizeColumn),
47+
workflowGroups: updatedTable.schema.workflowGroups ?? [],
48+
},
49+
})
50+
} catch (error) {
51+
if (error instanceof z.ZodError) {
52+
return NextResponse.json(
53+
{ error: 'Invalid request data', details: error.errors },
54+
{ status: 400 }
55+
)
56+
}
57+
if (error instanceof Error) {
58+
const msg = error.message
59+
if (msg === 'Table not found') return NextResponse.json({ error: msg }, { status: 404 })
60+
if (
61+
msg.includes('already exists') ||
62+
msg.includes('Schema validation') ||
63+
msg.includes('exceed')
64+
) {
65+
return NextResponse.json({ error: msg }, { status: 400 })
66+
}
67+
}
68+
logger.error(`POST groups failed for ${tableId}:`, error)
69+
return NextResponse.json({ error: 'Failed to add workflow group' }, { status: 500 })
70+
}
71+
})
72+
73+
/** PATCH /api/table/[tableId]/groups — update a workflow group (deps / outputs). */
74+
export const PATCH = withRouteHandler(async (request: NextRequest, { params }: RouteParams) => {
75+
const requestId = generateRequestId()
76+
const { tableId } = await params
77+
try {
78+
const authResult = await checkSessionOrInternalAuth(request, { requireWorkflowId: false })
79+
if (!authResult.success || !authResult.userId) {
80+
return NextResponse.json({ error: 'Authentication required' }, { status: 401 })
81+
}
82+
const body = await request.json()
83+
const validated = UpdateWorkflowGroupSchema.parse(body)
84+
const result = await checkAccess(tableId, authResult.userId, 'write')
85+
if (!result.ok) return accessError(result, requestId, tableId)
86+
if (result.table.workspaceId !== validated.workspaceId) {
87+
return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 })
88+
}
89+
const updatedTable = await updateWorkflowGroup(
90+
{
91+
tableId,
92+
groupId: validated.groupId,
93+
...(validated.workflowId !== undefined ? { workflowId: validated.workflowId } : {}),
94+
...(validated.name !== undefined ? { name: validated.name } : {}),
95+
...(validated.dependencies !== undefined
96+
? { dependencies: validated.dependencies }
97+
: {}),
98+
...(validated.outputs !== undefined ? { outputs: validated.outputs } : {}),
99+
...(validated.newOutputColumns !== undefined
100+
? { newOutputColumns: validated.newOutputColumns }
101+
: {}),
102+
},
103+
requestId
104+
)
105+
return NextResponse.json({
106+
success: true,
107+
data: {
108+
columns: updatedTable.schema.columns.map(normalizeColumn),
109+
workflowGroups: updatedTable.schema.workflowGroups ?? [],
110+
},
111+
})
112+
} catch (error) {
113+
if (error instanceof z.ZodError) {
114+
return NextResponse.json(
115+
{ error: 'Invalid request data', details: error.errors },
116+
{ status: 400 }
117+
)
118+
}
119+
if (error instanceof Error) {
120+
const msg = error.message
121+
if (msg.includes('not found')) return NextResponse.json({ error: msg }, { status: 404 })
122+
if (
123+
msg.includes('Schema validation') ||
124+
msg.includes('Missing column definition') ||
125+
msg.includes('already exists')
126+
) {
127+
return NextResponse.json({ error: msg }, { status: 400 })
128+
}
129+
}
130+
logger.error(`PATCH groups failed for ${tableId}:`, error)
131+
return NextResponse.json({ error: 'Failed to update workflow group' }, { status: 500 })
132+
}
133+
})
134+
135+
/** DELETE /api/table/[tableId]/groups — remove a workflow group + its columns. */
136+
export const DELETE = withRouteHandler(async (request: NextRequest, { params }: RouteParams) => {
137+
const requestId = generateRequestId()
138+
const { tableId } = await params
139+
try {
140+
const authResult = await checkSessionOrInternalAuth(request, { requireWorkflowId: false })
141+
if (!authResult.success || !authResult.userId) {
142+
return NextResponse.json({ error: 'Authentication required' }, { status: 401 })
143+
}
144+
const body = await request.json()
145+
const validated = DeleteWorkflowGroupSchema.parse(body)
146+
const result = await checkAccess(tableId, authResult.userId, 'write')
147+
if (!result.ok) return accessError(result, requestId, tableId)
148+
if (result.table.workspaceId !== validated.workspaceId) {
149+
return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 })
150+
}
151+
const updatedTable = await deleteWorkflowGroup(
152+
{ tableId, groupId: validated.groupId },
153+
requestId
154+
)
155+
return NextResponse.json({
156+
success: true,
157+
data: {
158+
columns: updatedTable.schema.columns.map(normalizeColumn),
159+
workflowGroups: updatedTable.schema.workflowGroups ?? [],
160+
},
161+
})
162+
} catch (error) {
163+
if (error instanceof z.ZodError) {
164+
return NextResponse.json(
165+
{ error: 'Invalid request data', details: error.errors },
166+
{ status: 400 }
167+
)
168+
}
169+
if (error instanceof Error && error.message.includes('not found')) {
170+
return NextResponse.json({ error: error.message }, { status: 404 })
171+
}
172+
logger.error(`DELETE groups failed for ${tableId}:`, error)
173+
return NextResponse.json({ error: 'Failed to delete workflow group' }, { status: 500 })
174+
}
175+
})

apps/sim/app/api/table/[tableId]/route.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ export const GET = withRouteHandler(async (request: NextRequest, { params }: Tab
6464
description: table.description,
6565
schema: {
6666
columns: schemaData.columns.map(normalizeColumn),
67+
...(schemaData.workflowGroups
68+
? { workflowGroups: schemaData.workflowGroups }
69+
: {}),
6770
},
6871
metadata: table.metadata ?? null,
6972
rowCount: table.rowCount,

0 commit comments

Comments
 (0)