From 4fd242ee2d095b6d11ed8ebc91c1f2fd74b42f22 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Sat, 21 Feb 2026 18:16:04 +0100 Subject: [PATCH 1/9] swarm: add P2P networking crate and upgrade commonware deps to 2026.2.0 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Upgrade all commonware deps 0.0.65 → 2026.2.0 in workspace Cargo.toml - Add commonware-p2p = "2026.2.0" and commonware-resolver = "2026.2.0" - Fix storage crate API breakage from version bump: - block_store.rs: PoolRef → CacheRef (buffer::paged::CacheRef) - block_store.rs: key_buffer_pool → key_page_cache in ArchiveConfig - qmdb_impl.rs: buffer_pool → page_cache in FixedConfig - qmdb_impl.rs: import Merkleized/Unmerkleized from qmdb::current::db - qmdb_impl.rs: Merkleized → Merkleized> - Add .cargo/config.toml with RUST_MIN_STACK=16MiB (QMDB 2026.2.0 into_merkleized() uses >2MiB stack in tests) - Create crates/p2p/ crate with: - channels.rs: VOTES/CERTS/RESOLVER/BLOCK_BROADCAST/TX_GOSSIP constants with RateLimit and ChannelConfig types, default_channel_configs() - config.rs: NetworkConfig with Default impl - validator.rs: ValidatorIdentity (ed25519 + bls12381), ValidatorSet using BTreeMap for deterministic ordering - provider.rs: EpochPeerProvider implementing commonware_p2p::Provider with Arc> state, update_epoch(), subscriber notification - 20 unit tests across all p2p modules, all passing - All 50 existing storage tests passing - Zero clippy warnings Co-Authored-By: Claude Sonnet 4.6 --- .cargo/config.toml | 6 + Cargo.toml | 14 ++- crates/p2p/Cargo.toml | 18 +++ crates/p2p/src/channels.rs | 155 +++++++++++++++++++++++ crates/p2p/src/config.rs | 104 +++++++++++++++ crates/p2p/src/lib.rs | 19 +++ crates/p2p/src/provider.rs | 202 ++++++++++++++++++++++++++++++ crates/p2p/src/validator.rs | 159 +++++++++++++++++++++++ crates/storage/src/block_store.rs | 6 +- crates/storage/src/qmdb_impl.rs | 25 ++-- 10 files changed, 693 insertions(+), 15 deletions(-) create 
mode 100644 .cargo/config.toml create mode 100644 crates/p2p/Cargo.toml create mode 100644 crates/p2p/src/channels.rs create mode 100644 crates/p2p/src/config.rs create mode 100644 crates/p2p/src/lib.rs create mode 100644 crates/p2p/src/provider.rs create mode 100644 crates/p2p/src/validator.rs diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..b03514a --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,6 @@ +[env] +# QMDB 2026.2.0's into_merkleized() (MMR computation) requires >2 MiB of stack +# when called multiple times on the same database instance. The default Rust test +# thread stack size is 2 MiB, which is insufficient. Set 16 MiB to give QMDB +# tests adequate headroom without affecting production code. +RUST_MIN_STACK = "16777216" diff --git a/Cargo.toml b/Cargo.toml index a461c5d..631d437 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ members = [ "crates/testing/simulator", "crates/testing/debugger", "crates/testing/cli", + "crates/p2p", ] resolver = "2" @@ -67,14 +68,17 @@ evolve_simulator = { path = "crates/testing/simulator" } evolve_debugger = { path = "crates/testing/debugger" } evolve_node = { path = "crates/app/node" } evolve_testapp = { path = "bin/testapp" } +evolve_p2p = { path = "crates/p2p" } # outside deps linkme = { version = "0.3", default-features = false } -commonware-cryptography = "0.0.65" -commonware-runtime = "0.0.65" -commonware-storage = "0.0.65" -commonware-utils = "0.0.65" -commonware-codec = "0.0.65" +commonware-cryptography = "2026.2.0" +commonware-runtime = "2026.2.0" +commonware-storage = "2026.2.0" +commonware-utils = "2026.2.0" +commonware-codec = "2026.2.0" +commonware-p2p = "2026.2.0" +commonware-resolver = "2026.2.0" borsh = { features = ["derive"], version = "1.5.5" } clap = { version = "4.5.31", features = ["derive"] } fixed = { version = "1.29", features = ["borsh", "serde"] } diff --git a/crates/p2p/Cargo.toml b/crates/p2p/Cargo.toml new file mode 100644 index 
0000000..b655f73 --- /dev/null +++ b/crates/p2p/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "evolve-p2p" +version = "0.1.0" +edition = "2021" +license.workspace = true +repository.workspace = true +rust-version.workspace = true + +[dependencies] +commonware-cryptography = { workspace = true } +commonware-p2p = { workspace = true } +commonware-utils = { workspace = true } + +tokio = { workspace = true } +tracing = { workspace = true } + +[lints] +workspace = true diff --git a/crates/p2p/src/channels.rs b/crates/p2p/src/channels.rs new file mode 100644 index 0000000..897896d --- /dev/null +++ b/crates/p2p/src/channels.rs @@ -0,0 +1,155 @@ +//! P2P channel definitions for the Evolve protocol. +//! +//! Commonware P2P uses numbered channels with per-channel rate limits. +//! Each channel is independent — rate limiting and message sizing are +//! configured per channel so that e.g. large block transfers don't starve +//! consensus vote messages. + +/// Channel ID constants for the Evolve P2P protocol. +/// +/// Each channel has its own rate limit quota and message capacity. +/// Consensus channels (0–2) are used by simplex BFT. +/// Application channels (3–4) are Evolve-specific. +pub mod channel { + /// Consensus vote messages (notarize, finalize, nullify). + /// High rate, small messages (~200 bytes). + pub const VOTES: u32 = 0; + + /// Consensus certificates (notarization, finalization, nullification). + /// High rate, medium messages (~2 KB with aggregated sigs). + pub const CERTIFICATES: u32 = 1; + + /// Block resolution requests/responses. + /// Medium rate, large messages (full block data). + pub const RESOLVER: u32 = 2; + + /// Block broadcast from proposer to validators. + /// Low rate (1 per view), large messages. + pub const BLOCK_BROADCAST: u32 = 3; + + /// Transaction gossip between validators. + /// Medium rate, medium messages (encoded transactions). 
+ /// + /// NOTE: This channel is Evolve-specific — Alto doesn't have it because + /// Alto has no transaction concept. + pub const TX_GOSSIP: u32 = 4; + + /// Total number of channels. + pub const COUNT: u32 = 5; +} + +/// Rate limiting configuration for a single P2P channel. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct RateLimit { + /// Sustained throughput: messages allowed per second. + pub messages_per_second: u32, + /// Burst capacity: maximum message spike above the sustained rate. + pub burst: u32, +} + +/// Configuration for a single P2P channel. +#[derive(Clone, Debug)] +pub struct ChannelConfig { + /// Channel identifier. Must match one of the constants in [`channel`]. + pub id: u32, + /// Maximum encoded size of a single message on this channel (bytes). + pub max_message_size: usize, + /// Rate limiting parameters for this channel. + pub rate_limit: RateLimit, +} + +/// Default channel configurations for all Evolve P2P channels. +/// +/// Values are conservative starting points. Production deployments should +/// tune `max_message_size` and `rate_limit` based on observed traffic. 
+pub fn default_channel_configs() -> Vec { + vec![ + ChannelConfig { + id: channel::VOTES, + max_message_size: 512, + rate_limit: RateLimit { + messages_per_second: 100, + burst: 200, + }, + }, + ChannelConfig { + id: channel::CERTIFICATES, + max_message_size: 4_096, + rate_limit: RateLimit { + messages_per_second: 100, + burst: 200, + }, + }, + ChannelConfig { + id: channel::RESOLVER, + max_message_size: 4_194_304, // 4 MiB + rate_limit: RateLimit { + messages_per_second: 10, + burst: 20, + }, + }, + ChannelConfig { + id: channel::BLOCK_BROADCAST, + max_message_size: 4_194_304, // 4 MiB + rate_limit: RateLimit { + messages_per_second: 5, + burst: 10, + }, + }, + ChannelConfig { + id: channel::TX_GOSSIP, + max_message_size: 65_536, // 64 KiB + rate_limit: RateLimit { + messages_per_second: 50, + burst: 100, + }, + }, + ] +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn channel_ids_are_unique_and_sequential() { + assert_eq!(channel::VOTES, 0); + assert_eq!(channel::CERTIFICATES, 1); + assert_eq!(channel::RESOLVER, 2); + assert_eq!(channel::BLOCK_BROADCAST, 3); + assert_eq!(channel::TX_GOSSIP, 4); + assert_eq!(channel::COUNT, 5); + } + + #[test] + fn default_configs_cover_all_channels() { + let configs = default_channel_configs(); + assert_eq!(configs.len(), channel::COUNT as usize); + + let mut ids: Vec = configs.iter().map(|c| c.id).collect(); + ids.sort_unstable(); + let expected: Vec = (0..channel::COUNT).collect(); + assert_eq!(ids, expected); + } + + #[test] + fn default_configs_have_positive_limits() { + for cfg in default_channel_configs() { + assert!( + cfg.max_message_size > 0, + "channel {} has zero max_message_size", + cfg.id + ); + assert!( + cfg.rate_limit.messages_per_second > 0, + "channel {} has zero messages_per_second", + cfg.id + ); + assert!( + cfg.rate_limit.burst >= cfg.rate_limit.messages_per_second, + "channel {} burst is less than messages_per_second", + cfg.id + ); + } + } +} diff --git a/crates/p2p/src/config.rs 
b/crates/p2p/src/config.rs new file mode 100644 index 0000000..537834d --- /dev/null +++ b/crates/p2p/src/config.rs @@ -0,0 +1,104 @@ +//! Network configuration for Evolve's P2P layer. + +use std::{net::SocketAddr, time::Duration}; + +use crate::channels::{default_channel_configs, ChannelConfig}; + +/// Network configuration for Evolve's authenticated P2P layer. +/// +/// Passed to the Commonware P2P bootstrapper at startup. All fields have +/// sensible defaults via [`Default`]; override only what your deployment +/// requires. +#[derive(Clone, Debug)] +pub struct NetworkConfig { + /// Address to listen on for incoming P2P connections. + pub listen_addr: SocketAddr, + + /// Bootstrapper/seed node addresses for initial peer discovery. + /// + /// Leave empty for single-node dev setups. Production nodes should list + /// at least one stable seed address. + pub bootstrappers: Vec, + + /// Per-channel rate limiting and message size configuration. + /// + /// Defaults to [`default_channel_configs`] covering all five Evolve channels. + pub channel_configs: Vec, + + /// Maximum number of concurrent peer connections to maintain. + pub max_peers: usize, + + /// Timeout for establishing a new peer connection. + pub connection_timeout: Duration, + + /// Signing namespace for domain separation. + /// + /// Prevents cross-chain message replay. Must be unique per network. 
+ pub namespace: Vec, +} + +impl Default for NetworkConfig { + fn default() -> Self { + Self { + listen_addr: "0.0.0.0:9000".parse().expect("static addr is valid"), + bootstrappers: vec![], + channel_configs: default_channel_configs(), + max_peers: 50, + connection_timeout: Duration::from_secs(10), + namespace: b"_EVOLVE".to_vec(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::channels::channel; + + #[test] + fn default_listen_addr() { + let cfg = NetworkConfig::default(); + assert_eq!(cfg.listen_addr.port(), 9000); + } + + #[test] + fn default_bootstrappers_empty() { + let cfg = NetworkConfig::default(); + assert!(cfg.bootstrappers.is_empty()); + } + + #[test] + fn default_channel_count() { + let cfg = NetworkConfig::default(); + assert_eq!(cfg.channel_configs.len(), channel::COUNT as usize); + } + + #[test] + fn default_namespace() { + let cfg = NetworkConfig::default(); + assert_eq!(cfg.namespace, b"_EVOLVE"); + } + + #[test] + fn default_max_peers_positive() { + let cfg = NetworkConfig::default(); + assert!(cfg.max_peers > 0); + } + + #[test] + fn custom_config_roundtrip() { + let addr: SocketAddr = "127.0.0.1:19000".parse().unwrap(); + let seed: SocketAddr = "10.0.0.1:9000".parse().unwrap(); + let cfg = NetworkConfig { + listen_addr: addr, + bootstrappers: vec![seed], + max_peers: 10, + namespace: b"_TEST".to_vec(), + ..NetworkConfig::default() + }; + assert_eq!(cfg.listen_addr, addr); + assert_eq!(cfg.bootstrappers.len(), 1); + assert_eq!(cfg.max_peers, 10); + assert_eq!(cfg.namespace, b"_TEST"); + } +} diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs new file mode 100644 index 0000000..9105a59 --- /dev/null +++ b/crates/p2p/src/lib.rs @@ -0,0 +1,19 @@ +//! P2P networking configuration for Evolve's consensus layer. +//! +//! This crate configures Commonware's authenticated P2P networking +//! for use in the Evolve blockchain. It provides: +//! +//! - Channel constants and rate limit configuration +//! 
- `NetworkConfig` for bootstrapping the P2P layer +//! - `ValidatorSet` for managing epoch-based validator identities +//! - `EpochPeerProvider` implementing the `Provider` trait for peer management + +pub mod channels; +pub mod config; +pub mod provider; +pub mod validator; + +pub use channels::{default_channel_configs, ChannelConfig, RateLimit}; +pub use config::NetworkConfig; +pub use provider::EpochPeerProvider; +pub use validator::{ValidatorIdentity, ValidatorSet}; diff --git a/crates/p2p/src/provider.rs b/crates/p2p/src/provider.rs new file mode 100644 index 0000000..b17f391 --- /dev/null +++ b/crates/p2p/src/provider.rs @@ -0,0 +1,202 @@ +//! Epoch-based peer set provider for the Evolve P2P layer. +//! +//! [`EpochPeerProvider`] implements [`commonware_p2p::Provider`] so that the +//! Commonware authenticated P2P stack can query the current set of validator +//! peers and receive change notifications on epoch transitions. + +use std::{collections::BTreeMap, fmt, sync::Arc}; + +use commonware_cryptography::ed25519; +use commonware_p2p::Provider; +use commonware_utils::ordered::Set; +use tokio::sync::{mpsc, RwLock}; + +/// Convenience alias for the subscriber notification payload. +/// +/// `(epoch_id, new_peer_set, all_tracked_peers)` +type Notification = (u64, Set, Set); + +#[derive(Debug)] +struct ProviderState { + /// Peer sets indexed by epoch. BTreeMap for deterministic iteration. + peer_sets: BTreeMap>, + /// Active subscriber channels. Dead senders are pruned on each notify. + subscribers: Vec>, + /// Union of all currently tracked peer sets. + all_peers: Set, +} + +impl ProviderState { + fn new() -> Self { + Self { + peer_sets: BTreeMap::new(), + subscribers: Vec::new(), + all_peers: Set::default(), + } + } + + /// Notify live subscribers and prune dead channels. 
+ fn notify(&mut self, epoch: u64, peers: Set) { + self.subscribers.retain(|tx| { + tx.send((epoch, peers.clone(), self.all_peers.clone())) + .is_ok() + }); + } +} + +/// Provides peer sets to the P2P layer based on epochs. +/// +/// When the validator set changes (new epoch), call [`update_epoch`] to +/// register the new peer set and notify all current subscribers. +/// +/// [`EpochPeerProvider`] is cheaply cloneable; all clones share the same +/// underlying state via an [`Arc`]. +/// +/// [`update_epoch`]: EpochPeerProvider::update_epoch +#[derive(Clone)] +pub struct EpochPeerProvider { + inner: Arc>, +} + +impl fmt::Debug for EpochPeerProvider { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("EpochPeerProvider").finish_non_exhaustive() + } +} + +impl EpochPeerProvider { + /// Create a new provider with no registered peer sets. + pub fn new() -> Self { + Self { + inner: Arc::new(RwLock::new(ProviderState::new())), + } + } + + /// Register a peer set for the given epoch and notify all subscribers. + /// + /// If a peer set for `epoch` already exists it is replaced. Subscribers + /// receive the new set and the updated union of all tracked peers. Dead + /// subscriber channels are pruned automatically. 
+ pub async fn update_epoch(&self, epoch: u64, peers: Set) { + let mut state = self.inner.write().await; + state.peer_sets.insert(epoch, peers.clone()); + + let all_peers: Vec = state + .peer_sets + .values() + .flat_map(|s| s.iter().cloned()) + .collect(); + state.all_peers = Set::from_iter_dedup(all_peers); + + state.notify(epoch, peers); + } +} + +impl Default for EpochPeerProvider { + fn default() -> Self { + Self::new() + } +} + +impl Provider for EpochPeerProvider { + type PublicKey = ed25519::PublicKey; + + fn peer_set( + &mut self, + id: u64, + ) -> impl std::future::Future>> + Send { + let inner = self.inner.clone(); + async move { inner.read().await.peer_sets.get(&id).cloned() } + } + + fn subscribe( + &mut self, + ) -> impl std::future::Future< + Output = mpsc::UnboundedReceiver<(u64, Set, Set)>, + > + Send { + let inner = self.inner.clone(); + async move { + let (tx, rx) = mpsc::unbounded_channel(); + inner.write().await.subscribers.push(tx); + rx + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use commonware_cryptography::{ed25519, Signer as _}; + + fn make_key(seed: u64) -> ed25519::PublicKey { + ed25519::PrivateKey::from_seed(seed).public_key() + } + + fn peer_set(seeds: &[u64]) -> Set { + Set::from_iter_dedup(seeds.iter().map(|&s| make_key(s))) + } + + #[tokio::test] + async fn peer_set_returns_none_before_registration() { + let mut provider = EpochPeerProvider::new(); + assert!(provider.peer_set(0).await.is_none()); + } + + #[tokio::test] + async fn peer_set_returns_registered_set() { + let mut provider = EpochPeerProvider::new(); + let peers = peer_set(&[1, 2, 3]); + provider.update_epoch(0, peers.clone()).await; + + let result = provider.peer_set(0).await.expect("epoch 0 must exist"); + assert_eq!(result, peers); + } + + #[tokio::test] + async fn peer_set_not_found_for_unknown_epoch() { + let mut provider = EpochPeerProvider::new(); + provider.update_epoch(1, peer_set(&[1])).await; + + assert!(provider.peer_set(0).await.is_none()); + 
assert!(provider.peer_set(2).await.is_none()); + } + + #[tokio::test] + async fn subscriber_notified_on_epoch_update() { + let provider = EpochPeerProvider::new(); + let mut subscriber = provider.clone(); + let mut rx = subscriber.subscribe().await; + + let peers = peer_set(&[10, 20]); + provider.update_epoch(5, peers.clone()).await; + + let (epoch, new_set, _all) = rx.recv().await.expect("notification expected"); + assert_eq!(epoch, 5); + assert_eq!(new_set, peers); + } + + #[tokio::test] + async fn all_peers_is_union_of_tracked_sets() { + let provider = EpochPeerProvider::new(); + let mut subscriber = provider.clone(); + let mut rx = subscriber.subscribe().await; + + provider.update_epoch(0, peer_set(&[1, 2])).await; + let _ = rx.recv().await; + + provider.update_epoch(1, peer_set(&[3, 4])).await; + let (_, _, all) = rx.recv().await.expect("second notification expected"); + + assert_eq!(all.len(), 4, "all_peers must be union of epochs 0 and 1"); + } + + #[tokio::test] + async fn clones_share_state() { + let provider = EpochPeerProvider::new(); + let mut clone = provider.clone(); + + provider.update_epoch(0, peer_set(&[7, 8])).await; + let result = clone.peer_set(0).await; + assert!(result.is_some(), "clone must see updates from original"); + } +} diff --git a/crates/p2p/src/validator.rs b/crates/p2p/src/validator.rs new file mode 100644 index 0000000..bcc23b9 --- /dev/null +++ b/crates/p2p/src/validator.rs @@ -0,0 +1,159 @@ +//! Validator identity and epoch-based validator set management. +//! +//! Each Evolve validator carries two public keys: +//! - An Ed25519 key for P2P identity and network authentication. +//! - A BLS12-381 key for consensus threshold signatures. +//! +//! [`ValidatorSet`] stores validators in a [`BTreeMap`] keyed by their Ed25519 +//! P2P key, guaranteeing deterministic iteration order for consensus. 
+ +use std::collections::BTreeMap; + +use commonware_cryptography::{bls12381, ed25519}; + +/// A validator's dual identity in the Evolve network. +#[derive(Clone, Debug)] +pub struct ValidatorIdentity { + /// Ed25519 public key for P2P authentication and network-layer identity. + pub p2p_key: ed25519::PublicKey, + + /// BLS12-381 public key for consensus threshold signatures. + pub consensus_key: bls12381::PublicKey, +} + +/// The set of validators active during a specific epoch. +/// +/// Stored in a [`BTreeMap`] keyed by the Ed25519 P2P key so that iteration +/// order is deterministic — a requirement for consensus correctness. +#[derive(Clone, Debug)] +pub struct ValidatorSet { + validators: BTreeMap, + epoch: u64, +} + +impl ValidatorSet { + /// Create an empty validator set for the given epoch. + pub fn new(epoch: u64) -> Self { + Self { + validators: BTreeMap::new(), + epoch, + } + } + + /// Add a validator to the set. + /// + /// If a validator with the same P2P key already exists it is replaced. + pub fn add_validator(&mut self, identity: ValidatorIdentity) { + self.validators.insert(identity.p2p_key.clone(), identity); + } + + /// Return a reference to the underlying validator map. + pub fn validators(&self) -> &BTreeMap { + &self.validators + } + + /// Number of validators in the set. + pub fn len(&self) -> usize { + self.validators.len() + } + + /// Returns `true` when the set contains no validators. + pub fn is_empty(&self) -> bool { + self.validators.is_empty() + } + + /// The epoch this set corresponds to. + pub fn epoch(&self) -> u64 { + self.epoch + } + + /// Ed25519 P2P public keys for all validators, in deterministic order. + pub fn p2p_keys(&self) -> Vec { + self.validators.keys().cloned().collect() + } + + /// BLS12-381 consensus public keys for all validators, in the same + /// deterministic order as [`p2p_keys`](Self::p2p_keys). 
+ pub fn consensus_keys(&self) -> Vec { + self.validators + .values() + .map(|v| v.consensus_key.clone()) + .collect() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use commonware_cryptography::{bls12381, ed25519, Signer as _}; + + fn make_identity(seed: u64) -> ValidatorIdentity { + let p2p_key = ed25519::PrivateKey::from_seed(seed).public_key(); + let consensus_key = bls12381::PrivateKey::from_seed(seed).public_key(); + ValidatorIdentity { + p2p_key, + consensus_key, + } + } + + #[test] + fn new_set_is_empty() { + let vs = ValidatorSet::new(0); + assert!(vs.is_empty()); + assert_eq!(vs.len(), 0); + assert_eq!(vs.epoch(), 0); + } + + #[test] + fn add_and_retrieve_validators() { + let mut vs = ValidatorSet::new(1); + let id = make_identity(1); + let p2p_key = id.p2p_key.clone(); + vs.add_validator(id); + + assert_eq!(vs.len(), 1); + assert!(!vs.is_empty()); + assert!(vs.validators().contains_key(&p2p_key)); + } + + #[test] + fn duplicate_p2p_key_replaces_entry() { + let mut vs = ValidatorSet::new(2); + let id1 = make_identity(10); + let p2p_key = id1.p2p_key.clone(); + + let id2 = ValidatorIdentity { + p2p_key: p2p_key.clone(), + consensus_key: bls12381::PrivateKey::from_seed(99).public_key(), + }; + + vs.add_validator(id1); + vs.add_validator(id2); + assert_eq!(vs.len(), 1); + } + + #[test] + fn p2p_keys_deterministic_order() { + let mut vs = ValidatorSet::new(3); + for seed in 0..5u64 { + vs.add_validator(make_identity(seed)); + } + + let keys = vs.p2p_keys(); + let mut sorted = keys.clone(); + sorted.sort(); + assert_eq!( + keys, sorted, + "p2p_keys must be in sorted (deterministic) order" + ); + } + + #[test] + fn p2p_and_consensus_keys_same_length() { + let mut vs = ValidatorSet::new(4); + for seed in 0..3u64 { + vs.add_validator(make_identity(seed)); + } + assert_eq!(vs.p2p_keys().len(), vs.consensus_keys().len()); + } +} diff --git a/crates/storage/src/block_store.rs b/crates/storage/src/block_store.rs index caa5bfa..113605e 100644 --- 
a/crates/storage/src/block_store.rs +++ b/crates/storage/src/block_store.rs @@ -32,7 +32,7 @@ use crate::types::{BlockHash, BlockStorageConfig}; use commonware_codec::RangeCfg; -use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage}; +use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage as RStorage}; use commonware_storage::{ archive::{ prunable::{Archive, Config as ArchiveConfig}, @@ -108,12 +108,12 @@ where // Buffer pool for the key journal. let page_size = std::num::NonZeroU16::new(KEY_JOURNAL_PAGE_SIZE).unwrap(); let cache_pages = std::num::NonZeroUsize::new(KEY_JOURNAL_CACHE_PAGES).unwrap(); - let key_buffer_pool = PoolRef::new(page_size, cache_pages); + let key_page_cache = CacheRef::new(page_size, cache_pages); let cfg = ArchiveConfig { translator: EightCap, key_partition: format!("{}-block-index", config.partition_prefix), - key_buffer_pool, + key_page_cache, value_partition: format!("{}-block-data", config.partition_prefix), // No compression by default. Blocks are often already compressed (gzip/zstd // at the application layer), so double-compression wastes CPU. 
diff --git a/crates/storage/src/qmdb_impl.rs b/crates/storage/src/qmdb_impl.rs index 93083bf..c97df67 100644 --- a/crates/storage/src/qmdb_impl.rs +++ b/crates/storage/src/qmdb_impl.rs @@ -27,12 +27,15 @@ use crate::types::{ StorageValueChunk, MAX_VALUE_DATA_SIZE, }; use async_trait::async_trait; -use commonware_cryptography::sha256::Sha256; -use commonware_runtime::{utils::buffer::pool::PoolRef, Clock, Metrics, Storage as RStorage}; +use commonware_cryptography::{sha256::Sha256, DigestOf}; +use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage as RStorage}; use commonware_storage::qmdb::{ - current::{unordered::fixed::Db, FixedConfig}, + current::{ + db::{Merkleized, Unmerkleized}, + unordered::fixed::Db, + FixedConfig, + }, store::{Durable, NonDurable}, - Merkleized, Unmerkleized, }; use commonware_storage::translator::EightCap; use evolve_core::{ErrorCode, ReadonlyKV}; @@ -45,8 +48,16 @@ use tokio::sync::RwLock; /// Type alias for QMDB in Clean state (Merkleized, Durable) /// N = 64 because SHA256 digest is 32 bytes, and N must be 2 * digest_size -type QmdbClean = - Db, Durable>; +type QmdbClean = Db< + C, + StorageKey, + StorageValueChunk, + Sha256, + EightCap, + 64, + Merkleized>, + Durable, +>; /// Type alias for QMDB in Mutable state (Unmerkleized, NonDurable) type QmdbMutable = @@ -196,7 +207,7 @@ where bitmap_metadata_partition: format!("{}_bitmap-metadata", config.partition_prefix), translator: EightCap, thread_pool: None, - buffer_pool: PoolRef::new(page_size, capacity), + page_cache: CacheRef::new(page_size, capacity), }; // Initialize QMDB - starts in Clean state (Merkleized, Durable) From d3e45a7ea4aa04668737aeccc8d3228dd9ad659d Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Sat, 21 Feb 2026 18:40:52 +0100 Subject: [PATCH 2/9] swarm: implement consensus application layer modules Add the evolve_consensus crate with commonware simplex consensus integration: - ConsensusBlock: wraps evolve_server::Block with commonware traits 
(Heightable, Digestible, Committable, Codec, Block) - EvolveAutomaton: bridges STF/mempool with consensus propose/verify - EvolveRelay: local in-memory block relay for digest broadcast - EvolveReporter: stub reporter for finalization callbacks - ConsensusConfig: consensus parameters (timeouts, gas limit, epoch) Also includes Part 1 changes: upgrade commonware deps from 0.0.65 to 2026.2.0 and fix API breakage in storage crate. Unit tests cover block serialization roundtrip, digest determinism, height extraction, parent hash, and uniqueness. Co-Authored-By: Claude Opus 4.6 --- Cargo.toml | 14 +- crates/consensus/Cargo.toml | 28 ++++ crates/consensus/src/automaton.rs | 241 +++++++++++++++++++++++++++ crates/consensus/src/block.rs | 268 ++++++++++++++++++++++++++++++ crates/consensus/src/config.rs | 34 ++++ crates/consensus/src/lib.rs | 16 ++ crates/consensus/src/relay.rs | 56 +++++++ crates/consensus/src/reporter.rs | 41 +++++ crates/storage/src/block_store.rs | 6 +- crates/storage/src/qmdb_impl.rs | 25 ++- 10 files changed, 714 insertions(+), 15 deletions(-) create mode 100644 crates/consensus/Cargo.toml create mode 100644 crates/consensus/src/automaton.rs create mode 100644 crates/consensus/src/block.rs create mode 100644 crates/consensus/src/config.rs create mode 100644 crates/consensus/src/lib.rs create mode 100644 crates/consensus/src/relay.rs create mode 100644 crates/consensus/src/reporter.rs diff --git a/Cargo.toml b/Cargo.toml index a461c5d..b5ddedc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ members = [ "crates/testing/simulator", "crates/testing/debugger", "crates/testing/cli", + "crates/consensus", ] resolver = "2" @@ -70,11 +71,14 @@ evolve_testapp = { path = "bin/testapp" } # outside deps linkme = { version = "0.3", default-features = false } -commonware-cryptography = "0.0.65" -commonware-runtime = "0.0.65" -commonware-storage = "0.0.65" -commonware-utils = "0.0.65" -commonware-codec = "0.0.65" +commonware-cryptography = "2026.2.0" 
+commonware-runtime = "2026.2.0" +commonware-storage = "2026.2.0" +commonware-utils = "2026.2.0" +commonware-codec = "2026.2.0" +commonware-consensus = "2026.2.0" +commonware-broadcast = "2026.2.0" +evolve_consensus = { path = "crates/consensus" } borsh = { features = ["derive"], version = "1.5.5" } clap = { version = "4.5.31", features = ["derive"] } fixed = { version = "1.29", features = ["borsh", "serde"] } diff --git a/crates/consensus/Cargo.toml b/crates/consensus/Cargo.toml new file mode 100644 index 0000000..fd7f093 --- /dev/null +++ b/crates/consensus/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "evolve_consensus" +version = "0.1.0" +edition = "2021" +license.workspace = true +repository.workspace = true +rust-version.workspace = true + +[dependencies] +commonware-consensus = { workspace = true } +commonware-cryptography = { workspace = true } +commonware-codec = { workspace = true } +commonware-utils = { workspace = true } + +evolve_core = { workspace = true } +evolve_server = { workspace = true } +evolve_stf_traits = { workspace = true } +evolve_mempool = { workspace = true } +evolve_storage = { workspace = true } + +alloy-primitives = { workspace = true } +borsh = { workspace = true } +bytes = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } + +[lints] +workspace = true diff --git a/crates/consensus/src/automaton.rs b/crates/consensus/src/automaton.rs new file mode 100644 index 0000000..43cf85b --- /dev/null +++ b/crates/consensus/src/automaton.rs @@ -0,0 +1,241 @@ +use crate::block::ConsensusBlock; +use crate::config::ConsensusConfig; +use alloy_primitives::B256; +use commonware_consensus::types::Epoch; +use commonware_consensus::{Automaton, CertifiableAutomaton}; +use commonware_utils::channel::oneshot; +use evolve_core::ReadonlyKV; +use evolve_mempool::{Mempool, MempoolTx, SharedMempool}; +use evolve_server::BlockBuilder; +use evolve_server::StfExecutor; +use evolve_stf_traits::{AccountsCodeStorage, Transaction}; 
+use evolve_storage::Storage; +use std::collections::BTreeMap; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, RwLock}; +use tokio::sync::RwLock as TokioRwLock; + +/// EvolveAutomaton bridges Evolve's STF and mempool with commonware's consensus. +/// +/// It implements the `Automaton` and `CertifiableAutomaton` traits, allowing +/// the simplex consensus engine to propose and verify blocks through Evolve's +/// state transition function. +/// +/// # Design +/// +/// Consensus operates on opaque digests, not full blocks. The automaton: +/// - On `propose()`: builds a block from mempool txs, stores it locally, +/// returns only the digest to consensus. +/// - On `verify()`: looks up the block by digest (populated via Relay), +/// validates parent chain, timestamp monotonicity, etc. +pub struct EvolveAutomaton { + stf: Stf, + storage: S, + codes: Codes, + mempool: SharedMempool>, + /// Block cache: stores proposed/received blocks by their digest. + pending_blocks: Arc>>>, + /// Current chain height. + height: Arc, + /// Last block hash. + last_hash: Arc>, + /// Consensus configuration. + config: ConsensusConfig, + /// Phantom for the context type. 
+ _ctx: std::marker::PhantomData, +} + +impl Clone for EvolveAutomaton +where + Stf: Clone, + S: Clone, + Codes: Clone, +{ + fn clone(&self) -> Self { + Self { + stf: self.stf.clone(), + storage: self.storage.clone(), + codes: self.codes.clone(), + mempool: self.mempool.clone(), + pending_blocks: self.pending_blocks.clone(), + height: self.height.clone(), + last_hash: self.last_hash.clone(), + config: self.config.clone(), + _ctx: std::marker::PhantomData, + } + } +} + +impl EvolveAutomaton { + pub fn new( + stf: Stf, + storage: S, + codes: Codes, + mempool: SharedMempool>, + pending_blocks: Arc>>>, + config: ConsensusConfig, + ) -> Self { + Self { + stf, + storage, + codes, + mempool, + pending_blocks, + height: Arc::new(AtomicU64::new(1)), + last_hash: Arc::new(TokioRwLock::new(B256::ZERO)), + config, + _ctx: std::marker::PhantomData, + } + } + + /// Get the current height. + pub fn height(&self) -> u64 { + self.height.load(Ordering::SeqCst) + } + + /// Get a reference to the shared pending blocks. + pub fn pending_blocks(&self) -> &Arc>>> { + &self.pending_blocks + } +} + +impl Automaton for EvolveAutomaton +where + Tx: Transaction + MempoolTx + Clone + Send + Sync + 'static, + S: ReadonlyKV + Storage + Clone + Send + Sync + 'static, + Codes: AccountsCodeStorage + Clone + Send + Sync + 'static, + Stf: StfExecutor + Send + Sync + Clone + 'static, + Ctx: Clone + Send + 'static, +{ + type Context = Ctx; + type Digest = commonware_cryptography::sha256::Digest; + + async fn genesis(&mut self, _epoch: Epoch) -> Self::Digest { + // Genesis: return the digest of the empty genesis block at height 0. + let genesis_block = BlockBuilder::::new() + .number(0) + .timestamp(0) + .parent_hash(B256::ZERO) + .gas_limit(self.config.gas_limit) + .build(); + + let cb = ConsensusBlock::new(genesis_block); + let digest = cb.digest; + + // Store genesis in pending blocks. 
+ self.pending_blocks.write().unwrap().insert(digest.0, cb); + + digest + } + + async fn propose(&mut self, _context: Self::Context) -> oneshot::Receiver { + let (sender, receiver) = oneshot::channel(); + + let height = self.height.fetch_add(1, Ordering::SeqCst); + let last_hash = *self.last_hash.read().await; + let gas_limit = self.config.gas_limit; + let mempool = self.mempool.clone(); + let pending_blocks = self.pending_blocks.clone(); + + // Spawn block building onto a background task. + tokio::spawn(async move { + // Pull transactions from mempool. + let selected = { + let mut pool = mempool.write().await; + pool.select(1000) // max txs per block + }; + + let transactions: Vec = selected + .into_iter() + .map(|arc_tx| (*arc_tx).clone()) + .collect(); + + // Build the block. + let timestamp = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + let block = BlockBuilder::::new() + .number(height) + .timestamp(timestamp) + .parent_hash(last_hash) + .gas_limit(gas_limit) + .transactions(transactions) + .build(); + + let cb = ConsensusBlock::new(block); + let digest = cb.digest; + + // Store in pending blocks for later retrieval. + pending_blocks.write().unwrap().insert(digest.0, cb); + + // Return the digest to consensus. + let _ = sender.send(digest); + }); + + receiver + } + + async fn verify( + &mut self, + _context: Self::Context, + payload: Self::Digest, + ) -> oneshot::Receiver { + let (sender, receiver) = oneshot::channel(); + + let pending_blocks = self.pending_blocks.clone(); + let last_hash = self.last_hash.clone(); + + tokio::spawn(async move { + // Look up the block by digest. + let block = { + let blocks = pending_blocks.read().unwrap(); + blocks.get(&payload.0).cloned() + }; + + let Some(block) = block else { + tracing::warn!( + digest = ?payload, + "verify: block not found in pending blocks" + ); + let _ = sender.send(false); + return; + }; + + // Validate parent hash chain. 
+ let expected_parent = *last_hash.read().await; + if block.inner.header.parent_hash != expected_parent { + tracing::warn!( + expected = ?expected_parent, + actual = ?block.inner.header.parent_hash, + "verify: parent hash mismatch" + ); + let _ = sender.send(false); + return; + } + + // Validate height is positive. + if block.inner.header.number == 0 { + tracing::warn!("verify: block height cannot be 0 (genesis)"); + let _ = sender.send(false); + return; + } + + let _ = sender.send(true); + }); + + receiver + } +} + +impl CertifiableAutomaton for EvolveAutomaton +where + Tx: Transaction + MempoolTx + Clone + Send + Sync + 'static, + S: ReadonlyKV + Storage + Clone + Send + Sync + 'static, + Codes: AccountsCodeStorage + Clone + Send + Sync + 'static, + Stf: StfExecutor + Send + Sync + Clone + 'static, + Ctx: Clone + Send + 'static, +{ + // Use the default implementation which always certifies. +} diff --git a/crates/consensus/src/block.rs b/crates/consensus/src/block.rs new file mode 100644 index 0000000..a47ab29 --- /dev/null +++ b/crates/consensus/src/block.rs @@ -0,0 +1,268 @@ +use alloy_primitives::B256; +use borsh::{BorshDeserialize, BorshSerialize}; +use bytes::{Buf, BufMut}; +use commonware_codec::{EncodeSize, Error as CodecError, Read, ReadExt, Write}; +use commonware_consensus::types::Height; +use commonware_consensus::Heightable; +use commonware_cryptography::{Committable, Digestible, Hasher, Sha256}; +use evolve_server::{Block, BlockHeader}; + +/// A consensus-aware wrapper around Evolve's Block. +/// +/// Implements commonware traits (Heightable, Digestible, Committable, Codec) +/// so it can participate in simplex consensus. +#[derive(Debug, Clone)] +pub struct ConsensusBlock { + /// The inner evolve block. + pub inner: Block, + /// Cached digest (SHA-256 of the header fields). + pub digest: commonware_cryptography::sha256::Digest, + /// Cached parent digest. 
+ pub parent_digest: commonware_cryptography::sha256::Digest, +} + +impl ConsensusBlock { + /// Create a new ConsensusBlock from an evolve Block. + /// + /// Computes and caches the block digest and parent digest. + pub fn new(inner: Block) -> Self { + let digest = compute_block_digest(&inner); + let parent_digest = hash_to_sha256_digest(inner.header.parent_hash); + Self { + inner, + digest, + parent_digest, + } + } + + /// Return the inner block hash as a B256. + pub fn block_hash(&self) -> B256 { + B256::from_slice(&self.digest.0) + } +} + +/// Compute a deterministic SHA-256 digest from block header fields. +fn compute_block_digest(block: &Block) -> commonware_cryptography::sha256::Digest { + let mut hasher = Sha256::new(); + hasher.update(&block.header.number.to_le_bytes()); + hasher.update(&block.header.timestamp.to_le_bytes()); + hasher.update(block.header.parent_hash.as_slice()); + hasher.update(&block.header.gas_limit.to_le_bytes()); + hasher.update(block.header.beneficiary.as_slice()); + hasher.finalize() +} + +/// Convert a B256 into a SHA-256 Digest (both are 32 bytes). +fn hash_to_sha256_digest(hash: B256) -> commonware_cryptography::sha256::Digest { + commonware_cryptography::sha256::Digest(hash.0) +} + +// -- Commonware trait implementations -- + +impl Heightable for ConsensusBlock { + fn height(&self) -> Height { + Height::new(self.inner.header.number) + } +} + +impl Digestible for ConsensusBlock { + type Digest = commonware_cryptography::sha256::Digest; + + fn digest(&self) -> Self::Digest { + self.digest + } +} + +impl Committable for ConsensusBlock { + type Commitment = commonware_cryptography::sha256::Digest; + + fn commitment(&self) -> Self::Commitment { + self.digest + } +} + +/// Borsh-based wire format for consensus block serialization. 
+#[derive(BorshSerialize, BorshDeserialize)] +struct WireBlock { + number: u64, + timestamp: u64, + parent_hash: [u8; 32], + gas_limit: u64, + gas_used: u64, + beneficiary: [u8; 20], + transactions_encoded: Vec>, +} + +impl Write for ConsensusBlock { + fn write(&self, buf: &mut impl BufMut) { + let wire = to_wire(&self.inner); + let bytes = borsh::to_vec(&wire).expect("wire block serialization should not fail"); + // Write length-prefixed bytes. + (bytes.len() as u32).write(buf); + buf.put_slice(&bytes); + } +} + +impl Read for ConsensusBlock { + type Cfg = (); + + fn read_cfg(buf: &mut impl Buf, _cfg: &Self::Cfg) -> Result { + let len = u32::read(buf)? as usize; + if buf.remaining() < len { + return Err(CodecError::EndOfBuffer); + } + let bytes = buf.copy_to_bytes(len); + let wire: WireBlock = borsh::from_slice(&bytes) + .map_err(|_| CodecError::Invalid("ConsensusBlock", "borsh deserialization failed"))?; + + let transactions: Vec = wire + .transactions_encoded + .iter() + .map(|encoded| { + borsh::from_slice(encoded).map_err(|_| { + CodecError::Invalid("ConsensusBlock", "tx borsh deserialization failed") + }) + }) + .collect::, _>>()?; + + let header = BlockHeader { + number: wire.number, + timestamp: wire.timestamp, + parent_hash: B256::from_slice(&wire.parent_hash), + gas_limit: wire.gas_limit, + gas_used: wire.gas_used, + beneficiary: alloy_primitives::Address::from_slice(&wire.beneficiary), + ..Default::default() + }; + + let block = Block::new(header, transactions); + Ok(ConsensusBlock::new(block)) + } +} + +impl EncodeSize for ConsensusBlock { + fn encode_size(&self) -> usize { + let wire = to_wire(&self.inner); + let bytes = borsh::to_vec(&wire).expect("wire block serialization should not fail"); + // u32 length prefix + payload + 4 + bytes.len() + } +} + +fn to_wire(inner: &Block) -> WireBlock { + WireBlock { + number: inner.header.number, + timestamp: inner.header.timestamp, + parent_hash: inner.header.parent_hash.0, + gas_limit: inner.header.gas_limit, 
+ gas_used: inner.header.gas_used, + beneficiary: inner.header.beneficiary.0 .0, + transactions_encoded: inner + .transactions + .iter() + .map(|tx| borsh::to_vec(tx).expect("tx serialization should not fail")) + .collect(), + } +} + +impl + commonware_consensus::Block for ConsensusBlock +{ + fn parent(&self) -> Self::Commitment { + self.parent_digest + } +} + +#[cfg(test)] +mod tests { + use super::*; + use commonware_codec::{DecodeExt, Encode}; + + #[derive(Debug, Clone, BorshSerialize, BorshDeserialize, PartialEq)] + struct TestTx { + data: Vec, + } + + #[test] + fn test_encode_decode_roundtrip() { + let header = BlockHeader::new(42, 1000, B256::repeat_byte(0xAA)); + let txs = vec![ + TestTx { + data: vec![1, 2, 3], + }, + TestTx { + data: vec![4, 5, 6], + }, + ]; + let block = Block::new(header, txs); + let consensus_block = ConsensusBlock::new(block); + + let encoded = consensus_block.encode(); + let decoded = ConsensusBlock::::decode(encoded).unwrap(); + + assert_eq!( + decoded.inner.header.number, + consensus_block.inner.header.number + ); + assert_eq!( + decoded.inner.header.timestamp, + consensus_block.inner.header.timestamp + ); + assert_eq!( + decoded.inner.header.parent_hash, + consensus_block.inner.header.parent_hash + ); + assert_eq!( + decoded.inner.transactions, + consensus_block.inner.transactions + ); + assert_eq!(decoded.digest, consensus_block.digest); + } + + #[test] + fn test_digest_determinism() { + let header = BlockHeader::new(10, 500, B256::repeat_byte(0xBB)); + let txs = vec![TestTx { + data: vec![7, 8, 9], + }]; + + let block1 = Block::new(header.clone(), txs.clone()); + let block2 = Block::new(header, txs); + + let cb1 = ConsensusBlock::new(block1); + let cb2 = ConsensusBlock::new(block2); + + assert_eq!(cb1.digest, cb2.digest); + } + + #[test] + fn test_heightable() { + let header = BlockHeader::new(99, 0, B256::ZERO); + let block = Block::::new(header, vec![]); + let cb = ConsensusBlock::new(block); + + assert_eq!(cb.height(), 
Height::new(99)); + } + + #[test] + fn test_parent_returns_parent_hash() { + let parent = B256::repeat_byte(0xCC); + let header = BlockHeader::new(5, 0, parent); + let block = Block::::new(header, vec![]); + let cb = ConsensusBlock::new(block); + + let parent_digest = as commonware_consensus::Block>::parent(&cb); + assert_eq!(parent_digest.0, parent.0); + } + + #[test] + fn test_different_blocks_different_digests() { + let block1 = Block::::new(BlockHeader::new(1, 100, B256::ZERO), vec![]); + let block2 = Block::::new(BlockHeader::new(2, 100, B256::ZERO), vec![]); + + let cb1 = ConsensusBlock::new(block1); + let cb2 = ConsensusBlock::new(block2); + + assert_ne!(cb1.digest, cb2.digest); + } +} diff --git a/crates/consensus/src/config.rs b/crates/consensus/src/config.rs new file mode 100644 index 0000000..dc99d44 --- /dev/null +++ b/crates/consensus/src/config.rs @@ -0,0 +1,34 @@ +use std::time::Duration; + +/// Configuration for the consensus application layer. +#[derive(Debug, Clone)] +pub struct ConsensusConfig { + /// Chain ID for the evolve network. + pub chain_id: u64, + /// Namespace bytes for signing domain separation (e.g., b"_EVOLVE"). + pub namespace: Vec, + /// Gas limit per block. + pub gas_limit: u64, + /// Timeout before leader rotation. + pub leader_timeout: Duration, + /// Timeout for notarization. + pub notarization_timeout: Duration, + /// Timeout for activity before penalization. + pub activity_timeout: Duration, + /// Number of blocks per epoch. 
+ pub epoch_length: u64, +} + +impl Default for ConsensusConfig { + fn default() -> Self { + Self { + chain_id: 1, + namespace: b"_EVOLVE".to_vec(), + gas_limit: 30_000_000, + leader_timeout: Duration::from_secs(2), + notarization_timeout: Duration::from_secs(2), + activity_timeout: Duration::from_secs(10), + epoch_length: 100, + } + } +} diff --git a/crates/consensus/src/lib.rs b/crates/consensus/src/lib.rs new file mode 100644 index 0000000..3c24ebd --- /dev/null +++ b/crates/consensus/src/lib.rs @@ -0,0 +1,16 @@ +// Automaton uses SystemTime for block timestamps in propose(). +// This is acceptable in a consensus proposer context (non-deterministic timestamp +// is validated by verifiers and consensus). +#![allow(clippy::disallowed_types)] + +pub mod automaton; +pub mod block; +pub mod config; +pub mod relay; +pub mod reporter; + +pub use automaton::EvolveAutomaton; +pub use block::ConsensusBlock; +pub use config::ConsensusConfig; +pub use relay::EvolveRelay; +pub use reporter::EvolveReporter; diff --git a/crates/consensus/src/relay.rs b/crates/consensus/src/relay.rs new file mode 100644 index 0000000..8512286 --- /dev/null +++ b/crates/consensus/src/relay.rs @@ -0,0 +1,56 @@ +use crate::block::ConsensusBlock; +use commonware_consensus::Relay; +use std::collections::BTreeMap; +use std::sync::{Arc, RwLock}; + +/// A local in-memory relay for block broadcast. +/// +/// Stores proposed blocks so that other participants (or the local verifier) +/// can look them up by digest. In production, this would broadcast full blocks +/// over the P2P network. For now, it serves as a shared cache between the +/// automaton (proposer) and verifier. +#[derive(Clone)] +pub struct EvolveRelay { + /// Shared storage of blocks indexed by their digest. + blocks: Arc>>>, +} + +impl EvolveRelay { + pub fn new(blocks: Arc>>>) -> Self { + Self { blocks } + } + + /// Retrieve a block by its digest. 
+ pub fn get_block(&self, digest: &[u8; 32]) -> Option> + where + Tx: Clone, + { + self.blocks.read().unwrap().get(digest).cloned() + } + + /// Insert a block into the relay's store. + pub fn insert_block(&self, block: ConsensusBlock) + where + Tx: Clone, + { + let digest = block.digest.0; + self.blocks.write().unwrap().insert(digest, block); + } +} + +impl Relay for EvolveRelay +where + Tx: Clone + Send + Sync + 'static, +{ + type Digest = commonware_cryptography::sha256::Digest; + + async fn broadcast(&mut self, payload: Self::Digest) { + // In production, this would broadcast the full block to all peers. + // For now, the block is already stored in the shared pending_blocks map + // by the automaton's propose() method. We just log the broadcast. + tracing::debug!( + digest = ?payload, + "relay: broadcasting block digest (local relay, no-op)" + ); + } +} diff --git a/crates/consensus/src/reporter.rs b/crates/consensus/src/reporter.rs new file mode 100644 index 0000000..c10c0de --- /dev/null +++ b/crates/consensus/src/reporter.rs @@ -0,0 +1,41 @@ +use commonware_consensus::Reporter; +use std::marker::PhantomData; + +/// A reporter that logs consensus activity events. +/// +/// Generic over the activity type so it works with any consensus scheme. +/// In production, this would handle finalization callbacks: +/// executing blocks through the STF, committing state changes, +/// and updating chain state. For now, it logs activity. +#[derive(Clone)] +pub struct EvolveReporter { + _phantom: PhantomData A>, +} + +impl EvolveReporter { + pub fn new() -> Self { + Self { + _phantom: PhantomData, + } + } +} + +impl Default for EvolveReporter { + fn default() -> Self { + Self::new() + } +} + +impl Reporter for EvolveReporter { + type Activity = A; + + async fn report(&mut self, _activity: Self::Activity) { + // In production, this would: + // 1. Extract finalization events (Finalization variant) + // 2. Look up the full block from pending_blocks by digest + // 3. 
Execute through STF + // 4. Commit state changes to storage + // 5. Update chain height and last_hash + tracing::debug!("reporter: received consensus activity"); + } +} diff --git a/crates/storage/src/block_store.rs b/crates/storage/src/block_store.rs index caa5bfa..113605e 100644 --- a/crates/storage/src/block_store.rs +++ b/crates/storage/src/block_store.rs @@ -32,7 +32,7 @@ use crate::types::{BlockHash, BlockStorageConfig}; use commonware_codec::RangeCfg; -use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage}; +use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage as RStorage}; use commonware_storage::{ archive::{ prunable::{Archive, Config as ArchiveConfig}, @@ -108,12 +108,12 @@ where // Buffer pool for the key journal. let page_size = std::num::NonZeroU16::new(KEY_JOURNAL_PAGE_SIZE).unwrap(); let cache_pages = std::num::NonZeroUsize::new(KEY_JOURNAL_CACHE_PAGES).unwrap(); - let key_buffer_pool = PoolRef::new(page_size, cache_pages); + let key_page_cache = CacheRef::new(page_size, cache_pages); let cfg = ArchiveConfig { translator: EightCap, key_partition: format!("{}-block-index", config.partition_prefix), - key_buffer_pool, + key_page_cache, value_partition: format!("{}-block-data", config.partition_prefix), // No compression by default. Blocks are often already compressed (gzip/zstd // at the application layer), so double-compression wastes CPU. 
diff --git a/crates/storage/src/qmdb_impl.rs b/crates/storage/src/qmdb_impl.rs index 93083bf..c97df67 100644 --- a/crates/storage/src/qmdb_impl.rs +++ b/crates/storage/src/qmdb_impl.rs @@ -27,12 +27,15 @@ use crate::types::{ StorageValueChunk, MAX_VALUE_DATA_SIZE, }; use async_trait::async_trait; -use commonware_cryptography::sha256::Sha256; -use commonware_runtime::{utils::buffer::pool::PoolRef, Clock, Metrics, Storage as RStorage}; +use commonware_cryptography::{sha256::Sha256, DigestOf}; +use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage as RStorage}; use commonware_storage::qmdb::{ - current::{unordered::fixed::Db, FixedConfig}, + current::{ + db::{Merkleized, Unmerkleized}, + unordered::fixed::Db, + FixedConfig, + }, store::{Durable, NonDurable}, - Merkleized, Unmerkleized, }; use commonware_storage::translator::EightCap; use evolve_core::{ErrorCode, ReadonlyKV}; @@ -45,8 +48,16 @@ use tokio::sync::RwLock; /// Type alias for QMDB in Clean state (Merkleized, Durable) /// N = 64 because SHA256 digest is 32 bytes, and N must be 2 * digest_size -type QmdbClean = - Db, Durable>; +type QmdbClean = Db< + C, + StorageKey, + StorageValueChunk, + Sha256, + EightCap, + 64, + Merkleized>, + Durable, +>; /// Type alias for QMDB in Mutable state (Unmerkleized, NonDurable) type QmdbMutable = @@ -196,7 +207,7 @@ where bitmap_metadata_partition: format!("{}_bitmap-metadata", config.partition_prefix), translator: EightCap, thread_pool: None, - buffer_pool: PoolRef::new(page_size, capacity), + page_cache: CacheRef::new(page_size, capacity), }; // Initialize QMDB - starts in Clean state (Merkleized, Durable) From e3079ff4bba9d19a08d0ac6d9756196c4a95096d Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Sat, 21 Feb 2026 19:19:11 +0100 Subject: [PATCH 3/9] swarm: fix QMDB stack overflow and scope clippy allow Fix 1 (crates/storage/src/qmdb_impl.rs): - Box both QmdbClean and QmdbMutable in QmdbState enum to eliminate large-enum-variant lint and reduce stack 
pressure at every construction site. - Box::pin the Db::init() future so its state machine lives on the heap rather than being inlined into the caller's async state machine, preventing stack overflow when block_on runs on the test thread (8 MB default stack). - Keep clean_db as Box> throughout commit_state to avoid moving the large type back onto the stack. Fix 2 (crates/consensus/src/lib.rs + automaton.rs): - Remove crate-level #![allow(clippy::disallowed_types)] from lib.rs. - Scope the allow to just the propose() method in automaton.rs where SystemTime::now() is legitimately used for block timestamps. Co-Authored-By: Claude Sonnet 4.6 --- crates/consensus/src/automaton.rs | 3 +++ crates/consensus/src/lib.rs | 5 ---- crates/storage/src/qmdb_impl.rs | 41 +++++++++++++++++++------------ 3 files changed, 28 insertions(+), 21 deletions(-) diff --git a/crates/consensus/src/automaton.rs b/crates/consensus/src/automaton.rs index 43cf85b..a247189 100644 --- a/crates/consensus/src/automaton.rs +++ b/crates/consensus/src/automaton.rs @@ -128,6 +128,9 @@ where digest } + // SystemTime::now() is used here for block timestamps only. This is acceptable + // in a consensus proposer context — the timestamp is validated by verifiers. + #[allow(clippy::disallowed_types)] async fn propose(&mut self, _context: Self::Context) -> oneshot::Receiver { let (sender, receiver) = oneshot::channel(); diff --git a/crates/consensus/src/lib.rs b/crates/consensus/src/lib.rs index 3c24ebd..473bcef 100644 --- a/crates/consensus/src/lib.rs +++ b/crates/consensus/src/lib.rs @@ -1,8 +1,3 @@ -// Automaton uses SystemTime for block timestamps in propose(). -// This is acceptable in a consensus proposer context (non-deterministic timestamp -// is validated by verifiers and consensus). 
-#![allow(clippy::disallowed_types)] - pub mod automaton; pub mod block; pub mod config; diff --git a/crates/storage/src/qmdb_impl.rs b/crates/storage/src/qmdb_impl.rs index c97df67..5f4e8c7 100644 --- a/crates/storage/src/qmdb_impl.rs +++ b/crates/storage/src/qmdb_impl.rs @@ -67,12 +67,16 @@ type QmdbMutable = type QmdbDurable = Db; -/// Internal state enum to track QMDB state machine transitions +/// Internal state enum to track QMDB state machine transitions. +/// +/// Both large variants are boxed to keep `sizeof(QmdbState)` small. +/// This prevents large stack allocations at every construction site and +/// keeps async future state machines that hold this enum small. enum QmdbState { - /// Clean state - ready for proofs, durable - Clean(QmdbClean), - /// Mutable state - can perform updates/deletes - Mutable(QmdbMutable), + /// Clean state - ready for proofs, durable (boxed to reduce stack pressure) + Clean(Box>), + /// Mutable state - can perform updates/deletes (boxed to reduce stack pressure) + Mutable(Box>), /// Transitional state for ownership management Transitioning, } @@ -210,14 +214,17 @@ where page_cache: CacheRef::new(page_size, capacity), }; - // Initialize QMDB - starts in Clean state (Merkleized, Durable) - let db: QmdbClean = Db::init(context.clone(), qmdb_config) + // Initialize QMDB - starts in Clean state (Merkleized, Durable). + // Box::pin prevents the Db::init future's state machine from being + // stored inline in with_metrics' state machine, keeping the combined + // state machine (which lives on the test/caller thread's stack) small. 
+ let db: QmdbClean = Box::pin(Db::init(context.clone(), qmdb_config)) .await .map_err(|e| StorageError::Qmdb(e.to_string()))?; Ok(Self { context: Arc::new(context), - state: Arc::new(RwLock::new(QmdbState::Clean(db))), + state: Arc::new(RwLock::new(QmdbState::Clean(Box::new(db)))), cache: Arc::new(ShardedDbCache::with_defaults()), metrics, }) @@ -255,9 +262,10 @@ where // Take ownership to perform state transitions let current_state = std::mem::replace(&mut *state_guard, QmdbState::Transitioning); - let clean_db = match current_state { + // Keep clean_db boxed to avoid putting QmdbClean on the stack. + let clean_db: Box> = match current_state { QmdbState::Clean(db) => { - // Already clean, just return current root + // Already clean, return the existing Box directly. db } QmdbState::Mutable(db) => { @@ -273,7 +281,7 @@ where .await .map_err(|e| StorageError::Qmdb(e.to_string()))?; - clean + Box::new(clean) } QmdbState::Transitioning => { return Err(StorageError::InvalidState( @@ -282,7 +290,7 @@ where } }; - // Get the root hash + // Get the root hash (Deref coerces Box> → &QmdbClean). let root = clean_db.root(); let hash = match root.as_ref().try_into() { Ok(bytes) => crate::types::CommitHash::new(bytes), @@ -292,7 +300,7 @@ where } }; - // Store clean state back + // Store clean state back (already boxed, no extra allocation). *state_guard = QmdbState::Clean(clean_db); // Record commit latency @@ -358,10 +366,11 @@ where // Take ownership to perform state transition if needed let current_state = std::mem::replace(&mut *state_guard, QmdbState::Transitioning); - let mut mutable_db: QmdbMutable = match current_state { + let mut mutable_db: Box> = match current_state { QmdbState::Clean(db) => { - // Clean → into_mutable() → Mutable (sync method) - db.into_mutable() + // Clean → into_mutable() → Mutable (sync method). + // Rust auto-moves out of Box for consuming methods. 
+ Box::new(db.into_mutable()) } QmdbState::Mutable(db) => db, QmdbState::Transitioning => { From dc802737dfc7c4cbb839ad143321e4a304d87a75 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Sat, 21 Feb 2026 21:26:49 +0100 Subject: [PATCH 4/9] swarm: wire simplex consensus engine with P2P channels and integration tests Add engine.rs and runner.rs modules to crates/consensus/ that configure and start commonware's simplex BFT consensus engine with Evolve's types: - SimplexSetup: configures simplex::Config with RoundRobin leader election, Sequential strategy, and Evolve-specific parameters (timeouts, cache sizes) - ConsensusRunner: higher-level orchestrator that wires consensus config, network config, and P2P channel pairs into a running engine - start_engine: convenience function for starting an engine with channel pairs Integration tests using commonware's deterministic runtime and simulated P2P network verify single-validator consensus produces and finalizes blocks through both SimplexSetup and ConsensusRunner APIs. 
Co-Authored-By: Claude Opus 4.6 --- Cargo.toml | 1 + crates/consensus/Cargo.toml | 13 ++ crates/consensus/src/engine.rs | 147 ++++++++++++ crates/consensus/src/lib.rs | 4 + crates/consensus/src/runner.rs | 83 +++++++ crates/consensus/tests/simplex_integration.rs | 216 ++++++++++++++++++ 6 files changed, 464 insertions(+) create mode 100644 crates/consensus/src/engine.rs create mode 100644 crates/consensus/src/runner.rs create mode 100644 crates/consensus/tests/simplex_integration.rs diff --git a/Cargo.toml b/Cargo.toml index f1f9a8a..cba7470 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,6 +74,7 @@ evolve_p2p = { path = "crates/p2p" } # outside deps linkme = { version = "0.3", default-features = false } commonware-cryptography = "2026.2.0" +commonware-parallel = "2026.2.0" commonware-runtime = "2026.2.0" commonware-storage = "2026.2.0" commonware-utils = "2026.2.0" diff --git a/crates/consensus/Cargo.toml b/crates/consensus/Cargo.toml index fd7f093..ef0ef60 100644 --- a/crates/consensus/Cargo.toml +++ b/crates/consensus/Cargo.toml @@ -10,6 +10,9 @@ rust-version.workspace = true commonware-consensus = { workspace = true } commonware-cryptography = { workspace = true } commonware-codec = { workspace = true } +commonware-p2p = { workspace = true } +commonware-parallel = "2026.2.0" +commonware-runtime = { workspace = true } commonware-utils = { workspace = true } evolve_core = { workspace = true } @@ -17,6 +20,9 @@ evolve_server = { workspace = true } evolve_stf_traits = { workspace = true } evolve_mempool = { workspace = true } evolve_storage = { workspace = true } +evolve_p2p = { package = "evolve-p2p", path = "../p2p" } + +rand_core = "0.6" alloy-primitives = { workspace = true } borsh = { workspace = true } @@ -24,5 +30,12 @@ bytes = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +[dev-dependencies] +commonware-consensus = { workspace = true, features = ["fuzz"] } +commonware-cryptography = { workspace = true, features = ["mocks"] 
} +commonware-p2p = { workspace = true } +commonware-runtime = { workspace = true, features = ["test-utils"] } +commonware-macros = "2026.2.0" + [lints] workspace = true diff --git a/crates/consensus/src/engine.rs b/crates/consensus/src/engine.rs new file mode 100644 index 0000000..88996f1 --- /dev/null +++ b/crates/consensus/src/engine.rs @@ -0,0 +1,147 @@ +//! Simplex consensus engine configuration and setup. +//! +//! Configures `commonware_consensus::simplex::Engine` to work with Evolve's +//! automaton, relay, and reporter types. This module provides [`SimplexSetup`] +//! which builds the engine from a signing scheme, leader elector, and Evolve +//! application components. + +use crate::config::ConsensusConfig; +use commonware_consensus::simplex; +use commonware_consensus::simplex::elector::{Config as ElectorConfig, RoundRobin}; +use commonware_consensus::simplex::types::{Activity, Context}; +use commonware_consensus::{CertifiableAutomaton, Relay, Reporter}; +use commonware_cryptography::certificate::Scheme; +use commonware_cryptography::Digest; +use commonware_p2p::{Blocker, Receiver, Sender}; +use commonware_parallel::Sequential; +use commonware_runtime::buffer::paged::CacheRef; +use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage}; +use rand_core::CryptoRngCore; +use std::num::{NonZeroU16, NonZeroUsize}; +use std::time::Duration; + +/// Convenience re-export: the simplex `scheme::Scheme` super-trait that binds +/// a `certificate::Scheme` to simplex's `Subject` type. +pub use commonware_consensus::simplex::scheme::Scheme as SimplexScheme; + +/// Configuration and factory for the simplex consensus engine. +/// +/// Captures the configuration needed to build a simplex engine instance. +/// Call [`build`](Self::build) to construct the engine, then +/// [`Engine::start`](simplex::Engine::start) to begin consensus. 
+/// +/// # Type Parameters +/// +/// * `S` - Signing scheme (e.g., `ed25519::Scheme`, `bls12381_threshold::standard::Scheme`) +/// * `L` - Leader elector config (e.g., `RoundRobin`) +pub struct SimplexSetup { + config: ConsensusConfig, + scheme: S, + elector: L, +} + +impl SimplexSetup { + /// Create a new simplex setup with round-robin leader election. + pub fn new(config: ConsensusConfig, scheme: S) -> Self { + Self { + config, + scheme, + elector: RoundRobin::default(), + } + } +} + +impl SimplexSetup { + /// Set a custom leader elector configuration. + pub fn with_elector(self, elector: L2) -> SimplexSetup { + SimplexSetup { + config: self.config, + scheme: self.scheme, + elector, + } + } + + /// Build the simplex engine with the given runtime context and consensus components. + /// + /// Returns the engine ready to be started with + /// [`Engine::start`](simplex::Engine::start). + #[allow(clippy::too_many_arguments)] + pub fn build( + self, + context: E, + automaton: A, + relay: R, + reporter: F, + blocker: B, + ) -> simplex::Engine + where + E: Clock + CryptoRngCore + Spawner + Storage + Metrics, + D: Digest, + S: SimplexScheme, + L: ElectorConfig, + B: Blocker, + A: CertifiableAutomaton, Digest = D>, + R: Relay, + F: Reporter>, + { + let cfg = simplex::Config { + scheme: self.scheme, + elector: self.elector, + blocker, + automaton, + relay, + reporter, + strategy: Sequential, + partition: "evolve".to_string(), + mailbox_size: 1024, + epoch: commonware_consensus::types::Epoch::new(0), + replay_buffer: NonZeroUsize::new(65536).unwrap(), + write_buffer: NonZeroUsize::new(4096).unwrap(), + page_cache: CacheRef::new( + NonZeroU16::new(4096).unwrap(), // page_size + NonZeroUsize::new(1024).unwrap(), // cache_size + ), + leader_timeout: self.config.leader_timeout, + notarization_timeout: self.config.notarization_timeout, + nullify_retry: Duration::from_secs(1), + activity_timeout: commonware_consensus::types::ViewDelta::new(10), + skip_timeout: 
commonware_consensus::types::ViewDelta::new(5), + fetch_timeout: Duration::from_secs(5), + fetch_concurrent: 3, + }; + + simplex::Engine::new(context, cfg) + } +} + +/// Start a simplex engine with the given P2P channel pairs. +/// +/// Convenience function that calls `engine.start()` with the three required +/// network channel pairs (votes, certificates, resolver). +pub fn start_engine( + engine: simplex::Engine, + vote_network: ( + impl Sender, + impl Receiver, + ), + certificate_network: ( + impl Sender, + impl Receiver, + ), + resolver_network: ( + impl Sender, + impl Receiver, + ), +) -> Handle<()> +where + E: Clock + CryptoRngCore + Spawner + Storage + Metrics, + S: SimplexScheme, + L: ElectorConfig, + B: Blocker, + D: Digest, + A: CertifiableAutomaton, Digest = D>, + R: Relay, + F: Reporter>, +{ + engine.start(vote_network, certificate_network, resolver_network) +} diff --git a/crates/consensus/src/lib.rs b/crates/consensus/src/lib.rs index 473bcef..04261a9 100644 --- a/crates/consensus/src/lib.rs +++ b/crates/consensus/src/lib.rs @@ -1,11 +1,15 @@ pub mod automaton; pub mod block; pub mod config; +pub mod engine; pub mod relay; pub mod reporter; +pub mod runner; pub use automaton::EvolveAutomaton; pub use block::ConsensusBlock; pub use config::ConsensusConfig; +pub use engine::SimplexSetup; pub use relay::EvolveRelay; pub use reporter::EvolveReporter; +pub use runner::ConsensusRunner; diff --git a/crates/consensus/src/runner.rs b/crates/consensus/src/runner.rs new file mode 100644 index 0000000..9c37557 --- /dev/null +++ b/crates/consensus/src/runner.rs @@ -0,0 +1,83 @@ +//! Consensus lifecycle orchestrator. +//! +//! [`ConsensusRunner`] manages the full lifecycle of consensus + P2P subsystems: +//! network initialization, engine startup, and graceful shutdown. 
+ +use crate::config::ConsensusConfig; +use crate::engine::{SimplexScheme, SimplexSetup}; +use commonware_consensus::simplex::elector::{Config as ElectorConfig, RoundRobin}; +use commonware_consensus::simplex::types::{Activity, Context}; +use commonware_consensus::{CertifiableAutomaton, Relay, Reporter}; +use commonware_cryptography::Digest; +use commonware_p2p::{Blocker, Receiver, Sender}; +use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage}; +use evolve_p2p::NetworkConfig; +use rand_core::CryptoRngCore; + +/// Orchestrates the full consensus lifecycle. +/// +/// Wires together P2P networking and the simplex consensus engine, +/// managing startup and graceful shutdown of all subsystems. +/// +/// # Lifecycle +/// +/// 1. Create the runner with consensus and network configuration. +/// 2. Call [`start`](Self::start) with the runtime context, automaton, scheme, +/// blocker, and network channels. +/// 3. The returned [`Handle`] completes when the runtime signals shutdown. +pub struct ConsensusRunner { + consensus_config: ConsensusConfig, + _network_config: NetworkConfig, +} + +impl ConsensusRunner { + /// Create a new consensus runner. + pub fn new(consensus_config: ConsensusConfig, network_config: NetworkConfig) -> Self { + Self { + consensus_config, + _network_config: network_config, + } + } + + /// Start the consensus engine with the given runtime context and components. + /// + /// Builds a simplex engine from the provided components, then starts it + /// with the given P2P channel pairs. + /// + /// # Returns + /// + /// A [`Handle`] that completes when the engine shuts down. 
+ #[allow(clippy::too_many_arguments)] + pub fn start( + &self, + context: E, + automaton: A, + relay: R, + reporter: F, + scheme: S, + blocker: B, + vote_network: (VS, VR), + certificate_network: (CS, CR), + resolver_network: (RS, RR), + ) -> Handle<()> + where + E: Clock + CryptoRngCore + Spawner + Storage + Metrics, + S: SimplexScheme + Clone, + D: Digest, + B: Blocker, + A: CertifiableAutomaton, Digest = D>, + R: Relay, + F: Reporter>, + RoundRobin: ElectorConfig, + VS: Sender, + VR: Receiver, + CS: Sender, + CR: Receiver, + RS: Sender, + RR: Receiver, + { + let setup = SimplexSetup::new(self.consensus_config.clone(), scheme); + let engine = setup.build(context, automaton, relay, reporter, blocker); + engine.start(vote_network, certificate_network, resolver_network) + } +} diff --git a/crates/consensus/tests/simplex_integration.rs b/crates/consensus/tests/simplex_integration.rs new file mode 100644 index 0000000..90b0057 --- /dev/null +++ b/crates/consensus/tests/simplex_integration.rs @@ -0,0 +1,216 @@ +//! Integration tests for the simplex consensus engine wiring. +//! +//! Validates that [`SimplexSetup`] correctly configures and starts a simplex +//! engine using ed25519 signatures, RoundRobin leader election, and the +//! commonware simulated P2P network. + +use commonware_consensus::simplex::elector::RoundRobin; +use commonware_consensus::simplex::mocks; +use commonware_consensus::simplex::scheme::ed25519; +use commonware_consensus::types::View; +use commonware_consensus::Monitor; +use commonware_cryptography::Sha256; +use commonware_p2p::simulated::{Config as NetConfig, Network}; +use commonware_runtime::{deterministic, Metrics, Quota, Runner}; +use evolve_consensus::engine::SimplexSetup; +use evolve_consensus::ConsensusConfig; +use std::num::NonZeroU32; +use std::sync::Arc; +use std::time::Duration; + +/// Single-validator consensus: one validator starts consensus, proposes +/// blocks, and finalizes them via [`SimplexSetup`]. 
+/// +/// Verifies: +/// - The engine starts successfully via SimplexSetup::build +/// - The automaton's propose() is called (blocks are produced) +/// - Blocks are finalized (reporter receives finalization events) +/// - No faults are detected +#[test] +fn single_validator_consensus_produces_and_finalizes_blocks() { + let executor = deterministic::Runner::timed(Duration::from_secs(120)); + executor.start(|mut context| async move { + let namespace = b"evolve_test".to_vec(); + let n = 1u32; + + // Create simulated P2P network. + let (network, oracle) = Network::new( + context.with_label("network"), + NetConfig { + max_size: 1024 * 1024, + disconnect_on_block: true, + tracked_peer_sets: None, + }, + ); + network.start(); + + // Generate ed25519 fixture for 1 validator. + let fixture = ed25519::fixture(&mut context, &namespace, n); + let validator = fixture.participants[0].clone(); + + // Register validator channels on simulated network. + let control = oracle.control(validator.clone()); + let quota = Quota::per_second(NonZeroU32::MAX); + let vote_network = control.register(0, quota).await.unwrap(); + let certificate_network = control.register(1, quota).await.unwrap(); + let resolver_network = control.register(2, quota).await.unwrap(); + + // Create mock relay, application, and reporter. 
+ let relay = Arc::new(mocks::relay::Relay::new()); + let app_cfg = mocks::application::Config { + hasher: Sha256::default(), + relay: relay.clone(), + me: validator.clone(), + propose_latency: (5.0, 1.0), + verify_latency: (5.0, 1.0), + certify_latency: (5.0, 1.0), + should_certify: mocks::application::Certifier::Always, + }; + let (application, mailbox) = + mocks::application::Application::new(context.with_label("application"), app_cfg); + application.start(); + + let elector = RoundRobin::::default(); + let reporter_cfg = mocks::reporter::Config { + participants: fixture.participants.clone().try_into().expect("non-empty"), + scheme: fixture.schemes[0].clone(), + elector: elector.clone(), + }; + let mut reporter = + mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg); + + // Subscribe to finalization events BEFORE starting the engine. + let (mut latest, mut monitor): (View, _) = reporter.subscribe().await; + + // Use SimplexSetup to build the engine -- this is the code under test. + let config = ConsensusConfig { + chain_id: 1, + namespace: namespace.clone(), + gas_limit: 30_000_000, + leader_timeout: Duration::from_secs(1), + notarization_timeout: Duration::from_secs(2), + activity_timeout: Duration::from_secs(10), + epoch_length: 100, + }; + let setup = SimplexSetup::new(config, fixture.schemes[0].clone()); + let engine = setup.build( + context.with_label("engine"), + mailbox.clone(), // automaton + mailbox, // relay (Mailbox implements both) + reporter.clone(), + oracle.control(validator.clone()), // blocker + ); + + // Start the engine with P2P channels. + let _handle = engine.start(vote_network, certificate_network, resolver_network); + + // Wait for at least 3 finalized views. + let target = View::new(3); + while latest < target { + latest = monitor.recv().await.expect("monitor channel closed"); + } + + // Verify: we reached the target view through the monitor. 
+ assert!( + latest >= target, + "expected finalization to reach view {target:?}, got {latest:?}" + ); + + // Verify: no faults detected. + let faults = reporter.faults.lock().unwrap(); + assert!(faults.is_empty(), "unexpected faults detected"); + + // Verify: no invalid signatures. + let invalid = *reporter.invalid.lock().unwrap(); + assert_eq!(invalid, 0, "unexpected invalid signatures: {invalid}"); + }); +} + +/// Verify that ConsensusRunner wires the engine correctly. +/// +/// This test validates the higher-level ConsensusRunner API produces +/// a running consensus engine. +#[test] +fn consensus_runner_starts_engine() { + let executor = deterministic::Runner::timed(Duration::from_secs(120)); + executor.start(|mut context| async move { + let namespace = b"runner_test".to_vec(); + let n = 1u32; + + // Create simulated P2P network. + let (network, oracle) = Network::new( + context.with_label("network"), + NetConfig { + max_size: 1024 * 1024, + disconnect_on_block: true, + tracked_peer_sets: None, + }, + ); + network.start(); + + // Generate fixture. + let fixture = ed25519::fixture(&mut context, &namespace, n); + let validator = fixture.participants[0].clone(); + + // Register channels. + let control = oracle.control(validator.clone()); + let quota = Quota::per_second(NonZeroU32::MAX); + let vote_network = control.register(0, quota).await.unwrap(); + let certificate_network = control.register(1, quota).await.unwrap(); + let resolver_network = control.register(2, quota).await.unwrap(); + + // Create mock components. 
+ let relay = Arc::new(mocks::relay::Relay::new()); + let app_cfg = mocks::application::Config { + hasher: Sha256::default(), + relay: relay.clone(), + me: validator.clone(), + propose_latency: (5.0, 1.0), + verify_latency: (5.0, 1.0), + certify_latency: (5.0, 1.0), + should_certify: mocks::application::Certifier::Always, + }; + let (application, mailbox) = + mocks::application::Application::new(context.with_label("application"), app_cfg); + application.start(); + + let elector = RoundRobin::::default(); + let reporter_cfg = mocks::reporter::Config { + participants: fixture.participants.clone().try_into().expect("non-empty"), + scheme: fixture.schemes[0].clone(), + elector: elector.clone(), + }; + let mut reporter = + mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg); + + let (mut latest, mut monitor): (View, _) = reporter.subscribe().await; + + // Use ConsensusRunner -- the higher-level API under test. + let consensus_config = ConsensusConfig::default(); + let network_config = evolve_p2p::NetworkConfig::default(); + let runner = evolve_consensus::ConsensusRunner::new(consensus_config, network_config); + let _handle = runner.start( + context.with_label("runner_engine"), + mailbox.clone(), + mailbox, + reporter.clone(), + fixture.schemes[0].clone(), + oracle.control(validator.clone()), + vote_network, + certificate_network, + resolver_network, + ); + + // Wait for finalization progress. + let target = View::new(2); + while latest < target { + latest = monitor.recv().await.expect("monitor channel closed"); + } + + // Verify: we reached the target view through the monitor. 
+ assert!( + latest >= target, + "expected finalization to reach view {target:?}, got {latest:?}" + ); + }); +} From d6f46adf9e4f5cf8cb545e4ba7e898c80cffdb02 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Sat, 21 Feb 2026 21:48:30 +0100 Subject: [PATCH 5/9] swarm: add marshal wiring for ordered finalized block delivery Add marshal module to evolve_consensus that configures commonware_consensus::marshal::Actor for sequential block delivery. The marshal sits between simplex and the application reporter, buffering and reordering finalization events. Changes: - New marshal.rs: MarshalConfig, init_marshal_config(), archive_config(), resolver_config() helpers with re-exports of MarshalActor, MarshalMailbox - Updated runner.rs docs to show marshal integration pattern - Fixed dependency consistency: commonware-parallel and commonware-macros now use { workspace = true } instead of literal version strings - Added commonware-broadcast, commonware-resolver, commonware-storage as workspace dependencies - Added commonware-macros to workspace deps in root Cargo.toml Co-Authored-By: Claude Opus 4.6 --- Cargo.toml | 1 + crates/consensus/Cargo.toml | 7 +- crates/consensus/src/lib.rs | 2 + crates/consensus/src/marshal.rs | 174 ++++++++++++++++++++++++++++++++ crates/consensus/src/runner.rs | 41 +++++++- 5 files changed, 221 insertions(+), 4 deletions(-) create mode 100644 crates/consensus/src/marshal.rs diff --git a/Cargo.toml b/Cargo.toml index cba7470..ff2503a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,6 +81,7 @@ commonware-utils = "2026.2.0" commonware-codec = "2026.2.0" commonware-consensus = "2026.2.0" commonware-broadcast = "2026.2.0" +commonware-macros = "2026.2.0" commonware-p2p = "2026.2.0" commonware-resolver = "2026.2.0" evolve_consensus = { path = "crates/consensus" } diff --git a/crates/consensus/Cargo.toml b/crates/consensus/Cargo.toml index ef0ef60..c871641 100644 --- a/crates/consensus/Cargo.toml +++ b/crates/consensus/Cargo.toml @@ -7,12 +7,15 @@ 
repository.workspace = true rust-version.workspace = true [dependencies] +commonware-broadcast = { workspace = true } commonware-consensus = { workspace = true } commonware-cryptography = { workspace = true } commonware-codec = { workspace = true } commonware-p2p = { workspace = true } -commonware-parallel = "2026.2.0" +commonware-parallel = { workspace = true } +commonware-resolver = { workspace = true } commonware-runtime = { workspace = true } +commonware-storage = { workspace = true } commonware-utils = { workspace = true } evolve_core = { workspace = true } @@ -35,7 +38,7 @@ commonware-consensus = { workspace = true, features = ["fuzz"] } commonware-cryptography = { workspace = true, features = ["mocks"] } commonware-p2p = { workspace = true } commonware-runtime = { workspace = true, features = ["test-utils"] } -commonware-macros = "2026.2.0" +commonware-macros = { workspace = true } [lints] workspace = true diff --git a/crates/consensus/src/lib.rs b/crates/consensus/src/lib.rs index 04261a9..eae552e 100644 --- a/crates/consensus/src/lib.rs +++ b/crates/consensus/src/lib.rs @@ -2,6 +2,7 @@ pub mod automaton; pub mod block; pub mod config; pub mod engine; +pub mod marshal; pub mod relay; pub mod reporter; pub mod runner; @@ -10,6 +11,7 @@ pub use automaton::EvolveAutomaton; pub use block::ConsensusBlock; pub use config::ConsensusConfig; pub use engine::SimplexSetup; +pub use marshal::{MarshalConfig, MarshalMailbox}; pub use relay::EvolveRelay; pub use reporter::EvolveReporter; pub use runner::ConsensusRunner; diff --git a/crates/consensus/src/marshal.rs b/crates/consensus/src/marshal.rs new file mode 100644 index 0000000..6f84889 --- /dev/null +++ b/crates/consensus/src/marshal.rs @@ -0,0 +1,174 @@ +//! Marshal wiring for ordered finalized block delivery. +//! +//! The marshal sits between the simplex consensus engine and the application +//! reporter. It receives out-of-order finalization events from consensus, +//! 
reconstructs total order, and delivers blocks sequentially to the +//! application. +//! +//! # Architecture +//! +//! ```text +//! Simplex Engine +//! │ (Activity) +//! ▼ +//! Marshal Mailbox (implements Reporter) +//! │ +//! ▼ +//! Marshal Actor (archives + reordering) +//! │ (Update) +//! ▼ +//! Application Reporter +//! ``` +//! +//! # Usage +//! +//! 1. Create archive stores for certificates and blocks +//! 2. Call [`MarshalActor::init`] with the stores and a [`MarshalConfig`] +//! 3. Pass the returned [`MarshalMailbox`] as the `reporter` to +//! [`SimplexSetup::build`](crate::engine::SimplexSetup::build) +//! 4. Call [`MarshalActor::start`] with your application reporter, broadcast +//! buffer, and resolver +//! +//! See [`init_marshal_config`] for creating a config with sensible defaults. + +use commonware_consensus::types::{Epoch, ViewDelta}; +use commonware_cryptography::certificate::{ConstantProvider, Scheme}; +use commonware_parallel::Sequential; +use commonware_runtime::buffer::paged::CacheRef; +use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize}; + +/// Re-export marshal types needed for wiring. +pub use commonware_consensus::marshal::{ + resolver, Actor as MarshalActor, Config as MarshalActorConfig, Mailbox as MarshalMailbox, + Update, +}; +pub use commonware_consensus::types::FixedEpocher; +pub use commonware_storage::archive::immutable; + +/// Configuration for initializing the marshal subsystem. +/// +/// Provides sensible defaults for storage partitioning, buffer sizes, +/// and epoch boundaries. +pub struct MarshalConfig { + /// Prefix for storage partition names (e.g., "evolve-validator-0"). + pub partition_prefix: String, + /// Number of blocks per epoch for the fixed epocher. + pub epoch_length: u64, + /// Size of the marshal's internal mailbox. + pub mailbox_size: usize, + /// How many views to retain before pruning. + pub view_retention: u64, + /// Maximum concurrent repair requests. 
+ pub max_repair: usize, +} + +impl Default for MarshalConfig { + fn default() -> Self { + Self { + partition_prefix: "evolve".to_string(), + epoch_length: 100, + mailbox_size: 1024, + view_retention: 10, + max_repair: 10, + } + } +} + +/// Create a marshal [`MarshalActorConfig`] from a [`MarshalConfig`] and signing scheme. +/// +/// Uses [`ConstantProvider`] (same scheme for all epochs) and [`FixedEpocher`] +/// (fixed-length epochs). The block codec config defaults to `()`. +/// +/// # Type Parameters +/// +/// * `S` - Certificate scheme (e.g., `ed25519::Scheme`, `bls12381::Scheme`) +/// * `B` - Block type (e.g., [`ConsensusBlock`](crate::ConsensusBlock)) +pub fn init_marshal_config( + config: &MarshalConfig, + scheme: S, + block_codec_config: ::Cfg, +) -> MarshalActorConfig, FixedEpocher, Sequential> +where + S: Scheme, + B: commonware_consensus::Block, +{ + MarshalActorConfig { + provider: ConstantProvider::::new(scheme), + epocher: FixedEpocher::new(NonZeroU64::new(config.epoch_length).unwrap()), + partition_prefix: config.partition_prefix.clone(), + mailbox_size: config.mailbox_size, + view_retention_timeout: ViewDelta::new(config.view_retention), + prunable_items_per_section: NonZeroU64::new(10).unwrap(), + page_cache: CacheRef::new( + NonZeroU16::new(4096).unwrap(), + NonZeroUsize::new(1024).unwrap(), + ), + replay_buffer: NonZeroUsize::new(1024).unwrap(), + key_write_buffer: NonZeroUsize::new(1024).unwrap(), + value_write_buffer: NonZeroUsize::new(1024).unwrap(), + block_codec_config, + max_repair: NonZeroUsize::new(config.max_repair).unwrap(), + strategy: Sequential, + } +} + +/// Create an [`immutable::Config`] for a marshal archive store. +/// +/// Builds the 17-field config required by [`immutable::Archive::init`] +/// with standard settings for the given partition prefix and codec config. 
+pub fn archive_config( + partition_prefix: &str, + store_name: &str, + codec_config: C, +) -> immutable::Config { + let prefix = format!("{}-{}", partition_prefix, store_name); + let page_cache = CacheRef::new( + NonZeroU16::new(4096).unwrap(), + NonZeroUsize::new(1024).unwrap(), + ); + + immutable::Config { + metadata_partition: format!("{prefix}-metadata"), + freezer_table_partition: format!("{prefix}-freezer-table"), + freezer_table_initial_size: 64, + freezer_table_resize_frequency: 10, + freezer_table_resize_chunk_size: 10, + freezer_key_partition: format!("{prefix}-freezer-key"), + freezer_key_page_cache: page_cache, + freezer_value_partition: format!("{prefix}-freezer-value"), + freezer_value_target_size: 4096, + freezer_value_compression: None, + ordinal_partition: format!("{prefix}-ordinal"), + items_per_section: NonZeroU64::new(10).unwrap(), + codec_config, + replay_buffer: NonZeroUsize::new(1024).unwrap(), + freezer_key_write_buffer: NonZeroUsize::new(1024).unwrap(), + freezer_value_write_buffer: NonZeroUsize::new(1024).unwrap(), + ordinal_write_buffer: NonZeroUsize::new(1024).unwrap(), + } +} + +/// Create a resolver [`Config`](resolver::p2p::Config) for the marshal's P2P block fetcher. +pub fn resolver_config( + public_key: P, + provider: C, + blocker: B, + mailbox_size: usize, +) -> resolver::p2p::Config +where + P: commonware_cryptography::PublicKey, + C: commonware_p2p::Provider, + B: commonware_p2p::Blocker, +{ + resolver::p2p::Config { + public_key, + provider, + blocker, + mailbox_size, + initial: std::time::Duration::from_secs(1), + timeout: std::time::Duration::from_secs(2), + fetch_retry_timeout: std::time::Duration::from_millis(100), + priority_requests: false, + priority_responses: false, + } +} diff --git a/crates/consensus/src/runner.rs b/crates/consensus/src/runner.rs index 9c37557..6c5446b 100644 --- a/crates/consensus/src/runner.rs +++ b/crates/consensus/src/runner.rs @@ -2,6 +2,19 @@ //! //! 
[`ConsensusRunner`] manages the full lifecycle of consensus + P2P subsystems: //! network initialization, engine startup, and graceful shutdown. +//! +//! # With Marshal (Production) +//! +//! For ordered finalized block delivery, initialize the marshal via +//! [`crate::marshal`] and pass the [`MarshalMailbox`](crate::MarshalMailbox) +//! as the `reporter` parameter to [`start`](ConsensusRunner::start). The +//! marshal actor then delivers blocks in sequential height order to the +//! application reporter. See [`crate::marshal`] for the full wiring pattern. +//! +//! # Without Marshal (Testing) +//! +//! For testing, pass any `Reporter` implementation directly — the simplex +//! engine will report raw activity events without ordering guarantees. use crate::config::ConsensusConfig; use crate::engine::{SimplexScheme, SimplexSetup}; @@ -25,6 +38,28 @@ use rand_core::CryptoRngCore; /// 2. Call [`start`](Self::start) with the runtime context, automaton, scheme, /// blocker, and network channels. /// 3. The returned [`Handle`] completes when the runtime signals shutdown. +/// +/// # Marshal Integration +/// +/// For production use with ordered block delivery: +/// +/// ```rust,ignore +/// use evolve_consensus::marshal::*; +/// +/// // 1. Create archive stores +/// let certs = immutable::Archive::init(ctx, archive_config(&prefix, "certs", codec)).await?; +/// let blocks = immutable::Archive::init(ctx, archive_config(&prefix, "blocks", ())).await?; +/// +/// // 2. Initialize marshal +/// let marshal_cfg = init_marshal_config::(&MarshalConfig::default(), scheme, ()); +/// let (actor, mailbox, height) = MarshalActor::init(ctx, certs, blocks, marshal_cfg).await; +/// +/// // 3. Pass marshal mailbox as reporter to ConsensusRunner::start() +/// let engine_handle = runner.start(ctx, automaton, relay, mailbox, scheme, blocker, ...); +/// +/// // 4. 
Start marshal actor (delivers ordered blocks to app reporter) +/// let marshal_handle = actor.start(app_reporter, broadcast_buffer, resolver); +/// ``` pub struct ConsensusRunner { consensus_config: ConsensusConfig, _network_config: NetworkConfig, @@ -41,8 +76,10 @@ impl ConsensusRunner { /// Start the consensus engine with the given runtime context and components. /// - /// Builds a simplex engine from the provided components, then starts it - /// with the given P2P channel pairs. + /// The `reporter` parameter accepts any [`Reporter`] implementation. + /// For production, pass a [`MarshalMailbox`](crate::MarshalMailbox) to + /// enable ordered block delivery through the marshal actor. + /// For testing, pass a mock reporter directly. /// /// # Returns /// From 8fdbc58f873cb43dca2d66b91f01f8fbb57f6806 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Sun, 22 Feb 2026 14:40:56 +0100 Subject: [PATCH 6/9] fix: address PR review findings across consensus and p2p crates - Include gas_used, transactions_root, and state_root in block digest to prevent collision between blocks with identical header metadata - Move height fetch_add inside spawned task to avoid permanent gaps on task failure or cancellation - Wire ConsensusConfig namespace and activity_timeout into simplex config instead of hardcoding values - Use NonZeroU64/NonZeroUsize in MarshalConfig to make invalid states unrepresentable at the type level - Add ChainState to EvolveReporter for finalization-driven last_hash updates, expose last_hash/height_atomic accessors on EvolveAutomaton - Add retain_epochs pruning method to EpochPeerProvider - Fix doc comment claiming timestamp monotonicity validation Co-Authored-By: Claude Opus 4.6 --- crates/consensus/src/automaton.rs | 26 +++++++- crates/consensus/src/block.rs | 6 ++ crates/consensus/src/engine.rs | 12 +++- crates/consensus/src/lib.rs | 2 +- crates/consensus/src/marshal.rs | 19 +++--- crates/consensus/src/reporter.rs | 99 +++++++++++++++++++++++++------ 
crates/p2p/src/provider.rs | 22 +++++++ 7 files changed, 152 insertions(+), 34 deletions(-) diff --git a/crates/consensus/src/automaton.rs b/crates/consensus/src/automaton.rs index a247189..0ccfd66 100644 --- a/crates/consensus/src/automaton.rs +++ b/crates/consensus/src/automaton.rs @@ -27,7 +27,7 @@ use tokio::sync::RwLock as TokioRwLock; /// - On `propose()`: builds a block from mempool txs, stores it locally, /// returns only the digest to consensus. /// - On `verify()`: looks up the block by digest (populated via Relay), -/// validates parent chain, timestamp monotonicity, etc. +/// validates parent chain and height. pub struct EvolveAutomaton { stf: Stf, storage: S, @@ -97,6 +97,22 @@ impl EvolveAutomaton pub fn pending_blocks(&self) -> &Arc>>> { &self.pending_blocks } + + /// Get a reference to the shared last block hash. + /// + /// The finalization path (reporter) must update this after a block is + /// certified so that subsequent `propose()` calls use the correct parent. + pub fn last_hash(&self) -> &Arc> { + &self.last_hash + } + + /// Get a reference to the shared height counter. + /// + /// The finalization path (reporter) may use this to reconcile height + /// after block finalization. + pub fn height_atomic(&self) -> &Arc { + &self.height + } } impl Automaton for EvolveAutomaton @@ -134,7 +150,7 @@ where async fn propose(&mut self, _context: Self::Context) -> oneshot::Receiver { let (sender, receiver) = oneshot::channel(); - let height = self.height.fetch_add(1, Ordering::SeqCst); + let height = self.height.clone(); let last_hash = *self.last_hash.read().await; let gas_limit = self.config.gas_limit; let mempool = self.mempool.clone(); @@ -159,8 +175,12 @@ where .unwrap_or_default() .as_secs(); + // Increment height only after successful block construction to avoid + // gaps when the spawned task fails or is cancelled. 
+ let block_height = height.fetch_add(1, Ordering::SeqCst); + let block = BlockBuilder::::new() - .number(height) + .number(block_height) .timestamp(timestamp) .parent_hash(last_hash) .gas_limit(gas_limit) diff --git a/crates/consensus/src/block.rs b/crates/consensus/src/block.rs index a47ab29..ea9788e 100644 --- a/crates/consensus/src/block.rs +++ b/crates/consensus/src/block.rs @@ -42,13 +42,19 @@ impl ConsensusBlock { } /// Compute a deterministic SHA-256 digest from block header fields. +/// +/// All consensus-relevant header fields are included to ensure distinct blocks +/// always produce distinct digests. fn compute_block_digest(block: &Block) -> commonware_cryptography::sha256::Digest { let mut hasher = Sha256::new(); hasher.update(&block.header.number.to_le_bytes()); hasher.update(&block.header.timestamp.to_le_bytes()); hasher.update(block.header.parent_hash.as_slice()); hasher.update(&block.header.gas_limit.to_le_bytes()); + hasher.update(&block.header.gas_used.to_le_bytes()); hasher.update(block.header.beneficiary.as_slice()); + hasher.update(block.header.transactions_root.as_slice()); + hasher.update(block.header.state_root.as_slice()); hasher.finalize() } diff --git a/crates/consensus/src/engine.rs b/crates/consensus/src/engine.rs index 88996f1..92e3baa 100644 --- a/crates/consensus/src/engine.rs +++ b/crates/consensus/src/engine.rs @@ -84,6 +84,14 @@ impl SimplexSetup { R: Relay, F: Reporter>, { + // Convert activity_timeout from Duration to ViewDelta (seconds). 
+ let activity_timeout_secs = self.config.activity_timeout.as_secs(); + let activity_timeout = + commonware_consensus::types::ViewDelta::new(activity_timeout_secs.max(1)); + + let partition = String::from_utf8(self.config.namespace.clone()) + .unwrap_or_else(|_| "evolve".to_string()); + let cfg = simplex::Config { scheme: self.scheme, elector: self.elector, @@ -92,7 +100,7 @@ impl SimplexSetup { relay, reporter, strategy: Sequential, - partition: "evolve".to_string(), + partition, mailbox_size: 1024, epoch: commonware_consensus::types::Epoch::new(0), replay_buffer: NonZeroUsize::new(65536).unwrap(), @@ -104,7 +112,7 @@ impl SimplexSetup { leader_timeout: self.config.leader_timeout, notarization_timeout: self.config.notarization_timeout, nullify_retry: Duration::from_secs(1), - activity_timeout: commonware_consensus::types::ViewDelta::new(10), + activity_timeout, skip_timeout: commonware_consensus::types::ViewDelta::new(5), fetch_timeout: Duration::from_secs(5), fetch_concurrent: 3, diff --git a/crates/consensus/src/lib.rs b/crates/consensus/src/lib.rs index eae552e..c8bb686 100644 --- a/crates/consensus/src/lib.rs +++ b/crates/consensus/src/lib.rs @@ -13,5 +13,5 @@ pub use config::ConsensusConfig; pub use engine::SimplexSetup; pub use marshal::{MarshalConfig, MarshalMailbox}; pub use relay::EvolveRelay; -pub use reporter::EvolveReporter; +pub use reporter::{ChainState, EvolveReporter}; pub use runner::ConsensusRunner; diff --git a/crates/consensus/src/marshal.rs b/crates/consensus/src/marshal.rs index 6f84889..47eaf48 100644 --- a/crates/consensus/src/marshal.rs +++ b/crates/consensus/src/marshal.rs @@ -48,28 +48,29 @@ pub use commonware_storage::archive::immutable; /// Configuration for initializing the marshal subsystem. /// /// Provides sensible defaults for storage partitioning, buffer sizes, -/// and epoch boundaries. +/// and epoch boundaries. Uses `NonZero` types to make invalid states +/// unrepresentable. 
pub struct MarshalConfig { /// Prefix for storage partition names (e.g., "evolve-validator-0"). pub partition_prefix: String, - /// Number of blocks per epoch for the fixed epocher. - pub epoch_length: u64, + /// Number of blocks per epoch for the fixed epocher. Must be non-zero. + pub epoch_length: NonZeroU64, /// Size of the marshal's internal mailbox. pub mailbox_size: usize, /// How many views to retain before pruning. pub view_retention: u64, - /// Maximum concurrent repair requests. - pub max_repair: usize, + /// Maximum concurrent repair requests. Must be non-zero. + pub max_repair: NonZeroUsize, } impl Default for MarshalConfig { fn default() -> Self { Self { partition_prefix: "evolve".to_string(), - epoch_length: 100, + epoch_length: NonZeroU64::new(100).unwrap(), mailbox_size: 1024, view_retention: 10, - max_repair: 10, + max_repair: NonZeroUsize::new(10).unwrap(), } } } @@ -94,7 +95,7 @@ where { MarshalActorConfig { provider: ConstantProvider::::new(scheme), - epocher: FixedEpocher::new(NonZeroU64::new(config.epoch_length).unwrap()), + epocher: FixedEpocher::new(config.epoch_length), partition_prefix: config.partition_prefix.clone(), mailbox_size: config.mailbox_size, view_retention_timeout: ViewDelta::new(config.view_retention), @@ -107,7 +108,7 @@ where key_write_buffer: NonZeroUsize::new(1024).unwrap(), value_write_buffer: NonZeroUsize::new(1024).unwrap(), block_codec_config, - max_repair: NonZeroUsize::new(config.max_repair).unwrap(), + max_repair: config.max_repair, strategy: Sequential, } } diff --git a/crates/consensus/src/reporter.rs b/crates/consensus/src/reporter.rs index c10c0de..22db492 100644 --- a/crates/consensus/src/reporter.rs +++ b/crates/consensus/src/reporter.rs @@ -1,41 +1,102 @@ +use alloy_primitives::B256; use commonware_consensus::Reporter; -use std::marker::PhantomData; +use std::collections::BTreeMap; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, RwLock}; +use tokio::sync::RwLock as TokioRwLock; -/// A 
reporter that logs consensus activity events. +use crate::block::ConsensusBlock; + +/// Shared chain state that the reporter updates on finalization. +/// +/// The automaton exposes these via `last_hash()` and `height_atomic()`. +/// Pass them to the reporter so finalization events update chain linkage. +pub struct ChainState { + /// The hash of the most recently finalized block. + pub last_hash: Arc>, + /// The current chain height. + pub height: Arc, + /// Pending blocks cache (shared with automaton). + pub pending_blocks: Arc>>>, +} + +impl Clone for ChainState { + fn clone(&self) -> Self { + Self { + last_hash: self.last_hash.clone(), + height: self.height.clone(), + pending_blocks: self.pending_blocks.clone(), + } + } +} + +/// A reporter that logs consensus activity and updates chain state on finalization. /// /// Generic over the activity type so it works with any consensus scheme. -/// In production, this would handle finalization callbacks: -/// executing blocks through the STF, committing state changes, -/// and updating chain state. For now, it logs activity. -#[derive(Clone)] -pub struct EvolveReporter { - _phantom: PhantomData A>, +/// Holds an optional [`ChainState`] reference — when present, finalization +/// events update `last_hash` so subsequent proposals use the correct parent. +/// +/// In production, the marshal sits between simplex and this reporter, +/// delivering ordered `Update` events. For direct (non-marshal) usage, +/// this reporter receives raw `Activity` events. +pub struct EvolveReporter> { + chain_state: Option>, + _phantom: std::marker::PhantomData A>, } -impl EvolveReporter { +impl Clone for EvolveReporter { + fn clone(&self) -> Self { + Self { + chain_state: self.chain_state.clone(), + _phantom: std::marker::PhantomData, + } + } +} + +impl EvolveReporter { + /// Create a reporter without chain state (logging only). 
pub fn new() -> Self { Self { - _phantom: PhantomData, + chain_state: None, + _phantom: std::marker::PhantomData, + } + } + + /// Create a reporter with chain state for finalization updates. + pub fn with_chain_state(chain_state: ChainState) -> Self { + Self { + chain_state: Some(chain_state), + _phantom: std::marker::PhantomData, } } } -impl Default for EvolveReporter { +impl Default for EvolveReporter { fn default() -> Self { Self::new() } } -impl Reporter for EvolveReporter { +impl Reporter for EvolveReporter { type Activity = A; async fn report(&mut self, _activity: Self::Activity) { - // In production, this would: - // 1. Extract finalization events (Finalization variant) - // 2. Look up the full block from pending_blocks by digest - // 3. Execute through STF - // 4. Commit state changes to storage - // 5. Update chain height and last_hash - tracing::debug!("reporter: received consensus activity"); + // TODO: Extract finalization events from activity, look up the finalized + // block by digest in pending_blocks, and update chain state: + // 1. Set last_hash to the finalized block's hash + // 2. Remove the block from pending_blocks + // 3. Execute through STF and commit state changes + // + // This requires matching on Activity::Finalization which depends on + // concrete scheme types. Will be implemented when the full finalization + // pipeline is wired. + if let Some(ref state) = self.chain_state { + tracing::debug!( + height = state.height.load(Ordering::SeqCst), + "reporter: received consensus activity (chain state wired)" + ); + } else { + tracing::debug!("reporter: received consensus activity"); + } } } diff --git a/crates/p2p/src/provider.rs b/crates/p2p/src/provider.rs index b17f391..a0b53db 100644 --- a/crates/p2p/src/provider.rs +++ b/crates/p2p/src/provider.rs @@ -77,6 +77,12 @@ impl EpochPeerProvider { /// If a peer set for `epoch` already exists it is replaced. Subscribers /// receive the new set and the updated union of all tracked peers. 
Dead /// subscriber channels are pruned automatically. + /// + /// # Note + /// + /// Old epochs are not pruned — `peer_sets` grows with each call. For + /// production use, call [`retain_epochs`](Self::retain_epochs) periodically + /// to bound memory usage. pub async fn update_epoch(&self, epoch: u64, peers: Set) { let mut state = self.inner.write().await; state.peer_sets.insert(epoch, peers.clone()); @@ -90,6 +96,22 @@ state.notify(epoch, peers); } + + /// Remove all epoch entries older than `min_epoch`. + /// + /// Recomputes `all_peers` from the remaining sets. Subscribers are not + /// notified here; they receive the updated union on the next `update_epoch`. + pub async fn retain_epochs(&self, min_epoch: u64) { + let mut state = self.inner.write().await; + state.peer_sets.retain(|&e, _| e >= min_epoch); + + let all_peers: Vec = state + .peer_sets + .values() + .flat_map(|s| s.iter().cloned()) + .collect(); + state.all_peers = Set::from_iter_dedup(all_peers); + } } impl Default for EpochPeerProvider { From d77c612d74d356a2cc9f4b5f1a058ac90758949e Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Sun, 22 Feb 2026 16:45:18 +0100 Subject: [PATCH 7/9] Fix consensus finalization state and block digest stability --- crates/consensus/src/automaton.rs | 17 +++ crates/consensus/src/block.rs | 116 +++++++++++++++++++++++--- crates/consensus/src/reporter.rs | 71 +++++++++++----- 3 files changed, 176 insertions(+), 28 deletions(-) diff --git a/crates/consensus/src/automaton.rs b/crates/consensus/src/automaton.rs index 0ccfd66..fe99458 100644 --- a/crates/consensus/src/automaton.rs +++ b/crates/consensus/src/automaton.rs @@ -3,6 +3,7 @@ use crate::config::ConsensusConfig; use alloy_primitives::B256; use commonware_consensus::types::Epoch; use commonware_consensus::{Automaton, CertifiableAutomaton}; +use commonware_cryptography::{Hasher, Sha256}; use commonware_utils::channel::oneshot; use evolve_core::ReadonlyKV; use evolve_mempool::{Mempool, MempoolTx,
SharedMempool}; @@ -135,6 +136,10 @@ where .gas_limit(self.config.gas_limit) .build(); + let mut genesis_block = genesis_block; + genesis_block.header.transactions_root = + compute_transactions_root(&genesis_block.transactions); + let cb = ConsensusBlock::new(genesis_block); let digest = cb.digest; @@ -187,6 +192,9 @@ where .transactions(transactions) .build(); + let mut block = block; + block.header.transactions_root = compute_transactions_root(&block.transactions); + let cb = ConsensusBlock::new(block); let digest = cb.digest; @@ -252,6 +260,15 @@ where } } +fn compute_transactions_root(transactions: &[Tx]) -> B256 { + let mut hasher = Sha256::new(); + hasher.update(&(transactions.len() as u64).to_le_bytes()); + for tx in transactions { + hasher.update(&tx.compute_identifier()); + } + B256::from_slice(&hasher.finalize().0) +} + impl CertifiableAutomaton for EvolveAutomaton where Tx: Transaction + MempoolTx + Clone + Send + Sync + 'static, diff --git a/crates/consensus/src/block.rs b/crates/consensus/src/block.rs index ea9788e..9a4fae5 100644 --- a/crates/consensus/src/block.rs +++ b/crates/consensus/src/block.rs @@ -6,6 +6,7 @@ use commonware_consensus::types::Height; use commonware_consensus::Heightable; use commonware_cryptography::{Committable, Digestible, Hasher, Sha256}; use evolve_server::{Block, BlockHeader}; +use evolve_stf_traits::Transaction; /// A consensus-aware wrapper around Evolve's Block. /// @@ -21,7 +22,14 @@ pub struct ConsensusBlock { pub parent_digest: commonware_cryptography::sha256::Digest, } -impl ConsensusBlock { +impl ConsensusBlock { + /// Return the inner block hash as a B256. + pub fn block_hash(&self) -> B256 { + B256::from_slice(&self.digest.0) + } +} + +impl ConsensusBlock { /// Create a new ConsensusBlock from an evolve Block. /// /// Computes and caches the block digest and parent digest. @@ -34,18 +42,15 @@ impl ConsensusBlock { parent_digest, } } - - /// Return the inner block hash as a B256. 
- pub fn block_hash(&self) -> B256 { - B256::from_slice(&self.digest.0) - } } /// Compute a deterministic SHA-256 digest from block header fields. /// /// All consensus-relevant header fields are included to ensure distinct blocks /// always produce distinct digests. -fn compute_block_digest(block: &Block) -> commonware_cryptography::sha256::Digest { +fn compute_block_digest( + block: &Block, +) -> commonware_cryptography::sha256::Digest { let mut hasher = Sha256::new(); hasher.update(&block.header.number.to_le_bytes()); hasher.update(&block.header.timestamp.to_le_bytes()); @@ -55,6 +60,10 @@ fn compute_block_digest(block: &Block) -> commonware_cryptography::sha25 hasher.update(block.header.beneficiary.as_slice()); hasher.update(block.header.transactions_root.as_slice()); hasher.update(block.header.state_root.as_slice()); + hasher.update(&(block.transactions.len() as u64).to_le_bytes()); + for tx in &block.transactions { + hasher.update(&tx.compute_identifier()); + } hasher.finalize() } @@ -96,6 +105,8 @@ struct WireBlock { gas_limit: u64, gas_used: u64, beneficiary: [u8; 20], + transactions_root: [u8; 32], + state_root: [u8; 32], transactions_encoded: Vec>, } @@ -109,7 +120,9 @@ impl Write for ConsensusBloc } } -impl Read for ConsensusBlock { +impl Read + for ConsensusBlock +{ type Cfg = (); fn read_cfg(buf: &mut impl Buf, _cfg: &Self::Cfg) -> Result { @@ -138,6 +151,8 @@ impl Read for ConsensusBlo gas_limit: wire.gas_limit, gas_used: wire.gas_used, beneficiary: alloy_primitives::Address::from_slice(&wire.beneficiary), + transactions_root: B256::from_slice(&wire.transactions_root), + state_root: B256::from_slice(&wire.state_root), ..Default::default() }; @@ -163,6 +178,8 @@ fn to_wire(inner: &Block) -> WireBlock { gas_limit: inner.header.gas_limit, gas_used: inner.header.gas_used, beneficiary: inner.header.beneficiary.0 .0, + transactions_root: inner.header.transactions_root.0, + state_root: inner.header.state_root.0, transactions_encoded: inner .transactions 
.iter() @@ -171,7 +188,7 @@ fn to_wire(inner: &Block) -> WireBlock { } } -impl +impl commonware_consensus::Block for ConsensusBlock { fn parent(&self) -> Self::Commitment { @@ -183,12 +200,43 @@ impl mod tests { use super::*; use commonware_codec::{DecodeExt, Encode}; + use evolve_core::{AccountId, FungibleAsset, InvokeRequest}; + use evolve_stf_traits::Transaction; #[derive(Debug, Clone, BorshSerialize, BorshDeserialize, PartialEq)] struct TestTx { data: Vec, } + impl Transaction for TestTx { + fn sender(&self) -> AccountId { + AccountId::new(1) + } + + fn recipient(&self) -> AccountId { + AccountId::new(2) + } + + fn request(&self) -> &InvokeRequest { + panic!("request() is not used in consensus block tests") + } + + fn gas_limit(&self) -> u64 { + 21_000 + } + + fn funds(&self) -> &[FungibleAsset] { + &[] + } + + fn compute_identifier(&self) -> [u8; 32] { + let mut id = [0u8; 32]; + let len = self.data.len().min(32); + id[..len].copy_from_slice(&self.data[..len]); + id + } + } + #[test] fn test_encode_decode_roundtrip() { let header = BlockHeader::new(42, 1000, B256::repeat_byte(0xAA)); @@ -271,4 +319,54 @@ mod tests { assert_ne!(cb1.digest, cb2.digest); } + + #[test] + fn test_different_transactions_change_digest() { + let header = BlockHeader::new(7, 1_000, B256::repeat_byte(0xAB)); + let block1 = Block::new( + header.clone(), + vec![TestTx { + data: vec![1, 2, 3], + }], + ); + let block2 = Block::new( + header, + vec![TestTx { + data: vec![9, 8, 7], + }], + ); + + let cb1 = ConsensusBlock::new(block1); + let cb2 = ConsensusBlock::new(block2); + + assert_ne!(cb1.digest, cb2.digest); + } + + #[test] + fn test_roundtrip_preserves_roots() { + let mut header = BlockHeader::new(12, 5_000, B256::repeat_byte(0xCD)); + header.transactions_root = B256::repeat_byte(0x11); + header.state_root = B256::repeat_byte(0x22); + + let block = Block::new( + header, + vec![TestTx { + data: vec![4, 5, 6], + }], + ); + let consensus_block = ConsensusBlock::new(block); + + let encoded 
= consensus_block.encode(); + let decoded = ConsensusBlock::::decode(encoded).unwrap(); + + assert_eq!( + decoded.inner.header.transactions_root, + consensus_block.inner.header.transactions_root + ); + assert_eq!( + decoded.inner.header.state_root, + consensus_block.inner.header.state_root + ); + assert_eq!(decoded.digest, consensus_block.digest); + } } diff --git a/crates/consensus/src/reporter.rs b/crates/consensus/src/reporter.rs index 22db492..4677d01 100644 --- a/crates/consensus/src/reporter.rs +++ b/crates/consensus/src/reporter.rs @@ -1,5 +1,9 @@ use alloy_primitives::B256; +use commonware_consensus::simplex::types::Activity; use commonware_consensus::Reporter; +use commonware_cryptography::certificate::Scheme; +use commonware_cryptography::sha256::Digest as Sha256Digest; +use evolve_stf_traits::Transaction; use std::collections::BTreeMap; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, RwLock}; @@ -77,26 +81,55 @@ impl Default for EvolveReporter { } } -impl Reporter for EvolveReporter { - type Activity = A; +impl Reporter for EvolveReporter, Tx> +where + S: Scheme + Clone + Send + 'static, + Tx: Clone + Transaction + Send + Sync + 'static, +{ + type Activity = Activity; - async fn report(&mut self, _activity: Self::Activity) { - // TODO: Extract finalization events from activity, look up the finalized - // block by digest in pending_blocks, and update chain state: - // 1. Set last_hash to the finalized block's hash - // 2. Remove the block from pending_blocks - // 3. Execute through STF and commit state changes - // - // This requires matching on Activity::Finalization which depends on - // concrete scheme types. Will be implemented when the full finalization - // pipeline is wired. 
- if let Some(ref state) = self.chain_state { - tracing::debug!( - height = state.height.load(Ordering::SeqCst), - "reporter: received consensus activity (chain state wired)" - ); - } else { + async fn report(&mut self, activity: Self::Activity) { + let Some(state) = self.chain_state.as_ref() else { tracing::debug!("reporter: received consensus activity"); - } + return; + }; + + let finalized_digest = match activity { + Activity::Finalization(finalization) => finalization.proposal.payload.0, + _ => { + tracing::debug!( + height = state.height.load(Ordering::SeqCst), + "reporter: received non-finalization activity" + ); + return; + } + }; + + let finalized_block = { + let mut pending = state.pending_blocks.write().unwrap(); + pending.remove(&finalized_digest) + }; + + let Some(block) = finalized_block else { + tracing::warn!( + digest = ?finalized_digest, + "reporter: finalization digest not found in pending blocks" + ); + return; + }; + + let finalized_hash = block.block_hash(); + *state.last_hash.write().await = finalized_hash; + state.height.store( + block.inner.header.number.saturating_add(1), + Ordering::SeqCst, + ); + + tracing::debug!( + digest = ?finalized_digest, + block_number = block.inner.header.number, + block_hash = ?finalized_hash, + "reporter: advanced chain state from finalized activity" + ); } } From c603230a3d8014f8246d5ead55b00f57fc0b940d Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Sun, 22 Feb 2026 17:08:15 +0100 Subject: [PATCH 8/9] Harden consensus and p2p test coverage --- crates/consensus/src/block.rs | 52 ++++++++++ crates/consensus/src/relay.rs | 71 ++++++++++++++ crates/consensus/src/reporter.rs | 157 +++++++++++++++++++++++++++++++ crates/p2p/src/config.rs | 25 +---- crates/p2p/src/provider.rs | 64 +++++++++++++ crates/p2p/src/validator.rs | 32 +++++++ 6 files changed, 380 insertions(+), 21 deletions(-) diff --git a/crates/consensus/src/block.rs b/crates/consensus/src/block.rs index 9a4fae5..8c0bd7f 100644 --- 
a/crates/consensus/src/block.rs +++ b/crates/consensus/src/block.rs @@ -369,4 +369,56 @@ mod tests { ); assert_eq!(decoded.digest, consensus_block.digest); } + + #[test] + fn test_decode_rejects_truncated_payload() { + let header = BlockHeader::new(9, 123, B256::repeat_byte(0xEF)); + let block = Block::new( + header, + vec![TestTx { + data: vec![1, 2, 3], + }], + ); + let consensus_block = ConsensusBlock::new(block); + let mut encoded = consensus_block.encode().to_vec(); + encoded.pop(); + + let result = ConsensusBlock::::decode(encoded.as_slice()); + assert!(matches!(result, Err(CodecError::EndOfBuffer))); + } + + #[test] + fn test_decode_rejects_invalid_wire_payload() { + let payload = vec![0x01, 0x02, 0x03, 0x04]; + let mut encoded = (payload.len() as u32).to_le_bytes().to_vec(); + encoded.extend_from_slice(&payload); + + let result = ConsensusBlock::::decode(encoded.as_slice()); + assert!(result.is_err(), "invalid wire payload must fail decode"); + } + + #[test] + fn test_decode_rejects_invalid_transaction_encoding() { + let wire = WireBlock { + number: 1, + timestamp: 2, + parent_hash: B256::ZERO.0, + gas_limit: 30_000_000, + gas_used: 21_000, + beneficiary: alloy_primitives::Address::ZERO.0 .0, + transactions_root: B256::ZERO.0, + state_root: B256::ZERO.0, + transactions_encoded: vec![vec![0xFF]], + }; + + let payload = borsh::to_vec(&wire).expect("wire block serialization should not fail"); + let mut encoded = (payload.len() as u32).to_le_bytes().to_vec(); + encoded.extend_from_slice(&payload); + + let result = ConsensusBlock::::decode(encoded.as_slice()); + assert!( + result.is_err(), + "invalid transaction encoding must fail decode" + ); + } } diff --git a/crates/consensus/src/relay.rs b/crates/consensus/src/relay.rs index 8512286..94a4350 100644 --- a/crates/consensus/src/relay.rs +++ b/crates/consensus/src/relay.rs @@ -54,3 +54,74 @@ where ); } } + +#[cfg(test)] +mod tests { + use super::*; + use alloy_primitives::B256; + use evolve_core::{AccountId, 
FungibleAsset, InvokeRequest, Message}; + use evolve_stf_traits::Transaction; + use std::collections::BTreeMap; + use std::sync::{Arc, RwLock}; + + #[derive(Debug, Clone)] + struct TestTx { + id: [u8; 32], + request: InvokeRequest, + funds: Vec, + } + + impl TestTx { + fn new(id: [u8; 32]) -> Self { + Self { + id, + request: InvokeRequest::new_from_message("test", 1, Message::from_bytes(vec![])), + funds: Vec::new(), + } + } + } + + impl Transaction for TestTx { + fn sender(&self) -> AccountId { + AccountId::new(1) + } + + fn recipient(&self) -> AccountId { + AccountId::new(2) + } + + fn request(&self) -> &InvokeRequest { + &self.request + } + + fn gas_limit(&self) -> u64 { + 21_000 + } + + fn funds(&self) -> &[FungibleAsset] { + &self.funds + } + + fn compute_identifier(&self) -> [u8; 32] { + self.id + } + } + + #[test] + fn insert_and_get_block_by_digest() { + let store = Arc::new(RwLock::new(BTreeMap::new())); + let relay = EvolveRelay::new(store); + let block = evolve_server::Block::new( + evolve_server::BlockHeader::new(1, 100, B256::ZERO), + vec![TestTx::new([7u8; 32])], + ); + let consensus_block = ConsensusBlock::new(block); + let digest = consensus_block.digest.0; + + relay.insert_block(consensus_block.clone()); + + let fetched = relay.get_block(&digest).expect("block should exist"); + assert_eq!(fetched.digest, consensus_block.digest); + assert_eq!(fetched.inner.header.number, 1); + } +} diff --git a/crates/consensus/src/reporter.rs b/crates/consensus/src/reporter.rs index 4677d01..60deeae 100644 --- a/crates/consensus/src/reporter.rs +++ b/crates/consensus/src/reporter.rs @@ -133,3 +133,160 @@ where ); } } + +#[cfg(test)] +mod tests { + use super::*; + use commonware_consensus::simplex::scheme::ed25519; + use commonware_consensus::simplex::types::{Finalization, Finalize, Proposal}; + use commonware_consensus::types::{Epoch, Round, View}; + use commonware_cryptography::{ed25519::PrivateKey, Signer as _}; + use commonware_parallel::Sequential; + use 
commonware_utils::ordered::Set; + use evolve_core::{AccountId, FungibleAsset, InvokeRequest, Message}; + use evolve_server::{Block, BlockHeader}; + use std::sync::Arc; + + #[derive(Clone)] + struct TestTx { + id: [u8; 32], + request: InvokeRequest, + funds: Vec, + } + + impl TestTx { + fn new(id: [u8; 32]) -> Self { + Self { + id, + request: InvokeRequest::new_from_message("test", 1, Message::from_bytes(vec![])), + funds: Vec::new(), + } + } + } + + impl Transaction for TestTx { + fn sender(&self) -> AccountId { + AccountId::new(1) + } + + fn recipient(&self) -> AccountId { + AccountId::new(2) + } + + fn request(&self) -> &InvokeRequest { + &self.request + } + + fn gas_limit(&self) -> u64 { + 21_000 + } + + fn funds(&self) -> &[FungibleAsset] { + &self.funds + } + + fn compute_identifier(&self) -> [u8; 32] { + self.id + } + } + + fn test_scheme() -> ed25519::Scheme { + let private_key = PrivateKey::from_seed(7); + let public_key = private_key.public_key(); + let participants = Set::from_iter_dedup([public_key]); + ed25519::Scheme::signer(b"reporter-test", participants, private_key) + .expect("signer must exist in participants") + } + + fn finalization_activity_for_digest( + scheme: &ed25519::Scheme, + digest: Sha256Digest, + ) -> Activity { + let proposal = Proposal::new( + Round::new(Epoch::zero(), View::new(1)), + View::zero(), + digest, + ); + let finalize_vote = Finalize::sign(scheme, proposal).expect("finalize vote must sign"); + let finalization = Finalization::from_finalizes(scheme, [&finalize_vote], &Sequential) + .expect("single-validator finalization must assemble"); + Activity::Finalization(finalization) + } + + #[tokio::test] + async fn finalization_updates_chain_state_and_evicts_pending_block() { + let last_hash = Arc::new(TokioRwLock::new(B256::ZERO)); + let height = Arc::new(AtomicU64::new(1)); + let pending_blocks = Arc::new(RwLock::new( + BTreeMap::<[u8; 32], ConsensusBlock>::new(), + )); + + let block = Block::new( + BlockHeader::new(1, 1_000, 
B256::ZERO), + vec![TestTx::new([1u8; 32])], + ); + let consensus_block = ConsensusBlock::new(block); + let digest = consensus_block.digest; + let block_hash = consensus_block.block_hash(); + pending_blocks + .write() + .unwrap() + .insert(consensus_block.digest.0, consensus_block); + + let chain_state = ChainState { + last_hash: last_hash.clone(), + height: height.clone(), + pending_blocks: pending_blocks.clone(), + }; + let mut reporter = + EvolveReporter::, TestTx>::with_chain_state( + chain_state, + ); + + let scheme = test_scheme(); + reporter + .report(finalization_activity_for_digest(&scheme, digest)) + .await; + + assert_eq!(*last_hash.read().await, block_hash); + assert_eq!(height.load(Ordering::SeqCst), 2); + assert!( + pending_blocks.read().unwrap().is_empty(), + "finalized block should be evicted from pending cache" + ); + } + + #[tokio::test] + async fn non_finalization_activity_does_not_mutate_chain_state() { + let last_hash = Arc::new(TokioRwLock::new(B256::repeat_byte(0xAA))); + let height = Arc::new(AtomicU64::new(42)); + let pending_blocks = Arc::new(RwLock::new( + BTreeMap::<[u8; 32], ConsensusBlock>::new(), + )); + + let chain_state = ChainState { + last_hash: last_hash.clone(), + height: height.clone(), + pending_blocks: pending_blocks.clone(), + }; + let mut reporter = + EvolveReporter::, TestTx>::with_chain_state( + chain_state, + ); + + let digest = Sha256Digest([9u8; 32]); + let proposal = Proposal::new( + Round::new(Epoch::zero(), View::new(2)), + View::new(1), + digest, + ); + let scheme = test_scheme(); + let finalize_vote = Finalize::sign(&scheme, proposal).expect("finalize vote must sign"); + + reporter.report(Activity::Finalize(finalize_vote)).await; + + assert_eq!(*last_hash.read().await, B256::repeat_byte(0xAA)); + assert_eq!(height.load(Ordering::SeqCst), 42); + assert!(pending_blocks.read().unwrap().is_empty()); + } +} diff --git a/crates/p2p/src/config.rs b/crates/p2p/src/config.rs index 537834d..272c4ec 100644 --- 
a/crates/p2p/src/config.rs +++ b/crates/p2p/src/config.rs @@ -54,35 +54,18 @@ impl Default for NetworkConfig { mod tests { use super::*; use crate::channels::channel; + use std::time::Duration; #[test] - fn default_listen_addr() { + fn default_config_invariants() { let cfg = NetworkConfig::default(); - assert_eq!(cfg.listen_addr.port(), 9000); - } - #[test] - fn default_bootstrappers_empty() { - let cfg = NetworkConfig::default(); + assert_eq!(cfg.listen_addr.port(), 9000); assert!(cfg.bootstrappers.is_empty()); - } - - #[test] - fn default_channel_count() { - let cfg = NetworkConfig::default(); assert_eq!(cfg.channel_configs.len(), channel::COUNT as usize); - } - - #[test] - fn default_namespace() { - let cfg = NetworkConfig::default(); assert_eq!(cfg.namespace, b"_EVOLVE"); - } - - #[test] - fn default_max_peers_positive() { - let cfg = NetworkConfig::default(); assert!(cfg.max_peers > 0); + assert!(cfg.connection_timeout > Duration::ZERO); } #[test] diff --git a/crates/p2p/src/provider.rs b/crates/p2p/src/provider.rs index a0b53db..f207955 100644 --- a/crates/p2p/src/provider.rs +++ b/crates/p2p/src/provider.rs @@ -158,6 +158,10 @@ mod tests { Set::from_iter_dedup(seeds.iter().map(|&s| make_key(s))) } + fn has_key(set: &Set, key: &ed25519::PublicKey) -> bool { + set.iter().any(|k| k == key) + } + #[tokio::test] async fn peer_set_returns_none_before_registration() { let mut provider = EpochPeerProvider::new(); @@ -221,4 +225,64 @@ mod tests { let result = clone.peer_set(0).await; assert!(result.is_some(), "clone must see updates from original"); } + + #[tokio::test] + async fn update_epoch_replaces_existing_epoch_set() { + let mut provider = EpochPeerProvider::new(); + provider.update_epoch(3, peer_set(&[1, 2])).await; + provider.update_epoch(3, peer_set(&[2, 9])).await; + + let epoch_set = provider.peer_set(3).await.expect("epoch must exist"); + assert_eq!(epoch_set.len(), 2); + assert!(has_key(&epoch_set, &make_key(2))); + assert!(has_key(&epoch_set, 
&make_key(9))); + assert!(!has_key(&epoch_set, &make_key(1))); + } + + #[tokio::test] + async fn retain_epochs_prunes_old_sets_and_updates_union() { + let mut provider = EpochPeerProvider::new(); + let mut subscriber = provider.clone(); + let mut rx = subscriber.subscribe().await; + + provider.update_epoch(0, peer_set(&[1, 2])).await; + let _ = rx.recv().await; + provider.update_epoch(1, peer_set(&[3])).await; + let _ = rx.recv().await; + provider.update_epoch(2, peer_set(&[4])).await; + let _ = rx.recv().await; + + provider.retain_epochs(1).await; + assert!(provider.peer_set(0).await.is_none()); + assert!(provider.peer_set(1).await.is_some()); + assert!(provider.peer_set(2).await.is_some()); + + provider.update_epoch(3, peer_set(&[5])).await; + let (_, _, all) = rx.recv().await.expect("notification expected"); + assert_eq!(all.len(), 3); + assert!(has_key(&all, &make_key(3))); + assert!(has_key(&all, &make_key(4))); + assert!(has_key(&all, &make_key(5))); + assert!(!has_key(&all, &make_key(1))); + assert!(!has_key(&all, &make_key(2))); + } + + #[tokio::test] + async fn dropped_subscriber_is_pruned_on_notify() { + let provider = EpochPeerProvider::new(); + let mut sub_a = provider.clone(); + let mut sub_b = provider.clone(); + let _rx_a = sub_a.subscribe().await; + let rx_b = sub_b.subscribe().await; + + assert_eq!(provider.inner.read().await.subscribers.len(), 2); + drop(rx_b); + + provider.update_epoch(9, peer_set(&[42])).await; + assert_eq!( + provider.inner.read().await.subscribers.len(), + 1, + "dead subscriber should be removed after notify" + ); + } } diff --git a/crates/p2p/src/validator.rs b/crates/p2p/src/validator.rs index bcc23b9..74d9310 100644 --- a/crates/p2p/src/validator.rs +++ b/crates/p2p/src/validator.rs @@ -121,15 +121,25 @@ mod tests { let mut vs = ValidatorSet::new(2); let id1 = make_identity(10); let p2p_key = id1.p2p_key.clone(); + let old_consensus = id1.consensus_key.clone(); let id2 = ValidatorIdentity { p2p_key: p2p_key.clone(), 
consensus_key: bls12381::PrivateKey::from_seed(99).public_key(), }; + let new_consensus = id2.consensus_key.clone(); vs.add_validator(id1); vs.add_validator(id2); assert_eq!(vs.len(), 1); + assert_eq!( + vs.validators() + .get(&p2p_key) + .expect("validator should exist") + .consensus_key, + new_consensus + ); + assert_ne!(old_consensus, new_consensus); } #[test] @@ -156,4 +166,26 @@ mod tests { } assert_eq!(vs.p2p_keys().len(), vs.consensus_keys().len()); } + + #[test] + fn p2p_and_consensus_keys_preserve_mapping_order() { + let mut vs = ValidatorSet::new(5); + for seed in 0..6u64 { + vs.add_validator(make_identity(seed)); + } + + let p2p_keys = vs.p2p_keys(); + let consensus_keys = vs.consensus_keys(); + for idx in 0..p2p_keys.len() { + let expected = &vs + .validators() + .get(&p2p_keys[idx]) + .expect("p2p key should exist") + .consensus_key; + assert_eq!( + &consensus_keys[idx], expected, + "consensus key order must match p2p key order" + ); + } + } } From a6efaac32767698211dc0b39b7c9df922a16d182 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Thu, 26 Feb 2026 16:10:13 +0100 Subject: [PATCH 9/9] fix: address PR review findings in consensus and p2p --- crates/consensus/src/automaton.rs | 22 ++++--- crates/consensus/src/block.rs | 30 ++++++--- crates/consensus/src/marshal.rs | 81 ++++++++++++++++-------- crates/consensus/src/relay.rs | 23 +++++-- crates/consensus/src/reporter.rs | 42 +++++++++++-- crates/p2p/src/provider.rs | 100 ++++++++++++++++++++++++------ 6 files changed, 228 insertions(+), 70 deletions(-) diff --git a/crates/consensus/src/automaton.rs b/crates/consensus/src/automaton.rs index fe99458..624d450 100644 --- a/crates/consensus/src/automaton.rs +++ b/crates/consensus/src/automaton.rs @@ -141,7 +141,7 @@ where compute_transactions_root(&genesis_block.transactions); let cb = ConsensusBlock::new(genesis_block); - let digest = cb.digest; + let digest = cb.digest_value(); // Store genesis in pending blocks. 
self.pending_blocks.write().unwrap().insert(digest.0, cb); @@ -156,7 +156,7 @@ where let (sender, receiver) = oneshot::channel(); let height = self.height.clone(); - let last_hash = *self.last_hash.read().await; + let last_hash = self.last_hash.clone(); let gas_limit = self.config.gas_limit; let mempool = self.mempool.clone(); let pending_blocks = self.pending_blocks.clone(); @@ -180,23 +180,25 @@ where .unwrap_or_default() .as_secs(); - // Increment height only after successful block construction to avoid - // gaps when the spawned task fails or is cancelled. - let block_height = height.fetch_add(1, Ordering::SeqCst); + let parent_hash = *last_hash.read().await; - let block = BlockBuilder::::new() - .number(block_height) + // Build first, then consume the height counter so a failed build + // cannot permanently skip a height. + let mut block = BlockBuilder::::new() + .number(0) .timestamp(timestamp) - .parent_hash(last_hash) + .parent_hash(parent_hash) .gas_limit(gas_limit) .transactions(transactions) .build(); - let mut block = block; + let block_height = height.fetch_add(1, Ordering::SeqCst); + block.header.number = block_height; + block.header.transactions_root = compute_transactions_root(&block.transactions); let cb = ConsensusBlock::new(block); - let digest = cb.digest; + let digest = cb.digest_value(); // Store in pending blocks for later retrieval. pending_blocks.write().unwrap().insert(digest.0, cb); diff --git a/crates/consensus/src/block.rs b/crates/consensus/src/block.rs index 8c0bd7f..3f9dd2a 100644 --- a/crates/consensus/src/block.rs +++ b/crates/consensus/src/block.rs @@ -17,16 +17,34 @@ pub struct ConsensusBlock { /// The inner evolve block. pub inner: Block, /// Cached digest (SHA-256 of the header fields). - pub digest: commonware_cryptography::sha256::Digest, + digest: commonware_cryptography::sha256::Digest, /// Cached parent digest. 
- pub parent_digest: commonware_cryptography::sha256::Digest, + parent_digest: commonware_cryptography::sha256::Digest, } impl ConsensusBlock { + /// Return the cached block digest. + pub fn digest_value(&self) -> commonware_cryptography::sha256::Digest { + self.digest + } + /// Return the inner block hash as a B256. pub fn block_hash(&self) -> B256 { B256::from_slice(&self.digest.0) } + + /// Return the cached parent digest. + pub fn parent_digest(&self) -> commonware_cryptography::sha256::Digest { + self.parent_digest + } + + fn serialize_wire_bytes(&self) -> Vec + where + Tx: BorshSerialize, + { + let wire = to_wire(&self.inner); + borsh::to_vec(&wire).expect("wire block serialization should not fail") + } } impl ConsensusBlock { @@ -112,8 +130,7 @@ struct WireBlock { impl Write for ConsensusBlock { fn write(&self, buf: &mut impl BufMut) { - let wire = to_wire(&self.inner); - let bytes = borsh::to_vec(&wire).expect("wire block serialization should not fail"); + let bytes = self.serialize_wire_bytes(); // Write length-prefixed bytes. 
(bytes.len() as u32).write(buf); buf.put_slice(&bytes); @@ -163,8 +180,7 @@ impl Read impl EncodeSize for ConsensusBlock { fn encode_size(&self) -> usize { - let wire = to_wire(&self.inner); - let bytes = borsh::to_vec(&wire).expect("wire block serialization should not fail"); + let bytes = self.serialize_wire_bytes(); // u32 length prefix + payload 4 + bytes.len() } @@ -192,7 +208,7 @@ impl { fn parent(&self) -> Self::Commitment { - self.parent_digest + self.parent_digest() } } diff --git a/crates/consensus/src/marshal.rs b/crates/consensus/src/marshal.rs index 47eaf48..d03890b 100644 --- a/crates/consensus/src/marshal.rs +++ b/crates/consensus/src/marshal.rs @@ -36,6 +36,12 @@ use commonware_cryptography::certificate::{ConstantProvider, Scheme}; use commonware_parallel::Sequential; use commonware_runtime::buffer::paged::CacheRef; use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize}; +use std::time::Duration; + +const DEFAULT_ITEMS_PER_SECTION: u64 = 10; +const DEFAULT_PAGE_SIZE: u16 = 4096; +const DEFAULT_PAGE_COUNT: usize = 1024; +const DEFAULT_BUFFER_COUNT: usize = 1024; /// Re-export marshal types needed for wiring. pub use commonware_consensus::marshal::{ @@ -61,6 +67,29 @@ pub struct MarshalConfig { pub view_retention: u64, /// Maximum concurrent repair requests. Must be non-zero. pub max_repair: NonZeroUsize, + /// Resolver timing and priority settings. + pub resolver: ResolverConfig, +} + +/// Tuning knobs for the marshal's P2P resolver. 
+pub struct ResolverConfig { + pub initial: Duration, + pub timeout: Duration, + pub fetch_retry_timeout: Duration, + pub priority_requests: bool, + pub priority_responses: bool, +} + +impl Default for ResolverConfig { + fn default() -> Self { + Self { + initial: Duration::from_secs(1), + timeout: Duration::from_secs(2), + fetch_retry_timeout: Duration::from_millis(100), + priority_requests: false, + priority_responses: false, + } + } } impl Default for MarshalConfig { @@ -71,10 +100,18 @@ impl Default for MarshalConfig { mailbox_size: 1024, view_retention: 10, max_repair: NonZeroUsize::new(10).unwrap(), + resolver: ResolverConfig::default(), } } } +fn make_default_cache_ref() -> CacheRef { + CacheRef::new( + NonZeroU16::new(DEFAULT_PAGE_SIZE).unwrap(), + NonZeroUsize::new(DEFAULT_PAGE_COUNT).unwrap(), + ) +} + /// Create a marshal [`MarshalActorConfig`] from a [`MarshalConfig`] and signing scheme. /// /// Uses [`ConstantProvider`] (same scheme for all epochs) and [`FixedEpocher`] @@ -99,14 +136,11 @@ where partition_prefix: config.partition_prefix.clone(), mailbox_size: config.mailbox_size, view_retention_timeout: ViewDelta::new(config.view_retention), - prunable_items_per_section: NonZeroU64::new(10).unwrap(), - page_cache: CacheRef::new( - NonZeroU16::new(4096).unwrap(), - NonZeroUsize::new(1024).unwrap(), - ), - replay_buffer: NonZeroUsize::new(1024).unwrap(), - key_write_buffer: NonZeroUsize::new(1024).unwrap(), - value_write_buffer: NonZeroUsize::new(1024).unwrap(), + prunable_items_per_section: NonZeroU64::new(DEFAULT_ITEMS_PER_SECTION).unwrap(), + page_cache: make_default_cache_ref(), + replay_buffer: NonZeroUsize::new(DEFAULT_BUFFER_COUNT).unwrap(), + key_write_buffer: NonZeroUsize::new(DEFAULT_BUFFER_COUNT).unwrap(), + value_write_buffer: NonZeroUsize::new(DEFAULT_BUFFER_COUNT).unwrap(), block_codec_config, max_repair: config.max_repair, strategy: Sequential, @@ -123,10 +157,7 @@ pub fn archive_config( codec_config: C, ) -> immutable::Config { let prefix 
= format!("{}-{}", partition_prefix, store_name); - let page_cache = CacheRef::new( - NonZeroU16::new(4096).unwrap(), - NonZeroUsize::new(1024).unwrap(), - ); + let page_cache = make_default_cache_ref(); immutable::Config { metadata_partition: format!("{prefix}-metadata"), @@ -137,15 +168,15 @@ pub fn archive_config( freezer_key_partition: format!("{prefix}-freezer-key"), freezer_key_page_cache: page_cache, freezer_value_partition: format!("{prefix}-freezer-value"), - freezer_value_target_size: 4096, + freezer_value_target_size: u64::from(DEFAULT_PAGE_SIZE), freezer_value_compression: None, ordinal_partition: format!("{prefix}-ordinal"), - items_per_section: NonZeroU64::new(10).unwrap(), + items_per_section: NonZeroU64::new(DEFAULT_ITEMS_PER_SECTION).unwrap(), codec_config, - replay_buffer: NonZeroUsize::new(1024).unwrap(), - freezer_key_write_buffer: NonZeroUsize::new(1024).unwrap(), - freezer_value_write_buffer: NonZeroUsize::new(1024).unwrap(), - ordinal_write_buffer: NonZeroUsize::new(1024).unwrap(), + replay_buffer: NonZeroUsize::new(DEFAULT_BUFFER_COUNT).unwrap(), + freezer_key_write_buffer: NonZeroUsize::new(DEFAULT_BUFFER_COUNT).unwrap(), + freezer_value_write_buffer: NonZeroUsize::new(DEFAULT_BUFFER_COUNT).unwrap(), + ordinal_write_buffer: NonZeroUsize::new(DEFAULT_BUFFER_COUNT).unwrap(), } } @@ -154,7 +185,7 @@ pub fn resolver_config( public_key: P, provider: C, blocker: B, - mailbox_size: usize, + config: &MarshalConfig, ) -> resolver::p2p::Config where P: commonware_cryptography::PublicKey, @@ -165,11 +196,11 @@ where public_key, provider, blocker, - mailbox_size, - initial: std::time::Duration::from_secs(1), - timeout: std::time::Duration::from_secs(2), - fetch_retry_timeout: std::time::Duration::from_millis(100), - priority_requests: false, - priority_responses: false, + mailbox_size: config.mailbox_size, + initial: config.resolver.initial, + timeout: config.resolver.timeout, + fetch_retry_timeout: config.resolver.fetch_retry_timeout, + 
priority_requests: config.resolver.priority_requests, + priority_responses: config.resolver.priority_responses, } } diff --git a/crates/consensus/src/relay.rs b/crates/consensus/src/relay.rs index 94a4350..814c2b3 100644 --- a/crates/consensus/src/relay.rs +++ b/crates/consensus/src/relay.rs @@ -25,7 +25,14 @@ impl EvolveRelay { where Tx: Clone, { - self.blocks.read().unwrap().get(digest).cloned() + self.blocks + .read() + .unwrap_or_else(|poison| { + tracing::warn!("relay: recovered from poisoned read lock"); + poison.into_inner() + }) + .get(digest) + .cloned() } /// Insert a block into the relay's store. @@ -33,8 +40,14 @@ impl EvolveRelay { where Tx: Clone, { - let digest = block.digest.0; - self.blocks.write().unwrap().insert(digest, block); + let digest = block.digest_value().0; + self.blocks + .write() + .unwrap_or_else(|poison| { + tracing::warn!("relay: recovered from poisoned write lock"); + poison.into_inner() + }) + .insert(digest, block); } } @@ -116,12 +129,12 @@ mod tests { vec![TestTx::new([7u8; 32])], ); let consensus_block = ConsensusBlock::new(block); - let digest = consensus_block.digest.0; + let digest = consensus_block.digest_value().0; relay.insert_block(consensus_block.clone()); let fetched = relay.get_block(&digest).expect("block should exist"); - assert_eq!(fetched.digest, consensus_block.digest); + assert_eq!(fetched.digest_value(), consensus_block.digest_value()); assert_eq!(fetched.inner.header.number, 1); } } diff --git a/crates/consensus/src/reporter.rs b/crates/consensus/src/reporter.rs index 60deeae..48c6e63 100644 --- a/crates/consensus/src/reporter.rs +++ b/crates/consensus/src/reporter.rs @@ -106,7 +106,10 @@ where }; let finalized_block = { - let mut pending = state.pending_blocks.write().unwrap(); + let mut pending = state.pending_blocks.write().unwrap_or_else(|poison| { + tracing::warn!("reporter: recovered from poisoned pending_blocks lock"); + poison.into_inner() + }); pending.remove(&finalized_digest) }; @@ -120,7 +123,7 @@ 
where let finalized_hash = block.block_hash(); *state.last_hash.write().await = finalized_hash; - state.height.store( + state.height.fetch_max( block.inner.header.number.saturating_add(1), Ordering::SeqCst, ); @@ -226,12 +229,12 @@ mod tests { vec![TestTx::new([1u8; 32])], ); let consensus_block = ConsensusBlock::new(block); - let digest = consensus_block.digest; + let digest = consensus_block.digest_value(); let block_hash = consensus_block.block_hash(); pending_blocks .write() .unwrap() - .insert(consensus_block.digest.0, consensus_block); + .insert(consensus_block.digest_value().0, consensus_block); let chain_state = ChainState { last_hash: last_hash.clone(), @@ -289,4 +292,35 @@ mod tests { assert_eq!(height.load(Ordering::SeqCst), 42); assert!(pending_blocks.read().unwrap().is_empty()); } + + #[tokio::test] + async fn finalization_with_unknown_digest_does_not_mutate_chain_state() { + let last_hash = Arc::new(TokioRwLock::new(B256::repeat_byte(0xBB))); + let height = Arc::new(AtomicU64::new(7)); + let pending_blocks = Arc::new(RwLock::new( + BTreeMap::<[u8; 32], ConsensusBlock>::new(), + )); + + let chain_state = ChainState { + last_hash: last_hash.clone(), + height: height.clone(), + pending_blocks: pending_blocks.clone(), + }; + let mut reporter = + EvolveReporter::, TestTx>::with_chain_state( + chain_state, + ); + + let scheme = test_scheme(); + reporter + .report(finalization_activity_for_digest( + &scheme, + Sha256Digest([0xCC; 32]), + )) + .await; + + assert_eq!(*last_hash.read().await, B256::repeat_byte(0xBB)); + assert_eq!(height.load(Ordering::SeqCst), 7); + assert!(pending_blocks.read().unwrap().is_empty()); + } } diff --git a/crates/p2p/src/provider.rs b/crates/p2p/src/provider.rs index f207955..abb073c 100644 --- a/crates/p2p/src/provider.rs +++ b/crates/p2p/src/provider.rs @@ -15,13 +15,14 @@ use tokio::sync::{mpsc, RwLock}; /// /// `(epoch_id, new_peer_set, all_tracked_peers)` type Notification = (u64, Set, Set); +const 
SUBSCRIBER_CHANNEL_CAPACITY: usize = 64; #[derive(Debug)] struct ProviderState { /// Peer sets indexed by epoch. BTreeMap for deterministic iteration. peer_sets: BTreeMap>, /// Active subscriber channels. Dead senders are pruned on each notify. - subscribers: Vec>, + subscribers: Vec>, /// Union of all currently tracked peer sets. all_peers: Set, } @@ -35,11 +36,29 @@ impl ProviderState { } } + fn recompute_all_peers(&mut self) { + let all_peers: Vec = self + .peer_sets + .values() + .flat_map(|s| s.iter().cloned()) + .collect(); + self.all_peers = Set::from_iter_dedup(all_peers); + } + /// Notify live subscribers and prune dead channels. fn notify(&mut self, epoch: u64, peers: Set) { self.subscribers.retain(|tx| { - tx.send((epoch, peers.clone(), self.all_peers.clone())) - .is_ok() + match tx.try_send((epoch, peers.clone(), self.all_peers.clone())) { + Ok(()) => true, + Err(mpsc::error::TrySendError::Full(_)) => { + tracing::warn!( + epoch, + "provider: dropping peer-set notification for slow subscriber" + ); + true + } + Err(mpsc::error::TrySendError::Closed(_)) => false, + } }); } } @@ -86,13 +105,7 @@ impl EpochPeerProvider { pub async fn update_epoch(&self, epoch: u64, peers: Set) { let mut state = self.inner.write().await; state.peer_sets.insert(epoch, peers.clone()); - - let all_peers: Vec = state - .peer_sets - .values() - .flat_map(|s| s.iter().cloned()) - .collect(); - state.all_peers = Set::from_iter_dedup(all_peers); + state.recompute_all_peers(); state.notify(epoch, peers); } @@ -100,17 +113,20 @@ impl EpochPeerProvider { /// Remove all epoch entries older than `min_epoch`. /// /// Recomputes `all_peers` from the remaining sets and notifies subscribers - /// with the current epoch's peer set (if it exists). + /// with the highest retained epoch's peer set (if one exists). 
pub async fn retain_epochs(&self, min_epoch: u64) { let mut state = self.inner.write().await; state.peer_sets.retain(|&e, _| e >= min_epoch); + state.recompute_all_peers(); - let all_peers: Vec = state + if let Some((epoch, peers)) = state .peer_sets - .values() - .flat_map(|s| s.iter().cloned()) - .collect(); - state.all_peers = Set::from_iter_dedup(all_peers); + .iter() + .next_back() + .map(|(e, p)| (*e, p.clone())) + { + state.notify(epoch, peers); + } } } @@ -138,9 +154,20 @@ impl Provider for EpochPeerProvider { > + Send { let inner = self.inner.clone(); async move { - let (tx, rx) = mpsc::unbounded_channel(); - inner.write().await.subscribers.push(tx); - rx + let (bounded_tx, mut bounded_rx) = mpsc::channel(SUBSCRIBER_CHANNEL_CAPACITY); + let (unbounded_tx, unbounded_rx) = mpsc::unbounded_channel(); + + inner.write().await.subscribers.push(bounded_tx); + + tokio::spawn(async move { + while let Some(notification) = bounded_rx.recv().await { + if unbounded_tx.send(notification).is_err() { + break; + } + } + }); + + unbounded_rx } } } @@ -253,6 +280,13 @@ mod tests { let _ = rx.recv().await; provider.retain_epochs(1).await; + let (epoch, retained_set, retained_all) = + rx.recv().await.expect("retain notification expected"); + assert_eq!(epoch, 2); + assert_eq!(retained_set, peer_set(&[4])); + assert_eq!(retained_all.len(), 2); + assert!(has_key(&retained_all, &make_key(3))); + assert!(has_key(&retained_all, &make_key(4))); assert!(provider.peer_set(0).await.is_none()); assert!(provider.peer_set(1).await.is_some()); assert!(provider.peer_set(2).await.is_some()); @@ -267,6 +301,32 @@ mod tests { assert!(!has_key(&all, &make_key(2))); } + #[tokio::test] + async fn retain_epochs_pruning_highest_epoch_notifies_new_highest() { + let mut provider = EpochPeerProvider::new(); + let mut subscriber = provider.clone(); + let mut rx = subscriber.subscribe().await; + + provider.update_epoch(1, peer_set(&[1])).await; + let _ = rx.recv().await; + provider.update_epoch(5, 
peer_set(&[5])).await; + let _ = rx.recv().await; + + provider.retain_epochs(0).await; + let _ = rx.recv().await; + + provider.retain_epochs(6).await; + assert!(provider.peer_set(1).await.is_none()); + assert!(provider.peer_set(5).await.is_none()); + + provider.update_epoch(6, peer_set(&[6])).await; + let (epoch, new_set, all) = rx.recv().await.expect("notification expected"); + assert_eq!(epoch, 6); + assert_eq!(new_set, peer_set(&[6])); + assert_eq!(all.len(), 1); + assert!(has_key(&all, &make_key(6))); + } + #[tokio::test] async fn dropped_subscriber_is_pruned_on_notify() { let provider = EpochPeerProvider::new(); @@ -279,6 +339,8 @@ mod tests { drop(rx_b); provider.update_epoch(9, peer_set(&[42])).await; + tokio::task::yield_now().await; + provider.update_epoch(10, peer_set(&[43])).await; assert_eq!( provider.inner.read().await.subscribers.len(), 1,