From b5000763a94b4bdb1420cdc728c480c376ce2b72 Mon Sep 17 00:00:00 2001 From: Alexandra Bara Date: Thu, 13 Nov 2025 09:06:51 -0600 Subject: [PATCH 1/8] added topo info --- .../plugins/inband/amdsmi/amdsmi_collector.py | 400 ++++++++++++++++++ .../plugins/inband/amdsmi/amdsmidata.py | 133 ++++++ 2 files changed, 533 insertions(+) diff --git a/nodescraper/plugins/inband/amdsmi/amdsmi_collector.py b/nodescraper/plugins/inband/amdsmi/amdsmi_collector.py index a29ff1e7..ca100bca 100644 --- a/nodescraper/plugins/inband/amdsmi/amdsmi_collector.py +++ b/nodescraper/plugins/inband/amdsmi/amdsmi_collector.py @@ -32,14 +32,21 @@ from nodescraper.enums import EventCategory, EventPriority, ExecutionStatus, OSFamily from nodescraper.models import TaskResult from nodescraper.plugins.inband.amdsmi.amdsmidata import ( + AccessTable, AmdSmiDataModel, AmdSmiListItem, AmdSmiMetric, AmdSmiStatic, AmdSmiVersion, + AtomicsTable, BadPages, + BiDirectionalTable, + CoherentTable, + DmaTable, Fw, FwListItem, + LinkStatusTable, + LinkTypes, MetricClockData, MetricEccTotals, MetricEnergy, @@ -75,7 +82,13 @@ StaticVbios, StaticVram, StaticXgmiPlpd, + Topo, + TopoLink, ValueUnit, + XgmiLink, + XgmiLinkMetrics, + XgmiLinks, + XgmiMetrics, ) from nodescraper.utils import get_exception_details, get_exception_traceback @@ -234,6 +247,9 @@ def _get_amdsmi_data(self) -> Optional[AmdSmiDataModel]: gpu_list = self.get_gpu_list() statics = self.get_static() metric = self.get_metric() + xgmi_metrics = self.get_xgmi_metrics() + xgmi_links = self.get_xgmi_links() + topology = self.get_topology() except Exception as e: self.logger.error(e) self._log_event( @@ -253,9 +269,12 @@ def _get_amdsmi_data(self) -> Optional[AmdSmiDataModel]: gpu_list=gpu_list, process=processes, partition=partition, + topology=topology, firmware=firmware, static=statics, metric=metric, + xgmi_metric=xgmi_metrics, + xgmi_link=xgmi_links, ) except ValidationError as e: self.logger.warning("Validation err: %s", e) @@ -1867,6 +1886,387 @@ 
def get_throttle(self, h: Any) -> MetricThrottle: total_gfx_clk_below_host_limit_violation_activity=self._th_vu_list_pct(per_gfx_total), ) + def get_xgmi_metrics(self) -> list[XgmiMetrics]: + """Collect XGMI link metrics for all GPU devices. + + Returns: + list[XgmiMetrics]: List of XGMI metrics for each GPU + """ + amdsmi = self._amdsmi_mod() + handles = self._get_handles() + if not handles: + return [] + + link_metrics_fn = getattr(amdsmi, "amdsmi_get_link_metrics", None) + if not callable(link_metrics_fn): + return [] + + xgmi_metrics_list = [] + for idx, h in enumerate(handles): + try: + data = self._smi_try(link_metrics_fn, h, default=None) + if not isinstance(data, dict): + continue + + # Get GPU info (BDF) + bdf = self._smi_try(amdsmi.amdsmi_get_gpu_device_bdf, h, default="") + if not isinstance(bdf, str): + bdf = f"unknown_{idx}" + + # Extract link metrics + bit_rate_val = data.get("bit_rate") + max_bandwidth_val = data.get("max_bandwidth") + + # Process links + links_data = data.get("links", []) + if not isinstance(links_data, list): + links_data = [] + + xgmi_links = [] + for link_data in links_data: + if not isinstance(link_data, dict): + continue + + link_bdf = link_data.get("bdf", "") + read_val = link_data.get("read") + write_val = link_data.get("write") + + xgmi_link = XgmiLink( + gpu=idx, + bdf=link_bdf if isinstance(link_bdf, str) else "", + read=self._valueunit(read_val, "KB"), + write=self._valueunit(write_val, "KB"), + ) + xgmi_links.append(xgmi_link) + + # Get link type name + link_type_val = data.get("link_type", 0) + link_type_name = "UNKNOWN" + link_type_enum = getattr(amdsmi, "AmdSmiLinkType", None) + if link_type_enum: + if link_type_val == getattr(link_type_enum, "AMDSMI_LINK_TYPE_XGMI", None): + link_type_name = "XGMI" + elif link_type_val == getattr(link_type_enum, "AMDSMI_LINK_TYPE_PCIE", None): + link_type_name = "PCIE" + elif link_type_val == getattr( + link_type_enum, "AMDSMI_LINK_TYPE_INTERNAL", None + ): + link_type_name = 
"INTERNAL" + elif link_type_val == getattr( + link_type_enum, "AMDSMI_LINK_TYPE_NOT_APPLICABLE", None + ): + link_type_name = "NOT_APPLICABLE" + + link_metrics = XgmiLinkMetrics( + bit_rate=self._valueunit(bit_rate_val, "Gbps"), + max_bandwidth=self._valueunit(max_bandwidth_val, "GB/s"), + link_type=link_type_name, + links=xgmi_links, + ) + + xgmi_metric = XgmiMetrics( + gpu=idx, + bdf=bdf, + link_metrics=link_metrics, + ) + xgmi_metrics_list.append(xgmi_metric) + + except Exception as e: + self.logger.debug(f"Error collecting XGMI metrics for GPU {idx}: {e}") + continue + + return xgmi_metrics_list + + def get_xgmi_links(self) -> list[XgmiLinks]: + """Collect XGMI link status/topology information for all GPU devices. + + Returns: + list[XgmiLinks]: List of XGMI link status for each GPU + """ + amdsmi = self._amdsmi_mod() + handles = self._get_handles() + if not handles: + return [] + + # Check for topology functions + topo_get_link_type_fn = getattr(amdsmi, "amdsmi_topo_get_link_type", None) + if not callable(topo_get_link_type_fn): + return [] + + xgmi_links_list = [] + for idx, h in enumerate(handles): + try: + bdf = self._smi_try(amdsmi.amdsmi_get_gpu_device_bdf, h, default="") + if not isinstance(bdf, str): + bdf = f"unknown_{idx}" + + # Collect link status for all other GPUs + link_status_list = [] + for other_idx, other_h in enumerate(handles): + if idx == other_idx: + link_status_list.append(LinkStatusTable.DISABLED) + continue + + try: + link_type = self._smi_try(topo_get_link_type_fn, h, other_h, default=None) + # Determine link based on type as per API example + link_type_enum = getattr(amdsmi, "AmdSmiLinkType", None) + if link_type_enum and link_type is not None: + if link_type == getattr(link_type_enum, "AMDSMI_LINK_TYPE_XGMI", None): + link_status_list.append(LinkStatusTable.UP) + elif link_type == getattr( + link_type_enum, "AMDSMI_LINK_TYPE_NOT_APPLICABLE", None + ): + link_status_list.append(LinkStatusTable.DISABLED) + else: + 
link_status_list.append(LinkStatusTable.DOWN) + else: + link_status_list.append(LinkStatusTable.DISABLED) + except Exception: + link_status_list.append(LinkStatusTable.DISABLED) + + xgmi_link = XgmiLinks( + gpu=idx, + bdf=bdf, + link_status=link_status_list, + ) + xgmi_links_list.append(xgmi_link) + + except Exception as e: + self.logger.debug(f"Error collecting XGMI link status for GPU {idx}: {e}") + continue + + return xgmi_links_list + + def get_topology(self) -> list[Topo]: + """Collect GPU topology information for all GPU devices. + + Returns: + list[Topo]: List of topology information for each GPU + """ + amdsmi = self._amdsmi_mod() + handles = self._get_handles() + if not handles: + return [] + + topo_fn = getattr(amdsmi, "amdsmi_get_gpu_topology", None) + topo_get_link_type_fn = getattr(amdsmi, "amdsmi_topo_get_link_type", None) + topo_get_link_weight_fn = getattr(amdsmi, "amdsmi_topo_get_link_weight", None) + get_minmax_bw_fn = getattr(amdsmi, "amdsmi_get_minmax_bandwidth_between_processors", None) + + topology_list = [] + for idx, h in enumerate(handles): + try: + bdf = self._smi_try(amdsmi.amdsmi_get_gpu_device_bdf, h, default="") + if not isinstance(bdf, str): + bdf = f"unknown_{idx}" + + topo_links = [] + + # If amdsmi_get_gpu_topology is available, use it + if callable(topo_fn): + topo_data = self._smi_try(topo_fn, h, default=None) + if isinstance(topo_data, dict): + # Process topology data if it contains link information + links_data = topo_data.get("links", []) + if isinstance(links_data, list): + for link_data in links_data: + if not isinstance(link_data, dict): + continue + topo_link = self._process_topo_link(amdsmi, idx, link_data) + if topo_link: + topo_links.append(topo_link) + + # Fallback: collect topology using individual API calls + if not topo_links and callable(topo_get_link_type_fn): + for other_idx, other_h in enumerate(handles): + try: + other_bdf = self._smi_try( + amdsmi.amdsmi_get_gpu_device_bdf, other_h, default="" + ) + if not 
isinstance(other_bdf, str): + other_bdf = f"unknown_{other_idx}" + + # Get link type and hops + link_info = self._smi_try( + topo_get_link_type_fn, h, other_h, default=None + ) + if isinstance(link_info, dict): + link_type_val = link_info.get("type") + num_hops = link_info.get("hops", 0) + else: + link_type_val = link_info + num_hops = 0 + + # Determine link type + link_type_enum = getattr(amdsmi, "AmdSmiLinkType", None) + if idx == other_idx: + link_type = LinkTypes.SELF + link_status = AccessTable.ENABLED + elif link_type_enum and link_type_val is not None: + if link_type_val == getattr( + link_type_enum, "AMDSMI_LINK_TYPE_XGMI", None + ): + link_type = LinkTypes.XGMI + link_status = AccessTable.ENABLED + elif link_type_val == getattr( + link_type_enum, "AMDSMI_LINK_TYPE_PCIE", None + ): + link_type = LinkTypes.PCIE + link_status = AccessTable.ENABLED + else: + link_type = LinkTypes.PCIE + link_status = AccessTable.DISABLED + else: + continue + + # Get link weight + weight = 0 + if callable(topo_get_link_weight_fn): + weight_val = self._smi_try( + topo_get_link_weight_fn, h, other_h, default=None + ) + if isinstance(weight_val, (int, float)): + weight = int(weight_val) + + # Get bandwidth + bandwidth = "0-0" + if callable(get_minmax_bw_fn): + bw_data = self._smi_try(get_minmax_bw_fn, h, other_h, default=None) + if isinstance(bw_data, dict): + min_bw = bw_data.get("min_bandwidth", 0) + max_bw = bw_data.get("max_bandwidth", 0) + bandwidth = f"{min_bw}-{max_bw}" + + # Create topology link + topo_link = TopoLink( + gpu=other_idx, + bdf=other_bdf, + weight=weight, + link_status=link_status, + link_type=link_type, + num_hops=num_hops if isinstance(num_hops, int) else 0, + bandwidth=bandwidth, + ) + topo_links.append(topo_link) + + except Exception as e: + self.logger.debug( + f"Error collecting topology link from GPU {idx} to {other_idx}: {e}" + ) + continue + + if topo_links: + topo = Topo( + gpu=idx, + bdf=bdf, + links=topo_links, + ) + topology_list.append(topo) + + 
except Exception as e: + self.logger.debug(f"Error collecting topology for GPU {idx}: {e}") + continue + + return topology_list + + def _process_topo_link(self, amdsmi: Any, src_idx: int, link_data: dict) -> Optional[TopoLink]: + """Process topology link data from amdsmi_get_gpu_topology. + + Args: + amdsmi (Any): AMD SMI module + src_idx (int): Source GPU index + link_data (dict): Link data from topology API + + Returns: + Optional[TopoLink]: Processed topology link or None + """ + try: + dst_idx = link_data.get("gpu", link_data.get("index", 0)) + dst_bdf = link_data.get("bdf", f"unknown_{dst_idx}") + weight = link_data.get("weight", 0) + num_hops = link_data.get("hops", link_data.get("num_hops", 0)) + + # Parse link type + link_type_str = link_data.get("link_type", link_data.get("type", "")).upper() + if "XGMI" in link_type_str: + link_type = LinkTypes.XGMI + elif "PCIE" in link_type_str or "PCI" in link_type_str: + link_type = LinkTypes.PCIE + elif src_idx == dst_idx or "SELF" in link_type_str: + link_type = LinkTypes.SELF + else: + link_type = LinkTypes.PCIE + + # Parse link status + link_status_str = link_data.get("status", link_data.get("link_status", "")).upper() + if "ENABLED" in link_status_str or "UP" in link_status_str: + link_status = AccessTable.ENABLED + else: + link_status = AccessTable.DISABLED + + # Parse bandwidth + bandwidth = "0-0" + bw_data = link_data.get("bandwidth") + if isinstance(bw_data, dict): + min_bw = bw_data.get("min", 0) + max_bw = bw_data.get("max", 0) + bandwidth = f"{min_bw}-{max_bw}" + elif isinstance(bw_data, str): + bandwidth = bw_data + + # Parse optional fields + coherent = None + coherent_str = str(link_data.get("coherent", "")).upper() + if "C" in coherent_str and "NC" not in coherent_str: + coherent = CoherentTable.COHERANT + elif "NC" in coherent_str: + coherent = CoherentTable.NON_COHERANT + elif "SELF" in coherent_str: + coherent = CoherentTable.SELF + + atomics = None + atomics_str = str(link_data.get("atomics", "")) 
+ if "64,32" in atomics_str or "64, 32" in atomics_str: + atomics = AtomicsTable.TRUE + elif "32" in atomics_str: + atomics = AtomicsTable.THIRTY_TWO + elif "64" in atomics_str: + atomics = AtomicsTable.SIXTY_FOUR + elif "SELF" in atomics_str.upper(): + atomics = AtomicsTable.SELF + + dma = None + dma_val = link_data.get("dma") + if dma_val is True or str(dma_val).upper() in ("TRUE", "T"): + dma = DmaTable.TRUE + elif str(dma_val).upper() == "SELF": + dma = DmaTable.SELF + + bi_dir = None + bi_dir_val = link_data.get("bi_directional", link_data.get("bidirectional")) + if bi_dir_val is True or str(bi_dir_val).upper() in ("TRUE", "T"): + bi_dir = BiDirectionalTable.TRUE + elif str(bi_dir_val).upper() == "SELF": + bi_dir = BiDirectionalTable.SELF + + return TopoLink( + gpu=dst_idx if isinstance(dst_idx, int) else 0, + bdf=dst_bdf if isinstance(dst_bdf, str) else "", + weight=weight if isinstance(weight, int) else 0, + link_status=link_status, + link_type=link_type, + num_hops=num_hops if isinstance(num_hops, int) else 0, + bandwidth=bandwidth, + coherent=coherent, + atomics=atomics, + dma=dma, + bi_dir=bi_dir, + ) + except Exception as e: + self.logger.debug(f"Error processing topology link data: {e}") + return None + def _flatten_2d(self, v: object) -> list[object]: """Flatten a 2D list into a 1D list, or normalize scalars/None to lists. 
diff --git a/nodescraper/plugins/inband/amdsmi/amdsmidata.py b/nodescraper/plugins/inband/amdsmi/amdsmidata.py index edab6044..ef17cfe5 100644 --- a/nodescraper/plugins/inband/amdsmi/amdsmidata.py +++ b/nodescraper/plugins/inband/amdsmi/amdsmidata.py @@ -1,10 +1,12 @@ import re +from enum import Enum from typing import Any, List, Mapping, Optional, Union from pydantic import ( BaseModel, ConfigDict, Field, + computed_field, field_validator, model_validator, ) @@ -751,6 +753,133 @@ def validate_energy(cls, value: Optional[Any]) -> Optional[MetricEnergy]: return value +### LINK DATA ### + + +class LinkStatusTable(Enum): + UP = "U" + DOWN = "D" + DISABLED = "X" + + +class BiDirectionalTable(Enum): + SELF = "SELF" + TRUE = "T" + + +class DmaTable(Enum): + SELF = "SELF" + TRUE = "T" + + +class AtomicsTable(Enum): + SELF = "SELF" + TRUE = "64,32" + THIRTY_TWO = "32" + SIXTY_FOUR = "64" + + +class LinkTypes(Enum): + XGMI = "XGMI" + PCIE = "PCIE" + SELF = "SELF" + + +class AccessTable(Enum): + ENABLED = "ENABLED" + DISABLED = "DISABLED" + + +# XGMI +class XgmiLink(BaseModel): + gpu: int + bdf: str + read: Optional[ValueUnit] + write: Optional[ValueUnit] + na_validator = field_validator("read", "write", mode="before")(na_to_none) + + +class XgmiLinkMetrics(BaseModel): + bit_rate: Optional[ValueUnit] + max_bandwidth: Optional[ValueUnit] + link_type: str + links: List[XgmiLink] + na_validator = field_validator("max_bandwidth", "bit_rate", mode="before")(na_to_none) + + +class XgmiMetrics(BaseModel): + gpu: int + bdf: str + link_metrics: XgmiLinkMetrics + + +class XgmiLinks(BaseModel): + gpu: int + bdf: str + link_status: list[LinkStatusTable] + + +class CoherentTable(Enum): + COHERANT = "C" + NON_COHERANT = "NC" + SELF = "SELF" + + +# TOPO + + +class TopoLink(BaseModel): + gpu: int + bdf: str + weight: int + link_status: AccessTable + link_type: LinkTypes + num_hops: int + bandwidth: str + # The below fields are sometimes missing, so we use Optional + coherent: 
Optional[CoherentTable] = None + atomics: Optional[AtomicsTable] = None + dma: Optional[DmaTable] = None + bi_dir: Optional[BiDirectionalTable] = None + + @computed_field + def bandwidth_from(self) -> Optional[int]: + """Get the bandwidth from the link.""" + bw_split = self.bandwidth.split("-") + if len(bw_split) == 2: + return int(bw_split[0]) + else: + # If the bandwidth is not in the expected format, return None + return None + + @computed_field + def bandwidth_to(self) -> Optional[int]: + """Get the bandwidth to the link.""" + bw_split = self.bandwidth.split("-") + if len(bw_split) == 2: + return int(bw_split[1]) + else: + # If the bandwidth is not in the expected format, return None + return None + + +class Topo(BaseModel): + gpu: int + bdf: str + links: List[TopoLink] + + +class AmdSmiTstData(BaseModel): + "Summary of amdsmitst results, with list and count of passing/skipped/failed tests" + + passed_tests: list[str] = Field(default_factory=list) + skipped_tests: list[str] = Field(default_factory=list) + failed_tests: list[str] = Field(default_factory=list) + passed_test_count: int = 0 + skipped_test_count: int = 0 + failed_test_count: int = 0 + + class AmdSmiDataModel(DataModel): """Data model for amd-smi data. 
@@ -771,10 +900,14 @@ class AmdSmiDataModel(DataModel): gpu_list: Optional[list[AmdSmiListItem]] = Field(default_factory=list) partition: Optional[Partition] = None process: Optional[list[Processes]] = Field(default_factory=list) + topology: Optional[list[Topo]] = Field(default_factory=list) firmware: Optional[list[Fw]] = Field(default_factory=list) bad_pages: Optional[list[BadPages]] = Field(default_factory=list) static: Optional[list[AmdSmiStatic]] = Field(default_factory=list) metric: Optional[list[AmdSmiMetric]] = Field(default_factory=list) + xgmi_metric: Optional[list[XgmiMetrics]] = Field(default_factory=list) + xgmi_link: Optional[list[XgmiLinks]] = Field(default_factory=list) + amdsmitst_data: AmdSmiTstData = Field(default_factory=AmdSmiTstData) def get_list(self, gpu: int) -> Optional[AmdSmiListItem]: """Get the gpu list item for the given gpu id.""" From 3aa4dbec3a8345bcb5a45b4fc798756cbc7b6ece Mon Sep 17 00:00:00 2001 From: Alexandra Bara Date: Thu, 13 Nov 2025 12:22:11 -0600 Subject: [PATCH 2/8] removed deprecated topo_link call --- .../plugins/inband/amdsmi/amdsmi_collector.py | 120 +----------------- 1 file changed, 2 insertions(+), 118 deletions(-) diff --git a/nodescraper/plugins/inband/amdsmi/amdsmi_collector.py b/nodescraper/plugins/inband/amdsmi/amdsmi_collector.py index 0f2d7a79..a8f9b59f 100644 --- a/nodescraper/plugins/inband/amdsmi/amdsmi_collector.py +++ b/nodescraper/plugins/inband/amdsmi/amdsmi_collector.py @@ -38,11 +38,7 @@ AmdSmiMetric, AmdSmiStatic, AmdSmiVersion, - AtomicsTable, BadPages, - BiDirectionalTable, - CoherentTable, - DmaTable, Fw, FwListItem, LinkStatusTable, @@ -2033,7 +2029,6 @@ def get_topology(self) -> list[Topo]: if not handles: return [] - topo_fn = getattr(amdsmi, "amdsmi_get_gpu_topology", None) topo_get_link_type_fn = getattr(amdsmi, "amdsmi_topo_get_link_type", None) topo_get_link_weight_fn = getattr(amdsmi, "amdsmi_topo_get_link_weight", None) get_minmax_bw_fn = getattr(amdsmi, 
"amdsmi_get_minmax_bandwidth_between_processors", None) @@ -2047,22 +2042,8 @@ def get_topology(self) -> list[Topo]: topo_links = [] - # If amdsmi_get_gpu_topology is available, use it - if callable(topo_fn): - topo_data = self._smi_try(topo_fn, h, default=None) - if isinstance(topo_data, dict): - # Process topology data if it contains link information - links_data = topo_data.get("links", []) - if isinstance(links_data, list): - for link_data in links_data: - if not isinstance(link_data, dict): - continue - topo_link = self._process_topo_link(amdsmi, idx, link_data) - if topo_link: - topo_links.append(topo_link) - - # Fallback: collect topology using individual API calls - if not topo_links and callable(topo_get_link_type_fn): + # Collect topology using individual API calls + if callable(topo_get_link_type_fn): for other_idx, other_h in enumerate(handles): try: other_bdf = self._smi_try( @@ -2154,103 +2135,6 @@ def get_topology(self) -> list[Topo]: return topology_list - def _process_topo_link(self, amdsmi: Any, src_idx: int, link_data: dict) -> Optional[TopoLink]: - """Process topology link data from amdsmi_get_gpu_topology. 
- - Args: - amdsmi (Any): AMD SMI module - src_idx (int): Source GPU index - link_data (dict): Link data from topology API - - Returns: - Optional[TopoLink]: Processed topology link or None - """ - try: - dst_idx = link_data.get("gpu", link_data.get("index", 0)) - dst_bdf = link_data.get("bdf", f"unknown_{dst_idx}") - weight = link_data.get("weight", 0) - num_hops = link_data.get("hops", link_data.get("num_hops", 0)) - - # Parse link type - link_type_str = link_data.get("link_type", link_data.get("type", "")).upper() - if "XGMI" in link_type_str: - link_type = LinkTypes.XGMI - elif "PCIE" in link_type_str or "PCI" in link_type_str: - link_type = LinkTypes.PCIE - elif src_idx == dst_idx or "SELF" in link_type_str: - link_type = LinkTypes.SELF - else: - link_type = LinkTypes.PCIE - - # Parse link status - link_status_str = link_data.get("status", link_data.get("link_status", "")).upper() - if "ENABLED" in link_status_str or "UP" in link_status_str: - link_status = AccessTable.ENABLED - else: - link_status = AccessTable.DISABLED - - # Parse bandwidth - bandwidth = "0-0" - bw_data = link_data.get("bandwidth") - if isinstance(bw_data, dict): - min_bw = bw_data.get("min", 0) - max_bw = bw_data.get("max", 0) - bandwidth = f"{min_bw}-{max_bw}" - elif isinstance(bw_data, str): - bandwidth = bw_data - - # Parse optional fields - coherent = None - coherent_str = str(link_data.get("coherent", "")).upper() - if "C" in coherent_str and "NC" not in coherent_str: - coherent = CoherentTable.COHERANT - elif "NC" in coherent_str: - coherent = CoherentTable.NON_COHERANT - elif "SELF" in coherent_str: - coherent = CoherentTable.SELF - - atomics = None - atomics_str = str(link_data.get("atomics", "")) - if "64,32" in atomics_str or "64, 32" in atomics_str: - atomics = AtomicsTable.TRUE - elif "32" in atomics_str: - atomics = AtomicsTable.THIRTY_TWO - elif "64" in atomics_str: - atomics = AtomicsTable.SIXTY_FOUR - elif "SELF" in atomics_str.upper(): - atomics = AtomicsTable.SELF - - dma 
= None - dma_val = link_data.get("dma") - if dma_val is True or str(dma_val).upper() in ("TRUE", "T"): - dma = DmaTable.TRUE - elif str(dma_val).upper() == "SELF": - dma = DmaTable.SELF - - bi_dir = None - bi_dir_val = link_data.get("bi_directional", link_data.get("bidirectional")) - if bi_dir_val is True or str(bi_dir_val).upper() in ("TRUE", "T"): - bi_dir = BiDirectionalTable.TRUE - elif str(bi_dir_val).upper() == "SELF": - bi_dir = BiDirectionalTable.SELF - - return TopoLink( - gpu=dst_idx if isinstance(dst_idx, int) else 0, - bdf=dst_bdf if isinstance(dst_bdf, str) else "", - weight=weight if isinstance(weight, int) else 0, - link_status=link_status, - link_type=link_type, - num_hops=num_hops if isinstance(num_hops, int) else 0, - bandwidth=bandwidth, - coherent=coherent, - atomics=atomics, - dma=dma, - bi_dir=bi_dir, - ) - except Exception as e: - self.logger.debug(f"Error processing topology link data: {e}") - return None - def _flatten_2d(self, v: object) -> list[object]: """Flatten a 2D list into a 1D list, or normalize scalars/None to lists. 
From 547a99c99a4a9fec370bafa1ac972ac2a9b73623 Mon Sep 17 00:00:00 2001 From: Alexandra Bara Date: Tue, 25 Nov 2025 10:39:28 -0600 Subject: [PATCH 3/8] added xgmi calls and expected_link speed --- .../plugins/inband/amdsmi/amdsmi_analyzer.py | 101 ++++++++++ .../plugins/inband/amdsmi/analyzer_args.py | 1 + test/unit/plugin/test_amdsmi_analyzer.py | 183 ++++++++++++++++++ 3 files changed, 285 insertions(+) diff --git a/nodescraper/plugins/inband/amdsmi/amdsmi_analyzer.py b/nodescraper/plugins/inband/amdsmi/amdsmi_analyzer.py index 9e8d7c49..e72c478c 100644 --- a/nodescraper/plugins/inband/amdsmi/amdsmi_analyzer.py +++ b/nodescraper/plugins/inband/amdsmi/amdsmi_analyzer.py @@ -35,10 +35,12 @@ AmdSmiDataModel, AmdSmiMetric, AmdSmiStatic, + AmdSmiTstData, EccData, Fw, Partition, Processes, + XgmiMetrics, ) from .analyzer_args import AmdSmiAnalyzerArgs from .cper import CperAnalysisTaskMixin @@ -637,6 +639,97 @@ def check_expected_memory_partition_mode( }, ) + def check_expected_xgmi_link_speed( + self, + xgmi_metric: Optional[List[XgmiMetrics]], + expected_xgmi_speed: Optional[List[float]] = None, + ): + """Check the XGMI link speed for all GPUs + + Args: + xgmi_metric (Optional[List[XgmiMetrics]]): XGMI metrics data + expected_xgmi_speed (Optional[List[float]]): List of expected XGMI speeds (GT/s) + """ + if xgmi_metric is None or len(xgmi_metric) == 0: + self._log_event( + category=EventCategory.IO, + description="XGMI link speed data is not available and cannot be checked", + priority=EventPriority.WARNING, + data={"xgmi_metric": xgmi_metric}, + ) + return + + if expected_xgmi_speed is None or len(expected_xgmi_speed) == 0: + self._log_event( + category=EventCategory.IO, + description="Expected XGMI speed not configured, skipping XGMI link speed check", + priority=EventPriority.WARNING, + ) + return + + for xgmi_data in xgmi_metric: + link_metric = xgmi_data.link_metrics + try: + if link_metric.bit_rate is None or link_metric.bit_rate.value is None: +
self._log_event( + category=EventCategory.IO, + description="XGMI link speed is not available", + priority=EventPriority.ERROR, + data={ + "gpu": xgmi_data.gpu, + "xgmi_bit_rate": ( + link_metric.bit_rate.unit if link_metric.bit_rate else "N/A" + ), + }, + ) + continue + + xgmi_float = float(link_metric.bit_rate.value) + except ValueError: + self._log_event( + category=EventCategory.IO, + description="XGMI link speed is not a valid number", + priority=EventPriority.ERROR, + data={ + "gpu": xgmi_data.gpu, + "xgmi_bit_rate": ( + link_metric.bit_rate.value if link_metric.bit_rate else "N/A" + ), + }, + ) + continue + + if xgmi_float not in expected_xgmi_speed: + self._log_event( + category=EventCategory.IO, + description="XGMI link speed is not as expected", + priority=EventPriority.ERROR, + data={ + "gpu": xgmi_data.gpu, + "xgmi_bit_rate": xgmi_float, + "expected_xgmi_speed": expected_xgmi_speed, + }, + console_log=True, + ) + + def check_amdsmitst(self, amdsmitst_data: AmdSmiTstData): + """Check AMD SMI test results + + Args: + amdsmitst_data (AmdSmiTstData): AMD SMI test data + """ + if amdsmitst_data.failed_test_count > 0: + self._log_event( + category=EventCategory.APPLICATION, + description=f"{amdsmitst_data.failed_test_count} failed tests running amdsmitst", + priority=EventPriority.ERROR, + data={ + "failed_test_count": amdsmitst_data.failed_test_count, + "failed_tests": amdsmitst_data.failed_tests, + }, + console_log=True, + ) + def analyze_data( self, data: AmdSmiDataModel, args: Optional[AmdSmiAnalyzerArgs] = None ) -> TaskResult: @@ -717,4 +810,12 @@ def analyze_data( analysis_range_end=args.analysis_range_end, ) + if data.xgmi_metric and len(data.xgmi_metric) > 0: + self.check_expected_xgmi_link_speed( + data.xgmi_metric, expected_xgmi_speed=args.expected_xgmi_speed + ) + + if data.amdsmitst_data and data.amdsmitst_data.failed_test_count > 0: + self.check_amdsmitst(data.amdsmitst_data) + return self.result diff --git 
a/nodescraper/plugins/inband/amdsmi/analyzer_args.py b/nodescraper/plugins/inband/amdsmi/analyzer_args.py index 03504fae..333f37ae 100644 --- a/nodescraper/plugins/inband/amdsmi/analyzer_args.py +++ b/nodescraper/plugins/inband/amdsmi/analyzer_args.py @@ -45,5 +45,6 @@ class AmdSmiAnalyzerArgs(AnalyzerArgs): devid_ep: Optional[str] = None devid_ep_vf: Optional[str] = None sku_name: Optional[str] = None + expected_xgmi_speed: Optional[list[float]] = None analysis_range_start: Optional[datetime] = None analysis_range_end: Optional[datetime] = None diff --git a/test/unit/plugin/test_amdsmi_analyzer.py b/test/unit/plugin/test_amdsmi_analyzer.py index 849b0b01..af7ab0f9 100644 --- a/test/unit/plugin/test_amdsmi_analyzer.py +++ b/test/unit/plugin/test_amdsmi_analyzer.py @@ -31,6 +31,7 @@ from nodescraper.plugins.inband.amdsmi.amdsmidata import ( AmdSmiDataModel, AmdSmiStatic, + AmdSmiTstData, AmdSmiVersion, EccState, Fw, @@ -52,6 +53,8 @@ StaticRas, StaticVram, ValueUnit, + XgmiLinkMetrics, + XgmiMetrics, ) from nodescraper.plugins.inband.amdsmi.analyzer_args import AmdSmiAnalyzerArgs @@ -550,6 +553,167 @@ def test_check_expected_memory_partition_mode_mismatch(mock_analyzer): assert len(analyzer.result.events) >= 0 +def test_check_expected_xgmi_link_speed_success(mock_analyzer): + """Test check_expected_xgmi_link_speed passes when XGMI speed matches.""" + analyzer = mock_analyzer + + xgmi_data = [ + XgmiMetrics( + gpu=0, + bdf="0000:01:00.0", + link_metrics=XgmiLinkMetrics( + bit_rate=ValueUnit(value=32.0, unit="GT/s"), + max_bandwidth=None, + link_type="XGMI", + links=[], + ), + ), + XgmiMetrics( + gpu=1, + bdf="0000:02:00.0", + link_metrics=XgmiLinkMetrics( + bit_rate=ValueUnit(value=32.0, unit="GT/s"), + max_bandwidth=None, + link_type="XGMI", + links=[], + ), + ), + ] + + analyzer.check_expected_xgmi_link_speed(xgmi_data, expected_xgmi_speed=[32.0]) + + assert len(analyzer.result.events) == 0 + + +def test_check_expected_xgmi_link_speed_mismatch(mock_analyzer): + 
"""Test check_expected_xgmi_link_speed logs error when speed doesn't match.""" + analyzer = mock_analyzer + + xgmi_data = [ + XgmiMetrics( + gpu=0, + bdf="0000:01:00.0", + link_metrics=XgmiLinkMetrics( + bit_rate=ValueUnit(value=25.0, unit="GT/s"), + max_bandwidth=None, + link_type="XGMI", + links=[], + ), + ), + ] + + analyzer.check_expected_xgmi_link_speed(xgmi_data, expected_xgmi_speed=[32.0]) + + assert len(analyzer.result.events) == 1 + assert analyzer.result.events[0].category == "IO" + assert analyzer.result.events[0].priority == EventPriority.ERROR + assert "XGMI link speed is not as expected" in analyzer.result.events[0].description + + +def test_check_expected_xgmi_link_speed_multiple_valid_speeds(mock_analyzer): + """Test check_expected_xgmi_link_speed with multiple valid speeds.""" + analyzer = mock_analyzer + + xgmi_data = [ + XgmiMetrics( + gpu=0, + bdf="0000:01:00.0", + link_metrics=XgmiLinkMetrics( + bit_rate=ValueUnit(value=36.0, unit="GT/s"), + max_bandwidth=None, + link_type="XGMI", + links=[], + ), + ), + XgmiMetrics( + gpu=1, + bdf="0000:02:00.0", + link_metrics=XgmiLinkMetrics( + bit_rate=ValueUnit(value=38.0, unit="GT/s"), + max_bandwidth=None, + link_type="XGMI", + links=[], + ), + ), + ] + + analyzer.check_expected_xgmi_link_speed(xgmi_data, expected_xgmi_speed=[36.0, 38.0]) + + assert len(analyzer.result.events) == 0 + + +def test_check_expected_xgmi_link_speed_no_data(mock_analyzer): + """Test check_expected_xgmi_link_speed handles missing XGMI data.""" + analyzer = mock_analyzer + + analyzer.check_expected_xgmi_link_speed(None, expected_xgmi_speed=[32.0]) + + assert len(analyzer.result.events) == 1 + assert analyzer.result.events[0].priority == EventPriority.WARNING + assert "XGMI link speed data is not available" in analyzer.result.events[0].description + + +def test_check_expected_xgmi_link_speed_missing_bit_rate(mock_analyzer): + """Test check_expected_xgmi_link_speed handles missing bit rate value.""" + analyzer = mock_analyzer + + 
xgmi_data = [ + XgmiMetrics( + gpu=0, + bdf="0000:01:00.0", + link_metrics=XgmiLinkMetrics( + bit_rate=None, + max_bandwidth=None, + link_type="XGMI", + links=[], + ), + ), + ] + + analyzer.check_expected_xgmi_link_speed(xgmi_data, expected_xgmi_speed=[32.0]) + + assert len(analyzer.result.events) == 1 + assert analyzer.result.events[0].priority == EventPriority.ERROR + assert "XGMI link speed is not available" in analyzer.result.events[0].description + + +def test_check_amdsmitst_success(mock_analyzer): + """Test check_amdsmitst passes when no tests failed.""" + analyzer = mock_analyzer + + tst_data = AmdSmiTstData( + passed_tests=["test1", "test2", "test3"], + skipped_tests=[], + failed_tests=[], + failed_test_count=0, + ) + + analyzer.check_amdsmitst(tst_data) + + assert len(analyzer.result.events) == 0 + + +def test_check_amdsmitst_failures(mock_analyzer): + """Test check_amdsmitst logs error when tests failed.""" + analyzer = mock_analyzer + + tst_data = AmdSmiTstData( + passed_tests=["test1", "test2"], + skipped_tests=["test3"], + failed_tests=["test4", "test5"], + failed_test_count=2, + ) + + analyzer.check_amdsmitst(tst_data) + + assert len(analyzer.result.events) == 1 + assert analyzer.result.events[0].category == "APPLICATION" + assert analyzer.result.events[0].priority == EventPriority.ERROR + assert "2 failed tests running amdsmitst" in analyzer.result.events[0].description + assert analyzer.result.events[0].data["failed_test_count"] == 2 + assert analyzer.result.events[0].data["failed_tests"] == ["test4", "test5"] + + def test_analyze_data_full_workflow(mock_analyzer): """Test full analyze_data workflow with various checks.""" analyzer = mock_analyzer @@ -588,12 +752,31 @@ def test_analyze_data_full_workflow(mock_analyzer): ], partition=None, gpu_list=None, + xgmi_metric=[ + XgmiMetrics( + gpu=0, + bdf="0000:01:00.0", + link_metrics=XgmiLinkMetrics( + bit_rate=ValueUnit(value=32.0, unit="GT/s"), + max_bandwidth=None, + link_type="XGMI", + links=[], + 
), + ), + ], + amdsmitst_data=AmdSmiTstData( + passed_tests=["test1", "test2"], + skipped_tests=[], + failed_tests=[], + failed_test_count=0, + ), ) args = AmdSmiAnalyzerArgs( expected_max_power=550, expected_driver_version="1.2.3", expected_gpu_processes=10, + expected_xgmi_speed=[32.0], ) result = analyzer.analyze_data(data, args) From 161c3f93c6d7bacfdc2a222e90438228a3ea2257 Mon Sep 17 00:00:00 2001 From: Alexandra Bara Date: Tue, 2 Dec 2025 11:14:21 -0600 Subject: [PATCH 4/8] fix for global_args overwriting plugin_args completely. Will now only overwrite args that are provided in both global & plugin --- nodescraper/pluginexecutor.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/nodescraper/pluginexecutor.py b/nodescraper/pluginexecutor.py index e47a6cc8..2ee29767 100644 --- a/nodescraper/pluginexecutor.py +++ b/nodescraper/pluginexecutor.py @@ -173,6 +173,12 @@ def run_queue(self) -> list[PluginResult]: global_run_args = self.apply_global_args_to_plugin( plugin_inst, plugin_class, self.plugin_config.global_args ) + # Merge analysis_args and collection_args instead of replacing + for args_key in ["analysis_args", "collection_args"]: + if args_key in global_run_args and args_key in run_payload: + # Merge: global args override plugin-specific args + run_payload[args_key].update(global_run_args[args_key]) + del global_run_args[args_key] run_payload.update(global_run_args) except ValueError as ve: self.logger.error( From 087e108199a3b086315b93c0d9e1518a8adf80ac Mon Sep 17 00:00:00 2001 From: Alexandra Bara Date: Tue, 2 Dec 2025 11:18:44 -0600 Subject: [PATCH 5/8] fix for global_args overwriting plugin_args completely.
Will now only overwrite args that are provided in both global & plugin --- nodescraper/pluginexecutor.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/nodescraper/pluginexecutor.py b/nodescraper/pluginexecutor.py index 2ee29767..e47a6cc8 100644 --- a/nodescraper/pluginexecutor.py +++ b/nodescraper/pluginexecutor.py @@ -173,12 +173,6 @@ def run_queue(self) -> list[PluginResult]: global_run_args = self.apply_global_args_to_plugin( plugin_inst, plugin_class, self.plugin_config.global_args ) - # Merge analysis_args and collection_args instead of replacing - for args_key in ["analysis_args", "collection_args"]: - if args_key in global_run_args and args_key in run_payload: - # Merge: global args override plugin-specific args - run_payload[args_key].update(global_run_args[args_key]) - del global_run_args[args_key] run_payload.update(global_run_args) except ValueError as ve: self.logger.error( From 7c24ad0531b27233615e3a7edd36e7fd47e314ea Mon Sep 17 00:00:00 2001 From: Alexandra Bara Date: Tue, 2 Dec 2025 13:47:03 -0600 Subject: [PATCH 6/8] fix for global args overwriting all of plugin args + fix for invalid plugin --- nodescraper/cli/cli.py | 17 +++++++++++++++-- nodescraper/pluginexecutor.py | 28 ++++++++++++++++++++++++++++ test/functional/test_run_plugins.py | 14 ++++++++++---- test/unit/framework/test_cli.py | 5 +++-- 4 files changed, 56 insertions(+), 8 deletions(-) diff --git a/nodescraper/cli/cli.py b/nodescraper/cli/cli.py index c5e7abeb..3c0407bf 100644 --- a/nodescraper/cli/cli.py +++ b/nodescraper/cli/cli.py @@ -327,6 +327,7 @@ def process_args( plugin_arg_index = -1 plugin_arg_map = {} + invalid_plugins = [] if plugin_arg_index != -1 and plugin_arg_index != len(raw_arg_input) - 1: top_level_args = raw_arg_input[: plugin_arg_index + 1] plugin_args = raw_arg_input[plugin_arg_index + 1 :] @@ -342,7 +343,10 @@ def process_args( cur_plugin = arg elif cur_plugin: plugin_arg_map[cur_plugin].append(arg) - return (top_level_args, plugin_arg_map) + elif not
arg.startswith("-"): + # Track invalid plugin names to log event later + invalid_plugins.append(arg) + return (top_level_args, plugin_arg_map, invalid_plugins) def main(arg_input: Optional[list[str]] = None): @@ -360,7 +364,9 @@ def main(arg_input: Optional[list[str]] = None): parser, plugin_subparser_map = build_parser(plugin_reg, config_reg) try: - top_level_args, plugin_arg_map = process_args(arg_input, list(plugin_subparser_map.keys())) + top_level_args, plugin_arg_map, invalid_plugins = process_args( + arg_input, list(plugin_subparser_map.keys()) + ) parsed_args = parser.parse_args(top_level_args) system_info = get_system_info(parsed_args) @@ -380,6 +386,13 @@ def main(arg_input: Optional[list[str]] = None): if log_path: logger.info("Log path: %s", log_path) + # Log warning if invalid plugin names were provided + if invalid_plugins: + logger.warning( + "Invalid plugin name(s) ignored: %s. Use 'describe plugin' to list available plugins.", + ", ".join(invalid_plugins), + ) + if parsed_args.subcmd == "summary": generate_summary(parsed_args.search_path, parsed_args.output_path, logger) sys.exit(0) diff --git a/nodescraper/pluginexecutor.py b/nodescraper/pluginexecutor.py index e47a6cc8..6e2696ac 100644 --- a/nodescraper/pluginexecutor.py +++ b/nodescraper/pluginexecutor.py @@ -33,6 +33,7 @@ from pydantic import BaseModel from nodescraper.constants import DEFAULT_LOGGER +from nodescraper.enums import ExecutionStatus from nodescraper.interfaces import ConnectionManager, DataPlugin, PluginInterface from nodescraper.models import PluginConfig, SystemInfo from nodescraper.models.pluginresult import PluginResult @@ -119,6 +120,13 @@ def run_queue(self) -> list[PluginResult]: plugin_name, plugin_args = plugin_queue.popleft() if plugin_name not in self.plugin_registry.plugins: self.logger.error("Unable to find registered plugin for name %s", plugin_name) + plugin_results.append( + PluginResult( + status=ExecutionStatus.ERROR, + source=plugin_name, + message=f"Plugin 
'{plugin_name}' not found in registry", + ) + ) continue plugin_class = self.plugin_registry.plugins[plugin_name] @@ -140,6 +148,13 @@ def run_queue(self) -> list[PluginResult]: "Unable to find registered connection manager class for %s that is required by", connection_manager_class.__name__, ) + plugin_results.append( + PluginResult( + status=ExecutionStatus.ERROR, + source=plugin_name, + message=f"Connection manager '{connection_manager_class.__name__}' not found in registry", + ) + ) continue if connection_manager_class not in self.connection_library: @@ -173,6 +188,12 @@ def run_queue(self) -> list[PluginResult]: global_run_args = self.apply_global_args_to_plugin( plugin_inst, plugin_class, self.plugin_config.global_args ) + # Merge analysis_args and collection_args + for args_key in ["analysis_args", "collection_args"]: + if args_key in global_run_args and args_key in run_payload: + # Merge: global args override plugin-specific args keys specified in both global and plugin-specific args + run_payload[args_key].update(global_run_args[args_key]) + del global_run_args[args_key] run_payload.update(global_run_args) except ValueError as ve: self.logger.error( @@ -180,6 +201,13 @@ def run_queue(self) -> list[PluginResult]: plugin_name, str(ve), ) + plugin_results.append( + PluginResult( + status=ExecutionStatus.ERROR, + source=plugin_name, + message=f"Invalid global_args for plugin: {str(ve)}", + ) + ) continue self.logger.info("-" * 50) diff --git a/test/functional/test_run_plugins.py b/test/functional/test_run_plugins.py index c8446f8b..a17f4321 100644 --- a/test/functional/test_run_plugins.py +++ b/test/functional/test_run_plugins.py @@ -88,9 +88,15 @@ def test_run_all_plugins_together(run_cli_command, all_plugins, tmp_path): def test_run_plugin_with_invalid_name(run_cli_command): - """Test that running a non-existent plugin fails gracefully.""" + """Test that running a non-existent plugin logs a warning and falls back to default config.""" result = 
run_cli_command(["run-plugins", "NonExistentPlugin"], check=False) - assert result.returncode != 0 - output = (result.stdout + result.stderr).lower() - assert "error" in output or "invalid" in output or "not found" in output + # Invalid plugin is ignored and default config runs instead + # Exit code depends on whether default config plugins succeed + output = result.stdout + result.stderr + # Check that warning was logged for invalid plugin + assert "Invalid plugin name(s) ignored: NonExistentPlugin" in output + # Check that default config was used + assert "running default config" in output.lower() or "NodeStatus" in output + # Verify it didn't crash + assert "Data written to csv file" in output diff --git a/test/unit/framework/test_cli.py b/test/unit/framework/test_cli.py index 79aca013..cd266ed9 100644 --- a/test/unit/framework/test_cli.py +++ b/test/unit/framework/test_cli.py @@ -115,12 +115,12 @@ def test_system_info_builder(): ( ["--sys-name", "test-sys", "--sys-sku", "test-sku"], ["TestPlugin1", "TestPlugin2"], - (["--sys-name", "test-sys", "--sys-sku", "test-sku"], {}), + (["--sys-name", "test-sys", "--sys-sku", "test-sku"], {}, []), ), ( ["--sys-name", "test-sys", "--sys-sku", "test-sku", "run-plugins", "-h"], ["TestPlugin1", "TestPlugin2"], - (["--sys-name", "test-sys", "--sys-sku", "test-sku", "run-plugins", "-h"], {}), + (["--sys-name", "test-sys", "--sys-sku", "test-sku", "run-plugins", "-h"], {}, []), ), ( [ @@ -143,6 +143,7 @@ def test_system_info_builder(): "TestPlugin1": ["--plugin1_arg", "test-val1"], "TestPlugin2": ["--plugin2_arg", "test-val2"], }, + [], ), ), ], From bc9e5f64a5b3d93cf9def32fd442d8c1f7a23664 Mon Sep 17 00:00:00 2001 From: Alexandra Bara Date: Tue, 2 Dec 2025 14:00:34 -0600 Subject: [PATCH 7/8] removed err throw for invalid plugin --- nodescraper/cli/cli.py | 13 ++++++++++++- nodescraper/pluginexecutor.py | 22 ---------------------- test/functional/test_run_plugins.py | 13 +++++++++++++ 3 files changed, 25 insertions(+), 23 
deletions(-) diff --git a/nodescraper/cli/cli.py b/nodescraper/cli/cli.py index 3c0407bf..379ceb84 100644 --- a/nodescraper/cli/cli.py +++ b/nodescraper/cli/cli.py @@ -338,7 +338,18 @@ def process_args( else: cur_plugin = None for arg in plugin_args: - if arg in plugin_names: + # Handle comma-separated plugin names (but not arguments) + if not arg.startswith("-") and "," in arg: + # Split comma-separated plugin names + for potential_plugin in arg.split(","): + potential_plugin = potential_plugin.strip() + if potential_plugin in plugin_names: + plugin_arg_map[potential_plugin] = [] + cur_plugin = potential_plugin + elif potential_plugin: + # Track invalid plugin names to log event later + invalid_plugins.append(potential_plugin) + elif arg in plugin_names: plugin_arg_map[arg] = [] cur_plugin = arg elif cur_plugin: diff --git a/nodescraper/pluginexecutor.py b/nodescraper/pluginexecutor.py index 6e2696ac..d03010c6 100644 --- a/nodescraper/pluginexecutor.py +++ b/nodescraper/pluginexecutor.py @@ -33,7 +33,6 @@ from pydantic import BaseModel from nodescraper.constants import DEFAULT_LOGGER -from nodescraper.enums import ExecutionStatus from nodescraper.interfaces import ConnectionManager, DataPlugin, PluginInterface from nodescraper.models import PluginConfig, SystemInfo from nodescraper.models.pluginresult import PluginResult @@ -120,13 +119,6 @@ def run_queue(self) -> list[PluginResult]: plugin_name, plugin_args = plugin_queue.popleft() if plugin_name not in self.plugin_registry.plugins: self.logger.error("Unable to find registered plugin for name %s", plugin_name) - plugin_results.append( - PluginResult( - status=ExecutionStatus.ERROR, - source=plugin_name, - message=f"Plugin '{plugin_name}' not found in registry", - ) - ) continue plugin_class = self.plugin_registry.plugins[plugin_name] @@ -148,13 +140,6 @@ def run_queue(self) -> list[PluginResult]: "Unable to find registered connection manager class for %s that is required by", connection_manager_class.__name__, ) 
- plugin_results.append( - PluginResult( - status=ExecutionStatus.ERROR, - source=plugin_name, - message=f"Connection manager '{connection_manager_class.__name__}' not found in registry", - ) - ) continue if connection_manager_class not in self.connection_library: @@ -201,13 +186,6 @@ def run_queue(self) -> list[PluginResult]: plugin_name, str(ve), ) - plugin_results.append( - PluginResult( - status=ExecutionStatus.ERROR, - source=plugin_name, - message=f"Invalid global_args for plugin: {str(ve)}", - ) - ) continue self.logger.info("-" * 50) diff --git a/test/functional/test_run_plugins.py b/test/functional/test_run_plugins.py index a17f4321..6cb34cb4 100644 --- a/test/functional/test_run_plugins.py +++ b/test/functional/test_run_plugins.py @@ -100,3 +100,16 @@ def test_run_plugin_with_invalid_name(run_cli_command): assert "running default config" in output.lower() or "NodeStatus" in output # Verify it didn't crash assert "Data written to csv file" in output + + +def test_run_comma_separated_plugins_with_invalid(run_cli_command): + """Test that comma-separated plugins run valid ones and ignore invalid ones.""" + result = run_cli_command(["run-plugins", "AmdSmiPlugin,SomePlugin"], check=False) + + output = result.stdout + result.stderr + # Check that warning was logged for invalid plugin + assert "Invalid plugin name(s) ignored: SomePlugin" in output + # Check that AmdSmiPlugin actually ran + assert "Running plugin AmdSmiPlugin" in output + # Verify it didn't crash + assert "Data written to csv file" in output From 3a703b325940e1f0097f8e5bcd5d9e48bdb9ead8 Mon Sep 17 00:00:00 2001 From: Alexandra Bara Date: Mon, 8 Dec 2025 09:16:13 -0600 Subject: [PATCH 8/8] undid typecheck changes --- .../plugins/inband/amdsmi/amdsmi_analyzer.py | 80 ++++++++--------- .../plugins/inband/amdsmi/amdsmidata.py | 88 +++++++++---------- 2 files changed, 84 insertions(+), 84 deletions(-) diff --git a/nodescraper/plugins/inband/amdsmi/amdsmi_analyzer.py 
b/nodescraper/plugins/inband/amdsmi/amdsmi_analyzer.py index e72c478c..085f022f 100644 --- a/nodescraper/plugins/inband/amdsmi/amdsmi_analyzer.py +++ b/nodescraper/plugins/inband/amdsmi/amdsmi_analyzer.py @@ -25,7 +25,7 @@ ############################################################################### import io from collections import defaultdict -from typing import Any, Dict, List, Optional, Union +from typing import Any, Optional, Union from nodescraper.enums import EventCategory, EventPriority from nodescraper.interfaces import DataAnalyzer @@ -53,16 +53,16 @@ class AmdSmiAnalyzer(CperAnalysisTaskMixin, DataAnalyzer[AmdSmiDataModel, None]) def check_expected_max_power( self, - amdsmi_static_data: List[AmdSmiStatic], + amdsmi_static_data: list[AmdSmiStatic], expected_max_power: int, ): """Check against expected max power Args: - amdsmi_static_data (List[AmdSmiStatic]): AmdSmiStatic data model + amdsmi_static_data (list[AmdSmiStatic]): AmdSmiStatic data model expected_max_power (int): expected max power """ - incorrect_max_power_gpus: Dict[int, Union[int, str, float]] = {} + incorrect_max_power_gpus: dict[int, Union[int, str, float]] = {} for gpu in amdsmi_static_data: if gpu.limit is None or gpu.limit.max_power is None: self._log_event( @@ -102,18 +102,18 @@ def check_expected_max_power( def check_expected_driver_version( self, - amdsmi_static_data: List[AmdSmiStatic], + amdsmi_static_data: list[AmdSmiStatic], expected_driver_version: str, ) -> None: """Check expectecd driver version Args: - amdsmi_static_data (List[AmdSmiStatic]): AmdSmiStatic data model + amdsmi_static_data (list[AmdSmiStatic]): AmdSmiStatic data model expected_driver_version (str): expected driver version """ - bad_driver_gpus: List[int] = [] + bad_driver_gpus: list[int] = [] - versions_by_gpu: Dict[int, Optional[str]] = {} + versions_by_gpu: dict[int, Optional[str]] = {} for gpu in amdsmi_static_data: ver: Optional[str] = None if gpu.driver is not None: @@ -136,7 +136,7 @@ def 
check_expected_driver_version( def check_amdsmi_metric_pcie( self, - amdsmi_metric_data: List[AmdSmiMetric], + amdsmi_metric_data: list[AmdSmiMetric], l0_to_recovery_count_error_threshold: int, l0_to_recovery_count_warning_threshold: int, ): @@ -146,7 +146,7 @@ def check_amdsmi_metric_pcie( Expected width/speeds should come from SKU info. Args: - amdsmi_metric_data (List[AmdSmiMetric]): AmdSmiMetric data model + amdsmi_metric_data (list[AmdSmiMetric]): AmdSmiMetric data model l0_to_recovery_count_error_threshold (int): Threshold for error events l0_to_recovery_count_warning_threshold (int): Threshold for warning events """ @@ -242,19 +242,19 @@ def check_amdsmi_metric_pcie( console_log=True, ) - def check_amdsmi_metric_ecc_totals(self, amdsmi_metric_data: List[AmdSmiMetric]): + def check_amdsmi_metric_ecc_totals(self, amdsmi_metric_data: list[AmdSmiMetric]): """Check ECC totals for all GPUs Raises errors for uncorrectable errors, warnings for correctable and deferred. Args: - amdsmi_metric_data (List[AmdSmiMetric]): AmdSmiMetric data model + amdsmi_metric_data (list[AmdSmiMetric]): AmdSmiMetric data model """ for metric in amdsmi_metric_data: ecc_totals = metric.ecc gpu = metric.gpu - ecc_checks: List[tuple[EventPriority, Optional[int], str]] = [ + ecc_checks: list[tuple[EventPriority, Optional[int], str]] = [ ( EventPriority.WARNING, ecc_totals.total_correctable_count, @@ -292,13 +292,13 @@ def check_amdsmi_metric_ecc_totals(self, amdsmi_metric_data: List[AmdSmiMetric]) console_log=True, ) - def check_amdsmi_metric_ecc(self, amdsmi_metric_data: List[AmdSmiMetric]): + def check_amdsmi_metric_ecc(self, amdsmi_metric_data: list[AmdSmiMetric]): """Check ECC counts in all blocks for all GPUs Raises errors for uncorrectable errors, warnings for correctable and deferred. 
Args: - amdsmi_metric_data (List[AmdSmiMetric]): AmdSmiMetric data model + amdsmi_metric_data (list[AmdSmiMetric]): AmdSmiMetric data model """ for metric in amdsmi_metric_data: gpu = metric.gpu @@ -352,15 +352,15 @@ def check_amdsmi_metric_ecc(self, amdsmi_metric_data: List[AmdSmiMetric]): ) def expected_gpu_processes( - self, processes_data: Optional[List[Processes]], max_num_processes: int + self, processes_data: Optional[list[Processes]], max_num_processes: int ): """Check the number of GPU processes running Args: - processes_data (Optional[List[Processes]]): list of processes per GPU + processes_data (Optional[list[Processes]]): list of processes per GPU max_num_processes (int): max number of expected processes """ - gpu_exceeds_num_processes: Dict[int, int] = {} + gpu_exceeds_num_processes: dict[int, int] = {} if processes_data is None or len(processes_data) == 0: self._log_event( category=EventCategory.PLATFORM, @@ -392,13 +392,13 @@ def expected_gpu_processes( console_log=True, ) - def static_consistancy_check(self, amdsmi_static_data: List[AmdSmiStatic]): + def static_consistancy_check(self, amdsmi_static_data: list[AmdSmiStatic]): """Check consistency of expected data Args: - amdsmi_static_data (List[AmdSmiStatic]): AmdSmiStatic data model + amdsmi_static_data (list[AmdSmiStatic]): AmdSmiStatic data model """ - consistancy_data: Dict[str, Union[set[str], set[int]]] = { + consistancy_data: dict[str, Union[set[str], set[int]]] = { "market_name": {gpu.asic.market_name for gpu in amdsmi_static_data}, "vendor_id": {gpu.asic.vendor_id for gpu in amdsmi_static_data}, "vendor_name": {gpu.asic.vendor_name for gpu in amdsmi_static_data}, @@ -425,7 +425,7 @@ def static_consistancy_check(self, amdsmi_static_data: List[AmdSmiStatic]): def check_static_data( self, - amdsmi_static_data: List[AmdSmiStatic], + amdsmi_static_data: list[AmdSmiStatic], vendor_id: Optional[str], subvendor_id: Optional[str], device_id: tuple[Optional[str], Optional[str]], @@ -435,7 +435,7 @@ 
def check_static_data( """Check expected static data Args: - amdsmi_static_data (List[AmdSmiStatic]): AmdSmiStatic data + amdsmi_static_data (list[AmdSmiStatic]): AmdSmiStatic data vendor_id (Optional[str]): expected vendor_id subvendor_id (Optional[str]): expected subvendor_id device_id (tuple[Optional[str], Optional[str]]): expected device_id @@ -443,9 +443,9 @@ def check_static_data( sku_name (Optional[str]): expected sku_name """ - mismatches: List[tuple[int, str, str, str]] = [] + mismatches: list[tuple[int, str, str, str]] = [] - expected_data: Dict[str, Optional[str]] = { + expected_data: dict[str, Optional[str]] = { "vendor_id": vendor_id, "subvendor_id": subvendor_id, "vendor_name": "Advanced Micro Devices Inc", @@ -453,7 +453,7 @@ def check_static_data( } for gpu_data in amdsmi_static_data: - collected_data: Dict[str, str] = { + collected_data: dict[str, str] = { "vendor_id": gpu_data.asic.vendor_id, "subvendor_id": gpu_data.asic.subvendor_id, "vendor_name": gpu_data.asic.vendor_name, @@ -504,24 +504,24 @@ def check_static_data( def _format_static_mismatch_payload( self, - mismatches: List[tuple[int, str, str, str]], - ) -> Dict[str, Any]: + mismatches: list[tuple[int, str, str, str]], + ) -> dict[str, Any]: """Helper function for pretty printing mismatch in expected data Args: - mismatches (List[tuple[int, str, str, str]]): mismatched data per GPU + mismatches (list[tuple[int, str, str, str]]): mismatched data per GPU Returns: - Dict[str, Any]: dict of mismatched data per GPU + dict[str, Any]: dict of mismatched data per GPU """ - per_gpu: Dict[int, List[Dict[str, str]]] = defaultdict(list) + per_gpu: dict[int, list[dict[str, str]]] = defaultdict(list) field_set: set[str] = set() for gpu, field, expected, actual in mismatches: field_set.add(field) per_gpu[gpu].append({"field": field, "expected": expected, "actual": actual}) - per_gpu_list: List[Dict[str, Any]] = [ + per_gpu_list: list[dict[str, Any]] = [ {"gpu": gpu, "mismatches": entries} for gpu, 
entries in sorted(per_gpu.items(), key=lambda kv: kv[0]) ] @@ -537,13 +537,13 @@ def _format_static_mismatch_payload( def check_pldm_version( self, - amdsmi_fw_data: Optional[List[Fw]], + amdsmi_fw_data: Optional[list[Fw]], expected_pldm_version: Optional[str], ): """Check expected pldm version Args: - amdsmi_fw_data (Optional[List[Fw]]): data model + amdsmi_fw_data (Optional[list[Fw]]): data model expected_pldm_version (Optional[str]): expected pldm version """ PLDM_STRING = "PLDM_BUNDLE" @@ -555,8 +555,8 @@ def check_pldm_version( data={"amdsmi_fw_data": amdsmi_fw_data}, ) return - mismatched_gpus: List[int] = [] - pldm_missing_gpus: List[int] = [] + mismatched_gpus: list[int] = [] + pldm_missing_gpus: list[int] = [] for fw_data in amdsmi_fw_data: gpu = fw_data.gpu if isinstance(fw_data.fw_list, str): @@ -641,14 +641,14 @@ def check_expected_memory_partition_mode( def check_expected_xgmi_link_speed( self, - xgmi_metric: Optional[List[XgmiMetrics]], - expected_xgmi_speed: Optional[List[float]] = None, + xgmi_metric: Optional[list[XgmiMetrics]], + expected_xgmi_speed: Optional[list[float]] = None, ): """Check the XGMI link speed for all GPUs Args: - xgmi_metric (Optional[List[XgmiMetrics]]): XGMI metrics data - expected_xgmi_speed (Optional[List[float]]): List of expected XGMI speeds (GT/s) + xgmi_metric (Optional[list[XgmiMetrics]]): XGMI metrics data + expected_xgmi_speed (Optional[list[float]]): List of expected XGMI speeds (GT/s) """ if xgmi_metric is None or len(xgmi_metric) == 0: self._log_event( diff --git a/nodescraper/plugins/inband/amdsmi/amdsmidata.py b/nodescraper/plugins/inband/amdsmi/amdsmidata.py index fe039c40..aacca2ac 100644 --- a/nodescraper/plugins/inband/amdsmi/amdsmidata.py +++ b/nodescraper/plugins/inband/amdsmi/amdsmidata.py @@ -25,7 +25,7 @@ ############################################################################### import re from enum import Enum -from typing import Any, Dict, List, Mapping, Optional, Union +from typing import Any, 
Mapping, Optional, Union from pydantic import ( AliasChoices, @@ -49,15 +49,15 @@ def na_to_none(values: Union[int, str]): return values -def na_to_none_list(values: List[Union[int, str, None]]) -> List[Union[int, str, None]]: - ret_list: List[Union[int, str, None]] = values.copy() +def na_to_none_list(values: list[Union[int, str, None]]) -> list[Union[int, str, None]]: + ret_list: list[Union[int, str, None]] = values.copy() for i in range(len(ret_list)): if ret_list[i] == "N/A": ret_list[i] = None return ret_list -def na_to_none_dict(values: object) -> Optional[Dict[str, Any]]: +def na_to_none_dict(values: object) -> Optional[dict[str, Any]]: """Normalize mapping-like fields where 'N/A' or empty should become None. Accepts None; returns None for 'N/A'/'NA'/'' or non-mapping inputs.""" if values is None: @@ -67,7 +67,7 @@ def na_to_none_dict(values: object) -> Optional[Dict[str, Any]]: if not isinstance(values, Mapping): return None - out: Dict[str, Any] = {} + out: dict[str, Any] = {} for k, v in values.items(): if isinstance(v, str) and v.strip().upper() in {"N/A", "NA", ""}: out[k] = None @@ -207,7 +207,7 @@ class ProcessListItem(BaseModel): class Processes(BaseModel): gpu: int - process_list: List[ProcessListItem] + process_list: list[ProcessListItem] # FW @@ -218,7 +218,7 @@ class FwListItem(BaseModel): class Fw(BaseModel): gpu: int - fw_list: Union[List[FwListItem], str] + fw_list: Union[list[FwListItem], str] class AmdSmiListItem(BaseModel): @@ -279,8 +279,8 @@ class PartitionCompute(BaseModel): class Partition(BaseModel): """Contains the partition info for amd-smi""" - memory_partition: List[PartitionMemory] = Field(default_factory=list) - compute_partition: List[PartitionCompute] = Field(default_factory=list) + memory_partition: list[PartitionMemory] = Field(default_factory=list) + compute_partition: list[PartitionCompute] = Field(default_factory=list) ### STATIC DATA ### @@ -362,7 +362,7 @@ class StaticRas(BaseModel): single_bit_schema: EccState 
double_bit_schema: EccState poison_schema: EccState - ecc_block_state: Union[Dict[str, EccState], str] + ecc_block_state: Union[dict[str, EccState], str] class StaticPartition(BaseModel): @@ -383,13 +383,13 @@ class StaticPolicy(BaseModel): class StaticSocPstate(BaseModel): num_supported: int current_id: int - policies: List[StaticPolicy] + policies: list[StaticPolicy] class StaticXgmiPlpd(BaseModel): num_supported: int current_id: int - plpds: List[StaticPolicy] + plpds: list[StaticPolicy] class StaticNuma(BaseModel): @@ -410,7 +410,7 @@ class StaticVram(AmdSmiBaseModel): class StaticCacheInfoItem(AmdSmiBaseModel): cache: ValueUnit - cache_properties: List[str] + cache_properties: list[str] cache_size: Optional[ValueUnit] cache_level: ValueUnit max_num_cu_shared: ValueUnit @@ -454,9 +454,9 @@ class AmdSmiStatic(BaseModel): process_isolation: str numa: StaticNuma vram: StaticVram - cache_info: List[StaticCacheInfoItem] + cache_info: list[StaticCacheInfoItem] partition: Optional[StaticPartition] = None # This has been removed in Amd-smi 26.0.0+d30a0afe+ - clock: Optional[Dict[str, Union[StaticClockData, None]]] = None + clock: Optional[dict[str, Union[StaticClockData, None]]] = None na_validator_dict = field_validator("clock", mode="before")(na_to_none_dict) na_validator = field_validator("soc_pstate", "xgmi_plpd", "vbios", "limit", mode="before")( na_to_none @@ -473,7 +473,7 @@ class PageData(BaseModel): class BadPages(BaseModel): gpu: int - retired: List[PageData] + retired: list[PageData] # Metric Data @@ -481,11 +481,11 @@ class MetricUsage(BaseModel): gfx_activity: Optional[ValueUnit] umc_activity: Optional[ValueUnit] mm_activity: Optional[ValueUnit] - vcn_activity: List[Optional[Union[ValueUnit, str]]] - jpeg_activity: List[Optional[Union[ValueUnit, str]]] - gfx_busy_inst: Optional[Dict[str, List[Optional[Union[ValueUnit, str]]]]] - jpeg_busy: Optional[Dict[str, List[Optional[Union[ValueUnit, str]]]]] - vcn_busy: Optional[Dict[str, 
List[Optional[Union[ValueUnit, str]]]]] + vcn_activity: list[Optional[Union[ValueUnit, str]]] + jpeg_activity: list[Optional[Union[ValueUnit, str]]] + gfx_busy_inst: Optional[dict[str, list[Optional[Union[ValueUnit, str]]]]] + jpeg_busy: Optional[dict[str, list[Optional[Union[ValueUnit, str]]]]] + vcn_busy: Optional[dict[str, list[Optional[Union[ValueUnit, str]]]]] na_validator_list = field_validator("vcn_activity", "jpeg_activity", mode="before")( na_to_none_list ) @@ -648,9 +648,9 @@ class MetricMemUsage(BaseModel): class MetricThrottleVu(BaseModel): - xcp_0: Optional[List[Optional[Union[ValueUnit, str]]]] = None + xcp_0: Optional[list[Optional[Union[ValueUnit, str]]]] = None # Deprecated below - value: Optional[Dict[str, List[Union[int, str]]]] = Field(deprecated=True, default=None) + value: Optional[dict[str, list[Union[int, str]]]] = Field(deprecated=True, default=None) unit: str = Field(deprecated=True, default="") @@ -766,15 +766,15 @@ class AmdSmiMetric(BaseModel): gpu: int usage: MetricUsage power: MetricPower - clock: Dict[str, MetricClockData] + clock: dict[str, MetricClockData] temperature: MetricTemperature pcie: MetricPcie ecc: MetricEccTotals - ecc_blocks: Union[Dict[str, EccData], str] + ecc_blocks: Union[dict[str, EccData], str] fan: MetricFan voltage_curve: Optional[MetricVoltageCurve] - perf_level: Optional[Union[str, Dict]] - xgmi_err: Optional[Union[str, Dict]] + perf_level: Optional[Union[str, dict]] + xgmi_err: Optional[Union[str, dict]] energy: Optional[MetricEnergy] mem_usage: MetricMemUsage throttle: MetricThrottle @@ -783,7 +783,7 @@ class AmdSmiMetric(BaseModel): @field_validator("ecc_blocks", mode="before") @classmethod - def validate_ecc_blocks(cls, value: Union[Dict[str, EccData], str]) -> Dict[str, EccData]: + def validate_ecc_blocks(cls, value: Union[dict[str, EccData], str]) -> dict[str, EccData]: """Validate the ecc_blocks field.""" if isinstance(value, str): # If it's a string, we assume it's "N/A" and return an empty dict @@ 
-849,7 +849,7 @@ class XgmiLinkMetrics(BaseModel): bit_rate: Optional[ValueUnit] max_bandwidth: Optional[ValueUnit] link_type: str - links: List[XgmiLink] + links: list[XgmiLink] na_validator = field_validator("max_bandwidth", "bit_rate", mode="before")(na_to_none) @@ -862,7 +862,7 @@ class XgmiMetrics(BaseModel): class XgmiLinks(BaseModel): gpu: int bdf: str - link_status: List[LinkStatusTable] + link_status: list[LinkStatusTable] class CoherentTable(Enum): @@ -912,15 +912,15 @@ def bandwidth_to(self) -> Optional[int]: class Topo(BaseModel): gpu: int bdf: str - links: List[TopoLink] + links: list[TopoLink] class AmdSmiTstData(BaseModel): "Summary of amdsmitst results, with list and count of passing/skipped/failed tests" - passed_tests: List[str] = Field(default_factory=list) - skipped_tests: List[str] = Field(default_factory=list) - failed_tests: List[str] = Field(default_factory=list) + passed_tests: list[str] = Field(default_factory=list) + skipped_tests: list[str] = Field(default_factory=list) + failed_tests: list[str] = Field(default_factory=list) passed_test_count: int = 0 skipped_test_count: int = 0 failed_test_count: int = 0 @@ -943,17 +943,17 @@ class AmdSmiDataModel(DataModel): ) version: Optional[AmdSmiVersion] = None - gpu_list: Optional[List[AmdSmiListItem]] = Field(default_factory=list) + gpu_list: Optional[list[AmdSmiListItem]] = Field(default_factory=list) partition: Optional[Partition] = None - process: Optional[List[Processes]] = Field(default_factory=list) - topology: Optional[List[Topo]] = Field(default_factory=list) - firmware: Optional[List[Fw]] = Field(default_factory=list) - bad_pages: Optional[List[BadPages]] = Field(default_factory=list) - static: Optional[List[AmdSmiStatic]] = Field(default_factory=list) - metric: Optional[List[AmdSmiMetric]] = Field(default_factory=list) - xgmi_metric: Optional[List[XgmiMetrics]] = Field(default_factory=list) - xgmi_link: Optional[List[XgmiLinks]] = Field(default_factory=list) - cper_data: 
Optional[List[FileModel]] = Field(default_factory=list) + process: Optional[list[Processes]] = Field(default_factory=list) + topology: Optional[list[Topo]] = Field(default_factory=list) + firmware: Optional[list[Fw]] = Field(default_factory=list) + bad_pages: Optional[list[BadPages]] = Field(default_factory=list) + static: Optional[list[AmdSmiStatic]] = Field(default_factory=list) + metric: Optional[list[AmdSmiMetric]] = Field(default_factory=list) + xgmi_metric: Optional[list[XgmiMetrics]] = Field(default_factory=list) + xgmi_link: Optional[list[XgmiLinks]] = Field(default_factory=list) + cper_data: Optional[list[FileModel]] = Field(default_factory=list) amdsmitst_data: AmdSmiTstData = Field(default_factory=AmdSmiTstData) def get_list(self, gpu: int) -> Optional[AmdSmiListItem]: