diff --git a/src/dvsim/cli/run.py b/src/dvsim/cli/run.py index ff42cae1..3dc3a53f 100644 --- a/src/dvsim/cli/run.py +++ b/src/dvsim/cli/run.py @@ -35,9 +35,6 @@ from dvsim.instrumentation import InstrumentationFactory, set_instrumentation from dvsim.job.deploy import RunTest from dvsim.launcher.base import Launcher -from dvsim.launcher.factory import set_launcher_type -from dvsim.launcher.fake import FakeLauncher -from dvsim.launcher.local import LocalLauncher from dvsim.launcher.lsf import LsfLauncher from dvsim.launcher.nc import NcLauncher from dvsim.launcher.sge import SgeLauncher @@ -45,10 +42,8 @@ from dvsim.logging import LOG_LEVELS, configure_logging, log from dvsim.runtime.backend import RuntimeBackend from dvsim.runtime.registry import BackendType, backend_registry -from dvsim.scheduler.async_status_printer import StatusPrinter -from dvsim.scheduler.async_status_printer import get_status_printer as get_async_status_printer -from dvsim.scheduler.status_printer import get_status_printer -from dvsim.utils import TS_FORMAT, TS_FORMAT_LONG, Timer, rm_path, run_cmd_with_timeout +from dvsim.scheduler.status_printer import StatusPrinter, get_status_printer +from dvsim.utils import TS_FORMAT, TS_FORMAT_LONG, rm_path, run_cmd_with_timeout # The different categories that can be passed to the --list argument. _LIST_CATEGORIES = ["build_modes", "run_modes", "tests", "regressions"] @@ -785,10 +780,11 @@ def parse_args(argv: list[str] | None = None): dvg.add_argument( "--print-interval", "-pi", - type=int, + type=float, default=10, metavar="N", - help="Print status every N seconds.", + help="Print status every N seconds (default %(default)d). A zero value means that every" + " job status change will cause a print.", ) dvg.add_argument( @@ -817,17 +813,8 @@ def parse_args(argv: list[str] | None = None): help=("Use a fake launcher that generates random results"), ) - dvg.add_argument( - "--experimental-enable-async-scheduler", - action="store_true", - help="Enable experimental use of the async scheduler (not fully integrated).", - ) - args = parser.parse_args(argv) if argv else parser.parse_args() - if args.experimental_enable_async_scheduler: - log.warning("DVSim configured to use new experimental async scheduler.") - # Check conflicts # interactive and remote, r if args.interactive and args.remote: @@ -926,19 +913,15 @@ def main(argv: list[str] | None = None) -> None: RunTest.fixed_seed = args.fixed_seed # Register the common deploy settings. - Timer.print_interval = args.print_interval StatusPrinter.print_interval = args.print_interval - LocalLauncher.max_parallel = args.max_parallel SlurmLauncher.max_parallel = args.max_parallel SgeLauncher.max_parallel = args.max_parallel LsfLauncher.max_parallel = args.max_parallel NcLauncher.max_parallel = args.max_parallel Launcher.max_odirs = args.max_odirs RuntimeBackend.max_output_dirs = args.max_odirs - FakeLauncher.max_parallel = args.max_parallel - set_launcher_type(is_local=args.local, fake=args.fake) - # Configure the runtime backend. TODO: deprecate `set_launcher_type` above. + # Configure the runtime backend. set_backend_type(is_local=args.local, fake=args.fake) # Configure scheduler instrumentation @@ -983,12 +966,8 @@ def main(argv: list[str] | None = None) -> None: # Now that we have printed the results from the scheduler, we close the # status printer, to ensure the status remains relevant in the UI context # (for applicable status printers). - if args.experimental_enable_async_scheduler: - if not args.interactive: - status_printer = get_async_status_printer() - status_printer.exit() - else: - status_printer = get_status_printer(args.interactive) + if not args.interactive: + status_printer = get_status_printer() status_printer.exit() else: diff --git a/src/dvsim/flow/base.py b/src/dvsim/flow/base.py index 1167cec8..d5d4bb5d 100644 --- a/src/dvsim/flow/base.py +++ b/src/dvsim/flow/base.py @@ -20,14 +20,12 @@ from dvsim.flow.hjson import set_target_attribute from dvsim.job.data import CompletedJobStatus, JobSpec from dvsim.job.status import JobStatus -from dvsim.launcher.factory import get_launcher_cls from dvsim.logging import log from dvsim.runtime.fake import FakeRuntimeBackend from dvsim.runtime.registry import backend_registry -from dvsim.scheduler.async_core import Scheduler as AsyncScheduler -from dvsim.scheduler.async_status_printer import create_status_printer from dvsim.scheduler.core import Scheduler from dvsim.scheduler.log_manager import LogManager +from dvsim.scheduler.status_printer import create_status_printer from dvsim.utils import ( find_and_substitute_wildcards, rm_path, @@ -465,14 +463,7 @@ def deploy_objects(self) -> Sequence[CompletedJobStatus]: ), ) - if self.args.experimental_enable_async_scheduler: - return asyncio.run(self.run_scheduler(jobs)) - - return Scheduler( - items=jobs, - launcher_cls=get_launcher_cls(), - interactive=self.interactive, - ).run() + return asyncio.run(self.run_scheduler(jobs)) async def run_scheduler(self, jobs: list[JobSpec]) -> list[CompletedJobStatus]: """Run the scheduler with the given set of job specifications.""" @@ -485,7 +476,7 @@ async def run_scheduler(self, jobs: list[JobSpec]) -> list[CompletedJobStatus]: max_timeout = max((job.timeout_mins for job in jobs if job.timeout_mins), default=0) - scheduler = AsyncScheduler( + scheduler = Scheduler( jobs=jobs, backends={default_backend.name: default_backend}, default_backend=default_backend.name, diff --git a/src/dvsim/job/status.py b/src/dvsim/job/status.py index 457f3bc2..efe71d2a 100644 --- a/src/dvsim/job/status.py +++ b/src/dvsim/job/status.py @@ -12,8 +12,6 @@ class JobStatus(Enum): """Status of a Job.""" - # SCHEDULED is currently unused in the old sync scheduler, there `SCHEDULED` and `QUEUED` - # are combined under `QUEUED`. It is used only in the new async scheduler. SCHEDULED = auto() # Waiting for dependencies QUEUED = auto() # Dependencies satisfied, waiting to be dispatched RUNNING = auto() # Dispatched to a backend and actively executing diff --git a/src/dvsim/launcher/factory.py b/src/dvsim/launcher/factory.py deleted file mode 100644 index 6d909702..00000000 --- a/src/dvsim/launcher/factory.py +++ /dev/null @@ -1,87 +0,0 @@ -# Copyright lowRISC contributors (OpenTitan project). -# Licensed under the Apache License, Version 2.0, see LICENSE for details. -# SPDX-License-Identifier: Apache-2.0 - -import os - -from dvsim.launcher.base import Launcher -from dvsim.launcher.fake import FakeLauncher -from dvsim.launcher.local import LocalLauncher -from dvsim.launcher.lsf import LsfLauncher -from dvsim.launcher.nc import NcLauncher -from dvsim.launcher.sge import SgeLauncher -from dvsim.launcher.slurm import SlurmLauncher -from dvsim.logging import log - -try: - from edacloudlauncher.EdaCloudLauncher import EdaCloudLauncher - - EDACLOUD_LAUNCHER_EXISTS = True -except ImportError: - EDACLOUD_LAUNCHER_EXISTS = False - -# The chosen launcher class. -_LAUNCHER_CLS: type[Launcher] | None = None - - -def set_launcher_type(is_local: bool = False, fake: bool = False) -> None: - """Set the launcher type that will be used to launch the jobs. - - The env variable `DVSIM_LAUNCHER` is used to identify what launcher system - to use. This variable is specific to the user's work site. It is meant to - be set externally before invoking DVSim. Valid values are [local, lsf, - edacloud]. If --local arg is supplied then the local launcher takes - precedence. - """ - launcher = os.environ.get("DVSIM_LAUNCHER", "local") - if is_local: - launcher = "local" - if fake: - launcher = "fake" - Launcher.variant = launcher - - global _LAUNCHER_CLS - if launcher == "local": - _LAUNCHER_CLS = LocalLauncher - - elif launcher == "lsf": - _LAUNCHER_CLS = LsfLauncher - - elif launcher == "sge": - _LAUNCHER_CLS = SgeLauncher - - elif launcher == "nc": - _LAUNCHER_CLS = NcLauncher - - elif launcher == "slurm": - _LAUNCHER_CLS = SlurmLauncher - - elif launcher == "fake": - _LAUNCHER_CLS = FakeLauncher - - # These custom launchers are site specific. They may not be committed to - # the open source repo. - elif launcher == "edacloud" and EDACLOUD_LAUNCHER_EXISTS: - _LAUNCHER_CLS = EdaCloudLauncher - - else: - log.error( - f"Launcher {launcher} set using DVSIM_LAUNCHER env var does not " - "exist. Using local launcher instead.", - ) - _LAUNCHER_CLS = LocalLauncher - - -def get_launcher_cls() -> type[Launcher]: - """Returns the chosen launcher class.""" - assert _LAUNCHER_CLS is not None - return _LAUNCHER_CLS - - -def get_launcher(deploy): - """Returns an instance of a launcher. - - 'deploy' is an instance of the deploy class to with the launcher is paired. - """ - assert _LAUNCHER_CLS is not None - return _LAUNCHER_CLS(deploy) diff --git a/src/dvsim/launcher/fake.py b/src/dvsim/launcher/fake.py deleted file mode 100644 index d64fcb86..00000000 --- a/src/dvsim/launcher/fake.py +++ /dev/null @@ -1,96 +0,0 @@ -# Copyright lowRISC contributors (OpenTitan project). -# Licensed under the Apache License, Version 2.0, see LICENSE for details. -# SPDX-License-Identifier: Apache-2.0 - -"""Fake Launcher that returns random results.""" - -from random import choice -from typing import TYPE_CHECKING - -from dvsim.job.status import JobStatus -from dvsim.launcher.base import ErrorMessage, Launcher - -if TYPE_CHECKING: - from dvsim.job.data import JobSpec, WorkspaceConfig - - -__all__ = ("FakeLauncher",) - - -def _run_test_handler(job_spec: "JobSpec") -> JobStatus: - """Handle a RunTest deploy job.""" - return choice((JobStatus.PASSED, JobStatus.FAILED)) - - -def _cov_report_handler(job_spec: "JobSpec") -> JobStatus: - """Handle a CompileSim deploy job.""" - # TODO: this hack doesn't work any more and needs implementing by writing - # a file that can be parsed as if it's been generated by the tool. - # - # keys = [ - # "score", - # "line", - # "cond", - # "toggle", - # "fsm", - # "branch", - # "assert", - # "group", - # ] - # job_spec.cov_results_dict = {k: f"{random() * 100:.2f} %" for k in keys} - - return JobStatus.PASSED - - -_DEPLOY_HANDLER = { - "RunTest": _run_test_handler, - "CovReport": _cov_report_handler, -} - - -class FakeLauncher(Launcher): - """Launch jobs and return fake results.""" - - # Poll job's completion status every this many seconds - poll_freq = 0 - - def _do_launch(self) -> None: - """Do the launch.""" - - def poll(self) -> JobStatus: - """Check status of the running process.""" - deploy_cls = self.job_spec.job_type - if deploy_cls in _DEPLOY_HANDLER: - return _DEPLOY_HANDLER[deploy_cls](job_spec=self.job_spec) - - # Default result is Pass - return JobStatus.PASSED - - def kill(self) -> None: - """Kill the running process.""" - self._post_finish( - JobStatus.KILLED, - ErrorMessage(line_number=None, message="Job killed!", context=[]), - ) - - @staticmethod - def prepare_workspace(cfg: "WorkspaceConfig") -> None: - """Prepare the workspace based on the chosen launcher's needs. - - This is done once for the entire duration for the flow run. - - Args: - cfg: workspace configuration - - """ - - @staticmethod - def prepare_workspace_for_cfg(cfg: "WorkspaceConfig") -> None: - """Prepare the workspace for a cfg. - - This is invoked once for each cfg. - - Args: - cfg: workspace configuration - - """ diff --git a/src/dvsim/launcher/local.py b/src/dvsim/launcher/local.py deleted file mode 100644 index a022b3ed..00000000 --- a/src/dvsim/launcher/local.py +++ /dev/null @@ -1,213 +0,0 @@ -# Copyright lowRISC contributors (OpenTitan project). -# Licensed under the Apache License, Version 2.0, see LICENSE for details. -# SPDX-License-Identifier: Apache-2.0 - -"""Launcher implementation to run jobs as subprocesses on the local machine.""" - -import datetime -import os -import shlex -import subprocess -from typing import TYPE_CHECKING - -from dvsim.job.status import JobStatus -from dvsim.launcher.base import ErrorMessage, Launcher, LauncherBusyError, LauncherError - -if TYPE_CHECKING: - from dvsim.job.data import JobSpec - from dvsim.job.deploy import WorkspaceConfig - - -class LocalLauncher(Launcher): - """Implementation of Launcher to launch jobs in the user's local workstation.""" - - # Poll job's completion status every this many seconds - poll_freq = 0.025 - - def __init__(self, job_spec: "JobSpec") -> None: - """Initialize common class members.""" - super().__init__(job_spec) - - # Popen object when launching the job. - self._process = None - self._log_file = None - - def _do_launch(self) -> None: - # Update the shell's env vars with self.exports. Values in exports must - # replace the values in the shell's env vars if the keys match. - exports = os.environ.copy() - exports.update(self.job_spec.exports) - - # Clear the magic MAKEFLAGS variable from exports if necessary. This - # variable is used by recursive Make calls to pass variables from one - # level to the next. Here, self.cmd is a call to Make but it's - # logically a top-level invocation: we don't want to pollute the flow's - # Makefile with Make variables from any wrapper that called dvsim. - if "MAKEFLAGS" in exports: - del exports["MAKEFLAGS"] - - self._dump_env_vars(exports) - - try: - log_path = self.job_spec.log_path - self._log_file = log_path.open( - "w", - encoding="UTF-8", - errors="surrogateescape", - ) - self._log_file.write(f"[Executing]:\n{self.job_spec.cmd}\n\n") - self._log_file.flush() - - if not self.job_spec.interactive: - self.timeout_secs = self.job_spec.timeout_secs - - try: - self._process = subprocess.Popen( - shlex.split(self.job_spec.cmd), - bufsize=4096, - universal_newlines=True, - stdout=self._log_file, - stderr=self._log_file, - env=exports, - ) - except BlockingIOError as e: - msg = f"Failed to launch job: {e}" - raise LauncherBusyError(msg) from e - except subprocess.SubprocessError as e: - msg = f"IO Error: {e}\nSee {log_path}" - raise LauncherError(msg) from e - else: - # Interactive: Set RUN_INTERACTIVE to 1 - exports["RUN_INTERACTIVE"] = "1" - - # Interactive. stdin / stdout are transparent - # no timeout and blocking op as user controls the flow - self._process = subprocess.Popen( - shlex.split(self.job_spec.cmd), - stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - # string mode - universal_newlines=True, - env=exports, - ) - - # stdout/stderr logs are tee'd to the log file via the pipe - if self._process.stdout is not None: - for line in self._process.stdout: - print(line, end="") # noqa: T201 - self._log_file.write(line) - self._log_file.flush() - - # Wait until the process exits - self._process.wait() - - except BlockingIOError as e: - msg = f"Failed to launch job: {e}" - raise LauncherBusyError(msg) from e - finally: - self._close_job_log_file() - - self._link_odir(JobStatus.RUNNING) - - def poll(self) -> JobStatus: - """Check status of the running process. - - This returns a job status. If RUNNING, the job is still running. - If PASSED, the job finished successfully. If FAILED, the job finished - with an error. If KILLED, it was killed. - - This function must only be called after running self.dispatch_cmd() and - must not be called again once it has returned PASSED or FAILED. - """ - if self._process is None: - msg = ( - "poll() was called either before calling launch() or after " - "ignoring a LauncherError from launch()." - ) - raise LauncherError(msg) - - elapsed_time = datetime.datetime.now() - self.start_time - self.job_runtime_secs = elapsed_time.total_seconds() - if self._process.poll() is None: - if self.timeout_secs and self.job_runtime_secs > self.timeout_secs: - self._kill() - timeout_mins = self.job_spec.timeout_mins - timeout_message = f"Job timed out after {timeout_mins} minutes" - self._post_finish( - JobStatus.KILLED, - ErrorMessage( - line_number=None, - message=timeout_message, - context=[timeout_message], - ), - ) - return JobStatus.KILLED - - return JobStatus.RUNNING - - self.exit_code = self._process.returncode - status, err_msg = self._check_status() - self._post_finish(status, err_msg) - - return self.status - - def _kill(self) -> None: - """Kill the running process. - - Try to kill the running process. Send SIGTERM first, wait a bit, - and then send SIGKILL if it didn't work. - """ - if self._process is None: - # process already dead or didn't start - return - - self._process.terminate() - try: - self._process.wait(timeout=2) - except subprocess.TimeoutExpired: - self._process.kill() - - def kill(self) -> None: - """Kill the running process. - - This must be called between dispatching and reaping the process (the - same window as poll()). - """ - self._kill() - self._post_finish( - JobStatus.KILLED, - ErrorMessage(line_number=None, message="Job killed!", context=[]), - ) - - def _post_finish(self, status: str, err_msg: ErrorMessage | None) -> None: - self._close_job_log_file() - self._process = None - super()._post_finish(status, err_msg) - - def _close_job_log_file(self) -> None: - """Close the file descriptors associated with the process.""" - if self._log_file: - self._log_file.close() - - @staticmethod - def prepare_workspace(cfg: "WorkspaceConfig") -> None: - """Prepare the workspace based on the chosen launcher's needs. - - This is done once for the entire duration for the flow run. - - Args: - cfg: workspace configuration - - """ - - @staticmethod - def prepare_workspace_for_cfg(cfg: "WorkspaceConfig") -> None: - """Prepare the workspace for a cfg. - - This is invoked once for each cfg. - - Args: - cfg: workspace configuration - - """ diff --git a/src/dvsim/scheduler/async_core.py b/src/dvsim/scheduler/async_core.py deleted file mode 100644 index fef9571a..00000000 --- a/src/dvsim/scheduler/async_core.py +++ /dev/null @@ -1,599 +0,0 @@ -# Copyright lowRISC contributors (OpenTitan project). -# Licensed under the Apache License, Version 2.0, see LICENSE for details. -# SPDX-License-Identifier: Apache-2.0 - -"""Job scheduler.""" - -import asyncio -import heapq -from collections import defaultdict -from collections.abc import Callable, Iterable, Mapping, Sequence -from dataclasses import dataclass, field -from signal import SIGINT, SIGTERM, getsignal, signal -from types import FrameType -from typing import Any, TypeAlias - -from dvsim.job.data import CompletedJobStatus, JobSpec, JobStatusInfo -from dvsim.job.status import JobStatus -from dvsim.logging import log -from dvsim.runtime.backend import RuntimeBackend -from dvsim.runtime.data import JobCompletionEvent, JobHandle - -__all__ = ( - "JobPriorityFn", - "JobRecord", - "OnJobStatusChangeCb", - "OnRunEndCb", - "OnRunStartCb", - "OnSchedulerKillCb", - "Priority", - "Scheduler", -) - - -@dataclass -class JobRecord: - """Mutable runtime representation of a scheduled job, used in the scheduler.""" - - spec: JobSpec - backend_key: str # either spec.backend, or the default backend if not given - - status: JobStatus = JobStatus.SCHEDULED - status_info: JobStatusInfo | None = None - - remaining_deps: int = 0 - passing_deps: int = 0 - dependents: list[str] = field(default_factory=list) - kill_requested: bool = False - - handle: JobHandle | None = None - - -# Function to assign a priority to a given job specification. The returned priority should be -# some lexicographically orderable type. Jobs with higher priority are scheduled first. -Priority: TypeAlias = int | float | Sequence[int | float] -JobPriorityFn: TypeAlias = Callable[[JobRecord], Priority] - -# Callbacks for observers, for when the scheduler run starts and stops -OnRunStartCb: TypeAlias = Callable[[], None] -OnRunEndCb: TypeAlias = Callable[[], None] - -# Callbacks for observers, for when a job status changes in the scheduler -# The arguments are: (job spec, old status, new status). -OnJobStatusChangeCb: TypeAlias = Callable[[JobSpec, JobStatus, JobStatus], None] - -# Callbacks for observers, for when the scheduler receives a kill signal (termination). -OnSchedulerKillCb: TypeAlias = Callable[[], None] - - -# Standard context messages used for killed/failed jobs in the scheduler. -FAILED_DEP = JobStatusInfo( - message="Job cancelled because one of its dependencies failed or was killed." -) -ALL_FAILED_DEP = JobStatusInfo( - message="Job cancelled because all of its dependencies failed or were killed." -) -KILLED_SCHEDULED = JobStatusInfo( - message="Job cancelled because one of its dependencies was killed." -) -KILLED_QUEUED = JobStatusInfo(message="Job killed whilst waiting to begin execution.") -KILLED_RUNNING_SIGINT = JobStatusInfo( - message="Job killed by a SIGINT signal to the scheduler whilst executing." -) -KILLED_RUNNING_SIGTERM = JobStatusInfo( - message="Job killed by a SIGTERM signal to the scheduler whilst executing." -) - - -class Scheduler: - """Event-driven job scheduler that schedules and runs a DAG of job specifications.""" - - def __init__( # noqa: PLR0913 - self, - jobs: Iterable[JobSpec], - backends: Mapping[str, RuntimeBackend], - default_backend: str, - *, - max_parallelism: int = 0, - priority_fn: JobPriorityFn | None = None, - coalesce_window: float | None = 0.001, - ) -> None: - """Construct a new scheduler to run a DAG of jobs. - - Args: - jobs: The DAG of jobs to run. A sequence of job specifications, where the DAG is - defined by the job IDs and job dependency lists. - backends: The mapping (name -> backend) of backends available to the scheduler. - default_backend: The name of the default backend to use if not specified by a job. - max_parallelism: The maximum number of jobs that the scheduler is allowed to dispatch - at once, across all backends. The default value of `0` indicates no upper limit. - priority_fn: A function to calculate the priority of a given job. If no function is - given, this defaults to using the job's weight. - coalesce_window: If specified, the time in seconds to wait on receiving a job - completion, to give a short amount of time to allow other batched completion events - to arrive in the queue. This lets us batch scheduling more frequently for a little - extra cost. Defaults to 1 millisecond, and can be disabled by giving `None`. - - """ - if max_parallelism < 0: - err = f"max_parallelism must be some non-negative integer, not {max_parallelism}" - raise ValueError(err) - if default_backend not in backends: - err = f"Default backend '{default_backend}' is not in the mapping of given backends" - raise ValueError(err) - if coalesce_window is not None and coalesce_window < 0.0: - raise ValueError("coalesce_window must be None or some non-negative number") - - # Configuration of the scheduler's behaviour - self._backends = dict(backends) - self._default_backend = default_backend - self._max_parallelism = max_parallelism - self._priority_fn = priority_fn or self._default_priority - self._coalesce_window = coalesce_window - - # Internal data structures and indexes to track running jobs. - self._jobs: dict[str, JobRecord] = {} - self._ready_heap: list[tuple[Priority, str]] = [] - self._running: set[str] = set() - self._running_per_backend: dict[str, int] = dict.fromkeys(backends, 0) - self._event_queue: asyncio.Queue[Iterable[JobCompletionEvent]] = asyncio.Queue() - - # Internal flags and signal handling - self._shutdown_signal: int | None = None - self._shutdown_event: asyncio.Event | None = None - self._original_sigint_handler: Any = None - self._shutdown_started = False - - # Registered callbacks from observers - self._on_run_start: list[OnRunStartCb] = [] - self._on_run_end: list[OnRunEndCb] = [] - self._on_job_status_change: list[OnJobStatusChangeCb] = [] - self._on_kill_signal: list[OnSchedulerKillCb] = [] - - self._jobs = self.build_graph(jobs, self._backends, self._default_backend) - - def add_run_start_callback(self, cb: OnRunStartCb) -> None: - """Register an observer to notify when the scheduler run is started.""" - self._on_run_start.append(cb) - - def add_run_end_callback(self, cb: OnRunEndCb) -> None: - """Register an observer to notify when the scheduler run ends.""" - self._on_run_end.append(cb) - - def add_job_status_change_callback(self, cb: OnJobStatusChangeCb) -> None: - """Register an observer to notify when the status of a job in the scheduler changes.""" - self._on_job_status_change.append(cb) - - def add_kill_signal_callback(self, cb: OnSchedulerKillCb) -> None: - """Register an observer to notify when the scheduler is killed by some signal.""" - self._on_kill_signal.append(cb) - - def _default_priority(self, job: JobRecord) -> Priority: - """Prioritizes jobs according to their weight. The default prioritization method.""" - return job.spec.weight - - @staticmethod - def build_graph( - specs: Iterable[JobSpec], backends: Iterable[str], default_backend: str - ) -> dict[str, JobRecord]: - """Build the job dependency graph and validate the DAG structure. - - Args: - specs: The list of job specifications that comprise the DAG. - backends: The list of defined backend (names) that can be used by jobs. - default_backend: The backend that is used by default if not defined by a spec. - - Returns: - A (validated) dict mapping job IDs to records representing the graph. - - """ - # Build an index of runtime job records, and check for duplicates - job_graph: dict[str, JobRecord] = {} - for spec in specs: - if spec.id in job_graph: - log.warning("Duplicate job ID '%s'", spec.id) - # TODO: when we're sure it's ok, change the behaviour to error on duplicate jobs - # : err = f"Duplicate job ID '{spec.id}'" - # : raise ValueError(err) - # Instead, silently ignore it for now to match the original scheduler behaviour - continue - if spec.backend is not None and spec.backend not in backends: - err = f"Unknown job backend '{spec.backend}'" - raise ValueError(err) - backend_name = default_backend if spec.backend is None else spec.backend - job_graph[spec.id] = JobRecord(spec=spec, backend_key=backend_name) - - # Build a graph from the adjacency list formed by the spec dependencies - for job in job_graph.values(): - job.remaining_deps = len(job.spec.dependencies) - for dep in job.spec.dependencies: - if dep not in job_graph: - err = f"Unknown job dependency '{dep}' for job {job.spec.id}" - raise ValueError(err) - job_graph[dep].dependents.append(job.spec.id) - - # Validate that there are no cycles in the given graph. - Scheduler.validate_acyclic(job_graph) - - return job_graph - - @staticmethod - def validate_acyclic(job_graph: Mapping[str, JobRecord]) -> None: - """Validate that the given job digraph is acyclic via Kahn's Algorithm.""" - indegree = {job: record.remaining_deps for job, record in job_graph.items()} - job_queue = [job for job, degree in indegree.items() if degree == 0] - num_visited = 0 - - while job_queue: - job = job_queue.pop() - num_visited += 1 - for dep in job_graph[job].dependents: - indegree[dep] -= 1 - if indegree[dep] == 0: - job_queue.append(dep) - - if num_visited != len(job_graph): - raise ValueError("The given JobSpec graph contains a dependency cycle.") - - def _notify_run_started(self) -> None: - """Notify any observers that the scheduler run has started.""" - for cb in self._on_run_start: - cb() - - def _notify_run_finished(self) -> None: - """Notify any observers that the scheduler run has finished.""" - for cb in self._on_run_end: - cb() - - def _notify_kill_signal(self) -> None: - """Notify any observers that the scheduler received a kill signal.""" - for cb in self._on_kill_signal: - cb() - - def _change_job_status( - self, job: JobRecord, new_status: JobStatus, info: JobStatusInfo | None = None - ) -> JobStatus: - """Change a job's runtime status, storing an optionally associated reason. - - Notifies any status change observers of the change, and returns the previous status. - """ - old_status = job.status - if old_status == new_status: - return old_status - - job.status = new_status - job.status_info = info - - if new_status != JobStatus.RUNNING: - log.log( - log.ERROR if new_status in (JobStatus.FAILED, JobStatus.KILLED) else log.VERBOSE, - "Status change to [%s: %s] for %s", - new_status.shorthand, - new_status.name.capitalize(), - job.spec.full_name, - ) - - for cb in self._on_job_status_change: - cb(job.spec, old_status, new_status) - - return old_status - - def _mark_job_ready(self, job: JobRecord) -> None: - """Mark a given job in the scheduler as ready to execute (all dependencies completed).""" - if job.status != JobStatus.SCHEDULED: - msg = f"_mark_job_ready only applies to 'SCHEDULED' jobs (not '{job.status.name}')." - raise RuntimeError(msg) - - self._change_job_status(job, JobStatus.QUEUED) - # heapq is a min heap, so push (-priority) instead of (priority). - priority = self._priority_fn(job) - priority = priority if isinstance(priority, Sequence) else (priority,) - neg_priority: Priority = tuple(-x for x in priority) - heapq.heappush(self._ready_heap, (neg_priority, job.spec.id)) - - def _mark_job_running(self, job: JobRecord) -> None: - """Mark a given job in the scheduler as running. Assumes already removed from the heap.""" - if job.spec.id in self._running: - raise RuntimeError("_mark_job_running called on a job that was already running.") - - self._change_job_status(job, JobStatus.RUNNING) - self._running.add(job.spec.id) - self._running_per_backend[job.backend_key] += 1 - - def _mark_job_completed( - self, job: JobRecord, status: JobStatus, reason: JobStatusInfo | None - ) -> None: - """Mark a given job in the scheduler as completed, having reached some terminal state.""" - if not status.is_terminal: - err = f"_mark_job_completed called with non-terminal status '{status.name}'" - raise RuntimeError(err) - if job.status.is_terminal: - return - - # If the scheduler requested to kill the job, override the failure reason. - if job.kill_requested: - reason = ( - KILLED_RUNNING_SIGINT if self._shutdown_signal == SIGINT else KILLED_RUNNING_SIGTERM - ) - self._change_job_status(job, status, reason) - - # If the job was running, mark it as no longer running. - if job.spec.id in self._running: - self._running.remove(job.spec.id) - self._running_per_backend[job.backend_key] -= 1 - - # Update dependents (jobs that depend on this job), propagating failures if needed. - self._update_completed_job_deps(job) - - def _update_completed_job_deps(self, job: JobRecord) -> None: - """Update the dependencies of a completed job, scheduling/killing deps where necessary.""" - for dep_id in job.dependents: - dep = self._jobs[dep_id] - - # Update dependency tracking counts in the dependency records - dep.remaining_deps -= 1 - if job.status == JobStatus.PASSED: - dep.passing_deps += 1 - - # Propagate kill signals on shutdown - if self._shutdown_signal is not None: - self._mark_job_completed(dep, JobStatus.KILLED, KILLED_SCHEDULED) - continue - - # Handle dependency management and marking dependents as ready - if dep.remaining_deps == 0 and dep.status == JobStatus.SCHEDULED: - if dep.spec.needs_all_dependencies_passing: - if dep.passing_deps == len(dep.spec.dependencies): - self._mark_job_ready(dep) - else: - self._mark_job_completed(dep, JobStatus.KILLED, FAILED_DEP) - elif dep.passing_deps > 0: - self._mark_job_ready(dep) - else: - self._mark_job_completed(dep, JobStatus.KILLED, ALL_FAILED_DEP) - - async def run(self) -> list[CompletedJobStatus]: - """Run all scheduled jobs to completion (unless terminated) and return the results.""" - self._install_signal_handlers() - - for backend in self._backends.values(): - backend.attach_completion_callback(self._submit_job_completion) - - self._notify_run_started() - - # Before entering the main loop, mark jobs with 0 remaining deps as ready to run. - for job in self._jobs.values(): - if job.remaining_deps == 0: - self._mark_job_ready(job) - - try: - await self._main_loop() - finally: - self._notify_run_finished() - - return [ - CompletedJobStatus( - name=job.spec.name, - job_type=job.spec.job_type, - seed=job.spec.seed, - block=job.spec.block, - tool=job.spec.tool, - workspace_cfg=job.spec.workspace_cfg, - full_name=job.spec.full_name, - qual_name=job.spec.qual_name, - target=job.spec.target, - log_path=job.spec.log_path, - job_runtime=job.handle.job_runtime.with_unit("s").get()[0] - if job.handle is not None - else 0.0, - simulated_time=job.handle.simulated_time.with_unit("us").get()[0] - if job.handle is not None - else 0.0, - status=job.status, - fail_msg=job.status_info, - ) - for job in self._jobs.values() - ] - - def _install_signal_handlers(self) -> None: - """Install the SIGINT/SIGTERM signal handlers to trigger graceful shutdowns.""" - self._shutdown_signal = None - self._shutdown_event = asyncio.Event() - self._original_sigint_handler = getsignal(SIGINT) - self._shutdown_started = False - loop = asyncio.get_running_loop() - - def _handler(signum: int, _frame: FrameType | None) -> None: - if self._shutdown_signal is None and self._shutdown_event: - self._shutdown_signal = signum - loop.call_soon_threadsafe(self._shutdown_event.set) - - # Restore the original SIGINT handler so a second Ctrl-C terminates immediately - if signum == SIGINT: - signal(SIGINT, self._original_sigint_handler) - - loop.add_signal_handler(SIGINT, lambda: _handler(SIGINT, None)) - loop.add_signal_handler(SIGTERM, lambda: _handler(SIGTERM, None)) - - async def _submit_job_completion(self, events: Iterable[JobCompletionEvent]) -> None: - """Notify the scheduler that a batch of jobs have been completed.""" - try: - self._event_queue.put_nowait(events) - except asyncio.QueueShutDown as e: - msg = "Scheduler event queue shutdown earlier than expected?" - raise RuntimeError(msg) from e - except asyncio.QueueFull: - log.critical("Scheduler event queue full despite being infinitely sized?") - - async def _main_loop(self) -> None: - """Run the main scheduler loop. - - Tries to schedule any ready jobs if there is available capacity, and then waits for any job - completions (or a shutdown signal). This continues in a loop until all jobs have been either - executed or killed (e.g. via a shutdown signal). - """ - if self._shutdown_event is None: - raise RuntimeError("Expected signal handlers to be installed before running main loop") - - job_completion_task = asyncio.create_task(self._event_queue.get()) - shutdown_task = asyncio.create_task(self._shutdown_event.wait()) - - try: - while True: - await self._schedule_ready_jobs() - - if not self._running: - if not self._ready_heap: - break - # This case (nothing running, but jobs still pending in the queue) can happen - # if backends fail to schedule any jobs (e.g. the backend is temporarily busy). - continue - - # Wait for any job to complete, or for a shutdown signal - try: - done, _ = await asyncio.wait( - (job_completion_task, shutdown_task), - return_when=asyncio.FIRST_COMPLETED, - ) - except asyncio.QueueShutDown as e: - msg = "Scheduler event queue shutdown earlier than expected?" - raise RuntimeError(msg) from e - - if shutdown_task in done: - self._shutdown_event.clear() - shutdown_task = asyncio.create_task(self._shutdown_event.wait()) - await self._handle_exit_signal() - continue - - completions = await self._drain_completions(job_completion_task) - job_completion_task = asyncio.create_task(self._event_queue.get()) - - for event in completions: - job = self._jobs[event.spec.id] - self._mark_job_completed(job, event.status, event.reason) - finally: - job_completion_task.cancel() - shutdown_task.cancel() - - async def _drain_completions(self, completion_task: asyncio.Task) -> list[JobCompletionEvent]: - """Drain batched completions from the queue, optionally coalescing batched events.""" - events = list(completion_task.result()) - - # Coalesce nearby completions by waiting for a very short time - if self._coalesce_window is not None: - await asyncio.sleep(self._coalesce_window) - - # Drain any more completion events from the event queue - try: - while True: - events.extend(self._event_queue.get_nowait()) - except asyncio.QueueEmpty: - return events - except asyncio.QueueShutDown as e: - msg = "Scheduler event queue shutdown earlier than expected?" - raise RuntimeError(msg) from e - - async def _handle_exit_signal(self) -> None: - """Attempt to gracefully shutdown as a result of a triggered exit signal.""" - if self._shutdown_started: - return - self._shutdown_started = True - - signal_name = "SIGTERM" if self._shutdown_signal == SIGTERM else "SIGINT" - log.info("Received %s signal. Exiting gracefully", signal_name) - if self._shutdown_signal == SIGINT: - log.info( - "Send another to force immediate quit (but you may need to manually " - "kill some child processes)." - ) - - self._notify_kill_signal() - - # Mark any jobs that are currently running as jobs we should kill. - # Collect jobs to kill in a dict, grouped per backend, for batched killing. - to_kill: dict[str, list[JobHandle]] = defaultdict(list) - - for job_id in self._running: - job = self._jobs[job_id] - if job.handle is None: - raise RuntimeError("Running job is missing an associated handle.") - job.kill_requested = True - to_kill[job.backend_key].append(job.handle) - - # Asynchronously dispatch backend kill tasks whilst we update scheduler internals. - # Jobs that depend on these jobs will then be transitively killed before they start. - kill_tasks: list[asyncio.Task] = [] - for backend_name, handles in to_kill.items(): - backend = self._backends[backend_name] - kill_tasks.append(asyncio.create_task(backend.kill_many(handles))) - - # Kill any ready (but not running jobs), so that they don't get scheduled. - while self._ready_heap: - _, job_id = heapq.heappop(self._ready_heap) - job = self._jobs[job_id] - self._mark_job_completed(job, JobStatus.KILLED, KILLED_QUEUED) - - if kill_tasks: - await asyncio.gather(*kill_tasks, return_exceptions=True) - - async def _schedule_ready_jobs(self) -> None: - """Attempt to schedule ready jobs whilst respecting scheduler & backend parallelism.""" - # Find out how many jobs we can dispatch according to the scheduler's parallelism limit - available_slots = ( - self._max_parallelism - len(self._running) - if self._max_parallelism - else len(self._ready_heap) - ) - if available_slots <= 0: - return - - # Collect jobs to launch in a dict, grouped per backend, for batched launching. - to_launch: dict[str, list[tuple[Priority, JobRecord]]] = defaultdict(list) - blocked: list[tuple[Priority, str]] = [] - slots_used = 0 - - while self._ready_heap and slots_used < available_slots: - neg_priority, job_id = heapq.heappop(self._ready_heap) - job = self._jobs[job_id] - backend = self._backends[job.backend_key] - running_on_backend = self._running_per_backend[job.backend_key] + len( - to_launch[job.backend_key] - ) - - # Check that we can launch the job whilst respecting backend parallelism limits - if backend.max_parallelism and running_on_backend >= backend.max_parallelism: - blocked.append((neg_priority, job_id)) - continue - - to_launch[job.backend_key].append((neg_priority, job)) - slots_used += 1 - - # Requeue any blocked jobs. - for entry in blocked: - heapq.heappush(self._ready_heap, entry) - - # Launch the selected jobs in batches per backend - launch_tasks = [] - for backend_name, jobs in to_launch.items(): - backend = self._backends[backend_name] - job_specs = [job.spec for _, job in jobs] - log.verbose( - "[%s]: Dispatching jobs: %s", - backend_name, - ", ".join(job.full_name for job in job_specs), - ) - launch_tasks.append(backend.submit_many(job_specs)) - - results = await asyncio.gather(*launch_tasks) - - # Mark jobs running, and requeue any jobs that failed to launch - for jobs, handles in zip(to_launch.values(), results, strict=True): - for neg_priority, job in jobs: - handle = handles.get(job.spec.id) - if handle is None: - log.verbose("[%s]: Requeuing job '%s'", job.spec.target, job.spec.full_name) - heapq.heappush(self._ready_heap, (neg_priority, job.spec.id)) - continue - - job.handle = handle - self._mark_job_running(job) diff --git a/src/dvsim/scheduler/async_status_printer.py b/src/dvsim/scheduler/async_status_printer.py deleted file mode 100644 index d3dc19b9..00000000 --- a/src/dvsim/scheduler/async_status_printer.py +++ /dev/null @@ -1,400 +0,0 @@ -# Copyright lowRISC contributors (OpenTitan project). -# Licensed under the Apache License, Version 2.0, see LICENSE for details. -# SPDX-License-Identifier: Apache-2.0 - -"""Job status printing during a scheduled run.""" - -import asyncio -import os -import shutil -import sys -import termios -import time -from abc import ABC, abstractmethod -from collections import defaultdict -from collections.abc import Sequence -from typing import ClassVar - -import enlighten - -from dvsim.job.data import JobSpec -from dvsim.job.status import JobStatus -from dvsim.logging import log -from dvsim.utils import TS_HMS_FORMAT - - -class StatusPrinter(ABC): - """Status Printer abstract base class. - - Contains core functionality related to status printing - a print interval can be configured - to control how often the scheduler target statuses are printed, which is managed by an async - thread. Optionally, the print interval can be configured to 0 to run in an update-driven mode - where every single status update is printed. Regardless of the configured print interval, the - final job update for each target is printed immediately to reflect final target end timings. - """ - - # How often we print by default. Zero means we should print on every event change. - print_interval = 0 - - def __init__(self, jobs: Sequence[JobSpec], print_interval: int | None = None) -> None: - """Construct the base StatusPrinter.""" - # Mapping from target -> (Mapping from status -> count) - self._target_counts: dict[str, dict[JobStatus, int]] = defaultdict(lambda: defaultdict(int)) - # Mapping from target -> number of jobs - self._totals: dict[str, int] = defaultdict(int) - - for job in jobs: - self._target_counts[job.target][JobStatus.SCHEDULED] += 1 - self._totals[job.target] += 1 - - # The number of characters used to represent the largest field in the displayed table - self._field_width = max((len(str(total)) for total in self._totals.values()), default=0) - - # State tracking for the StatusPrinter - self._start_time: float = 0.0 - self._last_print: float = 0.0 - self._running: dict[str, list[str]] = defaultdict(list) - self._num_finished: dict[str, int] = defaultdict(int) - self._finish_time: dict[str, float] = {} - - # Async target status update handling - self._task: asyncio.Task | None = None - self._paused: bool = False - - self._interval = print_interval if print_interval is not None else self.print_interval - - @property - def updates_every_event(self) -> bool: - """If the configured print interval is 0, statuses are updated on every state change.""" - return self._interval <= 0 - - def start(self) -> None: - """Start printing the status of the scheduled jobs.""" - self._start_time = time.monotonic() - self._print_header() - for target in self._target_counts: - self._init_target(target, self._get_target_row(target)) - - # If we need an async task to manage the print interval, create one - if not self.updates_every_event: - self._task = asyncio.create_task(self._run()) - - async def _run(self) -> None: - """Run a timer in an async loop, printing the updated status at every interval.""" - next_tick = self._start_time + self._interval - self.update_all_targets(including_unstarted=True) - - while True: - now = time.monotonic() - sleep_time = max(0, next_tick - now) - await asyncio.sleep(sleep_time) - self.update_all_targets() - next_tick += self._interval - - def update_all_targets(self, *, including_unstarted: bool = False) -> None: - """Update the status bars of all targets.""" - if self._paused: - return - update_time = time.monotonic() - for target in self._target_counts: - # Only update targets that have started (some job status has changed) - if self.target_is_started(target) or including_unstarted: - target_update_time = self._finish_time.get(target, update_time) - self._update_target(target_update_time, target) - - def target_is_started(self, target: str) -> bool: - """Check whether a target has been started yet or not.""" - return bool(self._num_finished[target]) or bool(self._running[target]) - - def target_is_done(self, target: str) -> bool: - """Check whether a target is finished or not.""" - return self._num_finished[target] >= self._totals[target] - - def update_status(self, job: JobSpec, old_status: JobStatus, new_status: JobStatus) -> None: - """Update the status printer to reflect a change in job status.""" - status_counts = self._target_counts[job.target] - status_counts[old_status] -= 1 - if old_status == JobStatus.RUNNING: - self._running[job.target].remove(job.full_name) - status_counts[new_status] += 1 - if new_status == JobStatus.RUNNING: - self._running[job.target].append(job.full_name) - if not old_status.is_terminal and new_status.is_terminal: - self._num_finished[job.target] += 1 - - if self.target_is_done(job.target) and not self.updates_every_event: - # Even if we have a configured print interval, we should record - # the time at which the target finished to capture accurate end timing. - self._finish_time[job.target] = time.monotonic() - elif self.updates_every_event: - self.update_all_targets() - - def _get_header(self) -> str: - """Get the header string to use for printing the status.""" - return ( - ", ".join( - f"{status.shorthand}: {status.name.lower().rjust(self._field_width)}" - for status in JobStatus - ) - + ", T: total" - ) - - def _get_target_row(self, target: str) -> str: - """Get a formatted string with the fields for a given target row.""" - fields = [] - for status in JobStatus: - count = self._target_counts[target][status] - value = f"{count:0{self._field_width}d}" - fields.append(f"{status.shorthand}: {value.rjust(len(status.name))}") - total = f"{self._totals[target]:0{self._field_width}d}" - fields.append(f"T: {total.rjust(5)}") - return ", ".join(fields) - - @abstractmethod - def _print_header(self) -> None: - """Initialize / print the header, displaying the legend of job status meanings.""" - - @abstractmethod - def _init_target(self, target: str, _msg: str) -> None: - """Initialize the status bar for a target.""" - - @abstractmethod - def _update_target(self, current_time: float, target: str) -> None: - """Update the status bar for a given target.""" - - def pause(self) -> None: - """Toggle whether the status printer is paused. May make target finish times inaccurate.""" - self._paused = not self._paused - if not self._paused and self.updates_every_event: - self.update_all_targets() - - def stop(self) -> None: - """Stop the status header/target printing (but keep the printer context).""" - if self._task: - self._task.cancel() - if self._paused: - self._paused = False - self.update_all_targets(including_unstarted=True) - - def exit(self) -> None: # noqa: B027 - """Do cleanup activities before exiting.""" - - -class TtyStatusPrinter(StatusPrinter): - """Prints the current scheduler target status onto the console / TTY via logging.""" - - hms_fmt = "\x1b[1m{hms:9s}\x1b[0m" - header_fmt = hms_fmt + " [{target:^13s}]: [{msg}]" - status_fmt = header_fmt + " {percent:3.0f}% {running}" - - def __init__(self, jobs: Sequence[JobSpec]) -> None: - """Initialise the TtyStatusPrinter.""" - super().__init__(jobs) - - # Maintain a mapping of completed targets, so we only print the status one last - # time when it reaches 100% for a target. - self._target_done: dict[str, bool] = {} - - def _print_header(self) -> None: - """Initialize / print the header, displaying the legend of job status meanings.""" - log.info(self.header_fmt.format(hms="", target="legend", msg=self._get_header())) - - def _init_target(self, target: str, _msg: str) -> None: - """Initialize the status bar for a target.""" - self._target_done[target] = False - - def _trunc_running(self, running: str, width: int = 30) -> str: - """Truncate the list of running items to a specified length.""" - if len(running) <= width: - return running - return running[: width - 3] + "..." - - def _update_target(self, current_time: float, target: str) -> None: - """Update the status bar for a given target.""" - if self._target_done[target]: - return - if self.target_is_done(target): - self._target_done[target] = True - - status_counts = self._target_counts[target] - done_count = sum(status_counts[status] for status in JobStatus if status.is_terminal) - percent = (done_count / self._totals[target] * 100) if self._totals[target] else 100 - elapsed_time = time.gmtime(current_time - self._start_time) - - log.info( - self.status_fmt.format( - hms=time.strftime(TS_HMS_FORMAT, elapsed_time), - target=target, - msg=self._get_target_row(target), - percent=percent, - running=self._trunc_running(", ".join(self._running[target])), - ), - ) - - -class EnlightenStatusPrinter(TtyStatusPrinter): - """Prints the current scheduler target status to the terminal using Enlighten. - - Enlighten is a third party progress bar tool. Documentation: - https://python-enlighten.readthedocs.io/en/stable/ - - Enlighten does not work if the output of DVSim is redirected to a file, for - example - it needs to be attached to a TTY enabled stream. - """ - - # Enlighten uses a min_delta of 0.1 by default, only updating every 0.1 seconds. - DEFAULT_MIN_DELTA = 0.1 - - status_fmt_no_running = TtyStatusPrinter.status_fmt.removesuffix("{running}") - status_fmt = "{status_msg}{running}" - - def __init__(self, jobs: Sequence[JobSpec]) -> None: - """Initialise the EnlightenStatusPrinter.""" - super().__init__(jobs) - if self._interval < self.DEFAULT_MIN_DELTA: - # TODO: maybe "debounce" the updates with a delayed async refresh task? - log.warning( - "Configured print interval %g will not accurately reflect for %s," - " which uses status bars with a configured min_delta of %g by default.", - self._interval, - self.__class__.__name__, - self.DEFAULT_MIN_DELTA, - ) - - # Initialize the enlighten manager and needed state - self._manager = enlighten.get_manager() - self._status_header: enlighten.StatusBar | None = None - self._status_bars: dict[str, enlighten.StatusBar] = {} - self._stopped = False - - def _print_header(self) -> None: - """Initialize / print the header, displaying the legend of job status meanings.""" - self._status_header = self._manager.status_bar( - status_format=self.header_fmt, - hms="", - target="legend", - msg=self._get_header(), - ) - - def _init_target(self, target: str, msg: str) -> None: - """Initialize the status bar for a target.""" - super()._init_target(target, msg) - hms = time.strftime(TS_HMS_FORMAT, time.gmtime(0)) - msg = self.status_fmt_no_running.format(hms=hms, target=target, msg=msg, percent=0.0) - self._status_bars[target] = self._manager.status_bar( - status_format=self.status_fmt, - status_msg=msg, - running="", - ) - - def _trunc_running_to_terminal(self, running: str, offset: int) -> str: - """Truncate the list of running items to match the max terminal width.""" - cols = shutil.get_terminal_size(fallback=(80, 24)).columns - width = max(30, cols - offset - 1) - return self._trunc_running(running, width) - - def _update_target(self, current_time: float, target: str) -> None: - """Update the status bar for a given target.""" - if self._target_done[target]: - return - - status_counts = self._target_counts[target] - done_count = sum(status_counts[status] for status in JobStatus if status.is_terminal) - percent = (done_count / self._totals[target] * 100) if self._totals[target] else 100 - elapsed_time = time.gmtime(current_time - self._start_time) - - status_msg = self.status_fmt_no_running.format( - hms=time.strftime(TS_HMS_FORMAT, elapsed_time), - target=target, - msg=self._get_target_row(target), - percent=percent, - ) - offset = len(status_msg) - running = self._trunc_running_to_terminal(", ".join(self._running[target]), offset) - - self._status_bars[target].update(status_msg=status_msg, running=running) - - if self.target_is_done(target): - self._target_done[target] = True - self._status_bars[target].refresh() - - def stop(self) -> None: - """Stop the status header/target printing (but keep the printer context).""" - super().stop() - if self._status_header is not None: - self._status_header.close() - for status_bar in self._status_bars.values(): - status_bar.close() - self._stopped = True - - def exit(self) -> None: - """Do cleanup activities before exiting (closing the manager context).""" - super().exit() - if not self._stopped: - self.stop() - self._manager.stop() - - # Sometimes, exiting via a signal (e.g. Ctrl-C) can cause Enlighten to leave the - # terminal in some non-raw mode. Just in case, restore regular operation. - self._restore_terminal() - - def _restore_terminal(self) -> None: - """Restore regular terminal operation after using Enlighten.""" - # Try open /dev/tty, otherwise fallback to sys.stdin - try: - fd = os.open("/dev/tty", os.O_RDWR) - close_fd = True - except (OSError, termios.error): - fd = sys.stdin.fileno() - close_fd = False - - # By default, the terminal should echo input (ECHO) and run in canonical mode (ICANON). - # We make this change after all buffered output is transmitted (TCSADRAIN). - try: - attrs = termios.tcgetattr(fd) - attrs[3] |= termios.ECHO | termios.ICANON - termios.tcsetattr(fd, termios.TCSADRAIN, attrs) - except termios.error: - log.debug("Unable to restore terminal attributes safely") - - if close_fd: - os.close(fd) - - -class StatusPrinterSingleton: - """Singleton for the status printer to uniquely refer to 1 instance at a time.""" - - _instance: ClassVar[StatusPrinter | None] = None - - @classmethod - def set(cls, instance: StatusPrinter | None) -> None: - """Set the stored status printer.""" - cls._instance = instance - - @classmethod - def get(cls) -> StatusPrinter | None: - """Get the stored status printer (if it exists).""" - return cls._instance - - -def create_status_printer(jobs: Sequence[JobSpec]) -> StatusPrinter: - """Create the global status printer. - - If stdout is a TTY, then return an instance of EnlightenStatusPrinter, else - return an instance of StatusPrinter. - """ - status_printer = StatusPrinterSingleton.get() - if status_printer is not None: - return status_printer - - status_printer = EnlightenStatusPrinter(jobs) if sys.stdout.isatty() else TtyStatusPrinter(jobs) - StatusPrinterSingleton.set(status_printer) - return status_printer - - -def get_status_printer() -> StatusPrinter: - """Retrieve the configured global status printer.""" - status_printer = StatusPrinterSingleton.get() - if status_printer is None: - raise RuntimeError("get_status_printer called without first creating the status printer") - return status_printer diff --git a/src/dvsim/scheduler/core.py b/src/dvsim/scheduler/core.py index b063c7fb..fef9571a 100644 --- a/src/dvsim/scheduler/core.py +++ b/src/dvsim/scheduler/core.py @@ -4,760 +4,596 @@ """Job scheduler.""" -import contextlib -import os -import selectors -from collections.abc import ( - Callable, - Mapping, - MutableMapping, - MutableSequence, - MutableSet, - Sequence, -) -from itertools import dropwhile -from signal import SIGINT, SIGTERM, signal +import asyncio +import heapq +from collections import defaultdict +from collections.abc import Callable, Iterable, Mapping, Sequence +from dataclasses import dataclass, field +from signal import SIGINT, SIGTERM, getsignal, signal from types import FrameType -from typing import TYPE_CHECKING, Any +from typing import Any, TypeAlias -from dvsim import instrumentation -from dvsim.instrumentation import NoOpInstrumentation from dvsim.job.data import CompletedJobStatus, JobSpec, JobStatusInfo from dvsim.job.status import JobStatus -from dvsim.launcher.base import Launcher, LauncherBusyError, LauncherError from dvsim.logging import log -from dvsim.scheduler.status_printer import get_status_printer -from dvsim.utils.timer import Timer +from dvsim.runtime.backend import RuntimeBackend +from dvsim.runtime.data import JobCompletionEvent, JobHandle + +__all__ = ( + "JobPriorityFn", + "JobRecord", + "OnJobStatusChangeCb", + "OnRunEndCb", + "OnRunStartCb", + "OnSchedulerKillCb", + "Priority", + "Scheduler", +) -if TYPE_CHECKING: - from dvsim.flow.base import FlowCfg +@dataclass +class JobRecord: + """Mutable runtime representation of a scheduled job, used in the scheduler.""" -def total_sub_items( - d: Mapping[str, Sequence[str]] | Mapping["FlowCfg", Sequence[JobSpec]], -) -> int: - """Return the total number of sub items in a mapping. + spec: JobSpec + backend_key: str # either spec.backend, or the default backend if not given - Given a dict whose key values are lists, return sum of lengths of - these lists. - """ - return sum(len(v) for v in d.values()) + status: JobStatus = JobStatus.SCHEDULED + status_info: JobStatusInfo | None = None + remaining_deps: int = 0 + passing_deps: int = 0 + dependents: list[str] = field(default_factory=list) + kill_requested: bool = False -def get_next_item(arr: Sequence, index: int) -> tuple[Any, int]: - """Perpetually get an item from a list. + handle: JobHandle | None = None - Returns the next item on the list by advancing the index by 1. If the index - is already the last item on the list, it loops back to the start, thus - implementing a circular list. - Args: - arr: subscriptable list. - index: index of the last item returned. +# Function to assign a priority to a given job specification. The returned priority should be +# some lexicographically orderable type. Jobs with higher priority are scheduled first. +Priority: TypeAlias = int | float | Sequence[int | float] +JobPriorityFn: TypeAlias = Callable[[JobRecord], Priority] - Returns: - (item, index) if successful. +# Callbacks for observers, for when the scheduler run starts and stops +OnRunStartCb: TypeAlias = Callable[[], None] +OnRunEndCb: TypeAlias = Callable[[], None] - Raises: - IndexError if arr is empty. +# Callbacks for observers, for when a job status changes in the scheduler +# The arguments are: (job spec, old status, new status). +OnJobStatusChangeCb: TypeAlias = Callable[[JobSpec, JobStatus, JobStatus], None] + +# Callbacks for observers, for when the scheduler receives a kill signal (termination). +OnSchedulerKillCb: TypeAlias = Callable[[], None] - """ - index += 1 - try: - item = arr[index] - except IndexError: - index = 0 - try: - item = arr[index] - except IndexError: - msg = "List is empty!" - raise IndexError(msg) from None - return item, index +# Standard context messages used for killed/failed jobs in the scheduler. +FAILED_DEP = JobStatusInfo( + message="Job cancelled because one of its dependencies failed or was killed." +) +ALL_FAILED_DEP = JobStatusInfo( + message="Job cancelled because all of its dependencies failed or were killed." +) +KILLED_SCHEDULED = JobStatusInfo( + message="Job cancelled because one of its dependencies was killed." +) +KILLED_QUEUED = JobStatusInfo(message="Job killed whilst waiting to begin execution.") +KILLED_RUNNING_SIGINT = JobStatusInfo( + message="Job killed by a SIGINT signal to the scheduler whilst executing." +) +KILLED_RUNNING_SIGTERM = JobStatusInfo( + message="Job killed by a SIGTERM signal to the scheduler whilst executing." +) class Scheduler: - """An object that runs one or more jobs from JobSpec items.""" + """Event-driven job scheduler that schedules and runs a DAG of job specifications.""" - def __init__( + def __init__( # noqa: PLR0913 self, - items: Sequence[JobSpec], - launcher_cls: type[Launcher], + jobs: Iterable[JobSpec], + backends: Mapping[str, RuntimeBackend], + default_backend: str, *, - interactive: bool = False, + max_parallelism: int = 0, + priority_fn: JobPriorityFn | None = None, + coalesce_window: float | None = 0.001, ) -> None: - """Initialise a job scheduler. + """Construct a new scheduler to run a DAG of jobs. Args: - items: sequence of jobs to deploy. - launcher_cls: Launcher class to use to deploy the jobs. - interactive: launch the tools in interactive mode. + jobs: The DAG of jobs to run. A sequence of job specifications, where the DAG is + defined by the job IDs and job dependency lists. + backends: The mapping (name -> backend) of backends available to the scheduler. + default_backend: The name of the default backend to use if not specified by a job. + max_parallelism: The maximum number of jobs that the scheduler is allowed to dispatch + at once, across all backends. The default value of `0` indicates no upper limit. + priority_fn: A function to calculate the priority of a given job. If no function is + given, this defaults to using the job's weight. + coalesce_window: If specified, the time in seconds to wait on receiving a job + completion, to give a short amount of time to allow other batched completion events + to arrive in the queue. This lets us batch scheduling more frequently for a little + extra cost. Defaults to 1 millisecond, and can be disabled by giving `None`. """ - # Start any instrumentation - inst = instrumentation.get() - self._instrumentation = NoOpInstrumentation() if inst is None else inst - self._instrumentation.start() - - self._jobs: Mapping[str, JobSpec] = {i.full_name: i for i in items} - - # 'scheduled[target][cfg]' is a list of JobSpec object names for the chosen - # target and cfg. As items in _scheduled are ready to be run (once - # their dependencies pass), they are moved to the _queued list, where - # they wait until slots are available for them to be dispatched. - # When all items (in all cfgs) of a target are done, it is removed from - # this dictionary. - self._scheduled: MutableMapping[str, MutableMapping[str, MutableSequence[str]]] = {} - self.add_to_scheduled(jobs=self._jobs) - - # Print status periodically using an external status printer. - self._status_printer = get_status_printer(interactive) - self._status_printer.print_header() - - # Sets of items, split up by their current state. The sets are - # disjoint and their union equals the keys of self.item_status. - # _queued is a list so that we dispatch things in order (relevant - # for things like tests where we have ordered things cleverly to - # try to see failures early). They are maintained for each target. - - # The list of available targets and the list of running items in each - # target are polled in a circular fashion, looping back to the start. - # This is done to allow us to poll a smaller subset of jobs rather than - # the entire regression. We keep rotating through our list of running - # items, picking up where we left off on the last poll. - self._targets: Sequence[str] = list(self._scheduled.keys()) - self._total: MutableMapping[str, int] = {} - - self._queued: MutableMapping[str, MutableSequence[str]] = {} - self._running: MutableMapping[str, MutableSequence[str]] = {} - - self._passed: MutableMapping[str, MutableSet[str]] = {} - self._failed: MutableMapping[str, MutableSet[str]] = {} - self._killed: MutableMapping[str, MutableSet[str]] = {} - - self._last_target_polled_idx = -1 - self._last_item_polled_idx = {} - - for target in self._scheduled: - self._queued[target] = [] - self._running[target] = [] - - self._passed[target] = set() - self._failed[target] = set() - self._killed[target] = set() - - self._total[target] = total_sub_items(self._scheduled[target]) - self._last_item_polled_idx[target] = -1 - - # Stuff for printing the status. - width = len(str(self._total[target])) - field_fmt = f"{{:0{width}d}}" - self._msg_fmt = ( - f"Q: {field_fmt}, R: {field_fmt}, P: {field_fmt}, " - f"F: {field_fmt}, K: {field_fmt}, T: {field_fmt}" - ) - msg = self._msg_fmt.format(0, 0, 0, 0, 0, self._total[target]) - self._status_printer.init_target(target=target, msg=msg) - - # A map from the job names tracked by this class to their - # current status, corresponding to membership in the dicts - # above. This is not per-target. - self.job_status: MutableMapping[str, JobStatus] = {} - - # Create the launcher instance for all items. - self._launchers: Mapping[str, Launcher] = { - full_name: launcher_cls(job_spec) for full_name, job_spec in self._jobs.items() - } + if max_parallelism < 0: + err = f"max_parallelism must be some non-negative integer, not {max_parallelism}" + raise ValueError(err) + if default_backend not in backends: + err = f"Default backend '{default_backend}' is not in the mapping of given backends" + raise ValueError(err) + if coalesce_window is not None and coalesce_window < 0.0: + raise ValueError("coalesce_window must be None or some non-negative number") + + # Configuration of the scheduler's behaviour + self._backends = dict(backends) + self._default_backend = default_backend + self._max_parallelism = max_parallelism + self._priority_fn = priority_fn or self._default_priority + self._coalesce_window = coalesce_window + + # Internal data structures and indexes to track running jobs. + self._jobs: dict[str, JobRecord] = {} + self._ready_heap: list[tuple[Priority, str]] = [] + self._running: set[str] = set() + self._running_per_backend: dict[str, int] = dict.fromkeys(backends, 0) + self._event_queue: asyncio.Queue[Iterable[JobCompletionEvent]] = asyncio.Queue() + + # Internal flags and signal handling + self._shutdown_signal: int | None = None + self._shutdown_event: asyncio.Event | None = None + self._original_sigint_handler: Any = None + self._shutdown_started = False + + # Registered callbacks from observers + self._on_run_start: list[OnRunStartCb] = [] + self._on_run_end: list[OnRunEndCb] = [] + self._on_job_status_change: list[OnJobStatusChangeCb] = [] + self._on_kill_signal: list[OnSchedulerKillCb] = [] + + self._jobs = self.build_graph(jobs, self._backends, self._default_backend) + + def add_run_start_callback(self, cb: OnRunStartCb) -> None: + """Register an observer to notify when the scheduler run is started.""" + self._on_run_start.append(cb) + + def add_run_end_callback(self, cb: OnRunEndCb) -> None: + """Register an observer to notify when the scheduler run ends.""" + self._on_run_end.append(cb) + + def add_job_status_change_callback(self, cb: OnJobStatusChangeCb) -> None: + """Register an observer to notify when the status of a job in the scheduler changes.""" + self._on_job_status_change.append(cb) + + def add_kill_signal_callback(self, cb: OnSchedulerKillCb) -> None: + """Register an observer to notify when the scheduler is killed by some signal.""" + self._on_kill_signal.append(cb) + + def _default_priority(self, job: JobRecord) -> Priority: + """Prioritizes jobs according to their weight. The default prioritization method.""" + return job.spec.weight + + @staticmethod + def build_graph( + specs: Iterable[JobSpec], backends: Iterable[str], default_backend: str + ) -> dict[str, JobRecord]: + """Build the job dependency graph and validate the DAG structure. - # The chosen launcher class. This allows us to access launcher - # variant-specific settings such as max parallel jobs & poll rate. - self._launcher_cls: type[Launcher] = launcher_cls + Args: + specs: The list of job specifications that comprise the DAG. + backends: The list of defined backend (names) that can be used by jobs. + default_backend: The backend that is used by default if not defined by a spec. - def _handle_exit_signal(self, last_received_signal: int, handler: Callable) -> None: - """Handle a received exit (SIGINT/SIGTERM signal) in the main scheduler loop. + Returns: + A (validated) dict mapping job IDs to records representing the graph. - On either signal, this will tell runners to quit and cancel future jobs. - On receiving a SIGINT specifically, this re-installs the old signal handler - such that subsequent SIGINT signals will kill the process (non-gracefully). """ - log.info( - "Received signal %s. Exiting gracefully.", - last_received_signal, - ) - - if last_received_signal == SIGINT: - log.info( - "Send another to force immediate quit (but you may " - "need to manually kill child processes)", - ) - - # Restore old handler to catch a second SIGINT - signal(SIGINT, handler) - - self._kill() - - def run(self) -> Sequence[CompletedJobStatus]: - """Run all scheduled jobs and return the results. - - Returns the results (status) of all items dispatched for all - targets and cfgs. + # Build an index of runtime job records, and check for duplicates + job_graph: dict[str, JobRecord] = {} + for spec in specs: + if spec.id in job_graph: + log.warning("Duplicate job ID '%s'", spec.id) + # TODO: when we're sure it's ok, change the behaviour to error on duplicate jobs + # : err = f"Duplicate job ID '{spec.id}'" + # : raise ValueError(err) + # Instead, silently ignore it for now to match the original scheduler behaviour + continue + if spec.backend is not None and spec.backend not in backends: + err = f"Unknown job backend '{spec.backend}'" + raise ValueError(err) + backend_name = default_backend if spec.backend is None else spec.backend + job_graph[spec.id] = JobRecord(spec=spec, backend_key=backend_name) + + # Build a graph from the adjacency list formed by the spec dependencies + for job in job_graph.values(): + job.remaining_deps = len(job.spec.dependencies) + for dep in job.spec.dependencies: + if dep not in job_graph: + err = f"Unknown job dependency '{dep}' for job {job.spec.id}" + raise ValueError(err) + job_graph[dep].dependents.append(job.spec.id) + + # Validate that there are no cycles in the given graph. + Scheduler.validate_acyclic(job_graph) + + return job_graph + + @staticmethod + def validate_acyclic(job_graph: Mapping[str, JobRecord]) -> None: + """Validate that the given job digraph is acyclic via Kahn's Algorithm.""" + indegree = {job: record.remaining_deps for job, record in job_graph.items()} + job_queue = [job for job, degree in indegree.items() if degree == 0] + num_visited = 0 + + while job_queue: + job = job_queue.pop() + num_visited += 1 + for dep in job_graph[job].dependents: + indegree[dep] -= 1 + if indegree[dep] == 0: + job_queue.append(dep) + + if num_visited != len(job_graph): + raise ValueError("The given JobSpec graph contains a dependency cycle.") + + def _notify_run_started(self) -> None: + """Notify any observers that the scheduler run has started.""" + for cb in self._on_run_start: + cb() + + def _notify_run_finished(self) -> None: + """Notify any observers that the scheduler run has finished.""" + for cb in self._on_run_end: + cb() + + def _notify_kill_signal(self) -> None: + """Notify any observers that the scheduler received a kill signal.""" + for cb in self._on_kill_signal: + cb() + + def _change_job_status( + self, job: JobRecord, new_status: JobStatus, info: JobStatusInfo | None = None + ) -> JobStatus: + """Change a job's runtime status, storing an optionally associated reason. + + Notifies any status change observers of the change, and returns the previous status. """ - # Notify instrumentation that the scheduler started - self._instrumentation.on_scheduler_start() - timer = Timer() - - # On SIGTERM or SIGINT, tell the runner to quit. - # On a second SIGINT specifically, die. - sel = selectors.DefaultSelector() - signal_rfd, signal_wfd = os.pipe() - sel.register(signal_rfd, selectors.EVENT_READ) - last_received_signal: int | None = None - - def on_signal(signal_received: int, _: FrameType | None) -> None: - # To allow async-safe-signal logic where signals can be handled - # while sleeping, we use a selector to perform a blocking wait, - # and signal the event through a pipe. We then set a flag with - # the received signal. Like this, we can receive a signal - # at any point, and it can also interrupt the poll wait to take - # immediate effect. - nonlocal last_received_signal - last_received_signal = signal_received - os.write(signal_wfd, b"\x00") - - # Install the SIGINT and SIGTERM handlers before scheduling jobs. - old_handler = signal(SIGINT, on_signal) - signal(SIGTERM, on_signal) - - # Enqueue all items of the first target. - self._enqueue_successors(None) - - try: - while True: - if last_received_signal is not None: - self._handle_exit_signal(last_received_signal, old_handler) - last_received_signal = None - - hms = timer.hms() - changed = self._poll(hms) or timer.check_time() - self._dispatch(hms) - if changed and self._check_if_done(hms): - break - - # Wait between each poll, except we may be woken by a signal. - sel.select(timeout=self._launcher_cls.poll_freq) - - finally: - signal(SIGINT, old_handler) - - # Stop the status printer, but don't exit/close it yet, for reporting purposes. - # That will be done by the CLI upon exiting - self._status_printer.stop() - - # Finish instrumentation and generate the instrumentation report - self._instrumentation.on_scheduler_end() - self._instrumentation.stop() - instrumentation.flush() - - # We got to the end without anything exploding. Return the results. - results = [] - for name, status in self.job_status.items(): - launcher = self._launchers[name] - job_spec = self._jobs[name] - - fail_msg = None - if launcher.fail_msg is not None: - launcher_fail = launcher.fail_msg - lines = None if launcher_fail.line_number is None else [launcher_fail.line_number] - fail_msg = JobStatusInfo( - message=launcher_fail.message, lines=lines, context=launcher_fail.context - ) - - results.append( - CompletedJobStatus( - name=job_spec.name, - job_type=job_spec.job_type, - seed=job_spec.seed, - block=job_spec.block, - tool=job_spec.tool, - workspace_cfg=job_spec.workspace_cfg, - full_name=name, - qual_name=job_spec.qual_name, - target=job_spec.target, - log_path=job_spec.log_path, - job_runtime=launcher.job_runtime.with_unit("s").get()[0], - simulated_time=launcher.simulated_time.with_unit("us").get()[0], - status=status, - fail_msg=fail_msg, - ) + old_status = job.status + if old_status == new_status: + return old_status + + job.status = new_status + job.status_info = info + + if new_status != JobStatus.RUNNING: + log.log( + log.ERROR if new_status in (JobStatus.FAILED, JobStatus.KILLED) else log.VERBOSE, + "Status change to [%s: %s] for %s", + new_status.shorthand, + new_status.name.capitalize(), + job.spec.full_name, ) - return results - - def add_to_scheduled(self, jobs: Mapping[str, JobSpec]) -> None: - """Add jobs to the schedule. - - Args: - jobs: the jobs to add to the schedule. - - """ - for full_name, job_spec in jobs.items(): - target_dict = self._scheduled.setdefault(job_spec.target, {}) - cfg_list = target_dict.setdefault(job_spec.block.name, []) - - if job_spec not in cfg_list: - cfg_list.append(full_name) - - def _unschedule_item(self, job_name: str) -> None: - """Remove deploy item from the schedule.""" - job = self._jobs[job_name] - target_dict = self._scheduled[job.target] - cfg_list = target_dict.get(job.block.name) - - if cfg_list is not None: - with contextlib.suppress(ValueError): - cfg_list.remove(job_name) - - # When all items in _scheduled[target][cfg] are finally removed, - # the cfg key is deleted. - if not cfg_list: - del target_dict[job.block.name] - - def _enqueue_successors(self, job_name: str | None = None) -> None: - """Move an item's successors from _scheduled to _queued. - - 'item' is the recently run job that has completed. If None, then we - move all available items in all available cfgs in _scheduled's first - target. If 'item' is specified, then we find its successors and move - them to _queued. - """ - for next_job_name in self._get_successors(job_name): - target = self._jobs[next_job_name].target - if next_job_name in self.job_status or next_job_name in self._queued[target]: - msg = f"Job {next_job_name} already scheduled" - raise RuntimeError(msg) - - self.job_status[next_job_name] = JobStatus.QUEUED - self._queued[target].append(next_job_name) - self._unschedule_item(next_job_name) - - def _cancel_successors(self, job_name: str) -> None: - """Cancel an item's successors. - - Recursively move them from _scheduled or _queued to _killed. - - Args: - job_name: job whose successors are to be canceled. - - """ - items = list(self._get_successors(job_name)) - while items: - next_item = items.pop() - self._cancel_item(next_item, cancel_successors=False) - items.extend(self._get_successors(next_item)) - - def _get_successor_target(self, job_name: str) -> str | None: - """Find the first target in the scheduled list that follows the target of a given job. + for cb in self._on_job_status_change: + cb(job.spec, old_status, new_status) - Args: - job_name: name of the job (to find the successor target of). - - Returns: - the successor, or None if no such target exists or there is no successor. + return old_status - """ - job: JobSpec = self._jobs[job_name] + def _mark_job_ready(self, job: JobRecord) -> None: + """Mark a given job in the scheduler as ready to execute (all dependencies completed).""" + if job.status != JobStatus.SCHEDULED: + msg = f"_mark_job_ready only applies to 'SCHEDULED' jobs (not '{job.status.name}')." + raise RuntimeError(msg) - if job.target not in self._scheduled: - msg = f"Scheduler does not contain target {job.target}" - raise KeyError(msg) + self._change_job_status(job, JobStatus.QUEUED) + # heapq is a min heap, so push (-priority) instead of (priority). + priority = self._priority_fn(job) + priority = priority if isinstance(priority, Sequence) else (priority,) + neg_priority: Priority = tuple(-x for x in priority) + heapq.heappush(self._ready_heap, (neg_priority, job.spec.id)) - # Find the first target that follows the target in the scheduled list. - target_iter = dropwhile(lambda x: x != job.target, self._scheduled) - next(target_iter, None) - return next(target_iter, None) + def _mark_job_running(self, job: JobRecord) -> None: + """Mark a given job in the scheduler as running. Assumes already removed from the heap.""" + if job.spec.id in self._running: + raise RuntimeError("_mark_job_running called on a job that was already running.") - def _get_successors(self, job_name: str | None = None) -> Sequence[str]: - """Find immediate successors of an item. + self._change_job_status(job, JobStatus.RUNNING) + self._running.add(job.spec.id) + self._running_per_backend[job.backend_key] += 1 - We choose the target that follows the item's current target and find - the list of successors whose dependency list contains "job_name". If - "job_name" is None, we pick successors from all cfgs, else we pick - successors only from the cfg to which the item belongs. + def _mark_job_completed( + self, job: JobRecord, status: JobStatus, reason: JobStatusInfo | None + ) -> None: + """Mark a given job in the scheduler as completed, having reached some terminal state.""" + if not status.is_terminal: + err = f"_mark_job_completed called with non-terminal status '{status.name}'" + raise RuntimeError(err) + if job.status.is_terminal: + return - Args: - job_name: name of the job + # If the scheduler requested to kill the job, override the failure reason. + if job.kill_requested: + reason = ( + KILLED_RUNNING_SIGINT if self._shutdown_signal == SIGINT else KILLED_RUNNING_SIGTERM + ) + self._change_job_status(job, status, reason) - Returns: - list of the jobs successors, or an empty list if there are none. + # If the job was running, mark it as no longer running. + if job.spec.id in self._running: + self._running.remove(job.spec.id) + self._running_per_backend[job.backend_key] -= 1 - """ - if job_name is None: - target = next(iter(self._scheduled), None) - cfgs = set() if target is None else set(self._scheduled[target]) - else: - target = self._get_successor_target(job_name) - job: JobSpec = self._jobs[job_name] - cfgs = {job.block.name} - - if target is None: - return () - - # Find item's successors that can be enqueued. We assume here that - # only the immediately succeeding target can be enqueued at this - # time. - successors = [] - for cfg in cfgs: - for next_item in self._scheduled[target][cfg]: - if job_name is not None: - job = self._jobs[next_item] - if not job.dependencies: - raise RuntimeError( - "Job item exists but the next item's dependency list is empty?" - ) - if job_name not in job.dependencies: - continue - - if self._ok_to_enqueue(next_item): - successors.append(next_item) - - return successors - - def _ok_to_enqueue(self, job_name: str) -> bool: - """Check if all dependencies jobs are completed. + # Update dependents (jobs that depend on this job), propagating failures if needed. + self._update_completed_job_deps(job) - Args: - job_name: name of job. + def _update_completed_job_deps(self, job: JobRecord) -> None: + """Update the dependencies of a completed job, scheduling/killing deps where necessary.""" + for dep_id in job.dependents: + dep = self._jobs[dep_id] - Returns: - true if ALL dependencies of item are complete. + # Update dependency tracking counts in the dependency records + dep.remaining_deps -= 1 + if job.status == JobStatus.PASSED: + dep.passing_deps += 1 - """ - for dep in self._jobs[job_name].dependencies: - # Ignore dependencies that were not scheduled to run. - if dep not in self._jobs: + # Propagate kill signals on shutdown + if self._shutdown_signal is not None: + self._mark_job_completed(dep, JobStatus.KILLED, KILLED_SCHEDULED) continue - # Has the dep even been enqueued? - if dep not in self.job_status: - return False - - # Has the dep completed? - if not self.job_status[dep].is_terminal: - return False - - return True + # Handle dependency management and marking dependents as ready + if dep.remaining_deps == 0 and dep.status == JobStatus.SCHEDULED: + if dep.spec.needs_all_dependencies_passing: + if dep.passing_deps == len(dep.spec.dependencies): + self._mark_job_ready(dep) + else: + self._mark_job_completed(dep, JobStatus.KILLED, FAILED_DEP) + elif dep.passing_deps > 0: + self._mark_job_ready(dep) + else: + self._mark_job_completed(dep, JobStatus.KILLED, ALL_FAILED_DEP) - def _ok_to_run(self, job_name: str) -> bool: - """Check if a job is ready to start. + async def run(self) -> list[CompletedJobStatus]: + """Run all scheduled jobs to completion (unless terminated) and return the results.""" + self._install_signal_handlers() - The item's needs_all_dependencies_passing setting is used to figure - out whether we can run this item or not, based on its dependent jobs' - statuses. + for backend in self._backends.values(): + backend.attach_completion_callback(self._submit_job_completion) - Args: - job_name: name of the job to check + self._notify_run_started() - Returns: - true if the required dependencies have passed. + # Before entering the main loop, mark jobs with 0 remaining deps as ready to run. + for job in self._jobs.values(): + if job.remaining_deps == 0: + self._mark_job_ready(job) + try: + await self._main_loop() + finally: + self._notify_run_finished() + + return [ + CompletedJobStatus( + name=job.spec.name, + job_type=job.spec.job_type, + seed=job.spec.seed, + block=job.spec.block, + tool=job.spec.tool, + workspace_cfg=job.spec.workspace_cfg, + full_name=job.spec.full_name, + qual_name=job.spec.qual_name, + target=job.spec.target, + log_path=job.spec.log_path, + job_runtime=job.handle.job_runtime.with_unit("s").get()[0] + if job.handle is not None + else 0.0, + simulated_time=job.handle.simulated_time.with_unit("us").get()[0] + if job.handle is not None + else 0.0, + status=job.status, + fail_msg=job.status_info, + ) + for job in self._jobs.values() + ] + + def _install_signal_handlers(self) -> None: + """Install the SIGINT/SIGTERM signal handlers to trigger graceful shutdowns.""" + self._shutdown_signal = None + self._shutdown_event = asyncio.Event() + self._original_sigint_handler = getsignal(SIGINT) + self._shutdown_started = False + loop = asyncio.get_running_loop() + + def _handler(signum: int, _frame: FrameType | None) -> None: + if self._shutdown_signal is None and self._shutdown_event: + self._shutdown_signal = signum + loop.call_soon_threadsafe(self._shutdown_event.set) + + # Restore the original SIGINT handler so a second Ctrl-C terminates immediately + if signum == SIGINT: + signal(SIGINT, self._original_sigint_handler) + + loop.add_signal_handler(SIGINT, lambda: _handler(SIGINT, None)) + loop.add_signal_handler(SIGTERM, lambda: _handler(SIGTERM, None)) + + async def _submit_job_completion(self, events: Iterable[JobCompletionEvent]) -> None: + """Notify the scheduler that a batch of jobs have been completed.""" + try: + self._event_queue.put_nowait(events) + except asyncio.QueueShutDown as e: + msg = "Scheduler event queue shutdown earlier than expected?" + raise RuntimeError(msg) from e + except asyncio.QueueFull: + log.critical("Scheduler event queue full despite being infinitely sized?") + + async def _main_loop(self) -> None: + """Run the main scheduler loop. + + Tries to schedule any ready jobs if there is available capacity, and then waits for any job + completions (or a shutdown signal). This continues in a loop until all jobs have been either + executed or killed (e.g. via a shutdown signal). """ - job: JobSpec = self._jobs[job_name] - # 'item' can run only if its dependencies have passed (their results - # should already show up in the item to status map). - for dep_name in job.dependencies: - # Ignore dependencies that were not scheduled to run. - if dep_name not in self._jobs: - continue - - dep_status = self.job_status[dep_name] - if not dep_status.is_terminal: - msg = f"Expected dependent job {dep_name} to be ended, not {dep_status.name}." - raise ValueError(msg) + if self._shutdown_event is None: + raise RuntimeError("Expected signal handlers to be installed before running main loop") - if job.needs_all_dependencies_passing: - if dep_status != JobStatus.PASSED: - return False - elif dep_status == JobStatus.PASSED: - return True - - return job.needs_all_dependencies_passing - - def _poll(self, hms: str) -> bool: - """Check for running items that have finished. - - Returns: - True if something changed. + job_completion_task = asyncio.create_task(self._event_queue.get()) + shutdown_task = asyncio.create_task(self._shutdown_event.wait()) - """ - max_poll = min( - self._launcher_cls.max_poll, - total_sub_items(self._running), - ) + try: + while True: + await self._schedule_ready_jobs() - # If there are no jobs running, we are likely done (possibly because - # of a SIGINT). Since poll() was called anyway, signal that something - # has indeed changed. - if not max_poll: - return True - - changed = False - while max_poll: - target, self._last_target_polled_idx = get_next_item( - self._targets, - self._last_target_polled_idx, - ) + if not self._running: + if not self._ready_heap: + break + # This case (nothing running, but jobs still pending in the queue) can happen + # if backends fail to schedule any jobs (e.g. the backend is temporarily busy). + continue - while self._running[target] and max_poll: - max_poll -= 1 - job_name, self._last_item_polled_idx[target] = get_next_item( - self._running[target], - self._last_item_polled_idx[target], - ) + # Wait for any job to complete, or for a shutdown signal try: - status = self._launchers[job_name].poll() - except LauncherError as e: - log.error("Error when dispatching target: %s", str(e)) - status = JobStatus.KILLED - level = log.VERBOSE - - if status == JobStatus.RUNNING: + done, _ = await asyncio.wait( + (job_completion_task, shutdown_task), + return_when=asyncio.FIRST_COMPLETED, + ) + except asyncio.QueueShutDown as e: + msg = "Scheduler event queue shutdown earlier than expected?" + raise RuntimeError(msg) from e + + if shutdown_task in done: + self._shutdown_event.clear() + shutdown_task = asyncio.create_task(self._shutdown_event.wait()) + await self._handle_exit_signal() continue - if status == JobStatus.PASSED: - self._passed[target].add(job_name) - self._instrumentation.on_job_status_change(self._jobs[job_name], status) + completions = await self._drain_completions(job_completion_task) + job_completion_task = asyncio.create_task(self._event_queue.get()) - elif status == JobStatus.FAILED: - self._failed[target].add(job_name) - self._instrumentation.on_job_status_change(self._jobs[job_name], status) - level = log.ERROR + for event in completions: + job = self._jobs[event.spec.id] + self._mark_job_completed(job, event.status, event.reason) + finally: + job_completion_task.cancel() + shutdown_task.cancel() - else: - # Killed, still Queued, or some error when dispatching. - self._killed[target].add(job_name) - self._instrumentation.on_job_status_change(self._jobs[job_name], status.KILLED) - level = log.ERROR - - self._running[target].pop(self._last_item_polled_idx[target]) - self._last_item_polled_idx[target] -= 1 - self.job_status[job_name] = status - - log.log( - level, - "[%s]: [%s]: [status] [%s: %s]", - hms, - target, - job_name, - status.shorthand, - ) - - # Enqueue item's successors regardless of its status. - # - # It may be possible that a failed item's successor may not - # need all of its dependents to pass (if it has other dependent - # jobs). Hence we enqueue all successors rather than canceling - # them right here. We leave it to _dispatch() to figure out - # whether an enqueued item can be run or not. - self._enqueue_successors(job_name) - changed = True - - return changed - - def _dispatch_job(self, hms: str, target: str, job_name: str) -> None: - """Dispatch the named queued job. + async def _drain_completions(self, completion_task: asyncio.Task) -> list[JobCompletionEvent]: + """Drain batched completions from the queue, optionally coalescing batched events.""" + events = list(completion_task.result()) - Args: - hms: time as a string formatted in hh:mm:ss - target: the target to dispatch this job to - job_name: the name of the job to dispatch + # Coalesce nearby completions by waiting for a very short time + if self._coalesce_window is not None: + await asyncio.sleep(self._coalesce_window) - """ + # Drain any more completion events from the event queue try: - self._launchers[job_name].launch() - - except LauncherError: - log.exception("Error launching %s", job_name) - self._kill_item(job_name) - - except LauncherBusyError: - log.exception("Launcher busy") - - self._queued[target].append(job_name) - - log.verbose( - "[%s]: [%s]: [requeued]: %s", - hms, - target, - job_name, - ) + while True: + events.extend(self._event_queue.get_nowait()) + except asyncio.QueueEmpty: + return events + except asyncio.QueueShutDown as e: + msg = "Scheduler event queue shutdown earlier than expected?" + raise RuntimeError(msg) from e + + async def _handle_exit_signal(self) -> None: + """Attempt to gracefully shutdown as a result of a triggered exit signal.""" + if self._shutdown_started: return + self._shutdown_started = True - self._running[target].append(job_name) - self.job_status[job_name] = JobStatus.RUNNING - self._instrumentation.on_job_status_change(self._jobs[job_name], JobStatus.RUNNING) - - def _dispatch(self, hms: str) -> None: - """Dispatch some queued items if possible. - - Args: - hms: time as a string formatted in hh:mm:ss - - """ - slots = self._launcher_cls.max_parallel - total_sub_items(self._running) - if slots <= 0: - return + signal_name = "SIGTERM" if self._shutdown_signal == SIGTERM else "SIGINT" + log.info("Received %s signal. Exiting gracefully", signal_name) + if self._shutdown_signal == SIGINT: + log.info( + "Send another to force immediate quit (but you may need to manually " + "kill some child processes)." + ) - # Compute how many slots to allocate to each target based on their - # weights. - sum_weight = 0 - slots_filled = 0 - total_weight = sum( - self._jobs[self._queued[t][0]].weight for t in self._queued if self._queued[t] + self._notify_kill_signal() + + # Mark any jobs that are currently running as jobs we should kill. + # Collect jobs to kill in a dict, grouped per backend, for batched killing. + to_kill: dict[str, list[JobHandle]] = defaultdict(list) + + for job_id in self._running: + job = self._jobs[job_id] + if job.handle is None: + raise RuntimeError("Running job is missing an associated handle.") + job.kill_requested = True + to_kill[job.backend_key].append(job.handle) + + # Asynchronously dispatch backend kill tasks whilst we update scheduler internals. + # Jobs that depend on these jobs will then be transitively killed before they start. + kill_tasks: list[asyncio.Task] = [] + for backend_name, handles in to_kill.items(): + backend = self._backends[backend_name] + kill_tasks.append(asyncio.create_task(backend.kill_many(handles))) + + # Kill any ready (but not running jobs), so that they don't get scheduled. + while self._ready_heap: + _, job_id = heapq.heappop(self._ready_heap) + job = self._jobs[job_id] + self._mark_job_completed(job, JobStatus.KILLED, KILLED_QUEUED) + + if kill_tasks: + await asyncio.gather(*kill_tasks, return_exceptions=True) + + async def _schedule_ready_jobs(self) -> None: + """Attempt to schedule ready jobs whilst respecting scheduler & backend parallelism.""" + # Find out how many jobs we can dispatch according to the scheduler's parallelism limit + available_slots = ( + self._max_parallelism - len(self._running) + if self._max_parallelism + else len(self._ready_heap) ) + if available_slots <= 0: + return - for target in self._scheduled: - if not self._queued[target]: - continue + # Collect jobs to launch in a dict, grouped per backend, for batched launching. + to_launch: dict[str, list[tuple[Priority, JobRecord]]] = defaultdict(list) + blocked: list[tuple[Priority, str]] = [] + slots_used = 0 + + while self._ready_heap and slots_used < available_slots: + neg_priority, job_id = heapq.heappop(self._ready_heap) + job = self._jobs[job_id] + backend = self._backends[job.backend_key] + running_on_backend = self._running_per_backend[job.backend_key] + len( + to_launch[job.backend_key] + ) - # N slots are allocated to M targets each with W(m) weights with - # the formula: - # - # N(m) = N * W(m) / T, where, - # T is the sum total of all weights. - # - # This is however, problematic due to fractions. Even after - # rounding off to the nearest digit, slots may not be fully - # utilized (one extra left). An alternate approach that avoids this - # problem is as follows: - # - # N(m) = (N * S(W(m)) / T) - F(m), where, - # S(W(m)) is the running sum of weights upto current target m. - # F(m) is the running total of slots filled. - # - # The computed slots per target is nearly identical to the first - # solution, except that it prioritizes the slot allocation to - # targets that are earlier in the list such that in the end, all - # slots are fully consumed. - sum_weight += self._jobs[self._queued[target][0]].weight - target_slots = round((slots * sum_weight) / total_weight) - slots_filled - if target_slots <= 0: + # Check that we can launch the job whilst respecting backend parallelism limits + if backend.max_parallelism and running_on_backend >= backend.max_parallelism: + blocked.append((neg_priority, job_id)) continue - slots_filled += target_slots - - to_dispatch = [] - while self._queued[target] and target_slots > 0: - next_item = self._queued[target].pop(0) - if not self._ok_to_run(next_item): - self._cancel_item(next_item, cancel_successors=False) - self._enqueue_successors(next_item) - continue - to_dispatch.append(next_item) - target_slots -= 1 + to_launch[job.backend_key].append((neg_priority, job)) + slots_used += 1 - if not to_dispatch: - continue + # Requeue any blocked jobs. + for entry in blocked: + heapq.heappush(self._ready_heap, entry) + # Launch the selected jobs in batches per backend + launch_tasks = [] + for backend_name, jobs in to_launch.items(): + backend = self._backends[backend_name] + job_specs = [job.spec for _, job in jobs] log.verbose( - "[%s]: [%s]: [dispatch]:\n%s", - hms, - target, - ", ".join(job_name for job_name in to_dispatch), - ) - - for job_name in to_dispatch: - self._dispatch_job(hms, target, job_name) - - def _kill(self) -> None: - """Kill any running items and cancel any that are waiting.""" - # Cancel any waiting items. We take a copy of self._queued to avoid - # iterating over the set as we modify it. - for target in self._queued: - for item in list(self._queued[target]): - self._cancel_item(item) - - # Kill any running items. Again, take a copy of the set to avoid - # modifying it while iterating over it. - for target in self._running: - for item in list(self._running[target]): - self._kill_item(item) - - def _check_if_done(self, hms: str) -> bool: - """Check if we are done executing all jobs. - - Also, prints the status of currently running jobs. - """ - done = True - for target in self._scheduled: - done_cnt = sum( - [ - len(self._passed[target]), - len(self._failed[target]), - len(self._killed[target]), - ], + "[%s]: Dispatching jobs: %s", + backend_name, + ", ".join(job.full_name for job in job_specs), ) - done = done and (done_cnt == self._total[target]) - - # Skip if a target has not even begun executing. - if not (self._queued[target] or self._running[target] or done_cnt > 0): - continue + launch_tasks.append(backend.submit_many(job_specs)) - perc = done_cnt / self._total[target] * 100 + results = await asyncio.gather(*launch_tasks) - running = ", ".join( - [f"{job_name}" for job_name in self._running[target]], - ) - msg = self._msg_fmt.format( - len(self._queued[target]), - len(self._running[target]), - len(self._passed[target]), - len(self._failed[target]), - len(self._killed[target]), - self._total[target], - ) - self._status_printer.update_target( - target=target, - msg=msg, - hms=hms, - perc=perc, - running=running, - ) - return done - - def _cancel_item(self, job_name: str, *, cancel_successors: bool = True) -> None: - """Cancel an item and optionally all of its successors. - - Supplied item may be in _scheduled list or the _queued list. From - either, we move it straight to _killed. - - Args: - job_name: name of the job to cancel - cancel_successors: if set then cancel successors as well (True). - - """ - job = self._jobs[job_name] - self.job_status[job_name] = JobStatus.KILLED - self._killed[job.target].add(job_name) - self._instrumentation.on_job_status_change(job, JobStatus.KILLED) - if job_name in self._queued[job.target]: - self._queued[job.target].remove(job_name) - else: - self._unschedule_item(job_name) - - if cancel_successors: - self._cancel_successors(job_name) - - def _kill_item(self, job_name: str) -> None: - """Kill a running item and cancel all of its successors. - - Args: - job_name: name of the job to kill + # Mark jobs running, and requeue any jobs that failed to launch + for jobs, handles in zip(to_launch.values(), results, strict=True): + for neg_priority, job in jobs: + handle = handles.get(job.spec.id) + if handle is None: + log.verbose("[%s]: Requeuing job '%s'", job.spec.target, job.spec.full_name) + heapq.heappush(self._ready_heap, (neg_priority, job.spec.id)) + continue - """ - job = self._jobs[job_name] - self._launchers[job_name].kill() - self.job_status[job_name] = JobStatus.KILLED - self._killed[job.target].add(job_name) - self._instrumentation.on_job_status_change(job, JobStatus.KILLED) - self._running[job.target].remove(job_name) - self._cancel_successors(job_name) + job.handle = handle + self._mark_job_running(job) diff --git a/src/dvsim/scheduler/status_printer.py b/src/dvsim/scheduler/status_printer.py index b20a707f..d3dc19b9 100644 --- a/src/dvsim/scheduler/status_printer.py +++ b/src/dvsim/scheduler/status_printer.py @@ -4,229 +4,336 @@ """Job status printing during a scheduled run.""" +import asyncio import os import shutil import sys import termios +import time +from abc import ABC, abstractmethod +from collections import defaultdict +from collections.abc import Sequence from typing import ClassVar import enlighten +from dvsim.job.data import JobSpec +from dvsim.job.status import JobStatus from dvsim.logging import log +from dvsim.utils import TS_HMS_FORMAT -DEFAULT_HEADER = "Q: queued, R: running, P: passed, F: failed, K: killed, T: total" -"""The default header to use for printing the status.""" +class StatusPrinter(ABC): + """Status Printer abstract base class. -class StatusPrinter: - """Dummy Status Printer class for interactive mode. - - When interactive mode is set, dvsim does not print the status. By - instantiating this dummy class (printing nothing), outer interface stays - same. + Contains core functionality related to status printing - a print interval can be configured + to control how often the scheduler target statuses are printed, which is managed by an async + thread. Optionally, the print interval can be configured to 0 to run in an update-driven mode + where every single status update is printed. Regardless of the configured print interval, the + final job update for each target is printed immediately to reflect final target end timings. """ - def __init__(self) -> None: - """Initialise.""" - - def print_header(self) -> None: - """Initialize / print the header bar. - - The header bar contains an introductory message such as the legend of - what Q, D, ... mean. - """ - - def init_target(self, target: str, msg: str) -> None: - """Initialize the status bar for each target.""" - - def update_target( - self, - target: str, - hms: str, - msg: str, - perc: float, - running: str, - ) -> None: - """Periodically update the status bar for each target. - - Args: - hms: Elapsed time in hh:mm:ss. - target: The tool flow step. - msg: The completion status message (set externally). - perc: Percentage of completion. - running: What jobs are currently still running. + # How often we print by default. Zero means we should print on every event change. + print_interval = 0 + + def __init__(self, jobs: Sequence[JobSpec], print_interval: int | None = None) -> None: + """Construct the base StatusPrinter.""" + # Mapping from target -> (Mapping from status -> count) + self._target_counts: dict[str, dict[JobStatus, int]] = defaultdict(lambda: defaultdict(int)) + # Mapping from target -> number of jobs + self._totals: dict[str, int] = defaultdict(int) + + for job in jobs: + self._target_counts[job.target][JobStatus.SCHEDULED] += 1 + self._totals[job.target] += 1 + + # The number of characters used to represent the largest field in the displayed table + self._field_width = max((len(str(total)) for total in self._totals.values()), default=0) + + # State tracking for the StatusPrinter + self._start_time: float = 0.0 + self._last_print: float = 0.0 + self._running: dict[str, list[str]] = defaultdict(list) + self._num_finished: dict[str, int] = defaultdict(int) + self._finish_time: dict[str, float] = {} + + # Async target status update handling + self._task: asyncio.Task | None = None + self._paused: bool = False + + self._interval = print_interval if print_interval is not None else self.print_interval + + @property + def updates_every_event(self) -> bool: + """If the configured print interval is 0, statuses are updated on every state change.""" + return self._interval <= 0 + + def start(self) -> None: + """Start printing the status of the scheduled jobs.""" + self._start_time = time.monotonic() + self._print_header() + for target in self._target_counts: + self._init_target(target, self._get_target_row(target)) + + # If we need an async task to manage the print interval, create one + if not self.updates_every_event: + self._task = asyncio.create_task(self._run()) + + async def _run(self) -> None: + """Run a timer in an async loop, printing the updated status at every interval.""" + next_tick = self._start_time + self._interval + self.update_all_targets(including_unstarted=True) + + while True: + now = time.monotonic() + sleep_time = max(0, next_tick - now) + await asyncio.sleep(sleep_time) + self.update_all_targets() + next_tick += self._interval + + def update_all_targets(self, *, including_unstarted: bool = False) -> None: + """Update the status bars of all targets.""" + if self._paused: + return + update_time = time.monotonic() + for target in self._target_counts: + # Only update targets that have started (some job status has changed) + if self.target_is_started(target) or including_unstarted: + target_update_time = self._finish_time.get(target, update_time) + self._update_target(target_update_time, target) + + def target_is_started(self, target: str) -> bool: + """Check whether a target has been started yet or not.""" + return bool(self._num_finished[target]) or bool(self._running[target]) + + def target_is_done(self, target: str) -> bool: + """Check whether a target is finished or not.""" + return self._num_finished[target] >= self._totals[target] + + def update_status(self, job: JobSpec, old_status: JobStatus, new_status: JobStatus) -> None: + """Update the status printer to reflect a change in job status.""" + status_counts = self._target_counts[job.target] + status_counts[old_status] -= 1 + if old_status == JobStatus.RUNNING: + self._running[job.target].remove(job.full_name) + status_counts[new_status] += 1 + if new_status == JobStatus.RUNNING: + self._running[job.target].append(job.full_name) + if not old_status.is_terminal and new_status.is_terminal: + self._num_finished[job.target] += 1 + + if self.target_is_done(job.target) and not self.updates_every_event: + # Even if we have a configured print interval, we should record + # the time at which the target finished to capture accurate end timing. + self._finish_time[job.target] = time.monotonic() + elif self.updates_every_event: + self.update_all_targets() + + def _get_header(self) -> str: + """Get the header string to use for printing the status.""" + return ( + ", ".join( + f"{status.shorthand}: {status.name.lower().rjust(self._field_width)}" + for status in JobStatus + ) + + ", T: total" + ) - """ + def _get_target_row(self, target: str) -> str: + """Get a formatted string with the fields for a given target row.""" + fields = [] + for status in JobStatus: + count = self._target_counts[target][status] + value = f"{count:0{self._field_width}d}" + fields.append(f"{status.shorthand}: {value.rjust(len(status.name))}") + total = f"{self._totals[target]:0{self._field_width}d}" + fields.append(f"T: {total.rjust(5)}") + return ", ".join(fields) + + @abstractmethod + def _print_header(self) -> None: + """Initialize / print the header, displaying the legend of job status meanings.""" + + @abstractmethod + def _init_target(self, target: str, _msg: str) -> None: + """Initialize the status bar for a target.""" + + @abstractmethod + def _update_target(self, current_time: float, target: str) -> None: + """Update the status bar for a given target.""" + + def pause(self) -> None: + """Toggle whether the status printer is paused. May make target finish times inaccurate.""" + self._paused = not self._paused + if not self._paused and self.updates_every_event: + self.update_all_targets() def stop(self) -> None: """Stop the status header/target printing (but keep the printer context).""" + if self._task: + self._task.cancel() + if self._paused: + self._paused = False + self.update_all_targets(including_unstarted=True) - def exit(self) -> None: + def exit(self) -> None: # noqa: B027 """Do cleanup activities before exiting.""" class TtyStatusPrinter(StatusPrinter): - """Abstraction for printing the current target status onto the console. - - Targets are ASIC tool flow steps such as build, run, cov etc. These steps - are sequenced by the Scheduler. There may be multiple jobs running in - parallel in each target. This class provides a mechanism to periodically - print the completion status of each target onto the terminal. Messages - printed by this class are rather static in nature - all the necessary - computations of how the jobs are progressing need to be handled externally. + """Prints the current scheduler target status onto the console / TTY via logging.""" - The following are the 'fields' accepted by this class: - """ - - # Print elapsed time in bold. hms_fmt = "\x1b[1m{hms:9s}\x1b[0m" header_fmt = hms_fmt + " [{target:^13s}]: [{msg}]" - status_fmt = header_fmt + " {perc:3.0f}% {running}" - - def __init__(self) -> None: - """Initialise printer.""" - super().__init__() + status_fmt = header_fmt + " {percent:3.0f}% {running}" - # Once a target is complete, we no longer need to update it - we can - # just skip it. Maintaining this here provides a way to print the status - # one last time when it reaches 100%. It is much easier to do that here - # than in the Scheduler class. - self.target_done = {} + def __init__(self, jobs: Sequence[JobSpec]) -> None: + """Initialise the TtyStatusPrinter.""" + super().__init__(jobs) - def print_header(self) -> None: - """Initialize / print the header bar. + # Maintain a mapping of completed targets, so we only print the status one last + # time when it reaches 100% for a target. + self._target_done: dict[str, bool] = {} - The header bar contains an introductory message such as the legend of - what Q, D, ... mean. - """ - log.info(self.header_fmt.format(hms="", target="legend", msg=DEFAULT_HEADER)) + def _print_header(self) -> None: + """Initialize / print the header, displaying the legend of job status meanings.""" + log.info(self.header_fmt.format(hms="", target="legend", msg=self._get_header())) - def init_target(self, target: str, msg: str) -> None: - """Initialize the status bar for each target.""" - self.target_done[target] = False + def _init_target(self, target: str, _msg: str) -> None: + """Initialize the status bar for a target.""" + self._target_done[target] = False def _trunc_running(self, running: str, width: int = 30) -> str: - """Truncate the list of running items to a specified width.""" + """Truncate the list of running items to a specified length.""" if len(running) <= width: return running return running[: width - 3] + "..." - def update_target( - self, - target: str, - hms: str, - msg: str, - perc: float, - running: str, - ) -> None: - """Periodically update the status bar for each target. - - Args: - hms: Elapsed time in hh:mm:ss. - target: The tool flow step. - msg: The completion status message (set externally). - perc: Percentage of completion. - running: What jobs are currently still running. - - """ - if self.target_done[target]: + def _update_target(self, current_time: float, target: str) -> None: + """Update the status bar for a given target.""" + if self._target_done[target]: return + if self.target_is_done(target): + self._target_done[target] = True + + status_counts = self._target_counts[target] + done_count = sum(status_counts[status] for status in JobStatus if status.is_terminal) + percent = (done_count / self._totals[target] * 100) if self._totals[target] else 100 + elapsed_time = time.gmtime(current_time - self._start_time) log.info( self.status_fmt.format( - hms=hms, + hms=time.strftime(TS_HMS_FORMAT, elapsed_time), target=target, - msg=msg, - perc=perc, - running=self._trunc_running(running), + msg=self._get_target_row(target), + percent=percent, + running=self._trunc_running(", ".join(self._running[target])), ), ) - if perc == 100: - self.target_done[target] = True class EnlightenStatusPrinter(TtyStatusPrinter): - """Abstraction for printing status using Enlighten. + """Prints the current scheduler target status to the terminal using Enlighten. Enlighten is a third party progress bar tool. Documentation: https://python-enlighten.readthedocs.io/en/stable/ - Though it offers very fancy progress bar visualization, we stick to a - simple status bar 'pinned' to the bottom of the screen for each target - that displays statically, a pre-prepared message. We avoid the progress bar - visualization since it requires enlighten to perform some computations the - Scheduler already does. It also helps keep the overhead to a minimum. - - Enlighten does not work if the output of dvsim is redirected to a file, for + Enlighten does not work if the output of DVSim is redirected to a file, for example - it needs to be attached to a TTY enabled stream. """ + # Enlighten uses a min_delta of 0.1 by default, only updating every 0.1 seconds. + DEFAULT_MIN_DELTA = 0.1 + status_fmt_no_running = TtyStatusPrinter.status_fmt.removesuffix("{running}") status_fmt = "{status_msg}{running}" - def __init__(self) -> None: - super().__init__() - - # Initialize the status_bars for header and the targets . - self.manager = enlighten.get_manager() - self.status_header = None - self.status_target = {} + def __init__(self, jobs: Sequence[JobSpec]) -> None: + """Initialise the EnlightenStatusPrinter.""" + super().__init__(jobs) + if self._interval < self.DEFAULT_MIN_DELTA: + # TODO: maybe "debounce" the updates with a delayed async refresh task? + log.warning( + "Configured print interval %g will not accurately reflect for %s," + " which uses status bars with a configured min_delta of %g by default.", + self._interval, + self.__class__.__name__, + self.DEFAULT_MIN_DELTA, + ) + + # Initialize the enlighten manager and needed state + self._manager = enlighten.get_manager() + self._status_header: enlighten.StatusBar | None = None + self._status_bars: dict[str, enlighten.StatusBar] = {} self._stopped = False - def print_header(self) -> None: - self.status_header = self.manager.status_bar( + def _print_header(self) -> None: + """Initialize / print the header, displaying the legend of job status meanings.""" + self._status_header = self._manager.status_bar( status_format=self.header_fmt, hms="", target="legend", - msg=DEFAULT_HEADER, + msg=self._get_header(), ) - def init_target(self, target, msg) -> None: - super().init_target(target, msg) - status_msg = self.status_fmt_no_running.format(hms="", target=target, msg=msg, perc=0.0) - self.status_target[target] = self.manager.status_bar( + def _init_target(self, target: str, msg: str) -> None: + """Initialize the status bar for a target.""" + super()._init_target(target, msg) + hms = time.strftime(TS_HMS_FORMAT, time.gmtime(0)) + msg = self.status_fmt_no_running.format(hms=hms, target=target, msg=msg, percent=0.0) + self._status_bars[target] = self._manager.status_bar( status_format=self.status_fmt, - status_msg=status_msg, + status_msg=msg, running="", ) def _trunc_running_to_terminal(self, running: str, offset: int) -> str: """Truncate the list of running items to match the max terminal width.""" cols = shutil.get_terminal_size(fallback=(80, 24)).columns - width = max(30, cols - offset) + width = max(30, cols - offset - 1) return self._trunc_running(running, width) - def update_target(self, target, hms, msg, perc, running) -> None: - if self.target_done[target]: + def _update_target(self, current_time: float, target: str) -> None: + """Update the status bar for a given target.""" + if self._target_done[target]: return + status_counts = self._target_counts[target] + done_count = sum(status_counts[status] for status in JobStatus if status.is_terminal) + percent = (done_count / self._totals[target] * 100) if self._totals[target] else 100 + elapsed_time = time.gmtime(current_time - self._start_time) + status_msg = self.status_fmt_no_running.format( - hms=hms, + hms=time.strftime(TS_HMS_FORMAT, elapsed_time), target=target, - msg=msg, - perc=perc, + msg=self._get_target_row(target), + percent=percent, ) - offset = len(status_msg) - running = self._trunc_running_to_terminal(running, offset) + running = self._trunc_running_to_terminal(", ".join(self._running[target]), offset) - self.status_target[target].update(status_msg=status_msg, running=running) - if perc == 100: - self.target_done[target] = True + self._status_bars[target].update(status_msg=status_msg, running=running) + + if self.target_is_done(target): + self._target_done[target] = True + self._status_bars[target].refresh() def stop(self) -> None: """Stop the status header/target printing (but keep the printer context).""" - if self.status_header is not None: - self.status_header.close() - for target in self.status_target: - self.status_target[target].close() + super().stop() + if self._status_header is not None: + self._status_header.close() + for status_bar in self._status_bars.values(): + status_bar.close() self._stopped = True def exit(self) -> None: """Do cleanup activities before exiting (closing the manager context).""" + super().exit() if not self._stopped: self.stop() - self.manager.stop() + self._manager.stop() + # Sometimes, exiting via a signal (e.g. Ctrl-C) can cause Enlighten to leave the # terminal in some non-raw mode. Just in case, restore regular operation. self._restore_terminal() @@ -245,8 +352,7 @@ def _restore_terminal(self) -> None: # We make this change after all buffered output is transmitted (TCSADRAIN). try: attrs = termios.tcgetattr(fd) - control_flag = 3 - attrs[control_flag] |= termios.ECHO | termios.ICANON + attrs[3] |= termios.ECHO | termios.ICANON termios.tcsetattr(fd, termios.TCSADRAIN, attrs) except termios.error: log.debug("Unable to restore terminal attributes safely") @@ -271,22 +377,24 @@ def get(cls) -> StatusPrinter | None: return cls._instance -def get_status_printer(interactive: bool) -> StatusPrinter: - """Get the global status printer. +def create_status_printer(jobs: Sequence[JobSpec]) -> StatusPrinter: + """Create the global status printer. If stdout is a TTY, then return an instance of EnlightenStatusPrinter, else - return an instance of StatusPrinter. If the status printer has already been - created, then returns that instance, regardless of given arguments. + return an instance of StatusPrinter. """ status_printer = StatusPrinterSingleton.get() if status_printer is not None: return status_printer - if interactive: - status_printer = StatusPrinter() - elif sys.stdout.isatty(): - status_printer = EnlightenStatusPrinter() - else: - status_printer = TtyStatusPrinter() + status_printer = EnlightenStatusPrinter(jobs) if sys.stdout.isatty() else TtyStatusPrinter(jobs) StatusPrinterSingleton.set(status_printer) return status_printer + + +def get_status_printer() -> StatusPrinter: + """Retrieve the configured global status printer.""" + status_printer = StatusPrinterSingleton.get() + if status_printer is None: + raise RuntimeError("get_status_printer called without first creating the status printer") + return status_printer diff --git a/src/dvsim/utils/__init__.py b/src/dvsim/utils/__init__.py index 092c6104..b893ecb2 100644 --- a/src/dvsim/utils/__init__.py +++ b/src/dvsim/utils/__init__.py @@ -9,7 +9,6 @@ from dvsim.utils.hjson import parse_hjson from dvsim.utils.subprocess import run_cmd, run_cmd_with_timeout from dvsim.utils.time import TS_FORMAT, TS_FORMAT_LONG, TS_HMS_FORMAT -from dvsim.utils.timer import Timer from dvsim.utils.wildcards import ( find_and_substitute_wildcards, subst_wildcards, @@ -19,7 +18,6 @@ "TS_FORMAT", "TS_FORMAT_LONG", "TS_HMS_FORMAT", - "Timer", "check_bool", "check_int", "clean_odirs", diff --git a/src/dvsim/utils/timer.py b/src/dvsim/utils/timer.py deleted file mode 100644 index b3b05892..00000000 --- a/src/dvsim/utils/timer.py +++ /dev/null @@ -1,54 +0,0 @@ -# Copyright lowRISC contributors (OpenTitan project). -# Licensed under the Apache License, Version 2.0, see LICENSE for details. -# SPDX-License-Identifier: Apache-2.0 - -import time - -from dvsim.utils.time import TS_HMS_FORMAT - - -class Timer: - """A timer to keep track of how long jobs have been running. - - This has a notion of start time (the time when the object was constructed), - together with a time when the results should next be printed. - - """ - - print_interval = 5 - - def __init__(self) -> None: - self.start = time.monotonic() - self.next_print = self.start + Timer.print_interval - self.first_print = True - - def period(self): - """Return the float time in seconds since start.""" - return time.monotonic() - self.start - - def hms(self) -> str: - """Get the time since start in hh:mm:ss.""" - return time.strftime(TS_HMS_FORMAT, time.gmtime(self.period())) - - def check_time(self) -> bool: - """Return true if we have passed next_print. - - If so, increment next_print by print_interval unless the result would - be in the past, in which case set it to the current time plus - print_interval. - - """ - now = time.monotonic() - - if self.first_print: - self.first_print = False - return True - - if now < self.next_print: - return False - - self.next_print += Timer.print_interval - if self.next_print <= now: - self.next_print = now + Timer.print_interval - - return True diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index a443f924..460fa495 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -24,7 +24,7 @@ from dvsim.launcher.base import ErrorMessage, Launcher, LauncherBusyError, LauncherError from dvsim.report.data import IPMeta, ToolMeta from dvsim.runtime.legacy import LegacyLauncherAdapter -from dvsim.scheduler.async_core import Scheduler +from dvsim.scheduler.core import Scheduler __all__ = ()