Skip to content

Data tracks integration#863

Open
ladvoc wants to merge 24 commits intomainfrom
ladvoc/data-tracks-integration
Open

Data tracks integration#863
ladvoc wants to merge 24 commits intomainfrom
ladvoc/data-tracks-integration

Conversation

@ladvoc
Copy link
Contributor

@ladvoc ladvoc commented Feb 3, 2026

This PR adds data tracks support to the Rust client by integrating the livekit-datatrack crate introduced in #862.

Summary of changes:

  • Public API changes (non-breaking)
    • Adds publish_data_track method to LocalParticipant
    • Adds a new room event, RemoteDataTrackPublished
  • Adds the basic_data_track example (see publisher.rs and subscriber.rs)
  • Defines a suite of end-to-end tests to verify proper functionality under normal and fault conditions

Note to reviewers: relevant changes are located under livekit/.

Summary by CodeRabbit

  • New Features

    • Added data tracks: publish/subscribe arbitrary binary data in a room
    • Built-in end-to-end encryption support for data tracks
    • Exposed data-track APIs in the public prelude and Room/Participant surfaces (publish/subscribe)
  • Examples

    • Added example demonstrating publish/subscribe data-track publisher and subscriber
  • Tests

    • End-to-end tests covering data tracks, encryption, and lifecycle scenarios
  • Documentation

    • README and usage docs for the new data-track example and APIs

@coderabbitai
Copy link

coderabbitai bot commented Feb 3, 2026

📝 Walkthrough

Walkthrough

Adds a new internal crate livekit-datatrack implementing data-track core (frames, packet format, packetize/depacketize, E2EE traits), local/remote managers and pipelines, integrates data-track transport into room/RTC lifecycle, updates Cargo manifests/examples, and adds end-to-end tests and release config.

Changes

Cohort / File(s) Summary
Workspace / Manifests
Cargo.toml, livekit/Cargo.toml, release-plz.toml, examples/*/Cargo.toml
Adds livekit-datatrack to workspace and release config; many example manifests moved from workspace = true to explicit version pins; adds deps (from_variants, futures-core, tokio-stream, etc.).
New crate: core API
livekit-datatrack/Cargo.toml, livekit-datatrack/src/lib.rs, livekit-datatrack/src/track.rs, livekit-datatrack/src/frame.rs, livekit-datatrack/src/error.rs
New crate manifest and public API surface: DataTrack generic type, DataTrackInfo/Sid, DataTrackFrame, InternalError and api/backend re-exports.
E2EE surface & adapters
livekit-datatrack/src/e2ee.rs, livekit/src/room/e2ee/data_track.rs, livekit/src/room/e2ee/mod.rs, livekit/src/room/e2ee/manager.rs
Defines EncryptionProvider/DecryptionProvider traits and EncryptedPayload; adds room-level provider adapters and updates E2EE manager signatures/Debug impl.
Local path
livekit-datatrack/src/local/* (events.rs, manager.rs, packetizer.rs, pipeline.rs, proto.rs, mod.rs)
Implements local publish manager, event types, packetizer (fragmentation/MTU handling), pipeline (optional encrypt→packetize), proto conversions, and per-track lifecycle.
Remote path
livekit-datatrack/src/remote/* (depacketizer.rs, manager.rs, pipeline.rs, events.rs, proto.rs, mod.rs)
Implements remote subscribe manager, depacketizer (reassembly), pipeline (depacketize→optional decrypt→frames), proto conversions, subscription lifecycle and events.
Packet layer
livekit-datatrack/src/packet/* (mod.rs, serialize.rs, deserialize.rs, extension.rs, handle.rs, time.rs)
Adds Packet/Header/Extensions, serialization/deserialization, Handle allocator, Timestamp/Clock, extension types (E2EE, user timestamp) and tests.
Utilities
livekit-datatrack/src/utils/* (bytes.rs, counter.rs, mod.rs)
Adds Bytes chunk iterator and generic Counter with wrapping increment; re-exports utilities.
Integration: room & engine
livekit/src/room/mod.rs, livekit/src/room/data_track.rs, livekit/src/room/participant/local_participant.rs, livekit/src/rtc_engine/*, livekit/src/room/e2ee/*, livekit/src/rtc_engine/rtc_session.rs, livekit/src/rtc_engine/rtc_events.rs
Wires datatrack managers into room lifecycle and RTC session, exposes room::data_track, adds session/engine event variants and handlers, integrates data-track data-channel label and forwarding.
Examples & docs
examples/basic_data_track/*
Adds basic_data_track example with publisher and subscriber binaries and README demonstrating publish/subscribe usage.
Tests
livekit/tests/data_track_test.rs, livekit/tests/common/e2e/mod.rs, livekit/tests/data_channel_encryption.rs
Adds extensive e2e tests for publish/subscribe, E2EE, stress scenarios; refactors test option types.
Small edits / formatting
livekit-ffi/src/conversion/participant.rs, livekit/src/proto.rs, livekit/src/prelude.rs, livekit/src/room/participant/mod.rs
Minor formatting/trailing-comma changes and prelude re-exports of datatrack types.

Sequence Diagram(s)

sequenceDiagram
    participant App as Application
    participant LP as LocalParticipant
    participant LDM as LocalManager
    participant Enc as Encryption
    participant Pkt as Packetizer
    participant RTCE as RTC Engine
    participant DC as Data Channel

    App->>LP: publish_data_track(options)
    LP->>LDM: publish_track(options)
    LDM->>LDM: allocate handle / create LocalDataTrack
    App->>LP: try_push(frame)
    LP->>LDM: frame_tx.send(frame)
    LDM->>Enc: encrypt_if_needed(frame)
    Enc-->>LDM: EncryptedPayload or frame
    LDM->>Pkt: packetize(frame)
    Pkt-->>LDM: Vec<Packet>
    LDM->>RTCE: OutputEvent::PacketsAvailable
    RTCE->>DC: send packets (data channel)
Loading
sequenceDiagram
    participant DC as Data Channel
    participant RTCE as RTC Engine
    participant RDM as RemoteManager
    participant Dep as Depacketizer
    participant Dec as Decryption
    participant Sub as Subscriber

    DC->>RTCE: binary packet
    RTCE->>RDM: RemoteDataTrackInput::PacketReceived
    RDM->>Dep: push packet
    Dep-->>RDM: DepacketizerFrame or drop
    RDM->>Dec: decrypt_if_needed(frame)
    Dec-->>RDM: DataTrackFrame
    RDM->>Sub: broadcast frame to subscribers
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

Suggested reviewers

  • cloudwebrtc
  • davidliu
  • xianshijing-lk

Poem

"I hop on bytes and scatter cheer,
Packets stitched and frames appear,
Keys tucked safe in carrot nests,
Publishers push, subscribers rest,
A rabbit's nod — the datatrack's here!" 🐇

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'Data tracks integration' is concise and accurately reflects the main objective of the pull request, which is to integrate data tracks support into the LiveKit Rust SDK.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch ladvoc/data-tracks-integration

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@ladvoc ladvoc mentioned this pull request Feb 3, 2026
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 19

🤖 Fix all issues with AI agents
In `@examples/basic_data_track/src/subscriber.rs`:
- Around line 23-31: The function handle_first_publication currently continues
listening after subscribing to the first RemoteDataTrack; update it to stop
after the first successful subscription by returning (or breaking and returning)
immediately once subscribe(track).await? completes. Locate the loop in
handle_first_publication and add an early return (e.g., return Ok(())) right
after the subscribe(track).await? call so the function only handles the first
publication.

In `@examples/local_audio/Cargo.toml`:
- Around line 4-17: Update the example Cargo.toml dependencies to match the
workspace defaults by aligning env_logger and clap versions with the workspace
(remove the explicit version pins or set them to the workspace versions) so
env_logger uses 0.11 and clap uses 4.5; update the dependency entries for
env_logger and clap in the examples/local_audio Cargo.toml (the dependency keys
"env_logger" and "clap") to inherit or match the workspace versions.

In `@examples/rpc/Cargo.toml`:
- Around line 4-14: Update the dependency versions in the examples/rpc
Cargo.toml so they match the workspace: change the env_logger dependency from
"0.10" to "0.11" and change rand from "0.8" to "0.9" (the entries for env_logger
and rand in the examples/rpc Cargo.toml); leave other dependency entries
unchanged.

In `@examples/screensharing/Cargo.toml`:
- Around line 4-13: Update the example Cargo.toml dependencies to match the
workspace root: change env_logger from "0.10" to "0.11" and change clap from
"4.0" to "4.5" so the declared versions for env_logger and clap in the examples
align with the workspace root; verify the dependency entries for env_logger and
clap in the examples/screensharing/Cargo.toml are updated accordingly while
leaving other entries (tokio, livekit, livekit-api, log) unchanged.

In `@examples/wgpu_room/Cargo.toml`:
- Around line 4-26: The Cargo.toml example pins env_logger = "0.10.0" and
parking_lot = "0.12.1" which diverge from the workspace root versions; update
the dependency declarations for env_logger and parking_lot to either use
workspace = true (e.g., env_logger = { workspace = true } and parking_lot = {
workspace = true, features = ["deadlock_detection"] }) to inherit the root
versions, or explicitly change the versions to match the workspace root
(env_logger "0.11" and parking_lot "0.12") and document the intentional drift in
a comment if you choose to keep different versions; adjust only the entries for
env_logger and parking_lot in this Cargo.toml.

In `@livekit-datatrack/Cargo.toml`:
- Line 13: The workspace root Cargo.toml currently pins thiserror = "1" while
the livekit-datatrack crate uses thiserror = "2.0.17", causing dual major
versions; open the root Cargo.toml and change the workspace dependency entry
thiserror = "1" to thiserror = "2" so all crates use the same major version
(ensure the workspace dependency name is exactly "thiserror" and then run cargo
update/build to verify no other dependency conflicts).

In `@livekit-datatrack/src/local/manager.rs`:
- Around line 185-199: In on_sfu_publish_response, avoid calling
self.descriptors.remove(&event.handle) before confirming the descriptor is a
pending descriptor; instead look up the descriptor (e.g., with get_mut or entry
API), match it to Descriptor::Pending(result_tx) and only remove the descriptor
when you transition it to active (after calling self.create_local_track and
sending the result), keep active descriptors intact on duplicate responses, and
if the descriptor is missing or result_tx.is_closed() but event.result is Ok
(SFU accepted the publish), call your server-side unpublish path (e.g., a method
like self.unpublish_server_side(event.handle) or similar) to clean up the
orphaned SFU publication so it isn’t left published without a local track.
- Around line 236-265: The HashMap removal in remove_descriptor (and in
on_publish_cancelled) currently frees the descriptor but never returns the u16
handle to the allocator, exhausting HandleAllocator after 65,535 cycles; add a
release(id: Handle) method to HandleAllocator (or implement a reusable allocator
with a free list/bitset) and call it whenever a descriptor is discarded: inside
remove_descriptor() after taking the descriptor (and after sending
published_tx), and inside on_publish_cancelled() where the pending descriptor is
removed, ensuring handles returned to the allocator so HandleAllocator.get() can
reuse freed handles.
- Around line 333-380: The publish_track method uses event_in_tx.try_send which
conflates a full channel with a disconnected receiver; change it to await the
async send to properly handle backpressure. In ManagerInput::publish_track,
replace self.event_in_tx.try_send(event.into()).map_err(|_|
PublishError::Disconnected)? with an awaitable send:
self.event_in_tx.send(event.into()).await.map_err(|_|
PublishError::Disconnected)?, keeping the same PublishRequest/event.into() usage
and existing timeout/result handling (PUBLISH_TIMEOUT, PublishError) otherwise.

In `@livekit-datatrack/src/local/pipeline.rs`:
- Around line 47-51: The debug-only check in Pipeline::new using
debug_assert_eq!(options.info.uses_e2ee, options.encryption_provider.is_some())
must be promoted to a runtime check so releases don't silently misconfigure
E2EE; update the Pipeline::new constructor to validate options.info.uses_e2ee
against options.encryption_provider.is_some() and fail fast (e.g., use
assert_eq! or an explicit if check that panics with a clear message including
the actual values) before creating Packetizer so mismatches cannot result in
plaintext or unreadable ciphertext.

In `@livekit-datatrack/src/local/proto.rs`:
- Around line 76-92: The match on proto::request_response::Reason in
publish_result_from_request_response currently treats any unhandled variant
(including Reason::Ok) as an error; add an explicit check for Reason::Ok and
return None early (since this function is the error-only path) before mapping
other failure reasons to PublishError, keeping the existing mappings for
Reason::NotAllowed and Reason::DuplicateName and constructing the
SfuPublishResponse with Err(error) only for genuine error reasons.

In `@livekit-datatrack/src/remote/depacketizer.rs`:
- Around line 134-159: The finalize function in Depacketizer incorrectly uses
`sequence < end_sequence` which breaks when u16 sequence numbers wrap; update
the loop logic in finalize (working on PartialFrame, sequence, end_sequence,
start_sequence) to check `sequence != end_sequence` and advance `sequence` with
`wrapping_add(1)` so the reassembly correctly iterates across wrap-around
boundaries before stopping at end_sequence; keep the existing removal of
payloads and final return of DepacketizerFrame when sequence equals
end_sequence.

In `@livekit-datatrack/src/remote/manager.rs`:
- Around line 419-424: The ManagerInput::send implementation uses
event_in_tx.try_send which can drop events when the channel is full; change
ManagerInput::send to reliably deliver control events by either (a) making the
method async and replacing try_send with event_in_tx.send(event).await, or (b)
using event_in_tx.blocking_send(event) for a synchronous guarantee, and adjust
callers of ManagerInput::send accordingly (look for the ManagerInput type and
its send method, and the event_in_tx field) so critical events like
SubscribeRequest or Shutdown are never silently dropped.

In `@livekit-datatrack/src/remote/mod.rs`:
- Around line 75-97: Add an early publication-status check to the subscribe()
method: call self.is_published() at the start of subscribe() and if it returns
false return Err(SubscribeError::Unpublished) before creating/sending the
SubscribeRequest (the SubscribeRequest { sid: ..., result_tx } and event_in_tx
send logic). This ensures SubscribeError::Unpublished is used for unpublished
remote tracks and prevents sending subscribe events for un-published tracks.

In `@livekit-datatrack/src/remote/proto.rs`:
- Around line 66-80: The call to Option::is_none_or in
event_from_participant_info (using local_participant_identity.is_none_or(...))
requires Rust 1.82; either set the crate MSRV by adding rust-version = "1.82" to
the crate's Cargo.toml, or replace the is_none_or usage with a compatible
alternative such as local_participant_identity.map_or(true, |identity|
participant.identity != identity) inside event_from_participant_info so
extract_track_info and the participant filtering work on older compilers.

In `@livekit/Cargo.toml`:
- Line 40: The workspace is using thiserror = "1.0" while livekit-datatrack pins
thiserror = "2.0.17", causing duplicate versions; update the root/workspace
Cargo.toml's thiserror version to a 2.x range (e.g., "2.0") and then update each
crate that currently inherits or specifies thiserror (including the livekit
crate entries and the livekit-datatrack Cargo.toml) to the same 2.x version so
all crates use a single thiserror v2 line; ensure removal of any older explicit
"1.0" entries and run cargo update to refresh Cargo.lock so error types are
unified across the workspace.

In `@livekit/src/room/e2ee/data_track.rs`:
- Around line 19-50: The formatting in DataTrackEncryptionProvider::encrypt (and
surrounding code) doesn't match rustfmt; run `cargo fmt` to reformat the file so
the spacing/indentation around the key_index and encrypted variable assignments
is fixed; ensure DataTrackEncryptionProvider, its new constructor, and the impl
dt::EncryptionProvider::encrypt (which calls E2eeManager::key_provider and
E2eeManager::encrypt_data) are left unchanged functionally and only reformatted.

In `@livekit/src/room/e2ee/manager.rs`:
- Around line 272-277: The function signature for encrypt_data is missing a
trailing comma after the key_index parameter; update the signature in the
encrypt_data method (the parameters: data: Vec<u8>, participant_identity:
&ParticipantIdentity, key_index: u32) to include a trailing comma after
key_index to satisfy formatting/CI requirements.

In `@livekit/src/room/mod.rs`:
- Around line 611-628: The variables are swapped: decryption_provider is
constructed as DataTrackEncryptionProvider and encryption_provider as
DataTrackDecryptionProvider, and those are passed to the wrong ManagerOptions;
change the assignments so the actual encryption provider is passed to
local::ManagerOptions and the actual decryption provider is passed to
remote::ManagerOptions — specifically ensure DataTrackEncryptionProvider
(currently in decryption_provider) is used for local_dt_options
(encryption_provider) and DataTrackDecryptionProvider (currently in
encryption_provider) is used for remote_dt_options (decryption_provider), or
alternatively rename the variables so decryption_provider/encryption_provider
reflect their real types and then pass them to local_dt_options and
remote_dt_options accordingly (affecting DataTrackEncryptionProvider,
DataTrackDecryptionProvider, local::ManagerOptions, remote::ManagerOptions,
local_dt_options, remote_dt_options).
🧹 Nitpick comments (15)
examples/send_bytes/Cargo.toml (1)

8-13: Inconsistent dependency versioning compared to other examples.

This file pins exact patch versions (tokio = "1.47.1", log = "0.4.28", env_logger = "0.11.8", rand = "0.9.2"), while other examples in this PR use semver-compatible ranges (e.g., tokio = { version = "1", ... }, log = "0.4"). For unpublished examples, exact pinning creates unnecessary maintenance burden.

Consider aligning with other examples:

♻️ Suggested alignment with other examples
-tokio = { version = "1.47.1", features = ["full"] }
+tokio = { version = "1", features = ["full"] }
 livekit = { workspace = true, features = ["rustls-tls-native-roots"] }
-log = "0.4.28"
-env_logger = "0.11.8"
+log = "0.4"
+env_logger = "0.11"
 bitfield-struct = "0.11.0"
-rand = "0.9.2"
+rand = "0.9"
examples/basic_text_stream/Cargo.toml (1)

8-12: Minor version pinning inconsistency within this file.

tokio and futures-util use semver ranges ("1", "0.3") while log and env_logger use exact patch versions ("0.4.26", "0.11.7"). For consistency within this manifest and with other examples:

♻️ Optional: align version specifications
 tokio = { version = "1", features = ["full"] }
 futures-util = { version = "0.3", default-features = false, features = ["sink"] }
 livekit = { workspace = true, features = ["rustls-tls-native-roots"] }
-log = "0.4.26"
-env_logger = "0.11.7"
+log = "0.4"
+env_logger = "0.11"
examples/basic_data_track/README.md (1)

1-21: Add a best‑effort delivery note to set expectations.

This example reads as fully reliable; a short note about possible frame drops would avoid confusion.

✏️ Proposed doc tweak
-Simple example of publishing and subscribing to a data track.
+Simple example of publishing and subscribing to a data track.
+Note: data tracks are best-effort and may drop frames under backpressure.

Based on learnings: In the livekit-datatrack crate, data tracks do not guarantee reliable delivery in the initial version. The process_and_send method in livekit-datatrack/src/local/manager.rs intentionally uses try_send to drop packets on backpressure rather than propagating it, and this is expected behavior.

livekit-datatrack/src/packet/handle.rs (1)

84-89: Minor: Test dummy excludes u16::MAX as a valid handle.

The range 1..u16::MAX is exclusive of u16::MAX, meaning handle 65535 will never be generated in tests. If u16::MAX is a valid handle value (which it appears to be per TryFrom<u16>), consider using 1..=u16::MAX for full coverage.

Proposed fix
 impl fake::Dummy<fake::Faker> for Handle {
     fn dummy_with_rng<R: rand::Rng + ?Sized>(_: &fake::Faker, rng: &mut R) -> Self {
-        Self::try_from(rng.random_range(1..u16::MAX)).unwrap()
+        Self::try_from(rng.random_range(1..=u16::MAX)).unwrap()
     }
 }
livekit-datatrack/src/local/packetizer.rs (1)

133-136: Nitpick: Prefer is_empty() over length comparison.

Idiomatic Rust uses is_empty() for emptiness checks.

Proposed fix
-        if packets.len() == 0 {
+        if packets.is_empty() {
             assert_eq!(payload_size, 0, "Should be no packets for zero payload");
             return;
         }
livekit-datatrack/src/packet/time.rs (1)

82-88: Document intentional u32 truncation in duration_to_ticks.

The ticks as u32 cast silently truncates values exceeding u32::MAX. While this is correct behavior for wrapping RTP timestamps, a brief comment would clarify that this is intentional rather than an oversight.

Proposed documentation addition
     /// Convert a duration since the epoch into clock ticks.
     const fn duration_to_ticks(duration: Duration) -> u32 {
         // round(nanos * rate_hz / 1e9)
         let nanos = duration.as_nanos();
         let ticks = (nanos * RATE as u128 + 500_000_000) / 1_000_000_000;
+        // Intentional truncation: timestamps wrap at u32::MAX
         ticks as u32
     }
livekit/Cargo.toml (1)

35-42: Consider using workspace dependencies for consistency.

Several dependencies are pinned to explicit versions here while other crates in the workspace may use workspace = true. This can lead to version drift. If these versions are intentionally different from workspace defaults, this is fine; otherwise, consider using workspace references for consistency.

livekit-datatrack/src/remote/depacketizer.rs (1)

240-420: Good test coverage; consider a wrap‑around case.

After fixing the wrap logic, a test with start_sequence near u16::MAX would guard against regressions.

livekit-datatrack/src/packet/deserialize.rs (1)

97-135: Known extension parsing ignores the wire len field.

For E2eeExt and UserTimestampExt, the code checks raw.remaining() < E2eeExt::LEN (hardcoded constant) rather than using the len value read from the wire at line 102. If a future protocol version sends a different length for these extensions, deserialization could read incorrect data or leave bytes unconsumed.

Consider validating that len matches the expected LEN constant, or using len to bound the read.

♻️ Proposed validation
                 E2eeExt::TAG => {
+                    if len != E2eeExt::LEN {
+                        Err(DeserializeError::MalformedExt(tag))?
+                    }
                     if raw.remaining() < E2eeExt::LEN {
                         Err(DeserializeError::MalformedExt(tag))?
                     }
                     // ...
                 }
                 UserTimestampExt::TAG => {
+                    if len != UserTimestampExt::LEN {
+                        Err(DeserializeError::MalformedExt(tag))?
+                    }
                     if raw.remaining() < UserTimestampExt::LEN {
                         Err(DeserializeError::MalformedExt(tag))?
                     }
                     // ...
                 }
livekit-datatrack/src/remote/mod.rs (1)

94-96: Silent frame drops on subscriber lag.

BroadcastStream returns Err(BroadcastStreamRecvError::Lagged(n)) when a subscriber falls behind. Filtering these out via filter_map(|r| async { r.ok() }) silently discards the lag notification—subscribers won't know they missed frames.

Consider exposing the lag info (e.g., via a wrapper event or logging) so applications can detect and handle missed frames. Based on learnings, this may be acceptable if data tracks don't guarantee reliable delivery, but documenting this behavior in the subscribe doc comment would help users understand the semantics.

livekit-datatrack/src/remote/manager.rs (2)

74-75: Consider documenting or increasing channel buffer sizes.

The buffer sizes of 4 for event channels may be tight under load. While the TODO: tune buffer size comment acknowledges this, consider either:

  1. Making buffer sizes configurable via ManagerOptions
  2. Documenting the expected throughput characteristics

128-130: Address the TODO for subscription timeout.

Pending subscriptions have no internal timeout—if the SFU never responds with a subscriber handle, the result_txs will never be resolved. Consider adding a timeout mechanism or tracking this in an issue.

Would you like me to open an issue to track implementing an internal subscription timeout?

livekit/src/rtc_engine/rtc_session.rs (2)

665-672: Partial batch sending on error—verify this is acceptable.

try_send_data_track_packets sends packets until the first error, then drops the rest of the batch. This aligns with the lossy delivery model, but consider whether logging the number of dropped packets would aid debugging.


1942-1954: Consider removing the TODO or clarifying the clone concern.

The comment // TODO: avoid clone if possible refers to buffer.data.to_vec().into(). Since DataBuffer.data is a slice (&[u8]), a copy is unavoidable when converting to owned Bytes. The current approach is correct—consider removing the TODO or noting that ownership transfer from the C++ layer requires this copy.

livekit-datatrack/src/local/mod.rs (1)

143-149: Consider validating non-empty track name.

The documentation states the name "Must not be empty", but DataTrackOptions::new doesn't enforce this. Consider adding validation:

🛡️ Proposed fix to validate name
 pub fn new(name: impl Into<String>) -> Self {
-    Self { name: name.into() }
+    let name = name.into();
+    debug_assert!(!name.is_empty(), "Track name must not be empty");
+    Self { name }
 }

Alternatively, return a Result to enforce at runtime, or document that empty names result in undefined behavior.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7424141 and fae05c9.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (66)
  • Cargo.toml
  • examples/agent_dispatch/Cargo.toml
  • examples/api/Cargo.toml
  • examples/basic_data_track/Cargo.toml
  • examples/basic_data_track/README.md
  • examples/basic_data_track/src/publisher.rs
  • examples/basic_data_track/src/subscriber.rs
  • examples/basic_room/Cargo.toml
  • examples/basic_text_stream/Cargo.toml
  • examples/encrypted_text_stream/Cargo.toml
  • examples/local_audio/Cargo.toml
  • examples/local_video/Cargo.toml
  • examples/mobile/Cargo.toml
  • examples/play_from_disk/Cargo.toml
  • examples/rpc/Cargo.toml
  • examples/save_to_disk/Cargo.toml
  • examples/screensharing/Cargo.toml
  • examples/send_bytes/Cargo.toml
  • examples/webhooks/Cargo.toml
  • examples/wgpu_room/Cargo.toml
  • livekit-datatrack/Cargo.toml
  • livekit-datatrack/README.md
  • livekit-datatrack/src/e2ee.rs
  • livekit-datatrack/src/error.rs
  • livekit-datatrack/src/frame.rs
  • livekit-datatrack/src/lib.rs
  • livekit-datatrack/src/local/events.rs
  • livekit-datatrack/src/local/manager.rs
  • livekit-datatrack/src/local/mod.rs
  • livekit-datatrack/src/local/packetizer.rs
  • livekit-datatrack/src/local/pipeline.rs
  • livekit-datatrack/src/local/proto.rs
  • livekit-datatrack/src/packet/deserialize.rs
  • livekit-datatrack/src/packet/extension.rs
  • livekit-datatrack/src/packet/handle.rs
  • livekit-datatrack/src/packet/mod.rs
  • livekit-datatrack/src/packet/serialize.rs
  • livekit-datatrack/src/packet/time.rs
  • livekit-datatrack/src/remote/depacketizer.rs
  • livekit-datatrack/src/remote/events.rs
  • livekit-datatrack/src/remote/manager.rs
  • livekit-datatrack/src/remote/mod.rs
  • livekit-datatrack/src/remote/pipeline.rs
  • livekit-datatrack/src/remote/proto.rs
  • livekit-datatrack/src/track.rs
  • livekit-datatrack/src/utils/bytes.rs
  • livekit-datatrack/src/utils/counter.rs
  • livekit-datatrack/src/utils/mod.rs
  • livekit-ffi/src/conversion/participant.rs
  • livekit/Cargo.toml
  • livekit/src/prelude.rs
  • livekit/src/proto.rs
  • livekit/src/room/data_track.rs
  • livekit/src/room/e2ee/data_track.rs
  • livekit/src/room/e2ee/manager.rs
  • livekit/src/room/e2ee/mod.rs
  • livekit/src/room/mod.rs
  • livekit/src/room/participant/local_participant.rs
  • livekit/src/room/participant/mod.rs
  • livekit/src/rtc_engine/mod.rs
  • livekit/src/rtc_engine/rtc_events.rs
  • livekit/src/rtc_engine/rtc_session.rs
  • livekit/tests/common/e2e/mod.rs
  • livekit/tests/data_channel_encryption.rs
  • livekit/tests/data_track_test.rs
  • release-plz.toml
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: ladvoc
Repo: livekit/rust-sdks PR: 862
File: livekit-datatrack/src/local/manager.rs:298-311
Timestamp: 2026-02-03T01:23:04.638Z
Learning: In the livekit-datatrack crate, data tracks do not guarantee reliable delivery in the initial version. The `process_and_send` method in `livekit-datatrack/src/local/manager.rs` intentionally uses `try_send` to drop packets on backpressure rather than propagating it, and this is expected behavior.
📚 Learning: 2026-02-03T01:23:04.638Z
Learnt from: ladvoc
Repo: livekit/rust-sdks PR: 862
File: livekit-datatrack/src/local/manager.rs:298-311
Timestamp: 2026-02-03T01:23:04.638Z
Learning: In the livekit-datatrack crate, data tracks do not guarantee reliable delivery in the initial version. The `process_and_send` method in `livekit-datatrack/src/local/manager.rs` intentionally uses `try_send` to drop packets on backpressure rather than propagating it, and this is expected behavior.

Applied to files:

  • livekit/src/room/e2ee/mod.rs
  • livekit/src/prelude.rs
  • examples/basic_data_track/README.md
  • livekit-datatrack/src/local/packetizer.rs
  • livekit/tests/data_track_test.rs
  • livekit-datatrack/src/remote/proto.rs
  • livekit-datatrack/src/local/pipeline.rs
  • livekit-datatrack/src/lib.rs
  • livekit/src/room/data_track.rs
  • livekit-datatrack/README.md
  • livekit/src/room/participant/local_participant.rs
  • livekit-datatrack/src/packet/handle.rs
  • livekit-datatrack/src/remote/pipeline.rs
  • examples/basic_data_track/src/publisher.rs
  • livekit/Cargo.toml
  • livekit-datatrack/src/remote/mod.rs
  • livekit-datatrack/src/remote/depacketizer.rs
  • livekit-datatrack/src/local/events.rs
  • livekit-datatrack/src/local/proto.rs
  • livekit-datatrack/src/packet/mod.rs
  • livekit/src/room/mod.rs
  • livekit-datatrack/src/remote/manager.rs
  • examples/basic_data_track/Cargo.toml
  • examples/basic_data_track/src/subscriber.rs
  • Cargo.toml
  • livekit-datatrack/src/remote/events.rs
  • livekit-datatrack/src/local/manager.rs
  • livekit-datatrack/Cargo.toml
  • livekit/src/rtc_engine/mod.rs
  • livekit-datatrack/src/track.rs
  • livekit-datatrack/src/frame.rs
  • livekit/src/rtc_engine/rtc_session.rs
  • livekit-datatrack/src/local/mod.rs
🧬 Code graph analysis (21)
livekit-datatrack/src/local/packetizer.rs (3)
livekit-datatrack/src/packet/time.rs (2)
  • new (54-56)
  • random (23-25)
livekit-datatrack/src/utils/counter.rs (1)
  • new (21-23)
livekit-datatrack/src/packet/handle.rs (2)
  • from (52-54)
  • from (58-60)
livekit/tests/data_track_test.rs (3)
livekit-datatrack/src/track.rs (1)
  • uses_e2ee (91-93)
livekit-datatrack/src/remote/mod.rs (1)
  • publisher_identity (100-102)
livekit-datatrack/src/frame.rs (2)
  • payload (41-43)
  • user_timestamp (46-48)
livekit-datatrack/src/remote/proto.rs (2)
livekit-datatrack/src/track.rs (3)
  • try_from (111-117)
  • from (121-123)
  • name (87-89)
livekit-datatrack/src/packet/handle.rs (4)
  • try_from (34-39)
  • try_from (45-48)
  • from (52-54)
  • from (58-60)
livekit-datatrack/src/lib.rs (1)
livekit-datatrack/src/packet/serialize.rs (1)
  • packet (178-193)
livekit/tests/data_channel_encryption.rs (1)
livekit/tests/common/e2e/mod.rs (1)
  • test_rooms_with_options (82-128)
livekit-datatrack/src/packet/deserialize.rs (3)
livekit-datatrack/src/packet/handle.rs (2)
  • from (52-54)
  • from (58-60)
livekit-datatrack/src/packet/time.rs (1)
  • from_ticks (27-29)
livekit-datatrack/src/packet/serialize.rs (1)
  • packet (178-193)
livekit-datatrack/src/remote/pipeline.rs (1)
livekit-datatrack/src/remote/depacketizer.rs (3)
  • new (39-41)
  • from (190-192)
  • from (196-198)
examples/basic_data_track/src/publisher.rs (1)
examples/basic_data_track/src/subscriber.rs (1)
  • main (8-21)
livekit/src/rtc_engine/rtc_events.rs (1)
livekit/src/rtc_engine/rtc_session.rs (2)
  • data_channel (622-624)
  • data_channel (1906-1922)
livekit-datatrack/src/remote/mod.rs (4)
livekit-runtime/src/dispatcher.rs (1)
  • timeout (115-128)
livekit-datatrack/src/remote/depacketizer.rs (3)
  • new (39-41)
  • from (190-192)
  • from (196-198)
livekit-datatrack/src/remote/pipeline.rs (1)
  • new (39-47)
livekit-datatrack/src/track.rs (3)
  • info (38-40)
  • published_rx (63-68)
  • from (121-123)
livekit-datatrack/src/local/events.rs (1)
livekit-api/src/http_client.rs (1)
  • bytes (81-83)
livekit-datatrack/src/utils/mod.rs (1)
livekit-api/src/http_client.rs (1)
  • bytes (81-83)
livekit-datatrack/src/local/proto.rs (2)
livekit-datatrack/src/track.rs (5)
  • from (121-123)
  • try_from (111-117)
  • uses_e2ee (91-93)
  • sid (83-85)
  • name (87-89)
livekit-datatrack/src/packet/handle.rs (4)
  • from (52-54)
  • from (58-60)
  • try_from (34-39)
  • try_from (45-48)
livekit-datatrack/src/packet/mod.rs (2)
livekit-datatrack/src/packet/deserialize.rs (3)
  • deserialize (44-49)
  • deserialize (53-94)
  • deserialize (98-134)
livekit-datatrack/src/packet/serialize.rs (1)
  • serialize (50-57)
livekit/src/room/mod.rs (1)
livekit-datatrack/src/local/proto.rs (1)
  • publish_responses_for_sync_state (113-120)
livekit-datatrack/src/remote/manager.rs (5)
livekit-datatrack/src/remote/mod.rs (4)
  • new (43-45)
  • publisher_identity (100-102)
  • published_rx (113-115)
  • inner (47-52)
livekit-datatrack/src/remote/pipeline.rs (2)
  • new (39-47)
  • from (88-93)
livekit-datatrack/src/track.rs (5)
  • info (38-40)
  • sid (83-85)
  • published_rx (63-68)
  • from (121-123)
  • uses_e2ee (91-93)
livekit-runtime/src/dispatcher.rs (1)
  • timeout (115-128)
livekit-datatrack/src/packet/handle.rs (2)
  • from (52-54)
  • from (58-60)
examples/basic_data_track/src/subscriber.rs (2)
livekit-datatrack/src/remote/mod.rs (1)
  • publisher_identity (100-102)
livekit-datatrack/src/frame.rs (1)
  • payload (41-43)
livekit-datatrack/src/packet/time.rs (1)
livekit-datatrack/src/local/packetizer.rs (1)
  • new (46-54)
livekit-datatrack/src/track.rs (2)
livekit-datatrack/src/local/mod.rs (1)
  • published_rx (110-112)
livekit-datatrack/src/remote/mod.rs (1)
  • published_rx (113-115)
livekit-datatrack/src/frame.rs (2)
livekit-datatrack/src/local/pipeline.rs (1)
  • from (81-89)
livekit-datatrack/src/remote/pipeline.rs (1)
  • from (88-93)
livekit/src/rtc_engine/rtc_session.rs (4)
livekit-datatrack/src/remote/proto.rs (1)
  • event_from_join (47-51)
livekit-datatrack/src/local/proto.rs (1)
  • publish_result_from_request_response (76-92)
livekit/src/rtc_engine/rtc_events.rs (1)
  • on_message (157-165)
libwebrtc/src/native/data_channel.rs (2)
  • on_message (105-107)
  • on_message (129-134)
🪛 GitHub Actions: Rust Formatting
livekit/src/room/e2ee/manager.rs

[error] 273-273: cargo fmt -- --check detected a formatting difference in manager.rs (added trailing comma after key_index parameter). Run 'cargo fmt' to fix code style issues.

livekit/src/room/e2ee/data_track.rs

[error] 31-31: cargo fmt -- --check detected a formatting difference in data_track.rs. Run 'cargo fmt' to fix code style issues.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
  • GitHub Check: Build (aarch64-linux-android)
  • GitHub Check: Build (aarch64-unknown-linux-gnu)
  • GitHub Check: Build (x86_64-unknown-linux-gnu)
  • GitHub Check: Build (aarch64-apple-darwin)
  • GitHub Check: Build (x86_64-linux-android)
  • GitHub Check: Build (aarch64-apple-ios)
  • GitHub Check: Build (x86_64-pc-windows-msvc)
  • GitHub Check: Build (aarch64-apple-ios-sim)
  • GitHub Check: Build (aarch64-pc-windows-msvc)
  • GitHub Check: Build (x86_64-apple-darwin)
  • GitHub Check: Build (armv7-linux-androideabi)
  • GitHub Check: Test (x86_64-pc-windows-msvc)
  • GitHub Check: Test (x86_64-unknown-linux-gnu)
  • GitHub Check: Test (x86_64-apple-darwin)
🔇 Additional comments (122)
livekit/src/proto.rs (1)

149-159: LGTM — formatting-only change.
Trailing comma is fine and keeps the match arm consistent.

livekit-ffi/src/conversion/participant.rs (1)

61-61: LGTM! Good formatting improvement.

Adding trailing commas to the last match arms follows Rust best practices and makes future additions cleaner.

Also applies to: 75-75

release-plz.toml (1)

87-91: LGTM!

The release configuration for the new livekit-datatrack crate is properly set up with changelog path, publish, and release settings consistent with other workspace packages.

examples/webhooks/Cargo.toml (1)

4-10: LGTM!

The explicit edition and tokio dependency declaration align with the broader migration pattern in this PR.

Cargo.toml (2)

9-9: LGTM!

The livekit-datatrack crate is correctly added as a workspace member and dependency with appropriate version pinning.

Also applies to: 47-47


60-62: New workspace dependencies look appropriate.

The additions of from_variants, futures-core, and tokio-stream are reasonable supporting dependencies for the new data tracks functionality.

Also applies to: 75-75

livekit/tests/data_channel_encryption.rs (1)

52-52: LGTM!

The .into() conversions correctly adapt RoomOptions to the expected TestRoomOptions type required by test_rooms_with_options, aligning with the function signature that expects IntoIterator<Item = TestRoomOptions>.

examples/api/Cargo.toml (1)

4-8: LGTM!

The explicit edition and tokio dependency with parking_lot feature are appropriate for this API example.

examples/basic_room/Cargo.toml (1)

4-4: LGTM.
Manifest pinning and explicit edition are clear and consistent with the workspace direction.

Also applies to: 8-12

examples/agent_dispatch/Cargo.toml (1)

4-4: LGTM.
Explicit dependency pins and edition update look fine.

Also applies to: 8-13

examples/basic_data_track/Cargo.toml (1)

1-21: LGTM.
New example manifest is clean and well-structured.

examples/local_video/Cargo.toml (1)

4-4: LGTM.
Pin updates and edition change are straightforward.

Also applies to: 26-26, 32-35, 42-43

livekit/src/room/participant/mod.rs (1)

48-49: LGTM.
Pure formatting change; no behavioral impact.

Also applies to: 57-58

examples/mobile/Cargo.toml (1)

4-4: LGTM.
Manifest updates look consistent with the rest of the examples.

Also applies to: 8-11, 15-15

livekit-datatrack/src/error.rs (1)

15-24: Concise internal error wrapper.

Clear and lightweight; no issues spotted.

examples/basic_data_track/src/publisher.rs (3)

6-21: Clear connect→publish→shutdown flow.

Easy to follow and cleanly structured.


24-27: Good placeholder for sensor input.

Simple stub makes it easy to swap in real data later.


29-38: Push loop cadence is straightforward.

Readable, predictable pacing for example use.

examples/save_to_disk/Cargo.toml (1)

4-11: Dependency versions are aligned with peer examples in the workspace.

The explicit pins for tokio, bytes, and futures in this example are consistent with similar examples. Tokio uses the flexible version = "1" pinning strategy common across save_to_disk, screensharing, play_from_disk, rpc, and other examples; bytes is uniquely declared here; and futures versions (0.3.28 vs. 0.3 elsewhere) fall within the same major.minor release band. No problematic drift detected.

examples/encrypted_text_stream/Cargo.toml (1)

1-13: LGTM.

livekit-datatrack/src/e2ee.rs (1)

22-56: API surface looks solid.

livekit-datatrack/src/frame.rs (1)

19-124: Frame API and helpers look good.

examples/play_from_disk/Cargo.toml (1)

1-12: LGTM.

livekit-datatrack/src/utils/bytes.rs (1)

17-85: Chunking utilities and tests look good.

livekit-datatrack/src/packet/handle.rs (1)

76-81: LGTM! Safe handle allocation with overflow protection.

The checked_add approach correctly prevents integer overflow when allocating handles. The implementation ensures handle 0 is never allocated (starts at 1 after first increment).

livekit-datatrack/src/remote/proto.rs (2)

25-39: LGTM! Clean protocol-to-event conversion.

The TryFrom implementation properly chains conversions for both handle and SID, collecting into a HashMap with appropriate error propagation via ?.


82-89: LGTM! Efficient ownership transfer with mem::take.

Using mem::take to move ownership of data_tracks while leaving an empty vector in place is an efficient pattern that avoids cloning.

livekit-datatrack/src/local/packetizer.rs (2)

44-54: LGTM! Well-structured packetizer initialization.

The constructor properly initializes counters and creates a randomized clock base timestamp, which helps avoid correlation attacks on timestamp values.


56-88: LGTM! Correct packetization with proper header propagation.

The implementation correctly:

  • Computes max payload size accounting for header overhead
  • Handles MTU-too-short edge case
  • Assigns proper frame markers (Single/Start/Inter/Final)
  • Increments sequence numbers per packet while keeping frame_number constant within a frame
livekit-datatrack/src/packet/time.rs (2)

44-50: LGTM! Well-designed monotonic clock with wraparound handling.

The Clock struct correctly tracks the previous timestamp to enforce monotonicity in RTP wraparound space, which is essential for media synchronization.


68-80: LGTM! Correct monotonicity enforcement.

The at method properly clamps the computed timestamp to prev when wraparound would cause a backwards jump. The is_before method using signed comparison of wrapping difference is the standard approach for RTP timestamp comparison.

livekit-datatrack/src/local/events.rs (2)

24-44: LGTM! Clean event-driven architecture.

The InputEvent and OutputEvent enums provide a clear separation between events consumed by the manager and events produced for external handling. The FromVariants derive reduces boilerplate for event construction.


54-60: LGTM! Appropriate use of oneshot channel for async result.

Using oneshot::Sender for the publish result allows the caller to await the outcome without blocking the manager's event loop.

livekit/Cargo.toml (1)

33-33: LGTM! New data track dependency added.

The livekit-datatrack workspace dependency enables the data tracks feature integration.

livekit-datatrack/src/track.rs (4)

36-47: LGTM! Clean accessor methods with proper delegation.

The info() and is_published() methods correctly delegate to the underlying state, with is_published() borrowing from the watch receiver appropriately.


54-61: LGTM! Correct async unpublish wait implementation.

The method properly handles the already-unpublished case with an early return, and the wait_for call correctly waits for the condition !*is_published. Discarding the result with _ = is acceptable since channel closure implies the track is gone.


108-117: LGTM! Appropriate SID validation.

The TryFrom<String> implementation correctly validates the DTR_ prefix requirement, providing a type-safe way to construct DataTrackSid values.


132-144: LGTM! Well-designed test dummy with Base57 alphabet.

The fake implementation generates valid SIDs using a Base57 alphabet (excluding ambiguous characters like 0, O, I, l), which is a common practice for human-readable identifiers.

livekit-datatrack/src/local/proto.rs (4)

26-38: Publish/unpublish request mapping is consistent.

E2EE flag and handles are cleanly translated into protocol fields.


42-74: Response/DataTrackInfo conversions look solid.

The error context and encryption mapping are clear and consistent.


96-119: Sync‑state helpers look good.

The encryption mapping and response construction are straightforward.


122-204: Conversion tests provide good coverage.

Nice breadth across publish/unpublish and sync-state flows.

livekit-datatrack/src/remote/depacketizer.rs (3)

20-41: Depacketizer setup is straightforward.

Structs and constructor are clear and minimal.


43-133: Packet ingestion logic reads well.

The single/start/inter/final routing and drop handling are clean.


162-238: Result/error types are clear and well‑documented.

Nice separation between drop reasons and payload delivery.

livekit-datatrack/src/remote/pipeline.rs (5)

23-35: Pipeline option/struct definitions are clean.


49-85: Packet processing/decryption flow looks consistent.


87-93: Frame conversion is straightforward.


96-123: Test validates the non‑E2EE path well.


37-47: The E2EE/provider invariant is already enforced at all call sites.

The invariant cannot be violated at runtime. In manager.rs (line 253-257), the decryption_provider is set conditionally based on uses_e2ee(), guaranteeing the invariant before Pipeline::new() is called. The debug_assert_eq! serves as a defensive sanity check, not as the sole point of enforcement. Since the invariant is structurally guaranteed by the calling code, changing to assert_eq! would add unnecessary runtime overhead in release builds without practical safety benefit.

livekit-datatrack/README.md (1)

1-6: Clear guidance on internal crate usage.

livekit/src/room/e2ee/mod.rs (1)

22-23: Data‑track module wiring looks good.

livekit/src/room/data_track.rs (1)

15-16: Re-export keeps the data‑track API discoverable.

livekit/src/prelude.rs (1)

17-34: Prelude exports for data tracks are coherent.

livekit-datatrack/src/utils/mod.rs (1)

15-21: Nice consolidated utils module.
Centralized re-exports keep the Bytes and counter helpers discoverable.

livekit/src/rtc_engine/rtc_events.rs (2)

21-24: Label imports are now centralized.
Pulling the DC labels from rtc_session keeps constants co-located with their definitions.


100-108: Label-based dispatch is clearer.
Explicit matching avoids accidentally wiring unknown labels to the data handler.

livekit/src/room/participant/local_participant.rs (3)

34-35: Imports align with the new data-track API.
The added data_track types are consistent with the new publish path.


253-285: publish_data_track wiring looks solid.
Session validation and error propagation are clean and minimal.


543-547: Clear deprecation guidance for publish_data.
The doc note gives users a good forward path.

livekit-datatrack/src/packet/extension.rs (2)

17-33: Typed extension structs are well-shaped.
Clear separation of user timestamp and E2EE metadata.


35-52: E2EE debug redaction and metadata constants look good.
Avoiding sensitive field output is the right call.

livekit-datatrack/src/local/pipeline.rs (6)

25-43: Pipeline scaffolding and error unification are clean.
Options, state, and error types are concise and composable.


53-57: process_frame flow is straightforward.
Encrypt-then-packetize ordering is clear and easy to follow.


59-74: Conditional encryption and extension injection are correct.
The E2EE extension is attached only when encryption is active.


76-77: Explicit MTU constant improves clarity.
Having it centralized will help future tuning.


80-89: Frame-to-packetizer conversion preserves timestamps.
The user timestamp mapping is handled cleanly.


92-120: Good coverage for non‑E2EE packetization.
The test validates packet count and payload integrity.

livekit-datatrack/src/lib.rs (3)

17-39: Module layout is clear and comprehensive.
The separation of track/local/remote/frame/e2ee/packet is clean.


41-44: Public API re-export is tidy.
The api module keeps the surface cohesive.


46-58: Backend re-exports are well organized.
The nested local/remote modules mirror internal structure nicely.

livekit-datatrack/src/utils/counter.rs (3)

15-36: Counter API is concise and usable.
The get / get_then_increment split is clear.


38-41: WrappingIncrement trait is straightforward.
Good abstraction for wrap-around behavior.


43-53: Macro-based impls reduce boilerplate cleanly.
The unsigned coverage looks appropriate.

livekit/tests/data_track_test.rs (6)

36-101: Well-structured parameterized test for data track publishing and subscribing.

The test covers both high FPS/small payload and low FPS/large payload scenarios with appropriate timing and frame count validation. The 95% minimum receive threshold appropriately accounts for the intentional packet dropping behavior on backpressure.


103-140: Good stress test for publishing many tracks.

Testing 256 concurrent tracks with large frame payloads is a solid stress test for the data track infrastructure.


142-171: Authorization and duplicate name error handling tests look correct.

These tests properly verify that the API returns appropriate error variants for unauthorized publish attempts and duplicate track names.


173-233: E2EE test properly validates encrypted data track flow.

The test correctly sets up shared key providers on both sides and verifies that uses_e2ee() returns true for encrypted tracks.


360-451: Fault resilience tests cover important reconnection scenarios.

Testing both signal reconnect and full reconnect (ForceTcp) scenarios for both subscriber and publisher sides ensures data tracks remain functional across connection disruptions.


453-464: Helper function is clean and reusable.

The wait_for_remote_track helper properly filters room events for remote data track publications.

livekit-datatrack/src/packet/serialize.rs (5)

19-26: Clear error types for serialization failures.

The SerializeError enum provides specific variants for buffer capacity issues, making debugging straightforward.


28-58: Packet serialization implementation is correct.

The serialize_into method properly checks buffer capacity before writing, and serialize allocates the exact required capacity upfront.


76-128: Header serialization handles extensions correctly.

The extension word calculation, padding, and conditional extension block emission are implemented correctly. The assertion at line 125 provides a useful sanity check during development.


153-168: Extension serialization follows the expected wire format.

Both E2EE and user timestamp extensions correctly write their tag, length (minus 1 per protocol), and payload.


170-241: Comprehensive serialization tests.

The tests validate header metrics calculations, total serialized length, and byte-level serialization correctness including extension ordering and padding.

livekit/tests/common/e2e/mod.rs (2)

47-74: Clean abstraction for test room configuration.

The TestRoomOptions struct nicely encapsulates both token grants and room options, with convenient From implementations for common use cases.


76-103: Test helper functions properly updated.

The test_rooms and test_rooms_with_options functions correctly use the new TestRoomOptions structure, with proper room name injection into grants.

livekit/src/room/e2ee/manager.rs (1)

250-284: API changes align well with data track integration.

The updated signatures for decrypt_data and encrypt_data taking owned Vec<u8> are appropriate for the data track use case. The Debug implementation provides useful observability.

livekit/src/room/mod.rs (7)

248-250: New RemoteDataTrackPublished event variant expands the public API appropriately.

This event allows consumers to react to remote data track publications.


455-468: Room session and handle correctly track data track managers and tasks.

The addition of local_dt_input, remote_dt_input, and the corresponding task handles ensures proper lifecycle management.


989-994: Data track input events are correctly routed to managers.

The engine events for local and remote data track inputs are properly forwarded to their respective manager inputs.


1000-1022: Close path properly shuts down data track tasks.

The shutdown sequence sends shutdown events to managers and awaits all task handles.


1271-1298: Sync state correctly includes data track publications.

The publish_data_tracks field is populated from the local data track manager's query, ensuring the SFU has accurate state after reconnection.


1423-1428: Restart correctly resends data track subscription updates.

This ensures the SFU continues delivering packets for existing subscriptions after a full reconnect.


1878-1922: Forward tasks properly bridge manager outputs to engine/dispatcher.

The local_dt_forward_task and remote_dt_forward_task correctly handle shutdown signals and forward events appropriately. The RemoteDataTrackPublished event is dispatched directly while other remote events go to the engine.

livekit/src/room/e2ee/data_track.rs (1)

52-81: Decryption provider correctly bridges to E2eeManager.

The implementation properly extracts fields from EncryptedPayload and delegates to decrypt_data.

livekit-datatrack/src/packet/deserialize.rs (4)

22-41: Comprehensive error types for deserialization failures.

The DeserializeError enum covers all failure modes including version mismatches, buffer issues, and malformed extensions.


43-50: Packet deserialization entry point is clean.

Correctly deserializes the header first, then extracts the remaining bytes as payload.


52-95: Header deserialization handles all fields correctly.

The implementation properly validates buffer length, checks version support, parses flags, and conditionally handles extensions.


137-275: Comprehensive test coverage for deserialization.

The tests cover buffer boundary conditions, version validation, extension parsing (including padding and unknown extensions), and alignment requirements.

livekit/src/rtc_engine/mod.rs (3)

192-193: LGTM!

The new EngineEvent variants for data track input follow the established pattern for other event types in the engine.


276-292: LGTM!

The handle_local_data_track_output and handle_remote_data_track_output methods correctly follow the existing pattern used by other engine methods (e.g., publish_data, simulate_scenario) with proper reconnection handling.


632-637: LGTM!

Session event forwarding for data track inputs follows the established pattern used by other session events in on_session_event.

livekit-datatrack/src/remote/events.rs (1)

24-54: LGTM!

Well-structured event types with clear documentation. The FromVariants derive reduces boilerplate for enum conversions, and the doc comments appropriately reference the corresponding protocol types.

livekit-datatrack/src/remote/manager.rs (2)

309-311: Intentional packet dropping on backpressure—correctly implemented.

The try_send usage here aligns with the design decision that data tracks don't guarantee reliable delivery. The inspect_err for debug logging is appropriate.

Based on learnings: "data tracks do not guarantee reliable delivery in the initial version. The process_and_send method intentionally uses try_send to drop packets on backpressure."


426-559: LGTM!

Good test coverage for manager lifecycle, track task shutdown paths (via unpublish and unsubscribe), and the subscribe flow including SFU simulation. The use of fake for test data and timeouts for async tests is appropriate.

livekit-datatrack/src/packet/mod.rs (2)

28-66: LGTM!

Well-designed packet structure with appropriate derives. The custom Debug implementation that shows payload_len instead of dumping the full payload is a nice touch for debugging without log spam.


92-109: LGTM!

Good round-trip test that validates serialization/deserialization consistency. Using fake::Faker for generating test data ensures diverse input coverage.

livekit/src/rtc_engine/rtc_session.rs (5)

468-477: LGTM!

Good code reuse—extracting lossy_options and sharing it between lossy_dc and dt_transport reduces duplication and ensures consistent configuration.


627-673: LGTM!

The handle_local_data_track_output and handle_remote_data_track_output methods cleanly dispatch output events to the appropriate signaling or transport paths. The pattern matches are exhaustive for the current variants.


1074-1080: Early return short-circuits pending request handling.

When publish_result_from_request_response returns Some, the method returns early without checking pending_requests. This assumes publish responses never have associated pending requests, which appears correct but worth noting.


1144-1156: LGTM!

Clean extension of the data channel handling to support DATA_TRACK_DC_LABEL. The handle_remote_dt_packets call properly sets up the message handler before storing the channel.


1848-1904: LGTM!

Good refactoring to extract ensure_publisher_connected_with_dc as a shared helper. The new ensure_data_track_publisher_connected correctly reuses this for the data track transport.

livekit-datatrack/src/local/mod.rs (3)

83-91: LGTM!

The try_push method correctly validates publication status before sending and properly maps channel errors to PushFrameError. Returning the frame on error enables application-level retry logic.


115-120: LGTM!

The Drop implementation ensures the track is marked unpublished when the handle is dropped, preventing resource leaks and ensuring consistent state in the manager.


164-237: LGTM!

Comprehensive error types with clear documentation. The PushFrameError design allowing recovery of the frame via into_frame() is user-friendly for retry scenarios.

livekit-datatrack/src/local/manager.rs (6)

32-101: Manager scaffolding and event loop are clear.

No concerns with the options wiring, channel setup, or shutdown flow.


103-183: Publish/query/unpublish request flow looks consistent.

Forwarding results and query handling read cleanly.


202-234: Track creation and descriptor transition look solid.


268-296: TrackTask loop handles publish state cleanly.


390-463: Tests cover shutdown and publish/unpublish flow well.


298-331: Backpressure handling is appropriate for non-guaranteed delivery semantics.

Using try_send to drop packets under backpressure is the intended behavior for this non-reliable data track implementation. The error is logged but not propagated, which aligns with the design.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

Comment on lines +23 to +31
/// Subscribe to the first data track published.
async fn handle_first_publication(mut rx: UnboundedReceiver<RoomEvent>) -> Result<()> {
log::info!("Waiting for publication…");
while let Some(event) = rx.recv().await {
let RoomEvent::RemoteDataTrackPublished(track) = event else {
continue;
};
subscribe(track).await?
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Stop after the first publication to match the function intent.

The helper says “first data track,” but it keeps listening after the first track completes. Consider returning immediately after the first subscription.

✅ Suggested fix
-        subscribe(track).await?
+        subscribe(track).await?;
+        return Ok(());
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// Subscribe to the first data track published.
async fn handle_first_publication(mut rx: UnboundedReceiver<RoomEvent>) -> Result<()> {
log::info!("Waiting for publication…");
while let Some(event) = rx.recv().await {
let RoomEvent::RemoteDataTrackPublished(track) = event else {
continue;
};
subscribe(track).await?
}
/// Subscribe to the first data track published.
async fn handle_first_publication(mut rx: UnboundedReceiver<RoomEvent>) -> Result<()> {
log::info!("Waiting for publication…");
while let Some(event) = rx.recv().await {
let RoomEvent::RemoteDataTrackPublished(track) = event else {
continue;
};
subscribe(track).await?;
return Ok(());
}
🤖 Prompt for AI Agents
In `@examples/basic_data_track/src/subscriber.rs` around lines 23 - 31, The
function handle_first_publication currently continues listening after
subscribing to the first RemoteDataTrack; update it to stop after the first
successful subscription by returning (or breaking and returning) immediately
once subscribe(track).await? completes. Locate the loop in
handle_first_publication and add an early return (e.g., return Ok(())) right
after the subscribe(track).await? call so the function only handles the first
publication.

Comment on lines +4 to +17
edition = "2021"
publish = false

[dependencies]
tokio = { workspace = true, features = ["full"] }
env_logger = { workspace = true }
tokio = { version = "1", features = ["full"] }
env_logger = "0.10"
livekit = { workspace = true, features = ["rustls-tls-native-roots"] }
livekit-api = { workspace = true, features = ["rustls-tls-native-roots"] }
libwebrtc = { workspace = true }
log = { workspace = true }
log = "0.4"
cpal = "0.15"
anyhow = { workspace = true }
clap = { workspace = true, features = ["derive"] }
futures-util = { workspace = true }
anyhow = "1.0"
clap = { version = "4.0", features = ["derive"] }
futures-util = "0.3"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash

# Find all Cargo.toml files and extract versions for the specified dependencies
echo "=== Searching for dependency versions in Cargo.toml files ==="

# First, let's find and list all Cargo.toml files
echo "--- All Cargo.toml files in workspace ---"
find . -name "Cargo.toml" -type f | head -20

echo ""
echo "--- Root workspace Cargo.toml ---"
if [ -f "Cargo.toml" ]; then
  echo "Root Cargo.toml found"
  echo "Contents of root Cargo.toml (first 50 lines):"
  head -50 Cargo.toml
fi

echo ""
echo "--- examples/local_audio/Cargo.toml ---"
if [ -f "examples/local_audio/Cargo.toml" ]; then
  echo "File exists"
  cat examples/local_audio/Cargo.toml
fi

echo ""
echo "--- Searching for tokio versions ---"
rg "tokio\s*=" Cargo.toml -A 1 | head -40

Repository: livekit/rust-sdks

Length of output: 2889


🏁 Script executed:

#!/bin/bash

echo "=== Checking for dependency versions across ALL examples ==="

# Search for each dependency in all Cargo.toml files
for dep in "tokio" "env_logger" "log" "anyhow" "clap" "futures-util"; do
  echo ""
  echo "--- $dep versions across workspace ---"
  rg "^\s*$dep\s*=" --type toml | grep -E "(examples|Cargo)"
done

Repository: livekit/rust-sdks

Length of output: 6378


Align dependency versions with workspace defaults.

The example declares explicit versions that drift from the root workspace: env_logger is pinned to 0.10 while the workspace defines 0.11, and clap is pinned to 4.0 while the workspace defines 4.5. Update these to match the workspace versions unless there's a specific reason for the deviation.

🤖 Prompt for AI Agents
In `@examples/local_audio/Cargo.toml` around lines 4 - 17, Update the example
Cargo.toml dependencies to match the workspace defaults by aligning env_logger
and clap versions with the workspace (remove the explicit version pins or set
them to the workspace versions) so env_logger uses 0.11 and clap uses 4.5;
update the dependency entries for env_logger and clap in the
examples/local_audio Cargo.toml (the dependency keys "env_logger" and "clap") to
inherit or match the workspace versions.

Comment on lines +4 to +14
edition = "2021"
publish = false

[dependencies]
tokio = { workspace = true, features = ["full"] }
env_logger = { workspace = true }
tokio = { version = "1", features = ["full"] }
env_logger = "0.10"
livekit = { workspace = true, features = ["rustls-tls-native-roots"] }
livekit-api = { workspace = true, features = ["rustls-tls-native-roots"] }
log = { workspace = true }
rand = { workspace = true }
serde_json = { workspace = true }
log = "0.4"
rand = "0.8"
serde_json = "1.0"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for explicit version declarations of these dependencies in Cargo.toml files
rg -n '^(tokio|env_logger|log|rand|serde_json)\s*=' Cargo.toml

Repository: livekit/rust-sdks

Length of output: 151


Update examples/rpc/Cargo.toml dependencies to match workspace versions.

Version drift detected: env_logger is pinned to "0.10" but workspace uses "0.11"; rand is pinned to "0.8" but workspace uses "0.9". Update these to "0.11" and "0.9" respectively to maintain consistency across the workspace.

🤖 Prompt for AI Agents
In `@examples/rpc/Cargo.toml` around lines 4 - 14, Update the dependency versions
in the examples/rpc Cargo.toml so they match the workspace: change the
env_logger dependency from "0.10" to "0.11" and change rand from "0.8" to "0.9"
(the entries for env_logger and rand in the examples/rpc Cargo.toml); leave
other dependency entries unchanged.

Comment on lines +4 to +13
edition = "2021"
publish = false

[dependencies]
tokio = { workspace = true, features = ["full"] }
env_logger = { workspace = true }
tokio = { version = "1", features = ["full"] }
env_logger = "0.10"
livekit = { workspace = true, features = ["rustls-tls-native-roots"] }
livekit-api = { workspace = true }
log = { workspace = true }
clap = { workspace = true, features = ["derive"] }
log = "0.4"
clap = { version = "4.0", features = ["derive"] }
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# List versions of deps used here across the workspace.
rg -n '^(tokio|env_logger|log|clap)\s*=' -g 'Cargo.toml'

Repository: livekit/rust-sdks

Length of output: 4918


Align example dependency versions with workspace root Cargo.toml.

This example declares explicit versions for dependencies, but they drift from the workspace root:

  • env_logger = "0.10" should be "0.11" (root specifies 0.11)
  • clap = "4.0" should be "4.5" (root specifies 4.5)

Consistent pinning across examples reduces confusion about which versions are tested.

🤖 Prompt for AI Agents
In `@examples/screensharing/Cargo.toml` around lines 4 - 13, Update the example
Cargo.toml dependencies to match the workspace root: change env_logger from
"0.10" to "0.11" and change clap from "4.0" to "4.5" so the declared versions
for env_logger and clap in the examples align with the workspace root; verify
the dependency entries for env_logger and clap in the
examples/screensharing/Cargo.toml are updated accordingly while leaving other
entries (tokio, livekit, livekit-api, log) unchanged.

Comment on lines +4 to +26
edition = "2021"
publish = false

[features]
default = []
tracing = ["console-subscriber", "tokio/tracing"]

[dependencies]
tokio = { workspace = true, features = ["full", "parking_lot"] }
tokio = { version = "1", features = ["full", "parking_lot"] }
livekit = { workspace = true, features = ["rustls-tls-native-roots"]}
webrtc-sys = { workspace = true }
futures = { workspace = true }
futures = "0.3"
winit = { version = "0.30.11", features = [ "android-native-activity" ] }
parking_lot = { workspace = true, features = ["deadlock_detection"] }
parking_lot = { version = "0.12.1", features = ["deadlock_detection"] }
image = "0.24"
wgpu = "25.0"
egui = "0.31.1"
egui-wgpu = "0.31.1"
eframe = { version = "0.31.1", default-features = false, features = ["default_fonts", "wgpu", "persistence"] }
serde = { workspace = true, features = ["derive"] }
log = { workspace = true }
env_logger = { workspace = true }
console-subscriber = { workspace = true, features = ["parking_lot"], optional = true }
serde = { version = "1", features = ["derive"] }
log = "0.4"
env_logger = "0.10.0"
console-subscriber = { version = "0.1.10", features = ["parking_lot"], optional = true }
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# First, find the root Cargo.toml and workspace members
echo "=== Root workspace structure ==="
cat Cargo.toml | head -30

echo ""
echo "=== Searching for dependency versions across workspace Cargo.toml files ==="
# Search for the specific dependencies mentioned in the review
rg -A 1 '^\[dependencies\]' -g 'Cargo.toml' | head -200

echo ""
echo "=== Checking versions of specific dependencies ==="
# More focused search for each dependency version
for dep in tokio futures parking_lot serde log env_logger console-subscriber; do
  echo "--- $dep versions ---"
  rg "$dep\s*=" -g 'Cargo.toml' --max-count 10
done

Repository: livekit/rust-sdks

Length of output: 10521


Pin dependency versions to workspace definitions or accept version drift explicitly.

This example declares explicit versions that diverge from the workspace root:

  • env_logger: root defines "0.11" but this uses "0.10.0"
  • parking_lot: root defines "0.12" but this uses "0.12.1"

Either use { workspace = true } for consistency, accept the drift, or document why these versions differ.

🤖 Prompt for AI Agents
In `@examples/wgpu_room/Cargo.toml` around lines 4 - 26, The Cargo.toml example
pins env_logger = "0.10.0" and parking_lot = "0.12.1" which diverge from the
workspace root versions; update the dependency declarations for env_logger and
parking_lot to either use workspace = true (e.g., env_logger = { workspace =
true } and parking_lot = { workspace = true, features = ["deadlock_detection"]
}) to inherit the root versions, or explicitly change the versions to match the
workspace root (env_logger "0.11" and parking_lot "0.12") and document the
intentional drift in a comment if you choose to keep different versions; adjust
only the entries for env_logger and parking_lot in this Cargo.toml.

Comment on lines +66 to +80
fn event_from_participant_info(
msg: &mut [proto::ParticipantInfo],
local_participant_identity: Option<&str>,
) -> Result<SfuPublicationUpdates, InternalError> {
let updates = msg
.iter_mut()
.filter(|participant| {
local_participant_identity.is_none_or(|identity| participant.identity != identity)
})
.map(|participant| -> Result<_, InternalError> {
Ok((participant.identity.clone(), extract_track_info(participant)?))
})
.collect::<Result<HashMap<String, Vec<DataTrackInfo>>, _>>()?;
Ok(SfuPublicationUpdates { updates })
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check for MSRV declarations in Cargo.toml files
rg -n 'rust-version|msrv' --type=toml

Repository: livekit/rust-sdks

Length of output: 43


🏁 Script executed:

#!/bin/bash
# Find all Cargo.toml files in the repository
fd -n 'Cargo.toml$' -exec cat -n {} +

Repository: livekit/rust-sdks

Length of output: 291


🏁 Script executed:

#!/bin/bash
# Find all Cargo.toml files and display their contents
fd 'Cargo.toml$' -exec cat {} +

Repository: livekit/rust-sdks

Length of output: 248


🏁 Script executed:

#!/bin/bash
# Find all Cargo.toml files
fd 'Cargo.toml$'

Repository: livekit/rust-sdks

Length of output: 926


🏁 Script executed:

#!/bin/bash
# Check root Cargo.toml and the relevant package Cargo.toml
echo "=== ROOT CARGO.TOML ==="
cat Cargo.toml
echo -e "\n=== LIVEKIT-DATATRACK CARGO.TOML ==="
cat livekit-datatrack/Cargo.toml

Repository: livekit/rust-sdks

Length of output: 3054


Set MSRV to 1.82 or replace is_none_or with a compatible alternative.

The code uses Option::is_none_or (stabilized in Rust 1.82), but the project has no explicit rust-version declaration. Edition 2021 defaults to Rust 1.56 minimum, meaning users with Rust 1.56–1.81 will fail to compile this code.

Either add rust-version = "1.82" to livekit-datatrack/Cargo.toml or replace is_none_or with .map_or(true, |identity| ...) to maintain compatibility.

🤖 Prompt for AI Agents
In `@livekit-datatrack/src/remote/proto.rs` around lines 66 - 80, The call to
Option::is_none_or in event_from_participant_info (using
local_participant_identity.is_none_or(...)) requires Rust 1.82; either set the
crate MSRV by adding rust-version = "1.82" to the crate's Cargo.toml, or replace
the is_none_or usage with a compatible alternative such as
local_participant_identity.map_or(true, |identity| participant.identity !=
identity) inside event_from_participant_info so extract_track_info and the
participant filtering work on older compilers.

tokio = { version = "1", default-features = false, features = ["sync", "macros", "fs"] }
parking_lot = { version = "0.12" }
futures-util = { version = "0.3", default-features = false, features = ["sink"] }
thiserror = "1.0"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check thiserror versions across the workspace
rg -n 'thiserror' --type=toml -g 'Cargo.toml'

Repository: livekit/rust-sdks

Length of output: 525


Align thiserror versions across workspace: 1.0 vs 2.0.17 mismatch.

The workspace uses thiserror = "1.0" as the default (root Cargo.toml and most crates via workspace = true), but livekit-datatrack explicitly specifies thiserror = "2.0.17". This creates two different versions in the dependency graph, risking incompatible error types if errors cross crate boundaries. Notably, livekit-datatrack includes a TODO comment acknowledging this: # TODO: upgrade workspace version to 2.x.x. Upgrade the workspace version to 2.x.x and update all crates accordingly.

🤖 Prompt for AI Agents
In `@livekit/Cargo.toml` at line 40, The workspace is using thiserror = "1.0"
while livekit-datatrack pins thiserror = "2.0.17", causing duplicate versions;
update the root/workspace Cargo.toml's thiserror version to a 2.x range (e.g.,
"2.0") and then update each crate that currently inherits or specifies thiserror
(including the livekit crate entries and the livekit-datatrack Cargo.toml) to
the same 2.x version so all crates use a single thiserror v2 line; ensure
removal of any older explicit "1.0" entries and run cargo update to refresh
Cargo.lock so error types are unified across the workspace.

Comment on lines +19 to +50
/// Wrapper around [`E2eeManager`] implementing [`dt::EncryptionProvider`].
#[derive(Debug)]
pub(crate) struct DataTrackEncryptionProvider {
manager: E2eeManager,
sender_identity: ParticipantIdentity,
}

impl DataTrackEncryptionProvider {
pub fn new(manager: E2eeManager, sender_identity: ParticipantIdentity) -> Self {
Self { manager, sender_identity }
}
}

impl dt::EncryptionProvider for DataTrackEncryptionProvider {
fn encrypt(&self, payload: bytes::Bytes) -> Result<dt::EncryptedPayload, dt::EncryptionError> {
let key_index = self
.manager
.key_provider()
.map_or(0, |kp| kp.get_latest_key_index() as u32);

let encrypted = self
.manager
.encrypt_data(payload.into(), &self.sender_identity, key_index)
.map_err(|_| dt::EncryptionError)?;

let payload = encrypted.data.into();
let iv = encrypted.iv.try_into().map_err(|_| dt::EncryptionError)?;
let key_index = encrypted.key_index.try_into().map_err(|_| dt::EncryptionError)?;

Ok(dt::EncryptedPayload { payload, iv, key_index })
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Encryption provider implementation looks correct.

The provider properly retrieves the latest key index and delegates to E2eeManager.encrypt_data. The error handling maps all errors to dt::EncryptionError.

Note: The pipeline reports a formatting issue at line 31. Run cargo fmt to fix.

🔧 Run cargo fmt to fix formatting
cargo fmt
🧰 Tools
🪛 GitHub Actions: Rust Formatting

[error] 31-31: cargo fmt -- --check detected a formatting difference in data_track.rs. Run 'cargo fmt' to fix code style issues.

🤖 Prompt for AI Agents
In `@livekit/src/room/e2ee/data_track.rs` around lines 19 - 50, The formatting in
DataTrackEncryptionProvider::encrypt (and surrounding code) doesn't match
rustfmt; run `cargo fmt` to reformat the file so the spacing/indentation around
the key_index and encrypted variable assignments is fixed; ensure
DataTrackEncryptionProvider, its new constructor, and the impl
dt::EncryptionProvider::encrypt (which calls E2eeManager::key_provider and
E2eeManager::encrypt_data) are left unchanged functionally and only reformatted.

Comment on lines +272 to 277
pub(crate) fn encrypt_data(
&self,
data: &[u8],
participant_identity: &str,
key_index: u32,
data: Vec<u8>,
participant_identity: &ParticipantIdentity,
key_index: u32
) -> Result<EncryptedPacket, Box<dyn std::error::Error>> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix formatting to pass CI.

The pipeline reports a missing trailing comma after the key_index parameter.

🔧 Proposed fix
     pub(crate) fn encrypt_data(
         &self,
         data: Vec<u8>,
         participant_identity: &ParticipantIdentity,
-        key_index: u32
+        key_index: u32,
     ) -> Result<EncryptedPacket, Box<dyn std::error::Error>> {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub(crate) fn encrypt_data(
&self,
data: &[u8],
participant_identity: &str,
key_index: u32,
data: Vec<u8>,
participant_identity: &ParticipantIdentity,
key_index: u32
) -> Result<EncryptedPacket, Box<dyn std::error::Error>> {
pub(crate) fn encrypt_data(
&self,
data: Vec<u8>,
participant_identity: &ParticipantIdentity,
key_index: u32,
) -> Result<EncryptedPacket, Box<dyn std::error::Error>> {
🧰 Tools
🪛 GitHub Actions: Rust Formatting

[error] 273-273: cargo fmt -- --check detected a formatting difference in manager.rs (added trailing comma after key_index parameter). Run 'cargo fmt' to fix code style issues.

🤖 Prompt for AI Agents
In `@livekit/src/room/e2ee/manager.rs` around lines 272 - 277, The function
signature for encrypt_data is missing a trailing comma after the key_index
parameter; update the signature in the encrypt_data method (the parameters:
data: Vec<u8>, participant_identity: &ParticipantIdentity, key_index: u32) to
include a trailing comma after key_index to satisfy formatting/CI requirements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@livekit/src/rtc_engine/dc_sender.rs`:
- Around line 108-112: The log message has a grammatical typo: update the string
in the block that checks self.send_queue (where unsent_bytes is computed) to use
"byte(s)" instead of "bytes(s)"; i.e., modify the log::info! call that currently
prints "{} bytes(s) remain in queue" to print "{} byte(s) remain in queue" so
send_queue/unsent_bytes logging is correct.
- Around line 116-127: The buffered_amount is increased before calling dc.send()
in send_until_threshold, which leaves it inflated on send failures; change the
logic so buffered_amount is only incremented after a successful send or, more
minimally, decrement buffered_amount when dc.send() returns Err: update
send_until_threshold to attempt send first, and on Ok increment
self.buffered_amount by payload.len() as u64 (or if keeping the current order,
subtract payload.len() in the inspect_err closure), referencing the
send_until_threshold method, self.buffered_amount, self.send_queue, and dc.send
to locate and fix the update so BytesSent callbacks and error paths keep
buffered_amount consistent.
🧹 Nitpick comments (2)
livekit/src/rtc_engine/dc_sender.rs (1)

71-72: Consider making the send channel capacity configurable.

The channel capacity is hardcoded to 128. If different use cases require different buffer depths, this could be exposed via DataChannelSenderOptions.

livekit/src/rtc_engine/rtc_session.rs (1)

670-680: Consider handling or logging unmatched output events.

The _ => {} match arm at Line 678 silently ignores any OutputEvent variants that aren't SfuUpdateSubscription. If new variants are added to OutputEvent in the future, they would be silently ignored, potentially masking bugs.

♻️ Option 1: Make the match exhaustive
         match event.into() {
             OutputEvent::SfuUpdateSubscription(event) => {
                 self.signal_client()
                     .send(proto::signal_request::Message::UpdateDataSubscription(event.into()))
                     .await
             }
-            _ => {}
+            // Explicitly list other variants when added
         }
♻️ Option 2: Log unhandled variants
-            _ => {}
+            other => {
+                log::debug!("Unhandled remote data track output event: {:?}", other);
+            }
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 861755c and f612712.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (4)
  • Cargo.toml
  • livekit/src/rtc_engine/dc_sender.rs
  • livekit/src/rtc_engine/mod.rs
  • livekit/src/rtc_engine/rtc_session.rs
🚧 Files skipped from review as they are similar to previous changes (2)
  • Cargo.toml
  • livekit/src/rtc_engine/mod.rs
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: ladvoc
Repo: livekit/rust-sdks PR: 862
File: livekit-datatrack/src/local/manager.rs:298-311
Timestamp: 2026-02-03T01:23:11.346Z
Learning: In the livekit-datatrack crate, data tracks do not guarantee reliable delivery in the initial version. The `process_and_send` method in `livekit-datatrack/src/local/manager.rs` intentionally uses `try_send` to drop packets on backpressure rather than propagating it, and this is expected behavior.
📚 Learning: 2026-02-03T01:23:11.346Z
Learnt from: ladvoc
Repo: livekit/rust-sdks PR: 862
File: livekit-datatrack/src/local/manager.rs:298-311
Timestamp: 2026-02-03T01:23:11.346Z
Learning: In the livekit-datatrack crate, data tracks do not guarantee reliable delivery in the initial version. The `process_and_send` method in `livekit-datatrack/src/local/manager.rs` intentionally uses `try_send` to drop packets on backpressure rather than propagating it, and this is expected behavior.

Applied to files:

  • livekit/src/rtc_engine/dc_sender.rs
  • livekit/src/rtc_engine/rtc_session.rs
🧬 Code graph analysis (1)
livekit/src/rtc_engine/rtc_session.rs (5)
livekit/src/rtc_engine/dc_sender.rs (1)
  • new (71-86)
livekit-datatrack/src/remote/proto.rs (2)
  • event_from_join (47-51)
  • event_from_participant_update (58-64)
livekit-datatrack/src/local/proto.rs (1)
  • publish_result_from_request_response (76-92)
livekit/src/room/e2ee/manager.rs (1)
  • encryption_type (199-202)
livekit/src/rtc_engine/rtc_events.rs (1)
  • on_message (157-165)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
  • GitHub Check: Build (aarch64-linux-android)
  • GitHub Check: Build (aarch64-unknown-linux-gnu)
  • GitHub Check: Build (x86_64-pc-windows-msvc)
  • GitHub Check: Build (aarch64-pc-windows-msvc)
  • GitHub Check: Build (aarch64-apple-ios)
  • GitHub Check: Build (x86_64-apple-darwin)
  • GitHub Check: Build (aarch64-apple-ios-sim)
  • GitHub Check: Build (armv7-linux-androideabi)
  • GitHub Check: Build (x86_64-unknown-linux-gnu)
  • GitHub Check: Build (aarch64-apple-darwin)
  • GitHub Check: Build (x86_64-linux-android)
  • GitHub Check: Test (x86_64-unknown-linux-gnu)
  • GitHub Check: Test (x86_64-apple-darwin)
  • GitHub Check: Test (x86_64-pc-windows-msvc)
🔇 Additional comments (15)
livekit/src/rtc_engine/dc_sender.rs (3)

20-62: LGTM!

The struct design with internal buffered_amount tracking and send_queue is appropriate for implementing backpressure on data channel transmission. The documentation clearly explains the purpose and suggests future refactoring opportunities.


129-142: LGTM!

Good defensive coding in handle_bytes_sent to handle the edge case where bytes_sent exceeds buffered_amount, preventing underflow.


144-169: LGTM!

Good use of WeakUnboundedSender to prevent the callback from keeping the channel alive. The design is extensible for adding more event types in the future.

livekit/src/rtc_engine/rtc_session.rs (12)

26-72: LGTM!

The new imports and DATA_TRACK_DC_LABEL constant follow the existing patterns established by LOSSY_DC_LABEL and RELIABLE_DC_LABEL.


208-209: LGTM!

New session event variants cleanly integrate the data track input events from the livekit_datatrack crate.


357-377: LGTM!

The new data track transport fields follow the existing patterns for lossy and reliable data channels.


538-548: LGTM!

The dt_sender_task lifecycle is properly managed—spawned on connect and awaited on close, consistent with the other session tasks.


449-451: LGTM!

Properly extracts initial remote data track publications from the join response and emits them as session events.


474-498: LGTM!

Good refactoring to extract lossy_options for reuse. The data track transport using the same unreliable configuration aligns with the design where data tracks do not guarantee reliable delivery. Based on learnings: "data tracks do not guarantee reliable delivery in the initial version."


682-694: LGTM!

The use of try_send with break-on-error aligns with the intentional design where data tracks drop packets on backpressure rather than propagating it. Based on learnings: "The process_and_send method... intentionally uses try_send to drop packets on backpressure rather than propagating it, and this is expected behavior."


1057-1118: LGTM!

The signal event handling correctly routes data track-related responses to the appropriate session events. The early return at Line 1100 properly prevents RequestResponse messages for data track publishes from being processed by the generic pending_requests handler.


1163-1177: LGTM!

Good refactor to use a match expression for data channel routing. The handle_remote_dt_packets callback is correctly registered before storing the data channel reference.


1348-1372: LGTM!

The encryption type is now correctly extracted from the encrypted packet and propagated through the decryption path to the emitted events.


1869-1925: LGTM!

Good refactoring to extract the common publisher connection logic into ensure_publisher_connected_with_dc, reducing duplication between the standard data channel path and the data track transport path.


1963-1975: LGTM with a note on the TODO.

The handle_remote_dt_packets function correctly uses WeakUnboundedSender to avoid preventing channel close. The TODO at Line 1970 about avoiding the clone is a valid future optimization—buffer.data could potentially be converted to Bytes more efficiently depending on the underlying type.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

Comment on lines +108 to +112
if !self.send_queue.is_empty() {
let unsent_bytes: usize =
self.send_queue.into_iter().map(|payload| payload.len()).sum();
log::info!("{} bytes(s) remain in queue", unsent_bytes);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Minor typo in log message.

Line 111 has "bytes(s)" which should be "byte(s)" for grammatical correctness.

📝 Proposed fix
-            log::info!("{} bytes(s) remain in queue", unsent_bytes);
+            log::info!("{} byte(s) remain in queue", unsent_bytes);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if !self.send_queue.is_empty() {
let unsent_bytes: usize =
self.send_queue.into_iter().map(|payload| payload.len()).sum();
log::info!("{} bytes(s) remain in queue", unsent_bytes);
}
if !self.send_queue.is_empty() {
let unsent_bytes: usize =
self.send_queue.into_iter().map(|payload| payload.len()).sum();
log::info!("{} byte(s) remain in queue", unsent_bytes);
}
🤖 Prompt for AI Agents
In `@livekit/src/rtc_engine/dc_sender.rs` around lines 108 - 112, The log message
has a grammatical typo: update the string in the block that checks
self.send_queue (where unsent_bytes is computed) to use "byte(s)" instead of
"bytes(s)"; i.e., modify the log::info! call that currently prints "{} bytes(s)
remain in queue" to print "{} byte(s) remain in queue" so
send_queue/unsent_bytes logging is correct.

Comment on lines +116 to +127
fn send_until_threshold(&mut self) {
while self.buffered_amount <= self.low_buffer_threshold {
let Some(payload) = self.send_queue.pop_front() else {
break;
};
self.buffered_amount += payload.len() as u64;
_ = self
.dc
.send(&payload, true)
.inspect_err(|err| log::error!("Failed to send data: {}", err));
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Potential buffered_amount inconsistency on send failure.

buffered_amount is incremented before calling dc.send() (Line 121). If send() fails (Line 124), the bytes weren't actually buffered by the data channel, but buffered_amount remains inflated. Since BytesSent callbacks only fire for successful sends, this could cause the sender to prematurely stop sending due to perceived backpressure.

Consider either:

  1. Incrementing buffered_amount only after a successful send, or
  2. Decrementing on failure.
🛡️ Proposed fix to decrement on failure
 fn send_until_threshold(&mut self) {
     while self.buffered_amount <= self.low_buffer_threshold {
         let Some(payload) = self.send_queue.pop_front() else {
             break;
         };
-        self.buffered_amount += payload.len() as u64;
-        _ = self
+        let payload_len = payload.len() as u64;
+        self.buffered_amount += payload_len;
+        if self
             .dc
             .send(&payload, true)
-            .inspect_err(|err| log::error!("Failed to send data: {}", err));
+            .inspect_err(|err| log::error!("Failed to send data: {}", err))
+            .is_err()
+        {
+            self.buffered_amount -= payload_len;
+        }
     }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fn send_until_threshold(&mut self) {
while self.buffered_amount <= self.low_buffer_threshold {
let Some(payload) = self.send_queue.pop_front() else {
break;
};
self.buffered_amount += payload.len() as u64;
_ = self
.dc
.send(&payload, true)
.inspect_err(|err| log::error!("Failed to send data: {}", err));
}
}
fn send_until_threshold(&mut self) {
while self.buffered_amount <= self.low_buffer_threshold {
let Some(payload) = self.send_queue.pop_front() else {
break;
};
let payload_len = payload.len() as u64;
self.buffered_amount += payload_len;
if self
.dc
.send(&payload, true)
.inspect_err(|err| log::error!("Failed to send data: {}", err))
.is_err()
{
self.buffered_amount -= payload_len;
}
}
}
🤖 Prompt for AI Agents
In `@livekit/src/rtc_engine/dc_sender.rs` around lines 116 - 127, The
buffered_amount is increased before calling dc.send() in send_until_threshold,
which leaves it inflated on send failures; change the logic so buffered_amount
is only incremented after a successful send or, more minimally, decrement
buffered_amount when dc.send() returns Err: update send_until_threshold to
attempt send first, and on Ok increment self.buffered_amount by payload.len() as
u64 (or if keeping the current order, subtract payload.len() in the inspect_err
closure), referencing the send_until_threshold method, self.buffered_amount,
self.send_queue, and dc.send to locate and fix the update so BytesSent callbacks
and error paths keep buffered_amount consistent.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant