From ad91393087e597d2675a37645b790f2ab0b52d03 Mon Sep 17 00:00:00 2001 From: yen0304 Date: Fri, 5 Jun 2026 01:24:25 +0800 Subject: [PATCH 1/3] fix(realtime): add reconnection resilience to send_raw() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit send_raw() was missing the try/except that send() has — if the WebSocket disconnects mid-send, the data is silently lost instead of being enqueued for retry after reconnection. --- src/openai/resources/realtime/realtime.py | 14 ++++++++++++-- src/openai/resources/responses/responses.py | 14 ++++++++++++-- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/src/openai/resources/realtime/realtime.py b/src/openai/resources/realtime/realtime.py index e4c5bd8163..bde3423e89 100644 --- a/src/openai/resources/realtime/realtime.py +++ b/src/openai/resources/realtime/realtime.py @@ -362,7 +362,12 @@ async def send_raw(self, data: bytes | str) -> None: raw = data if isinstance(data, str) else data.decode("utf-8") self._send_queue.enqueue(raw) return - await self._connection.send(data) + try: + await self._connection.send(data) + except Exception: + raw = data if isinstance(data, str) else data.decode("utf-8") + self._send_queue.enqueue(raw) + raise async def close(self, *, code: int = 1000, reason: str = "") -> None: self._intentionally_closed = True @@ -842,7 +847,12 @@ def send_raw(self, data: bytes | str) -> None: raw = data if isinstance(data, str) else data.decode("utf-8") self._send_queue.enqueue(raw) return - self._connection.send(data) + try: + self._connection.send(data) + except Exception: + raw = data if isinstance(data, str) else data.decode("utf-8") + self._send_queue.enqueue(raw) + raise def close(self, *, code: int = 1000, reason: str = "") -> None: self._intentionally_closed = True diff --git a/src/openai/resources/responses/responses.py b/src/openai/resources/responses/responses.py index 5019d7e831..bf1712cc10 100644 --- a/src/openai/resources/responses/responses.py +++ b/src/openai/resources/responses/responses.py @@ -3855,7 +3855,12 @@ async def send_raw(self, data: bytes | str) -> None: raw = data if isinstance(data, str) else data.decode("utf-8") self._send_queue.enqueue(raw) return - await self._connection.send(data) + try: + await self._connection.send(data) + except Exception: + raw = data if isinstance(data, str) else data.decode("utf-8") + self._send_queue.enqueue(raw) + raise async def close(self, *, code: int = 1000, reason: str = "") -> None: self._intentionally_closed = True @@ -4312,7 +4317,12 @@ def send_raw(self, data: bytes | str) -> None: raw = data if isinstance(data, str) else data.decode("utf-8") self._send_queue.enqueue(raw) return - self._connection.send(data) + try: + self._connection.send(data) + except Exception: + raw = data if isinstance(data, str) else data.decode("utf-8") + self._send_queue.enqueue(raw) + raise def close(self, *, code: int = 1000, reason: str = "") -> None: self._intentionally_closed = True From 02561611ea9c582fbb7e36286bf961cdf4992e29 Mon Sep 17 00:00:00 2001 From: yen0304 Date: Tue, 9 Jun 2026 23:11:15 +0800 Subject: [PATCH 2/3] fix(realtime): preserve binary frames when queueing failed raw sends send_raw() accepts bytes | str, but the reconnect/retry path decoded bytes to UTF-8 before enqueueing. That turned binary WebSocket frames into text frames and raised UnicodeDecodeError for arbitrary binary payloads (e.g. audio chunks containing 0xff) before the original connection failure could surface. SendQueue now stores bytes | str natively and counts byte length per type, so send_raw() can enqueue the original payload unchanged on both the reconnecting and send-failure paths (sync + async, realtime + responses). Addresses Codex review feedback on #3363. --- src/openai/_send_queue.py | 19 ++++++------ src/openai/resources/realtime/realtime.py | 12 +++----- src/openai/resources/responses/responses.py | 12 +++----- tests/test_send_queue.py | 33 +++++++++++++++++++++ 4 files changed, 51 insertions(+), 25 deletions(-) diff --git a/src/openai/_send_queue.py b/src/openai/_send_queue.py index b35d0fbcba..998b14850a 100644 --- a/src/openai/_send_queue.py +++ b/src/openai/_send_queue.py @@ -11,31 +11,32 @@ class SendQueue: """Bounded byte-size queue for outgoing WebSocket messages. - Messages are stored as pre-serialized strings. The queue enforces a - maximum byte budget so that unbounded buffering cannot occur during - reconnection windows. + Messages are stored as either ``str`` (text frames) or ``bytes`` (binary + frames), preserving the original frame type so that binary payloads are + not corrupted on replay. The queue enforces a maximum byte budget so that + unbounded buffering cannot occur during reconnection windows. """ def __init__(self, max_bytes: int = 1_048_576) -> None: - self._queue: list[tuple[str, int]] = [] # (data, byte_length) + self._queue: list[tuple[bytes | str, int]] = [] # (data, byte_length) self._bytes: int = 0 self._max_bytes = max_bytes self._lock = threading.Lock() - def enqueue(self, data: str) -> None: + def enqueue(self, data: bytes | str) -> None: """Append *data* to the queue. Raises :class:`WebSocketQueueFullError` if the message would exceed the byte-size limit. """ - byte_length = len(data.encode("utf-8")) + byte_length = len(data) if isinstance(data, bytes) else len(data.encode("utf-8")) with self._lock: if self._bytes + byte_length > self._max_bytes: raise WebSocketQueueFullError("send queue is full, message discarded") self._queue.append((data, byte_length)) self._bytes += byte_length - def flush_sync(self, send: typing.Callable[[str], object]) -> None: + def flush_sync(self, send: typing.Callable[[bytes | str], object]) -> None: """Send every queued message via *send*. If *send* raises, the failing message and all subsequent messages @@ -56,7 +57,7 @@ def flush_sync(self, send: typing.Callable[[str], object]) -> None: self._bytes = sum(bl for _, bl in self._queue) raise - async def flush_async(self, send: typing.Callable[[str], typing.Awaitable[object]]) -> None: + async def flush_async(self, send: typing.Callable[[bytes | str], typing.Awaitable[object]]) -> None: """Async variant of :meth:`flush_sync`.""" with self._lock: pending = list(self._queue) @@ -73,7 +74,7 @@ async def flush_async(self, send: typing.Callable[[str], typing.Awaitable[object self._bytes = sum(bl for _, bl in self._queue) raise - def drain(self) -> list[str]: + def drain(self) -> list[bytes | str]: """Remove and return all queued messages.""" with self._lock: items = [data for data, _ in self._queue] diff --git a/src/openai/resources/realtime/realtime.py b/src/openai/resources/realtime/realtime.py index bde3423e89..a29248e3c8 100644 --- a/src/openai/resources/realtime/realtime.py +++ b/src/openai/resources/realtime/realtime.py @@ -359,14 +359,12 @@ async def send(self, event: RealtimeClientEvent | RealtimeClientEventParam) -> N async def send_raw(self, data: bytes | str) -> None: if self._is_reconnecting: - raw = data if isinstance(data, str) else data.decode("utf-8") - self._send_queue.enqueue(raw) + self._send_queue.enqueue(data) return try: await self._connection.send(data) except Exception: - raw = data if isinstance(data, str) else data.decode("utf-8") - self._send_queue.enqueue(raw) + self._send_queue.enqueue(data) raise async def close(self, *, code: int = 1000, reason: str = "") -> None: @@ -844,14 +842,12 @@ def send(self, event: RealtimeClientEvent | RealtimeClientEventParam) -> None: def send_raw(self, data: bytes | str) -> None: if self._is_reconnecting: - raw = data if isinstance(data, str) else data.decode("utf-8") - self._send_queue.enqueue(raw) + self._send_queue.enqueue(data) return try: self._connection.send(data) except Exception: - raw = data if isinstance(data, str) else data.decode("utf-8") - self._send_queue.enqueue(raw) + self._send_queue.enqueue(data) raise def close(self, *, code: int = 1000, reason: str = "") -> None: diff --git a/src/openai/resources/responses/responses.py b/src/openai/resources/responses/responses.py index bf1712cc10..06f3d8e28a 100644 --- a/src/openai/resources/responses/responses.py +++ b/src/openai/resources/responses/responses.py @@ -3852,14 +3852,12 @@ async def send(self, event: ResponsesClientEvent | ResponsesClientEventParam) -> async def send_raw(self, data: bytes | str) -> None: if self._is_reconnecting: - raw = data if isinstance(data, str) else data.decode("utf-8") - self._send_queue.enqueue(raw) + self._send_queue.enqueue(data) return try: await self._connection.send(data) except Exception: - raw = data if isinstance(data, str) else data.decode("utf-8") - self._send_queue.enqueue(raw) + self._send_queue.enqueue(data) raise async def close(self, *, code: int = 1000, reason: str = "") -> None: @@ -4314,14 +4312,12 @@ def send(self, event: ResponsesClientEvent | ResponsesClientEventParam) -> None: def send_raw(self, data: bytes | str) -> None: if self._is_reconnecting: - raw = data if isinstance(data, str) else data.decode("utf-8") - self._send_queue.enqueue(raw) + self._send_queue.enqueue(data) return try: self._connection.send(data) except Exception: - raw = data if isinstance(data, str) else data.decode("utf-8") - self._send_queue.enqueue(raw) + self._send_queue.enqueue(data) raise def close(self, *, code: int = 1000, reason: str = "") -> None: diff --git a/tests/test_send_queue.py b/tests/test_send_queue.py index 61db916bc4..6d0676337f 100644 --- a/tests/test_send_queue.py +++ b/tests/test_send_queue.py @@ -19,6 +19,39 @@ def test_enqueue_and_drain(self) -> None: assert items == ['{"type": "session.update"}', '{"type": "response.create"}'] assert len(q) == 0 + def test_enqueue_preserves_binary_frames(self) -> None: + """Binary payloads must be stored as-is, not decoded to text. + + Decoding to UTF-8 would corrupt binary frames and raise + ``UnicodeDecodeError`` for arbitrary bytes (e.g. audio chunks). + """ + q = SendQueue() + binary = b"\xff\xfe\x00audio" # not valid UTF-8 + q.enqueue(binary) + q.enqueue("text") + + items = q.drain() + assert items == [binary, "text"] + assert isinstance(items[0], bytes) + assert isinstance(items[1], str) + + def test_enqueue_counts_binary_byte_length(self) -> None: + q = SendQueue(max_bytes=4) + q.enqueue(b"\xff\xfe\xfd\xfc") # 4 bytes, fits exactly + with pytest.raises(WebSocketQueueFullError): + q.enqueue(b"\x00") # would exceed + assert len(q) == 1 + + def test_flush_sync_preserves_binary(self) -> None: + q = SendQueue() + binary = b"\xff\xfe" + q.enqueue(binary) + q.enqueue("text") + + sent: list[bytes | str] = [] + q.flush_sync(sent.append) + assert sent == [binary, "text"] + def test_enqueue_respects_byte_limit(self) -> None: q = SendQueue(max_bytes=10) q.enqueue("12345") # 5 bytes, fits From 4c9e59dc096f997fa4bf21347013262312632f0f Mon Sep 17 00:00:00 2001 From: yen0304 Date: Tue, 9 Jun 2026 23:16:44 +0800 Subject: [PATCH 3/3] test(realtime): cover binary payload replay across reconnect Add an end-to-end test that drives the real _reconnect() + flush path: a send_raw() with non-UTF-8 bytes fails on a dropped socket, gets queued, and is replayed byte-for-byte to the new connection after reconnect. Verified this fails (UnicodeDecodeError) against the prior decode-to-UTF-8 implementation. --- tests/test_realtime_reconnect.py | 89 ++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 tests/test_realtime_reconnect.py diff --git a/tests/test_realtime_reconnect.py b/tests/test_realtime_reconnect.py new file mode 100644 index 0000000000..27b14744bc --- /dev/null +++ b/tests/test_realtime_reconnect.py @@ -0,0 +1,89 @@ +from __future__ import annotations + +from typing import Any + +import pytest + +from openai.resources.realtime.realtime import AsyncRealtimeConnection + + +def _connection_closed_error(code: int = 1011) -> Exception: + from websockets.frames import Close + from websockets.exceptions import ConnectionClosedError + + return ConnectionClosedError(Close(code=code, reason=""), None) + + +class _DeadConnection: + """A connection whose send() always fails, simulating a dropped socket.""" + + async def send(self, _data: bytes | str) -> None: + raise _connection_closed_error() + + async def close(self, *, code: int = 1000, reason: str = "") -> None: + pass + + +class _RecordingConnection: + """The connection returned after a successful reconnect.""" + + def __init__(self) -> None: + self.sent: list[bytes | str] = [] + + async def send(self, data: bytes | str) -> None: + self.sent.append(data) + + +def _make_connection(new_conn: _RecordingConnection) -> AsyncRealtimeConnection: + async def make_ws(_extra_query: Any, _extra_headers: Any) -> Any: + return new_conn + + return AsyncRealtimeConnection( + _DeadConnection(), # type: ignore[arg-type] + make_ws=make_ws, + on_reconnecting=lambda _event: None, + max_retries=1, + initial_delay=0.0, + max_delay=0.0, + ) + + +@pytest.mark.asyncio +async def test_reconnect_resends_binary_payload_unchanged() -> None: + """End-to-end: a binary send_raw() that fails mid-send is queued and + replayed byte-for-byte after reconnect, without UTF-8 corruption.""" + from websockets.exceptions import ConnectionClosedError + + new_conn = _RecordingConnection() + conn = _make_connection(new_conn) + + binary = b"\xff\xfe\x00audio" # not valid UTF-8 (would crash on decode) + + # send fails on the dead socket -> the original connection error must + # surface (NOT a UnicodeDecodeError from decoding the binary payload), + # and the payload must be queued for replay. + with pytest.raises(ConnectionClosedError): + await conn.send_raw(binary) + + # Drive the real reconnect path, which flushes the queue to the new socket. + reconnected = await conn._reconnect(_connection_closed_error()) + assert reconnected is True + + assert new_conn.sent == [binary] + assert isinstance(new_conn.sent[0], bytes) + + +@pytest.mark.asyncio +async def test_reconnect_resends_text_payload() -> None: + """A str send_raw() is replayed as text after reconnect.""" + from websockets.exceptions import ConnectionClosedError + + new_conn = _RecordingConnection() + conn = _make_connection(new_conn) + + with pytest.raises(ConnectionClosedError): + await conn.send_raw('{"type": "input_audio_buffer.append"}') + + assert await conn._reconnect(_connection_closed_error()) is True + assert new_conn.sent == ['{"type": "input_audio_buffer.append"}'] + assert isinstance(new_conn.sent[0], str)