diff --git a/Cargo.lock b/Cargo.lock
index 5d3a0e1aa..b6a15be3d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1525,7 +1525,7 @@ dependencies = [
 
 [[package]]
 name = "mutiny-wasm"
-version = "1.15.0"
+version = "1.16.0"
 dependencies = [
  "anyhow",
  "async-trait",
diff --git a/mutiny-core/src/nodemanager.rs b/mutiny-core/src/nodemanager.rs
index ca608dc27..46c414fff 100644
--- a/mutiny-core/src/nodemanager.rs
+++ b/mutiny-core/src/nodemanager.rs
@@ -689,12 +689,23 @@ impl<S: MutinyStorage> NodeManager<S> {
                     }
                 }
 
+                // check keychain size
+                let did_keychain_compact_this_round = match nm.wallet.try_compact_keychain().await {
+                    Ok(did_keychain_compact_this_round) => did_keychain_compact_this_round,
+                    Err(e) => {
+                        log_error!(nm.logger, "Failed to compact keychain: {e}");
+                        false
+                    }
+                };
+
                 // wait for next sync round, checking graceful shutdown check each second.
-                for _ in 0..sync_interval_secs {
-                    if nm.stop.load(Ordering::Relaxed) {
-                        return;
+                if !did_keychain_compact_this_round {
+                    for _ in 0..sync_interval_secs {
+                        if nm.stop.load(Ordering::Relaxed) {
+                            return;
+                        }
+                        sleep(1_000).await;
                     }
-                    sleep(1_000).await;
                 }
             }
         });
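The nodemanager change above gates the per-second wait on whether a compaction ran: when try_compact_keychain reports that it rebuilt the keychain, the sleep loop is skipped and the next sync round starts immediately. A minimal, self-contained sketch of that control flow; do_sync, compact_if_needed, and the sleep stub are illustrative stand-ins, not the real NodeManager API:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Illustrative stand-ins for the real sync, compaction, and sleep calls.
async fn do_sync() {}
async fn compact_if_needed() -> Result<bool, ()> {
    Ok(false)
}
async fn sleep(_ms: u64) {}

// Simplified shape of the background loop after the change above.
async fn sync_loop(stop: Arc<AtomicBool>, sync_interval_secs: u64) {
    loop {
        do_sync().await;

        // true only when a compaction actually ran this round
        let compacted = compact_if_needed().await.unwrap_or(false);

        // after a compaction, skip the wait so the next sync round begins right away
        if !compacted {
            for _ in 0..sync_interval_secs {
                if stop.load(Ordering::Relaxed) {
                    return;
                }
                sleep(1_000).await;
            }
        }
    }
}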
diff --git a/mutiny-core/src/onchain.rs b/mutiny-core/src/onchain.rs
index 9e6fe9aeb..d96396092 100644
--- a/mutiny-core/src/onchain.rs
+++ b/mutiny-core/src/onchain.rs
@@ -13,7 +13,7 @@ use bdk_wallet::bitcoin::FeeRate;
 use bdk_wallet::psbt::PsbtUtils;
 use bdk_wallet::template::DescriptorTemplateOut;
 use bdk_wallet::{
-    CreateParams, KeychainKind, LoadParams, LocalOutput, SignOptions, Update, Wallet,
+    ChangeSet, CreateParams, KeychainKind, LoadParams, LocalOutput, SignOptions, Update, Wallet,
 };
 use bitcoin::bip32::{ChildNumber, DerivationPath, Xpriv};
 use bitcoin::consensus::serialize;
@@ -39,9 +39,15 @@ use crate::utils;
 use crate::utils::{now, sleep};
 use crate::TransactionDetails;
 
+#[cfg(not(target_arch = "wasm32"))]
+use std::time::Instant;
+#[cfg(target_arch = "wasm32")]
+use web_time::Instant;
+
 pub(crate) const FULL_SYNC_STOP_GAP: usize = 150;
 pub(crate) const RESTORE_SYNC_STOP_GAP: usize = 50;
 const PARALLEL_REQUESTS: usize = 10;
+const KEYCHAIN_COMPACTION_SIZE_THRESHOLD_BYTES: usize = 256 * 1024; // 256KB
 
 #[derive(Clone)]
 pub struct OnChainWallet<S: MutinyStorage> {
@@ -53,6 +59,10 @@ pub struct OnChainWallet<S: MutinyStorage> {
     pub(crate) stop: Arc<AtomicBool>,
     logger: Arc<MutinyLogger>,
     ln_event_callback: Option<CommonLnEventCallback>,
+    /// The Bitcoin output descriptors for the wallet's keychains:
+    /// 0: receive_descriptor
+    /// 1: change_descriptor
+    tr_descriptors: (DescriptorTemplateOut, DescriptorTemplateOut),
 }
 
 #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
@@ -112,8 +122,11 @@ impl<S: MutinyStorage> OnChainWallet<S> {
             None | Some(Ok(None)) => {
                 // we don't have a bdk wallet, create one
                 Wallet::create_with_params(
-                    CreateParams::new(receive_descriptor_template, change_descriptor_template)
-                        .network(network),
+                    CreateParams::new(
+                        receive_descriptor_template.clone(),
+                        change_descriptor_template.clone(),
+                    )
+                    .network(network),
                 )?
             }
             Some(Err(bdk_wallet::LoadError::Mismatch(_))) => {
@@ -121,8 +134,11 @@ impl<S: MutinyStorage> OnChainWallet<S> {
                 db.delete(&[KEYCHAIN_STORE_KEY])?;
                 db.write_data(NEED_FULL_SYNC_KEY.to_string(), true, None)?;
                 Wallet::create_with_params(
-                    CreateParams::new(receive_descriptor_template, change_descriptor_template)
-                        .network(network),
+                    CreateParams::new(
+                        receive_descriptor_template.clone(),
+                        change_descriptor_template.clone(),
+                    )
+                    .network(network),
                 )?
             }
             Some(Err(e)) => {
@@ -140,6 +156,7 @@ impl<S: MutinyStorage> OnChainWallet<S> {
             stop,
             logger,
             ln_event_callback,
+            tr_descriptors: (receive_descriptor_template, change_descriptor_template),
         })
     }
 
@@ -253,7 +270,13 @@ impl<S: MutinyStorage> OnChainWallet<S> {
     pub async fn sync(&self) -> Result<(), MutinyError> {
         // if we need a full sync from a restore
         if self.storage.get(NEED_FULL_SYNC_KEY)?.unwrap_or_default() {
+            let start = Instant::now();
             self.full_sync(RESTORE_SYNC_STOP_GAP).await?;
+            log_info!(
+                self.logger,
+                "Full sync took {} seconds",
+                start.elapsed().as_secs()
+            );
             self.storage.delete(&[NEED_FULL_SYNC_KEY])?;
         }
         // get first wallet lock that only needs to read
@@ -835,6 +858,141 @@ impl<S: MutinyStorage> OnChainWallet<S> {
         log_debug!(self.logger, "Fee bump Transaction broadcast! TXID: {txid}");
         Ok(txid)
     }
+
+    pub fn new_wallet(&self) -> Result<Wallet, MutinyError> {
+        let wallet = Wallet::create_with_params(
+            CreateParams::new(self.tr_descriptors.0.clone(), self.tr_descriptors.1.clone())
+                .network(self.network),
+        )?;
+        Ok(wallet)
+    }
+
+    pub async fn try_compact_keychain(&self) -> Result<bool, MutinyError> {
+        let start = Instant::now();
+
+        let changes = self.storage.read_changes()?.unwrap_or_default();
+        let total_size = serde_json::to_vec(&changes).unwrap_or_default().len();
+        if total_size < KEYCHAIN_COMPACTION_SIZE_THRESHOLD_BYTES {
+            log_info!(
+                self.logger,
+                "Keychain size {} bytes is below threshold {} bytes, not compacting",
+                total_size,
+                KEYCHAIN_COMPACTION_SIZE_THRESHOLD_BYTES
+            );
+            return Ok(false);
+        }
+        log_info!(
+            self.logger,
+            "Keychain size threshold exceeded {} bytes, spawning simplified compaction task.",
+            KEYCHAIN_COMPACTION_SIZE_THRESHOLD_BYTES
+        );
+        self.log_keychain_size(&changes, false);
+
+        let mut new_wallet = self.new_wallet()?;
+        let update = full_scan(&new_wallet, RESTORE_SYNC_STOP_GAP, self.blockchain.clone()).await?;
+
+        new_wallet
+            .apply_update_at(update, Some(now().as_secs()))
+            .map_err(|e| {
+                log_error!(self.logger, "Could not apply wallet update: {e}");
+                MutinyError::Other(anyhow!("Could not apply update: {e}"))
+            })?;
+        let mut wallet = self.wallet.try_write()?;
+        let index = self.storage.activity_index();
+        let mut index = index.try_write()?;
+        let new_changeset = new_wallet.take_staged().ok_or(MutinyError::Other(anyhow!(
+            "Failed to take staged changeset from new wallet"
+        )))?;
+        self.log_keychain_size(&new_changeset, true);
+        self.storage.restore_changes(&new_changeset)?;
+        *wallet = new_wallet;
+        drop(wallet); // drop so we can read from wallet
+
+        // update the activity index, just get the list of transactions
+        // and insert them into the index
+        let index_items = self
+            .list_transactions(false)?
+            .into_iter()
+            .map(|t| IndexItem {
+                timestamp: match t.confirmation_time {
+                    ConfirmationTime::Confirmed { time, .. } => Some(time),
+                    ConfirmationTime::Unconfirmed { .. } => None,
+                },
+                key: format!("{ONCHAIN_PREFIX}{}", t.internal_id),
+            })
+            .collect::<Vec<_>>();
+
+        // remove old onchain txs
+        index.retain(|i| !i.key.starts_with(ONCHAIN_PREFIX));
+        index.extend(index_items);
+
+        log_info!(self.logger, "Keychain compaction completed successfully.");
+        log_info!(
+            self.logger,
+            "Keychain compaction took {} seconds",
+            start.elapsed().as_secs()
+        );
+
+        Ok(true)
+    }
+
+    fn log_keychain_size(&self, keychain: &ChangeSet, is_post_compaction: bool) {
+        let total_size = serde_json::to_vec(keychain).unwrap_or_default().len();
+        let local_chain_size = serde_json::to_vec(&keychain.local_chain)
+            .map(|v| v.len())
+            .unwrap_or(0);
+        let tx_graph_size = serde_json::to_vec(&keychain.tx_graph)
+            .map(|v| v.len())
+            .unwrap_or(0);
+        let indexer_size = serde_json::to_vec(&keychain.indexer)
+            .map(|v| v.len())
+            .unwrap_or(0);
+
+        let prefix = if is_post_compaction {
+            "POST-COMPACTION"
+        } else {
+            "PRE-COMPACTION"
+        };
+
+        log_debug!(
+            self.logger,
+            "{} size: {} bytes. Approx component sizes (bytes): LocalChain={}, TxGraph={}, Indexer={}",
+            prefix,
+            total_size,
+            local_chain_size,
+            tx_graph_size,
+            indexer_size
+        );
+    }
+}
+
+async fn full_scan(
+    wallet: &Wallet,
+    gap: usize,
+    blockchain: Arc<AsyncClient>,
+) -> Result<Update, MutinyError> {
+    // get first wallet lock that only needs to read
+    let spks = wallet.all_unbounded_spk_iters();
+
+    let mut request_builder = FullScanRequestBuilder::default();
+    for (kind, pks) in spks.into_iter() {
+        request_builder = request_builder.spks_for_keychain(kind, pks)
+    }
+
+    let FullScanResult {
+        tx_update,
+        last_active_indices,
+        chain_update,
+    } = blockchain
+        .full_scan(request_builder, gap, PARALLEL_REQUESTS)
+        .await?;
+    let update = Update {
+        last_active_indices,
+        tx_update,
+        chain: chain_update,
+    };
+
+    Ok(update)
 }
 
 fn get_tr_descriptors_for_extended_key(
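The compaction trigger in try_compact_keychain is a single size heuristic: the persisted keychain ChangeSet is serialized with serde_json and compared against KEYCHAIN_COMPACTION_SIZE_THRESHOLD_BYTES (256 KiB). A small, self-contained sketch of just that check; the Dummy type and needs_compaction helper below are illustrative stand-ins, not part of the diff:

use serde::Serialize;

// Mirrors KEYCHAIN_COMPACTION_SIZE_THRESHOLD_BYTES from the diff above.
const THRESHOLD_BYTES: usize = 256 * 1024;

/// True when a serialized changeset is large enough to justify rebuilding the wallet.
fn needs_compaction<T: Serialize>(changeset: &T) -> bool {
    // Serialization failure yields an empty Vec and therefore "no compaction",
    // matching the defensive unwrap_or_default() in try_compact_keychain.
    serde_json::to_vec(changeset).unwrap_or_default().len() >= THRESHOLD_BYTES
}

fn main() {
    #[derive(Serialize)]
    struct Dummy {
        txids: Vec<String>,
    }

    let small = Dummy { txids: vec!["aa".repeat(32); 10] };
    let large = Dummy { txids: vec!["aa".repeat(32); 10_000] };
    println!("small compacts: {}", needs_compaction(&small)); // false
    println!("large compacts: {}", needs_compaction(&large)); // true
}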
diff --git a/mutiny-core/src/storage.rs b/mutiny-core/src/storage.rs
index cf80f998f..6b3054f54 100644
--- a/mutiny-core/src/storage.rs
+++ b/mutiny-core/src/storage.rs
@@ -674,6 +674,25 @@ pub trait MutinyStorage: Clone + Sized + Send + Sync + 'static {
         }
     }
 
+    /// Restore changeset to the storage
+    fn restore_changes(&self, changeset: &ChangeSet) -> Result<(), MutinyError> {
+        let current_timestamp_secs = now().as_secs();
+        let backup_key = format!("{}_backup_{}", KEYCHAIN_STORE_KEY, current_timestamp_secs);
+        let _ = match self.get_data::<VersionedValue>(KEYCHAIN_STORE_KEY) {
+            Ok(Some(versioned)) => self.write_data(backup_key, versioned, None),
+            Ok(None) => Ok(()),
+            Err(e) => {
+                log_error!(self.logger(), "Error writing backup: {:?}", e);
+                Err(e)
+            }
+        };
+
+        let version = current_timestamp_secs as u32;
+        let value = serde_json::to_value(changeset)?;
+        let value = VersionedValue { value, version };
+        self.write_data(KEYCHAIN_STORE_KEY.to_string(), value, Some(version))
+    }
+
     /// Spawn background task to run db tasks
     fn spawn(&self, _fut: Fut);
 }
@@ -1209,9 +1228,6 @@ pub(crate) fn list_payment_info(
         .collect())
 }
 
-#[derive(Clone)]
-pub struct OnChainStorage<S: MutinyStorage>(pub(crate) S);
-
 pub(crate) fn get_payment_hash_from_key<'a>(key: &'a str, prefix: &str) -> &'a str {
     key.trim_start_matches(prefix)
         .splitn(2, '_') // To support the old format that had `_{node_id}` at the end
diff --git a/mutiny-wasm/Cargo.toml b/mutiny-wasm/Cargo.toml
index 94513c381..b301aabb9 100644
--- a/mutiny-wasm/Cargo.toml
+++ b/mutiny-wasm/Cargo.toml
@@ -2,7 +2,7 @@ cargo-features = ["per-package-target"]
 
 [package]
 name = "mutiny-wasm"
-version = "1.15.0"
+version = "1.16.0"
 edition = "2021"
 authors = ["utxostack"]
 forced-target = "wasm32-unknown-unknown"