Skip to content

Commit e104c19

Browse files
authored
Merge pull request #330 from deploystackio/main
prod-release
2 parents ff2149d + f51d98e commit e104c19

27 files changed

+6488
-190
lines changed

development/backend/api/sse.mdx

Lines changed: 108 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -202,30 +202,82 @@ export default async function sseRoute(server: FastifyInstance) {
202202

203203
## Polling Pattern with Async Operations
204204

205-
When using `setInterval` with async database queries or API calls, you **must** check connection state **after** the async operation completes to prevent crashes:
205+
When using `setInterval` with async database queries or API calls, you **must** check connection state **after** the async operation completes to prevent crashes.
206+
207+
### Critical: Timestamp Tracking with Array Mutations
208+
209+
**⚠️ COMMON BUG**: When polling for new records and reversing arrays for chronological order, you **must** capture the newest timestamp **before** reversing the array:
210+
211+
```typescript
212+
// ❌ WRONG: Captures oldest timestamp after reverse
213+
const newItems = await db.query().orderBy(desc(created_at))
214+
for (const item of newItems.reverse()) { // reverse() MUTATES the array
215+
reply.sse.send({ event: 'item', data: item })
216+
}
217+
lastSentTimestamp = newItems[0].created_at // BUG: Now points to OLDEST item!
218+
219+
// ✅ CORRECT: Capture newest timestamp before reversing
220+
const newItems = await db.query().orderBy(desc(created_at))
221+
const newestTimestamp = newItems[0].created_at // Capture FIRST (newest)
222+
for (const item of newItems.reverse()) {
223+
reply.sse.send({ event: 'item', data: item })
224+
}
225+
lastSentTimestamp = newestTimestamp // Use captured newest timestamp
226+
```
227+
228+
**Why this matters:**
229+
- Query returns items in **descending order** (newest first): `[newest, ..., oldest]`
230+
- `array.reverse()` **mutates** the array: `[oldest, ..., newest]`
231+
- After reversal, `array[0]` points to the **oldest** item
232+
- Using `array[0].created_at` after reversal sets `lastSentTimestamp` to the oldest timestamp
233+
- Next poll finds the same items again → **infinite duplicate stream**
234+
235+
**The Fix:**
236+
Always capture the newest timestamp from `array[0]` **before** calling `reverse()`.
237+
238+
### Complete Polling Pattern Example
206239

207240
```typescript
241+
import { type FastifyInstance } from 'fastify'
242+
import { getDb, getSchema } from '../../../db'
243+
import { eq, and, desc, gt } from 'drizzle-orm'
244+
208245
export default async function pollingStreamRoute(server: FastifyInstance) {
209-
server.get('/metrics/stream', {
246+
server.get('/logs/stream', {
210247
sse: true,
211-
preValidation: requirePermission('metrics.view'),
248+
preValidation: requirePermission('logs.view'),
212249
schema: {
213-
tags: ['Metrics'],
214-
summary: 'Stream metrics via SSE with polling',
250+
tags: ['Logs'],
251+
summary: 'Stream logs via SSE with polling',
215252
security: [{ cookieAuth: [] }]
216253
}
217254
}, async (request, reply) => {
218255
const userId = request.user!.id
219256
let pollInterval: NodeJS.Timeout | null = null
220257

258+
const db = getDb()
259+
const { logs } = getSchema()
260+
221261
// Keep connection open
222262
reply.sse.keepAlive()
223263

224264
// Send initial snapshot
225-
const initialData = await fetchMetrics(userId)
226-
reply.sse.send({ event: 'snapshot', data: initialData })
265+
const initialLogs = await db
266+
.select()
267+
.from(logs)
268+
.where(eq(logs.user_id, userId))
269+
.orderBy(desc(logs.created_at))
270+
.limit(50)
271+
272+
reply.sse.send({
273+
event: 'snapshot',
274+
data: { logs: initialLogs }
275+
})
276+
277+
// Track last sent timestamp for polling
278+
let lastSentTimestamp = initialLogs[0]?.created_at || new Date(0)
227279

228-
// Poll for updates every 3 seconds
280+
// Poll for new logs every 3 seconds
229281
pollInterval = setInterval(async () => {
230282
// Check #1: Before starting async work
231283
if (!reply.sse.isConnected) {
@@ -234,25 +286,46 @@ export default async function pollingStreamRoute(server: FastifyInstance) {
234286
}
235287

236288
try {
237-
// Async database query or API call
238-
const data = await fetchMetrics(userId)
289+
// Query for new logs (newest first)
290+
const newLogs = await db
291+
.select()
292+
.from(logs)
293+
.where(and(
294+
eq(logs.user_id, userId),
295+
gt(logs.created_at, lastSentTimestamp)
296+
))
297+
.orderBy(desc(logs.created_at))
298+
.limit(100)
239299

240300
// ⚠️ CRITICAL: Check #2 after async operation completes
241301
if (!reply.sse.isConnected) {
242302
if (pollInterval) clearInterval(pollInterval)
243303
return
244304
}
245305

246-
// If sending multiple items in a loop, check before each send
247-
for (const item of data) {
248-
if (!reply.sse.isConnected) {
249-
if (pollInterval) clearInterval(pollInterval)
250-
return
306+
if (newLogs.length > 0) {
307+
// ⚠️ CRITICAL: Capture newest timestamp BEFORE reversing
308+
const newestTimestamp = newLogs[0].created_at
309+
310+
// Send logs in chronological order (oldest first)
311+
for (const log of newLogs.reverse()) {
312+
// Check before each send
313+
if (!reply.sse.isConnected) {
314+
if (pollInterval) clearInterval(pollInterval)
315+
return
316+
}
317+
reply.sse.send({
318+
id: log.id,
319+
event: 'log',
320+
data: log
321+
})
251322
}
252-
reply.sse.send({ event: 'update', data: item })
323+
324+
// Update to newest timestamp (captured before reversal)
325+
lastSentTimestamp = newestTimestamp
253326
}
254327
} catch (error) {
255-
server.log.error(error, 'Failed to fetch metrics')
328+
server.log.error(error, 'Failed to poll for new logs')
256329
// Don't crash - just log the error
257330
}
258331
}, 3000)
@@ -263,7 +336,7 @@ export default async function pollingStreamRoute(server: FastifyInstance) {
263336
clearInterval(pollInterval)
264337
pollInterval = null
265338
}
266-
server.log.debug({ userId }, 'Metrics stream closed')
339+
server.log.debug({ userId }, 'Log stream closed')
267340
})
268341
})
269342
}
@@ -289,6 +362,23 @@ Time 100ms: Query completes → Check #2 ✅ (disconnected) → Return early ✅
289362

290363
The client can disconnect **during** async operations, so checking only at the start of the interval is insufficient.
291364

365+
### Key Takeaways for Polling Patterns
366+
367+
When implementing SSE with polling:
368+
369+
1. **✅ Always check `reply.sse.isConnected` after async operations** - Client can disconnect during queries
370+
2. **✅ Capture timestamps before array mutations** - `array.reverse()` mutates the array
371+
3. **✅ Use `gt()` (greater than) for timestamp filtering** - Prevents re-sending same items
372+
4. **✅ Clear interval on disconnect** - Prevent memory leaks and unnecessary queries
373+
5. **✅ Send in chronological order** - Oldest first for natural reading experience
374+
6. **✅ Wrap polling logic in try-catch** - Don't crash on database errors
375+
376+
**Common Pitfalls:**
377+
- ❌ Using `lastSentTimestamp = array[0]` after `array.reverse()`
378+
- ❌ Only checking `isConnected` before async operations
379+
- ❌ Forgetting to clear interval in `onClose` handler
380+
- ❌ Not handling database errors gracefully
381+
292382
## Frontend Client
293383

294384
```javascript

0 commit comments

Comments
 (0)