diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..b03514a --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,6 @@ +[env] +# QMDB 2026.2.0's into_merkleized() (MMR computation) requires >2 MiB of stack +# when called multiple times on the same database instance. The default Rust test +# thread stack size is 2 MiB, which is insufficient. Set 16 MiB to give QMDB +# tests adequate headroom without affecting production code. +RUST_MIN_STACK = "16777216" diff --git a/Cargo.toml b/Cargo.toml index a461c5d..ff2503a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,8 @@ members = [ "crates/testing/simulator", "crates/testing/debugger", "crates/testing/cli", + "crates/consensus", + "crates/p2p", ] resolver = "2" @@ -67,14 +69,22 @@ evolve_simulator = { path = "crates/testing/simulator" } evolve_debugger = { path = "crates/testing/debugger" } evolve_node = { path = "crates/app/node" } evolve_testapp = { path = "bin/testapp" } +evolve_p2p = { path = "crates/p2p" } # outside deps linkme = { version = "0.3", default-features = false } -commonware-cryptography = "0.0.65" -commonware-runtime = "0.0.65" -commonware-storage = "0.0.65" -commonware-utils = "0.0.65" -commonware-codec = "0.0.65" +commonware-cryptography = "2026.2.0" +commonware-parallel = "2026.2.0" +commonware-runtime = "2026.2.0" +commonware-storage = "2026.2.0" +commonware-utils = "2026.2.0" +commonware-codec = "2026.2.0" +commonware-consensus = "2026.2.0" +commonware-broadcast = "2026.2.0" +commonware-macros = "2026.2.0" +commonware-p2p = "2026.2.0" +commonware-resolver = "2026.2.0" +evolve_consensus = { path = "crates/consensus" } borsh = { features = ["derive"], version = "1.5.5" } clap = { version = "4.5.31", features = ["derive"] } fixed = { version = "1.29", features = ["borsh", "serde"] } diff --git a/crates/consensus/Cargo.toml b/crates/consensus/Cargo.toml new file mode 100644 index 0000000..c871641 --- /dev/null +++ b/crates/consensus/Cargo.toml @@ -0,0 +1,44 @@ +[package] +name = "evolve_consensus" +version = "0.1.0" +edition = "2021" +license.workspace = true +repository.workspace = true +rust-version.workspace = true + +[dependencies] +commonware-broadcast = { workspace = true } +commonware-consensus = { workspace = true } +commonware-cryptography = { workspace = true } +commonware-codec = { workspace = true } +commonware-p2p = { workspace = true } +commonware-parallel = { workspace = true } +commonware-resolver = { workspace = true } +commonware-runtime = { workspace = true } +commonware-storage = { workspace = true } +commonware-utils = { workspace = true } + +evolve_core = { workspace = true } +evolve_server = { workspace = true } +evolve_stf_traits = { workspace = true } +evolve_mempool = { workspace = true } +evolve_storage = { workspace = true } +evolve_p2p = { package = "evolve-p2p", path = "../p2p" } + +rand_core = "0.6" + +alloy-primitives = { workspace = true } +borsh = { workspace = true } +bytes = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } + +[dev-dependencies] +commonware-consensus = { workspace = true, features = ["fuzz"] } +commonware-cryptography = { workspace = true, features = ["mocks"] } +commonware-p2p = { workspace = true } +commonware-runtime = { workspace = true, features = ["test-utils"] } +commonware-macros = { workspace = true } + +[lints] +workspace = true diff --git a/crates/consensus/src/automaton.rs b/crates/consensus/src/automaton.rs new file mode 100644 index 0000000..624d450 --- /dev/null +++ b/crates/consensus/src/automaton.rs @@ -0,0 +1,283 @@ +use crate::block::ConsensusBlock; +use crate::config::ConsensusConfig; +use alloy_primitives::B256; +use commonware_consensus::types::Epoch; +use commonware_consensus::{Automaton, CertifiableAutomaton}; +use commonware_cryptography::{Hasher, Sha256}; +use commonware_utils::channel::oneshot; +use evolve_core::ReadonlyKV; +use evolve_mempool::{Mempool, MempoolTx, SharedMempool}; +use evolve_server::BlockBuilder; +use evolve_server::StfExecutor; +use evolve_stf_traits::{AccountsCodeStorage, Transaction}; +use evolve_storage::Storage; +use std::collections::BTreeMap; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, RwLock}; +use tokio::sync::RwLock as TokioRwLock; + +/// EvolveAutomaton bridges Evolve's STF and mempool with commonware's consensus. +/// +/// It implements the `Automaton` and `CertifiableAutomaton` traits, allowing +/// the simplex consensus engine to propose and verify blocks through Evolve's +/// state transition function. +/// +/// # Design +/// +/// Consensus operates on opaque digests, not full blocks. The automaton: +/// - On `propose()`: builds a block from mempool txs, stores it locally, +/// returns only the digest to consensus. +/// - On `verify()`: looks up the block by digest (populated via Relay), +/// validates parent chain and height. +pub struct EvolveAutomaton { + stf: Stf, + storage: S, + codes: Codes, + mempool: SharedMempool>, + /// Block cache: stores proposed/received blocks by their digest. + pending_blocks: Arc>>>, + /// Current chain height. + height: Arc, + /// Last block hash. + last_hash: Arc>, + /// Consensus configuration. + config: ConsensusConfig, + /// Phantom for the context type. + _ctx: std::marker::PhantomData, +} + +impl Clone for EvolveAutomaton +where + Stf: Clone, + S: Clone, + Codes: Clone, +{ + fn clone(&self) -> Self { + Self { + stf: self.stf.clone(), + storage: self.storage.clone(), + codes: self.codes.clone(), + mempool: self.mempool.clone(), + pending_blocks: self.pending_blocks.clone(), + height: self.height.clone(), + last_hash: self.last_hash.clone(), + config: self.config.clone(), + _ctx: std::marker::PhantomData, + } + } +} + +impl EvolveAutomaton { + pub fn new( + stf: Stf, + storage: S, + codes: Codes, + mempool: SharedMempool>, + pending_blocks: Arc>>>, + config: ConsensusConfig, + ) -> Self { + Self { + stf, + storage, + codes, + mempool, + pending_blocks, + height: Arc::new(AtomicU64::new(1)), + last_hash: Arc::new(TokioRwLock::new(B256::ZERO)), + config, + _ctx: std::marker::PhantomData, + } + } + + /// Get the current height. + pub fn height(&self) -> u64 { + self.height.load(Ordering::SeqCst) + } + + /// Get a reference to the shared pending blocks. + pub fn pending_blocks(&self) -> &Arc>>> { + &self.pending_blocks + } + + /// Get a reference to the shared last block hash. + /// + /// The finalization path (reporter) must update this after a block is + /// certified so that subsequent `propose()` calls use the correct parent. + pub fn last_hash(&self) -> &Arc> { + &self.last_hash + } + + /// Get a reference to the shared height counter. + /// + /// The finalization path (reporter) may use this to reconcile height + /// after block finalization. + pub fn height_atomic(&self) -> &Arc { + &self.height + } +} + +impl Automaton for EvolveAutomaton +where + Tx: Transaction + MempoolTx + Clone + Send + Sync + 'static, + S: ReadonlyKV + Storage + Clone + Send + Sync + 'static, + Codes: AccountsCodeStorage + Clone + Send + Sync + 'static, + Stf: StfExecutor + Send + Sync + Clone + 'static, + Ctx: Clone + Send + 'static, +{ + type Context = Ctx; + type Digest = commonware_cryptography::sha256::Digest; + + async fn genesis(&mut self, _epoch: Epoch) -> Self::Digest { + // Genesis: return the digest of the empty genesis block at height 0. + let genesis_block = BlockBuilder::::new() + .number(0) + .timestamp(0) + .parent_hash(B256::ZERO) + .gas_limit(self.config.gas_limit) + .build(); + + let mut genesis_block = genesis_block; + genesis_block.header.transactions_root = + compute_transactions_root(&genesis_block.transactions); + + let cb = ConsensusBlock::new(genesis_block); + let digest = cb.digest_value(); + + // Store genesis in pending blocks. + self.pending_blocks.write().unwrap().insert(digest.0, cb); + + digest + } + + // SystemTime::now() is used here for block timestamps only. This is acceptable + // in a consensus proposer context — the timestamp is validated by verifiers. + #[allow(clippy::disallowed_types)] + async fn propose(&mut self, _context: Self::Context) -> oneshot::Receiver { + let (sender, receiver) = oneshot::channel(); + + let height = self.height.clone(); + let last_hash = self.last_hash.clone(); + let gas_limit = self.config.gas_limit; + let mempool = self.mempool.clone(); + let pending_blocks = self.pending_blocks.clone(); + + // Spawn block building onto a background task. + tokio::spawn(async move { + // Pull transactions from mempool. + let selected = { + let mut pool = mempool.write().await; + pool.select(1000) // max txs per block + }; + + let transactions: Vec = selected + .into_iter() + .map(|arc_tx| (*arc_tx).clone()) + .collect(); + + // Build the block. + let timestamp = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + let parent_hash = *last_hash.read().await; + + // Build first, then consume the height counter so a failed build + // cannot permanently skip a height. + let mut block = BlockBuilder::::new() + .number(0) + .timestamp(timestamp) + .parent_hash(parent_hash) + .gas_limit(gas_limit) + .transactions(transactions) + .build(); + + let block_height = height.fetch_add(1, Ordering::SeqCst); + block.header.number = block_height; + + block.header.transactions_root = compute_transactions_root(&block.transactions); + + let cb = ConsensusBlock::new(block); + let digest = cb.digest_value(); + + // Store in pending blocks for later retrieval. + pending_blocks.write().unwrap().insert(digest.0, cb); + + // Return the digest to consensus. + let _ = sender.send(digest); + }); + + receiver + } + + async fn verify( + &mut self, + _context: Self::Context, + payload: Self::Digest, + ) -> oneshot::Receiver { + let (sender, receiver) = oneshot::channel(); + + let pending_blocks = self.pending_blocks.clone(); + let last_hash = self.last_hash.clone(); + + tokio::spawn(async move { + // Look up the block by digest. + let block = { + let blocks = pending_blocks.read().unwrap(); + blocks.get(&payload.0).cloned() + }; + + let Some(block) = block else { + tracing::warn!( + digest = ?payload, + "verify: block not found in pending blocks" + ); + let _ = sender.send(false); + return; + }; + + // Validate parent hash chain. + let expected_parent = *last_hash.read().await; + if block.inner.header.parent_hash != expected_parent { + tracing::warn!( + expected = ?expected_parent, + actual = ?block.inner.header.parent_hash, + "verify: parent hash mismatch" + ); + let _ = sender.send(false); + return; + } + + // Validate height is positive. + if block.inner.header.number == 0 { + tracing::warn!("verify: block height cannot be 0 (genesis)"); + let _ = sender.send(false); + return; + } + + let _ = sender.send(true); + }); + + receiver + } +} + +fn compute_transactions_root(transactions: &[Tx]) -> B256 { + let mut hasher = Sha256::new(); + hasher.update(&(transactions.len() as u64).to_le_bytes()); + for tx in transactions { + hasher.update(&tx.compute_identifier()); + } + B256::from_slice(&hasher.finalize().0) +} + +impl CertifiableAutomaton for EvolveAutomaton +where + Tx: Transaction + MempoolTx + Clone + Send + Sync + 'static, + S: ReadonlyKV + Storage + Clone + Send + Sync + 'static, + Codes: AccountsCodeStorage + Clone + Send + Sync + 'static, + Stf: StfExecutor + Send + Sync + Clone + 'static, + Ctx: Clone + Send + 'static, +{ + // Use the default implementation which always certifies. +} diff --git a/crates/consensus/src/block.rs b/crates/consensus/src/block.rs new file mode 100644 index 0000000..3f9dd2a --- /dev/null +++ b/crates/consensus/src/block.rs @@ -0,0 +1,440 @@ +use alloy_primitives::B256; +use borsh::{BorshDeserialize, BorshSerialize}; +use bytes::{Buf, BufMut}; +use commonware_codec::{EncodeSize, Error as CodecError, Read, ReadExt, Write}; +use commonware_consensus::types::Height; +use commonware_consensus::Heightable; +use commonware_cryptography::{Committable, Digestible, Hasher, Sha256}; +use evolve_server::{Block, BlockHeader}; +use evolve_stf_traits::Transaction; + +/// A consensus-aware wrapper around Evolve's Block. +/// +/// Implements commonware traits (Heightable, Digestible, Committable, Codec) +/// so it can participate in simplex consensus. +#[derive(Debug, Clone)] +pub struct ConsensusBlock { + /// The inner evolve block. + pub inner: Block, + /// Cached digest (SHA-256 of the header fields). + digest: commonware_cryptography::sha256::Digest, + /// Cached parent digest. + parent_digest: commonware_cryptography::sha256::Digest, +} + +impl ConsensusBlock { + /// Return the cached block digest. + pub fn digest_value(&self) -> commonware_cryptography::sha256::Digest { + self.digest + } + + /// Return the inner block hash as a B256. + pub fn block_hash(&self) -> B256 { + B256::from_slice(&self.digest.0) + } + + /// Return the cached parent digest. + pub fn parent_digest(&self) -> commonware_cryptography::sha256::Digest { + self.parent_digest + } + + fn serialize_wire_bytes(&self) -> Vec + where + Tx: BorshSerialize, + { + let wire = to_wire(&self.inner); + borsh::to_vec(&wire).expect("wire block serialization should not fail") + } +} + +impl ConsensusBlock { + /// Create a new ConsensusBlock from an evolve Block. + /// + /// Computes and caches the block digest and parent digest. + pub fn new(inner: Block) -> Self { + let digest = compute_block_digest(&inner); + let parent_digest = hash_to_sha256_digest(inner.header.parent_hash); + Self { + inner, + digest, + parent_digest, + } + } +} + +/// Compute a deterministic SHA-256 digest from block header fields. +/// +/// All consensus-relevant header fields are included to ensure distinct blocks +/// always produce distinct digests. +fn compute_block_digest( + block: &Block, +) -> commonware_cryptography::sha256::Digest { + let mut hasher = Sha256::new(); + hasher.update(&block.header.number.to_le_bytes()); + hasher.update(&block.header.timestamp.to_le_bytes()); + hasher.update(block.header.parent_hash.as_slice()); + hasher.update(&block.header.gas_limit.to_le_bytes()); + hasher.update(&block.header.gas_used.to_le_bytes()); + hasher.update(block.header.beneficiary.as_slice()); + hasher.update(block.header.transactions_root.as_slice()); + hasher.update(block.header.state_root.as_slice()); + hasher.update(&(block.transactions.len() as u64).to_le_bytes()); + for tx in &block.transactions { + hasher.update(&tx.compute_identifier()); + } + hasher.finalize() +} + +/// Convert a B256 into a SHA-256 Digest (both are 32 bytes). +fn hash_to_sha256_digest(hash: B256) -> commonware_cryptography::sha256::Digest { + commonware_cryptography::sha256::Digest(hash.0) +} + +// -- Commonware trait implementations -- + +impl Heightable for ConsensusBlock { + fn height(&self) -> Height { + Height::new(self.inner.header.number) + } +} + +impl Digestible for ConsensusBlock { + type Digest = commonware_cryptography::sha256::Digest; + + fn digest(&self) -> Self::Digest { + self.digest + } +} + +impl Committable for ConsensusBlock { + type Commitment = commonware_cryptography::sha256::Digest; + + fn commitment(&self) -> Self::Commitment { + self.digest + } +} + +/// Borsh-based wire format for consensus block serialization. +#[derive(BorshSerialize, BorshDeserialize)] +struct WireBlock { + number: u64, + timestamp: u64, + parent_hash: [u8; 32], + gas_limit: u64, + gas_used: u64, + beneficiary: [u8; 20], + transactions_root: [u8; 32], + state_root: [u8; 32], + transactions_encoded: Vec>, +} + +impl Write for ConsensusBlock { + fn write(&self, buf: &mut impl BufMut) { + let bytes = self.serialize_wire_bytes(); + // Write length-prefixed bytes. + (bytes.len() as u32).write(buf); + buf.put_slice(&bytes); + } +} + +impl Read + for ConsensusBlock +{ + type Cfg = (); + + fn read_cfg(buf: &mut impl Buf, _cfg: &Self::Cfg) -> Result { + let len = u32::read(buf)? as usize; + if buf.remaining() < len { + return Err(CodecError::EndOfBuffer); + } + let bytes = buf.copy_to_bytes(len); + let wire: WireBlock = borsh::from_slice(&bytes) + .map_err(|_| CodecError::Invalid("ConsensusBlock", "borsh deserialization failed"))?; + + let transactions: Vec = wire + .transactions_encoded + .iter() + .map(|encoded| { + borsh::from_slice(encoded).map_err(|_| { + CodecError::Invalid("ConsensusBlock", "tx borsh deserialization failed") + }) + }) + .collect::, _>>()?; + + let header = BlockHeader { + number: wire.number, + timestamp: wire.timestamp, + parent_hash: B256::from_slice(&wire.parent_hash), + gas_limit: wire.gas_limit, + gas_used: wire.gas_used, + beneficiary: alloy_primitives::Address::from_slice(&wire.beneficiary), + transactions_root: B256::from_slice(&wire.transactions_root), + state_root: B256::from_slice(&wire.state_root), + ..Default::default() + }; + + let block = Block::new(header, transactions); + Ok(ConsensusBlock::new(block)) + } +} + +impl EncodeSize for ConsensusBlock { + fn encode_size(&self) -> usize { + let bytes = self.serialize_wire_bytes(); + // u32 length prefix + payload + 4 + bytes.len() + } +} + +fn to_wire(inner: &Block) -> WireBlock { + WireBlock { + number: inner.header.number, + timestamp: inner.header.timestamp, + parent_hash: inner.header.parent_hash.0, + gas_limit: inner.header.gas_limit, + gas_used: inner.header.gas_used, + beneficiary: inner.header.beneficiary.0 .0, + transactions_root: inner.header.transactions_root.0, + state_root: inner.header.state_root.0, + transactions_encoded: inner + .transactions + .iter() + .map(|tx| borsh::to_vec(tx).expect("tx serialization should not fail")) + .collect(), + } +} + +impl + commonware_consensus::Block for ConsensusBlock +{ + fn parent(&self) -> Self::Commitment { + self.parent_digest() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use commonware_codec::{DecodeExt, Encode}; + use evolve_core::{AccountId, FungibleAsset, InvokeRequest}; + use evolve_stf_traits::Transaction; + + #[derive(Debug, Clone, BorshSerialize, BorshDeserialize, PartialEq)] + struct TestTx { + data: Vec, + } + + impl Transaction for TestTx { + fn sender(&self) -> AccountId { + AccountId::new(1) + } + + fn recipient(&self) -> AccountId { + AccountId::new(2) + } + + fn request(&self) -> &InvokeRequest { + panic!("request() is not used in consensus block tests") + } + + fn gas_limit(&self) -> u64 { + 21_000 + } + + fn funds(&self) -> &[FungibleAsset] { + &[] + } + + fn compute_identifier(&self) -> [u8; 32] { + let mut id = [0u8; 32]; + let len = self.data.len().min(32); + id[..len].copy_from_slice(&self.data[..len]); + id + } + } + + #[test] + fn test_encode_decode_roundtrip() { + let header = BlockHeader::new(42, 1000, B256::repeat_byte(0xAA)); + let txs = vec![ + TestTx { + data: vec![1, 2, 3], + }, + TestTx { + data: vec![4, 5, 6], + }, + ]; + let block = Block::new(header, txs); + let consensus_block = ConsensusBlock::new(block); + + let encoded = consensus_block.encode(); + let decoded = ConsensusBlock::::decode(encoded).unwrap(); + + assert_eq!( + decoded.inner.header.number, + consensus_block.inner.header.number + ); + assert_eq!( + decoded.inner.header.timestamp, + consensus_block.inner.header.timestamp + ); + assert_eq!( + decoded.inner.header.parent_hash, + consensus_block.inner.header.parent_hash + ); + assert_eq!( + decoded.inner.transactions, + consensus_block.inner.transactions + ); + assert_eq!(decoded.digest, consensus_block.digest); + } + + #[test] + fn test_digest_determinism() { + let header = BlockHeader::new(10, 500, B256::repeat_byte(0xBB)); + let txs = vec![TestTx { + data: vec![7, 8, 9], + }]; + + let block1 = Block::new(header.clone(), txs.clone()); + let block2 = Block::new(header, txs); + + let cb1 = ConsensusBlock::new(block1); + let cb2 = ConsensusBlock::new(block2); + + assert_eq!(cb1.digest, cb2.digest); + } + + #[test] + fn test_heightable() { + let header = BlockHeader::new(99, 0, B256::ZERO); + let block = Block::::new(header, vec![]); + let cb = ConsensusBlock::new(block); + + assert_eq!(cb.height(), Height::new(99)); + } + + #[test] + fn test_parent_returns_parent_hash() { + let parent = B256::repeat_byte(0xCC); + let header = BlockHeader::new(5, 0, parent); + let block = Block::::new(header, vec![]); + let cb = ConsensusBlock::new(block); + + let parent_digest = as commonware_consensus::Block>::parent(&cb); + assert_eq!(parent_digest.0, parent.0); + } + + #[test] + fn test_different_blocks_different_digests() { + let block1 = Block::::new(BlockHeader::new(1, 100, B256::ZERO), vec![]); + let block2 = Block::::new(BlockHeader::new(2, 100, B256::ZERO), vec![]); + + let cb1 = ConsensusBlock::new(block1); + let cb2 = ConsensusBlock::new(block2); + + assert_ne!(cb1.digest, cb2.digest); + } + + #[test] + fn test_different_transactions_change_digest() { + let header = BlockHeader::new(7, 1_000, B256::repeat_byte(0xAB)); + let block1 = Block::new( + header.clone(), + vec![TestTx { + data: vec![1, 2, 3], + }], + ); + let block2 = Block::new( + header, + vec![TestTx { + data: vec![9, 8, 7], + }], + ); + + let cb1 = ConsensusBlock::new(block1); + let cb2 = ConsensusBlock::new(block2); + + assert_ne!(cb1.digest, cb2.digest); + } + + #[test] + fn test_roundtrip_preserves_roots() { + let mut header = BlockHeader::new(12, 5_000, B256::repeat_byte(0xCD)); + header.transactions_root = B256::repeat_byte(0x11); + header.state_root = B256::repeat_byte(0x22); + + let block = Block::new( + header, + vec![TestTx { + data: vec![4, 5, 6], + }], + ); + let consensus_block = ConsensusBlock::new(block); + + let encoded = consensus_block.encode(); + let decoded = ConsensusBlock::::decode(encoded).unwrap(); + + assert_eq!( + decoded.inner.header.transactions_root, + consensus_block.inner.header.transactions_root + ); + assert_eq!( + decoded.inner.header.state_root, + consensus_block.inner.header.state_root + ); + assert_eq!(decoded.digest, consensus_block.digest); + } + + #[test] + fn test_decode_rejects_truncated_payload() { + let header = BlockHeader::new(9, 123, B256::repeat_byte(0xEF)); + let block = Block::new( + header, + vec![TestTx { + data: vec![1, 2, 3], + }], + ); + let consensus_block = ConsensusBlock::new(block); + let mut encoded = consensus_block.encode().to_vec(); + encoded.pop(); + + let result = ConsensusBlock::::decode(encoded.as_slice()); + assert!(matches!(result, Err(CodecError::EndOfBuffer))); + } + + #[test] + fn test_decode_rejects_invalid_wire_payload() { + let payload = vec![0x01, 0x02, 0x03, 0x04]; + let mut encoded = (payload.len() as u32).to_le_bytes().to_vec(); + encoded.extend_from_slice(&payload); + + let result = ConsensusBlock::::decode(encoded.as_slice()); + assert!(result.is_err(), "invalid wire payload must fail decode"); + } + + #[test] + fn test_decode_rejects_invalid_transaction_encoding() { + let wire = WireBlock { + number: 1, + timestamp: 2, + parent_hash: B256::ZERO.0, + gas_limit: 30_000_000, + gas_used: 21_000, + beneficiary: alloy_primitives::Address::ZERO.0 .0, + transactions_root: B256::ZERO.0, + state_root: B256::ZERO.0, + transactions_encoded: vec![vec![0xFF]], + }; + + let payload = borsh::to_vec(&wire).expect("wire block serialization should not fail"); + let mut encoded = (payload.len() as u32).to_le_bytes().to_vec(); + encoded.extend_from_slice(&payload); + + let result = ConsensusBlock::::decode(encoded.as_slice()); + assert!( + result.is_err(), + "invalid transaction encoding must fail decode" + ); + } +} diff --git a/crates/consensus/src/config.rs b/crates/consensus/src/config.rs new file mode 100644 index 0000000..dc99d44 --- /dev/null +++ b/crates/consensus/src/config.rs @@ -0,0 +1,34 @@ +use std::time::Duration; + +/// Configuration for the consensus application layer. +#[derive(Debug, Clone)] +pub struct ConsensusConfig { + /// Chain ID for the evolve network. + pub chain_id: u64, + /// Namespace bytes for signing domain separation (e.g., b"_EVOLVE"). + pub namespace: Vec, + /// Gas limit per block. + pub gas_limit: u64, + /// Timeout before leader rotation. + pub leader_timeout: Duration, + /// Timeout for notarization. + pub notarization_timeout: Duration, + /// Timeout for activity before penalization. + pub activity_timeout: Duration, + /// Number of blocks per epoch. + pub epoch_length: u64, +} + +impl Default for ConsensusConfig { + fn default() -> Self { + Self { + chain_id: 1, + namespace: b"_EVOLVE".to_vec(), + gas_limit: 30_000_000, + leader_timeout: Duration::from_secs(2), + notarization_timeout: Duration::from_secs(2), + activity_timeout: Duration::from_secs(10), + epoch_length: 100, + } + } +} diff --git a/crates/consensus/src/engine.rs b/crates/consensus/src/engine.rs new file mode 100644 index 0000000..92e3baa --- /dev/null +++ b/crates/consensus/src/engine.rs @@ -0,0 +1,155 @@ +//! Simplex consensus engine configuration and setup. +//! +//! Configures `commonware_consensus::simplex::Engine` to work with Evolve's +//! automaton, relay, and reporter types. This module provides [`SimplexSetup`] +//! which builds the engine from a signing scheme, leader elector, and Evolve +//! application components. + +use crate::config::ConsensusConfig; +use commonware_consensus::simplex; +use commonware_consensus::simplex::elector::{Config as ElectorConfig, RoundRobin}; +use commonware_consensus::simplex::types::{Activity, Context}; +use commonware_consensus::{CertifiableAutomaton, Relay, Reporter}; +use commonware_cryptography::certificate::Scheme; +use commonware_cryptography::Digest; +use commonware_p2p::{Blocker, Receiver, Sender}; +use commonware_parallel::Sequential; +use commonware_runtime::buffer::paged::CacheRef; +use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage}; +use rand_core::CryptoRngCore; +use std::num::{NonZeroU16, NonZeroUsize}; +use std::time::Duration; + +/// Convenience re-export: the simplex `scheme::Scheme` super-trait that binds +/// a `certificate::Scheme` to simplex's `Subject` type. +pub use commonware_consensus::simplex::scheme::Scheme as SimplexScheme; + +/// Configuration and factory for the simplex consensus engine. +/// +/// Captures the configuration needed to build a simplex engine instance. +/// Call [`build`](Self::build) to construct the engine, then +/// [`Engine::start`](simplex::Engine::start) to begin consensus. +/// +/// # Type Parameters +/// +/// * `S` - Signing scheme (e.g., `ed25519::Scheme`, `bls12381_threshold::standard::Scheme`) +/// * `L` - Leader elector config (e.g., `RoundRobin`) +pub struct SimplexSetup { + config: ConsensusConfig, + scheme: S, + elector: L, +} + +impl SimplexSetup { + /// Create a new simplex setup with round-robin leader election. + pub fn new(config: ConsensusConfig, scheme: S) -> Self { + Self { + config, + scheme, + elector: RoundRobin::default(), + } + } +} + +impl SimplexSetup { + /// Set a custom leader elector configuration. + pub fn with_elector(self, elector: L2) -> SimplexSetup { + SimplexSetup { + config: self.config, + scheme: self.scheme, + elector, + } + } + + /// Build the simplex engine with the given runtime context and consensus components. + /// + /// Returns the engine ready to be started with + /// [`Engine::start`](simplex::Engine::start). + #[allow(clippy::too_many_arguments)] + pub fn build( + self, + context: E, + automaton: A, + relay: R, + reporter: F, + blocker: B, + ) -> simplex::Engine + where + E: Clock + CryptoRngCore + Spawner + Storage + Metrics, + D: Digest, + S: SimplexScheme, + L: ElectorConfig, + B: Blocker, + A: CertifiableAutomaton, Digest = D>, + R: Relay, + F: Reporter>, + { + // Convert activity_timeout from Duration to ViewDelta (seconds). + let activity_timeout_secs = self.config.activity_timeout.as_secs(); + let activity_timeout = + commonware_consensus::types::ViewDelta::new(activity_timeout_secs.max(1)); + + let partition = String::from_utf8(self.config.namespace.clone()) + .unwrap_or_else(|_| "evolve".to_string()); + + let cfg = simplex::Config { + scheme: self.scheme, + elector: self.elector, + blocker, + automaton, + relay, + reporter, + strategy: Sequential, + partition, + mailbox_size: 1024, + epoch: commonware_consensus::types::Epoch::new(0), + replay_buffer: NonZeroUsize::new(65536).unwrap(), + write_buffer: NonZeroUsize::new(4096).unwrap(), + page_cache: CacheRef::new( + NonZeroU16::new(4096).unwrap(), // page_size + NonZeroUsize::new(1024).unwrap(), // cache_size + ), + leader_timeout: self.config.leader_timeout, + notarization_timeout: self.config.notarization_timeout, + nullify_retry: Duration::from_secs(1), + activity_timeout, + skip_timeout: commonware_consensus::types::ViewDelta::new(5), + fetch_timeout: Duration::from_secs(5), + fetch_concurrent: 3, + }; + + simplex::Engine::new(context, cfg) + } +} + +/// Start a simplex engine with the given P2P channel pairs. +/// +/// Convenience function that calls `engine.start()` with the three required +/// network channel pairs (votes, certificates, resolver). +pub fn start_engine( + engine: simplex::Engine, + vote_network: ( + impl Sender, + impl Receiver, + ), + certificate_network: ( + impl Sender, + impl Receiver, + ), + resolver_network: ( + impl Sender, + impl Receiver, + ), +) -> Handle<()> +where + E: Clock + CryptoRngCore + Spawner + Storage + Metrics, + S: SimplexScheme, + L: ElectorConfig, + B: Blocker, + D: Digest, + A: CertifiableAutomaton, Digest = D>, + R: Relay, + F: Reporter>, +{ + engine.start(vote_network, certificate_network, resolver_network) +} diff --git a/crates/consensus/src/lib.rs b/crates/consensus/src/lib.rs new file mode 100644 index 0000000..c8bb686 --- /dev/null +++ b/crates/consensus/src/lib.rs @@ -0,0 +1,17 @@ +pub mod automaton; +pub mod block; +pub mod config; +pub mod engine; +pub mod marshal; +pub mod relay; +pub mod reporter; +pub mod runner; + +pub use automaton::EvolveAutomaton; +pub use block::ConsensusBlock; +pub use config::ConsensusConfig; +pub use engine::SimplexSetup; +pub use marshal::{MarshalConfig, MarshalMailbox}; +pub use relay::EvolveRelay; +pub use reporter::{ChainState, EvolveReporter}; +pub use runner::ConsensusRunner; diff --git a/crates/consensus/src/marshal.rs b/crates/consensus/src/marshal.rs new file mode 100644 index 0000000..d03890b --- /dev/null +++ b/crates/consensus/src/marshal.rs @@ -0,0 +1,206 @@ +//! Marshal wiring for ordered finalized block delivery. +//! +//! The marshal sits between the simplex consensus engine and the application +//! reporter. It receives out-of-order finalization events from consensus, +//! reconstructs total order, and delivers blocks sequentially to the +//! application. +//! +//! # Architecture +//! +//! ```text +//! Simplex Engine +//! │ (Activity) +//! ▼ +//! Marshal Mailbox (implements Reporter) +//! │ +//! ▼ +//! Marshal Actor (archives + reordering) +//! │ (Update) +//! ▼ +//! Application Reporter +//! ``` +//! +//! # Usage +//! +//! 1. Create archive stores for certificates and blocks +//! 2. Call [`MarshalActor::init`] with the stores and a [`MarshalConfig`] +//! 3. Pass the returned [`MarshalMailbox`] as the `reporter` to +//! [`SimplexSetup::build`](crate::engine::SimplexSetup::build) +//! 4. Call [`MarshalActor::start`] with your application reporter, broadcast +//! buffer, and resolver +//! +//! See [`init_marshal_config`] for creating a config with sensible defaults. + +use commonware_consensus::types::{Epoch, ViewDelta}; +use commonware_cryptography::certificate::{ConstantProvider, Scheme}; +use commonware_parallel::Sequential; +use commonware_runtime::buffer::paged::CacheRef; +use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize}; +use std::time::Duration; + +const DEFAULT_ITEMS_PER_SECTION: u64 = 10; +const DEFAULT_PAGE_SIZE: u16 = 4096; +const DEFAULT_PAGE_COUNT: usize = 1024; +const DEFAULT_BUFFER_COUNT: usize = 1024; + +/// Re-export marshal types needed for wiring. +pub use commonware_consensus::marshal::{ + resolver, Actor as MarshalActor, Config as MarshalActorConfig, Mailbox as MarshalMailbox, + Update, +}; +pub use commonware_consensus::types::FixedEpocher; +pub use commonware_storage::archive::immutable; + +/// Configuration for initializing the marshal subsystem. +/// +/// Provides sensible defaults for storage partitioning, buffer sizes, +/// and epoch boundaries. Uses `NonZero` types to make invalid states +/// unrepresentable. +pub struct MarshalConfig { + /// Prefix for storage partition names (e.g., "evolve-validator-0"). + pub partition_prefix: String, + /// Number of blocks per epoch for the fixed epocher. Must be non-zero. + pub epoch_length: NonZeroU64, + /// Size of the marshal's internal mailbox. + pub mailbox_size: usize, + /// How many views to retain before pruning. + pub view_retention: u64, + /// Maximum concurrent repair requests. Must be non-zero. + pub max_repair: NonZeroUsize, + /// Resolver timing and priority settings. + pub resolver: ResolverConfig, +} + +/// Tuning knobs for the marshal's P2P resolver. +pub struct ResolverConfig { + pub initial: Duration, + pub timeout: Duration, + pub fetch_retry_timeout: Duration, + pub priority_requests: bool, + pub priority_responses: bool, +} + +impl Default for ResolverConfig { + fn default() -> Self { + Self { + initial: Duration::from_secs(1), + timeout: Duration::from_secs(2), + fetch_retry_timeout: Duration::from_millis(100), + priority_requests: false, + priority_responses: false, + } + } +} + +impl Default for MarshalConfig { + fn default() -> Self { + Self { + partition_prefix: "evolve".to_string(), + epoch_length: NonZeroU64::new(100).unwrap(), + mailbox_size: 1024, + view_retention: 10, + max_repair: NonZeroUsize::new(10).unwrap(), + resolver: ResolverConfig::default(), + } + } +} + +fn make_default_cache_ref() -> CacheRef { + CacheRef::new( + NonZeroU16::new(DEFAULT_PAGE_SIZE).unwrap(), + NonZeroUsize::new(DEFAULT_PAGE_COUNT).unwrap(), + ) +} + +/// Create a marshal [`MarshalActorConfig`] from a [`MarshalConfig`] and signing scheme. +/// +/// Uses [`ConstantProvider`] (same scheme for all epochs) and [`FixedEpocher`] +/// (fixed-length epochs). The block codec config defaults to `()`. +/// +/// # Type Parameters +/// +/// * `S` - Certificate scheme (e.g., `ed25519::Scheme`, `bls12381::Scheme`) +/// * `B` - Block type (e.g., [`ConsensusBlock`](crate::ConsensusBlock)) +pub fn init_marshal_config( + config: &MarshalConfig, + scheme: S, + block_codec_config: ::Cfg, +) -> MarshalActorConfig, FixedEpocher, Sequential> +where + S: Scheme, + B: commonware_consensus::Block, +{ + MarshalActorConfig { + provider: ConstantProvider::::new(scheme), + epocher: FixedEpocher::new(config.epoch_length), + partition_prefix: config.partition_prefix.clone(), + mailbox_size: config.mailbox_size, + view_retention_timeout: ViewDelta::new(config.view_retention), + prunable_items_per_section: NonZeroU64::new(DEFAULT_ITEMS_PER_SECTION).unwrap(), + page_cache: make_default_cache_ref(), + replay_buffer: NonZeroUsize::new(DEFAULT_BUFFER_COUNT).unwrap(), + key_write_buffer: NonZeroUsize::new(DEFAULT_BUFFER_COUNT).unwrap(), + value_write_buffer: NonZeroUsize::new(DEFAULT_BUFFER_COUNT).unwrap(), + block_codec_config, + max_repair: config.max_repair, + strategy: Sequential, + } +} + +/// Create an [`immutable::Config`] for a marshal archive store. +/// +/// Builds the 17-field config required by [`immutable::Archive::init`] +/// with standard settings for the given partition prefix and codec config. +pub fn archive_config( + partition_prefix: &str, + store_name: &str, + codec_config: C, +) -> immutable::Config { + let prefix = format!("{}-{}", partition_prefix, store_name); + let page_cache = make_default_cache_ref(); + + immutable::Config { + metadata_partition: format!("{prefix}-metadata"), + freezer_table_partition: format!("{prefix}-freezer-table"), + freezer_table_initial_size: 64, + freezer_table_resize_frequency: 10, + freezer_table_resize_chunk_size: 10, + freezer_key_partition: format!("{prefix}-freezer-key"), + freezer_key_page_cache: page_cache, + freezer_value_partition: format!("{prefix}-freezer-value"), + freezer_value_target_size: u64::from(DEFAULT_PAGE_SIZE), + freezer_value_compression: None, + ordinal_partition: format!("{prefix}-ordinal"), + items_per_section: NonZeroU64::new(DEFAULT_ITEMS_PER_SECTION).unwrap(), + codec_config, + replay_buffer: NonZeroUsize::new(DEFAULT_BUFFER_COUNT).unwrap(), + freezer_key_write_buffer: NonZeroUsize::new(DEFAULT_BUFFER_COUNT).unwrap(), + freezer_value_write_buffer: NonZeroUsize::new(DEFAULT_BUFFER_COUNT).unwrap(), + ordinal_write_buffer: NonZeroUsize::new(DEFAULT_BUFFER_COUNT).unwrap(), + } +} + +/// Create a resolver [`Config`](resolver::p2p::Config) for the marshal's P2P block fetcher. +pub fn resolver_config( + public_key: P, + provider: C, + blocker: B, + config: &MarshalConfig, +) -> resolver::p2p::Config +where + P: commonware_cryptography::PublicKey, + C: commonware_p2p::Provider, + B: commonware_p2p::Blocker, +{ + resolver::p2p::Config { + public_key, + provider, + blocker, + mailbox_size: config.mailbox_size, + initial: config.resolver.initial, + timeout: config.resolver.timeout, + fetch_retry_timeout: config.resolver.fetch_retry_timeout, + priority_requests: config.resolver.priority_requests, + priority_responses: config.resolver.priority_responses, + } +} diff --git a/crates/consensus/src/relay.rs b/crates/consensus/src/relay.rs new file mode 100644 index 0000000..814c2b3 --- /dev/null +++ b/crates/consensus/src/relay.rs @@ -0,0 +1,140 @@ +use crate::block::ConsensusBlock; +use commonware_consensus::Relay; +use std::collections::BTreeMap; +use std::sync::{Arc, RwLock}; + +/// A local in-memory relay for block broadcast. +/// +/// Stores proposed blocks so that other participants (or the local verifier) +/// can look them up by digest. In production, this would broadcast full blocks +/// over the P2P network. For now, it serves as a shared cache between the +/// automaton (proposer) and verifier. +#[derive(Clone)] +pub struct EvolveRelay { + /// Shared storage of blocks indexed by their digest. + blocks: Arc>>>, +} + +impl EvolveRelay { + pub fn new(blocks: Arc>>>) -> Self { + Self { blocks } + } + + /// Retrieve a block by its digest. + pub fn get_block(&self, digest: &[u8; 32]) -> Option> + where + Tx: Clone, + { + self.blocks + .read() + .unwrap_or_else(|poison| { + tracing::warn!("relay: recovered from poisoned read lock"); + poison.into_inner() + }) + .get(digest) + .cloned() + } + + /// Insert a block into the relay's store. + pub fn insert_block(&self, block: ConsensusBlock) + where + Tx: Clone, + { + let digest = block.digest_value().0; + self.blocks + .write() + .unwrap_or_else(|poison| { + tracing::warn!("relay: recovered from poisoned write lock"); + poison.into_inner() + }) + .insert(digest, block); + } +} + +impl Relay for EvolveRelay +where + Tx: Clone + Send + Sync + 'static, +{ + type Digest = commonware_cryptography::sha256::Digest; + + async fn broadcast(&mut self, payload: Self::Digest) { + // In production, this would broadcast the full block to all peers. + // For now, the block is already stored in the shared pending_blocks map + // by the automaton's propose() method. We just log the broadcast. + tracing::debug!( + digest = ?payload, + "relay: broadcasting block digest (local relay, no-op)" + ); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy_primitives::B256; + use evolve_core::{AccountId, FungibleAsset, InvokeRequest, Message}; + use evolve_stf_traits::Transaction; + use std::collections::BTreeMap; + use std::sync::{Arc, RwLock}; + + #[derive(Debug, Clone)] + struct TestTx { + id: [u8; 32], + request: InvokeRequest, + funds: Vec, + } + + impl TestTx { + fn new(id: [u8; 32]) -> Self { + Self { + id, + request: InvokeRequest::new_from_message("test", 1, Message::from_bytes(vec![])), + funds: Vec::new(), + } + } + } + + impl Transaction for TestTx { + fn sender(&self) -> AccountId { + AccountId::new(1) + } + + fn recipient(&self) -> AccountId { + AccountId::new(2) + } + + fn request(&self) -> &InvokeRequest { + &self.request + } + + fn gas_limit(&self) -> u64 { + 21_000 + } + + fn funds(&self) -> &[FungibleAsset] { + &self.funds + } + + fn compute_identifier(&self) -> [u8; 32] { + self.id + } + } + + #[test] + fn insert_and_get_block_by_digest() { + let store = Arc::new(RwLock::new(BTreeMap::new())); + let relay = EvolveRelay::new(store); + let block = evolve_server::Block::new( + evolve_server::BlockHeader::new(1, 100, B256::ZERO), + vec![TestTx::new([7u8; 32])], + ); + let consensus_block = ConsensusBlock::new(block); + let digest = consensus_block.digest_value().0; + + relay.insert_block(consensus_block.clone()); + + let fetched = relay.get_block(&digest).expect("block should exist"); + assert_eq!(fetched.digest_value(), consensus_block.digest_value()); + assert_eq!(fetched.inner.header.number, 1); + } +} diff --git a/crates/consensus/src/reporter.rs b/crates/consensus/src/reporter.rs new file mode 100644 index 0000000..48c6e63 --- /dev/null +++ b/crates/consensus/src/reporter.rs @@ -0,0 +1,326 @@ +use alloy_primitives::B256; +use commonware_consensus::simplex::types::Activity; +use commonware_consensus::Reporter; +use commonware_cryptography::certificate::Scheme; +use commonware_cryptography::sha256::Digest as Sha256Digest; +use evolve_stf_traits::Transaction; +use std::collections::BTreeMap; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, RwLock}; +use tokio::sync::RwLock as TokioRwLock; + +use crate::block::ConsensusBlock; + +/// Shared chain state that the reporter updates on finalization. +/// +/// The automaton exposes these via `last_hash()` and `height_atomic()`. +/// Pass them to the reporter so finalization events update chain linkage. +pub struct ChainState { + /// The hash of the most recently finalized block. + pub last_hash: Arc>, + /// The current chain height. + pub height: Arc, + /// Pending blocks cache (shared with automaton). + pub pending_blocks: Arc>>>, +} + +impl Clone for ChainState { + fn clone(&self) -> Self { + Self { + last_hash: self.last_hash.clone(), + height: self.height.clone(), + pending_blocks: self.pending_blocks.clone(), + } + } +} + +/// A reporter that logs consensus activity and updates chain state on finalization. +/// +/// Generic over the activity type so it works with any consensus scheme. +/// Holds an optional [`ChainState`] reference — when present, finalization +/// events update `last_hash` so subsequent proposals use the correct parent. +/// +/// In production, the marshal sits between simplex and this reporter, +/// delivering ordered `Update` events. For direct (non-marshal) usage, +/// this reporter receives raw `Activity` events. +pub struct EvolveReporter> { + chain_state: Option>, + _phantom: std::marker::PhantomData A>, +} + +impl Clone for EvolveReporter { + fn clone(&self) -> Self { + Self { + chain_state: self.chain_state.clone(), + _phantom: std::marker::PhantomData, + } + } +} + +impl EvolveReporter { + /// Create a reporter without chain state (logging only). + pub fn new() -> Self { + Self { + chain_state: None, + _phantom: std::marker::PhantomData, + } + } + + /// Create a reporter with chain state for finalization updates. + pub fn with_chain_state(chain_state: ChainState) -> Self { + Self { + chain_state: Some(chain_state), + _phantom: std::marker::PhantomData, + } + } +} + +impl Default for EvolveReporter { + fn default() -> Self { + Self::new() + } +} + +impl Reporter for EvolveReporter, Tx> +where + S: Scheme + Clone + Send + 'static, + Tx: Clone + Transaction + Send + Sync + 'static, +{ + type Activity = Activity; + + async fn report(&mut self, activity: Self::Activity) { + let Some(state) = self.chain_state.as_ref() else { + tracing::debug!("reporter: received consensus activity"); + return; + }; + + let finalized_digest = match activity { + Activity::Finalization(finalization) => finalization.proposal.payload.0, + _ => { + tracing::debug!( + height = state.height.load(Ordering::SeqCst), + "reporter: received non-finalization activity" + ); + return; + } + }; + + let finalized_block = { + let mut pending = state.pending_blocks.write().unwrap_or_else(|poison| { + tracing::warn!("reporter: recovered from poisoned pending_blocks lock"); + poison.into_inner() + }); + pending.remove(&finalized_digest) + }; + + let Some(block) = finalized_block else { + tracing::warn!( + digest = ?finalized_digest, + "reporter: finalization digest not found in pending blocks" + ); + return; + }; + + let finalized_hash = block.block_hash(); + *state.last_hash.write().await = finalized_hash; + state.height.fetch_max( + block.inner.header.number.saturating_add(1), + Ordering::SeqCst, + ); + + tracing::debug!( + digest = ?finalized_digest, + block_number = block.inner.header.number, + block_hash = ?finalized_hash, + "reporter: advanced chain state from finalized activity" + ); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use commonware_consensus::simplex::scheme::ed25519; + use commonware_consensus::simplex::types::{Finalization, Finalize, Proposal}; + use commonware_consensus::types::{Epoch, Round, View}; + use commonware_cryptography::{ed25519::PrivateKey, Signer as _}; + use commonware_parallel::Sequential; + use commonware_utils::ordered::Set; + use evolve_core::{AccountId, FungibleAsset, InvokeRequest, Message}; + use evolve_server::{Block, BlockHeader}; + use std::sync::Arc; + + #[derive(Clone)] + struct TestTx { + id: [u8; 32], + request: InvokeRequest, + funds: Vec, + } + + impl TestTx { + fn new(id: [u8; 32]) -> Self { + Self { + id, + request: InvokeRequest::new_from_message("test", 1, Message::from_bytes(vec![])), + funds: Vec::new(), + } + } + } + + impl Transaction for TestTx { + fn sender(&self) -> AccountId { + AccountId::new(1) + } + + fn recipient(&self) -> AccountId { + AccountId::new(2) + } + + fn request(&self) -> &InvokeRequest { + &self.request + } + + fn gas_limit(&self) -> u64 { + 21_000 + } + + fn funds(&self) -> &[FungibleAsset] { + &self.funds + } + + fn compute_identifier(&self) -> [u8; 32] { + self.id + } + } + + fn test_scheme() -> ed25519::Scheme { + let private_key = PrivateKey::from_seed(7); + let public_key = private_key.public_key(); + let participants = Set::from_iter_dedup([public_key]); + ed25519::Scheme::signer(b"reporter-test", participants, private_key) + .expect("signer must exist in participants") + } + + fn finalization_activity_for_digest( + scheme: &ed25519::Scheme, + digest: Sha256Digest, + ) -> Activity { + let proposal = Proposal::new( + Round::new(Epoch::zero(), View::new(1)), + View::zero(), + digest, + ); + let finalize_vote = Finalize::sign(scheme, proposal).expect("finalize vote must sign"); + let finalization = Finalization::from_finalizes(scheme, [&finalize_vote], &Sequential) + .expect("single-validator finalization must assemble"); + Activity::Finalization(finalization) + } + + #[tokio::test] + async fn finalization_updates_chain_state_and_evicts_pending_block() { + let last_hash = Arc::new(TokioRwLock::new(B256::ZERO)); + let height = Arc::new(AtomicU64::new(1)); + let pending_blocks = Arc::new(RwLock::new( + BTreeMap::<[u8; 32], ConsensusBlock>::new(), + )); + + let block = Block::new( + BlockHeader::new(1, 1_000, B256::ZERO), + vec![TestTx::new([1u8; 32])], + ); + let consensus_block = ConsensusBlock::new(block); + let digest = consensus_block.digest_value(); + let block_hash = consensus_block.block_hash(); + pending_blocks + .write() + .unwrap() + .insert(consensus_block.digest_value().0, consensus_block); + + let chain_state = ChainState { + last_hash: last_hash.clone(), + height: height.clone(), + pending_blocks: pending_blocks.clone(), + }; + let mut reporter = + EvolveReporter::, TestTx>::with_chain_state( + chain_state, + ); + + let scheme = test_scheme(); + reporter + .report(finalization_activity_for_digest(&scheme, digest)) + .await; + + assert_eq!(*last_hash.read().await, block_hash); + assert_eq!(height.load(Ordering::SeqCst), 2); + assert!( + pending_blocks.read().unwrap().is_empty(), + "finalized block should be evicted from pending cache" + ); + } + + #[tokio::test] + async fn non_finalization_activity_does_not_mutate_chain_state() { + let last_hash = Arc::new(TokioRwLock::new(B256::repeat_byte(0xAA))); + let height = Arc::new(AtomicU64::new(42)); + let pending_blocks = Arc::new(RwLock::new( + BTreeMap::<[u8; 32], ConsensusBlock>::new(), + )); + + let chain_state = ChainState { + last_hash: last_hash.clone(), + height: height.clone(), + pending_blocks: pending_blocks.clone(), + }; + let mut reporter = + EvolveReporter::, TestTx>::with_chain_state( + chain_state, + ); + + let digest = Sha256Digest([9u8; 32]); + let proposal = Proposal::new( + Round::new(Epoch::zero(), View::new(2)), + View::new(1), + digest, + ); + let scheme = test_scheme(); + let finalize_vote = Finalize::sign(&scheme, proposal).expect("finalize vote must sign"); + + reporter.report(Activity::Finalize(finalize_vote)).await; + + assert_eq!(*last_hash.read().await, B256::repeat_byte(0xAA)); + assert_eq!(height.load(Ordering::SeqCst), 42); + assert!(pending_blocks.read().unwrap().is_empty()); + } + + #[tokio::test] + async fn finalization_with_unknown_digest_does_not_mutate_chain_state() { + let last_hash = Arc::new(TokioRwLock::new(B256::repeat_byte(0xBB))); + let height = Arc::new(AtomicU64::new(7)); + let pending_blocks = Arc::new(RwLock::new( + BTreeMap::<[u8; 32], ConsensusBlock>::new(), + )); + + let chain_state = ChainState { + last_hash: last_hash.clone(), + height: height.clone(), + pending_blocks: pending_blocks.clone(), + }; + let mut reporter = + EvolveReporter::, TestTx>::with_chain_state( + chain_state, + ); + + let scheme = test_scheme(); + reporter + .report(finalization_activity_for_digest( + &scheme, + Sha256Digest([0xCC; 32]), + )) + .await; + + assert_eq!(*last_hash.read().await, B256::repeat_byte(0xBB)); + assert_eq!(height.load(Ordering::SeqCst), 7); + assert!(pending_blocks.read().unwrap().is_empty()); + } +} diff --git a/crates/consensus/src/runner.rs b/crates/consensus/src/runner.rs new file mode 100644 index 0000000..6c5446b --- /dev/null +++ b/crates/consensus/src/runner.rs @@ -0,0 +1,120 @@ +//! Consensus lifecycle orchestrator. +//! +//! [`ConsensusRunner`] manages the full lifecycle of consensus + P2P subsystems: +//! network initialization, engine startup, and graceful shutdown. +//! +//! # With Marshal (Production) +//! +//! For ordered finalized block delivery, initialize the marshal via +//! [`crate::marshal`] and pass the [`MarshalMailbox`](crate::MarshalMailbox) +//! as the `reporter` parameter to [`start`](ConsensusRunner::start). The +//! marshal actor then delivers blocks in sequential height order to the +//! application reporter. See [`crate::marshal`] for the full wiring pattern. +//! +//! # Without Marshal (Testing) +//! +//! For testing, pass any `Reporter` implementation directly — the simplex +//! engine will report raw activity events without ordering guarantees. + +use crate::config::ConsensusConfig; +use crate::engine::{SimplexScheme, SimplexSetup}; +use commonware_consensus::simplex::elector::{Config as ElectorConfig, RoundRobin}; +use commonware_consensus::simplex::types::{Activity, Context}; +use commonware_consensus::{CertifiableAutomaton, Relay, Reporter}; +use commonware_cryptography::Digest; +use commonware_p2p::{Blocker, Receiver, Sender}; +use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage}; +use evolve_p2p::NetworkConfig; +use rand_core::CryptoRngCore; + +/// Orchestrates the full consensus lifecycle. +/// +/// Wires together P2P networking and the simplex consensus engine, +/// managing startup and graceful shutdown of all subsystems. +/// +/// # Lifecycle +/// +/// 1. Create the runner with consensus and network configuration. +/// 2. Call [`start`](Self::start) with the runtime context, automaton, scheme, +/// blocker, and network channels. +/// 3. The returned [`Handle`] completes when the runtime signals shutdown. +/// +/// # Marshal Integration +/// +/// For production use with ordered block delivery: +/// +/// ```rust,ignore +/// use evolve_consensus::marshal::*; +/// +/// // 1. Create archive stores +/// let certs = immutable::Archive::init(ctx, archive_config(&prefix, "certs", codec)).await?; +/// let blocks = immutable::Archive::init(ctx, archive_config(&prefix, "blocks", ())).await?; +/// +/// // 2. Initialize marshal +/// let marshal_cfg = init_marshal_config::(&MarshalConfig::default(), scheme, ()); +/// let (actor, mailbox, height) = MarshalActor::init(ctx, certs, blocks, marshal_cfg).await; +/// +/// // 3. Pass marshal mailbox as reporter to ConsensusRunner::start() +/// let engine_handle = runner.start(ctx, automaton, relay, mailbox, scheme, blocker, ...); +/// +/// // 4. Start marshal actor (delivers ordered blocks to app reporter) +/// let marshal_handle = actor.start(app_reporter, broadcast_buffer, resolver); +/// ``` +pub struct ConsensusRunner { + consensus_config: ConsensusConfig, + _network_config: NetworkConfig, +} + +impl ConsensusRunner { + /// Create a new consensus runner. + pub fn new(consensus_config: ConsensusConfig, network_config: NetworkConfig) -> Self { + Self { + consensus_config, + _network_config: network_config, + } + } + + /// Start the consensus engine with the given runtime context and components. + /// + /// The `reporter` parameter accepts any [`Reporter`] implementation. + /// For production, pass a [`MarshalMailbox`](crate::MarshalMailbox) to + /// enable ordered block delivery through the marshal actor. + /// For testing, pass a mock reporter directly. + /// + /// # Returns + /// + /// A [`Handle`] that completes when the engine shuts down. + #[allow(clippy::too_many_arguments)] + pub fn start( + &self, + context: E, + automaton: A, + relay: R, + reporter: F, + scheme: S, + blocker: B, + vote_network: (VS, VR), + certificate_network: (CS, CR), + resolver_network: (RS, RR), + ) -> Handle<()> + where + E: Clock + CryptoRngCore + Spawner + Storage + Metrics, + S: SimplexScheme + Clone, + D: Digest, + B: Blocker, + A: CertifiableAutomaton, Digest = D>, + R: Relay, + F: Reporter>, + RoundRobin: ElectorConfig, + VS: Sender, + VR: Receiver, + CS: Sender, + CR: Receiver, + RS: Sender, + RR: Receiver, + { + let setup = SimplexSetup::new(self.consensus_config.clone(), scheme); + let engine = setup.build(context, automaton, relay, reporter, blocker); + engine.start(vote_network, certificate_network, resolver_network) + } +} diff --git a/crates/consensus/tests/simplex_integration.rs b/crates/consensus/tests/simplex_integration.rs new file mode 100644 index 0000000..90b0057 --- /dev/null +++ b/crates/consensus/tests/simplex_integration.rs @@ -0,0 +1,216 @@ +//! Integration tests for the simplex consensus engine wiring. +//! +//! Validates that [`SimplexSetup`] correctly configures and starts a simplex +//! engine using ed25519 signatures, RoundRobin leader election, and the +//! commonware simulated P2P network. + +use commonware_consensus::simplex::elector::RoundRobin; +use commonware_consensus::simplex::mocks; +use commonware_consensus::simplex::scheme::ed25519; +use commonware_consensus::types::View; +use commonware_consensus::Monitor; +use commonware_cryptography::Sha256; +use commonware_p2p::simulated::{Config as NetConfig, Network}; +use commonware_runtime::{deterministic, Metrics, Quota, Runner}; +use evolve_consensus::engine::SimplexSetup; +use evolve_consensus::ConsensusConfig; +use std::num::NonZeroU32; +use std::sync::Arc; +use std::time::Duration; + +/// Single-validator consensus: one validator starts consensus, proposes +/// blocks, and finalizes them via [`SimplexSetup`]. +/// +/// Verifies: +/// - The engine starts successfully via SimplexSetup::build +/// - The automaton's propose() is called (blocks are produced) +/// - Blocks are finalized (reporter receives finalization events) +/// - No faults are detected +#[test] +fn single_validator_consensus_produces_and_finalizes_blocks() { + let executor = deterministic::Runner::timed(Duration::from_secs(120)); + executor.start(|mut context| async move { + let namespace = b"evolve_test".to_vec(); + let n = 1u32; + + // Create simulated P2P network. + let (network, oracle) = Network::new( + context.with_label("network"), + NetConfig { + max_size: 1024 * 1024, + disconnect_on_block: true, + tracked_peer_sets: None, + }, + ); + network.start(); + + // Generate ed25519 fixture for 1 validator. + let fixture = ed25519::fixture(&mut context, &namespace, n); + let validator = fixture.participants[0].clone(); + + // Register validator channels on simulated network. + let control = oracle.control(validator.clone()); + let quota = Quota::per_second(NonZeroU32::MAX); + let vote_network = control.register(0, quota).await.unwrap(); + let certificate_network = control.register(1, quota).await.unwrap(); + let resolver_network = control.register(2, quota).await.unwrap(); + + // Create mock relay, application, and reporter. + let relay = Arc::new(mocks::relay::Relay::new()); + let app_cfg = mocks::application::Config { + hasher: Sha256::default(), + relay: relay.clone(), + me: validator.clone(), + propose_latency: (5.0, 1.0), + verify_latency: (5.0, 1.0), + certify_latency: (5.0, 1.0), + should_certify: mocks::application::Certifier::Always, + }; + let (application, mailbox) = + mocks::application::Application::new(context.with_label("application"), app_cfg); + application.start(); + + let elector = RoundRobin::::default(); + let reporter_cfg = mocks::reporter::Config { + participants: fixture.participants.clone().try_into().expect("non-empty"), + scheme: fixture.schemes[0].clone(), + elector: elector.clone(), + }; + let mut reporter = + mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg); + + // Subscribe to finalization events BEFORE starting the engine. + let (mut latest, mut monitor): (View, _) = reporter.subscribe().await; + + // Use SimplexSetup to build the engine -- this is the code under test. + let config = ConsensusConfig { + chain_id: 1, + namespace: namespace.clone(), + gas_limit: 30_000_000, + leader_timeout: Duration::from_secs(1), + notarization_timeout: Duration::from_secs(2), + activity_timeout: Duration::from_secs(10), + epoch_length: 100, + }; + let setup = SimplexSetup::new(config, fixture.schemes[0].clone()); + let engine = setup.build( + context.with_label("engine"), + mailbox.clone(), // automaton + mailbox, // relay (Mailbox implements both) + reporter.clone(), + oracle.control(validator.clone()), // blocker + ); + + // Start the engine with P2P channels. + let _handle = engine.start(vote_network, certificate_network, resolver_network); + + // Wait for at least 3 finalized views. + let target = View::new(3); + while latest < target { + latest = monitor.recv().await.expect("monitor channel closed"); + } + + // Verify: we reached the target view through the monitor. + assert!( + latest >= target, + "expected finalization to reach view {target:?}, got {latest:?}" + ); + + // Verify: no faults detected. + let faults = reporter.faults.lock().unwrap(); + assert!(faults.is_empty(), "unexpected faults detected"); + + // Verify: no invalid signatures. + let invalid = *reporter.invalid.lock().unwrap(); + assert_eq!(invalid, 0, "unexpected invalid signatures: {invalid}"); + }); +} + +/// Verify that ConsensusRunner wires the engine correctly. +/// +/// This test validates the higher-level ConsensusRunner API produces +/// a running consensus engine. +#[test] +fn consensus_runner_starts_engine() { + let executor = deterministic::Runner::timed(Duration::from_secs(120)); + executor.start(|mut context| async move { + let namespace = b"runner_test".to_vec(); + let n = 1u32; + + // Create simulated P2P network. + let (network, oracle) = Network::new( + context.with_label("network"), + NetConfig { + max_size: 1024 * 1024, + disconnect_on_block: true, + tracked_peer_sets: None, + }, + ); + network.start(); + + // Generate fixture. + let fixture = ed25519::fixture(&mut context, &namespace, n); + let validator = fixture.participants[0].clone(); + + // Register channels. + let control = oracle.control(validator.clone()); + let quota = Quota::per_second(NonZeroU32::MAX); + let vote_network = control.register(0, quota).await.unwrap(); + let certificate_network = control.register(1, quota).await.unwrap(); + let resolver_network = control.register(2, quota).await.unwrap(); + + // Create mock components. + let relay = Arc::new(mocks::relay::Relay::new()); + let app_cfg = mocks::application::Config { + hasher: Sha256::default(), + relay: relay.clone(), + me: validator.clone(), + propose_latency: (5.0, 1.0), + verify_latency: (5.0, 1.0), + certify_latency: (5.0, 1.0), + should_certify: mocks::application::Certifier::Always, + }; + let (application, mailbox) = + mocks::application::Application::new(context.with_label("application"), app_cfg); + application.start(); + + let elector = RoundRobin::::default(); + let reporter_cfg = mocks::reporter::Config { + participants: fixture.participants.clone().try_into().expect("non-empty"), + scheme: fixture.schemes[0].clone(), + elector: elector.clone(), + }; + let mut reporter = + mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg); + + let (mut latest, mut monitor): (View, _) = reporter.subscribe().await; + + // Use ConsensusRunner -- the higher-level API under test. + let consensus_config = ConsensusConfig::default(); + let network_config = evolve_p2p::NetworkConfig::default(); + let runner = evolve_consensus::ConsensusRunner::new(consensus_config, network_config); + let _handle = runner.start( + context.with_label("runner_engine"), + mailbox.clone(), + mailbox, + reporter.clone(), + fixture.schemes[0].clone(), + oracle.control(validator.clone()), + vote_network, + certificate_network, + resolver_network, + ); + + // Wait for finalization progress. + let target = View::new(2); + while latest < target { + latest = monitor.recv().await.expect("monitor channel closed"); + } + + // Verify: we reached the target view through the monitor. + assert!( + latest >= target, + "expected finalization to reach view {target:?}, got {latest:?}" + ); + }); +} diff --git a/crates/p2p/Cargo.toml b/crates/p2p/Cargo.toml new file mode 100644 index 0000000..b655f73 --- /dev/null +++ b/crates/p2p/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "evolve-p2p" +version = "0.1.0" +edition = "2021" +license.workspace = true +repository.workspace = true +rust-version.workspace = true + +[dependencies] +commonware-cryptography = { workspace = true } +commonware-p2p = { workspace = true } +commonware-utils = { workspace = true } + +tokio = { workspace = true } +tracing = { workspace = true } + +[lints] +workspace = true diff --git a/crates/p2p/src/channels.rs b/crates/p2p/src/channels.rs new file mode 100644 index 0000000..897896d --- /dev/null +++ b/crates/p2p/src/channels.rs @@ -0,0 +1,155 @@ +//! P2P channel definitions for the Evolve protocol. +//! +//! Commonware P2P uses numbered channels with per-channel rate limits. +//! Each channel is independent — rate limiting and message sizing are +//! configured per channel so that e.g. large block transfers don't starve +//! consensus vote messages. + +/// Channel ID constants for the Evolve P2P protocol. +/// +/// Each channel has its own rate limit quota and message capacity. +/// Consensus channels (0–2) are used by simplex BFT. +/// Application channels (3–4) are Evolve-specific. +pub mod channel { + /// Consensus vote messages (notarize, finalize, nullify). + /// High rate, small messages (~200 bytes). + pub const VOTES: u32 = 0; + + /// Consensus certificates (notarization, finalization, nullification). + /// High rate, medium messages (~2 KB with aggregated sigs). + pub const CERTIFICATES: u32 = 1; + + /// Block resolution requests/responses. + /// Medium rate, large messages (full block data). + pub const RESOLVER: u32 = 2; + + /// Block broadcast from proposer to validators. + /// Low rate (1 per view), large messages. + pub const BLOCK_BROADCAST: u32 = 3; + + /// Transaction gossip between validators. + /// Medium rate, medium messages (encoded transactions). + /// + /// NOTE: This channel is Evolve-specific — Alto doesn't have it because + /// Alto has no transaction concept. + pub const TX_GOSSIP: u32 = 4; + + /// Total number of channels. + pub const COUNT: u32 = 5; +} + +/// Rate limiting configuration for a single P2P channel. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct RateLimit { + /// Sustained throughput: messages allowed per second. + pub messages_per_second: u32, + /// Burst capacity: maximum message spike above the sustained rate. + pub burst: u32, +} + +/// Configuration for a single P2P channel. +#[derive(Clone, Debug)] +pub struct ChannelConfig { + /// Channel identifier. Must match one of the constants in [`channel`]. + pub id: u32, + /// Maximum encoded size of a single message on this channel (bytes). + pub max_message_size: usize, + /// Rate limiting parameters for this channel. + pub rate_limit: RateLimit, +} + +/// Default channel configurations for all Evolve P2P channels. +/// +/// Values are conservative starting points. Production deployments should +/// tune `max_message_size` and `rate_limit` based on observed traffic. +pub fn default_channel_configs() -> Vec { + vec![ + ChannelConfig { + id: channel::VOTES, + max_message_size: 512, + rate_limit: RateLimit { + messages_per_second: 100, + burst: 200, + }, + }, + ChannelConfig { + id: channel::CERTIFICATES, + max_message_size: 4_096, + rate_limit: RateLimit { + messages_per_second: 100, + burst: 200, + }, + }, + ChannelConfig { + id: channel::RESOLVER, + max_message_size: 4_194_304, // 4 MiB + rate_limit: RateLimit { + messages_per_second: 10, + burst: 20, + }, + }, + ChannelConfig { + id: channel::BLOCK_BROADCAST, + max_message_size: 4_194_304, // 4 MiB + rate_limit: RateLimit { + messages_per_second: 5, + burst: 10, + }, + }, + ChannelConfig { + id: channel::TX_GOSSIP, + max_message_size: 65_536, // 64 KiB + rate_limit: RateLimit { + messages_per_second: 50, + burst: 100, + }, + }, + ] +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn channel_ids_are_unique_and_sequential() { + assert_eq!(channel::VOTES, 0); + assert_eq!(channel::CERTIFICATES, 1); + assert_eq!(channel::RESOLVER, 2); + assert_eq!(channel::BLOCK_BROADCAST, 3); + assert_eq!(channel::TX_GOSSIP, 4); + assert_eq!(channel::COUNT, 5); + } + + #[test] + fn default_configs_cover_all_channels() { + let configs = default_channel_configs(); + assert_eq!(configs.len(), channel::COUNT as usize); + + let mut ids: Vec = configs.iter().map(|c| c.id).collect(); + ids.sort_unstable(); + let expected: Vec = (0..channel::COUNT).collect(); + assert_eq!(ids, expected); + } + + #[test] + fn default_configs_have_positive_limits() { + for cfg in default_channel_configs() { + assert!( + cfg.max_message_size > 0, + "channel {} has zero max_message_size", + cfg.id + ); + assert!( + cfg.rate_limit.messages_per_second > 0, + "channel {} has zero messages_per_second", + cfg.id + ); + assert!( + cfg.rate_limit.burst >= cfg.rate_limit.messages_per_second, + "channel {} burst is less than messages_per_second", + cfg.id + ); + } + } +} diff --git a/crates/p2p/src/config.rs b/crates/p2p/src/config.rs new file mode 100644 index 0000000..272c4ec --- /dev/null +++ b/crates/p2p/src/config.rs @@ -0,0 +1,87 @@ +//! Network configuration for Evolve's P2P layer. + +use std::{net::SocketAddr, time::Duration}; + +use crate::channels::{default_channel_configs, ChannelConfig}; + +/// Network configuration for Evolve's authenticated P2P layer. +/// +/// Passed to the Commonware P2P bootstrapper at startup. All fields have +/// sensible defaults via [`Default`]; override only what your deployment +/// requires. +#[derive(Clone, Debug)] +pub struct NetworkConfig { + /// Address to listen on for incoming P2P connections. + pub listen_addr: SocketAddr, + + /// Bootstrapper/seed node addresses for initial peer discovery. + /// + /// Leave empty for single-node dev setups. Production nodes should list + /// at least one stable seed address. + pub bootstrappers: Vec, + + /// Per-channel rate limiting and message size configuration. + /// + /// Defaults to [`default_channel_configs`] covering all five Evolve channels. + pub channel_configs: Vec, + + /// Maximum number of concurrent peer connections to maintain. + pub max_peers: usize, + + /// Timeout for establishing a new peer connection. + pub connection_timeout: Duration, + + /// Signing namespace for domain separation. + /// + /// Prevents cross-chain message replay. Must be unique per network. + pub namespace: Vec, +} + +impl Default for NetworkConfig { + fn default() -> Self { + Self { + listen_addr: "0.0.0.0:9000".parse().expect("static addr is valid"), + bootstrappers: vec![], + channel_configs: default_channel_configs(), + max_peers: 50, + connection_timeout: Duration::from_secs(10), + namespace: b"_EVOLVE".to_vec(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::channels::channel; + use std::time::Duration; + + #[test] + fn default_config_invariants() { + let cfg = NetworkConfig::default(); + + assert_eq!(cfg.listen_addr.port(), 9000); + assert!(cfg.bootstrappers.is_empty()); + assert_eq!(cfg.channel_configs.len(), channel::COUNT as usize); + assert_eq!(cfg.namespace, b"_EVOLVE"); + assert!(cfg.max_peers > 0); + assert!(cfg.connection_timeout > Duration::ZERO); + } + + #[test] + fn custom_config_roundtrip() { + let addr: SocketAddr = "127.0.0.1:19000".parse().unwrap(); + let seed: SocketAddr = "10.0.0.1:9000".parse().unwrap(); + let cfg = NetworkConfig { + listen_addr: addr, + bootstrappers: vec![seed], + max_peers: 10, + namespace: b"_TEST".to_vec(), + ..NetworkConfig::default() + }; + assert_eq!(cfg.listen_addr, addr); + assert_eq!(cfg.bootstrappers.len(), 1); + assert_eq!(cfg.max_peers, 10); + assert_eq!(cfg.namespace, b"_TEST"); + } +} diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs new file mode 100644 index 0000000..9105a59 --- /dev/null +++ b/crates/p2p/src/lib.rs @@ -0,0 +1,19 @@ +//! P2P networking configuration for Evolve's consensus layer. +//! +//! This crate configures Commonware's authenticated P2P networking +//! for use in the Evolve blockchain. It provides: +//! +//! - Channel constants and rate limit configuration +//! - `NetworkConfig` for bootstrapping the P2P layer +//! - `ValidatorSet` for managing epoch-based validator identities +//! - `EpochPeerProvider` implementing the `Provider` trait for peer management + +pub mod channels; +pub mod config; +pub mod provider; +pub mod validator; + +pub use channels::{default_channel_configs, ChannelConfig, RateLimit}; +pub use config::NetworkConfig; +pub use provider::EpochPeerProvider; +pub use validator::{ValidatorIdentity, ValidatorSet}; diff --git a/crates/p2p/src/provider.rs b/crates/p2p/src/provider.rs new file mode 100644 index 0000000..abb073c --- /dev/null +++ b/crates/p2p/src/provider.rs @@ -0,0 +1,350 @@ +//! Epoch-based peer set provider for the Evolve P2P layer. +//! +//! [`EpochPeerProvider`] implements [`commonware_p2p::Provider`] so that the +//! Commonware authenticated P2P stack can query the current set of validator +//! peers and receive change notifications on epoch transitions. + +use std::{collections::BTreeMap, fmt, sync::Arc}; + +use commonware_cryptography::ed25519; +use commonware_p2p::Provider; +use commonware_utils::ordered::Set; +use tokio::sync::{mpsc, RwLock}; + +/// Convenience alias for the subscriber notification payload. +/// +/// `(epoch_id, new_peer_set, all_tracked_peers)` +type Notification = (u64, Set, Set); +const SUBSCRIBER_CHANNEL_CAPACITY: usize = 64; + +#[derive(Debug)] +struct ProviderState { + /// Peer sets indexed by epoch. BTreeMap for deterministic iteration. + peer_sets: BTreeMap>, + /// Active subscriber channels. Dead senders are pruned on each notify. + subscribers: Vec>, + /// Union of all currently tracked peer sets. + all_peers: Set, +} + +impl ProviderState { + fn new() -> Self { + Self { + peer_sets: BTreeMap::new(), + subscribers: Vec::new(), + all_peers: Set::default(), + } + } + + fn recompute_all_peers(&mut self) { + let all_peers: Vec = self + .peer_sets + .values() + .flat_map(|s| s.iter().cloned()) + .collect(); + self.all_peers = Set::from_iter_dedup(all_peers); + } + + /// Notify live subscribers and prune dead channels. + fn notify(&mut self, epoch: u64, peers: Set) { + self.subscribers.retain(|tx| { + match tx.try_send((epoch, peers.clone(), self.all_peers.clone())) { + Ok(()) => true, + Err(mpsc::error::TrySendError::Full(_)) => { + tracing::warn!( + epoch, + "provider: dropping peer-set notification for slow subscriber" + ); + true + } + Err(mpsc::error::TrySendError::Closed(_)) => false, + } + }); + } +} + +/// Provides peer sets to the P2P layer based on epochs. +/// +/// When the validator set changes (new epoch), call [`update_epoch`] to +/// register the new peer set and notify all current subscribers. +/// +/// [`EpochPeerProvider`] is cheaply cloneable; all clones share the same +/// underlying state via an [`Arc`]. +/// +/// [`update_epoch`]: EpochPeerProvider::update_epoch +#[derive(Clone)] +pub struct EpochPeerProvider { + inner: Arc>, +} + +impl fmt::Debug for EpochPeerProvider { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("EpochPeerProvider").finish_non_exhaustive() + } +} + +impl EpochPeerProvider { + /// Create a new provider with no registered peer sets. + pub fn new() -> Self { + Self { + inner: Arc::new(RwLock::new(ProviderState::new())), + } + } + + /// Register a peer set for the given epoch and notify all subscribers. + /// + /// If a peer set for `epoch` already exists it is replaced. Subscribers + /// receive the new set and the updated union of all tracked peers. Dead + /// subscriber channels are pruned automatically. + /// + /// # Note + /// + /// Old epochs are not pruned — `peer_sets` grows with each call. For + /// production use, call [`retain_epochs`](Self::retain_epochs) periodically + /// to bound memory usage. + pub async fn update_epoch(&self, epoch: u64, peers: Set) { + let mut state = self.inner.write().await; + state.peer_sets.insert(epoch, peers.clone()); + state.recompute_all_peers(); + + state.notify(epoch, peers); + } + + /// Remove all epoch entries older than `min_epoch`. + /// + /// Recomputes `all_peers` from the remaining sets and notifies subscribers + /// with the highest retained epoch's peer set (if one exists). + pub async fn retain_epochs(&self, min_epoch: u64) { + let mut state = self.inner.write().await; + state.peer_sets.retain(|&e, _| e >= min_epoch); + state.recompute_all_peers(); + + if let Some((epoch, peers)) = state + .peer_sets + .iter() + .next_back() + .map(|(e, p)| (*e, p.clone())) + { + state.notify(epoch, peers); + } + } +} + +impl Default for EpochPeerProvider { + fn default() -> Self { + Self::new() + } +} + +impl Provider for EpochPeerProvider { + type PublicKey = ed25519::PublicKey; + + fn peer_set( + &mut self, + id: u64, + ) -> impl std::future::Future>> + Send { + let inner = self.inner.clone(); + async move { inner.read().await.peer_sets.get(&id).cloned() } + } + + fn subscribe( + &mut self, + ) -> impl std::future::Future< + Output = mpsc::UnboundedReceiver<(u64, Set, Set)>, + > + Send { + let inner = self.inner.clone(); + async move { + let (bounded_tx, mut bounded_rx) = mpsc::channel(SUBSCRIBER_CHANNEL_CAPACITY); + let (unbounded_tx, unbounded_rx) = mpsc::unbounded_channel(); + + inner.write().await.subscribers.push(bounded_tx); + + tokio::spawn(async move { + while let Some(notification) = bounded_rx.recv().await { + if unbounded_tx.send(notification).is_err() { + break; + } + } + }); + + unbounded_rx + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use commonware_cryptography::{ed25519, Signer as _}; + + fn make_key(seed: u64) -> ed25519::PublicKey { + ed25519::PrivateKey::from_seed(seed).public_key() + } + + fn peer_set(seeds: &[u64]) -> Set { + Set::from_iter_dedup(seeds.iter().map(|&s| make_key(s))) + } + + fn has_key(set: &Set, key: &ed25519::PublicKey) -> bool { + set.iter().any(|k| k == key) + } + + #[tokio::test] + async fn peer_set_returns_none_before_registration() { + let mut provider = EpochPeerProvider::new(); + assert!(provider.peer_set(0).await.is_none()); + } + + #[tokio::test] + async fn peer_set_returns_registered_set() { + let mut provider = EpochPeerProvider::new(); + let peers = peer_set(&[1, 2, 3]); + provider.update_epoch(0, peers.clone()).await; + + let result = provider.peer_set(0).await.expect("epoch 0 must exist"); + assert_eq!(result, peers); + } + + #[tokio::test] + async fn peer_set_not_found_for_unknown_epoch() { + let mut provider = EpochPeerProvider::new(); + provider.update_epoch(1, peer_set(&[1])).await; + + assert!(provider.peer_set(0).await.is_none()); + assert!(provider.peer_set(2).await.is_none()); + } + + #[tokio::test] + async fn subscriber_notified_on_epoch_update() { + let provider = EpochPeerProvider::new(); + let mut subscriber = provider.clone(); + let mut rx = subscriber.subscribe().await; + + let peers = peer_set(&[10, 20]); + provider.update_epoch(5, peers.clone()).await; + + let (epoch, new_set, _all) = rx.recv().await.expect("notification expected"); + assert_eq!(epoch, 5); + assert_eq!(new_set, peers); + } + + #[tokio::test] + async fn all_peers_is_union_of_tracked_sets() { + let provider = EpochPeerProvider::new(); + let mut subscriber = provider.clone(); + let mut rx = subscriber.subscribe().await; + + provider.update_epoch(0, peer_set(&[1, 2])).await; + let _ = rx.recv().await; + + provider.update_epoch(1, peer_set(&[3, 4])).await; + let (_, _, all) = rx.recv().await.expect("second notification expected"); + + assert_eq!(all.len(), 4, "all_peers must be union of epochs 0 and 1"); + } + + #[tokio::test] + async fn clones_share_state() { + let provider = EpochPeerProvider::new(); + let mut clone = provider.clone(); + + provider.update_epoch(0, peer_set(&[7, 8])).await; + let result = clone.peer_set(0).await; + assert!(result.is_some(), "clone must see updates from original"); + } + + #[tokio::test] + async fn update_epoch_replaces_existing_epoch_set() { + let mut provider = EpochPeerProvider::new(); + provider.update_epoch(3, peer_set(&[1, 2])).await; + provider.update_epoch(3, peer_set(&[2, 9])).await; + + let epoch_set = provider.peer_set(3).await.expect("epoch must exist"); + assert_eq!(epoch_set.len(), 2); + assert!(has_key(&epoch_set, &make_key(2))); + assert!(has_key(&epoch_set, &make_key(9))); + assert!(!has_key(&epoch_set, &make_key(1))); + } + + #[tokio::test] + async fn retain_epochs_prunes_old_sets_and_updates_union() { + let mut provider = EpochPeerProvider::new(); + let mut subscriber = provider.clone(); + let mut rx = subscriber.subscribe().await; + + provider.update_epoch(0, peer_set(&[1, 2])).await; + let _ = rx.recv().await; + provider.update_epoch(1, peer_set(&[3])).await; + let _ = rx.recv().await; + provider.update_epoch(2, peer_set(&[4])).await; + let _ = rx.recv().await; + + provider.retain_epochs(1).await; + let (epoch, retained_set, retained_all) = + rx.recv().await.expect("retain notification expected"); + assert_eq!(epoch, 2); + assert_eq!(retained_set, peer_set(&[4])); + assert_eq!(retained_all.len(), 2); + assert!(has_key(&retained_all, &make_key(3))); + assert!(has_key(&retained_all, &make_key(4))); + assert!(provider.peer_set(0).await.is_none()); + assert!(provider.peer_set(1).await.is_some()); + assert!(provider.peer_set(2).await.is_some()); + + provider.update_epoch(3, peer_set(&[5])).await; + let (_, _, all) = rx.recv().await.expect("notification expected"); + assert_eq!(all.len(), 3); + assert!(has_key(&all, &make_key(3))); + assert!(has_key(&all, &make_key(4))); + assert!(has_key(&all, &make_key(5))); + assert!(!has_key(&all, &make_key(1))); + assert!(!has_key(&all, &make_key(2))); + } + + #[tokio::test] + async fn retain_epochs_pruning_highest_epoch_notifies_new_highest() { + let mut provider = EpochPeerProvider::new(); + let mut subscriber = provider.clone(); + let mut rx = subscriber.subscribe().await; + + provider.update_epoch(1, peer_set(&[1])).await; + let _ = rx.recv().await; + provider.update_epoch(5, peer_set(&[5])).await; + let _ = rx.recv().await; + + provider.retain_epochs(0).await; + let _ = rx.recv().await; + + provider.retain_epochs(6).await; + assert!(provider.peer_set(1).await.is_none()); + assert!(provider.peer_set(5).await.is_none()); + + provider.update_epoch(6, peer_set(&[6])).await; + let (epoch, new_set, all) = rx.recv().await.expect("notification expected"); + assert_eq!(epoch, 6); + assert_eq!(new_set, peer_set(&[6])); + assert_eq!(all.len(), 1); + assert!(has_key(&all, &make_key(6))); + } + + #[tokio::test] + async fn dropped_subscriber_is_pruned_on_notify() { + let provider = EpochPeerProvider::new(); + let mut sub_a = provider.clone(); + let mut sub_b = provider.clone(); + let _rx_a = sub_a.subscribe().await; + let rx_b = sub_b.subscribe().await; + + assert_eq!(provider.inner.read().await.subscribers.len(), 2); + drop(rx_b); + + provider.update_epoch(9, peer_set(&[42])).await; + tokio::task::yield_now().await; + provider.update_epoch(10, peer_set(&[43])).await; + assert_eq!( + provider.inner.read().await.subscribers.len(), + 1, + "dead subscriber should be removed after notify" + ); + } +} diff --git a/crates/p2p/src/validator.rs b/crates/p2p/src/validator.rs new file mode 100644 index 0000000..74d9310 --- /dev/null +++ b/crates/p2p/src/validator.rs @@ -0,0 +1,191 @@ +//! Validator identity and epoch-based validator set management. +//! +//! Each Evolve validator carries two public keys: +//! - An Ed25519 key for P2P identity and network authentication. +//! - A BLS12-381 key for consensus threshold signatures. +//! +//! [`ValidatorSet`] stores validators in a [`BTreeMap`] keyed by their Ed25519 +//! P2P key, guaranteeing deterministic iteration order for consensus. + +use std::collections::BTreeMap; + +use commonware_cryptography::{bls12381, ed25519}; + +/// A validator's dual identity in the Evolve network. +#[derive(Clone, Debug)] +pub struct ValidatorIdentity { + /// Ed25519 public key for P2P authentication and network-layer identity. + pub p2p_key: ed25519::PublicKey, + + /// BLS12-381 public key for consensus threshold signatures. + pub consensus_key: bls12381::PublicKey, +} + +/// The set of validators active during a specific epoch. +/// +/// Stored in a [`BTreeMap`] keyed by the Ed25519 P2P key so that iteration +/// order is deterministic — a requirement for consensus correctness. +#[derive(Clone, Debug)] +pub struct ValidatorSet { + validators: BTreeMap, + epoch: u64, +} + +impl ValidatorSet { + /// Create an empty validator set for the given epoch. + pub fn new(epoch: u64) -> Self { + Self { + validators: BTreeMap::new(), + epoch, + } + } + + /// Add a validator to the set. + /// + /// If a validator with the same P2P key already exists it is replaced. + pub fn add_validator(&mut self, identity: ValidatorIdentity) { + self.validators.insert(identity.p2p_key.clone(), identity); + } + + /// Return a reference to the underlying validator map. + pub fn validators(&self) -> &BTreeMap { + &self.validators + } + + /// Number of validators in the set. + pub fn len(&self) -> usize { + self.validators.len() + } + + /// Returns `true` when the set contains no validators. + pub fn is_empty(&self) -> bool { + self.validators.is_empty() + } + + /// The epoch this set corresponds to. + pub fn epoch(&self) -> u64 { + self.epoch + } + + /// Ed25519 P2P public keys for all validators, in deterministic order. + pub fn p2p_keys(&self) -> Vec { + self.validators.keys().cloned().collect() + } + + /// BLS12-381 consensus public keys for all validators, in the same + /// deterministic order as [`p2p_keys`](Self::p2p_keys). + pub fn consensus_keys(&self) -> Vec { + self.validators + .values() + .map(|v| v.consensus_key.clone()) + .collect() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use commonware_cryptography::{bls12381, ed25519, Signer as _}; + + fn make_identity(seed: u64) -> ValidatorIdentity { + let p2p_key = ed25519::PrivateKey::from_seed(seed).public_key(); + let consensus_key = bls12381::PrivateKey::from_seed(seed).public_key(); + ValidatorIdentity { + p2p_key, + consensus_key, + } + } + + #[test] + fn new_set_is_empty() { + let vs = ValidatorSet::new(0); + assert!(vs.is_empty()); + assert_eq!(vs.len(), 0); + assert_eq!(vs.epoch(), 0); + } + + #[test] + fn add_and_retrieve_validators() { + let mut vs = ValidatorSet::new(1); + let id = make_identity(1); + let p2p_key = id.p2p_key.clone(); + vs.add_validator(id); + + assert_eq!(vs.len(), 1); + assert!(!vs.is_empty()); + assert!(vs.validators().contains_key(&p2p_key)); + } + + #[test] + fn duplicate_p2p_key_replaces_entry() { + let mut vs = ValidatorSet::new(2); + let id1 = make_identity(10); + let p2p_key = id1.p2p_key.clone(); + let old_consensus = id1.consensus_key.clone(); + + let id2 = ValidatorIdentity { + p2p_key: p2p_key.clone(), + consensus_key: bls12381::PrivateKey::from_seed(99).public_key(), + }; + let new_consensus = id2.consensus_key.clone(); + + vs.add_validator(id1); + vs.add_validator(id2); + assert_eq!(vs.len(), 1); + assert_eq!( + vs.validators() + .get(&p2p_key) + .expect("validator should exist") + .consensus_key, + new_consensus + ); + assert_ne!(old_consensus, new_consensus); + } + + #[test] + fn p2p_keys_deterministic_order() { + let mut vs = ValidatorSet::new(3); + for seed in 0..5u64 { + vs.add_validator(make_identity(seed)); + } + + let keys = vs.p2p_keys(); + let mut sorted = keys.clone(); + sorted.sort(); + assert_eq!( + keys, sorted, + "p2p_keys must be in sorted (deterministic) order" + ); + } + + #[test] + fn p2p_and_consensus_keys_same_length() { + let mut vs = ValidatorSet::new(4); + for seed in 0..3u64 { + vs.add_validator(make_identity(seed)); + } + assert_eq!(vs.p2p_keys().len(), vs.consensus_keys().len()); + } + + #[test] + fn p2p_and_consensus_keys_preserve_mapping_order() { + let mut vs = ValidatorSet::new(5); + for seed in 0..6u64 { + vs.add_validator(make_identity(seed)); + } + + let p2p_keys = vs.p2p_keys(); + let consensus_keys = vs.consensus_keys(); + for idx in 0..p2p_keys.len() { + let expected = &vs + .validators() + .get(&p2p_keys[idx]) + .expect("p2p key should exist") + .consensus_key; + assert_eq!( + &consensus_keys[idx], expected, + "consensus key order must match p2p key order" + ); + } + } +} diff --git a/crates/storage/src/block_store.rs b/crates/storage/src/block_store.rs index caa5bfa..113605e 100644 --- a/crates/storage/src/block_store.rs +++ b/crates/storage/src/block_store.rs @@ -32,7 +32,7 @@ use crate::types::{BlockHash, BlockStorageConfig}; use commonware_codec::RangeCfg; -use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage}; +use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage as RStorage}; use commonware_storage::{ archive::{ prunable::{Archive, Config as ArchiveConfig}, @@ -108,12 +108,12 @@ where // Buffer pool for the key journal. let page_size = std::num::NonZeroU16::new(KEY_JOURNAL_PAGE_SIZE).unwrap(); let cache_pages = std::num::NonZeroUsize::new(KEY_JOURNAL_CACHE_PAGES).unwrap(); - let key_buffer_pool = PoolRef::new(page_size, cache_pages); + let key_page_cache = CacheRef::new(page_size, cache_pages); let cfg = ArchiveConfig { translator: EightCap, key_partition: format!("{}-block-index", config.partition_prefix), - key_buffer_pool, + key_page_cache, value_partition: format!("{}-block-data", config.partition_prefix), // No compression by default. Blocks are often already compressed (gzip/zstd // at the application layer), so double-compression wastes CPU. diff --git a/crates/storage/src/qmdb_impl.rs b/crates/storage/src/qmdb_impl.rs index dbad532..3b6a159 100644 --- a/crates/storage/src/qmdb_impl.rs +++ b/crates/storage/src/qmdb_impl.rs @@ -27,12 +27,15 @@ use crate::types::{ StorageValueChunk, MAX_VALUE_DATA_SIZE, }; use async_trait::async_trait; -use commonware_cryptography::sha256::Sha256; -use commonware_runtime::{utils::buffer::pool::PoolRef, Clock, Metrics, Storage as RStorage}; +use commonware_cryptography::{sha256::Sha256, DigestOf}; +use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage as RStorage}; use commonware_storage::qmdb::{ - current::{unordered::fixed::Db, FixedConfig}, + current::{ + db::{Merkleized, Unmerkleized}, + unordered::fixed::Db, + FixedConfig, + }, store::{Durable, NonDurable}, - Merkleized, Unmerkleized, }; use commonware_storage::translator::EightCap; use evolve_core::{ErrorCode, ReadonlyKV}; @@ -45,8 +48,16 @@ use tokio::sync::RwLock; /// Type alias for QMDB in Clean state (Merkleized, Durable) /// N = 64 because SHA256 digest is 32 bytes, and N must be 2 * digest_size -type QmdbClean = - Db, Durable>; +type QmdbClean = Db< + C, + StorageKey, + StorageValueChunk, + Sha256, + EightCap, + 64, + Merkleized>, + Durable, +>; /// Type alias for QMDB in Mutable state (Unmerkleized, NonDurable) type QmdbMutable = @@ -56,12 +67,16 @@ type QmdbMutable = type QmdbDurable = Db; -/// Internal state enum to track QMDB state machine transitions +/// Internal state enum to track QMDB state machine transitions. +/// +/// Both large variants are boxed to keep `sizeof(QmdbState)` small. +/// This prevents large stack allocations at every construction site and +/// keeps async future state machines that hold this enum small. enum QmdbState { - /// Clean state - ready for proofs, durable - Clean(QmdbClean), - /// Mutable state - can perform updates/deletes - Mutable(QmdbMutable), + /// Clean state - ready for proofs, durable (boxed to reduce stack pressure) + Clean(Box>), + /// Mutable state - can perform updates/deletes (boxed to reduce stack pressure) + Mutable(Box>), /// Transitional state for ownership management Transitioning, } @@ -196,17 +211,20 @@ where bitmap_metadata_partition: format!("{}_bitmap-metadata", config.partition_prefix), translator: EightCap, thread_pool: None, - buffer_pool: PoolRef::new(page_size, capacity), + page_cache: CacheRef::new(page_size, capacity), }; - // Initialize QMDB - starts in Clean state (Merkleized, Durable) - let db: QmdbClean = Db::init(context.clone(), qmdb_config) + // Initialize QMDB - starts in Clean state (Merkleized, Durable). + // Box::pin prevents the Db::init future's state machine from being + // stored inline in with_metrics' state machine, keeping the combined + // state machine (which lives on the test/caller thread's stack) small. + let db: QmdbClean = Box::pin(Db::init(context.clone(), qmdb_config)) .await .map_err(|e| StorageError::Qmdb(e.to_string()))?; Ok(Self { context: Arc::new(context), - state: Arc::new(RwLock::new(QmdbState::Clean(db))), + state: Arc::new(RwLock::new(QmdbState::Clean(Box::new(db)))), cache: Arc::new(ShardedDbCache::with_defaults()), metrics, }) @@ -244,9 +262,10 @@ where // Take ownership to perform state transitions let current_state = std::mem::replace(&mut *state_guard, QmdbState::Transitioning); - let clean_db = match current_state { + // Keep clean_db boxed to avoid putting QmdbClean on the stack. + let clean_db: Box> = match current_state { QmdbState::Clean(db) => { - // Already clean, just return current root + // Already clean, return the existing Box directly. db } QmdbState::Mutable(db) => { @@ -262,7 +281,7 @@ where .await .map_err(|e| StorageError::Qmdb(e.to_string()))?; - clean + Box::new(clean) } QmdbState::Transitioning => { return Err(StorageError::InvalidState( @@ -271,7 +290,7 @@ where } }; - // Get the root hash + // Get the root hash (Deref coerces Box> → &QmdbClean). let root = clean_db.root(); let hash = match root.as_ref().try_into() { Ok(bytes) => crate::types::CommitHash::new(bytes), @@ -281,7 +300,7 @@ where } }; - // Store clean state back + // Store clean state back (already boxed, no extra allocation). *state_guard = QmdbState::Clean(clean_db); // Record commit latency @@ -347,10 +366,11 @@ where // Take ownership to perform state transition if needed let current_state = std::mem::replace(&mut *state_guard, QmdbState::Transitioning); - let mut mutable_db: QmdbMutable = match current_state { + let mut mutable_db: Box> = match current_state { QmdbState::Clean(db) => { - // Clean → into_mutable() → Mutable (sync method) - db.into_mutable() + // Clean → into_mutable() → Mutable (sync method). + // Rust auto-moves out of Box for consuming methods. + Box::new(db.into_mutable()) } QmdbState::Mutable(db) => db, QmdbState::Transitioning => {