diff --git a/packages/pytest-taskgraph/src/pytest_taskgraph/fixtures/gen.py b/packages/pytest-taskgraph/src/pytest_taskgraph/fixtures/gen.py index 32739f6e2..8318f8c91 100644 --- a/packages/pytest-taskgraph/src/pytest_taskgraph/fixtures/gen.py +++ b/packages/pytest-taskgraph/src/pytest_taskgraph/fixtures/gen.py @@ -62,8 +62,13 @@ def _load_kinds(self, graph_config, target_kind=None): yield FakeKind(kind_name, "/fake", config, graph_config) +class FakeGraphConfig(GraphConfig): + def register(self): + pass + + def fake_load_graph_config(root_dir): - graph_config = GraphConfig( + graph_config = FakeGraphConfig( { "trust-domain": "test-domain", "taskgraph": { @@ -103,7 +108,6 @@ def fake_load_graph_config(root_dir): }, root_dir, ) - graph_config.__dict__["register"] = lambda: None return graph_config diff --git a/src/taskgraph/generator.py b/src/taskgraph/generator.py index a282c2e6c..94d0813aa 100644 --- a/src/taskgraph/generator.py +++ b/src/taskgraph/generator.py @@ -5,6 +5,12 @@ import copy import logging import os +import platform +from concurrent.futures import ( + FIRST_COMPLETED, + ProcessPoolExecutor, + wait, +) from dataclasses import dataclass from typing import Callable, Dict, Optional, Union @@ -46,16 +52,20 @@ def _get_loader(self) -> Callable: assert callable(loader) return loader - def load_tasks(self, parameters, loaded_tasks, write_artifacts): + def load_tasks(self, parameters, kind_dependencies_tasks, write_artifacts): + logger.debug(f"Loading tasks for kind {self.name}") + + parameters = Parameters(**parameters) loader = self._get_loader() config = copy.deepcopy(self.config) - kind_dependencies = config.get("kind-dependencies", []) - kind_dependencies_tasks = { - task.label: task for task in loaded_tasks if task.kind in kind_dependencies - } - - inputs = loader(self.name, self.path, config, parameters, loaded_tasks) + inputs = loader( + self.name, + self.path, + config, + parameters, + list(kind_dependencies_tasks.values()), + ) transforms = TransformSequence() for xform_path in config["transforms"]: @@ -89,6 +99,7 @@ def load_tasks(self, parameters, loaded_tasks, write_artifacts): ) for task_dict in transforms(trans_config, inputs) ] + logger.info(f"Generated {len(tasks)} tasks for kind {self.name}") return tasks @classmethod @@ -253,6 +264,101 @@ def _load_kinds(self, graph_config, target_kinds=None): except KindNotFound: continue + def _load_tasks_serial(self, kinds, kind_graph, parameters): + all_tasks = {} + for kind_name in kind_graph.visit_postorder(): + logger.debug(f"Loading tasks for kind {kind_name}") + + kind = kinds.get(kind_name) + if not kind: + message = f'Could not find the kind "{kind_name}"\nAvailable kinds:\n' + for k in sorted(kinds): + message += f' - "{k}"\n' + raise Exception(message) + + try: + new_tasks = kind.load_tasks( + parameters, + { + k: t + for k, t in all_tasks.items() + if t.kind in kind.config.get("kind-dependencies", []) + }, + self._write_artifacts, + ) + except Exception: + logger.exception(f"Error loading tasks for kind {kind_name}:") + raise + for task in new_tasks: + if task.label in all_tasks: + raise Exception("duplicate tasks with label " + task.label) + all_tasks[task.label] = task + + return all_tasks + + def _load_tasks_parallel(self, kinds, kind_graph, parameters): + all_tasks = {} + futures_to_kind = {} + futures = set() + edges = set(kind_graph.edges) + + with ProcessPoolExecutor() as executor: + + def submit_ready_kinds(): + """Create the next batch of tasks for kinds without dependencies.""" + nonlocal kinds, edges, futures + loaded_tasks = all_tasks.copy() + kinds_with_deps = {edge[0] for edge in edges} + ready_kinds = ( + set(kinds) - kinds_with_deps - set(futures_to_kind.values()) + ) + for name in ready_kinds: + kind = kinds.get(name) + if not kind: + message = ( + f'Could not find the kind "{name}"\nAvailable kinds:\n' + ) + for k in sorted(kinds): + message += f' - "{k}"\n' + raise Exception(message) + + future = executor.submit( + kind.load_tasks, + dict(parameters), + { + k: t + for k, t in loaded_tasks.items() + if t.kind in kind.config.get("kind-dependencies", []) + }, + self._write_artifacts, + ) + futures.add(future) + futures_to_kind[future] = name + + submit_ready_kinds() + while futures: + done, _ = wait(futures, return_when=FIRST_COMPLETED) + for future in done: + if exc := future.exception(): + executor.shutdown(wait=False, cancel_futures=True) + raise exc + kind = futures_to_kind.pop(future) + futures.remove(future) + + for task in future.result(): + if task.label in all_tasks: + raise Exception("duplicate tasks with label " + task.label) + all_tasks[task.label] = task + + # Update state for next batch of futures. + del kinds[kind] + edges = {e for e in edges if e[1] != kind} + + # Submit any newly unblocked kinds + submit_ready_kinds() + + return all_tasks + def _run(self): logger.info("Loading graph configuration.") graph_config = load_graph_config(self.root_dir) @@ -307,31 +413,18 @@ def _run(self): ) logger.info("Generating full task set") - all_tasks = {} - for kind_name in kind_graph.visit_postorder(): - logger.debug(f"Loading tasks for kind {kind_name}") - - kind = kinds.get(kind_name) - if not kind: - message = f'Could not find the kind "{kind_name}"\nAvailable kinds:\n' - for k in sorted(kinds): - message += f' - "{k}"\n' - raise Exception(message) + # Current parallel generation relies on multiprocessing, and forking. + # This causes problems on Windows and macOS due to how new processes + # are created there, and how doing so reinitializes global variables + # that are modified earlier in graph generation, that doesn't get + # redone in the new processes. Ideally this would be fixed, or we + # would take another approach to parallel kind generation. In the + # meantime, it's not supported outside of Linux. + if platform.system() != "Linux": + all_tasks = self._load_tasks_serial(kinds, kind_graph, parameters) + else: + all_tasks = self._load_tasks_parallel(kinds, kind_graph, parameters) - try: - new_tasks = kind.load_tasks( - parameters, - list(all_tasks.values()), - self._write_artifacts, - ) - except Exception: - logger.exception(f"Error loading tasks for kind {kind_name}:") - raise - for task in new_tasks: - if task.label in all_tasks: - raise Exception("duplicate tasks with label " + task.label) - all_tasks[task.label] = task - logger.info(f"Generated {len(new_tasks)} tasks for kind {kind_name}") full_task_set = TaskGraph(all_tasks, Graph(frozenset(all_tasks), frozenset())) yield self.verify("full_task_set", full_task_set, graph_config, parameters) diff --git a/test/test_generator.py b/test/test_generator.py index c87e23170..701e5e02f 100644 --- a/test/test_generator.py +++ b/test/test_generator.py @@ -3,16 +3,27 @@ # file, You can obtain one at http://mozilla.org/MPL/2.0/. +from concurrent.futures import ProcessPoolExecutor + import pytest -from pytest_taskgraph import FakeKind, WithFakeKind, fake_load_graph_config +from pytest_taskgraph import WithFakeKind, fake_load_graph_config from taskgraph import generator, graph from taskgraph.generator import Kind, load_tasks_for_kind, load_tasks_for_kinds from taskgraph.loader.default import loader as default_loader -def test_kind_ordering(maketgg): +class FakePPE(ProcessPoolExecutor): + loaded_kinds = [] + + def submit(self, kind_load_tasks, *args): + self.loaded_kinds.append(kind_load_tasks.__self__.name) + return super().submit(kind_load_tasks, *args) + + +def test_kind_ordering(mocker, maketgg): "When task kinds depend on each other, they are loaded in postorder" + mocked_ppe = mocker.patch.object(generator, "ProcessPoolExecutor", new=FakePPE) tgg = maketgg( kinds=[ ("_fake3", {"kind-dependencies": ["_fake2", "_fake1"]}), @@ -21,7 +32,7 @@ def test_kind_ordering(maketgg): ] ) tgg._run_until("full_task_set") - assert FakeKind.loaded_kinds == ["_fake1", "_fake2", "_fake3"] + assert mocked_ppe.loaded_kinds == ["_fake1", "_fake2", "_fake3"] def test_full_task_set(maketgg): @@ -293,5 +304,5 @@ def test_kind_load_tasks(monkeypatch, graph_config, parameters, datadir, kind_co kind = Kind( name="fake", path="foo/bar", config=kind_config, graph_config=graph_config ) - tasks = kind.load_tasks(parameters, [], False) + tasks = kind.load_tasks(parameters, {}, False) assert tasks