.NET: Python: Durable Workflow Support#2969
.NET: Python: Durable Workflow Support#2969ahmedmuhsin wants to merge 19 commits intomicrosoft:feature-durabletask-pythonfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR adds workflow orchestration support to the agent-framework-azurefunctions package, enabling execution of MAF (Microsoft Agent Framework) Workflow and WorkflowBuilder graphs inside Azure Durable Functions orchestrations. The implementation provides durable state persistence, cloud-native scalability, and support for complex routing patterns while bridging MAF's declarative workflow model with Durable Functions' orchestration capabilities.
Key Changes
- New workflow execution engine that adapts MAF workflows to Durable Functions' generator-based model
- Durable shared state implementation backed by Azure Durable Entities
- Serialization utilities for workflow messages and state management
- Two sample implementations demonstrating workflow usage with and without shared state
Reviewed changes
Copilot reviewed 21 out of 21 changed files in this pull request and generated 20 comments.
Show a summary per file
| File | Description |
|---|---|
python/packages/azurefunctions/agent_framework_azurefunctions/_workflow.py |
Core workflow orchestration engine with routing logic and executor execution |
python/packages/azurefunctions/agent_framework_azurefunctions/_shared_state.py |
DurableSharedState entity for cross-executor state sharing |
python/packages/azurefunctions/agent_framework_azurefunctions/_utils.py |
Serialization/deserialization utilities and capturing context implementation |
python/packages/azurefunctions/agent_framework_azurefunctions/_app.py |
Integration of workflow support into AgentFunctionApp with HTTP endpoints |
python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py |
Added created_at timestamp to RunRequest |
python/packages/azurefunctions/agent_framework_azurefunctions/__init__.py |
Exported DurableSharedState class |
python/samples/getting_started/azure_functions/09_workflow_shared_state/ |
Sample demonstrating workflow with shared state (research/writer pattern) |
python/samples/getting_started/azure_functions/10_workflow_no_shared_state/ |
Sample demonstrating workflow without shared state (spam detection pattern) |
Comments suppressed due to low confidence (1)
python/packages/azurefunctions/agent_framework_azurefunctions/_app.py:931
- This import of module asyncio is redundant, as it was previously imported on line 9.
import asyncio
python/samples/getting_started/azure_functions/10_workflow_no_shared_state/function_app.py
Outdated
Show resolved
Hide resolved
python/samples/getting_started/azure_functions/10_workflow_no_shared_state/README.md
Outdated
Show resolved
Hide resolved
python/packages/azurefunctions/agent_framework_azurefunctions/_utils.py
Outdated
Show resolved
Hide resolved
python/packages/azurefunctions/agent_framework_azurefunctions/_app.py
Outdated
Show resolved
Hide resolved
python/samples/getting_started/azure_functions/10_workflow_no_shared_state/function_app.py
Outdated
Show resolved
Hide resolved
python/packages/azurefunctions/agent_framework_azurefunctions/_workflow.py
Outdated
Show resolved
Hide resolved
python/samples/getting_started/azure_functions/10_workflow_no_shared_state/requirements.txt
Show resolved
Hide resolved
python/samples/getting_started/azure_functions/09_workflow_shared_state/function_app.py
Show resolved
Hide resolved
python/packages/azurefunctions/agent_framework_azurefunctions/_utils.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 22 out of 22 changed files in this pull request and generated 13 comments.
Comments suppressed due to low confidence (1)
python/packages/azurefunctions/agent_framework_azurefunctions/_app.py:960
- This import of module asyncio is redundant, as it was previously imported on line 9.
import asyncio
python/samples/getting_started/azure_functions/10_workflow_no_shared_state/host.json
Outdated
Show resolved
Hide resolved
python/samples/getting_started/azure_functions/10_workflow_no_shared_state/function_app.py
Outdated
Show resolved
Hide resolved
python/samples/getting_started/azure_functions/09_workflow_shared_state/requirements.txt
Outdated
Show resolved
Hide resolved
python/samples/getting_started/azure_functions/10_workflow_no_shared_state/function_app.py
Outdated
Show resolved
Hide resolved
python/samples/getting_started/azure_functions/10_workflow_no_shared_state/function_app.py
Outdated
Show resolved
Hide resolved
python/samples/getting_started/azure_functions/09_workflow_shared_state/requirements.txt
Outdated
Show resolved
Hide resolved
python/samples/getting_started/azure_functions/10_workflow_no_shared_state/function_app.py
Outdated
Show resolved
Hide resolved
python/packages/azurefunctions/agent_framework_azurefunctions/_utils.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 26 out of 26 changed files in this pull request and generated 16 comments.
Comments suppressed due to low confidence (1)
python/packages/azurefunctions/agent_framework_azurefunctions/_app.py:960
- This import of module asyncio is redundant, as it was previously imported on line 9.
import asyncio
|
|
||
|
|
There was a problem hiding this comment.
Trailing whitespace detected on this line. Please remove the trailing spaces to maintain code cleanliness.
| # Check for FanInEdgeGroup sources first | ||
| for group in workflow.edge_groups: | ||
| if isinstance(group, FanInEdgeGroup) and executor_id in group.source_executor_ids: | ||
| # Accumulate message for fan-in | ||
| if executor_id not in fan_in_pending[group.id]: | ||
| fan_in_pending[group.id][executor_id] = [] | ||
| fan_in_pending[group.id][executor_id].append((msg_to_route, executor_id)) | ||
| logger.debug("Accumulated message for FanIn group %s from %s", group.id, executor_id) | ||
|
|
||
| # Use MAF's edge group routing for other edge types | ||
| targets = route_message_through_edge_groups( | ||
| workflow.edge_groups, | ||
| executor_id, | ||
| msg_to_route, | ||
| ) | ||
|
|
||
| for target_id in targets: | ||
| logger.debug("Routing to %s", target_id) | ||
| if target_id not in next_pending_messages: | ||
| next_pending_messages[target_id] = [] | ||
| next_pending_messages[target_id].append((msg_to_route, executor_id)) |
There was a problem hiding this comment.
Potential logic issue with FanIn routing: When a message is routed through FanInEdgeGroup (lines 316-321), the message is accumulated but the loop continues without breaking or skipping the subsequent route_message_through_edge_groups call (lines 324-334). This means the same message could be routed both to the FanIn accumulator AND to other targets via normal edge routing, which may not be the intended behavior. Consider adding a break or continue statement after accumulating to FanIn, or using a flag to skip normal routing for messages that are part of a FanIn group.
| except Exception: | ||
| pass | ||
|
|
||
| if type_name == "AgentExecutorResponse" or ("executor_id" in data and "agent_run_response" in data): | ||
| try: | ||
| return reconstruct_agent_executor_response(data) | ||
| except Exception: | ||
| pass | ||
|
|
||
| if not type_name: | ||
| return data | ||
|
|
||
| # Try to find the type | ||
| target_type = None | ||
|
|
||
| # First check the registry | ||
| if type_registry and type_name in type_registry: | ||
| target_type = type_registry[type_name] | ||
| else: | ||
| # Try to import from module | ||
| if module_name: | ||
| try: | ||
| import importlib | ||
|
|
||
| module = importlib.import_module(module_name) | ||
| target_type = getattr(module, type_name, None) | ||
| except Exception: | ||
| # Ignore import errors - type may not be available in this context | ||
| # Will fall back to returning the raw dict below | ||
| pass | ||
|
|
||
| if target_type: | ||
| # Remove metadata before reconstruction | ||
| clean_data = {k: v for k, v in data.items() if not k.startswith("__")} | ||
| try: | ||
| if is_dataclass(target_type): | ||
| return target_type(**clean_data) | ||
| elif issubclass(target_type, BaseModel): | ||
| return target_type(**clean_data) | ||
| except Exception: | ||
| # Ignore reconstruction errors (e.g., missing fields, type mismatches) | ||
| # Will fall back to returning the raw dict below | ||
| pass |
There was a problem hiding this comment.
Bare except clauses silently swallow all exceptions including KeyboardInterrupt and SystemExit. While the comments indicate this is intentional for fallback behavior, consider catching more specific exceptions (e.g., except (ValueError, TypeError, ImportError, AttributeError)) to avoid masking programming errors or system signals.
| def test_set_workflow_id(self, context: CapturingRunnerContext) -> None: | ||
| """Test setting workflow ID.""" | ||
| context.set_workflow_id("workflow-123") | ||
| assert context._workflow_id == "workflow-123" |
There was a problem hiding this comment.
The test accesses a private attribute _workflow_id directly. This violates encapsulation and makes tests brittle to internal implementation changes. Consider either making workflow_id a public property or testing the behavior through the public API instead.
| assert context._workflow_id == "workflow-123" |
| gen = shared_state.get("my_key", default="default_val") | ||
|
|
||
| # The generator should yield the entity call | ||
| yielded = next(gen) |
There was a problem hiding this comment.
Variable yielded is not used.
|
|
||
| """Unit tests for DurableSharedState and SharedState entity.""" | ||
|
|
||
| from typing import Any |
There was a problem hiding this comment.
Import of 'Any' is not used.
| from typing import Any |
|
|
||
| """Unit tests for workflow utility functions.""" | ||
|
|
||
| import asyncio |
There was a problem hiding this comment.
Import of 'asyncio' is not used.
| import asyncio |
|
|
||
| import asyncio | ||
| from dataclasses import dataclass | ||
| from typing import Any |
There was a problem hiding this comment.
Import of 'Any' is not used.
| from typing import Any |
| import json | ||
| from dataclasses import dataclass | ||
| from typing import Any | ||
| from unittest.mock import Mock, patch |
There was a problem hiding this comment.
Import of 'Mock' is not used.
Import of 'patch' is not used.
| from unittest.mock import Mock, patch |
| from typing import Any | ||
| from unittest.mock import Mock, patch | ||
|
|
||
| import pytest |
There was a problem hiding this comment.
Import of 'pytest' is not used.
| import pytest |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 25 out of 26 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (2)
python/packages/azurefunctions/agent_framework_azurefunctions/_app.py:113
- The docstring example imports from
agent_framework.azure, but the actual import should beagent_framework_azurefunctions. Update the docstring example to use the correct import path:from agent_framework_azurefunctions import AgentFunctionAppandfrom agent_framework.azure import AzureOpenAIChatClient.
from agent_framework.azure import AgentFunctionApp, AzureOpenAIChatClient
python/packages/azurefunctions/agent_framework_azurefunctions/_app.py:962
- This import of module asyncio is redundant, as it was previously imported on line 9.
import asyncio
|
|
||
|
|
There was a problem hiding this comment.
Remove trailing whitespace on line 224.
| message_content = _extract_message_content_from_dict(message) | ||
| elif isinstance(message, str): | ||
| message_content = message | ||
|
|
||
| return message_content | ||
|
|
||
|
|
||
| def _extract_message_content_from_dict(message: dict[str, Any]) -> str: |
There was a problem hiding this comment.
The function name _extract_message_content_from_dict is verbose. Consider renaming to _extract_dict_content for brevity while maintaining clarity.
| message_content = _extract_message_content_from_dict(message) | |
| elif isinstance(message, str): | |
| message_content = message | |
| return message_content | |
| def _extract_message_content_from_dict(message: dict[str, Any]) -> str: | |
| message_content = _extract_dict_content(message) | |
| elif isinstance(message, str): | |
| message_content = message | |
| return message_content | |
| def _extract_dict_content(message: dict[str, Any]) -> str: |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 33 out of 34 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (1)
python/packages/azurefunctions/agent_framework_azurefunctions/_app.py:960
- This import of module asyncio is redundant, as it was previously imported on line 9.
import asyncio
|
|
||
| import logging | ||
| import os | ||
| from typing import Any, Dict |
There was a problem hiding this comment.
Import of 'Dict' is not used.
| from typing import Any, Dict | |
| from typing import Any |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 30 out of 31 changed files in this pull request and generated no new comments.
Comments suppressed due to low confidence (1)
python/packages/azurefunctions/agent_framework_azurefunctions/_app.py:953
- This import of module asyncio is redundant, as it was previously imported on line 9.
import asyncio
|
This PR is stale, and there have been lots of changes to the azurefunction and durabletask packages. |
Motivation and Context
This PR adds Workflow support to the
agent-framework-azurefunctionsPython package, enabling developers to run MAF (Microsoft Agent Framework) Workflows on Azure Durable Functions.Why is this change required?
The existing
agent-framework-azurefunctionspackage provides durable AI agent support via Durable Entities, but lacks the ability to orchestrate complex multi-agent workflows. Developers building sophisticated AI applications need the ability to:What problem does it solve?
Prior to this change, developers had to manually implement workflow orchestration patterns, manage state across agent invocations, and handle message routing logic. This PR integrates MAF's Workflow abstraction with Azure Durable Functions, providing:
Scenario Contribution:
This enables scenarios such as:
Description
Overview
This PR introduces a complete workflow execution engine for Azure Durable Functions that reuses MAF's workflow abstractions while adapting execution to the Durable Functions generator-based model (
yieldinstead ofawait).Key Components
1.
AgentFunctionAppWorkflow Integration (_app.py)The
AgentFunctionAppnow accepts an optionalworkflowparameter:When a workflow is provided:
AgentExecutorinstances in the workflowExecuteExecutoractivity is registered for standard (non-agent) executors/api/workflow/run,/api/workflow/status/{instanceId}) are createdSharedStateEntityis registered (unlessenable_shared_state=False)2.
DurableSharedState(_shared_state.py)A generator-based
SharedStatewrapper for orchestration-side access to durable shared state:Key design decisions:
SharedStateget,set,has,delete,get_all,update,clear3. Workflow Orchestrator (
_workflow.py)The
run_workflow_orchestrator()function is the main orchestration engine:start_executor_idSingleEdgeGroup,SwitchCaseEdgeGroup,FanOutEdgeGroup,FanInEdgeGroup)AgentExecutorinstances via Durable Entities (for durability)ExecuteExecutoractivityWorkflowOutputEventoutputs from executorsKey design decision: Reuse MAF's edge group routing logic rather than reimplementing it:
4. Utility Functions (
_utils.py)CapturingRunnerContext: ARunnerContextimplementation that captures messages and events during activity executionserialize_message()/deserialize_value(): JSON-safe serialization with type metadata for dataclasses/Pydantic modelsreconstruct_message_for_handler(): Reconstructs typed messages based on executor input types5. Samples
Two comprehensive samples demonstrating different patterns:
09_workflow_shared_stateDurableSharedStatefor cross-executor state sharing. Executors accumulate context via shared state.10_workflow_no_shared_stateMessagepassing between executors. Demonstrates routing and executor composition without shared state.Design Decisions
1. Generator-based orchestration pattern
Azure Durable Functions Python SDK uses generators (
yield) for async operations. All shared state operations return generators:2. Activity-based executor execution
Standard (non-agent) executors run in activities, not the orchestrator, because:
3. SharedState delta tracking
Activities receive a snapshot of shared state, and return only the changes (updates + deletes):
4. Type reconstruction for message passing
Messages are serialized with type metadata (
__type__,__module__) to enable reconstruction:AgentExecutorRequest,AgentExecutorResponse, and custom dataclasses/Pydantic modelsCurrent Gaps and Limitations
1. Human-in-the-Loop (HITL) Not Supported
The
CapturingRunnerContextdoes not supportsend_request_info_response():HITL requires orchestrator-level coordination (e.g., waiting for external events). Future work could:
RequestInfoEventin activity resultscontext.wait_for_external_event()2. WorkflowExecutor Not Supported
Nested
WorkflowExecutorinstances (workflows within workflows) are not currently supported. The code handles only:AgentExecutor(via Durable Entities)3. Checkpointing Not Supported
The
CapturingRunnerContext.has_checkpointing()returnsFalse. Durable Functions provides its own checkpoint mechanism via orchestration replay.4. Streaming Not Supported
Streaming responses are not implemented. The
is_streaming()always returnsFalse.5. No parallel execution within activities
Activities execute sequentially within the orchestrator loop. True parallelism would require using
context.task_all()with multiple activity calls.Contribution Checklist
workflowparameter added toAgentFunctionApp.