diff --git a/ENV.md b/ENV.md index a46c4f1d..deabffd9 100644 --- a/ENV.md +++ b/ENV.md @@ -102,6 +102,7 @@ Agency ID Subtasks are collectively disabled by the `URL_AGENCY_IDENTIFICATION_T | `AGENCY_ID_NLP_LOCATION_MATCH_FLAG` | Enables the NLP location match subtask for agency identification. | | `AGENCY_ID_CKAN_FLAG` | Enables the CKAN subtask for agency identification. | | `AGENCY_ID_MUCKROCK_FLAG` | Enables the MuckRock subtask for agency identification. | +| `AGENCY_ID_BATCH_LINK_FLAG` | Enables the Batch Link subtask for agency identification. | ### Location ID Subtasks @@ -111,6 +112,7 @@ Location ID Subtasks are collectively disabled by the `URL_LOCATION_IDENTIFICATI | Flag | Description | |---------------------------------------|---------------------------------------------------------------------| | `LOCATION_ID_NLP_LOCATION_MATCH_FLAG` | Enables the NLP location match subtask for location identification. | +| `LOCATION_ID_BATCH_LINK_FLAG` | Enables the Batch Link subtask for location identification. | ## Foreign Data Wrapper (FDW) diff --git a/alembic/versions/2025_10_11_1438-8b2adc95c5d7_add_batch_link_subtasks.py b/alembic/versions/2025_10_11_1438-8b2adc95c5d7_add_batch_link_subtasks.py new file mode 100644 index 00000000..49fd2354 --- /dev/null +++ b/alembic/versions/2025_10_11_1438-8b2adc95c5d7_add_batch_link_subtasks.py @@ -0,0 +1,34 @@ +"""Add batch link subtasks + +Revision ID: 8b2adc95c5d7 +Revises: 7c4049508bfc +Create Date: 2025-10-11 14:38:01.874040 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + +from src.util.alembic_helpers import add_enum_value + +# revision identifiers, used by Alembic. +revision: str = '8b2adc95c5d7' +down_revision: Union[str, None] = '7c4049508bfc' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + add_enum_value( + enum_name="agency_auto_suggestion_method", + enum_value="batch_link" + ) + add_enum_value( + enum_name="auto_location_id_subtask_type", + enum_value="batch_link" + ) + + +def downgrade() -> None: + pass diff --git a/src/core/tasks/url/operators/agency_identification/subtasks/flags/mappings.py b/src/core/tasks/url/operators/agency_identification/subtasks/flags/mappings.py index d6997423..dcc0b60c 100644 --- a/src/core/tasks/url/operators/agency_identification/subtasks/flags/mappings.py +++ b/src/core/tasks/url/operators/agency_identification/subtasks/flags/mappings.py @@ -4,5 +4,6 @@ AutoAgencyIDSubtaskType.HOMEPAGE_MATCH: "AGENCY_ID_HOMEPAGE_MATCH_FLAG", AutoAgencyIDSubtaskType.NLP_LOCATION_MATCH: "AGENCY_ID_NLP_LOCATION_MATCH_FLAG", AutoAgencyIDSubtaskType.CKAN: "AGENCY_ID_CKAN_FLAG", - AutoAgencyIDSubtaskType.MUCKROCK: "AGENCY_ID_MUCKROCK_FLAG" + AutoAgencyIDSubtaskType.MUCKROCK: "AGENCY_ID_MUCKROCK_FLAG", + AutoAgencyIDSubtaskType.BATCH_LINK: "AGENCY_ID_BATCH_LINK_FLAG" } \ No newline at end of file diff --git a/src/core/tasks/url/operators/agency_identification/subtasks/impl/batch_link/__init__.py b/src/core/tasks/url/operators/agency_identification/subtasks/impl/batch_link/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/core/tasks/url/operators/agency_identification/subtasks/impl/batch_link/core.py b/src/core/tasks/url/operators/agency_identification/subtasks/impl/batch_link/core.py new file mode 100644 index 00000000..9e15996f --- /dev/null +++ b/src/core/tasks/url/operators/agency_identification/subtasks/impl/batch_link/core.py @@ -0,0 +1,48 @@ +from src.core.tasks.url.operators.agency_identification.subtasks.impl.batch_link.params import \ + AgencyBatchLinkSubtaskParams +from src.core.tasks.url.operators.agency_identification.subtasks.impl.batch_link.query import \ + GetLocationBatchLinkSubtaskParamsQueryBuilder +from src.core.tasks.url.operators.agency_identification.subtasks.models.subtask import AutoAgencyIDSubtaskData +from src.core.tasks.url.operators.agency_identification.subtasks.models.suggestion import AgencySuggestion +from src.core.tasks.url.operators.agency_identification.subtasks.templates.subtask import AgencyIDSubtaskOperatorBase +from src.db.client.async_ import AsyncDatabaseClient +from src.db.models.impl.url.suggestion.agency.subtask.enum import AutoAgencyIDSubtaskType +from src.db.models.impl.url.suggestion.agency.subtask.pydantic import URLAutoAgencyIDSubtaskPydantic + + +class AgencyBatchLinkSubtaskOperator(AgencyIDSubtaskOperatorBase): + + def __init__( + self, + adb_client: AsyncDatabaseClient, + task_id: int + ): + super().__init__(adb_client=adb_client, task_id=task_id) + + async def inner_logic(self) -> None: + params: list[AgencyBatchLinkSubtaskParams] = await self._get_params() + self.linked_urls = [param.url_id for param in params] + subtask_data_list: list[AutoAgencyIDSubtaskData] = [] + for param in params: + subtask_data: AutoAgencyIDSubtaskData = AutoAgencyIDSubtaskData( + pydantic_model=URLAutoAgencyIDSubtaskPydantic( + task_id=self.task_id, + url_id=param.url_id, + type=AutoAgencyIDSubtaskType.BATCH_LINK, + agencies_found=True, + ), + suggestions=[ + AgencySuggestion( + agency_id=param.agency_id, + confidence=80, + ) + ], + ) + subtask_data_list.append(subtask_data) + + await self._upload_subtask_data(subtask_data_list) + + async def _get_params(self) -> list[AgencyBatchLinkSubtaskParams]: + return await self.adb_client.run_query_builder( + GetLocationBatchLinkSubtaskParamsQueryBuilder() + ) \ No newline at end of file diff --git a/src/core/tasks/url/operators/agency_identification/subtasks/impl/batch_link/params.py b/src/core/tasks/url/operators/agency_identification/subtasks/impl/batch_link/params.py new file mode 100644 index 00000000..3008f9be --- /dev/null +++ b/src/core/tasks/url/operators/agency_identification/subtasks/impl/batch_link/params.py @@ -0,0 +1,6 @@ +from pydantic import BaseModel + + +class AgencyBatchLinkSubtaskParams(BaseModel): + url_id: int + agency_id: int \ No newline at end of file diff --git a/src/core/tasks/url/operators/agency_identification/subtasks/impl/batch_link/query.py b/src/core/tasks/url/operators/agency_identification/subtasks/impl/batch_link/query.py new file mode 100644 index 00000000..008bd1f2 --- /dev/null +++ b/src/core/tasks/url/operators/agency_identification/subtasks/impl/batch_link/query.py @@ -0,0 +1,45 @@ +from typing import Sequence + +from sqlalchemy import select, RowMapping +from sqlalchemy.ext.asyncio import AsyncSession + +from src.core.tasks.url.operators.agency_identification.subtasks.impl.batch_link.params import \ + AgencyBatchLinkSubtaskParams +from src.core.tasks.url.operators.agency_identification.subtasks.queries.survey.queries.ctes.eligible import \ + EligibleContainer +from src.db.models.impl.link.agency_batch.sqlalchemy import LinkAgencyBatch +from src.db.models.impl.link.batch_url.sqlalchemy import LinkBatchURL +from src.db.queries.base.builder import QueryBuilderBase +from src.db.helpers.session import session_helper as sh + +class GetLocationBatchLinkSubtaskParamsQueryBuilder(QueryBuilderBase): + + async def run(self, session: AsyncSession) -> list[AgencyBatchLinkSubtaskParams]: + container = EligibleContainer() + query = ( + select( + container.url_id, + LinkAgencyBatch.agency_id, + ) + .select_from(container.cte) + .join( + LinkBatchURL, + LinkBatchURL.url_id == container.url_id, + ) + .join( + LinkAgencyBatch, + LinkAgencyBatch.batch_id == LinkBatchURL.batch_id, + ) + .where( + container.batch_link, + ) + .limit(500) + ) + results: Sequence[RowMapping] = await sh.mappings(session, query=query) + return [ + AgencyBatchLinkSubtaskParams( + url_id=mapping["id"], + agency_id=mapping["agency_id"], + ) + for mapping in results + ] \ No newline at end of file diff --git a/src/core/tasks/url/operators/agency_identification/subtasks/impl/ckan_/query.py b/src/core/tasks/url/operators/agency_identification/subtasks/impl/ckan_/query.py index 90e965e7..503d5414 100644 --- a/src/core/tasks/url/operators/agency_identification/subtasks/impl/ckan_/query.py +++ b/src/core/tasks/url/operators/agency_identification/subtasks/impl/ckan_/query.py @@ -3,13 +3,10 @@ from sqlalchemy import select, RowMapping from sqlalchemy.ext.asyncio import AsyncSession -from src.collectors.enums import CollectorType from src.core.tasks.url.operators.agency_identification.subtasks.impl.ckan_.params import CKANAgencyIDSubtaskParams from src.core.tasks.url.operators.agency_identification.subtasks.queries.survey.queries.ctes.eligible import \ EligibleContainer from src.db.helpers.session import session_helper as sh -from src.db.models.impl.batch.sqlalchemy import Batch -from src.db.models.impl.link.batch_url.sqlalchemy import LinkBatchURL from src.db.models.impl.url.core.sqlalchemy import URL from src.db.queries.base.builder import QueryBuilderBase diff --git a/src/core/tasks/url/operators/agency_identification/subtasks/loader.py b/src/core/tasks/url/operators/agency_identification/subtasks/loader.py index 50bbe255..24099540 100644 --- a/src/core/tasks/url/operators/agency_identification/subtasks/loader.py +++ b/src/core/tasks/url/operators/agency_identification/subtasks/loader.py @@ -1,4 +1,6 @@ from src.collectors.impl.muckrock.api_interface.core import MuckrockAPIInterface +from src.core.tasks.url.operators.agency_identification.subtasks.impl.batch_link.core import \ + AgencyBatchLinkSubtaskOperator from src.core.tasks.url.operators.agency_identification.subtasks.impl.ckan_.core import CKANAgencyIDSubtaskOperator from src.core.tasks.url.operators.agency_identification.subtasks.impl.homepage_match_.core import \ HomepageMatchSubtaskOperator @@ -52,6 +54,15 @@ def _load_nlp_location_match_subtask(self, task_id: int) -> NLPLocationMatchSubt adb_client=self.adb_client, ) + def _load_batch_link_subtask( + self, + task_id: int + ) -> AgencyBatchLinkSubtaskOperator: + return AgencyBatchLinkSubtaskOperator( + task_id=task_id, + adb_client=self.adb_client, + ) + async def load_subtask( self, @@ -68,4 +79,6 @@ async def load_subtask( return self._load_nlp_location_match_subtask(task_id) case AutoAgencyIDSubtaskType.HOMEPAGE_MATCH: return self._load_homepage_match_subtask(task_id) + case AutoAgencyIDSubtaskType.BATCH_LINK: + return self._load_batch_link_subtask(task_id) raise ValueError(f"Unknown subtask type: {subtask_type}") diff --git a/src/core/tasks/url/operators/agency_identification/subtasks/queries/survey/constants.py b/src/core/tasks/url/operators/agency_identification/subtasks/queries/survey/constants.py index 749332e6..bea99266 100644 --- a/src/core/tasks/url/operators/agency_identification/subtasks/queries/survey/constants.py +++ b/src/core/tasks/url/operators/agency_identification/subtasks/queries/survey/constants.py @@ -5,7 +5,8 @@ AutoAgencyIDSubtaskType.CKAN, AutoAgencyIDSubtaskType.MUCKROCK, AutoAgencyIDSubtaskType.HOMEPAGE_MATCH, - AutoAgencyIDSubtaskType.NLP_LOCATION_MATCH + AutoAgencyIDSubtaskType.NLP_LOCATION_MATCH, + AutoAgencyIDSubtaskType.BATCH_LINK ] SUBTASK_HIERARCHY_MAPPING: dict[AutoAgencyIDSubtaskType, int] = { diff --git a/src/core/tasks/url/operators/agency_identification/subtasks/queries/survey/queries/ctes/eligible.py b/src/core/tasks/url/operators/agency_identification/subtasks/queries/survey/queries/ctes/eligible.py index 31d4e63c..ff7e2d72 100644 --- a/src/core/tasks/url/operators/agency_identification/subtasks/queries/survey/queries/ctes/eligible.py +++ b/src/core/tasks/url/operators/agency_identification/subtasks/queries/survey/queries/ctes/eligible.py @@ -4,6 +4,8 @@ HIGH_CONFIDENCE_ANNOTATIONS_EXISTS_CONTAINER from src.core.tasks.url.operators._shared.ctes.validated import \ VALIDATED_EXISTS_CONTAINER +from src.core.tasks.url.operators.agency_identification.subtasks.queries.survey.queries.ctes.subtask.impl.batch_link import \ + BATCH_LINK_SUBTASK_CONTAINER from src.core.tasks.url.operators.agency_identification.subtasks.queries.survey.queries.ctes.subtask.impl.ckan import \ CKAN_SUBTASK_CONTAINER from src.core.tasks.url.operators.agency_identification.subtasks.queries.survey.queries.ctes.subtask.impl.homepage import \ @@ -24,6 +26,7 @@ def __init__(self): MUCKROCK_SUBTASK_CONTAINER.eligible_query.label("muckrock"), HOMEPAGE_SUBTASK_CONTAINER.eligible_query.label("homepage"), NLP_LOCATION_CONTAINER.eligible_query.label("nlp_location"), + BATCH_LINK_SUBTASK_CONTAINER.eligible_query.label("batch_link"), ) .where( HIGH_CONFIDENCE_ANNOTATIONS_EXISTS_CONTAINER.not_exists_query, @@ -44,6 +47,10 @@ def url_id(self) -> Column[int]: def ckan(self) -> Column[bool]: return self._cte.c['ckan'] + @property + def batch_link(self) -> Column[bool]: + return self._cte.c['batch_link'] + @property def muckrock(self) -> Column[bool]: return self._cte.c['muckrock'] diff --git a/src/core/tasks/url/operators/agency_identification/subtasks/queries/survey/queries/ctes/subtask/impl/batch_link.py b/src/core/tasks/url/operators/agency_identification/subtasks/queries/survey/queries/ctes/subtask/impl/batch_link.py new file mode 100644 index 00000000..42fcc02f --- /dev/null +++ b/src/core/tasks/url/operators/agency_identification/subtasks/queries/survey/queries/ctes/subtask/impl/batch_link.py @@ -0,0 +1,31 @@ +from sqlalchemy import select + +from src.core.tasks.url.operators._shared.container.subtask.eligible import URLsSubtaskEligibleCTEContainer +from src.core.tasks.url.operators.agency_identification.subtasks.queries.survey.queries.ctes.subtask.helpers import \ + get_exists_subtask_query +from src.db.models.impl.link.agency_batch.sqlalchemy import LinkAgencyBatch +from src.db.models.impl.link.batch_url.sqlalchemy import LinkBatchURL +from src.db.models.impl.url.core.sqlalchemy import URL +from src.db.models.impl.url.suggestion.agency.subtask.enum import AutoAgencyIDSubtaskType + +cte = ( + select( + URL.id, + get_exists_subtask_query( + AutoAgencyIDSubtaskType.BATCH_LINK, + ) + ) + .join( + LinkBatchURL, + LinkBatchURL.url_id == URL.id, + ) + .join( + LinkAgencyBatch, + LinkAgencyBatch.batch_id == LinkBatchURL.batch_id, + ) + .cte("batch_link_eligible") +) + +BATCH_LINK_SUBTASK_CONTAINER = URLsSubtaskEligibleCTEContainer( + cte, +) \ No newline at end of file diff --git a/src/core/tasks/url/operators/agency_identification/subtasks/queries/survey/queries/eligible_counts.py b/src/core/tasks/url/operators/agency_identification/subtasks/queries/survey/queries/eligible_counts.py index 96a322cb..d3b7fe6b 100644 --- a/src/core/tasks/url/operators/agency_identification/subtasks/queries/survey/queries/eligible_counts.py +++ b/src/core/tasks/url/operators/agency_identification/subtasks/queries/survey/queries/eligible_counts.py @@ -21,5 +21,6 @@ def sum_count(col: ColumnElement[bool], subtask_type: AutoAgencyIDSubtaskType) - sum_count(container.muckrock, AutoAgencyIDSubtaskType.MUCKROCK), sum_count(container.homepage, AutoAgencyIDSubtaskType.HOMEPAGE_MATCH), sum_count(container.nlp_location, AutoAgencyIDSubtaskType.NLP_LOCATION_MATCH), + sum_count(container.batch_link, AutoAgencyIDSubtaskType.BATCH_LINK) ) ) \ No newline at end of file diff --git a/src/core/tasks/url/operators/location_id/subtasks/flags/mappings.py b/src/core/tasks/url/operators/location_id/subtasks/flags/mappings.py index 6a47590e..48f5d194 100644 --- a/src/core/tasks/url/operators/location_id/subtasks/flags/mappings.py +++ b/src/core/tasks/url/operators/location_id/subtasks/flags/mappings.py @@ -2,4 +2,5 @@ SUBTASK_TO_ENV_FLAG: dict[LocationIDSubtaskType, str] = { LocationIDSubtaskType.NLP_LOCATION_FREQUENCY: "LOCATION_ID_NLP_LOCATION_MATCH_FLAG", + LocationIDSubtaskType.BATCH_LINK: "LOCATION_ID_BATCH_LINK_FLAG", } \ No newline at end of file diff --git a/src/core/tasks/url/operators/location_id/subtasks/impl/batch_link/__init__.py b/src/core/tasks/url/operators/location_id/subtasks/impl/batch_link/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/core/tasks/url/operators/location_id/subtasks/impl/batch_link/core.py b/src/core/tasks/url/operators/location_id/subtasks/impl/batch_link/core.py new file mode 100644 index 00000000..a85e572a --- /dev/null +++ b/src/core/tasks/url/operators/location_id/subtasks/impl/batch_link/core.py @@ -0,0 +1,56 @@ +from src.core.tasks.url.operators.location_id.subtasks.impl.batch_link.inputs import LocationBatchLinkInput +from src.core.tasks.url.operators.location_id.subtasks.impl.batch_link.query import GetLocationBatchLinkQueryBuilder +from src.core.tasks.url.operators.location_id.subtasks.impl.nlp_location_freq.constants import ITERATIONS_PER_SUBTASK +from src.core.tasks.url.operators.location_id.subtasks.models.subtask import AutoLocationIDSubtaskData +from src.core.tasks.url.operators.location_id.subtasks.models.suggestion import LocationSuggestion +from src.core.tasks.url.operators.location_id.subtasks.templates.subtask import LocationIDSubtaskOperatorBase +from src.db.client.async_ import AsyncDatabaseClient +from src.db.models.impl.url.suggestion.location.auto.subtask.enums import LocationIDSubtaskType +from src.db.models.impl.url.suggestion.location.auto.subtask.pydantic import AutoLocationIDSubtaskPydantic + + +class LocationBatchLinkSubtaskOperator(LocationIDSubtaskOperatorBase): + + def __init__( + self, + task_id: int, + adb_client: AsyncDatabaseClient, + ): + super().__init__(adb_client=adb_client, task_id=task_id) + + async def inner_logic(self) -> None: + for iteration in range(ITERATIONS_PER_SUBTASK): + inputs: list[LocationBatchLinkInput] = await self._get_from_db() + if len(inputs) == 0: + break + await self.run_subtask_iteration(inputs) + + async def run_subtask_iteration( + self, + inputs: list[LocationBatchLinkInput] + ) -> None: + self.linked_urls.extend([input_.url_id for input_ in inputs]) + subtask_data_list: list[AutoLocationIDSubtaskData] = [] + for input_ in inputs: + subtask_data_list.append( + AutoLocationIDSubtaskData( + pydantic_model=AutoLocationIDSubtaskPydantic( + url_id=input_.url_id, + task_id=self.task_id, + locations_found=True, + type=LocationIDSubtaskType.BATCH_LINK, + ), + suggestions=[ + LocationSuggestion( + location_id=input_.location_id, + confidence=80, + ) + ] + ) + ) + + await self._upload_subtask_data(subtask_data_list) + + async def _get_from_db(self) -> list[LocationBatchLinkInput]: + query = GetLocationBatchLinkQueryBuilder() + return await self.adb_client.run_query_builder(query) \ No newline at end of file diff --git a/src/core/tasks/url/operators/location_id/subtasks/impl/batch_link/inputs.py b/src/core/tasks/url/operators/location_id/subtasks/impl/batch_link/inputs.py new file mode 100644 index 00000000..0bd10414 --- /dev/null +++ b/src/core/tasks/url/operators/location_id/subtasks/impl/batch_link/inputs.py @@ -0,0 +1,6 @@ +from pydantic import BaseModel + + +class LocationBatchLinkInput(BaseModel): + location_id: int + url_id: int \ No newline at end of file diff --git a/src/core/tasks/url/operators/location_id/subtasks/impl/batch_link/query.py b/src/core/tasks/url/operators/location_id/subtasks/impl/batch_link/query.py new file mode 100644 index 00000000..1a7d424f --- /dev/null +++ b/src/core/tasks/url/operators/location_id/subtasks/impl/batch_link/query.py @@ -0,0 +1,46 @@ +from typing import Sequence + +from sqlalchemy import select, RowMapping +from sqlalchemy.ext.asyncio import AsyncSession + +from src.core.tasks.url.operators.location_id.subtasks.impl.batch_link.inputs import LocationBatchLinkInput +from src.core.tasks.url.operators.location_id.subtasks.impl.nlp_location_freq.constants import \ + NUMBER_OF_ENTRIES_PER_ITERATION +from src.core.tasks.url.operators.location_id.subtasks.queries.survey.queries.ctes.eligible import EligibleContainer +from src.db.models.impl.link.batch_url.sqlalchemy import LinkBatchURL +from src.db.models.impl.link.location_batch.sqlalchemy import LinkLocationBatch +from src.db.queries.base.builder import QueryBuilderBase +from src.db.helpers.session import session_helper as sh + +class GetLocationBatchLinkQueryBuilder(QueryBuilderBase): + + async def run(self, session: AsyncSession) -> list[LocationBatchLinkInput]: + container = EligibleContainer() + query = ( + select( + LinkLocationBatch.location_id, + LinkBatchURL.url_id + ) + .join( + LinkLocationBatch, + LinkBatchURL.batch_id == LinkLocationBatch.batch_id, + ) + .join( + container.cte, + LinkBatchURL.url_id == container.url_id, + ) + .where( + container.batch_link, + ) + .limit(NUMBER_OF_ENTRIES_PER_ITERATION) + ) + + mappings: Sequence[RowMapping] = await sh.mappings(session, query=query) + inputs: list[LocationBatchLinkInput] = [ + LocationBatchLinkInput( + location_id=mapping["location_id"], + url_id=mapping["url_id"], + ) + for mapping in mappings + ] + return inputs diff --git a/src/core/tasks/url/operators/location_id/subtasks/loader.py b/src/core/tasks/url/operators/location_id/subtasks/loader.py index b8267cdb..408b5a07 100644 --- a/src/core/tasks/url/operators/location_id/subtasks/loader.py +++ b/src/core/tasks/url/operators/location_id/subtasks/loader.py @@ -1,3 +1,4 @@ +from src.core.tasks.url.operators.location_id.subtasks.impl.batch_link.core import LocationBatchLinkSubtaskOperator from src.core.tasks.url.operators.location_id.subtasks.impl.nlp_location_freq.core import \ NLPLocationFrequencySubtaskOperator from src.core.tasks.url.operators.location_id.subtasks.impl.nlp_location_freq.processor.nlp.core import NLPProcessor @@ -24,6 +25,12 @@ def _load_nlp_location_match_subtask(self, task_id: int) -> NLPLocationFrequency nlp_processor=self._nlp_processor ) + def _load_batch_link_subtask(self, task_id: int) -> LocationBatchLinkSubtaskOperator: + return LocationBatchLinkSubtaskOperator( + task_id=task_id, + adb_client=self.adb_client, + ) + async def load_subtask( self, subtask_type: LocationIDSubtaskType, @@ -32,4 +39,6 @@ async def load_subtask( match subtask_type: case LocationIDSubtaskType.NLP_LOCATION_FREQUENCY: return self._load_nlp_location_match_subtask(task_id=task_id) + case LocationIDSubtaskType.BATCH_LINK: + return self._load_batch_link_subtask(task_id=task_id) raise ValueError(f"Unknown subtask type: {subtask_type}") diff --git a/src/core/tasks/url/operators/location_id/subtasks/queries/survey/constants.py b/src/core/tasks/url/operators/location_id/subtasks/queries/survey/constants.py index 0465f295..b9f85e2d 100644 --- a/src/core/tasks/url/operators/location_id/subtasks/queries/survey/constants.py +++ b/src/core/tasks/url/operators/location_id/subtasks/queries/survey/constants.py @@ -3,6 +3,7 @@ SUBTASK_HIERARCHY: list[LocationIDSubtaskType] = [ LocationIDSubtaskType.NLP_LOCATION_FREQUENCY, + LocationIDSubtaskType.BATCH_LINK ] SUBTASK_HIERARCHY_MAPPING: dict[LocationIDSubtaskType, int] = { diff --git a/src/core/tasks/url/operators/location_id/subtasks/queries/survey/queries/ctes/eligible.py b/src/core/tasks/url/operators/location_id/subtasks/queries/survey/queries/ctes/eligible.py index b2d2986c..1c97f8fb 100644 --- a/src/core/tasks/url/operators/location_id/subtasks/queries/survey/queries/ctes/eligible.py +++ b/src/core/tasks/url/operators/location_id/subtasks/queries/survey/queries/ctes/eligible.py @@ -5,6 +5,8 @@ from src.core.tasks.url.operators._shared.ctes.validated import VALIDATED_EXISTS_CONTAINER from src.core.tasks.url.operators.location_id.subtasks.queries.survey.queries.ctes.exists.high_confidence_annotations import \ HIGH_CONFIDENCE_ANNOTATIONS_EXISTS_CONTAINER +from src.core.tasks.url.operators.location_id.subtasks.queries.survey.queries.ctes.subtask.impl.batch_link import \ + BATCH_LINK_CONTAINER from src.core.tasks.url.operators.location_id.subtasks.queries.survey.queries.ctes.subtask.impl.nlp_location_freq import \ NLP_LOCATION_CONTAINER from src.db.models.impl.url.core.sqlalchemy import URL @@ -17,6 +19,7 @@ def __init__(self): select( URL.id, NLP_LOCATION_CONTAINER.eligible_query.label("nlp_location"), + BATCH_LINK_CONTAINER.eligible_query.label("batch_link"), ) .where( HIGH_CONFIDENCE_ANNOTATIONS_EXISTS_CONTAINER.not_exists_query, @@ -35,4 +38,8 @@ def url_id(self) -> Column[int]: @property def nlp_location(self) -> Column[bool]: - return self._cte.c['nlp_location'] \ No newline at end of file + return self._cte.c['nlp_location'] + + @property + def batch_link(self) -> Column[bool]: + return self._cte.c['batch_link'] \ No newline at end of file diff --git a/src/core/tasks/url/operators/location_id/subtasks/queries/survey/queries/ctes/subtask/impl/batch_link.py b/src/core/tasks/url/operators/location_id/subtasks/queries/survey/queries/ctes/subtask/impl/batch_link.py new file mode 100644 index 00000000..14c2f260 --- /dev/null +++ b/src/core/tasks/url/operators/location_id/subtasks/queries/survey/queries/ctes/subtask/impl/batch_link.py @@ -0,0 +1,31 @@ +from sqlalchemy import select + +from src.core.tasks.url.operators._shared.container.subtask.eligible import URLsSubtaskEligibleCTEContainer +from src.core.tasks.url.operators.location_id.subtasks.queries.survey.queries.ctes.subtask.helpers import \ + get_exists_subtask_query +from src.db.models.impl.link.batch_url.sqlalchemy import LinkBatchURL +from src.db.models.impl.link.location_batch.sqlalchemy import LinkLocationBatch +from src.db.models.impl.url.core.sqlalchemy import URL +from src.db.models.impl.url.suggestion.location.auto.subtask.enums import LocationIDSubtaskType + +cte = ( + select( + URL.id, + get_exists_subtask_query( + LocationIDSubtaskType.BATCH_LINK + ) + ) + .join( + LinkBatchURL, + LinkBatchURL.url_id == URL.id, + ) + .join( + LinkLocationBatch, + LinkLocationBatch.batch_id == LinkBatchURL.batch_id, + ) + .cte("batch_link") +) + +BATCH_LINK_CONTAINER = URLsSubtaskEligibleCTEContainer( + cte, +) diff --git a/src/core/tasks/url/operators/location_id/subtasks/queries/survey/queries/eligible_counts.py b/src/core/tasks/url/operators/location_id/subtasks/queries/survey/queries/eligible_counts.py index 707fffeb..b803b7f2 100644 --- a/src/core/tasks/url/operators/location_id/subtasks/queries/survey/queries/eligible_counts.py +++ b/src/core/tasks/url/operators/location_id/subtasks/queries/survey/queries/eligible_counts.py @@ -17,5 +17,6 @@ def sum_count(col: ColumnElement[bool], subtask_type: LocationIDSubtaskType) -> ELIGIBLE_COUNTS_QUERY = ( select( sum_count(container.nlp_location, LocationIDSubtaskType.NLP_LOCATION_FREQUENCY), + sum_count(container.batch_link, LocationIDSubtaskType.BATCH_LINK) ) ) \ No newline at end of file diff --git a/src/db/models/impl/link/agency_batch/sqlalchemy.py b/src/db/models/impl/link/agency_batch/sqlalchemy.py index 57e235ba..dcb670d3 100644 --- a/src/db/models/impl/link/agency_batch/sqlalchemy.py +++ b/src/db/models/impl/link/agency_batch/sqlalchemy.py @@ -1,6 +1,6 @@ from sqlalchemy import PrimaryKeyConstraint -from src.db.models.mixins import CreatedAtMixin, LocationDependentMixin, AgencyDependentMixin, BatchDependentMixin +from src.db.models.mixins import CreatedAtMixin, AgencyDependentMixin, BatchDependentMixin from src.db.models.templates_.base import Base diff --git a/src/db/models/impl/url/suggestion/agency/subtask/enum.py b/src/db/models/impl/url/suggestion/agency/subtask/enum.py index f3ee7c3f..ef1ecbc0 100644 --- a/src/db/models/impl/url/suggestion/agency/subtask/enum.py +++ b/src/db/models/impl/url/suggestion/agency/subtask/enum.py @@ -6,6 +6,7 @@ class AutoAgencyIDSubtaskType(Enum): NLP_LOCATION_MATCH = "nlp_location_match" MUCKROCK = "muckrock_match" CKAN = "ckan_match" + BATCH_LINK = "batch_link" class SubtaskDetailCode(Enum): NO_DETAILS = "no details" diff --git a/src/db/models/impl/url/suggestion/location/auto/subtask/enums.py b/src/db/models/impl/url/suggestion/location/auto/subtask/enums.py index c42f53c2..c4937af3 100644 --- a/src/db/models/impl/url/suggestion/location/auto/subtask/enums.py +++ b/src/db/models/impl/url/suggestion/location/auto/subtask/enums.py @@ -2,4 +2,5 @@ class LocationIDSubtaskType(Enum): - NLP_LOCATION_FREQUENCY = 'nlp_location_frequency' \ No newline at end of file + NLP_LOCATION_FREQUENCY = 'nlp_location_frequency' + BATCH_LINK = 'batch_link' \ No newline at end of file diff --git a/src/util/alembic_helpers.py b/src/util/alembic_helpers.py index 3ca0db71..668d1298 100644 --- a/src/util/alembic_helpers.py +++ b/src/util/alembic_helpers.py @@ -170,4 +170,10 @@ def agency_id_column(nullable=False) -> sa.Column: ), nullable=nullable, comment='A foreign key to the `agencies` table.' - ) \ No newline at end of file + ) + +def add_enum_value( + enum_name: str, + enum_value: str +) -> None: + op.execute(f"ALTER TYPE {enum_name} ADD VALUE '{enum_value}'") \ No newline at end of file diff --git a/tests/automated/integration/tasks/url/impl/agency_identification/subtasks/batch_link/__init__.py b/tests/automated/integration/tasks/url/impl/agency_identification/subtasks/batch_link/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/automated/integration/tasks/url/impl/agency_identification/subtasks/batch_link/test_core.py b/tests/automated/integration/tasks/url/impl/agency_identification/subtasks/batch_link/test_core.py new file mode 100644 index 00000000..b39d74ca --- /dev/null +++ b/tests/automated/integration/tasks/url/impl/agency_identification/subtasks/batch_link/test_core.py @@ -0,0 +1,65 @@ +import pytest + +from src.core.tasks.url.operators.agency_identification.core import AgencyIdentificationTaskOperator +from src.db.client.async_ import AsyncDatabaseClient +from src.db.models.impl.link.agency_batch.sqlalchemy import LinkAgencyBatch +from src.db.models.impl.url.suggestion.agency.subtask.enum import AutoAgencyIDSubtaskType +from src.db.models.impl.url.suggestion.agency.subtask.sqlalchemy import URLAutoAgencyIDSubtask +from src.db.models.impl.url.suggestion.agency.suggestion.sqlalchemy import AgencyIDSubtaskSuggestion +from tests.helpers.batch_creation_parameters.core import TestBatchCreationParameters +from tests.helpers.batch_creation_parameters.url_creation_parameters import TestURLCreationParameters +from tests.helpers.data_creator.core import DBDataCreator +from tests.helpers.data_creator.models.creation_info.batch.v2 import BatchURLCreationInfoV2 +from tests.helpers.run import run_task_and_confirm_success + + +@pytest.mark.asyncio +async def test_batch_link_subtask( + operator: AgencyIdentificationTaskOperator, + db_data_creator: DBDataCreator +): + + adb_client: AsyncDatabaseClient = operator.adb_client + + creation_info: BatchURLCreationInfoV2 = await db_data_creator.batch_v2( + parameters=TestBatchCreationParameters( + urls=[ + TestURLCreationParameters( + count=2 + ) + ] + ) + ) + batch_id: int = creation_info.batch_id + url_ids: list[int] = creation_info.url_ids + + agency_id: int = await db_data_creator.agency() + + link = LinkAgencyBatch( + agency_id=agency_id, + batch_id=batch_id + ) + await adb_client.add(link) + + assert await operator.meets_task_prerequisites() + assert operator._subtask == AutoAgencyIDSubtaskType.BATCH_LINK + + await run_task_and_confirm_success(operator) + + assert not await operator.meets_task_prerequisites() + assert operator._subtask is None + + subtasks: list[URLAutoAgencyIDSubtask] = await adb_client.get_all(URLAutoAgencyIDSubtask) + assert len(subtasks) == 2 + subtask: URLAutoAgencyIDSubtask = subtasks[0] + assert subtask.type == AutoAgencyIDSubtaskType.BATCH_LINK + + assert subtask.agencies_found + + suggestions: list[AgencyIDSubtaskSuggestion] = await adb_client.get_all(AgencyIDSubtaskSuggestion) + assert len(suggestions) == 2 + + assert all(sugg.confidence == 80 for sugg in suggestions) + assert all(sugg.agency_id == agency_id for sugg in suggestions) + + diff --git a/tests/automated/integration/tasks/url/impl/location_identification/subtasks/batch_link/__init__.py b/tests/automated/integration/tasks/url/impl/location_identification/subtasks/batch_link/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/automated/integration/tasks/url/impl/location_identification/subtasks/batch_link/test_core.py b/tests/automated/integration/tasks/url/impl/location_identification/subtasks/batch_link/test_core.py new file mode 100644 index 00000000..ab505627 --- /dev/null +++ b/tests/automated/integration/tasks/url/impl/location_identification/subtasks/batch_link/test_core.py @@ -0,0 +1,64 @@ +import pytest + +from src.core.tasks.url.operators.location_id.core import LocationIdentificationTaskOperator +from src.db.client.async_ import AsyncDatabaseClient +from src.db.models.impl.link.location_batch.sqlalchemy import LinkLocationBatch +from src.db.models.impl.url.suggestion.location.auto.subtask.enums import LocationIDSubtaskType +from src.db.models.impl.url.suggestion.location.auto.subtask.sqlalchemy import AutoLocationIDSubtask +from src.db.models.impl.url.suggestion.location.auto.suggestion.sqlalchemy import LocationIDSubtaskSuggestion +from tests.helpers.batch_creation_parameters.core import TestBatchCreationParameters +from tests.helpers.batch_creation_parameters.url_creation_parameters import TestURLCreationParameters +from tests.helpers.data_creator.core import DBDataCreator +from tests.helpers.data_creator.models.creation_info.batch.v2 import BatchURLCreationInfoV2 +from tests.helpers.data_creator.models.creation_info.locality import LocalityCreationInfo +from tests.helpers.run import run_task_and_confirm_success + + +@pytest.mark.asyncio +async def test_batch_link_subtask( + operator: LocationIdentificationTaskOperator, + db_data_creator: DBDataCreator, + pittsburgh_locality: LocalityCreationInfo +): + + adb_client: AsyncDatabaseClient = operator.adb_client + + creation_info: BatchURLCreationInfoV2 = await db_data_creator.batch_v2( + parameters=TestBatchCreationParameters( + urls=[ + TestURLCreationParameters( + count=2 + ) + ] + ) + ) + batch_id: int = creation_info.batch_id + url_ids: list[int] = creation_info.url_ids + + location_id: int = pittsburgh_locality.location_id + + link = LinkLocationBatch( + location_id=location_id, + batch_id=batch_id + ) + await adb_client.add(link) + + assert await operator.meets_task_prerequisites() + assert operator._subtask == LocationIDSubtaskType.BATCH_LINK + + await run_task_and_confirm_success(operator) + + assert not await operator.meets_task_prerequisites() + assert operator._subtask is None + + subtasks: list[AutoLocationIDSubtask] = await adb_client.get_all(AutoLocationIDSubtask) + assert len(subtasks) == 2 + subtask: AutoLocationIDSubtask = subtasks[0] + assert subtask.type == LocationIDSubtaskType.BATCH_LINK + assert subtask.locations_found + + suggestions: list[LocationIDSubtaskSuggestion] = await adb_client.get_all(LocationIDSubtaskSuggestion) + assert len(suggestions) == 2 + + assert all(sugg.confidence == 80 for sugg in suggestions) + assert all(sugg.location_id == location_id for sugg in suggestions) \ No newline at end of file