Skip to content

Commit 159b30c

Browse files
joostjager and claude
committed
Add deferred monitor completion signaling to ChainMonitor
When ChainMonitor is constructed with deferred=true, monitor operations (persist, insert, apply update) still execute inline, but completion signals are held back in a queue. pending_operation_count() returns the queue length and flush(count) delivers up to that many completions via channel_monitor_updated(). The public channel_monitor_updated() method checks the deferred flag and queues completions rather than resolving them immediately. This ensures that both synchronous persistence completions and external async callers are properly deferred. flush() calls an internal non-deferring variant to actually deliver the signals. The BackgroundProcessor snapshots the pending count before persisting the ChannelManager, then flushes that many completions afterward. This ensures the ChannelManager is always persisted before its associated monitor completions are signaled, avoiding force closures from a crash between monitor and channel manager persistence. A test is added covering the interaction between deferred mode and async persistence (persister returning InProgress), verifying the two-phase completion flow: persister signals completion via channel_monitor_updated (queued into deferred_completions), then flush delivers them. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent: 6f286df — commit: 159b30c

4 files changed

Lines changed: 616 additions & 16 deletions

File tree

lightning-background-processor/src/lib.rs

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1120,6 +1120,10 @@ where
11201120

11211121
let mut futures = Joiner::new();
11221122

1123+
// Capture the pending count before persisting. Only this many writes will be
1124+
// flushed afterward, so that updates arriving after persist aren't included.
1125+
let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
1126+
11231127
if channel_manager.get_cm().get_and_clear_needs_persistence() {
11241128
log_trace!(logger, "Persisting ChannelManager...");
11251129

@@ -1317,6 +1321,10 @@ where
13171321
res?;
13181322
}
13191323

1324+
// Flush monitor operations that were pending before we persisted. New updates
1325+
// that arrived after are left for the next iteration.
1326+
chain_monitor.get_cm().flush(pending_monitor_writes);
1327+
13201328
match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
13211329
sleeper(ONION_MESSAGE_HANDLER_TIMER)
13221330
}) {
@@ -1373,6 +1381,7 @@ where
13731381
// After we exit, ensure we persist the ChannelManager one final time - this avoids
13741382
// some races where users quit while channel updates were in-flight, with
13751383
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
1384+
let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
13761385
kv_store
13771386
.write(
13781387
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -1381,6 +1390,10 @@ where
13811390
channel_manager.get_cm().encode(),
13821391
)
13831392
.await?;
1393+
1394+
// Flush monitor operations that were pending before final persistence.
1395+
chain_monitor.get_cm().flush(pending_monitor_writes);
1396+
13841397
if let Some(ref scorer) = scorer {
13851398
kv_store
13861399
.write(
@@ -1684,6 +1697,11 @@ impl BackgroundProcessor {
16841697
channel_manager.get_cm().timer_tick_occurred();
16851698
last_freshness_call = Instant::now();
16861699
}
1700+
1701+
// Capture the pending count before persisting. Only this many writes will be
1702+
// flushed afterward, so that updates arriving after persist aren't included.
1703+
let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
1704+
16871705
if channel_manager.get_cm().get_and_clear_needs_persistence() {
16881706
log_trace!(logger, "Persisting ChannelManager...");
16891707
(kv_store.write(
@@ -1695,6 +1713,10 @@ impl BackgroundProcessor {
16951713
log_trace!(logger, "Done persisting ChannelManager.");
16961714
}
16971715

1716+
// Flush monitor operations that were pending before we persisted. New
1717+
// updates that arrived after are left for the next iteration.
1718+
chain_monitor.get_cm().flush(pending_monitor_writes);
1719+
16981720
if let Some(liquidity_manager) = liquidity_manager.as_ref() {
16991721
log_trace!(logger, "Persisting LiquidityManager...");
17001722
let _ = liquidity_manager.get_lm().persist().map_err(|e| {
@@ -1809,12 +1831,17 @@ impl BackgroundProcessor {
18091831
// After we exit, ensure we persist the ChannelManager one final time - this avoids
18101832
// some races where users quit while channel updates were in-flight, with
18111833
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
1834+
let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
18121835
kv_store.write(
18131836
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
18141837
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
18151838
CHANNEL_MANAGER_PERSISTENCE_KEY,
18161839
channel_manager.get_cm().encode(),
18171840
)?;
1841+
1842+
// Flush monitor operations that were pending before final persistence.
1843+
chain_monitor.get_cm().flush(pending_monitor_writes);
1844+
18181845
if let Some(ref scorer) = scorer {
18191846
kv_store.write(
18201847
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -1896,9 +1923,10 @@ mod tests {
18961923
use bitcoin::transaction::{Transaction, TxOut};
18971924
use bitcoin::{Amount, ScriptBuf, Txid};
18981925
use core::sync::atomic::{AtomicBool, Ordering};
1926+
use lightning::chain::chainmonitor;
18991927
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
19001928
use lightning::chain::transaction::OutPoint;
1901-
use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
1929+
use lightning::chain::{BestBlock, Confirm, Filter};
19021930
use lightning::events::{Event, PathFailure, ReplayEvent};
19031931
use lightning::ln::channelmanager;
19041932
use lightning::ln::channelmanager::{
@@ -2444,6 +2472,7 @@ mod tests {
24442472
Arc::clone(&kv_store),
24452473
Arc::clone(&keys_manager),
24462474
keys_manager.get_peer_storage_key(),
2475+
true,
24472476
));
24482477
let best_block = BestBlock::from_network(network);
24492478
let params = ChainParameters { network, best_block };
@@ -2567,6 +2596,8 @@ mod tests {
25672596
(persist_dir, nodes)
25682597
}
25692598

2599+
/// Opens a channel between two nodes without a running `BackgroundProcessor`,
2600+
/// so deferred monitor operations are flushed manually at each step.
25702601
macro_rules! open_channel {
25712602
($node_a: expr, $node_b: expr, $channel_value: expr) => {{
25722603
begin_open_channel!($node_a, $node_b, $channel_value);
@@ -2582,19 +2613,27 @@ mod tests {
25822613
tx.clone(),
25832614
)
25842615
.unwrap();
2616+
// funding_transaction_generated does not call watch_channel, so no
2617+
// deferred op is queued and FundingCreated is available immediately.
25852618
let msg_a = get_event_msg!(
25862619
$node_a,
25872620
MessageSendEvent::SendFundingCreated,
25882621
$node_b.node.get_our_node_id()
25892622
);
25902623
$node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a);
2624+
// Flush node_b's new monitor (watch_channel) so it releases the
2625+
// FundingSigned message.
2626+
$node_b.chain_monitor.flush($node_b.chain_monitor.pending_operation_count());
25912627
get_event!($node_b, Event::ChannelPending);
25922628
let msg_b = get_event_msg!(
25932629
$node_b,
25942630
MessageSendEvent::SendFundingSigned,
25952631
$node_a.node.get_our_node_id()
25962632
);
25972633
$node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b);
2634+
// Flush node_a's new monitor (watch_channel) queued by
2635+
// handle_funding_signed.
2636+
$node_a.chain_monitor.flush($node_a.chain_monitor.pending_operation_count());
25982637
get_event!($node_a, Event::ChannelPending);
25992638
tx
26002639
}};
@@ -2720,6 +2759,20 @@ mod tests {
27202759
confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
27212760
}
27222761

2762+
/// Waits until the background processor has flushed all pending deferred monitor
2763+
/// operations for the given node. Panics if the pending count does not reach zero
2764+
/// within `EVENT_DEADLINE`.
2765+
fn wait_for_flushed(chain_monitor: &ChainMonitor) {
2766+
let start = std::time::Instant::now();
2767+
while chain_monitor.pending_operation_count() > 0 {
2768+
assert!(
2769+
start.elapsed() < EVENT_DEADLINE,
2770+
"Pending monitor operations were not flushed within deadline"
2771+
);
2772+
std::thread::sleep(Duration::from_millis(10));
2773+
}
2774+
}
2775+
27232776
#[test]
27242777
fn test_background_processor() {
27252778
// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
@@ -3060,11 +3113,19 @@ mod tests {
30603113
.node
30613114
.funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone())
30623115
.unwrap();
3116+
// funding_transaction_generated does not call watch_channel, so no deferred op is
3117+
// queued and the FundingCreated message is available immediately.
30633118
let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
30643119
nodes[1].node.handle_funding_created(node_0_id, &msg_0);
3120+
// Node 1 has no bg processor, flush its new monitor (watch_channel) manually so
3121+
// events and FundingSigned are released.
3122+
nodes[1].chain_monitor.flush(nodes[1].chain_monitor.pending_operation_count());
30653123
get_event!(nodes[1], Event::ChannelPending);
30663124
let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
30673125
nodes[0].node.handle_funding_signed(node_1_id, &msg_1);
3126+
// Wait for the bg processor to flush the new monitor (watch_channel) queued by
3127+
// handle_funding_signed.
3128+
wait_for_flushed(&nodes[0].chain_monitor);
30683129
channel_pending_recv
30693130
.recv_timeout(EVENT_DEADLINE)
30703131
.expect("ChannelPending not handled within deadline");
@@ -3125,6 +3186,9 @@ mod tests {
31253186
error_message.to_string(),
31263187
)
31273188
.unwrap();
3189+
// Wait for the bg processor to flush the monitor update triggered by force close
3190+
// so the commitment tx is broadcast.
3191+
wait_for_flushed(&nodes[0].chain_monitor);
31283192
let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
31293193
confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
31303194

0 commit comments

Comments (0)