Skip to content
19 changes: 17 additions & 2 deletions nodescraper/interfaces/dataplugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,15 @@ def collect(
Union[SystemInteractionLevel, str]
] = SystemInteractionLevel.INTERACTIVE,
preserve_connection: bool = False,
collection_args: Optional[Union[TCollectArg, dict]] = None,
collection_args: Optional[TCollectArg] = None,
) -> TaskResult:
"""Run data collector task

Args:
max_event_priority_level (Union[EventPriority, str], optional): priority limit for events. Defaults to EventPriority.CRITICAL.
system_interaction_level (Union[SystemInteractionLevel, str], optional): system interaction level. Defaults to SystemInteractionLevel.INTERACTIVE.
preserve_connection (bool, optional): whether we should close the connection after data collection. Defaults to False.
collection_args (Optional[Union[TCollectArg , dict]], optional): args for data collection. Defaults to None.
collection_args (Optional[TCollectArg], optional): args for data collection (validated model). Defaults to None.

Returns:
TaskResult: task result for data collection
Expand Down Expand Up @@ -195,6 +195,13 @@ def collect(
message="Connection not available, data collection skipped",
)
else:
if (
collection_args is not None
and isinstance(collection_args, dict)
and hasattr(self, "COLLECTOR_ARGS")
and self.COLLECTOR_ARGS is not None
):
collection_args = self.COLLECTOR_ARGS.model_validate(collection_args)

collection_task = self.COLLECTOR(
system_info=self.system_info,
Expand Down Expand Up @@ -264,6 +271,14 @@ def analyze(
)
return self.analysis_result

if (
analysis_args is not None
and isinstance(analysis_args, dict)
and hasattr(self, "ANALYZER_ARGS")
and self.ANALYZER_ARGS is not None
):
analysis_args = self.ANALYZER_ARGS.model_validate(analysis_args)

analyzer_task = self.ANALYZER(
self.system_info,
logger=self.logger,
Expand Down
29 changes: 29 additions & 0 deletions nodescraper/plugins/inband/sys_settings/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
###############################################################################
#
# MIT License
#
# Copyright (c) 2026 Advanced Micro Devices, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
###############################################################################
from .analyzer_args import SysfsCheck, SysSettingsAnalyzerArgs
from .sys_settings_plugin import SysSettingsPlugin

__all__ = ["SysSettingsPlugin", "SysSettingsAnalyzerArgs", "SysfsCheck"]
83 changes: 83 additions & 0 deletions nodescraper/plugins/inband/sys_settings/analyzer_args.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
###############################################################################
#
# MIT License
#
# Copyright (c) 2026 Advanced Micro Devices, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
###############################################################################
from typing import Optional

from pydantic import BaseModel, Field

from nodescraper.models import AnalyzerArgs


class SysfsCheck(BaseModel):
"""One sysfs check: path to read, acceptable values or pattern, and display name.

For file paths: use expected (list of acceptable values); if empty, check passes.
For directory paths: use pattern (regex); at least one directory entry must match (e.g. ^hsn[0-9]+).
"""

path: str
expected: list[str] = Field(default_factory=list)
name: str
pattern: Optional[str] = None


class SysSettingsAnalyzerArgs(AnalyzerArgs):
"""Sysfs settings for analysis via a list of checks (path, expected values, name).

The path in each check is the sysfs path to read; the collector uses these paths
when collection_args is derived from analysis_args (e.g. by the plugin).
"""

checks: Optional[list[SysfsCheck]] = None

def paths_to_collect(self) -> list[str]:
"""Return unique sysfs file paths from checks (those without pattern), for use by the collector."""
if not self.checks:
return []
seen = set()
out = []
for c in self.checks:
if c.pattern:
continue
p = c.path.rstrip("/")
if p not in seen:
seen.add(p)
out.append(c.path)
return out

def paths_to_list(self) -> list[str]:
"""Return unique sysfs directory paths from checks (those with pattern), for listing (ls)."""
if not self.checks:
return []
seen = set()
out = []
for c in self.checks:
if not c.pattern:
continue
p = c.path.rstrip("/")
if p not in seen:
seen.add(p)
out.append(c.path)
return out
37 changes: 37 additions & 0 deletions nodescraper/plugins/inband/sys_settings/collector_args.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
###############################################################################
#
# MIT License
#
# Copyright (c) 2026 Advanced Micro Devices, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
###############################################################################
from pydantic import BaseModel


class SysSettingsCollectorArgs(BaseModel):
"""Collection args for SysSettingsCollector.

paths: sysfs paths to read (cat).
directory_paths: sysfs paths to list (ls -1); use for checks that match entry names by regex.
"""

paths: list[str] = []
directory_paths: list[str] = []
161 changes: 161 additions & 0 deletions nodescraper/plugins/inband/sys_settings/sys_settings_analyzer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
###############################################################################
#
# MIT License
#
# Copyright (c) 2026 Advanced Micro Devices, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
###############################################################################
import re
from typing import List, Optional, Union, cast

from nodescraper.enums import EventCategory, EventPriority, ExecutionStatus
from nodescraper.interfaces import DataAnalyzer
from nodescraper.models import TaskResult

from .analyzer_args import SysSettingsAnalyzerArgs
from .sys_settings_data import SysSettingsDataModel


def _get_actual_for_path(data: SysSettingsDataModel, path: str) -> Optional[str]:
"""Return the actual value from the data model for the given sysfs path.

Args:
data: Collected sysfs readings (path -> value).
path: Sysfs path (with or without trailing slash).

Returns:
Normalized value for that path, or None if not present.
"""
value = data.readings.get(path) or data.readings.get(path.rstrip("/"))
return (value or "").strip().lower() if value is not None else None


class SysSettingsAnalyzer(DataAnalyzer[SysSettingsDataModel, SysSettingsAnalyzerArgs]):
"""Check sysfs settings against expected values from the checks list."""

DATA_MODEL = SysSettingsDataModel

def analyze_data(
self, data: SysSettingsDataModel, args: Optional[SysSettingsAnalyzerArgs] = None
) -> TaskResult:
"""Compare sysfs data to expected settings from args.checks.

Args:
data: Collected sysfs readings to check.
args: Analyzer args with checks (path, expected, name). If None or no checks, returns OK.

Returns:
TaskResult with status OK if all checks pass, ERROR if any mismatch or missing path.
"""
mismatches: dict[str, dict[str, Union[Optional[str], List[str]]]] = {}

if not args or not args.checks:
self.result.status = ExecutionStatus.OK
self.result.message = "No checks configured."
return self.result

for check in args.checks:
raw_reading = data.readings.get(check.path) or data.readings.get(check.path.rstrip("/"))

if check.pattern:
# Directory-listing check: at least one line must match the regex (e.g. ^hsn[0-9]+)
if raw_reading is None:
mismatches[check.name] = {
"path": check.path,
"pattern": check.pattern,
"actual": None,
"reason": "path not collected by this plugin",
}
continue
try:
pat = re.compile(check.pattern)
except re.error:
mismatches[check.name] = {
"path": check.path,
"pattern": check.pattern,
"reason": "invalid regex",
}
continue
lines = [ln.strip() for ln in raw_reading.splitlines() if ln.strip()]
if not any(pat.search(ln) for ln in lines):
mismatches[check.name] = {
"path": check.path,
"pattern": check.pattern,
"actual": lines,
}
continue

actual = _get_actual_for_path(data, check.path)
if actual is None:
mismatches[check.name] = {
"path": check.path,
"expected": check.expected,
"actual": None,
"reason": "path not collected by this plugin",
}
continue

if not check.expected:
continue
expected_normalized = [e.strip().lower() for e in check.expected]
if actual not in expected_normalized:
raw = data.readings.get(check.path) or data.readings.get(check.path.rstrip("/"))
mismatches[check.name] = {
"path": check.path,
"expected": check.expected,
"actual": raw,
}

if mismatches:
self.result.status = ExecutionStatus.ERROR
parts = []
for name, info in mismatches.items():
path = info.get("path", "")
reason = info.get("reason")
pattern = info.get("pattern")
if reason:
part = f"{name} ({path}): {reason}"
elif pattern is not None:
part = f"{name} ({path}): no entry matching pattern {pattern!r}"
else:
expected = info.get("expected")
actual = cast(Optional[str], info.get("actual"))
part = f"{name} ({path}): expected one of {expected}, actual {repr(actual)}"
parts.append(part)
self.result.message = "Sysfs mismatch: " + "; ".join(parts)
self._log_event(
category=EventCategory.OS,
description="Sysfs mismatch detected",
data=mismatches,
priority=EventPriority.ERROR,
console_log=True,
)
else:
self._log_event(
category=EventCategory.OS,
description="Sysfs settings match expected",
priority=EventPriority.INFO,
console_log=True,
)
self.result.status = ExecutionStatus.OK
self.result.message = "Sysfs settings as expected."

return self.result
Loading