-
Notifications
You must be signed in to change notification settings - Fork 3.6k
fix(table): trigger cascade race fixes, polling, workflow column flag #4499
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
ca1bc83
89c758a
590d1fd
3c9afcd
b23ba1b
2727b4c
7fcc8b6
c19ec14
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,7 @@ | |
| * React Query hooks for managing user-defined tables. | ||
| */ | ||
|
|
||
| import { useEffect, useMemo } from 'react' | ||
| import { createLogger } from '@sim/logger' | ||
| import { | ||
| type InfiniteData, | ||
|
|
@@ -68,7 +69,7 @@ import type { | |
| WorkflowGroupDependencies, | ||
| WorkflowGroupOutput, | ||
| } from '@/lib/table' | ||
| import { optimisticallyScheduleNewlyEligibleGroups } from '@/lib/table/deps' | ||
| import { areOutputsFilled, optimisticallyScheduleNewlyEligibleGroups } from '@/lib/table/deps' | ||
|
|
||
| /** Short poll to surface running → completed transitions from the server without a dedicated realtime channel. */ | ||
| const ROWS_POLL_INTERVAL_WHILE_RUNNING_MS = 1500 | ||
|
|
@@ -84,12 +85,75 @@ function hasRunningGroupExecution(rows: TableRow[] | undefined): boolean { | |
| return false | ||
| } | ||
|
|
||
| function hasRunningGroupExecutionInPages(pages: TableRowsResponse[] | undefined): boolean { | ||
| if (!pages) return false | ||
| for (const page of pages) { | ||
| if (hasRunningGroupExecution(page.rows)) return true | ||
| /** | ||
| * Shallow-equality on the fields the renderer reads from a row. Row data is | ||
| * also shallow-compared — workflow output cells are scalars, so `===` per key | ||
| * suffices. `executions` is a per-group exec metadata object; we compare each | ||
| * group's `(status, jobId, executionId, error)` tuple. Any deeper drift forces | ||
| * a fresh row reference, which is the safe default. | ||
| */ | ||
| function rowEqual(a: TableRow, b: TableRow): boolean { | ||
| if (a === b) return true | ||
| if (a.position !== b.position) return false | ||
| const aData = a.data ?? {} | ||
| const bData = b.data ?? {} | ||
| const aDataKeys = Object.keys(aData) | ||
| if (aDataKeys.length !== Object.keys(bData).length) return false | ||
| for (const k of aDataKeys) { | ||
| if (aData[k] !== bData[k]) return false | ||
| } | ||
| return false | ||
| const aExec = a.executions ?? {} | ||
| const bExec = b.executions ?? {} | ||
| const aExecKeys = Object.keys(aExec) | ||
| if (aExecKeys.length !== Object.keys(bExec).length) return false | ||
| for (const k of aExecKeys) { | ||
| const ax = aExec[k] | ||
| const bx = bExec[k] | ||
| if (ax === bx) continue | ||
| if (!ax || !bx) return false | ||
| if ( | ||
| ax.status !== bx.status || | ||
| ax.jobId !== bx.jobId || | ||
| ax.executionId !== bx.executionId || | ||
| ax.error !== bx.error | ||
| ) { | ||
| return false | ||
| } | ||
| } | ||
| return true | ||
| } | ||
|
|
||
| /** | ||
| * Replaces `prev.rows` element-by-element with `fresh.rows`, but reuses the | ||
| * `prev` reference for any row that hasn't changed. Memoized `<DataRow>` | ||
| * children short-circuit on row-identity, so a poll tick that arrives with | ||
| * 1000 rows but only flips the status of 5 only re-renders those 5 instead | ||
| * of every row in the page. | ||
| */ | ||
| function mergePagePreservingIdentity( | ||
| prev: TableRowsResponse, | ||
| fresh: TableRowsResponse | ||
| ): TableRowsResponse { | ||
| if (prev.rows === fresh.rows) return prev | ||
| const oldById = new Map(prev.rows.map((r) => [r.id, r])) | ||
| let changed = false | ||
| const merged = fresh.rows.map((freshRow) => { | ||
| const old = oldById.get(freshRow.id) | ||
| if (old && rowEqual(old, freshRow)) return old | ||
| changed = true | ||
| return freshRow | ||
| }) | ||
| if (!changed && merged.length === prev.rows.length) { | ||
| let identical = true | ||
| for (let i = 0; i < merged.length; i++) { | ||
| if (merged[i] !== prev.rows[i]) { | ||
| identical = false | ||
| break | ||
| } | ||
| } | ||
| if (identical && prev.totalCount === fresh.totalCount) return prev | ||
| } | ||
| return { ...fresh, rows: merged } | ||
| } | ||
|
|
||
| const logger = createLogger('TableQueries') | ||
|
|
@@ -293,9 +357,10 @@ export function useInfiniteTableRows({ | |
| filter: filter ?? null, | ||
| sort: sort ?? null, | ||
| }) | ||
| const queryKey = useMemo(() => tableKeys.infiniteRows(tableId, paramsKey), [tableId, paramsKey]) | ||
|
|
||
| return useInfiniteQuery({ | ||
| queryKey: tableKeys.infiniteRows(tableId, paramsKey), | ||
| const query = useInfiniteQuery({ | ||
| queryKey, | ||
| queryFn: ({ pageParam, signal }) => | ||
| fetchTableRows({ | ||
| workspaceId, | ||
|
|
@@ -314,23 +379,79 @@ export function useInfiniteTableRows({ | |
| }, | ||
| enabled: Boolean(workspaceId && tableId) && enabled, | ||
| staleTime: 30 * 1000, | ||
| /** | ||
| * Poll while any row has a `pending` or `running` group execution. | ||
| * Realtime sockets push every cell write, but cross-network paths | ||
| * (trigger.dev workers → realtime ECS, client through CloudFront/proxy) | ||
| * occasionally drop events. Polling at the running cadence is the | ||
| * safety net so cells reach their terminal state without a refresh. | ||
| * No polling when nothing is running and no polling while a mutation | ||
| * is in flight (optimistic-update guard). | ||
| */ | ||
| refetchInterval: (query) => { | ||
| if (queryClient.isMutating() > 0) return false | ||
| return hasRunningGroupExecutionInPages(query.state.data?.pages) | ||
| ? ROWS_POLL_INTERVAL_WHILE_RUNNING_MS | ||
| : false | ||
| }, | ||
| refetchIntervalInBackground: false, | ||
| }) | ||
|
|
||
| /** | ||
| * Per-page polling. Built-in `refetchInterval` would refetch every loaded | ||
| * page on each tick — wasteful when only one page has running cells. | ||
| * Instead, walk pages each tick and refetch ONLY the dirty ones, splicing | ||
| * results back into the cache. Polling stops when no page has in-flight | ||
| * cells, or while a mutation is running (optimistic-update guard). | ||
| */ | ||
| useEffect(() => { | ||
| if (!enabled || !workspaceId || !tableId) return | ||
| let cancelled = false | ||
| let timeoutId: ReturnType<typeof setTimeout> | null = null | ||
| const tick = async () => { | ||
| if (cancelled) return | ||
| if (queryClient.isMutating() === 0) { | ||
| const data = queryClient.getQueryData<InfiniteData<TableRowsResponse, number>>(queryKey) | ||
| const dirty: number[] = [] | ||
| if (data) { | ||
| for (let i = 0; i < data.pages.length; i++) { | ||
| if (hasRunningGroupExecution(data.pages[i].rows)) { | ||
| dirty.push(data.pageParams[i] ?? i * pageSize) | ||
| } | ||
| } | ||
| } | ||
| if (dirty.length > 0) { | ||
| await Promise.all( | ||
| dirty.map(async (offset) => { | ||
| try { | ||
| const fresh = await fetchTableRows({ | ||
| workspaceId, | ||
| tableId, | ||
| limit: pageSize, | ||
| offset, | ||
| filter, | ||
| sort, | ||
| includeTotal: offset === 0, | ||
| }) | ||
| if (cancelled) return | ||
| queryClient.setQueryData<InfiniteData<TableRowsResponse, number>>( | ||
| queryKey, | ||
| (prev) => { | ||
| if (!prev) return prev | ||
| const idx = prev.pageParams.indexOf(offset) | ||
| if (idx === -1) return prev | ||
| const merged = mergePagePreservingIdentity(prev.pages[idx], fresh) | ||
| if (merged === prev.pages[idx]) return prev | ||
| const nextPages = prev.pages.slice() | ||
| nextPages[idx] = merged | ||
| return { ...prev, pages: nextPages } | ||
| } | ||
| ) | ||
| } catch { | ||
| // Transient fetch failure — next tick retries. Don't kill the loop. | ||
| } | ||
| }) | ||
| ) | ||
| } | ||
| } | ||
| if (cancelled) return | ||
| // Recursive setTimeout instead of setInterval so a slow tick can't | ||
| // overlap the next one — out-of-order responses would otherwise let | ||
| // stale data overwrite fresh. | ||
| timeoutId = setTimeout(() => void tick(), ROWS_POLL_INTERVAL_WHILE_RUNNING_MS) | ||
| } | ||
| timeoutId = setTimeout(() => void tick(), ROWS_POLL_INTERVAL_WHILE_RUNNING_MS) | ||
| return () => { | ||
| cancelled = true | ||
| if (timeoutId !== null) clearTimeout(timeoutId) | ||
| } | ||
| }, [enabled, workspaceId, tableId, pageSize, filter, sort, queryClient, queryKey]) | ||
|
|
||
| return query | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -1176,6 +1297,10 @@ export function useRunColumn({ workspaceId, tableId }: RowMutationContext) { | |
| onMutate: async ({ groupIds, runMode = 'all', rowIds }) => { | ||
| const targetRowIds = rowIds && rowIds.length > 0 ? new Set(rowIds) : null | ||
| const targetGroupIds = new Set(groupIds) | ||
| const groups = | ||
| queryClient.getQueryData<TableDefinition>(tableKeys.detail(tableId))?.schema | ||
| .workflowGroups ?? [] | ||
| const groupsById = new Map(groups.map((g) => [g.id, g])) | ||
| const snapshots = await snapshotAndMutateRows(queryClient, tableId, (r) => { | ||
| if (targetRowIds && !targetRowIds.has(r.id)) return null | ||
| const executions = r.executions ?? {} | ||
|
|
@@ -1184,7 +1309,15 @@ export function useRunColumn({ workspaceId, tableId }: RowMutationContext) { | |
| for (const groupId of targetGroupIds) { | ||
| const exec = executions[groupId] as RowExecutionMetadata | undefined | ||
| if (isOptimisticInFlight(exec)) continue | ||
| if (runMode === 'incomplete' && exec?.status === 'completed') continue | ||
| // Mirror server eligibility for `mode: 'incomplete'`: skip cells whose | ||
| // outputs are filled, regardless of exec status. A cancelled/error | ||
| // cell with a leftover value from a prior run was rendering as filled | ||
| // but flipping to "queued" optimistically here even though the server | ||
| // would skip it. | ||
| if (runMode === 'incomplete') { | ||
| const group = groupsById.get(groupId) | ||
| if (group && areOutputsFilled(group, r)) continue | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. **Optimistic patch diverges from server eligibility check** — Medium Severity. The client optimistic patch for `mode: 'incomplete'` [comment truncated in page capture]. Reviewed by Cursor Bugbot for commit b23ba1b. Configure here.
||
| next[groupId] = buildPendingExec(exec) | ||
| changed = true | ||
| } | ||
|
|
||


Uh oh!
There was an error while loading. Please reload this page.