Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ thiserror.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true
vise.workspace = true

[dev-dependencies]
anyhow.workspace = true
Expand Down
22 changes: 22 additions & 0 deletions crates/consensus/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::{
Mutex, PoisonError,
atomic::{AtomicBool, Ordering},
},
time::Instant,
};

use prost_types::Any;
Expand Down Expand Up @@ -105,6 +106,10 @@ pub struct InstanceIo<T> {
/// Publishes the runner completion result.
pub(crate) err_tx: mpsc::Sender<RunnerResult>,
err_rx: ReceiverSlot<RunnerResult>,

/// Publishes the local decision timestamp.
pub(crate) decided_at_tx: mpsc::Sender<Instant>,
decided_at_rx: ReceiverSlot<Instant>,
}

impl<T> InstanceIo<T> {
Expand All @@ -115,6 +120,7 @@ impl<T> InstanceIo<T> {
let (value_tx, value_rx) = mpsc::channel(1);
let (verify_tx, verify_rx) = mpsc::channel(1);
let (err_tx, err_rx) = mpsc::channel(1);
let (decided_at_tx, decided_at_rx) = mpsc::channel(1);

Self {
participated: AtomicBool::new(false),
Expand All @@ -130,6 +136,8 @@ impl<T> InstanceIo<T> {
verify_rx: Mutex::new(Some(verify_rx)),
err_tx,
err_rx: Mutex::new(Some(err_rx)),
decided_at_tx,
decided_at_rx: Mutex::new(Some(decided_at_rx)),
}
}

Expand Down Expand Up @@ -191,6 +199,11 @@ impl<T> InstanceIo<T> {
pub fn take_err_rx(&self) -> Result<mpsc::Receiver<RunnerResult>> {
take_receiver(&self.err_rx, "err")
}

/// Transfers decided timestamp ownership to the proposer.
pub fn take_decided_at_rx(&self) -> Result<mpsc::Receiver<Instant>> {
take_receiver(&self.decided_at_rx, "decided_at")
}
}

impl<T> Default for InstanceIo<T> {
Expand Down Expand Up @@ -290,6 +303,12 @@ mod tests {
io.err_tx.try_send(Err(Box::new(TestError))),
Err(TrySendError::Full(Err(_)))
));

assert!(io.decided_at_tx.try_send(Instant::now()).is_ok());
assert!(matches!(
io.decided_at_tx.try_send(Instant::now()),
Err(TrySendError::Full(_))
));
}

#[test]
Expand Down Expand Up @@ -319,6 +338,9 @@ mod tests {

assert!(io.take_err_rx().is_ok());
assert_receiver_already_taken(io.take_err_rx(), "err");

assert!(io.take_decided_at_rx().is_ok());
assert_receiver_already_taken(io.take_decided_at_rx(), "decided_at");
}

#[test]
Expand Down
2 changes: 2 additions & 0 deletions crates/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ pub mod protocols;

/// Consensus instance I/O channels.
pub mod instance;
/// Consensus metrics.
pub mod metrics;
/// QBFT consensus wrapper.
pub mod qbft;

Expand Down
208 changes: 208 additions & 0 deletions crates/consensus/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
//! Prometheus metrics for consensus.

use pluto_core::types::Duty;
use vise::{Counter, Gauge, Histogram, LabeledFamily, Metrics};

use crate::{protocols::QBFT_V2_PROTOCOL_ID, timer::TimerType};

/// Histogram buckets for consensus duration metrics.
pub const CONSENSUS_DURATION_BUCKETS: [f64; 17] = [
0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 0.75, 1.0, 1.25, 1.5, 1.75, 2.0, 2.25, 2.5, 2.75, 3.0, 5.0,
];

type ProtocolDutyTimerLabels = (String, String, String);
type ProtocolDutyLabels = (String, String);

/// Metrics for consensus protocols.
#[derive(Debug, Metrics)]
#[metrics(prefix = "core_consensus")]
pub struct ConsensusMetrics {
/// Number of decided rounds by protocol, duty, and timer.
#[metrics(labels = ["protocol", "duty", "timer"])]
pub decided_rounds: LabeledFamily<ProtocolDutyTimerLabels, Gauge<i64>, 3>,

/// Index of the decided leader by protocol and duty.
#[metrics(labels = ["protocol", "duty"])]
pub decided_leader_index: LabeledFamily<ProtocolDutyLabels, Gauge<i64>, 2>,

/// Duration of the consensus process by protocol, duty, and timer.
#[metrics(buckets = &CONSENSUS_DURATION_BUCKETS, labels = ["protocol", "duty", "timer"])]
pub duration_seconds: LabeledFamily<ProtocolDutyTimerLabels, Histogram, 3>,

/// Total count of consensus timeouts by protocol, duty, and timer.
#[metrics(labels = ["protocol", "duty", "timer"])]
pub timeout_total: LabeledFamily<ProtocolDutyTimerLabels, Counter, 3>,

/// Total count of consensus errors by protocol.
#[metrics(labels = ["protocol"])]
pub error_total: LabeledFamily<String, Counter>,
}

impl ConsensusMetrics {
/// Sets the number of decided rounds for a duty and timer.
pub fn set_decided_rounds(&self, protocol: &str, duty: &str, timer: &str, rounds: i64) {
self.decided_rounds[&labels(protocol, duty, timer)].set(rounds);
}

/// Sets the decided leader index for a duty.
pub fn set_decided_leader_index(&self, protocol: &str, duty: &str, leader_index: i64) {
self.decided_leader_index[&(protocol.to_owned(), duty.to_owned())].set(leader_index);
}

/// Observes the consensus duration for a duty and timer.
pub fn observe_consensus_duration(
&self,
protocol: &str,
duty: &str,
timer: &str,
duration_seconds: f64,
) {
self.duration_seconds[&labels(protocol, duty, timer)].observe(duration_seconds);
}

/// Increments the consensus timeout counter for a duty and timer.
pub fn inc_consensus_timeout(&self, protocol: &str, duty: &str, timer: &str) {
self.timeout_total[&labels(protocol, duty, timer)].inc();
}

/// Increments the consensus error counter.
pub fn inc_consensus_error(&self, protocol: &str) {
self.error_total[protocol].inc();
}
}

/// Global metrics for consensus.
#[vise::register]
pub static CONSENSUS_METRICS: vise::Global<ConsensusMetrics> = vise::Global::new();

/// Records the metrics emitted when QBFT decides a duty.
pub(crate) fn record_qbft_decision(
duty: &Duty,
timer_type: TimerType,
round: i64,
leader_index: i64,
) {
let duty = duty.duty_type.to_string();
let timer = timer_type.as_str();

CONSENSUS_METRICS.set_decided_leader_index(QBFT_V2_PROTOCOL_ID, &duty, leader_index);
CONSENSUS_METRICS.set_decided_rounds(QBFT_V2_PROTOCOL_ID, &duty, timer, round);
}

/// Records QBFT consensus duration after a local proposal decides.
pub(crate) fn observe_qbft_consensus_duration(
duty: &Duty,
timer_type: TimerType,
duration_seconds: f64,
) {
let duty = duty.duty_type.to_string();
let timer = timer_type.as_str();
CONSENSUS_METRICS.observe_consensus_duration(
QBFT_V2_PROTOCOL_ID,
&duty,
timer,
duration_seconds,
);
}

/// Records a QBFT consensus timeout.
pub(crate) fn inc_qbft_consensus_timeout(duty: &Duty, timer_type: TimerType) {
let duty = duty.duty_type.to_string();
CONSENSUS_METRICS.inc_consensus_timeout(QBFT_V2_PROTOCOL_ID, &duty, timer_type.as_str());
}

/// Records a QBFT core consensus error.
pub(crate) fn inc_qbft_consensus_error() {
CONSENSUS_METRICS.inc_consensus_error(QBFT_V2_PROTOCOL_ID);
}

fn labels(protocol: &str, duty: &str, timer: &str) -> ProtocolDutyTimerLabels {
(protocol.to_owned(), duty.to_owned(), timer.to_owned())
}

#[cfg(test)]
mod tests {
use vise::{Format, Registry};

use super::*;

#[test]
fn decided_rounds_records_metric_name_labels_and_value() {
let metrics = ConsensusMetrics::default();
metrics.set_decided_rounds("test", "duty", "timer", 1);

let output = encode(&metrics);

assert!(output.contains(
r#"core_consensus_decided_rounds{protocol="test",duty="duty",timer="timer"} 1"#
));
}

#[test]
fn decided_leader_index_records_metric_name_labels_and_value() {
let metrics = ConsensusMetrics::default();
metrics.set_decided_leader_index("test", "duty", 123);

let output = encode(&metrics);

assert!(
output.contains(
r#"core_consensus_decided_leader_index{protocol="test",duty="duty"} 123"#
)
);
}

#[test]
fn duration_records_metric_name_labels_and_exact_buckets() {
let metrics = ConsensusMetrics::default();
metrics.observe_consensus_duration("test", "duty", "timer", 1.0);

let output = encode(&metrics);

assert!(output.contains(
r#"core_consensus_duration_seconds_count{protocol="test",duty="duty",timer="timer"} 1"#
));
for bucket in [
"0.01", "0.025", "0.05", "0.1", "0.25", "0.5", "0.75", "1.0", "1.25", "1.5", "1.75",
"2.0", "2.25", "2.5", "2.75", "3.0", "5.0",
] {
assert!(
output.contains(&format!(
r#"core_consensus_duration_seconds_bucket{{le="{bucket}",protocol="test",duty="duty",timer="timer"}}"#
)),
"missing bucket {bucket}: {output}"
);
}
}

#[test]
fn timeout_records_metric_name_labels_and_value() {
let metrics = ConsensusMetrics::default();
metrics.inc_consensus_timeout("test", "duty", "timer");

let output = encode(&metrics);

assert!(output.contains(
r#"core_consensus_timeout_total{protocol="test",duty="duty",timer="timer"} 1"#
));
}

#[test]
fn error_records_metric_name_labels_and_value() {
let metrics = ConsensusMetrics::default();
metrics.inc_consensus_error("test");

let output = encode(&metrics);

assert!(output.contains(r#"core_consensus_error_total{protocol="test"} 1"#));
}

fn encode(metrics: &ConsensusMetrics) -> String {
let mut registry = Registry::empty();
registry.register_metrics(metrics);

let mut output = String::new();
registry.encode(&mut output, Format::Prometheus).unwrap();
output
}
}
Loading
Loading