Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from src.api.endpoints.task.routes import task_router
from src.api.endpoints.url.routes import url_router
from src.collectors.manager import AsyncCollectorManager
from src.collectors.source_collectors.muckrock.api_interface.core import MuckrockAPIInterface
from src.core.core import AsyncCore
from src.core.logger import AsyncCoreLogger
from src.core.env_var_manager import EnvVarManager
Expand Down Expand Up @@ -62,6 +63,9 @@ async def lifespan(app: FastAPI):
api_key=env_var_manager.pdap_api_key,
session=session
)
),
muckrock_api_interface=MuckrockAPIInterface(
session=session
)
)
async_collector_manager = AsyncCollectorManager(
Expand Down
5 changes: 3 additions & 2 deletions src/core/tasks/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ def __init__(
html_parser: HTMLResponseParser,
discord_poster: DiscordPoster,
pdap_client: PDAPClient,
muckrock_api_interface: MuckrockAPIInterface
):
# Dependencies
self.adb_client = adb_client
self.pdap_client = pdap_client
self.url_request_interface = url_request_interface
self.html_parser = html_parser
self.discord_poster = discord_poster
self.muckrock_api_interface = muckrock_api_interface

self.logger = logging.getLogger(__name__)
self.logger.addHandler(logging.StreamHandler())
Expand All @@ -65,11 +67,10 @@ async def get_url_record_type_task_operator(self):
return operator

async def get_agency_identification_task_operator(self):
muckrock_api_interface = MuckrockAPIInterface()
operator = AgencyIdentificationTaskOperator(
adb_client=self.adb_client,
pdap_client=self.pdap_client,
muckrock_api_interface=muckrock_api_interface
muckrock_api_interface=self.muckrock_api_interface
)
return operator

Expand Down
57 changes: 27 additions & 30 deletions src/core/tasks/operators/agency_identification/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,36 +70,33 @@ async def run_subtask(subtask, url_id, collector_metadata) -> list[URLAgencySugg
return await subtask.run(url_id=url_id, collector_metadata=collector_metadata)

async def inner_task_logic(self):
async with ClientSession() as session:
self.pdap_client.access_manager.session = session
self.muckrock_api_interface.session = session
tdos: list[AgencyIdentificationTDO] = await self.get_pending_urls_without_agency_identification()
await self.link_urls_to_task(url_ids=[tdo.url_id for tdo in tdos])
error_infos = []
all_agency_suggestions = []
for tdo in tdos:
subtask = await self.get_subtask(tdo.collector_type)
try:
new_agency_suggestions = await self.run_subtask(
subtask,
tdo.url_id,
tdo.collector_metadata
)
all_agency_suggestions.extend(new_agency_suggestions)
except Exception as e:
error_info = URLErrorPydanticInfo(
task_id=self.task_id,
url_id=tdo.url_id,
error=str(e),
)
error_infos.append(error_info)
tdos: list[AgencyIdentificationTDO] = await self.get_pending_urls_without_agency_identification()
await self.link_urls_to_task(url_ids=[tdo.url_id for tdo in tdos])
error_infos = []
all_agency_suggestions = []
for tdo in tdos:
subtask = await self.get_subtask(tdo.collector_type)
try:
new_agency_suggestions = await self.run_subtask(
subtask,
tdo.url_id,
tdo.collector_metadata
)
all_agency_suggestions.extend(new_agency_suggestions)
except Exception as e:
error_info = URLErrorPydanticInfo(
task_id=self.task_id,
url_id=tdo.url_id,
error=str(e),
)
error_infos.append(error_info)

non_unknown_agency_suggestions = [suggestion for suggestion in all_agency_suggestions if suggestion.suggestion_type != SuggestionType.UNKNOWN]
await self.adb_client.upsert_new_agencies(non_unknown_agency_suggestions)
confirmed_suggestions = [suggestion for suggestion in all_agency_suggestions if suggestion.suggestion_type == SuggestionType.CONFIRMED]
await self.adb_client.add_confirmed_agency_url_links(confirmed_suggestions)
non_confirmed_suggestions = [suggestion for suggestion in all_agency_suggestions if suggestion.suggestion_type != SuggestionType.CONFIRMED]
await self.adb_client.add_agency_auto_suggestions(non_confirmed_suggestions)
await self.adb_client.add_url_error_infos(error_infos)
non_unknown_agency_suggestions = [suggestion for suggestion in all_agency_suggestions if suggestion.suggestion_type != SuggestionType.UNKNOWN]
await self.adb_client.upsert_new_agencies(non_unknown_agency_suggestions)
confirmed_suggestions = [suggestion for suggestion in all_agency_suggestions if suggestion.suggestion_type == SuggestionType.CONFIRMED]
await self.adb_client.add_confirmed_agency_url_links(confirmed_suggestions)
non_confirmed_suggestions = [suggestion for suggestion in all_agency_suggestions if suggestion.suggestion_type != SuggestionType.CONFIRMED]
await self.adb_client.add_agency_auto_suggestions(non_confirmed_suggestions)
await self.adb_client.add_url_error_infos(error_infos)


Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ async def test_insert_logs(db_data_creator: DBDataCreator):
async def test_delete_old_logs(db_data_creator: DBDataCreator):
batch_id = db_data_creator.batch()

old_datetime = datetime.now() - timedelta(days=1)
old_datetime = datetime.now() - timedelta(days=7)
db_client = db_data_creator.db_client
adb_client = db_data_creator.adb_client
log_infos = []
Expand Down
3 changes: 2 additions & 1 deletion tests/automated/integration/core/test_async_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ def setup_async_core(adb_client: AsyncDatabaseClient):
url_request_interface=AsyncMock(),
html_parser=AsyncMock(),
discord_poster=AsyncMock(),
pdap_client=AsyncMock()
pdap_client=AsyncMock(),
muckrock_api_interface=AsyncMock(),
),
collector_manager=AsyncMock()
)
Expand Down