From d9c1d6c30f194a4cd0a4c1d772d5976748c0ecf2 Mon Sep 17 00:00:00 2001 From: Tedsig42 Date: Sun, 15 Mar 2026 00:31:53 +0000 Subject: [PATCH 1/2] Add CloudVulnDB importer - add CloudVulnDB v2 importer pipeline - register importer - add tests and fixtures - ignore setup.py in pytest collection --- pyproject.toml | 5 + vulnerabilities/importers/__init__.py | 2 + .../v2_importers/cloudvulndb_importer.py | 171 ++++++++++++++++++ .../tests/test_cloudvulndb_importer.py | 64 +++++++ .../cloudvulndb/cloudvulndb_rss_mock.xml | 22 +++ ...expected_cloudvulndb_advisory_output1.json | 21 +++ 6 files changed, 285 insertions(+) create mode 100644 vulnerabilities/pipelines/v2_importers/cloudvulndb_importer.py create mode 100644 vulnerabilities/tests/test_cloudvulndb_importer.py create mode 100644 vulnerabilities/tests/test_data/cloudvulndb/cloudvulndb_rss_mock.xml create mode 100644 vulnerabilities/tests/test_data/cloudvulndb/expected_cloudvulndb_advisory_output1.json diff --git a/pyproject.toml b/pyproject.toml index 6b1d8c0d5..88e1ee5c3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,6 +44,11 @@ addopts = [ "-rfExXw", "--strict-markers", "--doctest-modules", + # setup.py imports setuptools which is not available in the Docker runtime + # image. Without this, pytest (which uses python_files = "*.py") tries to + # collect setup.py as a test module and crashes with exit code 2. 
+ "--ignore=setup.py", + "--ignore-glob=*/setup.py", # Ignore the following doctests until these files are migrated to # import-improve structure "--ignore=vulnerabilities/importers/apache_httpd.py", diff --git a/vulnerabilities/importers/__init__.py b/vulnerabilities/importers/__init__.py index 594021092..6b42b4409 100644 --- a/vulnerabilities/importers/__init__.py +++ b/vulnerabilities/importers/__init__.py @@ -47,6 +47,7 @@ from vulnerabilities.pipelines.v2_importers import apache_kafka_importer as apache_kafka_importer_v2 from vulnerabilities.pipelines.v2_importers import apache_tomcat_importer as apache_tomcat_v2 from vulnerabilities.pipelines.v2_importers import archlinux_importer as archlinux_importer_v2 +from vulnerabilities.pipelines.v2_importers import cloudvulndb_importer as cloudvulndb_importer_v2 from vulnerabilities.pipelines.v2_importers import collect_fix_commits as collect_fix_commits_v2 from vulnerabilities.pipelines.v2_importers import curl_importer as curl_importer_v2 from vulnerabilities.pipelines.v2_importers import debian_importer as debian_importer_v2 @@ -109,6 +110,7 @@ project_kb_msr2019_importer_v2.ProjectKBMSR2019Pipeline, ruby_importer_v2.RubyImporterPipeline, epss_importer_v2.EPSSImporterPipeline, + cloudvulndb_importer_v2.CloudVulnDBImporterPipeline, gentoo_importer_v2.GentooImporterPipeline, nginx_importer_v2.NginxImporterPipeline, debian_importer_v2.DebianImporterPipeline, diff --git a/vulnerabilities/pipelines/v2_importers/cloudvulndb_importer.py b/vulnerabilities/pipelines/v2_importers/cloudvulndb_importer.py new file mode 100644 index 000000000..6b87f7baf --- /dev/null +++ b/vulnerabilities/pipelines/v2_importers/cloudvulndb_importer.py @@ -0,0 +1,171 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. 
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import hashlib
import json
import logging
from typing import Dict
from typing import Iterable
from typing import List
from typing import Optional
from typing import Union
from urllib.parse import urlparse
from xml.etree import ElementTree

from dateutil import parser as dateutil_parser

from vulnerabilities.importer import AdvisoryDataV2
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
from vulnerabilities.utils import fetch_response
from vulnerabilities.utils import find_all_cve

logger = logging.getLogger(__name__)

CLOUDVULNDB_RSS_URL = "https://www.cloudvulndb.org/rss/feed.xml"

# (item dictionary key, RSS child tag) pairs extracted from every <item>.
_RSS_ITEM_FIELDS = (
    ("title", "title"),
    ("link", "link"),
    ("description", "description"),
    ("pub_date", "pubDate"),
    ("guid", "guid"),
)


class CloudVulnDBImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
    """
    Collect cloud vulnerabilities from the public CloudVulnDB RSS feed.

    The feed is fetched and parsed at most once per pipeline instance:
    ``advisories_count`` and ``collect_advisories`` both read the items cached
    by ``get_feed_items``.
    """

    pipeline_id = "cloudvulndb_importer"
    spdx_license_expression = "CC-BY-4.0"
    license_url = "https://github.com/wiz-sec/open-cvdb/blob/main/LICENSE.md"
    repo_url = "https://github.com/wiz-sec/open-cvdb"
    precedence = 200

    # Parsed feed items; stays ``None`` until ``get_feed_items`` is first called.
    _cached_items = None

    @classmethod
    def steps(cls):
        """Return the pipeline steps: a single collect-and-store step."""
        return (cls.collect_and_store_advisories,)

    def get_feed_items(self):
        """
        Return the list of parsed feed items, fetching the feed on first use.

        An unparsable feed is cached as an empty list, so it is not re-fetched
        by later calls on the same instance.
        """
        if self._cached_items is None:
            response = fetch_response(CLOUDVULNDB_RSS_URL)
            # Hand the parser the raw bytes so the encoding declared by the XML
            # document itself is honoured, instead of text that was already
            # decoded with a charset guessed from the HTTP headers.
            # NOTE(review): assumes fetch_response returns a requests-style
            # response exposing ``.content`` next to ``.text`` -- confirm.
            self._cached_items = parse_rss_feed(response.content)
        return self._cached_items

    def advisories_count(self) -> int:
        """Return the number of items found in the feed."""
        return len(self.get_feed_items())

    def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
        """Yield one AdvisoryDataV2 per feed item that has a usable identifier."""
        for item in self.get_feed_items():
            advisory = parse_advisory_data(item)
            if advisory:
                yield advisory


def parse_rss_feed(xml_text: Union[str, bytes]) -> List[Dict[str, str]]:
    """
    Parse CloudVulnDB RSS XML and return a list of item dictionaries.
    Each dictionary has ``title``, ``link``, ``description``, ``pub_date`` and ``guid`` keys.

    ``xml_text`` may be text or raw bytes. Values are whitespace-stripped and a
    missing or empty child element yields an empty string. Return an empty list,
    after logging an error, when the input is empty, is not well-formed XML or
    has no ``<channel>`` element.
    """
    if not xml_text:
        logger.error("CloudVulnDB RSS feed is empty")
        return []

    # NOTE: the standard-library parser is not hardened against maliciously
    # constructed XML; the only caller in this module feeds it the fixed
    # CLOUDVULNDB_RSS_URL document.
    try:
        root = ElementTree.fromstring(xml_text)
    except ElementTree.ParseError as e:
        logger.error("Failed to parse CloudVulnDB RSS XML: %s", e)
        return []

    channel = root.find("channel")
    if channel is None:
        logger.error("CloudVulnDB RSS feed has no <channel> element")
        return []

    return [
        {key: (item_el.findtext(tag) or "").strip() for key, tag in _RSS_ITEM_FIELDS}
        for item_el in channel.findall("item")
    ]


def parse_advisory_data(item: dict) -> Optional[AdvisoryDataV2]:
    """
    Parse one CloudVulnDB item and return an AdvisoryDataV2 object.
    Since the RSS feed does not provide package/version coordinates, ``affected_packages`` is empty.

    Return None, after logging an error, when no identifier can be derived from
    the item. A publication date that cannot be parsed is logged as a warning
    and left unset rather than dropping the advisory.
    """
    title = item.get("title") or ""
    link = item.get("link") or ""
    description = item.get("description") or ""
    pub_date = item.get("pub_date") or ""
    guid = item.get("guid") or ""

    advisory_id = get_advisory_id(guid=guid, link=link, title=title, pub_date=pub_date)
    if not advisory_id:
        logger.error("Skipping advisory with no usable identifier: %r", item)
        return None

    # dict.fromkeys() drops duplicates while keeping first-seen order; the
    # advisory's own identifier is never listed as one of its aliases.
    aliases = [
        alias
        for alias in dict.fromkeys(find_all_cve(f"{title}\n{description}"))
        if alias != advisory_id
    ]

    date_published = None
    if pub_date:
        try:
            date_published = dateutil_parser.parse(pub_date)
        except (ValueError, OverflowError) as e:
            # dateutil reports unparsable strings as ValueError (ParserError is
            # a subclass) and out-of-range dates as OverflowError.
            logger.warning("Could not parse date %r for advisory %s: %s", pub_date, advisory_id, e)

    references = [ReferenceV2(url=link)] if link else []

    return AdvisoryDataV2(
        advisory_id=advisory_id,
        aliases=aliases,
        summary=title or description,
        affected_packages=[],
        references=references,
        date_published=date_published,
        url=link or CLOUDVULNDB_RSS_URL,
        original_advisory_text=json.dumps(item, indent=2, ensure_ascii=False),
    )


def get_advisory_id(guid: str, link: str, title: str, pub_date: str) -> str:
    """
    Return a stable advisory identifier using the best available source.
    Preference order is GUID, link slug, then deterministic content hash fallback.

    The hash fallback is ``cloudvulndb-`` followed by the first 16 hex digits of
    the SHA-256 of ``"<title>|<pub_date>"``. Return an empty string when every
    input is empty. ``None`` is treated like an empty string for all arguments.
    """
    guid = (guid or "").strip()
    if guid:
        return guid

    slug = advisory_slug_from_link(link)
    if slug:
        return slug

    title = (title or "").strip()
    pub_date = (pub_date or "").strip()
    if not title and not pub_date:
        return ""

    fingerprint_source = f"{title}|{pub_date}"
    digest = hashlib.sha256(fingerprint_source.encode("utf-8")).hexdigest()[:16]
    return f"cloudvulndb-{digest}"


def advisory_slug_from_link(link: str) -> str:
    """
    Extract an advisory slug from a CloudVulnDB URL path.

    The slug is the last non-empty ``/``-separated segment of the URL path, so a
    trailing slash is ignored. Return an empty string when ``link`` is empty,
    cannot be parsed as a URL or has no path segment.
    """
    if not link:
        return ""

    try:
        path = urlparse(link).path
    except ValueError:
        # urlparse() rejects malformed URLs (such as a broken IPv6 host) with
        # ValueError.
        return ""

    parts = [part for part in path.split("/") if part]
    return parts[-1].strip() if parts else ""


# The original line holding the end of this module also carried the start of the
# next file in the patch; that text is kept verbatim below:
#
# diff --git a/vulnerabilities/tests/test_cloudvulndb_importer.py b/vulnerabilities/tests/test_cloudvulndb_importer.py
# new file mode 100644
# index 000000000..9f717fe51
# --- /dev/null
# +++ b/vulnerabilities/tests/test_cloudvulndb_importer.py
# @@ -0,0 +1,64 @@
# +#
# +# Copyright (c) nexB Inc. and others. All rights reserved.
# +# VulnerableCode is a trademark of nexB Inc.
# +# SPDX-License-Identifier: Apache-2.0
# +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# +# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# +# See https://aboutcode.org for more information about nexB OSS projects.
+# + +import os +from unittest import TestCase + +from vulnerabilities.pipelines.v2_importers.cloudvulndb_importer import advisory_slug_from_link +from vulnerabilities.pipelines.v2_importers.cloudvulndb_importer import get_advisory_id +from vulnerabilities.pipelines.v2_importers.cloudvulndb_importer import parse_advisory_data +from vulnerabilities.pipelines.v2_importers.cloudvulndb_importer import parse_rss_feed +from vulnerabilities.tests import util_tests + +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) +TEST_DATA = os.path.join(BASE_DIR, "test_data/cloudvulndb") + + +def _load_rss(filename="cloudvulndb_rss_mock.xml"): + with open(os.path.join(TEST_DATA, filename), encoding="utf-8") as f: + return f.read() + + +class TestCloudVulnDBImporter(TestCase): + def test_parse_rss_feed_returns_correct_item_count(self): + items = parse_rss_feed(_load_rss()) + self.assertEqual(len(items), 2) + + def test_parse_advisory_with_guid_and_cves(self): + items = parse_rss_feed(_load_rss()) + result = parse_advisory_data(items[0]) + self.assertIsNotNone(result) + result_dict = result.to_dict() + expected_file = os.path.join(TEST_DATA, "expected_cloudvulndb_advisory_output1.json") + util_tests.check_results_against_json(result_dict, expected_file) + + def test_parse_advisory_without_guid_falls_back_to_link_slug(self): + items = parse_rss_feed(_load_rss()) + result = parse_advisory_data(items[1]) + self.assertIsNotNone(result) + self.assertEqual(result.advisory_id, "azure-imds-ssrf") + self.assertEqual(result.aliases, []) + + def test_get_advisory_id_hash_fallback(self): + advisory_id = get_advisory_id( + guid="", + link="", + title="Example advisory title", + pub_date="Mon, 08 Jul 2024 00:00:00 GMT", + ) + self.assertTrue(advisory_id.startswith("cloudvulndb-")) + self.assertEqual(len(advisory_id), len("cloudvulndb-") + 16) + + def test_parse_rss_feed_invalid_xml_returns_empty(self): + result = parse_rss_feed("not valid xml <>>>") + self.assertEqual(result, []) + + def 
test_advisory_slug_from_link(self): + slug = advisory_slug_from_link("https://www.cloudvulndb.org/vulnerabilities/aws-example/") + self.assertEqual(slug, "aws-example") diff --git a/vulnerabilities/tests/test_data/cloudvulndb/cloudvulndb_rss_mock.xml b/vulnerabilities/tests/test_data/cloudvulndb/cloudvulndb_rss_mock.xml new file mode 100644 index 000000000..1d2421e57 --- /dev/null +++ b/vulnerabilities/tests/test_data/cloudvulndb/cloudvulndb_rss_mock.xml @@ -0,0 +1,22 @@ + + + + CloudVulnDB RSS + https://www.cloudvulndb.org + Cloud vulnerabilities and security issues + + <![CDATA[AWS Example Privilege Escalation (CVE-2024-11111)]]> + https://www.cloudvulndb.org/vulnerabilities/aws-example-privilege-escalation + CLOUD-2024-0001 + Tue, 04 Jun 2024 12:30:00 GMT + + + + <![CDATA[Azure IMDS SSRF Exposure]]> + https://www.cloudvulndb.org/vulnerabilities/azure-imds-ssrf + + Fri, 05 Jul 2024 08:00:00 GMT + + + + diff --git a/vulnerabilities/tests/test_data/cloudvulndb/expected_cloudvulndb_advisory_output1.json b/vulnerabilities/tests/test_data/cloudvulndb/expected_cloudvulndb_advisory_output1.json new file mode 100644 index 000000000..8baf2b463 --- /dev/null +++ b/vulnerabilities/tests/test_data/cloudvulndb/expected_cloudvulndb_advisory_output1.json @@ -0,0 +1,21 @@ +{ + "advisory_id": "CLOUD-2024-0001", + "aliases": [ + "CVE-2024-11111", + "CVE-2024-22222" + ], + "summary": "AWS Example Privilege Escalation (CVE-2024-11111)", + "affected_packages": [], + "references": [ + { + "reference_id": "", + "reference_type": "", + "url": "https://www.cloudvulndb.org/vulnerabilities/aws-example-privilege-escalation" + } + ], + "patches": [], + "severities": [], + "date_published": "2024-06-04T12:30:00+00:00", + "weaknesses": [], + "url": "https://www.cloudvulndb.org/vulnerabilities/aws-example-privilege-escalation" +} From 19709372d7d5437ec79c880678bd9d27256b5fe8 Mon Sep 17 00:00:00 2001 From: Tedsig42 Date: Sun, 15 Mar 2026 00:39:19 +0000 Subject: [PATCH 2/2] Potential fix for pull 
request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- vulnerabilities/pipelines/v2_importers/cloudvulndb_importer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vulnerabilities/pipelines/v2_importers/cloudvulndb_importer.py b/vulnerabilities/pipelines/v2_importers/cloudvulndb_importer.py index 6b87f7baf..9379f1126 100644 --- a/vulnerabilities/pipelines/v2_importers/cloudvulndb_importer.py +++ b/vulnerabilities/pipelines/v2_importers/cloudvulndb_importer.py @@ -30,7 +30,7 @@ class CloudVulnDBImporterPipeline(VulnerableCodeBaseImporterPipelineV2): """Collect cloud vulnerabilities from the public CloudVulnDB RSS feed.""" - pipeline_id = "cloudvulndb_importer" + pipeline_id = "cloudvulndb_importer_v2" spdx_license_expression = "CC-BY-4.0" license_url = "https://github.com/wiz-sec/open-cvdb/blob/main/LICENSE.md" repo_url = "https://github.com/wiz-sec/open-cvdb"