Skip to content

feat: add Pydantic AI sync streaming converter#353

Open
danielmillerp wants to merge 1 commit intonextfrom
dm/pydantic-ai-sync-streaming
Open

feat: add Pydantic AI sync streaming converter#353
danielmillerp wants to merge 1 commit intonextfrom
dm/pydantic-ai-sync-streaming

Conversation

@danielmillerp
Copy link
Copy Markdown
Contributor

@danielmillerp danielmillerp commented May 8, 2026

Summary

Slice 1 of the Pydantic AI integration: a sync streaming event converter that maps Pydantic AI's AgentStreamEvent stream into Agentex StreamTaskMessage* events. Mirrors the existing OpenAI Agents SDK converter pattern in sync_provider.py.

  • New module: agentex.lib.adk.providers._modules.pydantic_ai with convert_pydantic_ai_to_agentex_events
  • Maps tool-call argument tokens (ToolCallPartDelta.args_delta) directly to Agentex's ToolRequestDelta.arguments_delta — token-by-token tool-call streaming works end-to-end without any new server-side primitives
  • pydantic-ai-slim>=1.0,<2 added as hard dep, consistent with openai-agents / langgraph-checkpoint / claude-agent-sdk

Event mapping

Pydantic AI Agentex
PartStartEvent(TextPart) StreamTaskMessageStart(TextContent)
PartStartEvent(ThinkingPart) StreamTaskMessageStart(TextContent) (reasoning channel)
PartStartEvent(ToolCallPart) StreamTaskMessageStart(ToolRequestContent)
PartDeltaEvent(TextPartDelta) StreamTaskMessageDelta(TextDelta)
PartDeltaEvent(ThinkingPartDelta) StreamTaskMessageDelta(ReasoningContentDelta)
PartDeltaEvent(ToolCallPartDelta) StreamTaskMessageDelta(ToolRequestDelta)
PartEndEvent StreamTaskMessageDone
FunctionToolResultEvent StreamTaskMessageFull(ToolResponseContent)
FunctionToolCallEvent / FinalResultEvent / AgentRunResultEvent ignored (covered by the events above or run-level only)

Multi-step runs

Pydantic AI restarts part indices at 0 per model response. The converter maintains its own monotonic message_index and a part_index -> message_index map that's overwritten on each PartStartEvent, so a multi-step run (text → tool → text) produces 4 distinct Agentex message indices.

Roadmap

This is the first of several slices to bring Pydantic AI to feature parity with the OpenAI Agents SDK integration:

  1. ✅ Sync streaming converter + tests (this PR)
  2. Sync hello-world example agent (text-only streaming)
  3. Sync agent with tools (tool-call delta streaming)
  4. Tracing wired to SGP via Agent.instrument_all()
  5. Sync CLI template (agentex new-agent --type sync-pydantic-ai)
  6. Async (non-Temporal) example + template
  7. Temporal example + template (using TemporalAgent + PydanticAIPlugin)
  8. HIL, structured output, multi-agent examples

Test plan

  • 22 unit tests pass: rye run pytest tests/lib/adk/providers/test_pydantic_ai.py -v
  • End-to-end: hello-world agent in agentex-agents (separate PR) chats successfully and streams text token-by-token
  • End-to-end: tool-call args stream token-by-token visible in Agentex UI

🤖 Generated with Claude Code

Greptile Summary

Adds convert_pydantic_ai_to_agentex_events, a new async generator that maps Pydantic AI's AgentStreamEvent stream to Agentex StreamTaskMessage* events, mirroring the existing OpenAI Agents SDK converter. The implementation and test coverage are solid for the common cases, but there is one behavioral inconsistency with the established OpenAI reasoning channel contract worth resolving before end-to-end validation.

  • P1: PartEndEvent emits StreamTaskMessageDone for all part types, including ThinkingPart. sync_provider.py explicitly skips Done for reasoning/thinking channels; if the server/client relies on that contract, this extra event will cause a mismatch.
  • P2: _args_delta_to_str silently produces concatenated-invalid JSON when a provider sends more than one dict-typed args_delta — adding a warning makes the failure visible.

Confidence Score: 3/5

Safe to merge for unit-test purposes; the P1 ThinkingPart done-event inconsistency should be resolved before the first end-to-end integration is attempted

One P1 finding (ThinkingPart emitting Done events contrary to the established reasoning-channel contract) caps the score at 4; lack of end-to-end validation further lowers it. P2 findings are contained.

src/agentex/lib/adk/providers/_modules/pydantic_ai.py — the ThinkingPart PartEndEvent handling at lines 448-452

Important Files Changed

Filename Overview
src/agentex/lib/adk/providers/_modules/pydantic_ai.py New Pydantic AI → Agentex streaming converter; has a P1 behavioral inconsistency for ThinkingPart done events and a P2 dict-delta JSON composition issue
tests/lib/adk/providers/test_pydantic_ai.py 22 unit tests covering all major event-mapping paths; missing coverage for AgentRunResultEvent in the ignored-events suite
pyproject.toml Adds pydantic-ai-slim>=1.0,<2 as a hard dependency, consistent with other framework deps

Sequence Diagram

sequenceDiagram
    participant PAI as Pydantic AI Stream
    participant Conv as convert_pydantic_ai_to_agentex_events
    participant AEX as Agentex Server

    PAI->>Conv: PartStartEvent(TextPart)
    Conv->>AEX: StreamTaskMessageStart(TextContent)
    PAI->>Conv: PartDeltaEvent(TextPartDelta)
    Conv->>AEX: StreamTaskMessageDelta(TextDelta)
    PAI->>Conv: PartEndEvent
    Conv->>AEX: StreamTaskMessageDone

    PAI->>Conv: PartStartEvent(ThinkingPart)
    Conv->>AEX: StreamTaskMessageStart(TextContent, empty)
    PAI->>Conv: PartDeltaEvent(ThinkingPartDelta)
    Conv->>AEX: StreamTaskMessageDelta(ReasoningContentDelta)
    PAI->>Conv: PartEndEvent
    Conv->>AEX: StreamTaskMessageDone (differs from OpenAI provider)

    PAI->>Conv: PartStartEvent(ToolCallPart)
    Conv->>AEX: StreamTaskMessageStart(ToolRequestContent)
    PAI->>Conv: PartDeltaEvent(ToolCallPartDelta)
    Conv->>AEX: StreamTaskMessageDelta(ToolRequestDelta)
    PAI->>Conv: PartEndEvent
    Conv->>AEX: StreamTaskMessageDone

    PAI->>Conv: FunctionToolResultEvent
    Conv->>AEX: StreamTaskMessageFull(ToolResponseContent)

    PAI->>Conv: FunctionToolCallEvent / FinalResultEvent / AgentRunResultEvent
    Conv-->>AEX: (ignored)
Loading

Comments Outside Diff (2)

  1. src/agentex/lib/adk/providers/_modules/pydantic_ai.py, line 448-452 (link)

    P1 ThinkingPart end emits StreamTaskMessageDone — inconsistent with existing reasoning-channel contract

    sync_provider.py (the established OpenAI converter) explicitly skips StreamTaskMessageDone for reasoning/thinking channels: if message_type not in ("reasoning_content", "reasoning_summary"): yield StreamTaskMessageDone(...). The pydantic_ai.py converter emits Done for all parts via the generic PartEndEvent branch, including ThinkingPart. If the Agentex server or client treats reasoning channels as never receiving a done event (relying only on the last delta to close them), this extra Done for thinking parts will cause a behavioral mismatch. End-to-end tests are still pending, making this a silent risk.

    Prompt To Fix With AI
    This is a comment left during a code review.
    Path: src/agentex/lib/adk/providers/_modules/pydantic_ai.py
    Line: 448-452
    
    Comment:
    **`ThinkingPart` end emits `StreamTaskMessageDone` — inconsistent with existing reasoning-channel contract**
    
    `sync_provider.py` (the established OpenAI converter) explicitly skips `StreamTaskMessageDone` for reasoning/thinking channels: `if message_type not in ("reasoning_content", "reasoning_summary"): yield StreamTaskMessageDone(...)`. The `pydantic_ai.py` converter emits `Done` for **all** parts via the generic `PartEndEvent` branch, including `ThinkingPart`. If the Agentex server or client treats reasoning channels as never receiving a done event (relying only on the last delta to close them), this extra `Done` for thinking parts will cause a behavioral mismatch. End-to-end tests are still pending, making this a silent risk.
    
    How can I resolve this? If you propose a fix, please make it concise.

    Fix in Cursor Fix in Claude Code Fix in Codex

  2. tests/lib/adk/providers/test_pydantic_ai.py, line 820-853 (link)

    P2 AgentRunResultEvent missing from ignored-events test suite

    The converter imports and explicitly handles AgentRunResultEvent in the FunctionToolCallEvent | FinalResultEvent | AgentRunResultEvent branch, but TestIgnoredEvents only verifies FunctionToolCallEvent and FinalResultEvent. Adding a test mirroring test_final_result_event_ignored for AgentRunResultEvent (requiring from pydantic_ai.run import AgentRunResultEvent) would close the coverage gap and confirm the import path is correct.

    Prompt To Fix With AI
    This is a comment left during a code review.
    Path: tests/lib/adk/providers/test_pydantic_ai.py
    Line: 820-853
    
    Comment:
    **`AgentRunResultEvent` missing from ignored-events test suite**
    
    The converter imports and explicitly handles `AgentRunResultEvent` in the `FunctionToolCallEvent | FinalResultEvent | AgentRunResultEvent` branch, but `TestIgnoredEvents` only verifies `FunctionToolCallEvent` and `FinalResultEvent`. Adding a test mirroring `test_final_result_event_ignored` for `AgentRunResultEvent` (requiring `from pydantic_ai.run import AgentRunResultEvent`) would close the coverage gap and confirm the import path is correct.
    
    How can I resolve this? If you propose a fix, please make it concise.

    Fix in Cursor Fix in Claude Code Fix in Codex

Fix All in Cursor Fix All in Claude Code Fix All in Codex

Prompt To Fix All With AI
Fix the following 3 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 3
src/agentex/lib/adk/providers/_modules/pydantic_ai.py:448-452
**`ThinkingPart` end emits `StreamTaskMessageDone` — inconsistent with existing reasoning-channel contract**

`sync_provider.py` (the established OpenAI converter) explicitly skips `StreamTaskMessageDone` for reasoning/thinking channels: `if message_type not in ("reasoning_content", "reasoning_summary"): yield StreamTaskMessageDone(...)`. The `pydantic_ai.py` converter emits `Done` for **all** parts via the generic `PartEndEvent` branch, including `ThinkingPart`. If the Agentex server or client treats reasoning channels as never receiving a done event (relying only on the last delta to close them), this extra `Done` for thinking parts will cause a behavioral mismatch. End-to-end tests are still pending, making this a silent risk.

### Issue 2 of 3
src/agentex/lib/adk/providers/_modules/pydantic_ai.py:73-77
**Multiple consecutive dict-type `args_delta` values produce invalid concatenated JSON**

When a provider emits more than one `ToolCallPartDelta` with a `dict` `args_delta`, each call to `json.dumps` produces a standalone JSON object (e.g. `'{"a":1}'`). These get concatenated server-side as `'{"a":1}{"b":2}'`, which is not valid JSON and will fail to parse on completion. The docstring says "in practice dict deltas arrive as a single final fragment", but that assumption is not enforced. Adding a warning log makes the failure visible rather than silently corrupt.

```suggestion
    if args_delta is None:
        return ""
    if isinstance(args_delta, str):
        return args_delta
    # Dict deltas are one-shot: multiple dicts would concatenate to invalid JSON.
    # Log a warning so callers are not silently surprised.
    logger.warning(
        "ToolCallPartDelta.args_delta is a dict; converting with json.dumps. "
        "Multiple dict deltas will not compose to valid JSON."
    )
    return json.dumps(args_delta)
```

### Issue 3 of 3
tests/lib/adk/providers/test_pydantic_ai.py:820-853
**`AgentRunResultEvent` missing from ignored-events test suite**

The converter imports and explicitly handles `AgentRunResultEvent` in the `FunctionToolCallEvent | FinalResultEvent | AgentRunResultEvent` branch, but `TestIgnoredEvents` only verifies `FunctionToolCallEvent` and `FinalResultEvent`. Adding a test mirroring `test_final_result_event_ignored` for `AgentRunResultEvent` (requiring `from pydantic_ai.run import AgentRunResultEvent`) would close the coverage gap and confirm the import path is correct.

Reviews (1): Last reviewed commit: "feat: add Pydantic AI sync streaming con..." | Re-trigger Greptile

Greptile also left 1 inline comment on this PR.

Introduces convert_pydantic_ai_to_agentex_events at
agentex.lib.adk.providers._modules.pydantic_ai, mirroring the existing
OpenAI Agents SDK converter in sync_provider.py. Maps Pydantic AI's
AgentStreamEvent stream (PartStartEvent / PartDeltaEvent / PartEndEvent
/ FunctionToolResultEvent) into Agentex StreamTaskMessage* events,
with first-class support for tool-call argument tokens streaming via
ToolCallPartDelta.args_delta -> ToolRequestDelta.arguments_delta.

This is slice 1 of an Agentex <-> Pydantic AI integration intended to
match the level of support we have for OpenAI Agents SDK (sync,
non-Temporal async, and Temporal). Subsequent slices will add the
example agent, tracing wiring, and CLI templates.

Adds pydantic-ai-slim>=1.0,<2 as a hard dependency, consistent with
how openai-agents and other framework integrations are pinned.

22 unit tests cover text streaming, tool-call delta streaming,
multi-step runs (where Pydantic AI part indices restart at 0 per
model response), thinking/reasoning deltas, structured one-shot args,
RetryPromptPart results, and ignored events.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Comment on lines +73 to +77
if args_delta is None:
return ""
if isinstance(args_delta, str):
return args_delta
return json.dumps(args_delta)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Multiple consecutive dict-type args_delta values produce invalid concatenated JSON

When a provider emits more than one ToolCallPartDelta with a dict args_delta, each call to json.dumps produces a standalone JSON object (e.g. '{"a":1}'). These get concatenated server-side as '{"a":1}{"b":2}', which is not valid JSON and will fail to parse on completion. The docstring says "in practice dict deltas arrive as a single final fragment", but that assumption is not enforced. Adding a warning log makes the failure visible rather than silently corrupt.

Suggested change
if args_delta is None:
return ""
if isinstance(args_delta, str):
return args_delta
return json.dumps(args_delta)
if args_delta is None:
return ""
if isinstance(args_delta, str):
return args_delta
# Dict deltas are one-shot: multiple dicts would concatenate to invalid JSON.
# Log a warning so callers are not silently surprised.
logger.warning(
"ToolCallPartDelta.args_delta is a dict; converting with json.dumps. "
"Multiple dict deltas will not compose to valid JSON."
)
return json.dumps(args_delta)
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/agentex/lib/adk/providers/_modules/pydantic_ai.py
Line: 73-77

Comment:
**Multiple consecutive dict-type `args_delta` values produce invalid concatenated JSON**

When a provider emits more than one `ToolCallPartDelta` with a `dict` `args_delta`, each call to `json.dumps` produces a standalone JSON object (e.g. `'{"a":1}'`). These get concatenated server-side as `'{"a":1}{"b":2}'`, which is not valid JSON and will fail to parse on completion. The docstring says "in practice dict deltas arrive as a single final fragment", but that assumption is not enforced. Adding a warning log makes the failure visible rather than silently corrupt.

```suggestion
    if args_delta is None:
        return ""
    if isinstance(args_delta, str):
        return args_delta
    # Dict deltas are one-shot: multiple dicts would concatenate to invalid JSON.
    # Log a warning so callers are not silently surprised.
    logger.warning(
        "ToolCallPartDelta.args_delta is a dict; converting with json.dumps. "
        "Multiple dict deltas will not compose to valid JSON."
    )
    return json.dumps(args_delta)
```

How can I resolve this? If you propose a fix, please make it concise.

Fix in Cursor Fix in Claude Code Fix in Codex

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant