diff --git a/dojo/finding/deduplication.py b/dojo/finding/deduplication.py index 99d416f44f0..900476926a8 100644 --- a/dojo/finding/deduplication.py +++ b/dojo/finding/deduplication.py @@ -217,14 +217,17 @@ def is_deduplication_on_engagement_mismatch(new_finding, to_duplicate_finding): return False -def get_endpoints_as_url(finding): - # Fix for https://github.com/DefectDojo/django-DefectDojo/issues/10215 - # When endpoints lack a protocol (scheme), str(e) returns a string like "10.20.197.218:6379" - # without the "//" prefix. hyperlink.parse() then misinterprets the hostname as the scheme. - # We replicate the behavior from dojo/endpoint/utils.py line 265: prepend "//" if "://" is missing - # to ensure hyperlink.parse() correctly identifies host, port, and path components. +def get_endpoints_as_url(endpoints): + """ + Convert a list of Endpoint objects to parsed hyperlink URLs. + + Fix for https://github.com/DefectDojo/django-DefectDojo/issues/10215 + When endpoints lack a protocol (scheme), str(e) returns a string like "10.20.197.218:6379" + without the "//" prefix. hyperlink.parse() then misinterprets the hostname as the scheme. + We prepend "//" if "://" is missing to ensure correct parsing. + """ urls = [] - for e in finding.endpoints.all(): + for e in endpoints: endpoint_str = str(e) if "://" not in endpoint_str: endpoint_str = "//" + endpoint_str @@ -242,8 +245,9 @@ def are_urls_equal(url1, url2, fields): return True -def finding_locations(finding): - return [ref.location.url for ref in finding.locations.all()] +def finding_locations(location_refs): + """Extract URLs from a list of location references.""" + return [ref.location.url for ref in location_refs] def are_location_urls_equal(url1, url2, fields): @@ -266,8 +270,11 @@ def are_locations_duplicates(new_finding, to_duplicate_finding): return True if settings.V3_FEATURE_LOCATIONS: - list1 = finding_locations(new_finding) - list2 = finding_locations(to_duplicate_finding) + # Use unsaved_locations for unsaved findings (preview mode), saved M2M otherwise + locs1 = new_finding.locations.all() if new_finding.pk else getattr(new_finding, "unsaved_locations", []) + locs2 = to_duplicate_finding.locations.all() if to_duplicate_finding.pk else getattr(to_duplicate_finding, "unsaved_locations", []) + list1 = finding_locations(locs1) + list2 = finding_locations(locs2) deduplicationLogger.debug( f"Starting deduplication by location fields for finding {new_finding.id} with locations {list1} and finding {to_duplicate_finding.id} with locations {list2}", @@ -284,8 +291,11 @@ def are_locations_duplicates(new_finding, to_duplicate_finding): deduplicationLogger.debug(f"locations are not duplicates: {new_finding.id} and {to_duplicate_finding.id}") return False # TODO: Delete this after the move to Locations - list1 = get_endpoints_as_url(new_finding) - list2 = get_endpoints_as_url(to_duplicate_finding) + # Use unsaved_endpoints for unsaved findings (preview mode), saved M2M otherwise + eps1 = new_finding.endpoints.all() if new_finding.pk else getattr(new_finding, "unsaved_endpoints", []) + eps2 = to_duplicate_finding.endpoints.all() if to_duplicate_finding.pk else getattr(to_duplicate_finding, "unsaved_endpoints", []) + list1 = get_endpoints_as_url(eps1) + list2 = get_endpoints_as_url(eps2) deduplicationLogger.debug( f"Starting deduplication by endpoint fields for finding {new_finding.id} with urls {list1} and finding {to_duplicate_finding.id} with urls {list2}", @@ -535,6 +545,9 @@ def find_candidates_for_reimport_legacy(test, findings, service=None): def _is_candidate_older(new_finding, candidate): + # Unsaved findings (e.g. preview mode) have no PK — all DB candidates are older by definition + if new_finding.pk is None: + return True # Ensure the newer finding is marked as duplicate of the older finding is_older = candidate.id < new_finding.id if not is_older: @@ -715,7 +728,116 @@ def _flush_duplicate_changes(modified_new_findings): return modified_new_findings +# --------------------------------------------------------------------------- +# Match-only functions (read-only, no DB writes) +# These return [(new_finding, matched_candidate), ...] without persisting. +# Used by both the regular dedup pipeline and the Pro import/reimport preview engine. +# --------------------------------------------------------------------------- + + +def match_batch_hash_code(findings): + """Find dedup matches by hash_code without persisting. Returns [(finding, candidate), ...].""" + if not findings: + return [] + test = findings[0].test + candidates_by_hash = find_candidates_for_deduplication_hash(test, findings) + if not candidates_by_hash: + return [] + matches = [] + for new_finding in findings: + for match in get_matches_from_hash_candidates(new_finding, candidates_by_hash): + matches.append((new_finding, match)) + break + return matches + + +def match_batch_unique_id(findings): + """Find dedup matches by unique_id_from_tool without persisting. Returns [(finding, candidate), ...].""" + if not findings: + return [] + test = findings[0].test + candidates_by_uid = find_candidates_for_deduplication_unique_id(test, findings) + if not candidates_by_uid: + return [] + matches = [] + for new_finding in findings: + for match in get_matches_from_unique_id_candidates(new_finding, candidates_by_uid): + matches.append((new_finding, match)) + break + return matches + + +def match_batch_uid_or_hash(findings): + """Find dedup matches by uid or hash_code without persisting. Returns [(finding, candidate), ...].""" + if not findings: + return [] + test = findings[0].test + candidates_by_uid, existing_by_hash = find_candidates_for_deduplication_uid_or_hash(test, findings) + if not (candidates_by_uid or existing_by_hash): + return [] + matches = [] + for new_finding in findings: + if new_finding.duplicate: + continue + for match in get_matches_from_uid_or_hash_candidates(new_finding, candidates_by_uid, existing_by_hash): + matches.append((new_finding, match)) + break + return matches + + +def match_batch_legacy(findings): + """Find dedup matches by legacy algorithm without persisting. Returns [(finding, candidate), ...].""" + if not findings: + return [] + test = findings[0].test + candidates_by_title, candidates_by_cwe = find_candidates_for_deduplication_legacy(test, findings) + if not (candidates_by_title or candidates_by_cwe): + return [] + matches = [] + for new_finding in findings: + for match in get_matches_from_legacy_candidates(new_finding, candidates_by_title, candidates_by_cwe): + matches.append((new_finding, match)) + break + return matches + + +def match_batch_of_findings(findings): + """ + Batch match findings against existing candidates without persisting. + + Returns list of (new_finding, matched_candidate) tuples. + Works with both saved and unsaved findings. + """ + if not findings: + return [] + enabled = System_Settings.objects.get().enable_deduplication + if not enabled: + return [] + # Only sort by id for saved findings; unsaved findings have no id + if findings[0].pk is not None: + findings = sorted(findings, key=attrgetter("id")) + test = findings[0].test + dedup_alg = test.deduplication_algorithm + if dedup_alg == settings.DEDUPE_ALGO_HASH_CODE: + return match_batch_hash_code(findings) + if dedup_alg == settings.DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL: + return match_batch_unique_id(findings) + if dedup_alg == settings.DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL_OR_HASH_CODE: + return match_batch_uid_or_hash(findings) + return match_batch_legacy(findings) + + +# --------------------------------------------------------------------------- +# Batch dedup functions (match + persist) +# These call the match-only functions above and then persist the results. +# --------------------------------------------------------------------------- + + def _dedupe_batch_hash_code(findings): + # NOTE: These functions intentionally interleave matching and set_duplicate() + # rather than calling the match_batch_*() functions above. This is because + # set_duplicate() modifies finding.duplicate in-memory, which affects the + # duplicate check in subsequent loop iterations (especially for uid_or_hash). if not findings: return [] test = findings[0].test diff --git a/dojo/importers/default_importer.py b/dojo/importers/default_importer.py index a57b6884152..103bb2f48f1 100644 --- a/dojo/importers/default_importer.py +++ b/dojo/importers/default_importer.py @@ -308,35 +308,12 @@ def process_findings( return new_findings - def close_old_findings( - self, - findings: list[Finding], - **kwargs: dict, - ) -> list[Finding]: + def get_close_old_findings_queryset(self, new_hash_codes, new_unique_ids_from_tool): """ - Closes old findings based on a hash code match at either the product - or the engagement scope. Closing an old finding entails setting the - finding to mitigated status, setting all location statuses to mitigated, - as well as leaving a not on the finding indicating that it was mitigated - because the vulnerability is no longer present in the submitted scan report. - """ - # First check if close old findings is desired - if not self.close_old_findings_toggle: - return [] + Build queryset of findings that would be closed, without closing them. - logger.debug("IMPORT_SCAN: Closing findings no longer present in scan report") - # Remove all the findings that are coming from the report already mitigated - new_hash_codes = [] - new_unique_ids_from_tool = [] - for finding in findings.values(): - # Do not process closed findings in the report - if finding.get("is_mitigated", False): - continue - # Grab the hash code - if (hash_code := finding.get("hash_code")) is not None: - new_hash_codes.append(hash_code) - if (unique_id_from_tool := finding.get("unique_id_from_tool")) is not None: - new_unique_ids_from_tool.append(unique_id_from_tool) + Reusable by preview engines to count findings that would be closed. + """ # Get the initial filtered list of old findings to be closed without # considering the scope of the product or engagement # Include both active findings and risk-accepted findings (which have active=False) @@ -373,6 +350,38 @@ def close_old_findings( old_findings = old_findings.filter(service=self.service) else: old_findings = old_findings.filter(Q(service__isnull=True) | Q(service__exact="")) + return old_findings + + def close_old_findings( + self, + findings: list[Finding], + **kwargs: dict, + ) -> list[Finding]: + """ + Closes old findings based on a hash code match at either the product + or the engagement scope. Closing an old finding entails setting the + finding to mitigated status, setting all location statuses to mitigated, + as well as leaving a not on the finding indicating that it was mitigated + because the vulnerability is no longer present in the submitted scan report. + """ + # First check if close old findings is desired + if not self.close_old_findings_toggle: + return [] + + logger.debug("IMPORT_SCAN: Closing findings no longer present in scan report") + # Remove all the findings that are coming from the report already mitigated + new_hash_codes = [] + new_unique_ids_from_tool = [] + for finding in findings.values(): + # Do not process closed findings in the report + if finding.get("is_mitigated", False): + continue + # Grab the hash code + if (hash_code := finding.get("hash_code")) is not None: + new_hash_codes.append(hash_code) + if (unique_id_from_tool := finding.get("unique_id_from_tool")) is not None: + new_unique_ids_from_tool.append(unique_id_from_tool) + old_findings = self.get_close_old_findings_queryset(new_hash_codes, new_unique_ids_from_tool) # Update the status of the findings and any locations for old_finding in old_findings: url = str(get_full_url(reverse("view_test", args=(self.test.id,)))) diff --git a/dojo/tools/anchore_grype/parser.py b/dojo/tools/anchore_grype/parser.py index a2c7850fb25..9c823632a06 100644 --- a/dojo/tools/anchore_grype/parser.py +++ b/dojo/tools/anchore_grype/parser.py @@ -76,7 +76,8 @@ def get_findings(self, file, test): rel_epss = related_vulnerability.get("epss") rel_vuln_id = related_vulnerability.get("id") vulnerability_ids = self.get_vulnerability_ids( - vuln_id, related_vulnerabilities, + vuln_id, + related_vulnerabilities, ) matches = item["matchDetails"] @@ -87,11 +88,7 @@ def get_findings(self, file, test): artifact_purl = artifact.get("purl") artifact_location = artifact.get("locations") file_path = None - if ( - artifact_location - and len(artifact_location) > 0 - and artifact_location[0].get("path") - ): + if artifact_location and len(artifact_location) > 0 and artifact_location[0].get("path"): file_path = artifact_location[0].get("path") finding_title = f"{vuln_id} in {artifact_name}:{artifact_version}" @@ -99,25 +96,17 @@ def get_findings(self, file, test): finding_tags = None finding_description = "" if vuln_namespace: - finding_description += ( - f"**Vulnerability Namespace:** {vuln_namespace}" - ) + finding_description += f"**Vulnerability Namespace:** {vuln_namespace}" if vuln_description: - finding_description += ( - f"\n**Vulnerability Description:** {vuln_description}" - ) + finding_description += f"\n**Vulnerability Description:** {vuln_description}" if rel_description and rel_description != vuln_description: finding_description += f"\n**Related Vulnerability Description:** {rel_description}" if matches: if isinstance(item["matchDetails"], dict): - finding_description += ( - f"\n**Matcher:** {matches['matcher']}" - ) + finding_description += f"\n**Matcher:** {matches['matcher']}" finding_tags = [matches["matcher"].replace("-matcher", "")] elif len(matches) == 1: - finding_description += ( - f"\n**Matcher:** {matches[0]['matcher']}" - ) + finding_description += f"\n**Matcher:** {matches[0]['matcher']}" finding_tags = [ matches[0]["matcher"].replace("-matcher", ""), ] @@ -148,30 +137,22 @@ def get_findings(self, file, test): finding_references = "" if vuln_datasource: - finding_references += ( - f"**Vulnerability Datasource:** {vuln_datasource}\n" - ) + finding_references += f"**Vulnerability Datasource:** {vuln_datasource}\n" if vuln_urls: if len(vuln_urls) == 1: if vuln_urls[0] != vuln_datasource: - finding_references += ( - f"**Vulnerability URL:** {vuln_urls[0]}\n" - ) + finding_references += f"**Vulnerability URL:** {vuln_urls[0]}\n" else: finding_references += "**Vulnerability URLs:**\n" for url in vuln_urls: if url != vuln_datasource: finding_references += f"- {url}\n" if rel_datasource: - finding_references += ( - f"**Related Vulnerability Datasource:** {rel_datasource}\n" - ) + finding_references += f"**Related Vulnerability Datasource:** {rel_datasource}\n" if rel_urls: if len(rel_urls) == 1: if rel_urls[0] != vuln_datasource: - finding_references += ( - f"**Related Vulnerability URL:** {rel_urls[0]}\n" - ) + finding_references += f"**Related Vulnerability URL:** {rel_urls[0]}\n" else: finding_references += "**Related Vulnerability URLs:**\n" for url in rel_urls: @@ -215,7 +196,6 @@ def get_findings(self, file, test): component_name=artifact_name, component_version=artifact_version.replace("\x00", ""), vuln_id_from_tool=vuln_id, - tags=finding_tags, static_finding=True, dynamic_finding=False, nb_occurences=1, @@ -223,6 +203,7 @@ def get_findings(self, file, test): fix_available=fix_available, fix_version=fix_version, ) + dupes[dupe_key].unsaved_tags = finding_tags if self.mode == "detailed": dupes[dupe_key].unique_id_from_tool = dupe_key dupes[dupe_key].unsaved_vulnerability_ids = vulnerability_ids @@ -244,7 +225,8 @@ def get_cvss(self, cvss): vector = cvss_item["vector"] cvss_objects = cvss_parser.parse_cvss_from_text(vector) if len(cvss_objects) > 0 and isinstance( - cvss_objects[0], CVSS3, + cvss_objects[0], + CVSS3, ): return vector return None @@ -274,8 +256,11 @@ def get_vulnerability_ids(self, vuln_id, related_vulnerabilities): if vuln_id: vulnerability_ids.append(vuln_id) if related_vulnerabilities: - vulnerability_ids.extend(related_vulnerability_id for related_vulnerability in related_vulnerabilities - if (related_vulnerability_id := related_vulnerability.get("id"))) + vulnerability_ids.extend( + related_vulnerability_id + for related_vulnerability in related_vulnerabilities + if (related_vulnerability_id := related_vulnerability.get("id")) + ) if vulnerability_ids: return vulnerability_ids return None diff --git a/dojo/tools/cargo_audit/parser.py b/dojo/tools/cargo_audit/parser.py index 49379081793..cd84b8100dc 100644 --- a/dojo/tools/cargo_audit/parser.py +++ b/dojo/tools/cargo_audit/parser.py @@ -80,24 +80,13 @@ def get_findings(self, filename, test): vuln_id = advisory.get("id") vulnerability_ids = [advisory.get("id")] categories = f"**Categories:** {', '.join(advisory['categories'])}" if "categories" in advisory else "" - description = ( - categories - + f"\n**Description:** `{advisory.get('description')}`" - ) + description = categories + f"\n**Description:** `{advisory.get('description')}`" - if ( - item["affected"] is not None - and "functions" in item["affected"] - ): + if item["affected"] is not None and "functions" in item["affected"]: affected_func = [ - f'{func}: {", ".join(versions)}' - for func, versions in item["affected"][ - "functions" - ].items() + f"{func}: {', '.join(versions)}" for func, versions in item["affected"]["functions"].items() ] - description += ( - f"\n**Affected functions**: {', '.join(affected_func)}" - ) + description += f"\n**Affected functions**: {', '.join(affected_func)}" references = f"{advisory.get('url')}\n" + "\n".join( advisory["references"], @@ -130,7 +119,6 @@ def get_findings(self, filename, test): title=title, test=test, severity=severity, - tags=tags, description=description, component_name=package_name, component_version=package_version, @@ -140,6 +128,7 @@ def get_findings(self, filename, test): references=references, mitigation=mitigation, ) + finding.unsaved_tags = tags finding.unsaved_vulnerability_ids = vulnerability_ids if settings.V3_FEATURE_LOCATIONS and package_name: finding.unsaved_locations.append( diff --git a/dojo/tools/dependency_check/parser.py b/dojo/tools/dependency_check/parser.py index 2d45f998161..4c472e2f4c4 100644 --- a/dojo/tools/dependency_check/parser.py +++ b/dojo/tools/dependency_check/parser.py @@ -87,7 +87,10 @@ def add_finding(self, finding, dupes): dupes[key] = finding def get_filename_and_path_from_dependency( - self, dependency, related_dependency, namespace, + self, + dependency, + related_dependency, + namespace, ): if related_dependency is None: return dependency.findtext( @@ -104,7 +107,10 @@ def get_filename_and_path_from_dependency( return None, None def get_component_name_and_version_from_dependency( - self, dependency, related_dependency, namespace, + self, + dependency, + related_dependency, + namespace, ): identifiers_node = dependency.find(namespace + "identifiers") if identifiers_node is not None: @@ -116,20 +122,13 @@ def get_component_name_and_version_from_dependency( purl_parts = purl.to_dict() component_name = ( purl_parts["namespace"] + ":" - if purl_parts["namespace"] - and len(purl_parts["namespace"]) > 0 - else "" - ) - component_name += ( - purl_parts["name"] - if purl_parts["name"] and len(purl_parts["name"]) > 0 + if purl_parts["namespace"] and len(purl_parts["namespace"]) > 0 else "" ) + component_name += purl_parts["name"] if purl_parts["name"] and len(purl_parts["name"]) > 0 else "" component_name = component_name or None component_version = ( - purl_parts["version"] - if purl_parts["version"] and len(purl_parts["version"]) > 0 - else "" + purl_parts["version"] if purl_parts["version"] and len(purl_parts["version"]) > 0 else "" ) return component_name, component_version, pck_id @@ -149,20 +148,10 @@ def get_component_name_and_version_from_dependency( if cpe_node: cpe_id = cpe_node.findtext(f"{namespace}name") cpe = CPE(cpe_id) - component_name = ( - cpe.get_vendor()[0] + ":" - if len(cpe.get_vendor()) > 0 - else "" - ) - component_name += ( - cpe.get_product()[0] if len(cpe.get_product()) > 0 else "" - ) + component_name = cpe.get_vendor()[0] + ":" if len(cpe.get_vendor()) > 0 else "" + component_name += cpe.get_product()[0] if len(cpe.get_product()) > 0 else "" component_name = component_name or None - component_version = ( - cpe.get_version()[0] - if len(cpe.get_version()) > 0 - else None - ) + component_version = cpe.get_version()[0] if len(cpe.get_version()) > 0 else None return component_name, component_version, None maven_node = identifiers_node.find( @@ -251,7 +240,8 @@ def get_severity_and_cvss_meta(self, vulnerability, namespace) -> dict: if severity: if severity.strip().lower() not in self.SEVERITY_MAPPING: logger.warning( - "Warning: Unknow severity value detected '%s'. Bypass to 'Medium' value", severity, + "Warning: Unknow severity value detected '%s'. Bypass to 'Medium' value", + severity, ) severity = "Medium" else: @@ -266,13 +256,20 @@ def get_severity_and_cvss_meta(self, vulnerability, namespace) -> dict: } def get_finding_from_vulnerability( - self, dependency, related_dependency, vulnerability, test, namespace, + self, + dependency, + related_dependency, + vulnerability, + test, + namespace, ): ( dependency_filename, dependency_filepath, ) = self.get_filename_and_path_from_dependency( - dependency, related_dependency, namespace, + dependency, + related_dependency, + namespace, ) # logger.debug('dependency_filename: %s', dependency_filename) @@ -318,13 +315,17 @@ def get_finding_from_vulnerability( component_version, component_purl, ) = self.get_component_name_and_version_from_dependency( - dependency, related_dependency, namespace, + dependency, + related_dependency, + namespace, ) stripped_name = name # startswith CVE-XXX-YYY stripped_name = re.sub( - r"^CVE-\d{4}-\d{4,7}", "", stripped_name, + r"^CVE-\d{4}-\d{4,7}", + "", + stripped_name, ).strip() # startswith CWE-XXX: stripped_name = re.sub(r"^CWE-\d+\:", "", stripped_name).strip() @@ -333,7 +334,8 @@ def get_finding_from_vulnerability( if component_name is None: logger.warning( - "component_name was None for File: %s, using dependency file name instead.", dependency_filename, + "component_name was None for File: %s, using dependency file name instead.", + dependency_filename, ) component_name = dependency_filename @@ -352,15 +354,9 @@ def get_finding_from_vulnerability( ref_url = reference_node.findtext(f"{namespace}url") ref_name = reference_node.findtext(f"{namespace}name") if ref_url == ref_name: - reference_detail += ( - f"**Source:** {ref_source}\n**URL:** {ref_url}\n\n" - ) + reference_detail += f"**Source:** {ref_source}\n**URL:** {ref_url}\n\n" else: - reference_detail += ( - f"**Source:** {ref_source}\n" - f"**URL:** {ref_url}\n" - f"**Name:** {ref_name}\n\n" - ) + reference_detail += f"**Source:** {ref_source}\n**URL:** {ref_url}\n**Name:** {ref_name}\n\n" if related_dependency is not None: tags.append("related") @@ -370,14 +366,18 @@ def get_finding_from_vulnerability( notes = "Document on why we are suppressing this vulnerability is missing!" tags.append("no_suppression_document") mitigation = f"**This vulnerability is mitigated and/or suppressed:** {notes}\n" - mitigation += f"Update {component_name}:{component_version} to at least the version recommended in the description" + mitigation += ( + f"Update {component_name}:{component_version} to at least the version recommended in the description" + ) mitigated = datetime.datetime.now(datetime.UTC) is_Mitigated = True active = False tags.append("suppressed") else: - mitigation = f"Update {component_name}:{component_version} to at least the version recommended in the description" + mitigation = ( + f"Update {component_name}:{component_version} to at least the version recommended in the description" + ) description += "\n**Filepath:** " + str(dependency_filepath) active = True @@ -390,7 +390,6 @@ def get_finding_from_vulnerability( mitigation=mitigation, mitigated=mitigated, is_mitigated=is_Mitigated, - tags=tags, active=active, dynamic_finding=False, static_finding=True, @@ -400,6 +399,8 @@ def get_finding_from_vulnerability( **self.get_severity_and_cvss_meta(vulnerability, namespace), ) + finding.unsaved_tags = tags + if settings.V3_FEATURE_LOCATIONS and component_purl: finding.unsaved_locations.append( LocationData.dependency(purl=component_purl, file_path=dependency_filename), @@ -466,19 +467,15 @@ def get_findings(self, filename, test): namespace + "relatedDependencies", ) if relatedDependencies is not None: - for ( - relatedDependency - ) in relatedDependencies.findall( + for relatedDependency in relatedDependencies.findall( namespace + "relatedDependency", ): - finding = ( - self.get_finding_from_vulnerability( - dependency, - relatedDependency, - vulnerability, - test, - namespace, - ) + finding = self.get_finding_from_vulnerability( + dependency, + relatedDependency, + vulnerability, + test, + namespace, ) if finding: # could be None if scan_date: @@ -502,7 +499,9 @@ def get_findings(self, filename, test): elif settings.V3_FEATURE_LOCATIONS: # Collect product-level dependency locations _, _, component_purl = self.get_component_name_and_version_from_dependency( - dependency, None, namespace, + dependency, + None, + namespace, ) if component_purl: test.unsaved_metadata.append( diff --git a/dojo/tools/jfrog_xray_unified/parser.py b/dojo/tools/jfrog_xray_unified/parser.py index a15d94c8dac..9235d0e20b1 100644 --- a/dojo/tools/jfrog_xray_unified/parser.py +++ b/dojo/tools/jfrog_xray_unified/parser.py @@ -49,9 +49,7 @@ def get_item(vulnerability, test): # not all cves have cvssv3 scores, so skip these. If no v3 scores, # we'll default to index 0 if "cvss_v3_score" in vulnerability["cves"][thisCveIndex]: - thisCvssV3Score = vulnerability["cves"][thisCveIndex][ - "cvss_v3_score" - ] + thisCvssV3Score = vulnerability["cves"][thisCveIndex]["cvss_v3_score"] if thisCvssV3Score > highestCvssV3Score: highestCvssV3Index = thisCveIndex highestCvssV3Score = thisCvssV3Score @@ -84,23 +82,13 @@ def get_item(vulnerability, test): cvss_v2 = worstCve["cvss_v2_vector"] fix_available = False - if ( - "fixed_versions" in vulnerability - and len(vulnerability["fixed_versions"]) > 0 - ): + if "fixed_versions" in vulnerability and len(vulnerability["fixed_versions"]) > 0: mitigation = "Versions containing a fix:\n" mitigation += "\n".join(vulnerability["fixed_versions"]) fix_available = True - if ( - "external_advisory_source" in vulnerability - and "external_advisory_severity" in vulnerability - ): - extra_desc = ( - vulnerability["external_advisory_source"] - + ": " - + vulnerability["external_advisory_severity"] - ) + if "external_advisory_source" in vulnerability and "external_advisory_severity" in vulnerability: + extra_desc = vulnerability["external_advisory_source"] + ": " + vulnerability["external_advisory_severity"] if vulnerability["issue_id"]: title = vulnerability["issue_id"] + " - " + vulnerability["summary"] @@ -108,10 +96,15 @@ def get_item(vulnerability, test): title = vulnerability["summary"] references_str = vulnerability.get("references") - references = "\n".join(references_str) if isinstance(references_str, list) else (references_str if isinstance(references_str, str) else "") + references = ( + "\n".join(references_str) + if isinstance(references_str, list) + else (references_str if isinstance(references_str, str) else "") + ) scan_time = datetime.strptime( - vulnerability["artifact_scan_time"], "%Y-%m-%dT%H:%M:%S%z", + vulnerability["artifact_scan_time"], + "%Y-%m-%dT%H:%M:%S%z", ) # component has several parts separated by colons. Last part is the @@ -132,9 +125,7 @@ def get_item(vulnerability, test): title=title, test=test, severity=severity, - description=( - vulnerability.get("description", vulnerability.get("summary")) + "\n\n" + extra_desc - ).strip(), + description=(vulnerability.get("description", vulnerability.get("summary")) + "\n\n" + extra_desc).strip(), mitigation=mitigation, component_name=component_name, component_version=component_version, @@ -146,10 +137,11 @@ def get_item(vulnerability, test): impact=severity, date=scan_time, unique_id_from_tool=vulnerability["issue_id"], - tags=tags, fix_available=fix_available, ) + finding.unsaved_tags = tags + cvss_data = parse_cvss_data(cvssv3) if cvss_data: finding.cvssv3 = cvss_data.get("cvssv3") @@ -161,7 +153,9 @@ def get_item(vulnerability, test): if settings.V3_FEATURE_LOCATIONS and package_type and component_name: purl_type = package_type.lower() finding.unsaved_locations.append( - LocationData.dependency(purl_type=purl_type, name=component_name, version=component_version, file_path=vulnerability["path"]), + LocationData.dependency( + purl_type=purl_type, name=component_name, version=component_version, file_path=vulnerability["path"], + ), ) return finding diff --git a/dojo/tools/threat_composer/parser.py b/dojo/tools/threat_composer/parser.py index 266d63fd662..e7e95b6ca77 100644 --- a/dojo/tools/threat_composer/parser.py +++ b/dojo/tools/threat_composer/parser.py @@ -66,7 +66,6 @@ def get_findings(self, file, test): mitigation_links[linked_id].append(mitigations[mitigation_id]) for threat in data["threats"]: - if "threatAction" in threat: title = threat["threatAction"] severity, impact, comments = self.parse_threat_metadata(threat.get("metadata", [])) @@ -84,11 +83,12 @@ def get_findings(self, file, test): unique_id_from_tool=unique_id_from_tool, mitigation=mitigation, impact=impact, - tags=tags, static_finding=True, dynamic_finding=False, ) + finding.unsaved_tags = tags + match threat.get("status", "threatIdentified"): case "threatResolved": finding.active = False diff --git a/unittests/tools/test_anchore_grype_parser.py b/unittests/tools/test_anchore_grype_parser.py index c06225d3980..86a99847eae 100644 --- a/unittests/tools/test_anchore_grype_parser.py +++ b/unittests/tools/test_anchore_grype_parser.py @@ -132,7 +132,7 @@ def test_check_all_fields(self): self.assertEqual("libgssapi-krb5-2", finding.component_name) self.assertEqual("1.17-3+deb10u3", finding.component_version) self.assertEqual("CVE-2004-0971", finding.vuln_id_from_tool) - self.assertEqual(["dpkg"], finding.tags) + self.assertEqual(["dpkg"], finding.unsaved_tags) self.assertEqual(1, finding.nb_occurences) finding = findings[1] @@ -167,7 +167,7 @@ def test_check_all_fields(self): self.assertEqual("redis", finding.component_name) self.assertEqual("4.0.2", finding.component_version) self.assertEqual("CVE-2021-32626", finding.vuln_id_from_tool) - self.assertEqual(["python", "python2"], finding.tags) + self.assertEqual(["python", "python2"], finding.unsaved_tags) self.assertEqual(1, finding.nb_occurences) finding = findings[2] @@ -197,7 +197,7 @@ def test_check_all_fields(self): self.assertEqual("libc-bin", finding.component_name) self.assertEqual("2.28-10", finding.component_version) self.assertEqual("CVE-2021-33574", finding.vuln_id_from_tool) - self.assertEqual(["dpkg"], finding.tags) + self.assertEqual(["dpkg"], finding.unsaved_tags) self.assertEqual(1, finding.nb_occurences) finding = findings[3] @@ -227,7 +227,7 @@ def test_check_all_fields(self): self.assertEqual("libc6", finding.component_name) self.assertEqual("2.28-10", finding.component_version) self.assertEqual("CVE-2021-33574", finding.vuln_id_from_tool) - self.assertEqual(["dpkg"], finding.tags) + self.assertEqual(["dpkg"], finding.unsaved_tags) self.assertEqual(1, finding.nb_occurences) finding = findings[4] @@ -257,7 +257,7 @@ def test_check_all_fields(self): self.assertEqual("Django", finding.component_name) self.assertEqual("3.2.9", finding.component_version) self.assertEqual("GHSA-v6rh-hp5x-86rv", finding.vuln_id_from_tool) - self.assertEqual(["python"], finding.tags) + self.assertEqual(["python"], finding.unsaved_tags) self.assertEqual(2, finding.nb_occurences) def test_grype_issue_9618(self): diff --git a/unittests/tools/test_cargo_audit_parser.py b/unittests/tools/test_cargo_audit_parser.py index e68b73e1f46..4ee54c3531d 100644 --- a/unittests/tools/test_cargo_audit_parser.py +++ b/unittests/tools/test_cargo_audit_parser.py @@ -22,7 +22,7 @@ def test_parse_many_findings(self): self.assertEqual("[arc-swap 0.4.7] Dangling reference in `access::Map` with Constant", finding.title) self.assertEqual("High", finding.severity) self.assertIsNotNone(finding.description) - self.assertEqual(["dangling reference"], finding.tags) + self.assertEqual(["dangling reference"], finding.unsaved_tags) self.assertEqual("arc-swap", finding.component_name) self.assertEqual("0.4.7", finding.component_version) self.assertEqual("RUSTSEC-2020-0091", finding.vuln_id_from_tool) @@ -37,7 +37,7 @@ def test_parse_many_findings(self): self.assertEqual("[hyper 0.13.9] Multiple Transfer-Encoding headers misinterprets request payload", finding.title) self.assertEqual("High", finding.severity) self.assertIsNotNone(finding.description) - self.assertEqual(["http", "request-smuggling"], finding.tags) + self.assertEqual(["http", "request-smuggling"], finding.unsaved_tags) self.assertEqual("hyper", finding.component_name) self.assertEqual("0.13.9", finding.component_version) self.assertEqual("RUSTSEC-2021-0020", finding.vuln_id_from_tool) @@ -52,7 +52,7 @@ def test_parse_many_findings(self): self.assertEqual("[smallvec 0.6.13] Buffer overflow in SmallVec::insert_many", finding.title) self.assertEqual("High", finding.severity) self.assertIsNotNone(finding.description) - self.assertEqual(["buffer-overflow", "heap-overflow", "unsound"], finding.tags) + self.assertEqual(["buffer-overflow", "heap-overflow", "unsound"], finding.unsaved_tags) self.assertEqual("smallvec", finding.component_name) self.assertEqual("0.6.13", finding.component_version) self.assertEqual("RUSTSEC-2021-0003", finding.vuln_id_from_tool) @@ -67,7 +67,7 @@ def test_parse_many_findings(self): self.assertEqual("[smallvec 1.5.0] Buffer overflow in SmallVec::insert_many", finding.title) self.assertEqual("High", finding.severity) self.assertIsNotNone(finding.description) - self.assertEqual(["buffer-overflow", "heap-overflow", "unsound"], finding.tags) + self.assertEqual(["buffer-overflow", "heap-overflow", "unsound"], finding.unsaved_tags) self.assertEqual("smallvec", finding.component_name) self.assertEqual("1.5.0", finding.component_version) self.assertEqual("RUSTSEC-2021-0003", finding.vuln_id_from_tool) diff --git a/unittests/tools/test_dependency_check_parser.py b/unittests/tools/test_dependency_check_parser.py index 7b23e5aafa5..31e1394ec51 100644 --- a/unittests/tools/test_dependency_check_parser.py +++ b/unittests/tools/test_dependency_check_parser.py @@ -108,7 +108,7 @@ def test_parse_file_with_multiple_vulnerabilities_has_multiple_findings(self): items[1].mitigation, "Update org.dom4j:dom4j:2.1.1.redhat-00001 to at least the version recommended in the description", ) - self.assertEqual(items[1].tags, "related") + self.assertEqual(items[1].unsaved_tags, ["related"]) self.assertEqual(1, len(items[1].unsaved_vulnerability_ids)) self.assertEqual("CVE-0000-0001", items[1].unsaved_vulnerability_ids[0]) @@ -258,7 +258,7 @@ def test_parse_file_with_multiple_vulnerabilities_has_multiple_findings(self): items[9].mitigation, "**This vulnerability is mitigated and/or suppressed:** Document on why we are suppressing this vulnerability is missing!\nUpdate jquery:3.1.1 to at least the version recommended in the description", ) - self.assertEqual(items[9].tags, ["suppressed", "no_suppression_document"]) + self.assertEqual(items[9].unsaved_tags, ["no_suppression_document", "suppressed"]) self.assertEqual(items[9].severity, "Critical") self.assertEqual(items[9].cvssv3, "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H") self.assertEqual(items[9].cvssv3_score, 9.8) @@ -270,7 +270,7 @@ def test_parse_file_with_multiple_vulnerabilities_has_multiple_findings(self): items[10].mitigation, "**This vulnerability is mitigated and/or suppressed:** This is our reason for not to upgrade it.\nUpdate jquery:3.1.1 to at least the version recommended in the description", ) - self.assertEqual(items[10].tags, "suppressed") + self.assertEqual(items[10].unsaved_tags, ["suppressed"]) self.assertEqual(items[10].severity, "Critical") self.assertEqual(items[10].cvssv3, "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H") self.assertEqual(items[10].cvssv3_score, 9.8) diff --git a/unittests/tools/test_jfrog_xray_unified_parser.py b/unittests/tools/test_jfrog_xray_unified_parser.py index 92bc30c75ff..239161912cd 100644 --- a/unittests/tools/test_jfrog_xray_unified_parser.py +++ b/unittests/tools/test_jfrog_xray_unified_parser.py @@ -33,7 +33,7 @@ def test_parse_file_with_one_vuln(self): self.assertIsNotNone(item.mitigation) self.assertGreater(len(item.mitigation), 0) self.assertEqual("Jinja2", item.component_name) - self.assertEqual('"packagetype_pypi"', item.tags) + self.assertEqual(["packagetype_pypi"], item.unsaved_tags) self.assertEqual("2.11.2", item.component_version) self.assertEqual("pypi-remote/30/9e/f663a2aa66a09d838042ae1a2c5659828bb9b41ea3a6efa20a20fd92b121/Jinja2-2.11.2-py2.py3-none-any.whl", item.file_path) self.assertIsNotNone(item.severity_justification) @@ -186,7 +186,7 @@ def test_parse_file_with_very_many_vulns(self): self.assertEqual(" is too late.", item.description[-13:]) self.assertIsNone(item.mitigation) self.assertEqual("3.12:sqlite-libs", item.component_name) - self.assertEqual('"packagetype_alpine"', item.tags) + self.assertEqual(["packagetype_alpine"], item.unsaved_tags) self.assertEqual("3.32.1-r0", item.component_version) self.assertEqual("dockerhub-remote/kiwigrid/k8s-sidecar/sha256__7cba93c3dde21c78fe07ee3f8ed8d82d05bf00415392606401df8a7d72057b5b/", item.file_path) self.assertIsNotNone(item.severity_justification) @@ -209,7 +209,7 @@ def test_parse_file_with_very_many_vulns(self): self.assertEqual("(Affected 1.0.2-1.0.2w).", item.description[-24:]) self.assertIsNone(item.mitigation) self.assertEqual("ubuntu:bionic:libssl1.1", item.component_name) - self.assertEqual('"packagetype_debian"', item.tags) + self.assertEqual(["packagetype_debian"], item.unsaved_tags) self.assertEqual("1.1.1-1ubuntu2.1~18.04.6", item.component_version) self.assertEqual("dockerhub-remote/library/mongo/sha256__31f6433f7cfcd2180483e40728cbf97142df1e85de36d80d75c93e5e7fe10405/", item.file_path) self.assertIsNotNone(item.severity_justification) @@ -233,7 +233,7 @@ def test_parse_file_with_very_many_vulns(self): self.assertIsNotNone(item.mitigation) self.assertGreater(len(item.mitigation), 0) self.assertEqual("github.com/docker/docker", item.component_name) - self.assertEqual('"packagetype_go"', item.tags) + self.assertEqual(["packagetype_go"], item.unsaved_tags) self.assertEqual("1.4.2-0.20200203170920-46ec8731fbce", item.component_version) self.assertEqual("dockerhub-remote/fluxcd/helm-controller/sha256__27790f965d8965884e8dfc12cba0d1f609794a1abc69bc81a658bd76e463ffce/", item.file_path) self.assertIsNotNone(item.severity_justification) @@ -255,7 +255,7 @@ def test_parse_file_with_very_many_vulns(self): self.assertEqual("sensitive information.", item.description[-22:]) self.assertIsNone(item.mitigation) self.assertEqual("com.fasterxml.jackson.core:jackson-databind", item.component_name) - self.assertEqual('"packagetype_maven"', item.tags) + self.assertEqual(["packagetype_maven"], item.unsaved_tags) self.assertEqual("2.10.4", item.component_version) self.assertEqual("elastic-docker-remote/elasticsearch/elasticsearch/7.9.1-amd64/", item.file_path) self.assertIsNotNone(item.severity_justification) @@ -279,7 +279,7 @@ def test_parse_file_with_very_many_vulns(self): self.assertIsNotNone(item.mitigation) self.assertGreater(len(item.mitigation), 0) self.assertEqual("jquery", item.component_name) - self.assertEqual('"packagetype_npm"', item.tags) + self.assertEqual(["packagetype_npm"], item.unsaved_tags) self.assertEqual("3.4.1", item.component_version) self.assertEqual("pypi-remote/cc/94/5f7079a0e00bd6863ef8f1da638721e9da21e5bacee597595b318f71d62e/Werkzeug-1.0.1-py2.py3-none-any.whl", item.file_path) self.assertIsNotNone(item.severity_justification) @@ -303,7 +303,7 @@ def test_parse_file_with_very_many_vulns(self): self.assertIsNotNone(item.mitigation) self.assertGreater(len(item.mitigation), 0) self.assertEqual("pip", item.component_name) - self.assertEqual('"packagetype_pypi"', item.tags) + self.assertEqual(["packagetype_pypi"], item.unsaved_tags) self.assertEqual("20.2.3", item.component_version) self.assertEqual("dockerhub-remote/kiwigrid/k8s-sidecar/sha256__4b5a25c8dbac9637f8e680566959fdccd1a98d74ce2f2746f9b0f9ff6b57d03b/", item.file_path) self.assertIsNotNone(item.severity_justification) @@ -326,7 +326,7 @@ def test_parse_file_with_very_many_vulns(self): self.assertEqual("TABLE statements.\n\nRed Hat Severity: Moderate", item.description[-45:]) self.assertIsNone(item.mitigation) self.assertEqual("7:sqlite:0", item.component_name) - self.assertIn("packagetype_rpm", item.tags) + self.assertIn("packagetype_rpm", item.unsaved_tags) self.assertEqual("3.7.17-8.el7_7.1", item.component_version) self.assertEqual("elastic-docker-remote/elasticsearch/elasticsearch/7.9.1-amd64/", item.file_path) self.assertIsNotNone(item.severity_justification)