Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ addopts = [
"-rfExXw",
"--strict-markers",
"--doctest-modules",
# setup.py imports setuptools which is not available in the Docker runtime
# image. Without this, pytest (which uses python_files = "*.py") tries to
# collect setup.py as a test module and crashes with exit code 2.
"--ignore=setup.py",
"--ignore-glob=*/setup.py",
# Ignore the following doctests until these files are migrated to
# import-improve structure
"--ignore=vulnerabilities/importers/apache_httpd.py",
Expand Down
2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from vulnerabilities.pipelines.v2_importers import apache_kafka_importer as apache_kafka_importer_v2
from vulnerabilities.pipelines.v2_importers import apache_tomcat_importer as apache_tomcat_v2
from vulnerabilities.pipelines.v2_importers import archlinux_importer as archlinux_importer_v2
from vulnerabilities.pipelines.v2_importers import cloudvulndb_importer as cloudvulndb_importer_v2
from vulnerabilities.pipelines.v2_importers import collect_fix_commits as collect_fix_commits_v2
from vulnerabilities.pipelines.v2_importers import curl_importer as curl_importer_v2
from vulnerabilities.pipelines.v2_importers import debian_importer as debian_importer_v2
Expand Down Expand Up @@ -109,6 +110,7 @@
project_kb_msr2019_importer_v2.ProjectKBMSR2019Pipeline,
ruby_importer_v2.RubyImporterPipeline,
epss_importer_v2.EPSSImporterPipeline,
cloudvulndb_importer_v2.CloudVulnDBImporterPipeline,
gentoo_importer_v2.GentooImporterPipeline,
nginx_importer_v2.NginxImporterPipeline,
debian_importer_v2.DebianImporterPipeline,
Expand Down
171 changes: 171 additions & 0 deletions vulnerabilities/pipelines/v2_importers/cloudvulndb_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import hashlib
import json
import logging
from typing import Iterable
from urllib.parse import urlparse
from xml.etree import ElementTree

from dateutil import parser as dateutil_parser

from vulnerabilities.importer import AdvisoryDataV2
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
from vulnerabilities.utils import fetch_response
from vulnerabilities.utils import find_all_cve

logger = logging.getLogger(__name__)

CLOUDVULNDB_RSS_URL = "https://www.cloudvulndb.org/rss/feed.xml"


class CloudVulnDBImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
    """
    Importer pipeline that turns entries of the public CloudVulnDB RSS feed
    into advisories.

    The feed is downloaded and parsed at most once per pipeline instance; both
    the advisory count and the advisory collection read from that parsed list.
    """

    pipeline_id = "cloudvulndb_importer_v2"
    spdx_license_expression = "CC-BY-4.0"
    license_url = "https://github.com/wiz-sec/open-cvdb/blob/main/LICENSE.md"
    repo_url = "https://github.com/wiz-sec/open-cvdb"
    precedence = 200

    # Parsed feed items; stays None until get_feed_items() has fetched the feed.
    _cached_items = None

    @classmethod
    def steps(cls):
        """Return the pipeline steps: a single collect-and-store step."""
        return (cls.collect_and_store_advisories,)

    def get_feed_items(self):
        """Return the parsed RSS items, fetching and parsing the feed on first use."""
        if self._cached_items is not None:
            return self._cached_items

        feed_response = fetch_response(CLOUDVULNDB_RSS_URL)
        self._cached_items = parse_rss_feed(feed_response.text)
        return self._cached_items

    def advisories_count(self) -> int:
        """Return the number of items found in the RSS feed."""
        return len(self.get_feed_items())

    def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
        """Yield one advisory per feed item, skipping items that yield no advisory."""
        parsed_advisories = map(parse_advisory_data, self.get_feed_items())
        yield from (advisory for advisory in parsed_advisories if advisory)


def parse_rss_feed(xml_text: str) -> list:
    """
    Parse CloudVulnDB RSS XML and return a list of item dictionaries.

    Each dictionary has ``title``, ``link``, ``description``, ``pub_date`` and
    ``guid`` keys, in that order. Every value is a string with surrounding
    whitespace removed; a missing or empty child element yields ``""``.

    Return an empty list, after logging an error, when ``xml_text`` is not
    well-formed XML or when the document root has no ``<channel>`` child.
    """
    # NOTE(review): the standard-library XML parser is not hardened against
    # maliciously constructed documents; acceptable only as long as the feed
    # source is trusted.
    try:
        root = ElementTree.fromstring(xml_text)
    except ElementTree.ParseError as e:
        logger.error("Failed to parse CloudVulnDB RSS XML: %s", e)
        return []

    channel = root.find("channel")
    if channel is None:
        logger.error("CloudVulnDB RSS feed has no <channel> element")
        return []

    # Output key -> RSS child tag it is read from. The order here fixes the key
    # order of every returned dictionary.
    field_tags = (
        ("title", "title"),
        ("link", "link"),
        ("description", "description"),
        ("pub_date", "pubDate"),
        ("guid", "guid"),
    )

    return [
        # findtext() returns None for a missing child, hence the ``or ""``.
        {key: (item_el.findtext(tag) or "").strip() for key, tag in field_tags}
        for item_el in channel.findall("item")
    ]


def parse_advisory_data(item: dict):
    """
    Build an AdvisoryDataV2 from one parsed CloudVulnDB RSS item.

    ``item`` is a dictionary with ``title``, ``link``, ``description``,
    ``pub_date`` and ``guid`` keys; missing or empty values are treated as
    empty strings. Return None, after logging an error, when no advisory
    identifier can be derived from the item.

    Since the RSS feed does not provide package/version coordinates,
    ``affected_packages`` is always empty.
    """
    title, link, description, pub_date, guid = (
        item.get(key) or "" for key in ("title", "link", "description", "pub_date", "guid")
    )

    advisory_id = get_advisory_id(guid=guid, link=link, title=title, pub_date=pub_date)
    if not advisory_id:
        logger.error("Skipping advisory with no usable identifier: %r", item)
        return None

    # CVE ids mentioned in the title or description become aliases: duplicates
    # are dropped keeping first-seen order, and the advisory's own id is excluded.
    mentioned_cves = dict.fromkeys(find_all_cve(f"{title}\n{description}"))
    aliases = [cve for cve in mentioned_cves if cve != advisory_id]

    # An unparseable publication date is not fatal: warn and leave it unset.
    date_published = None
    if pub_date:
        try:
            date_published = dateutil_parser.parse(pub_date)
        except Exception as e:
            logger.warning("Could not parse date %r for advisory %s: %s", pub_date, advisory_id, e)

    references = [ReferenceV2(url=link)] if link else []

    return AdvisoryDataV2(
        advisory_id=advisory_id,
        aliases=aliases,
        summary=title or description,
        affected_packages=[],
        references=references,
        date_published=date_published,
        url=link or CLOUDVULNDB_RSS_URL,
        original_advisory_text=json.dumps(item, indent=2, ensure_ascii=False),
    )


def get_advisory_id(guid: str, link: str, title: str, pub_date: str) -> str:
    """
    Return a stable advisory identifier from the best available source.

    Sources are tried in this order:
    1. the GUID, stripped of surrounding whitespace;
    2. the last path segment of ``link``;
    3. ``cloudvulndb-`` followed by the first 16 hex digits of the SHA-256 of
       the stripped title and publication date joined with ``|``.

    Return an empty string when none of these sources yields anything.
    """
    # ``or`` short-circuits, so the link is only inspected when the GUID is blank.
    identifier = (guid or "").strip() or advisory_slug_from_link(link)
    if identifier:
        return identifier

    fingerprint = "|".join((title.strip(), pub_date.strip()))
    # Nothing but separators left means there is no content to hash.
    if fingerprint.strip("|") == "":
        return ""

    short_hash = hashlib.sha256(fingerprint.encode("utf-8")).hexdigest()[:16]
    return f"cloudvulndb-{short_hash}"


def advisory_slug_from_link(link: str) -> str:
    """
    Return the last non-empty path segment of ``link``, stripped of whitespace.

    Return an empty string when ``link`` is empty, cannot be parsed as a URL,
    or has no non-empty path segment.
    """
    if not link:
        return ""

    try:
        url_parts = urlparse(link)
    except Exception:
        return ""

    # Dropping empty segments makes a trailing slash harmless.
    segments = [segment for segment in url_parts.path.split("/") if segment]
    return segments[-1].strip() if segments else ""
64 changes: 64 additions & 0 deletions vulnerabilities/tests/test_cloudvulndb_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import os
from unittest import TestCase

from vulnerabilities.pipelines.v2_importers.cloudvulndb_importer import advisory_slug_from_link
from vulnerabilities.pipelines.v2_importers.cloudvulndb_importer import get_advisory_id
from vulnerabilities.pipelines.v2_importers.cloudvulndb_importer import parse_advisory_data
from vulnerabilities.pipelines.v2_importers.cloudvulndb_importer import parse_rss_feed
from vulnerabilities.tests import util_tests

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_DATA = os.path.join(BASE_DIR, "test_data/cloudvulndb")


def _load_rss(filename="cloudvulndb_rss_mock.xml"):
    """Return the UTF-8 text of ``filename`` read from the CloudVulnDB test data directory."""
    fixture_path = os.path.join(TEST_DATA, filename)
    with open(fixture_path, encoding="utf-8") as fixture:
        return fixture.read()


class TestCloudVulnDBImporter(TestCase):
    """Unit tests for the CloudVulnDB RSS parsing and advisory-building helpers."""

    def test_parse_rss_feed_returns_correct_item_count(self):
        # The mock feed contains exactly two <item> entries.
        feed_items = parse_rss_feed(_load_rss())
        self.assertEqual(len(feed_items), 2)

    def test_parse_advisory_with_guid_and_cves(self):
        # First mock item: has a GUID and mentions CVEs in title and description.
        first_item = parse_rss_feed(_load_rss())[0]
        advisory = parse_advisory_data(first_item)
        self.assertIsNotNone(advisory)
        expected_file = os.path.join(TEST_DATA, "expected_cloudvulndb_advisory_output1.json")
        util_tests.check_results_against_json(advisory.to_dict(), expected_file)

    def test_parse_advisory_without_guid_falls_back_to_link_slug(self):
        # Second mock item: empty GUID, so the id comes from the link's last segment.
        second_item = parse_rss_feed(_load_rss())[1]
        advisory = parse_advisory_data(second_item)
        self.assertIsNotNone(advisory)
        self.assertEqual(advisory.advisory_id, "azure-imds-ssrf")
        self.assertEqual(advisory.aliases, [])

    def test_get_advisory_id_hash_fallback(self):
        # Without GUID and link the id is the fixed prefix plus 16 hex digits.
        prefix = "cloudvulndb-"
        advisory_id = get_advisory_id(
            guid="",
            link="",
            title="Example advisory title",
            pub_date="Mon, 08 Jul 2024 00:00:00 GMT",
        )
        self.assertTrue(advisory_id.startswith(prefix))
        self.assertEqual(len(advisory_id), len(prefix) + 16)

    def test_parse_rss_feed_invalid_xml_returns_empty(self):
        self.assertEqual(parse_rss_feed("not valid xml <>>>"), [])

    def test_advisory_slug_from_link(self):
        # A trailing slash must not produce an empty slug.
        link = "https://www.cloudvulndb.org/vulnerabilities/aws-example/"
        self.assertEqual(advisory_slug_from_link(link), "aws-example")
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0">
<channel>
<title>CloudVulnDB RSS</title>
<link>https://www.cloudvulndb.org</link>
<description>Cloud vulnerabilities and security issues</description>
<item>
<title><![CDATA[AWS Example Privilege Escalation (CVE-2024-11111)]]></title>
<link>https://www.cloudvulndb.org/vulnerabilities/aws-example-privilege-escalation</link>
<guid isPermaLink="false">CLOUD-2024-0001</guid>
<pubDate>Tue, 04 Jun 2024 12:30:00 GMT</pubDate>
<description><![CDATA[An example cloud vulnerability. Additional tracking: CVE-2024-22222.]]></description>
</item>
<item>
<title><![CDATA[Azure IMDS SSRF Exposure]]></title>
<link>https://www.cloudvulndb.org/vulnerabilities/azure-imds-ssrf</link>
<guid></guid>
<pubDate>Fri, 05 Jul 2024 08:00:00 GMT</pubDate>
<description><![CDATA[No CVE assigned.]]></description>
</item>
</channel>
</rss>
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"advisory_id": "CLOUD-2024-0001",
"aliases": [
"CVE-2024-11111",
"CVE-2024-22222"
],
"summary": "AWS Example Privilege Escalation (CVE-2024-11111)",
"affected_packages": [],
"references": [
{
"reference_id": "",
"reference_type": "",
"url": "https://www.cloudvulndb.org/vulnerabilities/aws-example-privilege-escalation"
}
],
"patches": [],
"severities": [],
"date_published": "2024-06-04T12:30:00+00:00",
"weaknesses": [],
"url": "https://www.cloudvulndb.org/vulnerabilities/aws-example-privilege-escalation"
}