From c9ee4a0661b3abcd7a3b4676f6d8d2ff12540161 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Thu, 30 Oct 2025 14:15:55 -0700 Subject: [PATCH 1/8] Create phaser-metrics crate with trait-based composable metrics - Add SegmentWorkerMetrics base with common metrics - Add SegmentMetrics trait for composition pattern - Add BridgeMetrics with gRPC-specific metrics - Add QueryMetrics with sync-specific metrics - Types only implement base() to get all methods via trait - Add WorkerStage enum for worker state tracking - Add gather_metrics() wrapper to avoid leaking prometheus types - Add unit tests for metrics registration and duplication detection This provides composable metrics infrastructure without coupling phaser-bridge to prometheus, allowing bridge implementations to choose their observability approach. --- Cargo.lock | 11 +- Cargo.toml | 1 + crates/phaser-metrics/Cargo.toml | 8 + crates/phaser-metrics/src/lib.rs | 150 +++++++ crates/phaser-metrics/src/segment_metrics.rs | 431 +++++++++++++++++++ 5 files changed, 599 insertions(+), 2 deletions(-) create mode 100644 crates/phaser-metrics/Cargo.toml create mode 100644 crates/phaser-metrics/src/lib.rs create mode 100644 crates/phaser-metrics/src/segment_metrics.rs diff --git a/Cargo.lock b/Cargo.lock index 4f8a399..95375fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4271,6 +4271,14 @@ dependencies = [ "typed-arrow", ] +[[package]] +name = "phaser-metrics" +version = "0.1.0" +dependencies = [ + "phaser-bridge", + "prometheus", +] + [[package]] name = "phaser-parquet-metadata" version = "0.1.0" @@ -4305,12 +4313,11 @@ dependencies = [ "evm-common", "futures", "jsonrpsee", - "lazy_static", "num_cpus", "parquet", "phaser-bridge", + "phaser-metrics", "phaser-parquet-metadata", - "prometheus", "prost", "prost-types", "rocksdb", diff --git a/Cargo.toml b/Cargo.toml index fb4971e..6f6780b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ resolver = "2" members = [ "crates/phaser-query", "crates/phaser-bridge", + "crates/phaser-metrics", "crates/bridges/evm/erigon-bridge", "crates/bridges/evm/jsonrpc-bridge", "crates/schemas/evm/common", diff --git a/crates/phaser-metrics/Cargo.toml b/crates/phaser-metrics/Cargo.toml new file mode 100644 index 0000000..cdcd516 --- /dev/null +++ b/crates/phaser-metrics/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "phaser-metrics" +version = "0.1.0" +edition = "2021" + +[dependencies] +phaser-bridge = { path = "../phaser-bridge" } +prometheus = "0.13" diff --git a/crates/phaser-metrics/src/lib.rs b/crates/phaser-metrics/src/lib.rs new file mode 100644 index 0000000..0b91330 --- /dev/null +++ b/crates/phaser-metrics/src/lib.rs @@ -0,0 +1,150 @@ +//! Metrics infrastructure for phaser bridges and query services +//! +//! Provides composable metrics using a trait-based pattern. Base metrics are defined +//! in SegmentWorkerMetrics, and specialized metrics (BridgeMetrics, QueryMetrics) add +//! service-specific metrics while inheriting common functionality via the SegmentMetrics trait. + +mod segment_metrics; + +pub use segment_metrics::{BridgeMetrics, SegmentMetrics, SegmentWorkerMetrics, WorkerStage}; + +use prometheus::{register_int_gauge_vec, IntGaugeVec}; +use std::sync::Arc; + +/// Query service metrics for sync operations. +/// Composes SegmentWorkerMetrics and adds query-specific metrics. +#[derive(Clone)] +pub struct QueryMetrics { + pub base: Arc, + sync_queue_depth: IntGaugeVec, + segment_total_duration_seconds: IntGaugeVec, +} + +impl SegmentMetrics for QueryMetrics { + fn base(&self) -> &SegmentWorkerMetrics { + &self.base + } +} + +impl QueryMetrics { + pub fn new( + service_name: impl Into, + chain_id: u64, + bridge_name: impl Into, + ) -> Self { + let service_name_str = service_name.into(); + let base = + SegmentWorkerMetrics::new(service_name_str.clone(), chain_id, bridge_name.into()); + + Self { + base: Arc::new(base), + sync_queue_depth: register_int_gauge_vec!( + format!("{}_sync_queue_depth", service_name_str), + "Number of segments in each queue", + &["chain_id", "bridge_name", "queue"] // queue: "pending", "retrying" + ) + .unwrap(), + segment_total_duration_seconds: register_int_gauge_vec!( + format!("{}_segment_total_duration_seconds", service_name_str), + "Total seconds spent on this specific segment across all retry attempts", + &["chain_id", "bridge_name", "segment_num"] + ) + .unwrap(), + } + } + + /// Set sync queue depth + pub fn sync_queue_depth(&self, queue: &str, depth: i64) { + self.sync_queue_depth + .with_label_values(&[&self.base.chain_id, &self.base.bridge_name, queue]) + .set(depth); + } + + /// Convenience wrapper: record segment attempts (maps to base.segment_attempt) + pub fn segment_attempts(&self, result: &str) { + self.segment_attempt(result == "success"); + } + + /// Convenience wrapper: record sync errors (maps to base.error) + pub fn sync_errors(&self, error_type: &str, data_type: &str) { + self.error(error_type, data_type); + } + + /// Set segment sync retries for current sync run + /// Note: This delegates to segment_retry_count for compatibility + pub fn segment_sync_retries(&self, segment_num: u64, retries: i64) { + self.segment_retry_count(segment_num, retries); + } + + /// Set total duration for this segment in seconds + pub fn segment_total_duration(&self, segment_num: u64, seconds: i64) { + self.segment_total_duration_seconds + .with_label_values(&[ + &self.base.chain_id, + &self.base.bridge_name, + &segment_num.to_string(), + ]) + .set(seconds); + } +} + +/// Gather all registered metrics in Prometheus text format +pub fn gather_metrics() -> Result { + use prometheus::{Encoder, TextEncoder}; + + let encoder = TextEncoder::new(); + let metric_families = prometheus::gather(); + let mut buffer = Vec::new(); + + encoder + .encode(&metric_families, &mut buffer) + .map_err(|e| format!("Failed to encode metrics: {}", e))?; + + String::from_utf8(buffer).map_err(|e| format!("Failed to convert metrics to UTF-8: {}", e)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_query_metrics_registration() { + // First registration should succeed + let metrics = QueryMetrics::new("test_query", 1, "test_bridge"); + + // Verify the metrics struct was created + assert_eq!(metrics.base.service_name, "test_query"); + assert_eq!(metrics.base.chain_id, "1"); + assert_eq!(metrics.base.bridge_name, "test_bridge"); + } + + #[test] + #[should_panic(expected = "AlreadyReg")] + fn test_duplicate_query_metrics_registration_panics() { + // Register metrics with the same service name twice - should panic + let _metrics1 = QueryMetrics::new("duplicate_query", 1, "bridge1"); + let _metrics2 = QueryMetrics::new("duplicate_query", 2, "bridge2"); + } + + #[test] + fn test_query_metrics_with_different_names_coexist() { + // Different service names should be able to coexist + let _metrics1 = QueryMetrics::new("query_a", 1, "bridge1"); + let _metrics2 = QueryMetrics::new("query_b", 1, "bridge2"); + + // If we got here without panic, test passes + } + + #[test] + fn test_segment_metrics_trait_delegation() { + // Test that the trait delegation works correctly + let metrics = QueryMetrics::new("trait_test", 1, "test_bridge"); + + // These should compile and not panic - they're delegating to base() + metrics.segment_attempt(true); + metrics.segment_retry_count(100, 5); + metrics.error("network", "blocks"); + + // Success if we got here + } +} diff --git a/crates/phaser-metrics/src/segment_metrics.rs b/crates/phaser-metrics/src/segment_metrics.rs new file mode 100644 index 0000000..a4ac0d3 --- /dev/null +++ b/crates/phaser-metrics/src/segment_metrics.rs @@ -0,0 +1,431 @@ +//! Standard metrics for bridge implementations +//! +//! Provides a consistent metrics interface for all bridges that implement the FlightBridge trait. +//! These metrics track worker progress, segment processing, and performance across phases. + +use prometheus::{ + register_gauge_vec, register_histogram_vec, register_int_counter_vec, register_int_gauge_vec, + GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec, +}; +use std::sync::Arc; + +/// Common segment worker metrics shared by bridges and query services. +/// Wrapped in Arc for cheap cloning across workers without copying strings/state. +pub struct SegmentWorkerMetrics { + pub service_name: String, + pub chain_id: String, + pub bridge_name: String, + worker_stage: IntGaugeVec, + active_workers: IntGaugeVec, + items_processed: IntCounterVec, + segment_attempts: IntCounterVec, + segment_duration: HistogramVec, + worker_progress: GaugeVec, + stream_retries: IntCounterVec, + segment_retry_count: IntGaugeVec, + sync_errors: IntCounterVec, +} + +/// Trait providing common segment worker metrics with default implementations. +/// Types only need to implement `base()` to get all the delegation automatically. +pub trait SegmentMetrics { + /// Access to the underlying SegmentWorkerMetrics + fn base(&self) -> &SegmentWorkerMetrics; + + /// Set the current stage for a worker + fn set_worker_stage(&self, worker_id: usize, segment_id: u64, stage: WorkerStage) { + self.base().set_worker_stage(worker_id, segment_id, stage); + } + + /// Increment active workers for a phase + fn active_workers_inc(&self, phase: &str) { + self.base().active_workers_inc(phase); + } + + /// Decrement active workers for a phase + fn active_workers_dec(&self, phase: &str) { + self.base().active_workers_dec(phase); + } + + /// Increment items processed counter + fn items_processed_inc(&self, worker_id: usize, segment_id: u64, data_type: &str, count: u64) { + self.base() + .items_processed_inc(worker_id, segment_id, data_type, count); + } + + /// Record segment attempt + fn segment_attempt(&self, success: bool) { + self.base().segment_attempt(success); + } + + /// Observe segment processing duration + fn segment_duration(&self, phase: &str, duration_secs: f64) { + self.base().segment_duration(phase, duration_secs); + } + + /// Set worker progress percentage + fn set_worker_progress( + &self, + worker_id: usize, + segment_id: u64, + phase: &str, + progress_pct: f64, + ) { + self.base() + .set_worker_progress(worker_id, segment_id, phase, progress_pct); + } + + /// Record stream retry attempt + fn stream_retry(&self, operation: &str, success: bool) { + self.base().stream_retry(operation, success); + } + + /// Set segment retry count + fn segment_retry_count(&self, segment_num: u64, count: i64) { + self.base().segment_retry_count(segment_num, count); + } + + /// Remove segment retry count metric + fn segment_retry_count_remove(&self, segment_num: u64) { + self.base().segment_retry_count_remove(segment_num); + } + + /// Record an error + fn error(&self, error_type: &str, data_type: &str) { + self.base().error(error_type, data_type); + } +} + +/// Bridge-specific metrics that add to the common segment worker metrics. +/// Cheap to clone via Arc wrapper on base. +#[derive(Clone)] +pub struct BridgeMetrics { + pub base: Arc, + grpc_streams_active: IntGaugeVec, +} + +impl SegmentMetrics for BridgeMetrics { + fn base(&self) -> &SegmentWorkerMetrics { + &self.base + } +} + +impl SegmentWorkerMetrics { + pub fn new( + service_name: impl Into, + chain_id: u64, + bridge_name: impl Into, + ) -> Self { + let service_name = service_name.into(); + let chain_id_str = chain_id.to_string(); + let bridge_name = bridge_name.into(); + + Self { + service_name: service_name.clone(), + chain_id: chain_id_str, + bridge_name: bridge_name.clone(), + worker_stage: register_int_gauge_vec!( + format!("{}_worker_stage", service_name), + "Current stage for each worker (0=idle, 1=blocks, 2=transactions, 3=logs)", + &[ + "chain_id", + "bridge_name", + "worker_id", + "segment_id", + "stage" + ] + ) + .unwrap(), + active_workers: register_int_gauge_vec!( + format!("{}_active_workers", service_name), + "Number of workers currently active in each processing phase", + &["chain_id", "bridge_name", "phase"] + ) + .unwrap(), + items_processed: register_int_counter_vec!( + format!("{}_items_processed_total", service_name), + "Total items processed by data type", + &[ + "chain_id", + "bridge_name", + "worker_id", + "segment_id", + "data_type" + ] + ) + .unwrap(), + segment_attempts: register_int_counter_vec!( + format!("{}_segment_attempts_total", service_name), + "Total segment processing attempts by result", + &["chain_id", "bridge_name", "result"] // result: "success", "failure" + ) + .unwrap(), + segment_duration: register_histogram_vec!( + format!("{}_segment_duration_seconds", service_name), + "Time to process a segment by phase", + &["chain_id", "bridge_name", "phase"], + vec![1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0, 1800.0] // 1s to 30min + ) + .unwrap(), + worker_progress: register_gauge_vec!( + format!("{}_worker_progress_percent", service_name), + "Progress percentage for each worker in current phase", + &[ + "chain_id", + "bridge_name", + "worker_id", + "segment_id", + "phase" + ] + ) + .unwrap(), + stream_retries: register_int_counter_vec!( + format!("{}_stream_retries_total", service_name), + "Number of stream retry attempts", + &["chain_id", "bridge_name", "operation", "result"] + ) + .unwrap(), + segment_retry_count: register_int_gauge_vec!( + format!("{}_segment_retry_count", service_name), + "Current retry count for segments", + &["chain_id", "bridge_name", "segment_num"] + ) + .unwrap(), + sync_errors: register_int_counter_vec!( + format!("{}_errors_total", service_name), + "Errors by type and data category", + &["chain_id", "bridge_name", "error_type", "data_type"] + ) + .unwrap(), + } + } + + /// Set the current stage for a worker + pub fn set_worker_stage(&self, worker_id: usize, segment_id: u64, stage: WorkerStage) { + let worker_label = worker_id.to_string(); + let segment_label = segment_id.to_string(); + + // Clear all stages for this worker + for s in &["idle", "blocks", "transactions", "logs"] { + self.worker_stage + .with_label_values(&[ + &self.chain_id, + &self.bridge_name, + &worker_label, + &segment_label, + s, + ]) + .set(0); + } + + // Set current stage + let (stage_name, stage_value) = match stage { + WorkerStage::Idle => ("idle", 0), + WorkerStage::Blocks => ("blocks", 1), + WorkerStage::Transactions => ("transactions", 2), + WorkerStage::Logs => ("logs", 3), + }; + + self.worker_stage + .with_label_values(&[ + &self.chain_id, + &self.bridge_name, + &worker_label, + &segment_label, + stage_name, + ]) + .set(stage_value); + } + + /// Increment active workers for a phase + pub fn active_workers_inc(&self, phase: &str) { + self.active_workers + .with_label_values(&[&self.chain_id, &self.bridge_name, phase]) + .inc(); + } + + /// Decrement active workers for a phase + pub fn active_workers_dec(&self, phase: &str) { + self.active_workers + .with_label_values(&[&self.chain_id, &self.bridge_name, phase]) + .dec(); + } + + /// Increment items processed counter + pub fn items_processed_inc( + &self, + worker_id: usize, + segment_id: u64, + data_type: &str, + count: u64, + ) { + self.items_processed + .with_label_values(&[ + &self.chain_id, + &self.bridge_name, + &worker_id.to_string(), + &segment_id.to_string(), + data_type, + ]) + .inc_by(count); + } + + /// Record segment attempt + pub fn segment_attempt(&self, success: bool) { + let result = if success { "success" } else { "failure" }; + self.segment_attempts + .with_label_values(&[&self.chain_id, &self.bridge_name, result]) + .inc(); + } + + /// Observe segment processing duration + pub fn segment_duration(&self, phase: &str, duration_secs: f64) { + self.segment_duration + .with_label_values(&[&self.chain_id, &self.bridge_name, phase]) + .observe(duration_secs); + } + + /// Set worker progress percentage + pub fn set_worker_progress( + &self, + worker_id: usize, + segment_id: u64, + phase: &str, + progress_pct: f64, + ) { + self.worker_progress + .with_label_values(&[ + &self.chain_id, + &self.bridge_name, + &worker_id.to_string(), + &segment_id.to_string(), + phase, + ]) + .set(progress_pct); + } + + /// Record stream retry attempt + pub fn stream_retry(&self, operation: &str, success: bool) { + let result = if success { "success" } else { "failure" }; + self.stream_retries + .with_label_values(&[&self.chain_id, &self.bridge_name, operation, result]) + .inc(); + } + + /// Set segment retry count + pub fn segment_retry_count(&self, segment_num: u64, count: i64) { + self.segment_retry_count + .with_label_values(&[&self.chain_id, &self.bridge_name, &segment_num.to_string()]) + .set(count); + } + + /// Remove segment retry count metric + pub fn segment_retry_count_remove(&self, segment_num: u64) { + self.segment_retry_count + .remove_label_values(&[&self.chain_id, &self.bridge_name, &segment_num.to_string()]) + .ok(); + } + + /// Record an error + pub fn error(&self, error_type: &str, data_type: &str) { + self.sync_errors + .with_label_values(&[&self.chain_id, &self.bridge_name, error_type, data_type]) + .inc(); + } +} + +impl BridgeMetrics { + pub fn new( + service_name: impl Into, + chain_id: u64, + bridge_name: impl Into, + ) -> Self { + let service_name_str = service_name.into(); + let base = SegmentWorkerMetrics::new(service_name_str.clone(), chain_id, bridge_name); + + Self { + base: Arc::new(base), + grpc_streams_active: register_int_gauge_vec!( + format!("{}_grpc_streams_active", service_name_str), + "Number of active gRPC streams", + &["chain_id", "bridge_name", "stream_type"] + ) + .unwrap(), + } + } + + /// Increment active gRPC stream count + pub fn grpc_stream_inc(&self, stream_type: &str) { + self.grpc_streams_active + .with_label_values(&[&self.base.chain_id, &self.base.bridge_name, stream_type]) + .inc(); + } + + /// Decrement active gRPC stream count + pub fn grpc_stream_dec(&self, stream_type: &str) { + self.grpc_streams_active + .with_label_values(&[&self.base.chain_id, &self.base.bridge_name, stream_type]) + .dec(); + } +} + +/// Worker processing stages +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum WorkerStage { + Idle, + Blocks, + Transactions, + Logs, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_segment_worker_metrics_registration() { + // First registration should succeed + let metrics1 = SegmentWorkerMetrics::new("test_service", 1, "test_bridge"); + + // Verify the metrics struct was created + assert_eq!(metrics1.service_name, "test_service"); + assert_eq!(metrics1.chain_id, "1"); + assert_eq!(metrics1.bridge_name, "test_bridge"); + } + + #[test] + #[should_panic(expected = "AlreadyReg")] + fn test_duplicate_segment_worker_metrics_registration_panics() { + // Register metrics with the same service name twice - should panic + let _metrics1 = SegmentWorkerMetrics::new("duplicate_test", 1, "bridge1"); + let _metrics2 = SegmentWorkerMetrics::new("duplicate_test", 2, "bridge2"); + } + + #[test] + fn test_bridge_metrics_registration() { + // Test that BridgeMetrics can be created + let metrics = BridgeMetrics::new("test_bridge_service", 1, "erigon"); + + // Verify base metrics are accessible + assert_eq!(metrics.base.service_name, "test_bridge_service"); + assert_eq!(metrics.base.chain_id, "1"); + assert_eq!(metrics.base.bridge_name, "erigon"); + } + + #[test] + #[should_panic(expected = "AlreadyReg")] + fn test_duplicate_bridge_metrics_registration_panics() { + // Register BridgeMetrics with the same service name twice - should panic + let _metrics1 = BridgeMetrics::new("duplicate_bridge", 1, "bridge1"); + let _metrics2 = BridgeMetrics::new("duplicate_bridge", 2, "bridge2"); + } + + #[test] + fn test_different_service_names_coexist() { + // Different service names should be able to coexist + let _metrics1 = SegmentWorkerMetrics::new("service_a", 1, "bridge1"); + let _metrics2 = SegmentWorkerMetrics::new("service_b", 1, "bridge2"); + + // If we got here without panic, test passes + } +} From 92cc24ff10fb9dc1ba592adf4c9e8cb2dd874ec8 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Thu, 30 Oct 2025 14:16:00 -0700 Subject: [PATCH 2/8] Refactor phaser-query to use phaser-metrics crate - Replace lazy_static metrics with QueryMetrics instances - Pass metrics through SyncWorkerConfig instead of globals - Import SegmentMetrics trait from phaser-metrics - Update service.rs to create metrics with service_name - Update worker.rs to use instance methods instead of statics - Track segment_total_duration for per-segment timing - Use scopeguard with cloned metrics for phase tracking - Remove prometheus and lazy_static dependencies Removes 50+ lines of static metric definitions and unnecessary dependencies. Consumers now use phaser-metrics' gather_metrics() instead of importing prometheus directly. --- crates/phaser-query/Cargo.toml | 3 +- crates/phaser-query/src/sync/metrics.rs | 53 +----------------------- crates/phaser-query/src/sync/service.rs | 55 ++++++++++++++----------- crates/phaser-query/src/sync/worker.rs | 37 +++++++---------- 4 files changed, 49 insertions(+), 99 deletions(-) diff --git a/crates/phaser-query/Cargo.toml b/crates/phaser-query/Cargo.toml index ccf3e2e..461fb2e 100644 --- a/crates/phaser-query/Cargo.toml +++ b/crates/phaser-query/Cargo.toml @@ -14,6 +14,7 @@ path = "src/bin/phaser-cli.rs" [dependencies] # Local dependencies phaser-bridge = { workspace = true } +phaser-metrics = { path = "../phaser-metrics" } erigon-bridge = { workspace = true } evm-common = { workspace = true } core-executor = { workspace = true } @@ -55,8 +56,6 @@ tokio-stream = "0.1" alloy-consensus.workspace = true alloy-rlp.workspace = true chrono = "0.4.42" -prometheus = "0.13" -lazy_static = "1.5" scopeguard = "1.2" [build-dependencies] diff --git a/crates/phaser-query/src/sync/metrics.rs b/crates/phaser-query/src/sync/metrics.rs index d91bc3c..28099e9 100644 --- a/crates/phaser-query/src/sync/metrics.rs +++ b/crates/phaser-query/src/sync/metrics.rs @@ -1,54 +1,5 @@ //! Prometheus metrics for phaser-query sync service //! -//! Tracks sync job progress, errors, retries, and worker activity +//! Re-exports QueryMetrics from phaser-metrics crate -use lazy_static::lazy_static; -use prometheus::{ - register_histogram_vec, register_int_counter_vec, register_int_gauge_vec, HistogramVec, - IntCounterVec, IntGaugeVec, -}; - -lazy_static! { - /// Queue depths for work tracking - pub static ref SYNC_QUEUE_DEPTH: IntGaugeVec = register_int_gauge_vec!( - "phaser_sync_queue_depth", - "Number of segments in each queue", - &["queue"] // "pending", "retrying" - ).unwrap(); - - /// Segment processing attempts - pub static ref SEGMENT_ATTEMPTS: IntCounterVec = register_int_counter_vec!( - "phaser_sync_segment_attempts_total", - "Total segment processing attempts", - &["result"] // "success", "retry" - ).unwrap(); - - /// Errors by category - pub static ref SYNC_ERRORS: IntCounterVec = register_int_counter_vec!( - "phaser_sync_errors_total", - "Sync errors by type and data category", - &["error_type", "data_type"] // error_type: "connection", "timeout", "no_data", etc. - ).unwrap(); - - /// Segment processing duration - pub static ref SEGMENT_DURATION: HistogramVec = register_histogram_vec!( - "phaser_sync_segment_duration_seconds", - "Time to complete a segment (including retries)", - &["data_type"], - vec![10.0, 30.0, 60.0, 120.0, 300.0, 600.0, 1800.0, 3600.0] - ).unwrap(); - - /// Active workers - pub static ref ACTIVE_WORKERS: IntGaugeVec = register_int_gauge_vec!( - "phaser_sync_active_workers", - "Number of workers actively processing", - &["phase"] // "blocks", "transactions", "logs" - ).unwrap(); - - /// Segment retry count gauge (tracks current retry count for in-progress segments) - pub static ref SEGMENT_RETRY_COUNT: IntGaugeVec = register_int_gauge_vec!( - "phaser_sync_segment_retry_count", - "Current retry count for segments", - &["segment_num"] - ).unwrap(); -} +pub use phaser_metrics::QueryMetrics as SyncMetrics; diff --git a/crates/phaser-query/src/sync/service.rs b/crates/phaser-query/src/sync/service.rs index 63c656e..cf4cafd 100644 --- a/crates/phaser-query/src/sync/service.rs +++ b/crates/phaser-query/src/sync/service.rs @@ -6,6 +6,7 @@ use crate::sync::worker::{ProgressTracker, SyncWorker, SyncWorkerConfig}; use crate::PhaserConfig; use anyhow::Result; use core_executor::ThreadPoolExecutor; +use phaser_metrics::SegmentMetrics; use std::collections::{HashMap, VecDeque}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; @@ -295,6 +296,13 @@ impl SyncServer { const MIN_BACKOFF_SECS: u64 = 1; const MAX_BACKOFF_SECS: u64 = 10; + // Create metrics helper once + let sync_metrics = metrics::SyncMetrics::new( + "phaser_query", + job_config.chain_id, + job_config.bridge_name.clone(), + ); + for worker_id in 0..num_workers { let bridge_endpoint = job_config.bridge_endpoint.clone(); let data_dir = data_dir.clone(); @@ -304,6 +312,7 @@ impl SyncServer { let validation_stage = config.validation_stage; let job_complete = job_complete.clone(); let historical_boundary = job_config.historical_boundary; + let metrics = sync_metrics.clone(); let handle: tokio::task::JoinHandle> = tokio::spawn(async move { loop { @@ -318,9 +327,7 @@ impl SyncServer { let mut queue = segment_queue.lock().await; // Update queue depth metric - metrics::SYNC_QUEUE_DEPTH - .with_label_values(&["pending"]) - .set(queue.len() as i64); + metrics.sync_queue_depth("pending", queue.len() as i64); queue.pop_front() }; @@ -352,8 +359,14 @@ impl SyncServer { "Starting segment processing" ); + // Reset retry metric on first attempt + if retry_count == 0 { + metrics.segment_sync_retries(segment_num, 0); + } + // Create and run worker for this segment let config = SyncWorkerConfig { + metrics: metrics.clone(), bridge_endpoint: bridge_endpoint.clone(), data_dir: data_dir.clone(), from_block: segment_from, @@ -374,18 +387,17 @@ impl SyncServer { let duration = segment_start.elapsed(); // Metrics - metrics::SEGMENT_ATTEMPTS - .with_label_values(&["success"]) - .inc(); + metrics.segment_attempts("success"); + metrics.segment_duration("all", duration.as_secs_f64()); + + // Record total duration for this segment (from first attempt to completion) + metrics.segment_total_duration(segment_num, duration.as_secs() as i64); - metrics::SEGMENT_DURATION - .with_label_values(&["all"]) - .observe(duration.as_secs_f64()); + // Set final retry count for this sync run + metrics.segment_sync_retries(segment_num, retry_count as i64); - // Clear retry count metric for this segment - metrics::SEGMENT_RETRY_COUNT - .remove_label_values(&[&segment_num.to_string()]) - .ok(); + // Clear current retry count gauge (no longer needed) + metrics.segment_retry_count_remove(segment_num); info!( worker_id = worker_id, @@ -401,18 +413,11 @@ impl SyncServer { let data_type = sync_err.data_type.as_str(); // Metrics - metrics::SYNC_ERRORS - .with_label_values(&[error_category, data_type]) - .inc(); - - metrics::SEGMENT_ATTEMPTS - .with_label_values(&["retry"]) - .inc(); - - // Update retry count metric - metrics::SEGMENT_RETRY_COUNT - .with_label_values(&[&segment_num.to_string()]) - .set((retry_count + 1) as i64); + metrics.sync_errors(error_category, data_type); + metrics.segment_attempts("retry"); + + // Update retry count metric (gauge for current retry count) + metrics.segment_retry_count(segment_num, (retry_count + 1) as i64); // Calculate backoff: exponential up to 5 retries, then cap at 10s let backoff_secs = if retry_count < MAX_RETRIES_WITHOUT_PROGRESS { diff --git a/crates/phaser-query/src/sync/worker.rs b/crates/phaser-query/src/sync/worker.rs index 41c8371..fc81c98 100644 --- a/crates/phaser-query/src/sync/worker.rs +++ b/crates/phaser-query/src/sync/worker.rs @@ -8,6 +8,7 @@ use evm_common::transaction::TransactionRecord; use futures::StreamExt; use phaser_bridge::client::FlightBridgeClient; use phaser_bridge::descriptors::{BlockchainDescriptor, StreamType, ValidationStage}; +use phaser_metrics::SegmentMetrics; use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; @@ -63,6 +64,7 @@ pub struct ProgressUpdate { /// Configuration for creating a SyncWorker pub struct SyncWorkerConfig { + pub metrics: super::metrics::SyncMetrics, pub bridge_endpoint: String, pub data_dir: PathBuf, pub from_block: u64, @@ -77,6 +79,7 @@ pub struct SyncWorkerConfig { /// A worker that syncs a specific block range from erigon-bridge pub struct SyncWorker { worker_id: u32, + metrics: super::metrics::SyncMetrics, bridge_endpoint: String, data_dir: PathBuf, from_block: u64, @@ -134,6 +137,7 @@ impl SyncWorker { Self { worker_id, + metrics: config.metrics, bridge_endpoint: config.bridge_endpoint, data_dir: config.data_dir, from_block: config.from_block, @@ -384,13 +388,10 @@ impl SyncWorker { to_block: u64, ) -> Result<(u64, u64), SyncError> { // Track phase - super::metrics::ACTIVE_WORKERS - .with_label_values(&["blocks"]) - .inc(); - let _phase_guard = scopeguard::guard((), |_| { - super::metrics::ACTIVE_WORKERS - .with_label_values(&["blocks"]) - .dec(); + self.metrics.active_workers_inc("blocks"); + let metrics = self.metrics.clone(); + let _phase_guard = scopeguard::guard(metrics, |m| { + m.active_workers_dec("blocks"); }); let mut writer = ParquetWriter::with_config_and_mode( @@ -646,13 +647,10 @@ impl SyncWorker { to_block: u64, ) -> Result<(u64, u64), SyncError> { // Track phase - super::metrics::ACTIVE_WORKERS - .with_label_values(&["transactions"]) - .inc(); - let _phase_guard = scopeguard::guard((), |_| { - super::metrics::ACTIVE_WORKERS - .with_label_values(&["transactions"]) - .dec(); + self.metrics.active_workers_inc("transactions"); + let metrics = self.metrics.clone(); + let _phase_guard = scopeguard::guard(metrics, |m| { + m.active_workers_dec("transactions"); }); let mut writer = ParquetWriter::with_config_and_mode( @@ -1055,13 +1053,10 @@ impl SyncWorker { to_block: u64, ) -> Result<(u64, u64), SyncError> { // Track phase - super::metrics::ACTIVE_WORKERS - .with_label_values(&["logs"]) - .inc(); - let _phase_guard = scopeguard::guard((), |_| { - super::metrics::ACTIVE_WORKERS - .with_label_values(&["logs"]) - .dec(); + self.metrics.active_workers_inc("logs"); + let metrics = self.metrics.clone(); + let _phase_guard = scopeguard::guard(metrics, |m| { + m.active_workers_dec("logs"); }); let mut writer = ParquetWriter::with_config_and_mode( From 2ba8ec82d7f2e18174bfbd9c3e25d09363445180 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Thu, 30 Oct 2025 14:16:00 -0700 Subject: [PATCH 3/8] Remove broken example from validators/evm docs The example code referenced methods that don't exist in the current API. Remove it rather than maintaining outdated examples. --- crates/validators/evm/src/lib.rs | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/crates/validators/evm/src/lib.rs b/crates/validators/evm/src/lib.rs index 96909dd..4a87391 100644 --- a/crates/validators/evm/src/lib.rs +++ b/crates/validators/evm/src/lib.rs @@ -2,22 +2,6 @@ //! //! Provides validation functions and pluggable executors for verifying //! Ethereum transaction merkle roots. -//! -//! # Example -//! -//! ```no_run -//! use validators_evm::{ExecutorConfig, ValidationExecutor}; -//! -//! # async fn example() -> Result<(), Box> { -//! // Configure a Rayon-based executor -//! let config = ExecutorConfig::Rayon { num_threads: None }; -//! let executor = config.build(); -//! -//! // Validate a block's transactions -//! // executor.validate_block(block, transactions).await?; -//! # Ok(()) -//! # } -//! ``` pub mod error; pub mod executor; From 27445bbe6fc88e0be1479bc921a86383168407b0 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Mon, 3 Nov 2025 10:36:41 -0800 Subject: [PATCH 4/8] Refactor erigon-bridge to use phaser-metrics crate - Add phaser-metrics to workspace dependencies - Replace lazy_static global metrics with Arc-wrapped BridgeMetrics - Update ErigonFlightBridge to initialize and hold metrics instance - Thread metrics through SegmentWorker constructors and async streams - Update all metric call sites to use trait methods instead of statics - Handle Result type from gather_metrics in main.rs metrics endpoint - Maintains same metric names and functionality, just cleaner architecture Part of metrics refactor: consolidating metrics into trait-based composable pattern --- Cargo.lock | 1 + Cargo.toml | 1 + crates/bridges/evm/erigon-bridge/Cargo.toml | 1 + .../bridges/evm/erigon-bridge/src/bridge.rs | 32 +++- crates/bridges/evm/erigon-bridge/src/main.rs | 3 +- .../bridges/evm/erigon-bridge/src/metrics.rs | 151 +----------------- .../evm/erigon-bridge/src/segment_worker.rs | 51 +++--- 7 files changed, 64 insertions(+), 176 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 95375fa..f1aa5c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2386,6 +2386,7 @@ dependencies = [ "lazy_static", "num_cpus", "phaser-bridge", + "phaser-metrics", "prometheus", "prost", "prost-types", diff --git a/Cargo.toml b/Cargo.toml index 6f6780b..db34912 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ members = [ erigon-bridge = { path = "crates/bridges/evm/erigon-bridge" } jsonrpc-bridge = { path = "crates/bridges/evm/jsonrpc-bridge" } phaser-bridge = { path = "crates/phaser-bridge" } +phaser-metrics = { path = "crates/phaser-metrics" } evm-common = { path = "crates/schemas/evm/common", features = ["views"] } validators-evm = { path = "crates/validators/evm" } core-executor = { git = "https://github.com/dwerner/core-executor", rev = "376f2c62abbaceafef639fa24ab82199a0ce8cfe" } diff --git a/crates/bridges/evm/erigon-bridge/Cargo.toml b/crates/bridges/evm/erigon-bridge/Cargo.toml index 65f4c4e..cab0c18 100644 --- a/crates/bridges/evm/erigon-bridge/Cargo.toml +++ b/crates/bridges/evm/erigon-bridge/Cargo.toml @@ -14,6 +14,7 @@ path = "src/main.rs" [dependencies] # Local dependencies phaser-bridge = { workspace = true } +phaser-metrics = { workspace = true } evm-common = { workspace = true } validators-evm = { workspace = true } diff --git a/crates/bridges/evm/erigon-bridge/src/bridge.rs b/crates/bridges/evm/erigon-bridge/src/bridge.rs index aa3a4d9..516098b 100644 --- a/crates/bridges/evm/erigon-bridge/src/bridge.rs +++ b/crates/bridges/evm/erigon-bridge/src/bridge.rs @@ -20,6 +20,7 @@ use crate::client::ErigonClient; use crate::client_pool::{ClientPool, PoolConfig}; use crate::converter::ErigonDataConverter; use crate::error::ErigonBridgeError; +use crate::metrics::BridgeMetrics; use crate::segment_worker::{split_into_segments, SegmentConfig, SegmentWorker}; use crate::streaming_service::StreamingService; use crate::trie_client::TrieClient; @@ -36,6 +37,7 @@ pub struct ErigonFlightBridge { streaming_service: Arc, validator: Option>, segment_config: SegmentConfig, + metrics: BridgeMetrics, } impl ErigonFlightBridge { @@ -105,6 +107,9 @@ impl ErigonFlightBridge { } }); + // Initialize metrics + let metrics = BridgeMetrics::new("erigon_bridge", chain_id, "erigon"); + Ok(Self { client: Arc::new(tokio::sync::Mutex::new(client)), blockdata_pool: Arc::new(blockdata_pool), @@ -113,6 +118,7 @@ impl ErigonFlightBridge { streaming_service, validator, segment_config: segment_config.unwrap_or_default(), + metrics, }) } @@ -205,6 +211,7 @@ impl ErigonFlightBridge { /// /// Extracted as a separate function to avoid lifetime capture issues fn process_transactions_with_segments( + &self, pool: Arc, config: SegmentConfig, validator: Option>, @@ -215,6 +222,7 @@ impl ErigonFlightBridge { { let max_concurrent = config.max_concurrent_segments; let should_validate = validate && validator.is_some(); + let metrics = self.metrics.clone(); // Split range into segments let segments = split_into_segments(start, end, config.segment_size); @@ -231,6 +239,7 @@ impl ErigonFlightBridge { } else { None }; + let metrics = metrics.clone(); async move { // Get a client from the pool for this segment @@ -294,7 +303,13 @@ impl ErigonFlightBridge { ); let worker = SegmentWorker::new( - worker_id, seg_start, seg_end, client, config, validator, + worker_id, + seg_start, + seg_end, + client, + config, + validator, + metrics.clone(), ); // Convert the stream to handle FlightError @@ -327,6 +342,7 @@ impl ErigonFlightBridge { /// /// Extracted as a separate function to avoid lifetime capture issues fn process_logs_with_segments( + &self, pool: Arc, config: SegmentConfig, validator: Option>, @@ -337,6 +353,7 @@ impl ErigonFlightBridge { { let max_concurrent = config.max_concurrent_segments; let should_validate = validate && validator.is_some(); + let metrics = self.metrics.clone(); // Split range into segments let segments = split_into_segments(start, end, config.segment_size); @@ -353,6 +370,7 @@ impl ErigonFlightBridge { } else { None }; + let metrics = metrics.clone(); async move { // Get a client from the pool for this segment @@ -416,7 +434,13 @@ impl ErigonFlightBridge { ); let worker = SegmentWorker::new_for_logs( - worker_id, seg_start, seg_end, client, config, validator, + worker_id, + seg_start, + seg_end, + client, + config, + validator, + metrics.clone(), ); // Convert the stream to handle FlightError @@ -495,7 +519,7 @@ impl ErigonFlightBridge { // Handle transactions and logs separately since they use segment workers if stream_type == StreamType::Transactions { let pool = self.blockdata_pool.clone(); - let stream = Self::process_transactions_with_segments( + let stream = self.process_transactions_with_segments( pool, self.segment_config.clone(), self.validator.clone(), @@ -508,7 +532,7 @@ impl ErigonFlightBridge { if stream_type == StreamType::Logs { let pool = self.blockdata_pool.clone(); - let stream = Self::process_logs_with_segments( + let stream = self.process_logs_with_segments( pool, self.segment_config.clone(), self.validator.clone(), diff --git a/crates/bridges/evm/erigon-bridge/src/main.rs b/crates/bridges/evm/erigon-bridge/src/main.rs index 0fc05e1..c8cec73 100644 --- a/crates/bridges/evm/erigon-bridge/src/main.rs +++ b/crates/bridges/evm/erigon-bridge/src/main.rs @@ -151,7 +151,8 @@ async fn main() -> Result<()> { let make_svc = hyper::service::make_service_fn(|_conn| async { Ok::<_, Infallible>(hyper::service::service_fn(|_req| async { - let metrics = metrics::gather_metrics(); + let metrics = metrics::gather_metrics() + .unwrap_or_else(|e| format!("Error gathering metrics: {}", e)); let mut response = hyper::Response::new(hyper::Body::from(metrics)); response.headers_mut().insert( hyper::header::CONTENT_TYPE, diff --git a/crates/bridges/evm/erigon-bridge/src/metrics.rs b/crates/bridges/evm/erigon-bridge/src/metrics.rs index ef591ab..92d8bd6 100644 --- a/crates/bridges/evm/erigon-bridge/src/metrics.rs +++ b/crates/bridges/evm/erigon-bridge/src/metrics.rs @@ -1,152 +1,5 @@ //! Prometheus metrics for erigon-bridge //! -//! Tracks worker progress, stage transitions, and performance metrics +//! Re-exports BridgeMetrics from phaser-metrics crate -use lazy_static::lazy_static; -use prometheus::{ - register_gauge_vec, register_histogram_vec, register_int_counter_vec, register_int_gauge_vec, - Encoder, GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec, TextEncoder, -}; - -lazy_static! { - /// Worker stage tracking - shows which stage each worker is currently in - /// Labels: worker_id, segment_id, stage=[blocks|transactions|logs|idle] - pub static ref WORKER_STAGE: IntGaugeVec = register_int_gauge_vec!( - "erigon_bridge_worker_stage", - "Current stage for each worker (0=idle, 1=blocks, 2=transactions, 3=logs)", - &["worker_id", "segment_id", "stage"] - ).unwrap(); - - /// Active workers by processing phase - /// Labels: phase=[blocks|transactions|logs] - pub static ref WORKERS_ACTIVE: IntGaugeVec = register_int_gauge_vec!( - "erigon_bridge_workers_active", - "Number of workers currently active in each processing phase", - &["phase"] - ).unwrap(); - - /// Blocks processed per worker - /// Labels: worker_id, segment_id - pub static ref BLOCKS_PROCESSED: IntCounterVec = register_int_counter_vec!( - "erigon_bridge_blocks_processed_total", - "Total blocks processed by each worker", - &["worker_id", "segment_id"] - ).unwrap(); - - /// Transactions processed per worker - /// Labels: worker_id, segment_id - pub static ref TRANSACTIONS_PROCESSED: IntCounterVec = register_int_counter_vec!( - "erigon_bridge_transactions_processed_total", - "Total transactions processed by each worker", - &["worker_id", "segment_id"] - ).unwrap(); - - /// Logs processed per worker - /// Labels: worker_id, segment_id - pub static ref LOGS_PROCESSED: IntCounterVec = register_int_counter_vec!( - "erigon_bridge_logs_processed_total", - "Total logs processed by each worker", - &["worker_id", "segment_id"] - ).unwrap(); - - /// Segment completion tracking - /// Labels: phase=[blocks|transactions|logs], result=[success|failure] - pub static ref SEGMENTS_COMPLETED: IntCounterVec = register_int_counter_vec!( - "erigon_bridge_segments_completed_total", - "Total segments completed by phase and result", - &["phase", "result"] - ).unwrap(); - - /// Segment processing duration - /// Labels: phase=[blocks|transactions|logs] - pub static ref SEGMENT_DURATION: HistogramVec = register_histogram_vec!( - "erigon_bridge_segment_duration_seconds", - "Time to process a segment by phase", - &["phase"], - vec![1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0, 1800.0] // 1s to 30min - ).unwrap(); - - /// Worker progress percentage (0.0 - 100.0) - /// Labels: worker_id, segment_id, phase - pub static ref WORKER_PROGRESS: GaugeVec = register_gauge_vec!( - "erigon_bridge_worker_progress_percent", - "Progress percentage for each worker in current phase", - &["worker_id", "segment_id", "phase"] - ).unwrap(); - - /// gRPC stream retry attempts - /// Labels: operation=[blocks|transactions|logs], result=[success|failure] - pub static ref STREAM_RETRIES: IntCounterVec = register_int_counter_vec!( - "erigon_bridge_stream_retries_total", - "Number of gRPC stream retry attempts", - &["operation", "result"] - ).unwrap(); - - /// Active gRPC streams - pub static ref GRPC_STREAMS_ACTIVE: IntGaugeVec = register_int_gauge_vec!( - "erigon_bridge_grpc_streams_active", - "Number of active gRPC streams", - &["stream_type"] - ).unwrap(); - - /// Memory usage (heap bytes) - pub static ref MEMORY_HEAP_BYTES: IntGaugeVec = register_int_gauge_vec!( - "erigon_bridge_memory_heap_bytes", - "Current heap memory usage in bytes", - &["type"] - ).unwrap(); -} - -/// Helper to set worker stage -pub fn set_worker_stage(worker_id: usize, segment_id: u64, stage: WorkerStage) { - let worker_label = worker_id.to_string(); - let segment_label = segment_id.to_string(); - - // Clear all stages for this worker - for s in &["idle", "blocks", "transactions", "logs"] { - WORKER_STAGE - .with_label_values(&[&worker_label, &segment_label, s]) - .set(0); - } - - // Set current stage - let (stage_name, stage_value) = match stage { - WorkerStage::Idle => ("idle", 0), - WorkerStage::Blocks => ("blocks", 1), - WorkerStage::Transactions => ("transactions", 2), - }; - - WORKER_STAGE - .with_label_values(&[&worker_label, &segment_label, stage_name]) - .set(stage_value); -} - -/// Helper to set worker progress -pub fn set_worker_progress(worker_id: usize, segment_id: u64, phase: &str, progress_pct: f64) { - WORKER_PROGRESS - .with_label_values(&[&worker_id.to_string(), &segment_id.to_string(), phase]) - .set(progress_pct); -} - -/// Helper to record segment completion -pub fn record_segment_complete(phase: &str, success: bool) { - let result = if success { "success" } else { "failure" }; - SEGMENTS_COMPLETED.with_label_values(&[phase, result]).inc(); -} - -/// Worker processing stages -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum WorkerStage { - Idle, - Blocks, - Transactions, -} - -/// Get metrics in Prometheus text format -pub fn gather_metrics() -> String { - let encoder = TextEncoder::new(); - let metric_families = prometheus::gather(); - let mut buffer = Vec::new(); - encoder.encode(&metric_families, &mut buffer).unwrap(); - String::from_utf8(buffer).unwrap() -} +pub use phaser_metrics::{gather_metrics, BridgeMetrics, SegmentMetrics, WorkerStage}; diff --git a/crates/bridges/evm/erigon-bridge/src/segment_worker.rs b/crates/bridges/evm/erigon-bridge/src/segment_worker.rs index 3139259..fed3180 100644 --- a/crates/bridges/evm/erigon-bridge/src/segment_worker.rs +++ b/crates/bridges/evm/erigon-bridge/src/segment_worker.rs @@ -7,7 +7,7 @@ use crate::blockdata_client::BlockDataClient; use crate::blockdata_converter::BlockDataConverter; use crate::error::ErigonBridgeError; -use crate::metrics; +use crate::metrics::{SegmentMetrics, WorkerStage}; use crate::proto::custom::TransactionData; use alloy_consensus::Header; use alloy_primitives::Bytes; @@ -78,6 +78,7 @@ pub struct SegmentWorker { config: SegmentConfig, validator: Option>, data_type: SegmentDataType, + metrics: crate::metrics::BridgeMetrics, } impl SegmentWorker { @@ -89,6 +90,7 @@ impl SegmentWorker { blockdata_client: BlockDataClient, config: SegmentConfig, validator: Option>, + metrics: crate::metrics::BridgeMetrics, ) -> Self { Self { worker_id, @@ -98,6 +100,7 @@ impl SegmentWorker { config, validator, data_type: SegmentDataType::Transactions, + metrics, } } @@ -109,6 +112,7 @@ impl SegmentWorker { blockdata_client: BlockDataClient, config: SegmentConfig, validator: Option>, + metrics: crate::metrics::BridgeMetrics, ) -> Self { Self { worker_id, @@ -118,6 +122,7 @@ impl SegmentWorker { config, validator, data_type: SegmentDataType::Logs, + metrics, } } @@ -154,7 +159,8 @@ impl SegmentWorker { ); // Set initial worker stage - metrics::set_worker_stage(worker_id, segment_id, metrics::WorkerStage::Blocks); + self.metrics + .set_worker_stage(worker_id, segment_id, WorkerStage::Blocks); // Clone the client at the start (shares underlying HTTP/2 connection) let mut client = self.blockdata_client.clone(); @@ -177,7 +183,7 @@ impl SegmentWorker { Err(e) => { error!("Worker {} segment {}: Failed to fetch headers for blocks {}-{}: {}", worker_id, segment_id, current_block, chunk_end, e); - metrics::record_segment_complete("blocks", false); + self.metrics.segment_attempt(false); yield Err(e); return; } @@ -200,14 +206,15 @@ impl SegmentWorker { // Update progress - blocks phase let blocks_processed = current_block - self.segment_start; let progress_pct = (blocks_processed as f64 / total_blocks as f64) * 100.0; - metrics::set_worker_progress(worker_id, segment_id, "blocks", progress_pct); - metrics::BLOCKS_PROCESSED - .with_label_values(&[&worker_id.to_string(), &segment_id.to_string()]) - .inc_by(num_headers as u64); + self.metrics + .set_worker_progress(worker_id, segment_id, "blocks", progress_pct); + self.metrics + .items_processed_inc(worker_id, segment_id, "blocks", num_headers as u64); // Transition to transactions phase on first chunk only if first_chunk { - metrics::set_worker_stage(worker_id, segment_id, metrics::WorkerStage::Transactions); + self.metrics + .set_worker_stage(worker_id, segment_id, WorkerStage::Transactions); first_chunk = false; } @@ -218,7 +225,7 @@ impl SegmentWorker { let start_block = chunk.first().unwrap().0; let end_block = chunk.last().unwrap().0; - metrics::GRPC_STREAMS_ACTIVE.with_label_values(&["transactions"]).inc(); + self.metrics.grpc_stream_inc("transactions"); debug!( "Worker {} segment {}: Streaming transactions for blocks {}-{}", @@ -240,14 +247,13 @@ impl SegmentWorker { worker_id, segment_id, batch_count, tx_batch.transactions.len()); let tx_count = tx_batch.transactions.len() as u64; all_transactions.extend(tx_batch.transactions); - metrics::TRANSACTIONS_PROCESSED - .with_label_values(&[&worker_id.to_string(), &segment_id.to_string()]) - .inc_by(tx_count); + self.metrics + .items_processed_inc(worker_id, segment_id, "transactions", tx_count); } Err(e) => { error!("Worker {} segment {}: Transaction stream error after {} batches: {}", worker_id, segment_id, batch_count, e); - metrics::GRPC_STREAMS_ACTIVE.with_label_values(&["transactions"]).dec(); + self.metrics.grpc_stream_dec("transactions"); yield Err(ErigonBridgeError::ErigonClient(Box::new(e))); return; } @@ -257,7 +263,7 @@ impl SegmentWorker { // Stream completed successfully info!("Worker {} segment {}: Transaction stream completed for blocks {}-{}, received {} batches with {} total transactions", worker_id, segment_id, start_block, end_block, batch_count, all_transactions.len()); - metrics::GRPC_STREAMS_ACTIVE.with_label_values(&["transactions"]).dec(); + self.metrics.grpc_stream_dec("transactions"); // Peekable iterator to pull transactions for each block let mut tx_iter = all_transactions.into_iter().peekable(); @@ -320,7 +326,7 @@ impl SegmentWorker { Ok(batch) => batch, Err(e) => { // Clean up metrics before error return - metrics::record_segment_complete("transactions", false); + self.metrics.segment_attempt(false); yield Err(e); return; } @@ -348,8 +354,9 @@ impl SegmentWorker { segment_id, total_blocks ); - metrics::set_worker_stage(worker_id, segment_id, metrics::WorkerStage::Idle); - metrics::record_segment_complete("transactions", false); + self.metrics + .set_worker_stage(worker_id, segment_id, WorkerStage::Idle); + self.metrics.segment_attempt(false); yield Err(ErigonBridgeError::StreamProtocol(StreamError::ZeroBatchesConsumed { start: self.segment_start, end: self.segment_end, @@ -366,11 +373,11 @@ impl SegmentWorker { yielded_batches ); - metrics::SEGMENT_DURATION - .with_label_values(&["transactions"]) - .observe(duration); - metrics::set_worker_stage(worker_id, segment_id, metrics::WorkerStage::Idle); - metrics::record_segment_complete("transactions", true); + self.metrics + .segment_duration("transactions", duration); + self.metrics + .set_worker_stage(worker_id, segment_id, WorkerStage::Idle); + self.metrics.segment_attempt(true); } } From 8ad0907f9dc7bb17968d2473dfc1d526932f8053 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Tue, 4 Nov 2025 12:01:19 -0800 Subject: [PATCH 5/8] Refactor erigon-bridge metrics server to use axum - Replace hyper dependency with axum - Replace hyper service boilerplate with clean axum Router - Return proper HTTP 500 errors when metrics gathering fails - Use standard axum patterns from official examples - Simplify server binding with format string Benefits: - Much simpler and more readable code (~30 lines vs ~15 lines) - Better error handling with proper status codes - Follows axum best practices --- Cargo.lock | 129 ++++++------------- crates/bridges/evm/erigon-bridge/Cargo.toml | 2 +- crates/bridges/evm/erigon-bridge/src/main.rs | 44 ++++--- 3 files changed, 68 insertions(+), 107 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f1aa5c7..e5a6b10 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -282,7 +282,7 @@ checksum = "be436893c0d1f7a57d1d8f1b6b9af9db04174468410b7e6e1d1893e78110a3bc" dependencies = [ "alloy-primitives", "alloy-sol-types", - "http 1.3.1", + "http", "serde", "serde_json", "thiserror 2.0.16", @@ -770,7 +770,7 @@ dependencies = [ "alloy-pubsub", "alloy-transport", "futures", - "http 1.3.1", + "http", "rustls", "serde_json", "tokio", @@ -1397,10 +1397,10 @@ dependencies = [ "axum-core 0.4.5", "bytes", "futures-util", - "http 1.3.1", - "http-body 1.0.1", + "http", + "http-body", "http-body-util", - "hyper 1.7.0", + "hyper", "hyper-util", "itoa", "matchit 0.7.3", @@ -1430,8 +1430,8 @@ dependencies = [ "axum-core 0.5.2", "bytes", "futures-util", - "http 1.3.1", - "http-body 1.0.1", + "http", + "http-body", "http-body-util", "itoa", "matchit 0.8.4", @@ -1456,8 +1456,8 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http 1.3.1", - "http-body 1.0.1", + "http", + "http-body", "http-body-util", "mime", "pin-project-lite", @@ -1476,8 +1476,8 @@ checksum = "68464cd0412f486726fb3373129ef5d2993f90c34bc2bc1c1e9943b2f4fc7ca6" dependencies = [ "bytes", "futures-core", - "http 1.3.1", - "http-body 1.0.1", + "http", + "http-body", "http-body-util", "mime", "pin-project-lite", @@ -2374,14 +2374,14 @@ dependencies = [ "arrow-schema", "async-stream", "async-trait", + "axum 0.7.9", "chrono", "clap", "derive_more 1.0.0", "evm-common", "futures", "hex", - "http 1.3.1", - "hyper 0.14.32", + "http", "hyper-util", "lazy_static", "num_cpus", @@ -2834,7 +2834,7 @@ dependencies = [ "fnv", "futures-core", "futures-sink", - "http 1.3.1", + "http", "indexmap 2.11.4", "slab", "tokio", @@ -2919,17 +2919,6 @@ dependencies = [ "digest 0.10.7", ] -[[package]] -name = "http" -version = "0.2.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" -dependencies = [ - "bytes", - "fnv", - "itoa", -] - [[package]] name = "http" version = "1.3.1" @@ -2941,17 +2930,6 @@ dependencies = [ "itoa", ] -[[package]] -name = "http-body" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" -dependencies = [ - "bytes", - "http 0.2.12", - "pin-project-lite", -] - [[package]] name = "http-body" version = "1.0.1" @@ -2959,7 +2937,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "http 1.3.1", + "http", ] [[package]] @@ -2970,8 +2948,8 @@ checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" dependencies = [ "bytes", "futures-core", - "http 1.3.1", - "http-body 1.0.1", + "http", + "http-body", "pin-project-lite", ] @@ -3021,29 +2999,6 @@ dependencies = [ "windows-sys 0.61.1", ] -[[package]] -name = "hyper" -version = "0.14.32" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7" -dependencies = [ - "bytes", - "futures-channel", - "futures-core", - "futures-util", - "http 0.2.12", - "http-body 0.4.6", - "httparse", - "httpdate", - "itoa", - "pin-project-lite", - "socket2 0.5.10", - "tokio", - "tower-service", - "tracing", - "want", -] - [[package]] name = "hyper" version = "1.7.0" @@ -3055,8 +3010,8 @@ dependencies = [ "futures-channel", "futures-core", "h2", - "http 1.3.1", - "http-body 1.0.1", + "http", + "http-body", "httparse", "httpdate", "itoa", @@ -3073,8 +3028,8 @@ version = "0.27.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58" dependencies = [ - "http 1.3.1", - "hyper 1.7.0", + "http", + "hyper", "hyper-util", "rustls", "rustls-pki-types", @@ -3090,7 +3045,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" dependencies = [ - "hyper 1.7.0", + "hyper", "hyper-util", "pin-project-lite", "tokio", @@ -3108,9 +3063,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "http 1.3.1", - "http-body 1.0.1", - "hyper 1.7.0", + "http", + "http-body", + "hyper", "ipnet", "libc", "percent-encoding", @@ -3479,8 +3434,8 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http 1.3.1", - "http-body 1.0.1", + "http", + "http-body", "http-body-util", "jsonrpsee-types", "parking_lot", @@ -3500,10 +3455,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55e363146da18e50ad2b51a0a7925fc423137a0b1371af8235b1c231a0647328" dependencies = [ "futures-util", - "http 1.3.1", - "http-body 1.0.1", + "http", + "http-body", "http-body-util", - "hyper 1.7.0", + "hyper", "hyper-util", "jsonrpsee-core", "jsonrpsee-types", @@ -3526,7 +3481,7 @@ version = "0.24.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08a8e70baf945b6b5752fc8eb38c918a48f1234daf11355e07106d963f860089" dependencies = [ - "http 1.3.1", + "http", "serde", "serde_json", "thiserror 1.0.69", @@ -4839,10 +4794,10 @@ dependencies = [ "base64", "bytes", "futures-core", - "http 1.3.1", - "http-body 1.0.1", + "http", + "http-body", "http-body-util", - "hyper 1.7.0", + "hyper", "hyper-rustls", "hyper-util", "js-sys", @@ -5439,7 +5394,7 @@ dependencies = [ "base64", "bytes", "futures", - "http 1.3.1", + "http", "httparse", "log", "rand 0.8.5", @@ -5854,10 +5809,10 @@ dependencies = [ "bytes", "flate2", "h2", - "http 1.3.1", - "http-body 1.0.1", + "http", + "http-body", "http-body-util", - "hyper 1.7.0", + "hyper", "hyper-timeout", "hyper-util", "percent-encoding", @@ -5930,8 +5885,8 @@ dependencies = [ "bitflags 2.9.4", "bytes", "futures-util", - "http 1.3.1", - "http-body 1.0.1", + "http", + "http-body", "iri-string", "pin-project-lite", "tower 0.5.2", @@ -6027,7 +5982,7 @@ checksum = "4793cb5e56680ecbb1d843515b23b6de9a75eb04b66643e256a396d43be33c13" dependencies = [ "bytes", "data-encoding", - "http 1.3.1", + "http", "httparse", "log", "rand 0.9.2", @@ -6147,7 +6102,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b4531c118335662134346048ddb0e54cc86bd7e81866757873055f0e38f5d2" dependencies = [ "base64", - "http 1.3.1", + "http", "httparse", "log", ] diff --git a/crates/bridges/evm/erigon-bridge/Cargo.toml b/crates/bridges/evm/erigon-bridge/Cargo.toml index cab0c18..064f3ec 100644 --- a/crates/bridges/evm/erigon-bridge/Cargo.toml +++ b/crates/bridges/evm/erigon-bridge/Cargo.toml @@ -35,7 +35,7 @@ async-trait = { workspace = true } futures = "0.3" tokio-stream = "0.1" tower = "0.5" -hyper = { version = "0.14", features = ["server", "tcp", "http1"] } +axum = { workspace = true } hyper-util = "0.1" http = "1.0" diff --git a/crates/bridges/evm/erigon-bridge/src/main.rs b/crates/bridges/evm/erigon-bridge/src/main.rs index c8cec73..3846402 100644 --- a/crates/bridges/evm/erigon-bridge/src/main.rs +++ b/crates/bridges/evm/erigon-bridge/src/main.rs @@ -147,28 +147,34 @@ async fn main() -> Result<()> { // Start Prometheus metrics server let metrics_port = args.metrics_port; tokio::spawn(async move { - use std::convert::Infallible; - - let make_svc = hyper::service::make_service_fn(|_conn| async { - Ok::<_, Infallible>(hyper::service::service_fn(|_req| async { - let metrics = metrics::gather_metrics() - .unwrap_or_else(|e| format!("Error gathering metrics: {}", e)); - let mut response = hyper::Response::new(hyper::Body::from(metrics)); - response.headers_mut().insert( - hyper::header::CONTENT_TYPE, - hyper::header::HeaderValue::from_static("text/plain; version=0.0.4"), - ); - Ok::<_, Infallible>(response) - })) - }); - - let addr = ([0, 0, 0, 0], metrics_port).into(); + use axum::{response::IntoResponse, routing::get, Router}; + + async fn metrics_handler() -> impl IntoResponse { + match metrics::gather_metrics() { + Ok(metrics) => ( + axum::http::StatusCode::OK, + [("content-type", "text/plain; version=0.0.4")], + metrics, + ), + Err(e) => ( + axum::http::StatusCode::INTERNAL_SERVER_ERROR, + [("content-type", "text/plain")], + format!("Error gathering metrics: {}", e), + ), + } + } + + let app = Router::new().route("/metrics", get(metrics_handler)); + + let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", metrics_port)) + .await + .unwrap(); info!( - "Starting Prometheus metrics server on http://0.0.0.0:{}", - metrics_port + "Prometheus metrics server listening on {}", + listener.local_addr().unwrap() ); - if let Err(e) = hyper::Server::bind(&addr).serve(make_svc).await { + if let Err(e) = axum::serve(listener, app).await { error!("Metrics server error: {}", e); } }); From e153417534a1de5caa65a4ab6fc8742675c63579 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Tue, 4 Nov 2025 12:01:22 -0800 Subject: [PATCH 6/8] Refactor phaser-query metrics server to use axum - Replace hyper service boilerplate with axum Router - Return proper HTTP 500 errors when metrics gathering fails - Make sync::metrics module public for metrics server - Re-export gather_metrics function - Add metrics_port to PhaserConfig (default: 9092) Benefits: - Consistent with erigon-bridge metrics server - Simpler and more maintainable code - Proper error handling with status codes --- crates/phaser-query/src/bin/phaser-query.rs | 51 +++++++++++++++++++++ crates/phaser-query/src/lib.rs | 6 +++ crates/phaser-query/src/sync/metrics.rs | 2 +- crates/phaser-query/src/sync/mod.rs | 2 +- 4 files changed, 59 insertions(+), 2 deletions(-) diff --git a/crates/phaser-query/src/bin/phaser-query.rs b/crates/phaser-query/src/bin/phaser-query.rs index 85cbc33..d1543e8 100644 --- a/crates/phaser-query/src/bin/phaser-query.rs +++ b/crates/phaser-query/src/bin/phaser-query.rs @@ -39,6 +39,10 @@ struct Args { #[clap(long)] disable_sync_admin: bool, + /// Prometheus metrics port (overrides config, default: 9092) + #[clap(long)] + metrics_port: Option, + /// Bridge name to use for streaming (must be defined in config) #[clap(long)] bridge_name: Option, @@ -73,6 +77,10 @@ async fn main() -> Result<()> { info!("Overriding data root: {:?}", data_root); config.data_root = data_root; } + if let Some(metrics_port) = args.metrics_port { + info!("Overriding metrics port: {}", metrics_port); + config.metrics_port = metrics_port; + } info!("RocksDB path: {:?}", config.rocksdb_path); info!("Data root: {:?}", config.data_root); @@ -242,6 +250,49 @@ async fn main() -> Result<()> { handles.push(handle); } + // Start Prometheus metrics server if enabled (default: enabled if metrics_port > 0) + if config.metrics_port > 0 { + info!( + "Starting Prometheus metrics server on port {}", + config.metrics_port + ); + + let metrics_port = config.metrics_port; + let handle = tokio::spawn(async move { + use axum::{response::IntoResponse, routing::get, Router}; + + async fn metrics_handler() -> impl IntoResponse { + match phaser_query::sync::metrics::gather_metrics() { + Ok(metrics) => ( + axum::http::StatusCode::OK, + [("content-type", "text/plain; version=0.0.4")], + metrics, + ), + Err(e) => ( + axum::http::StatusCode::INTERNAL_SERVER_ERROR, + [("content-type", "text/plain")], + format!("Error gathering metrics: {}", e), + ), + } + } + + let app = Router::new().route("/metrics", get(metrics_handler)); + + let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", metrics_port)) + .await + .unwrap(); + info!( + "Prometheus metrics server listening on {}", + listener.local_addr().unwrap() + ); + + if let Err(e) = axum::serve(listener, app).await { + error!("Metrics server error: {}", e); + } + }); + handles.push(handle); + } + // Start sync admin server if enabled (default: enabled if sync_admin_port > 0) let enable_sync_admin = !args.disable_sync_admin && config.sync_admin_port > 0; if enable_sync_admin { diff --git a/crates/phaser-query/src/lib.rs b/crates/phaser-query/src/lib.rs index b99b861..19c99b9 100644 --- a/crates/phaser-query/src/lib.rs +++ b/crates/phaser-query/src/lib.rs @@ -206,6 +206,8 @@ pub struct PhaserConfig { pub sql_port: u16, #[serde(default = "default_sync_admin_port")] pub sync_admin_port: u16, // Port for sync admin gRPC (9090) + #[serde(default = "default_metrics_port")] + pub metrics_port: u16, // Port for Prometheus metrics HTTP server (9091) #[serde(default = "default_sync_parallelism")] pub sync_parallelism: u32, // Number of parallel workers for historical sync (4) #[serde(default)] @@ -234,6 +236,10 @@ fn default_sync_admin_port() -> u16 { 9090 } +fn default_metrics_port() -> u16 { + 9092 +} + fn default_sync_parallelism() -> u32 { 4 } diff --git a/crates/phaser-query/src/sync/metrics.rs b/crates/phaser-query/src/sync/metrics.rs index 28099e9..4d49fdd 100644 --- a/crates/phaser-query/src/sync/metrics.rs +++ b/crates/phaser-query/src/sync/metrics.rs @@ -2,4 +2,4 @@ //! //! Re-exports QueryMetrics from phaser-metrics crate -pub use phaser_metrics::QueryMetrics as SyncMetrics; +pub use phaser_metrics::{gather_metrics, QueryMetrics as SyncMetrics}; diff --git a/crates/phaser-query/src/sync/mod.rs b/crates/phaser-query/src/sync/mod.rs index c04f32f..3baa65b 100644 --- a/crates/phaser-query/src/sync/mod.rs +++ b/crates/phaser-query/src/sync/mod.rs @@ -1,6 +1,6 @@ mod data_scanner; mod error; -mod metrics; +pub mod metrics; mod service; mod worker; From 3eeb976060f4c44ecd063ea17b7b84480bb0d87a Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Tue, 4 Nov 2025 12:01:22 -0800 Subject: [PATCH 7/8] Add comprehensive metrics instrumentation to segment worker Tracks worker lifecycle across all three phases: Active workers by phase: - active_workers_inc/dec for blocks, transactions, logs phases - Proper increment on phase start, decrement on phase end - Cleanup on error paths to prevent metric leaks Phase durations: - segment_duration for each phase (blocks, transactions, logs) - Measured from phase start to completion Stream lifecycle: - grpc_stream_inc/dec for blocks, transactions, receipts streams - Proper cleanup on stream errors Items processed: - Track blocks, transactions, receipts processed per segment This provides complete observability of: - Current worker distribution across phases - Phase processing times for performance analysis - Active gRPC stream counts - Data throughput per segment --- .../evm/erigon-bridge/src/segment_worker.rs | 98 ++++++++++++++++--- 1 file changed, 84 insertions(+), 14 deletions(-) diff --git a/crates/bridges/evm/erigon-bridge/src/segment_worker.rs b/crates/bridges/evm/erigon-bridge/src/segment_worker.rs index fed3180..8771aee 100644 --- a/crates/bridges/evm/erigon-bridge/src/segment_worker.rs +++ b/crates/bridges/evm/erigon-bridge/src/segment_worker.rs @@ -7,7 +7,7 @@ use crate::blockdata_client::BlockDataClient; use crate::blockdata_converter::BlockDataConverter; use crate::error::ErigonBridgeError; -use crate::metrics::{SegmentMetrics, WorkerStage}; +use crate::metrics::{BridgeMetrics, SegmentMetrics, WorkerStage}; use crate::proto::custom::TransactionData; use alloy_consensus::Header; use alloy_primitives::Bytes; @@ -147,6 +147,7 @@ impl SegmentWorker { let segment_id = self.segment_start / 500_000; // Calculate segment ID let total_blocks = self.segment_end - self.segment_start + 1; let phase_start = Instant::now(); + let blocks_phase_start = Instant::now(); let mut yielded_batches = 0u64; info!( @@ -161,6 +162,7 @@ impl SegmentWorker { // Set initial worker stage self.metrics .set_worker_stage(worker_id, segment_id, WorkerStage::Blocks); + self.metrics.active_workers_inc("blocks"); // Clone the client at the start (shares underlying HTTP/2 connection) let mut client = self.blockdata_client.clone(); @@ -172,7 +174,7 @@ impl SegmentWorker { let chunk_end = (current_block + self.config.validation_batch_size as u64 - 1).min(self.segment_end); // Fetch headers only for this chunk - let headers = match Self::fetch_headers(current_block, chunk_end, &mut client, self.config.validation_batch_size as u32).await { + let headers = match Self::fetch_headers(current_block, chunk_end, &mut client, self.config.validation_batch_size as u32, &self.metrics).await { Ok(h) => { if h.is_empty() { warn!("Worker {} segment {}: Received EMPTY header response for blocks {}-{}", @@ -183,6 +185,7 @@ impl SegmentWorker { Err(e) => { error!("Worker {} segment {}: Failed to fetch headers for blocks {}-{}: {}", worker_id, segment_id, current_block, chunk_end, e); + self.metrics.active_workers_dec("blocks"); self.metrics.segment_attempt(false); yield Err(e); return; @@ -213,8 +216,14 @@ impl SegmentWorker { // Transition to transactions phase on first chunk only if first_chunk { + // Record blocks phase duration + let blocks_duration = blocks_phase_start.elapsed().as_secs_f64(); + self.metrics.segment_duration("blocks", blocks_duration); + + self.metrics.active_workers_dec("blocks"); self.metrics .set_worker_stage(worker_id, segment_id, WorkerStage::Transactions); + self.metrics.active_workers_inc("transactions"); first_chunk = false; } @@ -254,6 +263,7 @@ impl SegmentWorker { error!("Worker {} segment {}: Transaction stream error after {} batches: {}", worker_id, segment_id, batch_count, e); self.metrics.grpc_stream_dec("transactions"); + self.metrics.active_workers_dec("transactions"); yield Err(ErigonBridgeError::ErigonClient(Box::new(e))); return; } @@ -326,6 +336,7 @@ impl SegmentWorker { Ok(batch) => batch, Err(e) => { // Clean up metrics before error return + self.metrics.active_workers_dec("transactions"); self.metrics.segment_attempt(false); yield Err(e); return; @@ -354,6 +365,7 @@ impl SegmentWorker { segment_id, total_blocks ); + self.metrics.active_workers_dec("transactions"); self.metrics .set_worker_stage(worker_id, segment_id, WorkerStage::Idle); self.metrics.segment_attempt(false); @@ -375,6 +387,7 @@ impl SegmentWorker { self.metrics .segment_duration("transactions", duration); + self.metrics.active_workers_dec("transactions"); self.metrics .set_worker_stage(worker_id, segment_id, WorkerStage::Idle); self.metrics.segment_attempt(true); @@ -386,16 +399,27 @@ impl SegmentWorker { self, ) -> impl futures::Stream> + Send { async_stream::stream! { + let worker_id = self.worker_id; + let segment_id = self.segment_start / 500_000; // Calculate segment ID let total_blocks = self.segment_end - self.segment_start + 1; + let phase_start = Instant::now(); let mut yielded_batches = 0u64; + let mut total_receipts_processed = 0u64; info!( - "Segment worker processing logs for blocks {} to {} ({} blocks)", + "Worker {} segment {} processing logs for blocks {} to {} ({} blocks)", + worker_id, + segment_id, self.segment_start, self.segment_end, total_blocks ); + // Set initial worker stage + self.metrics + .set_worker_stage(worker_id, segment_id, WorkerStage::Logs); + self.metrics.active_workers_inc("logs"); + // Clone the client at the start (shares underlying HTTP/2 connection) let mut client = self.blockdata_client.clone(); @@ -406,7 +430,7 @@ impl SegmentWorker { // Fetch a chunk of headers (batch_size blocks at a time) let chunk_end = (current_block + self.config.validation_batch_size as u64 - 1).min(self.segment_end); - let chunk_headers = match Self::fetch_headers(current_block, chunk_end, &mut client, self.config.validation_batch_size as u32).await { + let chunk_headers = match Self::fetch_headers(current_block, chunk_end, &mut client, self.config.validation_batch_size as u32, &self.metrics).await { Ok(h) => { if h.is_empty() { warn!("Process logs: Received EMPTY header response for blocks {}-{}", @@ -415,8 +439,9 @@ impl SegmentWorker { h }, Err(e) => { - error!("Process logs: Failed to fetch headers for blocks {}-{}: {}", - current_block, chunk_end, e); + error!("Worker {} segment {}: Failed to fetch headers for blocks {}-{}: {}", + worker_id, segment_id, current_block, chunk_end, e); + self.metrics.active_workers_dec("logs"); yield Err(e); return; } @@ -446,6 +471,7 @@ impl SegmentWorker { .map(|block_chunk| { let mut client_clone = client.clone(); let block_chunk: Vec<_> = block_chunk.to_vec(); + let metrics = self.metrics.clone(); async move { let first_block = block_chunk[0].0; let last_block = block_chunk[block_chunk.len() - 1].0; @@ -457,7 +483,7 @@ impl SegmentWorker { last_block ); - Self::collect_receipts_for_range(first_block, last_block, block_chunk, &mut client_clone).await + Self::collect_receipts_for_range(first_block, last_block, block_chunk, &mut client_clone, &metrics).await } }) .collect(); @@ -471,9 +497,14 @@ impl SegmentWorker { match result { Ok(batch_data) => { // Each result is a Vec of (block_num, receipts, header) + // Count receipts for metrics + for (_, receipts, _) in &batch_data { + total_receipts_processed += receipts.len() as u64; + } current_batch.extend(batch_data); } Err(e) => { + self.metrics.active_workers_dec("logs"); yield Err(e); return; } @@ -499,6 +530,7 @@ impl SegmentWorker { .await { Ok(b) => b, Err(e) => { + self.metrics.active_workers_dec("logs"); yield Err(e); return; } @@ -512,19 +544,30 @@ impl SegmentWorker { } } + // Update progress after processing chunk + let blocks_processed = chunk_end - self.segment_start + 1; + let progress_pct = (blocks_processed as f64 / total_blocks as f64) * 100.0; + self.metrics + .set_worker_progress(worker_id, segment_id, "logs", progress_pct); + // Move to next chunk current_block = chunk_end + 1; } + // Record total receipts processed + self.metrics + .items_processed_inc(worker_id, segment_id, "receipts", total_receipts_processed); + // Detect empty streams and ERROR OUT if yielded_batches == 0 { error!( - "Segment {}-{}: EMPTY STREAM - Processed {} blocks but yielded ZERO RecordBatches! \ + "Worker {} segment {}: EMPTY STREAM - Processed {} blocks but yielded ZERO RecordBatches! \ This indicates the client disconnected or dropped the stream before receiving data.", - self.segment_start, - self.segment_end, + worker_id, + segment_id, total_blocks ); + self.metrics.active_workers_dec("logs"); yield Err(ErigonBridgeError::StreamProtocol(StreamError::ZeroBatchesConsumed { start: self.segment_start, end: self.segment_end, @@ -533,12 +576,22 @@ impl SegmentWorker { return; } + // Record phase duration and metrics + let duration = phase_start.elapsed().as_secs_f64(); info!( - "Segment {}-{}: Completed log processing, yielded {} RecordBatches", - self.segment_start, - self.segment_end, - yielded_batches + "Worker {} segment {}: Completed log processing, yielded {} RecordBatches, processed {} receipts in {:.2}s", + worker_id, + segment_id, + yielded_batches, + total_receipts_processed, + duration ); + + self.metrics + .segment_duration("logs", duration); + self.metrics.active_workers_dec("logs"); + self.metrics + .set_worker_stage(worker_id, segment_id, WorkerStage::Idle); } } @@ -548,6 +601,7 @@ impl SegmentWorker { segment_end: u64, client: &mut BlockDataClient, batch_size: u32, + metrics: &BridgeMetrics, ) -> Result, ErigonBridgeError> { let mut block_stream = client .stream_blocks(segment_start, segment_end, batch_size) @@ -559,6 +613,10 @@ impl SegmentWorker { ); e })?; + + // Track active gRPC stream + metrics.grpc_stream_inc("blocks"); + let mut headers = HashMap::new(); let mut batch_count = 0u64; @@ -595,11 +653,15 @@ impl SegmentWorker { "Blocks {}-{}: Header stream error after {} batches: {}", segment_start, segment_end, batch_count, e ); + metrics.grpc_stream_dec("blocks"); return Err(ErigonBridgeError::ErigonClient(Box::new(e))); } } } + // Stream completed successfully + metrics.grpc_stream_dec("blocks"); + if headers.is_empty() { warn!( "Blocks {}-{}: Header stream completed with ZERO headers after {} batches", @@ -690,6 +752,7 @@ impl SegmentWorker { to_block: u64, block_headers: Vec<(u64, (Header, i64))>, client: &mut BlockDataClient, + metrics: &BridgeMetrics, ) -> Result, Header)>, ErigonBridgeError> { let call_start = std::time::Instant::now(); @@ -729,6 +792,9 @@ impl SegmentWorker { // Cancel the monitor task since the call succeeded monitor_handle.abort(); + // Track active gRPC stream + metrics.grpc_stream_inc("receipts"); + // Collect all receipts by block number let mut receipts_by_block: HashMap> = HashMap::new(); @@ -753,11 +819,15 @@ impl SegmentWorker { "Blocks {}-{}: Receipt stream error after {} batches: {}", from_block, to_block, batch_count, e ); + metrics.grpc_stream_dec("receipts"); return Err(ErigonBridgeError::ErigonClient(Box::new(e))); } } } + // Stream completed successfully + metrics.grpc_stream_dec("receipts"); + let elapsed = call_start.elapsed(); if elapsed > std::time::Duration::from_secs(3) { warn!( From 751ce8ffcad919ef8684c1cc0762562c564b2022 Mon Sep 17 00:00:00 2001 From: Daniel Werner Date: Tue, 4 Nov 2025 12:01:22 -0800 Subject: [PATCH 8/8] Add check-quality.sh and .claude/ to gitignore - check-quality.sh is a local development script - .claude/ contains local claude configuration --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 1bd557f..30e6548 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,5 @@ commit-changes.sh cargo-build.log heaptrack.*.zst context-snapshot.yaml +check-quality.sh +.claude/