diff --git a/src/core/tasks/scheduled/sync/agency/operator.py b/src/core/tasks/scheduled/sync/agency/operator.py index 55318882..333d0195 100644 --- a/src/core/tasks/scheduled/sync/agency/operator.py +++ b/src/core/tasks/scheduled/sync/agency/operator.py @@ -44,5 +44,5 @@ async def inner_task_logic(self): request_count += 1 await self.adb_client.mark_full_agencies_sync() - print(f"Sync completeSynced {count_agencies_synced} agencies") + print(f"Sync complete. Synced {count_agencies_synced} agencies") diff --git a/src/core/tasks/url/operators/submit_approved_url/queries/__init__.py b/src/core/tasks/url/operators/submit_approved_url/queries/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/core/tasks/url/operators/submit_approved_url/queries/get.py b/src/core/tasks/url/operators/submit_approved_url/queries/get.py new file mode 100644 index 00000000..ea40ce79 --- /dev/null +++ b/src/core/tasks/url/operators/submit_approved_url/queries/get.py @@ -0,0 +1,67 @@ +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import selectinload + +from src.collectors.enums import URLStatus +from src.core.tasks.url.operators.submit_approved_url.tdo import SubmitApprovedURLTDO +from src.db.models.instantiations.url.core.sqlalchemy import URL +from src.db.queries.base.builder import QueryBuilderBase +from src.db.helpers.session import session_helper as sh + +class GetValidatedURLsQueryBuilder(QueryBuilderBase): + + async def run(self, session: AsyncSession) -> list[SubmitApprovedURLTDO]: + query = await self._build_query() + urls = await sh.scalars(session, query) + return await self._process_results(urls) + + async def _process_results(self, urls): + results: list[SubmitApprovedURLTDO] = [] + for url in urls: + try: + tdo = await self._process_result(url) + except Exception as e: + raise ValueError(f"Failed to process url {url.id}") from e + results.append(tdo) + return results + + @staticmethod + async def _build_query(): + query = ( + select(URL) + .where(URL.outcome == URLStatus.VALIDATED.value) + .options( + selectinload(URL.optional_data_source_metadata), + selectinload(URL.confirmed_agencies), + selectinload(URL.reviewing_user) + ).limit(100) + ) + return query + + @staticmethod + async def _process_result(url: URL) -> SubmitApprovedURLTDO: + agency_ids = [] + for agency in url.confirmed_agencies: + agency_ids.append(agency.agency_id) + optional_metadata = url.optional_data_source_metadata + if optional_metadata is None: + record_formats = None + data_portal_type = None + supplying_entity = None + else: + record_formats = optional_metadata.record_formats + data_portal_type = optional_metadata.data_portal_type + supplying_entity = optional_metadata.supplying_entity + tdo = SubmitApprovedURLTDO( + url_id=url.id, + url=url.url, + name=url.name, + agency_ids=agency_ids, + description=url.description, + record_type=url.record_type, + record_formats=record_formats, + data_portal_type=data_portal_type, + supplying_entity=supplying_entity, + approving_user_id=url.reviewing_user.user_id + ) + return tdo \ No newline at end of file diff --git a/src/core/tasks/url/operators/submit_approved_url/queries/has_validated.py b/src/core/tasks/url/operators/submit_approved_url/queries/has_validated.py new file mode 100644 index 00000000..9a5c4b51 --- /dev/null +++ b/src/core/tasks/url/operators/submit_approved_url/queries/has_validated.py @@ -0,0 +1,18 @@ +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from src.collectors.enums import URLStatus +from src.db.models.instantiations.url.core.sqlalchemy import URL +from src.db.queries.base.builder import QueryBuilderBase + + +class HasValidatedURLsQueryBuilder(QueryBuilderBase): + + async def run(self, session: AsyncSession) -> bool: + query = ( + select(URL) + .where(URL.outcome == URLStatus.VALIDATED.value) + ) + urls = await session.execute(query) + urls = urls.scalars().all() + return len(urls) > 0 \ No newline at end of file diff --git a/src/core/tasks/url/operators/submit_approved_url/queries/mark_submitted.py b/src/core/tasks/url/operators/submit_approved_url/queries/mark_submitted.py new file mode 100644 index 00000000..9c68ec21 --- /dev/null +++ b/src/core/tasks/url/operators/submit_approved_url/queries/mark_submitted.py @@ -0,0 +1,38 @@ +from sqlalchemy import update +from sqlalchemy.ext.asyncio import AsyncSession + +from src.collectors.enums import URLStatus +from src.core.tasks.url.operators.submit_approved_url.tdo import SubmittedURLInfo +from src.db.models.instantiations.url.core.sqlalchemy import URL +from src.db.models.instantiations.url.data_source.sqlalchemy import URLDataSource +from src.db.queries.base.builder import QueryBuilderBase + + +class MarkURLsAsSubmittedQueryBuilder(QueryBuilderBase): + + def __init__(self, infos: list[SubmittedURLInfo]): + super().__init__() + self.infos = infos + + async def run(self, session: AsyncSession): + for info in self.infos: + url_id = info.url_id + data_source_id = info.data_source_id + + query = ( + update(URL) + .where(URL.id == url_id) + .values( + outcome=URLStatus.SUBMITTED.value + ) + ) + + url_data_source_object = URLDataSource( + url_id=url_id, + data_source_id=data_source_id + ) + if info.submitted_at is not None: + url_data_source_object.created_at = info.submitted_at + session.add(url_data_source_object) + + await session.execute(query) \ No newline at end of file diff --git a/src/db/client/async_.py b/src/db/client/async_.py index 07af0739..bb444c0e 100644 --- a/src/db/client/async_.py +++ b/src/db/client/async_.py @@ -71,6 +71,9 @@ GetPendingURLsWithoutAgencySuggestionsQueryBuilder from src.core.tasks.url.operators.auto_relevant.models.tdo import URLRelevantTDO from src.core.tasks.url.operators.auto_relevant.queries.get_tdos import GetAutoRelevantTDOsQueryBuilder +from src.core.tasks.url.operators.submit_approved_url.queries.get import GetValidatedURLsQueryBuilder +from src.core.tasks.url.operators.submit_approved_url.queries.has_validated import HasValidatedURLsQueryBuilder +from src.core.tasks.url.operators.submit_approved_url.queries.mark_submitted import MarkURLsAsSubmittedQueryBuilder from src.core.tasks.url.operators.submit_approved_url.tdo import SubmitApprovedURLTDO, SubmittedURLInfo from src.core.tasks.url.operators.url_404_probe.tdo import URL404ProbeTDO from src.core.tasks.url.operators.url_duplicate.tdo import URLDuplicateTDO @@ -81,7 +84,6 @@ from src.core.tasks.url.operators.url_miscellaneous_metadata.queries.has_pending_urls_missing_miscellaneous_data import \ HasPendingURsMissingMiscellaneousDataQueryBuilder from src.core.tasks.url.operators.url_miscellaneous_metadata.tdo import URLMiscellaneousMetadataTDO -from src.db.helpers.session import session_helper as sh from src.db.client.helpers import add_standard_limit_and_offset from src.db.client.types import UserSuggestionModel from src.db.config_manager import ConfigManager @@ -92,6 +94,7 @@ from src.db.dtos.url.mapping import URLMapping from src.db.dtos.url.raw_html import RawHTMLInfo from src.db.enums import TaskType +from src.db.helpers.session import session_helper as sh from src.db.models.instantiations.agency.sqlalchemy import Agency from src.db.models.instantiations.backlog_snapshot import BacklogSnapshot from src.db.models.instantiations.batch.pydantic import BatchInfo @@ -1008,86 +1011,14 @@ async def update_batch_post_collection( batch.status = batch_status.value batch.compute_time = compute_time - @session_manager - async def has_validated_urls(self, session: AsyncSession) -> bool: - query = ( - select(URL) - .where(URL.outcome == URLStatus.VALIDATED.value) - ) - urls = await session.execute(query) - urls = urls.scalars().all() - return len(urls) > 0 - - @session_manager - async def get_validated_urls( - self, - session: AsyncSession - ) -> list[SubmitApprovedURLTDO]: - query = ( - select(URL) - .where(URL.outcome == URLStatus.VALIDATED.value) - .options( - selectinload(URL.optional_data_source_metadata), - selectinload(URL.confirmed_agencies), - selectinload(URL.reviewing_user) - ).limit(100) - ) - urls = await session.execute(query) - urls = urls.scalars().all() - results: list[SubmitApprovedURLTDO] = [] - for url in urls: - agency_ids = [] - for agency in url.confirmed_agencies: - agency_ids.append(agency.agency_id) - optional_metadata = url.optional_data_source_metadata - - if optional_metadata is None: - record_formats = None - data_portal_type = None - supplying_entity = None - else: - record_formats = optional_metadata.record_formats - data_portal_type = optional_metadata.data_portal_type - supplying_entity = optional_metadata.supplying_entity - - tdo = SubmitApprovedURLTDO( - url_id=url.id, - url=url.url, - name=url.name, - agency_ids=agency_ids, - description=url.description, - record_type=url.record_type, - record_formats=record_formats, - data_portal_type=data_portal_type, - supplying_entity=supplying_entity, - approving_user_id=url.reviewing_user.user_id - ) - results.append(tdo) - return results - - @session_manager - async def mark_urls_as_submitted(self, session: AsyncSession, infos: list[SubmittedURLInfo]): - for info in infos: - url_id = info.url_id - data_source_id = info.data_source_id - - query = ( - update(URL) - .where(URL.id == url_id) - .values( - outcome=URLStatus.SUBMITTED.value - ) - ) + async def has_validated_urls(self) -> bool: + return await self.run_query_builder(HasValidatedURLsQueryBuilder()) - url_data_source_object = URLDataSource( - url_id=url_id, - data_source_id=data_source_id - ) - if info.submitted_at is not None: - url_data_source_object.created_at = info.submitted_at - session.add(url_data_source_object) + async def get_validated_urls(self) -> list[SubmitApprovedURLTDO]: + return await self.run_query_builder(GetValidatedURLsQueryBuilder()) - await session.execute(query) + async def mark_urls_as_submitted(self, infos: list[SubmittedURLInfo]): + await self.run_query_builder(MarkURLsAsSubmittedQueryBuilder(infos)) async def get_duplicates_by_batch_id(self, batch_id: int, page: int) -> list[DuplicateInfo]: return await self.run_query_builder(GetDuplicatesByBatchIDQueryBuilder(