diff --git a/.gitignore b/.gitignore index 16c4411..0411262 100644 --- a/.gitignore +++ b/.gitignore @@ -752,3 +752,4 @@ insightvm-python.code-workspace # Explicitly include tests directory (must be at end to override patterns above) !tests/ !tests/** +coverage.json diff --git a/tests/__pycache__/__init__.cpython-312.pyc b/tests/__pycache__/__init__.cpython-312.pyc index 67c6ff7..e498cc7 100644 Binary files a/tests/__pycache__/__init__.cpython-312.pyc and b/tests/__pycache__/__init__.cpython-312.pyc differ diff --git a/tests/__pycache__/conftest.cpython-312-pytest-8.4.2.pyc b/tests/__pycache__/conftest.cpython-312-pytest-8.4.2.pyc index df70786..c577356 100644 Binary files a/tests/__pycache__/conftest.cpython-312-pytest-8.4.2.pyc and b/tests/__pycache__/conftest.cpython-312-pytest-8.4.2.pyc differ diff --git a/tests/__pycache__/test_api_standardization.cpython-312-pytest-8.4.2.pyc b/tests/__pycache__/test_api_standardization.cpython-312-pytest-8.4.2.pyc index 0e3d8e1..af57690 100644 Binary files a/tests/__pycache__/test_api_standardization.cpython-312-pytest-8.4.2.pyc and b/tests/__pycache__/test_api_standardization.cpython-312-pytest-8.4.2.pyc differ diff --git a/tests/__pycache__/test_auth.cpython-312-pytest-8.4.2.pyc b/tests/__pycache__/test_auth.cpython-312-pytest-8.4.2.pyc index 495db44..ecb82a4 100644 Binary files a/tests/__pycache__/test_auth.cpython-312-pytest-8.4.2.pyc and b/tests/__pycache__/test_auth.cpython-312-pytest-8.4.2.pyc differ diff --git a/tests/__pycache__/test_client.cpython-312-pytest-8.4.2.pyc b/tests/__pycache__/test_client.cpython-312-pytest-8.4.2.pyc index 42e8920..9222973 100644 Binary files a/tests/__pycache__/test_client.cpython-312-pytest-8.4.2.pyc and b/tests/__pycache__/test_client.cpython-312-pytest-8.4.2.pyc differ diff --git a/tests/__pycache__/test_config.cpython-312-pytest-8.4.2.pyc b/tests/__pycache__/test_config.cpython-312-pytest-8.4.2.pyc new file mode 100644 index 0000000..bf175a8 Binary files /dev/null and b/tests/__pycache__/test_config.cpython-312-pytest-8.4.2.pyc differ diff --git a/tests/__pycache__/test_ui.cpython-312-pytest-8.4.2.pyc b/tests/__pycache__/test_ui.cpython-312-pytest-8.4.2.pyc new file mode 100644 index 0000000..0e7b728 Binary files /dev/null and b/tests/__pycache__/test_ui.cpython-312-pytest-8.4.2.pyc differ diff --git a/tests/test_config.py b/tests/test_config.py new file mode 100644 index 0000000..ce16af9 --- /dev/null +++ b/tests/test_config.py @@ -0,0 +1,221 @@ +""" +Tests for Rapid7 InsightVM config module. + +Tests the config.py module for persistent configuration management. +""" + +import json +import pytest +from pathlib import Path +from unittest.mock import Mock, patch, mock_open +import tempfile +import shutil + +from rapid7.config import Config, get_config + + +class TestConfig: + """Test Config class functionality.""" + + @pytest.fixture + def temp_config_dir(self): + """Create a temporary config directory for testing.""" + temp_dir = tempfile.mkdtemp() + yield temp_dir + shutil.rmtree(temp_dir, ignore_errors=True) + + def test_init_creates_directories(self, temp_config_dir): + """Test that Config creates necessary directories.""" + config = Config(config_dir=temp_config_dir) + + assert config.config_dir.exists() + assert config.state_dir.exists() + assert config.config_dir == Path(temp_config_dir) + + def test_init_with_default_directory(self): + """Test that Config uses default directory when not specified.""" + with patch('pathlib.Path.mkdir'): + with patch('pathlib.Path.exists', return_value=False): + config = Config() + + expected_dir = Path.home() / '.insightvm' + assert config.config_dir == expected_dir + + def test_default_config_structure(self, temp_config_dir): + """Test that default configuration has expected structure.""" + config = Config(config_dir=temp_config_dir) + + # Check top-level keys + assert 'version' in config.data + assert 'preferences' in config.data + assert 'tools' in config.data + + # Check preferences + prefs = config.data['preferences'] + assert 'confirm_destructive_operations' in prefs + assert 'colored_output' in prefs + assert 'show_progress_bars' in prefs + assert 'verbose' in prefs + + # Check tools + tools = config.data['tools'] + assert 'sonar_queries' in tools + assert 'insight_agent' in tools + assert 'scan_assistant' in tools + + def test_save_config(self, temp_config_dir): + """Test saving configuration to file.""" + config = Config(config_dir=temp_config_dir) + config.data['preferences']['verbose'] = True + + config.save() + + # Verify file was created + assert config.config_file.exists() + + # Verify content + with open(config.config_file, 'r') as f: + saved_data = json.load(f) + + assert saved_data['preferences']['verbose'] is True + + def test_load_existing_config(self, temp_config_dir): + """Test loading existing configuration from file.""" + # Create a config file + config_file = Path(temp_config_dir) / 'config.json' + config_data = { + 'version': '2.0.0', + 'preferences': {'verbose': True}, + 'tools': {} + } + + Path(temp_config_dir).mkdir(parents=True, exist_ok=True) + with open(config_file, 'w') as f: + json.dump(config_data, f) + + # Load config + config = Config(config_dir=temp_config_dir) + + assert config.data['preferences']['verbose'] is True + + def test_get_preference(self, temp_config_dir): + """Test getting a preference value.""" + config = Config(config_dir=temp_config_dir) + + value = config.get_preference('colored_output') + assert value is True + + # Test with default value + value = config.get_preference('nonexistent', default=False) + assert value is False + + def test_set_preference(self, temp_config_dir): + """Test setting a preference value.""" + config = Config(config_dir=temp_config_dir) + + config.set_preference('verbose', True) + assert config.data['preferences']['verbose'] is True + + def test_get_tool_config(self, temp_config_dir): + """Test getting tool configuration.""" + config = Config(config_dir=temp_config_dir) + + sonar_config = config.get_tool_config('sonar_queries') + assert sonar_config is not None + assert 'default_days' in sonar_config + assert sonar_config['default_days'] == 30 + + def test_set_tool_config(self, temp_config_dir): + """Test setting tool configuration.""" + config = Config(config_dir=temp_config_dir) + + new_config = {'custom_setting': 'value'} + config.set_tool_config('custom_tool', new_config) + + assert 'custom_tool' in config.data['tools'] + assert config.data['tools']['custom_tool']['custom_setting'] == 'value' + + def test_save_state(self, temp_config_dir): + """Test saving tool state.""" + config = Config(config_dir=temp_config_dir) + + state_data = {'progress': 50, 'last_item': 'item_123'} + config.save_state('test_tool', state_data) + + state_file = config.state_dir / 'test_tool_state.json' + assert state_file.exists() + + with open(state_file, 'r') as f: + saved_state = json.load(f) + + assert saved_state['progress'] == 50 + assert saved_state['last_item'] == 'item_123' + + def test_load_state(self, temp_config_dir): + """Test loading tool state.""" + config = Config(config_dir=temp_config_dir) + + # Save state first + state_data = {'progress': 75} + config.save_state('test_tool', state_data) + + # Load state + loaded_state = config.load_state('test_tool') + + assert loaded_state is not None + assert loaded_state['progress'] == 75 + + def test_load_state_nonexistent(self, temp_config_dir): + """Test loading state that doesn't exist.""" + config = Config(config_dir=temp_config_dir) + + state = config.load_state('nonexistent_tool') + assert state is None + + def test_clear_state(self, temp_config_dir): + """Test clearing tool state.""" + config = Config(config_dir=temp_config_dir) + + # Save state + config.save_state('test_tool', {'data': 'test'}) + state_file = config.state_dir / 'test_tool_state.json' + assert state_file.exists() + + # Clear state + config.clear_state('test_tool') + assert not state_file.exists() + + def test_load_config_with_json_error(self, temp_config_dir): + """Test handling of corrupted JSON config file.""" + config_file = Path(temp_config_dir) / 'config.json' + Path(temp_config_dir).mkdir(parents=True, exist_ok=True) + + # Write invalid JSON + with open(config_file, 'w') as f: + f.write('{ invalid json }') + + # Should fall back to default config + config = Config(config_dir=temp_config_dir) + assert 'version' in config.data + assert config.data['version'] == '2.0.0' + + +class TestGetConfig: + """Test get_config helper function.""" + + def test_get_config_singleton(self): + """Test that get_config returns a singleton instance.""" + config1 = get_config() + config2 = get_config() + + # Should return the same instance + assert config1 is config2 + + def test_get_config_returns_config_instance(self): + """Test that get_config returns a Config instance.""" + config = get_config() + + assert isinstance(config, Config) + assert hasattr(config, 'data') + assert hasattr(config, 'save') + assert hasattr(config, 'get_preference') diff --git a/tests/test_rapid7/__pycache__/__init__.cpython-312.pyc b/tests/test_rapid7/__pycache__/__init__.cpython-312.pyc index cbf260c..b29849a 100644 Binary files a/tests/test_rapid7/__pycache__/__init__.cpython-312.pyc and b/tests/test_rapid7/__pycache__/__init__.cpython-312.pyc differ diff --git a/tests/test_rapid7/__pycache__/test_assets.cpython-312-pytest-8.4.2.pyc b/tests/test_rapid7/__pycache__/test_assets.cpython-312-pytest-8.4.2.pyc index 683a281..425ed7c 100644 Binary files a/tests/test_rapid7/__pycache__/test_assets.cpython-312-pytest-8.4.2.pyc and b/tests/test_rapid7/__pycache__/test_assets.cpython-312-pytest-8.4.2.pyc differ diff --git a/tests/test_rapid7/__pycache__/test_base.cpython-312-pytest-8.4.2.pyc b/tests/test_rapid7/__pycache__/test_base.cpython-312-pytest-8.4.2.pyc index d7ca60b..31bbc41 100644 Binary files a/tests/test_rapid7/__pycache__/test_base.cpython-312-pytest-8.4.2.pyc and b/tests/test_rapid7/__pycache__/test_base.cpython-312-pytest-8.4.2.pyc differ diff --git a/tests/test_rapid7/__pycache__/test_reports.cpython-312-pytest-8.4.2.pyc b/tests/test_rapid7/__pycache__/test_reports.cpython-312-pytest-8.4.2.pyc new file mode 100644 index 0000000..21e6a8f Binary files /dev/null and b/tests/test_rapid7/__pycache__/test_reports.cpython-312-pytest-8.4.2.pyc differ diff --git a/tests/test_rapid7/__pycache__/test_scans.cpython-312-pytest-8.4.2.pyc b/tests/test_rapid7/__pycache__/test_scans.cpython-312-pytest-8.4.2.pyc new file mode 100644 index 0000000..24f4af3 Binary files /dev/null and b/tests/test_rapid7/__pycache__/test_scans.cpython-312-pytest-8.4.2.pyc differ diff --git a/tests/test_rapid7/__pycache__/test_sites.cpython-312-pytest-8.4.2.pyc b/tests/test_rapid7/__pycache__/test_sites.cpython-312-pytest-8.4.2.pyc new file mode 100644 index 0000000..1cfe54d Binary files /dev/null and b/tests/test_rapid7/__pycache__/test_sites.cpython-312-pytest-8.4.2.pyc differ diff --git a/tests/test_rapid7/test_reports.py b/tests/test_rapid7/test_reports.py new file mode 100644 index 0000000..d8ef8b7 --- /dev/null +++ b/tests/test_rapid7/test_reports.py @@ -0,0 +1,337 @@ +""" +Tests for Rapid7 InsightVM Reports API module. + +Tests the reports.py API module for report generation and management. +""" + +import pytest +from unittest.mock import Mock, patch + +from rapid7.api.reports import ReportsAPI + + +class TestReportsAPI: + """Test ReportsAPI functionality.""" + + @pytest.fixture + def mock_auth(self): + """Mock authentication for testing.""" + auth = Mock() + auth.base_url = "https://test.insightvm.example.com:3780" + auth.auth = Mock() + return auth + + @pytest.fixture + def reports_api(self, mock_auth): + """Create ReportsAPI instance for testing.""" + return ReportsAPI(mock_auth) + + def test_init(self, mock_auth): + """Test ReportsAPI initialization.""" + api = ReportsAPI(mock_auth) + assert api.auth == mock_auth + assert hasattr(api, 'MAX_PAGE_SIZE') + assert api.MAX_PAGE_SIZE == 500 + + @patch('rapid7.api.reports.BaseAPI._request') + def test_list_reports(self, mock_request, reports_api): + """Test listing all reports.""" + mock_response = { + "resources": [ + {"id": 1, "name": "Vulnerability Report", "format": "pdf"}, + {"id": 2, "name": "Compliance Report", "format": "html"} + ], + "page": {"number": 0, "size": 10} + } + mock_request.return_value = mock_response + + result = reports_api.list(page=0, size=10) + + assert result == mock_response + assert len(result['resources']) == 2 + mock_request.assert_called_once() + + @patch('rapid7.api.reports.BaseAPI._request') + def test_get_report(self, mock_request, reports_api): + """Test getting a specific report configuration.""" + report_id = 123 + mock_response = { + "id": report_id, + "name": "Monthly Security Report", + "format": "pdf", + "template": "vulnerability-report" + } + mock_request.return_value = mock_response + + result = reports_api.get_report(report_id) + + assert result == mock_response + assert result['id'] == report_id + mock_request.assert_called_once_with('GET', f'reports/{report_id}') + + @patch('rapid7.api.reports.BaseAPI._request') + def test_create_report(self, mock_request, reports_api): + """Test creating a new report configuration.""" + report_config = { + "name": "New Report", + "format": "pdf", + "template": "audit-report", + "scope": {"sites": [1, 2, 3]} + } + mock_response = {"id": 456, **report_config} + mock_request.return_value = mock_response + + result = reports_api.create(report_config) + + assert result == mock_response + assert result['id'] == 456 + call_args = mock_request.call_args + assert call_args[0][0] == 'POST' + assert call_args[1]['json'] == report_config + + @patch('rapid7.api.reports.BaseAPI._request') + def test_update_report(self, mock_request, reports_api): + """Test updating a report configuration.""" + report_id = 123 + updates = {"name": "Updated Report Name"} + mock_response = {"id": report_id, **updates} + mock_request.return_value = mock_response + + result = reports_api.update(report_id, updates) + + assert result == mock_response + call_args = mock_request.call_args + assert call_args[0][0] == 'PUT' + assert f'reports/{report_id}' in call_args[0][1] + + @patch('rapid7.api.reports.BaseAPI._request') + def test_delete_report(self, mock_request, reports_api): + """Test deleting a report configuration.""" + report_id = 123 + mock_request.return_value = {} + + result = reports_api.delete_report(report_id) + + mock_request.assert_called_once_with('DELETE', f'reports/{report_id}') + + @patch('rapid7.api.reports.BaseAPI._request') + def test_generate_report(self, mock_request, reports_api): + """Test generating a report.""" + report_id = 123 + mock_response = {"id": "instance-789"} # API returns dict + mock_request.return_value = mock_response + + result = reports_api.generate(report_id) + + assert result == "instance-789" # Method extracts and converts to string + call_args = mock_request.call_args + assert call_args[0][0] == 'POST' + assert f'reports/{report_id}/generate' in call_args[0][1] + + @patch('rapid7.api.reports.BaseAPI._request') + def test_get_report_instance(self, mock_request, reports_api): + """Test getting a report instance.""" + report_id = 123 + instance_id = "instance-789" + mock_response = { + "id": instance_id, + "status": "complete", + "uri": "/reports/123/history/instance-789" + } + mock_request.return_value = mock_response + + result = reports_api.get_instance(report_id, instance_id) + + assert result == mock_response + assert result['status'] == "complete" + + @patch('rapid7.api.reports.BaseAPI._request') + def test_download_report(self, mock_request, reports_api): + """Test downloading a report.""" + report_id = 123 + instance_id = "instance-789" + mock_content = b"PDF report content" + + mock_response = Mock() + mock_response.content = mock_content + mock_request.return_value = mock_response + + result = reports_api.download(report_id, instance_id) + + assert result == mock_content # Method returns response.content (bytes) + call_args = mock_request.call_args + # Verify return_raw=True for binary content + assert call_args[1].get('return_raw') is True + + @patch('rapid7.api.reports.BaseAPI._request') + def test_get_templates(self, mock_request, reports_api): + """Test listing available report templates.""" + mock_response = { + "resources": [ + {"id": "audit-report", "name": "Audit Report"}, + {"id": "vulnerability-report", "name": "Vulnerability Report"} + ] + } + mock_request.return_value = mock_response + + result = reports_api.get_templates() + + assert result == mock_response + assert len(result['resources']) == 2 + + @patch('rapid7.api.reports.BaseAPI._request') + def test_list_with_pagination(self, mock_request, reports_api): + """Test list respects page size limits.""" + mock_response = {"resources": [], "page": {}} + mock_request.return_value = mock_response + + # Request larger than MAX_PAGE_SIZE + reports_api.list(page=0, size=1000) + + call_args = mock_request.call_args + # Should be capped at MAX_PAGE_SIZE (500) + assert call_args[1]['params']['size'] == 500 + + +class TestReportsAPIHistory: + """Test report history operations.""" + + @pytest.fixture + def reports_api(self, mock_auth): + """Create ReportsAPI instance.""" + return ReportsAPI(mock_auth) + + @patch('rapid7.api.reports.BaseAPI._request') + def test_get_report_history(self, mock_request, reports_api): + """Test getting report generation history.""" + report_id = 123 + mock_response = { + "resources": [ + {"id": "instance-1", "status": "complete", "generated": "2025-01-01"}, + {"id": "instance-2", "status": "complete", "generated": "2025-01-02"} + ] + } + mock_request.return_value = mock_response + + result = reports_api.get_history(report_id) + + assert result == mock_response + assert len(result['resources']) == 2 + call_args = mock_request.call_args + assert f'reports/{report_id}/history' in call_args[0][1] + + +class TestReportsAPIFormats: + """Test report format operations.""" + + @pytest.fixture + def reports_api(self, mock_auth): + """Create ReportsAPI instance.""" + return ReportsAPI(mock_auth) + + @patch('rapid7.api.reports.BaseAPI._request') + def test_get_formats(self, mock_request, reports_api): + """Test getting available report formats.""" + mock_response = { + "resources": [ + {"id": "pdf", "name": "PDF"}, + {"id": "html", "name": "HTML"}, + {"id": "csv", "name": "CSV"} + ] + } + mock_request.return_value = mock_response + + result = reports_api.get_formats() + + assert result == mock_response + assert len(result['resources']) == 3 + + +class TestReportsAPIErrorHandling: + """Test error handling in ReportsAPI.""" + + @pytest.fixture + def reports_api(self, mock_auth): + """Create ReportsAPI instance.""" + return ReportsAPI(mock_auth) + + @patch('rapid7.api.reports.BaseAPI._request') + def test_get_nonexistent_report(self, mock_request, reports_api): + """Test handling of 404 for nonexistent report.""" + import requests + mock_request.side_effect = requests.HTTPError("404 Not Found") + + with pytest.raises(requests.HTTPError): + reports_api.get_report(99999) + + @patch('rapid7.api.reports.BaseAPI._request') + def test_generate_with_invalid_config(self, mock_request, reports_api): + """Test generating report with invalid configuration.""" + import requests + mock_request.side_effect = requests.HTTPError("400 Bad Request") + + with pytest.raises(requests.HTTPError): + reports_api.generate(123) + + +class TestReportsAPIIntegration: + """Test integration scenarios.""" + + @pytest.fixture + def reports_api(self, mock_auth): + """Create ReportsAPI instance.""" + return ReportsAPI(mock_auth) + + @patch('rapid7.api.reports.BaseAPI._request') + def test_report_creation_and_generation(self, mock_request, reports_api): + """Test complete report workflow.""" + # Mock responses for different operations + report_config = {"name": "Test Report", "format": "pdf"} + report_id = 456 + instance_id = "instance-789" + + mock_request.side_effect = [ + {"id": report_id}, # create + {"id": instance_id}, # generate returns dict, method extracts id as string + {"id": instance_id, "status": "complete"}, # get_instance + Mock(content=b"PDF content") # download returns Response with content + ] + + # Create report + created = reports_api.create(report_config) + assert created['id'] == report_id + + # Generate report + instance = reports_api.generate(report_id) + assert instance == instance_id + + # Check status + status = reports_api.get_instance(report_id, instance_id) + assert status['status'] == "complete" + + # Download report + content = reports_api.download(report_id, instance_id) + assert content == b"PDF content" # download returns bytes + + # Verify all calls were made + assert mock_request.call_count == 4 + + @patch('rapid7.api.reports.BaseAPI._request') + def test_report_update_and_delete(self, mock_request, reports_api): + """Test report update and deletion.""" + report_id = 123 + + mock_request.side_effect = [ + {"id": report_id, "name": "Updated Name"}, # update + {} # delete_report + ] + + # Update report + updated = reports_api.update(report_id, {"name": "Updated Name"}) + assert updated['name'] == "Updated Name" + + # Delete report + reports_api.delete_report(report_id) + + # Verify both calls were made + assert mock_request.call_count == 2 diff --git a/tests/test_rapid7/test_scans.py b/tests/test_rapid7/test_scans.py new file mode 100644 index 0000000..0ee6ad1 --- /dev/null +++ b/tests/test_rapid7/test_scans.py @@ -0,0 +1,321 @@ +""" +Tests for Rapid7 InsightVM Scans API module. + +Tests the scans.py API module for scan lifecycle management. +""" + +import pytest +from unittest.mock import Mock, patch + +from rapid7.api.scans import ScansAPI + + +class TestScansAPI: + """Test ScansAPI functionality.""" + + @pytest.fixture + def mock_auth(self): + """Mock authentication for testing.""" + auth = Mock() + auth.base_url = "https://test.insightvm.example.com:3780" + auth.auth = Mock() + return auth + + @pytest.fixture + def scans_api(self, mock_auth): + """Create ScansAPI instance for testing.""" + return ScansAPI(mock_auth) + + def test_init(self, mock_auth): + """Test ScansAPI initialization.""" + api = ScansAPI(mock_auth) + assert api.auth == mock_auth + assert hasattr(api, 'MAX_PAGE_SIZE') + assert api.MAX_PAGE_SIZE == 500 + + @patch('rapid7.api.scans.BaseAPI._request') + def test_list_scans(self, mock_request, scans_api): + """Test listing all scans.""" + mock_response = { + "resources": [ + {"id": 1, "scanName": "Test Scan 1", "status": "running"}, + {"id": 2, "scanName": "Test Scan 2", "status": "finished"} + ], + "page": {"number": 0, "size": 10, "totalResources": 2} + } + mock_request.return_value = mock_response + + result = scans_api.list(page=0, size=10) + + assert result == mock_response + assert len(result['resources']) == 2 + mock_request.assert_called_once() + + @patch('rapid7.api.scans.BaseAPI._request') + def test_list_active_scans(self, mock_request, scans_api): + """Test listing only active scans.""" + mock_response = { + "resources": [{"id": 1, "status": "running"}], + "page": {"number": 0, "size": 10} + } + mock_request.return_value = mock_response + + result = scans_api.list(active=True) + + assert result == mock_response + mock_request.assert_called_once() + # Verify active parameter was passed (as string 'true') + call_args = mock_request.call_args + assert call_args[1]['params']['active'] == 'true' + + @patch('rapid7.api.scans.BaseAPI._request') + def test_get_scan(self, mock_request, scans_api): + """Test getting a specific scan.""" + scan_id = 123 + mock_response = { + "id": scan_id, + "scanName": "Security Audit", + "status": "running", + "progress": 45.5 + } + mock_request.return_value = mock_response + + result = scans_api.get_scan(scan_id) + + assert result == mock_response + assert result['id'] == scan_id + mock_request.assert_called_once_with('GET', f'scans/{scan_id}') + + @patch('rapid7.api.scans.BaseAPI._request') + def test_start_site_scan(self, mock_request, scans_api): + """Test starting a scan for a site.""" + site_id = 456 + mock_response = {"id": 789} # API returns dict with id + mock_request.return_value = mock_response + + result = scans_api.start_site_scan( + site_id=site_id, + scan_name="Test Scan", + scan_template_id="full-audit" + ) + + assert result == 789 # Method extracts the ID + mock_request.assert_called_once() + # Verify it's a POST request to the correct endpoint + call_args = mock_request.call_args + assert call_args[0][0] == 'POST' + assert f'sites/{site_id}/scans' in call_args[0][1] + + @patch('rapid7.api.scans.BaseAPI._request') + def test_start_site_scan_with_hosts(self, mock_request, scans_api): + """Test starting a scan with specific hosts.""" + site_id = 456 + hosts = ["192.168.1.100", "192.168.1.101"] + mock_response = {"id": 789} # API returns dict with id + mock_request.return_value = mock_response + + result = scans_api.start_site_scan( + site_id=site_id, + hosts=hosts + ) + + assert result == 789 # Method extracts the ID + call_args = mock_request.call_args + assert 'hosts' in call_args[1]['json'] + assert call_args[1]['json']['hosts'] == hosts + + @patch('rapid7.api.scans.BaseAPI._request') + def test_stop_scan(self, mock_request, scans_api): + """Test stopping a running scan.""" + scan_id = 123 + mock_request.return_value = {} + + result = scans_api.stop_scan(scan_id) + + mock_request.assert_called_once_with('POST', f'scans/{scan_id}/stop') + + @patch('rapid7.api.scans.BaseAPI._request') + def test_pause_scan(self, mock_request, scans_api): + """Test pausing a running scan.""" + scan_id = 123 + mock_request.return_value = {} + + result = scans_api.pause_scan(scan_id) + + mock_request.assert_called_once_with('POST', f'scans/{scan_id}/pause') + + @patch('rapid7.api.scans.BaseAPI._request') + def test_resume_scan(self, mock_request, scans_api): + """Test resuming a paused scan.""" + scan_id = 123 + mock_request.return_value = {} + + result = scans_api.resume_scan(scan_id) + + mock_request.assert_called_once_with('POST', f'scans/{scan_id}/resume') + + @patch('rapid7.api.scans.ScansAPI.get_scan') + def test_get_scan_summary(self, mock_get_scan, scans_api): + """Test getting scan summary.""" + scan_id = 123 + mock_full_scan = { + "id": scan_id, + "scanName": "Test Scan", + "status": "running", + "assets": 50, + "vulnerabilities": { + "critical": 5, + "severe": 10, + "moderate": 20, + "total": 35 + }, + "duration": "00:30:00" + } + mock_get_scan.return_value = mock_full_scan + + result = scans_api.get_scan_summary(scan_id) + + assert result['status'] == "running" + assert result['assets_scanned'] == 50 + assert result['vulnerabilities']['critical'] == 5 + + @patch('rapid7.api.scans.BaseAPI._request') + def test_list_with_pagination(self, mock_request, scans_api): + """Test list respects page size limits.""" + mock_response = {"resources": [], "page": {}} + mock_request.return_value = mock_response + + # Request larger than MAX_PAGE_SIZE + scans_api.list(page=0, size=1000) + + call_args = mock_request.call_args + # Should be capped at MAX_PAGE_SIZE (500) + assert call_args[1]['params']['size'] == 500 + + @patch('rapid7.api.scans.BaseAPI._request') + def test_list_with_sort(self, mock_request, scans_api): + """Test list with sort parameters.""" + mock_response = {"resources": [], "page": {}} + mock_request.return_value = mock_response + + sort_params = ["startTime,DESC", "scanName,ASC"] + scans_api.list(sort=sort_params) + + call_args = mock_request.call_args + assert call_args[1]['params']['sort'] == sort_params + + +class TestScansAPIUtilityMethods: + """Test utility methods.""" + + @pytest.fixture + def scans_api(self, mock_auth): + """Create ScansAPI instance.""" + return ScansAPI(mock_auth) + + @patch('rapid7.api.scans.ScansAPI.list') + def test_get_active_scans(self, mock_list, scans_api): + """Test getting active scans.""" + mock_response = [ + {"id": 1, "status": "running"}, + {"id": 2, "status": "running"} + ] + mock_list.return_value = {"resources": mock_response} + + result = scans_api.get_active_scans() + + assert len(result) == 2 + assert all(scan['status'] == 'running' for scan in result) + + +class TestScansAPIScanHistory: + """Test scan history operations.""" + + @pytest.fixture + def scans_api(self, mock_auth): + """Create ScansAPI instance.""" + return ScansAPI(mock_auth) + + @patch('rapid7.api.scans.BaseAPI._request') + def test_get_site_scans(self, mock_request, scans_api): + """Test getting scans for a specific site.""" + site_id = 123 + mock_response = { + "resources": [ + {"id": 1, "scanName": "Scan 1"}, + {"id": 2, "scanName": "Scan 2"} + ] + } + mock_request.return_value = mock_response + + result = scans_api.get_site_scans(site_id) + + assert result == mock_response + mock_request.assert_called_once() + call_args = mock_request.call_args + assert f'sites/{site_id}/scans' in call_args[0][1] + + +class TestScansAPIErrorHandling: + """Test error handling in ScansAPI.""" + + @pytest.fixture + def scans_api(self, mock_auth): + """Create ScansAPI instance.""" + return ScansAPI(mock_auth) + + @patch('rapid7.api.scans.BaseAPI._request') + def test_get_nonexistent_scan(self, mock_request, scans_api): + """Test handling of 404 for nonexistent scan.""" + import requests + mock_request.side_effect = requests.HTTPError("404 Not Found") + + with pytest.raises(requests.HTTPError): + scans_api.get_scan(99999) + + @patch('rapid7.api.scans.BaseAPI._request') + def test_stop_already_stopped_scan(self, mock_request, scans_api): + """Test stopping an already stopped scan.""" + import requests + mock_request.side_effect = requests.HTTPError("400 Bad Request") + + with pytest.raises(requests.HTTPError): + scans_api.stop_scan(123) + + +class TestScansAPIIntegration: + """Test integration scenarios.""" + + @pytest.fixture + def scans_api(self, mock_auth): + """Create ScansAPI instance.""" + return ScansAPI(mock_auth) + + @patch('rapid7.api.scans.BaseAPI._request') + def test_scan_lifecycle(self, mock_request, scans_api): + """Test complete scan lifecycle.""" + # Mock responses for different operations + scan_id = 456 + mock_request.side_effect = [ + {"id": scan_id}, # start_site_scan returns dict, method extracts id + {"id": scan_id, "status": "running"}, # get_scan + {}, # pause_scan + {}, # resume_scan + {} # stop_scan + ] + + # Start scan + result = scans_api.start_site_scan(site_id=123) + assert result == scan_id + + # Check status + status = scans_api.get_scan(scan_id) + assert status['status'] == "running" + + # Pause, resume, stop + scans_api.pause_scan(scan_id) + scans_api.resume_scan(scan_id) + scans_api.stop_scan(scan_id) + + # Verify all calls were made + assert mock_request.call_count == 5 diff --git a/tests/test_rapid7/test_sites.py b/tests/test_rapid7/test_sites.py new file mode 100644 index 0000000..58729c1 --- /dev/null +++ b/tests/test_rapid7/test_sites.py @@ -0,0 +1,262 @@ +""" +Tests for Rapid7 InsightVM Sites API module. + +Tests the sites.py API module for site management operations. +""" + +import pytest +from unittest.mock import Mock, patch + +from rapid7.api.sites import SiteAPI + + +class TestSiteAPI: + """Test SiteAPI functionality.""" + + @pytest.fixture + def mock_auth(self): + """Mock authentication for testing.""" + auth = Mock() + auth.base_url = "https://test.insightvm.example.com:3780" + auth.auth = Mock() + return auth + + @pytest.fixture + def sites_api(self, mock_auth): + """Create SiteAPI instance for testing.""" + return SiteAPI(mock_auth) + + def test_init(self, mock_auth): + """Test SiteAPI initialization.""" + api = SiteAPI(mock_auth) + assert api.auth == mock_auth + assert hasattr(api, 'list') + assert hasattr(api, 'get_site') + + @patch('rapid7.api.sites.BaseAPI._request') + def test_list_sites(self, mock_request, sites_api): + """Test listing all sites.""" + mock_response = { + "resources": [ + {"id": 1, "name": "Corporate Network", "importance": "high"}, + {"id": 2, "name": "DMZ", "importance": "critical"} + ], + "page": {"number": 0, "size": 10} + } + mock_request.return_value = mock_response + + result = sites_api.list(page=0, size=10) + + assert result == mock_response + assert len(result['resources']) == 2 + mock_request.assert_called_once() + + @patch('rapid7.api.sites.BaseAPI._request') + def test_get_site(self, mock_request, sites_api): + """Test getting a specific site.""" + site_id = 123 + mock_response = { + "id": site_id, + "name": "Test Site", + "importance": "high", + "riskScore": 75000 + } + mock_request.return_value = mock_response + + result = sites_api.get_site(site_id) + + assert result == mock_response + assert result['id'] == site_id + mock_request.assert_called_once_with('GET', f'sites/{site_id}') + + @patch('rapid7.api.sites.BaseAPI._request') + def test_create_site(self, mock_request, sites_api): + """Test creating a new site.""" + site_config = { + "name": "New Site", + "description": "Test site", + "importance": "high", + "scanTemplate": "full-audit" + } + mock_response = {"id": 456} + mock_request.return_value = mock_response + + result = sites_api.create(name="New Site", description="Test site") + + assert result == mock_response + assert result['id'] == 456 + call_args = mock_request.call_args + assert call_args[0][0] == 'POST' + + @patch('rapid7.api.sites.BaseAPI._request') + def test_update_site(self, mock_request, sites_api): + """Test updating a site.""" + site_id = 123 + mock_response = {"id": site_id} + mock_request.return_value = mock_response + + result = sites_api.update(site_id, name="Updated Site Name") + + assert result == mock_response + call_args = mock_request.call_args + assert call_args[0][0] == 'PUT' + assert f'sites/{site_id}' in call_args[0][1] + + @patch('rapid7.api.sites.BaseAPI._request') + def test_delete_site(self, mock_request, sites_api): + """Test deleting a site.""" + site_id = 123 + mock_request.return_value = {} + + result = sites_api.delete_site(site_id) + + mock_request.assert_called_once_with('DELETE', f'sites/{site_id}') + + @patch('rapid7.api.sites.BaseAPI._request') + def test_get_site_assets(self, mock_request, sites_api): + """Test getting assets for a site.""" + site_id = 123 + mock_response = { + "resources": [ + {"id": 1, "hostname": "server01.example.com"}, + {"id": 2, "hostname": "server02.example.com"} + ] + } + mock_request.return_value = mock_response + + result = sites_api.get_assets(site_id) + + assert result == mock_response + assert len(result['resources']) == 2 + call_args = mock_request.call_args + assert f'sites/{site_id}/assets' in call_args[0][1] + + @patch('rapid7.api.sites.BaseAPI._request') + def test_list_with_parameters(self, mock_request, sites_api): + """Test list with parameters.""" + mock_response = {"resources": [], "page": {}} + mock_request.return_value = mock_response + + sites_api.list(page=0, size=100) + + mock_request.assert_called_once() + # Just verify it was called with GET + call_args = mock_request.call_args + assert call_args[0][0] == 'GET' + + +class TestSiteAPIConfiguration: + """Test site configuration operations.""" + + @pytest.fixture + def sites_api(self, mock_auth): + """Create SiteAPI instance.""" + return SiteAPI(mock_auth) + + @patch('rapid7.api.sites.BaseAPI._request') + def test_get_scan_template(self, mock_request, sites_api): + """Test getting site scan template.""" + site_id = 123 + mock_response = "full-audit-without-web-spider" + mock_request.return_value = mock_response + + result = sites_api.get_scan_template(site_id) + + assert result == mock_response + call_args = mock_request.call_args + assert f'sites/{site_id}/scan_template' in call_args[0][1] + + @patch('rapid7.api.sites.BaseAPI._request') + def test_set_scan_template(self, mock_request, sites_api): + """Test setting site scan template.""" + site_id = 123 + template_id = "full-audit" + mock_request.return_value = {} + + result = sites_api.set_scan_template(site_id, template_id) + + call_args = mock_request.call_args + assert call_args[0][0] == 'PUT' + assert f'sites/{site_id}/scan_template' in call_args[0][1] + + @patch('rapid7.api.sites.BaseAPI._request') + def test_get_scan_engine(self, mock_request, sites_api): + """Test getting site scan engine.""" + site_id = 123 + mock_response = {"id": 5, "name": "Engine 1"} + mock_request.return_value = mock_response + + result = sites_api.get_scan_engine(site_id) + + assert result == mock_response + call_args = mock_request.call_args + assert f'sites/{site_id}/scan_engine' in call_args[0][1] + + @patch('rapid7.api.sites.BaseAPI._request') + def test_set_scan_engine(self, mock_request, sites_api): + """Test setting site scan engine.""" + site_id = 123 + engine_id = 5 + mock_request.return_value = {} + + result = sites_api.set_scan_engine(site_id, engine_id) + + call_args = mock_request.call_args + assert call_args[0][0] == 'PUT' + assert f'sites/{site_id}/scan_engine' in call_args[0][1] + + +class TestSiteAPIErrorHandling: + """Test error handling in SiteAPI.""" + + @pytest.fixture + def sites_api(self, mock_auth): + """Create SiteAPI instance.""" + return SiteAPI(mock_auth) + + @patch('rapid7.api.sites.BaseAPI._request') + def test_get_nonexistent_site(self, mock_request, sites_api): + """Test handling of 404 for nonexistent site.""" + import requests + mock_request.side_effect = requests.HTTPError("404 Not Found") + + with pytest.raises(requests.HTTPError): + sites_api.get_site(99999) + + +class TestSiteAPIIntegration: + """Test integration scenarios.""" + + @pytest.fixture + def sites_api(self, mock_auth): + """Create SiteAPI instance.""" + return SiteAPI(mock_auth) + + @patch('rapid7.api.sites.BaseAPI._request') + def test_site_creation_and_configuration(self, mock_request, sites_api): + """Test complete site workflow.""" + site_id = 456 + + mock_request.side_effect = [ + {"id": site_id}, # create + {}, # set_scan_template + {}, # set_scan_engine + {"id": site_id, "name": "New Site"} # get + ] + + # Create site + created = sites_api.create({"name": "New Site"}) + assert created['id'] == site_id + + # Configure scan template + sites_api.set_scan_template(site_id, "full-audit") + + # Configure scan engine + sites_api.set_scan_engine(site_id, 5) + + # Verify site + site = sites_api.get_site(site_id) + assert site['name'] == "New Site" + + # Verify all calls were made + assert mock_request.call_count == 4 diff --git a/tests/test_ui.py b/tests/test_ui.py new file mode 100644 index 0000000..5a6041f --- /dev/null +++ b/tests/test_ui.py @@ -0,0 +1,289 @@ +""" +Tests for Rapid7 InsightVM UI module. + +Tests the ui.py module for user interface utilities. +""" + +import pytest +from unittest.mock import Mock, patch, MagicMock +from io import StringIO +import sys + +from rapid7.ui import Color, UI, SimpleProgressBar, create_ui + + +class TestColor: + """Test Color enum.""" + + def test_color_enum_values(self): + """Test that Color enum has expected values.""" + assert Color.RESET.value == '\033[0m' + assert Color.RED.value == '\033[91m' + assert Color.GREEN.value == '\033[92m' + assert Color.YELLOW.value == '\033[93m' + assert Color.BLUE.value == '\033[94m' + + +class TestUI: + """Test UI class functionality.""" + + @pytest.fixture + def ui_with_colors(self): + """Create UI instance with colored output.""" + with patch('rapid7.ui.get_config') as mock_config: + config = Mock() + config.get_preference.return_value = True + mock_config.return_value = config + + with patch('rapid7.ui.RICH_AVAILABLE', False): + ui = UI() + ui.colored = True + return ui + + @pytest.fixture + def ui_without_colors(self): + """Create UI instance without colored output.""" + with patch('rapid7.ui.get_config') as mock_config: + config = Mock() + config.get_preference.return_value = False + mock_config.return_value = config + + with patch('rapid7.ui.RICH_AVAILABLE', False): + ui = UI() + ui.colored = False + return ui + + def test_ui_init(self): + """Test UI initialization.""" + with patch('rapid7.ui.get_config') as mock_config: + config = Mock() + config.get_preference.return_value = True + mock_config.return_value = config + + ui = UI() + + assert ui.config is not None + assert hasattr(ui, 'colored') + + def test_print_success_with_colors(self, ui_with_colors, capsys): + """Test printing success message with colors.""" + ui_with_colors.print_success("Test message") + + captured = capsys.readouterr() + assert "Test message" in captured.out + assert "✓" in captured.out + + def test_print_success_without_colors(self, ui_without_colors, capsys): + """Test printing success message without colors.""" + ui_without_colors.print_success("Test message") + + captured = capsys.readouterr() + assert "Test message" in captured.out + assert "✓" in captured.out + # Should not contain ANSI codes + assert '\033[' not in captured.out + + def test_print_error_with_colors(self, ui_with_colors, capsys): + """Test printing error message with colors.""" + ui_with_colors.print_error("Error message") + + captured = capsys.readouterr() + assert "Error message" in captured.err + assert "✗" in captured.err + + def test_print_error_without_colors(self, ui_without_colors, capsys): + """Test printing error message without colors.""" + ui_without_colors.print_error("Error message") + + captured = capsys.readouterr() + assert "Error message" in captured.err + assert "✗" in captured.err + assert '\033[' not in captured.err + + def test_print_warning_with_colors(self, ui_with_colors, capsys): + """Test printing warning message with colors.""" + ui_with_colors.print_warning("Warning message") + + captured = capsys.readouterr() + assert "Warning message" in captured.out + assert "⚠" in captured.out + + def test_print_info_with_colors(self, ui_with_colors, capsys): + """Test printing info message with colors.""" + ui_with_colors.print_info("Info message") + + captured = capsys.readouterr() + assert "Info message" in captured.out + assert "ℹ" in captured.out + + def test_print_header(self, ui_without_colors, capsys): + """Test printing header.""" + ui_without_colors.print_header("Test Header") + + captured = capsys.readouterr() + assert "Test Header" in captured.out + assert "=" in captured.out + + def test_print_separator(self, ui_without_colors, capsys): + """Test printing separator.""" + ui_without_colors.print_separator() + + captured = capsys.readouterr() + assert "-" * 80 in captured.out + + def test_print_table(self, ui_without_colors, capsys): + """Test printing table.""" + headers = ["Name", "Age", "City"] + rows = [ + ["Alice", "30", "New York"], + ["Bob", "25", "Los Angeles"] + ] + + ui_without_colors.print_table("User Data", headers, rows) + + captured = capsys.readouterr() + assert "User Data" in captured.out + assert "Name" in captured.out + assert "Alice" in captured.out + assert "Bob" in captured.out + + def test_confirm_yes(self, ui_without_colors): + """Test confirm prompt with yes answer.""" + with patch('builtins.input', return_value='y'): + result = ui_without_colors.confirm("Continue?") + assert result is True + + def test_confirm_no(self, ui_without_colors): + """Test confirm prompt with no answer.""" + with patch('builtins.input', return_value='n'): + result = ui_without_colors.confirm("Continue?") + assert result is False + + def test_confirm_default(self, ui_without_colors): + """Test confirm prompt with default value.""" + with patch('builtins.input', return_value=''): + result = ui_without_colors.confirm("Continue?", default=True) + assert result is True + + def test_prompt(self, ui_without_colors): + """Test text prompt.""" + with patch('builtins.input', return_value='test value'): + result = ui_without_colors.prompt("Enter value:") + assert result == "test value" + + def test_prompt_with_default(self, ui_without_colors): + """Test text prompt with default value.""" + with patch('builtins.input', return_value=''): + result = ui_without_colors.prompt("Enter value:", default="default") + assert result == "default" + + def test_select_menu(self, ui_without_colors): + """Test selecting from a menu.""" + options = ["Option 1", "Option 2", "Option 3"] + + with patch('builtins.input', return_value='2'): + result = ui_without_colors.select_menu("Choose:", options) + assert result == 1 # Zero-indexed + + def test_progress_bar_creation(self, ui_without_colors): + """Test creating a progress bar.""" + progress = ui_without_colors.progress_bar("Processing", total=100) + + assert progress is not None + assert isinstance(progress, SimpleProgressBar) + + +class TestSimpleProgressBar: + """Test SimpleProgressBar class.""" + + @pytest.fixture + def mock_ui(self): + """Create a mock UI for SimpleProgressBar.""" + with patch('rapid7.ui.get_config') as mock_config: + config = Mock() + config.get_preference.return_value = True + mock_config.return_value = config + return UI() + + def test_simple_progress_bar_init(self, mock_ui): + """Test SimpleProgressBar initialization.""" + progress = SimpleProgressBar("Test", total=100, ui=mock_ui) + + assert progress.description == "Test" + assert progress.total == 100 + assert progress.current == 0 + + def test_simple_progress_bar_context_manager(self, mock_ui, capsys): + """Test SimpleProgressBar as context manager.""" + with SimpleProgressBar("Test", total=100, ui=mock_ui) as progress: + progress.update(50) + + captured = capsys.readouterr() + assert "Test" in captured.out + assert "Done" in captured.out + + def test_simple_progress_bar_update(self, mock_ui, capsys): + """Test updating progress bar.""" + with SimpleProgressBar("Test", total=100, ui=mock_ui) as progress: + progress.update(50) + + captured = capsys.readouterr() + assert "Test" in captured.out + + +class TestCreateUI: + """Test create_ui helper function.""" + + def test_create_ui_returns_ui_instance(self): + """Test that create_ui returns a UI instance.""" + with patch('rapid7.ui.get_config') as mock_config: + config = Mock() + config.get_preference.return_value = True + mock_config.return_value = config + + ui = create_ui() + + assert isinstance(ui, UI) + assert hasattr(ui, 'print_success') + assert hasattr(ui, 'print_error') + + +class TestUIWithRich: + """Test UI class with rich library available.""" + + @pytest.fixture + def ui_with_rich(self): + """Create UI instance with rich available.""" + with patch('rapid7.ui.get_config') as mock_config: + config = Mock() + config.get_preference.return_value = True + mock_config.return_value = config + + with patch('rapid7.ui.RICH_AVAILABLE', True): + with patch('rapid7.ui.Console') as mock_console_class: + mock_console = Mock() + mock_console_class.return_value = mock_console + + ui = UI() + ui.console = mock_console + return ui + + def test_print_success_with_rich(self, ui_with_rich): + """Test printing success with rich.""" + ui_with_rich.print_success("Test message") + + # Verify console.print was called + ui_with_rich.console.print.assert_called_once() + call_args = ui_with_rich.console.print.call_args[0][0] + assert "Test message" in call_args + assert "[green]" in call_args + + def test_print_error_with_rich(self, ui_with_rich): + """Test printing error with rich.""" + ui_with_rich.print_error("Error message") + + # Verify console.print was called with stderr + ui_with_rich.console.print.assert_called_once() + call_args = ui_with_rich.console.print.call_args[0][0] + assert "Error message" in call_args + assert "[red]" in call_args