-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathdedupe.py
More file actions
122 lines (107 loc) · 4.89 KB
/
dedupe.py
File metadata and controls
122 lines (107 loc) · 4.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
from collections import defaultdict
from typing import Dict, List, Any
from socketdev.log import log
class Dedupe:
    """Stateless helpers that group package records and merge their alerts.

    Every method is a ``@staticmethod``; the class only serves as a namespace.
    Package and alert records are plain dictionaries.
    """

    @staticmethod
    def normalize_file_path(path: str) -> str:
        """Return *path* with its first ``/``-separated component removed.

        A path that contains no ``/`` is returned unchanged. ``None`` or an
        empty string yields ``""``.
        """
        if not path:
            return ""
        _, separator, remainder = path.partition("/")
        return remainder if separator else path

    @staticmethod
    def alert_key(alert: dict) -> tuple:
        """Build the tuple that identifies an alert for de-duplication.

        Two alerts are duplicates when they share type, severity, category,
        normalized file path, start and end.

        Raises:
            KeyError: if ``type``, ``severity`` or ``category`` is missing.
        """
        return (
            alert["type"],
            alert["severity"],
            alert["category"],
            Dedupe.normalize_file_path(alert.get("file")),
            alert.get("start"),
            alert.get("end"),
        )

    @staticmethod
    def _purl_has_version(purl: str) -> bool:
        """Return True when *purl* contains an ``@`` acting as version separator.

        An ``@`` that opens a path segment (``pkg:npm/@scope/name``) is a
        scope marker rather than a version separator, so it is ignored.
        """
        return any("@" in segment[1:] for segment in purl.split("/"))

    @staticmethod
    def consolidate_and_merge_alerts(package_group: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Collapse a group of package records into a single record.

        The result is a shallow copy of the first record in *package_group*
        (the input records are not modified) with three keys overwritten:

        * ``releases`` - sorted distinct release values of the group, where a
          record's release is its ``release`` value or, when that is ``None``
          or absent, its ``type`` value. ``None`` sorts last.
        * ``alerts`` - one entry per distinct ``alert_key``; the first alert
          seen supplies ``key``, ``props`` and ``action``, and each entry
          lists the releases it was seen in, in encounter order.
        * ``purl`` - ``inputPurl`` as-is when it already carries a version,
          ``inputPurl@version`` when it does not, otherwise a purl built from
          ``type``, ``namespace``, ``name`` and ``version``.

        Raises:
            IndexError: if *package_group* is empty.
            KeyError: if an alert lacks ``key``, ``type``, ``severity``,
                ``category`` or ``action``.
        """
        merged_alerts: Dict[tuple, dict] = {}
        releases = set()

        for pkg in package_group:
            release = pkg.get("release")
            if release is None:
                release = pkg.get("type")
            releases.add(release)

            # ``alerts`` may be absent or explicitly None; treat both as empty.
            for alert in pkg.get("alerts") or []:
                identity = Dedupe.alert_key(alert)
                existing = merged_alerts.get(identity)
                if existing is None:
                    merged_alerts[identity] = {
                        "key": alert["key"],  # keep the first key seen
                        "type": alert["type"],
                        "severity": alert["severity"],
                        "category": alert["category"],
                        "file": Dedupe.normalize_file_path(alert.get("file")),
                        "start": alert.get("start"),
                        "end": alert.get("end"),
                        "releases": [release],
                        "props": alert.get("props", []),
                        "action": alert["action"],
                    }
                elif release not in existing["releases"]:
                    existing["releases"].append(release)

        # Work on a copy so the caller's first record is left untouched.
        base = dict(package_group[0])
        # None cannot be ordered against str, so push it to the end explicitly.
        base["releases"] = sorted(
            releases,
            key=lambda value: (value is None, "" if value is None else value),
        )
        base["alerts"] = list(merged_alerts.values())

        input_purl = base.get("inputPurl")
        if input_purl and Dedupe._purl_has_version(input_purl):
            # inputPurl has a version, use it as-is.
            base["purl"] = input_purl
            return base

        version = base.get("version", "0.0.0")
        if input_purl:
            # inputPurl exists but lacks a version, append it.
            base["purl"] = f"{input_purl}@{version}"
            return base

        # No usable inputPurl: construct the purl from its components.
        purl_type = base.get("type", "unknown")
        namespace = base.get("namespace")
        name = base.get("name", "unknown")
        if namespace:
            base["purl"] = f"pkg:{purl_type}/{namespace}/{name}@{version}"
        else:
            base["purl"] = f"pkg:{purl_type}/{name}@{version}"
        return base

    @staticmethod
    def dedupe(packages: List[Dict[str, Any]], batched: bool = True) -> List[Dict[str, Any]]:
        """Group *packages* by input purl and merge each group into one record.

        Args:
            packages: package records, optionally nested one level in lists.
            batched: unused; kept only for backward compatibility, grouping
                is always done by ``inputPurl``.

        Returns:
            One merged record per group, with ``batchIndex`` removed.
        """
        grouped = Dedupe.consolidate_by_input_purl(packages)
        results = []
        for group in grouped.values():
            merged = Dedupe.consolidate_and_merge_alerts(group)
            merged.pop("batchIndex", None)
            results.append(merged)
        return results

    @staticmethod
    def consolidate_by_input_purl(packages: List[Dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
        """Group packages by their ``inputPurl`` field.

        Elements of *packages* that are themselves lists are flattened one
        level. The group key is the record's ``inputPurl``; when that is
        missing or empty the ``purl`` is used, and as a last resort a hash of
        the record's string form.
        """
        grouped: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
        for entry in packages or []:
            members = entry if isinstance(entry, list) else [entry]
            for pkg in members:
                # ``or`` chaining keeps the hash fallback lazy.
                group_key = pkg.get("inputPurl") or pkg.get("purl") or str(hash(str(pkg)))
                grouped[group_key].append(pkg)
        return grouped