From 3d80c0c50bc073078261e695abc390777595dc68 Mon Sep 17 00:00:00 2001 From: root Date: Thu, 19 Feb 2026 01:19:16 +0000 Subject: [PATCH] fix: recover from non-JSON lines in stdout stream When non-JSON lines appear on stdout (e.g. verbose HTTP logs, sandbox debug messages, mTLS configuration output), the existing json_buffer accumulation logic permanently poisons the buffer. Once any non-JSON content enters the buffer, all subsequent json.loads() calls fail, causing the SDK to silently drop every remaining message. In practice this manifests as query() returning incomplete results or the client hanging indefinitely. Root cause: _read_messages_impl unconditionally appends every stdout line to json_buffer. A non-JSON line like "2026-02-19 [DEBUG] ..." gets concatenated with the next valid JSON object, producing unparseable content that persists for the lifetime of the stream. Fix: try standalone json.loads() first (covers the common case of a complete JSON object on a single line). Only start buffering if the line starts with "{" (the stream-json protocol emits JSON objects). Non-JSON lines are logged at DEBUG level and skipped. Fixes #347 Co-Authored-By: Claude Opus 4.6 --- .../_internal/transport/subprocess_cli.py | 29 ++++- tests/test_subprocess_buffering.py | 101 ++++++++++++++++++ 2 files changed, 125 insertions(+), 5 deletions(-) diff --git a/src/claude_agent_sdk/_internal/transport/subprocess_cli.py b/src/claude_agent_sdk/_internal/transport/subprocess_cli.py index 1f0aac58..0860ff80 100644 --- a/src/claude_agent_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_agent_sdk/_internal/transport/subprocess_cli.py @@ -540,8 +540,29 @@ async def _read_messages_impl(self) -> AsyncIterator[dict[str, Any]]: if not json_line: continue - # Keep accumulating partial JSON until we can parse it - json_buffer += json_line + # Try standalone parse first (most common case). + # This prevents non-JSON lines (e.g. verbose HTTP logs, + # sandbox debug messages) from poisoning the buffer. + if not json_buffer: + try: + data = json.loads(json_line) + yield data + continue + except json.JSONDecodeError: + # Only start buffering if line looks like start of + # a JSON object. The stream-json protocol emits + # JSON objects, so "{" is the expected start. + if json_line.startswith("{"): + json_buffer = json_line + else: + logger.debug( + "Skipping non-JSON line on stdout: %s", + json_line[:100], + ) + continue + else: + # Accumulate into existing buffer + json_buffer += json_line if len(json_buffer) > self._max_buffer_size: buffer_length = len(json_buffer) @@ -558,9 +579,7 @@ async def _read_messages_impl(self) -> AsyncIterator[dict[str, Any]]: json_buffer = "" yield data except json.JSONDecodeError: - # We are speculatively decoding the buffer until we get - # a full JSON object. If there is an actual issue, we - # raise an error after exceeding the configured limit. + # Still incomplete, keep buffering continue except anyio.ClosedResourceError: diff --git a/tests/test_subprocess_buffering.py b/tests/test_subprocess_buffering.py index 03710748..891ea192 100644 --- a/tests/test_subprocess_buffering.py +++ b/tests/test_subprocess_buffering.py @@ -327,3 +327,104 @@ async def _test() -> None: assert messages[2]["subtype"] == "end" anyio.run(_test) + + def test_non_json_lines_dont_poison_buffer(self) -> None: + """Test that non-JSON lines on stdout don't poison the buffer. + + Non-JSON lines (e.g. verbose HTTP logs, sandbox debug messages) should + be skipped without affecting parsing of subsequent JSON messages. + Reproduces the bug described in https://github.com/anthropics/claude-agent-sdk-python/issues/347 + """ + + async def _test() -> None: + json_obj1 = {"type": "system", "subtype": "init", "session_id": "abc123"} + json_obj2 = { + "type": "assistant", + "content": [{"type": "text", "text": "Hello"}], + } + json_obj3 = {"type": "result", "result": "done", "is_error": False} + + # Simulate non-JSON lines interleaved with valid JSON messages + lines = [ + json.dumps(json_obj1) + "\n", + "2026-02-19T01:12:58.573Z [DEBUG] configureGlobalMTLS starting\n", + "[Anthropic SDK INFO] post https://api.example.com status 200\n", + json.dumps(json_obj2) + "\n", + "Warning: some random stderr leaked to stdout\n", + json.dumps(json_obj3) + "\n", + ] + + transport = SubprocessCLITransport(prompt="test", options=make_options()) + + mock_process = MagicMock() + mock_process.returncode = None + mock_process.wait = AsyncMock(return_value=None) + transport._process = mock_process + transport._stdout_stream = MockTextReceiveStream(lines) + transport._stderr_stream = MockTextReceiveStream([]) + + messages: list[Any] = [] + async for msg in transport.read_messages(): + messages.append(msg) + + assert len(messages) == 3 + assert messages[0]["type"] == "system" + assert messages[0]["session_id"] == "abc123" + assert messages[1]["type"] == "assistant" + assert messages[1]["content"][0]["text"] == "Hello" + assert messages[2]["type"] == "result" + assert messages[2]["result"] == "done" + + anyio.run(_test) + + def test_non_json_lines_between_split_json(self) -> None: + """Test non-JSON lines don't corrupt an in-progress buffered JSON parse. + + When a JSON object is split across multiple reads and a non-JSON line + appears before it, the buffer should not be corrupted. + """ + + async def _test() -> None: + msg_before = json.dumps({"type": "system", "subtype": "start"}) + + large_msg = { + "type": "assistant", + "message": { + "content": [{"type": "text", "text": "y" * 5000}], + }, + } + large_json = json.dumps(large_msg) + + msg_after = json.dumps({"type": "system", "subtype": "end"}) + + # Split the large JSON across reads, with a non-JSON line before it + lines = [ + msg_before + "\n", + "Some debug log line that is not JSON\n", + large_json[:1000], + large_json[1000:], + "\n" + msg_after, + ] + + transport = SubprocessCLITransport(prompt="test", options=make_options()) + + mock_process = MagicMock() + mock_process.returncode = None + mock_process.wait = AsyncMock(return_value=None) + transport._process = mock_process + transport._stdout_stream = MockTextReceiveStream(lines) + transport._stderr_stream = MockTextReceiveStream([]) + + messages: list[Any] = [] + async for msg in transport.read_messages(): + messages.append(msg) + + assert len(messages) == 3 + assert messages[0]["type"] == "system" + assert messages[0]["subtype"] == "start" + assert messages[1]["type"] == "assistant" + assert len(messages[1]["message"]["content"][0]["text"]) == 5000 + assert messages[2]["type"] == "system" + assert messages[2]["subtype"] == "end" + + anyio.run(_test)