diff --git a/dash-spv/src/client/chainlock.rs b/dash-spv/src/client/chainlock.rs index 553f0b58..59632d8d 100644 --- a/dash-spv/src/client/chainlock.rs +++ b/dash-spv/src/client/chainlock.rs @@ -43,8 +43,7 @@ impl< .await { // Penalize the peer that relayed the invalid ChainLock - let reason = format!("Invalid ChainLock: {}", e); - let _ = self.network.penalize_last_message_peer_invalid_chainlock(&reason).await; + let _ = self.network.penalize_last_message_peer_invalid_chainlock().await; return Err(SpvError::Validation(e)); } } @@ -111,7 +110,7 @@ impl< tracing::warn!("{}", reason); // Ban the peer using the reputation system - let _ = self.network.penalize_last_message_peer_invalid_instantlock(&reason).await; + let _ = self.network.penalize_last_message_peer_invalid_instantlock().await; return Err(SpvError::Validation(e)); } diff --git a/dash-spv/src/client/queries.rs b/dash-spv/src/client/queries.rs index bb0be8c3..6adb2e27 100644 --- a/dash-spv/src/client/queries.rs +++ b/dash-spv/src/client/queries.rs @@ -42,20 +42,6 @@ impl< self.network.peer_count() } - /// Disconnect a specific peer. - pub async fn disconnect_peer(&self, addr: &std::net::SocketAddr, reason: &str) -> Result<()> { - // Cast network manager to PeerNetworkManager to access disconnect_peer - let network = self - .network - .as_any() - .downcast_ref::() - .ok_or_else(|| { - SpvError::Config("Network manager does not support peer disconnection".to_string()) - })?; - - network.disconnect_peer(addr, reason).await - } - // ============ Masternode Queries ============ /// Get a reference to the masternode list engine. diff --git a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index a268f8b7..ce0e2166 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -1,6 +1,6 @@ //! Peer network manager for SPV client -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -23,9 +23,7 @@ use crate::network::addrv2::AddrV2Handler; use crate::network::constants::*; use crate::network::discovery::DnsDiscovery; use crate::network::pool::PeerPool; -use crate::network::reputation::{ - misbehavior_scores, positive_scores, PeerReputationManager, ReputationAware, -}; +use crate::network::reputation::{PeerReputationManager, ReputationChangeReason}; use crate::network::{HandshakeManager, NetworkManager, Peer}; use crate::storage::{PeerStorage, PersistentPeerStorage, PersistentStorage}; use crate::types::PeerInfo; @@ -41,7 +39,7 @@ pub struct PeerNetworkManager { /// Peer persistence peer_store: Arc, /// Peer reputation manager - reputation_manager: Arc, + reputation_manager: Arc>, /// Network type network: Network, /// Shutdown token @@ -83,11 +81,7 @@ impl PeerNetworkManager { let peer_store = PersistentPeerStorage::open(data_dir.clone()).await?; - let reputation_manager = Arc::new(PeerReputationManager::new()); - - if let Err(e) = reputation_manager.load_from_storage(&peer_store).await { - log::warn!("Failed to load peer reputation data: {}", e); - } + let reputation_manager = PeerReputationManager::load_or_new(&peer_store).await; // Determine exclusive mode: either explicitly requested or peers were provided let exclusive_mode = config.restrict_to_configured_peers || !config.peers.is_empty(); @@ -97,7 +91,7 @@ impl PeerNetworkManager { discovery: Arc::new(discovery), addrv2_handler: Arc::new(AddrV2Handler::new()), peer_store: Arc::new(peer_store), - reputation_manager, + reputation_manager: Arc::new(Mutex::new(reputation_manager)), network: config.network, shutdown_token: CancellationToken::new(), message_tx, @@ -172,7 +166,7 @@ impl PeerNetworkManager { /// Connect to a specific peer async fn connect_to_peer(&self, addr: SocketAddr) { // Check reputation first - if !self.reputation_manager.should_connect_to_peer(&addr).await { + if !self.reputation_manager.lock().await.should_connect_to_peer(&addr).await { log::warn!("Not connecting to {} due to bad reputation", addr); return; } @@ -188,7 +182,7 @@ impl PeerNetworkManager { } // Record connection attempt - self.reputation_manager.record_connection_attempt(addr).await; + self.reputation_manager.lock().await.record_connection_attempt(addr).await; let pool = self.pool.clone(); let network = self.network; @@ -214,9 +208,6 @@ impl PeerNetworkManager { Ok(_) => { log::info!("Successfully connected to {}", addr); - // Record successful connection - reputation_manager.record_successful_connection(addr).await; - // Add to pool if let Err(e) = pool.add_peer(addr, peer).await { log::error!("Failed to add peer to pool: {}", e); @@ -245,11 +236,9 @@ impl PeerNetworkManager { log::warn!("Handshake failed with {}: {}", addr, e); // Update reputation for handshake failure reputation_manager - .update_reputation( - addr, - misbehavior_scores::INVALID_MESSAGE, - "Handshake failed", - ) + .lock() + .await + .update_reputation(addr, ReputationChangeReason::InvalidMessage) .await; // For handshake failures, try again later tokio::time::sleep(RECONNECT_DELAY).await; @@ -260,11 +249,9 @@ impl PeerNetworkManager { log::debug!("Failed to connect to {}: {}", addr, e); // Minor reputation penalty for connection failure reputation_manager - .update_reputation( - addr, - misbehavior_scores::TIMEOUT / 2, - "Connection failed", - ) + .lock() + .await + .update_reputation(addr, ReputationChangeReason::Timeout) .await; } } @@ -278,7 +265,7 @@ impl PeerNetworkManager { message_tx: mpsc::Sender<(SocketAddr, NetworkMessage)>, addrv2_handler: Arc, shutdown_token: CancellationToken, - reputation_manager: Arc, + reputation_manager: Arc>, connected_peer_count: Arc, ) { tokio::spawn(async move { @@ -485,11 +472,9 @@ impl PeerNetworkManager { log::debug!("Timeout reading from {}, continuing...", addr); // Minor reputation penalty for timeout reputation_manager - .update_reputation( - addr, - misbehavior_scores::TIMEOUT, - "Read timeout", - ) + .lock() + .await + .update_reputation(addr, ReputationChangeReason::Timeout) .await; continue; } @@ -507,10 +492,11 @@ impl PeerNetworkManager { ); // Reputation penalty for invalid data reputation_manager + .lock() + .await .update_reputation( addr, - misbehavior_scores::INVALID_TRANSACTION, - "Invalid transaction type in block", + ReputationChangeReason::InvalidTransaction, ) .await; } else if error_msg @@ -567,7 +553,9 @@ impl PeerNetworkManager { if conn_duration > Duration::from_secs(3600) { // 1 hour reputation_manager - .update_reputation(addr, positive_scores::LONG_UPTIME, "Long connection uptime") + .lock() + .await + .update_reputation(addr, ReputationChangeReason::LongUptime) .await; } }); @@ -644,7 +632,7 @@ impl PeerNetworkManager { let known = addrv2_handler.get_known_addresses().await; let needed = TARGET_PEERS.saturating_sub(count); // Select best peers based on reputation - let best_peers = reputation_manager.select_best_peers(known, needed * 2).await; + let best_peers = reputation_manager.lock().await.select_best_peers(known, needed * 2).await; let mut attempted = 0; for addr in best_peers { @@ -718,10 +706,9 @@ impl PeerNetworkManager { if let Err(e) = peer_guard.send_ping().await { log::error!("Failed to ping {}: {}", addr, e); // Update reputation for ping failure - reputation_manager.update_reputation( + reputation_manager.lock().await.update_reputation( addr, - misbehavior_scores::TIMEOUT, - "Ping failed", + ReputationChangeReason::Timeout, ).await; } } @@ -738,7 +725,7 @@ impl PeerNetworkManager { } // Save reputation data periodically - if let Err(e) = reputation_manager.save_to_storage(&peer_store).await { + if let Err(e) = reputation_manager.lock().await.save_to_storage(&peer_store).await { log::warn!("Failed to save reputation data: {}", e); } } @@ -933,21 +920,12 @@ impl PeerNetworkManager { } /// Disconnect a specific peer - pub async fn disconnect_peer(&self, addr: &SocketAddr, reason: &str) -> Result<(), Error> { - log::info!("Disconnecting peer {} - reason: {}", addr, reason); - - // Remove the peer + pub async fn disconnect_peer(&self, addr: &SocketAddr) -> Result<(), Error> { self.pool.remove_peer(addr).await; Ok(()) } - /// Get reputation information for all peers - pub async fn get_peer_reputations(&self) -> HashMap { - let reputations = self.reputation_manager.get_all_reputations().await; - reputations.into_iter().map(|(addr, rep)| (addr, (rep.score, rep.is_banned()))).collect() - } - /// Get the last peer that sent us a message pub async fn get_last_message_peer(&self) -> Option { let last_peer = self.last_message_peer.lock().await; @@ -974,28 +952,9 @@ impl PeerNetworkManager { *last_peer } - /// Ban a specific peer manually - pub async fn ban_peer(&self, addr: &SocketAddr, reason: &str) -> Result<(), Error> { - log::info!("Manually banning peer {} - reason: {}", addr, reason); - - // Disconnect the peer first - self.disconnect_peer(addr, reason).await?; - - // Update reputation to trigger ban - self.reputation_manager - .update_reputation( - *addr, - misbehavior_scores::INVALID_HEADER * 2, // Severe penalty - reason, - ) - .await; - - Ok(()) - } - /// Unban a specific peer pub async fn unban_peer(&self, addr: &SocketAddr) { - self.reputation_manager.unban_peer(addr).await; + self.reputation_manager.lock().await.unban_peer(addr).await; } /// Shutdown the network manager @@ -1012,7 +971,8 @@ impl PeerNetworkManager { } // Save reputation data before shutdown - if let Err(e) = self.reputation_manager.save_to_storage(&self.peer_store).await { + if let Err(e) = self.reputation_manager.lock().await.save_to_storage(&self.peer_store).await + { log::warn!("Failed to save reputation data on shutdown: {}", e); } @@ -1107,81 +1067,68 @@ impl NetworkManager for PeerNetworkManager { async fn penalize_last_message_peer( &self, - score_change: i32, - reason: &str, + reason: ReputationChangeReason, ) -> NetworkResult<()> { // Get the last peer that sent us a message if let Some(addr) = self.get_last_message_peer().await { - self.reputation_manager.update_reputation(addr, score_change, reason).await; + self.reputation_manager.lock().await.update_reputation(addr, reason).await; } Ok(()) } - async fn penalize_last_message_peer_invalid_chainlock( - &self, - reason: &str, - ) -> NetworkResult<()> { + async fn penalize_last_message_peer_invalid_chainlock(&self) -> NetworkResult<()> { if let Some(addr) = self.get_last_message_peer().await { - match self.disconnect_peer(&addr, reason).await { + match self.disconnect_peer(&addr).await { Ok(()) => { - log::warn!( - "Peer {} disconnected for invalid ChainLock enforcement: {}", - addr, - reason - ); + log::warn!("Peer {addr} disconnected for invalid ChainLock enforcement",); } Err(err) => { log::error!( - "Failed to disconnect peer {} after invalid ChainLock enforcement ({}): {}", - addr, - reason, - err + "Failed to disconnect peer {addr} after invalid ChainLock enforcement: {err}", ); } } // Apply misbehavior score and a short temporary ban self.reputation_manager - .update_reputation(addr, misbehavior_scores::INVALID_CHAINLOCK, reason) + .lock() + .await + .update_reputation(addr, ReputationChangeReason::InvalidChainLock) .await; // Short ban: 10 minutes for relaying invalid ChainLock self.reputation_manager - .temporary_ban_peer(addr, Duration::from_secs(10 * 60), reason) + .lock() + .await + .temporary_ban_peer(addr, Duration::from_secs(10 * 60)) .await; } Ok(()) } - async fn penalize_last_message_peer_invalid_instantlock( - &self, - reason: &str, - ) -> NetworkResult<()> { + async fn penalize_last_message_peer_invalid_instantlock(&self) -> NetworkResult<()> { if let Some(addr) = self.get_last_message_peer().await { // Apply misbehavior score and a short temporary ban self.reputation_manager - .update_reputation(addr, misbehavior_scores::INVALID_INSTANTLOCK, reason) + .lock() + .await + .update_reputation(addr, ReputationChangeReason::InvalidInstantLock) .await; // Short ban: 10 minutes for relaying invalid InstantLock self.reputation_manager - .temporary_ban_peer(addr, Duration::from_secs(10 * 60), reason) + .lock() + .await + .temporary_ban_peer(addr, Duration::from_secs(10 * 60)) .await; - match self.disconnect_peer(&addr, reason).await { + match self.disconnect_peer(&addr).await { Ok(()) => { - log::warn!( - "Peer {} disconnected for invalid InstantLock enforcement: {}", - addr, - reason - ); + log::warn!("Peer {addr} disconnected for invalid InstantLock enforcement",); } Err(err) => { log::error!( - "Failed to disconnect peer {} after invalid InstantLock enforcement ({}): {}", - addr, - reason, - err + "Failed to disconnect peer {addr} after invalid InstantLock enforcement: {err}" ); } } diff --git a/dash-spv/src/network/mod.rs b/dash-spv/src/network/mod.rs index ff427d57..f2bcaa56 100644 --- a/dash-spv/src/network/mod.rs +++ b/dash-spv/src/network/mod.rs @@ -7,7 +7,7 @@ pub mod handshake; pub mod manager; pub mod peer; pub mod pool; -pub mod reputation; +mod reputation; #[cfg(test)] mod tests; @@ -17,13 +17,14 @@ pub mod mock; use async_trait::async_trait; -use crate::error::NetworkResult; +use crate::{error::NetworkResult, network::reputation::ReputationChangeReason}; use dashcore::network::message::NetworkMessage; use dashcore::BlockHash; pub use handshake::{HandshakeManager, HandshakeState}; pub use manager::PeerNetworkManager; pub use peer::Peer; +pub use reputation::PeerReputation; /// Network manager trait for abstracting network operations. #[async_trait] @@ -129,33 +130,18 @@ pub trait NetworkManager: Send + Sync { /// Default implementation is a no-op for managers without reputation. async fn penalize_last_message_peer( &self, - _score_change: i32, - _reason: &str, + _reason: ReputationChangeReason, ) -> NetworkResult<()> { Ok(()) } /// Convenience: penalize last peer for an invalid ChainLock. - async fn penalize_last_message_peer_invalid_chainlock( - &self, - reason: &str, - ) -> NetworkResult<()> { - self.penalize_last_message_peer( - crate::network::reputation::misbehavior_scores::INVALID_CHAINLOCK, - reason, - ) - .await + async fn penalize_last_message_peer_invalid_chainlock(&self) -> NetworkResult<()> { + self.penalize_last_message_peer(ReputationChangeReason::InvalidChainLock).await } /// Convenience: penalize last peer for an invalid InstantLock. - async fn penalize_last_message_peer_invalid_instantlock( - &self, - reason: &str, - ) -> NetworkResult<()> { - self.penalize_last_message_peer( - crate::network::reputation::misbehavior_scores::INVALID_INSTANTLOCK, - reason, - ) - .await + async fn penalize_last_message_peer_invalid_instantlock(&self) -> NetworkResult<()> { + self.penalize_last_message_peer(ReputationChangeReason::InvalidInstantLock).await } } diff --git a/dash-spv/src/network/reputation.rs b/dash-spv/src/network/reputation.rs index 2c01e45e..4d78b308 100644 --- a/dash-spv/src/network/reputation.rs +++ b/dash-spv/src/network/reputation.rs @@ -8,64 +8,58 @@ use serde::{Deserialize, Deserializer, Serialize}; use std::collections::HashMap; use std::net::SocketAddr; -use std::sync::Arc; use std::time::{Duration, Instant}; -use tokio::sync::RwLock; use crate::storage::{PeerStorage, PersistentPeerStorage}; -/// Misbehavior score thresholds for different violations -pub mod misbehavior_scores { - /// Invalid message format or protocol violation - pub const INVALID_MESSAGE: i32 = 10; +pub enum ReputationChangeReason { + // Negative Changes + InvalidMessage, + InvalidHeader, + Timeout, + InvalidTransaction, + InvalidChainLock, + InvalidInstantLock, - /// Invalid block header - pub const INVALID_HEADER: i32 = 50; + // Positive changes + LongUptime, - /// Invalid compact filter - pub const INVALID_FILTER: i32 = 25; - - /// Timeout or slow response - pub const TIMEOUT: i32 = 5; - - /// Sending unsolicited data - pub const UNSOLICITED_DATA: i32 = 15; - - /// Invalid transaction - pub const INVALID_TRANSACTION: i32 = 20; - - /// Invalid masternode list diff - pub const INVALID_MASTERNODE_DIFF: i32 = 30; - - /// Invalid ChainLock - pub const INVALID_CHAINLOCK: i32 = 40; - - /// Invalid InstantLock - pub const INVALID_INSTANTLOCK: i32 = 35; - - /// Duplicate message - pub const DUPLICATE_MESSAGE: i32 = 5; - - /// Connection flood attempt - pub const CONNECTION_FLOOD: i32 = 20; + // Other + Other(i32, String), } -/// Positive behavior scores -pub mod positive_scores { - /// Successfully provided valid headers - pub const VALID_HEADERS: i32 = -5; - - /// Successfully provided valid filters - pub const VALID_FILTERS: i32 = -3; - - /// Successfully provided valid block - pub const VALID_BLOCK: i32 = -10; - - /// Fast response time - pub const FAST_RESPONSE: i32 = -2; +impl ReputationChangeReason { + pub fn score(&self) -> i32 { + // This score represents the missbehaviour score change, that means + // the higher the score, the more severe the violation. + match self { + ReputationChangeReason::InvalidMessage => 10, + ReputationChangeReason::InvalidHeader => 50, + ReputationChangeReason::Timeout => 5, + ReputationChangeReason::InvalidTransaction => 20, + ReputationChangeReason::InvalidChainLock => 40, + ReputationChangeReason::InvalidInstantLock => 35, + ReputationChangeReason::LongUptime => -5, + ReputationChangeReason::Other(score, _) => *score, + } + } +} - /// Long uptime connection - pub const LONG_UPTIME: i32 = -5; +impl std::fmt::Display for ReputationChangeReason { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + ReputationChangeReason::InvalidMessage => { + write!(f, "Invalid message format or protocol violation") + } + ReputationChangeReason::InvalidHeader => write!(f, "Invalid block header"), + ReputationChangeReason::Timeout => write!(f, "Timeout or slow response"), + ReputationChangeReason::InvalidTransaction => write!(f, "Invalid transaction"), + ReputationChangeReason::InvalidChainLock => write!(f, "Invalid ChainLock"), + ReputationChangeReason::InvalidInstantLock => write!(f, "Invalid InstantLock"), + ReputationChangeReason::LongUptime => write!(f, "Long uptime"), + ReputationChangeReason::Other(_, reason) => write!(f, "{}", reason), + } + } } /// Ban duration for misbehaving peers @@ -85,8 +79,6 @@ const MIN_MISBEHAVIOR_SCORE: i32 = -50; const MAX_BAN_COUNT: u32 = 1000; -const MAX_ACTION_COUNT: u64 = 1_000_000; - fn default_instant() -> Instant { Instant::now() } @@ -122,52 +114,28 @@ where Ok(v) } -fn clamp_peer_connection_attempts<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - let mut v = u64::deserialize(deserializer)?; - - v = v.min(MAX_ACTION_COUNT); - - Ok(v) -} - /// Peer reputation entry #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PeerReputation { /// Current misbehavior score #[serde(deserialize_with = "clamp_peer_score")] - pub score: i32, + score: i32, /// Number of times this peer has been banned #[serde(deserialize_with = "clamp_peer_ban_count")] - pub ban_count: u32, + ban_count: u32, /// Time when the peer was banned (if currently banned) #[serde(skip)] - pub banned_until: Option, + banned_until: Option, /// Last time the reputation was updated #[serde(skip, default = "default_instant")] - pub last_update: Instant, - - /// Total number of positive actions - pub positive_actions: u64, - - /// Total number of negative actions - pub negative_actions: u64, - - /// Connection count - #[serde(deserialize_with = "clamp_peer_connection_attempts")] - pub connection_attempts: u64, - - /// Successful connection count - pub successful_connections: u64, + last_update: Instant, /// Last connection time #[serde(skip)] - pub last_connection: Option, + last_connection: Option, } impl Default for PeerReputation { @@ -177,23 +145,17 @@ impl Default for PeerReputation { ban_count: 0, banned_until: None, last_update: default_instant(), - positive_actions: 0, - negative_actions: 0, - connection_attempts: 0, - successful_connections: 0, last_connection: None, } } } impl PeerReputation { - /// Check if the peer is currently banned - pub fn is_banned(&self) -> bool { + fn is_banned(&self) -> bool { self.banned_until.is_some_and(|until| Instant::now() < until) } - /// Get remaining ban time - pub fn ban_time_remaining(&self) -> Option { + fn ban_time_remaining(&self) -> Option { self.banned_until.and_then(|until| { let now = Instant::now(); if now < until { @@ -205,7 +167,7 @@ impl PeerReputation { } /// Apply reputation decay - pub fn apply_decay(&mut self) { + fn apply_decay(&mut self) { let now = Instant::now(); let elapsed = now - self.last_update; @@ -227,52 +189,35 @@ impl PeerReputation { } } -/// Reputation change event -#[derive(Debug, Clone)] -pub struct ReputationEvent { - pub peer: SocketAddr, - pub change: i32, - pub reason: String, - pub timestamp: Instant, -} - -/// Peer reputation manager pub struct PeerReputationManager { - /// Reputation data for each peer - reputations: Arc>>, + reputations: HashMap, +} - /// Recent reputation events for monitoring - recent_events: Arc>>, +impl PeerReputationManager { + pub async fn load_or_new(storage: &PersistentPeerStorage) -> Self { + let mut reputations = + storage.load_peers_reputation().await.unwrap_or_else(|_| HashMap::new()); - /// Maximum number of events to keep - max_events: usize, -} + log::info!("Loaded reputation data for {} peers", reputations.len()); -impl Default for PeerReputationManager { - fn default() -> Self { - Self::new() - } -} + for (_, reputation) in reputations.iter_mut() { + if reputation.ban_count > 0 { + reputation.score = reputation.score.max(50); // Start with higher score for previously banned peers + } + } -impl PeerReputationManager { - /// Create a new reputation manager - pub fn new() -> Self { Self { - reputations: Arc::new(RwLock::new(HashMap::new())), - recent_events: Arc::new(RwLock::new(Vec::new())), - max_events: 1000, + reputations, } } /// Update peer reputation pub async fn update_reputation( - &self, + &mut self, peer: SocketAddr, - score_change: i32, - reason: &str, + reason: ReputationChangeReason, ) -> bool { - let mut reputations = self.reputations.write().await; - let reputation = reputations.entry(peer).or_default(); + let reputation = self.reputations.entry(peer).or_default(); // Apply decay first reputation.apply_decay(); @@ -280,14 +225,7 @@ impl PeerReputationManager { // Update score let old_score = reputation.score; reputation.score = - (reputation.score + score_change).clamp(MIN_MISBEHAVIOR_SCORE, MAX_MISBEHAVIOR_SCORE); - - // Track positive/negative actions - if score_change > 0 { - reputation.negative_actions += 1; - } else if score_change < 0 { - reputation.positive_actions += 1; - } + (reputation.score + reason.score()).clamp(MIN_MISBEHAVIOR_SCORE, MAX_MISBEHAVIOR_SCORE); // Check if peer should be banned let should_ban = reputation.score >= MAX_MISBEHAVIOR_SCORE && !reputation.is_banned(); @@ -304,46 +242,23 @@ impl PeerReputationManager { } // Log significant changes - if score_change.abs() >= 10 || should_ban { + if reason.score().abs() >= 10 || should_ban { log::info!( "Peer {} reputation changed: {} -> {} (change: {}, reason: {})", peer, old_score, reputation.score, - score_change, + reason.score(), reason ); } - // Record event - let event = ReputationEvent { - peer, - change: score_change, - reason: reason.to_string(), - timestamp: Instant::now(), - }; - - drop(reputations); // Release lock before recording event - self.record_event(event).await; - should_ban } - /// Record a reputation event - async fn record_event(&self, event: ReputationEvent) { - let mut events = self.recent_events.write().await; - events.push(event); - - // Keep only recent events - if events.len() > self.max_events { - let drain_count = events.len() - self.max_events; - events.drain(0..drain_count); - } - } - /// Check if a peer is banned - pub async fn is_banned(&self, peer: &SocketAddr) -> bool { - let mut reputations = self.reputations.write().await; + pub async fn is_banned(&mut self, peer: &SocketAddr) -> bool { + let reputations = &mut self.reputations; if let Some(reputation) = reputations.get_mut(peer) { reputation.apply_decay(); reputation.is_banned() @@ -352,70 +267,33 @@ impl PeerReputationManager { } } - /// Get peer reputation score - pub async fn get_score(&self, peer: &SocketAddr) -> i32 { - let mut reputations = self.reputations.write().await; - if let Some(reputation) = reputations.get_mut(peer) { - reputation.apply_decay(); - reputation.score - } else { - 0 - } - } - /// Temporarily ban a peer for a specified duration, regardless of score. /// This can be used for critical protocol violations (e.g., invalid ChainLocks). - pub async fn temporary_ban_peer(&self, peer: SocketAddr, duration: Duration, reason: &str) { - let mut reputations = self.reputations.write().await; + pub async fn temporary_ban_peer(&mut self, peer: SocketAddr, duration: Duration) { + let reputations = &mut self.reputations; let reputation = reputations.entry(peer).or_default(); reputation.banned_until = Some(Instant::now() + duration); reputation.ban_count += 1; log::warn!( - "Peer {} temporarily banned for {:?} (ban #{}, reason: {})", + "Peer {} temporarily banned for {:?} (ban #{})", peer, duration, reputation.ban_count, - reason ); } /// Record a connection attempt - pub async fn record_connection_attempt(&self, peer: SocketAddr) { - let mut reputations = self.reputations.write().await; + pub async fn record_connection_attempt(&mut self, peer: SocketAddr) { + let reputations = &mut self.reputations; let reputation = reputations.entry(peer).or_default(); - reputation.connection_attempts += 1; reputation.last_connection = Some(Instant::now()); } - /// Record a successful connection - pub async fn record_successful_connection(&self, peer: SocketAddr) { - let mut reputations = self.reputations.write().await; - let reputation = reputations.entry(peer).or_default(); - reputation.successful_connections += 1; - } - - /// Get all peer reputations - pub async fn get_all_reputations(&self) -> HashMap { - let mut reputations = self.reputations.write().await; - - // Apply decay to all peers - for reputation in reputations.values_mut() { - reputation.apply_decay(); - } - - reputations.clone() - } - - /// Get recent reputation events - pub async fn get_recent_events(&self) -> Vec { - self.recent_events.read().await.clone() - } - /// Clear banned status for a peer (admin function) - pub async fn unban_peer(&self, peer: &SocketAddr) { - let mut reputations = self.reputations.write().await; + pub async fn unban_peer(&mut self, peer: &SocketAddr) { + let reputations = &mut self.reputations; if let Some(reputation) = reputations.get_mut(peer) { reputation.banned_until = None; reputation.score = reputation.score.min(MAX_MISBEHAVIOR_SCORE - 10); @@ -423,104 +301,13 @@ impl PeerReputationManager { } } - /// Reset reputation for a peer - pub async fn reset_reputation(&self, peer: &SocketAddr) { - let mut reputations = self.reputations.write().await; - reputations.remove(peer); - log::info!("Reset reputation for peer {}", peer); - } - - /// Get peers sorted by reputation (best first) - pub async fn get_peers_by_reputation(&self) -> Vec<(SocketAddr, i32)> { - let mut reputations = self.reputations.write().await; - - // Apply decay and collect scores - let mut peer_scores: Vec<(SocketAddr, i32)> = reputations - .iter_mut() - .map(|(addr, rep)| { - rep.apply_decay(); - (*addr, rep.score) - }) - .filter(|(_, score)| *score < MAX_MISBEHAVIOR_SCORE) // Exclude banned peers - .collect(); - - // Sort by score (lower is better) - peer_scores.sort_by_key(|(_, score)| *score); - - peer_scores - } - - /// Save reputation data to persistent storage - pub async fn save_to_storage(&self, storage: &PersistentPeerStorage) -> std::io::Result<()> { - let reputations = self.reputations.read().await; - - storage.save_peers_reputation(&reputations).await.map_err(std::io::Error::other) - } - - /// Load reputation data from persistent storage - pub async fn load_from_storage(&self, storage: &PersistentPeerStorage) -> std::io::Result<()> { - let data = storage.load_peers_reputation().await.map_err(std::io::Error::other)?; - - let mut reputations = self.reputations.write().await; - let mut loaded_count = 0; - let mut skipped_count = 0; - - for (addr, mut reputation) in data { - // Validate successful connections don't exceed attempts - reputation.successful_connections = - reputation.successful_connections.min(reputation.connection_attempts); - - // Skip entry if data appears corrupted - if reputation.positive_actions > MAX_ACTION_COUNT - || reputation.negative_actions > MAX_ACTION_COUNT - { - log::warn!("Skipping peer {} with potentially corrupted action counts", addr); - skipped_count += 1; - continue; - } - - // Apply initial decay based on ban count - if reputation.ban_count > 0 { - reputation.score = reputation.score.max(50); // Start with higher score for previously banned peers - } - - reputations.insert(addr, reputation); - loaded_count += 1; - } - - log::info!( - "Loaded reputation data for {} peers (skipped {} corrupted entries)", - loaded_count, - skipped_count - ); - Ok(()) - } -} - -/// Helper trait for reputation-aware peer selection -pub trait ReputationAware { - /// Select best peers based on reputation - fn select_best_peers( - &self, - available_peers: Vec, - count: usize, - ) -> impl std::future::Future> + Send; - - /// Check if we should connect to a peer based on reputation - fn should_connect_to_peer( - &self, - peer: &SocketAddr, - ) -> impl std::future::Future + Send; -} - -impl ReputationAware for PeerReputationManager { - async fn select_best_peers( - &self, + pub async fn select_best_peers( + &mut self, available_peers: Vec, count: usize, ) -> Vec { let mut peer_scores = Vec::new(); - let mut reputations = self.reputations.write().await; + let reputations = &mut self.reputations; for peer in available_peers { let reputation = reputations.entry(peer).or_default(); @@ -538,12 +325,123 @@ impl ReputationAware for PeerReputationManager { peer_scores.into_iter().take(count).map(|(peer, _)| peer).collect() } - async fn should_connect_to_peer(&self, peer: &SocketAddr) -> bool { + pub async fn should_connect_to_peer(&mut self, peer: &SocketAddr) -> bool { !self.is_banned(peer).await } + + /// Save reputation data to persistent storage + pub async fn save_to_storage( + &mut self, + storage: &PersistentPeerStorage, + ) -> std::io::Result<()> { + storage.save_peers_reputation(&self.reputations).await.map_err(std::io::Error::other) + } } -// Include tests module #[cfg(test)] -#[path = "reputation_tests.rs"] -mod reputation_tests; +mod tests { + use crate::storage::PersistentStorage; + + use super::*; + use std::net::SocketAddr; + + async fn build_peer_reputation_manager() -> PeerReputationManager { + let temp_dir = tempfile::TempDir::new().unwrap(); + let peer_storage = PersistentPeerStorage::open(temp_dir.path()) + .await + .expect("Failed to open PersistentPeerStorage"); + PeerReputationManager::load_or_new(&peer_storage).await + } + + #[tokio::test] + async fn test_basic_reputation_operations() { + let mut manager = build_peer_reputation_manager().await; + let peer: SocketAddr = "127.0.0.1:8333".parse().unwrap(); + + // Initial score should be 0 + assert_eq!(manager.select_best_peers(vec![peer], 1).await[0], peer); + assert_eq!(manager.reputations.get(&peer).expect("Peer not found").score, 0); + + // Test misbehavior + manager.update_reputation(peer, ReputationChangeReason::InvalidMessage).await; + assert_eq!(manager.reputations.get(&peer).expect("Peer not found").score, 10); + } + + #[tokio::test] + async fn test_banning_mechanism() { + let mut manager = build_peer_reputation_manager().await; + let peer: SocketAddr = "192.168.1.1:8333".parse().unwrap(); + + // Accumulate misbehavior + for i in 0..10 { + let banned = + manager.update_reputation(peer, ReputationChangeReason::InvalidMessage).await; + + // Should be banned on the 10th violation (total score = 100) + if i == 9 { + assert!(banned); + } else { + assert!(!banned); + } + } + + assert!(manager.is_banned(&peer).await); + } + + #[tokio::test] + async fn test_reputation_persistence() { + let mut manager = build_peer_reputation_manager().await; + let peer1: SocketAddr = "10.0.0.1:8333".parse().unwrap(); + let peer2: SocketAddr = "10.0.0.2:8333".parse().unwrap(); + + // Set reputations + manager + .update_reputation(peer1, ReputationChangeReason::Other(-10, "Good peer".to_string())) + .await; + manager + .update_reputation(peer2, ReputationChangeReason::Other(50, "Bad peer".to_string())) + .await; + + // Save and load + let temp_dir = tempfile::TempDir::new().unwrap(); + let peer_storage = PersistentPeerStorage::open(temp_dir.path()) + .await + .expect("Failed to open PersistentPeerStorage"); + manager.save_to_storage(&peer_storage).await.unwrap(); + + let new_manager = PeerReputationManager::load_or_new(&peer_storage).await; + + // Verify scores were preserved + assert_eq!(new_manager.reputations.get(&peer1).expect("Peer not found").score, -10); + assert_eq!(new_manager.reputations.get(&peer2).expect("Peer not found").score, 50); + } + + #[tokio::test] + async fn test_peer_selection() { + let mut manager = build_peer_reputation_manager().await; + + let good_peer: SocketAddr = "1.1.1.1:8333".parse().unwrap(); + let neutral_peer: SocketAddr = "2.2.2.2:8333".parse().unwrap(); + let bad_peer: SocketAddr = "3.3.3.3:8333".parse().unwrap(); + + // Set different reputations + manager + .update_reputation( + good_peer, + ReputationChangeReason::Other(-20, "Very good".to_string()), + ) + .await; + manager + .update_reputation(bad_peer, ReputationChangeReason::Other(80, "Very bad".to_string())) + .await; + // neutral_peer has default score of 0 + + let all_peers = vec![good_peer, neutral_peer, bad_peer]; + let selected = manager.select_best_peers(all_peers, 2).await; + + // Should select good_peer first, then neutral_peer + assert_eq!(selected.len(), 2); + assert_eq!(selected[0], good_peer); + assert_eq!(selected[1], neutral_peer); + } +} diff --git a/dash-spv/src/network/reputation_tests.rs b/dash-spv/src/network/reputation_tests.rs deleted file mode 100644 index 8ab6dffc..00000000 --- a/dash-spv/src/network/reputation_tests.rs +++ /dev/null @@ -1,118 +0,0 @@ -//! Unit tests for reputation system (in-module tests) - -#[cfg(test)] -mod tests { - use crate::storage::PersistentStorage; - - use super::super::*; - use std::net::SocketAddr; - - #[tokio::test] - async fn test_basic_reputation_operations() { - let manager = PeerReputationManager::new(); - let peer: SocketAddr = "127.0.0.1:8333".parse().unwrap(); - - // Initial score should be 0 - assert_eq!(manager.get_score(&peer).await, 0); - - // Test misbehavior - manager - .update_reputation(peer, misbehavior_scores::INVALID_MESSAGE, "Test invalid message") - .await; - assert_eq!(manager.get_score(&peer).await, 10); - - // Test positive behavior - manager.update_reputation(peer, positive_scores::VALID_HEADERS, "Test valid headers").await; - assert_eq!(manager.get_score(&peer).await, 5); - } - - #[tokio::test] - async fn test_banning_mechanism() { - let manager = PeerReputationManager::new(); - let peer: SocketAddr = "192.168.1.1:8333".parse().unwrap(); - - // Accumulate misbehavior - for i in 0..10 { - let banned = manager - .update_reputation( - peer, - misbehavior_scores::INVALID_MESSAGE, - &format!("Violation {}", i), - ) - .await; - - // Should be banned on the 10th violation (total score = 100) - if i == 9 { - assert!(banned); - } else { - assert!(!banned); - } - } - - assert!(manager.is_banned(&peer).await); - } - - #[tokio::test] - async fn test_reputation_persistence() { - let manager = PeerReputationManager::new(); - let peer1: SocketAddr = "10.0.0.1:8333".parse().unwrap(); - let peer2: SocketAddr = "10.0.0.2:8333".parse().unwrap(); - - // Set reputations - manager.update_reputation(peer1, -10, "Good peer").await; - manager.update_reputation(peer2, 50, "Bad peer").await; - - // Save and load - let temp_dir = tempfile::TempDir::new().unwrap(); - let peer_storage = PersistentPeerStorage::open(temp_dir.path()) - .await - .expect("Failed to open PersistentPeerStorage"); - manager.save_to_storage(&peer_storage).await.unwrap(); - - let new_manager = PeerReputationManager::new(); - new_manager.load_from_storage(&peer_storage).await.unwrap(); - - // Verify scores were preserved - assert_eq!(new_manager.get_score(&peer1).await, -10); - assert_eq!(new_manager.get_score(&peer2).await, 50); - } - - #[tokio::test] - async fn test_peer_selection() { - let manager = PeerReputationManager::new(); - - let good_peer: SocketAddr = "1.1.1.1:8333".parse().unwrap(); - let neutral_peer: SocketAddr = "2.2.2.2:8333".parse().unwrap(); - let bad_peer: SocketAddr = "3.3.3.3:8333".parse().unwrap(); - - // Set different reputations - manager.update_reputation(good_peer, -20, "Very good").await; - manager.update_reputation(bad_peer, 80, "Very bad").await; - // neutral_peer has default score of 0 - - let all_peers = vec![good_peer, neutral_peer, bad_peer]; - let selected = manager.select_best_peers(all_peers, 2).await; - - // Should select good_peer first, then neutral_peer - assert_eq!(selected.len(), 2); - assert_eq!(selected[0], good_peer); - assert_eq!(selected[1], neutral_peer); - } - - #[tokio::test] - async fn test_connection_tracking() { - let manager = PeerReputationManager::new(); - let peer: SocketAddr = "127.0.0.1:9999".parse().unwrap(); - - // Track connection attempts - manager.record_connection_attempt(peer).await; - manager.record_connection_attempt(peer).await; - manager.record_successful_connection(peer).await; - - let reputations = manager.get_all_reputations().await; - let rep = &reputations[&peer]; - - assert_eq!(rep.connection_attempts, 2); - assert_eq!(rep.successful_connections, 1); - } -} diff --git a/dash-spv/src/storage/peers.rs b/dash-spv/src/storage/peers.rs index 63e2a3dc..9d39baff 100644 --- a/dash-spv/src/storage/peers.rs +++ b/dash-spv/src/storage/peers.rs @@ -14,7 +14,7 @@ use dashcore::{ use crate::{ error::StorageResult, - network::reputation::PeerReputation, + network::PeerReputation, storage::{io::atomic_write, PersistentStorage}, StorageError, }; diff --git a/dash-spv/tests/peer_test.rs b/dash-spv/tests/peer_test.rs index f15adada..54da1d55 100644 --- a/dash-spv/tests/peer_test.rs +++ b/dash-spv/tests/peer_test.rs @@ -139,40 +139,6 @@ async fn test_peer_persistence() { } } -#[tokio::test] -async fn test_peer_disconnection() { - let _ = env_logger::builder().is_test(true).try_init(); - - let temp_dir = TempDir::new().unwrap(); - let temp_path = temp_dir.path().to_path_buf(); - let mut config = create_test_config(Network::Regtest, Some(temp_dir)); - - // Add manual test peers (would need actual regtest nodes running) - config.peers = vec!["127.0.0.1:19899".parse().unwrap(), "127.0.0.1:19898".parse().unwrap()]; - - // Create network manager - let network_manager = PeerNetworkManager::new(&config).await.unwrap(); - - // Create storage manager - let storage_manager = DiskStorageManager::new(temp_path).await.unwrap(); - - // Create wallet manager - let wallet = Arc::new(RwLock::new(WalletManager::::new())); - - let client = - DashSpvClient::new(config, network_manager, storage_manager, wallet).await.unwrap(); - - // Note: This test would require actual regtest nodes running - // For now, we just test that the API works - let test_addr: SocketAddr = "127.0.0.1:19899".parse().unwrap(); - - // Try to disconnect (will fail if not connected, but tests the API) - match client.disconnect_peer(&test_addr, "Test disconnection").await { - Ok(_) => println!("Disconnected peer {}", test_addr), - Err(e) => println!("Expected error disconnecting non-existent peer: {}", e), - } -} - #[tokio::test] async fn test_max_peer_limit() { use dash_spv::network::constants::MAX_PEERS;