diff --git a/src/claude_agent_sdk/_internal/transport/subprocess_cli.py b/src/claude_agent_sdk/_internal/transport/subprocess_cli.py
index 1f0aac58..0860ff80 100644
--- a/src/claude_agent_sdk/_internal/transport/subprocess_cli.py
+++ b/src/claude_agent_sdk/_internal/transport/subprocess_cli.py
@@ -540,8 +540,23 @@ async def _read_messages_impl(self) -> AsyncIterator[dict[str, Any]]:
                     if not json_line:
                         continue
 
-                    # Keep accumulating partial JSON until we can parse it
-                    json_buffer += json_line
+                    # Skip stray non-JSON output (e.g. verbose HTTP logs, sandbox
+                    # debug messages) so it cannot poison the buffer. The
+                    # stream-json protocol emits JSON objects, so a new message
+                    # always starts with "{". Once a partial object is buffered,
+                    # every following line is treated as its continuation.
+                    if not json_buffer and not json_line.startswith("{"):
+                        logger.debug(
+                            "Skipping non-JSON line on stdout: %s",
+                            json_line[:100],
+                        )
+                        continue
+
+                    # Append and fall through to the shared size check and parse
+                    # below: every line is decoded at most once, the buffer limit
+                    # also covers complete single-line messages, and only JSON
+                    # objects can ever be yielded.
+                    json_buffer += json_line
 
                     if len(json_buffer) > self._max_buffer_size:
                         buffer_length = len(json_buffer)
@@ -558,9 +573,8 @@ async def _read_messages_impl(self) -> AsyncIterator[dict[str, Any]]:
                         json_buffer = ""
                         yield data
                     except json.JSONDecodeError:
-                        # We are speculatively decoding the buffer until we get
-                        # a full JSON object. If there is an actual issue, we
-                        # raise an error after exceeding the configured limit.
+                        # Still incomplete: keep buffering. A buffer that never
+                        # becomes valid JSON is caught by the size limit above.
                         continue
 
         except anyio.ClosedResourceError:
diff --git a/tests/test_subprocess_buffering.py b/tests/test_subprocess_buffering.py
index 03710748..891ea192 100644
--- a/tests/test_subprocess_buffering.py
+++ b/tests/test_subprocess_buffering.py
@@ -327,3 +327,107 @@ async def _test() -> None:
             assert messages[2]["subtype"] == "end"
 
         anyio.run(_test)
+
+    def test_non_json_lines_dont_poison_buffer(self) -> None:
+        """Test that non-JSON lines on stdout don't poison the buffer.
+
+        Non-JSON lines (e.g. verbose HTTP logs, sandbox debug messages) should
+        be skipped without affecting parsing of subsequent JSON messages.
+        Reproduces the bug described in https://github.com/anthropics/claude-agent-sdk-python/issues/347
+        """
+
+        async def _test() -> None:
+            json_obj1 = {"type": "system", "subtype": "init", "session_id": "abc123"}
+            json_obj2 = {
+                "type": "assistant",
+                "content": [{"type": "text", "text": "Hello"}],
+            }
+            json_obj3 = {"type": "result", "result": "done", "is_error": False}
+
+            # Simulate non-JSON lines interleaved with valid JSON messages
+            lines = [
+                json.dumps(json_obj1) + "\n",
+                "2026-02-19T01:12:58.573Z [DEBUG] configureGlobalMTLS starting\n",
+                "[Anthropic SDK INFO] post https://api.example.com status 200\n",
+                json.dumps(json_obj2) + "\n",
+                "Warning: some random stderr leaked to stdout\n",
+                # Valid JSON, but not a JSON object: must be skipped as well
+                "200\n",
+                "null\n",
+                json.dumps(json_obj3) + "\n",
+            ]
+
+            transport = SubprocessCLITransport(prompt="test", options=make_options())
+
+            mock_process = MagicMock()
+            mock_process.returncode = None
+            mock_process.wait = AsyncMock(return_value=None)
+            transport._process = mock_process
+            transport._stdout_stream = MockTextReceiveStream(lines)
+            transport._stderr_stream = MockTextReceiveStream([])
+
+            messages: list[Any] = []
+            async for msg in transport.read_messages():
+                messages.append(msg)
+
+            assert len(messages) == 3
+            assert messages[0]["type"] == "system"
+            assert messages[0]["session_id"] == "abc123"
+            assert messages[1]["type"] == "assistant"
+            assert messages[1]["content"][0]["text"] == "Hello"
+            assert messages[2]["type"] == "result"
+            assert messages[2]["result"] == "done"
+
+        anyio.run(_test)
+
+    def test_non_json_lines_between_split_json(self) -> None:
+        """Test non-JSON lines don't corrupt an in-progress buffered JSON parse.
+
+        When a JSON object is split across multiple reads and a non-JSON line
+        appears before it, the buffer should not be corrupted.
+        """
+
+        async def _test() -> None:
+            msg_before = json.dumps({"type": "system", "subtype": "start"})
+
+            large_msg = {
+                "type": "assistant",
+                "message": {
+                    "content": [{"type": "text", "text": "y" * 5000}],
+                },
+            }
+            large_json = json.dumps(large_msg)
+
+            msg_after = json.dumps({"type": "system", "subtype": "end"})
+
+            # Split the large JSON across reads, with a non-JSON line before it
+            lines = [
+                msg_before + "\n",
+                "Some debug log line that is not JSON\n",
+                large_json[:1000],
+                large_json[1000:],
+                "\n" + msg_after,
+            ]
+
+            transport = SubprocessCLITransport(prompt="test", options=make_options())
+
+            mock_process = MagicMock()
+            mock_process.returncode = None
+            mock_process.wait = AsyncMock(return_value=None)
+            transport._process = mock_process
+            transport._stdout_stream = MockTextReceiveStream(lines)
+            transport._stderr_stream = MockTextReceiveStream([])
+
+            messages: list[Any] = []
+            async for msg in transport.read_messages():
+                messages.append(msg)
+
+            assert len(messages) == 3
+            assert messages[0]["type"] == "system"
+            assert messages[0]["subtype"] == "start"
+            assert messages[1]["type"] == "assistant"
+            assert len(messages[1]["message"]["content"][0]["text"]) == 5000
+            assert messages[2]["type"] == "system"
+            assert messages[2]["subtype"] == "end"
+
+        anyio.run(_test)