From 78adbae9e6b6803d30ba837720c9a8cf3da53c36 Mon Sep 17 00:00:00 2001 From: Marian GAPPA Date: Fri, 3 Jan 2025 14:57:42 +0000 Subject: [PATCH 1/6] fix: Update unit test * Fix unit test by using full function/method/object path * Rename unit to test_unit python file * Move data information into a specific python file --- tests/data.py | 44 +++++++++++++++++++++++++++++ tests/test_unit.py | 27 ++++++++++++++++++ tests/unit.py | 69 ---------------------------------------------- 3 files changed, 71 insertions(+), 69 deletions(-) create mode 100644 tests/data.py create mode 100644 tests/test_unit.py delete mode 100644 tests/unit.py diff --git a/tests/data.py b/tests/data.py new file mode 100644 index 0000000..d3c73f4 --- /dev/null +++ b/tests/data.py @@ -0,0 +1,44 @@ + +SETTINGS = { + 'name': 'Example Tenant', + 'identity': 'abc', + 'secret': 'def', + 'url': 'example.prismacloud.io', + 'verify': False, + 'debug': False +} + +META_INFO = { + 'twistlockUrl': 'example.prismacloud.io' +} + +USER_PROFILE = { + 'email': 'example@example.com', + 'firstName': 'Example', + 'lastName': 'User', + 'timeZone': 'America/Los_Angeles', + 'enabled': True, + 'lastModifiedBy': 'template@redlock.io', + 'lastModifiedTs': 1630000000000, + 'lastLoginTs': 1640000000000, + 'displayName': 'Example User', + 'accessKeysAllowed': True, + 'defaultRoleId': '1234-5678', + 'roleIds': ['1234-5678'], + 'roles': [{ + 'id': '1234-5678', + 'name': 'System Admin', + 'type': 'System Admin', + 'onlyAllowCIAccess': False, + 'onlyAllowComputeAccess': False, + 'onlyAllowReadAccess': False + }], + 'activeRole': { + 'id': '1234-5678', + 'name': 'System Admin', + 'type': 'System Admin', + 'onlyAllowCIAccess': False, + 'onlyAllowComputeAccess': False, + 'onlyAllowReadAccess': False + } +} diff --git a/tests/test_unit.py b/tests/test_unit.py new file mode 100644 index 0000000..ce5b1fb --- /dev/null +++ b/tests/test_unit.py @@ -0,0 +1,27 @@ +""" Unit Tests """ + +import unittest + +from unittest import mock + +# pylint: disable=import-error +from prismacloud.api import pc_api +from tests.data import META_INFO, SETTINGS, USER_PROFILE + + +class TestPrismaCloudAPI(unittest.TestCase): + """ Unit Tests with Mocking """ + + # Decorator + @mock.patch('prismacloud.api.cspm.EndpointsPrismaCloudAPIMixin.meta_info') + def test_pc_api_configure(self, meta_info): + meta_info.return_value = META_INFO + pc_api.configure(SETTINGS) + self.assertEqual(pc_api.api_compute, 'example.prismacloud.io') + + # With + def test_pc_api_current_user(self): + with mock.patch('prismacloud.api.PrismaCloudAPI.execute') as pc_api_execute: + pc_api_execute.return_value = USER_PROFILE + result = pc_api.current_user() + self.assertEqual(result['displayName'], 'Example User') diff --git a/tests/unit.py b/tests/unit.py deleted file mode 100644 index 54dc843..0000000 --- a/tests/unit.py +++ /dev/null @@ -1,69 +0,0 @@ -""" Unit Tests """ - -import unittest - -import mock - -# pylint: disable=import-error -from prismacloud.api import pc_api - -class TestPrismaCloudAPI(unittest.TestCase): - """ Unit Tests with Mocking """ - - SETTINGS = { - 'name': 'Example Tenant', - 'identity': 'abc', - 'secret': 'def', - 'url': 'example.prismacloud.io', - 'verify': False, - 'debug': False - } - - USER_PROFILE = { - 'email': 'example@example.com', - 'firstName': 'Example', - 'lastName': 'User', - 'timeZone': 'America/Los_Angeles', - 'enabled': True, - 'lastModifiedBy': 'template@redlock.io', - 'lastModifiedTs': 1630000000000, - 'lastLoginTs': 1640000000000, - 'displayName': 'Example User', - 'accessKeysAllowed': True, - 'defaultRoleId': '1234-5678', - 'roleIds': ['1234-5678'], - 'roles': [{ - 'id': '1234-5678', - 'name': 'System Admin', - 'type': 'System Admin', - 'onlyAllowCIAccess': False, - 'onlyAllowComputeAccess': False, - 'onlyAllowReadAccess': False - }], - 'activeRole': { - 'id': '1234-5678', - 'name': 'System Admin', - 'type': 'System Admin', - 'onlyAllowCIAccess': False, - 'onlyAllowComputeAccess': False, - 'onlyAllowReadAccess': False - } - } - - # Decorator - @mock.patch('prismacloud.api.pc_utility.get_settings') - def test_pc_api_configure(self, get_settings): - get_settings.return_value = self.SETTINGS - settings = get_settings() - pc_api.configure(settings) - self.assertEqual('example.prismacloud.io', pc_api.url) - - # With - def test_pc_api_current_user(self): - with mock.patch('prismacloud.api.PrismaCloudAPI.execute') as pc_api_execute: - pc_api_execute.return_value = self.USER_PROFILE - result = pc_api.current_user() - self.assertEqual('Example User', result['displayName']) - -if __name__ == '__main__': - unittest.main() From b129431023361d3b2e7e6d6daedd8c5edc6486ec Mon Sep 17 00:00:00 2001 From: Marian GAPPA Date: Fri, 3 Jan 2025 15:00:55 +0000 Subject: [PATCH 2/6] chore: Add test on execute_compute method * Add some basic test by using responses library, that permit to simplify the request mock. We add test before the retry change to keep all basic fonctionnalities --- requirements_test.txt | 2 + tests/data.py | 1486 +++++++++++++++++++++++++++++++++++++++ tests/test_cwpp_cwpp.py | 265 +++++++ 3 files changed, 1753 insertions(+) create mode 100644 requirements_test.txt create mode 100644 tests/test_cwpp_cwpp.py diff --git a/requirements_test.txt b/requirements_test.txt new file mode 100644 index 0000000..4a140b5 --- /dev/null +++ b/requirements_test.txt @@ -0,0 +1,2 @@ +coverage==7.6.10 +responses==0.25.3 diff --git a/tests/data.py b/tests/data.py index d3c73f4..b2f90c6 100644 --- a/tests/data.py +++ b/tests/data.py @@ -42,3 +42,1489 @@ 'onlyAllowReadAccess': False } } + +CREDENTIALS = [ + { + "_id": "string", + "accountGUID": "string", + "accountID": "string", + "accountName": "string", + "apiToken": { + "encrypted": "string", + "plain": "string" + }, + "azureSPInfo": { + "clientId": "string", + "miType": [ + "user-assigned", + "system-assigned" + ], + "subscriptionId": "string", + "tenantId": "string" + }, + "caCert": "string", + "cloudProviderAccountID": "string", + "created": "2024-07-29T15:51:28.071Z", + "description": "string", + "external": True, + "global": True, + "lastModified": "2024-07-29T15:51:28.071Z", + "ociCred": { + "fingerprint": "string", + "tenancyId": "string" + }, + "owner": "string", + "prismaLastModified": 0, + "roleArn": "string", + "secret": { + "encrypted": "string", + "plain": "string" + }, + "skipVerify": True, + "stsEndpoints": [ + "string" + ], + "tokens": { + "awsAccessKeyId": "string", + "awsSecretAccessKey": { + "encrypted": "string", + "plain": "string" + }, + "duration": 0, + "expirationTime": "2024-07-29T15:51:28.071Z", + "token": { + "encrypted": "string", + "plain": "string" + } + }, + "type": [ + "aws", + "azure", + "gcp", + "ibmCloud", + "oci", + "apiToken", + "basic", + "dtr", + "kubeconfig", + "certificate", + "gitlabToken" + ], + "url": "string", + "useAWSRole": True, + "useSTSRegionalEndpoint": True + } +] + +ONE_HOST = { + "Secrets": [ + "string" + ], + "_id": "string", + "agentless": True, + "aisUUID": "string", + "allCompliance": { + "compliance": [ + { + "applicableRules": [ + "string" + ], + "binaryPkgs": [ + "string" + ], + "block": True, + "cause": "string", + "cri": True, + "custom": True, + "cve": "string", + "cvss": 0, + "description": "string", + "discovered": "2024-07-29T15:51:28.071Z", + "exploit": [ + "", + "exploit-db", + "exploit-windows", + "cisa-kev" + ], + "exploits": [ + { + "kind": [ + "poc", + "in-the-wild" + ], + "link": "string", + "source": [ + "", + "exploit-db", + "exploit-windows", + "cisa-kev" + ] + } + ], + "fixDate": 0, + "fixLink": "string", + "functionLayer": "string", + "gracePeriodDays": 0, + "id": 0, + "layerTime": 0, + "link": "string", + "packageName": "string", + "packageType": [ + "nodejs", + "gem", + "python", + "jar", + "package", + "windows", + "binary", + "nuget", + "go", + "app", + "unknown" + ], + "packageVersion": "string", + "published": 0, + "riskFactors": {}, + "secret": { + "group": "string", + "locationInFile": "string", + "metadataModifiedTime": 0, + "modifiedTime": 0, + "originalFileLocation": "string", + "path": "string", + "permissions": "string", + "secretID": "string", + "size": 0, + "snippet": "string", + "type": [ + "AWS Access Key ID", + "AWS Secret Key", + "AWS MWS Auth Token", + "Azure Storage Account Access Key", + "Azure Service Principal", + "GCP Service Account Auth Key", + "Private Encryption Key", + "Public Encryption Key", + "PEM X509 Certificate Header", + "SSH Authorized Keys", + "Artifactory API Token", + "Artifactory Password", + "Basic Auth Credentials", + "Mailchimp Access Key", + "NPM Token", + "Slack Token", + "Slack Webhook", + "Square OAuth Secret", + "Notion Integration Token", + "Airtable API Key", + "Atlassian Oauth2 Keys", + "CircleCI Personal Token", + "Databricks Authentication Token", + "GitHub Token", + "GitLab Token", + "Google API key", + "Grafana Token", + "Python Package Index Key (PYPI)", + "Typeform API Token", + "Scalr Token", + "Braintree Access Token", + "Braintree Payments Key", + "Paypal Token Key", + "Braintree Payments ID", + "Datadog Client Token", + "ClickUp Personal API Token", + "OpenAI API Key", + "Java DB Connectivity (JDBC)", + "MongoDB", + ".Net SQL Server" + ], + "user": "string" + }, + "severity": "string", + "status": "string", + "templates": [ + [ + "PCI", + "HIPAA", + "NIST SP 800-190", + "GDPR", + "DISA STIG" + ] + ], + "text": "string", + "title": "string", + "twistlock": True, + "type": [ + "container", + "image", + "host_config", + "daemon_config", + "daemon_config_files", + "security_operations", + "k8s_master", + "k8s_worker", + "k8s_federation", + "linux", + "windows", + "istio", + "serverless", + "custom", + "docker_stig", + "openshift_master", + "openshift_worker", + "application_control_linux", + "gke_worker", + "image_malware", + "host_malware", + "aks_worker", + "eks_worker", + "image_secret", + "host_secret" + ], + "vecStr": "string", + "vulnTagInfos": [ + { + "color": "string", + "comment": "string", + "name": "string" + } + ], + "wildfireMalware": { + "md5": "string", + "path": "string", + "verdict": "string" + } + } + ], + "enabled": True + }, + "appEmbedded": True, + "applications": [ + { + "installedFromPackage": True, + "knownVulnerabilities": 0, + "layerTime": 0, + "name": "string", + "originPackageName": "string", + "path": "string", + "service": True, + "version": "string" + } + ], + "baseImage": "string", + "binaries": [ + { + "altered": True, + "cveCount": 0, + "deps": [ + "string" + ], + "fileMode": 0, + "functionLayer": "string", + "md5": "string", + "missingPkg": True, + "name": "string", + "path": "string", + "pkgRootDir": "string", + "services": [ + "string" + ], + "version": "string" + } + ], + "cloudMetadata": { + "accountID": "string", + "awsExecutionEnv": "string", + "image": "string", + "labels": [ + { + "key": "string", + "sourceName": "string", + "sourceType": [ + "namespace", + "deployment", + "aws", + "azure", + "gcp", + "oci" + ], + "timestamp": "2024-07-29T15:51:28.071Z", + "value": "string" + } + ], + "name": "string", + "provider": [ + "aws", + "azure", + "gcp", + "alibaba", + "oci", + "others" + ], + "region": "string", + "resourceID": "string", + "resourceURL": "string", + "type": "string", + "vmID": "string", + "vmImageID": "string" + }, + "clusterType": [ + "AKS", + "ECS", + "EKS", + "GKE", + "Kubernetes" + ], + "clusters": [ + "string" + ], + "collections": [ + "string" + ], + "complianceDistribution": { + "critical": 0, + "high": 0, + "low": 0, + "medium": 0, + "total": 0 + }, + "complianceIssues": [ + { + "applicableRules": [ + "string" + ], + "binaryPkgs": [ + "string" + ], + "block": True, + "cause": "string", + "cri": True, + "custom": True, + "cve": "string", + "cvss": 0, + "description": "string", + "discovered": "2024-07-29T15:51:28.071Z", + "exploit": [ + "", + "exploit-db", + "exploit-windows", + "cisa-kev" + ], + "exploits": [ + { + "kind": [ + "poc", + "in-the-wild" + ], + "link": "string", + "source": [ + "", + "exploit-db", + "exploit-windows", + "cisa-kev" + ] + } + ], + "fixDate": 0, + "fixLink": "string", + "functionLayer": "string", + "gracePeriodDays": 0, + "id": 0, + "layerTime": 0, + "link": "string", + "packageName": "string", + "packageType": [ + "nodejs", + "gem", + "python", + "jar", + "package", + "windows", + "binary", + "nuget", + "go", + "app", + "unknown" + ], + "packageVersion": "string", + "published": 0, + "riskFactors": {}, + "secret": { + "group": "string", + "locationInFile": "string", + "metadataModifiedTime": 0, + "modifiedTime": 0, + "originalFileLocation": "string", + "path": "string", + "permissions": "string", + "secretID": "string", + "size": 0, + "snippet": "string", + "type": [ + "AWS Access Key ID", + "AWS Secret Key", + "AWS MWS Auth Token", + "Azure Storage Account Access Key", + "Azure Service Principal", + "GCP Service Account Auth Key", + "Private Encryption Key", + "Public Encryption Key", + "PEM X509 Certificate Header", + "SSH Authorized Keys", + "Artifactory API Token", + "Artifactory Password", + "Basic Auth Credentials", + "Mailchimp Access Key", + "NPM Token", + "Slack Token", + "Slack Webhook", + "Square OAuth Secret", + "Notion Integration Token", + "Airtable API Key", + "Atlassian Oauth2 Keys", + "CircleCI Personal Token", + "Databricks Authentication Token", + "GitHub Token", + "GitLab Token", + "Google API key", + "Grafana Token", + "Python Package Index Key (PYPI)", + "Typeform API Token", + "Scalr Token", + "Braintree Access Token", + "Braintree Payments Key", + "Paypal Token Key", + "Braintree Payments ID", + "Datadog Client Token", + "ClickUp Personal API Token", + "OpenAI API Key", + "Java DB Connectivity (JDBC)", + "MongoDB", + ".Net SQL Server" + ], + "user": "string" + }, + "severity": "string", + "status": "string", + "templates": [ + [ + "PCI", + "HIPAA", + "NIST SP 800-190", + "GDPR", + "DISA STIG" + ] + ], + "text": "string", + "title": "string", + "twistlock": True, + "type": [ + "container", + "image", + "host_config", + "daemon_config", + "daemon_config_files", + "security_operations", + "k8s_master", + "k8s_worker", + "k8s_federation", + "linux", + "windows", + "istio", + "serverless", + "custom", + "docker_stig", + "openshift_master", + "openshift_worker", + "application_control_linux", + "gke_worker", + "image_malware", + "host_malware", + "aks_worker", + "eks_worker", + "image_secret", + "host_secret" + ], + "vecStr": "string", + "vulnTagInfos": [ + { + "color": "string", + "comment": "string", + "name": "string" + } + ], + "wildfireMalware": { + "md5": "string", + "path": "string", + "verdict": "string" + } + } + ], + "complianceIssuesCount": 0, + "complianceRiskScore": 0, + "compressed": True, + "compressedLayerTimes": { + "appTimes": [ + 0 + ], + "pkgsTimes": [ + { + "pkgTimes": [ + 0 + ], + "pkgsType": [ + "nodejs", + "gem", + "python", + "jar", + "package", + "windows", + "binary", + "nuget", + "go", + "app", + "unknown" + ] + } + ] + }, + "creationTime": "2024-07-29T15:51:28.071Z", + "csa": True, + "csaWindows": True, + "distro": "string", + "ecsClusterName": "string", + "err": "string", + "errCode": 0, + "externalLabels": [ + { + "key": "string", + "sourceName": "string", + "sourceType": [ + "namespace", + "deployment", + "aws", + "azure", + "gcp", + "oci" + ], + "timestamp": "2024-07-29T15:51:28.071Z", + "value": "string" + } + ], + "files": [ + { + "md5": "string", + "original_file_location": "string", + "path": "string", + "sha1": "string", + "sha256": "string" + } + ], + "firewallProtection": { + "enabled": True, + "outOfBandMode": [ + "", + "Observation", + "Protection" + ], + "ports": [ + 0 + ], + "supported": True, + "tlsPorts": [ + 0 + ], + "unprotectedProcesses": [ + { + "port": 0, + "process": "string", + "tls": True + } + ] + }, + "firstScanTime": "2024-07-29T15:51:28.071Z", + "foundSecrets": [ + { + "group": "string", + "locationInFile": "string", + "metadataModifiedTime": 0, + "modifiedTime": 0, + "originalFileLocation": "string", + "path": "string", + "permissions": "string", + "secretID": "string", + "size": 0, + "snippet": "string", + "type": [ + "AWS Access Key ID", + "AWS Secret Key", + "AWS MWS Auth Token", + "Azure Storage Account Access Key", + "Azure Service Principal", + "GCP Service Account Auth Key", + "Private Encryption Key", + "Public Encryption Key", + "PEM X509 Certificate Header", + "SSH Authorized Keys", + "Artifactory API Token", + "Artifactory Password", + "Basic Auth Credentials", + "Mailchimp Access Key", + "NPM Token", + "Slack Token", + "Slack Webhook", + "Square OAuth Secret", + "Notion Integration Token", + "Airtable API Key", + "Atlassian Oauth2 Keys", + "CircleCI Personal Token", + "Databricks Authentication Token", + "GitHub Token", + "GitLab Token", + "Google API key", + "Grafana Token", + "Python Package Index Key (PYPI)", + "Typeform API Token", + "Scalr Token", + "Braintree Access Token", + "Braintree Payments Key", + "Paypal Token Key", + "Braintree Payments ID", + "Datadog Client Token", + "ClickUp Personal API Token", + "OpenAI API Key", + "Java DB Connectivity (JDBC)", + "MongoDB", + ".Net SQL Server" + ], + "user": "string" + } + ], + "history": [ + { + "baseLayer": True, + "created": 0, + "emptyLayer": True, + "id": "string", + "instruction": "string", + "sizeBytes": 0, + "tags": [ + "string" + ], + "vulnerabilities": [ + { + "applicableRules": [ + "string" + ], + "binaryPkgs": [ + "string" + ], + "block": True, + "cause": "string", + "cri": True, + "custom": True, + "cve": "string", + "cvss": 0, + "description": "string", + "discovered": "2024-07-29T15:51:28.071Z", + "exploit": [ + "", + "exploit-db", + "exploit-windows", + "cisa-kev" + ], + "exploits": [ + { + "kind": [ + "poc", + "in-the-wild" + ], + "link": "string", + "source": [ + "", + "exploit-db", + "exploit-windows", + "cisa-kev" + ] + } + ], + "fixDate": 0, + "fixLink": "string", + "functionLayer": "string", + "gracePeriodDays": 0, + "id": 0, + "layerTime": 0, + "link": "string", + "packageName": "string", + "packageType": [ + "nodejs", + "gem", + "python", + "jar", + "package", + "windows", + "binary", + "nuget", + "go", + "app", + "unknown" + ], + "packageVersion": "string", + "published": 0, + "riskFactors": {}, + "secret": { + "group": "string", + "locationInFile": "string", + "metadataModifiedTime": 0, + "modifiedTime": 0, + "originalFileLocation": "string", + "path": "string", + "permissions": "string", + "secretID": "string", + "size": 0, + "snippet": "string", + "type": [ + "AWS Access Key ID", + "AWS Secret Key", + "AWS MWS Auth Token", + "Azure Storage Account Access Key", + "Azure Service Principal", + "GCP Service Account Auth Key", + "Private Encryption Key", + "Public Encryption Key", + "PEM X509 Certificate Header", + "SSH Authorized Keys", + "Artifactory API Token", + "Artifactory Password", + "Basic Auth Credentials", + "Mailchimp Access Key", + "NPM Token", + "Slack Token", + "Slack Webhook", + "Square OAuth Secret", + "Notion Integration Token", + "Airtable API Key", + "Atlassian Oauth2 Keys", + "CircleCI Personal Token", + "Databricks Authentication Token", + "GitHub Token", + "GitLab Token", + "Google API key", + "Grafana Token", + "Python Package Index Key (PYPI)", + "Typeform API Token", + "Scalr Token", + "Braintree Access Token", + "Braintree Payments Key", + "Paypal Token Key", + "Braintree Payments ID", + "Datadog Client Token", + "ClickUp Personal API Token", + "OpenAI API Key", + "Java DB Connectivity (JDBC)", + "MongoDB", + ".Net SQL Server" + ], + "user": "string" + }, + "severity": "string", + "status": "string", + "templates": [ + [ + "PCI", + "HIPAA", + "NIST SP 800-190", + "GDPR", + "DISA STIG" + ] + ], + "text": "string", + "title": "string", + "twistlock": True, + "type": [ + "container", + "image", + "host_config", + "daemon_config", + "daemon_config_files", + "security_operations", + "k8s_master", + "k8s_worker", + "k8s_federation", + "linux", + "windows", + "istio", + "serverless", + "custom", + "docker_stig", + "openshift_master", + "openshift_worker", + "application_control_linux", + "gke_worker", + "image_malware", + "host_malware", + "aks_worker", + "eks_worker", + "image_secret", + "host_secret" + ], + "vecStr": "string", + "vulnTagInfos": [ + { + "color": "string", + "comment": "string", + "name": "string" + } + ], + "wildfireMalware": { + "md5": "string", + "path": "string", + "verdict": "string" + } + } + ] + } + ], + "hostDevices": [ + { + "ip": "string", + "name": "string" + } + ], + "hostRuntimeEnabled": True, + "hostname": "string", + "hosts": {}, + "id": "string", + "image": { + "created": "2024-07-29T15:51:28.071Z", + "entrypoint": [ + "string" + ], + "env": [ + "string" + ], + "healthcheck": True, + "history": [ + { + "baseLayer": True, + "created": 0, + "emptyLayer": True, + "id": "string", + "instruction": "string", + "sizeBytes": 0, + "tags": [ + "string" + ], + "vulnerabilities": [ + { + "applicableRules": [ + "string" + ], + "binaryPkgs": [ + "string" + ], + "block": True, + "cause": "string", + "cri": True, + "custom": True, + "cve": "string", + "cvss": 0, + "description": "string", + "discovered": "2024-07-29T15:51:28.071Z", + "exploit": [ + "", + "exploit-db", + "exploit-windows", + "cisa-kev" + ], + "exploits": [ + { + "kind": [ + "poc", + "in-the-wild" + ], + "link": "string", + "source": [ + "", + "exploit-db", + "exploit-windows", + "cisa-kev" + ] + } + ], + "fixDate": 0, + "fixLink": "string", + "functionLayer": "string", + "gracePeriodDays": 0, + "id": 0, + "layerTime": 0, + "link": "string", + "packageName": "string", + "packageType": [ + "nodejs", + "gem", + "python", + "jar", + "package", + "windows", + "binary", + "nuget", + "go", + "app", + "unknown" + ], + "packageVersion": "string", + "published": 0, + "riskFactors": {}, + "secret": { + "group": "string", + "locationInFile": "string", + "metadataModifiedTime": 0, + "modifiedTime": 0, + "originalFileLocation": "string", + "path": "string", + "permissions": "string", + "secretID": "string", + "size": 0, + "snippet": "string", + "type": [ + "AWS Access Key ID", + "AWS Secret Key", + "AWS MWS Auth Token", + "Azure Storage Account Access Key", + "Azure Service Principal", + "GCP Service Account Auth Key", + "Private Encryption Key", + "Public Encryption Key", + "PEM X509 Certificate Header", + "SSH Authorized Keys", + "Artifactory API Token", + "Artifactory Password", + "Basic Auth Credentials", + "Mailchimp Access Key", + "NPM Token", + "Slack Token", + "Slack Webhook", + "Square OAuth Secret", + "Notion Integration Token", + "Airtable API Key", + "Atlassian Oauth2 Keys", + "CircleCI Personal Token", + "Databricks Authentication Token", + "GitHub Token", + "GitLab Token", + "Google API key", + "Grafana Token", + "Python Package Index Key (PYPI)", + "Typeform API Token", + "Scalr Token", + "Braintree Access Token", + "Braintree Payments Key", + "Paypal Token Key", + "Braintree Payments ID", + "Datadog Client Token", + "ClickUp Personal API Token", + "OpenAI API Key", + "Java DB Connectivity (JDBC)", + "MongoDB", + ".Net SQL Server" + ], + "user": "string" + }, + "severity": "string", + "status": "string", + "templates": [ + [ + "PCI", + "HIPAA", + "NIST SP 800-190", + "GDPR", + "DISA STIG" + ] + ], + "text": "string", + "title": "string", + "twistlock": True, + "type": [ + "container", + "image", + "host_config", + "daemon_config", + "daemon_config_files", + "security_operations", + "k8s_master", + "k8s_worker", + "k8s_federation", + "linux", + "windows", + "istio", + "serverless", + "custom", + "docker_stig", + "openshift_master", + "openshift_worker", + "application_control_linux", + "gke_worker", + "image_malware", + "host_malware", + "aks_worker", + "eks_worker", + "image_secret", + "host_secret" + ], + "vecStr": "string", + "vulnTagInfos": [ + { + "color": "string", + "comment": "string", + "name": "string" + } + ], + "wildfireMalware": { + "md5": "string", + "path": "string", + "verdict": "string" + } + } + ] + } + ], + "id": "string", + "labels": {}, + "layers": [ + "string" + ], + "os": "string", + "repoDigest": [ + "string" + ], + "repoTags": [ + "string" + ], + "user": "string", + "workingDir": "string" + }, + "installedProducts": { + "agentless": True, + "apache": "string", + "awsCloud": True, + "clusterType": [ + "AKS", + "ECS", + "EKS", + "GKE", + "Kubernetes" + ], + "crio": True, + "docker": "string", + "dockerEnterprise": True, + "hasPackageManager": True, + "k8sApiServer": True, + "k8sControllerManager": True, + "k8sEtcd": True, + "k8sFederationApiServer": True, + "k8sFederationControllerManager": True, + "k8sKubelet": True, + "k8sProxy": True, + "k8sScheduler": True, + "kubernetes": "string", + "managedClusterVersion": "string", + "openshift": True, + "openshiftVersion": "string", + "osDistro": "string", + "serverless": True, + "swarmManager": True, + "swarmNode": True + }, + "instances": [ + { + "host": "string", + "image": "string", + "modified": "2024-07-29T15:51:28.071Z", + "registry": "string", + "repo": "string", + "tag": "string" + } + ], + "isARM64": True, + "k8sClusterAddr": "string", + "labels": [ + "string" + ], + "layers": [ + "string" + ], + "malwareAnalyzedTime": "2024-07-29T15:51:28.071Z", + "missingDistroVulnCoverage": True, + "namespaces": [ + "string" + ], + "osDistro": "string", + "osDistroRelease": "string", + "osDistroVersion": "string", + "packageManager": True, + "packages": [ + { + "pkgs": [ + { + "author": "string", + "binaryIdx": [ + 0 + ], + "binaryPkgs": [ + "string" + ], + "cveCount": 0, + "defaultGem": True, + "files": [ + { + "md5": "string", + "original_file_location": "string", + "path": "string", + "sha1": "string", + "sha256": "string" + } + ], + "functionLayer": "string", + "goPkg": True, + "jarIdentifier": "string", + "layerTime": 0, + "license": "string", + "name": "string", + "originPackageName": "string", + "osPackage": True, + "path": "string", + "purl": "string", + "securityRepoPkg": True, + "symbols": [ + "string" + ], + "version": "string" + } + ], + "pkgsType": [ + "nodejs", + "gem", + "python", + "jar", + "package", + "windows", + "binary", + "nuget", + "go", + "app", + "unknown" + ] + } + ], + "pullDuration": 0, + "pushTime": "2024-07-29T15:51:28.071Z", + "redHatNonRPMImage": True, + "registryNamespace": "string", + "registryTags": [ + "string" + ], + "registryType": "string", + "repoDigests": [ + "string" + ], + "repoTag": { + "digest": "string", + "id": "string", + "registry": "string", + "repo": "string", + "tag": "string" + }, + "rhelRepos": [ + "string" + ], + "riskFactors": {}, + "scanBuildDate": "string", + "scanDuration": 0, + "scanID": 0, + "scanTime": "2024-07-29T15:51:28.071Z", + "scanVersion": "string", + "secretScanMetrics": { + "failedScans": 0, + "foundSecrets": 0, + "scanTime": 0, + "scanTimeouts": 0, + "scannedFileSize": 0, + "scannedFiles": 0, + "totalBytes": 0, + "totalFiles": 0, + "totalTime": 0, + "typesCount": {} + }, + "startupBinaries": [ + { + "altered": True, + "cveCount": 0, + "deps": [ + "string" + ], + "fileMode": 0, + "functionLayer": "string", + "md5": "string", + "missingPkg": True, + "name": "string", + "path": "string", + "pkgRootDir": "string", + "services": [ + "string" + ], + "version": "string" + } + ], + "stopped": True, + "tags": [ + { + "digest": "string", + "id": "string", + "registry": "string", + "repo": "string", + "tag": "string" + } + ], + "topLayer": "string", + "trustResult": { + "groups": [ + { + "_id": "string", + "disabled": True, + "images": [ + "string" + ], + "layers": [ + "string" + ], + "modified": "2024-07-29T15:51:28.071Z", + "name": "string", + "notes": "string", + "owner": "string", + "previousName": "string" + } + ], + "hostsStatuses": [ + { + "host": "string", + "status": [ + "trusted", + "untrusted" + ] + } + ] + }, + "trustStatus": [ + "trusted", + "untrusted" + ], + "twistlockImage": True, + "type": [ + "image", + "ciImage", + "container", + "host", + "agentlessHost", + "registry", + "serverlessScan", + "ciServerless", + "vm", + "tas", + "ciTas", + "cloudDiscovery", + "serverlessRadar", + "serverlessAutoDeploy", + "hostAutoDeploy", + "codeRepo", + "ciCodeRepo" + ], + "underlyingDistro": "string", + "underlyingDistroRelease": "string", + "vulnerabilities": [ + { + "applicableRules": [ + "string" + ], + "binaryPkgs": [ + "string" + ], + "block": True, + "cause": "string", + "cri": True, + "custom": True, + "cve": "string", + "cvss": 0, + "description": "string", + "discovered": "2024-07-29T15:51:28.071Z", + "exploit": [ + "", + "exploit-db", + "exploit-windows", + "cisa-kev" + ], + "exploits": [ + { + "kind": [ + "poc", + "in-the-wild" + ], + "link": "string", + "source": [ + "", + "exploit-db", + "exploit-windows", + "cisa-kev" + ] + } + ], + "fixDate": 0, + "fixLink": "string", + "functionLayer": "string", + "gracePeriodDays": 0, + "id": 0, + "layerTime": 0, + "link": "string", + "packageName": "string", + "packageType": [ + "nodejs", + "gem", + "python", + "jar", + "package", + "windows", + "binary", + "nuget", + "go", + "app", + "unknown" + ], + "packageVersion": "string", + "published": 0, + "riskFactors": {}, + "secret": { + "group": "string", + "locationInFile": "string", + "metadataModifiedTime": 0, + "modifiedTime": 0, + "originalFileLocation": "string", + "path": "string", + "permissions": "string", + "secretID": "string", + "size": 0, + "snippet": "string", + "type": [ + "AWS Access Key ID", + "AWS Secret Key", + "AWS MWS Auth Token", + "Azure Storage Account Access Key", + "Azure Service Principal", + "GCP Service Account Auth Key", + "Private Encryption Key", + "Public Encryption Key", + "PEM X509 Certificate Header", + "SSH Authorized Keys", + "Artifactory API Token", + "Artifactory Password", + "Basic Auth Credentials", + "Mailchimp Access Key", + "NPM Token", + "Slack Token", + "Slack Webhook", + "Square OAuth Secret", + "Notion Integration Token", + "Airtable API Key", + "Atlassian Oauth2 Keys", + "CircleCI Personal Token", + "Databricks Authentication Token", + "GitHub Token", + "GitLab Token", + "Google API key", + "Grafana Token", + "Python Package Index Key (PYPI)", + "Typeform API Token", + "Scalr Token", + "Braintree Access Token", + "Braintree Payments Key", + "Paypal Token Key", + "Braintree Payments ID", + "Datadog Client Token", + "ClickUp Personal API Token", + "OpenAI API Key", + "Java DB Connectivity (JDBC)", + "MongoDB", + ".Net SQL Server" + ], + "user": "string" + }, + "severity": "string", + "status": "string", + "templates": [ + [ + "PCI", + "HIPAA", + "NIST SP 800-190", + "GDPR", + "DISA STIG" + ] + ], + "text": "string", + "title": "string", + "twistlock": True, + "type": [ + "container", + "image", + "host_config", + "daemon_config", + "daemon_config_files", + "security_operations", + "k8s_master", + "k8s_worker", + "k8s_federation", + "linux", + "windows", + "istio", + "serverless", + "custom", + "docker_stig", + "openshift_master", + "openshift_worker", + "application_control_linux", + "gke_worker", + "image_malware", + "host_malware", + "aks_worker", + "eks_worker", + "image_secret", + "host_secret" + ], + "vecStr": "string", + "vulnTagInfos": [ + { + "color": "string", + "comment": "string", + "name": "string" + } + ], + "wildfireMalware": { + "md5": "string", + "path": "string", + "verdict": "string" + } + } + ], + "vulnerabilitiesCount": 0, + "vulnerabilityDistribution": { + "critical": 0, + "high": 0, + "low": 0, + "medium": 0, + "total": 0 + }, + "vulnerabilityRiskScore": 0, + "wildFireUsage": { + "bytes": 0, + "queries": 0, + "uploads": 0 + } +} diff --git a/tests/test_cwpp_cwpp.py b/tests/test_cwpp_cwpp.py new file mode 100644 index 0000000..9ce9a92 --- /dev/null +++ b/tests/test_cwpp_cwpp.py @@ -0,0 +1,265 @@ +import unittest +from unittest import mock +import json +from prismacloud.api.pc_lib_api import PrismaCloudAPI +from tests.data import SETTINGS, META_INFO, CREDENTIALS, ONE_HOST + +import responses +from responses import registries, matchers +from requests import exceptions + + +class TestCasePrismaCloudAPICWPPMixin(unittest.TestCase): + + @responses.activate + def setUp(self): + responses.post( + "https://example.prismacloud.io/login", + body=json.dumps({"token": "token"}), + status=200, + ) + responses.get( + "https://example.prismacloud.io/meta_info", + body=json.dumps(META_INFO), + status=200, + ) + self.pc_api = PrismaCloudAPI() + self.pc_api.configure(SETTINGS) + + @responses.activate + def test_execute_compute_nominal_for_credentials_list(self): + get_creds = responses.get( + "https://example.prismacloud.io/api/v1/credentials", + body=json.dumps(CREDENTIALS), + status=200 + ) + crendtials_list = self.pc_api.execute_compute( + 'GET', 'api/v1/credentials') + self.assertIsInstance(crendtials_list, list) + self.assertEqual(len(crendtials_list), 1) + self.assertEqual(get_creds.call_count, 1) + + @responses.activate + def test_execute_compute_without_token_for_credentials_list(self): + self.pc_api.token = None + responses.post( + "https://example.prismacloud.io/login", + body=json.dumps({"token": "token"}), + status=200 + ) + get_creds = responses.get( + "https://example.prismacloud.io/api/v1/credentials", + body=json.dumps(CREDENTIALS), + status=200 + ) + crendtials_list = self.pc_api.execute_compute( + 'GET', + 'api/v1/credentials', + ) + self.assertIsInstance(crendtials_list, list) + self.assertEqual(len(crendtials_list), 1) + self.assertEqual(get_creds.call_count, 1) + + @responses.activate + def test_execute_compute_with_an_expire_token_for_credentials_list(self): + self.pc_api.token_timer = 0.0 + responses.post( + "https://example.prismacloud.io/login", + body=json.dumps({"token": "token"}), + status=200 + ) + get_creds = responses.get( + "https://example.prismacloud.io/api/v1/credentials", + body=json.dumps(CREDENTIALS), + status=200 + ) + crendtials_list = self.pc_api.execute_compute( + 'GET', + 'api/v1/credentials', + ) + self.assertIsInstance(crendtials_list, list) + self.assertEqual(len(crendtials_list), 1) + self.assertEqual(get_creds.call_count, 1) + + @responses.activate(registry=registries.OrderedRegistry) + def test_execute_compute_retry_for_credentials_list(self): + get_creds_1 = responses.get( + 'https://example.prismacloud.io/api/v1/credentials', + body=json.dumps({}), + status=500, + ) + get_creds_2 = responses.get( + 'https://example.prismacloud.io/api/v1/credentials', + body=json.dumps(CREDENTIALS), + status=200, + ) + + crendtials_list = self.pc_api.execute_compute( + 'GET', + 'api/v1/credentials', + ) + self.assertIsInstance(crendtials_list, list) + self.assertEqual(len(crendtials_list), 1) + self.assertEqual(get_creds_1.call_count, 1) + self.assertEqual(get_creds_2.call_count, 1) + + @responses.activate(registry=registries.OrderedRegistry) + def test_execute_compute_retry_failed_for_credentials_list(self): + self.pc_api.retry_waits = [1, 1] + get_creds_responses = [ + responses.get( + 'https://example.prismacloud.io/api/v1/credentials', + body=json.dumps({}), + status=500, + ) + for _ in range(3) + ] + with self.assertRaises(SystemExit) as context: + self.pc_api.execute_compute( + 'GET', + 'api/v1/credentials' + ) + self.assertEqual( + str(context.exception), + """ + +Status Code: 500 +API: (https://example.prismacloud.io/api/v1/credentials) with query params: (None) and body params: (None) responded with an error and this response: +{} + +""" + ) + for get_cred in get_creds_responses: + self.assertEqual(get_cred.call_count, 1) + + @responses.activate(registry=registries.OrderedRegistry) + def test_execute_compute_nominal_for_hosts_list(self): + responses.get( + "https://example.prismacloud.io/api/v1/hosts", + body=json.dumps([json.dumps(ONE_HOST) for _ in range(0, 50)]), + status=200, + headers={"Total-Count": "52"} + ) + responses.get( + "https://example.prismacloud.io/api/v1/hosts", + body=json.dumps([json.dumps(ONE_HOST) for _ in range(0, 2)]), + status=200, + headers={"Total-Count": "52"} + ) + hosts = self.pc_api.execute_compute( + 'GET', + 'api/v1/hosts', + paginated=True, + ) + self.assertEqual(len(hosts), 52) + + @responses.activate(registry=registries.OrderedRegistry) + def test_execute_compute_failed_all_hosts_list_because_unexpected_status_code_using_force_parameter(self): + responses.get( + "https://example.prismacloud.io/api/v1/hosts", + body=json.dumps([json.dumps(ONE_HOST) for _ in range(0, 50)]), + status=200, + headers={"Total-Count": "52"} + ) + responses.get( + "https://example.prismacloud.io/api/v1/hosts", + body=json.dumps([json.dumps(ONE_HOST) for _ in range(0, 2)]), + status=404, + headers={"Total-Count": "52"} + ) + hosts = self.pc_api.execute_compute( + 'GET', + 'api/v1/hosts', + paginated=True, + force=True + ) + self.assertEqual(len(hosts), 50) + + @responses.activate(registry=registries.OrderedRegistry) + def test_execute_compute_failed_all_hosts_list_because_unexpected_status_code_without_force_parameter(self): + responses.get( + "https://example.prismacloud.io/api/v1/hosts", + body=json.dumps([json.dumps(ONE_HOST) for _ in range(0, 50)]), + status=200, + headers={"Total-Count": "52"} + ) + responses.get( + "https://example.prismacloud.io/api/v1/hosts", + body=json.dumps({}), + status=404, + headers={"Total-Count": "52"} + ) + with self.assertRaises(SystemExit) as context: + hosts = self.pc_api.execute_compute( + 'GET', + 'api/v1/hosts', + paginated=True + ) + self.assertEqual(str(context.exception), """ + +Status Code: 404 +API: (https://example.prismacloud.io/api/v1/hosts?limit=50&offset=50) with query params: (None) and body params: (None) responded with an error and this response: +{} + +""") + + @responses.activate(registry=registries.OrderedRegistry) + def test_excute_compute_nominal_for_tag_update(self): + responses.put( + "https://example.prismacloud.io/api/v1/tags/my_tag", + status=200, + match=[ + matchers.json_params_matcher({ + "name": "tag_name", + "color": "#ff0000", + "description": "A super cool tag", + }) + ] + ) + response = self.pc_api.execute_compute( + 'PUT', + 'api/v1/tags/my_tag', + body_params={ + "name": "tag_name", + "color": "#ff0000", + "description": "A super cool tag", + }, + ) + self.assertIsNone(response) + + @responses.activate + def test_execute_compute_failed_because_bad_status_code(self): + get_creds = responses.get( + "https://example.prismacloud.io/api/v1/credentials", + body=json.dumps(CREDENTIALS), + status=404 + ) + with self.assertRaises(SystemExit) as context: + self.pc_api.execute_compute( + 'GET', + 'api/v1/credentials', + ) + self.assertEqual( + str(context.exception), + f""" + +Status Code: 404 +API: (https://example.prismacloud.io/api/v1/credentials) with query params: (None) and body params: (None) responded with an error and this response: +{json.dumps(CREDENTIALS)} + +""" + ) + self.assertEqual(get_creds.call_count, 1) + + @responses.activate + def test_execute_compute_with_csv_content_type(self): + discovery = responses.get( + "https://example.prismacloud.io/api/v1/cloud/discovery/download", + body="discovery_download", + status=200, + content_type="text/csv", + ) + download = self.pc_api.execute_compute( + "GET", "api/v1/cloud/discovery/download") + self.assertEqual(discovery.call_count, 1) + self.assertEqual(download, "discovery_download") From de693163a623cfd8d7013d51149d4de3e29adc29 Mon Sep 17 00:00:00 2001 From: Marian GAPPA Date: Mon, 6 Jan 2025 07:57:56 +0000 Subject: [PATCH 3/6] refactor(cwpp): Modify manual retry by using Retry urllib class * Retry class permit to retry requests and to use requests session * Add retry_number property on PrismaCloudAPI used by Retry class * Update test_execute_compute_retry_failed_for_credentials_list test to use retry_number instead of retry_wait --- prismacloud/api/cwpp/cwpp.py | 144 +++++++++++++++++++--------------- prismacloud/api/pc_lib_api.py | 1 + tests/test_cwpp_cwpp.py | 2 +- 3 files changed, 82 insertions(+), 65 deletions(-) diff --git a/prismacloud/api/cwpp/cwpp.py b/prismacloud/api/cwpp/cwpp.py index cb0e750..49f4e75 100644 --- a/prismacloud/api/cwpp/cwpp.py +++ b/prismacloud/api/cwpp/cwpp.py @@ -4,6 +4,8 @@ import time import requests +from requests.adapters import HTTPAdapter, Retry + class PrismaCloudAPICWPPMixin(): """ Requests and Output """ @@ -16,7 +18,8 @@ def login_compute(self): # Login via CWP. self.login('https://%s/api/v1/authenticate' % self.api_compute) else: - self.error_and_exit(418, "Specify a Prisma Cloud URL or Prisma Cloud Compute URL") + self.error_and_exit( + 418, "Specify a Prisma Cloud URL or Prisma Cloud Compute URL") self.debug_print('New API Token: %s' % self.token) def extend_login_compute(self): @@ -43,78 +46,91 @@ def execute_compute(self, action, endpoint, query_params=None, body_params=None, limit = 50 more = False results = [] - while offset == 0 or more is True: - if int(time.time() - self.token_timer) > self.token_limit: - self.extend_login_compute() - if paginated: - url = 'https://%s/%s?limit=%s&offset=%s' % (self.api_compute, endpoint, limit, offset) - else: - url = 'https://%s/%s' % (self.api_compute, endpoint) - if self.token: - if self.api: - # Authenticate via CSPM - request_headers['x-redlock-auth'] = self.token + + retries = Retry(total=self.retry_number, + status_forcelist=self.retry_status_codes, raise_on_status=False) + + with requests.Session() as session: + session.mount('https://%s/%s' % (self.api_compute, + endpoint), adapter=HTTPAdapter(max_retries=retries)) + + while offset == 0 or more is True: + if int(time.time() - self.token_timer) > self.token_limit: + self.extend_login_compute() + if paginated: + url = 'https://%s/%s?limit=%s&offset=%s' % ( + self.api_compute, endpoint, limit, offset) else: - # Authenticate via CWP - request_headers['Authorization'] = "Bearer %s" % self.token - self.debug_print('API URL: %s' % url) - self.debug_print('API Request Headers: (%s)' % request_headers) - self.debug_print('API Query Params: %s' % query_params) - self.debug_print('API Body Params: %s' % body_params_json) - # Add User-Agent to the headers - request_headers['User-Agent'] = self.user_agent - api_response = requests.request(action, url, headers=request_headers, params=query_params, data=body_params_json, verify=self.verify, timeout=self.timeout) - self.debug_print('API Response Status Code: (%s)' % api_response.status_code) - self.debug_print('API Response Headers: (%s)' % api_response.headers) - if api_response.status_code in self.retry_status_codes: - for exponential_wait in self.retry_waits: - time.sleep(exponential_wait) - api_response = requests.request(action, url, headers=request_headers, params=query_params, data=body_params_json, verify=self.verify, timeout=self.timeout) - if api_response.ok: - break # retry loop - if api_response.ok: - if not api_response.content: - return None - if api_response.headers.get('Content-Type') == 'application/x-gzip': - return api_response.content - if api_response.headers.get('Content-Type') == 'text/csv': - return api_response.content.decode('utf-8') - try: - result = json.loads(api_response.content) - #if result is None: - # self.logger.error('JSON returned None, API: (%s) with query params: (%s) and body params: (%s) parsing response: (%s)' % (url, query_params, body_params, api_response.content)) - # if force: - # return results # or continue - # self.error_and_exit(api_response.status_code, 'JSON returned None, API: (%s) with query params: (%s) and body params: (%s) parsing response: (%s)' % (url, query_params, body_params, api_response.content)) - except ValueError: - self.logger.error('JSON raised ValueError, API: (%s) with query params: (%s) and body params: (%s) parsing response: (%s)' % (url, query_params, body_params, api_response.content)) - if force: - return results # or continue - self.error_and_exit(api_response.status_code, 'JSON raised ValueError, API: (%s) with query params: (%s) and body params: (%s) parsing response: (%s)' % (url, query_params, body_params, api_response.content)) - if 'Total-Count' in api_response.headers: - self.debug_print('Retrieving Next Page of Results: Offset/Total Count: %s/%s' % (offset, api_response.headers['Total-Count'])) - total_count = int(api_response.headers['Total-Count']) - if total_count > 0: - results.extend(result) - offset += limit - more = bool(offset < total_count) + url = 'https://%s/%s' % (self.api_compute, endpoint) + if self.token: + if self.api: + # Authenticate via CSPM + request_headers['x-redlock-auth'] = self.token + else: + # Authenticate via CWP + request_headers['Authorization'] = "Bearer %s" % self.token + self.debug_print('API URL: %s' % url) + self.debug_print('API Request Headers: (%s)' % request_headers) + self.debug_print('API Query Params: %s' % query_params) + self.debug_print('API Body Params: %s' % body_params_json) + # Add User-Agent to the headers + request_headers['User-Agent'] = self.user_agent + api_response = session.request(action, url, headers=request_headers, params=query_params, + data=body_params_json, verify=self.verify, timeout=self.timeout) + self.debug_print('API Response Status Code: (%s)' % + api_response.status_code) + self.debug_print('API Response Headers: (%s)' % + api_response.headers) + if api_response.ok: + if not api_response.content: + return None + if api_response.headers.get('Content-Type') == 'application/x-gzip': + return api_response.content + if api_response.headers.get('Content-Type') == 'text/csv': + return api_response.content.decode('utf-8') + try: + result = json.loads(api_response.content) + # if result is None: + # self.logger.error('JSON returned None, API: (%s) with query params: (%s) and body params: (%s) parsing response: (%s)' % (url, query_params, body_params, api_response.content)) + # if force: + # return results # or continue + # self.error_and_exit(api_response.status_code, 'JSON returned None, API: (%s) with query params: (%s) and body params: (%s) parsing response: (%s)' % (url, query_params, body_params, api_response.content)) + except ValueError: + self.logger.error('JSON raised ValueError, API: (%s) with query params: (%s) and body params: (%s) parsing response: (%s)' % ( + url, query_params, body_params, api_response.content)) + if force: + return results # or continue + self.error_and_exit(api_response.status_code, 'JSON raised ValueError, API: (%s) with query params: (%s) and body params: (%s) parsing response: (%s)' % ( + url, query_params, body_params, api_response.content)) + if 'Total-Count' in api_response.headers: + self.debug_print('Retrieving Next Page of Results: Offset/Total Count: %s/%s' % ( + offset, api_response.headers['Total-Count'])) + total_count = int(api_response.headers['Total-Count']) + if total_count > 0: + results.extend(result) + offset += limit + more = bool(offset < total_count) + else: + return result else: - return result - else: - self.logger.error('API: (%s) responded with a status of: (%s), with query: (%s) and body params: (%s)' % (url, api_response.status_code, query_params, body_params)) - if force: - return results - self.error_and_exit(api_response.status_code, 'API: (%s) with query params: (%s) and body params: (%s) responded with an error and this response:\n%s' % (url, query_params, body_params, api_response.text)) - return results + self.logger.error('API: (%s) responded with a status of: (%s), with query: (%s) and body params: (%s)' % ( + url, api_response.status_code, query_params, body_params)) + if force: + return results + self.error_and_exit(api_response.status_code, 'API: (%s) with query params: (%s) and body params: (%s) responded with an error and this response:\n%s' % ( + url, query_params, body_params, api_response.text)) + return results # The Compute API setting is optional. def validate_api_compute(self): if not self.api_compute: - self.error_and_exit(500, 'Please specify a Prisma Cloud Compute API URL.') + self.error_and_exit( + 500, 'Please specify a Prisma Cloud Compute API URL.') # Exit handler (Error). @classmethod def error_and_exit(cls, error_code, error_message='', system_message=''): - raise SystemExit('\n\nStatus Code: %s\n%s\n%s\n' % (error_code, error_message, system_message)) + raise SystemExit('\n\nStatus Code: %s\n%s\n%s\n' % + (error_code, error_message, system_message)) diff --git a/prismacloud/api/pc_lib_api.py b/prismacloud/api/pc_lib_api.py index 01289f5..511569b 100644 --- a/prismacloud/api/pc_lib_api.py +++ b/prismacloud/api/pc_lib_api.py @@ -43,6 +43,7 @@ def __init__(self): self.token_limit = 590 # aka 9 minutes self.retry_status_codes = [425, 429, 500, 502, 503, 504] self.retry_waits = [1, 2, 4, 8, 16, 32] + self.retry_number = 6 self.max_workers = 8 # self.error_log = 'error.log' diff --git a/tests/test_cwpp_cwpp.py b/tests/test_cwpp_cwpp.py index 9ce9a92..956a91d 100644 --- a/tests/test_cwpp_cwpp.py +++ b/tests/test_cwpp_cwpp.py @@ -105,7 +105,7 @@ def test_execute_compute_retry_for_credentials_list(self): @responses.activate(registry=registries.OrderedRegistry) def test_execute_compute_retry_failed_for_credentials_list(self): - self.pc_api.retry_waits = [1, 1] + self.pc_api.retry_number = 2 get_creds_responses = [ responses.get( 'https://example.prismacloud.io/api/v1/credentials', From 671f1819873c993ef82dfafd7ecce1d1c216cf67 Mon Sep 17 00:00:00 2001 From: Marian GAPPA Date: Mon, 6 Jan 2025 08:29:24 +0000 Subject: [PATCH 4/6] chore: Add extra requirements for test purposes --- pyproject.toml | 5 ++++- setup.py | 3 +++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 7fd26b9..8580373 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,6 @@ [build-system] requires = ["setuptools"] -build-backend = "setuptools.build_meta" \ No newline at end of file +build-backend = "setuptools.build_meta" + +[project.optional-dependencies] +test = ["coverage==7.6.10", "responses==0.25.3"] diff --git a/setup.py b/setup.py index 1218eb8..fbf4164 100644 --- a/setup.py +++ b/setup.py @@ -34,5 +34,8 @@ 'requests', 'update_checker' ], + extras_require={ + 'test': ['coverage==7.6.10', 'responses==0.25.3'] + }, python_requires='>=3.6' ) From def4390a182c01751eca779bb072f2051410d8f8 Mon Sep 17 00:00:00 2001 From: Marian GAPPA Date: Mon, 6 Jan 2025 09:59:32 +0000 Subject: [PATCH 5/6] refactor(tests): Remove unused and oranized import --- tests/test_cwpp_cwpp.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/test_cwpp_cwpp.py b/tests/test_cwpp_cwpp.py index 956a91d..d4bc5a8 100644 --- a/tests/test_cwpp_cwpp.py +++ b/tests/test_cwpp_cwpp.py @@ -1,12 +1,10 @@ import unittest -from unittest import mock import json -from prismacloud.api.pc_lib_api import PrismaCloudAPI -from tests.data import SETTINGS, META_INFO, CREDENTIALS, ONE_HOST import responses from responses import registries, matchers -from requests import exceptions +from prismacloud.api.pc_lib_api import PrismaCloudAPI +from tests.data import SETTINGS, META_INFO, CREDENTIALS, ONE_HOST class TestCasePrismaCloudAPICWPPMixin(unittest.TestCase): From 7bc85783e7bb3ca6542aeffcaefc79b1dd031e4b Mon Sep 17 00:00:00 2001 From: Marian GAPPA Date: Mon, 6 Jan 2025 10:33:02 +0000 Subject: [PATCH 6/6] chore: Add documentation on tests and add some assert on cwpp execute method --- tests/test_cwpp_cwpp.py | 63 +++++++++++++++++++++++++++++++++++------ 1 file changed, 54 insertions(+), 9 deletions(-) diff --git a/tests/test_cwpp_cwpp.py b/tests/test_cwpp_cwpp.py index d4bc5a8..0ae6c20 100644 --- a/tests/test_cwpp_cwpp.py +++ b/tests/test_cwpp_cwpp.py @@ -1,3 +1,5 @@ +"""Unit test for PrismaCloudAPICWPPMixin class +""" import unittest import json @@ -8,9 +10,12 @@ class TestCasePrismaCloudAPICWPPMixin(unittest.TestCase): - + """Unit test on execute_compute method + """ @responses.activate def setUp(self): + """Setup the login and meta_info route to get a mock PrimaCloudAPI object used on test + """ responses.post( "https://example.prismacloud.io/login", body=json.dumps({"token": "token"}), @@ -26,6 +31,8 @@ def setUp(self): @responses.activate def test_execute_compute_nominal_for_credentials_list(self): + """Nominal test on the mock credentials list route + """ get_creds = responses.get( "https://example.prismacloud.io/api/v1/credentials", body=json.dumps(CREDENTIALS), @@ -39,6 +46,9 @@ def test_execute_compute_nominal_for_credentials_list(self): @responses.activate def test_execute_compute_without_token_for_credentials_list(self): + """Nominal test on the mock credentials list route with missing Prisma token + On this case, we check the login phase + """ self.pc_api.token = None responses.post( "https://example.prismacloud.io/login", @@ -60,6 +70,9 @@ def test_execute_compute_without_token_for_credentials_list(self): @responses.activate def test_execute_compute_with_an_expire_token_for_credentials_list(self): + """Nominal test on the mock credentials list route with an expired Prisma token + On this case, we check the re-login phase + """ self.pc_api.token_timer = 0.0 responses.post( "https://example.prismacloud.io/login", @@ -81,6 +94,9 @@ def test_execute_compute_with_an_expire_token_for_credentials_list(self): @responses.activate(registry=registries.OrderedRegistry) def test_execute_compute_retry_for_credentials_list(self): + """Nominal test on the mock credentials list route with an 500 error + On the first call we send a 500 error before getting it 200 to test the retry mecanisme + """ get_creds_1 = responses.get( 'https://example.prismacloud.io/api/v1/credentials', body=json.dumps({}), @@ -103,6 +119,9 @@ def test_execute_compute_retry_for_credentials_list(self): @responses.activate(registry=registries.OrderedRegistry) def test_execute_compute_retry_failed_for_credentials_list(self): + """Non expected test on the mock credentials list route with multiple 500 error + We setup for all call a 500 error return and test if the right exception is raise + """ self.pc_api.retry_number = 2 get_creds_responses = [ responses.get( @@ -132,13 +151,16 @@ def test_execute_compute_retry_failed_for_credentials_list(self): @responses.activate(registry=registries.OrderedRegistry) def test_execute_compute_nominal_for_hosts_list(self): - responses.get( + """Norminal test on the mock hosts list route + We expected to get 52 hosts + """ + get_host_1 = responses.get( "https://example.prismacloud.io/api/v1/hosts", body=json.dumps([json.dumps(ONE_HOST) for _ in range(0, 50)]), status=200, headers={"Total-Count": "52"} ) - responses.get( + get_host_2 = responses.get( "https://example.prismacloud.io/api/v1/hosts", body=json.dumps([json.dumps(ONE_HOST) for _ in range(0, 2)]), status=200, @@ -150,16 +172,22 @@ def test_execute_compute_nominal_for_hosts_list(self): paginated=True, ) self.assertEqual(len(hosts), 52) + self.assertEqual(get_host_1.call_count, 1) + self.assertEqual(get_host_2.call_count, 1) @responses.activate(registry=registries.OrderedRegistry) def test_execute_compute_failed_all_hosts_list_because_unexpected_status_code_using_force_parameter(self): - responses.get( + """Non expected test on the mock hosts list route with a 404 HTTP error code + The first batch of hosts is correctly send by the second batch have an expected error code + Since the force flag is set, we expected a 50 hosts list and no raised exception + """ + get_host_1 = responses.get( "https://example.prismacloud.io/api/v1/hosts", body=json.dumps([json.dumps(ONE_HOST) for _ in range(0, 50)]), status=200, headers={"Total-Count": "52"} ) - responses.get( + get_host_2 = responses.get( "https://example.prismacloud.io/api/v1/hosts", body=json.dumps([json.dumps(ONE_HOST) for _ in range(0, 2)]), status=404, @@ -172,27 +200,35 @@ def test_execute_compute_failed_all_hosts_list_because_unexpected_status_code_us force=True ) self.assertEqual(len(hosts), 50) + self.assertEqual(get_host_1.call_count, 1) + self.assertEqual(get_host_2.call_count, 1) @responses.activate(registry=registries.OrderedRegistry) def test_execute_compute_failed_all_hosts_list_because_unexpected_status_code_without_force_parameter(self): - responses.get( + """Non expected test on the mock hosts list route with a 404 HTTP error code + The first batch of hosts is correctly send by the second batch have an expected error code + We must have a raised exception and no host response + """ + get_host_1 = responses.get( "https://example.prismacloud.io/api/v1/hosts", body=json.dumps([json.dumps(ONE_HOST) for _ in range(0, 50)]), status=200, headers={"Total-Count": "52"} ) - responses.get( + get_host_2 = responses.get( "https://example.prismacloud.io/api/v1/hosts", body=json.dumps({}), status=404, headers={"Total-Count": "52"} ) with self.assertRaises(SystemExit) as context: - hosts = self.pc_api.execute_compute( + self.pc_api.execute_compute( 'GET', 'api/v1/hosts', paginated=True ) + self.assertEqual(get_host_1.call_count, 1) + self.assertEqual(get_host_2.call_count, 1) self.assertEqual(str(context.exception), """ Status Code: 404 @@ -203,7 +239,10 @@ def test_execute_compute_failed_all_hosts_list_because_unexpected_status_code_wi @responses.activate(registry=registries.OrderedRegistry) def test_excute_compute_nominal_for_tag_update(self): - responses.put( + """Nominal test on the mock tags put + We expect no response + """ + put_tag = responses.put( "https://example.prismacloud.io/api/v1/tags/my_tag", status=200, match=[ @@ -224,9 +263,13 @@ def test_excute_compute_nominal_for_tag_update(self): }, ) self.assertIsNone(response) + self.assertEqual(put_tag.call_count, 1) @responses.activate def test_execute_compute_failed_because_bad_status_code(self): + """Non expected test on the mock credentials list route with an 404 error + We expect an expection raised + """ get_creds = responses.get( "https://example.prismacloud.io/api/v1/credentials", body=json.dumps(CREDENTIALS), @@ -251,6 +294,8 @@ def test_execute_compute_failed_because_bad_status_code(self): @responses.activate def test_execute_compute_with_csv_content_type(self): + """Nominal test on the mock discovery cloud download file on CSV + """ discovery = responses.get( "https://example.prismacloud.io/api/v1/cloud/discovery/download", body="discovery_download",