Python: Fix workflow runner concurrent processing #4143
Diff excerpt (the new test added by this PR; truncated):

```diff
@@ -150,6 +150,82 @@ async def test_runner_run_until_convergence_not_completed():
     assert event.type != "status" or event.state != WorkflowRunState.IDLE
+
+
+async def test_runner_run_iteration_preserves_message_order_per_edge_runner() -> None:
+    """Test that _run_iteration preserves message order to the same target path."""
+
+    class RecordingEdgeRunner:
+        def __init__(self) -> None:
+            self.received: list[int] = []
+
+        async def send_message(self, message: WorkflowMessage, state: State, ctx: RunnerContext) -> bool:
+            message_data = message.data
+            assert isinstance(message_data, MockMessage)
+            self.received.append(message_data.data)
+            await asyncio.sleep(0.005)
```
Copilot (AI) commented on Feb 21, 2026:
The new concurrency test only covers parallelism across multiple edge runners. Given the PR motivation is about delivering to different targets concurrently, it would be good to add coverage for FanOutEdgeGroup/SwitchCaseEdgeGroup with targeted messages (target_id set) to ensure messages to different targets within a single edge runner can also be delivered concurrently (or to document that limitation explicitly).
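As a rough illustration of what such coverage could assert, here is a hedged sketch of a concurrency probe (the names `run_concurrency_probe` and `deliver` are hypothetical; the real `FanOutEdgeGroup`/`SwitchCaseEdgeGroup` APIs are not shown in this excerpt). It sends two targeted messages through a single delivery callable and reports whether their in-flight windows overlapped:

```python
import asyncio

async def run_concurrency_probe(deliver):
    """Send two targeted messages through one deliver() callable and
    report the maximum number of simultaneously in-flight deliveries.

    A result of 2 means the two targets were serviced concurrently;
    1 means delivery was sequential.
    """
    in_flight = 0
    max_in_flight = 0

    async def tracked(target_id):
        nonlocal in_flight, max_in_flight
        in_flight += 1
        max_in_flight = max(max_in_flight, in_flight)
        await deliver(target_id)
        in_flight -= 1

    # Messages to *different* targets: the PR's goal is that these overlap.
    await asyncio.gather(tracked("target_a"), tracked("target_b"))
    return max_in_flight
```

A test built this way asserts on observed overlap rather than wall-clock timing, which also sidesteps the timeout-flakiness concern raised below in this thread.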
Copilot (AI) commented on Feb 21, 2026:
The 0.2s timeout on waiting for probe_edge_runner can be flaky on slower CI machines or loaded event loops. Consider increasing the timeout (or structuring the test to avoid timing sensitivity) so the concurrency assertion doesn't intermittently fail under load.
Suggested change:

```diff
-    await asyncio.wait_for(probe_edge_runner.probe_completed.wait(), timeout=0.2)
+    await asyncio.wait_for(probe_edge_runner.probe_completed.wait(), timeout=2.0)
```
This change only parallelizes delivery across different EdgeRunner instances. For workflows using a single FanOutEdgeRunner/SwitchCaseEdgeRunner (multiple targets inside one edge group), multiple targeted messages (message.target_id differs) will still be delivered sequentially because they share the same edge runner. That appears to leave the PR’s stated concurrency scenario unfixed for fan-out/switch-case graphs. Consider introducing concurrency keyed by (edge runner, resolved target id) or per-target delivery queues/locks so messages to different targets can run in parallel while preserving per-target ordering.
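One possible shape for the keyed concurrency suggested above, as a hedged sketch (the name `deliver_keyed`, the `(edge runner, target id)` key, and the `send_message`/`target_id` stand-ins are illustrative, not the repository's actual API):

```python
import asyncio
from collections import defaultdict

async def deliver_keyed(messages, send_message):
    """Deliver (edge_runner, message) pairs concurrently across
    (edge runner, target id) keys, but sequentially within each key,
    so per-target ordering is preserved.

    Hypothetical sketch: `send_message` and the `target_id` attribute
    stand in for the runner's real delivery call and message field.
    """
    groups = defaultdict(list)
    for edge_runner, message in messages:
        # Messages sharing an edge runner AND a resolved target stay ordered.
        key = (id(edge_runner), getattr(message, "target_id", None))
        groups[key].append((edge_runner, message))

    async def drain(group):
        # Sequential within one key: preserves per-target order.
        for edge_runner, message in group:
            await send_message(edge_runner, message)

    # Different keys run in parallel.
    await asyncio.gather(*(drain(group) for group in groups.values()))
```

Under this scheme, two targeted messages routed through the same fan-out edge runner but to different targets land in different groups and can be delivered concurrently, while repeated messages to one target drain in submission order.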