From 87767166c3140f0ae3edb50788326226e76cfc00 Mon Sep 17 00:00:00 2001 From: Borja Castellano Date: Tue, 30 Dec 2025 18:41:22 +0000 Subject: [PATCH 1/8] removed Arc> from PeerReputationManager --- dash-spv/src/network/manager.rs | 57 ++++++++++++---- dash-spv/src/network/reputation.rs | 84 ++++++++++++------------ dash-spv/src/network/reputation_tests.rs | 12 ++-- 3 files changed, 91 insertions(+), 62 deletions(-) diff --git a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index a268f8b7..6e9dfae8 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -41,7 +41,7 @@ pub struct PeerNetworkManager { /// Peer persistence peer_store: Arc, /// Peer reputation manager - reputation_manager: Arc, + reputation_manager: Arc>, /// Network type network: Network, /// Shutdown token @@ -83,7 +83,7 @@ impl PeerNetworkManager { let peer_store = PersistentPeerStorage::open(data_dir.clone()).await?; - let reputation_manager = Arc::new(PeerReputationManager::new()); + let mut reputation_manager = PeerReputationManager::new(); if let Err(e) = reputation_manager.load_from_storage(&peer_store).await { log::warn!("Failed to load peer reputation data: {}", e); @@ -97,7 +97,7 @@ impl PeerNetworkManager { discovery: Arc::new(discovery), addrv2_handler: Arc::new(AddrV2Handler::new()), peer_store: Arc::new(peer_store), - reputation_manager, + reputation_manager: Arc::new(Mutex::new(reputation_manager)), network: config.network, shutdown_token: CancellationToken::new(), message_tx, @@ -172,7 +172,7 @@ impl PeerNetworkManager { /// Connect to a specific peer async fn connect_to_peer(&self, addr: SocketAddr) { // Check reputation first - if !self.reputation_manager.should_connect_to_peer(&addr).await { + if !self.reputation_manager.lock().await.should_connect_to_peer(&addr).await { log::warn!("Not connecting to {} due to bad reputation", addr); return; } @@ -188,7 +188,7 @@ impl PeerNetworkManager { } // Record connection attempt - self.reputation_manager.record_connection_attempt(addr).await; + self.reputation_manager.lock().await.record_connection_attempt(addr).await; let pool = self.pool.clone(); let network = self.network; @@ -215,7 +215,11 @@ impl PeerNetworkManager { log::info!("Successfully connected to {}", addr); // Record successful connection - reputation_manager.record_successful_connection(addr).await; + reputation_manager + .lock() + .await + .record_successful_connection(addr) + .await; // Add to pool if let Err(e) = pool.add_peer(addr, peer).await { @@ -245,6 +249,8 @@ impl PeerNetworkManager { log::warn!("Handshake failed with {}: {}", addr, e); // Update reputation for handshake failure reputation_manager + .lock() + .await .update_reputation( addr, misbehavior_scores::INVALID_MESSAGE, @@ -260,6 +266,8 @@ impl PeerNetworkManager { log::debug!("Failed to connect to {}: {}", addr, e); // Minor reputation penalty for connection failure reputation_manager + .lock() + .await .update_reputation( addr, misbehavior_scores::TIMEOUT / 2, @@ -278,7 +286,7 @@ impl PeerNetworkManager { message_tx: mpsc::Sender<(SocketAddr, NetworkMessage)>, addrv2_handler: Arc, shutdown_token: CancellationToken, - reputation_manager: Arc, + reputation_manager: Arc>, connected_peer_count: Arc, ) { tokio::spawn(async move { @@ -485,6 +493,8 @@ impl PeerNetworkManager { log::debug!("Timeout reading from {}, continuing...", addr); // Minor reputation penalty for timeout reputation_manager + .lock() + .await .update_reputation( addr, misbehavior_scores::TIMEOUT, @@ -507,6 +517,8 @@ impl PeerNetworkManager { ); // Reputation penalty for invalid data reputation_manager + .lock() + .await .update_reputation( addr, misbehavior_scores::INVALID_TRANSACTION, @@ -567,6 +579,8 @@ impl PeerNetworkManager { if conn_duration > Duration::from_secs(3600) { // 1 hour reputation_manager + .lock() + .await .update_reputation(addr, positive_scores::LONG_UPTIME, "Long connection uptime") .await; } @@ -644,7 +658,7 @@ impl PeerNetworkManager { let known = addrv2_handler.get_known_addresses().await; let needed = TARGET_PEERS.saturating_sub(count); // Select best peers based on reputation - let best_peers = reputation_manager.select_best_peers(known, needed * 2).await; + let best_peers = reputation_manager.lock().await.select_best_peers(known, needed * 2).await; let mut attempted = 0; for addr in best_peers { @@ -718,7 +732,7 @@ impl PeerNetworkManager { if let Err(e) = peer_guard.send_ping().await { log::error!("Failed to ping {}: {}", addr, e); // Update reputation for ping failure - reputation_manager.update_reputation( + reputation_manager.lock().await.update_reputation( addr, misbehavior_scores::TIMEOUT, "Ping failed", @@ -738,7 +752,7 @@ impl PeerNetworkManager { } // Save reputation data periodically - if let Err(e) = reputation_manager.save_to_storage(&peer_store).await { + if let Err(e) = reputation_manager.lock().await.save_to_storage(&peer_store).await { log::warn!("Failed to save reputation data: {}", e); } } @@ -944,7 +958,7 @@ impl PeerNetworkManager { /// Get reputation information for all peers pub async fn get_peer_reputations(&self) -> HashMap { - let reputations = self.reputation_manager.get_all_reputations().await; + let reputations = self.reputation_manager.lock().await.get_all_reputations().await; reputations.into_iter().map(|(addr, rep)| (addr, (rep.score, rep.is_banned()))).collect() } @@ -983,6 +997,8 @@ impl PeerNetworkManager { // Update reputation to trigger ban self.reputation_manager + .lock() + .await .update_reputation( *addr, misbehavior_scores::INVALID_HEADER * 2, // Severe penalty @@ -995,7 +1011,7 @@ impl PeerNetworkManager { /// Unban a specific peer pub async fn unban_peer(&self, addr: &SocketAddr) { - self.reputation_manager.unban_peer(addr).await; + self.reputation_manager.lock().await.unban_peer(addr).await; } /// Shutdown the network manager @@ -1012,7 +1028,8 @@ impl PeerNetworkManager { } // Save reputation data before shutdown - if let Err(e) = self.reputation_manager.save_to_storage(&self.peer_store).await { + if let Err(e) = self.reputation_manager.lock().await.save_to_storage(&self.peer_store).await + { log::warn!("Failed to save reputation data on shutdown: {}", e); } @@ -1112,7 +1129,11 @@ impl NetworkManager for PeerNetworkManager { ) -> NetworkResult<()> { // Get the last peer that sent us a message if let Some(addr) = self.get_last_message_peer().await { - self.reputation_manager.update_reputation(addr, score_change, reason).await; + self.reputation_manager + .lock() + .await + .update_reputation(addr, score_change, reason) + .await; } Ok(()) } @@ -1142,11 +1163,15 @@ impl NetworkManager for PeerNetworkManager { // Apply misbehavior score and a short temporary ban self.reputation_manager + .lock() + .await .update_reputation(addr, misbehavior_scores::INVALID_CHAINLOCK, reason) .await; // Short ban: 10 minutes for relaying invalid ChainLock self.reputation_manager + .lock() + .await .temporary_ban_peer(addr, Duration::from_secs(10 * 60), reason) .await; } @@ -1160,11 +1185,15 @@ impl NetworkManager for PeerNetworkManager { if let Some(addr) = self.get_last_message_peer().await { // Apply misbehavior score and a short temporary ban self.reputation_manager + .lock() + .await .update_reputation(addr, misbehavior_scores::INVALID_INSTANTLOCK, reason) .await; // Short ban: 10 minutes for relaying invalid InstantLock self.reputation_manager + .lock() + .await .temporary_ban_peer(addr, Duration::from_secs(10 * 60), reason) .await; diff --git a/dash-spv/src/network/reputation.rs b/dash-spv/src/network/reputation.rs index 2c01e45e..dab32e38 100644 --- a/dash-spv/src/network/reputation.rs +++ b/dash-spv/src/network/reputation.rs @@ -8,9 +8,7 @@ use serde::{Deserialize, Deserializer, Serialize}; use std::collections::HashMap; use std::net::SocketAddr; -use std::sync::Arc; use std::time::{Duration, Instant}; -use tokio::sync::RwLock; use crate::storage::{PeerStorage, PersistentPeerStorage}; @@ -239,10 +237,10 @@ pub struct ReputationEvent { /// Peer reputation manager pub struct PeerReputationManager { /// Reputation data for each peer - reputations: Arc>>, + reputations: HashMap, /// Recent reputation events for monitoring - recent_events: Arc>>, + recent_events: Vec, /// Maximum number of events to keep max_events: usize, @@ -258,21 +256,20 @@ impl PeerReputationManager { /// Create a new reputation manager pub fn new() -> Self { Self { - reputations: Arc::new(RwLock::new(HashMap::new())), - recent_events: Arc::new(RwLock::new(Vec::new())), + reputations: HashMap::new(), + recent_events: Vec::new(), max_events: 1000, } } /// Update peer reputation pub async fn update_reputation( - &self, + &mut self, peer: SocketAddr, score_change: i32, reason: &str, ) -> bool { - let mut reputations = self.reputations.write().await; - let reputation = reputations.entry(peer).or_default(); + let reputation = self.reputations.entry(peer).or_default(); // Apply decay first reputation.apply_decay(); @@ -323,15 +320,14 @@ impl PeerReputationManager { timestamp: Instant::now(), }; - drop(reputations); // Release lock before recording event self.record_event(event).await; should_ban } /// Record a reputation event - async fn record_event(&self, event: ReputationEvent) { - let mut events = self.recent_events.write().await; + async fn record_event(&mut self, event: ReputationEvent) { + let events = &mut self.recent_events; events.push(event); // Keep only recent events @@ -342,8 +338,8 @@ impl PeerReputationManager { } /// Check if a peer is banned - pub async fn is_banned(&self, peer: &SocketAddr) -> bool { - let mut reputations = self.reputations.write().await; + pub async fn is_banned(&mut self, peer: &SocketAddr) -> bool { + let reputations = &mut self.reputations; if let Some(reputation) = reputations.get_mut(peer) { reputation.apply_decay(); reputation.is_banned() @@ -353,8 +349,8 @@ impl PeerReputationManager { } /// Get peer reputation score - pub async fn get_score(&self, peer: &SocketAddr) -> i32 { - let mut reputations = self.reputations.write().await; + pub async fn get_score(&mut self, peer: &SocketAddr) -> i32 { + let reputations = &mut self.reputations; if let Some(reputation) = reputations.get_mut(peer) { reputation.apply_decay(); reputation.score @@ -365,8 +361,8 @@ impl PeerReputationManager { /// Temporarily ban a peer for a specified duration, regardless of score. /// This can be used for critical protocol violations (e.g., invalid ChainLocks). - pub async fn temporary_ban_peer(&self, peer: SocketAddr, duration: Duration, reason: &str) { - let mut reputations = self.reputations.write().await; + pub async fn temporary_ban_peer(&mut self, peer: SocketAddr, duration: Duration, reason: &str) { + let reputations = &mut self.reputations; let reputation = reputations.entry(peer).or_default(); reputation.banned_until = Some(Instant::now() + duration); @@ -382,23 +378,23 @@ impl PeerReputationManager { } /// Record a connection attempt - pub async fn record_connection_attempt(&self, peer: SocketAddr) { - let mut reputations = self.reputations.write().await; + pub async fn record_connection_attempt(&mut self, peer: SocketAddr) { + let reputations = &mut self.reputations; let reputation = reputations.entry(peer).or_default(); reputation.connection_attempts += 1; reputation.last_connection = Some(Instant::now()); } /// Record a successful connection - pub async fn record_successful_connection(&self, peer: SocketAddr) { - let mut reputations = self.reputations.write().await; + pub async fn record_successful_connection(&mut self, peer: SocketAddr) { + let reputations = &mut self.reputations; let reputation = reputations.entry(peer).or_default(); reputation.successful_connections += 1; } /// Get all peer reputations - pub async fn get_all_reputations(&self) -> HashMap { - let mut reputations = self.reputations.write().await; + pub async fn get_all_reputations(&mut self) -> HashMap { + let reputations = &mut self.reputations; // Apply decay to all peers for reputation in reputations.values_mut() { @@ -410,12 +406,12 @@ impl PeerReputationManager { /// Get recent reputation events pub async fn get_recent_events(&self) -> Vec { - self.recent_events.read().await.clone() + self.recent_events.clone() } /// Clear banned status for a peer (admin function) - pub async fn unban_peer(&self, peer: &SocketAddr) { - let mut reputations = self.reputations.write().await; + pub async fn unban_peer(&mut self, peer: &SocketAddr) { + let reputations = &mut self.reputations; if let Some(reputation) = reputations.get_mut(peer) { reputation.banned_until = None; reputation.score = reputation.score.min(MAX_MISBEHAVIOR_SCORE - 10); @@ -424,15 +420,15 @@ impl PeerReputationManager { } /// Reset reputation for a peer - pub async fn reset_reputation(&self, peer: &SocketAddr) { - let mut reputations = self.reputations.write().await; + pub async fn reset_reputation(&mut self, peer: &SocketAddr) { + let reputations = &mut self.reputations; reputations.remove(peer); log::info!("Reset reputation for peer {}", peer); } /// Get peers sorted by reputation (best first) - pub async fn get_peers_by_reputation(&self) -> Vec<(SocketAddr, i32)> { - let mut reputations = self.reputations.write().await; + pub async fn get_peers_by_reputation(&mut self) -> Vec<(SocketAddr, i32)> { + let reputations = &mut self.reputations; // Apply decay and collect scores let mut peer_scores: Vec<(SocketAddr, i32)> = reputations @@ -451,17 +447,21 @@ impl PeerReputationManager { } /// Save reputation data to persistent storage - pub async fn save_to_storage(&self, storage: &PersistentPeerStorage) -> std::io::Result<()> { - let reputations = self.reputations.read().await; - - storage.save_peers_reputation(&reputations).await.map_err(std::io::Error::other) + pub async fn save_to_storage( + &mut self, + storage: &PersistentPeerStorage, + ) -> std::io::Result<()> { + storage.save_peers_reputation(&self.reputations).await.map_err(std::io::Error::other) } /// Load reputation data from persistent storage - pub async fn load_from_storage(&self, storage: &PersistentPeerStorage) -> std::io::Result<()> { + pub async fn load_from_storage( + &mut self, + storage: &PersistentPeerStorage, + ) -> std::io::Result<()> { let data = storage.load_peers_reputation().await.map_err(std::io::Error::other)?; - let mut reputations = self.reputations.write().await; + let reputations = &mut self.reputations; let mut loaded_count = 0; let mut skipped_count = 0; @@ -501,26 +501,26 @@ impl PeerReputationManager { pub trait ReputationAware { /// Select best peers based on reputation fn select_best_peers( - &self, + &mut self, available_peers: Vec, count: usize, ) -> impl std::future::Future> + Send; /// Check if we should connect to a peer based on reputation fn should_connect_to_peer( - &self, + &mut self, peer: &SocketAddr, ) -> impl std::future::Future + Send; } impl ReputationAware for PeerReputationManager { async fn select_best_peers( - &self, + &mut self, available_peers: Vec, count: usize, ) -> Vec { let mut peer_scores = Vec::new(); - let mut reputations = self.reputations.write().await; + let reputations = &mut self.reputations; for peer in available_peers { let reputation = reputations.entry(peer).or_default(); @@ -538,7 +538,7 @@ impl ReputationAware for PeerReputationManager { peer_scores.into_iter().take(count).map(|(peer, _)| peer).collect() } - async fn should_connect_to_peer(&self, peer: &SocketAddr) -> bool { + async fn should_connect_to_peer(&mut self, peer: &SocketAddr) -> bool { !self.is_banned(peer).await } } diff --git a/dash-spv/src/network/reputation_tests.rs b/dash-spv/src/network/reputation_tests.rs index 8ab6dffc..9239b105 100644 --- a/dash-spv/src/network/reputation_tests.rs +++ b/dash-spv/src/network/reputation_tests.rs @@ -9,7 +9,7 @@ mod tests { #[tokio::test] async fn test_basic_reputation_operations() { - let manager = PeerReputationManager::new(); + let mut manager = PeerReputationManager::new(); let peer: SocketAddr = "127.0.0.1:8333".parse().unwrap(); // Initial score should be 0 @@ -28,7 +28,7 @@ mod tests { #[tokio::test] async fn test_banning_mechanism() { - let manager = PeerReputationManager::new(); + let mut manager = PeerReputationManager::new(); let peer: SocketAddr = "192.168.1.1:8333".parse().unwrap(); // Accumulate misbehavior @@ -54,7 +54,7 @@ mod tests { #[tokio::test] async fn test_reputation_persistence() { - let manager = PeerReputationManager::new(); + let mut manager = PeerReputationManager::new(); let peer1: SocketAddr = "10.0.0.1:8333".parse().unwrap(); let peer2: SocketAddr = "10.0.0.2:8333".parse().unwrap(); @@ -69,7 +69,7 @@ mod tests { .expect("Failed to open PersistentPeerStorage"); manager.save_to_storage(&peer_storage).await.unwrap(); - let new_manager = PeerReputationManager::new(); + let mut new_manager = PeerReputationManager::new(); new_manager.load_from_storage(&peer_storage).await.unwrap(); // Verify scores were preserved @@ -79,7 +79,7 @@ mod tests { #[tokio::test] async fn test_peer_selection() { - let manager = PeerReputationManager::new(); + let mut manager = PeerReputationManager::new(); let good_peer: SocketAddr = "1.1.1.1:8333".parse().unwrap(); let neutral_peer: SocketAddr = "2.2.2.2:8333".parse().unwrap(); @@ -101,7 +101,7 @@ mod tests { #[tokio::test] async fn test_connection_tracking() { - let manager = PeerReputationManager::new(); + let mut manager = PeerReputationManager::new(); let peer: SocketAddr = "127.0.0.1:9999".parse().unwrap(); // Track connection attempts From 9c06cc3ab6ab92ccf20e785761673f03f2661f16 Mon Sep 17 00:00:00 2001 From: Borja Castellano Date: Tue, 30 Dec 2025 18:53:19 +0000 Subject: [PATCH 2/8] removed unused fields in PeerReputationManager --- dash-spv/src/network/reputation.rs | 44 ------------------------------ 1 file changed, 44 deletions(-) diff --git a/dash-spv/src/network/reputation.rs b/dash-spv/src/network/reputation.rs index dab32e38..5a3b9961 100644 --- a/dash-spv/src/network/reputation.rs +++ b/dash-spv/src/network/reputation.rs @@ -225,25 +225,10 @@ impl PeerReputation { } } -/// Reputation change event -#[derive(Debug, Clone)] -pub struct ReputationEvent { - pub peer: SocketAddr, - pub change: i32, - pub reason: String, - pub timestamp: Instant, -} - /// Peer reputation manager pub struct PeerReputationManager { /// Reputation data for each peer reputations: HashMap, - - /// Recent reputation events for monitoring - recent_events: Vec, - - /// Maximum number of events to keep - max_events: usize, } impl Default for PeerReputationManager { @@ -257,8 +242,6 @@ impl PeerReputationManager { pub fn new() -> Self { Self { reputations: HashMap::new(), - recent_events: Vec::new(), - max_events: 1000, } } @@ -312,31 +295,9 @@ impl PeerReputationManager { ); } - // Record event - let event = ReputationEvent { - peer, - change: score_change, - reason: reason.to_string(), - timestamp: Instant::now(), - }; - - self.record_event(event).await; - should_ban } - /// Record a reputation event - async fn record_event(&mut self, event: ReputationEvent) { - let events = &mut self.recent_events; - events.push(event); - - // Keep only recent events - if events.len() > self.max_events { - let drain_count = events.len() - self.max_events; - events.drain(0..drain_count); - } - } - /// Check if a peer is banned pub async fn is_banned(&mut self, peer: &SocketAddr) -> bool { let reputations = &mut self.reputations; @@ -404,11 +365,6 @@ impl PeerReputationManager { reputations.clone() } - /// Get recent reputation events - pub async fn get_recent_events(&self) -> Vec { - self.recent_events.clone() - } - /// Clear banned status for a peer (admin function) pub async fn unban_peer(&mut self, peer: &SocketAddr) { let reputations = &mut self.reputations; From fa6593a45a2db099d491f29f33a23dbaf05dde15 Mon Sep 17 00:00:00 2001 From: Borja Castellano Date: Tue, 30 Dec 2025 19:11:56 +0000 Subject: [PATCH 3/8] removed unused stuff after making the reputation module private --- dash-spv/src/network/manager.rs | 8 +- dash-spv/src/network/mod.rs | 3 +- dash-spv/src/network/reputation.rs | 173 ++++++++++++----------- dash-spv/src/network/reputation_tests.rs | 118 ---------------- dash-spv/src/storage/peers.rs | 2 +- 5 files changed, 98 insertions(+), 206 deletions(-) delete mode 100644 dash-spv/src/network/reputation_tests.rs diff --git a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index 6e9dfae8..cd668407 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -1,6 +1,6 @@ //! Peer network manager for SPV client -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -956,12 +956,6 @@ impl PeerNetworkManager { Ok(()) } - /// Get reputation information for all peers - pub async fn get_peer_reputations(&self) -> HashMap { - let reputations = self.reputation_manager.lock().await.get_all_reputations().await; - reputations.into_iter().map(|(addr, rep)| (addr, (rep.score, rep.is_banned()))).collect() - } - /// Get the last peer that sent us a message pub async fn get_last_message_peer(&self) -> Option { let last_peer = self.last_message_peer.lock().await; diff --git a/dash-spv/src/network/mod.rs b/dash-spv/src/network/mod.rs index ff427d57..669f633f 100644 --- a/dash-spv/src/network/mod.rs +++ b/dash-spv/src/network/mod.rs @@ -7,7 +7,7 @@ pub mod handshake; pub mod manager; pub mod peer; pub mod pool; -pub mod reputation; +mod reputation; #[cfg(test)] mod tests; @@ -24,6 +24,7 @@ use dashcore::BlockHash; pub use handshake::{HandshakeManager, HandshakeState}; pub use manager::PeerNetworkManager; pub use peer::Peer; +pub use reputation::PeerReputation; /// Network manager trait for abstracting network operations. #[async_trait] diff --git a/dash-spv/src/network/reputation.rs b/dash-spv/src/network/reputation.rs index 5a3b9961..dee60511 100644 --- a/dash-spv/src/network/reputation.rs +++ b/dash-spv/src/network/reputation.rs @@ -20,47 +20,21 @@ pub mod misbehavior_scores { /// Invalid block header pub const INVALID_HEADER: i32 = 50; - /// Invalid compact filter - pub const INVALID_FILTER: i32 = 25; - /// Timeout or slow response pub const TIMEOUT: i32 = 5; - /// Sending unsolicited data - pub const UNSOLICITED_DATA: i32 = 15; - /// Invalid transaction pub const INVALID_TRANSACTION: i32 = 20; - /// Invalid masternode list diff - pub const INVALID_MASTERNODE_DIFF: i32 = 30; - /// Invalid ChainLock pub const INVALID_CHAINLOCK: i32 = 40; /// Invalid InstantLock pub const INVALID_INSTANTLOCK: i32 = 35; - - /// Duplicate message - pub const DUPLICATE_MESSAGE: i32 = 5; - - /// Connection flood attempt - pub const CONNECTION_FLOOD: i32 = 20; } /// Positive behavior scores pub mod positive_scores { - /// Successfully provided valid headers - pub const VALID_HEADERS: i32 = -5; - - /// Successfully provided valid filters - pub const VALID_FILTERS: i32 = -3; - - /// Successfully provided valid block - pub const VALID_BLOCK: i32 = -10; - - /// Fast response time - pub const FAST_RESPONSE: i32 = -2; /// Long uptime connection pub const LONG_UPTIME: i32 = -5; @@ -309,17 +283,6 @@ impl PeerReputationManager { } } - /// Get peer reputation score - pub async fn get_score(&mut self, peer: &SocketAddr) -> i32 { - let reputations = &mut self.reputations; - if let Some(reputation) = reputations.get_mut(peer) { - reputation.apply_decay(); - reputation.score - } else { - 0 - } - } - /// Temporarily ban a peer for a specified duration, regardless of score. /// This can be used for critical protocol violations (e.g., invalid ChainLocks). pub async fn temporary_ban_peer(&mut self, peer: SocketAddr, duration: Duration, reason: &str) { @@ -353,18 +316,6 @@ impl PeerReputationManager { reputation.successful_connections += 1; } - /// Get all peer reputations - pub async fn get_all_reputations(&mut self) -> HashMap { - let reputations = &mut self.reputations; - - // Apply decay to all peers - for reputation in reputations.values_mut() { - reputation.apply_decay(); - } - - reputations.clone() - } - /// Clear banned status for a peer (admin function) pub async fn unban_peer(&mut self, peer: &SocketAddr) { let reputations = &mut self.reputations; @@ -375,33 +326,6 @@ impl PeerReputationManager { } } - /// Reset reputation for a peer - pub async fn reset_reputation(&mut self, peer: &SocketAddr) { - let reputations = &mut self.reputations; - reputations.remove(peer); - log::info!("Reset reputation for peer {}", peer); - } - - /// Get peers sorted by reputation (best first) - pub async fn get_peers_by_reputation(&mut self) -> Vec<(SocketAddr, i32)> { - let reputations = &mut self.reputations; - - // Apply decay and collect scores - let mut peer_scores: Vec<(SocketAddr, i32)> = reputations - .iter_mut() - .map(|(addr, rep)| { - rep.apply_decay(); - (*addr, rep.score) - }) - .filter(|(_, score)| *score < MAX_MISBEHAVIOR_SCORE) // Exclude banned peers - .collect(); - - // Sort by score (lower is better) - peer_scores.sort_by_key(|(_, score)| *score); - - peer_scores - } - /// Save reputation data to persistent storage pub async fn save_to_storage( &mut self, @@ -499,7 +423,98 @@ impl ReputationAware for PeerReputationManager { } } -// Include tests module #[cfg(test)] -#[path = "reputation_tests.rs"] -mod reputation_tests; +mod tests { + use crate::storage::PersistentStorage; + + use super::*; + use std::net::SocketAddr; + + #[tokio::test] + async fn test_basic_reputation_operations() { + let mut manager = PeerReputationManager::new(); + let peer: SocketAddr = "127.0.0.1:8333".parse().unwrap(); + + // Initial score should be 0 + assert_eq!(manager.reputations.get(&peer).expect("Peer not found").score, 0); + + // Test misbehavior + manager + .update_reputation(peer, misbehavior_scores::INVALID_MESSAGE, "Test invalid message") + .await; + assert_eq!(manager.reputations.get(&peer).expect("Peer not found").score, 10); + } + + #[tokio::test] + async fn test_banning_mechanism() { + let mut manager = PeerReputationManager::new(); + let peer: SocketAddr = "192.168.1.1:8333".parse().unwrap(); + + // Accumulate misbehavior + for i in 0..10 { + let banned = manager + .update_reputation( + peer, + misbehavior_scores::INVALID_MESSAGE, + &format!("Violation {}", i), + ) + .await; + + // Should be banned on the 10th violation (total score = 100) + if i == 9 { + assert!(banned); + } else { + assert!(!banned); + } + } + + assert!(manager.is_banned(&peer).await); + } + + #[tokio::test] + async fn test_reputation_persistence() { + let mut manager = PeerReputationManager::new(); + let peer1: SocketAddr = "10.0.0.1:8333".parse().unwrap(); + let peer2: SocketAddr = "10.0.0.2:8333".parse().unwrap(); + + // Set reputations + manager.update_reputation(peer1, -10, "Good peer").await; + manager.update_reputation(peer2, 50, "Bad peer").await; + + // Save and load + let temp_dir = tempfile::TempDir::new().unwrap(); + let peer_storage = PersistentPeerStorage::open(temp_dir.path()) + .await + .expect("Failed to open PersistentPeerStorage"); + manager.save_to_storage(&peer_storage).await.unwrap(); + + let mut new_manager = PeerReputationManager::new(); + new_manager.load_from_storage(&peer_storage).await.unwrap(); + + // Verify scores were preserved + assert_eq!(new_manager.reputations.get(&peer1).expect("Peer not found").score, -10); + assert_eq!(new_manager.reputations.get(&peer2).expect("Peer not found").score, 50); + } + + #[tokio::test] + async fn test_peer_selection() { + let mut manager = PeerReputationManager::new(); + + let good_peer: SocketAddr = "1.1.1.1:8333".parse().unwrap(); + let neutral_peer: SocketAddr = "2.2.2.2:8333".parse().unwrap(); + let bad_peer: SocketAddr = "3.3.3.3:8333".parse().unwrap(); + + // Set different reputations + manager.update_reputation(good_peer, -20, "Very good").await; + manager.update_reputation(bad_peer, 80, "Very bad").await; + // neutral_peer has default score of 0 + + let all_peers = vec![good_peer, neutral_peer, bad_peer]; + let selected = manager.select_best_peers(all_peers, 2).await; + + // Should select good_peer first, then neutral_peer + assert_eq!(selected.len(), 2); + assert_eq!(selected[0], good_peer); + assert_eq!(selected[1], neutral_peer); + } +} diff --git a/dash-spv/src/network/reputation_tests.rs b/dash-spv/src/network/reputation_tests.rs deleted file mode 100644 index 9239b105..00000000 --- a/dash-spv/src/network/reputation_tests.rs +++ /dev/null @@ -1,118 +0,0 @@ -//! Unit tests for reputation system (in-module tests) - -#[cfg(test)] -mod tests { - use crate::storage::PersistentStorage; - - use super::super::*; - use std::net::SocketAddr; - - #[tokio::test] - async fn test_basic_reputation_operations() { - let mut manager = PeerReputationManager::new(); - let peer: SocketAddr = "127.0.0.1:8333".parse().unwrap(); - - // Initial score should be 0 - assert_eq!(manager.get_score(&peer).await, 0); - - // Test misbehavior - manager - .update_reputation(peer, misbehavior_scores::INVALID_MESSAGE, "Test invalid message") - .await; - assert_eq!(manager.get_score(&peer).await, 10); - - // Test positive behavior - manager.update_reputation(peer, positive_scores::VALID_HEADERS, "Test valid headers").await; - assert_eq!(manager.get_score(&peer).await, 5); - } - - #[tokio::test] - async fn test_banning_mechanism() { - let mut manager = PeerReputationManager::new(); - let peer: SocketAddr = "192.168.1.1:8333".parse().unwrap(); - - // Accumulate misbehavior - for i in 0..10 { - let banned = manager - .update_reputation( - peer, - misbehavior_scores::INVALID_MESSAGE, - &format!("Violation {}", i), - ) - .await; - - // Should be banned on the 10th violation (total score = 100) - if i == 9 { - assert!(banned); - } else { - assert!(!banned); - } - } - - assert!(manager.is_banned(&peer).await); - } - - #[tokio::test] - async fn test_reputation_persistence() { - let mut manager = PeerReputationManager::new(); - let peer1: SocketAddr = "10.0.0.1:8333".parse().unwrap(); - let peer2: SocketAddr = "10.0.0.2:8333".parse().unwrap(); - - // Set reputations - manager.update_reputation(peer1, -10, "Good peer").await; - manager.update_reputation(peer2, 50, "Bad peer").await; - - // Save and load - let temp_dir = tempfile::TempDir::new().unwrap(); - let peer_storage = PersistentPeerStorage::open(temp_dir.path()) - .await - .expect("Failed to open PersistentPeerStorage"); - manager.save_to_storage(&peer_storage).await.unwrap(); - - let mut new_manager = PeerReputationManager::new(); - new_manager.load_from_storage(&peer_storage).await.unwrap(); - - // Verify scores were preserved - assert_eq!(new_manager.get_score(&peer1).await, -10); - assert_eq!(new_manager.get_score(&peer2).await, 50); - } - - #[tokio::test] - async fn test_peer_selection() { - let mut manager = PeerReputationManager::new(); - - let good_peer: SocketAddr = "1.1.1.1:8333".parse().unwrap(); - let neutral_peer: SocketAddr = "2.2.2.2:8333".parse().unwrap(); - let bad_peer: SocketAddr = "3.3.3.3:8333".parse().unwrap(); - - // Set different reputations - manager.update_reputation(good_peer, -20, "Very good").await; - manager.update_reputation(bad_peer, 80, "Very bad").await; - // neutral_peer has default score of 0 - - let all_peers = vec![good_peer, neutral_peer, bad_peer]; - let selected = manager.select_best_peers(all_peers, 2).await; - - // Should select good_peer first, then neutral_peer - assert_eq!(selected.len(), 2); - assert_eq!(selected[0], good_peer); - assert_eq!(selected[1], neutral_peer); - } - - #[tokio::test] - async fn test_connection_tracking() { - let mut manager = PeerReputationManager::new(); - let peer: SocketAddr = "127.0.0.1:9999".parse().unwrap(); - - // Track connection attempts - manager.record_connection_attempt(peer).await; - manager.record_connection_attempt(peer).await; - manager.record_successful_connection(peer).await; - - let reputations = manager.get_all_reputations().await; - let rep = &reputations[&peer]; - - assert_eq!(rep.connection_attempts, 2); - assert_eq!(rep.successful_connections, 1); - } -} diff --git a/dash-spv/src/storage/peers.rs b/dash-spv/src/storage/peers.rs index 63e2a3dc..9d39baff 100644 --- a/dash-spv/src/storage/peers.rs +++ b/dash-spv/src/storage/peers.rs @@ -14,7 +14,7 @@ use dashcore::{ use crate::{ error::StorageResult, - network::reputation::PeerReputation, + network::PeerReputation, storage::{io::atomic_write, PersistentStorage}, StorageError, }; From e6cdeff147828c8e6026810f1b46cb22f69487f3 Mon Sep 17 00:00:00 2001 From: Borja Castellano Date: Tue, 30 Dec 2025 19:25:07 +0000 Subject: [PATCH 4/8] visibility and removal of redundant code --- dash-spv/src/network/manager.rs | 6 +- dash-spv/src/network/reputation.rs | 126 +++++++++++------------------ 2 files changed, 48 insertions(+), 84 deletions(-) diff --git a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index cd668407..d78ba7ae 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -23,9 +23,7 @@ use crate::network::addrv2::AddrV2Handler; use crate::network::constants::*; use crate::network::discovery::DnsDiscovery; use crate::network::pool::PeerPool; -use crate::network::reputation::{ - misbehavior_scores, positive_scores, PeerReputationManager, ReputationAware, -}; +use crate::network::reputation::{misbehavior_scores, positive_scores, PeerReputationManager}; use crate::network::{HandshakeManager, NetworkManager, Peer}; use crate::storage::{PeerStorage, PersistentPeerStorage, PersistentStorage}; use crate::types::PeerInfo; @@ -83,7 +81,7 @@ impl PeerNetworkManager { let peer_store = PersistentPeerStorage::open(data_dir.clone()).await?; - let mut reputation_manager = PeerReputationManager::new(); + let mut reputation_manager = PeerReputationManager::default(); if let Err(e) = reputation_manager.load_from_storage(&peer_store).await { log::warn!("Failed to load peer reputation data: {}", e); diff --git a/dash-spv/src/network/reputation.rs b/dash-spv/src/network/reputation.rs index dee60511..7785297e 100644 --- a/dash-spv/src/network/reputation.rs +++ b/dash-spv/src/network/reputation.rs @@ -110,36 +110,36 @@ where pub struct PeerReputation { /// Current misbehavior score #[serde(deserialize_with = "clamp_peer_score")] - pub score: i32, + score: i32, /// Number of times this peer has been banned #[serde(deserialize_with = "clamp_peer_ban_count")] - pub ban_count: u32, + ban_count: u32, /// Time when the peer was banned (if currently banned) #[serde(skip)] - pub banned_until: Option, + banned_until: Option, /// Last time the reputation was updated #[serde(skip, default = "default_instant")] - pub last_update: Instant, + last_update: Instant, /// Total number of positive actions - pub positive_actions: u64, + positive_actions: u64, /// Total number of negative actions - pub negative_actions: u64, + negative_actions: u64, /// Connection count #[serde(deserialize_with = "clamp_peer_connection_attempts")] - pub connection_attempts: u64, + connection_attempts: u64, /// Successful connection count - pub successful_connections: u64, + successful_connections: u64, /// Last connection time #[serde(skip)] - pub last_connection: Option, + last_connection: Option, } impl Default for PeerReputation { @@ -159,13 +159,11 @@ impl Default for PeerReputation { } impl PeerReputation { - /// Check if the peer is currently banned - pub fn is_banned(&self) -> bool { + fn is_banned(&self) -> bool { self.banned_until.is_some_and(|until| Instant::now() < until) } - /// Get remaining ban time - pub fn ban_time_remaining(&self) -> Option { + fn ban_time_remaining(&self) -> Option { self.banned_until.and_then(|until| { let now = Instant::now(); if now < until { @@ -177,7 +175,7 @@ impl PeerReputation { } /// Apply reputation decay - pub fn apply_decay(&mut self) { + fn apply_decay(&mut self) { let now = Instant::now(); let elapsed = now - self.last_update; @@ -199,26 +197,12 @@ impl PeerReputation { } } -/// Peer reputation manager +#[derive(Default)] pub struct PeerReputationManager { - /// Reputation data for each peer reputations: HashMap, } -impl Default for PeerReputationManager { - fn default() -> Self { - Self::new() - } -} - impl PeerReputationManager { - /// Create a new reputation manager - pub fn new() -> Self { - Self { - reputations: HashMap::new(), - } - } - /// Update peer reputation pub async fn update_reputation( &mut self, @@ -326,6 +310,34 @@ impl PeerReputationManager { } } + pub async fn select_best_peers( + &mut self, + available_peers: Vec, + count: usize, + ) -> Vec { + let mut peer_scores = Vec::new(); + let reputations = &mut self.reputations; + + for peer in available_peers { + let reputation = reputations.entry(peer).or_default(); + reputation.apply_decay(); + + if !reputation.is_banned() { + peer_scores.push((peer, reputation.score)); + } + } + + // Sort by score (lower is better) + peer_scores.sort_by_key(|(_, score)| *score); + + // Return the best peers + peer_scores.into_iter().take(count).map(|(peer, _)| peer).collect() + } + + pub async fn should_connect_to_peer(&mut self, peer: &SocketAddr) -> bool { + !self.is_banned(peer).await + } + /// Save reputation data to persistent storage pub async fn save_to_storage( &mut self, @@ -377,52 +389,6 @@ impl PeerReputationManager { } } -/// Helper trait for reputation-aware peer selection -pub trait ReputationAware { - /// Select best peers based on reputation - fn select_best_peers( - &mut self, - available_peers: Vec, - count: usize, - ) -> impl std::future::Future> + Send; - - /// Check if we should connect to a peer based on reputation - fn should_connect_to_peer( - &mut self, - peer: &SocketAddr, - ) -> impl std::future::Future + Send; -} - -impl ReputationAware for PeerReputationManager { - async fn select_best_peers( - &mut self, - available_peers: Vec, - count: usize, - ) -> Vec { - let mut peer_scores = Vec::new(); - let reputations = &mut self.reputations; - - for peer in available_peers { - let reputation = reputations.entry(peer).or_default(); - reputation.apply_decay(); - - if !reputation.is_banned() { - peer_scores.push((peer, reputation.score)); - } - } - - // Sort by score (lower is better) - peer_scores.sort_by_key(|(_, score)| *score); - - // Return the best peers - peer_scores.into_iter().take(count).map(|(peer, _)| peer).collect() - } - - async fn should_connect_to_peer(&mut self, peer: &SocketAddr) -> bool { - !self.is_banned(peer).await - } -} - #[cfg(test)] mod tests { use crate::storage::PersistentStorage; @@ -432,7 +398,7 @@ mod tests { #[tokio::test] async fn test_basic_reputation_operations() { - let mut manager = PeerReputationManager::new(); + let mut manager = PeerReputationManager::default(); let peer: SocketAddr = "127.0.0.1:8333".parse().unwrap(); // Initial score should be 0 @@ -447,7 +413,7 @@ mod tests { #[tokio::test] async fn test_banning_mechanism() { - let mut manager = PeerReputationManager::new(); + let mut manager = PeerReputationManager::default(); let peer: SocketAddr = "192.168.1.1:8333".parse().unwrap(); // Accumulate misbehavior @@ -473,7 +439,7 @@ mod tests { #[tokio::test] async fn test_reputation_persistence() { - let mut manager = PeerReputationManager::new(); + let mut manager = PeerReputationManager::default(); let peer1: SocketAddr = "10.0.0.1:8333".parse().unwrap(); let peer2: SocketAddr = "10.0.0.2:8333".parse().unwrap(); @@ -488,7 +454,7 @@ mod tests { .expect("Failed to open PersistentPeerStorage"); manager.save_to_storage(&peer_storage).await.unwrap(); - let mut new_manager = PeerReputationManager::new(); + let mut new_manager = PeerReputationManager::default(); new_manager.load_from_storage(&peer_storage).await.unwrap(); // Verify scores were preserved @@ -498,7 +464,7 @@ mod tests { #[tokio::test] async fn test_peer_selection() { - let mut manager = PeerReputationManager::new(); + let mut manager = PeerReputationManager::default(); let good_peer: SocketAddr = "1.1.1.1:8333".parse().unwrap(); let neutral_peer: SocketAddr = "2.2.2.2:8333".parse().unwrap(); From 923712a2670ce5b3da644c1f3ff1a69f18e9c58e Mon Sep 17 00:00:00 2001 From: Borja Castellano Date: Tue, 30 Dec 2025 20:07:08 +0000 Subject: [PATCH 5/8] ReputationChangeReason replacing old constant values --- dash-spv/src/client/chainlock.rs | 5 +- dash-spv/src/client/queries.rs | 14 ---- dash-spv/src/network/manager.rs | 107 ++++++-------------------- dash-spv/src/network/mod.rs | 27 ++----- dash-spv/src/network/reputation.rs | 119 +++++++++++++++++------------ dash-spv/tests/peer_test.rs | 34 --------- 6 files changed, 102 insertions(+), 204 deletions(-) diff --git a/dash-spv/src/client/chainlock.rs b/dash-spv/src/client/chainlock.rs index 553f0b58..59632d8d 100644 --- a/dash-spv/src/client/chainlock.rs +++ b/dash-spv/src/client/chainlock.rs @@ -43,8 +43,7 @@ impl< .await { // Penalize the peer that relayed the invalid ChainLock - let reason = format!("Invalid ChainLock: {}", e); - let _ = self.network.penalize_last_message_peer_invalid_chainlock(&reason).await; + let _ = self.network.penalize_last_message_peer_invalid_chainlock().await; return Err(SpvError::Validation(e)); } } @@ -111,7 +110,7 @@ impl< tracing::warn!("{}", reason); // Ban the peer using the reputation system - let _ = self.network.penalize_last_message_peer_invalid_instantlock(&reason).await; + let _ = self.network.penalize_last_message_peer_invalid_instantlock().await; return Err(SpvError::Validation(e)); } diff --git a/dash-spv/src/client/queries.rs b/dash-spv/src/client/queries.rs index bb0be8c3..6adb2e27 100644 --- a/dash-spv/src/client/queries.rs +++ b/dash-spv/src/client/queries.rs @@ -42,20 +42,6 @@ impl< self.network.peer_count() } - /// Disconnect a specific peer. - pub async fn disconnect_peer(&self, addr: &std::net::SocketAddr, reason: &str) -> Result<()> { - // Cast network manager to PeerNetworkManager to access disconnect_peer - let network = self - .network - .as_any() - .downcast_ref::() - .ok_or_else(|| { - SpvError::Config("Network manager does not support peer disconnection".to_string()) - })?; - - network.disconnect_peer(addr, reason).await - } - // ============ Masternode Queries ============ /// Get a reference to the masternode list engine. diff --git a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index d78ba7ae..df05560c 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -23,7 +23,7 @@ use crate::network::addrv2::AddrV2Handler; use crate::network::constants::*; use crate::network::discovery::DnsDiscovery; use crate::network::pool::PeerPool; -use crate::network::reputation::{misbehavior_scores, positive_scores, PeerReputationManager}; +use crate::network::reputation::{PeerReputationManager, ReputationChangeReason}; use crate::network::{HandshakeManager, NetworkManager, Peer}; use crate::storage::{PeerStorage, PersistentPeerStorage, PersistentStorage}; use crate::types::PeerInfo; @@ -249,11 +249,7 @@ impl PeerNetworkManager { reputation_manager .lock() .await - .update_reputation( - addr, - misbehavior_scores::INVALID_MESSAGE, - "Handshake failed", - ) + .update_reputation(addr, ReputationChangeReason::InvalidMessage) .await; // For handshake failures, try again later tokio::time::sleep(RECONNECT_DELAY).await; @@ -266,11 +262,7 @@ impl PeerNetworkManager { reputation_manager .lock() .await - .update_reputation( - addr, - misbehavior_scores::TIMEOUT / 2, - "Connection failed", - ) + .update_reputation(addr, ReputationChangeReason::Timeout) .await; } } @@ -493,11 +485,7 @@ impl PeerNetworkManager { reputation_manager .lock() .await - .update_reputation( - addr, - misbehavior_scores::TIMEOUT, - "Read timeout", - ) + .update_reputation(addr, ReputationChangeReason::Timeout) .await; continue; } @@ -519,8 +507,7 @@ impl PeerNetworkManager { .await .update_reputation( addr, - misbehavior_scores::INVALID_TRANSACTION, - "Invalid transaction type in block", + ReputationChangeReason::InvalidTransaction, ) .await; } else if error_msg @@ -579,7 +566,7 @@ impl PeerNetworkManager { reputation_manager .lock() .await - .update_reputation(addr, positive_scores::LONG_UPTIME, "Long connection uptime") + .update_reputation(addr, ReputationChangeReason::LongUptime) .await; } }); @@ -732,8 +719,7 @@ impl PeerNetworkManager { // Update reputation for ping failure reputation_manager.lock().await.update_reputation( addr, - misbehavior_scores::TIMEOUT, - "Ping failed", + ReputationChangeReason::Timeout, ).await; } } @@ -945,10 +931,7 @@ impl PeerNetworkManager { } /// Disconnect a specific peer - pub async fn disconnect_peer(&self, addr: &SocketAddr, reason: &str) -> Result<(), Error> { - log::info!("Disconnecting peer {} - reason: {}", addr, reason); - - // Remove the peer + pub async fn disconnect_peer(&self, addr: &SocketAddr) -> Result<(), Error> { self.pool.remove_peer(addr).await; Ok(()) @@ -980,27 +963,6 @@ impl PeerNetworkManager { *last_peer } - /// Ban a specific peer manually - pub async fn ban_peer(&self, addr: &SocketAddr, reason: &str) -> Result<(), Error> { - log::info!("Manually banning peer {} - reason: {}", addr, reason); - - // Disconnect the peer first - self.disconnect_peer(addr, reason).await?; - - // Update reputation to trigger ban - self.reputation_manager - .lock() - .await - .update_reputation( - *addr, - misbehavior_scores::INVALID_HEADER * 2, // Severe penalty - reason, - ) - .await; - - Ok(()) - } - /// Unban a specific peer pub async fn unban_peer(&self, addr: &SocketAddr) { self.reputation_manager.lock().await.unban_peer(addr).await; @@ -1116,39 +1078,24 @@ impl NetworkManager for PeerNetworkManager { async fn penalize_last_message_peer( &self, - score_change: i32, - reason: &str, + reason: ReputationChangeReason, ) -> NetworkResult<()> { // Get the last peer that sent us a message if let Some(addr) = self.get_last_message_peer().await { - self.reputation_manager - .lock() - .await - .update_reputation(addr, score_change, reason) - .await; + self.reputation_manager.lock().await.update_reputation(addr, reason).await; } Ok(()) } - async fn penalize_last_message_peer_invalid_chainlock( - &self, - reason: &str, - ) -> NetworkResult<()> { + async fn penalize_last_message_peer_invalid_chainlock(&self) -> NetworkResult<()> { if let Some(addr) = self.get_last_message_peer().await { - match self.disconnect_peer(&addr, reason).await { + match self.disconnect_peer(&addr).await { Ok(()) => { - log::warn!( - "Peer {} disconnected for invalid ChainLock enforcement: {}", - addr, - reason - ); + log::warn!("Peer {addr} disconnected for invalid ChainLock enforcement",); } Err(err) => { log::error!( - "Failed to disconnect peer {} after invalid ChainLock enforcement ({}): {}", - addr, - reason, - err + "Failed to disconnect peer {addr} after invalid ChainLock enforcement: {err}", ); } } @@ -1157,52 +1104,42 @@ impl NetworkManager for PeerNetworkManager { self.reputation_manager .lock() .await - .update_reputation(addr, misbehavior_scores::INVALID_CHAINLOCK, reason) + .update_reputation(addr, ReputationChangeReason::InvalidChainLock) .await; // Short ban: 10 minutes for relaying invalid ChainLock self.reputation_manager .lock() .await - .temporary_ban_peer(addr, Duration::from_secs(10 * 60), reason) + .temporary_ban_peer(addr, Duration::from_secs(10 * 60)) .await; } Ok(()) } - async fn penalize_last_message_peer_invalid_instantlock( - &self, - reason: &str, - ) -> NetworkResult<()> { + async fn penalize_last_message_peer_invalid_instantlock(&self) -> NetworkResult<()> { if let Some(addr) = self.get_last_message_peer().await { // Apply misbehavior score and a short temporary ban self.reputation_manager .lock() .await - .update_reputation(addr, misbehavior_scores::INVALID_INSTANTLOCK, reason) + .update_reputation(addr, ReputationChangeReason::InvalidInstantLock) .await; // Short ban: 10 minutes for relaying invalid InstantLock self.reputation_manager .lock() .await - .temporary_ban_peer(addr, Duration::from_secs(10 * 60), reason) + .temporary_ban_peer(addr, Duration::from_secs(10 * 60)) .await; - match self.disconnect_peer(&addr, reason).await { + match self.disconnect_peer(&addr).await { Ok(()) => { - log::warn!( - "Peer {} disconnected for invalid InstantLock enforcement: {}", - addr, - reason - ); + log::warn!("Peer {addr} disconnected for invalid InstantLock enforcement",); } Err(err) => { log::error!( - "Failed to disconnect peer {} after invalid InstantLock enforcement ({}): {}", - addr, - reason, - err + "Failed to disconnect peer {addr} after invalid InstantLock enforcement: {err}" ); } } diff --git a/dash-spv/src/network/mod.rs b/dash-spv/src/network/mod.rs index 669f633f..f2bcaa56 100644 --- a/dash-spv/src/network/mod.rs +++ b/dash-spv/src/network/mod.rs @@ -17,7 +17,7 @@ pub mod mock; use async_trait::async_trait; -use crate::error::NetworkResult; +use crate::{error::NetworkResult, network::reputation::ReputationChangeReason}; use dashcore::network::message::NetworkMessage; use dashcore::BlockHash; @@ -130,33 +130,18 @@ pub trait NetworkManager: Send + Sync { /// Default implementation is a no-op for managers without reputation. async fn penalize_last_message_peer( &self, - _score_change: i32, - _reason: &str, + _reason: ReputationChangeReason, ) -> NetworkResult<()> { Ok(()) } /// Convenience: penalize last peer for an invalid ChainLock. - async fn penalize_last_message_peer_invalid_chainlock( - &self, - reason: &str, - ) -> NetworkResult<()> { - self.penalize_last_message_peer( - crate::network::reputation::misbehavior_scores::INVALID_CHAINLOCK, - reason, - ) - .await + async fn penalize_last_message_peer_invalid_chainlock(&self) -> NetworkResult<()> { + self.penalize_last_message_peer(ReputationChangeReason::InvalidChainLock).await } /// Convenience: penalize last peer for an invalid InstantLock. - async fn penalize_last_message_peer_invalid_instantlock( - &self, - reason: &str, - ) -> NetworkResult<()> { - self.penalize_last_message_peer( - crate::network::reputation::misbehavior_scores::INVALID_INSTANTLOCK, - reason, - ) - .await + async fn penalize_last_message_peer_invalid_instantlock(&self) -> NetworkResult<()> { + self.penalize_last_message_peer(ReputationChangeReason::InvalidInstantLock).await } } diff --git a/dash-spv/src/network/reputation.rs b/dash-spv/src/network/reputation.rs index 7785297e..3c3296df 100644 --- a/dash-spv/src/network/reputation.rs +++ b/dash-spv/src/network/reputation.rs @@ -12,32 +12,54 @@ use std::time::{Duration, Instant}; use crate::storage::{PeerStorage, PersistentPeerStorage}; -/// Misbehavior score thresholds for different violations -pub mod misbehavior_scores { - /// Invalid message format or protocol violation - pub const INVALID_MESSAGE: i32 = 10; - - /// Invalid block header - pub const INVALID_HEADER: i32 = 50; - - /// Timeout or slow response - pub const TIMEOUT: i32 = 5; - - /// Invalid transaction - pub const INVALID_TRANSACTION: i32 = 20; - - /// Invalid ChainLock - pub const INVALID_CHAINLOCK: i32 = 40; - - /// Invalid InstantLock - pub const INVALID_INSTANTLOCK: i32 = 35; +pub enum ReputationChangeReason { + // Negative Changes + InvalidMessage, + InvalidHeader, + Timeout, + InvalidTransaction, + InvalidChainLock, + InvalidInstantLock, + + // Positive changes + LongUptime, + + // Other + Other(i32, String), } -/// Positive behavior scores -pub mod positive_scores { +impl ReputationChangeReason { + pub fn score(&self) -> i32 { + // This score represents the missbehaviour score change, that means + // the higher the score, the more severe the violation. + match self { + ReputationChangeReason::InvalidMessage => 10, + ReputationChangeReason::InvalidHeader => 50, + ReputationChangeReason::Timeout => 5, + ReputationChangeReason::InvalidTransaction => 20, + ReputationChangeReason::InvalidChainLock => 40, + ReputationChangeReason::InvalidInstantLock => 35, + ReputationChangeReason::LongUptime => -5, + ReputationChangeReason::Other(score, _) => *score, + } + } +} - /// Long uptime connection - pub const LONG_UPTIME: i32 = -5; +impl std::fmt::Display for ReputationChangeReason { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + ReputationChangeReason::InvalidMessage => { + write!(f, "Invalid message format or protocol violation") + } + ReputationChangeReason::InvalidHeader => write!(f, "Invalid block header"), + ReputationChangeReason::Timeout => write!(f, "Timeout or slow response"), + ReputationChangeReason::InvalidTransaction => write!(f, "Invalid transaction"), + ReputationChangeReason::InvalidChainLock => write!(f, "Invalid ChainLock"), + ReputationChangeReason::InvalidInstantLock => write!(f, "Invalid InstantLock"), + ReputationChangeReason::LongUptime => write!(f, "Long uptime"), + ReputationChangeReason::Other(_, reason) => write!(f, "{}", reason), + } + } } /// Ban duration for misbehaving peers @@ -207,8 +229,7 @@ impl PeerReputationManager { pub async fn update_reputation( &mut self, peer: SocketAddr, - score_change: i32, - reason: &str, + reason: ReputationChangeReason, ) -> bool { let reputation = self.reputations.entry(peer).or_default(); @@ -218,12 +239,12 @@ impl PeerReputationManager { // Update score let old_score = reputation.score; reputation.score = - (reputation.score + score_change).clamp(MIN_MISBEHAVIOR_SCORE, MAX_MISBEHAVIOR_SCORE); + (reputation.score + reason.score()).clamp(MIN_MISBEHAVIOR_SCORE, MAX_MISBEHAVIOR_SCORE); // Track positive/negative actions - if score_change > 0 { + if reason.score() > 0 { reputation.negative_actions += 1; - } else if score_change < 0 { + } else if reason.score() < 0 { reputation.positive_actions += 1; } @@ -242,13 +263,13 @@ impl PeerReputationManager { } // Log significant changes - if score_change.abs() >= 10 || should_ban { + if reason.score().abs() >= 10 || should_ban { log::info!( "Peer {} reputation changed: {} -> {} (change: {}, reason: {})", peer, old_score, reputation.score, - score_change, + reason.score(), reason ); } @@ -269,7 +290,7 @@ impl PeerReputationManager { /// Temporarily ban a peer for a specified duration, regardless of score. /// This can be used for critical protocol violations (e.g., invalid ChainLocks). - pub async fn temporary_ban_peer(&mut self, peer: SocketAddr, duration: Duration, reason: &str) { + pub async fn temporary_ban_peer(&mut self, peer: SocketAddr, duration: Duration) { let reputations = &mut self.reputations; let reputation = reputations.entry(peer).or_default(); @@ -277,11 +298,10 @@ impl PeerReputationManager { reputation.ban_count += 1; log::warn!( - "Peer {} temporarily banned for {:?} (ban #{}, reason: {})", + "Peer {} temporarily banned for {:?} (ban #{})", peer, duration, reputation.ban_count, - reason ); } @@ -402,12 +422,11 @@ mod tests { let peer: SocketAddr = "127.0.0.1:8333".parse().unwrap(); // Initial score should be 0 + assert_eq!(manager.select_best_peers(vec![peer], 1).await[0], peer); assert_eq!(manager.reputations.get(&peer).expect("Peer not found").score, 0); // Test misbehavior - manager - .update_reputation(peer, misbehavior_scores::INVALID_MESSAGE, "Test invalid message") - .await; + manager.update_reputation(peer, ReputationChangeReason::InvalidMessage).await; assert_eq!(manager.reputations.get(&peer).expect("Peer not found").score, 10); } @@ -418,13 +437,8 @@ mod tests { // Accumulate misbehavior for i in 0..10 { - let banned = manager - .update_reputation( - peer, - misbehavior_scores::INVALID_MESSAGE, - &format!("Violation {}", i), - ) - .await; + let banned = + manager.update_reputation(peer, ReputationChangeReason::InvalidMessage).await; // Should be banned on the 10th violation (total score = 100) if i == 9 { @@ -444,8 +458,12 @@ mod tests { let peer2: SocketAddr = "10.0.0.2:8333".parse().unwrap(); // Set reputations - manager.update_reputation(peer1, -10, "Good peer").await; - manager.update_reputation(peer2, 50, "Bad peer").await; + manager + .update_reputation(peer1, ReputationChangeReason::Other(-10, "Good peer".to_string())) + .await; + manager + .update_reputation(peer2, ReputationChangeReason::Other(50, "Bad peer".to_string())) + .await; // Save and load let temp_dir = tempfile::TempDir::new().unwrap(); @@ -471,8 +489,15 @@ mod tests { let bad_peer: SocketAddr = "3.3.3.3:8333".parse().unwrap(); // Set different reputations - manager.update_reputation(good_peer, -20, "Very good").await; - manager.update_reputation(bad_peer, 80, "Very bad").await; + manager + .update_reputation( + good_peer, + ReputationChangeReason::Other(-20, "Very good".to_string()), + ) + .await; + manager + .update_reputation(bad_peer, ReputationChangeReason::Other(80, "Very bad".to_string())) + .await; // neutral_peer has default score of 0 let all_peers = vec![good_peer, neutral_peer, bad_peer]; diff --git a/dash-spv/tests/peer_test.rs b/dash-spv/tests/peer_test.rs index f15adada..54da1d55 100644 --- a/dash-spv/tests/peer_test.rs +++ b/dash-spv/tests/peer_test.rs @@ -139,40 +139,6 @@ async fn test_peer_persistence() { } } -#[tokio::test] -async fn test_peer_disconnection() { - let _ = env_logger::builder().is_test(true).try_init(); - - let temp_dir = TempDir::new().unwrap(); - let temp_path = temp_dir.path().to_path_buf(); - let mut config = create_test_config(Network::Regtest, Some(temp_dir)); - - // Add manual test peers (would need actual regtest nodes running) - config.peers = vec!["127.0.0.1:19899".parse().unwrap(), "127.0.0.1:19898".parse().unwrap()]; - - // Create network manager - let network_manager = PeerNetworkManager::new(&config).await.unwrap(); - - // Create storage manager - let storage_manager = DiskStorageManager::new(temp_path).await.unwrap(); - - // Create wallet manager - let wallet = Arc::new(RwLock::new(WalletManager::::new())); - - let client = - DashSpvClient::new(config, network_manager, storage_manager, wallet).await.unwrap(); - - // Note: This test would require actual regtest nodes running - // For now, we just test that the API works - let test_addr: SocketAddr = "127.0.0.1:19899".parse().unwrap(); - - // Try to disconnect (will fail if not connected, but tests the API) - match client.disconnect_peer(&test_addr, "Test disconnection").await { - Ok(_) => println!("Disconnected peer {}", test_addr), - Err(e) => println!("Expected error disconnecting non-existent peer: {}", e), - } -} - #[tokio::test] async fn test_max_peer_limit() { use dash_spv::network::constants::MAX_PEERS; From 4cd7ba9a169804327fa596cb475dec2810a71bc3 Mon Sep 17 00:00:00 2001 From: Borja Castellano Date: Tue, 30 Dec 2025 20:58:09 +0000 Subject: [PATCH 6/8] positive and negative4 action fields removed --- dash-spv/src/network/reputation.rs | 33 +----------------------------- 1 file changed, 1 insertion(+), 32 deletions(-) diff --git a/dash-spv/src/network/reputation.rs b/dash-spv/src/network/reputation.rs index 3c3296df..a03d7564 100644 --- a/dash-spv/src/network/reputation.rs +++ b/dash-spv/src/network/reputation.rs @@ -146,12 +146,6 @@ pub struct PeerReputation { #[serde(skip, default = "default_instant")] last_update: Instant, - /// Total number of positive actions - positive_actions: u64, - - /// Total number of negative actions - negative_actions: u64, - /// Connection count #[serde(deserialize_with = "clamp_peer_connection_attempts")] connection_attempts: u64, @@ -171,8 +165,6 @@ impl Default for PeerReputation { ban_count: 0, banned_until: None, last_update: default_instant(), - positive_actions: 0, - negative_actions: 0, connection_attempts: 0, successful_connections: 0, last_connection: None, @@ -241,13 +233,6 @@ impl PeerReputationManager { reputation.score = (reputation.score + reason.score()).clamp(MIN_MISBEHAVIOR_SCORE, MAX_MISBEHAVIOR_SCORE); - // Track positive/negative actions - if reason.score() > 0 { - reputation.negative_actions += 1; - } else if reason.score() < 0 { - reputation.positive_actions += 1; - } - // Check if peer should be banned let should_ban = reputation.score >= MAX_MISBEHAVIOR_SCORE && !reputation.is_banned(); if should_ban { @@ -372,39 +357,23 @@ impl PeerReputationManager { storage: &PersistentPeerStorage, ) -> std::io::Result<()> { let data = storage.load_peers_reputation().await.map_err(std::io::Error::other)?; + log::info!("Loaded reputation data for {} peers", data.len()); let reputations = &mut self.reputations; - let mut loaded_count = 0; - let mut skipped_count = 0; for (addr, mut reputation) in data { // Validate successful connections don't exceed attempts reputation.successful_connections = reputation.successful_connections.min(reputation.connection_attempts); - // Skip entry if data appears corrupted - if reputation.positive_actions > MAX_ACTION_COUNT - || reputation.negative_actions > MAX_ACTION_COUNT - { - log::warn!("Skipping peer {} with potentially corrupted action counts", addr); - skipped_count += 1; - continue; - } - // Apply initial decay based on ban count if reputation.ban_count > 0 { reputation.score = reputation.score.max(50); // Start with higher score for previously banned peers } reputations.insert(addr, reputation); - loaded_count += 1; } - log::info!( - "Loaded reputation data for {} peers (skipped {} corrupted entries)", - loaded_count, - skipped_count - ); Ok(()) } } From c56321ed0e00eccd13ef59f783500aa38906ad42 Mon Sep 17 00:00:00 2001 From: Borja Castellano Date: Tue, 30 Dec 2025 21:03:10 +0000 Subject: [PATCH 7/8] connection stats removed from PeerReputation --- dash-spv/src/network/manager.rs | 7 ------ dash-spv/src/network/reputation.rs | 34 ------------------------------ 2 files changed, 41 deletions(-) diff --git a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index df05560c..3cd641da 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -212,13 +212,6 @@ impl PeerNetworkManager { Ok(_) => { log::info!("Successfully connected to {}", addr); - // Record successful connection - reputation_manager - .lock() - .await - .record_successful_connection(addr) - .await; - // Add to pool if let Err(e) = pool.add_peer(addr, peer).await { log::error!("Failed to add peer to pool: {}", e); diff --git a/dash-spv/src/network/reputation.rs b/dash-spv/src/network/reputation.rs index a03d7564..0656135a 100644 --- a/dash-spv/src/network/reputation.rs +++ b/dash-spv/src/network/reputation.rs @@ -79,8 +79,6 @@ const MIN_MISBEHAVIOR_SCORE: i32 = -50; const MAX_BAN_COUNT: u32 = 1000; -const MAX_ACTION_COUNT: u64 = 1_000_000; - fn default_instant() -> Instant { Instant::now() } @@ -116,17 +114,6 @@ where Ok(v) } -fn clamp_peer_connection_attempts<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - let mut v = u64::deserialize(deserializer)?; - - v = v.min(MAX_ACTION_COUNT); - - Ok(v) -} - /// Peer reputation entry #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PeerReputation { @@ -146,13 +133,6 @@ pub struct PeerReputation { #[serde(skip, default = "default_instant")] last_update: Instant, - /// Connection count - #[serde(deserialize_with = "clamp_peer_connection_attempts")] - connection_attempts: u64, - - /// Successful connection count - successful_connections: u64, - /// Last connection time #[serde(skip)] last_connection: Option, @@ -165,8 +145,6 @@ impl Default for PeerReputation { ban_count: 0, banned_until: None, last_update: default_instant(), - connection_attempts: 0, - successful_connections: 0, last_connection: None, } } @@ -294,17 +272,9 @@ impl PeerReputationManager { pub async fn record_connection_attempt(&mut self, peer: SocketAddr) { let reputations = &mut self.reputations; let reputation = reputations.entry(peer).or_default(); - reputation.connection_attempts += 1; reputation.last_connection = Some(Instant::now()); } - /// Record a successful connection - pub async fn record_successful_connection(&mut self, peer: SocketAddr) { - let reputations = &mut self.reputations; - let reputation = reputations.entry(peer).or_default(); - reputation.successful_connections += 1; - } - /// Clear banned status for a peer (admin function) pub async fn unban_peer(&mut self, peer: &SocketAddr) { let reputations = &mut self.reputations; @@ -362,10 +332,6 @@ impl PeerReputationManager { let reputations = &mut self.reputations; for (addr, mut reputation) in data { - // Validate successful connections don't exceed attempts - reputation.successful_connections = - reputation.successful_connections.min(reputation.connection_attempts); - // Apply initial decay based on ban count if reputation.ban_count > 0 { reputation.score = reputation.score.max(50); // Start with higher score for previously banned peers From 78a56000dfdb472be6567bc8f52980aa39a55252 Mon Sep 17 00:00:00 2001 From: Borja Castellano Date: Tue, 30 Dec 2025 22:15:39 +0000 Subject: [PATCH 8/8] PeerReputationManager always tries to load in the constructor --- dash-spv/src/network/manager.rs | 6 +-- dash-spv/src/network/reputation.rs | 59 +++++++++++++++--------------- 2 files changed, 31 insertions(+), 34 deletions(-) diff --git a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index 3cd641da..ce0e2166 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -81,11 +81,7 @@ impl PeerNetworkManager { let peer_store = PersistentPeerStorage::open(data_dir.clone()).await?; - let mut reputation_manager = PeerReputationManager::default(); - - if let Err(e) = reputation_manager.load_from_storage(&peer_store).await { - log::warn!("Failed to load peer reputation data: {}", e); - } + let reputation_manager = PeerReputationManager::load_or_new(&peer_store).await; // Determine exclusive mode: either explicitly requested or peers were provided let exclusive_mode = config.restrict_to_configured_peers || !config.peers.is_empty(); diff --git a/dash-spv/src/network/reputation.rs b/dash-spv/src/network/reputation.rs index 0656135a..4d78b308 100644 --- a/dash-spv/src/network/reputation.rs +++ b/dash-spv/src/network/reputation.rs @@ -189,12 +189,28 @@ impl PeerReputation { } } -#[derive(Default)] pub struct PeerReputationManager { reputations: HashMap, } impl PeerReputationManager { + pub async fn load_or_new(storage: &PersistentPeerStorage) -> Self { + let mut reputations = + storage.load_peers_reputation().await.unwrap_or_else(|_| HashMap::new()); + + log::info!("Loaded reputation data for {} peers", reputations.len()); + + for (_, reputation) in reputations.iter_mut() { + if reputation.ban_count > 0 { + reputation.score = reputation.score.max(50); // Start with higher score for previously banned peers + } + } + + Self { + reputations, + } + } + /// Update peer reputation pub async fn update_reputation( &mut self, @@ -320,28 +336,6 @@ impl PeerReputationManager { ) -> std::io::Result<()> { storage.save_peers_reputation(&self.reputations).await.map_err(std::io::Error::other) } - - /// Load reputation data from persistent storage - pub async fn load_from_storage( - &mut self, - storage: &PersistentPeerStorage, - ) -> std::io::Result<()> { - let data = storage.load_peers_reputation().await.map_err(std::io::Error::other)?; - log::info!("Loaded reputation data for {} peers", data.len()); - - let reputations = &mut self.reputations; - - for (addr, mut reputation) in data { - // Apply initial decay based on ban count - if reputation.ban_count > 0 { - reputation.score = reputation.score.max(50); // Start with higher score for previously banned peers - } - - reputations.insert(addr, reputation); - } - - Ok(()) - } } #[cfg(test)] @@ -351,9 +345,17 @@ mod tests { use super::*; use std::net::SocketAddr; + async fn build_peer_reputation_manager() -> PeerReputationManager { + let temp_dir = tempfile::TempDir::new().unwrap(); + let peer_storage = PersistentPeerStorage::open(temp_dir.path()) + .await + .expect("Failed to open PersistentPeerStorage"); + PeerReputationManager::load_or_new(&peer_storage).await + } + #[tokio::test] async fn test_basic_reputation_operations() { - let mut manager = PeerReputationManager::default(); + let mut manager = build_peer_reputation_manager().await; let peer: SocketAddr = "127.0.0.1:8333".parse().unwrap(); // Initial score should be 0 @@ -367,7 +369,7 @@ mod tests { #[tokio::test] async fn test_banning_mechanism() { - let mut manager = PeerReputationManager::default(); + let mut manager = build_peer_reputation_manager().await; let peer: SocketAddr = "192.168.1.1:8333".parse().unwrap(); // Accumulate misbehavior @@ -388,7 +390,7 @@ mod tests { #[tokio::test] async fn test_reputation_persistence() { - let mut manager = PeerReputationManager::default(); + let mut manager = build_peer_reputation_manager().await; let peer1: SocketAddr = "10.0.0.1:8333".parse().unwrap(); let peer2: SocketAddr = "10.0.0.2:8333".parse().unwrap(); @@ -407,8 +409,7 @@ mod tests { .expect("Failed to open PersistentPeerStorage"); manager.save_to_storage(&peer_storage).await.unwrap(); - let mut new_manager = PeerReputationManager::default(); - new_manager.load_from_storage(&peer_storage).await.unwrap(); + let new_manager = PeerReputationManager::load_or_new(&peer_storage).await; // Verify scores were preserved assert_eq!(new_manager.reputations.get(&peer1).expect("Peer not found").score, -10); @@ -417,7 +418,7 @@ mod tests { #[tokio::test] async fn test_peer_selection() { - let mut manager = PeerReputationManager::default(); + let mut manager = build_peer_reputation_manager().await; let good_peer: SocketAddr = "1.1.1.1:8333".parse().unwrap(); let neutral_peer: SocketAddr = "2.2.2.2:8333".parse().unwrap();