From 53b6cc0bb4f802ad019ff152cd317670f8e5cc26 Mon Sep 17 00:00:00 2001 From: Laveesh Rohra Date: Mon, 17 Nov 2025 13:55:45 -0800 Subject: [PATCH 01/11] Respond with AgentRunResponse --- .../_entities.py | 102 ++++++++------- .../agent_framework_azurefunctions/_models.py | 47 ------- .../_orchestration.py | 87 +++++++++++-- .../packages/azurefunctions/tests/test_app.py | 19 +-- .../azurefunctions/tests/test_entities.py | 97 ++++++++------ .../azurefunctions/tests/test_models.py | 118 +----------------- .../tests/test_orchestration.py | 40 +++++- .../function_app.py | 8 +- .../function_app.py | 44 +++++-- .../function_app.py | 26 +--- .../function_app.py | 34 ++--- 11 files changed, 296 insertions(+), 326 deletions(-) diff --git a/python/packages/azurefunctions/agent_framework_azurefunctions/_entities.py b/python/packages/azurefunctions/agent_framework_azurefunctions/_entities.py index 8df8e3f335..7c3e1b0a75 100644 --- a/python/packages/azurefunctions/agent_framework_azurefunctions/_entities.py +++ b/python/packages/azurefunctions/agent_framework_azurefunctions/_entities.py @@ -9,15 +9,14 @@ import asyncio import inspect -import json from collections.abc import AsyncIterable, Callable from typing import Any, cast import azure.durable_functions as df -from agent_framework import AgentProtocol, AgentRunResponse, AgentRunResponseUpdate, Role, get_logger +from agent_framework import AgentProtocol, AgentRunResponse, AgentRunResponseUpdate, ChatMessage, Role, get_logger from ._callbacks import AgentCallbackContext, AgentResponseCallbackProtocol -from ._models import AgentResponse, RunRequest +from ._models import RunRequest from ._state import AgentState logger = get_logger("agent_framework.azurefunctions.entities") @@ -65,7 +64,7 @@ async def run_agent( self, context: df.DurableEntityContext, request: RunRequest | dict[str, Any] | str, - ) -> dict[str, Any]: + ) -> AgentRunResponse: """Execute the agent with a message directly in the entity. Args: @@ -73,13 +72,8 @@ async def run_agent( request: RunRequest object, dict, or string message (for backward compatibility) Returns: - Dict with status information and response (serialized AgentResponse) - - Note: - The agent returns an AgentRunResponse object which is stored in state. - This method extracts the text/structured response and returns an AgentResponse dict. + AgentRunResponse enriched with execution metadata. """ - # Convert string or dict to RunRequest if isinstance(request, str): run_request = RunRequest(message=request, role=Role.USER) elif isinstance(request, dict): @@ -103,16 +97,16 @@ async def run_agent( logger.debug(f"[AgentEntity.run_agent] Correlation ID: {correlation_id}") logger.debug(f"[AgentEntity.run_agent] Role: {role.value}") logger.debug(f"[AgentEntity.run_agent] Enable tool calls: {enable_tool_calls}") - logger.debug(f"[AgentEntity.run_agent] Response format: {'provided' if response_format else 'none'}") + logger.debug( + "[AgentEntity.run_agent] Response format: %s", + "provided" if response_format else "none", + ) - # Store message in history with role self.state.add_user_message(message, role=role, correlation_id=correlation_id) logger.debug("[AgentEntity.run_agent] Executing agent...") try: - logger.debug("[AgentEntity.run_agent] Starting agent invocation") - run_kwargs: dict[str, Any] = {"messages": self.state.get_chat_messages()} if not enable_tool_calls: run_kwargs["tools"] = None @@ -131,46 +125,38 @@ async def run_agent( type(agent_run_response).__name__, ) - response_text = None - structured_response = None + response_text: str | None = None - response_str: str | None = None try: - if response_format: - try: - response_str = agent_run_response.text - structured_response = json.loads(response_str) - logger.debug("Parsed structured JSON response") - except json.JSONDecodeError as decode_error: - logger.warning(f"Failed to parse JSON response: {decode_error}") - response_text = response_str - else: - raw_text = agent_run_response.text - response_text = raw_text if raw_text else "No response" - preview = response_text - logger.debug(f"Response: {preview[:100]}..." if len(preview) > 100 else f"Response: {preview}") + raw_text = agent_run_response.text + response_text = raw_text if raw_text else "No response" + logger.debug(f"Response: {response_text[:100]}...") except Exception as extraction_error: logger.error( - f"Error extracting response: {extraction_error}", + "Error extracting response text: %s", + extraction_error, exc_info=True, ) response_text = "Error extracting response" - agent_response = AgentResponse( - response=response_text, - message=str(message), - thread_id=str(thread_id), - status="success", - message_count=self.state.message_count, - structured_response=structured_response, - ) - result = agent_response.to_dict() + message_count = self.state.message_count + metadata: dict[str, Any] = { + "status": "success", + "message": str(message), + "thread_id": str(thread_id), + "correlation_id": correlation_id, + "message_count": message_count, + } + + metadata["response"] = response_text + content = response_text or "" + + agent_run_response.additional_properties.update(metadata) - content = json.dumps(structured_response) if structured_response else (response_text or "") self.state.add_assistant_message(content, agent_run_response, correlation_id) logger.debug("[AgentEntity.run_agent] AgentRunResponse stored in conversation history") - return result + return agent_run_response except Exception as exc: import traceback @@ -181,16 +167,28 @@ async def run_agent( logger.error(f"Error type: {type(exc).__name__}") logger.error(f"Full traceback:\n{error_traceback}") - error_response = AgentResponse( - response=f"Error: {exc!s}", - message=str(message), - thread_id=str(thread_id), - status="error", - message_count=self.state.message_count, - error=str(exc), - error_type=type(exc).__name__, + error_text = f"Error: {exc!s}" + error_metadata = { + "status": "error", + "error": str(exc), + "error_type": type(exc).__name__, + "message": str(message), + "thread_id": str(thread_id), + "correlation_id": correlation_id, + "message_count": self.state.message_count, + } + + error_response = AgentRunResponse( + messages=[ChatMessage(role="assistant", text=error_text)], + additional_properties=error_metadata, ) - return error_response.to_dict() + + try: + self.state.add_assistant_message(error_text, error_response, correlation_id) + except Exception: # pragma: no cover - defensive logging only + logger.warning("[AgentEntity.run_agent] Failed to record error response in state", exc_info=True) + + return error_response async def _invoke_agent( self, @@ -382,7 +380,7 @@ async def _entity_coroutine(context: df.DurableEntityContext) -> None: request = "" if input_data is None else str(cast(object, input_data)) result = await entity.run_agent(context, request) - context.set_result(result) + context.set_result(result.to_dict()) elif operation == "reset": entity.reset(context) diff --git a/python/packages/azurefunctions/agent_framework_azurefunctions/_models.py b/python/packages/azurefunctions/agent_framework_azurefunctions/_models.py index 015ca40754..e13587dfce 100644 --- a/python/packages/azurefunctions/agent_framework_azurefunctions/_models.py +++ b/python/packages/azurefunctions/agent_framework_azurefunctions/_models.py @@ -346,50 +346,3 @@ def from_dict(cls, data: dict[str, Any]) -> RunRequest: thread_id=data.get("thread_id"), correlation_id=data.get("correlation_id"), ) - - -@dataclass -class AgentResponse: - """Response from agent execution. - - Attributes: - response: The agent's text response (or None for structured responses) - message: The original message sent to the agent - thread_id: The thread identifier - status: Status of the execution (success, error, etc.) - message_count: Number of messages in the conversation - error: Error message if status is error - error_type: Type of error if status is error - structured_response: Structured response if response_format was provided - """ - - response: str | None - message: str - thread_id: str | None - status: str - message_count: int = 0 - error: str | None = None - error_type: str | None = None - structured_response: dict[str, Any] | None = None - - def to_dict(self) -> dict[str, Any]: - """Convert to dictionary for JSON serialization.""" - result: dict[str, Any] = { - "message": self.message, - "thread_id": self.thread_id, - "status": self.status, - "message_count": self.message_count, - } - - # Add response or structured_response based on what's available - if self.structured_response is not None: - result["structured_response"] = self.structured_response - elif self.response is not None: - result["response"] = self.response - - if self.error: - result["error"] = self.error - if self.error_type: - result["error_type"] = self.error_type - - return result diff --git a/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py b/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py index 2fd4522964..456538e0b3 100644 --- a/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py +++ b/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py @@ -6,19 +6,27 @@ """ import uuid -from collections.abc import AsyncIterator +from collections.abc import AsyncIterator, Generator from typing import TYPE_CHECKING, Any, TypeAlias, cast -from agent_framework import AgentProtocol, AgentRunResponseUpdate, AgentThread, ChatMessage, get_logger +from agent_framework import ( + AgentProtocol, + AgentRunResponse, + AgentRunResponseUpdate, + AgentThread, + ChatMessage, + get_logger, +) +from pydantic import BaseModel from ._models import AgentSessionId, DurableAgentThread, RunRequest logger = get_logger("agent_framework.azurefunctions.orchestration") if TYPE_CHECKING: - from azure.durable_functions import DurableOrchestrationContext as _DurableOrchestrationContext + from azure.durable_functions import DurableOrchestrationContext - AgentOrchestrationContextType: TypeAlias = _DurableOrchestrationContext + AgentOrchestrationContextType: TypeAlias = DurableOrchestrationContext else: AgentOrchestrationContextType = Any @@ -81,13 +89,16 @@ def description(self) -> str | None: """Get the description of the agent.""" return self._description - def run( + # We return a Generator[Any, Any, ...] here because Durable Functions orchestrations + # require yielding Tasks, and the run method must return a Task that can be yielded. + # AgentProtocol defines run() as async, so we use type: ignore[override] to suppress the warning. + def run( # type: ignore[override] self, messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, *, thread: AgentThread | None = None, **kwargs: Any, - ) -> Any: # TODO(msft-team): Add a wrapper to respond correctly with `AgentRunResponse` + ) -> Generator[Any, Any, AgentRunResponse]: """Execute the agent with messages and return a Task for orchestrations. This method implements AgentProtocol and returns a Task that can be yielded @@ -106,13 +117,15 @@ def run( def my_orchestration(context): agent = app.get_agent(context, "MyAgent") thread = agent.get_new_thread() - result = yield agent.run("Hello", thread=thread) + result = yield from agent.run("Hello", thread=thread) """ message_str = self._normalize_messages(messages) + logger.debug(f"[DurableAIAgent] Running agent '{self.agent_name}' with message: {message_str[:100]}...") # Extract optional parameters from kwargs enable_tool_calls = kwargs.get("enable_tool_calls", True) response_format = kwargs.get("response_format") + response_format = cast(type[BaseModel], response_format) # Get the session ID for the entity if isinstance(thread, DurableAgentThread) and thread.session_id is not None: @@ -130,6 +143,12 @@ def my_orchestration(context): # Generate a deterministic correlation ID for this call # This is required by the entity and must be unique per call correlation_id = str(self.context.new_uuid()) + logger.debug( + "[DurableAIAgent] Using correlation_id: %s for entity_id: %s for session_id: %s", + correlation_id, + entity_id, + session_id, + ) # Prepare the request using RunRequest model run_request = RunRequest( @@ -144,7 +163,59 @@ def my_orchestration(context): # Call the entity and return the Task directly # The orchestration will yield this Task - return self.context.call_entity(entity_id, "run_agent", run_request.to_dict()) + result = yield self.context.call_entity(entity_id, "run_agent", run_request.to_dict()) + + logger.debug( + "[DurableAIAgent] Entity call completed for correlation_id %s; raw result type: %s", + correlation_id, + type(result), + ) + + response = self._load_agent_response(result) + + if response_format is not None: + self._ensure_response_format(response_format, correlation_id, response) + + return response + + def _load_agent_response(self, agent_response: AgentRunResponse | dict[str, Any] | None) -> AgentRunResponse: + """Convert raw payloads into AgentRunResponse instance.""" + if agent_response is None: + raise ValueError("agent_response cannot be None") + + logger.debug(f"[load_agent_response] Loading agent response of type: {type(agent_response)}") + + response = None + if isinstance(agent_response, AgentRunResponse): + response = agent_response + elif isinstance(agent_response, dict): + logger.debug("[load_agent_response] Converting dict payload using AgentRunResponse.from_dict") + response = AgentRunResponse.from_dict(agent_response) + + return response + + def _ensure_response_format( + self, + response_format: type[BaseModel] | None, + correlation_id: str, + response: AgentRunResponse, + ) -> None: + """Ensure the AgentRunResponse value is parsed into the expected response_format.""" + if response_format is not None and not isinstance(response.value, response_format): + logger.debug( + "[DurableAIAgent] Response value type %s does not match expected %s for correlation_id %s", + type(response.value), + response_format, + correlation_id, + ) + + response.try_parse_value(response_format) + + logger.debug( + "[DurableAIAgent] Loaded AgentRunResponse.value for correlation_id %s with type: %s", + correlation_id, + type(response.value), + ) def run_stream( self, diff --git a/python/packages/azurefunctions/tests/test_app.py b/python/packages/azurefunctions/tests/test_app.py index c11bb873d6..eff45c108a 100644 --- a/python/packages/azurefunctions/tests/test_app.py +++ b/python/packages/azurefunctions/tests/test_app.py @@ -336,10 +336,12 @@ async def test_entity_run_agent_operation(self) -> None: {"message": "Test message", "thread_id": "test-conv-123", "correlation_id": "corr-app-entity-1"}, ) - assert result["status"] == "success" - assert result["response"] == "Test response" - assert result["message"] == "Test message" - assert result["thread_id"] == "test-conv-123" + assert isinstance(result, AgentRunResponse) + metadata = result.additional_properties + assert metadata["status"] == "success" + assert metadata["response"] == "Test response" + assert metadata["message"] == "Test message" + assert metadata["thread_id"] == "test-conv-123" assert entity.state.message_count == 1 async def test_entity_stores_conversation_history(self) -> None: @@ -527,10 +529,11 @@ async def test_entity_handles_agent_error(self) -> None: mock_context, {"message": "Test message", "thread_id": "conv-1", "correlation_id": "corr-app-error-1"} ) - assert result["status"] == "error" - assert "error" in result - assert "Agent error" in result["error"] - assert result["error_type"] == "Exception" + assert isinstance(result, AgentRunResponse) + metadata = result.additional_properties + assert metadata["status"] == "error" + assert "Agent error" in metadata["error"] + assert metadata["error_type"] == "Exception" def test_entity_function_handles_exception(self) -> None: """Test that the entity function handles exceptions gracefully.""" diff --git a/python/packages/azurefunctions/tests/test_entities.py b/python/packages/azurefunctions/tests/test_entities.py index 5b053efbf3..2ff763edea 100644 --- a/python/packages/azurefunctions/tests/test_entities.py +++ b/python/packages/azurefunctions/tests/test_entities.py @@ -127,10 +127,11 @@ async def test_run_agent_executes_agent(self) -> None: assert _role_value(sent_message) == "user" # Verify result - assert result["status"] == "success" - assert result["response"] == "Test response" - assert result["message"] == "Test message" - assert result["thread_id"] == "conv-123" + assert isinstance(result, AgentRunResponse) + assert result.additional_properties["status"] == "success" + assert result.additional_properties["response"] == "Test response" + assert result.additional_properties["message"] == "Test message" + assert result.additional_properties["thread_id"] == "conv-123" async def test_run_agent_streaming_callbacks_invoked(self) -> None: """Ensure streaming updates trigger callbacks and run() is not used.""" @@ -162,8 +163,9 @@ async def update_generator() -> AsyncIterator[AgentRunResponseUpdate]: }, ) - assert result["status"] == "success" - assert "Hello" in result.get("response", "") + assert isinstance(result, AgentRunResponse) + assert result.additional_properties["status"] == "success" + assert "Hello" in result.additional_properties.get("response", "") assert callback.stream_mock.await_count == len(updates) assert callback.response_mock.await_count == 1 mock_agent.run.assert_not_called() @@ -209,8 +211,9 @@ async def test_run_agent_final_callback_without_streaming(self) -> None: }, ) - assert result["status"] == "success" - assert result.get("response") == "Final response" + assert isinstance(result, AgentRunResponse) + assert result.additional_properties["status"] == "success" + assert result.additional_properties.get("response") == "Final response" assert callback.stream_mock.await_count == 0 assert callback.response_mock.await_count == 1 @@ -326,8 +329,9 @@ def text(self) -> str: # type: ignore[override] ) # Should handle gracefully - assert result["status"] == "success" - assert result["response"] == "Error extracting response" + assert isinstance(result, AgentRunResponse) + assert result.additional_properties["status"] == "success" + assert result.additional_properties["response"] == "Error extracting response" async def test_run_agent_handles_none_response_text(self) -> None: """Test that run_agent handles responses with None text.""" @@ -341,8 +345,9 @@ async def test_run_agent_handles_none_response_text(self) -> None: mock_context, {"message": "Message", "thread_id": "conv-1", "correlation_id": "corr-entity-7"} ) - assert result["status"] == "success" - assert result["response"] == "No response" + assert isinstance(result, AgentRunResponse) + assert result.additional_properties["status"] == "success" + assert result.additional_properties["response"] == "No response" async def test_run_agent_multiple_conversations(self) -> None: """Test that run_agent maintains history across multiple messages.""" @@ -594,10 +599,12 @@ async def test_run_agent_handles_agent_exception(self) -> None: mock_context, {"message": "Message", "thread_id": "conv-1", "correlation_id": "corr-entity-error-1"} ) - assert result["status"] == "error" - assert "error" in result - assert "Agent failed" in result["error"] - assert result["error_type"] == "Exception" + assert isinstance(result, AgentRunResponse) + error_metadata = result.additional_properties + assert error_metadata["status"] == "error" + assert "error" in error_metadata + assert "Agent failed" in error_metadata["error"] + assert error_metadata["error_type"] == "Exception" async def test_run_agent_handles_value_error(self) -> None: """Test that run_agent handles ValueError instances.""" @@ -611,9 +618,11 @@ async def test_run_agent_handles_value_error(self) -> None: mock_context, {"message": "Message", "thread_id": "conv-1", "correlation_id": "corr-entity-error-2"} ) - assert result["status"] == "error" - assert result["error_type"] == "ValueError" - assert "Invalid input" in result["error"] + assert isinstance(result, AgentRunResponse) + error_metadata = result.additional_properties + assert error_metadata["status"] == "error" + assert error_metadata["error_type"] == "ValueError" + assert "Invalid input" in error_metadata["error"] async def test_run_agent_handles_timeout_error(self) -> None: """Test that run_agent handles TimeoutError instances.""" @@ -627,8 +636,10 @@ async def test_run_agent_handles_timeout_error(self) -> None: mock_context, {"message": "Message", "thread_id": "conv-1", "correlation_id": "corr-entity-error-3"} ) - assert result["status"] == "error" - assert result["error_type"] == "TimeoutError" + assert isinstance(result, AgentRunResponse) + error_metadata = result.additional_properties + assert error_metadata["status"] == "error" + assert error_metadata["error_type"] == "TimeoutError" def test_entity_function_handles_exception_in_operation(self) -> None: """Test that the entity function handles exceptions gracefully.""" @@ -663,9 +674,11 @@ async def test_run_agent_preserves_message_on_error(self) -> None: ) # Even on error, message info should be preserved - assert result["message"] == "Test message" - assert result["thread_id"] == "conv-123" - assert result["status"] == "error" + assert isinstance(result, AgentRunResponse) + error_metadata = result.additional_properties + assert error_metadata["message"] == "Test message" + assert error_metadata["thread_id"] == "conv-123" + assert error_metadata["status"] == "error" class TestConversationHistory: @@ -771,10 +784,11 @@ async def test_run_agent_with_run_request_object(self) -> None: result = await entity.run_agent(mock_context, request) - assert result["status"] == "success" - assert result["response"] == "Response" - assert result["message"] == "Test message" - assert result["thread_id"] == "conv-123" + assert isinstance(result, AgentRunResponse) + assert result.additional_properties["status"] == "success" + assert result.additional_properties["response"] == "Response" + assert result.additional_properties["message"] == "Test message" + assert result.additional_properties["thread_id"] == "conv-123" async def test_run_agent_with_dict_request(self) -> None: """Test run_agent with a dictionary request.""" @@ -794,9 +808,10 @@ async def test_run_agent_with_dict_request(self) -> None: result = await entity.run_agent(mock_context, request_dict) - assert result["status"] == "success" - assert result["message"] == "Test message" - assert result["thread_id"] == "conv-456" + assert isinstance(result, AgentRunResponse) + assert result.additional_properties["status"] == "success" + assert result.additional_properties["message"] == "Test message" + assert result.additional_properties["thread_id"] == "conv-456" async def test_run_agent_with_string_raises_without_correlation(self) -> None: """Test that run_agent rejects legacy string input without correlation ID.""" @@ -850,10 +865,12 @@ async def test_run_agent_with_response_format(self) -> None: result = await entity.run_agent(mock_context, request) - assert result["status"] == "success" - # Should have structured_response - if "structured_response" in result: - assert result["structured_response"]["answer"] == 42 + assert isinstance(result, AgentRunResponse) + metadata = result.additional_properties + assert metadata["status"] == "success" + assert "structured_response" not in metadata + assert metadata["response"] == '{"answer": 42}' + assert result.value is None async def test_run_agent_disable_tool_calls(self) -> None: """Test run_agent with tool calls disabled.""" @@ -869,7 +886,8 @@ async def test_run_agent_disable_tool_calls(self) -> None: result = await entity.run_agent(mock_context, request) - assert result["status"] == "success" + assert isinstance(result, AgentRunResponse) + assert result.additional_properties["status"] == "success" # Agent should have been called (tool disabling is framework-dependent) mock_agent.run.assert_called_once() @@ -896,8 +914,11 @@ async def test_entity_function_with_run_request_dict(self) -> None: # Verify result was set assert mock_context.set_result.called result = mock_context.set_result.call_args[0][0] - assert result["status"] == "success" - assert result["message"] == "Test message" + assert isinstance(result, dict) + assert result["type"] == "agent_run_response" + metadata = result.get("additional_properties", {}) + assert metadata["status"] == "success" + assert metadata["message"] == "Test message" if __name__ == "__main__": diff --git a/python/packages/azurefunctions/tests/test_models.py b/python/packages/azurefunctions/tests/test_models.py index bb802956ff..006c03f5bd 100644 --- a/python/packages/azurefunctions/tests/test_models.py +++ b/python/packages/azurefunctions/tests/test_models.py @@ -7,7 +7,7 @@ from agent_framework import Role from pydantic import BaseModel -from agent_framework_azurefunctions._models import AgentResponse, AgentSessionId, RunRequest +from agent_framework_azurefunctions._models import AgentSessionId, RunRequest class ModuleStructuredResponse(BaseModel): @@ -337,107 +337,6 @@ def test_round_trip_with_correlation_id(self) -> None: assert restored.thread_id == original.thread_id -class TestAgentResponse: - """Test suite for AgentResponse.""" - - def test_init_with_required_fields(self) -> None: - """Test AgentResponse initialization with required fields.""" - response = AgentResponse( - response="Test response", message="Test message", thread_id="thread-123", status="success" - ) - - assert response.response == "Test response" - assert response.message == "Test message" - assert response.thread_id == "thread-123" - assert response.status == "success" - assert response.message_count == 0 - assert response.error is None - assert response.error_type is None - assert response.structured_response is None - - def test_init_with_all_fields(self) -> None: - """Test AgentResponse initialization with all fields.""" - structured = {"answer": "42"} - response = AgentResponse( - response=None, - message="What is the answer?", - thread_id="thread-456", - status="success", - message_count=5, - error=None, - error_type=None, - structured_response=structured, - ) - - assert response.response is None - assert response.structured_response == structured - assert response.message_count == 5 - - def test_to_dict_with_text_response(self) -> None: - """Test to_dict with text response.""" - response = AgentResponse( - response="Text response", message="Message", thread_id="thread-1", status="success", message_count=3 - ) - data = response.to_dict() - - assert data["response"] == "Text response" - assert data["message"] == "Message" - assert data["thread_id"] == "thread-1" - assert data["status"] == "success" - assert data["message_count"] == 3 - assert "structured_response" not in data - assert "error" not in data - assert "error_type" not in data - - def test_to_dict_with_structured_response(self) -> None: - """Test to_dict with structured response.""" - structured = {"answer": 42, "confidence": 0.95} - response = AgentResponse( - response=None, - message="Question", - thread_id="thread-2", - status="success", - structured_response=structured, - ) - data = response.to_dict() - - assert data["structured_response"] == structured - assert "response" not in data - - def test_to_dict_with_error(self) -> None: - """Test to_dict with error.""" - response = AgentResponse( - response=None, - message="Failed message", - thread_id="thread-3", - status="error", - error="Something went wrong", - error_type="ValueError", - ) - data = response.to_dict() - - assert data["status"] == "error" - assert data["error"] == "Something went wrong" - assert data["error_type"] == "ValueError" - - def test_to_dict_prefers_structured_over_text(self) -> None: - """Test to_dict prefers structured_response over response.""" - structured = {"result": "structured"} - response = AgentResponse( - response="Text response", - message="Message", - thread_id="thread-4", - status="success", - structured_response=structured, - ) - data = response.to_dict() - - assert "structured_response" in data - assert data["structured_response"] == structured - # Text response should not be included when structured is present - assert "response" not in data - - class TestModelIntegration: """Test suite for integration between models.""" @@ -450,21 +349,6 @@ def test_run_request_with_session_id(self) -> None: assert request.thread_id == str(session_id) assert request.thread_id.startswith("@AgentEntity@") - def test_response_from_run_request(self) -> None: - """Test creating AgentResponse from RunRequest.""" - request = RunRequest(message="What is 2+2?", thread_id="thread-123", role=Role.USER) - - response = AgentResponse( - response="4", - message=request.message, - thread_id=request.thread_id, - status="success", - message_count=1, - ) - - assert response.message == request.message - assert response.thread_id == request.thread_id - if __name__ == "__main__": pytest.main([__file__, "-v", "--tb=short"]) diff --git a/python/packages/azurefunctions/tests/test_orchestration.py b/python/packages/azurefunctions/tests/test_orchestration.py index c30f9f0bec..a9fb26662f 100644 --- a/python/packages/azurefunctions/tests/test_orchestration.py +++ b/python/packages/azurefunctions/tests/test_orchestration.py @@ -6,7 +6,7 @@ from unittest.mock import Mock import pytest -from agent_framework import AgentThread +from agent_framework import AgentRunResponse, AgentThread, ChatMessage from agent_framework_azurefunctions import AgentFunctionApp, DurableAIAgent from agent_framework_azurefunctions._models import AgentSessionId, DurableAgentThread @@ -21,6 +21,44 @@ def _app_with_registered_agents(*agent_names: str) -> AgentFunctionApp: return app +class TestAgentResponseHelpers: + """Tests for helper utilities that prepare AgentRunResponse values.""" + + @staticmethod + def _create_agent() -> DurableAIAgent: + mock_context = Mock() + mock_context.instance_id = "test-instance" + mock_context.new_uuid = Mock(return_value="helper-guid") + return DurableAIAgent(mock_context, "HelperAgent") + + def test_load_agent_response_from_instance(self) -> None: + agent = self._create_agent() + response = AgentRunResponse(messages=[ChatMessage(role="assistant", text='{"foo": "bar"}')]) + + loaded = agent._load_agent_response(response) + + assert loaded is response + assert loaded.value is None + + def test_load_agent_response_from_serialized(self) -> None: + agent = self._create_agent() + serialized = AgentRunResponse(messages=[ChatMessage(role="assistant", text="structured")]).to_dict() + serialized["value"] = {"answer": 42} + + loaded = agent._load_agent_response(serialized) + + assert loaded is not None + assert loaded.value == {"answer": 42} + loaded_dict = loaded.to_dict() + assert loaded_dict["type"] == "agent_run_response" + + def test_load_agent_response_rejects_none(self) -> None: + agent = self._create_agent() + + with pytest.raises(ValueError): + agent._load_agent_response(None) + + class TestDurableAIAgent: """Test suite for DurableAIAgent wrapper.""" diff --git a/python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/function_app.py b/python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/function_app.py index 15a95327b9..b3f4131985 100644 --- a/python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/function_app.py +++ b/python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/function_app.py @@ -51,22 +51,22 @@ def single_agent_orchestration(context: DurableOrchestrationContext): writer = app.get_agent(context, WRITER_AGENT_NAME) writer_thread = writer.get_new_thread() - initial = yield writer.run( + initial = yield from writer.run( messages="Write a concise inspirational sentence about learning.", thread=writer_thread, ) improved_prompt = ( "Improve this further while keeping it under 25 words: " - f"{initial.get('response', '').strip()}" + f"{initial.text}" ) - refined = yield writer.run( + refined = yield from writer.run( messages=improved_prompt, thread=writer_thread, ) - return refined.get("response", "") + return refined.text # 5. HTTP endpoint to kick off the orchestration and return the status query URI. diff --git a/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/function_app.py b/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/function_app.py index 6d0fd17eca..2bc7d8dfb4 100644 --- a/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/function_app.py +++ b/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/function_app.py @@ -52,7 +52,13 @@ def _create_agents() -> list[Any]: # 4. Durable Functions orchestration that runs both agents in parallel. @app.orchestration_trigger(context_name="context") def multi_agent_concurrent_orchestration(context: DurableOrchestrationContext): - """Fan out to two domain-specific agents and aggregate their responses.""" + """Fan out to two domain-specific agents and aggregate their responses. + + Note: This uses generator protocol to achieve true parallelism. + For sequential execution, you can simply use: + physicist_result = yield from physicist.run(messages=prompt, thread=physicist_thread) + chemist_result = yield from chemist.run(messages=prompt, thread=chemist_thread) + """ prompt = context.get_input() if not prompt or not str(prompt).strip(): @@ -64,14 +70,38 @@ def multi_agent_concurrent_orchestration(context: DurableOrchestrationContext): physicist_thread = physicist.get_new_thread() chemist_thread = chemist.get_new_thread() - physicist_task = physicist.run(messages=str(prompt), thread=physicist_thread) - chemist_task = chemist.run(messages=str(prompt), thread=chemist_thread) - - results = yield context.task_all([physicist_task, chemist_task]) + # Create generators from agent.run() calls + physicist_gen = physicist.run(messages=str(prompt), thread=physicist_thread) + chemist_gen = chemist.run(messages=str(prompt), thread=chemist_thread) + + # Advance each generator to extract the underlying Durable Task objects + physicist_task = next(physicist_gen) + chemist_task = next(chemist_gen) + + # Execute both tasks concurrently using task_all + task_results = yield context.task_all([physicist_task, chemist_task]) + + # Complete the generator protocol by sending results back + # Each generator returns its final value via StopIteration.value + physicist_result = None + chemist_result = None + + try: + physicist_gen.send(task_results[0]) + except StopIteration as e: + physicist_result = e.value + + try: + chemist_gen.send(task_results[1]) + except StopIteration as e: + chemist_result = e.value + + if physicist_result is None or chemist_result is None: + raise ValueError("Failed to get results from one or both agents") return { - "physicist": results[0].get("response", ""), - "chemist": results[1].get("response", ""), + "physicist": physicist_result.text, + "chemist": chemist_result.text, } diff --git a/python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/function_app.py b/python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/function_app.py index a5991e76f5..ff0cf3aa72 100644 --- a/python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/function_app.py +++ b/python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/function_app.py @@ -96,13 +96,13 @@ def spam_detection_orchestration(context: DurableOrchestrationContext): f"Content: {payload.email_content}" ) - spam_result_raw = yield spam_agent.run( + spam_result_raw = yield from spam_agent.run( messages=spam_prompt, thread=spam_thread, response_format=SpamDetectionResult, ) - spam_result = cast(SpamDetectionResult, _coerce_structured(spam_result_raw, SpamDetectionResult)) + spam_result = cast(SpamDetectionResult, spam_result_raw.value) if spam_result.is_spam: result = yield context.call_activity("handle_spam_email", spam_result.reason) @@ -117,13 +117,13 @@ def spam_detection_orchestration(context: DurableOrchestrationContext): f"Content: {payload.email_content}" ) - email_result_raw = yield email_agent.run( + email_result_raw = yield from email_agent.run( messages=email_prompt, thread=email_thread, response_format=EmailResponse, ) - email_result = cast(EmailResponse, _coerce_structured(email_result_raw, EmailResponse)) + email_result = cast(EmailResponse, email_result_raw.value) result = yield context.call_activity("send_email", email_result.response) return result @@ -231,24 +231,6 @@ def _build_status_url(request_url: str, instance_id: str, *, route: str) -> str: return f"{base_url}/api/{route}/status/{instance_id}" -def _coerce_structured(result: Mapping[str, Any], model: type[BaseModel]) -> BaseModel: - structured = result.get("structured_response") if isinstance(result, Mapping) else None - if structured is not None: - return model.model_validate(structured) - - response_text = result.get("response") if isinstance(result, Mapping) else None - if isinstance(response_text, str) and response_text.strip(): - try: - parsed = json.loads(response_text) - if isinstance(parsed, Mapping): - return model.model_validate(parsed) - except json.JSONDecodeError: - logger.warning("[ConditionalOrchestration] Failed to parse agent JSON response; raising error.") - - # If parsing failed, raise to surface the issue to the caller. - raise ValueError(f"Agent response could not be parsed as {model.__name__}.") - - """ Expected response from `POST /api/spamdetection/run`: diff --git a/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/function_app.py b/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/function_app.py index aac7d092f1..1cf433cbed 100644 --- a/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/function_app.py +++ b/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/function_app.py @@ -12,10 +12,11 @@ import logging from collections.abc import Mapping from datetime import timedelta -from typing import Any +from typing import Any, cast import azure.durable_functions as df import azure.functions as func +from agent_framework import AgentRunResponse from agent_framework.azure import AgentFunctionApp, AzureOpenAIChatClient from azure.durable_functions import DurableOrchestrationContext from azure.identity import AzureCliCredential @@ -96,12 +97,17 @@ def content_generation_hitl_orchestration(context: DurableOrchestrationContext): context.set_custom_status(f"Starting content generation for topic: {payload.topic}") - initial_raw = yield writer.run( + initial_raw = yield from writer.run( messages=f"Write a short article about '{payload.topic}'.", thread=writer_thread, response_format=GeneratedContent, ) - content = _coerce_generated_content(initial_raw) + + content = initial_raw.value + logger.info("Type of content after extraction: %s", type(content)) + + if content is None or not isinstance(content, GeneratedContent): + raise ValueError("Agent returned no content after extraction.") attempt = 0 while attempt < payload.max_review_attempts: @@ -138,12 +144,13 @@ def content_generation_hitl_orchestration(context: DurableOrchestrationContext): "The content was rejected by a human reviewer. Please rewrite the article incorporating their feedback.\n\n" f"Human Feedback: {approval_payload.feedback or 'No feedback provided.'}" ) - rewritten_raw = yield writer.run( + rewritten_raw = yield from writer.run( messages=rewrite_prompt, thread=writer_thread, response_format=GeneratedContent, ) - content = _coerce_generated_content(rewritten_raw) + + content = cast(GeneratedContent, rewritten_raw.value) else: context.set_custom_status( f"Human approval timed out after {payload.approval_timeout_hours} hour(s). Treating as rejection." @@ -318,23 +325,6 @@ def _build_status_url(request_url: str, instance_id: str, *, route: str) -> str: return f"{base_url}/api/{route}/status/{instance_id}" -def _coerce_generated_content(result: Mapping[str, Any]) -> GeneratedContent: - structured = result.get("structured_response") if isinstance(result, Mapping) else None - if structured is not None: - return GeneratedContent.model_validate(structured) - - response_text = result.get("response") if isinstance(result, Mapping) else None - if isinstance(response_text, str) and response_text.strip(): - try: - parsed = json.loads(response_text) - if isinstance(parsed, Mapping): - return GeneratedContent.model_validate(parsed) - except json.JSONDecodeError: - logger.warning("[HITL] Failed to parse agent JSON response; falling back to defaults.") - - raise ValueError("Agent response could not be parsed as GeneratedContent.") - - def _parse_human_approval(raw: Any) -> HumanApproval: if isinstance(raw, Mapping): return HumanApproval.model_validate(raw) From 92260d7e35aa253e65ba636547a1c0e739b3f9b6 Mon Sep 17 00:00:00 2001 From: Laveesh Rohra Date: Mon, 17 Nov 2025 14:22:52 -0800 Subject: [PATCH 02/11] Fix response_Format type --- .../_orchestration.py | 7 +++-- .../tests/test_orchestration.py | 29 +++++++++++-------- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py b/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py index 456538e0b3..498c5eda0f 100644 --- a/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py +++ b/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py @@ -91,12 +91,14 @@ def description(self) -> str | None: # We return a Generator[Any, Any, ...] here because Durable Functions orchestrations # require yielding Tasks, and the run method must return a Task that can be yielded. - # AgentProtocol defines run() as async, so we use type: ignore[override] to suppress the warning. + # This is an intentional deviation from AgentProtocol which defines run() as async. + # The Generator type is correct for Durable Functions orchestrations. def run( # type: ignore[override] self, messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, *, thread: AgentThread | None = None, + response_format: type[BaseModel] | None = None, **kwargs: Any, ) -> Generator[Any, Any, AgentRunResponse]: """Execute the agent with messages and return a Task for orchestrations. @@ -107,6 +109,7 @@ def run( # type: ignore[override] Args: messages: The message(s) to send to the agent thread: Optional agent thread for conversation context + response_format: Optional Pydantic model for response parsing **kwargs: Additional arguments (enable_tool_calls, response_format, etc.) Returns: @@ -124,8 +127,6 @@ def my_orchestration(context): # Extract optional parameters from kwargs enable_tool_calls = kwargs.get("enable_tool_calls", True) - response_format = kwargs.get("response_format") - response_format = cast(type[BaseModel], response_format) # Get the session ID for the entity if isinstance(thread, DurableAgentThread) and thread.session_id is not None: diff --git a/python/packages/azurefunctions/tests/test_orchestration.py b/python/packages/azurefunctions/tests/test_orchestration.py index a9fb26662f..784003bf48 100644 --- a/python/packages/azurefunctions/tests/test_orchestration.py +++ b/python/packages/azurefunctions/tests/test_orchestration.py @@ -160,10 +160,11 @@ def test_run_creates_entity_call(self) -> None: # Create thread thread = agent.get_new_thread() - # Call run() - it should return the Task directly - task = agent.run(messages="Test message", thread=thread, enable_tool_calls=True) + # Call run() - advance the generator to get the Task durable functions will yield + run_gen = agent.run(messages="Test message", thread=thread, enable_tool_calls=True) + task = next(run_gen) - # Verify run() returns the Task from call_entity + # Verify run() yields the Task from call_entity assert task == mock_task # Verify call_entity was called with correct parameters @@ -193,7 +194,8 @@ def test_run_without_thread(self) -> None: agent = DurableAIAgent(mock_context, "TestAgent") # Call without thread - task = agent.run(messages="Test message") + run_gen = agent.run(messages="Test message") + task = next(run_gen) assert task == mock_task @@ -224,7 +226,8 @@ class SampleSchema(BaseModel): # Create thread and call thread = agent.get_new_thread() - task = agent.run(messages="Test message", thread=thread, response_format=SampleSchema) + run_gen = agent.run(messages="Test message", thread=thread, response_format=SampleSchema) + task = next(run_gen) assert task == mock_task @@ -267,7 +270,8 @@ def test_run_with_chat_message(self) -> None: # Call with ChatMessage msg = ChatMessage(role="user", text="Hello") - task = agent.run(messages=msg, thread=thread) + run_gen = agent.run(messages=msg, thread=thread) + task = next(run_gen) assert task == mock_task @@ -299,7 +303,8 @@ def test_entity_id_format(self) -> None: thread = agent.get_new_thread() # Call run() to trigger entity ID creation - agent.run("Test", thread=thread) + run_gen = agent.run("Test", thread=thread) + next(run_gen) # Verify call_entity was called with correct EntityId call_args = mock_context.call_entity.call_args @@ -368,12 +373,12 @@ def mock_call_entity_side_effect(entity_id: Any, operation: str, input_data: dic # Create thread thread = agent.get_new_thread() - # First call - returns Task - task1 = agent.run("Write something", thread=thread) + # First call - returns Task when generator is advanced + task1 = next(agent.run("Write something", thread=thread)) assert hasattr(task1, "_is_scheduled") # Second call - returns Task - task2 = agent.run("Improve: something", thread=thread) + task2 = next(agent.run("Improve: something", thread=thread)) assert hasattr(task2, "_is_scheduled") # Verify both calls used the same entity (same session key) @@ -410,8 +415,8 @@ def mock_call_entity_side_effect(entity_id: Any, operation: str, input_data: dic editor_thread = editor.get_new_thread() # Call both agents - returns Tasks - writer_task = writer.run("Write", thread=writer_thread) - editor_task = editor.run("Edit", thread=editor_thread) + writer_task = next(writer.run("Write", thread=writer_thread)) + editor_task = next(editor.run("Edit", thread=editor_thread)) assert hasattr(writer_task, "_is_scheduled") assert hasattr(editor_task, "_is_scheduled") From 47ef15f6caac9d336ca5b665ceb3b0ebdea38137 Mon Sep 17 00:00:00 2001 From: Laveesh Rohra Date: Mon, 17 Nov 2025 14:49:18 -0800 Subject: [PATCH 03/11] Address comments --- .../_orchestration.py | 9 ++++----- .../function_app.py | 8 +++++--- .../function_app.py | 13 ++++++++----- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py b/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py index 498c5eda0f..0aa6434b6c 100644 --- a/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py +++ b/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py @@ -186,14 +186,13 @@ def _load_agent_response(self, agent_response: AgentRunResponse | dict[str, Any] logger.debug(f"[load_agent_response] Loading agent response of type: {type(agent_response)}") - response = None if isinstance(agent_response, AgentRunResponse): - response = agent_response - elif isinstance(agent_response, dict): + return agent_response + if isinstance(agent_response, dict): logger.debug("[load_agent_response] Converting dict payload using AgentRunResponse.from_dict") - response = AgentRunResponse.from_dict(agent_response) + return AgentRunResponse.from_dict(agent_response) - return response + raise TypeError(f"Unsupported type for agent_response: {type(agent_response)}") def _ensure_response_format( self, diff --git a/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/function_app.py b/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/function_app.py index 938b04dd1f..620660c09c 100644 --- a/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/function_app.py +++ b/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/function_app.py @@ -52,8 +52,10 @@ def _create_agents() -> list[Any]: @app.orchestration_trigger(context_name="context") def multi_agent_concurrent_orchestration(context: DurableOrchestrationContext): """Fan out to two domain-specific agents and aggregate their responses. - - Note: This uses generator protocol to achieve true parallelism. + + Note: The generator protocol lets us extract the Durable Task objects so we + can pass them to task_all for true parallelism. If you only use `yield + from` on each run call, the agents execute sequentially. For sequential execution, you can simply use: physicist_result = yield from physicist.run(messages=prompt, thread=physicist_thread) chemist_result = yield from chemist.run(messages=prompt, thread=chemist_thread) @@ -84,7 +86,7 @@ def multi_agent_concurrent_orchestration(context: DurableOrchestrationContext): # Each generator returns its final value via StopIteration.value physicist_result = None chemist_result = None - + try: physicist_gen.send(task_results[0]) except StopIteration as e: diff --git a/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/function_app.py b/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/function_app.py index 0c3f761358..b2894c9b4c 100644 --- a/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/function_app.py +++ b/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/function_app.py @@ -12,10 +12,9 @@ import logging from collections.abc import Mapping from datetime import timedelta -from typing import Any, cast +from typing import Any import azure.functions as func -from agent_framework import AgentRunResponse from agent_framework.azure import AgentFunctionApp, AzureOpenAIChatClient from azure.durable_functions import DurableOrchestrationClient, DurableOrchestrationContext from azure.identity import AzureCliCredential @@ -104,7 +103,7 @@ def content_generation_hitl_orchestration(context: DurableOrchestrationContext): content = initial_raw.value logger.info("Type of content after extraction: %s", type(content)) - + if content is None or not isinstance(content, GeneratedContent): raise ValueError("Agent returned no content after extraction.") @@ -149,7 +148,11 @@ def content_generation_hitl_orchestration(context: DurableOrchestrationContext): response_format=GeneratedContent, ) - content = cast(GeneratedContent, rewritten_raw.value) + rewritten_value = rewritten_raw.value + if rewritten_value is None or not isinstance(rewritten_value, GeneratedContent): + raise ValueError("Agent returned no content after rewrite.") + + content = rewritten_value else: context.set_custom_status( f"Human approval timed out after {payload.approval_timeout_hours} hour(s). Treating as rejection." @@ -282,7 +285,7 @@ async def get_orchestration_status( show_history_output=False, show_input=True, ) - + # Check if status is None or if the instance doesn't exist (runtime_status is None) if status is None or getattr(status, "runtime_status", None) is None: return func.HttpResponse( From f3bfc8e53e2da0e0a161cb3932f1529e0b936a7b Mon Sep 17 00:00:00 2001 From: Laveesh Rohra Date: Thu, 20 Nov 2025 10:15:53 -0800 Subject: [PATCH 04/11] Fix tests --- .../packages/azurefunctions/tests/test_app.py | 17 +-- .../azurefunctions/tests/test_entities.py | 122 ++++++------------ 2 files changed, 48 insertions(+), 91 deletions(-) diff --git a/python/packages/azurefunctions/tests/test_app.py b/python/packages/azurefunctions/tests/test_app.py index fd93ddc7eb..a6961195a1 100644 --- a/python/packages/azurefunctions/tests/test_app.py +++ b/python/packages/azurefunctions/tests/test_app.py @@ -9,7 +9,7 @@ import azure.durable_functions as df import azure.functions as func import pytest -from agent_framework import AgentRunResponse, ChatMessage +from agent_framework import AgentRunResponse, ChatMessage, ErrorContent from agent_framework_azurefunctions import AgentFunctionApp from agent_framework_azurefunctions._app import WAIT_FOR_RESPONSE_FIELD, WAIT_FOR_RESPONSE_HEADER @@ -343,11 +343,7 @@ async def test_entity_run_agent_operation(self) -> None: ) assert isinstance(result, AgentRunResponse) - metadata = result.additional_properties - assert metadata["status"] == "success" - assert metadata["response"] == "Test response" - assert metadata["message"] == "Test message" - assert metadata["thread_id"] == "test-conv-123" + assert result.text == "Test response" assert entity.state.message_count == 2 async def test_entity_stores_conversation_history(self) -> None: @@ -593,10 +589,11 @@ async def test_entity_handles_agent_error(self) -> None: ) assert isinstance(result, AgentRunResponse) - metadata = result.additional_properties - assert metadata["status"] == "error" - assert "Agent error" in metadata["error"] - assert metadata["error_type"] == "Exception" + assert len(result.messages) == 1 + content = result.messages[0].contents[0] + assert isinstance(content, ErrorContent) + assert "Agent error" in (content.message or "") + assert content.error_code == "Exception" def test_entity_function_handles_exception(self) -> None: """Test that the entity function handles exceptions gracefully.""" diff --git a/python/packages/azurefunctions/tests/test_entities.py b/python/packages/azurefunctions/tests/test_entities.py index 88fae8af2e..d47fae4a1f 100644 --- a/python/packages/azurefunctions/tests/test_entities.py +++ b/python/packages/azurefunctions/tests/test_entities.py @@ -12,7 +12,7 @@ from unittest.mock import AsyncMock, Mock, patch import pytest -from agent_framework import AgentRunResponse, AgentRunResponseUpdate, ChatMessage, Role +from agent_framework import AgentRunResponse, AgentRunResponseUpdate, ChatMessage, ErrorContent, Role from pydantic import BaseModel from agent_framework_azurefunctions._durable_agent_state import ( @@ -134,10 +134,7 @@ async def test_run_agent_executes_agent(self) -> None: # Verify result assert isinstance(result, AgentRunResponse) - assert result.additional_properties["status"] == "success" - assert result.additional_properties["response"] == "Test response" - assert result.additional_properties["message"] == "Test message" - assert result.additional_properties["thread_id"] == "conv-123" + assert result.text == "Test response" async def test_run_agent_streaming_callbacks_invoked(self) -> None: """Ensure streaming updates trigger callbacks and run() is not used.""" @@ -170,8 +167,7 @@ async def update_generator() -> AsyncIterator[AgentRunResponseUpdate]: ) assert isinstance(result, AgentRunResponse) - assert result.additional_properties["status"] == "success" - assert "Hello" in result.additional_properties.get("response", "") + assert "Hello" in result.text assert callback.stream_mock.await_count == len(updates) assert callback.response_mock.await_count == 1 mock_agent.run.assert_not_called() @@ -218,8 +214,7 @@ async def test_run_agent_final_callback_without_streaming(self) -> None: ) assert isinstance(result, AgentRunResponse) - assert result.additional_properties["status"] == "success" - assert result.additional_properties.get("response") == "Final response" + assert result.text == "Final response" assert callback.stream_mock.await_count == 0 assert callback.response_mock.await_count == 1 @@ -297,46 +292,6 @@ async def test_run_agent_with_none_thread_id(self) -> None: mock_context, {"message": "Message", "thread_id": None, "correlationId": "corr-entity-5"} ) - async def test_run_agent_handles_response_without_text_attribute(self) -> None: - """Test that run_agent handles responses without a text attribute.""" - mock_agent = Mock() - - class NoTextResponse(AgentRunResponse): - @property - def text(self) -> str: # type: ignore[override] - raise AttributeError("text attribute missing") - - mock_response = NoTextResponse(messages=[ChatMessage(role="assistant", text="ignored")]) - mock_agent.run = AsyncMock(return_value=mock_response) - - entity = AgentEntity(mock_agent) - mock_context = Mock() - - result = await entity.run_agent( - mock_context, {"message": "Message", "thread_id": "conv-1", "correlationId": "corr-entity-6"} - ) - - # Should handle gracefully - assert isinstance(result, AgentRunResponse) - assert result.additional_properties["status"] == "success" - assert result.additional_properties["response"] == "Error extracting response" - - async def test_run_agent_handles_none_response_text(self) -> None: - """Test that run_agent handles responses with None text.""" - mock_agent = Mock() - mock_agent.run = AsyncMock(return_value=_agent_response(None)) - - entity = AgentEntity(mock_agent) - mock_context = Mock() - - result = await entity.run_agent( - mock_context, {"message": "Message", "thread_id": "conv-1", "correlationId": "corr-entity-7"} - ) - - assert isinstance(result, AgentRunResponse) - assert result.additional_properties["status"] == "success" - assert result.additional_properties["response"] == "No response" - async def test_run_agent_multiple_conversations(self) -> None: """Test that run_agent maintains history across multiple messages.""" mock_agent = Mock() @@ -627,11 +582,11 @@ async def test_run_agent_handles_agent_exception(self) -> None: ) assert isinstance(result, AgentRunResponse) - error_metadata = result.additional_properties - assert error_metadata["status"] == "error" - assert "error" in error_metadata - assert "Agent failed" in error_metadata["error"] - assert error_metadata["error_type"] == "Exception" + assert len(result.messages) == 1 + content = result.messages[0].contents[0] + assert isinstance(content, ErrorContent) + assert "Agent failed" in (content.message or "") + assert content.error_code == "Exception" async def test_run_agent_handles_value_error(self) -> None: """Test that run_agent handles ValueError instances.""" @@ -646,10 +601,11 @@ async def test_run_agent_handles_value_error(self) -> None: ) assert isinstance(result, AgentRunResponse) - error_metadata = result.additional_properties - assert error_metadata["status"] == "error" - assert error_metadata["error_type"] == "ValueError" - assert "Invalid input" in error_metadata["error"] + assert len(result.messages) == 1 + content = result.messages[0].contents[0] + assert isinstance(content, ErrorContent) + assert content.error_code == "ValueError" + assert "Invalid input" in str(content.message) async def test_run_agent_handles_timeout_error(self) -> None: """Test that run_agent handles TimeoutError instances.""" @@ -664,9 +620,10 @@ async def test_run_agent_handles_timeout_error(self) -> None: ) assert isinstance(result, AgentRunResponse) - error_metadata = result.additional_properties - assert error_metadata["status"] == "error" - assert error_metadata["error_type"] == "TimeoutError" + assert len(result.messages) == 1 + content = result.messages[0].contents[0] + assert isinstance(content, ErrorContent) + assert content.error_code == "TimeoutError" def test_entity_function_handles_exception_in_operation(self) -> None: """Test that the entity function handles exceptions gracefully.""" @@ -702,10 +659,9 @@ async def test_run_agent_preserves_message_on_error(self) -> None: # Even on error, message info should be preserved assert isinstance(result, AgentRunResponse) - error_metadata = result.additional_properties - assert error_metadata["message"] == "Test message" - assert error_metadata["thread_id"] == "conv-123" - assert error_metadata["status"] == "error" + assert len(result.messages) == 1 + content = result.messages[0].contents[0] + assert isinstance(content, ErrorContent) class TestConversationHistory: @@ -814,10 +770,7 @@ async def test_run_agent_with_run_request_object(self) -> None: result = await entity.run_agent(mock_context, request) assert isinstance(result, AgentRunResponse) - assert result.additional_properties["status"] == "success" - assert result.additional_properties["response"] == "Response" - assert result.additional_properties["message"] == "Test message" - assert result.additional_properties["thread_id"] == "conv-123" + assert result.text == "Response" async def test_run_agent_with_dict_request(self) -> None: """Test run_agent with a dictionary request.""" @@ -838,9 +791,7 @@ async def test_run_agent_with_dict_request(self) -> None: result = await entity.run_agent(mock_context, request_dict) assert isinstance(result, AgentRunResponse) - assert result.additional_properties["status"] == "success" - assert result.additional_properties["message"] == "Test message" - assert result.additional_properties["thread_id"] == "conv-456" + assert result.text == "Response" async def test_run_agent_with_string_raises_without_correlation(self) -> None: """Test that run_agent rejects legacy string input without correlation ID.""" @@ -895,10 +846,7 @@ async def test_run_agent_with_response_format(self) -> None: result = await entity.run_agent(mock_context, request) assert isinstance(result, AgentRunResponse) - metadata = result.additional_properties - assert metadata["status"] == "success" - assert "structured_response" not in metadata - assert metadata["response"] == '{"answer": 42}' + assert result.text == '{"answer": 42}' assert result.value is None async def test_run_agent_disable_tool_calls(self) -> None: @@ -916,7 +864,6 @@ async def test_run_agent_disable_tool_calls(self) -> None: result = await entity.run_agent(mock_context, request) assert isinstance(result, AgentRunResponse) - assert result.additional_properties["status"] == "success" # Agent should have been called (tool disabling is framework-dependent) mock_agent.run.assert_called_once() @@ -944,10 +891,23 @@ async def test_entity_function_with_run_request_dict(self) -> None: assert mock_context.set_result.called result = mock_context.set_result.call_args[0][0] assert isinstance(result, dict) - assert result["type"] == "agent_run_response" - metadata = result.get("additional_properties", {}) - assert metadata["status"] == "success" - assert metadata["message"] == "Test message" + + # Check if messages are present + assert "messages" in result + assert len(result["messages"]) > 0 + message = result["messages"][0] + + # Check for text in various possible locations + text_found = False + if "text" in message and message["text"] == "Response": + text_found = True + elif "contents" in message: + for content in message["contents"]: + if isinstance(content, dict) and content.get("text") == "Response": + text_found = True + break + + assert text_found, f"Response text not found in message: {message}" if __name__ == "__main__": From cd07aab8c4ceff5334f1bd46e86ebf9018c6025b Mon Sep 17 00:00:00 2001 From: Laveesh Rohra Date: Thu, 20 Nov 2025 10:26:53 -0800 Subject: [PATCH 05/11] Fix log --- .../agent_framework_azurefunctions/_entities.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/python/packages/azurefunctions/agent_framework_azurefunctions/_entities.py b/python/packages/azurefunctions/agent_framework_azurefunctions/_entities.py index 834e9bc3b6..39df766ec5 100644 --- a/python/packages/azurefunctions/agent_framework_azurefunctions/_entities.py +++ b/python/packages/azurefunctions/agent_framework_azurefunctions/_entities.py @@ -154,11 +154,8 @@ async def run_agent( type(agent_run_response).__name__, ) - response_text = None - try: - raw_text = agent_run_response.text - response_text = raw_text if raw_text else "No response" + response_text = agent_run_response.text if agent_run_response.text else "No response" logger.debug(f"Response: {response_text[:100]}...") except Exception as extraction_error: logger.error( @@ -166,7 +163,6 @@ async def run_agent( extraction_error, exc_info=True, ) - response_text = "Error extracting response" state_response = DurableAgentStateResponse.from_run_response(correlation_id, agent_run_response) self.state.data.conversation_history.append(state_response) From cc504c5625390b08c6622015dc166a47defe95cb Mon Sep 17 00:00:00 2001 From: Laveesh Rohra Date: Thu, 20 Nov 2025 11:18:07 -0800 Subject: [PATCH 06/11] Addressed comments --- .../agent_framework_azurefunctions/_entities.py | 5 +++-- .../agent_framework_azurefunctions/_orchestration.py | 7 +++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/python/packages/azurefunctions/agent_framework_azurefunctions/_entities.py b/python/packages/azurefunctions/agent_framework_azurefunctions/_entities.py index 39df766ec5..34ec0f1aab 100644 --- a/python/packages/azurefunctions/agent_framework_azurefunctions/_entities.py +++ b/python/packages/azurefunctions/agent_framework_azurefunctions/_entities.py @@ -391,8 +391,9 @@ async def _entity_coroutine(context: df.DurableEntityContext) -> None: logger.error("[entity_function] Unknown operation: %s", operation) context.set_result({"error": f"Unknown operation: {operation}"}) - logger.debug("State dict: %s", entity.state.to_dict()) - context.set_state(entity.state.to_dict()) + serialized_state = entity.state.to_dict() + logger.debug("State dict: %s", serialized_state) + context.set_state(serialized_state) logger.info(f"[entity_function] Operation {operation} completed successfully") except Exception as exc: diff --git a/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py b/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py index 0aa6434b6c..9ba3f4efe4 100644 --- a/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py +++ b/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py @@ -110,10 +110,10 @@ def run( # type: ignore[override] messages: The message(s) to send to the agent thread: Optional agent thread for conversation context response_format: Optional Pydantic model for response parsing - **kwargs: Additional arguments (enable_tool_calls, response_format, etc.) + **kwargs: Additional arguments (enable_tool_calls) Returns: - Task that will resolve to the agent response + Yield a task that will resolve to the agent response Example: @app.orchestration_trigger(context_name="context") @@ -167,9 +167,8 @@ def my_orchestration(context): result = yield self.context.call_entity(entity_id, "run_agent", run_request.to_dict()) logger.debug( - "[DurableAIAgent] Entity call completed for correlation_id %s; raw result type: %s", + "[DurableAIAgent] Entity call completed for correlation_id %s", correlation_id, - type(result), ) response = self._load_agent_response(result) From 62a79deea16b770b16052b580d2cbfa83b27399f Mon Sep 17 00:00:00 2001 From: Laveesh Rohra Date: Mon, 24 Nov 2025 09:53:21 -0800 Subject: [PATCH 07/11] Code cleanup --- .../agent_framework_azurefunctions/_entities.py | 10 ++-------- .../agent_framework_azurefunctions/_orchestration.py | 10 +--------- 2 files changed, 3 insertions(+), 17 deletions(-) diff --git a/python/packages/azurefunctions/agent_framework_azurefunctions/_entities.py b/python/packages/azurefunctions/agent_framework_azurefunctions/_entities.py index 34ec0f1aab..45872ce1a1 100644 --- a/python/packages/azurefunctions/agent_framework_azurefunctions/_entities.py +++ b/python/packages/azurefunctions/agent_framework_azurefunctions/_entities.py @@ -172,10 +172,7 @@ async def run_agent( return agent_run_response except Exception as exc: - import traceback - - error_traceback = traceback.format_exc() - logger.error(f"[AgentEntity.run_agent] Agent execution failed. Full traceback:\n{error_traceback}") + logger.exception("[AgentEntity.run_agent] Agent execution failed.") # Create error message error_message = ChatMessage( @@ -397,10 +394,7 @@ async def _entity_coroutine(context: df.DurableEntityContext) -> None: logger.info(f"[entity_function] Operation {operation} completed successfully") except Exception as exc: - import traceback - - logger.error("[entity_function] Error in entity: %s", exc) - logger.error(f"[entity_function] Traceback:\n{traceback.format_exc()}") + logger.exception("[entity_function] Error executing entity operation %s", exc) context.set_result({"error": str(exc), "status": "error"}) def entity_function(context: df.DurableEntityContext) -> None: diff --git a/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py b/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py index 9ba3f4efe4..b4e4910ef8 100644 --- a/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py +++ b/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py @@ -123,7 +123,6 @@ def my_orchestration(context): result = yield from agent.run("Hello", thread=thread) """ message_str = self._normalize_messages(messages) - logger.debug(f"[DurableAIAgent] Running agent '{self.agent_name}' with message: {message_str[:100]}...") # Extract optional parameters from kwargs enable_tool_calls = kwargs.get("enable_tool_calls", True) @@ -201,19 +200,12 @@ def _ensure_response_format( ) -> None: """Ensure the AgentRunResponse value is parsed into the expected response_format.""" if response_format is not None and not isinstance(response.value, response_format): - logger.debug( - "[DurableAIAgent] Response value type %s does not match expected %s for correlation_id %s", - type(response.value), - response_format, - correlation_id, - ) - response.try_parse_value(response_format) logger.debug( "[DurableAIAgent] Loaded AgentRunResponse.value for correlation_id %s with type: %s", correlation_id, - type(response.value), + type(response.value).__name__, ) def run_stream( From af398910142e42e3e789308a754d8a6275bc5564 Mon Sep 17 00:00:00 2001 From: Laveesh Rohra Date: Tue, 25 Nov 2025 13:20:50 -0800 Subject: [PATCH 08/11] Use AgentTask vs Generator --- .../_durable_agent_state.py | 4 +- .../agent_framework_azurefunctions/_models.py | 4 +- .../_orchestration.py | 199 +++++++++++++----- .../tests/test_orchestration.py | 125 +++++------ .../function_app.py | 4 +- .../function_app.py | 41 +--- .../function_app.py | 4 +- .../function_app.py | 4 +- 8 files changed, 230 insertions(+), 155 deletions(-) diff --git a/python/packages/azurefunctions/agent_framework_azurefunctions/_durable_agent_state.py b/python/packages/azurefunctions/agent_framework_azurefunctions/_durable_agent_state.py index 73695e61f2..0d9166373f 100644 --- a/python/packages/azurefunctions/agent_framework_azurefunctions/_durable_agent_state.py +++ b/python/packages/azurefunctions/agent_framework_azurefunctions/_durable_agent_state.py @@ -53,7 +53,7 @@ ) from dateutil import parser as date_parser -from ._models import RunRequest, _serialize_response_format +from ._models import RunRequest, serialize_response_format logger = get_logger("agent_framework.azurefunctions.durable_agent_state") @@ -494,7 +494,7 @@ def from_run_request(request: RunRequest) -> DurableAgentStateRequest: messages=[DurableAgentStateMessage.from_run_request(request)], created_at=datetime.now(tz=timezone.utc), response_type=request.request_response_format, - response_schema=_serialize_response_format(request.response_format), + response_schema=serialize_response_format(request.response_format), ) diff --git a/python/packages/azurefunctions/agent_framework_azurefunctions/_models.py b/python/packages/azurefunctions/agent_framework_azurefunctions/_models.py index 7978d71dd2..e9ed6f7cad 100644 --- a/python/packages/azurefunctions/agent_framework_azurefunctions/_models.py +++ b/python/packages/azurefunctions/agent_framework_azurefunctions/_models.py @@ -213,7 +213,7 @@ async def deserialize( return thread -def _serialize_response_format(response_format: type[BaseModel] | None) -> Any: +def serialize_response_format(response_format: type[BaseModel] | None) -> Any: """Serialize response format for transport across durable function boundaries.""" if response_format is None: return None @@ -339,7 +339,7 @@ def to_dict(self) -> dict[str, Any]: "request_response_format": self.request_response_format, } if self.response_format: - result["response_format"] = _serialize_response_format(self.response_format) + result["response_format"] = serialize_response_format(self.response_format) if self.thread_id: result["thread_id"] = self.thread_id if self.correlation_id: diff --git a/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py b/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py index b4e4910ef8..7017016fbc 100644 --- a/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py +++ b/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py @@ -6,7 +6,7 @@ """ import uuid -from collections.abc import AsyncIterator, Generator +from collections.abc import AsyncIterator, Callable from typing import TYPE_CHECKING, Any, TypeAlias, cast from agent_framework import ( @@ -17,18 +17,142 @@ ChatMessage, get_logger, ) +from azure.durable_functions.models import TaskBase +from azure.durable_functions.models.Task import CompoundTask, TaskState from pydantic import BaseModel from ._models import AgentSessionId, DurableAgentThread, RunRequest logger = get_logger("agent_framework.azurefunctions.orchestration") +CompoundActionConstructor: TypeAlias = Callable[[list[Any]], Any] | None + if TYPE_CHECKING: from azure.durable_functions import DurableOrchestrationContext + class _TypedCompoundTask(CompoundTask): + def __init__( + self, + tasks: list[TaskBase], + compound_action_constructor: CompoundActionConstructor = None, + ) -> None: ... + AgentOrchestrationContextType: TypeAlias = DurableOrchestrationContext else: AgentOrchestrationContextType = Any + _TypedCompoundTask = CompoundTask + + +class AgentTask(_TypedCompoundTask): + """A custom Task that wraps entity calls and provides typed AgentRunResponse results. + + This task wraps the underlying entity call task and intercepts its completion + to convert the raw result into a typed AgentRunResponse object. + """ + + def __init__( + self, + entity_task: TaskBase, + response_format: type[BaseModel] | None, + agent: "DurableAIAgent", + correlation_id: str, + ): + """Initialize the AgentTask. + + Args: + entity_task: The underlying entity call task + response_format: Optional Pydantic model for response parsing + agent: Reference to the DurableAIAgent for response conversion + correlation_id: Correlation ID for logging + """ + super().__init__([entity_task]) + self._response_format = response_format + self._agent = agent + self._correlation_id = correlation_id + + # Override action_repr to expose the inner task's action directly + # This ensures compatibility with ReplaySchema V3 which expects Action objects. + self.action_repr = entity_task.action_repr + + # Also copy the task ID to match the entity task's identity + self.id = entity_task.id + + def try_set_value(self, child: TaskBase): + """Intercept task completion and convert raw result to typed AgentRunResponse. + + This method delegates to the parent WhenAllTask behavior but intercepts + successful completion to transform the result. + + Parameters + ---------- + child : TaskBase + The entity call task that just completed + """ + if child.state is TaskState.SUCCEEDED: + # Delegate to parent class for standard completion logic + if len(self.pending_tasks) == 0: + # Transform the raw result before setting it + raw_result = child.result + logger.debug( + "[AgentTask] Converting raw result for correlation_id %s", + self._correlation_id, + ) + + try: + response = self._load_agent_response(raw_result) + + if self._response_format is not None: + self._ensure_response_format( + self._response_format, + self._correlation_id, + response, + ) + + # Set the typed AgentRunResponse as this task's result + self.set_value(is_error=False, value=response) + except Exception as e: + logger.error( + "[AgentTask] Failed to convert result for correlation_id %s: %s", + self._correlation_id, + e, + ) + self.set_value(is_error=True, value=e) + else: # child.state is TaskState.FAILED + # Delegate error handling to parent class + if self._first_error is None: + self._first_error = child.result + self.set_value(is_error=True, value=self._first_error) + + def _load_agent_response(self, agent_response: AgentRunResponse | dict[str, Any] | None) -> AgentRunResponse: + """Convert raw payloads into AgentRunResponse instance.""" + if agent_response is None: + raise ValueError("agent_response cannot be None") + + logger.debug(f"[load_agent_response] Loading agent response of type: {type(agent_response)}") + + if isinstance(agent_response, AgentRunResponse): + return agent_response + if isinstance(agent_response, dict): + logger.debug("[load_agent_response] Converting dict payload using AgentRunResponse.from_dict") + return AgentRunResponse.from_dict(agent_response) + + raise TypeError(f"Unsupported type for agent_response: {type(agent_response)}") + + def _ensure_response_format( + self, + response_format: type[BaseModel] | None, + correlation_id: str, + response: AgentRunResponse, + ) -> None: + """Ensure the AgentRunResponse value is parsed into the expected response_format.""" + if response_format is not None and not isinstance(response.value, response_format): + response.try_parse_value(response_format) + + logger.debug( + "[DurableAIAgent] Loaded AgentRunResponse.value for correlation_id %s with type: %s", + correlation_id, + type(response.value).__name__, + ) class DurableAIAgent(AgentProtocol): @@ -89,10 +213,10 @@ def description(self) -> str | None: """Get the description of the agent.""" return self._description - # We return a Generator[Any, Any, ...] here because Durable Functions orchestrations - # require yielding Tasks, and the run method must return a Task that can be yielded. + # We return an AgentTask here which is a TaskBase subclass. # This is an intentional deviation from AgentProtocol which defines run() as async. - # The Generator type is correct for Durable Functions orchestrations. + # The AgentTask can be yielded in Durable Functions orchestrations and will provide + # a typed AgentRunResponse result. def run( # type: ignore[override] self, messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, @@ -100,11 +224,12 @@ def run( # type: ignore[override] thread: AgentThread | None = None, response_format: type[BaseModel] | None = None, **kwargs: Any, - ) -> Generator[Any, Any, AgentRunResponse]: - """Execute the agent with messages and return a Task for orchestrations. + ) -> AgentTask: + """Execute the agent with messages and return an AgentTask for orchestrations. - This method implements AgentProtocol and returns a Task that can be yielded - in Durable Functions orchestrations. + This method implements AgentProtocol and returns an AgentTask (subclass of TaskBase) + that can be yielded in Durable Functions orchestrations. The task's result will be + a typed AgentRunResponse. Args: messages: The message(s) to send to the agent @@ -113,14 +238,15 @@ def run( # type: ignore[override] **kwargs: Additional arguments (enable_tool_calls) Returns: - Yield a task that will resolve to the agent response + An AgentTask that resolves to an AgentRunResponse when yielded Example: @app.orchestration_trigger(context_name="context") def my_orchestration(context): agent = app.get_agent(context, "MyAgent") thread = agent.get_new_thread() - result = yield from agent.run("Hello", thread=thread) + response = yield agent.run("Hello", thread=thread) + # response is typed as AgentRunResponse """ message_str = self._normalize_messages(messages) @@ -161,52 +287,23 @@ def my_orchestration(context): logger.debug(f"[DurableAIAgent] Calling entity {entity_id} with message: {message_str[:100]}...") - # Call the entity and return the Task directly - # The orchestration will yield this Task - result = yield self.context.call_entity(entity_id, "run_agent", run_request.to_dict()) + # Call the entity to get the underlying task + entity_task = self.context.call_entity(entity_id, "run_agent", run_request.to_dict()) + + # Wrap it in an AgentTask that will convert the result to AgentRunResponse + agent_task = AgentTask( + entity_task=entity_task, + response_format=response_format, + agent=self, + correlation_id=correlation_id, + ) logger.debug( - "[DurableAIAgent] Entity call completed for correlation_id %s", + "[DurableAIAgent] Created AgentTask for correlation_id %s", correlation_id, ) - response = self._load_agent_response(result) - - if response_format is not None: - self._ensure_response_format(response_format, correlation_id, response) - - return response - - def _load_agent_response(self, agent_response: AgentRunResponse | dict[str, Any] | None) -> AgentRunResponse: - """Convert raw payloads into AgentRunResponse instance.""" - if agent_response is None: - raise ValueError("agent_response cannot be None") - - logger.debug(f"[load_agent_response] Loading agent response of type: {type(agent_response)}") - - if isinstance(agent_response, AgentRunResponse): - return agent_response - if isinstance(agent_response, dict): - logger.debug("[load_agent_response] Converting dict payload using AgentRunResponse.from_dict") - return AgentRunResponse.from_dict(agent_response) - - raise TypeError(f"Unsupported type for agent_response: {type(agent_response)}") - - def _ensure_response_format( - self, - response_format: type[BaseModel] | None, - correlation_id: str, - response: AgentRunResponse, - ) -> None: - """Ensure the AgentRunResponse value is parsed into the expected response_format.""" - if response_format is not None and not isinstance(response.value, response_format): - response.try_parse_value(response_format) - - logger.debug( - "[DurableAIAgent] Loaded AgentRunResponse.value for correlation_id %s with type: %s", - correlation_id, - type(response.value).__name__, - ) + return agent_task def run_stream( self, diff --git a/python/packages/azurefunctions/tests/test_orchestration.py b/python/packages/azurefunctions/tests/test_orchestration.py index 60a9f895fa..d7d3d2fbff 100644 --- a/python/packages/azurefunctions/tests/test_orchestration.py +++ b/python/packages/azurefunctions/tests/test_orchestration.py @@ -7,9 +7,11 @@ import pytest from agent_framework import AgentRunResponse, AgentThread, ChatMessage +from azure.durable_functions.models.Task import TaskBase, TaskState from agent_framework_azurefunctions import AgentFunctionApp, DurableAIAgent from agent_framework_azurefunctions._models import AgentSessionId, DurableAgentThread +from agent_framework_azurefunctions._orchestration import AgentTask def _app_with_registered_agents(*agent_names: str) -> AgentFunctionApp: @@ -21,31 +23,48 @@ def _app_with_registered_agents(*agent_names: str) -> AgentFunctionApp: return app +class _FakeTask(TaskBase): + """Concrete TaskBase for testing AgentTask wiring.""" + + def __init__(self, task_id: int = 1): + super().__init__(task_id, []) + self._set_is_scheduled(False) + self.action_repr = [] + self.state = TaskState.RUNNING + + +def _create_entity_task(task_id: int = 1) -> TaskBase: + """Create a minimal TaskBase instance for AgentTask tests.""" + return _FakeTask(task_id) + + class TestAgentResponseHelpers: """Tests for helper utilities that prepare AgentRunResponse values.""" @staticmethod - def _create_agent() -> DurableAIAgent: + def _create_agent_task() -> AgentTask: mock_context = Mock() mock_context.instance_id = "test-instance" mock_context.new_uuid = Mock(return_value="helper-guid") - return DurableAIAgent(mock_context, "HelperAgent") + agent = DurableAIAgent(mock_context, "HelperAgent") + entity_task = _create_entity_task() + return AgentTask(entity_task, None, agent, "correlation-id") def test_load_agent_response_from_instance(self) -> None: - agent = self._create_agent() + task = self._create_agent_task() response = AgentRunResponse(messages=[ChatMessage(role="assistant", text='{"foo": "bar"}')]) - loaded = agent._load_agent_response(response) + loaded = task._load_agent_response(response) assert loaded is response assert loaded.value is None def test_load_agent_response_from_serialized(self) -> None: - agent = self._create_agent() + task = self._create_agent_task() serialized = AgentRunResponse(messages=[ChatMessage(role="assistant", text="structured")]).to_dict() serialized["value"] = {"answer": 42} - loaded = agent._load_agent_response(serialized) + loaded = task._load_agent_response(serialized) assert loaded is not None assert loaded.value == {"answer": 42} @@ -53,10 +72,10 @@ def test_load_agent_response_from_serialized(self) -> None: assert loaded_dict["type"] == "agent_run_response" def test_load_agent_response_rejects_none(self) -> None: - agent = self._create_agent() + task = self._create_agent_task() with pytest.raises(ValueError): - agent._load_agent_response(None) + task._load_agent_response(None) class TestDurableAIAgent: @@ -149,23 +168,19 @@ def test_run_creates_entity_call(self) -> None: mock_context.instance_id = "test-instance-001" mock_context.new_uuid = Mock(side_effect=["thread-guid", "correlation-guid"]) - # Mock call_entity to return a Task-like object - mock_task = Mock() - mock_task._is_scheduled = False # Task attribute that orchestration checks - - mock_context.call_entity = Mock(return_value=mock_task) + entity_task = _create_entity_task() + mock_context.call_entity = Mock(return_value=entity_task) agent = DurableAIAgent(mock_context, "TestAgent") # Create thread thread = agent.get_new_thread() - # Call run() - advance the generator to get the Task durable functions will yield - run_gen = agent.run(messages="Test message", thread=thread, enable_tool_calls=True) - task = next(run_gen) + # Call run() - returns AgentTask directly + task = agent.run(messages="Test message", thread=thread, enable_tool_calls=True) - # Verify run() yields the Task from call_entity - assert task == mock_task + assert isinstance(task, AgentTask) + assert task.children[0] == entity_task # Verify call_entity was called with correct parameters assert mock_context.call_entity.called @@ -184,20 +199,18 @@ def test_run_without_thread(self) -> None: """Test that run() works without explicit thread (creates unique session key).""" mock_context = Mock() mock_context.instance_id = "test-instance-002" - # Two calls to new_uuid: one for session_key, one for correlationId mock_context.new_uuid = Mock(side_effect=["auto-generated-guid", "correlation-guid"]) - mock_task = Mock() - mock_task._is_scheduled = False - mock_context.call_entity = Mock(return_value=mock_task) + entity_task = _create_entity_task() + mock_context.call_entity = Mock(return_value=entity_task) agent = DurableAIAgent(mock_context, "TestAgent") # Call without thread - run_gen = agent.run(messages="Test message") - task = next(run_gen) + task = agent.run(messages="Test message") - assert task == mock_task + assert isinstance(task, AgentTask) + assert task.children[0] == entity_task # Verify the entity ID uses the auto-generated GUID with dafx- prefix call_args = mock_context.call_entity.call_args @@ -212,9 +225,8 @@ def test_run_with_response_format(self) -> None: mock_context = Mock() mock_context.instance_id = "test-instance-003" - mock_task = Mock() - mock_task._is_scheduled = False - mock_context.call_entity = Mock(return_value=mock_task) + entity_task = _create_entity_task() + mock_context.call_entity = Mock(return_value=entity_task) agent = DurableAIAgent(mock_context, "TestAgent") @@ -226,10 +238,10 @@ class SampleSchema(BaseModel): # Create thread and call thread = agent.get_new_thread() - run_gen = agent.run(messages="Test message", thread=thread, response_format=SampleSchema) - task = next(run_gen) + task = agent.run(messages="Test message", thread=thread, response_format=SampleSchema) - assert task == mock_task + assert isinstance(task, AgentTask) + assert task.children[0] == entity_task # Verify schema was passed in the call_entity arguments call_args = mock_context.call_entity.call_args @@ -262,18 +274,18 @@ def test_run_with_chat_message(self) -> None: mock_context = Mock() mock_context.new_uuid = Mock(side_effect=["thread-guid", "correlation-guid"]) - mock_task = Mock() - mock_context.call_entity = Mock(return_value=mock_task) + entity_task = _create_entity_task() + mock_context.call_entity = Mock(return_value=entity_task) agent = DurableAIAgent(mock_context, "TestAgent") thread = agent.get_new_thread() # Call with ChatMessage msg = ChatMessage(role="user", text="Hello") - run_gen = agent.run(messages=msg, thread=thread) - task = next(run_gen) + task = agent.run(messages=msg, thread=thread) - assert task == mock_task + assert isinstance(task, AgentTask) + assert task.children[0] == entity_task # Verify message was converted to string call_args = mock_context.call_entity.call_args @@ -297,14 +309,13 @@ def test_entity_id_format(self) -> None: mock_context = Mock() mock_context.new_uuid = Mock(return_value="test-guid-789") - mock_context.call_entity = Mock(return_value=Mock()) + mock_context.call_entity = Mock(return_value=_create_entity_task()) agent = DurableAIAgent(mock_context, "WriterAgent") thread = agent.get_new_thread() # Call run() to trigger entity ID creation - run_gen = agent.run("Test", thread=thread) - next(run_gen) + agent.run("Test", thread=thread) # Verify call_entity was called with correct EntityId call_args = mock_context.call_entity.call_args @@ -357,13 +368,9 @@ def test_sequential_agent_calls_simulation(self) -> None: # Track entity calls entity_calls: list[dict[str, Any]] = [] - def mock_call_entity_side_effect(entity_id: Any, operation: str, input_data: dict[str, Any]) -> Mock: + def mock_call_entity_side_effect(entity_id: Any, operation: str, input_data: dict[str, Any]) -> TaskBase: entity_calls.append({"entity_id": str(entity_id), "operation": operation, "input": input_data}) - - # Return a mock Task - mock_task = Mock() - mock_task._is_scheduled = False - return mock_task + return _create_entity_task() mock_context.call_entity = Mock(side_effect=mock_call_entity_side_effect) @@ -373,13 +380,13 @@ def mock_call_entity_side_effect(entity_id: Any, operation: str, input_data: dic # Create thread thread = agent.get_new_thread() - # First call - returns Task when generator is advanced - task1 = next(agent.run("Write something", thread=thread)) - assert hasattr(task1, "_is_scheduled") + # First call - returns AgentTask + task1 = agent.run("Write something", thread=thread) + assert isinstance(task1, AgentTask) - # Second call - returns Task - task2 = next(agent.run("Improve: something", thread=thread)) - assert hasattr(task2, "_is_scheduled") + # Second call - returns AgentTask + task2 = agent.run("Improve: something", thread=thread) + assert isinstance(task2, AgentTask) # Verify both calls used the same entity (same session key) assert len(entity_calls) == 2 @@ -399,11 +406,9 @@ def test_multiple_agents_in_orchestration(self) -> None: entity_calls: list[str] = [] - def mock_call_entity_side_effect(entity_id: Any, operation: str, input_data: dict[str, Any]) -> Mock: + def mock_call_entity_side_effect(entity_id: Any, operation: str, input_data: dict[str, Any]) -> TaskBase: entity_calls.append(str(entity_id)) - mock_task = Mock() - mock_task._is_scheduled = False - return mock_task + return _create_entity_task() mock_context.call_entity = Mock(side_effect=mock_call_entity_side_effect) @@ -414,12 +419,12 @@ def mock_call_entity_side_effect(entity_id: Any, operation: str, input_data: dic writer_thread = writer.get_new_thread() editor_thread = editor.get_new_thread() - # Call both agents - returns Tasks - writer_task = next(writer.run("Write", thread=writer_thread)) - editor_task = next(editor.run("Edit", thread=editor_thread)) + # Call both agents - returns AgentTasks + writer_task = writer.run("Write", thread=writer_thread) + editor_task = editor.run("Edit", thread=editor_thread) - assert hasattr(writer_task, "_is_scheduled") - assert hasattr(editor_task, "_is_scheduled") + assert isinstance(writer_task, AgentTask) + assert isinstance(editor_task, AgentTask) # Verify different entity IDs were used assert len(entity_calls) == 2 diff --git a/python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/function_app.py b/python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/function_app.py index a9c181e99f..cc05a323f3 100644 --- a/python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/function_app.py +++ b/python/samples/getting_started/azure_functions/04_single_agent_orchestration_chaining/function_app.py @@ -50,7 +50,7 @@ def single_agent_orchestration(context: DurableOrchestrationContext): writer = app.get_agent(context, WRITER_AGENT_NAME) writer_thread = writer.get_new_thread() - initial = yield from writer.run( + initial = yield writer.run( messages="Write a concise inspirational sentence about learning.", thread=writer_thread, ) @@ -60,7 +60,7 @@ def single_agent_orchestration(context: DurableOrchestrationContext): f"{initial.text}" ) - refined = yield from writer.run( + refined = yield writer.run( messages=improved_prompt, thread=writer_thread, ) diff --git a/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/function_app.py b/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/function_app.py index 620660c09c..66743f210c 100644 --- a/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/function_app.py +++ b/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/function_app.py @@ -10,8 +10,9 @@ import json import logging -from typing import Any +from typing import Any, cast +from agent_framework import AgentRunResponse import azure.functions as func from agent_framework.azure import AgentFunctionApp, AzureOpenAIChatClient from azure.durable_functions import DurableOrchestrationClient, DurableOrchestrationContext @@ -51,15 +52,7 @@ def _create_agents() -> list[Any]: # 4. Durable Functions orchestration that runs both agents in parallel. @app.orchestration_trigger(context_name="context") def multi_agent_concurrent_orchestration(context: DurableOrchestrationContext): - """Fan out to two domain-specific agents and aggregate their responses. - - Note: The generator protocol lets us extract the Durable Task objects so we - can pass them to task_all for true parallelism. If you only use `yield - from` on each run call, the agents execute sequentially. - For sequential execution, you can simply use: - physicist_result = yield from physicist.run(messages=prompt, thread=physicist_thread) - chemist_result = yield from chemist.run(messages=prompt, thread=chemist_thread) - """ + """Fan out to two domain-specific agents and aggregate their responses.""" prompt = context.get_input() if not prompt or not str(prompt).strip(): @@ -75,30 +68,10 @@ def multi_agent_concurrent_orchestration(context: DurableOrchestrationContext): physicist_gen = physicist.run(messages=str(prompt), thread=physicist_thread) chemist_gen = chemist.run(messages=str(prompt), thread=chemist_thread) - # Advance each generator to extract the underlying Durable Task objects - physicist_task = next(physicist_gen) - chemist_task = next(chemist_gen) - - # Execute both tasks concurrently using task_all - task_results = yield context.task_all([physicist_task, chemist_task]) - - # Complete the generator protocol by sending results back - # Each generator returns its final value via StopIteration.value - physicist_result = None - chemist_result = None - - try: - physicist_gen.send(task_results[0]) - except StopIteration as e: - physicist_result = e.value - - try: - chemist_gen.send(task_results[1]) - except StopIteration as e: - chemist_result = e.value - - if physicist_result is None or chemist_result is None: - raise ValueError("Failed to get results from one or both agents") + task_results = yield context.task_all([physicist_gen, chemist_gen]) + + physicist_result = cast(AgentRunResponse, task_results[0]) + chemist_result = cast(AgentRunResponse, task_results[1]) return { "physicist": physicist_result.text, diff --git a/python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/function_app.py b/python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/function_app.py index 6f224dbf9f..2ee445d423 100644 --- a/python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/function_app.py +++ b/python/samples/getting_started/azure_functions/06_multi_agent_orchestration_conditionals/function_app.py @@ -96,7 +96,7 @@ def spam_detection_orchestration(context: DurableOrchestrationContext): f"Content: {payload.email_content}" ) - spam_result_raw = yield from spam_agent.run( + spam_result_raw = yield spam_agent.run( messages=spam_prompt, thread=spam_thread, response_format=SpamDetectionResult, @@ -117,7 +117,7 @@ def spam_detection_orchestration(context: DurableOrchestrationContext): f"Content: {payload.email_content}" ) - email_result_raw = yield from email_agent.run( + email_result_raw = yield email_agent.run( messages=email_prompt, thread=email_thread, response_format=EmailResponse, diff --git a/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/function_app.py b/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/function_app.py index b2894c9b4c..c8e2bbaa9c 100644 --- a/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/function_app.py +++ b/python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/function_app.py @@ -95,7 +95,7 @@ def content_generation_hitl_orchestration(context: DurableOrchestrationContext): context.set_custom_status(f"Starting content generation for topic: {payload.topic}") - initial_raw = yield from writer.run( + initial_raw = yield writer.run( messages=f"Write a short article about '{payload.topic}'.", thread=writer_thread, response_format=GeneratedContent, @@ -142,7 +142,7 @@ def content_generation_hitl_orchestration(context: DurableOrchestrationContext): "The content was rejected by a human reviewer. Please rewrite the article incorporating their feedback.\n\n" f"Human Feedback: {approval_payload.feedback or 'No feedback provided.'}" ) - rewritten_raw = yield from writer.run( + rewritten_raw = yield writer.run( messages=rewrite_prompt, thread=writer_thread, response_format=GeneratedContent, From 867ff67d91121d6324f9cabc06681788da8838a1 Mon Sep 17 00:00:00 2001 From: Laveesh Rohra Date: Tue, 25 Nov 2025 15:51:25 -0800 Subject: [PATCH 09/11] Address comments --- .../_orchestration.py | 6 +- .../tests/test_orchestration.py | 118 +++++++++++++++++- .../function_app.py | 9 +- 3 files changed, 119 insertions(+), 14 deletions(-) diff --git a/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py b/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py index 7017016fbc..19bf6c9e9a 100644 --- a/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py +++ b/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py @@ -54,7 +54,6 @@ def __init__( self, entity_task: TaskBase, response_format: type[BaseModel] | None, - agent: "DurableAIAgent", correlation_id: str, ): """Initialize the AgentTask. @@ -62,12 +61,10 @@ def __init__( Args: entity_task: The underlying entity call task response_format: Optional Pydantic model for response parsing - agent: Reference to the DurableAIAgent for response conversion correlation_id: Correlation ID for logging """ super().__init__([entity_task]) self._response_format = response_format - self._agent = agent self._correlation_id = correlation_id # Override action_repr to expose the inner task's action directly @@ -77,7 +74,7 @@ def __init__( # Also copy the task ID to match the entity task's identity self.id = entity_task.id - def try_set_value(self, child: TaskBase): + def try_set_value(self, child: TaskBase) -> None: """Intercept task completion and convert raw result to typed AgentRunResponse. This method delegates to the parent WhenAllTask behavior but intercepts @@ -294,7 +291,6 @@ def my_orchestration(context): agent_task = AgentTask( entity_task=entity_task, response_format=response_format, - agent=self, correlation_id=correlation_id, ) diff --git a/python/packages/azurefunctions/tests/test_orchestration.py b/python/packages/azurefunctions/tests/test_orchestration.py index d7d3d2fbff..c65724c160 100644 --- a/python/packages/azurefunctions/tests/test_orchestration.py +++ b/python/packages/azurefunctions/tests/test_orchestration.py @@ -43,12 +43,8 @@ class TestAgentResponseHelpers: @staticmethod def _create_agent_task() -> AgentTask: - mock_context = Mock() - mock_context.instance_id = "test-instance" - mock_context.new_uuid = Mock(return_value="helper-guid") - agent = DurableAIAgent(mock_context, "HelperAgent") entity_task = _create_entity_task() - return AgentTask(entity_task, None, agent, "correlation-id") + return AgentTask(entity_task, None, "correlation-id") def test_load_agent_response_from_instance(self) -> None: task = self._create_agent_task() @@ -77,6 +73,118 @@ def test_load_agent_response_rejects_none(self) -> None: with pytest.raises(ValueError): task._load_agent_response(None) + def test_load_agent_response_rejects_unsupported_type(self) -> None: + task = self._create_agent_task() + + with pytest.raises(TypeError, match="Unsupported type"): + task._load_agent_response(["invalid", "list"]) # type: ignore[arg-type] + + def test_try_set_value_success(self) -> None: + """Test try_set_value correctly processes successful task completion.""" + entity_task = _create_entity_task() + task = AgentTask(entity_task, None, "correlation-id") + + # Simulate successful entity task completion + entity_task.state = TaskState.SUCCEEDED + entity_task.result = AgentRunResponse(messages=[ChatMessage(role="assistant", text="Test response")]).to_dict() + + # Clear pending_tasks to simulate that parent has processed the child + task.pending_tasks.clear() + + # Call try_set_value + task.try_set_value(entity_task) + + # Verify task completed successfully with AgentRunResponse + assert task.state == TaskState.SUCCEEDED + assert isinstance(task.result, AgentRunResponse) + assert task.result.text == "Test response" + + def test_try_set_value_failure(self) -> None: + """Test try_set_value correctly handles failed task completion.""" + entity_task = _create_entity_task() + task = AgentTask(entity_task, None, "correlation-id") + + # Simulate failed entity task + entity_task.state = TaskState.FAILED + entity_task.result = Exception("Entity call failed") + + # Call try_set_value + task.try_set_value(entity_task) + + # Verify task failed with the error + assert task.state == TaskState.FAILED + assert isinstance(task.result, Exception) + assert str(task.result) == "Entity call failed" + + def test_try_set_value_with_response_format(self) -> None: + """Test try_set_value parses structured output when response_format is provided.""" + from pydantic import BaseModel + + class TestSchema(BaseModel): + answer: str + + entity_task = _create_entity_task() + task = AgentTask(entity_task, TestSchema, "correlation-id") + + # Simulate successful entity task with JSON response + entity_task.state = TaskState.SUCCEEDED + entity_task.result = AgentRunResponse( + messages=[ChatMessage(role="assistant", text='{"answer": "42"}')] + ).to_dict() + + # Clear pending_tasks to simulate that parent has processed the child + task.pending_tasks.clear() + + # Call try_set_value + task.try_set_value(entity_task) + + # Verify task completed and value was parsed + assert task.state == TaskState.SUCCEEDED + assert isinstance(task.result, AgentRunResponse) + assert isinstance(task.result.value, TestSchema) + assert task.result.value.answer == "42" + + def test_ensure_response_format_parses_value(self) -> None: + """Test _ensure_response_format correctly parses response value.""" + from pydantic import BaseModel + + class SampleSchema(BaseModel): + name: str + + task = self._create_agent_task() + response = AgentRunResponse(messages=[ChatMessage(role="assistant", text='{"name": "test"}')]) + + # Value should be None initially + assert response.value is None + + # Parse the value + task._ensure_response_format(SampleSchema, "test-correlation", response) + + # Value should now be parsed + assert isinstance(response.value, SampleSchema) + assert response.value.name == "test" + + def test_ensure_response_format_skips_if_already_parsed(self) -> None: + """Test _ensure_response_format does not re-parse if value already matches format.""" + from pydantic import BaseModel + + class SampleSchema(BaseModel): + name: str + + task = self._create_agent_task() + existing_value = SampleSchema(name="existing") + response = AgentRunResponse( + messages=[ChatMessage(role="assistant", text='{"name": "new"}')], + value=existing_value, + ) + + # Call _ensure_response_format + task._ensure_response_format(SampleSchema, "test-correlation", response) + + # Value should remain unchanged (not re-parsed) + assert response.value is existing_value + assert response.value.name == "existing" + class TestDurableAIAgent: """Test suite for DurableAIAgent wrapper.""" diff --git a/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/function_app.py b/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/function_app.py index 66743f210c..69ea8816b2 100644 --- a/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/function_app.py +++ b/python/samples/getting_started/azure_functions/05_multi_agent_orchestration_concurrency/function_app.py @@ -64,11 +64,12 @@ def multi_agent_concurrent_orchestration(context: DurableOrchestrationContext): physicist_thread = physicist.get_new_thread() chemist_thread = chemist.get_new_thread() - # Create generators from agent.run() calls - physicist_gen = physicist.run(messages=str(prompt), thread=physicist_thread) - chemist_gen = chemist.run(messages=str(prompt), thread=chemist_thread) + # Create tasks from agent.run() calls + physicist_task = physicist.run(messages=str(prompt), thread=physicist_thread) + chemist_task = chemist.run(messages=str(prompt), thread=chemist_thread) - task_results = yield context.task_all([physicist_gen, chemist_gen]) + # Execute both tasks concurrently using task_all + task_results = yield context.task_all([physicist_task, chemist_task]) physicist_result = cast(AgentRunResponse, task_results[0]) chemist_result = cast(AgentRunResponse, task_results[1]) From 09a639ea6435e37dda7e680798b19ccf1abe2654 Mon Sep 17 00:00:00 2001 From: Laveesh Rohra Date: Tue, 25 Nov 2025 17:09:50 -0800 Subject: [PATCH 10/11] use lazy logging --- .../_orchestration.py | 24 ++++++++----------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py b/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py index 19bf6c9e9a..bcb4c35e59 100644 --- a/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py +++ b/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py @@ -75,10 +75,7 @@ def __init__( self.id = entity_task.id def try_set_value(self, child: TaskBase) -> None: - """Intercept task completion and convert raw result to typed AgentRunResponse. - - This method delegates to the parent WhenAllTask behavior but intercepts - successful completion to transform the result. + """Transition the AgentTask to a terminal state and set its value to `AgentRunResponse`. Parameters ---------- @@ -108,14 +105,13 @@ def try_set_value(self, child: TaskBase) -> None: # Set the typed AgentRunResponse as this task's result self.set_value(is_error=False, value=response) except Exception as e: - logger.error( - "[AgentTask] Failed to convert result for correlation_id %s: %s", + logger.exception( + "[AgentTask] Failed to convert result for correlation_id: %s", self._correlation_id, - e, ) self.set_value(is_error=True, value=e) - else: # child.state is TaskState.FAILED - # Delegate error handling to parent class + else: + # If error not handled by the parent, set it explicitly. if self._first_error is None: self._first_error = child.result self.set_value(is_error=True, value=self._first_error) @@ -125,7 +121,7 @@ def _load_agent_response(self, agent_response: AgentRunResponse | dict[str, Any] if agent_response is None: raise ValueError("agent_response cannot be None") - logger.debug(f"[load_agent_response] Loading agent response of type: {type(agent_response)}") + logger.debug("[load_agent_response] Loading agent response of type: %s", type(agent_response)) if isinstance(agent_response, AgentRunResponse): return agent_response @@ -188,7 +184,7 @@ def __init__(self, context: AgentOrchestrationContextType, agent_name: str): self._name = agent_name self._display_name = agent_name self._description = f"Durable agent proxy for {agent_name}" - logger.debug(f"[DurableAIAgent] Initialized for agent: {agent_name}") + logger.debug("[DurableAIAgent] Initialized for agent: %s", agent_name) @property def id(self) -> str: @@ -258,7 +254,7 @@ def my_orchestration(context): # This ensures each call gets its own conversation context session_key = str(self.context.new_uuid()) session_id = AgentSessionId(name=self.agent_name, key=session_key) - logger.warning(f"[DurableAIAgent] No thread provided, created unique session_id: {session_id}") + logger.warning("[DurableAIAgent] No thread provided, created unique session_id: %s", session_id) # Create entity ID from session ID entity_id = session_id.to_entity_id() @@ -282,7 +278,7 @@ def my_orchestration(context): response_format=response_format, ) - logger.debug(f"[DurableAIAgent] Calling entity {entity_id} with message: {message_str[:100]}...") + logger.debug("[DurableAIAgent] Calling entity %s with message: %s", entity_id, message_str[:100]) # Call the entity to get the underlying task entity_task = self.context.call_entity(entity_id, "run_agent", run_request.to_dict()) @@ -334,7 +330,7 @@ def get_new_thread(self, **kwargs: Any) -> AgentThread: thread = DurableAgentThread.from_session_id(session_id, **kwargs) - logger.debug(f"[DurableAIAgent] Created new thread with session_id: {session_id}") + logger.debug("[DurableAIAgent] Created new thread with session_id: %s", session_id) return thread def _messages_to_string(self, messages: list[ChatMessage]) -> str: From af331dd0abc196c25f54ac2f3813114e8fbe2cb9 Mon Sep 17 00:00:00 2001 From: Laveesh Rohra Date: Wed, 26 Nov 2025 10:05:32 -0800 Subject: [PATCH 11/11] fix mypy errors --- .../agent_framework_azurefunctions/_orchestration.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py b/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py index bcb4c35e59..fb6613b85b 100644 --- a/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py +++ b/python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py @@ -30,7 +30,9 @@ if TYPE_CHECKING: from azure.durable_functions import DurableOrchestrationContext - class _TypedCompoundTask(CompoundTask): + class _TypedCompoundTask(CompoundTask): # type: ignore[misc] + _first_error: Any + def __init__( self, tasks: list[TaskBase],