From 533700dce244a7ee572500f799a31eb935087a59 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Tue, 10 Feb 2026 13:37:59 +0000 Subject: [PATCH 01/11] refactor: make scheduler interactivity default to false Most of the time we want to use the scheduler in non-interactive mode. To reduce the verbosity of code using the scheduler, make the kwarg default to false. Signed-off-by: Alex Jones --- src/dvsim/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dvsim/scheduler.py b/src/dvsim/scheduler.py index dd818120..515c860f 100644 --- a/src/dvsim/scheduler.py +++ b/src/dvsim/scheduler.py @@ -80,7 +80,7 @@ def __init__( items: Sequence[JobSpec], launcher_cls: type[Launcher], *, - interactive: bool, + interactive: bool = False, ) -> None: """Initialise a job scheduler. From a6a8cc99bf4a73bdf20af87d200a8120e000b61e Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 6 Feb 2026 17:16:47 +0000 Subject: [PATCH 02/11] feat: add scheduler Job/Launcher mocks Add mock implementations of the DVSim launcher that can be used to test the scheduler implementation. This provides functionality for mocking each job (by name, despite the scheduler only being provided the class), with the ability to change the reported status, vary the reported status over a number of polls, emulate launcher errors and launcher busy errors, and emulate a job that takes some time to be killed. We can also track the number of `launch()`, `poll()` and `kill()` calls on a per-job basis. The mock launcher itself maintains a central context which can be used to control the job configuration, but also track the maximum number of concurrent jobs, as well as the order in which jobs started running and were completed. These features are useful for writing a variety of unit tests for the scheduler via the public APIs that remain opaque to the scheduler operation itself. Also define some fixtures that will be commonly used across the different scheduler tests. Signed-off-by: Alex Jones --- tests/test_scheduler.py | 197 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 197 insertions(+) create mode 100644 tests/test_scheduler.py diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py new file mode 100644 index 00000000..29151ad6 --- /dev/null +++ b/tests/test_scheduler.py @@ -0,0 +1,197 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. +# SPDX-License-Identifier: Apache-2.0 + +"""Test the DVSim scheduler.""" + +import time +from dataclasses import dataclass +from pathlib import Path + +import pytest + +from dvsim.job.data import JobSpec, WorkspaceConfig +from dvsim.job.status import JobStatus +from dvsim.launcher.base import ErrorMessage, Launcher, LauncherBusyError, LauncherError + +__all__ = () + + +@dataclass +class MockJob: + """Mock of a single DVSim job to allow testing of scheduler behaviour. + + Attributes: + status_thresholds: Ordered list of (count, status) where the job should report + after being polled or more times. + default_status: Default status to report when polled, if not using `status_thresholds`. + launch_count: Number of times launched so far. + poll_count: Number of times polled so far. + kill_count: Number of times killed so far. + kill_time: Time that `kill()` should sleep/block for when called. + launcher_error: Any error to raise on `launch()`. + launcher_busy_error: Tuple (count, error) where should be raised for the first + launch attempts. 
+ + """ + + status_thresholds: list[tuple[int, JobStatus]] | None = None + default_status: JobStatus = JobStatus.PASSED + launch_count: int = 0 + poll_count: int = 0 + kill_count: int = 0 + kill_time: float | None = None + launcher_error: LauncherError | None = None + launcher_busy_error: tuple[int, LauncherBusyError] | None = None + + @property + def current_status(self) -> JobStatus: + """The current status of the job, based on its status configuration & poll count.""" + if not self.status_thresholds: + return self.default_status + current_status = self.default_status + for target_count, status in self.status_thresholds: + if target_count <= self.poll_count: + current_status = status + else: + break + return current_status + + +class MockLauncherContext: + """Context for a mocked launcher to allow testing of scheduler behaviour.""" + + def __init__(self) -> None: + self._configs = {} + self._running = set() + self.max_concurrent = 0 + self.order_started = [] + self.order_completed = [] + + def update_running(self, job: JobSpec) -> None: + """Update the mock context to record that a given job is running.""" + job_name = (job.full_name, job.qual_name) + if job_name not in self._running: + self._running.add(job_name) + self.max_concurrent = max(self.max_concurrent, len(self._running)) + self.order_started.append(job) + + def update_completed(self, job: JobSpec) -> None: + """Update the mock context to record that a given job has completed (stopped running).""" + job_name = (job.full_name, job.qual_name) + if job_name in self._running: + self._running.remove(job_name) + self.order_completed.append(job) + + def set_config(self, job: JobSpec, config: MockJob) -> None: + """Configure the behaviour for mocking a specified job.""" + self._configs[(job.full_name, job.qual_name)] = config + + def get_config(self, job: JobSpec) -> MockJob | None: + """Retrieve the mock configuration/state of a specified job.""" + return self._configs.get((job.full_name, job.qual_name)) + + +class MockLauncher(Launcher): + """Mock of a launcher, used for testing scheduler behaviour.""" + + # Default to polling instantly so we don't wait additional time in tests + poll_freq = 0 + + # The launcher is currently provided to the scheduler as a type that inherits from the + # Launcher class. As a result of this design, we must store the mock context as a class + # attribute, which we directly update at the start of each test. + # + # TODO: In the future, the scheduler interface should be changed to a `Callable`, so + # that we can more easily do dependency-injection by providing the context via the + # constructor using partial arguments. + mock_context: MockLauncherContext | None = None + + @staticmethod + def prepare_workspace(cfg: WorkspaceConfig) -> None: ... + + @staticmethod + def prepare_workspace_for_cfg(cfg: WorkspaceConfig) -> None: ... + + def _do_launch(self) -> None: + """Launch the job.""" + if self.mock_context is None: + return + mock = self.mock_context.get_config(self.job_spec) + if mock is not None: + # Emulate any configured launcher errors for the job at this stage + mock.launch_count += 1 + if mock.launcher_busy_error and mock.launch_count <= mock.launcher_busy_error[0]: + raise mock.launcher_busy_error[1] + if mock.launcher_error: + raise mock.launcher_error + status = mock.current_status + if status == JobStatus.QUEUED: + return # Do not mark as running if still mocking a queued status. 
+ self.mock_context.update_running(self.job_spec) + + def poll(self) -> JobStatus: + """Poll the launched job for completion.""" + # If there is no mock context / job config, just complete & report "PASSED". + if self.mock_context is None: + return JobStatus.PASSED + mock = self.mock_context.get_config(self.job_spec) + if mock is None: + self.mock_context.update_completed(self.job_spec) + return JobStatus.PASSED + + # Increment the poll count, and update the run state based on the reported status + mock.poll_count += 1 + status = mock.current_status + if status.ended: + self.mock_context.update_completed(self.job_spec) + elif status == JobStatus.DISPATCHED: + self.mock_context.update_running(self.job_spec) + return status + + def kill(self) -> None: + """Kill the running process.""" + if self.mock_context is not None: + # Update the kill count and perform any configured kill delay. + mock = self.mock_context.get_config(self.job_spec) + if mock is not None: + mock.kill_count += 1 + if mock.kill_time is not None: + time.sleep(mock.kill_time) + self.mock_context.update_completed(self.job_spec) + self._post_finish( + JobStatus.KILLED, + ErrorMessage(line_number=None, message="Job killed!", context=[]), + ) + + +@pytest.fixture +def mock_ctx() -> MockLauncherContext: + """Fixture for generating a unique mock launcher context per test.""" + return MockLauncherContext() + + +@pytest.fixture +def mock_launcher(mock_ctx: MockLauncherContext) -> type[MockLauncher]: + """Fixture for generating a unique mock launcher class/type per test.""" + + class TestMockLauncher(MockLauncher): + pass + + TestMockLauncher.mock_context = mock_ctx + return TestMockLauncher + + +@dataclass +class Fxt: + """Collection of fixtures used for mocking and testing the scheduler.""" + + tmp_path: Path + mock_ctx: MockLauncherContext + mock_launcher: type[MockLauncher] + + +@pytest.fixture +def fxt(tmp_path: Path, mock_ctx: MockLauncherContext, mock_launcher: type[MockLauncher]) -> Fxt: + """Fixtures used for mocking and testing the scheduler.""" + return Fxt(tmp_path, mock_ctx, mock_launcher) From 4db496bffcab35a722b0d87955d50d37cf51335c Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 6 Feb 2026 15:45:09 +0000 Subject: [PATCH 03/11] build: add `pytest-timeout` test dependency For changes to the scheduler, which is a core part of DVSim, we write tests beforehand to be sure that we are not breaking core functionality. Some of these test cases find issues where DVSim cannot handle its given inputs, but we do not want to make changes to fix DVSim at this stage. In such cases, where tests may get caught in infinite loops, it makes sense to have the ability to specify a test timeout. This can already be done as a pytest option, but we want to enable a default timeout so that anyone running the test suite without knowledge of these issues does not run into issues and can still see the "expected fail" (xfail) result, rather than the test being skipped. The `pytest-timeout` dependency lets us mark individual tests with timeouts values so that we can do this. 
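As an illustration, a per-test timeout can be attached with the marker
that the plugin provides (hypothetical test shown here; the real
scheduler tests arrive in a later patch):

    import pytest

    # Timeout in seconds: pytest-timeout aborts the test if it runs for
    # longer, so a scheduler bug that loops forever surfaces as a failed
    # (or xfail) test instead of hanging the whole suite.
    @pytest.mark.timeout(0.5)
    def test_scheduler_terminates() -> None:
        ...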
Signed-off-by: Alex Jones --- pyproject.toml | 1 + uv.lock | 17 +++++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index b9593c8c..25af8097 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,7 @@ test = [ "pyhamcrest>=2.1.0", "pytest>=8.3.3", "pytest-cov>=5.0.0", + "pytest-timeout>=2.4.0", ] release = [ "python-semantic-release>=10.4.1", diff --git a/uv.lock b/uv.lock index f3498c85..3aa54ab4 100644 --- a/uv.lock +++ b/uv.lock @@ -335,6 +335,7 @@ ci = [ { name = "pyright" }, { name = "pytest" }, { name = "pytest-cov" }, + { name = "pytest-timeout" }, { name = "ruff" }, ] debug = [ @@ -348,6 +349,7 @@ dev = [ { name = "pyright" }, { name = "pytest" }, { name = "pytest-cov" }, + { name = "pytest-timeout" }, { name = "ruff" }, ] linting = [ @@ -361,6 +363,7 @@ nix = [ { name = "pyright" }, { name = "pytest" }, { name = "pytest-cov" }, + { name = "pytest-timeout" }, ] release = [ { name = "python-semantic-release" }, @@ -369,6 +372,7 @@ test = [ { name = "pyhamcrest" }, { name = "pytest" }, { name = "pytest-cov" }, + { name = "pytest-timeout" }, ] typing = [ { name = "pyright" }, @@ -393,6 +397,7 @@ requires-dist = [ { name = "pyright", marker = "extra == 'typing'", specifier = ">=1.1.381" }, { name = "pytest", marker = "extra == 'test'", specifier = ">=8.3.3" }, { name = "pytest-cov", marker = "extra == 'test'", specifier = ">=5.0.0" }, + { name = "pytest-timeout", marker = "extra == 'test'", specifier = ">=2.4.0" }, { name = "python-semantic-release", marker = "extra == 'release'", specifier = ">=10.4.1" }, { name = "pyyaml", specifier = ">=6.0.2" }, { name = "ruff", marker = "extra == 'linting'", specifier = ">=0.6.7" }, @@ -1006,6 +1011,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ee/49/1377b49de7d0c1ce41292161ea0f721913fa8722c19fb9c1e3aa0367eecb/pytest_cov-7.0.0-py3-none-any.whl", hash = "sha256:3b8e9558b16cc1479da72058bdecf8073661c7f57f7d3c5f22a1c23507f2d861", size = 22424, upload-time = "2025-09-09T10:57:00.695Z" }, ] +[[package]] +name = "pytest-timeout" +version = "2.4.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/ac/82/4c9ecabab13363e72d880f2fb504c5f750433b2b6f16e99f4ec21ada284c/pytest_timeout-2.4.0.tar.gz", hash = "sha256:7e68e90b01f9eff71332b25001f85c75495fc4e3a836701876183c4bcfd0540a", size = 17973, upload-time = "2025-05-05T19:44:34.99Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/fa/b6/3127540ecdf1464a00e5a01ee60a1b09175f6913f0644ac748494d9c4b21/pytest_timeout-2.4.0-py3-none-any.whl", hash = "sha256:c42667e5cdadb151aeb5b26d114aff6bdf5a907f176a007a30b940d3d865b5c2", size = 14382, upload-time = "2025-05-05T19:44:33.502Z" }, +] + [[package]] name = "python-gitlab" version = "6.5.0" From badfc56dc4197e7f8cad68f62bf28abf6ffac718 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 6 Feb 2026 17:20:41 +0000 Subject: [PATCH 04/11] test: add scheduler testing utilities & initial tests Add 3 initial tests for the scheduler which act as basic smoke tests to ensure that the scheduler at least appears to work on a basic level. The tests that the scheduler can handle being given no jobs, 1 job, and 5 jobs, where jobs are just some basic mock jobs. 
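The no-job smoke test, for instance, boils down to the following
(condensed from the diff below; it relies on the `fxt` fixture bundle
added in the previous patch):

    from hamcrest import assert_that, empty

    from dvsim.scheduler import Scheduler

    def test_empty(fxt: Fxt) -> None:
        """The scheduler should return an empty result when given no jobs."""
        # `Fxt` bundles tmp_path, the mock context and the mock launcher.
        result = Scheduler([], fxt.mock_launcher).run()
        assert_that(result, empty())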
To help define these tests (and the creation of future tests), a variety of factories and helper functions / utilities are introduced for common test patterns, including the ability to define a single job spec or multiple job specifications that vary in some pre-determined way, and to create paths for tests where the scheduler logic makes use of file I/O operations and therefore expects output paths to exist. Signed-off-by: Alex Jones --- pyproject.toml | 3 + ruff-ci.toml | 3 + tests/test_scheduler.py | 217 +++++++++++++++++++++++++++++++++++++++- 3 files changed, 222 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 25af8097..26e30f42 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -122,6 +122,9 @@ ignore = [ # main codebase for clarity. However it's common to assert against a boolean # in tests and having to use kwargs in all cases will clutter the tests. "FBT003", + # Configurable helper functions are common in test files and it is overbearing + # to restrict the number of arguments per function definition. + "PLR0913", ] [tool.pytest.ini_options] diff --git a/ruff-ci.toml b/ruff-ci.toml index dca968d7..9f59e109 100644 --- a/ruff-ci.toml +++ b/ruff-ci.toml @@ -115,4 +115,7 @@ ignore = [ # main codebase for clarity. However it's common to assert against a boolean # in tests and having to use kwargs in all cases will clutter the tests. "FBT003", + # Configurable helper functions are common in test files and it is overbearing + # to restrict the number of arguments per function definition. + "PLR0913", ] diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 29151ad6..ef0ad20e 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -5,18 +5,27 @@ """Test the DVSim scheduler.""" import time +from collections.abc import Callable, Sequence from dataclasses import dataclass from pathlib import Path +from typing import Any import pytest +from hamcrest import assert_that, empty, equal_to, only_contains -from dvsim.job.data import JobSpec, WorkspaceConfig +from dvsim.job.data import CompletedJobStatus, JobSpec, WorkspaceConfig from dvsim.job.status import JobStatus from dvsim.launcher.base import ErrorMessage, Launcher, LauncherBusyError, LauncherError +from dvsim.report.data import IPMeta, ToolMeta +from dvsim.scheduler import Scheduler __all__ = () +# Default scheduler test timeout to handle infinite loops in the scheduler +DEFAULT_TIMEOUT = 0.5 + + @dataclass class MockJob: """Mock of a single DVSim job to allow testing of scheduler behaviour. 
@@ -195,3 +204,209 @@ class Fxt: def fxt(tmp_path: Path, mock_ctx: MockLauncherContext, mock_launcher: type[MockLauncher]) -> Fxt: """Fixtures used for mocking and testing the scheduler.""" return Fxt(tmp_path, mock_ctx, mock_launcher) + + +def ip_meta_factory(**overrides: str | None) -> IPMeta: + """Create an IPMeta from a set of default values, for use in testing.""" + meta = { + "name": "test_ip", + "variant": None, + "commit": "test_commit", + "branch": "test_branch", + "url": "test_url", + } + meta.update(overrides) + return IPMeta(**meta) + + +def tool_meta_factory(name: str = "test_tool", version: str = "test_version") -> ToolMeta: + """Create a ToolMeta from a set of default values, for use in testing.""" + return ToolMeta(name=name, version=version) + + +def build_workspace( + tmp_path: Path, run_name: str = "test", **overrides: str | Path | None +) -> WorkspaceConfig: + """Create a WorkspaceConfig with a set of defaults and given temp paths for testing.""" + config = { + "timestamp": "test_timestamp", + "project_root": tmp_path / "root", + "scratch_root": tmp_path / "scratch", + "scratch_path": tmp_path / "scratch" / run_name, + } + config.update(overrides) + return WorkspaceConfig(**config) + + +@dataclass(frozen=True) +class JobSpecPaths: + """A bundle of paths for testing a Job / JobSpec.""" + + output: Path + log: Path + statuses: dict[JobStatus, Path] + + +def make_job_paths( + tmp_path: Path, job_name: str = "test", *, ensure_exists: bool = False +) -> JobSpecPaths: + """Generate a set of paths to use for testing a job (JobSpec).""" + root = tmp_path / job_name + output = root / "out" + log = root / "log.txt" + statuses = {} + for status in JobStatus: + if status == JobStatus.QUEUED: + continue + status_dir = output / status.name.lower() + statuses[status] = status_dir + if ensure_exists: + Path(status_dir).mkdir(exist_ok=True, parents=True) + return JobSpecPaths(output=output, log=log, statuses=statuses) + + +def job_spec_factory( + tmp_path: Path, + paths: JobSpecPaths | None = None, + **overrides: object, +) -> JobSpec: + """Create a JobSpec from a set of default values, for use in testing.""" + spec = { + "name": "test_job", + "job_type": "mock_type", + "target": "mock_target", + "seed": None, + "dependencies": [], + "needs_all_dependencies_passing": True, + "weight": 1, + "timeout_mins": None, + "cmd": "echo 'test_cmd'", + "exports": {}, + "dry_run": False, + "interactive": False, + "gui": False, + "pre_launch": lambda _: None, + "post_finish": lambda _: None, + "pass_patterns": [], + "fail_patterns": [], + } + spec.update(overrides) + + # Add job file paths if they do not exist + if paths is None: + paths = make_job_paths(tmp_path, job_name=spec["name"]) + if "odir" not in spec: + spec["odir"] = paths.output + if "log_path" not in spec: + spec["log_path"] = paths.log + if "links" not in spec: + spec["links"] = paths.statuses + + # Define the IP metadata, tool metadata and workspace if they do not exist + if "block" not in spec: + spec["block"] = ip_meta_factory() + if "tool" not in spec: + spec["tool"] = tool_meta_factory() + if "workspace_cfg" not in spec: + spec["workspace_cfg"] = build_workspace(tmp_path) + + # Use the name as the full name & qual name if not manually specified + if "full_name" not in spec: + spec["full_name"] = spec["name"] + if "qual_name" not in spec: + spec["qual_name"] = spec["name"] + return JobSpec(**spec) + + +def make_many_jobs( + tmp_path: Path, + n: int, + *, + workspace: WorkspaceConfig | None = None, + per_job: Callable[[int], 
dict[str, Any]] | None = None, + interdeps: dict[int, list[int]] | None = None, + ensure_paths_exist: bool = False, + vary_targets: bool = False, + reverse: bool = False, + **overrides: object, +) -> list[JobSpec]: + """Create many JobSpecs at once for scheduler test purposes. + + Arguments: + tmp_path: The path to the temp dir to use for creating files. + n: The number of jobs to create. + workspace: The workspace configuration to use by default for jobs. + per_job: Given the index of a job, this func returns specific per-job overrides. + interdeps: A directed edge-list of job dependencies (via their indexes). + ensure_paths_exist: Whether to create generated job output paths. + vary_targets: Whether to automatically generate unique targets per job. + reverse: Optionally reverse the output jobs. + overrides: Any additional kwargs to apply to *every* created job. + + """ + # Create the workspace to share between jobs if not given one. + if workspace is None: + workspace = build_workspace(tmp_path) + + # Create the job parameters + job_specs = [] + for i in range(n): + name = f"job_{i}" + job = { + "name": name, + "paths": make_job_paths(tmp_path, job_name=name, ensure_exists=ensure_paths_exist), + "target": f"target_{i}" if vary_targets else "mock_target", + "workspace_cfg": workspace, + } + # Apply global overrides + job.update(overrides) + # Fetch and apply per-job overrides + if per_job: + job.update(per_job(i)) + job_specs.append(job) + + # Create dependencies between the jobs + jobs = [] + for i, job in enumerate(job_specs): + if interdeps: + deps = job.setdefault("dependencies", []) + deps.extend(job_specs[d]["name"] for d in interdeps.get(i, [])) + jobs.append(job_spec_factory(tmp_path, **job)) + + return jobs[::-1] if reverse else jobs + + +def _assert_result_status( + result: Sequence[CompletedJobStatus], num: int, expected: JobStatus = JobStatus.PASSED +) -> None: + """Assert a common result pattern, checking the number & status of scheduler results.""" + assert_that(len(result), equal_to(num)) + statuses = [c.status for c in result] + assert_that(statuses, only_contains(expected)) + + +class TestScheduling: + """Unit tests for the scheduling decisions of the scheduler.""" + + @staticmethod + @pytest.mark.timeout(DEFAULT_TIMEOUT) + def test_empty(fxt: Fxt) -> None: + """Test that the scheduler can handle being given no jobs.""" + result = Scheduler([], fxt.mock_launcher).run() + assert_that(result, empty()) + + @staticmethod + @pytest.mark.timeout(DEFAULT_TIMEOUT) + def test_job_run(fxt: Fxt) -> None: + """Small smoketest that the scheduler can actually run a valid job.""" + job = job_spec_factory(fxt.tmp_path) + result = Scheduler([job], fxt.mock_launcher).run() + _assert_result_status(result, 1) + + @staticmethod + @pytest.mark.timeout(DEFAULT_TIMEOUT) + def test_many_jobs_run(fxt: Fxt) -> None: + """Smoketest that the scheduler can run multiple valid jobs.""" + job_specs = make_many_jobs(fxt.tmp_path, n=5) + result = Scheduler(job_specs, fxt.mock_launcher).run() + _assert_result_status(result, 5) From 5f521b7a265fd263b11a4c83ee178cd69d875a0d Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 6 Feb 2026 17:21:25 +0000 Subject: [PATCH 05/11] ci: make test runs strict Add the `--strict` flag to the Python tests in CI. This flag makes Pytest run with strict operation, which does a few useful things for us. 
Importantly: * For tests that are expected to fail (due to known failures being addressed in the future), it will error if the test unexpectedly passes and is not correspondingly marked strict=False. This lets us still support flaky tests but ensures that test xfail markers are kept up-to-date with the code itself. * For any markers which pytest doesn't recognize / hasn't been informed about, it will explicitly error. * For any unknown command-line options, pytest will error. This lets us catch typos / stale configuration, and ensure that the code remains more in sync with the statuses of the tests. Signed-off-by: Alex Jones --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d3eeb3a6..f4317e2f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -104,4 +104,4 @@ jobs: - name: Test with pytest run: | - pytest + pytest --strict From c349d787203e691fe1396c5a564a088bded9d778 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Tue, 10 Feb 2026 12:48:54 +0000 Subject: [PATCH 06/11] test: add more scheduler tests This commit adds another ~7 unique scheduler tests for functionality relating to testing the parallelisation of the scheduler / launchers (parallel dispatch, and that max parallelism is respected), the polling behaviour of the launcher (the scheduler will poll for a job until done, and not beyond that), and the error handling of the scheduler (for launcher errors and duplicate jobs, and checking that errors propagate to killed jobs up the dependency tree). Signed-off-by: Alex Jones --- tests/test_scheduler.py | 123 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 123 insertions(+) diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index ef0ad20e..4d8acabe 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -410,3 +410,126 @@ def test_many_jobs_run(fxt: Fxt) -> None: job_specs = make_many_jobs(fxt.tmp_path, n=5) result = Scheduler(job_specs, fxt.mock_launcher).run() _assert_result_status(result, 5) + + @staticmethod + @pytest.mark.timeout(DEFAULT_TIMEOUT) + def test_duplicate_jobs(fxt: Fxt) -> None: + """Test that the scheduler does not double-schedule jobs with duplicate names.""" + workspace = build_workspace(fxt.tmp_path) + job_specs = make_many_jobs(fxt.tmp_path, n=3, workspace=workspace) + job_specs += make_many_jobs(fxt.tmp_path, n=6, workspace=workspace) + for _ in range(10): + job_specs.append(job_spec_factory(fxt.tmp_path, name="extra_job")) + job_specs.append(job_spec_factory(fxt.tmp_path, name="extra_job_2")) + result = Scheduler(job_specs, fxt.mock_launcher).run() + # Current behaviour expects duplicate jobs to be *silently ignored*. + # We should therefore have 3 + 3 + 2 = 8 jobs. + _assert_result_status(result, 8) + names = [c.name for c in result] + # Check names of all jobs are unique (i.e. no duplicates are returned). 
+ assert_that(len(names), equal_to(len(set(names)))) + + @staticmethod + @pytest.mark.timeout(DEFAULT_TIMEOUT) + @pytest.mark.parametrize("num_jobs", [2, 3, 5, 10, 20, 100]) + def test_parallel_dispatch(fxt: Fxt, num_jobs: int) -> None: + """Test that many jobs can be dispatched in parallel.""" + jobs = make_many_jobs(fxt.tmp_path, num_jobs) + scheduler = Scheduler(jobs, fxt.mock_launcher) + assert_that(fxt.mock_ctx.max_concurrent, equal_to(0)) + result = scheduler.run() + _assert_result_status(result, num_jobs) + assert_that(fxt.mock_ctx.max_concurrent, equal_to(num_jobs)) + + @staticmethod + @pytest.mark.timeout(DEFAULT_TIMEOUT) + @pytest.mark.parametrize("num_jobs", [5, 10, 20]) + @pytest.mark.parametrize("max_parallel", [1, 5, 15, 25]) + def test_max_parallel(fxt: Fxt, num_jobs: int, max_parallel: int) -> None: + """Test that max parallel limits of launchers are used & respected.""" + jobs = make_many_jobs(fxt.tmp_path, num_jobs) + fxt.mock_launcher.max_parallel = max_parallel + scheduler = Scheduler(jobs, fxt.mock_launcher) + assert_that(fxt.mock_ctx.max_concurrent, equal_to(0)) + result = scheduler.run() + _assert_result_status(result, num_jobs) + assert_that(fxt.mock_ctx.max_concurrent, equal_to(min(num_jobs, max_parallel))) + + @staticmethod + @pytest.mark.parametrize("polls", [5, 10, 50]) + @pytest.mark.parametrize("final_status", [JobStatus.PASSED, JobStatus.FAILED, JobStatus.KILLED]) + @pytest.mark.timeout(DEFAULT_TIMEOUT) + def test_repeated_poll(fxt: Fxt, polls: int, final_status: JobStatus) -> None: + """Test that the scheduler will repeatedly poll for a dispatched job.""" + job = job_spec_factory(fxt.tmp_path) + fxt.mock_ctx.set_config( + job, MockJob(status_thresholds=[(0, JobStatus.DISPATCHED), (polls, final_status)]) + ) + result = Scheduler([job], fxt.mock_launcher).run() + _assert_result_status(result, 1, expected=final_status) + config = fxt.mock_ctx.get_config(job) + if config is not None: + assert_that(config.poll_count, equal_to(polls)) + + @staticmethod + @pytest.mark.timeout(DEFAULT_TIMEOUT) + def test_no_over_poll(fxt: Fxt) -> None: + """Test that the schedule stops polling when it sees `PASSED`, and does not over-poll.""" + jobs = make_many_jobs(fxt.tmp_path, 10) + polls = [(i + 1) * 10 for i in range(10)] + for i in range(10): + fxt.mock_ctx.set_config( + jobs[i], + MockJob( + status_thresholds=[(0, JobStatus.DISPATCHED), (polls[i], JobStatus.PASSED)] + ), + ) + result = Scheduler(jobs, fxt.mock_launcher).run() + _assert_result_status(result, 10) + # Check we do not unnecessarily over-poll the jobs + for i in range(10): + config = fxt.mock_ctx.get_config(jobs[i]) + if config is not None: + assert_that(config.poll_count, equal_to(polls[i])) + + @staticmethod + @pytest.mark.xfail( + reason="DVSim currently errors on this case. When DVSim dispatches and thus launches a" + " job, it is only set to running after the launch. If a launcher error occurs, it" + " immediately invokes `_kill_item` which tries to remove it from the list of running jobs" + " (where it does not exist)." 
+ ) + def test_launcher_error(fxt: Fxt) -> None: + """Test that the launcher correctly handles an error during job launching.""" + job = job_spec_factory(fxt.tmp_path, paths=make_job_paths(fxt.tmp_path, ensure_exists=True)) + fxt.mock_ctx.set_config( + job, + MockJob( + status_thresholds=[(0, JobStatus.DISPATCHED), (10, JobStatus.PASSED)], + launcher_error=LauncherError("abc"), + ), + ) + result = Scheduler([job], fxt.mock_launcher).run() + # On a launcher error, the job has failed and should be killed. + _assert_result_status(result, 1, expected=JobStatus.KILLED) + + @staticmethod + @pytest.mark.parametrize("busy_polls", [1, 2, 5, 10]) + def test_launcher_busy_error(fxt: Fxt, busy_polls: int) -> None: + """Test that the launcher correctly handles the launcher busy case.""" + job = job_spec_factory(fxt.tmp_path) + err_mock = (busy_polls, LauncherBusyError("abc")) + fxt.mock_ctx.set_config( + job, + MockJob( + status_thresholds=[(0, JobStatus.DISPATCHED), (10, JobStatus.PASSED)], + launcher_busy_error=err_mock, + ), + ) + result = Scheduler([job], fxt.mock_launcher).run() + # We expect to have successfully launched and ran, eventually. + _assert_result_status(result, 1) + # Check that the scheduler tried to `launch()` the correct number of times. + config = fxt.mock_ctx.get_config(job) + if config is not None: + assert_that(config.launch_count, equal_to(busy_polls + 1)) From 9a7ecf7f8585b2094ef42e5eb200f4d89613e0c2 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Tue, 10 Feb 2026 12:52:17 +0000 Subject: [PATCH 07/11] test: add scheduler structural/dependency tests Adds around ~14 unique unit tests for the scheduler, specifically checking behaviours relating to the structure of the input job specification, including the dependencies between jobs and the different targets/phases/labels of the jobs. As part of this, the `needs_all_dependencies_passing` parameter on job specs is also tested, in addition to edge cases such as cyclic dependencies and failure propagation. Since a variety of these cases are currently failing in DVSim, these tests are marked as being "expected to fail" i.e. xfail, with appropriate reasoning provided for each expected failure. Signed-off-by: Alex Jones --- tests/test_scheduler.py | 307 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 306 insertions(+), 1 deletion(-) diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 4d8acabe..c3f3eaba 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -11,7 +11,7 @@ from typing import Any import pytest -from hamcrest import assert_that, empty, equal_to, only_contains +from hamcrest import any_of, assert_that, empty, equal_to, has_item, only_contains from dvsim.job.data import CompletedJobStatus, JobSpec, WorkspaceConfig from dvsim.job.status import JobStatus @@ -21,6 +21,34 @@ __all__ = () +# Common reasoning for expected failures to avoid duplication across tests. +# Ideally these will be removed as incorrect behaviour is fixed. +FAIL_DEP_ON_MULTIPLE_TARGETS = """ +DVSim cannot handle dependency fan-in (i.e. depending on jobs) across multiple targets. + +Specifically, when all successors of the first target are initially enqueued, they are +removed from the `scheduled` queues. If any item in another target then also depends +on those items (i.e. across *another* target), then the completion of these items will +in turn attempt to enqueue their own successors, which cannot be found as they are no +longer present in the `scheduled` queues. 
+""" +FAIL_DEPS_ACROSS_MULTIPLE_TARGETS = ( + "DVSim cannot handle dependency fan-out across multiple targets." +) +FAIL_DEPS_ACROSS_NON_CONSECUTIVE_TARGETS = ( + "DVSim cannot handle dependencies that span non-consecutive (non-adjacent) targets." +) +FAIL_IF_NO_DEPS_WITHOUT_ALL_DEPS_NEEDED = """ +Current DVSim has a strange behaviour where a job with no dependencies is dispatched if it is +marked as needing all its dependencies to pass, but fails (i.e. is killed) if it is marked as +*not* needing all of its dependencies. +""" +FAIL_DEP_OUT_OF_ORDER = """ +DVSim cannot handle jobs given in an order that define dependencies and targets such that, to +resolve the jobs according to those dependencies, the targets must be processed in a different +order to the ordering of the jobs. +""" + # Default scheduler test timeout to handle infinite loops in the scheduler DEFAULT_TIMEOUT = 0.5 @@ -533,3 +561,280 @@ def test_launcher_busy_error(fxt: Fxt, busy_polls: int) -> None: config = fxt.mock_ctx.get_config(job) if config is not None: assert_that(config.launch_count, equal_to(busy_polls + 1)) + + +class TestSchedulingStructure: + """Unit tests for scheduling decisions related to the job specification structure. + + (i.e. the dependencies between jobs and the targets that jobs lie within). + """ + + @staticmethod + @pytest.mark.timeout(DEFAULT_TIMEOUT) + @pytest.mark.parametrize( + "needs_all_passing", + [ + True, + pytest.param( + False, + marks=pytest.mark.xfail(reason=FAIL_IF_NO_DEPS_WITHOUT_ALL_DEPS_NEEDED), + ), + ], + ) + def test_no_deps(fxt: Fxt, *, needs_all_passing: bool) -> None: + """Tests scheduling of jobs without any listed dependencies.""" + job = job_spec_factory(fxt.tmp_path, needs_all_dependencies_passing=needs_all_passing) + result = Scheduler([job], fxt.mock_launcher).run() + _assert_result_status(result, 1) + + @staticmethod + def _dep_test_case( + fxt: Fxt, + dep_list: dict[int, list[int]], + *, + all_passing: bool, + ) -> Sequence[CompletedJobStatus]: + """Run a simple dependency test, with 5 jobs where jobs 2 & 4 will fail.""" + jobs = make_many_jobs( + fxt.tmp_path, + 5, + needs_all_dependencies_passing=all_passing, + interdeps=dep_list, + ) + fxt.mock_ctx.set_config(jobs[2], MockJob(default_status=JobStatus.FAILED)) + fxt.mock_ctx.set_config(jobs[4], MockJob(default_status=JobStatus.FAILED)) + return Scheduler(jobs, fxt.mock_launcher).run() + + @staticmethod + @pytest.mark.xfail( + reason=FAIL_DEP_ON_MULTIPLE_TARGETS + " " + FAIL_IF_NO_DEPS_WITHOUT_ALL_DEPS_NEEDED + ) + @pytest.mark.timeout(DEFAULT_TIMEOUT) + @pytest.mark.parametrize( + ("dep_list", "passes"), + [ + ({0: [1]}, [0, 1, 3]), + ({1: [2]}, [0, 3]), + ({3: [2, 4]}, [0, 1]), + ({3: [1, 2, 4]}, [0, 1, 3]), + ({0: [1, 2, 3, 4]}, [0, 1, 3]), + ], + ) + def test_needs_any_dep( + fxt: Fxt, + dep_list: dict[int, list[int]], + passes: list[int], + ) -> None: + """Tests scheduling of jobs with dependencies that don't need all passing.""" + result = TestSchedulingStructure._dep_test_case(fxt, dep_list, all_passing=False) + assert_that(len(result), equal_to(5)) + for job in passes: + assert_that(result[job].status, equal_to(JobStatus.PASSED)) + + @staticmethod + @pytest.mark.xfail(reason=FAIL_DEP_ON_MULTIPLE_TARGETS) + @pytest.mark.timeout(DEFAULT_TIMEOUT) + @pytest.mark.parametrize( + ("dep_list", "passes"), + [ + ({0: [1]}, [0, 1, 3]), + ({1: [0, 3]}, [0, 1, 3]), + ({3: [2]}, [0, 1]), + ({0: [3, 4]}, [1, 3]), + ({3: [0, 1, 2]}, [0, 1]), + ({1: [1, 2, 3, 4]}, [0, 3]), + ], + ) + def test_needs_all_deps( + fxt: Fxt, + 
dep_list: dict[int, list[int]], + passes: list[int], + ) -> None: + """Tests scheduling of jobs with dependencies that need all passing.""" + result = TestSchedulingStructure._dep_test_case(fxt, dep_list, all_passing=True) + assert_that(len(result), equal_to(5)) + for job in passes: + assert_that(result[job].status, equal_to(JobStatus.PASSED)) + + @staticmethod + @pytest.mark.xfail( + reason="DVSim does not currently have logic to detect and error on" + "dependency cycles within provided job specifications." + ) + @pytest.mark.timeout(DEFAULT_TIMEOUT) + @pytest.mark.parametrize( + ("dep_list"), + [ + {0: [1], 1: [0]}, + {0: [1], 1: [2], 2: [0]}, + {0: [1], 1: [2], 2: [3], 3: [4], 4: [0]}, + {0: [1, 2], 1: [2], 2: [3, 4, 0]}, + {0: [1, 2, 3, 4], 1: [2, 3, 4], 2: [3, 4], 3: [4], 4: [0]}, + ], + ) + def test_dep_cycle(fxt: Fxt, dep_list: dict[int, list[int]]) -> None: + """Test that the scheduler can detect and handle cycles in dependencies.""" + jobs = make_many_jobs(fxt.tmp_path, 5, interdeps=dep_list) + result = Scheduler(jobs, fxt.mock_launcher).run() + # Expect that either we get an empty result, or at least some job failed + # due to the cycle in dependencies. + assert_that(len(result), any_of(equal_to(5), equal_to(0))) + statuses = [c.status for c in result] + if statuses: + assert_that( + statuses, + any_of(has_item(JobStatus.FAILED), has_item(JobStatus.KILLED)), + ) + + @staticmethod + @pytest.mark.xfail( + reason=FAIL_DEP_ON_MULTIPLE_TARGETS + " " + FAIL_DEPS_ACROSS_MULTIPLE_TARGETS + ) + @pytest.mark.timeout(DEFAULT_TIMEOUT) + @pytest.mark.parametrize( + ("dep_list"), + [ + {0: [1, 2, 3, 4], 1: [2, 3, 4], 2: [3, 4], 3: [4]}, + {0: [1, 2], 4: [2, 3]}, + {0: [1], 1: [2], 2: [3], 3: [4]}, + {0: [1, 2, 3, 4], 1: [2], 3: [2, 4], 4: [2]}, + ], + ) + def test_dep_resolution(fxt: Fxt, dep_list: dict[int, list[int]]) -> None: + """Test that the scheduler can correctly resolve complex job dependencies.""" + jobs = make_many_jobs(fxt.tmp_path, 5, interdeps=dep_list) + result = Scheduler(jobs, fxt.mock_launcher).run() + _assert_result_status(result, 5) + + @staticmethod + @pytest.mark.timeout(DEFAULT_TIMEOUT) + def test_deps_across_polls(fxt: Fxt) -> None: + """Test that the scheduler can resolve multiple deps that complete at different times.""" + jobs = make_many_jobs(fxt.tmp_path, 4) + # For now, define the end job separately so that we can put it in a different target + # but keep the other jobs in the same target (to circumvent FAIL_DEP_ON_MULTIPLE_TARGETS). + jobs.append( + job_spec_factory( + fxt.tmp_path, + name="end", + dependencies=[job.name for job in jobs], + target="end_target", + ) + ) + polls = [i * 5 for i in range(5)] + for i in range(1, 5): + fxt.mock_ctx.set_config( + jobs[i], + MockJob( + status_thresholds=[(0, JobStatus.DISPATCHED), (polls[i], JobStatus.PASSED)] + ), + ) + result = Scheduler(jobs, fxt.mock_launcher).run() + _assert_result_status(result, 5) + # Sanity check that we did poll each job the correct number of times as well + for i in range(1, 5): + config = fxt.mock_ctx.get_config(jobs[i]) + if config is not None: + assert_that(config.poll_count, equal_to(polls[i])) + + @staticmethod + @pytest.mark.xfail( + reason="DVSim currently implicitly assumes that job with/in other targets" + " will be reachable (i.e. transitive) dependencies of jobs in the first target." 
+ ) + @pytest.mark.timeout(DEFAULT_TIMEOUT) + def test_multiple_targets(fxt: Fxt) -> None: + """Test that the scheduler can handle jobs across many targets.""" + # Create 15 jobs across 5 targets (3 jobs per target), with no dependencies. + jobs = make_many_jobs(fxt.tmp_path, 15, per_job=lambda i: {"target": f"target_{i // 3}"}) + result = Scheduler(jobs, fxt.mock_launcher).run() + _assert_result_status(result, 15) + + @staticmethod + @pytest.mark.timeout(DEFAULT_TIMEOUT) + @pytest.mark.parametrize("num_deps", range(2, 6)) + def test_cross_target_deps(fxt: Fxt, num_deps: int) -> None: + """Test that the scheduler can handle dependencies across targets.""" + deps = {i: [i - 1] for i in range(1, num_deps)} + jobs = make_many_jobs(fxt.tmp_path, num_deps, interdeps=deps, vary_targets=True) + result = Scheduler(jobs, fxt.mock_launcher).run() + _assert_result_status(result, num_deps) + + @staticmethod + @pytest.mark.xfail(reason=FAIL_DEP_ON_MULTIPLE_TARGETS) + @pytest.mark.timeout(DEFAULT_TIMEOUT) + @pytest.mark.parametrize("num_deps", range(2, 6)) + def test_dep_fan_in(fxt: Fxt, num_deps: int) -> None: + """Test that job dependencies can fan-in from multiple other jobs.""" + num_jobs = num_deps + 1 + deps = {0: list(range(1, num_jobs))} + jobs = make_many_jobs(fxt.tmp_path, num_jobs, interdeps=deps) + result = Scheduler(jobs, fxt.mock_launcher).run() + _assert_result_status(result, num_jobs) + + @staticmethod + @pytest.mark.xfail(reason=FAIL_DEPS_ACROSS_MULTIPLE_TARGETS) + @pytest.mark.timeout(DEFAULT_TIMEOUT) + @pytest.mark.parametrize("num_deps", range(2, 6)) + def test_dep_fan_out(fxt: Fxt, num_deps: int) -> None: + """Test that job dependencies can fan-out to multiple other jobs.""" + num_jobs = num_deps + 1 + deps = {i: [num_deps] for i in range(num_deps)} + jobs = make_many_jobs(fxt.tmp_path, num_jobs, interdeps=deps, vary_targets=True) + result = Scheduler(jobs, fxt.mock_launcher).run() + _assert_result_status(result, num_jobs) + + @staticmethod + @pytest.mark.xfail(reason=FAIL_DEPS_ACROSS_NON_CONSECUTIVE_TARGETS) + @pytest.mark.timeout(DEFAULT_TIMEOUT) + def test_non_consecutive_targets(fxt: Fxt) -> None: + """Test that jobs can have non-consecutive dependencies (deps in non-adjacent targets).""" + jobs = make_many_jobs(fxt.tmp_path, 4, interdeps={3: [0]}, vary_targets=True) + result = Scheduler(jobs, fxt.mock_launcher).run() + _assert_result_status(result, 4) + + @staticmethod + @pytest.mark.xfail(reason=FAIL_DEP_OUT_OF_ORDER) + @pytest.mark.timeout(DEFAULT_TIMEOUT) + def test_target_out_of_order(fxt: Fxt) -> None: + """Test that the scheduler can handle targets being given out-of-dependency-order.""" + jobs = make_many_jobs(fxt.tmp_path, 4, interdeps={1: [0], 2: [3]}, vary_targets=True) + # First test jobs 0 and 1 (0 -> 1). Then test jobs 2 and 3 (2 <- 3). + for order in (jobs[:2], jobs[2:]): + result = Scheduler(order, fxt.mock_launcher).run() + _assert_result_status(result, 2) + + # TODO: it isn't clear if this is a feature that DVSim should actually support. + # If Job specifications can form any DAG where targets are essentially just vertex + # labels/groups, then it makes sense that we can support a target-/layer-annotated + # specification with "bi-directional" edges. If layers are structural and intended + # to be monotonically increasing, this test should be changed / removed. For now, + # we test as if the former is the intended behaviour. 
+ @staticmethod + @pytest.mark.xfail(reason="DVSim cannot currently handle this case.") + @pytest.mark.timeout(DEFAULT_TIMEOUT) + def test_bidirectional_deps(fxt: Fxt) -> None: + """Test that the scheduler handles bidirectional cross-target deps.""" + # job_0 (target_0) -> job_1 (target_1) -> job_2 (target_0) + targets = ["target_0", "target_1", "target_0"] + jobs = make_many_jobs( + fxt.tmp_path, 3, interdeps={0: [1], 1: [2]}, per_job=lambda i: {"target": targets[i]} + ) + result = Scheduler(jobs, fxt.mock_launcher).run() + _assert_result_status(result, 3) + + @staticmethod + @pytest.mark.timeout(DEFAULT_TIMEOUT) + @pytest.mark.parametrize("error_status", [JobStatus.FAILED, JobStatus.KILLED]) + def test_dep_fail_propagation(fxt: Fxt, error_status: JobStatus) -> None: + """Test that failures in job dependencies propagate.""" + # Note: job order is due to working around FAIL_DEP_OUT_OF_ORDER. + deps = {i: [i - 1] for i in range(1, 5)} + jobs = make_many_jobs(fxt.tmp_path, n=5, interdeps=deps, vary_targets=True) + fxt.mock_ctx.set_config(jobs[0], MockJob(default_status=error_status)) + result = Scheduler(jobs, fxt.mock_launcher).run() + assert_that(len(result), equal_to(5)) + # The job that we configured to error should show the error status + assert_that(result[0].status, equal_to(error_status)) + # All other jobs should be "KILLED" due to failure propagation + _assert_result_status(result[1:], 4, expected=JobStatus.KILLED) From 6accc0c2e6debb5cb26aa424753bd4a35d304322 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Tue, 10 Feb 2026 12:54:23 +0000 Subject: [PATCH 08/11] test: add scheduler priority/weighting tests This commit adds 3 more unique tests related to the scheduler prioritisation of jobs/targets with different weights, including the edge case where jobs sum to zero weight. As mentioned in the TODO comment, this aspect is not covered in as much detail, as the limitations of DVSim's current operation (the structure of the job specification DAG lists that it will actually accept) make it very difficult to reason about and construct tests about this behaviour. 
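For reference, the weights themselves are just per-job overrides fed
through the existing factory helpers; the priority test builds its
weighted fan-out roughly like this (lightly adapted from
`test_job_priority` in the diff below, with `tmp_path` standing in for
the fixture's temporary directory):

    # Six jobs, weights 1..6, one target each, all gated on a common
    # "start" job so the scheduler must choose between them by weight.
    weighted_jobs = make_many_jobs(
        tmp_path,
        n=6,
        per_job=lambda n: {"weight": n + 1},
        dependencies=["start"],
        vary_targets=True,
    )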
Signed-off-by: Alex Jones --- tests/test_scheduler.py | 91 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index c3f3eaba..b1dd5933 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -838,3 +838,94 @@ def test_dep_fail_propagation(fxt: Fxt, error_status: JobStatus) -> None: assert_that(result[0].status, equal_to(error_status)) # All other jobs should be "KILLED" due to failure propagation _assert_result_status(result[1:], 4, expected=JobStatus.KILLED) + + +class TestSchedulingPriority: + """Unit tests for scheduler decisions related to job/target weighting/priority.""" + + @staticmethod + @pytest.mark.xfail( + reason=FAIL_DEPS_ACROSS_MULTIPLE_TARGETS + " " + FAIL_DEPS_ACROSS_NON_CONSECUTIVE_TARGETS + ) + @pytest.mark.timeout(DEFAULT_TIMEOUT) + def test_job_priority(fxt: Fxt) -> None: + """Test that jobs across targets are prioritised according to their weight.""" + start_job = job_spec_factory(fxt.tmp_path, name="start") + weighted_jobs = make_many_jobs( + fxt.tmp_path, + n=6, + per_job=lambda n: {"weight": n + 1}, + dependencies=["start"], + vary_targets=True, + ) + jobs = [start_job, *weighted_jobs] + by_weight_dec = [ + j.name for j in sorted(weighted_jobs, key=lambda job: job.weight, reverse=True) + ] + # Set max parallel = 1 so that order dispatched becomes the priority order + # With max parallel > 1, jobs of many priorities are dispatched "at once". + fxt.mock_launcher.max_parallel = 1 + result = Scheduler(jobs, fxt.mock_launcher).run() + _assert_result_status(result, 6) + assert_that(fxt.mock_ctx.order_started, equal_to([start_job, *by_weight_dec])) + + @staticmethod + @pytest.mark.xfail(reason="DVSim does not handle zero weights.") + @pytest.mark.timeout(DEFAULT_TIMEOUT) + def test_zero_weight(fxt: Fxt) -> None: + """Test that the scheduler can handle the case where jobs have a total weight of zero.""" + jobs = make_many_jobs(fxt.tmp_path, 5, weight=0) + result = Scheduler(jobs, fxt.mock_launcher).run() + # TODO: not clear if this should evenly distribute and succeed, or error. + _assert_result_status(result, 5) + + @staticmethod + @pytest.mark.xfail( + reason=FAIL_DEPS_ACROSS_MULTIPLE_TARGETS + " " + FAIL_DEPS_ACROSS_NON_CONSECUTIVE_TARGETS + ) + @pytest.mark.timeout(DEFAULT_TIMEOUT) + def test_blocked_weight_starvation(fxt: Fxt) -> None: + """Test that high weight jobs without fulfilled deps do not block lower weight jobs.""" + # All jobs spawn from a start job. + # There is one chain "start -> long_blocker -> high" where we have a high weight job + # blocked by some blocker that takes a long time. + # There are then 5 other jobs that depend on "start -> short_blocker -> low", which + # are low weight jobs blocked by some blocker that takes a short time. 
+ start_job = job_spec_factory(fxt.tmp_path, name="start") + short_blocker = job_spec_factory(fxt.tmp_path, name="short", dependencies=["start"]) + long_blocker = job_spec_factory(fxt.tmp_path, name="long", dependencies=["start"]) + high = job_spec_factory(fxt.tmp_path, name="high", dependencies=["long"], weight=1000000) + jobs = [start_job, short_blocker, long_blocker, high] + jobs += make_many_jobs( + fxt.tmp_path, + n=5, + weight=1, + dependencies=["short"], + vary_targets=True, + ) + # The blockers should take a bit of time, to let the non-blocked jobs progress + fxt.mock_ctx.set_config( + short_blocker, + MockJob(status_thresholds=[(0, JobStatus.DISPATCHED), (1, JobStatus.PASSED)]), + ) + fxt.mock_ctx.set_config( + long_blocker, + MockJob(status_thresholds=[(0, JobStatus.DISPATCHED), (5, JobStatus.PASSED)]), + ) + result = Scheduler(jobs, fxt.mock_launcher).run() + _assert_result_status(result, 8) + # We expect that the high weight job should have been scheduled last, since + # it was blocked by the blocker (unlike all the other lower weight jobs) + assert_that(fxt.mock_ctx.order_started[0], equal_to(start_job)) + assert_that(fxt.mock_ctx.order_started[-1], equal_to(high)) + + # TODO: we do not currently test the logic to schedule multiple queued jobs per target + # across different targets based on the weights of those jobs/targets, because this + # will require the test to be quite complex and specific to the intricacies of the + # current DVSim scheduler due to the current implementation. Due to only one successor + # in another target being discovered at once, we must carefully construct a dependency + # tree of jobs with specially modelled delays which relies on this implementation + # detail. Instead, for now at least, we leave this untested. + # + # Note also that DVSim currently assumes weights within a target are constant, + # which may not be the case with the current JobSpec model. From 09e7267221ec686e4720b451b47496461688b874 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Tue, 10 Feb 2026 15:07:48 +0000 Subject: [PATCH 09/11] build: add `pytest-repeat` and `pytest-xdist` test development deps These packages are added as optional `test` dependencies to aid in the development of tests. `pytest-repeat` is a plugin for pytest to make it easily to repeat tests a number of times. This can be useful for checking for test flakiness and non-idempotency. `pytest-xdist` is a plugin for pytest to allow you to run tests in parallel, distributing tests across multiple CPUs to speed up execution. The combination of these two plugins lets us run many iterations of tests in parallel to quickly catch potential issues with flaky behaviour in tests. Consider running e.g. 
pytest -n auto --count=10 --timeout=5 Signed-off-by: Alex Jones --- pyproject.toml | 2 ++ uv.lock | 44 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 26e30f42..5b220fa6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,6 +44,8 @@ test = [ "pytest>=8.3.3", "pytest-cov>=5.0.0", "pytest-timeout>=2.4.0", + "pytest-repeat>=0.9.4", + "pytest-xdist>=3.8.0", ] release = [ "python-semantic-release>=10.4.1", diff --git a/uv.lock b/uv.lock index 3aa54ab4..3fe52824 100644 --- a/uv.lock +++ b/uv.lock @@ -335,7 +335,9 @@ ci = [ { name = "pyright" }, { name = "pytest" }, { name = "pytest-cov" }, + { name = "pytest-repeat" }, { name = "pytest-timeout" }, + { name = "pytest-xdist" }, { name = "ruff" }, ] debug = [ @@ -349,7 +351,9 @@ dev = [ { name = "pyright" }, { name = "pytest" }, { name = "pytest-cov" }, + { name = "pytest-repeat" }, { name = "pytest-timeout" }, + { name = "pytest-xdist" }, { name = "ruff" }, ] linting = [ @@ -363,7 +367,9 @@ nix = [ { name = "pyright" }, { name = "pytest" }, { name = "pytest-cov" }, + { name = "pytest-repeat" }, { name = "pytest-timeout" }, + { name = "pytest-xdist" }, ] release = [ { name = "python-semantic-release" }, @@ -372,7 +378,9 @@ test = [ { name = "pyhamcrest" }, { name = "pytest" }, { name = "pytest-cov" }, + { name = "pytest-repeat" }, { name = "pytest-timeout" }, + { name = "pytest-xdist" }, ] typing = [ { name = "pyright" }, @@ -397,7 +405,9 @@ requires-dist = [ { name = "pyright", marker = "extra == 'typing'", specifier = ">=1.1.381" }, { name = "pytest", marker = "extra == 'test'", specifier = ">=8.3.3" }, { name = "pytest-cov", marker = "extra == 'test'", specifier = ">=5.0.0" }, + { name = "pytest-repeat", marker = "extra == 'test'", specifier = ">=0.9.4" }, { name = "pytest-timeout", marker = "extra == 'test'", specifier = ">=2.4.0" }, + { name = "pytest-xdist", marker = "extra == 'test'", specifier = ">=3.8.0" }, { name = "python-semantic-release", marker = "extra == 'release'", specifier = ">=10.4.1" }, { name = "pyyaml", specifier = ">=6.0.2" }, { name = "ruff", marker = "extra == 'linting'", specifier = ">=0.6.7" }, @@ -431,6 +441,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/36/f4/c6e662dade71f56cd2f3735141b265c3c79293c109549c1e6933b0651ffc/exceptiongroup-1.3.0-py3-none-any.whl", hash = "sha256:4d111e6e0c13d0644cad6ddaa7ed0261a0b36971f6d23e7ec9b4b9097da78a10", size = 16674, upload-time = "2025-05-10T17:42:49.33Z" }, ] +[[package]] +name = "execnet" +version = "2.1.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/bf/89/780e11f9588d9e7128a3f87788354c7946a9cbb1401ad38a48c4db9a4f07/execnet-2.1.2.tar.gz", hash = "sha256:63d83bfdd9a23e35b9c6a3261412324f964c2ec8dcd8d3c6916ee9373e0befcd", size = 166622, upload-time = "2025-11-12T09:56:37.75Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ab/84/02fc1827e8cdded4aa65baef11296a9bbe595c474f0d6d758af082d849fd/execnet-2.1.2-py3-none-any.whl", hash = "sha256:67fba928dd5a544b783f6056f449e5e3931a5c378b128bc18501f7ea79e296ec", size = 40708, upload-time = "2025-11-12T09:56:36.333Z" }, +] + [[package]] name = "executing" version = "2.2.1" @@ -1011,6 +1030,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ee/49/1377b49de7d0c1ce41292161ea0f721913fa8722c19fb9c1e3aa0367eecb/pytest_cov-7.0.0-py3-none-any.whl", hash = "sha256:3b8e9558b16cc1479da72058bdecf8073661c7f57f7d3c5f22a1c23507f2d861", size = 22424, upload-time 
= "2025-09-09T10:57:00.695Z" }, ] +[[package]] +name = "pytest-repeat" +version = "0.9.4" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/80/d4/69e9dbb9b8266df0b157c72be32083403c412990af15c7c15f7a3fd1b142/pytest_repeat-0.9.4.tar.gz", hash = "sha256:d92ac14dfaa6ffcfe6917e5d16f0c9bc82380c135b03c2a5f412d2637f224485", size = 6488, upload-time = "2025-04-07T14:59:53.077Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/73/d4/8b706b81b07b43081bd68a2c0359fe895b74bf664b20aca8005d2bb3be71/pytest_repeat-0.9.4-py3-none-any.whl", hash = "sha256:c1738b4e412a6f3b3b9e0b8b29fcd7a423e50f87381ad9307ef6f5a8601139f3", size = 4180, upload-time = "2025-04-07T14:59:51.492Z" }, +] + [[package]] name = "pytest-timeout" version = "2.4.0" @@ -1023,6 +1054,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/fa/b6/3127540ecdf1464a00e5a01ee60a1b09175f6913f0644ac748494d9c4b21/pytest_timeout-2.4.0-py3-none-any.whl", hash = "sha256:c42667e5cdadb151aeb5b26d114aff6bdf5a907f176a007a30b940d3d865b5c2", size = 14382, upload-time = "2025-05-05T19:44:33.502Z" }, ] +[[package]] +name = "pytest-xdist" +version = "3.8.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "execnet" }, + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/78/b4/439b179d1ff526791eb921115fca8e44e596a13efeda518b9d845a619450/pytest_xdist-3.8.0.tar.gz", hash = "sha256:7e578125ec9bc6050861aa93f2d59f1d8d085595d6551c2c90b6f4fad8d3a9f1", size = 88069, upload-time = "2025-07-01T13:30:59.346Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ca/31/d4e37e9e550c2b92a9cbc2e4d0b7420a27224968580b5a447f420847c975/pytest_xdist-3.8.0-py3-none-any.whl", hash = "sha256:202ca578cfeb7370784a8c33d6d05bc6e13b4f25b5053c30a152269fd10f0b88", size = 46396, upload-time = "2025-07-01T13:30:56.632Z" }, +] + [[package]] name = "python-gitlab" version = "6.5.0" From 159598353eb5cc3be981dea3ec4c7ff38cfad75c Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 6 Feb 2026 17:26:12 +0000 Subject: [PATCH 10/11] test: add scheduler signal handler tests Add tests for the Scheduler's signal handler used to allow you to quit DVSim while the scheduler is running. We test a few different things using different parameters: * Test that sending either SIGTERM/SIGINT causes DVSim to gracefully exit, killing ongoing (or to-be-dispatched) processes. * Test that sending SIGINT twice will call the original installed SIGINT handler, i.e. it will cause DVSim to instantly die instead. * Test the above cases with both short and long polls, where long polls mean that the scheduler will enter a sleep/wait for ~100 hours between polls. As long as this does not timeout, this tests that the scheduler is correctly woken from its sleep/wait by the signal and does not have to wait for a `poll_freq` duration to handle the signal. Note the current marked expected failure - from local testing during development, the use of a `threading.Event` (and perhaps logging) in the signal handler is not async-signal-safe and therefore, especially when configured will a poll frequency of 0 (i.e. poll as fast as possible), we sometimes see tests enter deadlocks and thus fail a small percentage of the time. 
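For reference, one common way to avoid this class of deadlock (a sketch
of an alternative pattern, rather than what the scheduler currently
implements with its `threading.Event`-based handler) is to restrict the
handler to an `os.write()` on a self-pipe and let the main polling loop
do the actual shutdown work:

    import os
    import select
    import signal
    from types import FrameType

    # Self-pipe trick: the handler only writes a byte, which is safe to do
    # from a signal handler; the main loop observes it via select() and can
    # then kill outstanding jobs and exit at a well-defined point.
    rfd, wfd = os.pipe()
    os.set_blocking(wfd, False)

    def _on_signal(signum: int, frame: FrameType | None) -> None:
        try:
            os.write(wfd, b"\0")
        except BlockingIOError:
            pass  # A wake-up byte is already pending; nothing more to do.

    signal.signal(signal.SIGINT, _on_signal)
    signal.signal(signal.SIGTERM, _on_signal)

    def wait_for_poll_or_signal(poll_freq: float) -> bool:
        """Sleep until the next poll is due; return True if a signal arrived."""
        readable, _, _ = select.select([rfd], [], [], poll_freq)
        if readable:
            os.read(rfd, 1)
            return True
        return False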
Signed-off-by: Alex Jones --- tests/test_scheduler.py | 106 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index b1dd5933..df7a9616 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -4,10 +4,16 @@ """Test the DVSim scheduler.""" +import multiprocessing +import os +import sys +import threading import time from collections.abc import Callable, Sequence from dataclasses import dataclass from pathlib import Path +from signal import SIGINT, SIGTERM, signal +from types import FrameType from typing import Any import pytest @@ -52,6 +58,7 @@ # Default scheduler test timeout to handle infinite loops in the scheduler DEFAULT_TIMEOUT = 0.5 +SIGNAL_TEST_TIMEOUT = 2.5 @dataclass @@ -929,3 +936,102 @@ def test_blocked_weight_starvation(fxt: Fxt) -> None: # # Note also that DVSim currently assumes weights within a target are constant, # which may not be the case with the current JobSpec model. + + +class TestSignals: + """Integration tests for the signal-handling of the scheduler.""" + + @staticmethod + def _run_signal_test(tmp_path: Path, sig: int, *, repeat: bool, long_poll: bool) -> None: + """Test that the scheduler can be gracefully killed by incoming signals.""" + + # We cannot access the fixtures from the separate process, so define a minimal + # mock launcher class here. + class SignalTestMockLauncher(MockLauncher): + pass + + mock_ctx = MockLauncherContext() + SignalTestMockLauncher.mock_context = mock_ctx + SignalTestMockLauncher.max_parallel = 2 + if long_poll: + # Set a very long poll frequency to be sure that the signal interrupts the + # scheduler from a sleep if configured with infrequent polls. + SignalTestMockLauncher.poll_freq = 360000 + + jobs = make_many_jobs(tmp_path, 3, ensure_paths_exist=True) + # When testing non-graceful exits, we make `kill()` hang and send two signals. + kill_time = None if not repeat else 100.0 + # Job 0 is permanently "dispatched", it never completes. + mock_ctx.set_config( + jobs[0], MockJob(default_status=JobStatus.DISPATCHED, kill_time=kill_time) + ) + # Job 1 will pass, but after a long time (a large number of polls). + mock_ctx.set_config( + jobs[1], + MockJob( + status_thresholds=[(0, JobStatus.DISPATCHED), (1000000000, JobStatus.PASSED)], + kill_time=kill_time, + ), + ) + # Job 2 is also permanently "dispatched", but will never run due to the + # max paralellism limit on the launcher. It will instead be cancelled. + mock_ctx.set_config( + jobs[2], MockJob(default_status=JobStatus.DISPATCHED, kill_time=kill_time) + ) + scheduler = Scheduler(jobs, SignalTestMockLauncher) + + def _get_signal(sig_received: int, _: FrameType | None) -> None: + assert_that(sig_received, equal_to(sig)) + assert_that(repeat) + sys.exit(0) + + if repeat: + # Sending multiple signals will call the regular signal handler + # which will kill the process. Register a mock handler to stop + # that happening and we can check that we "killed the process". + signal(sig, _get_signal) + + def _send_signals() -> None: + # Give time for the handler to be installed and jobs to dispatch + # and for the main loop to enter a sleep/wait. 
+ wait_time = 0.1 + time.sleep(wait_time) + pid = os.getpid() + os.kill(pid, sig) + if repeat: + time.sleep(wait_time) + os.kill(pid, sig) + + # Send signals from a separate thread + threading.Thread(target=_send_signals).start() + result = scheduler.run() + + # If we didn't reach `_get_signal`, this should be a graceful exit + assert_that(not repeat) + _assert_result_status(result, 3, expected=JobStatus.KILLED) + + @staticmethod + @pytest.mark.xfail( + reason="This test passes ~95 percent of the time, but the logging & threading primitive" + "logic used in the signal handler are not async-signal-safe and thus may deadlock," + "causing the process to hang and time out instead.", + strict=False, + ) + @pytest.mark.parametrize("long_poll", [False, True]) + @pytest.mark.parametrize(("sig", "repeat"), [(SIGTERM, False), (SIGINT, False), (SIGINT, True)]) + def test_signal_kill(tmp_path: Path, *, sig: int, repeat: bool, long_poll: bool) -> None: + """Test that the scheduler can be gracefully killed by incoming signals.""" + # We must test in a separate process, otherwise pytest interprets the SIGINT and SIGTERM + # signals using its own signal handlers as signals to quit pytest itself... + proc = multiprocessing.Process( + target=TestSignals._run_signal_test, + args=(tmp_path, sig), + kwargs={"repeat": repeat, "long_poll": long_poll}, + ) + proc.start() + proc.join(timeout=SIGNAL_TEST_TIMEOUT) + if proc.is_alive(): + proc.kill() # SIGKILL instead of SIGINT or SIGTERM + proc.join() + pytest.fail("Scheduler hung and was terminated") + assert_that(proc.exitcode, equal_to(0)) From 0a1628688079f27df47bec6f78db8acfb3fd5565 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 6 Feb 2026 17:25:50 +0000 Subject: [PATCH 11/11] test: enable parallel pytest coverage The signal handler tests do not count towards coverage because they are tested in a separate process (to avoid signals being intercepted by `pytest` instead). We can still capture this coverage however by configuring pytest-coverage to know that we may be executing tests with multiple processes by using `multiprocessing`. Signed-off-by: Alex Jones --- pyproject.toml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 5b220fa6..261d52ea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -133,6 +133,13 @@ ignore = [ addopts = "--cov=dvsim --cov-report term-missing" norecursedirs = ["*.egg", ".*", "_darcs", "build", "dist", "venv", "scratch", "doc"] +[tool.coverage.run] +parallel = true +concurrency = ["multiprocessing"] + +[tool.coverage.report] +show_missing = true + [tool.semantic_release] commit_parser = "conventional" version_toml = ["pyproject.toml:project.version"]
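Note that with `parallel = true`, coverage writes one `.coverage.*`
data file per process; the pytest-cov plugin already configured in
`addopts` takes care of merging these at the end of the run. If the
suite is ever driven through coverage directly instead, the per-process
files need an explicit combine step, e.g.

    coverage combine
    coverage report -m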