diff --git a/component_catalog/tests/test_views.py b/component_catalog/tests/test_views.py index 5efafcb1..1dc00aa2 100644 --- a/component_catalog/tests/test_views.py +++ b/component_catalog/tests/test_views.py @@ -3064,37 +3064,6 @@ def test_vulnerablecode_get_vulnerable_purls(self): vulnerable_purls = vulnerablecode.get_vulnerable_purls(packages=[self.package1]) self.assertEqual(["pkg:pypi/django@2.1"], vulnerable_purls) - def test_vulnerablecode_get_vulnerable_cpes(self): - vulnerablecode = VulnerableCode(self.dataspace) - vulnerable_cpes = vulnerablecode.get_vulnerable_cpes(components=[]) - self.assertEqual([], vulnerable_cpes) - - components = [self.component1, self.component2] - vulnerable_cpes = vulnerablecode.get_vulnerable_cpes(components=components) - self.assertEqual([], vulnerable_cpes) - - self.component1.cpe = "cpe:2.3:a:djangoproject:django:0.95:*:*:*:*:*:*:*" - self.component1.save() - - with mock.patch( - "dejacode_toolkit.vulnerablecode.VulnerableCode.bulk_search_by_cpes" - ) as bulk_search: - bulk_search.return_value = [ - { - "vulnerability_id": "VCID-188m-1bke-aaae", - "summary": "The administrative interface in django.contrib.admin ", - "references": [ - {"reference_id": ""}, - ], - } - ] - vulnerable_cpes = vulnerablecode.get_vulnerable_cpes(components=components) - self.assertEqual([], vulnerable_cpes) - - bulk_search.return_value[0]["references"] = [{"reference_id": self.component1.cpe}] - vulnerable_cpes = vulnerablecode.get_vulnerable_cpes(components=components) - self.assertEqual([self.component1.cpe], vulnerable_cpes) - @mock.patch("dejacode_toolkit.vulnerablecode.VulnerableCode.request_get") def test_vulnerablecode_get_vulnerabilities_cache(self, mock_request_get): vulnerablecode = VulnerableCode(self.dataspace) diff --git a/dejacode_toolkit/__init__.py b/dejacode_toolkit/__init__.py index a7c23cc8..13e500d1 100644 --- a/dejacode_toolkit/__init__.py +++ b/dejacode_toolkit/__init__.py @@ -27,7 +27,7 @@ def get_settings(var_name, default=None): def is_service_available(label, session, url, raise_exceptions): """Check if a configured integration service is available.""" try: - response = session.head(url, timeout=REQUESTS_TIMEOUT) + response = session.head(url, allow_redirects=True, timeout=REQUESTS_TIMEOUT) response.raise_for_status() except requests.exceptions.RequestException as request_exception: logger.debug(f"{label} is_available() error: {request_exception}") @@ -43,6 +43,7 @@ class BaseService: settings_prefix = None url_field_name = None api_key_field_name = None + api_version = None default_timeout = REQUESTS_TIMEOUT def __init__(self, dataspace): @@ -71,6 +72,9 @@ def __init__(self, dataspace): self.api_url = f"{self.service_url.rstrip('/')}/api/" + if self.api_version: + self.api_url = f"{self.api_url}{self.api_version.rstrip('/')}/" + def get_session(self): session = requests.Session() diff --git a/dejacode_toolkit/vulnerablecode.py b/dejacode_toolkit/vulnerablecode.py index 34980dcf..c93314f4 100644 --- a/dejacode_toolkit/vulnerablecode.py +++ b/dejacode_toolkit/vulnerablecode.py @@ -19,6 +19,7 @@ class VulnerableCode(BaseService): settings_prefix = "VULNERABLECODE" url_field_name = "vulnerablecode_url" api_key_field_name = "vulnerablecode_api_key" + api_version = "v3" def get_vulnerabilities( self, @@ -47,59 +48,30 @@ def get_vulnerabilities_by_purl( ): """Get list of vulnerabilities providing a package `purl`.""" return self.get_vulnerabilities( - url=f"{self.api_url}packages/", + url=f"{self.api_url}packages", field_name="purl", field_value=get_plain_purl(purl), timeout=timeout, ) - def get_vulnerabilities_by_cpe( - self, - cpe, - timeout=None, - ): - """Get list of vulnerabilities providing a package or component `cpe`.""" - return self.get_vulnerabilities( - url=f"{self.api_url}cpes/", - field_name="cpe", - field_value=cpe, - timeout=timeout, - ) - def bulk_search_by_purl( self, purls, - purl_only, + details=True, timeout=None, ): """Bulk search of vulnerabilities using the provided list of `purls`.""" - url = f"{self.api_url}packages/bulk_search" + url = f"{self.api_url}packages" data = { "purls": purls, - "purl_only": purl_only, - "plain_purl": True, + "details": details, } logger.debug(f"VulnerableCode: url={url} purls_count={len(purls)}") return self.request_post(url=url, json=data, timeout=timeout) - def bulk_search_by_cpes( - self, - cpes, - timeout=None, - ): - """Bulk search of vulnerabilities using the provided list of `cpes`.""" - url = f"{self.api_url}cpes/bulk_search" - - data = { - "cpes": cpes, - } - - logger.debug(f"VulnerableCode: url={url} cpes_count={len(cpes)}") - return self.request_post(url, json=data, timeout=timeout) - - def get_vulnerable_purls(self, packages, purl_only=True, timeout=10): + def get_vulnerable_purls(self, packages, details=False, timeout=10): """ Return a list of PURLs for which at least one `affected_by_vulnerabilities` was found in the VulnerableCodeDB for the given list of `packages`. @@ -110,34 +82,11 @@ def get_vulnerable_purls(self, packages, purl_only=True, timeout=10): return [] vulnerable_purls = self.bulk_search_by_purl( - plain_purls, - purl_only=purl_only, + purls=plain_purls, + details=details, timeout=timeout, ) - return vulnerable_purls or [] - - def get_vulnerable_cpes(self, components): - """ - Return a list of vulnerable CPEs found in the VulnerableCodeDB for the given - list of `components`. - """ - cpes = [component.cpe for component in components if component.cpe] - - if not cpes: - return [] - - search_results = self.bulk_search_by_cpes(cpes) - if not search_results: - return [] - - vulnerable_cpes = [ - reference.get("reference_id") - for entry in search_results - for reference in entry.get("references") - if reference.get("reference_id").startswith("cpe") - ] - - return list(set(vulnerable_cpes)) + return vulnerable_purls.get("results") or [] def get_package_url_available_types(self): # Replace by fetching the endpoint once available. diff --git a/vulnerabilities/fetch.py b/vulnerabilities/fetch.py index 7f286ed0..877e167f 100644 --- a/vulnerabilities/fetch.py +++ b/vulnerabilities/fetch.py @@ -14,6 +14,8 @@ from django.urls import reverse from django.utils import timezone +from packageurl import PackageURL + from component_catalog.models import PACKAGE_URL_FIELDS from component_catalog.models import Package from dejacode_toolkit.vulnerablecode import VulnerableCode @@ -84,20 +86,21 @@ def fetch_for_packages( log_func(f"Progress: {intcomma(progress_count)}/{intcomma(object_count)}") batch_affected_packages = [] - vc_entries = vulnerablecode.get_vulnerable_purls(batch, purl_only=False, timeout=timeout) + vc_entries = vulnerablecode.get_vulnerable_purls(batch, details=True, timeout=timeout) for vc_entry in vc_entries: affected_by_vulnerabilities = vc_entry.get("affected_by_vulnerabilities") if not affected_by_vulnerabilities: continue + purl = PackageURL.from_string(vc_entry.get("purl")) affected_packages = queryset.filter( - type=vc_entry.get("type"), - namespace=vc_entry.get("namespace") or "", - name=vc_entry.get("name"), - version=vc_entry.get("version") or "", + type=purl.type, + namespace=purl.namespace, + name=purl.name, + version=purl.version, ) if not affected_packages: - raise CommandError("Could not find package!") + raise CommandError("Could not find packages!") # Store all packages of that batch to then trigger the update_weighted_risk_score batch_affected_packages.extend(affected_packages) @@ -119,6 +122,9 @@ def fetch_for_packages( def create_or_update_vulnerability( vulnerability_data, dataspace, affected_packages, update, results ): + # Adapt to the previous `vulnerability_id` based system. + vulnerability_data["vulnerability_id"] = vulnerability_data["advisory_id"] + vulnerability_id = vulnerability_data["vulnerability_id"] vulnerability_qs = Vulnerability.objects.scope(dataspace) vulnerability = vulnerability_qs.get_or_none(vulnerability_id=vulnerability_id) diff --git a/vulnerabilities/models.py b/vulnerabilities/models.py index 7bf65d2f..1348ed06 100644 --- a/vulnerabilities/models.py +++ b/vulnerabilities/models.py @@ -98,6 +98,24 @@ class Vulnerability(HistoryDateFieldsMixin, DataspacedModel): "For example, 'VCID-2024-0001'." ), ) + # avid = models.CharField( + # max_length=500, + # blank=False, + # null=False, + # help_text=_( + # "Unique ID for the datasource used for this advisory ." + # "e.g.: pysec_importer_v2/PYSEC-2020-2233" + # ), + # ) + # advisory_id = models.CharField( + # max_length=500, + # blank=False, + # null=False, + # help_text=_( + # "An advisory is a unique vulnerability identifier in some database, " + # "such as PYSEC-2020-2233" + # ), + # ) resource_url = models.URLField( _("Resource URL"), max_length=1024, @@ -191,6 +209,9 @@ class Meta: unique_together = (("dataspace", "vulnerability_id"), ("dataspace", "uuid")) indexes = [ models.Index(fields=["vulnerability_id"]), + # models.Index(fields=["exploitability"]), + # models.Index(fields=["weighted_severity"]), + # models.Index(fields=["risk_score"]), models.Index(fields=["risk_level"]), ] @@ -488,16 +509,7 @@ def get_entry_for_package(self, vulnerablecode): affected_by_vulnerabilities = vulnerable_packages[0].get("affected_by_vulnerabilities") return affected_by_vulnerabilities - def get_entry_for_component(self, vulnerablecode): - if not self.cpe: - return - - # Support for Component is paused as the CPES endpoint do not work properly. - # https://github.com/aboutcode-org/vulnerablecode/issues/1557 - # vulnerabilities = vulnerablecode.get_vulnerabilities_by_cpe(self.cpe, timeout=10) - def get_entry_from_vulnerablecode(self): - from component_catalog.models import Component from component_catalog.models import Package from dejacode_toolkit.vulnerablecode import VulnerableCode @@ -513,8 +525,6 @@ def get_entry_from_vulnerablecode(self): if not is_vulnerablecode_enabled: return - if isinstance(self, Component): - return self.get_entry_for_component(vulnerablecode) elif isinstance(self, Package): return self.get_entry_for_package(vulnerablecode)