12 changes: 6 additions & 6 deletions src/bin/electrs.rs
@@ -4,13 +4,8 @@ extern crate log;

extern crate electrs;

use crossbeam_channel::{self as channel};
use error_chain::ChainedError;
use std::{env, process, thread};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use bitcoin::hex::DisplayHex;
use rand::{rng, RngCore};
use crossbeam_channel::{self as channel};
use electrs::{
config::Config,
daemon::Daemon,
@@ -21,6 +16,11 @@ use electrs::{
rest,
signal::Waiter,
};
use error_chain::ChainedError;
use rand::{rng, RngCore};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use std::{env, process, thread};

#[cfg(feature = "otlp-tracing")]
use electrs::otlp_trace;
5 changes: 4 additions & 1 deletion src/bin/popular-scripts.rs
@@ -2,7 +2,10 @@ extern crate electrs;

use bitcoin::hex::DisplayHex;
use electrs::{
config::Config, metrics::Metrics, new_index::{Store, TxHistoryKey}, util::bincode
config::Config,
metrics::Metrics,
new_index::{Store, TxHistoryKey},
util::bincode,
};

fn main() {
65 changes: 36 additions & 29 deletions src/daemon.rs
@@ -523,28 +523,32 @@ impl Daemon {
// buffering the replies into a vector. If any of the requests fail, processing is terminated and an Err is returned.
#[trace]
fn requests(&self, method: &str, params_list: Vec<Value>) -> Result<Vec<Value>> {
self.requests_iter(method, params_list).collect()
self.rpc_threads
.install(|| self.requests_iter(method, params_list).collect())
}

// Send requests in parallel over multiple RPC connections, iterating over the results without buffering them.
// Errors are included in the iterator and do not terminate other pending requests.
//
// IMPORTANT: The returned parallel iterator must be collected inside self.rpc_threads.install()
// to ensure it runs on the daemon's own thread pool, not the global rayon pool. This is necessary
// because the per-thread DAEMON_INSTANCE thread-locals would otherwise be shared across different
// daemon instances in the same process (e.g. during parallel tests).
#[trace]
fn requests_iter<'a>(
&'a self,
method: &'a str,
params_list: Vec<Value>,
) -> impl ParallelIterator<Item = Result<Value>> + IndexedParallelIterator + 'a {
self.rpc_threads.install(move || {
params_list.into_par_iter().map(move |params| {
// Store a local per-thread Daemon, each with its own TCP connection. These will
// get initialized as necessary for the `rpc_threads` pool thread managed by rayon.
thread_local!(static DAEMON_INSTANCE: OnceCell<Daemon> = OnceCell::new());

DAEMON_INSTANCE.with(|daemon| {
daemon
.get_or_init(|| self.retry_reconnect())
.retry_request(&method, &params)
})
params_list.into_par_iter().map(move |params| {
// Store a local per-thread Daemon, each with its own TCP connection. These will
// get initialized as necessary for the `rpc_threads` pool thread managed by rayon.
thread_local!(static DAEMON_INSTANCE: OnceCell<Daemon> = OnceCell::new());

DAEMON_INSTANCE.with(|daemon| {
daemon
.get_or_init(|| self.retry_reconnect())
.retry_request(&method, &params)
})
})
}
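
A minimal standalone sketch of the pattern described in the comment above: collecting the parallel iterator inside a dedicated rayon pool's install() keeps both the work and the per-thread thread_local state on that pool rather than on the global rayon pool. The Conn type and connect() helper here are hypothetical stand-ins for the daemon's per-thread TCP connection; only the install / thread_local! / OnceCell shape mirrors the change.

use std::cell::OnceCell;
use rayon::prelude::*;

// Hypothetical stand-in for a per-thread RPC connection.
struct Conn(usize);
fn connect(id: usize) -> Conn {
    Conn(id)
}

fn main() {
    // A private pool, analogous to `self.rpc_threads`.
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .build()
        .expect("failed to build thread pool");

    let params: Vec<usize> = (0..16).collect();

    // Collecting *inside* install() runs the map closures (and initializes the
    // thread-locals below) on this pool's threads only.
    let results: Vec<usize> = pool.install(|| {
        params
            .into_par_iter()
            .map(|p| {
                // One lazily-initialized connection per pool thread.
                thread_local!(static CONN: OnceCell<Conn> = OnceCell::new());
                CONN.with(|c| c.get_or_init(|| connect(p)).0 + p)
            })
            .collect()
    });

    println!("{} results", results.len());
}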
@@ -647,20 +651,22 @@ impl Daemon {
.map(|txhash| json!([txhash, /*verbose=*/ false]))
.collect();

self.requests_iter("getrawtransaction", params_list)
.zip(txids)
.filter_map(|(res, txid)| match res {
Ok(val) => Some(tx_from_value(val).map(|tx| (**txid, tx))),
// Ignore 'tx not found' errors
Err(Error(ErrorKind::RpcError(code, _, _), _))
if code == RPC_INVALID_ADDRESS_OR_KEY =>
{
None
}
// Terminate iteration if any other errors are encountered
Err(e) => Some(Err(e)),
})
.collect()
self.rpc_threads.install(|| {
self.requests_iter("getrawtransaction", params_list)
.zip(txids)
.filter_map(|(res, txid)| match res {
Ok(val) => Some(tx_from_value(val).map(|tx| (**txid, tx))),
// Ignore 'tx not found' errors
Err(Error(ErrorKind::RpcError(code, _, _), _))
if code == RPC_INVALID_ADDRESS_OR_KEY =>
{
None
}
// Terminate iteration if any other errors are encountered
Err(e) => Some(Err(e)),
})
.collect()
})
}

#[trace]
@@ -773,11 +779,12 @@ impl Daemon {

result.append(&mut headers);

info!("downloaded {}/{} block headers ({:.0}%)",
info!(
"downloaded {}/{} block headers ({:.0}%)",
result.len(),
tip_height,
result.len() as f32 / tip_height as f32 * 100.0);

result.len() as f32 / tip_height as f32 * 100.0
);
}

let mut blockhash = *DEFAULT_BLOCKHASH;
10 changes: 5 additions & 5 deletions src/electrum/server.rs
@@ -15,10 +15,6 @@ use serde_json::{from_str, Value};

use electrs_macros::trace;

#[cfg(not(feature = "liquid"))]
use bitcoin::consensus::encode::serialize_hex;
#[cfg(feature = "liquid")]
use elements::encode::serialize_hex;
use crate::chain::Txid;
use crate::config::{Config, RpcLogging};
use crate::electrum::{get_electrum_height, ProtocolVersion};
@@ -27,6 +23,10 @@ use crate::metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics};
use crate::new_index::{Query, Utxo};
use crate::util::electrum_merkle::{get_header_merkle_proof, get_id_from_pos, get_tx_merkle_proof};
use crate::util::{create_socket, spawn_thread, BlockId, BoolThen, Channel, FullHash, HeaderEntry};
#[cfg(not(feature = "liquid"))]
use bitcoin::consensus::encode::serialize_hex;
#[cfg(feature = "liquid")]
use elements::encode::serialize_hex;

const ELECTRS_VERSION: &str = env!("CARGO_PKG_VERSION");
const PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::new(1, 4);
@@ -799,7 +799,7 @@ impl RPC {
config: Arc<Config>,
query: Arc<Query>,
metrics: &Metrics,
salt_rwlock: Arc<RwLock<String>>
salt_rwlock: Arc<RwLock<String>>,
) -> RPC {
let stats = Arc::new(Stats {
latency: metrics.histogram_vec(
135 changes: 106 additions & 29 deletions src/new_index/db.rs
@@ -97,8 +97,9 @@ impl DB {
db_opts.set_target_file_size_base(1_073_741_824);
db_opts.set_disable_auto_compactions(!config.initial_sync_compaction); // for initial bulk load


let parallelism: i32 = config.db_parallelism.try_into()
let parallelism: i32 = config
.db_parallelism
.try_into()
.expect("db_parallelism value too large for i32");

// Configure parallelism (background jobs and thread pools)
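
For reference, a small hedged sketch of the narrowing done above: rust-rocksdb's options API takes i32, so a configured thread count is converted with try_into and fails loudly if it does not fit. Here db_parallelism is a local stand-in rather than the real Config field, and increase_parallelism / set_max_background_jobs are used only as plausible examples of the parallelism-related setters, not as the exact set used by electrs.

use rocksdb::Options;

fn main() {
    // Stand-in for config.db_parallelism (typically a usize parsed from the CLI).
    let db_parallelism: usize = 8;

    // RocksDB options take i32, so narrow explicitly and fail loudly if the
    // configured value cannot fit.
    let parallelism: i32 = db_parallelism
        .try_into()
        .expect("db_parallelism value too large for i32");

    let mut opts = Options::default();
    // Sizes the background thread pools and flush/compaction jobs.
    opts.increase_parallelism(parallelism);
    opts.set_max_background_jobs(parallelism);
}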
@@ -117,7 +118,7 @@ impl DB {
db_opts.set_block_based_table_factory(&block_opts);

let db = DB {
db: Arc::new(rocksdb::DB::open(&db_opts, path).expect("failed to open RocksDB"))
db: Arc::new(rocksdb::DB::open(&db_opts, path).expect("failed to open RocksDB")),
};
if verify_compat {
db.verify_compatibility(config);
@@ -242,7 +243,8 @@ impl DB {
}

fn verify_compatibility(&self, config: &Config) {
let compatibility_bytes = bincode::serialize_little(&(DB_VERSION, config.light_mode)).unwrap();
let compatibility_bytes =
bincode::serialize_little(&(DB_VERSION, config.light_mode)).unwrap();

match self.get(b"V") {
None => self.put(b"V", &compatibility_bytes),
@@ -266,39 +268,114 @@ impl DB {
};

spawn_thread("db_stats_exporter", move || loop {
update_gauge(&db_metrics.num_immutable_mem_table, "rocksdb.num-immutable-mem-table");
update_gauge(&db_metrics.mem_table_flush_pending, "rocksdb.mem-table-flush-pending");
update_gauge(
&db_metrics.num_immutable_mem_table,
"rocksdb.num-immutable-mem-table",
);
update_gauge(
&db_metrics.mem_table_flush_pending,
"rocksdb.mem-table-flush-pending",
);
update_gauge(&db_metrics.compaction_pending, "rocksdb.compaction-pending");
update_gauge(&db_metrics.background_errors, "rocksdb.background-errors");
update_gauge(&db_metrics.cur_size_active_mem_table, "rocksdb.cur-size-active-mem-table");
update_gauge(&db_metrics.cur_size_all_mem_tables, "rocksdb.cur-size-all-mem-tables");
update_gauge(&db_metrics.size_all_mem_tables, "rocksdb.size-all-mem-tables");
update_gauge(&db_metrics.num_entries_active_mem_table, "rocksdb.num-entries-active-mem-table");
update_gauge(&db_metrics.num_entries_imm_mem_tables, "rocksdb.num-entries-imm-mem-tables");
update_gauge(&db_metrics.num_deletes_active_mem_table, "rocksdb.num-deletes-active-mem-table");
update_gauge(&db_metrics.num_deletes_imm_mem_tables, "rocksdb.num-deletes-imm-mem-tables");
update_gauge(
&db_metrics.cur_size_active_mem_table,
"rocksdb.cur-size-active-mem-table",
);
update_gauge(
&db_metrics.cur_size_all_mem_tables,
"rocksdb.cur-size-all-mem-tables",
);
update_gauge(
&db_metrics.size_all_mem_tables,
"rocksdb.size-all-mem-tables",
);
update_gauge(
&db_metrics.num_entries_active_mem_table,
"rocksdb.num-entries-active-mem-table",
);
update_gauge(
&db_metrics.num_entries_imm_mem_tables,
"rocksdb.num-entries-imm-mem-tables",
);
update_gauge(
&db_metrics.num_deletes_active_mem_table,
"rocksdb.num-deletes-active-mem-table",
);
update_gauge(
&db_metrics.num_deletes_imm_mem_tables,
"rocksdb.num-deletes-imm-mem-tables",
);
update_gauge(&db_metrics.estimate_num_keys, "rocksdb.estimate-num-keys");
update_gauge(&db_metrics.estimate_table_readers_mem, "rocksdb.estimate-table-readers-mem");
update_gauge(&db_metrics.is_file_deletions_enabled, "rocksdb.is-file-deletions-enabled");
update_gauge(
&db_metrics.estimate_table_readers_mem,
"rocksdb.estimate-table-readers-mem",
);
update_gauge(
&db_metrics.is_file_deletions_enabled,
"rocksdb.is-file-deletions-enabled",
);
update_gauge(&db_metrics.num_snapshots, "rocksdb.num-snapshots");
update_gauge(&db_metrics.oldest_snapshot_time, "rocksdb.oldest-snapshot-time");
update_gauge(
&db_metrics.oldest_snapshot_time,
"rocksdb.oldest-snapshot-time",
);
update_gauge(&db_metrics.num_live_versions, "rocksdb.num-live-versions");
update_gauge(&db_metrics.current_super_version_number, "rocksdb.current-super-version-number");
update_gauge(&db_metrics.estimate_live_data_size, "rocksdb.estimate-live-data-size");
update_gauge(&db_metrics.min_log_number_to_keep, "rocksdb.min-log-number-to-keep");
update_gauge(&db_metrics.min_obsolete_sst_number_to_keep, "rocksdb.min-obsolete-sst-number-to-keep");
update_gauge(&db_metrics.total_sst_files_size, "rocksdb.total-sst-files-size");
update_gauge(&db_metrics.live_sst_files_size, "rocksdb.live-sst-files-size");
update_gauge(
&db_metrics.current_super_version_number,
"rocksdb.current-super-version-number",
);
update_gauge(
&db_metrics.estimate_live_data_size,
"rocksdb.estimate-live-data-size",
);
update_gauge(
&db_metrics.min_log_number_to_keep,
"rocksdb.min-log-number-to-keep",
);
update_gauge(
&db_metrics.min_obsolete_sst_number_to_keep,
"rocksdb.min-obsolete-sst-number-to-keep",
);
update_gauge(
&db_metrics.total_sst_files_size,
"rocksdb.total-sst-files-size",
);
update_gauge(
&db_metrics.live_sst_files_size,
"rocksdb.live-sst-files-size",
);
update_gauge(&db_metrics.base_level, "rocksdb.base-level");
update_gauge(&db_metrics.estimate_pending_compaction_bytes, "rocksdb.estimate-pending-compaction-bytes");
update_gauge(&db_metrics.num_running_compactions, "rocksdb.num-running-compactions");
update_gauge(&db_metrics.num_running_flushes, "rocksdb.num-running-flushes");
update_gauge(&db_metrics.actual_delayed_write_rate, "rocksdb.actual-delayed-write-rate");
update_gauge(
&db_metrics.estimate_pending_compaction_bytes,
"rocksdb.estimate-pending-compaction-bytes",
);
update_gauge(
&db_metrics.num_running_compactions,
"rocksdb.num-running-compactions",
);
update_gauge(
&db_metrics.num_running_flushes,
"rocksdb.num-running-flushes",
);
update_gauge(
&db_metrics.actual_delayed_write_rate,
"rocksdb.actual-delayed-write-rate",
);
update_gauge(&db_metrics.is_write_stopped, "rocksdb.is-write-stopped");
update_gauge(&db_metrics.estimate_oldest_key_time, "rocksdb.estimate-oldest-key-time");
update_gauge(&db_metrics.block_cache_capacity, "rocksdb.block-cache-capacity");
update_gauge(
&db_metrics.estimate_oldest_key_time,
"rocksdb.estimate-oldest-key-time",
);
update_gauge(
&db_metrics.block_cache_capacity,
"rocksdb.block-cache-capacity",
);
update_gauge(&db_metrics.block_cache_usage, "rocksdb.block-cache-usage");
update_gauge(&db_metrics.block_cache_pinned_usage, "rocksdb.block-cache-pinned-usage");
update_gauge(
&db_metrics.block_cache_pinned_usage,
"rocksdb.block-cache-pinned-usage",
);
thread::sleep(Duration::from_secs(5));
});
}
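
A self-contained sketch of the exporter loop above, under the assumption that update_gauge wraps RocksDB's property_int_value() and a prometheus gauge. The electrs DBMetrics struct and its spawn_thread helper are not shown in this hunk, so std::thread and two illustrative properties (with unregistered gauges) stand in for them; the "rocksdb.*" property names themselves are real RocksDB properties.

use std::{sync::Arc, thread, time::Duration};

use prometheus::IntGauge;
use rocksdb::{Options, DB};

fn main() {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    let db = Arc::new(DB::open(&opts, "/tmp/example-db").expect("failed to open RocksDB"));

    // One gauge per RocksDB property we want to export (not registered with a
    // registry, to keep the sketch short).
    let estimate_num_keys =
        IntGauge::new("rocksdb_estimate_num_keys", "rocksdb.estimate-num-keys").unwrap();
    let block_cache_usage =
        IntGauge::new("rocksdb_block_cache_usage", "rocksdb.block-cache-usage").unwrap();

    let db_for_stats = Arc::clone(&db);
    thread::spawn(move || loop {
        // Read an integer-valued RocksDB property and push it into a gauge,
        // defaulting to 0 when the property is unavailable.
        let update_gauge = |gauge: &IntGauge, name: &str| {
            let value = db_for_stats
                .property_int_value(name)
                .ok()
                .flatten()
                .unwrap_or(0);
            gauge.set(value as i64);
        };

        update_gauge(&estimate_num_keys, "rocksdb.estimate-num-keys");
        update_gauge(&block_cache_usage, "rocksdb.block-cache-usage");

        thread::sleep(Duration::from_secs(5));
    });

    // Keep the example process alive for one export cycle.
    thread::sleep(Duration::from_secs(6));
}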
11 changes: 7 additions & 4 deletions src/new_index/fetch.rs
@@ -89,7 +89,8 @@ fn bitcoind_fetcher(
let total_blocks_fetched = new_headers.len();
for entries in new_headers.chunks(100) {
if fetcher_count % 50 == 0 && total_blocks_fetched >= 50 {
info!("fetching blocks {}/{} ({:.1}%)",
info!(
"fetching blocks {}/{} ({:.1}%)",
blocks_fetched,
total_blocks_fetched,
blocks_fetched as f32 / total_blocks_fetched as f32 * 100.0
@@ -148,10 +149,11 @@ fn blkfiles_fetcher(
.into_iter()
.filter_map(|(block, size)| {
index += 1;
debug!("fetch block {:}/{:} {:.2}%",
debug!(
"fetch block {:}/{:} {:.2}%",
index,
block_count,
(index/block_count) as f32/100.0
(index / block_count) as f32 / 100.0
);
let blockhash = block.block_hash();
entry_map
@@ -188,7 +190,8 @@ fn blkfiles_reader(blk_files: Vec<PathBuf>, xor_key: Option<[u8; 8]>) -> Fetcher
spawn_thread("blkfiles_reader", move || {
let blk_files_len = blk_files.len();
for (count, path) in blk_files.iter().enumerate() {
info!("block file reading {:}/{:} {:.2}%",
info!(
"block file reading {:}/{:} {:.2}%",
count,
blk_files_len,
count / blk_files_len
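
As a small aside on the progress percentages logged in this file, a standalone illustration (with arbitrary numbers) of how the cast-then-divide expression used in bitcoind_fetcher differs from dividing the integers first:

fn main() {
    let index: u64 = 375;
    let block_count: u64 = 1500;

    // Casting before dividing keeps the fractional part, giving 25.00 here.
    let pct_float = index as f32 / block_count as f32 * 100.0;

    // Dividing the integers first truncates to 0 whenever index < block_count,
    // so the logged figure stays at 0.00 until the very end.
    let pct_truncated = (index / block_count) as f32 / 100.0;

    println!("{:.2}% vs {:.2}%", pct_float, pct_truncated);
}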