Skip to content

Commit af6e58f

Browse files
committed
feat: cdx vex
1 parent cba1ae4 commit af6e58f

7 files changed

Lines changed: 550 additions & 1 deletion

File tree

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Generated by Django 5.2.4 on 2025-07-30 12:57
2+
3+
from django.db import migrations, models
4+
5+
6+
class Migration(migrations.Migration):
7+
8+
dependencies = [
9+
("vex", "0007_alter_csaf_tracking_current_release_date_and_more"),
10+
]
11+
12+
operations = [
13+
migrations.AlterField(
14+
model_name="vex_document",
15+
name="type",
16+
field=models.CharField(
17+
choices=[("CSAF", "CSAF"), ("OpenVEX", "OpenVEX"), ("CycloneDX", "CycloneDX")], max_length=16
18+
),
19+
),
20+
]
Lines changed: 323 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,323 @@
1+
from dataclasses import dataclass
2+
from typing import Optional
3+
4+
from rest_framework.exceptions import ValidationError
5+
6+
from application.core.api.serializers_helpers import validate_purl
7+
from application.vex.models import VEX_Document, VEX_Statement
8+
from application.vex.services.vex_engine import apply_vex_statements_after_import
9+
from application.vex.types import (
10+
CycloneDX_Analysis_State,
11+
VEX_Document_Type,
12+
VEX_Status,
13+
)
14+
15+
16+
@dataclass
17+
class CycloneDX_Analysis:
18+
state: str = ""
19+
justification: str = ""
20+
response: list[str] = None
21+
detail: str = ""
22+
first_issued: str = ""
23+
last_updated: str = ""
24+
25+
def __post_init__(self):
26+
if self.response is None:
27+
self.response = []
28+
29+
30+
def parse_cyclonedx_data(data: dict) -> None:
31+
cyclonedx_document = _create_cyclonedx_document(data)
32+
33+
product_purls, vex_statements = _process_vex_statements(data, cyclonedx_document)
34+
35+
apply_vex_statements_after_import(product_purls, vex_statements)
36+
37+
38+
def _create_cyclonedx_document(data: dict) -> VEX_Document:
39+
# Use serial number as document ID, fallback to a generated one
40+
document_id = data.get("serialNumber")
41+
if not document_id:
42+
bom_format = data.get("bomFormat", "CycloneDX")
43+
spec_version = data.get("specVersion", "1.0")
44+
document_id = f"{bom_format}-{spec_version}-{hash(str(data))}"
45+
46+
version = str(data.get("version", 1))
47+
48+
# Extract metadata for author and timestamps
49+
metadata = data.get("metadata", {})
50+
51+
# Get timestamp from metadata or use current time
52+
timestamp = metadata.get("timestamp")
53+
if not timestamp:
54+
# If no timestamp, use a default ISO format string
55+
from datetime import datetime
56+
timestamp = datetime.now().isoformat() + "Z"
57+
58+
# Extract author from tools or component
59+
author = "Unknown"
60+
tools = metadata.get("tools", [])
61+
if tools:
62+
if isinstance(tools, list) and tools:
63+
author = tools[0].get("name", "Unknown")
64+
elif isinstance(tools, dict):
65+
components = tools.get("components", [])
66+
if components:
67+
author = components[0].get("name", "Unknown")
68+
69+
# If no author from tools, try to get from component
70+
if author == "Unknown":
71+
component = metadata.get("component", {})
72+
if component:
73+
author = component.get("name", "Unknown")
74+
75+
try:
76+
cyclonedx_document = VEX_Document.objects.get(document_id=document_id, author=author)
77+
cyclonedx_document.delete()
78+
except VEX_Document.DoesNotExist:
79+
pass
80+
81+
cyclonedx_document = VEX_Document.objects.create(
82+
type=VEX_Document_Type.VEX_DOCUMENT_TYPE_CYCLONEDX,
83+
document_id=document_id,
84+
version=version,
85+
initial_release_date=timestamp,
86+
current_release_date=timestamp,
87+
author=author,
88+
role="", # CycloneDX doesn't have explicit roles
89+
)
90+
91+
return cyclonedx_document
92+
93+
94+
def _process_vex_statements(data: dict, cyclonedx_document: VEX_Document) -> tuple[set[str], set[VEX_Statement]]:
95+
vulnerabilities = data.get("vulnerabilities", [])
96+
if not vulnerabilities:
97+
raise ValidationError("CycloneDX document doesn't contain any vulnerabilities")
98+
if not isinstance(vulnerabilities, list):
99+
raise ValidationError("vulnerabilities is not a list")
100+
101+
# Build components mapping
102+
components_map = _build_components_map(data)
103+
104+
product_purls: set[str] = set()
105+
vex_statements: set[VEX_Statement] = set()
106+
107+
vulnerability_counter = 0
108+
for vulnerability in vulnerabilities:
109+
if not isinstance(vulnerability, dict):
110+
raise ValidationError(f"vulnerability[{vulnerability_counter}] is not a dictionary")
111+
112+
vulnerability_id = vulnerability.get("id")
113+
if not vulnerability_id:
114+
raise ValidationError(f"vulnerability[{vulnerability_counter}]/id is missing")
115+
116+
analysis = vulnerability.get("analysis", {})
117+
if not analysis:
118+
# Skip vulnerabilities without analysis (not VEX statements)
119+
vulnerability_counter += 1
120+
continue
121+
122+
cyclonedx_analysis = _parse_analysis(analysis, vulnerability_counter)
123+
124+
# Map CycloneDX state to VEX status
125+
vex_status = _map_cyclonedx_state_to_vex_status(cyclonedx_analysis.state)
126+
if not vex_status:
127+
raise ValidationError(f"vulnerability[{vulnerability_counter}]/analysis/state is not valid: {cyclonedx_analysis.state}")
128+
129+
description = vulnerability.get("description", "")
130+
detail = vulnerability.get("detail", "")
131+
if detail:
132+
description += f"\n\n{detail}"
133+
134+
# Build justification from CycloneDX justification
135+
justification = _map_cyclonedx_justification_to_vex_justification(cyclonedx_analysis.justification)
136+
137+
# Build remediation from response and recommendation
138+
remediation = _build_remediation_text(cyclonedx_analysis.response, vulnerability.get("recommendation", ""))
139+
140+
# Process affected components
141+
affects = vulnerability.get("affects", [])
142+
if not affects:
143+
# If no affects, this is a general statement - we'll need a product PURL
144+
# Try to extract from metadata component
145+
component = data.get("metadata", {}).get("component", {})
146+
product_purl = component.get("purl", "")
147+
if product_purl:
148+
validate_purl(product_purl)
149+
_create_vex_statement(
150+
cyclonedx_document, vulnerability_id, description, vex_status,
151+
justification, cyclonedx_analysis.detail, remediation,
152+
product_purl, "", product_purls, vex_statements
153+
)
154+
else:
155+
_process_affected_components(
156+
cyclonedx_document=cyclonedx_document,
157+
product_purls=product_purls,
158+
vex_statements=vex_statements,
159+
vulnerability_counter=vulnerability_counter,
160+
vulnerability_id=vulnerability_id,
161+
description=description,
162+
vex_status=vex_status,
163+
justification=justification,
164+
impact=cyclonedx_analysis.detail,
165+
remediation=remediation,
166+
affects=affects,
167+
components_map=components_map,
168+
)
169+
170+
vulnerability_counter += 1
171+
172+
return product_purls, vex_statements
173+
174+
175+
def _build_components_map(data: dict) -> dict[str, dict]:
176+
"""Build a mapping of bom-ref to component data for quick lookup."""
177+
components_map = {}
178+
179+
# Add root component from metadata
180+
metadata_component = data.get("metadata", {}).get("component")
181+
if metadata_component and metadata_component.get("bom-ref"):
182+
components_map[metadata_component["bom-ref"]] = metadata_component
183+
184+
# Add all components
185+
for component in data.get("components", []):
186+
if component.get("bom-ref"):
187+
components_map[component["bom-ref"]] = component
188+
189+
return components_map
190+
191+
192+
def _parse_analysis(analysis: dict, vulnerability_counter: int) -> CycloneDX_Analysis:
193+
state = analysis.get("state", "")
194+
if not state:
195+
raise ValidationError(f"vulnerability[{vulnerability_counter}]/analysis/state is missing")
196+
197+
justification = analysis.get("justification", "")
198+
response = analysis.get("response", [])
199+
if not isinstance(response, list):
200+
response = []
201+
202+
detail = analysis.get("detail", "")
203+
first_issued = analysis.get("firstIssued", "")
204+
last_updated = analysis.get("lastUpdated", "")
205+
206+
return CycloneDX_Analysis(
207+
state=state,
208+
justification=justification,
209+
response=response,
210+
detail=detail,
211+
first_issued=first_issued,
212+
last_updated=last_updated,
213+
)
214+
215+
216+
def _map_cyclonedx_state_to_vex_status(state: str) -> Optional[str]:
217+
"""Map CycloneDX analysis state to VEX status."""
218+
mapping = {
219+
CycloneDX_Analysis_State.CYCLONEDX_STATE_RESOLVED: VEX_Status.VEX_STATUS_FIXED,
220+
CycloneDX_Analysis_State.CYCLONEDX_STATE_RESOLVED_WITH_PEDIGREE: VEX_Status.VEX_STATUS_FIXED,
221+
CycloneDX_Analysis_State.CYCLONEDX_STATE_EXPLOITABLE: VEX_Status.VEX_STATUS_AFFECTED,
222+
CycloneDX_Analysis_State.CYCLONEDX_STATE_IN_TRIAGE: VEX_Status.VEX_STATUS_UNDER_INVESTIGATION,
223+
CycloneDX_Analysis_State.CYCLONEDX_STATE_FALSE_POSITIVE: VEX_Status.VEX_STATUS_NOT_AFFECTED,
224+
CycloneDX_Analysis_State.CYCLONEDX_STATE_NOT_AFFECTED: VEX_Status.VEX_STATUS_NOT_AFFECTED,
225+
}
226+
return mapping.get(state)
227+
228+
229+
def _map_cyclonedx_justification_to_vex_justification(justification: str) -> str:
230+
"""Map CycloneDX justification to VEX justification if possible."""
231+
# CycloneDX doesn't have standardized justification values like OpenVEX
232+
# We'll pass through the justification as is, or return empty string
233+
return justification if justification else ""
234+
235+
236+
def _build_remediation_text(response: list[str], recommendation: str) -> str:
237+
"""Build remediation text from response actions and recommendation."""
238+
remediation_parts = []
239+
240+
if response:
241+
response_text = ", ".join(response)
242+
remediation_parts.append(f"Response: {response_text}")
243+
244+
if recommendation:
245+
remediation_parts.append(f"Recommendation: {recommendation}")
246+
247+
return "; ".join(remediation_parts)
248+
249+
250+
def _process_affected_components(
251+
*,
252+
cyclonedx_document: VEX_Document,
253+
product_purls: set,
254+
vex_statements: set,
255+
vulnerability_counter: int,
256+
vulnerability_id: str,
257+
description: str,
258+
vex_status: str,
259+
justification: str,
260+
impact: str,
261+
remediation: str,
262+
affects: list,
263+
components_map: dict,
264+
) -> None:
265+
affected_counter = 0
266+
for affected in affects:
267+
if not isinstance(affected, dict):
268+
raise ValidationError(f"affects[{vulnerability_counter}][{affected_counter}] is not a dictionary")
269+
270+
ref = affected.get("ref")
271+
if not ref:
272+
raise ValidationError(f"affects[{vulnerability_counter}][{affected_counter}]/ref is missing")
273+
274+
# Look up component by bom-ref
275+
component = components_map.get(ref)
276+
if not component:
277+
# Skip if we can't find the component
278+
affected_counter += 1
279+
continue
280+
281+
# Extract PURL from component
282+
component_purl = component.get("purl", "")
283+
if component_purl:
284+
validate_purl(component_purl)
285+
286+
# For affected components, we'll use the component PURL as both product and component
287+
_create_vex_statement(
288+
cyclonedx_document, vulnerability_id, description, vex_status,
289+
justification, impact, remediation,
290+
component_purl, component_purl, product_purls, vex_statements
291+
)
292+
293+
affected_counter += 1
294+
295+
296+
def _create_vex_statement(
297+
cyclonedx_document: VEX_Document,
298+
vulnerability_id: str,
299+
description: str,
300+
vex_status: str,
301+
justification: str,
302+
impact: str,
303+
remediation: str,
304+
product_purl: str,
305+
component_purl: str,
306+
product_purls: set,
307+
vex_statements: set,
308+
) -> None:
309+
"""Create and save a VEX statement."""
310+
vex_statement = VEX_Statement(
311+
document=cyclonedx_document,
312+
vulnerability_id=vulnerability_id,
313+
description=description,
314+
status=vex_status,
315+
justification=justification,
316+
impact=impact,
317+
remediation=remediation,
318+
product_purl=product_purl,
319+
component_purl=component_purl,
320+
)
321+
vex_statement.save()
322+
vex_statements.add(vex_statement)
323+
product_purls.add(product_purl)

backend/application/vex/services/openvex_parser.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ def _process_vex_statements(data: dict, openvex_document: VEX_Document) -> tuple
8282
openvex_statement.vulnerability_id = statement.get("vulnerability", {}).get("name")
8383
if not openvex_statement.vulnerability_id:
8484
raise ValidationError(f"vulnerability[{statement_counter}]/name is missing")
85-
openvex_statement.description = statement.get("vulnerability", {}).get("description")
85+
openvex_statement.description = statement.get("vulnerability", {}).get("description", "")
8686
openvex_statement.status = statement.get("status", "")
8787
if not openvex_statement.status:
8888
raise ValidationError(f"status[{statement_counter}] is missing")

backend/application/vex/services/vex_import.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from rest_framework.exceptions import ValidationError
66

77
from application.vex.services.csaf_parser import parse_csaf_data
8+
from application.vex.services.cyclonedx_parser import parse_cyclonedx_data
89
from application.vex.services.openvex_parser import parse_openvex_data
910
from application.vex.types import VEX_Document_Type
1011

@@ -22,6 +23,8 @@ def import_vex(vex_file: File) -> None:
2223
parse_openvex_data(data)
2324
elif vex_type == VEX_Document_Type.VEX_DOCUMENT_TYPE_CSAF:
2425
parse_csaf_data(data)
26+
elif vex_type == VEX_Document_Type.VEX_DOCUMENT_TYPE_CYCLONEDX:
27+
parse_cyclonedx_data(data)
2528

2629

2730
def _get_json_data(vex_file: File) -> Optional[dict]:
@@ -40,4 +43,7 @@ def _get_vex_type(data: dict) -> Optional[str]:
4043
if data.get("document", {}).get("category") == "csaf_vex" and data.get("document", {}).get("csaf_version") == "2.0":
4144
return VEX_Document_Type.VEX_DOCUMENT_TYPE_CSAF
4245

46+
if data.get("bomFormat") == "CycloneDX":
47+
return VEX_Document_Type.VEX_DOCUMENT_TYPE_CYCLONEDX
48+
4349
return None

0 commit comments

Comments
 (0)