159 changes: 116 additions & 43 deletions lightning-background-processor/src/lib.rs
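This diff moves the background processor from `ChainMonitor` to the new `DeferredChainMonitor` (`lightning::chain::deferred`), which queues monitor persistence operations rather than performing them inline, leaving it to the background processor to decide when queued writes hit disk. The monitor's implementation is not part of this file, so the sketch below is only a mental model: `pending_operation_count`, `flush(n)`, and `flush_all` are the calls the diff actually makes, while the queue-backed structure is an illustrative assumption.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Illustrative stand-in for DeferredChainMonitor's queueing behavior --
/// not the real type, just the contract this diff relies on.
struct DeferredMonitorModel {
    /// Queued monitor persistence operations, oldest first.
    pending: Mutex<VecDeque<Box<dyn FnOnce() + Send>>>,
}

impl DeferredMonitorModel {
    /// Number of monitor writes queued but not yet persisted.
    fn pending_operation_count(&self) -> usize {
        self.pending.lock().unwrap().len()
    }

    /// Persist the `count` oldest queued writes; newer ones stay queued.
    fn flush(&self, count: usize) {
        let ops: Vec<_> = {
            let mut pending = self.pending.lock().unwrap();
            let n = count.min(pending.len());
            pending.drain(..n).collect()
        };
        for op in ops {
            op(); // run each queued persistence call outside the lock
        }
    }

    /// Persist everything currently queued (what the tests call).
    fn flush_all(&self) {
        let n = self.pending_operation_count();
        self.flush(n);
    }
}
```

Draining under the lock and invoking callbacks outside it is just one plausible shape; the real type presumably coordinates with the `Persist` trait retained in the imports below.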
@@ -32,7 +32,8 @@ use fwd_batch::BatchDelay;

use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
use lightning::chain::chainmonitor::Persist;
use lightning::chain::deferred::DeferredChainMonitor;
#[cfg(feature = "std")]
use lightning::events::EventHandler;
#[cfg(feature = "std")]
@@ -101,7 +102,7 @@ use alloc::vec::Vec;
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
/// writing it to disk/backups by invoking the callback given to it at startup.
/// [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`], [`ChainMonitor::rebroadcast_pending_claims`]
/// * Calling [`ChannelManager::timer_tick_occurred`], [`lightning::chain::chainmonitor::ChainMonitor::rebroadcast_pending_claims`]
/// and [`PeerManager::timer_tick_occurred`] at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
/// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
@@ -853,7 +854,7 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
/// # fn send_data(&mut self, _data: &[u8], _continue_read: bool) -> usize { 0 }
/// # fn disconnect_socket(&mut self) {}
/// # }
/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
/// # type ChainMonitor<B, F, FE> = lightning::chain::deferred::DeferredChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
@@ -964,7 +965,9 @@ pub async fn process_events_async<
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
EventHandler: Fn(Event) -> EventHandlerFuture,
ES: Deref,
M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
M: Deref<
Target = DeferredChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
>,
CM: Deref,
OM: Deref,
PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
@@ -1152,6 +1155,11 @@ where

let mut futures = Joiner::new();

// Capture the number of pending monitor writes before persisting the channel manager.
// We'll only flush this many writes after the manager is persisted, to avoid flushing
// monitor updates that arrived after the manager state was captured.
let pending_monitor_writes = chain_monitor.pending_operation_count();

if channel_manager.get_cm().get_and_clear_needs_persistence() {
log_trace!(logger, "Persisting ChannelManager...");

@@ -1349,6 +1357,15 @@ where
res?;
}

// Flush the monitor writes that were pending before we persisted the channel manager.
// Any writes that arrived after are left in the queue for the next iteration. There's
// no need to "chase the tail" by processing new updates that arrive during flushing -
// they'll be handled in the next round.
if pending_monitor_writes > 0 {
log_trace!(logger, "Flushing {} monitor writes", pending_monitor_writes);
chain_monitor.flush(pending_monitor_writes);
}
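The snapshot-then-flush ordering above is the core of the change. A condensed sketch of one loop iteration, reusing the hypothetical queue model from the preface (assumed names, not the real API surface):

```rust
/// One background-processor iteration's persistence ordering, using the
/// DeferredMonitorModel sketch above. `persist_manager` stands in for the
/// ChannelManager write.
fn persist_round(monitor: &DeferredMonitorModel, persist_manager: impl FnOnce()) {
    // Snapshot the queue depth *before* persisting the manager.
    let pending_monitor_writes = monitor.pending_operation_count();
    persist_manager();
    // Flush only the writes that predate the manager snapshot; anything
    // queued while the write was in flight waits for the next iteration
    // ("no need to chase the tail", per the comments in the diff).
    if pending_monitor_writes > 0 {
        monitor.flush(pending_monitor_writes);
    }
}
```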

match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
sleeper(ONION_MESSAGE_HANDLER_TIMER)
}) {
@@ -1413,6 +1430,14 @@ where
channel_manager.get_cm().encode(),
)
.await?;

// Flush all pending monitor writes after final channel manager persistence.
let pending_monitor_writes = chain_monitor.pending_operation_count();
if pending_monitor_writes > 0 {
log_trace!(logger, "Flushing {} monitor writes on shutdown", pending_monitor_writes);
chain_monitor.flush(pending_monitor_writes);
}

if let Some(ref scorer) = scorer {
kv_store
.write(
@@ -1465,7 +1490,9 @@ pub async fn process_events_async_with_kv_store_sync<
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
EventHandler: Fn(Event) -> EventHandlerFuture,
ES: Deref,
M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
M: Deref<
Target = DeferredChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
>,
CM: Deref,
OM: Deref,
PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
@@ -1580,7 +1607,15 @@ impl BackgroundProcessor {
ES: 'static + Deref + Send,
M: 'static
+ Deref<
Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
Target = DeferredChainMonitor<
<CM::Target as AChannelManager>::Signer,
CF,
T,
F,
L,
P,
ES,
>,
>
+ Send
+ Sync,
@@ -1722,6 +1757,10 @@ impl BackgroundProcessor {
channel_manager.get_cm().timer_tick_occurred();
last_freshness_call = Instant::now();
}

// Capture the number of pending monitor writes before persisting the channel manager.
let pending_monitor_writes = chain_monitor.pending_operation_count();

if channel_manager.get_cm().get_and_clear_needs_persistence() {
log_trace!(logger, "Persisting ChannelManager...");
(kv_store.write(
@@ -1733,6 +1772,14 @@ impl BackgroundProcessor {
log_trace!(logger, "Done persisting ChannelManager.");
}

// Flush the monitor writes that were pending before we persisted the channel manager.
// There's no need to "chase the tail" by processing new updates that arrive during
// flushing - they'll be handled in the next round.
if pending_monitor_writes > 0 {
log_trace!(logger, "Flushing {} monitor writes", pending_monitor_writes);
chain_monitor.flush(pending_monitor_writes);
}

if let Some(liquidity_manager) = liquidity_manager.as_ref() {
log_trace!(logger, "Persisting LiquidityManager...");
let _ = liquidity_manager.get_lm().persist().map_err(|e| {
@@ -1853,6 +1900,18 @@ impl BackgroundProcessor {
CHANNEL_MANAGER_PERSISTENCE_KEY,
channel_manager.get_cm().encode(),
)?;

// Flush all pending monitor writes after final channel manager persistence.
let pending_monitor_writes = chain_monitor.pending_operation_count();
if pending_monitor_writes > 0 {
log_trace!(
logger,
"Flushing {} monitor writes on shutdown",
pending_monitor_writes
);
chain_monitor.flush(pending_monitor_writes);
}

if let Some(ref scorer) = scorer {
kv_store.write(
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -1936,7 +1995,7 @@ mod tests {
use core::sync::atomic::{AtomicBool, Ordering};
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
use lightning::chain::transaction::OutPoint;
use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
use lightning::chain::{deferred, BestBlock, Confirm, Filter};
use lightning::events::{Event, PathFailure, ReplayEvent};
use lightning::ln::channelmanager;
use lightning::ln::channelmanager::{
@@ -1984,6 +2043,26 @@ mod tests {
const EVENT_DEADLINE: Duration =
Duration::from_millis(5 * (FRESHNESS_TIMER.as_millis() as u64));

/// Reads a directory and returns only non-`.tmp` files.
/// The file system may return files in any order, and during persistence
/// operations there may be temporary `.tmp` files present.
fn list_monitor_files(dir: &str) -> Vec<std::fs::DirEntry> {
std::fs::read_dir(dir)
.unwrap()
.filter_map(|entry| {
let entry = entry.unwrap();
let path_str = entry.path().to_str().unwrap().to_lowercase();
// Skip any .tmp files that may exist during persistence.
// On Windows, ReplaceFileW creates backup files with .TMP (uppercase).
if path_str.ends_with(".tmp") {
None
} else {
Some(entry)
}
})
.collect()
}

#[derive(Clone, Hash, PartialEq, Eq)]
struct TestDescriptor {}
impl SocketDescriptor for TestDescriptor {
@@ -2026,7 +2105,7 @@ mod tests {
Arc<test_utils::TestLogger>,
>;

type ChainMonitor = chainmonitor::ChainMonitor<
type ChainMonitor = deferred::DeferredChainMonitor<
InMemorySigner,
Arc<test_utils::TestChainSource>,
Arc<test_utils::TestBroadcaster>,
@@ -2454,7 +2533,7 @@ mod tests {
let now = Duration::from_secs(genesis_block.header.time as u64);
let keys_manager =
Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos(), true));
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(
let chain_monitor = Arc::new(deferred::DeferredChainMonitor::new(
Some(Arc::clone(&chain_source)),
Arc::clone(&tx_broadcaster),
Arc::clone(&logger),
@@ -2598,19 +2677,25 @@ mod tests {
tx.clone(),
)
.unwrap();
// Flush deferred monitor operations so messages aren't held back
$node_a.chain_monitor.flush_all();
let msg_a = get_event_msg!(
$node_a,
MessageSendEvent::SendFundingCreated,
$node_b.node.get_our_node_id()
);
$node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a);
// Flush node_b's monitor so it releases the FundingSigned message
$node_b.chain_monitor.flush_all();
get_event!($node_b, Event::ChannelPending);
let msg_b = get_event_msg!(
$node_b,
MessageSendEvent::SendFundingSigned,
$node_a.node.get_our_node_id()
);
$node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b);
// Flush node_a's monitor for the final update
$node_a.chain_monitor.flush_all();
get_event!($node_a, Event::ChannelPending);
tx
}};
@@ -3057,11 +3142,17 @@ mod tests {
.node
.funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone())
.unwrap();
// Flush node_0's deferred monitor operations so the FundingCreated message is released
nodes[0].chain_monitor.flush_all();
let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
nodes[1].node.handle_funding_created(node_0_id, &msg_0);
// Flush node_1's deferred monitor operations so events and FundingSigned are released
nodes[1].chain_monitor.flush_all();
get_event!(nodes[1], Event::ChannelPending);
let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
nodes[0].node.handle_funding_signed(node_1_id, &msg_1);
// Flush node_0's monitor for the funding_signed update
nodes[0].chain_monitor.flush_all();
channel_pending_recv
.recv_timeout(EVENT_DEADLINE)
.expect("ChannelPending not handled within deadline");
@@ -3122,6 +3213,8 @@ mod tests {
error_message.to_string(),
)
.unwrap();
// Flush the monitor update triggered by force close so the commitment tx is broadcast
nodes[0].chain_monitor.flush_all();
let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);

@@ -3787,30 +3880,20 @@ mod tests {
);

let dir = format!("{}_persister_1/monitors", &persist_dir);
let mut mons = std::fs::read_dir(&dir).unwrap();
let mut mon = mons.next().unwrap().unwrap();
if mon.path().to_str().unwrap().ends_with(".tmp") {
mon = mons.next().unwrap().unwrap();
assert_eq!(mon.path().extension(), None);
}
assert!(mons.next().is_none());
let mut mons = list_monitor_files(&dir);
assert_eq!(mons.len(), 1);
let mon = mons.pop().unwrap();

// Because the channel wasn't funded, we'll archive the ChannelMonitor immediately after
// it's force-closed (at least on node B, which didn't put their money into it).
nodes[1].node.force_close_all_channels_broadcasting_latest_txn("".to_owned());
loop {
let mut mons = std::fs::read_dir(&dir).unwrap();
if let Some(new_mon) = mons.next() {
let mut new_mon = new_mon.unwrap();
if new_mon.path().to_str().unwrap().ends_with(".tmp") {
new_mon = mons.next().unwrap().unwrap();
assert_eq!(new_mon.path().extension(), None);
}
assert_eq!(new_mon.path(), mon.path());
assert!(mons.next().is_none());
} else {
let mons = list_monitor_files(&dir);
if mons.is_empty() {
break;
}
assert_eq!(mons.len(), 1);
assert_eq!(mons[0].path(), mon.path());
}

bp.stop().unwrap();
Expand Down Expand Up @@ -3855,30 +3938,20 @@ mod tests {
));

let dir = format!("{}_persister_1/monitors", &persist_dir);
let mut mons = std::fs::read_dir(&dir).unwrap();
let mut mon = mons.next().unwrap().unwrap();
if mon.path().to_str().unwrap().ends_with(".tmp") {
mon = mons.next().unwrap().unwrap();
assert_eq!(mon.path().extension(), None);
}
assert!(mons.next().is_none());
let mut mons = list_monitor_files(&dir);
assert_eq!(mons.len(), 1);
let mon = mons.pop().unwrap();

// Because the channel wasn't funded, we'll archive the ChannelMonitor immediately after
// it's force-closed (at least on node B, which didn't put their money into it).
nodes[1].node.force_close_all_channels_broadcasting_latest_txn("".to_owned());
loop {
let mut mons = std::fs::read_dir(&dir).unwrap();
if let Some(new_mon) = mons.next() {
let mut new_mon = new_mon.unwrap();
if new_mon.path().to_str().unwrap().ends_with(".tmp") {
new_mon = mons.next().unwrap().unwrap();
assert_eq!(new_mon.path().extension(), None);
}
assert_eq!(new_mon.path(), mon.path());
assert!(mons.next().is_none());
} else {
let mons = list_monitor_files(&dir);
if mons.is_empty() {
break;
}
assert_eq!(mons.len(), 1);
assert_eq!(mons[0].path(), mon.path());
tokio::task::yield_now().await;
}
