Skip to content

Python: Durable Support for Workflows#3630

Open
ahmedmuhsin wants to merge 11 commits intomainfrom
durable-workflows-v2
Open

Python: Durable Support for Workflows#3630
ahmedmuhsin wants to merge 11 commits intomainfrom
durable-workflows-v2

Conversation

@ahmedmuhsin
Copy link

@ahmedmuhsin ahmedmuhsin commented Feb 2, 2026

Motivation and Context

Adds workflow orchestration support to Azure Functions, enabling multi-agent workflows with conditional routing, parallel execution, and human-in-the-loop (HITL) patterns using Azure Durable Functions.

Description

Core Package Changes (agent_framework_azurefunctions):

  • _workflow.py - Orchestration engine that executes MAF Workflows using Durable Functions' generator-based model:

    • Sequential and parallel agent/executor execution
    • Fan-out/fan-in patterns
    • Conditional routing via edge groups (switch/case, single edges)
    • HITL via wait_for_external_event with configurable timeouts
  • _context.py - CapturingRunnerContext for activity execution (captures messages/events without durable storage)

  • _serialization.py - Serialization utilities for cross-activity message passing (dataclasses, Pydantic models, ChatMessage)

  • _app.py - Extended AgentFunctionApp with:

    • New workflow parameter accepting a Workflow instance
    • Auto-registers agents from workflow executors
    • HTTP endpoints: POST /workflow/run, GET /workflow/status/{id}, POST /workflow/respond/{id}/{requestId}

New Samples (09-12):

Sample Pattern
09_workflow_shared_state Shared state + conditional routing (spam detection → email response)
10_workflow_no_shared_state Stateless workflow with switch/case routing
11_workflow_parallel Fan-out/fan-in for parallel agent execution
12_workflow_hitl Human-in-the-loop with approval/rejection flow

Tests:

  • Unit tests for _workflow.py, _serialization.py, _context.py, and _app.py workflow features (152 total)
  • Integration tests for all 4 new samples (14 tests)

API Fix:

  • Updated samples to use chat_client.as_agent() with default_options={"response_format": ...} instead of non-existent create_agent()

Contribution Checklist

  • The code builds clean without any errors or warnings
  • The PR follows the Contribution Guidelines
  • All unit tests pass, and I have added new tests where possible
  • Is this a breaking change? No

@markwallace-microsoft markwallace-microsoft added documentation Improvements or additions to documentation python labels Feb 3, 2026
@github-actions github-actions bot changed the title Durable Support for Workflows in Python Python: Durable Support for Workflows in Python Feb 3, 2026
@markwallace-microsoft
Copy link
Member

markwallace-microsoft commented Feb 3, 2026

Python Test Coverage

Python Test Coverage Report •
FileStmtsMissCoverMissing
packages/azurefunctions/agent_framework_azurefunctions
   _app.py49418762%215–216, 221–222, 257–258, 261, 263–265, 271, 273, 275–278, 280–281, 283–285, 288, 291, 293, 295–296, 299–301, 303, 305, 313, 321–323, 326, 329, 334–335, 338–341, 344, 347–349, 360–363, 369, 371, 379–380, 383, 388–389, 391–392, 394, 397, 400, 402, 404, 406–408, 412–415, 421, 423–424, 426, 437–439, 443–444, 446–447, 453, 464–469, 477, 479, 485–487, 493–494, 496–497, 503–506, 514, 520, 539, 638–639, 747, 755–756, 776–778, 784–786, 792–794, 827–828, 888–889, 938–939, 944, 1026, 1029, 1038–1040, 1042–1044, 1046, 1048, 1059, 1061–1064, 1066, 1068–1069, 1071, 1078–1079, 1081–1082, 1084–1085, 1087, 1091, 1101–1103, 1105–1106, 1108–1110, 1117, 1119–1120, 1122, 1143, 1148, 1160, 1235, 1245, 1252–1254, 1299, 1313, 1324–1326, 1328–1331, 1356, 1363, 1365, 1368
   _context.py65690%103, 107, 155–156, 164, 171
   _serialization.py1866763%106–107, 112–113, 132–133, 143, 145–147, 166, 174, 176–177, 181–182, 189, 191–193, 214, 220, 222, 225, 227–230, 233–237, 240–241, 243–246, 249–255, 257, 314, 321–326, 328–331, 333, 336, 345–346, 352–353, 359, 375–376
   _workflow.py34822335%133, 138, 140, 147, 196–198, 268–270, 272–274, 296, 302, 304–305, 328–329, 331–335, 337, 344, 369, 372–381, 384–385, 387, 414–415, 418–419, 422–428, 431–432, 435–440, 443–446, 449, 451–455, 470–472, 474, 477–478, 481–486, 488–489, 491–492, 494–495, 498, 516–520, 527, 551, 559–561, 564–565, 567, 615, 618–619, 622, 627, 629–631, 634, 639–643, 646–649, 651–652, 655–659, 661–662, 665–666, 669–670, 673, 675, 678–679, 682, 698–699, 702–703, 705, 707, 709, 712–713, 721–723, 725–728, 731, 734, 741–742, 747, 749, 752, 779–781, 784, 787–789, 791–793, 796–799, 809–811, 813–816, 826, 828, 842–843, 872–873, 888, 918, 921–923, 926, 929, 932, 934–935, 941, 945, 954, 958, 971, 977–978, 981–983, 986–989, 991–992, 994–995, 997–1000, 1002–1003, 1006–1007
packages/core/agent_framework/_workflows
   _agent_executor.py1742486%95, 117, 123, 156, 172–173, 224–225, 227–228, 260–262, 270–272, 282–284, 286, 290, 294, 298–299
TOTAL16990231786% 

Python Unit Test Overview

Tests Skipped Failures Errors Time
4015 235 💤 0 ❌ 0 🔥 1m 7s ⏱️

@ahmedmuhsin ahmedmuhsin changed the title Python: Durable Support for Workflows in Python Python: Durable Support for Workflows Feb 3, 2026
@ahmedmuhsin ahmedmuhsin marked this pull request as ready for review February 3, 2026 16:55
@ahmedmuhsin ahmedmuhsin requested a review from a team as a code owner February 3, 2026 16:55
Copilot AI review requested due to automatic review settings February 3, 2026 16:55
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds comprehensive workflow orchestration support to Azure Functions, enabling multi-agent workflows with conditional routing, parallel execution, and human-in-the-loop (HITL) patterns using Azure Durable Functions.

Changes:

  • New workflow orchestration engine in _workflow.py that executes MAF Workflows using Durable Functions' generator-based model
  • Serialization utilities in _serialization.py for cross-activity message passing
  • Capturing runner context in _context.py for activity execution
  • Extended AgentFunctionApp with workflow parameter and auto-registration of agents
  • Four new samples (09-12) demonstrating shared state, stateless, parallel, and HITL workflow patterns
  • Comprehensive unit and integration tests (152 unit tests, 14 integration tests)
  • Bug fix: Updated chat_client.as_agent() usage with default_options parameter

Reviewed changes

Copilot reviewed 47 out of 47 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
_workflow.py Main orchestration engine with HITL support, parallel execution, and edge group routing
_serialization.py Serialization/deserialization for dataclasses, Pydantic models, and MAF types
_context.py CapturingRunnerContext for activity execution without durable storage
_app.py Extended AgentFunctionApp with workflow support and HTTP endpoints
_agent_executor.py Added agent property to expose underlying agent
Samples 09-12 Four new workflow samples with requirements, configs, and README files
Test files Unit tests for workflow utilities and integration tests for all samples
pyproject.toml Increased test timeout from 120s to 300s for workflow tests
Sample 07 Fixed type annotations for Azure Functions worker compatibility
Comments suppressed due to low confidence (1)

python/packages/azurefunctions/agent_framework_azurefunctions/_app.py:1056

  • This import of module asyncio is redundant, as it was previously imported on line 9.
        import asyncio

Comment on lines +722 to +723
import json

Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The json module is imported inside a try-except block on line 722. However, json is already imported at the module level (line 22). This redundant import should be removed, and the module-level import should be used instead.

Suggested change
import json

Copilot uses AI. Check for mistakes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to this comment, unresolving it here since the comment is still relevant

ahmedmuhsin and others added 9 commits February 4, 2026 13:29
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
logger.debug("[AgentFunctionApp] Extracting agents from workflow")
for executor in workflow.executors.values():
if isinstance(executor, AgentExecutor):
agents.append(executor.agent)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the same agent is registered in both "agents" and "workflow" will that agent be re-registered?

Note: We use str type annotations instead of dict to work around
Azure Functions worker type validation issues with dict[str, Any].
"""
import json as json_module
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: Move all imports to top of the file

data = json_module.loads(inputData)
message_data = data["message"]
shared_state_snapshot = data.get("shared_state_snapshot", {})
source_executor_ids = data.get("source_executor_ids", ["__orchestrator__"])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, why defaulting to __orchestrator__? Is that always going to be there?

runner_context = CapturingRunnerContext()
shared_state = SharedState()

# Deserialize shared state values to reconstruct dataclasses/Pydantic models
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have some logic for this in

def _deserialize_response_format(response_format: Any) -> type[BaseModel] | None:
, can we reuse same classes (feel free to expand on a single class and use it everywhere)

try:
req_body = req.get_json()
except ValueError:
return func.HttpResponse(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this a generic function to return a 400 back (I see it being used in multiple places here)

It captures all messages and events produced during execution without requiring durable
entity storage, allowing the results to be returned to the orchestrator.
Unlike the full InProcRunnerContext, this implementation:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: Is this remnants of a previous implementation? If so, would be good to cleanup


async def drain_messages(self) -> dict[str, list[Message]]:
"""Drain and return all captured messages."""
from copy import copy
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: Move to top (for all files)

@@ -0,0 +1,378 @@
# Copyright (c) Microsoft. All rights reserved.

"""Serialization and deserialization utilities for workflow execution.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if you already looked, but agent-framework already has a very comprehensive class for thsi - https://github.com/microsoft/agent-framework/blob/main/python/packages/core/agent_framework/_serialization.py

It would be ideal if we can reuse code from here and not introduce new overrides for similar experiences

# ============================================================================


def reconstruct_agent_executor_request(data: dict[str, Any]) -> AgentExecutorRequest:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have some similar logic (to deserialize-from and serialize-to MAF constructs) in this file -

return DurableAgentStateResponse.to_run_response(entry)

Given that this is doing something similar, it would be a good idea to combine all of them in a single place

)
else:
# Timeout occurred
logger.warning("HITL request %s timed out after %s hours", request_id, hitl_timeout_hours)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the approval_task need to be canceled in this case?

# ============================================================================


def _evaluate_edge_condition_sync(edge: Any, message: Any) -> bool:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we have stricter type-checks so that we dont need to add code like getattr(edge, "_condition", None) everywhere

if structured_response:
final_text = json.dumps(structured_response)

assistant_message = ChatMessage(role="assistant", text=final_text)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT:

Suggested change
assistant_message = ChatMessage(role="assistant", text=final_text)
assistant_message = ChatMessage(role=Role.ASSISTANT, text=final_text)

List of workflow outputs collected from executor activities
"""
pending_messages: dict[str, list[tuple[Any, str]]] = {
workflow.start_executor_id: [(initial_message, "__workflow_start__")]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would recommend making all these markers ("workflow_start", "hitl_response", etc) as constants and using them everywhere to avoid chances of confusion or typos

Comment on lines +722 to +723
import json

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to this comment, unresolving it here since the comment is still relevant

"""Extract text content from serialized message dictionaries."""
message_content = ""

if message.get("messages"):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems very fragile and easy to break. Can we use existing to_dict and from_dict methods provided by MAF here instead? If not, consider having a similar appraoch to https://github.com/microsoft/agent-framework/blob/907654a489a4f41bd4b44618ce5daa994383cb3c/python/packages/durabletask/agent_framework_durabletask/_durable_agent_state.py to use inheritance to make this serialization/deserialization more cleaner and less specific to how it is today.

if response_type_str:
try:
module_name, class_name = response_type_str.rsplit(":", 1)
import importlib
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: move to top

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

documentation Improvements or additions to documentation python

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants