diff --git a/vulnerabilities/importers/__init__.py b/vulnerabilities/importers/__init__.py index 594021092..f7c40ae4a 100644 --- a/vulnerabilities/importers/__init__.py +++ b/vulnerabilities/importers/__init__.py @@ -47,6 +47,7 @@ from vulnerabilities.pipelines.v2_importers import apache_kafka_importer as apache_kafka_importer_v2 from vulnerabilities.pipelines.v2_importers import apache_tomcat_importer as apache_tomcat_v2 from vulnerabilities.pipelines.v2_importers import archlinux_importer as archlinux_importer_v2 +from vulnerabilities.pipelines.v2_importers import checkpoint_importer as checkpoint_importer_v2 from vulnerabilities.pipelines.v2_importers import collect_fix_commits as collect_fix_commits_v2 from vulnerabilities.pipelines.v2_importers import curl_importer as curl_importer_v2 from vulnerabilities.pipelines.v2_importers import debian_importer as debian_importer_v2 @@ -88,6 +89,7 @@ [ archlinux_importer_v2.ArchLinuxImporterPipeline, apache_kafka_importer_v2.ApacheKafkaImporterPipeline, + checkpoint_importer_v2.CheckPointImporterPipeline, nvd_importer_v2.NVDImporterPipeline, elixir_security_importer_v2.ElixirSecurityImporterPipeline, npm_importer_v2.NpmImporterPipeline, diff --git a/vulnerabilities/pipelines/v2_importers/checkpoint_importer.py b/vulnerabilities/pipelines/v2_importers/checkpoint_importer.py new file mode 100644 index 000000000..e1735294d --- /dev/null +++ b/vulnerabilities/pipelines/v2_importers/checkpoint_importer.py @@ -0,0 +1,213 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. 
+# + +import datetime +import json +import logging +from typing import Iterable + +import dateparser +import requests +from bs4 import BeautifulSoup + +from vulnerabilities.importer import AdvisoryDataV2 +from vulnerabilities.importer import ReferenceV2 +from vulnerabilities.importer import VulnerabilitySeverity +from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2 +from vulnerabilities.severity_systems import GENERIC + +logger = logging.getLogger(__name__) + +ADVISORY_BASE_URL = "https://advisories.checkpoint.com" +ADVISORY_LIST_URL = "https://advisories.checkpoint.com/advisories/" + + +class CheckPointImporterPipeline(VulnerableCodeBaseImporterPipelineV2): + """Collect Check Point security advisories.""" + + pipeline_id = "checkpoint_importer" + spdx_license_expression = "LicenseRef-scancode-proprietary-license" + license_url = "https://advisories.checkpoint.com/" + url = ADVISORY_LIST_URL + precedence = 200 + + @classmethod + def steps(cls): + return ( + cls.fetch, + cls.collect_and_store_advisories, + ) + + def fetch(self): + self.log(f"Fetch `{self.url}`") + self.advisories_data = list(fetch_all_advisory_rows(self.log)) + + def advisories_count(self): + return len(self.advisories_data) + + def collect_advisories(self) -> Iterable[AdvisoryDataV2]: + for row_data in self.advisories_data: + advisory = parse_advisory(row_data) + if advisory: + yield advisory + + +def get_available_years(soup: BeautifulSoup) -> list: + """Return sorted list of years from year-navigation links, including current year.""" + years = set() + for link in soup.find_all("a", href=True): + href = link["href"] + if "/defense/advisories/public/" in href: + part = href.rstrip("/").split("/")[-1] + if part.isdigit() and len(part) == 4: + years.add(int(part)) + years.add(datetime.date.today().year) + return sorted(years) + + +def get_total_pages(soup: BeautifulSoup) -> int: + """Return total page count from pagination links.""" + page_nums = [] + for link in soup.find_all("a", href=True): + href = link["href"] + if "/advisories/page/" in href: + part = href.split("/page/")[-1].split("?")[0].strip("/") + if part.isdigit(): + page_nums.append(int(part)) + return max(page_nums) if page_nums else 1 + + +def fetch_all_advisory_rows(log_fn) -> Iterable[dict]: + """Yield row dicts for all advisories across all years and pages.""" + try: + resp = requests.get(ADVISORY_LIST_URL, timeout=30) + resp.raise_for_status() + except requests.exceptions.RequestException as e: + log_fn(f"Failed to fetch {ADVISORY_LIST_URL}: {e}") + return + + soup = BeautifulSoup(resp.text, features="lxml") + years = get_available_years(soup) + + for year in years: + url = f"{ADVISORY_LIST_URL}?year={year}" + try: + resp = requests.get(url, timeout=30) + resp.raise_for_status() + except requests.exceptions.RequestException as e: + log_fn(f"Failed to fetch {url}: {e}") + continue + + year_soup = BeautifulSoup(resp.text, features="lxml") + total_pages = get_total_pages(year_soup) + yield from parse_table_rows(resp.text) + + for page in range(2, total_pages + 1): + page_url = f"{ADVISORY_LIST_URL}page/{page}/?year={year}" + try: + resp = requests.get(page_url, timeout=30) + resp.raise_for_status() + except requests.exceptions.RequestException as e: + log_fn(f"Failed to fetch {page_url}: {e}") + break + yield from parse_table_rows(resp.text) + + +def parse_table_rows(html: str) -> list: + """Return list of row dicts from the advisories table HTML.""" + soup = BeautifulSoup(html, features="lxml") + table = soup.find("table", {"id": 
"cp_advisory_table_sorter"}) + if not table: + return [] + + rows = [] + for tr in table.find_all("tr")[1:]: + cells = tr.find_all("td") + if len(cells) < 7: + continue + + cpai_link = cells[3].find("a") + if not cpai_link: + continue + + advisory_id = cpai_link.get_text(strip=True) + href = cpai_link.get("href", "") + advisory_url = f"{ADVISORY_BASE_URL}{href}" if href.startswith("/") else href + + cve_link = cells[5].find("a") + cve_text = cve_link.get_text(strip=True) if cve_link else cells[5].get_text(strip=True) + cve_id = cve_text.split(" (")[0].strip() + + summary_link = cells[6].find("a") + summary = ( + summary_link.get_text(strip=True) if summary_link else cells[6].get_text(strip=True) + ) + + rows.append( + { + "advisory_id": advisory_id, + "advisory_url": advisory_url, + "cve_id": cve_id, + "severity": cells[0].get_text(strip=True), + "date_published": cells[1].get_text(strip=True), + "summary": summary, + } + ) + + return rows + + +def parse_advisory(row_data: dict): + """Return AdvisoryDataV2 from a row data dict, or None if advisory_id is missing.""" + advisory_id = row_data.get("advisory_id") or "" + if not advisory_id or not advisory_id.startswith("CPAI-"): + return None + + date_published = None + raw_date = row_data.get("date_published") or "" + if raw_date: + date_published = dateparser.parse( + raw_date, + settings={"TIMEZONE": "UTC", "RETURN_AS_TIMEZONE_AWARE": True, "TO_TIMEZONE": "UTC"}, + ) + if date_published is None: + logger.warning("Could not parse date %r for %s", raw_date, advisory_id) + + cve_id = row_data.get("cve_id") or "" + aliases = [cve_id] if cve_id.startswith("CVE-") else [] + + advisory_url = row_data.get("advisory_url") or "" + references = [] + if advisory_url: + references.append(ReferenceV2(url=advisory_url, reference_id=advisory_id)) + if cve_id.startswith("CVE-"): + references.append( + ReferenceV2( + url=f"https://nvd.nist.gov/vuln/detail/{cve_id}", + reference_id=cve_id, + ) + ) + + severities = [] + severity = row_data.get("severity") or "" + if severity: + severities.append(VulnerabilitySeverity(system=GENERIC, value=severity)) + + return AdvisoryDataV2( + advisory_id=advisory_id, + aliases=aliases, + summary=row_data.get("summary") or "", + affected_packages=[], + references=references, + date_published=date_published, + weaknesses=[], + severities=severities, + url=advisory_url, + original_advisory_text=json.dumps(row_data, indent=2, ensure_ascii=False), + ) diff --git a/vulnerabilities/tests/pipelines/v2_importers/test_checkpoint_importer.py b/vulnerabilities/tests/pipelines/v2_importers/test_checkpoint_importer.py new file mode 100644 index 000000000..54729cf22 --- /dev/null +++ b/vulnerabilities/tests/pipelines/v2_importers/test_checkpoint_importer.py @@ -0,0 +1,180 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. 
+# + +import datetime +import json +from pathlib import Path +from unittest import TestCase +from unittest.mock import MagicMock +from unittest.mock import patch + +import requests +from bs4 import BeautifulSoup + +from vulnerabilities.pipelines.v2_importers.checkpoint_importer import CheckPointImporterPipeline +from vulnerabilities.pipelines.v2_importers.checkpoint_importer import get_available_years +from vulnerabilities.pipelines.v2_importers.checkpoint_importer import get_total_pages +from vulnerabilities.pipelines.v2_importers.checkpoint_importer import parse_advisory +from vulnerabilities.pipelines.v2_importers.checkpoint_importer import parse_table_rows + +TEST_DATA = Path(__file__).parent.parent.parent / "test_data" / "checkpoint" + +with open(TEST_DATA / "advisories_2026.html") as f: + SAMPLE_HTML = f.read() + +SAMPLE_ROWS = parse_table_rows(SAMPLE_HTML) + + +class TestGetAvailableYears(TestCase): + def test_extracts_years_from_nav_links(self): + soup = BeautifulSoup(SAMPLE_HTML, features="lxml") + years = get_available_years(soup) + current_year = datetime.date.today().year + assert 2024 in years + assert 2025 in years + assert current_year in years + + def test_always_includes_current_year(self): + soup = BeautifulSoup("", features="lxml") + years = get_available_years(soup) + assert years == [datetime.date.today().year] + + +class TestGetTotalPages(TestCase): + def test_extracts_max_page_from_pagination(self): + soup = BeautifulSoup(SAMPLE_HTML, features="lxml") + assert get_total_pages(soup) == 2 + + def test_returns_one_when_no_pagination(self): + soup = BeautifulSoup("", features="lxml") + assert get_total_pages(soup) == 1 + + +class TestParseTableRows(TestCase): + def test_parses_three_rows(self): + assert len(SAMPLE_ROWS) == 3 + + def test_first_row_advisory_id(self): + assert SAMPLE_ROWS[0]["advisory_id"] == "CPAI-2026-1780" + + def test_first_row_cve_id(self): + assert SAMPLE_ROWS[0]["cve_id"] == "CVE-2026-20122" + + def test_first_row_severity(self): + assert SAMPLE_ROWS[0]["severity"] == "Medium" + + def test_first_row_date(self): + assert SAMPLE_ROWS[0]["date_published"] == "17 Mar 2026" + + def test_first_row_summary(self): + assert "Cisco Catalyst" in SAMPLE_ROWS[0]["summary"] + + def test_first_row_advisory_url(self): + assert SAMPLE_ROWS[0]["advisory_url"].endswith("cpai-2026-1780.html") + + def test_cve_id_stripped_of_extra_text(self): + assert SAMPLE_ROWS[2]["cve_id"] == "CVE-2025-33603" + + def test_returns_empty_list_for_missing_table(self): + assert parse_table_rows("") == [] + + +class TestParseAdvisory(TestCase): + def setUp(self): + self.row = SAMPLE_ROWS[0] + + def test_advisory_id(self): + advisory = parse_advisory(self.row) + assert advisory.advisory_id == "CPAI-2026-1780" + + def test_cve_in_aliases(self): + advisory = parse_advisory(self.row) + assert "CVE-2026-20122" in advisory.aliases + + def test_date_parsed(self): + advisory = parse_advisory(self.row) + assert advisory.date_published is not None + assert advisory.date_published.year == 2026 + + def test_severity_stored(self): + advisory = parse_advisory(self.row) + assert len(advisory.severities) == 1 + assert advisory.severities[0].value == "Medium" + + def test_references_include_advisory_url(self): + advisory = parse_advisory(self.row) + urls = [r.url for r in advisory.references] + assert any("cpai-2026-1780.html" in u for u in urls) + + def test_references_include_nvd_url(self): + advisory = parse_advisory(self.row) + urls = [r.url for r in advisory.references] + assert any("nvd.nist.gov" in 
u for u in urls) + + def test_reference_ids_set(self): + advisory = parse_advisory(self.row) + ref_ids = [r.reference_id for r in advisory.references] + assert "CPAI-2026-1780" in ref_ids + assert "CVE-2026-20122" in ref_ids + + def test_affected_packages_empty(self): + advisory = parse_advisory(self.row) + assert advisory.affected_packages == [] + + def test_weaknesses_empty(self): + advisory = parse_advisory(self.row) + assert advisory.weaknesses == [] + + def test_original_advisory_text_is_pretty_json(self): + advisory = parse_advisory(self.row) + parsed = json.loads(advisory.original_advisory_text) + assert parsed["advisory_id"] == "CPAI-2026-1780" + assert "\n" in advisory.original_advisory_text + + def test_missing_id_returns_none(self): + assert parse_advisory({}) is None + assert parse_advisory({"advisory_id": ""}) is None + assert parse_advisory({"advisory_id": "INVALID-123"}) is None + + def test_no_cve_yields_empty_aliases(self): + row = dict(self.row) + row["cve_id"] = "" + advisory = parse_advisory(row) + assert advisory.aliases == [] + + def test_critical_severity(self): + advisory = parse_advisory(SAMPLE_ROWS[1]) + assert advisory.severities[0].value == "Critical" + + +class TestCheckPointImporterPipeline(TestCase): + def setUp(self): + self.pipeline = CheckPointImporterPipeline() + self.pipeline.advisories_data = SAMPLE_ROWS + + def test_advisories_count(self): + assert self.pipeline.advisories_count() == 3 + + def test_collect_advisories_yields_all_valid(self): + advisories = list(self.pipeline.collect_advisories()) + assert len(advisories) == 3 + + @patch("vulnerabilities.pipelines.v2_importers.checkpoint_importer.requests.get") + def test_fetch_stores_advisory_rows(self, mock_get): + mock_resp = MagicMock() + mock_resp.text = SAMPLE_HTML + mock_get.return_value = mock_resp + self.pipeline.fetch() + assert len(self.pipeline.advisories_data) > 0 + + @patch("vulnerabilities.pipelines.v2_importers.checkpoint_importer.requests.get") + def test_fetch_handles_request_error(self, mock_get): + mock_get.side_effect = requests.exceptions.RequestException("timeout") + self.pipeline.fetch() + assert self.pipeline.advisories_data == [] diff --git a/vulnerabilities/tests/test_data/checkpoint/advisories_2026.html b/vulnerabilities/tests/test_data/checkpoint/advisories_2026.html new file mode 100644 index 000000000..89cb9391f --- /dev/null +++ b/vulnerabilities/tests/test_data/checkpoint/advisories_2026.html @@ -0,0 +1,60 @@ + + +
+| Severity | Date Published | Date Updated | Check Point Reference | Source | Industry Reference | Description |
+|---|---|---|---|---|---|---|
+| Medium | 17 Mar 2026 | 17 Mar 2026 | CPAI-2026-1780 |  | CVE-2026-20122 | Cisco Catalyst SD-WAN Manager Arbitrary File Write (CVE-2026-20122) |
+| Critical | 10 Jan 2026 | 15 Jan 2026 | CPAI-2026-0042 |  | CVE-2026-1001 | Example Critical Vulnerability Remote Code Execution |
+| High | 05 Feb 2026 | 05 Feb 2026 | CPAI-2026-0210 |  | CVE-2025-33603 (and 2 others) | Example High Severity Stack Overflow |
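
For reviewers who want to exercise the new parsing helpers without running the whole pipeline, here is a minimal sketch that feeds the bundled fixture through `parse_table_rows` and `parse_advisory`. It assumes the fixture path added in this diff and that the `vulnerabilities` package is importable in the current environment (Django settings may need to be configured first); it is an illustration, not part of the change itself.

```python
# Minimal sketch, assuming the fixture path from this diff and an importable
# `vulnerabilities` package (Django settings may need to be set up first).
from pathlib import Path

from vulnerabilities.pipelines.v2_importers.checkpoint_importer import parse_advisory
from vulnerabilities.pipelines.v2_importers.checkpoint_importer import parse_table_rows

# Path of the test fixture added in this diff (assumed to be run from the repo root).
fixture = Path("vulnerabilities/tests/test_data/checkpoint/advisories_2026.html")

# Extract one row dict per advisory listed in the table.
rows = parse_table_rows(fixture.read_text())

for row in rows:
    advisory = parse_advisory(row)
    if advisory is None:
        # Rows without a CPAI-* reference are skipped by parse_advisory.
        continue
    print(advisory.advisory_id, advisory.aliases, advisory.url)
```

The `None` check mirrors the guard in `parse_advisory`: rows whose Check Point reference does not start with `CPAI-` are dropped, which is also what the `test_missing_id_returns_none` test asserts.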