diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs
index 134c17ab8..59f957ae5 100644
--- a/src/bin/electrs.rs
+++ b/src/bin/electrs.rs
@@ -68,7 +68,7 @@ fn run_server(config: Arc<Config>, salt_rwlock: Arc<RwLock<String>>) -> Result<(
         signal.clone(),
         &metrics,
     )?);
-    let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config));
+    let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config, &metrics));
     let mut indexer = Indexer::open(
         Arc::clone(&store),
         fetch_from(&config, &store),
diff --git a/src/bin/popular-scripts.rs b/src/bin/popular-scripts.rs
index ef550fce5..a7b245817 100644
--- a/src/bin/popular-scripts.rs
+++ b/src/bin/popular-scripts.rs
@@ -2,14 +2,16 @@
 extern crate electrs;
 
 use bitcoin::hex::DisplayHex;
 use electrs::{
     config::Config,
+    metrics::Metrics,
     new_index::{Store, TxHistoryKey},
     util::bincode,
 };
 
 fn main() {
     let config = Config::from_args();
-    let store = Store::open(&config.db_path.join("newindex"), &config);
+    let metrics = Metrics::new(config.monitoring_addr);
+    let store = Store::open(&config.db_path.join("newindex"), &config, &metrics);
 
     let mut iter = store.history_db().raw_iterator();
     iter.seek(b"H");
diff --git a/src/bin/tx-fingerprint-stats.rs b/src/bin/tx-fingerprint-stats.rs
index 9073faec8..83b3f213a 100644
--- a/src/bin/tx-fingerprint-stats.rs
+++ b/src/bin/tx-fingerprint-stats.rs
@@ -23,7 +23,7 @@ fn main() {
     let signal = Waiter::start(crossbeam_channel::never());
     let config = Config::from_args();
 
-    let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config));
+    let metrics = Metrics::new(config.monitoring_addr);
+    let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config, &metrics));
 
-    let metrics = Metrics::new(config.monitoring_addr);
     metrics.start();
diff --git a/src/config.rs b/src/config.rs
index a0abdda08..d23128e91 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -49,6 +49,24 @@ pub struct Config {
     /// however, this requires much more disk space.
     pub initial_sync_compaction: bool,
 
+    /// RocksDB block cache size in MB (per database).
+    /// Caches decompressed blocks in memory to avoid repeated, CPU-intensive decompression.
+    /// Total memory usage = cache size * 3 databases (txstore, history, cache).
+    /// Higher values reduce CPU load from cache misses at the cost of more RAM;
+    /// 1024 MB per database is a reasonable starting point for production.
+    pub db_block_cache_mb: usize,
+
+    /// RocksDB parallelism level (background compaction and flush threads).
+    /// This configures max_background_jobs and the thread pools automatically;
+    /// set it to the number of CPU cores for optimal performance.
+    pub db_parallelism: usize,
+
+    /// RocksDB write buffer size in MB (per database).
+    /// Each database uses this much RAM for in-memory writes before flushing to disk.
+    /// Total RAM usage = write buffer size * max_write_buffer_number * 3 databases.
+    /// Larger buffers mean fewer flushes (less CPU) but more RAM usage.
+    pub db_write_buffer_size_mb: usize,
+
     #[cfg(feature = "liquid")]
     pub parent_network: BNetwork,
     #[cfg(feature = "liquid")]
@@ -216,6 +234,23 @@ impl Config {
                 .long("initial-sync-compaction")
                 .help("Perform compaction during initial sync (slower but less disk space required)")
         ).arg(
+            Arg::with_name("db_block_cache_mb")
+                .long("db-block-cache-mb")
+                .help("RocksDB block cache size in MB per database")
+                .takes_value(true)
+                .default_value("8")
+        ).arg(
+            Arg::with_name("db_parallelism")
+                .long("db-parallelism")
+                .help("RocksDB parallelism level. Set to number of CPU cores for optimal performance")
+                .takes_value(true)
+                .default_value("2")
+        ).arg(
+            Arg::with_name("db_write_buffer_size_mb")
+                .long("db-write-buffer-size-mb")
+                .help("RocksDB write buffer size in MB per database. RAM usage = size * max write buffers (2) * 3 databases")
+                .takes_value(true)
+                .default_value("256")
+        ).arg(
             Arg::with_name("zmq_addr")
                 .long("zmq-addr")
                 .help("Optional zmq socket address of the bitcoind daemon")
@@ -452,6 +487,9 @@ impl Config {
             cors: m.value_of("cors").map(|s| s.to_string()),
             precache_scripts: m.value_of("precache_scripts").map(|s| s.to_string()),
             initial_sync_compaction: m.is_present("initial_sync_compaction"),
+            db_block_cache_mb: value_t_or_exit!(m, "db_block_cache_mb", usize),
+            db_parallelism: value_t_or_exit!(m, "db_parallelism", usize),
+            db_write_buffer_size_mb: value_t_or_exit!(m, "db_write_buffer_size_mb", usize),
             zmq_addr,
 
             #[cfg(feature = "liquid")]
diff --git a/src/new_index/db.rs b/src/new_index/db.rs
index 20db12f4c..e889aad63 100644
--- a/src/new_index/db.rs
+++ b/src/new_index/db.rs
@@ -1,9 +1,15 @@
+use prometheus::GaugeVec;
 use rocksdb;
 
+use std::convert::TryInto;
 use std::path::Path;
+use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
 
 use crate::config::Config;
-use crate::util::{bincode, Bytes};
+use crate::new_index::db_metrics::RocksDbMetrics;
+use crate::util::{bincode, spawn_thread, Bytes};
 
 static DB_VERSION: u32 = 1;
@@ -71,7 +77,7 @@ impl<'a> Iterator for ReverseScanIterator<'a> {
 
 #[derive(Debug)]
 pub struct DB {
-    db: rocksdb::DB,
+    db: Arc<rocksdb::DB>,
 }
 
 #[derive(Copy, Clone, Debug)]
@@ -89,18 +95,29 @@ impl DB {
         db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
         db_opts.set_compression_type(rocksdb::DBCompressionType::Snappy);
         db_opts.set_target_file_size_base(1_073_741_824);
-        db_opts.set_write_buffer_size(256 << 20);
         db_opts.set_disable_auto_compactions(!config.initial_sync_compaction); // for initial bulk load
+
+        let parallelism: i32 = config.db_parallelism.try_into()
            .expect("db_parallelism value too large for i32");
+
+        // Configure parallelism (background jobs and thread pools)
+        db_opts.increase_parallelism(parallelism);
+
+        // Configure the write buffer size (not covered by increase_parallelism)
+        db_opts.set_write_buffer_size(config.db_write_buffer_size_mb * 1024 * 1024);
+
         // db_opts.set_advise_random_on_open(???);
         db_opts.set_compaction_readahead_size(1 << 20);
-        db_opts.increase_parallelism(2);
 
-        // let mut block_opts = rocksdb::BlockBasedOptions::default();
-        // block_opts.set_block_size(???);
+        // Configure the block cache
+        let mut block_opts = rocksdb::BlockBasedOptions::default();
+        let cache_size_bytes = config.db_block_cache_mb * 1024 * 1024;
+        block_opts.set_block_cache(&rocksdb::Cache::new_lru_cache(cache_size_bytes));
+        db_opts.set_block_based_table_factory(&block_opts);
 
         let db = DB {
-            db: rocksdb::DB::open(&db_opts, path).expect("failed to open RocksDB"),
+            db: Arc::new(rocksdb::DB::open(&db_opts, path).expect("failed to open RocksDB")),
         };
         db.verify_compatibility(config);
         db
@@ -220,4 +237,54 @@ impl DB {
             Some(_) => (),
         }
     }
+
+    pub fn start_stats_exporter(&self, db_metrics: Arc<RocksDbMetrics>, db_name: &str) {
+        let db_arc = Arc::clone(&self.db);
+        let label = db_name.to_string();
+
+        let update_gauge = move |gauge: &GaugeVec, property: &str| {
+            if let Ok(Some(value)) = db_arc.property_value(property) {
+                if let Ok(v) = value.parse::<f64>() {
+                    gauge.with_label_values(&[&label]).set(v);
+                }
+            }
+        };
+
+        spawn_thread("db_stats_exporter", move || loop {
+            update_gauge(&db_metrics.num_immutable_mem_table, "rocksdb.num-immutable-mem-table");
+            update_gauge(&db_metrics.mem_table_flush_pending, "rocksdb.mem-table-flush-pending");
+            update_gauge(&db_metrics.compaction_pending, "rocksdb.compaction-pending");
+            update_gauge(&db_metrics.background_errors, "rocksdb.background-errors");
+            update_gauge(&db_metrics.cur_size_active_mem_table, "rocksdb.cur-size-active-mem-table");
+            update_gauge(&db_metrics.cur_size_all_mem_tables, "rocksdb.cur-size-all-mem-tables");
+            update_gauge(&db_metrics.size_all_mem_tables, "rocksdb.size-all-mem-tables");
+            update_gauge(&db_metrics.num_entries_active_mem_table, "rocksdb.num-entries-active-mem-table");
+            update_gauge(&db_metrics.num_entries_imm_mem_tables, "rocksdb.num-entries-imm-mem-tables");
+            update_gauge(&db_metrics.num_deletes_active_mem_table, "rocksdb.num-deletes-active-mem-table");
+            update_gauge(&db_metrics.num_deletes_imm_mem_tables, "rocksdb.num-deletes-imm-mem-tables");
+            update_gauge(&db_metrics.estimate_num_keys, "rocksdb.estimate-num-keys");
+            update_gauge(&db_metrics.estimate_table_readers_mem, "rocksdb.estimate-table-readers-mem");
+            update_gauge(&db_metrics.is_file_deletions_enabled, "rocksdb.is-file-deletions-enabled");
+            update_gauge(&db_metrics.num_snapshots, "rocksdb.num-snapshots");
+            update_gauge(&db_metrics.oldest_snapshot_time, "rocksdb.oldest-snapshot-time");
+            update_gauge(&db_metrics.num_live_versions, "rocksdb.num-live-versions");
+            update_gauge(&db_metrics.current_super_version_number, "rocksdb.current-super-version-number");
+            update_gauge(&db_metrics.estimate_live_data_size, "rocksdb.estimate-live-data-size");
+            update_gauge(&db_metrics.min_log_number_to_keep, "rocksdb.min-log-number-to-keep");
+            update_gauge(&db_metrics.min_obsolete_sst_number_to_keep, "rocksdb.min-obsolete-sst-number-to-keep");
+            update_gauge(&db_metrics.total_sst_files_size, "rocksdb.total-sst-files-size");
+            update_gauge(&db_metrics.live_sst_files_size, "rocksdb.live-sst-files-size");
+            update_gauge(&db_metrics.base_level, "rocksdb.base-level");
+            update_gauge(&db_metrics.estimate_pending_compaction_bytes, "rocksdb.estimate-pending-compaction-bytes");
+            update_gauge(&db_metrics.num_running_compactions, "rocksdb.num-running-compactions");
+            update_gauge(&db_metrics.num_running_flushes, "rocksdb.num-running-flushes");
+            update_gauge(&db_metrics.actual_delayed_write_rate, "rocksdb.actual-delayed-write-rate");
+            update_gauge(&db_metrics.is_write_stopped, "rocksdb.is-write-stopped");
+            update_gauge(&db_metrics.estimate_oldest_key_time, "rocksdb.estimate-oldest-key-time");
+            update_gauge(&db_metrics.block_cache_capacity, "rocksdb.block-cache-capacity");
+            update_gauge(&db_metrics.block_cache_usage, "rocksdb.block-cache-usage");
+            update_gauge(&db_metrics.block_cache_pinned_usage, "rocksdb.block-cache-pinned-usage");
+            thread::sleep(Duration::from_secs(5));
+        });
+    }
 }
diff --git a/src/new_index/db_metrics.rs b/src/new_index/db_metrics.rs
new file mode 100644
index 000000000..e8df0db43
--- /dev/null
+++ b/src/new_index/db_metrics.rs
@@ -0,0 +1,233 @@
+use crate::metrics::{GaugeVec, MetricOpts, Metrics};
+
+#[derive(Debug)]
+pub struct RocksDbMetrics {
+    // Memory table metrics
+    pub num_immutable_mem_table: GaugeVec,
+    pub mem_table_flush_pending: GaugeVec,
+    pub cur_size_active_mem_table: GaugeVec,
+    pub cur_size_all_mem_tables: GaugeVec,
+    pub size_all_mem_tables: GaugeVec,
+    pub num_entries_active_mem_table: GaugeVec,
+    pub num_entries_imm_mem_tables: GaugeVec,
+    pub num_deletes_active_mem_table: GaugeVec,
+    pub num_deletes_imm_mem_tables: GaugeVec,
+
+    // Compaction metrics
+    pub compaction_pending: GaugeVec,
+    pub estimate_pending_compaction_bytes: GaugeVec,
+    pub num_running_compactions: GaugeVec,
+    pub num_running_flushes: GaugeVec,
+
+    // Error metrics
+    pub background_errors: GaugeVec,
+
+    // Key and data size estimates
+    pub estimate_num_keys: GaugeVec,
+    pub estimate_live_data_size: GaugeVec,
+    pub estimate_oldest_key_time: GaugeVec,
+
+    // Table reader memory
+    pub estimate_table_readers_mem: GaugeVec,
+
+    // File and SST metrics
+    pub is_file_deletions_enabled: GaugeVec,
+    pub total_sst_files_size: GaugeVec,
+    pub live_sst_files_size: GaugeVec,
+    pub min_obsolete_sst_number_to_keep: GaugeVec,
+
+    // Snapshot metrics
+    pub num_snapshots: GaugeVec,
+    pub oldest_snapshot_time: GaugeVec,
+
+    // Version metrics
+    pub num_live_versions: GaugeVec,
+    pub current_super_version_number: GaugeVec,
+
+    // Log metrics
+    pub min_log_number_to_keep: GaugeVec,
+
+    // Level metrics
+    pub base_level: GaugeVec,
+
+    // Write metrics
+    pub actual_delayed_write_rate: GaugeVec,
+    pub is_write_stopped: GaugeVec,
+
+    // Block cache metrics
+    pub block_cache_capacity: GaugeVec,
+    pub block_cache_usage: GaugeVec,
+    pub block_cache_pinned_usage: GaugeVec,
+}
+
+impl RocksDbMetrics {
+    pub fn new(metrics: &Metrics) -> Self {
+        let labels = &["db"];
+
+        Self {
+            // Memory table metrics
+            num_immutable_mem_table: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_num_immutable_mem_table",
+                "Number of immutable memtables that have not yet been flushed."
+            ), labels),
+            mem_table_flush_pending: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_mem_table_flush_pending",
+                "1 if a memtable flush is pending and 0 otherwise."
+            ), labels),
+            cur_size_active_mem_table: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_cur_size_active_mem_table_bytes",
+                "Approximate size of the active memtable in bytes."
+            ), labels),
+            cur_size_all_mem_tables: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_cur_size_all_mem_tables_bytes",
+                "Approximate size of active and unflushed immutable memtables in bytes."
+            ), labels),
+            size_all_mem_tables: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_size_all_mem_tables_bytes",
+                "Approximate size of active, unflushed immutable, and pinned immutable memtables in bytes."
+            ), labels),
+            num_entries_active_mem_table: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_num_entries_active_mem_table",
+                "Total number of entries in the active memtable."
+            ), labels),
+            num_entries_imm_mem_tables: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_num_entries_imm_mem_tables",
+                "Total number of entries in the unflushed immutable memtables."
+            ), labels),
+            num_deletes_active_mem_table: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_num_deletes_active_mem_table",
+                "Total number of delete entries in the active memtable."
+            ), labels),
+            num_deletes_imm_mem_tables: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_num_deletes_imm_mem_tables",
+                "Total number of delete entries in the unflushed immutable memtables."
+            ), labels),
+
+            // Compaction metrics
+            compaction_pending: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_compaction_pending",
+                "1 if at least one compaction is pending; otherwise, 0."
+            ), labels),
+
+            estimate_pending_compaction_bytes: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_estimate_pending_compaction_bytes",
+                "Estimated total number of bytes compaction needs to rewrite."
+            ), labels),
+
+            num_running_compactions: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_num_running_compactions",
+                "Number of currently running compactions."
+            ), labels),
+
+            num_running_flushes: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_num_running_flushes",
+                "Number of currently running flushes."
+            ), labels),
+
+            // Error metrics
+            background_errors: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_background_errors_total",
+                "Accumulated number of background errors."
+            ), labels),
+
+            // Key and data size estimates
+            estimate_num_keys: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_estimate_num_keys",
+                "Estimated number of total keys in the active and unflushed immutable memtables and storage."
+            ), labels),
+
+            estimate_live_data_size: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_estimate_live_data_size_bytes",
+                "Estimated live data size in bytes."
+            ), labels),
+
+            estimate_oldest_key_time: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_estimate_oldest_key_time_seconds",
+                "Estimated timestamp of the oldest key."
+            ), labels),
+
+            // Table reader memory
+            estimate_table_readers_mem: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_estimate_table_readers_mem_bytes",
+                "Estimated memory used for reading SST tables, excluding memory used in block cache."
+            ), labels),
+
+            // File and SST metrics
+            is_file_deletions_enabled: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_is_file_deletions_enabled",
+                "0 if deletion of obsolete files is enabled; otherwise, non-zero."
+            ), labels),
+
+            total_sst_files_size: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_total_sst_files_size_bytes",
+                "Total size of all SST files in bytes."
+            ), labels),
+
+            live_sst_files_size: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_live_sst_files_size_bytes",
+                "Total size in bytes of all SST files belonging to any of the CF's versions."
+            ), labels),
+
+            min_obsolete_sst_number_to_keep: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_min_obsolete_sst_number_to_keep",
+                "Minimum file number for an obsolete SST to be kept, or the maximum uint64_t value if obsolete files can be deleted."
+            ), labels),
+
+            // Snapshot metrics
+            num_snapshots: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_num_snapshots",
+                "Number of unreleased snapshots of the database."
+            ), labels),
+            oldest_snapshot_time: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_oldest_snapshot_time_seconds",
+                "Unix timestamp of the oldest unreleased snapshot."
+            ), labels),
+
+            // Version metrics
+            num_live_versions: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_num_live_versions",
+                "Number of live versions."
+            ), labels),
+            current_super_version_number: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_current_super_version_number",
+                "Number of the current LSM version, incremented after any change to the LSM tree."
+            ), labels),
+
+            // Log metrics
+            min_log_number_to_keep: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_min_log_number_to_keep",
+                "Minimum log number to keep."
+            ), labels),
+
+            // Level metrics
+            base_level: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_base_level",
+                "Base level for compaction."
+            ), labels),
+
+            // Write metrics
+            actual_delayed_write_rate: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_actual_delayed_write_rate",
+                "The current actual delayed write rate; 0 means no delay."
+            ), labels),
+            is_write_stopped: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_is_write_stopped",
+                "1 if writes have been stopped."
+            ), labels),
+
+            // Block cache metrics
+            block_cache_capacity: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_block_cache_capacity_bytes",
+                "The block cache capacity in bytes."
+            ), labels),
+            block_cache_usage: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_block_cache_usage_bytes",
+                "The memory size of the entries residing in the block cache."
+            ), labels),
+            block_cache_pinned_usage: metrics.gauge_vec(MetricOpts::new(
+                "rocksdb_block_cache_pinned_usage_bytes",
+                "The memory size of the entries pinned in the block cache."
+            ), labels),
+        }
+    }
+}
diff --git a/src/new_index/mod.rs b/src/new_index/mod.rs
index 09730b104..f82291e55 100644
--- a/src/new_index/mod.rs
+++ b/src/new_index/mod.rs
@@ -1,4 +1,5 @@
 pub mod db;
+pub mod db_metrics;
 mod fetch;
 mod mempool;
 pub mod precache;
diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs
index 578999aff..ec7bc54d8 100644
--- a/src/new_index/schema.rs
+++ b/src/new_index/schema.rs
@@ -21,9 +21,10 @@
 use std::collections::{BTreeSet, HashMap, HashSet};
 use std::path::Path;
 use std::sync::{Arc, RwLock};
 use crate::chain::{
     BlockHash, BlockHeader, Network, OutPoint, Script, Transaction, TxOut, Txid, Value,
 };
 use crate::config::Config;
 use crate::daemon::Daemon;
 use crate::errors::*;
+use crate::new_index::db_metrics::RocksDbMetrics;
@@ -58,7 +59,7 @@ pub struct Store {
 }
 
 impl Store {
-    pub fn open(path: &Path, config: &Config) -> Self {
+    pub fn open(path: &Path, config: &Config, metrics: &Metrics) -> Self {
         let txstore_db = DB::open(&path.join("txstore"), config);
         let added_blockhashes = load_blockhashes(&txstore_db, &BlockRow::done_filter());
         debug!("{} blocks were added", added_blockhashes.len());
@@ -69,6 +70,11 @@ impl Store {
 
         let cache_db = DB::open(&path.join("cache"), config);
 
+        let db_metrics = Arc::new(RocksDbMetrics::new(metrics));
+        txstore_db.start_stats_exporter(Arc::clone(&db_metrics), "txstore_db");
+        history_db.start_stats_exporter(Arc::clone(&db_metrics), "history_db");
+        cache_db.start_stats_exporter(Arc::clone(&db_metrics), "cache_db");
+
         let headers = if let Some(tip_hash) = txstore_db.get(b"t") {
             let tip_hash = deserialize(&tip_hash).expect("invalid chain tip in `t`");
             let headers_map = load_blockheaders(&txstore_db);
diff --git a/tests/common.rs b/tests/common.rs
index 2ec8b99c3..5fb995d2d 100644
--- a/tests/common.rs
+++ b/tests/common.rs
@@ -118,6 +118,9 @@ impl TestRunner {
             #[cfg(feature = "liquid")]
             parent_network: bitcoin::Network::Regtest,
             initial_sync_compaction: false,
+            db_block_cache_mb: 8,
+            db_parallelism: 2,
+            db_write_buffer_size_mb: 256,
             //#[cfg(feature = "electrum-discovery")]
             //electrum_public_hosts: Option,
             //#[cfg(feature = "electrum-discovery")]
@@ -141,7 +144,7 @@ impl TestRunner {
             &metrics,
         )?);
 
-        let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config));
+        let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config, &metrics));
 
         let fetch_from = if !env::var("JSONRPC_IMPORT").is_ok() && !cfg!(feature = "liquid") {
            // run the initial indexing from the blk files then switch to using the jsonrpc,
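
Reviewer note: wiring order matters after this change. A minimal sketch of a caller, assuming the existing Config::from_args/Metrics::start setup used elsewhere in electrs (the indexing loop is elided):

    use std::sync::Arc;

    use electrs::{config::Config, metrics::Metrics, new_index::Store};

    fn main() {
        let config = Arc::new(Config::from_args());
        // Metrics must be constructed before the store: Store::open() now
        // registers the RocksDB gauges and spawns one stats-exporter thread
        // per database (txstore, history, cache).
        let metrics = Metrics::new(config.monitoring_addr);
        metrics.start();
        let _store = Arc::new(Store::open(&config.db_path.join("newindex"), &config, &metrics));
    }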
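
As a sizing aid, the formulas in the new config comments work out as follows: with the defaults (--db-block-cache-mb 8, --db-write-buffer-size-mb 256), the block caches take 8 MB * 3 databases = 24 MB, and the write buffers take up to 256 MB * 2 buffers * 3 databases = 1536 MB; raising the cache to the recommended 1024 MB per database grows the cache share to about 3 GB. This assumes RocksDB's default max_write_buffer_number of 2, as the help text does.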
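
Once the exporter threads are running, each gauge should be emitted once per database label. A scrape of the monitoring endpoint would contain lines roughly like the following (values illustrative, and subject to any name prefix the Metrics registry applies):

    rocksdb_block_cache_usage_bytes{db="txstore_db"} 8388608
    rocksdb_estimate_num_keys{db="history_db"} 123456789
    rocksdb_num_running_compactions{db="cache_db"} 0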