From aef4e09e2492fa3d926fdecd7e6a2f587d0992d3 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Thu, 26 Mar 2026 13:58:43 +0000 Subject: [PATCH 01/15] feat: port core launcher base functionality to the `RuntimeBackend` base This commit fleshes out the abstract `RuntimeBackend` base class with a lot of core functionality that will be needed to implement new runtime backends (which aren't just the legacy launcher adapter). These mostly take the form of protected methods optionally called by backends, comprised of logic on the `Launcher` base class or that was previously duplicated across its various subclasses. Some key changes to note from the launchers: - A new `DVSIM_RUN_INTERACTIVE` env var is introduced intended to replace the `RUN_INTERACTIVE` env var long term, to avoid potential name collision. - Errors are raised if an interactive job tries to run on a backend that doesn't support running jobs interactively. - Log parsing functionality is extracted to a separate object; logs are always lazily loaded so that for jobs that don't need them (passing jobs without any fail or pass patterns), we don't waste time. - Efficiency of the log contents pass/fail regex pattern parsing is improved. Fail patterns are combined into a single regex check, and all regexes are compiled once instead of per-line. Signed-off-by: Alex Jones --- src/dvsim/runtime/backend.py | 261 ++++++++++++++++++++++++++++++++++- 1 file changed, 259 insertions(+), 2 deletions(-) diff --git a/src/dvsim/runtime/backend.py b/src/dvsim/runtime/backend.py index 8c62e279..cba5edcc 100644 --- a/src/dvsim/runtime/backend.py +++ b/src/dvsim/runtime/backend.py @@ -4,17 +4,46 @@ """Runtime backend abstract base class.""" +import os +import re from abc import ABC, abstractmethod -from collections.abc import Hashable, Iterable +from collections.abc import Hashable, Iterable, Sequence +from pathlib import Path -from dvsim.job.data import JobSpec +from dvsim.job.data import JobSpec, JobStatusInfo from dvsim.job.status import JobStatus +from dvsim.job.time import JobTime from dvsim.logging import log from dvsim.runtime.data import ( CompletionCallback, JobCompletionEvent, JobHandle, ) +from dvsim.tool.utils import get_sim_tool_plugin +from dvsim.utils import clean_odirs + +__all__ = ("RuntimeBackend",) + +# A list of magic flags that are currently cleared. +# TODO: it would be good to find a nicer solution for this - perhaps a common configuration +# could just re-export it or define that it should not exist? Or it could be in a DVSim config. +MAGIC_VARS_TO_CLEAR = { + # This variable is used by recursive Make calls to pass variables from one level to the next. + # Even if our command here invokes Make, it should logically be a top-level invocation. We + # don't want to pollute the flow with Make variables from any wrapper that called DVSim. + "MAKEFLAGS", +} + +# Relative paths to files created in job output directories +ENV_DUMP_PATH = "env_vars" + + +# The number of lines to give as context when a failure pattern is parsed from a log file. +NUM_LOG_FAIL_CONTEXT_LINES = 4 +# The number of lines to give as context when pass patterns are missing from a log file. +NUM_LOG_PASS_CONTEXT_LINES = 10 +# The number of lines to give as context when a non-zero exit code is returned. +NUM_RETCODE_CONTEXT_LINES = 10 class RuntimeBackend(ABC): @@ -66,6 +95,9 @@ async def _emit_completion(self, events: Iterable[JobCompletionEvent]) -> None: raise RuntimeError("Backend not attached to the scheduler") for event in events: + # TODO: aim to refactor to remove these callbacks + event.spec.post_finish(event.status) + log.debug( "Job %s completed execution: %s", event.spec.qual_name, event.status.shorthand ) @@ -112,3 +144,228 @@ async def close(self) -> None: # noqa: B027 The default implementation just does nothing. """ + + def _build_job_env( + self, + job: JobSpec, + backend_env: dict[str, str] | None = None, + remove: Iterable[str] | None = None, + ) -> dict[str, str]: + """Build job environment configuration for a given job. + + Arguments: + job: The job specification to get the environment from. + context: The job execution context for this backend. + backend_env: Any backend-specific environment overrides to use. Defaults to None. + Takes precedence over the base OS environment, but is overridden by the job itself. + remove: A list of variables to remove from the final environment variable list. + Defaults to None. + + Returns the job environment as a mapping of env var names to values. + + """ + # Take the existing environment variables and update with any exports defined on the spec. + # TODO: consider adding some `--clean-env` CLI arg & flag to only use `job.exports` instead + # of also inheriting from `os.environ`? + env = dict(os.environ) + if backend_env: + env.update(backend_env) + env.update(job.exports) + + # If the job is set to run in "interactive" mode, we set the `RUN_INTERACTIVE` environment + # variable to 1, and also make a note in the environment. + if job.interactive: + env["DVSIM_RUN_INTERACTIVE"] = "1" + # TODO: Legacy environment variable not prefixed with `DVSIM` - deprecate this. + env["RUN_INTERACTIVE"] = "1" + + # Clear any magic flags or `remove` entries from the environment variable export list + for key in remove or (): + env.pop(key, None) + for magic_var in MAGIC_VARS_TO_CLEAR: + env.pop(magic_var, None) + + # Dump the environment variables to their own file to make debugging easier. + if job.odir and job.odir.exists(): + dump = job.odir / ENV_DUMP_PATH + with dump.open("w", encoding="utf-8", errors="surrogateescape") as f: + f.writelines(f"{key}={value}\n" for key, value in sorted(env.items())) + + return env + + def _make_job_output_directory(self, job: JobSpec) -> None: + """Create the output directory for a job. + + Depending on the configured `renew_odir` setting, this will optionally clean or maintain + a list of previous output directories for this job. + + """ + if job.renew_odir: + clean_odirs(odir=job.odir, max_odirs=self.max_output_dirs) + + Path(job.odir).mkdir(exist_ok=True, parents=True) + + def _prepare_launch(self, job: JobSpec) -> None: + """Do any pre-launch activities, preparing the environment. + + This may include clearing old runs, creating the output directory, etc. + """ + if job.interactive and not self.supports_interactive: + msg = f"Interactive jobs are not supported by the '{self.name}' backend." + raise RuntimeError(msg) + + job.pre_launch() + self._make_job_output_directory(job) + + def _finish_job( + self, handle: JobHandle, exit_code: int, runtime: float | None + ) -> tuple[JobStatus, JobStatusInfo | None]: + """Determine the outcome of a job that ran to completion, and parse extra log info. + + Updates the handle with any extracted job runtime & simulation time info. + """ + if handle.spec.dry_run: + return JobStatus.PASSED, None + + log_results = LogResults(handle.spec) + + # Update time information on the handle. + job_runtime, simulated_time = log_results.get_runtime_from_logs() + if job_runtime is None: + log.warning("%s: Using dvsim-maintained job_runtime instead.", handle.spec.full_name) + if runtime is not None: + handle.job_runtime.set(runtime, "s") + else: + handle.job_runtime.set(*job_runtime.get()) + if simulated_time is not None: + handle.simulated_time.set(*simulated_time.get()) + + # Determine the final status from the logs and exit code. + status, reason = log_results.get_status_from_logs() + if status is not None: + return status, reason + if exit_code != 0: + lines = log_results.get_lines() + return JobStatus.FAILED, JobStatusInfo( + message=f"Job returned a non-zero exit code: {exit_code}", + context=lines[-NUM_RETCODE_CONTEXT_LINES:], + ) + return JobStatus.PASSED, None + + +class LogResults: + """Wrapper for log result parsing which lazily loads the contents of the job log file.""" + + def __init__(self, job: JobSpec) -> None: + """Construct a LogResults object. Does not load the log file until needed.""" + self.spec = job + self._parsed = False + self._lines: list[str] | None = None + self._err_status: tuple[JobStatus, JobStatusInfo] | None = None + + def _ensure_log_parsed(self) -> None: + """Parse the log file into its lines if not already parsed.""" + if self._parsed: + return + + try: + with self.spec.log_path.open(encoding="utf-8", errors="surrogateescape") as f: + self._lines = f.readlines() + except OSError as e: + log.debug( + "%s: Error reading job log file %s: %s", + self.spec.full_name, + str(self.spec.log_path), + str(e), + ) + self._err_status = ( + JobStatus.FAILED, + JobStatusInfo(message=f"Error opening file {self.spec.log_path}:\n{e}"), + ) + finally: + self._parsed = True + + def get_lines(self) -> Sequence[str]: + """Get the sequence of lines in the log results, or an empty sequence if failed parsing.""" + self._ensure_log_parsed() + return () if self._lines is None else self._lines + + def get_status_from_logs(self) -> tuple[JobStatus | None, JobStatusInfo | None]: + """Determine the outcome of a completed job from its log file.""" + # Check we actually need to use the logs before loading them + use_log_check_strategy = bool(self.spec.fail_patterns) or bool(self.spec.pass_patterns) + if not use_log_check_strategy: + return None, None + + lines = self.get_lines() + if self._err_status: + return self._err_status + + fail_regex = None + if self.spec.fail_patterns: + fail_regex = re.compile("|".join(f"(?:{p})" for p in self.spec.fail_patterns)) + pass_regexes = {re.compile(pattern) for pattern in self.spec.pass_patterns} + + # TODO: does this need to be restricted to per-line patterns? It would complicate line + # number parsing, but it might be useful to make this more expressive? + for lineno, line in enumerate(lines, start=1): + # If the job matches ANY fail pattern, it fails. Provide some extra lines for context. + if fail_regex and fail_regex.search(line): + end = lineno + NUM_LOG_FAIL_CONTEXT_LINES + return JobStatus.FAILED, JobStatusInfo( + message=line.strip(), lines=[lineno], context=lines[lineno:end] + ) + + # The job must match ALL pass patterns to succeed. + matched = {regex for regex in pass_regexes if regex.search(line)} + pass_regexes -= matched + + if not pass_regexes and not fail_regex: + break # Early exit if possible + + if pass_regexes: + pass_patterns = [regex.pattern for regex in pass_regexes] + return JobStatus.FAILED, JobStatusInfo( + message=f"Some pass patterns missing: {pass_patterns}", + context=lines[-NUM_LOG_PASS_CONTEXT_LINES:], + ) + + return None, None + + def get_runtime_from_logs(self) -> tuple[JobTime | None, JobTime | None]: + """Try to determine a job's runtime from its log file, using specified extensions.""" + # TODO: rather than check the job type here, in the future the sim tool plugin should + # define the job types it supports. Even longer term, perhaps the job time and sim time + # should not be defined on the JobHandle/CompletedJobStatus and should be directly parsed + # out of the resulting log artifacts by the respective flows. + sim_job_types = ["CompileSim", "RunTest", "CovUnr", "CovMerge", "CovReport", "CovAnalyze"] + supports_log_info_ext = self.spec.job_type in sim_job_types + if not supports_log_info_ext: + return None, None + + lines = self.get_lines() + if self._err_status: + return None, None + + try: + plugin = get_sim_tool_plugin(tool=self.spec.tool.name) + except NotImplementedError as e: + log.error("%s: %s", self.spec.full_name, str(e)) + return None, None + + runtime = None + try: + time, unit = plugin.get_job_runtime(log_text=lines) + runtime = JobTime(time, unit) + except RuntimeError as e: + log.warning("%s: %s", self.spec.full_name, str(e)) + + simulated_time = None + if self.spec.job_type == "RunTest": + try: + time, unit = plugin.get_simulated_time(log_text=lines) + simulated_time = JobTime(time, unit) + except RuntimeError as e: + log.debug("%s: %s", self.spec.full_name, str(e)) + + return runtime, simulated_time From f6aa10246a5ebb84a59654bf04c01a390dc48acc Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Thu, 26 Mar 2026 14:40:46 +0000 Subject: [PATCH 02/15] feat: add `LocalRuntimeBackend` backend This is the async `RuntimeBackend` replacement of the `LocalLauncher`, which will eventually by removed in lieu of this new backend. Some behavioural differences to note: - We now try to await() after a SIGKILL to be sure the process ended, bounded by a short timeout in case blocked at the kernel level. - We now use psutil to enumerate and kill descendent processes in addition to the created subprocess. This won't catch orphaned processes (needs e.g. cgroups), but should cover most sane usage. - The backend does _not_ link the output directories based on status (the `JobSpec.links`, e.g. "passing/", "failed/", "killed/"). The intention is that this detail is not core functionality for either the scheduler or the backends - instead, it will be implemented as an observer on the new async scheduler callbacks when introduced. By using async subprocesses and launching/killing jobs in batch, we are able to more efficiently launch jobs in parallel via async coroutines. We likewise avoid the ned to poll jobs - instead we have an async task awaiting the subprocess' completion, which we then forward to notify the (to be added) scheduler of the job's completion. Note that interactive jobs are still basically handled synchronously as before - assumed that there is only 1 interactive job running at a time. Signed-off-by: Alex Jones --- src/dvsim/runtime/local.py | 334 +++++++++++++++++++++++++++++++++++++ 1 file changed, 334 insertions(+) create mode 100644 src/dvsim/runtime/local.py diff --git a/src/dvsim/runtime/local.py b/src/dvsim/runtime/local.py new file mode 100644 index 00000000..71c98798 --- /dev/null +++ b/src/dvsim/runtime/local.py @@ -0,0 +1,334 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. +# SPDX-License-Identifier: Apache-2.0 + +"""Legacy launcher adapter interface for the new async scheduler design.""" + +import asyncio +import contextlib +import shlex +import signal +import subprocess +import time +from collections.abc import Hashable, Iterable +from dataclasses import dataclass +from typing import TextIO + +import psutil + +from dvsim.job.data import JobSpec, JobStatusInfo +from dvsim.job.status import JobStatus +from dvsim.job.time import JobTime +from dvsim.logging import log +from dvsim.runtime.backend import RuntimeBackend +from dvsim.runtime.data import JobCompletionEvent, JobHandle + + +@dataclass(kw_only=True) +class LocalJobHandle(JobHandle): + """Job handle for a job belonging to a legacy launcher adapter runtime backend.""" + + process: asyncio.subprocess.Process | None + log_file: TextIO | None + start_time: float + kill_requested: bool = False + + +class LocalRuntimeBackend(RuntimeBackend): + """Launch jobs as subprocesses on the user's local machine.""" + + name = "local" + supports_interactive = True + + DEFAULT_SIGTERM_TIMEOUT = 2.0 # in seconds + DEFAULT_SIGKILL_TIMEOUT = 2.0 # in seconds + + def __init__( + self, + *, + max_parallelism: int | None = None, + sigterm_timeout: float | None = None, + sigkill_timeout: float | None = None, + ) -> None: + """Construct a local runtime backend. + + Args: + max_parallelism: The maximum number of jobs that can be dispatched to this backend + at once. `0` means no limit, `None` means no override is applied to the default. + sigterm_timeout: The time to wait for a process to die after a SIGTERM when killing + it, before sending SIGKILL. + sigkill_timeout: The time to wait for a process to die after a SIGKILL when killing + it, before giving up (so the scheduler can progress). + + """ + super().__init__(max_parallelism=max_parallelism) + self.sigterm_timeout = ( + sigterm_timeout if sigterm_timeout is not None else self.DEFAULT_SIGTERM_TIMEOUT + ) + self.sigkill_timeout = ( + sigkill_timeout if sigkill_timeout is not None else self.DEFAULT_SIGKILL_TIMEOUT + ) + + # Retain references to created asyncio tasks so they don't get GC'd. + self._tasks: set[asyncio.Task] = set() + + async def _log_from_pipe( + self, handle: LocalJobHandle, stream: asyncio.StreamReader | None + ) -> None: + """Write piped asyncio subprocess stream contents to a job's log file.""" + if stream is None or not handle.log_file: + return + try: + async for line in stream: + decoded = line.decode("utf-8", errors="surrogateescape") + handle.log_file.write(decoded) + handle.log_file.flush() + except asyncio.CancelledError: + pass + + async def _monitor_job(self, handle: LocalJobHandle) -> None: + """Wait for subprocess completion and emit a completion event.""" + if handle.process is None: + return + + if handle.log_file: + handle.log_file.write(f"[Executing]:\n{handle.spec.cmd}\n\n") + handle.log_file.flush() + + reader_tasks = [ + asyncio.create_task(self._log_from_pipe(handle, handle.process.stdout)), + asyncio.create_task(self._log_from_pipe(handle, handle.process.stderr)), + ] + status = JobStatus.KILLED + reason = None + + try: + exit_code = await asyncio.wait_for( + handle.process.wait(), timeout=handle.spec.timeout_secs + ) + runtime = time.monotonic() - handle.start_time + status, reason = self._finish_job(handle, exit_code, runtime) + except asyncio.TimeoutError: + await self._kill_job(handle) + status = JobStatus.KILLED + timeout_message = f"Job timed out after {handle.spec.timeout_mins} minutes" + reason = JobStatusInfo(message=timeout_message) + finally: + # Explicitly cancel reader tasks and wait for them to finish before closing the log + # file. We first give them a second to finish naturally to reduce log loss. + with contextlib.suppress(asyncio.TimeoutError): + await asyncio.wait(reader_tasks, timeout=1) + for task in reader_tasks: + if not task.done(): + task.cancel() + await asyncio.gather(*reader_tasks, return_exceptions=True) + + if handle.log_file: + handle.log_file.close() + if handle.kill_requested: + status = JobStatus.KILLED + reason = JobStatusInfo(message="Job killed!") + await self._emit_completion([JobCompletionEvent(handle.spec, status, reason)]) + + def _launch_interactive_job( + self, + job: JobSpec, + log_file: TextIO | None, + env: dict[str, str], + ) -> tuple[LocalJobHandle, JobCompletionEvent | None]: + """Launch a job in interactive mode with transparent stdin and stdout.""" + start_time = time.monotonic() + exit_code = None + completion = None + + if log_file is not None: + try: + proc = subprocess.Popen( + shlex.split(job.cmd), + # Transparent stdin/stdout, stdout & stderr muxed and tee'd via the pipe. + stdin=None, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + universal_newlines=True, + env=env, + ) + if proc.stdout is not None: + for line in proc.stdout: + print(line, end="") # noqa: T201 + log_file.write(line) + log_file.flush() + + exit_code = proc.wait() + except subprocess.SubprocessError as e: + log_file.close() + log.exception("Error launching job subprocess: %s", job.full_name) + reason = JobStatusInfo(message=f"Failed to launch job: {e}") + completion = JobCompletionEvent(job, JobStatus.KILLED, reason) + + runtime = time.monotonic() - start_time + handle = LocalJobHandle( + spec=job, + backend=self.name, + job_runtime=JobTime(), + simulated_time=JobTime(), + process=None, + log_file=log_file, + start_time=start_time, + ) + + if exit_code is not None: + status, reason = self._finish_job(handle, exit_code, runtime) + completion = JobCompletionEvent(job, status, reason) + + return handle, completion + + async def _launch_job( + self, + job: JobSpec, + log_file: TextIO | None, + env: dict[str, str], + ) -> tuple[LocalJobHandle | None, JobCompletionEvent | None]: + """Launch a job (in non-interactive mode) as an async subprocess.""" + proc = None + completion = None + if log_file is not None: + try: + proc = await asyncio.create_subprocess_exec( + *shlex.split(job.cmd), + # TODO: currently we mux the stdout and stderr streams by default. It would be + # useful to make this behaviour optional on some global `IoPolicy`. + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=env, + ) + except BlockingIOError: + # Skip this job for now; the scheduler should re-try to launch it later. + log_file.close() + return None, None + except subprocess.SubprocessError as e: + log_file.close() + log.exception("Error launching job subprocess: %s", job.full_name) + reason = JobStatusInfo(message=f"Failed to launch job: {e}") + completion = JobCompletionEvent(job, JobStatus.KILLED, reason) + + handle = LocalJobHandle( + spec=job, + backend=self.name, + job_runtime=JobTime(), + simulated_time=JobTime(), + process=proc, + log_file=log_file, + start_time=time.monotonic(), + ) + + # Create a task to asynchronously monitor the launched subprocess. + # We must store a reference in self._tasks to ensure the task is not GC'd. + if proc is not None: + task = asyncio.create_task(self._monitor_job(handle)) + self._tasks.add(task) + task.add_done_callback(self._tasks.discard) + + return handle, completion + + async def submit_many(self, jobs: Iterable[JobSpec]) -> dict[Hashable, JobHandle]: + """Submit & launch multiple jobs. + + Returns: + mapping from job.id -> JobHandle. Entries are only present for jobs that successfully + launched; jobs that failed in a non-fatal way are missing, and should be retried. + + """ + completions: list[JobCompletionEvent] = [] + handles: dict[Hashable, JobHandle] = {} + + for job in jobs: + env = self._build_job_env(job) + self._prepare_launch(job) + + log_file = None + try: + log_file = job.log_path.open("w", encoding="utf-8", errors="surrogateescape") + except BlockingIOError: + continue # Skip this job for now; the scheduler should re-try to launch it later. + except OSError as e: + log.exception("Error writing to job log file: %s", job.full_name) + reason = JobStatusInfo(message=f"Failed to launch job: {e}") + completions.append(JobCompletionEvent(job, JobStatus.KILLED, reason)) + + if job.interactive: + handle, completion = self._launch_interactive_job(job, log_file, env) + else: + handle, completion = await self._launch_job(job, log_file, env) + if completion is not None: + completions.append(completion) + if handle is not None: + handles[job.id] = handle + + if completions: + await self._emit_completion(completions) + + return handles + + def _send_kill_signal(self, proc: asyncio.subprocess.Process, signal_num: int) -> None: + """Send a (kill) signal to a process and all its descendent processes.""" + # TODO: maybe this should use cgroups in the future to be thorough? + for child in psutil.Process(proc.pid).children(recursive=True): + child.send_signal(signal_num) + proc.send_signal(signal_num) + + async def _kill_job(self, handle: LocalJobHandle) -> None: + """Kill the running local process, sending SIGTERM and then SIGKILL if that didn't work.""" + proc = handle.process + if proc is None: + return + + if proc.returncode is None: + handle.kill_requested = True + try: + self._send_kill_signal(proc, signal.SIGTERM) + except ProcessLookupError: + return + + try: + await asyncio.wait_for(proc.wait(), timeout=self.sigterm_timeout) + except asyncio.TimeoutError: + pass + else: + return + + if proc.returncode is None: + log.warning( + "Job '%s' was not killed with SIGTERM after %g seconds, sending SIGKILL.", + handle.spec.full_name, + self.sigterm_timeout, + ) + try: + self._send_kill_signal(proc, signal.SIGKILL) + except ProcessLookupError: + return + + try: + await asyncio.wait_for(proc.wait(), timeout=self.sigkill_timeout) + except asyncio.TimeoutError: + # proc.wait() completes only when the kernel reaps the process. If we sent SIGKILL + # and did not see this happen for a bit, the process is probably blocked in the + # kernel somewhere (e.g. NFS hang, slow or dead disk I/O). + log.error( + "Job '%s' was not killed with SIGKILL after %g seconds, so give up on it.", + handle.spec.full_name, + self.sigkill_timeout, + ) + + async def kill_many(self, handles: Iterable[JobHandle]) -> None: + """Cancel ongoing jobs via their handle. Killed jobs should still "complete".""" + tasks = [] + for handle in handles: + if not isinstance(handle, LocalJobHandle): + msg = f"Local backend expected handle of type LocalJobHandle, not `{type(handle)}`." + raise TypeError(msg) + if handle.process and not handle.kill_requested and handle.process.returncode is None: + tasks.append(asyncio.create_task(self._kill_job(handle))) + + if tasks: + # Wait for all job subprocesses to be killed; `_monitor_job` handles the completions. + await asyncio.gather(*tasks) From e459a98f7506f3bb2a2015d4d8c05b1156a8eb5e Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 27 Mar 2026 01:40:58 +0000 Subject: [PATCH 03/15] feat: introduce a new `SCHEDULED` job status See the explanatory comments added to JobStatus. The intention is that the new async scheduler will distinguish between jobs that are blocked due to unfinished dependencies (`SCHEDULED`), and those that are pending because there is no availability to run them, despite their dependencies being fulfilled (`QUEUED`). This new state is currently unused. Also add a short test to prevent potential future bugs from status shorthand name collisions. Signed-off-by: Alex Jones --- src/dvsim/instrumentation/resources.py | 2 +- src/dvsim/instrumentation/timing.py | 2 +- src/dvsim/job/status.py | 13 ++++++++----- tests/job/test_status.py | 19 +++++++++++++++++++ 4 files changed, 29 insertions(+), 7 deletions(-) create mode 100644 tests/job/test_status.py diff --git a/src/dvsim/instrumentation/resources.py b/src/dvsim/instrumentation/resources.py index d02ff82c..8b9505cb 100644 --- a/src/dvsim/instrumentation/resources.py +++ b/src/dvsim/instrumentation/resources.py @@ -227,7 +227,7 @@ def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None: with self._lock: running = job_id in self._running_jobs started = running or job_id in self._finished_jobs - if not started and status != JobStatus.QUEUED: + if not started and status not in (JobStatus.SCHEDULED, JobStatus.QUEUED): self._running_jobs[job_id] = JobResourceAggregate(job) running = True if running and status.is_terminal: diff --git a/src/dvsim/instrumentation/timing.py b/src/dvsim/instrumentation/timing.py index d0c1192b..9766c34a 100644 --- a/src/dvsim/instrumentation/timing.py +++ b/src/dvsim/instrumentation/timing.py @@ -99,7 +99,7 @@ def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None: job_info = TimingJobFragment(job) self._jobs[job_id] = job_info - if job_info.start_time is None and status != JobStatus.QUEUED: + if job_info.start_time is None and status not in (JobStatus.SCHEDULED, JobStatus.QUEUED): job_info.start_time = time.perf_counter() if status.is_terminal: job_info.end_time = time.perf_counter() diff --git a/src/dvsim/job/status.py b/src/dvsim/job/status.py index e409a155..7076caae 100644 --- a/src/dvsim/job/status.py +++ b/src/dvsim/job/status.py @@ -12,11 +12,14 @@ class JobStatus(Enum): """Status of a Job.""" - QUEUED = auto() - RUNNING = auto() - PASSED = auto() - FAILED = auto() - KILLED = auto() + # SCHEDULED is currently unused in the old sync scheduler, there `SCHEDULED` and `QUEUED` + # are combined under `QUEUED`. It is intended to be used in the new async scheduler. + SCHEDULED = auto() # Waiting for dependencies + QUEUED = auto() # Dependencies satisfied, waiting to be dispatched + RUNNING = auto() # Dispatched to a backend and actively executing + PASSED = auto() # Completed successfully + FAILED = auto() # Completed with failure + KILLED = auto() # Forcibly terminated or never executed @property def shorthand(self) -> str: diff --git a/tests/job/test_status.py b/tests/job/test_status.py new file mode 100644 index 00000000..16ff28b2 --- /dev/null +++ b/tests/job/test_status.py @@ -0,0 +1,19 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. +# SPDX-License-Identifier: Apache-2.0 + +"""Test Job (scheduler) status modelling.""" + +from hamcrest import assert_that, equal_to + +from dvsim.job.status import JobStatus + + +class TestJobStatus: + """Test scheduler JobStatus models.""" + + @staticmethod + def test_unique_shorthands() -> None: + """Test that all scheduler job statuses have unique shorthand representations.""" + shorthands = [status.shorthand for status in JobStatus] + assert_that(len(set(shorthands)), equal_to(len(shorthands))) From b6e3a43c7f46ce62652aa8e4d0c3fae5861b23f2 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 27 Mar 2026 01:45:37 +0000 Subject: [PATCH 04/15] feat: add optional backend field to the `JobSpec` model This field will be used to inform the new scheduler of which backend it should use to execute a job. Though the plumbing is not there in the rest of DVSim, the intent is to make the scheduler such that it could feasibly be run with multiple backends (e.g. some jobs faked, some jobs on the local machine, some dispatched to various remote clusters). To support this design, each job spec can now specify that it should be run on a certain backend, with some designated string name. To instead just use the configured default backend (which is the current behaviour, as the current scheduler only supports one backend / `launcher_cls`), this can be set to `None`. Signed-off-by: Alex Jones --- src/dvsim/instrumentation/metadata.py | 2 ++ src/dvsim/job/data.py | 5 +++++ src/dvsim/job/deploy.py | 3 +++ tests/test_scheduler.py | 1 + 4 files changed, 11 insertions(+) diff --git a/src/dvsim/instrumentation/metadata.py b/src/dvsim/instrumentation/metadata.py index 7871db1a..f674969f 100644 --- a/src/dvsim/instrumentation/metadata.py +++ b/src/dvsim/instrumentation/metadata.py @@ -29,6 +29,7 @@ class MetadataJobFragment(JobFragment): job_type: str target: str tool: str + backend: str | None dependencies: list[str] status: str @@ -61,6 +62,7 @@ def build_report_fragments(self) -> InstrumentationFragments | None: spec.job_type, spec.target, spec.tool.name, + spec.backend, spec.dependencies, status_str, ) diff --git a/src/dvsim/job/data.py b/src/dvsim/job/data.py index d820738c..39d9a936 100644 --- a/src/dvsim/job/data.py +++ b/src/dvsim/job/data.py @@ -54,6 +54,11 @@ class JobSpec(BaseModel): target: str """run phase [build, run, ...]""" + backend: str | None + """The runtime backend to execute this job with. If not provided (None), this + indicates that whatever is configured as the 'default' backend should be used. + """ + seed: int | None """Seed if there is one.""" diff --git a/src/dvsim/job/deploy.py b/src/dvsim/job/deploy.py index 71247b85..9bfd409d 100644 --- a/src/dvsim/job/deploy.py +++ b/src/dvsim/job/deploy.py @@ -110,6 +110,9 @@ def get_job_spec(self) -> "JobSpec": name=self.name, job_type=self.__class__.__name__, target=self.target, + # TODO: for now we always use the default configured backend, but it might be good + # to allow different jobs to run on different backends in the future? + backend=None, seed=getattr(self, "seed", None), full_name=self.full_name, qual_name=self.qual_name, diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 042357ed..62e13de8 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -312,6 +312,7 @@ def job_spec_factory( "name": "test_job", "job_type": "mock_type", "target": "mock_target", + "backend": None, "seed": None, "dependencies": [], "needs_all_dependencies_passing": True, From ee719941f30c9d6c129c3ce3ff80740d37a103d5 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 27 Mar 2026 03:18:56 +0000 Subject: [PATCH 05/15] feat: introduce new async scheduler For now, this is separated in `async_core.py` - the intention is that it will eventually replace the scheduler in `core.py` when all necessary components for it to work are integrated. This commit contains the fully async scheduler design. Some notes: - Everything is now async. The scheduler is no longer tied to a Timer object, nor does it have to manage its print interval and poll frequency. It takes advantage of parallelism via cooperative multitasking as much as possible. - The scheduler is designed to support multiple different backends (new async versions of launchers). Jobs are dispatch according to their specifications and scheduler parameters. - The scheduler implements the Observer pattern for various events (start, end, job status change, kill signal), allowing consumers that want to use this functionality (e.g. instrumentation, status printer) to hook into the scheduler, instead of unnecessarily coupling code. - The previous scheduler only recognized killed jobs when they were reached in the queue and their status was updated. The new design immediately transitively updates jobs to instantly reflect status updates of all jobs when information is known. - Since the scheduler knows _why_ it is killing the jobs, we attach JobStatusInfo information to give more info in the failure buckets. - The job DAG is indexed and validated during initialization; dependency cycles are detected and cause an error to be raised. - Job info is encapsulated by records, keeping state centralized (outside of indexes). - The scheduler now accepts a prioritization function. It schedules jobs in a heap and schedules according to highest priority. Default prioritisation is by weights, but this can be customized. - The scheduler now has its own separate modifiable parallelism limit. - The scheduler has it sown separate modifiable parallelism limit separate from each individual backend's parallelism limit. Signed-off-by: Alex Jones --- src/dvsim/job/status.py | 2 +- src/dvsim/scheduler/async_core.py | 582 ++++++++++++++++++++++++++++++ 2 files changed, 583 insertions(+), 1 deletion(-) create mode 100644 src/dvsim/scheduler/async_core.py diff --git a/src/dvsim/job/status.py b/src/dvsim/job/status.py index 7076caae..457f3bc2 100644 --- a/src/dvsim/job/status.py +++ b/src/dvsim/job/status.py @@ -13,7 +13,7 @@ class JobStatus(Enum): """Status of a Job.""" # SCHEDULED is currently unused in the old sync scheduler, there `SCHEDULED` and `QUEUED` - # are combined under `QUEUED`. It is intended to be used in the new async scheduler. + # are combined under `QUEUED`. It is used only in the new async scheduler. SCHEDULED = auto() # Waiting for dependencies QUEUED = auto() # Dependencies satisfied, waiting to be dispatched RUNNING = auto() # Dispatched to a backend and actively executing diff --git a/src/dvsim/scheduler/async_core.py b/src/dvsim/scheduler/async_core.py new file mode 100644 index 00000000..651ad983 --- /dev/null +++ b/src/dvsim/scheduler/async_core.py @@ -0,0 +1,582 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. +# SPDX-License-Identifier: Apache-2.0 + +"""Job scheduler.""" + +import asyncio +import heapq +from collections import defaultdict +from collections.abc import Callable, Iterable, Mapping, Sequence +from dataclasses import dataclass, field +from signal import SIGINT, SIGTERM, getsignal, signal +from types import FrameType +from typing import Any, TypeAlias + +from dvsim.job.data import CompletedJobStatus, JobSpec, JobStatusInfo +from dvsim.job.status import JobStatus +from dvsim.logging import log +from dvsim.runtime.backend import RuntimeBackend +from dvsim.runtime.data import JobCompletionEvent, JobHandle + +__all__ = ( + "JobPriorityFn", + "JobRecord", + "OnJobStatusChangeCb", + "OnRunEndCb", + "OnRunStartCb", + "OnSchedulerKillCb", + "Priority", + "Scheduler", +) + + +@dataclass +class JobRecord: + """Mutable runtime representation of a scheduled job, used in the scheduler.""" + + spec: JobSpec + backend_key: str # either spec.backend, or the default backend if not given + + status: JobStatus = JobStatus.SCHEDULED + status_info: JobStatusInfo | None = None + + remaining_deps: int = 0 + passing_deps: int = 0 + dependents: list[str] = field(default_factory=list) + kill_requested: bool = False + + handle: JobHandle | None = None + + +# Function to assign a priority to a given job specification. The returned priority should be +# some lexicographically orderable type. Jobs with higher priority are scheduled first. +Priority: TypeAlias = int | float | Sequence[int | float] +JobPriorityFn: TypeAlias = Callable[[JobRecord], Priority] + +# Callbacks for observers, for when the scheduler run starts and stops +OnRunStartCb: TypeAlias = Callable[[], None] +OnRunEndCb: TypeAlias = Callable[[], None] + +# Callbacks for observers, for when a job status changes in the scheduler +# The arguments are: (job spec, old status, new status). +OnJobStatusChangeCb: TypeAlias = Callable[[JobSpec, JobStatus, JobStatus], None] + +# Callbacks for observers, for when the scheduler receives a kill signal (termination). +OnSchedulerKillCb: TypeAlias = Callable[[], None] + + +# Standard context messages used for killed/failed jobs in the scheduler. +FAILED_DEP = JobStatusInfo( + message="Job cancelled because one of its dependencies failed or was killed." +) +ALL_FAILED_DEP = JobStatusInfo( + message="Job cancelled because all of its dependencies failed or were killed." +) +KILLED_SCHEDULED = JobStatusInfo( + message="Job cancelled because one of its dependencies was killed." +) +KILLED_QUEUED = JobStatusInfo(message="Job killed whilst waiting to begin execution.") +KILLED_RUNNING_SIGINT = JobStatusInfo( + message="Job killed by a SIGINT signal to the scheduler whilst executing." +) +KILLED_RUNNING_SIGTERM = JobStatusInfo( + message="Job killed by a SIGTERM signal to the scheduler whilst executing." +) + + +class Scheduler: + """Event-driven job scheduler that schedules and runs a DAG of job specifications.""" + + def __init__( # noqa: PLR0913 + self, + jobs: Iterable[JobSpec], + backends: Mapping[str, RuntimeBackend], + default_backend: str, + *, + max_parallelism: int = 0, + priority_fn: JobPriorityFn | None = None, + coalesce_window: float | None = 0.001, + ) -> None: + """Construct a new scheduler to run a DAG of jobs. + + Args: + jobs: The DAG of jobs to run. A sequence of job specifications, where the DAG is + defined by the job IDs and job dependency lists. + backends: The mapping (name -> backend) of backends available to the scheduler. + default_backend: The name of the default backend to use if not specified by a job. + max_parallelism: The maximum number of jobs that the scheduler is allowed to dispatch + at once, across all backends. The default value of `0` indicates no upper limit. + priority_fn: A function to calculate the priority of a given job. If no function is + given, this defaults to using the job's weight. + coalesce_window: If specified, the time in seconds to wait on receiving a job + completion, to give a short amount of time to allow other batched completion events + to arrive in the queue. This lets us batch scheduling more frequently for a little + extra cost. Defaults to 1 millisecond, and can be disabled by giving `None`. + + """ + if max_parallelism < 0: + err = f"max_parallelism must be some non-negative integer, not {max_parallelism}" + raise ValueError(err) + if default_backend not in backends: + err = f"Default backend '{default_backend}' is not in the mapping of given backends" + raise ValueError(err) + if coalesce_window is not None and coalesce_window < 0.0: + raise ValueError("coalesce_window must be None or some non-negative number") + + # Configuration of the scheduler's behaviour + self.backends = dict(backends) + self.default_backend = default_backend + self.max_parallelism = max_parallelism + self.priority_fn = priority_fn or self._default_priority + self.coalesce_window = coalesce_window + + # Internal data structures and indexes to track running jobs. + self._jobs: dict[str, JobRecord] = {} + self._ready_heap: list[tuple[Priority, str]] = [] + self._running: set[str] = set() + self._running_per_backend: dict[str, int] = dict.fromkeys(backends, 0) + self._event_queue: asyncio.Queue[Iterable[JobCompletionEvent]] = asyncio.Queue() + + # Internal flags and signal handling + self._shutdown_signal: int | None = None + self._shutdown_event: asyncio.Event | None = None + self._original_sigint_handler: Any = None + self._shutdown_started = False + + # Registered callbacks from observers + self._on_run_start: list[OnRunStartCb] = [] + self._on_run_end: list[OnRunEndCb] = [] + self._on_job_status_change: list[OnJobStatusChangeCb] = [] + self._on_kill_signal: list[OnSchedulerKillCb] = [] + + self._build_graph(jobs) + + def add_run_start_callback(self, cb: OnRunStartCb) -> None: + """Register an observer to notify when the scheduler run is started.""" + self._on_run_start.append(cb) + + def add_run_end_callback(self, cb: OnRunEndCb) -> None: + """Register an observer to notify when the scheduler run ends.""" + self._on_run_end.append(cb) + + def add_job_status_change_callback(self, cb: OnJobStatusChangeCb) -> None: + """Register an observer to notify when the status of a job in the scheduler changes.""" + self._on_job_status_change.append(cb) + + def add_kill_signal_callback(self, cb: OnSchedulerKillCb) -> None: + """Register an observer to notify when the scheduler is killed by some signal.""" + self._on_kill_signal.append(cb) + + def _default_priority(self, job: JobRecord) -> Priority: + """Prioritizes jobs according to their weight. The default prioritization method.""" + return job.spec.weight + + def _build_graph(self, specs: Iterable[JobSpec]) -> None: + """Build the job dependency graph and validate the DAG structure.""" + # Build an index of runtime job records, and check for duplicates + for spec in specs: + if spec.id in self._jobs: + log.warning("Duplicate job ID '%s'", spec.id) + # TODO: when we're sure it's ok, change the behaviour to error on duplicate jobs + # : err = f"Duplicate job ID '{spec.id}'" + # : raise ValueError(err) + # Instead, silently ignore it for now to match the original scheduler behaviour + continue + if spec.backend is not None and spec.backend not in self.backends: + err = f"Unknown job backend '{spec.backend}'" + raise ValueError(err) + backend_name = self.default_backend if spec.backend is None else spec.backend + self._jobs[spec.id] = JobRecord(spec=spec, backend_key=backend_name) + + # Build a graph from the adjacency list formed by the spec dependencies + for job in self._jobs.values(): + job.remaining_deps = len(job.spec.dependencies) + for dep in job.spec.dependencies: + if dep not in self._jobs: + err = f"Unknown job dependency '{dep}' for job {job.spec.id}" + raise ValueError(err) + self._jobs[dep].dependents.append(job.spec.id) + + # Validate that there are no cycles in the given graph. + self._validate_acyclic() + + def _validate_acyclic(self) -> None: + """Validate that the given job digraph is acyclic via Kahn's Algorithm.""" + indegree = {job: record.remaining_deps for job, record in self._jobs.items()} + job_queue = [job for job, degree in indegree.items() if degree == 0] + num_visited = 0 + + while job_queue: + job = job_queue.pop() + num_visited += 1 + for dep in self._jobs[job].dependents: + indegree[dep] -= 1 + if indegree[dep] == 0: + job_queue.append(dep) + + if num_visited != len(self._jobs): + raise ValueError("The given JobSpec graph contains a dependency cycle.") + + def _notify_run_started(self) -> None: + """Notify any observers that the scheduler run has started.""" + for cb in self._on_run_start: + cb() + + def _notify_run_finished(self) -> None: + """Notify any observers that the scheduler run has finished.""" + for cb in self._on_run_end: + cb() + + def _notify_kill_signal(self) -> None: + """Notify any observers that the scheduler received a kill signal.""" + for cb in self._on_kill_signal: + cb() + + def _change_job_status( + self, job: JobRecord, new_status: JobStatus, info: JobStatusInfo | None = None + ) -> JobStatus: + """Change a job's runtime status, storing an optionally associated reason. + + Notifies any status change observers of the change, and returns the previous status. + """ + old_status = job.status + if old_status == new_status: + return old_status + + job.status = new_status + job.status_info = info + + if new_status != JobStatus.RUNNING: + log.log( + log.ERROR if new_status in (JobStatus.FAILED, JobStatus.KILLED) else log.VERBOSE, + "Status change to [%s: %s] for %s", + new_status.shorthand, + new_status.name.capitalize(), + job.spec.full_name, + ) + + for cb in self._on_job_status_change: + cb(job.spec, old_status, new_status) + + return old_status + + def _mark_job_ready(self, job: JobRecord) -> None: + """Mark a given job in the scheduler as ready to execute (all dependencies completed).""" + if job.status != JobStatus.SCHEDULED: + msg = f"_mark_job_ready only applies to 'SCHEDULED' jobs (not '{job.status.name}')." + raise RuntimeError(msg) + + self._change_job_status(job, JobStatus.QUEUED) + # heapq is a min heap, so push (-priority) instead of (priority). + priority = self.priority_fn(job) + priority = priority if isinstance(priority, Sequence) else (priority,) + neg_priority: Priority = tuple(-x for x in priority) + heapq.heappush(self._ready_heap, (neg_priority, job.spec.id)) + + def _mark_job_running(self, job: JobRecord) -> None: + """Mark a given job in the scheduler as running. Assumes already removed from the heap.""" + if job.spec.id in self._running: + raise RuntimeError("_mark_job_running called on a job that was already running.") + + self._change_job_status(job, JobStatus.RUNNING) + self._running.add(job.spec.id) + self._running_per_backend[job.backend_key] += 1 + + def _mark_job_completed( + self, job: JobRecord, status: JobStatus, reason: JobStatusInfo | None + ) -> None: + """Mark a given job in the scheduler as completed, having reached some terminal state.""" + if not status.is_terminal: + err = f"_mark_job_completed called with non-terminal status '{status.name}'" + raise RuntimeError(err) + if job.status.is_terminal: + return + + # If the scheduler requested to kill the job, override the failure reason. + if job.kill_requested: + reason = ( + KILLED_RUNNING_SIGINT if self._shutdown_signal == SIGINT else KILLED_RUNNING_SIGTERM + ) + self._change_job_status(job, status, reason) + + # If the job was running, mark it as no longer running. + if job.spec.id in self._running: + self._running.remove(job.spec.id) + self._running_per_backend[job.backend_key] -= 1 + + # Update dependents (jobs that depend on this job), propagating failures if needed. + self._update_completed_job_deps(job) + + def _update_completed_job_deps(self, job: JobRecord) -> None: + """Update the dependencies of a completed job, scheduling/killing deps where necessary.""" + for dep_id in job.dependents: + dep = self._jobs[dep_id] + + # Update dependency tracking counts in the dependency records + dep.remaining_deps -= 1 + if job.status == JobStatus.PASSED: + dep.passing_deps += 1 + + # Propagate kill signals on shutdown + if self._shutdown_signal is not None: + self._mark_job_completed(dep, JobStatus.KILLED, KILLED_SCHEDULED) + continue + + # Handle dependency management and marking dependents as ready + if dep.remaining_deps == 0 and dep.status == JobStatus.SCHEDULED: + if dep.spec.needs_all_dependencies_passing: + if dep.passing_deps == len(dep.spec.dependencies): + self._mark_job_ready(dep) + else: + self._mark_job_completed(dep, JobStatus.KILLED, FAILED_DEP) + elif dep.passing_deps > 0: + self._mark_job_ready(dep) + else: + self._mark_job_completed(dep, JobStatus.KILLED, ALL_FAILED_DEP) + + async def run(self) -> list[CompletedJobStatus]: + """Run all scheduled jobs to completion (unless terminated) and return the results.""" + self._install_signal_handlers() + + for backend in self.backends.values(): + backend.attach_completion_callback(self._submit_job_completion) + + self._notify_run_started() + + # Before entering the main loop, mark jobs with 0 remaining deps as ready to run. + for job in self._jobs.values(): + if job.remaining_deps == 0: + self._mark_job_ready(job) + + try: + await self._main_loop() + finally: + self._notify_run_finished() + + return [ + CompletedJobStatus( + name=job.spec.name, + job_type=job.spec.job_type, + seed=job.spec.seed, + block=job.spec.block, + tool=job.spec.tool, + workspace_cfg=job.spec.workspace_cfg, + full_name=job.spec.full_name, + qual_name=job.spec.qual_name, + target=job.spec.target, + log_path=job.spec.log_path, + job_runtime=job.handle.job_runtime.with_unit("s").get()[0] + if job.handle is not None + else 0.0, + simulated_time=job.handle.simulated_time.with_unit("us").get()[0] + if job.handle is not None + else 0.0, + status=job.status, + fail_msg=job.status_info, + ) + for job in self._jobs.values() + ] + + def _install_signal_handlers(self) -> None: + """Install the SIGINT/SIGTERM signal handlers to trigger graceful shutdowns.""" + self._shutdown_signal = None + self._shutdown_event = asyncio.Event() + self._original_sigint_handler = getsignal(SIGINT) + self._shutdown_started = False + loop = asyncio.get_running_loop() + + def _handler(signum: int, _frame: FrameType | None) -> None: + if self._shutdown_signal is None and self._shutdown_event: + self._shutdown_signal = signum + loop.call_soon_threadsafe(self._shutdown_event.set) + + # Restore the original SIGINT handler so a second Ctrl-C terminates immediately + if signum == SIGINT: + signal(SIGINT, self._original_sigint_handler) + + loop.add_signal_handler(SIGINT, lambda: _handler(SIGINT, None)) + loop.add_signal_handler(SIGTERM, lambda: _handler(SIGTERM, None)) + + async def _submit_job_completion(self, events: Iterable[JobCompletionEvent]) -> None: + """Notify the scheduler that a batch of jobs have been completed.""" + try: + self._event_queue.put_nowait(events) + except asyncio.QueueShutDown as e: + msg = "Scheduler event queue shutdown earlier than expected?" + raise RuntimeError(msg) from e + except asyncio.QueueFull: + log.critical("Scheduler event queue full despite being infinitely sized?") + + async def _main_loop(self) -> None: + """Run the main scheduler loop. + + Tries to schedule any ready jobs if there is available capacity, and then waits for any job + completions (or a shutdown signal). This continues in a loop until all jobs have been either + executed or killed (e.g. via a shutdown signal). + """ + if self._shutdown_event is None: + raise RuntimeError("Expected signal handlers to be installed before running main loop") + + job_completion_task = asyncio.create_task(self._event_queue.get()) + shutdown_task = asyncio.create_task(self._shutdown_event.wait()) + + try: + while True: + await self._schedule_ready_jobs() + + if not self._running: + if not self._ready_heap: + break + # This case (nothing running, but jobs still pending in the queue) can happen + # if backends fail to schedule any jobs (e.g. the backend is temporarily busy). + continue + + # Wait for any job to complete, or for a shutdown signal + try: + done, _ = await asyncio.wait( + (job_completion_task, shutdown_task), + return_when=asyncio.FIRST_COMPLETED, + ) + except asyncio.QueueShutDown as e: + msg = "Scheduler event queue shutdown earlier than expected?" + raise RuntimeError(msg) from e + + if shutdown_task in done: + self._shutdown_event.clear() + shutdown_task = asyncio.create_task(self._shutdown_event.wait()) + await self._handle_exit_signal() + continue + + completions = await self._drain_completions(job_completion_task) + job_completion_task = asyncio.create_task(self._event_queue.get()) + + for event in completions: + job = self._jobs[event.spec.id] + self._mark_job_completed(job, event.status, event.reason) + finally: + job_completion_task.cancel() + shutdown_task.cancel() + + async def _drain_completions(self, completion_task: asyncio.Task) -> list[JobCompletionEvent]: + """Drain batched completions from the queue, optionally coalescing batched events.""" + events = list(completion_task.result()) + + # Coalesce nearby completions by waiting for a very short time + if self.coalesce_window is not None: + await asyncio.sleep(self.coalesce_window) + + # Drain any more completion events from the event queue + try: + while True: + events.extend(self._event_queue.get_nowait()) + except asyncio.QueueEmpty: + return events + except asyncio.QueueShutDown as e: + msg = "Scheduler event queue shutdown earlier than expected?" + raise RuntimeError(msg) from e + + async def _handle_exit_signal(self) -> None: + """Attempt to gracefully shutdown as a result of a triggered exit signal.""" + if self._shutdown_started: + return + self._shutdown_started = True + + signal_name = "SIGTERM" if self._shutdown_signal == SIGTERM else "SIGINT" + log.info("Received %s signal. Exiting gracefully", signal_name) + if self._shutdown_signal == SIGINT: + log.info( + "Send another to force immediate quit (but you may need to manually " + "kill some child processes)." + ) + + self._notify_kill_signal() + + # Mark any jobs that are currently running as jobs we should kill. + # Collect jobs to kill in a dict, grouped per backend, for batched killing. + to_kill: dict[str, list[JobHandle]] = defaultdict(list) + + for job_id in self._running: + job = self._jobs[job_id] + if job.handle is None: + raise RuntimeError("Running job is missing an associated handle.") + job.kill_requested = True + to_kill[job.backend_key].append(job.handle) + + # Asynchronously dispatch backend kill tasks whilst we update scheduler internals. + # Jobs that depend on these jobs will then be transitively killed before they start. + kill_tasks: list[asyncio.Task] = [] + for backend_name, handles in to_kill.items(): + backend = self.backends[backend_name] + kill_tasks.append(asyncio.create_task(backend.kill_many(handles))) + + # Kill any ready (but not running jobs), so that they don't get scheduled. + while self._ready_heap: + _, job_id = heapq.heappop(self._ready_heap) + job = self._jobs[job_id] + self._mark_job_completed(job, JobStatus.KILLED, KILLED_QUEUED) + + if kill_tasks: + await asyncio.gather(*kill_tasks, return_exceptions=True) + + async def _schedule_ready_jobs(self) -> None: + """Attempt to schedule ready jobs whilst respecting scheduler & backend parallelism.""" + # Find out how many jobs we can dispatch according to the scheduler's parallelism limit + available_slots = ( + self.max_parallelism - len(self._running) + if self.max_parallelism + else len(self._ready_heap) + ) + if available_slots <= 0: + return + + # Collect jobs to launch in a dict, grouped per backend, for batched launching. + to_launch: dict[str, list[tuple[Priority, JobRecord]]] = defaultdict(list) + blocked: list[tuple[Priority, str]] = [] + slots_used = 0 + + while self._ready_heap and slots_used < available_slots: + neg_priority, job_id = heapq.heappop(self._ready_heap) + job = self._jobs[job_id] + backend = self.backends[job.backend_key] + running_on_backend = self._running_per_backend[job.backend_key] + len( + to_launch[job.backend_key] + ) + + # Check that we can launch the job whilst respecting backend parallelism limits + if backend.max_parallelism and running_on_backend >= backend.max_parallelism: + blocked.append((neg_priority, job_id)) + continue + + to_launch[job.backend_key].append((neg_priority, job)) + slots_used += 1 + + # Requeue any blocked jobs. + for entry in blocked: + heapq.heappush(self._ready_heap, entry) + + # Launch the selected jobs in batches per backend + launch_tasks = [] + for backend_name, jobs in to_launch.items(): + backend = self.backends[backend_name] + job_specs = [job.spec for _, job in jobs] + log.verbose( + "[%s]: Dispatching jobs: %s", + backend_name, + ", ".join(job.full_name for job in job_specs), + ) + launch_tasks.append(backend.submit_many(job_specs)) + + results = await asyncio.gather(*launch_tasks) + + # Mark jobs running, and requeue any jobs that failed to launch + for jobs, handles in zip(to_launch.values(), results, strict=True): + for neg_priority, job in jobs: + handle = handles.get(job.spec.id) + if handle is None: + log.verbose("[%s]: Requeuing job '%s'", job.spec.target, job.spec.full_name) + heapq.heappush(self._ready_heap, (neg_priority, job.spec.id)) + continue + + job.handle = handle + self._mark_job_running(job) From 907f79b43617ef4ca6b3a431b3f361683a975ca8 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 27 Mar 2026 12:17:47 +0000 Subject: [PATCH 06/15] feat: implement new async base `StatusPrinter` class Add the new async `StatusPrinter` abstract base class, intended to replace the original for use with the new async scheduler. The original will not removed until the scheduler has been switched. This is now an abstract base class, rather than an empty class used for interactive sessions - since the status printer will live outside the new scheduler, it becomes much easier to just _not connect_ any status printer hooks during interactive mode. Some notable changes and overhauls: - Status printing now runs entirely independently of the scheduler. If a print interval > 0 is configured, then the status printer now runs as a loop with async awaits such that the timing logic is entirely separate from the scheduler, maintained by cooperative multitasking. - As new functionality, if a print interval of 0 is configured, we instead activate in synchronous "event/update-driven mode" where every single status update is printed. This might be useful for e.g. the TTY printer where you may want to capture exact times of all updates. - As a result of observing the scheduler, the status printer maintains its own stateful tracking of job information. - Field alignment is calculated from the initial job information and data is appropriately justified to clean up the output tables. - The ability to pause the status bar is introduced to help (later) deal with issues in the EnlightenStatusBar, where its terminal interactivity can be broken and cause hangs under heavy load. - General refactoring: the status header and fields are no longer hardcoded and are instead derived from the JobStatus enum. Signed-off-by: Alex Jones --- src/dvsim/cli/run.py | 2 + src/dvsim/scheduler/async_status_printer.py | 171 ++++++++++++++++++++ 2 files changed, 173 insertions(+) create mode 100644 src/dvsim/scheduler/async_status_printer.py diff --git a/src/dvsim/cli/run.py b/src/dvsim/cli/run.py index c25c1dd8..42d3bb58 100644 --- a/src/dvsim/cli/run.py +++ b/src/dvsim/cli/run.py @@ -43,6 +43,7 @@ from dvsim.launcher.sge import SgeLauncher from dvsim.launcher.slurm import SlurmLauncher from dvsim.logging import LOG_LEVELS, configure_logging, log +from dvsim.scheduler.async_status_printer import StatusPrinter from dvsim.scheduler.status_printer import get_status_printer from dvsim.utils import TS_FORMAT, TS_FORMAT_LONG, Timer, rm_path, run_cmd_with_timeout @@ -884,6 +885,7 @@ def main(argv: list[str] | None = None) -> None: # Register the common deploy settings. Timer.print_interval = args.print_interval + StatusPrinter.print_interval = args.print_interval LocalLauncher.max_parallel = args.max_parallel SlurmLauncher.max_parallel = args.max_parallel SgeLauncher.max_parallel = args.max_parallel diff --git a/src/dvsim/scheduler/async_status_printer.py b/src/dvsim/scheduler/async_status_printer.py new file mode 100644 index 00000000..4309b6c5 --- /dev/null +++ b/src/dvsim/scheduler/async_status_printer.py @@ -0,0 +1,171 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. +# SPDX-License-Identifier: Apache-2.0 + +"""Job status printing during a scheduled run.""" + +import asyncio +import time +from abc import ABC, abstractmethod +from collections import defaultdict +from collections.abc import Sequence + +from dvsim.job.data import JobSpec +from dvsim.job.status import JobStatus + + +class StatusPrinter(ABC): + """Status Printer abstract base class. + + Contains core functionality related to status printing - a print interval can be configured + to control how often the scheduler target statuses are printed, which is managed by an async + thread. Optionally, the print interval can be configured to 0 to run in an update-driven mode + where every single status update is printed. Regardless of the configured print interval, the + final job update for each target is printed immediately to reflect final target end timings. + """ + + # How often we print by default. Zero means we should print on every event change. + print_interval = 0 + + def __init__(self, jobs: Sequence[JobSpec], print_interval: int | None = None) -> None: + """Construct the base StatusPrinter.""" + # Mapping from target -> (Mapping from status -> count) + self._target_counts: dict[str, dict[JobStatus, int]] = defaultdict(lambda: defaultdict(int)) + # Mapping from target -> number of jobs + self._totals: dict[str, int] = defaultdict(int) + + for job in jobs: + self._target_counts[job.target][JobStatus.SCHEDULED] += 1 + self._totals[job.target] += 1 + + # The number of characters used to represent the largest field in the displayed table + self._field_width = max((len(str(total)) for total in self._totals.values()), default=0) + + # State tracking for the StatusPrinter + self._start_time: float = 0.0 + self._last_print: float = 0.0 + self._running: dict[str, list[str]] = defaultdict(list) + self._num_finished: dict[str, int] = defaultdict(int) + self._finish_time: dict[str, float] = {} + + # Async target status update handling + self._task: asyncio.Task | None = None + self._paused: bool = False + + self._interval = print_interval if print_interval is not None else self.print_interval + + @property + def updates_every_event(self) -> bool: + """If the configured print interval is 0, statuses are updated on every state change.""" + return self._interval <= 0 + + def start(self) -> None: + """Start printing the status of the scheduled jobs.""" + self._start_time = time.monotonic() + self._print_header() + for target in self._target_counts: + self._init_target(target, self._get_target_row(target)) + + # If we need an async task to manage the print interval, create one + if not self.updates_every_event: + self._task = asyncio.create_task(self._run()) + + async def _run(self) -> None: + """Run a timer in an async loop, printing the updated status at every interval.""" + next_tick = self._start_time + self._interval + self.update_all_targets(including_unstarted=True) + + while True: + now = time.monotonic() + sleep_time = max(0, next_tick - now) + await asyncio.sleep(sleep_time) + self.update_all_targets() + next_tick += self._interval + + def update_all_targets(self, *, including_unstarted: bool = False) -> None: + """Update the status bars of all targets.""" + if self._paused: + return + update_time = time.monotonic() + for target in self._target_counts: + # Only update targets that have started (some job status has changed) + if self.target_is_started(target) or including_unstarted: + target_update_time = self._finish_time.get(target, update_time) + self._update_target(target_update_time, target) + + def target_is_started(self, target: str) -> bool: + """Check whether a target has been started yet or not.""" + return bool(self._num_finished[target]) or bool(self._running[target]) + + def target_is_done(self, target: str) -> bool: + """Check whether a target is finished or not.""" + return self._num_finished[target] >= self._totals[target] + + def update_status(self, job: JobSpec, old_status: JobStatus, new_status: JobStatus) -> None: + """Update the status printer to reflect a change in job status.""" + status_counts = self._target_counts[job.target] + status_counts[old_status] -= 1 + if old_status == JobStatus.RUNNING: + self._running[job.target].remove(job.full_name) + status_counts[new_status] += 1 + if new_status == JobStatus.RUNNING: + self._running[job.target].append(job.full_name) + if not old_status.is_terminal and new_status.is_terminal: + self._num_finished[job.target] += 1 + + if self.target_is_done(job.target) and not self.updates_every_event: + # Even if we have a configured print interval, we should record + # the time at which the target finished to capture accurate end timing. + self._finish_time[job.target] = time.monotonic() + elif self.updates_every_event: + self.update_all_targets() + + def _get_header(self) -> str: + """Get the header string to use for printing the status.""" + return ( + ", ".join( + f"{status.shorthand}: {status.name.lower().rjust(self._field_width)}" + for status in JobStatus + ) + + ", T: total" + ) + + def _get_target_row(self, target: str) -> str: + """Get a formatted string with the fields for a given target row.""" + fields = [] + for status in JobStatus: + count = self._target_counts[target][status] + value = f"{count:0{self._field_width}d}" + fields.append(f"{status.shorthand}: {value.rjust(len(status.name))}") + total = f"{self._totals[target]:0{self._field_width}d}" + fields.append(f"T: {total.rjust(5)}") + return ", ".join(fields) + + @abstractmethod + def _print_header(self) -> None: + """Initialize / print the header, displaying the legend of job status meanings.""" + + @abstractmethod + def _init_target(self, target: str, _msg: str) -> None: + """Initialize the status bar for a target.""" + + @abstractmethod + def _update_target(self, current_time: float, target: str) -> None: + """Update the status bar for a given target.""" + + def pause(self) -> None: + """Toggle whether the status printer is paused. May make target finish times inaccurate.""" + self._paused = not self._paused + if not self._paused and self.updates_every_event: + self.update_all_targets() + + def stop(self) -> None: + """Stop the status header/target printing (but keep the printer context).""" + if self._task: + self._task.cancel() + if self._paused: + self._paused = False + self.update_all_targets(including_unstarted=True) + + def exit(self) -> None: # noqa: B027 + """Do cleanup activities before exiting.""" From db2d6546be034b286310bb64c70d9aae896ef9e4 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 27 Mar 2026 04:31:02 +0000 Subject: [PATCH 07/15] refactor: reorganize time utilities Extract time-related utilities (the `hms` functionality of the `Timer` and the two timestamp formats from `fs.py`) into a new `time.py` utility module. The intention is to use the `hms` functionality inside the new async status printers and to eventually remove `timer.py` completely when the old scheduler is removed. Signed-off-by: Alex Jones --- src/dvsim/utils/__init__.py | 11 +++-------- src/dvsim/utils/fs.py | 11 +++-------- src/dvsim/utils/time.py | 20 ++++++++++++++++++++ src/dvsim/utils/timer.py | 8 +++----- 4 files changed, 29 insertions(+), 21 deletions(-) create mode 100644 src/dvsim/utils/time.py diff --git a/src/dvsim/utils/__init__.py b/src/dvsim/utils/__init__.py index 353b8b37..70c4a74d 100644 --- a/src/dvsim/utils/__init__.py +++ b/src/dvsim/utils/__init__.py @@ -5,16 +5,10 @@ """Utility functions common across dvsim.""" from dvsim.utils.check import check_bool, check_int -from dvsim.utils.fs import ( - TS_FORMAT, - TS_FORMAT_LONG, - clean_odirs, - mk_path, - mk_symlink, - rm_path, -) +from dvsim.utils.fs import clean_odirs, mk_path, mk_symlink, rm_path from dvsim.utils.hjson import parse_hjson from dvsim.utils.subprocess import run_cmd, run_cmd_with_timeout +from dvsim.utils.time import TS_FORMAT, TS_FORMAT_LONG, hms from dvsim.utils.timer import Timer from dvsim.utils.wildcards import ( find_and_substitute_wildcards, @@ -29,6 +23,7 @@ "check_int", "clean_odirs", "find_and_substitute_wildcards", + "hms", "mk_path", "mk_symlink", "parse_hjson", diff --git a/src/dvsim/utils/fs.py b/src/dvsim/utils/fs.py index 6b8d398a..93886e83 100644 --- a/src/dvsim/utils/fs.py +++ b/src/dvsim/utils/fs.py @@ -12,21 +12,16 @@ from pathlib import Path from dvsim.logging import log +from dvsim.utils.time import TS_FORMAT __all__ = ( - "TS_FORMAT", - "TS_FORMAT_LONG", + "clean_odirs", "mk_path", "mk_symlink", + "relative_to", "rm_path", ) -# Timestamp format when creating directory backups. -TS_FORMAT = "%Y%m%d_%H%M%S" - -# Timestamp format when generating reports. -TS_FORMAT_LONG = "%A %B %d %Y %H:%M:%S UTC" - def rm_path(path: Path, *, ignore_error: bool = False) -> None: """Remove the specified path if it exists. diff --git a/src/dvsim/utils/time.py b/src/dvsim/utils/time.py new file mode 100644 index 00000000..8723ec75 --- /dev/null +++ b/src/dvsim/utils/time.py @@ -0,0 +1,20 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. +# SPDX-License-Identifier: Apache-2.0 + +"""Time-based utilities.""" + +# Timestamp format when creating directory backups. +TS_FORMAT = "%Y%m%d_%H%M%S" + +# Timestamp format when generating reports. +TS_FORMAT_LONG = "%A %B %d %Y %H:%M:%S UTC" + + +def hms(seconds: float) -> str: + """Render a duration (in seconds) in the hh:mm:ss format, rounded to the nearest second.""" + total = round(seconds) + hours, mins = divmod(total, 3600) + mins //= 60 + secs = total % 60 + return f"{hours:02d}:{mins:02d}:{secs:02d}" diff --git a/src/dvsim/utils/timer.py b/src/dvsim/utils/timer.py index 7db5fcfd..b0c55f08 100644 --- a/src/dvsim/utils/timer.py +++ b/src/dvsim/utils/timer.py @@ -4,6 +4,8 @@ import time +from dvsim.utils.time import hms + class Timer: """A timer to keep track of how long jobs have been running. @@ -26,11 +28,7 @@ def period(self): def hms(self) -> str: """Get the time since start in hh:mm:ss.""" - period = self.period() - secs = int(period + 0.5) - mins = secs // 60 - hours = mins // 60 - return f"{hours:02}:{mins % 60:02}:{secs % 60:02}" + return hms(self.period()) def check_time(self) -> bool: """Return true if we have passed next_print. From 6e587a19553894d2370b3fdd09cac313750215f7 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 27 Mar 2026 12:25:44 +0000 Subject: [PATCH 08/15] feat: port `TtyStatusPrinter` to the new async interface This commit ports the `TtyStatusPrinter` to use the new async interface, extending the `StatusPrinter` interface introduced previously. The extended logic remains mostly the same as the original code, with some small refactors and tweaks for aesthetics. Signed-off-by: Alex Jones --- src/dvsim/scheduler/async_status_printer.py | 53 +++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/src/dvsim/scheduler/async_status_printer.py b/src/dvsim/scheduler/async_status_printer.py index 4309b6c5..aa39d176 100644 --- a/src/dvsim/scheduler/async_status_printer.py +++ b/src/dvsim/scheduler/async_status_printer.py @@ -12,6 +12,8 @@ from dvsim.job.data import JobSpec from dvsim.job.status import JobStatus +from dvsim.logging import log +from dvsim.utils import hms class StatusPrinter(ABC): @@ -169,3 +171,54 @@ def stop(self) -> None: def exit(self) -> None: # noqa: B027 """Do cleanup activities before exiting.""" + + +class TtyStatusPrinter(StatusPrinter): + """Prints the current scheduler target status onto the console / TTY via logging.""" + + hms_fmt = "\x1b[1m{hms:9s}\x1b[0m" + header_fmt = hms_fmt + " [{target:^13s}]: [{msg}]" + status_fmt = header_fmt + " {percent:3.0f}% {running}" + + def __init__(self, jobs: Sequence[JobSpec]) -> None: + """Initialise the TtyStatusPrinter.""" + super().__init__(jobs) + + # Maintain a mapping of completed targets, so we only print the status one last + # time when it reaches 100% for a target. + self._target_done: dict[str, bool] = {} + + def _print_header(self) -> None: + """Initialize / print the header, displaying the legend of job status meanings.""" + log.info(self.header_fmt.format(hms="", target="legend", msg=self._get_header())) + + def _init_target(self, target: str, _msg: str) -> None: + """Initialize the status bar for a target.""" + self._target_done[target] = False + + def _trunc_running(self, running: str, width: int = 30) -> str: + """Truncate the list of running items to a specified length.""" + if len(running) <= width: + return running + return running[: width - 3] + "..." + + def _update_target(self, current_time: float, target: str) -> None: + """Update the status bar for a given target.""" + if self._target_done[target]: + return + if self.target_is_done(target): + self._target_done[target] = True + + status_counts = self._target_counts[target] + done_count = sum(status_counts[status] for status in JobStatus if status.is_terminal) + percent = (done_count / self._totals[target] * 100) if self._totals[target] else 100 + + log.info( + self.status_fmt.format( + hms=hms(current_time - self._start_time), + target=target, + msg=self._get_target_row(target), + percent=percent, + running=self._trunc_running(", ".join(self._running[target])), + ), + ) From 8acddb79930175bcc32fa55779fabdd598c0645e Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 27 Mar 2026 12:27:14 +0000 Subject: [PATCH 09/15] feat: port `EnlightenStatusPrinter` to the async interface This commit ports the original `EnlightenStatusPrinter` to the new async interface, extending the `StatusPrinter` abstract base class. The logic is mostly the same, with a few important caveats to note: - Since the new interface allows float (and hence sub-second) print intervals, a warning is introduced for intervals less than Enlighten's internal minimum delta value which will cause updates to be coalesced and potentially lost at points if not refreshed. We could also lower the `min_delta` to match the print interval, but experimentation shows that this introduces performance concerns and is best left as is. - Because of the above, logic is added to refresh (flush) the status bar when a target is done, to ensure it locks the final time correctly. - An occasional bug was encountered on using Ctrl-C to gracefully exit where Enlighten's `StatusBar.update` would hang indefinitely. This occurred during the terminal protocol used by the underlying Blessed library, which queried the terminal for its size and expected a response. Under heavy loads, particularly when a large number of processes are killed due to an exit signal, the terminal response might not be received, causing Blessed to hang on a `getch` call. To prevent this, the `pause` interface was introduced to the base `StatusPrinter` which is used for that purpose here. Signed-off-by: Alex Jones --- src/dvsim/scheduler/async_status_printer.py | 173 ++++++++++++++++++++ 1 file changed, 173 insertions(+) diff --git a/src/dvsim/scheduler/async_status_printer.py b/src/dvsim/scheduler/async_status_printer.py index aa39d176..737bcfb3 100644 --- a/src/dvsim/scheduler/async_status_printer.py +++ b/src/dvsim/scheduler/async_status_printer.py @@ -5,10 +5,17 @@ """Job status printing during a scheduled run.""" import asyncio +import os +import shutil +import sys +import termios import time from abc import ABC, abstractmethod from collections import defaultdict from collections.abc import Sequence +from typing import ClassVar + +import enlighten from dvsim.job.data import JobSpec from dvsim.job.status import JobStatus @@ -222,3 +229,169 @@ def _update_target(self, current_time: float, target: str) -> None: running=self._trunc_running(", ".join(self._running[target])), ), ) + + +class EnlightenStatusPrinter(TtyStatusPrinter): + """Prints the current scheduler target status to the terminal using Enlighten. + + Enlighten is a third party progress bar tool. Documentation: + https://python-enlighten.readthedocs.io/en/stable/ + + Enlighten does not work if the output of DVSim is redirected to a file, for + example - it needs to be attached to a TTY enabled stream. + """ + + # Enlighten uses a min_delta of 0.1 by default, only updating every 0.1 seconds. + DEFAULT_MIN_DELTA = 0.1 + + status_fmt_no_running = TtyStatusPrinter.status_fmt.removesuffix("{running}") + status_fmt = "{status_msg}{running}" + + def __init__(self, jobs: Sequence[JobSpec]) -> None: + """Initialise the EnlightenStatusPrinter.""" + super().__init__(jobs) + if self._interval < self.DEFAULT_MIN_DELTA: + # TODO: maybe "debounce" the updates with a delayed async refresh task? + log.warning( + "Configured print interval %g will not accurately reflect for %s," + " which uses status bars with a configured min_delta of %g by default.", + self._interval, + self.__class__.__name__, + self.DEFAULT_MIN_DELTA, + ) + + # Initialize the enlighten manager and needed state + self._manager = enlighten.get_manager() + self._status_header: enlighten.StatusBar | None = None + self._status_bars: dict[str, enlighten.StatusBar] = {} + self._stopped = False + + def _print_header(self) -> None: + """Initialize / print the header, displaying the legend of job status meanings.""" + self._status_header = self._manager.status_bar( + status_format=self.header_fmt, + hms="", + target="legend", + msg=self._get_header(), + ) + + def _init_target(self, target: str, msg: str) -> None: + """Initialize the status bar for a target.""" + super()._init_target(target, msg) + msg = self.status_fmt_no_running.format(hms=hms(0), target=target, msg=msg, percent=0.0) + self._status_bars[target] = self._manager.status_bar( + status_format=self.status_fmt, + status_msg=msg, + running="", + ) + + def _trunc_running_to_terminal(self, running: str, offset: int) -> str: + """Truncate the list of running items to match the max terminal width.""" + cols = shutil.get_terminal_size(fallback=(80, 24)).columns + width = max(30, cols - offset - 1) + return self._trunc_running(running, width) + + def _update_target(self, current_time: float, target: str) -> None: + """Update the status bar for a given target.""" + if self._target_done[target]: + return + + status_counts = self._target_counts[target] + done_count = sum(status_counts[status] for status in JobStatus if status.is_terminal) + percent = (done_count / self._totals[target] * 100) if self._totals[target] else 100 + + status_msg = self.status_fmt_no_running.format( + hms=hms(current_time - self._start_time), + target=target, + msg=self._get_target_row(target), + percent=percent, + ) + offset = len(status_msg) + running = self._trunc_running_to_terminal(", ".join(self._running[target]), offset) + + self._status_bars[target].update(status_msg=status_msg, running=running) + + if self.target_is_done(target): + self._target_done[target] = True + self._status_bars[target].refresh() + + def stop(self) -> None: + """Stop the status header/target printing (but keep the printer context).""" + super().stop() + if self._status_header is not None: + self._status_header.close() + for status_bar in self._status_bars.values(): + status_bar.close() + self._stopped = True + + def exit(self) -> None: + """Do cleanup activities before exiting (closing the manager context).""" + super().exit() + if not self._stopped: + self.stop() + self._manager.stop() + + # Sometimes, exiting via a signal (e.g. Ctrl-C) can cause Enlighten to leave the + # terminal in some non-raw mode. Just in case, restore regular operation. + self._restore_terminal() + + def _restore_terminal(self) -> None: + """Restore regular terminal operation after using Enlighten.""" + # Try open /dev/tty, otherwise fallback to sys.stdin + try: + fd = os.open("/dev/tty", os.O_RDWR) + close_fd = True + except (OSError, termios.error): + fd = sys.stdin.fileno() + close_fd = False + + # By default, the terminal should echo input (ECHO) and run in canonical mode (ICANON). + # We make this change after all buffered output is transmitted (TCSADRAIN). + try: + attrs = termios.tcgetattr(fd) + attrs[3] |= termios.ECHO | termios.ICANON + termios.tcsetattr(fd, termios.TCSADRAIN, attrs) + except termios.error: + log.debug("Unable to restore terminal attributes safely") + + if close_fd: + os.close(fd) + + +class StatusPrinterSingleton: + """Singleton for the status printer to uniquely refer to 1 instance at a time.""" + + _instance: ClassVar[StatusPrinter | None] = None + + @classmethod + def set(cls, instance: StatusPrinter | None) -> None: + """Set the stored status printer.""" + cls._instance = instance + + @classmethod + def get(cls) -> StatusPrinter | None: + """Get the stored status printer (if it exists).""" + return cls._instance + + +def create_status_printer(jobs: Sequence[JobSpec]) -> StatusPrinter: + """Create the global status printer. + + If stdout is a TTY, then return an instance of EnlightenStatusPrinter, else + return an instance of StatusPrinter. + """ + status_printer = StatusPrinterSingleton.get() + if status_printer is not None: + return status_printer + + status_printer = EnlightenStatusPrinter(jobs) if sys.stdout.isatty() else TtyStatusPrinter(jobs) + StatusPrinterSingleton.set(status_printer) + return status_printer + + +def get_status_printer() -> StatusPrinter: + """Retrieve the configured global status printer.""" + status_printer = StatusPrinterSingleton.get() + if status_printer is None: + raise RuntimeError("get_status_printer called without first creating the status printer") + return status_printer From 76663b6a1032d119d797c3ca9503a3a1fd834dd9 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 27 Mar 2026 03:45:44 +0000 Subject: [PATCH 10/15] feat: introduce runtime backend registry Add a new global runtime backend registry to mirror and extend the existing launcher factory. This serves as a registry for built-in backends (and legacy launchers), and provides helper methods for registering custom backends/launchers to support plugin-like extension. When DVSim fully moves to use the new async scheduler, the launcher factory that this replaces will be removed. Signed-off-by: Alex Jones --- src/dvsim/cli/run.py | 35 ++++++++++ src/dvsim/runtime/registry.py | 117 ++++++++++++++++++++++++++++++++++ 2 files changed, 152 insertions(+) create mode 100644 src/dvsim/runtime/registry.py diff --git a/src/dvsim/cli/run.py b/src/dvsim/cli/run.py index 42d3bb58..5e9f2bfb 100644 --- a/src/dvsim/cli/run.py +++ b/src/dvsim/cli/run.py @@ -43,6 +43,7 @@ from dvsim.launcher.sge import SgeLauncher from dvsim.launcher.slurm import SlurmLauncher from dvsim.logging import LOG_LEVELS, configure_logging, log +from dvsim.runtime.registry import BackendType, backend_registry from dvsim.scheduler.async_status_printer import StatusPrinter from dvsim.scheduler.status_printer import get_status_printer from dvsim.utils import TS_FORMAT, TS_FORMAT_LONG, Timer, rm_path, run_cmd_with_timeout @@ -840,6 +841,37 @@ def parse_args(argv: list[str] | None = None): return args +def set_backend_type(*, is_local: bool = False, fake: bool = False) -> None: + """Set the default backend type that will be used to launch jobs (unless overridden). + + The DVSIM_BACKEND/DVSIM_LAUNCHER environment variables are used to identify what + backend should be used by default, and is intended to be specific to the user's + work site and set externally before invoking DVSim. Selecting a local or fake backend + via the command line will override this. + """ + if is_local: + backend = "local" + elif fake: + backend = "fake" + else: + backend = os.environ.get("DVSIM_BACKEND") + + if backend is None: + # Fall back to the legacy launcher environment variable + backend = os.environ.get("DVSIM_LAUNCHER", "local") + + available_backends = backend_registry.available() + if backend not in available_backends: + log.error( + "Backend %s set using the DVSIM_BACKEND/DVSIM_LAUNCHER environment variables " + "does not exist. Using the local backend instead." + ) + backend = "local" + + # Configure the resolved backend type as the default backend + backend_registry.set_default(BackendType(backend)) + + def main(argv: list[str] | None = None) -> None: """DVSim CLI entry point.""" args = parse_args(argv) @@ -895,6 +927,9 @@ def main(argv: list[str] | None = None) -> None: FakeLauncher.max_parallel = args.max_parallel set_launcher_type(is_local=args.local, fake=args.fake) + # Configure the runtime backend. TODO: deprecate `set_launcher_type` above. + set_backend_type(is_local=args.local, fake=args.fake) + # Configure scheduler instrumentation set_instrumentation(InstrumentationFactory.create(args.instrumentation)) diff --git a/src/dvsim/runtime/registry.py b/src/dvsim/runtime/registry.py new file mode 100644 index 00000000..2b93c8f0 --- /dev/null +++ b/src/dvsim/runtime/registry.py @@ -0,0 +1,117 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. +# SPDX-License-Identifier: Apache-2.0 + +"""Registry of different runtime backends. Built-in backends are registered by default.""" + +from collections.abc import Callable +from typing import Any, NewType, TypeAlias + +from dvsim.launcher.base import Launcher +from dvsim.launcher.fake import FakeLauncher +from dvsim.launcher.lsf import LsfLauncher +from dvsim.launcher.nc import NcLauncher +from dvsim.launcher.sge import SgeLauncher +from dvsim.launcher.slurm import SlurmLauncher +from dvsim.logging import log +from dvsim.runtime.backend import RuntimeBackend +from dvsim.runtime.legacy import LegacyLauncherAdapter +from dvsim.runtime.local import LocalRuntimeBackend + +BackendType = NewType("BackendType", str) + +BackendFactory: TypeAlias = Callable[..., RuntimeBackend] + + +class BackendRegistry: + """Registry mapping backend names to factories/constructors of runtime backends.""" + + def __init__(self) -> None: + """Construct a new runtime backend registry.""" + self._registry: dict[BackendType, BackendFactory] = {} + self._default: BackendType | None = None + + def register( + self, name: BackendType, factory: BackendFactory, *, is_default: bool = False + ) -> None: + """Register a new runtime backend (factory/constructor) under a given name.""" + if name in self._registry: + msg = f"Backend '{name}' is already registered" + raise ValueError(msg) + log.debug("New runtime backend registered: %s", name) + self._registry[name] = factory + if is_default: + self.set_default(name) + + def set_default(self, name: BackendType) -> None: + """Set the default runtime backend, which should be used unless specified otherwise.""" + log.debug("Configured default backend: %s", name) + self._default = name + + def get_default(self) -> BackendType | None: + """Get the configured default runtime backend type.""" + return self._default + + def get(self, name: BackendType | None = None) -> BackendFactory: + """Retrieve a backend factory by its registered name.""" + name = self._default if name is None else name + if name is None: + raise ValueError("No default backend configured or backend name given") + + try: + return self._registry[name] + except KeyError as e: + msg = f"Unknown backend '{name}'" + raise KeyError(msg) from e + + def create( + self, name: BackendType, *args: list[Any], **kwargs: dict[str, Any] + ) -> RuntimeBackend: + """Instantiate a runtime backend by its registered name.""" + factory = self.get(name) + return factory(*args, **kwargs) + + def available(self) -> list[str]: + """Return names of all registered backends.""" + return sorted(self._registry.keys()) + + +# Default global registry +backend_registry = BackendRegistry() + + +def register_backend(name: BackendType, cls: type[RuntimeBackend]) -> None: + """Register a standard runtime backend.""" + backend_registry.register(name, cls) + + +# Helper for registering runtime backends for legacy launchers. +# Can be removed when all legacy launchers are migrated. +def register_legacy_launcher_backend(name: BackendType, launcher_cls: type[Launcher]) -> None: + """Register a legacy launcher class as a runtime backend by wrapping it in an adapter.""" + + def factory(*args: list[Any], **kwargs: dict[str, Any]) -> RuntimeBackend: + return LegacyLauncherAdapter(launcher_cls, *args, **kwargs) + + backend_registry.register(name, factory) + + +# Register built-in backends. TODO: migrate the legacy launchers to runtime backends. +register_backend(BackendType("local"), LocalRuntimeBackend) +register_legacy_launcher_backend(BackendType("fake"), FakeLauncher) +register_legacy_launcher_backend(BackendType("lsf"), LsfLauncher) +register_legacy_launcher_backend(BackendType("nc"), NcLauncher) +register_legacy_launcher_backend(BackendType("sge"), SgeLauncher) +register_legacy_launcher_backend(BackendType("slurm"), SlurmLauncher) + + +# TODO: Hack to support site-specific closed source custom launchers. These should be migrated to +# use the registry / a plugin system, and then the below should be dropped. +try: + from edacloudlauncher.EdaCloudLauncher import ( # type: ignore[report-missing-imports] + EdaCloudLauncher, + ) + + register_legacy_launcher_backend(BackendType("edacloud"), EdaCloudLauncher) +except ImportError: + pass From 401cbe011c0929cabdc7b7e3c9c533a3e2c1faa8 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 27 Mar 2026 04:07:45 +0000 Subject: [PATCH 11/15] fix: check if coverage report exists in `CovReport` In the `CovReport.post_finish` deploy callback, check if the coverage report actually exists and skip / do nothing if not. This is needed as a temporary workaround for the fake launcher, which does not generate this kind of report, but does now call `post_finish` in its backend due to this being implemented on the base backend. Signed-off-by: Alex Jones --- src/dvsim/job/deploy.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/dvsim/job/deploy.py b/src/dvsim/job/deploy.py index 9bfd409d..b75ca63b 100644 --- a/src/dvsim/job/deploy.py +++ b/src/dvsim/job/deploy.py @@ -835,13 +835,14 @@ def callback(status: JobStatus) -> None: If the extraction fails, an appropriate exception is raised, which must be caught by the caller to mark the job as a failure. """ - if self.dry_run or status != JobStatus.PASSED: + cov_report_path = Path(self.cov_report_txt) + if self.dry_run or status != JobStatus.PASSED or not cov_report_path.exists(): return plugin = get_sim_tool_plugin(tool=self.sim_cfg.tool) results, self.cov_total = plugin.get_cov_summary_table( - cov_report_path=self.cov_report_txt, + cov_report_path=cov_report_path, ) for tup in zip(*results, strict=False): From e58fd15e00eb3d9899a0229a54eb8fb246499d47 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 27 Mar 2026 04:10:31 +0000 Subject: [PATCH 12/15] feat: implement initial async scheduler integration Allow you to run DVSim using the new async scheduler by using a switch - either modify the code to make the conditional true, or set e.g. `EXPERIMENTAL_ENABLE_ASYNC_SCHEDULER=1 dvsim ...` before your dvsim command to enable the new scheduler. Note: this currently does not have the status directory linking (JobSpec.links) hooked up, nor the status printing (the StatusPrinter logic, not the logs), or the instrumentation. It just performs the core scheduling functionality. Signed-off-by: Alex Jones --- src/dvsim/flow/base.py | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/src/dvsim/flow/base.py b/src/dvsim/flow/base.py index 847650f1..07a7f4e7 100644 --- a/src/dvsim/flow/base.py +++ b/src/dvsim/flow/base.py @@ -4,6 +4,7 @@ """Flow config base class.""" +import asyncio import json import os import pprint @@ -17,9 +18,11 @@ from dvsim import instrumentation from dvsim.flow.hjson import set_target_attribute -from dvsim.job.data import CompletedJobStatus +from dvsim.job.data import CompletedJobStatus, JobSpec from dvsim.launcher.factory import get_launcher_cls from dvsim.logging import log +from dvsim.runtime.registry import backend_registry +from dvsim.scheduler.async_core import Scheduler as AsyncScheduler from dvsim.scheduler.core import Scheduler from dvsim.utils import ( find_and_substitute_wildcards, @@ -33,6 +36,10 @@ __all__ = ("FlowCfg",) +# Temporary: set to 1 to enable experimental use of the async scheduler (not yet fully integrated) +EXPERIMENTAL_ENABLE_ASYNC_SCHEDULER = os.environ.get("EXPERIMENTAL_ENABLE_ASYNC_SCHEDULER", None) + + # Interface class for extensions. class FlowCfg(ABC): """Base class for the different flows supported by dvsim.py. @@ -442,12 +449,37 @@ def deploy_objects(self) -> Sequence[CompletedJobStatus]: ), ) + if EXPERIMENTAL_ENABLE_ASYNC_SCHEDULER: + return asyncio.run(self.run_scheduler(jobs)) + return Scheduler( items=jobs, launcher_cls=get_launcher_cls(), interactive=self.interactive, ).run() + async def run_scheduler(self, jobs: list[JobSpec]) -> list[CompletedJobStatus]: + """Run the scheduler with the given set of job specifications.""" + # Create the runtime backends. TODO: support multiple runtime backends at once + default_backend_factory = backend_registry.get() + default_backend = default_backend_factory() + + scheduler = AsyncScheduler( + jobs=jobs, + backends={default_backend.name: default_backend}, + default_backend=default_backend.name, + max_parallelism=self.args.max_parallel, + # TODO: introduce a better prioritization function that accounts for timeout + ) + + # Run the scheduler and cleanup + try: + results = await scheduler.run() + finally: + await default_backend.close() + + return results + @abstractmethod def gen_results(self, results: Sequence[CompletedJobStatus]) -> None: """Generate flow results. From 1d1a2cfb3b6addebb4a83f2dac9c6bf92fb7c83b Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 27 Mar 2026 04:13:26 +0000 Subject: [PATCH 13/15] feat: connect instrumentation to the new async scheduler This is easily added outside of the scheduler itself via the new async scheduler's callbacks. Signed-off-by: Alex Jones --- src/dvsim/flow/base.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/dvsim/flow/base.py b/src/dvsim/flow/base.py index 07a7f4e7..44e14939 100644 --- a/src/dvsim/flow/base.py +++ b/src/dvsim/flow/base.py @@ -472,12 +472,29 @@ async def run_scheduler(self, jobs: list[JobSpec]) -> list[CompletedJobStatus]: # TODO: introduce a better prioritization function that accounts for timeout ) + # Setup instrumentation + inst = instrumentation.get() + if inst is not None: + inst.start() + + # Add instrumentation hooks + scheduler.add_run_start_callback(inst.on_scheduler_start) + scheduler.add_run_end_callback(inst.on_scheduler_end) + scheduler.add_job_status_change_callback( + lambda spec, _old, new: inst.on_job_status_change(spec, new) + ) + # Run the scheduler and cleanup try: results = await scheduler.run() finally: await default_backend.close() + # Finalize instrumentation + if inst is not None: + inst.stop() + instrumentation.flush() + return results @abstractmethod From 4eb37234ab5f7c7b1c87e9d951c592de17f7ce00 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 27 Mar 2026 04:17:28 +0000 Subject: [PATCH 14/15] feat: introduce `LogManager` and connect it to the async scheduler This replicates the scratch directory symlinking behaviour that is implemented independently by each of the launcher classes. This didn't make much sense to be on the launchers - the scheduler should be in control of the overall job status, not the launcher, which should just reports pass/fail/killed. It also doesn't make sense to live in the scheduler - ideally we want to keep the core logic separate. Instead, make these symlink directories via a new minimal `LogManager` which acts as an observer to scheduler job status changes and creates soft links as before. Signed-off-by: Alex Jones --- src/dvsim/flow/base.py | 7 +++++ src/dvsim/scheduler/log_manager.py | 50 ++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+) create mode 100644 src/dvsim/scheduler/log_manager.py diff --git a/src/dvsim/flow/base.py b/src/dvsim/flow/base.py index 44e14939..b52c8635 100644 --- a/src/dvsim/flow/base.py +++ b/src/dvsim/flow/base.py @@ -24,6 +24,7 @@ from dvsim.runtime.registry import backend_registry from dvsim.scheduler.async_core import Scheduler as AsyncScheduler from dvsim.scheduler.core import Scheduler +from dvsim.scheduler.log_manager import LogManager from dvsim.utils import ( find_and_substitute_wildcards, rm_path, @@ -472,6 +473,12 @@ async def run_scheduler(self, jobs: list[JobSpec]) -> list[CompletedJobStatus]: # TODO: introduce a better prioritization function that accounts for timeout ) + # Add log manager hooks + log_manager = LogManager() + scheduler.add_job_status_change_callback( + lambda spec, _old, new: log_manager.on_job_status_change(spec, new) + ) + # Setup instrumentation inst = instrumentation.get() if inst is not None: diff --git a/src/dvsim/scheduler/log_manager.py b/src/dvsim/scheduler/log_manager.py new file mode 100644 index 00000000..023eb788 --- /dev/null +++ b/src/dvsim/scheduler/log_manager.py @@ -0,0 +1,50 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. +# SPDX-License-Identifier: Apache-2.0 + +"""DVSim Scratch Job Log Manager.""" + +from pathlib import Path + +from dvsim.job.data import JobSpec +from dvsim.job.status import JobStatus +from dvsim.utils import mk_symlink, rm_path + + +class LogManager: + """Observes job state changes in the scheduler and manages scratch output directory links.""" + + def __init__(self) -> None: + """Construct a LogManager.""" + # Mapping from job ID -> last symlinked status + self._links: dict[str, JobStatus] = {} + + def _link_job_output_directory(self, job: JobSpec, status: JobStatus) -> None: + """Symbolic (soft) link the job's output directory based on its status. + + The status directories (e.g. `passed/`, `failed/`) in the scratch area then provide a + quick mechanism for traversing the list of jobs that were executed. + """ + old_status = self._links.get(job.id, None) + if old_status == status: + return + + link_dest = Path(job.links[status], job.qual_name) + self._links[job.id] = status + + # If the symlink already exists (e.g. created by legacy launcher), just keep it. + # TODO: when all launchers are migrated this check can be removed. + if link_dest.exists() and link_dest.is_symlink(): + return + mk_symlink(path=job.odir, link=link_dest) + + # Delete the previous symlink if it exists + if old_status is not None: + old_link_dest = Path(job.links[old_status], job.qual_name) + rm_path(old_link_dest) + + def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None: + """Notify the LogManager when a job status has changed.""" + # Only create linked output directories for defined links (terminal state or running). + if status in job.links: + self._link_job_output_directory(job, status) From 8de6347546a4bc71a7619ae733672660b28556f1 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 27 Mar 2026 04:34:30 +0000 Subject: [PATCH 15/15] feat: integrate async status printers with the async scheduler This enables the new async status printers to be used with the new scheduler. The connections are via the callbacks defined on the scheduler, independent from the scheduler itself. Signed-off-by: Alex Jones --- src/dvsim/cli/run.py | 13 +++++++++++-- src/dvsim/flow/base.py | 10 ++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/src/dvsim/cli/run.py b/src/dvsim/cli/run.py index 5e9f2bfb..ed880fa3 100644 --- a/src/dvsim/cli/run.py +++ b/src/dvsim/cli/run.py @@ -45,9 +45,13 @@ from dvsim.logging import LOG_LEVELS, configure_logging, log from dvsim.runtime.registry import BackendType, backend_registry from dvsim.scheduler.async_status_printer import StatusPrinter +from dvsim.scheduler.async_status_printer import get_status_printer as get_async_status_printer from dvsim.scheduler.status_printer import get_status_printer from dvsim.utils import TS_FORMAT, TS_FORMAT_LONG, Timer, rm_path, run_cmd_with_timeout +# Temporary: set to 1 to enable experimental use of the async scheduler (not yet fully integrated) +EXPERIMENTAL_ENABLE_ASYNC_SCHEDULER = os.environ.get("EXPERIMENTAL_ENABLE_ASYNC_SCHEDULER", None) + # The different categories that can be passed to the --list argument. _LIST_CATEGORIES = ["build_modes", "run_modes", "tests", "regressions"] @@ -972,8 +976,13 @@ def main(argv: list[str] | None = None) -> None: # Now that we have printed the results from the scheduler, we close the # status printer, to ensure the status remains relevant in the UI context # (for applicable status printers). - status_printer = get_status_printer(args.interactive) - status_printer.exit() + if EXPERIMENTAL_ENABLE_ASYNC_SCHEDULER: + if not args.interactive: + status_printer = get_async_status_printer() + status_printer.exit() + else: + status_printer = get_status_printer(args.interactive) + status_printer.exit() else: log.error("Nothing to run!") diff --git a/src/dvsim/flow/base.py b/src/dvsim/flow/base.py index b52c8635..ce3a861b 100644 --- a/src/dvsim/flow/base.py +++ b/src/dvsim/flow/base.py @@ -23,6 +23,7 @@ from dvsim.logging import log from dvsim.runtime.registry import backend_registry from dvsim.scheduler.async_core import Scheduler as AsyncScheduler +from dvsim.scheduler.async_status_printer import create_status_printer from dvsim.scheduler.core import Scheduler from dvsim.scheduler.log_manager import LogManager from dvsim.utils import ( @@ -473,6 +474,15 @@ async def run_scheduler(self, jobs: list[JobSpec]) -> list[CompletedJobStatus]: # TODO: introduce a better prioritization function that accounts for timeout ) + if not self.interactive: + status_printer = create_status_printer(jobs) + + # Add status printer hooks + scheduler.add_run_start_callback(status_printer.start) + scheduler.add_job_status_change_callback(status_printer.update_status) + scheduler.add_run_end_callback(status_printer.stop) + scheduler.add_kill_signal_callback(status_printer.pause) + # Add log manager hooks log_manager = LogManager() scheduler.add_job_status_change_callback(