Add Server-Sent Events endpoint (GET /api/events) that streams new blocks to the frontend in real time, replacing polling for block height and blocks-list updates.

Backend:
- New SSE handler polls the DB every 200ms, emits one event per block in order
- SSE route excluded from the 10s TimeoutLayer so connections stay alive
- Nginx configured to proxy SSE with buffering disabled

Frontend:
- useBlockSSE hook buffers events in a ref-based queue and drains it at the chain's natural block rate (computed from on-chain timestamps)
- Navbar counter updates instantly via SSE; block time derived from a 30s rolling window of block timestamps for stability
- BlocksPage prepends new blocks from SSE on page 1 with auto-refresh
- Polling falls back automatically when SSE disconnects

Closes #11
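The adaptive drain described above can be sketched as a pure function (hypothetical helper names; this is an illustration of the approach, not the PR's actual code): estimate the chain's natural block rate from the spacing of recent on-chain timestamps and use that as the queue's drain interval.

```typescript
// Sketch only: derive the drain interval for the SSE queue from recent
// block timestamps. `timestampsMs` and `fallbackMs` are illustrative
// names, not the hook's real API.
export function drainIntervalMs(timestampsMs: number[], fallbackMs = 1000): number {
  if (timestampsMs.length < 2) return fallbackMs; // not enough samples yet
  const sorted = [...timestampsMs].sort((a, b) => a - b);
  const spanMs = sorted[sorted.length - 1] - sorted[0];
  if (spanMs <= 0) return fallbackMs; // identical timestamps: no rate signal
  // mean gap between consecutive blocks over the window
  return spanMs / (sorted.length - 1);
}
```

Draining at this rate lets bursts of SSE events render one block at a time instead of jumping the counter.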
Note: Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the CodeRabbit settings.
📝 Walkthrough

Adds SSE-based real-time block streaming: the backend exposes a new GET /api/events endpoint, and the frontend consumes it through a useBlockSSE hook with automatic polling fallback.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as Client
    participant Nginx as Nginx
    participant Backend as Backend
    participant DB as Database
    participant Hook as useBlockSSE
    participant UI as Display

    Client->>Nginx: GET /api/events (SSE)
    Nginx->>Backend: Proxy /api/events (HTTP/1.1, no buffering)
    Backend->>DB: Query max(block_number) (seed)
    DB-->>Backend: latest block
    Backend->>Client: Emit new_block (initial)
    loop Every 200ms
        Backend->>DB: Query blocks > last_block_number
        DB-->>Backend: Return new blocks (ASC)
        Backend->>Client: Emit new_block events
        Backend->>Client: Emit keep-alive comment (~15s)
    end
    Client->>Hook: Receive new_block event
    Hook->>Hook: Parse event, append to queue & blockLog
    Hook->>Hook: Compute bps & adjust drain interval
    loop Drain loop (adaptive interval)
        Hook->>Hook: Dequeue next block -> set latestBlock & height
        Hook->>UI: Provide SSE state (height, connected, bps, latestBlockEvent)
        UI->>UI: Render UI (height, live indicator)
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/crates/atlas-api/src/main.rs (1)
219-225: ⚠️ Potential issue | 🟠 Major

The SSE routes are not wrapped by the TimeoutLayer and violate the coding guidelines.
In axum 0.8, `.layer()` only affects routes that exist at the time it is called. Since `TimeoutLayer` is applied on lines 219–222 and `.merge(sse_routes)` occurs on line 225, the SSE routes do not receive the timeout middleware. Per coding guidelines, all routes in `main.rs` must be wrapped with `TimeoutLayer::with_status_code(StatusCode::REQUEST_TIMEOUT, Duration::from_secs(10))`.

Move the `.layer(TimeoutLayer...)` call after `.merge(sse_routes)`, or apply `TimeoutLayer` to the `sse_routes` router before merging if SSE connections require a different timeout policy.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-api/src/main.rs` around lines 219 - 225, The TimeoutLayer is applied before merging sse_routes so SSE routes miss the timeout; move the call to TimeoutLayer::with_status_code(axum::http::StatusCode::REQUEST_TIMEOUT, Duration::from_secs(10)) so it wraps the final router that includes sse_routes (i.e., call .layer(TimeoutLayer::with_status_code(...)) after .merge(sse_routes)), or alternatively apply the same TimeoutLayer to the sse_routes Router itself before calling .merge so all routes in the combined router receive the timeout middleware while preserving long-lived SSE behavior if needed.
🧹 Nitpick comments (3)
docker-compose.yml (1)
69-70: Document first-run volume bootstrap for local/dev environments.

Marking `pgdata` as external is fine, but new environments will fail to start unless `atlas_pgdata` already exists. Consider documenting `docker volume create atlas_pgdata` in setup docs.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docker-compose.yml` around lines 69 - 70, The docker-compose file marks the Postgres volume as external ("external: true" with name "atlas_pgdata"), which will cause new local/dev environments to fail unless that Docker volume already exists; add a short setup note instructing developers to run the Docker volume creation command (e.g., docker volume create atlas_pgdata) before first docker-compose up, and include this instruction in the project's README or local setup docs alongside the postgres/pgdata volume mention so new contributors know to bootstrap the volume.

backend/crates/atlas-api/src/handlers/sse.rs (1)
26-33: Per-client 200ms DB polling will not scale well with concurrent SSE clients.

Consider a shared broadcaster (single poller + fanout) so DB polling is O(1) per server instance instead of O(clients).
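The fanout shape this suggests can be sketched as follows (TypeScript for brevity and with hypothetical names; the Rust backend would use something like tokio's broadcast channel as the prompt below describes). One producer polls the DB; each SSE client only holds a subscription:

```typescript
// Sketch: a single poller calls publish() for each new block; SSE clients
// subscribe on connect and unsubscribe on disconnect, so DB polling cost
// stays O(1) regardless of client count.
type Listener<T> = (event: T) => void;

export class BlockBroadcaster<T> {
  private listeners = new Set<Listener<T>>();

  // Returns an unsubscribe function to call when the client disconnects.
  subscribe(fn: Listener<T>): () => void {
    this.listeners.add(fn);
    return () => { this.listeners.delete(fn); };
  }

  // Called by the single poller; fanout cost is per-subscriber only.
  publish(event: T): void {
    for (const fn of this.listeners) fn(event);
  }
}
```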
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-api/src/handlers/sse.rs` around lines 26 - 33, The current per-client async_stream (symbols: stream, last_block_number, tick, ping_counter) polls the DB every 200ms for each SSE connection; change this to a shared broadcaster pattern: create a single background task (spawned once at server start or lazy-init) that owns last_block_number and tick, polls the DB and sends updates on a tokio::sync::broadcast::channel (or similar) and include periodic ping events; in the SSE handler, replace the per-client loop with subscribing to that broadcast channel (channel.subscribe()), receiving updates (with a timeout or handling lagged subscribers) and forwarding them to the client, falling back to the channel’s ping/events instead of each client running its own interval loop.

frontend/src/components/Layout.tsx (1)
59-63: Inconsistent state update pattern in SSE branch.

Line 62 calls `setDisplayedHeight` directly, while all other branches (initialization, polling) wrap setState in `requestAnimationFrame`. For consistency with the rest of the effect, consider applying the same RAF-wrapping pattern here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/src/components/Layout.tsx` around lines 59 - 63, The SSE-connected branch updates displayedRef.current and calls setDisplayedHeight directly; change it to match the initialization/polling branches by wrapping the displayedRef.current assignment and setDisplayedHeight call inside requestAnimationFrame (use the same raf id tracking/cancellation pattern your effect uses), e.g., when sse.connected is true schedule the update via requestAnimationFrame and store/clear the raf id to ensure consistent rendering and proper cleanup; target the displayedRef, setDisplayedHeight, and sse.connected code paths for this change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/crates/atlas-api/src/handlers/sse.rs`:
- Around line 37-43: Replace the silent .ok().flatten() and unwrap_or_default()
patterns in the SSE loop with explicit error handling: use
sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(number) FROM
blocks").fetch_optional(&state.pool).await and similarly switch the other block
fetch (currently using query_as with a tuple) to an appropriate query_scalar or
query_as for the concrete row type, then match on the Result; on Err(log the
error via the existing logger/tracing) and perform a short backoff (e.g.,
tokio::time::sleep) then continue the loop instead of treating errors as missing
data, and on Ok(None/Some) handle the null/Some case as before—this ensures
database errors are logged and the loop recovers rather than silently stalling.
In `@frontend/src/hooks/useBlockSSE.ts`:
- Around line 126-131: The TS2448 self-reference error comes from calling
connect() inside its own arrow-function initializer; change the anonymous arrow
callback used to define connect into a named function expression (e.g., function
connectInner(...) { ... } or const connect = function connect(...) { ... }) so
internal recursive/timeout references use the function's internal name rather
than the outer const binding; update any references from
reconnectTimeoutRef.current = window.setTimeout(() => { ... connect(); ... },
2000) to call the internal name (or keep calling connect() since the named
function expression allows self-reference) and ensure the connect symbol remains
exported/used the same elsewhere.
In `@frontend/src/hooks/useLatestBlockHeight.ts`:
- Around line 74-82: The polling fallback in useLatestBlockHeight's fetchHeight
currently calls getStatus() which is too heavy; change fetchHeight to call the
lightweight height endpoint (e.g., getHeight or an equivalent function that
performs GET /api/height) and use the returned { block_height, indexed_at }
instead of status?.block_height, then pass the numeric block_height into
processHeight(latestHeight, false); ensure fetchingRef logic remains unchanged
and update any imports/usages of getStatus inside this module to the new
getHeight function.
In `@frontend/src/pages/BlocksPage.tsx`:
- Around line 37-40: The useEffect that unconditionally calls setSseBlocks([])
when fetchedBlocks changes drops streamed blocks prematurely; instead, in the
effect inside the BlocksPage component compare current sseBlocks with
fetchedBlocks (e.g., by unique id or timestamp) and only remove/merge those SSE
entries that already appear in fetchedBlocks, leaving remaining sseBlocks
intact; update the effect to iterate over sseBlocks and filter out items present
in fetchedBlocks, then call setSseBlocks(filteredSseBlocks) so streamed blocks
are preserved until they are present in fetched data.
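The useBlockSSE item above hinges on a TypeScript detail worth spelling out: a named function expression can reference itself from inside its own body via its internal name, while a `const` arrow function cannot be referenced before its binding is initialized (TS2448). A minimal sketch with a hypothetical retry shape:

```typescript
// Sketch: `connectImpl` is the function expression's internal name, always
// in scope inside the body, so scheduling a reconnect from within is safe
// even though the outer `connect` binding is still being initialized.
export const connect = function connectImpl(attempt = 0): string {
  if (attempt >= 3) return "gave up";
  // in the real hook this would be something like:
  // reconnectTimeoutRef.current = window.setTimeout(() => connectImpl(attempt + 1), 2000);
  return connectImpl(attempt + 1);
};
```

External callers keep using `connect()` unchanged; only the self-reference goes through the internal name.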
---
Outside diff comments:
In `@backend/crates/atlas-api/src/main.rs`:
- Around line 219-225: The TimeoutLayer is applied before merging sse_routes so
SSE routes miss the timeout; move the call to
TimeoutLayer::with_status_code(axum::http::StatusCode::REQUEST_TIMEOUT,
Duration::from_secs(10)) so it wraps the final router that includes sse_routes
(i.e., call .layer(TimeoutLayer::with_status_code(...)) after
.merge(sse_routes)), or alternatively apply the same TimeoutLayer to the
sse_routes Router itself before calling .merge so all routes in the combined
router receive the timeout middleware while preserving long-lived SSE behavior
if needed.
---
Nitpick comments:
In `@backend/crates/atlas-api/src/handlers/sse.rs`:
- Around line 26-33: The current per-client async_stream (symbols: stream,
last_block_number, tick, ping_counter) polls the DB every 200ms for each SSE
connection; change this to a shared broadcaster pattern: create a single
background task (spawned once at server start or lazy-init) that owns
last_block_number and tick, polls the DB and sends updates on a
tokio::sync::broadcast::channel (or similar) and include periodic ping events;
in the SSE handler, replace the per-client loop with subscribing to that
broadcast channel (channel.subscribe()), receiving updates (with a timeout or
handling lagged subscribers) and forwarding them to the client, falling back to
the channel’s ping/events instead of each client running its own interval loop.
In `@docker-compose.yml`:
- Around line 69-70: The docker-compose file marks the Postgres volume as
external ("external: true" with name "atlas_pgdata"), which will cause new
local/dev environments to fail unless that Docker volume already exists; add a
short setup note instructing developers to run the Docker volume creation
command (e.g., docker volume create atlas_pgdata) before first docker-compose
up, and include this instruction in the project's README or local setup docs
alongside the postgres/pgdata volume mention so new contributors know to
bootstrap the volume.
In `@frontend/src/components/Layout.tsx`:
- Around line 59-63: The SSE-connected branch updates displayedRef.current and
calls setDisplayedHeight directly; change it to match the initialization/polling
branches by wrapping the displayedRef.current assignment and setDisplayedHeight
call inside requestAnimationFrame (use the same raf id tracking/cancellation
pattern your effect uses), e.g., when sse.connected is true schedule the update
via requestAnimationFrame and store/clear the raf id to ensure consistent
rendering and proper cleanup; target the displayedRef, setDisplayedHeight, and
sse.connected code paths for this change.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
- backend/Cargo.toml
- backend/crates/atlas-api/Cargo.toml
- backend/crates/atlas-api/src/handlers/mod.rs
- backend/crates/atlas-api/src/handlers/sse.rs
- backend/crates/atlas-api/src/main.rs
- docker-compose.yml
- frontend/nginx.conf
- frontend/src/components/Layout.tsx
- frontend/src/components/SmoothCounter.tsx
- frontend/src/context/BlockStatsContext.tsx
- frontend/src/hooks/useBlockSSE.ts
- frontend/src/hooks/useLatestBlockHeight.ts
- frontend/src/pages/BlocksPage.tsx
```rust
let latest: Option<(i64,)> = sqlx::query_as(
    "SELECT MAX(number) FROM blocks"
)
.fetch_optional(&state.pool)
.await
.ok()
.flatten();
```
🧩 Analysis chain
🏁 Script executed:
```shell
# Check the actual code at lines 37-43 and 72-79 in sse.rs
cat -n backend/crates/atlas-api/src/handlers/sse.rs | sed -n '30,85p'
```

Repository: evstack/atlas
Length of output: 2547
🌐 Web query:
SQLx Rust MAX() NULL handling - does query_as decode NULL to Option, or fail when column is NULL?
💡 Result:
If your MAX(...) can be NULL (e.g., no matching rows, or all values are NULL), then in SQLx:

- Decoding into `Option<T>` succeeds and yields `None`.
- Decoding into plain `T` fails at runtime with `UnexpectedNullError` ("unexpected null; try decoding as an `Option`"). [1]

`query_as` works via `FromRow`, and the derived `FromRow` implementation reads each field using `Row::try_get`; whether NULL is acceptable is determined by the Rust field type (`Option<T>` vs `T`). [2][3]

With the compile-time macros (`query!`, `query_as!`), output expressions (like `MAX(col)`) are generally treated as nullable (so the macro will typically infer an `Option<...>` unless you override with `as "alias!"`). [4]
Practical fixes:

- Use `Option<i64>` / `Option<YourType>` for `MAX`.
- Or force non-NULL in SQL: `SELECT COALESCE(MAX(col), 0) ...` (or another default).
Sources:
[1] https://docs.rs/sqlx/latest/sqlx/error/struct.UnexpectedNullError.html
[2] https://docs.rs/sqlx/latest/sqlx/trait.FromRow.html
[3] https://docs.rs/sqlx/latest/sqlx/trait.Row.html
[4] https://docs.rs/sqlx/latest/sqlx/macro.query.html
Explicitly handle database errors in the SSE stream loop instead of silently swallowing them.
The `.ok().flatten()` pattern at lines 37–43 and `.unwrap_or_default()` at lines 72–79 hide database failures, causing the stream to appear stale or stuck without any indication. Log errors and use explicit continue/backoff to recover gracefully.

The NULL handling for MAX(number) is correct in the current code (the `Option<(i64,)>` tuple is properly decoded to `None` when the result is NULL). However, use `query_scalar::<_, Option<i64>>` instead of `query_as` with a tuple for cleaner, more idiomatic Rust.
🛠️ Suggested fix

```diff
+use tracing::warn;
@@
-            let latest: Option<(i64,)> = sqlx::query_as(
+            let latest: Option<i64> = sqlx::query_scalar(
                 "SELECT MAX(number) FROM blocks"
             )
-            .fetch_optional(&state.pool)
+            .fetch_one(&state.pool)
             .await
-            .ok()
-            .flatten();
+            .inspect_err(|err| warn!(error = ?err, "sse latest-block query failed"))
+            .unwrap_or_default();
-            if let Some((max_num,)) = latest {
+            if let Some(max_num) = latest {
@@
-                let new_blocks: Vec<Block> = sqlx::query_as(
+                let new_blocks: Vec<Block> = match sqlx::query_as(
                     "SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at
                      FROM blocks WHERE number > $1 ORDER BY number ASC"
                 )
                 .bind(cursor)
                 .fetch_all(&state.pool)
                 .await
+                {
+                    Ok(rows) => rows,
+                    Err(err) => {
+                        warn!(error = ?err, cursor, "sse incremental-blocks query failed");
+                        continue;
+                    }
+                };
-                .unwrap_or_default();
```

Also applies to: 54–56 (same `.ok().flatten()` pattern on the block fetch).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/crates/atlas-api/src/handlers/sse.rs` around lines 37 - 43, Replace
the silent .ok().flatten() and unwrap_or_default() patterns in the SSE loop with
explicit error handling: use sqlx::query_scalar::<_, Option<i64>>("SELECT
MAX(number) FROM blocks").fetch_optional(&state.pool).await and similarly switch
the other block fetch (currently using query_as with a tuple) to an appropriate
query_scalar or query_as for the concrete row type, then match on the Result; on
Err(log the error via the existing logger/tracing) and perform a short backoff
(e.g., tokio::time::sleep) then continue the loop instead of treating errors as
missing data, and on Ok(None/Some) handle the null/Some case as before—this
ensures database errors are logged and the loop recovers rather than silently
stalling.
```typescript
  // HTTP polling fallback — only active when SSE is not connected
  const fetchHeight = useCallback(async () => {
    if (fetchingRef.current) return;
    fetchingRef.current = true;
    try {
      const status = await getStatus();
      const now = Date.now();
      const latestHeight = status?.block_height;
      if (typeof latestHeight === 'number') {
        if (latestHeight !== heightRef.current) {
          // Update height and time of update
          heightRef.current = latestHeight;
          setHeight(latestHeight);
          setLastUpdatedAt(now);
        }
        // Update bps using wall-clock deltas between status samples
        const prev = prevSampleRef.current;
        const curr = { h: latestHeight, t: now };
        if (prev && curr.t > prev.t && curr.h >= prev.h) {
          const dh = curr.h - prev.h;
          const dt = (curr.t - prev.t) / 1000;
          const inst = dt > 0 ? dh / dt : 0;
          const alpha = alphaRef.current;
          setBps((prevBps) => (prevBps == null ? inst : prevBps + alpha * (inst - prevBps)));
        }
        prevSampleRef.current = curr;
        setError(null);
        processHeight(latestHeight, false);
```
Polling fallback should use the fast /api/height endpoint, not /api/status.
This path still calls `getStatus()`, which misses the required lightweight height-endpoint contract for navbar polling.
🔧 Suggested change

```diff
-import { getStatus } from '../api/status';
+import { getHeight } from '../api/height';
 ...
-      const status = await getStatus();
+      const status = await getHeight();
       const latestHeight = status?.block_height;
```

As per coding guidelines, the frontend must implement a fast polling endpoint GET /api/height returning `{ block_height, indexed_at }`, used by the navbar every 2s.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
-  // HTTP polling fallback — only active when SSE is not connected
-  const fetchHeight = useCallback(async () => {
-    if (fetchingRef.current) return;
-    fetchingRef.current = true;
-    try {
-      const status = await getStatus();
-      const now = Date.now();
-      const latestHeight = status?.block_height;
-      if (typeof latestHeight === 'number') {
-        if (latestHeight !== heightRef.current) {
-          // Update height and time of update
-          heightRef.current = latestHeight;
-          setHeight(latestHeight);
-          setLastUpdatedAt(now);
-        }
-        // Update bps using wall-clock deltas between status samples
-        const prev = prevSampleRef.current;
-        const curr = { h: latestHeight, t: now };
-        if (prev && curr.t > prev.t && curr.h >= prev.h) {
-          const dh = curr.h - prev.h;
-          const dt = (curr.t - prev.t) / 1000;
-          const inst = dt > 0 ? dh / dt : 0;
-          const alpha = alphaRef.current;
-          setBps((prevBps) => (prevBps == null ? inst : prevBps + alpha * (inst - prevBps)));
-        }
-        prevSampleRef.current = curr;
-        setError(null);
-        processHeight(latestHeight, false);
+  // HTTP polling fallback — only active when SSE is not connected
+  const fetchHeight = useCallback(async () => {
+    if (fetchingRef.current) return;
+    fetchingRef.current = true;
+    try {
+      const status = await getHeight();
+      const latestHeight = status?.block_height;
+      if (typeof latestHeight === 'number') {
+        processHeight(latestHeight, false);
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@frontend/src/hooks/useLatestBlockHeight.ts` around lines 74 - 82, The polling
fallback in useLatestBlockHeight's fetchHeight currently calls getStatus() which
is too heavy; change fetchHeight to call the lightweight height endpoint (e.g.,
getHeight or an equivalent function that performs GET /api/height) and use the
returned { block_height, indexed_at } instead of status?.block_height, then pass
the numeric block_height into processHeight(latestHeight, false); ensure
fetchingRef logic remains unchanged and update any imports/usages of getStatus
inside this module to the new getHeight function.
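The bps smoothing in the hook snippet above is an exponential moving average. Extracted as a pure function (names chosen here for illustration, mirroring the `inst`/`alpha` variables in the quoted code):

```typescript
// EWMA update: blend the instantaneous rate (dh blocks over dtSec seconds)
// into the previous estimate with smoothing factor alpha in (0, 1].
export function updateBps(
  prevBps: number | null,
  dh: number,
  dtSec: number,
  alpha: number,
): number | null {
  if (dtSec <= 0 || dh < 0) return prevBps; // ignore bad or out-of-order samples
  const inst = dh / dtSec;
  return prevBps == null ? inst : prevBps + alpha * (inst - prevBps);
}
```

Switching to the lighter height endpoint does not change this math; only the sampling source changes.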
Actionable comments posted: 1
🧹 Nitpick comments (1)
backend/crates/atlas-api/src/handlers/sse.rs (1)
93-105: Redundant keep-alive mechanisms.

The manual `ping_counter` logic (lines 93-97) sends a keep-alive comment every ~15 seconds, while axum's `KeepAlive` configuration (lines 101-105) independently sends a keep-alive text at the same interval. Consider removing one to reduce redundancy.

♻️ Proposed simplification (remove manual keep-alive)

```diff
-            // Send keep-alive ping every ~15s (75 ticks * 200ms)
-            if ping_counter >= 75 {
-                ping_counter = 0;
-                yield Ok(Event::default().comment("keep-alive"));
-            }
         }
     };

     Sse::new(stream).keep_alive(
         axum::response::sse::KeepAlive::new()
             .interval(Duration::from_secs(15))
             .text("keep-alive"),
     )
```

You can also remove `ping_counter` and its updates at lines 29, 33, 62, and 80 if adopting this change.

🤖 Prompt for AI Agents
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/crates/atlas-api/src/handlers/sse.rs`:
- Around line 43-64: The code sets last_block_number = Some(max_num) before
ensuring the initial block was fetched and emitted, which can permanently skip
the block if the fetch fails; modify the logic in the block handling using
latest/max_num so that last_block_number is only updated after a successful
fetch-and-emit: perform the sqlx::query_as(...) fetch_optional, check and
serialize the fetched Block into NewBlockEvent (using serde_json::to_string) and
yield the Event first, and only then assign last_block_number = Some(max_num)
(and reset ping_counter) so the cursor does not advance on failed fetches;
update the code paths around latest, block, NewBlockEvent and the yield
Ok(Event::default().event("new_block").data(...)) accordingly.
---
Nitpick comments:
In `@backend/crates/atlas-api/src/handlers/sse.rs`:
- Around line 93-105: Remove the redundant manual keep-alive: delete the
ping_counter-based send (the block that yields
Ok(Event::default().comment("keep-alive")) guarded by ping_counter >= 75) and
any updates to ping_counter elsewhere (updates at the locations that
increment/reset ping_counter), and rely solely on Sse::new(stream).keep_alive
with axum::response::sse::KeepAlive::new().interval(...).text("keep-alive") to
emit pings; this eliminates duplicate keep-alive messages while keeping the
existing Sse::keep_alive configuration.
```rust
                if let Some(max_num) = latest {
                    last_block_number = Some(max_num);
                    // Emit the current latest block as the initial event
                    let block: Option<Block> = sqlx::query_as(
                        "SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at
                         FROM blocks WHERE number = $1"
                    )
                    .bind(max_num)
                    .fetch_optional(&state.pool)
                    .await
                    .ok()
                    .flatten();

                    if let Some(block) = block {
                        let event = NewBlockEvent { block };
                        if let Ok(json) = serde_json::to_string(&event) {
                            yield Ok(Event::default().event("new_block").data(json));
                        }
                    }
                    ping_counter = 0;
                }
                continue;
```
Initial block may be permanently skipped if fetch fails.
`last_block_number` is set to `max_num` at line 44 before the block is successfully fetched and emitted. If the fetch at lines 46-54 fails silently, the cursor advances past that block and subsequent polls query `number > max_num`, causing the initial block to never be emitted.
🔧 Proposed fix
Set `last_block_number` only after successfully emitting the event:

```diff
 if let Some(max_num) = latest {
-    last_block_number = Some(max_num);
     // Emit the current latest block as the initial event
     let block: Option<Block> = sqlx::query_as(
         "SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at
          FROM blocks WHERE number = $1"
     )
     .bind(max_num)
     .fetch_optional(&state.pool)
     .await
     .ok()
     .flatten();
     if let Some(block) = block {
+        last_block_number = Some(block.number);
         let event = NewBlockEvent { block };
         if let Ok(json) = serde_json::to_string(&event) {
             yield Ok(Event::default().event("new_block").data(json));
         }
+        ping_counter = 0;
     }
-    ping_counter = 0;
 }
```
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/crates/atlas-api/src/handlers/sse.rs` around lines 43 - 64, The code
sets last_block_number = Some(max_num) before ensuring the initial block was
fetched and emitted, which can permanently skip the block if the fetch fails;
modify the logic in the block handling using latest/max_num so that
last_block_number is only updated after a successful fetch-and-emit: perform the
sqlx::query_as(...) fetch_optional, check and serialize the fetched Block into
NewBlockEvent (using serde_json::to_string) and yield the Event first, and only
then assign last_block_number = Some(max_num) (and reset ping_counter) so the
cursor does not advance on failed fetches; update the code paths around latest,
block, NewBlockEvent and the yield
Ok(Event::default().event("new_block").data(...)) accordingly.
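The invariant behind this fix can be stated language-agnostically (TypeScript sketch with hypothetical names, not the handler's Rust code): the cursor may only advance once the block has actually been emitted, so a failed fetch or emit retries the same block on the next tick instead of skipping it.

```typescript
// Sketch: `emit` returns true only if the event was successfully
// serialized and yielded; on any failure the cursor stays put.
export function advanceCursor(
  cursor: number | null,
  fetched: { number: number } | null,
  emit: (b: { number: number }) => boolean,
): number | null {
  if (fetched !== null && emit(fetched)) return fetched.number; // success: move past it
  return cursor; // fetch or emit failed: do not skip the block
}
```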
- sse.rs: replace silent error swallowing with tracing::warn! and continue
- useBlockSSE: use named function expression to fix TS2448 self-reference
- BlocksPage: filter SSE buffer instead of unconditional clear on fetch
Actionable comments posted: 1
♻️ Duplicate comments (2)
backend/crates/atlas-api/src/handlers/sse.rs (2)
46-68: ⚠️ Potential issue | 🟠 Major

Advance the SSE cursor only after a successful initial event emit.

`last_block_number` is set before the initial block is fetched/emitted. If that fetch/emit path fails, the stream cursor advances and that block can be skipped for this client.

Suggested fix

```diff
 if let Some(max_num) = latest {
-    last_block_number = Some(max_num);
     // Emit the current latest block as the initial event
     let block: Option<Block> = match sqlx::query_as(
         "SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at
          FROM blocks WHERE number = $1"
     )
@@
     if let Some(block) = block {
         let event = NewBlockEvent { block };
         if let Ok(json) = serde_json::to_string(&event) {
             yield Ok(Event::default().event("new_block").data(json));
+            last_block_number = Some(max_num);
+            ping_counter = 0;
         }
     }
-    ping_counter = 0;
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-api/src/handlers/sse.rs` around lines 46 - 68, The SSE cursor advancement currently sets last_block_number = Some(max_num) before fetching/emitting the initial block (in the branch handling latest), which can skip a block if fetch or emit fails; move the assignment of last_block_number so it only happens after a successful fetch and successful serde_json::to_string + yield of the NewBlockEvent (i.e., only after the block is Some, NewBlockEvent is constructed, serialization succeeds, and Event::default().event("new_block").data(json) is yielded); update the code around latest, sqlx::query_as(...).bind(max_num).fetch_optional(...), the block handling, serde_json::to_string(&event), and the yield to ensure last_block_number is set after emission succeeds and remains unchanged on errors.
43-44: ⚠️ Potential issue | 🟠 Major

Add a short backoff on repeated DB errors.

Current error branches immediately `continue`, so outages can produce a tight warn/retry loop every 200ms per SSE client.

Suggested fix

```diff
-use tokio::time::interval;
+use tokio::time::{interval, sleep};
@@
-                Err(e) => { warn!(error = ?e, "sse: failed to query latest block number"); continue; }
+                Err(e) => {
+                    warn!(error = ?e, "sse: failed to query latest block number");
+                    sleep(Duration::from_millis(500)).await;
+                    continue;
+                }
@@
-                Err(e) => { warn!(error = ?e, "sse: failed to fetch initial block"); continue; }
+                Err(e) => {
+                    warn!(error = ?e, "sse: failed to fetch initial block");
+                    sleep(Duration::from_millis(500)).await;
+                    continue;
+                }
@@
-                Err(e) => { warn!(error = ?e, cursor, "sse: failed to fetch new blocks"); continue; }
+                Err(e) => {
+                    warn!(error = ?e, cursor, "sse: failed to fetch new blocks");
+                    sleep(Duration::from_millis(500)).await;
+                    continue;
+                }
+            };
```

Also applies to: 58-59, 84-85
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-api/src/handlers/sse.rs` around lines 43 - 44, The error branches currently `continue` immediately (e.g. the branches that warn!("sse: failed to query latest block number") and the other similar warn! messages), causing a tight retry loop on DB outages; add a short async backoff (e.g., tokio::time::sleep for a few hundred milliseconds) before each `continue` so the SSE handler loop yields and avoids tight retries. Update the three error branches referenced (the Err(e) warn! handling around the latest block query and the two other warn! sites at the noted locations) to await a small delay and then continue; keep the warn! logs as-is and only add the sleep+await just prior to the continue. Ensure you import tokio::time::Duration and tokio::time::sleep in the sse.rs scope if not already present.
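The suggestion above uses a fixed ~500ms sleep. A capped exponential schedule is a common alternative worth considering; a sketch with assumed base/cap values (TypeScript for brevity, not the handler's Rust code):

```typescript
// Backoff schedule: 500ms, 1s, 2s, 4s, capped at 5s; 0 when healthy.
export function backoffDelayMs(
  consecutiveErrors: number,
  baseMs = 500,
  capMs = 5000,
): number {
  if (consecutiveErrors <= 0) return 0; // no errors: poll at normal cadence
  return Math.min(capMs, baseMs * 2 ** (consecutiveErrors - 1));
}
```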
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@frontend/src/pages/BlocksPage.tsx`:
- Around line 46-51: The merge logic in the useMemo for blocks (using sseBlocks
and fetchedBlocks) runs on every page, causing newer page views to show items
belonging to page 1; change the logic to only prepend/merge sseBlocks when the
current page is 1: add the page (or currentPage) value to the dependency array
and, inside the useMemo, if page !== 1 return fetchedBlocks directly; otherwise
perform the existing Set/filter merge of sseBlocks and fetchedBlocks and
slice(0,20). Ensure you reference and update the blocks useMemo dependencies to
include the page/currentPage variable so updates happen correctly.
---
Duplicate comments:
In `@backend/crates/atlas-api/src/handlers/sse.rs`:
- Around line 46-68: The SSE cursor advancement currently sets last_block_number
= Some(max_num) before fetching/emitting the initial block (in the branch
handling latest), which can skip a block if fetch or emit fails; move the
assignment of last_block_number so it only happens after a successful fetch and
successful serde_json::to_string + yield of the NewBlockEvent (i.e., only after
the block is Some, NewBlockEvent is constructed, serialization succeeds, and
Event::default().event("new_block").data(json) is yielded); update the code
around latest, sqlx::query_as(...).bind(max_num).fetch_optional(...), the block
handling, serde_json::to_string(&event), and the yield to ensure
last_block_number is set after emission succeeds and remains unchanged on
errors.
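The cursor rule above (advance only after a successful emit) can be illustrated with a small language-agnostic sketch; TypeScript is used here for illustration only, and `emit` stands in for serialization plus the SSE yield in the real Rust handler:

```typescript
// Sketch only: the cursor stays put when an emit fails, so the failed
// block is re-fetched and re-emitted on the next poll tick instead of
// being silently skipped.
interface Block {
  number: number;
}

function drainNewBlocks(
  cursor: number | null,
  pending: Block[],             // blocks with number > cursor, ascending
  emit: (b: Block) => boolean,  // false = serialize/send failed
): number | null {
  for (const block of pending) {
    if (!emit(block)) {
      return cursor;            // unchanged: retry from here next tick
    }
    cursor = block.number;      // advance only after success
  }
  return cursor;
}
```

Because the DB query uses `blocks > cursor`, leaving the cursor unchanged on failure makes the next tick re-deliver the missed block automatically.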
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- backend/crates/atlas-api/src/handlers/sse.rs
- frontend/src/hooks/useBlockSSE.ts
- frontend/src/pages/BlocksPage.tsx
🚧 Files skipped from review as they are similar to previous changes (1)
- frontend/src/hooks/useBlockSSE.ts
```tsx
const blocks = useMemo(() => {
  if (!sseBlocks.length) return fetchedBlocks;
  const seen = new Set(fetchedBlocks.map((b) => b.number));
  const unique = sseBlocks.filter((b) => !seen.has(b.number));
  return [...unique, ...fetchedBlocks].slice(0, 20);
}, [fetchedBlocks, sseBlocks]);
```
Limit SSE prepend/merge to page 1 to keep pagination correct.
Right now sseBlocks are merged on every page, so page 2+ can display newest blocks that belong to page 1.
Suggested fix
```diff
 const blocks = useMemo(() => {
-  if (!sseBlocks.length) return fetchedBlocks;
+  if (page !== 1 || !autoRefresh || !sseBlocks.length) return fetchedBlocks;
   const seen = new Set(fetchedBlocks.map((b) => b.number));
   const unique = sseBlocks.filter((b) => !seen.has(b.number));
   return [...unique, ...fetchedBlocks].slice(0, 20);
-}, [fetchedBlocks, sseBlocks]);
+}, [fetchedBlocks, sseBlocks, page, autoRefresh]);
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
 const blocks = useMemo(() => {
-  if (!sseBlocks.length) return fetchedBlocks;
+  if (page !== 1 || !autoRefresh || !sseBlocks.length) return fetchedBlocks;
   const seen = new Set(fetchedBlocks.map((b) => b.number));
   const unique = sseBlocks.filter((b) => !seen.has(b.number));
   return [...unique, ...fetchedBlocks].slice(0, 20);
-}, [fetchedBlocks, sseBlocks]);
+}, [fetchedBlocks, sseBlocks, page, autoRefresh]);
```
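As a standalone illustration of the gated merge, the memo body can be extracted into a pure function (hypothetical names; `page`, `autoRefresh`, and the page size of 20 mirror the component state discussed above):

```typescript
// Sketch only: a pure-function version of the page-1-gated merge, so
// the gating logic is easy to unit-test outside of React.
interface BlockRow {
  number: number;
}

function mergeSseBlocks(
  page: number,
  autoRefresh: boolean,
  sseBlocks: BlockRow[],
  fetchedBlocks: BlockRow[],
  pageSize = 20,
): BlockRow[] {
  // Pages other than 1 (or paused auto-refresh) show fetched data as-is.
  if (page !== 1 || !autoRefresh || !sseBlocks.length) return fetchedBlocks;
  const seen = new Set(fetchedBlocks.map((b) => b.number));
  const unique = sseBlocks.filter((b) => !seen.has(b.number));
  return [...unique, ...fetchedBlocks].slice(0, pageSize);
}
```

In the component, the memo would simply call this function and list `page` and `autoRefresh` in its dependency array.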
…-in-effect Wrap setSseBlocks calls in BlocksPage and setDisplayedHeight in Layout in requestAnimationFrame to avoid calling setState synchronously inside useEffect bodies, which is flagged by eslint-plugin-react-hooks v7.
Stop polling every 10s as a safety net when SSE is active — periodic refetches disrupt the smooth live flow. Instead, refetch immediately when SSE disconnects to catch any missed blocks during the gap.
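The disconnect-triggered refetch described above reduces to a small edge-transition check; a hedged sketch (names are hypothetical, not the actual hook API):

```typescript
// Sketch only: refetch exactly once on the connected -> disconnected
// edge, instead of refetching on a fixed 10s timer while SSE is live.
type SseStatus = "connected" | "disconnected";

function onSseStatusChange(
  prev: SseStatus,
  next: SseStatus,
  refetch: () => void,
): void {
  if (prev === "connected" && next === "disconnected") {
    refetch(); // catch any blocks missed during the gap
  }
}
```

Keying the refetch to the transition rather than the state avoids repeated refetches while the connection stays down, where ordinary polling takes over anyway.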
♻️ Duplicate comments (1)
frontend/src/pages/BlocksPage.tsx (1)
55-60: ⚠️ Potential issue | 🟠 Major

Gate SSE merge to page 1 (and live mode) to avoid cross-page contamination.
`blocks` still prepends `sseBlocks` on every page. After navigating away from page 1, buffered newest blocks can leak into page 2+ and break pagination correctness.

Suggested fix
```diff
 const blocks = useMemo(() => {
-  if (!sseBlocks.length) return fetchedBlocks;
+  if (page !== 1 || !autoRefresh || !sseBlocks.length) return fetchedBlocks;
   const seen = new Set(fetchedBlocks.map((b) => b.number));
   const unique = sseBlocks.filter((b) => !seen.has(b.number));
   return [...unique, ...fetchedBlocks].slice(0, 20);
-}, [fetchedBlocks, sseBlocks]);
+}, [fetchedBlocks, sseBlocks, page, autoRefresh]);
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/src/pages/BlocksPage.tsx` around lines 55 - 60, The merge logic in the useMemo for blocks currently prepends sseBlocks on every page and must be restricted to live mode on page 1; modify the useMemo so it first checks the page and live-mode flags (e.g., currentPage === 1 and isLive / liveMode) and if not both true simply return fetchedBlocks, otherwise run the existing dedup/merge (seen Set, unique, return [...unique, ...fetchedBlocks].slice(0,20)). Update the dependency array to include the page and live-mode identifiers so the memo recalculates when those change.
Summary
- Adds a Server-Sent Events endpoint (`GET /api/events`) that streams new block metadata in real-time

Closes #11
Summary by CodeRabbit
New Features
UI
Chores
Style