From f2d30cdc6bd49a735bae7c46faa42742506d19d7 Mon Sep 17 00:00:00 2001 From: pthmas <9058370+pthmas@users.noreply.github.com> Date: Thu, 26 Feb 2026 15:58:14 +0100 Subject: [PATCH 1/5] feat: add SSE for live block updates Add Server-Sent Events endpoint (GET /api/events) that streams new blocks to the frontend in real-time, replacing polling for block height and blocks list updates. Backend: - New SSE handler polls DB every 200ms, emits one event per block in order - SSE route excluded from 10s TimeoutLayer so connections stay alive - Nginx configured to proxy SSE with buffering disabled Frontend: - useBlockSSE hook buffers events in a ref-based queue and drains at the chain's natural block rate (computed from on-chain timestamps) - Navbar counter updates instantly via SSE, block time derived from 30s rolling window of block timestamps for stability - BlocksPage prepends new blocks from SSE on page 1 with auto-refresh - Polling falls back automatically when SSE disconnects Closes #11 --- backend/Cargo.toml | 5 + backend/crates/atlas-api/Cargo.toml | 3 + backend/crates/atlas-api/src/handlers/mod.rs | 1 + backend/crates/atlas-api/src/handlers/sse.rs | 108 ++++++++++ backend/crates/atlas-api/src/main.rs | 12 +- docker-compose.yml | 2 + frontend/nginx.conf | 13 ++ frontend/src/components/Layout.tsx | 27 ++- frontend/src/components/SmoothCounter.tsx | 5 +- frontend/src/context/BlockStatsContext.tsx | 10 +- frontend/src/hooks/useBlockSSE.ts | 197 +++++++++++++++++++ frontend/src/hooks/useLatestBlockHeight.ts | 80 +++++--- frontend/src/pages/BlocksPage.tsx | 41 +++- 13 files changed, 462 insertions(+), 42 deletions(-) create mode 100644 backend/crates/atlas-api/src/handlers/sse.rs create mode 100644 frontend/src/hooks/useBlockSSE.ts diff --git a/backend/Cargo.toml b/backend/Cargo.toml index 3ed4006..fb250d6 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -44,6 +44,11 @@ tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } # Config dotenvy = "0.15" +# Streaming +tokio-stream = "0.1" +futures = "0.3" +async-stream = "0.3" + # Utilities bigdecimal = { version = "0.4", features = ["serde"] } hex = "0.4" diff --git a/backend/crates/atlas-api/Cargo.toml b/backend/crates/atlas-api/Cargo.toml index b348715..2e37231 100644 --- a/backend/crates/atlas-api/Cargo.toml +++ b/backend/crates/atlas-api/Cargo.toml @@ -25,3 +25,6 @@ bigdecimal = { workspace = true } hex = { workspace = true } chrono = { workspace = true } reqwest = { workspace = true } +tokio-stream = { workspace = true } +futures = { workspace = true } +async-stream = { workspace = true } diff --git a/backend/crates/atlas-api/src/handlers/mod.rs b/backend/crates/atlas-api/src/handlers/mod.rs index 3bd5818..769a4e2 100644 --- a/backend/crates/atlas-api/src/handlers/mod.rs +++ b/backend/crates/atlas-api/src/handlers/mod.rs @@ -8,6 +8,7 @@ pub mod logs; pub mod nfts; pub mod proxy; pub mod search; +pub mod sse; pub mod status; pub mod tokens; pub mod transactions; diff --git a/backend/crates/atlas-api/src/handlers/sse.rs b/backend/crates/atlas-api/src/handlers/sse.rs new file mode 100644 index 0000000..94dac03 --- /dev/null +++ b/backend/crates/atlas-api/src/handlers/sse.rs @@ -0,0 +1,108 @@ +use axum::{ + extract::State, + response::sse::{Event, Sse}, +}; +use futures::stream::Stream; +use serde::Serialize; +use std::convert::Infallible; +use std::sync::Arc; +use std::time::Duration; +use tokio::time::interval; + +use crate::AppState; +use atlas_common::Block; + +#[derive(Serialize)] +struct NewBlockEvent { + block: Block, +} + +/// GET /api/events — Server-Sent Events stream for live block updates. +/// Polls the DB every 200ms and emits one `new_block` event per block, in order. +/// Never skips blocks — fetches all blocks since the last one sent. +pub async fn block_events( + State(state): State>, +) -> Sse>> { + let stream = async_stream::stream! { + let mut last_block_number: Option = None; + let mut tick = interval(Duration::from_millis(200)); + let mut ping_counter: u32 = 0; + + loop { + tick.tick().await; + ping_counter += 1; + + // On first tick, seed with the latest block number + if last_block_number.is_none() { + let latest: Option<(i64,)> = sqlx::query_as( + "SELECT MAX(number) FROM blocks" + ) + .fetch_optional(&state.pool) + .await + .ok() + .flatten(); + + if let Some((max_num,)) = latest { + last_block_number = Some(max_num); + // Emit the current latest block as the initial event + let block: Option = sqlx::query_as( + "SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at + FROM blocks WHERE number = $1" + ) + .bind(max_num) + .fetch_optional(&state.pool) + .await + .ok() + .flatten(); + + if let Some(block) = block { + let event = NewBlockEvent { block }; + if let Ok(json) = serde_json::to_string(&event) { + yield Ok(Event::default().event("new_block").data(json)); + } + } + ping_counter = 0; + } + continue; + } + + let cursor = last_block_number.unwrap(); + + // Fetch ALL new blocks since last sent, in ascending order + let new_blocks: Vec = sqlx::query_as( + "SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at + FROM blocks WHERE number > $1 ORDER BY number ASC" + ) + .bind(cursor) + .fetch_all(&state.pool) + .await + .unwrap_or_default(); + + if !new_blocks.is_empty() { + ping_counter = 0; + } + + // Emit one event per block, in order + for block in new_blocks { + last_block_number = Some(block.number); + + let event = NewBlockEvent { block }; + if let Ok(json) = serde_json::to_string(&event) { + yield Ok(Event::default().event("new_block").data(json)); + } + } + + // Send keep-alive ping every ~15s (75 ticks * 200ms) + if ping_counter >= 75 { + ping_counter = 0; + yield Ok(Event::default().comment("keep-alive")); + } + } + }; + + Sse::new(stream).keep_alive( + axum::response::sse::KeepAlive::new() + .interval(Duration::from_secs(15)) + .text("keep-alive"), + ) +} diff --git a/backend/crates/atlas-api/src/main.rs b/backend/crates/atlas-api/src/main.rs index 3e471f0..e78ed9e 100644 --- a/backend/crates/atlas-api/src/main.rs +++ b/backend/crates/atlas-api/src/main.rs @@ -60,6 +60,11 @@ async fn main() -> Result<()> { admin_api_key, }); + // SSE route — excluded from TimeoutLayer so connections stay alive + let sse_routes = Router::new() + .route("/api/events", get(handlers::sse::block_events)) + .with_state(state.clone()); + // Build router let app = Router::new() // Blocks @@ -215,14 +220,17 @@ async fn main() -> Result<()> { axum::http::StatusCode::REQUEST_TIMEOUT, Duration::from_secs(10), )) + .with_state(state) + // Merge SSE routes (no TimeoutLayer so connections stay alive) + .merge(sse_routes) + // Shared layers applied to all routes .layer( CorsLayer::new() .allow_origin(Any) .allow_methods(Any) .allow_headers(Any), ) - .layer(TraceLayer::new_for_http()) - .with_state(state); + .layer(TraceLayer::new_for_http()); let addr = format!("{}:{}", host, port); tracing::info!("Listening on {}", addr); diff --git a/docker-compose.yml b/docker-compose.yml index 13901ed..231dbe5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -66,3 +66,5 @@ services: volumes: pgdata: + external: true + name: atlas_pgdata diff --git a/frontend/nginx.conf b/frontend/nginx.conf index d173d92..d753b3b 100644 --- a/frontend/nginx.conf +++ b/frontend/nginx.conf @@ -9,6 +9,19 @@ server { try_files $uri $uri/ /index.html; } + # SSE endpoint — disable buffering so events stream through immediately + location /api/events { + proxy_pass http://atlas-api:3000/api/events; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header Connection ''; + proxy_http_version 1.1; + chunked_transfer_encoding off; + proxy_buffering off; + proxy_cache off; + } + # Proxy API requests to atlas-api service location /api/ { proxy_pass http://atlas-api:3000/api/; diff --git a/frontend/src/components/Layout.tsx b/frontend/src/components/Layout.tsx index 39642e1..af574d6 100644 --- a/frontend/src/components/Layout.tsx +++ b/frontend/src/components/Layout.tsx @@ -2,6 +2,7 @@ import { Link, NavLink, Outlet, useLocation } from 'react-router-dom'; import { useEffect, useMemo, useRef, useState } from 'react'; import SearchBar from './SearchBar'; import useLatestBlockHeight from '../hooks/useLatestBlockHeight'; +import useBlockSSE from '../hooks/useBlockSSE'; import SmoothCounter from './SmoothCounter'; import logoImg from '../assets/logo.png'; import { BlockStatsContext } from '../context/BlockStatsContext'; @@ -10,7 +11,8 @@ import { useTheme } from '../hooks/useTheme'; export default function Layout() { const location = useLocation(); const isHome = location.pathname === '/'; - const { height, lastUpdatedAt, bps } = useLatestBlockHeight(2000, 1000000); + const sse = useBlockSSE(); + const { height, lastUpdatedAt, bps } = useLatestBlockHeight(2000, 1000000, sse.height, sse.connected, sse.bps); const [now, setNow] = useState(() => Date.now()); const recentlyUpdated = lastUpdatedAt ? (now - lastUpdatedAt) < 10000 : false; const [displayedHeight, setDisplayedHeight] = useState(null); @@ -25,7 +27,9 @@ export default function Layout() { return () => window.clearInterval(id); }, []); - // Smoothly increment displayed height using bps + // Update displayed height + // When SSE is connected: show exact height from SSE (increments one-by-one) + // When polling: use bps prediction to smooth between poll intervals useEffect(() => { if (height == null) { if (displayRafRef.current !== null) { @@ -52,9 +56,16 @@ export default function Layout() { }); } + // When SSE is connected, just track the real height directly — no prediction + if (sse.connected) { + displayedRef.current = height; + setDisplayedHeight(height); + return; + } + + // Polling mode: use bps prediction to smooth between poll intervals const loop = (t: number) => { if (!bps || bps <= 0) { - // No rate info; just stick to the last known real height if (displayedRef.current !== height) { displayedRef.current = height; setDisplayedHeight(displayedRef.current); @@ -64,9 +75,7 @@ export default function Layout() { const dt = lastFrameRef.current ? (now - lastFrameRef.current) / 1000 : 0; lastFrameRef.current = now; - // Increase predicted height smoothly by bps * dt const predicted = displayedRef.current + bps * dt; - // Always at least the latest known chain height const next = Math.max(height, Math.floor(predicted)); if (next !== displayedRef.current) { displayedRef.current = next; @@ -84,7 +93,7 @@ export default function Layout() { displayRafRef.current = null; lastFrameRef.current = 0; }; - }, [height, bps]); + }, [height, bps, sse.connected]); const blockTimeLabel = useMemo(() => { if (bps !== null && bps > 0) { const secs = 1 / bps; @@ -175,8 +184,8 @@ export default function Layout() {
| @@ -259,7 +268,7 @@ export default function Layout() { {/* Main content */}
- +
diff --git a/frontend/src/components/SmoothCounter.tsx b/frontend/src/components/SmoothCounter.tsx index bfdb6d4..deb2a68 100644 --- a/frontend/src/components/SmoothCounter.tsx +++ b/frontend/src/components/SmoothCounter.tsx @@ -5,10 +5,7 @@ interface SmoothCounterProps { export default function SmoothCounter({ value, className = '' }: SmoothCounterProps) { const text = value !== null ? new Intl.NumberFormat('en-US').format(Math.floor(value)) : '—'; - // Key on value so the animation restarts on change return ( - - {text} - + {text} ); } diff --git a/frontend/src/context/BlockStatsContext.tsx b/frontend/src/context/BlockStatsContext.tsx index beff79d..e07a305 100644 --- a/frontend/src/context/BlockStatsContext.tsx +++ b/frontend/src/context/BlockStatsContext.tsx @@ -1,9 +1,17 @@ import { createContext } from 'react'; +import type { NewBlockEvent } from '../hooks/useBlockSSE'; export interface BlockStats { bps: number | null; height: number | null; + latestBlockEvent: NewBlockEvent | null; + sseConnected: boolean; } -export const BlockStatsContext = createContext({ bps: null, height: null }); +export const BlockStatsContext = createContext({ + bps: null, + height: null, + latestBlockEvent: null, + sseConnected: false, +}); diff --git a/frontend/src/hooks/useBlockSSE.ts b/frontend/src/hooks/useBlockSSE.ts new file mode 100644 index 0000000..9d518a3 --- /dev/null +++ b/frontend/src/hooks/useBlockSSE.ts @@ -0,0 +1,197 @@ +import { useCallback, useEffect, useRef, useState } from 'react'; +import type { Block } from '../types'; + +const API_BASE_URL = import.meta.env.VITE_API_BASE_URL || 'http://localhost:3000/api'; + +export interface NewBlockEvent { + block: Block; +} + +export interface BlockSSEState { + latestBlock: NewBlockEvent | null; + height: number | null; + connected: boolean; + error: string | null; + bps: number | null; +} + +/** + * Connects to the SSE endpoint and delivers block events one-by-one. + * + * SSE events arrive in bursts (the backend polls every 200ms and yields all + * new blocks at once). React batching would collapse rapid setState calls, + * so we buffer into a ref-based queue and drain one at a time. + * + * The drain interval is derived from the chain's true block time, computed + * from on-chain block timestamps (not arrival/indexed times). We collect a + * rolling window of (blockNumber, blockTimestamp) samples and compute + * bps = deltaBlocks / deltaTimestamp. Since timestamps have second-level + * granularity, we require at least 2 seconds of block-time span for accuracy. + * This makes the visual cadence match the chain's actual speed, regardless of + * indexer lag, network batching, or SSE delivery timing. + */ +export default function useBlockSSE(): BlockSSEState { + const [latestBlock, setLatestBlock] = useState(null); + const [height, setHeight] = useState(null); + const [connected, setConnected] = useState(false); + const [error, setError] = useState(null); + const [bps, setBps] = useState(null); + const esRef = useRef(null); + const reconnectTimeoutRef = useRef(null); + const queueRef = useRef([]); + const drainTimerRef = useRef(null); + // Rolling window of (blockNumber, blockTimestamp) for chain-rate calculation. + // We keep up to 500 samples (~45s at 11 bps) and use two windows: + // - 30s of block-time for the displayed bps (very stable) + // - 10s of block-time for drain pacing (responsive enough to adapt) + const blockLogRef = useRef<{ num: number; ts: number }[]>([]); + // Cached drain interval in ms, derived from chain block timestamps + const drainIntervalRef = useRef(90); // initial guess ~11 bps + + const connect = useCallback(() => { + if (esRef.current) { + esRef.current.close(); + } + + const url = `${API_BASE_URL}/events`; + const es = new EventSource(url); + esRef.current = es; + + es.onopen = () => { + setConnected(true); + setError(null); + }; + + es.addEventListener('new_block', (e: MessageEvent) => { + try { + const data: NewBlockEvent = JSON.parse(e.data); + + // Log block number + on-chain timestamp for true chain-rate calculation + blockLogRef.current.push({ + num: data.block.number, + ts: data.block.timestamp, // unix seconds from the chain + }); + + // Keep last 500 samples (~45s at 11 bps) + if (blockLogRef.current.length > 500) { + blockLogRef.current = blockLogRef.current.slice(-500); + } + + // Recalculate bps and drain interval from chain timestamps. + const log = blockLogRef.current; + const newest = log[log.length - 1]; + + // Displayed bps: use 30s window for maximum stability + for (let i = 0; i < log.length - 1; i++) { + const span = newest.ts - log[i].ts; + if (span >= 30) { + const chainBps = (newest.num - log[i].num) / span; + setBps(chainBps); + break; + } + // If we don't have 30s yet, use whatever we have (≥5s) + if (i === 0 && span >= 5) { + const chainBps = (newest.num - log[0].num) / span; + setBps(chainBps); + } + } + + // Drain pacing: use 10s window for moderate responsiveness + for (let i = 0; i < log.length - 1; i++) { + const span = newest.ts - log[i].ts; + if (span >= 10) { + const drainBps = (newest.num - log[i].num) / span; + drainIntervalRef.current = Math.max(30, Math.min(500, 1000 / drainBps)); + break; + } + // Bootstrap: use ≥2s while building up + if (i === 0 && span >= 2) { + const drainBps = (newest.num - log[0].num) / span; + drainIntervalRef.current = Math.max(30, Math.min(500, 1000 / drainBps)); + } + } + + // Push to ref queue — synchronous, never lost by React batching + queueRef.current.push(data); + } catch { + // Ignore malformed events + } + }); + + es.onerror = () => { + setConnected(false); + es.close(); + esRef.current = null; + + reconnectTimeoutRef.current = window.setTimeout(() => { + reconnectTimeoutRef.current = null; + connect(); + }, 2000); + }; + }, []); + + // Drain loop: emit one block per tick at the chain's natural cadence. + useEffect(() => { + let running = true; + + const drain = () => { + if (!running) return; + const queue = queueRef.current; + + if (queue.length === 0) { + // Nothing to drain — check again soon + drainTimerRef.current = window.setTimeout(drain, 30); + return; + } + + // If queue is backing up (> 50 items), skip to near the end + if (queue.length > 50) { + const skip = queue.splice(0, queue.length - 5); + const lastSkipped = skip[skip.length - 1]; + setHeight(lastSkipped.block.number); + } + + const next = queue.shift()!; + setLatestBlock(next); + setHeight(next.block.number); + + // Use the chain-rate interval, but if queue is growing speed up gently + let interval = drainIntervalRef.current; + if (queue.length > 5) { + interval = interval * 0.7; + } + drainTimerRef.current = window.setTimeout(drain, interval); + }; + + drainTimerRef.current = window.setTimeout(drain, 30); + + return () => { + running = false; + if (drainTimerRef.current !== null) { + clearTimeout(drainTimerRef.current); + drainTimerRef.current = null; + } + }; + }, []); + + useEffect(() => { + connect(); + + return () => { + if (esRef.current) { + esRef.current.close(); + esRef.current = null; + } + if (reconnectTimeoutRef.current !== null) { + clearTimeout(reconnectTimeoutRef.current); + reconnectTimeoutRef.current = null; + } + if (drainTimerRef.current !== null) { + clearTimeout(drainTimerRef.current); + drainTimerRef.current = null; + } + }; + }, [connect]); + + return { latestBlock, height, connected, error, bps }; +} diff --git a/frontend/src/hooks/useLatestBlockHeight.ts b/frontend/src/hooks/useLatestBlockHeight.ts index afb6295..52b5c21 100644 --- a/frontend/src/hooks/useLatestBlockHeight.ts +++ b/frontend/src/hooks/useLatestBlockHeight.ts @@ -9,7 +9,18 @@ export interface LatestHeightState { bps: number | null; } -export default function useLatestBlockHeight(pollMs = 2000, _windowBlocks = 1000000): LatestHeightState { +/** + * Tracks the latest block height and computes blocks-per-second (bps). + * When SSE is connected, uses sseBps (derived from on-chain block timestamps) + * for a stable block-time display. Falls back to wall-clock EMA when polling. + */ +export default function useLatestBlockHeight( + pollMs = 2000, + _windowBlocks = 1000000, + sseHeight: number | null = null, + sseConnected = false, + sseBps: number | null = null, +): LatestHeightState { void _windowBlocks; const [height, setHeight] = useState(null); const heightRef = useRef(null); @@ -21,32 +32,54 @@ export default function useLatestBlockHeight(pollMs = 2000, _windowBlocks = 1000 const prevSampleRef = useRef<{ h: number; t: number } | null>(null); const alphaRef = useRef(0.25); // smoothing factor for EMA + // When SSE provides bps from block timestamps, use it directly + useEffect(() => { + if (sseConnected && sseBps != null) { + setBps(sseBps); + } + }, [sseConnected, sseBps]); + + // Process a new height value (from either SSE or polling) + const processHeight = useCallback((latestHeight: number, fromSSE: boolean) => { + const now = Date.now(); + if (latestHeight !== heightRef.current) { + heightRef.current = latestHeight; + setHeight(latestHeight); + setLastUpdatedAt(now); + } + // Only compute wall-clock EMA bps when polling (not SSE) + if (!fromSSE) { + const prev = prevSampleRef.current; + const curr = { h: latestHeight, t: now }; + if (prev && curr.t > prev.t && curr.h >= prev.h) { + const dh = curr.h - prev.h; + const dt = (curr.t - prev.t) / 1000; + const inst = dt > 0 ? dh / dt : 0; + const alpha = alphaRef.current; + setBps((prevBps) => (prevBps == null ? inst : prevBps + alpha * (inst - prevBps))); + } + prevSampleRef.current = curr; + } + setError(null); + setLoading(false); + }, []); + + // Handle SSE height updates + useEffect(() => { + if (sseHeight != null) { + processHeight(sseHeight, true); + } + }, [sseHeight, processHeight]); + + // HTTP polling fallback — only active when SSE is not connected const fetchHeight = useCallback(async () => { if (fetchingRef.current) return; fetchingRef.current = true; try { const status = await getStatus(); - const now = Date.now(); const latestHeight = status?.block_height; if (typeof latestHeight === 'number') { - if (latestHeight !== heightRef.current) { - // Update height and time of update - heightRef.current = latestHeight; - setHeight(latestHeight); - setLastUpdatedAt(now); - } - // Update bps using wall-clock deltas between status samples - const prev = prevSampleRef.current; - const curr = { h: latestHeight, t: now }; - if (prev && curr.t > prev.t && curr.h >= prev.h) { - const dh = curr.h - prev.h; - const dt = (curr.t - prev.t) / 1000; - const inst = dt > 0 ? dh / dt : 0; - const alpha = alphaRef.current; - setBps((prevBps) => (prevBps == null ? inst : prevBps + alpha * (inst - prevBps))); - } - prevSampleRef.current = curr; - setError(null); + processHeight(latestHeight, false); } else { setHeight(null); } @@ -56,13 +89,16 @@ export default function useLatestBlockHeight(pollMs = 2000, _windowBlocks = 1000 setLoading(false); fetchingRef.current = false; } - }, []); + }, [processHeight]); useEffect(() => { + // Skip polling when SSE is connected + if (sseConnected) return; + fetchHeight(); const id = setInterval(fetchHeight, pollMs); return () => clearInterval(id); - }, [pollMs, fetchHeight]); + }, [pollMs, fetchHeight, sseConnected]); return { height, loading, error, lastUpdatedAt, bps }; } diff --git a/frontend/src/pages/BlocksPage.tsx b/frontend/src/pages/BlocksPage.tsx index 64a4d1b..08f85dc 100644 --- a/frontend/src/pages/BlocksPage.tsx +++ b/frontend/src/pages/BlocksPage.tsx @@ -1,8 +1,9 @@ -import { useEffect, useMemo, useRef, useState } from 'react'; +import { useContext, useEffect, useMemo, useRef, useState } from 'react'; import { Link, useNavigate } from 'react-router-dom'; import { useBlocks } from '../hooks'; import { CopyButton, Loading } from '../components'; import { formatNumber, formatTimeAgo, formatGas, truncateHash } from '../utils'; +import { BlockStatsContext } from '../context/BlockStatsContext'; export default function BlocksPage() { const [page, setPage] = useState(1); @@ -14,8 +15,37 @@ export default function BlocksPage() { return true; } }); - const { blocks, pagination, refetch, loading } = useBlocks({ page, limit: 20 }); + const { blocks: fetchedBlocks, pagination, refetch, loading } = useBlocks({ page, limit: 20 }); const hasLoaded = !loading || pagination !== null; + const { latestBlockEvent, sseConnected } = useContext(BlockStatsContext); + const [sseBlocks, setSseBlocks] = useState([]); + const lastSseBlockRef = useRef(null); + + // Prepend new blocks from SSE on page 1 with auto-refresh + useEffect(() => { + if (!latestBlockEvent || page !== 1 || !autoRefresh) return; + const block = latestBlockEvent.block; + if (lastSseBlockRef.current != null && block.number <= lastSseBlockRef.current) return; + lastSseBlockRef.current = block.number; + setSseBlocks((prev) => { + // Avoid duplicates with fetched blocks + if (prev.some((b) => b.number === block.number)) return prev; + return [block, ...prev].slice(0, 20); + }); + }, [latestBlockEvent, page, autoRefresh]); + + // Reset SSE buffer when fetched blocks update (they'll include the SSE blocks now) + useEffect(() => { + setSseBlocks([]); + }, [fetchedBlocks]); + + // Merge: SSE blocks prepended, deduped, trimmed to page size + const blocks = useMemo(() => { + if (!sseBlocks.length) return fetchedBlocks; + const seen = new Set(fetchedBlocks.map((b) => b.number)); + const unique = sseBlocks.filter((b) => !seen.has(b.number)); + return [...unique, ...fetchedBlocks].slice(0, 20); + }, [fetchedBlocks, sseBlocks]); const navigate = useNavigate(); const [sort, setSort] = useState<{ key: 'number' | 'hash' | 'timestamp' | 'transaction_count' | 'gas_used' | null; direction: 'asc' | 'desc'; }>({ key: null, direction: 'desc' }); const seenBlocksRef = useRef>(new Set()); @@ -51,13 +81,16 @@ export default function BlocksPage() { useEffect(() => { if (!autoRefresh) return; + // When SSE is connected, poll less frequently (every 10s as a safety net) + // When SSE is disconnected, poll every 1s as before + const interval = sseConnected ? 10000 : 1000; const id = setInterval(() => { if (!loading) { void refetch(); } - }, 1000); + }, interval); return () => clearInterval(id); - }, [autoRefresh, refetch, loading]); + }, [autoRefresh, refetch, loading, sseConnected]); // Keep relative timestamps (Age) updating even when auto refresh is paused useEffect(() => { From cc1a48ea709964ba7a672c9590322f1200b55831 Mon Sep 17 00:00:00 2001 From: pthmas <9058370+pthmas@users.noreply.github.com> Date: Thu, 26 Feb 2026 16:31:05 +0100 Subject: [PATCH 2/5] Fix SSE bootstrap query for empty blocks table --- backend/crates/atlas-api/src/handlers/sse.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/backend/crates/atlas-api/src/handlers/sse.rs b/backend/crates/atlas-api/src/handlers/sse.rs index 94dac03..f0426bc 100644 --- a/backend/crates/atlas-api/src/handlers/sse.rs +++ b/backend/crates/atlas-api/src/handlers/sse.rs @@ -34,15 +34,13 @@ pub async fn block_events( // On first tick, seed with the latest block number if last_block_number.is_none() { - let latest: Option<(i64,)> = sqlx::query_as( - "SELECT MAX(number) FROM blocks" - ) - .fetch_optional(&state.pool) - .await - .ok() - .flatten(); + let latest: Option = sqlx::query_scalar("SELECT MAX(number) FROM blocks") + .fetch_one(&state.pool) + .await + .ok() + .flatten(); - if let Some((max_num,)) = latest { + if let Some(max_num) = latest { last_block_number = Some(max_num); // Emit the current latest block as the initial event let block: Option = sqlx::query_as( From bc193c38ed02cab4b8e76dcdf8be9d636d247b81 Mon Sep 17 00:00:00 2001 From: pthmas <9058370+pthmas@users.noreply.github.com> Date: Thu, 26 Feb 2026 20:19:45 +0100 Subject: [PATCH 3/5] fix: address code review comments - sse.rs: replace silent error swallowing with tracing::warn! and continue - useBlockSSE: use named function expression to fix TS2448 self-reference - BlocksPage: filter SSE buffer instead of unconditional clear on fetch --- backend/crates/atlas-api/src/handlers/sse.rs | 24 +++++++++++++------- frontend/src/hooks/useBlockSSE.ts | 4 ++-- frontend/src/pages/BlocksPage.tsx | 7 ++++-- 3 files changed, 23 insertions(+), 12 deletions(-) diff --git a/backend/crates/atlas-api/src/handlers/sse.rs b/backend/crates/atlas-api/src/handlers/sse.rs index f0426bc..db2b956 100644 --- a/backend/crates/atlas-api/src/handlers/sse.rs +++ b/backend/crates/atlas-api/src/handlers/sse.rs @@ -11,6 +11,7 @@ use tokio::time::interval; use crate::AppState; use atlas_common::Block; +use tracing::warn; #[derive(Serialize)] struct NewBlockEvent { @@ -34,24 +35,28 @@ pub async fn block_events( // On first tick, seed with the latest block number if last_block_number.is_none() { - let latest: Option = sqlx::query_scalar("SELECT MAX(number) FROM blocks") + let latest: Option = match sqlx::query_scalar("SELECT MAX(number) FROM blocks") .fetch_one(&state.pool) .await - .ok() - .flatten(); + { + Ok(v) => v, + Err(e) => { warn!(error = ?e, "sse: failed to query latest block number"); continue; } + }; if let Some(max_num) = latest { last_block_number = Some(max_num); // Emit the current latest block as the initial event - let block: Option = sqlx::query_as( + let block: Option = match sqlx::query_as( "SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at FROM blocks WHERE number = $1" ) .bind(max_num) .fetch_optional(&state.pool) .await - .ok() - .flatten(); + { + Ok(v) => v, + Err(e) => { warn!(error = ?e, "sse: failed to fetch initial block"); continue; } + }; if let Some(block) = block { let event = NewBlockEvent { block }; @@ -67,14 +72,17 @@ pub async fn block_events( let cursor = last_block_number.unwrap(); // Fetch ALL new blocks since last sent, in ascending order - let new_blocks: Vec = sqlx::query_as( + let new_blocks: Vec = match sqlx::query_as( "SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at FROM blocks WHERE number > $1 ORDER BY number ASC" ) .bind(cursor) .fetch_all(&state.pool) .await - .unwrap_or_default(); + { + Ok(rows) => rows, + Err(e) => { warn!(error = ?e, cursor, "sse: failed to fetch new blocks"); continue; } + }; if !new_blocks.is_empty() { ping_counter = 0; diff --git a/frontend/src/hooks/useBlockSSE.ts b/frontend/src/hooks/useBlockSSE.ts index 9d518a3..5b0c788 100644 --- a/frontend/src/hooks/useBlockSSE.ts +++ b/frontend/src/hooks/useBlockSSE.ts @@ -48,7 +48,7 @@ export default function useBlockSSE(): BlockSSEState { // Cached drain interval in ms, derived from chain block timestamps const drainIntervalRef = useRef(90); // initial guess ~11 bps - const connect = useCallback(() => { + const connect = useCallback(function connectSSE() { if (esRef.current) { esRef.current.close(); } @@ -125,7 +125,7 @@ export default function useBlockSSE(): BlockSSEState { reconnectTimeoutRef.current = window.setTimeout(() => { reconnectTimeoutRef.current = null; - connect(); + connectSSE(); }, 2000); }; }, []); diff --git a/frontend/src/pages/BlocksPage.tsx b/frontend/src/pages/BlocksPage.tsx index 08f85dc..46796e2 100644 --- a/frontend/src/pages/BlocksPage.tsx +++ b/frontend/src/pages/BlocksPage.tsx @@ -34,9 +34,12 @@ export default function BlocksPage() { }); }, [latestBlockEvent, page, autoRefresh]); - // Reset SSE buffer when fetched blocks update (they'll include the SSE blocks now) + // Drop SSE blocks that are now present in fetchedBlocks to avoid duplicates, + // but keep any that haven't been fetched yet. useEffect(() => { - setSseBlocks([]); + if (!fetchedBlocks.length) return; + const fetched = new Set(fetchedBlocks.map((b) => b.number)); + setSseBlocks((prev) => prev.filter((b) => !fetched.has(b.number))); }, [fetchedBlocks]); // Merge: SSE blocks prepended, deduped, trimmed to page size From 2a0cea0717127f6b8bb535619fb9c7573f47155c Mon Sep 17 00:00:00 2001 From: pthmas <9058370+pthmas@users.noreply.github.com> Date: Thu, 26 Feb 2026 20:28:30 +0100 Subject: [PATCH 4/5] fix: defer setState calls in effects to satisfy react-hooks/set-state-in-effect Wrap setSseBlocks calls in BlocksPage and setDisplayedHeight in Layout in requestAnimationFrame to avoid calling setState synchronously inside useEffect bodies, which is flagged by eslint-plugin-react-hooks v7. --- frontend/src/components/Layout.tsx | 5 +++-- frontend/src/pages/BlocksPage.tsx | 27 ++++++++++++++++++++++----- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/frontend/src/components/Layout.tsx b/frontend/src/components/Layout.tsx index af574d6..9d7e1d2 100644 --- a/frontend/src/components/Layout.tsx +++ b/frontend/src/components/Layout.tsx @@ -56,10 +56,11 @@ export default function Layout() { }); } - // When SSE is connected, just track the real height directly — no prediction + // When SSE is connected, just track the real height directly — no prediction. + // The initialization block above already scheduled a RAF to call setDisplayedHeight + // whenever height changes, so no synchronous setState needed here. if (sse.connected) { displayedRef.current = height; - setDisplayedHeight(height); return; } diff --git a/frontend/src/pages/BlocksPage.tsx b/frontend/src/pages/BlocksPage.tsx index 46796e2..3355074 100644 --- a/frontend/src/pages/BlocksPage.tsx +++ b/frontend/src/pages/BlocksPage.tsx @@ -20,6 +20,8 @@ export default function BlocksPage() { const { latestBlockEvent, sseConnected } = useContext(BlockStatsContext); const [sseBlocks, setSseBlocks] = useState([]); const lastSseBlockRef = useRef(null); + const ssePrependRafRef = useRef(null); + const sseFilterRafRef = useRef(null); // Prepend new blocks from SSE on page 1 with auto-refresh useEffect(() => { @@ -27,10 +29,13 @@ export default function BlocksPage() { const block = latestBlockEvent.block; if (lastSseBlockRef.current != null && block.number <= lastSseBlockRef.current) return; lastSseBlockRef.current = block.number; - setSseBlocks((prev) => { - // Avoid duplicates with fetched blocks - if (prev.some((b) => b.number === block.number)) return prev; - return [block, ...prev].slice(0, 20); + if (ssePrependRafRef.current !== null) cancelAnimationFrame(ssePrependRafRef.current); + ssePrependRafRef.current = window.requestAnimationFrame(() => { + setSseBlocks((prev) => { + if (prev.some((b) => b.number === block.number)) return prev; + return [block, ...prev].slice(0, 20); + }); + ssePrependRafRef.current = null; }); }, [latestBlockEvent, page, autoRefresh]); @@ -39,7 +44,11 @@ export default function BlocksPage() { useEffect(() => { if (!fetchedBlocks.length) return; const fetched = new Set(fetchedBlocks.map((b) => b.number)); - setSseBlocks((prev) => prev.filter((b) => !fetched.has(b.number))); + if (sseFilterRafRef.current !== null) cancelAnimationFrame(sseFilterRafRef.current); + sseFilterRafRef.current = window.requestAnimationFrame(() => { + setSseBlocks((prev) => prev.filter((b) => !fetched.has(b.number))); + sseFilterRafRef.current = null; + }); }, [fetchedBlocks]); // Merge: SSE blocks prepended, deduped, trimmed to page size @@ -163,6 +172,14 @@ export default function BlocksPage() { window.cancelAnimationFrame(highlightRafRef.current); highlightRafRef.current = null; } + if (ssePrependRafRef.current !== null) { + cancelAnimationFrame(ssePrependRafRef.current); + ssePrependRafRef.current = null; + } + if (sseFilterRafRef.current !== null) { + cancelAnimationFrame(sseFilterRafRef.current); + sseFilterRafRef.current = null; + } for (const [, t] of activeTimeouts) clearTimeout(t); activeTimeouts.clear(); }; From 5a61a1f19efbc2a857ba76b2f9598239adca83e7 Mon Sep 17 00:00:00 2001 From: pthmas <9058370+pthmas@users.noreply.github.com> Date: Thu, 26 Feb 2026 21:39:43 +0100 Subject: [PATCH 5/5] fix: disable polling entirely while SSE is connected MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stop polling every 10s as a safety net when SSE is active — periodic refetches disrupt the smooth live flow. Instead, refetch immediately when SSE disconnects to catch any missed blocks during the gap. --- frontend/src/pages/BlocksPage.tsx | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/frontend/src/pages/BlocksPage.tsx b/frontend/src/pages/BlocksPage.tsx index 3355074..aa3370a 100644 --- a/frontend/src/pages/BlocksPage.tsx +++ b/frontend/src/pages/BlocksPage.tsx @@ -91,19 +91,25 @@ export default function BlocksPage() { }); }, [blocks, sort]); + // No polling while SSE is connected — periodic refetches disrupt the smooth live flow. + // Fall back to 1s polling only when SSE is disconnected. useEffect(() => { - if (!autoRefresh) return; - // When SSE is connected, poll less frequently (every 10s as a safety net) - // When SSE is disconnected, poll every 1s as before - const interval = sseConnected ? 10000 : 1000; + if (!autoRefresh || sseConnected) return; const id = setInterval(() => { - if (!loading) { - void refetch(); - } - }, interval); + if (!loading) void refetch(); + }, 1000); return () => clearInterval(id); }, [autoRefresh, refetch, loading, sseConnected]); + // When SSE drops, immediately refetch to catch any blocks missed during the gap. + const prevSseConnectedRef = useRef(sseConnected); + useEffect(() => { + if (prevSseConnectedRef.current && !sseConnected && autoRefresh) { + void refetch(); + } + prevSseConnectedRef.current = sseConnected; + }, [sseConnected, refetch, autoRefresh]); + // Keep relative timestamps (Age) updating even when auto refresh is paused useEffect(() => { const id = setInterval(() => setTick((t) => (t + 1) % 1_000_000), 1000);