diff --git a/apps/sim/app/api/table/[tableId]/events/stream/route.ts b/apps/sim/app/api/table/[tableId]/events/stream/route.ts new file mode 100644 index 0000000000..d938c80c3e --- /dev/null +++ b/apps/sim/app/api/table/[tableId]/events/stream/route.ts @@ -0,0 +1,161 @@ +import { createLogger } from '@sim/logger' +import { toError } from '@sim/utils/errors' +import { sleep } from '@sim/utils/helpers' +import { type NextRequest, NextResponse } from 'next/server' +import { tableEventStreamContract } from '@/lib/api/contracts/tables' +import { parseRequest } from '@/lib/api/server' +import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid' +import { generateRequestId } from '@/lib/core/utils/request' +import { SSE_HEADERS } from '@/lib/core/utils/sse' +import { withRouteHandler } from '@/lib/core/utils/with-route-handler' +import { readTableEventsSince, type TableEventEntry } from '@/lib/table/events' +import { accessError, checkAccess } from '@/app/api/table/utils' + +const logger = createLogger('TableEventStreamAPI') + +const POLL_INTERVAL_MS = 500 +const HEARTBEAT_INTERVAL_MS = 15_000 +const MAX_STREAM_DURATION_MS = 4 * 60 * 60 * 1000 // 4 hours; client reconnects past this + +export const runtime = 'nodejs' +export const dynamic = 'force-dynamic' + +interface RouteContext { + params: Promise<{ tableId: string }> +} + +/** GET /api/table/[tableId]/events/stream?from= + * + * SSE stream of cell-state transitions. Replay-on-reconnect via `from`. + * Pruning (buffer cap exceeded or TTL expired) sends a `pruned` event and + * closes; the client responds with a full row-query refetch and reconnects + * from the new earliest. 
*/ +export const GET = withRouteHandler(async (req: NextRequest, context: RouteContext) => { + const requestId = generateRequestId() + const parsed = await parseRequest(tableEventStreamContract, req, context) + if (!parsed.success) return parsed.response + const { tableId } = parsed.data.params + const { from: fromEventId } = parsed.data.query + + const auth = await checkSessionOrInternalAuth(req, { requireWorkflowId: false }) + if (!auth.success || !auth.userId) { + return NextResponse.json({ error: 'Authentication required' }, { status: 401 }) + } + + const access = await checkAccess(tableId, auth.userId, 'read') + if (!access.ok) return accessError(access, requestId, tableId) + + logger.info(`[${requestId}] Table event stream opened`, { tableId, fromEventId }) + + const encoder = new TextEncoder() + let closed = false + + const stream = new ReadableStream({ + async start(controller) { + let lastEventId = fromEventId + const deadline = Date.now() + MAX_STREAM_DURATION_MS + let nextHeartbeatAt = Date.now() + HEARTBEAT_INTERVAL_MS + + const enqueue = (text: string) => { + if (closed) return + try { + controller.enqueue(encoder.encode(text)) + } catch { + closed = true + } + } + + const sendEvents = (events: TableEventEntry[]) => { + for (const entry of events) { + if (closed) return + enqueue(`data: ${JSON.stringify(entry)}\n\n`) + lastEventId = entry.eventId + } + } + + const sendPrunedAndClose = (earliestEventId: number | undefined) => { + enqueue( + `event: pruned\ndata: ${JSON.stringify({ earliestEventId: earliestEventId ?? null })}\n\n` + ) + if (!closed) { + closed = true + try { + controller.close() + } catch {} + } + } + + const sendHeartbeat = () => { + // SSE comment line — keeps proxies (ALB default 60s idle) from closing + // the connection during quiet periods. + enqueue(`: ping ${Date.now()}\n\n`) + } + + try { + // Initial replay from buffer. 
+ const initial = await readTableEventsSince(tableId, lastEventId) + if (initial.status === 'pruned') { + sendPrunedAndClose(initial.earliestEventId) + return + } + if (initial.status === 'unavailable') { + throw new Error(`Table event buffer unavailable: ${initial.error}`) + } + sendEvents(initial.events) + + // Stream loop — poll the buffer and forward new events. Workflow + // execution stream uses the same shape; pub/sub wakeups are an + // optimization we can add later if 500ms polling becomes a problem. + while (!closed && Date.now() < deadline) { + await sleep(POLL_INTERVAL_MS) + if (closed) return + + const result = await readTableEventsSince(tableId, lastEventId) + if (result.status === 'pruned') { + sendPrunedAndClose(result.earliestEventId) + return + } + if (result.status === 'unavailable') { + throw new Error(`Table event buffer unavailable: ${result.error}`) + } + if (result.events.length > 0) { + sendEvents(result.events) + } + + if (Date.now() >= nextHeartbeatAt) { + sendHeartbeat() + nextHeartbeatAt = Date.now() + HEARTBEAT_INTERVAL_MS + } + } + + // Reached the defensive duration ceiling — close cleanly so the client + // reconnects with the latest lastEventId. 
+ if (!closed) { + enqueue(`event: rotate\ndata: {}\n\n`) + closed = true + try { + controller.close() + } catch {} + } + } catch (error) { + logger.error(`[${requestId}] Table event stream error`, { + tableId, + error: toError(error).message, + }) + if (!closed) { + try { + controller.error(error) + } catch {} + } + } + }, + cancel() { + closed = true + logger.info(`[${requestId}] Client disconnected from table event stream`, { tableId }) + }, + }) + + return new NextResponse(stream, { + headers: { ...SSE_HEADERS, 'X-Table-Id': tableId }, + }) +}) diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/cells/cell-render.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/cells/cell-render.tsx index 35eb3d2e8f..c98733eb1c 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/cells/cell-render.tsx +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/cells/cell-render.tsx @@ -76,23 +76,27 @@ export function resolveCellRender({ if (blockError) return { kind: 'block-error' } - // In-flight wins over the existing value: when the group is being re-run, - // the current value is about to be overwritten — surface the run state so - // the user sees the cell is changing. Without this, a queued / running - // re-run on a previously-completed cell looks like nothing happened until - // the new value lands. + // Active re-run of THIS column wins over its prior value — the value is + // about to be overwritten and the user should see the cell is changing. const inFlight = exec?.status === 'running' || exec?.status === 'queued' || exec?.status === 'pending' + if (inFlight && blockRunning) return { kind: 'running' } + + // Value wins over `pending-upstream`: once this column's output has + // landed, the cell is done from the user's perspective — even if the + // group is still running other blocks downstream. 
Without this, mid-run + // partial-write events (`status: 'running'` carrying outputs but tagging + // a different block as running) would flip a finished column back to the + // amber Pending pill until the terminal `completed` event arrives. + if (!isNull) return { kind: 'value', text: stringifyValue(value) } + if (inFlight && !(groupHasBlockErrors && !blockRunning)) { - if (blockRunning) return { kind: 'running' } if (exec?.status === 'queued' || exec?.status === 'pending') return { kind: 'queued' } - // `running` with this block not in `runningBlockIds` = upstream block - // still going; surface as the amber Pending pill per logs convention. + // `running` with this block not in `runningBlockIds` and no value yet = + // upstream block still going; surface as the amber Pending pill. return { kind: 'pending-upstream' } } - if (!isNull) return { kind: 'value', text: stringifyValue(value) } - // Waiting wins over a stale terminal state: if deps are unmet right now, // the prior `cancelled` / `error` is informational at best — the cell // can't actually run until the user fills the missing input. 
Surface the diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/index.ts b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/index.ts index d207594366..76addf44c4 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/index.ts +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/index.ts @@ -1,2 +1,3 @@ export * from './use-context-menu' export * from './use-table' +export * from './use-table-event-stream' diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts new file mode 100644 index 0000000000..e5a0e5e807 --- /dev/null +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts @@ -0,0 +1,196 @@ +'use client' + +import { useEffect } from 'react' +import { createLogger } from '@sim/logger' +import { useQueryClient } from '@tanstack/react-query' +import type { RowData, RowExecutionMetadata, RowExecutions } from '@/lib/table' +import type { TableEvent, TableEventEntry } from '@/lib/table/events' +import { snapshotAndMutateRows, tableKeys } from '@/hooks/queries/tables' + +const logger = createLogger('useTableEventStream') + +interface PrunedEvent { + earliestEventId: number | null +} + +const RECONNECT_BACKOFF_MS = [500, 1_000, 2_000, 5_000, 10_000] +const POINTER_PREFIX = 'table-event-stream-pointer:' + +function loadPointer(tableId: string): number { + if (typeof window === 'undefined') return 0 + try { + const raw = window.sessionStorage.getItem(`${POINTER_PREFIX}${tableId}`) + if (!raw) return 0 + const parsed = Number.parseInt(raw, 10) + return Number.isFinite(parsed) && parsed >= 0 ? 
parsed : 0 + } catch { + return 0 + } +} + +function savePointer(tableId: string, eventId: number): void { + if (typeof window === 'undefined') return + try { + window.sessionStorage.setItem(`${POINTER_PREFIX}${tableId}`, String(eventId)) + } catch { + // sessionStorage can throw under quota / private mode — ignore. + } +} + +interface UseTableEventStreamArgs { + tableId: string | undefined + workspaceId: string | undefined + enabled?: boolean +} + +/** + * Subscribes to the table's SSE event stream and patches the React Query + * cache as cell-state events arrive. + * + * Reconnect-resume: on transport error, reconnects with `from=` set to the + * last seen `eventId`; server replays missed events from the Redis-backed + * buffer. If the gap exceeds buffer retention (server emits `pruned`), the + * hook full-refetches the row queries and resumes from the new earliest. + */ +export function useTableEventStream({ + tableId, + workspaceId, + enabled = true, +}: UseTableEventStreamArgs): void { + const queryClient = useQueryClient() + + useEffect(() => { + if (!enabled || !tableId || !workspaceId) return + + let cancelled = false + let eventSource: EventSource | null = null + let reconnectTimer: ReturnType | null = null + // Resume from the last seen eventId persisted in sessionStorage. Survives + // tab refresh; if the buffer has rolled past this id the server replies + // `pruned` and we full-refetch + restart from the new earliest. + let lastEventId = loadPointer(tableId) + let reconnectAttempt = 0 + + const applyCell = (event: Extract): void => { + const { + rowId, + groupId, + status, + executionId, + jobId, + error, + outputs, + runningBlockIds, + blockErrors, + } = event + void snapshotAndMutateRows( + queryClient, + tableId, + (row) => { + if (row.id !== rowId) return null + const prevExec = row.executions?.[groupId] + const nextExec: RowExecutionMetadata = { + status, + executionId: executionId ?? null, + jobId: jobId ?? 
null, + // Preserve workflowId from cache; SSE payload doesn't carry it. + workflowId: prevExec?.workflowId ?? '', + error: error ?? null, + ...(runningBlockIds ? { runningBlockIds } : {}), + ...(blockErrors ? { blockErrors } : {}), + } + const nextExecutions: RowExecutions = { + ...(row.executions ?? {}), + [groupId]: nextExec, + } + const nextData: RowData = outputs ? ({ ...row.data, ...outputs } as RowData) : row.data + return { ...row, executions: nextExecutions, data: nextData } + }, + { cancelInFlight: false } + ) + } + + const handlePrune = (payload: PrunedEvent): void => { + logger.info('Table event buffer pruned — full refetch', { tableId, ...payload }) + void queryClient.invalidateQueries({ queryKey: tableKeys.rowsRoot(tableId) }) + lastEventId = typeof payload.earliestEventId === 'number' ? payload.earliestEventId : 0 + savePointer(tableId, lastEventId) + // Close proactively so the server's close doesn't fire onerror and route + // through the backoff path. Reconnect immediately from the new cursor. 
+ eventSource?.close() + eventSource = null + reconnectAttempt = 0 + connect() + } + + const scheduleReconnect = (): void => { + if (cancelled) return + const idx = Math.min(reconnectAttempt, RECONNECT_BACKOFF_MS.length - 1) + const delay = RECONNECT_BACKOFF_MS[idx] + reconnectAttempt++ + reconnectTimer = setTimeout(() => { + reconnectTimer = null + connect() + }, delay) + } + + const connect = (): void => { + if (cancelled) return + const url = `/api/table/${tableId}/events/stream?from=${lastEventId}` + try { + eventSource = new EventSource(url) + } catch (err) { + logger.warn('Failed to open table event stream', { tableId, err }) + scheduleReconnect() + return + } + + eventSource.onopen = () => { + reconnectAttempt = 0 + } + + eventSource.onmessage = (msg: MessageEvent) => { + try { + const entry = JSON.parse(msg.data) as TableEventEntry + if (entry.event?.kind !== 'cell') return + if (entry.eventId <= lastEventId) return + lastEventId = entry.eventId + savePointer(tableId, lastEventId) + applyCell(entry.event) + } catch (err) { + logger.warn('Failed to parse table event', { tableId, err }) + } + } + + eventSource.addEventListener('pruned', (msg: MessageEvent) => { + try { + handlePrune(JSON.parse(msg.data) as PrunedEvent) + } catch { + handlePrune({ earliestEventId: null }) + } + }) + + eventSource.addEventListener('rotate', () => { + eventSource?.close() + eventSource = null + scheduleReconnect() + }) + + eventSource.onerror = () => { + if (cancelled) return + eventSource?.close() + eventSource = null + scheduleReconnect() + } + } + + connect() + + return () => { + cancelled = true + if (reconnectTimer !== null) clearTimeout(reconnectTimer) + eventSource?.close() + eventSource = null + } + }, [enabled, tableId, workspaceId, queryClient]) +} diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/table.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/table.tsx index e9fea357e6..91eeb93595 100644 --- 
a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/table.tsx +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/table.tsx @@ -50,7 +50,7 @@ import { } from './components' import { COLUMN_SIDEBAR_WIDTH } from './components/table-grid/constants' import { COLUMN_TYPE_ICONS } from './components/table-grid/headers' -import { useTable } from './hooks' +import { useTable, useTableEventStream } from './hooks' import type { QueryOptions } from './types' import { generateColumnName } from './utils' @@ -117,6 +117,8 @@ export function Table({ const workspaceId = propWorkspaceId || (params.workspaceId as string) const tableId = propTableId || (params.tableId as string) + useTableEventStream({ tableId, workspaceId }) + const [slideout, dispatch] = useReducer(slideoutReducer, { kind: 'none' }) const [showDeleteTableConfirm, setShowDeleteTableConfirm] = useState(false) const [isImportCsvOpen, setIsImportCsvOpen] = useState(false) diff --git a/apps/sim/hooks/queries/tables.ts b/apps/sim/hooks/queries/tables.ts index e2cf11ac08..c065bad410 100644 --- a/apps/sim/hooks/queries/tables.ts +++ b/apps/sim/hooks/queries/tables.ts @@ -4,7 +4,7 @@ * React Query hooks for managing user-defined tables. */ -import { useEffect, useMemo } from 'react' +import { useMemo } from 'react' import { createLogger } from '@sim/logger' import { type InfiniteData, @@ -71,91 +71,6 @@ import type { } from '@/lib/table' import { areOutputsFilled, optimisticallyScheduleNewlyEligibleGroups } from '@/lib/table/deps' -/** Short poll to surface running → completed transitions from the server without a dedicated realtime channel. */ -const ROWS_POLL_INTERVAL_WHILE_RUNNING_MS = 1500 - -function hasRunningGroupExecution(rows: TableRow[] | undefined): boolean { - if (!rows) return false - for (const row of rows) { - const executions = row.executions ?? 
{} - for (const key in executions) { - if (isOptimisticInFlight(executions[key])) return true - } - } - return false -} - -/** - * Shallow-equality on the fields the renderer reads from a row. Row data is - * also shallow-compared — workflow output cells are scalars, so `===` per key - * suffices. `executions` is a per-group exec metadata object; we compare each - * group's `(status, jobId, executionId, error)` tuple. Any deeper drift forces - * a fresh row reference, which is the safe default. - */ -function rowEqual(a: TableRow, b: TableRow): boolean { - if (a === b) return true - if (a.position !== b.position) return false - const aData = a.data ?? {} - const bData = b.data ?? {} - const aDataKeys = Object.keys(aData) - if (aDataKeys.length !== Object.keys(bData).length) return false - for (const k of aDataKeys) { - if (aData[k] !== bData[k]) return false - } - const aExec = a.executions ?? {} - const bExec = b.executions ?? {} - const aExecKeys = Object.keys(aExec) - if (aExecKeys.length !== Object.keys(bExec).length) return false - for (const k of aExecKeys) { - const ax = aExec[k] - const bx = bExec[k] - if (ax === bx) continue - if (!ax || !bx) return false - if ( - ax.status !== bx.status || - ax.jobId !== bx.jobId || - ax.executionId !== bx.executionId || - ax.error !== bx.error - ) { - return false - } - } - return true -} - -/** - * Replaces `prev.rows` element-by-element with `fresh.rows`, but reuses the - * `prev` reference for any row that hasn't changed. Memoized `` - * children short-circuit on row-identity, so a poll tick that arrives with - * 1000 rows but only flips the status of 5 only re-renders those 5 instead - * of every row in the page. 
- */ -function mergePagePreservingIdentity( - prev: TableRowsResponse, - fresh: TableRowsResponse -): TableRowsResponse { - if (prev.rows === fresh.rows) return prev - const oldById = new Map(prev.rows.map((r) => [r.id, r])) - let changed = false - const merged = fresh.rows.map((freshRow) => { - const old = oldById.get(freshRow.id) - if (old && rowEqual(old, freshRow)) return old - changed = true - return freshRow - }) - if (!changed && merged.length === prev.rows.length) { - let identical = true - for (let i = 0; i < merged.length; i++) { - if (merged[i] !== prev.rows[i]) { - identical = false - break - } - } - if (identical && prev.totalCount === fresh.totalCount) return prev - } - return { ...fresh, rows: merged } -} - const logger = createLogger('TableQueries') type TableQueryScope = 'active' | 'archived' | 'all' @@ -310,7 +225,6 @@ export function useTableRows({ includeTotal, enabled = true, }: TableRowsParams & { enabled?: boolean }) { - const queryClient = useQueryClient() const paramsKey = JSON.stringify({ limit, offset, @@ -326,13 +240,6 @@ export function useTableRows({ enabled: Boolean(workspaceId && tableId) && enabled, staleTime: 30 * 1000, placeholderData: keepPreviousData, - refetchInterval: (query) => { - if (queryClient.isMutating() > 0) return false - return hasRunningGroupExecution(query.state.data?.rows) - ? ROWS_POLL_INTERVAL_WHILE_RUNNING_MS - : false - }, - refetchIntervalInBackground: false, }) } @@ -351,7 +258,6 @@ export function useInfiniteTableRows({ sort, enabled = true, }: InfiniteTableRowsParams) { - const queryClient = useQueryClient() const paramsKey = JSON.stringify({ pageSize, filter: filter ?? 
null, @@ -359,7 +265,7 @@ export function useInfiniteTableRows({ }) const queryKey = useMemo(() => tableKeys.infiniteRows(tableId, paramsKey), [tableId, paramsKey]) - const query = useInfiniteQuery({ + return useInfiniteQuery({ queryKey, queryFn: ({ pageParam, signal }) => fetchTableRows({ @@ -380,78 +286,6 @@ export function useInfiniteTableRows({ enabled: Boolean(workspaceId && tableId) && enabled, staleTime: 30 * 1000, }) - - /** - * Per-page polling. Built-in `refetchInterval` would refetch every loaded - * page on each tick — wasteful when only one page has running cells. - * Instead, walk pages each tick and refetch ONLY the dirty ones, splicing - * results back into the cache. Polling stops when no page has in-flight - * cells, or while a mutation is running (optimistic-update guard). - */ - useEffect(() => { - if (!enabled || !workspaceId || !tableId) return - let cancelled = false - let timeoutId: ReturnType | null = null - const tick = async () => { - if (cancelled) return - if (queryClient.isMutating() === 0) { - const data = queryClient.getQueryData>(queryKey) - const dirty: number[] = [] - if (data) { - for (let i = 0; i < data.pages.length; i++) { - if (hasRunningGroupExecution(data.pages[i].rows)) { - dirty.push(data.pageParams[i] ?? i * pageSize) - } - } - } - if (dirty.length > 0) { - await Promise.all( - dirty.map(async (offset) => { - try { - const fresh = await fetchTableRows({ - workspaceId, - tableId, - limit: pageSize, - offset, - filter, - sort, - includeTotal: offset === 0, - }) - if (cancelled) return - queryClient.setQueryData>( - queryKey, - (prev) => { - if (!prev) return prev - const idx = prev.pageParams.indexOf(offset) - if (idx === -1) return prev - const merged = mergePagePreservingIdentity(prev.pages[idx], fresh) - if (merged === prev.pages[idx]) return prev - const nextPages = prev.pages.slice() - nextPages[idx] = merged - return { ...prev, pages: nextPages } - } - ) - } catch { - // Transient fetch failure — next tick retries. 
Don't kill the loop. - } - }) - ) - } - } - if (cancelled) return - // Recursive setTimeout instead of setInterval so a slow tick can't - // overlap the next one — out-of-order responses would otherwise let - // stale data overwrite fresh. - timeoutId = setTimeout(() => void tick(), ROWS_POLL_INTERVAL_WHILE_RUNNING_MS) - } - timeoutId = setTimeout(() => void tick(), ROWS_POLL_INTERVAL_WHILE_RUNNING_MS) - return () => { - cancelled = true - if (timeoutId !== null) clearTimeout(timeoutId) - } - }, [enabled, workspaceId, tableId, pageSize, filter, sort, queryClient, queryKey]) - - return query } /** @@ -1186,20 +1020,27 @@ function isInfiniteRowsCache(value: unknown): value is InfiniteRowsCache { } /** - * Walks every cached row-list under `tableId`, applies `transform` to each row, - * and snapshots the originals for rollback. + * Walks every cached query under `rowsRoot(tableId)` and applies `transform` + * to each row. Handles both cache shapes — the single-page `TableRowsResponse` + * and the infinite-query `{ pages, pageParams }`. `transform(row)` returns + * the next row to write, or `null` to leave it. + * + * Returns the list of `[queryKey, prior data]` entries so optimistic-update + * callers can roll back. SSE patchers can ignore the return value. * - * Handles both cache shapes: the single-page `TableRowsResponse` and the - * infinite-query `{ pages, pageParams }`. `transform(row)` returns the next - * row to write, or `null` to leave it. The common pattern is "matching cells - * flip state, others are skipped". + * `cancelInFlight` defaults to true (the optimistic-update contract) but SSE + * patchers pass `false` so live cell updates don't kick the row query off the + * network. 
*/ export async function snapshotAndMutateRows( queryClient: ReturnType, tableId: string, - transform: (row: TableRow) => TableRow | null + transform: (row: TableRow) => TableRow | null, + options?: { cancelInFlight?: boolean } ): Promise { - await queryClient.cancelQueries({ queryKey: tableKeys.rowsRoot(tableId) }) + if (options?.cancelInFlight !== false) { + await queryClient.cancelQueries({ queryKey: tableKeys.rowsRoot(tableId) }) + } const matching = queryClient.getQueriesData({ queryKey: tableKeys.rowsRoot(tableId), }) @@ -1329,14 +1170,9 @@ export function useRunColumn({ workspaceId, tableId }: RowMutationContext) { onError: (_err, _variables, context) => { if (context?.snapshots) restoreCachedWorkflowCells(queryClient, context.snapshots) }, - onSettled: async () => { - // Cancel any in-flight poll first — without this, a poll started during - // the mutation but lands AFTER it resolves can clobber the optimistic - // patch with stale data, producing a queued → cancelled → queued flicker - // before the authoritative refetch arrives. - await queryClient.cancelQueries({ queryKey: tableKeys.rowsRoot(tableId) }) - await queryClient.invalidateQueries({ queryKey: tableKeys.rowsRoot(tableId) }) - }, + // No reconciliation here — useTableEventStream is the source of truth for + // post-mutation cache state, and a refetch would race its incremental + // patches. }) } diff --git a/apps/sim/lib/api/contracts/tables.ts b/apps/sim/lib/api/contracts/tables.ts index 5c7b467e8f..e9cb743dc2 100644 --- a/apps/sim/lib/api/contracts/tables.ts +++ b/apps/sim/lib/api/contracts/tables.ts @@ -912,3 +912,21 @@ export type RunColumnBodyInput = z.input /** Shared `runMode` union — used by every UI / hook / Mothership site that * builds a run-column payload. Single source of truth for the literal pair. 
*/ export type RunMode = NonNullable + +export const tableEventStreamQuerySchema = z.object({ + from: z.preprocess((value) => { + if (typeof value !== 'string') return 0 + const parsed = Number.parseInt(value, 10) + return Number.isFinite(parsed) && parsed >= 0 ? parsed : 0 + }, z.number().int().min(0)), +}) + +export const tableEventStreamContract = defineRouteContract({ + method: 'GET', + path: '/api/table/[tableId]/events/stream', + params: tableIdParamsSchema, + query: tableEventStreamQuerySchema, + response: { + mode: 'stream', + }, +}) diff --git a/apps/sim/lib/table/cell-write.ts b/apps/sim/lib/table/cell-write.ts index 932b2d136a..5e179319f9 100644 --- a/apps/sim/lib/table/cell-write.ts +++ b/apps/sim/lib/table/cell-write.ts @@ -10,6 +10,7 @@ */ import { createLogger } from '@sim/logger' +import { appendTableEvent } from '@/lib/table/events' import type { RowData, RowExecutionMetadata, RowExecutions, WorkflowGroup } from '@/lib/table/types' const logger = createLogger('WorkflowCellWrite') @@ -113,6 +114,25 @@ export async function writeWorkflowGroupState( ) return 'skipped' } + + const dataPatch = payload.dataPatch + const hasOutputs = dataPatch && Object.keys(dataPatch).length > 0 + const runningBlockIds = payload.executionState.runningBlockIds + const blockErrors = payload.executionState.blockErrors + void appendTableEvent({ + kind: 'cell', + tableId, + rowId, + groupId, + status: payload.executionState.status, + executionId: payload.executionState.executionId ?? null, + jobId: payload.executionState.jobId ?? null, + error: payload.executionState.error ?? null, + ...(hasOutputs ? { outputs: dataPatch } : {}), + ...(runningBlockIds && runningBlockIds.length > 0 ? { runningBlockIds } : {}), + ...(blockErrors && Object.keys(blockErrors).length > 0 ? 
{ blockErrors } : {}), + }) + return 'wrote' } diff --git a/apps/sim/lib/table/events.ts b/apps/sim/lib/table/events.ts new file mode 100644 index 0000000000..64d4fabc56 --- /dev/null +++ b/apps/sim/lib/table/events.ts @@ -0,0 +1,291 @@ +/** + * Per-table event buffer for live cell-state updates. + * + * The grid subscribes to a per-table SSE stream and patches its React Query + * cache as events arrive. This buffer is the durable mid-tier between the + * cell-write paths (`writeWorkflowGroupState`, `cancelWorkflowGroupRuns`) and + * the SSE consumers — every status transition appends here with a monotonic + * eventId; SSE clients resume on reconnect via `?from=` and the + * server replays from this buffer. + * + * Modeled after `apps/sim/lib/execution/event-buffer.ts` but stripped of + * complexity tables don't need: no per-execution lifecycle, no id reservation + * batching, no write-queue serialization. Tables are always-on; cell writes + * are sparse and independent. + */ + +import { createLogger } from '@sim/logger' +import { toError } from '@sim/utils/errors' +import { env } from '@/lib/core/config/env' +import { getRedisClient } from '@/lib/core/config/redis' + +const logger = createLogger('TableEventBuffer') + +const REDIS_PREFIX = 'table:stream:' +export const TABLE_EVENT_TTL_SECONDS = 60 * 60 // 1 hour +export const TABLE_EVENT_CAP = 5000 +/** Max events returned by a single read; the SSE route drains in chunks. */ +export const TABLE_EVENT_READ_CHUNK = 500 + +/** + * Atomic append: INCR the seq counter to mint a new eventId, build the entry + * JSON inline, ZADD it, refresh TTL on events + seq + meta, trim to cap, then + * write the resulting earliestEventId to meta. Single round-trip per event. + * Without atomicity a slow reader could observe the trim before the meta + * update and miss the prune signal. 
+ *
+ * KEYS: [events, seq, meta]
+ * ARGV: [ttlSec, cap, updatedAtIso, entryPrefix, entrySuffix]
+ * The new eventId is spliced between prefix/suffix to form the entry JSON.
+ * Returns the new eventId.
+ */
+const APPEND_EVENT_SCRIPT = `
+local eventId = redis.call('INCR', KEYS[2])
+local entry = ARGV[4] .. eventId .. ARGV[5]
+redis.call('ZADD', KEYS[1], eventId, entry)
+redis.call('EXPIRE', KEYS[1], tonumber(ARGV[1]))
+redis.call('EXPIRE', KEYS[2], tonumber(ARGV[1]))
+redis.call('ZREMRANGEBYRANK', KEYS[1], 0, -tonumber(ARGV[2]) - 1)
+local oldest = redis.call('ZRANGE', KEYS[1], 0, 0, 'WITHSCORES')
+if oldest[2] then
+  redis.call('HSET', KEYS[3], 'earliestEventId', tostring(math.floor(tonumber(oldest[2]))), 'updatedAt', ARGV[3])
+  redis.call('EXPIRE', KEYS[3], tonumber(ARGV[1]))
+end
+return eventId
+`
+
+function getEventsKey(tableId: string) {
+  return `${REDIS_PREFIX}${tableId}:events`
+}
+
+function getSeqKey(tableId: string) {
+  return `${REDIS_PREFIX}${tableId}:seq`
+}
+
+function getMetaKey(tableId: string) {
+  return `${REDIS_PREFIX}${tableId}:meta`
+}
+
+export type TableCellStatus = 'pending' | 'queued' | 'running' | 'completed' | 'cancelled' | 'error'
+
+export interface TableEvent {
+  kind: 'cell'
+  tableId: string
+  rowId: string
+  groupId: string
+  status: TableCellStatus
+  executionId: string | null
+  jobId: string | null
+  error: string | null
+  /**
+   * Present when this transition wrote new output values; absent on
+   * pure-status transitions (queued, running, cancelled). The publisher
+   * already has these in hand from the same updateRow call that wrote DB.
+   */
+  // NOTE(review): type argument reconstructed (extraction stripped `<...>`) — confirm value type.
+  outputs?: Record<string, unknown>
+  /**
+   * Block-level metadata the renderer reads to distinguish "running" (some
+   * block actively executing) from "pending-upstream" (run started but this
+   * column's block hasn't fired yet). The worker fills these on partial
+   * writes; without them the cell stays on the amber Pending pill.
+   */
+  runningBlockIds?: string[]
+  // NOTE(review): type argument reconstructed — presumably blockId -> error message; confirm.
+  blockErrors?: Record<string, string>
+}
+
+export interface TableEventEntry {
+  eventId: number
+  tableId: string
+  event: TableEvent
+}
+
+export type TableEventsReadResult =
+  | { status: 'ok'; events: TableEventEntry[] }
+  | { status: 'pruned'; earliestEventId: number | undefined }
+  | { status: 'unavailable'; error: string }
+
+/** In-memory fallback for dev/tests when Redis isn't configured. */
+interface MemoryTableStream {
+  events: TableEventEntry[]
+  earliestEventId?: number
+  nextEventId: number
+  expiresAt: number
+}
+
+const memoryTableStreams = new Map<string, MemoryTableStream>()
+
+function canUseMemoryBuffer(): boolean {
+  return typeof window === 'undefined' && !env.REDIS_URL
+}
+
+function pruneExpiredMemoryStreams(now = Date.now()): void {
+  for (const [tableId, stream] of memoryTableStreams) {
+    if (stream.expiresAt <= now) {
+      memoryTableStreams.delete(tableId)
+    }
+  }
+}
+
+function getMemoryStream(tableId: string): MemoryTableStream {
+  pruneExpiredMemoryStreams()
+  let stream = memoryTableStreams.get(tableId)
+  if (!stream) {
+    stream = {
+      events: [],
+      nextEventId: 1,
+      expiresAt: Date.now() + TABLE_EVENT_TTL_SECONDS * 1000,
+    }
+    memoryTableStreams.set(tableId, stream)
+  }
+  return stream
+}
+
+function appendMemory(event: TableEvent): TableEventEntry {
+  const stream = getMemoryStream(event.tableId)
+  const entry: TableEventEntry = {
+    eventId: stream.nextEventId++,
+    tableId: event.tableId,
+    event,
+  }
+  stream.events.push(entry)
+  if (stream.events.length > TABLE_EVENT_CAP) {
+    stream.events = stream.events.slice(-TABLE_EVENT_CAP)
+    stream.earliestEventId = stream.events[0]?.eventId
+  }
+  stream.expiresAt = Date.now() + TABLE_EVENT_TTL_SECONDS * 1000
+  return entry
+}
+
+function readMemory(tableId: string, afterEventId: number): TableEventsReadResult {
+  pruneExpiredMemoryStreams()
+  const stream = memoryTableStreams.get(tableId)
+  if (!stream) {
+    // Mirror the Redis path: a non-zero afterEventId with no buffer at all
+    // means TTL expired or the stream never existed; either way the caller's
+    // cursor is stale.
+    if (afterEventId > 0) return { status: 'pruned', earliestEventId: undefined }
+    return { status: 'ok', events: [] }
+  }
+  if (stream.earliestEventId !== undefined && afterEventId + 1 < stream.earliestEventId) {
+    return { status: 'pruned', earliestEventId: stream.earliestEventId }
+  }
+  return {
+    status: 'ok',
+    events: stream.events
+      .filter((entry) => entry.eventId > afterEventId)
+      .slice(0, TABLE_EVENT_READ_CHUNK),
+  }
+}
+
+/**
+ * Append an event to the table's buffer. Fire-and-forget from the caller —
+ * this never throws, returns null on failure. A Redis blip must not fail a
+ * cell-write.
+ */
+export async function appendTableEvent(event: TableEvent): Promise<TableEventEntry | null> {
+  const redis = getRedisClient()
+  if (!redis) {
+    if (canUseMemoryBuffer()) {
+      try {
+        return appendMemory(event)
+      } catch (error) {
+        logger.warn('appendTableEvent: memory append failed', {
+          tableId: event.tableId,
+          error: toError(error).message,
+        })
+        return null
+      }
+    }
+    return null
+  }
+  try {
+    // Build the entry JSON in two halves so Lua can splice the new eventId
+    // between them without us needing a round-trip just to mint the id first.
+    const tail = `,"tableId":${JSON.stringify(event.tableId)},"event":${JSON.stringify(event)}}`
+    const head = `{"eventId":`
+    const result = await redis.eval(
+      APPEND_EVENT_SCRIPT,
+      3,
+      getEventsKey(event.tableId),
+      getSeqKey(event.tableId),
+      getMetaKey(event.tableId),
+      TABLE_EVENT_TTL_SECONDS,
+      TABLE_EVENT_CAP,
+      new Date().toISOString(),
+      head,
+      tail
+    )
+    const eventId = typeof result === 'number' ? result : Number(result)
+    if (!Number.isFinite(eventId)) return null
+    return { eventId, tableId: event.tableId, event }
+  } catch (error) {
+    logger.warn('appendTableEvent: Redis append failed', {
+      tableId: event.tableId,
+      error: toError(error).message,
+    })
+    return null
+  }
+}
+
+/**
+ * Read events for a table where eventId > afterEventId. Returns 'pruned' if
+ * the caller has fallen off the back of the buffer (TTL expired or cap rolled
+ * past their lastEventId). Caller should respond by full-refetching from DB
+ * and resuming streaming from the new earliestEventId.
+ */
+export async function readTableEventsSince(
+  tableId: string,
+  afterEventId: number
+): Promise<TableEventsReadResult> {
+  const redis = getRedisClient()
+  if (!redis) {
+    if (canUseMemoryBuffer()) {
+      return readMemory(tableId, afterEventId)
+    }
+    return { status: 'unavailable', error: 'Redis client unavailable' }
+  }
+  try {
+    const meta = await redis.hgetall(getMetaKey(tableId))
+    const earliestEventId =
+      meta?.earliestEventId !== undefined ? Number(meta.earliestEventId) : undefined
+    if (earliestEventId !== undefined && afterEventId + 1 < earliestEventId) {
+      return { status: 'pruned', earliestEventId }
+    }
+    // Read in capped chunks so a 5000-event backlog doesn't materialize as one
+    // multi-MB Redis reply + JSON parse + SSE flush. The route loop drains
+    // chunks across ticks.
+    const raw = await redis.zrangebyscore(
+      getEventsKey(tableId),
+      afterEventId + 1,
+      '+inf',
+      'LIMIT',
+      0,
+      TABLE_EVENT_READ_CHUNK
+    )
+    if (raw.length === 0 && afterEventId > 0) {
+      // Total TTL expiry: events + meta both gone. The seq counter has the
+      // same TTL — its absence means the buffer was wiped and the caller's
+      // `afterEventId` is stale. Signal pruned so the client refetches.
+      const seqExists = await redis.exists(getSeqKey(tableId))
+      if (seqExists === 0) {
+        return { status: 'pruned', earliestEventId: undefined }
+      }
+    }
+    return {
+      status: 'ok',
+      events: raw
+        .map((entry) => {
+          try {
+            return JSON.parse(entry) as TableEventEntry
+          } catch {
+            return null
+          }
+        })
+        .filter((entry): entry is TableEventEntry => Boolean(entry)),
+    }
+  } catch (error) {
+    const message = toError(error).message
+    logger.warn('readTableEventsSince failed', { tableId, error: message })
+    return { status: 'unavailable', error: message }
+  }
+}
diff --git a/scripts/check-api-validation-contracts.ts b/scripts/check-api-validation-contracts.ts
index bbf0d0f249..3045606207 100644
--- a/scripts/check-api-validation-contracts.ts
+++ b/scripts/check-api-validation-contracts.ts
@@ -9,8 +9,8 @@
 const QUERY_HOOKS_DIR = path.join(ROOT, 'apps/sim/hooks/queries')
 const SELECTOR_HOOKS_DIR = path.join(ROOT, 'apps/sim/hooks/selectors')
 
 const BASELINE = {
-  totalRoutes: 734,
-  zodRoutes: 734,
+  totalRoutes: 735,
+  zodRoutes: 735,
   nonZodRoutes: 0,
 } as const