From 6521ec4c4c142ec3cb96bda753104789b3a0f07a Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 27 Mar 2026 01:40:58 +0000 Subject: [PATCH 1/5] feat: introduce a new `SCHEDULED` job status See the explanatory comments added to JobStatus. The intention is that the new async scheduler will distinguish between jobs that are blocked due to unfinished dependencies (`SCHEDULED`), and those that are pending because there is no availability to run them, despite their dependencies being fulfilled (`QUEUED`). This new state is currently unused. Also add a short test to prevent potential future bugs from status shorthand name collisions. Signed-off-by: Alex Jones --- src/dvsim/instrumentation/resources.py | 2 +- src/dvsim/instrumentation/timing.py | 2 +- src/dvsim/job/status.py | 13 ++++++++----- tests/job/test_status.py | 19 +++++++++++++++++++ 4 files changed, 29 insertions(+), 7 deletions(-) create mode 100644 tests/job/test_status.py diff --git a/src/dvsim/instrumentation/resources.py b/src/dvsim/instrumentation/resources.py index d02ff82c..8b9505cb 100644 --- a/src/dvsim/instrumentation/resources.py +++ b/src/dvsim/instrumentation/resources.py @@ -227,7 +227,7 @@ def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None: with self._lock: running = job_id in self._running_jobs started = running or job_id in self._finished_jobs - if not started and status != JobStatus.QUEUED: + if not started and status not in (JobStatus.SCHEDULED, JobStatus.QUEUED): self._running_jobs[job_id] = JobResourceAggregate(job) running = True if running and status.is_terminal: diff --git a/src/dvsim/instrumentation/timing.py b/src/dvsim/instrumentation/timing.py index d0c1192b..9766c34a 100644 --- a/src/dvsim/instrumentation/timing.py +++ b/src/dvsim/instrumentation/timing.py @@ -99,7 +99,7 @@ def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None: job_info = TimingJobFragment(job) self._jobs[job_id] = job_info - if job_info.start_time is None and status != JobStatus.QUEUED: + if job_info.start_time is None and status not in (JobStatus.SCHEDULED, JobStatus.QUEUED): job_info.start_time = time.perf_counter() if status.is_terminal: job_info.end_time = time.perf_counter() diff --git a/src/dvsim/job/status.py b/src/dvsim/job/status.py index e409a155..7076caae 100644 --- a/src/dvsim/job/status.py +++ b/src/dvsim/job/status.py @@ -12,11 +12,14 @@ class JobStatus(Enum): """Status of a Job.""" - QUEUED = auto() - RUNNING = auto() - PASSED = auto() - FAILED = auto() - KILLED = auto() + # SCHEDULED is currently unused in the old sync scheduler, there `SCHEDULED` and `QUEUED` + # are combined under `QUEUED`. It is intended to be used in the new async scheduler. + SCHEDULED = auto() # Waiting for dependencies + QUEUED = auto() # Dependencies satisfied, waiting to be dispatched + RUNNING = auto() # Dispatched to a backend and actively executing + PASSED = auto() # Completed successfully + FAILED = auto() # Completed with failure + KILLED = auto() # Forcibly terminated or never executed @property def shorthand(self) -> str: diff --git a/tests/job/test_status.py b/tests/job/test_status.py new file mode 100644 index 00000000..16ff28b2 --- /dev/null +++ b/tests/job/test_status.py @@ -0,0 +1,19 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. +# SPDX-License-Identifier: Apache-2.0 + +"""Test Job (scheduler) status modelling.""" + +from hamcrest import assert_that, equal_to + +from dvsim.job.status import JobStatus + + +class TestJobStatus: + """Test scheduler JobStatus models.""" + + @staticmethod + def test_unique_shorthands() -> None: + """Test that all scheduler job statuses have unique shorthand representations.""" + shorthands = [status.shorthand for status in JobStatus] + assert_that(len(set(shorthands)), equal_to(len(shorthands))) From 01f0162baaf0c87a5a2ace8d5fc2b9e9e0298bb9 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 27 Mar 2026 12:17:47 +0000 Subject: [PATCH 2/5] feat: implement new async base `StatusPrinter` class Add the new async `StatusPrinter` abstract base class, intended to replace the original for use with the new async scheduler. The original will not removed until the scheduler has been switched. This is now an abstract base class, rather than an empty class used for interactive sessions - since the status printer will live outside the new scheduler, it becomes much easier to just _not connect_ any status printer hooks during interactive mode. Some notable changes and overhauls: - Status printing now runs entirely independently of the scheduler. If a print interval > 0 is configured, then the status printer now runs as a loop with async awaits such that the timing logic is entirely separate from the scheduler, maintained by cooperative multitasking. - As new functionality, if a print interval of 0 is configured, we instead activate in synchronous "event/update-driven mode" where every single status update is printed. This might be useful for e.g. the TTY printer where you may want to capture exact times of all updates. - As a result of observing the scheduler, the status printer maintains its own stateful tracking of job information. - Field alignment is calculated from the initial job information and data is appropriately justified to clean up the output tables. - The ability to pause the status bar is introduced to help (later) deal with issues in the EnlightenStatusBar, where its terminal interactivity can be broken and cause hangs under heavy load. - General refactoring: the status header and fields are no longer hardcoded and are instead derived from the JobStatus enum. Signed-off-by: Alex Jones --- src/dvsim/cli/run.py | 2 + src/dvsim/scheduler/async_status_printer.py | 171 ++++++++++++++++++++ 2 files changed, 173 insertions(+) create mode 100644 src/dvsim/scheduler/async_status_printer.py diff --git a/src/dvsim/cli/run.py b/src/dvsim/cli/run.py index c25c1dd8..42d3bb58 100644 --- a/src/dvsim/cli/run.py +++ b/src/dvsim/cli/run.py @@ -43,6 +43,7 @@ from dvsim.launcher.sge import SgeLauncher from dvsim.launcher.slurm import SlurmLauncher from dvsim.logging import LOG_LEVELS, configure_logging, log +from dvsim.scheduler.async_status_printer import StatusPrinter from dvsim.scheduler.status_printer import get_status_printer from dvsim.utils import TS_FORMAT, TS_FORMAT_LONG, Timer, rm_path, run_cmd_with_timeout @@ -884,6 +885,7 @@ def main(argv: list[str] | None = None) -> None: # Register the common deploy settings. Timer.print_interval = args.print_interval + StatusPrinter.print_interval = args.print_interval LocalLauncher.max_parallel = args.max_parallel SlurmLauncher.max_parallel = args.max_parallel SgeLauncher.max_parallel = args.max_parallel diff --git a/src/dvsim/scheduler/async_status_printer.py b/src/dvsim/scheduler/async_status_printer.py new file mode 100644 index 00000000..4309b6c5 --- /dev/null +++ b/src/dvsim/scheduler/async_status_printer.py @@ -0,0 +1,171 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. +# SPDX-License-Identifier: Apache-2.0 + +"""Job status printing during a scheduled run.""" + +import asyncio +import time +from abc import ABC, abstractmethod +from collections import defaultdict +from collections.abc import Sequence + +from dvsim.job.data import JobSpec +from dvsim.job.status import JobStatus + + +class StatusPrinter(ABC): + """Status Printer abstract base class. + + Contains core functionality related to status printing - a print interval can be configured + to control how often the scheduler target statuses are printed, which is managed by an async + thread. Optionally, the print interval can be configured to 0 to run in an update-driven mode + where every single status update is printed. Regardless of the configured print interval, the + final job update for each target is printed immediately to reflect final target end timings. + """ + + # How often we print by default. Zero means we should print on every event change. + print_interval = 0 + + def __init__(self, jobs: Sequence[JobSpec], print_interval: int | None = None) -> None: + """Construct the base StatusPrinter.""" + # Mapping from target -> (Mapping from status -> count) + self._target_counts: dict[str, dict[JobStatus, int]] = defaultdict(lambda: defaultdict(int)) + # Mapping from target -> number of jobs + self._totals: dict[str, int] = defaultdict(int) + + for job in jobs: + self._target_counts[job.target][JobStatus.SCHEDULED] += 1 + self._totals[job.target] += 1 + + # The number of characters used to represent the largest field in the displayed table + self._field_width = max((len(str(total)) for total in self._totals.values()), default=0) + + # State tracking for the StatusPrinter + self._start_time: float = 0.0 + self._last_print: float = 0.0 + self._running: dict[str, list[str]] = defaultdict(list) + self._num_finished: dict[str, int] = defaultdict(int) + self._finish_time: dict[str, float] = {} + + # Async target status update handling + self._task: asyncio.Task | None = None + self._paused: bool = False + + self._interval = print_interval if print_interval is not None else self.print_interval + + @property + def updates_every_event(self) -> bool: + """If the configured print interval is 0, statuses are updated on every state change.""" + return self._interval <= 0 + + def start(self) -> None: + """Start printing the status of the scheduled jobs.""" + self._start_time = time.monotonic() + self._print_header() + for target in self._target_counts: + self._init_target(target, self._get_target_row(target)) + + # If we need an async task to manage the print interval, create one + if not self.updates_every_event: + self._task = asyncio.create_task(self._run()) + + async def _run(self) -> None: + """Run a timer in an async loop, printing the updated status at every interval.""" + next_tick = self._start_time + self._interval + self.update_all_targets(including_unstarted=True) + + while True: + now = time.monotonic() + sleep_time = max(0, next_tick - now) + await asyncio.sleep(sleep_time) + self.update_all_targets() + next_tick += self._interval + + def update_all_targets(self, *, including_unstarted: bool = False) -> None: + """Update the status bars of all targets.""" + if self._paused: + return + update_time = time.monotonic() + for target in self._target_counts: + # Only update targets that have started (some job status has changed) + if self.target_is_started(target) or including_unstarted: + target_update_time = self._finish_time.get(target, update_time) + self._update_target(target_update_time, target) + + def target_is_started(self, target: str) -> bool: + """Check whether a target has been started yet or not.""" + return bool(self._num_finished[target]) or bool(self._running[target]) + + def target_is_done(self, target: str) -> bool: + """Check whether a target is finished or not.""" + return self._num_finished[target] >= self._totals[target] + + def update_status(self, job: JobSpec, old_status: JobStatus, new_status: JobStatus) -> None: + """Update the status printer to reflect a change in job status.""" + status_counts = self._target_counts[job.target] + status_counts[old_status] -= 1 + if old_status == JobStatus.RUNNING: + self._running[job.target].remove(job.full_name) + status_counts[new_status] += 1 + if new_status == JobStatus.RUNNING: + self._running[job.target].append(job.full_name) + if not old_status.is_terminal and new_status.is_terminal: + self._num_finished[job.target] += 1 + + if self.target_is_done(job.target) and not self.updates_every_event: + # Even if we have a configured print interval, we should record + # the time at which the target finished to capture accurate end timing. + self._finish_time[job.target] = time.monotonic() + elif self.updates_every_event: + self.update_all_targets() + + def _get_header(self) -> str: + """Get the header string to use for printing the status.""" + return ( + ", ".join( + f"{status.shorthand}: {status.name.lower().rjust(self._field_width)}" + for status in JobStatus + ) + + ", T: total" + ) + + def _get_target_row(self, target: str) -> str: + """Get a formatted string with the fields for a given target row.""" + fields = [] + for status in JobStatus: + count = self._target_counts[target][status] + value = f"{count:0{self._field_width}d}" + fields.append(f"{status.shorthand}: {value.rjust(len(status.name))}") + total = f"{self._totals[target]:0{self._field_width}d}" + fields.append(f"T: {total.rjust(5)}") + return ", ".join(fields) + + @abstractmethod + def _print_header(self) -> None: + """Initialize / print the header, displaying the legend of job status meanings.""" + + @abstractmethod + def _init_target(self, target: str, _msg: str) -> None: + """Initialize the status bar for a target.""" + + @abstractmethod + def _update_target(self, current_time: float, target: str) -> None: + """Update the status bar for a given target.""" + + def pause(self) -> None: + """Toggle whether the status printer is paused. May make target finish times inaccurate.""" + self._paused = not self._paused + if not self._paused and self.updates_every_event: + self.update_all_targets() + + def stop(self) -> None: + """Stop the status header/target printing (but keep the printer context).""" + if self._task: + self._task.cancel() + if self._paused: + self._paused = False + self.update_all_targets(including_unstarted=True) + + def exit(self) -> None: # noqa: B027 + """Do cleanup activities before exiting.""" From 7f962a7aa96c705904597729f94ac2813637c9f4 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 27 Mar 2026 04:31:02 +0000 Subject: [PATCH 3/5] refactor: reorganize time utilities Extract time-related utilities (the `hms` functionality of the `Timer` and the two timestamp formats from `fs.py`) into a new `time.py` utility module. The intention is to use the `hms` functionality inside the new async status printers and to eventually remove `timer.py` completely when the old scheduler is removed. Signed-off-by: Alex Jones --- src/dvsim/utils/__init__.py | 11 +++-------- src/dvsim/utils/fs.py | 11 +++-------- src/dvsim/utils/time.py | 20 ++++++++++++++++++++ src/dvsim/utils/timer.py | 8 +++----- 4 files changed, 29 insertions(+), 21 deletions(-) create mode 100644 src/dvsim/utils/time.py diff --git a/src/dvsim/utils/__init__.py b/src/dvsim/utils/__init__.py index 353b8b37..70c4a74d 100644 --- a/src/dvsim/utils/__init__.py +++ b/src/dvsim/utils/__init__.py @@ -5,16 +5,10 @@ """Utility functions common across dvsim.""" from dvsim.utils.check import check_bool, check_int -from dvsim.utils.fs import ( - TS_FORMAT, - TS_FORMAT_LONG, - clean_odirs, - mk_path, - mk_symlink, - rm_path, -) +from dvsim.utils.fs import clean_odirs, mk_path, mk_symlink, rm_path from dvsim.utils.hjson import parse_hjson from dvsim.utils.subprocess import run_cmd, run_cmd_with_timeout +from dvsim.utils.time import TS_FORMAT, TS_FORMAT_LONG, hms from dvsim.utils.timer import Timer from dvsim.utils.wildcards import ( find_and_substitute_wildcards, @@ -29,6 +23,7 @@ "check_int", "clean_odirs", "find_and_substitute_wildcards", + "hms", "mk_path", "mk_symlink", "parse_hjson", diff --git a/src/dvsim/utils/fs.py b/src/dvsim/utils/fs.py index 6b8d398a..93886e83 100644 --- a/src/dvsim/utils/fs.py +++ b/src/dvsim/utils/fs.py @@ -12,21 +12,16 @@ from pathlib import Path from dvsim.logging import log +from dvsim.utils.time import TS_FORMAT __all__ = ( - "TS_FORMAT", - "TS_FORMAT_LONG", + "clean_odirs", "mk_path", "mk_symlink", + "relative_to", "rm_path", ) -# Timestamp format when creating directory backups. -TS_FORMAT = "%Y%m%d_%H%M%S" - -# Timestamp format when generating reports. -TS_FORMAT_LONG = "%A %B %d %Y %H:%M:%S UTC" - def rm_path(path: Path, *, ignore_error: bool = False) -> None: """Remove the specified path if it exists. diff --git a/src/dvsim/utils/time.py b/src/dvsim/utils/time.py new file mode 100644 index 00000000..8723ec75 --- /dev/null +++ b/src/dvsim/utils/time.py @@ -0,0 +1,20 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. +# SPDX-License-Identifier: Apache-2.0 + +"""Time-based utilities.""" + +# Timestamp format when creating directory backups. +TS_FORMAT = "%Y%m%d_%H%M%S" + +# Timestamp format when generating reports. +TS_FORMAT_LONG = "%A %B %d %Y %H:%M:%S UTC" + + +def hms(seconds: float) -> str: + """Render a duration (in seconds) in the hh:mm:ss format, rounded to the nearest second.""" + total = round(seconds) + hours, mins = divmod(total, 3600) + mins //= 60 + secs = total % 60 + return f"{hours:02d}:{mins:02d}:{secs:02d}" diff --git a/src/dvsim/utils/timer.py b/src/dvsim/utils/timer.py index 7db5fcfd..b0c55f08 100644 --- a/src/dvsim/utils/timer.py +++ b/src/dvsim/utils/timer.py @@ -4,6 +4,8 @@ import time +from dvsim.utils.time import hms + class Timer: """A timer to keep track of how long jobs have been running. @@ -26,11 +28,7 @@ def period(self): def hms(self) -> str: """Get the time since start in hh:mm:ss.""" - period = self.period() - secs = int(period + 0.5) - mins = secs // 60 - hours = mins // 60 - return f"{hours:02}:{mins % 60:02}:{secs % 60:02}" + return hms(self.period()) def check_time(self) -> bool: """Return true if we have passed next_print. From c4f472cfe3ff43755ef125994934e42b3dfb4dd3 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 27 Mar 2026 12:25:44 +0000 Subject: [PATCH 4/5] feat: port `TtyStatusPrinter` to the new async interface This commit ports the `TtyStatusPrinter` to use the new async interface, extending the `StatusPrinter` interface introduced previously. The extended logic remains mostly the same as the original code, with some small refactors and tweaks for aesthetics. Signed-off-by: Alex Jones --- src/dvsim/scheduler/async_status_printer.py | 53 +++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/src/dvsim/scheduler/async_status_printer.py b/src/dvsim/scheduler/async_status_printer.py index 4309b6c5..aa39d176 100644 --- a/src/dvsim/scheduler/async_status_printer.py +++ b/src/dvsim/scheduler/async_status_printer.py @@ -12,6 +12,8 @@ from dvsim.job.data import JobSpec from dvsim.job.status import JobStatus +from dvsim.logging import log +from dvsim.utils import hms class StatusPrinter(ABC): @@ -169,3 +171,54 @@ def stop(self) -> None: def exit(self) -> None: # noqa: B027 """Do cleanup activities before exiting.""" + + +class TtyStatusPrinter(StatusPrinter): + """Prints the current scheduler target status onto the console / TTY via logging.""" + + hms_fmt = "\x1b[1m{hms:9s}\x1b[0m" + header_fmt = hms_fmt + " [{target:^13s}]: [{msg}]" + status_fmt = header_fmt + " {percent:3.0f}% {running}" + + def __init__(self, jobs: Sequence[JobSpec]) -> None: + """Initialise the TtyStatusPrinter.""" + super().__init__(jobs) + + # Maintain a mapping of completed targets, so we only print the status one last + # time when it reaches 100% for a target. + self._target_done: dict[str, bool] = {} + + def _print_header(self) -> None: + """Initialize / print the header, displaying the legend of job status meanings.""" + log.info(self.header_fmt.format(hms="", target="legend", msg=self._get_header())) + + def _init_target(self, target: str, _msg: str) -> None: + """Initialize the status bar for a target.""" + self._target_done[target] = False + + def _trunc_running(self, running: str, width: int = 30) -> str: + """Truncate the list of running items to a specified length.""" + if len(running) <= width: + return running + return running[: width - 3] + "..." + + def _update_target(self, current_time: float, target: str) -> None: + """Update the status bar for a given target.""" + if self._target_done[target]: + return + if self.target_is_done(target): + self._target_done[target] = True + + status_counts = self._target_counts[target] + done_count = sum(status_counts[status] for status in JobStatus if status.is_terminal) + percent = (done_count / self._totals[target] * 100) if self._totals[target] else 100 + + log.info( + self.status_fmt.format( + hms=hms(current_time - self._start_time), + target=target, + msg=self._get_target_row(target), + percent=percent, + running=self._trunc_running(", ".join(self._running[target])), + ), + ) From 3b981899713f5701770ad546f93a0a3b03f163e6 Mon Sep 17 00:00:00 2001 From: Alex Jones Date: Fri, 27 Mar 2026 12:27:14 +0000 Subject: [PATCH 5/5] feat: port `EnlightenStatusPrinter` to the async interface This commit ports the original `EnlightenStatusPrinter` to the new async interface, extending the `StatusPrinter` abstract base class. The logic is mostly the same, with a few important caveats to note: - Since the new interface allows float (and hence sub-second) print intervals, a warning is introduced for intervals less than Enlighten's internal minimum delta value which will cause updates to be coalesced and potentially lost at points if not refreshed. We could also lower the `min_delta` to match the print interval, but experimentation shows that this introduces performance concerns and is best left as is. - Because of the above, logic is added to refresh (flush) the status bar when a target is done, to ensure it locks the final time correctly. - An occasional bug was encountered on using Ctrl-C to gracefully exit where Enlighten's `StatusBar.update` would hang indefinitely. This occurred during the terminal protocol used by the underlying Blessed library, which queried the terminal for its size and expected a response. Under heavy loads, particularly when a large number of processes are killed due to an exit signal, the terminal response might not be received, causing Blessed to hang on a `getch` call. To prevent this, the `pause` interface was introduced to the base `StatusPrinter` which is used for that purpose here. Signed-off-by: Alex Jones --- src/dvsim/scheduler/async_status_printer.py | 173 ++++++++++++++++++++ 1 file changed, 173 insertions(+) diff --git a/src/dvsim/scheduler/async_status_printer.py b/src/dvsim/scheduler/async_status_printer.py index aa39d176..737bcfb3 100644 --- a/src/dvsim/scheduler/async_status_printer.py +++ b/src/dvsim/scheduler/async_status_printer.py @@ -5,10 +5,17 @@ """Job status printing during a scheduled run.""" import asyncio +import os +import shutil +import sys +import termios import time from abc import ABC, abstractmethod from collections import defaultdict from collections.abc import Sequence +from typing import ClassVar + +import enlighten from dvsim.job.data import JobSpec from dvsim.job.status import JobStatus @@ -222,3 +229,169 @@ def _update_target(self, current_time: float, target: str) -> None: running=self._trunc_running(", ".join(self._running[target])), ), ) + + +class EnlightenStatusPrinter(TtyStatusPrinter): + """Prints the current scheduler target status to the terminal using Enlighten. + + Enlighten is a third party progress bar tool. Documentation: + https://python-enlighten.readthedocs.io/en/stable/ + + Enlighten does not work if the output of DVSim is redirected to a file, for + example - it needs to be attached to a TTY enabled stream. + """ + + # Enlighten uses a min_delta of 0.1 by default, only updating every 0.1 seconds. + DEFAULT_MIN_DELTA = 0.1 + + status_fmt_no_running = TtyStatusPrinter.status_fmt.removesuffix("{running}") + status_fmt = "{status_msg}{running}" + + def __init__(self, jobs: Sequence[JobSpec]) -> None: + """Initialise the EnlightenStatusPrinter.""" + super().__init__(jobs) + if self._interval < self.DEFAULT_MIN_DELTA: + # TODO: maybe "debounce" the updates with a delayed async refresh task? + log.warning( + "Configured print interval %g will not accurately reflect for %s," + " which uses status bars with a configured min_delta of %g by default.", + self._interval, + self.__class__.__name__, + self.DEFAULT_MIN_DELTA, + ) + + # Initialize the enlighten manager and needed state + self._manager = enlighten.get_manager() + self._status_header: enlighten.StatusBar | None = None + self._status_bars: dict[str, enlighten.StatusBar] = {} + self._stopped = False + + def _print_header(self) -> None: + """Initialize / print the header, displaying the legend of job status meanings.""" + self._status_header = self._manager.status_bar( + status_format=self.header_fmt, + hms="", + target="legend", + msg=self._get_header(), + ) + + def _init_target(self, target: str, msg: str) -> None: + """Initialize the status bar for a target.""" + super()._init_target(target, msg) + msg = self.status_fmt_no_running.format(hms=hms(0), target=target, msg=msg, percent=0.0) + self._status_bars[target] = self._manager.status_bar( + status_format=self.status_fmt, + status_msg=msg, + running="", + ) + + def _trunc_running_to_terminal(self, running: str, offset: int) -> str: + """Truncate the list of running items to match the max terminal width.""" + cols = shutil.get_terminal_size(fallback=(80, 24)).columns + width = max(30, cols - offset - 1) + return self._trunc_running(running, width) + + def _update_target(self, current_time: float, target: str) -> None: + """Update the status bar for a given target.""" + if self._target_done[target]: + return + + status_counts = self._target_counts[target] + done_count = sum(status_counts[status] for status in JobStatus if status.is_terminal) + percent = (done_count / self._totals[target] * 100) if self._totals[target] else 100 + + status_msg = self.status_fmt_no_running.format( + hms=hms(current_time - self._start_time), + target=target, + msg=self._get_target_row(target), + percent=percent, + ) + offset = len(status_msg) + running = self._trunc_running_to_terminal(", ".join(self._running[target]), offset) + + self._status_bars[target].update(status_msg=status_msg, running=running) + + if self.target_is_done(target): + self._target_done[target] = True + self._status_bars[target].refresh() + + def stop(self) -> None: + """Stop the status header/target printing (but keep the printer context).""" + super().stop() + if self._status_header is not None: + self._status_header.close() + for status_bar in self._status_bars.values(): + status_bar.close() + self._stopped = True + + def exit(self) -> None: + """Do cleanup activities before exiting (closing the manager context).""" + super().exit() + if not self._stopped: + self.stop() + self._manager.stop() + + # Sometimes, exiting via a signal (e.g. Ctrl-C) can cause Enlighten to leave the + # terminal in some non-raw mode. Just in case, restore regular operation. + self._restore_terminal() + + def _restore_terminal(self) -> None: + """Restore regular terminal operation after using Enlighten.""" + # Try open /dev/tty, otherwise fallback to sys.stdin + try: + fd = os.open("/dev/tty", os.O_RDWR) + close_fd = True + except (OSError, termios.error): + fd = sys.stdin.fileno() + close_fd = False + + # By default, the terminal should echo input (ECHO) and run in canonical mode (ICANON). + # We make this change after all buffered output is transmitted (TCSADRAIN). + try: + attrs = termios.tcgetattr(fd) + attrs[3] |= termios.ECHO | termios.ICANON + termios.tcsetattr(fd, termios.TCSADRAIN, attrs) + except termios.error: + log.debug("Unable to restore terminal attributes safely") + + if close_fd: + os.close(fd) + + +class StatusPrinterSingleton: + """Singleton for the status printer to uniquely refer to 1 instance at a time.""" + + _instance: ClassVar[StatusPrinter | None] = None + + @classmethod + def set(cls, instance: StatusPrinter | None) -> None: + """Set the stored status printer.""" + cls._instance = instance + + @classmethod + def get(cls) -> StatusPrinter | None: + """Get the stored status printer (if it exists).""" + return cls._instance + + +def create_status_printer(jobs: Sequence[JobSpec]) -> StatusPrinter: + """Create the global status printer. + + If stdout is a TTY, then return an instance of EnlightenStatusPrinter, else + return an instance of StatusPrinter. + """ + status_printer = StatusPrinterSingleton.get() + if status_printer is not None: + return status_printer + + status_printer = EnlightenStatusPrinter(jobs) if sys.stdout.isatty() else TtyStatusPrinter(jobs) + StatusPrinterSingleton.set(status_printer) + return status_printer + + +def get_status_printer() -> StatusPrinter: + """Retrieve the configured global status printer.""" + status_printer = StatusPrinterSingleton.get() + if status_printer is None: + raise RuntimeError("get_status_printer called without first creating the status printer") + return status_printer