From 045c768d52fca15fc10152c8649647288977fdc0 Mon Sep 17 00:00:00 2001 From: Francesco Dainese Date: Thu, 11 Dec 2025 16:32:29 +0100 Subject: [PATCH 1/9] feat(gateway): portal authentication feat(portal): gateway authentication fix(registry): remove JWT feat(gateway): HTTP & RPC auth layers refactor(portal): dedicated gateway client module --- based/Cargo.lock | 5 + based/bin/gateway_registry/src/main.rs | 31 +- based/bin/overseer/src/main.rs | 2 +- based/bin/portal/src/cli.rs | 26 +- based/bin/portal/src/clients.rs | 340 +----------- based/bin/portal/src/clients/gateway.rs | 489 ++++++++++++++++++ based/bin/portal/src/server.rs | 23 +- based/crates/common/src/api.rs | 32 +- based/crates/common/src/auth.rs | 10 + based/crates/common/src/config.rs | 17 +- based/crates/common/src/lib.rs | 1 + based/crates/rpc/Cargo.toml | 5 + based/crates/rpc/src/auth.rs | 348 +++++++++++++ based/crates/rpc/src/lib.rs | 34 +- based/portal.just | 5 +- chain-replay/registry.json | 3 +- main_node/compose.dev.yml | 2 +- main_node/compose.prod.yml | 2 +- main_node/registry_example.json | 5 +- main_node_dockerized_network/compose.yml | 2 +- .../registry_example.json | 5 +- 21 files changed, 993 insertions(+), 394 deletions(-) create mode 100644 based/bin/portal/src/clients/gateway.rs create mode 100644 based/crates/common/src/auth.rs create mode 100644 based/crates/rpc/src/auth.rs diff --git a/based/Cargo.lock b/based/Cargo.lock index af514c813..71ea4ed68 100644 --- a/based/Cargo.lock +++ b/based/Cargo.lock @@ -1993,7 +1993,11 @@ dependencies = [ "axum", "bop-common", "clap", + "dashmap 6.1.0", "eyre", + "hex 0.4.3", + "http", + "http-body-util", "jsonrpsee", "op-alloy-consensus", "op-alloy-rpc-types", @@ -2004,6 +2008,7 @@ dependencies = [ "reth-rpc-layer", "serde", "serde_json", + "thiserror 2.0.12", "tokio", "tower", "tracing", diff --git a/based/bin/gateway_registry/src/main.rs b/based/bin/gateway_registry/src/main.rs index 26963ad04..14334029f 100644 --- a/based/bin/gateway_registry/src/main.rs +++ b/based/bin/gateway_registry/src/main.rs @@ -5,7 +5,7 @@ use std::{ time::Duration, }; -use alloy_primitives::{Address, B256, U256}; +use alloy_primitives::{Address, U256}; use alloy_provider::{Provider, RootProvider}; use bop_common::{ api::RegistryApiServer, @@ -108,19 +108,19 @@ enum RegistryError { } type Result = std::result::Result; -fn refresh_gateway_clients(path: impl AsRef) -> Result> { +fn refresh_gateway_clients(path: impl AsRef) -> Result> { Ok(serde_json::from_reader(std::fs::File::open(path.as_ref())?)?) } -fn write_gateway_clients(path: Arc, clients: &[(Url, Address, B256)]) { +fn write_gateway_clients(path: Arc, clients: &[(Url, Address)]) { let _ = std::fs::write(Arc::unwrap_or_clone(path), serde_json::to_string(clients).unwrap()); } #[derive(Clone)] pub struct RegistryServer { eth_client: RootProvider, - // url, address, jwt secret - gateway_clients: Arc>>, + // url, address + gateway_clients: Arc>>, gateway_update_blocks: u64, registry_path: Arc, @@ -190,7 +190,7 @@ impl RegistryServer { #[async_trait] impl RegistryApiServer for RegistryServer { #[tracing::instrument(skip_all, err, ret(level = Level::DEBUG))] - async fn get_future_gateway(&self, n_blocks_into_the_future: u64) -> RpcResult<(u64, Url, Address, B256)> { + async fn get_future_gateway(&self, n_blocks_into_the_future: u64) -> RpcResult<(u64, Url, Address)> { // let curblock = self.eth_client.block_number().await?; let curblock = if !self.use_mock_blocknumber { self.eth_client @@ -206,30 +206,35 @@ impl RegistryApiServer for RegistryServer { if n_gateways == 0 { return Err(RpcError::Jsonrpsee(ClientError::Custom("No registered gateways".to_string()))); } - let target_block = u64::try_from(curblock + U256::from_limbs([1, 0, 0, 0])).map_err(|_| RpcError::Internal)? + - n_blocks_into_the_future; + let target_block = u64::try_from(curblock + U256::from_limbs([1, 0, 0, 0])).map_err(|_| RpcError::Internal)? + + n_blocks_into_the_future; let id = (target_block / self.gateway_update_blocks) as usize; - let (url, address, jwt_in_b256) = gateways[id % n_gateways].clone(); + let (url, address) = gateways[id % n_gateways].clone(); tracing::debug!("serving future gateway for block {target_block}: url={url}, address={address}",); - Ok((target_block, url, address, jwt_in_b256)) + Ok((target_block, url, address)) } #[tracing::instrument(skip_all, err, ret(level = Level::DEBUG))] - async fn registered_gateways(&self) -> RpcResult> { + async fn registered_gateways(&self) -> RpcResult> { Ok(self.gateway_clients.read().clone()) } #[tracing::instrument(skip_all, err, ret(level = Level::DEBUG))] - async fn register_gateway(&self, gateway: (Url, Address, B256)) -> RpcResult<()> { + async fn register_gateway(&self, gateway: (Url, Address)) -> RpcResult<()> { let mut gateways = self.gateway_clients.read().clone(); - if !gateways.iter().any(|g| g.0.host() == gateway.0.host() || g.2 == gateway.2) { + if !gateways.iter().any(|g| g.0.host() == gateway.0.host() || g.1 == gateway.1) { gateways.push(gateway); write_gateway_clients(self.registry_path.clone(), &gateways); *self.gateway_clients.write() = gateways; } Ok(()) } + + #[tracing::instrument(skip_all, ret(level = Level::DEBUG))] + async fn gateway_update_blocks(&self) -> RpcResult { + Ok(self.gateway_update_blocks) + } } #[tokio::main] diff --git a/based/bin/overseer/src/main.rs b/based/bin/overseer/src/main.rs index 871c57a0f..40adf69aa 100644 --- a/based/bin/overseer/src/main.rs +++ b/based/bin/overseer/src/main.rs @@ -162,7 +162,7 @@ impl OverseerConnections { } pub fn current_gateway(&self) -> Result<(Url, Address), ClientError> { - self.runtime.block_on(self.client_portal.current_gateway()).map(|(_, url, address, _)| (url, address)) + self.runtime.block_on(self.client_portal.current_gateway()).map(|(_, url, address)| (url, address)) } pub fn peers_based_op_node(&self) -> Result, ClientError> { diff --git a/based/bin/portal/src/cli.rs b/based/bin/portal/src/cli.rs index 74257fa81..a2bf26faf 100644 --- a/based/bin/portal/src/cli.rs +++ b/based/bin/portal/src/cli.rs @@ -1,9 +1,13 @@ use std::path::PathBuf; -use bop_common::config::{LoggingConfig, LoggingFlags}; +use bop_common::{ + config::{LoggingConfig, LoggingFlags}, + signing::ECDSASigner, +}; use clap::Parser; use reqwest::Url; use reth_rpc_layer::JwtSecret; +use std::fs; use tracing::level_filters::LevelFilter; #[derive(Parser, Debug, Clone)] @@ -40,6 +44,9 @@ pub struct PortalArgs { /// Timeout for gateway requests in milliseconds #[arg(long = "gateway.timeout_ms", default_value_t = 100)] pub gateway_timeout_ms: u64, + /// Signing key used to authenticate with gateways (hex string or path to file containing hex) + #[arg(long = "gateway.signing-key")] + pub gateway_signing_key: String, /// Enable debug logging #[arg(long)] @@ -88,6 +95,10 @@ impl PortalArgs { .or_else(|_| JwtSecret::from_file(std::path::Path::new(&self.config_dir.join("jwt")))) .expect("Please set the --fallback.jwt flag manually, or generate and place a jwt file in the config dir") } + + pub fn gateway_signer(&self) -> eyre::Result { + parse_signing_key(&self.gateway_signing_key) + } } impl From<&PortalArgs> for LoggingConfig { @@ -106,3 +117,16 @@ impl From<&PortalArgs> for LoggingConfig { } } } + +fn parse_signing_key(input: &str) -> eyre::Result { + let trimmed = input.trim(); + let normalized = trimmed.trim_start_matches("0x"); + match ECDSASigner::try_from_hex(normalized) { + Ok(signer) => Ok(signer), + Err(_) => { + let contents = fs::read_to_string(trimmed)?; + let key = contents.trim().trim_start_matches("0x"); + ECDSASigner::try_from_hex(key).map_err(|err| eyre::eyre!("failed to parse gateway signing key: {err}")) + } + } +} diff --git a/based/bin/portal/src/clients.rs b/based/bin/portal/src/clients.rs index 7ffba46bf..de58cc3e7 100644 --- a/based/bin/portal/src/clients.rs +++ b/based/bin/portal/src/clients.rs @@ -1,342 +1,15 @@ -use std::{ - fmt, - sync::{ - Arc, - atomic::{AtomicBool, AtomicU64, Ordering}, - }, -}; - -use alloy_eips::eip7685::RequestsOrHash; -use alloy_primitives::{Address, B256, U64, hex}; -use alloy_rpc_types::engine::{ExecutionPayloadV3, ForkchoiceState}; -use bop_common::{ - api::{ControlApiClient, EngineApiClient, OpMinerExtApiClient, RegistryApiClient}, - communication::Producer, - debug_panic, - metrics::{Gauge, Metric, MetricsUpdate}, - time::{Duration, Instant}, - utils::uuid, -}; -use indexmap::IndexMap; +use bop_common::time::Duration; use jsonrpsee::{ - core::{ClientError, middleware::layer::RpcLogger}, + core::middleware::layer::RpcLogger, http_client::{HttpClient, HttpClientBuilder, RpcService, transport::HttpBackend}, }; -use op_alloy_rpc_types_engine::{OpExecutionPayloadV4, OpPayloadAttributes}; use reqwest::Url; use reth_rpc_layer::{AuthClientLayer, AuthClientService, JwtSecret}; -use tokio::sync::RwLock; -use tracing::{Instrument, debug, error, info, trace}; -use crate::cli::PortalArgs; +mod gateway; +pub use gateway::*; pub type RpcClient = jsonrpsee::http_client::HttpClient; -pub type AuthRpcClient = HttpClient>>>; - -#[derive(Clone)] -pub struct Gateway { - pub url: Url, - pub jwt: JwtSecret, - pub address: Address, - pub client: AuthRpcClient, - pub ping: Arc>, - pub active: Arc, - pub registry_index: Arc, - pub metrics: Producer, -} - -impl Gateway { - pub async fn health_check(&self) { - let ping_start = Instant::now(); - match ControlApiClient::heartbeat(&self.client).await { - Ok(_) => { - let ping_duration = ping_start.elapsed(); - *self.ping.write().await = ping_duration; - self.active.store(true, Ordering::Relaxed); - info!("successfully pinged gateway={} ping={:>9}", self.url, ping_duration.to_string()); - MetricsUpdate::send_ref( - uuid(), - Metric::SetGauge(Gauge::PortalGatewayPingLatencyMs(self.address), ping_duration.as_millis()), - &self.metrics, - ); - } - Err(err) => { - error!(%err, ?self, "failed to ping gateway"); - self.active.store(false, Ordering::Relaxed); - } - } - } - - pub fn is_active(&self) -> bool { - self.active.load(Ordering::Relaxed) - } -} - -impl fmt::Debug for Gateway { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.url) - } -} - -impl Gateway { - fn new( - url: Url, - client: AuthRpcClient, - jwt: JwtSecret, - address: Address, - registry_index: usize, - metrics: Producer, - ) -> Self { - Self { - url, - jwt, - address, - client, - ping: Arc::new(RwLock::new(Duration::from_millis(0))), - active: Arc::new(AtomicBool::new(false)), - registry_index: Arc::new(AtomicU64::new(registry_index as u64)), - metrics, - } - } -} - -pub type GatewayInstance = Arc; - -pub struct GatewayManager { - gateways: Arc>>, - pub registry_client: RpcClient, - current_gateway: Arc>>, - pub metrics: Producer, -} - -impl GatewayManager { - pub fn new(registry_client: RpcClient, metrics: Producer) -> Self { - Self { - gateways: Arc::new(RwLock::new(IndexMap::new())), - registry_client, - current_gateway: Arc::new(RwLock::new(None)), - metrics, - } - } - - pub fn new_from_args(args: &PortalArgs, metrics: Producer) -> Self { - let registry_client_url = args.registry_url.clone(); - let timeout = Duration::from_millis(args.fallback_timeout_ms); - let registry_client = create_client(registry_client_url, timeout).unwrap(); - Self::new(registry_client, metrics) - } - - pub async fn update_gateway_list(&self) -> eyre::Result<()> { - let raw_gateways = self.registry_client.registered_gateways().await?; - let mut gateways = self.gateways.write().await; - - let timeout = Duration::from_millis(1000); - - for (index, (url, address, jwt_as_b256)) in raw_gateways.iter().enumerate() { - let jwt_as_str = hex::encode(jwt_as_b256); - let jwt = JwtSecret::from_hex(&jwt_as_str).map_err(|_| eyre::eyre!("Invalid JWT secret"))?; - match gateways.get_mut(url) { - Some(gateway) => { - let gateway = Arc::make_mut(gateway); - gateway.jwt = jwt; - gateway.address = *address; - gateway.registry_index.store(index as u64, Ordering::Relaxed); - } - None => { - let client = create_auth_client(url.clone(), jwt, timeout)?; - let gateway = Gateway::new(url.clone(), client, jwt, *address, index, self.metrics); - gateways.insert(url.clone(), Arc::new(gateway)); - } - } - } - - gateways.retain(|url, _| raw_gateways.iter().any(|(u, _, _)| u == url)); - - if gateways.len() != raw_gateways.len() { - error!("Mismatch in number of gateways: expected {}, found {}", raw_gateways.len(), gateways.len()); - debug_panic!("Mismatch in number of gateways: expected {}, found {}", raw_gateways.len(), gateways.len()); - } - - gateways.sort_by(|_, v1, _, v2| { - let index1 = v1.registry_index.load(Ordering::Relaxed); - let index2 = v2.registry_index.load(Ordering::Relaxed); - index1.cmp(&index2) - }); - Ok(()) - } - - async fn get_next_available_gateway(&self, start_index: usize) -> Option { - let gateways = self.gateways.read().await; - let len = gateways.len(); - for i in 0..len { - let index = (start_index + i) % len; - if let Some((_, gateway)) = gateways.get_index(index) { - if gateway.is_active() { - return Some(Arc::clone(gateway)); - } - } - } - None - } - - // this method should be called at the block transition (end of block / start of new block) - pub async fn decide_current_gateway(&self) -> Option { - let gateways = self.gateways.read().await; - match self.registry_client.current_gateway().await { - Ok((_, current_registry_gateway_url, _, _)) => match gateways.get_index_of(¤t_registry_gateway_url) { - Some(index) => { - let result = self.get_next_available_gateway(index).await; - if let Some(gateway) = &result { - self.current_gateway.write().await.replace(Arc::clone(gateway)); - MetricsUpdate::send_ref( - uuid(), - Metric::SetGauge(Gauge::PortalCurrentGatewayRegistryAddress(gateway.address), 1.0), - &self.metrics, - ); - } - result - } - None => { - error!("Current registry gateway not found in local list: {}", current_registry_gateway_url); - None - } - }, - Err(_) => { - error!("Failed to fetch current gateway from registry"); - None - } - } - } - - pub async fn health_check(&self) { - let gateways = self.gateways.read().await; - for gateway in gateways.values().collect::>() { - let gateway = gateway.clone(); - tokio::spawn(async move { - gateway.health_check().await; - }); - } - } - - pub async fn current_gateway(&self) -> Option { - self.current_gateway.read().await.as_ref().cloned() - } - - async fn _send_fcu( - fork_choice_state: ForkchoiceState, - payload_attributes: Option, - gateway: GatewayInstance, - ) { - match gateway.client.fork_choice_updated_v3(fork_choice_state, payload_attributes).await { - Ok(res) => { - if res.is_valid() { - trace!(?gateway, ?res, "gateway response"); - } else { - trace!(?gateway, ?res, "Error: gateway response"); - } - } - Err(err) => trace!(%err, "Error: failed gateway"), - } - debug!(?gateway, "served fcu") - } - - pub async fn send_fcu(&self, fork_choice_state: ForkchoiceState, payload_attributes: Option) { - match self.current_gateway().await { - Some(gateway) => { - if gateway.is_active() { - tokio::spawn(Self::_send_fcu(fork_choice_state, payload_attributes, gateway)); - } else { - error!("Current gateway is not active, cannot send fork choice update"); - } - } - None => { - for gateway in self.gateways.read().await.values() { - let payload_attributes = payload_attributes.clone(); - tokio::spawn(Self::_send_fcu(fork_choice_state, payload_attributes, gateway.clone())); - } - } - } - } - - pub async fn broadcast_new_payload_v3( - &self, - payload: ExecutionPayloadV3, - versioned_hashes: Vec, - parent_beacon_block_root: B256, - ) { - for gateway in self.gateways.read().await.values() { - let gateway = gateway.clone(); - let payload = payload.clone(); - let versioned_hashes = versioned_hashes.clone(); - tokio::spawn( - async move { - match gateway.client.new_payload_v3(payload, versioned_hashes, parent_beacon_block_root).await { - Ok(res) => { - if res.is_valid() { - debug!(?gateway, ?res, "gateway response"); - } else { - error!(?gateway, ?res, "gateway response"); - } - } - Err(ClientError::Call(_)) => {} - Err(err) => error!(?gateway, %err, "failed gateway"), - } - } - .in_current_span(), - ); - } - } - - pub async fn broadcast_new_payload_v4( - &self, - payload: OpExecutionPayloadV4, - versioned_hashes: Vec, - parent_beacon_block_root: B256, - requests: RequestsOrHash, - ) { - for gateway in self.gateways.read().await.values() { - let gateway = gateway.clone(); - let payload = payload.clone(); - let requests = requests.clone(); - let versioned_hashes = versioned_hashes.clone(); - tokio::spawn( - async move { - match gateway - .client - .new_payload_v4(payload, versioned_hashes, parent_beacon_block_root, requests) - .await - { - Ok(res) => { - if res.is_valid() { - debug!(?gateway, ?res, "gateway response"); - } else { - error!(?gateway, ?res, "gateway response"); - } - } - Err(ClientError::Call(_)) => {} - Err(err) => error!(?gateway, %err, "failed gateway"), - } - } - .in_current_span(), - ); - } - } - - pub async fn broadcast_set_max_da_size(&self, max_tx_size: U64, max_block_size: U64) { - for gateway in self.gateways.read().await.values() { - let gateway = gateway.clone(); - tokio::spawn( - async move { - if let Err(err) = gateway.client.set_max_da_size(max_tx_size, max_block_size).await { - error!(?gateway, %err, "failed to forward miner_setMaxDASize"); - } - } - .in_current_span(), - ); - } - } -} - pub fn create_client(url: Url, timeout: Duration) -> eyre::Result { let client = HttpClientBuilder::default() .max_request_size(u32::MAX) @@ -346,8 +19,9 @@ pub fn create_client(url: Url, timeout: Duration) -> eyre::Result { Ok(client) } -pub fn create_auth_client(url: Url, jwt: JwtSecret, timeout: Duration) -> eyre::Result { - let secret_layer = AuthClientLayer::new(jwt); +pub type AuthRpcClient = HttpClient>>>; +pub fn create_auth_client(url: Url, token: JwtSecret, timeout: Duration) -> eyre::Result { + let secret_layer = AuthClientLayer::new(token); let middleware = tower::ServiceBuilder::default().layer(secret_layer); let client = HttpClientBuilder::default() diff --git a/based/bin/portal/src/clients/gateway.rs b/based/bin/portal/src/clients/gateway.rs new file mode 100644 index 000000000..a1e6759eb --- /dev/null +++ b/based/bin/portal/src/clients/gateway.rs @@ -0,0 +1,489 @@ +use std::{ + fmt, + sync::{ + Arc, + atomic::{AtomicBool, AtomicU64, Ordering}, + }, + time::SystemTime, +}; + +use alloy_eips::eip7685::RequestsOrHash; +use alloy_primitives::{Address, B256, Signature, U64}; +use alloy_rpc_types::engine::{ExecutionPayloadV3, ForkchoiceState}; +use bop_common::{ + api::{BasedAuthApiClient, ControlApiClient, EngineApiClient, OpMinerExtApiClient, RegistryApiClient}, + auth::gateway_auth_message, + communication::Producer, + debug_panic, + metrics::{Gauge, Metric, MetricsUpdate}, + signing::ECDSASigner, + time::{Duration, Instant}, + utils::uuid, +}; +use eyre::eyre; +use indexmap::IndexMap; +use jsonrpsee::{ + core::{ClientError, middleware::layer::RpcLogger}, + http_client::{HttpClient, HttpClientBuilder, HttpRequest, RpcService, transport::HttpBackend}, +}; +use op_alloy_rpc_types_engine::{OpExecutionPayloadV4, OpPayloadAttributes}; +use reqwest::{ + Url, + header::{AUTHORIZATION, HeaderValue, InvalidHeaderValue}, +}; +use tokio::sync::RwLock; +use tower::{Layer, Service}; +use tracing::{Instrument, debug, error, info, trace, warn}; + +use crate::{cli::PortalArgs, clients::create_client}; + +use super::RpcClient; + +pub type GatewayClient = HttpClient>>>; + +#[derive(Debug)] +pub struct GatewayClientAuthLayer { + pub token: Arc, +} +impl GatewayClientAuthLayer { + pub fn new(token: &str) -> Result { + let value = HeaderValue::from_str(token)?; + Ok(Self { token: Arc::new(value) }) + } +} + +impl Layer for GatewayClientAuthLayer { + type Service = GatewayClientService; + + fn layer(&self, inner: S) -> Self::Service { + GatewayClientService { jwt: self.token.clone(), inner } + } +} + +#[derive(Debug, Clone)] +pub struct GatewayClientService { + jwt: Arc, + inner: S, +} + +impl Service> for GatewayClientService +where + S: Service>, +{ + type Response = S::Response; + + type Error = S::Error; + + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, mut req: HttpRequest) -> Self::Future { + req.headers_mut().insert(AUTHORIZATION, Arc::unwrap_or_clone(self.jwt.clone())); + self.inner.call(req) + } +} + +#[derive(Clone)] +pub struct Gateway { + pub url: Url, + pub address: Arc
, + pub client: Arc>>, + pub ping: Arc>, + pub active: Arc, + pub registry_index: Arc, + pub metrics: Producer, +} + +impl Gateway { + pub async fn health_check(&self) { + let Some(client) = self.client.read().await.clone() else { + self.active.store(false, Ordering::Relaxed); + return; + }; + let ping_start = Instant::now(); + match ControlApiClient::heartbeat(&client).await { + Ok(_) => { + let ping_duration = ping_start.elapsed(); + *self.ping.write().await = ping_duration; + self.active.store(true, Ordering::Relaxed); + info!("successfully pinged gateway={} ping={:>9}", self.url, ping_duration.to_string()); + MetricsUpdate::send_ref( + uuid(), + Metric::SetGauge(Gauge::PortalGatewayPingLatencyMs(*self.address), ping_duration.as_millis()), + &self.metrics, + ); + } + Err(err) => { + // TODO: specifically handle authentication error by removing client? + error!(%err, ?self, "failed to ping gateway"); + self.active.store(false, Ordering::Relaxed); + } + } + } + + pub fn is_active(&self) -> bool { + self.active.load(Ordering::Relaxed) + } + + /// Try to reauthenticate with the given gateway + pub async fn attempt_authentication( + &self, + timestamp: u64, + signature: Signature, + timeout: Duration, + ) -> eyre::Result<()> { + let mut client = self.client.write().await; + + // Invalidate the old client and request a new token. + // Attempt to reuse the client if one exists + let response = match client.take() { + None => { + let temp_client = create_client(self.url.clone(), timeout)?; + temp_client.authenticate_proposer(timestamp, signature).await + } + Some(old_client) => old_client.authenticate_proposer(timestamp, signature).await, + } + .map_err(|err| eyre!("gateway authentication RPC failed: {err}"))?; + + let auth_layer = GatewayClientAuthLayer::new(&response.token)?; + let middleware = tower::ServiceBuilder::default().layer(auth_layer); + + let new_client = HttpClientBuilder::default() + .max_request_size(u32::MAX) + .max_response_size(u32::MAX) + .set_http_middleware(middleware) + .request_timeout(timeout.into()) + .build(self.url.clone())?; + + client.replace(new_client); + + info!(url = %self.url, "authenticated gateway"); + + Ok(()) + } + + pub async fn client(&self) -> Option { + self.client.read().await.clone() + } +} + +impl fmt::Debug for Gateway { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.url) + } +} + +impl Gateway { + fn new(url: Url, address: Address, registry_index: usize, metrics: Producer) -> Self { + Self { + url, + address: Arc::new(address), + client: Arc::new(RwLock::new(None)), + ping: Arc::new(RwLock::new(Duration::from_millis(0))), + active: Arc::new(AtomicBool::new(false)), + registry_index: Arc::new(AtomicU64::new(registry_index as u64)), + metrics, + } + } +} + +pub type GatewayInstance = Arc; + +pub struct GatewayManager { + gateways: Arc>>, + pub registry_client: RpcClient, + current_gateway: Arc>>, + pub metrics: Producer, + + // TODO: refactor out in dedicated auth structs + authentication_signer: Arc, + + gateway_timeout: Duration, +} + +impl GatewayManager { + pub fn new( + registry_client: RpcClient, + metrics: Producer, + signer: Arc, + gateway_timeout: Duration, + ) -> Self { + Self { + gateways: Arc::new(RwLock::new(IndexMap::new())), + registry_client, + current_gateway: Arc::new(RwLock::new(None)), + metrics, + authentication_signer: signer, + gateway_timeout, + } + } + + pub fn new_from_args(args: &PortalArgs, signer: Arc, metrics: Producer) -> Self { + let registry_client_url = args.registry_url.clone(); + let timeout = Duration::from_millis(args.registry_timeout_ms); + let registry_client = create_client(registry_client_url, timeout).unwrap(); + let gateway_timeout = Duration::from_millis(args.gateway_timeout_ms); + Self::new(registry_client, metrics, signer, gateway_timeout) + } + + pub async fn update_gateway_list(&self) -> eyre::Result<()> { + let raw_gateways = self.registry_client.registered_gateways().await?; + let mut gateways = self.gateways.write().await; + + gateways.retain(|url, _| raw_gateways.iter().any(|(u, _)| u == url)); + + let expected_gateways = raw_gateways.len(); + + for (index, (url, address)) in raw_gateways.into_iter().enumerate() { + match gateways.get_mut(&url) { + Some(gateway) => { + let gateway = Arc::make_mut(gateway); + gateway.address = Arc::new(address); + gateway.registry_index.store(index as u64, Ordering::Relaxed); + } + None => { + let gateway = Gateway::new(url.clone(), address, index, self.metrics.clone()); + gateways.insert(url, Arc::new(gateway)); + } + } + } + + let actual_gateways = gateways.len(); + if actual_gateways != expected_gateways { + error!("Mismatch in number of gateways: expected {}, found {}", expected_gateways, actual_gateways); + debug_panic!("Mismatch in number of gateways: expected {}, found {}", expected_gateways, actual_gateways); + } + + gateways.sort_by(|_, v1, _, v2| { + let index1 = v1.registry_index.load(Ordering::Relaxed); + let index2 = v2.registry_index.load(Ordering::Relaxed); + index1.cmp(&index2) + }); + Ok(()) + } + + async fn get_next_available_gateway(&self, start_index: usize) -> Option { + let gateways = self.gateways.read().await; + let len = gateways.len(); + for i in 0..len { + let index = (start_index + i) % len; + if let Some((_, gateway)) = gateways.get_index(index) { + if gateway.is_active() { + return Some(Arc::clone(gateway)); + } + } + } + None + } + + // this method should be called at the block transition (end of block / start of new block) + pub async fn decide_current_gateway(&self) -> Option { + let gateways = self.gateways.read().await; + + let gateway = match self.registry_client.current_gateway().await { + Ok((_, url, _)) => match gateways.get_index_of(&url) { + Some(index) => self.get_next_available_gateway(index).await, + None => { + error!("Current registry gateway not found in local list: {}", url); + None + } + }, + Err(_) => { + error!("Failed to fetch current gateway from registry"); + None + } + }?; + + Self::prepare_gateway(&self.authentication_signer, self.gateway_timeout, &gateway).await.ok()?; + + self.current_gateway.write().await.replace(gateway.clone()); + + MetricsUpdate::send_ref( + uuid(), + Metric::SetGauge(Gauge::PortalCurrentGatewayRegistryAddress(*gateway.address), 1.0), + &self.metrics, + ); + + Some(gateway) + } + + async fn authenticate_gateway( + signer: &ECDSASigner, + timeout: Duration, + gateway: GatewayInstance, + valid_from: SystemTime, + ) -> eyre::Result<()> { + let timestamp = valid_from.duration_since(std::time::UNIX_EPOCH).expect("Time went backwards").as_secs(); + + let message = gateway_auth_message(*gateway.address, timestamp); + let signature = + signer.sign_message(message).map_err(|err| eyre!("failed to sign gateway auth payload: {err}"))?; + + gateway.attempt_authentication(timestamp, signature, timeout).await?; + + Ok(()) + } + + /// Attempts to authenticate with the gateway if it's unauthenticated + async fn prepare_gateway(signer: &ECDSASigner, timeout: Duration, gateway: &GatewayInstance) -> eyre::Result<()> { + if gateway.client().await.is_some() { + return Ok(()); + } + + if let Err(err) = Self::authenticate_gateway(signer, timeout, gateway.clone(), SystemTime::now()).await { + error!(%err, url = %gateway.url, "failed to authenticate gateway"); + return Err(err); + } + + Ok(()) + } + + pub async fn health_check(&self) { + let timeout = self.gateway_timeout; + let gateways = self.gateways.read().await; + for gateway in gateways.values().collect::>() { + let gateway = gateway.clone(); + let signer = self.authentication_signer.clone(); + + tokio::spawn(async move { + gateway.health_check().await; + // TODO: only if health check fails due to auth + _ = Self::prepare_gateway(&signer, timeout, &gateway).await; + }); + } + } + + pub async fn current_gateway(&self) -> Option { + self.current_gateway.read().await.as_ref().cloned() + } + + async fn _send_fcu( + fork_choice_state: ForkchoiceState, + payload_attributes: Option, + gateway: GatewayInstance, + ) { + let Some(client) = gateway.client().await else { + warn!(url = %gateway.url, "cannot send fork choice update to unauthenticated gateway"); + return; + }; + + match client.fork_choice_updated_v3(fork_choice_state, payload_attributes).await { + Ok(res) => { + if res.is_valid() { + trace!(?gateway, ?res, "gateway response"); + } else { + trace!(?gateway, ?res, "Error: gateway response"); + } + } + Err(err) => trace!(%err, "Error: failed gateway"), + } + debug!(?gateway, "served fcu") + } + + pub async fn send_fcu(&self, fork_choice_state: ForkchoiceState, payload_attributes: Option) { + match self.current_gateway().await { + Some(gateway) => { + if gateway.is_active() { + tokio::spawn(Self::_send_fcu(fork_choice_state, payload_attributes, gateway)); + } else { + error!("Current gateway is not active, cannot send fork choice update"); + } + } + None => { + for gateway in self.gateways.read().await.values() { + let payload_attributes = payload_attributes.clone(); + tokio::spawn(Self::_send_fcu(fork_choice_state, payload_attributes, gateway.clone())); + } + } + } + } + + pub async fn broadcast_new_payload_v3( + &self, + payload: ExecutionPayloadV3, + versioned_hashes: Vec, + parent_beacon_block_root: B256, + ) { + for gateway in self.gateways.read().await.values() { + let gateway = gateway.clone(); + let payload = payload.clone(); + let versioned_hashes = versioned_hashes.clone(); + tokio::spawn( + async move { + let Some(client) = gateway.client().await else { + warn!(url = %gateway.url, "skipping newPayloadV3 broadcast to unauthenticated gateway"); + return; + }; + match client.new_payload_v3(payload, versioned_hashes, parent_beacon_block_root).await { + Ok(res) => { + if res.is_valid() { + debug!(?gateway, ?res, "gateway response"); + } else { + error!(?gateway, ?res, "gateway response"); + } + } + Err(ClientError::Call(_)) => {} + Err(err) => error!(?gateway, %err, "failed gateway"), + } + } + .in_current_span(), + ); + } + } + + pub async fn broadcast_new_payload_v4( + &self, + payload: OpExecutionPayloadV4, + versioned_hashes: Vec, + parent_beacon_block_root: B256, + requests: RequestsOrHash, + ) { + for gateway in self.gateways.read().await.values() { + let gateway = gateway.clone(); + let payload = payload.clone(); + let requests = requests.clone(); + let versioned_hashes = versioned_hashes.clone(); + tokio::spawn( + async move { + let Some(client) = gateway.client().await else { + warn!(url = %gateway.url, "skipping newPayloadV4 broadcast to unauthenticated gateway"); + return; + }; + match client.new_payload_v4(payload, versioned_hashes, parent_beacon_block_root, requests).await { + Ok(res) => { + if res.is_valid() { + debug!(?gateway, ?res, "gateway response"); + } else { + error!(?gateway, ?res, "gateway response"); + } + } + Err(ClientError::Call(_)) => {} + Err(err) => error!(?gateway, %err, "failed gateway"), + } + } + .in_current_span(), + ); + } + } + + pub async fn broadcast_set_max_da_size(&self, max_tx_size: U64, max_block_size: U64) { + for gateway in self.gateways.read().await.values() { + let gateway = gateway.clone(); + tokio::spawn( + async move { + let Some(client) = gateway.client().await else { + warn!(url = %gateway.url, "skipping miner_setMaxDASize for unauthenticated gateway"); + return; + }; + if let Err(err) = client.set_max_da_size(max_tx_size, max_block_size).await { + error!(?gateway, %err, "failed to forward miner_setMaxDASize"); + } + } + .in_current_span(), + ); + } + } +} diff --git a/based/bin/portal/src/server.rs b/based/bin/portal/src/server.rs index 642a2da95..8b8390651 100644 --- a/based/bin/portal/src/server.rs +++ b/based/bin/portal/src/server.rs @@ -61,18 +61,20 @@ impl PortalServer { let fallback_eth_client = create_client(args.fallback_eth_url.clone(), timeout)?; let op_node_client = create_client(args.op_node_url.clone(), timeout)?; let fallback_client = create_auth_client(args.fallback_url.clone(), args.fallback_jwt(), timeout)?; - let metrics = metrics_queue().into(); + let signer = Arc::new(args.gateway_signer()?); + let metrics: Producer = metrics_queue().into(); + let args = Arc::new(args); let temp = Self { geth_client: fallback_eth_client, geth_engine_client: fallback_client, op_node_client, - gateway_manager: Arc::new(GatewayManager::new_from_args(&args, metrics)), + gateway_manager: Arc::new(GatewayManager::new_from_args(&args, signer, metrics.clone())), new_payload_block_number: Arc::new(AtomicU64::new(0)), new_payload_block_hash: Arc::new(RwLock::new(B256::ZERO)), current_block_number: Arc::new(AtomicU64::new(0)), metrics, - args: Arc::new(args), + args, }; let _ = temp.gateway_manager.update_gateway_list().await; @@ -261,6 +263,9 @@ impl EngineApiServer for PortalServer { }); let Some(gateway) = self.gateway_manager.current_gateway().await else { return Ok(fallback_fut.await??) }; + let Some(gateway_client) = gateway.client().await else { return Ok(fallback_fut.await??) }; + let gateway_for_task = gateway.clone(); + let gateway_for_log = gateway.clone(); let gateway_fut: tokio::task::JoinHandle> = tokio::spawn( { @@ -268,8 +273,7 @@ impl EngineApiServer for PortalServer { let fallback_client = self.geth_engine_client.clone(); async move { - let gateway_payload = gateway - .client + let gateway_payload = gateway_client .get_payload_v4(payload_id) .await .inspect_err(|err| error!(%err, "failed gateway"))?; @@ -288,10 +292,10 @@ impl EngineApiServer for PortalServer { .inspect_err(|err| error!(%err, "failed fallback validation"))?; if payload_status.is_valid() { - trace!(?gateway, ?gateway_payload, ?payload_status, "gateway response"); + trace!(?gateway_for_task, ?gateway_payload, ?payload_status, "gateway response"); Ok(gateway_payload) } else { - error!(?gateway, ?gateway_payload, ?payload_status, "gateway response"); + error!(?gateway_for_task, ?gateway_payload, ?payload_status, "gateway response"); Err(RpcError::Internal) } } @@ -304,10 +308,11 @@ impl EngineApiServer for PortalServer { // ignore join errors let fallback = fallback?; let gateway = gateway?; - if let Ok(gateway) = gateway.as_ref() { + if let Ok(gateway_payload) = gateway.as_ref() { info!( "block {}: successfully served from based-gateway {:?}", - gateway.execution_payload.payload_inner.payload_inner.payload_inner.block_number, gateway + gateway_payload.execution_payload.payload_inner.payload_inner.payload_inner.block_number, + gateway_for_log ); MetricsUpdate::send_ref( uuid(), diff --git a/based/crates/common/src/api.rs b/based/crates/common/src/api.rs index d05b513c9..adbd48886 100644 --- a/based/crates/common/src/api.rs +++ b/based/crates/common/src/api.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use alloy_eips::eip7685::RequestsOrHash; -use alloy_primitives::{Address, B256, Bytes, U64}; +use alloy_primitives::{Address, B256, Bytes, Signature, U64}; use alloy_rpc_types::engine::{ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus}; use jsonrpsee::proc_macros::rpc; use op_alloy_consensus::OpTxEnvelope; @@ -83,25 +83,47 @@ pub trait MinimalEthApi { async fn send_raw_transaction(&self, bytes: Bytes) -> RpcResult; } +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct GatewayAuthentication { + pub token: String, + pub challenger: Address, +} + +#[rpc(client, server, namespace = "based")] +pub trait BasedAuthApi { + /// Returns an authentication challenge with the given parameters, ready for signing. + #[method(name = "authenticationChallenge")] + async fn authentication_challenge(&self, valid_from: u64) -> RpcResult; + + /// Authenticate the Proposer who signed the authentication challenge. + /// + /// The JWT will be valid from the specified `valid_from` timestamp, expressed in seconds since UNIX_EPOCH. + #[method(name = "authenticateProposer")] + async fn authenticate_proposer(&self, valid_from: u64, signature: Signature) -> RpcResult; +} + #[rpc(client, server, namespace = "registry")] pub trait RegistryApi { /// Returns the future blocknumber and corresponding gateway url and address #[method(name = "futureGateway")] - async fn get_future_gateway(&self, n_blocks_into_future: u64) -> RpcResult<(u64, Url, Address, B256)>; + async fn get_future_gateway(&self, n_blocks_into_future: u64) -> RpcResult<(u64, Url, Address)>; /// Returns the current blocknumber and corresponding gateway url and address #[method(name = "currentGateway")] - async fn current_gateway(&self) -> RpcResult<(u64, Url, Address, B256)> { + async fn current_gateway(&self) -> RpcResult<(u64, Url, Address)> { self.get_future_gateway(0).await } /// Returns the current blocknumber and corresponding gateway url and address #[method(name = "registeredGateways")] - async fn registered_gateways(&self) -> RpcResult>; + async fn registered_gateways(&self) -> RpcResult>; /// Returns the current blocknumber and corresponding gateway url and address #[method(name = "registerGateway")] - async fn register_gateway(&self, gateway: (Url, Address, B256)) -> RpcResult<()>; + async fn register_gateway(&self, gateway: (Url, Address)) -> RpcResult<()>; + + #[method(name = "gatewayUpdateBlocks")] + async fn gateway_update_blocks(&self) -> RpcResult; } #[rpc(client, server, namespace = "portal")] diff --git a/based/crates/common/src/auth.rs b/based/crates/common/src/auth.rs new file mode 100644 index 000000000..47cec277b --- /dev/null +++ b/based/crates/common/src/auth.rs @@ -0,0 +1,10 @@ +use alloy_primitives::{Address, B256, keccak256}; + +/// Hashes the tuple `(gateway_address, token_valid_from)` to authenticate a given portal to the gateway +pub fn gateway_auth_message(gateway: Address, valid_from: u64) -> B256 { + let mut encoded = [0u8; 28]; + encoded[..20].copy_from_slice(gateway.as_slice()); + encoded[20..].copy_from_slice(valid_from.to_le_bytes().as_slice()); + + keccak256(encoded) +} diff --git a/based/crates/common/src/config.rs b/based/crates/common/src/config.rs index 08ee48775..6f1d92241 100644 --- a/based/crates/common/src/config.rs +++ b/based/crates/common/src/config.rs @@ -1,6 +1,6 @@ use std::{net::Ipv4Addr, ops::RangeInclusive, path::PathBuf, str::FromStr, sync::Arc}; -use alloy_rpc_types::engine::JwtSecret; +use alloy_primitives::Address; use clap::Parser; use reqwest::Url; use reth_cli::chainspec::ChainSpecParser; @@ -38,8 +38,9 @@ pub struct GatewayArgs { pub rpc_port_no_auth: u16, #[arg(long = "rpc.port_ws", default_value_t = 9999)] pub rpc_port_ws: u16, - #[arg(long = "rpc.jwt")] - pub rpc_jwt: String, + /// Address that identifies this gateway inside the registry. + #[arg(long = "gateway.address")] + pub gateway_address: Address, /// Url to an L2 eth api rpc #[arg(long = "eth_client.url", default_value = "http://localhost:8545")] pub eth_client_url: Url, @@ -127,15 +128,13 @@ pub struct GatewayArgs { /// Port for prometheus server #[arg(long = "metrics.port", default_value_t = 9464)] pub metrics_port: u16, + + /// Duration (in minutes) of the authentication token that portals may request. + #[arg(long = "auth.token_duration", default_value_t = 720)] + pub auth_duration: u64, } impl GatewayArgs { - pub fn sequencer_jwt(&self) -> JwtSecret { - JwtSecret::from_hex(&self.rpc_jwt) - .or_else(|_| JwtSecret::from_file(std::path::Path::new(&self.rpc_jwt))) - .expect("Couldn't parse sequencer_jwt") - } - pub fn gossip_signer_private_key(&self) -> Option { self.gossip_signer_private_key.as_ref().and_then(|key_str| { // Try to parse as hex first diff --git a/based/crates/common/src/lib.rs b/based/crates/common/src/lib.rs index f7b397865..7dfc0bb6f 100644 --- a/based/crates/common/src/lib.rs +++ b/based/crates/common/src/lib.rs @@ -1,5 +1,6 @@ pub mod actor; pub mod api; +pub mod auth; pub mod communication; pub mod config; pub mod db; diff --git a/based/crates/rpc/Cargo.toml b/based/crates/rpc/Cargo.toml index c827ae345..819819130 100644 --- a/based/crates/rpc/Cargo.toml +++ b/based/crates/rpc/Cargo.toml @@ -23,6 +23,11 @@ tokio.workspace = true tower.workspace = true tracing.workspace = true tree_hash.workspace = true +dashmap = "6.1" +hex = "0.4" +http.workspace = true +thiserror.workspace = true +http-body-util = "0.1.3" [dev-dependencies] alloy-consensus.workspace = true diff --git a/based/crates/rpc/src/auth.rs b/based/crates/rpc/src/auth.rs new file mode 100644 index 000000000..5f2ff27d5 --- /dev/null +++ b/based/crates/rpc/src/auth.rs @@ -0,0 +1,348 @@ +use std::{ + future::Future, + pin::Pin, + sync::Arc, + time::{Duration, SystemTime}, +}; + +use alloy_primitives::{Address, B256, Signature}; +use alloy_rpc_types::engine::{Claims, JwtSecret}; +use bop_common::{ + api::{BasedAuthApiServer, GatewayAuthentication}, + auth::gateway_auth_message, + communication::messages::{RpcError, RpcResult}, + config::GatewayArgs, +}; +use dashmap::DashMap; +use http::{HeaderMap, StatusCode}; +use jsonrpsee::{ + MethodResponse, RpcModule, + core::{ + async_trait, + middleware::{Request, RpcServiceT}, + }, + http_client::{HttpBody, HttpRequest, HttpResponse}, + types::{ErrorObject, error::INVALID_PARAMS_CODE}, +}; +use thiserror::Error; +use tower::{Layer, Service}; +use tracing::{error, info}; + +#[derive(Clone, Debug)] +pub struct AuthConfig { + /// Represents the current gateway's address + pub gateway_address: Address, + /// Represents the durationfor which the token will remain valid + pub token_validity: Duration, +} + +impl From<&GatewayArgs> for AuthConfig { + fn from(args: &GatewayArgs) -> Self { + Self { gateway_address: args.gateway_address, token_validity: Duration::from_secs(args.auth_duration * 60) } + } +} + +#[derive(Debug)] +pub struct AuthEntry { + pub secret: JwtSecret, + pub expires_at: SystemTime, +} + +impl AuthEntry { + // TODO: better id that doesn't leak secret data + fn id(&self) -> B256 { + B256::from_slice(self.secret.as_bytes()) + } +} + +#[derive(Debug)] +pub struct AuthManager { + entries: DashMap>, + cfg: AuthConfig, +} + +impl AuthManager { + pub fn new(cfg: AuthConfig) -> Self { + Self { entries: DashMap::new(), cfg } + } + + pub fn config(&self) -> &AuthConfig { + &self.cfg + } + + fn purge(&self, now: SystemTime) { + self.entries.retain(|_, entry| entry.expires_at >= now); + } + + pub fn issue(&self, challenger: Address, issued_at: SystemTime) -> GatewayAuthentication { + let secret = JwtSecret::random(); + let expiry = issued_at + self.cfg.token_validity; + + let entry = Arc::new(AuthEntry { secret: secret.clone(), expires_at: expiry.clone() }); + + self.entries.insert(entry.id(), entry.clone()); + info!(%challenger, "issued JWT secret"); + + let expiry = expiry.duration_since(std::time::UNIX_EPOCH).expect("Time went backwards").as_secs(); + let issued_at = issued_at.duration_since(std::time::UNIX_EPOCH).expect("Time went backwards").as_secs(); + + let claims = Claims { exp: Some(expiry), iat: issued_at }; + + GatewayAuthentication { token: secret.encode(&claims).expect("able to encode JWT claims"), challenger } + } + + pub fn validate(&self, token: &str) -> Result, AuthError> { + let now = SystemTime::now(); + self.purge(now); + + for entry in self.entries.iter() { + if entry.secret.validate(token).is_ok() { + return Ok(entry.clone()); + } + } + + Err(AuthError::UnknownToken) + } +} + +#[derive(Debug, Error)] +pub enum AuthError { + #[error("token did not match any active secret")] + UnknownToken, +} + +/// Produced by [`GatewayAuthLayer`] to indicate if the token was valid or not +#[derive(Clone)] +enum Authentication { + Valid, + Invalid, +} + +/// Produced by [`GatewayRPCAuthLayer`] to indicate if the method was allowed or not +#[derive(Clone, Copy)] +enum AuthenticationResult { + Allowed, + Disallowed, +} + +/// Interprets the JWT provided in the request's authorization header +/// +/// Companion to [`GatewayRPCAuthLayer`] +pub struct GatewayAuthLayer { + manager: Arc, +} + +impl GatewayAuthLayer { + pub fn new(manager: Arc) -> Self { + Self { manager } + } +} + +impl Layer for GatewayAuthLayer +where + S: Service, + S::Future: Send + 'static, +{ + type Service = GatewayAuthService; + + fn layer(&self, inner: S) -> Self::Service { + GatewayAuthService { manager: self.manager.clone(), inner } + } +} + +#[derive(Clone)] +pub struct GatewayAuthService { + manager: Arc, + inner: S, +} + +impl Service for GatewayAuthService +where + S: Service, + S::Future: Send + 'static, +{ + type Response = HttpResponse; + type Error = S::Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + self.inner.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, mut req: HttpRequest) -> Self::Future { + let auth = match extract_bearer(req.headers()) { + None => Authentication::Invalid, + Some(token) => { + if self.manager.validate(&token).is_err() { + Authentication::Invalid + } else { + Authentication::Valid + } + } + }; + + // Add authentication result as request extension + req.extensions_mut().insert(auth); + + let fut = self.inner.call(req); + + let fut = async move { + fut.await.map(|response| { + let ext = response.extensions(); + + let Some(auth) = ext.get::() else { + // RPC layer not used, ignore + return response; + }; + + if matches!(auth, AuthenticationResult::Disallowed) { + err_response("Method requires authentication".to_string()) + } else { + response + } + }) + }; + + Box::pin(fut) + } +} + +fn extract_bearer(headers: &HeaderMap) -> Option { + let header = headers.get(http::header::AUTHORIZATION)?; + let auth = header.to_str().ok()?; + auth.strip_prefix("Bearer ").map(str::to_owned) +} + +fn err_response(err: String) -> HttpResponse { + HttpResponse::builder() + .status(StatusCode::UNAUTHORIZED) + .body(HttpBody::new(err)) + .expect("failed to build unauthorized response") +} + +/// Enforces authentication for the RPC methods NOT in `excluded_methods` +/// +/// Companion to [`GatewayAuthLayer`] +#[derive(Clone)] +pub struct GatewayRPCAuthLayer { + excluded_methods: Arc>, +} + +impl GatewayRPCAuthLayer { + pub fn new(excluded_method_names: Arc>) -> Self { + Self { excluded_methods: excluded_method_names } + } + + pub fn exclude(module: &RpcModule) -> Self { + let method_names = module.method_names().map(|s| s.to_owned()).collect::>().into_boxed_slice(); + Self::new(Arc::new(method_names)) + } +} + +impl Layer for GatewayRPCAuthLayer { + type Service = GatewayRPCAuth; + + fn layer(&self, inner: S) -> Self::Service { + GatewayRPCAuth { excluded_methods: self.excluded_methods.clone(), inner } + } +} + +pub struct GatewayRPCAuth { + excluded_methods: Arc>, + inner: S, +} + +impl RpcServiceT for GatewayRPCAuth +where + S: RpcServiceT, +{ + type MethodResponse = MethodResponse; + + type NotificationResponse = S::NotificationResponse; + + type BatchResponse = S::BatchResponse; + + fn call<'a>(&self, request: Request<'a>) -> impl Future + Send + 'a { + let auth = request.extensions().get::(); + + // bypass auth for the given method if the method is excluded + let bypass_auth = self.excluded_methods.iter().find(|method| *method == request.method_name()).is_some(); + + let authentication_result = match auth { + _ if bypass_auth => AuthenticationResult::Allowed, + Some(Authentication::Valid) => AuthenticationResult::Allowed, + _ => AuthenticationResult::Disallowed, + }; + + let id = request.id().clone(); + let fut = self.inner.call(request); + + async move { + let mut response = if matches!(authentication_result, AuthenticationResult::Allowed) { + fut.await + } else { + MethodResponse::error( + id, + ErrorObject::borrowed(INVALID_PARAMS_CODE, "Method requires authentication", None), + ) + }; + response.extensions_mut().insert(authentication_result); + + response + } + } + + fn batch<'a>( + &self, + requests: jsonrpsee::core::middleware::Batch<'a>, + ) -> impl Future + Send + 'a { + self.inner.batch(requests) + } + + fn notification<'a>( + &self, + n: jsonrpsee::core::middleware::Notification<'a>, + ) -> impl Future + Send + 'a { + self.inner.notification(n) + } +} + +#[derive(Clone)] +pub struct AuthRpc { + manager: Arc, +} + +impl AuthRpc { + pub fn new(manager: Arc) -> Self { + Self { manager } + } + + pub fn http_layer(&self) -> GatewayAuthLayer { + GatewayAuthLayer::new(self.manager.clone()) + } + + pub fn rpc_layer(&self) -> GatewayRPCAuthLayer { + let module = BasedAuthApiServer::into_rpc(self.clone()); + GatewayRPCAuthLayer::exclude(&module) + } +} + +#[async_trait] +impl BasedAuthApiServer for AuthRpc { + async fn authentication_challenge(&self, valid_from: u64) -> RpcResult { + Ok(gateway_auth_message(self.manager.config().gateway_address, valid_from)) + } + + async fn authenticate_proposer(&self, valid_from: u64, signature: Signature) -> RpcResult { + let payload_hash = self.authentication_challenge(valid_from).await?; + let challenger = signature + .recover_address_from_prehash(&payload_hash) + .map_err(|_| RpcError::Generic("invalid signature"))?; + + let valid_from = Duration::from_secs(valid_from); + let valid_from = SystemTime::UNIX_EPOCH + valid_from; + + // TODO: add authorization logic, verifying challenger may authenticate with this gateway + Ok(self.manager.issue(challenger, valid_from)) + } +} diff --git a/based/crates/rpc/src/lib.rs b/based/crates/rpc/src/lib.rs index f00b2d61d..3d7fab4cd 100644 --- a/based/crates/rpc/src/lib.rs +++ b/based/crates/rpc/src/lib.rs @@ -1,10 +1,9 @@ use std::{net::SocketAddr, sync::Arc}; use alloy_primitives::{B256, Bytes, U64}; -use alloy_rpc_types::engine::JwtSecret; use axum::{Router, routing::get}; use bop_common::{ - api::{ControlApiServer, EngineApiServer, MinimalEthApiServer, OpMinerExtApiServer}, + api::{BasedAuthApiServer, ControlApiServer, EngineApiServer, MinimalEthApiServer, OpMinerExtApiServer}, communication::{ Producer, Sender, Spine, messages::{EngineApi, RpcResult}, @@ -19,15 +18,18 @@ use bop_common::{ }; use jsonrpsee::{ core::async_trait, - server::{ServerBuilder, ServerConfigBuilder}, + server::{ServerBuilder, ServerConfigBuilder, middleware::rpc::RpcServiceBuilder}, }; use reth_optimism_payload_builder::config::OpDAConfig; -use reth_rpc_layer::{AuthLayer, JwtAuthValidator}; use tokio::{net::TcpListener, runtime::Runtime}; use tracing::{Level, error, info, trace}; -use crate::state_stream::{StreamState, state_stream}; +use crate::{ + auth::{AuthConfig, AuthManager, AuthRpc}, + state_stream::{StreamState, state_stream}, +}; +mod auth; mod engine; mod fabric; pub mod gossiper; @@ -45,7 +47,8 @@ pub fn start_rpc( let addr_auth = SocketAddr::new(config.rpc_host.into(), config.rpc_port); let addr_no_auth = SocketAddr::new(config.rpc_host.into(), config.rpc_port_no_auth); let addr_ws = SocketAddr::new(config.rpc_host.into(), config.rpc_port_ws); - let server = RpcServer::new(spine, config.sequencer_jwt(), rx_spawner, da_config); + let auth_manager = Arc::new(AuthManager::new(AuthConfig::from(config))); + let server = RpcServer::new(spine, auth_manager, rx_spawner, da_config); rt.spawn(server.run(addr_auth, addr_no_auth, addr_ws)); } @@ -56,16 +59,16 @@ struct RpcServer { new_order_tx: Sender>, engine_timeout: Duration, engine_rpc_tx: Sender, - jwt: JwtSecret, telemetry_producer: Producer, frag_receiver_spawner: tokio::sync::broadcast::Sender, da_config: OpDAConfig, + auth_manager: Arc, } impl RpcServer { pub fn new( spine: &Spine, - jwt: JwtSecret, + auth_manager: Arc, frag_receiver_spawner: tokio::sync::broadcast::Sender, da_config: OpDAConfig, ) -> Self { @@ -73,25 +76,29 @@ impl RpcServer { new_order_tx: spine.into(), engine_rpc_tx: spine.into(), engine_timeout: Duration::from_secs(1), - jwt, telemetry_producer: telemetry_queue().into(), frag_receiver_spawner, da_config, + auth_manager, } } #[tracing::instrument(skip_all, name = "rpc")] pub async fn run(self, addr_auth: SocketAddr, addr_no_auth: SocketAddr, addr_ws: SocketAddr) { info!(%addr_auth, "starting RPC server"); - let validator = JwtAuthValidator::new(self.jwt); - let auth_layer = AuthLayer::new(validator); - let service_builder = tower::ServiceBuilder::new().layer(auth_layer).timeout(std::time::Duration::from_secs(2)); + + let auth_rpc = AuthRpc::new(self.auth_manager.clone()); + + let http_middleware = + tower::ServiceBuilder::new().layer(auth_rpc.http_layer()).timeout(std::time::Duration::from_secs(2)); + let rpc_middleware = RpcServiceBuilder::new().layer(auth_rpc.rpc_layer()); let server_auth = ServerBuilder::default() .set_config( ServerConfigBuilder::new().max_request_body_size(u32::MAX).max_response_body_size(u32::MAX).build(), ) - .set_http_middleware(service_builder) + .set_http_middleware(http_middleware) + .set_rpc_middleware(rpc_middleware) .build(addr_auth) .await .expect("failed to create eth RPC server"); @@ -99,6 +106,7 @@ impl RpcServer { module.merge(EngineApiServer::into_rpc(self.clone())).expect("failed to merge modules"); module.merge(ControlApiServer::into_rpc(self.clone())).expect("failed to merge modules"); module.merge(OpMinerExtApiServer::into_rpc(self.clone())).expect("failed to merge modules"); + module.merge(BasedAuthApiServer::into_rpc(auth_rpc)).expect("failed to merge modules"); let server_handle_auth = server_auth.start(module); diff --git a/based/portal.just b/based/portal.just index f67e1cae3..806dfa541 100644 --- a/based/portal.just +++ b/based/portal.just @@ -26,10 +26,10 @@ l2_chain_id: (_rpc "portal_l2ChainId") rollup: (_rpc "portal_fileRollup") genesis: (_rpc "portal_fileGenesis") -@register-gateway url address jwt: +@register-gateway url address: # The portal returns null on success so we remove the -e flag JQ_ARGS="" {{self}} _rpc "registry_registerGateway" \ - "[[\"{{url}}\", \"{{address}}\", \"{{jwt}}\"]]" + "[[\"{{url}}\", \"{{address}}\"]]" # Start the portal service and view logs (for main sequencing node) start: @@ -55,4 +55,3 @@ stop: data=$(dirname $({{main_node}} _is-configured)) docker compose -f $data/compose.yml down {{service-name}} - diff --git a/chain-replay/registry.json b/chain-replay/registry.json index 2e91f83a8..68744e495 100644 --- a/chain-replay/registry.json +++ b/chain-replay/registry.json @@ -1,7 +1,6 @@ [ [ "http://0.0.0.0:9997", - "0x6Fd6766071Beb65E65ceae31de00efbA6342342a", - "0xbfea64567a978df0bcd973699554671aa20b20820bc40ba45dfed46025e2fa10" + "0x6Fd6766071Beb65E65ceae31de00efbA6342342a" ] ] diff --git a/main_node/compose.dev.yml b/main_node/compose.dev.yml index c0fb69193..026efc2db 100644 --- a/main_node/compose.dev.yml +++ b/main_node/compose.dev.yml @@ -156,6 +156,7 @@ services: - --op_node.url=http://0.0.0.0:${OP_NODE_RPC_PORT} - --registry.url=http://0.0.0.0:${REGISTRY_PORT} - --gateway.timeout_ms=200 + - --gateway.signing-key=${OP_PROPOSER_PRIVATE_KEY} - --log.dir=/var/log/app - --port=${PORTAL_PORT} volumes: @@ -256,4 +257,3 @@ services: # SSG_ENABLED: "false" # PORT: "${BLOCKSCOUT_PORT}" # # no ports: host-mode means Blockscout binds to 4000 on the host automatically - diff --git a/main_node/compose.prod.yml b/main_node/compose.prod.yml index 3f83181e2..acc9ff158 100644 --- a/main_node/compose.prod.yml +++ b/main_node/compose.prod.yml @@ -157,6 +157,7 @@ services: - --op_node.url=http://0.0.0.0:${OP_NODE_RPC_PORT} - --registry.url=http://0.0.0.0:${REGISTRY_PORT} - --gateway.timeout_ms=200 + - --gateway.signing-key=${OP_PROPOSER_PRIVATE_KEY} - --log.dir=/var/log/app - --port=${PORTAL_PORT} volumes: @@ -259,4 +260,3 @@ services: # SSG_ENABLED: "false" # PORT: "${BLOCKSCOUT_PORT}" # # no ports: host-mode means Blockscout binds to 4000 on the host automatically - diff --git a/main_node/registry_example.json b/main_node/registry_example.json index c96009167..b1776e5ae 100644 --- a/main_node/registry_example.json +++ b/main_node/registry_example.json @@ -1,3 +1,6 @@ [ - ["fill out this file and copy to config/registry.json. this field should be the url of a gateway e.g. http://0.0.0.0:9997", "0x0000000addres_of_signing_wallet", "0x00000_jwt_used_to_communicate_with_gateway"] + [ + "fill out this file and copy to config/registry.json. this field should be the url of a gateway e.g. http://0.0.0.0:9997", + "0x0000000addres_of_signing_wallet" + ] ] diff --git a/main_node_dockerized_network/compose.yml b/main_node_dockerized_network/compose.yml index e3d579002..e70baa74c 100644 --- a/main_node_dockerized_network/compose.yml +++ b/main_node_dockerized_network/compose.yml @@ -176,6 +176,7 @@ services: - --op_node.url=http://op-node:${OP_NODE_RPC_PORT} - --registry.url=http://based-registry:${REGISTRY_PORT} - --gateway.timeout_ms=200 + - --gateway.signing-key=${OP_PROPOSER_PRIVATE_KEY} volumes: - ./config:/config restart: unless-stopped @@ -265,4 +266,3 @@ services: SSG_ENABLED: "false" PORT: "${BLOCKSCOUT_PORT}" # no ports: host-mode means Blockscout binds to 4000 on the host automatically - diff --git a/main_node_dockerized_network/registry_example.json b/main_node_dockerized_network/registry_example.json index c96009167..b1776e5ae 100644 --- a/main_node_dockerized_network/registry_example.json +++ b/main_node_dockerized_network/registry_example.json @@ -1,3 +1,6 @@ [ - ["fill out this file and copy to config/registry.json. this field should be the url of a gateway e.g. http://0.0.0.0:9997", "0x0000000addres_of_signing_wallet", "0x00000_jwt_used_to_communicate_with_gateway"] + [ + "fill out this file and copy to config/registry.json. this field should be the url of a gateway e.g. http://0.0.0.0:9997", + "0x0000000addres_of_signing_wallet" + ] ] From 5f89bfe9c38947501dfe77658ec47d16e903e347 Mon Sep 17 00:00:00 2001 From: Francesco Dainese Date: Wed, 17 Dec 2025 21:21:23 +0100 Subject: [PATCH 2/9] chore: fmt --- based/bin/gateway_registry/src/main.rs | 4 ++-- based/bin/portal/src/cli.rs | 3 +-- based/bin/portal/src/clients/gateway.rs | 7 ++----- based/crates/rpc/src/auth.rs | 6 ++---- 4 files changed, 7 insertions(+), 13 deletions(-) diff --git a/based/bin/gateway_registry/src/main.rs b/based/bin/gateway_registry/src/main.rs index 14334029f..39c8ee330 100644 --- a/based/bin/gateway_registry/src/main.rs +++ b/based/bin/gateway_registry/src/main.rs @@ -206,8 +206,8 @@ impl RegistryApiServer for RegistryServer { if n_gateways == 0 { return Err(RpcError::Jsonrpsee(ClientError::Custom("No registered gateways".to_string()))); } - let target_block = u64::try_from(curblock + U256::from_limbs([1, 0, 0, 0])).map_err(|_| RpcError::Internal)? - + n_blocks_into_the_future; + let target_block = u64::try_from(curblock + U256::from_limbs([1, 0, 0, 0])).map_err(|_| RpcError::Internal)? + + n_blocks_into_the_future; let id = (target_block / self.gateway_update_blocks) as usize; let (url, address) = gateways[id % n_gateways].clone(); diff --git a/based/bin/portal/src/cli.rs b/based/bin/portal/src/cli.rs index a2bf26faf..7d7dc1db7 100644 --- a/based/bin/portal/src/cli.rs +++ b/based/bin/portal/src/cli.rs @@ -1,4 +1,4 @@ -use std::path::PathBuf; +use std::{fs, path::PathBuf}; use bop_common::{ config::{LoggingConfig, LoggingFlags}, @@ -7,7 +7,6 @@ use bop_common::{ use clap::Parser; use reqwest::Url; use reth_rpc_layer::JwtSecret; -use std::fs; use tracing::level_filters::LevelFilter; #[derive(Parser, Debug, Clone)] diff --git a/based/bin/portal/src/clients/gateway.rs b/based/bin/portal/src/clients/gateway.rs index a1e6759eb..0b6215a8e 100644 --- a/based/bin/portal/src/clients/gateway.rs +++ b/based/bin/portal/src/clients/gateway.rs @@ -35,9 +35,8 @@ use tokio::sync::RwLock; use tower::{Layer, Service}; use tracing::{Instrument, debug, error, info, trace, warn}; -use crate::{cli::PortalArgs, clients::create_client}; - use super::RpcClient; +use crate::{cli::PortalArgs, clients::create_client}; pub type GatewayClient = HttpClient>>>; @@ -70,11 +69,9 @@ impl Service> for GatewayClientService where S: Service>, { - type Response = S::Response; - type Error = S::Error; - type Future = S::Future; + type Response = S::Response; fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll> { self.inner.poll_ready(cx) diff --git a/based/crates/rpc/src/auth.rs b/based/crates/rpc/src/auth.rs index 5f2ff27d5..ecc0bf335 100644 --- a/based/crates/rpc/src/auth.rs +++ b/based/crates/rpc/src/auth.rs @@ -161,9 +161,9 @@ where S: Service, S::Future: Send + 'static, { - type Response = HttpResponse; type Error = S::Error; type Future = Pin> + Send>>; + type Response = HttpResponse; fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll> { self.inner.poll_ready(cx).map_err(Into::into) @@ -256,12 +256,10 @@ impl RpcServiceT for GatewayRPCAuth where S: RpcServiceT, { + type BatchResponse = S::BatchResponse; type MethodResponse = MethodResponse; - type NotificationResponse = S::NotificationResponse; - type BatchResponse = S::BatchResponse; - fn call<'a>(&self, request: Request<'a>) -> impl Future + Send + 'a { let auth = request.extensions().get::(); From 8053316e1429ace783b6c650dadd1da354ecc4fd Mon Sep 17 00:00:00 2001 From: Francesco Dainese Date: Wed, 17 Dec 2025 21:29:56 +0100 Subject: [PATCH 3/9] chore: clippy --- based/bin/portal/src/clients/gateway.rs | 2 +- based/bin/portal/src/server.rs | 2 +- based/crates/rpc/src/auth.rs | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/based/bin/portal/src/clients/gateway.rs b/based/bin/portal/src/clients/gateway.rs index 0b6215a8e..509447c4a 100644 --- a/based/bin/portal/src/clients/gateway.rs +++ b/based/bin/portal/src/clients/gateway.rs @@ -242,7 +242,7 @@ impl GatewayManager { gateway.registry_index.store(index as u64, Ordering::Relaxed); } None => { - let gateway = Gateway::new(url.clone(), address, index, self.metrics.clone()); + let gateway = Gateway::new(url.clone(), address, index, self.metrics); gateways.insert(url, Arc::new(gateway)); } } diff --git a/based/bin/portal/src/server.rs b/based/bin/portal/src/server.rs index 8b8390651..5358d10fa 100644 --- a/based/bin/portal/src/server.rs +++ b/based/bin/portal/src/server.rs @@ -69,7 +69,7 @@ impl PortalServer { geth_client: fallback_eth_client, geth_engine_client: fallback_client, op_node_client, - gateway_manager: Arc::new(GatewayManager::new_from_args(&args, signer, metrics.clone())), + gateway_manager: Arc::new(GatewayManager::new_from_args(&args, signer, metrics)), new_payload_block_number: Arc::new(AtomicU64::new(0)), new_payload_block_hash: Arc::new(RwLock::new(B256::ZERO)), current_block_number: Arc::new(AtomicU64::new(0)), diff --git a/based/crates/rpc/src/auth.rs b/based/crates/rpc/src/auth.rs index ecc0bf335..cd9082d61 100644 --- a/based/crates/rpc/src/auth.rs +++ b/based/crates/rpc/src/auth.rs @@ -26,7 +26,7 @@ use jsonrpsee::{ }; use thiserror::Error; use tower::{Layer, Service}; -use tracing::{error, info}; +use tracing::info; #[derive(Clone, Debug)] pub struct AuthConfig { @@ -78,7 +78,7 @@ impl AuthManager { let secret = JwtSecret::random(); let expiry = issued_at + self.cfg.token_validity; - let entry = Arc::new(AuthEntry { secret: secret.clone(), expires_at: expiry.clone() }); + let entry = Arc::new(AuthEntry { secret, expires_at: expiry }); self.entries.insert(entry.id(), entry.clone()); info!(%challenger, "issued JWT secret"); @@ -264,7 +264,7 @@ where let auth = request.extensions().get::(); // bypass auth for the given method if the method is excluded - let bypass_auth = self.excluded_methods.iter().find(|method| *method == request.method_name()).is_some(); + let bypass_auth = self.excluded_methods.iter().any(|method| method == request.method_name()); let authentication_result = match auth { _ if bypass_auth => AuthenticationResult::Allowed, From 471c0985b6396d15cb0ac557e6fc7a04c517d72b Mon Sep 17 00:00:00 2001 From: Francesco Dainese Date: Thu, 18 Dec 2025 15:27:16 +0100 Subject: [PATCH 4/9] refactor: replace RwLocks in GatewayInstance --- based/Cargo.lock | 7 +++++ based/Cargo.toml | 3 ++- based/bin/portal/Cargo.toml | 1 + based/bin/portal/src/clients/gateway.rs | 36 +++++++++++++------------ based/bin/portal/src/server.rs | 2 +- 5 files changed, 30 insertions(+), 19 deletions(-) diff --git a/based/Cargo.lock b/based/Cargo.lock index 71ea4ed68..fb0822823 100644 --- a/based/Cargo.lock +++ b/based/Cargo.lock @@ -1006,6 +1006,12 @@ dependencies = [ "derive_arbitrary", ] +[[package]] +name = "arc-cell" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fec9da9adf9420d86def101bd5b4a227b0512d456b6a128b0d677fdf68e5f7b8" + [[package]] name = "ark-bls12-381" version = "0.5.0" @@ -1570,6 +1576,7 @@ dependencies = [ "alloy-eips", "alloy-primitives", "alloy-rpc-types", + "arc-cell", "bop-common", "bop-metrics", "clap", diff --git a/based/Cargo.toml b/based/Cargo.toml index e5df44acc..bf4e0f5f9 100644 --- a/based/Cargo.toml +++ b/based/Cargo.toml @@ -22,9 +22,10 @@ alloy-signer = "1.0.41" alloy-signer-local = "1.0.41" alloy-transport = "1.0.41" alloy-transport-http = "1.0.41" + +arc-cell = "0.3.3" auto_impl = "1.3.0" axum = { version = "0.8.1", features = ["macros", "ws"] } - backtrace = "0.3.73" bitflags = "2.6.0" bop-common = { path = "crates/common" } diff --git a/based/bin/portal/Cargo.toml b/based/bin/portal/Cargo.toml index 4d567c49f..e2c364c5d 100644 --- a/based/bin/portal/Cargo.toml +++ b/based/bin/portal/Cargo.toml @@ -8,6 +8,7 @@ version.workspace = true alloy-eips.workspace = true alloy-primitives.workspace = true alloy-rpc-types.workspace = true +arc-cell.workspace = true bop-common.workspace = true bop-metrics.workspace = true clap.workspace = true diff --git a/based/bin/portal/src/clients/gateway.rs b/based/bin/portal/src/clients/gateway.rs index 509447c4a..2d74b1dcc 100644 --- a/based/bin/portal/src/clients/gateway.rs +++ b/based/bin/portal/src/clients/gateway.rs @@ -10,6 +10,7 @@ use std::{ use alloy_eips::eip7685::RequestsOrHash; use alloy_primitives::{Address, B256, Signature, U64}; use alloy_rpc_types::engine::{ExecutionPayloadV3, ForkchoiceState}; +use arc_cell::{ArcCell, OptionalArcCell}; use bop_common::{ api::{BasedAuthApiClient, ControlApiClient, EngineApiClient, OpMinerExtApiClient, RegistryApiClient}, auth::gateway_auth_message, @@ -87,8 +88,8 @@ where pub struct Gateway { pub url: Url, pub address: Arc
, - pub client: Arc>>, - pub ping: Arc>, + pub client: OptionalArcCell, + pub ping: ArcCell, pub active: Arc, pub registry_index: Arc, pub metrics: Producer, @@ -96,15 +97,16 @@ pub struct Gateway { impl Gateway { pub async fn health_check(&self) { - let Some(client) = self.client.read().await.clone() else { + let Some(client) = self.client.get() else { self.active.store(false, Ordering::Relaxed); return; }; + let ping_start = Instant::now(); - match ControlApiClient::heartbeat(&client).await { + match ControlApiClient::heartbeat(&*client).await { Ok(_) => { let ping_duration = ping_start.elapsed(); - *self.ping.write().await = ping_duration; + self.ping.set(ping_duration.into()); self.active.store(true, Ordering::Relaxed); info!("successfully pinged gateway={} ping={:>9}", self.url, ping_duration.to_string()); MetricsUpdate::send_ref( @@ -132,11 +134,11 @@ impl Gateway { signature: Signature, timeout: Duration, ) -> eyre::Result<()> { - let mut client = self.client.write().await; + let client = self.client.take(); // Invalidate the old client and request a new token. // Attempt to reuse the client if one exists - let response = match client.take() { + let response = match client { None => { let temp_client = create_client(self.url.clone(), timeout)?; temp_client.authenticate_proposer(timestamp, signature).await @@ -155,15 +157,15 @@ impl Gateway { .request_timeout(timeout.into()) .build(self.url.clone())?; - client.replace(new_client); + self.client.set(Some(new_client.into())); info!(url = %self.url, "authenticated gateway"); Ok(()) } - pub async fn client(&self) -> Option { - self.client.read().await.clone() + pub fn client(&self) -> Option> { + self.client.get() } } @@ -178,8 +180,8 @@ impl Gateway { Self { url, address: Arc::new(address), - client: Arc::new(RwLock::new(None)), - ping: Arc::new(RwLock::new(Duration::from_millis(0))), + client: Default::default(), + ping: Default::default(), active: Arc::new(AtomicBool::new(false)), registry_index: Arc::new(AtomicU64::new(registry_index as u64)), metrics, @@ -326,7 +328,7 @@ impl GatewayManager { /// Attempts to authenticate with the gateway if it's unauthenticated async fn prepare_gateway(signer: &ECDSASigner, timeout: Duration, gateway: &GatewayInstance) -> eyre::Result<()> { - if gateway.client().await.is_some() { + if gateway.client().is_some() { return Ok(()); } @@ -362,7 +364,7 @@ impl GatewayManager { payload_attributes: Option, gateway: GatewayInstance, ) { - let Some(client) = gateway.client().await else { + let Some(client) = gateway.client() else { warn!(url = %gateway.url, "cannot send fork choice update to unauthenticated gateway"); return; }; @@ -410,7 +412,7 @@ impl GatewayManager { let versioned_hashes = versioned_hashes.clone(); tokio::spawn( async move { - let Some(client) = gateway.client().await else { + let Some(client) = gateway.client() else { warn!(url = %gateway.url, "skipping newPayloadV3 broadcast to unauthenticated gateway"); return; }; @@ -445,7 +447,7 @@ impl GatewayManager { let versioned_hashes = versioned_hashes.clone(); tokio::spawn( async move { - let Some(client) = gateway.client().await else { + let Some(client) = gateway.client() else { warn!(url = %gateway.url, "skipping newPayloadV4 broadcast to unauthenticated gateway"); return; }; @@ -471,7 +473,7 @@ impl GatewayManager { let gateway = gateway.clone(); tokio::spawn( async move { - let Some(client) = gateway.client().await else { + let Some(client) = gateway.client() else { warn!(url = %gateway.url, "skipping miner_setMaxDASize for unauthenticated gateway"); return; }; diff --git a/based/bin/portal/src/server.rs b/based/bin/portal/src/server.rs index 5358d10fa..c316c36ef 100644 --- a/based/bin/portal/src/server.rs +++ b/based/bin/portal/src/server.rs @@ -263,7 +263,7 @@ impl EngineApiServer for PortalServer { }); let Some(gateway) = self.gateway_manager.current_gateway().await else { return Ok(fallback_fut.await??) }; - let Some(gateway_client) = gateway.client().await else { return Ok(fallback_fut.await??) }; + let Some(gateway_client) = gateway.client() else { return Ok(fallback_fut.await??) }; let gateway_for_task = gateway.clone(); let gateway_for_log = gateway.clone(); From 30709d26320c972721b292097576990c2112134e Mon Sep 17 00:00:00 2001 From: Francesco Dainese Date: Thu, 18 Dec 2025 16:21:56 +0100 Subject: [PATCH 5/9] test: gateway auth --- based/bin/portal/src/clients/gateway.rs | 43 +++++- based/crates/rpc/src/auth.rs | 189 ++++++++++++++++++++++++ 2 files changed, 231 insertions(+), 1 deletion(-) diff --git a/based/bin/portal/src/clients/gateway.rs b/based/bin/portal/src/clients/gateway.rs index 2d74b1dcc..2884aafad 100644 --- a/based/bin/portal/src/clients/gateway.rs +++ b/based/bin/portal/src/clients/gateway.rs @@ -47,7 +47,8 @@ pub struct GatewayClientAuthLayer { } impl GatewayClientAuthLayer { pub fn new(token: &str) -> Result { - let value = HeaderValue::from_str(token)?; + let header = if token.starts_with("Bearer ") { token.to_owned() } else { format!("Bearer {token}") }; + let value = HeaderValue::from_str(&header)?; Ok(Self { token: Arc::new(value) }) } } @@ -84,6 +85,46 @@ where } } +#[cfg(test)] +mod tests { + use std::convert::Infallible; + + use jsonrpsee::http_client::HttpRequest; + use reqwest::header::AUTHORIZATION; + use tower::{Layer, ServiceExt, service_fn}; + + use super::GatewayClientAuthLayer; + + #[tokio::test] + async fn gateway_client_auth_layer_prefixes_bearer() { + let layer = GatewayClientAuthLayer::new("token").expect("header should be valid"); + assert_eq!(layer.token.to_str().unwrap(), "Bearer token"); + + let layer = GatewayClientAuthLayer::new("Bearer token").expect("header should be valid"); + assert_eq!(layer.token.to_str().unwrap(), "Bearer token"); + } + + #[tokio::test] + async fn gateway_client_auth_layer_inserts_authorization_header() { + let layer = GatewayClientAuthLayer::new("token").expect("header should be valid"); + + // Simple service to retrieve the authorization header (which should have been added by the layer). + let svc = service_fn(|req: HttpRequest<()>| async move { + let auth = req.headers().get(AUTHORIZATION).cloned(); + Ok::<_, Infallible>(auth) + }); + + let svc = layer.layer(svc); + let auth = svc + .oneshot(HttpRequest::new(())) + .await + .expect("service call ok") + .expect("authorization header should have been extracted"); + + assert_eq!(auth.to_str().unwrap(), "Bearer token"); + } +} + #[derive(Clone)] pub struct Gateway { pub url: Url, diff --git a/based/crates/rpc/src/auth.rs b/based/crates/rpc/src/auth.rs index cd9082d61..406bf9a54 100644 --- a/based/crates/rpc/src/auth.rs +++ b/based/crates/rpc/src/auth.rs @@ -344,3 +344,192 @@ impl BasedAuthApiServer for AuthRpc { Ok(self.manager.issue(challenger, valid_from)) } } + +#[cfg(test)] +mod tests { + use std::{net::SocketAddr, sync::Arc, time::SystemTime}; + + use bop_common::{ + api::{BasedAuthApiServer, ControlApiServer}, + auth::gateway_auth_message, + communication::messages::RpcResult, + signing::ECDSASigner, + }; + use jsonrpsee::server::{ServerBuilder, ServerHandle, middleware::rpc::RpcServiceBuilder}; + use serde_json::{Value, json}; + + use super::{AuthConfig, AuthManager, AuthRpc}; + + #[derive(Clone)] + struct TestControlRpc; + + #[jsonrpsee::core::async_trait] + impl ControlApiServer for TestControlRpc { + async fn heartbeat(&self) -> RpcResult<()> { + Ok(()) + } + } + + async fn post_jsonrpc(url: &str, body: Value, auth_header: Option<&str>) -> (reqwest::StatusCode, String) { + // We do the JSON-RPC request manually (instead of using jsonrpsee) to check the HTTP status code returned. + let client = reqwest::Client::new(); + let mut req = client.post(url).json(&body); + if let Some(value) = auth_header { + req = req.header(reqwest::header::AUTHORIZATION, value); + } + let resp = req.send().await.expect("request should succeed"); + let status = resp.status(); + let text = resp.text().await.expect("response body should be readable"); + (status, text) + } + + struct StartedServer { + url: String, + handle: ServerHandle, + gateway_address: alloy_primitives::Address, + } + + async fn start_server() -> StartedServer { + let gateway_address = alloy_primitives::Address::from([0x11u8; 20]); + let auth_manager = Arc::new(AuthManager::new(AuthConfig { + gateway_address, + token_validity: std::time::Duration::from_secs(60), + })); + + let auth_rpc = AuthRpc::new(auth_manager.clone()); + + let http_middleware = tower::ServiceBuilder::new().layer(auth_rpc.http_layer()); + let rpc_middleware = RpcServiceBuilder::new().layer(auth_rpc.rpc_layer()); + + let server = ServerBuilder::default() + .set_http_middleware(http_middleware) + .set_rpc_middleware(rpc_middleware) + .build(SocketAddr::from(([127, 0, 0, 1], 0))) + .await + .expect("server should build"); + + let addr = server.local_addr().expect("server should have local addr"); + let url = format!("http://{addr}"); + + let mut module = ControlApiServer::into_rpc(TestControlRpc); + module + .merge(BasedAuthApiServer::into_rpc(auth_rpc)) + .expect("failed to merge based auth rpc"); + + let handle = server.start(module); + + StartedServer { url, handle, gateway_address } + } + + fn get_jsonrpc_success(body: &str) -> Option { + let value: Value = serde_json::from_str(body).ok()?; + if value.get("error").is_some() { + return None; + } + value.get("result").cloned() + } + + #[tokio::test] + async fn rejects_unauthenticated_requests_for_non_excluded_methods() { + let StartedServer { url, handle, .. } = start_server().await; + + // Without a token, non-excluded methods are rejected at the HTTP layer. + let (status, body) = post_jsonrpc( + &url, + json!({"jsonrpc":"2.0","id":1,"method":"control_heartbeat","params":[]}), + None, + ) + .await; + assert_eq!(status, reqwest::StatusCode::UNAUTHORIZED); + assert!(body.contains("Method requires authentication"), "unexpected body: {body}"); + + // With a fake token, non-excluded methods are still rejected. + let (status, body) = post_jsonrpc( + &url, + json!({"jsonrpc":"2.0","id":2,"method":"control_heartbeat","params":[]}), + Some("Bearer totally-fake"), + ) + .await; + assert_eq!(status, reqwest::StatusCode::UNAUTHORIZED); + assert!(body.contains("Method requires authentication"), "unexpected body: {body}"); + + handle.stop().expect("server stop should succeed"); + handle.stopped().await; + } + + #[tokio::test] + async fn allows_excluded_based_auth_methods_without_token() { + let StartedServer { url, handle, gateway_address } = start_server().await; + + // Auth methods are excluded from enforcement (allowlist). + let valid_from = SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("time ok") + .as_secs(); + let (status, body) = post_jsonrpc( + &url, + json!({"jsonrpc":"2.0","id":3,"method":"based_authenticationChallenge","params":[valid_from]}), + None, + ) + .await; + assert_eq!(status, reqwest::StatusCode::OK); + let challenge_value = get_jsonrpc_success(&body).expect("expected jsonrpc success"); + let challenge: alloy_primitives::B256 = serde_json::from_value(challenge_value).expect("challenge should be B256"); + assert_eq!(challenge, gateway_auth_message(gateway_address, valid_from)); + + handle.stop().expect("server stop should succeed"); + handle.stopped().await; + } + + #[tokio::test] + async fn token_from_authenticate_proposer_allows_followup_requests_with_bearer_prefix() { + let StartedServer { url, handle, gateway_address } = start_server().await; + + let valid_from = SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("time ok") + .as_secs(); + + // Obtain a real token via based_authenticateProposer. + let signer = ECDSASigner::random(); + let signature = signer + .sign_message(gateway_auth_message(gateway_address, valid_from)) + .expect("signature should succeed"); + + let (status, body) = post_jsonrpc( + &url, + json!({"jsonrpc":"2.0","id":4,"method":"based_authenticateProposer","params":[valid_from, signature]}), + None, + ) + .await; + assert_eq!(status, reqwest::StatusCode::OK); + let auth_value = get_jsonrpc_success(&body).expect("expected jsonrpc success"); + let token = auth_value + .get("token") + .and_then(|v| v.as_str()) + .expect("token should be string") + .to_owned(); + + // Token without "Bearer " prefix is not interpreted as a bearer token. + let (status, body) = post_jsonrpc( + &url, + json!({"jsonrpc":"2.0","id":5,"method":"control_heartbeat","params":[]}), + Some(&token), + ) + .await; + assert_eq!(status, reqwest::StatusCode::UNAUTHORIZED); + assert!(body.contains("Method requires authentication"), "unexpected body: {body}"); + + // Token with "Bearer " prefix allows access to the rest of the gateway RPC methods. + let (status, body) = post_jsonrpc( + &url, + json!({"jsonrpc":"2.0","id":6,"method":"control_heartbeat","params":[]}), + Some(&format!("Bearer {token}")), + ) + .await; + assert_eq!(status, reqwest::StatusCode::OK); + get_jsonrpc_success(&body).expect("expected jsonrpc success"); + handle.stop().expect("server stop should succeed"); + handle.stopped().await; + } +} From 64a7fec2de820d0399f23979f99c3650fac1c536 Mon Sep 17 00:00:00 2001 From: Francesco Dainese Date: Thu, 18 Dec 2025 16:26:17 +0100 Subject: [PATCH 6/9] chore: fmt & clippy --- based/crates/rpc/src/auth.rs | 45 +++++++++++------------------------- 1 file changed, 13 insertions(+), 32 deletions(-) diff --git a/based/crates/rpc/src/auth.rs b/based/crates/rpc/src/auth.rs index 406bf9a54..49aad73cb 100644 --- a/based/crates/rpc/src/auth.rs +++ b/based/crates/rpc/src/auth.rs @@ -412,9 +412,7 @@ mod tests { let url = format!("http://{addr}"); let mut module = ControlApiServer::into_rpc(TestControlRpc); - module - .merge(BasedAuthApiServer::into_rpc(auth_rpc)) - .expect("failed to merge based auth rpc"); + module.merge(BasedAuthApiServer::into_rpc(auth_rpc)).expect("failed to merge based auth rpc"); let handle = server.start(module); @@ -434,12 +432,8 @@ mod tests { let StartedServer { url, handle, .. } = start_server().await; // Without a token, non-excluded methods are rejected at the HTTP layer. - let (status, body) = post_jsonrpc( - &url, - json!({"jsonrpc":"2.0","id":1,"method":"control_heartbeat","params":[]}), - None, - ) - .await; + let (status, body) = + post_jsonrpc(&url, json!({"jsonrpc":"2.0","id":1,"method":"control_heartbeat","params":[]}), None).await; assert_eq!(status, reqwest::StatusCode::UNAUTHORIZED); assert!(body.contains("Method requires authentication"), "unexpected body: {body}"); @@ -462,10 +456,7 @@ mod tests { let StartedServer { url, handle, gateway_address } = start_server().await; // Auth methods are excluded from enforcement (allowlist). - let valid_from = SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .expect("time ok") - .as_secs(); + let valid_from = SystemTime::now().duration_since(std::time::UNIX_EPOCH).expect("time ok").as_secs(); let (status, body) = post_jsonrpc( &url, json!({"jsonrpc":"2.0","id":3,"method":"based_authenticationChallenge","params":[valid_from]}), @@ -474,7 +465,8 @@ mod tests { .await; assert_eq!(status, reqwest::StatusCode::OK); let challenge_value = get_jsonrpc_success(&body).expect("expected jsonrpc success"); - let challenge: alloy_primitives::B256 = serde_json::from_value(challenge_value).expect("challenge should be B256"); + let challenge: alloy_primitives::B256 = + serde_json::from_value(challenge_value).expect("challenge should be B256"); assert_eq!(challenge, gateway_auth_message(gateway_address, valid_from)); handle.stop().expect("server stop should succeed"); @@ -485,16 +477,12 @@ mod tests { async fn token_from_authenticate_proposer_allows_followup_requests_with_bearer_prefix() { let StartedServer { url, handle, gateway_address } = start_server().await; - let valid_from = SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .expect("time ok") - .as_secs(); + let valid_from = SystemTime::now().duration_since(std::time::UNIX_EPOCH).expect("time ok").as_secs(); // Obtain a real token via based_authenticateProposer. let signer = ECDSASigner::random(); - let signature = signer - .sign_message(gateway_auth_message(gateway_address, valid_from)) - .expect("signature should succeed"); + let signature = + signer.sign_message(gateway_auth_message(gateway_address, valid_from)).expect("signature should succeed"); let (status, body) = post_jsonrpc( &url, @@ -504,19 +492,12 @@ mod tests { .await; assert_eq!(status, reqwest::StatusCode::OK); let auth_value = get_jsonrpc_success(&body).expect("expected jsonrpc success"); - let token = auth_value - .get("token") - .and_then(|v| v.as_str()) - .expect("token should be string") - .to_owned(); + let token = auth_value.get("token").and_then(|v| v.as_str()).expect("token should be string").to_owned(); // Token without "Bearer " prefix is not interpreted as a bearer token. - let (status, body) = post_jsonrpc( - &url, - json!({"jsonrpc":"2.0","id":5,"method":"control_heartbeat","params":[]}), - Some(&token), - ) - .await; + let (status, body) = + post_jsonrpc(&url, json!({"jsonrpc":"2.0","id":5,"method":"control_heartbeat","params":[]}), Some(&token)) + .await; assert_eq!(status, reqwest::StatusCode::UNAUTHORIZED); assert!(body.contains("Method requires authentication"), "unexpected body: {body}"); From fdbddfb1aec586cb74e3191680815a893974444f Mon Sep 17 00:00:00 2001 From: Francesco Dainese Date: Thu, 18 Dec 2025 16:32:46 +0100 Subject: [PATCH 7/9] chore: rename method --- based/bin/portal/src/clients/gateway.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/based/bin/portal/src/clients/gateway.rs b/based/bin/portal/src/clients/gateway.rs index 2884aafad..21988115e 100644 --- a/based/bin/portal/src/clients/gateway.rs +++ b/based/bin/portal/src/clients/gateway.rs @@ -169,12 +169,7 @@ impl Gateway { } /// Try to reauthenticate with the given gateway - pub async fn attempt_authentication( - &self, - timestamp: u64, - signature: Signature, - timeout: Duration, - ) -> eyre::Result<()> { + pub async fn authenticate(&self, timestamp: u64, signature: Signature, timeout: Duration) -> eyre::Result<()> { let client = self.client.take(); // Invalidate the old client and request a new token. @@ -362,7 +357,7 @@ impl GatewayManager { let signature = signer.sign_message(message).map_err(|err| eyre!("failed to sign gateway auth payload: {err}"))?; - gateway.attempt_authentication(timestamp, signature, timeout).await?; + gateway.authenticate(timestamp, signature, timeout).await?; Ok(()) } From 3e9bd619d06fa20668f474e29453aa858ff88d07 Mon Sep 17 00:00:00 2001 From: Francesco Dainese Date: Wed, 31 Dec 2025 17:35:01 +0100 Subject: [PATCH 8/9] fix: use custom jwt validation fix: remove gateway.address arg, use gossip key as "gateway address" refactor: cleanup scripts and compose files --- based/Cargo.lock | 1 + based/Cargo.toml | 1 + based/bin/gateway/src/main.rs | 6 +- based/bin/portal/src/clients/gateway.rs | 1 + based/crates/common/src/config.rs | 3 - based/crates/rpc/Cargo.toml | 3 +- based/crates/rpc/src/auth.rs | 85 +++++++++++++++++++++---- based/crates/rpc/src/gossiper.rs | 4 +- based/crates/rpc/src/lib.rs | 5 +- based/follower-node.just | 3 +- follower_node/compose.dev.yml | 1 - follower_node/compose.prod.yml | 1 - 12 files changed, 86 insertions(+), 28 deletions(-) diff --git a/based/Cargo.lock b/based/Cargo.lock index fb0822823..9ed30eb36 100644 --- a/based/Cargo.lock +++ b/based/Cargo.lock @@ -2006,6 +2006,7 @@ dependencies = [ "http", "http-body-util", "jsonrpsee", + "jsonwebtoken", "op-alloy-consensus", "op-alloy-rpc-types", "op-alloy-rpc-types-engine", diff --git a/based/Cargo.toml b/based/Cargo.toml index bf4e0f5f9..0c84fec01 100644 --- a/based/Cargo.toml +++ b/based/Cargo.toml @@ -46,6 +46,7 @@ hickory-resolver = "=0.25.0-alpha.5" # Use the exact version reth expects http = "1.3.1" hyper = "1.5.2" jsonrpsee = { version = "0.26", features = ["http-client", "macros", "server", "jsonrpsee-client-transport"] } +jsonwebtoken = { version = "9.3" } metrics = "0.24.1" metrics-exporter-prometheus = "0.16.2" mio = { features = ["net", "os-poll"], version = "1.0.4" } diff --git a/based/bin/gateway/src/main.rs b/based/bin/gateway/src/main.rs index 8d816119d..9072dab61 100644 --- a/based/bin/gateway/src/main.rs +++ b/based/bin/gateway/src/main.rs @@ -85,7 +85,9 @@ fn run(mut args: GatewayArgs) -> eyre::Result<()> { let simulator_vsync_window = Duration::from_micros(args.vsync_window_us as u64); let root_peer_url = args.gossip_root_peer_url.clone(); - let gossip_signer_private_key = args.gossip_signer_private_key().map(|key| ECDSASigner::new(key).unwrap()); + let gossip_signer_private_key = + args.gossip_signer_private_key().map(|key| ECDSASigner::new(key).unwrap()).unwrap_or_else(ECDSASigner::random); + let gateway_address = gossip_signer_private_key.address; std::thread::scope(|s| { let rt: Arc = tokio::runtime::Builder::new_current_thread() @@ -106,7 +108,7 @@ fn run(mut args: GatewayArgs) -> eyre::Result<()> { s.spawn({ let rt = rt.clone(); - start_rpc(&args, &spine, &rt, frag_broadcast_tx.clone(), args.da_config.clone()); + start_rpc(&args, &spine, &rt, frag_broadcast_tx.clone(), args.da_config.clone(), gateway_address); move || rt.block_on(wait_for_signal()) }); diff --git a/based/bin/portal/src/clients/gateway.rs b/based/bin/portal/src/clients/gateway.rs index 21988115e..6a7f05197 100644 --- a/based/bin/portal/src/clients/gateway.rs +++ b/based/bin/portal/src/clients/gateway.rs @@ -158,6 +158,7 @@ impl Gateway { } Err(err) => { // TODO: specifically handle authentication error by removing client? + // To also trigger re-authentication error!(%err, ?self, "failed to ping gateway"); self.active.store(false, Ordering::Relaxed); } diff --git a/based/crates/common/src/config.rs b/based/crates/common/src/config.rs index 6f1d92241..e778a2b7e 100644 --- a/based/crates/common/src/config.rs +++ b/based/crates/common/src/config.rs @@ -38,9 +38,6 @@ pub struct GatewayArgs { pub rpc_port_no_auth: u16, #[arg(long = "rpc.port_ws", default_value_t = 9999)] pub rpc_port_ws: u16, - /// Address that identifies this gateway inside the registry. - #[arg(long = "gateway.address")] - pub gateway_address: Address, /// Url to an L2 eth api rpc #[arg(long = "eth_client.url", default_value = "http://localhost:8545")] pub eth_client_url: Url, diff --git a/based/crates/rpc/Cargo.toml b/based/crates/rpc/Cargo.toml index 819819130..1ff9089f1 100644 --- a/based/crates/rpc/Cargo.toml +++ b/based/crates/rpc/Cargo.toml @@ -11,6 +11,7 @@ alloy-rpc-types.workspace = true axum.workspace = true bop-common.workspace = true jsonrpsee.workspace = true +jsonwebtoken.workspace = true op-alloy-consensus.workspace = true op-alloy-rpc-types.workspace = true op-alloy-rpc-types-engine.workspace = true @@ -18,6 +19,7 @@ reqwest.workspace = true reth-optimism-payload-builder.workspace = true reth-optimism-primitives.workspace = true reth-rpc-layer.workspace = true +serde.workspace = true serde_json.workspace = true tokio.workspace = true tower.workspace = true @@ -33,7 +35,6 @@ http-body-util = "0.1.3" alloy-consensus.workspace = true clap.workspace = true eyre.workspace = true -serde.workspace = true tracing-subscriber.workspace = true [[example]] diff --git a/based/crates/rpc/src/auth.rs b/based/crates/rpc/src/auth.rs index 49aad73cb..19c095c26 100644 --- a/based/crates/rpc/src/auth.rs +++ b/based/crates/rpc/src/auth.rs @@ -24,6 +24,9 @@ use jsonrpsee::{ http_client::{HttpBody, HttpRequest, HttpResponse}, types::{ErrorObject, error::INVALID_PARAMS_CODE}, }; +use jsonwebtoken::{EncodingKey, errors::ErrorKind}; +use reth_rpc_layer::JwtError; +use serde::{Deserialize, Serialize}; use thiserror::Error; use tower::{Layer, Service}; use tracing::info; @@ -36,22 +39,79 @@ pub struct AuthConfig { pub token_validity: Duration, } -impl From<&GatewayArgs> for AuthConfig { - fn from(args: &GatewayArgs) -> Self { - Self { gateway_address: args.gateway_address, token_validity: Duration::from_secs(args.auth_duration * 60) } +impl AuthConfig { + pub fn new(address: Address, token_duration_secs: u64) -> Self { + Self { gateway_address: address, token_validity: Duration::from_secs(token_duration_secs) } } } #[derive(Debug)] pub struct AuthEntry { - pub secret: JwtSecret, + pub secret: Vec, pub expires_at: SystemTime, } impl AuthEntry { // TODO: better id that doesn't leak secret data fn id(&self) -> B256 { - B256::from_slice(self.secret.as_bytes()) + B256::from_slice(&self.secret) + } +} + +#[derive(Clone, Serialize, Deserialize)] +pub struct TokenClaims { + /// Issued at UNIX timestamp + issued_at: u64, + /// The expiration UNIX timestamp + expiry: u64, +} + +impl TokenClaims { + pub fn from_systemtime(iat: SystemTime, exp: SystemTime) -> Self { + debug_assert!(iat < exp, "issuance should always be before expiry"); + + let issued_at = iat.duration_since(std::time::UNIX_EPOCH).expect("Time went backwards").as_secs(); + let expiry = exp.duration_since(std::time::UNIX_EPOCH).expect("Time went backwards").as_secs(); + + Self { issued_at, expiry } + } + + const fn signature_algo() -> jsonwebtoken::Algorithm { + jsonwebtoken::Algorithm::HS256 + } + + fn is_within_time_window(&self) -> bool { + let now = jsonwebtoken::get_current_timestamp(); + self.issued_at <= now && self.expiry >= now + } + + pub fn encode(&self, secret: &[u8]) -> Result { + let secret = jsonwebtoken::EncodingKey::from_secret(secret); + let algo = jsonwebtoken::Header::new(Self::signature_algo()); + jsonwebtoken::encode(&algo, self, &secret) + } + + pub fn validate(token: &str, secret: &[u8]) -> Result<(), JwtError> { + let mut validation = jsonwebtoken::Validation::new(Self::signature_algo()); + validation.set_required_spec_claims(&["iat"]); + + match jsonwebtoken::decode::(token, &jsonwebtoken::DecodingKey::from_secret(secret), &validation) { + Ok(token) => { + if !token.claims.is_within_time_window() { + Err(JwtError::InvalidIssuanceTimestamp)? + } + + Ok(()) + } + Err(err) => match *err.kind() { + ErrorKind::InvalidSignature => Err(JwtError::InvalidSignature)?, + ErrorKind::InvalidAlgorithm => Err(JwtError::UnsupportedSignatureAlgorithm)?, + _ => { + let detail = format!("{err}"); + Err(JwtError::JwtDecodingError(detail))? + } + }, + } } } @@ -76,19 +136,18 @@ impl AuthManager { pub fn issue(&self, challenger: Address, issued_at: SystemTime) -> GatewayAuthentication { let secret = JwtSecret::random(); + let secret = secret.as_bytes().to_vec(); + let expiry = issued_at + self.cfg.token_validity; - let entry = Arc::new(AuthEntry { secret, expires_at: expiry }); + let entry = Arc::new(AuthEntry { secret: secret.clone(), expires_at: expiry }); self.entries.insert(entry.id(), entry.clone()); - info!(%challenger, "issued JWT secret"); - - let expiry = expiry.duration_since(std::time::UNIX_EPOCH).expect("Time went backwards").as_secs(); - let issued_at = issued_at.duration_since(std::time::UNIX_EPOCH).expect("Time went backwards").as_secs(); + tracing::debug!(%challenger, "issued JWT secret"); - let claims = Claims { exp: Some(expiry), iat: issued_at }; + let claims = TokenClaims::from_systemtime(issued_at, expiry); - GatewayAuthentication { token: secret.encode(&claims).expect("able to encode JWT claims"), challenger } + GatewayAuthentication { token: claims.encode(&secret).expect("able to encode JWT claims"), challenger } } pub fn validate(&self, token: &str) -> Result, AuthError> { @@ -96,7 +155,7 @@ impl AuthManager { self.purge(now); for entry in self.entries.iter() { - if entry.secret.validate(token).is_ok() { + if TokenClaims::validate(token, &entry.secret).is_ok() { return Ok(entry.clone()); } } diff --git a/based/crates/rpc/src/gossiper.rs b/based/crates/rpc/src/gossiper.rs index 8a539a3ef..63550bfcb 100644 --- a/based/crates/rpc/src/gossiper.rs +++ b/based/crates/rpc/src/gossiper.rs @@ -20,7 +20,7 @@ pub struct Gossiper { impl Gossiper { pub fn new( target_rpc: Url, - signer: Option, + signer: ECDSASigner, frag_broadcast: tokio::sync::broadcast::Sender, ) -> Self { let client = ClientBuilder::new() @@ -28,8 +28,6 @@ impl Gossiper { .build() .expect("couldn't build http client"); - let signer = signer.unwrap_or_else(ECDSASigner::random); - Self { target_rpc, client, signer, frag_broadcast } } diff --git a/based/crates/rpc/src/lib.rs b/based/crates/rpc/src/lib.rs index 3d7fab4cd..87ff1b591 100644 --- a/based/crates/rpc/src/lib.rs +++ b/based/crates/rpc/src/lib.rs @@ -1,6 +1,6 @@ use std::{net::SocketAddr, sync::Arc}; -use alloy_primitives::{B256, Bytes, U64}; +use alloy_primitives::{Address, B256, Bytes, U64}; use axum::{Router, routing::get}; use bop_common::{ api::{BasedAuthApiServer, ControlApiServer, EngineApiServer, MinimalEthApiServer, OpMinerExtApiServer}, @@ -43,11 +43,12 @@ pub fn start_rpc( rt: &Runtime, rx_spawner: tokio::sync::broadcast::Sender, da_config: OpDAConfig, + gateway_address: Address, ) { let addr_auth = SocketAddr::new(config.rpc_host.into(), config.rpc_port); let addr_no_auth = SocketAddr::new(config.rpc_host.into(), config.rpc_port_no_auth); let addr_ws = SocketAddr::new(config.rpc_host.into(), config.rpc_port_ws); - let auth_manager = Arc::new(AuthManager::new(AuthConfig::from(config))); + let auth_manager = Arc::new(AuthManager::new(AuthConfig::new(gateway_address, config.auth_duration * 60))); let server = RpcServer::new(spine, auth_manager, rx_spawner, da_config); rt.spawn(server.run(addr_auth, addr_no_auth, addr_ws)); } diff --git a/based/follower-node.just b/based/follower-node.just index 562724923..89970925c 100644 --- a/based/follower-node.just +++ b/based/follower-node.just @@ -188,10 +188,9 @@ start-dev $dotenv=join(FOLLOWER_NODE_DATA, ".env") network="based_op_node": (doc port=$(grep -m1 '^GATEWAY_PORT[[:space:]]*=' $dotenv | cut -d= -f2) address=$({{wallet}} address {{WALLET_NAME}}) - jwt=$(cat $FOLLOWER_NODE_DATA/config/jwt) echo "Registering gateway" - {{portal}} register-gateway "http://$ip:$port" "$address" "$jwt" + {{portal}} register-gateway "http://$ip:$port" "$address" # Start services echo "Starting gateway and monitoring services..." diff --git a/follower_node/compose.dev.yml b/follower_node/compose.dev.yml index 069b4a293..4cd7f057a 100644 --- a/follower_node/compose.dev.yml +++ b/follower_node/compose.dev.yml @@ -104,7 +104,6 @@ services: - --chain=/config/genesis.json - --db.datadir=/data/gateway - --eth_client.url=$PORTAL - - --rpc.jwt=/config/jwt - --gossip.signer_private_key=$GATEWAY_SEQUENCING_KEY - --gossip.root_peer_url=http://0.0.0.0:8547 - --log.dir=/var/log/app diff --git a/follower_node/compose.prod.yml b/follower_node/compose.prod.yml index f3505a7a2..639a12015 100644 --- a/follower_node/compose.prod.yml +++ b/follower_node/compose.prod.yml @@ -107,7 +107,6 @@ services: - --chain=/config/genesis.json - --db.datadir=/data/gateway - --eth_client.url=$PORTAL - - --rpc.jwt=/config/jwt - --gossip.signer_private_key=$GATEWAY_SEQUENCING_KEY - --gossip.root_peer_url=http://0.0.0.0:8547 - --log.dir=/var/log/app From 52146120d4625cdc46d1d5633c19871a427e99f8 Mon Sep 17 00:00:00 2001 From: Francesco Dainese Date: Wed, 31 Dec 2025 18:20:23 +0100 Subject: [PATCH 9/9] fix(portal): automatic gateway reauth deps: portal-auth based-op-node chore: fmt and unused imports --- based/bin/portal/src/clients/gateway.rs | 34 ++++++++++++++++++++++--- based/crates/common/src/config.rs | 1 - based/crates/rpc/src/auth.rs | 6 ++--- deps/optimism.just | 2 +- 4 files changed, 33 insertions(+), 10 deletions(-) diff --git a/based/bin/portal/src/clients/gateway.rs b/based/bin/portal/src/clients/gateway.rs index 6a7f05197..da118a518 100644 --- a/based/bin/portal/src/clients/gateway.rs +++ b/based/bin/portal/src/clients/gateway.rs @@ -157,9 +157,14 @@ impl Gateway { ); } Err(err) => { - // TODO: specifically handle authentication error by removing client? - // To also trigger re-authentication - error!(%err, ?self, "failed to ping gateway"); + let err_str = err.to_string(); + // TODO: improve unauthorized check + if err_str.contains("401") { + // remove client to trigger reauthentication + self.client.take(); + } + + error!(err = %err_str, ?self, "failed to ping gateway"); self.active.store(false, Ordering::Relaxed); } } @@ -366,14 +371,20 @@ impl GatewayManager { /// Attempts to authenticate with the gateway if it's unauthenticated async fn prepare_gateway(signer: &ECDSASigner, timeout: Duration, gateway: &GatewayInstance) -> eyre::Result<()> { if gateway.client().is_some() { + // TODO: more robust auth check return Ok(()); } + tracing::debug!(?gateway, "authenticating with gateway"); + if let Err(err) = Self::authenticate_gateway(signer, timeout, gateway.clone(), SystemTime::now()).await { error!(%err, url = %gateway.url, "failed to authenticate gateway"); return Err(err); } + // Trigger health check right after authentication to mark gateway as active + gateway.health_check().await; + Ok(()) } @@ -387,13 +398,28 @@ impl GatewayManager { tokio::spawn(async move { gateway.health_check().await; // TODO: only if health check fails due to auth + // Currently health_check will remove the client if it detects a 401, so this will trigger + // reauthentication _ = Self::prepare_gateway(&signer, timeout, &gateway).await; }); } } pub async fn current_gateway(&self) -> Option { - self.current_gateway.read().await.as_ref().cloned() + let current = self.current_gateway.read().await.as_ref().cloned(); + + // ensure gateway is authenticated + if let Some(current) = ¤t { + // trigger health check to remove client if unauthenticated + current.health_check().await; + + // trigger authentication if client was removed + let Ok(_) = Self::prepare_gateway(&self.authentication_signer, self.gateway_timeout, current).await else { + return None; + }; + } + + current } async fn _send_fcu( diff --git a/based/crates/common/src/config.rs b/based/crates/common/src/config.rs index e778a2b7e..822a407f1 100644 --- a/based/crates/common/src/config.rs +++ b/based/crates/common/src/config.rs @@ -1,6 +1,5 @@ use std::{net::Ipv4Addr, ops::RangeInclusive, path::PathBuf, str::FromStr, sync::Arc}; -use alloy_primitives::Address; use clap::Parser; use reqwest::Url; use reth_cli::chainspec::ChainSpecParser; diff --git a/based/crates/rpc/src/auth.rs b/based/crates/rpc/src/auth.rs index 19c095c26..91c9d5106 100644 --- a/based/crates/rpc/src/auth.rs +++ b/based/crates/rpc/src/auth.rs @@ -6,12 +6,11 @@ use std::{ }; use alloy_primitives::{Address, B256, Signature}; -use alloy_rpc_types::engine::{Claims, JwtSecret}; +use alloy_rpc_types::engine::JwtSecret; use bop_common::{ api::{BasedAuthApiServer, GatewayAuthentication}, auth::gateway_auth_message, communication::messages::{RpcError, RpcResult}, - config::GatewayArgs, }; use dashmap::DashMap; use http::{HeaderMap, StatusCode}; @@ -24,12 +23,11 @@ use jsonrpsee::{ http_client::{HttpBody, HttpRequest, HttpResponse}, types::{ErrorObject, error::INVALID_PARAMS_CODE}, }; -use jsonwebtoken::{EncodingKey, errors::ErrorKind}; +use jsonwebtoken::errors::ErrorKind; use reth_rpc_layer::JwtError; use serde::{Deserialize, Serialize}; use thiserror::Error; use tower::{Layer, Service}; -use tracing::info; #[derive(Clone, Debug)] pub struct AuthConfig { diff --git a/deps/optimism.just b/deps/optimism.just index 9774ca09b..b07e59a22 100644 --- a/deps/optimism.just +++ b/deps/optimism.just @@ -1,5 +1,5 @@ repo_default := "https://github.com/gattaca-com/based-optimism" -ref_default := "based/develop" +ref_default := "feat/portal-auth" repo := env("BASED_OP_GETH_REPO", repo_default) ref := env("BASED_OP_GETH_REF", ref_default)