Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
# Config
dotenvy = "0.15"

# Streaming
tokio-stream = "0.1"
futures = "0.3"
async-stream = "0.3"

# Utilities
bigdecimal = { version = "0.4", features = ["serde"] }
hex = "0.4"
Expand Down
3 changes: 3 additions & 0 deletions backend/crates/atlas-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ bigdecimal = { workspace = true }
hex = { workspace = true }
chrono = { workspace = true }
reqwest = { workspace = true }
tokio-stream = { workspace = true }
futures = { workspace = true }
async-stream = { workspace = true }
1 change: 1 addition & 0 deletions backend/crates/atlas-api/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod logs;
pub mod nfts;
pub mod proxy;
pub mod search;
pub mod sse;
pub mod status;
pub mod tokens;
pub mod transactions;
Expand Down
114 changes: 114 additions & 0 deletions backend/crates/atlas-api/src/handlers/sse.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use axum::{
extract::State,
response::sse::{Event, Sse},
};
use futures::stream::Stream;
use serde::Serialize;
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::interval;

use crate::AppState;
use atlas_common::Block;
use tracing::warn;

#[derive(Serialize)]
struct NewBlockEvent {
block: Block,
}

/// GET /api/events — Server-Sent Events stream for live block updates.
/// Polls the DB every 200ms and emits one `new_block` event per block, in order.
/// Never skips blocks — fetches all blocks since the last one sent.
pub async fn block_events(
State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let stream = async_stream::stream! {
let mut last_block_number: Option<i64> = None;
let mut tick = interval(Duration::from_millis(200));
let mut ping_counter: u32 = 0;

loop {
tick.tick().await;
ping_counter += 1;

// On first tick, seed with the latest block number
if last_block_number.is_none() {
let latest: Option<i64> = match sqlx::query_scalar("SELECT MAX(number) FROM blocks")
.fetch_one(&state.pool)
.await
{
Ok(v) => v,
Err(e) => { warn!(error = ?e, "sse: failed to query latest block number"); continue; }
};

if let Some(max_num) = latest {
last_block_number = Some(max_num);
// Emit the current latest block as the initial event
let block: Option<Block> = match sqlx::query_as(
"SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at
FROM blocks WHERE number = $1"
)
.bind(max_num)
.fetch_optional(&state.pool)
.await
{
Ok(v) => v,
Err(e) => { warn!(error = ?e, "sse: failed to fetch initial block"); continue; }
};

if let Some(block) = block {
let event = NewBlockEvent { block };
if let Ok(json) = serde_json::to_string(&event) {
yield Ok(Event::default().event("new_block").data(json));
}
}
ping_counter = 0;
}
continue;
Comment on lines 46 to 69
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Initial block may be permanently skipped if fetch fails.

last_block_number is set to max_num at line 44 before the block is successfully fetched and emitted. If the fetch at lines 46-54 fails silently, the cursor advances past that block and subsequent polls query number > max_num, causing the initial block to never be emitted.

🔧 Proposed fix

Set last_block_number only after successfully emitting the event:

             if let Some(max_num) = latest {
-                last_block_number = Some(max_num);
                 // Emit the current latest block as the initial event
                 let block: Option<Block> = sqlx::query_as(
                     "SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at
                      FROM blocks WHERE number = $1"
                 )
                 .bind(max_num)
                 .fetch_optional(&state.pool)
                 .await
                 .ok()
                 .flatten();

                 if let Some(block) = block {
+                    last_block_number = Some(block.number);
                     let event = NewBlockEvent { block };
                     if let Ok(json) = serde_json::to_string(&event) {
                         yield Ok(Event::default().event("new_block").data(json));
                     }
+                    ping_counter = 0;
                 }
-                ping_counter = 0;
             }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/crates/atlas-api/src/handlers/sse.rs` around lines 43 - 64, The code
sets last_block_number = Some(max_num) before ensuring the initial block was
fetched and emitted, which can permanently skip the block if the fetch fails;
modify the logic in the block handling using latest/max_num so that
last_block_number is only updated after a successful fetch-and-emit: perform the
sqlx::query_as(...) fetch_optional, check and serialize the fetched Block into
NewBlockEvent (using serde_json::to_string) and yield the Event first, and only
then assign last_block_number = Some(max_num) (and reset ping_counter) so the
cursor does not advance on failed fetches; update the code paths around latest,
block, NewBlockEvent and the yield
Ok(Event::default().event("new_block").data(...)) accordingly.

}

let cursor = last_block_number.unwrap();

// Fetch ALL new blocks since last sent, in ascending order
let new_blocks: Vec<Block> = match sqlx::query_as(
"SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at
FROM blocks WHERE number > $1 ORDER BY number ASC"
)
.bind(cursor)
.fetch_all(&state.pool)
.await
{
Ok(rows) => rows,
Err(e) => { warn!(error = ?e, cursor, "sse: failed to fetch new blocks"); continue; }
};

if !new_blocks.is_empty() {
ping_counter = 0;
}

// Emit one event per block, in order
for block in new_blocks {
last_block_number = Some(block.number);

let event = NewBlockEvent { block };
if let Ok(json) = serde_json::to_string(&event) {
yield Ok(Event::default().event("new_block").data(json));
}
}

// Send keep-alive ping every ~15s (75 ticks * 200ms)
if ping_counter >= 75 {
ping_counter = 0;
yield Ok(Event::default().comment("keep-alive"));
}
}
};

Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(Duration::from_secs(15))
.text("keep-alive"),
)
}
12 changes: 10 additions & 2 deletions backend/crates/atlas-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ async fn main() -> Result<()> {
admin_api_key,
});

// SSE route — excluded from TimeoutLayer so connections stay alive
let sse_routes = Router::new()
.route("/api/events", get(handlers::sse::block_events))
.with_state(state.clone());

// Build router
let app = Router::new()
// Blocks
Expand Down Expand Up @@ -215,14 +220,17 @@ async fn main() -> Result<()> {
axum::http::StatusCode::REQUEST_TIMEOUT,
Duration::from_secs(10),
))
.with_state(state)
// Merge SSE routes (no TimeoutLayer so connections stay alive)
.merge(sse_routes)
// Shared layers applied to all routes
.layer(
CorsLayer::new()
.allow_origin(Any)
.allow_methods(Any)
.allow_headers(Any),
)
.layer(TraceLayer::new_for_http())
.with_state(state);
.layer(TraceLayer::new_for_http());

let addr = format!("{}:{}", host, port);
tracing::info!("Listening on {}", addr);
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,5 @@ services:

volumes:
pgdata:
external: true
name: atlas_pgdata
13 changes: 13 additions & 0 deletions frontend/nginx.conf
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,19 @@ server {
try_files $uri $uri/ /index.html;
}

# SSE endpoint — disable buffering so events stream through immediately
location /api/events {
proxy_pass http://atlas-api:3000/api/events;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding off;
proxy_buffering off;
proxy_cache off;
}

# Proxy API requests to atlas-api service
location /api/ {
proxy_pass http://atlas-api:3000/api/;
Expand Down
28 changes: 19 additions & 9 deletions frontend/src/components/Layout.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Link, NavLink, Outlet, useLocation } from 'react-router-dom';
import { useEffect, useMemo, useRef, useState } from 'react';
import SearchBar from './SearchBar';
import useLatestBlockHeight from '../hooks/useLatestBlockHeight';
import useBlockSSE from '../hooks/useBlockSSE';
import SmoothCounter from './SmoothCounter';
import logoImg from '../assets/logo.png';
import { BlockStatsContext } from '../context/BlockStatsContext';
Expand All @@ -10,7 +11,8 @@ import { useTheme } from '../hooks/useTheme';
export default function Layout() {
const location = useLocation();
const isHome = location.pathname === '/';
const { height, lastUpdatedAt, bps } = useLatestBlockHeight(2000, 1000000);
const sse = useBlockSSE();
const { height, lastUpdatedAt, bps } = useLatestBlockHeight(2000, 1000000, sse.height, sse.connected, sse.bps);
const [now, setNow] = useState(() => Date.now());
const recentlyUpdated = lastUpdatedAt ? (now - lastUpdatedAt) < 10000 : false;
const [displayedHeight, setDisplayedHeight] = useState<number | null>(null);
Expand All @@ -25,7 +27,9 @@ export default function Layout() {
return () => window.clearInterval(id);
}, []);

// Smoothly increment displayed height using bps
// Update displayed height
// When SSE is connected: show exact height from SSE (increments one-by-one)
// When polling: use bps prediction to smooth between poll intervals
useEffect(() => {
if (height == null) {
if (displayRafRef.current !== null) {
Expand All @@ -52,9 +56,17 @@ export default function Layout() {
});
}

// When SSE is connected, just track the real height directly — no prediction.
// The initialization block above already scheduled a RAF to call setDisplayedHeight
// whenever height changes, so no synchronous setState needed here.
if (sse.connected) {
displayedRef.current = height;
return;
}

// Polling mode: use bps prediction to smooth between poll intervals
const loop = (t: number) => {
if (!bps || bps <= 0) {
// No rate info; just stick to the last known real height
if (displayedRef.current !== height) {
displayedRef.current = height;
setDisplayedHeight(displayedRef.current);
Expand All @@ -64,9 +76,7 @@ export default function Layout() {
const dt = lastFrameRef.current ? (now - lastFrameRef.current) / 1000 : 0;
lastFrameRef.current = now;

// Increase predicted height smoothly by bps * dt
const predicted = displayedRef.current + bps * dt;
// Always at least the latest known chain height
const next = Math.max(height, Math.floor(predicted));
if (next !== displayedRef.current) {
displayedRef.current = next;
Expand All @@ -84,7 +94,7 @@ export default function Layout() {
displayRafRef.current = null;
lastFrameRef.current = 0;
};
}, [height, bps]);
}, [height, bps, sse.connected]);
const blockTimeLabel = useMemo(() => {
if (bps !== null && bps > 0) {
const secs = 1 / bps;
Expand Down Expand Up @@ -175,8 +185,8 @@ export default function Layout() {
</button>
<div className="flex items-center gap-3 text-sm text-gray-300">
<span
className={`inline-block w-2.5 h-2.5 rounded-full ${recentlyUpdated ? 'bg-red-500 live-dot' : 'bg-gray-600'}`}
title={recentlyUpdated ? 'Live updates' : 'Idle'}
className={`inline-block w-2.5 h-2.5 rounded-full ${sse.connected ? 'bg-green-500 live-dot' : recentlyUpdated ? 'bg-red-500 live-dot' : 'bg-gray-600'}`}
title={sse.connected ? 'SSE connected' : recentlyUpdated ? 'Polling' : 'Idle'}
/>
<SmoothCounter value={displayedHeight} />
<span className="text-gray-600">|</span>
Expand Down Expand Up @@ -259,7 +269,7 @@ export default function Layout() {
{/* Main content */}
<main className="flex-1">
<div className="max-w-7xl mx-auto px-4 sm:px-6 lg:px-8 py-8">
<BlockStatsContext.Provider value={{ bps, height: displayedHeight }}>
<BlockStatsContext.Provider value={{ bps, height: displayedHeight, latestBlockEvent: sse.latestBlock, sseConnected: sse.connected }}>
<Outlet />
</BlockStatsContext.Provider>
</div>
Expand Down
5 changes: 1 addition & 4 deletions frontend/src/components/SmoothCounter.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ interface SmoothCounterProps {

export default function SmoothCounter({ value, className = '' }: SmoothCounterProps) {
const text = value !== null ? new Intl.NumberFormat('en-US').format(Math.floor(value)) : '—';
// Key on value so the animation restarts on change
return (
<span className={`font-mono ${className}`}>
<span key={text} className="fade-in-up inline-block align-bottom">{text}</span>
</span>
<span className={`font-mono tabular-nums ${className}`}>{text}</span>
);
}
10 changes: 9 additions & 1 deletion frontend/src/context/BlockStatsContext.tsx
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
import { createContext } from 'react';
import type { NewBlockEvent } from '../hooks/useBlockSSE';

export interface BlockStats {
bps: number | null;
height: number | null;
latestBlockEvent: NewBlockEvent | null;
sseConnected: boolean;
}

export const BlockStatsContext = createContext<BlockStats>({ bps: null, height: null });
export const BlockStatsContext = createContext<BlockStats>({
bps: null,
height: null,
latestBlockEvent: null,
sseConnected: false,
});

Loading