Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 10 additions & 18 deletions python/packages/core/agent_framework/_workflows/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,45 +340,38 @@ async def _run_core(
Yields:
WorkflowEvent objects from the workflow execution.
"""
# Determine the execution mode based on state
# Determine the execution mode based on state.
# The streaming flag controls the workflow's internal streaming mode,
# which affects executor behavior (e.g. AgentExecutor emits different event
# types in streaming vs non-streaming mode).
if bool(self.pending_requests):
# This is a continuation - send function responses back
function_responses = self._process_pending_requests(input_messages)

if streaming:
async for event in self.workflow.send_responses_streaming(function_responses):
async for event in self.workflow.run(responses=function_responses, stream=True, **kwargs):
yield event
else:
workflow_result = await self.workflow.send_responses(function_responses)
for event in workflow_result:
for event in await self.workflow.run(responses=function_responses, **kwargs):
yield event

elif checkpoint_id is not None:
# Resume from checkpoint - don't prepend thread history since workflow state
# is being restored from the checkpoint
if streaming:
async for event in self.workflow.run(
message=None,
stream=True,
checkpoint_id=checkpoint_id,
checkpoint_storage=checkpoint_storage,
**kwargs,
):
yield event
else:
workflow_result = await self.workflow.run(
message=None,
for event in await self.workflow.run(
checkpoint_id=checkpoint_id,
checkpoint_storage=checkpoint_storage,
**kwargs,
)
for event in workflow_result:
):
yield event

else:
# Initial run - build conversation from thread history
conversation_messages = await self._build_conversation_messages(thread, input_messages)

if streaming:
async for event in self.workflow.run(
message=conversation_messages,
Expand All @@ -388,12 +381,11 @@ async def _run_core(
):
yield event
else:
workflow_result = await self.workflow.run(
for event in await self.workflow.run(
message=conversation_messages,
checkpoint_storage=checkpoint_storage,
**kwargs,
)
for event in workflow_result:
):
yield event

# endregion Run Methods
Expand Down
48 changes: 48 additions & 0 deletions python/packages/core/agent_framework/_workflows/_typing_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,54 @@ def is_instance_of(data: Any, target_type: type | UnionType | Any) -> bool:
return isinstance(data, target_type)


def try_coerce_to_type(data: Any, target_type: type | UnionType | Any) -> Any:
    """Best-effort coercion of *data* toward *target_type*.

    Handles the lightweight conversions that commonly arise when raw values
    (e.g. freshly deserialized JSON) must match a declared type: int-to-float
    promotion, and dict-to-dataclass / dict-to-Pydantic-model construction.

    Coercion never raises: on any failure the original value is handed back
    unchanged, leaving the caller to decide what to do with it.

    Args:
        data: The value to coerce.
        target_type: The desired type.

    Returns:
        The coerced value, or *data* unchanged when coercion is unnecessary
        or impossible.
    """
    # Nothing to do when the value already satisfies the target.
    if is_instance_of(data, target_type):
        return data

    # Only concrete classes can be coerced to; Union/generic targets pass through.
    if not isinstance(target_type, type):
        return data

    # Promote JSON integers destined for float-typed fields.
    if target_type is float and isinstance(data, int):
        return float(data)

    # Remaining conversions all start from a mapping payload.
    if not isinstance(data, dict):
        return data

    from dataclasses import is_dataclass

    # A dict may describe a dataclass's fields...
    if is_dataclass(target_type):
        try:
            return target_type(**data)
        except (TypeError, ValueError):
            return data

    # ...or a Pydantic model (detected structurally via model_validate).
    validator = getattr(target_type, "model_validate", None)
    if validator is not None:
        try:
            return validator(data)
        except Exception:
            return data

    return data


def serialize_type(t: type) -> str:
"""Serialize a type to a string.

Expand Down
Loading
Loading