From df00f07fb782374247282b767f9da28aaf9178b7 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Thu, 7 May 2026 19:28:50 -0700 Subject: [PATCH 1/9] feat(table): live cell updates via SSE + per-table event buffer Replaces the polling-based row refetch with a push-based SSE stream that patches the React Query cache directly as cell-state events arrive. Architecture: - New per-table event buffer in apps/sim/lib/table/events.ts. Redis sorted-set with monotonic eventId, 1h TTL, 5000-event cap, in-memory fallback. Modeled after apps/sim/lib/execution/event-buffer.ts but stripped of complexity tables don't need (no per-execution lifecycle, no id-batching, no write queue serialization). ~150 lines instead of 700. - writeWorkflowGroupState appends a fat event after each successful 'wrote'. Status transitions carry executionId + jobId; terminal/partial transitions also include the new output values inline so the client can patch row data without a follow-up refetch. - New SSE route at /api/table/[tableId]/events/stream?from=<eventId>. Replays from buffer on connect, polls at 500ms (mirrors workflow execution stream), heartbeat every 15s, signals 'pruned' if the caller fell off the back of the buffer. - Client hook useTableEventStream subscribes via EventSource. Reconnect-resume with last-seen eventId. On 'pruned', invalidates the rows query and resumes from the new earliest. Cache patches walk every cached query under rowsRoot(tableId) so filter/sort variants all stay live. - Removes refetchInterval from useTableRows and the per-page polling effect from useInfiniteTableRows. React Query's refetchOnWindowFocus + refetchOnReconnect cover the durability gap if any push is dropped. Out of scope: - Bulk-cancel events (cancellation path is being redesigned separately). - Generalizing the workflow event-buffer module to a shared primitive (defer until a third use case appears; for now the table buffer is the simpler cousin of the workflow one). 
--- .../table/[tableId]/events/stream/route.ts | 161 +++++++++++ .../tables/[tableId]/hooks/index.ts | 1 + .../[tableId]/hooks/use-table-event-stream.ts | 269 ++++++++++++++++++ .../[workspaceId]/tables/[tableId]/table.tsx | 6 +- apps/sim/hooks/queries/tables.ts | 97 +------ apps/sim/lib/api/contracts/tables.ts | 18 ++ apps/sim/lib/table/cell-write.ts | 26 ++ apps/sim/lib/table/events.ts | 258 +++++++++++++++++ scripts/check-api-validation-contracts.ts | 4 +- 9 files changed, 742 insertions(+), 98 deletions(-) create mode 100644 apps/sim/app/api/table/[tableId]/events/stream/route.ts create mode 100644 apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts create mode 100644 apps/sim/lib/table/events.ts diff --git a/apps/sim/app/api/table/[tableId]/events/stream/route.ts b/apps/sim/app/api/table/[tableId]/events/stream/route.ts new file mode 100644 index 0000000000..f5b2042ae7 --- /dev/null +++ b/apps/sim/app/api/table/[tableId]/events/stream/route.ts @@ -0,0 +1,161 @@ +import { createLogger } from '@sim/logger' +import { toError } from '@sim/utils/errors' +import { sleep } from '@sim/utils/helpers' +import { type NextRequest, NextResponse } from 'next/server' +import { tableEventStreamContract } from '@/lib/api/contracts/tables' +import { parseRequest } from '@/lib/api/server' +import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid' +import { SSE_HEADERS } from '@/lib/core/utils/sse' +import { generateRequestId } from '@/lib/core/utils/request' +import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { readTableEventsSince, type TableEventEntry } from '@/lib/table/events' +import { accessError, checkAccess } from '@/app/api/table/utils' + +const logger = createLogger('TableEventStreamAPI') + +const POLL_INTERVAL_MS = 500 +const HEARTBEAT_INTERVAL_MS = 15_000 +const MAX_STREAM_DURATION_MS = 4 * 60 * 60 * 1000 // 4 hours; client reconnects past this + +export const runtime = 'nodejs' +export const dynamic 
= 'force-dynamic' + +interface RouteContext { + params: Promise<{ tableId: string }> +} + +/** GET /api/table/[tableId]/events/stream?from= + * + * SSE stream of cell-state transitions. Replay-on-reconnect via `from`. + * Pruning (buffer cap exceeded or TTL expired) sends a `pruned` event and + * closes; the client responds with a full row-query refetch and reconnects + * from the new earliest. */ +export const GET = withRouteHandler(async (req: NextRequest, context: RouteContext) => { + const requestId = generateRequestId() + const parsed = await parseRequest(tableEventStreamContract, req, context) + if (!parsed.success) return parsed.response + const { tableId } = parsed.data.params + const { from: fromEventId } = parsed.data.query + + const auth = await checkSessionOrInternalAuth(req, { requireWorkflowId: false }) + if (!auth.success || !auth.userId) { + return NextResponse.json({ error: 'Authentication required' }, { status: 401 }) + } + + const access = await checkAccess(tableId, auth.userId, 'read') + if (!access.ok) return accessError(access, requestId, tableId) + + logger.info(`[${requestId}] Table event stream opened`, { tableId, fromEventId }) + + const encoder = new TextEncoder() + let closed = false + + const stream = new ReadableStream({ + async start(controller) { + let lastEventId = fromEventId + const deadline = Date.now() + MAX_STREAM_DURATION_MS + let nextHeartbeatAt = Date.now() + HEARTBEAT_INTERVAL_MS + + const enqueue = (text: string) => { + if (closed) return + try { + controller.enqueue(encoder.encode(text)) + } catch { + closed = true + } + } + + const sendEvents = (events: TableEventEntry[]) => { + for (const entry of events) { + if (closed) return + enqueue(`data: ${JSON.stringify(entry)}\n\n`) + lastEventId = entry.eventId + } + } + + const sendPrunedAndClose = (earliestEventId: number | undefined) => { + enqueue( + `event: pruned\ndata: ${JSON.stringify({ earliestEventId: earliestEventId ?? 
null })}\n\n` + ) + if (!closed) { + closed = true + try { + controller.close() + } catch {} + } + } + + const sendHeartbeat = () => { + // SSE comment line — keeps proxies (ALB default 60s idle) from closing + // the connection during quiet periods. + enqueue(`: ping ${Date.now()}\n\n`) + } + + try { + // Initial replay from buffer. + const initial = await readTableEventsSince(tableId, lastEventId) + if (initial.status === 'pruned') { + sendPrunedAndClose(initial.earliestEventId) + return + } + if (initial.status === 'unavailable') { + throw new Error(`Table event buffer unavailable: ${initial.error}`) + } + sendEvents(initial.events) + + // Stream loop — poll the buffer and forward new events. Workflow + // execution stream uses the same shape; pub/sub wakeups are an + // optimization we can add later if 500ms polling becomes a problem. + while (!closed && Date.now() < deadline) { + await sleep(POLL_INTERVAL_MS) + if (closed) return + + const result = await readTableEventsSince(tableId, lastEventId) + if (result.status === 'pruned') { + sendPrunedAndClose(result.earliestEventId) + return + } + if (result.status === 'unavailable') { + throw new Error(`Table event buffer unavailable: ${result.error}`) + } + if (result.events.length > 0) { + sendEvents(result.events) + } + + if (Date.now() >= nextHeartbeatAt) { + sendHeartbeat() + nextHeartbeatAt = Date.now() + HEARTBEAT_INTERVAL_MS + } + } + + // Reached the defensive duration ceiling — close cleanly so the client + // reconnects with the latest lastEventId. 
+ if (!closed) { + enqueue(`event: rotate\ndata: {}\n\n`) + closed = true + try { + controller.close() + } catch {} + } + } catch (error) { + logger.error(`[${requestId}] Table event stream error`, { + tableId, + error: toError(error).message, + }) + if (!closed) { + try { + controller.error(error) + } catch {} + } + } + }, + cancel() { + closed = true + logger.info(`[${requestId}] Client disconnected from table event stream`, { tableId }) + }, + }) + + return new NextResponse(stream, { + headers: { ...SSE_HEADERS, 'X-Table-Id': tableId }, + }) +}) diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/index.ts b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/index.ts index d207594366..76addf44c4 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/index.ts +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/index.ts @@ -1,2 +1,3 @@ export * from './use-context-menu' export * from './use-table' +export * from './use-table-event-stream' diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts new file mode 100644 index 0000000000..be58bb13f8 --- /dev/null +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts @@ -0,0 +1,269 @@ +'use client' + +import { useEffect, useRef } from 'react' +import { createLogger } from '@sim/logger' +import { useQueryClient } from '@tanstack/react-query' +import { tableKeys, type TableRowsResponse } from '@/hooks/queries/tables' +import type { RowData, RowExecutionMetadata, RowExecutions, TableRow } from '@/lib/table' + +const logger = createLogger('useTableEventStream') + +/** Mirrors the server-side `TableCellStatus` from `apps/sim/lib/table/events.ts`. 
*/ +type TableCellStatus = 'pending' | 'queued' | 'running' | 'completed' | 'cancelled' | 'error' + +interface TableCellEvent { + kind: 'cell' + tableId: string + rowId: string + groupId: string + status: TableCellStatus + executionId: string | null + jobId: string | null + error: string | null + outputs?: Record +} + +interface TableEventEntry { + eventId: number + tableId: string + event: TableCellEvent +} + +interface PrunedEvent { + earliestEventId: number | null +} + +const RECONNECT_BACKOFF_MS = [500, 1_000, 2_000, 5_000, 10_000] + +interface UseTableEventStreamArgs { + tableId: string | undefined + workspaceId: string | undefined + enabled?: boolean +} + +/** + * Subscribes to the table's SSE event stream and patches the React Query + * cache as cell-state events arrive. Replaces polling — once the page mounts, + * cells flip in <100ms via push instead of waiting for the next poll tick. + * + * Reconnect-resume: on transport error, the hook reconnects with `from=` set + * to the last seen `eventId`; the server replays anything missed from the + * Redis-backed buffer. If the buffer has rolled past the gap (server returns + * a `pruned` event), the hook full-refetches the row queries and resumes + * streaming from the new earliest. + * + * Returns nothing — the only side effect is keeping the cache live. Cleans + * up the EventSource on unmount or argument change. + */ +export function useTableEventStream({ + tableId, + workspaceId, + enabled = true, +}: UseTableEventStreamArgs): void { + const queryClient = useQueryClient() + + // Refs so the long-lived stream loop reads current values without forcing + // effect re-subscription on every render. 
+ const lastEventIdRef = useRef(0) + const reconnectAttemptRef = useRef(0) + + useEffect(() => { + if (!enabled || !tableId || !workspaceId) return + + let cancelled = false + let eventSource: EventSource | null = null + let reconnectTimer: ReturnType | null = null + // Reset the dedupe cursor on every fresh mount so a remount after + // navigation doesn't accidentally skip events from a prior session. + lastEventIdRef.current = 0 + reconnectAttemptRef.current = 0 + + const patchRow = (entry: TableEventEntry): void => { + const { rowId, groupId, status, executionId, jobId, error, outputs } = entry.event + const nextExec: RowExecutionMetadata = { + status, + executionId: executionId ?? null, + jobId: jobId ?? null, + // workflowId is required by the type but not in the SSE payload — we + // preserve any prior value via the merge below; if there's no prior + // value, the empty string is overwritten on the next refetch. + workflowId: '', + error: error ?? null, + } + + const queries = queryClient.getQueriesData({ + queryKey: tableKeys.rowsRoot(tableId), + }) + for (const [queryKey, data] of queries) { + if (!data) continue + const patched = patchCacheEntry(data, rowId, groupId, nextExec, outputs) + if (patched !== data) { + queryClient.setQueryData(queryKey, patched) + } + } + } + + const handlePrune = (payload: PrunedEvent): void => { + logger.info('Table event buffer pruned — full refetch', { tableId, ...payload }) + void queryClient.invalidateQueries({ queryKey: tableKeys.rowsRoot(tableId) }) + // Resume streaming from the new earliest. The next reconnect picks + // this up via lastEventIdRef.current. 
+ if (typeof payload.earliestEventId === 'number') { + lastEventIdRef.current = payload.earliestEventId + } else { + lastEventIdRef.current = 0 + } + } + + const scheduleReconnect = (): void => { + if (cancelled) return + const attempt = Math.min(reconnectAttemptRef.current, RECONNECT_BACKOFF_MS.length - 1) + const delay = RECONNECT_BACKOFF_MS[attempt] + reconnectAttemptRef.current++ + reconnectTimer = setTimeout(() => { + reconnectTimer = null + connect() + }, delay) + } + + const connect = (): void => { + if (cancelled) return + const url = `/api/table/${tableId}/events/stream?from=${lastEventIdRef.current}` + try { + eventSource = new EventSource(url) + } catch (err) { + logger.warn('Failed to open table event stream', { tableId, err }) + scheduleReconnect() + return + } + + eventSource.onopen = () => { + reconnectAttemptRef.current = 0 + } + + eventSource.onmessage = (msg: MessageEvent) => { + try { + const entry = JSON.parse(msg.data) as TableEventEntry + if (entry.event?.kind !== 'cell') return + if (entry.eventId <= lastEventIdRef.current) return + lastEventIdRef.current = entry.eventId + patchRow(entry) + } catch (err) { + logger.warn('Failed to parse table event', { tableId, err }) + } + } + + eventSource.addEventListener('pruned', (msg: MessageEvent) => { + try { + handlePrune(JSON.parse(msg.data) as PrunedEvent) + } catch { + handlePrune({ earliestEventId: null }) + } + }) + + eventSource.addEventListener('rotate', () => { + // Server hit its defensive duration ceiling — close + reconnect. 
+ eventSource?.close() + eventSource = null + scheduleReconnect() + }) + + eventSource.onerror = () => { + if (cancelled) return + eventSource?.close() + eventSource = null + scheduleReconnect() + } + } + + connect() + + return () => { + cancelled = true + if (reconnectTimer !== null) clearTimeout(reconnectTimer) + eventSource?.close() + eventSource = null + } + }, [enabled, tableId, workspaceId, queryClient]) +} + +/** + * Returns a new cache entry with the given row's executions/data patched, or + * the original reference if the row isn't in this entry. Handles both + * single-page (`useTableRows`) and infinite (`useInfiniteTableRows`) shapes. + * + * Within a page we only allocate a new row object when it actually changes; + * unchanged rows keep their reference so memoized `` short-circuits. + */ +function patchCacheEntry( + entry: unknown, + rowId: string, + groupId: string, + nextExec: RowExecutionMetadata, + outputs: Record | undefined +): unknown { + if (isInfiniteCache(entry)) { + let touched = false + const nextPages = entry.pages.map((page) => { + const nextRows = patchRows(page.rows, rowId, groupId, nextExec, outputs) + if (nextRows === page.rows) return page + touched = true + return { ...page, rows: nextRows } + }) + if (!touched) return entry + return { ...entry, pages: nextPages } + } + if (isSinglePage(entry)) { + const nextRows = patchRows(entry.rows, rowId, groupId, nextExec, outputs) + if (nextRows === entry.rows) return entry + return { ...entry, rows: nextRows } + } + return entry +} + +function patchRows( + rows: TableRow[], + rowId: string, + groupId: string, + nextExec: RowExecutionMetadata, + outputs: Record | undefined +): TableRow[] { + let touched = false + const next = rows.map((row) => { + if (row.id !== rowId) return row + const prevExec = row.executions?.[groupId] + // Preserve the prior workflowId — the SSE payload doesn't carry it but + // the cache row may already have it from the page query. 
+ const mergedExec: RowExecutionMetadata = { + ...nextExec, + workflowId: prevExec?.workflowId ?? nextExec.workflowId, + } + const nextExecutions: RowExecutions = { ...(row.executions ?? {}), [groupId]: mergedExec } + const nextData: RowData = outputs + ? ({ ...row.data, ...outputs } as RowData) + : row.data + touched = true + return { ...row, executions: nextExecutions, data: nextData } + }) + return touched ? next : rows +} + +interface InfiniteCache { + pages: TableRowsResponse[] + pageParams: number[] +} + +function isInfiniteCache(value: unknown): value is InfiniteCache { + return ( + typeof value === 'object' && + value !== null && + Array.isArray((value as InfiniteCache).pages) && + Array.isArray((value as InfiniteCache).pageParams) + ) +} + +function isSinglePage(value: unknown): value is TableRowsResponse { + return ( + typeof value === 'object' && value !== null && Array.isArray((value as TableRowsResponse).rows) + ) +} diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/table.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/table.tsx index e9fea357e6..cded289720 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/table.tsx +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/table.tsx @@ -50,7 +50,7 @@ import { } from './components' import { COLUMN_SIDEBAR_WIDTH } from './components/table-grid/constants' import { COLUMN_TYPE_ICONS } from './components/table-grid/headers' -import { useTable } from './hooks' +import { useTable, useTableEventStream } from './hooks' import type { QueryOptions } from './types' import { generateColumnName } from './utils' @@ -117,6 +117,10 @@ export function Table({ const workspaceId = propWorkspaceId || (params.workspaceId as string) const tableId = propTableId || (params.tableId as string) + // Subscribe to per-cell SSE events for this table. Patches the row cache + // as transitions arrive — replaces polling for live updates. 
+ useTableEventStream({ tableId, workspaceId }) + const [slideout, dispatch] = useReducer(slideoutReducer, { kind: 'none' }) const [showDeleteTableConfirm, setShowDeleteTableConfirm] = useState(false) const [isImportCsvOpen, setIsImportCsvOpen] = useState(false) diff --git a/apps/sim/hooks/queries/tables.ts b/apps/sim/hooks/queries/tables.ts index e2cf11ac08..3ba2d62b9f 100644 --- a/apps/sim/hooks/queries/tables.ts +++ b/apps/sim/hooks/queries/tables.ts @@ -4,7 +4,7 @@ * React Query hooks for managing user-defined tables. */ -import { useEffect, useMemo } from 'react' +import { useMemo } from 'react' import { createLogger } from '@sim/logger' import { type InfiniteData, @@ -71,20 +71,6 @@ import type { } from '@/lib/table' import { areOutputsFilled, optimisticallyScheduleNewlyEligibleGroups } from '@/lib/table/deps' -/** Short poll to surface running → completed transitions from the server without a dedicated realtime channel. */ -const ROWS_POLL_INTERVAL_WHILE_RUNNING_MS = 1500 - -function hasRunningGroupExecution(rows: TableRow[] | undefined): boolean { - if (!rows) return false - for (const row of rows) { - const executions = row.executions ?? {} - for (const key in executions) { - if (isOptimisticInFlight(executions[key])) return true - } - } - return false -} - /** * Shallow-equality on the fields the renderer reads from a row. Row data is * also shallow-compared — workflow output cells are scalars, so `===` per key @@ -326,13 +312,6 @@ export function useTableRows({ enabled: Boolean(workspaceId && tableId) && enabled, staleTime: 30 * 1000, placeholderData: keepPreviousData, - refetchInterval: (query) => { - if (queryClient.isMutating() > 0) return false - return hasRunningGroupExecution(query.state.data?.rows) - ? 
ROWS_POLL_INTERVAL_WHILE_RUNNING_MS - : false - }, - refetchIntervalInBackground: false, }) } @@ -359,7 +338,7 @@ export function useInfiniteTableRows({ }) const queryKey = useMemo(() => tableKeys.infiniteRows(tableId, paramsKey), [tableId, paramsKey]) - const query = useInfiniteQuery({ + return useInfiniteQuery({ queryKey, queryFn: ({ pageParam, signal }) => fetchTableRows({ @@ -380,78 +359,6 @@ export function useInfiniteTableRows({ enabled: Boolean(workspaceId && tableId) && enabled, staleTime: 30 * 1000, }) - - /** - * Per-page polling. Built-in `refetchInterval` would refetch every loaded - * page on each tick — wasteful when only one page has running cells. - * Instead, walk pages each tick and refetch ONLY the dirty ones, splicing - * results back into the cache. Polling stops when no page has in-flight - * cells, or while a mutation is running (optimistic-update guard). - */ - useEffect(() => { - if (!enabled || !workspaceId || !tableId) return - let cancelled = false - let timeoutId: ReturnType | null = null - const tick = async () => { - if (cancelled) return - if (queryClient.isMutating() === 0) { - const data = queryClient.getQueryData>(queryKey) - const dirty: number[] = [] - if (data) { - for (let i = 0; i < data.pages.length; i++) { - if (hasRunningGroupExecution(data.pages[i].rows)) { - dirty.push(data.pageParams[i] ?? 
i * pageSize) - } - } - } - if (dirty.length > 0) { - await Promise.all( - dirty.map(async (offset) => { - try { - const fresh = await fetchTableRows({ - workspaceId, - tableId, - limit: pageSize, - offset, - filter, - sort, - includeTotal: offset === 0, - }) - if (cancelled) return - queryClient.setQueryData>( - queryKey, - (prev) => { - if (!prev) return prev - const idx = prev.pageParams.indexOf(offset) - if (idx === -1) return prev - const merged = mergePagePreservingIdentity(prev.pages[idx], fresh) - if (merged === prev.pages[idx]) return prev - const nextPages = prev.pages.slice() - nextPages[idx] = merged - return { ...prev, pages: nextPages } - } - ) - } catch { - // Transient fetch failure — next tick retries. Don't kill the loop. - } - }) - ) - } - } - if (cancelled) return - // Recursive setTimeout instead of setInterval so a slow tick can't - // overlap the next one — out-of-order responses would otherwise let - // stale data overwrite fresh. - timeoutId = setTimeout(() => void tick(), ROWS_POLL_INTERVAL_WHILE_RUNNING_MS) - } - timeoutId = setTimeout(() => void tick(), ROWS_POLL_INTERVAL_WHILE_RUNNING_MS) - return () => { - cancelled = true - if (timeoutId !== null) clearTimeout(timeoutId) - } - }, [enabled, workspaceId, tableId, pageSize, filter, sort, queryClient, queryKey]) - - return query } /** diff --git a/apps/sim/lib/api/contracts/tables.ts b/apps/sim/lib/api/contracts/tables.ts index 5c7b467e8f..e9cb743dc2 100644 --- a/apps/sim/lib/api/contracts/tables.ts +++ b/apps/sim/lib/api/contracts/tables.ts @@ -912,3 +912,21 @@ export type RunColumnBodyInput = z.input /** Shared `runMode` union — used by every UI / hook / Mothership site that * builds a run-column payload. Single source of truth for the literal pair. 
*/ export type RunMode = NonNullable + +export const tableEventStreamQuerySchema = z.object({ + from: z.preprocess((value) => { + if (typeof value !== 'string') return 0 + const parsed = Number.parseInt(value, 10) + return Number.isFinite(parsed) && parsed >= 0 ? parsed : 0 + }, z.number().int().min(0)), +}) + +export const tableEventStreamContract = defineRouteContract({ + method: 'GET', + path: '/api/table/[tableId]/events/stream', + params: tableIdParamsSchema, + query: tableEventStreamQuerySchema, + response: { + mode: 'stream', + }, +}) diff --git a/apps/sim/lib/table/cell-write.ts b/apps/sim/lib/table/cell-write.ts index 932b2d136a..120777ef29 100644 --- a/apps/sim/lib/table/cell-write.ts +++ b/apps/sim/lib/table/cell-write.ts @@ -10,6 +10,8 @@ */ import { createLogger } from '@sim/logger' +import { toError } from '@sim/utils/errors' +import { appendTableEvent } from '@/lib/table/events' import type { RowData, RowExecutionMetadata, RowExecutions, WorkflowGroup } from '@/lib/table/types' const logger = createLogger('WorkflowCellWrite') @@ -113,6 +115,30 @@ export async function writeWorkflowGroupState( ) return 'skipped' } + + // Append to the table event buffer so live SSE consumers see the + // transition. Fire-and-forget — appendTableEvent never throws, but the + // try/catch is defensive: a Redis blip must not fail a successful DB write. + try { + const dataPatch = payload.dataPatch + const hasOutputs = dataPatch && Object.keys(dataPatch).length > 0 + void appendTableEvent({ + kind: 'cell', + tableId, + rowId, + groupId, + status: payload.executionState.status, + executionId: payload.executionState.executionId ?? null, + jobId: payload.executionState.jobId ?? null, + error: payload.executionState.error ?? null, + ...(hasOutputs ? 
{ outputs: dataPatch } : {}), + }) + } catch (err) { + logger.warn(`appendTableEvent failed for table=${tableId} row=${rowId} group=${groupId}`, { + error: toError(err).message, + }) + } + return 'wrote' } diff --git a/apps/sim/lib/table/events.ts b/apps/sim/lib/table/events.ts new file mode 100644 index 0000000000..43edca6595 --- /dev/null +++ b/apps/sim/lib/table/events.ts @@ -0,0 +1,258 @@ +/** + * Per-table event buffer for live cell-state updates. + * + * The grid subscribes to a per-table SSE stream and patches its React Query + * cache as events arrive. This buffer is the durable mid-tier between the + * cell-write paths (`writeWorkflowGroupState`, `cancelWorkflowGroupRuns`) and + * the SSE consumers — every status transition appends here with a monotonic + * eventId; SSE clients resume on reconnect via `?from=` and the + * server replays from this buffer. + * + * Modeled after `apps/sim/lib/execution/event-buffer.ts` but stripped of + * complexity tables don't need: no per-execution lifecycle, no id reservation + * batching, no write-queue serialization. Tables are always-on; cell writes + * are sparse and independent. + */ + +import { createLogger } from '@sim/logger' +import { toError } from '@sim/utils/errors' +import { env } from '@/lib/core/config/env' +import { getRedisClient } from '@/lib/core/config/redis' + +const logger = createLogger('TableEventBuffer') + +const REDIS_PREFIX = 'table:stream:' +export const TABLE_EVENT_TTL_SECONDS = 60 * 60 // 1 hour +export const TABLE_EVENT_CAP = 5000 + +/** + * Atomic flush: ZADD the new entry, refresh TTL on events + seq + meta keys, + * trim the front of the sorted set to enforce the cap, then update the meta + * `earliestEventId` to whatever the front of the set now is. Without the + * Lua script, a slow reader could observe the trim before the meta update + * and incorrectly think pruning hadn't happened. 
+ * + * KEYS[1] = events sorted set key + * KEYS[2] = seq counter key (only EXPIRE'd here; INCR happens before EVAL) + * KEYS[3] = meta hash key + * ARGV[1] = TTL seconds + * ARGV[2] = cap (max events retained) + * ARGV[3] = updatedAt ISO string + * ARGV[4] = eventId (numeric, used as ZADD score) + * ARGV[5] = entry JSON + */ +const APPEND_EVENT_SCRIPT = ` +redis.call('ZADD', KEYS[1], ARGV[4], ARGV[5]) +redis.call('EXPIRE', KEYS[1], tonumber(ARGV[1])) +redis.call('EXPIRE', KEYS[2], tonumber(ARGV[1])) +redis.call('ZREMRANGEBYRANK', KEYS[1], 0, -tonumber(ARGV[2]) - 1) +local oldest = redis.call('ZRANGE', KEYS[1], 0, 0, 'WITHSCORES') +if oldest[2] then + redis.call('HSET', KEYS[3], 'earliestEventId', tostring(math.floor(tonumber(oldest[2]))), 'updatedAt', ARGV[3]) + redis.call('EXPIRE', KEYS[3], tonumber(ARGV[1])) +end +return oldest[2] or false +` + +function getEventsKey(tableId: string) { + return `${REDIS_PREFIX}${tableId}:events` +} + +function getSeqKey(tableId: string) { + return `${REDIS_PREFIX}${tableId}:seq` +} + +function getMetaKey(tableId: string) { + return `${REDIS_PREFIX}${tableId}:meta` +} + +export type TableCellStatus = + | 'pending' + | 'queued' + | 'running' + | 'completed' + | 'cancelled' + | 'error' + +export interface TableEvent { + kind: 'cell' + tableId: string + rowId: string + groupId: string + status: TableCellStatus + executionId: string | null + jobId: string | null + error: string | null + /** + * Present when this transition wrote new output values; absent on + * pure-status transitions (queued, running, cancelled). The publisher + * already has these in hand from the same updateRow call that wrote DB. 
+ */ + outputs?: Record +} + +export interface TableEventEntry { + eventId: number + tableId: string + event: TableEvent +} + +export type TableEventsReadResult = + | { status: 'ok'; events: TableEventEntry[] } + | { status: 'pruned'; earliestEventId: number | undefined } + | { status: 'unavailable'; error: string } + +/** In-memory fallback for dev/tests when Redis isn't configured. */ +interface MemoryTableStream { + events: TableEventEntry[] + earliestEventId?: number + nextEventId: number + expiresAt: number +} + +const memoryTableStreams = new Map() + +function canUseMemoryBuffer(): boolean { + return typeof window === 'undefined' && !env.REDIS_URL +} + +function pruneExpiredMemoryStreams(now = Date.now()): void { + for (const [tableId, stream] of memoryTableStreams) { + if (stream.expiresAt <= now) { + memoryTableStreams.delete(tableId) + } + } +} + +function getMemoryStream(tableId: string): MemoryTableStream { + pruneExpiredMemoryStreams() + let stream = memoryTableStreams.get(tableId) + if (!stream) { + stream = { + events: [], + nextEventId: 1, + expiresAt: Date.now() + TABLE_EVENT_TTL_SECONDS * 1000, + } + memoryTableStreams.set(tableId, stream) + } + return stream +} + +function appendMemory(event: TableEvent): TableEventEntry { + const stream = getMemoryStream(event.tableId) + const entry: TableEventEntry = { + eventId: stream.nextEventId++, + tableId: event.tableId, + event, + } + stream.events.push(entry) + if (stream.events.length > TABLE_EVENT_CAP) { + stream.events = stream.events.slice(-TABLE_EVENT_CAP) + stream.earliestEventId = stream.events[0]?.eventId + } + stream.expiresAt = Date.now() + TABLE_EVENT_TTL_SECONDS * 1000 + return entry +} + +function readMemory(tableId: string, afterEventId: number): TableEventsReadResult { + pruneExpiredMemoryStreams() + const stream = memoryTableStreams.get(tableId) + if (!stream) return { status: 'ok', events: [] } + if (stream.earliestEventId !== undefined && afterEventId + 1 < stream.earliestEventId) { + 
return { status: 'pruned', earliestEventId: stream.earliestEventId } + } + return { + status: 'ok', + events: stream.events.filter((entry) => entry.eventId > afterEventId), + } +} + +/** + * Append an event to the table's buffer. Fire-and-forget from the caller — + * this never throws, returns null on failure. A Redis blip must not fail a + * cell-write. + */ +export async function appendTableEvent(event: TableEvent): Promise { + const redis = getRedisClient() + if (!redis) { + if (canUseMemoryBuffer()) { + try { + return appendMemory(event) + } catch (error) { + logger.warn('appendTableEvent: memory append failed', { + tableId: event.tableId, + error: toError(error).message, + }) + return null + } + } + return null + } + try { + const eventId = await redis.incr(getSeqKey(event.tableId)) + const entry: TableEventEntry = { eventId, tableId: event.tableId, event } + await redis.eval( + APPEND_EVENT_SCRIPT, + 3, + getEventsKey(event.tableId), + getSeqKey(event.tableId), + getMetaKey(event.tableId), + TABLE_EVENT_TTL_SECONDS, + TABLE_EVENT_CAP, + new Date().toISOString(), + eventId, + JSON.stringify(entry) + ) + return entry + } catch (error) { + logger.warn('appendTableEvent: Redis append failed', { + tableId: event.tableId, + error: toError(error).message, + }) + return null + } +} + +/** + * Read events for a table where eventId > afterEventId. Returns 'pruned' if + * the caller has fallen off the back of the buffer (TTL expired or cap rolled + * past their lastEventId). Caller should respond by full-refetching from DB + * and resuming streaming from the new earliestEventId. 
+ */ +export async function readTableEventsSince( + tableId: string, + afterEventId: number +): Promise { + const redis = getRedisClient() + if (!redis) { + if (canUseMemoryBuffer()) { + return readMemory(tableId, afterEventId) + } + return { status: 'unavailable', error: 'Redis client unavailable' } + } + try { + const meta = await redis.hgetall(getMetaKey(tableId)) + const earliestEventId = + meta?.earliestEventId !== undefined ? Number(meta.earliestEventId) : undefined + if (earliestEventId !== undefined && afterEventId + 1 < earliestEventId) { + return { status: 'pruned', earliestEventId } + } + const raw = await redis.zrangebyscore(getEventsKey(tableId), afterEventId + 1, '+inf') + return { + status: 'ok', + events: raw + .map((entry) => { + try { + return JSON.parse(entry) as TableEventEntry + } catch { + return null + } + }) + .filter((entry): entry is TableEventEntry => Boolean(entry)), + } + } catch (error) { + const message = toError(error).message + logger.warn('readTableEventsSince failed', { tableId, error: message }) + return { status: 'unavailable', error: message } + } +} diff --git a/scripts/check-api-validation-contracts.ts b/scripts/check-api-validation-contracts.ts index bbf0d0f249..3045606207 100644 --- a/scripts/check-api-validation-contracts.ts +++ b/scripts/check-api-validation-contracts.ts @@ -9,8 +9,8 @@ const QUERY_HOOKS_DIR = path.join(ROOT, 'apps/sim/hooks/queries') const SELECTOR_HOOKS_DIR = path.join(ROOT, 'apps/sim/hooks/selectors') const BASELINE = { - totalRoutes: 734, - zodRoutes: 734, + totalRoutes: 735, + zodRoutes: 735, nonZodRoutes: 0, } as const From 56ac40cc154cf7c449b65c9487d11769becf1649 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Thu, 7 May 2026 19:51:38 -0700 Subject: [PATCH 2/9] fix(table): drop run-mutation refetch so SSE patches aren't overwritten MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit useRunColumn.onSettled was canceling in-flight queries and invalidating the 
rows query — leftover behavior from the polling era. With the SSE stream now keeping the cache live via incremental patches, this refetch races the stream and snaps the cache back to whatever DB shows at the refetch moment, which can lag the just-arrived queued/running events. Cells appeared stuck on the optimistic 'pending' even though the SSE was delivering the real transitions. --- apps/sim/hooks/queries/tables.ts | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/apps/sim/hooks/queries/tables.ts b/apps/sim/hooks/queries/tables.ts index 3ba2d62b9f..a8525038b7 100644 --- a/apps/sim/hooks/queries/tables.ts +++ b/apps/sim/hooks/queries/tables.ts @@ -1236,14 +1236,10 @@ export function useRunColumn({ workspaceId, tableId }: RowMutationContext) { onError: (_err, _variables, context) => { if (context?.snapshots) restoreCachedWorkflowCells(queryClient, context.snapshots) }, - onSettled: async () => { - // Cancel any in-flight poll first — without this, a poll started during - // the mutation but lands AFTER it resolves can clobber the optimistic - // patch with stale data, producing a queued → cancelled → queued flicker - // before the authoritative refetch arrives. - await queryClient.cancelQueries({ queryKey: tableKeys.rowsRoot(tableId) }) - await queryClient.invalidateQueries({ queryKey: tableKeys.rowsRoot(tableId) }) - }, + // No onSettled refetch — useTableEventStream keeps the cache live via SSE. + // A post-mutation refetch here would race the stream's incremental patches + // and snap the cache back to a DB snapshot, losing the just-arrived + // status transitions. 
}) } From 64197b57ffd23d956e351716a45ac42a8d7012fa Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Thu, 7 May 2026 20:01:56 -0700 Subject: [PATCH 3/9] =?UTF-8?q?chore(table):=20simplify=20SSE=20plumbing?= =?UTF-8?q?=20=E2=80=94=20reuse=20helpers,=20drop=20dead=20polling=20code?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Reuse snapshotAndMutateRows for SSE cache patches instead of reimplementing the page-walk + cache-shape detection. Adds a {cancelInFlight: false} opt for the SSE caller (mutations still cancel as before). - Drop client-side type duplication in use-table-event-stream — import TableEvent and TableEventEntry from lib/table/events directly. - Drop the now-dead mergePagePreservingIdentity + rowEqual from tables.ts; their only caller was the polling effect that was removed earlier. - Drop the defensive try/catch around appendTableEvent in cell-write — the function is documented as never-throwing (returns null on failure). - Combine INCR + ZADD into one Lua eval in events.ts. Halves Redis RTT per cell-write. Lua returns the new eventId; the script splices it into the pre-built entry JSON. - Trim refs to plain let bindings inside the effect; trim stale comments referencing the old polling implementation. 
--- .../[tableId]/hooks/use-table-event-stream.ts | 212 ++++-------------- .../[workspaceId]/tables/[tableId]/table.tsx | 2 - apps/sim/hooks/queries/tables.ts | 89 +------- apps/sim/lib/table/cell-write.ts | 36 ++- apps/sim/lib/table/events.ts | 44 ++-- 5 files changed, 96 insertions(+), 287 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts index be58bb13f8..e31d541069 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts @@ -1,34 +1,14 @@ 'use client' -import { useEffect, useRef } from 'react' +import { useEffect } from 'react' import { createLogger } from '@sim/logger' import { useQueryClient } from '@tanstack/react-query' -import { tableKeys, type TableRowsResponse } from '@/hooks/queries/tables' -import type { RowData, RowExecutionMetadata, RowExecutions, TableRow } from '@/lib/table' +import { snapshotAndMutateRows, tableKeys } from '@/hooks/queries/tables' +import type { TableEvent, TableEventEntry } from '@/lib/table/events' +import type { RowData, RowExecutionMetadata, RowExecutions } from '@/lib/table' const logger = createLogger('useTableEventStream') -/** Mirrors the server-side `TableCellStatus` from `apps/sim/lib/table/events.ts`. 
*/ -type TableCellStatus = 'pending' | 'queued' | 'running' | 'completed' | 'cancelled' | 'error' - -interface TableCellEvent { - kind: 'cell' - tableId: string - rowId: string - groupId: string - status: TableCellStatus - executionId: string | null - jobId: string | null - error: string | null - outputs?: Record -} - -interface TableEventEntry { - eventId: number - tableId: string - event: TableCellEvent -} - interface PrunedEvent { earliestEventId: number | null } @@ -43,17 +23,12 @@ interface UseTableEventStreamArgs { /** * Subscribes to the table's SSE event stream and patches the React Query - * cache as cell-state events arrive. Replaces polling — once the page mounts, - * cells flip in <100ms via push instead of waiting for the next poll tick. - * - * Reconnect-resume: on transport error, the hook reconnects with `from=` set - * to the last seen `eventId`; the server replays anything missed from the - * Redis-backed buffer. If the buffer has rolled past the gap (server returns - * a `pruned` event), the hook full-refetches the row queries and resumes - * streaming from the new earliest. + * cache as cell-state events arrive. * - * Returns nothing — the only side effect is keeping the cache live. Cleans - * up the EventSource on unmount or argument change. + * Reconnect-resume: on transport error, reconnects with `from=` set to the + * last seen `eventId`; server replays missed events from the Redis-backed + * buffer. If the gap exceeds buffer retention (server emits `pruned`), the + * hook full-refetches the row queries and resumes from the new earliest. */ export function useTableEventStream({ tableId, @@ -62,64 +37,55 @@ export function useTableEventStream({ }: UseTableEventStreamArgs): void { const queryClient = useQueryClient() - // Refs so the long-lived stream loop reads current values without forcing - // effect re-subscription on every render. 
- const lastEventIdRef = useRef(0) - const reconnectAttemptRef = useRef(0) - useEffect(() => { if (!enabled || !tableId || !workspaceId) return let cancelled = false let eventSource: EventSource | null = null let reconnectTimer: ReturnType | null = null - // Reset the dedupe cursor on every fresh mount so a remount after - // navigation doesn't accidentally skip events from a prior session. - lastEventIdRef.current = 0 - reconnectAttemptRef.current = 0 - - const patchRow = (entry: TableEventEntry): void => { - const { rowId, groupId, status, executionId, jobId, error, outputs } = entry.event - const nextExec: RowExecutionMetadata = { - status, - executionId: executionId ?? null, - jobId: jobId ?? null, - // workflowId is required by the type but not in the SSE payload — we - // preserve any prior value via the merge below; if there's no prior - // value, the empty string is overwritten on the next refetch. - workflowId: '', - error: error ?? null, - } - - const queries = queryClient.getQueriesData({ - queryKey: tableKeys.rowsRoot(tableId), - }) - for (const [queryKey, data] of queries) { - if (!data) continue - const patched = patchCacheEntry(data, rowId, groupId, nextExec, outputs) - if (patched !== data) { - queryClient.setQueryData(queryKey, patched) - } - } + let lastEventId = 0 + let reconnectAttempt = 0 + + const applyCell = (event: Extract): void => { + const { rowId, groupId, status, executionId, jobId, error, outputs } = event + void snapshotAndMutateRows( + queryClient, + tableId, + (row) => { + if (row.id !== rowId) return null + const prevExec = row.executions?.[groupId] + const nextExec: RowExecutionMetadata = { + status, + executionId: executionId ?? null, + jobId: jobId ?? null, + // Preserve workflowId from cache; SSE payload doesn't carry it. + workflowId: prevExec?.workflowId ?? '', + error: error ?? null, + } + const nextExecutions: RowExecutions = { + ...(row.executions ?? {}), + [groupId]: nextExec, + } + const nextData: RowData = outputs + ? 
({ ...row.data, ...outputs } as RowData) + : row.data + return { ...row, executions: nextExecutions, data: nextData } + }, + { cancelInFlight: false } + ) } const handlePrune = (payload: PrunedEvent): void => { logger.info('Table event buffer pruned — full refetch', { tableId, ...payload }) void queryClient.invalidateQueries({ queryKey: tableKeys.rowsRoot(tableId) }) - // Resume streaming from the new earliest. The next reconnect picks - // this up via lastEventIdRef.current. - if (typeof payload.earliestEventId === 'number') { - lastEventIdRef.current = payload.earliestEventId - } else { - lastEventIdRef.current = 0 - } + lastEventId = typeof payload.earliestEventId === 'number' ? payload.earliestEventId : 0 } const scheduleReconnect = (): void => { if (cancelled) return - const attempt = Math.min(reconnectAttemptRef.current, RECONNECT_BACKOFF_MS.length - 1) - const delay = RECONNECT_BACKOFF_MS[attempt] - reconnectAttemptRef.current++ + const idx = Math.min(reconnectAttempt, RECONNECT_BACKOFF_MS.length - 1) + const delay = RECONNECT_BACKOFF_MS[idx] + reconnectAttempt++ reconnectTimer = setTimeout(() => { reconnectTimer = null connect() @@ -128,7 +94,7 @@ export function useTableEventStream({ const connect = (): void => { if (cancelled) return - const url = `/api/table/${tableId}/events/stream?from=${lastEventIdRef.current}` + const url = `/api/table/${tableId}/events/stream?from=${lastEventId}` try { eventSource = new EventSource(url) } catch (err) { @@ -138,16 +104,16 @@ export function useTableEventStream({ } eventSource.onopen = () => { - reconnectAttemptRef.current = 0 + reconnectAttempt = 0 } eventSource.onmessage = (msg: MessageEvent) => { try { const entry = JSON.parse(msg.data) as TableEventEntry if (entry.event?.kind !== 'cell') return - if (entry.eventId <= lastEventIdRef.current) return - lastEventIdRef.current = entry.eventId - patchRow(entry) + if (entry.eventId <= lastEventId) return + lastEventId = entry.eventId + applyCell(entry.event) } catch 
(err) { logger.warn('Failed to parse table event', { tableId, err }) } @@ -162,7 +128,6 @@ export function useTableEventStream({ }) eventSource.addEventListener('rotate', () => { - // Server hit its defensive duration ceiling — close + reconnect. eventSource?.close() eventSource = null scheduleReconnect() @@ -186,84 +151,3 @@ export function useTableEventStream({ } }, [enabled, tableId, workspaceId, queryClient]) } - -/** - * Returns a new cache entry with the given row's executions/data patched, or - * the original reference if the row isn't in this entry. Handles both - * single-page (`useTableRows`) and infinite (`useInfiniteTableRows`) shapes. - * - * Within a page we only allocate a new row object when it actually changes; - * unchanged rows keep their reference so memoized `` short-circuits. - */ -function patchCacheEntry( - entry: unknown, - rowId: string, - groupId: string, - nextExec: RowExecutionMetadata, - outputs: Record | undefined -): unknown { - if (isInfiniteCache(entry)) { - let touched = false - const nextPages = entry.pages.map((page) => { - const nextRows = patchRows(page.rows, rowId, groupId, nextExec, outputs) - if (nextRows === page.rows) return page - touched = true - return { ...page, rows: nextRows } - }) - if (!touched) return entry - return { ...entry, pages: nextPages } - } - if (isSinglePage(entry)) { - const nextRows = patchRows(entry.rows, rowId, groupId, nextExec, outputs) - if (nextRows === entry.rows) return entry - return { ...entry, rows: nextRows } - } - return entry -} - -function patchRows( - rows: TableRow[], - rowId: string, - groupId: string, - nextExec: RowExecutionMetadata, - outputs: Record | undefined -): TableRow[] { - let touched = false - const next = rows.map((row) => { - if (row.id !== rowId) return row - const prevExec = row.executions?.[groupId] - // Preserve the prior workflowId — the SSE payload doesn't carry it but - // the cache row may already have it from the page query. 
- const mergedExec: RowExecutionMetadata = { - ...nextExec, - workflowId: prevExec?.workflowId ?? nextExec.workflowId, - } - const nextExecutions: RowExecutions = { ...(row.executions ?? {}), [groupId]: mergedExec } - const nextData: RowData = outputs - ? ({ ...row.data, ...outputs } as RowData) - : row.data - touched = true - return { ...row, executions: nextExecutions, data: nextData } - }) - return touched ? next : rows -} - -interface InfiniteCache { - pages: TableRowsResponse[] - pageParams: number[] -} - -function isInfiniteCache(value: unknown): value is InfiniteCache { - return ( - typeof value === 'object' && - value !== null && - Array.isArray((value as InfiniteCache).pages) && - Array.isArray((value as InfiniteCache).pageParams) - ) -} - -function isSinglePage(value: unknown): value is TableRowsResponse { - return ( - typeof value === 'object' && value !== null && Array.isArray((value as TableRowsResponse).rows) - ) -} diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/table.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/table.tsx index cded289720..91eeb93595 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/table.tsx +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/table.tsx @@ -117,8 +117,6 @@ export function Table({ const workspaceId = propWorkspaceId || (params.workspaceId as string) const tableId = propTableId || (params.tableId as string) - // Subscribe to per-cell SSE events for this table. Patches the row cache - // as transitions arrive — replaces polling for live updates. 
useTableEventStream({ tableId, workspaceId }) const [slideout, dispatch] = useReducer(slideoutReducer, { kind: 'none' }) diff --git a/apps/sim/hooks/queries/tables.ts b/apps/sim/hooks/queries/tables.ts index a8525038b7..5690839837 100644 --- a/apps/sim/hooks/queries/tables.ts +++ b/apps/sim/hooks/queries/tables.ts @@ -71,77 +71,6 @@ import type { } from '@/lib/table' import { areOutputsFilled, optimisticallyScheduleNewlyEligibleGroups } from '@/lib/table/deps' -/** - * Shallow-equality on the fields the renderer reads from a row. Row data is - * also shallow-compared — workflow output cells are scalars, so `===` per key - * suffices. `executions` is a per-group exec metadata object; we compare each - * group's `(status, jobId, executionId, error)` tuple. Any deeper drift forces - * a fresh row reference, which is the safe default. - */ -function rowEqual(a: TableRow, b: TableRow): boolean { - if (a === b) return true - if (a.position !== b.position) return false - const aData = a.data ?? {} - const bData = b.data ?? {} - const aDataKeys = Object.keys(aData) - if (aDataKeys.length !== Object.keys(bData).length) return false - for (const k of aDataKeys) { - if (aData[k] !== bData[k]) return false - } - const aExec = a.executions ?? {} - const bExec = b.executions ?? {} - const aExecKeys = Object.keys(aExec) - if (aExecKeys.length !== Object.keys(bExec).length) return false - for (const k of aExecKeys) { - const ax = aExec[k] - const bx = bExec[k] - if (ax === bx) continue - if (!ax || !bx) return false - if ( - ax.status !== bx.status || - ax.jobId !== bx.jobId || - ax.executionId !== bx.executionId || - ax.error !== bx.error - ) { - return false - } - } - return true -} - -/** - * Replaces `prev.rows` element-by-element with `fresh.rows`, but reuses the - * `prev` reference for any row that hasn't changed. 
Memoized `` - * children short-circuit on row-identity, so a poll tick that arrives with - * 1000 rows but only flips the status of 5 only re-renders those 5 instead - * of every row in the page. - */ -function mergePagePreservingIdentity( - prev: TableRowsResponse, - fresh: TableRowsResponse -): TableRowsResponse { - if (prev.rows === fresh.rows) return prev - const oldById = new Map(prev.rows.map((r) => [r.id, r])) - let changed = false - const merged = fresh.rows.map((freshRow) => { - const old = oldById.get(freshRow.id) - if (old && rowEqual(old, freshRow)) return old - changed = true - return freshRow - }) - if (!changed && merged.length === prev.rows.length) { - let identical = true - for (let i = 0; i < merged.length; i++) { - if (merged[i] !== prev.rows[i]) { - identical = false - break - } - } - if (identical && prev.totalCount === fresh.totalCount) return prev - } - return { ...fresh, rows: merged } -} - const logger = createLogger('TableQueries') type TableQueryScope = 'active' | 'archived' | 'all' @@ -1101,12 +1030,19 @@ function isInfiniteRowsCache(value: unknown): value is InfiniteRowsCache { * row to write, or `null` to leave it. The common pattern is "matching cells * flip state, others are skipped". */ +/** Walks every cached query under `rowsRoot(tableId)` and applies `transform` + * to each row. Transform returns the new row or `null` to skip. Returns the + * list of [queryKey, prior data] entries so optimistic-update callers can + * roll back. SSE patchers can ignore the return value. 
*/ export async function snapshotAndMutateRows( queryClient: ReturnType, tableId: string, - transform: (row: TableRow) => TableRow | null + transform: (row: TableRow) => TableRow | null, + options?: { cancelInFlight?: boolean } ): Promise { - await queryClient.cancelQueries({ queryKey: tableKeys.rowsRoot(tableId) }) + if (options?.cancelInFlight !== false) { + await queryClient.cancelQueries({ queryKey: tableKeys.rowsRoot(tableId) }) + } const matching = queryClient.getQueriesData({ queryKey: tableKeys.rowsRoot(tableId), }) @@ -1236,10 +1172,9 @@ export function useRunColumn({ workspaceId, tableId }: RowMutationContext) { onError: (_err, _variables, context) => { if (context?.snapshots) restoreCachedWorkflowCells(queryClient, context.snapshots) }, - // No onSettled refetch — useTableEventStream keeps the cache live via SSE. - // A post-mutation refetch here would race the stream's incremental patches - // and snap the cache back to a DB snapshot, losing the just-arrived - // status transitions. + // No reconciliation here — useTableEventStream is the source of truth for + // post-mutation cache state, and a refetch would race its incremental + // patches. }) } diff --git a/apps/sim/lib/table/cell-write.ts b/apps/sim/lib/table/cell-write.ts index 120777ef29..2007e1e76f 100644 --- a/apps/sim/lib/table/cell-write.ts +++ b/apps/sim/lib/table/cell-write.ts @@ -10,7 +10,6 @@ */ import { createLogger } from '@sim/logger' -import { toError } from '@sim/utils/errors' import { appendTableEvent } from '@/lib/table/events' import type { RowData, RowExecutionMetadata, RowExecutions, WorkflowGroup } from '@/lib/table/types' @@ -116,28 +115,19 @@ export async function writeWorkflowGroupState( return 'skipped' } - // Append to the table event buffer so live SSE consumers see the - // transition. Fire-and-forget — appendTableEvent never throws, but the - // try/catch is defensive: a Redis blip must not fail a successful DB write. 
- try { - const dataPatch = payload.dataPatch - const hasOutputs = dataPatch && Object.keys(dataPatch).length > 0 - void appendTableEvent({ - kind: 'cell', - tableId, - rowId, - groupId, - status: payload.executionState.status, - executionId: payload.executionState.executionId ?? null, - jobId: payload.executionState.jobId ?? null, - error: payload.executionState.error ?? null, - ...(hasOutputs ? { outputs: dataPatch } : {}), - }) - } catch (err) { - logger.warn(`appendTableEvent failed for table=${tableId} row=${rowId} group=${groupId}`, { - error: toError(err).message, - }) - } + const dataPatch = payload.dataPatch + const hasOutputs = dataPatch && Object.keys(dataPatch).length > 0 + void appendTableEvent({ + kind: 'cell', + tableId, + rowId, + groupId, + status: payload.executionState.status, + executionId: payload.executionState.executionId ?? null, + jobId: payload.executionState.jobId ?? null, + error: payload.executionState.error ?? null, + ...(hasOutputs ? { outputs: dataPatch } : {}), + }) return 'wrote' } diff --git a/apps/sim/lib/table/events.ts b/apps/sim/lib/table/events.ts index 43edca6595..689f70efed 100644 --- a/apps/sim/lib/table/events.ts +++ b/apps/sim/lib/table/events.ts @@ -26,23 +26,21 @@ export const TABLE_EVENT_TTL_SECONDS = 60 * 60 // 1 hour export const TABLE_EVENT_CAP = 5000 /** - * Atomic flush: ZADD the new entry, refresh TTL on events + seq + meta keys, - * trim the front of the sorted set to enforce the cap, then update the meta - * `earliestEventId` to whatever the front of the set now is. Without the - * Lua script, a slow reader could observe the trim before the meta update - * and incorrectly think pruning hadn't happened. + * Atomic append: INCR the seq counter to mint a new eventId, build the entry + * JSON inline, ZADD it, refresh TTL on events + seq + meta, trim to cap, then + * write the resulting earliestEventId to meta. Single round-trip per event. 
+ * Without atomicity a slow reader could observe the trim before the meta + * update and miss the prune signal. * - * KEYS[1] = events sorted set key - * KEYS[2] = seq counter key (only EXPIRE'd here; INCR happens before EVAL) - * KEYS[3] = meta hash key - * ARGV[1] = TTL seconds - * ARGV[2] = cap (max events retained) - * ARGV[3] = updatedAt ISO string - * ARGV[4] = eventId (numeric, used as ZADD score) - * ARGV[5] = entry JSON + * KEYS: [events, seq, meta] + * ARGV: [ttlSec, cap, updatedAtIso, entryPrefix, entrySuffix] + * The new eventId is spliced between prefix/suffix to form the entry JSON. + * Returns the new eventId. */ const APPEND_EVENT_SCRIPT = ` -redis.call('ZADD', KEYS[1], ARGV[4], ARGV[5]) +local eventId = redis.call('INCR', KEYS[2]) +local entry = ARGV[4] .. eventId .. ARGV[5] +redis.call('ZADD', KEYS[1], eventId, entry) redis.call('EXPIRE', KEYS[1], tonumber(ARGV[1])) redis.call('EXPIRE', KEYS[2], tonumber(ARGV[1])) redis.call('ZREMRANGEBYRANK', KEYS[1], 0, -tonumber(ARGV[2]) - 1) @@ -51,7 +49,7 @@ if oldest[2] then redis.call('HSET', KEYS[3], 'earliestEventId', tostring(math.floor(tonumber(oldest[2]))), 'updatedAt', ARGV[3]) redis.call('EXPIRE', KEYS[3], tonumber(ARGV[1])) end -return oldest[2] or false +return eventId ` function getEventsKey(tableId: string) { @@ -189,9 +187,11 @@ export async function appendTableEvent(event: TableEvent): Promise Date: Thu, 7 May 2026 20:20:18 -0700 Subject: [PATCH 4/9] fix(table): address PR review on SSE buffer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - TTL-expiry silent miss: when all keys expire, hgetall(meta) returns empty so earliestEventId is undefined and the prune branch was skipped. Reconnect with non-zero afterEventId now checks the seq counter — its absence (TTL expired) signals pruned so the client refetches. Memory fallback mirrors. - Unbounded ZRANGEBYSCORE: cap reads at TABLE_EVENT_READ_CHUNK = 500 events per call. 
The route's 500ms poll loop drains chunks across ticks instead of flushing 5000 entries (multi-MB) in one tick after a long disconnect. - Pruned handler closes EventSource client-side: server-side close was firing onerror and routing through the 500ms backoff path. Now we close proactively, reset the reconnect attempt counter, and reconnect immediately from the new earliest. --- .../[tableId]/hooks/use-table-event-stream.ts | 6 ++++ apps/sim/lib/table/events.ts | 35 +++++++++++++++++-- 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts index e31d541069..5c999b220f 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts @@ -79,6 +79,12 @@ export function useTableEventStream({ logger.info('Table event buffer pruned — full refetch', { tableId, ...payload }) void queryClient.invalidateQueries({ queryKey: tableKeys.rowsRoot(tableId) }) lastEventId = typeof payload.earliestEventId === 'number' ? payload.earliestEventId : 0 + // Close proactively so the server's close doesn't fire onerror and route + // through the backoff path. Reconnect immediately from the new cursor. + eventSource?.close() + eventSource = null + reconnectAttempt = 0 + connect() } const scheduleReconnect = (): void => { diff --git a/apps/sim/lib/table/events.ts b/apps/sim/lib/table/events.ts index 689f70efed..97f9d2cac4 100644 --- a/apps/sim/lib/table/events.ts +++ b/apps/sim/lib/table/events.ts @@ -24,6 +24,8 @@ const logger = createLogger('TableEventBuffer') const REDIS_PREFIX = 'table:stream:' export const TABLE_EVENT_TTL_SECONDS = 60 * 60 // 1 hour export const TABLE_EVENT_CAP = 5000 +/** Max events returned by a single read; the SSE route drains in chunks. 
*/ +export const TABLE_EVENT_READ_CHUNK = 500 /** * Atomic append: INCR the seq counter to mint a new eventId, build the entry @@ -155,13 +157,21 @@ function appendMemory(event: TableEvent): TableEventEntry { function readMemory(tableId: string, afterEventId: number): TableEventsReadResult { pruneExpiredMemoryStreams() const stream = memoryTableStreams.get(tableId) - if (!stream) return { status: 'ok', events: [] } + if (!stream) { + // Mirror the Redis path: a non-zero afterEventId with no buffer at all + // means TTL expired or the stream never existed; either way the caller's + // cursor is stale. + if (afterEventId > 0) return { status: 'pruned', earliestEventId: undefined } + return { status: 'ok', events: [] } + } if (stream.earliestEventId !== undefined && afterEventId + 1 < stream.earliestEventId) { return { status: 'pruned', earliestEventId: stream.earliestEventId } } return { status: 'ok', - events: stream.events.filter((entry) => entry.eventId > afterEventId), + events: stream.events + .filter((entry) => entry.eventId > afterEventId) + .slice(0, TABLE_EVENT_READ_CHUNK), } } @@ -239,7 +249,26 @@ export async function readTableEventsSince( if (earliestEventId !== undefined && afterEventId + 1 < earliestEventId) { return { status: 'pruned', earliestEventId } } - const raw = await redis.zrangebyscore(getEventsKey(tableId), afterEventId + 1, '+inf') + // Read in capped chunks so a 5000-event backlog doesn't materialize as one + // multi-MB Redis reply + JSON parse + SSE flush. The route loop drains + // chunks across ticks. + const raw = await redis.zrangebyscore( + getEventsKey(tableId), + afterEventId + 1, + '+inf', + 'LIMIT', + 0, + TABLE_EVENT_READ_CHUNK + ) + if (raw.length === 0 && afterEventId > 0) { + // Total TTL expiry: events + meta both gone. The seq counter has the + // same TTL — its absence means the buffer was wiped and the caller's + // `afterEventId` is stale. Signal pruned so the client refetches. 
+ const seqExists = await redis.exists(getSeqKey(tableId)) + if (seqExists === 0) { + return { status: 'pruned', earliestEventId: undefined } + } + } return { status: 'ok', events: raw From 52834e914011ad6c20d86db8625a29392f2b212e Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Thu, 7 May 2026 23:58:07 -0700 Subject: [PATCH 5/9] improvement(table): persist SSE lastEventId in sessionStorage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tab refresh / navigate-away-and-back now resume the stream from where the previous mount left off instead of replaying from from=0. Mirrors the useExecutionStream pattern (saveExecutionPointer / loadExecutionPointer). First-ever mounts and new tabs still start at 0 — sessionStorage is per-tab, so the safe default applies. On a 'pruned' fallback the new earliestEventId is also persisted so the next reconnect starts there. --- .../[tableId]/hooks/use-table-event-stream.ts | 29 ++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts index 5c999b220f..39aa77d970 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts @@ -14,6 +14,28 @@ interface PrunedEvent { } const RECONNECT_BACKOFF_MS = [500, 1_000, 2_000, 5_000, 10_000] +const POINTER_PREFIX = 'table-event-stream-pointer:' + +function loadPointer(tableId: string): number { + if (typeof window === 'undefined') return 0 + try { + const raw = window.sessionStorage.getItem(`${POINTER_PREFIX}${tableId}`) + if (!raw) return 0 + const parsed = Number.parseInt(raw, 10) + return Number.isFinite(parsed) && parsed >= 0 ? 
parsed : 0 + } catch { + return 0 + } +} + +function savePointer(tableId: string, eventId: number): void { + if (typeof window === 'undefined') return + try { + window.sessionStorage.setItem(`${POINTER_PREFIX}${tableId}`, String(eventId)) + } catch { + // sessionStorage can throw under quota / private mode — ignore. + } +} interface UseTableEventStreamArgs { tableId: string | undefined @@ -43,7 +65,10 @@ export function useTableEventStream({ let cancelled = false let eventSource: EventSource | null = null let reconnectTimer: ReturnType | null = null - let lastEventId = 0 + // Resume from the last seen eventId persisted in sessionStorage. Survives + // tab refresh; if the buffer has rolled past this id the server replies + // `pruned` and we full-refetch + restart from the new earliest. + let lastEventId = loadPointer(tableId) let reconnectAttempt = 0 const applyCell = (event: Extract): void => { @@ -79,6 +104,7 @@ export function useTableEventStream({ logger.info('Table event buffer pruned — full refetch', { tableId, ...payload }) void queryClient.invalidateQueries({ queryKey: tableKeys.rowsRoot(tableId) }) lastEventId = typeof payload.earliestEventId === 'number' ? payload.earliestEventId : 0 + savePointer(tableId, lastEventId) // Close proactively so the server's close doesn't fire onerror and route // through the backoff path. Reconnect immediately from the new cursor. 
eventSource?.close() @@ -119,6 +145,7 @@ export function useTableEventStream({ if (entry.event?.kind !== 'cell') return if (entry.eventId <= lastEventId) return lastEventId = entry.eventId + savePointer(tableId, lastEventId) applyCell(entry.event) } catch (err) { logger.warn('Failed to parse table event', { tableId, err }) From 9398a2be104eae48ac0dfdb8b38deab2b8b786e1 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Fri, 8 May 2026 00:18:57 -0700 Subject: [PATCH 6/9] fix(table): include runningBlockIds + blockErrors in SSE event payload MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The cell renderer's 'queued' vs 'running' vs 'pending-upstream' decision reads exec.runningBlockIds + exec.blockErrors. Without those fields the inFlight branch falls through to 'pending-upstream' (amber Pending pill) even when the worker has already written status=running. The worker writes both fields to DB; the SSE event was stripping them. Thread them through events.ts → cell-write.ts → use-table-event-stream.ts. 
--- .../tables/[tableId]/hooks/use-table-event-stream.ts | 4 +++- apps/sim/lib/table/cell-write.ts | 4 ++++ apps/sim/lib/table/events.ts | 8 ++++++++ 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts index 39aa77d970..3cb83a71b5 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts @@ -72,7 +72,7 @@ export function useTableEventStream({ let reconnectAttempt = 0 const applyCell = (event: Extract): void => { - const { rowId, groupId, status, executionId, jobId, error, outputs } = event + const { rowId, groupId, status, executionId, jobId, error, outputs, runningBlockIds, blockErrors } = event void snapshotAndMutateRows( queryClient, tableId, @@ -86,6 +86,8 @@ export function useTableEventStream({ // Preserve workflowId from cache; SSE payload doesn't carry it. workflowId: prevExec?.workflowId ?? '', error: error ?? null, + ...(runningBlockIds ? { runningBlockIds } : {}), + ...(blockErrors ? { blockErrors } : {}), } const nextExecutions: RowExecutions = { ...(row.executions ?? {}), diff --git a/apps/sim/lib/table/cell-write.ts b/apps/sim/lib/table/cell-write.ts index 2007e1e76f..5e179319f9 100644 --- a/apps/sim/lib/table/cell-write.ts +++ b/apps/sim/lib/table/cell-write.ts @@ -117,6 +117,8 @@ export async function writeWorkflowGroupState( const dataPatch = payload.dataPatch const hasOutputs = dataPatch && Object.keys(dataPatch).length > 0 + const runningBlockIds = payload.executionState.runningBlockIds + const blockErrors = payload.executionState.blockErrors void appendTableEvent({ kind: 'cell', tableId, @@ -127,6 +129,8 @@ export async function writeWorkflowGroupState( jobId: payload.executionState.jobId ?? 
null, error: payload.executionState.error ?? null, ...(hasOutputs ? { outputs: dataPatch } : {}), + ...(runningBlockIds && runningBlockIds.length > 0 ? { runningBlockIds } : {}), + ...(blockErrors && Object.keys(blockErrors).length > 0 ? { blockErrors } : {}), }) return 'wrote' diff --git a/apps/sim/lib/table/events.ts b/apps/sim/lib/table/events.ts index 97f9d2cac4..174e97de4f 100644 --- a/apps/sim/lib/table/events.ts +++ b/apps/sim/lib/table/events.ts @@ -89,6 +89,14 @@ export interface TableEvent { * already has these in hand from the same updateRow call that wrote DB. */ outputs?: Record + /** + * Block-level metadata the renderer reads to distinguish "running" (some + * block actively executing) from "pending-upstream" (run started but this + * column's block hasn't fired yet). The worker fills these on partial + * writes; without them the cell stays on the amber Pending pill. + */ + runningBlockIds?: string[] + blockErrors?: Record } export interface TableEventEntry { From d3428ffe395d1c30257cc3c0ddd3484adfdda889 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Fri, 8 May 2026 00:52:56 -0700 Subject: [PATCH 7/9] fix(table): show value once column output has landed mid-run MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The cell renderer treated any `status: 'running'` event as in-flight, even when the column's own output had already been written. During a multi-block group run, partial-write events for a later block carry the earlier block's outputs but tag only the later block as running — that flipped the finished column back to the amber Pending pill until the terminal `completed` event arrived. Re-order the priority chain so the column's value wins over `pending-upstream`. Active re-run of the column itself (`blockRunning`) still wins over the stale value, so a re-run on a previously-completed cell still surfaces the running pill before the new value overwrites. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../table-grid/cells/cell-render.tsx | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/cells/cell-render.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/cells/cell-render.tsx index 35eb3d2e8f..c98733eb1c 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/cells/cell-render.tsx +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/cells/cell-render.tsx @@ -76,23 +76,27 @@ export function resolveCellRender({ if (blockError) return { kind: 'block-error' } - // In-flight wins over the existing value: when the group is being re-run, - // the current value is about to be overwritten — surface the run state so - // the user sees the cell is changing. Without this, a queued / running - // re-run on a previously-completed cell looks like nothing happened until - // the new value lands. + // Active re-run of THIS column wins over its prior value — the value is + // about to be overwritten and the user should see the cell is changing. const inFlight = exec?.status === 'running' || exec?.status === 'queued' || exec?.status === 'pending' + if (inFlight && blockRunning) return { kind: 'running' } + + // Value wins over `pending-upstream`: once this column's output has + // landed, the cell is done from the user's perspective — even if the + // group is still running other blocks downstream. Without this, mid-run + // partial-write events (`status: 'running'` carrying outputs but tagging + // a different block as running) would flip a finished column back to the + // amber Pending pill until the terminal `completed` event arrives. 
+ if (!isNull) return { kind: 'value', text: stringifyValue(value) } + if (inFlight && !(groupHasBlockErrors && !blockRunning)) { - if (blockRunning) return { kind: 'running' } if (exec?.status === 'queued' || exec?.status === 'pending') return { kind: 'queued' } - // `running` with this block not in `runningBlockIds` = upstream block - // still going; surface as the amber Pending pill per logs convention. + // `running` with this block not in `runningBlockIds` and no value yet = + // upstream block still going; surface as the amber Pending pill. return { kind: 'pending-upstream' } } - if (!isNull) return { kind: 'value', text: stringifyValue(value) } - // Waiting wins over a stale terminal state: if deps are unmet right now, // the prior `cancelled` / `error` is informational at best — the cell // can't actually run until the user fills the missing input. Surface the From 039bef20afdb911fe783f4e1f645ea122063a09c Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Fri, 8 May 2026 00:54:39 -0700 Subject: [PATCH 8/9] chore(table): address PR review nits on tables.ts - Merge duplicate JSDoc on snapshotAndMutateRows into a single block - Remove unused useQueryClient() calls from useTableRows and useInfiniteTableRows (leftover from polling-era code) Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/sim/hooks/queries/tables.ts | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/apps/sim/hooks/queries/tables.ts b/apps/sim/hooks/queries/tables.ts index 5690839837..c065bad410 100644 --- a/apps/sim/hooks/queries/tables.ts +++ b/apps/sim/hooks/queries/tables.ts @@ -225,7 +225,6 @@ export function useTableRows({ includeTotal, enabled = true, }: TableRowsParams & { enabled?: boolean }) { - const queryClient = useQueryClient() const paramsKey = JSON.stringify({ limit, offset, @@ -259,7 +258,6 @@ export function useInfiniteTableRows({ sort, enabled = true, }: InfiniteTableRowsParams) { - const queryClient = useQueryClient() const paramsKey 
= JSON.stringify({ pageSize, filter: filter ?? null, @@ -1022,18 +1020,18 @@ function isInfiniteRowsCache(value: unknown): value is InfiniteRowsCache { } /** - * Walks every cached row-list under `tableId`, applies `transform` to each row, - * and snapshots the originals for rollback. + * Walks every cached query under `rowsRoot(tableId)` and applies `transform` + * to each row. Handles both cache shapes — the single-page `TableRowsResponse` + * and the infinite-query `{ pages, pageParams }`. `transform(row)` returns + * the next row to write, or `null` to leave it. + * + * Returns the list of `[queryKey, prior data]` entries so optimistic-update + * callers can roll back. SSE patchers can ignore the return value. * - * Handles both cache shapes: the single-page `TableRowsResponse` and the - * infinite-query `{ pages, pageParams }`. `transform(row)` returns the next - * row to write, or `null` to leave it. The common pattern is "matching cells - * flip state, others are skipped". + * `cancelInFlight` defaults to true (the optimistic-update contract) but SSE + * patchers pass `false` so live cell updates don't kick the row query off the + * network. */ -/** Walks every cached query under `rowsRoot(tableId)` and applies `transform` - * to each row. Transform returns the new row or `null` to skip. Returns the - * list of [queryKey, prior data] entries so optimistic-update callers can - * roll back. SSE patchers can ignore the return value. */ export async function snapshotAndMutateRows( queryClient: ReturnType, tableId: string, From 9f565dc0ae9282963809bf33ef82c5b894520135 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Fri, 8 May 2026 01:00:31 -0700 Subject: [PATCH 9/9] chore(table): apply biome formatting fixes CI lint job flagged import order in two files and over-wrapped union in lib/table/events.ts. No behavior change. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../table/[tableId]/events/stream/route.ts | 2 +- .../[tableId]/hooks/use-table-event-stream.ts | 20 +++++++++++++------ apps/sim/lib/table/events.ts | 8 +------- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/apps/sim/app/api/table/[tableId]/events/stream/route.ts b/apps/sim/app/api/table/[tableId]/events/stream/route.ts index f5b2042ae7..d938c80c3e 100644 --- a/apps/sim/app/api/table/[tableId]/events/stream/route.ts +++ b/apps/sim/app/api/table/[tableId]/events/stream/route.ts @@ -5,8 +5,8 @@ import { type NextRequest, NextResponse } from 'next/server' import { tableEventStreamContract } from '@/lib/api/contracts/tables' import { parseRequest } from '@/lib/api/server' import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid' -import { SSE_HEADERS } from '@/lib/core/utils/sse' import { generateRequestId } from '@/lib/core/utils/request' +import { SSE_HEADERS } from '@/lib/core/utils/sse' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' import { readTableEventsSince, type TableEventEntry } from '@/lib/table/events' import { accessError, checkAccess } from '@/app/api/table/utils' diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts index 3cb83a71b5..e5a0e5e807 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts @@ -3,9 +3,9 @@ import { useEffect } from 'react' import { createLogger } from '@sim/logger' import { useQueryClient } from '@tanstack/react-query' -import { snapshotAndMutateRows, tableKeys } from '@/hooks/queries/tables' -import type { TableEvent, TableEventEntry } from '@/lib/table/events' import type { RowData, RowExecutionMetadata, RowExecutions } from '@/lib/table' +import type { TableEvent, 
TableEventEntry } from '@/lib/table/events' +import { snapshotAndMutateRows, tableKeys } from '@/hooks/queries/tables' const logger = createLogger('useTableEventStream') @@ -72,7 +72,17 @@ export function useTableEventStream({ let reconnectAttempt = 0 const applyCell = (event: Extract): void => { - const { rowId, groupId, status, executionId, jobId, error, outputs, runningBlockIds, blockErrors } = event + const { + rowId, + groupId, + status, + executionId, + jobId, + error, + outputs, + runningBlockIds, + blockErrors, + } = event void snapshotAndMutateRows( queryClient, tableId, @@ -93,9 +103,7 @@ export function useTableEventStream({ ...(row.executions ?? {}), [groupId]: nextExec, } - const nextData: RowData = outputs - ? ({ ...row.data, ...outputs } as RowData) - : row.data + const nextData: RowData = outputs ? ({ ...row.data, ...outputs } as RowData) : row.data return { ...row, executions: nextExecutions, data: nextData } }, { cancelInFlight: false } diff --git a/apps/sim/lib/table/events.ts b/apps/sim/lib/table/events.ts index 174e97de4f..64d4fabc56 100644 --- a/apps/sim/lib/table/events.ts +++ b/apps/sim/lib/table/events.ts @@ -66,13 +66,7 @@ function getMetaKey(tableId: string) { return `${REDIS_PREFIX}${tableId}:meta` } -export type TableCellStatus = - | 'pending' - | 'queued' - | 'running' - | 'completed' - | 'cancelled' - | 'error' +export type TableCellStatus = 'pending' | 'queued' | 'running' | 'completed' | 'cancelled' | 'error' export interface TableEvent { kind: 'cell'