diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index c38d6dfe080..7b31080fdda 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -57,11 +57,10 @@ use lightning::sign::{ use lightning::util::async_poll::MaybeSend; use lightning::util::logger::Logger; use lightning::util::persist::{ - KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY, - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, - SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + KVStore, KVStoreSync, KVStoreSyncWrapper, NETWORK_GRAPH_PERSISTENCE_KEY, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, }; use lightning::util::sweep::{OutputSweeper, OutputSweeperSync}; use lightning::util::wakers::Future; @@ -1150,39 +1149,13 @@ where None => {}, } - let mut futures = Joiner::new(); + // Type A is unused but needed for inference - we use the same boxed future type as other slots + let mut futures: Joiner> + Send + 'static>>, _, _, _, _> = Joiner::new(); - if channel_manager.get_cm().get_and_clear_needs_persistence() { - log_trace!(logger, "Persisting ChannelManager..."); - - let fut = async { - kv_store - .write( - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_KEY, - channel_manager.get_cm().encode(), - ) - .await - }; - // TODO: Once our MSRV is 1.68 we should be able to drop the Box - let mut fut = Box::pin(fut); - - // Because persisting the ChannelManager is important to avoid accidental - // force-closures, go ahead and poll the future once before we do slightly more - // CPU-intensive tasks in the form of NetworkGraph pruning or scorer time-stepping - // below. This will get it moving but won't block us for too long if the underlying - // future is actually async. - use core::future::Future; - let mut waker = dummy_waker(); - let mut ctx = task::Context::from_waker(&mut waker); - match core::pin::Pin::new(&mut fut).poll(&mut ctx) { - task::Poll::Ready(res) => futures.set_a_res(res), - task::Poll::Pending => futures.set_a(fut), - } - - log_trace!(logger, "Done persisting ChannelManager."); - } + // Capture the number of pending monitor writes and whether manager needs persistence. + // We'll flush monitors and manager together in a single batch after other tasks complete. + let pending_monitor_writes = chain_monitor.pending_write_count(); + let needs_manager_persist = channel_manager.get_cm().get_and_clear_needs_persistence(); // Note that we want to archive stale ChannelMonitors and run a network graph prune once // not long after startup before falling back to their usual infrequent runs. This avoids @@ -1349,6 +1322,17 @@ where res?; } + // Flush monitors and manager together in a single batch. + // Any monitor writes that arrived after are left in the queue for the next iteration. 
+ if pending_monitor_writes > 0 || needs_manager_persist { + log_trace!(logger, "Persisting ChannelManager and flushing {} monitor writes...", pending_monitor_writes); + let manager_bytes = channel_manager.get_cm().encode(); + match chain_monitor.flush(pending_monitor_writes, manager_bytes) { + Ok(()) => log_trace!(logger, "Flushed ChannelManager and {} monitor writes", pending_monitor_writes), + Err(e) => log_error!(logger, "Failed to flush chain monitor: {}", e), + } + } + match check_and_reset_sleeper(&mut last_onion_message_handler_call, || { sleeper(ONION_MESSAGE_HANDLER_TIMER) }) { @@ -1402,17 +1386,20 @@ where } log_trace!(logger, "Terminating background processor."); - // After we exit, ensure we persist the ChannelManager one final time - this avoids - // some races where users quit while channel updates were in-flight, with - // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. - kv_store - .write( - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_KEY, - channel_manager.get_cm().encode(), - ) - .await?; + // After we exit, ensure we persist the ChannelManager one final time along with any + // pending monitor writes - this avoids some races where users quit while channel updates + // were in-flight, with ChannelMonitor update(s) persisted without a corresponding + // ChannelManager update. + let pending_monitor_writes = chain_monitor.pending_write_count(); + let manager_bytes = channel_manager.get_cm().encode(); + match chain_monitor.flush(pending_monitor_writes, manager_bytes) { + Ok(()) => log_trace!(logger, "Final flush: ChannelManager and {} monitor writes", pending_monitor_writes), + Err(e) => { + log_error!(logger, "Failed final flush: {}", e); + return Err(e); + }, + } + if let Some(ref scorer) = scorer { kv_store .write( @@ -1722,15 +1709,20 @@ impl BackgroundProcessor { channel_manager.get_cm().timer_tick_occurred(); last_freshness_call = Instant::now(); } - if channel_manager.get_cm().get_and_clear_needs_persistence() { - log_trace!(logger, "Persisting ChannelManager..."); - (kv_store.write( - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_KEY, - channel_manager.get_cm().encode(), - ))?; - log_trace!(logger, "Done persisting ChannelManager."); + // Capture the number of pending monitor writes and whether manager needs persistence. + let pending_monitor_writes = chain_monitor.pending_write_count(); + let needs_manager_persist = channel_manager.get_cm().get_and_clear_needs_persistence(); + + // Flush monitors and manager together in a single batch. + if pending_monitor_writes > 0 || needs_manager_persist { + log_trace!(logger, "Persisting ChannelManager and flushing {} monitor writes...", pending_monitor_writes); + let manager_bytes = channel_manager.get_cm().encode(); + match chain_monitor.flush(pending_monitor_writes, manager_bytes) { + Ok(()) => { + log_trace!(logger, "Flushed ChannelManager and {} monitor writes", pending_monitor_writes) + }, + Err(e) => log_error!(logger, "Failed to flush chain monitor: {}", e), + } } if let Some(liquidity_manager) = liquidity_manager.as_ref() { @@ -1844,15 +1836,22 @@ impl BackgroundProcessor { } } - // After we exit, ensure we persist the ChannelManager one final time - this avoids - // some races where users quit while channel updates were in-flight, with - // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. 
- kv_store.write( - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_KEY, - channel_manager.get_cm().encode(), - )?; + // After we exit, ensure we persist the ChannelManager one final time along with any + // pending monitor writes - this avoids some races where users quit while channel updates + // were in-flight, with ChannelMonitor update(s) persisted without a corresponding + // ChannelManager update. + let pending_monitor_writes = chain_monitor.pending_write_count(); + let manager_bytes = channel_manager.get_cm().encode(); + match chain_monitor.flush(pending_monitor_writes, manager_bytes) { + Ok(()) => { + log_trace!(logger, "Final flush: ChannelManager and {} monitor writes", pending_monitor_writes) + }, + Err(e) => { + log_error!(logger, "Failed final flush: {}", e); + return Err(e.into()); + }, + } + if let Some(ref scorer) = scorer { kv_store.write( SCORER_PERSISTENCE_PRIMARY_NAMESPACE, diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 9fd6383cf7e..9ce45bf3fc9 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -39,6 +39,7 @@ use crate::chain::channelmonitor::{ use crate::chain::transaction::{OutPoint, TransactionData}; use crate::chain::{BestBlock, ChannelMonitorUpdateStatus, Filter, WatchedOutput}; use crate::events::{self, Event, EventHandler, ReplayEvent}; +use crate::io; use crate::ln::channel_state::ChannelDetails; #[cfg(peer_storage)] use crate::ln::msgs::PeerStorage; @@ -198,16 +199,27 @@ pub trait Persist { /// the monitor already exists in the archive. fn archive_persisted_channel(&self, monitor_name: MonitorName); - /// Fetches the set of [`ChannelMonitorUpdate`]s, previously persisted with - /// [`Self::update_persisted_channel`], which have completed. + /// Returns the number of pending writes in the queue. /// - /// Returning an update here is equivalent to calling - /// [`ChainMonitor::channel_monitor_updated`]. Because of this, this method is defaulted and - /// hidden in the docs. - #[doc(hidden)] - fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> { - Vec::new() + /// This can be used to capture the queue size before persisting the channel manager, + /// then pass that count to [`Self::flush`] to only flush those specific updates. + fn pending_write_count(&self) -> usize { + 0 } + + /// Flushes pending writes to the underlying storage. + /// + /// The `count` parameter specifies how many pending monitor writes to flush. + /// The `channel_manager_bytes` parameter contains the serialized channel manager to persist. + /// + /// The channel manager is always written first in the batch, before any monitor writes, + /// to ensure proper ordering (manager state should be at least as recent as monitors on disk). + /// + /// For implementations that queue writes (returning [`ChannelMonitorUpdateStatus::InProgress`] + /// from persist methods), this method should write queued data to storage. + /// + /// Returns the list of completed monitor updates (channel_id, update_id) that were flushed. 
+ fn flush(&self, count: usize, channel_manager_bytes: Vec) -> Result, io::Error>; } struct MonitorHolder { @@ -272,7 +284,6 @@ pub struct AsyncPersister< FE::Target: FeeEstimator, { persister: MonitorUpdatingPersisterAsync, - event_notifier: Arc, } impl< @@ -320,8 +331,7 @@ where &self, monitor_name: MonitorName, monitor: &ChannelMonitor<::EcdsaSigner>, ) -> ChannelMonitorUpdateStatus { - let notifier = Arc::clone(&self.event_notifier); - self.persister.spawn_async_persist_new_channel(monitor_name, monitor, notifier); + self.persister.queue_new_channel(monitor_name, monitor); ChannelMonitorUpdateStatus::InProgress } @@ -329,8 +339,7 @@ where &self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<::EcdsaSigner>, ) -> ChannelMonitorUpdateStatus { - let notifier = Arc::clone(&self.event_notifier); - self.persister.spawn_async_update_channel(monitor_name, monitor_update, monitor, notifier); + self.persister.queue_channel_update(monitor_name, monitor_update, monitor); ChannelMonitorUpdateStatus::InProgress } @@ -338,8 +347,12 @@ where self.persister.spawn_async_archive_persisted_channel(monitor_name); } - fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> { - self.persister.get_and_clear_completed_updates() + fn pending_write_count(&self) -> usize { + self.persister.pending_write_count() + } + + fn flush(&self, count: usize, channel_manager_bytes: Vec) -> Result, io::Error> { + crate::util::persist::poll_sync_future(self.persister.flush(count, channel_manager_bytes)) } } @@ -440,7 +453,6 @@ impl< persister: MonitorUpdatingPersisterAsync, _entropy_source: ES, _our_peerstorage_encryption_key: PeerStorageKey, ) -> Self { - let event_notifier = Arc::new(Notifier::new()); Self { monitors: RwLock::new(new_hash_map()), chain_source, @@ -450,8 +462,8 @@ impl< _entropy_source, pending_monitor_events: Mutex::new(Vec::new()), highest_chain_height: AtomicUsize::new(0), - event_notifier: Arc::clone(&event_notifier), - persister: AsyncPersister { persister, event_notifier }, + event_notifier: Arc::new(Notifier::new()), + persister: AsyncPersister { persister }, pending_send_only_events: Mutex::new(Vec::new()), #[cfg(peer_storage)] our_peerstorage_encryption_key: _our_peerstorage_encryption_key, @@ -742,6 +754,33 @@ where .collect() } + /// Returns the number of pending writes in the persister queue. + /// + /// This can be used to capture the queue size before persisting the channel manager, + /// then pass that count to [`Self::flush`] to only flush those specific updates. + pub fn pending_write_count(&self) -> usize { + self.persister.pending_write_count() + } + + /// Flushes pending writes to the underlying storage. + /// + /// The `count` parameter specifies how many pending monitor writes to flush. + /// The `channel_manager_bytes` parameter contains the serialized channel manager to persist. + /// + /// The channel manager is always written first in the batch, before any monitor writes, + /// to ensure proper ordering (manager state should be at least as recent as monitors on disk). + /// + /// For persisters that queue writes (returning [`ChannelMonitorUpdateStatus::InProgress`] + /// from persist methods), this method writes queued data to storage and signals + /// completion to the channel manager via [`Self::channel_monitor_updated`]. 
+	pub fn flush(&self, count: usize, channel_manager_bytes: Vec<u8>) -> Result<(), io::Error> {
+		let completed = self.persister.flush(count, channel_manager_bytes)?;
+		for (channel_id, update_id) in completed {
+			let _ = self.channel_monitor_updated(channel_id, update_id);
+		}
+		Ok(())
+	}
+
 	#[cfg(any(test, feature = "_test_utils"))]
 	pub fn remove_monitor(&self, channel_id: &ChannelId) -> ChannelMonitor<ChannelSigner> {
 		self.monitors.write().unwrap().remove(channel_id).unwrap().monitor
 	}
@@ -1497,9 +1536,6 @@ where
 	fn release_pending_monitor_events(
 		&self,
 	) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
-		for (channel_id, update_id) in self.persister.get_and_clear_completed_updates() {
-			let _ = self.channel_monitor_updated(channel_id, update_id);
-		}
 		let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
 		for monitor_state in self.monitors.read().unwrap().values() {
 			let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();
diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs
index 2e1e8805d0a..8881e440a35 100644
--- a/lightning/src/util/persist.rs
+++ b/lightning/src/util/persist.rs
@@ -18,7 +18,6 @@ use bitcoin::{BlockHash, Txid};
 
 use core::convert::Infallible;
 use core::future::Future;
-use core::mem;
 use core::ops::Deref;
 use core::pin::{pin, Pin};
 use core::str::FromStr;
@@ -41,7 +40,6 @@ use crate::util::async_poll::{
 use crate::util::logger::Logger;
 use crate::util::native_async::FutureSpawner;
 use crate::util::ser::{Readable, ReadableArgs, Writeable};
-use crate::util::wakers::Notifier;
 
 /// The alphabet of characters allowed for namespaces and keys.
 pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str =
@@ -121,6 +119,54 @@ pub const OUTPUT_SWEEPER_PERSISTENCE_KEY: &str = "output_sweeper";
 /// updates applied to be current) with another implementation.
 pub const MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL: &[u8] = &[0xFF; 2];
 
+/// An entry in a batch write operation.
+pub struct BatchWriteEntry {
+	/// The primary namespace for this write.
+	pub primary_namespace: String,
+	/// The secondary namespace for this write.
+	pub secondary_namespace: String,
+	/// The key to write to.
+	pub key: String,
+	/// The data to write.
+	pub buf: Vec<u8>,
+}
+
+impl BatchWriteEntry {
+	/// Creates a new batch write entry.
+	pub fn new(
+		primary_namespace: impl Into<String>, secondary_namespace: impl Into<String>,
+		key: impl Into<String>, buf: Vec<u8>,
+	) -> Self {
+		Self {
+			primary_namespace: primary_namespace.into(),
+			secondary_namespace: secondary_namespace.into(),
+			key: key.into(),
+			buf,
+		}
+	}
+}
+
+/// The result of a batch write operation.
+#[derive(Debug)]
+pub struct BatchWriteResult {
+	/// The number of writes that completed successfully.
+	pub successful_writes: usize,
+	/// The error that occurred, if any. If `None`, all writes succeeded.
+	pub error: Option<io::Error>,
+}
+
+impl BatchWriteResult {
+	/// Returns `true` if all writes succeeded.
+	pub fn is_ok(&self) -> bool {
+		self.error.is_none()
+	}
+
+	/// Returns the error if one occurred, consuming the result.
+	pub fn err(self) -> Option<io::Error> {
+		self.error
+	}
+}
+
 /// Provides an interface that allows storage and retrieval of persisted values that are associated
 /// with given keys.
 ///
@@ -193,6 +239,31 @@ pub trait KVStoreSync {
 	fn list(
 		&self, primary_namespace: &str, secondary_namespace: &str,
 	) -> Result<Vec<String>, io::Error>;
+	/// Persists multiple key-value pairs in a single batch operation.
+	///
+	/// Processes writes in order. Non-atomic: if an error occurs, earlier writes may have already
+	/// been persisted and will not be rolled back. However, writes after the failed one are never
+	/// started.
+	///
+	/// The default implementation iterates through entries and calls [`Self::write`] for each one.
+	/// Implementations may override for optimized batch operations.
+	fn write_batch(&self, entries: Vec<BatchWriteEntry>) -> BatchWriteResult {
+		let mut successful_writes = 0;
+		for entry in entries {
+			match self.write(
+				&entry.primary_namespace,
+				&entry.secondary_namespace,
+				&entry.key,
+				entry.buf,
+			) {
+				Ok(()) => successful_writes += 1,
+				Err(e) => {
+					return BatchWriteResult { successful_writes, error: Some(e) };
+				},
+			}
+		}
+		BatchWriteResult { successful_writes, error: None }
+	}
 }
 
 /// A wrapper around a [`KVStoreSync`] that implements the [`KVStore`] trait. It is not necessary to use this type
@@ -248,6 +319,13 @@ where
 		async move { res }
 	}
+
+	fn write_batch(
+		&self, entries: Vec<BatchWriteEntry>,
+	) -> impl Future<Output = BatchWriteResult> + 'static + MaybeSend {
+		let res = self.0.write_batch(entries);
+		async move { res }
+	}
 }
 
 /// Provides an interface that allows storage and retrieval of persisted values that are associated
@@ -345,6 +423,48 @@ pub trait KVStore {
 	fn list(
 		&self, primary_namespace: &str, secondary_namespace: &str,
 	) -> impl Future<Output = Result<Vec<String>, io::Error>> + 'static + MaybeSend;
+	/// Persists multiple key-value pairs in a single batch operation.
+	///
+	/// Processes writes in order, awaiting each write before starting the next. Non-atomic: if an
+	/// error occurs, earlier writes may have already been persisted and will not be rolled back.
+	/// However, writes after the failed one are never started.
+	///
+	/// Note that similar to [`Self::write`], ordering is maintained: all writes in the batch are
+	/// ordered relative to each other and to concurrent writes.
+	///
+	/// The default implementation calls [`Self::write`] for each entry sequentially.
+	fn write_batch(
+		&self, entries: Vec<BatchWriteEntry>,
+	) -> impl Future<Output = BatchWriteResult> + 'static + MaybeSend {
+		// Capture all write futures synchronously to maintain ordering
+		// (version numbers are assigned when write() is called, not when awaited)
+		let write_futures: Vec<Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send>>> =
+			entries
+				.into_iter()
+				.map(|entry| {
+					let fut = self.write(
+						&entry.primary_namespace,
+						&entry.secondary_namespace,
+						&entry.key,
+						entry.buf,
+					);
+					Box::pin(fut) as Pin<Box<dyn Future<Output = Result<(), io::Error>> + Send>>
+				})
+				.collect();
+
+		async move {
+			let mut successful_writes = 0;
+			for write_future in write_futures {
+				match write_future.await {
+					Ok(()) => successful_writes += 1,
+					Err(e) => {
+						return BatchWriteResult { successful_writes, error: Some(e) };
+					},
+				}
+			}
+			BatchWriteResult { successful_writes, error: None }
+		}
+	}
 }
 
 /// Provides additional interface methods that are required for [`KVStore`]-to-[`KVStore`]
@@ -442,6 +562,18 @@ impl Persist
+
+	fn flush(&self, _count: usize, channel_manager_bytes: Vec<u8>) -> Result<Vec<(ChannelId, u64)>, io::Error> {
+		// KVStoreSync implementations persist immediately, so there's nothing to flush
+		// for monitors. However, we still need to persist the channel manager.
+		self.write(
+			CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
+			CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
+			CHANNEL_MANAGER_PERSISTENCE_KEY,
+			channel_manager_bytes,
+		)?;
+		Ok(Vec::new())
+	}
 }
 
 /// Read previously persisted [`ChannelMonitor`]s from the store.
@@ -501,7 +633,7 @@ impl FutureSpawner for PanicingSpawner { } } -fn poll_sync_future(future: F) -> F::Output { +pub(crate) fn poll_sync_future(future: F) -> F::Output { let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); match pin!(future).poll(&mut ctx) { @@ -513,6 +645,28 @@ fn poll_sync_future(future: F) -> F::Output { } } +/// Represents a pending write operation that will be executed when +/// [`MonitorUpdatingPersister::flush`] is called. +enum PendingWrite { + /// A full channel monitor write. + FullMonitor { + monitor_key: String, + monitor_bytes: Vec, + /// The (channel_id, update_id) pair to signal as complete after the write. + completion: (ChannelId, u64), + /// Range of stale update IDs to clean up after the write: `start..end` (exclusive end). + stale_update_cleanup: Option<(u64, u64)>, + }, + /// A channel monitor update write. + Update { + monitor_key: String, + update_key: String, + update_bytes: Vec, + /// The (channel_id, update_id) pair to signal as complete after the write. + completion: (ChannelId, u64), + }, +} + /// Implements [`Persist`] in a way that writes and reads both [`ChannelMonitor`]s and /// [`ChannelMonitorUpdate`]s. /// @@ -588,6 +742,13 @@ fn poll_sync_future(future: F) -> F::Output { /// If you have many stale updates stored (such as after a crash with pending lazy deletes), and /// would like to get rid of them, consider using the /// [`MonitorUpdatingPersister::cleanup_stale_updates`] function. +/// +/// # Deferred Persistence +/// +/// When [`Persist::persist_new_channel`] or [`Persist::update_persisted_channel`] is called, the +/// serialized data is not immediately written to disk. Instead, it is stored in an internal queue +/// and [`ChannelMonitorUpdateStatus::InProgress`] is returned. To actually persist the data to disk, +/// call [`MonitorUpdatingPersister::flush`] which will write all pending data to the [`KVStoreSync`]. pub struct MonitorUpdatingPersister( MonitorUpdatingPersisterAsync, PanicingSpawner, L, ES, SP, BI, FE>, ) @@ -708,61 +869,49 @@ where BI::Target: BroadcasterInterface, FE::Target: FeeEstimator, { - /// Persists a new channel. This means writing the entire monitor to the - /// parametrized [`KVStoreSync`]. + /// Queues a new channel monitor to be persisted. The actual write happens when + /// [`MonitorUpdatingPersister::flush`] is called. + /// + /// Returns [`ChannelMonitorUpdateStatus::InProgress`] to indicate the write is pending. fn persist_new_channel( &self, monitor_name: MonitorName, monitor: &ChannelMonitor, ) -> chain::ChannelMonitorUpdateStatus { - let res = poll_sync_future(self.0 .0.persist_new_channel(monitor_name, monitor)); - match res { - Ok(_) => chain::ChannelMonitorUpdateStatus::Completed, - Err(e) => { - log_error!( - self.0 .0.logger, - "Failed to write ChannelMonitor {}/{}/{} reason: {}", - CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, - monitor_name, - e - ); - chain::ChannelMonitorUpdateStatus::UnrecoverableError - }, - } + let completion = (monitor.channel_id(), monitor.get_latest_update_id()); + // New channel, no stale updates to clean up + self.0 .0.queue_monitor_write(monitor_name, monitor, completion, None); + chain::ChannelMonitorUpdateStatus::InProgress } - /// Persists a channel update, writing only the update to the parameterized [`KVStoreSync`] if possible. + /// Queues a channel update to be persisted. The actual write happens when + /// [`MonitorUpdatingPersister::flush`] is called. 
/// - /// In some cases, this will forward to [`MonitorUpdatingPersister::persist_new_channel`]: + /// In some cases, this will write the full monitor instead of just the update: /// - /// - No full monitor is found in [`KVStoreSync`] /// - The number of pending updates exceeds `maximum_pending_updates` as given to [`Self::new`] /// - LDK commands re-persisting the entire monitor through this function, specifically when - /// `update` is `None`. + /// `update` is `None`. /// - The update is at [`u64::MAX`], indicating an update generated by pre-0.1 LDK. + /// + /// Returns [`ChannelMonitorUpdateStatus::InProgress`] to indicate the write is pending. fn update_persisted_channel( &self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor, ) -> chain::ChannelMonitorUpdateStatus { - let inner = Arc::clone(&self.0 .0); - let res = poll_sync_future(inner.update_persisted_channel(monitor_name, update, monitor)); - match res { - Ok(()) => chain::ChannelMonitorUpdateStatus::Completed, - Err(e) => { - log_error!( - self.0 .0.logger, - "Failed to write ChannelMonitorUpdate {} id {} reason: {}", - monitor_name, - update.as_ref().map(|upd| upd.update_id).unwrap_or(0), - e - ); - chain::ChannelMonitorUpdateStatus::UnrecoverableError - }, - } + self.0 .0.queue_channel_update(monitor_name, update, monitor); + chain::ChannelMonitorUpdateStatus::InProgress } fn archive_persisted_channel(&self, monitor_name: MonitorName) { poll_sync_future(self.0 .0.archive_persisted_channel(monitor_name)); } + + fn pending_write_count(&self) -> usize { + self.0.pending_write_count() + } + + fn flush(&self, count: usize, channel_manager_bytes: Vec) -> Result, io::Error> { + poll_sync_future(self.0.flush(count, channel_manager_bytes)) + } } /// A variant of the [`MonitorUpdatingPersister`] which utilizes the async [`KVStore`] and offers @@ -811,7 +960,7 @@ struct MonitorUpdatingPersisterAsyncInner< FE::Target: FeeEstimator, { kv_store: K, - async_completed_updates: Mutex>, + pending_writes: Mutex>, future_spawner: S, logger: L, maximum_pending_updates: u64, @@ -840,7 +989,7 @@ where ) -> Self { MonitorUpdatingPersisterAsync(Arc::new(MonitorUpdatingPersisterAsyncInner { kv_store, - async_completed_updates: Mutex::new(Vec::new()), + pending_writes: Mutex::new(Vec::new()), future_spawner, logger, maximum_pending_updates, @@ -963,6 +1112,131 @@ where pub async fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> { self.0.cleanup_stale_updates(lazy).await } + + /// Returns the number of pending writes in the queue. + /// + /// This can be used to capture the queue size before persisting the channel manager, + /// then pass that count to [`Self::flush`] to only flush those specific updates. + pub fn pending_write_count(&self) -> usize { + self.0.pending_writes.lock().unwrap().len() + } + + /// Flushes pending writes to the underlying [`KVStore`]. + /// + /// The `count` parameter specifies how many pending monitor writes to flush. + /// The `channel_manager_bytes` parameter contains the serialized channel manager to persist. + /// + /// The channel manager is always written first in the batch, before any monitor writes, + /// to ensure proper ordering (manager state should be at least as recent as monitors on disk). + /// + /// This method should be called after one or more calls that queue persist operations + /// to actually write the data to storage. + /// + /// Returns the list of completed monitor updates (channel_id, update_id) that were flushed. 
+ pub async fn flush( + &self, count: usize, channel_manager_bytes: Vec, + ) -> Result, io::Error> { + let pending = { + let mut queue = self.0.pending_writes.lock().unwrap(); + let n = count.min(queue.len()); + queue.drain(..n).collect::>() + }; + + // Phase 1: Collect all batch entries + // Channel manager goes FIRST to ensure it's written before monitors + let mut batch_entries = Vec::with_capacity(pending.len() + 1); + batch_entries.push(BatchWriteEntry::new( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + channel_manager_bytes, + )); + + let mut stale_cleanups = Vec::new(); + + for (i, write) in pending.iter().enumerate() { + match write { + PendingWrite::FullMonitor { + monitor_key, + monitor_bytes, + stale_update_cleanup, + .. + } => { + batch_entries.push(BatchWriteEntry::new( + CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + monitor_key.clone(), + monitor_bytes.clone(), + )); + if let Some((start, end)) = stale_update_cleanup { + stale_cleanups.push((i, monitor_key.clone(), *start, *end)); + } + }, + PendingWrite::Update { monitor_key, update_key, update_bytes, .. } => { + batch_entries.push(BatchWriteEntry::new( + CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, + monitor_key.clone(), + update_key.clone(), + update_bytes.clone(), + )); + }, + } + } + + // Phase 2: Execute batch write + let result = self.0.kv_store.write_batch(batch_entries).await; + if let Some(err) = result.error { + // The first entry is the channel manager, so successful_writes includes it + // Monitor writes start at index 1, so subtract 1 to get monitor success count + let successful_monitor_writes = + if result.successful_writes > 0 { result.successful_writes - 1 } else { 0 }; + // Re-queue failed and subsequent monitor writes + let failed_writes = + pending.into_iter().skip(successful_monitor_writes).collect::>(); + if !failed_writes.is_empty() { + let mut queue = self.0.pending_writes.lock().unwrap(); + // Prepend failed writes back to the front of the queue + for write in failed_writes.into_iter().rev() { + queue.insert(0, write); + } + } + return Err(err); + } + // Subtract 1 for the channel manager entry to get monitor success count + let successful_monitor_writes = result.successful_writes.saturating_sub(1); + + // Phase 3: Cleanup stale updates (only for successfully written monitors) + for (i, monitor_key, start, end) in stale_cleanups { + if i < successful_monitor_writes { + for update_id in start..end { + let update_name = UpdateName::from(update_id); + // Lazy delete - ignore errors as this is just cleanup + let _ = self + .0 + .kv_store + .remove( + CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, + &monitor_key, + update_name.as_str(), + true, + ) + .await; + } + } + } + + // Phase 4: Return completions for successful writes only + let completions = pending + .into_iter() + .take(successful_monitor_writes) + .map(|write| match write { + PendingWrite::FullMonitor { completion, .. } => completion, + PendingWrite::Update { completion, .. } => completion, + }) + .collect(); + + Ok(completions) + } } impl< @@ -983,63 +1257,24 @@ where FE::Target: FeeEstimator, ::EcdsaSigner: MaybeSend + 'static, { - pub(crate) fn spawn_async_persist_new_channel( + /// Queues a new channel monitor to be persisted. The actual write happens when + /// [`Self::flush`] is called. 
+ pub(crate) fn queue_new_channel( &self, monitor_name: MonitorName, monitor: &ChannelMonitor<::EcdsaSigner>, - notifier: Arc, ) { - let inner = Arc::clone(&self.0); - // Note that `persist_new_channel` is a sync method which calls all the way through to the - // sync KVStore::write method (which returns a future) to ensure writes are well-ordered. - let future = inner.persist_new_channel(monitor_name, monitor); - let channel_id = monitor.channel_id(); let completion = (monitor.channel_id(), monitor.get_latest_update_id()); - let _runs_free = self.0.future_spawner.spawn(async move { - match future.await { - Ok(()) => { - inner.async_completed_updates.lock().unwrap().push(completion); - notifier.notify(); - }, - Err(e) => { - log_error!( - inner.logger, - "Failed to persist new ChannelMonitor {channel_id}: {e}. The node will now likely stall as this channel will not be able to make progress. You should restart as soon as possible.", - ); - }, - } - }); + // New channel, no stale updates to clean up + self.0.queue_monitor_write(monitor_name, monitor, completion, None); } - pub(crate) fn spawn_async_update_channel( + /// Queues a channel update to be persisted. The actual write happens when + /// [`Self::flush`] is called. + pub(crate) fn queue_channel_update( &self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<::EcdsaSigner>, - notifier: Arc, ) { - let inner = Arc::clone(&self.0); - // Note that `update_persisted_channel` is a sync method which calls all the way through to - // the sync KVStore::write method (which returns a future) to ensure writes are well-ordered - let future = inner.update_persisted_channel(monitor_name, update, monitor); - let channel_id = monitor.channel_id(); - let completion = if let Some(update) = update { - Some((monitor.channel_id(), update.update_id)) - } else { - None - }; - let inner = Arc::clone(&self.0); - let _runs_free = self.0.future_spawner.spawn(async move { - match future.await { - Ok(()) => if let Some(completion) = completion { - inner.async_completed_updates.lock().unwrap().push(completion); - notifier.notify(); - }, - Err(e) => { - log_error!( - inner.logger, - "Failed to persist new ChannelMonitor {channel_id}: {e}. The node will now likely stall as this channel will not be able to make progress. You should restart as soon as possible.", - ); - }, - } - }); + self.0.queue_channel_update(monitor_name, update, monitor); } pub(crate) fn spawn_async_archive_persisted_channel(&self, monitor_name: MonitorName) { @@ -1048,15 +1283,8 @@ where inner.archive_persisted_channel(monitor_name).await; }); } - - pub(crate) fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> { - mem::take(&mut *self.0.async_completed_updates.lock().unwrap()) - } } -trait MaybeSendableFuture: Future> + MaybeSend {} -impl> + MaybeSend> MaybeSendableFuture for F {} - impl MonitorUpdatingPersisterAsyncInner where @@ -1231,111 +1459,83 @@ where Ok(()) } - fn persist_new_channel<'a, ChannelSigner: EcdsaChannelSigner>( - &'a self, monitor_name: MonitorName, monitor: &'a ChannelMonitor, - ) -> Pin> + 'static>> { - // Determine the proper key for this monitor + /// Queues a full monitor write to the pending writes queue. 
+ fn queue_monitor_write( + &self, monitor_name: MonitorName, monitor: &ChannelMonitor, + completion: (ChannelId, u64), stale_update_cleanup: Option<(u64, u64)>, + ) { let monitor_key = monitor_name.to_string(); - // Serialize and write the new monitor + let mut monitor_bytes = Vec::with_capacity( MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() + monitor.serialized_length(), ); - // If `maximum_pending_updates` is zero, we aren't actually writing monitor updates at all. - // Thus, there's no need to add the sentinel prefix as the monitor can be read directly - // from disk without issue. if self.maximum_pending_updates != 0 { monitor_bytes.extend_from_slice(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL); } monitor.write(&mut monitor_bytes).unwrap(); - // Note that this is NOT an async function, but rather calls the *sync* KVStore write - // method, allowing it to do its queueing immediately, and then return a future for the - // completion of the write. This ensures monitor persistence ordering is preserved. - let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE; - let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE; - // There's no real reason why this needs to be boxed, but dropping it rams into the "hidden - // type for impl... captures lifetime that does not appear in bounds" issue. This can - // trivially be dropped once we upgrade to edition 2024/MSRV 1.85. - Box::pin(self.kv_store.write(primary, secondary, monitor_key.as_str(), monitor_bytes)) + + self.pending_writes.lock().unwrap().push(PendingWrite::FullMonitor { + monitor_key, + monitor_bytes, + completion, + stale_update_cleanup, + }); } - fn update_persisted_channel<'a, ChannelSigner: EcdsaChannelSigner + 'a>( - self: Arc, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>, + /// Queues an update write to the pending writes queue. + fn queue_update_write( + &self, monitor_name: MonitorName, update: &ChannelMonitorUpdate, + completion: (ChannelId, u64), + ) { + let monitor_key = monitor_name.to_string(); + let update_name = UpdateName::from(update.update_id); + let update_bytes = update.encode(); + + self.pending_writes.lock().unwrap().push(PendingWrite::Update { + monitor_key, + update_key: update_name.as_str().to_string(), + update_bytes, + completion, + }); + } + + /// Queues writes for a channel update. Decides whether to write just the update or a full + /// monitor based on the update ID and maximum_pending_updates configuration. + fn queue_channel_update( + &self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor, - ) -> impl Future> + 'a - where - Self: 'a, - { + ) { const LEGACY_CLOSED_CHANNEL_UPDATE_ID: u64 = u64::MAX; - let mut res_a = None; - let mut res_b = None; - let mut res_c = None; + + let completion = ( + monitor.channel_id(), + update.map(|u| u.update_id).unwrap_or_else(|| monitor.get_latest_update_id()), + ); + if let Some(update) = update { let persist_update = update.update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID && self.maximum_pending_updates != 0 && update.update_id % self.maximum_pending_updates != 0; + if persist_update { - let monitor_key = monitor_name.to_string(); - let update_name = UpdateName::from(update.update_id); - let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE; - // Note that this is NOT an async function, but rather calls the *sync* KVStore - // write method, allowing it to do its queueing immediately, and then return a - // future for the completion of the write. 
This ensures monitor persistence - // ordering is preserved. - let encoded = update.encode(); - res_a = Some(async move { - self.kv_store.write(primary, &monitor_key, update_name.as_str(), encoded).await - }); + // Write just the update + self.queue_update_write(monitor_name, update, completion); } else { - // We could write this update, but it meets criteria of our design that calls for a full monitor write. - // Note that this is NOT an async function, but rather calls the *sync* KVStore - // write method, allowing it to do its queueing immediately, and then return a - // future for the completion of the write. This ensures monitor persistence - // ordering is preserved. This, thus, must happen before any await we do below. - let write_fut = self.persist_new_channel(monitor_name, monitor); + // Write full monitor and clean up stale updates afterward let latest_update_id = monitor.get_latest_update_id(); - - res_b = Some(async move { - let write_status = write_fut.await; - if let Ok(()) = write_status { - if latest_update_id == LEGACY_CLOSED_CHANNEL_UPDATE_ID { - let monitor_key = monitor_name.to_string(); - self.cleanup_stale_updates_for_monitor_to( - &monitor_key, - latest_update_id, - true, - ) - .await?; - } else { - let end = latest_update_id; - let start = end.saturating_sub(self.maximum_pending_updates); - self.cleanup_in_range(monitor_name, start, end).await; - } - } - - write_status - }); + let stale_update_cleanup = if latest_update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID { + let end = latest_update_id; + let start = end.saturating_sub(self.maximum_pending_updates); + Some((start, end)) + } else { + None + }; + self.queue_monitor_write(monitor_name, monitor, completion, stale_update_cleanup); } } else { - // There is no update given, so we must persist a new monitor. - // Note that this is NOT an async function, but rather calls the *sync* KVStore write - // method, allowing it to do its queueing immediately, and then return a future for the - // completion of the write. This ensures monitor persistence ordering is preserved. - res_c = Some(self.persist_new_channel(monitor_name, monitor)); - } - async move { - // Complete any pending future(s). Note that to keep one return type we have to end - // with a single async move block that we return, rather than trying to return the - // individual futures themselves. - if let Some(a) = res_a { - a.await?; - } - if let Some(b) = res_b { - b.await?; - } - if let Some(c) = res_c { - c.await?; - } - Ok(()) + // No update given, persist full monitor (no cleanup needed as this is typically + // called for force-close or other special cases) + self.queue_monitor_write(monitor_name, monitor, completion, None); } } @@ -1355,24 +1555,6 @@ where let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE; let _ = self.kv_store.remove(primary, secondary, &monitor_key, true).await; } - - // Cleans up monitor updates for given monitor in range `start..=end`. 
-	async fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) {
-		let monitor_key = monitor_name.to_string();
-		for update_id in start..=end {
-			let update_name = UpdateName::from(update_id);
-			let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE;
-			let res = self.kv_store.remove(primary, &monitor_key, update_name.as_str(), true).await;
-			if let Err(e) = res {
-				log_error!(
-					self.logger,
-					"Failed to clean up channel monitor updates for monitor {}, reason: {}",
-					monitor_key.as_str(),
-					e
-				);
-			};
-		}
-	}
 }
 
 /// A struct representing a name for a channel monitor.
diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs
index 34f5d5fe36e..f52068b53ad 100644
--- a/lightning/src/util/test_utils.rs
+++ b/lightning/src/util/test_utils.rs
@@ -824,6 +824,10 @@ impl Persist for WatchtowerPersister
 			monitor_name,
 		);
 	}
+
+	fn flush(&self, _count: usize, _channel_manager_bytes: Vec<u8>) -> Result<Vec<(ChannelId, u64)>, io::Error> {
+		Ok(Vec::new())
+	}
 }
 
 pub struct TestPersister {
@@ -887,6 +891,10 @@ impl Persist for TestPersister
 		self.offchain_monitor_updates.lock().unwrap().remove(&monitor_name);
 		self.chain_sync_monitor_persistences.lock().unwrap().retain(|x| x != &monitor_name);
 	}
+
+	fn flush(&self, _count: usize, _channel_manager_bytes: Vec<u8>) -> Result<Vec<(ChannelId, u64)>, io::Error> {
+		Ok(Vec::new())
+	}
 }
 
 // A simple multi-producer-single-consumer one-shot channel
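The `write_batch`, `BatchWriteEntry`, and `BatchWriteResult` items above only exist once this patch lands, so the following is a minimal sketch (not part of the diff) of how a `KVStoreSync` backend might override the default per-entry loop to apply a whole batch under a single lock. The `MemoryStore` type and its flattened key layout are hypothetical, and a std build is assumed so that `lightning::io` is `std::io`.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

use lightning::io;
use lightning::util::persist::{BatchWriteEntry, BatchWriteResult, KVStoreSync};

/// Hypothetical in-memory store; keys are flattened to "primary/secondary/key".
struct MemoryStore {
	entries: Mutex<HashMap<String, Vec<u8>>>,
}

fn path(primary: &str, secondary: &str, key: &str) -> String {
	format!("{}/{}/{}", primary, secondary, key)
}

impl KVStoreSync for MemoryStore {
	fn read(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
	) -> Result<Vec<u8>, io::Error> {
		self.entries
			.lock()
			.unwrap()
			.get(&path(primary_namespace, secondary_namespace, key))
			.cloned()
			.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no such key"))
	}

	fn write(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
	) -> Result<(), io::Error> {
		self.entries
			.lock()
			.unwrap()
			.insert(path(primary_namespace, secondary_namespace, key), buf);
		Ok(())
	}

	fn remove(
		&self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool,
	) -> Result<(), io::Error> {
		self.entries.lock().unwrap().remove(&path(primary_namespace, secondary_namespace, key));
		Ok(())
	}

	fn list(
		&self, primary_namespace: &str, secondary_namespace: &str,
	) -> Result<Vec<String>, io::Error> {
		let prefix = format!("{}/{}/", primary_namespace, secondary_namespace);
		let entries = self.entries.lock().unwrap();
		Ok(entries.keys().filter_map(|k| k.strip_prefix(&prefix).map(String::from)).collect())
	}

	// Override the default entry-by-entry loop: hold the map lock across the whole batch so
	// the ChannelManager entry and the monitor entries become visible together.
	fn write_batch(&self, batch: Vec<BatchWriteEntry>) -> BatchWriteResult {
		let mut entries = self.entries.lock().unwrap();
		let mut successful_writes = 0;
		for entry in batch {
			let k = path(&entry.primary_namespace, &entry.secondary_namespace, &entry.key);
			entries.insert(k, entry.buf);
			successful_writes += 1;
		}
		BatchWriteResult { successful_writes, error: None }
	}
}
```

Because the batch built by the flush path always places the serialized `ChannelManager` first, a backend that applies the batch under one lock (or transactionally) preserves the manager-before-monitors ordering described in the `Persist::flush` documentation without any extra coordination.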