Python: Fix workflow runner concurrent processing #4143

TaoChenOSU wants to merge 1 commit into microsoft:main
Conversation
Python Test Coverage Report • Python Unit Test Overview
Pull request overview
This PR updates the Python workflow Runner’s message-delivery loop to improve concurrency so that messages from a single source can be delivered to multiple downstream targets in parallel (instead of being blocked behind one another).
Changes:
- Refactors `Runner._run_iteration()` to parallelize message delivery across edge runners while preserving per-edge-runner message ordering.
- Adds unit tests asserting (1) per-edge-runner ordering and (2) concurrent execution across multiple edge runners for the same source.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| `python/packages/core/agent_framework/_workflows/_runner.py` | Changes `_run_iteration()` delivery scheduling to run different edge runners concurrently while keeping ordering within each edge runner. |
| `python/packages/core/tests/workflow/test_runner.py` | Adds tests for per-edge-runner ordering and concurrency across multiple edge runners. |
```python
async def _deliver_messages_for_edge_runner(edge_runner: EdgeRunner) -> None:
    # Preserve message order per edge runner (and therefore per routed target path)
    # while still allowing parallelism across different edge runners.
    for message in source_messages:
        await _deliver_message_inner(edge_runner, message)
```
This change only parallelizes delivery across different EdgeRunner instances. For workflows using a single FanOutEdgeRunner/SwitchCaseEdgeRunner (multiple targets inside one edge group), multiple targeted messages (message.target_id differs) will still be delivered sequentially because they share the same edge runner. That appears to leave the PR’s stated concurrency scenario unfixed for fan-out/switch-case graphs. Consider introducing concurrency keyed by (edge runner, resolved target id) or per-target delivery queues/locks so messages to different targets can run in parallel while preserving per-target ordering.
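A minimal sketch of the per-target queue idea suggested above, assuming a hypothetical `deliver` coroutine and message shape rather than the actual `EdgeRunner` API: group messages by resolved target id, drain each group sequentially to preserve per-target ordering, and run the drains concurrently.

```python
import asyncio
from collections import defaultdict


async def deliver_all(messages: list, deliver) -> None:
    """Deliver messages sequentially per target, concurrently across targets.

    `deliver(msg)` is a hypothetical delivery coroutine; `msg["target_id"]`
    stands in for the resolved target id of a routed message.
    """
    queues: dict = defaultdict(list)
    for msg in messages:
        queues[msg["target_id"]].append(msg)

    async def drain(queue: list) -> None:
        # Ordering is preserved within one target's queue.
        for msg in queue:
            await deliver(msg)

    # One drain task per target runs concurrently.
    await asyncio.gather(*(drain(q) for q in queues.values()))
```

With this shape, a slow target no longer blocks delivery to other targets, while two messages routed to the same target still arrive in order.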
```python
iteration_task = asyncio.create_task(runner._run_iteration())
...
await blocking_edge_runner.started.wait()
await asyncio.wait_for(probe_edge_runner.probe_completed.wait(), timeout=0.2)
```
The 0.2s timeout on waiting for probe_edge_runner can be flaky on slower CI/event loops. Consider increasing the timeout (or structuring the test to avoid timing sensitivity) so the concurrency assertion doesn’t intermittently fail under load.
Suggested change:

```diff
-await asyncio.wait_for(probe_edge_runner.probe_completed.wait(), timeout=0.2)
+await asyncio.wait_for(probe_edge_runner.probe_completed.wait(), timeout=2.0)
```
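An alternative to simply enlarging the timeout is to make the assertion fully event-driven; a sketch (fixture names here are illustrative, not the PR's actual test helpers) where the blocking delivery is released only after the probe has been observed, so the generous timeout serves purely as a safety net rather than a scheduling bet:

```python
import asyncio


async def demo() -> bool:
    release = asyncio.Event()
    probe_done = asyncio.Event()

    async def blocking_delivery():
        # Holds its edge runner until the test explicitly releases it.
        await release.wait()

    async def probe_delivery():
        probe_done.set()

    tasks = [
        asyncio.create_task(blocking_delivery()),
        asyncio.create_task(probe_delivery()),
    ]
    # If deliveries run concurrently, the probe completes even though the
    # blocking delivery has not been released; the timeout only guards
    # against a genuine deadlock, not scheduler jitter.
    await asyncio.wait_for(probe_done.wait(), timeout=5.0)
    assert not release.is_set()
    release.set()
    await asyncio.gather(*tasks)
    return True
```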
```python
message_data = message.data
assert isinstance(message_data, MockMessage)
self.received.append(message_data.data)
await asyncio.sleep(0.005)
```
The extra sleep(0.005) makes the test slower without strengthening the ordering assertion (ordering is already guaranteed by awaiting each send). Consider removing it or using await asyncio.sleep(0) if you just need to yield control.
Suggested change:

```diff
-await asyncio.sleep(0.005)
```
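For reference, `asyncio.sleep(0)` suspends a coroutine just long enough for other ready tasks to run, with no wall-clock cost; a small sketch (the `ticker` helper is illustrative) showing two tasks interleaving without any timed delay:

```python
import asyncio
import time


async def ticker(name: str, log: list) -> None:
    for i in range(3):
        log.append(f"{name}-{i}")
        # sleep(0) yields control to the event loop without adding delay,
        # unlike sleep(0.005), which parks this task for 5 ms per step.
        await asyncio.sleep(0)


async def main():
    log: list = []
    start = time.perf_counter()
    await asyncio.gather(ticker("a", log), ticker("b", log))
    return log, time.perf_counter() - start
```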
```python
async def test_runner_run_iteration_delivers_different_edge_runners_concurrently() -> None:
    """Test that different edge runners for the same source are executed concurrently."""
```
The new concurrency test only covers parallelism across multiple edge runners. Given the PR motivation is about delivering to different targets concurrently, it would be good to add coverage for FanOutEdgeGroup/SwitchCaseEdgeGroup with targeted messages (target_id set) to ensure messages to different targets within a single edge runner can also be delivered concurrently (or to document that limitation explicitly).
Motivation and Context
Currently, the runner delivers messages from a source executor one after another regardless of the target. This means that if a source sends more than one message to multiple targets, the targets receive those messages sequentially. For example, if executor A sends message X to executor B and message Y to executor C, message Y is not delivered to executor C until executor B finishes processing message X. This breaks our concurrency promise.
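The A→B/A→C scenario above can be sketched with plain asyncio (executor names and timings are illustrative): with sequential awaits, C only sees Y after B finishes processing X, whereas `asyncio.gather` lets both targets receive their messages immediately.

```python
import asyncio


async def process(executor: str, message: str, duration: float, log: list) -> None:
    log.append(f"{executor} received {message}")
    await asyncio.sleep(duration)  # simulated processing time
    log.append(f"{executor} finished {message}")


async def sequential(log: list) -> None:
    # Old behavior: Y is not delivered until B finishes X.
    await process("B", "X", 0.01, log)
    await process("C", "Y", 0.0, log)


async def concurrent(log: list) -> None:
    # New behavior: both targets receive their messages right away.
    await asyncio.gather(
        process("B", "X", 0.01, log),
        process("C", "Y", 0.0, log),
    )
```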
Description
The runner will now deliver messages to all targets concurrently.
Summary of the current behavior:
Contribution Checklist