diff --git a/Cargo.lock b/Cargo.lock index cb071c13..9b3379d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5615,6 +5615,7 @@ dependencies = [ "tokio", "tokio-util", "tracing", + "vise", ] [[package]] diff --git a/crates/consensus/Cargo.toml b/crates/consensus/Cargo.toml index a6eb61a2..d2fc4b0e 100644 --- a/crates/consensus/Cargo.toml +++ b/crates/consensus/Cargo.toml @@ -26,6 +26,7 @@ thiserror.workspace = true tokio.workspace = true tokio-util.workspace = true tracing.workspace = true +vise.workspace = true [dev-dependencies] anyhow.workspace = true diff --git a/crates/consensus/src/instance.rs b/crates/consensus/src/instance.rs index f5f3a9f1..04ecb8a5 100644 --- a/crates/consensus/src/instance.rs +++ b/crates/consensus/src/instance.rs @@ -29,6 +29,7 @@ use std::{ Mutex, PoisonError, atomic::{AtomicBool, Ordering}, }, + time::Instant, }; use prost_types::Any; @@ -105,6 +106,10 @@ pub struct InstanceIo { /// Publishes the runner completion result. pub(crate) err_tx: mpsc::Sender, err_rx: ReceiverSlot, + + /// Publishes the local decision timestamp. + pub(crate) decided_at_tx: mpsc::Sender, + decided_at_rx: ReceiverSlot, } impl InstanceIo { @@ -115,6 +120,7 @@ impl InstanceIo { let (value_tx, value_rx) = mpsc::channel(1); let (verify_tx, verify_rx) = mpsc::channel(1); let (err_tx, err_rx) = mpsc::channel(1); + let (decided_at_tx, decided_at_rx) = mpsc::channel(1); Self { participated: AtomicBool::new(false), @@ -130,6 +136,8 @@ impl InstanceIo { verify_rx: Mutex::new(Some(verify_rx)), err_tx, err_rx: Mutex::new(Some(err_rx)), + decided_at_tx, + decided_at_rx: Mutex::new(Some(decided_at_rx)), } } @@ -191,6 +199,11 @@ impl InstanceIo { pub fn take_err_rx(&self) -> Result> { take_receiver(&self.err_rx, "err") } + + /// Transfers decided timestamp ownership to the proposer. + pub fn take_decided_at_rx(&self) -> Result> { + take_receiver(&self.decided_at_rx, "decided_at") + } } impl Default for InstanceIo { @@ -290,6 +303,12 @@ mod tests { io.err_tx.try_send(Err(Box::new(TestError))), Err(TrySendError::Full(Err(_))) )); + + assert!(io.decided_at_tx.try_send(Instant::now()).is_ok()); + assert!(matches!( + io.decided_at_tx.try_send(Instant::now()), + Err(TrySendError::Full(_)) + )); } #[test] @@ -319,6 +338,9 @@ mod tests { assert!(io.take_err_rx().is_ok()); assert_receiver_already_taken(io.take_err_rx(), "err"); + + assert!(io.take_decided_at_rx().is_ok()); + assert_receiver_already_taken(io.take_decided_at_rx(), "decided_at"); } #[test] diff --git a/crates/consensus/src/lib.rs b/crates/consensus/src/lib.rs index 75b5a06e..491f076a 100644 --- a/crates/consensus/src/lib.rs +++ b/crates/consensus/src/lib.rs @@ -9,6 +9,8 @@ pub mod protocols; /// Consensus instance I/O channels. pub mod instance; +/// Consensus metrics. +pub mod metrics; /// QBFT consensus wrapper. pub mod qbft; diff --git a/crates/consensus/src/metrics.rs b/crates/consensus/src/metrics.rs new file mode 100644 index 00000000..c97789bd --- /dev/null +++ b/crates/consensus/src/metrics.rs @@ -0,0 +1,208 @@ +//! Prometheus metrics for consensus. + +use pluto_core::types::Duty; +use vise::{Counter, Gauge, Histogram, LabeledFamily, Metrics}; + +use crate::{protocols::QBFT_V2_PROTOCOL_ID, timer::TimerType}; + +/// Histogram buckets for consensus duration metrics. +pub const CONSENSUS_DURATION_BUCKETS: [f64; 17] = [ + 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 0.75, 1.0, 1.25, 1.5, 1.75, 2.0, 2.25, 2.5, 2.75, 3.0, 5.0, +]; + +type ProtocolDutyTimerLabels = (String, String, String); +type ProtocolDutyLabels = (String, String); + +/// Metrics for consensus protocols. +#[derive(Debug, Metrics)] +#[metrics(prefix = "core_consensus")] +pub struct ConsensusMetrics { + /// Number of decided rounds by protocol, duty, and timer. + #[metrics(labels = ["protocol", "duty", "timer"])] + pub decided_rounds: LabeledFamily, 3>, + + /// Index of the decided leader by protocol and duty. + #[metrics(labels = ["protocol", "duty"])] + pub decided_leader_index: LabeledFamily, 2>, + + /// Duration of the consensus process by protocol, duty, and timer. + #[metrics(buckets = &CONSENSUS_DURATION_BUCKETS, labels = ["protocol", "duty", "timer"])] + pub duration_seconds: LabeledFamily, + + /// Total count of consensus timeouts by protocol, duty, and timer. + #[metrics(labels = ["protocol", "duty", "timer"])] + pub timeout_total: LabeledFamily, + + /// Total count of consensus errors by protocol. + #[metrics(labels = ["protocol"])] + pub error_total: LabeledFamily, +} + +impl ConsensusMetrics { + /// Sets the number of decided rounds for a duty and timer. + pub fn set_decided_rounds(&self, protocol: &str, duty: &str, timer: &str, rounds: i64) { + self.decided_rounds[&labels(protocol, duty, timer)].set(rounds); + } + + /// Sets the decided leader index for a duty. + pub fn set_decided_leader_index(&self, protocol: &str, duty: &str, leader_index: i64) { + self.decided_leader_index[&(protocol.to_owned(), duty.to_owned())].set(leader_index); + } + + /// Observes the consensus duration for a duty and timer. + pub fn observe_consensus_duration( + &self, + protocol: &str, + duty: &str, + timer: &str, + duration_seconds: f64, + ) { + self.duration_seconds[&labels(protocol, duty, timer)].observe(duration_seconds); + } + + /// Increments the consensus timeout counter for a duty and timer. + pub fn inc_consensus_timeout(&self, protocol: &str, duty: &str, timer: &str) { + self.timeout_total[&labels(protocol, duty, timer)].inc(); + } + + /// Increments the consensus error counter. + pub fn inc_consensus_error(&self, protocol: &str) { + self.error_total[protocol].inc(); + } +} + +/// Global metrics for consensus. +#[vise::register] +pub static CONSENSUS_METRICS: vise::Global = vise::Global::new(); + +/// Records the metrics emitted when QBFT decides a duty. +pub(crate) fn record_qbft_decision( + duty: &Duty, + timer_type: TimerType, + round: i64, + leader_index: i64, +) { + let duty = duty.duty_type.to_string(); + let timer = timer_type.as_str(); + + CONSENSUS_METRICS.set_decided_leader_index(QBFT_V2_PROTOCOL_ID, &duty, leader_index); + CONSENSUS_METRICS.set_decided_rounds(QBFT_V2_PROTOCOL_ID, &duty, timer, round); +} + +/// Records QBFT consensus duration after a local proposal decides. +pub(crate) fn observe_qbft_consensus_duration( + duty: &Duty, + timer_type: TimerType, + duration_seconds: f64, +) { + let duty = duty.duty_type.to_string(); + let timer = timer_type.as_str(); + CONSENSUS_METRICS.observe_consensus_duration( + QBFT_V2_PROTOCOL_ID, + &duty, + timer, + duration_seconds, + ); +} + +/// Records a QBFT consensus timeout. +pub(crate) fn inc_qbft_consensus_timeout(duty: &Duty, timer_type: TimerType) { + let duty = duty.duty_type.to_string(); + CONSENSUS_METRICS.inc_consensus_timeout(QBFT_V2_PROTOCOL_ID, &duty, timer_type.as_str()); +} + +/// Records a QBFT core consensus error. +pub(crate) fn inc_qbft_consensus_error() { + CONSENSUS_METRICS.inc_consensus_error(QBFT_V2_PROTOCOL_ID); +} + +fn labels(protocol: &str, duty: &str, timer: &str) -> ProtocolDutyTimerLabels { + (protocol.to_owned(), duty.to_owned(), timer.to_owned()) +} + +#[cfg(test)] +mod tests { + use vise::{Format, Registry}; + + use super::*; + + #[test] + fn decided_rounds_records_metric_name_labels_and_value() { + let metrics = ConsensusMetrics::default(); + metrics.set_decided_rounds("test", "duty", "timer", 1); + + let output = encode(&metrics); + + assert!(output.contains( + r#"core_consensus_decided_rounds{protocol="test",duty="duty",timer="timer"} 1"# + )); + } + + #[test] + fn decided_leader_index_records_metric_name_labels_and_value() { + let metrics = ConsensusMetrics::default(); + metrics.set_decided_leader_index("test", "duty", 123); + + let output = encode(&metrics); + + assert!( + output.contains( + r#"core_consensus_decided_leader_index{protocol="test",duty="duty"} 123"# + ) + ); + } + + #[test] + fn duration_records_metric_name_labels_and_exact_buckets() { + let metrics = ConsensusMetrics::default(); + metrics.observe_consensus_duration("test", "duty", "timer", 1.0); + + let output = encode(&metrics); + + assert!(output.contains( + r#"core_consensus_duration_seconds_count{protocol="test",duty="duty",timer="timer"} 1"# + )); + for bucket in [ + "0.01", "0.025", "0.05", "0.1", "0.25", "0.5", "0.75", "1.0", "1.25", "1.5", "1.75", + "2.0", "2.25", "2.5", "2.75", "3.0", "5.0", + ] { + assert!( + output.contains(&format!( + r#"core_consensus_duration_seconds_bucket{{le="{bucket}",protocol="test",duty="duty",timer="timer"}}"# + )), + "missing bucket {bucket}: {output}" + ); + } + } + + #[test] + fn timeout_records_metric_name_labels_and_value() { + let metrics = ConsensusMetrics::default(); + metrics.inc_consensus_timeout("test", "duty", "timer"); + + let output = encode(&metrics); + + assert!(output.contains( + r#"core_consensus_timeout_total{protocol="test",duty="duty",timer="timer"} 1"# + )); + } + + #[test] + fn error_records_metric_name_labels_and_value() { + let metrics = ConsensusMetrics::default(); + metrics.inc_consensus_error("test"); + + let output = encode(&metrics); + + assert!(output.contains(r#"core_consensus_error_total{protocol="test"} 1"#)); + } + + fn encode(metrics: &ConsensusMetrics) -> String { + let mut registry = Registry::empty(); + registry.register_metrics(metrics); + + let mut output = String::new(); + registry.encode(&mut output, Format::Prometheus).unwrap(); + output + } +} diff --git a/crates/consensus/src/qbft/runner.rs b/crates/consensus/src/qbft/runner.rs index 48252b07..f59462d1 100644 --- a/crates/consensus/src/qbft/runner.rs +++ b/crates/consensus/src/qbft/runner.rs @@ -1,8 +1,11 @@ //! QBFT consensus runner bridge. -use std::sync::{ - Arc, Mutex, PoisonError, - atomic::{AtomicBool, Ordering}, +use std::{ + sync::{ + Arc, Mutex, PoisonError, + atomic::{AtomicBool, Ordering}, + }, + time::Instant, }; use cancellation::CancellationTokenSource; @@ -16,7 +19,11 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; -use crate::instance::{self, InstanceIo, RunnerError, RunnerResult}; +use crate::{ + instance::{self, InstanceIo, RunnerError, RunnerResult}, + metrics, + timer::TimerType, +}; use pluto_core::{ corepb::v1::{core as pbcore, priority as pbpriority}, deadline::AddOutcome, @@ -143,11 +150,19 @@ where return Err(Error::InputChannelFull); } - if !inst.maybe_start() { - return wait_instance_result(&inst).await; - } + let proposed_at = Instant::now(); + let mut decided_at_rx = inst.take_decided_at_rx()?; + let timer_type = consensus.round_timer(duty.clone()).timer_type(); - run_instance(consensus, ct, duty, inst).await + let result = if !inst.maybe_start() { + wait_instance_result(&inst).await + } else { + run_instance(consensus, ct, duty.clone(), inst).await + }; + + observe_qbft_consensus_duration(&mut decided_at_rx, &duty, timer_type, proposed_at); + + result } /// Starts participating in a duty without a local proposal value. @@ -237,6 +252,8 @@ async fn run_instance_inner( inner_recv_tx, Sniffer::new(i64::try_from(nodes).expect("node count fits i64"), peer_idx), )); + let round_timer = consensus.round_timer(duty.clone()); + let timer_type = round_timer.timer_type(); let mut tasks = JoinSet::new(); tasks.spawn(bridge_mpsc_to_crossbeam( @@ -279,9 +296,22 @@ async fn run_instance_inner( let decide_callback: DecideCallback = { let decided = Arc::clone(&decided); + let duty = duty.clone(); let instance_ct = instance_ct.clone(); let core_cts = Arc::clone(&core_cts); - Arc::new(move |_qcommit| { + let decided_at_tx = inst.decided_at_tx.clone(); + let nodes = i64::try_from(nodes).expect("node count fits i64"); + Arc::new(move |qcommit| { + if let Some(commit) = qcommit.first() { + let round = commit.round(); + let _ = decided_at_tx.try_send(Instant::now()); + metrics::record_qbft_decision( + &duty, + timer_type, + round, + definition::leader(&duty, round, nodes), + ); + } decided.store(true, Ordering::Relaxed); instance_ct.cancel(); core_cts.cancel(); @@ -291,7 +321,7 @@ async fn run_instance_inner( let def = definition::new_definition(DefinitionConfig { nodes, subscribers: consensus.subscribers(), - round_timer: consensus.round_timer(duty.clone()), + round_timer, decide_callback, compare_attestations: consensus.compare_attestations(), runtime: runtime.clone(), @@ -332,12 +362,13 @@ async fn run_instance_inner( }; let core_ct_for_run = core_ct.clone(); + let core_duty = duty.clone(); let core_result = tokio::task::spawn_blocking(move || { qbft::run( &core_ct_for_run, &def, &core_transport, - &duty, + &core_duty, peer_idx, core_hash_rx, core_verify_rx, @@ -360,6 +391,7 @@ async fn run_instance_inner( .unwrap_or_else(PoisonError::into_inner) .take() { + metrics::inc_qbft_consensus_error(); return Err(Error::Transport(err)); } @@ -368,11 +400,18 @@ async fn run_instance_inner( match core_result { Ok(()) => Ok(()), Err(qbft::QbftError::ContextCanceled) if decided.load(Ordering::Relaxed) => Ok(()), - Err(qbft::QbftError::ContextCanceled) => Err(Error::ConsensusTimeout), + Err(qbft::QbftError::ContextCanceled) => { + metrics::inc_qbft_consensus_timeout(&duty, timer_type); + Err(Error::ConsensusTimeout) + } Err(qbft::QbftError::ChannelError(_)) if canceled_before_teardown => { + metrics::inc_qbft_consensus_timeout(&duty, timer_type); Err(Error::ConsensusTimeout) } - Err(err) => Err(Error::Core(err)), + Err(err) => { + metrics::inc_qbft_consensus_error(); + Err(Error::Core(err)) + } } } @@ -407,6 +446,29 @@ async fn wait_instance_result(inst: &InstanceIo) -> Result<()> { } } +/// Observes duration only for callers that supplied a local proposal value. +fn observe_qbft_consensus_duration( + decided_at_rx: &mut mpsc::Receiver, + duty: &Duty, + timer_type: TimerType, + proposed_at: Instant, +) { + if let Ok(decided_at) = decided_at_rx.try_recv() { + metrics::observe_qbft_consensus_duration( + duty, + timer_type, + duration_seconds(decided_at, proposed_at), + ); + } +} + +fn duration_seconds(decided_at: Instant, proposed_at: Instant) -> f64 { + decided_at.checked_duration_since(proposed_at).map_or_else( + || -proposed_at.duration_since(decided_at).as_secs_f64(), + |duration| duration.as_secs_f64(), + ) +} + /// Bridges Tokio channels into the crossbeam channels expected by core QBFT. async fn bridge_mpsc_to_crossbeam( ct: CancellationToken, @@ -471,6 +533,16 @@ mod tests { use crate::qbft::component::{self, Config}; use pluto_core::{corepb::v1::core as pbcore, types::SlotNumber}; + #[test] + fn duration_seconds_preserves_signed_order() { + let proposed_at = Instant::now(); + let decided_after = proposed_at + Duration::from_millis(250); + let decided_before = proposed_at - Duration::from_millis(250); + + assert_eq!(duration_seconds(decided_after, proposed_at), 0.25); + assert_eq!(duration_seconds(decided_before, proposed_at), -0.25); + } + #[tokio::test] async fn propose_when_instance_already_running_fills_value_hash_and_verify_channels() { let consensus = Arc::new(