diff --git a/apps/sim/app/api/table/[tableId]/columns/run/route.ts b/apps/sim/app/api/table/[tableId]/columns/run/route.ts index 7b997c9c23..eddfc416e0 100644 --- a/apps/sim/app/api/table/[tableId]/columns/run/route.ts +++ b/apps/sim/app/api/table/[tableId]/columns/run/route.ts @@ -1,4 +1,5 @@ import { createLogger } from '@sim/logger' +import { toError } from '@sim/utils/errors' import { type NextRequest, NextResponse } from 'next/server' import { runColumnContract } from '@/lib/api/contracts/tables' import { parseRequest } from '@/lib/api/server' @@ -29,15 +30,21 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro const access = await checkAccess(tableId, auth.userId, 'write') if (!access.ok) return accessError(access, requestId, tableId) - const { triggered } = await runWorkflowColumn({ + // Dispatch in the background — large fan-outs (thousands of rows) issue + // sequential trigger.dev calls and would otherwise hold the HTTP response + // open for minutes, blocking the AI/copilot tool span and the UI mutation. 
+ void runWorkflowColumn({ tableId, workspaceId, groupIds, mode: runMode, rowIds, requestId, + }).catch((err) => { + logger.error(`[${requestId}] run-column dispatch failed:`, toError(err).message) }) - return NextResponse.json({ success: true, data: { triggered } }) + + return NextResponse.json({ success: true, data: { triggered: null } }) } catch (error) { if (error instanceof Error && error.message === 'Invalid workspace ID') { return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 }) diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/new-column-dropdown/new-column-dropdown.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/new-column-dropdown/new-column-dropdown.tsx index 8330e581cf..773e4ba0cf 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/new-column-dropdown/new-column-dropdown.tsx +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/new-column-dropdown/new-column-dropdown.tsx @@ -8,9 +8,14 @@ import { DropdownMenuTrigger, } from '@/components/emcn' import { Plus } from '@/components/emcn/icons' +import { isWorkflowColumnsEnabledClient } from '@/lib/core/config/feature-flags' import type { ColumnDefinition } from '@/lib/table' import { COLUMN_TYPE_OPTIONS } from '../column-config-sidebar' +const VISIBLE_COLUMN_TYPE_OPTIONS = isWorkflowColumnsEnabledClient + ? 
COLUMN_TYPE_OPTIONS + : COLUMN_TYPE_OPTIONS.filter((o) => o.type !== 'workflow') + const CELL_HEADER = 'border-[var(--border)] border-r border-b bg-[var(--bg)] px-2 py-[7px] text-left align-middle' @@ -56,7 +61,7 @@ export function NewColumnDropdown({ )} - {COLUMN_TYPE_OPTIONS.map((option) => { + {VISIBLE_COLUMN_TYPE_OPTIONS.map((option) => { const Icon = option.icon const onSelect = option.type === 'workflow' diff --git a/apps/sim/hooks/queries/tables.ts b/apps/sim/hooks/queries/tables.ts index 5314220f84..e2cf11ac08 100644 --- a/apps/sim/hooks/queries/tables.ts +++ b/apps/sim/hooks/queries/tables.ts @@ -4,6 +4,7 @@ * React Query hooks for managing user-defined tables. */ +import { useEffect, useMemo } from 'react' import { createLogger } from '@sim/logger' import { type InfiniteData, @@ -68,7 +69,7 @@ import type { WorkflowGroupDependencies, WorkflowGroupOutput, } from '@/lib/table' -import { optimisticallyScheduleNewlyEligibleGroups } from '@/lib/table/deps' +import { areOutputsFilled, optimisticallyScheduleNewlyEligibleGroups } from '@/lib/table/deps' /** Short poll to surface running → completed transitions from the server without a dedicated realtime channel. */ const ROWS_POLL_INTERVAL_WHILE_RUNNING_MS = 1500 @@ -84,12 +85,75 @@ function hasRunningGroupExecution(rows: TableRow[] | undefined): boolean { return false } -function hasRunningGroupExecutionInPages(pages: TableRowsResponse[] | undefined): boolean { - if (!pages) return false - for (const page of pages) { - if (hasRunningGroupExecution(page.rows)) return true +/** + * Shallow-equality on the fields the renderer reads from a row. Row data is + * also shallow-compared — workflow output cells are scalars, so `===` per key + * suffices. `executions` is a per-group exec metadata object; we compare each + * group's `(status, jobId, executionId, error)` tuple. Any deeper drift forces + * a fresh row reference, which is the safe default. 
+ */ +function rowEqual(a: TableRow, b: TableRow): boolean { + if (a === b) return true + if (a.position !== b.position) return false + const aData = a.data ?? {} + const bData = b.data ?? {} + const aDataKeys = Object.keys(aData) + if (aDataKeys.length !== Object.keys(bData).length) return false + for (const k of aDataKeys) { + if (aData[k] !== bData[k]) return false } - return false + const aExec = a.executions ?? {} + const bExec = b.executions ?? {} + const aExecKeys = Object.keys(aExec) + if (aExecKeys.length !== Object.keys(bExec).length) return false + for (const k of aExecKeys) { + const ax = aExec[k] + const bx = bExec[k] + if (ax === bx) continue + if (!ax || !bx) return false + if ( + ax.status !== bx.status || + ax.jobId !== bx.jobId || + ax.executionId !== bx.executionId || + ax.error !== bx.error + ) { + return false + } + } + return true +} + +/** + * Replaces `prev.rows` element-by-element with `fresh.rows`, but reuses the + * `prev` reference for any row that hasn't changed. Memoized `` + * children short-circuit on row-identity, so a poll tick that arrives with + * 1000 rows but only flips the status of 5 only re-renders those 5 instead + * of every row in the page. 
+ */ +function mergePagePreservingIdentity( + prev: TableRowsResponse, + fresh: TableRowsResponse +): TableRowsResponse { + if (prev.rows === fresh.rows) return prev + const oldById = new Map(prev.rows.map((r) => [r.id, r])) + let changed = false + const merged = fresh.rows.map((freshRow) => { + const old = oldById.get(freshRow.id) + if (old && rowEqual(old, freshRow)) return old + changed = true + return freshRow + }) + if (!changed && merged.length === prev.rows.length) { + let identical = true + for (let i = 0; i < merged.length; i++) { + if (merged[i] !== prev.rows[i]) { + identical = false + break + } + } + if (identical && prev.totalCount === fresh.totalCount) return prev + } + return { ...fresh, rows: merged } } const logger = createLogger('TableQueries') @@ -293,9 +357,10 @@ export function useInfiniteTableRows({ filter: filter ?? null, sort: sort ?? null, }) + const queryKey = useMemo(() => tableKeys.infiniteRows(tableId, paramsKey), [tableId, paramsKey]) - return useInfiniteQuery({ - queryKey: tableKeys.infiniteRows(tableId, paramsKey), + const query = useInfiniteQuery({ + queryKey, queryFn: ({ pageParam, signal }) => fetchTableRows({ workspaceId, @@ -314,23 +379,79 @@ export function useInfiniteTableRows({ }, enabled: Boolean(workspaceId && tableId) && enabled, staleTime: 30 * 1000, - /** - * Poll while any row has a `pending` or `running` group execution. - * Realtime sockets push every cell write, but cross-network paths - * (trigger.dev workers → realtime ECS, client through CloudFront/proxy) - * occasionally drop events. Polling at the running cadence is the - * safety net so cells reach their terminal state without a refresh. - * No polling when nothing is running and no polling while a mutation - * is in flight (optimistic-update guard). - */ - refetchInterval: (query) => { - if (queryClient.isMutating() > 0) return false - return hasRunningGroupExecutionInPages(query.state.data?.pages) - ? 
ROWS_POLL_INTERVAL_WHILE_RUNNING_MS - : false - }, - refetchIntervalInBackground: false, - }) + + /** + * Per-page polling. Built-in `refetchInterval` would refetch every loaded + * page on each tick — wasteful when only one page has running cells. + * Instead, walk pages each tick and refetch ONLY the dirty ones, splicing + * results back into the cache. Polling stops when no page has in-flight + * cells, or while a mutation is running (optimistic-update guard). + */ + useEffect(() => { + if (!enabled || !workspaceId || !tableId) return + let cancelled = false + let timeoutId: ReturnType<typeof setTimeout> | null = null + const tick = async () => { + if (cancelled) return + if (queryClient.isMutating() === 0) { + const data = queryClient.getQueryData<InfiniteData<TableRowsResponse, number>>(queryKey) + const dirty: number[] = [] + if (data) { + for (let i = 0; i < data.pages.length; i++) { + if (hasRunningGroupExecution(data.pages[i].rows)) { + dirty.push(data.pageParams[i] ?? i * pageSize) + } + } + } + if (dirty.length > 0) { + await Promise.all( + dirty.map(async (offset) => { + try { + const fresh = await fetchTableRows({ + workspaceId, + tableId, + limit: pageSize, + offset, + filter, + sort, + includeTotal: offset === 0, + }) + if (cancelled) return + queryClient.setQueryData<InfiniteData<TableRowsResponse, number>>( + queryKey, + (prev) => { + if (!prev) return prev + const idx = prev.pageParams.indexOf(offset) + if (idx === -1) return prev + const merged = mergePagePreservingIdentity(prev.pages[idx], fresh) + if (merged === prev.pages[idx]) return prev + const nextPages = prev.pages.slice() + nextPages[idx] = merged + return { ...prev, pages: nextPages } + } + ) + } catch { + // Transient fetch failure — next tick retries. Don't kill the loop. + } + }) + ) + } + } + if (cancelled) return + // Recursive setTimeout instead of setInterval so a slow tick can't + // overlap the next one — out-of-order responses would otherwise let + // stale data overwrite fresh.
+ timeoutId = setTimeout(() => void tick(), ROWS_POLL_INTERVAL_WHILE_RUNNING_MS) + } + timeoutId = setTimeout(() => void tick(), ROWS_POLL_INTERVAL_WHILE_RUNNING_MS) + return () => { + cancelled = true + if (timeoutId !== null) clearTimeout(timeoutId) + } + }, [enabled, workspaceId, tableId, pageSize, filter, sort, queryClient, queryKey]) + + return query } /** @@ -1176,6 +1297,10 @@ export function useRunColumn({ workspaceId, tableId }: RowMutationContext) { onMutate: async ({ groupIds, runMode = 'all', rowIds }) => { const targetRowIds = rowIds && rowIds.length > 0 ? new Set(rowIds) : null const targetGroupIds = new Set(groupIds) + const groups = + queryClient.getQueryData(tableKeys.detail(tableId))?.schema + .workflowGroups ?? [] + const groupsById = new Map(groups.map((g) => [g.id, g])) const snapshots = await snapshotAndMutateRows(queryClient, tableId, (r) => { if (targetRowIds && !targetRowIds.has(r.id)) return null const executions = r.executions ?? {} @@ -1184,7 +1309,15 @@ export function useRunColumn({ workspaceId, tableId }: RowMutationContext) { for (const groupId of targetGroupIds) { const exec = executions[groupId] as RowExecutionMetadata | undefined if (isOptimisticInFlight(exec)) continue - if (runMode === 'incomplete' && exec?.status === 'completed') continue + // Mirror server eligibility for `mode: 'incomplete'`: skip cells whose + // outputs are filled, regardless of exec status. A cancelled/error + // cell with a leftover value from a prior run was rendering as filled + // but flipping to "queued" optimistically here even though the server + // would skip it. 
+ if (runMode === 'incomplete') { + const group = groupsById.get(groupId) + if (group && areOutputsFilled(group, r)) continue + } next[groupId] = buildPendingExec(exec) changed = true } diff --git a/apps/sim/lib/api/contracts/tables.ts b/apps/sim/lib/api/contracts/tables.ts index 40733c09a8..5c7b467e8f 100644 --- a/apps/sim/lib/api/contracts/tables.ts +++ b/apps/sim/lib/api/contracts/tables.ts @@ -895,7 +895,12 @@ export const runColumnContract = defineRouteContract({ body: runColumnBodySchema, response: { mode: 'json', - schema: successResponseSchema(z.object({ triggered: z.number() })), + /** + * `triggered` is `null` when the dispatcher runs in the background — the + * actual count is only known after a fan-out that may be tens of thousands + * of rows, and we don't hold the HTTP response open for that long. + */ + schema: successResponseSchema(z.object({ triggered: z.number().nullable() })), }, }) diff --git a/apps/sim/lib/copilot/tools/server/table/user-table.ts b/apps/sim/lib/copilot/tools/server/table/user-table.ts index d3b768d699..e6464e87af 100644 --- a/apps/sim/lib/copilot/tools/server/table/user-table.ts +++ b/apps/sim/lib/copilot/tools/server/table/user-table.ts @@ -1415,19 +1415,24 @@ export const userTableServerTool: BaseServerTool } const requestId = generateId().slice(0, 8) assertNotAborted() - const { triggered } = await runWorkflowColumn({ + // Dispatch in the background — large fan-outs (thousands of rows) + // issue sequential trigger.dev calls and would otherwise hold the + // tool span open for minutes, blocking the chat connection. + void runWorkflowColumn({ tableId: args.tableId, workspaceId, groupIds, mode: runMode, rowIds, requestId, + }).catch((err) => { + logger.error(`[${requestId}] run_column dispatch failed`, err) }) const scopeLabel = rowIds ? 
`${rowIds.length} row(s) by id` : runMode return { success: true, - message: `Triggered ${triggered} row(s) across ${groupIds.length} column(s) (${scopeLabel})`, - data: { triggered }, + message: `Started running ${groupIds.length} column(s) (${scopeLabel}). Cells will populate as workflows complete.`, + data: { triggered: null }, } } diff --git a/apps/sim/lib/core/config/env.ts b/apps/sim/lib/core/config/env.ts index 4d13831659..c95fa7fa99 100644 --- a/apps/sim/lib/core/config/env.ts +++ b/apps/sim/lib/core/config/env.ts @@ -455,6 +455,7 @@ export const env = createEnv({ NEXT_PUBLIC_AUDIT_LOGS_ENABLED: z.boolean().optional(), // Enable audit logs on self-hosted (bypasses hosted requirements) NEXT_PUBLIC_DATA_RETENTION_ENABLED: z.boolean().optional(), // Enable data retention settings on self-hosted (bypasses hosted requirements) NEXT_PUBLIC_DATA_DRAINS_ENABLED: z.boolean().optional(), // Enable data drains on self-hosted (bypasses hosted requirements) + NEXT_PUBLIC_WORKFLOW_COLUMNS_ENABLED: z.boolean().optional(), // Show the "Workflow" column type in user tables (defaults to false) NEXT_PUBLIC_ORGANIZATIONS_ENABLED: z.boolean().optional(), // Enable organizations on self-hosted (bypasses plan requirements) NEXT_PUBLIC_DISABLE_INVITATIONS: z.boolean().optional(), // Disable workspace invitations globally (for self-hosted deployments) NEXT_PUBLIC_DISABLE_PUBLIC_API: z.boolean().optional(), // Disable public API access UI toggle globally @@ -493,6 +494,7 @@ export const env = createEnv({ NEXT_PUBLIC_AUDIT_LOGS_ENABLED: process.env.NEXT_PUBLIC_AUDIT_LOGS_ENABLED, NEXT_PUBLIC_DATA_RETENTION_ENABLED: process.env.NEXT_PUBLIC_DATA_RETENTION_ENABLED, NEXT_PUBLIC_DATA_DRAINS_ENABLED: process.env.NEXT_PUBLIC_DATA_DRAINS_ENABLED, + NEXT_PUBLIC_WORKFLOW_COLUMNS_ENABLED: process.env.NEXT_PUBLIC_WORKFLOW_COLUMNS_ENABLED, NEXT_PUBLIC_ORGANIZATIONS_ENABLED: process.env.NEXT_PUBLIC_ORGANIZATIONS_ENABLED, NEXT_PUBLIC_DISABLE_INVITATIONS: process.env.NEXT_PUBLIC_DISABLE_INVITATIONS, 
NEXT_PUBLIC_DISABLE_PUBLIC_API: process.env.NEXT_PUBLIC_DISABLE_PUBLIC_API, diff --git a/apps/sim/lib/core/config/feature-flags.ts b/apps/sim/lib/core/config/feature-flags.ts index e4c1b7f444..f1d6e959f3 100644 --- a/apps/sim/lib/core/config/feature-flags.ts +++ b/apps/sim/lib/core/config/feature-flags.ts @@ -141,6 +141,15 @@ export const isDataRetentionEnabled = isTruthy(env.DATA_RETENTION_ENABLED) */ export const isDataDrainsEnabled = isTruthy(env.DATA_DRAINS_ENABLED) +/** + * Are workflow output columns enabled in user tables. + * Defaults to false; set NEXT_PUBLIC_WORKFLOW_COLUMNS_ENABLED=true to show + * the "Workflow" column type in the new-column dropdown. + */ +export const isWorkflowColumnsEnabledClient = isTruthy( + getEnv('NEXT_PUBLIC_WORKFLOW_COLUMNS_ENABLED') +) + /** * Is E2B enabled for remote code execution */ diff --git a/apps/sim/lib/table/cell-write.ts b/apps/sim/lib/table/cell-write.ts index 24826fe5d1..932b2d136a 100644 --- a/apps/sim/lib/table/cell-write.ts +++ b/apps/sim/lib/table/cell-write.ts @@ -56,24 +56,29 @@ export async function writeWorkflowGroupState( } const current = row.executions?.[groupId] as RowExecutionMetadata | undefined // Stale-worker guard: only blocks writes FROM an old worker (status = - // running / completed / error / pending). A `queued` stamp is the scheduler - // claiming the cell for a brand-new run — the new executionId is supposed - // to overwrite whatever was there. Same for `cancelled` (authoritative). - // Without this carve-out, the new run's stamp gets rejected and the cell - // is stuck in its old state forever. - const isAuthoritativeNewStamp = - payload.executionState.status === 'queued' || payload.executionState.status === 'cancelled' - if ( - !isAuthoritativeNewStamp && - current && - current.executionId && - current.executionId !== executionId - ) { + // running / completed / error / pending). 
A `queued` stamp from the + // scheduler can claim the cell for a brand-new run — that's the new + // authority. Same for `cancelled` (always authoritative, written by stop). + const isCancelStamp = payload.executionState.status === 'cancelled' + const isQueuedStamp = payload.executionState.status === 'queued' + const isNewQueuedStamp = isQueuedStamp && current?.executionId !== executionId + const bypassStaleWorker = isNewQueuedStamp || isCancelStamp + if (!bypassStaleWorker && current && current.executionId && current.executionId !== executionId) { logger.info( `Skipping group write — stale worker (table=${tableId} row=${rowId} group=${groupId} mine=${executionId} active=${current.executionId})` ) return 'skipped' } + // A late `queued` stamp for the SAME run that's already moved past queued + // (worker called markWorkflowGroupPickedUp before our parallel stamp landed) + // must NOT overwrite the further-along state. Without this, a cell can show + // "queued" forever while the worker is actually running. + if (isQueuedStamp && current?.executionId === executionId && current.status !== 'pending') { + logger.info( + `Skipping queued stamp — same run already at status=${current.status} (table=${tableId} row=${rowId} group=${groupId} executionId=${executionId})` + ) + return 'skipped' + } if ( current?.status === 'cancelled' && current.executionId === executionId && @@ -89,7 +94,7 @@ export async function writeWorkflowGroupState( // stamps from the scheduler also bypass — they ARE the new authority. Cell- // task writes (running/completed/error) get the SQL guard so an in-flight // partial can't clobber a stop click or a newer run that already committed. - const cancellationGuard = isAuthoritativeNewStamp ? undefined : { groupId, executionId } + const cancellationGuard = bypassStaleWorker ? 
undefined : { groupId, executionId } const result = await updateRow( { tableId, diff --git a/apps/sim/lib/table/workflow-columns.ts b/apps/sim/lib/table/workflow-columns.ts index 71b091c2df..4cb72dc9d2 100644 --- a/apps/sim/lib/table/workflow-columns.ts +++ b/apps/sim/lib/table/workflow-columns.ts @@ -92,7 +92,8 @@ export function isGroupEligible( row: TableRow, opts?: { isManualRun?: boolean; mode?: 'all' | 'incomplete' } ): boolean { - return classifyEligibility(group, row, opts) === 'eligible' + const reason = classifyEligibility(group, row, opts) + return reason === 'eligible' || reason === 'manual-bypass' } /** @@ -142,7 +143,7 @@ export async function scheduleRunsForRows( mode: opts?.mode, }) reasonCounts[reason] = (reasonCounts[reason] ?? 0) + 1 - if (reason !== 'eligible') continue + if (reason !== 'eligible' && reason !== 'manual-bypass') continue pendingRuns.push({ tableId: table.id, tableName: table.name,