Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 48 additions & 2 deletions src/dvsim/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,15 @@
from dvsim.launcher.sge import SgeLauncher
from dvsim.launcher.slurm import SlurmLauncher
from dvsim.logging import LOG_LEVELS, configure_logging, log
from dvsim.runtime.registry import BackendType, backend_registry
from dvsim.scheduler.async_status_printer import StatusPrinter
from dvsim.scheduler.async_status_printer import get_status_printer as get_async_status_printer
from dvsim.scheduler.status_printer import get_status_printer
from dvsim.utils import TS_FORMAT, TS_FORMAT_LONG, Timer, rm_path, run_cmd_with_timeout

# Temporary: set to 1 to enable experimental use of the async scheduler (not yet fully integrated)
EXPERIMENTAL_ENABLE_ASYNC_SCHEDULER = os.environ.get("EXPERIMENTAL_ENABLE_ASYNC_SCHEDULER", None)

# The different categories that can be passed to the --list argument.
_LIST_CATEGORIES = ["build_modes", "run_modes", "tests", "regressions"]

Expand Down Expand Up @@ -839,6 +845,37 @@ def parse_args(argv: list[str] | None = None):
return args


def set_backend_type(*, is_local: bool = False, fake: bool = False) -> None:
    """Set the default backend type that will be used to launch jobs (unless overridden).

    The DVSIM_BACKEND/DVSIM_LAUNCHER environment variables are used to identify what
    backend should be used by default, and are intended to be specific to the user's
    work site and set externally before invoking DVSim. Selecting a local or fake backend
    via the command line will override this.

    Args:
        is_local: force the "local" backend, overriding the environment.
        fake: force the "fake" backend, overriding the environment
            (ignored when ``is_local`` is also set).
    """
    if is_local:
        backend = "local"
    elif fake:
        backend = "fake"
    else:
        backend = os.environ.get("DVSIM_BACKEND")

    if backend is None:
        # Fall back to the legacy launcher environment variable
        backend = os.environ.get("DVSIM_LAUNCHER", "local")

    available_backends = backend_registry.available()
    if backend not in available_backends:
        # Fix: the message has a %s placeholder but the original call never
        # supplied the backend name, producing a malformed log record.
        log.error(
            "Backend %s set using the DVSIM_BACKEND/DVSIM_LAUNCHER environment variables "
            "does not exist. Using the local backend instead.",
            backend,
        )
        backend = "local"

    # Configure the resolved backend type as the default backend
    backend_registry.set_default(BackendType(backend))


def main(argv: list[str] | None = None) -> None:
"""DVSim CLI entry point."""
args = parse_args(argv)
Expand Down Expand Up @@ -884,6 +921,7 @@ def main(argv: list[str] | None = None) -> None:

# Register the common deploy settings.
Timer.print_interval = args.print_interval
StatusPrinter.print_interval = args.print_interval
LocalLauncher.max_parallel = args.max_parallel
SlurmLauncher.max_parallel = args.max_parallel
SgeLauncher.max_parallel = args.max_parallel
Expand All @@ -893,6 +931,9 @@ def main(argv: list[str] | None = None) -> None:
FakeLauncher.max_parallel = args.max_parallel
set_launcher_type(is_local=args.local, fake=args.fake)

# Configure the runtime backend. TODO: deprecate `set_launcher_type` above.
set_backend_type(is_local=args.local, fake=args.fake)

# Configure scheduler instrumentation
set_instrumentation(InstrumentationFactory.create(args.instrumentation))

Expand Down Expand Up @@ -935,8 +976,13 @@ def main(argv: list[str] | None = None) -> None:
# Now that we have printed the results from the scheduler, we close the
# status printer, to ensure the status remains relevant in the UI context
# (for applicable status printers).
status_printer = get_status_printer(args.interactive)
status_printer.exit()
if EXPERIMENTAL_ENABLE_ASYNC_SCHEDULER:
if not args.interactive:
status_printer = get_async_status_printer()
status_printer.exit()
else:
status_printer = get_status_printer(args.interactive)
status_printer.exit()

else:
log.error("Nothing to run!")
Expand Down
68 changes: 67 additions & 1 deletion src/dvsim/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

"""Flow config base class."""

import asyncio
import json
import os
import pprint
Expand All @@ -17,10 +18,14 @@

from dvsim import instrumentation
from dvsim.flow.hjson import set_target_attribute
from dvsim.job.data import CompletedJobStatus
from dvsim.job.data import CompletedJobStatus, JobSpec
from dvsim.launcher.factory import get_launcher_cls
from dvsim.logging import log
from dvsim.runtime.registry import backend_registry
from dvsim.scheduler.async_core import Scheduler as AsyncScheduler
from dvsim.scheduler.async_status_printer import create_status_printer
from dvsim.scheduler.core import Scheduler
from dvsim.scheduler.log_manager import LogManager
from dvsim.utils import (
find_and_substitute_wildcards,
rm_path,
Expand All @@ -33,6 +38,10 @@
__all__ = ("FlowCfg",)


# Temporary: set to 1 to enable experimental use of the async scheduler (not yet fully integrated)
EXPERIMENTAL_ENABLE_ASYNC_SCHEDULER = os.environ.get("EXPERIMENTAL_ENABLE_ASYNC_SCHEDULER", None)


# Interface class for extensions.
class FlowCfg(ABC):
"""Base class for the different flows supported by dvsim.py.
Expand Down Expand Up @@ -442,12 +451,69 @@ def deploy_objects(self) -> Sequence[CompletedJobStatus]:
),
)

if EXPERIMENTAL_ENABLE_ASYNC_SCHEDULER:
return asyncio.run(self.run_scheduler(jobs))

return Scheduler(
items=jobs,
launcher_cls=get_launcher_cls(),
interactive=self.interactive,
).run()

    async def run_scheduler(self, jobs: list[JobSpec]) -> list[CompletedJobStatus]:
        """Run the async scheduler with the given set of job specifications.

        Creates the default runtime backend, wires optional status-printer,
        log-manager, and instrumentation callbacks onto the scheduler, runs all
        jobs to completion, and closes the backend when done.

        Args:
            jobs: the job specifications to schedule and execute.

        Returns:
            The completed status objects for the scheduled jobs.
        """
        # Create the runtime backends. TODO: support multiple runtime backends at once
        default_backend_factory = backend_registry.get()
        default_backend = default_backend_factory()

        scheduler = AsyncScheduler(
            jobs=jobs,
            backends={default_backend.name: default_backend},
            default_backend=default_backend.name,
            max_parallelism=self.args.max_parallel,
            # TODO: introduce a better prioritization function that accounts for timeout
        )

        if not self.interactive:
            # NOTE(review): status printing / per-job log management appear to be
            # wanted only for non-interactive runs — confirm against the UI design.
            status_printer = create_status_printer(jobs)

            # Add status printer hooks
            scheduler.add_run_start_callback(status_printer.start)
            scheduler.add_job_status_change_callback(status_printer.update_status)
            scheduler.add_run_end_callback(status_printer.stop)
            scheduler.add_kill_signal_callback(status_printer.pause)

            # Add log manager hooks
            log_manager = LogManager()
            scheduler.add_job_status_change_callback(
                lambda spec, _old, new: log_manager.on_job_status_change(spec, new)
            )

        # Setup instrumentation (instrumentation.get() may return None when disabled)
        inst = instrumentation.get()
        if inst is not None:
            inst.start()

            # Add instrumentation hooks (must stay inside the None-check above,
            # since they dereference `inst`)
            scheduler.add_run_start_callback(inst.on_scheduler_start)
            scheduler.add_run_end_callback(inst.on_scheduler_end)
            scheduler.add_job_status_change_callback(
                lambda spec, _old, new: inst.on_job_status_change(spec, new)
            )

        # Run the scheduler and cleanup
        try:
            results = await scheduler.run()
        finally:
            # Always release backend resources, even if the run raises.
            await default_backend.close()

        # Finalize instrumentation
        if inst is not None:
            inst.stop()
            instrumentation.flush()

        return results

@abstractmethod
def gen_results(self, results: Sequence[CompletedJobStatus]) -> None:
"""Generate flow results.
Expand Down
2 changes: 2 additions & 0 deletions src/dvsim/instrumentation/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class MetadataJobFragment(JobFragment):
job_type: str
target: str
tool: str
backend: str | None
dependencies: list[str]
status: str

Expand Down Expand Up @@ -61,6 +62,7 @@ def build_report_fragments(self) -> InstrumentationFragments | None:
spec.job_type,
spec.target,
spec.tool.name,
spec.backend,
spec.dependencies,
status_str,
)
Expand Down
2 changes: 1 addition & 1 deletion src/dvsim/instrumentation/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None:
with self._lock:
running = job_id in self._running_jobs
started = running or job_id in self._finished_jobs
if not started and status != JobStatus.QUEUED:
if not started and status not in (JobStatus.SCHEDULED, JobStatus.QUEUED):
self._running_jobs[job_id] = JobResourceAggregate(job)
running = True
if running and status.is_terminal:
Expand Down
2 changes: 1 addition & 1 deletion src/dvsim/instrumentation/timing.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def on_job_status_change(self, job: JobSpec, status: JobStatus) -> None:
job_info = TimingJobFragment(job)
self._jobs[job_id] = job_info

if job_info.start_time is None and status != JobStatus.QUEUED:
if job_info.start_time is None and status not in (JobStatus.SCHEDULED, JobStatus.QUEUED):
job_info.start_time = time.perf_counter()
if status.is_terminal:
job_info.end_time = time.perf_counter()
Expand Down
5 changes: 5 additions & 0 deletions src/dvsim/job/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ class JobSpec(BaseModel):
target: str
"""run phase [build, run, ...]"""

backend: str | None
"""The runtime backend to execute this job with. If not provided (None), this
indicates that whatever is configured as the 'default' backend should be used.
"""

seed: int | None
"""Seed if there is one."""

Expand Down
8 changes: 6 additions & 2 deletions src/dvsim/job/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ def get_job_spec(self) -> "JobSpec":
name=self.name,
job_type=self.__class__.__name__,
target=self.target,
# TODO: for now we always use the default configured backend, but it might be good
# to allow different jobs to run on different backends in the future?
backend=None,
seed=getattr(self, "seed", None),
full_name=self.full_name,
qual_name=self.qual_name,
Expand Down Expand Up @@ -832,13 +835,14 @@ def callback(status: JobStatus) -> None:
If the extraction fails, an appropriate exception is raised, which must
be caught by the caller to mark the job as a failure.
"""
if self.dry_run or status != JobStatus.PASSED:
cov_report_path = Path(self.cov_report_txt)
if self.dry_run or status != JobStatus.PASSED or not cov_report_path.exists():
return

plugin = get_sim_tool_plugin(tool=self.sim_cfg.tool)

results, self.cov_total = plugin.get_cov_summary_table(
cov_report_path=self.cov_report_txt,
cov_report_path=cov_report_path,
)

for tup in zip(*results, strict=False):
Expand Down
13 changes: 8 additions & 5 deletions src/dvsim/job/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
class JobStatus(Enum):
"""Status of a Job."""

QUEUED = auto()
RUNNING = auto()
PASSED = auto()
FAILED = auto()
KILLED = auto()
# SCHEDULED is currently unused in the old sync scheduler, there `SCHEDULED` and `QUEUED`
# are combined under `QUEUED`. It is used only in the new async scheduler.
SCHEDULED = auto() # Waiting for dependencies
QUEUED = auto() # Dependencies satisfied, waiting to be dispatched
RUNNING = auto() # Dispatched to a backend and actively executing
PASSED = auto() # Completed successfully
FAILED = auto() # Completed with failure
KILLED = auto() # Forcibly terminated or never executed

@property
def shorthand(self) -> str:
Expand Down
Loading
Loading