From 4163172c36220bba665b7ab44f436be2837421b3 Mon Sep 17 00:00:00 2001 From: shijing xian Date: Fri, 5 Dec 2025 11:36:40 -0800 Subject: [PATCH 1/8] RPC features --- CMakeLists.txt | 2 + README.md | 37 +- examples/CMakeLists.txt | 21 + examples/simple_room/main.cpp | 12 +- examples/simple_rpc/main.cpp | 572 ++++++++++++++++++++++++++++ include/livekit/ffi_client.h | 5 + include/livekit/local_participant.h | 95 ++++- include/livekit/rpc_error.h | 123 ++++++ src/ffi_client.cpp | 40 ++ src/local_participant.cpp | 99 +++++ src/room.cpp | 31 +- src/room_proto_converter.cpp | 1 + src/room_proto_converter.h | 4 + src/rpc_error.cpp | 91 +++++ 14 files changed, 1116 insertions(+), 17 deletions(-) create mode 100644 examples/simple_rpc/main.cpp create mode 100644 include/livekit/rpc_error.h create mode 100644 src/rpc_error.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 6baabde..473fd49 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -169,6 +169,7 @@ add_library(livekit include/livekit/local_participant.h include/livekit/remote_participant.h include/livekit/livekit.h + include/livekit/rpc_error.h include/livekit/stats.h include/livekit/track.h include/livekit/track_publication.h @@ -198,6 +199,7 @@ add_library(livekit src/track_publication.cpp src/local_track_publication.cpp src/remote_track_publication.cpp + src/rpc_error.cpp src/video_frame.cpp src/video_source.cpp src/video_stream.cpp diff --git a/README.md b/README.md index c294f8f..cc55a3a 100644 --- a/README.md +++ b/README.md @@ -41,8 +41,16 @@ All build actions are managed by the provided build.sh script. ## πŸ§ͺ Run Example +### Generate Tokens +Before running any participant, create JWT tokens with the proper identity and room name, example ```bash -./build/examples/SimpleRoom --url ws://localhost:7880 --token +lk token create -r test -i your_own_identity --join --valid-for 99999h --dev --room=your_own_room +``` + +### SimpleRoom + +```bash +./build/examples/SimpleRoom --url $URL --token ``` You can also provide the URL and token via environment variables: @@ -54,6 +62,33 @@ export LIVEKIT_TOKEN= Press Ctrl-C to exit the example. +### SimpleRpc +The SimpleRpc example demonstrates how to: +- Connect multiple participants to the same LiveKit room +- Register RPC handlers (e.g., arrival, square-root, divide, long-calculation) +- Send RPC requests from one participant to another +- Handle success, application errors, unsupported methods, and timeouts +- Observe round-trip times (RTT) for each RPC call + +#### πŸ”‘ Generate Tokens +Before running any participant, create JWT tokens with "caller", "greeter" and "math-genius" identities and room name. +```bash +lk token create -r test -i caller --join --valid-for 99999h --dev --room=your_own_room +lk token create -r test -i greeter --join --valid-for 99999h --dev --room=your_own_room +lk token create -r test -i math-genius --join --valid-for 99999h --dev --room=your_own_room +``` + +#### β–Ά Start Participants +Every participant is run as a separate terminal process, note --role needs to match the token identity. +```bash +./build/examples/SimpleRpc --url $URL --token --role=math-genius +``` +The caller will automatically: +- Wait for the greeter and math-genius to join +- Perform RPC calls +- Print round-trip times +- Annotate expected successes or expected failures + ## 🧰 Recommended Setup ### macOS ```bash diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index f242af3..39e223e 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -4,6 +4,8 @@ project (livekit-examples) set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED ON) +#################### SimpleRoom example ########################## + list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake") include(sdl3) @@ -32,3 +34,22 @@ add_custom_command(TARGET SimpleRoom POST_BUILD ${CMAKE_SOURCE_DIR}/data ${CMAKE_CURRENT_BINARY_DIR}/data ) + +#################### SimpleRpc example ########################## + +include(FetchContent) +FetchContent_Declare( + nlohmann_json + URL https://github.com/nlohmann/json/releases/download/v3.11.3/json.tar.xz +) +FetchContent_MakeAvailable(nlohmann_json) + +add_executable(SimpleRpc + simple_rpc/main.cpp +) + +target_link_libraries(SimpleRpc + PRIVATE + nlohmann_json::nlohmann_json + livekit +) \ No newline at end of file diff --git a/examples/simple_room/main.cpp b/examples/simple_room/main.cpp index e675a2f..9338e2f 100644 --- a/examples/simple_room/main.cpp +++ b/examples/simple_room/main.cpp @@ -42,7 +42,7 @@ namespace { std::atomic g_running{true}; -void print_usage(const char *prog) { +void printUsage(const char *prog) { std::cerr << "Usage:\n" << " " << prog << " \n" << "or:\n" @@ -52,9 +52,9 @@ void print_usage(const char *prog) { << " LIVEKIT_URL, LIVEKIT_TOKEN\n"; } -void handle_sigint(int) { g_running.store(false); } +void handleSignal(int) { g_running.store(false); } -bool parse_args(int argc, char *argv[], std::string &url, std::string &token) { +bool parseArgs(int argc, char *argv[], std::string &url, std::string &token) { // 1) --help for (int i = 1; i < argc; ++i) { std::string a = argv[i]; @@ -215,8 +215,8 @@ class SimpleRoomDelegate : public livekit::RoomDelegate { int main(int argc, char *argv[]) { std::string url, token; - if (!parse_args(argc, argv, url, token)) { - print_usage(argv[0]); + if (!parseArgs(argc, argv, url, token)) { + printUsage(argv[0]); return 1; } @@ -238,7 +238,7 @@ int main(int argc, char *argv[]) { std::cout << "Connecting to: " << url << std::endl; // Handle Ctrl-C to exit the idle loop - std::signal(SIGINT, handle_sigint); + std::signal(SIGINT, handleSignal); livekit::Room room{}; SimpleRoomDelegate delegate(media); diff --git a/examples/simple_rpc/main.cpp b/examples/simple_rpc/main.cpp new file mode 100644 index 0000000..4686bb5 --- /dev/null +++ b/examples/simple_rpc/main.cpp @@ -0,0 +1,572 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an β€œAS IS” BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "livekit/livekit.h" +#include "livekit_ffi.h" // same as simple_room; internal but used here + +using namespace livekit; +using namespace std::chrono_literals; + +namespace { + +// ------------------------------------------------------------ +// Global control (same pattern as simple_room) +// ------------------------------------------------------------ + +std::atomic g_running{true}; + +void handleSignal(int) { g_running.store(false); } + +void printUsage(const char *prog) { + std::cerr << "Usage:\n" + << " " << prog << " [role]\n" + << "or:\n" + << " " << prog + << " --url= --token= [--role=]\n" + << " " << prog + << " --url --token [--role ]\n\n" + << "Env fallbacks:\n" + << " LIVEKIT_URL, LIVEKIT_TOKEN\n" + << "Role (participant behavior):\n" + << " SIMPLE_RPC_ROLE or --role=\n" + << " default: caller\n"; +} + +inline double nowMs() { + return std::chrono::duration( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); +} + +// Poll the room until a remote participant with the given identity appears, +// or until 'timeout' elapses. Returns true if found, false on timeout. +bool waitForParticipant(Room &room, const std::string &identity, + std::chrono::milliseconds timeout) { + auto start = std::chrono::steady_clock::now(); + + while (std::chrono::steady_clock::now() - start < timeout) { + if (room.remote_participant(identity) != nullptr) { + return true; + } + std::this_thread::sleep_for(100ms); + } + return false; +} + +// For the caller: wait for a specific peer, and if they don't show up, +// explain why and how to start them in another terminal. +bool ensurePeerPresent(Room &room, const std::string &identity, + const std::string &friendly_role, const std::string &url, + std::chrono::seconds timeout) { + std::cout << "[Caller] Waiting up to " << timeout.count() << "s for " + << friendly_role << " (identity=\"" << identity + << "\") to join...\n"; + + bool present = waitForParticipant( + room, identity, + std::chrono::duration_cast(timeout)); + + if (present) { + std::cout << "[Caller] " << friendly_role << " is present.\n"; + return true; + } + + // Timed out + auto info = room.room_info(); + const std::string room_name = info.name; + + std::cout << "[Caller] Timed out after " << timeout.count() + << "s waiting for " << friendly_role << " (identity=\"" << identity + << "\").\n"; + std::cout << "[Caller] No participant with identity \"" << identity + << "\" appears to be connected to room \"" << room_name + << "\".\n\n"; + + std::cout << "To start a " << friendly_role + << " in another terminal, run:\n\n" + << " lk token create -r test -i " << identity + << " --join --valid-for 99999h --dev --room=" << room_name << "\n" + << " ./build/examples/SimpleRpc " << url + << " $token --role=" << friendly_role << "\n\n"; + + return false; +} + +// Parse args similar to simple_room, plus optional --role / role positional +bool parseArgs(int argc, char *argv[], std::string &url, std::string &token, + std::string &role) { + // --help + for (int i = 1; i < argc; ++i) { + std::string a = argv[i]; + if (a == "-h" || a == "--help") { + return false; + } + } + + // helper for flags + auto get_flag_value = [&](const std::string &name, int &i) -> std::string { + std::string arg = argv[i]; + const std::string eq = name + "="; + if (arg.rfind(name, 0) == 0) { // starts with name + if (arg.size() > name.size() && arg[name.size()] == '=') { + return arg.substr(eq.size()); + } else if (i + 1 < argc) { + return std::string(argv[++i]); + } + } + return {}; + }; + + // flags: --url / --token / --role (with = or split) + for (int i = 1; i < argc; ++i) { + const std::string a = argv[i]; + if (a.rfind("--url", 0) == 0) { + auto v = get_flag_value("--url", i); + if (!v.empty()) + url = v; + } else if (a.rfind("--token", 0) == 0) { + auto v = get_flag_value("--token", i); + if (!v.empty()) + token = v; + } else if (a.rfind("--role", 0) == 0) { + auto v = get_flag_value("--role", i); + if (!v.empty()) + role = v; + } + } + + std::vector pos; + for (int i = 1; i < argc; ++i) { + std::string a = argv[i]; + if (a.rfind("--", 0) == 0) + continue; + pos.push_back(std::move(a)); + } + if (!pos.empty()) { + if (url.empty() && pos.size() >= 1) { + url = pos[0]; + } + if (token.empty() && pos.size() >= 2) { + token = pos[1]; + } + if (role.empty() && pos.size() >= 3) { + role = pos[2]; + } + } + + if (url.empty()) { + const char *e = std::getenv("LIVEKIT_URL"); + if (e) + url = e; + } + if (token.empty()) { + const char *e = std::getenv("LIVEKIT_TOKEN"); + if (e) + token = e; + } + if (role.empty()) { + const char *e = std::getenv("SIMPLE_RPC_ROLE"); + if (e) + role = e; + } + if (role.empty()) { + role = "caller"; + } + + return !(url.empty() || token.empty()); +} + +// ------------------------------------------------------------ +// Tiny helpers for the simple JSON used in the sample +// (to avoid bringing in a json library) +// ------------------------------------------------------------ + +// create {"key":number} +std::string makeNumberJson(const std::string &key, double value) { + std::ostringstream oss; + oss << "{\"" << key << "\":" << value << "}"; + return oss.str(); +} + +// create {"key":"value"} +std::string makeStringJson(const std::string &key, const std::string &value) { + std::ostringstream oss; + oss << "{\"" << key << "\":\"" << value << "\"}"; + return oss.str(); +} + +// very naive parse of {"key":number} +double parseNumberFromJson(const std::string &json) { + auto colon = json.find(':'); + if (colon == std::string::npos) + throw std::runtime_error("invalid json: " + json); + auto start = colon + 1; + auto end = json.find_first_of(",}", start); + std::string num_str = json.substr(start, end - start); + return std::stod(num_str); +} + +// very naive parse of {"key":"value"} +std::string parseStringFromJson(const std::string &json) { + auto colon = json.find(':'); + if (colon == std::string::npos) + throw std::runtime_error("invalid json: " + json); + auto first_quote = json.find('"', colon + 1); + if (first_quote == std::string::npos) + throw std::runtime_error("invalid json: " + json); + auto second_quote = json.find('"', first_quote + 1); + if (second_quote == std::string::npos) + throw std::runtime_error("invalid json: " + json); + return json.substr(first_quote + 1, second_quote - first_quote - 1); +} + +// ------------------------------------------------------------ +// RPC handler registration (for greeter & math-genius) +// ------------------------------------------------------------ + +void registerReceiverMethods(Room &greeters_room, Room &math_genius_room) { + LocalParticipant *greeter_lp = greeters_room.local_participant(); + LocalParticipant *math_genius_lp = math_genius_room.local_participant(); + + // arrival + greeter_lp->registerRpcMethod( + "arrival", + [](const RpcInvocationData &data) -> std::optional { + std::cout << "[Greeter] Oh " << data.caller_identity + << " arrived and said \"" << data.payload << "\"\n"; + std::this_thread::sleep_for(2s); + return std::optional{"Welcome and have a wonderful day!"}; + }); + + // square-root + math_genius_lp->registerRpcMethod( + "square-root", + [](const RpcInvocationData &data) -> std::optional { + double number = parseNumberFromJson(data.payload); + std::cout << "[Math Genius] I guess " << data.caller_identity + << " wants the square root of " << number + << ". I've only got " << data.response_timeout_sec + << " seconds to respond but I think I can pull it off.\n"; + std::cout << "[Math Genius] *doing math*…\n"; + std::this_thread::sleep_for(2s); + double result = std::sqrt(number); + std::cout << "[Math Genius] Aha! It's " << result << "\n"; + return makeNumberJson("result", result); + }); + + // divide + math_genius_lp->registerRpcMethod( + "divide", + [](const RpcInvocationData &data) -> std::optional { + // expect {"dividend":X,"divisor":Y} – we'll parse very lazily + auto div_pos = data.payload.find("dividend"); + auto dvr_pos = data.payload.find("divisor"); + if (div_pos == std::string::npos || dvr_pos == std::string::npos) { + throw std::runtime_error("invalid divide payload"); + } + + double dividend = parseNumberFromJson( + data.payload.substr(div_pos, dvr_pos - div_pos - 1)); // rough slice + double divisor = parseNumberFromJson(data.payload.substr(dvr_pos)); + + std::cout << "[Math Genius] " << data.caller_identity + << " wants to divide " << dividend << " by " << divisor + << ".\n"; + + if (divisor == 0.0) { + // will be translated to APPLICATION_ERROR by your RpcError logic + throw std::runtime_error("division by zero"); + } + + double result = dividend / divisor; + return makeNumberJson("result", result); + }); + + // long-calculation + math_genius_lp->registerRpcMethod( + "long-calculation", + [](const RpcInvocationData &data) -> std::optional { + std::cout << "[Math Genius] Starting a very long calculation for " + << data.caller_identity << "\n"; + std::cout << "[Math Genius] This will take 30 seconds even though " + "you're only giving me " + << data.response_timeout_sec << " seconds\n"; + + std::this_thread::sleep_for(30s); + return makeStringJson("result", "Calculation complete!"); + }); + + // Note: we do NOT register "quantum-hypergeometric-series" here, + // so the caller sees UNSUPPORTED_METHOD, just like in Python. +} + +// ------------------------------------------------------------ +// Caller-side helpers (like perform_* in rpc.py) +// ------------------------------------------------------------ + +void performGreeting(Room &room) { + std::cout << "[Caller] Letting the greeter know that I've arrived\n"; + double t0 = nowMs(); + try { + std::string response = room.local_participant()->performRpc( + "greeter", "arrival", "Hello", std::nullopt); + double t1 = nowMs(); + std::cout << "[Caller] RTT: " << (t1 - t0) << " ms\n"; + std::cout << "[Caller] That's nice, the greeter said: \"" << response + << "\"\n"; + } catch (const std::exception &error) { + double t1 = nowMs(); + std::cout << "[Caller] (FAILED) RTT: " << (t1 - t0) << " ms\n"; + std::cout << "[Caller] RPC call failed: " << error.what() << "\n"; + throw; + } +} + +void performSquareRoot(Room &room) { + std::cout << "[Caller] What's the square root of 16?\n"; + double t0 = nowMs(); + try { + std::string payload = makeNumberJson("number", 16.0); + std::string response = room.local_participant()->performRpc( + "math-genius", "square-root", payload, std::nullopt); + double t1 = nowMs(); + std::cout << "[Caller] RTT: " << (t1 - t0) << " ms\n"; + double result = parseNumberFromJson(response); + std::cout << "[Caller] Nice, the answer was " << result << "\n"; + } catch (const std::exception &error) { + double t1 = nowMs(); + std::cout << "[Caller] (FAILED) RTT: " << (t1 - t0) << " ms\n"; + std::cout << "[Caller] RPC call failed: " << error.what() << "\n"; + throw; + } +} + +void performQuantumHyperGeometricSeries(Room &room) { + std::cout << "\n=== Unsupported Method Example ===\n"; + std::cout + << "[Caller] Asking math-genius for 'quantum-hypergeometric-series'. " + "This should FAIL because the handler is NOT registered.\n"; + double t0 = nowMs(); + try { + std::string payload = makeNumberJson("number", 42.0); + std::string response = room.local_participant()->performRpc( + "math-genius", "quantum-hypergeometric-series", payload, std::nullopt); + double t1 = nowMs(); + std::cout << "[Caller] (Unexpected success) RTT=" << (t1 - t0) << " ms\n"; + std::cout << "[Caller] Result: " << response << "\n"; + } catch (const RpcError &error) { + double t1 = nowMs(); + std::cout << "[Caller] RpcError RTT=" << (t1 - t0) << " ms\n"; + auto code = static_cast(error.code()); + if (code == RpcError::ErrorCode::UNSUPPORTED_METHOD) { + std::cout << "[Caller] βœ“ Expected: math-genius does NOT implement this " + "method.\n"; + std::cout << "[Caller] Server returned UNSUPPORTED_METHOD.\n"; + } else { + std::cout << "[Caller] βœ— Unexpected error type: " << error.message() + << "\n"; + } + } +} + +void performDivide(Room &room) { + std::cout << "\n=== Divide Example ===\n"; + std::cout << "[Caller] Asking math-genius to divide 10 by 0. " + "This is EXPECTED to FAIL with an APPLICATION_ERROR.\n"; + double t0 = nowMs(); + try { + std::string payload = "{\"dividend\":10,\"divisor\":0}"; + std::string response = room.local_participant()->performRpc( + "math-genius", "divide", payload, std::nullopt); + double t1 = nowMs(); + std::cout << "[Caller] (Unexpected success) RTT=" << (t1 - t0) << " ms\n"; + std::cout << "[Caller] Result = " << response << "\n"; + } catch (const RpcError &error) { + double t1 = nowMs(); + std::cout << "[Caller] RpcError RTT=" << (t1 - t0) << " ms\n"; + auto code = static_cast(error.code()); + if (code == RpcError::ErrorCode::APPLICATION_ERROR) { + std::cout << "[Caller] βœ“ Expected: divide-by-zero triggers " + "APPLICATION_ERROR.\n"; + std::cout << "[Caller] Math-genius threw an exception: " + << error.message() << "\n"; + } else { + std::cout << "[Caller] βœ— Unexpected RpcError type: " << error.message() + << "\n"; + } + } +} + +void performLongCalculation(Room &room) { + std::cout << "\n=== Long Calculation Example ===\n"; + std::cout + << "[Caller] Asking math-genius for a calculation that takes 30s.\n"; + std::cout + << "[Caller] Giving only 10s to respond. EXPECTED RESULT: TIMEOUT.\n"; + double t0 = nowMs(); + try { + std::string response = room.local_participant()->performRpc( + "math-genius", "long-calculation", "{}", 10.0); + double t1 = nowMs(); + std::cout << "[Caller] (Unexpected success) RTT=" << (t1 - t0) << " ms\n"; + std::cout << "[Caller] Result: " << response << "\n"; + } catch (const RpcError &error) { + double t1 = nowMs(); + std::cout << "[Caller] RpcError RTT=" << (t1 - t0) << " ms\n"; + auto code = static_cast(error.code()); + if (code == RpcError::ErrorCode::RESPONSE_TIMEOUT) { + std::cout + << "[Caller] βœ“ Expected: handler sleeps 30s but timeout is 10s.\n"; + std::cout << "[Caller] Server correctly returned RESPONSE_TIMEOUT.\n"; + } else if (code == RpcError::ErrorCode::RECIPIENT_DISCONNECTED) { + std::cout << "[Caller] βœ“ Expected if math-genius disconnects during the " + "test.\n"; + } else { + std::cout << "[Caller] βœ— Unexpected RPC error: " << error.message() + << "\n"; + } + } +} + +} // namespace + +// ------------------------------------------------------------ +// main – similar style to simple_room/main.cpp +// ------------------------------------------------------------ + +int main(int argc, char *argv[]) { + std::string url, token, role; + if (!parseArgs(argc, argv, url, token, role)) { + printUsage(argv[0]); + return 1; + } + + if (url.empty() || token.empty()) { + std::cerr << "LIVEKIT_URL and LIVEKIT_TOKEN (or CLI args) are required\n"; + return 1; + } + + std::cout << "Connecting to: " << url << "\n"; + std::cout << "Role: " << role << "\n"; + + // Ctrl-C + std::signal(SIGINT, handleSignal); + + Room room{}; + RoomOptions options; + options.auto_subscribe = true; + options.dynacast = false; + + bool res = room.Connect(url, token, options); + std::cout << "Connect result is " << std::boolalpha << res << "\n"; + if (!res) { + std::cerr << "Failed to connect to room\n"; + FfiClient::instance().shutdown(); + return 1; + } + + auto info = room.room_info(); + std::cout << "Connected to room:\n" + << " Name: " << info.name << "\n" + << " Metadata: " << info.metadata << "\n" + << " Num participants: " << info.num_participants << "\n"; + + try { + if (role == "caller") { + // Check that both peers are present (or explain how to start them). + bool has_greeter = ensurePeerPresent(room, "greeter", "greeter", url, 8s); + bool has_math_genius = + ensurePeerPresent(room, "math-genius", "math-genius", url, 8s); + if (!has_greeter || !has_math_genius) { + std::cout << "\n[Caller] One or more RPC peers are missing. " + << "Some examples may be skipped.\n"; + } + if (has_greeter) { + std::cout << "\n\nRunning greeting example...\n"; + performGreeting(room); + } else { + std::cout << "[Caller] Skipping greeting example because greeter is " + "not present.\n"; + } + if (has_math_genius) { + std::cout << "\n\nRunning error handling example...\n"; + performDivide(room); + + std::cout << "\n\nRunning math example...\n"; + performSquareRoot(room); + std::this_thread::sleep_for(2s); + performQuantumHyperGeometricSeries(room); + + std::cout << "\n\nRunning long calculation with timeout...\n"; + performLongCalculation(room); + } else { + std::cout << "[Caller] Skipping math examples because math-genius is " + "not present.\n"; + } + + std::cout << "\n\nCaller done. Exiting.\n"; + } else if (role == "greeter" || role == "math-genius") { + // For these roles we expect multiple processes: + // - One process with role=caller + // - One with role=greeter + // - One with role=math-genius + // + // Each process gets its own token (with that identity) via LIVEKIT_TOKEN. + // Here we only register handlers for the appropriate role, and then + // stay alive until Ctrl-C so we can receive RPCs. + + if (role == "greeter") { + // Use the same room object for both arguments; only "arrival" is used. + registerReceiverMethods(room, room); + } else { // math-genius + // We only need math handlers; greeter handlers won't be used. + registerReceiverMethods(room, room); + } + + std::cout << "RPC handlers registered for role=" << role + << ". Waiting for RPC calls (Ctrl-C to exit)...\n"; + + while (g_running.load()) { + std::this_thread::sleep_for(50ms); + } + std::cout << "Exiting receiver role.\n"; + } else { + std::cerr << "Unknown role: " << role << "\n"; + } + } catch (const std::exception &e) { + std::cerr << "Unexpected error in main: " << e.what() << "\n"; + } + + FfiClient::instance().shutdown(); + return 0; +} diff --git a/include/livekit/ffi_client.h b/include/livekit/ffi_client.h index 51cd226..6b2fa3d 100644 --- a/include/livekit/ffi_client.h +++ b/include/livekit/ffi_client.h @@ -111,6 +111,11 @@ class FfiClient { std::future captureAudioFrameAsync(std::uint64_t source_handle, const proto::AudioFrameBufferInfo &buffer); + std::future performRpcAsync( + std::uint64_t local_participant_handle, + const std::string &destination_identity, const std::string &method, + const std::string &payload, + std::optional response_timeout_ms = std::nullopt); // Generic function for sending a request to the Rust FFI. // Note: For asynchronous requests, use the dedicated async functions instead diff --git a/include/livekit/local_participant.h b/include/livekit/local_participant.h index 1077d73..28aa70a 100644 --- a/include/livekit/local_participant.h +++ b/include/livekit/local_participant.h @@ -19,6 +19,7 @@ #include "livekit/ffi_handle.h" #include "livekit/participant.h" #include "livekit/room_delegate.h" +#include "livekit/rpc_error.h" #include #include @@ -33,8 +34,16 @@ struct ParticipantTrackPermission; class FfiClient; class Track; class LocalTrackPublication; +// TODO, should consider moving Transcription to local_participant.h? struct Transcription; +struct RpcInvocationData { + std::string request_id; + std::string caller_identity; + std::string payload; + double response_timeout_sec; // seconds +}; + /** * Represents the local participant in a room. * @@ -45,6 +54,19 @@ class LocalParticipant : public Participant { using PublicationMap = std::unordered_map>; + /** + * Type of callback used to handle incoming RPC method invocations. + * + * The handler receives an RpcInvocationData describing the incoming call + * and may return an optional response payload. To signal an error to the + * remote caller, throw an RpcError; it will be serialized and forwarded. + * + * Returning std::nullopt means "no payload" and results in an empty + * response body being sent back to the caller. + */ + using RpcHandler = + std::function(const RpcInvocationData &)>; + LocalParticipant(FfiHandle handle, std::string sid, std::string name, std::string identity, std::string metadata, std::unordered_map attributes, @@ -91,10 +113,6 @@ class LocalParticipant : public Participant { void setAttributes(const std::unordered_map &attributes); - // ------------------------------------------------------------------------- - // Subscription permissions - // ------------------------------------------------------------------------- - /** * Set track subscription permissions for this participant. * @@ -106,10 +124,6 @@ class LocalParticipant : public Participant { const std::vector &participant_permissions = {}); - // ------------------------------------------------------------------------- - // Track publish / unpublish (synchronous analogue) - // ------------------------------------------------------------------------- - /** * Publish a local track to the room. * @@ -126,8 +140,73 @@ class LocalParticipant : public Participant { */ void unpublishTrack(const std::string &track_sid); + /** + * Initiate an RPC call to a remote participant. + * + * @param destination_identity Identity of the destination participant. + * @param method Name of the RPC method to invoke. + * @param payload Request payload to send to the remote handler. + * @param response_timeout Optional timeout in seconds for receiving + * a response. If not set, the server default + * timeout (15 seconds) is used. + * + * @return The response payload returned by the remote handler. + * + * @throws RpcError If the remote side returns an RPC error, times out, + * or rejects the request. + * @throws std::runtime_error If the underlying FFI handle is invalid or + * the FFI call fails unexpectedly. + */ + std::string performRpc(const std::string &destination_identity, + const std::string &method, const std::string &payload, + std::optional response_timeout = std::nullopt); + + /** + * Register a handler for an incoming RPC method. + * + * Once registered, the provided handler will be invoked whenever a remote + * participant calls the given method name on this LocalParticipant. + * + * @param method_name Name of the RPC method to handle. This must match + * the method name used by remote callers. + * @param handler Callback to execute when an invocation is received. + * The handler may return an optional response payload + * or throw an RpcError to signal failure. + * + * If a handler is already registered for the same method_name, it will be + * replaced by the new handler. + */ + + void registerRpcMethod(const std::string &method_name, RpcHandler handler); + + /** + * Unregister a previously registered RPC method handler. + * + * After this call, invocations for the given method_name will no longer + * be dispatched to a local handler and will instead result in an + * "unsupported method" error being returned to the caller. + * + * @param method_name Name of the RPC method to unregister. + * If no handler is registered for this name, the call + * is a no-op. + */ + void unregisterRpcMethod(const std::string &method_name); + +protected: + // Called by Room when an rpc_method_invocation event is received from the + // SFU. This is internal plumbing and not intended to be called directly by + // SDK users. + void handleRpcMethodInvocation(std::uint64_t invocation_id, + const std::string &method, + const std::string &request_id, + const std::string &caller_identity, + const std::string &payload, + double response_timeout); + friend class Room; + private: PublicationMap track_publications_; + std::unordered_map rpc_handlers_; }; } // namespace livekit diff --git a/include/livekit/rpc_error.h b/include/livekit/rpc_error.h new file mode 100644 index 0000000..0fd7195 --- /dev/null +++ b/include/livekit/rpc_error.h @@ -0,0 +1,123 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an β€œAS IS” BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +namespace livekit { + +namespace proto { +class RpcError; +} + +/** + * Specialized error type for RPC methods. + * + * Instances of this type, when thrown in a method handler, will have their + * `code`, `message`, and optional `data` serialized into a proto::RpcError + * and sent across the wire. The caller will receive an equivalent error + * on the other side. + * + * Built-in errors are included (codes 1400–1999) but developers may use + * arbitrary codes as well. + */ +class RpcError : public std::runtime_error { +public: + /** + * Built-in error codes, mirroring the Python RpcError.ErrorCode enum. + */ + enum class ErrorCode : std::uint32_t { + APPLICATION_ERROR = 1500, + CONNECTION_TIMEOUT = 1501, + RESPONSE_TIMEOUT = 1502, + RECIPIENT_DISCONNECTED = 1503, + RESPONSE_PAYLOAD_TOO_LARGE = 1504, + SEND_FAILED = 1505, + + UNSUPPORTED_METHOD = 1400, + RECIPIENT_NOT_FOUND = 1401, + REQUEST_PAYLOAD_TOO_LARGE = 1402, + UNSUPPORTED_SERVER = 1403, + UNSUPPORTED_VERSION = 1404, + }; + + /** + * Construct an RpcError with an explicit numeric code. + * + * @param code Error code value. Codes 1001–1999 are reserved for + * built-in errors (see ErrorCode). + * @param message Human-readable error message. + * @param data Optional extra data, e.g. JSON. Empty string means no data. + */ + RpcError(std::uint32_t code, std::string message, std::string data = {}); + + /** + * Construct an RpcError from a built-in ErrorCode. + * + * @param code Built-in error code. + * @param message Human-readable error message. + * @param data Optional extra data, e.g. JSON. Empty string means no data. + */ + RpcError(ErrorCode code, std::string message, std::string data = {}); + + /** + * Numeric error code. + * + * Codes 1001–1999 are reserved for built-in errors. For built-ins, this + * value matches the underlying ErrorCode enum value. + */ + std::uint32_t code() const noexcept; + + /** + * Human-readable error message. + */ + const std::string &message() const noexcept; + + /** + * Optional extra data associated with the error (JSON recommended). + * May be an empty string if no data was provided. + */ + const std::string &data() const noexcept; + + /** + * Create a built-in RpcError using a predefined ErrorCode and default + * message text that matches the Python RpcError.ErrorMessage table. + * + * @param code Built-in error code. + * @param data Optional extra data payload (JSON recommended). + */ + static RpcError builtIn(ErrorCode code, const std::string &data = {}); + +protected: + // ----- Protected: only used by LocalParticipant (internal SDK code) ----- + proto::RpcError toProto() const; + static RpcError fromProto(const proto::RpcError &err); + + friend class LocalParticipant; + friend class FfiClient; + +private: + static const char *defaultMessageFor(ErrorCode code); + + std::uint32_t code_; + std::string message_; + std::string data_; +}; + +} // namespace livekit \ No newline at end of file diff --git a/src/ffi_client.cpp b/src/ffi_client.cpp index 5b426d0..38bc52d 100644 --- a/src/ffi_client.cpp +++ b/src/ffi_client.cpp @@ -22,6 +22,7 @@ #include "livekit/ffi_client.h" #include "livekit/ffi_handle.h" #include "livekit/room.h" // TODO, maybe avoid circular deps by moving RoomOptions to a room_types.h ? +#include "livekit/rpc_error.h" #include "livekit/track.h" #include "livekit_ffi.h" #include "room.pb.h" @@ -505,4 +506,43 @@ FfiClient::captureAudioFrameAsync(std::uint64_t source_handle, }); } +std::future +FfiClient::performRpcAsync(std::uint64_t local_participant_handle, + const std::string &destination_identity, + const std::string &method, + const std::string &payload, + std::optional response_timeout_ms) { + proto::FfiRequest req; + auto *msg = req.mutable_perform_rpc(); + msg->set_local_participant_handle(local_participant_handle); + msg->set_destination_identity(destination_identity); + msg->set_method(method); + msg->set_payload(payload); + if (response_timeout_ms.has_value()) { + msg->set_response_timeout_ms(*response_timeout_ms); + } + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_perform_rpc()) { + throw std::runtime_error("FfiResponse missing perform_rpc"); + } + const AsyncId async_id = resp.perform_rpc().async_id(); + return registerAsync( + // match predicate + [async_id](const proto::FfiEvent &event) { + return event.has_perform_rpc() && + event.perform_rpc().async_id() == async_id; + }, + [](const proto::FfiEvent &event, std::promise &pr) { + const auto &cb = event.perform_rpc(); + + if (cb.has_error()) { + // RpcError is a proto message; convert to C++ RpcError and throw + pr.set_exception( + std::make_exception_ptr(RpcError::fromProto(cb.error()))); + return; + } + pr.set_value(cb.payload()); + }); +} + } // namespace livekit diff --git a/src/local_participant.cpp b/src/local_participant.cpp index 2f52573..02f90d5 100644 --- a/src/local_participant.cpp +++ b/src/local_participant.cpp @@ -251,4 +251,103 @@ void LocalParticipant::unpublishTrack(const std::string &track_sid) { track_publications_.erase(track_sid); } +std::string LocalParticipant::performRpc( + const std::string &destination_identity, const std::string &method, + const std::string &payload, std::optional response_timeout) { + auto handle_id = ffiHandleId(); + if (handle_id == 0) { + throw std::runtime_error( + "LocalParticipant::performRpc: invalid FFI handle"); + } + + std::uint32_t timeout_ms = 0; + bool has_timeout = false; + if (response_timeout.has_value()) { + timeout_ms = static_cast(response_timeout.value() * 1000.0); + has_timeout = true; + } + + auto fut = FfiClient::instance().performRpcAsync( + static_cast(handle_id), destination_identity, method, + payload, + has_timeout ? std::optional(timeout_ms) : std::nullopt); + return fut.get(); +} + +void LocalParticipant::registerRpcMethod(const std::string &method_name, + RpcHandler handler) { + auto handle_id = ffiHandleId(); + if (handle_id == 0) { + throw std::runtime_error( + "LocalParticipant::registerRpcMethod: invalid FFI handle"); + } + rpc_handlers_[method_name] = std::move(handler); + FfiRequest req; + auto *msg = req.mutable_register_rpc_method(); + msg->set_local_participant_handle(static_cast(handle_id)); + msg->set_method(method_name); + + (void)FfiClient::instance().sendRequest(req); +} + +void LocalParticipant::unregisterRpcMethod(const std::string &method_name) { + auto handle_id = ffiHandleId(); + if (handle_id == 0) { + throw std::runtime_error( + "LocalParticipant::unregisterRpcMethod: invalid FFI handle"); + } + rpc_handlers_.erase(method_name); + FfiRequest req; + auto *msg = req.mutable_unregister_rpc_method(); + msg->set_local_participant_handle(static_cast(handle_id)); + msg->set_method(method_name); + + (void)FfiClient::instance().sendRequest(req); +} + +void LocalParticipant::handleRpcMethodInvocation( + uint64_t invocation_id, const std::string &method, + const std::string &request_id, const std::string &caller_identity, + const std::string &payload, double response_timeout_sec) { + std::optional response_error; + std::optional response_payload; + std::cout << "handleRpcMethodInvocation \n"; + RpcInvocationData params{request_id, caller_identity, payload, + response_timeout_sec}; + auto it = rpc_handlers_.find(method); + if (it == rpc_handlers_.end()) { + // No handler registered β†’ built-in UNSUPPORTED_METHOD + response_error = RpcError::builtIn(RpcError::ErrorCode::UNSUPPORTED_METHOD); + } else { + try { + // Invoke user handler: may return payload or throw RpcError + response_payload = it->second(params); + } catch (const RpcError &err) { + // Handler explicitly signalled an RPC error: forward as-is + response_error = err; + } catch (const std::exception &ex) { + // Any other exception: wrap as built-in APPLICATION_ERROR + response_error = + RpcError::builtIn(RpcError::ErrorCode::APPLICATION_ERROR, ex.what()); + } catch (...) { + response_error = RpcError::builtIn(RpcError::ErrorCode::APPLICATION_ERROR, + "unknown error"); + } + } + + FfiRequest req; + auto *msg = req.mutable_rpc_method_invocation_response(); + msg->set_local_participant_handle(ffiHandleId()); + msg->set_invocation_id(invocation_id); + if (response_error.has_value()) { + auto *err_proto = msg->mutable_error(); + err_proto->CopyFrom(response_error->toProto()); + } + if (response_payload.has_value()) { + msg->set_payload(*response_payload); + } + std::cout << "handleRpcMethodInvocation sendrequest \n"; + FfiClient::instance().sendRequest(req); +} + } // namespace livekit diff --git a/src/room.cpp b/src/room.cpp index f52edb1..6bf1877 100644 --- a/src/room.cpp +++ b/src/room.cpp @@ -166,8 +166,35 @@ void Room::OnEvent(const FfiEvent &event) { { std::lock_guard guard(lock_); delegate_snapshot = delegate_; - // If you want, you can also update internal state here (participants, room - // info, etc.). + } + + // First, handle RPC method invocations (not part of RoomEvent). + if (event.message_case() == FfiEvent::kRpcMethodInvocation) { + const auto &rpc = event.rpc_method_invocation(); + std::cout << "kRpcMethodInvocation \n"; + + LocalParticipant *lp = nullptr; + { + std::lock_guard guard(lock_); + if (!local_participant_) { + return; + } + auto local_handle = local_participant_->ffiHandleId(); + if (local_handle == 0 || rpc.local_participant_handle() != + static_cast(local_handle)) { + // RPC is not targeted at this room's local participant; ignore. + return; + } + lp = local_participant_.get(); + } + + // Call outside the lock to avoid deadlocks / re-entrancy issues. + lp->handleRpcMethodInvocation( + rpc.invocation_id(), rpc.method(), rpc.request_id(), + rpc.caller_identity(), rpc.payload(), + static_cast(rpc.response_timeout_ms()) / 1000.0); + + return; } if (!delegate_snapshot) { diff --git a/src/room_proto_converter.cpp b/src/room_proto_converter.cpp index 0c41a6a..7a6b1c1 100644 --- a/src/room_proto_converter.cpp +++ b/src/room_proto_converter.cpp @@ -16,6 +16,7 @@ #include "room_proto_converter.h" +#include "livekit/local_participant.h" #include "room.pb.h" namespace livekit { diff --git a/src/room_proto_converter.h b/src/room_proto_converter.h index 89ba7c4..127a70c 100644 --- a/src/room_proto_converter.h +++ b/src/room_proto_converter.h @@ -19,8 +19,12 @@ #include "livekit/room_delegate.h" #include "room.pb.h" +#include + namespace livekit { +enum class RpcErrorCode; + // --------- basic helper conversions --------- ConnectionQuality toConnectionQuality(proto::ConnectionQuality in); diff --git a/src/rpc_error.cpp b/src/rpc_error.cpp new file mode 100644 index 0000000..14c7d29 --- /dev/null +++ b/src/rpc_error.cpp @@ -0,0 +1,91 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an β€œAS IS” BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "livekit/rpc_error.h" + +#include "rpc.pb.h" + +namespace livekit { + +RpcError::RpcError(std::uint32_t code, std::string message, std::string data) + : std::runtime_error(message), code_(code), message_(std::move(message)), + data_(std::move(data)) {} + +RpcError::RpcError(ErrorCode code, std::string message, std::string data) + : RpcError(static_cast(code), std::move(message), + std::move(data)) {} + +std::uint32_t RpcError::code() const noexcept { return code_; } + +const std::string &RpcError::message() const noexcept { return message_; } + +const std::string &RpcError::data() const noexcept { return data_; } + +proto::RpcError RpcError::toProto() const { + proto::RpcError err; + err.set_code(code_); + err.set_message(message_); + + // Set data only if non-empty; empty string means "no data". + if (!data_.empty()) { + err.set_data(data_); + } + + return err; +} + +RpcError RpcError::fromProto(const proto::RpcError &err) { + // proto::RpcError.data() will return empty string if unset, which is fine. + return RpcError(err.code(), err.message(), err.data()); +} + +RpcError RpcError::builtIn(ErrorCode code, const std::string &data) { + const char *msg = defaultMessageFor(code); + return RpcError(code, msg ? std::string(msg) : std::string{}, data); +} + +const char *RpcError::defaultMessageFor(ErrorCode code) { + // Mirror Python RpcError.ErrorMessage mapping. + switch (code) { + case ErrorCode::APPLICATION_ERROR: + return "Application error in method handler"; + case ErrorCode::CONNECTION_TIMEOUT: + return "Connection timeout"; + case ErrorCode::RESPONSE_TIMEOUT: + return "Response timeout"; + case ErrorCode::RECIPIENT_DISCONNECTED: + return "Recipient disconnected"; + case ErrorCode::RESPONSE_PAYLOAD_TOO_LARGE: + return "Response payload too large"; + case ErrorCode::SEND_FAILED: + return "Failed to send"; + case ErrorCode::UNSUPPORTED_METHOD: + return "Method not supported at destination"; + case ErrorCode::RECIPIENT_NOT_FOUND: + return "Recipient not found"; + case ErrorCode::REQUEST_PAYLOAD_TOO_LARGE: + return "Request payload too large"; + case ErrorCode::UNSUPPORTED_SERVER: + return "RPC not supported by server"; + case ErrorCode::UNSUPPORTED_VERSION: + return "Unsupported RPC version"; + } + + // Should be unreachable if all enum values are covered. + return ""; +} + +} // namespace livekit \ No newline at end of file From fd772ffbbbaa7c614151d43884bac956d1f902b8 Mon Sep 17 00:00:00 2001 From: shijing xian Date: Fri, 5 Dec 2025 12:16:15 -0800 Subject: [PATCH 2/8] added a new README and fix the comments --- README.md | 2 +- examples/simple_rpc/README | 157 +++++++++++++++++++++++++++++++++++ examples/simple_rpc/main.cpp | 41 ++------- 3 files changed, 163 insertions(+), 37 deletions(-) create mode 100644 examples/simple_rpc/README diff --git a/README.md b/README.md index cc55a3a..fe44301 100644 --- a/README.md +++ b/README.md @@ -71,7 +71,7 @@ The SimpleRpc example demonstrates how to: - Observe round-trip times (RTT) for each RPC call #### πŸ”‘ Generate Tokens -Before running any participant, create JWT tokens with "caller", "greeter" and "math-genius" identities and room name. +Before running any participant, create JWT tokens with **caller**, **greeter** and **math-genius** identities and room name. ```bash lk token create -r test -i caller --join --valid-for 99999h --dev --room=your_own_room lk token create -r test -i greeter --join --valid-for 99999h --dev --room=your_own_room diff --git a/examples/simple_rpc/README b/examples/simple_rpc/README new file mode 100644 index 0000000..64bc870 --- /dev/null +++ b/examples/simple_rpc/README @@ -0,0 +1,157 @@ +# πŸ“˜ SimpleRpc Example β€” Technical Overview + +This README provides deeper technical details about the RPC (Remote Procedure Call) support demonstrated in the SimpleRpc example. +It complements the example instructions found in the root README.md. + +If you're looking for how to run the example, see the root [README](https://github.com/livekit/client-sdk-cpp). + +This document explains: +- How LiveKit RPC works in the C++ SDK +- Where the APIs are defined +- How senders call RPC methods +- How receivers register handlers +- What happens if the receiver is absent +- How long-running operations behave +- Timeouts, disconnects, and unsupported methods +- RPC lifecycle events and error propagation + +## πŸ”§ Overview: How RPC Works +LiveKit RPC allows one participant (the caller) to invoke a method on another participant (the receiver) using the data channel transport. +It is: +- Peer-to-peer within the room (not server-executed RPC) +- Request/response (caller waits for a reply or an error) +- Asynchronous under the hood, synchronous or blocking from the caller’s perspective +- Delivery-guaranteed when using the reliable data channel + +Each RPC call includes: +| Field | Meaning | +|--------------------------|-----------------------------------------------------| +| **destination_identity** | Identity of the target participant | +| **method** | Method name string (e.g., "square-root") | +| **payload** | Arbitrary UTF-8 text | +| **response_timeout** | Optional timeout (seconds) | +| **invocation_id** | Server-generated ID used internally for correlation | + +## πŸ“ Location of APIs in C++ +All public-facing RPC APIs live in: +[include/livekit/local_participant.h](https://github.com/livekit/client-sdk-cpp/blob/main/include/livekit/local_participant.h#L124) + +### Key methods: + +#### Sender-side APIs +```bash +std::string performRpc( + const std::string& destination_identity, + const std::string& method, + const std::string& payload, + std::optional response_timeout_sec = std::nullopt +); + +Receiver-side APIs +void registerRpcMethod( + const std::string& method_name, + RpcHandler handler +); + +void unregisterRpcMethod(const std::string& method_name); + +Handler signature +using RpcHandler = + std::function(const RpcInvocationData&)>; +``` + +Handlers can: +- Return a string (the RPC response payload) +- Return std::nullopt (meaning β€œno return payload”) +- Throw exceptions (mapped to APPLICATION_ERROR) +- Throw a RpcError (mapped to specific error codes) + +#### πŸ›° Sender Behavior (performRpc) + +When the caller invokes: +```bash +auto reply = lp->performRpc("math-genius", "square-root", "{\"number\":16}"); +``` + +The following occurs: + +A PerformRpcRequest is sent through FFI to the SDK core. + +The SDK transmits the invocation to the target participant (if present). + +The caller begins waiting for a matching RpcMethodInvocationResponse. + +One of the following happens: +| Outcome | Meaning | +|--------------------------|------------------------------------------| +| **Success** | Receiver returned a payload | +| **UNSUPPORTED_METHOD** | Receiver did not register the method | +| **RECIPIENT_NOT_FOUND** | Target identity not present in room | +| **RECIPIENT_DISCONNECTED** | Target left before replying | +| **RESPONSE_TIMEOUT** | Receiver took too long | +| **APPLICATION_ERROR** | Handler threw an exception | + +#### πŸ”„ Round-trip time (RTT) + +The caller can measure RTT externally (as SimpleRpc does), but the SDK does not measure RTT internally. + +#### πŸ“‘ Receiver Behavior (registerRpcMethod) + +A receiver must explicitly register handlers: +```bash +local_participant->registerRpcMethod("square-root", + [](const RpcInvocationData& data) { + double number = parse(data.payload); + return make_json("result", std::sqrt(number)); + }); +``` + +When an invocation arrives: +- Room receives a RpcMethodInvocationEvent +- Room forwards it to the corresponding LocalParticipant +- LocalParticipant::handleRpcMethodInvocation(): +- Calls the handler +- Converts any exceptions into RpcError +- Sends back RpcMethodInvocationResponse + +⚠ If no handler exists: + +Receiver returns: UNSUPPORTED_METHOD + + +#### 🚨 What Happens if Receiver Is Absent? +| Case | Behavior | +|-----------------------------------------------------|---------------------------------------------------| +| Receiver identity is not in the room | Caller immediately receives `RECIPIENT_NOT_FOUND` | +| Receiver is present but disconnects before replying | Caller receives `RECIPIENT_DISCONNECTED` | +| Receiver joins later | Caller must retry manually (no automatic waiting) | + +**Important**: +LiveKit does not queue RPC calls for offline participants. + +#### ⏳ Timeout Behavior + +If the caller specifies: + +performRpc(..., /*response_timeout=*/10.0); + +Then: +- Receiver is given 10 seconds to respond. +- If the receiver handler takes longer (e.g., sleep 30s), caller receives: +RESPONSE_TIMEOUT + +**If no response_timeout is provided explicitly, the default timeout is 15 seconds.** + + +This is by design and demonstrated in the example. + +#### 🧨 Errors & Failure Modes +| Error Code | Cause | +|------------------------|---------------------------------------------| +| **APPLICATION_ERROR** | Handler threw a C++ exception | +| **UNSUPPORTED_METHOD** | No handler registered for the method | +| **RECIPIENT_NOT_FOUND** | Destination identity not in room | +| **RECIPIENT_DISCONNECTED** | Participant left mid-flight | +| **RESPONSE_TIMEOUT** | Handler exceeded allowed response time | +| **CONNECTION_TIMEOUT** | Transport-level issue | +| **SEND_FAILED** | SDK failed to send invocation | diff --git a/examples/simple_rpc/main.cpp b/examples/simple_rpc/main.cpp index 4686bb5..ff5c4af 100644 --- a/examples/simple_rpc/main.cpp +++ b/examples/simple_rpc/main.cpp @@ -32,17 +32,13 @@ #include #include "livekit/livekit.h" -#include "livekit_ffi.h" // same as simple_room; internal but used here +#include "livekit_ffi.h" using namespace livekit; using namespace std::chrono_literals; namespace { -// ------------------------------------------------------------ -// Global control (same pattern as simple_room) -// ------------------------------------------------------------ - std::atomic g_running{true}; void handleSignal(int) { g_running.store(false); } @@ -91,38 +87,31 @@ bool ensurePeerPresent(Room &room, const std::string &identity, std::cout << "[Caller] Waiting up to " << timeout.count() << "s for " << friendly_role << " (identity=\"" << identity << "\") to join...\n"; - bool present = waitForParticipant( room, identity, std::chrono::duration_cast(timeout)); - if (present) { std::cout << "[Caller] " << friendly_role << " is present.\n"; return true; } - // Timed out auto info = room.room_info(); const std::string room_name = info.name; - std::cout << "[Caller] Timed out after " << timeout.count() << "s waiting for " << friendly_role << " (identity=\"" << identity << "\").\n"; std::cout << "[Caller] No participant with identity \"" << identity << "\" appears to be connected to room \"" << room_name << "\".\n\n"; - std::cout << "To start a " << friendly_role << " in another terminal, run:\n\n" << " lk token create -r test -i " << identity << " --join --valid-for 99999h --dev --room=" << room_name << "\n" << " ./build/examples/SimpleRpc " << url << " $token --role=" << friendly_role << "\n\n"; - return false; } -// Parse args similar to simple_room, plus optional --role / role positional bool parseArgs(int argc, char *argv[], std::string &url, std::string &token, std::string &role) { // --help @@ -206,26 +195,18 @@ bool parseArgs(int argc, char *argv[], std::string &url, std::string &token, return !(url.empty() || token.empty()); } -// ------------------------------------------------------------ -// Tiny helpers for the simple JSON used in the sample -// (to avoid bringing in a json library) -// ------------------------------------------------------------ - -// create {"key":number} std::string makeNumberJson(const std::string &key, double value) { std::ostringstream oss; oss << "{\"" << key << "\":" << value << "}"; return oss.str(); } -// create {"key":"value"} std::string makeStringJson(const std::string &key, const std::string &value) { std::ostringstream oss; oss << "{\"" << key << "\":\"" << value << "\"}"; return oss.str(); } -// very naive parse of {"key":number} double parseNumberFromJson(const std::string &json) { auto colon = json.find(':'); if (colon == std::string::npos) @@ -236,7 +217,6 @@ double parseNumberFromJson(const std::string &json) { return std::stod(num_str); } -// very naive parse of {"key":"value"} std::string parseStringFromJson(const std::string &json) { auto colon = json.find(':'); if (colon == std::string::npos) @@ -250,10 +230,7 @@ std::string parseStringFromJson(const std::string &json) { return json.substr(first_quote + 1, second_quote - first_quote - 1); } -// ------------------------------------------------------------ -// RPC handler registration (for greeter & math-genius) -// ------------------------------------------------------------ - +// RPC handler registration void registerReceiverMethods(Room &greeters_room, Room &math_genius_room) { LocalParticipant *greeter_lp = greeters_room.local_participant(); LocalParticipant *math_genius_lp = math_genius_room.local_participant(); @@ -321,19 +298,15 @@ void registerReceiverMethods(Room &greeters_room, Room &math_genius_room) { std::cout << "[Math Genius] This will take 30 seconds even though " "you're only giving me " << data.response_timeout_sec << " seconds\n"; - + // Sleep for 30 seconds to mimic a long running task. std::this_thread::sleep_for(30s); return makeStringJson("result", "Calculation complete!"); }); // Note: we do NOT register "quantum-hypergeometric-series" here, - // so the caller sees UNSUPPORTED_METHOD, just like in Python. + // so the caller sees UNSUPPORTED_METHOD } -// ------------------------------------------------------------ -// Caller-side helpers (like perform_* in rpc.py) -// ------------------------------------------------------------ - void performGreeting(Room &room) { std::cout << "[Caller] Letting the greeter know that I've arrived\n"; double t0 = nowMs(); @@ -460,10 +433,6 @@ void performLongCalculation(Room &room) { } // namespace -// ------------------------------------------------------------ -// main – similar style to simple_room/main.cpp -// ------------------------------------------------------------ - int main(int argc, char *argv[]) { std::string url, token, role; if (!parseArgs(argc, argv, url, token, role)) { @@ -479,7 +448,7 @@ int main(int argc, char *argv[]) { std::cout << "Connecting to: " << url << "\n"; std::cout << "Role: " << role << "\n"; - // Ctrl-C + // Ctrl-C to quit the program std::signal(SIGINT, handleSignal); Room room{}; From 615921445ded8af4d6eb63bdfa9efbe8f7aa35c9 Mon Sep 17 00:00:00 2001 From: shijing xian Date: Fri, 5 Dec 2025 12:17:21 -0800 Subject: [PATCH 3/8] point to the right line --- examples/simple_rpc/README | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/simple_rpc/README b/examples/simple_rpc/README index 64bc870..2ded78c 100644 --- a/examples/simple_rpc/README +++ b/examples/simple_rpc/README @@ -34,7 +34,7 @@ Each RPC call includes: ## πŸ“ Location of APIs in C++ All public-facing RPC APIs live in: -[include/livekit/local_participant.h](https://github.com/livekit/client-sdk-cpp/blob/main/include/livekit/local_participant.h#L124) +[include/livekit/local_participant.h](https://github.com/livekit/client-sdk-cpp/blob/main/include/livekit/local_participant.h#L160) ### Key methods: From 94f00cef2a393c28e3ed98ad1b2322351530b42f Mon Sep 17 00:00:00 2001 From: shijing xian Date: Fri, 5 Dec 2025 16:23:12 -0800 Subject: [PATCH 4/8] fix the linux build --- include/livekit/local_participant.h | 1 + 1 file changed, 1 insertion(+) diff --git a/include/livekit/local_participant.h b/include/livekit/local_participant.h index 28aa70a..17dc9a0 100644 --- a/include/livekit/local_participant.h +++ b/include/livekit/local_participant.h @@ -22,6 +22,7 @@ #include "livekit/rpc_error.h" #include +#include #include #include #include From b29e8bcef234cddb5e8a2bff97dd564dfb868d54 Mon Sep 17 00:00:00 2001 From: shijing xian Date: Mon, 8 Dec 2025 20:31:24 -0800 Subject: [PATCH 5/8] fix the build and addressed the comments --- examples/simple_rpc/main.cpp | 1 + src/room.cpp | 596 ++++++++++++++++++++++++++++++----- src/room_event_converter.h | 107 ------- 3 files changed, 525 insertions(+), 179 deletions(-) delete mode 100644 src/room_event_converter.h diff --git a/examples/simple_rpc/main.cpp b/examples/simple_rpc/main.cpp index ff5c4af..61151db 100644 --- a/examples/simple_rpc/main.cpp +++ b/examples/simple_rpc/main.cpp @@ -16,6 +16,7 @@ #include #include +#include #include #include #include diff --git a/src/room.cpp b/src/room.cpp index 6bf1877..4f29583 100644 --- a/src/room.cpp +++ b/src/room.cpp @@ -45,7 +45,7 @@ using proto::FfiResponse; namespace { -std::unique_ptr +std::shared_ptr createRemoteParticipant(const proto::OwnedParticipant &owned) { const auto &pinfo = owned.info(); std::unordered_map attrs; @@ -56,7 +56,7 @@ createRemoteParticipant(const proto::OwnedParticipant &owned) { auto kind = livekit::fromProto(pinfo.kind()); auto reason = livekit::toDisconnectReason(pinfo.disconnect_reason()); livekit::FfiHandle handle(static_cast(owned.handle().id())); - return std::make_unique( + return std::make_shared( std::move(handle), pinfo.sid(), pinfo.name(), pinfo.identity(), pinfo.metadata(), std::move(attrs), kind, reason); } @@ -125,8 +125,8 @@ bool Room::Connect(const std::string &url, const std::string &token, for (const auto &owned_publication_info : pt.publications()) { auto publication = std::make_shared(owned_publication_info); - rp->mutable_track_publications().emplace(publication->sid(), - std::move(publication)); + rp->mutableTrackPublications().emplace(publication->sid(), + std::move(publication)); } remote_participants_.emplace(rp->identity(), std::move(rp)); @@ -171,7 +171,6 @@ void Room::OnEvent(const FfiEvent &event) { // First, handle RPC method invocations (not part of RoomEvent). if (event.message_case() == FfiEvent::kRpcMethodInvocation) { const auto &rpc = event.rpc_method_invocation(); - std::cout << "kRpcMethodInvocation \n"; LocalParticipant *lp = nullptr; { @@ -180,8 +179,9 @@ void Room::OnEvent(const FfiEvent &event) { return; } auto local_handle = local_participant_->ffiHandleId(); - if (local_handle == 0 || rpc.local_participant_handle() != - static_cast(local_handle)) { + if (local_handle == INVALID_HANDLE || + rpc.local_participant_handle() != + static_cast(local_handle)) { // RPC is not targeted at this room's local participant; ignore. return; } @@ -210,27 +210,34 @@ void Room::OnEvent(const FfiEvent &event) { switch (re.message_case()) { case proto::RoomEvent::kParticipantConnected: { - auto ev = fromProto(re.participant_connected()); - std::cout << "kParticipantConnected " << std::endl; - // Create and register RemoteParticipant + std::shared_ptr new_participant; { std::lock_guard guard(lock_); - auto rp = createRemoteParticipant(re.participant_connected().info()); - remote_participants_.emplace(rp->identity(), std::move(rp)); + const auto &owned = re.participant_connected().info(); + // createRemoteParticipant takes proto::OwnedParticipant + new_participant = createRemoteParticipant(owned); + remote_participants_.emplace(new_participant->identity(), + new_participant); } - // TODO, use better public callback events + ParticipantConnectedEvent ev; + ev.participant = new_participant.get(); delegate_snapshot->onParticipantConnected(*this, ev); break; } case proto::RoomEvent::kParticipantDisconnected: { - auto ev = fromProto(re.participant_disconnected()); + std::shared_ptr removed; + DisconnectReason reason = DisconnectReason::Unknown; + { std::lock_guard guard(lock_); const auto &pd = re.participant_disconnected(); const std::string &identity = pd.participant_identity(); + reason = toDisconnectReason(pd.disconnect_reason()); + auto it = remote_participants_.find(identity); if (it != remote_participants_.end()) { + removed = it->second; remote_participants_.erase(it); } else { // We saw a disconnect event for a participant we don't track @@ -240,28 +247,86 @@ void Room::OnEvent(const FfiEvent &event) { << identity << std::endl; } } - // TODO, should we trigger onParticipantDisconnected if remote - // participants can't be found ? - delegate_snapshot->onParticipantDisconnected(*this, ev); + if (removed) { + ParticipantDisconnectedEvent ev; + ev.participant = removed.get(); + ev.reason = reason; + delegate_snapshot->onParticipantDisconnected(*this, ev); + } break; } case proto::RoomEvent::kLocalTrackPublished: { - auto ev = fromProto(re.local_track_published()); + LocalTrackPublishedEvent ev; + { + std::lock_guard guard(lock_); + if (!local_participant_) { + std::cerr << "kLocalTrackPublished: local_participant_ is nullptr" + << std::endl; + break; + } + const auto <p = re.local_track_published(); + const std::string &sid = ltp.track_sid(); + auto &pubs = local_participant_->trackPublications(); + auto it = pubs.find(sid); + if (it == pubs.end()) { + std::cerr << "local_track_published for unknown sid: " << sid + << std::endl; + break; + } + ev.publication = it->second; + ev.track = ev.publication ? ev.publication->track() : nullptr; + } delegate_snapshot->onLocalTrackPublished(*this, ev); break; } case proto::RoomEvent::kLocalTrackUnpublished: { - auto ev = fromProto(re.local_track_unpublished()); + LocalTrackUnpublishedEvent ev; + { + std::lock_guard guard(lock_); + if (!local_participant_) { + std::cerr << "kLocalTrackPublished: local_participant_ is nullptr" + << std::endl; + break; + } + const auto <u = re.local_track_unpublished(); + const std::string &pub_sid = ltu.publication_sid(); + auto &pubs = local_participant_->trackPublications(); + auto it = pubs.find(pub_sid); + if (it == pubs.end()) { + std::cerr << "local_track_unpublished for unknown publication sid: " + << pub_sid << std::endl; + break; + } + ev.publication = it->second; + } delegate_snapshot->onLocalTrackUnpublished(*this, ev); break; } case proto::RoomEvent::kLocalTrackSubscribed: { - auto ev = fromProto(re.local_track_subscribed()); + LocalTrackSubscribedEvent ev; + { + std::lock_guard guard(lock_); + if (!local_participant_) { + break; + } + const auto <s = re.local_track_subscribed(); + const std::string &sid = lts.track_sid(); + auto &pubs = local_participant_->trackPublications(); + auto it = pubs.find(sid); + if (it == pubs.end()) { + std::cerr << "local_track_subscribed for unknown sid: " << sid + << std::endl; + break; + } + auto publication = it->second; + ev.track = publication ? publication->track() : nullptr; + } + delegate_snapshot->onLocalTrackSubscribed(*this, ev); break; } case proto::RoomEvent::kTrackPublished: { - auto ev = fromProto(re.track_published()); + TrackPublishedEvent ev; { std::lock_guard guard(lock_); const auto &tp = re.track_published(); @@ -273,13 +338,14 @@ void Room::OnEvent(const FfiEvent &event) { auto rpublication = std::make_shared(owned_publication); // Store it on the participant, keyed by SID - rparticipant->mutable_track_publications().emplace( + rparticipant->mutableTrackPublications().emplace( rpublication->sid(), std::move(rpublication)); - + ev.participant = rparticipant; + ev.publication = rpublication; } else { // Optional: log if we get a track for an unknown participant std::cerr << "track_published for unknown participant: " << identity - << "\n"; + << std::endl; // Don't emit the break; } @@ -288,7 +354,31 @@ void Room::OnEvent(const FfiEvent &event) { break; } case proto::RoomEvent::kTrackUnpublished: { - auto ev = fromProto(re.track_unpublished()); + TrackUnpublishedEvent ev; + { + std::lock_guard guard(lock_); + const auto &tu = re.track_unpublished(); + const std::string &identity = tu.participant_identity(); + const std::string &pub_sid = tu.publication_sid(); + auto pit = remote_participants_.find(identity); + if (pit == remote_participants_.end()) { + std::cerr << "track_unpublished for unknown participant: " << identity + << std::endl; + break; + } + RemoteParticipant *rparticipant = pit->second.get(); + auto &pubs = rparticipant->mutableTrackPublications(); + auto it = pubs.find(pub_sid); + if (it == pubs.end()) { + std::cerr << "track_unpublished for unknown publication sid " + << pub_sid << " (participant " << identity << ")\n"; + break; + } + ev.participant = rparticipant; + ev.publication = it->second; + pubs.erase(it); + } + delegate_snapshot->onTrackUnpublished(*this, ev); break; } @@ -311,7 +401,7 @@ void Room::OnEvent(const FfiEvent &event) { } rparticipant = pit->second.get(); // Find existing publication by track SID (from track_published) - auto &pubs = rparticipant->mutable_track_publications(); + auto &pubs = rparticipant->mutableTrackPublications(); auto pubIt = pubs.find(track_info.sid()); if (pubIt == pubs.end()) { std::cerr << "track_subscribed for unknown publication sid " @@ -331,13 +421,9 @@ void Room::OnEvent(const FfiEvent &event) { << track_info.kind() << "\n"; break; } - std::cout << "before setTrack " << std::endl; - // Attach to publication, mark subscribed rpublication->setTrack(remote_track); - std::cout << "setTrack " << std::endl; rpublication->setSubscribed(true); - std::cout << "setSubscribed " << std::endl; } // Emit remote track_subscribed-style callback @@ -345,111 +431,478 @@ void Room::OnEvent(const FfiEvent &event) { ev.track = remote_track; ev.publication = rpublication; ev.participant = rparticipant; - std::cout << "onTrackSubscribed " << std::endl; delegate_snapshot->onTrackSubscribed(*this, ev); - std::cout << "after onTrackSubscribed " << std::endl; break; } case proto::RoomEvent::kTrackUnsubscribed: { - auto ev = fromProto(re.track_unsubscribed()); + TrackUnsubscribedEvent ev; + { + std::lock_guard guard(lock_); + const auto &tu = re.track_unsubscribed(); + const std::string &identity = tu.participant_identity(); + const std::string &track_sid = tu.track_sid(); + auto pit = remote_participants_.find(identity); + if (pit == remote_participants_.end()) { + std::cerr << "track_unsubscribed for unknown participant: " + << identity << "\n"; + break; + } + RemoteParticipant *rparticipant = pit->second.get(); + auto &pubs = rparticipant->mutableTrackPublications(); + auto pubIt = pubs.find(track_sid); + if (pubIt == pubs.end()) { + std::cerr << "track_unsubscribed for unknown publication sid " + << track_sid << " (participant " << identity << ")\n"; + break; + } + auto publication = pubIt->second; + auto track = publication->track(); + publication->setTrack(nullptr); + publication->setSubscribed(false); + ev.participant = rparticipant; + ev.publication = publication; + ev.track = track; + } + delegate_snapshot->onTrackUnsubscribed(*this, ev); break; } case proto::RoomEvent::kTrackSubscriptionFailed: { - auto ev = fromProto(re.track_subscription_failed()); + TrackSubscriptionFailedEvent ev; + { + std::lock_guard guard(lock_); + const auto &tsf = re.track_subscription_failed(); + const std::string &identity = tsf.participant_identity(); + auto pit = remote_participants_.find(identity); + if (pit == remote_participants_.end()) { + std::cerr << "track_subscription_failed for unknown participant: " + << identity << "\n"; + break; + } + ev.participant = pit->second.get(); + ev.track_sid = tsf.track_sid(); + ev.error = tsf.error(); + } delegate_snapshot->onTrackSubscriptionFailed(*this, ev); break; } case proto::RoomEvent::kTrackMuted: { - auto ev = fromProto(re.track_muted()); - delegate_snapshot->onTrackMuted(*this, ev); + TrackMutedEvent ev; + bool success = false; + { + std::lock_guard guard(lock_); + const auto &tm = re.track_muted(); + const std::string &identity = tm.participant_identity(); + const std::string &sid = tm.track_sid(); + Participant *participant = nullptr; + if (local_participant_ && local_participant_->identity() == identity) { + participant = local_participant_.get(); + } else { + auto pit = remote_participants_.find(identity); + if (pit != remote_participants_.end()) { + participant = pit->second.get(); + } + } + if (!participant) { + std::cerr << "track_muted for unknown participant: " << identity + << "\n"; + break; + } + auto pub = participant->findTrackPublication(sid); + if (!pub) { + std::cerr << "track_muted for unknown track sid: " << sid + << std::endl; + } else { + pub->setMuted(true); + if (auto t = pub->track()) { + t->setMuted(true); + } + ev.participant = participant; + ev.publication = pub; + success = true; + } + } + if (success) { + delegate_snapshot->onTrackMuted(*this, ev); + } break; } case proto::RoomEvent::kTrackUnmuted: { - auto ev = fromProto(re.track_unmuted()); - delegate_snapshot->onTrackUnmuted(*this, ev); + TrackUnmutedEvent ev; + bool success = false; + { + std::lock_guard guard(lock_); + const auto &tu = re.track_unmuted(); + const std::string &identity = tu.participant_identity(); + const std::string &sid = tu.track_sid(); + Participant *participant = nullptr; + if (local_participant_ && local_participant_->identity() == identity) { + participant = local_participant_.get(); + } else { + auto pit = remote_participants_.find(identity); + if (pit != remote_participants_.end()) { + participant = pit->second.get(); + } + } + if (!participant) { + std::cerr << "track_unmuted for unknown participant: " << identity + << "\n"; + break; + } + + auto pub = participant->findTrackPublication(sid); + if (!pub) { + std::cerr << "track_muted for unknown track sid: " << sid + << std::endl; + } else { + pub->setMuted(false); + if (auto t = pub->track()) { + t->setMuted(false); + } + ev.participant = participant; + ev.publication = pub; + success = true; + } + + ev.participant = participant; + ev.publication = pub; + } + + if (success) { + delegate_snapshot->onTrackUnmuted(*this, ev); + } break; } case proto::RoomEvent::kActiveSpeakersChanged: { - auto ev = fromProto(re.active_speakers_changed()); + ActiveSpeakersChangedEvent ev; + { + std::lock_guard guard(lock_); + const auto &asc = re.active_speakers_changed(); + for (const auto &identity : asc.participant_identities()) { + Participant *participant = nullptr; + if (local_participant_ && + local_participant_->identity() == identity) { + participant = local_participant_.get(); + } else { + auto pit = remote_participants_.find(identity); + if (pit != remote_participants_.end()) { + participant = pit->second.get(); + } + } + if (participant) { + ev.speakers.push_back(participant); + } + } + } delegate_snapshot->onActiveSpeakersChanged(*this, ev); break; } case proto::RoomEvent::kRoomMetadataChanged: { - auto ev = fromProto(re.room_metadata_changed()); + RoomMetadataChangedEvent ev; + { + std::lock_guard guard(lock_); + const auto old_metadata = room_info_.metadata; + room_info_.metadata = re.room_metadata_changed().metadata(); + ev.old_metadata = old_metadata; + ev.new_metadata = room_info_.metadata; + } delegate_snapshot->onRoomMetadataChanged(*this, ev); break; } case proto::RoomEvent::kRoomSidChanged: { - auto ev = fromProto(re.room_sid_changed()); + RoomSidChangedEvent ev; + { + std::lock_guard guard(lock_); + room_info_.sid = re.room_sid_changed().sid(); + ev.sid = room_info_.sid.value_or(std::string{}); + } delegate_snapshot->onRoomSidChanged(*this, ev); break; } case proto::RoomEvent::kParticipantMetadataChanged: { - auto ev = fromProto(re.participant_metadata_changed()); + ParticipantMetadataChangedEvent ev; + { + std::lock_guard guard(lock_); + const auto &pm = re.participant_metadata_changed(); + const std::string &identity = pm.participant_identity(); + Participant *participant = nullptr; + if (local_participant_ && local_participant_->identity() == identity) { + participant = local_participant_.get(); + } else { + auto it = remote_participants_.find(identity); + if (it != remote_participants_.end()) { + participant = it->second.get(); + } + } + if (!participant) { + std::cerr << "participant_metadata_changed for unknown participant: " + << identity << "\n"; + break; + } + std::string old_metadata = participant->metadata(); + participant->set_metadata(pm.metadata()); + ev.participant = participant; + ev.old_metadata = old_metadata; + ev.new_metadata = participant->metadata(); + } + delegate_snapshot->onParticipantMetadataChanged(*this, ev); break; } case proto::RoomEvent::kParticipantNameChanged: { - auto ev = fromProto(re.participant_name_changed()); + ParticipantNameChangedEvent ev; + { + std::lock_guard guard(lock_); + const auto &pn = re.participant_name_changed(); + const std::string &identity = pn.participant_identity(); + Participant *participant = nullptr; + if (local_participant_ && local_participant_->identity() == identity) { + participant = local_participant_.get(); + } else { + auto it = remote_participants_.find(identity); + if (it != remote_participants_.end()) { + participant = it->second.get(); + } + } + if (!participant) { + std::cerr << "participant_name_changed for unknown participant: " + << identity << "\n"; + break; + } + std::string old_name = participant->name(); + participant->set_name(pn.name()); + ev.participant = participant; + ev.old_name = old_name; + ev.new_name = participant->name(); + } delegate_snapshot->onParticipantNameChanged(*this, ev); break; } case proto::RoomEvent::kParticipantAttributesChanged: { - auto ev = fromProto(re.participant_attributes_changed()); + ParticipantAttributesChangedEvent ev; + { + std::lock_guard guard(lock_); + const auto &pa = re.participant_attributes_changed(); + const std::string &identity = pa.participant_identity(); + Participant *participant = nullptr; + if (local_participant_ && local_participant_->identity() == identity) { + participant = local_participant_.get(); + } else { + auto it = remote_participants_.find(identity); + if (it != remote_participants_.end()) { + participant = it->second.get(); + } + } + if (!participant) { + std::cerr + << "participant_attributes_changed for unknown participant: " + << identity << "\n"; + break; + } + // Build full attributes map + std::unordered_map attrs; + for (const auto &entry : pa.attributes()) { + attrs.emplace(entry.key(), entry.value()); + } + participant->set_attributes(attrs); + + // Build changed_attributes map + for (const auto &entry : pa.changed_attributes()) { + ev.changed_attributes.emplace_back(entry.key(), entry.value()); + } + ev.participant = participant; + } delegate_snapshot->onParticipantAttributesChanged(*this, ev); break; } case proto::RoomEvent::kParticipantEncryptionStatusChanged: { - auto ev = fromProto(re.participant_encryption_status_changed()); + ParticipantEncryptionStatusChangedEvent ev; + { + std::lock_guard guard(lock_); + const auto &pe = re.participant_encryption_status_changed(); + const std::string &identity = pe.participant_identity(); + Participant *participant = nullptr; + if (local_participant_ && local_participant_->identity() == identity) { + participant = local_participant_.get(); + } else { + auto it = remote_participants_.find(identity); + if (it != remote_participants_.end()) { + participant = it->second.get(); + } + } + if (!participant) { + std::cerr << "participant_encryption_status_changed for unknown " + "participant: " + << identity << "\n"; + break; + } + ev.participant = participant; + ev.is_encrypted = pe.is_encrypted(); + } + delegate_snapshot->onParticipantEncryptionStatusChanged(*this, ev); break; } case proto::RoomEvent::kConnectionQualityChanged: { - auto ev = fromProto(re.connection_quality_changed()); + ConnectionQualityChangedEvent ev; + { + std::lock_guard guard(lock_); + const auto &cq = re.connection_quality_changed(); + const std::string &identity = cq.participant_identity(); + Participant *participant = nullptr; + if (local_participant_ && local_participant_->identity() == identity) { + participant = local_participant_.get(); + } else { + auto it = remote_participants_.find(identity); + if (it != remote_participants_.end()) { + participant = it->second.get(); + } + } + if (!participant) { + std::cerr << "connection_quality_changed for unknown participant: " + << identity << "\n"; + break; + } + ev.participant = participant; + ev.quality = static_cast(cq.quality()); + } + delegate_snapshot->onConnectionQualityChanged(*this, ev); break; } + + // ------------------------------------------------------------------------ + // Transcription + // ------------------------------------------------------------------------ + + case proto::RoomEvent::kTranscriptionReceived: { + TranscriptionReceivedEvent ev; + { + std::lock_guard guard(lock_); + const auto &tr = re.transcription_received(); + for (const auto &s : tr.segments()) { + TranscriptionSegment seg; + seg.id = s.id(); + seg.text = s.text(); + seg.final = s.final(); + seg.start_time = s.start_time(); + seg.end_time = s.end_time(); + seg.language = s.language(); + ev.segments.push_back(std::move(seg)); + } + + Participant *participant = nullptr; + if (!tr.participant_identity().empty()) { + const std::string &identity = tr.participant_identity(); + if (local_participant_ && + local_participant_->identity() == identity) { + participant = local_participant_.get(); + } else { + auto it = remote_participants_.find(identity); + if (it != remote_participants_.end()) { + participant = it->second.get(); + } + } + } + ev.participant = participant; + ev.publication = participant->findTrackPublication(tr.track_sid()); + } + + delegate_snapshot->onTranscriptionReceived(*this, ev); + break; + } + + // ------------------------------------------------------------------------ + // Data packets: user vs SIP DTMF + // ------------------------------------------------------------------------ + case proto::RoomEvent::kDataPacketReceived: { + const auto &dp = re.data_packet_received(); + RemoteParticipant *rp = nullptr; + { + std::lock_guard guard(lock_); + auto it = remote_participants_.find(dp.participant_identity()); + if (it != remote_participants_.end()) { + rp = it->second.get(); + } + } + const auto which_val = dp.value_case(); + if (which_val == proto::DataPacketReceived::kUser) { + UserDataPacketEvent ev = userDataPacketFromProto(dp, rp); + delegate_snapshot->onUserPacketReceived(*this, ev); + } else if (which_val == proto::DataPacketReceived::kSipDtmf) { + SipDtmfReceivedEvent ev = sipDtmfFromProto(dp, rp); + delegate_snapshot->onSipDtmfReceived(*this, ev); + } + break; + } + + // ------------------------------------------------------------------------ + // E2EE state + // ------------------------------------------------------------------------ + case proto::RoomEvent::kE2EeStateChanged: { + E2eeStateChangedEvent ev; + { + std::lock_guard guard(lock_); + const auto &es = re.e2ee_state_changed(); + const std::string &identity = es.participant_identity(); + Participant *participant = nullptr; + if (local_participant_ && local_participant_->identity() == identity) { + participant = local_participant_.get(); + } else { + auto it = remote_participants_.find(identity); + if (it != remote_participants_.end()) { + participant = it->second.get(); + } + } + if (!participant) { + std::cerr << "e2ee_state_changed for unknown participant: " + << identity << std::endl; + break; + } + + ev.participant = participant; + ev.state = static_cast(es.state()); + } + delegate_snapshot->onE2eeStateChanged(*this, ev); + break; + } + + // ------------------------------------------------------------------------ + // Connection state / lifecycle + // ------------------------------------------------------------------------ + case proto::RoomEvent::kConnectionStateChanged: { - auto ev = fromProto(re.connection_state_changed()); + ConnectionStateChangedEvent ev; + { + std::lock_guard guard(lock_); + const auto &cs = re.connection_state_changed(); + connection_state_ = static_cast(cs.state()); + ev.state = connection_state_; + } delegate_snapshot->onConnectionStateChanged(*this, ev); break; } case proto::RoomEvent::kDisconnected: { - auto ev = fromProto(re.disconnected()); + DisconnectedEvent ev; + ev.reason = toDisconnectReason(re.disconnected().reason()); delegate_snapshot->onDisconnected(*this, ev); break; } case proto::RoomEvent::kReconnecting: { - auto ev = fromProto(re.reconnecting()); + ReconnectingEvent ev; delegate_snapshot->onReconnecting(*this, ev); break; } case proto::RoomEvent::kReconnected: { - auto ev = fromProto(re.reconnected()); + ReconnectedEvent ev; delegate_snapshot->onReconnected(*this, ev); break; } - case proto::RoomEvent::kE2EeStateChanged: { - auto ev = fromProto(re.e2ee_state_changed()); - delegate_snapshot->onE2eeStateChanged(*this, ev); - break; - } case proto::RoomEvent::kEos: { - auto ev = fromProto(re.eos()); + RoomEosEvent ev; delegate_snapshot->onRoomEos(*this, ev); break; } - case proto::RoomEvent::kDataPacketReceived: { - auto ev = fromProto(re.data_packet_received()); - delegate_snapshot->onDataPacketReceived(*this, ev); - break; - } - case proto::RoomEvent::kTranscriptionReceived: { - auto ev = fromProto(re.transcription_received()); - delegate_snapshot->onTranscriptionReceived(*this, ev); - break; - } case proto::RoomEvent::kChatMessage: { auto ev = fromProto(re.chat_message()); delegate_snapshot->onChatMessageReceived(*this, ev); @@ -497,36 +950,33 @@ void Room::OnEvent(const FfiEvent &event) { break; } case proto::RoomEvent::kParticipantsUpdated: { - auto ev = fromProto(re.participants_updated()); + ParticipantsUpdatedEvent ev; { std::lock_guard guard(lock_); const auto &pu = re.participants_updated(); for (const auto &info : pu.participants()) { const std::string &identity = info.identity(); Participant *participant = nullptr; - // First, check local participant. + if (local_participant_ && identity == local_participant_->identity()) { participant = local_participant_.get(); } else { - // Otherwise, look for a remote participant. auto it = remote_participants_.find(identity); if (it != remote_participants_.end()) { participant = it->second.get(); } } - if (!participant) { - // Participant might not exist yet; ignore for now. std::cerr << "Room::RoomEvent::kParticipantsUpdated participant " "does not exist: " << identity << std::endl; continue; } - // Update basic fields participant->set_name(info.name()); participant->set_metadata(info.metadata()); + std::unordered_map attrs; attrs.reserve(info.attributes_size()); for (const auto &kv : info.attributes()) { @@ -536,6 +986,8 @@ void Room::OnEvent(const FfiEvent &event) { participant->set_kind(fromProto(info.kind())); participant->set_disconnect_reason( toDisconnectReason(info.disconnect_reason())); + + ev.participants.push_back(participant); } } delegate_snapshot->onParticipantsUpdated(*this, ev); diff --git a/src/room_event_converter.h b/src/room_event_converter.h deleted file mode 100644 index f8dde1f..0000000 --- a/src/room_event_converter.h +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright 2023 LiveKit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an β€œAS IS” BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include "livekit/room_delegate.h" -#include "room.pb.h" - -namespace livekit { - -// --------- basic helper conversions --------- - -ConnectionQuality toConnectionQuality(proto::ConnectionQuality src); -ConnectionState toConnectionState(proto::ConnectionState src); -DataPacketKind toDataPacketKind(proto::DataPacketKind src); -EncryptionState toEncryptionState(proto::EncryptionState src); -DisconnectReason toDisconnectReason(proto::DisconnectReason src); - -TranscriptionSegmentData fromProto(const proto::TranscriptionSegment &src); -ChatMessageData fromProto(const proto::ChatMessage &src); -UserPacketData fromProto(const proto::UserPacket &src); -SipDtmfData fromProto(const proto::SipDTMF &src); -RoomInfoData fromProto(const proto::RoomInfo &src); -AttributeEntry fromProto(const proto::AttributesEntry &src); - -DataStreamHeaderData fromProto(const proto::DataStream_Header &src); -DataStreamChunkData fromProto(const proto::DataStream_Chunk &src); -DataStreamTrailerData fromProto(const proto::DataStream_Trailer &src); - -// --------- event conversions (RoomEvent.oneof message) --------- - -ParticipantConnectedEvent fromProto(const proto::ParticipantConnected &src); -ParticipantDisconnectedEvent -fromProto(const proto::ParticipantDisconnected &src); - -LocalTrackPublishedEvent fromProto(const proto::LocalTrackPublished &src); -LocalTrackUnpublishedEvent fromProto(const proto::LocalTrackUnpublished &src); -LocalTrackSubscribedEvent fromProto(const proto::LocalTrackSubscribed &src); - -TrackPublishedEvent fromProto(const proto::TrackPublished &src); -TrackUnpublishedEvent fromProto(const proto::TrackUnpublished &src); -TrackUnsubscribedEvent fromProto(const proto::TrackUnsubscribed &src); -TrackSubscriptionFailedEvent -fromProto(const proto::TrackSubscriptionFailed &src); -TrackMutedEvent fromProto(const proto::TrackMuted &src); -TrackUnmutedEvent fromProto(const proto::TrackUnmuted &src); - -ActiveSpeakersChangedEvent fromProto(const proto::ActiveSpeakersChanged &src); - -RoomMetadataChangedEvent fromProto(const proto::RoomMetadataChanged &src); -RoomSidChangedEvent fromProto(const proto::RoomSidChanged &src); - -ParticipantMetadataChangedEvent -fromProto(const proto::ParticipantMetadataChanged &src); -ParticipantNameChangedEvent fromProto(const proto::ParticipantNameChanged &src); -ParticipantAttributesChangedEvent -fromProto(const proto::ParticipantAttributesChanged &src); -ParticipantEncryptionStatusChangedEvent -fromProto(const proto::ParticipantEncryptionStatusChanged &src); - -ConnectionQualityChangedEvent -fromProto(const proto::ConnectionQualityChanged &src); - -DataPacketReceivedEvent fromProto(const proto::DataPacketReceived &src); -TranscriptionReceivedEvent fromProto(const proto::TranscriptionReceived &src); - -ConnectionStateChangedEvent fromProto(const proto::ConnectionStateChanged &src); -DisconnectedEvent fromProto(const proto::Disconnected &src); -ReconnectingEvent fromProto(const proto::Reconnecting &src); -ReconnectedEvent fromProto(const proto::Reconnected &src); -RoomEosEvent fromProto(const proto::RoomEOS &src); - -DataStreamHeaderReceivedEvent -fromProto(const proto::DataStreamHeaderReceived &src); -DataStreamChunkReceivedEvent -fromProto(const proto::DataStreamChunkReceived &src); -DataStreamTrailerReceivedEvent -fromProto(const proto::DataStreamTrailerReceived &src); - -DataChannelBufferedAmountLowThresholdChangedEvent -fromProto(const proto::DataChannelBufferedAmountLowThresholdChanged &src); - -ByteStreamOpenedEvent fromProto(const proto::ByteStreamOpened &src); -TextStreamOpenedEvent fromProto(const proto::TextStreamOpened &src); - -RoomUpdatedEvent -roomUpdatedFromProto(const proto::RoomInfo &src); // room_updated -RoomMovedEvent roomMovedFromProto(const proto::RoomInfo &src); // moved - -ParticipantsUpdatedEvent fromProto(const proto::ParticipantsUpdated &src); -E2eeStateChangedEvent fromProto(const proto::E2eeStateChanged &src); -ChatMessageReceivedEvent fromProto(const proto::ChatMessageReceived &src); - -} // namespace livekit From b584aa1849ea49916fe14fdd80706928b59e0222 Mon Sep 17 00:00:00 2001 From: shijing xian Date: Mon, 8 Dec 2025 20:50:01 -0800 Subject: [PATCH 6/8] fix the build From 24a207245156153b57048ccd4a84ab0299eecc08 Mon Sep 17 00:00:00 2001 From: shijing xian Date: Mon, 8 Dec 2025 20:54:49 -0800 Subject: [PATCH 7/8] recover the room.cpp From 2b21e655c8b2a53f1e12c0eb2f1143c3467acba3 Mon Sep 17 00:00:00 2001 From: shijing xian Date: Mon, 8 Dec 2025 20:56:16 -0800 Subject: [PATCH 8/8] recover room.cpp from the uncheckin changes --- src/room.cpp | 590 ++++++--------------------------------------------- 1 file changed, 69 insertions(+), 521 deletions(-) diff --git a/src/room.cpp b/src/room.cpp index 4f29583..c20aac1 100644 --- a/src/room.cpp +++ b/src/room.cpp @@ -45,7 +45,7 @@ using proto::FfiResponse; namespace { -std::shared_ptr +std::unique_ptr createRemoteParticipant(const proto::OwnedParticipant &owned) { const auto &pinfo = owned.info(); std::unordered_map attrs; @@ -56,7 +56,7 @@ createRemoteParticipant(const proto::OwnedParticipant &owned) { auto kind = livekit::fromProto(pinfo.kind()); auto reason = livekit::toDisconnectReason(pinfo.disconnect_reason()); livekit::FfiHandle handle(static_cast(owned.handle().id())); - return std::make_shared( + return std::make_unique( std::move(handle), pinfo.sid(), pinfo.name(), pinfo.identity(), pinfo.metadata(), std::move(attrs), kind, reason); } @@ -125,8 +125,8 @@ bool Room::Connect(const std::string &url, const std::string &token, for (const auto &owned_publication_info : pt.publications()) { auto publication = std::make_shared(owned_publication_info); - rp->mutableTrackPublications().emplace(publication->sid(), - std::move(publication)); + rp->mutable_track_publications().emplace(publication->sid(), + std::move(publication)); } remote_participants_.emplace(rp->identity(), std::move(rp)); @@ -210,34 +210,27 @@ void Room::OnEvent(const FfiEvent &event) { switch (re.message_case()) { case proto::RoomEvent::kParticipantConnected: { - std::shared_ptr new_participant; + auto ev = fromProto(re.participant_connected()); + std::cout << "kParticipantConnected " << std::endl; + // Create and register RemoteParticipant { std::lock_guard guard(lock_); - const auto &owned = re.participant_connected().info(); - // createRemoteParticipant takes proto::OwnedParticipant - new_participant = createRemoteParticipant(owned); - remote_participants_.emplace(new_participant->identity(), - new_participant); + auto rp = createRemoteParticipant(re.participant_connected().info()); + remote_participants_.emplace(rp->identity(), std::move(rp)); } - ParticipantConnectedEvent ev; - ev.participant = new_participant.get(); + // TODO, use better public callback events delegate_snapshot->onParticipantConnected(*this, ev); break; } case proto::RoomEvent::kParticipantDisconnected: { - std::shared_ptr removed; - DisconnectReason reason = DisconnectReason::Unknown; - + auto ev = fromProto(re.participant_disconnected()); { std::lock_guard guard(lock_); const auto &pd = re.participant_disconnected(); const std::string &identity = pd.participant_identity(); - reason = toDisconnectReason(pd.disconnect_reason()); - auto it = remote_participants_.find(identity); if (it != remote_participants_.end()) { - removed = it->second; remote_participants_.erase(it); } else { // We saw a disconnect event for a participant we don't track @@ -247,86 +240,28 @@ void Room::OnEvent(const FfiEvent &event) { << identity << std::endl; } } - if (removed) { - ParticipantDisconnectedEvent ev; - ev.participant = removed.get(); - ev.reason = reason; - delegate_snapshot->onParticipantDisconnected(*this, ev); - } + // TODO, should we trigger onParticipantDisconnected if remote + // participants can't be found ? + delegate_snapshot->onParticipantDisconnected(*this, ev); break; } case proto::RoomEvent::kLocalTrackPublished: { - LocalTrackPublishedEvent ev; - { - std::lock_guard guard(lock_); - if (!local_participant_) { - std::cerr << "kLocalTrackPublished: local_participant_ is nullptr" - << std::endl; - break; - } - const auto <p = re.local_track_published(); - const std::string &sid = ltp.track_sid(); - auto &pubs = local_participant_->trackPublications(); - auto it = pubs.find(sid); - if (it == pubs.end()) { - std::cerr << "local_track_published for unknown sid: " << sid - << std::endl; - break; - } - ev.publication = it->second; - ev.track = ev.publication ? ev.publication->track() : nullptr; - } + auto ev = fromProto(re.local_track_published()); delegate_snapshot->onLocalTrackPublished(*this, ev); break; } case proto::RoomEvent::kLocalTrackUnpublished: { - LocalTrackUnpublishedEvent ev; - { - std::lock_guard guard(lock_); - if (!local_participant_) { - std::cerr << "kLocalTrackPublished: local_participant_ is nullptr" - << std::endl; - break; - } - const auto <u = re.local_track_unpublished(); - const std::string &pub_sid = ltu.publication_sid(); - auto &pubs = local_participant_->trackPublications(); - auto it = pubs.find(pub_sid); - if (it == pubs.end()) { - std::cerr << "local_track_unpublished for unknown publication sid: " - << pub_sid << std::endl; - break; - } - ev.publication = it->second; - } + auto ev = fromProto(re.local_track_unpublished()); delegate_snapshot->onLocalTrackUnpublished(*this, ev); break; } case proto::RoomEvent::kLocalTrackSubscribed: { - LocalTrackSubscribedEvent ev; - { - std::lock_guard guard(lock_); - if (!local_participant_) { - break; - } - const auto <s = re.local_track_subscribed(); - const std::string &sid = lts.track_sid(); - auto &pubs = local_participant_->trackPublications(); - auto it = pubs.find(sid); - if (it == pubs.end()) { - std::cerr << "local_track_subscribed for unknown sid: " << sid - << std::endl; - break; - } - auto publication = it->second; - ev.track = publication ? publication->track() : nullptr; - } - + auto ev = fromProto(re.local_track_subscribed()); delegate_snapshot->onLocalTrackSubscribed(*this, ev); break; } case proto::RoomEvent::kTrackPublished: { - TrackPublishedEvent ev; + auto ev = fromProto(re.track_published()); { std::lock_guard guard(lock_); const auto &tp = re.track_published(); @@ -338,14 +273,13 @@ void Room::OnEvent(const FfiEvent &event) { auto rpublication = std::make_shared(owned_publication); // Store it on the participant, keyed by SID - rparticipant->mutableTrackPublications().emplace( + rparticipant->mutable_track_publications().emplace( rpublication->sid(), std::move(rpublication)); - ev.participant = rparticipant; - ev.publication = rpublication; + } else { // Optional: log if we get a track for an unknown participant std::cerr << "track_published for unknown participant: " << identity - << std::endl; + << "\n"; // Don't emit the break; } @@ -354,31 +288,7 @@ void Room::OnEvent(const FfiEvent &event) { break; } case proto::RoomEvent::kTrackUnpublished: { - TrackUnpublishedEvent ev; - { - std::lock_guard guard(lock_); - const auto &tu = re.track_unpublished(); - const std::string &identity = tu.participant_identity(); - const std::string &pub_sid = tu.publication_sid(); - auto pit = remote_participants_.find(identity); - if (pit == remote_participants_.end()) { - std::cerr << "track_unpublished for unknown participant: " << identity - << std::endl; - break; - } - RemoteParticipant *rparticipant = pit->second.get(); - auto &pubs = rparticipant->mutableTrackPublications(); - auto it = pubs.find(pub_sid); - if (it == pubs.end()) { - std::cerr << "track_unpublished for unknown publication sid " - << pub_sid << " (participant " << identity << ")\n"; - break; - } - ev.participant = rparticipant; - ev.publication = it->second; - pubs.erase(it); - } - + auto ev = fromProto(re.track_unpublished()); delegate_snapshot->onTrackUnpublished(*this, ev); break; } @@ -401,7 +311,7 @@ void Room::OnEvent(const FfiEvent &event) { } rparticipant = pit->second.get(); // Find existing publication by track SID (from track_published) - auto &pubs = rparticipant->mutableTrackPublications(); + auto &pubs = rparticipant->mutable_track_publications(); auto pubIt = pubs.find(track_info.sid()); if (pubIt == pubs.end()) { std::cerr << "track_subscribed for unknown publication sid " @@ -421,9 +331,13 @@ void Room::OnEvent(const FfiEvent &event) { << track_info.kind() << "\n"; break; } + std::cout << "before setTrack " << std::endl; + // Attach to publication, mark subscribed rpublication->setTrack(remote_track); + std::cout << "setTrack " << std::endl; rpublication->setSubscribed(true); + std::cout << "setSubscribed " << std::endl; } // Emit remote track_subscribed-style callback @@ -431,478 +345,111 @@ void Room::OnEvent(const FfiEvent &event) { ev.track = remote_track; ev.publication = rpublication; ev.participant = rparticipant; + std::cout << "onTrackSubscribed " << std::endl; delegate_snapshot->onTrackSubscribed(*this, ev); + std::cout << "after onTrackSubscribed " << std::endl; break; } case proto::RoomEvent::kTrackUnsubscribed: { - TrackUnsubscribedEvent ev; - { - std::lock_guard guard(lock_); - const auto &tu = re.track_unsubscribed(); - const std::string &identity = tu.participant_identity(); - const std::string &track_sid = tu.track_sid(); - auto pit = remote_participants_.find(identity); - if (pit == remote_participants_.end()) { - std::cerr << "track_unsubscribed for unknown participant: " - << identity << "\n"; - break; - } - RemoteParticipant *rparticipant = pit->second.get(); - auto &pubs = rparticipant->mutableTrackPublications(); - auto pubIt = pubs.find(track_sid); - if (pubIt == pubs.end()) { - std::cerr << "track_unsubscribed for unknown publication sid " - << track_sid << " (participant " << identity << ")\n"; - break; - } - auto publication = pubIt->second; - auto track = publication->track(); - publication->setTrack(nullptr); - publication->setSubscribed(false); - ev.participant = rparticipant; - ev.publication = publication; - ev.track = track; - } - + auto ev = fromProto(re.track_unsubscribed()); delegate_snapshot->onTrackUnsubscribed(*this, ev); break; } case proto::RoomEvent::kTrackSubscriptionFailed: { - TrackSubscriptionFailedEvent ev; - { - std::lock_guard guard(lock_); - const auto &tsf = re.track_subscription_failed(); - const std::string &identity = tsf.participant_identity(); - auto pit = remote_participants_.find(identity); - if (pit == remote_participants_.end()) { - std::cerr << "track_subscription_failed for unknown participant: " - << identity << "\n"; - break; - } - ev.participant = pit->second.get(); - ev.track_sid = tsf.track_sid(); - ev.error = tsf.error(); - } + auto ev = fromProto(re.track_subscription_failed()); delegate_snapshot->onTrackSubscriptionFailed(*this, ev); break; } case proto::RoomEvent::kTrackMuted: { - TrackMutedEvent ev; - bool success = false; - { - std::lock_guard guard(lock_); - const auto &tm = re.track_muted(); - const std::string &identity = tm.participant_identity(); - const std::string &sid = tm.track_sid(); - Participant *participant = nullptr; - if (local_participant_ && local_participant_->identity() == identity) { - participant = local_participant_.get(); - } else { - auto pit = remote_participants_.find(identity); - if (pit != remote_participants_.end()) { - participant = pit->second.get(); - } - } - if (!participant) { - std::cerr << "track_muted for unknown participant: " << identity - << "\n"; - break; - } - auto pub = participant->findTrackPublication(sid); - if (!pub) { - std::cerr << "track_muted for unknown track sid: " << sid - << std::endl; - } else { - pub->setMuted(true); - if (auto t = pub->track()) { - t->setMuted(true); - } - ev.participant = participant; - ev.publication = pub; - success = true; - } - } - if (success) { - delegate_snapshot->onTrackMuted(*this, ev); - } + auto ev = fromProto(re.track_muted()); + delegate_snapshot->onTrackMuted(*this, ev); break; } case proto::RoomEvent::kTrackUnmuted: { - TrackUnmutedEvent ev; - bool success = false; - { - std::lock_guard guard(lock_); - const auto &tu = re.track_unmuted(); - const std::string &identity = tu.participant_identity(); - const std::string &sid = tu.track_sid(); - Participant *participant = nullptr; - if (local_participant_ && local_participant_->identity() == identity) { - participant = local_participant_.get(); - } else { - auto pit = remote_participants_.find(identity); - if (pit != remote_participants_.end()) { - participant = pit->second.get(); - } - } - if (!participant) { - std::cerr << "track_unmuted for unknown participant: " << identity - << "\n"; - break; - } - - auto pub = participant->findTrackPublication(sid); - if (!pub) { - std::cerr << "track_muted for unknown track sid: " << sid - << std::endl; - } else { - pub->setMuted(false); - if (auto t = pub->track()) { - t->setMuted(false); - } - ev.participant = participant; - ev.publication = pub; - success = true; - } - - ev.participant = participant; - ev.publication = pub; - } - - if (success) { - delegate_snapshot->onTrackUnmuted(*this, ev); - } + auto ev = fromProto(re.track_unmuted()); + delegate_snapshot->onTrackUnmuted(*this, ev); break; } case proto::RoomEvent::kActiveSpeakersChanged: { - ActiveSpeakersChangedEvent ev; - { - std::lock_guard guard(lock_); - const auto &asc = re.active_speakers_changed(); - for (const auto &identity : asc.participant_identities()) { - Participant *participant = nullptr; - if (local_participant_ && - local_participant_->identity() == identity) { - participant = local_participant_.get(); - } else { - auto pit = remote_participants_.find(identity); - if (pit != remote_participants_.end()) { - participant = pit->second.get(); - } - } - if (participant) { - ev.speakers.push_back(participant); - } - } - } + auto ev = fromProto(re.active_speakers_changed()); delegate_snapshot->onActiveSpeakersChanged(*this, ev); break; } case proto::RoomEvent::kRoomMetadataChanged: { - RoomMetadataChangedEvent ev; - { - std::lock_guard guard(lock_); - const auto old_metadata = room_info_.metadata; - room_info_.metadata = re.room_metadata_changed().metadata(); - ev.old_metadata = old_metadata; - ev.new_metadata = room_info_.metadata; - } + auto ev = fromProto(re.room_metadata_changed()); delegate_snapshot->onRoomMetadataChanged(*this, ev); break; } case proto::RoomEvent::kRoomSidChanged: { - RoomSidChangedEvent ev; - { - std::lock_guard guard(lock_); - room_info_.sid = re.room_sid_changed().sid(); - ev.sid = room_info_.sid.value_or(std::string{}); - } + auto ev = fromProto(re.room_sid_changed()); delegate_snapshot->onRoomSidChanged(*this, ev); break; } case proto::RoomEvent::kParticipantMetadataChanged: { - ParticipantMetadataChangedEvent ev; - { - std::lock_guard guard(lock_); - const auto &pm = re.participant_metadata_changed(); - const std::string &identity = pm.participant_identity(); - Participant *participant = nullptr; - if (local_participant_ && local_participant_->identity() == identity) { - participant = local_participant_.get(); - } else { - auto it = remote_participants_.find(identity); - if (it != remote_participants_.end()) { - participant = it->second.get(); - } - } - if (!participant) { - std::cerr << "participant_metadata_changed for unknown participant: " - << identity << "\n"; - break; - } - std::string old_metadata = participant->metadata(); - participant->set_metadata(pm.metadata()); - ev.participant = participant; - ev.old_metadata = old_metadata; - ev.new_metadata = participant->metadata(); - } - + auto ev = fromProto(re.participant_metadata_changed()); delegate_snapshot->onParticipantMetadataChanged(*this, ev); break; } case proto::RoomEvent::kParticipantNameChanged: { - ParticipantNameChangedEvent ev; - { - std::lock_guard guard(lock_); - const auto &pn = re.participant_name_changed(); - const std::string &identity = pn.participant_identity(); - Participant *participant = nullptr; - if (local_participant_ && local_participant_->identity() == identity) { - participant = local_participant_.get(); - } else { - auto it = remote_participants_.find(identity); - if (it != remote_participants_.end()) { - participant = it->second.get(); - } - } - if (!participant) { - std::cerr << "participant_name_changed for unknown participant: " - << identity << "\n"; - break; - } - std::string old_name = participant->name(); - participant->set_name(pn.name()); - ev.participant = participant; - ev.old_name = old_name; - ev.new_name = participant->name(); - } + auto ev = fromProto(re.participant_name_changed()); delegate_snapshot->onParticipantNameChanged(*this, ev); break; } case proto::RoomEvent::kParticipantAttributesChanged: { - ParticipantAttributesChangedEvent ev; - { - std::lock_guard guard(lock_); - const auto &pa = re.participant_attributes_changed(); - const std::string &identity = pa.participant_identity(); - Participant *participant = nullptr; - if (local_participant_ && local_participant_->identity() == identity) { - participant = local_participant_.get(); - } else { - auto it = remote_participants_.find(identity); - if (it != remote_participants_.end()) { - participant = it->second.get(); - } - } - if (!participant) { - std::cerr - << "participant_attributes_changed for unknown participant: " - << identity << "\n"; - break; - } - // Build full attributes map - std::unordered_map attrs; - for (const auto &entry : pa.attributes()) { - attrs.emplace(entry.key(), entry.value()); - } - participant->set_attributes(attrs); - - // Build changed_attributes map - for (const auto &entry : pa.changed_attributes()) { - ev.changed_attributes.emplace_back(entry.key(), entry.value()); - } - ev.participant = participant; - } + auto ev = fromProto(re.participant_attributes_changed()); delegate_snapshot->onParticipantAttributesChanged(*this, ev); break; } case proto::RoomEvent::kParticipantEncryptionStatusChanged: { - ParticipantEncryptionStatusChangedEvent ev; - { - std::lock_guard guard(lock_); - const auto &pe = re.participant_encryption_status_changed(); - const std::string &identity = pe.participant_identity(); - Participant *participant = nullptr; - if (local_participant_ && local_participant_->identity() == identity) { - participant = local_participant_.get(); - } else { - auto it = remote_participants_.find(identity); - if (it != remote_participants_.end()) { - participant = it->second.get(); - } - } - if (!participant) { - std::cerr << "participant_encryption_status_changed for unknown " - "participant: " - << identity << "\n"; - break; - } - ev.participant = participant; - ev.is_encrypted = pe.is_encrypted(); - } - + auto ev = fromProto(re.participant_encryption_status_changed()); delegate_snapshot->onParticipantEncryptionStatusChanged(*this, ev); break; } case proto::RoomEvent::kConnectionQualityChanged: { - ConnectionQualityChangedEvent ev; - { - std::lock_guard guard(lock_); - const auto &cq = re.connection_quality_changed(); - const std::string &identity = cq.participant_identity(); - Participant *participant = nullptr; - if (local_participant_ && local_participant_->identity() == identity) { - participant = local_participant_.get(); - } else { - auto it = remote_participants_.find(identity); - if (it != remote_participants_.end()) { - participant = it->second.get(); - } - } - if (!participant) { - std::cerr << "connection_quality_changed for unknown participant: " - << identity << "\n"; - break; - } - ev.participant = participant; - ev.quality = static_cast(cq.quality()); - } - + auto ev = fromProto(re.connection_quality_changed()); delegate_snapshot->onConnectionQualityChanged(*this, ev); break; } - - // ------------------------------------------------------------------------ - // Transcription - // ------------------------------------------------------------------------ - - case proto::RoomEvent::kTranscriptionReceived: { - TranscriptionReceivedEvent ev; - { - std::lock_guard guard(lock_); - const auto &tr = re.transcription_received(); - for (const auto &s : tr.segments()) { - TranscriptionSegment seg; - seg.id = s.id(); - seg.text = s.text(); - seg.final = s.final(); - seg.start_time = s.start_time(); - seg.end_time = s.end_time(); - seg.language = s.language(); - ev.segments.push_back(std::move(seg)); - } - - Participant *participant = nullptr; - if (!tr.participant_identity().empty()) { - const std::string &identity = tr.participant_identity(); - if (local_participant_ && - local_participant_->identity() == identity) { - participant = local_participant_.get(); - } else { - auto it = remote_participants_.find(identity); - if (it != remote_participants_.end()) { - participant = it->second.get(); - } - } - } - ev.participant = participant; - ev.publication = participant->findTrackPublication(tr.track_sid()); - } - - delegate_snapshot->onTranscriptionReceived(*this, ev); - break; - } - - // ------------------------------------------------------------------------ - // Data packets: user vs SIP DTMF - // ------------------------------------------------------------------------ - case proto::RoomEvent::kDataPacketReceived: { - const auto &dp = re.data_packet_received(); - RemoteParticipant *rp = nullptr; - { - std::lock_guard guard(lock_); - auto it = remote_participants_.find(dp.participant_identity()); - if (it != remote_participants_.end()) { - rp = it->second.get(); - } - } - const auto which_val = dp.value_case(); - if (which_val == proto::DataPacketReceived::kUser) { - UserDataPacketEvent ev = userDataPacketFromProto(dp, rp); - delegate_snapshot->onUserPacketReceived(*this, ev); - } else if (which_val == proto::DataPacketReceived::kSipDtmf) { - SipDtmfReceivedEvent ev = sipDtmfFromProto(dp, rp); - delegate_snapshot->onSipDtmfReceived(*this, ev); - } - break; - } - - // ------------------------------------------------------------------------ - // E2EE state - // ------------------------------------------------------------------------ - case proto::RoomEvent::kE2EeStateChanged: { - E2eeStateChangedEvent ev; - { - std::lock_guard guard(lock_); - const auto &es = re.e2ee_state_changed(); - const std::string &identity = es.participant_identity(); - Participant *participant = nullptr; - if (local_participant_ && local_participant_->identity() == identity) { - participant = local_participant_.get(); - } else { - auto it = remote_participants_.find(identity); - if (it != remote_participants_.end()) { - participant = it->second.get(); - } - } - if (!participant) { - std::cerr << "e2ee_state_changed for unknown participant: " - << identity << std::endl; - break; - } - - ev.participant = participant; - ev.state = static_cast(es.state()); - } - delegate_snapshot->onE2eeStateChanged(*this, ev); - break; - } - - // ------------------------------------------------------------------------ - // Connection state / lifecycle - // ------------------------------------------------------------------------ - case proto::RoomEvent::kConnectionStateChanged: { - ConnectionStateChangedEvent ev; - { - std::lock_guard guard(lock_); - const auto &cs = re.connection_state_changed(); - connection_state_ = static_cast(cs.state()); - ev.state = connection_state_; - } + auto ev = fromProto(re.connection_state_changed()); delegate_snapshot->onConnectionStateChanged(*this, ev); break; } case proto::RoomEvent::kDisconnected: { - DisconnectedEvent ev; - ev.reason = toDisconnectReason(re.disconnected().reason()); + auto ev = fromProto(re.disconnected()); delegate_snapshot->onDisconnected(*this, ev); break; } case proto::RoomEvent::kReconnecting: { - ReconnectingEvent ev; + auto ev = fromProto(re.reconnecting()); delegate_snapshot->onReconnecting(*this, ev); break; } case proto::RoomEvent::kReconnected: { - ReconnectedEvent ev; + auto ev = fromProto(re.reconnected()); delegate_snapshot->onReconnected(*this, ev); break; } + case proto::RoomEvent::kE2EeStateChanged: { + auto ev = fromProto(re.e2ee_state_changed()); + delegate_snapshot->onE2eeStateChanged(*this, ev); + break; + } case proto::RoomEvent::kEos: { - RoomEosEvent ev; + auto ev = fromProto(re.eos()); delegate_snapshot->onRoomEos(*this, ev); break; } + case proto::RoomEvent::kDataPacketReceived: { + auto ev = fromProto(re.data_packet_received()); + delegate_snapshot->onDataPacketReceived(*this, ev); + break; + } + case proto::RoomEvent::kTranscriptionReceived: { + auto ev = fromProto(re.transcription_received()); + delegate_snapshot->onTranscriptionReceived(*this, ev); + break; + } case proto::RoomEvent::kChatMessage: { auto ev = fromProto(re.chat_message()); delegate_snapshot->onChatMessageReceived(*this, ev); @@ -950,33 +497,36 @@ void Room::OnEvent(const FfiEvent &event) { break; } case proto::RoomEvent::kParticipantsUpdated: { - ParticipantsUpdatedEvent ev; + auto ev = fromProto(re.participants_updated()); { std::lock_guard guard(lock_); const auto &pu = re.participants_updated(); for (const auto &info : pu.participants()) { const std::string &identity = info.identity(); Participant *participant = nullptr; - + // First, check local participant. if (local_participant_ && identity == local_participant_->identity()) { participant = local_participant_.get(); } else { + // Otherwise, look for a remote participant. auto it = remote_participants_.find(identity); if (it != remote_participants_.end()) { participant = it->second.get(); } } + if (!participant) { + // Participant might not exist yet; ignore for now. std::cerr << "Room::RoomEvent::kParticipantsUpdated participant " "does not exist: " << identity << std::endl; continue; } + // Update basic fields participant->set_name(info.name()); participant->set_metadata(info.metadata()); - std::unordered_map attrs; attrs.reserve(info.attributes_size()); for (const auto &kv : info.attributes()) { @@ -986,8 +536,6 @@ void Room::OnEvent(const FfiEvent &event) { participant->set_kind(fromProto(info.kind())); participant->set_disconnect_reason( toDisconnectReason(info.disconnect_reason())); - - ev.participants.push_back(participant); } } delegate_snapshot->onParticipantsUpdated(*this, ev);