|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
3 | | -import ast |
4 | 3 | import functools |
5 | 4 | import logging |
6 | 5 | import warnings |
@@ -296,33 +295,55 @@ def _apply_action_to_fields( |
296 | 295 |
|
297 | 296 | def _apply_masking_rules(self, data: dict, masking_rules: dict) -> dict: |
298 | 297 | """ |
299 | | - Apply masking rules to data, supporting different rules for each field. |
| 298 | + Apply masking rules to data, supporting both simple field names and complex path expressions. |
| 299 | +
|
| 300 | + Args: |
| 301 | + data: The dictionary containing data to mask |
| 302 | + masking_rules: Dictionary mapping field names or path expressions to masking rules |
| 303 | +
|
| 304 | + Returns: |
| 305 | + dict: The masked data dictionary |
300 | 306 | """ |
301 | 307 | result = data.copy() |
302 | 308 |
|
303 | 309 | for path, rule in masking_rules.items(): |
304 | 310 | try: |
305 | | - # Handle nested paths (e.g., 'address.street') |
306 | | - parts = path.split(".") |
307 | | - current = result |
308 | | - |
309 | | - for part in parts[:-1]: |
310 | | - if isinstance(current[part], str) and current[part].startswith("{"): |
311 | | - try: |
312 | | - current[part] = ast.literal_eval(current[part]) |
313 | | - except (ValueError, SyntaxError): |
314 | | - continue |
315 | | - current = current[part] |
316 | | - |
317 | | - final_field = parts[-1] |
318 | | - |
319 | | - # Apply masking rule to the target field |
320 | | - if final_field in current: |
321 | | - current[final_field] = self.provider.erase(str(current[final_field]), **rule) |
322 | | - |
323 | | - except (KeyError, TypeError, AttributeError): |
324 | | - # Log warning if field not found or invalid path |
325 | | - warnings.warn(f"Could not apply masking rule for path: {path}", stacklevel=2) |
| 311 | + if ".." in path: |
| 312 | + # Handle recursive descent paths (e.g., "address..name") |
| 313 | + base_path, field = path.split("..") |
| 314 | + jsonpath_expr = parse(f"$.{base_path}..{field}") |
| 315 | + elif "[" in path: |
| 316 | + # Handle array notation paths (e.g., "address[*].street") |
| 317 | + jsonpath_expr = parse(f"$.{path}") |
| 318 | + else: |
| 319 | + # Handle simple field names (e.g., "email") |
| 320 | + jsonpath_expr = parse(f"$.{path}") |
| 321 | + |
| 322 | + matches = jsonpath_expr.find(result) |
| 323 | + |
| 324 | + if not matches: |
| 325 | + warnings.warn(f"No matches found for path: {path}", stacklevel=2) |
| 326 | + continue |
| 327 | + |
| 328 | + for match in matches: |
| 329 | + try: |
| 330 | + value = match.value |
| 331 | + if value is not None: |
| 332 | + if isinstance(value, dict): |
| 333 | + # Handle dictionary values by masking each field |
| 334 | + for k, v in value.items(): |
| 335 | + if v is not None: |
| 336 | + value[k] = self.provider.erase(str(v), **rule) |
| 337 | + else: |
| 338 | + masked_value = self.provider.erase(str(value), **rule) |
| 339 | + match.full_path.update(result, masked_value) |
| 340 | + |
| 341 | + except Exception as e: |
| 342 | + warnings.warn(f"Error masking value for path {path}: {str(e)}", stacklevel=2) |
| 343 | + continue |
| 344 | + |
| 345 | + except Exception as e: |
| 346 | + warnings.warn(f"Error processing path {path}: {str(e)}", stacklevel=2) |
326 | 347 | continue |
327 | 348 |
|
328 | 349 | return result |
|
0 commit comments