diff --git a/crates/video-streamer/src/streamer/mod.rs b/crates/video-streamer/src/streamer/mod.rs index 3642270b4..2ce156d27 100644 --- a/crates/video-streamer/src/streamer/mod.rs +++ b/crates/video-streamer/src/streamer/mod.rs @@ -99,7 +99,7 @@ pub fn webm_stream( } } - const MAX_RETRY_COUNT: usize = 3; + const MAX_RETRY_COUNT: usize = 25; // To make sure we don't retry forever // Retry is set to 0 when we successfully read a tag let mut retry_count = 0; @@ -190,6 +190,10 @@ pub fn webm_stream( }, _ = stop_notifier.notified() => { let _ = tx.send(WhenEofControlFlow::Break); + }, + _ = tokio::time::sleep(std::time::Duration::from_secs(3)) => { + info!("EOF wait timed out, retrying"); + let _ = tx.send(WhenEofControlFlow::Continue); } } }); diff --git a/crates/video-streamer/src/streamer/tag_writers.rs b/crates/video-streamer/src/streamer/tag_writers.rs index a98001fe1..28b311441 100644 --- a/crates/video-streamer/src/streamer/tag_writers.rs +++ b/crates/video-streamer/src/streamer/tag_writers.rs @@ -1,4 +1,4 @@ -use std::time::Instant; +use std::time::{Duration, Instant}; use anyhow::Context; use cadeau::xmf::vpx::{VpxCodec, VpxDecoder, VpxEncoder}; @@ -14,7 +14,7 @@ use crate::debug::mastroka_spec_name; const VPX_EFLAG_FORCE_KF: u32 = 0x00000001; #[cfg(feature = "perf-diagnostics")] -fn duration_as_millis_u64(duration: std::time::Duration) -> u64 { +fn duration_as_millis_u64(duration: Duration) -> u64 { u64::try_from(duration.as_millis()).unwrap_or(u64::MAX) } @@ -105,7 +105,9 @@ where last_encoded_abs_time: Option, // Adaptive frame skipping state + #[cfg(feature = "perf-diagnostics")] stream_start: Instant, + processing_time: Duration, last_ratio: f64, frames_since_last_encode: u32, adaptive_frame_skip: bool, @@ -192,7 +194,9 @@ where decoder, cut_block_state: CutBlockState::HaventMet, last_encoded_abs_time: None, + #[cfg(feature = "perf-diagnostics")] stream_start: Instant::now(), + processing_time: Duration::ZERO, last_ratio: 1.0, frames_since_last_encode: 0, adaptive_frame_skip: config.adaptive_frame_skip, @@ -263,7 +267,9 @@ where "VideoBlock created" ); + let processing_started = Instant::now(); self.process_current_block(&video_block)?; + self.processing_time += processing_started.elapsed(); Ok(WriterResult::Continue) } @@ -371,11 +377,11 @@ where fn current_realtime_ratio(&self, media_advanced_ms: u64) -> f64 { #[allow(clippy::cast_possible_truncation)] // u64 max is ~584 million years in ms; no real truncation risk - let wall_ms = self.stream_start.elapsed().as_millis() as u64; - if wall_ms == 0 { + let processing_ms = self.processing_time.as_millis() as u64; + if processing_ms == 0 { 1.0 } else { - media_advanced_ms as f64 / wall_ms as f64 + media_advanced_ms as f64 / processing_ms as f64 } }