Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
from api.routes.url import url_router
from collector_db.AsyncDatabaseClient import AsyncDatabaseClient
from collector_db.DatabaseClient import DatabaseClient
from collector_manager.AsyncCollectorManager import AsyncCollectorManager
from core.AsyncCore import AsyncCore
from core.CoreLogger import CoreLogger
from core.ScheduledTaskManager import AsyncScheduledTaskManager
from core.SourceCollectorCore import SourceCollectorCore
from core.TaskManager import TaskManager
from html_tag_collector.ResponseParser import HTMLResponseParser
from html_tag_collector.RootURLCache import RootURLCache
from html_tag_collector.URLRequestInterface import URLRequestInterface
Expand All @@ -28,15 +30,18 @@
async def lifespan(app: FastAPI):
# Initialize shared dependencies
db_client = DatabaseClient()
adb_client = AsyncDatabaseClient()
await setup_database(db_client)
core_logger = CoreLogger(db_client=db_client)

source_collector_core = SourceCollectorCore(
core_logger=CoreLogger(
db_client=db_client
),
db_client=DatabaseClient(),
)
async_core = AsyncCore(
adb_client=AsyncDatabaseClient(),
task_manager = TaskManager(
adb_client=adb_client,
huggingface_interface=HuggingFaceInterface(),
url_request_interface=URLRequestInterface(),
html_parser=HTMLResponseParser(
Expand All @@ -46,6 +51,17 @@ async def lifespan(app: FastAPI):
webhook_url=get_from_env("DISCORD_WEBHOOK_URL")
)
)
async_collector_manager = AsyncCollectorManager(
logger=core_logger,
adb_client=adb_client,
post_collection_function_trigger=task_manager.task_trigger
)

async_core = AsyncCore(
adb_client=adb_client,
task_manager=task_manager,
collector_manager=async_collector_manager
)
async_scheduled_task_manager = AsyncScheduledTaskManager(async_core=async_core)

# Pass dependencies into the app state
Expand All @@ -57,6 +73,7 @@ async def lifespan(app: FastAPI):
yield # Code here runs before shutdown

# Shutdown logic (if needed)
core_logger.shutdown()
app.state.core.shutdown()
# Clean up resources, close connections, etc.
pass
Expand Down
24 changes: 14 additions & 10 deletions api/routes/batch.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from typing import Optional

from fastapi import Path, APIRouter
from fastapi import Path, APIRouter, HTTPException

Check warning on line 3 in api/routes/batch.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] api/routes/batch.py#L3 <401>

'fastapi.HTTPException' imported but unused
Raw output
./api/routes/batch.py:3:1: F401 'fastapi.HTTPException' imported but unused
from fastapi.params import Query, Depends

from api.dependencies import get_core
from api.dependencies import get_core, get_async_core
from collector_db.DTOs.BatchInfo import BatchInfo
from collector_manager.CollectorManager import InvalidCollectorError

Check warning on line 8 in api/routes/batch.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] api/routes/batch.py#L8 <401>

'collector_manager.CollectorManager.InvalidCollectorError' imported but unused
Raw output
./api/routes/batch.py:8:1: F401 'collector_manager.CollectorManager.InvalidCollectorError' imported but unused
from collector_manager.enums import CollectorType
from core.AsyncCore import AsyncCore
from core.DTOs.GetBatchLogsResponse import GetBatchLogsResponse
from core.DTOs.GetBatchStatusResponse import GetBatchStatusResponse
from core.DTOs.GetDuplicatesByBatchResponse import GetDuplicatesByBatchResponse
Expand Down Expand Up @@ -46,24 +48,25 @@


@batch_router.get("/{batch_id}")
def get_batch_info(
async def get_batch_info(

Check warning on line 51 in api/routes/batch.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] api/routes/batch.py#L51 <103>

Missing docstring in public function
Raw output
./api/routes/batch.py:51:1: D103 Missing docstring in public function
batch_id: int = Path(description="The batch id"),
core: SourceCollectorCore = Depends(get_core),
core: AsyncCore = Depends(get_async_core),
access_info: AccessInfo = Depends(get_access_info),
) -> BatchInfo:
return core.get_batch_info(batch_id)
result = await core.get_batch_info(batch_id)
return result

@batch_router.get("/{batch_id}/urls")
def get_urls_by_batch(
async def get_urls_by_batch(

Check warning on line 60 in api/routes/batch.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] api/routes/batch.py#L60 <103>

Missing docstring in public function
Raw output
./api/routes/batch.py:60:1: D103 Missing docstring in public function
batch_id: int = Path(description="The batch id"),
page: int = Query(
description="The page number",
default=1
),
core: SourceCollectorCore = Depends(get_core),
core: AsyncCore = Depends(get_async_core),
access_info: AccessInfo = Depends(get_access_info),
) -> GetURLsByBatchResponse:
return core.get_urls_by_batch(batch_id, page=page)
return await core.get_urls_by_batch(batch_id, page=page)

@batch_router.get("/{batch_id}/duplicates")
def get_duplicates_by_batch(
Expand All @@ -90,9 +93,10 @@
return core.get_batch_logs(batch_id)

@batch_router.post("/{batch_id}/abort")
def abort_batch(
async def abort_batch(

Check warning on line 96 in api/routes/batch.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] api/routes/batch.py#L96 <103>

Missing docstring in public function
Raw output
./api/routes/batch.py:96:1: D103 Missing docstring in public function
batch_id: int = Path(description="The batch id"),
core: SourceCollectorCore = Depends(get_core),
async_core: AsyncCore = Depends(get_async_core),
access_info: AccessInfo = Depends(get_access_info),
) -> MessageResponse:
return core.abort_batch(batch_id)
return await async_core.abort_batch(batch_id)

Check warning on line 102 in api/routes/batch.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] api/routes/batch.py#L102 <292>

no newline at end of file
Raw output
./api/routes/batch.py:102:50: W292 no newline at end of file
32 changes: 16 additions & 16 deletions api/routes/collector.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from fastapi import APIRouter
from fastapi.params import Depends

from api.dependencies import get_core
from api.dependencies import get_async_core
from collector_manager.DTOs.ExampleInputDTO import ExampleInputDTO
from collector_manager.enums import CollectorType
from core.AsyncCore import AsyncCore
from core.DTOs.CollectorStartInfo import CollectorStartInfo
from core.SourceCollectorCore import SourceCollectorCore
from security_manager.SecurityManager import AccessInfo, get_access_info
from source_collectors.auto_googler.DTOs import AutoGooglerInputDTO
from source_collectors.ckan.DTOs import CKANInputDTO
Expand All @@ -22,13 +22,13 @@
@collector_router.post("/example")
async def start_example_collector(
dto: ExampleInputDTO,
core: SourceCollectorCore = Depends(get_core),
core: AsyncCore = Depends(get_async_core),
access_info: AccessInfo = Depends(get_access_info),
) -> CollectorStartInfo:
"""
Start the example collector
"""
return core.initiate_collector(
return await core.initiate_collector(
collector_type=CollectorType.EXAMPLE,
dto=dto,
user_id=access_info.user_id
Expand All @@ -37,13 +37,13 @@ async def start_example_collector(
@collector_router.post("/ckan")
async def start_ckan_collector(
dto: CKANInputDTO,
core: SourceCollectorCore = Depends(get_core),
core: AsyncCore = Depends(get_async_core),
access_info: AccessInfo = Depends(get_access_info),
) -> CollectorStartInfo:
"""
Start the ckan collector
"""
return core.initiate_collector(
return await core.initiate_collector(
collector_type=CollectorType.CKAN,
dto=dto,
user_id=access_info.user_id
Expand All @@ -52,13 +52,13 @@ async def start_ckan_collector(
@collector_router.post("/common-crawler")
async def start_common_crawler_collector(
dto: CommonCrawlerInputDTO,
core: SourceCollectorCore = Depends(get_core),
core: AsyncCore = Depends(get_async_core),
access_info: AccessInfo = Depends(get_access_info),
) -> CollectorStartInfo:
"""
Start the common crawler collector
"""
return core.initiate_collector(
return await core.initiate_collector(
collector_type=CollectorType.COMMON_CRAWLER,
dto=dto,
user_id=access_info.user_id
Expand All @@ -67,13 +67,13 @@ async def start_common_crawler_collector(
@collector_router.post("/auto-googler")
async def start_auto_googler_collector(
dto: AutoGooglerInputDTO,
core: SourceCollectorCore = Depends(get_core),
core: AsyncCore = Depends(get_async_core),
access_info: AccessInfo = Depends(get_access_info),
) -> CollectorStartInfo:
"""
Start the auto googler collector
"""
return core.initiate_collector(
return await core.initiate_collector(
collector_type=CollectorType.AUTO_GOOGLER,
dto=dto,
user_id=access_info.user_id
Expand All @@ -82,13 +82,13 @@ async def start_auto_googler_collector(
@collector_router.post("/muckrock-simple")
async def start_muckrock_collector(
dto: MuckrockSimpleSearchCollectorInputDTO,
core: SourceCollectorCore = Depends(get_core),
core: AsyncCore = Depends(get_async_core),
access_info: AccessInfo = Depends(get_access_info),
) -> CollectorStartInfo:
"""
Start the muckrock collector
"""
return core.initiate_collector(
return await core.initiate_collector(
collector_type=CollectorType.MUCKROCK_SIMPLE_SEARCH,
dto=dto,
user_id=access_info.user_id
Expand All @@ -97,13 +97,13 @@ async def start_muckrock_collector(
@collector_router.post("/muckrock-county")
async def start_muckrock_county_collector(
dto: MuckrockCountySearchCollectorInputDTO,
core: SourceCollectorCore = Depends(get_core),
core: AsyncCore = Depends(get_async_core),
access_info: AccessInfo = Depends(get_access_info),
) -> CollectorStartInfo:
"""
Start the muckrock county level collector
"""
return core.initiate_collector(
return await core.initiate_collector(
collector_type=CollectorType.MUCKROCK_COUNTY_SEARCH,
dto=dto,
user_id=access_info.user_id
Expand All @@ -112,13 +112,13 @@ async def start_muckrock_county_collector(
@collector_router.post("/muckrock-all")
async def start_muckrock_all_foia_collector(
dto: MuckrockAllFOIARequestsCollectorInputDTO,
core: SourceCollectorCore = Depends(get_core),
core: AsyncCore = Depends(get_async_core),
access_info: AccessInfo = Depends(get_access_info),
) -> CollectorStartInfo:
"""
Start the muckrock collector for all FOIA requests
"""
return core.initiate_collector(
return await core.initiate_collector(
collector_type=CollectorType.MUCKROCK_ALL_SEARCH,
dto=dto,
user_id=access_info.user_id
Expand Down
Loading
Loading