Python: Add bulk executor registration method to WorkflowBuilder#3565
Python: Add bulk executor registration method to WorkflowBuilder#3565holtvogt wants to merge 1 commit intomicrosoft:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR adds a bulk executor registration method to the Python WorkflowBuilder class to reduce boilerplate when registering multiple executors. The change addresses issue #3551 by providing a dict-based API as an alternative to multiple individual register_executor() calls.
Changes:
- Added
register_executors()method that accepts adict[str, Callable[[], Executor]]for bulk registration - Added unit test
test_register_executors_bulk()to verify the new method works correctly and supports method chaining
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
python/packages/core/agent_framework/_workflows/_workflow_builder.py |
Added register_executors() method with proper validation, documentation, and examples following existing patterns |
python/packages/core/tests/workflow/test_workflow_builder.py |
Added basic test for bulk registration functionality and chaining behavior |
| def test_register_executors_bulk(): | ||
| """Test bulk executor registration with a mapping of factories.""" | ||
| builder = WorkflowBuilder() | ||
|
|
||
| result = builder.register_executors({ | ||
| "ExecutorA": lambda: MockExecutor(id="ExecutorA"), | ||
| "ExecutorB": lambda: MockExecutor(id="ExecutorB"), | ||
| }) | ||
|
|
||
| assert result is builder | ||
|
|
||
| workflow = builder.set_start_executor("ExecutorA").add_edge("ExecutorA", "ExecutorB").build() | ||
|
|
||
| assert "ExecutorA" in workflow.executors | ||
| assert "ExecutorB" in workflow.executors | ||
| assert workflow.start_executor_id == "ExecutorA" | ||
|
|
There was a problem hiding this comment.
The current implementation validates all names for duplicates before adding any to the registry, which is good. However, dictionary iteration order is guaranteed in Python 3.7+, so if an error occurs during validation, the behavior is consistent. Consider adding a test case to verify the error handling behavior when attempting to register executors with duplicate names (e.g., mixing register_executor followed by register_executors, or register_executors with an already-registered name).
There was a problem hiding this comment.
Good point. We can introduce a guard variable such as _uses_bulk_executor_registration and set it to True when register_executors() is used. The register_executor() method would then check this flag accordingly?
| def test_register_executors_bulk(): | ||
| """Test bulk executor registration with a mapping of factories.""" | ||
| builder = WorkflowBuilder() | ||
|
|
||
| result = builder.register_executors({ | ||
| "ExecutorA": lambda: MockExecutor(id="ExecutorA"), | ||
| "ExecutorB": lambda: MockExecutor(id="ExecutorB"), | ||
| }) | ||
|
|
||
| assert result is builder | ||
|
|
||
| workflow = builder.set_start_executor("ExecutorA").add_edge("ExecutorA", "ExecutorB").build() | ||
|
|
||
| assert "ExecutorA" in workflow.executors | ||
| assert "ExecutorB" in workflow.executors | ||
| assert workflow.start_executor_id == "ExecutorA" | ||
|
|
There was a problem hiding this comment.
Consider adding a test case for edge cases such as an empty dictionary being passed to register_executors. This would verify the method handles this gracefully (which it currently does by simply returning self without error).
|
|
||
| return self | ||
|
|
||
| def register_executors(self, executor_factories: dict[str, Callable[[], Executor]]) -> Self: |
There was a problem hiding this comment.
We can replace the original register_executor API with this new one.
There was a problem hiding this comment.
@TaoChenOSU Okay! Do we plan to add deprecation notes for the old approach, or are we considering a big bang removal of register_executor() entirely? The latter would be a larger change, since it would require touching more files.
There was a problem hiding this comment.
We can add a deprecation note to the old one.
There was a problem hiding this comment.
I'd also simply prefer us to remove the old one. We're trying to get the API as clean as possible, as quickly as possible. Breaking changes are acceptable right now.
|
Hi, thank you for your contribution! This is actually something we wanted to improve too! |
|
|
||
| return self | ||
|
|
||
| def register_executors(self, executor_factories: dict[str, Callable[[], Executor]]) -> Self: |
There was a problem hiding this comment.
Can we also add async callable support, too? Currently when one uses the AzureAIClient, it's expected that creating an agent is an asynchronous process - therefore we should allow for an asynchronous callable, and handle it properly.
Motivation and Context
Registering many executors currently needs repetitive
register_executorcalls. It solves this by adding a dict-based bulk registration method that reduces boilerplate. This is most useful in complex workflows with many executors.Fixes: #3551
Description
Added a bulk executor registration API to
WorkflowBuilderthat accepts adict[str, Callable[[], Executor]]. This reduces boilerplate for workflows that register many executors, while keeping the existingregister_executor()API intact. Also added a small unit test covering the new bulk registration path and chaining behavior.Contribution Checklist