From 6274ba0333427c5c2679662eba69058d8d1e78b2 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Fri, 6 Mar 2026 12:04:27 +0100 Subject: [PATCH] Add deferred monitor write support to chanmon_consistency fuzz target Add a `deferred` flag to `TestChainMonitor` that controls whether the underlying `ChainMonitor` queues operations instead of executing them immediately. The flag is derived from the fuzz input alongside the existing monitor style bits, so each of the three nodes can independently run in deferred or immediate mode. In deferred mode, `watch_channel` and `update_channel` always return `InProgress`. A new `flush_and_update_latest_monitors` method drains the queued operations and, when the persister reports `Completed`, promotes the pending shadow monitor snapshots to persisted state. This method is called before `release_pending_monitor_events` and at each point where the fuzzer completes pending monitor updates. On node reload, deferred monitors are flushed immediately after `watch_channel` so the node starts with a consistent state. AI tools were used in preparing this commit. Co-Authored-By: Claude Opus 4.6 --- fuzz/src/chanmon_consistency.rs | 65 +++++++++++++++++++++++++++++++-- 1 file changed, 62 insertions(+), 3 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index e4fd3475024..0926b3b7fae 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -266,12 +266,13 @@ struct TestChainMonitor { Arc, >, >, + pub deferred: bool, pub latest_monitors: Mutex>, } impl TestChainMonitor { pub fn new( broadcaster: Arc, logger: Arc, feeest: Arc, - persister: Arc, keys: Arc, + persister: Arc, keys: Arc, deferred: bool, ) -> Self { Self { chain_monitor: Arc::new(chainmonitor::ChainMonitor::new( @@ -282,14 +283,44 @@ impl TestChainMonitor { Arc::clone(&persister), Arc::clone(&keys), keys.get_peer_storage_key(), - false, + deferred, )), logger, keys, persister, + deferred, latest_monitors: Mutex::new(new_hash_map()), } } + + /// Flushes all deferred monitor operations and, if the persister reports success, promotes + /// pending monitor states to persisted in our shadow records. `TestChainMonitor` maintains + /// its own `latest_monitors` map that tracks serialized monitor snapshots independently of + /// `ChainMonitor`, so that the fuzzer can simulate node restarts by deserializing from these + /// snapshots rather than relying on the persister's storage. + /// + /// This simulates the pattern of snapshotting the pending count, persisting the + /// `ChannelManager`, then flushing the queued monitor writes. + fn flush_and_update_latest_monitors(&self) { + let count = self.chain_monitor.pending_operation_count(); + if count == 0 { + return; + } + // Execute all queued watch_channel/update_channel operations inside the ChainMonitor. + self.chain_monitor.flush(count, &self.logger); + let persister_res = *self.persister.update_ret.lock().unwrap(); + // Only update our local tracking state when the persister signals completion. When + // persistence is still in-progress, the monitors stay in the pending set so that a + // simulated restart can still reload from the last fully-persisted snapshot. + if persister_res == chain::ChannelMonitorUpdateStatus::Completed { + for (_channel_id, state) in self.latest_monitors.lock().unwrap().iter_mut() { + if let Some((id, data)) = state.pending_monitors.drain(..).last() { + state.persisted_monitor_id = id; + state.persisted_monitor = data; + } + } + } + } } impl chain::Watch for TestChainMonitor { fn watch_channel( @@ -299,6 +330,9 @@ impl chain::Watch for TestChainMonitor { monitor.write(&mut ser).unwrap(); let monitor_id = monitor.get_latest_update_id(); let res = self.chain_monitor.watch_channel(channel_id, monitor); + if self.deferred { + assert_eq!(res, Ok(chain::ChannelMonitorUpdateStatus::InProgress)); + } let state = match res { Ok(chain::ChannelMonitorUpdateStatus::Completed) => LatestMonitorState { persisted_monitor_id: monitor_id, @@ -348,6 +382,9 @@ impl chain::Watch for TestChainMonitor { let mut ser = VecWriter(Vec::new()); deserialized_monitor.write(&mut ser).unwrap(); let res = self.chain_monitor.update_channel(channel_id, update); + if self.deferred { + assert_eq!(res, chain::ChannelMonitorUpdateStatus::InProgress); + } match res { chain::ChannelMonitorUpdateStatus::Completed => { map_entry.persisted_monitor_id = update.update_id; @@ -364,6 +401,9 @@ impl chain::Watch for TestChainMonitor { fn release_pending_monitor_events( &self, ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { + if self.deferred { + self.flush_and_update_latest_monitors(); + } return self.chain_monitor.release_pending_monitor_events(); } } @@ -891,6 +931,11 @@ pub fn do_test( ChannelMonitorUpdateStatus::Completed }), ]; + let deferred = [ + initial_mon_styles & 0b001_000 != 0, + initial_mon_styles & 0b010_000 != 0, + initial_mon_styles & 0b100_000 != 0, + ]; let mut chain_state = ChainState::new(); let mut node_height_a: u32 = 0; @@ -919,6 +964,7 @@ pub fn do_test( update_ret: Mutex::new(mon_style[$node_id as usize].borrow().clone()), }), Arc::clone(&keys_manager), + deferred[$node_id as usize], )); let mut config = UserConfig::default(); @@ -971,6 +1017,7 @@ pub fn do_test( update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed), }), Arc::clone(keys), + deferred[node_id as usize], )); let mut config = UserConfig::default(); @@ -1037,18 +1084,28 @@ pub fn do_test( let manager = <(BlockHash, ChanMan)>::read(&mut &ser[..], read_args).expect("Failed to read manager"); let res = (manager.1, chain_monitor.clone()); + let expected_status = if deferred[node_id as usize] { + ChannelMonitorUpdateStatus::InProgress + } else { + ChannelMonitorUpdateStatus::Completed + }; for (channel_id, mon) in monitors.drain() { assert_eq!( chain_monitor.chain_monitor.watch_channel(channel_id, mon), - Ok(ChannelMonitorUpdateStatus::Completed) + Ok(expected_status) ); } + if deferred[node_id as usize] { + let count = chain_monitor.chain_monitor.pending_operation_count(); + chain_monitor.chain_monitor.flush(count, &chain_monitor.logger); + } *chain_monitor.persister.update_ret.lock().unwrap() = *mon_style[node_id as usize].borrow(); res }; macro_rules! complete_all_pending_monitor_updates { ($monitor: expr) => {{ + $monitor.flush_and_update_latest_monitors(); for (channel_id, state) in $monitor.latest_monitors.lock().unwrap().iter_mut() { for (id, data) in state.pending_monitors.drain(..) { $monitor.chain_monitor.channel_monitor_updated(*channel_id, id).unwrap(); @@ -2008,6 +2065,7 @@ pub fn do_test( |monitor: &Arc, chan_funding, compl_selector: &dyn Fn(&mut Vec<(u64, Vec)>) -> Option<(u64, Vec)>| { + monitor.flush_and_update_latest_monitors(); if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) { assert!( state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0), @@ -2024,6 +2082,7 @@ pub fn do_test( }; let complete_all_monitor_updates = |monitor: &Arc, chan_id| { + monitor.flush_and_update_latest_monitors(); if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_id) { assert!( state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),