Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions apps/skit/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1890,9 +1890,7 @@ impl<S> InstrumentedOneshotStream<S> {
impl<S> Drop for InstrumentedOneshotStream<S> {
fn drop(&mut self) {
if !self.recorded {
// If the client disconnects early, the response body stream is dropped without EOF.
// Record as error so we still get visibility into partial/aborted oneshot executions.
self.record("error");
self.record("incomplete");
}
}
}
Expand Down Expand Up @@ -2060,6 +2058,9 @@ async fn process_oneshot_pipeline_handler(
.with_description(
"Oneshot pipeline runtime from request start until response stream ends",
)
.with_boundaries(
streamkit_core::metrics::HISTOGRAM_BOUNDARIES_PIPELINE_DURATION.to_vec(),
)
.build()
})
.clone();
Expand Down Expand Up @@ -2262,7 +2263,12 @@ async fn metrics_middleware(req: axum::http::Request<Body>, next: Next) -> Respo
let meter = global::meter("skit_server");
(
meter.u64_counter("http.server.requests").build(),
meter.f64_histogram("http.server.duration").build(),
meter
.f64_histogram("http.server.duration")
.with_boundaries(
streamkit_core::metrics::HISTOGRAM_BOUNDARIES_HTTP_DURATION.to_vec(),
)
.build(),
)
})
.clone();
Expand Down
3 changes: 3 additions & 0 deletions apps/skit/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,9 @@ impl Default for SessionManager {
.f64_histogram("session.duration")
.with_description("Session lifetime duration in seconds")
.with_unit("s")
.with_boundaries(
streamkit_core::metrics::HISTOGRAM_BOUNDARIES_SESSION_DURATION.to_vec(),
)
.build(),
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub mod control;
pub mod error;
pub mod frame_pool;
pub mod helpers;
pub mod metrics;
pub mod moq_gateway;
pub mod node;
pub mod node_config;
Expand Down
58 changes: 58 additions & 0 deletions crates/core/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// SPDX-FileCopyrightText: © 2025 StreamKit Contributors
//
// SPDX-License-Identifier: MPL-2.0

//! Shared metrics configuration and histogram boundaries.
//!
//! Defines standard histogram bucket boundaries for OpenTelemetry metrics
//! to ensure accurate percentile calculations across the codebase.

/// Histogram bucket boundaries, in seconds, for per-packet codec operations.
///
/// Covers 10μs to 1s with two buckets per decade (1x and 5x).
///
/// Used by: opus encode/decode
pub const HISTOGRAM_BOUNDARIES_CODEC_PACKET: &[f64] = &[
    0.00001, // 10μs
    0.00005, // 50μs
    0.0001,  // 100μs
    0.0005,  // 500μs
    0.001,   // 1ms
    0.005,   // 5ms
    0.01,    // 10ms
    0.05,    // 50ms
    0.1,     // 100ms
    0.5,     // 500ms
    1.0,     // 1s
];

/// Histogram bucket boundaries, in seconds, for per-file operations.
///
/// Covers 1ms to 60s: two buckets per decade up to 10s, then 30s and 60s.
///
/// Used by: mp3/flac/wav decode/demux
pub const HISTOGRAM_BOUNDARIES_FILE_OPERATION: &[f64] = &[
    0.001, // 1ms
    0.005, // 5ms
    0.01,  // 10ms
    0.05,  // 50ms
    0.1,   // 100ms
    0.5,   // 500ms
    1.0,   // 1s
    5.0,   // 5s
    10.0,  // 10s
    30.0,  // 30s
    60.0,  // 1 minute
];

/// Histogram bucket boundaries, in seconds, for node execution time.
///
/// Covers 10μs to 60s: one bucket per decade up to 10s, plus a final 60s bucket.
///
/// Used by: node.execution.duration
pub const HISTOGRAM_BOUNDARIES_NODE_EXECUTION: &[f64] = &[
    0.00001, // 10μs
    0.0001,  // 100μs
    0.001,   // 1ms
    0.01,    // 10ms
    0.1,     // 100ms
    1.0,     // 1s
    10.0,    // 10s
    60.0,    // 1 minute
];

/// Histogram bucket boundaries, in seconds, for backpressure wait time.
///
/// Covers 1μs to 10s with one bucket per decade.
///
/// Used by: pin_distributor.send_wait_seconds
pub const HISTOGRAM_BOUNDARIES_BACKPRESSURE: &[f64] = &[
    0.000001, // 1μs
    0.00001,  // 10μs
    0.0001,   // 100μs
    0.001,    // 1ms
    0.01,     // 10ms
    0.1,      // 100ms
    1.0,      // 1s
    10.0,     // 10s
];

/// Histogram bucket boundaries, in seconds, for pacer lateness.
///
/// Covers 1ms to 10s, with the densest buckets between 50ms and 2s.
///
/// Used by: pacer.lateness_seconds
pub const HISTOGRAM_BOUNDARIES_PACER_LATENESS: &[f64] = &[
    0.001, // 1ms
    0.01,  // 10ms
    0.05,  // 50ms
    0.1,   // 100ms
    0.25,  // 250ms
    0.5,   // 500ms
    1.0,   // 1s
    2.0,   // 2s
    5.0,   // 5s
    10.0,  // 10s
];

/// Histogram bucket boundaries, in milliseconds, for clock offset.
///
/// Covers 0.1ms to 10s. Unlike the second-based constants in this module,
/// these values are expressed in milliseconds.
///
/// Used by: moq.push.clock_offset_ms
pub const HISTOGRAM_BOUNDARIES_CLOCK_OFFSET_MS: &[f64] = &[
    0.1,     // 0.1ms
    1.0,     // 1ms
    5.0,     // 5ms
    10.0,    // 10ms
    20.0,    // 20ms
    50.0,    // 50ms
    100.0,   // 100ms
    200.0,   // 200ms
    500.0,   // 500ms
    1000.0,  // 1s
    5000.0,  // 5s
    10000.0, // 10s
];

/// Histogram bucket boundaries, in milliseconds, for the gap between frames.
///
/// Covers 1ms to 1s, with extra buckets at intervals that match common
/// frame rates.
///
/// Used by: moq.peer.inter_frame_ms
pub const HISTOGRAM_BOUNDARIES_FRAME_GAP_MS: &[f64] = &[
    1.0,    // 1ms
    5.0,    // 5ms
    10.0,   // 10ms
    16.0,   // 16ms (roughly a 60 fps frame interval)
    20.0,   // 20ms (50 fps frame interval)
    33.0,   // 33ms (roughly a 30 fps frame interval)
    50.0,   // 50ms
    100.0,  // 100ms
    200.0,  // 200ms
    500.0,  // 500ms
    1000.0, // 1s
];

/// Histogram bucket boundaries, in seconds, for whole-pipeline runtime.
///
/// Covers 10ms to 5 minutes.
///
/// Used by: oneshot_pipeline.duration
pub const HISTOGRAM_BOUNDARIES_PIPELINE_DURATION: &[f64] = &[
    0.01,  // 10ms
    0.05,  // 50ms
    0.1,   // 100ms
    0.5,   // 500ms
    1.0,   // 1s
    5.0,   // 5s
    10.0,  // 10s
    30.0,  // 30s
    60.0,  // 1 minute
    120.0, // 2 minutes
    300.0, // 5 minutes
];

/// Histogram bucket boundaries, in seconds, for HTTP request duration.
///
/// Covers 1ms to 60s: two buckets per decade up to 10s, then 30s and 60s.
///
/// Used by: http.server.duration
pub const HISTOGRAM_BOUNDARIES_HTTP_DURATION: &[f64] = &[
    0.001, // 1ms
    0.005, // 5ms
    0.01,  // 10ms
    0.05,  // 50ms
    0.1,   // 100ms
    0.5,   // 500ms
    1.0,   // 1s
    5.0,   // 5s
    10.0,  // 10s
    30.0,  // 30s
    60.0,  // 1 minute
];

/// Histogram bucket boundaries, in seconds, for session lifetime.
///
/// Covers 1s to 24 hours.
///
/// Used by: session.duration
pub const HISTOGRAM_BOUNDARIES_SESSION_DURATION: &[f64] = &[
    1.0,     // 1s
    10.0,    // 10s
    60.0,    // 1 minute
    300.0,   // 5 minutes
    600.0,   // 10 minutes
    1800.0,  // 30 minutes
    3600.0,  // 1 hour
    7200.0,  // 2 hours
    21600.0, // 6 hours
    86400.0, // 24 hours
];
64 changes: 51 additions & 13 deletions crates/engine/src/dynamic_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ pub struct DynamicEngine {
HashMap<String, mpsc::Sender<streamkit_core::pins::PinManagementMessage>>,
/// Map of node pin metadata: NodeId -> Pin Metadata (for runtime type validation)
pub(super) node_pin_metadata: HashMap<String, NodePinMetadata>,
/// Map of node_id -> node_kind for labeling metrics
pub(super) node_kinds: HashMap<String, String>,
pub(super) batch_size: usize,
/// Session ID for gateway registration (if applicable)
pub(super) session_id: Option<String>,
Expand All @@ -75,11 +77,11 @@ pub struct DynamicEngine {
pub(super) nodes_active_gauge: opentelemetry::metrics::Gauge<u64>,
pub(super) node_state_transitions_counter: opentelemetry::metrics::Counter<u64>,
pub(super) engine_operations_counter: opentelemetry::metrics::Counter<u64>,
// Node-level packet metrics
pub(super) node_packets_received_gauge: opentelemetry::metrics::Gauge<u64>,
pub(super) node_packets_sent_gauge: opentelemetry::metrics::Gauge<u64>,
pub(super) node_packets_discarded_gauge: opentelemetry::metrics::Gauge<u64>,
pub(super) node_packets_errored_gauge: opentelemetry::metrics::Gauge<u64>,
// Node-level packet metrics (counters, not gauges - for proper rate() calculation)
pub(super) node_packets_received_counter: opentelemetry::metrics::Counter<u64>,
pub(super) node_packets_sent_counter: opentelemetry::metrics::Counter<u64>,
pub(super) node_packets_discarded_counter: opentelemetry::metrics::Counter<u64>,
pub(super) node_packets_errored_counter: opentelemetry::metrics::Counter<u64>,
// Node state metric (1=running, 0=not running)
pub(super) node_state_gauge: opentelemetry::metrics::Gauge<u64>,
}
Expand Down Expand Up @@ -363,16 +365,50 @@ impl DynamicEngine {
"Node stats updated"
);

// Store the current stats
self.node_stats.insert(update.node_id.clone(), update.stats.clone());
let node_kind =
self.node_kinds.get(&update.node_id).map_or("unknown", std::string::String::as_str);
let labels = &[
KeyValue::new("node_id", update.node_id.clone()),
KeyValue::new("node_kind", node_kind.to_string()),
];

let prev_stats = self.node_stats.get(&update.node_id);

let delta_received = prev_stats.map_or(update.stats.received, |prev| {
if update.stats.received < prev.received {
update.stats.received
} else {
update.stats.received - prev.received
}
});
let delta_sent = prev_stats.map_or(update.stats.sent, |prev| {
if update.stats.sent < prev.sent {
update.stats.sent
} else {
update.stats.sent - prev.sent
}
});
let delta_discarded = prev_stats.map_or(update.stats.discarded, |prev| {
if update.stats.discarded < prev.discarded {
update.stats.discarded
} else {
update.stats.discarded - prev.discarded
}
});
let delta_errored = prev_stats.map_or(update.stats.errored, |prev| {
if update.stats.errored < prev.errored {
update.stats.errored
} else {
update.stats.errored - prev.errored
}
});

// Record metrics with node_id label
let labels = &[KeyValue::new("node_id", update.node_id.clone())];
self.node_packets_received_counter.add(delta_received, labels);
self.node_packets_sent_counter.add(delta_sent, labels);
self.node_packets_discarded_counter.add(delta_discarded, labels);
self.node_packets_errored_counter.add(delta_errored, labels);

self.node_packets_received_gauge.record(update.stats.received, labels);
self.node_packets_sent_gauge.record(update.stats.sent, labels);
self.node_packets_discarded_gauge.record(update.stats.discarded, labels);
self.node_packets_errored_gauge.record(update.stats.errored, labels);
self.node_stats.insert(update.node_id.clone(), update.stats.clone());

// Broadcast to all subscribers
self.stats_subscribers.retain(|subscriber| {
Expand Down Expand Up @@ -862,6 +898,7 @@ impl DynamicEngine {
self.node_stats.remove(node_id);
self.node_pin_metadata.remove(node_id);
self.pin_management_txs.remove(node_id);
self.node_kinds.remove(node_id);
self.nodes_active_gauge.record(self.live_nodes.len() as u64, &[]);
}

Expand All @@ -881,6 +918,7 @@ impl DynamicEngine {
tracing::info!(name = %node_id, kind = %kind, "Adding node to graph");
match self.registry.create_node(&kind, params.as_ref()) {
Ok(node) => {
self.node_kinds.insert(node_id.clone(), kind.clone());
// Delegate initialization to helper function
// Pass by reference to avoid unnecessary clones
if let Err(e) = self
Expand Down
1 change: 1 addition & 0 deletions crates/engine/src/dynamic_pin_distributor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ impl PinDistributorActor {
let send_wait_histogram = meter
.f64_histogram("pin_distributor.send_wait_seconds")
.with_description("Time spent waiting for downstream capacity (backpressure)")
.with_boundaries(streamkit_core::metrics::HISTOGRAM_BOUNDARIES_BACKPRESSURE.to_vec())
.build();
let queue_depth_gauge = meter
.u64_gauge("pin_distributor.queue_depth")
Expand Down
13 changes: 8 additions & 5 deletions crates/engine/src/graph_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use streamkit_core::node::{InitContext, NodeContext, OutputRouting, OutputSender
use streamkit_core::packet_meta::{can_connect, packet_type_registry};
use streamkit_core::pins::PinUpdate;
use streamkit_core::state::{NodeState, NodeStateUpdate, StopReason};
use streamkit_core::stats::NodeStatsUpdate;
use streamkit_core::types::{Packet, PacketType};
use streamkit_core::PinCardinality;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -62,6 +63,7 @@ pub async fn wire_and_spawn_graph(
batch_size: usize,
media_channel_capacity: usize,
state_tx: Option<mpsc::Sender<NodeStateUpdate>>,
stats_tx: Option<mpsc::Sender<NodeStatsUpdate>>,
cancellation_token: Option<tokio_util::sync::CancellationToken>,
audio_pool: Option<Arc<AudioFramePool>>,
) -> Result<HashMap<String, LiveNode>, StreamKitError> {
Expand Down Expand Up @@ -356,9 +358,9 @@ pub async fn wire_and_spawn_graph(
output_sender: OutputSender::new(name.clone(), OutputRouting::Direct(direct_outputs)),
batch_size,
state_tx: node_state_tx.clone(),
stats_tx: None, // Stateless pipelines don't track stats
telemetry_tx: None, // Stateless pipelines don't emit telemetry
session_id: None, // Stateless pipelines don't have sessions
stats_tx: stats_tx.clone(), // Used by oneshot metrics recording
telemetry_tx: None, // Stateless pipelines don't emit telemetry
session_id: None, // Stateless pipelines don't have sessions
cancellation_token: cancellation_token.clone(),
pin_management_rx: None, // Stateless pipelines don't support dynamic pins
audio_pool: audio_pool.clone(),
Expand All @@ -384,12 +386,13 @@ pub async fn wire_and_spawn_graph(
let meter = global::meter("skit_engine");
let histogram = meter
.f64_histogram("node.execution.duration")
.with_boundaries(streamkit_core::metrics::HISTOGRAM_BOUNDARIES_NODE_EXECUTION.to_vec())
.build();
let status = if result.is_ok() { "ok" } else { "error" };

let labels = [
KeyValue::new("node.name", name.clone()),
KeyValue::new("node.kind", kind.clone()),
KeyValue::new("node_id", name.clone()),
KeyValue::new("node_kind", kind.clone()),
KeyValue::new("status", status),
];
histogram.record(duration.as_secs_f64(), &labels);
Expand Down
17 changes: 9 additions & 8 deletions crates/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ impl Engine {
pin_distributors: HashMap::new(),
pin_management_txs: HashMap::new(),
node_pin_metadata: HashMap::new(),
node_kinds: HashMap::new(),
batch_size: config.packet_batch_size,
session_id: config.session_id,
audio_pool: self.audio_pool.clone(),
Expand All @@ -257,20 +258,20 @@ impl Engine {
.u64_counter("engine.operations")
.with_description("Engine control operations")
.build(),
node_packets_received_gauge: meter
.u64_gauge("node.packets.received")
node_packets_received_counter: meter
.u64_counter("node.packets.received")
.with_description("Total packets received by node")
.build(),
node_packets_sent_gauge: meter
.u64_gauge("node.packets.sent")
node_packets_sent_counter: meter
.u64_counter("node.packets.sent")
.with_description("Total packets sent by node")
.build(),
node_packets_discarded_gauge: meter
.u64_gauge("node.packets.discarded")
node_packets_discarded_counter: meter
.u64_counter("node.packets.discarded")
.with_description("Total packets discarded by node")
.build(),
node_packets_errored_gauge: meter
.u64_gauge("node.packets.errored")
node_packets_errored_counter: meter
.u64_counter("node.packets.errored")
.with_description("Total packet processing errors by node")
.build(),
node_state_gauge: meter
Expand Down
Loading
Loading