Skip to content

.NET: Python: Durable Workflow Support#2969

Closed
ahmedmuhsin wants to merge 19 commits intomicrosoft:feature-durabletask-pythonfrom
ahmedmuhsin:durable-workflows
Closed

.NET: Python: Durable Workflow Support#2969
ahmedmuhsin wants to merge 19 commits intomicrosoft:feature-durabletask-pythonfrom
ahmedmuhsin:durable-workflows

Conversation

@ahmedmuhsin
Copy link
Contributor

@ahmedmuhsin ahmedmuhsin commented Dec 19, 2025

Motivation and Context

This PR adds Workflow support to the agent-framework-azurefunctions Python package, enabling developers to run MAF (Microsoft Agent Framework) Workflows on Azure Durable Functions.

Why is this change required?

The existing agent-framework-azurefunctions package provides durable AI agent support via Durable Entities, but lacks the ability to orchestrate complex multi-agent workflows. Developers building sophisticated AI applications need the ability to:

  • Define workflow graphs with multiple executors
  • Route messages between executors using edge groups (SingleEdge, SwitchCase, FanOut, FanIn)
  • Share state across executors during workflow execution
  • Leverage Azure Durable Functions' reliability and replay-safety guarantees

What problem does it solve?

Prior to this change, developers had to manually implement workflow orchestration patterns, manage state across agent invocations, and handle message routing logic. This PR integrates MAF's Workflow abstraction with Azure Durable Functions, providing:

  • Automatic workflow orchestration via Durable Functions
  • Durable shared state via Durable Entities
  • Edge group-based message routing
  • HTTP endpoints for workflow execution and status monitoring

Scenario Contribution:

This enables scenarios such as:

  • Multi-agent research workflows (e.g., researcher → writer → editor pipeline)
  • Parallel processing with fan-out/fan-in patterns
  • Conditional routing based on agent responses
  • Stateful workflows where executors share accumulated context

Description

Overview

This PR introduces a complete workflow execution engine for Azure Durable Functions that reuses MAF's workflow abstractions while adapting execution to the Durable Functions generator-based model (yield instead of await).

Key Components

1. AgentFunctionApp Workflow Integration (_app.py)

The AgentFunctionApp now accepts an optional workflow parameter:

app = AgentFunctionApp(
    workflow=my_workflow,
    enable_shared_state=True,  # Optional, defaults to True
)

When a workflow is provided:

  • Agents are automatically extracted from AgentExecutor instances in the workflow
  • The ExecuteExecutor activity is registered for standard (non-agent) executors
  • The workflow orchestrator function is registered
  • HTTP endpoints (/api/workflow/run, /api/workflow/status/{instanceId}) are created
  • The SharedStateEntity is registered (unless enable_shared_state=False)

2. DurableSharedState (_shared_state.py)

A generator-based SharedState wrapper for orchestration-side access to durable shared state:

shared_state = DurableSharedState(context, instance_id)
value = yield from shared_state.get("key")
yield from shared_state.set("key", value)

Key design decisions:

  • Uses Azure Durable Entities for durability and replay-safety
  • Entity scoped by workflow session (instance_id)
  • Compatible API with MAF's in-memory SharedState
  • Supports: get, set, has, delete, get_all, update, clear

3. Workflow Orchestrator (_workflow.py)

The run_workflow_orchestrator() function is the main orchestration engine:

  • Traverses the workflow graph starting from start_executor_id
  • Routes messages using MAF's edge group APIs (SingleEdgeGroup, SwitchCaseEdgeGroup, FanOutEdgeGroup, FanInEdgeGroup)
  • Executes AgentExecutor instances via Durable Entities (for durability)
  • Executes standard executors via the ExecuteExecutor activity
  • Coordinates shared state updates between orchestrator and activities
  • Collects WorkflowOutputEvent outputs from executors

Key design decision: Reuse MAF's edge group routing logic rather than reimplementing it:

targets = route_message_through_edge_groups(workflow.edge_groups, source_id, message)

4. Utility Functions (_utils.py)

  • CapturingRunnerContext: A RunnerContext implementation that captures messages and events during activity execution
  • serialize_message() / deserialize_value(): JSON-safe serialization with type metadata for dataclasses/Pydantic models
  • reconstruct_message_for_handler(): Reconstructs typed messages based on executor input types

5. Samples

Two comprehensive samples demonstrating different patterns:

Sample Pattern Description
09_workflow_shared_state Stateful Workflow Uses DurableSharedState for cross-executor state sharing. Executors accumulate context via shared state.
10_workflow_no_shared_state Stateless Workflow Uses Message passing between executors. Demonstrates routing and executor composition without shared state.

Design Decisions

1. Generator-based orchestration pattern

Azure Durable Functions Python SDK uses generators (yield) for async operations. All shared state operations return generators:

# In orchestrator
value = yield from shared_state.get("key")

2. Activity-based executor execution

Standard (non-agent) executors run in activities, not the orchestrator, because:

  • Activities can run user code with I/O operations
  • Orchestrators must be deterministic (replay-safe)
  • Enables proper error isolation

3. SharedState delta tracking

Activities receive a snapshot of shared state, and return only the changes (updates + deletes):

result = {
    "shared_state_updates": {"key": new_value},
    "shared_state_deletes": ["removed_key"],
}

4. Type reconstruction for message passing

Messages are serialized with type metadata (__type__, __module__) to enable reconstruction:

  • Critical for condition functions in edge groups that check message types
  • Handles AgentExecutorRequest, AgentExecutorResponse, and custom dataclasses/Pydantic models

Current Gaps and Limitations

1. Human-in-the-Loop (HITL) Not Supported

The CapturingRunnerContext does not support send_request_info_response():

async def send_request_info_response(self, request_id: str, response: Any) -> None:
    raise NotImplementedError(
        "send_request_info_response is not supported in Azure Functions activity context. "
        "Human-in-the-loop scenarios should be handled at the orchestrator level."
    )

HITL requires orchestrator-level coordination (e.g., waiting for external events). Future work could:

  • Detect RequestInfoEvent in activity results
  • Pause orchestration using context.wait_for_external_event()
  • Resume when external response is provided

2. WorkflowExecutor Not Supported

Nested WorkflowExecutor instances (workflows within workflows) are not currently supported. The code handles only:

  • AgentExecutor (via Durable Entities)
  • Standard executors (via activities)

3. Checkpointing Not Supported

The CapturingRunnerContext.has_checkpointing() returns False. Durable Functions provides its own checkpoint mechanism via orchestration replay.

4. Streaming Not Supported

Streaming responses are not implemented. The is_streaming() always returns False.

5. No parallel execution within activities

Activities execute sequentially within the orchestrator loop. True parallelism would require using context.task_all() with multiple activity calls.


Contribution Checklist

  • The code builds clean without any errors or warnings
  • The PR follows the Contribution Guidelines
  • All unit tests pass, and I have added new tests where possible
  • Is this a breaking change? No breaking changes. New optional workflow parameter added to AgentFunctionApp.

Copilot AI review requested due to automatic review settings December 19, 2025 02:16
@markwallace-microsoft markwallace-microsoft added documentation Improvements or additions to documentation python labels Dec 19, 2025
@github-actions github-actions bot changed the title Durable Workflow Support Python: Durable Workflow Support Dec 19, 2025
@markwallace-microsoft
Copy link
Member

markwallace-microsoft commented Dec 19, 2025

Python Test Coverage

Python Test Coverage Report •
FileStmtsMissCoverMissing
packages/azurefunctions/agent_framework_azurefunctions
   _app.py47116165%216–217, 222–223, 253–254, 260, 262, 264–268, 270–271, 273–275, 278, 280, 282–283, 286–288, 291, 299–301, 304, 307, 312–313, 316–319, 322–325, 331, 333, 340–341, 345–348, 353–354, 357, 359–360, 362, 365–366, 368, 371–373, 375–376, 378–380, 384–387, 393, 395, 397, 407–409, 413–414, 416–417, 423, 432, 442, 541–542, 650, 658–659, 679–681, 687–689, 695–697, 730–731, 791–792, 929, 932, 941–943, 945–947, 949, 951, 962, 964–967, 969, 971–972, 974, 981–982, 984–985, 987–988, 990, 994, 1004–1006, 1008–1009, 1011–1013, 1020, 1022–1023, 1025, 1046, 1051, 1063, 1138, 1148, 1155–1157, 1202, 1216, 1227–1229, 1231–1234, 1259, 1266, 1268, 1271
   _shared_state.py83495%114, 135, 147, 156
   _utils.py1913581%113, 117, 165–166, 174, 181, 254–255, 260–261, 280, 283, 291, 294, 318, 354, 361–366, 368–371, 373, 376, 385–386, 392–393, 399, 413–414
   _workflow.py25317431%91–93, 182, 185–187, 191, 194–196, 198–200, 205–206, 208–212, 214, 218, 222, 224–228, 234–235, 237–239, 241–242, 244–245, 247–249, 251–253, 255–256, 259–262, 265, 267–269, 272–278, 280, 287, 290, 293–295, 297–299, 301–304, 306–312, 314, 321, 324–325, 328–331, 334–335, 337–338, 340, 348–351, 354–356, 361–364, 367–376, 379–380, 383, 386–387, 389–390, 393–399, 401–402, 405–410, 413–418, 421, 427–431, 434–436, 438, 440–445, 447–448, 452–453, 456–457, 460, 462–463, 465, 474–475, 504–505, 520
packages/core/agent_framework/_workflows
   _agent_executor.py1652286%26, 93, 111, 117, 150, 166–167, 218–219, 221–222, 253–255, 265–267, 269, 273, 277, 281–282
TOTAL17393301582% 

Python Unit Test Overview

Tests Skipped Failures Errors Time
2785 154 💤 0 ❌ 0 🔥 1m 0s ⏱️

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds workflow orchestration support to the agent-framework-azurefunctions package, enabling execution of MAF (Microsoft Agent Framework) Workflow and WorkflowBuilder graphs inside Azure Durable Functions orchestrations. The implementation provides durable state persistence, cloud-native scalability, and support for complex routing patterns while bridging MAF's declarative workflow model with Durable Functions' orchestration capabilities.

Key Changes

  • New workflow execution engine that adapts MAF workflows to Durable Functions' generator-based model
  • Durable shared state implementation backed by Azure Durable Entities
  • Serialization utilities for workflow messages and state management
  • Two sample implementations demonstrating workflow usage with and without shared state

Reviewed changes

Copilot reviewed 21 out of 21 changed files in this pull request and generated 20 comments.

Show a summary per file
File Description
python/packages/azurefunctions/agent_framework_azurefunctions/_workflow.py Core workflow orchestration engine with routing logic and executor execution
python/packages/azurefunctions/agent_framework_azurefunctions/_shared_state.py DurableSharedState entity for cross-executor state sharing
python/packages/azurefunctions/agent_framework_azurefunctions/_utils.py Serialization/deserialization utilities and capturing context implementation
python/packages/azurefunctions/agent_framework_azurefunctions/_app.py Integration of workflow support into AgentFunctionApp with HTTP endpoints
python/packages/azurefunctions/agent_framework_azurefunctions/_orchestration.py Added created_at timestamp to RunRequest
python/packages/azurefunctions/agent_framework_azurefunctions/__init__.py Exported DurableSharedState class
python/samples/getting_started/azure_functions/09_workflow_shared_state/ Sample demonstrating workflow with shared state (research/writer pattern)
python/samples/getting_started/azure_functions/10_workflow_no_shared_state/ Sample demonstrating workflow without shared state (spam detection pattern)
Comments suppressed due to low confidence (1)

python/packages/azurefunctions/agent_framework_azurefunctions/_app.py:931

  • This import of module asyncio is redundant, as it was previously imported on line 9.
        import asyncio

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 22 out of 22 changed files in this pull request and generated 13 comments.

Comments suppressed due to low confidence (1)

python/packages/azurefunctions/agent_framework_azurefunctions/_app.py:960

  • This import of module asyncio is redundant, as it was previously imported on line 9.
        import asyncio

Copilot AI review requested due to automatic review settings January 14, 2026 22:25
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 26 out of 26 changed files in this pull request and generated 16 comments.

Comments suppressed due to low confidence (1)

python/packages/azurefunctions/agent_framework_azurefunctions/_app.py:960

  • This import of module asyncio is redundant, as it was previously imported on line 9.
        import asyncio

Comment on lines 224 to 225


Copy link

Copilot AI Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trailing whitespace detected on this line. Please remove the trailing spaces to maintain code cleanliness.

Suggested change

Copilot uses AI. Check for mistakes.
Comment on lines 314 to 334
# Check for FanInEdgeGroup sources first
for group in workflow.edge_groups:
if isinstance(group, FanInEdgeGroup) and executor_id in group.source_executor_ids:
# Accumulate message for fan-in
if executor_id not in fan_in_pending[group.id]:
fan_in_pending[group.id][executor_id] = []
fan_in_pending[group.id][executor_id].append((msg_to_route, executor_id))
logger.debug("Accumulated message for FanIn group %s from %s", group.id, executor_id)

# Use MAF's edge group routing for other edge types
targets = route_message_through_edge_groups(
workflow.edge_groups,
executor_id,
msg_to_route,
)

for target_id in targets:
logger.debug("Routing to %s", target_id)
if target_id not in next_pending_messages:
next_pending_messages[target_id] = []
next_pending_messages[target_id].append((msg_to_route, executor_id))
Copy link

Copilot AI Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential logic issue with FanIn routing: When a message is routed through FanInEdgeGroup (lines 316-321), the message is accumulated but the loop continues without breaking or skipping the subsequent route_message_through_edge_groups call (lines 324-334). This means the same message could be routed both to the FanIn accumulator AND to other targets via normal edge routing, which may not be the intended behavior. Consider adding a break or continue statement after accumulating to FanIn, or using a flag to skip normal routing for messages that are part of a FanIn group.

Copilot uses AI. Check for mistakes.
Comment on lines 256 to 298
except Exception:
pass

if type_name == "AgentExecutorResponse" or ("executor_id" in data and "agent_run_response" in data):
try:
return reconstruct_agent_executor_response(data)
except Exception:
pass

if not type_name:
return data

# Try to find the type
target_type = None

# First check the registry
if type_registry and type_name in type_registry:
target_type = type_registry[type_name]
else:
# Try to import from module
if module_name:
try:
import importlib

module = importlib.import_module(module_name)
target_type = getattr(module, type_name, None)
except Exception:
# Ignore import errors - type may not be available in this context
# Will fall back to returning the raw dict below
pass

if target_type:
# Remove metadata before reconstruction
clean_data = {k: v for k, v in data.items() if not k.startswith("__")}
try:
if is_dataclass(target_type):
return target_type(**clean_data)
elif issubclass(target_type, BaseModel):
return target_type(**clean_data)
except Exception:
# Ignore reconstruction errors (e.g., missing fields, type mismatches)
# Will fall back to returning the raw dict below
pass
Copy link

Copilot AI Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bare except clauses silently swallow all exceptions including KeyboardInterrupt and SystemExit. While the comments indicate this is intentional for fallback behavior, consider catching more specific exceptions (e.g., except (ValueError, TypeError, ImportError, AttributeError)) to avoid masking programming errors or system signals.

Copilot uses AI. Check for mistakes.
def test_set_workflow_id(self, context: CapturingRunnerContext) -> None:
"""Test setting workflow ID."""
context.set_workflow_id("workflow-123")
assert context._workflow_id == "workflow-123"
Copy link

Copilot AI Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test accesses a private attribute _workflow_id directly. This violates encapsulation and makes tests brittle to internal implementation changes. Consider either making workflow_id a public property or testing the behavior through the public API instead.

Suggested change
assert context._workflow_id == "workflow-123"

Copilot uses AI. Check for mistakes.
gen = shared_state.get("my_key", default="default_val")

# The generator should yield the entity call
yielded = next(gen)
Copy link

Copilot AI Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable yielded is not used.

Copilot uses AI. Check for mistakes.

"""Unit tests for DurableSharedState and SharedState entity."""

from typing import Any
Copy link

Copilot AI Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import of 'Any' is not used.

Suggested change
from typing import Any

Copilot uses AI. Check for mistakes.

"""Unit tests for workflow utility functions."""

import asyncio
Copy link

Copilot AI Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import of 'asyncio' is not used.

Suggested change
import asyncio

Copilot uses AI. Check for mistakes.

import asyncio
from dataclasses import dataclass
from typing import Any
Copy link

Copilot AI Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import of 'Any' is not used.

Suggested change
from typing import Any

Copilot uses AI. Check for mistakes.
import json
from dataclasses import dataclass
from typing import Any
from unittest.mock import Mock, patch
Copy link

Copilot AI Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import of 'Mock' is not used.
Import of 'patch' is not used.

Suggested change
from unittest.mock import Mock, patch

Copilot uses AI. Check for mistakes.
from typing import Any
from unittest.mock import Mock, patch

import pytest
Copy link

Copilot AI Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import of 'pytest' is not used.

Suggested change
import pytest

Copilot uses AI. Check for mistakes.
Copilot AI review requested due to automatic review settings January 15, 2026 20:04
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 25 out of 26 changed files in this pull request and generated 2 comments.

Comments suppressed due to low confidence (2)

python/packages/azurefunctions/agent_framework_azurefunctions/_app.py:113

  • The docstring example imports from agent_framework.azure, but the actual import should be agent_framework_azurefunctions. Update the docstring example to use the correct import path: from agent_framework_azurefunctions import AgentFunctionApp and from agent_framework.azure import AzureOpenAIChatClient.
        from agent_framework.azure import AgentFunctionApp, AzureOpenAIChatClient

python/packages/azurefunctions/agent_framework_azurefunctions/_app.py:962

  • This import of module asyncio is redundant, as it was previously imported on line 9.
        import asyncio

Comment on lines 224 to 225


Copy link

Copilot AI Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove trailing whitespace on line 224.

Suggested change

Copilot uses AI. Check for mistakes.
Comment on lines +380 to +387
message_content = _extract_message_content_from_dict(message)
elif isinstance(message, str):
message_content = message

return message_content


def _extract_message_content_from_dict(message: dict[str, Any]) -> str:
Copy link

Copilot AI Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function name _extract_message_content_from_dict is verbose. Consider renaming to _extract_dict_content for brevity while maintaining clarity.

Suggested change
message_content = _extract_message_content_from_dict(message)
elif isinstance(message, str):
message_content = message
return message_content
def _extract_message_content_from_dict(message: dict[str, Any]) -> str:
message_content = _extract_dict_content(message)
elif isinstance(message, str):
message_content = message
return message_content
def _extract_dict_content(message: dict[str, Any]) -> str:

Copilot uses AI. Check for mistakes.
Copilot AI review requested due to automatic review settings January 20, 2026 17:25
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 33 out of 34 changed files in this pull request and generated 1 comment.

Comments suppressed due to low confidence (1)

python/packages/azurefunctions/agent_framework_azurefunctions/_app.py:960

  • This import of module asyncio is redundant, as it was previously imported on line 9.
        import asyncio


import logging
import os
from typing import Any, Dict
Copy link

Copilot AI Jan 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import of 'Dict' is not used.

Suggested change
from typing import Any, Dict
from typing import Any

Copilot uses AI. Check for mistakes.
Copilot AI review requested due to automatic review settings January 22, 2026 16:31
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 30 out of 31 changed files in this pull request and generated no new comments.

Comments suppressed due to low confidence (1)

python/packages/azurefunctions/agent_framework_azurefunctions/_app.py:953

  • This import of module asyncio is redundant, as it was previously imported on line 9.
        import asyncio

@ahmedmuhsin ahmedmuhsin changed the base branch from feature-durabletask-python to main January 30, 2026 23:20
@markwallace-microsoft markwallace-microsoft added .NET workflows Related to Workflows in agent-framework lab Agent Framework Lab labels Jan 30, 2026
@github-actions github-actions bot changed the title Python: Durable Workflow Support .NET: Python: Durable Workflow Support Jan 30, 2026
@ahmedmuhsin ahmedmuhsin changed the base branch from main to feature-durabletask-python January 30, 2026 23:40
@eavanvalkenburg
Copy link
Member

This PR is stale, and there have been lots of changes to the azurefunction and durabletask packages.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

documentation Improvements or additions to documentation lab Agent Framework Lab .NET python workflows Related to Workflows in agent-framework

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants