Skip to content

Commit df00f07

Browse files
feat(table): live cell updates via SSE + per-table event buffer
Replaces the polling-based row refetch with a push-based SSE stream that patches the React Query cache directly as cell-state events arrive.

Architecture:
- New per-table event buffer in apps/sim/lib/table/events.ts. Redis sorted-set with monotonic eventId, 1h TTL, 5000-event cap, in-memory fallback. Modeled after apps/sim/lib/execution/event-buffer.ts but stripped of complexity tables don't need (no per-execution lifecycle, no id-batching, no write-queue serialization). ~150 lines instead of 700.
- writeWorkflowGroupState appends a fat event after each successful 'wrote'. Status transitions carry executionId + jobId; terminal/partial transitions also include the new output values inline so the client can patch row data without a follow-up refetch.
- New SSE route at /api/table/[tableId]/events/stream?from=<lastEventId>. Replays from the buffer on connect, polls at 500ms (mirrors the workflow execution stream), sends a heartbeat every 15s, and signals 'pruned' if the caller fell off the back of the buffer.
- Client hook useTableEventStream subscribes via EventSource, with reconnect-resume from the last-seen eventId. On 'pruned', it invalidates the rows query and resumes from the new earliest. Cache patches walk every cached query under rowsRoot(tableId) so filter/sort variants all stay live.
- Removes refetchInterval from useTableRows and the per-page polling effect from useInfiniteTableRows. React Query's refetchOnWindowFocus + refetchOnReconnect cover the durability gap if any push is dropped.

Out of scope:
- Bulk-cancel events (the cancellation path is being redesigned separately).
- Generalizing the workflow event-buffer module to a shared primitive (deferred until a third use case appears; for now the table buffer is the simpler cousin of the workflow one).
1 parent 408669d commit df00f07

9 files changed

Lines changed: 742 additions & 98 deletions

File tree

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
import { createLogger } from '@sim/logger'
2+
import { toError } from '@sim/utils/errors'
3+
import { sleep } from '@sim/utils/helpers'
4+
import { type NextRequest, NextResponse } from 'next/server'
5+
import { tableEventStreamContract } from '@/lib/api/contracts/tables'
6+
import { parseRequest } from '@/lib/api/server'
7+
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
8+
import { SSE_HEADERS } from '@/lib/core/utils/sse'
9+
import { generateRequestId } from '@/lib/core/utils/request'
10+
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
11+
import { readTableEventsSince, type TableEventEntry } from '@/lib/table/events'
12+
import { accessError, checkAccess } from '@/app/api/table/utils'
13+
14+
const logger = createLogger('TableEventStreamAPI')
15+
16+
const POLL_INTERVAL_MS = 500
17+
const HEARTBEAT_INTERVAL_MS = 15_000
18+
const MAX_STREAM_DURATION_MS = 4 * 60 * 60 * 1000 // 4 hours; client reconnects past this
19+
20+
export const runtime = 'nodejs'
21+
export const dynamic = 'force-dynamic'
22+
23+
interface RouteContext {
24+
params: Promise<{ tableId: string }>
25+
}
26+
27+
/** GET /api/table/[tableId]/events/stream?from=<lastEventId>
 *
 * SSE stream of cell-state transitions. Replay-on-reconnect via `from`.
 * Pruning (buffer cap exceeded or TTL expired) sends a `pruned` event and
 * closes; the client responds with a full row-query refetch and reconnects
 * from the new earliest.
 *
 * Wire protocol emitted on this stream:
 *   - `data: <TableEventEntry JSON>`    — one message per buffered event
 *   - `event: pruned` (+ earliestEventId) — caller fell off the back of the buffer
 *   - `event: rotate`                    — stream hit its duration ceiling; reconnect
 *   - `: ping <timestamp>`               — comment-line heartbeat
 */
export const GET = withRouteHandler(async (req: NextRequest, context: RouteContext) => {
  const requestId = generateRequestId()
  // Validates both path params and the `from` query against the contract.
  const parsed = await parseRequest(tableEventStreamContract, req, context)
  if (!parsed.success) return parsed.response
  const { tableId } = parsed.data.params
  const { from: fromEventId } = parsed.data.query

  const auth = await checkSessionOrInternalAuth(req, { requireWorkflowId: false })
  if (!auth.success || !auth.userId) {
    return NextResponse.json({ error: 'Authentication required' }, { status: 401 })
  }

  // Read-level table access is sufficient — the stream only observes state.
  const access = await checkAccess(tableId, auth.userId, 'read')
  if (!access.ok) return accessError(access, requestId, tableId)

  logger.info(`[${requestId}] Table event stream opened`, { tableId, fromEventId })

  const encoder = new TextEncoder()
  // Shared between start() and cancel(): once true, no further writes are
  // attempted and the poll loop winds down on its next iteration.
  let closed = false

  const stream = new ReadableStream<Uint8Array>({
    async start(controller) {
      // Cursor of the last event actually forwarded; drives replay reads.
      let lastEventId = fromEventId
      const deadline = Date.now() + MAX_STREAM_DURATION_MS
      let nextHeartbeatAt = Date.now() + HEARTBEAT_INTERVAL_MS

      // Best-effort write: a throw from enqueue (client gone, controller
      // already closed) flips `closed` instead of propagating.
      const enqueue = (text: string) => {
        if (closed) return
        try {
          controller.enqueue(encoder.encode(text))
        } catch {
          closed = true
        }
      }

      // Forward a batch in order, advancing the cursor per event so a
      // mid-batch disconnect resumes from the right place.
      const sendEvents = (events: TableEventEntry[]) => {
        for (const entry of events) {
          if (closed) return
          enqueue(`data: ${JSON.stringify(entry)}\n\n`)
          lastEventId = entry.eventId
        }
      }

      // The requested cursor predates the buffer — tell the client to do a
      // full refetch (earliestEventId may be undefined → null on the wire),
      // then end the stream.
      const sendPrunedAndClose = (earliestEventId: number | undefined) => {
        enqueue(
          `event: pruned\ndata: ${JSON.stringify({ earliestEventId: earliestEventId ?? null })}\n\n`
        )
        if (!closed) {
          closed = true
          try {
            controller.close()
          } catch {}
        }
      }

      const sendHeartbeat = () => {
        // SSE comment line — keeps proxies (ALB default 60s idle) from closing
        // the connection during quiet periods. EventSource ignores comments.
        enqueue(`: ping ${Date.now()}\n\n`)
      }

      try {
        // Initial replay from buffer.
        const initial = await readTableEventsSince(tableId, lastEventId)
        if (initial.status === 'pruned') {
          sendPrunedAndClose(initial.earliestEventId)
          return
        }
        if (initial.status === 'unavailable') {
          // Escalate to the catch below, which errors the stream.
          throw new Error(`Table event buffer unavailable: ${initial.error}`)
        }
        sendEvents(initial.events)

        // Stream loop — poll the buffer and forward new events. Workflow
        // execution stream uses the same shape; pub/sub wakeups are an
        // optimization we can add later if 500ms polling becomes a problem.
        while (!closed && Date.now() < deadline) {
          await sleep(POLL_INTERVAL_MS)
          // cancel() may have fired during the sleep.
          if (closed) return

          const result = await readTableEventsSince(tableId, lastEventId)
          if (result.status === 'pruned') {
            sendPrunedAndClose(result.earliestEventId)
            return
          }
          if (result.status === 'unavailable') {
            throw new Error(`Table event buffer unavailable: ${result.error}`)
          }
          if (result.events.length > 0) {
            sendEvents(result.events)
          }

          // Heartbeat is time-based, so it fires on schedule whether or not
          // events are flowing.
          if (Date.now() >= nextHeartbeatAt) {
            sendHeartbeat()
            nextHeartbeatAt = Date.now() + HEARTBEAT_INTERVAL_MS
          }
        }

        // Reached the defensive duration ceiling — close cleanly so the client
        // reconnects with the latest lastEventId.
        if (!closed) {
          enqueue(`event: rotate\ndata: {}\n\n`)
          closed = true
          try {
            controller.close()
          } catch {}
        }
      } catch (error) {
        logger.error(`[${requestId}] Table event stream error`, {
          tableId,
          error: toError(error).message,
        })
        if (!closed) {
          try {
            // Surface the failure to the client as a stream error (triggers
            // EventSource's reconnect behavior).
            controller.error(error)
          } catch {}
        }
      }
    },
    // Client disconnected (EventSource closed / network drop): stop writing
    // and let the poll loop observe `closed` and exit.
    cancel() {
      closed = true
      logger.info(`[${requestId}] Client disconnected from table event stream`, { tableId })
    },
  })

  return new NextResponse(stream, {
    headers: { ...SSE_HEADERS, 'X-Table-Id': tableId },
  })
})
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
// Barrel file for the table hooks — re-exports each hook module so consumers
// can import from the directory root instead of individual files.
export * from './use-context-menu'
export * from './use-table'
export * from './use-table-event-stream'

0 commit comments

Comments
 (0)