Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- **security:** keep spoofed built-in Keras `registered_name` values from hiding non-allowlisted custom modules in `.keras` ZIP scans
- **keras:** suppress duplicate custom-object warnings for allowlisted registered objects when module metadata is absent
- **security:** stop auto-applying local `.modelaudit.toml` and `pyproject.toml` rule config during scans unless a human explicitly trusts that config in an interactive scan; remembered trust is stored securely under the local ModelAudit cache and invalidated when the config changes
- **telemetry:** preserve secret-scrubbed model references in telemetry payloads while omitting raw credentials, query strings, and local directory paths
- **cli:** preserve original local files during `--stream` directory scans instead of unlinking them after analysis
Expand Down
55 changes: 34 additions & 21 deletions modelaudit/scanners/keras_zip_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,47 +135,60 @@ def _is_allowlisted_keras_module(module_value: Any) -> bool:
return False
return module_value.strip().split(".")[0] in _SAFE_KERAS_MODULE_ROOTS

def _layer_uses_allowlisted_module(self, layer: dict[str, Any]) -> bool:
def _iter_layer_module_references(self, layer: dict[str, Any]) -> list[str]:
layer_config = layer.get("config", {})
if not isinstance(layer_config, dict):
layer_config = {}

module_references: list[str] = []
for key in ("module", "fn_module"):
if self._is_allowlisted_keras_module(layer.get(key)):
return True
if self._is_allowlisted_keras_module(layer_config.get(key)):
return True
return False
for value in (layer.get(key), layer_config.get(key)):
if isinstance(value, str) and value.strip():
module_references.append(value.strip())
return module_references

def _layer_uses_allowlisted_module(self, layer: dict[str, Any]) -> bool:
return any(
self._is_allowlisted_keras_module(module_value)
for module_value in self._iter_layer_module_references(layer)
)

def _layer_uses_non_allowlisted_module(self, layer: dict[str, Any]) -> bool:
return any(
not self._is_allowlisted_keras_module(module_value)
for module_value in self._iter_layer_module_references(layer)
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

@staticmethod
def _is_known_safe_allowlisted_registered_object(identifier: Any) -> bool:
return isinstance(identifier, str) and identifier.strip().lower() in _SAFE_ALLOWLISTED_REGISTERED_OBJECTS

def _is_known_safe_serialized_layer(self, layer: dict[str, Any]) -> bool:
layer_class = layer.get("class_name")
if is_known_safe_keras_layer_class(layer_class):
return True

return self._layer_uses_allowlisted_module(layer) and self._is_known_safe_allowlisted_registered_object(
if is_known_safe_keras_layer_class(layer_class) or self._is_known_safe_allowlisted_registered_object(
layer_class
)
):
return not self._layer_uses_non_allowlisted_module(layer)

Comment on lines +168 to +172
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Match allowlisted registered objects against registered_name, not only class_name.

Line 151 and Line 167 still feed layer_class into _is_known_safe_allowlisted_registered_object(). A payload like {"class_name": "Wrapper", "registered_name": "NotEqual", "config": {}} will still be reported as a custom layer/object even though the registered object is allowlisted and there are no unsafe module refs. Please key this safe-object path off registered_name here and add a regression where class_name != registered_name.

🔧 Suggested direction
     def _is_known_safe_serialized_layer(self, layer: dict[str, Any]) -> bool:
         layer_class = layer.get("class_name")
+        registered_name = layer.get("registered_name")
         if is_known_safe_keras_layer_class(layer_class) or self._is_known_safe_allowlisted_registered_object(
-            layer_class
+            registered_name
+        ) or self._is_known_safe_allowlisted_registered_object(
+            layer_class
         ):
             return not self._layer_uses_non_allowlisted_module(layer)

         return False
...
         layer_class = layer.get("class_name")
         if isinstance(layer_class, str) and normalized_registered_name == layer_class.strip():
-            if self._is_known_safe_serialized_layer(layer) or self._is_known_safe_allowlisted_registered_object(
-                layer_class
-            ):
+            if self._is_known_safe_serialized_layer(layer) or self._is_known_safe_allowlisted_registered_object(
+                normalized_registered_name
+            ):
                 return has_non_allowlisted_module
             return True

Based on learnings: Preserve or strengthen security detections; test both benign and malicious samples when adding scanner/feature changes.

Also applies to: 167-170

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@modelaudit/scanners/keras_zip_scanner.py` around lines 151 - 155, The check
that treats registered allowlisted objects as safe currently passes layer_class
into _is_known_safe_allowlisted_registered_object; change the call sites in the
Keras ZIP scanner (the branch that also checks is_known_safe_keras_layer_class)
to pass the layer's registered_name (not class_name) when available, then ensure
_is_known_safe_allowlisted_registered_object uses registered_name for matching;
keep the fallback to class_name only if registered_name is missing. Also update
the related branch that calls _layer_uses_non_allowlisted_module to use the same
registered_name logic and add a regression test that constructs a layer with
class_name != registered_name (e.g.,
{"class_name":"Wrapper","registered_name":"NotEqual","config":{}}) asserting it
is treated as allowlisted and reported safe.

return False

def _should_flag_registered_object(self, layer: dict[str, Any]) -> bool:
registered_name = layer.get("registered_name")
if not isinstance(registered_name, str) or not registered_name.strip():
return False

if is_known_safe_keras_layer_class(registered_name):
return False

normalized_registered_name = registered_name.strip()
has_non_allowlisted_module = self._layer_uses_non_allowlisted_module(layer)
layer_class = layer.get("class_name")
if isinstance(layer_class, str) and registered_name.strip() == layer_class.strip():
if self._is_known_safe_serialized_layer(layer):
return False
return not (
self._layer_uses_allowlisted_module(layer)
and self._is_known_safe_allowlisted_registered_object(layer_class)
)
if isinstance(layer_class, str) and normalized_registered_name == layer_class.strip():
if self._is_known_safe_serialized_layer(layer) or self._is_known_safe_allowlisted_registered_object(
layer_class
):
return has_non_allowlisted_module
return True

if is_known_safe_keras_layer_class(normalized_registered_name):
return has_non_allowlisted_module

return True

Expand Down
122 changes: 122 additions & 0 deletions tests/scanners/test_keras_zip_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,97 @@ def test_registered_builtin_layer_does_not_false_positive(self, tmp_path: Path)
assert all(check.name != "Custom Layer Class Detection" for check in result.checks)
assert all(check.name != "Custom Object Detection" for check in result.checks)

def test_builtin_registered_name_with_non_allowlisted_module_is_flagged(self, tmp_path: Path) -> None:
"""Spoofed built-in registered names must not hide custom modules."""
scanner = KerasZipScanner()
config = {
"class_name": "Functional",
"config": {
"layers": [
{
"class_name": "InputLayer",
"name": "input_1",
"config": {"batch_shape": [None, 4]},
},
{
"class_name": "Add",
"name": "spoofed_add",
"module": "evil.module",
"registered_name": "Add",
"config": {},
},
]
},
}

result = scanner.scan(
str(create_configured_keras_zip(tmp_path, config, file_name="spoofed_builtin_registered.keras"))
)

custom_object_checks = [check for check in result.checks if check.name == "Custom Object Detection"]
assert len(custom_object_checks) == 1
assert custom_object_checks[0].details["registered_name"] == "Add"

def test_builtin_registered_name_with_mixed_module_refs_is_flagged(self, tmp_path: Path) -> None:
"""A safe module reference must not suppress a separate unsafe one."""
scanner = KerasZipScanner()
config = {
"class_name": "Functional",
"config": {
"layers": [
{
"class_name": "InputLayer",
"name": "input_1",
"config": {"batch_shape": [None, 4]},
},
{
"class_name": "Add",
"name": "spoofed_add",
"module": "keras.src.ops.numpy",
"registered_name": "Add",
"config": {"fn_module": "evil.module"},
},
]
},
}

result = scanner.scan(
str(create_configured_keras_zip(tmp_path, config, file_name="mixed_builtin_registered.keras"))
)

custom_object_checks = [check for check in result.checks if check.name == "Custom Object Detection"]
assert len(custom_object_checks) == 1
assert custom_object_checks[0].details["registered_name"] == "Add"

def test_builtin_class_with_non_allowlisted_module_and_no_registered_name_is_flagged(self, tmp_path: Path) -> None:
"""Built-in class names must still be flagged when module metadata points outside the allowlist."""
scanner = KerasZipScanner()
config = {
"class_name": "Functional",
"config": {
"layers": [
{
"class_name": "InputLayer",
"name": "input_1",
"config": {"batch_shape": [None, 4]},
},
{
"class_name": "Add",
"name": "spoofed_add",
"module": "evil.module",
"config": {},
},
]
},
}

result = scanner.scan(
str(create_configured_keras_zip(tmp_path, config, file_name="builtin_class_evil_module.keras"))
)

assert any(check.name == "Custom Layer Class Detection" for check in result.checks)
assert any(issue.severity in (IssueSeverity.WARNING, IssueSeverity.CRITICAL) for issue in result.issues)

def test_allowlisted_module_layer_does_not_false_positive(self, tmp_path: Path) -> None:
"""Layers from allowlisted Keras modules should not be treated as custom objects."""
scanner = KerasZipScanner()
Expand Down Expand Up @@ -903,6 +994,37 @@ def test_allowlisted_module_layer_does_not_false_positive(self, tmp_path: Path)
assert all(check.name != "Custom Layer Class Detection" for check in result.checks)
assert all(check.name != "Custom Object Detection" for check in result.checks)

def test_allowlisted_registered_object_without_module_does_not_false_positive_custom_object(
self, tmp_path: Path
) -> None:
"""Allowlisted registered objects should not need module metadata to suppress custom-object warnings."""
scanner = KerasZipScanner()
config = {
"class_name": "Functional",
"config": {
"layers": [
{
"class_name": "InputLayer",
"name": "input_1",
"config": {"batch_shape": [None, 4]},
},
{
"class_name": "NotEqual",
"name": "not_equal",
"registered_name": "NotEqual",
"config": {},
},
]
},
}

result = scanner.scan(
str(create_configured_keras_zip(tmp_path, config, file_name="allowlisted_registered_without_module.keras"))
)

assert all(check.name != "Custom Layer Class Detection" for check in result.checks)
assert all(check.name != "Custom Object Detection" for check in result.checks)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

def test_allowlisted_module_does_not_suppress_unknown_custom_layer(self, tmp_path: Path) -> None:
"""Unknown classes must still be flagged even if module metadata looks Keras-owned."""
scanner = KerasZipScanner()
Expand Down
Loading