From 7819ba7d854b02568909d4fea0986bfffcf9a2ad Mon Sep 17 00:00:00 2001
From: Andrew Halberstadt
Date: Fri, 4 Jul 2025 10:02:29 -0400
Subject: [PATCH 1/2] feat: generate kinds in parallel across multiple
 processes

This is a cleaned up and slightly improved version of @ahal's original
patch. Most notably, it uses `wait` to resubmit new kinds as soon as they
become available (instead of waiting for all kinds in each round to be
completed). This means that if a slow kind gets submitted before all
other (non-downstream) kinds have been submitted, it won't block them. In
the case of Gecko, the effect of this is that the `test` kind begins
processing very quickly, and all other kinds finish processing before it
has completed.

Locally, this took `./mach taskgraph tasks` from 1m26s to 1m9s (measured
from command start to the final "Generated xxx tasks" message).

On Try the results were a bit more mixed. The minimum time I observed
without this patch was 140s, while the maximum was 291s (which seems to
have been caused by bugbug slowness... which I'm willing to throw out).
Outside of that outlier, the maximum was 146s and the mean was 143s. The
minimum time I observed with this patch was 130s, while the maximum was
144s and the mean was 138s.

I presume the difference between the local and Try results is that
locally I'm on a 64-core machine with an SSD, while decision tasks run on
lower-powered machines on Try, so there ends up being some resource
contention (I/O, I suspect, because the ProcessPoolExecutor will only run
one process per CPU core) when we process kinds in parallel there.

Despite this disappointing result on Try, this may still be worth taking,
as `./mach taskgraph` runs twice in the critical path of many try pushes
(once on a developer's machine, and again in the decision task).
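For anyone skimming, the heart of the change is the scheduling loop
sketched below. This is a minimal, self-contained illustration, not the
real `Kind`/`TaskGraphGenerator` API: `deps` (a name -> set-of-dependency
mapping) and `work` are hypothetical placeholders.

```python
from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, wait


def work(name):
    # Stand-in for Kind.load_tasks; must be a picklable, top-level callable.
    return name


def run_graph(deps, fn):
    """Call fn(name) for every node in deps, submitting each node as soon
    as all of its dependencies have finished."""
    futures = {}  # future -> node name
    finished = set()
    with ProcessPoolExecutor() as executor:

        def submit_ready():
            in_flight = set(futures.values())
            for name, node_deps in deps.items():
                if name in finished or name in in_flight:
                    continue
                if node_deps <= finished:
                    futures[executor.submit(fn, name)] = name

        submit_ready()
        while futures:
            # Wake up as soon as *any* future completes instead of
            # draining a whole round of submissions first.
            done, _ = wait(list(futures), return_when=FIRST_COMPLETED)
            for future in done:
                future.result()  # re-raises any worker exception
                finished.add(futures.pop(future))
            submit_ready()


if __name__ == "__main__":
    run_graph({"a": set(), "b": {"a"}, "c": set()}, work)
```

The important property is that `wait(..., return_when=FIRST_COMPLETED)`
returns per-future, so a kind that becomes unblocked is submitted
immediately, even while a slow sibling is still running.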
raw data:

Over 5 runs on Try I got, without this patch:
291s, 146s, 146s, 140s, 140s

In each of those, there were 241s, 92s, 94s, 90s, 90s between "Loading
tasks for kind test" and "Generated xxxxxx tasks for kind test".

Which means we spent the following amount of time doing non-test kind
things in the critical path: 50s, 54s, 52s, 50s, 50s

With this patch: 130s, 141s, 144s, 140s, 135s

In each of those, there were 105s, 114s, 115s, 114s, 109s between "Loading
tasks for kind test" and "Generated xxxxxx tasks for kind test".

Which means we spent the following amount of time doing non-test kind
things, but almost entirely out of the critical path: 25s, 27s, 29s,
26s, 26s
---
 src/taskgraph/generator.py | 114 +++++++++++++++++++++++++++----------
 test/test_generator.py    |  19 +++++--
 2 files changed, 98 insertions(+), 35 deletions(-)

diff --git a/src/taskgraph/generator.py b/src/taskgraph/generator.py
index 36491bba7..d4a79f7de 100644
--- a/src/taskgraph/generator.py
+++ b/src/taskgraph/generator.py
@@ -5,6 +5,11 @@
 import copy
 import logging
 import os
+from concurrent.futures import (
+    FIRST_COMPLETED,
+    ProcessPoolExecutor,
+    wait,
+)
 from dataclasses import dataclass
 from typing import Callable, Dict, Optional, Union

@@ -44,16 +49,20 @@ def _get_loader(self):
             loader = "taskgraph.loader.default:loader"
         return find_object(loader)

-    def load_tasks(self, parameters, loaded_tasks, write_artifacts):
+    def load_tasks(self, parameters, kind_dependencies_tasks, write_artifacts):
+        logger.debug(f"Loading tasks for kind {self.name}")
+
+        parameters = Parameters(**parameters)
         loader = self._get_loader()
         config = copy.deepcopy(self.config)

-        kind_dependencies = config.get("kind-dependencies", [])
-        kind_dependencies_tasks = {
-            task.label: task for task in loaded_tasks if task.kind in kind_dependencies
-        }
-
-        inputs = loader(self.name, self.path, config, parameters, loaded_tasks)
+        inputs = loader(
+            self.name,
+            self.path,
+            config,
+            parameters,
+            list(kind_dependencies_tasks.values()),
+        )

         transforms = TransformSequence()
         for xform_path in config["transforms"]:
@@ -87,6 +96,7 @@ def load_tasks(self, parameters, loaded_tasks, write_artifacts):
             )
             for task_dict in transforms(trans_config, inputs)
         ]
+        logger.info(f"Generated {len(tasks)} tasks for kind {self.name}")
         return tasks

     @classmethod
@@ -251,6 +261,71 @@ def _load_kinds(self, graph_config, target_kinds=None):
             except KindNotFound:
                 continue

+    def _load_tasks(self, kinds, kind_graph, parameters):
+        all_tasks = {}
+        futures_to_kind = {}
+        futures = set()
+        edges = set(kind_graph.edges)
+
+        def add_new_tasks(future):
+            for task in future.result():
+                if task.label in all_tasks:
+                    raise Exception("duplicate tasks with label " + task.label)
+                all_tasks[task.label] = task
+
+        with ProcessPoolExecutor() as executor:
+
+            def submit_ready_kinds():
+                """Create the next batch of tasks for kinds without dependencies."""
+                nonlocal kinds, edges, futures
+                loaded_tasks = all_tasks.copy()
+                kinds_with_deps = {edge[0] for edge in edges}
+                ready_kinds = (
+                    set(kinds) - kinds_with_deps - set(futures_to_kind.values())
+                )
+                for name in ready_kinds:
+                    kind = kinds.get(name)
+                    if not kind:
+                        message = (
+                            f'Could not find the kind "{name}"\nAvailable kinds:\n'
+                        )
+                        for k in sorted(kinds):
+                            message += f'  - "{k}"\n'
+                        raise Exception(message)
+
+                    future = executor.submit(
+                        kind.load_tasks,
+                        dict(parameters),
+                        {
+                            k: t
+                            for k, t in loaded_tasks.items()
+                            if t.kind in kind.config.get("kind-dependencies", [])
+                        },
+                        self._write_artifacts,
+                    )
+                    future.add_done_callback(add_new_tasks)
+                    futures.add(future)
+                    futures_to_kind[future] = name
+
+            submit_ready_kinds()
+            while futures:
+                done, _ = wait(futures, return_when=FIRST_COMPLETED)
+                for future in done:
+                    if exc := future.exception():
+                        executor.shutdown(wait=False, cancel_futures=True)
+                        raise exc
+                    kind = futures_to_kind.pop(future)
+                    futures.remove(future)
+
+                    # Update state for the next batch of futures.
+                    del kinds[kind]
+                    edges = {e for e in edges if e[1] != kind}
+
+                # Submit any newly unblocked kinds.
+                submit_ready_kinds()
+
+        return all_tasks
+
     def _run(self):
         logger.info("Loading graph configuration.")
         graph_config = load_graph_config(self.root_dir)
@@ -305,31 +380,8 @@ def _run(self):
         )

         logger.info("Generating full task set")
-        all_tasks = {}
-        for kind_name in kind_graph.visit_postorder():
-            logger.debug(f"Loading tasks for kind {kind_name}")
-
-            kind = kinds.get(kind_name)
-            if not kind:
-                message = f'Could not find the kind "{kind_name}"\nAvailable kinds:\n'
-                for k in sorted(kinds):
-                    message += f'  - "{k}"\n'
-                raise Exception(message)
+        all_tasks = self._load_tasks(kinds, kind_graph, parameters)

-            try:
-                new_tasks = kind.load_tasks(
-                    parameters,
-                    list(all_tasks.values()),
-                    self._write_artifacts,
-                )
-            except Exception:
-                logger.exception(f"Error loading tasks for kind {kind_name}:")
-                raise
-            for task in new_tasks:
-                if task.label in all_tasks:
-                    raise Exception("duplicate tasks with label " + task.label)
-                all_tasks[task.label] = task
-            logger.info(f"Generated {len(new_tasks)} tasks for kind {kind_name}")
         full_task_set = TaskGraph(all_tasks, Graph(frozenset(all_tasks), frozenset()))
         yield self.verify("full_task_set", full_task_set, graph_config, parameters)
diff --git a/test/test_generator.py b/test/test_generator.py
index d37ca22ed..e01f5c642 100644
--- a/test/test_generator.py
+++ b/test/test_generator.py
@@ -3,16 +3,27 @@
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.

+from concurrent.futures import ProcessPoolExecutor
+
 import pytest

-from pytest_taskgraph import FakeKind, WithFakeKind, fake_load_graph_config
+from pytest_taskgraph import WithFakeKind, fake_load_graph_config

 from taskgraph import generator, graph
 from taskgraph.generator import Kind, load_tasks_for_kind
 from taskgraph.loader.default import loader as default_loader


-def test_kind_ordering(maketgg):
+class FakePPE(ProcessPoolExecutor):
+    loaded_kinds = []
+
+    def submit(self, kind_load_tasks, *args):
+        self.loaded_kinds.append(kind_load_tasks.__self__.name)
+        return super().submit(kind_load_tasks, *args)
+
+
+def test_kind_ordering(mocker, maketgg):
     "When task kinds depend on each other, they are loaded in postorder"
+    mocked_ppe = mocker.patch.object(generator, "ProcessPoolExecutor", new=FakePPE)
     tgg = maketgg(
         kinds=[
             ("_fake3", {"kind-dependencies": ["_fake2", "_fake1"]}),
@@ -21,7 +32,7 @@ def test_kind_ordering(maketgg):
         ]
     )
     tgg._run_until("full_task_set")
-    assert FakeKind.loaded_kinds == ["_fake1", "_fake2", "_fake3"]
+    assert mocked_ppe.loaded_kinds == ["_fake1", "_fake2", "_fake3"]


 def test_full_task_set(maketgg):
@@ -275,5 +286,5 @@ def test_kind_load_tasks(monkeypatch, graph_config, parameters, datadir, kind_co
     kind = Kind(
         name="fake", path="foo/bar", config=kind_config, graph_config=graph_config
     )
-    tasks = kind.load_tasks(parameters, [], False)
+    tasks = kind.load_tasks(parameters, {}, False)
     assert tasks

From 315fd0416af76104807cac02d6ca879443cff065 Mon Sep 17 00:00:00 2001
From: Ben Hearsum
Date: Fri, 8 Aug 2025 13:13:20 -0400
Subject: [PATCH 2/2] refactor: make fake graph configs picklable

---
 .../pytest-taskgraph/src/pytest_taskgraph/fixtures/gen.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/packages/pytest-taskgraph/src/pytest_taskgraph/fixtures/gen.py b/packages/pytest-taskgraph/src/pytest_taskgraph/fixtures/gen.py
index bce6896a8..a47d417db 100644
--- a/packages/pytest-taskgraph/src/pytest_taskgraph/fixtures/gen.py
+++ b/packages/pytest-taskgraph/src/pytest_taskgraph/fixtures/gen.py
@@ -62,8 +62,13 @@ def _load_kinds(self, graph_config, target_kind=None):
             yield FakeKind(kind_name, "/fake", config, graph_config)


+class FakeGraphConfig(GraphConfig):
+    def register(self):
+        pass
+
+
 def fake_load_graph_config(root_dir):
-    graph_config = GraphConfig(
+    graph_config = FakeGraphConfig(
         {
             "trust-domain": "test-domain",
             "taskgraph": {
@@ -103,7 +108,6 @@ def fake_load_graph_config(root_dir):
         },
         root_dir,
     )
-    graph_config.__dict__["register"] = lambda: None
     return graph_config
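A note on why this second patch is needed: with kinds generated in
parallel, `executor.submit(kind.load_tasks, ...)` makes
`ProcessPoolExecutor` pickle the bound method, and with it the `Kind`'s
`graph_config`, before shipping it to a worker process. An instance whose
`__dict__` holds a lambda cannot be pickled, while a method defined on a
subclass can, hence `FakeGraphConfig`. A minimal sketch of the difference
(stdlib only; `Config` here is a hypothetical stand-in for `GraphConfig`):

```python
import pickle


class Config:
    """Hypothetical stand-in for GraphConfig."""


class FixedConfig(Config):
    def register(self):
        # Defined on the class, so it is never part of the pickled state.
        pass


broken = Config()
# The old fixture did the equivalent of this, writing into __dict__:
broken.__dict__["register"] = lambda: None

try:
    pickle.dumps(broken)  # the instance __dict__ carries a lambda
except (pickle.PicklingError, AttributeError) as exc:
    print(f"unpicklable: {exc}")

pickle.dumps(FixedConfig())  # succeeds
```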