From b00b66c553fb7505ce8004374c266b40f053f0df Mon Sep 17 00:00:00 2001 From: xdustinface Date: Sun, 14 Dec 2025 10:39:02 +1100 Subject: [PATCH] feat: Implement parallel filter matching --- dash-spv/src/client/block_processor_test.rs | 10 +- dash-spv/src/sync/filters/matching.rs | 18 ++- key-wallet-manager/Cargo.toml | 2 + key-wallet-manager/src/wallet_interface.rs | 23 ++- .../src/wallet_manager/matching.rs | 123 +++++++++++++++ key-wallet-manager/src/wallet_manager/mod.rs | 1 + .../src/wallet_manager/process_block.rs | 7 + .../tests/check_compact_filters_tests.rs | 145 ++++++++++++++++++ test-utils/src/builders.rs | 58 +++++++ 9 files changed, 381 insertions(+), 6 deletions(-) create mode 100644 key-wallet-manager/src/wallet_manager/matching.rs create mode 100644 key-wallet-manager/tests/check_compact_filters_tests.rs diff --git a/dash-spv/src/client/block_processor_test.rs b/dash-spv/src/client/block_processor_test.rs index a8330a2d2..4f6686419 100644 --- a/dash-spv/src/client/block_processor_test.rs +++ b/dash-spv/src/client/block_processor_test.rs @@ -7,7 +7,7 @@ mod tests { use crate::storage::DiskStorageManager; use crate::types::{SpvEvent, SpvStats}; use dashcore::{blockdata::constants::genesis_block, Block, Network, Transaction}; - + use key_wallet_manager::wallet_manager::matching::{FilterMatchInput, FilterMatchOutput}; use std::sync::Arc; use tokio::sync::{mpsc, oneshot, Mutex, RwLock}; @@ -62,6 +62,10 @@ mod tests { true } + async fn check_compact_filters(&self, input: FilterMatchInput) -> FilterMatchOutput { + input.keys().cloned().collect() + } + async fn describe(&self) -> String { "MockWallet (test implementation)".to_string() } @@ -263,6 +267,10 @@ mod tests { false } + async fn check_compact_filters(&self, _input: FilterMatchInput) -> FilterMatchOutput { + FilterMatchOutput::new() + } + async fn describe(&self) -> String { "NonMatchingWallet (test implementation)".to_string() } diff --git a/dash-spv/src/sync/filters/matching.rs b/dash-spv/src/sync/filters/matching.rs index ed119a9d3..84743c53e 100644 --- a/dash-spv/src/sync/filters/matching.rs +++ b/dash-spv/src/sync/filters/matching.rs @@ -8,16 +8,16 @@ //! - Efficient filter matching using BIP158 algorithms //! - Block download coordination for matches +use crate::error::{SyncError, SyncResult}; +use crate::network::NetworkManager; +use crate::storage::StorageManager; use dashcore::{ bip158::{BlockFilterReader, Error as Bip158Error}, network::message::NetworkMessage, network::message_blockdata::Inventory, BlockHash, ScriptBuf, }; - -use crate::error::{SyncError, SyncResult}; -use crate::network::NetworkManager; -use crate::storage::StorageManager; +use key_wallet_manager::wallet_manager::matching::{FilterMatchInput, FilterMatchOutput}; impl super::manager::FilterSyncManager @@ -43,6 +43,16 @@ impl( + &self, + input_map: FilterMatchInput, + wallet: &W, + ) -> FilterMatchOutput { + wallet.check_compact_filters(input_map).await + } + /// Check if filter matches any of the provided scripts using BIP158 GCS filter. #[allow(dead_code)] fn filter_matches_scripts( diff --git a/key-wallet-manager/Cargo.toml b/key-wallet-manager/Cargo.toml index 6c47aacb2..620734446 100644 --- a/key-wallet-manager/Cargo.toml +++ b/key-wallet-manager/Cargo.toml @@ -24,10 +24,12 @@ serde = { version = "1.0", default-features = false, features = ["derive"], opti async-trait = "0.1" bincode = { version = "=2.0.0-rc.3", optional = true } zeroize = { version = "1.8", features = ["derive"] } +rayon = "1.11" [dev-dependencies] hex = "0.4" serde_json = "1.0" +dashcore-test-utils = { path = "../test-utils" } tokio = { version = "1.32", features = ["full"] } [lints.rust] diff --git a/key-wallet-manager/src/wallet_interface.rs b/key-wallet-manager/src/wallet_interface.rs index 62af33871..cfd1142b7 100644 --- a/key-wallet-manager/src/wallet_interface.rs +++ b/key-wallet-manager/src/wallet_interface.rs @@ -2,11 +2,28 @@ //! //! This module defines the trait that SPV clients use to interact with wallets. +use crate::wallet_manager::matching::{FilterMatchInput, FilterMatchOutput}; use alloc::string::String; +use alloc::vec::Vec; use async_trait::async_trait; use dashcore::bip158::BlockFilter; use dashcore::prelude::CoreBlockHeight; -use dashcore::{Block, Transaction, Txid}; +use dashcore::{Block, BlockHash, Transaction, Txid}; + +#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub struct FilterMatchKey { + pub block_height: CoreBlockHeight, + pub block_hash: BlockHash, +} + +impl FilterMatchKey { + pub fn new(height: CoreBlockHeight, hash: BlockHash) -> Self { + Self { + block_height: height, + block_hash: hash, + } + } +} /// Trait for wallet implementations to receive SPV events #[async_trait] @@ -26,6 +43,10 @@ pub trait WalletInterface: Send + Sync { block_hash: &dashcore::BlockHash, ) -> bool; + /// Check compact filters against watched addresses in batch + /// Returns map of filter keys to match results + async fn check_compact_filters(&self, input: FilterMatchInput) -> FilterMatchOutput; + /// Return the wallet's per-transaction net change and involved addresses if known. /// Returns (net_amount, addresses) where net_amount is received - sent in satoshis. /// If the wallet has no record for the transaction, returns None. diff --git a/key-wallet-manager/src/wallet_manager/matching.rs b/key-wallet-manager/src/wallet_manager/matching.rs new file mode 100644 index 000000000..20c58f137 --- /dev/null +++ b/key-wallet-manager/src/wallet_manager/matching.rs @@ -0,0 +1,123 @@ +use crate::wallet_interface::FilterMatchKey; +use alloc::vec::Vec; +use dashcore::bip158::BlockFilter; +use dashcore::Address; +use rayon::prelude::{IntoParallelIterator, ParallelIterator}; +use std::collections::{BTreeSet, HashMap}; + +pub type FilterMatchInput = HashMap; +pub type FilterMatchOutput = BTreeSet; + +/// Check compact filters for addresses and return the keys that matched. +pub fn check_compact_filters_for_addresses( + input: FilterMatchInput, + addresses: Vec
, +) -> FilterMatchOutput { + let script_pubkey_bytes: Vec> = + addresses.iter().map(|address| address.script_pubkey().to_bytes()).collect(); + + input + .into_par_iter() + .filter_map(|(key, filter)| { + filter + .match_any(&key.block_hash, script_pubkey_bytes.iter().map(|v| v.as_slice())) + .unwrap_or(false) + .then_some(key) + }) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + use dashcore::blockdata::script::ScriptBuf; + use dashcore_test_utils::{ + create_filter_for_block, create_test_block, create_test_transaction_to_script, test_address, + }; + + #[test] + fn test_empty_input_returns_empty() { + let result = check_compact_filters_for_addresses(FilterMatchInput::new(), vec![]); + assert!(result.is_empty()); + } + + #[test] + fn test_empty_addresses_returns_empty() { + let tx = create_test_transaction_to_script(ScriptBuf::new()); + let block = create_test_block(100, vec![tx]); + let filter = create_filter_for_block(&block); + let key = FilterMatchKey::new(100, block.block_hash()); + + let mut input = FilterMatchInput::new(); + input.insert(key.clone(), filter); + + let output = check_compact_filters_for_addresses(input, vec![]); + assert!(!output.contains(&key)); + } + + #[test] + fn test_matching_filter() { + let address = test_address(0); + + let tx = create_test_transaction_to_script(address.script_pubkey()); + let block = create_test_block(100, vec![tx]); + let filter = create_filter_for_block(&block); + let key = FilterMatchKey::new(100, block.block_hash()); + + let mut input = FilterMatchInput::new(); + input.insert(key.clone(), filter); + + let output = check_compact_filters_for_addresses(input, vec![address]); + assert!(output.contains(&key)); + } + + #[test] + fn test_non_matching_filter() { + let address = test_address(0); + let other_address = test_address(1); + + let tx = create_test_transaction_to_script(other_address.script_pubkey()); + let block = create_test_block(100, vec![tx]); + let filter = create_filter_for_block(&block); + let key = FilterMatchKey::new(100, block.block_hash()); + + let mut input = FilterMatchInput::new(); + input.insert(key.clone(), filter); + + let output = check_compact_filters_for_addresses(input, vec![address]); + assert!(!output.contains(&key)); + } + + #[test] + fn test_batch_mixed_results() { + let address1 = test_address(0); + let address2 = test_address(1); + let unrelated_address = test_address(2); + + let tx1 = create_test_transaction_to_script(address1.script_pubkey()); + let block1 = create_test_block(100, vec![tx1]); + let filter1 = create_filter_for_block(&block1); + let key1 = FilterMatchKey::new(100, block1.block_hash()); + + let tx2 = create_test_transaction_to_script(address2.script_pubkey()); + let block2 = create_test_block(200, vec![tx2]); + let filter2 = create_filter_for_block(&block2); + let key2 = FilterMatchKey::new(200, block2.block_hash()); + + let tx3 = create_test_transaction_to_script(unrelated_address.script_pubkey()); + let block3 = create_test_block(300, vec![tx3]); + let filter3 = create_filter_for_block(&block3); + let key3 = FilterMatchKey::new(300, block3.block_hash()); + + let mut input = FilterMatchInput::new(); + input.insert(key1.clone(), filter1); + input.insert(key2.clone(), filter2); + input.insert(key3.clone(), filter3); + + let output = check_compact_filters_for_addresses(input, vec![address1, address2]); + assert_eq!(output.len(), 2); + assert!(output.contains(&key1)); + assert!(output.contains(&key2)); + assert!(!output.contains(&key3)); + } +} diff --git a/key-wallet-manager/src/wallet_manager/mod.rs b/key-wallet-manager/src/wallet_manager/mod.rs index 5ad673754..da357aff7 100644 --- a/key-wallet-manager/src/wallet_manager/mod.rs +++ b/key-wallet-manager/src/wallet_manager/mod.rs @@ -4,6 +4,7 @@ //! each of which can have multiple accounts. This follows the architecture //! pattern where a manager oversees multiple distinct wallets. +pub mod matching; mod process_block; mod transaction_building; diff --git a/key-wallet-manager/src/wallet_manager/process_block.rs b/key-wallet-manager/src/wallet_manager/process_block.rs index fae442d58..e3ee361eb 100644 --- a/key-wallet-manager/src/wallet_manager/process_block.rs +++ b/key-wallet-manager/src/wallet_manager/process_block.rs @@ -1,4 +1,7 @@ use crate::wallet_interface::WalletInterface; +use crate::wallet_manager::matching::{ + check_compact_filters_for_addresses, FilterMatchInput, FilterMatchOutput, +}; use crate::WalletManager; use alloc::string::String; use alloc::vec::Vec; @@ -78,6 +81,10 @@ impl WalletInterface for WalletM hit } + async fn check_compact_filters(&self, input: FilterMatchInput) -> FilterMatchOutput { + check_compact_filters_for_addresses(input, self.monitored_addresses()) + } + async fn transaction_effect(&self, tx: &Transaction) -> Option<(i64, Vec)> { // Aggregate across all managed wallets. If any wallet considers it relevant, // compute net = total_received - total_sent and collect involved addresses. diff --git a/key-wallet-manager/tests/check_compact_filters_tests.rs b/key-wallet-manager/tests/check_compact_filters_tests.rs new file mode 100644 index 000000000..819f9ae45 --- /dev/null +++ b/key-wallet-manager/tests/check_compact_filters_tests.rs @@ -0,0 +1,145 @@ +//! Integration tests for WalletInterface::check_compact_filters + +use dashcore::blockdata::script::ScriptBuf; +use dashcore_test_utils::{ + create_filter_for_block, create_test_block, create_test_transaction_to_script, +}; +use key_wallet::wallet::initialization::WalletAccountCreationOptions; +use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo; +use key_wallet::Network; +use key_wallet_manager::wallet_interface::{FilterMatchKey, WalletInterface}; +use key_wallet_manager::wallet_manager::matching::FilterMatchInput; +use key_wallet_manager::wallet_manager::WalletManager; + +#[tokio::test] +async fn test_check_compact_filters_empty_input() { + let mut manager = WalletManager::::new(Network::Testnet); + + let _wallet_id = manager + .create_wallet_with_random_mnemonic(WalletAccountCreationOptions::Default) + .expect("Failed to create wallet"); + + let input = FilterMatchInput::new(); + let output = manager.check_compact_filters(input).await; + + assert!(output.is_empty()); +} + +#[tokio::test] +async fn test_check_compact_filters_no_matches() { + let mut manager = WalletManager::::new(Network::Testnet); + + let _wallet_id = manager + .create_wallet_with_random_mnemonic(WalletAccountCreationOptions::Default) + .expect("Failed to create wallet"); + + let tx1 = create_test_transaction_to_script(ScriptBuf::new()); + let block1 = create_test_block(100, vec![tx1]); + let filter1 = create_filter_for_block(&block1); + let key1 = FilterMatchKey::new(100, block1.block_hash()); + + let tx2 = create_test_transaction_to_script(ScriptBuf::new()); + let block2 = create_test_block(200, vec![tx2]); + let filter2 = create_filter_for_block(&block2); + let key2 = FilterMatchKey::new(200, block2.block_hash()); + + let mut input = FilterMatchInput::new(); + input.insert(key1, filter1); + input.insert(key2, filter2); + + let output = manager.check_compact_filters(input).await; + + assert!(output.is_empty()); +} + +#[tokio::test] +async fn test_check_compact_filters_batch_mixed_results() { + let mut manager = WalletManager::::new(Network::Testnet); + + let _wallet_id = manager + .create_wallet_with_random_mnemonic(WalletAccountCreationOptions::Default) + .expect("Failed to create wallet"); + + let addresses = manager.monitored_addresses(); + assert!(!addresses.is_empty()); + let wallet_script = addresses[0].script_pubkey(); + + let tx_match = create_test_transaction_to_script(wallet_script); + let block_match = create_test_block(100, vec![tx_match]); + let filter_match = create_filter_for_block(&block_match); + let key_match = FilterMatchKey::new(100, block_match.block_hash()); + + let tx_no_match = create_test_transaction_to_script(ScriptBuf::new()); + let block_no_match = create_test_block(200, vec![tx_no_match]); + let filter_no_match = create_filter_for_block(&block_no_match); + let key_no_match = FilterMatchKey::new(200, block_no_match.block_hash()); + + let mut input = FilterMatchInput::new(); + input.insert(key_match.clone(), filter_match); + input.insert(key_no_match.clone(), filter_no_match); + + let output = manager.check_compact_filters(input).await; + + assert_eq!(output.len(), 1); + assert!(output.contains(&key_match)); + assert!(!output.contains(&key_no_match)); +} + +#[tokio::test] +async fn test_check_compact_filters_multiple_matching_addresses() { + let mut manager = WalletManager::::new(Network::Testnet); + + let _wallet_id = manager + .create_wallet_with_random_mnemonic(WalletAccountCreationOptions::Default) + .expect("Failed to create wallet"); + + let addresses = manager.monitored_addresses(); + assert!(addresses.len() >= 2, "Need at least 2 addresses for this test"); + + let tx1 = create_test_transaction_to_script(addresses[0].script_pubkey()); + let tx2 = create_test_transaction_to_script(addresses[1].script_pubkey()); + let block = create_test_block(100, vec![tx1, tx2]); + let filter = create_filter_for_block(&block); + let key = FilterMatchKey::new(100, block.block_hash()); + + let mut input = FilterMatchInput::new(); + input.insert(key.clone(), filter); + + let output = manager.check_compact_filters(input).await; + + assert!(output.contains(&key)); +} + +#[tokio::test] +async fn test_check_compact_filters_all_match() { + let mut manager = WalletManager::::new(Network::Testnet); + + let _wallet_id = manager + .create_wallet_with_random_mnemonic(WalletAccountCreationOptions::Default) + .expect("Failed to create wallet"); + + let addresses = manager.monitored_addresses(); + assert!(addresses.len() >= 2, "Need at least 2 addresses"); + let script1 = addresses[0].script_pubkey(); + let script2 = addresses[1].script_pubkey(); + + let tx1 = create_test_transaction_to_script(script1); + let block1 = create_test_block(100, vec![tx1]); + let filter1 = create_filter_for_block(&block1); + let key1 = FilterMatchKey::new(100, block1.block_hash()); + + let tx2 = create_test_transaction_to_script(script2); + let block2 = create_test_block(200, vec![tx2]); + let filter2 = create_filter_for_block(&block2); + let key2 = FilterMatchKey::new(200, block2.block_hash()); + + let mut input = FilterMatchInput::new(); + input.insert(key1.clone(), filter1); + input.insert(key2.clone(), filter2); + + let output = manager.check_compact_filters(input).await; + + assert_eq!(output.len(), 2); + assert!(output.contains(&key1)); + assert!(output.contains(&key2)); +} diff --git a/test-utils/src/builders.rs b/test-utils/src/builders.rs index 22b626ef4..1c69a5990 100644 --- a/test-utils/src/builders.rs +++ b/test-utils/src/builders.rs @@ -195,6 +195,64 @@ pub fn random_block_hash() -> BlockHash { BlockHash::from_slice(&bytes).unwrap() } +/// Create a test transaction paying to a specific script +pub fn create_test_transaction_to_script(script: ScriptBuf) -> Transaction { + Transaction { + version: 2, + lock_time: 0, + input: vec![TxIn { + previous_output: OutPoint { + txid: Txid::from_byte_array([1u8; 32]), + vout: 0, + }, + script_sig: ScriptBuf::new(), + sequence: 0xffffffff, + witness: dashcore::Witness::default(), + }], + output: vec![TxOut { + value: 1000, + script_pubkey: script, + }], + special_transaction_payload: None, + } +} + +/// Create a test block with given height and transactions +pub fn create_test_block(height: u32, transactions: Vec) -> dashcore::Block { + dashcore::Block { + header: Header { + version: block::Version::ONE, + prev_blockhash: BlockHash::from_byte_array([height as u8; 32]), + merkle_root: TxMerkleNode::from_byte_array([0u8; 32]), + time: height, + bits: dashcore::CompactTarget::from_consensus(0x1d00ffff), + nonce: height, + }, + txdata: transactions, + } +} + +/// Create a BIP158 compact block filter for a block +pub fn create_filter_for_block(block: &dashcore::Block) -> dashcore::bip158::BlockFilter { + use dashcore::bip158::BlockFilterWriter; + let mut content = Vec::new(); + let mut writer = BlockFilterWriter::new(&mut content, block); + writer.add_output_scripts(); + writer.finish().expect("Failed to finish filter"); + dashcore::bip158::BlockFilter::new(&content) +} + +/// Create a deterministic test address from an index +pub fn test_address(idx: usize) -> dashcore::Address { + use dashcore::secp256k1::{Secp256k1, SecretKey}; + let secp = Secp256k1::new(); + let mut key_bytes = [0u8; 32]; + key_bytes[31] = (idx as u8) + 1; + let secret_key = SecretKey::from_slice(&key_bytes).expect("valid secret key"); + let public_key = dashcore::PublicKey::new(secret_key.public_key(&secp)); + dashcore::Address::p2pkh(&public_key, dashcore::Network::Testnet) +} + #[cfg(test)] mod tests { use super::*;