147 changes: 122 additions & 25 deletions lightning-background-processor/src/lib.rs
@@ -59,9 +59,10 @@ use lightning::util::logger::Logger;
use lightning::util::persist::{
KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
-NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
-NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY,
-SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
+CHANNEL_MANAGER_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
+NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
+SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
+SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
};
use lightning::util::sweep::{OutputSweeper, OutputSweeperSync};
use lightning::util::wakers::Future;
@@ -182,6 +183,13 @@ const ARCHIVE_STALE_MONITORS_TIMER: Duration = Duration::from_secs(60 * 10);
#[cfg(test)]
const ARCHIVE_STALE_MONITORS_TIMER: Duration = Duration::from_secs(1);

/// After this many incremental ChannelManager updates, consolidate by writing a full
/// ChannelManager and cleaning up old updates.
#[cfg(not(test))]
const CHANNEL_MANAGER_UPDATE_CONSOLIDATION_THRESHOLD: u64 = 100;
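// A much lower threshold in tests keeps the consolidation path cheap to exercise.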
#[cfg(test)]
const CHANNEL_MANAGER_UPDATE_CONSOLIDATION_THRESHOLD: u64 = 5;

/// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
const fn min_duration(a: Duration, b: Duration) -> Duration {
if a.as_nanos() < b.as_nanos() {
@@ -1150,18 +1158,32 @@ where
None => {},
}

-let mut futures = Joiner::new();
+// Prepare ChannelManager update data outside the async block so it lives long enough.
+// This must be declared before `futures` so it outlives the future references.
+let cm_update_data = if channel_manager.get_cm().get_and_clear_needs_persistence() {
+log_trace!(logger, "Persisting ChannelManager update...");

-if channel_manager.get_cm().get_and_clear_needs_persistence() {
-log_trace!(logger, "Persisting ChannelManager...");
+let mut buf = Vec::new();
+let update_id = channel_manager
+.get_cm()
+.write_update(&mut buf)
+.map_err(|e| lightning::io::Error::new(lightning::io::ErrorKind::Other, e))?;
+Some((update_id, update_id.to_string(), buf))
+} else {
+None
+};

+let mut futures = Joiner::new();

+if let Some((_, ref update_key, ref buf)) = cm_update_data {
+let buf = buf.clone(); // Clone for the async block
let fut = async {
kv_store
.write(
-CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
-CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
-CHANNEL_MANAGER_PERSISTENCE_KEY,
-channel_manager.get_cm().encode(),
+CHANNEL_MANAGER_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+"",
+update_key,
+buf,
)
.await
};
@@ -1181,7 +1203,7 @@ where
task::Poll::Pending => futures.set_a(fut),
}

log_trace!(logger, "Done persisting ChannelManager.");
log_trace!(logger, "Done persisting ChannelManager update.");
}

// Note that we want to archive stale ChannelMonitors and run a network graph prune once
@@ -1349,6 +1371,40 @@ where
res?;
}

// Consolidate if we've accumulated enough updates
if let Some((update_id, _, _)) = cm_update_data {
if update_id % CHANNEL_MANAGER_UPDATE_CONSOLIDATION_THRESHOLD == 0 {
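// The update id itself drives consolidation: any id that is a multiple of the threshold triggers a full write.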
log_trace!(logger, "Consolidating ChannelManager updates...");
// Write full ChannelManager
kv_store
.write(
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_KEY,
channel_manager.get_cm().encode(),
)
.await?;
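// Old updates are only pruned once the full write above has succeeded (the `?` returns early on failure), so everything removed below is already covered by the snapshot just written.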
// Clean up old updates
let update_keys =
kv_store.list(CHANNEL_MANAGER_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, "").await?;
for key in update_keys {
if let Ok(id) = key.parse::<u64>() {
if id <= update_id {
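// Removal is best-effort; a failed delete just leaves a redundant update for a later consolidation pass to retry.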
let _ = kv_store
.remove(
CHANNEL_MANAGER_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
"",
&key,
true,
)
.await;
}
}
}
log_trace!(logger, "Done consolidating ChannelManager updates.");
}
}

match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
sleeper(ONION_MESSAGE_HANDLER_TIMER)
}) {
@@ -1405,12 +1461,17 @@ where
// After we exit, ensure we persist the ChannelManager one final time - this avoids
// some races where users quit while channel updates were in-flight, with
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
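// With update-based persistence this final write is an incremental update rather than a full ChannelManager serialization.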
+let mut buf = Vec::new();
+let update_id = channel_manager
+.get_cm()
+.write_update(&mut buf)
+.map_err(|e| lightning::io::Error::new(lightning::io::ErrorKind::Other, e))?;
kv_store
.write(
-CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
-CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
-CHANNEL_MANAGER_PERSISTENCE_KEY,
-channel_manager.get_cm().encode(),
+CHANNEL_MANAGER_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+"",
+&update_id.to_string(),
+buf,
)
.await?;
if let Some(ref scorer) = scorer {
@@ -1723,14 +1784,47 @@ impl BackgroundProcessor {
last_freshness_call = Instant::now();
}
if channel_manager.get_cm().get_and_clear_needs_persistence() {
log_trace!(logger, "Persisting ChannelManager...");
log_trace!(logger, "Persisting ChannelManager update...");
let mut buf = Vec::new();
let update_id = channel_manager
.get_cm()
.write_update(&mut buf)
.map_err(std::io::Error::other)?;
(kv_store.write(
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_KEY,
channel_manager.get_cm().encode(),
CHANNEL_MANAGER_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
"",
&update_id.to_string(),
buf,
))?;
log_trace!(logger, "Done persisting ChannelManager.");
log_trace!(logger, "Done persisting ChannelManager update {}.", update_id);

// Consolidate if we've accumulated enough updates
if update_id % CHANNEL_MANAGER_UPDATE_CONSOLIDATION_THRESHOLD == 0 {
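// Same consolidation scheme as in the async background processor above.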
log_trace!(logger, "Consolidating ChannelManager updates...");
// Write full ChannelManager
kv_store.write(
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_KEY,
channel_manager.get_cm().encode(),
)?;
// Clean up old updates
let update_keys = kv_store
.list(CHANNEL_MANAGER_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, "")?;
for key in update_keys {
if let Ok(id) = key.parse::<u64>() {
if id <= update_id {
let _ = kv_store.remove(
CHANNEL_MANAGER_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
"",
&key,
true,
);
}
}
}
log_trace!(logger, "Done consolidating ChannelManager updates.");
}
}

if let Some(liquidity_manager) = liquidity_manager.as_ref() {
@@ -1847,11 +1941,14 @@ impl BackgroundProcessor {
// After we exit, ensure we persist the ChannelManager one final time - this avoids
// some races where users quit while channel updates were in-flight, with
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
+let mut buf = Vec::new();
+let update_id =
+channel_manager.get_cm().write_update(&mut buf).map_err(std::io::Error::other)?;
kv_store.write(
-CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
-CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
-CHANNEL_MANAGER_PERSISTENCE_KEY,
-channel_manager.get_cm().encode(),
+CHANNEL_MANAGER_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+"",
+&update_id.to_string(),
+buf,
)?;
if let Some(ref scorer) = scorer {
kv_store.write(
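For reviewers, a quick self-contained sketch of the key arithmetic the consolidation and cleanup loops above rely on: update keys are the stringified update ids, a full ChannelManager write fires whenever the id is a multiple of CHANNEL_MANAGER_UPDATE_CONSOLIDATION_THRESHOLD, and every key whose id is at or below the consolidated id is then removed. The helper and names below are illustrative only (not LDK API), assuming update ids increase by one per persisted update.

/// Illustrative only: mirrors the cleanup loop above. Given the keys currently in the
/// update namespace and the id that just triggered consolidation, return the keys that
/// should be removed (everything at or below the consolidated id).
fn stale_update_keys(existing_keys: &[String], consolidated_id: u64) -> Vec<String> {
    existing_keys
        .iter()
        // Keys are update ids rendered with `to_string()`; ignore anything unparsable.
        .filter(|key| key.parse::<u64>().map_or(false, |id| id <= consolidated_id))
        .cloned()
        .collect()
}

fn main() {
    // With the non-test threshold of 100, update id 200 triggers consolidation
    // (200 % 100 == 0) and prunes the updates accumulated since the last snapshot.
    let keys: Vec<String> = (101..=203).map(|id| id.to_string()).collect();
    let stale = stale_update_keys(&keys, 200);
    assert_eq!(stale.len(), 100); // ids 101..=200
    assert!(!stale.contains(&"201".to_string())); // updates newer than the snapshot survive
    println!("removing {} stale update(s)", stale.len());
}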