Skip to content
11 changes: 9 additions & 2 deletions apps/sim/app/api/table/[tableId]/columns/run/route.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { createLogger } from '@sim/logger'
import { toError } from '@sim/utils/errors'
import { type NextRequest, NextResponse } from 'next/server'
import { runColumnContract } from '@/lib/api/contracts/tables'
import { parseRequest } from '@/lib/api/server'
Expand Down Expand Up @@ -29,15 +30,21 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
const access = await checkAccess(tableId, auth.userId, 'write')
if (!access.ok) return accessError(access, requestId, tableId)

const { triggered } = await runWorkflowColumn({
// Dispatch in the background — large fan-outs (thousands of rows) issue
// sequential trigger.dev calls and would otherwise hold the HTTP response
// open for minutes, blocking the AI/copilot tool span and the UI mutation.
void runWorkflowColumn({
tableId,
workspaceId,
groupIds,
mode: runMode,
rowIds,
requestId,
}).catch((err) => {
logger.error(`[${requestId}] run-column dispatch failed:`, toError(err).message)
})
return NextResponse.json({ success: true, data: { triggered } })

return NextResponse.json({ success: true, data: { triggered: null } })
} catch (error) {
if (error instanceof Error && error.message === 'Invalid workspace ID') {
return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@ import {
DropdownMenuTrigger,
} from '@/components/emcn'
import { Plus } from '@/components/emcn/icons'
import { isWorkflowColumnsEnabledClient } from '@/lib/core/config/feature-flags'
import type { ColumnDefinition } from '@/lib/table'
import { COLUMN_TYPE_OPTIONS } from '../column-config-sidebar'

// Column-type options offered in the new-column dropdown. The "workflow"
// type is hidden unless the client feature flag is on; evaluated once at
// module load since both inputs are module-level constants.
const VISIBLE_COLUMN_TYPE_OPTIONS = isWorkflowColumnsEnabledClient
  ? COLUMN_TYPE_OPTIONS
  : COLUMN_TYPE_OPTIONS.filter((o) => o.type !== 'workflow')

const CELL_HEADER =
'border-[var(--border)] border-r border-b bg-[var(--bg)] px-2 py-[7px] text-left align-middle'

Expand Down Expand Up @@ -56,7 +61,7 @@ export function NewColumnDropdown({
)}
</DropdownMenuTrigger>
<DropdownMenuContent align='start' side='bottom' sideOffset={4}>
{COLUMN_TYPE_OPTIONS.map((option) => {
{VISIBLE_COLUMN_TYPE_OPTIONS.map((option) => {
const Icon = option.icon
const onSelect =
option.type === 'workflow'
Expand Down
183 changes: 158 additions & 25 deletions apps/sim/hooks/queries/tables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* React Query hooks for managing user-defined tables.
*/

import { useEffect, useMemo } from 'react'
import { createLogger } from '@sim/logger'
import {
type InfiniteData,
Expand Down Expand Up @@ -68,7 +69,7 @@ import type {
WorkflowGroupDependencies,
WorkflowGroupOutput,
} from '@/lib/table'
import { optimisticallyScheduleNewlyEligibleGroups } from '@/lib/table/deps'
import { areOutputsFilled, optimisticallyScheduleNewlyEligibleGroups } from '@/lib/table/deps'

/** Short poll to surface running → completed transitions from the server without a dedicated realtime channel. */
const ROWS_POLL_INTERVAL_WHILE_RUNNING_MS = 1500
Expand All @@ -84,12 +85,75 @@ function hasRunningGroupExecution(rows: TableRow[] | undefined): boolean {
return false
}

function hasRunningGroupExecutionInPages(pages: TableRowsResponse[] | undefined): boolean {
if (!pages) return false
for (const page of pages) {
if (hasRunningGroupExecution(page.rows)) return true
/**
 * Shallow equality on the fields the row renderer consumes.
 *
 * `position` is compared directly. Row `data` is shallow-compared per key —
 * workflow output cells are scalars, so `===` per key suffices. `executions`
 * is per-group execution metadata; each group is compared on its
 * `(status, jobId, executionId, error)` tuple. Any deeper drift yields
 * inequality, which forces a fresh row reference downstream — the safe
 * default for memoized row components.
 *
 * @param a - previously cached row
 * @param b - freshly fetched row with the same id
 * @returns true when the renderer-visible fields are identical
 */
function rowEqual(a: TableRow, b: TableRow): boolean {
  if (a === b) return true
  if (a.position !== b.position) return false
  const aData = a.data ?? {}
  const bData = b.data ?? {}
  const aDataKeys = Object.keys(aData)
  if (aDataKeys.length !== Object.keys(bData).length) return false
  for (const k of aDataKeys) {
    if (aData[k] !== bData[k]) return false
  }
  // BUG FIX: an unconditional `return false` previously sat here, making the
  // executions comparison below unreachable and causing every distinct pair
  // of references to compare unequal — every poll tick then replaced every
  // row reference, defeating the identity-preserving merge entirely.
  const aExec = a.executions ?? {}
  const bExec = b.executions ?? {}
  const aExecKeys = Object.keys(aExec)
  if (aExecKeys.length !== Object.keys(bExec).length) return false
  for (const k of aExecKeys) {
    const ax = aExec[k]
    const bx = bExec[k]
    if (ax === bx) continue
    if (!ax || !bx) return false
    if (
      ax.status !== bx.status ||
      ax.jobId !== bx.jobId ||
      ax.executionId !== bx.executionId ||
      ax.error !== bx.error
    ) {
      return false
    }
  }
  return true
}

/**
 * Splices `fresh.rows` over `prev.rows`, reusing the `prev` row reference
 * wherever `rowEqual` says the row is unchanged. Memoized row components
 * short-circuit on row identity, so a poll tick carrying 1000 rows where
 * only 5 changed re-renders just those 5. When nothing at all changed
 * (same rows, same order, same `totalCount`) the `prev` page object itself
 * is returned so the page reference is stable too.
 *
 * @param prev - page currently in the cache
 * @param fresh - page just fetched from the server
 * @returns a page whose unchanged rows keep their old identity
 */
function mergePagePreservingIdentity(
  prev: TableRowsResponse,
  fresh: TableRowsResponse
): TableRowsResponse {
  if (prev.rows === fresh.rows) return prev

  const previousById = new Map(prev.rows.map((row) => [row.id, row]))
  let reusedEveryRow = true
  const nextRows = fresh.rows.map((incoming) => {
    const existing = previousById.get(incoming.id)
    if (existing && rowEqual(existing, incoming)) return existing
    reusedEveryRow = false
    return incoming
  })

  if (reusedEveryRow && nextRows.length === prev.rows.length) {
    const sameOrder = nextRows.every((row, i) => row === prev.rows[i])
    if (sameOrder && prev.totalCount === fresh.totalCount) return prev
  }
  return { ...fresh, rows: nextRows }
}

const logger = createLogger('TableQueries')
Expand Down Expand Up @@ -293,9 +357,10 @@ export function useInfiniteTableRows({
filter: filter ?? null,
sort: sort ?? null,
})
const queryKey = useMemo(() => tableKeys.infiniteRows(tableId, paramsKey), [tableId, paramsKey])

return useInfiniteQuery({
queryKey: tableKeys.infiniteRows(tableId, paramsKey),
const query = useInfiniteQuery({
queryKey,
queryFn: ({ pageParam, signal }) =>
fetchTableRows({
workspaceId,
Expand All @@ -314,23 +379,79 @@ export function useInfiniteTableRows({
},
enabled: Boolean(workspaceId && tableId) && enabled,
staleTime: 30 * 1000,
/**
* Poll while any row has a `pending` or `running` group execution.
* Realtime sockets push every cell write, but cross-network paths
* (trigger.dev workers → realtime ECS, client through CloudFront/proxy)
* occasionally drop events. Polling at the running cadence is the
* safety net so cells reach their terminal state without a refresh.
* No polling when nothing is running and no polling while a mutation
* is in flight (optimistic-update guard).
*/
refetchInterval: (query) => {
if (queryClient.isMutating() > 0) return false
return hasRunningGroupExecutionInPages(query.state.data?.pages)
? ROWS_POLL_INTERVAL_WHILE_RUNNING_MS
: false
},
refetchIntervalInBackground: false,
})

/**
* Per-page polling. Built-in `refetchInterval` would refetch every loaded
* page on each tick — wasteful when only one page has running cells.
* Instead, walk pages each tick and refetch ONLY the dirty ones, splicing
* results back into the cache. Polling stops when no page has in-flight
* cells, or while a mutation is running (optimistic-update guard).
*/
useEffect(() => {
if (!enabled || !workspaceId || !tableId) return
let cancelled = false
let timeoutId: ReturnType<typeof setTimeout> | null = null
const tick = async () => {
if (cancelled) return
if (queryClient.isMutating() === 0) {
const data = queryClient.getQueryData<InfiniteData<TableRowsResponse, number>>(queryKey)
const dirty: number[] = []
if (data) {
for (let i = 0; i < data.pages.length; i++) {
if (hasRunningGroupExecution(data.pages[i].rows)) {
dirty.push(data.pageParams[i] ?? i * pageSize)
}
}
}
if (dirty.length > 0) {
await Promise.all(
dirty.map(async (offset) => {
try {
const fresh = await fetchTableRows({
workspaceId,
tableId,
limit: pageSize,
offset,
filter,
sort,
includeTotal: offset === 0,
})
if (cancelled) return
queryClient.setQueryData<InfiniteData<TableRowsResponse, number>>(
queryKey,
(prev) => {
if (!prev) return prev
const idx = prev.pageParams.indexOf(offset)
if (idx === -1) return prev
const merged = mergePagePreservingIdentity(prev.pages[idx], fresh)
if (merged === prev.pages[idx]) return prev
const nextPages = prev.pages.slice()
nextPages[idx] = merged
return { ...prev, pages: nextPages }
}
)
} catch {
// Transient fetch failure — next tick retries. Don't kill the loop.
}
})
)
}
}
if (cancelled) return
// Recursive setTimeout instead of setInterval so a slow tick can't
// overlap the next one — out-of-order responses would otherwise let
// stale data overwrite fresh.
timeoutId = setTimeout(() => void tick(), ROWS_POLL_INTERVAL_WHILE_RUNNING_MS)
Comment thread
TheodoreSpeaks marked this conversation as resolved.
}
timeoutId = setTimeout(() => void tick(), ROWS_POLL_INTERVAL_WHILE_RUNNING_MS)
return () => {
cancelled = true
if (timeoutId !== null) clearTimeout(timeoutId)
}
}, [enabled, workspaceId, tableId, pageSize, filter, sort, queryClient, queryKey])

return query
}

/**
Expand Down Expand Up @@ -1176,6 +1297,10 @@ export function useRunColumn({ workspaceId, tableId }: RowMutationContext) {
onMutate: async ({ groupIds, runMode = 'all', rowIds }) => {
const targetRowIds = rowIds && rowIds.length > 0 ? new Set(rowIds) : null
const targetGroupIds = new Set(groupIds)
const groups =
queryClient.getQueryData<TableDefinition>(tableKeys.detail(tableId))?.schema
.workflowGroups ?? []
const groupsById = new Map(groups.map((g) => [g.id, g]))
const snapshots = await snapshotAndMutateRows(queryClient, tableId, (r) => {
if (targetRowIds && !targetRowIds.has(r.id)) return null
const executions = r.executions ?? {}
Expand All @@ -1184,7 +1309,15 @@ export function useRunColumn({ workspaceId, tableId }: RowMutationContext) {
for (const groupId of targetGroupIds) {
const exec = executions[groupId] as RowExecutionMetadata | undefined
if (isOptimisticInFlight(exec)) continue
if (runMode === 'incomplete' && exec?.status === 'completed') continue
// Mirror server eligibility for `mode: 'incomplete'`: skip cells whose
// outputs are filled, regardless of exec status. A cancelled/error
// cell with a leftover value from a prior run was rendering as filled
// but flipping to "queued" optimistically here even though the server
// would skip it.
if (runMode === 'incomplete') {
const group = groupsById.get(groupId)
if (group && areOutputsFilled(group, r)) continue
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optimistic patch diverges from server eligibility check

Medium Severity

The client optimistic patch for mode='incomplete' skips cells solely based on areOutputsFilled(group, r), but the server's classifyEligibility skips only when status === 'completed' && areOutputsFilled(group, row) (the completedAndFilled variable). For cancelled or error cells with leftover output values, the client incorrectly skips them (no "queued" feedback) even though the server considers them eligible and will run them. The comment claims it "mirrors server eligibility" but it's missing the exec?.status === 'completed' condition the server requires.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit b23ba1b. Configure here.

next[groupId] = buildPendingExec(exec)
changed = true
}
Expand Down
7 changes: 6 additions & 1 deletion apps/sim/lib/api/contracts/tables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,12 @@ export const runColumnContract = defineRouteContract({
body: runColumnBodySchema,
response: {
mode: 'json',
schema: successResponseSchema(z.object({ triggered: z.number() })),
/**
* `triggered` is `null` when the dispatcher runs in the background — the
* actual count is only known after a fan-out that may be tens of thousands
* of rows, and we don't hold the HTTP response open for that long.
*/
schema: successResponseSchema(z.object({ triggered: z.number().nullable() })),
},
})

Expand Down
11 changes: 8 additions & 3 deletions apps/sim/lib/copilot/tools/server/table/user-table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1415,19 +1415,24 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
}
const requestId = generateId().slice(0, 8)
assertNotAborted()
const { triggered } = await runWorkflowColumn({
// Dispatch in the background — large fan-outs (thousands of rows)
// issue sequential trigger.dev calls and would otherwise hold the
// tool span open for minutes, blocking the chat connection.
void runWorkflowColumn({
tableId: args.tableId,
workspaceId,
groupIds,
mode: runMode,
rowIds,
requestId,
}).catch((err) => {
logger.error(`[${requestId}] run_column dispatch failed`, err)
})
const scopeLabel = rowIds ? `${rowIds.length} row(s) by id` : runMode
return {
success: true,
message: `Triggered ${triggered} row(s) across ${groupIds.length} column(s) (${scopeLabel})`,
data: { triggered },
message: `Started running ${groupIds.length} column(s) (${scopeLabel}). Cells will populate as workflows complete.`,
data: { triggered: null },
}
}

Expand Down
2 changes: 2 additions & 0 deletions apps/sim/lib/core/config/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ export const env = createEnv({
NEXT_PUBLIC_AUDIT_LOGS_ENABLED: z.boolean().optional(), // Enable audit logs on self-hosted (bypasses hosted requirements)
NEXT_PUBLIC_DATA_RETENTION_ENABLED: z.boolean().optional(), // Enable data retention settings on self-hosted (bypasses hosted requirements)
NEXT_PUBLIC_DATA_DRAINS_ENABLED: z.boolean().optional(), // Enable data drains on self-hosted (bypasses hosted requirements)
NEXT_PUBLIC_WORKFLOW_COLUMNS_ENABLED: z.boolean().optional(), // Show the "Workflow" column type in user tables (defaults to false)
NEXT_PUBLIC_ORGANIZATIONS_ENABLED: z.boolean().optional(), // Enable organizations on self-hosted (bypasses plan requirements)
NEXT_PUBLIC_DISABLE_INVITATIONS: z.boolean().optional(), // Disable workspace invitations globally (for self-hosted deployments)
NEXT_PUBLIC_DISABLE_PUBLIC_API: z.boolean().optional(), // Disable public API access UI toggle globally
Expand Down Expand Up @@ -493,6 +494,7 @@ export const env = createEnv({
NEXT_PUBLIC_AUDIT_LOGS_ENABLED: process.env.NEXT_PUBLIC_AUDIT_LOGS_ENABLED,
NEXT_PUBLIC_DATA_RETENTION_ENABLED: process.env.NEXT_PUBLIC_DATA_RETENTION_ENABLED,
NEXT_PUBLIC_DATA_DRAINS_ENABLED: process.env.NEXT_PUBLIC_DATA_DRAINS_ENABLED,
NEXT_PUBLIC_WORKFLOW_COLUMNS_ENABLED: process.env.NEXT_PUBLIC_WORKFLOW_COLUMNS_ENABLED,
NEXT_PUBLIC_ORGANIZATIONS_ENABLED: process.env.NEXT_PUBLIC_ORGANIZATIONS_ENABLED,
NEXT_PUBLIC_DISABLE_INVITATIONS: process.env.NEXT_PUBLIC_DISABLE_INVITATIONS,
NEXT_PUBLIC_DISABLE_PUBLIC_API: process.env.NEXT_PUBLIC_DISABLE_PUBLIC_API,
Expand Down
9 changes: 9 additions & 0 deletions apps/sim/lib/core/config/feature-flags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,15 @@ export const isDataRetentionEnabled = isTruthy(env.DATA_RETENTION_ENABLED)
*/
export const isDataDrainsEnabled = isTruthy(env.DATA_DRAINS_ENABLED)

/**
 * Client-side gate for workflow output columns in user tables.
 *
 * Off by default. Setting NEXT_PUBLIC_WORKFLOW_COLUMNS_ENABLED=true exposes
 * the "Workflow" column type in the new-column dropdown.
 */
export const isWorkflowColumnsEnabledClient = isTruthy(getEnv('NEXT_PUBLIC_WORKFLOW_COLUMNS_ENABLED'))

/**
* Is E2B enabled for remote code execution
*/
Expand Down
Loading
Loading