239 changes: 236 additions & 3 deletions nodescraper/plugins/inband/amdsmi/amdsmi_analyzer.py
@@ -30,7 +30,15 @@
from nodescraper.interfaces import DataAnalyzer
from nodescraper.models import TaskResult

from .amdsmidata import AmdSmiDataModel, AmdSmiStatic, Fw, Partition, Processes
from .amdsmidata import (
AmdSmiDataModel,
AmdSmiMetric,
AmdSmiStatic,
EccData,
Fw,
Partition,
Processes,
)
from .analyzer_args import AmdSmiAnalyzerArgs


@@ -122,6 +130,223 @@ def check_expected_driver_version(
},
)

def check_amdsmi_metric_pcie(
self,
amdsmi_metric_data: list[AmdSmiMetric],
l0_to_recovery_count_error_threshold: int,
l0_to_recovery_count_warning_threshold: int,
):
"""Check PCIe metrics for link errors

Checks PCIe link width, link speed, replay counts, L0-to-recovery transitions, and NAK counts.
The expected width and speed should come from SKU info.

Args:
amdsmi_metric_data (list[AmdSmiMetric]): AmdSmiMetric data model
l0_to_recovery_count_error_threshold (int): Threshold for error events
l0_to_recovery_count_warning_threshold (int): Threshold for warning events
"""
for metric in amdsmi_metric_data:
pcie_data = metric.pcie
gpu = metric.gpu

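# NOTE: the expected values below (x16 width, 32 GT/s Gen5 speed) are hardcoded
# here; per the docstring they should eventually come from SKU info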
if pcie_data.width is not None and pcie_data.width != 16:
self._log_event(
category=EventCategory.IO,
description=f"GPU: {gpu} PCIe width is not x16",
priority=EventPriority.ERROR,
data={"gpu": gpu, "pcie_width": pcie_data.width, "expected": 16},
console_log=True,
)

if pcie_data.speed is not None and pcie_data.speed.value is not None:
try:
speed_val = float(pcie_data.speed.value)
if speed_val != 32.0:
self._log_event(
category=EventCategory.IO,
description=f"GPU: {gpu} PCIe link speed is not Gen5 (32 GT/s)",
priority=EventPriority.ERROR,
data={"gpu": gpu, "pcie_speed": speed_val, "expected": 32.0},
console_log=True,
)
except (ValueError, TypeError):
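# speed value is not numeric (e.g. "N/A"); skip the comparison rather than fail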
pass

if pcie_data.replay_count is not None and pcie_data.replay_count > 0:
self._log_event(
category=EventCategory.IO,
description=f"GPU: {gpu} has PCIe replay count: {pcie_data.replay_count}",
priority=EventPriority.WARNING,
data={"gpu": gpu, "replay_count": pcie_data.replay_count},
console_log=True,
)

if (
pcie_data.replay_roll_over_count is not None
and pcie_data.replay_roll_over_count > 0
):
self._log_event(
category=EventCategory.IO,
description=f"GPU: {gpu} has PCIe replay rollover count: {pcie_data.replay_roll_over_count}",
priority=EventPriority.WARNING,
data={"gpu": gpu, "replay_roll_over_count": pcie_data.replay_roll_over_count},
console_log=True,
)

if pcie_data.l0_to_recovery_count is not None:
if pcie_data.l0_to_recovery_count > l0_to_recovery_count_error_threshold:
self._log_event(
category=EventCategory.IO,
description=f"GPU: {gpu} has {pcie_data.l0_to_recovery_count} L0 recoveries",
priority=EventPriority.ERROR,
data={
"gpu": gpu,
"l0_to_recovery_count": pcie_data.l0_to_recovery_count,
"error_threshold": l0_to_recovery_count_error_threshold,
},
console_log=True,
)
elif pcie_data.l0_to_recovery_count > l0_to_recovery_count_warning_threshold:
self._log_event(
category=EventCategory.IO,
description=f"GPU: {gpu} has {pcie_data.l0_to_recovery_count} L0 recoveries",
priority=EventPriority.WARNING,
data={
"gpu": gpu,
"l0_to_recovery_count": pcie_data.l0_to_recovery_count,
"warning_threshold": l0_to_recovery_count_warning_threshold,
},
console_log=True,
)

if pcie_data.nak_sent_count is not None and pcie_data.nak_sent_count > 0:
self._log_event(
category=EventCategory.IO,
description=f"GPU: {gpu} has sent {pcie_data.nak_sent_count} PCIe NAKs",
priority=EventPriority.WARNING,
data={"gpu": gpu, "nak_sent_count": pcie_data.nak_sent_count},
console_log=True,
)

if pcie_data.nak_received_count is not None and pcie_data.nak_received_count > 0:
self._log_event(
category=EventCategory.IO,
description=f"GPU: {gpu} has received {pcie_data.nak_received_count} PCIe NAKs",
priority=EventPriority.WARNING,
data={"gpu": gpu, "nak_received_count": pcie_data.nak_received_count},
console_log=True,
)

def check_amdsmi_metric_ecc_totals(self, amdsmi_metric_data: list[AmdSmiMetric]):
"""Check ECC totals for all GPUs

Logs error events for uncorrectable ECC errors and warning events for correctable and deferred errors.

Args:
amdsmi_metric_data (list[AmdSmiMetric]): AmdSmiMetric data model
"""
for metric in amdsmi_metric_data:
ecc_totals = metric.ecc
gpu = metric.gpu

ecc_checks: list[tuple[EventPriority, Optional[int], str]] = [
(
EventPriority.WARNING,
ecc_totals.total_correctable_count,
"Total correctable ECC errors",
),
(
EventPriority.ERROR,
ecc_totals.total_uncorrectable_count,
"Total uncorrectable ECC errors",
),
(
EventPriority.WARNING,
ecc_totals.total_deferred_count,
"Total deferred ECC errors",
),
(
EventPriority.WARNING,
ecc_totals.cache_correctable_count,
"Cache correctable ECC errors",
),
(
EventPriority.ERROR,
ecc_totals.cache_uncorrectable_count,
"Cache uncorrectable ECC errors",
),
]

for priority, count, desc in ecc_checks:
if count is not None and count > 0:
self._log_event(
category=EventCategory.RAS,
description=f"GPU: {gpu} has {desc}: {count}",
priority=priority,
data={"gpu": gpu, "error_count": count, "error_type": desc},
console_log=True,
)

def check_amdsmi_metric_ecc(self, amdsmi_metric_data: list[AmdSmiMetric]):
"""Check ECC counts in all blocks for all GPUs

Logs error events for uncorrectable ECC errors and warning events for correctable and deferred errors.

Args:
amdsmi_metric_data (list[AmdSmiMetric]): AmdSmiMetric data model
"""
for metric in amdsmi_metric_data:
gpu = metric.gpu
ecc_blocks = metric.ecc_blocks

# Skip if ecc_blocks is a string (e.g., "N/A") or empty
if isinstance(ecc_blocks, str) or not ecc_blocks:
continue

for block_name, ecc_data in ecc_blocks.items():
if not isinstance(ecc_data, EccData):
continue

if ecc_data.correctable_count is not None and ecc_data.correctable_count > 0:
self._log_event(
category=EventCategory.RAS,
description=f"GPU: {gpu} has correctable ECC errors in block {block_name}",
priority=EventPriority.WARNING,
data={
"gpu": gpu,
"block": block_name,
"correctable_count": ecc_data.correctable_count,
},
console_log=True,
)

if ecc_data.uncorrectable_count is not None and ecc_data.uncorrectable_count > 0:
self._log_event(
category=EventCategory.RAS,
description=f"GPU: {gpu} has uncorrectable ECC errors in block {block_name}",
priority=EventPriority.ERROR,
data={
"gpu": gpu,
"block": block_name,
"uncorrectable_count": ecc_data.uncorrectable_count,
},
console_log=True,
)

if ecc_data.deferred_count is not None and ecc_data.deferred_count > 0:
self._log_event(
category=EventCategory.RAS,
description=f"GPU: {gpu} has deferred ECC errors in block {block_name}",
priority=EventPriority.WARNING,
data={
"gpu": gpu,
"block": block_name,
"deferred_count": ecc_data.deferred_count,
},
console_log=True,
)

def expected_gpu_processes(
self, processes_data: Optional[list[Processes]], max_num_processes: int
):
@@ -398,8 +623,6 @@ def check_expected_memory_partition_mode(
}
)

# accelerator currently not available in API

if bad_memory_partition_mode_gpus:
self._log_event(
category=EventCategory.PLATFORM,
@@ -429,6 +652,16 @@ def analyze_data(
if args is None:
args = AmdSmiAnalyzerArgs()

if data.metric is not None and len(data.metric) > 0:
if args.l0_to_recovery_count_error_threshold is not None:
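# PCIe checks run only when an error threshold is configured; the warning
# threshold falls back to 1 when unset (a configured 0 also falls back to 1,
# since `or` treats 0 as falsy)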
self.check_amdsmi_metric_pcie(
data.metric,
args.l0_to_recovery_count_error_threshold,
args.l0_to_recovery_count_warning_threshold or 1,
)
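# ECC totals and per-block ECC checks run whenever metric data is present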
self.check_amdsmi_metric_ecc_totals(data.metric)
self.check_amdsmi_metric_ecc(data.metric)

if args.expected_gpu_processes:
self.expected_gpu_processes(data.process, args.expected_gpu_processes)

63 changes: 51 additions & 12 deletions nodescraper/plugins/inband/amdsmi/amdsmi_collector.py
@@ -106,7 +106,7 @@ def _run_amd_smi(self, cmd: str) -> Optional[str]:
or "User is missing the following required groups" in cmd_ret.stdout
)

# Check for known amd-smi internal bugs
# Check for known amd-smi internal errors
is_amdsmi_internal_error = any(
pattern in cmd_ret.stderr for pattern in ["KeyError:", "AttributeError:", "IndexError:"]
)
@@ -183,19 +183,50 @@ def _run_amd_smi_dict(self, cmd: str) -> Optional[Union[dict, list[dict]]]:
cmd_ret = self._run_amd_smi(cmd)
if cmd_ret:
try:
# Try to parse as single JSON first
return json.loads(cmd_ret)
except json.JSONDecodeError as e:
self._log_event(
category=EventCategory.APPLICATION,
description=f"Error parsing command: `{cmd}` json data",
data={
"cmd": cmd,
"exception": get_exception_traceback(e),
},
priority=EventPriority.ERROR,
console_log=True,
)
return None
# try to extract and parse multiple JSON objects
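# (some amd-smi subcommands, e.g. the partition query, emit several JSON
# documents back to back; json.loads accepts only a single document, so walk
# the output with JSONDecoder.raw_decode instead)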
try:
json_objects = []
decoder = json.JSONDecoder()
idx = 0
cmd_ret_stripped = cmd_ret.strip()

while idx < len(cmd_ret_stripped):
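# skip whitespace between concatenated JSON documents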
while idx < len(cmd_ret_stripped) and cmd_ret_stripped[idx].isspace():
idx += 1

if idx >= len(cmd_ret_stripped):
break

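# stop once the remaining text no longer looks like the start of a JSON document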
if cmd_ret_stripped[idx] not in ["{", "["]:
break

try:
obj, end_idx = decoder.raw_decode(cmd_ret_stripped, idx)
json_objects.append(obj)
idx = end_idx
except json.JSONDecodeError:
break

if json_objects:
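# a single document is returned unwrapped, matching the plain json.loads path above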
return json_objects if len(json_objects) > 1 else json_objects[0]
else:
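# nothing parseable at all; re-raise the original JSONDecodeError so it is
# logged by the handler below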
raise

except Exception:
self._log_event(
category=EventCategory.APPLICATION,
description=f"Error parsing command: `{cmd}` json data",
data={
"cmd": cmd,
"exception": get_exception_traceback(e),
},
priority=EventPriority.ERROR,
console_log=True,
)
return None
return None

def _to_number(self, v: object) -> Optional[Union[int, float]]:
@@ -498,7 +529,15 @@ def get_partition(self) -> Optional[Partition]:
memparts: list[PartitionMemory] = []
computeparts: list[PartitionCompute] = []

# Flatten multi-JSON results (partition command returns multiple JSON arrays)
flattened_data = []
for item in partition_data:
if isinstance(item, list):
flattened_data.extend(item)
elif isinstance(item, dict):
flattened_data.append(item)

for item in flattened_data:
if not isinstance(item, dict):
continue
