Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 24 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ cli-table = "0.5.0"
bdk_bitcoind_rpc = { version = "0.21.0", features = ["std"], optional = true }
bdk_electrum = { version = "0.23.0", optional = true }
bdk_esplora = { version = "0.22.1", features = ["async-https", "tokio"], optional = true }
bdk_kyoto = { version = "0.13.1", optional = true }
bdk_kyoto = { version = "0.15.1", optional = true }
bdk_redb = { version = "0.1.0", optional = true }
shlex = { version = "1.3.0", optional = true }
tracing = "0.1.41"
Expand Down
4 changes: 0 additions & 4 deletions src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,6 @@ pub struct CompactFilterOpts {
/// Sets the number of parallel node connections.
#[clap(name = "CONNECTIONS", long = "cbf-conn-count", default_value = "2", value_parser = value_parser!(u8).range(1..=15))]
pub conn_count: u8,

/// Optionally skip initial `skip_blocks` blocks.
#[clap(env = "SKIP_BLOCKS", short = 'k', long = "cbf-skip-blocks")]
pub skip_blocks: Option<u32>,
}

/// Wallet subcommands that can be issued without a blockchain backend.
Expand Down
31 changes: 6 additions & 25 deletions src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ use std::sync::Arc;
#[cfg(feature = "electrum")]
use crate::utils::BlockchainClient::Electrum;
#[cfg(feature = "cbf")]
use bdk_kyoto::{Info, LightClient};
use bdk_kyoto::LightClient;
#[cfg(feature = "compiler")]
use bdk_wallet::bitcoin::XOnlyPublicKey;
use bdk_wallet::bitcoin::base64::prelude::*;
Expand Down Expand Up @@ -809,7 +809,6 @@ pub(crate) async fn handle_online_wallet_subcommand(
KyotoClient { client } => {
let LightClient {
requester,
mut log_subscriber,
mut info_subscriber,
mut warning_subscriber,
update_subscriber: _,
Expand All @@ -823,9 +822,9 @@ pub(crate) async fn handle_online_wallet_subcommand(
tokio::task::spawn(async move { node.run().await });
tokio::task::spawn(async move {
select! {
log = log_subscriber.recv() => {
if let Some(log) = log {
tracing::info!("{log}");
info = info_subscriber.recv() => {
if let Some(info) = info {
tracing::info!("{info}");
}
},
warn = warning_subscriber.recv() => {
Expand All @@ -836,29 +835,11 @@ pub(crate) async fn handle_online_wallet_subcommand(
}
});
let txid = tx.compute_txid();
requester
.broadcast_random(tx.clone())
.map_err(|e| Error::Generic(format!("{e}")))?;
tokio::time::timeout(tokio::time::Duration::from_secs(30), async move {
while let Some(info) = info_subscriber.recv().await {
match info {
Info::TxGossiped(wtxid) => {
tracing::info!("Successfully broadcast WTXID: {wtxid}");
break;
}
Info::ConnectionsMet => {
tracing::info!("Rebroadcasting to new connections");
requester.broadcast_random(tx.clone()).unwrap();
}
_ => tracing::info!("{info}"),
}
}
})
.await
.map_err(|_| {
let wtxid = requester.broadcast_random(tx.clone()).await.map_err(|_| {
tracing::warn!("Broadcast was unsuccessful");
Error::Generic("Transaction broadcast timed out after 30 seconds".into())
})?;
tracing::info!("Successfully broadcast WTXID: {wtxid}");
txid
}
};
Expand Down
25 changes: 5 additions & 20 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ use std::path::{Path, PathBuf};
use crate::commands::WalletOpts;
#[cfg(feature = "cbf")]
use bdk_kyoto::{
Info, LightClient, NodeBuilderExt, Receiver,
ScanType::{Recovery, Sync},
UnboundedReceiver, Warning,
builder::NodeBuilder,
BuilderExt, Info, LightClient, Receiver, ScanType::Sync, UnboundedReceiver, Warning,
builder::Builder,
};
use bdk_wallet::bitcoin::{Address, Network, OutPoint, ScriptBuf};

Expand Down Expand Up @@ -195,12 +193,8 @@ pub(crate) fn new_blockchain_client(

#[cfg(feature = "cbf")]
ClientType::Cbf => {
let scan_type = match wallet_opts.compactfilter_opts.skip_blocks {
Some(from_height) => Recovery { from_height },
None => Sync,
};

let builder = NodeBuilder::new(_wallet.network());
let scan_type = Sync;
let builder = Builder::new(_wallet.network());

let client = builder
.required_peers(wallet_opts.compactfilter_opts.conn_count)
Expand Down Expand Up @@ -299,17 +293,11 @@ pub(crate) fn new_wallet(network: Network, wallet_opts: &WalletOpts) -> Result<W

#[cfg(feature = "cbf")]
pub async fn trace_logger(
mut log_subscriber: Receiver<String>,
mut info_subcriber: Receiver<Info>,
mut warning_subscriber: UnboundedReceiver<Warning>,
) {
loop {
tokio::select! {
log = log_subscriber.recv() => {
if let Some(log) = log {
tracing::info!("{log}")
}
}
info = info_subcriber.recv() => {
if let Some(info) = info {
tracing::info!("{info}")
Expand All @@ -329,7 +317,6 @@ pub async fn trace_logger(
pub async fn sync_kyoto_client(wallet: &mut Wallet, client: Box<LightClient>) -> Result<(), Error> {
let LightClient {
requester,
log_subscriber,
info_subscriber,
warning_subscriber,
mut update_subscriber,
Expand All @@ -341,9 +328,7 @@ pub async fn sync_kyoto_client(wallet: &mut Wallet, client: Box<LightClient>) ->
.map_err(|e| Error::Generic(format!("SetGlobalDefault error: {e}")))?;

tokio::task::spawn(async move { node.run().await });
tokio::task::spawn(async move {
trace_logger(log_subscriber, info_subscriber, warning_subscriber).await
});
tokio::task::spawn(async move { trace_logger(info_subscriber, warning_subscriber).await });

if !requester.is_running() {
tracing::error!("Kyoto node is not running");
Expand Down
Loading