Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ agents.md
# AI
.claude/
WARP.md
**/memory-bank/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this added?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

**/projectBrief.md

# Azurite storage emulator files
*/__azurite_db_blob__.json
Expand Down
134 changes: 83 additions & 51 deletions python/packages/azurefunctions/agent_framework_azurefunctions/_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
from ._constants import (
DEFAULT_MAX_POLL_RETRIES,
DEFAULT_POLL_INTERVAL_SECONDS,
RESPONSE_FORMAT_JSON,
RESPONSE_FORMAT_TEXT,
MIMETYPE_APPLICATION_JSON,
MIMETYPE_TEXT_PLAIN,
REQUEST_RESPONSE_FORMAT_JSON,
REQUEST_RESPONSE_FORMAT_TEXT,
THREAD_ID_FIELD,
THREAD_ID_HEADER,
WAIT_FOR_RESPONSE_FIELD,
WAIT_FOR_RESPONSE_HEADER,
)
Expand Down Expand Up @@ -316,11 +319,11 @@ async def http_start(req: func.HttpRequest, client: df.DurableOrchestrationClien
"""
logger.debug(f"[HTTP Trigger] Received request on route: /api/agents/{agent_name}/run")

response_format: str = RESPONSE_FORMAT_JSON
request_response_format: str = REQUEST_RESPONSE_FORMAT_JSON
thread_id: str | None = None

try:
req_body, message, response_format = self._parse_incoming_request(req)
req_body, message, request_response_format = self._parse_incoming_request(req)
thread_id = self._resolve_thread_id(req=req, req_body=req_body)
wait_for_response = self._should_wait_for_response(req=req, req_body=req_body)

Expand All @@ -333,7 +336,7 @@ async def http_start(req: func.HttpRequest, client: df.DurableOrchestrationClien
return self._create_http_response(
payload={"error": "Message is required"},
status_code=400,
response_format=response_format,
request_response_format=request_response_format,
thread_id=thread_id,
)

Expand All @@ -350,6 +353,7 @@ async def http_start(req: func.HttpRequest, client: df.DurableOrchestrationClien
message,
thread_id,
correlation_id,
request_response_format,
)
logger.debug("Signalling entity %s with request: %s", entity_instance_id, run_request)
await client.signal_entity(entity_instance_id, "run_agent", run_request)
Expand All @@ -369,7 +373,7 @@ async def http_start(req: func.HttpRequest, client: df.DurableOrchestrationClien
return self._create_http_response(
payload=result,
status_code=200 if result.get("status") == "success" else 500,
response_format=response_format,
request_response_format=request_response_format,
thread_id=thread_id,
)

Expand All @@ -382,7 +386,7 @@ async def http_start(req: func.HttpRequest, client: df.DurableOrchestrationClien
return self._create_http_response(
payload=accepted_response,
status_code=202,
response_format=response_format,
request_response_format=request_response_format,
thread_id=thread_id,
)

Expand All @@ -391,23 +395,23 @@ async def http_start(req: func.HttpRequest, client: df.DurableOrchestrationClien
return self._create_http_response(
payload={"error": str(exc)},
status_code=exc.status_code,
response_format=response_format,
request_response_format=request_response_format,
thread_id=thread_id,
)
except ValueError as exc:
logger.error(f"[HTTP Trigger] Invalid JSON: {exc!s}")
return self._create_http_response(
payload={"error": "Invalid JSON"},
status_code=400,
response_format=response_format,
request_response_format=request_response_format,
thread_id=thread_id,
)
except Exception as exc:
logger.error(f"[HTTP Trigger] Error: {exc!s}", exc_info=True)
return self._create_http_response(
payload={"error": str(exc)},
status_code=500,
response_format=response_format,
request_response_format=request_response_format,
thread_id=thread_id,
)

Expand Down Expand Up @@ -465,7 +469,7 @@ def health_check(req: func.HttpRequest) -> func.HttpResponse:
return func.HttpResponse(
json.dumps({"status": "healthy", "agents": agent_info, "agent_count": len(self.agents)}),
status_code=200,
mimetype="application/json",
mimetype=MIMETYPE_APPLICATION_JSON,
)

_ = health_check
Expand Down Expand Up @@ -501,9 +505,7 @@ async def _read_cached_state(

typed_state_payload = cast(dict[str, Any], state_payload)

agent_state = DurableAgentState()
agent_state.restore_state(typed_state_payload)
return agent_state
return DurableAgentState.from_dict(typed_state_payload)

async def _get_response_from_entity(
self,
Expand Down Expand Up @@ -579,31 +581,58 @@ async def _poll_entity_for_response(

return result

async def _build_timeout_result(self, message: str, thread_id: str, correlation_id: str) -> dict[str, Any]:
"""Create the timeout response."""
return {
"response": "Agent is still processing or timed out...",
def _build_response_payload(
self,
*,
response: str | None,
message: str,
thread_id: str,
status: str,
correlation_id: str,
extra_fields: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Create a consistent response structure and allow optional extra fields."""
payload = {
"response": response,
"message": message,
THREAD_ID_FIELD: thread_id,
"status": "timeout",
"correlationId": correlation_id,
"status": status,
"correlation_id": correlation_id,
}
if extra_fields:
payload.update(extra_fields)
return payload

async def _build_timeout_result(self, message: str, thread_id: str, correlation_id: str) -> dict[str, Any]:
"""Create the timeout response."""
return self._build_response_payload(
response="Agent is still processing or timed out...",
message=message,
thread_id=thread_id,
status="timeout",
correlation_id=correlation_id,
)

def _build_success_result(
self, response_data: dict[str, Any], message: str, thread_id: str, correlation_id: str, state: DurableAgentState
) -> dict[str, Any]:
"""Build the success result returned to the HTTP caller."""
return {
"response": response_data.get("content"),
"message": message,
THREAD_ID_FIELD: thread_id,
"status": "success",
"message_count": response_data.get("message_count", state.message_count),
"correlationId": correlation_id,
}
return self._build_response_payload(
response=response_data.get("content"),
message=message,
thread_id=thread_id,
status="success",
correlation_id=correlation_id,
extra_fields={"message_count": response_data.get("message_count", state.message_count)},
)

def _build_request_data(
self, req_body: dict[str, Any], message: str, thread_id: str, correlation_id: str
self,
req_body: dict[str, Any],
message: str,
thread_id: str,
correlation_id: str,
request_response_format: str,
) -> dict[str, Any]:
"""Create the durable entity request payload."""
enable_tool_calls_value = req_body.get("enable_tool_calls")
Expand All @@ -612,6 +641,7 @@ def _build_request_data(
return RunRequest(
message=message,
role=req_body.get("role"),
request_response_format=request_response_format,
response_format=req_body.get("response_format"),
enable_tool_calls=enable_tool_calls,
thread_id=thread_id,
Expand All @@ -620,23 +650,23 @@ def _build_request_data(

def _build_accepted_response(self, message: str, thread_id: str, correlation_id: str) -> dict[str, Any]:
"""Build the response returned when not waiting for completion."""
return {
"response": "Agent request accepted",
"message": message,
THREAD_ID_FIELD: thread_id,
"status": "accepted",
"correlationId": correlation_id,
}
return self._build_response_payload(
response="Agent request accepted",
message=message,
thread_id=thread_id,
status="accepted",
correlation_id=correlation_id,
)

def _create_http_response(
self,
payload: dict[str, Any] | str,
status_code: int,
response_format: str,
request_response_format: str,
thread_id: str | None,
) -> func.HttpResponse:
"""Create the HTTP response using helper serializers for clarity."""
if response_format == RESPONSE_FORMAT_TEXT:
if request_response_format == REQUEST_RESPONSE_FORMAT_TEXT:
return self._build_plain_text_response(payload=payload, status_code=status_code, thread_id=thread_id)

return self._build_json_response(payload=payload, status_code=status_code)
Expand All @@ -649,13 +679,13 @@ def _build_plain_text_response(
) -> func.HttpResponse:
"""Return a plain-text response with optional thread identifier header."""
body_text = payload if isinstance(payload, str) else self._convert_payload_to_text(payload)
headers = {"x-ms-thread-id": thread_id} if thread_id is not None else None
return func.HttpResponse(body_text, status_code=status_code, mimetype="text/plain", headers=headers)
headers = {THREAD_ID_HEADER: thread_id} if thread_id is not None else None
return func.HttpResponse(body_text, status_code=status_code, mimetype=MIMETYPE_TEXT_PLAIN, headers=headers)

def _build_json_response(self, payload: dict[str, Any] | str, status_code: int) -> func.HttpResponse:
"""Return the JSON response, serializing dictionaries as needed."""
body_json = payload if isinstance(payload, str) else json.dumps(payload)
return func.HttpResponse(body_json, status_code=status_code, mimetype="application/json")
return func.HttpResponse(body_json, status_code=status_code, mimetype=MIMETYPE_APPLICATION_JSON)

def _convert_payload_to_text(self, payload: dict[str, Any]) -> str:
"""Convert a structured payload into a human-readable text response."""
Expand Down Expand Up @@ -701,10 +731,12 @@ def _parse_incoming_request(self, req: func.HttpRequest) -> tuple[dict[str, Any]
normalized_content_type = self._extract_content_type(headers)
body_parser, body_format = self._select_body_parser(normalized_content_type)
prefers_json = self._accepts_json_response(headers)
response_format = self._select_response_format(body_format=body_format, prefers_json=prefers_json)
request_response_format = self._select_request_response_format(
body_format=body_format, prefers_json=prefers_json
)

req_body, message = body_parser(req)
return req_body, message, response_format
return req_body, message, request_response_format

def _extract_normalized_headers(self, req: func.HttpRequest) -> dict[str, str]:
"""Create a lowercase header mapping from the incoming request."""
Expand All @@ -727,9 +759,9 @@ def _select_body_parser(
normalized_content_type: str,
) -> tuple[Callable[[func.HttpRequest], tuple[dict[str, Any], str]], str]:
"""Choose the body parser and declared body format."""
if normalized_content_type in {"application/json"} or normalized_content_type.endswith("+json"):
return self._parse_json_body, RESPONSE_FORMAT_JSON
return self._parse_text_body, RESPONSE_FORMAT_TEXT
if normalized_content_type in {MIMETYPE_APPLICATION_JSON} or normalized_content_type.endswith("+json"):
return self._parse_json_body, REQUEST_RESPONSE_FORMAT_JSON
return self._parse_text_body, REQUEST_RESPONSE_FORMAT_TEXT

@staticmethod
def _accepts_json_response(headers: dict[str, str]) -> bool:
Expand All @@ -740,16 +772,16 @@ def _accepts_json_response(headers: dict[str, str]) -> bool:

for value in accept_header.split(","):
media_type = value.split(";")[0].strip().lower()
if media_type == "application/json":
if media_type == MIMETYPE_APPLICATION_JSON:
return True
return False

@staticmethod
def _select_response_format(body_format: str, prefers_json: bool) -> str:
def _select_request_response_format(body_format: str, prefers_json: bool) -> str:
"""Combine body format and accept preference to determine response format."""
if body_format == RESPONSE_FORMAT_JSON or prefers_json:
return RESPONSE_FORMAT_JSON
return RESPONSE_FORMAT_TEXT
if body_format == REQUEST_RESPONSE_FORMAT_JSON or prefers_json:
return REQUEST_RESPONSE_FORMAT_JSON
return REQUEST_RESPONSE_FORMAT_TEXT

@staticmethod
def _parse_json_body(req: func.HttpRequest) -> tuple[dict[str, Any], str]:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
# Copyright (c) Microsoft. All rights reserved.

"""Constants used across the Azure Functions agent framework."""
"""Constants for Azure Functions Agent Framework integration."""

# Response format constants
RESPONSE_FORMAT_JSON: str = "json"
RESPONSE_FORMAT_TEXT: str = "text"
# Supported request/response formats and MIME types
REQUEST_RESPONSE_FORMAT_JSON: str = "json"
REQUEST_RESPONSE_FORMAT_TEXT: str = "text"
MIMETYPE_APPLICATION_JSON: str = "application/json"
MIMETYPE_TEXT_PLAIN: str = "text/plain"

# Field and header names
THREAD_ID_FIELD: str = "thread_id"
THREAD_ID_HEADER: str = "x-ms-thread-id"
WAIT_FOR_RESPONSE_FIELD: str = "wait_for_response"
WAIT_FOR_RESPONSE_HEADER: str = "x-ms-wait-for-response"

Expand Down
Loading