Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/builds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ jobs:
sudo apt-get update
sudo apt-get install -y cmake ninja-build build-essential \
protobuf-compiler libprotobuf-dev libabsl-dev \
libx11-dev libxext-dev libgl1-mesa-dev libssl-dev
libx11-dev libxext-dev libgl1-mesa-dev libssl-dev \
libxext-dev libxcomposite-dev libxdamage-dev libxfixes-dev \
libxrandr-dev libxi-dev libxkbcommon-dev \
libdrm-dev libgbm-dev libva-dev \
libasound2-dev libpulse-dev
protoc --version
pkg-config --modversion protobuf
# Fail if versions don't match (best-effort)
Expand Down
20 changes: 16 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ set(FFI_PROTO_FILES
${FFI_PROTO_DIR}/track.proto
${FFI_PROTO_DIR}/video_frame.proto
${FFI_PROTO_DIR}/audio_frame.proto
${FFI_PROTO_DIR}/e2ee.proto
${FFI_PROTO_DIR}/stats.proto
${FFI_PROTO_DIR}/data_stream.proto
${FFI_PROTO_DIR}/rpc.proto
${FFI_PROTO_DIR}/track_publication.proto
)
set(PROTO_BINARY_DIR ${CMAKE_BINARY_DIR}/generated)
file(MAKE_DIRECTORY ${PROTO_BINARY_DIR})
Expand All @@ -25,11 +30,11 @@ find_package(absl CONFIG REQUIRED)

# Object library that owns generated .pb.cc/.pb.h
add_library(livekit_proto OBJECT ${FFI_PROTO_FILES})
target_include_directories(livekit_proto PUBLIC
target_include_directories(livekit_proto PRIVATE
"$<BUILD_INTERFACE:${PROTO_BINARY_DIR}>"
${Protobuf_INCLUDE_DIRS}
)
target_link_libraries(livekit_proto PUBLIC protobuf::libprotobuf)
target_link_libraries(livekit_proto PRIVATE protobuf::libprotobuf)

# Generate .pb sources into ${PROTO_BINARY_DIR} and attach to livekit_proto
protobuf_generate(
Expand Down Expand Up @@ -129,6 +134,11 @@ add_dependencies(livekit build_rust_ffi)
########### Platform specific settings ###########

if(APPLE)
# (12.3 is when ScreenCaptureKit was introduced)
if(NOT DEFINED CMAKE_OSX_DEPLOYMENT_TARGET OR CMAKE_OSX_DEPLOYMENT_TARGET VERSION_LESS "12.3")
set(CMAKE_OSX_DEPLOYMENT_TARGET "12.3" CACHE STRING "Minimum macOS version" FORCE)
endif()

find_library(FW_COREAUDIO CoreAudio REQUIRED)
find_library(FW_AUDIOTOOLBOX AudioToolbox REQUIRED)
find_library(FW_COREFOUNDATION CoreFoundation REQUIRED)
Expand All @@ -142,9 +152,10 @@ if(APPLE)
find_library(FW_APPKIT AppKit REQUIRED)
find_library(FW_QUARTZCORE QuartzCore REQUIRED)
find_library(FW_OPENGL OpenGL REQUIRED)
find_library(FW_IOSURFACE IOSurface REQUIRED)
find_library(FW_METAL Metal REQUIRED)
find_library(FW_IOSURFACE IOSurface REQUIRED)
find_library(FW_METAL Metal REQUIRED)
find_library(FW_METALKIT MetalKit REQUIRED)
find_library(FW_SCREENCAPTUREKIT ScreenCaptureKit REQUIRED)

target_link_libraries(livekit PUBLIC
${FW_COREAUDIO}
Expand All @@ -163,6 +174,7 @@ if(APPLE)
${FW_IOSURFACE}
${FW_METAL}
${FW_METALKIT}
${FW_SCREENCAPTUREKIT}
)

# Ensure Objective-C categories/classes in static archives are loaded (WebRTC needs this)
Expand Down
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,35 @@ brew install cmake protobuf rust
sudo apt update
sudo apt install -y cmake protobuf-compiler build-essential
curl https://sh.rustup.rs -sSf | sh
```

## 🛠️ Development Tips
### Update Rust version
```bash
git fetch origin
git switch -c try-rust-main origin/main

# Sync submodule URLs and check out what origin/main pins (recursively):
git submodule sync --recursive
git submodule update --init --recursive --checkout

# Now, in case the nested submodule under yuv-sys didn’t materialize, force it explicitly:
git -C client-sdk-rust/yuv-sys submodule sync --recursive
git -C client-sdk-rust/yuv-sys submodule update --init --recursive --checkout

# Sanity check:
git submodule status --recursive
```

### If yuv-sys fails to build
```bash
cargo clean -p yuv-sys
cargo build -p yuv-sys -vv
```

### Full clean (Rust + C++ build folders)

In some cases, you may need to perform a full clean that deletes all build artifacts from both the Rust and C++ folders:
```bash
./build.sh clean-all
```
2 changes: 1 addition & 1 deletion client-sdk-rust
13 changes: 10 additions & 3 deletions include/livekit/ffi_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,22 @@

namespace livekit
{
using FfiCallbackFn = void(*)(const uint8_t*, size_t);
extern "C" void livekit_ffi_initialize(FfiCallbackFn cb,
bool capture_logs,
const char* sdk,
const char* sdk_version);

extern "C" void LivekitFfiCallback(const uint8_t *buf, size_t len);


// The FfiClient is used to communicate with the FFI interface of the Rust SDK
// We use the generated protocol messages to facilitate the communication
class FfiClient
{
public:
using ListenerId = int;
using Listener = std::function<void(const FFIEvent&)>;
using Listener = std::function<void(const proto::FfiEvent&)>;

FfiClient(const FfiClient&) = delete;
FfiClient& operator=(const FfiClient&) = delete;
Expand All @@ -48,7 +55,7 @@ namespace livekit
ListenerId AddListener(const Listener& listener);
void RemoveListener(ListenerId id);

FFIResponse SendRequest(const FFIRequest& request)const;
proto::FfiResponse SendRequest(const proto::FfiRequest& request)const;

private:
std::unordered_map<ListenerId, Listener> listeners_;
Expand All @@ -58,7 +65,7 @@ namespace livekit
FfiClient();
~FfiClient() = default;

void PushEvent(const FFIEvent& event) const;
void PushEvent(const proto::FfiEvent& event) const;
friend void LivekitFfiCallback(const uint8_t *buf, size_t len);
};

Expand Down
4 changes: 3 additions & 1 deletion include/livekit/room.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ namespace livekit
void Connect(const std::string& url, const std::string& token);

private:
void OnConnect(const proto::ConnectCallback& cb);

mutable std::mutex lock_;
FfiHandle handle_{INVALID_HANDLE};
bool connected_{false};
uint64_t connectAsyncId_{0};


void OnEvent(const FFIEvent& event);
void OnEvent(const proto::FfiEvent& event);
};
}

Expand Down
59 changes: 30 additions & 29 deletions src/ffi_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@ namespace livekit
{

FfiClient::FfiClient() {
InitializeRequest *initRequest = new InitializeRequest;
initRequest->set_event_callback_ptr(reinterpret_cast<uint64_t>(&LivekitFfiCallback));

FFIRequest request{};
request.set_allocated_initialize(initRequest);
SendRequest(request);
livekit_ffi_initialize(&LivekitFfiCallback,
true,
"cpp",
"0.0.0-dev");
}

FfiClient::ListenerId FfiClient::AddListener(const FfiClient::Listener& listener) {
Expand All @@ -44,33 +42,36 @@ void FfiClient::RemoveListener(ListenerId id) {
listeners_.erase(id);
}

FFIResponse FfiClient::SendRequest(const FFIRequest &request) const {
size_t len = request.ByteSizeLong();
uint8_t *buf = new uint8_t[len];
assert(request.SerializeToArray(buf, len));

const uint8_t **res_ptr = new const uint8_t*;
size_t *res_len = new size_t;

FfiHandleId handle = livekit_ffi_request(buf, len, res_ptr, res_len);

delete[] buf;
if (handle == INVALID_HANDLE) {
delete res_ptr;
delete res_len;
proto::FfiResponse FfiClient::SendRequest(const proto::FfiRequest &request) const {
std::string bytes;
if (!request.SerializeToString(&bytes) || bytes.empty()) {
throw std::runtime_error("failed to serialize FfiRequest");
}
const uint8_t* resp_ptr = nullptr;
size_t resp_len = 0;
FfiHandleId handle = livekit_ffi_request(
reinterpret_cast<const uint8_t*>(bytes.data()),
bytes.size(), &resp_ptr, &resp_len);
std::cout << "receive a handle " << handle << std::endl;

if (handle == INVALID_HANDLE) {
throw std::runtime_error("failed to send request, received an invalid handle");
}
FfiHandle _handle(handle);

FFIResponse response;
assert(response.ParseFromArray(*res_ptr, *res_len));
delete res_ptr;
delete res_len;
// Ensure we drop the handle exactly once on all paths
FfiHandle handle_guard(static_cast<uintptr_t>(handle));
if (!resp_ptr || resp_len == 0) {
throw std::runtime_error("FFI returned empty response bytes");
}

proto::FfiResponse response;
if (!response.ParseFromArray(resp_ptr, static_cast<int>(resp_len))) {
throw std::runtime_error("failed to parse FfiResponse");
}
return response;
}

void FfiClient::PushEvent(const FFIEvent &event) const {
void FfiClient::PushEvent(const proto::FfiEvent &event) const {
// Dispatch the events to the internal listeners
std::lock_guard<std::mutex> guard(lock_);
for (auto& [_, listener] : listeners_) {
Expand All @@ -79,8 +80,8 @@ void FfiClient::PushEvent(const FFIEvent &event) const {
}

void LivekitFfiCallback(const uint8_t *buf, size_t len) {
FFIEvent event;
assert(event.ParseFromArray(buf, len));
proto::FfiEvent event;
event.ParseFromArray(buf, len);

FfiClient::getInstance().PushEvent(event);
}
Expand All @@ -91,7 +92,7 @@ FfiHandle::FfiHandle(uintptr_t id) : handle(id) {}

FfiHandle::~FfiHandle() {
if (handle != INVALID_HANDLE) {
assert(livekit_ffi_drop_handle(handle));
livekit_ffi_drop_handle(handle);
}
}

Expand Down
104 changes: 64 additions & 40 deletions src/room.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,59 +25,83 @@
namespace livekit
{

void Room::Connect(const std::string& url, const std::string& token)
{
std::lock_guard<std::mutex> guard(lock_);
if (connected_) {
throw std::runtime_error("already connected");
}
using proto::FfiRequest;
using proto::FfiResponse;
using proto::ConnectRequest;
using proto::RoomOptions;
using proto::ConnectCallback;
using proto::FfiEvent;

connected_ = true;
void Room::Connect(const std::string& url, const std::string& token) {
// Register listener first (outside Room lock to avoid lock inversion)
auto listenerId = FfiClient::getInstance().AddListener(
std::bind(&Room::OnEvent, this, std::placeholders::_1));

RoomOptions *options = new RoomOptions;
options->set_auto_subscribe(true);

ConnectRequest *connectRequest = new ConnectRequest;
connectRequest->set_url(url);
connectRequest->set_token(token);
connectRequest->set_allocated_options(options);
// Build request without heap allocs
livekit::proto::FfiRequest req;
auto* connect = req.mutable_connect();
connect->set_url(url);
connect->set_token(token);
connect->mutable_options()->set_auto_subscribe(true);

FFIRequest request;
request.set_allocated_connect(connectRequest);

// TODO Free:
FfiClient::getInstance().AddListener(std::bind(&Room::OnEvent, this, std::placeholders::_1));

FFIResponse response = FfiClient::getInstance().SendRequest(request);
FFIAsyncId asyncId = response.connect().async_id();
// Mark “connecting” under lock, but DO NOT keep the lock across SendRequest
{
std::lock_guard<std::mutex> g(lock_);
if (connected_) {
FfiClient::getInstance().RemoveListener(listenerId);
throw std::runtime_error("already connected");
}
connectAsyncId_ = listenerId;
}

connectAsyncId_ = asyncId.id();
// Call into FFI with no Room lock held (avoid re-entrancy deadlock)
livekit::proto::FfiResponse resp = FfiClient::getInstance().SendRequest(req);
// Store async id under lock
{
std::lock_guard<std::mutex> g(lock_);
connectAsyncId_ = resp.connect().async_id();
}
}

void Room::OnEvent(const FFIEvent& event)
{
void Room::OnEvent(const FfiEvent& event) {
// TODO, it is not a good idea to lock all the callbacks, improve it.
std::lock_guard<std::mutex> guard(lock_);
if (!connected_) {
switch (event.message_case()) {
case FfiEvent::kConnect:
OnConnect(event.connect());
break;

// TODO: Handle other FfiEvent types here (e.g. room_event, track_event, etc.)
default:
break;
}
}

void Room::OnConnect(const ConnectCallback& cb) {
// Match the async_id with the pending connectAsyncId_
if (cb.async_id() != connectAsyncId_) {
return;
}

if (event.has_connect()) {
ConnectCallback connectCallback = event.connect();
if (connectCallback.async_id().id() != connectAsyncId_) {
return;
}

std::cout << "Received ConnectCallback" << std::endl;
std::cout << "Received ConnectCallback" << std::endl;

if (!connectCallback.has_error()) {
handle_ = FfiHandle(connectCallback.room().handle().id());
if (cb.message_case() == ConnectCallback::kError) {
std::cerr << "Failed to connect to room: " << cb.error() << std::endl;
connected_ = false;
return;
}

std::cout << "Connected to room" << std::endl;
std::cout << "Room SID: " << connectCallback.room().sid() << std::endl;
} else {
std::cerr << "Failed to connect to room: " << connectCallback.error() << std::endl;
}
// Success path
const auto& result = cb.result();
const auto& owned_room = result.room();
// OwnedRoom { FfiOwnedHandle handle = 1; RoomInfo info = 2; }
handle_ = FfiHandle(static_cast<uintptr_t>(owned_room.handle().id()));
if (owned_room.info().has_sid()) {
std::cout << "Room SID: " << owned_room.info().sid() << std::endl;
}

connected_ = true;
std::cout << "Connected to room" << std::endl;
}

}
Loading