131 changes: 65 additions & 66 deletions lightning-background-processor/src/lib.rs
@@ -57,11 +57,10 @@ use lightning::sign::{
use lightning::util::async_poll::MaybeSend;
use lightning::util::logger::Logger;
use lightning::util::persist::{
KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY,
SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
KVStore, KVStoreSync, KVStoreSyncWrapper, NETWORK_GRAPH_PERSISTENCE_KEY,
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
};
use lightning::util::sweep::{OutputSweeper, OutputSweeperSync};
use lightning::util::wakers::Future;
@@ -1150,39 +1149,13 @@ where
None => {},
}

let mut futures = Joiner::new();
// Slot A is no longer used, so its type cannot be inferred - annotate it with the same boxed future type as the other slots
let mut futures: Joiner<lightning::io::Error, core::pin::Pin<Box<dyn core::future::Future<Output = Result<(), lightning::io::Error>> + Send + 'static>>, _, _, _, _> = Joiner::new();
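A quick stand-alone illustration of the inference issue (the `Holder` type below is a made-up stand-in, not the actual `Joiner`): when one generic slot never receives a value, the compiler has nothing to infer its type from, so it has to be written out explicitly.

// Hypothetical two-slot container mirroring the Joiner's shape.
struct Holder<A, B> {
    a: Option<A>,
    b: Option<B>,
}

impl<A, B> Holder<A, B> {
    fn new() -> Self {
        Holder { a: None, b: None }
    }
    fn set_b(&mut self, b: B) {
        self.b = Some(b);
    }
}

fn main() {
    // `B` is inferred from the call to set_b, but `A` is never assigned anywhere,
    // so it must be named in the annotation (here just `()`).
    let mut holder: Holder<(), _> = Holder::new();
    holder.set_b(42u32);
    assert!(holder.a.is_none());
}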

if channel_manager.get_cm().get_and_clear_needs_persistence() {
log_trace!(logger, "Persisting ChannelManager...");

let fut = async {
kv_store
.write(
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_KEY,
channel_manager.get_cm().encode(),
)
.await
};
// TODO: Once our MSRV is 1.68 we should be able to drop the Box
let mut fut = Box::pin(fut);

// Because persisting the ChannelManager is important to avoid accidental
// force-closures, go ahead and poll the future once before we do slightly more
// CPU-intensive tasks in the form of NetworkGraph pruning or scorer time-stepping
// below. This will get it moving but won't block us for too long if the underlying
// future is actually async.
use core::future::Future;
let mut waker = dummy_waker();
let mut ctx = task::Context::from_waker(&mut waker);
match core::pin::Pin::new(&mut fut).poll(&mut ctx) {
task::Poll::Ready(res) => futures.set_a_res(res),
task::Poll::Pending => futures.set_a(fut),
}

log_trace!(logger, "Done persisting ChannelManager.");
}
// Capture the number of pending monitor writes and whether the manager needs persistence.
// We'll flush monitors and manager together in a single batch after other tasks complete.
let pending_monitor_writes = chain_monitor.pending_write_count();
let needs_manager_persist = channel_manager.get_cm().get_and_clear_needs_persistence();

// Note that we want to archive stale ChannelMonitors and run a network graph prune once
// not long after startup before falling back to their usual infrequent runs. This avoids
@@ -1349,6 +1322,17 @@ where
res?;
}

// Flush monitors and manager together in a single batch.
// Any monitor writes that arrive after the count is captured are left in the queue for the next iteration.
if pending_monitor_writes > 0 || needs_manager_persist {
log_trace!(logger, "Persisting ChannelManager and flushing {} monitor writes...", pending_monitor_writes);
let manager_bytes = channel_manager.get_cm().encode();
match chain_monitor.flush(pending_monitor_writes, manager_bytes) {
Ok(()) => log_trace!(logger, "Flushed ChannelManager and {} monitor writes", pending_monitor_writes),
Err(e) => log_error!(logger, "Failed to flush chain monitor: {}", e),
}
}
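Put differently, the loop snapshots the persister queue depth, runs the rest of the iteration, and then flushes exactly that snapshot; writes queued in the meantime simply wait. A minimal stand-alone sketch of that counting behaviour, with a plain VecDeque standing in for the persister's queue (none of these names are LDK APIs):

use std::collections::VecDeque;

fn main() {
    // Stand-in for the persister's queue of pending monitor writes.
    let mut pending: VecDeque<&str> = VecDeque::from(vec!["monitor update 1"]);

    // Snapshot the queue depth before the slower per-iteration work runs.
    let count = pending.len();

    // A write that lands while that work is running is not part of this batch...
    pending.push_back("monitor update 2");

    // ...so the flush drains exactly `count` entries (the manager bytes would be
    // written first in the real flush) and the late arrival waits for the next iteration.
    let batch: Vec<&str> = pending.drain(..count).collect();
    assert_eq!(batch, vec!["monitor update 1"]);
    assert_eq!(pending.len(), 1);
}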

match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
sleeper(ONION_MESSAGE_HANDLER_TIMER)
}) {
@@ -1402,17 +1386,20 @@
}
log_trace!(logger, "Terminating background processor.");

// After we exit, ensure we persist the ChannelManager one final time - this avoids
// some races where users quit while channel updates were in-flight, with
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
kv_store
.write(
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_KEY,
channel_manager.get_cm().encode(),
)
.await?;
// After we exit, ensure we persist the ChannelManager one final time along with any
// pending monitor writes - this avoids some races where users quit while channel updates
// were in-flight, with ChannelMonitor update(s) persisted without a corresponding
// ChannelManager update.
let pending_monitor_writes = chain_monitor.pending_write_count();
let manager_bytes = channel_manager.get_cm().encode();
match chain_monitor.flush(pending_monitor_writes, manager_bytes) {
Ok(()) => log_trace!(logger, "Final flush: ChannelManager and {} monitor writes", pending_monitor_writes),
Err(e) => {
log_error!(logger, "Failed final flush: {}", e);
return Err(e);
},
}
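Note the deliberate asymmetry in error handling: inside the loop a failed flush is only logged and the queued writes get another chance on the next iteration, whereas the shutdown flush surfaces the error to the caller. A small sketch of that policy (the helper names are hypothetical):

use std::io;

// Hypothetical stand-in for ChainMonitor::flush.
fn try_flush() -> Result<(), io::Error> {
    Ok(())
}

// Periodic path: log and keep going; anything still queued is retried later.
fn periodic_tick() {
    if let Err(e) = try_flush() {
        eprintln!("Failed to flush chain monitor: {}", e);
    }
}

// Shutdown path: propagate the failure so the process does not exit
// believing everything reached disk.
fn shutdown() -> Result<(), io::Error> {
    try_flush()
}

fn main() -> Result<(), io::Error> {
    periodic_tick();
    shutdown()
}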

if let Some(ref scorer) = scorer {
kv_store
.write(
@@ -1722,15 +1709,20 @@ impl BackgroundProcessor {
channel_manager.get_cm().timer_tick_occurred();
last_freshness_call = Instant::now();
}
if channel_manager.get_cm().get_and_clear_needs_persistence() {
log_trace!(logger, "Persisting ChannelManager...");
(kv_store.write(
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_KEY,
channel_manager.get_cm().encode(),
))?;
log_trace!(logger, "Done persisting ChannelManager.");
// Capture the number of pending monitor writes and whether the manager needs persistence.
let pending_monitor_writes = chain_monitor.pending_write_count();
let needs_manager_persist = channel_manager.get_cm().get_and_clear_needs_persistence();

// Flush monitors and manager together in a single batch.
if pending_monitor_writes > 0 || needs_manager_persist {
log_trace!(logger, "Persisting ChannelManager and flushing {} monitor writes...", pending_monitor_writes);
let manager_bytes = channel_manager.get_cm().encode();
match chain_monitor.flush(pending_monitor_writes, manager_bytes) {
Ok(()) => {
log_trace!(logger, "Flushed ChannelManager and {} monitor writes", pending_monitor_writes)
},
Err(e) => log_error!(logger, "Failed to flush chain monitor: {}", e),
}
}

if let Some(liquidity_manager) = liquidity_manager.as_ref() {
@@ -1844,15 +1836,22 @@ impl BackgroundProcessor {
}
}

// After we exit, ensure we persist the ChannelManager one final time - this avoids
// some races where users quit while channel updates were in-flight, with
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
kv_store.write(
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_KEY,
channel_manager.get_cm().encode(),
)?;
// After we exit, ensure we persist the ChannelManager one final time along with any
// pending monitor writes - this avoids some races where users quit while channel updates
// were in-flight, with ChannelMonitor update(s) persisted without a corresponding
// ChannelManager update.
let pending_monitor_writes = chain_monitor.pending_write_count();
let manager_bytes = channel_manager.get_cm().encode();
match chain_monitor.flush(pending_monitor_writes, manager_bytes) {
Ok(()) => {
log_trace!(logger, "Final flush: ChannelManager and {} monitor writes", pending_monitor_writes)
},
Err(e) => {
log_error!(logger, "Failed final flush: {}", e);
return Err(e.into());
},
}

if let Some(ref scorer) = scorer {
kv_store.write(
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
78 changes: 57 additions & 21 deletions lightning/src/chain/chainmonitor.rs
@@ -39,6 +39,7 @@ use crate::chain::channelmonitor::{
use crate::chain::transaction::{OutPoint, TransactionData};
use crate::chain::{BestBlock, ChannelMonitorUpdateStatus, Filter, WatchedOutput};
use crate::events::{self, Event, EventHandler, ReplayEvent};
use crate::io;
use crate::ln::channel_state::ChannelDetails;
#[cfg(peer_storage)]
use crate::ln::msgs::PeerStorage;
@@ -198,16 +199,27 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
/// the monitor already exists in the archive.
fn archive_persisted_channel(&self, monitor_name: MonitorName);

/// Fetches the set of [`ChannelMonitorUpdate`]s, previously persisted with
/// [`Self::update_persisted_channel`], which have completed.
/// Returns the number of pending writes in the queue.
///
/// Returning an update here is equivalent to calling
/// [`ChainMonitor::channel_monitor_updated`]. Because of this, this method is defaulted and
/// hidden in the docs.
#[doc(hidden)]
fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
Vec::new()
/// This can be used to capture the queue size before persisting the channel manager,
/// then pass that count to [`Self::flush`] to only flush those specific updates.
fn pending_write_count(&self) -> usize {
0
}

/// Flushes pending writes to the underlying storage.
///
/// The `count` parameter specifies how many pending monitor writes to flush.
/// The `channel_manager_bytes` parameter contains the serialized channel manager to persist.
///
/// The channel manager is always written first in the batch, before any monitor writes,
/// to ensure proper ordering (manager state should be at least as recent as monitors on disk).
///
/// For implementations that queue writes (returning [`ChannelMonitorUpdateStatus::InProgress`]
/// from persist methods), this method should write queued data to storage.
///
/// Returns the list of completed monitor updates (channel_id, update_id) that were flushed.
fn flush(&self, count: usize, channel_manager_bytes: Vec<u8>) -> Result<Vec<(ChannelId, u64)>, io::Error>;
}
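A rough sketch of how a queueing persister might satisfy this contract (the `QueuedWrite` type, `u64` ids, and `write` helper are simplified stand-ins, not LDK types): the manager bytes go to disk first, then up to `count` queued monitor writes, and the ids of the writes that made it are returned so the ChainMonitor can mark them complete.

use std::collections::VecDeque;
use std::io;
use std::sync::Mutex;

// Simplified stand-in for a queued ChannelMonitor write.
struct QueuedWrite {
    channel_id: u64,
    update_id: u64,
    bytes: Vec<u8>,
}

struct QueueingPersister {
    queue: Mutex<VecDeque<QueuedWrite>>,
}

impl QueueingPersister {
    fn pending_write_count(&self) -> usize {
        self.queue.lock().unwrap().len()
    }

    fn flush(&self, count: usize, channel_manager_bytes: Vec<u8>) -> Result<Vec<(u64, u64)>, io::Error> {
        // Manager first, so the manager state on disk is never older than the
        // monitor writes flushed in the same batch.
        write("channel_manager", &channel_manager_bytes)?;

        let mut completed = Vec::with_capacity(count);
        for _ in 0..count {
            let next = match self.queue.lock().unwrap().pop_front() {
                Some(w) => w,
                None => break,
            };
            write("monitor", &next.bytes)?;
            completed.push((next.channel_id, next.update_id));
        }
        Ok(completed)
    }
}

fn write(_key: &str, _bytes: &[u8]) -> Result<(), io::Error> {
    // Placeholder for the real KVStore write.
    Ok(())
}

fn main() -> Result<(), io::Error> {
    let p = QueueingPersister { queue: Mutex::new(VecDeque::new()) };
    p.queue.lock().unwrap().push_back(QueuedWrite { channel_id: 1, update_id: 5, bytes: vec![0xde, 0xad] });
    let completed = p.flush(p.pending_write_count(), vec![0u8])?;
    assert_eq!(completed, vec![(1u64, 5u64)]);
    Ok(())
}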

struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
@@ -272,7 +284,6 @@ pub struct AsyncPersister<
FE::Target: FeeEstimator,
{
persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>,
event_notifier: Arc<Notifier>,
}

impl<
@@ -320,26 +331,28 @@
&self, monitor_name: MonitorName,
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
) -> ChannelMonitorUpdateStatus {
let notifier = Arc::clone(&self.event_notifier);
self.persister.spawn_async_persist_new_channel(monitor_name, monitor, notifier);
self.persister.queue_new_channel(monitor_name, monitor);
ChannelMonitorUpdateStatus::InProgress
}

fn update_persisted_channel(
&self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>,
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
) -> ChannelMonitorUpdateStatus {
let notifier = Arc::clone(&self.event_notifier);
self.persister.spawn_async_update_channel(monitor_name, monitor_update, monitor, notifier);
self.persister.queue_channel_update(monitor_name, monitor_update, monitor);
ChannelMonitorUpdateStatus::InProgress
}

fn archive_persisted_channel(&self, monitor_name: MonitorName) {
self.persister.spawn_async_archive_persisted_channel(monitor_name);
}

fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
self.persister.get_and_clear_completed_updates()
fn pending_write_count(&self) -> usize {
self.persister.pending_write_count()
}

fn flush(&self, count: usize, channel_manager_bytes: Vec<u8>) -> Result<Vec<(ChannelId, u64)>, io::Error> {
crate::util::persist::poll_sync_future(self.persister.flush(count, channel_manager_bytes))
}
}
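On the synchronous Persist path the async persister's flush future is driven to completion inline. The general shape of such a poll_sync_future-style helper - poll with a waker that does nothing, which is fine when the future finishes without ever needing to be woken (e.g. everything is already buffered in memory) - looks roughly like this sketch (not LDK's actual implementation):

use core::future::Future;
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// A waker that does nothing; sufficient for futures that complete without
// ever needing to be woken.
fn noop_waker() -> Waker {
    unsafe fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(core::ptr::null(), &VTABLE)
    }
    unsafe fn no_op(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, no_op, no_op, no_op);
    unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &VTABLE)) }
}

// Drive a future to completion on a synchronous code path.
fn drive<F: Future>(fut: F) -> F::Output {
    let mut fut = core::pin::pin!(fut);
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn main() {
    assert_eq!(drive(async { 21 * 2 }), 42);
}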

@@ -440,7 +453,6 @@ impl<
persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, T, F>, _entropy_source: ES,
_our_peerstorage_encryption_key: PeerStorageKey,
) -> Self {
let event_notifier = Arc::new(Notifier::new());
Self {
monitors: RwLock::new(new_hash_map()),
chain_source,
@@ -450,8 +462,8 @@ impl<
_entropy_source,
pending_monitor_events: Mutex::new(Vec::new()),
highest_chain_height: AtomicUsize::new(0),
event_notifier: Arc::clone(&event_notifier),
persister: AsyncPersister { persister, event_notifier },
event_notifier: Arc::new(Notifier::new()),
persister: AsyncPersister { persister },
pending_send_only_events: Mutex::new(Vec::new()),
#[cfg(peer_storage)]
our_peerstorage_encryption_key: _our_peerstorage_encryption_key,
@@ -742,6 +754,33 @@
.collect()
}

/// Returns the number of pending writes in the persister queue.
///
/// This can be used to capture the queue size before persisting the channel manager,
/// then pass that count to [`Self::flush`] to only flush those specific updates.
pub fn pending_write_count(&self) -> usize {
self.persister.pending_write_count()
}

/// Flushes pending writes to the underlying storage.
///
/// The `count` parameter specifies how many pending monitor writes to flush.
/// The `channel_manager_bytes` parameter contains the serialized channel manager to persist.
///
/// The channel manager is always written first in the batch, before any monitor writes,
/// to ensure proper ordering (manager state should be at least as recent as monitors on disk).
///
/// For persisters that queue writes (returning [`ChannelMonitorUpdateStatus::InProgress`]
/// from persist methods), this method writes queued data to storage and signals
/// completion to the channel manager via [`Self::channel_monitor_updated`].
pub fn flush(&self, count: usize, channel_manager_bytes: Vec<u8>) -> Result<(), io::Error> {
let completed = self.persister.flush(count, channel_manager_bytes)?;
for (channel_id, update_id) in completed {
let _ = self.channel_monitor_updated(channel_id, update_id);
}
Ok(())
}
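For context on why flush reports completions back: a monitor update persisted with ChannelMonitorUpdateStatus::InProgress stays pending until the ChainMonitor is told the write reached disk, and only then can the channel resume normal operation. A toy model of that bookkeeping (illustrative only, not LDK's types):

use std::collections::BTreeSet;

#[derive(Default)]
struct MonitorState {
    pending_update_ids: BTreeSet<u64>,
}

impl MonitorState {
    // An update handed to the persister with `InProgress` status.
    fn persist_in_progress(&mut self, update_id: u64) {
        self.pending_update_ids.insert(update_id);
    }
    // Equivalent in spirit to ChainMonitor::channel_monitor_updated.
    fn update_completed(&mut self, update_id: u64) {
        self.pending_update_ids.remove(&update_id);
    }
    // The channel can resume normal operation only once nothing is pending.
    fn is_fully_persisted(&self) -> bool {
        self.pending_update_ids.is_empty()
    }
}

fn main() {
    let mut state = MonitorState::default();
    state.persist_in_progress(7);
    assert!(!state.is_fully_persisted());

    // flush() writes update 7 to disk and reports it back...
    let completed = vec![7u64];
    for id in completed {
        state.update_completed(id);
    }
    assert!(state.is_fully_persisted());
}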

#[cfg(any(test, feature = "_test_utils"))]
pub fn remove_monitor(&self, channel_id: &ChannelId) -> ChannelMonitor<ChannelSigner> {
self.monitors.write().unwrap().remove(channel_id).unwrap().monitor
@@ -1497,9 +1536,6 @@
fn release_pending_monitor_events(
&self,
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
for (channel_id, update_id) in self.persister.get_and_clear_completed_updates() {
let _ = self.channel_monitor_updated(channel_id, update_id);
}
let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
for monitor_state in self.monitors.read().unwrap().values() {
let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();