Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 87 additions & 4 deletions dash-spv/src/sync/filters/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ impl<H: BlockHeaderStorage, FH: FilterHeaderStorage, F: FilterStorage, W: Wallet
) -> SyncResult<Vec<SyncEvent>> {
debug_assert!(self.is_idle(), "manager should have no in-flight state on start");

self.set_state(SyncState::Syncing);
// Use filter_committed_height for restart recovery instead of
// synced_height, which advances per-block and may exceed committed scan progress.
let (wallet_birth_height, wallet_committed_height) = {
Expand Down Expand Up @@ -184,7 +183,10 @@ impl<H: BlockHeaderStorage, FH: FilterHeaderStorage, F: FilterStorage, W: Wallet
tip_height: self.progress.committed_height(),
}]);
}
// Caught up to available filter headers but chain tip not reached yet
// Not enough filter headers yet to start scanning. Go back to waiting
// so the next FilterHeadersStored event triggers start_download again
// with proper batch processing initialization.
self.set_state(SyncState::WaitForEvents);
return Ok(vec![]);
}

Expand All @@ -207,6 +209,8 @@ impl<H: BlockHeaderStorage, FH: FilterHeaderStorage, F: FilterStorage, W: Wallet
self.progress.filter_header_tip_height()
);

self.set_state(SyncState::Syncing);

// Initialize download pipeline for remaining filters
if download_start <= self.progress.filter_header_tip_height() {
self.filter_pipeline.init(download_start, self.progress.filter_header_tip_height());
Expand Down Expand Up @@ -723,13 +727,19 @@ impl<H: BlockHeaderStorage, FH: FilterHeaderStorage, F: FilterStorage, W: Wallet
SyncState::Syncing | SyncState::Synced
if self.progress.stored_height() < self.progress.filter_header_tip_height() =>
{
// Transition back to Syncing so is_synced() returns false
// until all new filters and matched blocks are fully processed.
if self.state() == SyncState::Synced {
self.set_state(SyncState::Syncing);
}

self.filter_pipeline.extend_target(tip_height);
{
let header_storage = self.header_storage.read().await;
self.filter_pipeline.send_pending(requests, &*header_storage).await?;
}

if self.state() == SyncState::Synced && self.active_batches.is_empty() {
if self.active_batches.is_empty() {
tracing::debug!("Processing new filter (target: {})", tip_height);
return self.try_create_lookahead_batches().await;
}
Expand All @@ -755,13 +765,14 @@ impl<H: BlockHeaderStorage, FH: FilterHeaderStorage, F: FilterStorage, W: Wallet
#[cfg(test)]
mod tests {
use super::*;
use crate::network::MessageType;
use crate::network::{MessageType, RequestSender};
use crate::storage::{
DiskStorageManager, PersistentBlockHeaderStorage, PersistentFilterHeaderStorage,
PersistentFilterStorage, StorageManager,
};
use crate::sync::{ManagerIdentifier, SyncManagerProgress};
use key_wallet_manager::test_utils::MockWallet;
use tokio::sync::mpsc::unbounded_channel;

type TestFiltersManager = FiltersManager<
PersistentBlockHeaderStorage,
Expand Down Expand Up @@ -969,4 +980,76 @@ mod tests {
// After take, should be empty
assert!(batch.take_collected_addresses().is_empty());
}

#[tokio::test]
async fn test_start_download_waits_when_filter_headers_insufficient() {
    let mut mgr = create_test_manager().await;
    assert_eq!(mgr.state(), SyncState::Initializing);

    // Wallet committed at height 100 (scan would begin at 101), while
    // filter headers lag behind at 50 — below the scan start.
    mgr.wallet.write().await.update_synced_height(100);
    mgr.progress.update_filter_header_tip_height(50);
    // Keep the chain tip well ahead so the Synced early-return is not hit.
    mgr.progress.update_target_height(1000);

    let (sender, _receiver) = unbounded_channel();
    let events = mgr.start_download(&RequestSender::new(sender)).await.unwrap();

    // Nothing to scan yet: the manager should park itself and stay idle
    // until more filter headers arrive.
    assert!(events.is_empty());
    assert_eq!(mgr.state(), SyncState::WaitForEvents);
    assert!(mgr.is_idle());
}

#[tokio::test]
async fn test_start_download_transitions_to_syncing_when_filters_available() {
    let mut mgr = create_test_manager().await;
    assert_eq!(mgr.state(), SyncState::Initializing);

    // Persist block headers 0..=100 so stop hashes can be resolved when
    // pending filter requests are sent.
    let headers = dashcore::block::Header::dummy_batch(0..101);
    mgr.header_storage.write().await.store_headers(&headers).await.unwrap();

    // Filter headers reach 100 while the wallet sits at genesis, so
    // scanning starts from height 0.
    mgr.progress.update_filter_header_tip_height(100);
    mgr.progress.update_target_height(1000);

    let (sender, _receiver) = unbounded_channel();
    let events = mgr.start_download(&RequestSender::new(sender)).await.unwrap();

    assert_eq!(mgr.state(), SyncState::Syncing);
    assert!(!mgr.is_idle());
    assert!(events.is_empty());
    // An initial processing batch should cover scan_start..=filter tip.
    let first_batch = mgr.active_batches.get(&0).expect("batch at scan_start=0");
    assert_eq!(first_batch.start_height(), 0);
    assert_eq!(first_batch.end_height(), 100);
}

#[tokio::test]
async fn test_handle_new_filter_headers_transitions_synced_to_syncing() {
    let mut mgr = create_test_manager().await;

    // Pretend the manager is fully caught up at height 100.
    mgr.set_state(SyncState::Synced);
    mgr.progress.update_stored_height(100);
    mgr.progress.update_filter_header_tip_height(100);
    mgr.progress.update_committed_height(100);
    mgr.progress.update_target_height(1000);
    // With the pipeline target already at 150 and nothing queued,
    // extend_target(150) is a no-op and send_pending returns right away
    // without needing any stored headers.
    mgr.filter_pipeline.init(151, 150);
    // An in-flight batch keeps try_create_lookahead_batches from firing.
    mgr.active_batches.insert(101, FiltersBatch::new(101, 200, HashMap::new()));

    let (sender, _receiver) = unbounded_channel();
    let request_sender = RequestSender::new(sender);

    // A new filter-header tip at 150 means stored(100) < tip(150).
    let events = mgr.handle_new_filter_headers(150, &request_sender).await.unwrap();

    assert!(events.is_empty());
    assert_eq!(mgr.state(), SyncState::Syncing);
    assert!(!mgr.is_idle());
}
}
Loading