diff --git a/CMakeLists.txt b/CMakeLists.txt index 6baabde..473fd49 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -169,6 +169,7 @@ add_library(livekit include/livekit/local_participant.h include/livekit/remote_participant.h include/livekit/livekit.h + include/livekit/rpc_error.h include/livekit/stats.h include/livekit/track.h include/livekit/track_publication.h @@ -198,6 +199,7 @@ add_library(livekit src/track_publication.cpp src/local_track_publication.cpp src/remote_track_publication.cpp + src/rpc_error.cpp src/video_frame.cpp src/video_source.cpp src/video_stream.cpp diff --git a/README.md b/README.md index c294f8f..fe44301 100644 --- a/README.md +++ b/README.md @@ -41,8 +41,16 @@ All build actions are managed by the provided build.sh script. ## πŸ§ͺ Run Example +### Generate Tokens +Before running any participant, create JWT tokens with the proper identity and room name, example ```bash -./build/examples/SimpleRoom --url ws://localhost:7880 --token +lk token create -r test -i your_own_identity --join --valid-for 99999h --dev --room=your_own_room +``` + +### SimpleRoom + +```bash +./build/examples/SimpleRoom --url $URL --token ``` You can also provide the URL and token via environment variables: @@ -54,6 +62,33 @@ export LIVEKIT_TOKEN= Press Ctrl-C to exit the example. +### SimpleRpc +The SimpleRpc example demonstrates how to: +- Connect multiple participants to the same LiveKit room +- Register RPC handlers (e.g., arrival, square-root, divide, long-calculation) +- Send RPC requests from one participant to another +- Handle success, application errors, unsupported methods, and timeouts +- Observe round-trip times (RTT) for each RPC call + +#### πŸ”‘ Generate Tokens +Before running any participant, create JWT tokens with **caller**, **greeter** and **math-genius** identities and room name. +```bash +lk token create -r test -i caller --join --valid-for 99999h --dev --room=your_own_room +lk token create -r test -i greeter --join --valid-for 99999h --dev --room=your_own_room +lk token create -r test -i math-genius --join --valid-for 99999h --dev --room=your_own_room +``` + +#### β–Ά Start Participants +Every participant is run as a separate terminal process, note --role needs to match the token identity. +```bash +./build/examples/SimpleRpc --url $URL --token --role=math-genius +``` +The caller will automatically: +- Wait for the greeter and math-genius to join +- Perform RPC calls +- Print round-trip times +- Annotate expected successes or expected failures + ## 🧰 Recommended Setup ### macOS ```bash diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index f242af3..39e223e 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -4,6 +4,8 @@ project (livekit-examples) set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED ON) +#################### SimpleRoom example ########################## + list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake") include(sdl3) @@ -32,3 +34,22 @@ add_custom_command(TARGET SimpleRoom POST_BUILD ${CMAKE_SOURCE_DIR}/data ${CMAKE_CURRENT_BINARY_DIR}/data ) + +#################### SimpleRpc example ########################## + +include(FetchContent) +FetchContent_Declare( + nlohmann_json + URL https://github.com/nlohmann/json/releases/download/v3.11.3/json.tar.xz +) +FetchContent_MakeAvailable(nlohmann_json) + +add_executable(SimpleRpc + simple_rpc/main.cpp +) + +target_link_libraries(SimpleRpc + PRIVATE + nlohmann_json::nlohmann_json + livekit +) \ No newline at end of file diff --git a/examples/simple_room/main.cpp b/examples/simple_room/main.cpp index e675a2f..9338e2f 100644 --- a/examples/simple_room/main.cpp +++ b/examples/simple_room/main.cpp @@ -42,7 +42,7 @@ namespace { std::atomic g_running{true}; -void print_usage(const char *prog) { +void printUsage(const char *prog) { std::cerr << "Usage:\n" << " " << prog << " \n" << "or:\n" @@ -52,9 +52,9 @@ void print_usage(const char *prog) { << " LIVEKIT_URL, LIVEKIT_TOKEN\n"; } -void handle_sigint(int) { g_running.store(false); } +void handleSignal(int) { g_running.store(false); } -bool parse_args(int argc, char *argv[], std::string &url, std::string &token) { +bool parseArgs(int argc, char *argv[], std::string &url, std::string &token) { // 1) --help for (int i = 1; i < argc; ++i) { std::string a = argv[i]; @@ -215,8 +215,8 @@ class SimpleRoomDelegate : public livekit::RoomDelegate { int main(int argc, char *argv[]) { std::string url, token; - if (!parse_args(argc, argv, url, token)) { - print_usage(argv[0]); + if (!parseArgs(argc, argv, url, token)) { + printUsage(argv[0]); return 1; } @@ -238,7 +238,7 @@ int main(int argc, char *argv[]) { std::cout << "Connecting to: " << url << std::endl; // Handle Ctrl-C to exit the idle loop - std::signal(SIGINT, handle_sigint); + std::signal(SIGINT, handleSignal); livekit::Room room{}; SimpleRoomDelegate delegate(media); diff --git a/examples/simple_rpc/README b/examples/simple_rpc/README new file mode 100644 index 0000000..2ded78c --- /dev/null +++ b/examples/simple_rpc/README @@ -0,0 +1,157 @@ +# πŸ“˜ SimpleRpc Example β€” Technical Overview + +This README provides deeper technical details about the RPC (Remote Procedure Call) support demonstrated in the SimpleRpc example. +It complements the example instructions found in the root README.md. + +If you're looking for how to run the example, see the root [README](https://github.com/livekit/client-sdk-cpp). + +This document explains: +- How LiveKit RPC works in the C++ SDK +- Where the APIs are defined +- How senders call RPC methods +- How receivers register handlers +- What happens if the receiver is absent +- How long-running operations behave +- Timeouts, disconnects, and unsupported methods +- RPC lifecycle events and error propagation + +## πŸ”§ Overview: How RPC Works +LiveKit RPC allows one participant (the caller) to invoke a method on another participant (the receiver) using the data channel transport. +It is: +- Peer-to-peer within the room (not server-executed RPC) +- Request/response (caller waits for a reply or an error) +- Asynchronous under the hood, synchronous or blocking from the caller’s perspective +- Delivery-guaranteed when using the reliable data channel + +Each RPC call includes: +| Field | Meaning | +|--------------------------|-----------------------------------------------------| +| **destination_identity** | Identity of the target participant | +| **method** | Method name string (e.g., "square-root") | +| **payload** | Arbitrary UTF-8 text | +| **response_timeout** | Optional timeout (seconds) | +| **invocation_id** | Server-generated ID used internally for correlation | + +## πŸ“ Location of APIs in C++ +All public-facing RPC APIs live in: +[include/livekit/local_participant.h](https://github.com/livekit/client-sdk-cpp/blob/main/include/livekit/local_participant.h#L160) + +### Key methods: + +#### Sender-side APIs +```bash +std::string performRpc( + const std::string& destination_identity, + const std::string& method, + const std::string& payload, + std::optional response_timeout_sec = std::nullopt +); + +Receiver-side APIs +void registerRpcMethod( + const std::string& method_name, + RpcHandler handler +); + +void unregisterRpcMethod(const std::string& method_name); + +Handler signature +using RpcHandler = + std::function(const RpcInvocationData&)>; +``` + +Handlers can: +- Return a string (the RPC response payload) +- Return std::nullopt (meaning β€œno return payload”) +- Throw exceptions (mapped to APPLICATION_ERROR) +- Throw a RpcError (mapped to specific error codes) + +#### πŸ›° Sender Behavior (performRpc) + +When the caller invokes: +```bash +auto reply = lp->performRpc("math-genius", "square-root", "{\"number\":16}"); +``` + +The following occurs: + +A PerformRpcRequest is sent through FFI to the SDK core. + +The SDK transmits the invocation to the target participant (if present). + +The caller begins waiting for a matching RpcMethodInvocationResponse. + +One of the following happens: +| Outcome | Meaning | +|--------------------------|------------------------------------------| +| **Success** | Receiver returned a payload | +| **UNSUPPORTED_METHOD** | Receiver did not register the method | +| **RECIPIENT_NOT_FOUND** | Target identity not present in room | +| **RECIPIENT_DISCONNECTED** | Target left before replying | +| **RESPONSE_TIMEOUT** | Receiver took too long | +| **APPLICATION_ERROR** | Handler threw an exception | + +#### πŸ”„ Round-trip time (RTT) + +The caller can measure RTT externally (as SimpleRpc does), but the SDK does not measure RTT internally. + +#### πŸ“‘ Receiver Behavior (registerRpcMethod) + +A receiver must explicitly register handlers: +```bash +local_participant->registerRpcMethod("square-root", + [](const RpcInvocationData& data) { + double number = parse(data.payload); + return make_json("result", std::sqrt(number)); + }); +``` + +When an invocation arrives: +- Room receives a RpcMethodInvocationEvent +- Room forwards it to the corresponding LocalParticipant +- LocalParticipant::handleRpcMethodInvocation(): +- Calls the handler +- Converts any exceptions into RpcError +- Sends back RpcMethodInvocationResponse + +⚠ If no handler exists: + +Receiver returns: UNSUPPORTED_METHOD + + +#### 🚨 What Happens if Receiver Is Absent? +| Case | Behavior | +|-----------------------------------------------------|---------------------------------------------------| +| Receiver identity is not in the room | Caller immediately receives `RECIPIENT_NOT_FOUND` | +| Receiver is present but disconnects before replying | Caller receives `RECIPIENT_DISCONNECTED` | +| Receiver joins later | Caller must retry manually (no automatic waiting) | + +**Important**: +LiveKit does not queue RPC calls for offline participants. + +#### ⏳ Timeout Behavior + +If the caller specifies: + +performRpc(..., /*response_timeout=*/10.0); + +Then: +- Receiver is given 10 seconds to respond. +- If the receiver handler takes longer (e.g., sleep 30s), caller receives: +RESPONSE_TIMEOUT + +**If no response_timeout is provided explicitly, the default timeout is 15 seconds.** + + +This is by design and demonstrated in the example. + +#### 🧨 Errors & Failure Modes +| Error Code | Cause | +|------------------------|---------------------------------------------| +| **APPLICATION_ERROR** | Handler threw a C++ exception | +| **UNSUPPORTED_METHOD** | No handler registered for the method | +| **RECIPIENT_NOT_FOUND** | Destination identity not in room | +| **RECIPIENT_DISCONNECTED** | Participant left mid-flight | +| **RESPONSE_TIMEOUT** | Handler exceeded allowed response time | +| **CONNECTION_TIMEOUT** | Transport-level issue | +| **SEND_FAILED** | SDK failed to send invocation | diff --git a/examples/simple_rpc/main.cpp b/examples/simple_rpc/main.cpp new file mode 100644 index 0000000..61151db --- /dev/null +++ b/examples/simple_rpc/main.cpp @@ -0,0 +1,542 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an β€œAS IS” BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "livekit/livekit.h" +#include "livekit_ffi.h" + +using namespace livekit; +using namespace std::chrono_literals; + +namespace { + +std::atomic g_running{true}; + +void handleSignal(int) { g_running.store(false); } + +void printUsage(const char *prog) { + std::cerr << "Usage:\n" + << " " << prog << " [role]\n" + << "or:\n" + << " " << prog + << " --url= --token= [--role=]\n" + << " " << prog + << " --url --token [--role ]\n\n" + << "Env fallbacks:\n" + << " LIVEKIT_URL, LIVEKIT_TOKEN\n" + << "Role (participant behavior):\n" + << " SIMPLE_RPC_ROLE or --role=\n" + << " default: caller\n"; +} + +inline double nowMs() { + return std::chrono::duration( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); +} + +// Poll the room until a remote participant with the given identity appears, +// or until 'timeout' elapses. Returns true if found, false on timeout. +bool waitForParticipant(Room &room, const std::string &identity, + std::chrono::milliseconds timeout) { + auto start = std::chrono::steady_clock::now(); + + while (std::chrono::steady_clock::now() - start < timeout) { + if (room.remote_participant(identity) != nullptr) { + return true; + } + std::this_thread::sleep_for(100ms); + } + return false; +} + +// For the caller: wait for a specific peer, and if they don't show up, +// explain why and how to start them in another terminal. +bool ensurePeerPresent(Room &room, const std::string &identity, + const std::string &friendly_role, const std::string &url, + std::chrono::seconds timeout) { + std::cout << "[Caller] Waiting up to " << timeout.count() << "s for " + << friendly_role << " (identity=\"" << identity + << "\") to join...\n"; + bool present = waitForParticipant( + room, identity, + std::chrono::duration_cast(timeout)); + if (present) { + std::cout << "[Caller] " << friendly_role << " is present.\n"; + return true; + } + // Timed out + auto info = room.room_info(); + const std::string room_name = info.name; + std::cout << "[Caller] Timed out after " << timeout.count() + << "s waiting for " << friendly_role << " (identity=\"" << identity + << "\").\n"; + std::cout << "[Caller] No participant with identity \"" << identity + << "\" appears to be connected to room \"" << room_name + << "\".\n\n"; + std::cout << "To start a " << friendly_role + << " in another terminal, run:\n\n" + << " lk token create -r test -i " << identity + << " --join --valid-for 99999h --dev --room=" << room_name << "\n" + << " ./build/examples/SimpleRpc " << url + << " $token --role=" << friendly_role << "\n\n"; + return false; +} + +bool parseArgs(int argc, char *argv[], std::string &url, std::string &token, + std::string &role) { + // --help + for (int i = 1; i < argc; ++i) { + std::string a = argv[i]; + if (a == "-h" || a == "--help") { + return false; + } + } + + // helper for flags + auto get_flag_value = [&](const std::string &name, int &i) -> std::string { + std::string arg = argv[i]; + const std::string eq = name + "="; + if (arg.rfind(name, 0) == 0) { // starts with name + if (arg.size() > name.size() && arg[name.size()] == '=') { + return arg.substr(eq.size()); + } else if (i + 1 < argc) { + return std::string(argv[++i]); + } + } + return {}; + }; + + // flags: --url / --token / --role (with = or split) + for (int i = 1; i < argc; ++i) { + const std::string a = argv[i]; + if (a.rfind("--url", 0) == 0) { + auto v = get_flag_value("--url", i); + if (!v.empty()) + url = v; + } else if (a.rfind("--token", 0) == 0) { + auto v = get_flag_value("--token", i); + if (!v.empty()) + token = v; + } else if (a.rfind("--role", 0) == 0) { + auto v = get_flag_value("--role", i); + if (!v.empty()) + role = v; + } + } + + std::vector pos; + for (int i = 1; i < argc; ++i) { + std::string a = argv[i]; + if (a.rfind("--", 0) == 0) + continue; + pos.push_back(std::move(a)); + } + if (!pos.empty()) { + if (url.empty() && pos.size() >= 1) { + url = pos[0]; + } + if (token.empty() && pos.size() >= 2) { + token = pos[1]; + } + if (role.empty() && pos.size() >= 3) { + role = pos[2]; + } + } + + if (url.empty()) { + const char *e = std::getenv("LIVEKIT_URL"); + if (e) + url = e; + } + if (token.empty()) { + const char *e = std::getenv("LIVEKIT_TOKEN"); + if (e) + token = e; + } + if (role.empty()) { + const char *e = std::getenv("SIMPLE_RPC_ROLE"); + if (e) + role = e; + } + if (role.empty()) { + role = "caller"; + } + + return !(url.empty() || token.empty()); +} + +std::string makeNumberJson(const std::string &key, double value) { + std::ostringstream oss; + oss << "{\"" << key << "\":" << value << "}"; + return oss.str(); +} + +std::string makeStringJson(const std::string &key, const std::string &value) { + std::ostringstream oss; + oss << "{\"" << key << "\":\"" << value << "\"}"; + return oss.str(); +} + +double parseNumberFromJson(const std::string &json) { + auto colon = json.find(':'); + if (colon == std::string::npos) + throw std::runtime_error("invalid json: " + json); + auto start = colon + 1; + auto end = json.find_first_of(",}", start); + std::string num_str = json.substr(start, end - start); + return std::stod(num_str); +} + +std::string parseStringFromJson(const std::string &json) { + auto colon = json.find(':'); + if (colon == std::string::npos) + throw std::runtime_error("invalid json: " + json); + auto first_quote = json.find('"', colon + 1); + if (first_quote == std::string::npos) + throw std::runtime_error("invalid json: " + json); + auto second_quote = json.find('"', first_quote + 1); + if (second_quote == std::string::npos) + throw std::runtime_error("invalid json: " + json); + return json.substr(first_quote + 1, second_quote - first_quote - 1); +} + +// RPC handler registration +void registerReceiverMethods(Room &greeters_room, Room &math_genius_room) { + LocalParticipant *greeter_lp = greeters_room.local_participant(); + LocalParticipant *math_genius_lp = math_genius_room.local_participant(); + + // arrival + greeter_lp->registerRpcMethod( + "arrival", + [](const RpcInvocationData &data) -> std::optional { + std::cout << "[Greeter] Oh " << data.caller_identity + << " arrived and said \"" << data.payload << "\"\n"; + std::this_thread::sleep_for(2s); + return std::optional{"Welcome and have a wonderful day!"}; + }); + + // square-root + math_genius_lp->registerRpcMethod( + "square-root", + [](const RpcInvocationData &data) -> std::optional { + double number = parseNumberFromJson(data.payload); + std::cout << "[Math Genius] I guess " << data.caller_identity + << " wants the square root of " << number + << ". I've only got " << data.response_timeout_sec + << " seconds to respond but I think I can pull it off.\n"; + std::cout << "[Math Genius] *doing math*…\n"; + std::this_thread::sleep_for(2s); + double result = std::sqrt(number); + std::cout << "[Math Genius] Aha! It's " << result << "\n"; + return makeNumberJson("result", result); + }); + + // divide + math_genius_lp->registerRpcMethod( + "divide", + [](const RpcInvocationData &data) -> std::optional { + // expect {"dividend":X,"divisor":Y} – we'll parse very lazily + auto div_pos = data.payload.find("dividend"); + auto dvr_pos = data.payload.find("divisor"); + if (div_pos == std::string::npos || dvr_pos == std::string::npos) { + throw std::runtime_error("invalid divide payload"); + } + + double dividend = parseNumberFromJson( + data.payload.substr(div_pos, dvr_pos - div_pos - 1)); // rough slice + double divisor = parseNumberFromJson(data.payload.substr(dvr_pos)); + + std::cout << "[Math Genius] " << data.caller_identity + << " wants to divide " << dividend << " by " << divisor + << ".\n"; + + if (divisor == 0.0) { + // will be translated to APPLICATION_ERROR by your RpcError logic + throw std::runtime_error("division by zero"); + } + + double result = dividend / divisor; + return makeNumberJson("result", result); + }); + + // long-calculation + math_genius_lp->registerRpcMethod( + "long-calculation", + [](const RpcInvocationData &data) -> std::optional { + std::cout << "[Math Genius] Starting a very long calculation for " + << data.caller_identity << "\n"; + std::cout << "[Math Genius] This will take 30 seconds even though " + "you're only giving me " + << data.response_timeout_sec << " seconds\n"; + // Sleep for 30 seconds to mimic a long running task. + std::this_thread::sleep_for(30s); + return makeStringJson("result", "Calculation complete!"); + }); + + // Note: we do NOT register "quantum-hypergeometric-series" here, + // so the caller sees UNSUPPORTED_METHOD +} + +void performGreeting(Room &room) { + std::cout << "[Caller] Letting the greeter know that I've arrived\n"; + double t0 = nowMs(); + try { + std::string response = room.local_participant()->performRpc( + "greeter", "arrival", "Hello", std::nullopt); + double t1 = nowMs(); + std::cout << "[Caller] RTT: " << (t1 - t0) << " ms\n"; + std::cout << "[Caller] That's nice, the greeter said: \"" << response + << "\"\n"; + } catch (const std::exception &error) { + double t1 = nowMs(); + std::cout << "[Caller] (FAILED) RTT: " << (t1 - t0) << " ms\n"; + std::cout << "[Caller] RPC call failed: " << error.what() << "\n"; + throw; + } +} + +void performSquareRoot(Room &room) { + std::cout << "[Caller] What's the square root of 16?\n"; + double t0 = nowMs(); + try { + std::string payload = makeNumberJson("number", 16.0); + std::string response = room.local_participant()->performRpc( + "math-genius", "square-root", payload, std::nullopt); + double t1 = nowMs(); + std::cout << "[Caller] RTT: " << (t1 - t0) << " ms\n"; + double result = parseNumberFromJson(response); + std::cout << "[Caller] Nice, the answer was " << result << "\n"; + } catch (const std::exception &error) { + double t1 = nowMs(); + std::cout << "[Caller] (FAILED) RTT: " << (t1 - t0) << " ms\n"; + std::cout << "[Caller] RPC call failed: " << error.what() << "\n"; + throw; + } +} + +void performQuantumHyperGeometricSeries(Room &room) { + std::cout << "\n=== Unsupported Method Example ===\n"; + std::cout + << "[Caller] Asking math-genius for 'quantum-hypergeometric-series'. " + "This should FAIL because the handler is NOT registered.\n"; + double t0 = nowMs(); + try { + std::string payload = makeNumberJson("number", 42.0); + std::string response = room.local_participant()->performRpc( + "math-genius", "quantum-hypergeometric-series", payload, std::nullopt); + double t1 = nowMs(); + std::cout << "[Caller] (Unexpected success) RTT=" << (t1 - t0) << " ms\n"; + std::cout << "[Caller] Result: " << response << "\n"; + } catch (const RpcError &error) { + double t1 = nowMs(); + std::cout << "[Caller] RpcError RTT=" << (t1 - t0) << " ms\n"; + auto code = static_cast(error.code()); + if (code == RpcError::ErrorCode::UNSUPPORTED_METHOD) { + std::cout << "[Caller] βœ“ Expected: math-genius does NOT implement this " + "method.\n"; + std::cout << "[Caller] Server returned UNSUPPORTED_METHOD.\n"; + } else { + std::cout << "[Caller] βœ— Unexpected error type: " << error.message() + << "\n"; + } + } +} + +void performDivide(Room &room) { + std::cout << "\n=== Divide Example ===\n"; + std::cout << "[Caller] Asking math-genius to divide 10 by 0. " + "This is EXPECTED to FAIL with an APPLICATION_ERROR.\n"; + double t0 = nowMs(); + try { + std::string payload = "{\"dividend\":10,\"divisor\":0}"; + std::string response = room.local_participant()->performRpc( + "math-genius", "divide", payload, std::nullopt); + double t1 = nowMs(); + std::cout << "[Caller] (Unexpected success) RTT=" << (t1 - t0) << " ms\n"; + std::cout << "[Caller] Result = " << response << "\n"; + } catch (const RpcError &error) { + double t1 = nowMs(); + std::cout << "[Caller] RpcError RTT=" << (t1 - t0) << " ms\n"; + auto code = static_cast(error.code()); + if (code == RpcError::ErrorCode::APPLICATION_ERROR) { + std::cout << "[Caller] βœ“ Expected: divide-by-zero triggers " + "APPLICATION_ERROR.\n"; + std::cout << "[Caller] Math-genius threw an exception: " + << error.message() << "\n"; + } else { + std::cout << "[Caller] βœ— Unexpected RpcError type: " << error.message() + << "\n"; + } + } +} + +void performLongCalculation(Room &room) { + std::cout << "\n=== Long Calculation Example ===\n"; + std::cout + << "[Caller] Asking math-genius for a calculation that takes 30s.\n"; + std::cout + << "[Caller] Giving only 10s to respond. EXPECTED RESULT: TIMEOUT.\n"; + double t0 = nowMs(); + try { + std::string response = room.local_participant()->performRpc( + "math-genius", "long-calculation", "{}", 10.0); + double t1 = nowMs(); + std::cout << "[Caller] (Unexpected success) RTT=" << (t1 - t0) << " ms\n"; + std::cout << "[Caller] Result: " << response << "\n"; + } catch (const RpcError &error) { + double t1 = nowMs(); + std::cout << "[Caller] RpcError RTT=" << (t1 - t0) << " ms\n"; + auto code = static_cast(error.code()); + if (code == RpcError::ErrorCode::RESPONSE_TIMEOUT) { + std::cout + << "[Caller] βœ“ Expected: handler sleeps 30s but timeout is 10s.\n"; + std::cout << "[Caller] Server correctly returned RESPONSE_TIMEOUT.\n"; + } else if (code == RpcError::ErrorCode::RECIPIENT_DISCONNECTED) { + std::cout << "[Caller] βœ“ Expected if math-genius disconnects during the " + "test.\n"; + } else { + std::cout << "[Caller] βœ— Unexpected RPC error: " << error.message() + << "\n"; + } + } +} + +} // namespace + +int main(int argc, char *argv[]) { + std::string url, token, role; + if (!parseArgs(argc, argv, url, token, role)) { + printUsage(argv[0]); + return 1; + } + + if (url.empty() || token.empty()) { + std::cerr << "LIVEKIT_URL and LIVEKIT_TOKEN (or CLI args) are required\n"; + return 1; + } + + std::cout << "Connecting to: " << url << "\n"; + std::cout << "Role: " << role << "\n"; + + // Ctrl-C to quit the program + std::signal(SIGINT, handleSignal); + + Room room{}; + RoomOptions options; + options.auto_subscribe = true; + options.dynacast = false; + + bool res = room.Connect(url, token, options); + std::cout << "Connect result is " << std::boolalpha << res << "\n"; + if (!res) { + std::cerr << "Failed to connect to room\n"; + FfiClient::instance().shutdown(); + return 1; + } + + auto info = room.room_info(); + std::cout << "Connected to room:\n" + << " Name: " << info.name << "\n" + << " Metadata: " << info.metadata << "\n" + << " Num participants: " << info.num_participants << "\n"; + + try { + if (role == "caller") { + // Check that both peers are present (or explain how to start them). + bool has_greeter = ensurePeerPresent(room, "greeter", "greeter", url, 8s); + bool has_math_genius = + ensurePeerPresent(room, "math-genius", "math-genius", url, 8s); + if (!has_greeter || !has_math_genius) { + std::cout << "\n[Caller] One or more RPC peers are missing. " + << "Some examples may be skipped.\n"; + } + if (has_greeter) { + std::cout << "\n\nRunning greeting example...\n"; + performGreeting(room); + } else { + std::cout << "[Caller] Skipping greeting example because greeter is " + "not present.\n"; + } + if (has_math_genius) { + std::cout << "\n\nRunning error handling example...\n"; + performDivide(room); + + std::cout << "\n\nRunning math example...\n"; + performSquareRoot(room); + std::this_thread::sleep_for(2s); + performQuantumHyperGeometricSeries(room); + + std::cout << "\n\nRunning long calculation with timeout...\n"; + performLongCalculation(room); + } else { + std::cout << "[Caller] Skipping math examples because math-genius is " + "not present.\n"; + } + + std::cout << "\n\nCaller done. Exiting.\n"; + } else if (role == "greeter" || role == "math-genius") { + // For these roles we expect multiple processes: + // - One process with role=caller + // - One with role=greeter + // - One with role=math-genius + // + // Each process gets its own token (with that identity) via LIVEKIT_TOKEN. + // Here we only register handlers for the appropriate role, and then + // stay alive until Ctrl-C so we can receive RPCs. + + if (role == "greeter") { + // Use the same room object for both arguments; only "arrival" is used. + registerReceiverMethods(room, room); + } else { // math-genius + // We only need math handlers; greeter handlers won't be used. + registerReceiverMethods(room, room); + } + + std::cout << "RPC handlers registered for role=" << role + << ". Waiting for RPC calls (Ctrl-C to exit)...\n"; + + while (g_running.load()) { + std::this_thread::sleep_for(50ms); + } + std::cout << "Exiting receiver role.\n"; + } else { + std::cerr << "Unknown role: " << role << "\n"; + } + } catch (const std::exception &e) { + std::cerr << "Unexpected error in main: " << e.what() << "\n"; + } + + FfiClient::instance().shutdown(); + return 0; +} diff --git a/include/livekit/ffi_client.h b/include/livekit/ffi_client.h index 51cd226..6b2fa3d 100644 --- a/include/livekit/ffi_client.h +++ b/include/livekit/ffi_client.h @@ -111,6 +111,11 @@ class FfiClient { std::future captureAudioFrameAsync(std::uint64_t source_handle, const proto::AudioFrameBufferInfo &buffer); + std::future performRpcAsync( + std::uint64_t local_participant_handle, + const std::string &destination_identity, const std::string &method, + const std::string &payload, + std::optional response_timeout_ms = std::nullopt); // Generic function for sending a request to the Rust FFI. // Note: For asynchronous requests, use the dedicated async functions instead diff --git a/include/livekit/local_participant.h b/include/livekit/local_participant.h index 1077d73..17dc9a0 100644 --- a/include/livekit/local_participant.h +++ b/include/livekit/local_participant.h @@ -19,8 +19,10 @@ #include "livekit/ffi_handle.h" #include "livekit/participant.h" #include "livekit/room_delegate.h" +#include "livekit/rpc_error.h" #include +#include #include #include #include @@ -33,8 +35,16 @@ struct ParticipantTrackPermission; class FfiClient; class Track; class LocalTrackPublication; +// TODO, should consider moving Transcription to local_participant.h? struct Transcription; +struct RpcInvocationData { + std::string request_id; + std::string caller_identity; + std::string payload; + double response_timeout_sec; // seconds +}; + /** * Represents the local participant in a room. * @@ -45,6 +55,19 @@ class LocalParticipant : public Participant { using PublicationMap = std::unordered_map>; + /** + * Type of callback used to handle incoming RPC method invocations. + * + * The handler receives an RpcInvocationData describing the incoming call + * and may return an optional response payload. To signal an error to the + * remote caller, throw an RpcError; it will be serialized and forwarded. + * + * Returning std::nullopt means "no payload" and results in an empty + * response body being sent back to the caller. + */ + using RpcHandler = + std::function(const RpcInvocationData &)>; + LocalParticipant(FfiHandle handle, std::string sid, std::string name, std::string identity, std::string metadata, std::unordered_map attributes, @@ -91,10 +114,6 @@ class LocalParticipant : public Participant { void setAttributes(const std::unordered_map &attributes); - // ------------------------------------------------------------------------- - // Subscription permissions - // ------------------------------------------------------------------------- - /** * Set track subscription permissions for this participant. * @@ -106,10 +125,6 @@ class LocalParticipant : public Participant { const std::vector &participant_permissions = {}); - // ------------------------------------------------------------------------- - // Track publish / unpublish (synchronous analogue) - // ------------------------------------------------------------------------- - /** * Publish a local track to the room. * @@ -126,8 +141,73 @@ class LocalParticipant : public Participant { */ void unpublishTrack(const std::string &track_sid); + /** + * Initiate an RPC call to a remote participant. + * + * @param destination_identity Identity of the destination participant. + * @param method Name of the RPC method to invoke. + * @param payload Request payload to send to the remote handler. + * @param response_timeout Optional timeout in seconds for receiving + * a response. If not set, the server default + * timeout (15 seconds) is used. + * + * @return The response payload returned by the remote handler. + * + * @throws RpcError If the remote side returns an RPC error, times out, + * or rejects the request. + * @throws std::runtime_error If the underlying FFI handle is invalid or + * the FFI call fails unexpectedly. + */ + std::string performRpc(const std::string &destination_identity, + const std::string &method, const std::string &payload, + std::optional response_timeout = std::nullopt); + + /** + * Register a handler for an incoming RPC method. + * + * Once registered, the provided handler will be invoked whenever a remote + * participant calls the given method name on this LocalParticipant. + * + * @param method_name Name of the RPC method to handle. This must match + * the method name used by remote callers. + * @param handler Callback to execute when an invocation is received. + * The handler may return an optional response payload + * or throw an RpcError to signal failure. + * + * If a handler is already registered for the same method_name, it will be + * replaced by the new handler. + */ + + void registerRpcMethod(const std::string &method_name, RpcHandler handler); + + /** + * Unregister a previously registered RPC method handler. + * + * After this call, invocations for the given method_name will no longer + * be dispatched to a local handler and will instead result in an + * "unsupported method" error being returned to the caller. + * + * @param method_name Name of the RPC method to unregister. + * If no handler is registered for this name, the call + * is a no-op. + */ + void unregisterRpcMethod(const std::string &method_name); + +protected: + // Called by Room when an rpc_method_invocation event is received from the + // SFU. This is internal plumbing and not intended to be called directly by + // SDK users. + void handleRpcMethodInvocation(std::uint64_t invocation_id, + const std::string &method, + const std::string &request_id, + const std::string &caller_identity, + const std::string &payload, + double response_timeout); + friend class Room; + private: PublicationMap track_publications_; + std::unordered_map rpc_handlers_; }; } // namespace livekit diff --git a/include/livekit/rpc_error.h b/include/livekit/rpc_error.h new file mode 100644 index 0000000..0fd7195 --- /dev/null +++ b/include/livekit/rpc_error.h @@ -0,0 +1,123 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an β€œAS IS” BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +namespace livekit { + +namespace proto { +class RpcError; +} + +/** + * Specialized error type for RPC methods. + * + * Instances of this type, when thrown in a method handler, will have their + * `code`, `message`, and optional `data` serialized into a proto::RpcError + * and sent across the wire. The caller will receive an equivalent error + * on the other side. + * + * Built-in errors are included (codes 1400–1999) but developers may use + * arbitrary codes as well. + */ +class RpcError : public std::runtime_error { +public: + /** + * Built-in error codes, mirroring the Python RpcError.ErrorCode enum. + */ + enum class ErrorCode : std::uint32_t { + APPLICATION_ERROR = 1500, + CONNECTION_TIMEOUT = 1501, + RESPONSE_TIMEOUT = 1502, + RECIPIENT_DISCONNECTED = 1503, + RESPONSE_PAYLOAD_TOO_LARGE = 1504, + SEND_FAILED = 1505, + + UNSUPPORTED_METHOD = 1400, + RECIPIENT_NOT_FOUND = 1401, + REQUEST_PAYLOAD_TOO_LARGE = 1402, + UNSUPPORTED_SERVER = 1403, + UNSUPPORTED_VERSION = 1404, + }; + + /** + * Construct an RpcError with an explicit numeric code. + * + * @param code Error code value. Codes 1001–1999 are reserved for + * built-in errors (see ErrorCode). + * @param message Human-readable error message. + * @param data Optional extra data, e.g. JSON. Empty string means no data. + */ + RpcError(std::uint32_t code, std::string message, std::string data = {}); + + /** + * Construct an RpcError from a built-in ErrorCode. + * + * @param code Built-in error code. + * @param message Human-readable error message. + * @param data Optional extra data, e.g. JSON. Empty string means no data. + */ + RpcError(ErrorCode code, std::string message, std::string data = {}); + + /** + * Numeric error code. + * + * Codes 1001–1999 are reserved for built-in errors. For built-ins, this + * value matches the underlying ErrorCode enum value. + */ + std::uint32_t code() const noexcept; + + /** + * Human-readable error message. + */ + const std::string &message() const noexcept; + + /** + * Optional extra data associated with the error (JSON recommended). + * May be an empty string if no data was provided. + */ + const std::string &data() const noexcept; + + /** + * Create a built-in RpcError using a predefined ErrorCode and default + * message text that matches the Python RpcError.ErrorMessage table. + * + * @param code Built-in error code. + * @param data Optional extra data payload (JSON recommended). + */ + static RpcError builtIn(ErrorCode code, const std::string &data = {}); + +protected: + // ----- Protected: only used by LocalParticipant (internal SDK code) ----- + proto::RpcError toProto() const; + static RpcError fromProto(const proto::RpcError &err); + + friend class LocalParticipant; + friend class FfiClient; + +private: + static const char *defaultMessageFor(ErrorCode code); + + std::uint32_t code_; + std::string message_; + std::string data_; +}; + +} // namespace livekit \ No newline at end of file diff --git a/src/ffi_client.cpp b/src/ffi_client.cpp index 5b426d0..38bc52d 100644 --- a/src/ffi_client.cpp +++ b/src/ffi_client.cpp @@ -22,6 +22,7 @@ #include "livekit/ffi_client.h" #include "livekit/ffi_handle.h" #include "livekit/room.h" // TODO, maybe avoid circular deps by moving RoomOptions to a room_types.h ? +#include "livekit/rpc_error.h" #include "livekit/track.h" #include "livekit_ffi.h" #include "room.pb.h" @@ -505,4 +506,43 @@ FfiClient::captureAudioFrameAsync(std::uint64_t source_handle, }); } +std::future +FfiClient::performRpcAsync(std::uint64_t local_participant_handle, + const std::string &destination_identity, + const std::string &method, + const std::string &payload, + std::optional response_timeout_ms) { + proto::FfiRequest req; + auto *msg = req.mutable_perform_rpc(); + msg->set_local_participant_handle(local_participant_handle); + msg->set_destination_identity(destination_identity); + msg->set_method(method); + msg->set_payload(payload); + if (response_timeout_ms.has_value()) { + msg->set_response_timeout_ms(*response_timeout_ms); + } + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_perform_rpc()) { + throw std::runtime_error("FfiResponse missing perform_rpc"); + } + const AsyncId async_id = resp.perform_rpc().async_id(); + return registerAsync( + // match predicate + [async_id](const proto::FfiEvent &event) { + return event.has_perform_rpc() && + event.perform_rpc().async_id() == async_id; + }, + [](const proto::FfiEvent &event, std::promise &pr) { + const auto &cb = event.perform_rpc(); + + if (cb.has_error()) { + // RpcError is a proto message; convert to C++ RpcError and throw + pr.set_exception( + std::make_exception_ptr(RpcError::fromProto(cb.error()))); + return; + } + pr.set_value(cb.payload()); + }); +} + } // namespace livekit diff --git a/src/local_participant.cpp b/src/local_participant.cpp index 2f52573..02f90d5 100644 --- a/src/local_participant.cpp +++ b/src/local_participant.cpp @@ -251,4 +251,103 @@ void LocalParticipant::unpublishTrack(const std::string &track_sid) { track_publications_.erase(track_sid); } +std::string LocalParticipant::performRpc( + const std::string &destination_identity, const std::string &method, + const std::string &payload, std::optional response_timeout) { + auto handle_id = ffiHandleId(); + if (handle_id == 0) { + throw std::runtime_error( + "LocalParticipant::performRpc: invalid FFI handle"); + } + + std::uint32_t timeout_ms = 0; + bool has_timeout = false; + if (response_timeout.has_value()) { + timeout_ms = static_cast(response_timeout.value() * 1000.0); + has_timeout = true; + } + + auto fut = FfiClient::instance().performRpcAsync( + static_cast(handle_id), destination_identity, method, + payload, + has_timeout ? std::optional(timeout_ms) : std::nullopt); + return fut.get(); +} + +void LocalParticipant::registerRpcMethod(const std::string &method_name, + RpcHandler handler) { + auto handle_id = ffiHandleId(); + if (handle_id == 0) { + throw std::runtime_error( + "LocalParticipant::registerRpcMethod: invalid FFI handle"); + } + rpc_handlers_[method_name] = std::move(handler); + FfiRequest req; + auto *msg = req.mutable_register_rpc_method(); + msg->set_local_participant_handle(static_cast(handle_id)); + msg->set_method(method_name); + + (void)FfiClient::instance().sendRequest(req); +} + +void LocalParticipant::unregisterRpcMethod(const std::string &method_name) { + auto handle_id = ffiHandleId(); + if (handle_id == 0) { + throw std::runtime_error( + "LocalParticipant::unregisterRpcMethod: invalid FFI handle"); + } + rpc_handlers_.erase(method_name); + FfiRequest req; + auto *msg = req.mutable_unregister_rpc_method(); + msg->set_local_participant_handle(static_cast(handle_id)); + msg->set_method(method_name); + + (void)FfiClient::instance().sendRequest(req); +} + +void LocalParticipant::handleRpcMethodInvocation( + uint64_t invocation_id, const std::string &method, + const std::string &request_id, const std::string &caller_identity, + const std::string &payload, double response_timeout_sec) { + std::optional response_error; + std::optional response_payload; + std::cout << "handleRpcMethodInvocation \n"; + RpcInvocationData params{request_id, caller_identity, payload, + response_timeout_sec}; + auto it = rpc_handlers_.find(method); + if (it == rpc_handlers_.end()) { + // No handler registered β†’ built-in UNSUPPORTED_METHOD + response_error = RpcError::builtIn(RpcError::ErrorCode::UNSUPPORTED_METHOD); + } else { + try { + // Invoke user handler: may return payload or throw RpcError + response_payload = it->second(params); + } catch (const RpcError &err) { + // Handler explicitly signalled an RPC error: forward as-is + response_error = err; + } catch (const std::exception &ex) { + // Any other exception: wrap as built-in APPLICATION_ERROR + response_error = + RpcError::builtIn(RpcError::ErrorCode::APPLICATION_ERROR, ex.what()); + } catch (...) { + response_error = RpcError::builtIn(RpcError::ErrorCode::APPLICATION_ERROR, + "unknown error"); + } + } + + FfiRequest req; + auto *msg = req.mutable_rpc_method_invocation_response(); + msg->set_local_participant_handle(ffiHandleId()); + msg->set_invocation_id(invocation_id); + if (response_error.has_value()) { + auto *err_proto = msg->mutable_error(); + err_proto->CopyFrom(response_error->toProto()); + } + if (response_payload.has_value()) { + msg->set_payload(*response_payload); + } + std::cout << "handleRpcMethodInvocation sendrequest \n"; + FfiClient::instance().sendRequest(req); +} + } // namespace livekit diff --git a/src/room.cpp b/src/room.cpp index f52edb1..c20aac1 100644 --- a/src/room.cpp +++ b/src/room.cpp @@ -166,8 +166,35 @@ void Room::OnEvent(const FfiEvent &event) { { std::lock_guard guard(lock_); delegate_snapshot = delegate_; - // If you want, you can also update internal state here (participants, room - // info, etc.). + } + + // First, handle RPC method invocations (not part of RoomEvent). + if (event.message_case() == FfiEvent::kRpcMethodInvocation) { + const auto &rpc = event.rpc_method_invocation(); + + LocalParticipant *lp = nullptr; + { + std::lock_guard guard(lock_); + if (!local_participant_) { + return; + } + auto local_handle = local_participant_->ffiHandleId(); + if (local_handle == INVALID_HANDLE || + rpc.local_participant_handle() != + static_cast(local_handle)) { + // RPC is not targeted at this room's local participant; ignore. + return; + } + lp = local_participant_.get(); + } + + // Call outside the lock to avoid deadlocks / re-entrancy issues. + lp->handleRpcMethodInvocation( + rpc.invocation_id(), rpc.method(), rpc.request_id(), + rpc.caller_identity(), rpc.payload(), + static_cast(rpc.response_timeout_ms()) / 1000.0); + + return; } if (!delegate_snapshot) { diff --git a/src/room_event_converter.h b/src/room_event_converter.h deleted file mode 100644 index f8dde1f..0000000 --- a/src/room_event_converter.h +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright 2023 LiveKit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an β€œAS IS” BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include "livekit/room_delegate.h" -#include "room.pb.h" - -namespace livekit { - -// --------- basic helper conversions --------- - -ConnectionQuality toConnectionQuality(proto::ConnectionQuality src); -ConnectionState toConnectionState(proto::ConnectionState src); -DataPacketKind toDataPacketKind(proto::DataPacketKind src); -EncryptionState toEncryptionState(proto::EncryptionState src); -DisconnectReason toDisconnectReason(proto::DisconnectReason src); - -TranscriptionSegmentData fromProto(const proto::TranscriptionSegment &src); -ChatMessageData fromProto(const proto::ChatMessage &src); -UserPacketData fromProto(const proto::UserPacket &src); -SipDtmfData fromProto(const proto::SipDTMF &src); -RoomInfoData fromProto(const proto::RoomInfo &src); -AttributeEntry fromProto(const proto::AttributesEntry &src); - -DataStreamHeaderData fromProto(const proto::DataStream_Header &src); -DataStreamChunkData fromProto(const proto::DataStream_Chunk &src); -DataStreamTrailerData fromProto(const proto::DataStream_Trailer &src); - -// --------- event conversions (RoomEvent.oneof message) --------- - -ParticipantConnectedEvent fromProto(const proto::ParticipantConnected &src); -ParticipantDisconnectedEvent -fromProto(const proto::ParticipantDisconnected &src); - -LocalTrackPublishedEvent fromProto(const proto::LocalTrackPublished &src); -LocalTrackUnpublishedEvent fromProto(const proto::LocalTrackUnpublished &src); -LocalTrackSubscribedEvent fromProto(const proto::LocalTrackSubscribed &src); - -TrackPublishedEvent fromProto(const proto::TrackPublished &src); -TrackUnpublishedEvent fromProto(const proto::TrackUnpublished &src); -TrackUnsubscribedEvent fromProto(const proto::TrackUnsubscribed &src); -TrackSubscriptionFailedEvent -fromProto(const proto::TrackSubscriptionFailed &src); -TrackMutedEvent fromProto(const proto::TrackMuted &src); -TrackUnmutedEvent fromProto(const proto::TrackUnmuted &src); - -ActiveSpeakersChangedEvent fromProto(const proto::ActiveSpeakersChanged &src); - -RoomMetadataChangedEvent fromProto(const proto::RoomMetadataChanged &src); -RoomSidChangedEvent fromProto(const proto::RoomSidChanged &src); - -ParticipantMetadataChangedEvent -fromProto(const proto::ParticipantMetadataChanged &src); -ParticipantNameChangedEvent fromProto(const proto::ParticipantNameChanged &src); -ParticipantAttributesChangedEvent -fromProto(const proto::ParticipantAttributesChanged &src); -ParticipantEncryptionStatusChangedEvent -fromProto(const proto::ParticipantEncryptionStatusChanged &src); - -ConnectionQualityChangedEvent -fromProto(const proto::ConnectionQualityChanged &src); - -DataPacketReceivedEvent fromProto(const proto::DataPacketReceived &src); -TranscriptionReceivedEvent fromProto(const proto::TranscriptionReceived &src); - -ConnectionStateChangedEvent fromProto(const proto::ConnectionStateChanged &src); -DisconnectedEvent fromProto(const proto::Disconnected &src); -ReconnectingEvent fromProto(const proto::Reconnecting &src); -ReconnectedEvent fromProto(const proto::Reconnected &src); -RoomEosEvent fromProto(const proto::RoomEOS &src); - -DataStreamHeaderReceivedEvent -fromProto(const proto::DataStreamHeaderReceived &src); -DataStreamChunkReceivedEvent -fromProto(const proto::DataStreamChunkReceived &src); -DataStreamTrailerReceivedEvent -fromProto(const proto::DataStreamTrailerReceived &src); - -DataChannelBufferedAmountLowThresholdChangedEvent -fromProto(const proto::DataChannelBufferedAmountLowThresholdChanged &src); - -ByteStreamOpenedEvent fromProto(const proto::ByteStreamOpened &src); -TextStreamOpenedEvent fromProto(const proto::TextStreamOpened &src); - -RoomUpdatedEvent -roomUpdatedFromProto(const proto::RoomInfo &src); // room_updated -RoomMovedEvent roomMovedFromProto(const proto::RoomInfo &src); // moved - -ParticipantsUpdatedEvent fromProto(const proto::ParticipantsUpdated &src); -E2eeStateChangedEvent fromProto(const proto::E2eeStateChanged &src); -ChatMessageReceivedEvent fromProto(const proto::ChatMessageReceived &src); - -} // namespace livekit diff --git a/src/room_proto_converter.cpp b/src/room_proto_converter.cpp index 0c41a6a..7a6b1c1 100644 --- a/src/room_proto_converter.cpp +++ b/src/room_proto_converter.cpp @@ -16,6 +16,7 @@ #include "room_proto_converter.h" +#include "livekit/local_participant.h" #include "room.pb.h" namespace livekit { diff --git a/src/room_proto_converter.h b/src/room_proto_converter.h index 89ba7c4..127a70c 100644 --- a/src/room_proto_converter.h +++ b/src/room_proto_converter.h @@ -19,8 +19,12 @@ #include "livekit/room_delegate.h" #include "room.pb.h" +#include + namespace livekit { +enum class RpcErrorCode; + // --------- basic helper conversions --------- ConnectionQuality toConnectionQuality(proto::ConnectionQuality in); diff --git a/src/rpc_error.cpp b/src/rpc_error.cpp new file mode 100644 index 0000000..14c7d29 --- /dev/null +++ b/src/rpc_error.cpp @@ -0,0 +1,91 @@ +/* + * Copyright 2025 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an β€œAS IS” BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "livekit/rpc_error.h" + +#include "rpc.pb.h" + +namespace livekit { + +RpcError::RpcError(std::uint32_t code, std::string message, std::string data) + : std::runtime_error(message), code_(code), message_(std::move(message)), + data_(std::move(data)) {} + +RpcError::RpcError(ErrorCode code, std::string message, std::string data) + : RpcError(static_cast(code), std::move(message), + std::move(data)) {} + +std::uint32_t RpcError::code() const noexcept { return code_; } + +const std::string &RpcError::message() const noexcept { return message_; } + +const std::string &RpcError::data() const noexcept { return data_; } + +proto::RpcError RpcError::toProto() const { + proto::RpcError err; + err.set_code(code_); + err.set_message(message_); + + // Set data only if non-empty; empty string means "no data". + if (!data_.empty()) { + err.set_data(data_); + } + + return err; +} + +RpcError RpcError::fromProto(const proto::RpcError &err) { + // proto::RpcError.data() will return empty string if unset, which is fine. + return RpcError(err.code(), err.message(), err.data()); +} + +RpcError RpcError::builtIn(ErrorCode code, const std::string &data) { + const char *msg = defaultMessageFor(code); + return RpcError(code, msg ? std::string(msg) : std::string{}, data); +} + +const char *RpcError::defaultMessageFor(ErrorCode code) { + // Mirror Python RpcError.ErrorMessage mapping. + switch (code) { + case ErrorCode::APPLICATION_ERROR: + return "Application error in method handler"; + case ErrorCode::CONNECTION_TIMEOUT: + return "Connection timeout"; + case ErrorCode::RESPONSE_TIMEOUT: + return "Response timeout"; + case ErrorCode::RECIPIENT_DISCONNECTED: + return "Recipient disconnected"; + case ErrorCode::RESPONSE_PAYLOAD_TOO_LARGE: + return "Response payload too large"; + case ErrorCode::SEND_FAILED: + return "Failed to send"; + case ErrorCode::UNSUPPORTED_METHOD: + return "Method not supported at destination"; + case ErrorCode::RECIPIENT_NOT_FOUND: + return "Recipient not found"; + case ErrorCode::REQUEST_PAYLOAD_TOO_LARGE: + return "Request payload too large"; + case ErrorCode::UNSUPPORTED_SERVER: + return "RPC not supported by server"; + case ErrorCode::UNSUPPORTED_VERSION: + return "Unsupported RPC version"; + } + + // Should be unreachable if all enum values are covered. + return ""; +} + +} // namespace livekit \ No newline at end of file