From 789f1d3d933763f8fc85bcc4b85cf04187eb83ab Mon Sep 17 00:00:00 2001 From: Ben Hearsum Date: Thu, 14 Aug 2025 13:22:19 -0400 Subject: [PATCH] revert: multiprocess kind processing While this worked very well on Linux, it causes issues anywhere we can't `fork` to get a new process (most notably on Windows). The problem lies in the fact that in these cases, we spawn an entire new process, which re-imports taskgraph from scratch. This is fine in some cases, but in any case where global state has been modified in an earlier part of `TaskGraphGenerator._run`, we lose whatever side effects happened there, and end up failing in some way. Concretely: in gecko we add a bunch of `payload_builders` as part of registering the graph config. This code doesn't re-run in the spawned processes, so the payload builders don't exist there. There are workarounds for this: for example, redoing all the earlier work of `_run` in each subprocess, or perhaps finding some way to ensure all the needed state is passed explicitly. There's no quick and easy way to make this work though, and some thought should be given to the tradeoffs of doing it (vs. doing nothing, or spending the effort on a different way to parallelize) before proceeding. --- .../src/pytest_taskgraph/fixtures/gen.py | 8 +- src/taskgraph/generator.py | 112 +++++------------- test/test_generator.py | 19 +-- 3 files changed, 37 insertions(+), 102 deletions(-) diff --git a/packages/pytest-taskgraph/src/pytest_taskgraph/fixtures/gen.py b/packages/pytest-taskgraph/src/pytest_taskgraph/fixtures/gen.py index a47d417db..bce6896a8 100644 --- a/packages/pytest-taskgraph/src/pytest_taskgraph/fixtures/gen.py +++ b/packages/pytest-taskgraph/src/pytest_taskgraph/fixtures/gen.py @@ -62,13 +62,8 @@ def _load_kinds(self, graph_config, target_kind=None): yield FakeKind(kind_name, "/fake", config, graph_config) -class FakeGraphConfig(GraphConfig): - def register(self): - pass - - def fake_load_graph_config(root_dir): - graph_config = FakeGraphConfig( + graph_config = GraphConfig( { "trust-domain": "test-domain", "taskgraph": { @@ -108,6 +103,7 @@ def fake_load_graph_config(root_dir): }, root_dir, ) + graph_config.__dict__["register"] = lambda: None return graph_config diff --git a/src/taskgraph/generator.py b/src/taskgraph/generator.py index d2f7e8a65..36491bba7 100644 --- a/src/taskgraph/generator.py +++ b/src/taskgraph/generator.py @@ -5,11 +5,6 @@ import copy import logging import os -from concurrent.futures import ( - FIRST_COMPLETED, - ProcessPoolExecutor, - wait, -) from dataclasses import dataclass from typing import Callable, Dict, Optional, Union @@ -49,20 +44,16 @@ def _get_loader(self): loader = "taskgraph.loader.default:loader" return find_object(loader) - def load_tasks(self, parameters, kind_dependencies_tasks, write_artifacts): - logger.debug(f"Loading tasks for kind {self.name}") - - parameters = Parameters(**parameters) + def load_tasks(self, parameters, loaded_tasks, write_artifacts): loader = self._get_loader() config = copy.deepcopy(self.config) - inputs = loader( - self.name, - self.path, - config, - parameters, - list(kind_dependencies_tasks.values()), - ) + kind_dependencies = config.get("kind-dependencies", []) + kind_dependencies_tasks = { + task.label: task for task in loaded_tasks if task.kind in kind_dependencies + } + + inputs = loader(self.name, self.path, config, parameters, loaded_tasks) transforms = TransformSequence() for xform_path in config["transforms"]: @@ -96,7 +87,6 @@ def load_tasks(self, parameters, kind_dependencies_tasks, write_artifacts): ) for task_dict in transforms(trans_config, inputs) ] - logger.info(f"Generated {len(tasks)} tasks for kind {self.name}") return tasks @classmethod @@ -261,69 +251,6 @@ def _load_kinds(self, graph_config, target_kinds=None): except KindNotFound: continue - def _load_tasks(self, kinds, kind_graph, parameters): - all_tasks = {} - futures_to_kind = {} - futures = set() - edges = set(kind_graph.edges) - - with ProcessPoolExecutor() as executor: - - def submit_ready_kinds(): - """Create the next batch of tasks for kinds without dependencies.""" - nonlocal kinds, edges, futures - loaded_tasks = all_tasks.copy() - kinds_with_deps = {edge[0] for edge in edges} - ready_kinds = ( - set(kinds) - kinds_with_deps - set(futures_to_kind.values()) - ) - for name in ready_kinds: - kind = kinds.get(name) - if not kind: - message = ( - f'Could not find the kind "{name}"\nAvailable kinds:\n' - ) - for k in sorted(kinds): - message += f' - "{k}"\n' - raise Exception(message) - - future = executor.submit( - kind.load_tasks, - dict(parameters), - { - k: t - for k, t in loaded_tasks.items() - if t.kind in kind.config.get("kind-dependencies", []) - }, - self._write_artifacts, - ) - futures.add(future) - futures_to_kind[future] = name - - submit_ready_kinds() - while futures: - done, _ = wait(futures, return_when=FIRST_COMPLETED) - for future in done: - if exc := future.exception(): - executor.shutdown(wait=False, cancel_futures=True) - raise exc - kind = futures_to_kind.pop(future) - futures.remove(future) - - for task in future.result(): - if task.label in all_tasks: - raise Exception("duplicate tasks with label " + task.label) - all_tasks[task.label] = task - - # Update state for next batch of futures. - del kinds[kind] - edges = {e for e in edges if e[1] != kind} - - # Submit any newly unblocked kinds - submit_ready_kinds() - - return all_tasks - def _run(self): logger.info("Loading graph configuration.") graph_config = load_graph_config(self.root_dir) @@ -378,8 +305,31 @@ def _run(self): ) logger.info("Generating full task set") - all_tasks = self._load_tasks(kinds, kind_graph, parameters) + all_tasks = {} + for kind_name in kind_graph.visit_postorder(): + logger.debug(f"Loading tasks for kind {kind_name}") + + kind = kinds.get(kind_name) + if not kind: + message = f'Could not find the kind "{kind_name}"\nAvailable kinds:\n' + for k in sorted(kinds): + message += f' - "{k}"\n' + raise Exception(message) + try: + new_tasks = kind.load_tasks( + parameters, + list(all_tasks.values()), + self._write_artifacts, + ) + except Exception: + logger.exception(f"Error loading tasks for kind {kind_name}:") + raise + for task in new_tasks: + if task.label in all_tasks: + raise Exception("duplicate tasks with label " + task.label) + all_tasks[task.label] = task + logger.info(f"Generated {len(new_tasks)} tasks for kind {kind_name}") full_task_set = TaskGraph(all_tasks, Graph(frozenset(all_tasks), frozenset())) yield self.verify("full_task_set", full_task_set, graph_config, parameters) diff --git a/test/test_generator.py b/test/test_generator.py index e01f5c642..d37ca22ed 100644 --- a/test/test_generator.py +++ b/test/test_generator.py @@ -3,27 +3,16 @@ # file, You can obtain one at http://mozilla.org/MPL/2.0/. -from concurrent.futures import ProcessPoolExecutor - import pytest -from pytest_taskgraph import WithFakeKind, fake_load_graph_config +from pytest_taskgraph import FakeKind, WithFakeKind, fake_load_graph_config from taskgraph import generator, graph from taskgraph.generator import Kind, load_tasks_for_kind from taskgraph.loader.default import loader as default_loader -class FakePPE(ProcessPoolExecutor): - loaded_kinds = [] - - def submit(self, kind_load_tasks, *args): - self.loaded_kinds.append(kind_load_tasks.__self__.name) - return super().submit(kind_load_tasks, *args) - - -def test_kind_ordering(mocker, maketgg): +def test_kind_ordering(maketgg): "When task kinds depend on each other, they are loaded in postorder" - mocked_ppe = mocker.patch.object(generator, "ProcessPoolExecutor", new=FakePPE) tgg = maketgg( kinds=[ ("_fake3", {"kind-dependencies": ["_fake2", "_fake1"]}), @@ -32,7 +21,7 @@ def test_kind_ordering(mocker, maketgg): ] ) tgg._run_until("full_task_set") - assert mocked_ppe.loaded_kinds == ["_fake1", "_fake2", "_fake3"] + assert FakeKind.loaded_kinds == ["_fake1", "_fake2", "_fake3"] def test_full_task_set(maketgg): @@ -286,5 +275,5 @@ def test_kind_load_tasks(monkeypatch, graph_config, parameters, datadir, kind_co kind = Kind( name="fake", path="foo/bar", config=kind_config, graph_config=graph_config ) - tasks = kind.load_tasks(parameters, {}, False) + tasks = kind.load_tasks(parameters, [], False) assert tasks