From e648ca4dccd808ba895e924cbd63290da82e2164 Mon Sep 17 00:00:00 2001 From: Lalatendu Mohanty Date: Fri, 12 Dec 2025 14:05:06 -0500 Subject: [PATCH] feat(build): optimize max_workers based on dependency graph parallelism Calculate optimal max_workers by analyzing the dependency graph to find the maximum number of packages that can be built in parallel. Uses the smaller of CPU-based default and graph-based maximum to avoid allocating idle worker threads. closes #880 Co-Authored-By: Claude Signed-off-by: Lalatendu Mohanty --- src/fromager/commands/build.py | 57 +++++++++++++++++++++++++- tests/test_commands.py | 52 +++++++++++++++++++++++ tests/test_dependency_graph.py | 75 ++++++++++++++++++++++++++++++++++ 3 files changed, 183 insertions(+), 1 deletion(-) diff --git a/src/fromager/commands/build.py b/src/fromager/commands/build.py index 08c7d042..4c903179 100644 --- a/src/fromager/commands/build.py +++ b/src/fromager/commands/build.py @@ -3,6 +3,7 @@ import datetime import json import logging +import os import pathlib import sys import threading @@ -560,6 +561,55 @@ def _nodes_to_string(nodes: typing.Iterable[dependency_graph.DependencyNode]) -> return ", ".join(sorted(node.key for node in nodes)) +def _calculate_optimal_max_workers( + graph: dependency_graph.DependencyGraph, + wkctx: context.WorkContext, + user_max_workers: int | None, +) -> int: + """Calculate optimal max_workers based on graph parallelism. + + Analyzes the dependency graph to determine the maximum number of packages + that can be built in parallel at any point. Uses this to optimize the + number of worker threads, avoiding wasteful allocation of idle workers. + + Args: + graph: The dependency graph to analyze. + wkctx: The work context for topology calculation. + user_max_workers: User-specified max_workers, or None for automatic. + + Returns: + The optimal number of workers to use. 
+ """ + # Analyze the topology to find maximum parallelism available + topo_for_analysis = graph.get_build_topology(context=wkctx) + max_parallelism = max( + (len(batch) for batch in topo_for_analysis.static_batches()), default=1 + ) + + if user_max_workers is None: + # Use the smaller of CPU-based default and graph-based maximum + cpu_default = min(32, (os.cpu_count() or 1) + 4) + optimal_workers = min(cpu_default, max_parallelism) + logger.info( + "graph allows max %i parallel builds, using %i workers (cpu default: %i)", + max_parallelism, + optimal_workers, + cpu_default, + ) + return optimal_workers + else: + # User specified max_workers, log if it exceeds graph parallelism + if user_max_workers > max_parallelism: + logger.info( + "user specified %i workers, but graph only allows %i parallel builds", + user_max_workers, + max_parallelism, + ) + else: + logger.info("using user-specified %i workers", user_max_workers) + return user_max_workers + + @click.command() @click.option( "-f", @@ -633,13 +683,18 @@ def build_parallel( _nodes_to_string(topo.dependency_nodes), ) + # Calculate optimal max_workers based on graph parallelism + optimal_max_workers = _calculate_optimal_max_workers(graph, wkctx, max_workers) + future2node: dict[BuildSequenceEntryFuture, dependency_graph.DependencyNode] = {} built_entries: list[BuildSequenceEntry] = [] rounds: int = 0 with ( progress.progress_context(total=len(graph)) as progressbar, - concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor, + concurrent.futures.ThreadPoolExecutor( + max_workers=optimal_max_workers + ) as executor, ): def update_progressbar_cb(future: concurrent.futures.Future) -> None: diff --git a/tests/test_commands.py b/tests/test_commands.py index 7617e308..b0df7565 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1,8 +1,12 @@ +import pathlib import typing +from unittest import mock import click +from fromager import context from fromager.commands import bootstrap, build 
+from fromager.dependency_graph import DependencyGraph def get_option_names(cmd: click.Command) -> typing.Iterable[str]: @@ -19,3 +23,51 @@ def test_bootstrap_parallel_options() -> None: expected.discard("graph_file") assert set(get_option_names(bootstrap.bootstrap_parallel)) == expected + + +def test_calculate_optimal_max_workers_auto( + tmp_context: context.WorkContext, + e2e_path: pathlib.Path, +) -> None: + """Test automatic max_workers calculation based on graph parallelism.""" + graph = DependencyGraph.from_file(e2e_path / "build-parallel" / "graph.json") + + # When user_max_workers is None, should use min(cpu_default, max_parallelism) + with mock.patch("os.cpu_count", return_value=8): + # cpu_default = min(32, 8+4) = 12 + # The graph has batches with max parallelism of 6 (from test_e2e_parallel_graph) + result = build._calculate_optimal_max_workers(graph, tmp_context, None) + # Should use the smaller of cpu_default (12) and max_parallelism (6) + assert result == 6 + + +def test_calculate_optimal_max_workers_user_specified( + tmp_context: context.WorkContext, + e2e_path: pathlib.Path, +) -> None: + """Test that user-specified max_workers is respected.""" + graph = DependencyGraph.from_file(e2e_path / "build-parallel" / "graph.json") + + # User specifies 4 workers + result = build._calculate_optimal_max_workers(graph, tmp_context, 4) + assert result == 4 + + # User specifies more workers than graph parallelism allows + result = build._calculate_optimal_max_workers(graph, tmp_context, 100) + assert result == 100 # Still respects user choice, just logs an info message + + +def test_calculate_optimal_max_workers_limited_by_cpu( + tmp_context: context.WorkContext, + e2e_path: pathlib.Path, +) -> None: + """Test when CPU count limits the workers (cpu_default < max_parallelism).""" + graph = DependencyGraph.from_file(e2e_path / "build-parallel" / "graph.json") + + # Simulate a machine with only 1 CPU + with mock.patch("os.cpu_count", return_value=1): + # cpu_default =
min(32, 1+4) = 5 + # max_parallelism = 6 (largest batch in graph) + result = build._calculate_optimal_max_workers(graph, tmp_context, None) + # Should use min(5, 6) = 5 (limited by CPU) + assert result == 5 diff --git a/tests/test_dependency_graph.py b/tests/test_dependency_graph.py index 8863bd6f..b2e04f9a 100644 --- a/tests/test_dependency_graph.py +++ b/tests/test_dependency_graph.py @@ -468,3 +468,78 @@ def test_tracking_topology_sorter_empty_graph() -> None: with pytest.raises(ValueError) as excinfo: topo.get_available() assert "topology is not active" in str(excinfo.value) + + +def test_max_parallelism_calculation() -> None: + """Test that max parallelism can be calculated from static_batches. + + This test validates the approach used in build_parallel to optimize + max_workers based on graph structure. + """ + a = mknode("a") + b = mknode("b") + c = mknode("c") + d = mknode("d") + e = mknode("e") + f = mknode("f") + + # Graph: a->[b,c], b->[c,d], d->[e], f->[d] => batches {c,e}, {d}, {b,f}, {a} + graph: typing.Mapping[DependencyNode, typing.Iterable[DependencyNode]] + graph = { + a: [b, c], + b: [c, d], + d: [e], + f: [d], + } + + topo = TrackingTopologicalSorter(graph) + batches = list(topo.static_batches()) + + # Calculate max parallelism - the largest batch size + max_parallelism = max(len(batch) for batch in batches) + + # Expected batches: {c, e}, {d}, {b, f}, {a} + # Max parallelism should be 2 (from {c, e} or {b, f}) + assert max_parallelism == 2 + + +def test_max_parallelism_highly_parallel_graph() -> None: + """Test max parallelism with a highly parallel graph (no dependencies).""" + nodes = [mknode(f"node_{i}") for i in range(10)] + + # No dependencies - all nodes can build in parallel + graph: typing.Mapping[DependencyNode, typing.Iterable[DependencyNode]] + graph = {node: [] for node in nodes} + + topo = TrackingTopologicalSorter(graph) + batches = list(topo.static_batches()) + + # All 10 nodes should be in one batch + assert len(batches) == 1 + 
max_parallelism = max(len(batch) for batch in batches) + assert max_parallelism == 10 + + +def test_max_parallelism_sequential_graph() -> None: + """Test max parallelism with a fully sequential graph.""" + a = mknode("a") + b = mknode("b") + c = mknode("c") + d = mknode("d") + + # Fully sequential: a -> b -> c -> d + graph: typing.Mapping[DependencyNode, typing.Iterable[DependencyNode]] + graph = { + a: [b], + b: [c], + c: [d], + d: [], + } + + topo = TrackingTopologicalSorter(graph) + batches = list(topo.static_batches()) + + # Each batch should have exactly 1 node + assert len(batches) == 4 + max_parallelism = max(len(batch) for batch in batches) + assert max_parallelism == 1 # Only 1 worker needed for fully sequential