
Python: (core): Add functional workflow API #4238

Open
moonbox3 wants to merge 8 commits into microsoft:main from moonbox3:functional-workflow-api

Conversation

@moonbox3
Contributor

Motivation and Context

The functional API is a stepping stone between single-agent use and the full graph API. Users write workflows as plain async functions -- no executor classes, no edges, no builder patterns.

  • Add @workflow and @step decorators for writing workflows as plain async functions
  • Native Python control flow (if/else, loops, asyncio.gather) replaces graph concepts
  • @step is opt-in: plain functions work inside @workflow without it. Use @step on expensive operations (agent calls, API requests) to save their results and skip re-execution on HITL resume or crash recovery
  • Streaming support via run(stream=True)
  • HITL support via ctx.request_info() with replay
  • .as_agent() wraps a functional workflow as an agent-compatible object

A very basic example of the functional workflow API:

@workflow
async def pipeline(data: str) -> str:
    upper = await to_upper(data)
    return await reverse(upper)

result = await pipeline.run("hello")
print(result.get_outputs())  # ['OLLEH']

Note: @step is opt-in for functions where per-step checkpointing matters (for example, agent calls). Without @step, workflows still support HITL and checkpointing — functions just re-execute on resume.

@step
async def call_agent(prompt: str) -> str:
    return (await agent.run(prompt)).text

@workflow
async def pipeline(data: str, ctx: RunContext) -> str:
    result = await call_agent(data)        # saved by @step
    validated = await validate(result)      # plain function, re-runs on resume
    feedback = await ctx.request_info(...)  # HITL pause
    return await finalize(result, feedback)

ctx: RunContext is only needed when you use HITL (request_info), custom events (add_event), or state (get_state/set_state). Otherwise, omit it for a cleaner signature.

Description

Contribution Checklist

  • The code builds clean without any errors or warnings
  • The PR follows the Contribution Guidelines
  • All unit tests pass, and I have added new tests where possible
  • Is this a breaking change? If yes, add "[BREAKING]" prefix to the title of the PR.

Copilot AI review requested due to automatic review settings February 25, 2026 08:40
@markwallace-microsoft added the documentation (Improvements or additions to documentation) and python labels Feb 25, 2026
@markwallace-microsoft
Member

markwallace-microsoft commented Feb 25, 2026

Python Test Coverage

Python Test Coverage Report •
File | Stmts | Miss | Cover | Missing
packages/core/agent_framework
   _agents.py | 342 | 43 | 87% | 435, 439, 491, 856, 892, 908, 991–994, 1055–1057, 1178, 1194, 1196, 1209, 1215, 1251, 1253, 1262–1267, 1272, 1274, 1280–1281, 1288, 1290–1291, 1299–1300, 1303–1305, 1313–1314, 1316, 1321, 1323
packages/core/agent_framework/_workflows
   _functional.py | 341 | 14 | 95% | 686, 711, 714–715, 723–724, 784–785, 791, 1071–1072, 1074, 1094, 1096
packages/orchestrations/agent_framework_orchestrations
   _handoff.py | 385 | 58 | 84% | 105–106, 108, 163–173, 175, 177, 179, 184, 278, 284, 316, 339, 361, 417, 442, 500, 532, 590–591, 623, 631, 635–636, 674–676, 681–683, 801, 804, 817, 879, 884, 891, 901, 903, 922, 924, 1006–1007, 1039–1040, 1122, 1129, 1201–1202, 1204
TOTAL | 22563 | 2772 | 87%

Python Unit Test Overview

Tests | Skipped | Failures | Errors | Time
4780 | 247 💤 | 0 ❌ | 0 🔥 | 1m 18s ⏱️

Contributor

Copilot AI left a comment

Pull request overview

This PR introduces a functional workflow API as an alternative to the existing graph-based workflow API. The functional approach allows users to write workflows as plain async functions decorated with @workflow, using native Python control flow (if/else, loops, asyncio.gather) instead of explicit graph construction with executors and edges. The @step decorator is optional and provides per-step checkpointing, caching, and observability.

Changes:

  • Added core implementation (_functional.py) with @workflow, @step decorators, RunContext, FunctionalWorkflow, and FunctionalWorkflowAgent classes
  • Added comprehensive test suite (40+ test cases covering basic execution, HITL, checkpointing, streaming, error handling, edge cases)
  • Added 6 sample files demonstrating functional workflows (basic pipeline, streaming, parallel execution, checkpointing, HITL, agent integration)
  • Restructured getting-started samples to introduce functional workflows before graph workflows
  • Updated exports in __init__.py to expose new functional API symbols

Reviewed changes

Copilot reviewed 13 out of 14 changed files in this pull request and generated 2 comments.

File | Description
python/packages/core/agent_framework/_workflows/_functional.py | Core implementation of the functional workflow API with RunContext, StepWrapper, FunctionalWorkflow, and FunctionalWorkflowAgent classes (1105 lines)
python/packages/core/agent_framework/__init__.py | Added exports for FunctionalWorkflow, FunctionalWorkflowAgent, RunContext, StepWrapper, step, and workflow
python/packages/core/tests/workflow/test_functional_workflow.py | Comprehensive test suite covering basic execution, events, parallelism, HITL, errors, streaming, state, checkpointing, control flow, and edge cases (1031 lines)
python/samples/01-get-started/05_first_functional_workflow.py | Getting-started sample demonstrating a basic functional workflow with plain async functions
python/samples/01-get-started/06_first_graph_workflow.py | Renamed and updated graph workflow sample (previously 05_first_workflow.py)
python/samples/01-get-started/07_host_your_agent.py | Renamed agent hosting sample (previously 06_host_your_agent.py)
python/samples/01-get-started/README.md | Updated sample listing to include both functional and graph workflow samples
python/samples/03-workflows/functional/basic_pipeline.py | Sample showing the simplest sequential pipeline with the @workflow decorator
python/samples/03-workflows/functional/basic_streaming_pipeline.py | Sample demonstrating streaming workflow events with run(stream=True)
python/samples/03-workflows/functional/parallel_pipeline.py | Sample showing fan-out/fan-in with asyncio.gather
python/samples/03-workflows/functional/steps_and_checkpointing.py | Sample explaining the @step decorator for per-step checkpointing and observability
python/samples/03-workflows/functional/hitl_review.py | Sample demonstrating HITL with ctx.request_info() and resume
python/samples/03-workflows/functional/agent_integration.py | Sample showing agent calls inside workflows and the .as_agent() wrapper
python/samples/03-workflows/README.md | Added a functional workflow section to the samples overview

@workflow
async def data_pipeline(url: str) -> str:
    """A simple sequential data pipeline."""
    raw = await fetch_data(url)
Member

I think it would be useful to also demonstrate that, because this is just a function, you don't have to wrap everything in steps: you can do some of the manipulation as plain code between steps, which makes things a lot simpler.

return f"Draft document about '{topic}': Lorem ipsum dolor sit amet..."


@step
Member

How does @step compare to @handler? Is there a lot of overlap? Could we reuse steps in graphs, or handlers here?

Contributor Author

I want them to be conceptually different things. A @handler handles a message routed to it by the graph: it's reactive, tied to the executor contract. A @step marks a function call in a sequential flow: it's proactive, just "I called this function as step N." Different mental models, different names.

print(f"State: {result1.get_final_state()}")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS

requests = result1.get_request_info_events()
Member

what would happen here if state != WorkflowRunState.IDLE_WITH_PENDING_REQUESTS?

Contributor Author

If request_info() was never reached (an early return), the workflow completes normally with state IDLE and get_request_info_events() returns an empty list. Added a comment in the sample clarifying this.

Member

So would requests be None, or would the get... call raise?

Contributor Author

get_request_info_events() returns an empty list. It's a filter over the event stream, so no requests means []. No exception raised.
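A minimal, self-contained sketch of the behavior described in this reply (using invented names, not the real agent_framework types): because the accessor is a filter over the event stream, a run with no pending requests yields an empty list rather than None or an exception.

```python
# Toy model of "a filter over the event stream" (assumed names, not the
# framework's actual Event or result types).
from dataclasses import dataclass


@dataclass
class Event:
    kind: str
    payload: object = None


def get_request_info_events(events: list[Event]) -> list[Event]:
    # A filter, not a lookup: no matches means [] -- never None, no raise.
    return [e for e in events if e.kind == "request_info"]


events = [Event("output", payload="done")]  # a run that never paused
print(get_request_info_events(events))  # []
```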

``asyncio.gather``) instead of a graph-based topology.

A ``@workflow``-decorated async function receives its input as the first
positional argument. If the function needs HITL (``request_info``), custom
Contributor

Should it be:

Suggested change:
- positional argument. If the function needs HITL (``request_info``), custom
+ positional argument. If a step needs HITL (``request_info``), custom

?

Contributor Author

The docstring is describing the @workflow-decorated function, not a @step. The workflow function is what receives RunContext and calls request_info(). A @step doesn't receive context — so "If the function needs HITL" is referring to the workflow function.

class RunContext:
    """Execution context injected into ``@workflow`` functions.

    Every ``@workflow`` invocation receives a ``RunContext`` instance that
Contributor

I'm a bit confused about which entity receives the context. Is it the workflow, the steps, or both?

Contributor Author

Only the @workflow function receives RunContext. Steps don't — they're plain async functions with caching. If a step needs HITL input, the workflow calls request_info() and passes the result to the step as an argument.
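The design point in this reply can be sketched with plain asyncio, no framework dependency (all names here are invented for illustration): the step is an ordinary async function that is trivially testable in isolation, while the orchestrating function gathers human input and passes it in as a normal argument.

```python
# Sketch of "steps stay framework-free" (assumed names, not the real API).
import asyncio


async def apply_feedback(draft: str, feedback: str) -> str:
    # Self-contained unit of work: no RunContext, no request_info.
    return f"{draft} [revised per: {feedback}]"


async def orchestrate(draft: str, ask_human) -> str:
    # In the real API this would be ctx.request_info(); here an injected
    # callable stands in for the HITL pause.
    feedback = await ask_human({"draft": draft})
    return await apply_feedback(draft, feedback)


async def fake_human(request: dict) -> str:
    return "shorten it"


out = asyncio.run(orchestrate("my draft", fake_human))
print(out)  # my draft [revised per: shorten it]
```

Because apply_feedback takes its human input as an argument, a unit test can call it directly with canned feedback and never touch the workflow machinery.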

Comment on lines +114 to +117
@workflow
async def hitl_pipeline(data: str, ctx: RunContext) -> str:
    feedback = await ctx.request_info({"draft": data}, response_type=str)
    return feedback
Contributor

My brain maps @workflow to the graph-based Workflow and @step to Executor. I can see the benefit of allowing request_info at the workflow level. It's kind of like an executor whose sole purpose is to get user feedback. But should we also allow request_info inside a @step?

Contributor Author

By design, request_info lives at the workflow level — the workflow is the orchestrator that decides when to pause for input. Steps are meant to be self-contained units of work. If a step needs human input, the workflow calls request_info() first and passes the result to the step. This keeps steps simple and testable in isolation (no framework dependency).

def __init__(self, func: Callable[..., Awaitable[R]], *, name: str | None = None) -> None:
    if not inspect.iscoroutinefunction(func):
        raise TypeError(
            f"@step can only decorate async functions, but '{func.__name__}' is not a coroutine function."
Contributor

This is probably not super important, but should we also support non-async functions?

Contributor Author

We could, but it adds complexity for limited benefit — the workflow itself is async, so wrapping a sync function in asyncio.to_thread() or similar is straightforward for users. For now, @step requires async. We can revisit if users ask for it.
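The workaround mentioned in this reply is straightforward with the standard library; a user wraps the sync function with asyncio.to_thread so it can be awaited inside an async workflow without blocking the event loop (slow_sync_transform and pipeline are invented names for the sketch).

```python
# A sync function made awaitable via asyncio.to_thread, as suggested above.
import asyncio
import time


def slow_sync_transform(text: str) -> str:
    time.sleep(0.01)  # stand-in for blocking work (CPU, legacy I/O, ...)
    return text[::-1]


async def pipeline(data: str) -> str:
    # Runs the sync function in a worker thread; the event loop stays free.
    return await asyncio.to_thread(slow_sync_transform, data)


result = asyncio.run(pipeline("hello"))
print(result)  # olleh
```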

functools.update_wrapper(self, func) # type: ignore[arg-type]

# ------------------------------------------------------------------
# run() — same overloaded interface as graph Workflow
Contributor

Should we extract these methods and make both workflow types get them for free?

Contributor Author

Agree this is worth considering. The run() overload pattern and _finalize_events logic are similar between FunctionalWorkflow and graph Workflow. Worth a follow-up refactoring PR to extract a shared base, but out of scope for this one.

self._last_step_cache = dict(ctx._step_cache)

# Yield collected events
for event in ctx._get_events():
Contributor

Question: Is this true streaming?

It looks like all events have been produced at this point.

Contributor Author

Currently events are collected during execution and yielded after. Within a single function execution there's no natural suspension point to stream intermediate events (unlike graph workflows where each superstep boundary is a yield point). However, @step events are collected incrementally, and the async generator yields them as they're produced. For true mid-function streaming, users can use ctx.add_event() — though these are still batched by the current implementation. This is a known limitation we can improve in a follow-up.

cache_key = ctx._get_step_cache_key(self.name)
found, cached = ctx._get_cached_result(cache_key)
invocation_data = deepcopy({"args": args, "kwargs": kwargs}) if args or kwargs else None
if found:
Contributor

Should we also check if the input arguments have the same values?

Contributor

There could be scenarios where some steps are checkpointed and some are not (the ones without the decorator). If a checkpointed step depends on the output of a non-checkpointed step and its output changes, it may lead to incorrect results.

Contributor Author

The cache key is (step_name, call_index) and relies on the documented determinism requirement — if the workflow function is deterministic w.r.t. step results, the arguments will always match on replay. Adding argument comparison would be defensive but expensive (deep equality on arbitrary objects) and could give false negatives for objects that don't implement __eq__.

Contributor Author

This is by design and documented: workflow functions must be deterministic w.r.t. step results. If a non-@step function is nondeterministic and a cached @step depends on it, results can diverge. The fix is to use @step on any function whose output affects downstream cached steps. This tradeoff keeps the simple case simple.
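The divergence discussed here can be shown with a few lines of toy code (a simplified stand-in for the cache, not the framework's implementation): because the key is (step_name, call_index) and does not include the arguments, a replayed step returns its saved result even when a nondeterministic plain function upstream now produces a different value.

```python
# Toy demonstration of the determinism requirement (assumed simplified
# cache; the real framework keys on (step_name, call_index) per the reply).
import asyncio
import itertools

# Simulate nondeterminism across runs: the first run consumed 0, so on
# "resume" this returns 1.
counter = itertools.count(1)


async def nondeterministic() -> int:
    return next(counter)  # plain function: re-runs and changes on resume


# Cache written by the first run: key is (step_name, call_index) only.
saved = {("cached_step", 0): "processed input 0"}


async def cached_step(cache: dict, value: int) -> str:
    key = ("cached_step", 0)  # the argument `value` is NOT part of the key
    if key in cache:
        return cache[key]     # cached result wins, regardless of `value`
    return f"processed input {value}"


async def resumed_run():
    value = await nondeterministic()        # now yields a different value
    return await cached_step(saved, value)  # but replay ignores it


out = asyncio.run(resumed_run())
print(out)  # processed input 0
```

This is exactly the divergence the reply warns about, and why any function whose output feeds a cached @step should itself be a @step.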



# Plain async functions — no decorators needed
async def to_upper_case(text: str) -> str:
Contributor

nit: this sample will become even simpler if we can remove the asyncs.

Contributor Author

Same as the sync support discussion — supporting sync functions would simplify the getting-started sample, but @workflow is inherently async (it runs an async event loop internally). We can consider sync support as a follow-up.

@moonbox3
Contributor Author

Btw, @eavanvalkenburg and @TaoChenOSU, I think it would be best to put this functional API into its own package. We want to get more signal around the APIs and their use before we deem it "GA worthy," IMO.


Labels

documentation (Improvements or additions to documentation), python

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Python: Add functional workflow API

5 participants