Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion python/packages/claude/agent_framework_claude/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -618,12 +618,30 @@ def run(
"""
response = ResponseStream(
self._get_stream(messages, session=session, options=options, **kwargs),
finalizer=AgentResponse.from_updates,
finalizer=self._finalize_response,
)
if stream:
return response
return response.get_final_response()

@staticmethod
def _finalize_response(updates: Sequence[AgentResponseUpdate]) -> AgentResponse[Any]:
"""Build AgentResponse and propagate structured_output as value.

Args:
updates: The collected stream updates.

Returns:
An AgentResponse with structured_output set as value if present.
"""
response = AgentResponse.from_updates(updates)
for update in updates:
if update.additional_properties and "structured_output" in update.additional_properties:
response._value = update.additional_properties["structured_output"]
response._value_parsed = True
break
return response

async def _get_stream(
self,
messages: AgentRunInputs | None = None,
Expand All @@ -647,6 +665,7 @@ async def _get_stream(
await self._apply_runtime_options(dict(options) if options else None)

session_id: str | None = None
structured_output: Any = None

await self._client.query(prompt)
async for message in self._client.receive_response():
Expand Down Expand Up @@ -700,6 +719,14 @@ async def _get_stream(
error_msg = message.result or "Unknown error from Claude API"
raise AgentException(f"Claude API error: {error_msg}")
session_id = message.session_id
structured_output = message.structured_output

# Yield structured output if present
if structured_output is not None:
yield AgentResponseUpdate(
role="assistant",
additional_properties={"structured_output": structured_output},
)

# Update session with session ID
if session_id:
Expand Down
160 changes: 160 additions & 0 deletions python/packages/claude/tests/test_claude_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -785,3 +785,163 @@ async def test_apply_runtime_options_none(self) -> None:
await agent._apply_runtime_options(None) # type: ignore[reportPrivateUsage]
mock_client.set_model.assert_not_called()
mock_client.set_permission_mode.assert_not_called()


# region Test ClaudeAgent Structured Output


class TestClaudeAgentStructuredOutput:
"""Tests for ClaudeAgent structured output propagation."""

@staticmethod
async def _create_async_generator(items: list[Any]) -> Any:
"""Helper to create async generator from list."""
for item in items:
yield item
Comment on lines +796 to +800
Copy link

Copilot AI Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The return type annotation 'Any' for this async generator helper is too broad. Consider specifying the actual type being yielded. While this doesn't affect functionality, a more precise type hint like 'AsyncIterator[Any]' would be clearer and more maintainable.

Copilot uses AI. Check for mistakes.
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot open a new pull request to apply changes based on this feedback


def _create_mock_client(self, messages: list[Any]) -> MagicMock:
"""Create a mock ClaudeSDKClient that yields given messages."""
mock_client = MagicMock()
mock_client.connect = AsyncMock()
mock_client.disconnect = AsyncMock()
mock_client.query = AsyncMock()
mock_client.set_model = AsyncMock()
mock_client.set_permission_mode = AsyncMock()
mock_client.receive_response = MagicMock(return_value=self._create_async_generator(messages))
return mock_client

async def test_structured_output_propagated_to_response(self) -> None:
"""Test that structured_output from ResultMessage is propagated to response.value."""
from claude_agent_sdk import AssistantMessage, ResultMessage, TextBlock
from claude_agent_sdk.types import StreamEvent

structured_data = {"name": "Alice", "age": 30}
messages = [
StreamEvent(
event={
"type": "content_block_delta",
"delta": {"type": "text_delta", "text": '{"name": "Alice", "age": 30}'},
},
uuid="event-1",
session_id="session-123",
),
AssistantMessage(
content=[TextBlock(text='{"name": "Alice", "age": 30}')],
model="claude-sonnet",
),
ResultMessage(
subtype="success",
duration_ms=100,
duration_api_ms=50,
is_error=False,
num_turns=1,
session_id="session-123",
structured_output=structured_data,
),
]
mock_client = self._create_mock_client(messages)

with patch("agent_framework_claude._agent.ClaudeSDKClient", return_value=mock_client):
agent = ClaudeAgent()
response = await agent.run("Return structured data")
assert response.value == structured_data

async def test_structured_output_none_when_not_present(self) -> None:
"""Test that response.value is None when structured_output is not present."""
from claude_agent_sdk import AssistantMessage, ResultMessage, TextBlock
from claude_agent_sdk.types import StreamEvent

messages = [
StreamEvent(
event={
"type": "content_block_delta",
"delta": {"type": "text_delta", "text": "Hello!"},
},
uuid="event-1",
session_id="session-123",
),
AssistantMessage(
content=[TextBlock(text="Hello!")],
model="claude-sonnet",
),
ResultMessage(
subtype="success",
duration_ms=100,
duration_api_ms=50,
is_error=False,
num_turns=1,
session_id="session-123",
),
]
mock_client = self._create_mock_client(messages)

with patch("agent_framework_claude._agent.ClaudeSDKClient", return_value=mock_client):
agent = ClaudeAgent()
response = await agent.run("Hello")
assert response.value is None

async def test_structured_output_with_streaming(self) -> None:
"""Test that structured_output is available in streaming updates."""
from claude_agent_sdk import AssistantMessage, ResultMessage, TextBlock
from claude_agent_sdk.types import StreamEvent

structured_data = {"key": "value"}
messages = [
StreamEvent(
event={
"type": "content_block_delta",
"delta": {"type": "text_delta", "text": '{"key": "value"}'},
},
uuid="event-1",
session_id="session-123",
),
AssistantMessage(
content=[TextBlock(text='{"key": "value"}')],
model="claude-sonnet",
),
ResultMessage(
subtype="success",
duration_ms=100,
duration_api_ms=50,
is_error=False,
num_turns=1,
session_id="session-123",
structured_output=structured_data,
),
]
mock_client = self._create_mock_client(messages)

with patch("agent_framework_claude._agent.ClaudeSDKClient", return_value=mock_client):
agent = ClaudeAgent()
updates: list[AgentResponseUpdate] = []
async for update in agent.run("Return structured data", stream=True):
updates.append(update)
# Last update should carry structured_output in additional_properties
last_update = updates[-1]
assert last_update.additional_properties is not None
assert last_update.additional_properties.get("structured_output") == structured_data

async def test_structured_output_with_error_does_not_propagate(self) -> None:
"""Test that structured_output is not propagated when ResultMessage is an error."""
from agent_framework.exceptions import AgentException
from claude_agent_sdk import ResultMessage

messages = [
ResultMessage(
subtype="error",
duration_ms=100,
duration_api_ms=50,
is_error=True,
num_turns=0,
session_id="error-session",
result="Something went wrong",
structured_output={"some": "data"},
),
]
mock_client = self._create_mock_client(messages)

with patch("agent_framework_claude._agent.ClaudeSDKClient", return_value=mock_client):
agent = ClaudeAgent()
with pytest.raises(AgentException) as exc_info:
await agent.run("Hello")
assert "Something went wrong" in str(exc_info.value)
Loading