From 7df26a8c17f28a385354fe28d764d4a1a6b757d3 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 27 Mar 2026 05:27:36 +0000 Subject: [PATCH 1/5] feat: switch to use the async scheduler interface This commit makes the full transition to use the new async scheduler in place of the old scheduler, instead of hiding it behind an experimental environment variable. Signed-off-by: Alex Jones --- src/dvsim/cli/run.py | 26 ++++++-------------------- src/dvsim/flow/base.py | 15 +++------------ src/dvsim/job/status.py | 2 -- 3 files changed, 9 insertions(+), 34 deletions(-) diff --git a/src/dvsim/cli/run.py b/src/dvsim/cli/run.py index ff42cae1..467777a8 100644 --- a/src/dvsim/cli/run.py +++ b/src/dvsim/cli/run.py @@ -45,9 +45,7 @@ from dvsim.logging import LOG_LEVELS, configure_logging, log from dvsim.runtime.backend import RuntimeBackend from dvsim.runtime.registry import BackendType, backend_registry -from dvsim.scheduler.async_status_printer import StatusPrinter -from dvsim.scheduler.async_status_printer import get_status_printer as get_async_status_printer -from dvsim.scheduler.status_printer import get_status_printer +from dvsim.scheduler.async_status_printer import StatusPrinter, get_status_printer from dvsim.utils import TS_FORMAT, TS_FORMAT_LONG, Timer, rm_path, run_cmd_with_timeout # The different categories that can be passed to the --list argument. @@ -785,10 +783,11 @@ def parse_args(argv: list[str] | None = None): dvg.add_argument( "--print-interval", "-pi", - type=int, + type=float, default=10, metavar="N", - help="Print status every N seconds.", + help="Print status every N seconds (default %(default)d). A zero value means that every" + " job status change will cause a print.", ) dvg.add_argument( @@ -817,17 +816,8 @@ def parse_args(argv: list[str] | None = None): help=("Use a fake launcher that generates random results"), ) - dvg.add_argument( - "--experimental-enable-async-scheduler", - action="store_true", - help="Enable experimental use of the async scheduler (not fully integrated).", - ) - args = parser.parse_args(argv) if argv else parser.parse_args() - if args.experimental_enable_async_scheduler: - log.warning("DVSim configured to use new experimental async scheduler.") - # Check conflicts # interactive and remote, r if args.interactive and args.remote: @@ -983,12 +973,8 @@ def main(argv: list[str] | None = None) -> None: # Now that we have printed the results from the scheduler, we close the # status printer, to ensure the status remains relevant in the UI context # (for applicable status printers). - if args.experimental_enable_async_scheduler: - if not args.interactive: - status_printer = get_async_status_printer() - status_printer.exit() - else: - status_printer = get_status_printer(args.interactive) + if not args.interactive: + status_printer = get_status_printer() status_printer.exit() else: diff --git a/src/dvsim/flow/base.py b/src/dvsim/flow/base.py index 1167cec8..2d2c6ffe 100644 --- a/src/dvsim/flow/base.py +++ b/src/dvsim/flow/base.py @@ -20,13 +20,11 @@ from dvsim.flow.hjson import set_target_attribute from dvsim.job.data import CompletedJobStatus, JobSpec from dvsim.job.status import JobStatus -from dvsim.launcher.factory import get_launcher_cls from dvsim.logging import log from dvsim.runtime.fake import FakeRuntimeBackend from dvsim.runtime.registry import backend_registry -from dvsim.scheduler.async_core import Scheduler as AsyncScheduler +from dvsim.scheduler.async_core import Scheduler from dvsim.scheduler.async_status_printer import create_status_printer -from dvsim.scheduler.core import Scheduler from dvsim.scheduler.log_manager import LogManager from dvsim.utils import ( find_and_substitute_wildcards, @@ -465,14 +463,7 @@ def deploy_objects(self) -> Sequence[CompletedJobStatus]: ), ) - if self.args.experimental_enable_async_scheduler: - return asyncio.run(self.run_scheduler(jobs)) - - return Scheduler( - items=jobs, - launcher_cls=get_launcher_cls(), - interactive=self.interactive, - ).run() + return asyncio.run(self.run_scheduler(jobs)) async def run_scheduler(self, jobs: list[JobSpec]) -> list[CompletedJobStatus]: """Run the scheduler with the given set of job specifications.""" @@ -485,7 +476,7 @@ async def run_scheduler(self, jobs: list[JobSpec]) -> list[CompletedJobStatus]: max_timeout = max((job.timeout_mins for job in jobs if job.timeout_mins), default=0) - scheduler = AsyncScheduler( + scheduler = Scheduler( jobs=jobs, backends={default_backend.name: default_backend}, default_backend=default_backend.name, diff --git a/src/dvsim/job/status.py b/src/dvsim/job/status.py index 457f3bc2..efe71d2a 100644 --- a/src/dvsim/job/status.py +++ b/src/dvsim/job/status.py @@ -12,8 +12,6 @@ class JobStatus(Enum): """Status of a Job.""" - # SCHEDULED is currently unused in the old sync scheduler, there `SCHEDULED` and `QUEUED` - # are combined under `QUEUED`. It is used only in the new async scheduler. SCHEDULED = auto() # Waiting for dependencies QUEUED = auto() # Dependencies satisfied, waiting to be dispatched RUNNING = auto() # Dispatched to a backend and actively executing From 895d90dd0e773cb91bb6537acb15d9b44b3090e2 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 27 Mar 2026 05:30:07 +0000 Subject: [PATCH 2/5] feat: remove the old scheduler and status printer These have now been replaced by equivalent / improved asynchronous interfaces. Signed-off-by: Alex Jones --- src/dvsim/scheduler/core.py | 763 -------------------------- src/dvsim/scheduler/status_printer.py | 292 ---------- 2 files changed, 1055 deletions(-) delete mode 100644 src/dvsim/scheduler/core.py delete mode 100644 src/dvsim/scheduler/status_printer.py diff --git a/src/dvsim/scheduler/core.py b/src/dvsim/scheduler/core.py deleted file mode 100644 index b063c7fb..00000000 --- a/src/dvsim/scheduler/core.py +++ /dev/null @@ -1,763 +0,0 @@ -# Copyright lowRISC contributors (OpenTitan project). -# Licensed under the Apache License, Version 2.0, see LICENSE for details. -# SPDX-License-Identifier: Apache-2.0 - -"""Job scheduler.""" - -import contextlib -import os -import selectors -from collections.abc import ( - Callable, - Mapping, - MutableMapping, - MutableSequence, - MutableSet, - Sequence, -) -from itertools import dropwhile -from signal import SIGINT, SIGTERM, signal -from types import FrameType -from typing import TYPE_CHECKING, Any - -from dvsim import instrumentation -from dvsim.instrumentation import NoOpInstrumentation -from dvsim.job.data import CompletedJobStatus, JobSpec, JobStatusInfo -from dvsim.job.status import JobStatus -from dvsim.launcher.base import Launcher, LauncherBusyError, LauncherError -from dvsim.logging import log -from dvsim.scheduler.status_printer import get_status_printer -from dvsim.utils.timer import Timer - -if TYPE_CHECKING: - from dvsim.flow.base import FlowCfg - - -def total_sub_items( - d: Mapping[str, Sequence[str]] | Mapping["FlowCfg", Sequence[JobSpec]], -) -> int: - """Return the total number of sub items in a mapping. - - Given a dict whose key values are lists, return sum of lengths of - these lists. - """ - return sum(len(v) for v in d.values()) - - -def get_next_item(arr: Sequence, index: int) -> tuple[Any, int]: - """Perpetually get an item from a list. - - Returns the next item on the list by advancing the index by 1. If the index - is already the last item on the list, it loops back to the start, thus - implementing a circular list. - - Args: - arr: subscriptable list. - index: index of the last item returned. - - Returns: - (item, index) if successful. - - Raises: - IndexError if arr is empty. - - """ - index += 1 - try: - item = arr[index] - except IndexError: - index = 0 - try: - item = arr[index] - except IndexError: - msg = "List is empty!" - raise IndexError(msg) from None - - return item, index - - -class Scheduler: - """An object that runs one or more jobs from JobSpec items.""" - - def __init__( - self, - items: Sequence[JobSpec], - launcher_cls: type[Launcher], - *, - interactive: bool = False, - ) -> None: - """Initialise a job scheduler. - - Args: - items: sequence of jobs to deploy. - launcher_cls: Launcher class to use to deploy the jobs. - interactive: launch the tools in interactive mode. - - """ - # Start any instrumentation - inst = instrumentation.get() - self._instrumentation = NoOpInstrumentation() if inst is None else inst - self._instrumentation.start() - - self._jobs: Mapping[str, JobSpec] = {i.full_name: i for i in items} - - # 'scheduled[target][cfg]' is a list of JobSpec object names for the chosen - # target and cfg. As items in _scheduled are ready to be run (once - # their dependencies pass), they are moved to the _queued list, where - # they wait until slots are available for them to be dispatched. - # When all items (in all cfgs) of a target are done, it is removed from - # this dictionary. - self._scheduled: MutableMapping[str, MutableMapping[str, MutableSequence[str]]] = {} - self.add_to_scheduled(jobs=self._jobs) - - # Print status periodically using an external status printer. - self._status_printer = get_status_printer(interactive) - self._status_printer.print_header() - - # Sets of items, split up by their current state. The sets are - # disjoint and their union equals the keys of self.item_status. - # _queued is a list so that we dispatch things in order (relevant - # for things like tests where we have ordered things cleverly to - # try to see failures early). They are maintained for each target. - - # The list of available targets and the list of running items in each - # target are polled in a circular fashion, looping back to the start. - # This is done to allow us to poll a smaller subset of jobs rather than - # the entire regression. We keep rotating through our list of running - # items, picking up where we left off on the last poll. - self._targets: Sequence[str] = list(self._scheduled.keys()) - self._total: MutableMapping[str, int] = {} - - self._queued: MutableMapping[str, MutableSequence[str]] = {} - self._running: MutableMapping[str, MutableSequence[str]] = {} - - self._passed: MutableMapping[str, MutableSet[str]] = {} - self._failed: MutableMapping[str, MutableSet[str]] = {} - self._killed: MutableMapping[str, MutableSet[str]] = {} - - self._last_target_polled_idx = -1 - self._last_item_polled_idx = {} - - for target in self._scheduled: - self._queued[target] = [] - self._running[target] = [] - - self._passed[target] = set() - self._failed[target] = set() - self._killed[target] = set() - - self._total[target] = total_sub_items(self._scheduled[target]) - self._last_item_polled_idx[target] = -1 - - # Stuff for printing the status. - width = len(str(self._total[target])) - field_fmt = f"{{:0{width}d}}" - self._msg_fmt = ( - f"Q: {field_fmt}, R: {field_fmt}, P: {field_fmt}, " - f"F: {field_fmt}, K: {field_fmt}, T: {field_fmt}" - ) - msg = self._msg_fmt.format(0, 0, 0, 0, 0, self._total[target]) - self._status_printer.init_target(target=target, msg=msg) - - # A map from the job names tracked by this class to their - # current status, corresponding to membership in the dicts - # above. This is not per-target. - self.job_status: MutableMapping[str, JobStatus] = {} - - # Create the launcher instance for all items. - self._launchers: Mapping[str, Launcher] = { - full_name: launcher_cls(job_spec) for full_name, job_spec in self._jobs.items() - } - - # The chosen launcher class. This allows us to access launcher - # variant-specific settings such as max parallel jobs & poll rate. - self._launcher_cls: type[Launcher] = launcher_cls - - def _handle_exit_signal(self, last_received_signal: int, handler: Callable) -> None: - """Handle a received exit (SIGINT/SIGTERM signal) in the main scheduler loop. - - On either signal, this will tell runners to quit and cancel future jobs. - On receiving a SIGINT specifically, this re-installs the old signal handler - such that subsequent SIGINT signals will kill the process (non-gracefully). - """ - log.info( - "Received signal %s. Exiting gracefully.", - last_received_signal, - ) - - if last_received_signal == SIGINT: - log.info( - "Send another to force immediate quit (but you may " - "need to manually kill child processes)", - ) - - # Restore old handler to catch a second SIGINT - signal(SIGINT, handler) - - self._kill() - - def run(self) -> Sequence[CompletedJobStatus]: - """Run all scheduled jobs and return the results. - - Returns the results (status) of all items dispatched for all - targets and cfgs. - """ - # Notify instrumentation that the scheduler started - self._instrumentation.on_scheduler_start() - timer = Timer() - - # On SIGTERM or SIGINT, tell the runner to quit. - # On a second SIGINT specifically, die. - sel = selectors.DefaultSelector() - signal_rfd, signal_wfd = os.pipe() - sel.register(signal_rfd, selectors.EVENT_READ) - last_received_signal: int | None = None - - def on_signal(signal_received: int, _: FrameType | None) -> None: - # To allow async-safe-signal logic where signals can be handled - # while sleeping, we use a selector to perform a blocking wait, - # and signal the event through a pipe. We then set a flag with - # the received signal. Like this, we can receive a signal - # at any point, and it can also interrupt the poll wait to take - # immediate effect. - nonlocal last_received_signal - last_received_signal = signal_received - os.write(signal_wfd, b"\x00") - - # Install the SIGINT and SIGTERM handlers before scheduling jobs. - old_handler = signal(SIGINT, on_signal) - signal(SIGTERM, on_signal) - - # Enqueue all items of the first target. - self._enqueue_successors(None) - - try: - while True: - if last_received_signal is not None: - self._handle_exit_signal(last_received_signal, old_handler) - last_received_signal = None - - hms = timer.hms() - changed = self._poll(hms) or timer.check_time() - self._dispatch(hms) - if changed and self._check_if_done(hms): - break - - # Wait between each poll, except we may be woken by a signal. - sel.select(timeout=self._launcher_cls.poll_freq) - - finally: - signal(SIGINT, old_handler) - - # Stop the status printer, but don't exit/close it yet, for reporting purposes. - # That will be done by the CLI upon exiting - self._status_printer.stop() - - # Finish instrumentation and generate the instrumentation report - self._instrumentation.on_scheduler_end() - self._instrumentation.stop() - instrumentation.flush() - - # We got to the end without anything exploding. Return the results. - results = [] - for name, status in self.job_status.items(): - launcher = self._launchers[name] - job_spec = self._jobs[name] - - fail_msg = None - if launcher.fail_msg is not None: - launcher_fail = launcher.fail_msg - lines = None if launcher_fail.line_number is None else [launcher_fail.line_number] - fail_msg = JobStatusInfo( - message=launcher_fail.message, lines=lines, context=launcher_fail.context - ) - - results.append( - CompletedJobStatus( - name=job_spec.name, - job_type=job_spec.job_type, - seed=job_spec.seed, - block=job_spec.block, - tool=job_spec.tool, - workspace_cfg=job_spec.workspace_cfg, - full_name=name, - qual_name=job_spec.qual_name, - target=job_spec.target, - log_path=job_spec.log_path, - job_runtime=launcher.job_runtime.with_unit("s").get()[0], - simulated_time=launcher.simulated_time.with_unit("us").get()[0], - status=status, - fail_msg=fail_msg, - ) - ) - - return results - - def add_to_scheduled(self, jobs: Mapping[str, JobSpec]) -> None: - """Add jobs to the schedule. - - Args: - jobs: the jobs to add to the schedule. - - """ - for full_name, job_spec in jobs.items(): - target_dict = self._scheduled.setdefault(job_spec.target, {}) - cfg_list = target_dict.setdefault(job_spec.block.name, []) - - if job_spec not in cfg_list: - cfg_list.append(full_name) - - def _unschedule_item(self, job_name: str) -> None: - """Remove deploy item from the schedule.""" - job = self._jobs[job_name] - target_dict = self._scheduled[job.target] - cfg_list = target_dict.get(job.block.name) - - if cfg_list is not None: - with contextlib.suppress(ValueError): - cfg_list.remove(job_name) - - # When all items in _scheduled[target][cfg] are finally removed, - # the cfg key is deleted. - if not cfg_list: - del target_dict[job.block.name] - - def _enqueue_successors(self, job_name: str | None = None) -> None: - """Move an item's successors from _scheduled to _queued. - - 'item' is the recently run job that has completed. If None, then we - move all available items in all available cfgs in _scheduled's first - target. If 'item' is specified, then we find its successors and move - them to _queued. - """ - for next_job_name in self._get_successors(job_name): - target = self._jobs[next_job_name].target - if next_job_name in self.job_status or next_job_name in self._queued[target]: - msg = f"Job {next_job_name} already scheduled" - raise RuntimeError(msg) - - self.job_status[next_job_name] = JobStatus.QUEUED - self._queued[target].append(next_job_name) - self._unschedule_item(next_job_name) - - def _cancel_successors(self, job_name: str) -> None: - """Cancel an item's successors. - - Recursively move them from _scheduled or _queued to _killed. - - Args: - job_name: job whose successors are to be canceled. - - """ - items = list(self._get_successors(job_name)) - while items: - next_item = items.pop() - self._cancel_item(next_item, cancel_successors=False) - items.extend(self._get_successors(next_item)) - - def _get_successor_target(self, job_name: str) -> str | None: - """Find the first target in the scheduled list that follows the target of a given job. - - Args: - job_name: name of the job (to find the successor target of). - - Returns: - the successor, or None if no such target exists or there is no successor. - - """ - job: JobSpec = self._jobs[job_name] - - if job.target not in self._scheduled: - msg = f"Scheduler does not contain target {job.target}" - raise KeyError(msg) - - # Find the first target that follows the target in the scheduled list. - target_iter = dropwhile(lambda x: x != job.target, self._scheduled) - next(target_iter, None) - return next(target_iter, None) - - def _get_successors(self, job_name: str | None = None) -> Sequence[str]: - """Find immediate successors of an item. - - We choose the target that follows the item's current target and find - the list of successors whose dependency list contains "job_name". If - "job_name" is None, we pick successors from all cfgs, else we pick - successors only from the cfg to which the item belongs. - - Args: - job_name: name of the job - - Returns: - list of the jobs successors, or an empty list if there are none. - - """ - if job_name is None: - target = next(iter(self._scheduled), None) - cfgs = set() if target is None else set(self._scheduled[target]) - else: - target = self._get_successor_target(job_name) - job: JobSpec = self._jobs[job_name] - cfgs = {job.block.name} - - if target is None: - return () - - # Find item's successors that can be enqueued. We assume here that - # only the immediately succeeding target can be enqueued at this - # time. - successors = [] - for cfg in cfgs: - for next_item in self._scheduled[target][cfg]: - if job_name is not None: - job = self._jobs[next_item] - if not job.dependencies: - raise RuntimeError( - "Job item exists but the next item's dependency list is empty?" - ) - if job_name not in job.dependencies: - continue - - if self._ok_to_enqueue(next_item): - successors.append(next_item) - - return successors - - def _ok_to_enqueue(self, job_name: str) -> bool: - """Check if all dependencies jobs are completed. - - Args: - job_name: name of job. - - Returns: - true if ALL dependencies of item are complete. - - """ - for dep in self._jobs[job_name].dependencies: - # Ignore dependencies that were not scheduled to run. - if dep not in self._jobs: - continue - - # Has the dep even been enqueued? - if dep not in self.job_status: - return False - - # Has the dep completed? - if not self.job_status[dep].is_terminal: - return False - - return True - - def _ok_to_run(self, job_name: str) -> bool: - """Check if a job is ready to start. - - The item's needs_all_dependencies_passing setting is used to figure - out whether we can run this item or not, based on its dependent jobs' - statuses. - - Args: - job_name: name of the job to check - - Returns: - true if the required dependencies have passed. - - """ - job: JobSpec = self._jobs[job_name] - # 'item' can run only if its dependencies have passed (their results - # should already show up in the item to status map). - for dep_name in job.dependencies: - # Ignore dependencies that were not scheduled to run. - if dep_name not in self._jobs: - continue - - dep_status = self.job_status[dep_name] - if not dep_status.is_terminal: - msg = f"Expected dependent job {dep_name} to be ended, not {dep_status.name}." - raise ValueError(msg) - - if job.needs_all_dependencies_passing: - if dep_status != JobStatus.PASSED: - return False - elif dep_status == JobStatus.PASSED: - return True - - return job.needs_all_dependencies_passing - - def _poll(self, hms: str) -> bool: - """Check for running items that have finished. - - Returns: - True if something changed. - - """ - max_poll = min( - self._launcher_cls.max_poll, - total_sub_items(self._running), - ) - - # If there are no jobs running, we are likely done (possibly because - # of a SIGINT). Since poll() was called anyway, signal that something - # has indeed changed. - if not max_poll: - return True - - changed = False - while max_poll: - target, self._last_target_polled_idx = get_next_item( - self._targets, - self._last_target_polled_idx, - ) - - while self._running[target] and max_poll: - max_poll -= 1 - job_name, self._last_item_polled_idx[target] = get_next_item( - self._running[target], - self._last_item_polled_idx[target], - ) - try: - status = self._launchers[job_name].poll() - except LauncherError as e: - log.error("Error when dispatching target: %s", str(e)) - status = JobStatus.KILLED - level = log.VERBOSE - - if status == JobStatus.RUNNING: - continue - - if status == JobStatus.PASSED: - self._passed[target].add(job_name) - self._instrumentation.on_job_status_change(self._jobs[job_name], status) - - elif status == JobStatus.FAILED: - self._failed[target].add(job_name) - self._instrumentation.on_job_status_change(self._jobs[job_name], status) - level = log.ERROR - - else: - # Killed, still Queued, or some error when dispatching. - self._killed[target].add(job_name) - self._instrumentation.on_job_status_change(self._jobs[job_name], status.KILLED) - level = log.ERROR - - self._running[target].pop(self._last_item_polled_idx[target]) - self._last_item_polled_idx[target] -= 1 - self.job_status[job_name] = status - - log.log( - level, - "[%s]: [%s]: [status] [%s: %s]", - hms, - target, - job_name, - status.shorthand, - ) - - # Enqueue item's successors regardless of its status. - # - # It may be possible that a failed item's successor may not - # need all of its dependents to pass (if it has other dependent - # jobs). Hence we enqueue all successors rather than canceling - # them right here. We leave it to _dispatch() to figure out - # whether an enqueued item can be run or not. - self._enqueue_successors(job_name) - changed = True - - return changed - - def _dispatch_job(self, hms: str, target: str, job_name: str) -> None: - """Dispatch the named queued job. - - Args: - hms: time as a string formatted in hh:mm:ss - target: the target to dispatch this job to - job_name: the name of the job to dispatch - - """ - try: - self._launchers[job_name].launch() - - except LauncherError: - log.exception("Error launching %s", job_name) - self._kill_item(job_name) - - except LauncherBusyError: - log.exception("Launcher busy") - - self._queued[target].append(job_name) - - log.verbose( - "[%s]: [%s]: [requeued]: %s", - hms, - target, - job_name, - ) - return - - self._running[target].append(job_name) - self.job_status[job_name] = JobStatus.RUNNING - self._instrumentation.on_job_status_change(self._jobs[job_name], JobStatus.RUNNING) - - def _dispatch(self, hms: str) -> None: - """Dispatch some queued items if possible. - - Args: - hms: time as a string formatted in hh:mm:ss - - """ - slots = self._launcher_cls.max_parallel - total_sub_items(self._running) - if slots <= 0: - return - - # Compute how many slots to allocate to each target based on their - # weights. - sum_weight = 0 - slots_filled = 0 - total_weight = sum( - self._jobs[self._queued[t][0]].weight for t in self._queued if self._queued[t] - ) - - for target in self._scheduled: - if not self._queued[target]: - continue - - # N slots are allocated to M targets each with W(m) weights with - # the formula: - # - # N(m) = N * W(m) / T, where, - # T is the sum total of all weights. - # - # This is however, problematic due to fractions. Even after - # rounding off to the nearest digit, slots may not be fully - # utilized (one extra left). An alternate approach that avoids this - # problem is as follows: - # - # N(m) = (N * S(W(m)) / T) - F(m), where, - # S(W(m)) is the running sum of weights upto current target m. - # F(m) is the running total of slots filled. - # - # The computed slots per target is nearly identical to the first - # solution, except that it prioritizes the slot allocation to - # targets that are earlier in the list such that in the end, all - # slots are fully consumed. - sum_weight += self._jobs[self._queued[target][0]].weight - target_slots = round((slots * sum_weight) / total_weight) - slots_filled - if target_slots <= 0: - continue - slots_filled += target_slots - - to_dispatch = [] - while self._queued[target] and target_slots > 0: - next_item = self._queued[target].pop(0) - if not self._ok_to_run(next_item): - self._cancel_item(next_item, cancel_successors=False) - self._enqueue_successors(next_item) - continue - - to_dispatch.append(next_item) - target_slots -= 1 - - if not to_dispatch: - continue - - log.verbose( - "[%s]: [%s]: [dispatch]:\n%s", - hms, - target, - ", ".join(job_name for job_name in to_dispatch), - ) - - for job_name in to_dispatch: - self._dispatch_job(hms, target, job_name) - - def _kill(self) -> None: - """Kill any running items and cancel any that are waiting.""" - # Cancel any waiting items. We take a copy of self._queued to avoid - # iterating over the set as we modify it. - for target in self._queued: - for item in list(self._queued[target]): - self._cancel_item(item) - - # Kill any running items. Again, take a copy of the set to avoid - # modifying it while iterating over it. - for target in self._running: - for item in list(self._running[target]): - self._kill_item(item) - - def _check_if_done(self, hms: str) -> bool: - """Check if we are done executing all jobs. - - Also, prints the status of currently running jobs. - """ - done = True - for target in self._scheduled: - done_cnt = sum( - [ - len(self._passed[target]), - len(self._failed[target]), - len(self._killed[target]), - ], - ) - done = done and (done_cnt == self._total[target]) - - # Skip if a target has not even begun executing. - if not (self._queued[target] or self._running[target] or done_cnt > 0): - continue - - perc = done_cnt / self._total[target] * 100 - - running = ", ".join( - [f"{job_name}" for job_name in self._running[target]], - ) - msg = self._msg_fmt.format( - len(self._queued[target]), - len(self._running[target]), - len(self._passed[target]), - len(self._failed[target]), - len(self._killed[target]), - self._total[target], - ) - self._status_printer.update_target( - target=target, - msg=msg, - hms=hms, - perc=perc, - running=running, - ) - return done - - def _cancel_item(self, job_name: str, *, cancel_successors: bool = True) -> None: - """Cancel an item and optionally all of its successors. - - Supplied item may be in _scheduled list or the _queued list. From - either, we move it straight to _killed. - - Args: - job_name: name of the job to cancel - cancel_successors: if set then cancel successors as well (True). - - """ - job = self._jobs[job_name] - self.job_status[job_name] = JobStatus.KILLED - self._killed[job.target].add(job_name) - self._instrumentation.on_job_status_change(job, JobStatus.KILLED) - if job_name in self._queued[job.target]: - self._queued[job.target].remove(job_name) - else: - self._unschedule_item(job_name) - - if cancel_successors: - self._cancel_successors(job_name) - - def _kill_item(self, job_name: str) -> None: - """Kill a running item and cancel all of its successors. - - Args: - job_name: name of the job to kill - - """ - job = self._jobs[job_name] - self._launchers[job_name].kill() - self.job_status[job_name] = JobStatus.KILLED - self._killed[job.target].add(job_name) - self._instrumentation.on_job_status_change(job, JobStatus.KILLED) - self._running[job.target].remove(job_name) - self._cancel_successors(job_name) diff --git a/src/dvsim/scheduler/status_printer.py b/src/dvsim/scheduler/status_printer.py deleted file mode 100644 index b20a707f..00000000 --- a/src/dvsim/scheduler/status_printer.py +++ /dev/null @@ -1,292 +0,0 @@ -# Copyright lowRISC contributors (OpenTitan project). -# Licensed under the Apache License, Version 2.0, see LICENSE for details. -# SPDX-License-Identifier: Apache-2.0 - -"""Job status printing during a scheduled run.""" - -import os -import shutil -import sys -import termios -from typing import ClassVar - -import enlighten - -from dvsim.logging import log - -DEFAULT_HEADER = "Q: queued, R: running, P: passed, F: failed, K: killed, T: total" -"""The default header to use for printing the status.""" - - -class StatusPrinter: - """Dummy Status Printer class for interactive mode. - - When interactive mode is set, dvsim does not print the status. By - instantiating this dummy class (printing nothing), outer interface stays - same. - """ - - def __init__(self) -> None: - """Initialise.""" - - def print_header(self) -> None: - """Initialize / print the header bar. - - The header bar contains an introductory message such as the legend of - what Q, D, ... mean. - """ - - def init_target(self, target: str, msg: str) -> None: - """Initialize the status bar for each target.""" - - def update_target( - self, - target: str, - hms: str, - msg: str, - perc: float, - running: str, - ) -> None: - """Periodically update the status bar for each target. - - Args: - hms: Elapsed time in hh:mm:ss. - target: The tool flow step. - msg: The completion status message (set externally). - perc: Percentage of completion. - running: What jobs are currently still running. - - """ - - def stop(self) -> None: - """Stop the status header/target printing (but keep the printer context).""" - - def exit(self) -> None: - """Do cleanup activities before exiting.""" - - -class TtyStatusPrinter(StatusPrinter): - """Abstraction for printing the current target status onto the console. - - Targets are ASIC tool flow steps such as build, run, cov etc. These steps - are sequenced by the Scheduler. There may be multiple jobs running in - parallel in each target. This class provides a mechanism to periodically - print the completion status of each target onto the terminal. Messages - printed by this class are rather static in nature - all the necessary - computations of how the jobs are progressing need to be handled externally. - - The following are the 'fields' accepted by this class: - """ - - # Print elapsed time in bold. - hms_fmt = "\x1b[1m{hms:9s}\x1b[0m" - header_fmt = hms_fmt + " [{target:^13s}]: [{msg}]" - status_fmt = header_fmt + " {perc:3.0f}% {running}" - - def __init__(self) -> None: - """Initialise printer.""" - super().__init__() - - # Once a target is complete, we no longer need to update it - we can - # just skip it. Maintaining this here provides a way to print the status - # one last time when it reaches 100%. It is much easier to do that here - # than in the Scheduler class. - self.target_done = {} - - def print_header(self) -> None: - """Initialize / print the header bar. - - The header bar contains an introductory message such as the legend of - what Q, D, ... mean. - """ - log.info(self.header_fmt.format(hms="", target="legend", msg=DEFAULT_HEADER)) - - def init_target(self, target: str, msg: str) -> None: - """Initialize the status bar for each target.""" - self.target_done[target] = False - - def _trunc_running(self, running: str, width: int = 30) -> str: - """Truncate the list of running items to a specified width.""" - if len(running) <= width: - return running - return running[: width - 3] + "..." - - def update_target( - self, - target: str, - hms: str, - msg: str, - perc: float, - running: str, - ) -> None: - """Periodically update the status bar for each target. - - Args: - hms: Elapsed time in hh:mm:ss. - target: The tool flow step. - msg: The completion status message (set externally). - perc: Percentage of completion. - running: What jobs are currently still running. - - """ - if self.target_done[target]: - return - - log.info( - self.status_fmt.format( - hms=hms, - target=target, - msg=msg, - perc=perc, - running=self._trunc_running(running), - ), - ) - if perc == 100: - self.target_done[target] = True - - -class EnlightenStatusPrinter(TtyStatusPrinter): - """Abstraction for printing status using Enlighten. - - Enlighten is a third party progress bar tool. Documentation: - https://python-enlighten.readthedocs.io/en/stable/ - - Though it offers very fancy progress bar visualization, we stick to a - simple status bar 'pinned' to the bottom of the screen for each target - that displays statically, a pre-prepared message. We avoid the progress bar - visualization since it requires enlighten to perform some computations the - Scheduler already does. It also helps keep the overhead to a minimum. - - Enlighten does not work if the output of dvsim is redirected to a file, for - example - it needs to be attached to a TTY enabled stream. - """ - - status_fmt_no_running = TtyStatusPrinter.status_fmt.removesuffix("{running}") - status_fmt = "{status_msg}{running}" - - def __init__(self) -> None: - super().__init__() - - # Initialize the status_bars for header and the targets . - self.manager = enlighten.get_manager() - self.status_header = None - self.status_target = {} - self._stopped = False - - def print_header(self) -> None: - self.status_header = self.manager.status_bar( - status_format=self.header_fmt, - hms="", - target="legend", - msg=DEFAULT_HEADER, - ) - - def init_target(self, target, msg) -> None: - super().init_target(target, msg) - status_msg = self.status_fmt_no_running.format(hms="", target=target, msg=msg, perc=0.0) - self.status_target[target] = self.manager.status_bar( - status_format=self.status_fmt, - status_msg=status_msg, - running="", - ) - - def _trunc_running_to_terminal(self, running: str, offset: int) -> str: - """Truncate the list of running items to match the max terminal width.""" - cols = shutil.get_terminal_size(fallback=(80, 24)).columns - width = max(30, cols - offset) - return self._trunc_running(running, width) - - def update_target(self, target, hms, msg, perc, running) -> None: - if self.target_done[target]: - return - - status_msg = self.status_fmt_no_running.format( - hms=hms, - target=target, - msg=msg, - perc=perc, - ) - - offset = len(status_msg) - running = self._trunc_running_to_terminal(running, offset) - - self.status_target[target].update(status_msg=status_msg, running=running) - if perc == 100: - self.target_done[target] = True - - def stop(self) -> None: - """Stop the status header/target printing (but keep the printer context).""" - if self.status_header is not None: - self.status_header.close() - for target in self.status_target: - self.status_target[target].close() - self._stopped = True - - def exit(self) -> None: - """Do cleanup activities before exiting (closing the manager context).""" - if not self._stopped: - self.stop() - self.manager.stop() - # Sometimes, exiting via a signal (e.g. Ctrl-C) can cause Enlighten to leave the - # terminal in some non-raw mode. Just in case, restore regular operation. - self._restore_terminal() - - def _restore_terminal(self) -> None: - """Restore regular terminal operation after using Enlighten.""" - # Try open /dev/tty, otherwise fallback to sys.stdin - try: - fd = os.open("/dev/tty", os.O_RDWR) - close_fd = True - except (OSError, termios.error): - fd = sys.stdin.fileno() - close_fd = False - - # By default, the terminal should echo input (ECHO) and run in canonical mode (ICANON). - # We make this change after all buffered output is transmitted (TCSADRAIN). - try: - attrs = termios.tcgetattr(fd) - control_flag = 3 - attrs[control_flag] |= termios.ECHO | termios.ICANON - termios.tcsetattr(fd, termios.TCSADRAIN, attrs) - except termios.error: - log.debug("Unable to restore terminal attributes safely") - - if close_fd: - os.close(fd) - - -class StatusPrinterSingleton: - """Singleton for the status printer to uniquely refer to 1 instance at a time.""" - - _instance: ClassVar[StatusPrinter | None] = None - - @classmethod - def set(cls, instance: StatusPrinter | None) -> None: - """Set the stored status printer.""" - cls._instance = instance - - @classmethod - def get(cls) -> StatusPrinter | None: - """Get the stored status printer (if it exists).""" - return cls._instance - - -def get_status_printer(interactive: bool) -> StatusPrinter: - """Get the global status printer. - - If stdout is a TTY, then return an instance of EnlightenStatusPrinter, else - return an instance of StatusPrinter. If the status printer has already been - created, then returns that instance, regardless of given arguments. - """ - status_printer = StatusPrinterSingleton.get() - if status_printer is not None: - return status_printer - - if interactive: - status_printer = StatusPrinter() - elif sys.stdout.isatty(): - status_printer = EnlightenStatusPrinter() - else: - status_printer = TtyStatusPrinter() - StatusPrinterSingleton.set(status_printer) - return status_printer From 76eedcb259fb6b92c98fba40bbf2438db14b1ea4 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 27 Mar 2026 05:31:20 +0000 Subject: [PATCH 3/5] feat: async_core -> core & async_status_printer -> status_printer Now that the async scheduler and status printer have been made the defaults, and the old versions have been removed, rename the `async_core` and `async_status_printer` modules to just be `core` and `status_printer` as before, respectively. Signed-off-by: Alex Jones --- src/dvsim/cli/run.py | 2 +- src/dvsim/flow/base.py | 4 ++-- src/dvsim/scheduler/{async_core.py => core.py} | 0 .../scheduler/{async_status_printer.py => status_printer.py} | 0 tests/test_scheduler.py | 2 +- 5 files changed, 4 insertions(+), 4 deletions(-) rename src/dvsim/scheduler/{async_core.py => core.py} (100%) rename src/dvsim/scheduler/{async_status_printer.py => status_printer.py} (100%) diff --git a/src/dvsim/cli/run.py b/src/dvsim/cli/run.py index 467777a8..983f6c95 100644 --- a/src/dvsim/cli/run.py +++ b/src/dvsim/cli/run.py @@ -45,7 +45,7 @@ from dvsim.logging import LOG_LEVELS, configure_logging, log from dvsim.runtime.backend import RuntimeBackend from dvsim.runtime.registry import BackendType, backend_registry -from dvsim.scheduler.async_status_printer import StatusPrinter, get_status_printer +from dvsim.scheduler.status_printer import StatusPrinter, get_status_printer from dvsim.utils import TS_FORMAT, TS_FORMAT_LONG, Timer, rm_path, run_cmd_with_timeout # The different categories that can be passed to the --list argument. diff --git a/src/dvsim/flow/base.py b/src/dvsim/flow/base.py index 2d2c6ffe..d5d4bb5d 100644 --- a/src/dvsim/flow/base.py +++ b/src/dvsim/flow/base.py @@ -23,9 +23,9 @@ from dvsim.logging import log from dvsim.runtime.fake import FakeRuntimeBackend from dvsim.runtime.registry import backend_registry -from dvsim.scheduler.async_core import Scheduler -from dvsim.scheduler.async_status_printer import create_status_printer +from dvsim.scheduler.core import Scheduler from dvsim.scheduler.log_manager import LogManager +from dvsim.scheduler.status_printer import create_status_printer from dvsim.utils import ( find_and_substitute_wildcards, rm_path, diff --git a/src/dvsim/scheduler/async_core.py b/src/dvsim/scheduler/core.py similarity index 100% rename from src/dvsim/scheduler/async_core.py rename to src/dvsim/scheduler/core.py diff --git a/src/dvsim/scheduler/async_status_printer.py b/src/dvsim/scheduler/status_printer.py similarity index 100% rename from src/dvsim/scheduler/async_status_printer.py rename to src/dvsim/scheduler/status_printer.py diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index a443f924..460fa495 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -24,7 +24,7 @@ from dvsim.launcher.base import ErrorMessage, Launcher, LauncherBusyError, LauncherError from dvsim.report.data import IPMeta, ToolMeta from dvsim.runtime.legacy import LegacyLauncherAdapter -from dvsim.scheduler.async_core import Scheduler +from dvsim.scheduler.core import Scheduler __all__ = () From a9037b87629bd1985595d45294c15b9172a2a177 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 27 Mar 2026 05:33:47 +0000 Subject: [PATCH 4/5] feat: remove `Timer` utility This `Timer` class was only used by the scheduler and is not particularly useful, especially in async contexts. Deprecate and remove this util module. Signed-off-by: Alex Jones --- src/dvsim/cli/run.py | 3 +-- src/dvsim/utils/__init__.py | 2 -- src/dvsim/utils/timer.py | 54 ------------------------------------- 3 files changed, 1 insertion(+), 58 deletions(-) delete mode 100644 src/dvsim/utils/timer.py diff --git a/src/dvsim/cli/run.py b/src/dvsim/cli/run.py index 983f6c95..281a1b37 100644 --- a/src/dvsim/cli/run.py +++ b/src/dvsim/cli/run.py @@ -46,7 +46,7 @@ from dvsim.runtime.backend import RuntimeBackend from dvsim.runtime.registry import BackendType, backend_registry from dvsim.scheduler.status_printer import StatusPrinter, get_status_printer -from dvsim.utils import TS_FORMAT, TS_FORMAT_LONG, Timer, rm_path, run_cmd_with_timeout +from dvsim.utils import TS_FORMAT, TS_FORMAT_LONG, rm_path, run_cmd_with_timeout # The different categories that can be passed to the --list argument. _LIST_CATEGORIES = ["build_modes", "run_modes", "tests", "regressions"] @@ -916,7 +916,6 @@ def main(argv: list[str] | None = None) -> None: RunTest.fixed_seed = args.fixed_seed # Register the common deploy settings. - Timer.print_interval = args.print_interval StatusPrinter.print_interval = args.print_interval LocalLauncher.max_parallel = args.max_parallel SlurmLauncher.max_parallel = args.max_parallel diff --git a/src/dvsim/utils/__init__.py b/src/dvsim/utils/__init__.py index 092c6104..b893ecb2 100644 --- a/src/dvsim/utils/__init__.py +++ b/src/dvsim/utils/__init__.py @@ -9,7 +9,6 @@ from dvsim.utils.hjson import parse_hjson from dvsim.utils.subprocess import run_cmd, run_cmd_with_timeout from dvsim.utils.time import TS_FORMAT, TS_FORMAT_LONG, TS_HMS_FORMAT -from dvsim.utils.timer import Timer from dvsim.utils.wildcards import ( find_and_substitute_wildcards, subst_wildcards, @@ -19,7 +18,6 @@ "TS_FORMAT", "TS_FORMAT_LONG", "TS_HMS_FORMAT", - "Timer", "check_bool", "check_int", "clean_odirs", diff --git a/src/dvsim/utils/timer.py b/src/dvsim/utils/timer.py deleted file mode 100644 index b3b05892..00000000 --- a/src/dvsim/utils/timer.py +++ /dev/null @@ -1,54 +0,0 @@ -# Copyright lowRISC contributors (OpenTitan project). -# Licensed under the Apache License, Version 2.0, see LICENSE for details. -# SPDX-License-Identifier: Apache-2.0 - -import time - -from dvsim.utils.time import TS_HMS_FORMAT - - -class Timer: - """A timer to keep track of how long jobs have been running. - - This has a notion of start time (the time when the object was constructed), - together with a time when the results should next be printed. - - """ - - print_interval = 5 - - def __init__(self) -> None: - self.start = time.monotonic() - self.next_print = self.start + Timer.print_interval - self.first_print = True - - def period(self): - """Return the float time in seconds since start.""" - return time.monotonic() - self.start - - def hms(self) -> str: - """Get the time since start in hh:mm:ss.""" - return time.strftime(TS_HMS_FORMAT, time.gmtime(self.period())) - - def check_time(self) -> bool: - """Return true if we have passed next_print. - - If so, increment next_print by print_interval unless the result would - be in the past, in which case set it to the current time plus - print_interval. - - """ - now = time.monotonic() - - if self.first_print: - self.first_print = False - return True - - if now < self.next_print: - return False - - self.next_print += Timer.print_interval - if self.next_print <= now: - self.next_print = now + Timer.print_interval - - return True From bd37201de1d6c48b3a4a3443f508686e0748e19b Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 27 Mar 2026 05:36:42 +0000 Subject: [PATCH 5/5] feat: remove legacy LocalLauncher, FakeLauncher and Launcher factory Now that everything goes through the RuntimeBackend registry instead, there is no need for the Launcher factory to exist any more, as it has been superseded. Likewise, the LocalLauncher and FakeLauncher have been replaced by the LocalRuntimeBackend and FakeRuntimeBackend respectively - as such, these legacy launchers can also be removed. Signed-off-by: Alex Jones --- src/dvsim/cli/run.py | 8 +- src/dvsim/launcher/factory.py | 87 -------------- src/dvsim/launcher/fake.py | 96 --------------- src/dvsim/launcher/local.py | 213 ---------------------------------- 4 files changed, 1 insertion(+), 403 deletions(-) delete mode 100644 src/dvsim/launcher/factory.py delete mode 100644 src/dvsim/launcher/fake.py delete mode 100644 src/dvsim/launcher/local.py diff --git a/src/dvsim/cli/run.py b/src/dvsim/cli/run.py index 281a1b37..3dc3a53f 100644 --- a/src/dvsim/cli/run.py +++ b/src/dvsim/cli/run.py @@ -35,9 +35,6 @@ from dvsim.instrumentation import InstrumentationFactory, set_instrumentation from dvsim.job.deploy import RunTest from dvsim.launcher.base import Launcher -from dvsim.launcher.factory import set_launcher_type -from dvsim.launcher.fake import FakeLauncher -from dvsim.launcher.local import LocalLauncher from dvsim.launcher.lsf import LsfLauncher from dvsim.launcher.nc import NcLauncher from dvsim.launcher.sge import SgeLauncher @@ -917,17 +914,14 @@ def main(argv: list[str] | None = None) -> None: # Register the common deploy settings. StatusPrinter.print_interval = args.print_interval - LocalLauncher.max_parallel = args.max_parallel SlurmLauncher.max_parallel = args.max_parallel SgeLauncher.max_parallel = args.max_parallel LsfLauncher.max_parallel = args.max_parallel NcLauncher.max_parallel = args.max_parallel Launcher.max_odirs = args.max_odirs RuntimeBackend.max_output_dirs = args.max_odirs - FakeLauncher.max_parallel = args.max_parallel - set_launcher_type(is_local=args.local, fake=args.fake) - # Configure the runtime backend. TODO: deprecate `set_launcher_type` above. + # Configure the runtime backend. set_backend_type(is_local=args.local, fake=args.fake) # Configure scheduler instrumentation diff --git a/src/dvsim/launcher/factory.py b/src/dvsim/launcher/factory.py deleted file mode 100644 index 6d909702..00000000 --- a/src/dvsim/launcher/factory.py +++ /dev/null @@ -1,87 +0,0 @@ -# Copyright lowRISC contributors (OpenTitan project). -# Licensed under the Apache License, Version 2.0, see LICENSE for details. -# SPDX-License-Identifier: Apache-2.0 - -import os - -from dvsim.launcher.base import Launcher -from dvsim.launcher.fake import FakeLauncher -from dvsim.launcher.local import LocalLauncher -from dvsim.launcher.lsf import LsfLauncher -from dvsim.launcher.nc import NcLauncher -from dvsim.launcher.sge import SgeLauncher -from dvsim.launcher.slurm import SlurmLauncher -from dvsim.logging import log - -try: - from edacloudlauncher.EdaCloudLauncher import EdaCloudLauncher - - EDACLOUD_LAUNCHER_EXISTS = True -except ImportError: - EDACLOUD_LAUNCHER_EXISTS = False - -# The chosen launcher class. -_LAUNCHER_CLS: type[Launcher] | None = None - - -def set_launcher_type(is_local: bool = False, fake: bool = False) -> None: - """Set the launcher type that will be used to launch the jobs. - - The env variable `DVSIM_LAUNCHER` is used to identify what launcher system - to use. This variable is specific to the user's work site. It is meant to - be set externally before invoking DVSim. Valid values are [local, lsf, - edacloud]. If --local arg is supplied then the local launcher takes - precedence. - """ - launcher = os.environ.get("DVSIM_LAUNCHER", "local") - if is_local: - launcher = "local" - if fake: - launcher = "fake" - Launcher.variant = launcher - - global _LAUNCHER_CLS - if launcher == "local": - _LAUNCHER_CLS = LocalLauncher - - elif launcher == "lsf": - _LAUNCHER_CLS = LsfLauncher - - elif launcher == "sge": - _LAUNCHER_CLS = SgeLauncher - - elif launcher == "nc": - _LAUNCHER_CLS = NcLauncher - - elif launcher == "slurm": - _LAUNCHER_CLS = SlurmLauncher - - elif launcher == "fake": - _LAUNCHER_CLS = FakeLauncher - - # These custom launchers are site specific. They may not be committed to - # the open source repo. - elif launcher == "edacloud" and EDACLOUD_LAUNCHER_EXISTS: - _LAUNCHER_CLS = EdaCloudLauncher - - else: - log.error( - f"Launcher {launcher} set using DVSIM_LAUNCHER env var does not " - "exist. Using local launcher instead.", - ) - _LAUNCHER_CLS = LocalLauncher - - -def get_launcher_cls() -> type[Launcher]: - """Returns the chosen launcher class.""" - assert _LAUNCHER_CLS is not None - return _LAUNCHER_CLS - - -def get_launcher(deploy): - """Returns an instance of a launcher. - - 'deploy' is an instance of the deploy class to with the launcher is paired. - """ - assert _LAUNCHER_CLS is not None - return _LAUNCHER_CLS(deploy) diff --git a/src/dvsim/launcher/fake.py b/src/dvsim/launcher/fake.py deleted file mode 100644 index d64fcb86..00000000 --- a/src/dvsim/launcher/fake.py +++ /dev/null @@ -1,96 +0,0 @@ -# Copyright lowRISC contributors (OpenTitan project). -# Licensed under the Apache License, Version 2.0, see LICENSE for details. -# SPDX-License-Identifier: Apache-2.0 - -"""Fake Launcher that returns random results.""" - -from random import choice -from typing import TYPE_CHECKING - -from dvsim.job.status import JobStatus -from dvsim.launcher.base import ErrorMessage, Launcher - -if TYPE_CHECKING: - from dvsim.job.data import JobSpec, WorkspaceConfig - - -__all__ = ("FakeLauncher",) - - -def _run_test_handler(job_spec: "JobSpec") -> JobStatus: - """Handle a RunTest deploy job.""" - return choice((JobStatus.PASSED, JobStatus.FAILED)) - - -def _cov_report_handler(job_spec: "JobSpec") -> JobStatus: - """Handle a CompileSim deploy job.""" - # TODO: this hack doesn't work any more and needs implementing by writing - # a file that can be parsed as if it's been generated by the tool. - # - # keys = [ - # "score", - # "line", - # "cond", - # "toggle", - # "fsm", - # "branch", - # "assert", - # "group", - # ] - # job_spec.cov_results_dict = {k: f"{random() * 100:.2f} %" for k in keys} - - return JobStatus.PASSED - - -_DEPLOY_HANDLER = { - "RunTest": _run_test_handler, - "CovReport": _cov_report_handler, -} - - -class FakeLauncher(Launcher): - """Launch jobs and return fake results.""" - - # Poll job's completion status every this many seconds - poll_freq = 0 - - def _do_launch(self) -> None: - """Do the launch.""" - - def poll(self) -> JobStatus: - """Check status of the running process.""" - deploy_cls = self.job_spec.job_type - if deploy_cls in _DEPLOY_HANDLER: - return _DEPLOY_HANDLER[deploy_cls](job_spec=self.job_spec) - - # Default result is Pass - return JobStatus.PASSED - - def kill(self) -> None: - """Kill the running process.""" - self._post_finish( - JobStatus.KILLED, - ErrorMessage(line_number=None, message="Job killed!", context=[]), - ) - - @staticmethod - def prepare_workspace(cfg: "WorkspaceConfig") -> None: - """Prepare the workspace based on the chosen launcher's needs. - - This is done once for the entire duration for the flow run. - - Args: - cfg: workspace configuration - - """ - - @staticmethod - def prepare_workspace_for_cfg(cfg: "WorkspaceConfig") -> None: - """Prepare the workspace for a cfg. - - This is invoked once for each cfg. - - Args: - cfg: workspace configuration - - """ diff --git a/src/dvsim/launcher/local.py b/src/dvsim/launcher/local.py deleted file mode 100644 index a022b3ed..00000000 --- a/src/dvsim/launcher/local.py +++ /dev/null @@ -1,213 +0,0 @@ -# Copyright lowRISC contributors (OpenTitan project). -# Licensed under the Apache License, Version 2.0, see LICENSE for details. -# SPDX-License-Identifier: Apache-2.0 - -"""Launcher implementation to run jobs as subprocesses on the local machine.""" - -import datetime -import os -import shlex -import subprocess -from typing import TYPE_CHECKING - -from dvsim.job.status import JobStatus -from dvsim.launcher.base import ErrorMessage, Launcher, LauncherBusyError, LauncherError - -if TYPE_CHECKING: - from dvsim.job.data import JobSpec - from dvsim.job.deploy import WorkspaceConfig - - -class LocalLauncher(Launcher): - """Implementation of Launcher to launch jobs in the user's local workstation.""" - - # Poll job's completion status every this many seconds - poll_freq = 0.025 - - def __init__(self, job_spec: "JobSpec") -> None: - """Initialize common class members.""" - super().__init__(job_spec) - - # Popen object when launching the job. - self._process = None - self._log_file = None - - def _do_launch(self) -> None: - # Update the shell's env vars with self.exports. Values in exports must - # replace the values in the shell's env vars if the keys match. - exports = os.environ.copy() - exports.update(self.job_spec.exports) - - # Clear the magic MAKEFLAGS variable from exports if necessary. This - # variable is used by recursive Make calls to pass variables from one - # level to the next. Here, self.cmd is a call to Make but it's - # logically a top-level invocation: we don't want to pollute the flow's - # Makefile with Make variables from any wrapper that called dvsim. - if "MAKEFLAGS" in exports: - del exports["MAKEFLAGS"] - - self._dump_env_vars(exports) - - try: - log_path = self.job_spec.log_path - self._log_file = log_path.open( - "w", - encoding="UTF-8", - errors="surrogateescape", - ) - self._log_file.write(f"[Executing]:\n{self.job_spec.cmd}\n\n") - self._log_file.flush() - - if not self.job_spec.interactive: - self.timeout_secs = self.job_spec.timeout_secs - - try: - self._process = subprocess.Popen( - shlex.split(self.job_spec.cmd), - bufsize=4096, - universal_newlines=True, - stdout=self._log_file, - stderr=self._log_file, - env=exports, - ) - except BlockingIOError as e: - msg = f"Failed to launch job: {e}" - raise LauncherBusyError(msg) from e - except subprocess.SubprocessError as e: - msg = f"IO Error: {e}\nSee {log_path}" - raise LauncherError(msg) from e - else: - # Interactive: Set RUN_INTERACTIVE to 1 - exports["RUN_INTERACTIVE"] = "1" - - # Interactive. stdin / stdout are transparent - # no timeout and blocking op as user controls the flow - self._process = subprocess.Popen( - shlex.split(self.job_spec.cmd), - stdin=None, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - # string mode - universal_newlines=True, - env=exports, - ) - - # stdout/stderr logs are tee'd to the log file via the pipe - if self._process.stdout is not None: - for line in self._process.stdout: - print(line, end="") # noqa: T201 - self._log_file.write(line) - self._log_file.flush() - - # Wait until the process exits - self._process.wait() - - except BlockingIOError as e: - msg = f"Failed to launch job: {e}" - raise LauncherBusyError(msg) from e - finally: - self._close_job_log_file() - - self._link_odir(JobStatus.RUNNING) - - def poll(self) -> JobStatus: - """Check status of the running process. - - This returns a job status. If RUNNING, the job is still running. - If PASSED, the job finished successfully. If FAILED, the job finished - with an error. If KILLED, it was killed. - - This function must only be called after running self.dispatch_cmd() and - must not be called again once it has returned PASSED or FAILED. - """ - if self._process is None: - msg = ( - "poll() was called either before calling launch() or after " - "ignoring a LauncherError from launch()." - ) - raise LauncherError(msg) - - elapsed_time = datetime.datetime.now() - self.start_time - self.job_runtime_secs = elapsed_time.total_seconds() - if self._process.poll() is None: - if self.timeout_secs and self.job_runtime_secs > self.timeout_secs: - self._kill() - timeout_mins = self.job_spec.timeout_mins - timeout_message = f"Job timed out after {timeout_mins} minutes" - self._post_finish( - JobStatus.KILLED, - ErrorMessage( - line_number=None, - message=timeout_message, - context=[timeout_message], - ), - ) - return JobStatus.KILLED - - return JobStatus.RUNNING - - self.exit_code = self._process.returncode - status, err_msg = self._check_status() - self._post_finish(status, err_msg) - - return self.status - - def _kill(self) -> None: - """Kill the running process. - - Try to kill the running process. Send SIGTERM first, wait a bit, - and then send SIGKILL if it didn't work. - """ - if self._process is None: - # process already dead or didn't start - return - - self._process.terminate() - try: - self._process.wait(timeout=2) - except subprocess.TimeoutExpired: - self._process.kill() - - def kill(self) -> None: - """Kill the running process. - - This must be called between dispatching and reaping the process (the - same window as poll()). - """ - self._kill() - self._post_finish( - JobStatus.KILLED, - ErrorMessage(line_number=None, message="Job killed!", context=[]), - ) - - def _post_finish(self, status: str, err_msg: ErrorMessage | None) -> None: - self._close_job_log_file() - self._process = None - super()._post_finish(status, err_msg) - - def _close_job_log_file(self) -> None: - """Close the file descriptors associated with the process.""" - if self._log_file: - self._log_file.close() - - @staticmethod - def prepare_workspace(cfg: "WorkspaceConfig") -> None: - """Prepare the workspace based on the chosen launcher's needs. - - This is done once for the entire duration for the flow run. - - Args: - cfg: workspace configuration - - """ - - @staticmethod - def prepare_workspace_for_cfg(cfg: "WorkspaceConfig") -> None: - """Prepare the workspace for a cfg. - - This is invoked once for each cfg. - - Args: - cfg: workspace configuration - - """