-
Notifications
You must be signed in to change notification settings - Fork 14
Send webRTC stats to SFU #213
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Move signal.ws.open trace from connection_manager to signaling._on_open() where it fires when the WebSocket actually opens. Add joinRequest trace before sending the join request to match JS SDK behavior.
Extract local-candidate, remote-candidate, and candidate-pair stats from aiortc's ICE transports since getStats() doesn't provide them. Format matches JS SDK for dashboard compatibility.
- Add tests/rtc/test_tracer.py: 11 tests for Tracer and sanitize_value - Add tests/rtc/test_stats_tracer.py: 5 tests for StatsTracer filtering, delta compression, and ICE candidate stats (using real aiortc PC) - Add tracing test to test_signaling.py - Fix bug: empty Tracer evaluated to False due to __len__ - Delete 3 redundant signaling test files that tested fake/mock code
📝 WalkthroughWalkthroughThis PR adds a tracing buffer, per‑PC stats collection (StatsTracer), periodic SFU stats reporting (SfuStatsReporter), and propagates tracing through connection, signaling, and RPC layers; enforces explicit RTCConfiguration and removes several legacy signaling test mocks. (50 words) Changes
Sequence Diagram(s)sequenceDiagram
participant CM as ConnectionManager
participant PM as PeerConnectionManager
participant PC as PeerConnection
participant ST as StatsTracer
participant SR as SfuStatsReporter
participant SC as SignalClient
CM->>PM: on join -> _build_rtc_configuration()
PM->>PC: create publisher/subscriber PC
PM->>ST: instantiate StatsTracer for each PC
CM->>SR: create SfuStatsReporter (tracer, session_id)
SR->>SR: start() -> schedule periodic _run()
loop every interval
SR->>PM: request publisher/subscriber stats
PM->>ST: StatsTracer.get()
ST->>PC: getStats()
PC-->>ST: RTCStatsReport
ST-->>SR: ComputedStats
SR->>CM: tracer.take()
CM-->>SR: TraceSlice (snapshot + rollback)
SR->>SC: SendStats RPC (includes traces + stats)
alt success
SC-->>SR: OK
else failure
SR->>CM: rollback() -> restore traces
end
end
CM->>SR: stop() on leave -> cancel tasks
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@getstream/video/rtc/signaling.py`:
- Around line 137-151: The trace currently emits the full joinRequest
(MessageToDict(self.join_request)) which can include auth tokens; before calling
self._tracer.trace for "joinRequest" convert the proto to a dict, locate and
remove or replace sensitive keys (e.g., "auth", "token", "accessToken",
"authToken") inside the resulting dict/joinRequest entry (or set their values to
"REDACTED") and then pass that sanitized dict to self._tracer.trace so no
secrets are emitted; update the code around MessageToDict(self.join_request) and
the subsequent "joinRequest" trace call to use this sanitized payload.
In `@getstream/video/rtc/twirp_client_wrapper.py`:
- Around line 176-193: The current truthiness check "should_trace = tracer and
name not in RPC_TRACE_EXCLUSIONS" can be False when tracer implements __len__
(e.g., empty buffer); change it to an explicit None and enabled check: compute
should_trace using "tracer is not None" and if the tracer has an enabled
attribute use that (e.g., getattr(tracer, 'enabled', True)), and still ensure
name not in RPC_TRACE_EXCLUSIONS; keep the rest of the block (sfu_id_fn(),
extracting request via MessageToDict, and tracer.trace(name, sfu_id,
request_data)) unchanged.
In `@tests/rtc/test_peer_connection_config.py`:
- Around line 3-21: Replace the unittest.mock.Mock usage in
_create_manager_with_credentials with a lightweight stub (e.g., SimpleNamespace)
provided by a pytest fixture: build a simple namespace object that has
join_response -> credentials -> ice_servers and server.edge_name attributes and
pass that stub into PeerConnectionManager instead of Mock(), or create a fixture
that returns that namespace and update tests to use it; modify the helper
_create_manager_with_credentials to construct and return
PeerConnectionManager(stub) so only attribute access (no mocking) is used.
🧹 Nitpick comments (5)
tests/rtc/test_stats_tracer.py (2)
27-30: Moveasyncioimport to module level for consistency.The import is inside the function but is a standard library module used in async tests. Moving it to the top with the other imports improves readability.
Suggested change
import pytest from aiortc import RTCPeerConnection, RTCConfiguration, RTCIceServer +import asyncioAnd remove line 28.
78-91: Inconsistent RTCConfiguration usage between tests.The publisher test explicitly provides an RTCConfiguration with ICE servers (lines 15-17), while this subscriber test creates
RTCPeerConnection()without configuration. For consistency and to ensure similar ICE gathering behavior, consider using the same configuration pattern.Suggested fix
async def test_subscriber_filters_outbound_rtp(self): """Subscriber stats should filter out outbound-rtp but include codec stats.""" from getstream.video.rtc.stats_tracer import StatsTracer - pc = RTCPeerConnection() + config = RTCConfiguration( + iceServers=[RTCIceServer(urls=["stun:stun.l.google.com:19302"])] + ) + pc = RTCPeerConnection(configuration=config)getstream/video/rtc/stats_reporter.py (1)
117-128: First stats report is delayed by the full interval.The loop sleeps before the first
_run()call, meaning stats won't be sent untilinterval_msafterstart(). If immediate initial reporting is desired, consider running once before entering the loop or moving the sleep to the end.Alternative pattern for immediate first report
async def _run_loop(self) -> None: """Main loop that periodically collects and sends stats.""" while self._running: + try: + await self._run() + except asyncio.CancelledError: + break + except Exception as e: + logger.warning(f"Failed to send stats: {e}") await asyncio.sleep(self._interval_ms / 1000) if not self._running: break - try: - await self._run() - except asyncio.CancelledError: - break - except Exception as e: - logger.warning(f"Failed to send stats: {e}")getstream/video/rtc/stats_tracer.py (1)
365-372: Fragile coupling to aiortc internals should be documented.The code accesses
_RTCPeerConnection__iceTransports(a name-mangled private attribute) at line 370. While the defensive coding withgetattr(..., set())provides a fallback and the method is wrapped in try/except, this coupling to aiortc internals is fragile across versions.The aiortc-getstream dependency is pinned to
1.13.0.post1in thewebrtcextras, which mitigates the version drift risk. Consider adding an inline comment at line 370 documenting which aiortc-getstream version this attribute access is known to work with, or implement a version check on import to fail loudly rather than silently returning incomplete stats if the aiortc API changes.tests/rtc/test_tracer.py (1)
62-129: Use a Tracer fixture to align with test guidelines and reduce repetition.
Centralizing setup keeps tests consistent and minimizes boilerplate.♻️ Example fixture refactor
+import pytest + +@pytest.fixture +def tracer(): + return Tracer() ... - def test_trace_adds_record_with_correct_format(self): - tracer = Tracer() + def test_trace_adds_record_with_correct_format(self, tracer): tracer.trace("test-tag", "0-pub", {"key": "value"})As per coding guidelines: Use fixtures to inject objects in tests.
| if self._tracer is not None: | ||
| from google.protobuf.json_format import MessageToDict | ||
|
|
||
| sfu_id = self._sfu_id_fn() if self._sfu_id_fn else None | ||
| self._tracer.trace("signal.ws.open", sfu_id, {"isTrusted": True}) | ||
| self._tracer.trace( | ||
| "joinRequest", | ||
| sfu_id, | ||
| { | ||
| "requestPayload": { | ||
| "oneofKind": "joinRequest", | ||
| "joinRequest": MessageToDict(self.join_request), | ||
| } | ||
| }, | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Redact auth tokens before tracing joinRequest.
The joinRequest payload includes the auth token; avoid emitting secrets in trace data by masking or removing it before tracing.
🔐 Token redaction example
if self._tracer is not None:
- from google.protobuf.json_format import MessageToDict
+ from google.protobuf.json_format import MessageToDict
sfu_id = self._sfu_id_fn() if self._sfu_id_fn else None
self._tracer.trace("signal.ws.open", sfu_id, {"isTrusted": True})
+ join_request_dict = MessageToDict(self.join_request)
+ join_request_dict.pop("token", None)
self._tracer.trace(
"joinRequest",
sfu_id,
{
"requestPayload": {
"oneofKind": "joinRequest",
- "joinRequest": MessageToDict(self.join_request),
+ "joinRequest": join_request_dict,
}
},
)🤖 Prompt for AI Agents
In `@getstream/video/rtc/signaling.py` around lines 137 - 151, The trace currently
emits the full joinRequest (MessageToDict(self.join_request)) which can include
auth tokens; before calling self._tracer.trace for "joinRequest" convert the
proto to a dict, locate and remove or replace sensitive keys (e.g., "auth",
"token", "accessToken", "authToken") inside the resulting dict/joinRequest entry
(or set their values to "REDACTED") and then pass that sanitized dict to
self._tracer.trace so no secrets are emitted; update the code around
MessageToDict(self.join_request) and the subsequent "joinRequest" trace call to
use this sanitized payload.
| # Trace RPC request (if not excluded) | ||
| should_trace = tracer and name not in RPC_TRACE_EXCLUSIONS | ||
| sfu_id = sfu_id_fn() if sfu_id_fn else None | ||
|
|
||
| # Extract request data for tracing | ||
| request_data = None | ||
| if should_trace: | ||
| request = kwargs.get("request") | ||
| if request: | ||
| try: | ||
| from google.protobuf.json_format import MessageToDict | ||
|
|
||
| # Use default camelCase field names (no preserving_proto_field_name) | ||
| request_data = MessageToDict(request) | ||
| except Exception: | ||
| request_data = str(request) | ||
| tracer.trace(name, sfu_id, request_data) | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
# Find the file
fd "twirp_client_wrapper.py"Repository: GetStream/stream-py
Length of output: 106
🏁 Script executed:
# Find the Tracer class definition
rg "class Tracer" --type py -A 5Repository: GetStream/stream-py
Length of output: 435
🏁 Script executed:
# Check the content of twirp_client_wrapper.py around lines 176-193
fd "twirp_client_wrapper.py" --exec cat -n {} \; | sed -n '170,200p'Repository: GetStream/stream-py
Length of output: 1840
🏁 Script executed:
# Read the full Tracer class implementation
fd "tracer.py" -p "video/rtc" --exec cat -n {}Repository: GetStream/stream-py
Length of output: 159
🏁 Script executed:
# Find and read the Tracer class file
fd "tracer.py" --type f --exec cat -n {}Repository: GetStream/stream-py
Length of output: 41334
Use an explicit None check to avoid tracer truthiness issues from __len__.
When the buffer is empty, __len__() returns 0, making the tracer falsy and silently disabling tracing. Replace the truthiness check with an explicit is not None comparison and check the enabled property:
🐛 Fix should_trace gating
- should_trace = tracer and name not in RPC_TRACE_EXCLUSIONS
+ should_trace = (
+ tracer is not None
+ and tracer.enabled
+ and name not in RPC_TRACE_EXCLUSIONS
+ )📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| # Trace RPC request (if not excluded) | |
| should_trace = tracer and name not in RPC_TRACE_EXCLUSIONS | |
| sfu_id = sfu_id_fn() if sfu_id_fn else None | |
| # Extract request data for tracing | |
| request_data = None | |
| if should_trace: | |
| request = kwargs.get("request") | |
| if request: | |
| try: | |
| from google.protobuf.json_format import MessageToDict | |
| # Use default camelCase field names (no preserving_proto_field_name) | |
| request_data = MessageToDict(request) | |
| except Exception: | |
| request_data = str(request) | |
| tracer.trace(name, sfu_id, request_data) | |
| # Trace RPC request (if not excluded) | |
| should_trace = ( | |
| tracer is not None | |
| and tracer.enabled | |
| and name not in RPC_TRACE_EXCLUSIONS | |
| ) | |
| sfu_id = sfu_id_fn() if sfu_id_fn else None | |
| # Extract request data for tracing | |
| request_data = None | |
| if should_trace: | |
| request = kwargs.get("request") | |
| if request: | |
| try: | |
| from google.protobuf.json_format import MessageToDict | |
| # Use default camelCase field names (no preserving_proto_field_name) | |
| request_data = MessageToDict(request) | |
| except Exception: | |
| request_data = str(request) | |
| tracer.trace(name, sfu_id, request_data) |
🤖 Prompt for AI Agents
In `@getstream/video/rtc/twirp_client_wrapper.py` around lines 176 - 193, The
current truthiness check "should_trace = tracer and name not in
RPC_TRACE_EXCLUSIONS" can be False when tracer implements __len__ (e.g., empty
buffer); change it to an explicit None and enabled check: compute should_trace
using "tracer is not None" and if the tracer has an enabled attribute use that
(e.g., getattr(tracer, 'enabled', True)), and still ensure name not in
RPC_TRACE_EXCLUSIONS; keep the rest of the block (sfu_id_fn(), extracting
request via MessageToDict, and tracer.trace(name, sfu_id, request_data))
unchanged.
| import aiortc | ||
| from unittest.mock import Mock | ||
|
|
||
| from getstream.video.rtc.peer_connection import PeerConnectionManager | ||
|
|
||
|
|
||
| class TestPeerConnectionConfig: | ||
| """Tests for ICE server configuration conversion.""" | ||
|
|
||
| def _create_manager_with_credentials( | ||
| self, ice_servers: list | ||
| ) -> PeerConnectionManager: | ||
| """Create a PeerConnectionManager with mocked credentials.""" | ||
| mock_connection_manager = Mock() | ||
| mock_connection_manager.join_response.credentials.ice_servers = ice_servers | ||
| mock_connection_manager.join_response.credentials.server.edge_name = ( | ||
| "sfu-dpk-london-test.stream-io-video.com" | ||
| ) | ||
| return PeerConnectionManager(mock_connection_manager) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
# First, check if the file exists and read its content
if [ -f "tests/rtc/test_peer_connection_config.py" ]; then
echo "=== File exists. Reading content ==="
wc -l tests/rtc/test_peer_connection_config.py
echo ""
cat -n tests/rtc/test_peer_connection_config.py
else
echo "File does not exist at tests/rtc/test_peer_connection_config.py"
# Try to find the file
find . -name "test_peer_connection_config.py" -type f
fiRepository: GetStream/stream-py
Length of output: 4989
🏁 Script executed:
# Also check the PeerConnectionManager class to understand its interface
fd -e "py" -x rg -l "class PeerConnectionManager" {} \;Repository: GetStream/stream-py
Length of output: 103
🏁 Script executed:
# Now let's examine the PeerConnectionManager class to understand what interface it expects
head -100 ./getstream/video/rtc/peer_connection.pyRepository: GetStream/stream-py
Length of output: 3804
🏁 Script executed:
# Also check if there's an __init__ method or constructor that shows expected interface
ast-grep --pattern 'class PeerConnectionManager {
$$$
}'Repository: GetStream/stream-py
Length of output: 45
🏁 Script executed:
# Let's see more of the PeerConnectionManager class to understand what attributes it accesses from connection_manager
sed -n '1,200p' ./getstream/video/rtc/peer_connection.py | grep -n "self.connection_manager\." | head -30Repository: GetStream/stream-py
Length of output: 547
🏁 Script executed:
# Let's specifically check the _build_rtc_configuration and _get_connection_config methods
sed -n '200,400p' ./getstream/video/rtc/peer_connection.pyRepository: GetStream/stream-py
Length of output: 7918
Avoid unittest.mock.Mock in tests; use a fixture with lightweight stubs instead.
The _create_manager_with_credentials helper method uses Mock() which violates the coding guideline: "Do not use mocks or mock objects in tests unless directly requested." Since the code only accesses attributes (no method calls), a SimpleNamespace-based stub injected via a pytest fixture is the correct approach.
🧪 Example stub fixture (no mocks)
-import aiortc
-from unittest.mock import Mock
+import aiortc
+from types import SimpleNamespace
+import pytest
...
-class TestPeerConnectionConfig:
- def _create_manager_with_credentials(
- self, ice_servers: list
- ) -> PeerConnectionManager:
- """Create a PeerConnectionManager with mocked credentials."""
- mock_connection_manager = Mock()
- mock_connection_manager.join_response.credentials.ice_servers = ice_servers
- mock_connection_manager.join_response.credentials.server.edge_name = (
- "sfu-dpk-london-test.stream-io-video.com"
- )
- return PeerConnectionManager(mock_connection_manager)
+@pytest.fixture
+def manager_factory():
+ def _factory(ice_servers: list) -> PeerConnectionManager:
+ credentials = SimpleNamespace(
+ ice_servers=ice_servers,
+ server=SimpleNamespace(
+ edge_name="sfu-dpk-london-test.stream-io-video.com"
+ ),
+ )
+ join_response = SimpleNamespace(credentials=credentials)
+ connection_manager = SimpleNamespace(join_response=join_response)
+ return PeerConnectionManager(connection_manager)
+ return _factory- manager = self._create_manager_with_credentials(ice_servers)
+ manager = manager_factory(ice_servers)🤖 Prompt for AI Agents
In `@tests/rtc/test_peer_connection_config.py` around lines 3 - 21, Replace the
unittest.mock.Mock usage in _create_manager_with_credentials with a lightweight
stub (e.g., SimpleNamespace) provided by a pytest fixture: build a simple
namespace object that has join_response -> credentials -> ice_servers and
server.edge_name attributes and pass that stub into PeerConnectionManager
instead of Mock(), or create a fixture that returns that namespace and update
tests to use it; modify the helper _create_manager_with_credentials to construct
and return PeerConnectionManager(stub) so only attribute access (no mocking) is
used.
|
Also tagged @thesyncim since he knows this area better than most |
4cc7960 to
b61de98
Compare
WebRTC Stats and Tracing Implementation
Summary
SendStatsRPCWhat's Included
Core Components:
Tracer: Central trace buffer with rollback support for failed sendsStatsTracer: Per-PeerConnection stats collection with delta compression and performance metricsSfuStatsReporter: Orchestrates periodic stats collection (8s interval) and sends to SFUTracing Coverage:
create, state changes, ICE candidates, track eventscreateOffer,setLocalDescription,setRemoteDescriptionwith success/failure variantssignal.ws.open,joinRequestSetPublisher,SendAnswer, etc. with request/response datagetstatswith delta-compressed dataStats Fields Provided:
ssrc,kind,packetsSent,bytesSent,packetsReceived,packetsLost,jitter,roundTripTimemid,mediaType,codecId(not provided by aiortc natively)frameWidth,frameHeight,framesSent/framesDecoded,framesPerSecond,totalEncodeTime/totalDecodeTimelocal-candidate,remote-candidate,candidate-pairpayloadType,mimeType,clockRate,channels,sdpFmtpLineavg_frame_time_ms,avg_fps,video_dimension,codecLimitations (aiortc vs browser):
pliCount,nackCount,firCount)targetBitrate,qpSum)rid,qualityLimitationReason)STATS_LIMITATIONS.mdfor full detailsTest Plan
test_tracer.py: Trace buffer, sanitization, rollbacktest_stats_tracer.py: Delta compression, codec/ICE statstest_peer_connection_config.py: RTCConfiguration buildingtest_signaling.py: Signal client tracingSummary by CodeRabbit
Release Notes
New Features
Tests
✏️ Tip: You can customize this high-level summary in your review settings.