diff --git a/CMakeLists.txt b/CMakeLists.txt index 07fcb34..6baabde 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -158,6 +158,7 @@ add_custom_command( add_library(livekit include/livekit/audio_frame.h include/livekit/audio_source.h + include/livekit/audio_stream.h include/livekit/room.h include/livekit/room_delegate.h include/livekit/ffi_handle.h @@ -166,6 +167,7 @@ add_library(livekit include/livekit/remote_audio_track.h include/livekit/participant.h include/livekit/local_participant.h + include/livekit/remote_participant.h include/livekit/livekit.h include/livekit/stats.h include/livekit/track.h @@ -174,10 +176,12 @@ add_library(livekit include/livekit/remote_track_publication.h include/livekit/video_frame.h include/livekit/video_source.h + include/livekit/video_stream.h include/livekit/local_video_track.h include/livekit/remote_video_track.h src/audio_frame.cpp src/audio_source.cpp + src/audio_stream.cpp src/ffi_handle.cpp src/ffi_client.cpp src/local_audio_track.cpp @@ -186,6 +190,7 @@ add_library(livekit src/room_proto_converter.cpp src/room_proto_converter.h src/local_participant.cpp + src/remote_participant.cpp src/stats.cpp src/track.cpp src/track_proto_converter.cpp @@ -195,6 +200,7 @@ add_library(livekit src/remote_track_publication.cpp src/video_frame.cpp src/video_source.cpp + src/video_stream.cpp src/local_video_track.cpp src/remote_video_track.cpp src/video_utils.cpp diff --git a/include/livekit/audio_stream.h b/include/livekit/audio_stream.h new file mode 100644 index 0000000..84aac8a --- /dev/null +++ b/include/livekit/audio_stream.h @@ -0,0 +1,107 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an “AS IS” BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "audio_frame.h" +#include "ffi_handle.h" +#include "participant.h" +#include "track.h" + +namespace livekit { + +namespace proto { +class FfiEvent; +} + +struct AudioFrameEvent { + AudioFrame frame; +}; + +class AudioStream { +public: + struct Options { + std::size_t capacity{0}; // 0 = unbounded + int sample_rate{48000}; + int num_channels{1}; + std::string noise_cancellation_module; // empty = disabled + std::string noise_cancellation_options_json; // empty = no options + }; + + // Factory: create an AudioStream bound to a specific Track + static std::unique_ptr + from_track(const std::shared_ptr &track, const Options &options); + + // Factory: create an AudioStream from a Participant + TrackSource + static std::unique_ptr from_participant(Participant &participant, + TrackSource track_source, + const Options &options); + + ~AudioStream(); + + AudioStream(const AudioStream &) = delete; + AudioStream &operator=(const AudioStream &) = delete; + AudioStream(AudioStream &&) noexcept; + AudioStream &operator=(AudioStream &&) noexcept; + + /// Blocking read: returns true if a frame was delivered, + /// false if the stream has ended (EOS or closed). + bool read(AudioFrameEvent &out_event); + + /// Signal that we are no longer interested in frames. + /// Disposes the underlying FFI stream and removes the listener. + void close(); + +private: + AudioStream() = default; + + void init_from_track(const std::shared_ptr &track, + const Options &options); + void init_from_participant(Participant &participant, TrackSource track_source, + const Options &options); + + // FFI event handler (registered with FfiClient) + void on_ffi_event(const proto::FfiEvent &event); + + // Queue helpers + void push_frame(AudioFrameEvent &&ev); + void push_eos(); + + mutable std::mutex mutex_; + std::condition_variable cv_; + std::deque queue_; + std::size_t capacity_{0}; + bool eof_{false}; + bool closed_{false}; + + Options options_; + + // Underlying FFI audio stream handle + FfiHandle stream_handle_; + + // Listener id registered on FfiClient + std::int64_t listener_id_{0}; +}; + +} // namespace livekit diff --git a/include/livekit/livekit.h b/include/livekit/livekit.h index 23d63c4..4b5fc30 100644 --- a/include/livekit/livekit.h +++ b/include/livekit/livekit.h @@ -21,6 +21,7 @@ #include "local_track_publication.h" #include "local_video_track.h" #include "participant.h" +#include "remote_participant.h" #include "room.h" #include "room_delegate.h" #include "track_publication.h" diff --git a/include/livekit/local_track_publication.h b/include/livekit/local_track_publication.h index b53e4f5..3e8c6ed 100644 --- a/include/livekit/local_track_publication.h +++ b/include/livekit/local_track_publication.h @@ -28,7 +28,8 @@ class Track; class LocalTrackPublication : public TrackPublication { public: - /// Construct from an OwnedTrackPublication proto. + /// Note, this RemoteTrackPublication is constructed internally only; + /// safe to accept proto::OwnedTrackPublication. explicit LocalTrackPublication(const proto::OwnedTrackPublication &owned); /// Typed accessor for the attached LocalTrack (if any). diff --git a/include/livekit/participant.h b/include/livekit/participant.h index 826263b..a0003fe 100644 --- a/include/livekit/participant.h +++ b/include/livekit/participant.h @@ -41,7 +41,7 @@ class Participant { metadata_(std::move(metadata)), attributes_(std::move(attributes)), kind_(kind), reason_(reason) {} - // Plain getters/setters (caller ensures threading) + // Plain getters (caller ensures threading) const std::string &sid() const noexcept { return sid_; } const std::string &name() const noexcept { return name_; } const std::string &identity() const noexcept { return identity_; } @@ -55,6 +55,24 @@ class Participant { uintptr_t ffiHandleId() const noexcept { return handle_.get(); } + // Setters (caller ensures threading) + void set_name(std::string name) noexcept { name_ = std::move(name); } + void set_metadata(std::string metadata) noexcept { + metadata_ = std::move(metadata); + } + void + set_attributes(std::unordered_map attrs) noexcept { + attributes_ = std::move(attrs); + } + void set_attribute(const std::string &key, const std::string &value) { + attributes_[key] = value; + } + void remove_attribute(const std::string &key) { attributes_.erase(key); } + void set_kind(ParticipantKind kind) noexcept { kind_ = kind; } + void set_disconnect_reason(DisconnectReason reason) noexcept { + reason_ = reason; + } + private: FfiHandle handle_; std::string sid_, name_, identity_, metadata_; diff --git a/include/livekit/remote_participant.h b/include/livekit/remote_participant.h new file mode 100644 index 0000000..d1c1579 --- /dev/null +++ b/include/livekit/remote_participant.h @@ -0,0 +1,60 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an “AS IS” BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "participant.h" + +#include +#include +#include + +namespace livekit { + +class RemoteTrackPublication; + +class RemoteParticipant : public Participant { +public: + using TrackPublicationMap = + std::unordered_map>; + + RemoteParticipant(FfiHandle handle, std::string sid, std::string name, + std::string identity, std::string metadata, + std::unordered_map attributes, + ParticipantKind kind, DisconnectReason reason); + + // A dictionary of track publications associated with the participant. + const TrackPublicationMap &track_publications() const noexcept { + return track_publications_; + } + + // Optional: non-const access if you want to mutate in-place. + TrackPublicationMap &mutable_track_publications() noexcept { + return track_publications_; + } + + // C++ equivalent of Python's __repr__ + std::string to_string() const; + +private: + TrackPublicationMap track_publications_; +}; + +// Convenience for logging / streaming +std::ostream &operator<<(std::ostream &os, + const RemoteParticipant &participant); + +} // namespace livekit diff --git a/include/livekit/remote_track_publication.h b/include/livekit/remote_track_publication.h index 9066058..aa39408 100644 --- a/include/livekit/remote_track_publication.h +++ b/include/livekit/remote_track_publication.h @@ -28,6 +28,8 @@ class Track; class RemoteTrackPublication : public TrackPublication { public: + /// Note, this RemoteTrackPublication is constructed internally only; + /// safe to accept proto::OwnedTrackPublication. explicit RemoteTrackPublication(const proto::OwnedTrackPublication &owned); /// Typed accessor for the attached RemoteTrack (if any). diff --git a/include/livekit/room.h b/include/livekit/room.h index 54f0d99..98077ed 100644 --- a/include/livekit/room.h +++ b/include/livekit/room.h @@ -32,6 +32,7 @@ class FfiEvent; } class LocalParticipant; +class RemoteParticipant; class Room { public: @@ -43,6 +44,7 @@ class Room { // Accessors RoomInfoData room_info() const; LocalParticipant *local_participant() const; + RemoteParticipant *remote_participant(const std::string &identity) const; private: mutable std::mutex lock_; @@ -51,6 +53,8 @@ class Room { RoomInfoData room_info_; std::shared_ptr room_handle_; std::unique_ptr local_participant_; + std::unordered_map> + remote_participants_; void OnEvent(const proto::FfiEvent &event); }; diff --git a/include/livekit/track_publication.h b/include/livekit/track_publication.h index 9536437..8503188 100644 --- a/include/livekit/track_publication.h +++ b/include/livekit/track_publication.h @@ -22,7 +22,7 @@ #include #include "livekit/ffi_handle.h" -#include "livekit/track.h" // TrackKind, TrackSource, AudioTrackFeature +#include "livekit/track.h" namespace livekit { diff --git a/include/livekit/video_frame.h b/include/livekit/video_frame.h index 3ba4f16..a43dc21 100644 --- a/include/livekit/video_frame.h +++ b/include/livekit/video_frame.h @@ -44,6 +44,10 @@ struct VideoPlaneInfo { std::uint32_t size; // plane size in bytes }; +namespace proto { +class OwnedVideoBuffer; +} + /** * Public SDK representation of a video frame. * @@ -63,16 +67,6 @@ class LKVideoFrame { LKVideoFrame(LKVideoFrame &&) noexcept = default; LKVideoFrame &operator=(LKVideoFrame &&) noexcept = default; - /* LKVideoFrame(LKVideoFrame&& other) noexcept - : width_(other.width_), - height_(other.height_), - type_(other.type_), - data_(std::move(other.data_)) { - other.width_ = 0; - other.height_ = 0; - } - LKVideoFrame& operator=(LKVideoFrame&& other) noexcept;*/ - /** * Allocate a new frame with the correct buffer size for the given format. * Data is zero-initialized. @@ -123,6 +117,12 @@ class LKVideoFrame { */ LKVideoFrame convert(VideoBufferType dst, bool flip_y = false) const; +protected: + friend class VideoStream; + // Only internal classes (e.g., VideoStream) + // should construct frames directly from FFI buffers. + static LKVideoFrame fromOwnedInfo(const proto::OwnedVideoBuffer &owned); + private: int width_; int height_; diff --git a/include/livekit/video_stream.h b/include/livekit/video_stream.h new file mode 100644 index 0000000..322ab1d --- /dev/null +++ b/include/livekit/video_stream.h @@ -0,0 +1,106 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an “AS IS” BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "ffi_handle.h" +#include "participant.h" +#include "track.h" +#include "video_frame.h" +#include "video_source.h" + +namespace livekit { + +// C++ equivalent of Python VideoFrameEvent +struct VideoFrameEvent { + LKVideoFrame frame; + std::int64_t timestamp_us; + VideoRotation rotation; +}; + +namespace proto { +class FfiEvent; +} + +class VideoStream { +public: + struct Options { + std::size_t capacity{0}; // 0 = unbounded + VideoBufferType format; + }; + + // Factory: create a VideoStream bound to a specific Track + static std::unique_ptr + fromTrack(const std::shared_ptr &track, const Options &options); + + // Factory: create a VideoStream from a Participant + TrackSource + static std::unique_ptr fromParticipant(Participant &participant, + TrackSource track_source, + const Options &options); + + ~VideoStream(); + + VideoStream(const VideoStream &) = delete; + VideoStream &operator=(const VideoStream &) = delete; + VideoStream(VideoStream &&) noexcept; + VideoStream &operator=(VideoStream &&) noexcept; + + /// Blocking read: returns true if a frame was delivered, + /// false if the stream has ended (EOS or closed). + bool read(VideoFrameEvent &out); + + /// Signal that we are no longer interested in frames. + /// Disposes the underlying FFI stream and drains internal listener. + void close(); + +private: + VideoStream() = default; + + // Internal init helpers, used by the factories + void initFromTrack(const std::shared_ptr &track, + const Options &options); + void initFromParticipant(Participant &participant, TrackSource source, + const Options &options); + + // FFI event handler (registered with FfiClient) + void onFfiEvent(const proto::FfiEvent &event); + + // Queue helpers + void pushFrame(VideoFrameEvent &&ev); + void pushEos(); + + mutable std::mutex mutex_; + std::condition_variable cv_; + std::deque queue_; + std::size_t capacity_{0}; + bool eof_{false}; + bool closed_{false}; + + // Underlying FFI handle for the video stream + FfiHandle stream_handle_; + + // Listener id registered on FfiClient + std::int64_t listener_id_{0}; +}; + +} // namespace livekit diff --git a/src/audio_stream.cpp b/src/audio_stream.cpp new file mode 100644 index 0000000..975908f --- /dev/null +++ b/src/audio_stream.cpp @@ -0,0 +1,254 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an “AS IS” BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "livekit/audio_stream.h" + +#include + +#include "audio_frame.pb.h" +#include "ffi.pb.h" +#include "livekit/ffi_client.h" +#include "livekit/track.h" + +namespace livekit { + +using proto::FfiEvent; +using proto::FfiRequest; + +// ------------------------ +// Factory helpers +// ------------------------ + +std::unique_ptr +AudioStream::from_track(const std::shared_ptr &track, + const Options &options) { + auto stream = std::unique_ptr(new AudioStream()); + stream->init_from_track(track, options); + return stream; +} + +std::unique_ptr +AudioStream::from_participant(Participant &participant, + TrackSource track_source, + const Options &options) { + auto stream = std::unique_ptr(new AudioStream()); + stream->init_from_participant(participant, track_source, options); + return stream; +} + +// ------------------------ +// Destructor / move +// ------------------------ + +AudioStream::~AudioStream() { close(); } + +AudioStream::AudioStream(AudioStream &&other) noexcept { + std::lock_guard lock(other.mutex_); + queue_ = std::move(other.queue_); + capacity_ = other.capacity_; + eof_ = other.eof_; + closed_ = other.closed_; + options_ = other.options_; + stream_handle_ = std::move(other.stream_handle_); + listener_id_ = other.listener_id_; + + other.listener_id_ = 0; + other.closed_ = true; +} + +AudioStream &AudioStream::operator=(AudioStream &&other) noexcept { + if (this == &other) { + return *this; + } + + close(); + + { + std::lock_guard lock_this(mutex_); + std::lock_guard lock_other(other.mutex_); + + queue_ = std::move(other.queue_); + capacity_ = other.capacity_; + eof_ = other.eof_; + closed_ = other.closed_; + options_ = other.options_; + stream_handle_ = std::move(other.stream_handle_); + listener_id_ = other.listener_id_; + + other.listener_id_ = 0; + other.closed_ = true; + } + + return *this; +} + +bool AudioStream::read(AudioFrameEvent &out_event) { + std::unique_lock lock(mutex_); + + cv_.wait(lock, [this] { return !queue_.empty() || eof_ || closed_; }); + + if (closed_ || (queue_.empty() && eof_)) { + return false; // EOS / closed + } + + out_event = std::move(queue_.front()); + queue_.pop_front(); + return true; +} + +void AudioStream::close() { + { + std::lock_guard lock(mutex_); + if (closed_) { + return; + } + closed_ = true; + } + + // Dispose FFI handle + if (stream_handle_.get() != 0) { + stream_handle_.reset(); + } + + // Remove listener + if (listener_id_ != 0) { + FfiClient::instance().RemoveListener(listener_id_); + listener_id_ = 0; + } + + // Wake any waiting readers + cv_.notify_all(); +} + +// Internal functions + +void AudioStream::init_from_track(const std::shared_ptr &track, + const Options &options) { + capacity_ = options.capacity; + options_ = options; + + // 1) Subscribe to FFI events + listener_id_ = FfiClient::instance().AddListener( + [this](const FfiEvent &e) { this->on_ffi_event(e); }); + + // 2) Send FfiRequest to create a new audio stream bound to this track + FfiRequest req; + auto *new_audio_stream = req.mutable_new_audio_stream(); + new_audio_stream->set_track_handle( + static_cast(track->ffi_handle_id())); + new_audio_stream->set_sample_rate(options_.sample_rate); + new_audio_stream->set_num_channels(options.num_channels); + new_audio_stream->set_type(proto::AudioStreamType::AUDIO_STREAM_NATIVE); + + if (!options_.noise_cancellation_module.empty()) { + new_audio_stream->set_audio_filter_module_id( + options_.noise_cancellation_module); + // Always set options JSON even if empty — backend will treat empty string + // as “no options” + new_audio_stream->set_audio_filter_options( + options_.noise_cancellation_options_json); + } + + auto resp = FfiClient::instance().sendRequest(req); + const auto &stream = resp.new_audio_stream().stream(); + stream_handle_ = FfiHandle(static_cast(stream.handle().id())); +} + +void AudioStream::init_from_participant(Participant &participant, + TrackSource track_source, + const Options &options) { + capacity_ = options.capacity; + options_ = options; + + // 1) Subscribe to FFI events + listener_id_ = FfiClient::instance().AddListener( + [this](const FfiEvent &e) { this->on_ffi_event(e); }); + + // 2) Send FfiRequest to create audio stream from participant + track source + FfiRequest req; + auto *as = req.mutable_audio_stream_from_participant(); + as->set_participant_handle(participant.ffiHandleId()); + as->set_sample_rate(options_.sample_rate); + as->set_num_channels(options_.num_channels); + as->set_type(proto::AudioStreamType::AUDIO_STREAM_NATIVE); + as->set_track_source(static_cast(track_source)); + + if (!options_.noise_cancellation_module.empty()) { + as->set_audio_filter_module_id(options_.noise_cancellation_module); + // Always set options JSON even if empty — backend will treat empty string + // as “no options” + as->set_audio_filter_options(options_.noise_cancellation_options_json); + } + + auto resp = FfiClient::instance().sendRequest(req); + const auto &stream = resp.audio_stream_from_participant().stream(); + stream_handle_ = FfiHandle(static_cast(stream.handle().id())); +} + +void AudioStream::on_ffi_event(const FfiEvent &event) { + if (event.message_case() != FfiEvent::kAudioStreamEvent) { + return; + } + + const auto &ase = event.audio_stream_event(); + // Check if this event is for our stream handle. + if (ase.stream_handle() != static_cast(stream_handle_.get())) { + return; + } + if (ase.has_frame_received()) { + const auto &fr = ase.frame_received(); + + // Convert owned buffer -> AudioFrame via helper. + // Implement AudioFrame::fromOwnedInfo(...) to mirror Python's + // AudioFrame._from_owned_info. + AudioFrame frame = AudioFrame::fromOwnedInfo(fr.frame()); + AudioFrameEvent ev{std::move(frame)}; + push_frame(std::move(ev)); + } else if (ase.has_eos()) { + push_eos(); + } +} + +void AudioStream::push_frame(AudioFrameEvent &&ev) { + { + std::lock_guard lock(mutex_); + + if (closed_ || eof_) { + return; + } + + if (capacity_ > 0 && queue_.size() >= capacity_) { + // Ring behavior: drop oldest frame when full. + queue_.pop_front(); + } + + queue_.push_back(std::move(ev)); + } + cv_.notify_one(); +} + +void AudioStream::push_eos() { + { + std::lock_guard lock(mutex_); + if (eof_) { + return; + } + eof_ = true; + } + cv_.notify_all(); +} + +} // namespace livekit diff --git a/src/remote_participant.cpp b/src/remote_participant.cpp new file mode 100644 index 0000000..38def8e --- /dev/null +++ b/src/remote_participant.cpp @@ -0,0 +1,48 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an “AS IS” BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "livekit/remote_participant.h" + +#include +#include +#include + +namespace livekit { + +RemoteParticipant::RemoteParticipant( + FfiHandle handle, std::string sid, std::string name, std::string identity, + std::string metadata, + std::unordered_map attributes, + ParticipantKind kind, DisconnectReason reason) + : Participant(std::move(handle), std::move(sid), std::move(name), + std::move(identity), std::move(metadata), + std::move(attributes), kind, reason), + track_publications_() {} + +std::string RemoteParticipant::to_string() const { + std::ostringstream oss; + oss << "rtc.RemoteParticipant(sid=" << sid() << ", identity=" << identity() + << ", name=" << name() << ")"; + return oss.str(); +} + +std::ostream &operator<<(std::ostream &os, + const RemoteParticipant &participant) { + os << participant.to_string(); + return os; +} + +} // namespace livekit diff --git a/src/room.cpp b/src/room.cpp index 4cfa3c0..9f56cdb 100644 --- a/src/room.cpp +++ b/src/room.cpp @@ -18,6 +18,9 @@ #include "livekit/ffi_client.h" #include "livekit/local_participant.h" +#include "livekit/local_track_publication.h" +#include "livekit/remote_participant.h" +#include "livekit/remote_track_publication.h" #include "livekit/room_delegate.h" #include "ffi.pb.h" @@ -36,6 +39,25 @@ using proto::FfiRequest; using proto::FfiResponse; using proto::RoomOptions; +namespace { + +std::unique_ptr +createRemoteParticipant(const proto::OwnedParticipant &owned) { + const auto &pinfo = owned.info(); + std::unordered_map attrs; + attrs.reserve(pinfo.attributes_size()); + for (const auto &kv : pinfo.attributes()) { + attrs.emplace(kv.first, kv.second); + } + auto kind = livekit::fromProto(pinfo.kind()); + auto reason = livekit::toDisconnectReason(pinfo.disconnect_reason()); + livekit::FfiHandle handle(static_cast(owned.handle().id())); + return std::make_unique( + std::move(handle), pinfo.sid(), pinfo.name(), pinfo.identity(), + pinfo.metadata(), std::move(attrs), kind, reason); +} + +} // namespace Room::Room() {} Room::~Room() {} @@ -87,10 +109,25 @@ bool Room::Connect(const std::string &url, const std::string &token) { std::move(participant_handle), pinfo.sid(), pinfo.name(), pinfo.identity(), pinfo.metadata(), std::move(attrs), kind, reason); } - // Setup remote particpants + // Setup remote participants { - // TODO, implement this remote participant feature + const auto &participants = connectCb.result().participants(); + std::lock_guard g(lock_); + for (const auto &pt : participants) { + const auto &owned = pt.participant(); + auto rp = createRemoteParticipant(owned); + // Add the initial remote participant tracks (like Python does) + for (const auto &owned_publication_info : pt.publications()) { + auto publication = + std::make_shared(owned_publication_info); + rp->mutable_track_publications().emplace(publication->sid(), + std::move(publication)); + } + + remote_participants_.emplace(rp->identity(), std::move(rp)); + } } + return true; } catch (const std::exception &e) { // On error, remove the listener and rethrow @@ -110,6 +147,12 @@ LocalParticipant *Room::local_participant() const { return local_participant_.get(); } +RemoteParticipant *Room::remote_participant(const std::string &identity) const { + std::lock_guard g(lock_); + auto it = remote_participants_.find(identity); + return it == remote_participants_.end() ? nullptr : it->second.get(); +} + void Room::OnEvent(const FfiEvent &event) { // Take a snapshot of the delegate under lock, but do NOT call it under the // lock. @@ -136,11 +179,37 @@ void Room::OnEvent(const FfiEvent &event) { switch (re.message_case()) { case proto::RoomEvent::kParticipantConnected: { auto ev = fromProto(re.participant_connected()); + std::cout << "kParticipantConnected " << std::endl; + // Create and register RemoteParticipant + { + std::lock_guard guard(lock_); + auto rp = createRemoteParticipant(re.participant_connected().info()); + remote_participants_.emplace(rp->identity(), std::move(rp)); + } + // TODO, use better public callback events delegate_snapshot->onParticipantConnected(*this, ev); + break; } case proto::RoomEvent::kParticipantDisconnected: { auto ev = fromProto(re.participant_disconnected()); + { + std::lock_guard guard(lock_); + const auto &pd = re.participant_disconnected(); + const std::string &identity = pd.participant_identity(); + auto it = remote_participants_.find(identity); + if (it != remote_participants_.end()) { + remote_participants_.erase(it); + } else { + // We saw a disconnect event for a participant we don't track + // internally. This can happen on races or if we never created a + // RemoteParticipant + std::cerr << "participant_disconnected for unknown identity: " + << identity << std::endl; + } + } + // TODO, should we trigger onParticipantDisconnected if remote + // participants can't be found ? delegate_snapshot->onParticipantDisconnected(*this, ev); break; } @@ -161,6 +230,28 @@ void Room::OnEvent(const FfiEvent &event) { } case proto::RoomEvent::kTrackPublished: { auto ev = fromProto(re.track_published()); + { + std::lock_guard guard(lock_); + const auto &tp = re.track_published(); + const std::string &identity = tp.participant_identity(); + auto it = remote_participants_.find(identity); + if (it != remote_participants_.end()) { + RemoteParticipant *rparticipant = it->second.get(); + const auto &owned_publication = tp.publication(); + auto rpublication = + std::make_shared(owned_publication); + // Store it on the participant, keyed by SID + rparticipant->mutable_track_publications().emplace( + rpublication->sid(), std::move(rpublication)); + + } else { + // Optional: log if we get a track for an unknown participant + std::cerr << "track_published for unknown participant: " << identity + << "\n"; + // Don't emit the + break; + } + } delegate_snapshot->onTrackPublished(*this, ev); break; } @@ -322,6 +413,46 @@ void Room::OnEvent(const FfiEvent &event) { } case proto::RoomEvent::kParticipantsUpdated: { auto ev = fromProto(re.participants_updated()); + { + std::lock_guard guard(lock_); + const auto &pu = re.participants_updated(); + for (const auto &info : pu.participants()) { + const std::string &identity = info.identity(); + Participant *participant = nullptr; + // First, check local participant. + if (local_participant_ && + identity == local_participant_->identity()) { + participant = local_participant_.get(); + } else { + // Otherwise, look for a remote participant. + auto it = remote_participants_.find(identity); + if (it != remote_participants_.end()) { + participant = it->second.get(); + } + } + + if (!participant) { + // Participant might not exist yet; ignore for now. + std::cerr << "Room::RoomEvent::kParticipantsUpdated participant " + "does not exist: " + << identity << std::endl; + continue; + } + + // Update basic fields + participant->set_name(info.name()); + participant->set_metadata(info.metadata()); + std::unordered_map attrs; + attrs.reserve(info.attributes_size()); + for (const auto &kv : info.attributes()) { + attrs.emplace(kv.first, kv.second); + } + participant->set_attributes(std::move(attrs)); + participant->set_kind(fromProto(info.kind())); + participant->set_disconnect_reason( + toDisconnectReason(info.disconnect_reason())); + } + } delegate_snapshot->onParticipantsUpdated(*this, ev); break; } diff --git a/src/room_proto_converter.cpp b/src/room_proto_converter.cpp index 1df125c..5423f07 100644 --- a/src/room_proto_converter.cpp +++ b/src/room_proto_converter.cpp @@ -215,12 +215,12 @@ DataStreamTrailerData fromProto(const proto::DataStream_Trailer &in) { // --------- event conversions --------- -ParticipantConnectedEvent -fromProto(const proto::ParticipantConnected & /*in*/) { +ParticipantConnectedEvent fromProto(const proto::ParticipantConnected &in) { ParticipantConnectedEvent ev; - // in.info() is OwnedParticipant; you can fill more fields once you inspect - // it. For now, leave metadata/name/identity as TODO. - // TODO: map in.info().info().identity(), name(), metadata(), etc. + const auto &pinfo = in.info().info(); + ev.identity = pinfo.identity(); + ev.name = pinfo.name(); + ev.metadata = pinfo.metadata(); return ev; } diff --git a/src/video_frame.cpp b/src/video_frame.cpp index badce04..a56e7f2 100644 --- a/src/video_frame.cpp +++ b/src/video_frame.cpp @@ -6,6 +6,7 @@ #include #include +#include "livekit/ffi_handle.h" #include "video_utils.h" namespace livekit { @@ -271,37 +272,7 @@ LKVideoFrame::LKVideoFrame(int width, int height, VideoBufferType type, throw std::invalid_argument("LKVideoFrame: provided data is too small for " "the specified format and size"); } - std::cout << "width_ is " << width_ << std::endl; - std::cout << "height_ is " << height_ << std::endl; } -/* -LKVideoFrame& LKVideoFrame::operator=(LKVideoFrame&& other) noexcept { - // 1. Self-assignment check - if (this == &other) { - return *this; - } - - // 2. Resource cleanup (The std::vector handles its own memory cleanup, - // but the simple members must be transferred.) - - // 3. Transfer simple members (width, height, type) - width_ = other.width_; - height_ = other.height_; - type_ = other.type_; - - // 4. Transfer complex resource (std::vector) - // std::move() is used to invoke std::vector's move assignment operator - data_ = std::move(other.data_); - - // 5. Optional: Reset the 'other' object to a valid but empty/default state. - // This is good practice for the object that has been moved-from. - other.width_ = 0; - other.height_ = 0; - // other.data_ is already empty after the move assignment - - // 6. Return reference to the assigned object - return *this; -}*/ LKVideoFrame LKVideoFrame::create(int width, int height, VideoBufferType type) { const std::size_t size = computeBufferSize(width, height, type); @@ -335,4 +306,58 @@ LKVideoFrame LKVideoFrame::convert(VideoBufferType dst, bool flip_y) const { return convertViaFfi(*this, dst, flip_y); } +LKVideoFrame LKVideoFrame::fromOwnedInfo(const proto::OwnedVideoBuffer &owned) { + const auto &info = owned.info(); + const int width = static_cast(info.width()); + const int height = static_cast(info.height()); + // Assuming your C++ enum matches proto's underlying values. + const VideoBufferType type = static_cast(info.type()); + + std::vector buffer; + + if (info.components_size() > 0) { + // Multi-plane (e.g. I420, NV12, etc.). We pack planes back-to-back. + std::size_t total_size = 0; + for (const auto &comp : info.components()) { + total_size += static_cast(comp.size()); + } + + buffer.resize(total_size); + std::size_t offset = 0; + for (const auto &comp : info.components()) { + const auto sz = static_cast(comp.size()); + const auto src_ptr = reinterpret_cast( + static_cast(comp.data_ptr())); + + std::memcpy(buffer.data() + offset, src_ptr, sz); + offset += sz; + } + } else { + // Packed format: treat top-level data_ptr as a single contiguous buffer. + const auto src_ptr = reinterpret_cast( + static_cast(info.data_ptr())); + + std::size_t total_size = 0; + if (info.has_stride()) { + // Use stride * height as total size (includes per-row padding if any). + total_size = static_cast(info.stride()) * + static_cast(height); + } else { + // Use our generic buffer-size helper (width/height/type). + total_size = computeBufferSize(width, height, type); + } + + buffer.resize(total_size); + std::memcpy(buffer.data(), src_ptr, total_size); + } + + // Release the FFI-owned buffer after copying the data. + { + FfiHandle owned_handle(static_cast(owned.handle().id())); + // owned_handle destroyed at end of scope → native buffer disposed. + } + + return LKVideoFrame(width, height, type, std::move(buffer)); +} + } // namespace livekit diff --git a/src/video_stream.cpp b/src/video_stream.cpp new file mode 100644 index 0000000..19b1903 --- /dev/null +++ b/src/video_stream.cpp @@ -0,0 +1,234 @@ +#include "livekit/video_stream.h" + +#include + +#include "ffi.pb.h" +#include "livekit/ffi_client.h" +#include "livekit/track.h" +#include "video_frame.pb.h" +#include "video_utils.h" + +namespace livekit { + +using proto::FfiEvent; +using proto::FfiRequest; +using proto::VideoStreamEvent; + +// ------------------------ +// Factory helpers +// ------------------------ + +std::unique_ptr +VideoStream::fromTrack(const std::shared_ptr &track, + const Options &options) { + auto stream = std::unique_ptr(new VideoStream()); + stream->initFromTrack(track, options); + return stream; +} + +std::unique_ptr +VideoStream::fromParticipant(Participant &participant, TrackSource track_source, + const Options &options) { + auto stream = std::unique_ptr(new VideoStream()); + stream->initFromParticipant(participant, track_source, options); + return stream; +} + +// ------------------------ +// Destructor / move +// ------------------------ + +VideoStream::~VideoStream() { close(); } + +VideoStream::VideoStream(VideoStream &&other) noexcept { + std::lock_guard lock(other.mutex_); + queue_ = std::move(other.queue_); + capacity_ = other.capacity_; + eof_ = other.eof_; + closed_ = other.closed_; + stream_handle_ = std::move(other.stream_handle_); + listener_id_ = other.listener_id_; + + other.listener_id_ = 0; + other.closed_ = true; +} + +VideoStream &VideoStream::operator=(VideoStream &&other) noexcept { + if (this == &other) + return *this; + + close(); + + { + std::lock_guard lock_this(mutex_); + std::lock_guard lock_other(other.mutex_); + + queue_ = std::move(other.queue_); + capacity_ = other.capacity_; + eof_ = other.eof_; + closed_ = other.closed_; + stream_handle_ = std::move(other.stream_handle_); + listener_id_ = other.listener_id_; + + other.listener_id_ = 0; + other.closed_ = true; + } + + return *this; +} + +// ------------------------ +// Init internals +// ------------------------ + +void VideoStream::initFromTrack(const std::shared_ptr &track, + const Options &options) { + capacity_ = options.capacity; + + // 1) Subscribe to FFI events + listener_id_ = FfiClient::instance().AddListener( + [this](const proto::FfiEvent &e) { this->onFfiEvent(e); }); + + // 2) Send FFI request to create a new video stream bound to this track + FfiRequest req; + auto *new_video_stream = req.mutable_new_video_stream(); + new_video_stream->set_track_handle(track->ffi_handle_id()); + new_video_stream->set_type(proto::VideoStreamType::VIDEO_STREAM_NATIVE); + new_video_stream->set_normalize_stride(true); + new_video_stream->set_format(toProto(options.format)); + + auto resp = FfiClient::instance().sendRequest(req); + // Adjust field names to match your proto exactly: + const auto &stream = resp.new_video_stream().stream(); + stream_handle_ = FfiHandle(static_cast(stream.handle().id())); + // stream.info() is available if you want to cache metadata. +} + +void VideoStream::initFromParticipant(Participant &participant, + TrackSource track_source, + const Options &options) { + capacity_ = options.capacity; + + // 1) Subscribe to FFI events + listener_id_ = FfiClient::instance().AddListener( + [this](const FfiEvent &e) { this->onFfiEvent(e); }); + + // 2) Send FFI request to create a video stream from participant + track + // source + FfiRequest req; + auto *vs = req.mutable_video_stream_from_participant(); + vs->set_participant_handle(participant.ffiHandleId()); + vs->set_type(proto::VideoStreamType::VIDEO_STREAM_NATIVE); + vs->set_track_source(static_cast(track_source)); + vs->set_normalize_stride(true); + vs->set_format(toProto(options.format)); + + auto resp = FfiClient::instance().sendRequest(req); + // Adjust field names to match your proto exactly: + const auto &stream = resp.video_stream_from_participant().stream(); + stream_handle_ = FfiHandle(static_cast(stream.handle().id())); +} + +// ------------------------ +// Public API +// ------------------------ + +bool VideoStream::read(VideoFrameEvent &out) { + std::unique_lock lock(mutex_); + + cv_.wait(lock, [this] { return !queue_.empty() || eof_ || closed_; }); + + if (closed_ || (queue_.empty() && eof_)) { + return false; // EOS / closed + } + + out = std::move(queue_.front()); + queue_.pop_front(); + return true; +} + +void VideoStream::close() { + { + std::lock_guard lock(mutex_); + if (closed_) { + return; + } + closed_ = true; + } + + // Dispose FFI handle + if (stream_handle_.get() != 0) { + stream_handle_.reset(); + } + + // Remove listener + if (listener_id_ != 0) { + FfiClient::instance().RemoveListener(listener_id_); + listener_id_ = 0; + } + + // Wake any waiting readers + cv_.notify_all(); +} + +// ------------------------ +// Internal helpers +// ------------------------ + +void VideoStream::onFfiEvent(const proto::FfiEvent &event) { + // Filter for video_stream_event first. + if (event.message_case() != FfiEvent::kVideoStreamEvent) { + return; + } + const auto &vse = event.video_stream_event(); + // Check if this event is for our stream handle. + if (vse.stream_handle() != static_cast(stream_handle_.get())) { + return; + } + // Handle frame_received or eos. + if (vse.has_frame_received()) { + const auto &fr = vse.frame_received(); + + // Convert owned buffer->VideoFrame via a helper. + // You should implement this static function in your VideoFrame class. + LKVideoFrame frame = LKVideoFrame::fromOwnedInfo(fr.buffer()); + + VideoFrameEvent ev{std::move(frame), fr.timestamp_us(), + static_cast(fr.rotation())}; + + pushFrame(std::move(ev)); + } else if (vse.has_eos()) { + pushEos(); + } +} + +void VideoStream::pushFrame(VideoFrameEvent &&ev) { + { + std::lock_guard lock(mutex_); + + if (closed_ || eof_) { + return; + } + + if (capacity_ > 0 && queue_.size() >= capacity_) { + // Ring behavior: drop oldest frame. + queue_.pop_front(); + } + + queue_.push_back(std::move(ev)); + } + cv_.notify_one(); +} + +void VideoStream::pushEos() { + { + std::lock_guard lock(mutex_); + if (eof_) { + return; + } + eof_ = true; + } + cv_.notify_all(); +} + +} // namespace livekit diff --git a/src/video_utils.cpp b/src/video_utils.cpp index be1470f..4f4c5c0 100644 --- a/src/video_utils.cpp +++ b/src/video_utils.cpp @@ -11,10 +11,7 @@ namespace livekit { -namespace { - -// Map SDK enum -> proto enum -proto::VideoBufferType toProtoBufferType(VideoBufferType t) { +proto::VideoBufferType toProto(VideoBufferType t) { switch (t) { case VideoBufferType::ARGB: return proto::VideoBufferType::ARGB; @@ -39,12 +36,12 @@ proto::VideoBufferType toProtoBufferType(VideoBufferType t) { case VideoBufferType::NV12: return proto::VideoBufferType::NV12; default: - throw std::runtime_error("Unknown VideoBufferType in toProtoBufferType"); + throw std::runtime_error("Unknown VideoBufferType in toProto"); } } // Map proto enum -> SDK enum -VideoBufferType fromProtoBufferType(proto::VideoBufferType t) { +VideoBufferType fromProto(proto::VideoBufferType t) { switch (t) { case proto::VideoBufferType::ARGB: return VideoBufferType::ARGB; @@ -69,13 +66,10 @@ VideoBufferType fromProtoBufferType(proto::VideoBufferType t) { case proto::VideoBufferType::NV12: return VideoBufferType::NV12; default: - throw std::runtime_error( - "Unknown proto::VideoBufferType in fromProtoBufferType"); + throw std::runtime_error("Unknown proto::VideoBufferType in fromProto"); } } -} // namespace - proto::VideoBufferInfo toProto(const LKVideoFrame &frame) { proto::VideoBufferInfo info; @@ -83,7 +77,7 @@ proto::VideoBufferInfo toProto(const LKVideoFrame &frame) { const int h = frame.height(); info.set_width(w); info.set_height(h); - info.set_type(toProtoBufferType(frame.type())); + info.set_type(toProto(frame.type())); // Backing data pointer for the whole buffer auto base_ptr = reinterpret_cast(frame.data()); @@ -123,7 +117,7 @@ LKVideoFrame fromOwnedProto(const proto::OwnedVideoBuffer &owned) { const int width = static_cast(info.width()); const int height = static_cast(info.height()); - const VideoBufferType type = fromProtoBufferType(info.type()); + const VideoBufferType type = fromProto(info.type()); // Allocate a new LKVideoFrame with the correct size/format LKVideoFrame frame = LKVideoFrame::create(width, height, type); @@ -154,7 +148,7 @@ LKVideoFrame convertViaFfi(const LKVideoFrame &frame, VideoBufferType dst, proto::FfiRequest req; auto *vc = req.mutable_video_convert(); vc->set_flip_y(flip_y); - vc->set_dst_type(toProtoBufferType(dst)); + vc->set_dst_type(toProto(dst)); vc->mutable_buffer()->CopyFrom(toProto(frame)); proto::FfiResponse resp = FfiClient::instance().sendRequest(req); diff --git a/src/video_utils.h b/src/video_utils.h index 5ece8f2..733e4a5 100644 --- a/src/video_utils.h +++ b/src/video_utils.h @@ -26,5 +26,7 @@ proto::VideoBufferInfo toProto(const LKVideoFrame &frame); LKVideoFrame fromOwnedProto(const proto::OwnedVideoBuffer &owned); LKVideoFrame convertViaFfi(const LKVideoFrame &frame, VideoBufferType dst, bool flip_y); +proto::VideoBufferType toProto(VideoBufferType t); +VideoBufferType fromProto(proto::VideoBufferType t); } // namespace livekit