@@ -773,6 +773,17 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
773773/// The `fetch_time` parameter should return the current wall clock time, if one is available. If
774774/// no time is available, some features may be disabled, however the node will still operate fine.
775775///
776+ /// Note that when deferred monitor writes are enabled on [`ChainMonitor`], this function flushes
777+ /// pending writes after persisting the [`ChannelManager`]. If the [`Persist`] implementation
778+ /// performs blocking I/O and returns [`Completed`] synchronously rather than returning
779+ /// [`InProgress`], this will block the async executor.
780+ ///
781+ /// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
782+ /// [`Persist`]: lightning::chain::chainmonitor::Persist
783+ /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
784+ /// [`Completed`]: lightning::chain::ChannelMonitorUpdateStatus::Completed
785+ /// [`InProgress`]: lightning::chain::ChannelMonitorUpdateStatus::InProgress
786+ ///
776787/// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
777788/// could setup `process_events_async` like this:
778789/// ```
@@ -1116,9 +1127,18 @@ where
11161127 None => { } ,
11171128 }
11181129
1130+ // We capture pending_operation_count inside the persistence branch to
1131+ // avoid a race: ChannelManager handlers queue deferred monitor ops
1132+ // before the persistence flag is set. Capturing outside would let us
1133+ // observe pending ops while the flag is still unset, causing us to
1134+ // flush monitor writes without persisting the ChannelManager.
1135+ // Declared before futures so it outlives the Joiner (drop order).
1136+ let pending_monitor_writes;
1137+
11191138 let mut futures = Joiner :: new ( ) ;
11201139
11211140 if channel_manager. get_cm ( ) . get_and_clear_needs_persistence ( ) {
1141+ pending_monitor_writes = chain_monitor. get_cm ( ) . pending_operation_count ( ) ;
11221142 log_trace ! ( logger, "Persisting ChannelManager..." ) ;
11231143
11241144 let fut = async {
@@ -1129,7 +1149,12 @@ where
11291149 CHANNEL_MANAGER_PERSISTENCE_KEY ,
11301150 channel_manager. get_cm ( ) . encode ( ) ,
11311151 )
1132- . await
1152+ . await ?;
1153+
1154+ // Flush monitor operations that were pending before we persisted. New updates
1155+ // that arrived after are left for the next iteration.
1156+ chain_monitor. get_cm ( ) . flush ( pending_monitor_writes, & logger) ;
1157+ Ok ( ( ) )
11331158 } ;
11341159 // TODO: Once our MSRV is 1.68 we should be able to drop the Box
11351160 let mut fut = Box :: pin ( fut) ;
@@ -1371,6 +1396,7 @@ where
13711396 // After we exit, ensure we persist the ChannelManager one final time - this avoids
13721397 // some races where users quit while channel updates were in-flight, with
13731398 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
1399+ let pending_monitor_writes = chain_monitor. get_cm ( ) . pending_operation_count ( ) ;
13741400 kv_store
13751401 . write (
13761402 CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
@@ -1379,6 +1405,10 @@ where
13791405 channel_manager. get_cm ( ) . encode ( ) ,
13801406 )
13811407 . await ?;
1408+
1409+ // Flush monitor operations that were pending before final persistence.
1410+ chain_monitor. get_cm ( ) . flush ( pending_monitor_writes, & logger) ;
1411+
13821412 if let Some ( ref scorer) = scorer {
13831413 kv_store
13841414 . write (
@@ -1682,7 +1712,15 @@ impl BackgroundProcessor {
16821712 channel_manager. get_cm ( ) . timer_tick_occurred ( ) ;
16831713 last_freshness_call = Instant :: now ( ) ;
16841714 }
1715+
16851716 if channel_manager. get_cm ( ) . get_and_clear_needs_persistence ( ) {
1717+ // We capture pending_operation_count inside the persistence
1718+ // branch to avoid a race: ChannelManager handlers queue
1719+ // deferred monitor ops before the persistence flag is set.
1720+ // Capturing outside would let us observe pending ops while
1721+ // the flag is still unset, causing us to flush monitor
1722+ // writes without persisting the ChannelManager.
1723+ let pending_monitor_writes = chain_monitor. get_cm ( ) . pending_operation_count ( ) ;
16861724 log_trace ! ( logger, "Persisting ChannelManager..." ) ;
16871725 ( kv_store. write (
16881726 CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
@@ -1691,6 +1729,10 @@ impl BackgroundProcessor {
16911729 channel_manager. get_cm ( ) . encode ( ) ,
16921730 ) ) ?;
16931731 log_trace ! ( logger, "Done persisting ChannelManager." ) ;
1732+
1733+ // Flush monitor operations that were pending before we persisted.
1734+ // New updates that arrived after are left for the next iteration.
1735+ chain_monitor. get_cm ( ) . flush ( pending_monitor_writes, & logger) ;
16941736 }
16951737
16961738 if let Some ( liquidity_manager) = liquidity_manager. as_ref ( ) {
@@ -1807,12 +1849,17 @@ impl BackgroundProcessor {
18071849 // After we exit, ensure we persist the ChannelManager one final time - this avoids
18081850 // some races where users quit while channel updates were in-flight, with
18091851 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
1852+ let pending_monitor_writes = chain_monitor. get_cm ( ) . pending_operation_count ( ) ;
18101853 kv_store. write (
18111854 CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
18121855 CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE ,
18131856 CHANNEL_MANAGER_PERSISTENCE_KEY ,
18141857 channel_manager. get_cm ( ) . encode ( ) ,
18151858 ) ?;
1859+
1860+ // Flush monitor operations that were pending before final persistence.
1861+ chain_monitor. get_cm ( ) . flush ( pending_monitor_writes, & logger) ;
1862+
18161863 if let Some ( ref scorer) = scorer {
18171864 kv_store. write (
18181865 SCORER_PERSISTENCE_PRIMARY_NAMESPACE ,
@@ -1894,9 +1941,10 @@ mod tests {
18941941 use bitcoin:: transaction:: { Transaction , TxOut } ;
18951942 use bitcoin:: { Amount , ScriptBuf , Txid } ;
18961943 use core:: sync:: atomic:: { AtomicBool , Ordering } ;
1944+ use lightning:: chain:: chainmonitor;
18971945 use lightning:: chain:: channelmonitor:: ANTI_REORG_DELAY ;
18981946 use lightning:: chain:: transaction:: OutPoint ;
1899- use lightning:: chain:: { chainmonitor , BestBlock , Confirm } ;
1947+ use lightning:: chain:: { BestBlock , Confirm } ;
19001948 use lightning:: events:: { Event , PathFailure , ReplayEvent } ;
19011949 use lightning:: ln:: channelmanager;
19021950 use lightning:: ln:: channelmanager:: {
@@ -2441,6 +2489,7 @@ mod tests {
24412489 Arc :: clone ( & kv_store) ,
24422490 Arc :: clone ( & keys_manager) ,
24432491 keys_manager. get_peer_storage_key ( ) ,
2492+ true ,
24442493 ) ) ;
24452494 let best_block = BestBlock :: from_network ( network) ;
24462495 let params = ChainParameters { network, best_block } ;
@@ -2562,6 +2611,8 @@ mod tests {
25622611 ( persist_dir, nodes)
25632612 }
25642613
2614+ /// Opens a channel between two nodes without a running `BackgroundProcessor`,
2615+ /// so deferred monitor operations are flushed manually at each step.
25652616 macro_rules! open_channel {
25662617 ( $node_a: expr, $node_b: expr, $channel_value: expr) => { {
25672618 begin_open_channel!( $node_a, $node_b, $channel_value) ;
@@ -2577,19 +2628,31 @@ mod tests {
25772628 tx. clone( ) ,
25782629 )
25792630 . unwrap( ) ;
2631+ // funding_transaction_generated does not call watch_channel, so no
2632+ // deferred op is queued and FundingCreated is available immediately.
25802633 let msg_a = get_event_msg!(
25812634 $node_a,
25822635 MessageSendEvent :: SendFundingCreated ,
25832636 $node_b. node. get_our_node_id( )
25842637 ) ;
25852638 $node_b. node. handle_funding_created( $node_a. node. get_our_node_id( ) , & msg_a) ;
2639+ // Flush node_b's new monitor (watch_channel) so it releases the
2640+ // FundingSigned message.
2641+ $node_b
2642+ . chain_monitor
2643+ . flush( $node_b. chain_monitor. pending_operation_count( ) , & $node_b. logger) ;
25862644 get_event!( $node_b, Event :: ChannelPending ) ;
25872645 let msg_b = get_event_msg!(
25882646 $node_b,
25892647 MessageSendEvent :: SendFundingSigned ,
25902648 $node_a. node. get_our_node_id( )
25912649 ) ;
25922650 $node_a. node. handle_funding_signed( $node_b. node. get_our_node_id( ) , & msg_b) ;
2651+ // Flush node_a's new monitor (watch_channel) queued by
2652+ // handle_funding_signed.
2653+ $node_a
2654+ . chain_monitor
2655+ . flush( $node_a. chain_monitor. pending_operation_count( ) , & $node_a. logger) ;
25932656 get_event!( $node_a, Event :: ChannelPending ) ;
25942657 tx
25952658 } } ;
@@ -2715,6 +2778,20 @@ mod tests {
27152778 confirm_transaction_depth ( node, tx, ANTI_REORG_DELAY ) ;
27162779 }
27172780
2781+ /// Waits until the background processor has flushed all pending deferred monitor
2782+ /// operations for the given node. Panics if the pending count does not reach zero
2783+ /// within `EVENT_DEADLINE`.
2784+ fn wait_for_flushed ( chain_monitor : & ChainMonitor ) {
2785+ let start = std:: time:: Instant :: now ( ) ;
2786+ while chain_monitor. pending_operation_count ( ) > 0 {
2787+ assert ! (
2788+ start. elapsed( ) < EVENT_DEADLINE ,
2789+ "Pending monitor operations were not flushed within deadline"
2790+ ) ;
2791+ std:: thread:: sleep ( Duration :: from_millis ( 10 ) ) ;
2792+ }
2793+ }
2794+
27182795 #[ test]
27192796 fn test_background_processor ( ) {
27202797 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
@@ -3055,11 +3132,21 @@ mod tests {
30553132 . node
30563133 . funding_transaction_generated ( temporary_channel_id, node_1_id, funding_tx. clone ( ) )
30573134 . unwrap ( ) ;
3135+ // funding_transaction_generated does not call watch_channel, so no deferred op is
3136+ // queued and the FundingCreated message is available immediately.
30583137 let msg_0 = get_event_msg ! ( nodes[ 0 ] , MessageSendEvent :: SendFundingCreated , node_1_id) ;
30593138 nodes[ 1 ] . node . handle_funding_created ( node_0_id, & msg_0) ;
3139+ // Node 1 has no bg processor, flush its new monitor (watch_channel) manually so
3140+ // events and FundingSigned are released.
3141+ nodes[ 1 ]
3142+ . chain_monitor
3143+ . flush ( nodes[ 1 ] . chain_monitor . pending_operation_count ( ) , & nodes[ 1 ] . logger ) ;
30603144 get_event ! ( nodes[ 1 ] , Event :: ChannelPending ) ;
30613145 let msg_1 = get_event_msg ! ( nodes[ 1 ] , MessageSendEvent :: SendFundingSigned , node_0_id) ;
30623146 nodes[ 0 ] . node . handle_funding_signed ( node_1_id, & msg_1) ;
3147+ // Wait for the bg processor to flush the new monitor (watch_channel) queued by
3148+ // handle_funding_signed.
3149+ wait_for_flushed ( & nodes[ 0 ] . chain_monitor ) ;
30633150 channel_pending_recv
30643151 . recv_timeout ( EVENT_DEADLINE )
30653152 . expect ( "ChannelPending not handled within deadline" ) ;
@@ -3120,6 +3207,9 @@ mod tests {
31203207 error_message. to_string ( ) ,
31213208 )
31223209 . unwrap ( ) ;
3210+ // Wait for the bg processor to flush the monitor update triggered by force close
3211+ // so the commitment tx is broadcast.
3212+ wait_for_flushed ( & nodes[ 0 ] . chain_monitor ) ;
31233213 let commitment_tx = nodes[ 0 ] . tx_broadcaster . txn_broadcasted . lock ( ) . unwrap ( ) . pop ( ) . unwrap ( ) ;
31243214 confirm_transaction_depth ( & mut nodes[ 0 ] , & commitment_tx, BREAKDOWN_TIMEOUT as u32 ) ;
31253215
0 commit comments