Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 164 additions & 3 deletions nodescraper/plugins/inband/nvme/nvme_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
# SOFTWARE.
#
###############################################################################
import json
import os
import re
from typing import Optional
Expand All @@ -32,14 +33,16 @@
from nodescraper.base import InBandDataCollector
from nodescraper.enums import EventCategory, EventPriority, ExecutionStatus, OSFamily
from nodescraper.models import TaskResult
from nodescraper.utils import bytes_to_human_readable, str_or_none

from .nvmedata import NvmeDataModel
from .nvmedata import NvmeDataModel, NvmeListEntry


class NvmeCollector(InBandDataCollector[NvmeDataModel, None]):
"""Collect NVMe details from the system."""

DATA_MODEL = NvmeDataModel
CMD_LINUX_LIST_JSON = "nvme list -o json"
CMD_LINUX = {
"smart_log": "nvme smart-log {dev}",
"error_log": "nvme error-log {dev} --log-entries=256",
Expand All @@ -54,6 +57,15 @@ class NvmeCollector(InBandDataCollector[NvmeDataModel, None]):

TELEMETRY_FILENAME = "telemetry_log.bin"

def _check_nvme_cli_installed(self) -> bool:
"""Check if the nvme CLI is installed on the system.

Returns:
bool: True if nvme is available, False otherwise.
"""
res = self._run_sut_cmd("which nvme")
return res.exit_code == 0 and bool(res.stdout.strip())

def collect_data(
self,
args=None,
Expand All @@ -73,6 +85,18 @@ def collect_data(
self.result.status = ExecutionStatus.NOT_RAN
return self.result, None

if not self._check_nvme_cli_installed():
self._log_event(
category=EventCategory.SW_DRIVER,
description="nvme CLI not found; install nvme-cli to collect NVMe data",
priority=EventPriority.WARNING,
)
self.result.message = "nvme CLI not found; NVMe collection skipped"
self.result.status = ExecutionStatus.NOT_RAN
return self.result, None

nvme_list_entries = self._collect_nvme_list_entries()

nvme_devices = self._get_nvme_devices()
if not nvme_devices:
self._log_event(
Expand Down Expand Up @@ -115,7 +139,7 @@ def collect_data(

if all_device_data:
try:
nvme_data = NvmeDataModel(devices=all_device_data)
nvme_data = NvmeDataModel(nvme_list=nvme_list_entries, devices=all_device_data)
except ValidationError as exp:
self._log_event(
category=EventCategory.SW_DRIVER,
Expand All @@ -130,7 +154,10 @@ def collect_data(
self._log_event(
category=EventCategory.SW_DRIVER,
description="Collected NVMe data",
data=nvme_data.model_dump(),
data={
"devices": list(nvme_data.devices.keys()),
"nvme_list_entries": len(nvme_data.nvme_list or []),
},
priority=EventPriority.INFO,
)
self.result.message = "NVMe data successfully collected"
Expand All @@ -147,6 +174,140 @@ def collect_data(
self.result.status = ExecutionStatus.ERROR
return self.result, None

def _collect_nvme_list_entries(self) -> Optional[list[NvmeListEntry]]:
"""Run 'nvme list -o json' and parse output into list of NvmeListEntry."""
res = self._run_sut_cmd(self.CMD_LINUX_LIST_JSON, sudo=False)
if res.exit_code == 0 and res.stdout:
entries = self._parse_nvme_list_json(res.stdout.strip())
if not entries:
self._log_event(
category=EventCategory.SW_DRIVER,
description="Parsing of 'nvme list -o json' output failed (no entries from nested or flat format)",
priority=EventPriority.WARNING,
)
return entries
return None

def _parse_nvme_list_json(self, raw: str) -> list[NvmeListEntry]:
"""Parse 'nvme list -o json' output into NvmeListEntry list.

Supports two formats:
- Nested: Devices[] -> Subsystems[] -> Controllers[] -> Namespaces[].
- Flat: Devices[] where each element has DevicePath, SerialNumber, ModelNumber, etc.
"""
try:
data = json.loads(raw)
except json.JSONDecodeError:
return []
devices = data.get("Devices", []) if isinstance(data, dict) else []
if not isinstance(devices, list):
return []
entries = self._parse_nvme_list_nested(devices)
if not entries and devices:
entries = self._parse_nvme_list_flat(devices)
return entries

def _parse_nvme_list_flat(self, devices: list) -> list[NvmeListEntry]:
"""Parse flat 'nvme list -o json' format (one object per namespace in Devices[])."""
entries = []
for dev in devices:
if not isinstance(dev, dict):
continue
if dev.get("DevicePath") is None and dev.get("SerialNumber") is None:
continue
node = str_or_none(dev.get("DevicePath"))
generic_path = str_or_none(dev.get("GenericPath"))
serial_number = str_or_none(dev.get("SerialNumber"))
model = str_or_none(dev.get("ModelNumber"))
fw_rev = str_or_none(dev.get("Firmware"))
name_space = dev.get("NameSpace") or dev.get("NameSpaceId")
nsid = name_space if name_space is not None else dev.get("NSID")
namespace_id = (
f"0x{int(nsid):x}" if isinstance(nsid, (int, float)) else str_or_none(nsid)
)
used_bytes = dev.get("UsedBytes")
physical_size = dev.get("PhysicalSize")
sector_size = dev.get("SectorSize")
if isinstance(used_bytes, (int, float)) and isinstance(physical_size, (int, float)):
usage = (
f"{bytes_to_human_readable(int(used_bytes))} / "
f"{bytes_to_human_readable(int(physical_size))}"
)
else:
usage = None
format_lba = f"{sector_size} B + 0 B" if sector_size is not None else None
entries.append(
NvmeListEntry(
node=node,
generic=generic_path,
serial_number=serial_number,
model=model,
namespace_id=namespace_id,
usage=usage,
format_lba=format_lba,
fw_rev=fw_rev,
)
)
return entries

def _parse_nvme_list_nested(self, devices: list) -> list[NvmeListEntry]:
"""Parse nested 'nvme list -o json' format (Devices -> Subsystems -> Controllers -> Namespaces)."""
entries = []
for dev in devices:
if not isinstance(dev, dict):
continue
subsystems = dev.get("Subsystems") or []
for subsys in subsystems:
if not isinstance(subsys, dict):
continue
controllers = subsys.get("Controllers") or []
for ctrl in controllers:
if not isinstance(ctrl, dict):
continue
serial_number = str_or_none(ctrl.get("SerialNumber"))
model = str_or_none(ctrl.get("ModelNumber"))
fw_rev = str_or_none(ctrl.get("Firmware"))
namespaces = ctrl.get("Namespaces") or []
for ns in namespaces:
if not isinstance(ns, dict):
continue
name_space = ns.get("NameSpace") or ns.get("NameSpaceId")
generic = ns.get("Generic")
nsid = ns.get("NSID")
used_bytes = ns.get("UsedBytes")
physical_size = ns.get("PhysicalSize")
sector_size = ns.get("SectorSize")
node = f"/dev/{name_space}" if name_space else None
generic_path = (
f"/dev/{generic}" if (generic and str(generic).strip()) else None
)
namespace_id = f"0x{nsid:x}" if isinstance(nsid, int) else str_or_none(nsid)
if isinstance(used_bytes, (int, float)) and isinstance(
physical_size, (int, float)
):
usage = (
f"{bytes_to_human_readable(int(used_bytes))} / "
f"{bytes_to_human_readable(int(physical_size))}"
)
else:
usage = None
format_lba = (
f"{sector_size} B + 0 B" if sector_size is not None else None
)
entries.append(
NvmeListEntry(
node=str_or_none(node),
generic=str_or_none(generic_path),
serial_number=serial_number,
model=model,
namespace_id=namespace_id,
usage=usage,
format_lba=format_lba,
fw_rev=fw_rev,
)
)
return entries

def _get_nvme_devices(self) -> list[str]:
nvme_devs = []

Expand Down
27 changes: 25 additions & 2 deletions nodescraper/plugins/inband/nvme/nvmedata.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,28 @@
###############################################################################
from typing import Optional

from pydantic import BaseModel
from pydantic import BaseModel, Field

from nodescraper.models import DataModel


class NvmeListEntry(BaseModel):
"""One row from 'nvme list': a single NVMe device/namespace line."""

node: Optional[str] = Field(default=None, description="Device node path (e.g. /dev/nvme0n1).")
generic: Optional[str] = Field(
default=None, description="Generic device node (e.g. /dev/ng0n1)."
)
serial_number: Optional[str] = Field(default=None, description="Serial number (SN).")
model: Optional[str] = Field(default=None, description="Model name.")
namespace_id: Optional[str] = Field(default=None, description="Namespace ID.")
usage: Optional[str] = Field(default=None, description="Usage (e.g. capacity).")
format_lba: Optional[str] = Field(
default=None, description="LBA format (sector size + metadata)."
)
fw_rev: Optional[str] = Field(default=None, description="Firmware revision.")


class DeviceNvmeData(BaseModel):
smart_log: Optional[str] = None
error_log: Optional[str] = None
Expand All @@ -42,4 +59,10 @@ class DeviceNvmeData(BaseModel):


class NvmeDataModel(DataModel):
devices: dict[str, DeviceNvmeData]
"""NVMe collection output: parsed 'nvme list' entries and per-device command outputs."""

nvme_list: Optional[list[NvmeListEntry]] = Field(
default=None,
description="Parsed list of NVMe devices from 'nvme list'.",
)
devices: dict[str, DeviceNvmeData] = Field(default_factory=dict)
44 changes: 30 additions & 14 deletions nodescraper/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,25 @@ def get_exception_details(exception: Exception) -> dict:
}


def str_or_none(val: object) -> Optional[str]:
"""Return a stripped string or None.

None input, or a string that is empty/whitespace after stripping, becomes None.
Non-string values are converted to string then stripped. Useful for normalizing
values from JSON, dicts, or user input into Optional[str] for model fields.

Args:
val: Any value (e.g. str, int, None).

Returns:
Stripped non-empty string, or None.
"""
if val is None:
return None
s = val.strip() if isinstance(val, str) else str(val).strip()
return s if s else None


def convert_to_bytes(value: str, si=False) -> int:
"""
Convert human-readable memory sizes (like GB, MB) to bytes.
Expand Down Expand Up @@ -150,26 +169,23 @@ def pascal_to_snake(input_str: str) -> str:


def bytes_to_human_readable(input_bytes: int) -> str:
"""converts a bytes int to a human readable sting in KB, MB, or GB
"""Converts a bytes int to a human-readable string in B, KB, MB, GB, TB, or PB (decimal).

Args:
input_bytes (int): bytes integer

Returns:
str: human readable string
str: human-readable string (e.g. "8.25KB", "7.68TB")
"""
kb = round(float(input_bytes) / 1000, 2)

if kb < 1000:
return f"{kb}KB"

mb = round(kb / 1000, 2)

if mb < 1000:
return f"{mb}MB"

gb = round(mb / 1000, 2)
return f"{gb}GB"
if input_bytes < 0:
return "0B"
if input_bytes == 0:
return "0B"
units = [(10**12, "TB"), (10**9, "GB"), (10**6, "MB"), (10**3, "KB"), (1, "B")]
for scale, label in units:
if input_bytes >= scale:
return f"{round(float(input_bytes) / scale, 2)}{label}"
return "0B"


def find_annotation_in_container(
Expand Down
Loading