From 6ed79d99f8d99882651a064930b778661d9226a4 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Thu, 15 Jan 2026 14:46:53 +0100 Subject: [PATCH 1/4] Defer MonitorUpdatingPersister writes to flush() Update MonitorUpdatingPersister and MonitorUpdatingPersisterAsync to queue persist operations in memory instead of writing immediately to disk. The Persist trait methods now return ChannelMonitorUpdateStatus:: InProgress and the actual writes happen when flush() is called. This fixes a race condition that could cause channel force closures: previously, if the node crashed after writing channel monitors but before writing the channel manager, the monitors would be ahead of the manager on restart. By deferring monitor writes until after the channel manager is persisted (via flush()), we ensure the manager is always at least as up-to-date as the monitors. The flush() method takes a count parameter specifying how many queued writes to flush. The background processor captures the queue size before persisting the channel manager, then flushes exactly that many writes afterward. This prevents flushing monitor updates that arrived after the manager state was captured. Key changes: - Add PendingWrite enum with FullMonitor and Update variants for queued writes - Add pending_writes queue to MonitorUpdatingPersisterAsyncInner - Add pending_write_count() and flush(count) to Persist trait and ChainMonitor - ChainMonitor::flush() calls channel_monitor_updated for each completed write - Stale update cleanup happens in flush() after full monitor is written - Call flush() in background processor after channel manager persistence Co-Authored-By: Claude Opus 4.5 --- lightning-background-processor/src/lib.rs | 49 +++ lightning/src/chain/chainmonitor.rs | 71 ++-- lightning/src/util/persist.rs | 406 +++++++++++----------- lightning/src/util/test_utils.rs | 8 + 4 files changed, 314 insertions(+), 220 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index c38d6dfe080..e9e4e4ab487 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -1152,6 +1152,11 @@ where let mut futures = Joiner::new(); + // Capture the number of pending monitor writes before persisting the channel manager. + // We'll only flush this many writes after the manager is persisted, to avoid flushing + // monitor updates that arrived after the manager state was captured. + let pending_monitor_writes = chain_monitor.pending_write_count(); + if channel_manager.get_cm().get_and_clear_needs_persistence() { log_trace!(logger, "Persisting ChannelManager..."); @@ -1349,6 +1354,15 @@ where res?; } + // Flush the monitor writes that were pending before we persisted the channel manager. + // Any writes that arrived after are left in the queue for the next iteration. + if pending_monitor_writes > 0 { + match chain_monitor.flush(pending_monitor_writes) { + Ok(()) => log_trace!(logger, "Flushed {} monitor writes", pending_monitor_writes), + Err(e) => log_error!(logger, "Failed to flush chain monitor: {}", e), + } + } + match check_and_reset_sleeper(&mut last_onion_message_handler_call, || { sleeper(ONION_MESSAGE_HANDLER_TIMER) }) { @@ -1413,6 +1427,16 @@ where channel_manager.get_cm().encode(), ) .await?; + + // Flush all pending monitor writes after final channel manager persistence. + let pending_monitor_writes = chain_monitor.pending_write_count(); + if pending_monitor_writes > 0 { + match chain_monitor.flush(pending_monitor_writes) { + Ok(()) => log_trace!(logger, "Flushed {} monitor writes", pending_monitor_writes), + Err(e) => log_error!(logger, "Failed to flush chain monitor: {}", e), + } + } + if let Some(ref scorer) = scorer { kv_store .write( @@ -1722,6 +1746,9 @@ impl BackgroundProcessor { channel_manager.get_cm().timer_tick_occurred(); last_freshness_call = Instant::now(); } + // Capture the number of pending monitor writes before persisting the channel manager. + let pending_monitor_writes = chain_monitor.pending_write_count(); + if channel_manager.get_cm().get_and_clear_needs_persistence() { log_trace!(logger, "Persisting ChannelManager..."); (kv_store.write( @@ -1733,6 +1760,16 @@ impl BackgroundProcessor { log_trace!(logger, "Done persisting ChannelManager."); } + // Flush the monitor writes that were pending before we persisted the channel manager. + if pending_monitor_writes > 0 { + match chain_monitor.flush(pending_monitor_writes) { + Ok(()) => { + log_trace!(logger, "Flushed {} monitor writes", pending_monitor_writes) + }, + Err(e) => log_error!(logger, "Failed to flush chain monitor: {}", e), + } + } + if let Some(liquidity_manager) = liquidity_manager.as_ref() { log_trace!(logger, "Persisting LiquidityManager..."); let _ = liquidity_manager.get_lm().persist().map_err(|e| { @@ -1853,6 +1890,18 @@ impl BackgroundProcessor { CHANNEL_MANAGER_PERSISTENCE_KEY, channel_manager.get_cm().encode(), )?; + + // Flush all pending monitor writes after final channel manager persistence. + let pending_monitor_writes = chain_monitor.pending_write_count(); + if pending_monitor_writes > 0 { + match chain_monitor.flush(pending_monitor_writes) { + Ok(()) => { + log_trace!(logger, "Flushed {} monitor writes", pending_monitor_writes) + }, + Err(e) => log_error!(logger, "Failed to flush chain monitor: {}", e), + } + } + if let Some(ref scorer) = scorer { kv_store.write( SCORER_PERSISTENCE_PRIMARY_NAMESPACE, diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 9fd6383cf7e..c1d2265e0a9 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -39,6 +39,7 @@ use crate::chain::channelmonitor::{ use crate::chain::transaction::{OutPoint, TransactionData}; use crate::chain::{BestBlock, ChannelMonitorUpdateStatus, Filter, WatchedOutput}; use crate::events::{self, Event, EventHandler, ReplayEvent}; +use crate::io; use crate::ln::channel_state::ChannelDetails; #[cfg(peer_storage)] use crate::ln::msgs::PeerStorage; @@ -198,16 +199,23 @@ pub trait Persist { /// the monitor already exists in the archive. fn archive_persisted_channel(&self, monitor_name: MonitorName); - /// Fetches the set of [`ChannelMonitorUpdate`]s, previously persisted with - /// [`Self::update_persisted_channel`], which have completed. + /// Returns the number of pending writes in the queue. /// - /// Returning an update here is equivalent to calling - /// [`ChainMonitor::channel_monitor_updated`]. Because of this, this method is defaulted and - /// hidden in the docs. - #[doc(hidden)] - fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> { - Vec::new() + /// This can be used to capture the queue size before persisting the channel manager, + /// then pass that count to [`Self::flush`] to only flush those specific updates. + fn pending_write_count(&self) -> usize { + 0 } + + /// Flushes pending writes to the underlying storage. + /// + /// The `count` parameter specifies how many pending writes to flush. + /// + /// For implementations that queue writes (returning [`ChannelMonitorUpdateStatus::InProgress`] + /// from persist methods), this method should write queued data to storage. + /// + /// Returns the list of completed monitor updates (channel_id, update_id) that were flushed. + fn flush(&self, count: usize) -> Result, io::Error>; } struct MonitorHolder { @@ -272,7 +280,6 @@ pub struct AsyncPersister< FE::Target: FeeEstimator, { persister: MonitorUpdatingPersisterAsync, - event_notifier: Arc, } impl< @@ -320,8 +327,7 @@ where &self, monitor_name: MonitorName, monitor: &ChannelMonitor<::EcdsaSigner>, ) -> ChannelMonitorUpdateStatus { - let notifier = Arc::clone(&self.event_notifier); - self.persister.spawn_async_persist_new_channel(monitor_name, monitor, notifier); + self.persister.queue_new_channel(monitor_name, monitor); ChannelMonitorUpdateStatus::InProgress } @@ -329,8 +335,7 @@ where &self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<::EcdsaSigner>, ) -> ChannelMonitorUpdateStatus { - let notifier = Arc::clone(&self.event_notifier); - self.persister.spawn_async_update_channel(monitor_name, monitor_update, monitor, notifier); + self.persister.queue_channel_update(monitor_name, monitor_update, monitor); ChannelMonitorUpdateStatus::InProgress } @@ -338,8 +343,12 @@ where self.persister.spawn_async_archive_persisted_channel(monitor_name); } - fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> { - self.persister.get_and_clear_completed_updates() + fn pending_write_count(&self) -> usize { + self.persister.pending_write_count() + } + + fn flush(&self, count: usize) -> Result, io::Error> { + crate::util::persist::poll_sync_future(self.persister.flush(count)) } } @@ -440,7 +449,6 @@ impl< persister: MonitorUpdatingPersisterAsync, _entropy_source: ES, _our_peerstorage_encryption_key: PeerStorageKey, ) -> Self { - let event_notifier = Arc::new(Notifier::new()); Self { monitors: RwLock::new(new_hash_map()), chain_source, @@ -450,8 +458,8 @@ impl< _entropy_source, pending_monitor_events: Mutex::new(Vec::new()), highest_chain_height: AtomicUsize::new(0), - event_notifier: Arc::clone(&event_notifier), - persister: AsyncPersister { persister, event_notifier }, + event_notifier: Arc::new(Notifier::new()), + persister: AsyncPersister { persister }, pending_send_only_events: Mutex::new(Vec::new()), #[cfg(peer_storage)] our_peerstorage_encryption_key: _our_peerstorage_encryption_key, @@ -742,6 +750,30 @@ where .collect() } + /// Returns the number of pending writes in the persister queue. + /// + /// This can be used to capture the queue size before persisting the channel manager, + /// then pass that count to [`Self::flush`] to only flush those specific updates. + pub fn pending_write_count(&self) -> usize { + self.persister.pending_write_count() + } + + /// Flushes pending writes to the underlying storage. + /// + /// If `count` is `Some(n)`, only the first `n` pending writes are flushed. + /// If `count` is `None`, all pending writes are flushed. + /// + /// For persisters that queue writes (returning [`ChannelMonitorUpdateStatus::InProgress`] + /// from persist methods), this method writes queued data to storage and signals + /// completion to the channel manager via [`Self::channel_monitor_updated`]. + pub fn flush(&self, count: usize) -> Result<(), io::Error> { + let completed = self.persister.flush(count)?; + for (channel_id, update_id) in completed { + let _ = self.channel_monitor_updated(channel_id, update_id); + } + Ok(()) + } + #[cfg(any(test, feature = "_test_utils"))] pub fn remove_monitor(&self, channel_id: &ChannelId) -> ChannelMonitor { self.monitors.write().unwrap().remove(channel_id).unwrap().monitor @@ -1497,9 +1529,6 @@ where fn release_pending_monitor_events( &self, ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { - for (channel_id, update_id) in self.persister.get_and_clear_completed_updates() { - let _ = self.channel_monitor_updated(channel_id, update_id); - } let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0); for monitor_state in self.monitors.read().unwrap().values() { let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events(); diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 2e1e8805d0a..7bcf46534ce 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -18,9 +18,8 @@ use bitcoin::{BlockHash, Txid}; use core::convert::Infallible; use core::future::Future; -use core::mem; use core::ops::Deref; -use core::pin::{pin, Pin}; +use core::pin::pin; use core::str::FromStr; use core::task; @@ -41,7 +40,6 @@ use crate::util::async_poll::{ use crate::util::logger::Logger; use crate::util::native_async::FutureSpawner; use crate::util::ser::{Readable, ReadableArgs, Writeable}; -use crate::util::wakers::Notifier; /// The alphabet of characters allowed for namespaces and keys. pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str = @@ -442,6 +440,11 @@ impl Persist Result, io::Error> { + // KVStoreSync implementations persist immediately, so there's nothing to flush. + Ok(Vec::new()) + } } /// Read previously persisted [`ChannelMonitor`]s from the store. @@ -501,7 +504,7 @@ impl FutureSpawner for PanicingSpawner { } } -fn poll_sync_future(future: F) -> F::Output { +pub(crate) fn poll_sync_future(future: F) -> F::Output { let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); match pin!(future).poll(&mut ctx) { @@ -513,6 +516,28 @@ fn poll_sync_future(future: F) -> F::Output { } } +/// Represents a pending write operation that will be executed when +/// [`MonitorUpdatingPersister::flush`] is called. +enum PendingWrite { + /// A full channel monitor write. + FullMonitor { + monitor_key: String, + monitor_bytes: Vec, + /// The (channel_id, update_id) pair to signal as complete after the write. + completion: (ChannelId, u64), + /// Range of stale update IDs to clean up after the write: `start..end` (exclusive end). + stale_update_cleanup: Option<(u64, u64)>, + }, + /// A channel monitor update write. + Update { + monitor_key: String, + update_key: String, + update_bytes: Vec, + /// The (channel_id, update_id) pair to signal as complete after the write. + completion: (ChannelId, u64), + }, +} + /// Implements [`Persist`] in a way that writes and reads both [`ChannelMonitor`]s and /// [`ChannelMonitorUpdate`]s. /// @@ -588,6 +613,13 @@ fn poll_sync_future(future: F) -> F::Output { /// If you have many stale updates stored (such as after a crash with pending lazy deletes), and /// would like to get rid of them, consider using the /// [`MonitorUpdatingPersister::cleanup_stale_updates`] function. +/// +/// # Deferred Persistence +/// +/// When [`Persist::persist_new_channel`] or [`Persist::update_persisted_channel`] is called, the +/// serialized data is not immediately written to disk. Instead, it is stored in an internal queue +/// and [`ChannelMonitorUpdateStatus::InProgress`] is returned. To actually persist the data to disk, +/// call [`MonitorUpdatingPersister::flush`] which will write all pending data to the [`KVStoreSync`]. pub struct MonitorUpdatingPersister( MonitorUpdatingPersisterAsync, PanicingSpawner, L, ES, SP, BI, FE>, ) @@ -708,61 +740,49 @@ where BI::Target: BroadcasterInterface, FE::Target: FeeEstimator, { - /// Persists a new channel. This means writing the entire monitor to the - /// parametrized [`KVStoreSync`]. + /// Queues a new channel monitor to be persisted. The actual write happens when + /// [`MonitorUpdatingPersister::flush`] is called. + /// + /// Returns [`ChannelMonitorUpdateStatus::InProgress`] to indicate the write is pending. fn persist_new_channel( &self, monitor_name: MonitorName, monitor: &ChannelMonitor, ) -> chain::ChannelMonitorUpdateStatus { - let res = poll_sync_future(self.0 .0.persist_new_channel(monitor_name, monitor)); - match res { - Ok(_) => chain::ChannelMonitorUpdateStatus::Completed, - Err(e) => { - log_error!( - self.0 .0.logger, - "Failed to write ChannelMonitor {}/{}/{} reason: {}", - CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, - monitor_name, - e - ); - chain::ChannelMonitorUpdateStatus::UnrecoverableError - }, - } + let completion = (monitor.channel_id(), monitor.get_latest_update_id()); + // New channel, no stale updates to clean up + self.0 .0.queue_monitor_write(monitor_name, monitor, completion, None); + chain::ChannelMonitorUpdateStatus::InProgress } - /// Persists a channel update, writing only the update to the parameterized [`KVStoreSync`] if possible. + /// Queues a channel update to be persisted. The actual write happens when + /// [`MonitorUpdatingPersister::flush`] is called. /// - /// In some cases, this will forward to [`MonitorUpdatingPersister::persist_new_channel`]: + /// In some cases, this will write the full monitor instead of just the update: /// - /// - No full monitor is found in [`KVStoreSync`] /// - The number of pending updates exceeds `maximum_pending_updates` as given to [`Self::new`] /// - LDK commands re-persisting the entire monitor through this function, specifically when - /// `update` is `None`. + /// `update` is `None`. /// - The update is at [`u64::MAX`], indicating an update generated by pre-0.1 LDK. + /// + /// Returns [`ChannelMonitorUpdateStatus::InProgress`] to indicate the write is pending. fn update_persisted_channel( &self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor, ) -> chain::ChannelMonitorUpdateStatus { - let inner = Arc::clone(&self.0 .0); - let res = poll_sync_future(inner.update_persisted_channel(monitor_name, update, monitor)); - match res { - Ok(()) => chain::ChannelMonitorUpdateStatus::Completed, - Err(e) => { - log_error!( - self.0 .0.logger, - "Failed to write ChannelMonitorUpdate {} id {} reason: {}", - monitor_name, - update.as_ref().map(|upd| upd.update_id).unwrap_or(0), - e - ); - chain::ChannelMonitorUpdateStatus::UnrecoverableError - }, - } + self.0 .0.queue_channel_update(monitor_name, update, monitor); + chain::ChannelMonitorUpdateStatus::InProgress } fn archive_persisted_channel(&self, monitor_name: MonitorName) { poll_sync_future(self.0 .0.archive_persisted_channel(monitor_name)); } + + fn pending_write_count(&self) -> usize { + self.0.pending_write_count() + } + + fn flush(&self, count: usize) -> Result, io::Error> { + poll_sync_future(self.0.flush(count)) + } } /// A variant of the [`MonitorUpdatingPersister`] which utilizes the async [`KVStore`] and offers @@ -811,7 +831,7 @@ struct MonitorUpdatingPersisterAsyncInner< FE::Target: FeeEstimator, { kv_store: K, - async_completed_updates: Mutex>, + pending_writes: Mutex>, future_spawner: S, logger: L, maximum_pending_updates: u64, @@ -840,7 +860,7 @@ where ) -> Self { MonitorUpdatingPersisterAsync(Arc::new(MonitorUpdatingPersisterAsyncInner { kv_store, - async_completed_updates: Mutex::new(Vec::new()), + pending_writes: Mutex::new(Vec::new()), future_spawner, logger, maximum_pending_updates, @@ -963,6 +983,86 @@ where pub async fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> { self.0.cleanup_stale_updates(lazy).await } + + /// Returns the number of pending writes in the queue. + /// + /// This can be used to capture the queue size before persisting the channel manager, + /// then pass that count to [`Self::flush`] to only flush those specific updates. + pub fn pending_write_count(&self) -> usize { + self.0.pending_writes.lock().unwrap().len() + } + + /// Flushes pending writes to the underlying [`KVStore`]. + /// + /// If `count` is `Some(n)`, only the first `n` pending writes are flushed. + /// If `count` is `None`, all pending writes are flushed. + /// + /// This method should be called after one or more calls that queue persist operations + /// to actually write the data to storage. + /// + /// Returns the list of completed monitor updates (channel_id, update_id) that were flushed. + pub async fn flush(&self, count: usize) -> Result, io::Error> { + let pending = { + let mut queue = self.0.pending_writes.lock().unwrap(); + let n = count.min(queue.len()); + queue.drain(..n).collect::>() + }; + + let mut completed = Vec::new(); + for write in pending { + match write { + PendingWrite::FullMonitor { + monitor_key, + monitor_bytes, + completion, + stale_update_cleanup, + } => { + self.0 + .kv_store + .write( + CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + &monitor_key, + monitor_bytes, + ) + .await?; + completed.push(completion); + + // Clean up stale updates after successfully writing a full monitor + if let Some((start, end)) = stale_update_cleanup { + for update_id in start..end { + let update_name = UpdateName::from(update_id); + // Lazy delete - ignore errors as this is just cleanup + let _ = self + .0 + .kv_store + .remove( + CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, + &monitor_key, + update_name.as_str(), + true, + ) + .await; + } + } + }, + PendingWrite::Update { monitor_key, update_key, update_bytes, completion } => { + self.0 + .kv_store + .write( + CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, + &monitor_key, + &update_key, + update_bytes, + ) + .await?; + completed.push(completion); + }, + } + } + + Ok(completed) + } } impl< @@ -983,63 +1083,24 @@ where FE::Target: FeeEstimator, ::EcdsaSigner: MaybeSend + 'static, { - pub(crate) fn spawn_async_persist_new_channel( + /// Queues a new channel monitor to be persisted. The actual write happens when + /// [`Self::flush`] is called. + pub(crate) fn queue_new_channel( &self, monitor_name: MonitorName, monitor: &ChannelMonitor<::EcdsaSigner>, - notifier: Arc, ) { - let inner = Arc::clone(&self.0); - // Note that `persist_new_channel` is a sync method which calls all the way through to the - // sync KVStore::write method (which returns a future) to ensure writes are well-ordered. - let future = inner.persist_new_channel(monitor_name, monitor); - let channel_id = monitor.channel_id(); let completion = (monitor.channel_id(), monitor.get_latest_update_id()); - let _runs_free = self.0.future_spawner.spawn(async move { - match future.await { - Ok(()) => { - inner.async_completed_updates.lock().unwrap().push(completion); - notifier.notify(); - }, - Err(e) => { - log_error!( - inner.logger, - "Failed to persist new ChannelMonitor {channel_id}: {e}. The node will now likely stall as this channel will not be able to make progress. You should restart as soon as possible.", - ); - }, - } - }); + // New channel, no stale updates to clean up + self.0.queue_monitor_write(monitor_name, monitor, completion, None); } - pub(crate) fn spawn_async_update_channel( + /// Queues a channel update to be persisted. The actual write happens when + /// [`Self::flush`] is called. + pub(crate) fn queue_channel_update( &self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<::EcdsaSigner>, - notifier: Arc, ) { - let inner = Arc::clone(&self.0); - // Note that `update_persisted_channel` is a sync method which calls all the way through to - // the sync KVStore::write method (which returns a future) to ensure writes are well-ordered - let future = inner.update_persisted_channel(monitor_name, update, monitor); - let channel_id = monitor.channel_id(); - let completion = if let Some(update) = update { - Some((monitor.channel_id(), update.update_id)) - } else { - None - }; - let inner = Arc::clone(&self.0); - let _runs_free = self.0.future_spawner.spawn(async move { - match future.await { - Ok(()) => if let Some(completion) = completion { - inner.async_completed_updates.lock().unwrap().push(completion); - notifier.notify(); - }, - Err(e) => { - log_error!( - inner.logger, - "Failed to persist new ChannelMonitor {channel_id}: {e}. The node will now likely stall as this channel will not be able to make progress. You should restart as soon as possible.", - ); - }, - } - }); + self.0.queue_channel_update(monitor_name, update, monitor); } pub(crate) fn spawn_async_archive_persisted_channel(&self, monitor_name: MonitorName) { @@ -1048,15 +1109,8 @@ where inner.archive_persisted_channel(monitor_name).await; }); } - - pub(crate) fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> { - mem::take(&mut *self.0.async_completed_updates.lock().unwrap()) - } } -trait MaybeSendableFuture: Future> + MaybeSend {} -impl> + MaybeSend> MaybeSendableFuture for F {} - impl MonitorUpdatingPersisterAsyncInner where @@ -1231,111 +1285,83 @@ where Ok(()) } - fn persist_new_channel<'a, ChannelSigner: EcdsaChannelSigner>( - &'a self, monitor_name: MonitorName, monitor: &'a ChannelMonitor, - ) -> Pin> + 'static>> { - // Determine the proper key for this monitor + /// Queues a full monitor write to the pending writes queue. + fn queue_monitor_write( + &self, monitor_name: MonitorName, monitor: &ChannelMonitor, + completion: (ChannelId, u64), stale_update_cleanup: Option<(u64, u64)>, + ) { let monitor_key = monitor_name.to_string(); - // Serialize and write the new monitor + let mut monitor_bytes = Vec::with_capacity( MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() + monitor.serialized_length(), ); - // If `maximum_pending_updates` is zero, we aren't actually writing monitor updates at all. - // Thus, there's no need to add the sentinel prefix as the monitor can be read directly - // from disk without issue. if self.maximum_pending_updates != 0 { monitor_bytes.extend_from_slice(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL); } monitor.write(&mut monitor_bytes).unwrap(); - // Note that this is NOT an async function, but rather calls the *sync* KVStore write - // method, allowing it to do its queueing immediately, and then return a future for the - // completion of the write. This ensures monitor persistence ordering is preserved. - let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE; - let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE; - // There's no real reason why this needs to be boxed, but dropping it rams into the "hidden - // type for impl... captures lifetime that does not appear in bounds" issue. This can - // trivially be dropped once we upgrade to edition 2024/MSRV 1.85. - Box::pin(self.kv_store.write(primary, secondary, monitor_key.as_str(), monitor_bytes)) + + self.pending_writes.lock().unwrap().push(PendingWrite::FullMonitor { + monitor_key, + monitor_bytes, + completion, + stale_update_cleanup, + }); + } + + /// Queues an update write to the pending writes queue. + fn queue_update_write( + &self, monitor_name: MonitorName, update: &ChannelMonitorUpdate, + completion: (ChannelId, u64), + ) { + let monitor_key = monitor_name.to_string(); + let update_name = UpdateName::from(update.update_id); + let update_bytes = update.encode(); + + self.pending_writes.lock().unwrap().push(PendingWrite::Update { + monitor_key, + update_key: update_name.as_str().to_string(), + update_bytes, + completion, + }); } - fn update_persisted_channel<'a, ChannelSigner: EcdsaChannelSigner + 'a>( - self: Arc, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>, + /// Queues writes for a channel update. Decides whether to write just the update or a full + /// monitor based on the update ID and maximum_pending_updates configuration. + fn queue_channel_update( + &self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor, - ) -> impl Future> + 'a - where - Self: 'a, - { + ) { const LEGACY_CLOSED_CHANNEL_UPDATE_ID: u64 = u64::MAX; - let mut res_a = None; - let mut res_b = None; - let mut res_c = None; + + let completion = ( + monitor.channel_id(), + update.map(|u| u.update_id).unwrap_or_else(|| monitor.get_latest_update_id()), + ); + if let Some(update) = update { let persist_update = update.update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID && self.maximum_pending_updates != 0 && update.update_id % self.maximum_pending_updates != 0; + if persist_update { - let monitor_key = monitor_name.to_string(); - let update_name = UpdateName::from(update.update_id); - let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE; - // Note that this is NOT an async function, but rather calls the *sync* KVStore - // write method, allowing it to do its queueing immediately, and then return a - // future for the completion of the write. This ensures monitor persistence - // ordering is preserved. - let encoded = update.encode(); - res_a = Some(async move { - self.kv_store.write(primary, &monitor_key, update_name.as_str(), encoded).await - }); + // Write just the update + self.queue_update_write(monitor_name, update, completion); } else { - // We could write this update, but it meets criteria of our design that calls for a full monitor write. - // Note that this is NOT an async function, but rather calls the *sync* KVStore - // write method, allowing it to do its queueing immediately, and then return a - // future for the completion of the write. This ensures monitor persistence - // ordering is preserved. This, thus, must happen before any await we do below. - let write_fut = self.persist_new_channel(monitor_name, monitor); + // Write full monitor and clean up stale updates afterward let latest_update_id = monitor.get_latest_update_id(); - - res_b = Some(async move { - let write_status = write_fut.await; - if let Ok(()) = write_status { - if latest_update_id == LEGACY_CLOSED_CHANNEL_UPDATE_ID { - let monitor_key = monitor_name.to_string(); - self.cleanup_stale_updates_for_monitor_to( - &monitor_key, - latest_update_id, - true, - ) - .await?; - } else { - let end = latest_update_id; - let start = end.saturating_sub(self.maximum_pending_updates); - self.cleanup_in_range(monitor_name, start, end).await; - } - } - - write_status - }); + let stale_update_cleanup = if latest_update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID { + let end = latest_update_id; + let start = end.saturating_sub(self.maximum_pending_updates); + Some((start, end)) + } else { + None + }; + self.queue_monitor_write(monitor_name, monitor, completion, stale_update_cleanup); } } else { - // There is no update given, so we must persist a new monitor. - // Note that this is NOT an async function, but rather calls the *sync* KVStore write - // method, allowing it to do its queueing immediately, and then return a future for the - // completion of the write. This ensures monitor persistence ordering is preserved. - res_c = Some(self.persist_new_channel(monitor_name, monitor)); - } - async move { - // Complete any pending future(s). Note that to keep one return type we have to end - // with a single async move block that we return, rather than trying to return the - // individual futures themselves. - if let Some(a) = res_a { - a.await?; - } - if let Some(b) = res_b { - b.await?; - } - if let Some(c) = res_c { - c.await?; - } - Ok(()) + // No update given, persist full monitor (no cleanup needed as this is typically + // called for force-close or other special cases) + self.queue_monitor_write(monitor_name, monitor, completion, None); } } @@ -1355,24 +1381,6 @@ where let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE; let _ = self.kv_store.remove(primary, secondary, &monitor_key, true).await; } - - // Cleans up monitor updates for given monitor in range `start..=end`. - async fn cleanup_in_range(&self, monitor_name: MonitorName, start: u64, end: u64) { - let monitor_key = monitor_name.to_string(); - for update_id in start..=end { - let update_name = UpdateName::from(update_id); - let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE; - let res = self.kv_store.remove(primary, &monitor_key, update_name.as_str(), true).await; - if let Err(e) = res { - log_error!( - self.logger, - "Failed to clean up channel monitor updates for monitor {}, reason: {}", - monitor_key.as_str(), - e - ); - }; - } - } } /// A struct representing a name for a channel monitor. diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 34f5d5fe36e..5f7a8d5981a 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -824,6 +824,10 @@ impl Persist for WatchtowerPers monitor_name, ); } + + fn flush(&self, _count: usize) -> Result, io::Error> { + Ok(Vec::new()) + } } pub struct TestPersister { @@ -887,6 +891,10 @@ impl Persist for TestPersister self.offchain_monitor_updates.lock().unwrap().remove(&monitor_name); self.chain_sync_monitor_persistences.lock().unwrap().retain(|x| x != &monitor_name); } + + fn flush(&self, _count: usize) -> Result, io::Error> { + Ok(Vec::new()) + } } // A simple multi-producer-single-consumer one-shot channel From eaa91ec6ab55086ecf00cb6d6e44caab05e7d2c4 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 4 Feb 2026 11:47:26 +0100 Subject: [PATCH 2/4] Add non-atomic ordered batch write method to KVStore traits Adds `write_batch` method to both `KVStoreSync` and `KVStore` traits with default implementations that delegate to the single `write` method. - `BatchWriteEntry`: struct containing namespace, key, and data - `BatchWriteResult`: returns successful write count and optional error - Writes execute sequentially in order; stops on first error - Existing implementations automatically inherit the default behavior Co-Authored-By: Claude Opus 4.5 --- lightning/src/util/persist.rs | 124 +++++++++++++++++++++++++++++++++- 1 file changed, 123 insertions(+), 1 deletion(-) diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 7bcf46534ce..1480aa576e0 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -19,7 +19,7 @@ use bitcoin::{BlockHash, Txid}; use core::convert::Infallible; use core::future::Future; use core::ops::Deref; -use core::pin::pin; +use core::pin::{pin, Pin}; use core::str::FromStr; use core::task; @@ -119,6 +119,54 @@ pub const OUTPUT_SWEEPER_PERSISTENCE_KEY: &str = "output_sweeper"; /// updates applied to be current) with another implementation. pub const MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL: &[u8] = &[0xFF; 2]; +/// An entry in a batch write operation. +pub struct BatchWriteEntry { + /// The primary namespace for this write. + pub primary_namespace: String, + /// The secondary namespace for this write. + pub secondary_namespace: String, + /// The key to write to. + pub key: String, + /// The data to write. + pub buf: Vec, +} + +impl BatchWriteEntry { + /// Creates a new batch write entry. + pub fn new( + primary_namespace: impl Into, secondary_namespace: impl Into, + key: impl Into, buf: Vec, + ) -> Self { + Self { + primary_namespace: primary_namespace.into(), + secondary_namespace: secondary_namespace.into(), + key: key.into(), + buf, + } + } +} + +/// The result of a batch write operation. +#[derive(Debug)] +pub struct BatchWriteResult { + /// The number of writes that completed successfully. + pub successful_writes: usize, + /// The error that occurred, if any. If `None`, all writes succeeded. + pub error: Option, +} + +impl BatchWriteResult { + /// Returns `true` if all writes succeeded. + pub fn is_ok(&self) -> bool { + self.error.is_none() + } + + /// Returns the error if one occurred, consuming the result. + pub fn err(self) -> Option { + self.error + } +} + /// Provides an interface that allows storage and retrieval of persisted values that are associated /// with given keys. /// @@ -191,6 +239,31 @@ pub trait KVStoreSync { fn list( &self, primary_namespace: &str, secondary_namespace: &str, ) -> Result, io::Error>; + /// Persists multiple key-value pairs in a single batch operation. + /// + /// Processes writes in order. Non-atomic: if an error occurs, earlier writes may have already + /// been persisted and will not be rolled back. However, writes after the failed one are never + /// started. + /// + /// The default implementation iterates through entries and calls [`Self::write`] for each one. + /// Implementations may override for optimized batch operations. + fn write_batch(&self, entries: Vec) -> BatchWriteResult { + let mut successful_writes = 0; + for entry in entries { + match self.write( + &entry.primary_namespace, + &entry.secondary_namespace, + &entry.key, + entry.buf, + ) { + Ok(()) => successful_writes += 1, + Err(e) => { + return BatchWriteResult { successful_writes, error: Some(e) }; + }, + } + } + BatchWriteResult { successful_writes, error: None } + } } /// A wrapper around a [`KVStoreSync`] that implements the [`KVStore`] trait. It is not necessary to use this type @@ -246,6 +319,13 @@ where async move { res } } + + fn write_batch( + &self, entries: Vec, + ) -> impl Future + 'static + MaybeSend { + let res = self.0.write_batch(entries); + async move { res } + } } /// Provides an interface that allows storage and retrieval of persisted values that are associated @@ -343,6 +423,48 @@ pub trait KVStore { fn list( &self, primary_namespace: &str, secondary_namespace: &str, ) -> impl Future, io::Error>> + 'static + MaybeSend; + /// Persists multiple key-value pairs in a single batch operation. + /// + /// Processes writes in order, awaiting each write before starting the next. Non-atomic: if an + /// error occurs, earlier writes may have already been persisted and will not be rolled back. + /// However, writes after the failed one are never started. + /// + /// Note that similar to [`Self::write`], ordering is maintained: all writes in the batch are + /// ordered relative to each other and to concurrent writes. + /// + /// The default implementation calls [`Self::write`] for each entry sequentially. + fn write_batch( + &self, entries: Vec, + ) -> impl Future + 'static + MaybeSend { + // Capture all write futures synchronously to maintain ordering + // (version numbers are assigned when write() is called, not when awaited) + let write_futures: Vec> + Send>>> = + entries + .into_iter() + .map(|entry| { + let fut = self.write( + &entry.primary_namespace, + &entry.secondary_namespace, + &entry.key, + entry.buf, + ); + Box::pin(fut) as Pin> + Send>> + }) + .collect(); + + async move { + let mut successful_writes = 0; + for write_future in write_futures { + match write_future.await { + Ok(()) => successful_writes += 1, + Err(e) => { + return BatchWriteResult { successful_writes, error: Some(e) }; + }, + } + } + BatchWriteResult { successful_writes, error: None } + } + } } /// Provides additional interface methods that are required for [`KVStore`]-to-[`KVStore`] From c8248bbadf15973fb97ea570a968ea9d7458e2aa Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Thu, 5 Feb 2026 14:12:47 +0100 Subject: [PATCH 3/4] Use write_batch in MonitorUpdatingPersister flush() Refactors flush() to batch all writes into a single write_batch() call instead of individual write() calls. On partial failure, failed and subsequent writes are re-queued at the front of the pending writes queue for retry on the next flush. Co-Authored-By: Claude Opus 4.5 --- lightning/src/util/persist.rs | 107 ++++++++++++++++++++++------------ 1 file changed, 69 insertions(+), 38 deletions(-) diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 1480aa576e0..22dc4a44905 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -1130,60 +1130,91 @@ where queue.drain(..n).collect::>() }; - let mut completed = Vec::new(); - for write in pending { + // Phase 1: Collect all batch entries + let mut batch_entries = Vec::with_capacity(pending.len()); + let mut stale_cleanups = Vec::new(); + + for (i, write) in pending.iter().enumerate() { match write { PendingWrite::FullMonitor { monitor_key, monitor_bytes, - completion, stale_update_cleanup, + .. } => { - self.0 - .kv_store - .write( - CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, - &monitor_key, - monitor_bytes, - ) - .await?; - completed.push(completion); - - // Clean up stale updates after successfully writing a full monitor + batch_entries.push(BatchWriteEntry::new( + CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, + monitor_key.clone(), + monitor_bytes.clone(), + )); if let Some((start, end)) = stale_update_cleanup { - for update_id in start..end { - let update_name = UpdateName::from(update_id); - // Lazy delete - ignore errors as this is just cleanup - let _ = self - .0 - .kv_store - .remove( - CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, - &monitor_key, - update_name.as_str(), - true, - ) - .await; - } + stale_cleanups.push((i, monitor_key.clone(), *start, *end)); } }, - PendingWrite::Update { monitor_key, update_key, update_bytes, completion } => { - self.0 + PendingWrite::Update { monitor_key, update_key, update_bytes, .. } => { + batch_entries.push(BatchWriteEntry::new( + CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, + monitor_key.clone(), + update_key.clone(), + update_bytes.clone(), + )); + }, + } + } + + // Phase 2: Execute batch write + let successful_writes = if !batch_entries.is_empty() { + let result = self.0.kv_store.write_batch(batch_entries).await; + if let Some(err) = result.error { + // Re-queue failed and subsequent writes + let failed_writes = + pending.into_iter().skip(result.successful_writes).collect::>(); + if !failed_writes.is_empty() { + let mut queue = self.0.pending_writes.lock().unwrap(); + // Prepend failed writes back to the front of the queue + for write in failed_writes.into_iter().rev() { + queue.insert(0, write); + } + } + return Err(err); + } + result.successful_writes + } else { + 0 + }; + + // Phase 3: Cleanup stale updates (only for successfully written monitors) + for (i, monitor_key, start, end) in stale_cleanups { + if i < successful_writes { + for update_id in start..end { + let update_name = UpdateName::from(update_id); + // Lazy delete - ignore errors as this is just cleanup + let _ = self + .0 .kv_store - .write( + .remove( CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, &monitor_key, - &update_key, - update_bytes, + update_name.as_str(), + true, ) - .await?; - completed.push(completion); - }, + .await; + } } } - Ok(completed) + // Phase 4: Return completions for successful writes only + let completions = pending + .into_iter() + .take(successful_writes) + .map(|write| match write { + PendingWrite::FullMonitor { completion, .. } => completion, + PendingWrite::Update { completion, .. } => completion, + }) + .collect(); + + Ok(completions) } } From 6f20a0026ff610be6b65347b548de10aacb0af35 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Thu, 5 Feb 2026 14:38:48 +0100 Subject: [PATCH 4/4] Include ChannelManager in flush() write batch Extends Persist::flush() to accept channel_manager_bytes, allowing the channel manager to be written in the same write_batch() call as channel monitors. The channel manager is always written first in the batch to ensure proper ordering. This removes the separate channel manager persistence from the background processor and combines it with the monitor flush, reducing round trips. Co-Authored-By: Claude Opus 4.5 --- lightning-background-processor/src/lib.rs | 146 +++++++--------------- lightning/src/chain/chainmonitor.rs | 23 ++-- lightning/src/util/persist.rs | 75 +++++++---- lightning/src/util/test_utils.rs | 4 +- 4 files changed, 113 insertions(+), 135 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index e9e4e4ab487..7b31080fdda 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -57,11 +57,10 @@ use lightning::sign::{ use lightning::util::async_poll::MaybeSend; use lightning::util::logger::Logger; use lightning::util::persist::{ - KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY, - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, - SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, + KVStore, KVStoreSync, KVStoreSyncWrapper, NETWORK_GRAPH_PERSISTENCE_KEY, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + SCORER_PERSISTENCE_SECONDARY_NAMESPACE, }; use lightning::util::sweep::{OutputSweeper, OutputSweeperSync}; use lightning::util::wakers::Future; @@ -1150,44 +1149,13 @@ where None => {}, } - let mut futures = Joiner::new(); + // Type A is unused but needed for inference - we use the same boxed future type as other slots + let mut futures: Joiner> + Send + 'static>>, _, _, _, _> = Joiner::new(); - // Capture the number of pending monitor writes before persisting the channel manager. - // We'll only flush this many writes after the manager is persisted, to avoid flushing - // monitor updates that arrived after the manager state was captured. + // Capture the number of pending monitor writes and whether manager needs persistence. + // We'll flush monitors and manager together in a single batch after other tasks complete. let pending_monitor_writes = chain_monitor.pending_write_count(); - - if channel_manager.get_cm().get_and_clear_needs_persistence() { - log_trace!(logger, "Persisting ChannelManager..."); - - let fut = async { - kv_store - .write( - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_KEY, - channel_manager.get_cm().encode(), - ) - .await - }; - // TODO: Once our MSRV is 1.68 we should be able to drop the Box - let mut fut = Box::pin(fut); - - // Because persisting the ChannelManager is important to avoid accidental - // force-closures, go ahead and poll the future once before we do slightly more - // CPU-intensive tasks in the form of NetworkGraph pruning or scorer time-stepping - // below. This will get it moving but won't block us for too long if the underlying - // future is actually async. - use core::future::Future; - let mut waker = dummy_waker(); - let mut ctx = task::Context::from_waker(&mut waker); - match core::pin::Pin::new(&mut fut).poll(&mut ctx) { - task::Poll::Ready(res) => futures.set_a_res(res), - task::Poll::Pending => futures.set_a(fut), - } - - log_trace!(logger, "Done persisting ChannelManager."); - } + let needs_manager_persist = channel_manager.get_cm().get_and_clear_needs_persistence(); // Note that we want to archive stale ChannelMonitors and run a network graph prune once // not long after startup before falling back to their usual infrequent runs. This avoids @@ -1354,11 +1322,13 @@ where res?; } - // Flush the monitor writes that were pending before we persisted the channel manager. - // Any writes that arrived after are left in the queue for the next iteration. - if pending_monitor_writes > 0 { - match chain_monitor.flush(pending_monitor_writes) { - Ok(()) => log_trace!(logger, "Flushed {} monitor writes", pending_monitor_writes), + // Flush monitors and manager together in a single batch. + // Any monitor writes that arrived after are left in the queue for the next iteration. + if pending_monitor_writes > 0 || needs_manager_persist { + log_trace!(logger, "Persisting ChannelManager and flushing {} monitor writes...", pending_monitor_writes); + let manager_bytes = channel_manager.get_cm().encode(); + match chain_monitor.flush(pending_monitor_writes, manager_bytes) { + Ok(()) => log_trace!(logger, "Flushed ChannelManager and {} monitor writes", pending_monitor_writes), Err(e) => log_error!(logger, "Failed to flush chain monitor: {}", e), } } @@ -1416,25 +1386,18 @@ where } log_trace!(logger, "Terminating background processor."); - // After we exit, ensure we persist the ChannelManager one final time - this avoids - // some races where users quit while channel updates were in-flight, with - // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. - kv_store - .write( - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_KEY, - channel_manager.get_cm().encode(), - ) - .await?; - - // Flush all pending monitor writes after final channel manager persistence. + // After we exit, ensure we persist the ChannelManager one final time along with any + // pending monitor writes - this avoids some races where users quit while channel updates + // were in-flight, with ChannelMonitor update(s) persisted without a corresponding + // ChannelManager update. let pending_monitor_writes = chain_monitor.pending_write_count(); - if pending_monitor_writes > 0 { - match chain_monitor.flush(pending_monitor_writes) { - Ok(()) => log_trace!(logger, "Flushed {} monitor writes", pending_monitor_writes), - Err(e) => log_error!(logger, "Failed to flush chain monitor: {}", e), - } + let manager_bytes = channel_manager.get_cm().encode(); + match chain_monitor.flush(pending_monitor_writes, manager_bytes) { + Ok(()) => log_trace!(logger, "Final flush: ChannelManager and {} monitor writes", pending_monitor_writes), + Err(e) => { + log_error!(logger, "Failed final flush: {}", e); + return Err(e); + }, } if let Some(ref scorer) = scorer { @@ -1746,25 +1709,17 @@ impl BackgroundProcessor { channel_manager.get_cm().timer_tick_occurred(); last_freshness_call = Instant::now(); } - // Capture the number of pending monitor writes before persisting the channel manager. + // Capture the number of pending monitor writes and whether manager needs persistence. let pending_monitor_writes = chain_monitor.pending_write_count(); + let needs_manager_persist = channel_manager.get_cm().get_and_clear_needs_persistence(); - if channel_manager.get_cm().get_and_clear_needs_persistence() { - log_trace!(logger, "Persisting ChannelManager..."); - (kv_store.write( - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_KEY, - channel_manager.get_cm().encode(), - ))?; - log_trace!(logger, "Done persisting ChannelManager."); - } - - // Flush the monitor writes that were pending before we persisted the channel manager. - if pending_monitor_writes > 0 { - match chain_monitor.flush(pending_monitor_writes) { + // Flush monitors and manager together in a single batch. + if pending_monitor_writes > 0 || needs_manager_persist { + log_trace!(logger, "Persisting ChannelManager and flushing {} monitor writes...", pending_monitor_writes); + let manager_bytes = channel_manager.get_cm().encode(); + match chain_monitor.flush(pending_monitor_writes, manager_bytes) { Ok(()) => { - log_trace!(logger, "Flushed {} monitor writes", pending_monitor_writes) + log_trace!(logger, "Flushed ChannelManager and {} monitor writes", pending_monitor_writes) }, Err(e) => log_error!(logger, "Failed to flush chain monitor: {}", e), } @@ -1881,25 +1836,20 @@ impl BackgroundProcessor { } } - // After we exit, ensure we persist the ChannelManager one final time - this avoids - // some races where users quit while channel updates were in-flight, with - // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. - kv_store.write( - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_KEY, - channel_manager.get_cm().encode(), - )?; - - // Flush all pending monitor writes after final channel manager persistence. + // After we exit, ensure we persist the ChannelManager one final time along with any + // pending monitor writes - this avoids some races where users quit while channel updates + // were in-flight, with ChannelMonitor update(s) persisted without a corresponding + // ChannelManager update. let pending_monitor_writes = chain_monitor.pending_write_count(); - if pending_monitor_writes > 0 { - match chain_monitor.flush(pending_monitor_writes) { - Ok(()) => { - log_trace!(logger, "Flushed {} monitor writes", pending_monitor_writes) - }, - Err(e) => log_error!(logger, "Failed to flush chain monitor: {}", e), - } + let manager_bytes = channel_manager.get_cm().encode(); + match chain_monitor.flush(pending_monitor_writes, manager_bytes) { + Ok(()) => { + log_trace!(logger, "Final flush: ChannelManager and {} monitor writes", pending_monitor_writes) + }, + Err(e) => { + log_error!(logger, "Failed final flush: {}", e); + return Err(e.into()); + }, } if let Some(ref scorer) = scorer { diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index c1d2265e0a9..9ce45bf3fc9 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -209,13 +209,17 @@ pub trait Persist { /// Flushes pending writes to the underlying storage. /// - /// The `count` parameter specifies how many pending writes to flush. + /// The `count` parameter specifies how many pending monitor writes to flush. + /// The `channel_manager_bytes` parameter contains the serialized channel manager to persist. + /// + /// The channel manager is always written first in the batch, before any monitor writes, + /// to ensure proper ordering (manager state should be at least as recent as monitors on disk). /// /// For implementations that queue writes (returning [`ChannelMonitorUpdateStatus::InProgress`] /// from persist methods), this method should write queued data to storage. /// /// Returns the list of completed monitor updates (channel_id, update_id) that were flushed. - fn flush(&self, count: usize) -> Result, io::Error>; + fn flush(&self, count: usize, channel_manager_bytes: Vec) -> Result, io::Error>; } struct MonitorHolder { @@ -347,8 +351,8 @@ where self.persister.pending_write_count() } - fn flush(&self, count: usize) -> Result, io::Error> { - crate::util::persist::poll_sync_future(self.persister.flush(count)) + fn flush(&self, count: usize, channel_manager_bytes: Vec) -> Result, io::Error> { + crate::util::persist::poll_sync_future(self.persister.flush(count, channel_manager_bytes)) } } @@ -760,14 +764,17 @@ where /// Flushes pending writes to the underlying storage. /// - /// If `count` is `Some(n)`, only the first `n` pending writes are flushed. - /// If `count` is `None`, all pending writes are flushed. + /// The `count` parameter specifies how many pending monitor writes to flush. + /// The `channel_manager_bytes` parameter contains the serialized channel manager to persist. + /// + /// The channel manager is always written first in the batch, before any monitor writes, + /// to ensure proper ordering (manager state should be at least as recent as monitors on disk). /// /// For persisters that queue writes (returning [`ChannelMonitorUpdateStatus::InProgress`] /// from persist methods), this method writes queued data to storage and signals /// completion to the channel manager via [`Self::channel_monitor_updated`]. - pub fn flush(&self, count: usize) -> Result<(), io::Error> { - let completed = self.persister.flush(count)?; + pub fn flush(&self, count: usize, channel_manager_bytes: Vec) -> Result<(), io::Error> { + let completed = self.persister.flush(count, channel_manager_bytes)?; for (channel_id, update_id) in completed { let _ = self.channel_monitor_updated(channel_id, update_id); } diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 22dc4a44905..8881e440a35 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -563,8 +563,15 @@ impl Persist Result, io::Error> { - // KVStoreSync implementations persist immediately, so there's nothing to flush. + fn flush(&self, _count: usize, channel_manager_bytes: Vec) -> Result, io::Error> { + // KVStoreSync implementations persist immediately, so there's nothing to flush + // for monitors. However, we still need to persist the channel manager. + self.write( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + channel_manager_bytes, + )?; Ok(Vec::new()) } } @@ -902,8 +909,8 @@ where self.0.pending_write_count() } - fn flush(&self, count: usize) -> Result, io::Error> { - poll_sync_future(self.0.flush(count)) + fn flush(&self, count: usize, channel_manager_bytes: Vec) -> Result, io::Error> { + poll_sync_future(self.0.flush(count, channel_manager_bytes)) } } @@ -1116,14 +1123,19 @@ where /// Flushes pending writes to the underlying [`KVStore`]. /// - /// If `count` is `Some(n)`, only the first `n` pending writes are flushed. - /// If `count` is `None`, all pending writes are flushed. + /// The `count` parameter specifies how many pending monitor writes to flush. + /// The `channel_manager_bytes` parameter contains the serialized channel manager to persist. + /// + /// The channel manager is always written first in the batch, before any monitor writes, + /// to ensure proper ordering (manager state should be at least as recent as monitors on disk). /// /// This method should be called after one or more calls that queue persist operations /// to actually write the data to storage. /// /// Returns the list of completed monitor updates (channel_id, update_id) that were flushed. - pub async fn flush(&self, count: usize) -> Result, io::Error> { + pub async fn flush( + &self, count: usize, channel_manager_bytes: Vec, + ) -> Result, io::Error> { let pending = { let mut queue = self.0.pending_writes.lock().unwrap(); let n = count.min(queue.len()); @@ -1131,7 +1143,15 @@ where }; // Phase 1: Collect all batch entries - let mut batch_entries = Vec::with_capacity(pending.len()); + // Channel manager goes FIRST to ensure it's written before monitors + let mut batch_entries = Vec::with_capacity(pending.len() + 1); + batch_entries.push(BatchWriteEntry::new( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + channel_manager_bytes, + )); + let mut stale_cleanups = Vec::new(); for (i, write) in pending.iter().enumerate() { @@ -1164,29 +1184,30 @@ where } // Phase 2: Execute batch write - let successful_writes = if !batch_entries.is_empty() { - let result = self.0.kv_store.write_batch(batch_entries).await; - if let Some(err) = result.error { - // Re-queue failed and subsequent writes - let failed_writes = - pending.into_iter().skip(result.successful_writes).collect::>(); - if !failed_writes.is_empty() { - let mut queue = self.0.pending_writes.lock().unwrap(); - // Prepend failed writes back to the front of the queue - for write in failed_writes.into_iter().rev() { - queue.insert(0, write); - } + let result = self.0.kv_store.write_batch(batch_entries).await; + if let Some(err) = result.error { + // The first entry is the channel manager, so successful_writes includes it + // Monitor writes start at index 1, so subtract 1 to get monitor success count + let successful_monitor_writes = + if result.successful_writes > 0 { result.successful_writes - 1 } else { 0 }; + // Re-queue failed and subsequent monitor writes + let failed_writes = + pending.into_iter().skip(successful_monitor_writes).collect::>(); + if !failed_writes.is_empty() { + let mut queue = self.0.pending_writes.lock().unwrap(); + // Prepend failed writes back to the front of the queue + for write in failed_writes.into_iter().rev() { + queue.insert(0, write); } - return Err(err); } - result.successful_writes - } else { - 0 - }; + return Err(err); + } + // Subtract 1 for the channel manager entry to get monitor success count + let successful_monitor_writes = result.successful_writes.saturating_sub(1); // Phase 3: Cleanup stale updates (only for successfully written monitors) for (i, monitor_key, start, end) in stale_cleanups { - if i < successful_writes { + if i < successful_monitor_writes { for update_id in start..end { let update_name = UpdateName::from(update_id); // Lazy delete - ignore errors as this is just cleanup @@ -1207,7 +1228,7 @@ where // Phase 4: Return completions for successful writes only let completions = pending .into_iter() - .take(successful_writes) + .take(successful_monitor_writes) .map(|write| match write { PendingWrite::FullMonitor { completion, .. } => completion, PendingWrite::Update { completion, .. } => completion, diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 5f7a8d5981a..f52068b53ad 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -825,7 +825,7 @@ impl Persist for WatchtowerPers ); } - fn flush(&self, _count: usize) -> Result, io::Error> { + fn flush(&self, _count: usize, _channel_manager_bytes: Vec) -> Result, io::Error> { Ok(Vec::new()) } } @@ -892,7 +892,7 @@ impl Persist for TestPersister self.chain_sync_monitor_persistences.lock().unwrap().retain(|x| x != &monitor_name); } - fn flush(&self, _count: usize) -> Result, io::Error> { + fn flush(&self, _count: usize, _channel_manager_bytes: Vec) -> Result, io::Error> { Ok(Vec::new()) } }