diff --git a/nodescraper/plugins/inband/amdsmi/__init__.py b/nodescraper/plugins/inband/amdsmi/__init__.py new file mode 100644 index 00000000..f117a9fd --- /dev/null +++ b/nodescraper/plugins/inband/amdsmi/__init__.py @@ -0,0 +1,28 @@ +############################################################################### +# +# MIT License +# +# Copyright (c) 2025 Advanced Micro Devices, Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
+# +############################################################################### +from .amdsmi_plugin import AmdSmiPlugin + +__all__ = ["AmdSmiPlugin"] diff --git a/nodescraper/plugins/inband/amdsmi/amdsmi_analyzer.py b/nodescraper/plugins/inband/amdsmi/amdsmi_analyzer.py new file mode 100644 index 00000000..18f2ef85 --- /dev/null +++ b/nodescraper/plugins/inband/amdsmi/amdsmi_analyzer.py @@ -0,0 +1,475 @@ +############################################################################### +# +# MIT License +# +# Copyright (c) 2025 Advanced Micro Devices, Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
from collections import defaultdict
from typing import Any, Dict, List, Optional, Union

from nodescraper.enums import EventCategory, EventPriority
from nodescraper.interfaces import DataAnalyzer
from nodescraper.models import TaskResult

from .amdsmidata import AmdSmiDataModel, AmdSmiStatic, Fw, Partition, Processes
from .analyzer_args import AmdSmiAnalyzerArgs


# NOTE(review): the second generic parameter is None although analyze_data accepts
# AmdSmiAnalyzerArgs -- confirm whether DataAnalyzer uses it to resolve the args type.
class AmdSmiAnalyzer(DataAnalyzer[AmdSmiDataModel, None]):
    """Analyze collected amd-smi data against expected values.

    Each check method inspects one part of the AmdSmi data model and reports its
    findings through ``self._log_event``. ``analyze_data`` decides which checks run,
    based on the analyzer args that were supplied.
    """

    DATA_MODEL = AmdSmiDataModel

    def check_expected_max_power(
        self,
        amdsmi_static_data: list[AmdSmiStatic],
        expected_max_power: int,
    ) -> None:
        """Check each GPU's max power limit against the expected value

        A GPU without a max power limit is reported as a warning. A limit that cannot
        be converted to a number is reported as an error. All GPUs whose limit differs
        from the expected value are reported together in one error event.

        Args:
            amdsmi_static_data (list[AmdSmiStatic]): AmdSmiStatic data model
            expected_max_power (int): expected max power
        """
        incorrect_max_power_gpus: dict[int, Union[int, str, float]] = {}
        for gpu in amdsmi_static_data:
            if gpu.limit is None or gpu.limit.max_power is None:
                self._log_event(
                    category=EventCategory.PLATFORM,
                    description=f"GPU: {gpu.gpu} has no max power limit set",
                    priority=EventPriority.WARNING,
                    data={"gpu": gpu.gpu},
                )
                continue

            max_power_value = gpu.limit.max_power.value
            try:
                max_power_float = float(max_power_value)
            except (TypeError, ValueError):
                # float() raises TypeError (not ValueError) for None and other
                # non-numeric objects, so both must be treated as "invalid limit".
                self._log_event(
                    category=EventCategory.PLATFORM,
                    description=f"GPU: {gpu.gpu} has an invalid max power limit",
                    priority=EventPriority.ERROR,
                    data={
                        "gpu": gpu.gpu,
                        "max_power_value": max_power_value,
                    },
                )
                continue

            if max_power_float != expected_max_power:
                incorrect_max_power_gpus[gpu.gpu] = max_power_float

        if incorrect_max_power_gpus:
            self._log_event(
                category=EventCategory.PLATFORM,
                description="Max power mismatch",
                priority=EventPriority.ERROR,
                data={
                    "gpus": list(incorrect_max_power_gpus.keys()),
                    "max_power_values": incorrect_max_power_gpus,
                    "expected_max_power": expected_max_power,
                },
            )

    def check_expected_driver_version(
        self,
        amdsmi_static_data: list[AmdSmiStatic],
        expected_driver_version: str,
    ) -> None:
        """Check each GPU's driver version against the expected version

        A GPU with no driver information is treated as a mismatch (its version is
        reported as None).

        Args:
            amdsmi_static_data (list[AmdSmiStatic]): AmdSmiStatic data model
            expected_driver_version (str): expected driver version
        """
        bad_driver_gpus: list[int] = []
        versions_by_gpu: dict[int, Optional[str]] = {}

        for gpu in amdsmi_static_data:
            ver: Optional[str] = gpu.driver.version if gpu.driver is not None else None
            versions_by_gpu[gpu.gpu] = ver
            if ver != expected_driver_version:
                bad_driver_gpus.append(gpu.gpu)

        if bad_driver_gpus:
            self._log_event(
                category=EventCategory.PLATFORM,
                description="Driver Version Mismatch",
                priority=EventPriority.ERROR,
                data={
                    "gpus": bad_driver_gpus,
                    "driver_version": {g: versions_by_gpu[g] for g in bad_driver_gpus},
                    "expected_driver_version": expected_driver_version,
                },
            )

    def expected_gpu_processes(
        self, processes_data: Optional[list[Processes]], max_num_processes: int
    ) -> None:
        """Check the number of GPU processes running

        Args:
            processes_data (Optional[list[Processes]]): list of processes per GPU
            max_num_processes (int): max number of expected processes
        """
        if not processes_data:
            self._log_event(
                category=EventCategory.PLATFORM,
                description="No GPU processes data available",
                priority=EventPriority.WARNING,
                data={"processes_data": processes_data},
                console_log=True,
            )
            return

        gpu_exceeds_num_processes: dict[int, int] = {}
        for process in processes_data:
            # Skip GPUs with no processes, or whose first entry carries a plain string
            # instead of structured process info.
            if len(process.process_list) == 0 or isinstance(
                process.process_list[0].process_info, str
            ):
                continue

            process_count = len(process.process_list)
            if process_count > max_num_processes:
                gpu_exceeds_num_processes[process.gpu] = process_count

        if gpu_exceeds_num_processes:
            self._log_event(
                category=EventCategory.PLATFORM,
                description="Number of processes exceeds max processes",
                priority=EventPriority.ERROR,
                data={
                    "gpu_exceeds_num_processes": gpu_exceeds_num_processes,
                },
                console_log=True,
            )

    def static_consistancy_check(self, amdsmi_static_data: list[AmdSmiStatic]) -> None:
        """Check that ASIC static fields are identical across all GPUs

        One warning event is logged for every field that has more than one distinct
        value.

        Args:
            amdsmi_static_data (list[AmdSmiStatic]): AmdSmiStatic data model
        """
        consistancy_data: dict[str, Union[set[str], set[int]]] = {
            "market_name": {gpu.asic.market_name for gpu in amdsmi_static_data},
            "vendor_id": {gpu.asic.vendor_id for gpu in amdsmi_static_data},
            "vendor_name": {gpu.asic.vendor_name for gpu in amdsmi_static_data},
            "subvendor_id": {gpu.asic.subvendor_id for gpu in amdsmi_static_data},
            "subsystem_id": {gpu.asic.subsystem_id for gpu in amdsmi_static_data},
            "device_id": {gpu.asic.device_id for gpu in amdsmi_static_data},
            "rev_id": {gpu.asic.rev_id for gpu in amdsmi_static_data},
            # Stringified so that int and str values can share one set.
            "num_compute_units": {str(gpu.asic.num_compute_units) for gpu in amdsmi_static_data},
            "target_graphics_version": {
                gpu.asic.target_graphics_version for gpu in amdsmi_static_data
            },
        }
        for key, value in consistancy_data.items():
            if len(value) > 1:
                self._log_event(
                    category=EventCategory.PLATFORM,
                    description=f"{key} is not consistent across all GPUs",
                    priority=EventPriority.WARNING,
                    data={
                        "field": key,
                        # A sorted list keeps the payload deterministic and directly
                        # JSON-serializable; a raw set is neither.
                        "non_consistent_values": sorted(value, key=str),
                    },
                )

    def check_static_data(
        self,
        amdsmi_static_data: list[AmdSmiStatic],
        vendor_id: Optional[str],
        subvendor_id: Optional[str],
        device_id: tuple[Optional[str], Optional[str]],
        subsystem_id: tuple[Optional[str], Optional[str]],
        sku_name: Optional[str],
    ) -> None:
        """Check expected static data

        Scalar expectations (vendor_id, subvendor_id, vendor_name, market_name) match
        when the expected string is contained in the collected value. ``device_id``
        and ``subsystem_id`` are tuples of acceptable candidates; a GPU matches when at
        least one non-None candidate is contained (case-insensitively) in the collected
        value. A tuple whose entries are all None disables that check.

        Args:
            amdsmi_static_data (list[AmdSmiStatic]): AmdSmiStatic data
            vendor_id (Optional[str]): expected vendor_id
            subvendor_id (Optional[str]): expected subvendor_id
            device_id (tuple[Optional[str], Optional[str]]): acceptable device_id values
            subsystem_id (tuple[Optional[str], Optional[str]]): acceptable subsystem_id values
            sku_name (Optional[str]): expected sku_name
        """
        mismatches: list[tuple[int, str, str, str]] = []

        expected_data: Dict[str, Optional[str]] = {
            "vendor_id": vendor_id,
            "subvendor_id": subvendor_id,
            "vendor_name": "Advanced Micro Devices Inc",
            "market_name": sku_name,
        }

        for gpu_data in amdsmi_static_data:
            collected_data: dict[str, str] = {
                "vendor_id": gpu_data.asic.vendor_id,
                "subvendor_id": gpu_data.asic.subvendor_id,
                "vendor_name": gpu_data.asic.vendor_name,
                "market_name": gpu_data.asic.market_name,
            }

            for key, expected in expected_data.items():
                if expected is None:
                    continue
                actual = collected_data[key]
                if expected not in actual:
                    mismatches.append((gpu_data.gpu, key, expected, actual))
                    # NOTE(review): only the first mismatching scalar field is recorded
                    # per GPU -- confirm this is intended, since the payload format
                    # supports several mismatches per GPU.
                    break

            id_checks = (
                ("device_id", device_id, gpu_data.asic.device_id),
                ("subsystem_id", subsystem_id, gpu_data.asic.subsystem_id),
            )
            for field, candidates, actual_id in id_checks:
                # Use whichever candidates were provided, so a partially filled tuple
                # is still checked instead of being silently skipped.
                wanted = [candidate for candidate in candidates if candidate is not None]
                if not wanted:
                    continue
                actual_upper = actual_id.upper()
                if not any(candidate.upper() in actual_upper for candidate in wanted):
                    mismatches.append((gpu_data.gpu, field, "|".join(wanted), actual_id))

        if mismatches:
            payload = self._format_static_mismatch_payload(mismatches)
            self._log_event(
                category=EventCategory.PLATFORM,
                description="amd-smi static data mismatch",
                priority=EventPriority.ERROR,
                data=payload,
            )

    def _format_static_mismatch_payload(
        self,
        mismatches: List[tuple[int, str, str, str]],
    ) -> Dict[str, Any]:
        """Group static-data mismatches per GPU and add a summary

        Args:
            mismatches (List[tuple[int, str, str, str]]): (gpu, field, expected, actual)
                entries

        Returns:
            Dict[str, Any]: ``summary`` (number of GPUs affected, sorted field names,
                total mismatches) and ``per_gpu`` (mismatch entries ordered by GPU index)
        """
        per_gpu: Dict[int, List[Dict[str, str]]] = defaultdict(list)
        field_set: set[str] = set()

        for gpu, field, expected, actual in mismatches:
            field_set.add(field)
            per_gpu[gpu].append({"field": field, "expected": expected, "actual": actual})

        per_gpu_list: List[Dict[str, Any]] = [
            {"gpu": gpu, "mismatches": per_gpu[gpu]} for gpu in sorted(per_gpu)
        ]

        return {
            "summary": {
                "gpus_affected": len(per_gpu),
                "fields": sorted(field_set),
                "total_mismatches": len(mismatches),
            },
            "per_gpu": per_gpu_list,
        }

    def check_pldm_version(
        self,
        amdsmi_fw_data: Optional[list[Fw]],
        expected_pldm_version: Optional[str],
    ) -> None:
        """Check expected pldm version

        For every GPU the first firmware entry whose id is ``PLDM_BUNDLE`` is compared
        with the expected version. GPUs without such an entry (or whose firmware list
        is a plain string) are reported as missing.

        Args:
            amdsmi_fw_data (Optional[list[Fw]]): data model
            expected_pldm_version (Optional[str]): expected pldm version
        """
        PLDM_STRING = "PLDM_BUNDLE"
        if not amdsmi_fw_data:
            self._log_event(
                category=EventCategory.PLATFORM,
                description="No AMD SMI firmware data available",
                priority=EventPriority.WARNING,
                data={"amdsmi_fw_data": amdsmi_fw_data},
            )
            return

        mismatched_gpus: list[int] = []
        pldm_missing_gpus: list[int] = []
        for fw_data in amdsmi_fw_data:
            gpu = fw_data.gpu
            if isinstance(fw_data.fw_list, str):
                pldm_missing_gpus.append(gpu)
                continue

            pldm_entry = next(
                (fw_info for fw_info in fw_data.fw_list if fw_info.fw_id == PLDM_STRING),
                None,
            )
            if pldm_entry is None:
                pldm_missing_gpus.append(gpu)
            elif pldm_entry.fw_version != expected_pldm_version:
                mismatched_gpus.append(gpu)

        if mismatched_gpus or pldm_missing_gpus:
            self._log_event(
                category=EventCategory.FW,
                description="PLDM Version Mismatch",
                priority=EventPriority.ERROR,
                data={
                    "mismatched_gpus": mismatched_gpus,
                    "pldm_missing_gpus": pldm_missing_gpus,
                    "expected_pldm_version": expected_pldm_version,
                },
            )

    def check_expected_memory_partition_mode(
        self,
        partition_data: Optional[Partition],
        expected_memory_partition_mode: Optional[str],
        expected_compute_partition_mode: Optional[str],
    ) -> None:
        """Check expected memory and compute partition modes

        An expectation that is None disables the corresponding check.

        Args:
            partition_data (Optional[Partition]): data model
            expected_memory_partition_mode (Optional[str]): expected mem partition mode
            expected_compute_partition_mode (Optional[str]): expected compute partition mode
        """
        if partition_data is None:
            self._log_event(
                category=EventCategory.PLATFORM,
                description="No AMD SMI partition data available",
                priority=EventPriority.WARNING,
            )
            return

        # Holds both memory and compute mismatches; each entry names its own kind.
        bad_partition_mode_gpus: list[dict[str, Any]] = []

        if expected_memory_partition_mode is not None:
            for partition_current in partition_data.memory_partition:
                if partition_current.partition_type != expected_memory_partition_mode:
                    bad_partition_mode_gpus.append(
                        {
                            "gpu_id": partition_current.gpu_id,
                            "memory_partition_mode": partition_current.partition_type,
                        }
                    )

        if expected_compute_partition_mode is not None:
            for compute_current in partition_data.compute_partition:
                if compute_current.partition_type != expected_compute_partition_mode:
                    bad_partition_mode_gpus.append(
                        {
                            "gpu_id": compute_current.gpu_id,
                            "compute_partition_mode": compute_current.partition_type,
                        }
                    )

        # The accelerator partition is currently not available in the API.

        if bad_partition_mode_gpus:
            self._log_event(
                category=EventCategory.PLATFORM,
                description="Partition Mode Mismatch",
                priority=EventPriority.ERROR,
                data={
                    "actual_partition_data": bad_partition_mode_gpus,
                    "expected_memory_partition_mode": expected_memory_partition_mode,
                    "expected_compute_partition_mode": expected_compute_partition_mode,
                },
            )

    def analyze_data(
        self, data: AmdSmiDataModel, args: Optional[AmdSmiAnalyzerArgs] = None
    ) -> TaskResult:
        """Analyze the amdsmi data against expected data

        Args:
            data (AmdSmiDataModel): the AmdSmi data model
            args (Optional[AmdSmiAnalyzerArgs], optional): AmdSmi analyzer args; a
                default-constructed AmdSmiAnalyzerArgs is used when None. Defaults to None.

        Returns:
            TaskResult: the result of the analysis indicating whether the AmdSmi data
                model matched the expected data
        """
        if args is None:
            args = AmdSmiAnalyzerArgs()

        if args.expected_gpu_processes:
            self.expected_gpu_processes(data.process, args.expected_gpu_processes)

        if not data.static:
            self._log_event(
                category=EventCategory.PLATFORM,
                description="No AMD SMI static data available",
                priority=EventPriority.WARNING,
                data={"amdsmi_static_data": data.static},
            )
        else:
            if args.expected_max_power:
                self.check_expected_max_power(data.static, args.expected_max_power)
            if args.expected_driver_version:
                self.check_expected_driver_version(data.static, args.expected_driver_version)

            self.static_consistancy_check(data.static)

            # The previous gate "(sku and devid_ep and devid_ep_vf and vendorid_ep and
            # check_static_data) or check_static_data" reduces to check_static_data alone.
            if args.check_static_data:
                # NOTE(review): vendorid_ep is reused as the expected subvendor id and
                # devid_ep as the expected subsystem id -- confirm this is intended.
                self.check_static_data(
                    data.static,
                    args.vendorid_ep,
                    args.vendorid_ep,
                    # Accept either the EP device id or its VF counterpart.
                    (args.devid_ep, args.devid_ep_vf),
                    (args.devid_ep, args.devid_ep),
                    sku_name=args.sku_name,
                )

        if args.expected_memory_partition_mode or args.expected_compute_partition_mode:
            self.check_expected_memory_partition_mode(
                data.partition,
                args.expected_memory_partition_mode,
                args.expected_compute_partition_mode,
            )

        if args.expected_pldm_version:
            self.check_pldm_version(data.firmware, args.expected_pldm_version)

        return self.result
+# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
+# +############################################################################### +import json +from typing import Any, Optional, Union + +from pydantic import ValidationError + +from nodescraper.base.inbandcollectortask import InBandDataCollector +from nodescraper.enums import EventCategory, EventPriority, ExecutionStatus, OSFamily +from nodescraper.models import TaskResult +from nodescraper.plugins.inband.amdsmi.amdsmidata import ( + AmdSmiDataModel, + AmdSmiListItem, + AmdSmiStatic, + AmdSmiVersion, + Fw, + FwListItem, + Partition, + PartitionCompute, + PartitionMemory, + Processes, + ProcessInfo, + ProcessListItem, + ProcessMemoryUsage, + ProcessUsage, + StaticAsic, + StaticBoard, + StaticBus, + StaticCacheInfoItem, + StaticClockData, + StaticDriver, + StaticFrequencyLevels, + StaticNuma, + StaticPolicy, + StaticSocPstate, + StaticVbios, + StaticVram, + StaticXgmiPlpd, + ValueUnit, +) +from nodescraper.utils import get_exception_traceback + + +class AmdSmiCollector(InBandDataCollector[AmdSmiDataModel, None]): + """Class for collection of inband tool amd-smi data.""" + + AMD_SMI_EXE = "amd-smi" + + SUPPORTED_OS_FAMILY: set[OSFamily] = {OSFamily.LINUX} + + DATA_MODEL = AmdSmiDataModel + + CMD_VERSION = "amd-smi version --json" + CMD_LIST = "amd-smi list --json" + CMD_PROCESS = "amd-smi process --json" + CMD_PARTITION = "amd-smi partition --json" + CMD_FIRMWARE = "amd-smi firmware --json" + CMD_STATIC = "amd-smi static -g all --json" + + def _check_amdsmi_installed(self) -> bool: + """Check if amd-smi is installed + + Returns: + bool: True if amd-smi is installed, False otherwise + """ + cmd_ret = self._run_sut_cmd("which amd-smi") + return bool(cmd_ret.exit_code == 0 and "no amd-smi in" not in cmd_ret.stdout) + + def _run_amd_smi(self, cmd: str) -> Optional[str]: + """Run amd-smi command + + Args: + cmd (str): command arguments to pass to amd-smi + + Returns: + Optional[str]: stdout from command or None on error + """ + cmd_ret = 
self._run_sut_cmd(f"{self.AMD_SMI_EXE} {cmd}") + + # Check for known warnings and errors that can be handled + is_group_warning = ( + "User is missing the following required groups" in cmd_ret.stderr + or "User is missing the following required groups" in cmd_ret.stdout + ) + + # Check for known amd-smi internal bugs + is_amdsmi_internal_error = any( + pattern in cmd_ret.stderr for pattern in ["KeyError:", "AttributeError:", "IndexError:"] + ) + + # Log warning if user is missing group + if cmd_ret.stderr != "" or cmd_ret.exit_code != 0: + if is_amdsmi_internal_error: + self._log_event( + category=EventCategory.SW_DRIVER, + description="amd-smi internal error detected", + data={ + "command": cmd, + "exit_code": cmd_ret.exit_code, + "stderr": cmd_ret.stderr, + }, + priority=EventPriority.WARNING, + console_log=True, + ) + return None + elif not is_group_warning: + self._log_event( + category=EventCategory.APPLICATION, + description="Error running amd-smi command", + data={ + "command": cmd, + "exit_code": cmd_ret.exit_code, + "stderr": cmd_ret.stderr, + }, + priority=EventPriority.ERROR, + console_log=True, + ) + return None + else: + self._log_event( + category=EventCategory.APPLICATION, + description="amd-smi warning (continuing): User missing required groups", + data={ + "command": cmd, + "warning": cmd_ret.stderr or cmd_ret.stdout, + }, + priority=EventPriority.WARNING, + console_log=False, + ) + + stdout = cmd_ret.stdout + if is_group_warning and stdout: + lines = stdout.split("\n") + cleaned_lines = [ + line + for line in lines + if not any( + warn in line + for warn in [ + "RuntimeError:", + "WARNING: User is missing", + "Please add user to these groups", + ] + ) + ] + stdout = "\n".join(cleaned_lines).strip() + + return stdout + + def _run_amd_smi_dict(self, cmd: str) -> Optional[Union[dict, list[dict]]]: + """Run amd-smi command with json output + + Args: + cmd (str): command arguments to pass to amd-smi + + Returns: + Optional[Union[dict, list[dict]]]: 
parsed JSON output or None on error + """ + cmd += " --json" + cmd_ret = self._run_amd_smi(cmd) + if cmd_ret: + try: + return json.loads(cmd_ret) + except json.JSONDecodeError as e: + self._log_event( + category=EventCategory.APPLICATION, + description=f"Error parsing command: `{cmd}` json data", + data={ + "cmd": cmd, + "exception": get_exception_traceback(e), + }, + priority=EventPriority.ERROR, + console_log=True, + ) + return None + return None + + def _to_number(self, v: object) -> Optional[Union[int, float]]: + """Helper function to return number from str, float or "N/A" + + Args: + v (object): non number object + + Returns: + Optional[Union[int, float]]: number version of input + """ + if v in (None, "", "N/A"): + return None + try: + if isinstance(v, (int, float)): + return v + if isinstance(v, str): + s = v.strip() + try: + return int(s) + except Exception: + return float(s) + return float(str(v)) + except Exception: + return None + + def _valueunit(self, v: object, unit: str, *, required: bool = False) -> Optional[ValueUnit]: + """Build ValueUnit instance from object + + Args: + v (object): object to be turned into ValueUnit + unit (str): unit of measurement + required (bool, optional): bool to force instance creation. Defaults to False. + + Returns: + Optional[ValueUnit]: ValueUnit Instance + """ + n = self._to_number(v) + if n is None: + return ValueUnit(value=0, unit=unit) if required else None + return ValueUnit(value=n, unit=unit) + + def _valueunit_req(self, v: object, unit: str) -> ValueUnit: + """Helper function to force ValueUnit instance creation + + Args: + v (object): object + unit (str): unit of measurement + + Returns: + ValueUnit: instance of ValueUnit + """ + vu = self._valueunit(v, unit, required=True) + assert vu is not None + return vu + + def _normalize(self, val: object, default: str = "unknown", slot_type: bool = False) -> str: + """Normalize strings + + Args: + val (object): object + default (str, optional): default option. 
Defaults to "unknown". + slot_type (bool, optional): map to one of {'OAM','PCIE','CEM','Unknown'}. Defaults to False. + + Returns: + str: normalized string + """ + s = str(val).strip() if val is not None else "" + if not s or s.upper() == "N/A": + return "Unknown" if slot_type else default + + if slot_type: + u = s.upper().replace(" ", "").replace("-", "") + if u == "OAM": + return "OAM" + if u in {"PCIE", "PCIEXPRESS", "PCIEXP"} or u.startswith("PCIE"): + return "PCIE" + if u == "CEM": + return "CEM" + return "Unknown" + + return s + + def _get_amdsmi_data(self) -> Optional[AmdSmiDataModel]: + """Fill in information for AmdSmi data model + + Returns: + Optional[AmdSmiDataModel]: instance of the AmdSmi data model + """ + try: + version = self._get_amdsmi_version() + processes = self.get_process() + partition = self.get_partition() + firmware = self.get_firmware() + gpu_list = self.get_gpu_list() + statics = self.get_static() + except Exception as e: + self._log_event( + category=EventCategory.APPLICATION, + description="Error running amd-smi sub commands", + data={"exception": get_exception_traceback(e)}, + priority=EventPriority.ERROR, + console_log=True, + ) + self.result.status = ExecutionStatus.EXECUTION_FAILURE + return None + + try: + return AmdSmiDataModel( + version=version, + gpu_list=gpu_list, + process=processes, + partition=partition, + firmware=firmware, + static=statics, + ) + except ValidationError as err: + self.logger.warning("Validation err: %s", err) + self._log_event( + category=EventCategory.APPLICATION, + description="Failed to build AmdSmiDataModel", + data={"errors": err.errors(include_url=False)}, + priority=EventPriority.ERROR, + ) + return None + + def _get_amdsmi_version(self) -> Optional[AmdSmiVersion]: + """Get amdsmi version and data + + Returns: + Optional[AmdSmiVersion]: version information or None on error + """ + ret = self._run_amd_smi_dict("version") + if not ret or not isinstance(ret, list) or len(ret) == 0: + return None + + 
version_data = ret[0] if isinstance(ret, list) else ret + if not isinstance(version_data, dict): + return None + + try: + return AmdSmiVersion( + tool="amdsmi", + version=version_data.get("amdsmi_library_version", ""), + amdsmi_library_version=version_data.get("amdsmi_library_version", ""), + rocm_version=version_data.get("rocm_version", ""), + ) + except ValidationError as err: + self._log_event( + category=EventCategory.APPLICATION, + description="Failed to build AmdSmiVersion", + data={"errors": err.errors(include_url=False)}, + priority=EventPriority.WARNING, + ) + return None + + def get_gpu_list(self) -> Optional[list[AmdSmiListItem]]: + """Get GPU information from amd-smi list command + + Returns: + Optional[list[AmdSmiListItem]]: list of GPU info items + """ + ret = self._run_amd_smi_dict("list") + if not ret: + return [] + + gpu_data = ret if isinstance(ret, list) else [ret] + out: list[AmdSmiListItem] = [] + + def _to_int(x: Any, default: int = 0) -> int: + try: + return int(x) + except Exception: + return default + + for item in gpu_data: + if not isinstance(item, dict): + continue + + try: + out.append( + AmdSmiListItem( + gpu=_to_int(item.get("gpu", 0)), + bdf=str(item.get("bdf", "")), + uuid=str(item.get("uuid", "")), + kfd_id=_to_int(item.get("kfd_id", 0)), + node_id=_to_int(item.get("node_id", 0)), + partition_id=_to_int(item.get("partition_id", 0)), + ) + ) + except ValidationError as err: + self._log_event( + category=EventCategory.APPLICATION, + description="Failed to build AmdSmiListItem", + data={"errors": err.errors(include_url=False), "item": item}, + priority=EventPriority.WARNING, + ) + + return out + + def get_process(self) -> Optional[list[Processes]]: + """Get process information + + Returns: + Optional[list[Processes]]: list of GPU processes + """ + ret = self._run_amd_smi_dict("process") + if not ret: + return [] + + process_data = ret if isinstance(ret, list) else [ret] + out: list[Processes] = [] + + for item in process_data: + if 
not isinstance(item, dict): + continue + + gpu_idx = int(item.get("gpu", 0)) if item.get("gpu") not in (None, "") else 0 + process_list_raw = item.get("process_list", []) + if not isinstance(process_list_raw, list): + continue + + plist: list[ProcessListItem] = [] + + for entry in process_list_raw: + if not isinstance(entry, dict): + plist.append(ProcessListItem(process_info=str(entry))) + continue + + name = entry.get("name", "N/A") + pid_val = entry.get("pid", 0) + try: + pid = int(pid_val) if pid_val not in (None, "") else 0 + except Exception: + pid = 0 + + mem_vu = self._valueunit(entry.get("mem"), "B") + + mu = entry.get("memory_usage") or {} + mem_usage = ProcessMemoryUsage( + gtt_mem=self._valueunit(mu.get("gtt_mem"), "B"), + cpu_mem=self._valueunit(mu.get("cpu_mem"), "B"), + vram_mem=self._valueunit(mu.get("vram_mem"), "B"), + ) + + eu = entry.get("engine_usage") or {} + usage = ProcessUsage( + gfx=self._valueunit(eu.get("gfx"), "ns"), + enc=self._valueunit(eu.get("enc"), "ns"), + ) + + try: + plist.append( + ProcessListItem( + process_info=ProcessInfo( + name=str(name), + pid=pid, + memory_usage=mem_usage, + mem_usage=mem_vu, + usage=usage, + ) + ) + ) + except ValidationError as err: + self._log_event( + category=EventCategory.APPLICATION, + description="Failed to build ProcessListItem; skipping entry", + data={ + "errors": err.errors(include_url=False), + "gpu_index": gpu_idx, + "entry": repr(entry), + }, + priority=EventPriority.WARNING, + ) + continue + + try: + out.append(Processes(gpu=gpu_idx, process_list=plist)) + except ValidationError as err: + self._log_event( + category=EventCategory.APPLICATION, + description="Failed to build Processes", + data={"errors": err.errors(include_url=False), "gpu_index": gpu_idx}, + priority=EventPriority.WARNING, + ) + + return out + + def get_partition(self) -> Optional[Partition]: + """Check partition information + + Returns: + Optional[Partition]: Partition data if available + """ + ret = 
self._run_amd_smi_dict("partition") + if not ret: + return None + + partition_data = ret if isinstance(ret, list) else [ret] + memparts: list[PartitionMemory] = [] + computeparts: list[PartitionCompute] = [] + + for item in partition_data: + if not isinstance(item, dict): + continue + + gpu_idx = int(item.get("gpu", 0)) if item.get("gpu") not in (None, "") else 0 + mem_pt = item.get("memory_partition") + comp_pt = item.get("compute_partition") + + try: + memparts.append( + PartitionMemory(gpu_id=gpu_idx, partition_type=str(mem_pt) if mem_pt else None) + ) + except ValidationError as err: + self._log_event( + category=EventCategory.APPLICATION, + description="Failed to build PartitionMemory", + data={ + "errors": err.errors(include_url=False), + "gpu_index": gpu_idx, + "data": mem_pt, + }, + priority=EventPriority.WARNING, + ) + + try: + computeparts.append( + PartitionCompute( + gpu_id=gpu_idx, partition_type=str(comp_pt) if comp_pt else None + ) + ) + except ValidationError as err: + self._log_event( + category=EventCategory.APPLICATION, + description="Failed to build PartitionCompute", + data={ + "errors": err.errors(include_url=False), + "gpu_index": gpu_idx, + "data": comp_pt, + }, + priority=EventPriority.WARNING, + ) + + try: + return Partition(memory_partition=memparts, compute_partition=computeparts) + except ValidationError as err: + self._log_event( + category=EventCategory.APPLICATION, + description="Failed to build Partition", + data={"errors": err.errors(include_url=False)}, + priority=EventPriority.WARNING, + ) + return None + + def get_firmware(self) -> Optional[list[Fw]]: + """Get firmware information + + Returns: + Optional[list[Fw]]: List of firmware info per GPU + """ + ret = self._run_amd_smi_dict("firmware") + if not ret: + return [] + + firmware_data = ret if isinstance(ret, list) else [ret] + out: list[Fw] = [] + + for item in firmware_data: + if not isinstance(item, dict): + continue + + gpu_idx = int(item.get("gpu", 0)) if item.get("gpu") 
not in (None, "") else 0
+            fw_list_raw = item.get("fw_list", [])
+
+            if not isinstance(fw_list_raw, list):
+                continue
+
+            normalized: list[FwListItem] = []
+            for e in fw_list_raw:
+                if isinstance(e, dict):
+                    fid = e.get("fw_name")
+                    ver = e.get("fw_version")
+                    normalized.append(
+                        FwListItem(
+                            fw_id="" if fid is None else str(fid),
+                            fw_version="" if ver is None else str(ver),
+                        )
+                    )
+                else:
+                    self._log_event(
+                        category=EventCategory.APPLICATION,
+                        description="Unrecognized firmware entry shape",
+                        data={"entry_shape": repr(e)},
+                        priority=EventPriority.INFO,
+                    )
+
+            try:
+                out.append(Fw(gpu=gpu_idx, fw_list=normalized))
+            except ValidationError as err:
+                self._log_event(
+                    category=EventCategory.APPLICATION,
+                    description="Failed to build Fw",
+                    data={"errors": err.errors(include_url=False), "gpu_index": gpu_idx},
+                    priority=EventPriority.WARNING,
+                )
+
+        return out
+
+    def get_static(self) -> Optional[list[AmdSmiStatic]]:
+        """Get static info from `amd-smi static`, falling back to per-GPU queries
+
+        Returns:
+            Optional[list[AmdSmiStatic]]: list of AmdSmiStatic instances or empty list
+        """
+        ret = self._run_amd_smi_dict("static -g all")
+        if not ret:
+            self.logger.info("Bulk static query failed, attempting per-GPU fallback")
+            gpu_list = self.get_gpu_list()
+            if gpu_list:
+                fallback_data: list[dict] = []
+                for gpu in gpu_list:
+                    gpu_data = self._run_amd_smi_dict(f"static -g {gpu.gpu}")
+                    if gpu_data:
+                        if isinstance(gpu_data, dict):
+                            fallback_data.append(gpu_data)
+                        elif isinstance(gpu_data, list):
+                            fallback_data.extend(gpu_data)
+                if fallback_data:
+                    ret = fallback_data
+                else:
+                    return []
+            else:
+                return []
+
+        if isinstance(ret, dict) and "gpu_data" in ret:
+            ret = ret["gpu_data"]
+
+        static_data = ret if isinstance(ret, list) else [ret]
+        out: list[AmdSmiStatic] = []
+
+        for item in static_data:
+            if not isinstance(item, dict) or "gpu" not in item:
+                continue
+
+            gpu_idx = int(item.get("gpu", 0)) if item.get("gpu") not in (None, "") else 0
+
+            asic = item.get("asic", {}) or {}
+            board = item.get("board", {}) or {}
+            bus = item.get("bus", {}) or {}
+            vbios = item.get("vbios", {}) or {}
+            driver = item.get("driver", {}) or {}
+            numa = item.get("numa", {}) or {}
+            vram = item.get("vram", {}) or {}
+            cache = item.get("cache", {}) or {}
+            clock = item.get("clock", {}) or {}
+            soc_pstate = item.get("soc_pstate", {}) or {}
+            xgmi_plpd = item.get("xgmi_plpd", {}) or {}
+
+            # Bus / PCIe
+            bus_model = StaticBus(
+                bdf=str(bus.get("bdf", "")),
+                max_pcie_width=self._valueunit(bus.get("max_pcie_width"), "x"),
+                max_pcie_speed=self._valueunit(bus.get("max_pcie_speed"), "GT/s"),
+                pcie_interface_version=self._normalize(bus.get("pcie_interface_version")),
+                slot_type=self._normalize(bus.get("slot_type"), slot_type=True),
+            )
+
+            # ASIC
+            oam_id_raw = asic.get("oam_id")
+            if oam_id_raw in (None, "", "N/A"):
+                oam_id_val: Union[int, str] = "N/A"
+            elif isinstance(oam_id_raw, str):
+                oam_id_val = oam_id_raw
+            else:
+                oam_id_val = int(oam_id_raw) if oam_id_raw is not None else "N/A"
+
+            num_cu_raw = asic.get("num_compute_units")
+            if num_cu_raw in (None, "", "N/A"):
+                num_cu_val: Union[int, str] = "N/A"
+            elif isinstance(num_cu_raw, str):
+                num_cu_val = num_cu_raw
+            else:
+                num_cu_val = int(num_cu_raw) if num_cu_raw is not None else "N/A"
+
+            asic_model = StaticAsic(
+                market_name=self._normalize(
+                    asic.get("market_name") or asic.get("asic_name"), default=""
+                ),
+                vendor_id=str(asic.get("vendor_id", "")),
+                vendor_name=str(asic.get("vendor_name", "")),
+                subvendor_id=str(asic.get("subvendor_id", "")),
+                device_id=str(asic.get("device_id", "")),
+                subsystem_id=str(asic.get("subsystem_id", "")),
+                rev_id=str(asic.get("rev_id", "")),
+                asic_serial=str(asic.get("asic_serial", "")),
+                oam_id=oam_id_val,
+                num_compute_units=num_cu_val,
+                target_graphics_version=str(asic.get("target_graphics_version", "")),
+            )
+
+            # Board
+            board_model = StaticBoard(
+                model_number=str(
+                    board.get("model_number", "") or board.get("amdsmi_model_number", "")
+                ),
+                product_serial=str(board.get("product_serial", "")),
+                fru_id=str(board.get("fru_id", "")),
+                product_name=str(board.get("product_name", "")),
+                manufacturer_name=str(board.get("manufacturer_name", "")),
+            )
+
+            # Driver
+            driver_model = StaticDriver(
+                name=self._normalize(
+                    driver.get("driver_name") if driver else None, default="unknown"
+                ),
+                version=self._normalize(
+                    driver.get("driver_version") if driver else None, default="unknown"
+                ),
+            )
+
+            # VBIOS
+            vbios_model: Optional[StaticVbios] = None
+            if vbios:
+                vbios_model = StaticVbios(
+                    name=str(vbios.get("vbios_name", "")),
+                    build_date=str(vbios.get("vbios_build_date", "")),
+                    part_number=str(vbios.get("vbios_part_number", "")),
+                    version=str(vbios.get("vbios_version", "")),
+                )
+
+            # NUMA
+            numa_node = int(numa.get("node", 0) or 0)
+            affinity_raw = numa.get("affinity")
+            if affinity_raw in (None, "", "N/A"):
+                affinity_val: Union[int, str] = "N/A"
+            elif isinstance(affinity_raw, str):
+                affinity_val = affinity_raw
+            else:
+                affinity_val = int(affinity_raw) if affinity_raw is not None else "N/A"
+
+            numa_model = StaticNuma(node=numa_node, affinity=affinity_val)
+
+            # VRAM (amd-smi reports the size in MB; it is stored in bytes)
+            vram_type = str(vram.get("vram_type", "") or "unknown")
+            vram_vendor = vram.get("vram_vendor")
+            vram_bits = vram.get("vram_bit_width")
+            vram_size_b: Optional[int] = None
+            if vram.get("vram_size_mb") is not None:
+                try:
+                    vram_size_b = int(vram["vram_size_mb"]) * 1024 * 1024
+                except Exception:
+                    vram_size_b = None
+
+            vram_model = StaticVram(
+                type=vram_type,
+                vendor=None if vram_vendor in (None, "", "N/A") else str(vram_vendor),
+                size=self._valueunit(vram_size_b, "B"),
+                bit_width=self._valueunit(vram_bits, "bit"),
+                max_bandwidth=None,
+            )
+
+            # SOC P-state
+            soc_pstate_model = self._parse_soc_pstate(soc_pstate)
+
+            # XGMI PLPD
+            xgmi_plpd_model = self._parse_xgmi_plpd(xgmi_plpd)
+
+            # Cache info
+            cache_info_model = self._parse_cache_info(cache)
+
+            # Clock
+            clock_dict_model = self._parse_clock_dict(clock)
+
+            try:
+                out.append(
+                    AmdSmiStatic(
+                        gpu=gpu_idx,
+                        asic=asic_model,
+                        bus=bus_model,
+                        vbios=vbios_model,
+                        limit=None,
+                        driver=driver_model,
+                        board=board_model,
+                        soc_pstate=soc_pstate_model,
+                        xgmi_plpd=xgmi_plpd_model,
+                        process_isolation="",
+                        numa=numa_model,
+                        vram=vram_model,
+                        cache_info=cache_info_model,
+                        partition=None,
+                        clock=clock_dict_model,
+                    )
+                )
+            except ValidationError as err:
+                self.logger.error(err)
+                self._log_event(
+                    category=EventCategory.APPLICATION,
+                    description="Failed to build AmdSmiStatic",
+                    data={"errors": err.errors(include_url=False), "gpu_index": gpu_idx},
+                    priority=EventPriority.WARNING,
+                )
+
+        return out
+
+    def _parse_soc_pstate(self, data: dict) -> Optional[StaticSocPstate]:
+        """Parse SOC P-state data, skipping policy entries that cannot be converted
+
+        Args:
+            data (dict): SOC P-state data from amd-smi
+
+        Returns:
+            Optional[StaticSocPstate]: StaticSocPstate instance or None
+        """
+        if not isinstance(data, dict):
+            return None
+
+        try:
+            num_supported = int(data.get("num_supported", 0) or 0)
+        except Exception:
+            num_supported = 0
+        try:
+            current_id = int(data.get("current_id", 0) or 0)
+        except Exception:
+            current_id = 0
+
+        policies_raw = data.get("policies") or []
+        policies: list[StaticPolicy] = []
+        if isinstance(policies_raw, list):
+            for p in policies_raw:
+                if not isinstance(p, dict):
+                    continue
+                pid = p.get("policy_id", 0)
+                desc = p.get("policy_description", "")
+                try:
+                    policies.append(
+                        StaticPolicy(
+                            policy_id=int(pid) if pid not in (None, "") else 0,
+                            policy_description=str(desc),
+                        )
+                    )
+                except (ValidationError, TypeError, ValueError):  # malformed policy_id
+                    continue
+
+        if not num_supported and not current_id and not policies:
+            return None
+
+        try:
+            return StaticSocPstate(
+                num_supported=num_supported,
+                current_id=current_id,
+                policies=policies,
+            )
+        except ValidationError:
+            return None
+
+    def _parse_xgmi_plpd(self, data: dict) -> Optional[StaticXgmiPlpd]:
+        """Parse XGMI PLPD data, skipping policy entries that cannot be converted
+
+        Args:
+            data (dict): XGMI PLPD data from amd-smi
+
+        Returns:
+            Optional[StaticXgmiPlpd]: StaticXgmiPlpd instance or None
+        """
+        if not isinstance(data, dict):
+            return None
+
+        try:
+            num_supported = int(data.get("num_supported", 0) or 0)
+        except Exception:
+            num_supported = 0
+        try:
+            current_id = int(data.get("current_id", 0) or 0)
+        except Exception:
+            current_id = 0
+
+        plpds_raw = data.get("plpds") or []
+        plpds: list[StaticPolicy] = []
+        if isinstance(plpds_raw, list):
+            for p in plpds_raw:
+                if not isinstance(p, dict):
+                    continue
+                pid = p.get("policy_id", 0)
+                desc = p.get("policy_description", "")
+                try:
+                    plpds.append(
+                        StaticPolicy(
+                            policy_id=int(pid) if pid not in (None, "") else 0,
+                            policy_description=str(desc),
+                        )
+                    )
+                except (ValidationError, TypeError, ValueError):  # malformed policy_id
+                    continue
+
+        if not num_supported and not current_id and not plpds:
+            return None
+
+        try:
+            return StaticXgmiPlpd(
+                num_supported=num_supported,
+                current_id=current_id,
+                plpds=plpds,
+            )
+        except ValidationError:
+            return None
+
+    def _parse_cache_info(self, data: dict) -> list[StaticCacheInfoItem]:
+        """Parse cache info data
+
+        Args:
+            data (dict): Cache data from amd-smi
+
+        Returns:
+            list[StaticCacheInfoItem]: list of StaticCacheInfoItem instances
+        """
+        if not isinstance(data, dict) or not isinstance(data.get("cache"), list):
+            return []
+
+        items = data["cache"]
+
+        def _as_list_str(v: Any) -> list[str]:
+            if isinstance(v, list):
+                return [str(x) for x in v]
+            if isinstance(v, str):
+                parts = [p.strip() for p in v.replace(";", ",").split(",")]
+                return [p for p in parts if p]
+            return []
+
+        out: list[StaticCacheInfoItem] = []
+        for e in items:
+            if not isinstance(e, dict):
+                continue
+
+            cache_level = self._valueunit_req(e.get("cache_level"), "")
+            max_num_cu_shared = self._valueunit_req(e.get("max_num_cu_shared"), "")
+            num_cache_instance = self._valueunit_req(e.get("num_cache_instance"), "")
+            cache_size = self._valueunit(e.get("cache_size"), "", required=False)
+            cache_props = _as_list_str(e.get("cache_properties"))
+
+            lvl_val = cache_level.value
+            cache_label_val = (
+                f"Label_{int(lvl_val) if isinstance(lvl_val, (int, float)) else lvl_val}"
+            )
+            cache_label = ValueUnit(value=cache_label_val, unit="")
+
+            try:
+                out.append(
+                    StaticCacheInfoItem(
+                        cache=cache_label,
+                        cache_properties=cache_props,
+                        cache_size=cache_size,
+                        cache_level=cache_level,
+                        max_num_cu_shared=max_num_cu_shared,
+                        num_cache_instance=num_cache_instance,
+                    )
+                )
+            except ValidationError as err:
+                self._log_event(
+                    category=EventCategory.APPLICATION,
+                    description="Bad cache info entry from amd-smi; skipping",
+                    data={"entry": repr(e), "errors": err.errors(include_url=False)},
+                    priority=EventPriority.WARNING,
+                )
+                continue
+
+        return out
+
+    def _parse_clock(self, data: dict) -> Optional[StaticClockData]:
+        """Parse clock data
+
+        Args:
+            data (dict): Clock data from amd-smi
+
+        Returns:
+            Optional[StaticClockData]: StaticClockData instance or None
+        """
+        if not isinstance(data, dict):
+            return None
+
+        freqs_raw = data.get("frequency")
+        if not isinstance(freqs_raw, list) or not freqs_raw:
+            return None
+
+        def _to_mhz(v: object) -> Optional[int]:
+            x = self._to_number(v)
+            if x is None:
+                return None
+            xf = float(x)
+            if xf >= 1e7:
+                return int(round(xf / 1_000_000.0))
+            if xf >= 1e4:
+                return int(round(xf / 1_000.0))
+            return int(round(xf))
+
+        freqs_mhz: list[int] = []
+        for v in freqs_raw:
+            mhz = _to_mhz(v)
+            if mhz is not None:
+                freqs_mhz.append(mhz)
+
+        if not freqs_mhz:
+            return None
+
+        def _fmt(n: Optional[int]) -> Optional[str]:
+            return None if n is None else f"{n} MHz"
+
+        level0: str = _fmt(freqs_mhz[0]) or "0 MHz"
+        level1: Optional[str] = _fmt(freqs_mhz[1]) if len(freqs_mhz) > 1 else None
+        level2: Optional[str] = _fmt(freqs_mhz[2]) if len(freqs_mhz) > 2 else None
+
+        cur_raw = data.get("current")
+        current: Optional[int]
+        if isinstance(cur_raw, (int, float)):
+            current = int(cur_raw)
+        elif isinstance(cur_raw, str) and cur_raw.strip() and cur_raw.upper() != "N/A":
+            try:
+                current = int(cur_raw.strip())
+            except Exception:
+                current = None
+        else:
+            current = None
+
+        try:
+            levels = StaticFrequencyLevels.model_validate(
+                {"Level 0": level0, "Level 1": level1, "Level 2": level2}
+            )
+
+            # Use the alias "current level" as defined in the model
+            return StaticClockData.model_validate(
+                {"frequency_levels": levels, "current level": current}
+            )
+        except ValidationError:
+            return None
+
+    def _parse_clock_dict(self, data: dict) -> Optional[dict[str, Union[StaticClockData, None]]]:
+        """Parse clock data into dictionary structure
+
+        Args:
+            data (dict): Clock data from amd-smi
+
+        Returns:
+            Optional[dict[str, Union[StaticClockData, None]]]: dictionary of clock data or None
+        """
+        if not isinstance(data, dict):
+            return None
+
+        clock_dict: dict[str, Union[StaticClockData, None]] = {}
+
+        clock_data = self._parse_clock(data)
+        if clock_data:
+            clock_dict["clk"] = clock_data
+
+        return clock_dict if clock_dict else None
+
+    def collect_data(
+        self,
+        args: Any = None,
+    ) -> tuple[TaskResult, Optional[AmdSmiDataModel]]:
+        """Collect AmdSmi data from system
+
+        Args:
+            args (Any, optional): optional arguments for data collection. Defaults to None.
+
+        Returns:
+            tuple[TaskResult, Optional[AmdSmiDataModel]]: task result and collected data model
+        """
+
+        if not self._check_amdsmi_installed():
+            self._log_event(
+                category=EventCategory.APPLICATION,
+                description="amd-smi is not installed",
+                priority=EventPriority.WARNING,
+                console_log=True,
+            )
+            self.result.status = ExecutionStatus.NOT_RAN
+            return self.result, None
+
+        try:
+            version = self._get_amdsmi_version()
+            if version is not None:
+                self.logger.info("amd-smi version: %s", version.version)
+                self.logger.info("ROCm version: %s", version.rocm_version)
+
+            amd_smi_data = self._get_amdsmi_data()
+
+            if amd_smi_data is None:
+                return self.result, None
+
+            return self.result, amd_smi_data
+        except Exception as e:
+            self._log_event(
+                category=EventCategory.APPLICATION,
+                description="Error running amd-smi collector",
+                data={"exception": get_exception_traceback(e)},
+                priority=EventPriority.ERROR,
+                console_log=True,
+            )
+            self.result.status = ExecutionStatus.EXECUTION_FAILURE
+            return self.result, None
diff --git a/nodescraper/plugins/inband/amdsmi/amdsmi_plugin.py b/nodescraper/plugins/inband/amdsmi/amdsmi_plugin.py
new file mode 100644
index 00000000..67eda944
--- /dev/null
+++ b/nodescraper/plugins/inband/amdsmi/amdsmi_plugin.py
@@ -0,0 +1,43 @@
+###############################################################################
+#
+# MIT License
+#
+# Copyright (c) 2025 Advanced Micro Devices, Inc.
+# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
+#
+###############################################################################
+from nodescraper.base import InBandDataPlugin
+
+from .amdsmi_analyzer import AmdSmiAnalyzer
+from .amdsmi_collector import AmdSmiCollector
+from .amdsmidata import AmdSmiDataModel
+from .analyzer_args import AmdSmiAnalyzerArgs
+
+
+class AmdSmiPlugin(InBandDataPlugin[AmdSmiDataModel, None, AmdSmiAnalyzerArgs]):
+    """Plugin for collection and analysis of amdsmi data"""
+
+    DATA_MODEL = AmdSmiDataModel
+
+    COLLECTOR = AmdSmiCollector
+
+    ANALYZER = AmdSmiAnalyzer
+
+    ANALYZER_ARGS = AmdSmiAnalyzerArgs
diff --git a/nodescraper/plugins/inband/amdsmi/amdsmidata.py b/nodescraper/plugins/inband/amdsmi/amdsmidata.py
new file mode 100644
index 00000000..fd9eae41
--- /dev/null
+++ b/nodescraper/plugins/inband/amdsmi/amdsmidata.py
@@ -0,0 +1,475 @@
+import re
+from typing import Any, List, Mapping, Optional, Union
+
+from pydantic import (
+    AliasChoices,
+    BaseModel,
+    ConfigDict,
+    Field,
+    field_validator,
+    model_validator,
+)
+
+from nodescraper.models.datamodel import DataModel
+from nodescraper.utils import find_annotation_in_container
+
+_NUM_UNIT_RE = re.compile(r"^\s*([-+]?\d+(?:\.\d+)?)(?:\s*([A-Za-z%/][A-Za-z0-9%/._-]*))?\s*$")
+
+
+def na_to_none(values: Union[int, str]):
+    if values == "N/A":
+        return None
+    return values
+
+
+def na_to_none_list(values: List[Union[int, str, None]]) -> List[Union[int, str, None]]:
+    ret_list: List[Union[int, str, None]] = values.copy()
+    for i, item in enumerate(ret_list):
+        if item == "N/A":
+            ret_list[i] = None
+    return ret_list
+
+
+def na_to_none_dict(values: object) -> Optional[dict[str, Any]]:
+    """Normalize mapping-like fields where 'N/A' or empty should become None.
+    Accepts None; returns None for 'N/A'/'NA'/'' or non-mapping inputs."""
+    if values is None:
+        return None
+    if isinstance(values, str) and values.strip().upper() in {"N/A", "NA", ""}:
+        return None
+    if not isinstance(values, Mapping):
+        return None
+
+    out: dict[str, Any] = {}
+    for k, v in values.items():
+        if isinstance(v, str) and v.strip().upper() in {"N/A", "NA", ""}:
+            out[k] = None
+        else:
+            out[k] = v
+    return out
+
+
+class AmdSmiBaseModel(BaseModel):
+    """Base model for AMD SMI data models.
+
+    This is used to ensure that all AMD SMI data models have the same
+    configuration and validation.
+    """
+
+    model_config = ConfigDict(
+        str_min_length=1,
+        str_strip_whitespace=True,
+        populate_by_name=True,
+        extra="forbid",  # Forbid extra fields not defined in the model
+    )
+
+    def __init__(self, **data):
+        # Convert Union[int, str, float] -> ValueUnit (fields are read from the class, not the instance)
+        for field_name, field_type in type(self).model_fields.items():
+            annotation = field_type.annotation
+            target_type, container = find_annotation_in_container(annotation, ValueUnit)
+            if target_type is None:
+                continue
+
+            if field_name in data and isinstance(data[field_name], (int, str, float)):
+                # If the field is a primitive type, convert it to ValueUnit dict for validator
+                data[field_name] = {
+                    "value": data[field_name],
+                    "unit": "",
+                }
+
+        super().__init__(**data)
+
+
+class ValueUnit(BaseModel):
+    """A model for a value with a unit.
+
+    Accepts:
+      - dict: {"value": 123, "unit": "W"}
+      - number: 123 -> unit=""
+      - string with number+unit: "123 W" -> {"value": 123, "unit": "W"}
+      - "N/A" / "NA" / "" / None -> None
+    """
+
+    value: Union[int, float, str]
+    unit: str = ""
+
+    @model_validator(mode="before")
+    @classmethod
+    def _coerce(cls, v):
+        # treat N/A as None
+        def na(x) -> bool:
+            return x is None or (isinstance(x, str) and x.strip().upper() in {"N/A", "NA", ""})
+
+        if na(v):
+            return None
+
+        if isinstance(v, dict):
+            val = v.get("value")
+            unit = v.get("unit", "")
+            if na(val):
+                return None
+            if isinstance(val, str):
+                m = _NUM_UNIT_RE.match(val.strip())
+                if m and not unit:
+                    num, u = m.groups()
+                    unit = u or unit or ""
+                    val = float(num) if "." in num else int(num)
+            return {"value": val, "unit": unit}
+
+        # numbers
+        if isinstance(v, (int, float)):
+            return {"value": v, "unit": ""}
+
+        if isinstance(v, str):
+            s = v.strip()
+            m = _NUM_UNIT_RE.match(s)
+            if m:
+                num, unit = m.groups()
+                val = float(num) if "." in num else int(num)
+                return {"value": val, "unit": unit or ""}
+            return {"value": s, "unit": ""}
+
+        return v
+
+    @field_validator("unit")
+    @classmethod
+    def _clean_unit(cls, u):
+        return "" if u is None else str(u).strip()
+
+
+# Process
+class ProcessMemoryUsage(BaseModel):
+    gtt_mem: Optional[ValueUnit]
+    cpu_mem: Optional[ValueUnit]
+    vram_mem: Optional[ValueUnit]
+
+    na_validator = field_validator("gtt_mem", "cpu_mem", "vram_mem", mode="before")(na_to_none)
+
+
+class ProcessUsage(BaseModel):
+    # AMDSMI reports engine usage in nanoseconds
+    gfx: Optional[ValueUnit]
+    enc: Optional[ValueUnit]
+    na_validator = field_validator("gfx", "enc", mode="before")(na_to_none)
+
+
+class ProcessInfo(BaseModel):
+    name: str
+    pid: int
+    memory_usage: ProcessMemoryUsage
+    mem_usage: Optional[ValueUnit]
+    usage: ProcessUsage
+    na_validator = field_validator("mem_usage", mode="before")(na_to_none)
+
+
+class ProcessListItem(BaseModel):
+    process_info: Union[ProcessInfo, str]
+
+
+class Processes(BaseModel):
+    gpu: int
+    process_list: List[ProcessListItem]
+
+
+# FW
+class FwListItem(BaseModel):
+    fw_id: str
+    fw_version: str
+
+
+class Fw(BaseModel):
+    gpu: int
+    fw_list: Union[List[FwListItem], str]
+
+
+class AmdSmiListItem(BaseModel):
+    gpu: int
+    bdf: str
+    uuid: str
+    kfd_id: int
+    node_id: int
+    partition_id: int
+
+
+class AmdSmiVersion(BaseModel):
+    """Contains the versioning info for amd-smi"""
+
+    tool: Optional[str] = None
+    version: Optional[str] = None
+    amdsmi_library_version: Optional[str] = None
+    rocm_version: Optional[str] = None
+    amdgpu_version: Optional[str] = None
+    amd_hsmp_driver_version: Optional[str] = None
+
+    @field_validator("*", mode="before")
+    @classmethod
+    def _stringify(cls, v):
+        if v is None or isinstance(v, str):
+            return v
+        if isinstance(v, (bytes, bytearray)):
+            return v.decode("utf-8", "ignore")
+        if isinstance(v, (tuple, list)):
+            return ".".join(str(x) for x in v)
+        return str(v)
+
+
+class PartitionAccelerator(BaseModel):
+    """Accelerator partition data"""
+
+    gpu_id: int
+    memory: Optional[str] = None
+    accelerator_type: Optional[str] = None
+    accelerator_profile_index: Optional[Union[str, int]] = None
+    partition_id: Optional[int] = None
+
+
+class PartitionMemory(BaseModel):
+    """Memory Partition data"""
+
+    gpu_id: int
+    partition_type: Optional[str] = None
+
+
+class PartitionCompute(BaseModel):
+    """Compute Partition data"""
+
+    gpu_id: int
+    partition_type: Optional[str] = None
+
+
+class Partition(BaseModel):
+    """Contains the partition info for amd-smi"""
+
+    memory_partition: list[PartitionMemory] = Field(default_factory=list)
+    compute_partition: list[PartitionCompute] = Field(default_factory=list)
+
+
+### STATIC DATA ###
+class StaticAsic(BaseModel):
+    market_name: str
+    vendor_id: str
+    vendor_name: str
+    subvendor_id: str
+    device_id: str
+    subsystem_id: str
+    rev_id: str
+    asic_serial: str
+    oam_id: Union[int, str]  # can be N/A
+    num_compute_units: Union[int, str]  # can be N/A
+    target_graphics_version: str
+
+
+class StaticBus(AmdSmiBaseModel):
+    bdf: str
+    max_pcie_width: Optional[ValueUnit] = None
+    max_pcie_speed: Optional[ValueUnit] = None
+    pcie_interface_version: str = "unknown"
+    slot_type: str = "unknown"
+
+
+class StaticVbios(BaseModel):
+    name: str
+    build_date: str
+    part_number: str
+    version: str
+
+
+class StaticLimit(AmdSmiBaseModel):
+    max_power: Optional[ValueUnit]
+    min_power: Optional[ValueUnit]
+    socket_power: Optional[ValueUnit]
+    slowdown_edge_temperature: Optional[ValueUnit]
+    slowdown_hotspot_temperature: Optional[ValueUnit]
+    slowdown_vram_temperature: Optional[ValueUnit]
+    shutdown_edge_temperature: Optional[ValueUnit]
+    shutdown_hotspot_temperature: Optional[ValueUnit]
+    shutdown_vram_temperature: Optional[ValueUnit]
+    na_validator = field_validator(
+        "max_power",
+        "min_power",
+        "socket_power",
+        "slowdown_edge_temperature",
+        "slowdown_hotspot_temperature",
+        "slowdown_vram_temperature",
+        "shutdown_edge_temperature",
+        "shutdown_hotspot_temperature",
+        "shutdown_vram_temperature",
+        mode="before",
+    )(na_to_none)
+
+
+class StaticDriver(BaseModel):
+    name: str
+    version: str
+
+
+class StaticBoard(BaseModel):
+    model_config = ConfigDict(
+        populate_by_name=True,
+    )
+
+    amdsmi_model_number: str = Field(
+        alias="model_number"
+    )  # Aliased because the "model_" prefix is a protected namespace in pydantic
+    product_serial: str
+    fru_id: str
+    product_name: str
+    manufacturer_name: str
+
+
+class StaticPartition(BaseModel):
+    # The name of compute_partition has changed; both names are supported for now.
+
+    compute_partition: str = Field(
+        validation_alias=AliasChoices("compute_partition", "accelerator_partition")
+    )
+    memory_partition: str
+    partition_id: int
+
+
+class StaticPolicy(BaseModel):
+    policy_id: int
+    policy_description: str
+
+
+class StaticSocPstate(BaseModel):
+    num_supported: int
+    current_id: int
+    policies: List[StaticPolicy]
+
+
+class StaticXgmiPlpd(BaseModel):
+    num_supported: int
+    current_id: int
+    plpds: List[StaticPolicy]
+
+
+class StaticNuma(BaseModel):
+    node: int
+    affinity: Union[int, str]  # can be N/A
+
+
+class StaticVram(AmdSmiBaseModel):
+    type: str
+    vendor: Optional[str]
+    size: Optional[ValueUnit]
+    bit_width: Optional[ValueUnit]
+    max_bandwidth: Optional[ValueUnit] = None
+    na_validator = field_validator("vendor", "size", "bit_width", "max_bandwidth", mode="before")(
+        na_to_none
+    )
+
+
+class StaticCacheInfoItem(AmdSmiBaseModel):
+    cache: ValueUnit
+    cache_properties: List[str]
+    cache_size: Optional[ValueUnit]
+    cache_level: ValueUnit
+    max_num_cu_shared: ValueUnit
+    num_cache_instance: ValueUnit
+    na_validator = field_validator("cache_size", mode="before")(na_to_none)
+
+
+class StaticFrequencyLevels(BaseModel):
+    model_config = ConfigDict(
+        populate_by_name=True,
+    )
+
+    Level_0: str = Field(..., alias="Level 0")
+    Level_1: Optional[str] = Field(default=None, alias="Level 1")
+    Level_2: Optional[str] = Field(default=None, alias="Level 2")
+
+
+class StaticClockData(BaseModel):
+    model_config = ConfigDict(
+        populate_by_name=True,
+    )
+    frequency_levels: StaticFrequencyLevels
+
+    current_level: Optional[int] = Field(..., alias="current level")
+    na_validator = field_validator("current_level", mode="before")(na_to_none)
+
+
+class AmdSmiStatic(BaseModel):
+    """Contains all static data"""
+
+    gpu: int
+    asic: StaticAsic
+    bus: StaticBus
+    vbios: Optional[StaticVbios]
+    limit: Optional[StaticLimit]
+    driver: StaticDriver
+    board: StaticBoard
+    soc_pstate: Optional[StaticSocPstate]
+    xgmi_plpd: Optional[StaticXgmiPlpd]
+    process_isolation: str
+    numa: StaticNuma
+    vram: StaticVram
+    cache_info: List[StaticCacheInfoItem]
+    partition: Optional[StaticPartition] = None  # This has been removed in Amd-smi 26.0.0+d30a0afe+
+    clock: Optional[dict[str, Union[StaticClockData, None]]] = None
+    na_validator_dict = field_validator("clock", mode="before")(na_to_none_dict)
+    na_validator = field_validator("soc_pstate", "xgmi_plpd", "vbios", "limit", mode="before")(
+        na_to_none
+    )
+
+
+class AmdSmiDataModel(DataModel):
+    """Data model for amd-smi data.
+
+    Optionals are used to allow for the data to be missing.
+    This makes the data class more flexible for the analyzer,
+    which consumes only the required data. If any more data is
+    required for the analyzer, those fields should not be given
+    a default.
+    """
+
+    model_config = ConfigDict(
+        str_min_length=1,
+        str_strip_whitespace=True,
+        populate_by_name=True,
+    )
+
+    version: Optional[AmdSmiVersion] = None
+    gpu_list: Optional[list[AmdSmiListItem]] = Field(default_factory=list)
+    partition: Optional[Partition] = None
+    process: Optional[list[Processes]] = Field(default_factory=list)
+    firmware: Optional[list[Fw]] = Field(default_factory=list)
+    static: Optional[list[AmdSmiStatic]] = Field(default_factory=list)
+
+    def get_list(self, gpu: int) -> Optional[AmdSmiListItem]:
+        """Get the gpu list item for the given gpu id."""
+        if self.gpu_list is None:
+            return None
+        for item in self.gpu_list:
+            if item.gpu == gpu:
+                return item
+        return None
+
+    def get_process(self, gpu: int) -> Optional[Processes]:
+        """Get the process data for the given gpu id."""
+        if self.process is None:
+            return None
+        for item in self.process:
+            if item.gpu == gpu:
+                return item
+        return None
+
+    def get_firmware(self, gpu: int) -> Optional[Fw]:
+        """Get the firmware data for the given gpu id."""
+        if self.firmware is None:
+            return None
+        for item in self.firmware:
+            if item.gpu == gpu:
+                return item
+        return None
+
+    def get_static(self, gpu: int) -> Optional[AmdSmiStatic]:
+        """Get the static data for the given gpu id."""
+        if self.static is None:
+            return None
+        for item in self.static:
+            if item.gpu == gpu:
+                return item
+        return None
diff --git a/nodescraper/plugins/inband/amdsmi/analyzer_args.py b/nodescraper/plugins/inband/amdsmi/analyzer_args.py
new file mode 100644
index 00000000..b8721014
--- /dev/null
+++ b/nodescraper/plugins/inband/amdsmi/analyzer_args.py
@@ -0,0 +1,46 @@
+###############################################################################
+#
+# MIT License
+#
+# Copyright (c) 2025 Advanced Micro Devices, Inc.
+# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
+#
+###############################################################################
+from typing import Optional
+
+from nodescraper.models import AnalyzerArgs
+
+
+class AmdSmiAnalyzerArgs(AnalyzerArgs):
+
+    check_static_data: bool = False
+    expected_gpu_processes: Optional[int] = None
+    expected_max_power: Optional[int] = None
+    expected_driver_version: Optional[str] = None
+    expected_memory_partition_mode: Optional[str] = None
+    expected_compute_partition_mode: Optional[str] = None
+    expected_pldm_version: Optional[str] = None
+    l0_to_recovery_count_error_threshold: Optional[int] = 3
+    l0_to_recovery_count_warning_threshold: Optional[int] = 1
+    vendorid_ep: Optional[str] = None
+    vendorid_ep_vf: Optional[str] = None
+    devid_ep: Optional[str] = None
+    devid_ep_vf: Optional[str] = None
+    sku_name: Optional[str] = None
diff --git a/nodescraper/utils.py b/nodescraper/utils.py
index 9b1fb88c..ceaccea3 100644
--- a/nodescraper/utils.py
+++ b/nodescraper/utils.py
@@ -27,7 +27,7 @@
 import re
 import traceback
 from enum import Enum
-from typing import TypeVar
+from typing import Any, TypeVar, Union, get_args, get_origin
 
 T = TypeVar("T")
 
@@ -171,6 +171,50 @@ def bytes_to_human_readable(input_bytes: int) -> str:
     return f"{gb}GB"
 
 
+def find_annotation_in_container(
+    annotation, target_type
+) -> Union[tuple[Any, list[Any]], tuple[None, list[Any]]]:
+    """Recursively search for a target type in an annotation and return the target type and its containers.
+    Supported container types are generic types, Callable, Tuple, Union, Literal, Final, ClassVar
+    and Annotated. Members that are not classes are skipped. Returns None if the type is not found.
+
+    Examples:
+        find_annotation_in_container(Union[int, str], int) -> int, [Union[int, str]]
+        find_annotation_in_container(Union[int, dict[str, list[MyClass]]], MyClass) -> MyClass, [list,dict,union]
+        find_annotation_in_container(Union[int, str], MyClass) -> None, []
+
+    Parameters
+    ----------
+    annotation : type
+        A type annotation to search for the target type in.
+    target_type : type
+        The target type to search for.
+
+    Returns
+    -------
+    Union[tuple[Any, list[Any]], tuple[None, list[Any]]]
+        The target type and the containers if found, otherwise None and an empty list.
+    """
+    containers: list[Any] = []
+    origin = get_origin(annotation)
+    args = get_args(annotation)
+    if len(args) == 0 and isinstance(annotation, type) and issubclass(annotation, target_type):
+        return annotation, containers
+    if isinstance(args, tuple):
+        for item in args:
+            item_args = get_args(item)
+            if len(item_args) > 0:
+                result, container = find_annotation_in_container(item, target_type)
+                containers += container
+                if result:
+                    containers.append(origin)
+                    return result, containers
+            if len(item_args) == 0 and isinstance(item, type) and issubclass(item, target_type):
+                containers.append(origin)
+                return item, containers
+    return None, []
+
+
 def shell_quote(s: str) -> str:
     """Single quote fix
 
diff --git a/test/unit/plugin/test_amdsmi_collector.py b/test/unit/plugin/test_amdsmi_collector.py
new file mode 100644
index 00000000..6783c407
--- /dev/null
+++ b/test/unit/plugin/test_amdsmi_collector.py
@@ -0,0 +1,394 @@
+import json
+from typing import Any
+from unittest.mock import MagicMock
+
+import pytest
+
+from nodescraper.enums.systeminteraction import SystemInteractionLevel
+from nodescraper.plugins.inband.amdsmi.amdsmi_collector import AmdSmiCollector
+
+
+def make_cmd_result(stdout: str, stderr: str = "", exit_code: int = 0) -> MagicMock:
+    """Create a mock command result"""
+    result = MagicMock()
+    result.stdout = stdout
+    result.stderr = stderr
+    result.exit_code = exit_code
+    return result
+
+
+def make_json_response(data: Any) -> str:
+    """Convert data to JSON string"""
+    return json.dumps(data)
+
+
+@pytest.fixture
+def mock_commands(monkeypatch):
+    """Mock all amd-smi commands with sample data"""
+
+    def mock_run_sut_cmd(cmd: str) -> MagicMock:
+        if "which amd-smi" in cmd:
+            return make_cmd_result("/usr/bin/amd-smi")
+
+        if "version --json" in cmd:
+            return make_cmd_result(
+                make_json_response(
+                    [{"tool": "amdsmi", "amdsmi_library_version": "1.2.3", "rocm_version": "6.1.0"}]
+                )
+            )
+
+        if "list --json" in cmd:
+            return make_cmd_result(
+                make_json_response(
+                    [
+                        {
+                            "gpu": 0,
+                            "bdf": "0000:0b:00.0",
+                            "uuid": "GPU-UUID-123",
+                            "kfd_id": 7,
+                            "node_id": 3,
+                            "partition_id": 0,
+                        }
+                    ]
+                )
+            )
+
+        if "process --json" in cmd:
+            return make_cmd_result(
+                make_json_response(
+                    [
+                        {
+                            "gpu": 0,
+                            "process_list": [
+                                {
+                                    "name": "python",
+                                    "pid": 4242,
+                                    "mem": 1024,
+                                    "engine_usage": {"gfx": 1000000, "enc": 0},
+                                    "memory_usage": {
+                                        "gtt_mem": 0,
+                                        "cpu_mem": 4096,
+                                        "vram_mem": 2048,
+                                    },
+                                    "cu_occupancy": 12,
+                                },
+                                {
+                                    "name": "test",
+                                    "pid": 9999,
+                                    "mem": 0,
+                                    "engine_usage": {"gfx": 0, "enc": 0},
+                                    "memory_usage": {"gtt_mem": 0, "cpu_mem": 0, "vram_mem": 0},
+                                    "cu_occupancy": 0,
+                                },
+                            ],
+                        }
+                    ]
+                )
+            )
+
+        if "partition --json" in cmd:
+            return make_cmd_result(
+                make_json_response(
+                    [{"gpu": 0, "memory_partition": "NPS1", "compute_partition": "CPX_DISABLED"}]
+                )
+            )
+
+        if "firmware --json" in cmd:
+            return make_cmd_result(
+                make_json_response(
+                    [
+                        {
+                            "gpu": 0,
+                            "fw_list": [
+                                {"fw_name": "SMU", "fw_version": "55.33"},
+                                {"fw_name": "VBIOS", "fw_version": "V1"},
+                            ],
+                        }
+                    ]
+                )
+            )
+
+        if "static -g all --json" in cmd:
+            return make_cmd_result(
+                make_json_response(
+                    {
+                        "gpu_data": [
+                            {
+                                "gpu": 0,
+                                "asic": {
+                                    "market_name": "SomeGPU",
+                                    "vendor_id": "1002",
+                                    "vendor_name": "AMD",
+                                    "subvendor_id": "1ABC",
+                                    "device_id": "0x1234",
+                                    "subsystem_id": "0x5678",
+                                    "rev_id": "A1",
+                                    "asic_serial": "ASERIAL",
+                                    "oam_id": 0,
+                                    "num_compute_units": 224,
+                                    "target_graphics_version": "GFX940",
+                                    "vram_type": "HBM3",
+                                    "vram_vendor": "Micron",
+                                    "vram_bit_width": 4096,
+                                },
+                                "board": {
+                                    "model_number": "Board-42",
+                                    "product_serial": "SN0001",
+                                    "fru_id": "FRU-1",
+                                    "product_name": "ExampleBoard",
+                                    "manufacturer_name": "ACME",
+                                },
+                                "bus": {
+                                    "bdf": "0000:0b:00.0",
+                                    "max_pcie_width": 16,
+                                    "max_pcie_speed": 16.0,
+                                    "pcie_interface_version": "PCIe 5.0",
+                                    "slot_type": "PCIe",
+                                },
+                                "vbios": {
+                                    "vbios_name": "vbiosA",
+                                    "vbios_build_date": "2024-01-01",
+                                    "vbios_part_number": "PN123",
+                                    "vbios_version": "V1",
+                                },
+                                "driver": {"driver_name": "amdgpu", "driver_version": "6.1.0"},
+                                "numa": {"node": 3, "affinity": 0},
+                                "vram": {
+                                    "vram_type": "HBM3",
+                                    "vram_vendor": "Micron",
+                                    "vram_bit_width": 4096,
+                                    "vram_size_mb": 65536,
+                                },
+                                "cache": {
+                                    "cache": [
+                                        {
+                                            "cache_level": 1,
+                                            "max_num_cu_shared": 8,
+                                            "num_cache_instance": 32,
+                                            "cache_size": 262144,
+                                            "cache_properties": "PropertyA, PropertyB; PropertyC",
+                                        }
+                                    ]
+                                },
+                                "clock": {"frequency": [500, 1500, 2000], "current": 1},
+                                "soc_pstate": {},
+                                "xgmi_plpd": {},
+                            }
+                        ]
+                    }
+                )
+            )
+
+        return make_cmd_result("", f"Unknown command: {cmd}", 1)
+
+    return mock_run_sut_cmd
+
+
+@pytest.fixture
+def collector(mock_commands, conn_mock, system_info, monkeypatch):
+    """Create a collector with mocked commands"""
+    c = AmdSmiCollector(
+        system_info=system_info,
+        system_interaction_level=SystemInteractionLevel.PASSIVE,
+        connection=conn_mock,
+    )
+    monkeypatch.setattr(c, "_run_sut_cmd", mock_commands)
+    return c
+
+
+def test_check_amdsmi_installed(collector):
+    """Test that _check_amdsmi_installed works"""
+    assert collector._check_amdsmi_installed() is True
+
+
+def test_check_amdsmi_not_installed(conn_mock, system_info, monkeypatch):
+    """Test when amd-smi is not installed"""
+
+    def mock_which_fail(cmd: str) -> MagicMock:
+        if "which amd-smi" in cmd:
+            return make_cmd_result("", "no amd-smi in /usr/bin", 1)
+        return make_cmd_result("")
+
+    c = AmdSmiCollector(
+        system_info=system_info,
+        system_interaction_level=SystemInteractionLevel.PASSIVE,
+        connection=conn_mock,
+    )
+    monkeypatch.setattr(c, "_run_sut_cmd", mock_which_fail)
+
+    result, data = c.collect_data()
+    assert data is None
+    assert result.status.name == "NOT_RAN"
+
+
+def test_collect_data(collector):
+    """Test full data collection"""
+    result, data = collector.collect_data()
+    assert data is not None
+    assert data.version is not None
+    assert data.version.tool == "amdsmi"
+    assert data.version.version == "1.2.3"
+    assert data.version.rocm_version == "6.1.0"
+
+    # gpu_list
+    assert data.gpu_list is not None and len(data.gpu_list) == 1
+    assert data.gpu_list[0].bdf == "0000:0b:00.0"
+    assert data.gpu_list[0].uuid == "GPU-UUID-123"
+    assert data.gpu_list[0].kfd_id == 7
+    assert data.gpu_list[0].node_id == 3
+
+    # processes
+    assert data.process is not None and len(data.process) == 1
+    assert len(data.process[0].process_list) == 2
+
+    # partition
+    assert data.partition is not None
+    assert len(data.partition.memory_partition) == 1
+    assert data.partition.memory_partition[0].partition_type == "NPS1"
+
+    # firmware
+    assert data.firmware is not None and len(data.firmware) == 1
+    assert len(data.firmware[0].fw_list) == 2
+
+    # static
+    assert data.static is not None and len(data.static) == 1
+    s = data.static[0]
+    assert s.bus is not None and s.bus.max_pcie_speed is not None
+    assert float(s.bus.max_pcie_speed.value) == pytest.approx(16.0)
+    assert s.bus.pcie_interface_version == "PCIe 5.0"
+
+
+def test_get_gpu_list(collector):
+    """Test GPU list parsing"""
+    gpu_list = collector.get_gpu_list()
+    assert gpu_list is not None and len(gpu_list) == 1
+    assert gpu_list[0].gpu == 0
+    assert gpu_list[0].bdf == "0000:0b:00.0"
+    assert gpu_list[0].uuid == "GPU-UUID-123"
+
+
+def test_get_process(collector):
+    """Test process list parsing"""
+    procs = collector.get_process()
+    assert procs is not None and len(procs) == 1
+    assert procs[0].gpu == 0
+
assert len(procs[0].process_list) == 2 + + p0 = procs[0].process_list[0].process_info + assert p0.name == "python" + assert p0.pid == 4242 + assert p0.mem_usage is not None and p0.mem_usage.unit == "B" + assert p0.usage.gfx is not None and p0.usage.gfx.unit == "ns" + + p1 = procs[0].process_list[1].process_info + assert p1.name == "test" + assert p1.pid == 9999 + + +def test_get_partition(collector): + """Test partition parsing""" + p = collector.get_partition() + assert p is not None + assert len(p.memory_partition) == 1 and len(p.compute_partition) == 1 + assert p.memory_partition[0].partition_type == "NPS1" + assert p.compute_partition[0].partition_type == "CPX_DISABLED" + + +def test_get_firmware(collector): + """Test firmware parsing""" + fw = collector.get_firmware() + assert fw is not None and len(fw) == 1 + assert fw[0].gpu == 0 + assert len(fw[0].fw_list) == 2 + assert fw[0].fw_list[0].fw_id == "SMU" + assert fw[0].fw_list[0].fw_version == "55.33" + + +def test_get_static(collector): + """Test static data parsing""" + stat = collector.get_static() + assert stat is not None and len(stat) == 1 + s = stat[0] + + # ASIC + assert s.asic.market_name == "SomeGPU" + assert s.asic.vendor_name == "AMD" + assert s.asic.num_compute_units == 224 + + # Board + assert s.board.amdsmi_model_number == "Board-42" + assert s.board.manufacturer_name == "ACME" + + # Bus/PCIe + assert s.bus.bdf == "0000:0b:00.0" + assert s.bus.max_pcie_width is not None + assert s.bus.max_pcie_speed is not None + + # VRAM + assert s.vram.type == "HBM3" + assert s.vram.vendor == "Micron" + + # Cache + assert s.cache_info is not None and len(s.cache_info) == 1 + cache = s.cache_info[0] + assert cache.cache_level.value == 1 + assert cache.cache_properties + + if s.clock is not None: + assert isinstance(s.clock, dict) + if "clk" in s.clock and s.clock["clk"] is not None: + assert s.clock["clk"].frequency_levels is not None + + +def test_cache_properties_parsing(collector): + """Test cache properties 
string parsing""" + stat = collector.get_static() + item = stat[0].cache_info[0] + assert isinstance(item.cache.value, str) and item.cache.value.startswith("Label_") + assert item.cache_properties + assert {"PropertyA", "PropertyB", "PropertyC"}.issubset(set(item.cache_properties)) + + +def test_json_parse_error(conn_mock, system_info, monkeypatch): + """Test handling of malformed JSON""" + + def mock_bad_json(cmd: str) -> MagicMock: + if "which amd-smi" in cmd: + return make_cmd_result("/usr/bin/amd-smi") + if "version --json" in cmd: + return make_cmd_result("{ invalid json }") + return make_cmd_result("") + + c = AmdSmiCollector( + system_info=system_info, + system_interaction_level=SystemInteractionLevel.PASSIVE, + connection=conn_mock, + ) + monkeypatch.setattr(c, "_run_sut_cmd", mock_bad_json) + + result, data = c.collect_data() + assert data is not None + assert data.version is None + assert len(result.events) > 0 # Should have error events + + +def test_command_error(conn_mock, system_info, monkeypatch): + """Test handling of command execution errors""" + + def mock_cmd_error(cmd: str) -> MagicMock: + if "which amd-smi" in cmd: + return make_cmd_result("/usr/bin/amd-smi") + return make_cmd_result("", "Command failed", 1) + + c = AmdSmiCollector( + system_info=system_info, + system_interaction_level=SystemInteractionLevel.PASSIVE, + connection=conn_mock, + ) + monkeypatch.setattr(c, "_run_sut_cmd", mock_cmd_error) + + result, data = c.collect_data() + assert data is not None + assert data.version is None + assert data.gpu_list == [] + assert len(result.events) > 0 # Should have error events