From d8e33c15baf82a4ce46754745f0a33582e1d54df Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 30 Dec 2025 14:09:27 +0000 Subject: [PATCH 1/7] Fix circular `Arc` reference in `LiquiditySource` `LiquiditySource` takes a reference to our `PeerManager` but the `PeerManager` holds an indirect reference to the `LiquiditySource`. As a result, after our `Node` instance is `stop`ped and the `Node` `drop`ped, much of the node's memory will stick around, including the `NetworkGraph`. Here we fix this issue by using `Weak` pointers, though note that there is another issue caused by LDK's gossip validation API. --- src/builder.rs | 2 +- src/liquidity.rs | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index 187f780d2..ee8931127 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1603,7 +1603,7 @@ fn build_with_store_internal( peer_manager_clone.process_events(); })); - liquidity_source.as_ref().map(|l| l.set_peer_manager(Arc::clone(&peer_manager))); + liquidity_source.as_ref().map(|l| l.set_peer_manager(Arc::downgrade(&peer_manager))); gossip_source.set_gossip_verifier( Arc::clone(&chain_source), diff --git a/src/liquidity.rs b/src/liquidity.rs index 74e6098dd..2151110b6 100644 --- a/src/liquidity.rs +++ b/src/liquidity.rs @@ -9,7 +9,7 @@ use std::collections::HashMap; use std::ops::Deref; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, Mutex, RwLock, Weak}; use std::time::Duration; use bitcoin::hashes::{sha256, Hash}; @@ -291,7 +291,7 @@ where lsps2_service: Option, wallet: Arc, channel_manager: Arc, - peer_manager: RwLock>>, + peer_manager: RwLock>>, keys_manager: Arc, liquidity_manager: Arc, config: Arc, @@ -302,7 +302,7 @@ impl LiquiditySource where L::Target: LdkLogger, { - pub(crate) fn set_peer_manager(&self, peer_manager: Arc) { + pub(crate) fn set_peer_manager(&self, peer_manager: Weak) { *self.peer_manager.write().unwrap() = Some(peer_manager); } @@ -715,8 +715,8 @@ where return; }; - let 
init_features = if let Some(peer_manager) = - self.peer_manager.read().unwrap().as_ref() + let init_features = if let Some(Some(peer_manager)) = + self.peer_manager.read().unwrap().as_ref().map(|weak| weak.upgrade()) { // Fail if we're not connected to the prospective channel partner. if let Some(peer) = peer_manager.peer_by_node_id(&their_network_key) { From a7d2b6a1869d25c88ec08db3059f635ae537769f Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 8 Jan 2026 14:47:22 +0000 Subject: [PATCH 2/7] Fix circular `Arc` reference in HRN resolver action In added logic to use the HRN resolver from `bitcoin-payment-instructions`, we created a circular `Arc` reference - the `LDKOnionMessageDNSSECHrnResolver` is used as a handler for the `OnionMessenger` but we also set a post-queue-action which holds a reference to the `PeerManager`. As a result, after our `Node` instance is `stop`ped and the `Node` `drop`ped, much of the node's memory will stick around, including the `NetworkGraph`. Here we fix this issue by using `Weak` pointers. 
--- src/builder.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index ee8931127..cea3d09f5 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1597,10 +1597,11 @@ fn build_with_store_internal( Arc::clone(&keys_manager), )); - let peer_manager_clone = Arc::clone(&peer_manager); - + let peer_manager_clone = Arc::downgrade(&peer_manager); hrn_resolver.register_post_queue_action(Box::new(move || { - peer_manager_clone.process_events(); + if let Some(upgraded_pointer) = peer_manager_clone.upgrade() { + upgraded_pointer.process_events(); + } })); liquidity_source.as_ref().map(|l| l.set_peer_manager(Arc::downgrade(&peer_manager))); From 809a2270efbbfb017e333d03e610086be5152627 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 8 Jan 2026 14:21:10 +0000 Subject: [PATCH 3/7] Update LDK, fixing a circular `Arc` reference in gossip validation LDK's gossip validation API basically forced us to have a circular `Arc` reference, leading to memory leaks after `drop`ping an instance of `Node`. This is fixed upstream in LDK PR #4294 which we update to here. 
--- Cargo.toml | 26 +++++++++++++------------- src/builder.rs | 16 +++++++--------- src/gossip.rs | 38 ++++++++++---------------------------- src/types.rs | 2 +- 4 files changed, 31 insertions(+), 51 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index cc2a4b194..431d6b8d8 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,17 +39,17 @@ default = [] #lightning-liquidity = { version = "0.2.0", features = ["std"] } #lightning-macros = { version = "0.2.0" } -lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "1c730c8a16e28cc8e0c4817717ee63c97abcf4b0", features = ["std"] } -lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "1c730c8a16e28cc8e0c4817717ee63c97abcf4b0" } -lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "1c730c8a16e28cc8e0c4817717ee63c97abcf4b0", features = ["std"] } -lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "1c730c8a16e28cc8e0c4817717ee63c97abcf4b0" } -lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "1c730c8a16e28cc8e0c4817717ee63c97abcf4b0", features = ["tokio"] } -lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "1c730c8a16e28cc8e0c4817717ee63c97abcf4b0" } -lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "1c730c8a16e28cc8e0c4817717ee63c97abcf4b0" } -lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "1c730c8a16e28cc8e0c4817717ee63c97abcf4b0", features = ["rest-client", "rpc-client", "tokio"] } -lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "1c730c8a16e28cc8e0c4817717ee63c97abcf4b0", features = ["esplora-async-https", "time", "electrum-rustls-ring"] } -lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "1c730c8a16e28cc8e0c4817717ee63c97abcf4b0", 
features = ["std"] } -lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "1c730c8a16e28cc8e0c4817717ee63c97abcf4b0" } +lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["std"] } +lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" } +lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["std"] } +lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" } +lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["tokio"] } +lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" } +lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" } +lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["rest-client", "rpc-client", "tokio"] } +lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["esplora-async-https", "time", "electrum-rustls-ring"] } +lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["std"] } +lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" } bdk_chain = { version = "0.23.0", default-features = false, features = ["std"] } bdk_esplora = { version = "0.22.0", default-features = false, features 
= ["async-https-rustls", "tokio"]} @@ -78,13 +78,13 @@ log = { version = "0.4.22", default-features = false, features = ["std"]} vss-client = { package = "vss-client-ng", version = "0.4" } prost = { version = "0.11.6", default-features = false} #bitcoin-payment-instructions = { version = "0.6" } -bitcoin-payment-instructions = { git = "https://github.com/tnull/bitcoin-payment-instructions", rev = "a9ad849a0eb7b155a688d713de6d9010cb48f073" } +bitcoin-payment-instructions = { git = "https://github.com/tnull/bitcoin-payment-instructions", rev = "fdca6c62f2fe2c53427d3e51e322a49aa7323ee2" } [target.'cfg(windows)'.dependencies] winapi = { version = "0.3", features = ["winbase"] } [dev-dependencies] -lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "1c730c8a16e28cc8e0c4817717ee63c97abcf4b0", features = ["std", "_test_utils"] } +lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["std", "_test_utils"] } proptest = "1.0.0" regex = "1.5.6" criterion = { version = "0.7.0", features = ["async_tokio"] } diff --git a/src/builder.rs b/src/builder.rs index cea3d09f5..ca8e71d03 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -20,7 +20,7 @@ use bitcoin::key::Secp256k1; use bitcoin::secp256k1::PublicKey; use bitcoin::{BlockHash, Network}; use bitcoin_payment_instructions::onion_message_resolver::LDKOnionMessageDNSSECHrnResolver; -use lightning::chain::{chainmonitor, BestBlock, Watch}; +use lightning::chain::{chainmonitor, BestBlock}; use lightning::ln::channelmanager::{self, ChainParameters, ChannelManagerReadArgs}; use lightning::ln::msgs::{RoutingMessageHandler, SocketAddress}; use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler}; @@ -1481,8 +1481,12 @@ fn build_with_store_internal( let gossip_source = match gossip_source_config { GossipSourceConfig::P2PNetwork => { - let p2p_source = - Arc::new(GossipSource::new_p2p(Arc::clone(&network_graph), 
Arc::clone(&logger))); + let p2p_source = Arc::new(GossipSource::new_p2p( + Arc::clone(&network_graph), + Arc::clone(&chain_source), + Arc::clone(&runtime), + Arc::clone(&logger), + )); // Reset the RGS sync timestamp in case we somehow switch gossip sources { @@ -1606,12 +1610,6 @@ fn build_with_store_internal( liquidity_source.as_ref().map(|l| l.set_peer_manager(Arc::downgrade(&peer_manager))); - gossip_source.set_gossip_verifier( - Arc::clone(&chain_source), - Arc::clone(&peer_manager), - Arc::clone(&runtime), - ); - let connection_manager = Arc::new(ConnectionManager::new(Arc::clone(&peer_manager), Arc::clone(&logger))); diff --git a/src/gossip.rs b/src/gossip.rs index 563d9e1ea..2b524d9ae 100644 --- a/src/gossip.rs +++ b/src/gossip.rs @@ -17,7 +17,7 @@ use crate::chain::ChainSource; use crate::config::RGS_SYNC_TIMEOUT_SECS; use crate::logger::{log_trace, LdkLogger, Logger}; use crate::runtime::Runtime; -use crate::types::{GossipSync, Graph, P2PGossipSync, PeerManager, RapidGossipSync, UtxoLookup}; +use crate::types::{GossipSync, Graph, P2PGossipSync, RapidGossipSync}; use crate::Error; pub(crate) enum GossipSource { @@ -33,12 +33,15 @@ pub(crate) enum GossipSource { } impl GossipSource { - pub fn new_p2p(network_graph: Arc, logger: Arc) -> Self { - let gossip_sync = Arc::new(P2PGossipSync::new( - network_graph, - None::>, - Arc::clone(&logger), - )); + pub fn new_p2p( + network_graph: Arc, chain_source: Arc, runtime: Arc, + logger: Arc, + ) -> Self { + let verifier = chain_source.as_utxo_source().map(|utxo_source| { + Arc::new(GossipVerifier::new(Arc::new(utxo_source), RuntimeSpawner::new(runtime))) + }); + + let gossip_sync = Arc::new(P2PGossipSync::new(network_graph, verifier, logger)); Self::P2PNetwork { gossip_sync } } @@ -62,27 +65,6 @@ impl GossipSource { } } - pub(crate) fn set_gossip_verifier( - &self, chain_source: Arc, peer_manager: Arc, - runtime: Arc, - ) { - match self { - Self::P2PNetwork { gossip_sync } => { - if let Some(utxo_source) = 
chain_source.as_utxo_source() { - let spawner = RuntimeSpawner::new(Arc::clone(&runtime)); - let gossip_verifier = Arc::new(GossipVerifier::new( - Arc::new(utxo_source), - spawner, - Arc::clone(gossip_sync), - peer_manager, - )); - gossip_sync.add_utxo_lookup(Some(gossip_verifier)); - } - }, - _ => (), - } - } - pub async fn update_rgs_snapshot(&self) -> Result { match self { Self::P2PNetwork { gossip_sync: _, .. } => Ok(0), diff --git a/src/types.rs b/src/types.rs index 5e9cd74c9..2b7d3829a 100644 --- a/src/types.rs +++ b/src/types.rs @@ -254,7 +254,7 @@ pub(crate) type Scorer = CombinedScorer, Arc>; pub(crate) type Graph = gossip::NetworkGraph>; -pub(crate) type UtxoLookup = GossipVerifier, Arc>; +pub(crate) type UtxoLookup = GossipVerifier>; pub(crate) type P2PGossipSync = lightning::routing::gossip::P2PGossipSync, Arc, Arc>; From 97a750f3ff1f6847a7d24146193f136066753834 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 30 Dec 2025 14:09:57 +0000 Subject: [PATCH 4/7] Add test for circular references leading to `NetworkGraph` leaks Due to two circular `Arc` references, after `stop`ping and `drop`ping the `Node` instance the bulk of ldk-node's memory (in the form of the `NetworkGraph`) would hang around. Here we add a test for this in our integration tests, checking if the `NetworkGraph` (as a proxy for other objects held in reference by the `PeerManager`) hangs around after `Node`s are `drop`ped. 
--- .github/workflows/cln-integration.yml | 2 +- .github/workflows/lnd-integration.yml | 2 +- .github/workflows/rust.yml | 4 +-- .github/workflows/vss-integration.yml | 2 +- Cargo.toml | 1 + benches/payments.rs | 6 ++-- src/lib.rs | 6 ++++ src/logger.rs | 10 ++++++ tests/common/mod.rs | 48 +++++++++++++++++++++++++-- tests/integration_tests_rust.rs | 37 +++++++++++++-------- tests/integration_tests_vss.rs | 10 +++--- 11 files changed, 99 insertions(+), 29 deletions(-) diff --git a/.github/workflows/cln-integration.yml b/.github/workflows/cln-integration.yml index 32e7b74c0..d7624181b 100644 --- a/.github/workflows/cln-integration.yml +++ b/.github/workflows/cln-integration.yml @@ -27,4 +27,4 @@ jobs: socat -d -d UNIX-LISTEN:/tmp/lightning-rpc,reuseaddr,fork TCP:127.0.0.1:9937& - name: Run CLN integration tests - run: RUSTFLAGS="--cfg cln_test" cargo test --test integration_tests_cln + run: RUSTFLAGS="--cfg cln_test --cfg cycle_tests" cargo test --test integration_tests_cln diff --git a/.github/workflows/lnd-integration.yml b/.github/workflows/lnd-integration.yml index f913e92ad..894209c4b 100644 --- a/.github/workflows/lnd-integration.yml +++ b/.github/workflows/lnd-integration.yml @@ -51,6 +51,6 @@ jobs: - name: Run LND integration tests run: LND_CERT_PATH=$LND_DATA_DIR/tls.cert LND_MACAROON_PATH=$LND_DATA_DIR/data/chain/bitcoin/regtest/admin.macaroon - RUSTFLAGS="--cfg lnd_test" cargo test --test integration_tests_lnd -- --exact --show-output + RUSTFLAGS="--cfg lnd_test --cfg cycle_tests" cargo test --test integration_tests_lnd -- --exact --show-output env: LND_DATA_DIR: ${{ env.LND_DATA_DIR }} diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 661703ded..1ccade444 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -80,11 +80,11 @@ jobs: - name: Test on Rust ${{ matrix.toolchain }} if: "matrix.platform != 'windows-latest'" run: | - RUSTFLAGS="--cfg no_download" cargo test + RUSTFLAGS="--cfg no_download --cfg 
cycle_tests" cargo test - name: Test with UniFFI support on Rust ${{ matrix.toolchain }} if: "matrix.platform != 'windows-latest' && matrix.build-uniffi" run: | - RUSTFLAGS="--cfg no_download" cargo test --features uniffi + RUSTFLAGS="--cfg no_download --cfg cycle_tests" cargo test --features uniffi doc: name: Documentation diff --git a/.github/workflows/vss-integration.yml b/.github/workflows/vss-integration.yml index 8473ed413..b5c4e9a0b 100644 --- a/.github/workflows/vss-integration.yml +++ b/.github/workflows/vss-integration.yml @@ -45,4 +45,4 @@ jobs: cd ldk-node export TEST_VSS_BASE_URL="http://localhost:8080/vss" RUSTFLAGS="--cfg vss_test" cargo test io::vss_store - RUSTFLAGS="--cfg vss_test" cargo test --test integration_tests_vss + RUSTFLAGS="--cfg vss_test --cfg cycle_tests" cargo test --test integration_tests_vss diff --git a/Cargo.toml b/Cargo.toml index 431d6b8d8..207ad92a1 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -124,6 +124,7 @@ check-cfg = [ "cfg(tokio_unstable)", "cfg(cln_test)", "cfg(lnd_test)", + "cfg(cycle_tests)", ] [[bench]] diff --git a/benches/payments.rs b/benches/payments.rs index ba69e046d..4114f69cb 100644 --- a/benches/payments.rs +++ b/benches/payments.rs @@ -8,7 +8,7 @@ use bitcoin::hex::DisplayHex; use bitcoin::Amount; use common::{ expect_channel_ready_event, generate_blocks_and_wait, premine_and_distribute_funds, - setup_bitcoind_and_electrsd, setup_two_nodes_with_store, TestChainSource, + setup_bitcoind_and_electrsd, setup_two_nodes_with_store, TestChainSource, TestNode, }; use criterion::{criterion_group, criterion_main, Criterion}; use ldk_node::{Event, Node}; @@ -18,7 +18,7 @@ use tokio::task::{self}; use crate::common::open_channel_push_amt; -fn spawn_payment(node_a: Arc, node_b: Arc, amount_msat: u64) { +fn spawn_payment(node_a: Arc, node_b: Arc, amount_msat: u64) { let mut preimage_bytes = [0u8; 32]; rand::rng().fill_bytes(&mut preimage_bytes); let preimage = PaymentPreimage(preimage_bytes); @@ -61,7 +61,7 @@ fn 
spawn_payment(node_a: Arc, node_b: Arc, amount_msat: u64) { }); } -async fn send_payments(node_a: Arc, node_b: Arc) -> std::time::Duration { +async fn send_payments(node_a: Arc, node_b: Arc) -> std::time::Duration { let start = Instant::now(); let total_payments = 1000; diff --git a/src/lib.rs b/src/lib.rs index d9bca4551..cd234798b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1761,6 +1761,12 @@ impl Node { Error::PersistenceFailed }) } + + #[cfg(cycle_tests)] + /// Fetch a reference to the inner NetworkGraph, for Arc cycle detection + pub fn fetch_ref(&self) -> std::sync::Weak { + Arc::downgrade(&self.network_graph) + } } impl Drop for Node { diff --git a/src/logger.rs b/src/logger.rs index 4eaefad74..6a98b92f2 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -171,6 +171,14 @@ impl LogWriter for Writer { } } +#[cfg(cycle_tests)] +/// Our log writer +pub struct Logger { + /// Specifies the logger's writer. + writer: Writer, +} + +#[cfg(not(cycle_tests))] pub(crate) struct Logger { /// Specifies the logger's writer. 
writer: Writer, @@ -195,10 +203,12 @@ impl Logger { Ok(Self { writer: Writer::FileWriter { file_path, max_log_level } }) } + /// Creates a new logger writing to the `log` crate pub fn new_log_facade() -> Self { Self { writer: Writer::LogFacadeWriter } } + /// Creates a new logger writing to an underlying [`LogWriter`] pub fn new_custom_writer(log_writer: Arc) -> Self { Self { writer: Writer::CustomWriter(log_writer) } } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 96f58297c..5a725812d 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -13,6 +13,7 @@ pub(crate) mod logging; use std::collections::{HashMap, HashSet}; use std::env; use std::future::Future; +use std::ops::Deref; use std::path::PathBuf; use std::sync::{Arc, RwLock}; use std::time::Duration; @@ -261,10 +262,51 @@ pub(crate) fn random_config(anchor_channels: bool) -> TestConfig { TestConfig { node_config, ..Default::default() } } +pub struct TestNode { + #[cfg(feature = "uniffi")] + inner: Option>, + #[cfg(not(feature = "uniffi"))] + inner: Option, +} + #[cfg(feature = "uniffi")] -type TestNode = Arc; +impl From> for TestNode { + fn from(inner: Arc) -> Self { + Self { inner: Some(inner) } + } +} + #[cfg(not(feature = "uniffi"))] -type TestNode = Node; +impl From for TestNode { + fn from(inner: Node) -> Self { + Self { inner: Some(inner) } + } +} + +impl Deref for TestNode { + type Target = Node; + fn deref(&self) -> &Node { + #[cfg(feature = "uniffi")] + { + self.inner.as_ref().unwrap().deref() + } + #[cfg(not(feature = "uniffi"))] + { + &self.inner.as_ref().unwrap() + } + } +} + +#[cfg(cycle_tests)] +impl Drop for TestNode { + fn drop(&mut self) { + if !std::thread::panicking() { + let graph_ref = (**self).fetch_ref(); + self.inner.take(); + assert_eq!(graph_ref.strong_count(), 0); + } + } +} #[derive(Clone)] pub(crate) enum TestChainSource<'a> { @@ -430,7 +472,7 @@ pub(crate) fn setup_node_for_async_payments( node.start().unwrap(); assert!(node.status().is_running); 
assert!(node.status().latest_fee_rate_cache_update_timestamp.is_some()); - node + node.into() } pub(crate) async fn generate_blocks_and_wait( diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 4d2a17422..99743cad5 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -23,8 +23,8 @@ use common::{ expect_splice_pending_event, generate_blocks_and_wait, open_channel, open_channel_push_amt, premine_and_distribute_funds, premine_blocks, prepare_rbf, random_config, random_listening_addresses, setup_bitcoind_and_electrsd, setup_builder, setup_node, - setup_node_for_async_payments, setup_two_nodes, wait_for_tx, TestChainSource, TestStoreType, - TestSyncStore, + setup_node_for_async_payments, setup_two_nodes, wait_for_tx, TestChainSource, TestNode, + TestStoreType, TestSyncStore, }; use ldk_node::config::{AsyncPaymentsRole, EsploraSyncConfig}; use ldk_node::liquidity::LSPS2ServiceConfig; @@ -154,15 +154,15 @@ async fn multi_hop_sending() { let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); // Setup and fund 5 nodes - let mut nodes = Vec::new(); + let mut nodes: Vec = Vec::new(); for _ in 0..5 { let config = random_config(true); let sync_config = EsploraSyncConfig { background_sync_config: None }; setup_builder!(builder, config.node_config); builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - let node = builder.build(config.node_entropy.into()).unwrap(); + let node: TestNode = builder.build(config.node_entropy.into()).unwrap().into(); node.start().unwrap(); - nodes.push(node); + nodes.push(node.into()); } let addresses = nodes.iter().map(|n| n.onchain_payment().new_address().unwrap()).collect(); @@ -1730,7 +1730,8 @@ async fn do_lsps2_client_service_integration(client_trusts_lsp: bool) { setup_builder!(service_builder, service_config.node_config); service_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); 
service_builder.set_liquidity_provider_lsps2(lsps2_service_config); - let service_node = service_builder.build(service_config.node_entropy.into()).unwrap(); + let service_node: TestNode = + service_builder.build(service_config.node_entropy.into()).unwrap().into(); service_node.start().unwrap(); let service_node_id = service_node.node_id(); @@ -1740,13 +1741,15 @@ async fn do_lsps2_client_service_integration(client_trusts_lsp: bool) { setup_builder!(client_builder, client_config.node_config); client_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); client_builder.set_liquidity_source_lsps2(service_node_id, service_addr, None); - let client_node = client_builder.build(client_config.node_entropy.into()).unwrap(); + let client_node: TestNode = + client_builder.build(client_config.node_entropy.into()).unwrap().into(); client_node.start().unwrap(); let payer_config = random_config(true); setup_builder!(payer_builder, payer_config.node_config); payer_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - let payer_node = payer_builder.build(payer_config.node_entropy.into()).unwrap(); + let payer_node: TestNode = + payer_builder.build(payer_config.node_entropy.into()).unwrap().into(); payer_node.start().unwrap(); let service_addr = service_node.onchain_payment().new_address().unwrap(); @@ -2047,7 +2050,8 @@ async fn lsps2_client_trusts_lsp() { setup_builder!(service_builder, service_config.node_config); service_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); service_builder.set_liquidity_provider_lsps2(lsps2_service_config); - let service_node = service_builder.build(service_config.node_entropy.into()).unwrap(); + let service_node: TestNode = + service_builder.build(service_config.node_entropy.into()).unwrap().into(); service_node.start().unwrap(); let service_node_id = service_node.node_id(); let service_addr = service_node.listening_addresses().unwrap().first().unwrap().clone(); @@ -2056,14 +2060,16 
@@ async fn lsps2_client_trusts_lsp() { setup_builder!(client_builder, client_config.node_config); client_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); client_builder.set_liquidity_source_lsps2(service_node_id, service_addr.clone(), None); - let client_node = client_builder.build(client_config.node_entropy.into()).unwrap(); + let client_node: TestNode = + client_builder.build(client_config.node_entropy.into()).unwrap().into(); client_node.start().unwrap(); let client_node_id = client_node.node_id(); let payer_config = random_config(true); setup_builder!(payer_builder, payer_config.node_config); payer_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - let payer_node = payer_builder.build(payer_config.node_entropy.into()).unwrap(); + let payer_node: TestNode = + payer_builder.build(payer_config.node_entropy.into()).unwrap().into(); payer_node.start().unwrap(); let service_addr_onchain = service_node.onchain_payment().new_address().unwrap(); @@ -2220,7 +2226,8 @@ async fn lsps2_lsp_trusts_client_but_client_does_not_claim() { setup_builder!(service_builder, service_config.node_config); service_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); service_builder.set_liquidity_provider_lsps2(lsps2_service_config); - let service_node = service_builder.build(service_config.node_entropy.into()).unwrap(); + let service_node: TestNode = + service_builder.build(service_config.node_entropy.into()).unwrap().into(); service_node.start().unwrap(); let service_node_id = service_node.node_id(); @@ -2230,7 +2237,8 @@ async fn lsps2_lsp_trusts_client_but_client_does_not_claim() { setup_builder!(client_builder, client_config.node_config); client_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); client_builder.set_liquidity_source_lsps2(service_node_id, service_addr.clone(), None); - let client_node = client_builder.build(client_config.node_entropy.into()).unwrap(); + let client_node: 
TestNode = + client_builder.build(client_config.node_entropy.into()).unwrap().into(); client_node.start().unwrap(); let client_node_id = client_node.node_id(); @@ -2238,7 +2246,8 @@ async fn lsps2_lsp_trusts_client_but_client_does_not_claim() { let payer_config = random_config(true); setup_builder!(payer_builder, payer_config.node_config); payer_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - let payer_node = payer_builder.build(payer_config.node_entropy.into()).unwrap(); + let payer_node: TestNode = + payer_builder.build(payer_config.node_entropy.into()).unwrap().into(); payer_node.start().unwrap(); let service_addr_onchain = service_node.onchain_payment().new_address().unwrap(); diff --git a/tests/integration_tests_vss.rs b/tests/integration_tests_vss.rs index 54912b358..2184052e9 100644 --- a/tests/integration_tests_vss.rs +++ b/tests/integration_tests_vss.rs @@ -24,28 +24,30 @@ async fn channel_full_cycle_with_vss_store() { let mut builder_a = Builder::from_config(config_a.node_config); builder_a.set_chain_source_esplora(esplora_url.clone(), None); let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap(); - let node_a = builder_a + let node_a: common::TestNode = builder_a .build_with_vss_store_and_fixed_headers( config_a.node_entropy, vss_base_url.clone(), "node_1_store".to_string(), HashMap::new(), ) - .unwrap(); + .unwrap() + .into(); node_a.start().unwrap(); println!("\n== Node B =="); let config_b = common::random_config(true); let mut builder_b = Builder::from_config(config_b.node_config); builder_b.set_chain_source_esplora(esplora_url.clone(), None); - let node_b = builder_b + let node_b: common::TestNode = builder_b .build_with_vss_store_and_fixed_headers( config_b.node_entropy, vss_base_url, "node_2_store".to_string(), HashMap::new(), ) - .unwrap(); + .unwrap() + .into(); node_b.start().unwrap(); common::do_channel_full_cycle( From 2e844bec42b0492de941167c1e3a5a027405d8bd Mon Sep 17 00:00:00 2001 From: Matt Corallo 
Date: Thu, 8 Jan 2026 19:16:49 +0000 Subject: [PATCH 5/7] Upgrade to latest LDK (which spawns futures with a return value) --- Cargo.toml | 26 +++++++++++++------------- src/gossip.rs | 14 ++++++++++++-- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 207ad92a1..c6ac55c61 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,17 +39,17 @@ default = [] #lightning-liquidity = { version = "0.2.0", features = ["std"] } #lightning-macros = { version = "0.2.0" } -lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["std"] } -lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" } -lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["std"] } -lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" } -lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["tokio"] } -lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" } -lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" } -lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["rest-client", "rpc-client", "tokio"] } -lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["esplora-async-https", "time", "electrum-rustls-ring"] } -lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = 
"5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["std"] } -lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" } +lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["std"] } +lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" } +lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["std"] } +lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" } +lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["tokio"] } +lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" } +lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" } +lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["rest-client", "rpc-client", "tokio"] } +lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["esplora-async-https", "time", "electrum-rustls-ring"] } +lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["std"] } +lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" } bdk_chain = { version = "0.23.0", default-features = false, features = ["std"] } bdk_esplora = { version = 
"0.22.0", default-features = false, features = ["async-https-rustls", "tokio"]} @@ -78,13 +78,13 @@ log = { version = "0.4.22", default-features = false, features = ["std"]} vss-client = { package = "vss-client-ng", version = "0.4" } prost = { version = "0.11.6", default-features = false} #bitcoin-payment-instructions = { version = "0.6" } -bitcoin-payment-instructions = { git = "https://github.com/tnull/bitcoin-payment-instructions", rev = "fdca6c62f2fe2c53427d3e51e322a49aa7323ee2" } +bitcoin-payment-instructions = { git = "https://github.com/tnull/bitcoin-payment-instructions", rev = "ce9ff5281ae9bb05526981f6f9df8f8d929c7c44" } [target.'cfg(windows)'.dependencies] winapi = { version = "0.3", features = ["winbase"] } [dev-dependencies] -lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["std", "_test_utils"] } +lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["std", "_test_utils"] } proptest = "1.0.0" regex = "1.5.6" criterion = { version = "0.7.0", features = ["async_tokio"] } diff --git a/src/gossip.rs b/src/gossip.rs index 2b524d9ae..6fd3fb039 100644 --- a/src/gossip.rs +++ b/src/gossip.rs @@ -126,7 +126,17 @@ impl RuntimeSpawner { } impl FutureSpawner for RuntimeSpawner { - fn spawn + Send + 'static>(&self, future: T) { - self.runtime.spawn_cancellable_background_task(future); + type E = tokio::sync::oneshot::error::RecvError; + type SpawnedFutureResult = tokio::sync::oneshot::Receiver; + fn spawn + Send + 'static>( + &self, future: F, + ) -> Self::SpawnedFutureResult { + let (result, output) = tokio::sync::oneshot::channel(); + self.runtime.spawn_cancellable_background_task(async move { + // We don't care if the send works or not, if the receiver is dropped its not our + // problem. 
+ let _ = result.send(future.await); + }); + output } } From 53bcbb1401cf86876f27484b202484d09ae1c6f7 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 8 Jan 2026 19:25:44 +0000 Subject: [PATCH 6/7] Switch to the new highly-parallel `ChannelMonitor` reader Upstream LDK added the ability to read `ChannelMonitor`s from storage in parallel, which we switch to here. --- src/builder.rs | 24 +++++++++++++++++++----- src/types.rs | 14 +++++++++++++- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index ca8e71d03..270132164 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -51,7 +51,7 @@ use crate::connection::ConnectionManager; use crate::entropy::NodeEntropy; use crate::event::EventQueue; use crate::fee_estimator::OnchainFeeEstimator; -use crate::gossip::GossipSource; +use crate::gossip::{GossipSource, RuntimeSpawner}; use crate::io::sqlite_store::SqliteStore; use crate::io::utils::{ read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph, @@ -72,8 +72,9 @@ use crate::peer_store::PeerStore; use crate::runtime::Runtime; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ - ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph, KeysManager, - MessageRouter, OnionMessenger, PaymentStore, PeerManager, Persister, SyncAndAsyncKVStore, + AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph, + KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, Persister, + SyncAndAsyncKVStore, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; @@ -1261,8 +1262,9 @@ fn build_with_store_internal( )); let peer_storage_key = keys_manager.get_peer_storage_key(); - let persister = Arc::new(Persister::new( + let monitor_reader = Arc::new(AsyncPersister::new( Arc::clone(&kv_store), + RuntimeSpawner::new(Arc::clone(&runtime)), Arc::clone(&logger), PERSISTER_MAX_PENDING_UPDATES, 
Arc::clone(&keys_manager), @@ -1272,7 +1274,9 @@ fn build_with_store_internal( )); // Read ChannelMonitor state from store - let channel_monitors = match persister.read_all_channel_monitors_with_updates() { + let monitor_read_result = + runtime.block_on(monitor_reader.read_all_channel_monitors_with_updates_parallel()); + let channel_monitors = match monitor_read_result { Ok(monitors) => monitors, Err(e) => { if e.kind() == lightning::io::ErrorKind::NotFound { @@ -1284,6 +1288,16 @@ fn build_with_store_internal( }, }; + let persister = Arc::new(Persister::new( + Arc::clone(&kv_store), + Arc::clone(&logger), + PERSISTER_MAX_PENDING_UPDATES, + Arc::clone(&keys_manager), + Arc::clone(&keys_manager), + Arc::clone(&tx_broadcaster), + Arc::clone(&fee_estimator), + )); + // Initialize the ChainMonitor let chain_monitor: Arc = Arc::new(chainmonitor::ChainMonitor::new( Some(Arc::clone(&chain_source)), diff --git a/src/types.rs b/src/types.rs index 2b7d3829a..f1112fcf8 100644 --- a/src/types.rs +++ b/src/types.rs @@ -23,7 +23,9 @@ use lightning::routing::gossip; use lightning::routing::router::DefaultRouter; use lightning::routing::scoring::{CombinedScorer, ProbabilisticScoringFeeParameters}; use lightning::sign::InMemorySigner; -use lightning::util::persist::{KVStore, KVStoreSync, MonitorUpdatingPersister}; +use lightning::util::persist::{ + KVStore, KVStoreSync, MonitorUpdatingPersister, MonitorUpdatingPersisterAsync, +}; use lightning::util::ser::{Readable, Writeable, Writer}; use lightning::util::sweep::OutputSweeper; use lightning_block_sync::gossip::GossipVerifier; @@ -185,6 +187,16 @@ impl DynStoreTrait for DynStoreWrapper } } +pub(crate) type AsyncPersister = MonitorUpdatingPersisterAsync< + Arc, + RuntimeSpawner, + Arc, + Arc, + Arc, + Arc, + Arc, +>; + pub type Persister = MonitorUpdatingPersister< Arc, Arc, From 71fdb1d1417b2bd18cd7ae41d7f53f657d51ce3a Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 8 Jan 2026 19:51:14 +0000 Subject: [PATCH 7/7] Parallelize 
store reads in init Since I was editing the init logic anyway, I couldn't resist going ahead and parallelizing various read calls. Since we added support for an async `KVStore` in LDK 0.2/ldk-node 0.7, we can now practically do initialization reads in parallel. Thus, rather than making a long series of read calls in `build`, we use `tokio::join` to reduce the number of round-trips to our backing store, which should be a very large win for initialization cost on those using remote storage (e.g. VSS). Sadly we can't trivially do all our reads in one go; we need the payment history to initialize the BDK wallet, which is used in the `Wallet` object which is referenced in our `KeysManager`. Thus we first read the payment store and node metrics before moving on. Then, we need a reference to the `NetworkGraph` when we build the scorer. While we could/eventually should move to reading the *bytes* for the scorer while reading the graph and only building the scorer later, that's a larger refactor we leave for later. In the end, we end up with: * 1 round-trip to load the payment history and node metrics, * 2 round-trips to load ChannelMonitors and NetworkGraph (where there's an internal extra round-trip after listing the monitor updates for a monitor), * 1 round-trip to validate bitcoind RPC/REST access for those using bitcoind as a chain source, * 1 round-trip to load various smaller LDK and ldk-node objects, * and 1 additional round-trip to drop the rgs snapshot timestamp for nodes using P2P network gossip syncing for a total of 4 round-trips in the common case and 6 for nodes using less common chain source and gossip sync sources. We then have additional round-trips to our storage and chain source during node start, but those are in many cases already async. 
--- src/builder.rs | 135 +++++++++++++++++++++++++++++-------------------- 1 file changed, 80 insertions(+), 55 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index 270132164..f661d2da1 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -33,7 +33,7 @@ use lightning::routing::scoring::{ }; use lightning::sign::{EntropySource, NodeSigner}; use lightning::util::persist::{ - KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, }; use lightning::util::ser::ReadableArgs; @@ -1052,10 +1052,20 @@ fn build_with_store_internal( } } + let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger))); + let fee_estimator = Arc::new(OnchainFeeEstimator::new()); + + let kv_store_ref = Arc::clone(&kv_store); + let logger_ref = Arc::clone(&logger); + let (payment_store_res, node_metris_res) = runtime.block_on(async move { + tokio::join!( + read_payments(&*kv_store_ref, Arc::clone(&logger_ref)), + read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)), + ) + }); + // Initialize the status fields. 
- let node_metrics = match runtime - .block_on(async { read_node_metrics(&*kv_store, Arc::clone(&logger)).await }) - { + let node_metrics = match node_metris_res { Ok(metrics) => Arc::new(RwLock::new(metrics)), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1066,23 +1076,20 @@ fn build_with_store_internal( } }, }; - let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger))); - let fee_estimator = Arc::new(OnchainFeeEstimator::new()); - let payment_store = - match runtime.block_on(async { read_payments(&*kv_store, Arc::clone(&logger)).await }) { - Ok(payments) => Arc::new(PaymentStore::new( - payments, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), - Arc::clone(&kv_store), - Arc::clone(&logger), - )), - Err(e) => { - log_error!(logger, "Failed to read payment data from store: {}", e); - return Err(BuildError::ReadFailed); - }, - }; + let payment_store = match payment_store_res { + Ok(payments) => Arc::new(PaymentStore::new( + payments, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read payment data from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; let (chain_source, chain_tip_opt) = match chain_data_source_config { Some(ChainDataSourceConfig::Esplora { server_url, headers, sync_config }) => { @@ -1273,10 +1280,18 @@ fn build_with_store_internal( Arc::clone(&fee_estimator), )); + // Read ChannelMonitors and the NetworkGraph + let kv_store_ref = Arc::clone(&kv_store); + let logger_ref = Arc::clone(&logger); + let (monitor_read_res, network_graph_res) = runtime.block_on(async move { + tokio::join!( + monitor_reader.read_all_channel_monitors_with_updates_parallel(), + read_network_graph(&*kv_store_ref, logger_ref), + ) + }); + // Read ChannelMonitor state from store - let 
monitor_read_result = - runtime.block_on(monitor_reader.read_all_channel_monitors_with_updates_parallel()); - let channel_monitors = match monitor_read_result { + let channel_monitors = match monitor_read_res { Ok(monitors) => monitors, Err(e) => { if e.kind() == lightning::io::ErrorKind::NotFound { @@ -1310,9 +1325,7 @@ fn build_with_store_internal( )); // Initialize the network graph, scorer, and router - let network_graph = match runtime - .block_on(async { read_network_graph(&*kv_store, Arc::clone(&logger)).await }) - { + let network_graph = match network_graph_res { Ok(graph) => Arc::new(graph), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1324,9 +1337,42 @@ fn build_with_store_internal( }, }; - let local_scorer = match runtime.block_on(async { - read_scorer(&*kv_store, Arc::clone(&network_graph), Arc::clone(&logger)).await - }) { + // Read various smaller LDK and ldk-node objects from the store + let kv_store_ref = Arc::clone(&kv_store); + let logger_ref = Arc::clone(&logger); + let network_graph_ref = Arc::clone(&network_graph); + let output_sweeper_future = read_output_sweeper( + Arc::clone(&tx_broadcaster), + Arc::clone(&fee_estimator), + Arc::clone(&chain_source), + Arc::clone(&keys_manager), + Arc::clone(&kv_store_ref), + Arc::clone(&logger_ref), + ); + let ( + scorer_res, + external_scores_res, + channel_manager_bytes_res, + sweeper_bytes_res, + event_queue_res, + peer_info_res, + ) = runtime.block_on(async move { + tokio::join!( + read_scorer(&*kv_store_ref, network_graph_ref, Arc::clone(&logger_ref)), + read_external_pathfinding_scores_from_cache(&*kv_store_ref, Arc::clone(&logger_ref)), + KVStore::read( + &*kv_store_ref, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ), + output_sweeper_future, + read_event_queue(Arc::clone(&kv_store_ref), Arc::clone(&logger_ref)), + read_peer_info(Arc::clone(&kv_store_ref), Arc::clone(&logger_ref)), + ) + }); + 
+ let local_scorer = match scorer_res { Ok(scorer) => scorer, Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1342,9 +1388,7 @@ fn build_with_store_internal( let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer))); // Restore external pathfinding scores from cache if possible. - match runtime.block_on(async { - read_external_pathfinding_scores_from_cache(&*kv_store, Arc::clone(&logger)).await - }) { + match external_scores_res { Ok(external_scores) => { scorer.lock().unwrap().merge(external_scores, cur_time); log_trace!(logger, "External scores from cache merged successfully"); @@ -1397,12 +1441,7 @@ fn build_with_store_internal( // Initialize the ChannelManager let channel_manager = { - if let Ok(reader) = KVStoreSync::read( - &*kv_store, - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_KEY, - ) { + if let Ok(reader) = channel_manager_bytes_res { let channel_monitor_references = channel_monitors.iter().map(|(_, chanmon)| chanmon).collect(); let read_args = ChannelManagerReadArgs::new( @@ -1627,17 +1666,7 @@ fn build_with_store_internal( let connection_manager = Arc::new(ConnectionManager::new(Arc::clone(&peer_manager), Arc::clone(&logger))); - let output_sweeper = match runtime.block_on(async { - read_output_sweeper( - Arc::clone(&tx_broadcaster), - Arc::clone(&fee_estimator), - Arc::clone(&chain_source), - Arc::clone(&keys_manager), - Arc::clone(&kv_store), - Arc::clone(&logger), - ) - .await - }) { + let output_sweeper = match sweeper_bytes_res { Ok(output_sweeper) => Arc::new(output_sweeper), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1658,9 +1687,7 @@ fn build_with_store_internal( }, }; - let event_queue = match runtime - .block_on(async { read_event_queue(Arc::clone(&kv_store), Arc::clone(&logger)).await }) - { + let event_queue = match event_queue_res { Ok(event_queue) => Arc::new(event_queue), Err(e) => { if e.kind() == 
std::io::ErrorKind::NotFound { @@ -1672,9 +1699,7 @@ fn build_with_store_internal( }, }; - let peer_store = match runtime - .block_on(async { read_peer_info(Arc::clone(&kv_store), Arc::clone(&logger)).await }) - { + let peer_store = match peer_info_res { Ok(peer_store) => Arc::new(peer_store), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound {