Skip to content

Commit 30d05ca

Browse files
joostjager and claude
authored and committed
Implement deferred monitor write queueing and flushing
Replace the unimplemented!() stubs with a full deferred write implementation. When ChainMonitor has deferred=true, Watch trait operations queue PendingMonitorOp entries instead of executing immediately. A new flush() method drains the queue and forwards operations to the internal watch/update methods, calling channel_monitor_updated on Completed status. The BackgroundProcessor is updated to capture pending_operation_count before persisting the ChannelManager, then flush that many writes afterward — ensuring monitor writes happen in the correct order relative to manager persistence.

Key changes:
- Add PendingMonitorOp enum and pending_ops queue to ChainMonitor
- Implement flush() and pending_operation_count() public methods
- Integrate flush calls in BackgroundProcessor (both sync and async)
- Add TestChainMonitor::new_deferred, flush helpers, and auto-flush in release_pending_monitor_events for test compatibility
- Add create_node_cfgs_deferred for deferred-mode test networks
- Add unit tests for queue/flush mechanics and full payment flow

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 5d4e081 commit 30d05ca

4 files changed

Lines changed: 672 additions & 19 deletions

File tree

lightning-background-processor/src/lib.rs

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1120,6 +1120,10 @@ where
11201120

11211121
let mut futures = Joiner::new();
11221122

1123+
// Capture the pending count before persisting. Only this many writes will be
1124+
// flushed afterward, so that updates arriving after persist aren't included.
1125+
let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
1126+
11231127
if channel_manager.get_cm().get_and_clear_needs_persistence() {
11241128
log_trace!(logger, "Persisting ChannelManager...");
11251129

@@ -1317,6 +1321,10 @@ where
13171321
res?;
13181322
}
13191323

1324+
// Flush monitor writes that were pending before we persisted. New updates that
1325+
// arrived after are left for the next iteration.
1326+
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);
1327+
13201328
match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
13211329
sleeper(ONION_MESSAGE_HANDLER_TIMER)
13221330
}) {
@@ -1373,6 +1381,7 @@ where
13731381
// After we exit, ensure we persist the ChannelManager one final time - this avoids
13741382
// some races where users quit while channel updates were in-flight, with
13751383
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
1384+
let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
13761385
kv_store
13771386
.write(
13781387
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -1381,6 +1390,10 @@ where
13811390
channel_manager.get_cm().encode(),
13821391
)
13831392
.await?;
1393+
1394+
// Flush monitor writes that were pending before final persistence.
1395+
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);
1396+
13841397
if let Some(ref scorer) = scorer {
13851398
kv_store
13861399
.write(
@@ -1571,7 +1584,8 @@ impl BackgroundProcessor {
15711584
) -> Self
15721585
where
15731586
L::Target: 'static + Logger,
1574-
M::Target: AChainMonitor<Signer = <CM::Target as AChannelManager>::Signer, Logger = L>,
1587+
M::Target:
1588+
'static + AChainMonitor<Signer = <CM::Target as AChannelManager>::Signer, Logger = L>,
15751589
CM::Target: AChannelManager,
15761590
OM::Target: AOnionMessenger,
15771591
PM::Target: APeerManager,
@@ -1684,6 +1698,11 @@ impl BackgroundProcessor {
16841698
channel_manager.get_cm().timer_tick_occurred();
16851699
last_freshness_call = Instant::now();
16861700
}
1701+
1702+
// Capture the pending count before persisting. Only this many writes will be
1703+
// flushed afterward, so that updates arriving after persist aren't included.
1704+
let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
1705+
16871706
if channel_manager.get_cm().get_and_clear_needs_persistence() {
16881707
log_trace!(logger, "Persisting ChannelManager...");
16891708
(kv_store.write(
@@ -1695,6 +1714,10 @@ impl BackgroundProcessor {
16951714
log_trace!(logger, "Done persisting ChannelManager.");
16961715
}
16971716

1717+
// Flush monitor writes that were pending before we persisted. New updates
1718+
// that arrived after are left for the next iteration.
1719+
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);
1720+
16981721
if let Some(liquidity_manager) = liquidity_manager.as_ref() {
16991722
log_trace!(logger, "Persisting LiquidityManager...");
17001723
let _ = liquidity_manager.get_lm().persist().map_err(|e| {
@@ -1809,12 +1832,17 @@ impl BackgroundProcessor {
18091832
// After we exit, ensure we persist the ChannelManager one final time - this avoids
18101833
// some races where users quit while channel updates were in-flight, with
18111834
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
1835+
let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
18121836
kv_store.write(
18131837
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
18141838
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
18151839
CHANNEL_MANAGER_PERSISTENCE_KEY,
18161840
channel_manager.get_cm().encode(),
18171841
)?;
1842+
1843+
// Flush monitor writes that were pending before final persistence.
1844+
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);
1845+
18181846
if let Some(ref scorer) = scorer {
18191847
kv_store.write(
18201848
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -1896,9 +1924,10 @@ mod tests {
18961924
use bitcoin::transaction::{Transaction, TxOut};
18971925
use bitcoin::{Amount, ScriptBuf, Txid};
18981926
use core::sync::atomic::{AtomicBool, Ordering};
1927+
use lightning::chain::chainmonitor::ChainMonitor as LdkChainMonitor;
18991928
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
19001929
use lightning::chain::transaction::OutPoint;
1901-
use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
1930+
use lightning::chain::{BestBlock, Confirm, Filter};
19021931
use lightning::events::{Event, PathFailure, ReplayEvent};
19031932
use lightning::ln::channelmanager;
19041933
use lightning::ln::channelmanager::{
@@ -2008,7 +2037,7 @@ mod tests {
20082037
Arc<test_utils::TestLogger>,
20092038
>;
20102039

2011-
type ChainMonitor = chainmonitor::ChainMonitor<
2040+
type ChainMonitor = LdkChainMonitor<
20122041
InMemorySigner,
20132042
Arc<test_utils::TestChainSource>,
20142043
Arc<test_utils::TestBroadcaster>,
@@ -2436,14 +2465,15 @@ mod tests {
24362465
let now = Duration::from_secs(genesis_block.header.time as u64);
24372466
let keys_manager =
24382467
Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos(), true));
2439-
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(
2468+
let chain_monitor = Arc::new(LdkChainMonitor::new(
24402469
Some(Arc::clone(&chain_source)),
24412470
Arc::clone(&tx_broadcaster),
24422471
Arc::clone(&logger),
24432472
Arc::clone(&fee_estimator),
24442473
Arc::clone(&kv_store),
24452474
Arc::clone(&keys_manager),
24462475
keys_manager.get_peer_storage_key(),
2476+
true,
24472477
));
24482478
let best_block = BestBlock::from_network(network);
24492479
let params = ChainParameters { network, best_block };
@@ -2580,19 +2610,31 @@ mod tests {
25802610
tx.clone(),
25812611
)
25822612
.unwrap();
2613+
// Flush deferred monitor operations so messages aren't held back
2614+
$node_a
2615+
.chain_monitor
2616+
.flush($node_a.chain_monitor.pending_operation_count(), &$node_a.logger);
25832617
let msg_a = get_event_msg!(
25842618
$node_a,
25852619
MessageSendEvent::SendFundingCreated,
25862620
$node_b.node.get_our_node_id()
25872621
);
25882622
$node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a);
2623+
// Flush node_b's monitor so it releases the FundingSigned message
2624+
$node_b
2625+
.chain_monitor
2626+
.flush($node_b.chain_monitor.pending_operation_count(), &$node_b.logger);
25892627
get_event!($node_b, Event::ChannelPending);
25902628
let msg_b = get_event_msg!(
25912629
$node_b,
25922630
MessageSendEvent::SendFundingSigned,
25932631
$node_a.node.get_our_node_id()
25942632
);
25952633
$node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b);
2634+
// Flush node_a's monitor for the final update
2635+
$node_a
2636+
.chain_monitor
2637+
.flush($node_a.chain_monitor.pending_operation_count(), &$node_a.logger);
25962638
get_event!($node_a, Event::ChannelPending);
25972639
tx
25982640
}};
@@ -3039,11 +3081,23 @@ mod tests {
30393081
.node
30403082
.funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone())
30413083
.unwrap();
3084+
// Flush node_0's deferred monitor operations so the FundingCreated message is released
3085+
nodes[0]
3086+
.chain_monitor
3087+
.flush(nodes[0].chain_monitor.pending_operation_count(), &nodes[0].logger);
30423088
let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
30433089
nodes[1].node.handle_funding_created(node_0_id, &msg_0);
3090+
// Flush node_1's deferred monitor operations so events and FundingSigned are released
3091+
nodes[1]
3092+
.chain_monitor
3093+
.flush(nodes[1].chain_monitor.pending_operation_count(), &nodes[1].logger);
30443094
get_event!(nodes[1], Event::ChannelPending);
30453095
let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
30463096
nodes[0].node.handle_funding_signed(node_1_id, &msg_1);
3097+
// Flush node_0's monitor for the funding_signed update
3098+
nodes[0]
3099+
.chain_monitor
3100+
.flush(nodes[0].chain_monitor.pending_operation_count(), &nodes[0].logger);
30473101
channel_pending_recv
30483102
.recv_timeout(EVENT_DEADLINE)
30493103
.expect("ChannelPending not handled within deadline");
@@ -3104,6 +3158,10 @@ mod tests {
31043158
error_message.to_string(),
31053159
)
31063160
.unwrap();
3161+
// Flush the monitor update triggered by force close so the commitment tx is broadcasted
3162+
nodes[0]
3163+
.chain_monitor
3164+
.flush(nodes[0].chain_monitor.pending_operation_count(), &nodes[0].logger);
31073165
let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
31083166
confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
31093167

0 commit comments

Comments (0)