57 changes: 56 additions & 1 deletion src/fromager/commands/build.py
@@ -3,6 +3,7 @@
import datetime
import json
import logging
import os
import pathlib
import sys
import threading
@@ -560,6 +561,55 @@ def _nodes_to_string(nodes: typing.Iterable[dependency_graph.DependencyNode]) ->
return ", ".join(sorted(node.key for node in nodes))


def _calculate_optimal_max_workers(
graph: dependency_graph.DependencyGraph,
wkctx: context.WorkContext,
user_max_workers: int | None,
) -> int:
"""Calculate optimal max_workers based on graph parallelism.

Analyzes the dependency graph to determine the maximum number of packages
that can be built in parallel at any point. Uses this to optimize the
number of worker threads, avoiding wasteful allocation of idle workers.

Member:

I'm not 100% sure I understand this change. I think you're saying the batch size from the graph should be factored in because there's no point in setting up a worker pool larger than the number of jobs we would try to run at one time. Is that it?

@tiran had an idea at one point to continuously add tasks to the pool as packages finished their build. I don't know if we ever implemented that. If we did, we might want more workers than the maximum batch size, because finishing one item in a batch could unblock another set of items that this logic would count as part of a later batch.

Member Author:

> I think you're saying the batch size from the graph should be factored in because there's no point in setting up a worker pool larger than the number of jobs we would try to run at one time. Is that it?

Yes. If the graph structure only allows 2 parallel builds but we set up 6 workers, the extra 4 workers are never useful. It is not a critical issue, just a code improvement.
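
For illustration (hypothetical numbers, mirroring the new test below):

# Batches from the topology: {c, e}, {d}, {b, f}, {a}
# max_parallelism = max(2, 1, 2, 1) = 2
# With max_workers=6, at most 2 workers are ever busy; the other 4 sit idle.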

Member:

Does Python's worker pool implementation actually create all of the workers based on the size passed in, or does it just limit the number of workers to the size given?
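
For what it's worth, a quick standalone sketch (not part of this PR) suggests the latter for CPython: ThreadPoolExecutor starts worker threads lazily as tasks are submitted, and max_workers is only an upper bound.

import concurrent.futures
import threading
import time

with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
    # One task submitted, so the pool starts exactly one worker thread,
    # even though max_workers is 100.
    executor.submit(time.sleep, 0.2)
    pool_threads = [
        t for t in threading.enumerate() if t.name.startswith("ThreadPoolExecutor")
    ]
    print(len(pool_threads))  # -> 1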


Args:
graph: The dependency graph to analyze.
wkctx: The work context for topology calculation.
user_max_workers: User-specified max_workers, or None for automatic.

Returns:
The optimal number of workers to use.
"""
# Analyze the topology to find maximum parallelism available
topo_for_analysis = graph.get_build_topology(context=wkctx)
max_parallelism = max(
(len(batch) for batch in topo_for_analysis.static_batches()), default=1
)

if user_max_workers is None:
# Use the smaller of CPU-based default and graph-based maximum
cpu_default = min(32, (os.cpu_count() or 1) + 4)
optimal_workers = min(cpu_default, max_parallelism)
logger.info(
"graph allows max %i parallel builds, using %i workers (cpu default: %i)",

Member:

I don't know that I would understand this log message if I was reading it without having seen the code. Could you log each value separately with a short description before going through this if statement, then log the actual selected worker pool size being returned? Something like the messages below, for example.

You could add messages like "batch size from graph exceeds CPU count" or whatever to help with debugging, too, but just having each number separately would make it easier to understand.

"batch size from graph: %d"
"CPU count: %d"
"minimum pool size: %d" (from cpu_default)
"user requested pool size: %d"
"optimal worker pool size: %d"

max_parallelism,
optimal_workers,
cpu_default,
)
return optimal_workers
else:
# User specified max_workers, log if it exceeds graph parallelism
if user_max_workers > max_parallelism:
logger.info(
"user specified %i workers, but graph only allows %i parallel builds",
user_max_workers,
max_parallelism,
)
else:
logger.info("using user-specified %i workers", user_max_workers)
return user_max_workers
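
A hypothetical sketch of the per-value logging suggested in the review thread above, reusing the reviewer's message strings and the function's existing variable names (not merged code):

# Log each input separately, then the selected pool size.
cpu_default = min(32, (os.cpu_count() or 1) + 4)
logger.info("batch size from graph: %d", max_parallelism)
logger.info("CPU count: %d", os.cpu_count() or 1)
logger.info("minimum pool size: %d", cpu_default)
if user_max_workers is not None:
    logger.info("user requested pool size: %d", user_max_workers)
optimal_workers = (
    user_max_workers
    if user_max_workers is not None
    else min(cpu_default, max_parallelism)
)
logger.info("optimal worker pool size: %d", optimal_workers)
return optimal_workers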


@click.command()
@click.option(
"-f",
@@ -633,13 +683,18 @@ def build_parallel(
_nodes_to_string(topo.dependency_nodes),
)

# Calculate optimal max_workers based on graph parallelism
optimal_max_workers = _calculate_optimal_max_workers(graph, wkctx, max_workers)

future2node: dict[BuildSequenceEntryFuture, dependency_graph.DependencyNode] = {}
built_entries: list[BuildSequenceEntry] = []
rounds: int = 0

with (
progress.progress_context(total=len(graph)) as progressbar,
concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor,
concurrent.futures.ThreadPoolExecutor(
max_workers=optimal_max_workers
) as executor,
):

def update_progressbar_cb(future: concurrent.futures.Future) -> None:
52 changes: 52 additions & 0 deletions tests/test_commands.py
@@ -1,8 +1,12 @@
import pathlib
import typing
from unittest import mock

import click

from fromager import context
from fromager.commands import bootstrap, build
from fromager.dependency_graph import DependencyGraph


def get_option_names(cmd: click.Command) -> typing.Iterable[str]:
@@ -19,3 +23,51 @@ def test_bootstrap_parallel_options() -> None:
expected.discard("graph_file")

assert set(get_option_names(bootstrap.bootstrap_parallel)) == expected


def test_calculate_optimal_max_workers_auto(
tmp_context: context.WorkContext,
e2e_path: pathlib.Path,
) -> None:
"""Test automatic max_workers calculation based on graph parallelism."""
graph = DependencyGraph.from_file(e2e_path / "build-parallel" / "graph.json")

# When user_max_workers is None, should use min(cpu_default, max_parallelism)
with mock.patch("os.cpu_count", return_value=8):
# cpu_default = min(32, 8+4) = 12
# The graph has batches with max parallelism of 6 (from test_e2e_parallel_graph)
result = build._calculate_optimal_max_workers(graph, tmp_context, None)
# Should use the smaller of cpu_default (12) and max_parallelism (6)
assert result == 6


def test_calculate_optimal_max_workers_user_specified(
tmp_context: context.WorkContext,
e2e_path: pathlib.Path,
) -> None:
"""Test that user-specified max_workers is respected."""
graph = DependencyGraph.from_file(e2e_path / "build-parallel" / "graph.json")

# User specifies 4 workers
result = build._calculate_optimal_max_workers(graph, tmp_context, 4)
assert result == 4

# User specifies more workers than graph parallelism allows
result = build._calculate_optimal_max_workers(graph, tmp_context, 100)
assert result == 100 # Still respects user choice, just logs an informational message


def test_calculate_optimal_max_workers_limited_by_cpu(
tmp_context: context.WorkContext,
e2e_path: pathlib.Path,
) -> None:
"""Test when CPU count limits the workers (cpu_default < max_parallelism)."""
graph = DependencyGraph.from_file(e2e_path / "build-parallel" / "graph.json")

# Simulate a machine with only 1 CPU
with mock.patch("os.cpu_count", return_value=1):
# cpu_default = min(32, 1+4) = 5
# max_parallelism = 6 (largest batch in graph)
result = build._calculate_optimal_max_workers(graph, tmp_context, None)
# Should use min(5, 6) = 5 (limited by CPU)
assert result == 5
75 changes: 75 additions & 0 deletions tests/test_dependency_graph.py
@@ -468,3 +468,78 @@ def test_tracking_topology_sorter_empty_graph() -> None:
with pytest.raises(ValueError) as excinfo:
topo.get_available()
assert "topology is not active" in str(excinfo.value)


def test_max_parallelism_calculation() -> None:
"""Test that max parallelism can be calculated from static_batches.

This test validates the approach used in build_parallel to optimize
max_workers based on graph structure.
"""
a = mknode("a")
b = mknode("b")
c = mknode("c")
d = mknode("d")
e = mknode("e")
f = mknode("f")

# Graph: a->[b,c], b->[c,d], d->[e], f->[d] => batches {c,e}, {d}, {b,f}, {a}
graph: typing.Mapping[DependencyNode, typing.Iterable[DependencyNode]]
graph = {
a: [b, c],
b: [c, d],
d: [e],
f: [d],
}

topo = TrackingTopologicalSorter(graph)
batches = list(topo.static_batches())

# Calculate max parallelism - the largest batch size
max_parallelism = max(len(batch) for batch in batches)

# Expected batches: {c, e}, {d}, {b, f}, {a}
# Max parallelism should be 2 (from {c, e} or {b, f})
assert max_parallelism == 2


def test_max_parallelism_highly_parallel_graph() -> None:
"""Test max parallelism with a highly parallel graph (no dependencies)."""
nodes = [mknode(f"node_{i}") for i in range(10)]

# No dependencies - all nodes can build in parallel
graph: typing.Mapping[DependencyNode, typing.Iterable[DependencyNode]]
graph = {node: [] for node in nodes}

topo = TrackingTopologicalSorter(graph)
batches = list(topo.static_batches())

# All 10 nodes should be in one batch
assert len(batches) == 1
max_parallelism = max(len(batch) for batch in batches)
assert max_parallelism == 10


def test_max_parallelism_sequential_graph() -> None:
"""Test max parallelism with a fully sequential graph."""
a = mknode("a")
b = mknode("b")
c = mknode("c")
d = mknode("d")

# Fully sequential: a -> b -> c -> d
graph: typing.Mapping[DependencyNode, typing.Iterable[DependencyNode]]
graph = {
a: [b],
b: [c],
c: [d],
d: [],
}

topo = TrackingTopologicalSorter(graph)
batches = list(topo.static_batches())

# Each batch should have exactly 1 node
assert len(batches) == 4
max_parallelism = max(len(batch) for batch in batches)
assert max_parallelism == 1 # Only 1 worker needed for fully sequential