diff --git a/src/dvsim/cli/run.py b/src/dvsim/cli/run.py index c25c1dd8..ed880fa3 100644 --- a/src/dvsim/cli/run.py +++ b/src/dvsim/cli/run.py @@ -43,9 +43,15 @@ from dvsim.launcher.sge import SgeLauncher from dvsim.launcher.slurm import SlurmLauncher from dvsim.logging import LOG_LEVELS, configure_logging, log +from dvsim.runtime.registry import BackendType, backend_registry +from dvsim.scheduler.async_status_printer import StatusPrinter +from dvsim.scheduler.async_status_printer import get_status_printer as get_async_status_printer from dvsim.scheduler.status_printer import get_status_printer from dvsim.utils import TS_FORMAT, TS_FORMAT_LONG, Timer, rm_path, run_cmd_with_timeout +# Temporary: set to 1 to enable experimental use of the async scheduler (not yet fully integrated) +EXPERIMENTAL_ENABLE_ASYNC_SCHEDULER = os.environ.get("EXPERIMENTAL_ENABLE_ASYNC_SCHEDULER", None) + # The different categories that can be passed to the --list argument. _LIST_CATEGORIES = ["build_modes", "run_modes", "tests", "regressions"] @@ -839,6 +845,37 @@ def parse_args(argv: list[str] | None = None): return args +def set_backend_type(*, is_local: bool = False, fake: bool = False) -> None: + """Set the default backend type that will be used to launch jobs (unless overridden). + + The DVSIM_BACKEND/DVSIM_LAUNCHER environment variables are used to identify what + backend should be used by default, and is intended to be specific to the user's + work site and set externally before invoking DVSim. Selecting a local or fake backend + via the command line will override this. 
+ """ + if is_local: + backend = "local" + elif fake: + backend = "fake" + else: + backend = os.environ.get("DVSIM_BACKEND") + + if backend is None: + # Fall back to the legacy launcher environment variable + backend = os.environ.get("DVSIM_LAUNCHER", "local") + + available_backends = backend_registry.available() + if backend not in available_backends: + log.error( + "Backend %s set using the DVSIM_BACKEND/DVSIM_LAUNCHER environment variables " + "does not exist. Using the local backend instead." + ) + backend = "local" + + # Configure the resolved backend type as the default backend + backend_registry.set_default(BackendType(backend)) + + def main(argv: list[str] | None = None) -> None: """DVSim CLI entry point.""" args = parse_args(argv) @@ -884,6 +921,7 @@ def main(argv: list[str] | None = None) -> None: # Register the common deploy settings. Timer.print_interval = args.print_interval + StatusPrinter.print_interval = args.print_interval LocalLauncher.max_parallel = args.max_parallel SlurmLauncher.max_parallel = args.max_parallel SgeLauncher.max_parallel = args.max_parallel @@ -893,6 +931,9 @@ def main(argv: list[str] | None = None) -> None: FakeLauncher.max_parallel = args.max_parallel set_launcher_type(is_local=args.local, fake=args.fake) + # Configure the runtime backend. TODO: deprecate `set_launcher_type` above. + set_backend_type(is_local=args.local, fake=args.fake) + # Configure scheduler instrumentation set_instrumentation(InstrumentationFactory.create(args.instrumentation)) @@ -935,8 +976,13 @@ def main(argv: list[str] | None = None) -> None: # Now that we have printed the results from the scheduler, we close the # status printer, to ensure the status remains relevant in the UI context # (for applicable status printers). 
- status_printer = get_status_printer(args.interactive) - status_printer.exit() + if EXPERIMENTAL_ENABLE_ASYNC_SCHEDULER: + if not args.interactive: + status_printer = get_async_status_printer() + status_printer.exit() + else: + status_printer = get_status_printer(args.interactive) + status_printer.exit() else: log.error("Nothing to run!") diff --git a/src/dvsim/flow/base.py b/src/dvsim/flow/base.py index 847650f1..ce3a861b 100644 --- a/src/dvsim/flow/base.py +++ b/src/dvsim/flow/base.py @@ -4,6 +4,7 @@ """Flow config base class.""" +import asyncio import json import os import pprint @@ -17,10 +18,14 @@ from dvsim import instrumentation from dvsim.flow.hjson import set_target_attribute -from dvsim.job.data import CompletedJobStatus +from dvsim.job.data import CompletedJobStatus, JobSpec from dvsim.launcher.factory import get_launcher_cls from dvsim.logging import log +from dvsim.runtime.registry import backend_registry +from dvsim.scheduler.async_core import Scheduler as AsyncScheduler +from dvsim.scheduler.async_status_printer import create_status_printer from dvsim.scheduler.core import Scheduler +from dvsim.scheduler.log_manager import LogManager from dvsim.utils import ( find_and_substitute_wildcards, rm_path, @@ -33,6 +38,10 @@ __all__ = ("FlowCfg",) +# Temporary: set to 1 to enable experimental use of the async scheduler (not yet fully integrated) +EXPERIMENTAL_ENABLE_ASYNC_SCHEDULER = os.environ.get("EXPERIMENTAL_ENABLE_ASYNC_SCHEDULER", None) + + # Interface class for extensions. class FlowCfg(ABC): """Base class for the different flows supported by dvsim.py. 
@@ -442,12 +451,69 @@ def deploy_objects(self) -> Sequence[CompletedJobStatus]: ), ) + if EXPERIMENTAL_ENABLE_ASYNC_SCHEDULER: + return asyncio.run(self.run_scheduler(jobs)) + return Scheduler( items=jobs, launcher_cls=get_launcher_cls(), interactive=self.interactive, ).run() + async def run_scheduler(self, jobs: list[JobSpec]) -> list[CompletedJobStatus]: + """Run the scheduler with the given set of job specifications.""" + # Create the runtime backends. TODO: support multiple runtime backends at once + default_backend_factory = backend_registry.get() + default_backend = default_backend_factory() + + scheduler = AsyncScheduler( + jobs=jobs, + backends={default_backend.name: default_backend}, + default_backend=default_backend.name, + max_parallelism=self.args.max_parallel, + # TODO: introduce a better prioritization function that accounts for timeout + ) + + if not self.interactive: + status_printer = create_status_printer(jobs) + + # Add status printer hooks + scheduler.add_run_start_callback(status_printer.start) + scheduler.add_job_status_change_callback(status_printer.update_status) + scheduler.add_run_end_callback(status_printer.stop) + scheduler.add_kill_signal_callback(status_printer.pause) + + # Add log manager hooks + log_manager = LogManager() + scheduler.add_job_status_change_callback( + lambda spec, _old, new: log_manager.on_job_status_change(spec, new) + ) + + # Setup instrumentation + inst = instrumentation.get() + if inst is not None: + inst.start() + + # Add instrumentation hooks + scheduler.add_run_start_callback(inst.on_scheduler_start) + scheduler.add_run_end_callback(inst.on_scheduler_end) + scheduler.add_job_status_change_callback( + lambda spec, _old, new: inst.on_job_status_change(spec, new) + ) + + # Run the scheduler and cleanup + try: + results = await scheduler.run() + finally: + await default_backend.close() + + # Finalize instrumentation + if inst is not None: + inst.stop() + instrumentation.flush() + + return results + @abstractmethod 
def gen_results(self, results: Sequence[CompletedJobStatus]) -> None: """Generate flow results. diff --git a/src/dvsim/instrumentation/metadata.py b/src/dvsim/instrumentation/metadata.py index 7871db1a..f674969f 100644 --- a/src/dvsim/instrumentation/metadata.py +++ b/src/dvsim/instrumentation/metadata.py @@ -29,6 +29,7 @@ class MetadataJobFragment(JobFragment): job_type: str target: str tool: str + backend: str | None dependencies: list[str] status: str @@ -61,6 +62,7 @@ def build_report_fragments(self) -> InstrumentationFragments | None: spec.job_type, spec.target, spec.tool.name, + spec.backend, spec.dependencies, status_str, ) diff --git a/src/dvsim/instrumentation/resources.py b/src/dvsim/instrumentation/resources.py index d02ff82c..8b9505cb 100644 --- a/src/dvsim/instrumentation/resources.py +++ b/src/dvsim/instrumentation/resources.py @@ -227,7 +227,7 @@ def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None: with self._lock: running = job_id in self._running_jobs started = running or job_id in self._finished_jobs - if not started and status != JobStatus.QUEUED: + if not started and status not in (JobStatus.SCHEDULED, JobStatus.QUEUED): self._running_jobs[job_id] = JobResourceAggregate(job) running = True if running and status.is_terminal: diff --git a/src/dvsim/instrumentation/timing.py b/src/dvsim/instrumentation/timing.py index d0c1192b..9766c34a 100644 --- a/src/dvsim/instrumentation/timing.py +++ b/src/dvsim/instrumentation/timing.py @@ -99,7 +99,7 @@ def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None: job_info = TimingJobFragment(job) self._jobs[job_id] = job_info - if job_info.start_time is None and status != JobStatus.QUEUED: + if job_info.start_time is None and status not in (JobStatus.SCHEDULED, JobStatus.QUEUED): job_info.start_time = time.perf_counter() if status.is_terminal: job_info.end_time = time.perf_counter() diff --git a/src/dvsim/job/data.py b/src/dvsim/job/data.py index d820738c..39d9a936 100644 --- 
a/src/dvsim/job/data.py +++ b/src/dvsim/job/data.py @@ -54,6 +54,11 @@ class JobSpec(BaseModel): target: str """run phase [build, run, ...]""" + backend: str | None + """The runtime backend to execute this job with. If not provided (None), this + indicates that whatever is configured as the 'default' backend should be used. + """ + seed: int | None """Seed if there is one.""" diff --git a/src/dvsim/job/deploy.py b/src/dvsim/job/deploy.py index 71247b85..b75ca63b 100644 --- a/src/dvsim/job/deploy.py +++ b/src/dvsim/job/deploy.py @@ -110,6 +110,9 @@ def get_job_spec(self) -> "JobSpec": name=self.name, job_type=self.__class__.__name__, target=self.target, + # TODO: for now we always use the default configured backend, but it might be good + # to allow different jobs to run on different backends in the future? + backend=None, seed=getattr(self, "seed", None), full_name=self.full_name, qual_name=self.qual_name, @@ -832,13 +835,14 @@ def callback(status: JobStatus) -> None: If the extraction fails, an appropriate exception is raised, which must be caught by the caller to mark the job as a failure. """ - if self.dry_run or status != JobStatus.PASSED: + cov_report_path = Path(self.cov_report_txt) + if self.dry_run or status != JobStatus.PASSED or not cov_report_path.exists(): return plugin = get_sim_tool_plugin(tool=self.sim_cfg.tool) results, self.cov_total = plugin.get_cov_summary_table( - cov_report_path=self.cov_report_txt, + cov_report_path=cov_report_path, ) for tup in zip(*results, strict=False): diff --git a/src/dvsim/job/status.py b/src/dvsim/job/status.py index e409a155..457f3bc2 100644 --- a/src/dvsim/job/status.py +++ b/src/dvsim/job/status.py @@ -12,11 +12,14 @@ class JobStatus(Enum): """Status of a Job.""" - QUEUED = auto() - RUNNING = auto() - PASSED = auto() - FAILED = auto() - KILLED = auto() + # SCHEDULED is currently unused in the old sync scheduler, there `SCHEDULED` and `QUEUED` + # are combined under `QUEUED`. 
It is used only in the new async scheduler. + SCHEDULED = auto() # Waiting for dependencies + QUEUED = auto() # Dependencies satisfied, waiting to be dispatched + RUNNING = auto() # Dispatched to a backend and actively executing + PASSED = auto() # Completed successfully + FAILED = auto() # Completed with failure + KILLED = auto() # Forcibly terminated or never executed @property def shorthand(self) -> str: diff --git a/src/dvsim/runtime/backend.py b/src/dvsim/runtime/backend.py index 8c62e279..cba5edcc 100644 --- a/src/dvsim/runtime/backend.py +++ b/src/dvsim/runtime/backend.py @@ -4,17 +4,46 @@ """Runtime backend abstract base class.""" +import os +import re from abc import ABC, abstractmethod -from collections.abc import Hashable, Iterable +from collections.abc import Hashable, Iterable, Sequence +from pathlib import Path -from dvsim.job.data import JobSpec +from dvsim.job.data import JobSpec, JobStatusInfo from dvsim.job.status import JobStatus +from dvsim.job.time import JobTime from dvsim.logging import log from dvsim.runtime.data import ( CompletionCallback, JobCompletionEvent, JobHandle, ) +from dvsim.tool.utils import get_sim_tool_plugin +from dvsim.utils import clean_odirs + +__all__ = ("RuntimeBackend",) + +# A list of magic flags that are currently cleared. +# TODO: it would be good to find a nicer solution for this - perhaps a common configuration +# could just re-export it or define that it should not exist? Or it could be in a DVSim config. +MAGIC_VARS_TO_CLEAR = { + # This variable is used by recursive Make calls to pass variables from one level to the next. + # Even if our command here invokes Make, it should logically be a top-level invocation. We + # don't want to pollute the flow with Make variables from any wrapper that called DVSim. + "MAKEFLAGS", +} + +# Relative paths to files created in job output directories +ENV_DUMP_PATH = "env_vars" + + +# The number of lines to give as context when a failure pattern is parsed from a log file. 
+NUM_LOG_FAIL_CONTEXT_LINES = 4 +# The number of lines to give as context when pass patterns are missing from a log file. +NUM_LOG_PASS_CONTEXT_LINES = 10 +# The number of lines to give as context when a non-zero exit code is returned. +NUM_RETCODE_CONTEXT_LINES = 10 class RuntimeBackend(ABC): @@ -66,6 +95,9 @@ async def _emit_completion(self, events: Iterable[JobCompletionEvent]) -> None: raise RuntimeError("Backend not attached to the scheduler") for event in events: + # TODO: aim to refactor to remove these callbacks + event.spec.post_finish(event.status) + log.debug( "Job %s completed execution: %s", event.spec.qual_name, event.status.shorthand ) @@ -112,3 +144,228 @@ async def close(self) -> None: # noqa: B027 The default implementation just does nothing. """ + + def _build_job_env( + self, + job: JobSpec, + backend_env: dict[str, str] | None = None, + remove: Iterable[str] | None = None, + ) -> dict[str, str]: + """Build job environment configuration for a given job. + + Arguments: + job: The job specification to get the environment from. + context: The job execution context for this backend. + backend_env: Any backend-specific environment overrides to use. Defaults to None. + Takes precedence over the base OS environment, but is overridden by the job itself. + remove: A list of variables to remove from the final environment variable list. + Defaults to None. + + Returns the job environment as a mapping of env var names to values. + + """ + # Take the existing environment variables and update with any exports defined on the spec. + # TODO: consider adding some `--clean-env` CLI arg & flag to only use `job.exports` instead + # of also inheriting from `os.environ`? + env = dict(os.environ) + if backend_env: + env.update(backend_env) + env.update(job.exports) + + # If the job is set to run in "interactive" mode, we set the `RUN_INTERACTIVE` environment + # variable to 1, and also make a note in the environment. 
+ if job.interactive: + env["DVSIM_RUN_INTERACTIVE"] = "1" + # TODO: Legacy environment variable not prefixed with `DVSIM` - deprecate this. + env["RUN_INTERACTIVE"] = "1" + + # Clear any magic flags or `remove` entries from the environment variable export list + for key in remove or (): + env.pop(key, None) + for magic_var in MAGIC_VARS_TO_CLEAR: + env.pop(magic_var, None) + + # Dump the environment variables to their own file to make debugging easier. + if job.odir and job.odir.exists(): + dump = job.odir / ENV_DUMP_PATH + with dump.open("w", encoding="utf-8", errors="surrogateescape") as f: + f.writelines(f"{key}={value}\n" for key, value in sorted(env.items())) + + return env + + def _make_job_output_directory(self, job: JobSpec) -> None: + """Create the output directory for a job. + + Depending on the configured `renew_odir` setting, this will optionally clean or maintain + a list of previous output directories for this job. + + """ + if job.renew_odir: + clean_odirs(odir=job.odir, max_odirs=self.max_output_dirs) + + Path(job.odir).mkdir(exist_ok=True, parents=True) + + def _prepare_launch(self, job: JobSpec) -> None: + """Do any pre-launch activities, preparing the environment. + + This may include clearing old runs, creating the output directory, etc. + """ + if job.interactive and not self.supports_interactive: + msg = f"Interactive jobs are not supported by the '{self.name}' backend." + raise RuntimeError(msg) + + job.pre_launch() + self._make_job_output_directory(job) + + def _finish_job( + self, handle: JobHandle, exit_code: int, runtime: float | None + ) -> tuple[JobStatus, JobStatusInfo | None]: + """Determine the outcome of a job that ran to completion, and parse extra log info. + + Updates the handle with any extracted job runtime & simulation time info. + """ + if handle.spec.dry_run: + return JobStatus.PASSED, None + + log_results = LogResults(handle.spec) + + # Update time information on the handle. 
+ job_runtime, simulated_time = log_results.get_runtime_from_logs() + if job_runtime is None: + log.warning("%s: Using dvsim-maintained job_runtime instead.", handle.spec.full_name) + if runtime is not None: + handle.job_runtime.set(runtime, "s") + else: + handle.job_runtime.set(*job_runtime.get()) + if simulated_time is not None: + handle.simulated_time.set(*simulated_time.get()) + + # Determine the final status from the logs and exit code. + status, reason = log_results.get_status_from_logs() + if status is not None: + return status, reason + if exit_code != 0: + lines = log_results.get_lines() + return JobStatus.FAILED, JobStatusInfo( + message=f"Job returned a non-zero exit code: {exit_code}", + context=lines[-NUM_RETCODE_CONTEXT_LINES:], + ) + return JobStatus.PASSED, None + + +class LogResults: + """Wrapper for log result parsing which lazily loads the contents of the job log file.""" + + def __init__(self, job: JobSpec) -> None: + """Construct a LogResults object. Does not load the log file until needed.""" + self.spec = job + self._parsed = False + self._lines: list[str] | None = None + self._err_status: tuple[JobStatus, JobStatusInfo] | None = None + + def _ensure_log_parsed(self) -> None: + """Parse the log file into its lines if not already parsed.""" + if self._parsed: + return + + try: + with self.spec.log_path.open(encoding="utf-8", errors="surrogateescape") as f: + self._lines = f.readlines() + except OSError as e: + log.debug( + "%s: Error reading job log file %s: %s", + self.spec.full_name, + str(self.spec.log_path), + str(e), + ) + self._err_status = ( + JobStatus.FAILED, + JobStatusInfo(message=f"Error opening file {self.spec.log_path}:\n{e}"), + ) + finally: + self._parsed = True + + def get_lines(self) -> Sequence[str]: + """Get the sequence of lines in the log results, or an empty sequence if failed parsing.""" + self._ensure_log_parsed() + return () if self._lines is None else self._lines + + def get_status_from_logs(self) -> tuple[JobStatus 
| None, JobStatusInfo | None]: + """Determine the outcome of a completed job from its log file.""" + # Check we actually need to use the logs before loading them + use_log_check_strategy = bool(self.spec.fail_patterns) or bool(self.spec.pass_patterns) + if not use_log_check_strategy: + return None, None + + lines = self.get_lines() + if self._err_status: + return self._err_status + + fail_regex = None + if self.spec.fail_patterns: + fail_regex = re.compile("|".join(f"(?:{p})" for p in self.spec.fail_patterns)) + pass_regexes = {re.compile(pattern) for pattern in self.spec.pass_patterns} + + # TODO: does this need to be restricted to per-line patterns? It would complicate line + # number parsing, but it might be useful to make this more expressive? + for lineno, line in enumerate(lines, start=1): + # If the job matches ANY fail pattern, it fails. Provide some extra lines for context. + if fail_regex and fail_regex.search(line): + end = lineno + NUM_LOG_FAIL_CONTEXT_LINES + return JobStatus.FAILED, JobStatusInfo( + message=line.strip(), lines=[lineno], context=lines[lineno:end] + ) + + # The job must match ALL pass patterns to succeed. + matched = {regex for regex in pass_regexes if regex.search(line)} + pass_regexes -= matched + + if not pass_regexes and not fail_regex: + break # Early exit if possible + + if pass_regexes: + pass_patterns = [regex.pattern for regex in pass_regexes] + return JobStatus.FAILED, JobStatusInfo( + message=f"Some pass patterns missing: {pass_patterns}", + context=lines[-NUM_LOG_PASS_CONTEXT_LINES:], + ) + + return None, None + + def get_runtime_from_logs(self) -> tuple[JobTime | None, JobTime | None]: + """Try to determine a job's runtime from its log file, using specified extensions.""" + # TODO: rather than check the job type here, in the future the sim tool plugin should + # define the job types it supports. 
Even longer term, perhaps the job time and sim time + # should not be defined on the JobHandle/CompletedJobStatus and should be directly parsed + # out of the resulting log artifacts by the respective flows. + sim_job_types = ["CompileSim", "RunTest", "CovUnr", "CovMerge", "CovReport", "CovAnalyze"] + supports_log_info_ext = self.spec.job_type in sim_job_types + if not supports_log_info_ext: + return None, None + + lines = self.get_lines() + if self._err_status: + return None, None + + try: + plugin = get_sim_tool_plugin(tool=self.spec.tool.name) + except NotImplementedError as e: + log.error("%s: %s", self.spec.full_name, str(e)) + return None, None + + runtime = None + try: + time, unit = plugin.get_job_runtime(log_text=lines) + runtime = JobTime(time, unit) + except RuntimeError as e: + log.warning("%s: %s", self.spec.full_name, str(e)) + + simulated_time = None + if self.spec.job_type == "RunTest": + try: + time, unit = plugin.get_simulated_time(log_text=lines) + simulated_time = JobTime(time, unit) + except RuntimeError as e: + log.debug("%s: %s", self.spec.full_name, str(e)) + + return runtime, simulated_time diff --git a/src/dvsim/runtime/local.py b/src/dvsim/runtime/local.py new file mode 100644 index 00000000..71c98798 --- /dev/null +++ b/src/dvsim/runtime/local.py @@ -0,0 +1,334 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. 
+# SPDX-License-Identifier: Apache-2.0 + +"""Legacy launcher adapter interface for the new async scheduler design.""" + +import asyncio +import contextlib +import shlex +import signal +import subprocess +import time +from collections.abc import Hashable, Iterable +from dataclasses import dataclass +from typing import TextIO + +import psutil + +from dvsim.job.data import JobSpec, JobStatusInfo +from dvsim.job.status import JobStatus +from dvsim.job.time import JobTime +from dvsim.logging import log +from dvsim.runtime.backend import RuntimeBackend +from dvsim.runtime.data import JobCompletionEvent, JobHandle + + +@dataclass(kw_only=True) +class LocalJobHandle(JobHandle): + """Job handle for a job belonging to a legacy launcher adapter runtime backend.""" + + process: asyncio.subprocess.Process | None + log_file: TextIO | None + start_time: float + kill_requested: bool = False + + +class LocalRuntimeBackend(RuntimeBackend): + """Launch jobs as subprocesses on the user's local machine.""" + + name = "local" + supports_interactive = True + + DEFAULT_SIGTERM_TIMEOUT = 2.0 # in seconds + DEFAULT_SIGKILL_TIMEOUT = 2.0 # in seconds + + def __init__( + self, + *, + max_parallelism: int | None = None, + sigterm_timeout: float | None = None, + sigkill_timeout: float | None = None, + ) -> None: + """Construct a local runtime backend. + + Args: + max_parallelism: The maximum number of jobs that can be dispatched to this backend + at once. `0` means no limit, `None` means no override is applied to the default. + sigterm_timeout: The time to wait for a process to die after a SIGTERM when killing + it, before sending SIGKILL. + sigkill_timeout: The time to wait for a process to die after a SIGKILL when killing + it, before giving up (so the scheduler can progress). 
+ + """ + super().__init__(max_parallelism=max_parallelism) + self.sigterm_timeout = ( + sigterm_timeout if sigterm_timeout is not None else self.DEFAULT_SIGTERM_TIMEOUT + ) + self.sigkill_timeout = ( + sigkill_timeout if sigkill_timeout is not None else self.DEFAULT_SIGKILL_TIMEOUT + ) + + # Retain references to created asyncio tasks so they don't get GC'd. + self._tasks: set[asyncio.Task] = set() + + async def _log_from_pipe( + self, handle: LocalJobHandle, stream: asyncio.StreamReader | None + ) -> None: + """Write piped asyncio subprocess stream contents to a job's log file.""" + if stream is None or not handle.log_file: + return + try: + async for line in stream: + decoded = line.decode("utf-8", errors="surrogateescape") + handle.log_file.write(decoded) + handle.log_file.flush() + except asyncio.CancelledError: + pass + + async def _monitor_job(self, handle: LocalJobHandle) -> None: + """Wait for subprocess completion and emit a completion event.""" + if handle.process is None: + return + + if handle.log_file: + handle.log_file.write(f"[Executing]:\n{handle.spec.cmd}\n\n") + handle.log_file.flush() + + reader_tasks = [ + asyncio.create_task(self._log_from_pipe(handle, handle.process.stdout)), + asyncio.create_task(self._log_from_pipe(handle, handle.process.stderr)), + ] + status = JobStatus.KILLED + reason = None + + try: + exit_code = await asyncio.wait_for( + handle.process.wait(), timeout=handle.spec.timeout_secs + ) + runtime = time.monotonic() - handle.start_time + status, reason = self._finish_job(handle, exit_code, runtime) + except asyncio.TimeoutError: + await self._kill_job(handle) + status = JobStatus.KILLED + timeout_message = f"Job timed out after {handle.spec.timeout_mins} minutes" + reason = JobStatusInfo(message=timeout_message) + finally: + # Explicitly cancel reader tasks and wait for them to finish before closing the log + # file. We first give them a second to finish naturally to reduce log loss. 
+ with contextlib.suppress(asyncio.TimeoutError): + await asyncio.wait(reader_tasks, timeout=1) + for task in reader_tasks: + if not task.done(): + task.cancel() + await asyncio.gather(*reader_tasks, return_exceptions=True) + + if handle.log_file: + handle.log_file.close() + if handle.kill_requested: + status = JobStatus.KILLED + reason = JobStatusInfo(message="Job killed!") + await self._emit_completion([JobCompletionEvent(handle.spec, status, reason)]) + + def _launch_interactive_job( + self, + job: JobSpec, + log_file: TextIO | None, + env: dict[str, str], + ) -> tuple[LocalJobHandle, JobCompletionEvent | None]: + """Launch a job in interactive mode with transparent stdin and stdout.""" + start_time = time.monotonic() + exit_code = None + completion = None + + if log_file is not None: + try: + proc = subprocess.Popen( + shlex.split(job.cmd), + # Transparent stdin/stdout, stdout & stderr muxed and tee'd via the pipe. + stdin=None, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + universal_newlines=True, + env=env, + ) + if proc.stdout is not None: + for line in proc.stdout: + print(line, end="") # noqa: T201 + log_file.write(line) + log_file.flush() + + exit_code = proc.wait() + except subprocess.SubprocessError as e: + log_file.close() + log.exception("Error launching job subprocess: %s", job.full_name) + reason = JobStatusInfo(message=f"Failed to launch job: {e}") + completion = JobCompletionEvent(job, JobStatus.KILLED, reason) + + runtime = time.monotonic() - start_time + handle = LocalJobHandle( + spec=job, + backend=self.name, + job_runtime=JobTime(), + simulated_time=JobTime(), + process=None, + log_file=log_file, + start_time=start_time, + ) + + if exit_code is not None: + status, reason = self._finish_job(handle, exit_code, runtime) + completion = JobCompletionEvent(job, status, reason) + + return handle, completion + + async def _launch_job( + self, + job: JobSpec, + log_file: TextIO | None, + env: dict[str, str], + ) -> tuple[LocalJobHandle | 
None, JobCompletionEvent | None]: + """Launch a job (in non-interactive mode) as an async subprocess.""" + proc = None + completion = None + if log_file is not None: + try: + proc = await asyncio.create_subprocess_exec( + *shlex.split(job.cmd), + # TODO: currently we mux the stdout and stderr streams by default. It would be + # useful to make this behaviour optional on some global `IoPolicy`. + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=env, + ) + except BlockingIOError: + # Skip this job for now; the scheduler should re-try to launch it later. + log_file.close() + return None, None + except subprocess.SubprocessError as e: + log_file.close() + log.exception("Error launching job subprocess: %s", job.full_name) + reason = JobStatusInfo(message=f"Failed to launch job: {e}") + completion = JobCompletionEvent(job, JobStatus.KILLED, reason) + + handle = LocalJobHandle( + spec=job, + backend=self.name, + job_runtime=JobTime(), + simulated_time=JobTime(), + process=proc, + log_file=log_file, + start_time=time.monotonic(), + ) + + # Create a task to asynchronously monitor the launched subprocess. + # We must store a reference in self._tasks to ensure the task is not GC'd. + if proc is not None: + task = asyncio.create_task(self._monitor_job(handle)) + self._tasks.add(task) + task.add_done_callback(self._tasks.discard) + + return handle, completion + + async def submit_many(self, jobs: Iterable[JobSpec]) -> dict[Hashable, JobHandle]: + """Submit & launch multiple jobs. + + Returns: + mapping from job.id -> JobHandle. Entries are only present for jobs that successfully + launched; jobs that failed in a non-fatal way are missing, and should be retried. 
+ + """ + completions: list[JobCompletionEvent] = [] + handles: dict[Hashable, JobHandle] = {} + + for job in jobs: + env = self._build_job_env(job) + self._prepare_launch(job) + + log_file = None + try: + log_file = job.log_path.open("w", encoding="utf-8", errors="surrogateescape") + except BlockingIOError: + continue # Skip this job for now; the scheduler should re-try to launch it later. + except OSError as e: + log.exception("Error writing to job log file: %s", job.full_name) + reason = JobStatusInfo(message=f"Failed to launch job: {e}") + completions.append(JobCompletionEvent(job, JobStatus.KILLED, reason)) + + if job.interactive: + handle, completion = self._launch_interactive_job(job, log_file, env) + else: + handle, completion = await self._launch_job(job, log_file, env) + if completion is not None: + completions.append(completion) + if handle is not None: + handles[job.id] = handle + + if completions: + await self._emit_completion(completions) + + return handles + + def _send_kill_signal(self, proc: asyncio.subprocess.Process, signal_num: int) -> None: + """Send a (kill) signal to a process and all its descendent processes.""" + # TODO: maybe this should use cgroups in the future to be thorough? 
+ for child in psutil.Process(proc.pid).children(recursive=True): + child.send_signal(signal_num) + proc.send_signal(signal_num) + + async def _kill_job(self, handle: LocalJobHandle) -> None: + """Kill the running local process, sending SIGTERM and then SIGKILL if that didn't work.""" + proc = handle.process + if proc is None: + return + + if proc.returncode is None: + handle.kill_requested = True + try: + self._send_kill_signal(proc, signal.SIGTERM) + except ProcessLookupError: + return + + try: + await asyncio.wait_for(proc.wait(), timeout=self.sigterm_timeout) + except asyncio.TimeoutError: + pass + else: + return + + if proc.returncode is None: + log.warning( + "Job '%s' was not killed with SIGTERM after %g seconds, sending SIGKILL.", + handle.spec.full_name, + self.sigterm_timeout, + ) + try: + self._send_kill_signal(proc, signal.SIGKILL) + except ProcessLookupError: + return + + try: + await asyncio.wait_for(proc.wait(), timeout=self.sigkill_timeout) + except asyncio.TimeoutError: + # proc.wait() completes only when the kernel reaps the process. If we sent SIGKILL + # and did not see this happen for a bit, the process is probably blocked in the + # kernel somewhere (e.g. NFS hang, slow or dead disk I/O). + log.error( + "Job '%s' was not killed with SIGKILL after %g seconds, so give up on it.", + handle.spec.full_name, + self.sigkill_timeout, + ) + + async def kill_many(self, handles: Iterable[JobHandle]) -> None: + """Cancel ongoing jobs via their handle. Killed jobs should still "complete".""" + tasks = [] + for handle in handles: + if not isinstance(handle, LocalJobHandle): + msg = f"Local backend expected handle of type LocalJobHandle, not `{type(handle)}`." + raise TypeError(msg) + if handle.process and not handle.kill_requested and handle.process.returncode is None: + tasks.append(asyncio.create_task(self._kill_job(handle))) + + if tasks: + # Wait for all job subprocesses to be killed; `_monitor_job` handles the completions. 
+ await asyncio.gather(*tasks) diff --git a/src/dvsim/runtime/registry.py b/src/dvsim/runtime/registry.py new file mode 100644 index 00000000..2b93c8f0 --- /dev/null +++ b/src/dvsim/runtime/registry.py @@ -0,0 +1,117 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. +# SPDX-License-Identifier: Apache-2.0 + +"""Registry of different runtime backends. Built-in backends are registered by default.""" + +from collections.abc import Callable +from typing import Any, NewType, TypeAlias + +from dvsim.launcher.base import Launcher +from dvsim.launcher.fake import FakeLauncher +from dvsim.launcher.lsf import LsfLauncher +from dvsim.launcher.nc import NcLauncher +from dvsim.launcher.sge import SgeLauncher +from dvsim.launcher.slurm import SlurmLauncher +from dvsim.logging import log +from dvsim.runtime.backend import RuntimeBackend +from dvsim.runtime.legacy import LegacyLauncherAdapter +from dvsim.runtime.local import LocalRuntimeBackend + +BackendType = NewType("BackendType", str) + +BackendFactory: TypeAlias = Callable[..., RuntimeBackend] + + +class BackendRegistry: + """Registry mapping backend names to factories/constructors of runtime backends.""" + + def __init__(self) -> None: + """Construct a new runtime backend registry.""" + self._registry: dict[BackendType, BackendFactory] = {} + self._default: BackendType | None = None + + def register( + self, name: BackendType, factory: BackendFactory, *, is_default: bool = False + ) -> None: + """Register a new runtime backend (factory/constructor) under a given name.""" + if name in self._registry: + msg = f"Backend '{name}' is already registered" + raise ValueError(msg) + log.debug("New runtime backend registered: %s", name) + self._registry[name] = factory + if is_default: + self.set_default(name) + + def set_default(self, name: BackendType) -> None: + """Set the default runtime backend, which should be used unless specified otherwise.""" + 
log.debug("Configured default backend: %s", name) + self._default = name + + def get_default(self) -> BackendType | None: + """Get the configured default runtime backend type.""" + return self._default + + def get(self, name: BackendType | None = None) -> BackendFactory: + """Retrieve a backend factory by its registered name.""" + name = self._default if name is None else name + if name is None: + raise ValueError("No default backend configured or backend name given") + + try: + return self._registry[name] + except KeyError as e: + msg = f"Unknown backend '{name}'" + raise KeyError(msg) from e + + def create( + self, name: BackendType, *args: list[Any], **kwargs: dict[str, Any] + ) -> RuntimeBackend: + """Instantiate a runtime backend by its registered name.""" + factory = self.get(name) + return factory(*args, **kwargs) + + def available(self) -> list[str]: + """Return names of all registered backends.""" + return sorted(self._registry.keys()) + + +# Default global registry +backend_registry = BackendRegistry() + + +def register_backend(name: BackendType, cls: type[RuntimeBackend]) -> None: + """Register a standard runtime backend.""" + backend_registry.register(name, cls) + + +# Helper for registering runtime backends for legacy launchers. +# Can be removed when all legacy launchers are migrated. +def register_legacy_launcher_backend(name: BackendType, launcher_cls: type[Launcher]) -> None: + """Register a legacy launcher class as a runtime backend by wrapping it in an adapter.""" + + def factory(*args: list[Any], **kwargs: dict[str, Any]) -> RuntimeBackend: + return LegacyLauncherAdapter(launcher_cls, *args, **kwargs) + + backend_registry.register(name, factory) + + +# Register built-in backends. TODO: migrate the legacy launchers to runtime backends. 
+register_backend(BackendType("local"), LocalRuntimeBackend) +register_legacy_launcher_backend(BackendType("fake"), FakeLauncher) +register_legacy_launcher_backend(BackendType("lsf"), LsfLauncher) +register_legacy_launcher_backend(BackendType("nc"), NcLauncher) +register_legacy_launcher_backend(BackendType("sge"), SgeLauncher) +register_legacy_launcher_backend(BackendType("slurm"), SlurmLauncher) + + +# TODO: Hack to support site-specific closed source custom launchers. These should be migrated to +# use the registry / a plugin system, and then the below should be dropped. +try: + from edacloudlauncher.EdaCloudLauncher import ( # type: ignore[report-missing-imports] + EdaCloudLauncher, + ) + + register_legacy_launcher_backend(BackendType("edacloud"), EdaCloudLauncher) +except ImportError: + pass diff --git a/src/dvsim/scheduler/async_core.py b/src/dvsim/scheduler/async_core.py new file mode 100644 index 00000000..651ad983 --- /dev/null +++ b/src/dvsim/scheduler/async_core.py @@ -0,0 +1,582 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. 
+# SPDX-License-Identifier: Apache-2.0 + +"""Job scheduler.""" + +import asyncio +import heapq +from collections import defaultdict +from collections.abc import Callable, Iterable, Mapping, Sequence +from dataclasses import dataclass, field +from signal import SIGINT, SIGTERM, getsignal, signal +from types import FrameType +from typing import Any, TypeAlias + +from dvsim.job.data import CompletedJobStatus, JobSpec, JobStatusInfo +from dvsim.job.status import JobStatus +from dvsim.logging import log +from dvsim.runtime.backend import RuntimeBackend +from dvsim.runtime.data import JobCompletionEvent, JobHandle + +__all__ = ( + "JobPriorityFn", + "JobRecord", + "OnJobStatusChangeCb", + "OnRunEndCb", + "OnRunStartCb", + "OnSchedulerKillCb", + "Priority", + "Scheduler", +) + + +@dataclass +class JobRecord: + """Mutable runtime representation of a scheduled job, used in the scheduler.""" + + spec: JobSpec + backend_key: str # either spec.backend, or the default backend if not given + + status: JobStatus = JobStatus.SCHEDULED + status_info: JobStatusInfo | None = None + + remaining_deps: int = 0 + passing_deps: int = 0 + dependents: list[str] = field(default_factory=list) + kill_requested: bool = False + + handle: JobHandle | None = None + + +# Function to assign a priority to a given job specification. The returned priority should be +# some lexicographically orderable type. Jobs with higher priority are scheduled first. +Priority: TypeAlias = int | float | Sequence[int | float] +JobPriorityFn: TypeAlias = Callable[[JobRecord], Priority] + +# Callbacks for observers, for when the scheduler run starts and stops +OnRunStartCb: TypeAlias = Callable[[], None] +OnRunEndCb: TypeAlias = Callable[[], None] + +# Callbacks for observers, for when a job status changes in the scheduler +# The arguments are: (job spec, old status, new status). 
+OnJobStatusChangeCb: TypeAlias = Callable[[JobSpec, JobStatus, JobStatus], None] + +# Callbacks for observers, for when the scheduler receives a kill signal (termination). +OnSchedulerKillCb: TypeAlias = Callable[[], None] + + +# Standard context messages used for killed/failed jobs in the scheduler. +FAILED_DEP = JobStatusInfo( + message="Job cancelled because one of its dependencies failed or was killed." +) +ALL_FAILED_DEP = JobStatusInfo( + message="Job cancelled because all of its dependencies failed or were killed." +) +KILLED_SCHEDULED = JobStatusInfo( + message="Job cancelled because one of its dependencies was killed." +) +KILLED_QUEUED = JobStatusInfo(message="Job killed whilst waiting to begin execution.") +KILLED_RUNNING_SIGINT = JobStatusInfo( + message="Job killed by a SIGINT signal to the scheduler whilst executing." +) +KILLED_RUNNING_SIGTERM = JobStatusInfo( + message="Job killed by a SIGTERM signal to the scheduler whilst executing." +) + + +class Scheduler: + """Event-driven job scheduler that schedules and runs a DAG of job specifications.""" + + def __init__( # noqa: PLR0913 + self, + jobs: Iterable[JobSpec], + backends: Mapping[str, RuntimeBackend], + default_backend: str, + *, + max_parallelism: int = 0, + priority_fn: JobPriorityFn | None = None, + coalesce_window: float | None = 0.001, + ) -> None: + """Construct a new scheduler to run a DAG of jobs. + + Args: + jobs: The DAG of jobs to run. A sequence of job specifications, where the DAG is + defined by the job IDs and job dependency lists. + backends: The mapping (name -> backend) of backends available to the scheduler. + default_backend: The name of the default backend to use if not specified by a job. + max_parallelism: The maximum number of jobs that the scheduler is allowed to dispatch + at once, across all backends. The default value of `0` indicates no upper limit. + priority_fn: A function to calculate the priority of a given job. 
If no function is + given, this defaults to using the job's weight. + coalesce_window: If specified, the time in seconds to wait on receiving a job + completion, to give a short amount of time to allow other batched completion events + to arrive in the queue. This lets us batch scheduling more frequently for a little + extra cost. Defaults to 1 millisecond, and can be disabled by giving `None`. + + """ + if max_parallelism < 0: + err = f"max_parallelism must be some non-negative integer, not {max_parallelism}" + raise ValueError(err) + if default_backend not in backends: + err = f"Default backend '{default_backend}' is not in the mapping of given backends" + raise ValueError(err) + if coalesce_window is not None and coalesce_window < 0.0: + raise ValueError("coalesce_window must be None or some non-negative number") + + # Configuration of the scheduler's behaviour + self.backends = dict(backends) + self.default_backend = default_backend + self.max_parallelism = max_parallelism + self.priority_fn = priority_fn or self._default_priority + self.coalesce_window = coalesce_window + + # Internal data structures and indexes to track running jobs. 
+ self._jobs: dict[str, JobRecord] = {} + self._ready_heap: list[tuple[Priority, str]] = [] + self._running: set[str] = set() + self._running_per_backend: dict[str, int] = dict.fromkeys(backends, 0) + self._event_queue: asyncio.Queue[Iterable[JobCompletionEvent]] = asyncio.Queue() + + # Internal flags and signal handling + self._shutdown_signal: int | None = None + self._shutdown_event: asyncio.Event | None = None + self._original_sigint_handler: Any = None + self._shutdown_started = False + + # Registered callbacks from observers + self._on_run_start: list[OnRunStartCb] = [] + self._on_run_end: list[OnRunEndCb] = [] + self._on_job_status_change: list[OnJobStatusChangeCb] = [] + self._on_kill_signal: list[OnSchedulerKillCb] = [] + + self._build_graph(jobs) + + def add_run_start_callback(self, cb: OnRunStartCb) -> None: + """Register an observer to notify when the scheduler run is started.""" + self._on_run_start.append(cb) + + def add_run_end_callback(self, cb: OnRunEndCb) -> None: + """Register an observer to notify when the scheduler run ends.""" + self._on_run_end.append(cb) + + def add_job_status_change_callback(self, cb: OnJobStatusChangeCb) -> None: + """Register an observer to notify when the status of a job in the scheduler changes.""" + self._on_job_status_change.append(cb) + + def add_kill_signal_callback(self, cb: OnSchedulerKillCb) -> None: + """Register an observer to notify when the scheduler is killed by some signal.""" + self._on_kill_signal.append(cb) + + def _default_priority(self, job: JobRecord) -> Priority: + """Prioritizes jobs according to their weight. 
The default prioritization method.""" + return job.spec.weight + + def _build_graph(self, specs: Iterable[JobSpec]) -> None: + """Build the job dependency graph and validate the DAG structure.""" + # Build an index of runtime job records, and check for duplicates + for spec in specs: + if spec.id in self._jobs: + log.warning("Duplicate job ID '%s'", spec.id) + # TODO: when we're sure it's ok, change the behaviour to error on duplicate jobs + # : err = f"Duplicate job ID '{spec.id}'" + # : raise ValueError(err) + # Instead, silently ignore it for now to match the original scheduler behaviour + continue + if spec.backend is not None and spec.backend not in self.backends: + err = f"Unknown job backend '{spec.backend}'" + raise ValueError(err) + backend_name = self.default_backend if spec.backend is None else spec.backend + self._jobs[spec.id] = JobRecord(spec=spec, backend_key=backend_name) + + # Build a graph from the adjacency list formed by the spec dependencies + for job in self._jobs.values(): + job.remaining_deps = len(job.spec.dependencies) + for dep in job.spec.dependencies: + if dep not in self._jobs: + err = f"Unknown job dependency '{dep}' for job {job.spec.id}" + raise ValueError(err) + self._jobs[dep].dependents.append(job.spec.id) + + # Validate that there are no cycles in the given graph. 
+ self._validate_acyclic() + + def _validate_acyclic(self) -> None: + """Validate that the given job digraph is acyclic via Kahn's Algorithm.""" + indegree = {job: record.remaining_deps for job, record in self._jobs.items()} + job_queue = [job for job, degree in indegree.items() if degree == 0] + num_visited = 0 + + while job_queue: + job = job_queue.pop() + num_visited += 1 + for dep in self._jobs[job].dependents: + indegree[dep] -= 1 + if indegree[dep] == 0: + job_queue.append(dep) + + if num_visited != len(self._jobs): + raise ValueError("The given JobSpec graph contains a dependency cycle.") + + def _notify_run_started(self) -> None: + """Notify any observers that the scheduler run has started.""" + for cb in self._on_run_start: + cb() + + def _notify_run_finished(self) -> None: + """Notify any observers that the scheduler run has finished.""" + for cb in self._on_run_end: + cb() + + def _notify_kill_signal(self) -> None: + """Notify any observers that the scheduler received a kill signal.""" + for cb in self._on_kill_signal: + cb() + + def _change_job_status( + self, job: JobRecord, new_status: JobStatus, info: JobStatusInfo | None = None + ) -> JobStatus: + """Change a job's runtime status, storing an optionally associated reason. + + Notifies any status change observers of the change, and returns the previous status. 
+ """ + old_status = job.status + if old_status == new_status: + return old_status + + job.status = new_status + job.status_info = info + + if new_status != JobStatus.RUNNING: + log.log( + log.ERROR if new_status in (JobStatus.FAILED, JobStatus.KILLED) else log.VERBOSE, + "Status change to [%s: %s] for %s", + new_status.shorthand, + new_status.name.capitalize(), + job.spec.full_name, + ) + + for cb in self._on_job_status_change: + cb(job.spec, old_status, new_status) + + return old_status + + def _mark_job_ready(self, job: JobRecord) -> None: + """Mark a given job in the scheduler as ready to execute (all dependencies completed).""" + if job.status != JobStatus.SCHEDULED: + msg = f"_mark_job_ready only applies to 'SCHEDULED' jobs (not '{job.status.name}')." + raise RuntimeError(msg) + + self._change_job_status(job, JobStatus.QUEUED) + # heapq is a min heap, so push (-priority) instead of (priority). + priority = self.priority_fn(job) + priority = priority if isinstance(priority, Sequence) else (priority,) + neg_priority: Priority = tuple(-x for x in priority) + heapq.heappush(self._ready_heap, (neg_priority, job.spec.id)) + + def _mark_job_running(self, job: JobRecord) -> None: + """Mark a given job in the scheduler as running. 
Assumes already removed from the heap.""" + if job.spec.id in self._running: + raise RuntimeError("_mark_job_running called on a job that was already running.") + + self._change_job_status(job, JobStatus.RUNNING) + self._running.add(job.spec.id) + self._running_per_backend[job.backend_key] += 1 + + def _mark_job_completed( + self, job: JobRecord, status: JobStatus, reason: JobStatusInfo | None + ) -> None: + """Mark a given job in the scheduler as completed, having reached some terminal state.""" + if not status.is_terminal: + err = f"_mark_job_completed called with non-terminal status '{status.name}'" + raise RuntimeError(err) + if job.status.is_terminal: + return + + # If the scheduler requested to kill the job, override the failure reason. + if job.kill_requested: + reason = ( + KILLED_RUNNING_SIGINT if self._shutdown_signal == SIGINT else KILLED_RUNNING_SIGTERM + ) + self._change_job_status(job, status, reason) + + # If the job was running, mark it as no longer running. + if job.spec.id in self._running: + self._running.remove(job.spec.id) + self._running_per_backend[job.backend_key] -= 1 + + # Update dependents (jobs that depend on this job), propagating failures if needed. 
+ self._update_completed_job_deps(job) + + def _update_completed_job_deps(self, job: JobRecord) -> None: + """Update the dependencies of a completed job, scheduling/killing deps where necessary.""" + for dep_id in job.dependents: + dep = self._jobs[dep_id] + + # Update dependency tracking counts in the dependency records + dep.remaining_deps -= 1 + if job.status == JobStatus.PASSED: + dep.passing_deps += 1 + + # Propagate kill signals on shutdown + if self._shutdown_signal is not None: + self._mark_job_completed(dep, JobStatus.KILLED, KILLED_SCHEDULED) + continue + + # Handle dependency management and marking dependents as ready + if dep.remaining_deps == 0 and dep.status == JobStatus.SCHEDULED: + if dep.spec.needs_all_dependencies_passing: + if dep.passing_deps == len(dep.spec.dependencies): + self._mark_job_ready(dep) + else: + self._mark_job_completed(dep, JobStatus.KILLED, FAILED_DEP) + elif dep.passing_deps > 0: + self._mark_job_ready(dep) + else: + self._mark_job_completed(dep, JobStatus.KILLED, ALL_FAILED_DEP) + + async def run(self) -> list[CompletedJobStatus]: + """Run all scheduled jobs to completion (unless terminated) and return the results.""" + self._install_signal_handlers() + + for backend in self.backends.values(): + backend.attach_completion_callback(self._submit_job_completion) + + self._notify_run_started() + + # Before entering the main loop, mark jobs with 0 remaining deps as ready to run. 
+ for job in self._jobs.values(): + if job.remaining_deps == 0: + self._mark_job_ready(job) + + try: + await self._main_loop() + finally: + self._notify_run_finished() + + return [ + CompletedJobStatus( + name=job.spec.name, + job_type=job.spec.job_type, + seed=job.spec.seed, + block=job.spec.block, + tool=job.spec.tool, + workspace_cfg=job.spec.workspace_cfg, + full_name=job.spec.full_name, + qual_name=job.spec.qual_name, + target=job.spec.target, + log_path=job.spec.log_path, + job_runtime=job.handle.job_runtime.with_unit("s").get()[0] + if job.handle is not None + else 0.0, + simulated_time=job.handle.simulated_time.with_unit("us").get()[0] + if job.handle is not None + else 0.0, + status=job.status, + fail_msg=job.status_info, + ) + for job in self._jobs.values() + ] + + def _install_signal_handlers(self) -> None: + """Install the SIGINT/SIGTERM signal handlers to trigger graceful shutdowns.""" + self._shutdown_signal = None + self._shutdown_event = asyncio.Event() + self._original_sigint_handler = getsignal(SIGINT) + self._shutdown_started = False + loop = asyncio.get_running_loop() + + def _handler(signum: int, _frame: FrameType | None) -> None: + if self._shutdown_signal is None and self._shutdown_event: + self._shutdown_signal = signum + loop.call_soon_threadsafe(self._shutdown_event.set) + + # Restore the original SIGINT handler so a second Ctrl-C terminates immediately + if signum == SIGINT: + signal(SIGINT, self._original_sigint_handler) + + loop.add_signal_handler(SIGINT, lambda: _handler(SIGINT, None)) + loop.add_signal_handler(SIGTERM, lambda: _handler(SIGTERM, None)) + + async def _submit_job_completion(self, events: Iterable[JobCompletionEvent]) -> None: + """Notify the scheduler that a batch of jobs have been completed.""" + try: + self._event_queue.put_nowait(events) + except asyncio.QueueShutDown as e: + msg = "Scheduler event queue shutdown earlier than expected?" 
+ raise RuntimeError(msg) from e + except asyncio.QueueFull: + log.critical("Scheduler event queue full despite being infinitely sized?") + + async def _main_loop(self) -> None: + """Run the main scheduler loop. + + Tries to schedule any ready jobs if there is available capacity, and then waits for any job + completions (or a shutdown signal). This continues in a loop until all jobs have been either + executed or killed (e.g. via a shutdown signal). + """ + if self._shutdown_event is None: + raise RuntimeError("Expected signal handlers to be installed before running main loop") + + job_completion_task = asyncio.create_task(self._event_queue.get()) + shutdown_task = asyncio.create_task(self._shutdown_event.wait()) + + try: + while True: + await self._schedule_ready_jobs() + + if not self._running: + if not self._ready_heap: + break + # This case (nothing running, but jobs still pending in the queue) can happen + # if backends fail to schedule any jobs (e.g. the backend is temporarily busy). + continue + + # Wait for any job to complete, or for a shutdown signal + try: + done, _ = await asyncio.wait( + (job_completion_task, shutdown_task), + return_when=asyncio.FIRST_COMPLETED, + ) + except asyncio.QueueShutDown as e: + msg = "Scheduler event queue shutdown earlier than expected?" 
+ raise RuntimeError(msg) from e + + if shutdown_task in done: + self._shutdown_event.clear() + shutdown_task = asyncio.create_task(self._shutdown_event.wait()) + await self._handle_exit_signal() + continue + + completions = await self._drain_completions(job_completion_task) + job_completion_task = asyncio.create_task(self._event_queue.get()) + + for event in completions: + job = self._jobs[event.spec.id] + self._mark_job_completed(job, event.status, event.reason) + finally: + job_completion_task.cancel() + shutdown_task.cancel() + + async def _drain_completions(self, completion_task: asyncio.Task) -> list[JobCompletionEvent]: + """Drain batched completions from the queue, optionally coalescing batched events.""" + events = list(completion_task.result()) + + # Coalesce nearby completions by waiting for a very short time + if self.coalesce_window is not None: + await asyncio.sleep(self.coalesce_window) + + # Drain any more completion events from the event queue + try: + while True: + events.extend(self._event_queue.get_nowait()) + except asyncio.QueueEmpty: + return events + except asyncio.QueueShutDown as e: + msg = "Scheduler event queue shutdown earlier than expected?" + raise RuntimeError(msg) from e + + async def _handle_exit_signal(self) -> None: + """Attempt to gracefully shutdown as a result of a triggered exit signal.""" + if self._shutdown_started: + return + self._shutdown_started = True + + signal_name = "SIGTERM" if self._shutdown_signal == SIGTERM else "SIGINT" + log.info("Received %s signal. Exiting gracefully", signal_name) + if self._shutdown_signal == SIGINT: + log.info( + "Send another to force immediate quit (but you may need to manually " + "kill some child processes)." + ) + + self._notify_kill_signal() + + # Mark any jobs that are currently running as jobs we should kill. + # Collect jobs to kill in a dict, grouped per backend, for batched killing. 
+ to_kill: dict[str, list[JobHandle]] = defaultdict(list) + + for job_id in self._running: + job = self._jobs[job_id] + if job.handle is None: + raise RuntimeError("Running job is missing an associated handle.") + job.kill_requested = True + to_kill[job.backend_key].append(job.handle) + + # Asynchronously dispatch backend kill tasks whilst we update scheduler internals. + # Jobs that depend on these jobs will then be transitively killed before they start. + kill_tasks: list[asyncio.Task] = [] + for backend_name, handles in to_kill.items(): + backend = self.backends[backend_name] + kill_tasks.append(asyncio.create_task(backend.kill_many(handles))) + + # Kill any ready (but not running jobs), so that they don't get scheduled. + while self._ready_heap: + _, job_id = heapq.heappop(self._ready_heap) + job = self._jobs[job_id] + self._mark_job_completed(job, JobStatus.KILLED, KILLED_QUEUED) + + if kill_tasks: + await asyncio.gather(*kill_tasks, return_exceptions=True) + + async def _schedule_ready_jobs(self) -> None: + """Attempt to schedule ready jobs whilst respecting scheduler & backend parallelism.""" + # Find out how many jobs we can dispatch according to the scheduler's parallelism limit + available_slots = ( + self.max_parallelism - len(self._running) + if self.max_parallelism + else len(self._ready_heap) + ) + if available_slots <= 0: + return + + # Collect jobs to launch in a dict, grouped per backend, for batched launching. 
+ to_launch: dict[str, list[tuple[Priority, JobRecord]]] = defaultdict(list) + blocked: list[tuple[Priority, str]] = [] + slots_used = 0 + + while self._ready_heap and slots_used < available_slots: + neg_priority, job_id = heapq.heappop(self._ready_heap) + job = self._jobs[job_id] + backend = self.backends[job.backend_key] + running_on_backend = self._running_per_backend[job.backend_key] + len( + to_launch[job.backend_key] + ) + + # Check that we can launch the job whilst respecting backend parallelism limits + if backend.max_parallelism and running_on_backend >= backend.max_parallelism: + blocked.append((neg_priority, job_id)) + continue + + to_launch[job.backend_key].append((neg_priority, job)) + slots_used += 1 + + # Requeue any blocked jobs. + for entry in blocked: + heapq.heappush(self._ready_heap, entry) + + # Launch the selected jobs in batches per backend + launch_tasks = [] + for backend_name, jobs in to_launch.items(): + backend = self.backends[backend_name] + job_specs = [job.spec for _, job in jobs] + log.verbose( + "[%s]: Dispatching jobs: %s", + backend_name, + ", ".join(job.full_name for job in job_specs), + ) + launch_tasks.append(backend.submit_many(job_specs)) + + results = await asyncio.gather(*launch_tasks) + + # Mark jobs running, and requeue any jobs that failed to launch + for jobs, handles in zip(to_launch.values(), results, strict=True): + for neg_priority, job in jobs: + handle = handles.get(job.spec.id) + if handle is None: + log.verbose("[%s]: Requeuing job '%s'", job.spec.target, job.spec.full_name) + heapq.heappush(self._ready_heap, (neg_priority, job.spec.id)) + continue + + job.handle = handle + self._mark_job_running(job) diff --git a/src/dvsim/scheduler/async_status_printer.py b/src/dvsim/scheduler/async_status_printer.py new file mode 100644 index 00000000..737bcfb3 --- /dev/null +++ b/src/dvsim/scheduler/async_status_printer.py @@ -0,0 +1,397 @@ +# Copyright lowRISC contributors (OpenTitan project). 
+# Licensed under the Apache License, Version 2.0, see LICENSE for details. +# SPDX-License-Identifier: Apache-2.0 + +"""Job status printing during a scheduled run.""" + +import asyncio +import os +import shutil +import sys +import termios +import time +from abc import ABC, abstractmethod +from collections import defaultdict +from collections.abc import Sequence +from typing import ClassVar + +import enlighten + +from dvsim.job.data import JobSpec +from dvsim.job.status import JobStatus +from dvsim.logging import log +from dvsim.utils import hms + + +class StatusPrinter(ABC): + """Status Printer abstract base class. + + Contains core functionality related to status printing - a print interval can be configured + to control how often the scheduler target statuses are printed, which is managed by an async + task. Optionally, the print interval can be configured to 0 to run in an update-driven mode + where every single status update is printed. Regardless of the configured print interval, the + final job update for each target is printed immediately to reflect final target end timings. + """ + + # How often we print by default. Zero means we should print on every event change. 
+ print_interval = 0 + + def __init__(self, jobs: Sequence[JobSpec], print_interval: int | None = None) -> None: + """Construct the base StatusPrinter.""" + # Mapping from target -> (Mapping from status -> count) + self._target_counts: dict[str, dict[JobStatus, int]] = defaultdict(lambda: defaultdict(int)) + # Mapping from target -> number of jobs + self._totals: dict[str, int] = defaultdict(int) + + for job in jobs: + self._target_counts[job.target][JobStatus.SCHEDULED] += 1 + self._totals[job.target] += 1 + + # The number of characters used to represent the largest field in the displayed table + self._field_width = max((len(str(total)) for total in self._totals.values()), default=0) + + # State tracking for the StatusPrinter + self._start_time: float = 0.0 + self._last_print: float = 0.0 + self._running: dict[str, list[str]] = defaultdict(list) + self._num_finished: dict[str, int] = defaultdict(int) + self._finish_time: dict[str, float] = {} + + # Async target status update handling + self._task: asyncio.Task | None = None + self._paused: bool = False + + self._interval = print_interval if print_interval is not None else self.print_interval + + @property + def updates_every_event(self) -> bool: + """If the configured print interval is 0, statuses are updated on every state change.""" + return self._interval <= 0 + + def start(self) -> None: + """Start printing the status of the scheduled jobs.""" + self._start_time = time.monotonic() + self._print_header() + for target in self._target_counts: + self._init_target(target, self._get_target_row(target)) + + # If we need an async task to manage the print interval, create one + if not self.updates_every_event: + self._task = asyncio.create_task(self._run()) + + async def _run(self) -> None: + """Run a timer in an async loop, printing the updated status at every interval.""" + next_tick = self._start_time + self._interval + self.update_all_targets(including_unstarted=True) + + while True: + now = time.monotonic() + 
sleep_time = max(0, next_tick - now) + await asyncio.sleep(sleep_time) + self.update_all_targets() + next_tick += self._interval + + def update_all_targets(self, *, including_unstarted: bool = False) -> None: + """Update the status bars of all targets.""" + if self._paused: + return + update_time = time.monotonic() + for target in self._target_counts: + # Only update targets that have started (some job status has changed) + if self.target_is_started(target) or including_unstarted: + target_update_time = self._finish_time.get(target, update_time) + self._update_target(target_update_time, target) + + def target_is_started(self, target: str) -> bool: + """Check whether a target has been started yet or not.""" + return bool(self._num_finished[target]) or bool(self._running[target]) + + def target_is_done(self, target: str) -> bool: + """Check whether a target is finished or not.""" + return self._num_finished[target] >= self._totals[target] + + def update_status(self, job: JobSpec, old_status: JobStatus, new_status: JobStatus) -> None: + """Update the status printer to reflect a change in job status.""" + status_counts = self._target_counts[job.target] + status_counts[old_status] -= 1 + if old_status == JobStatus.RUNNING: + self._running[job.target].remove(job.full_name) + status_counts[new_status] += 1 + if new_status == JobStatus.RUNNING: + self._running[job.target].append(job.full_name) + if not old_status.is_terminal and new_status.is_terminal: + self._num_finished[job.target] += 1 + + if self.target_is_done(job.target) and not self.updates_every_event: + # Even if we have a configured print interval, we should record + # the time at which the target finished to capture accurate end timing. 
+ self._finish_time[job.target] = time.monotonic() + elif self.updates_every_event: + self.update_all_targets() + + def _get_header(self) -> str: + """Get the header string to use for printing the status.""" + return ( + ", ".join( + f"{status.shorthand}: {status.name.lower().rjust(self._field_width)}" + for status in JobStatus + ) + + ", T: total" + ) + + def _get_target_row(self, target: str) -> str: + """Get a formatted string with the fields for a given target row.""" + fields = [] + for status in JobStatus: + count = self._target_counts[target][status] + value = f"{count:0{self._field_width}d}" + fields.append(f"{status.shorthand}: {value.rjust(len(status.name))}") + total = f"{self._totals[target]:0{self._field_width}d}" + fields.append(f"T: {total.rjust(5)}") + return ", ".join(fields) + + @abstractmethod + def _print_header(self) -> None: + """Initialize / print the header, displaying the legend of job status meanings.""" + + @abstractmethod + def _init_target(self, target: str, _msg: str) -> None: + """Initialize the status bar for a target.""" + + @abstractmethod + def _update_target(self, current_time: float, target: str) -> None: + """Update the status bar for a given target.""" + + def pause(self) -> None: + """Toggle whether the status printer is paused. 
        May make target finish times inaccurate."""
        self._paused = not self._paused
        # When resuming an event-driven printer, redraw immediately to catch up on
        # any state changes that accumulated while paused.
        if not self._paused and self.updates_every_event:
            self.update_all_targets()

    def stop(self) -> None:
        """Stop the status header/target printing (but keep the printer context)."""
        if self._task:
            self._task.cancel()
        # Unpause first so the final draw below is not skipped, then force one
        # last draw of every bar (even unstarted ones) so the rendered state is
        # complete when printing stops.
        if self._paused:
            self._paused = False
        self.update_all_targets(including_unstarted=True)

    def exit(self) -> None:  # noqa: B027
        """Do cleanup activities before exiting."""


class TtyStatusPrinter(StatusPrinter):
    """Prints the current scheduler target status onto the console / TTY via logging."""

    # "\x1b[1m" / "\x1b[0m" render the elapsed hh:mm:ss field in bold.
    hms_fmt = "\x1b[1m{hms:9s}\x1b[0m"
    header_fmt = hms_fmt + " [{target:^13s}]: [{msg}]"
    status_fmt = header_fmt + " {percent:3.0f}% {running}"

    def __init__(self, jobs: Sequence[JobSpec]) -> None:
        """Initialise the TtyStatusPrinter."""
        super().__init__(jobs)

        # Maintain a mapping of completed targets, so we only print the status one last
        # time when it reaches 100% for a target.
        self._target_done: dict[str, bool] = {}

    def _print_header(self) -> None:
        """Initialize / print the header, displaying the legend of job status meanings."""
        log.info(self.header_fmt.format(hms="", target="legend", msg=self._get_header()))

    def _init_target(self, target: str, _msg: str) -> None:
        """Initialize the status bar for a target."""
        self._target_done[target] = False

    def _trunc_running(self, running: str, width: int = 30) -> str:
        """Truncate the list of running items to a specified length.

        Args:
            running: comma-separated list of running job names.
            width: maximum length of the returned string.
        """
        if len(running) <= width:
            return running
        # Reserve three characters for the trailing ellipsis.
        return running[: width - 3] + "..."

    def _update_target(self, current_time: float, target: str) -> None:
        """Update the status bar for a given target."""
        # Once the final 100% line has been logged, print nothing further.
        if self._target_done[target]:
            return
        if self.target_is_done(target):
            self._target_done[target] = True

        status_counts = self._target_counts[target]
        done_count = sum(status_counts[status] for status in JobStatus if status.is_terminal)
        # A target with no jobs is reported as 100% rather than dividing by zero.
        percent = (done_count / self._totals[target] * 100) if self._totals[target] else 100

        log.info(
            self.status_fmt.format(
                hms=hms(current_time - self._start_time),
                target=target,
                msg=self._get_target_row(target),
                percent=percent,
                running=self._trunc_running(", ".join(self._running[target])),
            ),
        )


class EnlightenStatusPrinter(TtyStatusPrinter):
    """Prints the current scheduler target status to the terminal using Enlighten.

    Enlighten is a third party progress bar tool. Documentation:
    https://python-enlighten.readthedocs.io/en/stable/

    Enlighten does not work if the output of DVSim is redirected to a file, for
    example - it needs to be attached to a TTY enabled stream.
    """

    # Enlighten uses a min_delta of 0.1 by default, only updating every 0.1 seconds.
    DEFAULT_MIN_DELTA = 0.1

    # The running-jobs suffix is kept out of the main format so it can be
    # truncated to the live terminal width at render time (see _update_target).
    status_fmt_no_running = TtyStatusPrinter.status_fmt.removesuffix("{running}")
    status_fmt = "{status_msg}{running}"

    def __init__(self, jobs: Sequence[JobSpec]) -> None:
        """Initialise the EnlightenStatusPrinter."""
        super().__init__(jobs)
        if self._interval < self.DEFAULT_MIN_DELTA:
            # TODO: maybe "debounce" the updates with a delayed async refresh task?
            log.warning(
                "Configured print interval %g will not accurately reflect for %s,"
                " which uses status bars with a configured min_delta of %g by default.",
                self._interval,
                self.__class__.__name__,
                self.DEFAULT_MIN_DELTA,
            )

        # Initialize the enlighten manager and needed state
        self._manager = enlighten.get_manager()
        self._status_header: enlighten.StatusBar | None = None
        self._status_bars: dict[str, enlighten.StatusBar] = {}
        # Guards against closing the status bars twice (stop() then exit()).
        self._stopped = False

    def _print_header(self) -> None:
        """Initialize / print the header, displaying the legend of job status meanings."""
        self._status_header = self._manager.status_bar(
            status_format=self.header_fmt,
            hms="",
            target="legend",
            msg=self._get_header(),
        )

    def _init_target(self, target: str, msg: str) -> None:
        """Initialize the status bar for a target."""
        super()._init_target(target, msg)
        # Pre-render the static part of the row at zero elapsed time / 0% complete.
        msg = self.status_fmt_no_running.format(hms=hms(0), target=target, msg=msg, percent=0.0)
        self._status_bars[target] = self._manager.status_bar(
            status_format=self.status_fmt,
            status_msg=msg,
            running="",
        )

    def _trunc_running_to_terminal(self, running: str, offset: int) -> str:
        """Truncate the list of running items to match the max terminal width.

        Args:
            running: comma-separated list of running job names.
            offset: number of columns already consumed by the status message.
        """
        cols = shutil.get_terminal_size(fallback=(80, 24)).columns
        # Keep at least 30 columns for the running list; -1 leaves a one-column margin.
        width = max(30, cols - offset - 1)
        return self._trunc_running(running, width)

    def _update_target(self, current_time: float, target: str) -> None:
        """Update the status bar for a given target."""
        if self._target_done[target]:
            return

        status_counts = self._target_counts[target]
        done_count = sum(status_counts[status] for status in JobStatus if status.is_terminal)
        # A target with no jobs is reported as 100% rather than dividing by zero.
        percent = (done_count / self._totals[target] * 100) if self._totals[target] else 100

        status_msg = self.status_fmt_no_running.format(
            hms=hms(current_time - self._start_time),
            target=target,
            msg=self._get_target_row(target),
            percent=percent,
        )
        offset = len(status_msg)
        running = self._trunc_running_to_terminal(", ".join(self._running[target]), offset)

        self._status_bars[target].update(status_msg=status_msg, running=running)

        if self.target_is_done(target):
            self._target_done[target] = True
            # Force a final redraw so the 100% state is shown even if Enlighten's
            # min_delta rate limit would otherwise suppress this update.
            self._status_bars[target].refresh()

    def stop(self) -> None:
        """Stop the status header/target printing (but keep the printer context)."""
        super().stop()
        if self._status_header is not None:
            self._status_header.close()
        for status_bar in self._status_bars.values():
            status_bar.close()
        self._stopped = True

    def exit(self) -> None:
        """Do cleanup activities before exiting (closing the manager context)."""
        super().exit()
        if not self._stopped:
            self.stop()
        self._manager.stop()

        # Sometimes, exiting via a signal (e.g. Ctrl-C) can cause Enlighten to leave the
        # terminal in some non-raw mode. Just in case, restore regular operation.
        self._restore_terminal()

    def _restore_terminal(self) -> None:
        """Restore regular terminal operation after using Enlighten."""
        # Try open /dev/tty, otherwise fallback to sys.stdin
        try:
            fd = os.open("/dev/tty", os.O_RDWR)
            close_fd = True
        except (OSError, termios.error):
            fd = sys.stdin.fileno()
            close_fd = False

        # By default, the terminal should echo input (ECHO) and run in canonical mode (ICANON).
        # We make this change after all buffered output is transmitted (TCSADRAIN).
+ try: + attrs = termios.tcgetattr(fd) + attrs[3] |= termios.ECHO | termios.ICANON + termios.tcsetattr(fd, termios.TCSADRAIN, attrs) + except termios.error: + log.debug("Unable to restore terminal attributes safely") + + if close_fd: + os.close(fd) + + +class StatusPrinterSingleton: + """Singleton for the status printer to uniquely refer to 1 instance at a time.""" + + _instance: ClassVar[StatusPrinter | None] = None + + @classmethod + def set(cls, instance: StatusPrinter | None) -> None: + """Set the stored status printer.""" + cls._instance = instance + + @classmethod + def get(cls) -> StatusPrinter | None: + """Get the stored status printer (if it exists).""" + return cls._instance + + +def create_status_printer(jobs: Sequence[JobSpec]) -> StatusPrinter: + """Create the global status printer. + + If stdout is a TTY, then return an instance of EnlightenStatusPrinter, else + return an instance of StatusPrinter. + """ + status_printer = StatusPrinterSingleton.get() + if status_printer is not None: + return status_printer + + status_printer = EnlightenStatusPrinter(jobs) if sys.stdout.isatty() else TtyStatusPrinter(jobs) + StatusPrinterSingleton.set(status_printer) + return status_printer + + +def get_status_printer() -> StatusPrinter: + """Retrieve the configured global status printer.""" + status_printer = StatusPrinterSingleton.get() + if status_printer is None: + raise RuntimeError("get_status_printer called without first creating the status printer") + return status_printer diff --git a/src/dvsim/scheduler/log_manager.py b/src/dvsim/scheduler/log_manager.py new file mode 100644 index 00000000..023eb788 --- /dev/null +++ b/src/dvsim/scheduler/log_manager.py @@ -0,0 +1,50 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. 
# SPDX-License-Identifier: Apache-2.0

"""DVSim Scratch Job Log Manager."""

from pathlib import Path

from dvsim.job.data import JobSpec
from dvsim.job.status import JobStatus
from dvsim.utils import mk_symlink, rm_path


class LogManager:
    """Observes job state changes in the scheduler and manages scratch output directory links."""

    def __init__(self) -> None:
        """Construct a LogManager."""
        # Mapping from job ID -> last symlinked status
        self._links: dict[str, JobStatus] = {}

    def _link_job_output_directory(self, job: JobSpec, status: JobStatus) -> None:
        """Symbolic (soft) link the job's output directory based on its status.

        The status directories (e.g. `passed/`, `failed/`) in the scratch area then provide a
        quick mechanism for traversing the list of jobs that were executed.

        Args:
            job: the job whose output directory should be (re)linked.
            status: the job's new status, used to select the destination directory.
        """
        old_status = self._links.get(job.id)
        if old_status == status:
            # Already linked under this status; nothing to do.
            return

        link_dest = Path(job.links[status], job.qual_name)
        self._links[job.id] = status

        # If a symlink already exists (e.g. created by the legacy launcher), keep it
        # rather than re-creating it. Use is_symlink() (not exists()) so a dangling
        # legacy link is still detected: exists() follows the link target and would
        # report False for a dangling symlink.
        # TODO: when all launchers are migrated this check can be removed.
        if not link_dest.is_symlink():
            mk_symlink(path=job.odir, link=link_dest)

        # Always delete the symlink for the previous status (if any), even when the
        # new link already existed, so stale links (e.g. a finished job still listed
        # under `running/`) are not left behind.
        if old_status is not None:
            rm_path(Path(job.links[old_status], job.qual_name))

    def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None:
        """Notify the LogManager when a job status has changed.

        Args:
            job: the job whose status changed.
            status: the job's new status.
        """
        # Only create linked output directories for defined links (terminal state or running).
        if status in job.links:
            self._link_job_output_directory(job, status)
+# SPDX-License-Identifier: Apache-2.0 + +"""Time-based utilities.""" + +# Timestamp format when creating directory backups. +TS_FORMAT = "%Y%m%d_%H%M%S" + +# Timestamp format when generating reports. +TS_FORMAT_LONG = "%A %B %d %Y %H:%M:%S UTC" + + +def hms(seconds: float) -> str: + """Render a duration (in seconds) in the hh:mm:ss format, rounded to the nearest second.""" + total = round(seconds) + hours, mins = divmod(total, 3600) + mins //= 60 + secs = total % 60 + return f"{hours:02d}:{mins:02d}:{secs:02d}" diff --git a/src/dvsim/utils/timer.py b/src/dvsim/utils/timer.py index 7db5fcfd..b0c55f08 100644 --- a/src/dvsim/utils/timer.py +++ b/src/dvsim/utils/timer.py @@ -4,6 +4,8 @@ import time +from dvsim.utils.time import hms + class Timer: """A timer to keep track of how long jobs have been running. @@ -26,11 +28,7 @@ def period(self): def hms(self) -> str: """Get the time since start in hh:mm:ss.""" - period = self.period() - secs = int(period + 0.5) - mins = secs // 60 - hours = mins // 60 - return f"{hours:02}:{mins % 60:02}:{secs % 60:02}" + return hms(self.period()) def check_time(self) -> bool: """Return true if we have passed next_print. diff --git a/tests/job/test_status.py b/tests/job/test_status.py new file mode 100644 index 00000000..16ff28b2 --- /dev/null +++ b/tests/job/test_status.py @@ -0,0 +1,19 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. 
+# SPDX-License-Identifier: Apache-2.0 + +"""Test Job (scheduler) status modelling.""" + +from hamcrest import assert_that, equal_to + +from dvsim.job.status import JobStatus + + +class TestJobStatus: + """Test scheduler JobStatus models.""" + + @staticmethod + def test_unique_shorthands() -> None: + """Test that all scheduler job statuses have unique shorthand representations.""" + shorthands = [status.shorthand for status in JobStatus] + assert_that(len(set(shorthands)), equal_to(len(shorthands))) diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 042357ed..62e13de8 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -312,6 +312,7 @@ def job_spec_factory( "name": "test_job", "job_type": "mock_type", "target": "mock_target", + "backend": None, "seed": None, "dependencies": [], "needs_all_dependencies_passing": True,