diff --git a/packages/pytest-taskgraph/src/pytest_taskgraph/fixtures/gen.py b/packages/pytest-taskgraph/src/pytest_taskgraph/fixtures/gen.py index a47d417db..bce6896a8 100644 --- a/packages/pytest-taskgraph/src/pytest_taskgraph/fixtures/gen.py +++ b/packages/pytest-taskgraph/src/pytest_taskgraph/fixtures/gen.py @@ -62,13 +62,8 @@ def _load_kinds(self, graph_config, target_kind=None): yield FakeKind(kind_name, "/fake", config, graph_config) -class FakeGraphConfig(GraphConfig): - def register(self): - pass - - def fake_load_graph_config(root_dir): - graph_config = FakeGraphConfig( + graph_config = GraphConfig( { "trust-domain": "test-domain", "taskgraph": { @@ -108,6 +103,7 @@ def fake_load_graph_config(root_dir): }, root_dir, ) + graph_config.__dict__["register"] = lambda: None return graph_config diff --git a/src/taskgraph/generator.py b/src/taskgraph/generator.py index d2f7e8a65..36491bba7 100644 --- a/src/taskgraph/generator.py +++ b/src/taskgraph/generator.py @@ -5,11 +5,6 @@ import copy import logging import os -from concurrent.futures import ( - FIRST_COMPLETED, - ProcessPoolExecutor, - wait, -) from dataclasses import dataclass from typing import Callable, Dict, Optional, Union @@ -49,20 +44,16 @@ def _get_loader(self): loader = "taskgraph.loader.default:loader" return find_object(loader) - def load_tasks(self, parameters, kind_dependencies_tasks, write_artifacts): - logger.debug(f"Loading tasks for kind {self.name}") - - parameters = Parameters(**parameters) + def load_tasks(self, parameters, loaded_tasks, write_artifacts): loader = self._get_loader() config = copy.deepcopy(self.config) - inputs = loader( - self.name, - self.path, - config, - parameters, - list(kind_dependencies_tasks.values()), - ) + kind_dependencies = config.get("kind-dependencies", []) + kind_dependencies_tasks = { + task.label: task for task in loaded_tasks if task.kind in kind_dependencies + } + + inputs = loader(self.name, self.path, config, parameters, loaded_tasks) transforms = TransformSequence() for xform_path in config["transforms"]: @@ -96,7 +87,6 @@ def load_tasks(self, parameters, kind_dependencies_tasks, write_artifacts): ) for task_dict in transforms(trans_config, inputs) ] - logger.info(f"Generated {len(tasks)} tasks for kind {self.name}") return tasks @classmethod @@ -261,69 +251,6 @@ def _load_kinds(self, graph_config, target_kinds=None): except KindNotFound: continue - def _load_tasks(self, kinds, kind_graph, parameters): - all_tasks = {} - futures_to_kind = {} - futures = set() - edges = set(kind_graph.edges) - - with ProcessPoolExecutor() as executor: - - def submit_ready_kinds(): - """Create the next batch of tasks for kinds without dependencies.""" - nonlocal kinds, edges, futures - loaded_tasks = all_tasks.copy() - kinds_with_deps = {edge[0] for edge in edges} - ready_kinds = ( - set(kinds) - kinds_with_deps - set(futures_to_kind.values()) - ) - for name in ready_kinds: - kind = kinds.get(name) - if not kind: - message = ( - f'Could not find the kind "{name}"\nAvailable kinds:\n' - ) - for k in sorted(kinds): - message += f' - "{k}"\n' - raise Exception(message) - - future = executor.submit( - kind.load_tasks, - dict(parameters), - { - k: t - for k, t in loaded_tasks.items() - if t.kind in kind.config.get("kind-dependencies", []) - }, - self._write_artifacts, - ) - futures.add(future) - futures_to_kind[future] = name - - submit_ready_kinds() - while futures: - done, _ = wait(futures, return_when=FIRST_COMPLETED) - for future in done: - if exc := future.exception(): - executor.shutdown(wait=False, cancel_futures=True) - raise exc - kind = futures_to_kind.pop(future) - futures.remove(future) - - for task in future.result(): - if task.label in all_tasks: - raise Exception("duplicate tasks with label " + task.label) - all_tasks[task.label] = task - - # Update state for next batch of futures. - del kinds[kind] - edges = {e for e in edges if e[1] != kind} - - # Submit any newly unblocked kinds - submit_ready_kinds() - - return all_tasks - def _run(self): logger.info("Loading graph configuration.") graph_config = load_graph_config(self.root_dir) @@ -378,8 +305,31 @@ def _run(self): ) logger.info("Generating full task set") - all_tasks = self._load_tasks(kinds, kind_graph, parameters) + all_tasks = {} + for kind_name in kind_graph.visit_postorder(): + logger.debug(f"Loading tasks for kind {kind_name}") + + kind = kinds.get(kind_name) + if not kind: + message = f'Could not find the kind "{kind_name}"\nAvailable kinds:\n' + for k in sorted(kinds): + message += f' - "{k}"\n' + raise Exception(message) + try: + new_tasks = kind.load_tasks( + parameters, + list(all_tasks.values()), + self._write_artifacts, + ) + except Exception: + logger.exception(f"Error loading tasks for kind {kind_name}:") + raise + for task in new_tasks: + if task.label in all_tasks: + raise Exception("duplicate tasks with label " + task.label) + all_tasks[task.label] = task + logger.info(f"Generated {len(new_tasks)} tasks for kind {kind_name}") full_task_set = TaskGraph(all_tasks, Graph(frozenset(all_tasks), frozenset())) yield self.verify("full_task_set", full_task_set, graph_config, parameters) diff --git a/test/test_generator.py b/test/test_generator.py index e01f5c642..d37ca22ed 100644 --- a/test/test_generator.py +++ b/test/test_generator.py @@ -3,27 +3,16 @@ # file, You can obtain one at http://mozilla.org/MPL/2.0/. -from concurrent.futures import ProcessPoolExecutor - import pytest -from pytest_taskgraph import WithFakeKind, fake_load_graph_config +from pytest_taskgraph import FakeKind, WithFakeKind, fake_load_graph_config from taskgraph import generator, graph from taskgraph.generator import Kind, load_tasks_for_kind from taskgraph.loader.default import loader as default_loader -class FakePPE(ProcessPoolExecutor): - loaded_kinds = [] - - def submit(self, kind_load_tasks, *args): - self.loaded_kinds.append(kind_load_tasks.__self__.name) - return super().submit(kind_load_tasks, *args) - - -def test_kind_ordering(mocker, maketgg): +def test_kind_ordering(maketgg): "When task kinds depend on each other, they are loaded in postorder" - mocked_ppe = mocker.patch.object(generator, "ProcessPoolExecutor", new=FakePPE) tgg = maketgg( kinds=[ ("_fake3", {"kind-dependencies": ["_fake2", "_fake1"]}), @@ -32,7 +21,7 @@ def test_kind_ordering(mocker, maketgg): ] ) tgg._run_until("full_task_set") - assert mocked_ppe.loaded_kinds == ["_fake1", "_fake2", "_fake3"] + assert FakeKind.loaded_kinds == ["_fake1", "_fake2", "_fake3"] def test_full_task_set(maketgg): @@ -286,5 +275,5 @@ def test_kind_load_tasks(monkeypatch, graph_config, parameters, datadir, kind_co kind = Kind( name="fake", path="foo/bar", config=kind_config, graph_config=graph_config ) - tasks = kind.load_tasks(parameters, {}, False) + tasks = kind.load_tasks(parameters, [], False) assert tasks