Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/add_dynacast_support.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
libwebrtc: patch
livekit: patch
livekit-ffi: patch
---

Add dynacast support - #1003 (@chenosaurus)
20 changes: 19 additions & 1 deletion examples/local_video/src/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ struct Args {
/// Use H.265/HEVC encoding if supported (falls back to H.264 on failure)
#[arg(long, default_value_t = false)]
h265: bool,

/// Enable dynacast (pause unused simulcast layers based on subscriber demand)
#[arg(long, default_value_t = false)]
dynacast: bool,
}

fn list_cameras() -> Result<()> {
Expand Down Expand Up @@ -137,6 +141,7 @@ async fn run(args: Args, ctrl_c_received: Arc<AtomicBool>) -> Result<()> {
info!("Connecting to LiveKit room '{}' as '{}'...", args.room_name, args.identity);
let mut room_options = RoomOptions::default();
room_options.auto_subscribe = true;
room_options.dynacast = args.dynacast;
let (room, _) = Room::connect(&url, &token, room_options).await?;
let room = std::sync::Arc::new(room);
info!("Connected: {} - {}", room.name(), room.sid().await);
Expand Down Expand Up @@ -419,11 +424,24 @@ async fn run(args: Args, ctrl_c_received: Arc<AtomicBool>) -> Result<()> {
let secs = last_fps_log.elapsed().as_secs_f64();
let fps_est = frames as f64 / secs;
let n = frames.max(1) as f64;
let layers = track.publishing_layers();
let layers_str = if layers.is_empty() {
"n/a".to_string()
} else {
layers
.iter()
.map(|(rid, quality, active)| {
format!("{}({})={}", rid, quality, if *active { "ON" } else { "off" })
})
.collect::<Vec<_>>()
.join(", ")
};
info!(
"Publishing video: {}x{}, ~{:.1} fps | avg ms: get {:.2}, decode {:.2}, convert {:.2}, capture {:.2}, sleep {:.2}, iter {:.2} | target {:.2}",
"Publishing video: {}x{}, ~{:.1} fps | layers: [{}] | avg ms: get {:.2}, decode {:.2}, convert {:.2}, capture {:.2}, sleep {:.2}, iter {:.2} | target {:.2}",
width,
height,
fps_est,
layers_str,
sum_get_ms / n,
sum_decode_ms / n,
sum_convert_ms / n,
Expand Down
1 change: 1 addition & 0 deletions examples/local_video/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ impl eframe::App for VideoApp {
let resp = ui.selectable_label(is_selected, label);
if resp.clicked() {
if let Some(ref pub_remote) = sc.publication {
info!("Requesting layer: {:?}", q);
pub_remote.set_video_quality(q);
sc.requested_quality = Some(q);
}
Expand Down
92 changes: 73 additions & 19 deletions libwebrtc/src/native/rtp_parameters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@ impl From<sys_rp::ffi::RtpParameters> for RtpParameters {
Self {
codecs: value.codecs.into_iter().map(Into::into).collect(),
header_extensions: value.header_extensions.into_iter().map(Into::into).collect(),
encodings: value.encodings.into_iter().map(Into::into).collect(),
rtcp: value.rtcp.into(),
transaction_id: value.transaction_id,
mid: value.mid,
has_degradation_preference: value.has_degradation_preference,
// Safety: DegradationPreference is #[repr(i32)]
degradation_preference: unsafe { std::mem::transmute(value.degradation_preference) },
}
}
}
Expand All @@ -51,13 +57,36 @@ impl From<sys_rp::ffi::RtpCodecParameters> for RtpCodecParameters {
payload_type: value.payload_type as u8,
clock_rate: value.has_clock_rate.then_some(value.clock_rate as u64),
channels: value.has_num_channels.then_some(value.num_channels as u16),
name: value.name,
// Safety: MediaType, RtcpFeedbackType, RtcpFeedbackMessageType are #[repr(i32)]
kind: unsafe { std::mem::transmute(value.kind) },
has_max_ptime: value.has_max_ptime,
max_ptime: value.max_ptime,
has_ptime: value.has_ptime,
ptime: value.ptime,
rtcp_feedback: value
.rtcp_feedback
.into_iter()
.map(|f| CodecFeedback {
feedback_type: unsafe { std::mem::transmute(f.feedback_type) },
has_message_type: f.has_message_type,
message_type: unsafe { std::mem::transmute(f.message_type) },
})
.collect(),
parameters: value.parameters.into_iter().map(|kv| (kv.key, kv.value)).collect(),
}
}
}

impl From<sys_rp::ffi::RtcpParameters> for RtcpParameters {
fn from(value: sys_rp::ffi::RtcpParameters) -> Self {
Self { cname: value.cname, reduced_size: value.reduced_size }
Self {
cname: value.cname,
reduced_size: value.reduced_size,
mux: value.mux,
has_ssrc: value.has_ssrc,
ssrc: value.ssrc,
}
}
}

Expand All @@ -72,6 +101,8 @@ impl From<sys_rp::ffi::RtpEncodingParameters> for RtpEncodingParameters {
scale_resolution_down_by: value
.has_scale_resolution_down_by
.then_some(value.scale_resolution_down_by),
has_ssrc: value.has_ssrc,
ssrc: value.ssrc,
}
}
}
Expand Down Expand Up @@ -139,36 +170,59 @@ impl From<RtpHeaderExtensionParameters> for sys_rp::ffi::RtpExtension {

impl From<RtpParameters> for sys_rp::ffi::RtpParameters {
fn from(value: RtpParameters) -> Self {
// Safety: DegradationPreference is #[repr(i32)]
let degradation_preference: sys_rp::ffi::DegradationPreference =
unsafe { std::mem::transmute(value.degradation_preference) };
Self {
codecs: value.codecs.into_iter().map(Into::into).collect(),
header_extensions: value.header_extensions.into_iter().map(Into::into).collect(),
encodings: Vec::new(),
encodings: value.encodings.into_iter().map(Into::into).collect(),
rtcp: value.rtcp.into(),
transaction_id: "".to_string(),
mid: "".to_string(),
has_degradation_preference: false,
degradation_preference: sys_rp::ffi::DegradationPreference::Balanced,
transaction_id: value.transaction_id,
mid: value.mid,
has_degradation_preference: value.has_degradation_preference,
degradation_preference,
}
}
}

impl From<RtpCodecParameters> for sys_rp::ffi::RtpCodecParameters {
fn from(value: RtpCodecParameters) -> Self {
// Safety: MediaType, RtcpFeedbackType, RtcpFeedbackMessageType are all #[repr(i32)]
let kind: sys_webrtc::ffi::MediaType = unsafe { std::mem::transmute(value.kind) };
Self {
payload_type: value.payload_type as i32,
mime_type: value.mime_type,
has_clock_rate: value.clock_rate.is_some(),
clock_rate: value.clock_rate.unwrap_or_default() as i32,
has_num_channels: value.channels.is_some(),
num_channels: value.channels.unwrap_or_default() as i32,
name: "".to_string(),
kind: sys_rp::ffi::MediaType::Audio,
has_max_ptime: false,
max_ptime: 0,
has_ptime: false,
ptime: 0,
rtcp_feedback: Vec::new(),
parameters: Vec::new(),
name: value.name,
kind,
has_max_ptime: value.has_max_ptime,
max_ptime: value.max_ptime,
has_ptime: value.has_ptime,
ptime: value.ptime,
rtcp_feedback: value
.rtcp_feedback
.into_iter()
.map(|f| {
let feedback_type: sys_rp::ffi::RtcpFeedbackType =
unsafe { std::mem::transmute(f.feedback_type) };
let message_type: sys_rp::ffi::RtcpFeedbackMessageType =
unsafe { std::mem::transmute(f.message_type) };
sys_rp::ffi::RtcpFeedback {
feedback_type,
has_message_type: f.has_message_type,
message_type,
}
})
.collect(),
parameters: value
.parameters
.into_iter()
.map(|(key, value)| sys_rp::ffi::StringKeyValue { key, value })
.collect(),
}
}
}
Expand All @@ -178,9 +232,9 @@ impl From<RtcpParameters> for sys_rp::ffi::RtcpParameters {
Self {
cname: value.cname,
reduced_size: value.reduced_size,
has_ssrc: false,
ssrc: 0,
mux: false,
has_ssrc: value.has_ssrc,
ssrc: value.ssrc,
mux: value.mux,
}
}
}
Expand All @@ -205,8 +259,8 @@ impl From<RtpEncodingParameters> for sys_rp::ffi::RtpEncodingParameters {
num_temporal_layers: 0,
has_scalability_mode: false,
scalability_mode: "".to_string(),
has_ssrc: false,
ssrc: 0,
has_ssrc: value.has_ssrc,
ssrc: value.ssrc,
}
}
}
Expand Down
31 changes: 31 additions & 0 deletions libwebrtc/src/rtp_parameters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,22 @@ pub struct RtpHeaderExtensionParameters {
pub struct RtpParameters {
pub codecs: Vec<RtpCodecParameters>,
pub header_extensions: Vec<RtpHeaderExtensionParameters>,
pub encodings: Vec<RtpEncodingParameters>,
pub rtcp: RtcpParameters,
/// Opaque token used by WebRTC to pair getParameters/setParameters calls.
/// Must be preserved when round-tripping through set_parameters().
pub(crate) transaction_id: String,
pub(crate) mid: String,
pub(crate) has_degradation_preference: bool,
pub(crate) degradation_preference: i32,
}

/// Mirrors webrtc_sys RtcpFeedback for round-trip fidelity.
#[derive(Debug, Clone, Default)]
pub(crate) struct CodecFeedback {
pub feedback_type: i32,
pub has_message_type: bool,
pub message_type: i32,
}

#[derive(Debug, Clone, Default)]
Expand All @@ -42,12 +57,23 @@ pub struct RtpCodecParameters {
pub mime_type: String, // read-only
pub clock_rate: Option<u64>,
pub channels: Option<u16>,
pub(crate) name: String,
pub(crate) kind: i32,
pub(crate) has_max_ptime: bool,
pub(crate) max_ptime: i32,
pub(crate) has_ptime: bool,
pub(crate) ptime: i32,
pub(crate) rtcp_feedback: Vec<CodecFeedback>,
pub(crate) parameters: Vec<(String, String)>,
}

#[derive(Debug, Clone, Default)]
pub struct RtcpParameters {
pub cname: String,
pub reduced_size: bool,
pub(crate) mux: bool,
pub(crate) has_ssrc: bool,
pub(crate) ssrc: u32,
}

#[derive(Debug, Clone)]
Expand All @@ -58,6 +84,9 @@ pub struct RtpEncodingParameters {
pub priority: Priority,
pub rid: String,
pub scale_resolution_down_by: Option<f64>,
/// Preserved for round-trip fidelity with WebRTC's getParameters/setParameters.
pub has_ssrc: bool,
pub ssrc: u32,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -89,6 +118,8 @@ impl Default for RtpEncodingParameters {
priority: Priority::Low,
rid: String::default(),
scale_resolution_down_by: None,
has_ssrc: false,
ssrc: 0,
}
}
}
91 changes: 91 additions & 0 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1030,6 +1030,9 @@ impl RoomSession {
EngineEvent::TrackMuted { sid, muted } => {
self.handle_server_initiated_mute_track(sid, muted);
}
EngineEvent::SubscribedQualityUpdate { update } => {
self.handle_subscribed_quality_update(update);
}
EngineEvent::LocalDataTrackInput(event) => {
_ = self.local_dt_input.send(event);
}
Expand Down Expand Up @@ -1814,6 +1817,94 @@ impl RoomSession {
log::warn!("Track not found in mute request: {}", sid_for_log);
}

#[allow(deprecated)]
fn handle_subscribed_quality_update(&self, update: proto::SubscribedQualityUpdate) {
if !self.options.dynacast {
return;
}

let track_sid: TrackSid = match update.track_sid.clone().try_into() {
Ok(sid) => sid,
Err(_) => {
log::warn!(
"dynacast: invalid track sid in subscribed quality update: {}",
update.track_sid
);
return;
}
};

let publication = match self.local_participant.get_track_publication(&track_sid) {
Some(pub_) => pub_,
None => {
log::warn!("dynacast: local track publication not found for sid {}", track_sid);
return;
}
};

let video_track = match publication.track() {
Some(LocalTrack::Video(vt)) => vt,
_ => {
log::debug!(
"dynacast: track {} is not a local video track, ignoring quality update",
track_sid
);
return;
}
};

let qualities: Vec<proto::SubscribedQuality> = if !update.subscribed_codecs.is_empty() {
let codec = publication.publish_options().video_codec.as_str().to_lowercase();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curiously, is publication.publish_options().video_codec the codec that being used ? or the codec that is preferred or requested ?

I wonder if there is any chance that the actual codec is not publication.publish_options().video_codec ? in that case, there might be a mismatch

log::info!(
"dynacast: SFU quality update for {}: subscribed_codecs={:?}, looking for codec '{}'",
track_sid,
update.subscribed_codecs.iter().map(|sc| {
let qs: Vec<String> = sc.qualities.iter().map(|q| {
format!("{:?}={}", proto::VideoQuality::try_from(q.quality).unwrap_or(proto::VideoQuality::High), q.enabled)
}).collect();
format!("{}:[{}]", sc.codec, qs.join(", "))
}).collect::<Vec<_>>().join("; "),
codec,
);
update
.subscribed_codecs
.iter()
.find(|sc| sc.codec.to_lowercase() == codec)
.map(|sc| sc.qualities.clone())
.unwrap_or_else(|| {
log::warn!("dynacast: codec '{}' not found in subscribed_codecs, falling back to first", codec);
update
.subscribed_codecs
.first()
.map(|sc| sc.qualities.clone())
.unwrap_or_default()
})
} else {
let qs: Vec<String> = update
.subscribed_qualities
.iter()
.map(|q| {
format!(
"{:?}={}",
proto::VideoQuality::try_from(q.quality)
.unwrap_or(proto::VideoQuality::High),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we have multiple places to default the quality to proto::VideoQuality::High when things are unset ?

Should we make a flag or macro instead ? and please add comments to explain why proto::VideoQuality::High is preferred

q.enabled
)
})
.collect();
log::info!(
"dynacast: SFU quality update for {} (legacy): [{}]",
track_sid,
qs.join(", "),
);
update.subscribed_qualities.clone()
};

if let Err(e) = video_track.set_publishing_layers(&qualities) {
log::error!("dynacast: failed to set publishing layers for {}: {}", track_sid, e);
}
}

/// Create a new participant
/// Also add it to the participants list
fn create_participant(
Expand Down
Loading
Loading