12 changes: 8 additions & 4 deletions collector_db/StatementComposer.py
@@ -17,18 +17,22 @@

     @staticmethod
     def pending_urls_without_html_data() -> Select:
-        subquery = (select(1).
+        exclude_subquery = (select(1).
             select_from(LinkTaskURL).
             join(Task, LinkTaskURL.task_id == Task.id).
             where(LinkTaskURL.url_id == URL.id).
             where(Task.task_type == TaskType.HTML.value).
             where(Task.task_status == BatchStatus.COMPLETE.value)
         )

-        query = select(URL).where(
-            ~exists(subquery)
+        query = (
+            select(URL).
+            outerjoin(URLHTMLContent).
+            where(URLHTMLContent.id == None).
+            where(~exists(exclude_subquery)).
             where(URL.outcome == URLStatus.PENDING.value)
         )

         return query

GitHub Actions / flake8, check failure at collector_db/StatementComposer.py line 30 (E711): comparison to None should be 'if cond is None:'
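The flake8 failure is cosmetic rather than logical: SQLAlchemy overloads `==` so that `URLHTMLContent.id == None` compiles to an `IS NULL` comparison, but flake8 flags any literal comparison to None as E711. The idiomatic way to emit the same SQL while satisfying the linter is `.is_(None)`. A minimal sketch of the statement with that fix applied, reusing the model and enum names from the diff (imports of the models themselves are assumed from the PR's modules; in the PR this is a static method on StatementComposer):

    from sqlalchemy import Select, exists, select

    def pending_urls_without_html_data() -> Select:
        # One row per URL that already has a COMPLETE HTML task linked to it.
        exclude_subquery = (
            select(1)
            .select_from(LinkTaskURL)
            .join(Task, LinkTaskURL.task_id == Task.id)
            .where(LinkTaskURL.url_id == URL.id)
            .where(Task.task_type == TaskType.HTML.value)
            .where(Task.task_status == BatchStatus.COMPLETE.value)
        )
        return (
            select(URL)
            .outerjoin(URLHTMLContent)
            # .is_(None) emits the same IS NULL clause without tripping E711.
            .where(URLHTMLContent.id.is_(None))
            .where(~exists(exclude_subquery))
            .where(URL.outcome == URLStatus.PENDING.value)
        )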


97 changes: 0 additions & 97 deletions core/CoreLogger.py

This file was deleted.

6 changes: 5 additions & 1 deletion core/TaskManager.py
@@ -111,8 +111,8 @@ async def set_task_status(self, task_type: TaskType):

     async def run_tasks(self):
         operators = await self.get_task_operators()
-        count = 0
         for operator in operators:
+            count = 0
             await self.set_task_status(task_type=operator.task_type)

             meets_prereq = await operator.meets_task_prerequisites()
@@ -127,6 +127,8 @@ async def run_tasks(self):
             task_id = await self.initiate_task_in_db(task_type=operator.task_type)
             run_info: TaskOperatorRunInfo = await operator.run_task(task_id)
             await self.conclude_task(run_info)
+            if run_info.outcome == TaskOperatorOutcome.ERROR:
+                break
             count += 1
             meets_prereq = await operator.meets_task_prerequisites()
         await self.set_task_status(task_type=TaskType.IDLE)
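Two behavioral changes land here: `count` is now reset per operator instead of once per call, and a task that ends in `TaskOperatorOutcome.ERROR` stops further runs of that operator. The hunks only show the loop body, so the surrounding loop shape in this sketch is an assumption; the names are taken from the diff:

    async def run_tasks(self):
        operators = await self.get_task_operators()
        for operator in operators:
            count = 0  # reset for each operator after this PR
            await self.set_task_status(task_type=operator.task_type)
            meets_prereq = await operator.meets_task_prerequisites()
            while meets_prereq:  # assumed: re-run while prerequisites hold
                task_id = await self.initiate_task_in_db(task_type=operator.task_type)
                run_info: TaskOperatorRunInfo = await operator.run_task(task_id)
                await self.conclude_task(run_info)
                if run_info.outcome == TaskOperatorOutcome.ERROR:
                    break  # new: stop re-running this operator after a failure
                count += 1
                meets_prereq = await operator.meets_task_prerequisites()
        await self.set_task_status(task_type=TaskType.IDLE)  # placement assumed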
@@ -165,6 +167,8 @@ async def handle_task_error(self, run_info: TaskOperatorRunInfo):
             task_id=run_info.task_id,
             error=run_info.message
         )
+        await self.discord_poster.post_to_discord(
+            message=f"Task {run_info.task_id} ({self.task_status.value}) failed with error.")

     async def get_task_info(self, task_id: int) -> TaskInfo:
         return await self.adb_client.get_task_info(task_id=task_id)
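Because `post_to_discord` is awaited, a test can stub the poster with an `AsyncMock` and assert the notification without touching Discord. A hypothetical test; the `task_manager` fixture and the mock wiring are assumptions, not part of this PR:

    import pytest
    from unittest.mock import AsyncMock, MagicMock

    @pytest.mark.asyncio
    async def test_handle_task_error_posts_to_discord(task_manager):  # fixture assumed
        task_manager.adb_client = AsyncMock()      # isolate the DB error write
        task_manager.discord_poster = AsyncMock()  # isolate the Discord call
        run_info = MagicMock(task_id=123, message="boom")  # stand-in for TaskOperatorRunInfo
        await task_manager.handle_task_error(run_info)
        task_manager.discord_poster.post_to_discord.assert_awaited_once()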
@@ -4,7 +4,6 @@

 from collector_db.AsyncDatabaseClient import AsyncDatabaseClient
 from core.AsyncCoreLogger import AsyncCoreLogger
-from core.CoreLogger import CoreLogger
 from source_collectors.auto_googler.AutoGooglerCollector import AutoGooglerCollector
 from source_collectors.auto_googler.DTOs import AutoGooglerInputDTO
2 changes: 0 additions & 2 deletions tests/manual/source_collectors/test_ckan_collector.py
@@ -4,9 +4,7 @@

 from marshmallow import Schema, fields

 from collector_db.AsyncDatabaseClient import AsyncDatabaseClient
-from collector_db.DatabaseClient import DatabaseClient
 from core.AsyncCoreLogger import AsyncCoreLogger
-from core.CoreLogger import CoreLogger
 from source_collectors.ckan.CKANCollector import CKANCollector
 from source_collectors.ckan.DTOs import CKANInputDTO
 from source_collectors.ckan.search_terms import package_search, group_search, organization_search
@@ -4,9 +4,7 @@

 from marshmallow import Schema, fields

 from collector_db.AsyncDatabaseClient import AsyncDatabaseClient
-from collector_db.DatabaseClient import DatabaseClient
 from core.AsyncCoreLogger import AsyncCoreLogger
-from core.CoreLogger import CoreLogger
 from source_collectors.common_crawler.CommonCrawlerCollector import CommonCrawlerCollector
 from source_collectors.common_crawler.DTOs import CommonCrawlerInputDTO
@@ -4,7 +4,6 @@

 from collector_db.AsyncDatabaseClient import AsyncDatabaseClient
 from core.AsyncCoreLogger import AsyncCoreLogger
-from core.CoreLogger import CoreLogger
 from source_collectors.muckrock.DTOs import MuckrockSimpleSearchCollectorInputDTO, \
     MuckrockCountySearchCollectorInputDTO, MuckrockAllFOIARequestsCollectorInputDTO
 from source_collectors.muckrock.classes.MuckrockCollector import MuckrockSimpleSearchCollector, \
17 changes: 6 additions & 11 deletions tests/test_automated/integration/conftest.py
@@ -6,22 +6,17 @@

 from collector_manager.AsyncCollectorManager import AsyncCollectorManager
 from core.AsyncCore import AsyncCore
 from core.AsyncCoreLogger import AsyncCoreLogger
-from core.CoreLogger import CoreLogger
 from core.SourceCollectorCore import SourceCollectorCore


 @pytest.fixture
 def test_core(db_client_test):
-    with CoreLogger(
-        db_client=db_client_test
-    ) as logger:
-        core = SourceCollectorCore(
-            db_client=db_client_test,
-            core_logger=logger,
-            dev_mode=True
-        )
-        yield core
-        core.shutdown()
+    core = SourceCollectorCore(
+        db_client=db_client_test,
+        dev_mode=True
+    )
+    yield core
+    core.shutdown()


 @pytest.fixture
66 changes: 0 additions & 66 deletions tests/test_automated/integration/core/test_core_logger.py

This file was deleted.

Empty file.
Empty file.
96 changes: 19 additions & 77 deletions tests/test_automated/unit/core/test_core_logger.py
@@ -1,86 +1,28 @@
-import threading
-import time
-from unittest.mock import MagicMock
+import asyncio
+from unittest.mock import AsyncMock
+
+import pytest

 from collector_db.DTOs.LogInfo import LogInfo
-from core.CoreLogger import CoreLogger
+from core.AsyncCoreLogger import AsyncCoreLogger


-def test_logger_flush():
-    mock_db_client = MagicMock()
-    logger = CoreLogger(flush_interval=1, db_client=mock_db_client)
+@pytest.mark.asyncio
+async def test_logger_flush():
+    mock_adb_client = AsyncMock()
+    async with AsyncCoreLogger(flush_interval=1, adb_client=mock_adb_client) as logger:

-    # Add logs
-    logger.log(LogInfo(log="Log 1", batch_id=1))
-    logger.log(LogInfo(log="Log 2", batch_id=1))
+        # Add logs
+        await logger.log(LogInfo(log="Log 1", batch_id=1))
+        await logger.log(LogInfo(log="Log 2", batch_id=1))

-    # Wait for the flush interval
-    time.sleep(1.5)
+        # Wait for the flush interval
+        await asyncio.sleep(1.5)

     # Verify logs were flushed
-    assert mock_db_client.insert_logs.called
-    flushed_logs = mock_db_client.insert_logs.call_args[1]['log_infos']
+    mock_adb_client.insert_logs.assert_called_once()
+    flushed_logs = mock_adb_client.insert_logs.call_args[1]['log_infos']
     assert len(flushed_logs) == 2
     assert flushed_logs[0].log == "Log 1"
-
-    logger.shutdown()
-
-def test_logger_multithreading():
-    mock_db_client = MagicMock()
-    logger = CoreLogger(flush_interval=1, db_client=mock_db_client, batch_size=10)
-
-    def worker(thread_id):
-        for i in range(5):  # Each thread logs 5 messages
-            logger.log(LogInfo(log=f"Thread-{thread_id} Log-{i}", batch_id=thread_id))
-
-    # Start multiple threads
-    threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]  # 5 threads
-    for t in threads:
-        t.start()
-    for t in threads:
-        t.join()  # Wait for all threads to finish
-
-    # Allow the logger to flush
-    time.sleep(2)
-    logger.shutdown()
-
-    # Verify all logs were flushed
-    assert mock_db_client.insert_logs.called
-    flushed_logs = []
-    for call in mock_db_client.insert_logs.call_args_list:
-        flushed_logs.extend(call[1]['log_infos'])
-
-    # Ensure all logs are present
-    assert len(flushed_logs) == 25  # 5 threads * 5 messages each
-    for i in range(5):
-        for j in range(5):
-            assert any(log.log == f"Thread-{i} Log-{j}" for log in flushed_logs)
-
-def test_logger_with_delays():
-    mock_db_client = MagicMock()
-    logger = CoreLogger(flush_interval=1, db_client=mock_db_client, batch_size=10)
-
-    def worker(thread_id):
-        for i in range(10):  # Each thread logs 10 messages
-            logger.log(LogInfo(log=f"Thread-{thread_id} Log-{i}", batch_id=thread_id))
-            time.sleep(0.1)  # Simulate delay between logs
-
-    # Start multiple threads
-    threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]  # 5 threads
-    for t in threads:
-        t.start()
-    for t in threads:
-        t.join()  # Wait for all threads to finish
-
-    # Allow the logger to flush
-    time.sleep(2)
-    logger.shutdown()
-
-    # Verify that all logs are eventually flushed
-    flushed_logs = []
-    for call in mock_db_client.insert_logs.call_args_list:
-        flushed_logs.extend(call[1]['log_infos'])
-
-    assert len(flushed_logs) == 50  # 5 threads * 10 messages each

GitHub Actions / flake8, warning at tests/test_automated/unit/core/test_core_logger.py line 1 (D100): Missing docstring in public module
GitHub Actions / flake8, warning at tests/test_automated/unit/core/test_core_logger.py line 11 (D103): Missing docstring in public function
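The rewritten test treats AsyncCoreLogger as an async context manager that buffers LogInfo records and flushes them to the async DB client on a timer, which is why a 1.5 s sleep against a 1 s flush_interval yields exactly one insert_logs call carrying both logs. The class itself is not shown in this diff; purely as an illustration of the contract the test exercises, a sketch might look like:

    import asyncio

    class AsyncCoreLogger:
        """Illustrative sketch only; the real class lives in core/AsyncCoreLogger.py."""

        def __init__(self, adb_client, flush_interval: float = 1):
            self.adb_client = adb_client
            self.flush_interval = flush_interval
            self._buffer = []
            self._flusher = None

        async def __aenter__(self):
            # Start the periodic flush loop when entering the context.
            self._flusher = asyncio.create_task(self._flush_loop())
            return self

        async def __aexit__(self, *exc_info):
            self._flusher.cancel()
            try:
                await self._flusher
            except asyncio.CancelledError:
                pass
            await self._flush()  # drain anything still buffered on exit

        async def log(self, log_info) -> None:
            self._buffer.append(log_info)

        async def _flush(self) -> None:
            if self._buffer:
                batch, self._buffer = self._buffer, []
                await self.adb_client.insert_logs(log_infos=batch)

        async def _flush_loop(self) -> None:
            while True:
                await asyncio.sleep(self.flush_interval)
                await self._flush()

Under this sketch, the flush loop fires once at t = 1 s with both buffered logs, and the exit-time flush finds an empty buffer, so assert_called_once holds.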
