|
7 | 7 | # See https://aboutcode.org for more information about nexB OSS projects. |
8 | 8 | # |
9 | 9 |
|
| 10 | +# temporarily import json to create output to analyze |
| 11 | +import json |
10 | 12 | import os |
11 | | -from unittest import TestCase |
12 | 13 |
|
13 | | -from packageurl import PackageURL |
14 | | -from univers.version_range import VersionRange |
| 14 | +import pytest |
15 | 15 |
|
16 | | -from vulnerabilities.importer import AdvisoryData |
17 | | -from vulnerabilities.importer import Reference |
18 | 16 | from vulnerabilities.importers.apache_kafka import ApacheKafkaImporter |
19 | | -from vulnerabilities.importers.apache_kafka import to_version_ranges |
20 | | -from vulnerabilities.package_managers import GitHubTagsAPI |
21 | | -from vulnerabilities.package_managers import Version |
22 | | -from vulnerabilities.utils import AffectedPackage |
| 17 | + |
| 18 | +# from vulnerabilities.package_managers import GitHubTagsAPI |
| 19 | +from vulnerabilities.tests import util_tests |
23 | 20 |
|
24 | 21 | BASE_DIR = os.path.dirname(os.path.abspath(__file__)) |
25 | | -TEST_DATA = os.path.join(BASE_DIR, "test_data", "apache_kafka", "cve-list.html") |
26 | | - |
27 | | - |
28 | | -class TestApacheKafkaImporter(TestCase): |
29 | | - def test_to_version_ranges(self): |
30 | | - # Check single version |
31 | | - assert [ |
32 | | - VersionRange.from_scheme_version_spec_string("maven", "=3.2.2") |
33 | | - ] == to_version_ranges("3.2.2") |
34 | | - |
35 | | - # Check range with lower and upper bounds |
36 | | - assert [ |
37 | | - VersionRange.from_scheme_version_spec_string("maven", ">=3.2.2, <=3.2.3") |
38 | | - ] == to_version_ranges("3.2.2 to 3.2.3") |
39 | | - |
40 | | - # Check range with "and later" |
41 | | - assert [ |
42 | | - VersionRange.from_scheme_version_spec_string("maven", ">=3.2.2") |
43 | | - ] == to_version_ranges("3.2.2 and later") |
44 | | - |
45 | | - # Check combination of above cases |
46 | | - assert [ |
47 | | - VersionRange.from_scheme_version_spec_string("maven", ">=3.2.2"), |
48 | | - VersionRange.from_scheme_version_spec_string("maven", ">=3.2.2, <=3.2.3"), |
49 | | - VersionRange.from_scheme_version_spec_string("maven", "==3.2.2"), |
50 | | - ] == to_version_ranges("3.2.2 and later, 3.2.2 to 3.2.3, 3.2.2") |
51 | | - |
52 | | - def test_to_advisory(self): |
53 | | - data_source = ApacheKafkaImporter(batch_size=1) |
54 | | - data_source.version_api = GitHubTagsAPI( |
55 | | - cache={"apache/kafka": [Version("2.1.2"), Version("0.10.2.2")]} |
56 | | - ) |
57 | | - expected_advisories = [ |
58 | | - Advisory( |
59 | | - summary="In Apache Kafka versions between 0.11.0.0 and 2.1.0, it is possible to manually\n craft a Produce request which bypasses transaction/idempotent ACL validation.\n Only authenticated clients with Write permission on the respective topics are\n able to exploit this vulnerability. Users should upgrade to 2.1.1 or later\n where this vulnerability has been fixed.", |
60 | | - vulnerability_id="CVE-2018-17196", |
61 | | - affected_packages=[ |
62 | | - AffectedPackage( |
63 | | - vulnerable_package=PackageURL( |
64 | | - type="apache", |
65 | | - namespace=None, |
66 | | - name="kafka", |
67 | | - version="0.10.2.2", |
68 | | - qualifiers={}, |
69 | | - subpath=None, |
70 | | - ), |
71 | | - patched_package=PackageURL( |
72 | | - type="apache", |
73 | | - namespace=None, |
74 | | - name="kafka", |
75 | | - version="2.1.2", |
76 | | - qualifiers={}, |
77 | | - subpath=None, |
78 | | - ), |
79 | | - ) |
80 | | - ], |
81 | | - references=[ |
82 | | - Reference( |
83 | | - reference_id="", url="https://kafka.apache.org/cve-list", severities=[] |
84 | | - ), |
85 | | - Reference( |
86 | | - reference_id="CVE-2018-17196", |
87 | | - url="https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2018-17196", |
88 | | - severities=[], |
89 | | - ), |
90 | | - ], |
91 | | - ) |
92 | | - ] |
93 | | - with open(TEST_DATA) as f: |
94 | | - found_advisories = data_source.to_advisory(f) |
95 | | - |
96 | | - found_advisories = list(map(Advisory.normalized, found_advisories)) |
97 | | - expected_advisories = list(map(Advisory.normalized, expected_advisories)) |
98 | | - assert sorted(found_advisories) == sorted(expected_advisories) |
| 22 | +# Created cve-list-2022-12-06.html with a full copy of https://raw.githubusercontent.com/apache/kafka-site/asf-site/cve-list.html |
| 23 | +TEST_DATA = os.path.join( |
| 24 | + BASE_DIR, |
| 25 | + "test_data/apache_kafka", |
| 26 | +) |
| 27 | + |
| 28 | + |
| 29 | +def test_to_advisory(): |
| 30 | + with open(os.path.join(TEST_DATA, "cve-list-2022-12-06.html")) as f: |
| 31 | + raw_data = f.read() |
| 32 | + advisories = ApacheKafkaImporter().to_advisory(raw_data) |
| 33 | + result = [data.to_dict() for data in advisories] |
| 34 | + |
| 35 | + # TODO: We need to finish this test including the REGEN step. 2022-12-12 Monday 14:48:05. Done. |
| 36 | + expected_file = os.path.join(TEST_DATA, f"to-advisory-apache_kafka-expected.json") |
| 37 | + util_tests.check_results_against_json(result, expected_file) |
| 38 | + |
| 39 | + # We generate these 2 files solely to vet the output and adjust the importer code. |
| 40 | + # with open(os.path.join(TEST_DATA, "jmh-test-01.txt"), "w") as f1: |
| 41 | + # for advisory_object in result: |
| 42 | + # f1.write(f"{advisory_object}\n\n") |
| 43 | + # for k, v in advisory_object.items(): |
| 44 | + # f1.write(f"{k}: {v}\n\n") |
| 45 | + # f1.write(f"=================================================\n\n") |
| 46 | + |
| 47 | + # with open(os.path.join(TEST_DATA, "test-advisories.json"), "w", encoding="utf-8") as f: |
| 48 | + # json.dump(result, f, ensure_ascii=False, indent=4) |
| 49 | + |
| 50 | + |
| 51 | +# Check for an unknown CVE value. |
| 52 | +def to_advisory_changed_cve(): |
| 53 | + with open(os.path.join(TEST_DATA, "cve-list-changed-cve.html")) as f: |
| 54 | + raw_data = f.read() |
| 55 | + advisories = ApacheKafkaImporter().to_advisory(raw_data) |
| 56 | + |
| 57 | + |
| 58 | +def test_to_advisory_changed_cve_exception(): |
| 59 | + with pytest.raises(KeyError) as excinfo: |
| 60 | + to_advisory_changed_cve() |
| 61 | + |
| 62 | + assert "CVE-2022-34918" in str(excinfo.value) |
| 63 | + |
| 64 | + |
| 65 | +# Check for an unknown "Versions affected" value. |
| 66 | +def to_advisory_changed_versions_affected(): |
| 67 | + with open(os.path.join(TEST_DATA, "cve-list-changed-versions-affected.html")) as f: |
| 68 | + raw_data = f.read() |
| 69 | + advisories = ApacheKafkaImporter().to_advisory(raw_data) |
| 70 | + |
| 71 | + |
| 72 | +def test_to_advisory_changed_versions_affected_exception(): |
| 73 | + with pytest.raises(KeyError) as excinfo: |
| 74 | + to_advisory_changed_versions_affected() |
| 75 | + |
| 76 | + assert "2.8.0 - 2.8.1, 3.0.0 - 3.0.1, 3.1.0 - 3.1.1, 3.2.0 - 3.2.2" in str(excinfo.value) |
| 77 | + |
| 78 | + |
| 79 | +# Check for an unknown "Fixed versions" value. |
| 80 | +def to_advisory_changed_fixed_versions(): |
| 81 | + with open(os.path.join(TEST_DATA, "cve-list-changed-fixed-versions.html")) as f: |
| 82 | + raw_data = f.read() |
| 83 | + advisories = ApacheKafkaImporter().to_advisory(raw_data) |
| 84 | + |
| 85 | + |
| 86 | +def test_to_advisory_changed_fixed_versions_exception(): |
| 87 | + with pytest.raises(KeyError) as excinfo: |
| 88 | + to_advisory_changed_fixed_versions() |
| 89 | + |
| 90 | + assert "2.8.2, 3.0.2, 3.1.2, 3.2.4" in str(excinfo.value) |
0 commit comments