diff --git a/collector_db/StatementComposer.py b/collector_db/StatementComposer.py
index a84df5a1..d108a3fa 100644
--- a/collector_db/StatementComposer.py
+++ b/collector_db/StatementComposer.py
@@ -17,18 +17,22 @@ class StatementComposer:
 
     @staticmethod
     def pending_urls_without_html_data() -> Select:
-        subquery = (select(1).
+        exclude_subquery = (select(1).
                     select_from(LinkTaskURL).
                     join(Task, LinkTaskURL.task_id == Task.id).
                     where(LinkTaskURL.url_id == URL.id).
                     where(Task.task_type == TaskType.HTML.value).
                     where(Task.task_status == BatchStatus.COMPLETE.value)
         )
-
-        query = select(URL).where(
-            ~exists(subquery)
+        query = (
+            select(URL).
+            outerjoin(URLHTMLContent).
+            where(URLHTMLContent.id == None).
+            where(~exists(exclude_subquery)).
+            where(URL.outcome == URLStatus.PENDING.value)
         )
+        return query
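Review note: the revised statement filters pending URLs three ways: a LEFT OUTER JOIN anti-join against `URLHTMLContent` (`URLHTMLContent.id == None`), a `NOT EXISTS` against completed HTML tasks, and the new `URL.outcome == URLStatus.PENDING` guard. A minimal, self-contained sketch of the combined anti-join pattern, using hypothetical `Parent`/`Child` models rather than the project's real ones:

```python
# Sketch of the outer-join + NOT EXISTS anti-join pattern used above
# (hypothetical models; run standalone to inspect the generated SQL).
from sqlalchemy import Column, ForeignKey, Integer, exists, select
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Parent(Base):
    __tablename__ = "parent"
    id = Column(Integer, primary_key=True)

class Child(Base):
    __tablename__ = "child"
    id = Column(Integer, primary_key=True)
    parent_id = Column(Integer, ForeignKey("parent.id"))

# Parents with no Child row (LEFT OUTER JOIN ... IS NULL), combined with a
# NOT EXISTS subquery, mirroring the two exclusion mechanisms above:
stmt = (
    select(Parent)
    .outerjoin(Child)
    .where(Child.id == None)  # noqa: E711 -- SQLAlchemy requires == None here
    .where(~exists(select(1).where(Child.parent_id == Parent.id)))
)
print(stmt)  # renders the composed SELECT
```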
- """ - self.stop_event.set() - # if self.flush_future and not self.flush_future.done(): - self.flush_future.result(timeout=10) - self.flush_all() # Flush remaining logs diff --git a/core/TaskManager.py b/core/TaskManager.py index 64aa57e6..77844d91 100644 --- a/core/TaskManager.py +++ b/core/TaskManager.py @@ -111,8 +111,8 @@ async def set_task_status(self, task_type: TaskType): async def run_tasks(self): operators = await self.get_task_operators() - count = 0 for operator in operators: + count = 0 await self.set_task_status(task_type=operator.task_type) meets_prereq = await operator.meets_task_prerequisites() @@ -127,6 +127,8 @@ async def run_tasks(self): task_id = await self.initiate_task_in_db(task_type=operator.task_type) run_info: TaskOperatorRunInfo = await operator.run_task(task_id) await self.conclude_task(run_info) + if run_info.outcome == TaskOperatorOutcome.ERROR: + break count += 1 meets_prereq = await operator.meets_task_prerequisites() await self.set_task_status(task_type=TaskType.IDLE) @@ -165,6 +167,8 @@ async def handle_task_error(self, run_info: TaskOperatorRunInfo): task_id=run_info.task_id, error=run_info.message ) + await self.discord_poster.post_to_discord( + message=f"Task {run_info.task_id} ({self.task_status.value}) failed with error.") async def get_task_info(self, task_id: int) -> TaskInfo: return await self.adb_client.get_task_info(task_id=task_id) diff --git a/tests/manual/source_collectors/test_autogoogler_collector.py b/tests/manual/source_collectors/test_autogoogler_collector.py index a51fc883..c9942106 100644 --- a/tests/manual/source_collectors/test_autogoogler_collector.py +++ b/tests/manual/source_collectors/test_autogoogler_collector.py @@ -4,7 +4,6 @@ from collector_db.AsyncDatabaseClient import AsyncDatabaseClient from core.AsyncCoreLogger import AsyncCoreLogger -from core.CoreLogger import CoreLogger from source_collectors.auto_googler.AutoGooglerCollector import AutoGooglerCollector from source_collectors.auto_googler.DTOs import AutoGooglerInputDTO diff --git a/tests/manual/source_collectors/test_ckan_collector.py b/tests/manual/source_collectors/test_ckan_collector.py index f642fd8d..f9deaf02 100644 --- a/tests/manual/source_collectors/test_ckan_collector.py +++ b/tests/manual/source_collectors/test_ckan_collector.py @@ -4,9 +4,7 @@ from marshmallow import Schema, fields from collector_db.AsyncDatabaseClient import AsyncDatabaseClient -from collector_db.DatabaseClient import DatabaseClient from core.AsyncCoreLogger import AsyncCoreLogger -from core.CoreLogger import CoreLogger from source_collectors.ckan.CKANCollector import CKANCollector from source_collectors.ckan.DTOs import CKANInputDTO from source_collectors.ckan.search_terms import package_search, group_search, organization_search diff --git a/tests/manual/source_collectors/test_common_crawler_collector.py b/tests/manual/source_collectors/test_common_crawler_collector.py index 872b7710..cb1c4f78 100644 --- a/tests/manual/source_collectors/test_common_crawler_collector.py +++ b/tests/manual/source_collectors/test_common_crawler_collector.py @@ -4,9 +4,7 @@ from marshmallow import Schema, fields from collector_db.AsyncDatabaseClient import AsyncDatabaseClient -from collector_db.DatabaseClient import DatabaseClient from core.AsyncCoreLogger import AsyncCoreLogger -from core.CoreLogger import CoreLogger from source_collectors.common_crawler.CommonCrawlerCollector import CommonCrawlerCollector from source_collectors.common_crawler.DTOs import CommonCrawlerInputDTO diff --git 
diff --git a/core/TaskManager.py b/core/TaskManager.py
index 64aa57e6..77844d91 100644
--- a/core/TaskManager.py
+++ b/core/TaskManager.py
@@ -111,8 +111,8 @@ async def set_task_status(self, task_type: TaskType):
 
     async def run_tasks(self):
         operators = await self.get_task_operators()
-        count = 0
         for operator in operators:
+            count = 0
             await self.set_task_status(task_type=operator.task_type)
 
             meets_prereq = await operator.meets_task_prerequisites()
@@ -127,6 +127,8 @@ async def run_tasks(self):
                 task_id = await self.initiate_task_in_db(task_type=operator.task_type)
                 run_info: TaskOperatorRunInfo = await operator.run_task(task_id)
                 await self.conclude_task(run_info)
+                if run_info.outcome == TaskOperatorOutcome.ERROR:
+                    break
                 count += 1
                 meets_prereq = await operator.meets_task_prerequisites()
         await self.set_task_status(task_type=TaskType.IDLE)
@@ -165,6 +167,8 @@ async def handle_task_error(self, run_info: TaskOperatorRunInfo):
             task_id=run_info.task_id,
             error=run_info.message
         )
+        await self.discord_poster.post_to_discord(
+            message=f"Task {run_info.task_id} ({self.task_status.value}) failed with error.")
 
     async def get_task_info(self, task_id: int) -> TaskInfo:
         return await self.adb_client.get_task_info(task_id=task_id)
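Review note: two behavior changes in `run_tasks`: `count` now resets per operator instead of accumulating across all operators, and an errored run stops further retries of that operator. The resulting control flow, distilled (a sketch; the `while` condition is assumed, since the hunk does not show that line):

```python
# Names (self, TaskOperatorOutcome, TaskType) come from the surrounding class;
# MAX_RUNS stands in for whatever bound the real loop uses.
async def run_tasks(self):
    operators = await self.get_task_operators()
    for operator in operators:
        count = 0  # per-operator: one operator's runs no longer consume another's budget
        await self.set_task_status(task_type=operator.task_type)
        meets_prereq = await operator.meets_task_prerequisites()
        while meets_prereq and count < MAX_RUNS:  # condition assumed, not shown in hunk
            task_id = await self.initiate_task_in_db(task_type=operator.task_type)
            run_info = await operator.run_task(task_id)
            await self.conclude_task(run_info)
            if run_info.outcome == TaskOperatorOutcome.ERROR:
                break  # new: stop retrying an operator that just failed
            count += 1
            meets_prereq = await operator.meets_task_prerequisites()
    await self.set_task_status(task_type=TaskType.IDLE)
```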
diff --git a/tests/manual/source_collectors/test_autogoogler_collector.py b/tests/manual/source_collectors/test_autogoogler_collector.py
index a51fc883..c9942106 100644
--- a/tests/manual/source_collectors/test_autogoogler_collector.py
+++ b/tests/manual/source_collectors/test_autogoogler_collector.py
@@ -4,7 +4,6 @@
 
 from collector_db.AsyncDatabaseClient import AsyncDatabaseClient
 from core.AsyncCoreLogger import AsyncCoreLogger
-from core.CoreLogger import CoreLogger
 from source_collectors.auto_googler.AutoGooglerCollector import AutoGooglerCollector
 from source_collectors.auto_googler.DTOs import AutoGooglerInputDTO
diff --git a/tests/manual/source_collectors/test_ckan_collector.py b/tests/manual/source_collectors/test_ckan_collector.py
index f642fd8d..f9deaf02 100644
--- a/tests/manual/source_collectors/test_ckan_collector.py
+++ b/tests/manual/source_collectors/test_ckan_collector.py
@@ -4,9 +4,7 @@
 
 from marshmallow import Schema, fields
 
 from collector_db.AsyncDatabaseClient import AsyncDatabaseClient
-from collector_db.DatabaseClient import DatabaseClient
 from core.AsyncCoreLogger import AsyncCoreLogger
-from core.CoreLogger import CoreLogger
 from source_collectors.ckan.CKANCollector import CKANCollector
 from source_collectors.ckan.DTOs import CKANInputDTO
 from source_collectors.ckan.search_terms import package_search, group_search, organization_search
diff --git a/tests/manual/source_collectors/test_common_crawler_collector.py b/tests/manual/source_collectors/test_common_crawler_collector.py
index 872b7710..cb1c4f78 100644
--- a/tests/manual/source_collectors/test_common_crawler_collector.py
+++ b/tests/manual/source_collectors/test_common_crawler_collector.py
@@ -4,9 +4,7 @@
 
 from marshmallow import Schema, fields
 
 from collector_db.AsyncDatabaseClient import AsyncDatabaseClient
-from collector_db.DatabaseClient import DatabaseClient
 from core.AsyncCoreLogger import AsyncCoreLogger
-from core.CoreLogger import CoreLogger
 from source_collectors.common_crawler.CommonCrawlerCollector import CommonCrawlerCollector
 from source_collectors.common_crawler.DTOs import CommonCrawlerInputDTO
diff --git a/tests/manual/source_collectors/test_muckrock_collectors.py b/tests/manual/source_collectors/test_muckrock_collectors.py
index bfd0ba26..49bfa5fb 100644
--- a/tests/manual/source_collectors/test_muckrock_collectors.py
+++ b/tests/manual/source_collectors/test_muckrock_collectors.py
@@ -4,7 +4,6 @@
 
 from collector_db.AsyncDatabaseClient import AsyncDatabaseClient
 from core.AsyncCoreLogger import AsyncCoreLogger
-from core.CoreLogger import CoreLogger
 from source_collectors.muckrock.DTOs import MuckrockSimpleSearchCollectorInputDTO, \
     MuckrockCountySearchCollectorInputDTO, MuckrockAllFOIARequestsCollectorInputDTO
 from source_collectors.muckrock.classes.MuckrockCollector import MuckrockSimpleSearchCollector, \
diff --git a/tests/test_automated/integration/conftest.py b/tests/test_automated/integration/conftest.py
index 6be03e86..a0180800 100644
--- a/tests/test_automated/integration/conftest.py
+++ b/tests/test_automated/integration/conftest.py
@@ -6,22 +6,17 @@
 from collector_manager.AsyncCollectorManager import AsyncCollectorManager
 from core.AsyncCore import AsyncCore
 from core.AsyncCoreLogger import AsyncCoreLogger
-from core.CoreLogger import CoreLogger
 from core.SourceCollectorCore import SourceCollectorCore
 
 
 @pytest.fixture
 def test_core(db_client_test):
-    with CoreLogger(
-        db_client=db_client_test
-    ) as logger:
-        core = SourceCollectorCore(
-            db_client=db_client_test,
-            core_logger=logger,
-            dev_mode=True
-        )
-        yield core
-        core.shutdown()
+    core = SourceCollectorCore(
+        db_client=db_client_test,
+        dev_mode=True
+    )
+    yield core
+    core.shutdown()
 
 
 @pytest.fixture
diff --git a/tests/test_automated/integration/core/test_core_logger.py b/tests/test_automated/integration/core/test_core_logger.py
deleted file mode 100644
index 07a98000..00000000
--- a/tests/test_automated/integration/core/test_core_logger.py
+++ /dev/null
@@ -1,66 +0,0 @@
-import threading
-import time
-
-from collector_db.DTOs.LogInfo import LogInfo
-from core.CoreLogger import CoreLogger
-from tests.helpers.DBDataCreator import DBDataCreator
-
-
-def test_logger_integration(db_data_creator: DBDataCreator):
-    batch_id = db_data_creator.batch()
-    db_client = db_data_creator.db_client
-    with CoreLogger(flush_interval=1, db_client=db_client) as logger:
-
-        # Simulate logging
-        logger.log(LogInfo(log="Integration Log 1", batch_id=batch_id))
-        logger.log(LogInfo(log="Integration Log 2", batch_id=batch_id))
-
-        # Wait for the flush interval
-        time.sleep(1.5)
-
-        # Verify logs in the database
-        logs = db_client.get_logs_by_batch_id(batch_id)
-        assert len(logs) == 2
-        assert logs[0].log == "Integration Log 1"
-
-
-def test_multithreaded_integration_with_live_db(db_data_creator: DBDataCreator):
-    # Ensure the database is empty
-    db_client = db_data_creator.db_client
-    db_client.delete_all_logs()
-
-    batch_ids = [db_data_creator.batch() for _ in range(5)]
-    db_client = db_data_creator.db_client
-    logger = CoreLogger(flush_interval=1, db_client=db_client, batch_size=10)
-
-    # Simulate multiple threads logging
-    def worker(thread_id):
-        batch_id = batch_ids[thread_id-1]
-        for i in range(10):  # Each thread logs 10 messages
-            logger.log(LogInfo(log=f"Thread-{thread_id} Log-{i}", batch_id=batch_id))
-
-    # Start multiple threads
-    threads = [threading.Thread(target=worker, args=(i+1,)) for i in range(5)]  # 5 threads
-    for t in threads:
-        t.start()
-    for t in threads:
-        t.join()
-
-    # Allow the logger to flush
-    logger.shutdown()
-    time.sleep(10)
-
-    # Verify logs in the database
-    logs = db_client.get_all_logs()
-
-    # Optional: Print logs for manual inspection
-    for log in logs:
-        print(log.log)
-
-    # Assertions
-    assert len(logs) == 50  # 5 threads * 10 messages each
-    for i in range(1,6):
-        for j in range(10):
-            assert any(log.log == f"Thread-{i} Log-{j}" for log in logs)
-
-
diff --git a/tests/test_automated/unit/collector_manager/__init__.py b/tests/test_automated/unit/collector_manager/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/tests/test_automated/unit/collector_manager/test_collector_manager.py b/tests/test_automated/unit/collector_manager/test_collector_manager.py
deleted file mode 100644
index e69de29b..00000000
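Review note: deleting test_multithreaded_integration_with_live_db removes the only concurrent-logging coverage. If an equivalent is wanted against AsyncCoreLogger, an asyncio analogue might look like this sketch (`adb_client` and `batch_ids` are hypothetical fixtures; `get_all_logs` is assumed to have an async counterpart):

```python
import asyncio

import pytest

from collector_db.DTOs.LogInfo import LogInfo
from core.AsyncCoreLogger import AsyncCoreLogger


@pytest.mark.asyncio
async def test_concurrent_logging(adb_client, batch_ids):  # hypothetical fixtures
    async with AsyncCoreLogger(flush_interval=1, adb_client=adb_client) as logger:

        async def worker(worker_id: int):
            for i in range(10):
                await logger.log(LogInfo(
                    log=f"Worker-{worker_id} Log-{i}",
                    batch_id=batch_ids[worker_id - 1],
                ))

        # Five concurrent producers, mirroring the deleted five-thread test.
        await asyncio.gather(*(worker(i + 1) for i in range(5)))

    logs = await adb_client.get_all_logs()  # assumed async counterpart
    assert len(logs) == 50  # 5 workers * 10 messages each
```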
logger.log(LogInfo(log="Log 2", batch_id=1)) - # Allow the logger to flush - time.sleep(2) - logger.shutdown() + # Wait for the flush interval + await asyncio.sleep(1.5) - # Verify that all logs are eventually flushed - flushed_logs = [] - for call in mock_db_client.insert_logs.call_args_list: - flushed_logs.extend(call[1]['log_infos']) + # Verify logs were flushed + mock_adb_client.insert_logs.assert_called_once() + flushed_logs = mock_adb_client.insert_logs.call_args[1]['log_infos'] + assert len(flushed_logs) == 2 + assert flushed_logs[0].log == "Log 1" - assert len(flushed_logs) == 50 # 5 threads * 10 messages each diff --git a/tests/test_automated/unit/source_collectors/test_autogoogler_collector.py b/tests/test_automated/unit/source_collectors/test_autogoogler_collector.py index 2349afe2..c3fafa61 100644 --- a/tests/test_automated/unit/source_collectors/test_autogoogler_collector.py +++ b/tests/test_automated/unit/source_collectors/test_autogoogler_collector.py @@ -4,9 +4,7 @@ from collector_db.AsyncDatabaseClient import AsyncDatabaseClient from collector_db.DTOs.URLInfo import URLInfo -from collector_db.DatabaseClient import DatabaseClient from core.AsyncCoreLogger import AsyncCoreLogger -from core.CoreLogger import CoreLogger from source_collectors.auto_googler.AutoGooglerCollector import AutoGooglerCollector from source_collectors.auto_googler.DTOs import GoogleSearchQueryResultsInnerDTO, AutoGooglerInputDTO diff --git a/tests/test_automated/unit/source_collectors/test_ckan_collector.py b/tests/test_automated/unit/source_collectors/test_ckan_collector.py index ef7dbee8..e0e9ee47 100644 --- a/tests/test_automated/unit/source_collectors/test_ckan_collector.py +++ b/tests/test_automated/unit/source_collectors/test_ckan_collector.py @@ -5,9 +5,7 @@ import pytest from collector_db.AsyncDatabaseClient import AsyncDatabaseClient -from collector_db.DatabaseClient import DatabaseClient from core.AsyncCoreLogger import AsyncCoreLogger -from core.CoreLogger import CoreLogger from source_collectors.ckan.CKANCollector import CKANCollector from source_collectors.ckan.DTOs import CKANInputDTO diff --git a/tests/test_automated/unit/source_collectors/test_common_crawl_collector.py b/tests/test_automated/unit/source_collectors/test_common_crawl_collector.py index d1f0ccda..1c5aa6ee 100644 --- a/tests/test_automated/unit/source_collectors/test_common_crawl_collector.py +++ b/tests/test_automated/unit/source_collectors/test_common_crawl_collector.py @@ -4,9 +4,7 @@ from collector_db.AsyncDatabaseClient import AsyncDatabaseClient from collector_db.DTOs.URLInfo import URLInfo -from collector_db.DatabaseClient import DatabaseClient from core.AsyncCoreLogger import AsyncCoreLogger -from core.CoreLogger import CoreLogger from source_collectors.common_crawler.CommonCrawlerCollector import CommonCrawlerCollector from source_collectors.common_crawler.DTOs import CommonCrawlerInputDTO diff --git a/tests/test_automated/unit/source_collectors/test_example_collector.py b/tests/test_automated/unit/source_collectors/test_example_collector.py index 26ca601d..b770d952 100644 --- a/tests/test_automated/unit/source_collectors/test_example_collector.py +++ b/tests/test_automated/unit/source_collectors/test_example_collector.py @@ -1,10 +1,9 @@ -from unittest.mock import MagicMock, AsyncMock +from unittest.mock import AsyncMock from collector_db.DatabaseClient import DatabaseClient from collector_manager.DTOs.ExampleInputDTO import ExampleInputDTO from collector_manager.ExampleCollector import ExampleCollector from 
diff --git a/tests/test_automated/unit/source_collectors/test_autogoogler_collector.py b/tests/test_automated/unit/source_collectors/test_autogoogler_collector.py
index 2349afe2..c3fafa61 100644
--- a/tests/test_automated/unit/source_collectors/test_autogoogler_collector.py
+++ b/tests/test_automated/unit/source_collectors/test_autogoogler_collector.py
@@ -4,9 +4,7 @@
 
 from collector_db.AsyncDatabaseClient import AsyncDatabaseClient
 from collector_db.DTOs.URLInfo import URLInfo
-from collector_db.DatabaseClient import DatabaseClient
 from core.AsyncCoreLogger import AsyncCoreLogger
-from core.CoreLogger import CoreLogger
 from source_collectors.auto_googler.AutoGooglerCollector import AutoGooglerCollector
 from source_collectors.auto_googler.DTOs import GoogleSearchQueryResultsInnerDTO, AutoGooglerInputDTO
diff --git a/tests/test_automated/unit/source_collectors/test_ckan_collector.py b/tests/test_automated/unit/source_collectors/test_ckan_collector.py
index ef7dbee8..e0e9ee47 100644
--- a/tests/test_automated/unit/source_collectors/test_ckan_collector.py
+++ b/tests/test_automated/unit/source_collectors/test_ckan_collector.py
@@ -5,9 +5,7 @@
 
 import pytest
 
 from collector_db.AsyncDatabaseClient import AsyncDatabaseClient
-from collector_db.DatabaseClient import DatabaseClient
 from core.AsyncCoreLogger import AsyncCoreLogger
-from core.CoreLogger import CoreLogger
 from source_collectors.ckan.CKANCollector import CKANCollector
 from source_collectors.ckan.DTOs import CKANInputDTO
diff --git a/tests/test_automated/unit/source_collectors/test_common_crawl_collector.py b/tests/test_automated/unit/source_collectors/test_common_crawl_collector.py
index d1f0ccda..1c5aa6ee 100644
--- a/tests/test_automated/unit/source_collectors/test_common_crawl_collector.py
+++ b/tests/test_automated/unit/source_collectors/test_common_crawl_collector.py
@@ -4,9 +4,7 @@
 
 from collector_db.AsyncDatabaseClient import AsyncDatabaseClient
 from collector_db.DTOs.URLInfo import URLInfo
-from collector_db.DatabaseClient import DatabaseClient
 from core.AsyncCoreLogger import AsyncCoreLogger
-from core.CoreLogger import CoreLogger
 from source_collectors.common_crawler.CommonCrawlerCollector import CommonCrawlerCollector
 from source_collectors.common_crawler.DTOs import CommonCrawlerInputDTO
diff --git a/tests/test_automated/unit/source_collectors/test_example_collector.py b/tests/test_automated/unit/source_collectors/test_example_collector.py
index 26ca601d..b770d952 100644
--- a/tests/test_automated/unit/source_collectors/test_example_collector.py
+++ b/tests/test_automated/unit/source_collectors/test_example_collector.py
@@ -1,10 +1,9 @@
-from unittest.mock import MagicMock, AsyncMock
+from unittest.mock import AsyncMock
 
 from collector_db.DatabaseClient import DatabaseClient
 from collector_manager.DTOs.ExampleInputDTO import ExampleInputDTO
 from collector_manager.ExampleCollector import ExampleCollector
 from core.AsyncCoreLogger import AsyncCoreLogger
-from core.CoreLogger import CoreLogger
 
 
 def test_example_collector():
diff --git a/tests/test_automated/unit/source_collectors/test_muckrock_collectors.py b/tests/test_automated/unit/source_collectors/test_muckrock_collectors.py
index 7e533efa..100fbb6e 100644
--- a/tests/test_automated/unit/source_collectors/test_muckrock_collectors.py
+++ b/tests/test_automated/unit/source_collectors/test_muckrock_collectors.py
@@ -5,9 +5,7 @@
 
 from collector_db.AsyncDatabaseClient import AsyncDatabaseClient
 from collector_db.DTOs.URLInfo import URLInfo
-from collector_db.DatabaseClient import DatabaseClient
 from core.AsyncCoreLogger import AsyncCoreLogger
-from core.CoreLogger import CoreLogger
 from source_collectors.muckrock.DTOs import MuckrockSimpleSearchCollectorInputDTO, \
     MuckrockCountySearchCollectorInputDTO, MuckrockAllFOIARequestsCollectorInputDTO
 from source_collectors.muckrock.classes.MuckrockCollector import MuckrockSimpleSearchCollector, \