diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index dafeffe98bf..fede2611d0f 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -17315,6 +17315,304 @@ impl Readable for VecDeque<(Event, Option)> { } } +// Raw deserialized data from a ChannelManager, before validation or reconstruction. +// This is an internal DTO used in the two-stage deserialization process. +pub(super) struct ChannelManagerData +where + SP::Target: SignerProvider, +{ + chain_hash: ChainHash, + best_block_height: u32, + best_block_hash: BlockHash, + channels: Vec>, + forward_htlcs: HashMap>, + claimable_htlcs: Vec<(PaymentHash, Vec)>, + peer_init_features: Vec<(PublicKey, InitFeatures)>, + pending_events: VecDeque<(events::Event, Option)>, + highest_seen_timestamp: u32, + pending_intercepted_htlcs: Option>, + pending_outbound_payments: HashMap, + pending_claiming_payments: HashMap, + received_network_pubkey: Option, + monitor_update_blocked_actions_per_peer: + Vec<(PublicKey, BTreeMap>)>, + fake_scid_rand_bytes: Option<[u8; 32]>, + claimable_htlc_purposes: Option>, + probing_cookie_secret: Option<[u8; 32]>, + claimable_htlc_onion_fields: Option>>, + decode_update_add_htlcs: Option>>, + inbound_payment_id_secret: Option<[u8; 32]>, + in_flight_monitor_updates: Option>>, + peer_storage_dir: Option)>>, + async_receive_offer_cache: AsyncReceiveOfferCache, +} + +/// Arguments for deserializing [`ChannelManagerData`]. +struct ChannelManagerDataReadArgs<'a, ES: Deref, SP: Deref, L: Deref> +where + ES::Target: EntropySource, + SP::Target: SignerProvider, + L::Target: Logger, +{ + entropy_source: &'a ES, + signer_provider: &'a SP, + config: UserConfig, + logger: &'a L, +} + +impl<'a, ES: Deref, SP: Deref, L: Deref> ReadableArgs> + for ChannelManagerData +where + ES::Target: EntropySource, + SP::Target: SignerProvider, + L::Target: Logger, +{ + fn read( + reader: &mut R, args: ChannelManagerDataReadArgs<'a, ES, SP, L>, + ) -> Result { + let _ver = read_ver_prefix!(reader, SERIALIZATION_VERSION); + + let chain_hash: ChainHash = Readable::read(reader)?; + let best_block_height: u32 = Readable::read(reader)?; + let best_block_hash: BlockHash = Readable::read(reader)?; + + const MAX_ALLOC_SIZE: usize = 1024 * 64; + + let channel_count: u64 = Readable::read(reader)?; + let mut channels = Vec::with_capacity(cmp::min(channel_count as usize, 128)); + for _ in 0..channel_count { + let channel: FundedChannel = FundedChannel::read( + reader, + ( + args.entropy_source, + args.signer_provider, + &provided_channel_type_features(&args.config), + ), + )?; + channels.push(channel); + } + + let forward_htlcs_count: u64 = Readable::read(reader)?; + let mut forward_htlcs: HashMap> = + hash_map_with_capacity(cmp::min(forward_htlcs_count as usize, 128)); + for _ in 0..forward_htlcs_count { + let short_channel_id = Readable::read(reader)?; + let pending_forwards_count: u64 = Readable::read(reader)?; + let mut pending_forwards = Vec::with_capacity(cmp::min( + pending_forwards_count as usize, + MAX_ALLOC_SIZE / mem::size_of::(), + )); + for _ in 0..pending_forwards_count { + pending_forwards.push(Readable::read(reader)?); + } + forward_htlcs.insert(short_channel_id, pending_forwards); + } + + let claimable_htlcs_count: u64 = Readable::read(reader)?; + let mut claimable_htlcs = Vec::with_capacity(cmp::min(claimable_htlcs_count as usize, 128)); + for _ in 0..claimable_htlcs_count { + let payment_hash = Readable::read(reader)?; + let previous_hops_len: u64 = 
Readable::read(reader)?; + let mut previous_hops = Vec::with_capacity(cmp::min( + previous_hops_len as usize, + MAX_ALLOC_SIZE / mem::size_of::(), + )); + for _ in 0..previous_hops_len { + previous_hops.push(::read(reader)?); + } + claimable_htlcs.push((payment_hash, previous_hops)); + } + + let peer_count: u64 = Readable::read(reader)?; + let mut peer_init_features = Vec::with_capacity(cmp::min(peer_count as usize, 128)); + for _ in 0..peer_count { + let peer_pubkey: PublicKey = Readable::read(reader)?; + let latest_features = Readable::read(reader)?; + peer_init_features.push((peer_pubkey, latest_features)); + } + + let event_count: u64 = Readable::read(reader)?; + let mut pending_events: VecDeque<(events::Event, Option)> = + VecDeque::with_capacity(cmp::min( + event_count as usize, + MAX_ALLOC_SIZE / mem::size_of::<(events::Event, Option)>(), + )); + for _ in 0..event_count { + match MaybeReadable::read(reader)? { + Some(event) => pending_events.push_back((event, None)), + None => continue, + } + } + + let background_event_count: u64 = Readable::read(reader)?; + for _ in 0..background_event_count { + match ::read(reader)? { + 0 => { + // LDK versions prior to 0.0.116 wrote pending `MonitorUpdateRegeneratedOnStartup`s here, + // however we really don't (and never did) need them - we regenerate all + // on-startup monitor updates. + let _: OutPoint = Readable::read(reader)?; + let _: ChannelMonitorUpdate = Readable::read(reader)?; + }, + _ => return Err(DecodeError::InvalidValue), + } + } + + let _last_node_announcement_serial: u32 = Readable::read(reader)?; // Only used < 0.0.111 + let highest_seen_timestamp: u32 = Readable::read(reader)?; + + // The last version where a pending inbound payment may have been added was 0.0.116. + let pending_inbound_payment_count: u64 = Readable::read(reader)?; + for _ in 0..pending_inbound_payment_count { + let payment_hash: PaymentHash = Readable::read(reader)?; + let logger = WithContext::from(args.logger, None, None, Some(payment_hash)); + let inbound: PendingInboundPayment = Readable::read(reader)?; + log_warn!( + logger, + "Ignoring deprecated pending inbound payment with payment hash {}: {:?}", + payment_hash, + inbound + ); + } + + let pending_outbound_payments_count_compat: u64 = Readable::read(reader)?; + let mut pending_outbound_payments_compat: HashMap = + hash_map_with_capacity(cmp::min( + pending_outbound_payments_count_compat as usize, + MAX_ALLOC_SIZE / 32, + )); + for _ in 0..pending_outbound_payments_count_compat { + let session_priv = Readable::read(reader)?; + let payment = PendingOutboundPayment::Legacy { + session_privs: hash_set_from_iter([session_priv]), + }; + if pending_outbound_payments_compat.insert(PaymentId(session_priv), payment).is_some() { + return Err(DecodeError::InvalidValue); + }; + } + + let mut pending_intercepted_htlcs: Option> = None; + let mut decode_update_add_htlcs: Option>> = None; + let mut pending_outbound_payments_no_retry: Option>> = + None; + let mut pending_outbound_payments = None; + let mut received_network_pubkey: Option = None; + let mut fake_scid_rand_bytes: Option<[u8; 32]> = None; + let mut probing_cookie_secret: Option<[u8; 32]> = None; + let mut claimable_htlc_purposes = None; + let mut claimable_htlc_onion_fields = None; + let mut pending_claiming_payments = Some(new_hash_map()); + let mut monitor_update_blocked_actions_per_peer: Option>)>> = + Some(Vec::new()); + let mut events_override = None; + let mut legacy_in_flight_monitor_updates: Option< + HashMap<(PublicKey, OutPoint), Vec>, + > = 
None; + let mut in_flight_monitor_updates: Option< + HashMap<(PublicKey, ChannelId), Vec>, + > = None; + let mut inbound_payment_id_secret = None; + let mut peer_storage_dir: Option)>> = None; + let mut async_receive_offer_cache: AsyncReceiveOfferCache = AsyncReceiveOfferCache::new(); + read_tlv_fields!(reader, { + (1, pending_outbound_payments_no_retry, option), + (2, pending_intercepted_htlcs, option), + (3, pending_outbound_payments, option), + (4, pending_claiming_payments, option), + (5, received_network_pubkey, option), + (6, monitor_update_blocked_actions_per_peer, option), + (7, fake_scid_rand_bytes, option), + (8, events_override, option), + (9, claimable_htlc_purposes, optional_vec), + (10, legacy_in_flight_monitor_updates, option), + (11, probing_cookie_secret, option), + (13, claimable_htlc_onion_fields, optional_vec), + (14, decode_update_add_htlcs, option), + (15, inbound_payment_id_secret, option), + (17, in_flight_monitor_updates, option), + (19, peer_storage_dir, optional_vec), + (21, async_receive_offer_cache, (default_value, async_receive_offer_cache)), + }); + + // Merge legacy pending_outbound_payments fields into a single HashMap. + // Priority: pending_outbound_payments (TLV 3) > pending_outbound_payments_no_retry (TLV 1) + // > pending_outbound_payments_compat (non-TLV legacy) + let pending_outbound_payments = pending_outbound_payments.unwrap_or_else(|| { + pending_outbound_payments_no_retry + .map(|no_retry| { + no_retry + .into_iter() + .map(|(id, session_privs)| { + (id, PendingOutboundPayment::Legacy { session_privs }) + }) + .collect() + }) + .unwrap_or(pending_outbound_payments_compat) + }); + + // Merge legacy in-flight monitor updates (keyed by OutPoint) into the new format (keyed by + // ChannelId). + if let Some(legacy_in_flight_upds) = legacy_in_flight_monitor_updates { + // We should never serialize an empty map. + if legacy_in_flight_upds.is_empty() { + return Err(DecodeError::InvalidValue); + } + match &in_flight_monitor_updates { + None => { + // Convert legacy format (OutPoint) to new format (ChannelId). + // All channels with legacy in flight monitor updates are v1 channels. + in_flight_monitor_updates = Some( + legacy_in_flight_upds + .into_iter() + .map(|((counterparty_node_id, funding_txo), updates)| { + let channel_id = ChannelId::v1_from_funding_outpoint(funding_txo); + ((counterparty_node_id, channel_id), updates) + }) + .collect(), + ); + }, + Some(upds) if upds.is_empty() => { + // Both TLVs present but new one is empty - invalid. + return Err(DecodeError::InvalidValue); + }, + Some(_) => {}, // New format takes precedence, nothing to do. + } + } + + // Resolve events_override: if present, it replaces pending_events. 
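The outbound-payments merge a few lines up collapses three generations of the encoding into one map. A self-contained sketch of that precedence rule, using simplified stand-in types (`Payment`, `PaymentId`, `SessionPriv`, and `merge_outbound_payments` are illustrative placeholders, not LDK definitions):

use std::collections::{HashMap, HashSet};

type PaymentId = [u8; 32];
type SessionPriv = [u8; 32];

#[allow(dead_code)]
enum Payment {
	// Older encodings only carry the per-path session keys.
	Legacy { session_privs: HashSet<SessionPriv> },
	// The current encoding carries full retry state (elided in this sketch).
	Full,
}

// Newest encoding wins: TLV 3, then TLV 1 (the "no retry" map), then the
// pre-TLV compat entries read from the fixed part of the stream.
fn merge_outbound_payments(
	tlv3: Option<HashMap<PaymentId, Payment>>,
	tlv1_no_retry: Option<HashMap<PaymentId, HashSet<SessionPriv>>>,
	pre_tlv_compat: HashMap<PaymentId, Payment>,
) -> HashMap<PaymentId, Payment> {
	tlv3.unwrap_or_else(|| {
		tlv1_no_retry
			.map(|no_retry| {
				no_retry
					.into_iter()
					.map(|(id, session_privs)| (id, Payment::Legacy { session_privs }))
					.collect()
			})
			.unwrap_or(pre_tlv_compat)
	})
}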
+ let pending_events = events_override.unwrap_or(pending_events); + + Ok(ChannelManagerData { + chain_hash, + best_block_height, + best_block_hash, + channels, + forward_htlcs, + claimable_htlcs, + peer_init_features, + pending_events, + highest_seen_timestamp, + pending_intercepted_htlcs, + pending_outbound_payments, + // unwrap safety: pending_claiming_payments is guaranteed to be `Some` after read_tlv_fields + pending_claiming_payments: pending_claiming_payments.unwrap(), + received_network_pubkey, + // unwrap safety: monitor_update_blocked_actions_per_peer is guaranteed to be `Some` after read_tlv_fields + monitor_update_blocked_actions_per_peer: monitor_update_blocked_actions_per_peer + .unwrap(), + fake_scid_rand_bytes, + claimable_htlc_purposes, + probing_cookie_secret, + claimable_htlc_onion_fields, + decode_update_add_htlcs, + inbound_payment_id_secret, + in_flight_monitor_updates, + peer_storage_dir, + async_receive_offer_cache, + }) + } +} + /// Arguments for the creation of a ChannelManager that are not deserialized. /// /// At a high-level, the process for deserializing a ChannelManager and resuming normal operation @@ -17581,13 +17879,71 @@ where L::Target: Logger, { fn read( - reader: &mut Reader, mut args: ChannelManagerReadArgs<'a, M, T, ES, NS, SP, F, R, MR, L>, + reader: &mut Reader, args: ChannelManagerReadArgs<'a, M, T, ES, NS, SP, F, R, MR, L>, ) -> Result { - let _ver = read_ver_prefix!(reader, SERIALIZATION_VERSION); + // Stage 1: Pure deserialization into DTO + let data: ChannelManagerData = ChannelManagerData::read( + reader, + ChannelManagerDataReadArgs { + entropy_source: &args.entropy_source, + signer_provider: &args.signer_provider, + config: args.config.clone(), + logger: &args.logger, + }, + )?; - let chain_hash: ChainHash = Readable::read(reader)?; - let best_block_height: u32 = Readable::read(reader)?; - let best_block_hash: BlockHash = Readable::read(reader)?; + // Stage 2: Validation and reconstruction + ChannelManager::from_channel_manager_data(data, args) + } +} + +impl< + M: Deref, + T: Deref, + ES: Deref, + NS: Deref, + SP: Deref, + F: Deref, + R: Deref, + MR: Deref, + L: Deref + Clone, + > ChannelManager +where + M::Target: chain::Watch<::EcdsaSigner>, + T::Target: BroadcasterInterface, + ES::Target: EntropySource, + NS::Target: NodeSigner, + SP::Target: SignerProvider, + F::Target: FeeEstimator, + R::Target: Router, + MR::Target: MessageRouter, + L::Target: Logger, +{ + /// Constructs a `ChannelManager` from deserialized data and runtime dependencies. + /// + /// This is the second stage of deserialization, taking the raw [`ChannelManagerData`] and combining it with the + /// provided [`ChannelManagerReadArgs`] to produce a fully functional `ChannelManager`. + /// + /// This method performs validation, reconciliation with [`ChannelMonitor`]s, and reconstruction of internal state. + /// It may close channels if monitors are ahead of the serialized state, and will replay any pending + /// [`ChannelMonitorUpdate`]s. 
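The split the doc comment above describes follows a common two-stage pattern: stage one reads bytes into a plain data holder, stage two validates that holder against runtime state and builds the live object. A rough, generic sketch of the shape of that pattern, with hypothetical types rather than LDK's:

use std::io::{self, Read};

// Stage-1 output: raw fields, no validation, no runtime dependencies.
struct ManagerData {
	best_block_height: u32,
}

// Runtime dependencies that only stage 2 needs.
struct RuntimeDeps {
	current_height: u32,
}

struct Manager {
	best_block_height: u32,
}

impl ManagerData {
	// Stage 1: pure deserialization.
	fn read<R: Read>(reader: &mut R) -> io::Result<Self> {
		let mut buf = [0u8; 4];
		reader.read_exact(&mut buf)?;
		Ok(ManagerData { best_block_height: u32::from_be_bytes(buf) })
	}
}

impl Manager {
	// Stage 2: validation and reconstruction against runtime state.
	fn from_data(data: ManagerData, deps: &RuntimeDeps) -> Result<Self, &'static str> {
		if data.best_block_height > deps.current_height {
			return Err("serialized state is ahead of the chain we were given");
		}
		Ok(Manager { best_block_height: data.best_block_height })
	}
}

fn read_manager<R: Read>(reader: &mut R, deps: &RuntimeDeps) -> Result<Manager, &'static str> {
	let data = ManagerData::read(reader).map_err(|_| "short read")?;
	Manager::from_data(data, deps)
}

The actual second stage follows: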
+ pub(super) fn from_channel_manager_data( + data: ChannelManagerData, + mut args: ChannelManagerReadArgs<'_, M, T, ES, NS, SP, F, R, MR, L>, + ) -> Result<(BlockHash, Self), DecodeError> { + // Extract mutable/complex fields into local variables; simple fields accessed via data.* + let mut forward_htlcs_legacy = data.forward_htlcs; + let mut claimable_htlcs_list = data.claimable_htlcs; + let mut pending_intercepted_htlcs_legacy = + data.pending_intercepted_htlcs.unwrap_or_else(new_hash_map); + let pending_outbound_payments = data.pending_outbound_payments; + let mut fake_scid_rand_bytes = data.fake_scid_rand_bytes; + let mut probing_cookie_secret = data.probing_cookie_secret; + let mut decode_update_add_htlcs_legacy = + data.decode_update_add_htlcs.unwrap_or_else(new_hash_map); + let mut inbound_payment_id_secret = data.inbound_payment_id_secret; + let mut in_flight_monitor_updates = data.in_flight_monitor_updates; + let mut pending_events_read = data.pending_events; let empty_peer_state = || PeerState { channel_by_id: new_hash_map(), @@ -17602,25 +17958,18 @@ where is_connected: false, }; + const MAX_ALLOC_SIZE: usize = 1024 * 64; let mut failed_htlcs = Vec::new(); - let channel_count: u64 = Readable::read(reader)?; - let mut channel_id_set = hash_set_with_capacity(cmp::min(channel_count as usize, 128)); + let channel_count = data.channels.len(); + let mut channel_id_set = hash_set_with_capacity(cmp::min(channel_count, 128)); let mut per_peer_state = hash_map_with_capacity(cmp::min( - channel_count as usize, + channel_count, MAX_ALLOC_SIZE / mem::size_of::<(PublicKey, Mutex>)>(), )); - let mut short_to_chan_info = hash_map_with_capacity(cmp::min(channel_count as usize, 128)); + let mut short_to_chan_info = hash_map_with_capacity(cmp::min(channel_count, 128)); let mut channel_closures = VecDeque::new(); let mut close_background_events = Vec::new(); - for _ in 0..channel_count { - let mut channel: FundedChannel = FundedChannel::read( - reader, - ( - &args.entropy_source, - &args.signer_provider, - &provided_channel_type_features(&args.config), - ), - )?; + for mut channel in data.channels { let logger = WithChannelContext::from(&args.logger, &channel.context, None); let channel_id = channel.context.channel_id(); channel_id_set.insert(channel_id); @@ -17869,168 +18218,15 @@ where } } - const MAX_ALLOC_SIZE: usize = 1024 * 64; - let forward_htlcs_count: u64 = Readable::read(reader)?; - // Marked `_legacy` because in versions > 0.2 we are taking steps to remove the requirement of - // regularly persisting the `ChannelManager` and instead rebuild the set of HTLC forwards from - // `Channel{Monitor}` data. See `reconstruct_manager_from_monitors` usage below. 
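A note on the `MAX_ALLOC_SIZE` capping reintroduced above (and used throughout the stage-1 reader): length prefixes come from untrusted input, so `Vec::with_capacity` and the map constructors are seeded with the smaller of the claimed count and a fixed allocation budget, and the collection simply grows later if the data really is that long. A minimal sketch of the idea with a hypothetical element type:

use std::cmp;
use std::mem;

const MAX_ALLOC_SIZE: usize = 1024 * 64;

struct Element([u8; 32]);

// Pre-allocate at most MAX_ALLOC_SIZE bytes worth of elements up front, no
// matter how large the serialized count claims to be; pushes beyond the
// initial capacity reallocate as usual.
fn read_elements(claimed_count: u64, read_one: impl Fn(u64) -> Element) -> Vec<Element> {
	let cap = cmp::min(claimed_count as usize, MAX_ALLOC_SIZE / mem::size_of::<Element>());
	let mut elements = Vec::with_capacity(cap);
	for i in 0..claimed_count {
		elements.push(read_one(i));
	}
	elements
}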
- let mut forward_htlcs_legacy: HashMap> = - hash_map_with_capacity(cmp::min(forward_htlcs_count as usize, 128)); - for _ in 0..forward_htlcs_count { - let short_channel_id = Readable::read(reader)?; - let pending_forwards_count: u64 = Readable::read(reader)?; - let mut pending_forwards = Vec::with_capacity(cmp::min( - pending_forwards_count as usize, - MAX_ALLOC_SIZE / mem::size_of::(), - )); - for _ in 0..pending_forwards_count { - pending_forwards.push(Readable::read(reader)?); - } - forward_htlcs_legacy.insert(short_channel_id, pending_forwards); - } - - let claimable_htlcs_count: u64 = Readable::read(reader)?; - let mut claimable_htlcs_list = - Vec::with_capacity(cmp::min(claimable_htlcs_count as usize, 128)); - for _ in 0..claimable_htlcs_count { - let payment_hash = Readable::read(reader)?; - let previous_hops_len: u64 = Readable::read(reader)?; - let mut previous_hops = Vec::with_capacity(cmp::min( - previous_hops_len as usize, - MAX_ALLOC_SIZE / mem::size_of::(), - )); - for _ in 0..previous_hops_len { - previous_hops.push(::read(reader)?); - } - claimable_htlcs_list.push((payment_hash, previous_hops)); - } - - let peer_count: u64 = Readable::read(reader)?; - for _ in 0..peer_count { - let peer_pubkey: PublicKey = Readable::read(reader)?; - let latest_features = Readable::read(reader)?; + // Apply peer features from deserialized data + for (peer_pubkey, latest_features) in data.peer_init_features { if let Some(peer_state) = per_peer_state.get_mut(&peer_pubkey) { peer_state.get_mut().unwrap().latest_features = latest_features; } } - let event_count: u64 = Readable::read(reader)?; - let mut pending_events_read: VecDeque<(events::Event, Option)> = - VecDeque::with_capacity(cmp::min( - event_count as usize, - MAX_ALLOC_SIZE / mem::size_of::<(events::Event, Option)>(), - )); - for _ in 0..event_count { - match MaybeReadable::read(reader)? { - Some(event) => pending_events_read.push_back((event, None)), - None => continue, - } - } - - let background_event_count: u64 = Readable::read(reader)?; - for _ in 0..background_event_count { - match ::read(reader)? { - 0 => { - // LDK versions prior to 0.0.116 wrote pending `MonitorUpdateRegeneratedOnStartup`s here, - // however we really don't (and never did) need them - we regenerate all - // on-startup monitor updates. - let _: OutPoint = Readable::read(reader)?; - let _: ChannelMonitorUpdate = Readable::read(reader)?; - }, - _ => return Err(DecodeError::InvalidValue), - } - } - - let _last_node_announcement_serial: u32 = Readable::read(reader)?; // Only used < 0.0.111 - let highest_seen_timestamp: u32 = Readable::read(reader)?; - - // The last version where a pending inbound payment may have been added was 0.0.116. 
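The pending-events list above is read through `MaybeReadable`, whose contract is worth spelling out: a record this version cannot interpret decodes to `None` and is dropped, instead of failing the whole `ChannelManager` read. A simplified stand-in for that behaviour (the trait and types here are illustrative, not LDK's):

trait MaybeReadableLike: Sized {
	fn maybe_read(byte: u8) -> Option<Self>;
}

#[derive(Debug)]
struct SketchEvent(u8);

impl MaybeReadableLike for SketchEvent {
	fn maybe_read(byte: u8) -> Option<Self> {
		// Pretend 0x00 marks an event kind written by a newer version that we
		// are allowed to skip.
		if byte == 0x00 { None } else { Some(SketchEvent(byte)) }
	}
}

fn read_pending_events(raw: &[u8]) -> Vec<SketchEvent> {
	let mut events = Vec::new();
	for &b in raw {
		match SketchEvent::maybe_read(b) {
			Some(event) => events.push(event),
			None => continue,
		}
	}
	events
}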
- let pending_inbound_payment_count: u64 = Readable::read(reader)?; - for _ in 0..pending_inbound_payment_count { - let payment_hash: PaymentHash = Readable::read(reader)?; - let logger = WithContext::from(&args.logger, None, None, Some(payment_hash)); - let inbound: PendingInboundPayment = Readable::read(reader)?; - log_warn!( - logger, - "Ignoring deprecated pending inbound payment with payment hash {}: {:?}", - payment_hash, - inbound - ); - } - - let pending_outbound_payments_count_compat: u64 = Readable::read(reader)?; - let mut pending_outbound_payments_compat: HashMap = - hash_map_with_capacity(cmp::min( - pending_outbound_payments_count_compat as usize, - MAX_ALLOC_SIZE / 32, - )); - for _ in 0..pending_outbound_payments_count_compat { - let session_priv = Readable::read(reader)?; - let payment = PendingOutboundPayment::Legacy { - session_privs: hash_set_from_iter([session_priv]), - }; - if pending_outbound_payments_compat.insert(PaymentId(session_priv), payment).is_some() { - return Err(DecodeError::InvalidValue); - }; - } - - // Marked `_legacy` because in versions > 0.2 we are taking steps to remove the requirement of - // regularly persisting the `ChannelManager` and instead rebuild the set of HTLC forwards from - // `Channel{Monitor}` data. See `reconstruct_manager_from_monitors` below. - let mut pending_intercepted_htlcs_legacy: Option> = - None; - let mut decode_update_add_htlcs_legacy: Option>> = - None; - - // pending_outbound_payments_no_retry is for compatibility with 0.0.101 clients. - let mut pending_outbound_payments_no_retry: Option>> = - None; - let mut pending_outbound_payments = None; - let mut received_network_pubkey: Option = None; - let mut fake_scid_rand_bytes: Option<[u8; 32]> = None; - let mut probing_cookie_secret: Option<[u8; 32]> = None; - let mut claimable_htlc_purposes = None; - let mut claimable_htlc_onion_fields = None; - let mut pending_claiming_payments = Some(new_hash_map()); - let mut monitor_update_blocked_actions_per_peer: Option>)>> = - Some(Vec::new()); - let mut events_override = None; - let mut legacy_in_flight_monitor_updates: Option< - HashMap<(PublicKey, OutPoint), Vec>, - > = None; - // We use this one over the legacy since they represent the same data, just with a different - // key. We still need to read the legacy one as it's an even TLV. 
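The reason the legacy record above cannot simply be dropped is the BOLT TLV rule: an even type number means the writer requires the reader to understand the record, while odd types may be skipped. A small sketch of that rule with a local error type standing in for LDK's `DecodeError`:

#[derive(Debug, PartialEq)]
enum SketchDecodeError {
	UnknownRequiredRecord,
}

fn handle_unknown_tlv(type_num: u64) -> Result<(), SketchDecodeError> {
	if type_num % 2 == 0 {
		// Unknown even TLV: the writer required us to understand it, so the
		// whole decode must fail.
		Err(SketchDecodeError::UnknownRequiredRecord)
	} else {
		// Unknown odd TLV: "it's OK to be odd", skip it.
		Ok(())
	}
}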
- let mut in_flight_monitor_updates: Option< - HashMap<(PublicKey, ChannelId), Vec>, - > = None; - let mut inbound_payment_id_secret = None; - let mut peer_storage_dir: Option)>> = None; - let mut async_receive_offer_cache: AsyncReceiveOfferCache = AsyncReceiveOfferCache::new(); - read_tlv_fields!(reader, { - (1, pending_outbound_payments_no_retry, option), - (2, pending_intercepted_htlcs_legacy, option), - (3, pending_outbound_payments, option), - (4, pending_claiming_payments, option), - (5, received_network_pubkey, option), - (6, monitor_update_blocked_actions_per_peer, option), - (7, fake_scid_rand_bytes, option), - (8, events_override, option), - (9, claimable_htlc_purposes, optional_vec), - (10, legacy_in_flight_monitor_updates, option), - (11, probing_cookie_secret, option), - (13, claimable_htlc_onion_fields, optional_vec), - (14, decode_update_add_htlcs_legacy, option), - (15, inbound_payment_id_secret, option), - (17, in_flight_monitor_updates, option), - (19, peer_storage_dir, optional_vec), - (21, async_receive_offer_cache, (default_value, async_receive_offer_cache)), - }); - let mut decode_update_add_htlcs_legacy = - decode_update_add_htlcs_legacy.unwrap_or_else(|| new_hash_map()); - let mut pending_intercepted_htlcs_legacy = - pending_intercepted_htlcs_legacy.unwrap_or_else(|| new_hash_map()); + // Post-deserialization processing let mut decode_update_add_htlcs = new_hash_map(); - let peer_storage_dir: Vec<(PublicKey, Vec)> = peer_storage_dir.unwrap_or_else(Vec::new); if fake_scid_rand_bytes.is_none() { fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes()); } @@ -18043,50 +18239,17 @@ where inbound_payment_id_secret = Some(args.entropy_source.get_secure_random_bytes()); } - if let Some(events) = events_override { - pending_events_read = events; - } - if !channel_closures.is_empty() { pending_events_read.append(&mut channel_closures); } - if pending_outbound_payments.is_none() && pending_outbound_payments_no_retry.is_none() { - pending_outbound_payments = Some(pending_outbound_payments_compat); - } else if pending_outbound_payments.is_none() { - let mut outbounds = new_hash_map(); - for (id, session_privs) in pending_outbound_payments_no_retry.unwrap().drain() { - outbounds.insert(id, PendingOutboundPayment::Legacy { session_privs }); - } - pending_outbound_payments = Some(outbounds); - } let pending_outbounds = - OutboundPayments::new(pending_outbound_payments.unwrap(), args.logger.clone()); - - for (peer_pubkey, peer_storage) in peer_storage_dir { - if let Some(peer_state) = per_peer_state.get_mut(&peer_pubkey) { - peer_state.get_mut().unwrap().peer_storage = peer_storage; - } - } + OutboundPayments::new(pending_outbound_payments, args.logger.clone()); - // Handle transitioning from the legacy TLV to the new one on upgrades. - if let Some(legacy_in_flight_upds) = legacy_in_flight_monitor_updates { - // We should never serialize an empty map. - if legacy_in_flight_upds.is_empty() { - return Err(DecodeError::InvalidValue); - } - if in_flight_monitor_updates.is_none() { - let in_flight_upds = - in_flight_monitor_updates.get_or_insert_with(|| new_hash_map()); - for ((counterparty_node_id, funding_txo), updates) in legacy_in_flight_upds { - // All channels with legacy in flight monitor updates are v1 channels. - let channel_id = ChannelId::v1_from_funding_outpoint(funding_txo); - in_flight_upds.insert((counterparty_node_id, channel_id), updates); - } - } else { - // We should never serialize an empty map. 
- if in_flight_monitor_updates.as_ref().unwrap().is_empty() { - return Err(DecodeError::InvalidValue); + if let Some(peer_storage_dir) = data.peer_storage_dir { + for (peer_pubkey, peer_storage) in peer_storage_dir { + if let Some(peer_state) = per_peer_state.get_mut(&peer_pubkey) { + peer_state.get_mut().unwrap().peer_storage = peer_storage; } } } @@ -18413,7 +18576,7 @@ where htlc.payment_hash, session_priv_bytes, &path, - best_block_height, + data.best_block_height, ); } } @@ -18715,11 +18878,11 @@ where let expanded_inbound_key = args.node_signer.get_expanded_key(); let mut claimable_payments = hash_map_with_capacity(claimable_htlcs_list.len()); - if let Some(purposes) = claimable_htlc_purposes { + if let Some(purposes) = data.claimable_htlc_purposes { if purposes.len() != claimable_htlcs_list.len() { return Err(DecodeError::InvalidValue); } - if let Some(onion_fields) = claimable_htlc_onion_fields { + if let Some(onion_fields) = data.claimable_htlc_onion_fields { if onion_fields.len() != claimable_htlcs_list.len() { return Err(DecodeError::InvalidValue); } @@ -18817,7 +18980,7 @@ where Ok(key) => key, Err(()) => return Err(DecodeError::InvalidValue), }; - if let Some(network_pubkey) = received_network_pubkey { + if let Some(network_pubkey) = data.received_network_pubkey { if network_pubkey != our_network_pubkey { log_error!(args.logger, "Key that was generated does not match the existing key."); return Err(DecodeError::InvalidValue); @@ -18836,8 +18999,8 @@ where loop { outbound_scid_alias = fake_scid::Namespace::OutboundAlias .get_fake_scid( - best_block_height, - &chain_hash, + data.best_block_height, + &data.chain_hash, fake_scid_rand_bytes.as_ref().unwrap(), &args.entropy_source, ); @@ -18884,7 +19047,7 @@ where let bounded_fee_estimator = LowerBoundedFeeEstimator::new(args.fee_estimator); for (node_id, monitor_update_blocked_actions) in - monitor_update_blocked_actions_per_peer.unwrap() + data.monitor_update_blocked_actions_per_peer { if let Some(peer_state) = per_peer_state.get(&node_id) { for (channel_id, actions) in monitor_update_blocked_actions.iter() { @@ -19013,22 +19176,22 @@ where } } - let best_block = BestBlock::new(best_block_hash, best_block_height); + let best_block = BestBlock::new(data.best_block_hash, data.best_block_height); let flow = OffersMessageFlow::new( - chain_hash, + data.chain_hash, best_block, our_network_pubkey, - highest_seen_timestamp, + data.highest_seen_timestamp, expanded_inbound_key, args.node_signer.get_receive_auth_key(), secp_ctx.clone(), args.message_router, args.logger.clone(), ) - .with_async_payments_offers_cache(async_receive_offer_cache); + .with_async_payments_offers_cache(data.async_receive_offer_cache); let channel_manager = ChannelManager { - chain_hash, + chain_hash: data.chain_hash, fee_estimator: bounded_fee_estimator, chain_monitor: args.chain_monitor, tx_broadcaster: args.tx_broadcaster, @@ -19045,7 +19208,7 @@ where decode_update_add_htlcs: Mutex::new(decode_update_add_htlcs), claimable_payments: Mutex::new(ClaimablePayments { claimable_payments, - pending_claiming_payments: pending_claiming_payments.unwrap(), + pending_claiming_payments: data.pending_claiming_payments, }), outbound_scid_aliases: Mutex::new(outbound_scid_aliases), short_to_chan_info: FairRwLock::new(short_to_chan_info), @@ -19057,7 +19220,7 @@ where our_network_pubkey, secp_ctx, - highest_seen_timestamp: AtomicUsize::new(highest_seen_timestamp as usize), + highest_seen_timestamp: AtomicUsize::new(data.highest_seen_timestamp as usize), per_peer_state: 
FairRwLock::new(per_peer_state),
@@ -19369,7 +19532,7 @@ where
 		//TODO: Broadcast channel update for closed channels, but only after we've made a
 		//connection or two.
-		Ok((best_block_hash.clone(), channel_manager))
+		Ok((data.best_block_hash, channel_manager))
 	}
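One of the checks performed during reconstruction above is that the optional `claimable_htlc_purposes` and `claimable_htlc_onion_fields` vectors, when present, line up one-to-one with the claimable-HTLC list read earlier; a length mismatch is rejected as `DecodeError::InvalidValue`. A compact sketch of that parallel-vector validation over hypothetical simplified types (the unit error stands in for `DecodeError::InvalidValue`):

struct Htlc;
struct Purpose;

// Parallel vectors serialized separately must describe the same payments, so
// their lengths have to match exactly before they are zipped back together.
fn zip_claimables(
	htlcs: Vec<(u64, Vec<Htlc>)>, purposes: Option<Vec<Purpose>>,
) -> Result<Vec<((u64, Vec<Htlc>), Option<Purpose>)>, ()> {
	match purposes {
		Some(purposes) => {
			if purposes.len() != htlcs.len() {
				return Err(());
			}
			Ok(htlcs.into_iter().zip(purposes.into_iter().map(Some)).collect())
		},
		// Very old serializations predate purposes entirely; pair each claim with None.
		None => Ok(htlcs.into_iter().map(|h| (h, None)).collect()),
	}
}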