
Commit eb871fc

feat(table): live cell updates via SSE + per-table event buffer (#4508)
* feat(table): live cell updates via SSE + per-table event buffer

  Replaces the polling-based row refetch with a push-based SSE stream that patches
  the React Query cache directly as cell-state events arrive.

  Architecture:

  - New per-table event buffer in apps/sim/lib/table/events.ts. Redis sorted-set
    with monotonic eventId, 1h TTL, 5000-event cap, in-memory fallback. Modeled
    after apps/sim/lib/execution/event-buffer.ts but stripped of complexity tables
    don't need (no per-execution lifecycle, no id-batching, no write queue
    serialization). ~150 lines instead of 700.
  - writeWorkflowGroupState appends a fat event after each successful 'wrote'.
    Status transitions carry executionId + jobId; terminal/partial transitions
    also include the new output values inline so the client can patch row data
    without a follow-up refetch.
  - New SSE route at /api/table/[tableId]/events/stream?from=<lastEventId>.
    Replays from buffer on connect, polls at 500ms (mirrors workflow execution
    stream), heartbeat every 15s, signals 'pruned' if the caller fell off the
    back of the buffer.
  - Client hook useTableEventStream subscribes via EventSource. Reconnect-resume
    with last-seen eventId. On 'pruned', invalidates the rows query and resumes
    from the new earliest. Cache patches walk every cached query under
    rowsRoot(tableId) so filter/sort variants all stay live.
  - Removes refetchInterval from useTableRows and the per-page polling effect
    from useInfiniteTableRows. React Query's refetchOnWindowFocus +
    refetchOnReconnect cover the durability gap if any push is dropped.

  Out of scope:

  - Bulk-cancel events (cancellation path is being redesigned separately).
  - Generalizing the workflow event-buffer module to a shared primitive (defer
    until a third use case appears; for now the table buffer is the simpler
    cousin of the workflow one).

* fix(table): drop run-mutation refetch so SSE patches aren't overwritten

  useRunColumn.onSettled was canceling in-flight queries and invalidating the
  rows query — leftover behavior from the polling era. With the SSE stream now
  keeping the cache live via incremental patches, this refetch races the stream
  and snaps the cache back to whatever DB shows at the refetch moment, which can
  lag the just-arrived queued/running events. Cells appeared stuck on the
  optimistic 'pending' even though the SSE was delivering the real transitions.

* chore(table): simplify SSE plumbing — reuse helpers, drop dead polling code

  - Reuse snapshotAndMutateRows for SSE cache patches instead of reimplementing
    the page-walk + cache-shape detection. Adds a {cancelInFlight: false} opt for
    the SSE caller (mutations still cancel as before).
  - Drop client-side type duplication in use-table-event-stream — import
    TableEvent and TableEventEntry from lib/table/events directly.
  - Drop the now-dead mergePagePreservingIdentity + rowEqual from tables.ts;
    their only caller was the polling effect that was removed earlier.
  - Drop the defensive try/catch around appendTableEvent in cell-write — the
    function is documented as never-throwing (returns null on failure).
  - Combine INCR + ZADD into one Lua eval in events.ts. Halves Redis RTT per
    cell-write. Lua returns the new eventId; the script splices it into the
    pre-built entry JSON.
  - Trim refs to plain let bindings inside the effect; trim stale comments
    referencing the old polling implementation.

* fix(table): address PR review on SSE buffer

  - TTL-expiry silent miss: when all keys expire, hgetall(meta) returns empty so
    earliestEventId is undefined and the prune branch was skipped. Reconnect with
    non-zero afterEventId now checks the seq counter — its absence (TTL expired)
    signals pruned so the client refetches. Memory fallback mirrors.
  - Unbounded ZRANGEBYSCORE: cap reads at TABLE_EVENT_READ_CHUNK = 500 events
    per call. The route's 500ms poll loop drains chunks across ticks instead of
    flushing 5000 entries (multi-MB) in one tick after a long disconnect.
  - Pruned handler closes EventSource client-side: server-side close was firing
    onerror and routing through the 500ms backoff path. Now we close proactively,
    reset the reconnect attempt counter, and reconnect immediately from the new
    earliest.

* improvement(table): persist SSE lastEventId in sessionStorage

  Tab refresh / navigate-away-and-back now resume the stream from where the
  previous mount left off instead of replaying from from=0. Mirrors the
  useExecutionStream pattern (saveExecutionPointer / loadExecutionPointer).
  First-ever mounts and new tabs still start at 0 — sessionStorage is per-tab,
  so the safe default applies. On a 'pruned' fallback the new earliestEventId is
  also persisted so the next reconnect starts there.

* fix(table): include runningBlockIds + blockErrors in SSE event payload

  The cell renderer's 'queued' vs 'running' vs 'pending-upstream' decision reads
  exec.runningBlockIds + exec.blockErrors. Without those fields the inFlight
  branch falls through to 'pending-upstream' (amber Pending pill) even when the
  worker has already written status=running. The worker writes both fields to
  DB; the SSE event was stripping them. Thread them through events.ts →
  cell-write.ts → use-table-event-stream.ts.

* fix(table): show value once column output has landed mid-run

  The cell renderer treated any `status: 'running'` event as in-flight, even
  when the column's own output had already been written. During a multi-block
  group run, partial-write events for a later block carry the earlier block's
  outputs but tag only the later block as running — that flipped the finished
  column back to the amber Pending pill until the terminal `completed` event
  arrived. Re-order the priority chain so the column's value wins over
  `pending-upstream`. Active re-run of the column itself (`blockRunning`) still
  wins over the stale value, so a re-run on a previously-completed cell still
  surfaces the running pill before the new value overwrites.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* chore(table): address PR review nits on tables.ts

  - Merge duplicate JSDoc on snapshotAndMutateRows into a single block
  - Remove unused useQueryClient() calls from useTableRows and
    useInfiniteTableRows (leftover from polling-era code)

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* chore(table): apply biome formatting fixes

  CI lint job flagged import order in two files and over-wrapped union in
  lib/table/events.ts. No behavior change.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
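The in-memory fallback described in the first bullet can be sketched roughly as follows. This is an illustrative reconstruction, not the real apps/sim/lib/table/events.ts: the class and method names (TableEventBuffer, readSince) and the event shape are assumptions; only the monotonic eventId, the 5000-event cap, and the pruned signal come from the commit message.

```typescript
// Event shape is illustrative; per the commit message the real TableEvent
// also carries executionId, jobId, runningBlockIds, blockErrors, and outputs.
type TableEvent = { rowId: string; columnId: string; status: string }

interface TableEventEntry {
  eventId: number
  event: TableEvent
}

type ReadResult =
  | { status: 'events'; events: TableEventEntry[] }
  | { status: 'pruned'; earliestEventId: number | undefined }

const MAX_EVENTS = 5000 // cap from the commit message

class TableEventBuffer {
  private entries: TableEventEntry[] = []
  private seq = 0

  append(event: TableEvent): number {
    const eventId = ++this.seq // monotonic id, mirroring the Redis INCR counter
    this.entries.push({ eventId, event })
    // Enforce the cap by dropping the oldest entries.
    if (this.entries.length > MAX_EVENTS) {
      this.entries.splice(0, this.entries.length - MAX_EVENTS)
    }
    return eventId
  }

  // Replay everything after afterEventId; signal pruned when that position has
  // fallen off the back of the buffer, so the caller can refetch rows instead.
  readSince(afterEventId: number): ReadResult {
    const earliest = this.entries[0]?.eventId ?? this.seq + 1
    if (afterEventId + 1 < earliest) {
      return { status: 'pruned', earliestEventId: this.entries[0]?.eventId }
    }
    return {
      status: 'events',
      events: this.entries.filter((e) => e.eventId > afterEventId),
    }
  }
}
```

The Redis-backed path replaces the array with a sorted set keyed by eventId and the seq field with INCR, but the replay/prune contract the SSE route consumes stays the same.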
1 parent 6cb7796 commit eb871fc

10 files changed

Lines changed: 726 additions & 197 deletions


Lines changed: 161 additions & 0 deletions
@@ -0,0 +1,161 @@
import { createLogger } from '@sim/logger'
import { toError } from '@sim/utils/errors'
import { sleep } from '@sim/utils/helpers'
import { type NextRequest, NextResponse } from 'next/server'
import { tableEventStreamContract } from '@/lib/api/contracts/tables'
import { parseRequest } from '@/lib/api/server'
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { readTableEventsSince, type TableEventEntry } from '@/lib/table/events'
import { accessError, checkAccess } from '@/app/api/table/utils'

const logger = createLogger('TableEventStreamAPI')

const POLL_INTERVAL_MS = 500
const HEARTBEAT_INTERVAL_MS = 15_000
const MAX_STREAM_DURATION_MS = 4 * 60 * 60 * 1000 // 4 hours; client reconnects past this

export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'

interface RouteContext {
  params: Promise<{ tableId: string }>
}

/** GET /api/table/[tableId]/events/stream?from=<lastEventId>
 *
 * SSE stream of cell-state transitions. Replay-on-reconnect via `from`.
 * Pruning (buffer cap exceeded or TTL expired) sends a `pruned` event and
 * closes; the client responds with a full row-query refetch and reconnects
 * from the new earliest. */
export const GET = withRouteHandler(async (req: NextRequest, context: RouteContext) => {
  const requestId = generateRequestId()
  const parsed = await parseRequest(tableEventStreamContract, req, context)
  if (!parsed.success) return parsed.response
  const { tableId } = parsed.data.params
  const { from: fromEventId } = parsed.data.query

  const auth = await checkSessionOrInternalAuth(req, { requireWorkflowId: false })
  if (!auth.success || !auth.userId) {
    return NextResponse.json({ error: 'Authentication required' }, { status: 401 })
  }

  const access = await checkAccess(tableId, auth.userId, 'read')
  if (!access.ok) return accessError(access, requestId, tableId)

  logger.info(`[${requestId}] Table event stream opened`, { tableId, fromEventId })

  const encoder = new TextEncoder()
  let closed = false

  const stream = new ReadableStream<Uint8Array>({
    async start(controller) {
      let lastEventId = fromEventId
      const deadline = Date.now() + MAX_STREAM_DURATION_MS
      let nextHeartbeatAt = Date.now() + HEARTBEAT_INTERVAL_MS

      const enqueue = (text: string) => {
        if (closed) return
        try {
          controller.enqueue(encoder.encode(text))
        } catch {
          closed = true
        }
      }

      const sendEvents = (events: TableEventEntry[]) => {
        for (const entry of events) {
          if (closed) return
          enqueue(`data: ${JSON.stringify(entry)}\n\n`)
          lastEventId = entry.eventId
        }
      }

      const sendPrunedAndClose = (earliestEventId: number | undefined) => {
        enqueue(
          `event: pruned\ndata: ${JSON.stringify({ earliestEventId: earliestEventId ?? null })}\n\n`
        )
        if (!closed) {
          closed = true
          try {
            controller.close()
          } catch {}
        }
      }

      const sendHeartbeat = () => {
        // SSE comment line — keeps proxies (ALB default 60s idle) from closing
        // the connection during quiet periods.
        enqueue(`: ping ${Date.now()}\n\n`)
      }

      try {
        // Initial replay from buffer.
        const initial = await readTableEventsSince(tableId, lastEventId)
        if (initial.status === 'pruned') {
          sendPrunedAndClose(initial.earliestEventId)
          return
        }
        if (initial.status === 'unavailable') {
          throw new Error(`Table event buffer unavailable: ${initial.error}`)
        }
        sendEvents(initial.events)

        // Stream loop — poll the buffer and forward new events. Workflow
        // execution stream uses the same shape; pub/sub wakeups are an
        // optimization we can add later if 500ms polling becomes a problem.
        while (!closed && Date.now() < deadline) {
          await sleep(POLL_INTERVAL_MS)
          if (closed) return

          const result = await readTableEventsSince(tableId, lastEventId)
          if (result.status === 'pruned') {
            sendPrunedAndClose(result.earliestEventId)
            return
          }
          if (result.status === 'unavailable') {
            throw new Error(`Table event buffer unavailable: ${result.error}`)
          }
          if (result.events.length > 0) {
            sendEvents(result.events)
          }

          if (Date.now() >= nextHeartbeatAt) {
            sendHeartbeat()
            nextHeartbeatAt = Date.now() + HEARTBEAT_INTERVAL_MS
          }
        }

        // Reached the defensive duration ceiling — close cleanly so the client
        // reconnects with the latest lastEventId.
        if (!closed) {
          enqueue(`event: rotate\ndata: {}\n\n`)
          closed = true
          try {
            controller.close()
          } catch {}
        }
      } catch (error) {
        logger.error(`[${requestId}] Table event stream error`, {
          tableId,
          error: toError(error).message,
        })
        if (!closed) {
          try {
            controller.error(error)
          } catch {}
        }
      }
    },
    cancel() {
      closed = true
      logger.info(`[${requestId}] Client disconnected from table event stream`, { tableId })
    },
  })

  return new NextResponse(stream, {
    headers: { ...SSE_HEADERS, 'X-Table-Id': tableId },
  })
})

apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/cells/cell-render.tsx

Lines changed: 14 additions & 10 deletions
@@ -76,23 +76,27 @@ export function resolveCellRender({

   if (blockError) return { kind: 'block-error' }

-  // In-flight wins over the existing value: when the group is being re-run,
-  // the current value is about to be overwritten — surface the run state so
-  // the user sees the cell is changing. Without this, a queued / running
-  // re-run on a previously-completed cell looks like nothing happened until
-  // the new value lands.
+  // Active re-run of THIS column wins over its prior value — the value is
+  // about to be overwritten and the user should see the cell is changing.
   const inFlight =
     exec?.status === 'running' || exec?.status === 'queued' || exec?.status === 'pending'
+  if (inFlight && blockRunning) return { kind: 'running' }
+
+  // Value wins over `pending-upstream`: once this column's output has
+  // landed, the cell is done from the user's perspective — even if the
+  // group is still running other blocks downstream. Without this, mid-run
+  // partial-write events (`status: 'running'` carrying outputs but tagging
+  // a different block as running) would flip a finished column back to the
+  // amber Pending pill until the terminal `completed` event arrives.
+  if (!isNull) return { kind: 'value', text: stringifyValue(value) }
+
   if (inFlight && !(groupHasBlockErrors && !blockRunning)) {
-    if (blockRunning) return { kind: 'running' }
     if (exec?.status === 'queued' || exec?.status === 'pending') return { kind: 'queued' }
-    // `running` with this block not in `runningBlockIds` = upstream block
-    // still going; surface as the amber Pending pill per logs convention.
+    // `running` with this block not in `runningBlockIds` and no value yet =
+    // upstream block still going; surface as the amber Pending pill.
     return { kind: 'pending-upstream' }
   }

-  if (!isNull) return { kind: 'value', text: stringifyValue(value) }
-
   // Waiting wins over a stale terminal state: if deps are unmet right now,
   // the prior `cancelled` / `error` is informational at best — the cell
   // can't actually run until the user fills the missing input. Surface the
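Condensed, the re-ordered priority chain in this hunk behaves like the following sketch. It is a simplification under stated assumptions: the real resolveCellRender also weighs blockError, groupHasBlockErrors, and a waiting state, and the names resolveKind / CellState are hypothetical.

```typescript
type CellKind = 'running' | 'value' | 'queued' | 'pending-upstream' | 'idle'

interface CellState {
  value: unknown // this column's own output; null until it lands
  execStatus?: 'running' | 'queued' | 'pending'
  blockRunning: boolean // is this column's block in runningBlockIds?
}

function resolveKind({ value, execStatus, blockRunning }: CellState): CellKind {
  const inFlight =
    execStatus === 'running' || execStatus === 'queued' || execStatus === 'pending'

  // 1. Active re-run of this column wins over its prior value.
  if (inFlight && blockRunning) return 'running'

  // 2. A landed value wins over pending-upstream: a partial-write event that
  //    tags a different block as running must not hide a finished column.
  if (value !== null) return 'value'

  // 3. Otherwise surface the group's run state.
  if (inFlight) {
    if (execStatus === 'queued' || execStatus === 'pending') return 'queued'
    return 'pending-upstream'
  }
  return 'idle'
}
```

The mid-run bug fixed here corresponds to the first assertion below: a column with a landed value must stay 'value' even while the group reports 'running' for some other block.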
Lines changed: 1 addition & 0 deletions
@@ -1,2 +1,3 @@
 export * from './use-context-menu'
 export * from './use-table'
+export * from './use-table-event-stream'
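The sessionStorage resume pointer described in the commit message can be sketched as below. The key shape and helper names (savePointer / loadPointer) are assumptions modeled on the saveExecutionPointer / loadExecutionPointer pattern it cites; the storage interface is injected so the logic is testable outside a browser.

```typescript
// Minimal structural subset of the Web Storage API: window.sessionStorage
// satisfies it in the browser, a Map-backed shim satisfies it in tests.
interface PointerStore {
  getItem(key: string): string | null
  setItem(key: string, value: string): void
}

// Hypothetical key shape: one pointer per table; per-tab scoping comes from
// sessionStorage itself, so new tabs get the safe default.
const keyFor = (tableId: string) => `table-event-pointer:${tableId}`

function savePointer(store: PointerStore, tableId: string, eventId: number): void {
  store.setItem(keyFor(tableId), String(eventId))
}

// First-ever mounts and new tabs fall back to 0, i.e. a full replay.
function loadPointer(store: PointerStore, tableId: string): number {
  const raw = store.getItem(keyFor(tableId))
  const parsed = raw === null ? Number.NaN : Number(raw)
  return Number.isFinite(parsed) && parsed >= 0 ? parsed : 0
}
```

On a 'pruned' event the client would call savePointer with the new earliestEventId before reconnecting, matching the behavior the commit message describes.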

0 commit comments
