diff --git a/celery_app.py b/celery_app.py new file mode 100644 index 00000000..1d232165 --- /dev/null +++ b/celery_app.py @@ -0,0 +1,81 @@ +"""Celery application factory and configuration for Sample Platform.""" + +import os + +from celery import Celery +from celery.schedules import crontab + +# Load configuration - use empty dict for testing when config.py doesn't exist +try: + from config_parser import parse_config + config = parse_config('config') +except Exception: + # In test environment, config.py may not exist + config = {} + + +def make_celery(app=None): + """ + Create a Celery application configured for the Sample Platform. + + :param app: Optional Flask application for context binding + :return: Configured Celery application + """ + celery_app = Celery( + 'sample_platform', + broker=config.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'), + backend=config.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0'), + include=['mod_ci.tasks'] + ) + + # Apply configuration from config.py + celery_app.conf.update( + task_serializer=config.get('CELERY_TASK_SERIALIZER', 'json'), + result_serializer=config.get('CELERY_RESULT_SERIALIZER', 'json'), + accept_content=config.get('CELERY_ACCEPT_CONTENT', ['json']), + timezone=config.get('CELERY_TIMEZONE', 'UTC'), + enable_utc=config.get('CELERY_ENABLE_UTC', True), + task_acks_late=config.get('CELERY_TASK_ACKS_LATE', True), + worker_prefetch_multiplier=config.get('CELERY_WORKER_PREFETCH_MULTIPLIER', 1), + task_reject_on_worker_lost=config.get('CELERY_TASK_REJECT_ON_WORKER_LOST', True), + task_soft_time_limit=config.get('CELERY_TASK_SOFT_TIME_LIMIT', 3600), + task_time_limit=config.get('CELERY_TASK_TIME_LIMIT', 3900), + ) + + # Beat schedule for periodic tasks + celery_app.conf.beat_schedule = { + 'check-expired-instances-every-5-minutes': { + 'task': 'mod_ci.tasks.check_expired_instances_task', + 'schedule': crontab(minute='*/5'), + 'options': {'queue': 'maintenance'} + }, + 'process-pending-tests-every-minute': { + 'task': 'mod_ci.tasks.process_pending_tests_task', + 'schedule': crontab(minute='*'), + 'options': {'queue': 'default'} + }, + } + + # Queue routing + celery_app.conf.task_routes = { + 'mod_ci.tasks.start_test_task': {'queue': 'test_execution'}, + 'mod_ci.tasks.check_expired_instances_task': {'queue': 'maintenance'}, + 'mod_ci.tasks.process_pending_tests_task': {'queue': 'default'}, + } + + # If Flask app is provided, bind tasks to its context + if app is not None: + class ContextTask(celery_app.Task): + """Task base class that maintains Flask application context.""" + + def __call__(self, *args, **kwargs): + with app.app_context(): + return self.run(*args, **kwargs) + + celery_app.Task = ContextTask + + return celery_app + + +# Create the default celery instance (used by worker when started standalone) +celery = make_celery() diff --git a/config_sample.py b/config_sample.py index 34f30a46..2b2fd64f 100755 --- a/config_sample.py +++ b/config_sample.py @@ -37,3 +37,21 @@ GCP_INSTANCE_MAX_RUNTIME = 120 # In minutes GCS_BUCKET_NAME = 'spdev' GCS_SIGNED_URL_EXPIRY_LIMIT = 720 # In minutes + + +# CELERY TASK QUEUE CONFIG +CELERY_BROKER_URL = 'redis://localhost:6379/0' +CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' +CELERY_TASK_SERIALIZER = 'json' +CELERY_RESULT_SERIALIZER = 'json' +CELERY_ACCEPT_CONTENT = ['json'] +CELERY_TIMEZONE = 'UTC' +CELERY_ENABLE_UTC = True +CELERY_TASK_ACKS_LATE = True # Task acknowledged after completion +CELERY_WORKER_PREFETCH_MULTIPLIER = 1 # One task at a time per worker +CELERY_TASK_REJECT_ON_WORKER_LOST = True # Requeue tasks if worker dies +CELERY_TASK_SOFT_TIME_LIMIT = 3600 # 1 hour soft limit +CELERY_TASK_TIME_LIMIT = 3900 # 1 hour 5 minutes hard limit + +# Feature flag for gradual migration (set to True to enable Celery, False for cron fallback) +USE_CELERY_TASKS = False diff --git a/install/celery-beat.service b/install/celery-beat.service new file mode 100644 index 00000000..81fce68b --- /dev/null +++ b/install/celery-beat.service @@ -0,0 +1,23 @@ +[Unit] +Description=Sample Platform Celery Beat Scheduler +After=network.target redis.service celery-worker.service +Requires=redis.service + +[Service] +Type=simple +User=www-data +Group=www-data +WorkingDirectory=/var/www/sample-platform +Environment="PATH=/var/www/sample-platform/venv/bin" +ExecStart=/var/www/sample-platform/venv/bin/celery \ + -A celery_app.celery beat \ + --pidfile=/var/run/celery/beat.pid \ + --logfile=/var/www/sample-platform/logs/celery/beat.log \ + --loglevel=INFO \ + --schedule=/var/www/sample-platform/celerybeat-schedule +RuntimeDirectory=celery +Restart=always +RestartSec=10 + +[Install] +WantedBy=multi-user.target diff --git a/install/celery-worker.service b/install/celery-worker.service new file mode 100644 index 00000000..d9c1cfaa --- /dev/null +++ b/install/celery-worker.service @@ -0,0 +1,30 @@ +[Unit] +Description=Sample Platform Celery Worker +After=network.target redis.service mysql.service +Requires=redis.service + +[Service] +Type=forking +User=www-data +Group=www-data +WorkingDirectory=/var/www/sample-platform +Environment="PATH=/var/www/sample-platform/venv/bin" +ExecStart=/var/www/sample-platform/venv/bin/celery \ + -A celery_app.celery multi start worker \ + --pidfile=/var/run/celery/%n.pid \ + --logfile=/var/www/sample-platform/logs/celery/%n%I.log \ + --loglevel=INFO \ + -Q default,test_execution,maintenance \ + --concurrency=2 +ExecStop=/var/www/sample-platform/venv/bin/celery \ + -A celery_app.celery multi stopwait worker \ + --pidfile=/var/run/celery/%n.pid +ExecReload=/var/www/sample-platform/venv/bin/celery \ + -A celery_app.celery multi restart worker \ + --pidfile=/var/run/celery/%n.pid +RuntimeDirectory=celery +Restart=always +RestartSec=10 + +[Install] +WantedBy=multi-user.target diff --git a/install/installation.md b/install/installation.md index 0b577cec..839bcac0 100644 --- a/install/installation.md +++ b/install/installation.md @@ -217,6 +217,107 @@ The file `mod_ci/cron.py` is to be run in periodic intervals. To setup a cron jo ``` Change the `/var/www/sample-plaform` directory, if you have installed the platform in a different directory. +## Optional: Setting up Celery Task Queue + +As an alternative to cron-based polling, you can use Celery with Redis for event-driven test processing. This provides faster test execution, better retry handling, and parallel processing. + +### Installing Redis + +```bash +sudo apt update +sudo apt install redis-server + +# Configure Redis +sudo nano /etc/redis/redis.conf +# Set: supervised systemd +# Set: bind 127.0.0.1 ::1 + +# Enable and start Redis +sudo systemctl enable redis-server +sudo systemctl start redis-server + +# Verify Redis is running +redis-cli ping # Should return PONG +``` + +### Configuring Celery + +Add the following to your `config.py`: + +```python +# Celery Configuration +CELERY_BROKER_URL = 'redis://localhost:6379/0' +CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' +USE_CELERY_TASKS = True # Set to False to use cron instead +``` + +### Installing Celery Services + +```bash +# Create log directory +sudo mkdir -p /var/www/sample-platform/logs/celery +sudo chown -R www-data:www-data /var/www/sample-platform/logs/celery + +# Create runtime directory +sudo mkdir -p /var/run/celery +sudo chown www-data:www-data /var/run/celery + +# Install systemd services +sudo cp /var/www/sample-platform/install/celery-worker.service /etc/systemd/system/ +sudo cp /var/www/sample-platform/install/celery-beat.service /etc/systemd/system/ + +# Reload systemd and enable services +sudo systemctl daemon-reload +sudo systemctl enable celery-worker celery-beat + +# Start the services +sudo systemctl start celery-worker +sudo systemctl start celery-beat +``` + +### Monitoring Celery + +```bash +# Check worker status +celery -A celery_app.celery inspect active + +# Check queue depth +redis-cli LLEN celery + +# View logs +tail -f /var/www/sample-platform/logs/celery/*.log + +# Optional: Install Flower for web-based monitoring +pip install flower +celery -A celery_app.celery flower --port=5555 +``` + +### Gradual Migration + +For a safe transition from cron to Celery: + +1. **Stage 1**: Set `USE_CELERY_TASKS = False` and keep cron running. Start Celery services and verify they work correctly in logs. + +2. **Stage 2**: Set `USE_CELERY_TASKS = True`. Reduce cron frequency to every 30 minutes as a fallback. + +3. **Stage 3**: Disable cron entirely once you're confident Celery is working correctly. + +### Rollback to Cron + +If you need to disable Celery: + +```bash +# Stop Celery services +sudo systemctl stop celery-beat celery-worker + +# Edit config.py and set USE_CELERY_TASKS = False + +# Restart platform +sudo systemctl restart platform + +# Ensure cron is running every 10 minutes +``` + ## GCS configuration to serve file downloads using Signed URLs To serve file downloads directly from the private GCS bucket, Signed download URLs have been used. diff --git a/mod_ci/controllers.py b/mod_ci/controllers.py index e60c0e0d..3affe558 100755 --- a/mod_ci/controllers.py +++ b/mod_ci/controllers.py @@ -122,6 +122,78 @@ def safe_db_commit(db, operation_description: str = "database operation") -> boo return False +# User-friendly messages for known GCP error codes +GCP_ERROR_MESSAGES = { + 'ZONE_RESOURCE_POOL_EXHAUSTED': ( + "GCP resources temporarily unavailable in the configured zone. " + "The test will be retried automatically when resources become available." + ), + 'QUOTA_EXCEEDED': ( + "GCP quota limit reached. Please wait for other tests to complete " + "or contact the administrator." + ), + 'RESOURCE_NOT_FOUND': "Required GCP resource not found. Please contact the administrator.", + 'RESOURCE_ALREADY_EXISTS': "A VM with this name already exists. Please contact the administrator.", + 'TIMEOUT': "GCP operation timed out. The test will be retried automatically.", +} + + +def parse_gcp_error(result: Dict, log=None) -> str: + """ + Parse a GCP API error response and return a user-friendly message. + + GCP errors have the structure: + { + 'error': { + 'errors': [{'code': 'ERROR_CODE', 'message': '...'}] + } + } + + For known error codes, returns a user-friendly message. + For unknown errors, logs the details server-side and returns a generic message + to avoid exposing potentially sensitive information. + + :param result: The GCP API response dictionary + :param log: Optional logger instance. If not provided, uses module logger. + :return: A user-friendly error message + """ + import logging + if log is None: + log = logging.getLogger('Platform') + + if not isinstance(result, dict): + log.error(f"GCP error (non-dict): {result}") + return "VM creation failed. Please contact the administrator." + + error = result.get('error') + if error is None: + log.error(f"GCP error (no error key): {result}") + return "VM creation failed. Please contact the administrator." + + if not isinstance(error, dict): + log.error(f"GCP error (error not dict): {error}") + return "VM creation failed. Please contact the administrator." + + errors = error.get('errors', []) + if not errors: + log.error(f"GCP error (empty errors list): {error}") + return "VM creation failed. Please contact the administrator." + + # Get the first error (usually the most relevant) + first_error = errors[0] if isinstance(errors, list) and len(errors) > 0 else {} + error_code = first_error.get('code', 'UNKNOWN') + error_message = first_error.get('message', 'No details provided') + + # Check if we have a user-friendly message for this error code + if error_code in GCP_ERROR_MESSAGES: + return GCP_ERROR_MESSAGES[error_code] + + # For unknown errors, log full details server-side but return generic message + # to avoid exposing potentially sensitive information (project names, zones, etc.) + log.error(f"GCP error ({error_code}): {error_message}") + return f"VM creation failed ({error_code}). Please contact the administrator." + + mod_ci = Blueprint('ci', __name__) @@ -782,9 +854,9 @@ def start_test(compute, app, db, repository: Repository.Repository, test, bot_to if not safe_db_commit(db, f"recording GCP instance for test {test.id}"): log.error(f"Failed to record GCP instance for test {test.id}, but VM was created") else: - error_msg = result.get('error', 'Unknown error') if isinstance(result, dict) else str(result) + error_msg = parse_gcp_error(result) log.error(f"Error creating test instance for test {test.id}, result: {result}") - mark_test_failed(db, test, repository, f"Failed to create VM: {error_msg}") + mark_test_failed(db, test, repository, error_msg) def create_instance(compute, project, zone, test, reportURL) -> Dict: @@ -997,7 +1069,7 @@ def save_xml_to_file(xml_node, folder_name, file_name) -> None: ) -def add_test_entry(db, commit, test_type, branch="master", pr_nr=0) -> None: +def add_test_entry(db, commit, test_type, branch="master", pr_nr=0) -> list: """ Add test details entry into Test model for each platform. @@ -1013,8 +1085,8 @@ def add_test_entry(db, commit, test_type, branch="master", pr_nr=0) -> None: :type branch: str :param pr_nr: Pull Request number, if applicable. :type pr_nr: int - :return: Nothing - :rtype: None + :return: List of created test IDs + :rtype: list """ from run import log @@ -1022,7 +1094,7 @@ def add_test_entry(db, commit, test_type, branch="master", pr_nr=0) -> None: # Based on issue identified by NexionisJake in PR #937 if not is_valid_commit_hash(commit): log.error(f"Invalid commit hash '{commit}' - skipping test entry creation") - return + return [] fork_url = f"%/{g.github['repository_owner']}/{g.github['repository']}.git" fork = Fork.query.filter(Fork.github.like(fork_url)).first() @@ -1031,12 +1103,59 @@ def add_test_entry(db, commit, test_type, branch="master", pr_nr=0) -> None: log.debug('pull request test type detected') branch = "pull_request" + test_ids = [] linux_test = Test(TestPlatform.linux, test_type, fork.id, branch, commit, pr_nr) db.add(linux_test) + db.flush() # Get ID before commit + test_ids.append(linux_test.id) + windows_test = Test(TestPlatform.windows, test_type, fork.id, branch, commit, pr_nr) db.add(windows_test) + db.flush() # Get ID before commit + test_ids.append(windows_test.id) + if not safe_db_commit(db, f"adding test entries for commit {commit[:7]}"): log.error(f"Failed to add test entries for commit {commit}") + return [] + + return test_ids + + +def trigger_test_tasks(test_ids: list, bot_token: str) -> None: + """ + Optionally trigger Celery tasks for newly created tests. + + Only triggers if USE_CELERY_TASKS is True in config. + Falls back to waiting for cron/periodic task otherwise. + + :param test_ids: List of Test IDs to queue + :type test_ids: list + :param bot_token: GitHub bot token + :type bot_token: str + """ + from run import config, log + + if not config.get('USE_CELERY_TASKS', False): + log.debug("Celery tasks disabled, tests will be picked up by cron/periodic task") + return + + if not test_ids: + return + + try: + from mod_ci.tasks import start_test_task + + for test_id in test_ids: + start_test_task.apply_async( + args=[test_id, bot_token], + queue='test_execution', + countdown=30 # 30 second delay for artifact upload to complete + ) + log.info(f"Queued test {test_id} via Celery") + except ImportError: + log.warning("Celery tasks module not available, falling back to cron") + except Exception as e: + log.error(f"Failed to queue Celery tasks: {e}, tests will be picked up by cron") def schedule_test(gh_commit: Commit.Commit) -> None: @@ -1302,7 +1421,8 @@ def start_ci(): last_commit.value = ref.object.sha if not safe_db_commit(g.db, "updating last commit"): return 'ERROR' - add_test_entry(g.db, commit_hash, TestType.commit) + test_ids = add_test_entry(g.db, commit_hash, TestType.commit) + trigger_test_tasks(test_ids, g.github['bot_token']) else: g.log.warning('Unknown push type! Dumping payload for analysis') g.log.warning(payload) @@ -1332,7 +1452,8 @@ def start_ci(): try: pr = retry_with_backoff(lambda: repository.get_pull(number=pr_nr)) if pr.mergeable is not False: - add_test_entry(g.db, commit_hash, TestType.pull_request, pr_nr=pr_nr) + test_ids = add_test_entry(g.db, commit_hash, TestType.pull_request, pr_nr=pr_nr) + trigger_test_tasks(test_ids, g.github['bot_token']) except GithubException as e: g.log.error(f"Failed to get PR {pr_nr} after retries: {e}") diff --git a/mod_ci/tasks.py b/mod_ci/tasks.py new file mode 100644 index 00000000..d1c140aa --- /dev/null +++ b/mod_ci/tasks.py @@ -0,0 +1,241 @@ +"""Celery tasks for CI platform operations.""" + +from celery import shared_task +from celery.exceptions import SoftTimeLimitExceeded +from celery.utils.log import get_task_logger +from github import Auth, Github, GithubException + +from celery_app import celery + +logger = get_task_logger(__name__) + + +@celery.task( + bind=True, + max_retries=3, + default_retry_delay=60, + autoretry_for=(GithubException,), + retry_backoff=True, + retry_backoff_max=300, + acks_late=True +) +def start_test_task(self, test_id: int, bot_token: str): + """ + Execute a single test by creating a GCP VM instance. + + This task wraps the existing start_test() function with Celery's + retry mechanisms and proper error handling. + + :param test_id: The ID of the Test to execute + :param bot_token: GitHub bot token for artifact download + :return: Dict with status and message + """ + # Import inside task to avoid circular imports and ensure fresh Flask context + from run import app, config + + from database import create_session + from mod_ci.controllers import ( + get_compute_service_object, + mark_test_failed, + start_test, + ) + from mod_ci.models import GcpInstance + from mod_test.models import Test + + with app.app_context(): + db = create_session(config['DATABASE_URI']) + + try: + # Fetch the test + test = Test.query.get(test_id) + if test is None: + logger.error(f"Test {test_id} not found") + return {'status': 'error', 'message': 'Test not found'} + + # Check if test is already finished + if test.finished: + logger.info(f"Test {test_id} already finished, skipping") + return {'status': 'skipped', 'message': 'Test already finished'} + + # Check if test already has a GCP instance (prevent duplicates) + existing_instance = GcpInstance.query.filter( + GcpInstance.test_id == test_id + ).first() + if existing_instance is not None: + logger.info(f"Test {test_id} already has GCP instance, skipping") + return {'status': 'skipped', 'message': 'Test already has instance'} + + # Get GitHub repository + gh = Github(auth=Auth.Token(bot_token)) + repository = gh.get_repo( + f"{config['GITHUB_OWNER']}/{config['GITHUB_REPOSITORY']}" + ) + + # Execute the test + compute = get_compute_service_object() + start_test(compute, app, db, repository, test, bot_token) + + logger.info(f"Test {test_id} started successfully") + return {'status': 'success', 'test_id': test_id} + + except SoftTimeLimitExceeded: + logger.error(f"Test {test_id} exceeded time limit") + try: + test = Test.query.get(test_id) + if test and not test.finished: + gh = Github(auth=Auth.Token(bot_token)) + repository = gh.get_repo( + f"{config['GITHUB_OWNER']}/{config['GITHUB_REPOSITORY']}" + ) + mark_test_failed(db, test, repository, "Task timed out") + except Exception as mark_error: + logger.error(f"Failed to mark test {test_id} as failed: {mark_error}") + raise + + except Exception as e: + logger.exception(f"Error starting test {test_id}: {e}") + # Retry on transient failures + if self.request.retries < self.max_retries: + raise self.retry(exc=e) + # Final failure - mark test as failed + try: + test = Test.query.get(test_id) + if test and not test.finished: + gh = Github(auth=Auth.Token(bot_token)) + repository = gh.get_repo( + f"{config['GITHUB_OWNER']}/{config['GITHUB_REPOSITORY']}" + ) + mark_test_failed(db, test, repository, f"Task failed: {str(e)[:100]}") + except Exception as mark_error: + logger.error(f"Failed to mark test {test_id} as failed: {mark_error}") + raise + + finally: + db.remove() + + +@celery.task(bind=True, acks_late=True) +def check_expired_instances_task(self): + """ + Periodic task to clean up expired GCP instances. + + This wraps delete_expired_instances() for Celery scheduling. + + :return: Dict with status and message + """ + from run import app, config + + from database import create_session + from github import Auth, Github + from mod_ci.controllers import delete_expired_instances, get_compute_service_object + + with app.app_context(): + db = create_session(config['DATABASE_URI']) + + try: + vm_max_runtime = config.get('GCP_INSTANCE_MAX_RUNTIME', 120) + zone = config.get('ZONE', '') + project = config.get('PROJECT_NAME', '') + + if not zone or not project: + logger.error('GCP zone or project not configured') + return {'status': 'error', 'message': 'GCP not configured'} + + # Get GitHub repository + github_token = config.get('GITHUB_TOKEN', '') + if not github_token: + logger.error('GitHub token not configured') + return {'status': 'error', 'message': 'GitHub token missing'} + + gh = Github(auth=Auth.Token(github_token)) + repository = gh.get_repo( + f"{config['GITHUB_OWNER']}/{config['GITHUB_REPOSITORY']}" + ) + + compute = get_compute_service_object() + delete_expired_instances( + compute, vm_max_runtime, project, zone, db, repository + ) + + logger.info('Expired instances check completed') + return {'status': 'success'} + + except Exception as e: + logger.exception(f"Error checking expired instances: {e}") + return {'status': 'error', 'message': str(e)} + + finally: + db.remove() + + +@celery.task(bind=True, acks_late=True) +def process_pending_tests_task(self): + """ + Periodic task to find and queue pending tests for execution. + + This replaces the cron-based approach by finding pending tests + and dispatching individual start_test_task for each. + + :return: Dict with status and count of queued tests + """ + from run import app, config + + from database import create_session + from mod_ci.models import GcpInstance, MaintenanceMode + from mod_test.models import Test, TestPlatform, TestProgress, TestStatus + + with app.app_context(): + db = create_session(config['DATABASE_URI']) + + try: + github_token = config.get('GITHUB_TOKEN', '') + if not github_token: + logger.error('GitHub token not configured') + return {'status': 'error', 'message': 'GitHub token missing'} + + bot_token = github_token + queued_count = 0 + + # Find pending tests for each platform + for platform in [TestPlatform.linux, TestPlatform.windows]: + # Check maintenance mode + maintenance_mode = MaintenanceMode.query.filter( + MaintenanceMode.platform == platform + ).first() + if maintenance_mode is not None and maintenance_mode.disabled: + logger.debug(f'[{platform.value}] In maintenance mode, skipping') + continue + + # Get tests with progress (finished or in progress) + finished_tests = db.query(TestProgress.test_id).filter( + TestProgress.status.in_([TestStatus.canceled, TestStatus.completed]) + ) + + # Get tests with GCP instances (currently running) + running_tests = db.query(GcpInstance.test_id) + + # Find pending tests (limit to 5 per platform per run) + pending_tests = Test.query.filter( + Test.id.notin_(finished_tests), + Test.id.notin_(running_tests), + Test.platform == platform + ).order_by(Test.id.asc()).limit(5).all() + + for test in pending_tests: + # Queue each test as a separate task + start_test_task.apply_async( + args=[test.id, bot_token], + queue='test_execution', + countdown=1 # Small delay between tasks + ) + queued_count += 1 + logger.info(f'Queued test {test.id} for {platform.value}') + + return {'status': 'success', 'queued_count': queued_count} + + except Exception as e: + logger.exception(f"Error processing pending tests: {e}") + return {'status': 'error', 'message': str(e)} + + finally: + db.remove() diff --git a/requirements.txt b/requirements.txt index 86c9c0a2..a399b31f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,3 +26,5 @@ cffi==2.0.0 PyGithub==2.8.1 blinker==1.9.0 click==8.1.7 +celery[redis]==5.3.6 +redis==5.0.1 diff --git a/tests/test_ci/test_controllers.py b/tests/test_ci/test_controllers.py index d9ed6152..59db5bc2 100644 --- a/tests/test_ci/test_controllers.py +++ b/tests/test_ci/test_controllers.py @@ -1,4 +1,5 @@ import json +import unittest from importlib import reload from unittest import mock from unittest.mock import MagicMock @@ -3092,3 +3093,158 @@ def test_mark_test_failed_includes_target_url(self, mock_progress, mock_update_g call_args = mock_update_github.call_args self.assertEqual(len(call_args[0]), 5) # 5 positional args including target_url self.assertIn("456", call_args[0][4]) # target_url contains test ID + + +class TestParseGcpError(unittest.TestCase): + """Tests for the parse_gcp_error helper function.""" + + def test_parse_gcp_error_zone_resource_exhausted(self): + """Test that ZONE_RESOURCE_POOL_EXHAUSTED returns user-friendly message.""" + from mod_ci.controllers import parse_gcp_error + + mock_log = MagicMock() + result = { + 'status': 'DONE', + 'error': { + 'errors': [{ + 'code': 'ZONE_RESOURCE_POOL_EXHAUSTED', + 'message': "The zone 'projects/test/zones/us-central1-a' does not " + "have enough resources available to fulfill the request." + }] + } + } + + error_msg = parse_gcp_error(result, log=mock_log) + self.assertIn("GCP resources temporarily unavailable", error_msg) + self.assertIn("retried automatically", error_msg) + # Should NOT contain raw technical details + self.assertNotIn("us-central1-a", error_msg) + + def test_parse_gcp_error_quota_exceeded(self): + """Test that QUOTA_EXCEEDED returns user-friendly message.""" + from mod_ci.controllers import parse_gcp_error + + mock_log = MagicMock() + result = { + 'error': { + 'errors': [{ + 'code': 'QUOTA_EXCEEDED', + 'message': 'Quota exceeded for resource.' + }] + } + } + + error_msg = parse_gcp_error(result, log=mock_log) + self.assertIn("quota limit reached", error_msg) + + def test_parse_gcp_error_timeout(self): + """Test that TIMEOUT returns user-friendly message.""" + from mod_ci.controllers import parse_gcp_error + + mock_log = MagicMock() + result = { + 'status': 'TIMEOUT', + 'error': { + 'errors': [{ + 'code': 'TIMEOUT', + 'message': 'Operation timed out after 1800 seconds' + }] + } + } + + error_msg = parse_gcp_error(result, log=mock_log) + self.assertIn("timed out", error_msg) + self.assertIn("retried automatically", error_msg) + + def test_parse_gcp_error_unknown_code(self): + """Test that unknown error codes return generic message and log details.""" + from mod_ci.controllers import parse_gcp_error + + mock_log = MagicMock() + result = { + 'error': { + 'errors': [{ + 'code': 'SOME_NEW_ERROR', + 'message': 'Something unexpected happened.' + }] + } + } + + error_msg = parse_gcp_error(result, log=mock_log) + # Should include error code but not the full message (security) + self.assertIn("SOME_NEW_ERROR", error_msg) + self.assertIn("contact the administrator", error_msg) + # Should NOT expose the raw error message + self.assertNotIn("Something unexpected happened", error_msg) + # Should log the full details server-side + mock_log.error.assert_called_once() + self.assertIn("SOME_NEW_ERROR", mock_log.error.call_args[0][0]) + self.assertIn("Something unexpected happened", mock_log.error.call_args[0][0]) + + def test_parse_gcp_error_logs_sensitive_info(self): + """Test that sensitive info is logged but not returned to user.""" + from mod_ci.controllers import parse_gcp_error + + mock_log = MagicMock() + result = { + 'error': { + 'errors': [{ + 'code': 'UNKNOWN_ERROR', + 'message': 'Error in project my-secret-project zone us-central1-a' + }] + } + } + + error_msg = parse_gcp_error(result, log=mock_log) + # Should NOT expose project/zone names + self.assertNotIn("my-secret-project", error_msg) + self.assertNotIn("us-central1-a", error_msg) + # But should log them server-side + mock_log.error.assert_called_once() + self.assertIn("my-secret-project", mock_log.error.call_args[0][0]) + + def test_parse_gcp_error_no_error_key(self): + """Test handling when 'error' key is missing.""" + from mod_ci.controllers import parse_gcp_error + + mock_log = MagicMock() + result = {'status': 'DONE'} + + error_msg = parse_gcp_error(result, log=mock_log) + self.assertIn("VM creation failed", error_msg) + mock_log.error.assert_called_once() + + def test_parse_gcp_error_empty_errors_list(self): + """Test handling when 'errors' list is empty.""" + from mod_ci.controllers import parse_gcp_error + + mock_log = MagicMock() + result = {'error': {'errors': []}} + + error_msg = parse_gcp_error(result, log=mock_log) + self.assertIn("VM creation failed", error_msg) + mock_log.error.assert_called_once() + + def test_parse_gcp_error_not_a_dict(self): + """Test handling when result is not a dictionary.""" + from mod_ci.controllers import parse_gcp_error + + mock_log = MagicMock() + error_msg = parse_gcp_error("some string error", log=mock_log) + self.assertIn("VM creation failed", error_msg) + # Should NOT expose the raw input + self.assertNotIn("some string error", error_msg) + # But should log it + mock_log.error.assert_called_once() + self.assertIn("some string error", mock_log.error.call_args[0][0]) + + def test_parse_gcp_error_error_not_a_dict(self): + """Test handling when 'error' value is not a dictionary.""" + from mod_ci.controllers import parse_gcp_error + + mock_log = MagicMock() + result = {'error': 'just a string'} + + error_msg = parse_gcp_error(result, log=mock_log) + self.assertIn("VM creation failed", error_msg) + mock_log.error.assert_called_once() diff --git a/tests/test_ci/test_tasks.py b/tests/test_ci/test_tasks.py new file mode 100644 index 00000000..7190e227 --- /dev/null +++ b/tests/test_ci/test_tasks.py @@ -0,0 +1,128 @@ +"""Unit tests for Celery tasks and related controller functions.""" + +import unittest +from unittest import mock +from unittest.mock import MagicMock, patch + +from flask import g + +from mod_test.models import TestType +from tests.base import BaseTestCase + + +class TestTriggerTestTasks(BaseTestCase): + """Test cases for trigger_test_tasks function.""" + + def test_trigger_test_tasks_disabled_by_default(self): + """Test that tasks are not triggered when USE_CELERY_TASKS is False (default).""" + from mod_ci.controllers import trigger_test_tasks + + # By default, USE_CELERY_TASKS is False + with self.app.app_context(): + # Should not raise exception and should log debug message + trigger_test_tasks([1, 2], 'fake_token') + # Function returns silently when disabled + + def test_trigger_test_tasks_empty_list(self): + """Test that empty list doesn't trigger any tasks.""" + from mod_ci.controllers import trigger_test_tasks + + with patch.dict(self.app.config, {'USE_CELERY_TASKS': True}): + with self.app.app_context(): + # Should return early without error + trigger_test_tasks([], 'fake_token') + + def test_trigger_test_tasks_handles_import_error(self): + """Test graceful handling when Celery module is not available.""" + from mod_ci.controllers import trigger_test_tasks + + with patch.dict(self.app.config, {'USE_CELERY_TASKS': True}): + # Simulate import error by patching the import + with patch.dict('sys.modules', {'mod_ci.tasks': None}): + with self.app.app_context(): + # Should not raise exception + trigger_test_tasks([1, 2], 'fake_token') + + +class TestAddTestEntryReturnsIds(BaseTestCase): + """Test that add_test_entry returns test IDs correctly.""" + + @mock.patch('mod_ci.controllers.g') + @mock.patch('mod_ci.controllers.safe_db_commit') + @mock.patch('mod_ci.controllers.Fork') + @mock.patch('mod_ci.controllers.Test') + def test_add_test_entry_returns_test_ids(self, mock_test, mock_fork, mock_commit, mock_g): + """Test that add_test_entry returns list of created test IDs.""" + from mod_ci.controllers import add_test_entry + + # Setup mocks + mock_g.github = {'repository_owner': 'test', 'repository': 'test'} + mock_fork_obj = MagicMock() + mock_fork_obj.id = 1 + mock_fork.query.filter.return_value.first.return_value = mock_fork_obj + mock_commit.return_value = True + + # Setup mock Test objects to return IDs + mock_linux_test = MagicMock() + mock_linux_test.id = 100 + mock_windows_test = MagicMock() + mock_windows_test.id = 101 + mock_test.side_effect = [mock_linux_test, mock_windows_test] + + mock_db = MagicMock() + + # Call the function with a valid commit hash (40 hex chars) + test_ids = add_test_entry(mock_db, 'a' * 40, TestType.commit) + + # Verify we got a list with 2 IDs + self.assertIsInstance(test_ids, list) + self.assertEqual(len(test_ids), 2) + self.assertEqual(test_ids[0], 100) + self.assertEqual(test_ids[1], 101) + + def test_add_test_entry_invalid_commit_returns_empty(self): + """Test that invalid commit hash returns empty list.""" + from mod_ci.controllers import add_test_entry + + mock_db = MagicMock() + + # Call with invalid commit hash + test_ids = add_test_entry(mock_db, 'invalid_hash', TestType.commit) + + # Verify empty list for invalid commit + self.assertIsInstance(test_ids, list) + self.assertEqual(len(test_ids), 0) + + @mock.patch('mod_ci.controllers.g') + @mock.patch('mod_ci.controllers.safe_db_commit') + @mock.patch('mod_ci.controllers.Fork') + @mock.patch('mod_ci.controllers.Test') + def test_add_test_entry_db_failure_returns_empty(self, mock_test, mock_fork, mock_commit, mock_g): + """Test that db commit failure returns empty list.""" + from mod_ci.controllers import add_test_entry + + # Setup mocks + mock_g.github = {'repository_owner': 'test', 'repository': 'test'} + mock_fork_obj = MagicMock() + mock_fork_obj.id = 1 + mock_fork.query.filter.return_value.first.return_value = mock_fork_obj + mock_commit.return_value = False # Simulate commit failure + + mock_linux_test = MagicMock() + mock_linux_test.id = 100 + mock_windows_test = MagicMock() + mock_windows_test.id = 101 + mock_test.side_effect = [mock_linux_test, mock_windows_test] + + mock_db = MagicMock() + + # Call the function + test_ids = add_test_entry(mock_db, 'a' * 40, TestType.commit) + + # Verify empty list when commit fails + self.assertIsInstance(test_ids, list) + self.assertEqual(len(test_ids), 0) + + +if __name__ == '__main__': + unittest.main()