From c12ed5fec54a2c586d6accf724bcbe0bc9155e29 Mon Sep 17 00:00:00 2001 From: ziad hany Date: Thu, 8 Jan 2026 23:06:45 +0200 Subject: [PATCH 1/5] Migrate Alpine importer to advisory V2 Signed-off-by: ziad hany --- vulnerabilities/importers/__init__.py | 2 + .../v2_importers/alpine_linux_importer.py | 296 +++++ vulnerabilities/references.py | 68 ++ .../test_alpine_linux_importer_pipeline.py | 1032 +++++++++++++++++ 4 files changed, 1398 insertions(+) create mode 100644 vulnerabilities/pipelines/v2_importers/alpine_linux_importer.py create mode 100644 vulnerabilities/tests/pipelines/v2_importers/test_alpine_linux_importer_pipeline.py diff --git a/vulnerabilities/importers/__init__.py b/vulnerabilities/importers/__init__.py index 6d5e0eff8..c7ac8af2b 100644 --- a/vulnerabilities/importers/__init__.py +++ b/vulnerabilities/importers/__init__.py @@ -41,6 +41,7 @@ from vulnerabilities.pipelines import nvd_importer from vulnerabilities.pipelines import pypa_importer from vulnerabilities.pipelines import pysec_importer +from vulnerabilities.pipelines.v2_importers import alpine_linux_importer as alpine_linux_importer_v2 from vulnerabilities.pipelines.v2_importers import aosp_importer as aosp_importer_v2 from vulnerabilities.pipelines.v2_importers import apache_httpd_importer as apache_httpd_v2 from vulnerabilities.pipelines.v2_importers import apache_kafka_importer as apache_kafka_importer_v2 @@ -105,6 +106,7 @@ nginx_importer_v2.NginxImporterPipeline, mattermost_importer_v2.MattermostImporterPipeline, apache_tomcat_v2.ApacheTomcatImporterPipeline, + alpine_linux_importer_v2.AlpineLinuxImporterPipeline, nvd_importer.NVDImporterPipeline, github_importer.GitHubAPIImporterPipeline, gitlab_importer.GitLabImporterPipeline, diff --git a/vulnerabilities/pipelines/v2_importers/alpine_linux_importer.py b/vulnerabilities/pipelines/v2_importers/alpine_linux_importer.py new file mode 100644 index 000000000..55cc2ced3 --- /dev/null +++ b/vulnerabilities/pipelines/v2_importers/alpine_linux_importer.py @@ -0,0 +1,296 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +import logging +from typing import Any +from typing import Iterable +from typing import List +from typing import Mapping +from urllib.parse import urljoin + +from bs4 import BeautifulSoup +from packageurl import PackageURL +from univers.version_range import AlpineLinuxVersionRange +from univers.versions import InvalidVersion + +from vulnerabilities.importer import AdvisoryData +from vulnerabilities.importer import AffectedPackageV2 +from vulnerabilities.importer import ReferenceV2 +from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipeline +from vulnerabilities.references import WireSharkReferenceV2 +from vulnerabilities.references import XsaReferenceV2 +from vulnerabilities.references import ZbxReferenceV2 +from vulnerabilities.utils import fetch_response + + +class AlpineLinuxImporterPipeline(VulnerableCodeBaseImporterPipeline): + """Collect Alpine Linux advisories.""" + + pipeline_id = "alpine_linux_importer_v2" + + spdx_license_expression = "CC-BY-SA-4.0" + license_url = "https://secdb.alpinelinux.org/license.txt" + url = "https://secdb.alpinelinux.org/" + importer_name = "Alpine Linux Importer" + + @classmethod + def steps(cls): + return (cls.collect_and_store_advisories,) + + def advisories_count(self) -> int: + return 0 + + def collect_advisories(self) -> Iterable[AdvisoryData]: + page_response_content = fetch_response(self.url).content + advisory_directory_links = fetch_advisory_directory_links( + page_response_content, self.url, self.log + ) + advisory_links = [] + for advisory_directory_link in advisory_directory_links: + advisory_directory_page = fetch_response(advisory_directory_link).content + advisory_links.extend( + fetch_advisory_links(advisory_directory_page, advisory_directory_link, self.log) + ) + for link in advisory_links: + record = fetch_response(link).json() + if not record["packages"]: + self.log( + f'"packages" not found in {link!r}', + level=logging.DEBUG, + ) + continue + yield from process_record(record=record, url=link, logger=self.log) + + +def fetch_advisory_directory_links( + page_response_content: str, + base_url: str, + logger: callable = None, +) -> List[str]: + """ + Return a list of advisory directory links present in `page_response_content` html string + """ + index_page = BeautifulSoup(page_response_content, features="lxml") + alpine_versions = [ + link.text + for link in index_page.find_all("a") + if link.text.startswith("v") or link.text.startswith("edge") + ] + + if not alpine_versions: + if logger: + logger( + f"No versions found in {base_url!r}", + level=logging.DEBUG, + ) + return [] + + advisory_directory_links = [urljoin(base_url, version) for version in alpine_versions] + + return advisory_directory_links + + +def fetch_advisory_links( + advisory_directory_page: str, + advisory_directory_link: str, + logger: callable = None, +) -> Iterable[str]: + """ + Yield json file urls present in `advisory_directory_page` + """ + advisory_directory_page = BeautifulSoup(advisory_directory_page, features="lxml") + anchor_tags = advisory_directory_page.find_all("a") + if not anchor_tags: + if logger: + logger( + f"No anchor tags found in {advisory_directory_link!r}", + level=logging.DEBUG, + ) + return iter([]) + for anchor_tag in anchor_tags: + if anchor_tag.text.endswith("json"): + yield urljoin(advisory_directory_link, anchor_tag.text) + + +def check_for_attributes(record, logger) -> bool: + attributes = ["distroversion", "reponame", "archs"] + for attribute in attributes: + if attribute not in record: + if logger: + logger( + f'"{attribute!r}" not found in {record!r}', + level=logging.DEBUG, + ) + return False + return True + + +def process_record(record: dict, url: str, logger: callable = None) -> Iterable[AdvisoryData]: + """ + Return a list of AdvisoryData objects by processing data + present in that `record` + """ + if not record.get("packages"): + if logger: + logger( + f'"packages" not found in this record {record!r}', + level=logging.DEBUG, + ) + return [] + + for package in record["packages"]: + if not package["pkg"]: + if logger: + logger( + f'"pkg" not found in this package {package!r}', + level=logging.DEBUG, + ) + continue + if not check_for_attributes(record, logger): + continue + yield from load_advisories( + pkg_infos=package["pkg"], + distroversion=record["distroversion"], + reponame=record["reponame"], + archs=record["archs"], + url=url, + logger=logger, + ) + + +def load_advisories( + pkg_infos: Mapping[str, Any], + distroversion: str, + reponame: str, + archs: List[str], + url: str, + logger: callable = None, +) -> Iterable[AdvisoryData]: + """ + Yield AdvisoryData by mapping data from `pkg_infos` + and form PURL for AffectedPackages by using + `distroversion`, `reponame`, `archs` + """ + if not pkg_infos.get("name"): + if logger: + logger( + f'"name" is not available in package {pkg_infos!r}', + level=logging.DEBUG, + ) + return [] + + for version, fixed_vulns in pkg_infos["secfixes"].items(): + if not fixed_vulns: + if logger: + logger( + f"No fixed vulnerabilities in version {version!r}", + level=logging.DEBUG, + ) + continue + # fixed_vulns is a list of strings and each string is a space-separated + # list of aliases and CVES + for vuln_ids in fixed_vulns: + if not isinstance(vuln_ids, str): + if logger: + logger( + f"{vuln_ids!r} is not of `str` instance", + level=logging.DEBUG, + ) + continue + vuln_ids = vuln_ids.strip().split() + if not vuln_ids: + if logger: + logger( + f"{vuln_ids!r} is empty", + level=logging.DEBUG, + ) + continue + aliases = vuln_ids + + references = [] + for reference_id in vuln_ids: + if reference_id.startswith("XSA"): + references.append(XsaReferenceV2.from_id(xsa_id=reference_id)) + + elif reference_id.startswith("ZBX"): + references.append(ZbxReferenceV2.from_id(zbx_id=reference_id)) + + elif reference_id.startswith("wnpa-sec"): + references.append(WireSharkReferenceV2.from_id(wnpa_sec_id=reference_id)) + + elif reference_id.startswith("CVE"): + references.append( + ReferenceV2( + reference_id=reference_id, + url=f"https://nvd.nist.gov/vuln/detail/{reference_id}", + ) + ) + + qualifiers = { + "distroversion": distroversion, + "reponame": reponame, + } + + affected_packages = [] + + fixed_version_range = None + try: + fixed_version_range = AlpineLinuxVersionRange.from_versions([version]) + except InvalidVersion as e: + if logger: + logger( + f"{version!r} is not a valid AlpineVersion {e!r}", + level=logging.DEBUG, + ) + + if not isinstance(archs, List): + if logger: + logger( + f"{archs!r} is not of `List` instance", + level=logging.DEBUG, + ) + continue + + if archs and fixed_version_range: + for arch in archs: + qualifiers["arch"] = arch + affected_packages.append( + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name=pkg_infos["name"], + qualifiers=qualifiers, + ), + fixed_version_range=fixed_version_range, + ) + ) + + if not archs and fixed_version_range: + # there is no arch, this is not an arch-specific package + affected_packages.append( + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name=pkg_infos["name"], + qualifiers=qualifiers, + ), + fixed_version_range=fixed_version_range, + ) + ) + + for advisory_id in aliases: + yield AdvisoryData( + advisory_id=advisory_id, + aliases=[], + references_v2=references, + affected_packages=affected_packages, + url=url, + ) diff --git a/vulnerabilities/references.py b/vulnerabilities/references.py index 47225f520..b543e813b 100644 --- a/vulnerabilities/references.py +++ b/vulnerabilities/references.py @@ -8,6 +8,7 @@ # from vulnerabilities.importer import Reference +from vulnerabilities.importer import ReferenceV2 class XsaReference(Reference): @@ -75,3 +76,70 @@ def from_id(cls, wnpa_sec_id): reference_id=wnpa_sec_id, url=f"https://www.wireshark.org/security/{wnpa_sec_id}.html", ) + + +class XsaReferenceV2: + """ + A Xen advisory reference. See https://xenbits.xen.org/xsa + """ + + @classmethod + def from_id(cls, xsa_id): + """ + Return a new XsaReference from an XSA-XXXX id. + """ + if not xsa_id or not xsa_id.lower().startswith("xsa"): + return ValueError(f"Not a Xen reference. Does not start with XSA: {xsa_id!r}") + _, numid = xsa_id.rsplit("-") + return ReferenceV2( + reference_id=xsa_id, + url=f"https://xenbits.xen.org/xsa/advisory-{numid}.html", + ) + + @classmethod + def from_number(cls, number): + """ + Return a new XsaReference from an XSA number. + """ + return ReferenceV2( + reference_id=f"XSA-{number}", + url=f"https://xenbits.xen.org/xsa/advisory-{number}.html", + ) + + +class ZbxReferenceV2: + """ + A Zabbix advisory reference. See https://support.zabbix.com + """ + + @classmethod + def from_id(cls, zbx_id): + """ + Return a new ZbxReference from an ZBX-XXXX id. + """ + if not zbx_id or not zbx_id.lower().startswith("zbx"): + return ValueError(f"Not a Zabbix reference. Does not start with ZBX: {zbx_id!r}") + return ReferenceV2( + reference_id=zbx_id, + url=f"https://support.zabbix.com/browse/{zbx_id}", + ) + + +class WireSharkReferenceV2: + """ + A Wireshark advisory reference. See https://www.wireshark.org/security + """ + + @classmethod + def from_id(cls, wnpa_sec_id): + """ + Return a new WireSharkReference from an wnpa-sec-XXXX id. + """ + if not wnpa_sec_id or not wnpa_sec_id.lower().startswith("wnpa-sec"): + return ValueError( + f"Not a WireShark reference. Does not start with wnpa-sec: {wnpa_sec_id!r}" + ) + return ReferenceV2( + reference_id=wnpa_sec_id, + url=f"https://www.wireshark.org/security/{wnpa_sec_id}.html", + ) diff --git a/vulnerabilities/tests/pipelines/v2_importers/test_alpine_linux_importer_pipeline.py b/vulnerabilities/tests/pipelines/v2_importers/test_alpine_linux_importer_pipeline.py new file mode 100644 index 000000000..a41f1ebbc --- /dev/null +++ b/vulnerabilities/tests/pipelines/v2_importers/test_alpine_linux_importer_pipeline.py @@ -0,0 +1,1032 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +import json +from pathlib import Path + +import pytest +from packageurl import PackageURL +from univers.version_constraint import VersionConstraint +from univers.version_range import AlpineLinuxVersionRange +from univers.versions import AlpineLinuxVersion + +from vulnerabilities.importer import AdvisoryData +from vulnerabilities.importer import AffectedPackageV2 +from vulnerabilities.importer import ReferenceV2 +from vulnerabilities.pipelines.v2_importers.alpine_linux_importer import ( + fetch_advisory_directory_links, +) +from vulnerabilities.pipelines.v2_importers.alpine_linux_importer import fetch_advisory_links +from vulnerabilities.pipelines.v2_importers.alpine_linux_importer import load_advisories +from vulnerabilities.pipelines.v2_importers.alpine_linux_importer import process_record +from vulnerabilities.tests.pipelines import TestLogger + +TEST_DATA = Path(__file__).parent.parent.parent / "test_data" / "alpine" + + +def test_process_record(): + logger = TestLogger() + expected_advisories = [ + AdvisoryData( + advisory_id="XSA-248", + aliases=[], + summary="", + affected_packages=[ + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="xen", + version=None, + qualifiers={ + "arch": "aarch64", + "distroversion": "v3.11", + "reponame": "main", + }, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r1") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="xen", + version=None, + qualifiers={"arch": "armhf", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r1") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="xen", + version=None, + qualifiers={"arch": "armv7", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r1") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="xen", + version=None, + qualifiers={ + "arch": "ppc64le", + "distroversion": "v3.11", + "reponame": "main", + }, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r1") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="xen", + version=None, + qualifiers={"arch": "s390x", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r1") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="xen", + version=None, + qualifiers={"arch": "x86", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r1") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="xen", + version=None, + qualifiers={"arch": "x86_64", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r1") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + ], + references=[], + references_v2=[ + ReferenceV2( + reference_id="XSA-248", + reference_type="", + url="https://xenbits.xen.org/xsa/advisory-248.html", + ) + ], + patches=[], + date_published=None, + weaknesses=[], + severities=[], + url="https://secdb.alpinelinux.org/v3.11/", + original_advisory_text=None, + ), + AdvisoryData( + advisory_id="XSA-252", + aliases=[], + summary="", + affected_packages=[], + references=[], + references_v2=[ + ReferenceV2( + reference_id="XSA-252", + reference_type="", + url="https://xenbits.xen.org/xsa/advisory-252.html", + ) + ], + patches=[], + date_published=None, + weaknesses=[], + severities=[], + url="https://secdb.alpinelinux.org/v3.11/", + original_advisory_text=None, + ), + AdvisoryData( + advisory_id="CVE-2018-7540", + aliases=[], + summary="", + affected_packages=[ + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="xen", + version=None, + qualifiers={ + "arch": "aarch64", + "distroversion": "v3.11", + "reponame": "main", + }, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="xen", + version=None, + qualifiers={"arch": "armhf", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="xen", + version=None, + qualifiers={"arch": "armv7", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="xen", + version=None, + qualifiers={ + "arch": "ppc64le", + "distroversion": "v3.11", + "reponame": "main", + }, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="xen", + version=None, + qualifiers={"arch": "s390x", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="xen", + version=None, + qualifiers={"arch": "x86", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="xen", + version=None, + qualifiers={"arch": "x86_64", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + ], + references=[], + references_v2=[ + ReferenceV2( + reference_id="CVE-2018-7540", + reference_type="", + url="https://nvd.nist.gov/vuln/detail/CVE-2018-7540", + ), + ReferenceV2( + reference_id="XSA-252", + reference_type="", + url="https://xenbits.xen.org/xsa/advisory-252.html", + ), + ], + patches=[], + date_published=None, + weaknesses=[], + severities=[], + url="https://secdb.alpinelinux.org/v3.11/", + original_advisory_text=None, + ), + AdvisoryData( + advisory_id="XSA-252", + aliases=[], + summary="", + affected_packages=[ + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="xen", + version=None, + qualifiers={ + "arch": "aarch64", + "distroversion": "v3.11", + "reponame": "main", + }, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="xen", + version=None, + qualifiers={"arch": "armhf", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="xen", + version=None, + qualifiers={"arch": "armv7", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="xen", + version=None, + qualifiers={ + "arch": "ppc64le", + "distroversion": "v3.11", + "reponame": "main", + }, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="xen", + version=None, + qualifiers={"arch": "s390x", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="xen", + version=None, + qualifiers={"arch": "x86", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="xen", + version=None, + qualifiers={"arch": "x86_64", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + ], + references=[], + references_v2=[ + ReferenceV2( + reference_id="CVE-2018-7540", + reference_type="", + url="https://nvd.nist.gov/vuln/detail/CVE-2018-7540", + ), + ReferenceV2( + reference_id="XSA-252", + reference_type="", + url="https://xenbits.xen.org/xsa/advisory-252.html", + ), + ], + patches=[], + date_published=None, + weaknesses=[], + severities=[], + url="https://secdb.alpinelinux.org/v3.11/", + original_advisory_text=None, + ), + AdvisoryData( + advisory_id="CVE-2017-9669", + aliases=[], + summary="", + affected_packages=[ + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="apk-tools", + version=None, + qualifiers={ + "arch": "aarch64", + "distroversion": "v3.11", + "reponame": "main", + }, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="apk-tools", + version=None, + qualifiers={"arch": "armhf", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="apk-tools", + version=None, + qualifiers={"arch": "armv7", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="apk-tools", + version=None, + qualifiers={ + "arch": "ppc64le", + "distroversion": "v3.11", + "reponame": "main", + }, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="apk-tools", + version=None, + qualifiers={"arch": "s390x", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="apk-tools", + version=None, + qualifiers={"arch": "x86", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="apk-tools", + version=None, + qualifiers={"arch": "x86_64", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + ], + references=[], + references_v2=[ + ReferenceV2( + reference_id="CVE-2017-9669", + reference_type="", + url="https://nvd.nist.gov/vuln/detail/CVE-2017-9669", + ) + ], + patches=[], + date_published=None, + weaknesses=[], + severities=[], + url="https://secdb.alpinelinux.org/v3.11/", + original_advisory_text=None, + ), + AdvisoryData( + advisory_id="CVE-2017-9671", + aliases=[], + summary="", + affected_packages=[ + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="apk-tools", + version=None, + qualifiers={ + "arch": "aarch64", + "distroversion": "v3.11", + "reponame": "main", + }, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="apk-tools", + version=None, + qualifiers={"arch": "armhf", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="apk-tools", + version=None, + qualifiers={"arch": "armv7", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="apk-tools", + version=None, + qualifiers={ + "arch": "ppc64le", + "distroversion": "v3.11", + "reponame": "main", + }, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="apk-tools", + version=None, + qualifiers={"arch": "s390x", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="apk-tools", + version=None, + qualifiers={"arch": "x86", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL( + type="apk", + namespace="alpine", + name="apk-tools", + version=None, + qualifiers={"arch": "x86_64", "distroversion": "v3.11", "reponame": "main"}, + subpath=None, + ), + affected_version_range=None, + fixed_version_range=AlpineLinuxVersionRange( + constraints=( + VersionConstraint( + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + ), + ) + ), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + ], + references=[], + references_v2=[ + ReferenceV2( + reference_id="CVE-2017-9671", + reference_type="", + url="https://nvd.nist.gov/vuln/detail/CVE-2017-9671", + ) + ], + patches=[], + date_published=None, + weaknesses=[], + severities=[], + url="https://secdb.alpinelinux.org/v3.11/", + original_advisory_text=None, + ), + ] + with open(TEST_DATA / "v3.11/main.json") as f: + found_advisories = list( + process_record( + json.loads(f.read()), + "https://secdb.alpinelinux.org/v3.11/", + logger=logger.write, + ) + ) + assert found_advisories == expected_advisories + assert ( + "'4.10-1-r1' is not a valid AlpineVersion InvalidVersion(\"'4.10-1-r1' is not a valid \")" + in logger.getvalue() + ) + + +def test_fetch_advisory_directory_links(): + expected = [ + "https://secdb.alpinelinux.org/edge/", + "https://secdb.alpinelinux.org/v3.10/", + "https://secdb.alpinelinux.org/v3.11/", + "https://secdb.alpinelinux.org/v3.12/", + "https://secdb.alpinelinux.org/v3.13/", + "https://secdb.alpinelinux.org/v3.14/", + "https://secdb.alpinelinux.org/v3.15/", + "https://secdb.alpinelinux.org/v3.2/", + "https://secdb.alpinelinux.org/v3.3/", + "https://secdb.alpinelinux.org/v3.4/", + "https://secdb.alpinelinux.org/v3.5/", + "https://secdb.alpinelinux.org/v3.6/", + "https://secdb.alpinelinux.org/v3.7/", + "https://secdb.alpinelinux.org/v3.8/", + "https://secdb.alpinelinux.org/v3.9/", + ] + with open(TEST_DATA / "web_pages/directory.html") as f: + assert ( + fetch_advisory_directory_links(f.read(), "https://secdb.alpinelinux.org/") == expected + ) + + +def test_fetch_advisory_directory_links_failure(): + logger = TestLogger() + with open(TEST_DATA / "web_pages/fail_directory.html") as f: + assert ( + fetch_advisory_directory_links( + f.read(), "https://secdb.alpinelinux.org/", logger=logger.write + ) + == [] + ) + assert "No versions found in 'https://secdb.alpinelinux.org/'" in logger.getvalue() + + +def test_fetch_advisory_links(): + expected = [ + "https://secdb.alpinelinux.org/v3.11/community.json", + "https://secdb.alpinelinux.org/v3.11/main.json", + ] + with open(TEST_DATA / "web_pages/v3.11.html") as f: + assert ( + list(fetch_advisory_links(f.read(), "https://secdb.alpinelinux.org/v3.11/")) == expected + ) + + +def test_fetch_advisory_links_failure(): + logger = TestLogger() + with open(TEST_DATA / "web_pages/fail_directory.html") as f: + assert list(fetch_advisory_links(f.read(), "v3.11", logger=logger.write)) == [] + assert "No anchor tags found in 'v3.11'" in logger.getvalue() + + +def test_process_record_without_packages(): + logger = TestLogger() + with open(TEST_DATA / TEST_DATA / "v3.3/community.json") as f: + assert list(process_record(json.loads(f.read()), "", logger=logger.write)) == [] + assert ( + "\"packages\" not found in this record {'apkurl': '{{urlprefix}}/{{distroversion}}/{{reponame}}/{{arch}}/{{pkg.name}}-{{pkg.ver}}.apk', 'archs': ['armhf', 'x86', 'x86_64'], 'reponame': 'community', 'urlprefix': 'https://dl-cdn.alpinelinux.org/alpine', 'distroversion': 'v3.3', 'packages': []}" + in logger.getvalue() + ) + + +def test_load_advisories_package_without_name(): + logger = TestLogger() + package = { + "secfixes": {"4.10.0-r1": ["XSA-248"], "4.10.0-r2": ["CVE-2018-7540 XSA-252"]}, + } + list(load_advisories(package, "v3.11", "main", archs=[], url="", logger=logger.write)) + assert ( + "\"name\" is not available in package {'secfixes': {'4.10.0-r1': ['XSA-248'], '4.10.0-r2': ['CVE-2018-7540 XSA-252']}}" + in logger.getvalue() + ) + + +def test_load_advisories_package_without_secfixes(): + logger = TestLogger() + package = { + "name": "xen", + "secfixes": {"4.10.0-r1": []}, + } + list(load_advisories(package, "v3.11", "main", archs=[], url="", logger=logger.write)) + assert "No fixed vulnerabilities in version '4.10.0-r1'" in logger.getvalue() + + +@pytest.mark.parametrize( + "test_case", + [ + # these are the tests are not supported yet + # when we start supporting these version, + # they will be moved back to main test suite + "1.9.5p2-r0", + "6.6.2p1-r0", + "6.6.4p1-r1", + "4.10-1-r1", + ], +) +def test_load_advisories_package_with_invalid_alpine_version(test_case): + logger = TestLogger() + package = { + "name": "xen", + "secfixes": {f"{test_case}": ["XSA-248"]}, + } + result = list(load_advisories(package, "v3.11", "main", archs=[], url="", logger=logger.write)) + assert result != [] From b935cf9bdb8152f0e1a6928cb7eda27c668675e8 Mon Sep 17 00:00:00 2001 From: ziad hany Date: Mon, 12 Jan 2026 15:04:32 +0200 Subject: [PATCH 2/5] Fix typo Alpine Linux importer pipeline inherits from VulnerableCodeBaseImporterPipelineV2 Signed-off-by: ziad hany --- .../pipelines/v2_importers/alpine_linux_importer.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/vulnerabilities/pipelines/v2_importers/alpine_linux_importer.py b/vulnerabilities/pipelines/v2_importers/alpine_linux_importer.py index 55cc2ced3..b136d4957 100644 --- a/vulnerabilities/pipelines/v2_importers/alpine_linux_importer.py +++ b/vulnerabilities/pipelines/v2_importers/alpine_linux_importer.py @@ -22,22 +22,20 @@ from vulnerabilities.importer import AdvisoryData from vulnerabilities.importer import AffectedPackageV2 from vulnerabilities.importer import ReferenceV2 -from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipeline +from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2 from vulnerabilities.references import WireSharkReferenceV2 from vulnerabilities.references import XsaReferenceV2 from vulnerabilities.references import ZbxReferenceV2 from vulnerabilities.utils import fetch_response -class AlpineLinuxImporterPipeline(VulnerableCodeBaseImporterPipeline): +class AlpineLinuxImporterPipeline(VulnerableCodeBaseImporterPipelineV2): """Collect Alpine Linux advisories.""" pipeline_id = "alpine_linux_importer_v2" - spdx_license_expression = "CC-BY-SA-4.0" license_url = "https://secdb.alpinelinux.org/license.txt" url = "https://secdb.alpinelinux.org/" - importer_name = "Alpine Linux Importer" @classmethod def steps(cls): From b7aff19ab38243cbad9295fcdc3361d027af649b Mon Sep 17 00:00:00 2001 From: ziad hany Date: Thu, 15 Jan 2026 14:56:13 +0200 Subject: [PATCH 3/5] Update Alpine to avoid fetching the same URL links multiple times Signed-off-by: ziad hany --- .../v2_importers/alpine_linux_importer.py | 47 +++++++++++-------- .../test_alpine_linux_importer_pipeline.py | 13 ++--- 2 files changed, 34 insertions(+), 26 deletions(-) diff --git a/vulnerabilities/pipelines/v2_importers/alpine_linux_importer.py b/vulnerabilities/pipelines/v2_importers/alpine_linux_importer.py index b136d4957..fa5061635 100644 --- a/vulnerabilities/pipelines/v2_importers/alpine_linux_importer.py +++ b/vulnerabilities/pipelines/v2_importers/alpine_linux_importer.py @@ -49,12 +49,17 @@ def collect_advisories(self) -> Iterable[AdvisoryData]: advisory_directory_links = fetch_advisory_directory_links( page_response_content, self.url, self.log ) - advisory_links = [] + advisory_links = set() + visited_directories = set() for advisory_directory_link in advisory_directory_links: + if advisory_directory_link in visited_directories: + continue + advisory_directory_page = fetch_response(advisory_directory_link).content - advisory_links.extend( + advisory_links.update( fetch_advisory_links(advisory_directory_page, advisory_directory_link, self.log) ) + for link in advisory_links: record = fetch_response(link).json() if not record["packages"]: @@ -241,11 +246,10 @@ def load_advisories( try: fixed_version_range = AlpineLinuxVersionRange.from_versions([version]) except InvalidVersion as e: - if logger: - logger( - f"{version!r} is not a valid AlpineVersion {e!r}", - level=logging.DEBUG, - ) + logger( + f"{version!r} is not a valid AlpineVersion {e!r}", + level=logging.DEBUG, + ) if not isinstance(archs, List): if logger: @@ -258,33 +262,36 @@ def load_advisories( if archs and fixed_version_range: for arch in archs: qualifiers["arch"] = arch + purl = PackageURL( + type="apk", + namespace="alpine", + name=pkg_infos["name"], + qualifiers=qualifiers, + ) affected_packages.append( AffectedPackageV2( - package=PackageURL( - type="apk", - namespace="alpine", - name=pkg_infos["name"], - qualifiers=qualifiers, - ), + package=purl, fixed_version_range=fixed_version_range, ) ) if not archs and fixed_version_range: # there is no arch, this is not an arch-specific package + purl = PackageURL( + type="apk", + namespace="alpine", + name=pkg_infos["name"], + qualifiers=qualifiers, + ) affected_packages.append( AffectedPackageV2( - package=PackageURL( - type="apk", - namespace="alpine", - name=pkg_infos["name"], - qualifiers=qualifiers, - ), + package=purl, fixed_version_range=fixed_version_range, ) ) - for advisory_id in aliases: + for cve in aliases: + advisory_id = f"{pkg_infos['name']}/{qualifiers['distroversion']}/{cve}" yield AdvisoryData( advisory_id=advisory_id, aliases=[], diff --git a/vulnerabilities/tests/pipelines/v2_importers/test_alpine_linux_importer_pipeline.py b/vulnerabilities/tests/pipelines/v2_importers/test_alpine_linux_importer_pipeline.py index a41f1ebbc..558d49653 100644 --- a/vulnerabilities/tests/pipelines/v2_importers/test_alpine_linux_importer_pipeline.py +++ b/vulnerabilities/tests/pipelines/v2_importers/test_alpine_linux_importer_pipeline.py @@ -34,7 +34,7 @@ def test_process_record(): logger = TestLogger() expected_advisories = [ AdvisoryData( - advisory_id="XSA-248", + advisory_id="xen/v3.11/XSA-248", aliases=[], summary="", affected_packages=[ @@ -203,7 +203,7 @@ def test_process_record(): original_advisory_text=None, ), AdvisoryData( - advisory_id="XSA-252", + advisory_id="xen/v3.11/XSA-252", aliases=[], summary="", affected_packages=[], @@ -223,7 +223,7 @@ def test_process_record(): original_advisory_text=None, ), AdvisoryData( - advisory_id="CVE-2018-7540", + advisory_id="xen/v3.11/CVE-2018-7540", aliases=[], summary="", affected_packages=[ @@ -397,7 +397,7 @@ def test_process_record(): original_advisory_text=None, ), AdvisoryData( - advisory_id="XSA-252", + advisory_id="xen/v3.11/XSA-252", aliases=[], summary="", affected_packages=[ @@ -571,7 +571,7 @@ def test_process_record(): original_advisory_text=None, ), AdvisoryData( - advisory_id="CVE-2017-9669", + advisory_id="apk-tools/v3.11/CVE-2017-9669", aliases=[], summary="", affected_packages=[ @@ -740,7 +740,7 @@ def test_process_record(): original_advisory_text=None, ), AdvisoryData( - advisory_id="CVE-2017-9671", + advisory_id="apk-tools/v3.11/CVE-2017-9671", aliases=[], summary="", affected_packages=[ @@ -909,6 +909,7 @@ def test_process_record(): original_advisory_text=None, ), ] + with open(TEST_DATA / "v3.11/main.json") as f: found_advisories = list( process_record( From e9bd3cdf865f225b5918818c9af0a58b482fc4a6 Mon Sep 17 00:00:00 2001 From: ziad hany Date: Mon, 26 Jan 2026 23:54:13 +0200 Subject: [PATCH 4/5] Update alpine linux so for every vulnerability id AdvisoryData Fix duplication on advisory_id Signed-off-by: ziad hany --- .../v2_importers/alpine_linux_importer.py | 47 ++-- .../test_alpine_linux_importer_pipeline.py | 214 +++++++++--------- 2 files changed, 126 insertions(+), 135 deletions(-) diff --git a/vulnerabilities/pipelines/v2_importers/alpine_linux_importer.py b/vulnerabilities/pipelines/v2_importers/alpine_linux_importer.py index fa5061635..0ae811d8a 100644 --- a/vulnerabilities/pipelines/v2_importers/alpine_linux_importer.py +++ b/vulnerabilities/pipelines/v2_importers/alpine_linux_importer.py @@ -198,6 +198,7 @@ def load_advisories( continue # fixed_vulns is a list of strings and each string is a space-separated # list of aliases and CVES + aliases = set() for vuln_ids in fixed_vulns: if not isinstance(vuln_ids, str): if logger: @@ -214,26 +215,27 @@ def load_advisories( level=logging.DEBUG, ) continue - aliases = vuln_ids + aliases.update(vuln_ids) + for vuln_id in aliases: references = [] - for reference_id in vuln_ids: - if reference_id.startswith("XSA"): - references.append(XsaReferenceV2.from_id(xsa_id=reference_id)) - elif reference_id.startswith("ZBX"): - references.append(ZbxReferenceV2.from_id(zbx_id=reference_id)) + if vuln_id.startswith("XSA"): + references.append(XsaReferenceV2.from_id(xsa_id=vuln_id)) - elif reference_id.startswith("wnpa-sec"): - references.append(WireSharkReferenceV2.from_id(wnpa_sec_id=reference_id)) + elif vuln_id.startswith("ZBX"): + references.append(ZbxReferenceV2.from_id(zbx_id=vuln_id)) - elif reference_id.startswith("CVE"): - references.append( - ReferenceV2( - reference_id=reference_id, - url=f"https://nvd.nist.gov/vuln/detail/{reference_id}", - ) + elif vuln_id.startswith("wnpa-sec"): + references.append(WireSharkReferenceV2.from_id(wnpa_sec_id=vuln_id)) + + elif vuln_id.startswith("CVE"): + references.append( + ReferenceV2( + reference_id=vuln_id, + url=f"https://nvd.nist.gov/vuln/detail/{vuln_id}", ) + ) qualifiers = { "distroversion": distroversion, @@ -290,12 +292,11 @@ def load_advisories( ) ) - for cve in aliases: - advisory_id = f"{pkg_infos['name']}/{qualifiers['distroversion']}/{cve}" - yield AdvisoryData( - advisory_id=advisory_id, - aliases=[], - references_v2=references, - affected_packages=affected_packages, - url=url, - ) + advisory_id = f"{pkg_infos['name']}/{qualifiers['distroversion']}/{version}/{vuln_id}" + yield AdvisoryData( + advisory_id=advisory_id, + aliases=[vuln_id], + references_v2=references, + affected_packages=affected_packages, + url=url, + ) diff --git a/vulnerabilities/tests/pipelines/v2_importers/test_alpine_linux_importer_pipeline.py b/vulnerabilities/tests/pipelines/v2_importers/test_alpine_linux_importer_pipeline.py index 558d49653..74db189fb 100644 --- a/vulnerabilities/tests/pipelines/v2_importers/test_alpine_linux_importer_pipeline.py +++ b/vulnerabilities/tests/pipelines/v2_importers/test_alpine_linux_importer_pipeline.py @@ -34,15 +34,15 @@ def test_process_record(): logger = TestLogger() expected_advisories = [ AdvisoryData( - advisory_id="xen/v3.11/XSA-248", - aliases=[], + advisory_id="apk-tools/v3.11/2.7.2-r0/CVE-2017-9669", + aliases=["CVE-2017-9669"], summary="", affected_packages=[ AffectedPackageV2( package=PackageURL( type="apk", namespace="alpine", - name="xen", + name="apk-tools", version=None, qualifiers={ "arch": "aarch64", @@ -55,7 +55,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r1") + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") ), ) ), @@ -66,7 +66,7 @@ def test_process_record(): package=PackageURL( type="apk", namespace="alpine", - name="xen", + name="apk-tools", version=None, qualifiers={"arch": "armhf", "distroversion": "v3.11", "reponame": "main"}, subpath=None, @@ -75,7 +75,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r1") + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") ), ) ), @@ -86,7 +86,7 @@ def test_process_record(): package=PackageURL( type="apk", namespace="alpine", - name="xen", + name="apk-tools", version=None, qualifiers={"arch": "armv7", "distroversion": "v3.11", "reponame": "main"}, subpath=None, @@ -95,7 +95,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r1") + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") ), ) ), @@ -106,7 +106,7 @@ def test_process_record(): package=PackageURL( type="apk", namespace="alpine", - name="xen", + name="apk-tools", version=None, qualifiers={ "arch": "ppc64le", @@ -119,7 +119,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r1") + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") ), ) ), @@ -130,7 +130,7 @@ def test_process_record(): package=PackageURL( type="apk", namespace="alpine", - name="xen", + name="apk-tools", version=None, qualifiers={"arch": "s390x", "distroversion": "v3.11", "reponame": "main"}, subpath=None, @@ -139,7 +139,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r1") + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") ), ) ), @@ -150,7 +150,7 @@ def test_process_record(): package=PackageURL( type="apk", namespace="alpine", - name="xen", + name="apk-tools", version=None, qualifiers={"arch": "x86", "distroversion": "v3.11", "reponame": "main"}, subpath=None, @@ -159,7 +159,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r1") + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") ), ) ), @@ -170,7 +170,7 @@ def test_process_record(): package=PackageURL( type="apk", namespace="alpine", - name="xen", + name="apk-tools", version=None, qualifiers={"arch": "x86_64", "distroversion": "v3.11", "reponame": "main"}, subpath=None, @@ -179,7 +179,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r1") + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") ), ) ), @@ -190,29 +190,9 @@ def test_process_record(): references=[], references_v2=[ ReferenceV2( - reference_id="XSA-248", - reference_type="", - url="https://xenbits.xen.org/xsa/advisory-248.html", - ) - ], - patches=[], - date_published=None, - weaknesses=[], - severities=[], - url="https://secdb.alpinelinux.org/v3.11/", - original_advisory_text=None, - ), - AdvisoryData( - advisory_id="xen/v3.11/XSA-252", - aliases=[], - summary="", - affected_packages=[], - references=[], - references_v2=[ - ReferenceV2( - reference_id="XSA-252", + reference_id="CVE-2017-9669", reference_type="", - url="https://xenbits.xen.org/xsa/advisory-252.html", + url="https://nvd.nist.gov/vuln/detail/CVE-2017-9669", ) ], patches=[], @@ -223,15 +203,15 @@ def test_process_record(): original_advisory_text=None, ), AdvisoryData( - advisory_id="xen/v3.11/CVE-2018-7540", - aliases=[], + advisory_id="apk-tools/v3.11/2.7.2-r0/CVE-2017-9671", + aliases=["CVE-2017-9671"], summary="", affected_packages=[ AffectedPackageV2( package=PackageURL( type="apk", namespace="alpine", - name="xen", + name="apk-tools", version=None, qualifiers={ "arch": "aarch64", @@ -244,7 +224,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") ), ) ), @@ -255,7 +235,7 @@ def test_process_record(): package=PackageURL( type="apk", namespace="alpine", - name="xen", + name="apk-tools", version=None, qualifiers={"arch": "armhf", "distroversion": "v3.11", "reponame": "main"}, subpath=None, @@ -264,7 +244,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") ), ) ), @@ -275,7 +255,7 @@ def test_process_record(): package=PackageURL( type="apk", namespace="alpine", - name="xen", + name="apk-tools", version=None, qualifiers={"arch": "armv7", "distroversion": "v3.11", "reponame": "main"}, subpath=None, @@ -284,7 +264,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") ), ) ), @@ -295,7 +275,7 @@ def test_process_record(): package=PackageURL( type="apk", namespace="alpine", - name="xen", + name="apk-tools", version=None, qualifiers={ "arch": "ppc64le", @@ -308,7 +288,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") ), ) ), @@ -319,7 +299,7 @@ def test_process_record(): package=PackageURL( type="apk", namespace="alpine", - name="xen", + name="apk-tools", version=None, qualifiers={"arch": "s390x", "distroversion": "v3.11", "reponame": "main"}, subpath=None, @@ -328,7 +308,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") ), ) ), @@ -339,7 +319,7 @@ def test_process_record(): package=PackageURL( type="apk", namespace="alpine", - name="xen", + name="apk-tools", version=None, qualifiers={"arch": "x86", "distroversion": "v3.11", "reponame": "main"}, subpath=None, @@ -348,7 +328,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") ), ) ), @@ -359,7 +339,7 @@ def test_process_record(): package=PackageURL( type="apk", namespace="alpine", - name="xen", + name="apk-tools", version=None, qualifiers={"arch": "x86_64", "distroversion": "v3.11", "reponame": "main"}, subpath=None, @@ -368,7 +348,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") ), ) ), @@ -379,15 +359,30 @@ def test_process_record(): references=[], references_v2=[ ReferenceV2( - reference_id="CVE-2018-7540", + reference_id="CVE-2017-9671", reference_type="", - url="https://nvd.nist.gov/vuln/detail/CVE-2018-7540", - ), + url="https://nvd.nist.gov/vuln/detail/CVE-2017-9671", + ) + ], + patches=[], + date_published=None, + weaknesses=[], + severities=[], + url="https://secdb.alpinelinux.org/v3.11/", + original_advisory_text=None, + ), + AdvisoryData( + advisory_id="xen/v3.11/4.10-1-r1/XSA-252", + aliases=["XSA-252"], + summary="", + affected_packages=[], + references=[], + references_v2=[ ReferenceV2( reference_id="XSA-252", reference_type="", url="https://xenbits.xen.org/xsa/advisory-252.html", - ), + ) ], patches=[], date_published=None, @@ -397,8 +392,8 @@ def test_process_record(): original_advisory_text=None, ), AdvisoryData( - advisory_id="xen/v3.11/XSA-252", - aliases=[], + advisory_id="xen/v3.11/4.10.0-r1/XSA-248", + aliases=["XSA-248"], summary="", affected_packages=[ AffectedPackageV2( @@ -418,7 +413,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r1") ), ) ), @@ -438,7 +433,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r1") ), ) ), @@ -458,7 +453,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r1") ), ) ), @@ -482,7 +477,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r1") ), ) ), @@ -502,7 +497,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r1") ), ) ), @@ -522,7 +517,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r1") ), ) ), @@ -542,7 +537,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r1") ), ) ), @@ -553,15 +548,10 @@ def test_process_record(): references=[], references_v2=[ ReferenceV2( - reference_id="CVE-2018-7540", - reference_type="", - url="https://nvd.nist.gov/vuln/detail/CVE-2018-7540", - ), - ReferenceV2( - reference_id="XSA-252", + reference_id="XSA-248", reference_type="", - url="https://xenbits.xen.org/xsa/advisory-252.html", - ), + url="https://xenbits.xen.org/xsa/advisory-248.html", + ) ], patches=[], date_published=None, @@ -571,15 +561,15 @@ def test_process_record(): original_advisory_text=None, ), AdvisoryData( - advisory_id="apk-tools/v3.11/CVE-2017-9669", - aliases=[], + advisory_id="xen/v3.11/4.10.0-r2/CVE-2018-7540", + aliases=["CVE-2018-7540"], summary="", affected_packages=[ AffectedPackageV2( package=PackageURL( type="apk", namespace="alpine", - name="apk-tools", + name="xen", version=None, qualifiers={ "arch": "aarch64", @@ -592,7 +582,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") ), ) ), @@ -603,7 +593,7 @@ def test_process_record(): package=PackageURL( type="apk", namespace="alpine", - name="apk-tools", + name="xen", version=None, qualifiers={"arch": "armhf", "distroversion": "v3.11", "reponame": "main"}, subpath=None, @@ -612,7 +602,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") ), ) ), @@ -623,7 +613,7 @@ def test_process_record(): package=PackageURL( type="apk", namespace="alpine", - name="apk-tools", + name="xen", version=None, qualifiers={"arch": "armv7", "distroversion": "v3.11", "reponame": "main"}, subpath=None, @@ -632,7 +622,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") ), ) ), @@ -643,7 +633,7 @@ def test_process_record(): package=PackageURL( type="apk", namespace="alpine", - name="apk-tools", + name="xen", version=None, qualifiers={ "arch": "ppc64le", @@ -656,7 +646,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") ), ) ), @@ -667,7 +657,7 @@ def test_process_record(): package=PackageURL( type="apk", namespace="alpine", - name="apk-tools", + name="xen", version=None, qualifiers={"arch": "s390x", "distroversion": "v3.11", "reponame": "main"}, subpath=None, @@ -676,7 +666,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") ), ) ), @@ -687,7 +677,7 @@ def test_process_record(): package=PackageURL( type="apk", namespace="alpine", - name="apk-tools", + name="xen", version=None, qualifiers={"arch": "x86", "distroversion": "v3.11", "reponame": "main"}, subpath=None, @@ -696,7 +686,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") ), ) ), @@ -707,7 +697,7 @@ def test_process_record(): package=PackageURL( type="apk", namespace="alpine", - name="apk-tools", + name="xen", version=None, qualifiers={"arch": "x86_64", "distroversion": "v3.11", "reponame": "main"}, subpath=None, @@ -716,7 +706,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") ), ) ), @@ -727,9 +717,9 @@ def test_process_record(): references=[], references_v2=[ ReferenceV2( - reference_id="CVE-2017-9669", + reference_id="CVE-2018-7540", reference_type="", - url="https://nvd.nist.gov/vuln/detail/CVE-2017-9669", + url="https://nvd.nist.gov/vuln/detail/CVE-2018-7540", ) ], patches=[], @@ -740,15 +730,15 @@ def test_process_record(): original_advisory_text=None, ), AdvisoryData( - advisory_id="apk-tools/v3.11/CVE-2017-9671", - aliases=[], + advisory_id="xen/v3.11/4.10.0-r2/XSA-252", + aliases=["XSA-252"], summary="", affected_packages=[ AffectedPackageV2( package=PackageURL( type="apk", namespace="alpine", - name="apk-tools", + name="xen", version=None, qualifiers={ "arch": "aarch64", @@ -761,7 +751,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") ), ) ), @@ -772,7 +762,7 @@ def test_process_record(): package=PackageURL( type="apk", namespace="alpine", - name="apk-tools", + name="xen", version=None, qualifiers={"arch": "armhf", "distroversion": "v3.11", "reponame": "main"}, subpath=None, @@ -781,7 +771,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") ), ) ), @@ -792,7 +782,7 @@ def test_process_record(): package=PackageURL( type="apk", namespace="alpine", - name="apk-tools", + name="xen", version=None, qualifiers={"arch": "armv7", "distroversion": "v3.11", "reponame": "main"}, subpath=None, @@ -801,7 +791,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") ), ) ), @@ -812,7 +802,7 @@ def test_process_record(): package=PackageURL( type="apk", namespace="alpine", - name="apk-tools", + name="xen", version=None, qualifiers={ "arch": "ppc64le", @@ -825,7 +815,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") ), ) ), @@ -836,7 +826,7 @@ def test_process_record(): package=PackageURL( type="apk", namespace="alpine", - name="apk-tools", + name="xen", version=None, qualifiers={"arch": "s390x", "distroversion": "v3.11", "reponame": "main"}, subpath=None, @@ -845,7 +835,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") ), ) ), @@ -856,7 +846,7 @@ def test_process_record(): package=PackageURL( type="apk", namespace="alpine", - name="apk-tools", + name="xen", version=None, qualifiers={"arch": "x86", "distroversion": "v3.11", "reponame": "main"}, subpath=None, @@ -865,7 +855,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") ), ) ), @@ -876,7 +866,7 @@ def test_process_record(): package=PackageURL( type="apk", namespace="alpine", - name="apk-tools", + name="xen", version=None, qualifiers={"arch": "x86_64", "distroversion": "v3.11", "reponame": "main"}, subpath=None, @@ -885,7 +875,7 @@ def test_process_record(): fixed_version_range=AlpineLinuxVersionRange( constraints=( VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="2.7.2-r0") + comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") ), ) ), @@ -896,9 +886,9 @@ def test_process_record(): references=[], references_v2=[ ReferenceV2( - reference_id="CVE-2017-9671", + reference_id="XSA-252", reference_type="", - url="https://nvd.nist.gov/vuln/detail/CVE-2017-9671", + url="https://xenbits.xen.org/xsa/advisory-252.html", ) ], patches=[], @@ -918,7 +908,7 @@ def test_process_record(): logger=logger.write, ) ) - assert found_advisories == expected_advisories + assert sorted(found_advisories, key=lambda adv: adv.advisory_id) == expected_advisories assert ( "'4.10-1-r1' is not a valid AlpineVersion InvalidVersion(\"'4.10-1-r1' is not a valid \")" in logger.getvalue() From 0bb7b03fa0efefd1eb4b501220e61b5f86df74fb Mon Sep 17 00:00:00 2001 From: ziad hany Date: Fri, 30 Jan 2026 03:50:30 +0200 Subject: [PATCH 5/5] Update Alpine pipeline to use aboutcode-mirror-alpine-secdb Signed-off-by: ziad hany --- .../v2_importers/alpine_linux_importer.py | 138 ++++------- .../test_alpine_linux_importer_pipeline.py | 229 +----------------- 2 files changed, 47 insertions(+), 320 deletions(-) diff --git a/vulnerabilities/pipelines/v2_importers/alpine_linux_importer.py b/vulnerabilities/pipelines/v2_importers/alpine_linux_importer.py index 0ae811d8a..faf19f26a 100644 --- a/vulnerabilities/pipelines/v2_importers/alpine_linux_importer.py +++ b/vulnerabilities/pipelines/v2_importers/alpine_linux_importer.py @@ -7,14 +7,15 @@ # See https://aboutcode.org for more information about nexB OSS projects. # +import json import logging +from pathlib import Path from typing import Any from typing import Iterable from typing import List from typing import Mapping -from urllib.parse import urljoin -from bs4 import BeautifulSoup +from fetchcode.vcs import fetch_via_vcs from packageurl import PackageURL from univers.version_range import AlpineLinuxVersionRange from univers.versions import InvalidVersion @@ -26,7 +27,7 @@ from vulnerabilities.references import WireSharkReferenceV2 from vulnerabilities.references import XsaReferenceV2 from vulnerabilities.references import ZbxReferenceV2 -from vulnerabilities.utils import fetch_response +from vulnerabilities.utils import get_advisory_url class AlpineLinuxImporterPipeline(VulnerableCodeBaseImporterPipelineV2): @@ -35,90 +36,59 @@ class AlpineLinuxImporterPipeline(VulnerableCodeBaseImporterPipelineV2): pipeline_id = "alpine_linux_importer_v2" spdx_license_expression = "CC-BY-SA-4.0" license_url = "https://secdb.alpinelinux.org/license.txt" - url = "https://secdb.alpinelinux.org/" + repo_url = "git+https://github.com/aboutcode-org/aboutcode-mirror-alpine-secdb/" @classmethod def steps(cls): - return (cls.collect_and_store_advisories,) + return ( + cls.clone, + cls.collect_and_store_advisories, + ) def advisories_count(self) -> int: - return 0 + base_path = Path(self.vcs_response.dest_dir) / "secdb" + count = 0 - def collect_advisories(self) -> Iterable[AdvisoryData]: - page_response_content = fetch_response(self.url).content - advisory_directory_links = fetch_advisory_directory_links( - page_response_content, self.url, self.log - ) - advisory_links = set() - visited_directories = set() - for advisory_directory_link in advisory_directory_links: - if advisory_directory_link in visited_directories: - continue + for json_file in base_path.rglob("*.json"): + data = json.loads(json_file.read_text(encoding="utf-8")) + for pkg in data.get("packages", []): + count += len(pkg.get("advisories", [])) + + return count - advisory_directory_page = fetch_response(advisory_directory_link).content - advisory_links.update( - fetch_advisory_links(advisory_directory_page, advisory_directory_link, self.log) + def clone(self): + self.log(f"Cloning `{self.repo_url}`") + self.vcs_response = fetch_via_vcs(self.repo_url) + + def collect_advisories(self) -> Iterable[AdvisoryData]: + base_path = Path(self.vcs_response.dest_dir) / "secdb" + for file_path in base_path.glob("**/*.json"): + advisory_url = get_advisory_url( + file=file_path, + base_path=base_path, + url="https://github.com/aboutcode-org/aboutcode-mirror-alpine-secdb/blob/main/", ) - for link in advisory_links: - record = fetch_response(link).json() - if not record["packages"]: + with open(file_path) as f: + record = json.load(f) + + if not record or not record["packages"]: self.log( - f'"packages" not found in {link!r}', + f'"packages" not found in {advisory_url!r}', level=logging.DEBUG, ) continue - yield from process_record(record=record, url=link, logger=self.log) - - -def fetch_advisory_directory_links( - page_response_content: str, - base_url: str, - logger: callable = None, -) -> List[str]: - """ - Return a list of advisory directory links present in `page_response_content` html string - """ - index_page = BeautifulSoup(page_response_content, features="lxml") - alpine_versions = [ - link.text - for link in index_page.find_all("a") - if link.text.startswith("v") or link.text.startswith("edge") - ] - - if not alpine_versions: - if logger: - logger( - f"No versions found in {base_url!r}", - level=logging.DEBUG, - ) - return [] + yield from process_record(record=record, url=advisory_url, logger=self.log) - advisory_directory_links = [urljoin(base_url, version) for version in alpine_versions] + def clean_downloads(self): + """Cleanup any temporary repository data.""" + if self.vcs_response: + self.log(f"Removing cloned repository") + self.vcs_response.delete() - return advisory_directory_links - - -def fetch_advisory_links( - advisory_directory_page: str, - advisory_directory_link: str, - logger: callable = None, -) -> Iterable[str]: - """ - Yield json file urls present in `advisory_directory_page` - """ - advisory_directory_page = BeautifulSoup(advisory_directory_page, features="lxml") - anchor_tags = advisory_directory_page.find_all("a") - if not anchor_tags: - if logger: - logger( - f"No anchor tags found in {advisory_directory_link!r}", - level=logging.DEBUG, - ) - return iter([]) - for anchor_tag in anchor_tags: - if anchor_tag.text.endswith("json"): - yield urljoin(advisory_directory_link, anchor_tag.text) + def on_failure(self): + """Ensure cleanup is always performed on failure.""" + self.clean_downloads() def check_for_attributes(record, logger) -> bool: @@ -196,30 +166,14 @@ def load_advisories( level=logging.DEBUG, ) continue + # fixed_vulns is a list of strings and each string is a space-separated # list of aliases and CVES - aliases = set() for vuln_ids in fixed_vulns: - if not isinstance(vuln_ids, str): - if logger: - logger( - f"{vuln_ids!r} is not of `str` instance", - level=logging.DEBUG, - ) - continue - vuln_ids = vuln_ids.strip().split() - if not vuln_ids: - if logger: - logger( - f"{vuln_ids!r} is empty", - level=logging.DEBUG, - ) - continue - aliases.update(vuln_ids) + aliases = vuln_ids.strip().split(" ") + vuln_id = aliases[0] - for vuln_id in aliases: references = [] - if vuln_id.startswith("XSA"): references.append(XsaReferenceV2.from_id(xsa_id=vuln_id)) @@ -295,7 +249,7 @@ def load_advisories( advisory_id = f"{pkg_infos['name']}/{qualifiers['distroversion']}/{version}/{vuln_id}" yield AdvisoryData( advisory_id=advisory_id, - aliases=[vuln_id], + aliases=aliases, references_v2=references, affected_packages=affected_packages, url=url, diff --git a/vulnerabilities/tests/pipelines/v2_importers/test_alpine_linux_importer_pipeline.py b/vulnerabilities/tests/pipelines/v2_importers/test_alpine_linux_importer_pipeline.py index 74db189fb..d2b25271b 100644 --- a/vulnerabilities/tests/pipelines/v2_importers/test_alpine_linux_importer_pipeline.py +++ b/vulnerabilities/tests/pipelines/v2_importers/test_alpine_linux_importer_pipeline.py @@ -19,10 +19,6 @@ from vulnerabilities.importer import AdvisoryData from vulnerabilities.importer import AffectedPackageV2 from vulnerabilities.importer import ReferenceV2 -from vulnerabilities.pipelines.v2_importers.alpine_linux_importer import ( - fetch_advisory_directory_links, -) -from vulnerabilities.pipelines.v2_importers.alpine_linux_importer import fetch_advisory_links from vulnerabilities.pipelines.v2_importers.alpine_linux_importer import load_advisories from vulnerabilities.pipelines.v2_importers.alpine_linux_importer import process_record from vulnerabilities.tests.pipelines import TestLogger @@ -562,7 +558,7 @@ def test_process_record(): ), AdvisoryData( advisory_id="xen/v3.11/4.10.0-r2/CVE-2018-7540", - aliases=["CVE-2018-7540"], + aliases=["CVE-2018-7540", "XSA-252"], summary="", affected_packages=[ AffectedPackageV2( @@ -729,175 +725,6 @@ def test_process_record(): url="https://secdb.alpinelinux.org/v3.11/", original_advisory_text=None, ), - AdvisoryData( - advisory_id="xen/v3.11/4.10.0-r2/XSA-252", - aliases=["XSA-252"], - summary="", - affected_packages=[ - AffectedPackageV2( - package=PackageURL( - type="apk", - namespace="alpine", - name="xen", - version=None, - qualifiers={ - "arch": "aarch64", - "distroversion": "v3.11", - "reponame": "main", - }, - subpath=None, - ), - affected_version_range=None, - fixed_version_range=AlpineLinuxVersionRange( - constraints=( - VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") - ), - ) - ), - introduced_by_commit_patches=[], - fixed_by_commit_patches=[], - ), - AffectedPackageV2( - package=PackageURL( - type="apk", - namespace="alpine", - name="xen", - version=None, - qualifiers={"arch": "armhf", "distroversion": "v3.11", "reponame": "main"}, - subpath=None, - ), - affected_version_range=None, - fixed_version_range=AlpineLinuxVersionRange( - constraints=( - VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") - ), - ) - ), - introduced_by_commit_patches=[], - fixed_by_commit_patches=[], - ), - AffectedPackageV2( - package=PackageURL( - type="apk", - namespace="alpine", - name="xen", - version=None, - qualifiers={"arch": "armv7", "distroversion": "v3.11", "reponame": "main"}, - subpath=None, - ), - affected_version_range=None, - fixed_version_range=AlpineLinuxVersionRange( - constraints=( - VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") - ), - ) - ), - introduced_by_commit_patches=[], - fixed_by_commit_patches=[], - ), - AffectedPackageV2( - package=PackageURL( - type="apk", - namespace="alpine", - name="xen", - version=None, - qualifiers={ - "arch": "ppc64le", - "distroversion": "v3.11", - "reponame": "main", - }, - subpath=None, - ), - affected_version_range=None, - fixed_version_range=AlpineLinuxVersionRange( - constraints=( - VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") - ), - ) - ), - introduced_by_commit_patches=[], - fixed_by_commit_patches=[], - ), - AffectedPackageV2( - package=PackageURL( - type="apk", - namespace="alpine", - name="xen", - version=None, - qualifiers={"arch": "s390x", "distroversion": "v3.11", "reponame": "main"}, - subpath=None, - ), - affected_version_range=None, - fixed_version_range=AlpineLinuxVersionRange( - constraints=( - VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") - ), - ) - ), - introduced_by_commit_patches=[], - fixed_by_commit_patches=[], - ), - AffectedPackageV2( - package=PackageURL( - type="apk", - namespace="alpine", - name="xen", - version=None, - qualifiers={"arch": "x86", "distroversion": "v3.11", "reponame": "main"}, - subpath=None, - ), - affected_version_range=None, - fixed_version_range=AlpineLinuxVersionRange( - constraints=( - VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") - ), - ) - ), - introduced_by_commit_patches=[], - fixed_by_commit_patches=[], - ), - AffectedPackageV2( - package=PackageURL( - type="apk", - namespace="alpine", - name="xen", - version=None, - qualifiers={"arch": "x86_64", "distroversion": "v3.11", "reponame": "main"}, - subpath=None, - ), - affected_version_range=None, - fixed_version_range=AlpineLinuxVersionRange( - constraints=( - VersionConstraint( - comparator="=", version=AlpineLinuxVersion(string="4.10.0-r2") - ), - ) - ), - introduced_by_commit_patches=[], - fixed_by_commit_patches=[], - ), - ], - references=[], - references_v2=[ - ReferenceV2( - reference_id="XSA-252", - reference_type="", - url="https://xenbits.xen.org/xsa/advisory-252.html", - ) - ], - patches=[], - date_published=None, - weaknesses=[], - severities=[], - url="https://secdb.alpinelinux.org/v3.11/", - original_advisory_text=None, - ), ] with open(TEST_DATA / "v3.11/main.json") as f: @@ -915,60 +742,6 @@ def test_process_record(): ) -def test_fetch_advisory_directory_links(): - expected = [ - "https://secdb.alpinelinux.org/edge/", - "https://secdb.alpinelinux.org/v3.10/", - "https://secdb.alpinelinux.org/v3.11/", - "https://secdb.alpinelinux.org/v3.12/", - "https://secdb.alpinelinux.org/v3.13/", - "https://secdb.alpinelinux.org/v3.14/", - "https://secdb.alpinelinux.org/v3.15/", - "https://secdb.alpinelinux.org/v3.2/", - "https://secdb.alpinelinux.org/v3.3/", - "https://secdb.alpinelinux.org/v3.4/", - "https://secdb.alpinelinux.org/v3.5/", - "https://secdb.alpinelinux.org/v3.6/", - "https://secdb.alpinelinux.org/v3.7/", - "https://secdb.alpinelinux.org/v3.8/", - "https://secdb.alpinelinux.org/v3.9/", - ] - with open(TEST_DATA / "web_pages/directory.html") as f: - assert ( - fetch_advisory_directory_links(f.read(), "https://secdb.alpinelinux.org/") == expected - ) - - -def test_fetch_advisory_directory_links_failure(): - logger = TestLogger() - with open(TEST_DATA / "web_pages/fail_directory.html") as f: - assert ( - fetch_advisory_directory_links( - f.read(), "https://secdb.alpinelinux.org/", logger=logger.write - ) - == [] - ) - assert "No versions found in 'https://secdb.alpinelinux.org/'" in logger.getvalue() - - -def test_fetch_advisory_links(): - expected = [ - "https://secdb.alpinelinux.org/v3.11/community.json", - "https://secdb.alpinelinux.org/v3.11/main.json", - ] - with open(TEST_DATA / "web_pages/v3.11.html") as f: - assert ( - list(fetch_advisory_links(f.read(), "https://secdb.alpinelinux.org/v3.11/")) == expected - ) - - -def test_fetch_advisory_links_failure(): - logger = TestLogger() - with open(TEST_DATA / "web_pages/fail_directory.html") as f: - assert list(fetch_advisory_links(f.read(), "v3.11", logger=logger.write)) == [] - assert "No anchor tags found in 'v3.11'" in logger.getvalue() - - def test_process_record_without_packages(): logger = TestLogger() with open(TEST_DATA / TEST_DATA / "v3.3/community.json") as f: