Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b3c8c87
Add workflow support for Azure Functions
ahmedmuhsin Jan 31, 2026
fa1cf84
fix compatibility with latest framework changes and add integration t…
ahmedmuhsin Feb 2, 2026
8e6c0d2
refactor code
ahmedmuhsin Feb 3, 2026
d4337da
remove white space
ahmedmuhsin Feb 4, 2026
9bbb9d6
align help text with actual port used
ahmedmuhsin Feb 4, 2026
7357cbf
replace instance id with a placeholder
ahmedmuhsin Feb 4, 2026
e6a5035
remove unused import
ahmedmuhsin Feb 4, 2026
d9702de
remove redundant typing import and fix SIM115
ahmedmuhsin Feb 4, 2026
d5d1af8
fix latest breaking changes
ahmedmuhsin Feb 4, 2026
27bef11
fix mypy issues
ahmedmuhsin Feb 4, 2026
6a0adb6
clean up imports
ahmedmuhsin Feb 6, 2026
a5b2a83
define source marker strings as constants
ahmedmuhsin Feb 6, 2026
38f6ff6
fix json module name
ahmedmuhsin Feb 6, 2026
8b66a8b
refactor _extract_message_content_from_dict
ahmedmuhsin Feb 6, 2026
1c2f50b
refactor serialization
ahmedmuhsin Feb 6, 2026
7d95547
add helper method for error response construction and remove _extract…
ahmedmuhsin Feb 6, 2026
e861b2b
use strict type checking for edges
ahmedmuhsin Feb 6, 2026
b75fa5a
change how duplicate agent registrations are handled
ahmedmuhsin Feb 7, 2026
f824d56
cancel approval_task on HITL timeout
ahmedmuhsin Feb 7, 2026
537e869
update docstring
ahmedmuhsin Feb 7, 2026
b5d1e26
fix: align azurefunctions package with core API changes after rebase
ahmedmuhsin Feb 9, 2026
02315c1
fix sample check errors
ahmedmuhsin Feb 9, 2026
440ffa8
fix mypy issues
ahmedmuhsin Feb 9, 2026
0f6b007
fix trailing white spaces
ahmedmuhsin Feb 10, 2026
a44af9c
fix test imports
ahmedmuhsin Feb 10, 2026
6894bfb
feat: add durable workflow samples and adapt to main branch changes
ahmedmuhsin Feb 12, 2026
ad23d32
fix: update create_checkpoint signature to match superclass
ahmedmuhsin Feb 12, 2026
3d0e91f
fix: correct relative link in HITL sample README
ahmedmuhsin Feb 12, 2026
ea704ce
fix: resolve import breakage after rebase (State, DurableAgentThread,…
ahmedmuhsin Feb 17, 2026
e8117c2
Merge branch 'main' into durable-workflows-v2
dmytrostruk Feb 17, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
# Copyright (c) Microsoft. All rights reserved.

"""Runner context for Azure Functions activity execution.

This module provides the CapturingRunnerContext class that captures messages
and events produced during executor execution within Azure Functions activities.
"""

from __future__ import annotations

import asyncio
from copy import copy
from typing import Any

from agent_framework import (
CheckpointStorage,
RunnerContext,
WorkflowCheckpoint,
WorkflowEvent,
WorkflowMessage,
)
from agent_framework._workflows import State


class CapturingRunnerContext(RunnerContext):
    """RunnerContext that records executor output for Azure Functions activities.

    Standard Executors run inside Azure Functions activities through this
    context: every message and event they emit is buffered in memory so the
    activity can hand the results back to the orchestrator. No durable entity
    storage is involved.

    Checkpointing is intentionally unsupported (has_checkpointing() is always
    False). State coordination is the orchestrator's responsibility; this
    context only captures execution output.
    """

    def __init__(self) -> None:
        """Set up empty capture buffers."""
        # Captured messages, keyed by the id of the executor that sent them.
        self._messages: dict[str, list[WorkflowMessage]] = {}
        # FIFO of events emitted during execution.
        self._event_queue: asyncio.Queue[WorkflowEvent] = asyncio.Queue()
        # request_id -> request_info event, kept for later correlation.
        self._pending_request_info_events: dict[str, WorkflowEvent[Any]] = {}
        self._workflow_id: str | None = None
        self._streaming: bool = False

    # region Messaging

    async def send_message(self, message: WorkflowMessage) -> None:
        """Buffer a message emitted by an executor, grouped by its source id."""
        self._messages.setdefault(message.source_id, []).append(message)

    async def drain_messages(self) -> dict[str, list[WorkflowMessage]]:
        """Return every buffered message and reset the buffer."""
        drained = dict(self._messages)
        self._messages.clear()
        return drained

    async def has_messages(self) -> bool:
        """Report whether any messages are currently buffered."""
        return len(self._messages) > 0

    # endregion Messaging

    # region Events

    async def add_event(self, event: WorkflowEvent) -> None:
        """Enqueue an event produced during execution."""
        await self._event_queue.put(event)

    async def drain_events(self) -> list[WorkflowEvent]:
        """Return all currently queued events without waiting for new ones."""
        drained: list[WorkflowEvent] = []
        while not self._event_queue.empty():
            drained.append(self._event_queue.get_nowait())
        return drained

    async def has_events(self) -> bool:
        """Report whether any events are queued."""
        return not self._event_queue.empty()

    async def next_event(self) -> WorkflowEvent:
        """Wait until an event is available and return it."""
        return await self._event_queue.get()

    # endregion Events

    # region Checkpointing (not supported in activity context)

    def has_checkpointing(self) -> bool:
        """Always False: activity contexts never checkpoint."""
        return False

    def set_runtime_checkpoint_storage(self, storage: CheckpointStorage) -> None:
        """No-op: checkpointing not supported in activity context."""

    def clear_runtime_checkpoint_storage(self) -> None:
        """No-op: checkpointing not supported in activity context."""

    async def create_checkpoint(
        self,
        workflow_name: str,
        graph_signature_hash: str,
        state: State,
        previous_checkpoint_id: str | None,
        iteration_count: int,
        metadata: dict[str, Any] | None = None,
    ) -> str:
        """Unsupported in activity context; always raises NotImplementedError."""
        raise NotImplementedError("Checkpointing is not supported in Azure Functions activity context")

    async def load_checkpoint(self, checkpoint_id: str) -> WorkflowCheckpoint | None:
        """Unsupported in activity context; always raises NotImplementedError."""
        raise NotImplementedError("Checkpointing is not supported in Azure Functions activity context")

    async def apply_checkpoint(self, checkpoint: WorkflowCheckpoint) -> None:
        """Unsupported in activity context; always raises NotImplementedError."""
        raise NotImplementedError("Checkpointing is not supported in Azure Functions activity context")

    # endregion Checkpointing

    # region Workflow Configuration

    def set_workflow_id(self, workflow_id: str) -> None:
        """Record the workflow ID."""
        self._workflow_id = workflow_id

    def reset_for_new_run(self) -> None:
        """Clear captured state so this context can serve a fresh run.

        NOTE(review): ``_workflow_id`` is not reset here — presumably it is
        meant to survive across runs; confirm against callers.
        """
        self._messages.clear()
        self._event_queue = asyncio.Queue()
        self._pending_request_info_events.clear()
        self._streaming = False

    def set_streaming(self, streaming: bool) -> None:
        """Store the streaming flag (not used in activity context)."""
        self._streaming = streaming

    def is_streaming(self) -> bool:
        """Return the stored streaming flag (stays False in activity context)."""
        return self._streaming

    # endregion Workflow Configuration

    # region Request Info Events

    async def add_request_info_event(self, event: WorkflowEvent[Any]) -> None:
        """Track a request_info event under its request_id and enqueue it."""
        self._pending_request_info_events[event.request_id] = event
        await self.add_event(event)

    async def send_request_info_response(self, request_id: str, response: Any) -> None:
        """Unsupported: human-in-the-loop responses need orchestrator-level coordination.

        Raises:
            NotImplementedError: always, in activity context.
        """
        raise NotImplementedError(
            "send_request_info_response is not supported in Azure Functions activity context. "
            "Human-in-the-loop scenarios should be handled at the orchestrator level."
        )

    async def get_pending_request_info_events(self) -> dict[str, WorkflowEvent[Any]]:
        """Return a snapshot of request IDs mapped to their request_info events."""
        return dict(self._pending_request_info_events)

    # endregion Request Info Events
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# Copyright (c) Microsoft. All rights reserved.

"""Serialization utilities for workflow execution.

This module provides thin wrappers around the core checkpoint encoding system
(encode_checkpoint_value / decode_checkpoint_value) from agent_framework._workflows.

The core checkpoint encoding uses pickle + base64 for type-safe roundtripping of
arbitrary Python objects (dataclasses, Pydantic models, Message, etc.) while
keeping JSON-native types (str, int, float, bool, None) as-is.

This module adds:
- serialize_value / deserialize_value: convenience aliases for encode/decode
- reconstruct_to_type: for HITL responses where external data (without type markers)
needs to be reconstructed to a known type
- _resolve_type: resolves 'module:class' type keys to Python types
"""

from __future__ import annotations

import importlib
import logging
from dataclasses import is_dataclass
from typing import Any

from agent_framework._workflows import decode_checkpoint_value, encode_checkpoint_value

# Module-level logger; this module emits debug-level diagnostics only.
logger = logging.getLogger(__name__)


def _resolve_type(type_key: str) -> type | None:
"""Resolve a 'module:class' type key to its Python type.

Args:
type_key: Fully qualified type reference in 'module_name:class_name' format.

Returns:
The resolved type, or None if resolution fails.
"""
try:
module_name, class_name = type_key.split(":", 1)
module = importlib.import_module(module_name)
return getattr(module, class_name, None)
except Exception:
logger.debug("Could not resolve type %s", type_key)
return None


# ============================================================================
# Serialize / Deserialize
# ============================================================================


def serialize_value(value: Any) -> Any:
    """Encode a value into a JSON-compatible form for cross-activity transport.

    Thin alias over the core checkpoint encoder: JSON-native types (str, int,
    float, bool, None) pass through unchanged, while everything else
    (dataclasses, Pydantic models, Message, etc.) is pickled and
    base64-wrapped with type metadata for later reconstruction.

    Args:
        value: Any Python value (primitive, dataclass, Pydantic model, Message, etc.)

    Returns:
        A JSON-serializable representation carrying embedded type metadata.
    """
    encoded = encode_checkpoint_value(value)
    return encoded


def deserialize_value(value: Any) -> Any:
    """Decode a value previously produced by serialize_value().

    Thin alias over the core checkpoint decoder, which unpickles
    base64-encoded payloads and verifies type integrity.

    Args:
        value: The serialized data (dict with pickle markers, list, or primitive)

    Returns:
        The reconstructed typed object when type metadata is present,
        otherwise the value unchanged.
    """
    decoded = decode_checkpoint_value(value)
    return decoded


# ============================================================================
# HITL Type Reconstruction
# ============================================================================


def reconstruct_to_type(value: Any, target_type: type) -> Any:
    """Coerce a value (typically a plain dict from JSON) into ``target_type``.

    Used for HITL responses where external data arrives without checkpoint
    type markers and must be rebuilt into the type named by the response_type
    hint. Strategies, tried in order:

    1. Pass through if the value is already an instance of the target type.
    2. Decode checkpoint-style pickle markers via deserialize_value().
    3. Pydantic ``model_validate`` for Pydantic targets.
    4. Keyword construction for dataclass targets.

    Args:
        value: The value to reconstruct (typically a dict from JSON)
        target_type: The expected type to reconstruct to

    Returns:
        The reconstructed value when possible, otherwise the original value.
    """
    if value is None:
        return None

    # target_type may be a parameterized generic, which isinstance() rejects
    # with TypeError — treat that the same as "not an instance".
    try:
        already_typed = isinstance(value, target_type)
    except TypeError:
        already_typed = False
    if already_typed:
        return value

    # Only dicts carry enough structure to reconstruct from.
    if not isinstance(value, dict):
        return value

    # Checkpoint-encoded payloads decode to a non-dict typed object.
    decoded = deserialize_value(value)
    if not isinstance(decoded, dict):
        return decoded

    # Pydantic targets: validate the raw (unmarked) dict.
    validate = getattr(target_type, "model_validate", None)
    if validate is not None:
        try:
            return validate(value)
        except Exception:
            logger.debug("Could not validate Pydantic model %s", target_type)

    # Dataclass targets: construct from the dict's keys as keyword args.
    if isinstance(target_type, type) and is_dataclass(target_type):
        try:
            return target_type(**value)
        except Exception:
            logger.debug("Could not construct dataclass %s", target_type)

    return value
Loading