feat: add Pydantic AI sync streaming converter#353
Open
danielmillerp wants to merge 1 commit intonextfrom
Open
feat: add Pydantic AI sync streaming converter#353danielmillerp wants to merge 1 commit intonextfrom
danielmillerp wants to merge 1 commit intonextfrom
Conversation
Introduces convert_pydantic_ai_to_agentex_events at agentex.lib.adk.providers._modules.pydantic_ai, mirroring the existing OpenAI Agents SDK converter in sync_provider.py. Maps Pydantic AI's AgentStreamEvent stream (PartStartEvent / PartDeltaEvent / PartEndEvent / FunctionToolResultEvent) into Agentex StreamTaskMessage* events, with first-class support for tool-call argument tokens streaming via ToolCallPartDelta.args_delta -> ToolRequestDelta.arguments_delta. This is slice 1 of an Agentex <-> Pydantic AI integration intended to match the level of support we have for OpenAI Agents SDK (sync, non-Temporal async, and Temporal). Subsequent slices will add the example agent, tracing wiring, and CLI templates. Adds pydantic-ai-slim>=1.0,<2 as a hard dependency, consistent with how openai-agents and other framework integrations are pinned. 22 unit tests cover text streaming, tool-call delta streaming, multi-step runs (where Pydantic AI part indices restart at 0 per model response), thinking/reasoning deltas, structured one-shot args, RetryPromptPart results, and ignored events. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Comment on lines
+73
to
+77
| if args_delta is None: | ||
| return "" | ||
| if isinstance(args_delta, str): | ||
| return args_delta | ||
| return json.dumps(args_delta) |
There was a problem hiding this comment.
Multiple consecutive dict-type
args_delta values produce invalid concatenated JSON
When a provider emits more than one ToolCallPartDelta with a dict args_delta, each call to json.dumps produces a standalone JSON object (e.g. '{"a":1}'). These get concatenated server-side as '{"a":1}{"b":2}', which is not valid JSON and will fail to parse on completion. The docstring says "in practice dict deltas arrive as a single final fragment", but that assumption is not enforced. Adding a warning log makes the failure visible rather than silently corrupt.
Suggested change
| if args_delta is None: | |
| return "" | |
| if isinstance(args_delta, str): | |
| return args_delta | |
| return json.dumps(args_delta) | |
| if args_delta is None: | |
| return "" | |
| if isinstance(args_delta, str): | |
| return args_delta | |
| # Dict deltas are one-shot: multiple dicts would concatenate to invalid JSON. | |
| # Log a warning so callers are not silently surprised. | |
| logger.warning( | |
| "ToolCallPartDelta.args_delta is a dict; converting with json.dumps. " | |
| "Multiple dict deltas will not compose to valid JSON." | |
| ) | |
| return json.dumps(args_delta) |
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/agentex/lib/adk/providers/_modules/pydantic_ai.py
Line: 73-77
Comment:
**Multiple consecutive dict-type `args_delta` values produce invalid concatenated JSON**
When a provider emits more than one `ToolCallPartDelta` with a `dict` `args_delta`, each call to `json.dumps` produces a standalone JSON object (e.g. `'{"a":1}'`). These get concatenated server-side as `'{"a":1}{"b":2}'`, which is not valid JSON and will fail to parse on completion. The docstring says "in practice dict deltas arrive as a single final fragment", but that assumption is not enforced. Adding a warning log makes the failure visible rather than silently corrupt.
```suggestion
if args_delta is None:
return ""
if isinstance(args_delta, str):
return args_delta
# Dict deltas are one-shot: multiple dicts would concatenate to invalid JSON.
# Log a warning so callers are not silently surprised.
logger.warning(
"ToolCallPartDelta.args_delta is a dict; converting with json.dumps. "
"Multiple dict deltas will not compose to valid JSON."
)
return json.dumps(args_delta)
```
How can I resolve this? If you propose a fix, please make it concise.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Slice 1 of the Pydantic AI integration: a sync streaming event converter that maps Pydantic AI's
AgentStreamEventstream into AgentexStreamTaskMessage*events. Mirrors the existing OpenAI Agents SDK converter pattern insync_provider.py.agentex.lib.adk.providers._modules.pydantic_aiwithconvert_pydantic_ai_to_agentex_eventsToolCallPartDelta.args_delta) directly to Agentex'sToolRequestDelta.arguments_delta— token-by-token tool-call streaming works end-to-end without any new server-side primitivespydantic-ai-slim>=1.0,<2added as hard dep, consistent withopenai-agents/langgraph-checkpoint/claude-agent-sdkEvent mapping
PartStartEvent(TextPart)StreamTaskMessageStart(TextContent)PartStartEvent(ThinkingPart)StreamTaskMessageStart(TextContent)(reasoning channel)PartStartEvent(ToolCallPart)StreamTaskMessageStart(ToolRequestContent)PartDeltaEvent(TextPartDelta)StreamTaskMessageDelta(TextDelta)PartDeltaEvent(ThinkingPartDelta)StreamTaskMessageDelta(ReasoningContentDelta)PartDeltaEvent(ToolCallPartDelta)StreamTaskMessageDelta(ToolRequestDelta)PartEndEventStreamTaskMessageDoneFunctionToolResultEventStreamTaskMessageFull(ToolResponseContent)FunctionToolCallEvent/FinalResultEvent/AgentRunResultEventMulti-step runs
Pydantic AI restarts part indices at 0 per model response. The converter maintains its own monotonic
message_indexand apart_index -> message_indexmap that's overwritten on eachPartStartEvent, so a multi-step run (text → tool → text) produces 4 distinct Agentex message indices.Roadmap
This is the first of several slices to bring Pydantic AI to feature parity with the OpenAI Agents SDK integration:
Agent.instrument_all()agentex new-agent --type sync-pydantic-ai)TemporalAgent+PydanticAIPlugin)Test plan
rye run pytest tests/lib/adk/providers/test_pydantic_ai.py -v🤖 Generated with Claude Code
Greptile Summary
Adds
convert_pydantic_ai_to_agentex_events, a new async generator that maps Pydantic AI'sAgentStreamEventstream to AgentexStreamTaskMessage*events, mirroring the existing OpenAI Agents SDK converter. The implementation and test coverage are solid for the common cases, but there is one behavioral inconsistency with the established OpenAI reasoning channel contract worth resolving before end-to-end validation.PartEndEventemitsStreamTaskMessageDonefor all part types, includingThinkingPart.sync_provider.pyexplicitly skipsDonefor reasoning/thinking channels; if the server/client relies on that contract, this extra event will cause a mismatch._args_delta_to_strsilently produces concatenated-invalid JSON when a provider sends more than one dict-typedargs_delta— adding a warning makes the failure visible.Confidence Score: 3/5
Safe to merge for unit-test purposes; the P1 ThinkingPart done-event inconsistency should be resolved before the first end-to-end integration is attempted
One P1 finding (ThinkingPart emitting Done events contrary to the established reasoning-channel contract) caps the score at 4; lack of end-to-end validation further lowers it. P2 findings are contained.
src/agentex/lib/adk/providers/_modules/pydantic_ai.py — the ThinkingPart PartEndEvent handling at lines 448-452
Important Files Changed
Sequence Diagram
sequenceDiagram participant PAI as Pydantic AI Stream participant Conv as convert_pydantic_ai_to_agentex_events participant AEX as Agentex Server PAI->>Conv: PartStartEvent(TextPart) Conv->>AEX: StreamTaskMessageStart(TextContent) PAI->>Conv: PartDeltaEvent(TextPartDelta) Conv->>AEX: StreamTaskMessageDelta(TextDelta) PAI->>Conv: PartEndEvent Conv->>AEX: StreamTaskMessageDone PAI->>Conv: PartStartEvent(ThinkingPart) Conv->>AEX: StreamTaskMessageStart(TextContent, empty) PAI->>Conv: PartDeltaEvent(ThinkingPartDelta) Conv->>AEX: StreamTaskMessageDelta(ReasoningContentDelta) PAI->>Conv: PartEndEvent Conv->>AEX: StreamTaskMessageDone (differs from OpenAI provider) PAI->>Conv: PartStartEvent(ToolCallPart) Conv->>AEX: StreamTaskMessageStart(ToolRequestContent) PAI->>Conv: PartDeltaEvent(ToolCallPartDelta) Conv->>AEX: StreamTaskMessageDelta(ToolRequestDelta) PAI->>Conv: PartEndEvent Conv->>AEX: StreamTaskMessageDone PAI->>Conv: FunctionToolResultEvent Conv->>AEX: StreamTaskMessageFull(ToolResponseContent) PAI->>Conv: FunctionToolCallEvent / FinalResultEvent / AgentRunResultEvent Conv-->>AEX: (ignored)Comments Outside Diff (2)
src/agentex/lib/adk/providers/_modules/pydantic_ai.py, line 448-452 (link)ThinkingPartend emitsStreamTaskMessageDone— inconsistent with existing reasoning-channel contractsync_provider.py(the established OpenAI converter) explicitly skipsStreamTaskMessageDonefor reasoning/thinking channels:if message_type not in ("reasoning_content", "reasoning_summary"): yield StreamTaskMessageDone(...). Thepydantic_ai.pyconverter emitsDonefor all parts via the genericPartEndEventbranch, includingThinkingPart. If the Agentex server or client treats reasoning channels as never receiving a done event (relying only on the last delta to close them), this extraDonefor thinking parts will cause a behavioral mismatch. End-to-end tests are still pending, making this a silent risk.Prompt To Fix With AI
tests/lib/adk/providers/test_pydantic_ai.py, line 820-853 (link)AgentRunResultEventmissing from ignored-events test suiteThe converter imports and explicitly handles
AgentRunResultEventin theFunctionToolCallEvent | FinalResultEvent | AgentRunResultEventbranch, butTestIgnoredEventsonly verifiesFunctionToolCallEventandFinalResultEvent. Adding a test mirroringtest_final_result_event_ignoredforAgentRunResultEvent(requiringfrom pydantic_ai.run import AgentRunResultEvent) would close the coverage gap and confirm the import path is correct.Prompt To Fix With AI
Prompt To Fix All With AI
Reviews (1): Last reviewed commit: "feat: add Pydantic AI sync streaming con..." | Re-trigger Greptile