Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 58 additions & 10 deletions nodescraper/plugins/inband/package/package_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def regex_version_data(
package_data: dict[str, str],
key_search: re.Pattern[str],
value_search: Optional[Pattern[str]],
) -> bool:
) -> tuple[bool, list[tuple[str, str, str]]]:
"""Searches the package values for the key and value search patterns

Args:
Expand All @@ -53,10 +53,12 @@ def regex_version_data(
value_search (Optional[Pattern[str]]): a compiled regex pattern to search for the package version, if None then any version is accepted

Returns:
bool: A boolean indicating if the value was found
tuple: (value_found, version_mismatches) where value_found is a bool and
version_mismatches is a list of (package_name, expected_pattern, found_version) tuples
"""

value_found = False
version_mismatches = []
for name, version in package_data.items():
self.logger.debug("Package data: %s, %s", name, version)
key_search_res = key_search.search(name)
Expand All @@ -66,6 +68,7 @@ def regex_version_data(
continue
value_search_res = value_search.search(version)
if not value_search_res:
version_mismatches.append((name, value_search.pattern, version))
self._log_event(
EventCategory.APPLICATION,
f"Package {key_search.pattern} Version Mismatch, Expected {value_search.pattern} but found {version}",
Expand All @@ -77,7 +80,7 @@ def regex_version_data(
"found_version": version,
},
)
return value_found
return value_found, version_mismatches

def package_regex_search(
self, package_data: dict[str, str], exp_package_data: dict[str, Optional[str]]
Expand All @@ -87,16 +90,23 @@ def package_regex_search(
Args:
package_data (dict[str, str]): a dictionary of package names and versions
exp_package_data (dict[str, Optional[str]]): a dictionary of expected package names and versions

Returns:
tuple: (not_found_keys, regex_errors, version_mismatches) containing lists of errors
"""
not_found_keys = []
regex_errors = []
version_mismatches = []

for exp_key, exp_value in exp_package_data.items():
try:
if exp_value is not None:
value_search = re.compile(exp_value)
else:
value_search = None
key_search = re.compile(exp_key)
except re.error:
except re.error as e:
regex_errors.append((exp_key, exp_value, str(e)))
self._log_event(
EventCategory.RUNTIME,
f"Regex Compile Error either {exp_key} {exp_value}",
Expand All @@ -108,10 +118,13 @@ def package_regex_search(
)
continue

key_found = self.regex_version_data(package_data, key_search, value_search)
key_found, mismatches = self.regex_version_data(package_data, key_search, value_search)

# Collect version mismatches
version_mismatches.extend(mismatches)

if not key_found:
not_found_keys.append(exp_key)
not_found_keys.append((exp_key, exp_value))
self._log_event(
EventCategory.APPLICATION,
f"Package {exp_key} not found in the package list",
Expand All @@ -123,7 +136,8 @@ def package_regex_search(
"found_version": None,
},
)
return not_found_keys

return not_found_keys, regex_errors, version_mismatches

def package_exact_match(
self, package_data: dict[str, str], exp_package_data: dict[str, Optional[str]]
Expand Down Expand Up @@ -190,9 +204,43 @@ def analyze_data(
return self.result

if args.regex_match:
not_found_keys = self.package_regex_search(data.version_info, args.exp_package_ver)
self.result.message = f"Packages not found: {not_found_keys}"
self.result.status = ExecutionStatus.ERROR
not_found_keys, regex_errors, version_mismatches = self.package_regex_search(
data.version_info, args.exp_package_ver
)

# Adding details for err message
error_parts = []
if not_found_keys:
packages_detail = ", ".join(
[
f"'{pkg}' (expected version: {ver if ver else 'any'})"
for pkg, ver in not_found_keys
]
)
error_parts.append(f"Packages not found: {packages_detail}")

if regex_errors:
regex_detail = ", ".join(
[f"'{pkg}' pattern (version: {ver})" for pkg, ver, _ in regex_errors]
)
error_parts.append(f"Regex compile errors: {regex_detail}")

if version_mismatches:
version_detail = ", ".join(
[
f"'{pkg}' (expected: {exp}, found: {found})"
for pkg, exp, found in version_mismatches
]
)
error_parts.append(f"Version mismatches: {version_detail}")

total_errors = len(not_found_keys) + len(regex_errors) + len(version_mismatches)
if total_errors > 0:
self.result.message = f"{'; '.join(error_parts)}"
self.result.status = ExecutionStatus.ERROR
else:
self.result.message = "All packages found and versions matched"
self.result.status = ExecutionStatus.OK
else:
self.logger.info("Expected packages: %s", list(args.exp_package_ver.keys()))
not_found_match, not_found_version = self.package_exact_match(
Expand Down
18 changes: 18 additions & 0 deletions test/unit/plugin/test_package_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,22 @@ def test_data_version_regex(package_analyzer, default_data_lib):
regex_match=True,
)
res = package_analyzer.analyze_data(default_data_lib, args=args)
assert res.status == ExecutionStatus.OK
assert res.message == "All packages found and versions matched"


def test_data_multiple_errors_regex(package_analyzer, default_data_lib):
"""Test that detailed error messages are shown for multiple package errors"""
args = PackageAnalyzerArgs(
exp_package_ver={
"missing-package": None,
"test-ubuntu-package\\.x86_64": "2\\.\\d+",
"another-missing": "1\\.0",
},
regex_match=True,
)
res = package_analyzer.analyze_data(default_data_lib, args=args)
assert res.status == ExecutionStatus.ERROR
assert "missing-package" in res.message
assert "another-missing" in res.message
assert len(res.events) == 3