From 03d748fa6b468a95382cc493ec5f2c730553f717 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Tue, 14 Apr 2026 13:59:31 -0300 Subject: [PATCH 1/3] Add Devnet-4 metrics: block production, gossip sizes, sync status, and bucket update (leanMetrics PR #29) - Block production: building time, payload aggregation time, aggregated payload count, success/failure counters - Gossip message sizes: block, attestation, and aggregation compressed bytes - Sync status gauge with idle/syncing/synced labels (wiring deferred to PR #246) - Update committee_signatures_aggregation buckets from [0.005..1s] to [0.05..4s] --- crates/blockchain/src/lib.rs | 12 +++- crates/blockchain/src/metrics.rs | 90 ++++++++++++++++++++++++- crates/blockchain/src/store.rs | 4 ++ crates/net/p2p/src/gossipsub/handler.rs | 5 +- crates/net/p2p/src/metrics.rs | 48 +++++++++++++ 5 files changed, 155 insertions(+), 4 deletions(-) diff --git a/crates/blockchain/src/lib.rs b/crates/blockchain/src/lib.rs index 9f70a805..75e330f8 100644 --- a/crates/blockchain/src/lib.rs +++ b/crates/blockchain/src/lib.rs @@ -222,11 +222,16 @@ impl BlockChainServer { /// Build and publish a block for the given slot and validator. fn propose_block(&mut self, slot: u64, validator_id: u64) { info!(%slot, %validator_id, "We are the proposer for this slot"); + let _block_timing = metrics::time_block_building(); // Build the block with attestation signatures let Ok((block, attestation_signatures, post_checkpoints)) = - store::produce_block_with_signatures(&mut self.store, slot, validator_id) - .inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to build block")) + store::produce_block_with_signatures(&mut self.store, slot, validator_id).inspect_err( + |err| { + metrics::inc_block_building_failures(); + error!(%slot, %validator_id, %err, "Failed to build block"); + }, + ) else { return; }; @@ -278,10 +283,13 @@ impl BlockChainServer { // Process the block locally before publishing if let Err(err) = self.process_block(signed_block.clone()) { + metrics::inc_block_building_failures(); error!(%slot, %validator_id, %err, "Failed to process built block"); return; }; + metrics::inc_block_building_success(); + // Publish to gossip network if let Some(ref p2p) = self.p2p { let _ = p2p diff --git a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs index 20092cd7..c88013f7 100644 --- a/crates/blockchain/src/metrics.rs +++ b/crates/blockchain/src/metrics.rs @@ -261,11 +261,59 @@ static LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_histogram!( + "lean_block_aggregated_payloads", + "Number of aggregated_payloads in a block", + vec![1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0] + ) + .unwrap() + }); + +static LEAN_BLOCK_BUILDING_PAYLOAD_AGGREGATION_TIME_SECONDS: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_histogram!( + "lean_block_building_payload_aggregation_time_seconds", + "Time taken to build aggregated_payloads during block building", + vec![0.1, 0.25, 0.5, 0.75, 1.0, 2.0, 3.0, 4.0] + ) + .unwrap() + }); + +static LEAN_BLOCK_BUILDING_TIME_SECONDS: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_histogram!( + "lean_block_building_time_seconds", + "Time taken to build a block", + vec![0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 0.75, 1.0] + ) + .unwrap() + }); + +static LEAN_BLOCK_BUILDING_SUCCESS_TOTAL: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_int_counter!( + "lean_block_building_success_total", + "Successful block builds" + ) + .unwrap() + }); + +static LEAN_BLOCK_BUILDING_FAILURES_TOTAL: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_int_counter!("lean_block_building_failures_total", "Failed block builds").unwrap() + }); + +static LEAN_NODE_SYNC_STATUS: std::sync::LazyLock = std::sync::LazyLock::new(|| { + register_int_gauge_vec!("lean_node_sync_status", "Node sync status", &["status"]).unwrap() +}); + static LEAN_FORK_CHOICE_REORG_DEPTH: std::sync::LazyLock = std::sync::LazyLock::new(|| { register_histogram!( @@ -314,6 +362,12 @@ pub fn init() { std::sync::LazyLock::force(&LEAN_PQ_SIG_AGGREGATED_SIGNATURES_BUILDING_TIME_SECONDS); std::sync::LazyLock::force(&LEAN_PQ_SIG_AGGREGATED_SIGNATURES_VERIFICATION_TIME_SECONDS); std::sync::LazyLock::force(&LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS); + std::sync::LazyLock::force(&LEAN_BLOCK_AGGREGATED_PAYLOADS); + std::sync::LazyLock::force(&LEAN_BLOCK_BUILDING_PAYLOAD_AGGREGATION_TIME_SECONDS); + std::sync::LazyLock::force(&LEAN_BLOCK_BUILDING_TIME_SECONDS); + std::sync::LazyLock::force(&LEAN_BLOCK_BUILDING_SUCCESS_TOTAL); + std::sync::LazyLock::force(&LEAN_BLOCK_BUILDING_FAILURES_TOTAL); + std::sync::LazyLock::force(&LEAN_NODE_SYNC_STATUS); std::sync::LazyLock::force(&LEAN_FORK_CHOICE_REORG_DEPTH); } @@ -476,3 +530,37 @@ pub fn set_attestation_committee_count(count: u64) { pub fn observe_fork_choice_reorg_depth(depth: u64) { LEAN_FORK_CHOICE_REORG_DEPTH.observe(depth as f64); } + +/// Observe the number of aggregated payloads in a produced block. +pub fn observe_block_aggregated_payloads(count: usize) { + LEAN_BLOCK_AGGREGATED_PAYLOADS.observe(count as f64); +} + +/// Start timing payload aggregation during block building. Records duration when the guard is dropped. +pub fn time_block_building_payload_aggregation() -> TimingGuard { + TimingGuard::new(&LEAN_BLOCK_BUILDING_PAYLOAD_AGGREGATION_TIME_SECONDS) +} + +/// Start timing block building. Records duration when the guard is dropped. +pub fn time_block_building() -> TimingGuard { + TimingGuard::new(&LEAN_BLOCK_BUILDING_TIME_SECONDS) +} + +/// Increment the successful block builds counter. +pub fn inc_block_building_success() { + LEAN_BLOCK_BUILDING_SUCCESS_TOTAL.inc(); +} + +/// Increment the failed block builds counter. +pub fn inc_block_building_failures() { + LEAN_BLOCK_BUILDING_FAILURES_TOTAL.inc(); +} + +/// Set the node sync status. Sets the given status label to 1 and all others to 0. +pub fn set_sync_status(status: &str) { + for label in &["idle", "syncing", "synced"] { + LEAN_NODE_SYNC_STATUS + .with_label_values(&[label]) + .set(i64::from(*label == status)); + } +} diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index eb8c4fec..d44feed9 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -814,6 +814,7 @@ pub fn produce_block_with_signatures( let known_block_roots = store.get_block_roots(); + let _payload_timing = metrics::time_block_building_payload_aggregation(); let (block, signatures, post_checkpoints) = build_block( &head_state, slot, @@ -822,6 +823,9 @@ pub fn produce_block_with_signatures( &known_block_roots, &aggregated_payloads, )?; + drop(_payload_timing); + + metrics::observe_block_aggregated_payloads(signatures.len()); Ok((block, signatures, post_checkpoints)) } diff --git a/crates/net/p2p/src/gossipsub/handler.rs b/crates/net/p2p/src/gossipsub/handler.rs index 7ab52430..516e0ab3 100644 --- a/crates/net/p2p/src/gossipsub/handler.rs +++ b/crates/net/p2p/src/gossipsub/handler.rs @@ -15,7 +15,7 @@ use super::{ attestation_subnet_topic, }, }; -use crate::P2PServer; +use crate::{P2PServer, metrics}; pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { let Event::Message { @@ -29,6 +29,7 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { let topic_kind = message.topic.as_str().split("/").nth(3); match topic_kind { Some(BLOCK_TOPIC_KIND) => { + metrics::observe_gossip_block_size(message.data.len()); let Ok(uncompressed_data) = decompress_message(&message.data) .inspect_err(|err| error!(%err, "Failed to decompress gossipped block")) else { @@ -60,6 +61,7 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { } } Some(AGGREGATION_TOPIC_KIND) => { + metrics::observe_gossip_aggregation_size(message.data.len()); let Ok(uncompressed_data) = decompress_message(&message.data) .inspect_err(|err| error!(%err, "Failed to decompress gossipped aggregation")) else { @@ -89,6 +91,7 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { } } Some(kind) if kind.starts_with(ATTESTATION_SUBNET_TOPIC_PREFIX) => { + metrics::observe_gossip_attestation_size(message.data.len()); let Ok(uncompressed_data) = decompress_message(&message.data) .inspect_err(|err| error!(%err, "Failed to decompress gossipped attestation")) else { diff --git a/crates/net/p2p/src/metrics.rs b/crates/net/p2p/src/metrics.rs index 14059d1d..4254628f 100644 --- a/crates/net/p2p/src/metrics.rs +++ b/crates/net/p2p/src/metrics.rs @@ -95,6 +95,54 @@ pub fn notify_peer_connected(peer_id: &Option, direction: &str, result: } } +static LEAN_GOSSIP_BLOCK_SIZE_BYTES: LazyLock = LazyLock::new(|| { + ethlambda_metrics::register_histogram!( + "lean_gossip_block_size_bytes", + "Bytes size of a gossip block message", + vec![ + 10000.0, 50000.0, 100000.0, 250000.0, 500000.0, 1000000.0, 2000000.0, 5000000.0 + ] + ) + .unwrap() +}); + +static LEAN_GOSSIP_ATTESTATION_SIZE_BYTES: LazyLock = + LazyLock::new(|| { + ethlambda_metrics::register_histogram!( + "lean_gossip_attestation_size_bytes", + "Bytes size of a gossip attestation message", + vec![512.0, 1024.0, 2048.0, 4096.0, 8192.0, 16384.0] + ) + .unwrap() + }); + +static LEAN_GOSSIP_AGGREGATION_SIZE_BYTES: LazyLock = + LazyLock::new(|| { + ethlambda_metrics::register_histogram!( + "lean_gossip_aggregation_size_bytes", + "Bytes size of a gossip aggregated attestation message", + vec![ + 1024.0, 4096.0, 16384.0, 65536.0, 131072.0, 262144.0, 524288.0, 1048576.0 + ] + ) + .unwrap() + }); + +/// Observe the compressed size of a gossip block message. +pub fn observe_gossip_block_size(bytes: usize) { + LEAN_GOSSIP_BLOCK_SIZE_BYTES.observe(bytes as f64); +} + +/// Observe the compressed size of a gossip attestation message. +pub fn observe_gossip_attestation_size(bytes: usize) { + LEAN_GOSSIP_ATTESTATION_SIZE_BYTES.observe(bytes as f64); +} + +/// Observe the compressed size of a gossip aggregated attestation message. +pub fn observe_gossip_aggregation_size(bytes: usize) { + LEAN_GOSSIP_AGGREGATION_SIZE_BYTES.observe(bytes as f64); +} + /// Notify that a peer disconnected. /// /// Decrements the connected peer count and increments the disconnection event counter. From 9f30c7bb6080b5bdf3b3831339e1d42e61a0ad49 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Tue, 14 Apr 2026 15:54:19 -0300 Subject: [PATCH 2/3] Move payload aggregation timer inside build_block and remove unused set_sync_status Address PR #279 review comments: the payload aggregation timing guard now only starts when there are payloads to aggregate, and the unused set_sync_status function and LEAN_NODE_SYNC_STATUS metric are removed. --- crates/blockchain/src/metrics.rs | 13 ------------- crates/blockchain/src/store.rs | 4 ++-- 2 files changed, 2 insertions(+), 15 deletions(-) diff --git a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs index c88013f7..c56c3092 100644 --- a/crates/blockchain/src/metrics.rs +++ b/crates/blockchain/src/metrics.rs @@ -310,10 +310,6 @@ static LEAN_BLOCK_BUILDING_FAILURES_TOTAL: std::sync::LazyLock = register_int_counter!("lean_block_building_failures_total", "Failed block builds").unwrap() }); -static LEAN_NODE_SYNC_STATUS: std::sync::LazyLock = std::sync::LazyLock::new(|| { - register_int_gauge_vec!("lean_node_sync_status", "Node sync status", &["status"]).unwrap() -}); - static LEAN_FORK_CHOICE_REORG_DEPTH: std::sync::LazyLock = std::sync::LazyLock::new(|| { register_histogram!( @@ -367,7 +363,6 @@ pub fn init() { std::sync::LazyLock::force(&LEAN_BLOCK_BUILDING_TIME_SECONDS); std::sync::LazyLock::force(&LEAN_BLOCK_BUILDING_SUCCESS_TOTAL); std::sync::LazyLock::force(&LEAN_BLOCK_BUILDING_FAILURES_TOTAL); - std::sync::LazyLock::force(&LEAN_NODE_SYNC_STATUS); std::sync::LazyLock::force(&LEAN_FORK_CHOICE_REORG_DEPTH); } @@ -556,11 +551,3 @@ pub fn inc_block_building_failures() { LEAN_BLOCK_BUILDING_FAILURES_TOTAL.inc(); } -/// Set the node sync status. Sets the given status label to 1 and all others to 0. -pub fn set_sync_status(status: &str) { - for label in &["idle", "syncing", "synced"] { - LEAN_NODE_SYNC_STATUS - .with_label_values(&[label]) - .set(i64::from(*label == status)); - } -} diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index d44feed9..4f7add8c 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -814,7 +814,6 @@ pub fn produce_block_with_signatures( let known_block_roots = store.get_block_roots(); - let _payload_timing = metrics::time_block_building_payload_aggregation(); let (block, signatures, post_checkpoints) = build_block( &head_state, slot, @@ -823,7 +822,6 @@ pub fn produce_block_with_signatures( &known_block_roots, &aggregated_payloads, )?; - drop(_payload_timing); metrics::observe_block_aggregated_payloads(signatures.len()); @@ -1041,6 +1039,8 @@ fn build_block( let mut accumulated_proof_bytes: usize = 0; if !aggregated_payloads.is_empty() { + let _payload_timing = metrics::time_block_building_payload_aggregation(); + // Genesis edge case: when building on genesis (slot 0), // process_block_header will set latest_justified.root = parent_root. // Derive this upfront so attestation filtering matches. From 357688578d59e8676b3445660e383041e6fd8643 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Tue, 14 Apr 2026 15:59:32 -0300 Subject: [PATCH 3/3] fmt --- crates/blockchain/src/metrics.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs index c56c3092..71a886ee 100644 --- a/crates/blockchain/src/metrics.rs +++ b/crates/blockchain/src/metrics.rs @@ -550,4 +550,3 @@ pub fn inc_block_building_success() { pub fn inc_block_building_failures() { LEAN_BLOCK_BUILDING_FAILURES_TOTAL.inc(); } -