From c69b4d8908af4bc29e059a1039a3ad9264f6c642 Mon Sep 17 00:00:00 2001 From: Gustavo Flores Date: Thu, 11 Dec 2025 18:58:43 -0300 Subject: [PATCH] refactor: modify ingester to use processes in a 'fan-out' pattern --- backend/kernelCI_app/constants/ingester.py | 3 + .../commands/helpers/aggregation_helpers.py | 33 +- .../commands/helpers/kcidbng_ingester.py | 542 +++++++++-------- .../kcidbng_ingester_test.py | 551 ++---------------- backend/kernelCI_app/typeModels/modelTypes.py | 7 +- 5 files changed, 343 insertions(+), 793 deletions(-) diff --git a/backend/kernelCI_app/constants/ingester.py b/backend/kernelCI_app/constants/ingester.py index 591a7d456..f688aeaf5 100644 --- a/backend/kernelCI_app/constants/ingester.py +++ b/backend/kernelCI_app/constants/ingester.py @@ -38,6 +38,9 @@ See https://prometheus.github.io/client_python/multiprocess/ for more details. """ +INGEST_FILES_BATCH_SIZE = int(os.environ.get("INGEST_FILES_BATCH_SIZE", 100)) +"""Size of the batch of files to be queued. Default: 100""" + try: INGESTER_METRICS_PORT = int(os.environ.get("INGESTER_METRICS_PORT", 8002)) except (ValueError, TypeError): diff --git a/backend/kernelCI_app/management/commands/helpers/aggregation_helpers.py b/backend/kernelCI_app/management/commands/helpers/aggregation_helpers.py index 8bc86e8c0..00945ef13 100644 --- a/backend/kernelCI_app/management/commands/helpers/aggregation_helpers.py +++ b/backend/kernelCI_app/management/commands/helpers/aggregation_helpers.py @@ -55,23 +55,24 @@ def aggregate_checkouts(checkouts_instances: Sequence[Checkouts]) -> None: for checkout in checkouts_instances ] - with connection.cursor() as cursor: - cursor.executemany( - """ - INSERT INTO latest_checkout ( - checkout_id, origin, tree_name, - git_repository_url, git_repository_branch, start_time + if len(values) > 0: + with connection.cursor() as cursor: + cursor.executemany( + """ + INSERT INTO latest_checkout ( + checkout_id, origin, tree_name, + git_repository_url, git_repository_branch, start_time + ) + VALUES (%s, %s, %s, %s, %s, %s) + ON CONFLICT (origin, tree_name, git_repository_url, git_repository_branch) + DO UPDATE SET + start_time = EXCLUDED.start_time, + checkout_id = EXCLUDED.checkout_id + WHERE latest_checkout.start_time < EXCLUDED.start_time + """, + values, ) - VALUES (%s, %s, %s, %s, %s, %s) - ON CONFLICT (origin, tree_name, git_repository_url, git_repository_branch) - DO UPDATE SET - start_time = EXCLUDED.start_time, - checkout_id = EXCLUDED.checkout_id - WHERE latest_checkout.start_time < EXCLUDED.start_time - """, - values, - ) - out(f"inserted {len(checkouts_instances)} checkouts in {time.time() - t0:.3f}s") + out(f"inserted {len(checkouts_instances)} checkouts in {time.time() - t0:.3f}s") def aggregate_tests( diff --git a/backend/kernelCI_app/management/commands/helpers/kcidbng_ingester.py b/backend/kernelCI_app/management/commands/helpers/kcidbng_ingester.py index 0123b0e70..4292753ed 100644 --- a/backend/kernelCI_app/management/commands/helpers/kcidbng_ingester.py +++ b/backend/kernelCI_app/management/commands/helpers/kcidbng_ingester.py @@ -1,16 +1,16 @@ import multiprocessing -from multiprocessing.synchronize import Event as EventClass +from multiprocessing.sharedctypes import Synchronized +from multiprocessing.synchronize import Lock as ProcessLock from os import DirEntry -from concurrent.futures import ProcessPoolExecutor, as_completed import json import logging import os -from queue import Queue, Empty +from queue import Queue from typing_extensions import Literal from 
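# A minimal, standalone sketch of the "keep only the newest checkout per
# branch" upsert that aggregate_checkouts() runs above. The real code executes
# the same statement through Django's connection.cursor() against Postgres
# with %s placeholders; sqlite3 (SQLite >= 3.24 for UPSERT support) is used
# here only so the example runs on its own, and the table/column names simply
# mirror the patch.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE latest_checkout (
        checkout_id TEXT,
        origin TEXT,
        tree_name TEXT,
        git_repository_url TEXT,
        git_repository_branch TEXT,
        start_time REAL,
        UNIQUE (origin, tree_name, git_repository_url, git_repository_branch)
    )
    """
)

values = [
    ("chk-1", "maestro", "mainline", "https://example.org/linux.git", "master", 100.0),
    # Newer start_time for the same branch: replaces chk-1.
    ("chk-2", "maestro", "mainline", "https://example.org/linux.git", "master", 200.0),
    # Older start_time for the same branch: rejected by the WHERE clause.
    ("chk-0", "maestro", "mainline", "https://example.org/linux.git", "master", 50.0),
]

if len(values) > 0:  # same guard the patch adds: skip the query on empty input
    conn.executemany(
        """
        INSERT INTO latest_checkout (
            checkout_id, origin, tree_name,
            git_repository_url, git_repository_branch, start_time
        )
        VALUES (?, ?, ?, ?, ?, ?)
        ON CONFLICT (origin, tree_name, git_repository_url, git_repository_branch)
        DO UPDATE SET
            start_time = excluded.start_time,
            checkout_id = excluded.checkout_id
        WHERE latest_checkout.start_time < excluded.start_time
        """,
        values,
    )

print(conn.execute("SELECT checkout_id, start_time FROM latest_checkout").fetchall())
# -> [('chk-2', 200.0)]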
kernelCI_app.constants.ingester import ( CONVERT_LOG_EXCERPT, INGEST_BATCH_SIZE, - INGEST_FLUSH_TIMEOUT_SEC, + INGEST_FILES_BATCH_SIZE, INGEST_QUEUE_MAXSIZE, INGESTER_GRAFANA_LABEL, VERBOSE, @@ -91,7 +91,7 @@ def standardize_tree_names( def prepare_file_data( file: SubmissionFileMetadata, tree_names: dict[str, str] -) -> tuple[Optional[dict[str, Any]], dict[str, Any]]: +) -> tuple[Optional[dict[str, Any]], Optional[dict[str, Any]]]: """ Prepare file data: read, extract log excerpts, standardize tree names, validate. This function does everything except the actual database load. @@ -173,8 +173,10 @@ def flush_buffers( tests_buf: list[Tests], incidents_buf: list[Incidents], buffer_files: set[tuple[str, str]], - archive_dir: str, - pending_retry_dir: str, + dirs: dict[INGESTER_DIRS, str], + stat_ok: Synchronized, + stat_fail: Synchronized, + counter_lock: ProcessLock, ) -> None: """ Consumes the list of objects and tries to insert them into the database. @@ -205,13 +207,18 @@ def flush_buffers( tests_instances=tests_buf, ) for filename, filepath in buffer_files: - os.rename(filepath, os.path.join(archive_dir, filename)) + os.rename(filepath, os.path.join(dirs["archive"], filename)) + + with counter_lock: + stat_ok.value += len(buffer_files) except Exception as e: logger.error("Error during buffer flush: %s", e) try: for filename, filepath in buffer_files: - os.rename(filepath, os.path.join(pending_retry_dir, filename)) + os.rename(filepath, os.path.join(dirs["failed"], filename)) out("Moved %d files to pending retry directory" % len(buffer_files)) + with counter_lock: + stat_fail.value += len(buffer_files) except OSError as oe: logger.error("OS error during buffer file pending retry move: %s", oe) logger.error("Removing files from buffer set, they should be retried") @@ -240,128 +247,6 @@ def flush_buffers( buffer_files.clear() -# TODO: lower the complexity of this function -def db_worker( # noqa: C901 - stop_event: EventClass, - db_queue: Queue, - archive_dir: str, - pending_retry_dir: str, -) -> None: - """ - Worker process that processes the database queue. - This is the only process that interacts with the database. 
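# flush_buffers() above now reports outcomes through shared process counters
# (multiprocessing.Value) guarded by an explicit Lock instead of returning a
# per-file boolean. A minimal sketch of that counter pattern; the names
# stat_ok / stat_fail / counter_lock mirror the patch, the worker body is a
# stand-in.
import multiprocessing


def report(n_ok, n_fail, stat_ok, stat_fail, counter_lock):
    # Increment inside the lock so concurrent workers cannot interleave the
    # read-modify-write on the shared integers.
    with counter_lock:
        stat_ok.value += n_ok
        stat_fail.value += n_fail


if __name__ == "__main__":
    stat_ok = multiprocessing.Value("i", 0)    # "i": a shared C int starting at 0
    stat_fail = multiprocessing.Value("i", 0)
    counter_lock = multiprocessing.Lock()

    workers = [
        multiprocessing.Process(target=report, args=(3, 1, stat_ok, stat_fail, counter_lock))
        for _ in range(4)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

    print(stat_ok.value, stat_fail.value)  # -> 12 4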
- - Args: - stop_event: multiprocessing.Event (flag) to signal the worker to stop processing - queue: multiprocessing.JoinableQueue to communicate with the worker - """ - - # Local buffers for batching - issues_buf: list[Issues] = [] - checkouts_buf: list[Checkouts] = [] - builds_buf: list[Builds] = [] - tests_buf: list[Tests] = [] - incidents_buf: list[Incidents] = [] - buffer_files: set[tuple[str, str]] = set() - - last_flush_ts = time.time() - - def buffered_total() -> int: - return ( - len(issues_buf) - + len(checkouts_buf) - + len(builds_buf) - + len(tests_buf) - + len(incidents_buf) - ) - - while not stop_event.is_set() or not db_queue.empty(): - try: - item = db_queue.get(timeout=0.1) - if item is None: - db_queue.task_done() - break - try: - filename, filepath, inst = item - if inst is not None: - buffer_files.add((filename, filepath)) - issues_buf.extend(inst["issues"]) - checkouts_buf.extend(inst["checkouts"]) - builds_buf.extend(inst["builds"]) - tests_buf.extend(inst["tests"]) - incidents_buf.extend(inst["incidents"]) - - if buffered_total() >= INGEST_BATCH_SIZE: - flush_buffers( - issues_buf=issues_buf, - checkouts_buf=checkouts_buf, - builds_buf=builds_buf, - tests_buf=tests_buf, - incidents_buf=incidents_buf, - archive_dir=archive_dir, - pending_retry_dir=pending_retry_dir, - buffer_files=buffer_files, - ) - last_flush_ts = time.time() - - if VERBOSE: - msg = ( - "Queued from %s: " - "issues=%d checkouts=%d builds=%d tests=%d incidents=%d" - % ( - filename, - len(inst["issues"]), - len(inst["checkouts"]), - len(inst["builds"]), - len(inst["tests"]), - len(inst["incidents"]), - ) - ) - out(msg) - except Exception as e: - logger.error("Error processing item in db_worker: %s", e) - finally: - db_queue.task_done() - - except Empty: - # Time-based flush when idle - if (time.time() - last_flush_ts) >= INGEST_FLUSH_TIMEOUT_SEC: - if VERBOSE: - out( - "Idle flush after %.1fs without new items (buffered=%d)" - % ( - INGEST_FLUSH_TIMEOUT_SEC, - buffered_total(), - ) - ) - flush_buffers( - issues_buf=issues_buf, - checkouts_buf=checkouts_buf, - builds_buf=builds_buf, - tests_buf=tests_buf, - incidents_buf=incidents_buf, - archive_dir=archive_dir, - pending_retry_dir=pending_retry_dir, - buffer_files=buffer_files, - ) - last_flush_ts = time.time() - continue - except Exception as e: - logger.error("Unexpected error in db_worker: %s", e) - - # Final flush after loop ends - flush_buffers( - issues_buf=issues_buf, - checkouts_buf=checkouts_buf, - builds_buf=builds_buf, - tests_buf=tests_buf, - incidents_buf=incidents_buf, - archive_dir=archive_dir, - pending_retry_dir=pending_retry_dir, - buffer_files=buffer_files, - ) - - MAP_TABLENAMES_TO_COUNTER: dict[TableNames, Counter] = { "checkouts": CHECKOUTS_COUNTER, "issues": ISSUES_COUNTER, @@ -371,60 +256,218 @@ def buffered_total() -> int: } -def process_file( - file: SubmissionFileMetadata, +class SubmissionsInstances(TypedDict): + issues: list[Issues] + checkouts: list[Checkouts] + builds: list[Builds] + tests: list[Tests] + incidents: list[Incidents] + + +def process_batch( + process_queue: Queue, tree_names: dict[str, str], - failed_dir: str, - db_queue: Queue, -) -> bool: - """ - Process a single file in a process, then queue it for database insertion. 
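# process_batch(), whose body continues below, is the fan-out worker: each
# child process repeatedly pulls a whole batch of submission files from the
# shared queue and stops when it receives the None sentinel. A stripped-down
# sketch of that consumer loop; handle_file() is a dummy standing in for
# prepare_file_data() / build_instances_from_submission(), only the
# queue-and-sentinel shape comes from the patch.
import multiprocessing


def handle_file(name):
    return name.upper()  # placeholder for the real per-file work


def batch_worker(q):
    # The real worker first calls django.db.connections.close_all() so every
    # child process opens its own database connection.
    while True:
        batch = q.get()
        if batch is None or len(batch) == 0:
            break  # end of queue
        for name in batch:
            handle_file(name)


if __name__ == "__main__":
    q = multiprocessing.Queue()
    q.put(["a.json", "b.json"])
    q.put(["c.json"])
    q.put(None)

    w = multiprocessing.Process(target=batch_worker, args=(q,))
    w.start()
    w.join()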
+ dirs: dict[INGESTER_DIRS, str], + processed: Synchronized, + stat_ok: Synchronized, + stat_fail: Synchronized, + counter_lock: ProcessLock, +) -> None: + # Ensure that the new process has a unique connection to the database + connections.close_all() - Returns: - True if file was processed or deleted, False if an error occured - """ - data, metadata = prepare_file_data(file, tree_names) + instances_dict: SubmissionsInstances = { + "issues": [], + "checkouts": [], + "builds": [], + "tests": [], + "incidents": [], + } - if "error" in metadata: - try: - move_file_to_failed_dir(file["path"], failed_dir) - except Exception: - pass - return False + buffer_files = set() + + while True: + batch = process_queue.get() - if data is None: - # Empty file, already deleted - return True + if batch is None or len(batch) == 0: + break - db_queue.put( - ( - file["name"], - file["path"], - build_instances_from_submission(data, MAP_TABLENAMES_TO_COUNTER), + for file in batch: + data, metadata = prepare_file_data(file, tree_names) + + if metadata and metadata.get("error"): + try: + move_file_to_failed_dir(file["path"], dirs["failed"]) + except Exception: + pass + with counter_lock: + stat_fail.value += 1 + processed.value += 1 + continue + + if data is None: + with counter_lock: + processed.value += 1 + continue + + with counter_lock: + processed.value += 1 + FILES_INGESTER_COUNTER.labels(ingester=INGESTER_GRAFANA_LABEL).inc() + + instances = build_instances_from_submission(data, MAP_TABLENAMES_TO_COUNTER) + + instances_dict["issues"].extend(instances["issues"]) + instances_dict["checkouts"].extend(instances["checkouts"]) + instances_dict["builds"].extend(instances["builds"]) + instances_dict["tests"].extend(instances["tests"]) + instances_dict["incidents"].extend(instances["incidents"]) + + buffer_files.add((file["name"], file["path"])) + + # Sort instances to prevent deadlocks when multiple transactions update the same rows + instances_dict["issues"].sort(key=lambda x: x.id) + instances_dict["checkouts"].sort(key=lambda x: x.id) + instances_dict["builds"].sort(key=lambda x: x.id) + instances_dict["tests"].sort(key=lambda x: x.id) + instances_dict["incidents"].sort(key=lambda x: x.id) + + flush_buffers( + issues_buf=( + instances_dict["issues"] + if len(instances_dict["issues"]) >= INGEST_BATCH_SIZE + else [] + ), + checkouts_buf=( + instances_dict["checkouts"] + if len(instances_dict["checkouts"]) >= INGEST_BATCH_SIZE + else [] + ), + builds_buf=( + instances_dict["builds"] + if len(instances_dict["builds"]) >= INGEST_BATCH_SIZE + else [] + ), + tests_buf=( + instances_dict["tests"] + if len(instances_dict["tests"]) >= INGEST_BATCH_SIZE + else [] + ), + incidents_buf=( + instances_dict["incidents"] + if len(instances_dict["incidents"]) >= INGEST_BATCH_SIZE + else [] + ), + buffer_files=buffer_files, + dirs=dirs, + stat_ok=stat_ok, + stat_fail=stat_fail, + counter_lock=counter_lock, ) - ) - FILES_INGESTER_COUNTER.labels(INGESTER_GRAFANA_LABEL).inc() - return True + if any(len(instances_dict[table]) for table in instances_dict): # type: ignore + out("Process finished, flushing remaining buffers") + flush_buffers( + issues_buf=instances_dict["issues"], + checkouts_buf=instances_dict["checkouts"], + builds_buf=instances_dict["builds"], + tests_buf=instances_dict["tests"], + incidents_buf=instances_dict["incidents"], + buffer_files=buffer_files, + dirs=dirs, + stat_ok=stat_ok, + stat_fail=stat_fail, + counter_lock=counter_lock, + ) -def ingest_submissions_parallel( # noqa: C901 - orchestrator with IO + threading 
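# The worker above sorts every buffered instance list by primary key before
# flushing. With several processes upserting overlapping rows, a single
# deterministic lock order (ascending id) means transactions may wait on each
# other but cannot form a lock cycle, i.e. no deadlock. A tiny sketch of that
# ordering step with a stand-in dataclass instead of the Django models.
from dataclasses import dataclass


@dataclass
class Row:
    id: str
    payload: str


def order_for_upsert(rows):
    # Every transaction acquires row locks in the same ascending-id sequence.
    return sorted(rows, key=lambda r: r.id)


batch_a = [Row("build:9", "..."), Row("build:2", "..."), Row("build:5", "...")]
batch_b = [Row("build:5", "..."), Row("build:9", "...")]

print([r.id for r in order_for_upsert(batch_a)])  # ['build:2', 'build:5', 'build:9']
print([r.id for r in order_for_upsert(batch_b)])  # ['build:5', 'build:9']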
+def print_ingest_progress( + processed: int, + total_files: int, + total_bytes: int, + stat_ok: int, + stat_fail: int, + elapsed: float, + queue_size: int, +) -> None: + """ + Print a report of the ingestion process. + """ + files_per_sec = total_files / elapsed if elapsed > 0 else 0.0 + mb = total_bytes / (1024 * 1024) + mb_per_sec = mb / elapsed if elapsed > 0 else 0.0 + rate = processed / elapsed if elapsed > 0 else 0.0 + remaining = total_files - processed + eta = remaining / rate if rate > 0 else float("inf") + + if remaining > 0: + msg = ( + "Progress: %d/%d files (ok=%d, fail=%d) | " + "%.2fs elapsed | %.1f files/s | ETA %.1fs | Queue size: %d" + % ( + processed, + total_files, + stat_ok, + stat_fail, + elapsed, + rate, + eta, + queue_size, + ) + ) + else: + msg = ( + "Ingest cycle: %d files (ok=%d, fail=%d) in %.2fs | " + "%.2f files/s | %.2f MB processed (%.2f MB/s)" + % ( + total_files, + stat_ok, + stat_fail, + elapsed, + files_per_sec, + mb, + mb_per_sec, + ) + ) + out(msg) + + +def ingest_submissions_parallel( # noqa: C901 - orchestrator with IO + multiprocessing json_files: list[DirEntry[str]], tree_names: dict[str, str], dirs: dict[INGESTER_DIRS, str], max_workers: int = 5, ) -> None: """ - Ingest submissions in parallel using ThreadPoolExecutor for I/O operations - and a single database worker thread. + Ingest submissions in parallel using child processes for I/O and database operations. """ + cycle_start = time.time() total_bytes = 0 - for f in json_files: + total_files_count = len(json_files) + + process_queue: multiprocessing.Queue[Optional[list[SubmissionFileMetadata]]] = ( + multiprocessing.Queue(maxsize=INGEST_QUEUE_MAXSIZE) + ) + + batch = [] + for file in json_files: try: - total_bytes += f.stat().st_size + total_bytes += file.stat().st_size except Exception: pass + batch.append( + SubmissionFileMetadata( + path=file.path, + name=file.name, + size=file.stat().st_size, + ) + ) + + batch_len = len(batch) + if batch_len >= INGEST_FILES_BATCH_SIZE or batch_len >= total_files_count: + process_queue.put(batch) + batch = [] + out( "Spool status: %d .json files queued (%.2f MB)" % ( @@ -433,130 +476,65 @@ def ingest_submissions_parallel( # noqa: C901 - orchestrator with IO + threadin ) ) - cycle_start = time.time() - total_files_count = len(json_files) - - manager = multiprocessing.Manager() - db_queue = manager.JoinableQueue(maxsize=INGEST_QUEUE_MAXSIZE) - - # Start database worker process - # This process will constantly consume the db_queue and send data to the database - stop_event = multiprocessing.Event() - db_process = multiprocessing.Process( - target=db_worker, - args=( - stop_event, - db_queue, - dirs["archive"], - dirs["pending_retry"], - ), - ) - db_process.start() - - stat_ok = 0 - stat_fail = 0 - - processed = 0 + stat_ok = multiprocessing.Value("i", 0) + stat_fail = multiprocessing.Value("i", 0) + counter_lock = multiprocessing.Lock() + processed = multiprocessing.Value("i", 0) last_progress = cycle_start - progress_every_n = 200 progress_every_sec = 2.0 + writers = [] try: - # Process files in parallel - with ProcessPoolExecutor(max_workers=max_workers) as executor: - # Submit all files for processing - future_to_file = { - executor.submit( - process_file, - {"path": file.path, "name": file.name, "size": file.stat().st_size}, + for _ in range(max_workers): + writer = multiprocessing.Process( + target=process_batch, + args=( + process_queue, tree_names, - dirs["failed"], - db_queue, - ): file.name - for file in json_files - } - - # Collect results 
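# The orchestrator splits the spool listing into fixed-size batches controlled
# by INGEST_FILES_BATCH_SIZE and puts each batch on the shared queue. A
# minimal sketch of that chunking step; the trailing flush ensures a final
# batch smaller than the batch size is still queued (enqueue_in_batches and
# the thread-safe queue.Queue here are illustrative, not the patch's code).
from queue import Queue

INGEST_FILES_BATCH_SIZE = 100  # mirrors the new constant (env-overridable)


def enqueue_in_batches(names, q):
    batch = []
    queued = 0
    for name in names:
        batch.append(name)
        if len(batch) >= INGEST_FILES_BATCH_SIZE:
            q.put(batch)
            queued += 1
            batch = []
    if batch:  # do not drop the remainder
        q.put(batch)
        queued += 1
    return queued


q = Queue()
print(enqueue_in_batches([f"{i}.json" for i in range(250)], q))  # -> 3 batches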
progressively - for future in as_completed(future_to_file): - filename = future_to_file[future] - try: - result = future.result() - if result: - stat_ok += 1 - else: - stat_fail += 1 - except Exception as e: - logger.error("Exception processing %s: %s", filename, e) - stat_fail += 1 - finally: - processed += 1 - now = time.time() - if ( - processed % progress_every_n == 0 - or (now - last_progress) >= progress_every_sec - ): - elapsed = now - cycle_start - rate = processed / elapsed if elapsed > 0 else 0.0 - remaining = total_files_count - processed - eta = remaining / rate if rate > 0 else float("inf") - try: - qsz = db_queue.qsize() - except Exception: - qsz = -1 - msg = ( - "Progress: %d/%d files (ok=%d, fail=%d) | " - "%.2fs elapsed | %.1f files/s | ETA %.1fs | db_queue=%d" - % ( - processed, - total_files_count, - stat_ok, - stat_fail, - elapsed, - rate, - eta, - qsz, - ) - ) - out(msg) - last_progress = now - - out("Waiting for DB queue to drain... size=%d" % db_queue.qsize()) - # Wait for all database operations to complete - db_queue.join() + dirs, + processed, + stat_ok, + stat_fail, + counter_lock, + ), + ) + writers.append(writer) + writer.start() + process_queue.put(None) # Poison pill to signal the end of the queue + + while not process_queue.empty(): + if time.time() - last_progress > progress_every_sec: + print_ingest_progress( + processed.value, + total_files_count, + total_bytes, + stat_ok.value, + stat_fail.value, + time.time() - cycle_start, + process_queue.qsize(), + ) + last_progress = time.time() + time.sleep(1) + for writer in writers: + writer.join() except KeyboardInterrupt: - out("KeyboardInterrupt: stopping ingestion and flushing...") - try: - # Attempt to cancel remaining futures and exit early - # Note: this only cancels tasks not yet started - pass - finally: - raise - finally: - # Signal database worker to stop - stop_event.set() - db_queue.put(None) # Poison pill - db_process.join() + out("\nKeyboardInterrupt: terminating workers...") + for writer in writers: + if writer.is_alive(): + writer.terminate() + for writer in writers: + writer.join() + out("Workers terminated.") elapsed = time.time() - cycle_start - total_files = stat_ok + stat_fail - if total_files > 0: - files_per_sec = total_files / elapsed if elapsed > 0 else 0.0 - mb = total_bytes / (1024 * 1024) - mb_per_sec = mb / elapsed if elapsed > 0 else 0.0 - msg = ( - "Ingest cycle: %d files (ok=%d, fail=%d) in %.2fs | " - "%.2f files/s | %.2f MB processed (%.2f MB/s)" - % ( - total_files, - stat_ok, - stat_fail, - elapsed, - files_per_sec, - mb, - mb_per_sec, - ) - ) - out(msg) - else: - out("No files processed, nothing to do") + total_files = total_files_count + print_ingest_progress( + processed.value, + total_files, + total_bytes, + stat_ok.value, + stat_fail.value, + elapsed, + process_queue.qsize(), + ) diff --git a/backend/kernelCI_app/tests/unitTests/commands/monitorSubmissions/kcidbng_ingester_test.py b/backend/kernelCI_app/tests/unitTests/commands/monitorSubmissions/kcidbng_ingester_test.py index 6dd045786..a4c14eed8 100644 --- a/backend/kernelCI_app/tests/unitTests/commands/monitorSubmissions/kcidbng_ingester_test.py +++ b/backend/kernelCI_app/tests/unitTests/commands/monitorSubmissions/kcidbng_ingester_test.py @@ -1,13 +1,7 @@ -import threading -from queue import Queue from unittest.mock import patch, MagicMock, mock_open, call -from concurrent.futures import Future from kernelCI_app.tests.unitTests.helpers.fixtures.kcidbng_ingester_data import ( ARCHIVE_SUBMISSIONS_DIR, - 
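# Taken together, the new orchestrator fans work out to max_workers child
# processes, feeds them through one queue, and shuts them down with sentinels,
# terminating stragglers on Ctrl-C. A self-contained sketch of that shape with
# dummy work; the usual fan-out convention shown here is one sentinel per
# worker so every consumer sees its own end-of-queue marker.
import multiprocessing
import time


def worker(q):
    while True:
        batch = q.get()
        if batch is None:
            break
        time.sleep(0.01)  # placeholder for real per-batch processing


def run(batches, max_workers=2):
    q = multiprocessing.Queue()
    for b in batches:
        q.put(b)

    writers = [multiprocessing.Process(target=worker, args=(q,)) for _ in range(max_workers)]
    for w in writers:
        w.start()
    for _ in writers:
        q.put(None)  # one sentinel per worker

    try:
        for w in writers:
            w.join()
    except KeyboardInterrupt:
        # Mirrors the patch's Ctrl-C handling: terminate whatever is still
        # alive, then join so no child process is left behind.
        for w in writers:
            if w.is_alive():
                w.terminate()
        for w in writers:
            w.join()


if __name__ == "__main__":
    run([["a.json"], ["b.json"], ["c.json"]])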
FAILED_SUBMISSIONS_DIR, - PENDING_RETRY_SUBMISSIONS_DIR, - FLUSH_TIMEOUT_SEC_MOCK, INGEST_BATCH_SIZE_MOCK, MAINLINE_URL, SUBMISSION_DIRS_MOCK, @@ -20,7 +14,6 @@ SUBMISSION_FILENAME_MOCK, ) import pytest -from queue import Empty from kernelCI_app.management.commands.helpers.kcidbng_ingester import ( SubmissionFileMetadata, @@ -28,8 +21,6 @@ prepare_file_data, consume_buffer, flush_buffers, - db_worker, - process_file, ingest_submissions_parallel, ) @@ -281,8 +272,10 @@ def test_flush_buffers_empty_buffers(self, mock_rename, mock_consume): tests_buf=[], incidents_buf=[], buffer_files={}, - archive_dir=ARCHIVE_SUBMISSIONS_DIR, - pending_retry_dir=PENDING_RETRY_SUBMISSIONS_DIR, + dirs=SUBMISSION_DIRS_MOCK, + stat_ok=MagicMock(), + stat_fail=MagicMock(), + counter_lock=MagicMock(), ) mock_consume.assert_not_called() @@ -314,6 +307,12 @@ def test_flush_buffers_with_items( incidents_buf = [] buffer_files = {(SUBMISSION_FILENAME_MOCK, SUBMISSION_FILEPATH_MOCK)} + stat_ok = MagicMock() + stat_ok.value = 0 + stat_fail = MagicMock() + stat_fail.value = 0 + counter_lock = MagicMock() + n_issues = len(issues_buf) n_checkouts = len(checkouts_buf) n_builds = len(builds_buf) @@ -328,8 +327,10 @@ def test_flush_buffers_with_items( tests_buf=tests_buf, incidents_buf=incidents_buf, buffer_files=buffer_files, - archive_dir=ARCHIVE_SUBMISSIONS_DIR, - pending_retry_dir=PENDING_RETRY_SUBMISSIONS_DIR, + dirs=SUBMISSION_DIRS_MOCK, + stat_ok=stat_ok, + stat_fail=stat_fail, + counter_lock=counter_lock, ) expected_calls = [ @@ -341,6 +342,10 @@ def test_flush_buffers_with_items( ] mock_consume.assert_has_calls(expected_calls) + # Verify stat_ok update + # counter_lock enter/exit called + assert counter_lock.__enter__.call_count >= 1 + # Inside of the function the `out` is called before the clear # So we check against the len of the original buffers mock_out.assert_called_once_with( @@ -413,6 +418,10 @@ def test_flush_buffers_with_db_error( mock_consume.side_effect = Exception("Database error") + stat_ok = MagicMock() + stat_fail = MagicMock() + counter_lock = MagicMock() + flush_buffers( issues_buf=issues_buf, checkouts_buf=checkouts_buf, @@ -420,8 +429,10 @@ def test_flush_buffers_with_db_error( tests_buf=tests_buf, incidents_buf=incidents_buf, buffer_files=buffer_files, - archive_dir=ARCHIVE_SUBMISSIONS_DIR, - pending_retry_dir=PENDING_RETRY_SUBMISSIONS_DIR, + dirs=SUBMISSION_DIRS_MOCK, + stat_ok=stat_ok, + stat_fail=stat_fail, + counter_lock=counter_lock, ) # Doesn't expect to consume all buffers since the first one raises error and skips the rest @@ -457,7 +468,7 @@ def test_flush_buffers_with_db_error( mock_rename.assert_called_once_with( SUBMISSION_FILEPATH_MOCK, - "/".join([PENDING_RETRY_SUBMISSIONS_DIR, SUBMISSION_FILENAME_MOCK]), + "/".join([SUBMISSION_DIRS_MOCK["failed"], SUBMISSION_FILENAME_MOCK]), ) assert mock_time.call_count == 2 @@ -468,378 +479,59 @@ def test_flush_buffers_with_db_error( mock_aggregate.assert_not_called() -class TestDbWorker: - """Test cases for db_worker function.""" - - # Test cases: - # - stop event is set (in a real scenario it would be set by another thread) - # - less items than the limit for flushing - # - more items than the limit for flushing - # - time-based flush when idle - - @patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.flush_buffers") - @patch("time.time", return_value=1000) - def test_db_worker_stop_event(self, mock_time, mock_flush): - """Test db_worker with stop event set.""" - test_queue = Queue() - - stop_event = threading.Event() - 
stop_event.set() - - db_worker( - stop_event, - test_queue, - archive_dir=ARCHIVE_SUBMISSIONS_DIR, - pending_retry_dir=PENDING_RETRY_SUBMISSIONS_DIR, - ) - - assert test_queue.empty() - assert test_queue.all_tasks_done - assert mock_time.call_count == 1 - mock_flush.assert_called_once() - - @patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.flush_buffers") - @patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.VERBOSE", False) - @patch( - "kernelCI_app.management.commands.helpers.kcidbng_ingester.INGEST_BATCH_SIZE", - 10, - ) - @patch("time.time", return_value=1000) - def test_db_worker_with_item_then_none(self, mock_time, mock_flush): - """Test db_worker processes one item and then stops with None poison pill.""" - test_queue = Queue() - stop_event = threading.Event() - - mock_instances = { - "issues": [MagicMock()], - "checkouts": [MagicMock(), MagicMock()], - "builds": [], - "tests": [MagicMock()], - "incidents": [], - } - test_queue.put( - (SUBMISSION_FILENAME_MOCK, SUBMISSION_FILEPATH_MOCK, mock_instances) - ) - test_queue.put(None) - - db_worker( - stop_event, - test_queue, - archive_dir=ARCHIVE_SUBMISSIONS_DIR, - pending_retry_dir=PENDING_RETRY_SUBMISSIONS_DIR, - ) - - assert test_queue.empty() - assert mock_time.call_count == 1 - - # The only flush is the last one after receiving None - mock_flush.assert_called_once() - - @patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.flush_buffers") - @patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.out") - @patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.VERBOSE", True) - @patch( - "kernelCI_app.management.commands.helpers.kcidbng_ingester.INGEST_BATCH_SIZE", 5 - ) - @patch("time.time", return_value=TIME_MOCK) - def test_db_worker_flush_on_batch_size_limit(self, mock_time, mock_out, mock_flush): - """Test db_worker flushes when batch size limit is reached (5 individual items).""" - test_queue = Queue() - stop_event = threading.Event() - - mock_instances_1 = { - "issues": [MagicMock()], - "checkouts": [MagicMock(), MagicMock()], - "builds": [], - "tests": [], - "incidents": [], - } - mock_instances_2 = { - "issues": [], - "checkouts": [], - "builds": [MagicMock()], - "tests": [MagicMock(), MagicMock()], - "incidents": [], - } - test_queue.put(("test1.json", SUBMISSION_FILEPATH_MOCK, mock_instances_1)) - test_queue.put(("test2.json", SUBMISSION_FILEPATH_MOCK, mock_instances_2)) - test_queue.put(None) - - # NOTE: On a normal run, the list of items would be cleared from the flush, - # meaning that we could add more items before the next flush, but since we're - # mocking flush_buffers, the lists are not cleared - db_worker( - stop_event, - test_queue, - archive_dir=ARCHIVE_SUBMISSIONS_DIR, - pending_retry_dir=PENDING_RETRY_SUBMISSIONS_DIR, - ) - - out_calls = [ - "Queued from test1.json: issues=1 checkouts=2 builds=0 tests=0 incidents=0", - "Queued from test2.json: issues=0 checkouts=0 builds=1 tests=2 incidents=0", - ] - mock_out.assert_has_calls([call(out_calls[0]), call(out_calls[1])]) - assert test_queue.empty() - # 2 flushes: one after reaching limit, one at the end - assert mock_flush.call_count == 2 - assert mock_time.call_count == 2 - - @patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.flush_buffers") - @patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.VERBOSE", True) - @patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.out") - @patch( - 
"kernelCI_app.management.commands.helpers.kcidbng_ingester.INGEST_FLUSH_TIMEOUT_SEC", - FLUSH_TIMEOUT_SEC_MOCK, - ) - @patch( - "kernelCI_app.management.commands.helpers.kcidbng_ingester.INGEST_BATCH_SIZE", - 10, - ) - @patch("time.time", side_effect=TIME_MOCK) - def test_db_worker_time_based_flush(self, mock_time, mock_out, mock_flush): - """Test db_worker triggers flush after timeout when idle.""" - stop_event = threading.Event() - - mock_instances = { - "issues": [MagicMock()], - "checkouts": [], - "builds": [], - "tests": [], - "incidents": [], - } - n_issues_submission = len(mock_instances["issues"]) - n_checkouts_submission = len(mock_instances["checkouts"]) - n_builds_submission = len(mock_instances["builds"]) - n_tests_submission = len(mock_instances["tests"]) - n_incidents_submission = len(mock_instances["incidents"]) - total_items_submission = ( - n_issues_submission - + n_checkouts_submission - + n_builds_submission - + n_tests_submission - + n_incidents_submission - ) - - # Mock the queue.get() to return the item, then Empty exception, then None (poison pill) - # The poison pill is needed otherwise the worker would run indefinitely - mock_queue = MagicMock() - mock_queue.get.side_effect = [ - (SUBMISSION_FILENAME_MOCK, SUBMISSION_FILEPATH_MOCK, mock_instances), - Empty(), - None, - ] - - db_worker( - stop_event, - mock_queue, - archive_dir=ARCHIVE_SUBMISSIONS_DIR, - pending_retry_dir=PENDING_RETRY_SUBMISSIONS_DIR, - ) - - # 2 flushes: one after timeout, one at the end - assert mock_flush.call_count == 2 - assert mock_time.call_count == 3 - - out_calls = [ - "Queued from %s: " - "issues=%d checkouts=%d builds=%d tests=%d incidents=%d" - % ( - SUBMISSION_FILENAME_MOCK, - n_issues_submission, - n_checkouts_submission, - n_builds_submission, - n_tests_submission, - n_incidents_submission, - ), - "Idle flush after %.1fs without new items (buffered=%d)" - % (FLUSH_TIMEOUT_SEC_MOCK, total_items_submission), - ] - mock_out.assert_has_calls([call(out_calls[0]), call(out_calls[1])]) - - -class TestProcessFile: - """Test cases for process_file function.""" - - # Test cases: - # - error in metadata with successful move to failed dir - # - error in metadata with exception when moving to failed dir - # - no error but empty data - # - process successfully - - @patch( - "kernelCI_app.management.commands.helpers.kcidbng_ingester.move_file_to_failed_dir" - ) - @patch( - "kernelCI_app.management.commands.helpers.kcidbng_ingester.prepare_file_data" - ) - def test_process_file_move_to_failed_dir(self, mock_prepare, mock_move_failed): - """Test process_file with failed submission and correct move to failed dir.""" - mock_file = SubmissionFileMetadata( - name=SUBMISSION_FILENAME_MOCK, - path=SUBMISSION_PATH_MOCK, - size=100, - ) - - mock_metadata = {"file": mock_file, "error": "Any error"} - mock_prepare.return_value = (None, mock_metadata) - - test_queue = Queue() - result = process_file(mock_file, {}, FAILED_SUBMISSIONS_DIR, test_queue) - - assert result is False - mock_move_failed.assert_called_once() - - @patch( - "kernelCI_app.management.commands.helpers.kcidbng_ingester.move_file_to_failed_dir", - ) - @patch( - "kernelCI_app.management.commands.helpers.kcidbng_ingester.prepare_file_data" - ) - def test_process_file_prepare_error_with_move_exception( - self, mock_prepare, mock_move_failed - ): - """Test process_file with failed submission and exception when moving to failed dir.""" - mock_file = SubmissionFileMetadata( - name=SUBMISSION_FILENAME_MOCK, - path=SUBMISSION_PATH_MOCK, - size=100, - ) 
- - mock_metadata = {"file": mock_file, "error": "Any error"} - mock_prepare.return_value = (None, mock_metadata) - - mock_move_failed.side_effect = Exception("Any Exception error") - - test_queue = Queue() - result = process_file(mock_file, {}, FAILED_SUBMISSIONS_DIR, test_queue) - - # Same asserts as before because currently Exceptions are not treated - assert result is False - mock_move_failed.assert_called_once() - - @patch( - "kernelCI_app.management.commands.helpers.kcidbng_ingester.move_file_to_failed_dir", - ) - @patch( - "kernelCI_app.management.commands.helpers.kcidbng_ingester.prepare_file_data" - ) - def test_process_file_empty_file(self, mock_prepare, mock_move_failed): - """Test process_file with empty file (already deleted).""" - mock_file = SubmissionFileMetadata( - name=SUBMISSION_FILENAME_MOCK, - path=SUBMISSION_PATH_MOCK, - size=0, - ) - mock_metadata = {} - mock_prepare.return_value = (None, mock_metadata) - - test_queue = Queue() - result = process_file(mock_file, {}, FAILED_SUBMISSIONS_DIR, test_queue) - - assert result is True - mock_move_failed.assert_not_called() - - @patch( - "kernelCI_app.management.commands.helpers.kcidbng_ingester.build_instances_from_submission" - ) - @patch( - "kernelCI_app.management.commands.helpers.kcidbng_ingester.prepare_file_data" - ) - def test_process_file_success(self, mock_prepare, mock_build_instances): - """Test successful file processing.""" - mock_file = SubmissionFileMetadata( - name=SUBMISSION_FILENAME_MOCK, - path=SUBMISSION_PATH_MOCK, - fsize=100, - ) - - mock_metadata = {"file": mock_file, "fsize": 100} - mock_prepare.return_value = (mock_file, mock_metadata) - - mock_instances_from_submission = { - "issues": [MagicMock()], - "checkouts": [], - "builds": [], - "tests": [MagicMock()], - "incidents": [], - } - mock_build_instances.return_value = mock_instances_from_submission - - test_queue = Queue() - result = process_file(mock_file, {}, FAILED_SUBMISSIONS_DIR, test_queue) - - assert result is True - - # Check that item was queued - assert not test_queue.empty() - queued_item = test_queue.get() - assert queued_item == ( - mock_file["name"], - mock_file["path"], - mock_instances_from_submission, - ) - - class TestIngestSubmissionsParallel: """Test cases for ingest_submissions_parallel function.""" # Test cases: # - successful ingestion - # - processing exception - # - KeyboardInterrupt during ingestion mock_file1 = MagicMock() mock_file1.name = SUBMISSION_FILENAME_MOCK mock_file1.stat.return_value.st_size = 1000 @patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.out") - @patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.as_completed") - @patch( - "kernelCI_app.management.commands.helpers.kcidbng_ingester.ProcessPoolExecutor" - ) @patch("multiprocessing.Process") + @patch("multiprocessing.Queue") + @patch("multiprocessing.Value") + @patch("time.sleep") @patch("time.time", side_effect=TIME_MOCK) def test_ingest_submissions_parallel_success( self, mock_time, + mock_sleep, + mock_value, + mock_queue_cls, mock_process, - mock_executor, - mock_as_completed, mock_out, ): """Test successful parallel ingestion.""" - # Mock files and related data + mock_queue = MagicMock() + mock_queue_cls.return_value = mock_queue + + mock_queue.empty.return_value = True + mock_queue.qsize.return_value = 0 + + self.mock_file1.path = SUBMISSION_FILEPATH_MOCK + SUBMISSION_FILENAME_MOCK mock_file2 = MagicMock() mock_file2.name = "file2.json" + mock_file2.path = SUBMISSION_FILEPATH_MOCK + "file2.json" 
mock_file2.stat.return_value.st_size = 2000 json_files = [self.mock_file1, mock_file2] + mock_ok = MagicMock() + mock_ok.value = 1 + mock_fail = MagicMock() + mock_fail.value = 1 + mock_processed = MagicMock() + mock_processed.value = len(json_files) + + mock_value.side_effect = [mock_ok, mock_fail, mock_processed] + # Mock the process mock_process_instance = MagicMock() mock_process.return_value = mock_process_instance - # Mock the futures and executor - future1 = Future() - future1.set_result(True) - - future2 = Future() - future2.set_result(False) - - # Since we are using `with` in the function, we have to mock the ThreadPoolExecutor CLASS itself - # After that, the class will call the __enter__ function and will return something else, so we - # also have to mock that something else as the mock_executor_instance. - # Then the instance will call a method (submit), so we need to mock that return too. - mock_executor_instance = MagicMock() - mock_executor_instance.submit.side_effect = [future1, future2] - mock_executor.return_value.__enter__.return_value = mock_executor_instance - - # Since as_completed is a separate function, we also have to mock its behavior, and make it return - # the futures that were set here, so that we can also control the return value of the future and - # match it with the executor which will construct the `future_to_file` dict - mock_as_completed.return_value = [future1, future2] - ingest_submissions_parallel( json_files=json_files, tree_names={}, @@ -847,158 +539,31 @@ def test_ingest_submissions_parallel_success( max_workers=2, ) - assert mock_time.call_count == 4 + # Verify process was started and joined + assert mock_process_instance.start.call_count == 2 + assert mock_process_instance.join.call_count == 2 - # Verify thread was started and joined - mock_process_instance.start.assert_called_once() - mock_process_instance.join.assert_called_once() - - # Verify output messages stat_ok = 1 stat_fail = 1 - total_processed_files = stat_ok + stat_fail - total_elapsed = 3 + total_elapsed = 1 + total_bytes = ( self.mock_file1.stat.return_value.st_size + mock_file2.stat.return_value.st_size ) mb = total_bytes / (1024 * 1024) - # Only asserts the number and the last message in order to avoid race condition - assert mock_out.call_count == 4 - mock_out.assert_called_with( - "Ingest cycle: %d files (ok=%d, fail=%d) in %.2fs | " - "%.2f files/s | %.2f MB processed (%.2f MB/s)" - % ( - total_processed_files, - stat_ok, - stat_fail, - total_elapsed, - total_processed_files / total_elapsed, - mb, - mb / total_elapsed, - ), - ) - - @patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.out") - @patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.as_completed") - @patch( - "kernelCI_app.management.commands.helpers.kcidbng_ingester.ProcessPoolExecutor" - ) - @patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.logger") - @patch("multiprocessing.Process") - @patch("time.time", side_effect=TIME_MOCK) - def test_ingest_submissions_parallel_processing_exception( - self, - mock_time, - mock_process, - mock_logger, - mock_executor, - mock_as_completed, - mock_out, - ): - """Test ingestion with processing exception.""" - # Mock the thread - mock_process_instance = MagicMock() - mock_process.return_value = mock_process_instance - - # Mock the futures and executor - future_result = Exception("Processing error") - future1 = MagicMock() - # Use side_effect to make result() raise the exception - future1.result.side_effect = future_result - - 
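# The rewritten success test above swaps the real multiprocessing primitives
# for mocks so the orchestrator runs single-process and the shared counters
# can be asserted directly. A minimal sketch of that patching technique
# against a hypothetical count_up() helper; only the use of
# patch("multiprocessing.Value") mirrors the test, every other name is made up.
from unittest.mock import MagicMock, patch


def count_up():
    # Stand-in for production code that creates and bumps a shared counter.
    import multiprocessing

    counter = multiprocessing.Value("i", 0)
    counter.value += 1
    return counter.value


@patch("multiprocessing.Value")
def test_count_up(mock_value):
    fake = MagicMock()
    fake.value = 41
    mock_value.return_value = fake
    assert count_up() == 42  # the mocked shared value was incremented, no real IPC


test_count_up()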
mock_executor_instance = MagicMock() - mock_executor_instance.submit.side_effect = [future1] - mock_executor.return_value.__enter__.return_value = mock_executor_instance - - mock_as_completed.return_value = [future1] - - ingest_submissions_parallel( - json_files=[self.mock_file1], - tree_names={}, - dirs=SUBMISSION_DIRS_MOCK, - max_workers=2, - ) - - mock_logger.error.assert_called_once_with( - "Exception processing %s: %s", self.mock_file1.name, future_result - ) - - assert mock_time.call_count == 3 - - # Verify thread was started and joined - mock_process_instance.start.assert_called_once() - mock_process_instance.join.assert_called_once() - - # Verify output messages - stat_ok = 0 - stat_fail = 1 - total_processed_files = stat_ok + stat_fail - total_elapsed = 2 - total_bytes = self.mock_file1.stat.return_value.st_size - mb = total_bytes / (1024 * 1024) - - # Only asserts the number and the last message in order to avoid race condition - assert mock_out.call_count == 3 + # We verify only the final call mock_out.assert_called_with( "Ingest cycle: %d files (ok=%d, fail=%d) in %.2fs | " "%.2f files/s | %.2f MB processed (%.2f MB/s)" % ( - total_processed_files, + len(json_files), stat_ok, stat_fail, total_elapsed, - total_processed_files / total_elapsed, + len(json_files) / total_elapsed, mb, mb / total_elapsed, ), ) - - @patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.out") - @patch( - "kernelCI_app.management.commands.helpers.kcidbng_ingester.ProcessPoolExecutor" - ) - @patch("multiprocessing.Process") - @patch("time.time", side_effect=TIME_MOCK) - def test_ingest_submissions_keyboard_interruption( - self, - mock_time, - mock_process, - mock_executor, - mock_out, - ): - """Test parallel ingestion with keyboard interruption.""" - json_files = [self.mock_file1] - - # Mock the thread - mock_process_instance = MagicMock() - mock_process.return_value = mock_process_instance - - # Mock executor to raise KeyboardInterrupt on submit - mock_executor_instance = MagicMock() - mock_executor_instance.submit.side_effect = KeyboardInterrupt() - mock_executor.return_value.__enter__.return_value = mock_executor_instance - - with pytest.raises(KeyboardInterrupt): - ingest_submissions_parallel( - json_files=json_files, - tree_names={}, - dirs=SUBMISSION_DIRS_MOCK, - max_workers=2, - ) - - assert mock_time.call_count == 1 - - # Verify thread was started and joined - mock_process_instance.start.assert_called_once() - mock_process_instance.join.assert_called_once() - - # Verify output messages - total_bytes = self.mock_file1.stat.return_value.st_size - out_calls = [ - "Spool status: %d .json files queued (%.2f MB)" - % (len(json_files), total_bytes / (1024 * 1024)), - "KeyboardInterrupt: stopping ingestion and flushing...", - ] - mock_out.assert_has_calls([call(c) for c in out_calls]) diff --git a/backend/kernelCI_app/typeModels/modelTypes.py b/backend/kernelCI_app/typeModels/modelTypes.py index 0369e2ddf..3ae1398c4 100644 --- a/backend/kernelCI_app/typeModels/modelTypes.py +++ b/backend/kernelCI_app/typeModels/modelTypes.py @@ -1,10 +1,13 @@ -from typing import Literal +from typing import Literal, Type from kernelCI_app.models import Builds, Checkouts, Incidents, Issues, Tests type TableNames = Literal["issues", "checkouts", "builds", "tests", "incidents"] type TableModels = Issues | Checkouts | Builds | Tests | Incidents +type TableModelsClass = Type[Issues] | Type[Checkouts] | Type[Builds] | Type[ + Tests +] | Type[Incidents] -MODEL_MAP: dict[TableNames, TableModels] = { +MODEL_MAP: 
dict[TableNames, TableModelsClass] = { "issues": Issues, "checkouts": Checkouts, "builds": Builds,