feat(build): optimize max_workers based on dependency graph parallelism #881
base: main
@@ -3,6 +3,7 @@
 import datetime
 import json
 import logging
+import os
 import pathlib
 import sys
 import threading

@@ -560,6 +561,55 @@ def _nodes_to_string(nodes: typing.Iterable[dependency_graph.DependencyNode]) ->
     return ", ".join(sorted(node.key for node in nodes))


+def _calculate_optimal_max_workers(
+    graph: dependency_graph.DependencyGraph,
+    wkctx: context.WorkContext,
+    user_max_workers: int | None,
+) -> int:
+    """Calculate optimal max_workers based on graph parallelism.
+
+    Analyzes the dependency graph to determine the maximum number of packages
+    that can be built in parallel at any point. Uses this to optimize the
+    number of worker threads, avoiding wasteful allocation of idle workers.
+
+    Args:
+        graph: The dependency graph to analyze.
+        wkctx: The work context for topology calculation.
+        user_max_workers: User-specified max_workers, or None for automatic.
+
+    Returns:
+        The optimal number of workers to use.
+    """
+    # Analyze the topology to find maximum parallelism available
+    topo_for_analysis = graph.get_build_topology(context=wkctx)
+    max_parallelism = max(
+        (len(batch) for batch in topo_for_analysis.static_batches()), default=1
+    )
+
+    if user_max_workers is None:
+        # Use the smaller of CPU-based default and graph-based maximum
+        cpu_default = min(32, (os.cpu_count() or 1) + 4)
+        optimal_workers = min(cpu_default, max_parallelism)
+        logger.info(
+            "graph allows max %i parallel builds, using %i workers (cpu default: %i)",
Member
I don't know that I would understand this log message if I were reading it without having seen the code. Could you log each value separately with a short description before going through this if statement, then log the actual selected worker pool size being returned? Something like the messages below, for example. You could add messages like "batch size from graph exceeds CPU count" or whatever to help with debugging, too, but just having each number separately would make it easier to understand.
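A rough sketch of per-value logging along those lines (illustrative wording only, not the reviewer's exact examples; note that cpu_default would have to be computed before the if statement for this shape to work):

    # hypothetical log messages, one value per line, emitted before the decision
    logger.info("maximum parallel builds allowed by the dependency graph: %i", max_parallelism)
    logger.info("user-requested max workers: %s", user_max_workers)
    logger.info("cpu-based default worker count: %i", cpu_default)
    # ... existing if/else selects the pool size ...
    logger.info("starting worker pool with %i workers", optimal_workers)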
+            max_parallelism,
+            optimal_workers,
+            cpu_default,
+        )
+        return optimal_workers
+    else:
+        # User specified max_workers, log if it exceeds graph parallelism
+        if user_max_workers > max_parallelism:
+            logger.info(
+                "user specified %i workers, but graph only allows %i parallel builds",
+                user_max_workers,
+                max_parallelism,
+            )
+        else:
+            logger.info("using user-specified %i workers", user_max_workers)
+        return user_max_workers
+
+
 @click.command()
 @click.option(
     "-f",
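As a toy illustration of the calculation above (hypothetical batch data, not the project's real topology API): if the build order resolves into the static batches below, at most three packages are ever ready to build at the same time, so a pool with more than three workers would only hold idle threads.

    batches = [["pkg-a", "pkg-b", "pkg-c"], ["pkg-d"], ["pkg-e", "pkg-f"]]  # hypothetical
    max_parallelism = max((len(batch) for batch in batches), default=1)
    print(max_parallelism)  # 3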
@@ -633,13 +683,18 @@ def build_parallel(
         _nodes_to_string(topo.dependency_nodes),
     )

+    # Calculate optimal max_workers based on graph parallelism
+    optimal_max_workers = _calculate_optimal_max_workers(graph, wkctx, max_workers)
+
     future2node: dict[BuildSequenceEntryFuture, dependency_graph.DependencyNode] = {}
     built_entries: list[BuildSequenceEntry] = []
     rounds: int = 0

     with (
         progress.progress_context(total=len(graph)) as progressbar,
-        concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor,
+        concurrent.futures.ThreadPoolExecutor(
+            max_workers=optimal_max_workers
+        ) as executor,
     ):

         def update_progressbar_cb(future: concurrent.futures.Future) -> None:

I'm not 100% sure I understand this change. I think you're saying the batch size from the graph should be factored in because there's no point in setting up a worker pool larger than the number of jobs we would try to run at one time. Is that it?
@tiran had an idea at one point to continuously add tasks to the pool as packages finished their build. I don't know if we ever implemented that. If we did that, we might end up wanting to build more than the maximum batch size because finishing 1 item in a batch might let us build another set of items that would be considered to be in another batch based on this logic.
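For illustration, here is a minimal sketch of that "submit as dependencies finish" scheduling, using a hypothetical prerequisite map rather than the project's dependency_graph API. With this approach the number of in-flight builds can exceed the largest static batch, so a worker cap derived from static batches could become a limit.

    import concurrent.futures
    import time

    # Hypothetical dependency map: package -> packages it depends on. Static
    # batches here would be [a, b] then [c, d] (largest batch = 2), but once
    # "a" finishes while "b" is still building, "c" and "d" both become ready,
    # so three builds can be in flight at once.
    deps = {"a": set(), "b": set(), "c": {"a"}, "d": {"a"}}

    def build(name: str) -> str:
        # "a" finishes first so "c" and "d" unlock while "b" is still running.
        time.sleep(0.05 if name == "a" else 0.2)
        return name

    def run_dynamic(max_workers: int) -> None:
        waiting = {pkg: set(reqs) for pkg, reqs in deps.items()}
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
            # Submit everything that has no unmet prerequisites.
            in_flight = {pool.submit(build, pkg): pkg for pkg, reqs in waiting.items() if not reqs}
            for pkg in list(in_flight.values()):
                waiting.pop(pkg)
            while in_flight:
                done, _ = concurrent.futures.wait(
                    in_flight, return_when=concurrent.futures.FIRST_COMPLETED
                )
                for fut in done:
                    finished = in_flight.pop(fut)
                    print("built", finished, "- still in flight:", sorted(in_flight.values()))
                    # Submit any package whose last prerequisite just finished.
                    for pkg, reqs in list(waiting.items()):
                        reqs.discard(finished)
                        if not reqs:
                            in_flight[pool.submit(build, pkg)] = pkg
                            waiting.pop(pkg)

    run_dynamic(max_workers=4)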
Yes, if we can only run 2 parallel builds (because of the way the graph is structured) but we set 6 workers, the extra workers are not useful. It is not a critical issue, just a code improvement.
Does Python's worker pool implementation actually create all of the workers based on the size passed in, or does it just limit the number of workers to the size given?
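For what it's worth, CPython's ThreadPoolExecutor appears to start worker threads on demand as tasks are submitted, up to max_workers, rather than creating them all up front. A quick check of that behavior (counting threads is an implementation detail, not a documented guarantee):

    import concurrent.futures
    import threading
    import time

    before = threading.active_count()
    with concurrent.futures.ThreadPoolExecutor(max_workers=32) as pool:
        futures = [pool.submit(time.sleep, 0.1) for _ in range(2)]
        concurrent.futures.wait(futures)
        # Only the threads needed for the two submitted tasks were started, not 32.
        print(threading.active_count() - before)  # typically prints 2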