Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from collector_db.DatabaseClient import DatabaseClient
from collector_manager.AsyncCollectorManager import AsyncCollectorManager
from core.AsyncCore import AsyncCore
from core.CoreLogger import CoreLogger
from core.AsyncCoreLogger import AsyncCoreLogger
from core.ScheduledTaskManager import AsyncScheduledTaskManager
from core.SourceCollectorCore import SourceCollectorCore
from core.TaskManager import TaskManager
Expand All @@ -32,12 +32,10 @@ async def lifespan(app: FastAPI):
db_client = DatabaseClient()
adb_client = AsyncDatabaseClient()
await setup_database(db_client)
core_logger = CoreLogger(db_client=db_client)
core_logger = AsyncCoreLogger(adb_client=adb_client)


source_collector_core = SourceCollectorCore(
core_logger=CoreLogger(
db_client=db_client
),
db_client=DatabaseClient(),
)
task_manager = TaskManager(
Expand Down Expand Up @@ -68,13 +66,15 @@ async def lifespan(app: FastAPI):
app.state.core = source_collector_core
app.state.async_core = async_core
app.state.async_scheduled_task_manager = async_scheduled_task_manager
app.state.logger = core_logger

# Startup logic
yield # Code here runs before shutdown

# Shutdown logic (if needed)
core_logger.shutdown()
app.state.core.shutdown()
await core_logger.shutdown()
await async_core.shutdown()
source_collector_core.shutdown()
# Clean up resources, close connections, etc.
pass

Expand Down
11 changes: 10 additions & 1 deletion collector_db/AsyncDatabaseClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from collector_db.DTOs.BatchInfo import BatchInfo
from collector_db.DTOs.DuplicateInfo import DuplicateInsertInfo
from collector_db.DTOs.InsertURLsInfo import InsertURLsInfo
from collector_db.DTOs.LogInfo import LogInfo
from collector_db.DTOs.TaskInfo import TaskInfo
from collector_db.DTOs.URLErrorInfos import URLErrorPydanticInfo
from collector_db.DTOs.URLHTMLContentInfo import URLHTMLContentInfo, HTMLContentType
Expand All @@ -27,7 +28,7 @@
from collector_db.models import URL, URLErrorInfo, URLHTMLContent, Base, \
RootURL, Task, TaskError, LinkTaskURL, Batch, Agency, AutomatedUrlAgencySuggestion, \
UserUrlAgencySuggestion, AutoRelevantSuggestion, AutoRecordTypeSuggestion, UserRelevantSuggestion, \
UserRecordTypeSuggestion, ReviewingUserURL, URLOptionalDataSourceMetadata, ConfirmedURLAgency, Duplicate
UserRecordTypeSuggestion, ReviewingUserURL, URLOptionalDataSourceMetadata, ConfirmedURLAgency, Duplicate, Log
from collector_manager.enums import URLStatus, CollectorType
from core.DTOs.FinalReviewApprovalInfo import FinalReviewApprovalInfo
from core.DTOs.GetNextRecordTypeAnnotationResponseInfo import GetNextRecordTypeAnnotationResponseInfo
Expand Down Expand Up @@ -1378,6 +1379,14 @@
url = raw_result.scalars().first()
return URLInfo(**url.__dict__)

@session_manager
async def insert_logs(self, session, log_infos: List[LogInfo]):

Check warning on line 1383 in collector_db/AsyncDatabaseClient.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_db/AsyncDatabaseClient.py#L1383 <102>

Missing docstring in public method
Raw output
./collector_db/AsyncDatabaseClient.py:1383:1: D102 Missing docstring in public method
for log_info in log_infos:
log = Log(log=log_info.log, batch_id=log_info.batch_id)
if log_info.created_at is not None:
log.created_at = log_info.created_at
session.add(log)

@session_manager
async def insert_duplicates(self, session, duplicate_infos: list[DuplicateInsertInfo]):
for duplicate_info in duplicate_infos:
Expand Down
12 changes: 8 additions & 4 deletions collector_manager/AsyncCollectorBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from collector_db.DTOs.InsertURLsInfo import InsertURLsInfo
from collector_db.DTOs.LogInfo import LogInfo
from collector_manager.enums import CollectorType
from core.CoreLogger import CoreLogger
from core.AsyncCoreLogger import AsyncCoreLogger
from core.FunctionTrigger import FunctionTrigger
from core.enums import BatchStatus
from core.preprocessors.PreprocessorBase import PreprocessorBase
Expand All @@ -25,7 +25,7 @@
self,
batch_id: int,
dto: BaseModel,
logger: CoreLogger,
logger: AsyncCoreLogger,
adb_client: AsyncDatabaseClient,
raise_error: bool = False,
post_collection_function_trigger: Optional[FunctionTrigger] = None,
Expand Down Expand Up @@ -120,8 +120,12 @@
self.status = BatchStatus.ERROR
await self.handle_error(e)

async def log(self, message: str, allow_abort = True) -> None:
self.logger.log(LogInfo(
async def log(

Check warning on line 123 in collector_manager/AsyncCollectorBase.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/AsyncCollectorBase.py#L123 <102>

Missing docstring in public method
Raw output
./collector_manager/AsyncCollectorBase.py:123:1: D102 Missing docstring in public method
self,
message: str,
allow_abort = True # Deprecated

Check warning on line 126 in collector_manager/AsyncCollectorBase.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/AsyncCollectorBase.py#L126 <100>

Unused argument 'allow_abort'
Raw output
./collector_manager/AsyncCollectorBase.py:126:13: U100 Unused argument 'allow_abort'

Check failure on line 126 in collector_manager/AsyncCollectorBase.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/AsyncCollectorBase.py#L126 <251>

unexpected spaces around keyword / parameter equals
Raw output
./collector_manager/AsyncCollectorBase.py:126:24: E251 unexpected spaces around keyword / parameter equals

Check failure on line 126 in collector_manager/AsyncCollectorBase.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/AsyncCollectorBase.py#L126 <251>

unexpected spaces around keyword / parameter equals
Raw output
./collector_manager/AsyncCollectorBase.py:126:26: E251 unexpected spaces around keyword / parameter equals
) -> None:
await self.logger.log(LogInfo(
batch_id=self.batch_id,
log=message
))
Expand Down
14 changes: 10 additions & 4 deletions collector_manager/AsyncCollectorManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@
from collector_manager.CollectorManager import InvalidCollectorError
from collector_manager.collector_mapping import COLLECTOR_MAPPING
from collector_manager.enums import CollectorType
from core.CoreLogger import CoreLogger
from core.AsyncCoreLogger import AsyncCoreLogger
from core.FunctionTrigger import FunctionTrigger


class AsyncCollectorManager:

def __init__(
self,
logger: CoreLogger,
logger: AsyncCoreLogger,
adb_client: AsyncDatabaseClient,
dev_mode: bool = False,
post_collection_function_trigger: FunctionTrigger = None
Expand Down Expand Up @@ -79,10 +79,16 @@
self.async_tasks.pop(cid)

async def shutdown_all_collectors(self) -> None:
for cid, task in self.async_tasks.items():
while self.async_tasks:
cid, task = self.async_tasks.popitem()
if task.done():
try:
task.result()
except Exception as e:
raise e
await self.abort_collector_async(cid)
else:
task.cancel()
try:
await task # Await so cancellation propagates
except asyncio.CancelledError:
pass

Check warning on line 94 in collector_manager/AsyncCollectorManager.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_manager/AsyncCollectorManager.py#L94 <292>

no newline at end of file
Raw output
./collector_manager/AsyncCollectorManager.py:94:25: W292 no newline at end of file
71 changes: 71 additions & 0 deletions core/AsyncCoreLogger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import asyncio

Check warning on line 1 in core/AsyncCoreLogger.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] core/AsyncCoreLogger.py#L1 <100>

Missing docstring in public module
Raw output
./core/AsyncCoreLogger.py:1:1: D100 Missing docstring in public module

from collector_db.AsyncDatabaseClient import AsyncDatabaseClient
from collector_db.DTOs.LogInfo import LogInfo


class AsyncCoreLogger:

Check warning on line 7 in core/AsyncCoreLogger.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] core/AsyncCoreLogger.py#L7 <101>

Missing docstring in public class
Raw output
./core/AsyncCoreLogger.py:7:1: D101 Missing docstring in public class
def __init__(

Check warning on line 8 in core/AsyncCoreLogger.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] core/AsyncCoreLogger.py#L8 <107>

Missing docstring in __init__
Raw output
./core/AsyncCoreLogger.py:8:1: D107 Missing docstring in __init__
self,
adb_client: AsyncDatabaseClient,
flush_interval: float = 10,
batch_size: int = 100
):
self.adb_client = adb_client
self.flush_interval = flush_interval
self.batch_size = batch_size

self.log_queue = asyncio.Queue()
self.lock = asyncio.Lock()
self._flush_task: asyncio.Task | None = None
self._stop_event = asyncio.Event()

async def __aenter__(self):

Check warning on line 23 in core/AsyncCoreLogger.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] core/AsyncCoreLogger.py#L23 <105>

Missing docstring in magic method
Raw output
./core/AsyncCoreLogger.py:23:1: D105 Missing docstring in magic method
self._stop_event.clear()
self._flush_task = asyncio.create_task(self._flush_logs())
return self

async def __aexit__(self, exc_type, exc_value, traceback):

Check warning on line 28 in core/AsyncCoreLogger.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] core/AsyncCoreLogger.py#L28 <105>

Missing docstring in magic method
Raw output
./core/AsyncCoreLogger.py:28:1: D105 Missing docstring in magic method

Check warning on line 28 in core/AsyncCoreLogger.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] core/AsyncCoreLogger.py#L28 <100>

Unused argument 'exc_type'
Raw output
./core/AsyncCoreLogger.py:28:31: U100 Unused argument 'exc_type'

Check warning on line 28 in core/AsyncCoreLogger.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] core/AsyncCoreLogger.py#L28 <100>

Unused argument 'exc_value'
Raw output
./core/AsyncCoreLogger.py:28:41: U100 Unused argument 'exc_value'

Check warning on line 28 in core/AsyncCoreLogger.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] core/AsyncCoreLogger.py#L28 <100>

Unused argument 'traceback'
Raw output
./core/AsyncCoreLogger.py:28:52: U100 Unused argument 'traceback'
await self.shutdown()

async def log(self, log_info: LogInfo):

Check warning on line 31 in core/AsyncCoreLogger.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] core/AsyncCoreLogger.py#L31 <102>

Missing docstring in public method
Raw output
./core/AsyncCoreLogger.py:31:1: D102 Missing docstring in public method
await self.log_queue.put(log_info)

async def _flush_logs(self):
while not self._stop_event.is_set():
await asyncio.sleep(self.flush_interval)
await self.flush()

async def flush(self):

Check warning on line 39 in core/AsyncCoreLogger.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] core/AsyncCoreLogger.py#L39 <102>

Missing docstring in public method
Raw output
./core/AsyncCoreLogger.py:39:1: D102 Missing docstring in public method
async with self.lock:
logs: list[LogInfo] = []

while not self.log_queue.empty() and len(logs) < self.batch_size:
try:
log = self.log_queue.get_nowait()
logs.append(log)
except asyncio.QueueEmpty:
break

if logs:
await self.adb_client.insert_logs(log_infos=logs)

async def clear_log_queue(self):

Check warning on line 53 in core/AsyncCoreLogger.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] core/AsyncCoreLogger.py#L53 <102>

Missing docstring in public method
Raw output
./core/AsyncCoreLogger.py:53:1: D102 Missing docstring in public method
while not self.log_queue.empty():
self.log_queue.get_nowait()

async def flush_all(self):

Check warning on line 57 in core/AsyncCoreLogger.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] core/AsyncCoreLogger.py#L57 <102>

Missing docstring in public method
Raw output
./core/AsyncCoreLogger.py:57:1: D102 Missing docstring in public method
while not self.log_queue.empty():
await self.flush()

async def restart(self):

Check warning on line 61 in core/AsyncCoreLogger.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] core/AsyncCoreLogger.py#L61 <102>

Missing docstring in public method
Raw output
./core/AsyncCoreLogger.py:61:1: D102 Missing docstring in public method
await self.flush_all()
await self.shutdown()
self._stop_event.clear()
self._flush_task = asyncio.create_task(self._flush_logs())

async def shutdown(self):

Check warning on line 67 in core/AsyncCoreLogger.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] core/AsyncCoreLogger.py#L67 <102>

Missing docstring in public method
Raw output
./core/AsyncCoreLogger.py:67:1: D102 Missing docstring in public method
self._stop_event.set()
if self._flush_task:
await self._flush_task
await self.flush_all()
6 changes: 2 additions & 4 deletions core/SourceCollectorCore.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from collector_db.DatabaseClient import DatabaseClient
from collector_manager.enums import CollectorType
from core.CoreLogger import CoreLogger
from core.DTOs.GetBatchLogsResponse import GetBatchLogsResponse
from core.DTOs.GetBatchStatusResponse import GetBatchStatusResponse
from core.DTOs.GetDuplicatesByBatchResponse import GetDuplicatesByBatchResponse
Expand All @@ -14,13 +13,12 @@
class SourceCollectorCore:
def __init__(
self,
core_logger: CoreLogger,
collector_manager: Optional[Any] = None,
core_logger: Optional[Any] = None, # Deprecated

Check warning on line 16 in core/SourceCollectorCore.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] core/SourceCollectorCore.py#L16 <100>

Unused argument 'core_logger'
Raw output
./core/SourceCollectorCore.py:16:9: U100 Unused argument 'core_logger'
collector_manager: Optional[Any] = None, # Deprecated

Check warning on line 17 in core/SourceCollectorCore.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] core/SourceCollectorCore.py#L17 <100>

Unused argument 'collector_manager'
Raw output
./core/SourceCollectorCore.py:17:9: U100 Unused argument 'collector_manager'
db_client: DatabaseClient = DatabaseClient(),
dev_mode: bool = False
):
self.db_client = db_client
self.core_logger = core_logger
if not dev_mode:
self.scheduled_task_manager = ScheduledTaskManager(db_client=db_client)
else:
Expand Down
2 changes: 1 addition & 1 deletion source_collectors/auto_googler/AutoGooglerCollector.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async def run_to_completion(self) -> AutoGoogler:
)
)
async for log in auto_googler.run():
self.log(log)
await self.log(log)
return auto_googler

async def run_implementation(self) -> None:
Expand Down
4 changes: 2 additions & 2 deletions source_collectors/muckrock/classes/MuckrockCollector.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ async def run_implementation(self) -> None:
self.check_for_count_break(results_count, max_count)
except SearchCompleteException:
break
self.log(f"Search {search_count}: Found {len(results)} results")
await self.log(f"Search {search_count}: Found {len(results)} results")

self.log(f"Search Complete. Total results: {results_count}")
await self.log(f"Search Complete. Total results: {results_count}")
self.data = {"urls": self.format_results(all_results)}

def format_results(self, results: list[dict]) -> list[dict]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pytest

from collector_db.AsyncDatabaseClient import AsyncDatabaseClient
from collector_db.DatabaseClient import DatabaseClient
from core.AsyncCoreLogger import AsyncCoreLogger
from core.CoreLogger import CoreLogger
from source_collectors.auto_googler.AutoGooglerCollector import AutoGooglerCollector
from source_collectors.auto_googler.DTOs import AutoGooglerInputDTO
Expand All @@ -16,7 +16,7 @@
urls_per_result=5,
queries=["police"],
),
logger = MagicMock(spec=CoreLogger),
logger = AsyncMock(spec=AsyncCoreLogger),

Check failure on line 19 in tests/manual/source_collectors/test_autogoogler_collector.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] tests/manual/source_collectors/test_autogoogler_collector.py#L19 <251>

unexpected spaces around keyword / parameter equals
Raw output
./tests/manual/source_collectors/test_autogoogler_collector.py:19:15: E251 unexpected spaces around keyword / parameter equals

Check failure on line 19 in tests/manual/source_collectors/test_autogoogler_collector.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] tests/manual/source_collectors/test_autogoogler_collector.py#L19 <251>

unexpected spaces around keyword / parameter equals
Raw output
./tests/manual/source_collectors/test_autogoogler_collector.py:19:17: E251 unexpected spaces around keyword / parameter equals
adb_client=AsyncMock(spec=AsyncDatabaseClient),
raise_error=True
)
Expand Down
5 changes: 3 additions & 2 deletions tests/manual/source_collectors/test_ckan_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from collector_db.AsyncDatabaseClient import AsyncDatabaseClient
from collector_db.DatabaseClient import DatabaseClient
from core.AsyncCoreLogger import AsyncCoreLogger
from core.CoreLogger import CoreLogger
from source_collectors.ckan.CKANCollector import CKANCollector
from source_collectors.ckan.DTOs import CKANInputDTO
Expand All @@ -31,7 +32,7 @@ async def test_ckan_collector_default():
"organization_search": organization_search
}
),
logger=MagicMock(spec=CoreLogger),
logger=AsyncMock(spec=AsyncCoreLogger),
adb_client=AsyncMock(spec=AsyncDatabaseClient),
raise_error=True
)
Expand Down Expand Up @@ -80,7 +81,7 @@ async def test_ckan_collector_custom():
]
}
),
logger=MagicMock(spec=CoreLogger),
logger=AsyncMock(spec=AsyncCoreLogger),
adb_client=AsyncMock(spec=AsyncDatabaseClient),
raise_error=True
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from collector_db.AsyncDatabaseClient import AsyncDatabaseClient
from collector_db.DatabaseClient import DatabaseClient
from core.AsyncCoreLogger import AsyncCoreLogger
from core.CoreLogger import CoreLogger
from source_collectors.common_crawler.CommonCrawlerCollector import CommonCrawlerCollector
from source_collectors.common_crawler.DTOs import CommonCrawlerInputDTO
Expand All @@ -18,7 +19,7 @@ async def test_common_crawler_collector():
collector = CommonCrawlerCollector(
batch_id=1,
dto=CommonCrawlerInputDTO(),
logger=MagicMock(spec=CoreLogger),
logger=AsyncMock(spec=AsyncCoreLogger),
adb_client=AsyncMock(spec=AsyncDatabaseClient),
raise_error=True
)
Expand Down
7 changes: 4 additions & 3 deletions tests/manual/source_collectors/test_muckrock_collectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest

from collector_db.AsyncDatabaseClient import AsyncDatabaseClient
from core.AsyncCoreLogger import AsyncCoreLogger
from core.CoreLogger import CoreLogger
from source_collectors.muckrock.DTOs import MuckrockSimpleSearchCollectorInputDTO, \
MuckrockCountySearchCollectorInputDTO, MuckrockAllFOIARequestsCollectorInputDTO
Expand All @@ -22,7 +23,7 @@ async def test_muckrock_simple_search_collector():
search_string="police",
max_results=10
),
logger=MagicMock(spec=CoreLogger),
logger=AsyncMock(spec=AsyncCoreLogger),
adb_client=AsyncMock(spec=AsyncDatabaseClient),
raise_error=True
)
Expand All @@ -41,7 +42,7 @@ async def test_muckrock_county_level_search_collector():
parent_jurisdiction_id=ALLEGHENY_COUNTY_MUCKROCK_ID,
town_names=ALLEGHENY_COUNTY_TOWN_NAMES
),
logger=MagicMock(spec=CoreLogger),
logger=AsyncMock(spec=AsyncCoreLogger),
adb_client=AsyncMock(spec=AsyncDatabaseClient),
raise_error=True
)
Expand All @@ -61,7 +62,7 @@ async def test_muckrock_full_search_collector():
start_page=1,
total_pages=2
),
logger=MagicMock(spec=CoreLogger),
logger=AsyncMock(spec=AsyncCoreLogger),
adb_client=AsyncMock(spec=AsyncDatabaseClient),
raise_error=True
)
Expand Down
Loading