Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 15 additions & 4 deletions mutiny-core/src/nodemanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -689,12 +689,23 @@ impl<S: MutinyStorage> NodeManager<S> {
}
}

// check keychain size
let did_keychain_compact_this_round = match nm.wallet.try_compact_keychain().await {
Ok(did_keychain_compact_this_round) => did_keychain_compact_this_round,
Err(e) => {
log_error!(nm.logger, "Failed to compact keychain: {e}");
false
}
};

// wait for next sync round, checking graceful shutdown check each second.
for _ in 0..sync_interval_secs {
if nm.stop.load(Ordering::Relaxed) {
return;
if !did_keychain_compact_this_round {
for _ in 0..sync_interval_secs {
if nm.stop.load(Ordering::Relaxed) {
return;
}
sleep(1_000).await;
}
sleep(1_000).await;
}
}
});
Expand Down
168 changes: 163 additions & 5 deletions mutiny-core/src/onchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use bdk_wallet::bitcoin::FeeRate;
use bdk_wallet::psbt::PsbtUtils;
use bdk_wallet::template::DescriptorTemplateOut;
use bdk_wallet::{
CreateParams, KeychainKind, LoadParams, LocalOutput, SignOptions, Update, Wallet,
ChangeSet, CreateParams, KeychainKind, LoadParams, LocalOutput, SignOptions, Update, Wallet,
};
use bitcoin::bip32::{ChildNumber, DerivationPath, Xpriv};
use bitcoin::consensus::serialize;
Expand All @@ -39,9 +39,15 @@ use crate::utils;
use crate::utils::{now, sleep};
use crate::TransactionDetails;

#[cfg(not(target_arch = "wasm32"))]
use std::time::Instant;
#[cfg(target_arch = "wasm32")]
use web_time::Instant;

pub(crate) const FULL_SYNC_STOP_GAP: usize = 150;
pub(crate) const RESTORE_SYNC_STOP_GAP: usize = 50;
const PARALLEL_REQUESTS: usize = 10;
const KEYCHAIN_COMPACTION_SIZE_THRESHOLD_BYTES: usize = 256 * 1024; // 256KB

#[derive(Clone)]
pub struct OnChainWallet<S: MutinyStorage> {
Expand All @@ -53,6 +59,10 @@ pub struct OnChainWallet<S: MutinyStorage> {
pub(crate) stop: Arc<AtomicBool>,
logger: Arc<MutinyLogger>,
ln_event_callback: Option<CommonLnEventCallback>,
/// The Bitcoin output descriptors for the wallet’s keychains:
/// 0: receive_descriptor
/// 1: change_descriptor
tr_descriptors: (DescriptorTemplateOut, DescriptorTemplateOut),
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -112,17 +122,23 @@ impl<S: MutinyStorage> OnChainWallet<S> {
None | Some(Ok(None)) => {
// we don't have a bdk wallet, create one
Wallet::create_with_params(
CreateParams::new(receive_descriptor_template, change_descriptor_template)
.network(network),
CreateParams::new(
receive_descriptor_template.clone(),
change_descriptor_template.clone(),
)
.network(network),
)?
}
Some(Err(bdk_wallet::LoadError::Mismatch(_))) => {
// failed to read storage, means we have old encoding and need to delete and re-init wallet
db.delete(&[KEYCHAIN_STORE_KEY])?;
db.write_data(NEED_FULL_SYNC_KEY.to_string(), true, None)?;
Wallet::create_with_params(
CreateParams::new(receive_descriptor_template, change_descriptor_template)
.network(network),
CreateParams::new(
receive_descriptor_template.clone(),
change_descriptor_template.clone(),
)
.network(network),
)?
}
Some(Err(e)) => {
Expand All @@ -140,6 +156,7 @@ impl<S: MutinyStorage> OnChainWallet<S> {
stop,
logger,
ln_event_callback,
tr_descriptors: (receive_descriptor_template, change_descriptor_template),
})
}

Expand Down Expand Up @@ -253,7 +270,13 @@ impl<S: MutinyStorage> OnChainWallet<S> {
pub async fn sync(&self) -> Result<(), MutinyError> {
// if we need a full sync from a restore
if self.storage.get(NEED_FULL_SYNC_KEY)?.unwrap_or_default() {
let start = Instant::now();
self.full_sync(RESTORE_SYNC_STOP_GAP).await?;
log_info!(
self.logger,
"Full sync took {} seconds",
start.elapsed().as_secs()
);
self.storage.delete(&[NEED_FULL_SYNC_KEY])?;
}
// get first wallet lock that only needs to read
Expand Down Expand Up @@ -835,6 +858,141 @@ impl<S: MutinyStorage> OnChainWallet<S> {
log_debug!(self.logger, "Fee bump Transaction broadcast! TXID: {txid}");
Ok(txid)
}

/// Build a brand-new, empty BDK [`Wallet`] from the taproot descriptors this
/// wallet was originally constructed with (receive + change), on the same
/// network. Used to re-derive a clean wallet during keychain compaction.
pub fn new_wallet(&self) -> Result<Wallet, MutinyError> {
    let (receive_descriptor, change_descriptor) =
        (self.tr_descriptors.0.clone(), self.tr_descriptors.1.clone());
    let params = CreateParams::new(receive_descriptor, change_descriptor).network(self.network);
    Ok(Wallet::create_with_params(params)?)
}

/// Compact the persisted BDK keychain changeset when it has grown too large.
///
/// If the serialized changeset is below
/// [`KEYCHAIN_COMPACTION_SIZE_THRESHOLD_BYTES`] this is a no-op and returns
/// `Ok(false)`. Otherwise a fresh wallet is rebuilt from a full chain scan,
/// its minimal staged changeset is persisted (with a backup of the old one,
/// see `restore_changes`), the in-memory wallet is swapped, and the on-chain
/// activity index is rebuilt. Returns `Ok(true)` on a completed compaction.
pub async fn try_compact_keychain(&self) -> Result<bool, MutinyError> {
    let start = Instant::now();

    // Measure the currently persisted changeset to decide whether to compact.
    let changes = self.storage.read_changes()?.unwrap_or_default();
    let total_size = serde_json::to_vec(&changes).unwrap_or_default().len();
    if total_size < KEYCHAIN_COMPACTION_SIZE_THRESHOLD_BYTES {
        log_info!(
            self.logger,
            "Keychain size {} bytes is below threshold {} bytes, not compacting",
            total_size,
            KEYCHAIN_COMPACTION_SIZE_THRESHOLD_BYTES
        );
        return Ok(false);
    }
    // NOTE: previous message claimed a task was spawned; compaction runs
    // inline on this future, so say so.
    log_info!(
        self.logger,
        "Keychain size threshold exceeded {} bytes, compacting keychain.",
        KEYCHAIN_COMPACTION_SIZE_THRESHOLD_BYTES
    );
    self.log_keychain_size(&changes, false);

    // Rebuild wallet state from scratch: a fresh wallet populated by a full
    // scan stages only live data, dropping accumulated historical churn.
    let mut new_wallet = self.new_wallet()?;
    let update = full_scan(&new_wallet, RESTORE_SYNC_STOP_GAP, self.blockchain.clone()).await?;

    new_wallet
        .apply_update_at(update, Some(now().as_secs()))
        .map_err(|e| {
            log_error!(self.logger, "Could not apply wallet update: {e}");
            MutinyError::Other(anyhow!("Could not apply update: {e}"))
        })?;
    // NOTE(review): wallet and activity-index write locks are both taken
    // before persisting — presumably so readers never see the new storage
    // state paired with the old in-memory wallet; confirm lock ordering
    // matches other writers.
    let mut wallet = self.wallet.try_write()?;
    let index = self.storage.activity_index();
    let mut index = index.try_write()?;
    let new_changeset = new_wallet.take_staged().ok_or(MutinyError::Other(anyhow!(
        "Failed to take staged changeset from new wallet"
    )))?;
    self.log_keychain_size(&new_changeset, true);
    // Persist first: if this fails we bail out before touching the
    // in-memory wallet, leaving the old state intact.
    self.storage.restore_changes(&new_changeset)?;
    *wallet = new_wallet;
    drop(wallet); // drop so we can read from wallet

    // update the activity index, just get the list of transactions
    // and insert them into the index
    let index_items = self
        .list_transactions(false)?
        .into_iter()
        .map(|t| IndexItem {
            timestamp: match t.confirmation_time {
                ConfirmationTime::Confirmed { time, .. } => Some(time),
                ConfirmationTime::Unconfirmed { .. } => None,
            },
            key: format!("{ONCHAIN_PREFIX}{}", t.internal_id),
        })
        .collect::<Vec<_>>();

    // remove old-onchain txs
    index.retain(|i| !i.key.starts_with(ONCHAIN_PREFIX));
    index.extend(index_items);

    log_info!(self.logger, "Keychain compaction completed successfully.");
    log_info!(
        self.logger,
        "Keychain compaction took {} seconds",
        start.elapsed().as_secs()
    );

    Ok(true)
}

/// Log the serialized size of a keychain [`ChangeSet`] and an approximate
/// per-component breakdown (local chain, tx graph, indexer), tagged as
/// pre- or post-compaction.
///
/// Serialization failures count as 0 bytes so logging can never abort the
/// caller. (Previously the total used `unwrap_or_default().len()` while the
/// components used `.map(|v| v.len()).unwrap_or(0)`; now all four measure
/// the same way.)
fn log_keychain_size(&self, keychain: &ChangeSet, is_post_compaction: bool) {
    let total_size = serde_json::to_vec(keychain).map(|v| v.len()).unwrap_or(0);
    let local_chain_size = serde_json::to_vec(&keychain.local_chain)
        .map(|v| v.len())
        .unwrap_or(0);
    let tx_graph_size = serde_json::to_vec(&keychain.tx_graph)
        .map(|v| v.len())
        .unwrap_or(0);
    let indexer_size = serde_json::to_vec(&keychain.indexer)
        .map(|v| v.len())
        .unwrap_or(0);

    let prefix = if is_post_compaction {
        "POST-COMPACTION"
    } else {
        "PRE-COMPACTION"
    };

    log_debug!(
        self.logger,
        "{} size: {} bytes. Approx component sizes (bytes): LocalChain={}, TxGraph={}, Indexer={}",
        prefix,
        total_size,
        local_chain_size,
        tx_graph_size,
        indexer_size
    );
}
}

/// Run a full script-pubkey scan of every keychain in `wallet` against the
/// Esplora client, up to the given stop `gap`, and package the response as a
/// BDK [`Update`] ready to be applied to the wallet.
async fn full_scan(
    wallet: &Wallet,
    gap: usize,
    blockchain: Arc<AsyncClient>,
) -> Result<Update, MutinyError> {
    // Register the unbounded SPK iterator for each keychain on one request.
    let mut builder = FullScanRequestBuilder::default();
    for (keychain, spk_iter) in wallet.all_unbounded_spk_iters() {
        builder = builder.spks_for_keychain(keychain, spk_iter);
    }

    let scanned = blockchain
        .full_scan(builder, gap, PARALLEL_REQUESTS)
        .await?;

    Ok(Update {
        last_active_indices: scanned.last_active_indices,
        tx_update: scanned.tx_update,
        chain: scanned.chain_update,
    })
}

fn get_tr_descriptors_for_extended_key(
Expand Down
22 changes: 19 additions & 3 deletions mutiny-core/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,25 @@ pub trait MutinyStorage: Clone + Sized + Send + Sync + 'static {
}
}

/// Restore changeset to the storage
fn restore_changes(&self, changeset: &ChangeSet) -> Result<(), MutinyError> {
let current_timestamp_secs = now().as_secs();
let backup_key = format!("{}_backup_{}", KEYCHAIN_STORE_KEY, current_timestamp_secs);
let _ = match self.get_data::<VersionedValue>(KEYCHAIN_STORE_KEY) {
Ok(Some(versioned)) => self.write_data(backup_key, versioned, None),
Ok(None) => Ok(()),
Err(e) => {
log_error!(self.logger(), "Error writing backup: {:?}", e);
Err(e)
}
};

let version = current_timestamp_secs as u32;
let value = serde_json::to_value(changeset)?;
let value = VersionedValue { value, version };
self.write_data(KEYCHAIN_STORE_KEY.to_string(), value, Some(version))
}

/// Spawn background task to run db tasks
fn spawn<Fut: Task>(&self, _fut: Fut);
}
Expand Down Expand Up @@ -1209,9 +1228,6 @@ pub(crate) fn list_payment_info<S: MutinyStorage>(
.collect())
}

#[derive(Clone)]
pub struct OnChainStorage<S: MutinyStorage>(pub(crate) S);

pub(crate) fn get_payment_hash_from_key<'a>(key: &'a str, prefix: &str) -> &'a str {
key.trim_start_matches(prefix)
.splitn(2, '_') // To support the old format that had `_{node_id}` at the end
Expand Down
2 changes: 1 addition & 1 deletion mutiny-wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ cargo-features = ["per-package-target"]

[package]
name = "mutiny-wasm"
version = "1.15.0"
version = "1.16.0"
edition = "2021"
authors = ["utxostack"]
forced-target = "wasm32-unknown-unknown"
Expand Down
Loading