Skip to content

feat: add SSE for live block updates#19

Open
pthmas wants to merge 5 commits intomainfrom
pthmas/websocket-live-updates
Open

feat: add SSE for live block updates#19
pthmas wants to merge 5 commits intomainfrom
pthmas/websocket-live-updates

Conversation

@pthmas
Copy link
Collaborator

@pthmas pthmas commented Feb 26, 2026

Summary

  • Adds SSE endpoint (GET /api/events) that streams new block metadata in real-time
  • Frontend drains events at the chain's natural block rate (derived from on-chain timestamps) for a smooth visual cadence
  • Blocks page prepends new blocks instantly via SSE; polling remains as fallback
  • Block time display uses a 30s rolling window of block timestamps for stability

Closes #11

Summary by CodeRabbit

  • New Features

    • Real-time SSE endpoint for live block events; backend now streams new-block events to clients.
    • Frontend hook to consume streamed block events and expose latest event, connection state, and bps.
  • UI

    • Live status indicator shows connected (green) vs polling (red).
    • Block height display uses direct SSE updates when connected, predictive smoothing when polling.
    • Blocks list prepends newest SSE-received blocks with deduplication and polling fallback.
  • Chores

    • Web proxy configured to forward and stream /api/events.
    • DB volume converted to an external, named volume for persistent storage.
  • Style

    • Simplified, non-animated block counter for consistent rendering.

Add Server-Sent Events endpoint (GET /api/events) that streams new blocks
to the frontend in real-time, replacing polling for block height and blocks
list updates.

Backend:
- New SSE handler polls DB every 200ms, emits one event per block in order
- SSE route excluded from 10s TimeoutLayer so connections stay alive
- Nginx configured to proxy SSE with buffering disabled

Frontend:
- useBlockSSE hook buffers events in a ref-based queue and drains at the
  chain's natural block rate (computed from on-chain timestamps)
- Navbar counter updates instantly via SSE, block time derived from 30s
  rolling window of block timestamps for stability
- BlocksPage prepends new blocks from SSE on page 1 with auto-refresh
- Polling falls back automatically when SSE disconnects

Closes #11
@coderabbitai
Copy link

coderabbitai bot commented Feb 26, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Adds SSE-based real-time block streaming: backend exposes /api/events that streams new blocks from the DB; nginx proxies the stream; frontend connects via a new hook, buffers/drains events, and integrates SSE-driven heights into UI and polling logic.

Changes

Cohort / File(s) Summary
Workspace Manifests
backend/Cargo.toml, backend/crates/atlas-api/Cargo.toml
Added streaming dependencies: tokio-stream, futures, and async-stream (workspace entries and crate-level workspace refs).
Backend SSE module
backend/crates/atlas-api/src/handlers/mod.rs, backend/crates/atlas-api/src/handlers/sse.rs
New sse module and block_events handler: periodic (200ms) DB polling, seed latest block, stream new_block JSON events, maintain cursor and emit keep-alive comments.
Backend router
backend/crates/atlas-api/src/main.rs
Added dedicated SSE routes for /api/events, merged into the app router and excluded from the TimeoutLayer; shared AppState applied to SSE routes.
Infrastructure
docker-compose.yml, frontend/nginx.conf
Marked pgdata volume as external/named atlas_pgdata; nginx: added /api/events proxy to atlas-api:3000/api/events with HTTP/1.1 and buffering disabled for streaming.
Frontend SSE hook
frontend/src/hooks/useBlockSSE.ts
New hook connecting to /api/events, buffering new_block events, maintaining block log to compute bps, adaptive drain loop (30–500ms), queue management, reconnection logic; exports NewBlockEvent, BlockSSEState, and useBlockSSE.
Frontend integration
frontend/src/components/Layout.tsx, frontend/src/hooks/useLatestBlockHeight.ts, frontend/src/context/BlockStatsContext.tsx
Integrated useBlockSSE into layout, passed SSE height/connected/bps into useLatestBlockHeight, extended BlockStats context with latestBlockEvent and sseConnected, and updated hook signature/logic to skip polling when SSE connected.
Pages & UI
frontend/src/pages/BlocksPage.tsx, frontend/src/components/SmoothCounter.tsx
BlocksPage: prepend/deduplicate SSE blocks into page list and adjust polling cadence based on SSE; SmoothCounter: simplified DOM/CSS (removed animated wrapper).

Sequence Diagram(s)

sequenceDiagram
    participant Client as Client
    participant Nginx as Nginx
    participant Backend as Backend
    participant DB as Database
    participant Hook as useBlockSSE
    participant UI as Display

    Client->>Nginx: GET /api/events (SSE)
    Nginx->>Backend: Proxy /api/events (HTTP/1.1, no buffering)
    Backend->>DB: Query max(block_number) (seed)
    DB-->>Backend: latest block
    Backend->>Client: Emit new_block (initial)

    loop Every 200ms
        Backend->>DB: Query blocks > last_block_number
        DB-->>Backend: Return new blocks (ASC)
        Backend->>Client: Emit new_block events
        Backend->>Client: Emit keep-alive comment (~15s)
    end

    Client->>Hook: Receive new_block event
    Hook->>Hook: Parse event, append to queue & blockLog
    Hook->>Hook: Compute bps & adjust drain interval
    loop Drain loop (adaptive interval)
        Hook->>Hook: Dequeue next block -> set latestBlock & height
        Hook->>UI: Provide SSE state (height, connected, bps, latestBlockEvent)
        UI->>UI: Render UI (height, live indicator)
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested reviewers

  • tac0turtle

Poem

🐰 Hopping near the data streams,
New blocks dance through midnight beams,
Queues and pings in gentle flow,
SSE sings — the heights will show,
I nibble logs and watch them grow.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Out of Scope Changes check ⚠️ Warning The docker-compose.yml change (external volume configuration) appears unrelated to SSE implementation and may be out of scope for the stated objective. Review whether the docker-compose.yml volume change belongs in this PR or should be separated into a distinct issue/PR.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'feat: add SSE for live block updates' clearly summarizes the main change: introducing Server-Sent Events for real-time block updates.
Linked Issues check ✅ Passed All code changes implement the requirement from issue #11 to add SSE for notifying the frontend when new blocks are produced, enabling faster UI updates.
Docstring Coverage ✅ Passed Docstring coverage is 85.71% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch pthmas/websocket-live-updates

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/crates/atlas-api/src/main.rs (1)

219-225: ⚠️ Potential issue | 🟠 Major

The SSE routes are not wrapped by the TimeoutLayer and violate the coding guidelines.

In axum 0.8, .layer() only affects routes that exist at the time it is called. Since TimeoutLayer is applied on lines 219–222 and .merge(sse_routes) occurs on line 225, the SSE routes do not receive the timeout middleware. Per coding guidelines, all routes in main.rs must be wrapped with TimeoutLayer::with_status_code(StatusCode::REQUEST_TIMEOUT, Duration::from_secs(10)).

Move the .layer(TimeoutLayer...) call after .merge(sse_routes), or apply TimeoutLayer to the sse_routes router before merging if SSE connections require a different timeout policy.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/crates/atlas-api/src/main.rs` around lines 219 - 225, The
TimeoutLayer is applied before merging sse_routes so SSE routes miss the
timeout; move the call to
TimeoutLayer::with_status_code(axum::http::StatusCode::REQUEST_TIMEOUT,
Duration::from_secs(10)) so it wraps the final router that includes sse_routes
(i.e., call .layer(TimeoutLayer::with_status_code(...)) after
.merge(sse_routes)), or alternatively apply the same TimeoutLayer to the
sse_routes Router itself before calling .merge so all routes in the combined
router receive the timeout middleware while preserving long-lived SSE behavior
if needed.
🧹 Nitpick comments (3)
docker-compose.yml (1)

69-70: Document first-run volume bootstrap for local/dev environments.

Marking pgdata as external is fine, but new environments will fail to start unless atlas_pgdata already exists. Consider documenting docker volume create atlas_pgdata in setup docs.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docker-compose.yml` around lines 69 - 70, The docker-compose file marks the
Postgres volume as external ("external: true" with name "atlas_pgdata"), which
will cause new local/dev environments to fail unless that Docker volume already
exists; add a short setup note instructing developers to run the Docker volume
creation command (e.g., docker volume create atlas_pgdata) before first
docker-compose up, and include this instruction in the project's README or local
setup docs alongside the postgres/pgdata volume mention so new contributors know
to bootstrap the volume.
backend/crates/atlas-api/src/handlers/sse.rs (1)

26-33: Per-client 200ms DB polling will not scale well with concurrent SSE clients.

Consider a shared broadcaster (single poller + fanout) so DB polling is O(1) per server instance instead of O(clients).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/crates/atlas-api/src/handlers/sse.rs` around lines 26 - 33, The
current per-client async_stream (symbols: stream, last_block_number, tick,
ping_counter) polls the DB every 200ms for each SSE connection; change this to a
shared broadcaster pattern: create a single background task (spawned once at
server start or lazy-init) that owns last_block_number and tick, polls the DB
and sends updates on a tokio::sync::broadcast::channel (or similar) and include
periodic ping events; in the SSE handler, replace the per-client loop with
subscribing to that broadcast channel (channel.subscribe()), receiving updates
(with a timeout or handling lagged subscribers) and forwarding them to the
client, falling back to the channel’s ping/events instead of each client running
its own interval loop.
frontend/src/components/Layout.tsx (1)

59-63: Inconsistent state update pattern in SSE branch.

Line 62 calls setDisplayedHeight directly, while all other branches (initialization, polling) wrap setState in requestAnimationFrame. For consistency with the rest of the effect, consider applying the same RAF-wrapping pattern here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frontend/src/components/Layout.tsx` around lines 59 - 63, The SSE-connected
branch updates displayedRef.current and calls setDisplayedHeight directly;
change it to match the initialization/polling branches by wrapping the
displayedRef.current assignment and setDisplayedHeight call inside
requestAnimationFrame (use the same raf id tracking/cancellation pattern your
effect uses), e.g., when sse.connected is true schedule the update via
requestAnimationFrame and store/clear the raf id to ensure consistent rendering
and proper cleanup; target the displayedRef, setDisplayedHeight, and
sse.connected code paths for this change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@backend/crates/atlas-api/src/handlers/sse.rs`:
- Around line 37-43: Replace the silent .ok().flatten() and unwrap_or_default()
patterns in the SSE loop with explicit error handling: use
sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(number) FROM
blocks").fetch_optional(&state.pool).await and similarly switch the other block
fetch (currently using query_as with a tuple) to an appropriate query_scalar or
query_as for the concrete row type, then match on the Result; on Err(log the
error via the existing logger/tracing) and perform a short backoff (e.g.,
tokio::time::sleep) then continue the loop instead of treating errors as missing
data, and on Ok(None/Some) handle the null/Some case as before—this ensures
database errors are logged and the loop recovers rather than silently stalling.

In `@frontend/src/hooks/useBlockSSE.ts`:
- Around line 126-131: The TS2448 self-reference error comes from calling
connect() inside its own arrow-function initializer; change the anonymous arrow
callback used to define connect into a named function expression (e.g., function
connectInner(...) { ... } or const connect = function connect(...) { ... }) so
internal recursive/timeout references use the function's internal name rather
than the outer const binding; update any references from
reconnectTimeoutRef.current = window.setTimeout(() => { ... connect(); ... },
2000) to call the internal name (or keep calling connect() since the named
function expression allows self-reference) and ensure the connect symbol remains
exported/used the same elsewhere.

In `@frontend/src/hooks/useLatestBlockHeight.ts`:
- Around line 74-82: The polling fallback in useLatestBlockHeight's fetchHeight
currently calls getStatus() which is too heavy; change fetchHeight to call the
lightweight height endpoint (e.g., getHeight or an equivalent function that
performs GET /api/height) and use the returned { block_height, indexed_at }
instead of status?.block_height, then pass the numeric block_height into
processHeight(latestHeight, false); ensure fetchingRef logic remains unchanged
and update any imports/usages of getStatus inside this module to the new
getHeight function.

In `@frontend/src/pages/BlocksPage.tsx`:
- Around line 37-40: The useEffect that unconditionally calls setSseBlocks([])
when fetchedBlocks changes drops streamed blocks prematurely; instead, in the
effect inside the BlocksPage component compare current sseBlocks with
fetchedBlocks (e.g., by unique id or timestamp) and only remove/merge those SSE
entries that already appear in fetchedBlocks, leaving remaining sseBlocks
intact; update the effect to iterate over sseBlocks and filter out items present
in fetchedBlocks, then call setSseBlocks(filteredSseBlocks) so streamed blocks
are preserved until they are present in fetched data.

---

Outside diff comments:
In `@backend/crates/atlas-api/src/main.rs`:
- Around line 219-225: The TimeoutLayer is applied before merging sse_routes so
SSE routes miss the timeout; move the call to
TimeoutLayer::with_status_code(axum::http::StatusCode::REQUEST_TIMEOUT,
Duration::from_secs(10)) so it wraps the final router that includes sse_routes
(i.e., call .layer(TimeoutLayer::with_status_code(...)) after
.merge(sse_routes)), or alternatively apply the same TimeoutLayer to the
sse_routes Router itself before calling .merge so all routes in the combined
router receive the timeout middleware while preserving long-lived SSE behavior
if needed.

---

Nitpick comments:
In `@backend/crates/atlas-api/src/handlers/sse.rs`:
- Around line 26-33: The current per-client async_stream (symbols: stream,
last_block_number, tick, ping_counter) polls the DB every 200ms for each SSE
connection; change this to a shared broadcaster pattern: create a single
background task (spawned once at server start or lazy-init) that owns
last_block_number and tick, polls the DB and sends updates on a
tokio::sync::broadcast::channel (or similar) and include periodic ping events;
in the SSE handler, replace the per-client loop with subscribing to that
broadcast channel (channel.subscribe()), receiving updates (with a timeout or
handling lagged subscribers) and forwarding them to the client, falling back to
the channel’s ping/events instead of each client running its own interval loop.

In `@docker-compose.yml`:
- Around line 69-70: The docker-compose file marks the Postgres volume as
external ("external: true" with name "atlas_pgdata"), which will cause new
local/dev environments to fail unless that Docker volume already exists; add a
short setup note instructing developers to run the Docker volume creation
command (e.g., docker volume create atlas_pgdata) before first docker-compose
up, and include this instruction in the project's README or local setup docs
alongside the postgres/pgdata volume mention so new contributors know to
bootstrap the volume.

In `@frontend/src/components/Layout.tsx`:
- Around line 59-63: The SSE-connected branch updates displayedRef.current and
calls setDisplayedHeight directly; change it to match the initialization/polling
branches by wrapping the displayedRef.current assignment and setDisplayedHeight
call inside requestAnimationFrame (use the same raf id tracking/cancellation
pattern your effect uses), e.g., when sse.connected is true schedule the update
via requestAnimationFrame and store/clear the raf id to ensure consistent
rendering and proper cleanup; target the displayedRef, setDisplayedHeight, and
sse.connected code paths for this change.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a488e55 and f2d30cd.

📒 Files selected for processing (13)
  • backend/Cargo.toml
  • backend/crates/atlas-api/Cargo.toml
  • backend/crates/atlas-api/src/handlers/mod.rs
  • backend/crates/atlas-api/src/handlers/sse.rs
  • backend/crates/atlas-api/src/main.rs
  • docker-compose.yml
  • frontend/nginx.conf
  • frontend/src/components/Layout.tsx
  • frontend/src/components/SmoothCounter.tsx
  • frontend/src/context/BlockStatsContext.tsx
  • frontend/src/hooks/useBlockSSE.ts
  • frontend/src/hooks/useLatestBlockHeight.ts
  • frontend/src/pages/BlocksPage.tsx

Comment on lines 37 to 43
let latest: Option<(i64,)> = sqlx::query_as(
"SELECT MAX(number) FROM blocks"
)
.fetch_optional(&state.pool)
.await
.ok()
.flatten();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Check the actual code at lines 37-43 and 72-79 in sse.rs
cat -n backend/crates/atlas-api/src/handlers/sse.rs | sed -n '30,85p'

Repository: evstack/atlas

Length of output: 2547


🌐 Web query:

SQLx Rust MAX() NULL handling - does query_as decode NULL to Option, or fail when column is NULL?

💡 Result:

If your MAX(...) can be NULL (e.g., no matching rows, or all values are NULL), then in SQLx:

  • Decoding into Option<T> succeeds and yields None.
  • Decoding into plain T fails at runtime with UnexpectedNullError (“unexpected null; try decoding as an Option”). [1]

query_as works via FromRow, and the derived FromRow implementation reads each field using Row::try_get; whether NULL is acceptable is determined by the Rust field type (Option<T> vs T). [2][3]

With the compile-time macros (query!, query_as!), output expressions (like MAX(col)) are generally treated as nullable (so the macro will typically infer an Option<...> unless you override with as "alias!"). [4]

Practical fixes

  • Use Option<i64> / Option<YourType> for MAX.
  • Or force non-NULL in SQL: SELECT COALESCE(MAX(col), 0) ... (or another default).

Sources:
[1] https://docs.rs/sqlx/latest/sqlx/error/struct.UnexpectedNullError.html
[2] https://docs.rs/sqlx/latest/sqlx/trait.FromRow.html
[3] https://docs.rs/sqlx/latest/sqlx/trait.Row.html
[4] https://docs.rs/sqlx/latest/sqlx/macro.query.html


Explicitly handle database errors in the SSE stream loop instead of silently swallowing them.

The .ok().flatten() pattern at lines 37–43 and .unwrap_or_default() at line 72–79 hide database failures, causing the stream to appear stale or stuck without any indication. Log errors and use explicit continue/backoff to recover gracefully.

The NULL handling for MAX(number) is correct in the current code (the Option<(i64,)> tuple is properly decoded to None when the result is NULL). However, use query_scalar::<_, Option<i64>> instead of query_as with a tuple for cleaner, more idiomatic Rust.

🛠️ Suggested fix
+use tracing::warn;
@@
-                let latest: Option<(i64,)> = sqlx::query_as(
+                let latest: Option<i64> = sqlx::query_scalar(
                     "SELECT MAX(number) FROM blocks"
                 )
-                .fetch_optional(&state.pool)
+                .fetch_one(&state.pool)
                 .await
-                .ok()
-                .flatten();
+                .inspect_err(|err| warn!(error = ?err, "sse latest-block query failed"))
+                .unwrap_or_default();

-                if let Some((max_num,)) = latest {
+                if let Some(max_num) = latest {
@@
-            let new_blocks: Vec<Block> = sqlx::query_as(
+            let new_blocks: Vec<Block> = match sqlx::query_as(
                 "SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at
                  FROM blocks WHERE number > $1 ORDER BY number ASC"
             )
             .bind(cursor)
             .fetch_all(&state.pool)
             .await
+            {
+                Ok(rows) => rows,
+                Err(err) => {
+                    warn!(error = ?err, cursor, "sse incremental-blocks query failed");
+                    continue;
+                }
+            };
-            .unwrap_or_default();

Also applies to: 54–56 (same .ok().flatten() pattern on the block fetch).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/crates/atlas-api/src/handlers/sse.rs` around lines 37 - 43, Replace
the silent .ok().flatten() and unwrap_or_default() patterns in the SSE loop with
explicit error handling: use sqlx::query_scalar::<_, Option<i64>>("SELECT
MAX(number) FROM blocks").fetch_optional(&state.pool).await and similarly switch
the other block fetch (currently using query_as with a tuple) to an appropriate
query_scalar or query_as for the concrete row type, then match on the Result; on
Err(log the error via the existing logger/tracing) and perform a short backoff
(e.g., tokio::time::sleep) then continue the loop instead of treating errors as
missing data, and on Ok(None/Some) handle the null/Some case as before—this
ensures database errors are logged and the loop recovers rather than silently
stalling.

Comment on lines +74 to +82
// HTTP polling fallback — only active when SSE is not connected
const fetchHeight = useCallback(async () => {
if (fetchingRef.current) return;
fetchingRef.current = true;
try {
const status = await getStatus();
const now = Date.now();
const latestHeight = status?.block_height;
if (typeof latestHeight === 'number') {
if (latestHeight !== heightRef.current) {
// Update height and time of update
heightRef.current = latestHeight;
setHeight(latestHeight);
setLastUpdatedAt(now);
}
// Update bps using wall-clock deltas between status samples
const prev = prevSampleRef.current;
const curr = { h: latestHeight, t: now };
if (prev && curr.t > prev.t && curr.h >= prev.h) {
const dh = curr.h - prev.h;
const dt = (curr.t - prev.t) / 1000;
const inst = dt > 0 ? dh / dt : 0;
const alpha = alphaRef.current;
setBps((prevBps) => (prevBps == null ? inst : prevBps + alpha * (inst - prevBps)));
}
prevSampleRef.current = curr;
setError(null);
processHeight(latestHeight, false);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Polling fallback should use the fast /api/height endpoint, not /api/status.

This path still calls getStatus(), which misses the required lightweight height endpoint contract for navbar polling.

🔧 Suggested change
-import { getStatus } from '../api/status';
+import { getHeight } from '../api/height';
...
-      const status = await getStatus();
+      const status = await getHeight();
       const latestHeight = status?.block_height;

As per coding guidelines, Frontend must implement fast polling endpoint GET /api/height returning { block_height, indexed_at } used by navbar every 2s.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// HTTP polling fallback — only active when SSE is not connected
const fetchHeight = useCallback(async () => {
if (fetchingRef.current) return;
fetchingRef.current = true;
try {
const status = await getStatus();
const now = Date.now();
const latestHeight = status?.block_height;
if (typeof latestHeight === 'number') {
if (latestHeight !== heightRef.current) {
// Update height and time of update
heightRef.current = latestHeight;
setHeight(latestHeight);
setLastUpdatedAt(now);
}
// Update bps using wall-clock deltas between status samples
const prev = prevSampleRef.current;
const curr = { h: latestHeight, t: now };
if (prev && curr.t > prev.t && curr.h >= prev.h) {
const dh = curr.h - prev.h;
const dt = (curr.t - prev.t) / 1000;
const inst = dt > 0 ? dh / dt : 0;
const alpha = alphaRef.current;
setBps((prevBps) => (prevBps == null ? inst : prevBps + alpha * (inst - prevBps)));
}
prevSampleRef.current = curr;
setError(null);
processHeight(latestHeight, false);
// HTTP polling fallback — only active when SSE is not connected
const fetchHeight = useCallback(async () => {
if (fetchingRef.current) return;
fetchingRef.current = true;
try {
const status = await getHeight();
const latestHeight = status?.block_height;
if (typeof latestHeight === 'number') {
processHeight(latestHeight, false);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frontend/src/hooks/useLatestBlockHeight.ts` around lines 74 - 82, The polling
fallback in useLatestBlockHeight's fetchHeight currently calls getStatus() which
is too heavy; change fetchHeight to call the lightweight height endpoint (e.g.,
getHeight or an equivalent function that performs GET /api/height) and use the
returned { block_height, indexed_at } instead of status?.block_height, then pass
the numeric block_height into processHeight(latestHeight, false); ensure
fetchingRef logic remains unchanged and update any imports/usages of getStatus
inside this module to the new getHeight function.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
backend/crates/atlas-api/src/handlers/sse.rs (1)

93-105: Redundant keep-alive mechanisms.

The manual ping_counter logic (lines 93-97) sends a keep-alive comment every ~15 seconds, while axum's KeepAlive configuration (lines 101-105) independently sends a keep-alive text at the same interval. Consider removing one to reduce redundancy.

♻️ Proposed simplification — remove manual keep-alive
-            // Send keep-alive ping every ~15s (75 ticks * 200ms)
-            if ping_counter >= 75 {
-                ping_counter = 0;
-                yield Ok(Event::default().comment("keep-alive"));
-            }
         }
     };

     Sse::new(stream).keep_alive(
         axum::response::sse::KeepAlive::new()
             .interval(Duration::from_secs(15))
             .text("keep-alive"),
     )

You can also remove ping_counter and its updates at lines 29, 33, 62, and 80 if adopting this change.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/crates/atlas-api/src/handlers/sse.rs` around lines 93 - 105, Remove
the redundant manual keep-alive: delete the ping_counter-based send (the block
that yields Ok(Event::default().comment("keep-alive")) guarded by ping_counter
>= 75) and any updates to ping_counter elsewhere (updates at the locations that
increment/reset ping_counter), and rely solely on Sse::new(stream).keep_alive
with axum::response::sse::KeepAlive::new().interval(...).text("keep-alive") to
emit pings; this eliminates duplicate keep-alive messages while keeping the
existing Sse::keep_alive configuration.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@backend/crates/atlas-api/src/handlers/sse.rs`:
- Around line 43-64: The code sets last_block_number = Some(max_num) before
ensuring the initial block was fetched and emitted, which can permanently skip
the block if the fetch fails; modify the logic in the block handling using
latest/max_num so that last_block_number is only updated after a successful
fetch-and-emit: perform the sqlx::query_as(...) fetch_optional, check and
serialize the fetched Block into NewBlockEvent (using serde_json::to_string) and
yield the Event first, and only then assign last_block_number = Some(max_num)
(and reset ping_counter) so the cursor does not advance on failed fetches;
update the code paths around latest, block, NewBlockEvent and the yield
Ok(Event::default().event("new_block").data(...)) accordingly.

---

Nitpick comments:
In `@backend/crates/atlas-api/src/handlers/sse.rs`:
- Around line 93-105: Remove the redundant manual keep-alive: delete the
ping_counter-based send (the block that yields
Ok(Event::default().comment("keep-alive")) guarded by ping_counter >= 75) and
any updates to ping_counter elsewhere (updates at the locations that
increment/reset ping_counter), and rely solely on Sse::new(stream).keep_alive
with axum::response::sse::KeepAlive::new().interval(...).text("keep-alive") to
emit pings; this eliminates duplicate keep-alive messages while keeping the
existing Sse::keep_alive configuration.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f2d30cd and cc1a48e.

📒 Files selected for processing (1)
  • backend/crates/atlas-api/src/handlers/sse.rs

Comment on lines 43 to 64
if let Some(max_num) = latest {
last_block_number = Some(max_num);
// Emit the current latest block as the initial event
let block: Option<Block> = sqlx::query_as(
"SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at
FROM blocks WHERE number = $1"
)
.bind(max_num)
.fetch_optional(&state.pool)
.await
.ok()
.flatten();

if let Some(block) = block {
let event = NewBlockEvent { block };
if let Ok(json) = serde_json::to_string(&event) {
yield Ok(Event::default().event("new_block").data(json));
}
}
ping_counter = 0;
}
continue;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Initial block may be permanently skipped if fetch fails.

last_block_number is set to max_num at line 44 before the block is successfully fetched and emitted. If the fetch at lines 46-54 fails silently, the cursor advances past that block and subsequent polls query number > max_num, causing the initial block to never be emitted.

🔧 Proposed fix

Set last_block_number only after successfully emitting the event:

             if let Some(max_num) = latest {
-                last_block_number = Some(max_num);
                 // Emit the current latest block as the initial event
                 let block: Option<Block> = sqlx::query_as(
                     "SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at
                      FROM blocks WHERE number = $1"
                 )
                 .bind(max_num)
                 .fetch_optional(&state.pool)
                 .await
                 .ok()
                 .flatten();

                 if let Some(block) = block {
+                    last_block_number = Some(block.number);
                     let event = NewBlockEvent { block };
                     if let Ok(json) = serde_json::to_string(&event) {
                         yield Ok(Event::default().event("new_block").data(json));
                     }
+                    ping_counter = 0;
                 }
-                ping_counter = 0;
             }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/crates/atlas-api/src/handlers/sse.rs` around lines 43 - 64, The code
sets last_block_number = Some(max_num) before ensuring the initial block was
fetched and emitted, which can permanently skip the block if the fetch fails;
modify the logic in the block handling using latest/max_num so that
last_block_number is only updated after a successful fetch-and-emit: perform the
sqlx::query_as(...) fetch_optional, check and serialize the fetched Block into
NewBlockEvent (using serde_json::to_string) and yield the Event first, and only
then assign last_block_number = Some(max_num) (and reset ping_counter) so the
cursor does not advance on failed fetches; update the code paths around latest,
block, NewBlockEvent and the yield
Ok(Event::default().event("new_block").data(...)) accordingly.

- sse.rs: replace silent error swallowing with tracing::warn! and continue
- useBlockSSE: use named function expression to fix TS2448 self-reference
- BlocksPage: filter SSE buffer instead of unconditional clear on fetch
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (2)
backend/crates/atlas-api/src/handlers/sse.rs (2)

46-68: ⚠️ Potential issue | 🟠 Major

Advance the SSE cursor only after a successful initial event emit.

last_block_number is set before the initial block is fetched/emitted. If that fetch/emit path fails, the stream cursor advances and that block can be skipped for this client.

Suggested fix
                 if let Some(max_num) = latest {
-                    last_block_number = Some(max_num);
                     // Emit the current latest block as the initial event
                     let block: Option<Block> = match sqlx::query_as(
                         "SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at
                          FROM blocks WHERE number = $1"
                     )
@@
                     if let Some(block) = block {
                         let event = NewBlockEvent { block };
                         if let Ok(json) = serde_json::to_string(&event) {
                             yield Ok(Event::default().event("new_block").data(json));
+                            last_block_number = Some(max_num);
+                            ping_counter = 0;
                         }
                     }
-                    ping_counter = 0;
                 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/crates/atlas-api/src/handlers/sse.rs` around lines 46 - 68, The SSE
cursor advancement currently sets last_block_number = Some(max_num) before
fetching/emitting the initial block (in the branch handling latest), which can
skip a block if fetch or emit fails; move the assignment of last_block_number so
it only happens after a successful fetch and successful serde_json::to_string +
yield of the NewBlockEvent (i.e., only after the block is Some, NewBlockEvent is
constructed, serialization succeeds, and
Event::default().event("new_block").data(json) is yielded); update the code
around latest, sqlx::query_as(...).bind(max_num).fetch_optional(...), the block
handling, serde_json::to_string(&event), and the yield to ensure
last_block_number is set after emission succeeds and remains unchanged on
errors.

43-44: ⚠️ Potential issue | 🟠 Major

Add a short backoff on repeated DB errors.

Current error branches immediately continue, so outages can produce a tight warn/retry loop every 200ms per SSE client.

Suggested fix
-use tokio::time::interval;
+use tokio::time::{interval, sleep};
@@
-                    Err(e) => { warn!(error = ?e, "sse: failed to query latest block number"); continue; }
+                    Err(e) => {
+                        warn!(error = ?e, "sse: failed to query latest block number");
+                        sleep(Duration::from_millis(500)).await;
+                        continue;
+                    }
@@
-                        Err(e) => { warn!(error = ?e, "sse: failed to fetch initial block"); continue; }
+                        Err(e) => {
+                            warn!(error = ?e, "sse: failed to fetch initial block");
+                            sleep(Duration::from_millis(500)).await;
+                            continue;
+                        }
@@
-                Err(e) => { warn!(error = ?e, cursor, "sse: failed to fetch new blocks"); continue; }
+                Err(e) => {
+                    warn!(error = ?e, cursor, "sse: failed to fetch new blocks");
+                    sleep(Duration::from_millis(500)).await;
+                    continue;
+                }
             };

Also applies to: 58-59, 84-85

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/crates/atlas-api/src/handlers/sse.rs` around lines 43 - 44, The error
branches currently `continue` immediately (e.g. the branches that warn!("sse:
failed to query latest block number") and the other similar warn! messages),
causing a tight retry loop on DB outages; add a short async backoff (e.g.,
tokio::time::sleep for a few hundred milliseconds) before each `continue` so the
SSE handler loop yields and avoids tight retries. Update the three error
branches referenced (the Err(e) warn! handling around the latest block query and
the two other warn! sites at the noted locations) to await a small delay and
then continue; keep the warn! logs as-is and only add the sleep+await just prior
to the continue. Ensure you import tokio::time::Duration and tokio::time::sleep
in the sse.rs scope if not already present.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@frontend/src/pages/BlocksPage.tsx`:
- Around line 46-51: The merge logic in the useMemo for blocks (using sseBlocks
and fetchedBlocks) runs on every page, causing newer page views to show items
belonging to page 1; change the logic to only prepend/merge sseBlocks when the
current page is 1: add the page (or currentPage) value to the dependency array
and, inside the useMemo, if page !== 1 return fetchedBlocks directly; otherwise
perform the existing Set/filter merge of sseBlocks and fetchedBlocks and
slice(0,20). Ensure you reference and update the blocks useMemo dependencies to
include the page/currentPage variable so updates happen correctly.

---

Duplicate comments:
In `@backend/crates/atlas-api/src/handlers/sse.rs`:
- Around line 46-68: The SSE cursor advancement currently sets last_block_number
= Some(max_num) before fetching/emitting the initial block (in the branch
handling latest), which can skip a block if fetch or emit fails; move the
assignment of last_block_number so it only happens after a successful fetch and
successful serde_json::to_string + yield of the NewBlockEvent (i.e., only after
the block is Some, NewBlockEvent is constructed, serialization succeeds, and
Event::default().event("new_block").data(json) is yielded); update the code
around latest, sqlx::query_as(...).bind(max_num).fetch_optional(...), the block
handling, serde_json::to_string(&event), and the yield to ensure
last_block_number is set after emission succeeds and remains unchanged on
errors.
- Around line 43-44: The error branches currently `continue` immediately (e.g.
the branches that warn!("sse: failed to query latest block number") and the
other similar warn! messages), causing a tight retry loop on DB outages; add a
short async backoff (e.g., tokio::time::sleep for a few hundred milliseconds)
before each `continue` so the SSE handler loop yields and avoids tight retries.
Update the three error branches referenced (the Err(e) warn! handling around the
latest block query and the two other warn! sites at the noted locations) to
await a small delay and then continue; keep the warn! logs as-is and only add
the sleep+await just prior to the continue. Ensure you import
tokio::time::Duration and tokio::time::sleep in the sse.rs scope if not already
present.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cc1a48e and bc193c3.

📒 Files selected for processing (3)
  • backend/crates/atlas-api/src/handlers/sse.rs
  • frontend/src/hooks/useBlockSSE.ts
  • frontend/src/pages/BlocksPage.tsx
🚧 Files skipped from review as they are similar to previous changes (1)
  • frontend/src/hooks/useBlockSSE.ts

Comment on lines +46 to +51
const blocks = useMemo(() => {
if (!sseBlocks.length) return fetchedBlocks;
const seen = new Set(fetchedBlocks.map((b) => b.number));
const unique = sseBlocks.filter((b) => !seen.has(b.number));
return [...unique, ...fetchedBlocks].slice(0, 20);
}, [fetchedBlocks, sseBlocks]);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Limit SSE prepend/merge to page 1 to keep pagination correct.

Right now sseBlocks are merged on every page, so page 2+ can display newest blocks that belong to page 1.

Suggested fix
   const blocks = useMemo(() => {
-    if (!sseBlocks.length) return fetchedBlocks;
+    if (page !== 1 || !autoRefresh || !sseBlocks.length) return fetchedBlocks;
     const seen = new Set(fetchedBlocks.map((b) => b.number));
     const unique = sseBlocks.filter((b) => !seen.has(b.number));
     return [...unique, ...fetchedBlocks].slice(0, 20);
-  }, [fetchedBlocks, sseBlocks]);
+  }, [fetchedBlocks, sseBlocks, page, autoRefresh]);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const blocks = useMemo(() => {
if (!sseBlocks.length) return fetchedBlocks;
const seen = new Set(fetchedBlocks.map((b) => b.number));
const unique = sseBlocks.filter((b) => !seen.has(b.number));
return [...unique, ...fetchedBlocks].slice(0, 20);
}, [fetchedBlocks, sseBlocks]);
const blocks = useMemo(() => {
if (page !== 1 || !autoRefresh || !sseBlocks.length) return fetchedBlocks;
const seen = new Set(fetchedBlocks.map((b) => b.number));
const unique = sseBlocks.filter((b) => !seen.has(b.number));
return [...unique, ...fetchedBlocks].slice(0, 20);
}, [fetchedBlocks, sseBlocks, page, autoRefresh]);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frontend/src/pages/BlocksPage.tsx` around lines 46 - 51, The merge logic in
the useMemo for blocks (using sseBlocks and fetchedBlocks) runs on every page,
causing newer page views to show items belonging to page 1; change the logic to
only prepend/merge sseBlocks when the current page is 1: add the page (or
currentPage) value to the dependency array and, inside the useMemo, if page !==
1 return fetchedBlocks directly; otherwise perform the existing Set/filter merge
of sseBlocks and fetchedBlocks and slice(0,20). Ensure you reference and update
the blocks useMemo dependencies to include the page/currentPage variable so
updates happen correctly.

…-in-effect

Wrap setSseBlocks calls in BlocksPage and setDisplayedHeight in Layout
in requestAnimationFrame to avoid calling setState synchronously inside
useEffect bodies, which is flagged by eslint-plugin-react-hooks v7.
Stop polling every 10s as a safety net when SSE is active — periodic
refetches disrupt the smooth live flow. Instead, refetch immediately
when SSE disconnects to catch any missed blocks during the gap.
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

♻️ Duplicate comments (1)
frontend/src/pages/BlocksPage.tsx (1)

55-60: ⚠️ Potential issue | 🟠 Major

Gate SSE merge to page 1 (and live mode) to avoid cross-page contamination.

blocks still prepends sseBlocks on every page. After navigating away from page 1, buffered newest blocks can leak into page 2+ and break pagination correctness.

Suggested fix
   const blocks = useMemo(() => {
-    if (!sseBlocks.length) return fetchedBlocks;
+    if (page !== 1 || !autoRefresh || !sseBlocks.length) return fetchedBlocks;
     const seen = new Set(fetchedBlocks.map((b) => b.number));
     const unique = sseBlocks.filter((b) => !seen.has(b.number));
     return [...unique, ...fetchedBlocks].slice(0, 20);
-  }, [fetchedBlocks, sseBlocks]);
+  }, [fetchedBlocks, sseBlocks, page, autoRefresh]);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frontend/src/pages/BlocksPage.tsx` around lines 55 - 60, The merge logic in
the useMemo for blocks currently prepends sseBlocks on every page and must be
restricted to live mode on page 1; modify the useMemo so it first checks the
page and live-mode flags (e.g., currentPage === 1 and isLive / liveMode) and if
not both true simply return fetchedBlocks, otherwise run the existing
dedup/merge (seen Set, unique, return [...unique,
...fetchedBlocks].slice(0,20)). Update the dependency array to include the page
and live-mode identifiers so the memo recalculates when those change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@frontend/src/pages/BlocksPage.tsx`:
- Around line 55-60: The merge logic in the useMemo for blocks currently
prepends sseBlocks on every page and must be restricted to live mode on page 1;
modify the useMemo so it first checks the page and live-mode flags (e.g.,
currentPage === 1 and isLive / liveMode) and if not both true simply return
fetchedBlocks, otherwise run the existing dedup/merge (seen Set, unique, return
[...unique, ...fetchedBlocks].slice(0,20)). Update the dependency array to
include the page and live-mode identifiers so the memo recalculates when those
change.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2a0cea0 and 5a61a1f.

📒 Files selected for processing (1)
  • frontend/src/pages/BlocksPage.tsx

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add SSE for block production

1 participant