Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions nodescraper/plugins/inband/storage/analyzer_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
###############################################################################
from typing import Optional

from pydantic import BaseModel
from pydantic import BaseModel, Field


class StorageAnalyzerArgs(BaseModel):
min_required_free_space_abs: Optional[str] = None
min_required_free_space_prct: Optional[int] = None
ignore_devices: Optional[list] = []
ignore_devices: Optional[list[str]] = Field(default_factory=list)
check_devices: Optional[list[str]] = Field(default_factory=list)
regex_match: bool = False
40 changes: 39 additions & 1 deletion nodescraper/plugins/inband/storage/storage_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
# SOFTWARE.
#
###############################################################################
import re
from typing import Optional

from nodescraper.enums import EventCategory, EventPriority, ExecutionStatus
Expand All @@ -39,6 +40,36 @@ class StorageAnalyzer(DataAnalyzer[StorageDataModel, StorageAnalyzerArgs]):

DATA_MODEL = StorageDataModel

def _matches_device_filter(
self, device_name: str, exp_devices: list[str], regex_match: bool
) -> bool:
"""Check if the device name matches any of the expected devices""

Args:
device_name (str): device name to check
exp_devices (list[str]): list of expected devices to match against
regex_match (bool): if True, use regex matching; otherwise, use exact match

Returns:
bool: True if the device name matches any of the expected devices, False otherwise
"""
for exp_device in exp_devices:
if regex_match:
try:
device_regex = re.compile(exp_device)
except re.error:
self._log_event(
category=EventCategory.STORAGE,
description=f"Invalid regex pattern: {exp_device}",
priority=EventPriority.ERROR,
)
continue
if device_regex.match(device_name):
return True
elif device_name == exp_device:
return True
return False

def analyze_data(
self, data: StorageDataModel, args: Optional[StorageAnalyzerArgs] = None
) -> TaskResult:
Expand Down Expand Up @@ -68,8 +99,15 @@ def analyze_data(
return self.result

for device_name, device_data in data.storage_data.items():
if args.ignore_devices and device_name in args.ignore_devices:
if args.check_devices and not self._matches_device_filter(
device_name, args.check_devices, args.regex_match
):
continue
elif args.ignore_devices and self._matches_device_filter(
device_name, args.ignore_devices, args.regex_match
):
continue

condition = False
if args.min_required_free_space_abs:
min_free_abs = convert_to_bytes(args.min_required_free_space_abs)
Expand Down