diff --git a/api_v3_usage.rst b/api_v3_usage.rst new file mode 100644 index 000000000..26ed9377f --- /dev/null +++ b/api_v3_usage.rst @@ -0,0 +1,246 @@ +Package Endpoint +================ + +We are migrating from **API v1** to **API v3**. + +Previously, the ``/api/packages`` endpoint exposed multiple routes: + +- ``bulk_search`` +- ``bulk_lookup`` +- ``lookup`` +- ``all`` + +In **API v3**, all these capabilities are consolidated into a **single endpoint**: + +:: + + POST /api/v3/packages + + +Pagination +---------- + +Responses from the package endpoint are **always paginated**, with **10 results per page**. + +Each response includes: + +- ``count`` — total number of results +- ``next`` — URL for the next page +- ``previous`` — URL for the previous page + +If a package is associated with **more than 100 advisories**, the response will include: + +- ``affected_by_vulnerabilities_url`` instead of ``affected_by_vulnerabilities`` +- ``fixing_vulnerabilities_url`` instead of ``fixing_vulnerabilities`` + + +Getting All Vulnerable Packages +------------------------------- + +Instead of calling ``/api/packages/all``, call the v3 endpoint with an empty ``purls`` list. + +:: + + POST /api/v3/packages + + { + "purls": [] + } + +Example response: + +:: + + { + "count": 596, + "next": "http://example.com/api/v3/packages?page=2", + "previous": null, + "results": [ + "pkg:npm/626@1.1.1", + "pkg:npm/aedes@0.35.0", + "pkg:npm/airbrake@0.3.8", + "pkg:npm/angular-http-server@1.4.3", + "pkg:npm/apex-publish-static-files@2.0.0", + "pkg:npm/atob@2.0.3", + "pkg:npm/augustine@0.2.3", + "pkg:npm/backbone@0.3.3", + "pkg:npm/base64-url@1.3.3", + "pkg:npm/base64url@2.0.0" + ] + } + + +Bulk Search (Replacement) +------------------------- + +Instead of calling ``/api/packages/bulk_search``, use: + +:: + + POST /api/v3/packages + +Parameters: + +- ``purls`` — list of package URLs to query +- ``details`` — boolean (default: ``false``) +- ``approximate`` — boolean (default: ``false``) + +The ``approximate`` flag replaces the previous ``plain_purl`` parameter. +When set to ``true``, qualifiers and subpaths in PURLs are ignored. + + +Get Only Vulnerable PURLs +~~~~~~~~~~~~~~~~~~~~~~~~~ + +:: + + POST /api/v3/packages + + { + "purls": ["pkg:npm/atob@2.0.3", "pkg:pypi/sample@2.0.0"], + "details": false + } + +Example response: + +:: + + { + "count": 1, + "next": null, + "previous": null, + "results": [ + "pkg:npm/atob@2.0.3" + ] + } + + +Get Detailed Vulnerability Information +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +:: + + POST /api/v3/packages + + { + "purls": ["pkg:npm/atob@2.0.3", "pkg:pypi/sample@2.0.0"], + "details": true + } + +Example response: + +:: + + { + "count": 1, + "next": null, + "previous": null, + "results": [ + { + "purl": "pkg:npm/atob@2.0.3", + "affected_by_vulnerabilities": [ + { + "advisory_id": "nodejs_security_wg/npm-403", + "fixed_by_packages": [ + "pkg:npm/atob@2.1.0" + ], + "duplicate_advisory_ids": [] + } + ], + "fixing_vulnerabilities": [], + "next_non_vulnerable_version": "2.1.0", + "latest_non_vulnerable_version": "2.1.0", + "risk_score": null + } + ] + } + + +Using Approximate Matching +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +:: + + POST /api/v3/packages + + { + "purls": ["pkg:npm/atob@2.0.3?foo=bar"], + "approximate": true, + "details": true + } + +Example response: + +:: + + { + "count": 1, + "next": null, + "previous": null, + "results": [ + { + "purl": "pkg:npm/atob@2.0.3", + "affected_by_vulnerabilities": [ + { + "advisory_id": "nodejs_security_wg/npm-403", + "fixed_by_packages": [ + "pkg:npm/atob@2.1.0" + ], + "duplicate_advisory_ids": [] + } + ], + "fixing_vulnerabilities": [], + "next_non_vulnerable_version": "2.1.0", + "latest_non_vulnerable_version": "2.1.0", + "risk_score": null + } + ] + } + + +Advisory Endpoint +================= + +Retrieve advisories for one or more PURLs: + +:: + + POST /api/v3/advisories + + { + "purls": ["pkg:npm/atob@2.0.3", "pkg:pypi/sample@2.0.0"] + } + +Responses are paginated (10 results per page) and include ``next`` and ``previous`` links. + + +Affected-By Advisories Endpoint +=============================== + +Retrieve advisories that **affect (impact)** a given PURL: + +:: + + GET /api/v3/affected-by-advisories?purl= + +Example: + +:: + + GET /api/v3/affected-by-advisories?purl=pkg:npm/atob@2.0.3 + + +Fixing Advisories Endpoint +========================== + +Retrieve advisories that are **fixed by** a given PURL: + +:: + + GET /api/v3/fixing-advisories?purl= + +Example: + +:: + + GET /api/v3/fixing-advisories?purl=pkg:npm/atob@2.1.0 \ No newline at end of file diff --git a/vulnerabilities/api_v2.py b/vulnerabilities/api_v2.py index 74975b819..6e0ab9213 100644 --- a/vulnerabilities/api_v2.py +++ b/vulnerabilities/api_v2.py @@ -9,6 +9,7 @@ from django.db.models import Prefetch +from django.db.models import Q from django_filters import rest_framework as filters from drf_spectacular.utils import OpenApiParameter from drf_spectacular.utils import extend_schema @@ -25,15 +26,9 @@ from rest_framework.reverse import reverse from rest_framework.throttling import AnonRateThrottle -from vulnerabilities.models import AdvisoryReference -from vulnerabilities.models import AdvisorySeverity -from vulnerabilities.models import AdvisoryV2 -from vulnerabilities.models import AdvisoryWeakness from vulnerabilities.models import CodeFix from vulnerabilities.models import CodeFixV2 -from vulnerabilities.models import ImpactedPackage from vulnerabilities.models import Package -from vulnerabilities.models import PackageV2 from vulnerabilities.models import PipelineRun from vulnerabilities.models import PipelineSchedule from vulnerabilities.models import Vulnerability @@ -41,7 +36,6 @@ from vulnerabilities.models import VulnerabilitySeverity from vulnerabilities.models import Weakness from vulnerabilities.throttling import PermissionBasedUserRateThrottle -from vulnerabilities.utils import group_advisories_by_content class CharInFilter(filters.BaseInFilter, filters.CharFilter): @@ -58,16 +52,6 @@ class Meta: fields = ["cwe_id", "name", "description"] -class AdvisoryWeaknessSerializer(serializers.ModelSerializer): - cwe_id = serializers.CharField() - name = serializers.CharField() - description = serializers.CharField() - - class Meta: - model = AdvisoryWeakness - fields = ["cwe_id", "name", "description"] - - class VulnerabilityReferenceV2Serializer(serializers.ModelSerializer): url = serializers.CharField() reference_type = serializers.CharField() @@ -78,29 +62,6 @@ class Meta: fields = ["url", "reference_type", "reference_id"] -class AdvisoryReferenceSerializer(serializers.ModelSerializer): - url = serializers.CharField() - reference_type = serializers.CharField() - reference_id = serializers.CharField() - - class Meta: - model = AdvisoryReference - fields = ["url", "reference_type", "reference_id"] - - -class AdvisorySeveritySerializer(serializers.ModelSerializer): - class Meta: - model = AdvisorySeverity - fields = ["url", "value", "scoring_system", "scoring_elements", "published_at"] - - def to_representation(self, instance): - data = super().to_representation(instance) - published_at = data.get("published_at", None) - if not published_at: - data.pop("published_at") - return data - - class VulnerabilitySeverityV2Serializer(serializers.ModelSerializer): class Meta: model = VulnerabilitySeverity @@ -141,58 +102,6 @@ def get_aliases(self, obj): return [alias.alias for alias in obj.aliases.all()] -class AdvisoryV2Serializer(serializers.ModelSerializer): - aliases = serializers.SerializerMethodField() - weaknesses = AdvisoryWeaknessSerializer(many=True) - references = AdvisoryReferenceSerializer(many=True) - severities = AdvisorySeveritySerializer(many=True) - advisory_id = serializers.CharField(source="avid", read_only=True) - related_ssvc_trees = serializers.SerializerMethodField() - - def get_related_ssvc_trees(self, obj): - related_ssvcs = obj.related_ssvcs.all().select_related("source_advisory") - source_ssvcs = obj.source_ssvcs.all().select_related("source_advisory") - - seen = set() - result = [] - - for ssvc in list(related_ssvcs) + list(source_ssvcs): - key = (ssvc.vector, ssvc.source_advisory_id) - if key in seen: - continue - seen.add(key) - - result.append( - { - "vector": ssvc.vector, - "decision": ssvc.decision, - "options": ssvc.options, - "source_url": ssvc.source_advisory.url, - } - ) - - return result - - class Meta: - model = AdvisoryV2 - fields = [ - "advisory_id", - "url", - "aliases", - "summary", - "severities", - "weaknesses", - "references", - "exploitability", - "weighted_severity", - "risk_score", - "related_ssvc_trees", - ] - - def get_aliases(self, obj): - return [alias.alias for alias in obj.aliases.all()] - - class VulnerabilityListSerializer(serializers.ModelSerializer): url = serializers.SerializerMethodField() @@ -333,107 +242,6 @@ def get_fixing_vulnerabilities(self, obj): return [vuln.vulnerability_id for vuln in obj.fixing_vulnerabilities.all()] -class PackageV3Serializer(serializers.ModelSerializer): - purl = serializers.CharField(source="package_url") - risk_score = serializers.FloatField(read_only=True) - affected_by_vulnerabilities = serializers.SerializerMethodField() - fixing_vulnerabilities = serializers.SerializerMethodField() - next_non_vulnerable_version = serializers.SerializerMethodField() - latest_non_vulnerable_version = serializers.SerializerMethodField() - - class Meta: - model = Package - fields = [ - "purl", - "affected_by_vulnerabilities", - "fixing_vulnerabilities", - "next_non_vulnerable_version", - "latest_non_vulnerable_version", - "risk_score", - ] - - def get_affected_by_vulnerabilities(self, package): - """Return a dictionary with advisory as keys and their details, including fixed_by_packages.""" - impacts = package.affected_in_impacts.select_related("advisory").prefetch_related( - "fixed_by_packages" - ) - - avids = {impact.advisory.avid for impact in impacts if impact.advisory_id} - - latest_advisories = AdvisoryV2.objects.latest_for_avids(avids) - advisory_by_avid = {adv.avid: adv for adv in latest_advisories} - impact_by_avid = {} - - advisories = [] - for impact in impacts: - avid = impact.advisory.avid - advisory = advisory_by_avid.get(avid) - if not advisory: - continue - advisories.append(advisory) - impact_by_avid[avid] = impact - - grouped_advisories = group_advisories_by_content(advisories=advisories) - - advs = [] - - for hash in grouped_advisories: - advs.append(grouped_advisories[hash]) - - result = [] - - for advisory in advs: - primary_advisory = advisory["primary"] - avid = primary_advisory.avid - impact = impact_by_avid.get(avid) - if not impact: - continue - result.append( - { - "advisory_id": primary_advisory.avid, - "fixed_by_packages": [pkg.purl for pkg in impact.fixed_by_packages.all()], - "duplicate_advisory_ids": [adv.avid for adv in advisory["secondary"]], - } - ) - - return result - - def get_fixing_vulnerabilities(self, package): - impacts = package.fixed_in_impacts.select_related("advisory") - - avids = {impact.advisory.avid for impact in impacts if impact.advisory_id} - - latest_advisories = AdvisoryV2.objects.latest_for_avids(avids) - - grouped_advisories = group_advisories_by_content(advisories=latest_advisories) - - advs = [] - - for hash in grouped_advisories: - advs.append(grouped_advisories[hash]) - - result = [] - - for advisory in advs: - primary_advisory = advisory["primary"] - result.append( - { - "advisory_id": primary_advisory.avid, - "duplicate_advisory_ids": [adv.avid for adv in advisory["secondary"]], - } - ) - - return result - - def get_next_non_vulnerable_version(self, package): - if next_non_vulnerable := package.get_non_vulnerable_versions()[0]: - return next_non_vulnerable.version - - def get_latest_non_vulnerable_version(self, package): - if latest_non_vulnerable := package.get_non_vulnerable_versions()[-1]: - return latest_non_vulnerable.version - - class PackageurlListSerializer(serializers.Serializer): purls = serializers.ListField( child=serializers.CharField(), @@ -462,27 +270,6 @@ class PackageV2FilterSet(filters.FilterSet): purl = filters.CharFilter(field_name="package_url") -class AdvisoryPackageV2FilterSet(filters.FilterSet): - affected_by_advisory = filters.CharFilter( - field_name="affected_in_impacts__advisory__avid", - label="Affected By Advisory ID", - help_text="Filter packages affected by a specific Advisory ID.", - ) - - fixing_advisory = filters.CharFilter( - field_name="fixed_in_impacts__advisory__avid", - label="Fixed By Advisory ID", - help_text="Filter packages fixed by a specific Advisory ID.", - ) - - purls = CharInFilter( - field_name="package_url", - lookup_expr="in", - label="Package URL", - help_text="Filter by one or more Package URLs. Multi-value supported (comma-separated).", - ) - - class PackageV2ViewSet(viewsets.ReadOnlyModelViewSet): queryset = Package.objects.all().prefetch_related( Prefetch( @@ -1062,339 +849,3 @@ def get_view_name(self): if self.detail: return "Pipeline Instance" return "Pipeline Jobs" - - -class PackageV3ViewSet(viewsets.ReadOnlyModelViewSet): - queryset = PackageV2.objects.all() - serializer_class = PackageV3Serializer - filter_backends = [filters.DjangoFilterBackend] - filterset_class = AdvisoryPackageV2FilterSet - throttle_classes = [AnonRateThrottle, PermissionBasedUserRateThrottle] - - def get_queryset(self): - return ( - super() - .get_queryset() - .prefetch_related( - Prefetch( - "affected_in_impacts", - queryset=ImpactedPackage.objects.select_related("advisory").prefetch_related( - "fixed_by_packages", - ), - ), - Prefetch( - "fixed_in_impacts", - queryset=ImpactedPackage.objects.select_related("advisory"), - ), - ) - .with_is_vulnerable() - ) - - def list(self, request, *args, **kwargs): - queryset = self.filter_queryset(self.get_queryset()) - page = self.paginate_queryset(queryset) - - packages = page if page is not None else queryset - - avids = set() - - for package in packages: - for impact in package.affected_in_impacts.all(): - if impact.advisory_id: - avids.add(impact.advisory.avid) - - for impact in package.fixed_in_impacts.all(): - if impact.advisory_id: - avids.add(impact.advisory.avid) - - latest_advisories = AdvisoryV2.objects.latest_for_avids(avids) - - advisory_data = {adv.avid: AdvisoryV2Serializer(adv).data for adv in latest_advisories} - - serializer = self.get_serializer(packages, many=True) - - if page is not None: - return self.get_paginated_response( - { - "packages": serializer.data, - "advisories_by_id": advisory_data, - } - ) - - return Response( - { - "packages": serializer.data, - "advisories_by_id": advisory_data, - } - ) - - @extend_schema( - request=PackageurlListSerializer, - responses={200: PackageV2Serializer(many=True)}, - ) - @action( - detail=False, - methods=["post"], - serializer_class=PackageurlListSerializer, - filter_backends=[], - pagination_class=None, - ) - def bulk_lookup(self, request): - """ - Return the response for exact PackageURLs requested for. - """ - serializer = self.serializer_class(data=request.data) - if not serializer.is_valid(): - return Response( - status=status.HTTP_400_BAD_REQUEST, - data={ - "error": serializer.errors, - "message": "A non-empty 'purls' list of PURLs is required.", - }, - ) - - purls = serializer.validated_data.get("purls") - - packages = ( - PackageV2.objects.for_purls(purls) - .prefetch_related( - Prefetch( - "affected_in_impacts", - queryset=ImpactedPackage.objects.select_related("advisory").prefetch_related( - "fixed_by_packages" - ), - ), - Prefetch( - "fixed_in_impacts", - queryset=ImpactedPackage.objects.select_related("advisory"), - ), - ) - .with_is_vulnerable() - ) - - avids = set() - - for package in packages: - for impact in package.affected_in_impacts.all(): - if impact.advisory_id: - avids.add(impact.advisory.avid) - - for impact in package.fixed_in_impacts.all(): - if impact.advisory_id: - avids.add(impact.advisory.avid) - - latest_advisories = AdvisoryV2.objects.latest_for_avids(avids) - - advisory_data = { - adv.avid: AdvisoryV2Serializer(adv, context={"request": request}).data - for adv in latest_advisories - } - - package_data = PackageV3Serializer( - packages, - many=True, - context={"request": request}, - ).data - - return Response( - { - "packages": package_data, - "advisories_by_id": advisory_data, - } - ) - - @extend_schema( - request=PackageBulkSearchRequestSerializer, - responses={200: PackageV2Serializer(many=True)}, - ) - @action( - detail=False, - methods=["post"], - serializer_class=PackageBulkSearchRequestSerializer, - filter_backends=[], - pagination_class=None, - ) - def bulk_search(self, request): - """ - Lookup for vulnerable packages using many Package URLs at once. - """ - serializer = self.serializer_class(data=request.data) - if not serializer.is_valid(): - return Response( - status=status.HTTP_400_BAD_REQUEST, - data={ - "error": serializer.errors, - "message": "A non-empty 'purls' list of PURLs is required.", - }, - ) - - validated_data = serializer.validated_data - purls = validated_data.get("purls") - purl_only = validated_data.get("purl_only", False) - plain_purl = validated_data.get("plain_purl", False) - - if plain_purl: - purl_objects = [PackageURL.from_string(purl) for purl in purls] - plain_purl_objects = [ - PackageURL( - type=purl.type, - namespace=purl.namespace, - name=purl.name, - version=purl.version, - ) - for purl in purl_objects - ] - plain_purls = [str(purl) for purl in plain_purl_objects] - - query = ( - PackageV2.objects.filter(plain_package_url__in=plain_purls) - .order_by("plain_package_url") - .distinct("plain_package_url") - .prefetch_related( - Prefetch( - "affected_in_impacts", - queryset=ImpactedPackage.objects.select_related( - "advisory" - ).prefetch_related( - "fixed_by_packages", - ), - ), - Prefetch( - "fixed_in_impacts", - queryset=ImpactedPackage.objects.select_related("advisory"), - ), - ) - .with_is_vulnerable() - ) - - packages = query - - avids = set() - for package in packages: - for impact in package.affected_in_impacts.all(): - if impact.advisory_id: - avids.add(impact.advisory.avid) - for impact in package.fixed_in_impacts.all(): - if impact.advisory_id: - avids.add(impact.advisory.avid) - - latest_advisories = AdvisoryV2.objects.latest_for_avids(avids) - advisory_data = { - adv.avid: AdvisoryV2Serializer(adv, context={"request": request}).data - for adv in latest_advisories - } - - if not purl_only: - package_data = PackageV3Serializer( - packages, - many=True, - context={"request": request}, - ).data - return Response( - { - "packages": package_data, - "advisories_by_id": advisory_data, - } - ) - - # Using order by and distinct because there will be - # many fully qualified purl for a single plain purl - vulnerable_purls = query.vulnerable().only("plain_package_url") - vulnerable_purls = [str(package.plain_package_url) for package in vulnerable_purls] - return Response(data=vulnerable_purls) - - query = ( - PackageV2.objects.filter(package_url__in=purls) - .order_by("plain_package_url") - .distinct("plain_package_url") - .prefetch_related( - Prefetch( - "affected_in_impacts", - queryset=ImpactedPackage.objects.select_related("advisory").prefetch_related( - "fixed_by_packages", - ), - ), - Prefetch( - "fixed_in_impacts", - queryset=ImpactedPackage.objects.select_related("advisory"), - ), - ) - .with_is_vulnerable() - ) - packages = query - - avids = set() - for package in packages: - for impact in package.affected_in_impacts.all(): - if impact.advisory_id: - avids.add(impact.advisory.avid) - for impact in package.fixed_in_impacts.all(): - if impact.advisory_id: - avids.add(impact.advisory.avid) - - latest_advisories = AdvisoryV2.objects.latest_for_avids(avids) - advisory_data = { - adv.avid: AdvisoryV2Serializer(adv, context={"request": request}).data - for adv in latest_advisories - } - - if not purl_only: - package_data = PackageV3Serializer( - packages, - many=True, - context={"request": request}, - ).data - return Response( - { - "packages": package_data, - "advisories_by_id": advisory_data, - } - ) - - vulnerable_purls = query.vulnerable().only("package_url") - vulnerable_purls = [str(package.package_url) for package in vulnerable_purls] - return Response(data=vulnerable_purls) - - @action(detail=False, methods=["get"]) - def all(self, request): - """ - Return a list of Package URLs of vulnerable packages. - """ - vulnerable_purls = ( - PackageV2.objects.vulnerable() - .only("package_url") - .order_by("package_url") - .distinct() - .values_list("package_url", flat=True) - ) - return Response(vulnerable_purls) - - @extend_schema( - request=LookupRequestSerializer, - responses={200: PackageV2Serializer(many=True)}, - ) - @action( - detail=False, - methods=["post"], - serializer_class=LookupRequestSerializer, - filter_backends=[], - pagination_class=None, - ) - def lookup(self, request): - """ - Return the response for exact PackageURL requested for. - """ - serializer = self.serializer_class(data=request.data) - if not serializer.is_valid(): - return Response( - status=status.HTTP_400_BAD_REQUEST, - data={ - "error": serializer.errors, - "message": "A 'purl' is required.", - }, - ) - validated_data = serializer.validated_data - purl = validated_data.get("purl") - - qs = self.get_queryset().for_purls([purl]).with_is_vulnerable() - return Response(PackageV3Serializer(qs, many=True, context={"request": request}).data) diff --git a/vulnerabilities/api_v3.py b/vulnerabilities/api_v3.py new file mode 100644 index 000000000..2549aacc9 --- /dev/null +++ b/vulnerabilities/api_v3.py @@ -0,0 +1,429 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +from urllib.parse import urlencode + +from django_filters import rest_framework as filters +from packageurl import PackageURL +from rest_framework import serializers +from rest_framework import viewsets +from rest_framework.reverse import reverse +from rest_framework.throttling import AnonRateThrottle + +from vulnerabilities.models import AdvisoryReference +from vulnerabilities.models import AdvisorySeverity +from vulnerabilities.models import AdvisoryV2 +from vulnerabilities.models import AdvisoryWeakness +from vulnerabilities.models import PackageV2 +from vulnerabilities.throttling import PermissionBasedUserRateThrottle +from vulnerabilities.utils import group_advisories_by_content + + +class PackageQuerySerializer(serializers.Serializer): + purls = serializers.ListField( + child=serializers.CharField(), + required=False, + default=list, + ) + details = serializers.BooleanField(default=False) + approximate = serializers.BooleanField(default=False) + + def validate(self, data): + if not data["purls"]: + if data["details"] or data["approximate"]: + raise serializers.ValidationError( + "details and approximate must be false when purls is empty" + ) + return data + + +class AdvisoryQuerySerializer(serializers.Serializer): + purls = serializers.ListField( + child=serializers.CharField(), + required=False, + default=list, + ) + + def validate(self, data): + if not data["purls"]: + raise serializers.ValidationError("purls is required") + return data + + +class AdvisoryReferenceSerializer(serializers.ModelSerializer): + url = serializers.CharField() + reference_type = serializers.CharField() + reference_id = serializers.CharField() + + class Meta: + model = AdvisoryReference + fields = ["url", "reference_type", "reference_id"] + + +class AdvisorySeveritySerializer(serializers.ModelSerializer): + class Meta: + model = AdvisorySeverity + fields = ["url", "value", "scoring_system", "scoring_elements", "published_at"] + + def to_representation(self, instance): + data = super().to_representation(instance) + published_at = data.get("published_at", None) + if not published_at: + data.pop("published_at") + return data + + +class AdvisoryWeaknessSerializer(serializers.ModelSerializer): + cwe_id = serializers.CharField() + name = serializers.CharField() + description = serializers.CharField() + + class Meta: + model = AdvisoryWeakness + fields = ["cwe_id", "name", "description"] + + +class AdvisoryV3Serializer(serializers.ModelSerializer): + aliases = serializers.SerializerMethodField() + weaknesses = AdvisoryWeaknessSerializer(many=True) + references = AdvisoryReferenceSerializer(many=True) + severities = AdvisorySeveritySerializer(many=True) + advisory_id = serializers.CharField(source="avid", read_only=True) + related_ssvc_trees = serializers.SerializerMethodField() + + def get_related_ssvc_trees(self, obj): + related_ssvcs = obj.related_ssvcs.all().select_related("source_advisory") + source_ssvcs = obj.source_ssvcs.all().select_related("source_advisory") + + seen = set() + result = [] + + for ssvc in list(related_ssvcs) + list(source_ssvcs): + key = (ssvc.vector, ssvc.source_advisory_id) + if key in seen: + continue + seen.add(key) + + result.append( + { + "vector": ssvc.vector, + "decision": ssvc.decision, + "options": ssvc.options, + "source_url": ssvc.source_advisory.url, + } + ) + + return result + + class Meta: + model = AdvisoryV2 + fields = [ + "advisory_id", + "url", + "aliases", + "summary", + "severities", + "weaknesses", + "references", + "exploitability", + "weighted_severity", + "risk_score", + "related_ssvc_trees", + ] + + def get_aliases(self, obj): + return [alias.alias for alias in obj.aliases.all()] + + +class PackageV3Serializer(serializers.ModelSerializer): + purl = serializers.CharField(source="package_url") + risk_score = serializers.FloatField(read_only=True) + affected_by_vulnerabilities = serializers.SerializerMethodField() + affected_by_vulnerabilities_url = serializers.SerializerMethodField() + fixing_vulnerabilities = serializers.SerializerMethodField() + fixing_vulnerabilities_url = serializers.SerializerMethodField() + next_non_vulnerable_version = serializers.SerializerMethodField() + latest_non_vulnerable_version = serializers.SerializerMethodField() + + class Meta: + model = PackageV2 + fields = [ + "purl", + "affected_by_vulnerabilities", + "affected_by_vulnerabilities_url", + "fixing_vulnerabilities", + "fixing_vulnerabilities_url", + "next_non_vulnerable_version", + "latest_non_vulnerable_version", + "risk_score", + ] + + def to_representation(self, instance): + data = super().to_representation(instance) + + if data.get("affected_by_vulnerabilities") is None: + data.pop("affected_by_vulnerabilities", None) + else: + data.pop("affected_by_vulnerabilities_url", None) + + if data.get("fixing_vulnerabilities") is None: + data.pop("fixing_vulnerabilities", None) + else: + data.pop("fixing_vulnerabilities_url", None) + + return data + + def get_affected_by_vulnerabilities_url(self, obj): + request = self.context.get("request") + if not request: + return None + + base = reverse("affected-by-advisories-list") + url = request.build_absolute_uri(base) + + return f"{url}?{urlencode({'purl': obj.package_url})}" + + def get_fixing_vulnerabilities_url(self, obj): + request = self.context.get("request") + if not request: + return None + + base = reverse("fixing-advisories-list") + url = request.build_absolute_uri(base) + + return f"{url}?{urlencode({'purl': obj.package_url})}" + + def get_affected_by_vulnerabilities(self, package): + """Return a dictionary with advisory as keys and their details, including fixed_by_packages.""" + advisories_qs = AdvisoryV2.objects.latest_affecting_advisories_for_purl(package.package_url) + + advisories = list(advisories_qs[:101]) + if len(advisories) > 100: + return None + + advisory_by_avid = {adv.avid: adv for adv in advisories} + avids = advisory_by_avid.keys() + + impacts = ( + package.affected_in_impacts.filter(advisory__avid__in=avids) + .select_related("advisory") + .prefetch_related("fixed_by_packages") + ) + + impact_by_avid = {impact.advisory.avid: impact for impact in impacts} + + grouped = group_advisories_by_content(advisories) + + result = [] + for entry in grouped.values(): + primary = entry["primary"] + impact = impact_by_avid.get(primary.avid) + if not impact: + continue + + result.append( + { + "advisory_id": primary.avid, + "fixed_by_packages": [pkg.purl for pkg in impact.fixed_by_packages.all()], + "duplicate_advisory_ids": [a.avid for a in entry["secondary"]], + } + ) + + return result + + def get_fixing_vulnerabilities(self, package): + advisories_qs = AdvisoryV2.objects.latest_fixed_by_advisories_for_purl(package.package_url) + + advisories = list(advisories_qs[:101]) + if len(advisories) > 100: + return None + + advisory_by_avid = {adv.avid: adv for adv in advisories} + avids = advisory_by_avid.keys() + + impacts = ( + package.fixed_in_impacts.filter(advisory__avid__in=avids) + .select_related("advisory") + .prefetch_related("fixed_by_packages") + ) + + impact_by_avid = {impact.advisory.avid: impact for impact in impacts} + + grouped = group_advisories_by_content(advisories) + + result = [] + for entry in grouped.values(): + primary = entry["primary"] + impact = impact_by_avid.get(primary.avid) + if not impact: + continue + + result.append( + { + "advisory_id": primary.avid, + "duplicate_advisory_ids": [a.avid for a in entry["secondary"]], + } + ) + + return result + + def get_next_non_vulnerable_version(self, package): + if next_non_vulnerable := package.get_non_vulnerable_versions()[0]: + return next_non_vulnerable.version + + def get_latest_non_vulnerable_version(self, package): + if latest_non_vulnerable := package.get_non_vulnerable_versions()[-1]: + return latest_non_vulnerable.version + + +class PackageV3ViewSet(viewsets.GenericViewSet): + queryset = PackageV2.objects.all() + serializer_class = PackageV3Serializer + filter_backends = [filters.DjangoFilterBackend] + throttle_classes = [AnonRateThrottle, PermissionBasedUserRateThrottle] + + def create(self, request, *args, **kwargs): + serializer = PackageQuerySerializer(data=request.data) + serializer.is_valid(raise_exception=True) + + purls = serializer.validated_data["purls"] + details = serializer.validated_data["details"] + approximate = serializer.validated_data["approximate"] + + if not purls: + vulnerable_purls = ( + PackageV2.objects.vulnerable() + .only("package_url") + .distinct() + .values_list("package_url", flat=True) + .order_by("package_url") + ) + page = self.paginate_queryset(vulnerable_purls) + return self.get_paginated_response(page) + + plain_purls = None + + if approximate: + plain_purls = [ + str( + PackageURL( + type=p.type, + namespace=p.namespace, + name=p.name, + version=p.version, + ) + ) + for p in map(PackageURL.from_string, purls) + ] + + if not details: + if approximate: + query = ( + PackageV2.objects.filter(plain_package_url__in=plain_purls) + .values_list("plain_package_url", flat=True) + .distinct() + .order_by("plain_package_url") + ) + else: + query = ( + PackageV2.objects.filter(package_url__in=purls) + .distinct() + .order_by("package_url") + .values_list("package_url", flat=True) + ) + + page = self.paginate_queryset(query) + return self.get_paginated_response(page) + + if approximate: + query = ( + PackageV2.objects.filter(plain_package_url__in=plain_purls) + .order_by("plain_package_url") + .distinct("plain_package_url") + ) + else: + query = ( + PackageV2.objects.filter(package_url__in=purls) + .order_by("package_url") + .distinct("package_url") + ) + + page = self.paginate_queryset(query) + serializer = self.get_serializer(page, many=True, context={"request": request}) + return self.get_paginated_response(serializer.data) + + +class AffectedByAdvisoryV3Serializer(AdvisoryV3Serializer): + fixed_by_packages = serializers.SerializerMethodField() + + def get_fixed_by_packages(self, obj): + return list( + obj.impacted_packages.values_list("fixed_by_packages__package_url", flat=True) + .exclude(fixed_by_packages__package_url__isnull=True) + .distinct() + ) + + class Meta: + model = AdvisoryV2 + fields = [ + "advisory_id", + "url", + "aliases", + "summary", + "severities", + "weaknesses", + "references", + "exploitability", + "weighted_severity", + "risk_score", + "related_ssvc_trees", + "fixed_by_packages", + ] + + +class AdvisoryV3ViewSet(viewsets.GenericViewSet): + queryset = AdvisoryV2.objects.all() + serializer_class = AdvisoryV3Serializer + filter_backends = [filters.DjangoFilterBackend] + throttle_classes = [AnonRateThrottle, PermissionBasedUserRateThrottle] + + def create(self, request, *args, **kwargs): + serializer = PackageQuerySerializer(data=request.data) + serializer.is_valid(raise_exception=True) + + purls = serializer.validated_data["purls"] + + latest_advisories = AdvisoryV2.objects.latest_advisories_for_purls(purls=purls) + + page = self.paginate_queryset(latest_advisories) + serializer = self.get_serializer(page, many=True, context={"request": request}) + return self.get_paginated_response(serializer.data) + + +class PackageAdvisoriesViewSet(viewsets.ReadOnlyModelViewSet): + serializer_class = AdvisoryV3Serializer + relation = None + throttle_classes = [AnonRateThrottle, PermissionBasedUserRateThrottle] + + def get_queryset(self): + purl = self.request.query_params.get("purl") + + if not purl: + return AdvisoryV2.objects.none() + + return AdvisoryV2.objects.filter(**{self.relation: purl}).latest_per_avid() + + +class FixingAdvisoriesViewSet(PackageAdvisoriesViewSet): + relation = "impacted_packages__fixed_by_packages__package_url" + + +class AffectedByAdvisoriesViewSet(PackageAdvisoriesViewSet): + relation = "impacted_packages__affecting_packages__package_url" + serializer_class = AffectedByAdvisoryV3Serializer diff --git a/vulnerabilities/migrations/0117_advisoryv2_risk_score.py b/vulnerabilities/migrations/0117_advisoryv2_risk_score.py new file mode 100644 index 000000000..47733da5e --- /dev/null +++ b/vulnerabilities/migrations/0117_advisoryv2_risk_score.py @@ -0,0 +1,24 @@ +# Generated by Django 5.2.11 on 2026-03-17 09:07 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("vulnerabilities", "0116_advisoryv2_advisory_content_hash"), + ] + + operations = [ + migrations.AddField( + model_name="advisoryv2", + name="risk_score", + field=models.DecimalField( + blank=True, + decimal_places=1, + help_text="Risk expressed as a number ranging from 0 to 10. Risk is calculated from weighted severity and exploitability values. It is the maximum value of (the weighted severity multiplied by its exploitability) or 10. Risk = min(weighted severity * exploitability, 10)", + max_digits=3, + null=True, + ), + ), + ] diff --git a/vulnerabilities/models.py b/vulnerabilities/models.py index d1c88f285..270074cda 100644 --- a/vulnerabilities/models.py +++ b/vulnerabilities/models.py @@ -2876,6 +2876,36 @@ def latest_per_avid(self): def latest_for_avids(self, avids): return self.filter(avid__in=avids).latest_per_avid() + def latest_affecting_advisories_for_purl(self, purl): + return self.filter( + impacted_packages__affecting_packages__package_url=purl + ).latest_per_avid() + + def latest_affecting_advisories_for_purls(self, purls): + return self.filter( + impacted_packages__affecting_packages__package_url__in=purls + ).latest_per_avid() + + def latest_fixed_by_advisories_for_purl(self, purl): + return self.filter(impacted_packages__fixed_by_packages__package_url=purl).latest_per_avid() + + def latest_fixed_by_advisories_for_purls(self, purls): + return self.filter( + impacted_packages__fixed_by_packages__package_url__in=purls + ).latest_per_avid() + + def latest_advisories_for_purl(self, purl): + return self.filter( + Q(impacted_packages__affecting_packages__package_url=purl) + | Q(impacted_packages__fixed_by_packages__package_url=purl) + ).latest_per_avid() + + def latest_advisories_for_purls(self, purls): + return self.filter( + Q(impacted_packages__affecting_packages__package_url__in=purls) + | Q(impacted_packages__fixed_by_packages__package_url__in=purls) + ).latest_per_avid() + class AdvisoryV2(models.Model): """ @@ -3017,17 +3047,13 @@ class AdvisoryV2(models.Model): help_text="A unique hash computed from the content of the advisory used to identify advisories with the same content.", ) - @property - def risk_score(self): - """ - Risk expressed as a number ranging from 0 to 10. - Risk is calculated from weighted severity and exploitability values. - It is the maximum value of (the weighted severity multiplied by its exploitability) or 10 - Risk = min(weighted severity * exploitability, 10) - """ - if self.exploitability and self.weighted_severity: - risk_score = min(float(self.exploitability * self.weighted_severity), 10.0) - return round(risk_score, 1) + risk_score = models.DecimalField( + null=True, + blank=True, + max_digits=3, + decimal_places=1, + help_text="Risk expressed as a number ranging from 0 to 10. Risk is calculated from weighted severity and exploitability values. It is the maximum value of (the weighted severity multiplied by its exploitability) or 10. Risk = min(weighted severity * exploitability, 10)", + ) objects = AdvisoryV2QuerySet.as_manager() diff --git a/vulnerabilities/pipelines/v2_improvers/collect_ssvc_trees.py b/vulnerabilities/pipelines/v2_improvers/collect_ssvc_trees.py index 2b82b667c..213904815 100644 --- a/vulnerabilities/pipelines/v2_improvers/collect_ssvc_trees.py +++ b/vulnerabilities/pipelines/v2_improvers/collect_ssvc_trees.py @@ -36,7 +36,8 @@ def steps(cls): def collect_ssvc_data(self): vulnrichment_advisories = ( - AdvisoryV2.objects.filter( + AdvisoryV2.objects.latest_per_avid() + .filter( severities__scoring_system=SCORING_SYSTEMS["ssvc"], ) .distinct() diff --git a/vulnerabilities/pipelines/v2_improvers/compute_advisory_content_hash.py b/vulnerabilities/pipelines/v2_improvers/compute_advisory_content_hash.py index fe5a3c97e..935631419 100644 --- a/vulnerabilities/pipelines/v2_improvers/compute_advisory_content_hash.py +++ b/vulnerabilities/pipelines/v2_improvers/compute_advisory_content_hash.py @@ -27,7 +27,7 @@ def steps(cls): def compute_advisory_content_hash(self): """Compute Advisory Content Hash for Advisory.""" - advisories = AdvisoryV2.objects.filter(advisory_content_hash__isnull=True) + advisories = AdvisoryV2.objects.latest_per_avid().filter(advisory_content_hash__isnull=True) advisories_count = advisories.count() diff --git a/vulnerabilities/pipelines/v2_improvers/compute_package_risk.py b/vulnerabilities/pipelines/v2_improvers/compute_package_risk.py index 9caaaeb95..dacf7e6c8 100644 --- a/vulnerabilities/pipelines/v2_improvers/compute_package_risk.py +++ b/vulnerabilities/pipelines/v2_improvers/compute_package_risk.py @@ -7,14 +7,15 @@ # See https://aboutcode.org for more information about nexB OSS projects. # from aboutcode.pipeline import LoopProgress +from django.db.models import Max from django.db.models import Prefetch -from django.db.models import Q +from vulnerabilities.models import AdvisoryExploit +from vulnerabilities.models import AdvisoryReference from vulnerabilities.models import AdvisorySeverity from vulnerabilities.models import AdvisoryV2 from vulnerabilities.models import PackageV2 from vulnerabilities.pipelines import VulnerableCodePipeline -from vulnerabilities.risk import compute_package_risk_v2 from vulnerabilities.risk import compute_vulnerability_risk_factors @@ -36,104 +37,146 @@ def steps(cls): ) def compute_and_store_vulnerability_risk_score(self): + affected_advisories = ( - AdvisoryV2.objects.filter(impacted_packages__affecting_packages__isnull=False) + AdvisoryV2.objects.latest_per_avid() + .filter(impacted_packages__affecting_packages__isnull=False) + .only("id") .prefetch_related( - "references", - "severities", - "exploits", + Prefetch( + "references", queryset=AdvisoryReference.objects.only("id", "reference_type") + ), + Prefetch( + "severities", + queryset=AdvisorySeverity.objects.only("id", "value", "url", "scoring_system"), + ), + Prefetch("exploits", queryset=AdvisoryExploit.objects.only("id")), Prefetch( "related_advisory_severities", - queryset=AdvisoryV2.objects.prefetch_related("severities"), + queryset=AdvisoryV2.objects.only("id").prefetch_related( + Prefetch( + "severities", + queryset=AdvisorySeverity.objects.only( + "id", "value", "url", "scoring_system" + ), + ) + ), ), ) .distinct() ) + estimated_vulnerability_count = affected_advisories.count() + self.log( - f"Calculating risk for {affected_advisories.count():,d} advisory with a affected packages records" + f"Calculating risk for {estimated_vulnerability_count:,d} advisory with a affected packages records" ) - progress = LoopProgress(total_iterations=affected_advisories.count(), logger=self.log) + progress = LoopProgress( + logger=self.log, total_iterations=estimated_vulnerability_count, progress_step=5 + ) updatables = [] updated_vulnerability_count = 0 batch_size = 5000 for advisory in progress.iter(affected_advisories.iterator(chunk_size=batch_size)): + references = advisory.references.all() exploits = advisory.exploits.all() - severities = AdvisorySeverity.objects.filter( - Q(advisories=advisory) | Q(advisories__related_to_advisory_severities=advisory) - ).distinct() + severities = list(advisory.severities.all()) - weighted_severity, exploitability = compute_vulnerability_risk_factors( - references=references, - severities=severities, - exploits=exploits, - ) - advisory.weighted_severity = weighted_severity - advisory.exploitability = exploitability - updatables.append(advisory) + for rel in advisory.related_advisory_severities.all(): + severities.extend(rel.severities.all()) + + try: + weighted_severity, exploitability = compute_vulnerability_risk_factors( + references=references, + severities=severities, + exploits=exploits, + ) + + advisory.weighted_severity = weighted_severity + advisory.exploitability = exploitability + if advisory.exploitability and advisory.weighted_severity: + risk_score = min( + float(advisory.exploitability * advisory.weighted_severity), 10.0 + ) + advisory.risk_score = round(risk_score, 1) + updatables.append(advisory) + except Exception as e: + self.log(f"Error computing risk score for advisory {advisory.advisory_id}: {e}") if len(updatables) >= batch_size: updated_vulnerability_count += bulk_update( model=AdvisoryV2, items=updatables, - fields=["weighted_severity", "exploitability"], + fields=["weighted_severity", "exploitability", "risk_score"], logger=self.log, ) - - updated_vulnerability_count += bulk_update( - model=AdvisoryV2, - items=updatables, - fields=["weighted_severity", "exploitability"], - logger=self.log, - ) + updatables.clear() + + if updatables: + updated_vulnerability_count += bulk_update( + model=AdvisoryV2, + items=updatables, + fields=["weighted_severity", "exploitability", "risk_score"], + logger=self.log, + ) self.log( f"Successfully added risk score for {updated_vulnerability_count:,d} vulnerability" ) def compute_and_store_package_risk_score(self): - affected_packages = (PackageV2.objects.filter(affected_in_impacts__isnull=False)).distinct() - self.log(f"Calculating risk for {affected_packages.count():,d} affected package records") + latest_advisories = AdvisoryV2.objects.latest_per_avid() + + qs = ( + PackageV2.objects.filter( + affected_in_impacts__advisory__risk_score__isnull=False, + affected_in_impacts__advisory__in=latest_advisories, + ) + .annotate(computed_risk=Max("affected_in_impacts__advisory__risk_score")) + .only("id") + ) + + estimated = qs.count() progress = LoopProgress( - total_iterations=affected_packages.count(), + total_iterations=estimated, logger=self.log, progress_step=5, ) - updatables = [] - updated_package_count = 0 - batch_size = 10000 + self.log(f"Computing risk for {estimated:,d} packages") - for package in progress.iter(affected_packages.iterator(chunk_size=batch_size)): - risk_score = compute_package_risk_v2(package) + batch = [] + batch_size = 5000 + updated = 0 - if not risk_score: - continue + for pkg in progress.iter(qs.iterator(chunk_size=batch_size)): - package.risk_score = risk_score - updatables.append(package) + pkg.risk_score = round(float(pkg.computed_risk), 1) + batch.append(pkg) - if len(updatables) >= batch_size: - updated_package_count += bulk_update( + if len(batch) >= batch_size: + updated += bulk_update( model=PackageV2, - items=updatables, + items=batch, fields=["risk_score"], logger=self.log, ) - updated_package_count += bulk_update( + batch.clear() + + updated += bulk_update( model=PackageV2, - items=updatables, + items=batch, fields=["risk_score"], logger=self.log, ) - self.log(f"Successfully added risk score for {updated_package_count:,d} package") + self.log(f"Successfully added risk score for {updated:,d} package") def bulk_update(model, items, fields, logger): diff --git a/vulnerabilities/pipelines/v2_improvers/enhance_with_exploitdb.py b/vulnerabilities/pipelines/v2_improvers/enhance_with_exploitdb.py index c306502d8..70afa4ef1 100644 --- a/vulnerabilities/pipelines/v2_improvers/enhance_with_exploitdb.py +++ b/vulnerabilities/pipelines/v2_improvers/enhance_with_exploitdb.py @@ -89,7 +89,7 @@ def add_vulnerability_exploit(row, logger): for adv in alias.advisories.all(): advisories.add(adv) else: - advs = AdvisoryV2.objects.filter(advisory_id=raw_alias) + advs = AdvisoryV2.objects.filter(advisory_id=raw_alias).latest_per_avid() for adv in advs: advisories.add(adv) except AdvisoryAlias.DoesNotExist: diff --git a/vulnerabilities/pipelines/v2_improvers/enhance_with_kev.py b/vulnerabilities/pipelines/v2_improvers/enhance_with_kev.py index 486d79232..5274378da 100644 --- a/vulnerabilities/pipelines/v2_improvers/enhance_with_kev.py +++ b/vulnerabilities/pipelines/v2_improvers/enhance_with_kev.py @@ -78,7 +78,7 @@ def add_vulnerability_exploit(kev_vul, logger): for adv in alias.advisories.all(): advisories.add(adv) else: - advs = AdvisoryV2.objects.filter(advisory_id=cve_id) + advs = AdvisoryV2.objects.filter(advisory_id=cve_id).latest_per_avid() for adv in advs: advisories.add(adv) except AdvisoryAlias.DoesNotExist: diff --git a/vulnerabilities/pipelines/v2_improvers/enhance_with_metasploit.py b/vulnerabilities/pipelines/v2_improvers/enhance_with_metasploit.py index fbfea5150..3ce1ff7c9 100644 --- a/vulnerabilities/pipelines/v2_improvers/enhance_with_metasploit.py +++ b/vulnerabilities/pipelines/v2_improvers/enhance_with_metasploit.py @@ -83,7 +83,7 @@ def add_advisory_exploit(record, logger): for adv in alias.advisories.all(): advisories.add(adv) else: - advs = AdvisoryV2.objects.filter(advisory_id=ref) + advs = AdvisoryV2.objects.filter(advisory_id=ref).latest_per_avid() for adv in advs: advisories.add(adv) except AdvisoryAlias.DoesNotExist: diff --git a/vulnerabilities/pipelines/v2_improvers/relate_severities.py b/vulnerabilities/pipelines/v2_improvers/relate_severities.py index 97a86404b..9ce3e0a30 100644 --- a/vulnerabilities/pipelines/v2_improvers/relate_severities.py +++ b/vulnerabilities/pipelines/v2_improvers/relate_severities.py @@ -61,8 +61,8 @@ def relate_severities(self): severity_score_advisories = ( AdvisoryV2.objects.filter(datasource_id__in=self.pipelines) .filter(severities__scoring_system__in=self.SUPPORTED_SYSTEMS) - .distinct() .latest_per_avid() + .distinct() ) total = severity_score_advisories.count() @@ -70,14 +70,21 @@ def relate_severities(self): advisory_id_map = {} - qs = AdvisoryV2.objects.filter( - advisory_id__in=severity_score_advisories.values("advisory_id") - ).values("id", "advisory_id") - - alias_qs = AdvisoryV2.objects.filter( - aliases__alias__in=severity_score_advisories.values("advisory_id") - ).values("id", "aliases__alias") + qs = ( + AdvisoryV2.objects.filter( + advisory_id__in=severity_score_advisories.values("advisory_id") + ) + .latest_per_avid() + .values("id", "advisory_id") + ) + alias_qs = ( + AdvisoryV2.objects.filter( + aliases__alias__in=severity_score_advisories.values("advisory_id") + ) + .latest_per_avid() + .values("id", "aliases__alias") + ) for row in qs: advisory_id_map.setdefault(row["advisory_id"], set()).add(row["id"]) diff --git a/vulnerabilities/risk.py b/vulnerabilities/risk.py index 0628422bb..dd7401d80 100644 --- a/vulnerabilities/risk.py +++ b/vulnerabilities/risk.py @@ -8,6 +8,9 @@ # from urllib.parse import urlparse +from django.db.models import Max + +from vulnerabilities.models import AdvisoryV2 from vulnerabilities.models import VulnerabilityReference from vulnerabilities.severity_systems import EPSS from vulnerabilities.weight_config import WEIGHT_CONFIG @@ -123,12 +126,14 @@ def compute_package_risk_v2(package): Calculate the risk for a package by iterating over all vulnerabilities that affects this package and determining the associated risk. """ - result = [] - for impact in package.affected_in_impacts.all(): - if risk := impact.advisory.risk_score: - result.append(float(risk)) - if not result: + max_risk = ( + AdvisoryV2.objects.latest_affecting_advisories_for_purl(package.purl).aggregate( + max_risk=Max("risk_score") + ) + )["max_risk"] + + if max_risk is None: return - return round(max(result), 1) + return round(float(max_risk), 1) diff --git a/vulnerabilities/templates/affected_by_advisories.html b/vulnerabilities/templates/affected_by_advisories.html new file mode 100644 index 000000000..01721b84f --- /dev/null +++ b/vulnerabilities/templates/affected_by_advisories.html @@ -0,0 +1,116 @@ +{% extends "base.html" %} +{% load humanize %} +{% load widget_tweaks %} +{% load static %} +{% load url_filters %} +{% load utils %} + +{% block content %} +
+
+
+
+ {{ page_obj.paginator.count|intcomma }} results +
+ {% if is_paginated %} + {% include 'includes/pagination.html' with page_obj=page_obj %} + {% endif %} +
+
+
+ +
+
+ + + + + + + + + + + + + {% for advisory in page_obj %} + + + + + + + + {% empty %} + + + + {% endfor %} + +
AdvisorySourceDate PublishedSummaryFixed in package version
+ + {{advisory.avid }} + +
+ {% if advisory.alias|length != 0 %} + Aliases: + {% endif %} +
+ {% for alias in advisory.alias %} + {% if alias.url %} + {{ alias }} +
+ {% else %} + {{ alias }} +
+ {% endif %} + {% endfor %} + + {% if advisory.secondary|length != 0 %} +

Supporting advisories are listed below the primary advisory.

+ {% for secondary in advisory.secondary %} + + {{secondary.avid }} + + {% endfor %} + {% endif %} +
+ {{advisory.url}} + + {{advisory.date_published}} + + {{ advisory.summary }} + + {% with fixed=fixed_package_details|get_item:advisory.avid %} + {% if fixed %} + {% for item in fixed %} +
+ {{ item.pkg.version }} +
+ {% if item.pkg.is_vulnerable %} + + Vulnerable + + {% else %} + + Not vulnerable + + {% endif %} +
+ {% endfor %} + {% else %} + There are no reported fixed by versions. + {% endif %} + {% endwith %} +
+ This package is not known to be subject of any advisories. +
+
+ +{% if is_paginated %} + {% include 'includes/pagination.html' with page_obj=page_obj %} +{% endif %} +{% endblock %} +
diff --git a/vulnerabilities/templates/fixing_advisories.html b/vulnerabilities/templates/fixing_advisories.html new file mode 100644 index 000000000..64af4fc65 --- /dev/null +++ b/vulnerabilities/templates/fixing_advisories.html @@ -0,0 +1,76 @@ +{% extends "base.html" %} +{% load humanize %} +{% load widget_tweaks %} + +{% block content %} +
+
+
+
+ {{ page_obj.paginator.count|intcomma }} results +
+ {% if is_paginated %} + {% include 'includes/pagination.html' with page_obj=page_obj %} + {% endif %} +
+
+
+ +
+
+ + + + + + + + + + + + {% for advisory in page_obj %} + + + + + + + + {% empty %} + + + + {% endfor %} + +
AdvisorySourceDate PublishedSummaryAliases
+ + {{advisory.avid }} + + + {{advisory.url}} + + {{advisory.date_published}} + + {{ advisory.summary }} + + {% for alias in advisory.alias %} + {% if alias.url %} + {{ alias }} +
+ {% else %} + {{ alias }} +
+ {% endif %} + {% endfor %} +
+ This package is not known to fix any advisories. +
+
+ +{% if is_paginated %} + {% include 'includes/pagination.html' with page_obj=page_obj %} +{% endif %} +{% endblock %} +
diff --git a/vulnerabilities/templates/package_details_v2.html b/vulnerabilities/templates/package_details_v2.html index 9cc9ea343..f90585b9d 100644 --- a/vulnerabilities/templates/package_details_v2.html +++ b/vulnerabilities/templates/package_details_v2.html @@ -45,7 +45,7 @@
- {% if affected_by_advisories_v2|length != 0 %} + {% if affected_by_advisories_v2|length != 0 or affected_by_advisories_v2_url %}
{% else %}
@@ -82,7 +82,7 @@
- {% if affected_by_advisories_v2|length != 0 %} + {% if affected_by_advisories_v2|length != 0 or affected_by_advisories_v2_url %}
@@ -128,10 +128,10 @@ {% endif %}
+ {% if affected_by_advisories_v2|length != 0 %}
Vulnerabilities affecting this package ({{ affected_by_advisories_v2|length }})
-
@@ -192,9 +192,15 @@ {{ item.pkg.version }}
- - Subject of {{ item.affected_count }} other advisories. + {% if item.pkg.is_vulnerable %} + + Vulnerable + + {% else %} + + Not vulnerable + {% endif %} {% endfor %} {% else %} @@ -212,9 +218,20 @@ {% endfor %}
+ {% elif affected_by_advisories_v2_url %} +
+ This package is subject to more than 100 advisories. Please refer to the following + URL for vulnerabilities affecting this package: Advisories +
+ {% else %} +
+ This package is not known to be subject of any advisories. +
+ {% endif %}
+ {% if fixing_advisories_v2|length != 0 %}
Vulnerabilities fixed by this package ({{ fixing_advisories_v2|length }})
@@ -279,6 +296,16 @@
+ {% elif fixing_advisories_v2_url %} +
+ This package is known to fix more than 100 advisories. Please refer to the following + URL for vulnerabilities fixed by this package: Advisories +
+ {% else %} +
+ This package is not known to fix any advisories. +
+ {% endif %}
diff --git a/vulnerabilities/templates/packages_v2.html b/vulnerabilities/templates/packages_v2.html index fe2b05abe..752e0709c 100644 --- a/vulnerabilities/templates/packages_v2.html +++ b/vulnerabilities/templates/packages_v2.html @@ -41,14 +41,14 @@ - Affected by vulnerabilities + Vulnerable - Fixing vulnerabilities + Risk Score @@ -61,8 +61,8 @@ href="{{ package.get_absolute_url }}?search={{ search }}" target="_self">{{ package.purl }} - {{ package.vulnerability_count }} - {{ package.patched_vulnerability_count }} + {{ package.is_vulnerable|yesno:"Yes,No" }} + {{ package.risk_score }} {% empty %} diff --git a/vulnerabilities/tests/test_api_v2.py b/vulnerabilities/tests/test_api_v2.py index 6968123c7..c4abe3b97 100644 --- a/vulnerabilities/tests/test_api_v2.py +++ b/vulnerabilities/tests/test_api_v2.py @@ -834,74 +834,3 @@ def test_filter_codefix_by_advisory_id_not_found(self): response = self.client.get(self.url, {"advisory_id": "nonexistent/ADVISORY-ID"}) assert response.status_code == status.HTTP_200_OK assert response.data["count"] == 0 - - -class AdvisoriesPackageV2Tests(APITestCase): - def setUp(self): - from vulnerabilities.models import ImpactedPackage - - self.advisory = AdvisoryV2.objects.create( - datasource_id="ghsa", - advisory_id="GHSA-1234", - avid="ghsa/GHSA-1234", - unique_content_id="f" * 64, - url="https://example.com/advisory", - date_collected="2025-07-01T00:00:00Z", - ) - - self.package = PackageV2.objects.from_purl(purl="pkg:pypi/sample@1.0.0") - self.impact = ImpactedPackage.objects.create( - advisory=self.advisory, base_purl="pkg:pypi/sample" - ) - self.impact.affecting_packages.add(self.package) - - self.client = APIClient(enforce_csrf_checks=True) - - def test_list_with_purl_filter(self): - url = reverse("package-v3-list") - with self.assertNumQueries(31): - response = self.client.get(url, {"purl": "pkg:pypi/sample@1.0.0"}) - assert response.status_code == 200 - assert "packages" in response.data["results"] - assert "advisories_by_id" in response.data["results"] - assert self.advisory.avid in response.data["results"]["advisories_by_id"] - - def test_bulk_lookup(self): - url = reverse("package-v3-bulk-lookup") - with self.assertNumQueries(30): - response = self.client.post(url, {"purls": ["pkg:pypi/sample@1.0.0"]}, format="json") - assert response.status_code == 200 - assert "packages" in response.data - assert "advisories_by_id" in response.data - assert self.advisory.avid in response.data["advisories_by_id"] - - def test_bulk_search_plain(self): - url = reverse("package-v3-bulk-search") - payload = {"purls": ["pkg:pypi/sample@1.0.0"], "plain_purl": True, "purl_only": False} - with self.assertNumQueries(30): - response = self.client.post(url, payload, format="json") - assert response.status_code == 200 - assert "packages" in response.data - assert "advisories_by_id" in response.data - - def test_bulk_search_purl_only(self): - url = reverse("package-v3-bulk-search") - payload = {"purls": ["pkg:pypi/sample@1.0.0"], "plain_purl": False, "purl_only": True} - with self.assertNumQueries(17): - response = self.client.post(url, payload, format="json") - assert response.status_code == 200 - assert "pkg:pypi/sample@1.0.0" in response.data - - def test_lookup_single_package(self): - url = reverse("package-v3-lookup") - with self.assertNumQueries(23): - response = self.client.post(url, {"purl": "pkg:pypi/sample@1.0.0"}, format="json") - assert response.status_code == 200 - assert any(pkg["purl"] == "pkg:pypi/sample@1.0.0" for pkg in response.data) - - def test_get_all_vulnerable_purls(self): - url = reverse("package-v3-all") - with self.assertNumQueries(3): - response = self.client.get(url) - assert response.status_code == 200 - assert "pkg:pypi/sample@1.0.0" in response.data diff --git a/vulnerabilities/tests/test_api_v3.py b/vulnerabilities/tests/test_api_v3.py new file mode 100644 index 000000000..6d8b9519b --- /dev/null +++ b/vulnerabilities/tests/test_api_v3.py @@ -0,0 +1,252 @@ +from django.urls import reverse +from packageurl import PackageURL +from rest_framework import status +from rest_framework.test import APIClient +from rest_framework.test import APITestCase +from univers.version_range import PypiVersionRange + +from vulnerabilities.models import AdvisoryV2 +from vulnerabilities.models import PackageV2 +from vulnerabilities.pipes.advisory import insert_advisory_v2 + + +class APIV3TestCase(APITestCase): + def setUp(self): + from vulnerabilities.models import ImpactedPackage + + self.advisory = AdvisoryV2.objects.create( + datasource_id="ghsa", + advisory_id="GHSA-1234", + avid="ghsa/GHSA-1234", + unique_content_id="f" * 64, + url="https://example.com/advisory", + date_collected="2025-07-01T00:00:00Z", + ) + + self.package = PackageV2.objects.from_purl(purl="pkg:pypi/sample@1.0.0") + self.impact = ImpactedPackage.objects.create( + advisory=self.advisory, base_purl="pkg:pypi/sample" + ) + self.impact.affecting_packages.add(self.package) + + self.client = APIClient(enforce_csrf_checks=True) + + def test_packages_post_without_details(self): + url = reverse("package-v3-list") + + with self.assertNumQueries(4): + response = self.client.post( + url, + data={ + "purls": ["pkg:pypi/sample@1.0.0"], + "details": False, + }, + format="json", + ) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + + results = response.data["results"] + self.assertEqual(len(results), 1) + self.assertEqual(results[0], "pkg:pypi/sample@1.0.0") + + def test_packages_post_with_details(self): + url = reverse("package-v3-list") + + with self.assertNumQueries(21): + response = self.client.post( + url, + data={ + "purls": ["pkg:pypi/sample@1.0.0"], + "details": True, + }, + format="json", + ) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + + pkg = response.data["results"][0] + self.assertEqual(pkg["purl"], "pkg:pypi/sample@1.0.0") + + def test_advisories_post(self): + url = reverse("advisory-v3-list") + + with self.assertNumQueries(10): + response = self.client.post( + url, + data={"purls": ["pkg:pypi/sample@1.0.0"]}, + format="json", + ) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + + advisory = response.data["results"][0] + self.assertEqual(advisory["advisory_id"], "ghsa/GHSA-1234") + + def test_affected_by_advisories_list(self): + url = reverse("affected-by-advisories-list") + + with self.assertNumQueries(11): + response = self.client.get( + url, + {"purl": "pkg:pypi/sample@1.0.0"}, + ) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + + results = response.data["results"] + self.assertEqual(len(results), 1) + self.assertEqual(results[0]["advisory_id"], "ghsa/GHSA-1234") + + def test_fixing_advisories_list_empty(self): + url = reverse("fixing-advisories-list") + + with self.assertNumQueries(3): + response = self.client.get( + url, + {"purl": "pkg:pypi/sample@1.0.0"}, + ) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(len(response.data["results"]), 0) + + def test_packages_pagination(self): + url = reverse("package-v3-list") + + response = self.client.post( + url, + data={"purls": []}, + format="json", + ) + + self.assertEqual(response.status_code, 200) + + self.assertIn("count", response.data) + self.assertEqual(response.data["count"], 1) + self.assertIn("results", response.data) + self.assertIn("next", response.data) + + def test_packages_approximate(self): + url = reverse("package-v3-list") + + response = self.client.post( + url, + data={ + "purls": ["pkg:pypi/sample@1.0.0?foo=bar"], + "approximate": True, + "details": False, + }, + format="json", + ) + + self.assertEqual(response.status_code, 200) + self.assertGreaterEqual(len(response.data["results"]), 1) + self.assertIn("pkg:pypi/sample@1.0.0", response.data["results"]) + + +class APIV3TestCaseOnePackageMultipleAdvisories(APITestCase): + def setUp(self): + from vulnerabilities.importer import AdvisoryDataV2 + from vulnerabilities.importer import AffectedPackageV2 + + affected_packages = [] + affected_packages.append( + AffectedPackageV2( + package=PackageURL(type="pypi", name="sample"), + affected_version_range=PypiVersionRange.from_string("vers:pypi/=1.0.0"), + ) + ) + + for i in range(1, 102): + advisory = AdvisoryDataV2( + advisory_id=f"GHSA-1234{i}", + aliases=[f"CVE-2021-1234{i}"], + summary="Sample advisory", + affected_packages=affected_packages, + url="https://example.com/advisory", + original_advisory_text="Sample advisory text", + ) + + insert_advisory_v2(advisory, "ghsa_importer", print, 100) + + self.client = APIClient(enforce_csrf_checks=True) + + def test_packages_post_purl_with_many_advisories(self): + url = reverse("package-v3-list") + + with self.assertNumQueries(11): + response = self.client.post( + url, + data={ + "purls": ["pkg:pypi/sample@1.0.0"], + "details": True, + }, + format="json", + ) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + + results = response.data["results"] + self.assertEqual(len(results), 1) + self.assertIsNotNone(results[0]["affected_by_vulnerabilities_url"]) + + def test_advisories_post(self): + url = reverse("advisory-v3-list") + + with self.assertNumQueries(64): + response = self.client.post( + url, + data={"purls": ["pkg:pypi/sample@1.0.0"]}, + format="json", + ) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(len(response.data["results"]), 10) + advisory = response.data["results"][0] + self.assertEqual(advisory["advisory_id"], "ghsa_importer/GHSA-12341") + + +class APIV3TestCaseOneAdvisoryMultiplePackages(APITestCase): + def setUp(self): + from vulnerabilities.importer import AdvisoryDataV2 + from vulnerabilities.importer import AffectedPackageV2 + + affected_packages = [] + for i in range(1, 102): + affected_packages.append( + AffectedPackageV2( + package=PackageURL(type="pypi", name=f"sample{i}"), + affected_version_range=PypiVersionRange.from_string("vers:pypi/=1.0.0"), + ) + ) + + advisory = AdvisoryDataV2( + advisory_id=f"GHSA-1234{i}", + aliases=[f"CVE-2021-1234{i}"], + summary="Sample advisory", + affected_packages=affected_packages, + url="https://example.com/advisory", + original_advisory_text="Sample advisory text", + ) + + insert_advisory_v2(advisory, "ghsa_importer", print, 100) + + self.client = APIClient(enforce_csrf_checks=True) + + def test_get_all_vulnerable_purls(self): + url = reverse("package-v3-list") + + with self.assertNumQueries(4): + response = self.client.post( + url, + data={ + "purls": [], + }, + format="json", + ) + + self.assertEqual(response.status_code, status.HTTP_200_OK) + + results = response.data["results"] + self.assertEqual(len(results), 10) + self.assertIn("next", response.data) diff --git a/vulnerabilities/views.py b/vulnerabilities/views.py index 860bde8eb..cf9b4ccf0 100644 --- a/vulnerabilities/views.py +++ b/vulnerabilities/views.py @@ -7,6 +7,7 @@ # See https://aboutcode.org for more information about nexB OSS projects. # import logging +from collections import defaultdict from cvss.exceptions import CVSS2MalformedError from cvss.exceptions import CVSS3MalformedError @@ -15,8 +16,8 @@ from django.contrib.auth.views import LoginView from django.core.exceptions import ValidationError from django.core.mail import send_mail -from django.db.models import Count -from django.db.models import F +from django.db.models import Exists +from django.db.models import OuterRef from django.db.models import Prefetch from django.http.response import Http404 from django.shortcuts import get_object_or_404 @@ -77,34 +78,6 @@ def get_queryset(self, query=None): ) -class PackageSearchV2(ListView): - model = models.PackageV2 - template_name = "packages_v2.html" - ordering = ["type", "namespace", "name", "version"] - paginate_by = PAGE_SIZE - - def get_context_data(self, **kwargs): - context = super().get_context_data(**kwargs) - request_query = self.request.GET - context["package_search_form"] = PackageSearchForm(request_query) - context["search"] = request_query.get("search") - return context - - def get_queryset(self, query=None): - """ - Return a Package queryset for the ``query``. - Make a best effort approach to find matching packages either based - on exact purl, partial purl or just name and namespace. - """ - query = query or self.request.GET.get("search") or "" - return ( - self.model.objects.search(query) - .with_vulnerability_counts() - .prefetch_related() - .order_by("package_url") - ) - - class VulnerabilitySearch(ListView): model = models.Vulnerability template_name = "vulnerabilities.html" @@ -123,24 +96,6 @@ def get_queryset(self, query=None): return self.model.objects.search(query=query).with_package_counts() -class AdvisorySearch(ListView): - model = models.AdvisoryV2 - template_name = "vulnerabilities.html" - ordering = ["advisory_id"] - paginate_by = PAGE_SIZE - - def get_context_data(self, **kwargs): - context = super().get_context_data(**kwargs) - request_query = self.request.GET - context["advisory_search_form"] = VulnerabilitySearchForm(request_query) - context["search"] = request_query.get("search") - return context - - def get_queryset(self, query=None): - query = query or self.request.GET.get("search") or "" - return self.model.objects.search(query=query).with_package_counts() - - class PackageDetails(DetailView): model = models.Package template_name = "package_details.html" @@ -182,95 +137,130 @@ def get_object(self, queryset=None): return package -class PackageV2Details(DetailView): +class PackageSearchV2(ListView): model = models.PackageV2 - template_name = "package_details_v2.html" - slug_url_kwarg = "purl" - slug_field = "purl" + template_name = "packages_v2.html" + ordering = ["type", "namespace", "name", "version"] + paginate_by = PAGE_SIZE def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) - package = self.object - next_non_vulnerable, latest_non_vulnerable = package.get_non_vulnerable_versions() - - ( - fixed_pkg_details, - affected_by_advisories, - fixing_advisories, - ) = self.get_fixed_package_details(package) + request_query = self.request.GET + context["package_search_form"] = PackageSearchForm(request_query) + context["search"] = request_query.get("search") + return context - affected_avid_by_hash = {} - fixing_avid_by_hash = {} + def get_queryset(self, query=None): + """ + Return a Package queryset for the ``query``. + Make a best effort approach to find matching packages either based + on exact purl, partial purl or just name and namespace. + """ + query = query or self.request.GET.get("search") or "" + return ( + self.model.objects.search(query) + .prefetch_related() + .order_by("package_url") + .with_is_vulnerable() + ) - affected_avid_by_hash = group_advisories_by_content(affected_by_advisories) - fixing_avid_by_hash = group_advisories_by_content(fixing_advisories) - affecting_advs = [] +class AffectedByAdvisoriesListView(ListView): + model = models.AdvisoryV2 + template_name = "affected_by_advisories.html" + paginate_by = PAGE_SIZE - for hash in affected_avid_by_hash: - affecting_advs.append(affected_avid_by_hash[hash]) + def get_context_data(self, **kwargs): + context = super().get_context_data(**kwargs) + purl = self.kwargs.get("purl") + package = models.PackageV2.objects.for_purl(purl).first() + context["fixed_package_details"] = get_fixed_package_details(package) + return context - fixing_advs = [] + def get_queryset(self): + purl = self.kwargs.get("purl") + print(purl) + return models.AdvisoryV2.objects.latest_affecting_advisories_for_purl(purl) - for hash in fixing_avid_by_hash: - fixing_advs.append(fixing_avid_by_hash[hash]) - context["package"] = package - context["next_non_vulnerable"] = next_non_vulnerable - context["latest_non_vulnerable"] = latest_non_vulnerable - context["affected_by_advisories_v2"] = affecting_advs - context["fixing_advisories_v2"] = fixing_advs +class FixingAdvisoriesListView(ListView): + model = models.AdvisoryV2 + template_name = "fixing_advisories.html" + paginate_by = PAGE_SIZE - context["package_search_form"] = PackageSearchForm(self.request.GET) - context["fixed_package_details"] = fixed_pkg_details + def get_queryset(self): + purl = self.kwargs.get("purl") + return models.AdvisoryV2.objects.latest_fixed_by_advisories_for_purl(purl) - return context - def get_fixed_package_details(self, package): - affected_impacts = package.affected_in_impacts.select_related("advisory").prefetch_related( - Prefetch( - "fixed_by_packages", - queryset=( - models.PackageV2.objects.annotate(affected_count=Count("affected_in_impacts")) - ), - ) - ) +class PackageV2Details(DetailView): + model = models.PackageV2 + template_name = "package_details_v2.html" + slug_url_kwarg = "purl" + slug_field = "purl" - fixed_impacts = package.fixed_in_impacts.select_related("advisory") + def get_context_data(self, **kwargs): + context = super().get_context_data(**kwargs) + package = self.object - affected_avids = {impact.advisory.avid for impact in affected_impacts if impact.advisory_id} + next_non_vulnerable, latest_non_vulnerable = package.get_non_vulnerable_versions() - fixed_avids = {impact.advisory.avid for impact in fixed_impacts if impact.advisory_id} + context["package"] = package + context["next_non_vulnerable"] = next_non_vulnerable + context["latest_non_vulnerable"] = latest_non_vulnerable + context["package_search_form"] = PackageSearchForm(self.request.GET) - all_avids = affected_avids | fixed_avids + affected_by_advisories_qs = models.AdvisoryV2.objects.latest_affecting_advisories_for_purl( + package.package_url + ) - advisories = models.AdvisoryV2.objects.latest_for_avids(all_avids) - advisory_by_avid = {adv.avid: adv for adv in advisories} + fixing_advisories_qs = models.AdvisoryV2.objects.latest_fixed_by_advisories_for_purl( + package.package_url + ) - fixed_pkg_details = {} + affected_by_advisories_url = None + fixing_advisories_url = None - for impact in affected_impacts: - advisory = advisory_by_avid.get(impact.advisory.avid) - if not advisory: - continue + affected_by_advisories = list(affected_by_advisories_qs[:101]) + if len(affected_by_advisories) > 100: + affected_by_advisories_url = reverse_lazy( + "affected_by_advisories_v2", kwargs={"purl": package.package_url} + ) + context["affected_by_advisories_v2_url"] = affected_by_advisories_url + context["affected_by_advisories_v2"] = [] + context["fixed_package_details"] = {} - fixed_pkg_details.setdefault(impact.advisory.avid, []).extend( - { - "pkg": pkg, - "affected_count": pkg.affected_count, - } - for pkg in impact.fixed_by_packages.all() + else: + fixed_pkg_details = get_fixed_package_details(package) + affected_avid_by_hash = {} + affected_avid_by_hash = group_advisories_by_content(affected_by_advisories_qs) + affecting_advs = [] + + for hash in affected_avid_by_hash: + affecting_advs.append(affected_avid_by_hash[hash]) + context["affected_by_advisories_v2"] = affecting_advs + context["fixed_package_details"] = fixed_pkg_details + context["affected_by_advisories_v2_url"] = None + + fixing_advisories = list(fixing_advisories_qs[:101]) + if len(fixing_advisories) > 100: + fixing_advisories_url = reverse_lazy( + "fixing_advisories_v2", kwargs={"purl": package.package_url} ) + context["fixing_advisories_v2_url"] = fixing_advisories_url + context["fixing_advisories_v2"] = [] - affected_by_advisories = { - advisory_by_avid[avid] for avid in affected_avids if avid in advisory_by_avid - } + else: + fixing_avid_by_hash = {} + fixing_avid_by_hash = group_advisories_by_content(fixing_advisories_qs) + fixing_advs = [] - fixing_advisories = { - advisory_by_avid[avid] for avid in fixed_avids if avid in advisory_by_avid - } + for hash in fixing_avid_by_hash: + fixing_advs.append(fixing_avid_by_hash[hash]) + context["fixing_advisories_v2"] = fixing_advs + context["fixing_advisories_v2_url"] = None - return fixed_pkg_details, affected_by_advisories, fixing_advisories + return context def get_queryset(self): return ( @@ -308,6 +298,43 @@ def get_object(self, queryset=None): return package +def get_fixed_package_details(package): + rows = package.affected_in_impacts.values_list( + "advisory__avid", + "fixed_by_packages", + ) + + pkg_ids = {pkg_id for _, pkg_id in rows if pkg_id} + + pkg_map = { + p.id: p + for p in models.PackageV2.objects.filter(id__in=pkg_ids).annotate( + is_vulnerable=Exists( + models.ImpactedPackage.objects.filter(affecting_packages=OuterRef("pk")) + ) + ) + } + + fixed_pkg_details = defaultdict(list) + + for avid, pkg_id in rows: + if not pkg_id: + continue + + pkg = pkg_map.get(pkg_id) + if not pkg: + continue + + fixed_pkg_details[avid].append( + { + "pkg": pkg, + "is_vulnerable": pkg.is_vulnerable, + } + ) + + return fixed_pkg_details + + class VulnerabilityDetails(DetailView): model = models.Vulnerability template_name = "vulnerability_details.html" diff --git a/vulnerablecode/urls.py b/vulnerablecode/urls.py index 49948a3b9..eb1bc006b 100644 --- a/vulnerablecode/urls.py +++ b/vulnerablecode/urls.py @@ -23,13 +23,18 @@ from vulnerabilities.api_v2 import CodeFixV2ViewSet from vulnerabilities.api_v2 import CodeFixViewSet from vulnerabilities.api_v2 import PackageV2ViewSet -from vulnerabilities.api_v2 import PackageV3ViewSet from vulnerabilities.api_v2 import PipelineScheduleV2ViewSet from vulnerabilities.api_v2 import VulnerabilityV2ViewSet +from vulnerabilities.api_v3 import AdvisoryV3ViewSet +from vulnerabilities.api_v3 import AffectedByAdvisoriesViewSet +from vulnerabilities.api_v3 import FixingAdvisoriesViewSet +from vulnerabilities.api_v3 import PackageV3ViewSet from vulnerabilities.views import AdminLoginView from vulnerabilities.views import AdvisoryDetails from vulnerabilities.views import AdvisoryPackagesDetails +from vulnerabilities.views import AffectedByAdvisoriesListView from vulnerabilities.views import ApiUserCreateView +from vulnerabilities.views import FixingAdvisoriesListView from vulnerabilities.views import HomePage from vulnerabilities.views import HomePageV2 from vulnerabilities.views import PackageDetails @@ -70,6 +75,11 @@ def __init__(self, *args, **kwargs): api_v3_router = OptionalSlashRouter() api_v3_router.register("packages", PackageV3ViewSet, basename="package-v3") +api_v3_router.register("advisories", AdvisoryV3ViewSet, basename="advisory-v3") +api_v3_router.register( + "affected-by-advisories", AffectedByAdvisoriesViewSet, basename="affected-by-advisories" +) +api_v3_router.register("fixing-advisories", FixingAdvisoriesViewSet, basename="fixing-advisories") urlpatterns = [ path("admin/login/", AdminLoginView.as_view(), name="admin-login"), @@ -134,6 +144,16 @@ def __init__(self, *args, **kwargs): PackageV2Details.as_view(), name="package_details_v2", ), + re_path( + r"^fixing-advisories/v2/(?Ppkg:.+)$", + FixingAdvisoriesListView.as_view(), + name="fixing_advisories_v2", + ), + re_path( + r"^affected-by-advisories/v2/(?Ppkg:.+)$", + AffectedByAdvisoriesListView.as_view(), + name="affected_by_advisories_v2", + ), path( "vulnerabilities/search/", VulnerabilitySearch.as_view(), @@ -174,10 +194,10 @@ def __init__(self, *args, **kwargs): TemplateView.as_view(template_name="tos.html"), name="api_tos", ), - path( - "admin/", - admin.site.urls, - ), + # path( + # "admin/", + # admin.site.urls, + # ), ] if DEBUG: