From 7a8de312157566e9946ed0c20908d77552b2881b Mon Sep 17 00:00:00 2001 From: Ben Hearsum Date: Fri, 8 Aug 2025 13:13:20 -0400 Subject: [PATCH 1/2] refactor: make fake graph config's picklable --- .../pytest-taskgraph/src/pytest_taskgraph/fixtures/gen.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/pytest-taskgraph/src/pytest_taskgraph/fixtures/gen.py b/packages/pytest-taskgraph/src/pytest_taskgraph/fixtures/gen.py index 32739f6e2..8318f8c91 100644 --- a/packages/pytest-taskgraph/src/pytest_taskgraph/fixtures/gen.py +++ b/packages/pytest-taskgraph/src/pytest_taskgraph/fixtures/gen.py @@ -62,8 +62,13 @@ def _load_kinds(self, graph_config, target_kind=None): yield FakeKind(kind_name, "/fake", config, graph_config) +class FakeGraphConfig(GraphConfig): + def register(self): + pass + + def fake_load_graph_config(root_dir): - graph_config = GraphConfig( + graph_config = FakeGraphConfig( { "trust-domain": "test-domain", "taskgraph": { @@ -103,7 +108,6 @@ def fake_load_graph_config(root_dir): }, root_dir, ) - graph_config.__dict__["register"] = lambda: None return graph_config From 9e42c6abaa807cef2248cf8a2db696a90fec9d7f Mon Sep 17 00:00:00 2001 From: Andrew Halberstadt Date: Fri, 4 Jul 2025 10:02:29 -0400 Subject: [PATCH 2/2] feat: generate kinds in parallel across multiple processes This is a cleaned up and slightly improved version of @ahal's original patch. Most notably, it uses `wait` to resubmit new kinds as soon as they become available (instead of waiting for all kinds in each round to be completed). This means that if a slow kind gets submitted before all other (non-downstream) kinds have been submitted, that it won't block them. In the case of Gecko, the effect of this is that the `test` kind begins to process very quickly, and all other kinds are finished processing before that has completed. Locally, this took `./mach taskgraph tasks` from 1m26s to 1m9s (measured from command start to the final "Generated xxx tasks" message. On try the results were a bit more mixed. The minimum time I observed without this patch was 140s, while the maximum was 291s (which seems to have been caused by bugbug slowness...which I'm willing to throw out). Outside of that outlier, the maximum was 146s and the mean was 143s. The minimum time I observed with this patch was 130s, while the maximum was 144s and the mean was 138s. I presume the difference in results locally vs. Try is that locally I'm on a 64-core SSD machine, and the decision tasks run on lowered powered machines on Try, so there ends up being some resource contention (I/O, I suspect, because the ProcessPoolExecutor will only run one process per CPU core) when we process kinds in parallel there. Despite this disappointing result on Try, this may still be worth taking, as `./mach taskgraph` runs twice in the critical path of many try pushes (once on a developer's machine, and again in the decision task). raw data: Over 5 runs on try I got, without this patch: 291s, 146s, 146s, 140s, 140s In each of those, there were 241s, 92s, 94s, 90s, 90s between "Loading tasks for kind test" and "Generated xxxxxx tasks for kind test" Which means we spent the following amount of time doing non-test kind things in the critical path: 50s, 54s, 52s, 50s, 50s With this patch: 130s, 141s, and 144s, 140s, 135s In each of those, there were 105s, 114s, 115s, 114s, 109s between "Loading tasks for kind test" and "Generated xxxxxx tasks for kind test" Which means we spent the following amount of time doing non-test kind things, but it was almost entirely out of the critical path: 25s, 27s, 29s, 26s, 26s --- src/taskgraph/generator.py | 155 +++++++++++++++++++++++++++++-------- test/test_generator.py | 19 ++++- 2 files changed, 139 insertions(+), 35 deletions(-) diff --git a/src/taskgraph/generator.py b/src/taskgraph/generator.py index a282c2e6c..94d0813aa 100644 --- a/src/taskgraph/generator.py +++ b/src/taskgraph/generator.py @@ -5,6 +5,12 @@ import copy import logging import os +import platform +from concurrent.futures import ( + FIRST_COMPLETED, + ProcessPoolExecutor, + wait, +) from dataclasses import dataclass from typing import Callable, Dict, Optional, Union @@ -46,16 +52,20 @@ def _get_loader(self) -> Callable: assert callable(loader) return loader - def load_tasks(self, parameters, loaded_tasks, write_artifacts): + def load_tasks(self, parameters, kind_dependencies_tasks, write_artifacts): + logger.debug(f"Loading tasks for kind {self.name}") + + parameters = Parameters(**parameters) loader = self._get_loader() config = copy.deepcopy(self.config) - kind_dependencies = config.get("kind-dependencies", []) - kind_dependencies_tasks = { - task.label: task for task in loaded_tasks if task.kind in kind_dependencies - } - - inputs = loader(self.name, self.path, config, parameters, loaded_tasks) + inputs = loader( + self.name, + self.path, + config, + parameters, + list(kind_dependencies_tasks.values()), + ) transforms = TransformSequence() for xform_path in config["transforms"]: @@ -89,6 +99,7 @@ def load_tasks(self, parameters, loaded_tasks, write_artifacts): ) for task_dict in transforms(trans_config, inputs) ] + logger.info(f"Generated {len(tasks)} tasks for kind {self.name}") return tasks @classmethod @@ -253,6 +264,101 @@ def _load_kinds(self, graph_config, target_kinds=None): except KindNotFound: continue + def _load_tasks_serial(self, kinds, kind_graph, parameters): + all_tasks = {} + for kind_name in kind_graph.visit_postorder(): + logger.debug(f"Loading tasks for kind {kind_name}") + + kind = kinds.get(kind_name) + if not kind: + message = f'Could not find the kind "{kind_name}"\nAvailable kinds:\n' + for k in sorted(kinds): + message += f' - "{k}"\n' + raise Exception(message) + + try: + new_tasks = kind.load_tasks( + parameters, + { + k: t + for k, t in all_tasks.items() + if t.kind in kind.config.get("kind-dependencies", []) + }, + self._write_artifacts, + ) + except Exception: + logger.exception(f"Error loading tasks for kind {kind_name}:") + raise + for task in new_tasks: + if task.label in all_tasks: + raise Exception("duplicate tasks with label " + task.label) + all_tasks[task.label] = task + + return all_tasks + + def _load_tasks_parallel(self, kinds, kind_graph, parameters): + all_tasks = {} + futures_to_kind = {} + futures = set() + edges = set(kind_graph.edges) + + with ProcessPoolExecutor() as executor: + + def submit_ready_kinds(): + """Create the next batch of tasks for kinds without dependencies.""" + nonlocal kinds, edges, futures + loaded_tasks = all_tasks.copy() + kinds_with_deps = {edge[0] for edge in edges} + ready_kinds = ( + set(kinds) - kinds_with_deps - set(futures_to_kind.values()) + ) + for name in ready_kinds: + kind = kinds.get(name) + if not kind: + message = ( + f'Could not find the kind "{name}"\nAvailable kinds:\n' + ) + for k in sorted(kinds): + message += f' - "{k}"\n' + raise Exception(message) + + future = executor.submit( + kind.load_tasks, + dict(parameters), + { + k: t + for k, t in loaded_tasks.items() + if t.kind in kind.config.get("kind-dependencies", []) + }, + self._write_artifacts, + ) + futures.add(future) + futures_to_kind[future] = name + + submit_ready_kinds() + while futures: + done, _ = wait(futures, return_when=FIRST_COMPLETED) + for future in done: + if exc := future.exception(): + executor.shutdown(wait=False, cancel_futures=True) + raise exc + kind = futures_to_kind.pop(future) + futures.remove(future) + + for task in future.result(): + if task.label in all_tasks: + raise Exception("duplicate tasks with label " + task.label) + all_tasks[task.label] = task + + # Update state for next batch of futures. + del kinds[kind] + edges = {e for e in edges if e[1] != kind} + + # Submit any newly unblocked kinds + submit_ready_kinds() + + return all_tasks + def _run(self): logger.info("Loading graph configuration.") graph_config = load_graph_config(self.root_dir) @@ -307,31 +413,18 @@ def _run(self): ) logger.info("Generating full task set") - all_tasks = {} - for kind_name in kind_graph.visit_postorder(): - logger.debug(f"Loading tasks for kind {kind_name}") - - kind = kinds.get(kind_name) - if not kind: - message = f'Could not find the kind "{kind_name}"\nAvailable kinds:\n' - for k in sorted(kinds): - message += f' - "{k}"\n' - raise Exception(message) + # Current parallel generation relies on multiprocessing, and forking. + # This causes problems on Windows and macOS due to how new processes + # are created there, and how doing so reinitializes global variables + # that are modified earlier in graph generation, that doesn't get + # redone in the new processes. Ideally this would be fixed, or we + # would take another approach to parallel kind generation. In the + # meantime, it's not supported outside of Linux. + if platform.system() != "Linux": + all_tasks = self._load_tasks_serial(kinds, kind_graph, parameters) + else: + all_tasks = self._load_tasks_parallel(kinds, kind_graph, parameters) - try: - new_tasks = kind.load_tasks( - parameters, - list(all_tasks.values()), - self._write_artifacts, - ) - except Exception: - logger.exception(f"Error loading tasks for kind {kind_name}:") - raise - for task in new_tasks: - if task.label in all_tasks: - raise Exception("duplicate tasks with label " + task.label) - all_tasks[task.label] = task - logger.info(f"Generated {len(new_tasks)} tasks for kind {kind_name}") full_task_set = TaskGraph(all_tasks, Graph(frozenset(all_tasks), frozenset())) yield self.verify("full_task_set", full_task_set, graph_config, parameters) diff --git a/test/test_generator.py b/test/test_generator.py index c87e23170..701e5e02f 100644 --- a/test/test_generator.py +++ b/test/test_generator.py @@ -3,16 +3,27 @@ # file, You can obtain one at http://mozilla.org/MPL/2.0/. +from concurrent.futures import ProcessPoolExecutor + import pytest -from pytest_taskgraph import FakeKind, WithFakeKind, fake_load_graph_config +from pytest_taskgraph import WithFakeKind, fake_load_graph_config from taskgraph import generator, graph from taskgraph.generator import Kind, load_tasks_for_kind, load_tasks_for_kinds from taskgraph.loader.default import loader as default_loader -def test_kind_ordering(maketgg): +class FakePPE(ProcessPoolExecutor): + loaded_kinds = [] + + def submit(self, kind_load_tasks, *args): + self.loaded_kinds.append(kind_load_tasks.__self__.name) + return super().submit(kind_load_tasks, *args) + + +def test_kind_ordering(mocker, maketgg): "When task kinds depend on each other, they are loaded in postorder" + mocked_ppe = mocker.patch.object(generator, "ProcessPoolExecutor", new=FakePPE) tgg = maketgg( kinds=[ ("_fake3", {"kind-dependencies": ["_fake2", "_fake1"]}), @@ -21,7 +32,7 @@ def test_kind_ordering(maketgg): ] ) tgg._run_until("full_task_set") - assert FakeKind.loaded_kinds == ["_fake1", "_fake2", "_fake3"] + assert mocked_ppe.loaded_kinds == ["_fake1", "_fake2", "_fake3"] def test_full_task_set(maketgg): @@ -293,5 +304,5 @@ def test_kind_load_tasks(monkeypatch, graph_config, parameters, datadir, kind_co kind = Kind( name="fake", path="foo/bar", config=kind_config, graph_config=graph_config ) - tasks = kind.load_tasks(parameters, [], False) + tasks = kind.load_tasks(parameters, {}, False) assert tasks