Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 124 additions & 16 deletions src/dvsim/job/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
from pathlib import Path
from typing import TYPE_CHECKING, ClassVar

from dvsim.flow.base import FlowCfg
from dvsim.job.data import JobSpec, WorkspaceConfig
from dvsim.job.status import JobStatus
from dvsim.job.time import JobTime
from dvsim.logging import log
from dvsim.report.data import IPMeta, ToolMeta
from dvsim.test import Test
from dvsim.tool.utils import get_sim_tool_plugin
from dvsim.utils import (
clean_odirs,
Expand All @@ -25,8 +27,8 @@
)

if TYPE_CHECKING:
from dvsim.flow.sim import SimCfg
from dvsim.modes import BuildMode
from dvsim.sim.flow import SimCfg


__all__ = (
Expand Down Expand Up @@ -59,7 +61,7 @@ def __str__(self) -> str:
"""Get a string representation of the deployment object."""
return pprint.pformat(self.__dict__) if log.isEnabledFor(log.VERBOSE) else self.full_name

def __init__(self, sim_cfg: "SimCfg") -> None:
def __init__(self, sim_cfg: "FlowCfg") -> None:
"""Initialise deployment object.

Args:
Expand Down Expand Up @@ -372,10 +374,31 @@ class CompileSim(Deploy):
cmds_list_vars: ClassVar = ["pre_build_cmds", "post_build_cmds"]
weight = 5

def __init__(self, build_mode, sim_cfg) -> None:
def __init__(self, build_mode: "BuildMode", sim_cfg: "SimCfg") -> None:
"""Initialise a Sim compile stage job deployment."""
self.build_mode_obj = build_mode
self.seed = sim_cfg.build_seed

# Register a copy of sim_cfg which is explicitly the SimCfg type
self._typed_sim_cfg: SimCfg = sim_cfg

# Declare (typed) variables for values that will be loaded in in
# super().__init__. This teaches a type checker about the existence of
# the fields.
self.proj_root: str = ""
self.sv_flist_gen_cmd: str = ""
self.sv_flist_gen_dir: str = ""
self.sv_flist_gen_opts: list[str] = []
self.pre_build_cmds: list[str] = []
self.build_cmd: str = ""
self.build_dir: str = ""
self.build_opts: list[str] = []
self.post_build_cmds: list[str] = []
self.build_fail_patterns: list[str] = []
self.build_pass_patterns: list[str] = []
self.build_timeout_mins: float | None = None
self.cov_db_dir: str = ""

super().__init__(sim_cfg)

# Needs to be after the wildcard expansion to log anything meaningful
Expand Down Expand Up @@ -439,7 +462,7 @@ def _set_attrs(self) -> None:
# 'build_mode' is used as a substitution variable in the HJson.
self.build_mode = self.name
self.job_name += f"_{self.build_mode}"
if self.sim_cfg.cov:
if self._typed_sim_cfg.cov:
self.output_dirs += [self.cov_db_dir]
self.pass_patterns = self.build_pass_patterns
self.fail_patterns = self.build_fail_patterns
Expand All @@ -454,7 +477,7 @@ def callback() -> None:
"""Perform pre-launch tasks."""
# Delete old coverage database directories before building again. We
# need to do this because the build directory is not 'renewed'.
rm_path(self.cov_db_dir)
rm_path(Path(self.cov_db_dir))

return callback

Expand All @@ -471,9 +494,29 @@ class CompileOneShot(Deploy):

target = "build"

def __init__(self, build_mode, sim_cfg) -> None:
def __init__(self, build_mode: "BuildMode", sim_cfg: "FlowCfg") -> None:
"""Initialise a CompileOneShot object."""
self.build_mode_obj = build_mode

# Declare (typed) variables for values that will be loaded in in
# super().__init__. This teaches a type checker about the existence of
# the fields.
self.proj_root: str = ""
self.sv_flist_gen_cmd: str = ""
self.sv_flist_gen_dir: str = ""
self.sv_flist_gen_opts: list[str] = []
self.build_dir: str = ""
self.build_cmd: str = ""
self.build_opts: list[str] = []
self.build_log: str = ""
self.build_timeout_mins: float | None = None
self.post_build_cmds: list[str] = []
self.pre_build_cmds: list[str] = []
self.report_cmd: str = ""
self.report_opts: list[str] = []
self.build_fail_patterns: list[str] = []
self.build_pass_patterns: list[str] = []

super().__init__(sim_cfg)

# Needs to be after the wildcard expansion to log anything meaningful
Expand Down Expand Up @@ -542,7 +585,10 @@ class RunTest(Deploy):
fixed_seed = None
cmds_list_vars = ["pre_run_cmds", "post_run_cmds"]

def __init__(self, index, test, build_job, sim_cfg: "SimCfg") -> None:
def __init__(self, index: int, test: Test, build_job: CompileSim, sim_cfg: "SimCfg") -> None:
# Register a copy of sim_cfg which is explicitly the SimCfg type
self._typed_sim_cfg: SimCfg = sim_cfg

self.test_obj = test
self.index = index
self.build_seed = sim_cfg.build_seed
Expand All @@ -557,6 +603,30 @@ def __init__(self, index, test, build_job, sim_cfg: "SimCfg") -> None:
index,
self.seed,
)

# Declare (typed) variables for values that will be loaded in in
# super().__init__. This teaches a type checker about the existence of
# the fields.
self.proj_root: str = ""
self.uvm_test: str = ""
self.uvm_test_seq: str = ""
self.sw_images: list[str] = []
self.sw_build_device: str = ""
self.sw_build_cmd: str = ""
self.sw_build_opts: list[str] = []
self.run_dir: str = ""
self.pre_run_cmds: list[str] = []
self.run_cmd: str = ""
self.run_opts: list[str] = []
self.post_run_cmds: list[str] = []
self.cov_db_dir: str = ""
self.cov_db_test_dir: str = ""
self.run_dir_name: str = ""
self.run_fail_patterns: list[str] = []
self.run_pass_patterns: list[str] = []
self.run_timeout_mins: float | None = None
self.run_timeout_multiplier: float = 1

super().__init__(sim_cfg)

# Needs to be after the wildcard expansion to log anything meaningful
Expand All @@ -567,7 +637,7 @@ def __init__(self, index, test, build_job, sim_cfg: "SimCfg") -> None:
self.run_timeout_mins,
)

if build_job is not None and not self.sim_cfg.run_only:
if build_job is not None and not self._typed_sim_cfg.run_only:
self.dependencies.append(build_job)

# We did something wrong if build_mode is not the same as the build_job
Expand Down Expand Up @@ -621,7 +691,7 @@ def _set_attrs(self) -> None:
self.qual_name = self.run_dir_name + "." + str(self.seed)
self.full_name = f"{self.sim_cfg.name}{self._variant_suffix}:{self.qual_name}"
self.job_name += f"_{self.build_mode}"
if self.sim_cfg.cov:
if self._typed_sim_cfg.cov:
self.output_dirs += [self.cov_db_dir]

# In GUI mode, the log file is not updated; hence, nothing to check.
Expand Down Expand Up @@ -660,7 +730,7 @@ def callback(status: JobStatus) -> None:
"""Perform tidy up tasks."""
if status != JobStatus.PASSED:
# Delete the coverage data if available.
rm_path(self.cov_db_test_dir)
rm_path(Path(self.cov_db_test_dir))

return callback

Expand Down Expand Up @@ -691,8 +761,24 @@ class CovUnr(Deploy):

target = "cov_unr"

def __init__(self, sim_cfg) -> None:
def __init__(self, sim_cfg: FlowCfg) -> None:
"""Initialise a UNR coverage calculation job deployment."""
# Declare (typed) variables for values that will be loaded in in
# super().__init__. This teaches a type checker about the existence of
# the fields.
self.proj_root: str = ""
self.sv_flist_gen_cmd: str = ""
self.sv_flist_gen_dir: str = ""
self.sv_flist_gen_opts: list[str] = []
self.build_dir: str = ""
self.cov_unr_build_cmd: str = ""
self.cov_unr_build_opts: list[str] = []
self.cov_unr_run_cmd: str = ""
self.cov_unr_run_opts: list[str] = []
self.cov_unr_dir: str = ""
self.cov_merge_db_dir: str = ""
self.build_fail_patterns: list[str] = []

super().__init__(sim_cfg)

def _define_attrs(self) -> None:
Expand Down Expand Up @@ -759,7 +845,7 @@ def __init__(self, run_items, sim_cfg) -> None:
self.cov_merge_db_dir = subst_wildcards("{cov_merge_db_dir}", sim_cfg.__dict__)

# Prune previous merged cov directories, keeping past 7 dbs.
prev_cov_db_dirs = clean_odirs(odir=self.cov_merge_db_dir, max_odirs=7)
prev_cov_db_dirs = clean_odirs(odir=Path(self.cov_merge_db_dir), max_odirs=7)

# If the --cov-merge-previous command line switch is passed, then
# merge coverage with the previous runs.
Expand Down Expand Up @@ -797,8 +883,20 @@ class CovReport(Deploy):
target = "cov_report"
weight = 10

def __init__(self, merge_job, sim_cfg) -> None:
def __init__(self, merge_job: CovMerge, sim_cfg: "SimCfg") -> None:
"""Initialise a job deployment to generate a coverage report."""
# Register a copy of sim_cfg which is explicitly the SimCfg type
self._typed_sim_cfg: SimCfg = sim_cfg

# Declare (typed) variables for values that will be loaded in in
# super().__init__. This teaches a type checker about the existence of
# the fields.
self.cov_report_cmd: str = ""
self.cov_report_opts: list[str] = []
self.cov_report_dir: str = ""
self.cov_merge_db_dir: str = ""
self.cov_report_txt: str = ""

super().__init__(sim_cfg)
self.dependencies.append(merge_job)

Expand Down Expand Up @@ -835,10 +933,10 @@ def callback(status: JobStatus) -> None:
if self.dry_run or status != JobStatus.PASSED:
return

plugin = get_sim_tool_plugin(tool=self.sim_cfg.tool)
plugin = get_sim_tool_plugin(tool=self._typed_sim_cfg.tool)

results, self.cov_total = plugin.get_cov_summary_table(
cov_report_path=self.cov_report_txt,
cov_report_path=Path(self.cov_report_txt),
)

for tup in zip(*results, strict=False):
Expand All @@ -852,10 +950,20 @@ class CovAnalyze(Deploy):

target = "cov_analyze"

def __init__(self, sim_cfg) -> None:
def __init__(self, sim_cfg: FlowCfg) -> None:
"""Initialise a job deployment for running coverage analysis."""
# Enforce GUI mode for coverage analysis.
sim_cfg.gui = True

# Declare (typed) variables for values that will be loaded in in
# super().__init__. This teaches a type checker about the existence of
# the fields.
self.proj_root: str = ""
self.cov_analyze_cmd: str = ""
self.cov_analyze_opts: list[str] = []
self.cov_analyze_dir: str = ""
self.cov_merge_db_dir: str = ""

super().__init__(sim_cfg)

def _define_attrs(self) -> None:
Expand Down
2 changes: 1 addition & 1 deletion src/dvsim/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def _build_logger() -> DVSimLogger:
# Log any unhandled exceptions
_previous_excepthook = sys.excepthook

def _handle_exception(exc_type, exc_value, exc_tb):
def _handle_exception(exc_type, exc_value, exc_tb) -> None:
logger.critical("Unhandled exception", exc_info=(exc_type, exc_value, exc_tb))
_previous_excepthook(exc_type, exc_value, exc_tb)

Expand Down
Loading