📝 Walkthrough

Adds a new internal crate, `livekit-datatrack`.
Sequence Diagram(s)

sequenceDiagram
participant App as Application
participant LP as LocalParticipant
participant LDM as LocalManager
participant Enc as Encryption
participant Pkt as Packetizer
participant RTCE as RTC Engine
participant DC as Data Channel
App->>LP: publish_data_track(options)
LP->>LDM: publish_track(options)
LDM->>LDM: allocate handle / create LocalDataTrack
App->>LP: try_push(frame)
LP->>LDM: frame_tx.send(frame)
LDM->>Enc: encrypt_if_needed(frame)
Enc-->>LDM: EncryptedPayload or frame
LDM->>Pkt: packetize(frame)
Pkt-->>LDM: Vec<Packet>
LDM->>RTCE: OutputEvent::PacketsAvailable
RTCE->>DC: send packets (data channel)
sequenceDiagram
participant DC as Data Channel
participant RTCE as RTC Engine
participant RDM as RemoteManager
participant Dep as Depacketizer
participant Dec as Decryption
participant Sub as Subscriber
DC->>RTCE: binary packet
RTCE->>RDM: RemoteDataTrackInput::PacketReceived
RDM->>Dep: push packet
Dep-->>RDM: DepacketizerFrame or drop
RDM->>Dec: decrypt_if_needed(frame)
Dec-->>RDM: DataTrackFrame
RDM->>Sub: broadcast frame to subscribers
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
Actionable comments posted: 19
🤖 Fix all issues with AI agents
In `@examples/basic_data_track/src/subscriber.rs`:
- Around line 23-31: The function handle_first_publication currently continues
listening after subscribing to the first RemoteDataTrack; update it to stop
after the first successful subscription by returning (or breaking and returning)
immediately once subscribe(track).await? completes. Locate the loop in
handle_first_publication and add an early return (e.g., return Ok(())) right
after the subscribe(track).await? call so the function only handles the first
publication.
In `@examples/local_audio/Cargo.toml`:
- Around line 4-17: Update the example Cargo.toml dependencies to match the
workspace defaults by aligning env_logger and clap versions with the workspace
(remove the explicit version pins or set them to the workspace versions) so
env_logger uses 0.11 and clap uses 4.5; update the dependency entries for
env_logger and clap in the examples/local_audio Cargo.toml (the dependency keys
"env_logger" and "clap") to inherit or match the workspace versions.
In `@examples/rpc/Cargo.toml`:
- Around line 4-14: Update the dependency versions in the examples/rpc
Cargo.toml so they match the workspace: change the env_logger dependency from
"0.10" to "0.11" and change rand from "0.8" to "0.9" (the entries for env_logger
and rand in the examples/rpc Cargo.toml); leave other dependency entries
unchanged.
In `@examples/screensharing/Cargo.toml`:
- Around line 4-13: Update the example Cargo.toml dependencies to match the
workspace root: change env_logger from "0.10" to "0.11" and change clap from
"4.0" to "4.5" so the declared versions for env_logger and clap in the examples
align with the workspace root; verify the dependency entries for env_logger and
clap in the examples/screensharing/Cargo.toml are updated accordingly while
leaving other entries (tokio, livekit, livekit-api, log) unchanged.
In `@examples/wgpu_room/Cargo.toml`:
- Around line 4-26: The Cargo.toml example pins env_logger = "0.10.0" and
parking_lot = "0.12.1" which diverge from the workspace root versions; update
the dependency declarations for env_logger and parking_lot to either use
workspace = true (e.g., env_logger = { workspace = true } and parking_lot = {
workspace = true, features = ["deadlock_detection"] }) to inherit the root
versions, or explicitly change the versions to match the workspace root
(env_logger "0.11" and parking_lot "0.12") and document the intentional drift in
a comment if you choose to keep different versions; adjust only the entries for
env_logger and parking_lot in this Cargo.toml.
In `@livekit-datatrack/Cargo.toml`:
- Line 13: The workspace root Cargo.toml currently pins thiserror = "1" while
the livekit-datatrack crate uses thiserror = "2.0.17", causing dual major
versions; open the root Cargo.toml and change the workspace dependency entry
thiserror = "1" to thiserror = "2" so all crates use the same major version
(ensure the workspace dependency name is exactly "thiserror" and then run cargo
update/build to verify no other dependency conflicts).
In `@livekit-datatrack/src/local/manager.rs`:
- Around line 185-199: In on_sfu_publish_response, avoid calling
self.descriptors.remove(&event.handle) before confirming the descriptor is a
pending descriptor; instead look up the descriptor (e.g., with get_mut or entry
API), match it to Descriptor::Pending(result_tx) and only remove the descriptor
when you transition it to active (after calling self.create_local_track and
sending the result), keep active descriptors intact on duplicate responses, and
if the descriptor is missing or result_tx.is_closed() but event.result is Ok
(SFU accepted the publish), call your server-side unpublish path (e.g., a method
like self.unpublish_server_side(event.handle) or similar) to clean up the
orphaned SFU publication so it isn’t left published without a local track.
- Around line 236-265: The HashMap removal in remove_descriptor (and in
on_publish_cancelled) currently frees the descriptor but never returns the u16
handle to the allocator, exhausting HandleAllocator after 65,535 cycles; add a
release(id: Handle) method to HandleAllocator (or implement a reusable allocator
with a free list/bitset) and call it whenever a descriptor is discarded: inside
remove_descriptor() after taking the descriptor (and after sending
published_tx), and inside on_publish_cancelled() where the pending descriptor is
removed, ensuring handles returned to the allocator so HandleAllocator.get() can
reuse freed handles.
- Around line 333-380: The publish_track method uses event_in_tx.try_send which
conflates a full channel with a disconnected receiver; change it to await the
async send to properly handle backpressure. In ManagerInput::publish_track,
replace self.event_in_tx.try_send(event.into()).map_err(|_|
PublishError::Disconnected)? with an awaitable send:
self.event_in_tx.send(event.into()).await.map_err(|_|
PublishError::Disconnected)?, keeping the same PublishRequest/event.into() usage
and existing timeout/result handling (PUBLISH_TIMEOUT, PublishError) otherwise.
In `@livekit-datatrack/src/local/pipeline.rs`:
- Around line 47-51: The debug-only check in Pipeline::new using
debug_assert_eq!(options.info.uses_e2ee, options.encryption_provider.is_some())
must be promoted to a runtime check so releases don't silently misconfigure
E2EE; update the Pipeline::new constructor to validate options.info.uses_e2ee
against options.encryption_provider.is_some() and fail fast (e.g., use
assert_eq! or an explicit if check that panics with a clear message including
the actual values) before creating Packetizer so mismatches cannot result in
plaintext or unreadable ciphertext.
In `@livekit-datatrack/src/local/proto.rs`:
- Around line 76-92: The match on proto::request_response::Reason in
publish_result_from_request_response currently treats any unhandled variant
(including Reason::Ok) as an error; add an explicit check for Reason::Ok and
return None early (since this function is the error-only path) before mapping
other failure reasons to PublishError, keeping the existing mappings for
Reason::NotAllowed and Reason::DuplicateName and constructing the
SfuPublishResponse with Err(error) only for genuine error reasons.
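A sketch of the error-only mapping described above, using hypothetical stand-ins for the proto `Reason` and the crate's error types (the real generated types may differ):

```rust
// Hypothetical stand-ins for the generated proto / crate types.
enum Reason { Ok, NotAllowed, DuplicateName, Unknown }

enum PublishError { NotAllowed, DuplicateName, Other }

struct SfuPublishResponse { result: Result<(), PublishError> }

/// Error-only path: an OK response yields no error event at all.
fn publish_result_from_reason(reason: Reason) -> Option<SfuPublishResponse> {
    // Explicitly short-circuit on success instead of letting it fall into the
    // catch-all error arm.
    if matches!(reason, Reason::Ok) {
        return None;
    }
    let error = match reason {
        Reason::NotAllowed => PublishError::NotAllowed,
        Reason::DuplicateName => PublishError::DuplicateName,
        _ => PublishError::Other,
    };
    Some(SfuPublishResponse { result: Err(error) })
}
```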
In `@livekit-datatrack/src/remote/depacketizer.rs`:
- Around line 134-159: The finalize function in Depacketizer incorrectly uses
`sequence < end_sequence` which breaks when u16 sequence numbers wrap; update
the loop logic in finalize (working on PartialFrame, sequence, end_sequence,
start_sequence) to check `sequence != end_sequence` and advance `sequence` with
`wrapping_add(1)` so the reassembly correctly iterates across wrap-around
boundaries before stopping at end_sequence; keep the existing removal of
payloads and final return of DepacketizerFrame when sequence equals
end_sequence.
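The wrap-safe iteration the depacketizer comment asks for can be sketched independently of the crate's `PartialFrame` type; the payload map and boundary names below are assumptions used only for illustration:

```rust
use std::collections::HashMap;

/// Collect payloads from `start` through `end` (inclusive) in u16 sequence
/// space, surviving wrap-around at u16::MAX -> 0.
fn reassemble(payloads: &mut HashMap<u16, Vec<u8>>, start: u16, end: u16) -> Option<Vec<u8>> {
    let mut frame = Vec::new();
    let mut sequence = start;
    loop {
        // `remove` also frees the buffered packet, mirroring the original flow.
        frame.extend(payloads.remove(&sequence)?);
        if sequence == end {
            // Compare with `==`/`!=` rather than `<`, which breaks on wrap.
            return Some(frame);
        }
        sequence = sequence.wrapping_add(1);
    }
}
```

With this shape, a frame starting at sequence 65534 and ending at 1 iterates 65534, 65535, 0, 1 and terminates correctly.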
In `@livekit-datatrack/src/remote/manager.rs`:
- Around line 419-424: The ManagerInput::send implementation uses
event_in_tx.try_send which can drop events when the channel is full; change
ManagerInput::send to reliably deliver control events by either (a) making the
method async and replacing try_send with event_in_tx.send(event).await, or (b)
using event_in_tx.blocking_send(event) for a synchronous guarantee, and adjust
callers of ManagerInput::send accordingly (look for the ManagerInput type and
its send method, and the event_in_tx field) so critical events like
SubscribeRequest or Shutdown are never silently dropped.
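Both this suggestion and the earlier `publish_track` one for the local manager boil down to swapping `try_send` for an awaited `send`. A minimal sketch, with a hypothetical event type standing in for the crate's real control events:

```rust
use tokio::sync::mpsc;

enum InputEvent { Shutdown }

#[derive(Debug)]
struct Disconnected;

struct ManagerInput {
    event_in_tx: mpsc::Sender<InputEvent>,
}

impl ManagerInput {
    /// Awaiting `send` applies backpressure while the channel is full and only
    /// errors once the receiving task is gone, unlike `try_send`, which
    /// conflates "full" with "disconnected".
    async fn send(&self, event: InputEvent) -> Result<(), Disconnected> {
        self.event_in_tx.send(event).await.map_err(|_| Disconnected)
    }
}
```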
In `@livekit-datatrack/src/remote/mod.rs`:
- Around line 75-97: Add an early publication-status check to the subscribe()
method: call self.is_published() at the start of subscribe() and if it returns
false return Err(SubscribeError::Unpublished) before creating/sending the
SubscribeRequest (the SubscribeRequest { sid: ..., result_tx } and event_in_tx
send logic). This ensures SubscribeError::Unpublished is used for unpublished
remote tracks and prevents sending subscribe events for un-published tracks.
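A sketch of the early guard, with `SubscribeError::Unpublished` used as the prompt describes; the surrounding request plumbing is elided and the types are illustrative:

```rust
// Hypothetical shapes; the real subscribe() also builds a SubscribeRequest
// and awaits the result over a oneshot channel.
#[derive(Debug)]
enum SubscribeError { Unpublished, Disconnected }

struct RemoteDataTrack { published: bool }

impl RemoteDataTrack {
    fn is_published(&self) -> bool {
        self.published
    }

    async fn subscribe(&self) -> Result<(), SubscribeError> {
        // Fail fast before enqueueing any subscribe event for the manager.
        if !self.is_published() {
            return Err(SubscribeError::Unpublished);
        }
        // ... send SubscribeRequest { sid, result_tx } via event_in_tx here ...
        Ok(())
    }
}
```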
In `@livekit-datatrack/src/remote/proto.rs`:
- Around line 66-80: The call to Option::is_none_or in
event_from_participant_info (using local_participant_identity.is_none_or(...))
requires Rust 1.82; either set the crate MSRV by adding rust-version = "1.82" to
the crate's Cargo.toml, or replace the is_none_or usage with a compatible
alternative such as local_participant_identity.map_or(true, |identity|
participant.identity != identity) inside event_from_participant_info so
extract_track_info and the participant filtering work on older compilers.
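The MSRV-friendly rewrite is a one-liner; both forms below keep only participants whose identity differs from the local one (function and parameter names are illustrative, not the crate's):

```rust
fn is_remote(
    participant_identity: &str,
    local_participant_identity: Option<&str>,
) -> bool {
    // Requires Rust 1.82+:
    //   local_participant_identity.is_none_or(|id| participant_identity != id)
    // Equivalent on older compilers:
    local_participant_identity.map_or(true, |id| participant_identity != id)
}
```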
In `@livekit/Cargo.toml`:
- Line 40: The workspace is using thiserror = "1.0" while livekit-datatrack pins
thiserror = "2.0.17", causing duplicate versions; update the root/workspace
Cargo.toml's thiserror version to a 2.x range (e.g., "2.0") and then update each
crate that currently inherits or specifies thiserror (including the livekit
crate entries and the livekit-datatrack Cargo.toml) to the same 2.x version so
all crates use a single thiserror v2 line; ensure removal of any older explicit
"1.0" entries and run cargo update to refresh Cargo.lock so error types are
unified across the workspace.
In `@livekit/src/room/e2ee/data_track.rs`:
- Around line 19-50: The formatting in DataTrackEncryptionProvider::encrypt (and
surrounding code) doesn't match rustfmt; run `cargo fmt` to reformat the file so
the spacing/indentation around the key_index and encrypted variable assignments
is fixed; ensure DataTrackEncryptionProvider, its new constructor, and the impl
dt::EncryptionProvider::encrypt (which calls E2eeManager::key_provider and
E2eeManager::encrypt_data) are left unchanged functionally and only reformatted.
In `@livekit/src/room/e2ee/manager.rs`:
- Around line 272-277: The function signature for encrypt_data is missing a
trailing comma after the key_index parameter; update the signature in the
encrypt_data method (the parameters: data: Vec<u8>, participant_identity:
&ParticipantIdentity, key_index: u32) to include a trailing comma after
key_index to satisfy formatting/CI requirements.
In `@livekit/src/room/mod.rs`:
- Around line 611-628: The variables are swapped: decryption_provider is
constructed as DataTrackEncryptionProvider and encryption_provider as
DataTrackDecryptionProvider, and those are passed to the wrong ManagerOptions;
change the assignments so the actual encryption provider is passed to
local::ManagerOptions and the actual decryption provider is passed to
remote::ManagerOptions — specifically ensure DataTrackEncryptionProvider
(currently in decryption_provider) is used for local_dt_options
(encryption_provider) and DataTrackDecryptionProvider (currently in
encryption_provider) is used for remote_dt_options (decryption_provider), or
alternatively rename the variables so decryption_provider/encryption_provider
reflect their real types and then pass them to local_dt_options and
remote_dt_options accordingly (affecting DataTrackEncryptionProvider,
DataTrackDecryptionProvider, local::ManagerOptions, remote::ManagerOptions,
local_dt_options, remote_dt_options).
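A sketch of the intended wiring for the swapped providers, with variables named after what they actually hold so the local manager receives the encryptor and the remote manager the decryptor. The structs here are stand-ins for the real `ManagerOptions` types, so field names are assumptions:

```rust
// Stand-in types; the real ones live in livekit-datatrack and livekit's e2ee module.
struct DataTrackEncryptionProvider;
struct DataTrackDecryptionProvider;

struct LocalManagerOptions { encryption_provider: Option<DataTrackEncryptionProvider> }
struct RemoteManagerOptions { decryption_provider: Option<DataTrackDecryptionProvider> }

fn build_manager_options() -> (LocalManagerOptions, RemoteManagerOptions) {
    // Name each variable after the provider it holds...
    let encryption_provider = DataTrackEncryptionProvider;
    let decryption_provider = DataTrackDecryptionProvider;

    // ...so the publish path encrypts and the subscribe path decrypts.
    let local_dt_options = LocalManagerOptions { encryption_provider: Some(encryption_provider) };
    let remote_dt_options = RemoteManagerOptions { decryption_provider: Some(decryption_provider) };
    (local_dt_options, remote_dt_options)
}
```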
🧹 Nitpick comments (15)
examples/send_bytes/Cargo.toml (1)
8-13: Inconsistent dependency versioning compared to other examples.
This file pins exact patch versions (`tokio = "1.47.1"`, `log = "0.4.28"`, `env_logger = "0.11.8"`, `rand = "0.9.2"`), while other examples in this PR use semver-compatible ranges (e.g., `tokio = { version = "1", ... }`, `log = "0.4"`). For unpublished examples, exact pinning creates unnecessary maintenance burden. Consider aligning with other examples:
♻️ Suggested alignment with other examples
-tokio = { version = "1.47.1", features = ["full"] }
+tokio = { version = "1", features = ["full"] }
 livekit = { workspace = true, features = ["rustls-tls-native-roots"] }
-log = "0.4.28"
-env_logger = "0.11.8"
+log = "0.4"
+env_logger = "0.11"
 bitfield-struct = "0.11.0"
-rand = "0.9.2"
+rand = "0.9"

examples/basic_text_stream/Cargo.toml (1)
8-12: Minor version pinning inconsistency within this file.
`tokio` and `futures-util` use semver ranges (`"1"`, `"0.3"`) while `log` and `env_logger` use exact patch versions (`"0.4.26"`, `"0.11.7"`). For consistency within this manifest and with other examples:

♻️ Optional: align version specifications
 tokio = { version = "1", features = ["full"] }
 futures-util = { version = "0.3", default-features = false, features = ["sink"] }
 livekit = { workspace = true, features = ["rustls-tls-native-roots"] }
-log = "0.4.26"
-env_logger = "0.11.7"
+log = "0.4"
+env_logger = "0.11"

examples/basic_data_track/README.md (1)
1-21: Add a best‑effort delivery note to set expectations.
This example reads as fully reliable; a short note about possible frame drops would avoid confusion.
✏️ Proposed doc tweak
-Simple example of publishing and subscribing to a data track.
+Simple example of publishing and subscribing to a data track.
+Note: data tracks are best-effort and may drop frames under backpressure.

Based on learnings: In the livekit-datatrack crate, data tracks do not guarantee reliable delivery in the initial version. The `process_and_send` method in `livekit-datatrack/src/local/manager.rs` intentionally uses `try_send` to drop packets on backpressure rather than propagating it, and this is expected behavior.

livekit-datatrack/src/packet/handle.rs (1)
84-89: Minor: Test dummy excludes `u16::MAX` as a valid handle.
The range `1..u16::MAX` is exclusive of `u16::MAX`, meaning handle `65535` will never be generated in tests. If `u16::MAX` is a valid handle value (which it appears to be per `TryFrom<u16>`), consider using `1..=u16::MAX` for full coverage.

Proposed fix
 impl fake::Dummy<fake::Faker> for Handle {
     fn dummy_with_rng<R: rand::Rng + ?Sized>(_: &fake::Faker, rng: &mut R) -> Self {
-        Self::try_from(rng.random_range(1..u16::MAX)).unwrap()
+        Self::try_from(rng.random_range(1..=u16::MAX)).unwrap()
     }
 }

livekit-datatrack/src/local/packetizer.rs (1)
133-136: Nitpick: Prefer `is_empty()` over length comparison.
Idiomatic Rust uses `is_empty()` for emptiness checks.

Proposed fix
-    if packets.len() == 0 {
+    if packets.is_empty() {
         assert_eq!(payload_size, 0, "Should be no packets for zero payload");
         return;
     }

livekit-datatrack/src/packet/time.rs (1)
82-88: Document intentional u32 truncation in `duration_to_ticks`.
The `ticks as u32` cast silently truncates values exceeding `u32::MAX`. While this is correct behavior for wrapping RTP timestamps, a brief comment would clarify that this is intentional rather than an oversight.

Proposed documentation addition
 /// Convert a duration since the epoch into clock ticks.
 const fn duration_to_ticks(duration: Duration) -> u32 {
     // round(nanos * rate_hz / 1e9)
     let nanos = duration.as_nanos();
     let ticks = (nanos * RATE as u128 + 500_000_000) / 1_000_000_000;
+    // Intentional truncation: timestamps wrap at u32::MAX
     ticks as u32
 }

livekit/Cargo.toml (1)
35-42: Consider using workspace dependencies for consistency.
Several dependencies are pinned to explicit versions here while other crates in the workspace may use `workspace = true`. This can lead to version drift. If these versions are intentionally different from workspace defaults, this is fine; otherwise, consider using workspace references for consistency.

livekit-datatrack/src/remote/depacketizer.rs (1)
240-420: Good test coverage; consider a wrap‑around case.
After fixing the wrap logic, a test with
start_sequencenearu16::MAXwould guard against regressions.livekit-datatrack/src/packet/deserialize.rs (1)
97-135: Known extension parsing ignores the wirelenfield.For
E2eeExtandUserTimestampExt, the code checksraw.remaining() < E2eeExt::LEN(hardcoded constant) rather than using thelenvalue read from the wire at line 102. If a future protocol version sends a different length for these extensions, deserialization could read incorrect data or leave bytes unconsumed.Consider validating that
lenmatches the expectedLENconstant, or usinglento bound the read.♻️ Proposed validation
E2eeExt::TAG => { + if len != E2eeExt::LEN { + Err(DeserializeError::MalformedExt(tag))? + } if raw.remaining() < E2eeExt::LEN { Err(DeserializeError::MalformedExt(tag))? } // ... } UserTimestampExt::TAG => { + if len != UserTimestampExt::LEN { + Err(DeserializeError::MalformedExt(tag))? + } if raw.remaining() < UserTimestampExt::LEN { Err(DeserializeError::MalformedExt(tag))? } // ... }livekit-datatrack/src/remote/mod.rs (1)
94-96: Silent frame drops on subscriber lag.
BroadcastStreamreturnsErr(BroadcastStreamRecvError::Lagged(n))when a subscriber falls behind. Filtering these out viafilter_map(|r| async { r.ok() })silently discards the lag notification—subscribers won't know they missed frames.Consider exposing the lag info (e.g., via a wrapper event or logging) so applications can detect and handle missed frames. Based on learnings, this may be acceptable if data tracks don't guarantee reliable delivery, but documenting this behavior in the
subscribedoc comment would help users understand the semantics.livekit-datatrack/src/remote/manager.rs (2)
74-75: Consider documenting or increasing channel buffer sizes.The buffer sizes of 4 for event channels may be tight under load. While the
TODO: tune buffer sizecomment acknowledges this, consider either:
- Making buffer sizes configurable via
ManagerOptions- Documenting the expected throughput characteristics
128-130: Address the TODO for subscription timeout.Pending subscriptions have no internal timeout—if the SFU never responds with a subscriber handle, the
result_txswill never be resolved. Consider adding a timeout mechanism or tracking this in an issue.Would you like me to open an issue to track implementing an internal subscription timeout?
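One way to bound pending subscriptions without waiting on the SFU is to record a deadline alongside each pending `result_tx` and sweep expired entries from the manager loop. Everything below (field names, error variant, timeout value) is a hypothetical sketch, not the crate's code:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};
use tokio::sync::oneshot;

#[derive(Debug)]
enum SubscribeError { Timeout }

const SUBSCRIBE_TIMEOUT: Duration = Duration::from_secs(10);

struct PendingSubscriptions {
    // sid -> (deadline, channel that resolves the caller's subscribe()).
    pending: HashMap<String, (Instant, oneshot::Sender<Result<(), SubscribeError>>)>,
}

impl PendingSubscriptions {
    fn insert(&mut self, sid: String, result_tx: oneshot::Sender<Result<(), SubscribeError>>) {
        self.pending.insert(sid, (Instant::now() + SUBSCRIBE_TIMEOUT, result_tx));
    }

    /// Called on a timer tick (e.g. tokio::time::interval) inside the manager loop.
    fn expire(&mut self, now: Instant) {
        let expired: Vec<String> = self
            .pending
            .iter()
            .filter(|(_, (deadline, _))| now >= *deadline)
            .map(|(sid, _)| sid.clone())
            .collect();
        for sid in expired {
            if let Some((_, result_tx)) = self.pending.remove(&sid) {
                // The caller sees a timeout instead of waiting forever.
                let _ = result_tx.send(Err(SubscribeError::Timeout));
            }
        }
    }
}
```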
livekit/src/rtc_engine/rtc_session.rs (2)
665-672: Partial batch sending on error—verify this is acceptable.
try_send_data_track_packetssends packets until the first error, then drops the rest of the batch. This aligns with the lossy delivery model, but consider whether logging the number of dropped packets would aid debugging.
1942-1954: Consider removing the TODO or clarifying the clone concern.The comment
// TODO: avoid clone if possiblerefers tobuffer.data.to_vec().into(). SinceDataBuffer.datais a slice (&[u8]), a copy is unavoidable when converting to ownedBytes. The current approach is correct—consider removing the TODO or noting that ownership transfer from the C++ layer requires this copy.livekit-datatrack/src/local/mod.rs (1)
143-149: Consider validating non-empty track name.The documentation states the name "Must not be empty", but
DataTrackOptions::newdoesn't enforce this. Consider adding validation:🛡️ Proposed fix to validate name
pub fn new(name: impl Into<String>) -> Self { - Self { name: name.into() } + let name = name.into(); + debug_assert!(!name.is_empty(), "Track name must not be empty"); + Self { name } }Alternatively, return a
Resultto enforce at runtime, or document that empty names result in undefined behavior.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (66)
- `Cargo.toml`
- `examples/agent_dispatch/Cargo.toml`
- `examples/api/Cargo.toml`
- `examples/basic_data_track/Cargo.toml`
- `examples/basic_data_track/README.md`
- `examples/basic_data_track/src/publisher.rs`
- `examples/basic_data_track/src/subscriber.rs`
- `examples/basic_room/Cargo.toml`
- `examples/basic_text_stream/Cargo.toml`
- `examples/encrypted_text_stream/Cargo.toml`
- `examples/local_audio/Cargo.toml`
- `examples/local_video/Cargo.toml`
- `examples/mobile/Cargo.toml`
- `examples/play_from_disk/Cargo.toml`
- `examples/rpc/Cargo.toml`
- `examples/save_to_disk/Cargo.toml`
- `examples/screensharing/Cargo.toml`
- `examples/send_bytes/Cargo.toml`
- `examples/webhooks/Cargo.toml`
- `examples/wgpu_room/Cargo.toml`
- `livekit-datatrack/Cargo.toml`
- `livekit-datatrack/README.md`
- `livekit-datatrack/src/e2ee.rs`
- `livekit-datatrack/src/error.rs`
- `livekit-datatrack/src/frame.rs`
- `livekit-datatrack/src/lib.rs`
- `livekit-datatrack/src/local/events.rs`
- `livekit-datatrack/src/local/manager.rs`
- `livekit-datatrack/src/local/mod.rs`
- `livekit-datatrack/src/local/packetizer.rs`
- `livekit-datatrack/src/local/pipeline.rs`
- `livekit-datatrack/src/local/proto.rs`
- `livekit-datatrack/src/packet/deserialize.rs`
- `livekit-datatrack/src/packet/extension.rs`
- `livekit-datatrack/src/packet/handle.rs`
- `livekit-datatrack/src/packet/mod.rs`
- `livekit-datatrack/src/packet/serialize.rs`
- `livekit-datatrack/src/packet/time.rs`
- `livekit-datatrack/src/remote/depacketizer.rs`
- `livekit-datatrack/src/remote/events.rs`
- `livekit-datatrack/src/remote/manager.rs`
- `livekit-datatrack/src/remote/mod.rs`
- `livekit-datatrack/src/remote/pipeline.rs`
- `livekit-datatrack/src/remote/proto.rs`
- `livekit-datatrack/src/track.rs`
- `livekit-datatrack/src/utils/bytes.rs`
- `livekit-datatrack/src/utils/counter.rs`
- `livekit-datatrack/src/utils/mod.rs`
- `livekit-ffi/src/conversion/participant.rs`
- `livekit/Cargo.toml`
- `livekit/src/prelude.rs`
- `livekit/src/proto.rs`
- `livekit/src/room/data_track.rs`
- `livekit/src/room/e2ee/data_track.rs`
- `livekit/src/room/e2ee/manager.rs`
- `livekit/src/room/e2ee/mod.rs`
- `livekit/src/room/mod.rs`
- `livekit/src/room/participant/local_participant.rs`
- `livekit/src/room/participant/mod.rs`
- `livekit/src/rtc_engine/mod.rs`
- `livekit/src/rtc_engine/rtc_events.rs`
- `livekit/src/rtc_engine/rtc_session.rs`
- `livekit/tests/common/e2e/mod.rs`
- `livekit/tests/data_channel_encryption.rs`
- `livekit/tests/data_track_test.rs`
- `release-plz.toml`
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: ladvoc
Repo: livekit/rust-sdks PR: 862
File: livekit-datatrack/src/local/manager.rs:298-311
Timestamp: 2026-02-03T01:23:04.638Z
Learning: In the livekit-datatrack crate, data tracks do not guarantee reliable delivery in the initial version. The `process_and_send` method in `livekit-datatrack/src/local/manager.rs` intentionally uses `try_send` to drop packets on backpressure rather than propagating it, and this is expected behavior.
📚 Learning: 2026-02-03T01:23:04.638Z
Learnt from: ladvoc
Repo: livekit/rust-sdks PR: 862
File: livekit-datatrack/src/local/manager.rs:298-311
Timestamp: 2026-02-03T01:23:04.638Z
Learning: In the livekit-datatrack crate, data tracks do not guarantee reliable delivery in the initial version. The `process_and_send` method in `livekit-datatrack/src/local/manager.rs` intentionally uses `try_send` to drop packets on backpressure rather than propagating it, and this is expected behavior.
Applied to files:
livekit/src/room/e2ee/mod.rslivekit/src/prelude.rsexamples/basic_data_track/README.mdlivekit-datatrack/src/local/packetizer.rslivekit/tests/data_track_test.rslivekit-datatrack/src/remote/proto.rslivekit-datatrack/src/local/pipeline.rslivekit-datatrack/src/lib.rslivekit/src/room/data_track.rslivekit-datatrack/README.mdlivekit/src/room/participant/local_participant.rslivekit-datatrack/src/packet/handle.rslivekit-datatrack/src/remote/pipeline.rsexamples/basic_data_track/src/publisher.rslivekit/Cargo.tomllivekit-datatrack/src/remote/mod.rslivekit-datatrack/src/remote/depacketizer.rslivekit-datatrack/src/local/events.rslivekit-datatrack/src/local/proto.rslivekit-datatrack/src/packet/mod.rslivekit/src/room/mod.rslivekit-datatrack/src/remote/manager.rsexamples/basic_data_track/Cargo.tomlexamples/basic_data_track/src/subscriber.rsCargo.tomllivekit-datatrack/src/remote/events.rslivekit-datatrack/src/local/manager.rslivekit-datatrack/Cargo.tomllivekit/src/rtc_engine/mod.rslivekit-datatrack/src/track.rslivekit-datatrack/src/frame.rslivekit/src/rtc_engine/rtc_session.rslivekit-datatrack/src/local/mod.rs
🧬 Code graph analysis (21)
livekit-datatrack/src/local/packetizer.rs (3)
livekit-datatrack/src/packet/time.rs (2)
new(54-56)random(23-25)livekit-datatrack/src/utils/counter.rs (1)
new(21-23)livekit-datatrack/src/packet/handle.rs (2)
from(52-54)from(58-60)
livekit/tests/data_track_test.rs (3)
livekit-datatrack/src/track.rs (1)
uses_e2ee(91-93)livekit-datatrack/src/remote/mod.rs (1)
publisher_identity(100-102)livekit-datatrack/src/frame.rs (2)
payload(41-43)user_timestamp(46-48)
livekit-datatrack/src/remote/proto.rs (2)
livekit-datatrack/src/track.rs (3)
try_from(111-117)from(121-123)name(87-89)livekit-datatrack/src/packet/handle.rs (4)
try_from(34-39)try_from(45-48)from(52-54)from(58-60)
livekit-datatrack/src/lib.rs (1)
livekit-datatrack/src/packet/serialize.rs (1)
packet(178-193)
livekit/tests/data_channel_encryption.rs (1)
livekit/tests/common/e2e/mod.rs (1)
test_rooms_with_options(82-128)
livekit-datatrack/src/packet/deserialize.rs (3)
livekit-datatrack/src/packet/handle.rs (2)
from(52-54)from(58-60)livekit-datatrack/src/packet/time.rs (1)
from_ticks(27-29)livekit-datatrack/src/packet/serialize.rs (1)
packet(178-193)
livekit-datatrack/src/remote/pipeline.rs (1)
livekit-datatrack/src/remote/depacketizer.rs (3)
new(39-41)from(190-192)from(196-198)
examples/basic_data_track/src/publisher.rs (1)
examples/basic_data_track/src/subscriber.rs (1)
main(8-21)
livekit/src/rtc_engine/rtc_events.rs (1)
livekit/src/rtc_engine/rtc_session.rs (2)
data_channel(622-624)data_channel(1906-1922)
livekit-datatrack/src/remote/mod.rs (4)
livekit-runtime/src/dispatcher.rs (1)
timeout(115-128)livekit-datatrack/src/remote/depacketizer.rs (3)
new(39-41)from(190-192)from(196-198)livekit-datatrack/src/remote/pipeline.rs (1)
new(39-47)livekit-datatrack/src/track.rs (3)
info(38-40)published_rx(63-68)from(121-123)
livekit-datatrack/src/local/events.rs (1)
livekit-api/src/http_client.rs (1)
bytes(81-83)
livekit-datatrack/src/utils/mod.rs (1)
livekit-api/src/http_client.rs (1)
bytes(81-83)
livekit-datatrack/src/local/proto.rs (2)
livekit-datatrack/src/track.rs (5)
from(121-123)try_from(111-117)uses_e2ee(91-93)sid(83-85)name(87-89)livekit-datatrack/src/packet/handle.rs (4)
from(52-54)from(58-60)try_from(34-39)try_from(45-48)
livekit-datatrack/src/packet/mod.rs (2)
livekit-datatrack/src/packet/deserialize.rs (3)
deserialize(44-49)deserialize(53-94)deserialize(98-134)livekit-datatrack/src/packet/serialize.rs (1)
serialize(50-57)
livekit/src/room/mod.rs (1)
livekit-datatrack/src/local/proto.rs (1)
publish_responses_for_sync_state(113-120)
livekit-datatrack/src/remote/manager.rs (5)
livekit-datatrack/src/remote/mod.rs (4)
new(43-45)publisher_identity(100-102)published_rx(113-115)inner(47-52)livekit-datatrack/src/remote/pipeline.rs (2)
new(39-47)from(88-93)livekit-datatrack/src/track.rs (5)
info(38-40)sid(83-85)published_rx(63-68)from(121-123)uses_e2ee(91-93)livekit-runtime/src/dispatcher.rs (1)
timeout(115-128)livekit-datatrack/src/packet/handle.rs (2)
from(52-54)from(58-60)
examples/basic_data_track/src/subscriber.rs (2)
livekit-datatrack/src/remote/mod.rs (1)
publisher_identity(100-102)livekit-datatrack/src/frame.rs (1)
payload(41-43)
livekit-datatrack/src/packet/time.rs (1)
livekit-datatrack/src/local/packetizer.rs (1)
new(46-54)
livekit-datatrack/src/track.rs (2)
livekit-datatrack/src/local/mod.rs (1)
published_rx(110-112)livekit-datatrack/src/remote/mod.rs (1)
published_rx(113-115)
livekit-datatrack/src/frame.rs (2)
livekit-datatrack/src/local/pipeline.rs (1)
from(81-89)livekit-datatrack/src/remote/pipeline.rs (1)
from(88-93)
livekit/src/rtc_engine/rtc_session.rs (4)
livekit-datatrack/src/remote/proto.rs (1)
event_from_join(47-51)livekit-datatrack/src/local/proto.rs (1)
publish_result_from_request_response(76-92)livekit/src/rtc_engine/rtc_events.rs (1)
on_message(157-165)libwebrtc/src/native/data_channel.rs (2)
on_message(105-107)on_message(129-134)
🪛 GitHub Actions: Rust Formatting
livekit/src/room/e2ee/manager.rs
[error] 273-273: cargo fmt -- --check detected a formatting difference in manager.rs (added trailing comma after key_index parameter). Run 'cargo fmt' to fix code style issues.
livekit/src/room/e2ee/data_track.rs
[error] 31-31: cargo fmt -- --check detected a formatting difference in data_track.rs. Run 'cargo fmt' to fix code style issues.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
- GitHub Check: Build (aarch64-linux-android)
- GitHub Check: Build (aarch64-unknown-linux-gnu)
- GitHub Check: Build (x86_64-unknown-linux-gnu)
- GitHub Check: Build (aarch64-apple-darwin)
- GitHub Check: Build (x86_64-linux-android)
- GitHub Check: Build (aarch64-apple-ios)
- GitHub Check: Build (x86_64-pc-windows-msvc)
- GitHub Check: Build (aarch64-apple-ios-sim)
- GitHub Check: Build (aarch64-pc-windows-msvc)
- GitHub Check: Build (x86_64-apple-darwin)
- GitHub Check: Build (armv7-linux-androideabi)
- GitHub Check: Test (x86_64-pc-windows-msvc)
- GitHub Check: Test (x86_64-unknown-linux-gnu)
- GitHub Check: Test (x86_64-apple-darwin)
🔇 Additional comments (122)
livekit/src/proto.rs (1)
149-159: LGTM — formatting-only change.
Trailing comma is fine and keeps the match arm consistent.livekit-ffi/src/conversion/participant.rs (1)
61-61: LGTM! Good formatting improvement.Adding trailing commas to the last match arms follows Rust best practices and makes future additions cleaner.
Also applies to: 75-75
release-plz.toml (1)
87-91: LGTM!The release configuration for the new
livekit-datatrackcrate is properly set up with changelog path, publish, and release settings consistent with other workspace packages.examples/webhooks/Cargo.toml (1)
4-10: LGTM!The explicit edition and tokio dependency declaration align with the broader migration pattern in this PR.
Cargo.toml (2)
9-9: LGTM!The
livekit-datatrackcrate is correctly added as a workspace member and dependency with appropriate version pinning.Also applies to: 47-47
60-62: New workspace dependencies look appropriate.The additions of
from_variants,futures-core, andtokio-streamare reasonable supporting dependencies for the new data tracks functionality.Also applies to: 75-75
livekit/tests/data_channel_encryption.rs (1)
52-52: LGTM!The
.into()conversions correctly adaptRoomOptionsto the expectedTestRoomOptionstype required bytest_rooms_with_options, aligning with the function signature that expectsIntoIterator<Item = TestRoomOptions>.examples/api/Cargo.toml (1)
4-8: LGTM!The explicit edition and tokio dependency with
parking_lotfeature are appropriate for this API example.examples/basic_room/Cargo.toml (1)
4-4: LGTM.
Manifest pinning and explicit edition are clear and consistent with the workspace direction.Also applies to: 8-12
examples/agent_dispatch/Cargo.toml (1)
4-4: LGTM.
Explicit dependency pins and edition update look fine.Also applies to: 8-13
examples/basic_data_track/Cargo.toml (1)
1-21: LGTM.
New example manifest is clean and well-structured.examples/local_video/Cargo.toml (1)
4-4: LGTM.
Pin updates and edition change are straightforward.Also applies to: 26-26, 32-35, 42-43
livekit/src/room/participant/mod.rs (1)
48-49: LGTM.
Pure formatting change; no behavioral impact.Also applies to: 57-58
examples/mobile/Cargo.toml (1)
4-4: LGTM.
Manifest updates look consistent with the rest of the examples.Also applies to: 8-11, 15-15
livekit-datatrack/src/error.rs (1)
15-24: Concise internal error wrapper.Clear and lightweight; no issues spotted.
examples/basic_data_track/src/publisher.rs (3)
6-21: Clear connect→publish→shutdown flow.Easy to follow and cleanly structured.
24-27: Good placeholder for sensor input.Simple stub makes it easy to swap in real data later.
29-38: Push loop cadence is straightforward.Readable, predictable pacing for example use.
examples/save_to_disk/Cargo.toml (1)
4-11: Dependency versions are aligned with peer examples in the workspace.The explicit pins for tokio, bytes, and futures in this example are consistent with similar examples. Tokio uses the flexible
version = "1"pinning strategy common across save_to_disk, screensharing, play_from_disk, rpc, and other examples; bytes is uniquely declared here; and futures versions (0.3.28 vs. 0.3 elsewhere) fall within the same major.minor release band. No problematic drift detected.examples/encrypted_text_stream/Cargo.toml (1)
1-13: LGTM.livekit-datatrack/src/e2ee.rs (1)
22-56: API surface looks solid.livekit-datatrack/src/frame.rs (1)
19-124: Frame API and helpers look good.examples/play_from_disk/Cargo.toml (1)
1-12: LGTM.livekit-datatrack/src/utils/bytes.rs (1)
17-85: Chunking utilities and tests look good.livekit-datatrack/src/packet/handle.rs (1)
76-81: LGTM! Safe handle allocation with overflow protection.The
checked_addapproach correctly prevents integer overflow when allocating handles. The implementation ensures handle 0 is never allocated (starts at 1 after first increment).livekit-datatrack/src/remote/proto.rs (2)
25-39: LGTM! Clean protocol-to-event conversion.The
TryFromimplementation properly chains conversions for both handle and SID, collecting into aHashMapwith appropriate error propagation via?.
82-89: LGTM! Efficient ownership transfer withmem::take.Using
mem::taketo move ownership ofdata_trackswhile leaving an empty vector in place is an efficient pattern that avoids cloning.livekit-datatrack/src/local/packetizer.rs (2)
44-54: LGTM! Well-structured packetizer initialization.The constructor properly initializes counters and creates a randomized clock base timestamp, which helps avoid correlation attacks on timestamp values.
56-88: LGTM! Correct packetization with proper header propagation.The implementation correctly:
- Computes max payload size accounting for header overhead
- Handles MTU-too-short edge case
- Assigns proper frame markers (Single/Start/Inter/Final)
- Increments sequence numbers per packet while keeping frame_number constant within a frame
livekit-datatrack/src/packet/time.rs (2)
44-50: LGTM! Well-designed monotonic clock with wraparound handling.The
Clockstruct correctly tracks the previous timestamp to enforce monotonicity in RTP wraparound space, which is essential for media synchronization.
68-80: LGTM! Correct monotonicity enforcement.The
atmethod properly clamps the computed timestamp toprevwhen wraparound would cause a backwards jump. Theis_beforemethod using signed comparison of wrapping difference is the standard approach for RTP timestamp comparison.livekit-datatrack/src/local/events.rs (2)
24-44: LGTM! Clean event-driven architecture.The
InputEventandOutputEventenums provide a clear separation between events consumed by the manager and events produced for external handling. TheFromVariantsderive reduces boilerplate for event construction.
54-60: LGTM! Appropriate use of oneshot channel for async result.Using
oneshot::Senderfor the publish result allows the caller to await the outcome without blocking the manager's event loop.livekit/Cargo.toml (1)
33-33: LGTM! New data track dependency added.The
livekit-datatrackworkspace dependency enables the data tracks feature integration.livekit-datatrack/src/track.rs (4)
36-47: LGTM! Clean accessor methods with proper delegation.The
info()andis_published()methods correctly delegate to the underlying state, withis_published()borrowing from the watch receiver appropriately.
54-61: LGTM! Correct async unpublish wait implementation.The method properly handles the already-unpublished case with an early return, and the
wait_forcall correctly waits for the condition!*is_published. Discarding the result with_ =is acceptable since channel closure implies the track is gone.
108-117: LGTM! Appropriate SID validation.The
TryFrom<String>implementation correctly validates theDTR_prefix requirement, providing a type-safe way to constructDataTrackSidvalues.
132-144: LGTM! Well-designed test dummy with Base57 alphabet.The fake implementation generates valid SIDs using a Base57 alphabet (excluding ambiguous characters like 0, O, I, l), which is a common practice for human-readable identifiers.
livekit-datatrack/src/local/proto.rs (4)
26-38: Publish/unpublish request mapping is consistent.E2EE flag and handles are cleanly translated into protocol fields.
42-74: Response/DataTrackInfo conversions look solid.The error context and encryption mapping are clear and consistent.
96-119: Sync‑state helpers look good.The encryption mapping and response construction are straightforward.
122-204: Conversion tests provide good coverage.Nice breadth across publish/unpublish and sync-state flows.
livekit-datatrack/src/remote/depacketizer.rs (3)
20-41: Depacketizer setup is straightforward.Structs and constructor are clear and minimal.
43-133: Packet ingestion logic reads well.The single/start/inter/final routing and drop handling are clean.
162-238: Result/error types are clear and well‑documented.Nice separation between drop reasons and payload delivery.
livekit-datatrack/src/remote/pipeline.rs (5)
23-35: Pipeline option/struct definitions are clean.
49-85: Packet processing/decryption flow looks consistent.
87-93: Frame conversion is straightforward.
96-123: Test validates the non‑E2EE path well.
37-47: The E2EE/provider invariant is already enforced at all call sites.The invariant cannot be violated at runtime. In
manager.rs(line 253-257), thedecryption_provideris set conditionally based onuses_e2ee(), guaranteeing the invariant beforePipeline::new()is called. Thedebug_assert_eq!serves as a defensive sanity check, not as the sole point of enforcement. Since the invariant is structurally guaranteed by the calling code, changing toassert_eq!would add unnecessary runtime overhead in release builds without practical safety benefit.livekit-datatrack/README.md (1)
1-6: Clear guidance on internal crate usage.livekit/src/room/e2ee/mod.rs (1)
22-23: Data‑track module wiring looks good.livekit/src/room/data_track.rs (1)
15-16: Re-export keeps the data‑track API discoverable.livekit/src/prelude.rs (1)
17-34: Prelude exports for data tracks are coherent.livekit-datatrack/src/utils/mod.rs (1)
15-21: Nice consolidated utils module.
Centralized re-exports keep the Bytes and counter helpers discoverable.livekit/src/rtc_engine/rtc_events.rs (2)
21-24: Label imports are now centralized.
Pulling the DC labels fromrtc_sessionkeeps constants co-located with their definitions.
100-108: Label-based dispatch is clearer.
Explicit matching avoids accidentally wiring unknown labels to the data handler.livekit/src/room/participant/local_participant.rs (3)
34-35: Imports align with the new data-track API.
The added data_track types are consistent with the new publish path.
253-285: publish_data_track wiring looks solid.
Session validation and error propagation are clean and minimal.
543-547: Clear deprecation guidance for publish_data.
The doc note gives users a good forward path.livekit-datatrack/src/packet/extension.rs (2)
17-33: Typed extension structs are well-shaped.
Clear separation of user timestamp and E2EE metadata.
35-52: E2EE debug redaction and metadata constants look good.
Avoiding sensitive field output is the right call.livekit-datatrack/src/local/pipeline.rs (6)
25-43: Pipeline scaffolding and error unification are clean.
Options, state, and error types are concise and composable.
53-57: process_frame flow is straightforward.
Encrypt-then-packetize ordering is clear and easy to follow.
59-74: Conditional encryption and extension injection are correct.
The E2EE extension is attached only when encryption is active.
76-77: Explicit MTU constant improves clarity.
Having it centralized will help future tuning.
80-89: Frame-to-packetizer conversion preserves timestamps.
The user timestamp mapping is handled cleanly.
92-120: Good coverage for non‑E2EE packetization.
The test validates packet count and payload integrity.livekit-datatrack/src/lib.rs (3)
17-39: Module layout is clear and comprehensive.
The separation of track/local/remote/frame/e2ee/packet is clean.
41-44: Public API re-export is tidy.
Theapimodule keeps the surface cohesive.
46-58: Backend re-exports are well organized.
The nested local/remote modules mirror internal structure nicely.livekit-datatrack/src/utils/counter.rs (3)
15-36: Counter API is concise and usable.
The get / get_then_increment split is clear.
38-41: WrappingIncrement trait is straightforward.
Good abstraction for wrap-around behavior.
43-53: Macro-based impls reduce boilerplate cleanly.
The unsigned coverage looks appropriate.livekit/tests/data_track_test.rs (6)
36-101: Well-structured parameterized test for data track publishing and subscribing.The test covers both high FPS/small payload and low FPS/large payload scenarios with appropriate timing and frame count validation. The 95% minimum receive threshold appropriately accounts for the intentional packet dropping behavior on backpressure.
103-140: Good stress test for publishing many tracks.Testing 256 concurrent tracks with large frame payloads is a solid stress test for the data track infrastructure.
142-171: Authorization and duplicate name error handling tests look correct.These tests properly verify that the API returns appropriate error variants for unauthorized publish attempts and duplicate track names.
173-233: E2EE test properly validates encrypted data track flow.The test correctly sets up shared key providers on both sides and verifies that
uses_e2ee()returns true for encrypted tracks.
360-451: Fault resilience tests cover important reconnection scenarios.Testing both signal reconnect and full reconnect (ForceTcp) scenarios for both subscriber and publisher sides ensures data tracks remain functional across connection disruptions.
453-464: Helper function is clean and reusable.The
wait_for_remote_trackhelper properly filters room events for remote data track publications.livekit-datatrack/src/packet/serialize.rs (5)
19-26: Clear error types for serialization failures.The
SerializeErrorenum provides specific variants for buffer capacity issues, making debugging straightforward.
28-58: Packet serialization implementation is correct.The
serialize_intomethod properly checks buffer capacity before writing, andserializeallocates the exact required capacity upfront.
76-128: Header serialization handles extensions correctly.The extension word calculation, padding, and conditional extension block emission are implemented correctly. The assertion at line 125 provides a useful sanity check during development.
153-168: Extension serialization follows the expected wire format.Both E2EE and user timestamp extensions correctly write their tag, length (minus 1 per protocol), and payload.
170-241: Comprehensive serialization tests.The tests validate header metrics calculations, total serialized length, and byte-level serialization correctness including extension ordering and padding.
livekit/tests/common/e2e/mod.rs (2)
47-74: Clean abstraction for test room configuration.The
TestRoomOptionsstruct nicely encapsulates both token grants and room options, with convenientFromimplementations for common use cases.
76-103: Test helper functions properly updated.The
test_roomsandtest_rooms_with_optionsfunctions correctly use the newTestRoomOptionsstructure, with proper room name injection into grants.livekit/src/room/e2ee/manager.rs (1)
250-284: API changes align well with data track integration.The updated signatures for
decrypt_dataandencrypt_datataking ownedVec<u8>are appropriate for the data track use case. TheDebugimplementation provides useful observability.livekit/src/room/mod.rs (7)
248-250: NewRemoteDataTrackPublishedevent variant expands the public API appropriately.This event allows consumers to react to remote data track publications.
455-468: Room session and handle correctly track data track managers and tasks.The addition of
local_dt_input,remote_dt_input, and the corresponding task handles ensures proper lifecycle management.
989-994: Data track input events are correctly routed to managers.The engine events for local and remote data track inputs are properly forwarded to their respective manager inputs.
1000-1022: Close path properly shuts down data track tasks.The shutdown sequence sends shutdown events to managers and awaits all task handles.
1271-1298: Sync state correctly includes data track publications.The
publish_data_tracksfield is populated from the local data track manager's query, ensuring the SFU has accurate state after reconnection.
1423-1428: Restart correctly resends data track subscription updates.This ensures the SFU continues delivering packets for existing subscriptions after a full reconnect.
1878-1922: Forward tasks properly bridge manager outputs to engine/dispatcher.The
local_dt_forward_taskandremote_dt_forward_taskcorrectly handle shutdown signals and forward events appropriately. TheRemoteDataTrackPublishedevent is dispatched directly while other remote events go to the engine.livekit/src/room/e2ee/data_track.rs (1)
52-81: Decryption provider correctly bridges to E2eeManager.The implementation properly extracts fields from
EncryptedPayloadand delegates todecrypt_data.livekit-datatrack/src/packet/deserialize.rs (4)
22-41: Comprehensive error types for deserialization failures.The
DeserializeErrorenum covers all failure modes including version mismatches, buffer issues, and malformed extensions.
43-50: Packet deserialization entry point is clean.Correctly deserializes the header first, then extracts the remaining bytes as payload.
52-95: Header deserialization handles all fields correctly.The implementation properly validates buffer length, checks version support, parses flags, and conditionally handles extensions.
137-275: Comprehensive test coverage for deserialization.The tests cover buffer boundary conditions, version validation, extension parsing (including padding and unknown extensions), and alignment requirements.
livekit/src/rtc_engine/mod.rs (3)
192-193: LGTM!The new
EngineEventvariants for data track input follow the established pattern for other event types in the engine.
276-292: LGTM!The
handle_local_data_track_outputandhandle_remote_data_track_outputmethods correctly follow the existing pattern used by other engine methods (e.g.,publish_data,simulate_scenario) with proper reconnection handling.
632-637: LGTM!Session event forwarding for data track inputs follows the established pattern used by other session events in
on_session_event.livekit-datatrack/src/remote/events.rs (1)
24-54: LGTM!Well-structured event types with clear documentation. The
FromVariantsderive reduces boilerplate for enum conversions, and the doc comments appropriately reference the corresponding protocol types.livekit-datatrack/src/remote/manager.rs (2)
309-311: Intentional packet dropping on backpressure—correctly implemented.The
try_sendusage here aligns with the design decision that data tracks don't guarantee reliable delivery. Theinspect_errfor debug logging is appropriate.Based on learnings: "data tracks do not guarantee reliable delivery in the initial version. The
process_and_sendmethod intentionally usestry_sendto drop packets on backpressure."
426-559: LGTM!Good test coverage for manager lifecycle, track task shutdown paths (via unpublish and unsubscribe), and the subscribe flow including SFU simulation. The use of
fakefor test data and timeouts for async tests is appropriate.livekit-datatrack/src/packet/mod.rs (2)
28-66: LGTM!Well-designed packet structure with appropriate derives. The custom
Debugimplementation that showspayload_leninstead of dumping the full payload is a nice touch for debugging without log spam.
92-109: LGTM!Good round-trip test that validates serialization/deserialization consistency. Using
fake::Fakerfor generating test data ensures diverse input coverage.livekit/src/rtc_engine/rtc_session.rs (5)
468-477: LGTM!Good code reuse—extracting
lossy_optionsand sharing it betweenlossy_dcanddt_transportreduces duplication and ensures consistent configuration.
627-673: LGTM!The
handle_local_data_track_outputandhandle_remote_data_track_outputmethods cleanly dispatch output events to the appropriate signaling or transport paths. The pattern matches are exhaustive for the current variants.
1074-1080: Early return short-circuits pending request handling.When
publish_result_from_request_responsereturnsSome, the method returns early without checkingpending_requests. This assumes publish responses never have associated pending requests, which appears correct but worth noting.
1144-1156: LGTM!Clean extension of the data channel handling to support
DATA_TRACK_DC_LABEL. Thehandle_remote_dt_packetscall properly sets up the message handler before storing the channel.
1848-1904: LGTM!Good refactoring to extract
ensure_publisher_connected_with_dcas a shared helper. The newensure_data_track_publisher_connectedcorrectly reuses this for the data track transport.livekit-datatrack/src/local/mod.rs (3)
83-91: LGTM!The
try_pushmethod correctly validates publication status before sending and properly maps channel errors toPushFrameError. Returning the frame on error enables application-level retry logic.
115-120: LGTM!The
Dropimplementation ensures the track is marked unpublished when the handle is dropped, preventing resource leaks and ensuring consistent state in the manager.
164-237: LGTM!Comprehensive error types with clear documentation. The
PushFrameErrordesign allowing recovery of the frame viainto_frame()is user-friendly for retry scenarios.livekit-datatrack/src/local/manager.rs (6)
32-101: Manager scaffolding and event loop are clear.No concerns with the options wiring, channel setup, or shutdown flow.
103-183: Publish/query/unpublish request flow looks consistent.Forwarding results and query handling read cleanly.
202-234: Track creation and descriptor transition look solid.
268-296: TrackTask loop handles publish state cleanly.
390-463: Tests cover shutdown and publish/unpublish flow well.
298-331: Backpressure handling is appropriate for non-guaranteed delivery semantics.Using
try_sendto drop packets under backpressure is the intended behavior for this non-reliable data track implementation. The error is logged but not propagated, which aligns with the design.
/// Subscribe to the first data track published.
async fn handle_first_publication(mut rx: UnboundedReceiver<RoomEvent>) -> Result<()> {
    log::info!("Waiting for publication…");
    while let Some(event) = rx.recv().await {
        let RoomEvent::RemoteDataTrackPublished(track) = event else {
            continue;
        };
        subscribe(track).await?
    }
Stop after the first publication to match the function intent.
The helper says “first data track,” but it keeps listening after the first track completes. Consider returning immediately after the first subscription.
✅ Suggested fix
- subscribe(track).await?
+ subscribe(track).await?;
+ return Ok(());📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Before:

/// Subscribe to the first data track published.
async fn handle_first_publication(mut rx: UnboundedReceiver<RoomEvent>) -> Result<()> {
    log::info!("Waiting for publication…");
    while let Some(event) = rx.recv().await {
        let RoomEvent::RemoteDataTrackPublished(track) = event else {
            continue;
        };
        subscribe(track).await?
    }

After:

/// Subscribe to the first data track published.
async fn handle_first_publication(mut rx: UnboundedReceiver<RoomEvent>) -> Result<()> {
    log::info!("Waiting for publication…");
    while let Some(event) = rx.recv().await {
        let RoomEvent::RemoteDataTrackPublished(track) = event else {
            continue;
        };
        subscribe(track).await?;
        return Ok(());
    }
🤖 Prompt for AI Agents
In `@examples/basic_data_track/src/subscriber.rs` around lines 23 - 31, The
function handle_first_publication currently continues listening after
subscribing to the first RemoteDataTrack; update it to stop after the first
successful subscription by returning (or breaking and returning) immediately
once subscribe(track).await? completes. Locate the loop in
handle_first_publication and add an early return (e.g., return Ok(())) right
after the subscribe(track).await? call so the function only handles the first
publication.
| edition = "2021" | ||
| publish = false | ||
|
|
||
| [dependencies] | ||
| tokio = { workspace = true, features = ["full"] } | ||
| env_logger = { workspace = true } | ||
| tokio = { version = "1", features = ["full"] } | ||
| env_logger = "0.10" | ||
| livekit = { workspace = true, features = ["rustls-tls-native-roots"] } | ||
| livekit-api = { workspace = true, features = ["rustls-tls-native-roots"] } | ||
| libwebrtc = { workspace = true } | ||
| log = { workspace = true } | ||
| log = "0.4" | ||
| cpal = "0.15" | ||
| anyhow = { workspace = true } | ||
| clap = { workspace = true, features = ["derive"] } | ||
| futures-util = { workspace = true } | ||
| anyhow = "1.0" | ||
| clap = { version = "4.0", features = ["derive"] } | ||
| futures-util = "0.3" |
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find all Cargo.toml files and extract versions for the specified dependencies
echo "=== Searching for dependency versions in Cargo.toml files ==="
# First, let's find and list all Cargo.toml files
echo "--- All Cargo.toml files in workspace ---"
find . -name "Cargo.toml" -type f | head -20
echo ""
echo "--- Root workspace Cargo.toml ---"
if [ -f "Cargo.toml" ]; then
echo "Root Cargo.toml found"
echo "Contents of root Cargo.toml (first 50 lines):"
head -50 Cargo.toml
fi
echo ""
echo "--- examples/local_audio/Cargo.toml ---"
if [ -f "examples/local_audio/Cargo.toml" ]; then
echo "File exists"
cat examples/local_audio/Cargo.toml
fi
echo ""
echo "--- Searching for tokio versions ---"
rg "tokio\s*=" Cargo.toml -A 1 | head -40Repository: livekit/rust-sdks
Length of output: 2889
🏁 Script executed:
#!/bin/bash
echo "=== Checking for dependency versions across ALL examples ==="
# Search for each dependency in all Cargo.toml files
for dep in "tokio" "env_logger" "log" "anyhow" "clap" "futures-util"; do
echo ""
echo "--- $dep versions across workspace ---"
rg "^\s*$dep\s*=" --type toml | grep -E "(examples|Cargo)"
done

Repository: livekit/rust-sdks
Length of output: 6378
Align dependency versions with workspace defaults.
The example declares explicit versions that drift from the root workspace: env_logger is pinned to 0.10 while the workspace defines 0.11, and clap is pinned to 4.0 while the workspace defines 4.5. Update these to match the workspace versions unless there's a specific reason for the deviation.
🤖 Prompt for AI Agents
In `@examples/local_audio/Cargo.toml` around lines 4 - 17, Update the example
Cargo.toml dependencies to match the workspace defaults by aligning env_logger
and clap versions with the workspace (remove the explicit version pins or set
them to the workspace versions) so env_logger uses 0.11 and clap uses 4.5;
update the dependency entries for env_logger and clap in the
examples/local_audio Cargo.toml (the dependency keys "env_logger" and "clap") to
inherit or match the workspace versions.
| edition = "2021" | ||
| publish = false | ||
|
|
||
| [dependencies] | ||
| tokio = { workspace = true, features = ["full"] } | ||
| env_logger = { workspace = true } | ||
| tokio = { version = "1", features = ["full"] } | ||
| env_logger = "0.10" | ||
| livekit = { workspace = true, features = ["rustls-tls-native-roots"] } | ||
| livekit-api = { workspace = true, features = ["rustls-tls-native-roots"] } | ||
| log = { workspace = true } | ||
| rand = { workspace = true } | ||
| serde_json = { workspace = true } | ||
| log = "0.4" | ||
| rand = "0.8" | ||
| serde_json = "1.0" |
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Search for explicit version declarations of these dependencies in Cargo.toml files
rg -n '^(tokio|env_logger|log|rand|serde_json)\s*=' Cargo.toml

Repository: livekit/rust-sdks
Length of output: 151
Update examples/rpc/Cargo.toml dependencies to match workspace versions.
Version drift detected: env_logger is pinned to "0.10" but workspace uses "0.11"; rand is pinned to "0.8" but workspace uses "0.9". Update these to "0.11" and "0.9" respectively to maintain consistency across the workspace.
🤖 Prompt for AI Agents
In `@examples/rpc/Cargo.toml` around lines 4 - 14, Update the dependency versions
in the examples/rpc Cargo.toml so they match the workspace: change the
env_logger dependency from "0.10" to "0.11" and change rand from "0.8" to "0.9"
(the entries for env_logger and rand in the examples/rpc Cargo.toml); leave
other dependency entries unchanged.

     edition = "2021"
     publish = false

     [dependencies]
    -tokio = { workspace = true, features = ["full"] }
    -env_logger = { workspace = true }
    +tokio = { version = "1", features = ["full"] }
    +env_logger = "0.10"
     livekit = { workspace = true, features = ["rustls-tls-native-roots"] }
     livekit-api = { workspace = true }
    -log = { workspace = true }
    -clap = { workspace = true, features = ["derive"] }
    +log = "0.4"
    +clap = { version = "4.0", features = ["derive"] }
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# List versions of deps used here across the workspace.
rg -n '^(tokio|env_logger|log|clap)\s*=' -g 'Cargo.toml'

Repository: livekit/rust-sdks
Length of output: 4918
Align example dependency versions with workspace root Cargo.toml.
This example declares explicit versions for dependencies, but they drift from the workspace root:
- `env_logger = "0.10"` should be `"0.11"` (root specifies 0.11)
- `clap = "4.0"` should be `"4.5"` (root specifies 4.5)
Consistent pinning across examples reduces confusion about which versions are tested.
🤖 Prompt for AI Agents
In `@examples/screensharing/Cargo.toml` around lines 4 - 13, Update the example
Cargo.toml dependencies to match the workspace root: change env_logger from
"0.10" to "0.11" and change clap from "4.0" to "4.5" so the declared versions
for env_logger and clap in the examples align with the workspace root; verify
the dependency entries for env_logger and clap in the
examples/screensharing/Cargo.toml are updated accordingly while leaving other
entries (tokio, livekit, livekit-api, log) unchanged.

     edition = "2021"
     publish = false

     [features]
     default = []
     tracing = ["console-subscriber", "tokio/tracing"]

     [dependencies]
    -tokio = { workspace = true, features = ["full", "parking_lot"] }
    +tokio = { version = "1", features = ["full", "parking_lot"] }
     livekit = { workspace = true, features = ["rustls-tls-native-roots"]}
     webrtc-sys = { workspace = true }
    -futures = { workspace = true }
    +futures = "0.3"
     winit = { version = "0.30.11", features = [ "android-native-activity" ] }
    -parking_lot = { workspace = true, features = ["deadlock_detection"] }
    +parking_lot = { version = "0.12.1", features = ["deadlock_detection"] }
     image = "0.24"
     wgpu = "25.0"
     egui = "0.31.1"
     egui-wgpu = "0.31.1"
     eframe = { version = "0.31.1", default-features = false, features = ["default_fonts", "wgpu", "persistence"] }
    -serde = { workspace = true, features = ["derive"] }
    -log = { workspace = true }
    -env_logger = { workspace = true }
    -console-subscriber = { workspace = true, features = ["parking_lot"], optional = true }
    +serde = { version = "1", features = ["derive"] }
    +log = "0.4"
    +env_logger = "0.10.0"
    +console-subscriber = { version = "0.1.10", features = ["parking_lot"], optional = true }
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# First, find the root Cargo.toml and workspace members
echo "=== Root workspace structure ==="
cat Cargo.toml | head -30
echo ""
echo "=== Searching for dependency versions across workspace Cargo.toml files ==="
# Search for the specific dependencies mentioned in the review
rg -A 1 '^\[dependencies\]' -g 'Cargo.toml' | head -200
echo ""
echo "=== Checking versions of specific dependencies ==="
# More focused search for each dependency version
for dep in tokio futures parking_lot serde log env_logger console-subscriber; do
echo "--- $dep versions ---"
rg "$dep\s*=" -g 'Cargo.toml' --max-count 10
done

Repository: livekit/rust-sdks
Length of output: 10521
Pin dependency versions to workspace definitions or accept version drift explicitly.
This example declares explicit versions that diverge from the workspace root:
- `env_logger`: root defines `"0.11"` but this uses `"0.10.0"`
- `parking_lot`: root defines `"0.12"` but this uses `"0.12.1"`
Either use { workspace = true } for consistency, accept the drift, or document why these versions differ.
🤖 Prompt for AI Agents
In `@examples/wgpu_room/Cargo.toml` around lines 4 - 26, The Cargo.toml example
pins env_logger = "0.10.0" and parking_lot = "0.12.1" which diverge from the
workspace root versions; update the dependency declarations for env_logger and
parking_lot to either use workspace = true (e.g., env_logger = { workspace =
true } and parking_lot = { workspace = true, features = ["deadlock_detection"]
}) to inherit the root versions, or explicitly change the versions to match the
workspace root (env_logger "0.11" and parking_lot "0.12") and document the
intentional drift in a comment if you choose to keep different versions; adjust
only the entries for env_logger and parking_lot in this Cargo.toml.

    fn event_from_participant_info(
        msg: &mut [proto::ParticipantInfo],
        local_participant_identity: Option<&str>,
    ) -> Result<SfuPublicationUpdates, InternalError> {
        let updates = msg
            .iter_mut()
            .filter(|participant| {
                local_participant_identity.is_none_or(|identity| participant.identity != identity)
            })
            .map(|participant| -> Result<_, InternalError> {
                Ok((participant.identity.clone(), extract_track_info(participant)?))
            })
            .collect::<Result<HashMap<String, Vec<DataTrackInfo>>, _>>()?;
        Ok(SfuPublicationUpdates { updates })
    }
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check for MSRV declarations in Cargo.toml files
rg -n 'rust-version|msrv' --type=toml

Repository: livekit/rust-sdks
Length of output: 43
🏁 Script executed:
#!/bin/bash
# Find all Cargo.toml files in the repository
fd -n 'Cargo.toml$' -exec cat -n {} +

Repository: livekit/rust-sdks
Length of output: 291
🏁 Script executed:
#!/bin/bash
# Find all Cargo.toml files and display their contents
fd 'Cargo.toml$' -exec cat {} +

Repository: livekit/rust-sdks
Length of output: 248
🏁 Script executed:
#!/bin/bash
# Find all Cargo.toml files
fd 'Cargo.toml$'

Repository: livekit/rust-sdks
Length of output: 926
🏁 Script executed:
#!/bin/bash
# Check root Cargo.toml and the relevant package Cargo.toml
echo "=== ROOT CARGO.TOML ==="
cat Cargo.toml
echo -e "\n=== LIVEKIT-DATATRACK CARGO.TOML ==="
cat livekit-datatrack/Cargo.toml

Repository: livekit/rust-sdks
Length of output: 3054
Set MSRV to 1.82 or replace is_none_or with a compatible alternative.
The code uses Option::is_none_or (stabilized in Rust 1.82), but the project has no explicit rust-version declaration. Edition 2021 defaults to Rust 1.56 minimum, meaning users with Rust 1.56–1.81 will fail to compile this code.
Either add rust-version = "1.82" to livekit-datatrack/Cargo.toml or replace is_none_or with .map_or(true, |identity| ...) to maintain compatibility.
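For reference, a minimal standalone sketch of the `map_or(true, ..)` alternative; the helper function below is illustrative and not part of the crate:

```rust
// Sketch only: mirrors the filter predicate from the snippet above using
// Option::map_or, which is available well before Rust 1.82.
fn keep_participant(local_participant_identity: Option<&str>, participant_identity: &str) -> bool {
    // Equivalent to `is_none_or`: keep the entry when there is no local
    // identity, or when the identities differ.
    local_participant_identity.map_or(true, |identity| participant_identity != identity)
}

fn main() {
    assert!(keep_participant(None, "alice"));
    assert!(keep_participant(Some("bob"), "alice"));
    assert!(!keep_participant(Some("alice"), "alice"));
}
```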
🤖 Prompt for AI Agents
In `@livekit-datatrack/src/remote/proto.rs` around lines 66 - 80, The call to
Option::is_none_or in event_from_participant_info (using
local_participant_identity.is_none_or(...)) requires Rust 1.82; either set the
crate MSRV by adding rust-version = "1.82" to the crate's Cargo.toml, or replace
the is_none_or usage with a compatible alternative such as
local_participant_identity.map_or(true, |identity| participant.identity !=
identity) inside event_from_participant_info so extract_track_info and the
participant filtering work on older compilers.

    tokio = { version = "1", default-features = false, features = ["sync", "macros", "fs"] }
    parking_lot = { version = "0.12" }
    futures-util = { version = "0.3", default-features = false, features = ["sink"] }
    thiserror = "1.0"
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check thiserror versions across the workspace
rg -n 'thiserror' --type=toml -g 'Cargo.toml'

Repository: livekit/rust-sdks
Length of output: 525
Align thiserror versions across workspace: 1.0 vs 2.0.17 mismatch.
The workspace uses thiserror = "1.0" as the default (root Cargo.toml and most crates via workspace = true), but livekit-datatrack explicitly specifies thiserror = "2.0.17". This creates two different versions in the dependency graph, risking incompatible error types if errors cross crate boundaries. Notably, livekit-datatrack includes a TODO comment acknowledging this: # TODO: upgrade workspace version to 2.x.x. Upgrade the workspace version to 2.x.x and update all crates accordingly.
🤖 Prompt for AI Agents
In `@livekit/Cargo.toml` at line 40, The workspace is using thiserror = "1.0"
while livekit-datatrack pins thiserror = "2.0.17", causing duplicate versions;
update the root/workspace Cargo.toml's thiserror version to a 2.x range (e.g.,
"2.0") and then update each crate that currently inherits or specifies thiserror
(including the livekit crate entries and the livekit-datatrack Cargo.toml) to
the same 2.x version so all crates use a single thiserror v2 line; ensure
removal of any older explicit "1.0" entries and run cargo update to refresh
Cargo.lock so error types are unified across the workspace.

    /// Wrapper around [`E2eeManager`] implementing [`dt::EncryptionProvider`].
    #[derive(Debug)]
    pub(crate) struct DataTrackEncryptionProvider {
        manager: E2eeManager,
        sender_identity: ParticipantIdentity,
    }

    impl DataTrackEncryptionProvider {
        pub fn new(manager: E2eeManager, sender_identity: ParticipantIdentity) -> Self {
            Self { manager, sender_identity }
        }
    }

    impl dt::EncryptionProvider for DataTrackEncryptionProvider {
        fn encrypt(&self, payload: bytes::Bytes) -> Result<dt::EncryptedPayload, dt::EncryptionError> {
            let key_index = self
                .manager
                .key_provider()
                .map_or(0, |kp| kp.get_latest_key_index() as u32);

            let encrypted = self
                .manager
                .encrypt_data(payload.into(), &self.sender_identity, key_index)
                .map_err(|_| dt::EncryptionError)?;

            let payload = encrypted.data.into();
            let iv = encrypted.iv.try_into().map_err(|_| dt::EncryptionError)?;
            let key_index = encrypted.key_index.try_into().map_err(|_| dt::EncryptionError)?;

            Ok(dt::EncryptedPayload { payload, iv, key_index })
        }
    }
Encryption provider implementation looks correct.
The provider properly retrieves the latest key index and delegates to E2eeManager.encrypt_data. The error handling maps all errors to dt::EncryptionError.
Note: The pipeline reports a formatting issue at line 31. Run cargo fmt to fix.
🔧 Run cargo fmt to fix formatting

    cargo fmt

🧰 Tools
🪛 GitHub Actions: Rust Formatting
[error] 31-31: cargo fmt -- --check detected a formatting difference in data_track.rs. Run 'cargo fmt' to fix code style issues.
🤖 Prompt for AI Agents
In `@livekit/src/room/e2ee/data_track.rs` around lines 19 - 50, The formatting in
DataTrackEncryptionProvider::encrypt (and surrounding code) doesn't match
rustfmt; run `cargo fmt` to reformat the file so the spacing/indentation around
the key_index and encrypted variable assignments is fixed; ensure
DataTrackEncryptionProvider, its new constructor, and the impl
dt::EncryptionProvider::encrypt (which calls E2eeManager::key_provider and
E2eeManager::encrypt_data) are left unchanged functionally and only reformatted.

    pub(crate) fn encrypt_data(
        &self,
    -   data: &[u8],
    -   participant_identity: &str,
    -   key_index: u32,
    +   data: Vec<u8>,
    +   participant_identity: &ParticipantIdentity,
    +   key_index: u32
    ) -> Result<EncryptedPacket, Box<dyn std::error::Error>> {
Fix formatting to pass CI.
The pipeline reports a missing trailing comma after the key_index parameter.
🔧 Proposed fix

    pub(crate) fn encrypt_data(
        &self,
        data: Vec<u8>,
        participant_identity: &ParticipantIdentity,
    -   key_index: u32
    +   key_index: u32,
    ) -> Result<EncryptedPacket, Box<dyn std::error::Error>> {

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

    pub(crate) fn encrypt_data(
        &self,
        data: Vec<u8>,
        participant_identity: &ParticipantIdentity,
        key_index: u32,
    ) -> Result<EncryptedPacket, Box<dyn std::error::Error>> {
🧰 Tools
🪛 GitHub Actions: Rust Formatting
[error] 273-273: cargo fmt -- --check detected a formatting difference in manager.rs (added trailing comma after key_index parameter). Run 'cargo fmt' to fix code style issues.
🤖 Prompt for AI Agents
In `@livekit/src/room/e2ee/manager.rs` around lines 272 - 277, The function
signature for encrypt_data is missing a trailing comma after the key_index
parameter; update the signature in the encrypt_data method (the parameters:
data: Vec<u8>, participant_identity: &ParticipantIdentity, key_index: u32) to
include a trailing comma after key_index to satisfy formatting/CI requirements.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@livekit/src/rtc_engine/dc_sender.rs`:
- Around line 108-112: The log message has a grammatical typo: update the string
in the block that checks self.send_queue (where unsent_bytes is computed) to use
"byte(s)" instead of "bytes(s)"; i.e., modify the log::info! call that currently
prints "{} bytes(s) remain in queue" to print "{} byte(s) remain in queue" so
send_queue/unsent_bytes logging is correct.
- Around line 116-127: The buffered_amount is increased before calling dc.send()
in send_until_threshold, which leaves it inflated on send failures; change the
logic so buffered_amount is only incremented after a successful send or, more
minimally, decrement buffered_amount when dc.send() returns Err: update
send_until_threshold to attempt send first, and on Ok increment
self.buffered_amount by payload.len() as u64 (or if keeping the current order,
subtract payload.len() in the inspect_err closure), referencing the
send_until_threshold method, self.buffered_amount, self.send_queue, and dc.send
to locate and fix the update so BytesSent callbacks and error paths keep
buffered_amount consistent.
🧹 Nitpick comments (2)
livekit/src/rtc_engine/dc_sender.rs (1)
71-72: Consider making the send channel capacity configurable. The channel capacity is hardcoded to 128. If different use cases require different buffer depths, this could be exposed via `DataChannelSenderOptions`.
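A minimal sketch of how that could look; the struct name comes from the comment above, but its field is an assumption rather than the SDK's actual API:

```rust
// Illustrative sketch only: expose the currently hardcoded capacity of 128
// through an options struct so callers can tune the buffer depth.
pub struct DataChannelSenderOptions {
    /// Depth of the internal send channel.
    pub send_channel_capacity: usize,
}

impl Default for DataChannelSenderOptions {
    fn default() -> Self {
        // Matches the value that is currently hardcoded.
        Self { send_channel_capacity: 128 }
    }
}

fn main() {
    let opts = DataChannelSenderOptions::default();
    assert_eq!(opts.send_channel_capacity, 128);
}
```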
livekit/src/rtc_engine/rtc_session.rs (1)

670-680: Consider handling or logging unmatched output events. The `_ => {}` match arm at Line 678 silently ignores any `OutputEvent` variants that aren't `SfuUpdateSubscription`. If new variants are added to `OutputEvent` in the future, they would be silently ignored, potentially masking bugs.

♻️ Option 1: Make the match exhaustive

    match event.into() {
        OutputEvent::SfuUpdateSubscription(event) => {
            self.signal_client()
                .send(proto::signal_request::Message::UpdateDataSubscription(event.into()))
                .await
        }
    -   _ => {}
    +   // Explicitly list other variants when added
    }

♻️ Option 2: Log unhandled variants

    -   _ => {}
    +   other => {
    +       log::debug!("Unhandled remote data track output event: {:?}", other);
    +   }
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (4)
- Cargo.toml
- livekit/src/rtc_engine/dc_sender.rs
- livekit/src/rtc_engine/mod.rs
- livekit/src/rtc_engine/rtc_session.rs
🚧 Files skipped from review as they are similar to previous changes (2)
- Cargo.toml
- livekit/src/rtc_engine/mod.rs
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: ladvoc
Repo: livekit/rust-sdks PR: 862
File: livekit-datatrack/src/local/manager.rs:298-311
Timestamp: 2026-02-03T01:23:11.346Z
Learning: In the livekit-datatrack crate, data tracks do not guarantee reliable delivery in the initial version. The `process_and_send` method in `livekit-datatrack/src/local/manager.rs` intentionally uses `try_send` to drop packets on backpressure rather than propagating it, and this is expected behavior.
📚 Learning: 2026-02-03T01:23:11.346Z
Learnt from: ladvoc
Repo: livekit/rust-sdks PR: 862
File: livekit-datatrack/src/local/manager.rs:298-311
Timestamp: 2026-02-03T01:23:11.346Z
Learning: In the livekit-datatrack crate, data tracks do not guarantee reliable delivery in the initial version. The `process_and_send` method in `livekit-datatrack/src/local/manager.rs` intentionally uses `try_send` to drop packets on backpressure rather than propagating it, and this is expected behavior.
Applied to files:
- livekit/src/rtc_engine/dc_sender.rs
- livekit/src/rtc_engine/rtc_session.rs
🧬 Code graph analysis (1)
livekit/src/rtc_engine/rtc_session.rs (5)
livekit/src/rtc_engine/dc_sender.rs (1)
- new (71-86)

livekit-datatrack/src/remote/proto.rs (2)
- event_from_join (47-51)
- event_from_participant_update (58-64)

livekit-datatrack/src/local/proto.rs (1)
- publish_result_from_request_response (76-92)

livekit/src/room/e2ee/manager.rs (1)
- encryption_type (199-202)

livekit/src/rtc_engine/rtc_events.rs (1)
- on_message (157-165)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
- GitHub Check: Build (aarch64-linux-android)
- GitHub Check: Build (aarch64-unknown-linux-gnu)
- GitHub Check: Build (x86_64-pc-windows-msvc)
- GitHub Check: Build (aarch64-pc-windows-msvc)
- GitHub Check: Build (aarch64-apple-ios)
- GitHub Check: Build (x86_64-apple-darwin)
- GitHub Check: Build (aarch64-apple-ios-sim)
- GitHub Check: Build (armv7-linux-androideabi)
- GitHub Check: Build (x86_64-unknown-linux-gnu)
- GitHub Check: Build (aarch64-apple-darwin)
- GitHub Check: Build (x86_64-linux-android)
- GitHub Check: Test (x86_64-unknown-linux-gnu)
- GitHub Check: Test (x86_64-apple-darwin)
- GitHub Check: Test (x86_64-pc-windows-msvc)
🔇 Additional comments (15)
livekit/src/rtc_engine/dc_sender.rs (3)
20-62: LGTM! The struct design with internal `buffered_amount` tracking and `send_queue` is appropriate for implementing backpressure on data channel transmission. The documentation clearly explains the purpose and suggests future refactoring opportunities.
129-142: LGTM! Good defensive coding in `handle_bytes_sent` to handle the edge case where `bytes_sent` exceeds `buffered_amount`, preventing underflow.
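As an aside, a standalone sketch of the kind of underflow guard this describes (illustrative; not necessarily the exact code in `handle_bytes_sent`):

```rust
// Sketch only: clamp at zero if a BytesSent report exceeds the locally
// tracked buffered amount, instead of wrapping or panicking on underflow.
fn apply_bytes_sent(buffered_amount: u64, bytes_sent: u64) -> u64 {
    buffered_amount.saturating_sub(bytes_sent)
}

fn main() {
    assert_eq!(apply_bytes_sent(100, 30), 70);
    // Defensive case: more bytes reported sent than we believe are buffered.
    assert_eq!(apply_bytes_sent(10, 30), 0);
}
```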
144-169: LGTM! Good use of `WeakUnboundedSender` to prevent the callback from keeping the channel alive. The design is extensible for adding more event types in the future.
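To illustrate the pattern being praised, a minimal sketch built directly on tokio's mpsc primitives (the function and its shape are illustrative, not the SDK's code):

```rust
use tokio::sync::mpsc;

// A callback holding only a WeakUnboundedSender does not keep the channel
// alive: once every strong sender is dropped, upgrade() returns None and the
// callback quietly stops forwarding.
fn make_callback(tx: &mpsc::UnboundedSender<Vec<u8>>) -> impl Fn(Vec<u8>) {
    let weak = tx.downgrade();
    move |payload: Vec<u8>| {
        if let Some(tx) = weak.upgrade() {
            let _ = tx.send(payload);
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    let callback = make_callback(&tx);
    callback(vec![1, 2, 3]);
    assert_eq!(rx.recv().await, Some(vec![1, 2, 3]));

    drop(tx); // no strong senders remain, so upgrade() now fails
    callback(vec![4]); // silently becomes a no-op
}
```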
livekit/src/rtc_engine/rtc_session.rs (12)

26-72: LGTM! The new imports and `DATA_TRACK_DC_LABEL` constant follow the existing patterns established by `LOSSY_DC_LABEL` and `RELIABLE_DC_LABEL`.

208-209: LGTM! New session event variants cleanly integrate the data track input events from the `livekit_datatrack` crate.

357-377: LGTM! The new data track transport fields follow the existing patterns for lossy and reliable data channels.

538-548: LGTM! The `dt_sender_task` lifecycle is properly managed: spawned on connect and awaited on close, consistent with the other session tasks.

449-451: LGTM! Properly extracts initial remote data track publications from the join response and emits them as session events.

474-498: LGTM! Good refactoring to extract `lossy_options` for reuse. The data track transport using the same unreliable configuration aligns with the design where data tracks do not guarantee reliable delivery. Based on learnings: "data tracks do not guarantee reliable delivery in the initial version."

682-694: LGTM! The use of `try_send` with break-on-error aligns with the intentional design where data tracks drop packets on backpressure rather than propagating it. Based on learnings: "The `process_and_send` method... intentionally uses `try_send` to drop packets on backpressure rather than propagating it, and this is expected behavior."

1057-1118: LGTM! The signal event handling correctly routes data track-related responses to the appropriate session events. The early return at Line 1100 properly prevents `RequestResponse` messages for data track publishes from being processed by the generic `pending_requests` handler.

1163-1177: LGTM! Good refactor to use a match expression for data channel routing. The `handle_remote_dt_packets` callback is correctly registered before storing the data channel reference.

1348-1372: LGTM! The encryption type is now correctly extracted from the encrypted packet and propagated through the decryption path to the emitted events.

1869-1925: LGTM! Good refactoring to extract the common publisher connection logic into `ensure_publisher_connected_with_dc`, reducing duplication between the standard data channel path and the data track transport path.

1963-1975: LGTM with a note on the TODO. The `handle_remote_dt_packets` function correctly uses `WeakUnboundedSender` to avoid preventing channel close. The TODO at Line 1970 about avoiding the clone is a valid future optimization; `buffer.data` could potentially be converted to `Bytes` more efficiently depending on the underlying type.
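For the `try_send` behavior noted in the 682-694 item, a standalone sketch of how backpressure surfaces immediately so the caller can simply drop the packet (plain tokio mpsc, not the SDK's queue):

```rust
use tokio::sync::mpsc::{self, error::TrySendError};

#[tokio::main]
async fn main() {
    // Bounded channel standing in for the packet send queue.
    let (tx, mut rx) = mpsc::channel::<u32>(2);

    // Fill the channel to capacity.
    tx.try_send(1).unwrap();
    tx.try_send(2).unwrap();

    // Backpressure: try_send fails immediately instead of awaiting, and the
    // caller can drop the packet, matching the documented data track design.
    match tx.try_send(3) {
        Err(TrySendError::Full(dropped)) => println!("dropped packet {dropped}"),
        other => panic!("unexpected result: {other:?}"),
    }

    // Draining the receiver frees capacity again.
    assert_eq!(rx.recv().await, Some(1));
    assert!(tx.try_send(3).is_ok());
}
```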
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

    if !self.send_queue.is_empty() {
        let unsent_bytes: usize =
            self.send_queue.into_iter().map(|payload| payload.len()).sum();
        log::info!("{} bytes(s) remain in queue", unsent_bytes);
    }
Minor typo in log message.
Line 111 has "bytes(s)" which should be "byte(s)" for grammatical correctness.
📝 Proposed fix

    - log::info!("{} bytes(s) remain in queue", unsent_bytes);
    + log::info!("{} byte(s) remain in queue", unsent_bytes);

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

    if !self.send_queue.is_empty() {
        let unsent_bytes: usize =
            self.send_queue.into_iter().map(|payload| payload.len()).sum();
        log::info!("{} byte(s) remain in queue", unsent_bytes);
    }
🤖 Prompt for AI Agents
In `@livekit/src/rtc_engine/dc_sender.rs` around lines 108 - 112, The log message
has a grammatical typo: update the string in the block that checks
self.send_queue (where unsent_bytes is computed) to use "byte(s)" instead of
"bytes(s)"; i.e., modify the log::info! call that currently prints "{} bytes(s)
remain in queue" to print "{} byte(s) remain in queue" so
send_queue/unsent_bytes logging is correct.

    fn send_until_threshold(&mut self) {
        while self.buffered_amount <= self.low_buffer_threshold {
            let Some(payload) = self.send_queue.pop_front() else {
                break;
            };
            self.buffered_amount += payload.len() as u64;
            _ = self
                .dc
                .send(&payload, true)
                .inspect_err(|err| log::error!("Failed to send data: {}", err));
        }
    }
Potential buffered_amount inconsistency on send failure.
buffered_amount is incremented before calling dc.send() (Line 121). If send() fails (Line 124), the bytes weren't actually buffered by the data channel, but buffered_amount remains inflated. Since BytesSent callbacks only fire for successful sends, this could cause the sender to prematurely stop sending due to perceived backpressure.
Consider either:
- Incrementing `buffered_amount` only after a successful send, or
- Decrementing on failure.
🛡️ Proposed fix to decrement on failure

    fn send_until_threshold(&mut self) {
        while self.buffered_amount <= self.low_buffer_threshold {
            let Some(payload) = self.send_queue.pop_front() else {
                break;
            };
    -       self.buffered_amount += payload.len() as u64;
    -       _ = self
    +       let payload_len = payload.len() as u64;
    +       self.buffered_amount += payload_len;
    +       if self
                .dc
                .send(&payload, true)
    -           .inspect_err(|err| log::error!("Failed to send data: {}", err));
    +           .inspect_err(|err| log::error!("Failed to send data: {}", err))
    +           .is_err()
    +       {
    +           self.buffered_amount -= payload_len;
    +       }
        }
    }

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

    fn send_until_threshold(&mut self) {
        while self.buffered_amount <= self.low_buffer_threshold {
            let Some(payload) = self.send_queue.pop_front() else {
                break;
            };
            let payload_len = payload.len() as u64;
            self.buffered_amount += payload_len;
            if self
                .dc
                .send(&payload, true)
                .inspect_err(|err| log::error!("Failed to send data: {}", err))
                .is_err()
            {
                self.buffered_amount -= payload_len;
            }
        }
    }
🤖 Prompt for AI Agents
In `@livekit/src/rtc_engine/dc_sender.rs` around lines 116 - 127, The
buffered_amount is increased before calling dc.send() in send_until_threshold,
which leaves it inflated on send failures; change the logic so buffered_amount
is only incremented after a successful send or, more minimally, decrement
buffered_amount when dc.send() returns Err: update send_until_threshold to
attempt send first, and on Ok increment self.buffered_amount by payload.len() as
u64 (or if keeping the current order, subtract payload.len() in the inspect_err
closure), referencing the send_until_threshold method, self.buffered_amount,
self.send_queue, and dc.send to locate and fix the update so BytesSent callbacks
and error paths keep buffered_amount consistent.
This PR adds support for data tracks to the Rust client by integrating the livekit-datatrack crate introduced in #862.
Summary of changes:
Note to reviewers: relevant changes are located under livekit/.
Summary by CodeRabbit
New Features
Examples
Tests
Documentation