44 * React Query hooks for managing user-defined tables.
55 */
66
7+ import { useEffect } from 'react'
78import { createLogger } from '@sim/logger'
89import {
910 type InfiniteData ,
@@ -68,7 +69,7 @@ import type {
6869 WorkflowGroupDependencies ,
6970 WorkflowGroupOutput ,
7071} from '@/lib/table'
71- import { optimisticallyScheduleNewlyEligibleGroups } from '@/lib/table/deps'
72+ import { areOutputsFilled , optimisticallyScheduleNewlyEligibleGroups } from '@/lib/table/deps'
7273
7374/** Short poll to surface running → completed transitions from the server without a dedicated realtime channel. */
7475const ROWS_POLL_INTERVAL_WHILE_RUNNING_MS = 1500
@@ -84,14 +85,6 @@ function hasRunningGroupExecution(rows: TableRow[] | undefined): boolean {
8485 return false
8586}
8687
87- function hasRunningGroupExecutionInPages ( pages : TableRowsResponse [ ] | undefined ) : boolean {
88- if ( ! pages ) return false
89- for ( const page of pages ) {
90- if ( hasRunningGroupExecution ( page . rows ) ) return true
91- }
92- return false
93- }
94-
9588const logger = createLogger ( 'TableQueries' )
9689
9790type TableQueryScope = 'active' | 'archived' | 'all'
@@ -293,9 +286,10 @@ export function useInfiniteTableRows({
293286 filter : filter ?? null ,
294287 sort : sort ?? null ,
295288 } )
289+ const queryKey = tableKeys . infiniteRows ( tableId , paramsKey )
296290
297- return useInfiniteQuery ( {
298- queryKey : tableKeys . infiniteRows ( tableId , paramsKey ) ,
291+ const query = useInfiniteQuery ( {
292+ queryKey,
299293 queryFn : ( { pageParam, signal } ) =>
300294 fetchTableRows ( {
301295 workspaceId,
@@ -314,23 +308,65 @@ export function useInfiniteTableRows({
314308 } ,
315309 enabled : Boolean ( workspaceId && tableId ) && enabled ,
316310 staleTime : 30 * 1000 ,
317- /**
318- * Poll while any row has a `pending` or `running` group execution.
319- * Realtime sockets push every cell write, but cross-network paths
320- * (trigger.dev workers → realtime ECS, client through CloudFront/proxy)
321- * occasionally drop events. Polling at the running cadence is the
322- * safety net so cells reach their terminal state without a refresh.
323- * No polling when nothing is running and no polling while a mutation
324- * is in flight (optimistic-update guard).
325- */
326- refetchInterval : ( query ) => {
327- if ( queryClient . isMutating ( ) > 0 ) return false
328- return hasRunningGroupExecutionInPages ( query . state . data ?. pages )
329- ? ROWS_POLL_INTERVAL_WHILE_RUNNING_MS
330- : false
331- } ,
332- refetchIntervalInBackground : false ,
333311 } )
312+
313+ /**
314+ * Per-page polling. Built-in `refetchInterval` would refetch every loaded
315+ * page on each tick — wasteful when only one page has running cells.
316+ * Instead, walk pages each tick and refetch ONLY the dirty ones, splicing
317+ * results back into the cache. Polling stops when no page has in-flight
318+ * cells, or while a mutation is running (optimistic-update guard).
319+ */
320+ useEffect ( ( ) => {
321+ if ( ! enabled || ! workspaceId || ! tableId ) return
322+ let cancelled = false
323+ const tick = async ( ) => {
324+ if ( cancelled ) return
325+ if ( queryClient . isMutating ( ) > 0 ) return
326+ const data = queryClient . getQueryData < InfiniteData < TableRowsResponse , number > > ( queryKey )
327+ if ( ! data ) return
328+ const dirty : number [ ] = [ ]
329+ for ( let i = 0 ; i < data . pages . length ; i ++ ) {
330+ if ( hasRunningGroupExecution ( data . pages [ i ] . rows ) ) {
331+ dirty . push ( data . pageParams [ i ] ?? i * pageSize )
332+ }
333+ }
334+ if ( dirty . length === 0 ) return
335+ await Promise . all (
336+ dirty . map ( async ( offset ) => {
337+ try {
338+ const fresh = await fetchTableRows ( {
339+ workspaceId,
340+ tableId,
341+ limit : pageSize ,
342+ offset,
343+ filter,
344+ sort,
345+ includeTotal : offset === 0 ,
346+ } )
347+ if ( cancelled ) return
348+ queryClient . setQueryData < InfiniteData < TableRowsResponse , number > > ( queryKey , ( prev ) => {
349+ if ( ! prev ) return prev
350+ const idx = prev . pageParams . indexOf ( offset )
351+ if ( idx === - 1 ) return prev
352+ const nextPages = prev . pages . slice ( )
353+ nextPages [ idx ] = fresh
354+ return { ...prev , pages : nextPages }
355+ } )
356+ } catch {
357+ // Transient fetch failure — next tick retries. Don't kill the loop.
358+ }
359+ } )
360+ )
361+ }
362+ const intervalId = setInterval ( ( ) => void tick ( ) , ROWS_POLL_INTERVAL_WHILE_RUNNING_MS )
363+ return ( ) => {
364+ cancelled = true
365+ clearInterval ( intervalId )
366+ }
367+ } , [ enabled , workspaceId , tableId , pageSize , filter , sort , queryClient , queryKey ] )
368+
369+ return query
334370}
335371
336372/**
@@ -1176,6 +1212,10 @@ export function useRunColumn({ workspaceId, tableId }: RowMutationContext) {
11761212 onMutate : async ( { groupIds, runMode = 'all' , rowIds } ) => {
11771213 const targetRowIds = rowIds && rowIds . length > 0 ? new Set ( rowIds ) : null
11781214 const targetGroupIds = new Set ( groupIds )
1215+ const groups =
1216+ queryClient . getQueryData < TableDefinition > ( tableKeys . detail ( tableId ) ) ?. schema
1217+ . workflowGroups ?? [ ]
1218+ const groupsById = new Map ( groups . map ( ( g ) => [ g . id , g ] ) )
11791219 const snapshots = await snapshotAndMutateRows ( queryClient , tableId , ( r ) => {
11801220 if ( targetRowIds && ! targetRowIds . has ( r . id ) ) return null
11811221 const executions = r . executions ?? { }
@@ -1184,7 +1224,15 @@ export function useRunColumn({ workspaceId, tableId }: RowMutationContext) {
11841224 for ( const groupId of targetGroupIds ) {
11851225 const exec = executions [ groupId ] as RowExecutionMetadata | undefined
11861226 if ( isOptimisticInFlight ( exec ) ) continue
1187- if ( runMode === 'incomplete' && exec ?. status === 'completed' ) continue
1227+ // Mirror server eligibility for `mode: 'incomplete'`: skip cells whose
1228+ // outputs are filled, regardless of exec status. A cancelled/error
1229+ // cell with a leftover value from a prior run was rendering as filled
1230+ // but flipping to "queued" optimistically here even though the server
1231+ // would skip it.
1232+ if ( runMode === 'incomplete' ) {
1233+ const group = groupsById . get ( groupId )
1234+ if ( group && areOutputsFilled ( group , r ) ) continue
1235+ }
11881236 next [ groupId ] = buildPendingExec ( exec )
11891237 changed = true
11901238 }
0 commit comments