diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index e4fd3475024..0926b3b7fae 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -266,12 +266,13 @@ struct TestChainMonitor { Arc, >, >, + pub deferred: bool, pub latest_monitors: Mutex>, } impl TestChainMonitor { pub fn new( broadcaster: Arc, logger: Arc, feeest: Arc, - persister: Arc, keys: Arc, + persister: Arc, keys: Arc, deferred: bool, ) -> Self { Self { chain_monitor: Arc::new(chainmonitor::ChainMonitor::new( @@ -282,14 +283,44 @@ impl TestChainMonitor { Arc::clone(&persister), Arc::clone(&keys), keys.get_peer_storage_key(), - false, + deferred, )), logger, keys, persister, + deferred, latest_monitors: Mutex::new(new_hash_map()), } } + + /// Flushes all deferred monitor operations and, if the persister reports success, promotes + /// pending monitor states to persisted in our shadow records. `TestChainMonitor` maintains + /// its own `latest_monitors` map that tracks serialized monitor snapshots independently of + /// `ChainMonitor`, so that the fuzzer can simulate node restarts by deserializing from these + /// snapshots rather than relying on the persister's storage. + /// + /// This simulates the pattern of snapshotting the pending count, persisting the + /// `ChannelManager`, then flushing the queued monitor writes. + fn flush_and_update_latest_monitors(&self) { + let count = self.chain_monitor.pending_operation_count(); + if count == 0 { + return; + } + // Execute all queued watch_channel/update_channel operations inside the ChainMonitor. + self.chain_monitor.flush(count, &self.logger); + let persister_res = *self.persister.update_ret.lock().unwrap(); + // Only update our local tracking state when the persister signals completion. When + // persistence is still in-progress, the monitors stay in the pending set so that a + // simulated restart can still reload from the last fully-persisted snapshot. + if persister_res == chain::ChannelMonitorUpdateStatus::Completed { + for (_channel_id, state) in self.latest_monitors.lock().unwrap().iter_mut() { + if let Some((id, data)) = state.pending_monitors.drain(..).last() { + state.persisted_monitor_id = id; + state.persisted_monitor = data; + } + } + } + } } impl chain::Watch for TestChainMonitor { fn watch_channel( @@ -299,6 +330,9 @@ impl chain::Watch for TestChainMonitor { monitor.write(&mut ser).unwrap(); let monitor_id = monitor.get_latest_update_id(); let res = self.chain_monitor.watch_channel(channel_id, monitor); + if self.deferred { + assert_eq!(res, Ok(chain::ChannelMonitorUpdateStatus::InProgress)); + } let state = match res { Ok(chain::ChannelMonitorUpdateStatus::Completed) => LatestMonitorState { persisted_monitor_id: monitor_id, @@ -348,6 +382,9 @@ impl chain::Watch for TestChainMonitor { let mut ser = VecWriter(Vec::new()); deserialized_monitor.write(&mut ser).unwrap(); let res = self.chain_monitor.update_channel(channel_id, update); + if self.deferred { + assert_eq!(res, chain::ChannelMonitorUpdateStatus::InProgress); + } match res { chain::ChannelMonitorUpdateStatus::Completed => { map_entry.persisted_monitor_id = update.update_id; @@ -364,6 +401,9 @@ impl chain::Watch for TestChainMonitor { fn release_pending_monitor_events( &self, ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + if self.deferred { + self.flush_and_update_latest_monitors(); + } return self.chain_monitor.release_pending_monitor_events(); } } @@ -891,6 +931,11 @@ pub fn do_test( ChannelMonitorUpdateStatus::Completed }), ]; + let deferred = [ + initial_mon_styles & 0b001_000 != 0, + initial_mon_styles & 0b010_000 != 0, + initial_mon_styles & 0b100_000 != 0, + ]; let mut chain_state = ChainState::new(); let mut node_height_a: u32 = 0; @@ -919,6 +964,7 @@ pub fn do_test( update_ret: Mutex::new(mon_style[$node_id as usize].borrow().clone()), }), Arc::clone(&keys_manager), + deferred[$node_id as usize], )); let mut config = UserConfig::default(); @@ -971,6 +1017,7 @@ pub fn do_test( update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed), }), Arc::clone(keys), + deferred[node_id as usize], )); let mut config = UserConfig::default(); @@ -1037,18 +1084,28 @@ pub fn do_test( let manager = <(BlockHash, ChanMan)>::read(&mut &ser[..], read_args).expect("Failed to read manager"); let res = (manager.1, chain_monitor.clone()); + let expected_status = if deferred[node_id as usize] { + ChannelMonitorUpdateStatus::InProgress + } else { + ChannelMonitorUpdateStatus::Completed + }; for (channel_id, mon) in monitors.drain() { assert_eq!( chain_monitor.chain_monitor.watch_channel(channel_id, mon), - Ok(ChannelMonitorUpdateStatus::Completed) + Ok(expected_status) ); } + if deferred[node_id as usize] { + let count = chain_monitor.chain_monitor.pending_operation_count(); + chain_monitor.chain_monitor.flush(count, &chain_monitor.logger); + } *chain_monitor.persister.update_ret.lock().unwrap() = *mon_style[node_id as usize].borrow(); res }; macro_rules! complete_all_pending_monitor_updates { ($monitor: expr) => {{ + $monitor.flush_and_update_latest_monitors(); for (channel_id, state) in $monitor.latest_monitors.lock().unwrap().iter_mut() { for (id, data) in state.pending_monitors.drain(..) { $monitor.chain_monitor.channel_monitor_updated(*channel_id, id).unwrap(); @@ -2008,6 +2065,7 @@ pub fn do_test( |monitor: &Arc, chan_funding, compl_selector: &dyn Fn(&mut Vec<(u64, Vec)>) -> Option<(u64, Vec)>| { + monitor.flush_and_update_latest_monitors(); if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) { assert!( state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0), @@ -2024,6 +2082,7 @@ pub fn do_test( }; let complete_all_monitor_updates = |monitor: &Arc, chan_id| { + monitor.flush_and_update_latest_monitors(); if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_id) { assert!( state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),