Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
)
from dateutil import parser as date_parser

from ._models import RunRequest, _serialize_response_format
from ._models import RunRequest, serialize_response_format

logger = get_logger("agent_framework.azurefunctions.durable_agent_state")

Expand Down Expand Up @@ -494,7 +494,7 @@ def from_run_request(request: RunRequest) -> DurableAgentStateRequest:
messages=[DurableAgentStateMessage.from_run_request(request)],
created_at=datetime.now(tz=timezone.utc),
response_type=request.request_response_format,
response_schema=_serialize_response_format(request.response_format),
response_schema=serialize_response_format(request.response_format),
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@

import asyncio
import inspect
import json
from collections.abc import AsyncIterable, Callable
from datetime import datetime, timezone
from typing import Any, cast

import azure.durable_functions as df
Expand All @@ -30,11 +28,10 @@
DurableAgentState,
DurableAgentStateData,
DurableAgentStateEntry,
DurableAgentStateMessage,
DurableAgentStateRequest,
DurableAgentStateResponse,
)
from ._models import AgentResponse, RunRequest
from ._models import RunRequest

logger = get_logger("agent_framework.azurefunctions.entities")

Expand Down Expand Up @@ -97,21 +94,16 @@ async def run_agent(
self,
context: df.DurableEntityContext,
request: RunRequest | dict[str, Any] | str,
) -> dict[str, Any]:
) -> AgentRunResponse:
"""Execute the agent with a message directly in the entity.

Args:
context: Entity context
request: RunRequest object, dict, or string message (for backward compatibility)

Returns:
Dict with status information and response (serialized AgentResponse)

Note:
The agent returns an AgentRunResponse object which is stored in state.
This method extracts the text/structured response and returns an AgentResponse dict.
AgentRunResponse enriched with execution metadata.
"""
# Convert string or dict to RunRequest
if isinstance(request, str):
run_request = RunRequest(message=request, role=Role.USER)
elif isinstance(request, dict):
Expand All @@ -135,8 +127,6 @@ async def run_agent(
logger.debug(f"[AgentEntity.run_agent] Received Message: {state_request}")

try:
logger.debug("[AgentEntity.run_agent] Starting agent invocation")

# Build messages from conversation history, excluding error responses
# Error responses are kept in history for tracking but not sent to the agent
chat_messages: list[ChatMessage] = [
Expand Down Expand Up @@ -164,83 +154,39 @@ async def run_agent(
type(agent_run_response).__name__,
)

response_text = None
structured_response = None
response_str: str | None = None

try:
if response_format:
try:
response_str = agent_run_response.text
structured_response = json.loads(response_str)
logger.debug("Parsed structured JSON response")
except json.JSONDecodeError as decode_error:
logger.warning(f"Failed to parse JSON response: {decode_error}")
response_text = response_str
else:
raw_text = agent_run_response.text
response_text = raw_text if raw_text else "No response"
preview = response_text
logger.debug(f"Response: {preview[:100]}..." if len(preview) > 100 else f"Response: {preview}")
response_text = agent_run_response.text if agent_run_response.text else "No response"
logger.debug(f"Response: {response_text[:100]}...")
except Exception as extraction_error:
logger.error(
f"Error extracting response: {extraction_error}",
"Error extracting response text: %s",
extraction_error,
exc_info=True,
)
response_text = "Error extracting response"

state_response = DurableAgentStateResponse.from_run_response(correlation_id, agent_run_response)
self.state.data.conversation_history.append(state_response)

agent_response = AgentResponse(
response=response_text,
message=str(message),
thread_id=str(thread_id),
status="success",
message_count=len(self.state.data.conversation_history),
structured_response=structured_response,
)
result = agent_response.to_dict()

logger.debug("[AgentEntity.run_agent] AgentRunResponse stored in conversation history")

return result
return agent_run_response

except Exception as exc:
import traceback

error_traceback = traceback.format_exc()
logger.error("[AgentEntity.run_agent] Agent execution failed")
logger.error(f"Error: {exc!s}")
logger.error(f"Error type: {type(exc).__name__}")
logger.error(f"Full traceback:\n{error_traceback}")
logger.exception("[AgentEntity.run_agent] Agent execution failed.")

# Create error message
error_message = DurableAgentStateMessage.from_chat_message(
ChatMessage(
role=Role.ASSISTANT, contents=[ErrorContent(message=str(exc), error_code=type(exc).__name__)]
)
error_message = ChatMessage(
role=Role.ASSISTANT, contents=[ErrorContent(message=str(exc), error_code=type(exc).__name__)]
)

error_response = AgentRunResponse(messages=[error_message])

# Create and store error response in conversation history
error_state_response = DurableAgentStateResponse(
correlation_id=correlation_id,
created_at=datetime.now(tz=timezone.utc),
messages=[error_message],
is_error=True,
)
error_state_response = DurableAgentStateResponse.from_run_response(correlation_id, error_response)
error_state_response.is_error = True
self.state.data.conversation_history.append(error_state_response)

error_response = AgentResponse(
response=f"Error: {exc!s}",
message=str(message),
thread_id=str(thread_id),
status="error",
message_count=len(self.state.data.conversation_history),
error=str(exc),
error_type=type(exc).__name__,
)
return error_response.to_dict()
return error_response

async def _invoke_agent(
self,
Expand Down Expand Up @@ -432,7 +378,7 @@ async def _entity_coroutine(context: df.DurableEntityContext) -> None:
request = "" if input_data is None else str(cast(object, input_data))

result = await entity.run_agent(context, request)
context.set_result(result)
context.set_result(result.to_dict())

elif operation == "reset":
entity.reset(context)
Expand All @@ -442,15 +388,13 @@ async def _entity_coroutine(context: df.DurableEntityContext) -> None:
logger.error("[entity_function] Unknown operation: %s", operation)
context.set_result({"error": f"Unknown operation: {operation}"})

logger.debug("State dict: %s", entity.state.to_dict())
context.set_state(entity.state.to_dict())
serialized_state = entity.state.to_dict()
logger.debug("State dict: %s", serialized_state)
context.set_state(serialized_state)
logger.info(f"[entity_function] Operation {operation} completed successfully")

except Exception as exc:
import traceback

logger.error("[entity_function] Error in entity: %s", exc)
logger.error(f"[entity_function] Traceback:\n{traceback.format_exc()}")
logger.exception("[entity_function] Error executing entity operation %s", exc)
context.set_result({"error": str(exc), "status": "error"})

def entity_function(context: df.DurableEntityContext) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ async def deserialize(
return thread


def _serialize_response_format(response_format: type[BaseModel] | None) -> Any:
def serialize_response_format(response_format: type[BaseModel] | None) -> Any:
"""Serialize response format for transport across durable function boundaries."""
if response_format is None:
return None
Expand Down Expand Up @@ -339,7 +339,7 @@ def to_dict(self) -> dict[str, Any]:
"request_response_format": self.request_response_format,
}
if self.response_format:
result["response_format"] = _serialize_response_format(self.response_format)
result["response_format"] = serialize_response_format(self.response_format)
if self.thread_id:
result["thread_id"] = self.thread_id
if self.correlation_id:
Expand All @@ -362,50 +362,3 @@ def from_dict(cls, data: dict[str, Any]) -> RunRequest:
correlation_id=data.get("correlationId"),
created_at=data.get("created_at"),
)


@dataclass
class AgentResponse:
"""Response from agent execution.

Attributes:
response: The agent's text response (or None for structured responses)
message: The original message sent to the agent
thread_id: The thread identifier
status: Status of the execution (success, error, etc.)
message_count: Number of messages in the conversation
error: Error message if status is error
error_type: Type of error if status is error
structured_response: Structured response if response_format was provided
"""

response: str | None
message: str
thread_id: str | None
status: str
message_count: int = 0
error: str | None = None
error_type: str | None = None
structured_response: dict[str, Any] | None = None

def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for JSON serialization."""
result: dict[str, Any] = {
"message": self.message,
"thread_id": self.thread_id,
"status": self.status,
"message_count": self.message_count,
}

# Add response or structured_response based on what's available
if self.structured_response is not None:
result["structured_response"] = self.structured_response
elif self.response is not None:
result["response"] = self.response

if self.error:
result["error"] = self.error
if self.error_type:
result["error_type"] = self.error_type

return result
Loading
Loading