Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from nodescraper.base import InBandDataCollector
from nodescraper.enums import ExecutionStatus
from nodescraper.models import TaskResult
Expand All @@ -9,7 +11,7 @@ class SampleCollector(InBandDataCollector[SampleDataModel, None]):

DATA_MODEL = SampleDataModel

def collect_data(self, args=None) -> tuple[TaskResult, SampleDataModel | None]:
def collect_data(self, args=None) -> tuple[TaskResult, Optional[SampleDataModel]]:
sample_data = SampleDataModel(some_str="example123")
self.result.message = "Collector ran successfully"
self.result.status = ExecutionStatus.OK
Expand Down
2 changes: 1 addition & 1 deletion nodescraper/plugins/inband/kernel/analyzer_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,4 @@ def build_from_model(cls, datamodel: KernelDataModel) -> "KernelAnalyzerArgs":
Returns:
KernelAnalyzerArgs: instance of analyzer args class
"""
return cls(exp_kernel=datamodel.kernel_version)
return cls(exp_kernel=datamodel.kernel_info)
54 changes: 48 additions & 6 deletions nodescraper/plugins/inband/kernel/kernel_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
# SOFTWARE.
#
###############################################################################
import re
from typing import Optional

from nodescraper.base import InBandDataCollector
Expand All @@ -37,7 +38,31 @@ class KernelCollector(InBandDataCollector[KernelDataModel, None]):

DATA_MODEL = KernelDataModel
CMD_WINDOWS = "wmic os get Version /Value"
CMD = "sh -c 'uname -r'"
CMD = "sh -c 'uname -a'"

def _parse_kernel_version(self, uname_a: str) -> Optional[str]:
"""Extract the kernel release from `uname -a` output.

Args:
uname_a (str): The full output string from the `uname -a` command.

Returns:
Optional[str]: The parsed kernel release (e.g., "5.13.0-30-generic")
if found, otherwise None.
"""
if not uname_a:
return None

result = uname_a.strip().split()
if len(result) >= 3:
return result[2]

# if some change in output look for a version-like string (e.g. 4.18.0-553.el8_10.x86_64)
match = re.search(r"\d+\.\d+\.\d+[\w\-\.]*", uname_a)
if match:
return match.group(0)

return None

def collect_data(
self,
Expand All @@ -51,16 +76,28 @@ def collect_data(
"""

kernel = None
kernel_info = None

if self.system_info.os_family == OSFamily.WINDOWS:
res = self._run_sut_cmd(self.CMD_WINDOWS)
if res.exit_code == 0:
kernel_info = res.stdout
kernel = [line for line in res.stdout.splitlines() if "Version=" in line][0].split(
"="
)[1]
else:
res = self._run_sut_cmd(self.CMD)
if res.exit_code == 0:
kernel = res.stdout
kernel_info = res.stdout
kernel = self._parse_kernel_version(kernel_info)
if not kernel:
self._log_event(
category=EventCategory.OS,
description="Could not extract kernel version from 'uname -a'",
data={"command": res.command, "exit_code": res.exit_code},
priority=EventPriority.ERROR,
console_log=True,
)

if res.exit_code != 0:
self._log_event(
Expand All @@ -71,8 +108,9 @@ def collect_data(
console_log=True,
)

if kernel:
kernel_data = KernelDataModel(kernel_version=kernel)
if kernel_info and kernel:

kernel_data = KernelDataModel(kernel_info=kernel_info, kernel_version=kernel)
self._log_event(
category="KERNEL_READ",
description="Kernel version read",
Expand All @@ -82,6 +120,10 @@ def collect_data(
else:
kernel_data = None

self.result.message = f"Kernel: {kernel}" if kernel else "Kernel not found"
self.result.status = ExecutionStatus.OK if kernel else ExecutionStatus.ERROR
self.result.message = (
"Kernel not found"
if not kernel_info
else f"Kernel info: {kernel_info} | Kernel version: {kernel if kernel else 'Kernel version not found'}"
)
self.result.status = ExecutionStatus.OK if kernel_info else ExecutionStatus.ERROR
return self.result, kernel_data
2 changes: 2 additions & 0 deletions nodescraper/plugins/inband/kernel/kerneldata.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
# SOFTWARE.
#
###############################################################################

from nodescraper.models import DataModel


class KernelDataModel(DataModel):
kernel_info: str
kernel_version: str
9 changes: 6 additions & 3 deletions test/unit/plugin/test_kernel_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@

@pytest.fixture
def model_obj():
return KernelDataModel(kernel_version="5.13.0-30-generic")
return KernelDataModel(
kernel_info="Linux MockSystem 5.13.0-30-generic #1 XYZ Day Month 10 15:19:13 EDT 2024 x86_64 x86_64 x86_64 GNU/Linux",
kernel_version="5.13.0-30-generic",
)


@pytest.fixture
Expand Down Expand Up @@ -118,14 +121,14 @@ def test_invalid_kernel_config(system_info, model_obj, config):


def test_match_regex(system_info, model_obj):
args = KernelAnalyzerArgs(exp_kernel=[r"5.13.\d-\d+-[\w]+"], regex_match=True)
args = KernelAnalyzerArgs(exp_kernel=[r".*5\.13\.\d+-\d+-[\w-]+.*"], regex_match=True)
analyzer = KernelAnalyzer(system_info)
result = analyzer.analyze_data(model_obj, args)
assert result.status == ExecutionStatus.OK


def test_mismatch_regex(system_info, model_obj):
args = KernelAnalyzerArgs(exp_kernel=[r"4.3.\d-\d+-[\w]+"], regex_match=True)
args = KernelAnalyzerArgs(exp_kernel=[r".*4\.13\.\d+-\d+-[\w-]+.*"], regex_match=True)
analyzer = KernelAnalyzer(system_info)
result = analyzer.analyze_data(model_obj, args)

Expand Down
13 changes: 9 additions & 4 deletions test/unit/plugin/test_kernel_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,22 +53,27 @@ def test_run_windows(collector, conn_mock):

result, data = collector.collect_data()

assert data == KernelDataModel(kernel_version="10.0.19041.1237")
assert data == KernelDataModel(
kernel_info="Version=10.0.19041.1237", kernel_version="10.0.19041.1237"
)
assert result.status == ExecutionStatus.OK


def test_run_linux(collector, conn_mock):
collector.system_info.os_family = OSFamily.LINUX
conn_mock.run_command.return_value = CommandArtifact(
exit_code=0,
stdout="5.4.0-88-generic",
stdout="Linux MockSystem 5.13.0-30-generic #1 XYZ Day Month 10 15:19:13 EDT 2024 x86_64 x86_64 x86_64 GNU/Linux",
stderr="",
command="sh -c 'uname -r'",
command="sh -c 'uname -a'",
)

result, data = collector.collect_data()

assert data == KernelDataModel(kernel_version="5.4.0-88-generic")
assert data == KernelDataModel(
kernel_info="Linux MockSystem 5.13.0-30-generic #1 XYZ Day Month 10 15:19:13 EDT 2024 x86_64 x86_64 x86_64 GNU/Linux",
kernel_version="5.13.0-30-generic",
)
assert result.status == ExecutionStatus.OK


Expand Down
Loading