From 744c1bdbef63c338787ddacddc668112f315ac9b Mon Sep 17 00:00:00 2001 From: Max Chis Date: Thu, 17 Jul 2025 13:19:01 -0400 Subject: [PATCH 1/3] Update logic to disconnect URLs from Batches --- ...ccf4_remove_batches_dependency_for_urls.py | 83 +++ .../queries/get_annotation_batch_info.py | 8 +- .../get_next_url_for_user_annotation.py | 23 +- .../agency/get/queries/next_for_annotation.py | 22 +- src/api/endpoints/annotate/all/get/query.py | 9 +- .../batches => batch/duplicates}/__init__.py | 0 .../get/duplicates.py => duplicates/dto.py} | 0 src/api/endpoints/batch/duplicates/query.py | 60 ++ src/api/endpoints/batch/routes.py | 4 +- .../{url/dtos => batch/urls}/__init__.py | 0 .../batch/{dtos/get/urls.py => urls/dto.py} | 0 src/api/endpoints/batch/urls/query.py | 31 ++ .../endpoints/collector/manual}/__init__.py | 0 src/api/endpoints/collector/manual/query.py | 80 +++ .../api/endpoints/metrics/batches/__init__.py | 0 .../metrics/batches/aggregated/__init__.py | 0 .../aggregated/dto.py} | 0 .../metrics/batches/aggregated/query.py | 117 ++++ .../metrics/batches/breakdown/__init__.py | 0 .../breakdown.py => batches/breakdown/dto.py} | 0 .../metrics/batches/breakdown/query.py | 97 ++++ src/api/endpoints/metrics/routes.py | 4 +- src/api/endpoints/review/next/query.py | 46 +- src/api/endpoints/task/by_id/__init__.py | 0 .../task/{dtos/get/task.py => by_id/dto.py} | 0 src/api/endpoints/task/by_id/query.py | 67 +++ src/api/endpoints/task/routes.py | 2 +- src/api/endpoints/url/get/__init__.py | 0 .../url/{dtos/response.py => get/dto.py} | 0 src/api/endpoints/url/get/query.py | 65 +++ src/api/endpoints/url/routes.py | 2 +- src/core/core.py | 12 +- src/core/tasks/url/manager.py | 5 - .../agency_identification/queries/__init__.py | 0 ...pending_urls_without_agency_suggestions.py | 36 ++ .../operators/url_html/queries/__init__.py | 0 .../get_pending_urls_without_html_data.py | 32 ++ .../queries/__init__.py | 0 ...pending_urls_missing_miscellaneous_data.py | 43 ++ ...pending_urls_missing_miscellaneous_data.py | 14 + src/db/client/async_.py | 511 +++--------------- src/db/client/helpers.py | 3 + src/db/client/sync.py | 65 ++- src/db/dto_converter.py | 18 - src/db/models/instantiations/batch.py | 6 +- src/db/models/instantiations/link/__init__.py | 0 .../instantiations/link/link_batch_urls.py | 17 + .../{ => link}/link_task_url.py | 0 src/db/models/instantiations/url/core.py | 8 +- .../url_counts/builder.py | 10 +- src/db/statement_composer.py | 10 +- src/util/alembic_helpers.py | 23 +- .../api/_helpers/RequestValidator.py | 12 +- .../integration/api/test_duplicates.py | 63 --- .../integration/api/test_example_collector.py | 1 - .../integration/api/test_manual_batch.py | 8 +- tests/automated/integration/api/test_url.py | 2 +- .../security_manager/test_security_manager.py | 2 - tests/automated/integration/tasks/asserts.py | 5 +- 59 files changed, 1016 insertions(+), 610 deletions(-) create mode 100644 alembic/versions/2025_07_17_0856-9552d354ccf4_remove_batches_dependency_for_urls.py rename src/api/endpoints/{metrics/dtos/get/batches => batch/duplicates}/__init__.py (100%) rename src/api/endpoints/batch/{dtos/get/duplicates.py => duplicates/dto.py} (100%) create mode 100644 src/api/endpoints/batch/duplicates/query.py rename src/api/endpoints/{url/dtos => batch/urls}/__init__.py (100%) rename src/api/endpoints/batch/{dtos/get/urls.py => urls/dto.py} (100%) create mode 100644 src/api/endpoints/batch/urls/query.py rename {tests/automated/integration/collector_db => 
src/api/endpoints/collector/manual}/__init__.py (100%) create mode 100644 src/api/endpoints/collector/manual/query.py rename tests/automated/integration/collector_db/test_db_client.py => src/api/endpoints/metrics/batches/__init__.py (100%) create mode 100644 src/api/endpoints/metrics/batches/aggregated/__init__.py rename src/api/endpoints/metrics/{dtos/get/batches/aggregated.py => batches/aggregated/dto.py} (100%) create mode 100644 src/api/endpoints/metrics/batches/aggregated/query.py create mode 100644 src/api/endpoints/metrics/batches/breakdown/__init__.py rename src/api/endpoints/metrics/{dtos/get/batches/breakdown.py => batches/breakdown/dto.py} (100%) create mode 100644 src/api/endpoints/metrics/batches/breakdown/query.py create mode 100644 src/api/endpoints/task/by_id/__init__.py rename src/api/endpoints/task/{dtos/get/task.py => by_id/dto.py} (100%) create mode 100644 src/api/endpoints/task/by_id/query.py create mode 100644 src/api/endpoints/url/get/__init__.py rename src/api/endpoints/url/{dtos/response.py => get/dto.py} (100%) create mode 100644 src/api/endpoints/url/get/query.py create mode 100644 src/core/tasks/url/operators/agency_identification/queries/__init__.py create mode 100644 src/core/tasks/url/operators/agency_identification/queries/get_pending_urls_without_agency_suggestions.py create mode 100644 src/core/tasks/url/operators/url_html/queries/__init__.py create mode 100644 src/core/tasks/url/operators/url_html/queries/get_pending_urls_without_html_data.py create mode 100644 src/core/tasks/url/operators/url_miscellaneous_metadata/queries/__init__.py create mode 100644 src/core/tasks/url/operators/url_miscellaneous_metadata/queries/get_pending_urls_missing_miscellaneous_data.py create mode 100644 src/core/tasks/url/operators/url_miscellaneous_metadata/queries/has_pending_urls_missing_miscellaneous_data.py create mode 100644 src/db/client/helpers.py create mode 100644 src/db/models/instantiations/link/__init__.py create mode 100644 src/db/models/instantiations/link/link_batch_urls.py rename src/db/models/instantiations/{ => link}/link_task_url.py (100%) delete mode 100644 tests/automated/integration/api/test_duplicates.py diff --git a/alembic/versions/2025_07_17_0856-9552d354ccf4_remove_batches_dependency_for_urls.py b/alembic/versions/2025_07_17_0856-9552d354ccf4_remove_batches_dependency_for_urls.py new file mode 100644 index 00000000..5a97a228 --- /dev/null +++ b/alembic/versions/2025_07_17_0856-9552d354ccf4_remove_batches_dependency_for_urls.py @@ -0,0 +1,83 @@ +"""Remove batches dependency for urls + +Revision ID: 9552d354ccf4 +Revises: d6519a3ca5c9 +Create Date: 2025-07-17 08:56:22.919486 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + +from src.util.alembic_helpers import id_column, created_at_column, updated_at_column, url_id_column, batch_id_column + +# revision identifiers, used by Alembic. 
+revision: str = '9552d354ccf4' +down_revision: Union[str, None] = 'd6519a3ca5c9' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + +LINK_TABLE_NAME = 'link_batch_urls' + +BATCHES_COLUMN_NAME = 'batch_id' + +def _create_link_table(): + op.create_table( + LINK_TABLE_NAME, + id_column(), + batch_id_column(), + url_id_column(), + created_at_column(), + updated_at_column(), + sa.UniqueConstraint('url_id', name='uq_link_table_url_id') + ) + +def _drop_link_table(): + op.drop_table(LINK_TABLE_NAME) + +def _migrate_batch_ids_to_link_table(): + op.execute(f""" + INSERT INTO {LINK_TABLE_NAME} (batch_id, url_id) + SELECT batch_id, id + FROM urls + """) + +def _migrate_link_table_to_batch_ids(): + op.execute(f""" + UPDATE urls + SET batch_id = ( + SELECT batch_id + FROM {LINK_TABLE_NAME} + WHERE url_id = urls.id + ) + """) + +def _drop_url_batches_column(): + op.drop_column('urls', BATCHES_COLUMN_NAME) + +def _add_url_batches_column(): + op.add_column( + 'urls', + batch_id_column(nullable=True) + ) + +def _add_not_null_constraint(): + op.alter_column( + 'urls', + BATCHES_COLUMN_NAME, + nullable=False + ) + + +def upgrade() -> None: + _create_link_table() + _migrate_batch_ids_to_link_table() + _drop_url_batches_column() + + +def downgrade() -> None: + _add_url_batches_column() + _migrate_link_table_to_batch_ids() + _add_not_null_constraint() + _drop_link_table() diff --git a/src/api/endpoints/annotate/_shared/queries/get_annotation_batch_info.py b/src/api/endpoints/annotate/_shared/queries/get_annotation_batch_info.py index b9b0a634..15f5b631 100644 --- a/src/api/endpoints/annotate/_shared/queries/get_annotation_batch_info.py +++ b/src/api/endpoints/annotate/_shared/queries/get_annotation_batch_info.py @@ -5,6 +5,7 @@ from src.api.endpoints.annotate.dtos.shared.batch import AnnotationBatchInfo from src.collectors.enums import URLStatus +from src.db.models.instantiations.link.link_batch_urls import LinkBatchURL from src.db.models.instantiations.url.core import URL from src.db.queries.base.builder import QueryBuilderBase from src.db.statement_composer import StatementComposer @@ -35,11 +36,14 @@ async def run( for model in self.models ] - select_url = select(func.count(URL.id)) + select_url = ( + select(func.count(URL.id)) + .join(LinkBatchURL) + ) common_where_clause = [ URL.outcome == URLStatus.PENDING.value, - URL.batch_id == self.batch_id, + LinkBatchURL.batch_id == self.batch_id, ] annotated_query = ( diff --git a/src/api/endpoints/annotate/_shared/queries/get_next_url_for_user_annotation.py b/src/api/endpoints/annotate/_shared/queries/get_next_url_for_user_annotation.py index c500641a..3bda8ff3 100644 --- a/src/api/endpoints/annotate/_shared/queries/get_next_url_for_user_annotation.py +++ b/src/api/endpoints/annotate/_shared/queries/get_next_url_for_user_annotation.py @@ -5,6 +5,7 @@ from src.collectors.enums import URLStatus from src.core.enums import SuggestedStatus from src.db.client.types import UserSuggestionModel +from src.db.models.instantiations.link.link_batch_urls import LinkBatchURL from src.db.models.instantiations.url.core import URL from src.db.models.instantiations.url.suggestion.relevant.user import UserRelevantSuggestion from src.db.queries.base.builder import QueryBuilderBase @@ -27,10 +28,21 @@ def __init__( self.auto_suggestion_relationship = auto_suggestion_relationship async def run(self, session: AsyncSession): - url_query = ( + query = ( select( URL, ) + ) + + if self.batch_id is not None: + query = ( + query + 
.join(LinkBatchURL) + .where(LinkBatchURL.batch_id == self.batch_id) + ) + + query = ( + query .where(URL.outcome == URLStatus.PENDING.value) # URL must not have user suggestion .where( @@ -39,7 +51,7 @@ async def run(self, session: AsyncSession): ) if self.check_if_annotated_not_relevant: - url_query = url_query.where( + query = query.where( not_( exists( select(UserRelevantSuggestion) @@ -51,14 +63,13 @@ async def run(self, session: AsyncSession): ) ) - if self.batch_id is not None: - url_query = url_query.where(URL.batch_id == self.batch_id) - url_query = url_query.options( + + query = query.options( joinedload(self.auto_suggestion_relationship), joinedload(URL.html_content) ).limit(1) - raw_result = await session.execute(url_query) + raw_result = await session.execute(query) return raw_result.unique().scalars().one_or_none() \ No newline at end of file diff --git a/src/api/endpoints/annotate/agency/get/queries/next_for_annotation.py b/src/api/endpoints/annotate/agency/get/queries/next_for_annotation.py index be625fb0..5bfd6e8a 100644 --- a/src/api/endpoints/annotate/agency/get/queries/next_for_annotation.py +++ b/src/api/endpoints/annotate/agency/get/queries/next_for_annotation.py @@ -10,6 +10,7 @@ from src.core.tasks.url.operators.url_html.scraper.parser.util import convert_to_response_html_info from src.db.dtos.url.mapping import URLMapping from src.db.models.instantiations.confirmed_url_agency import ConfirmedURLAgency +from src.db.models.instantiations.link.link_batch_urls import LinkBatchURL from src.db.models.instantiations.url.core import URL from src.db.models.instantiations.url.suggestion.agency.auto import AutomatedUrlAgencySuggestion from src.db.models.instantiations.url.suggestion.agency.user import UserUrlAgencySuggestion @@ -41,20 +42,19 @@ async def run( have extant autosuggestions """ # Select statement - statement = ( - select(URL.id, URL.url) - # Must not have confirmed agencies - .where( - URL.outcome == URLStatus.PENDING.value - ) + query = select(URL.id, URL.url) + if self.batch_id is not None: + query = query.join(LinkBatchURL).where(LinkBatchURL.batch_id == self.batch_id) + + # Must not have confirmed agencies + query = query.where( + URL.outcome == URLStatus.PENDING.value ) - if self.batch_id is not None: - statement = statement.where(URL.batch_id == self.batch_id) # Must not have been annotated by a user - statement = ( - statement.join(UserUrlAgencySuggestion, isouter=True) + query = ( + query.join(UserUrlAgencySuggestion, isouter=True) .where( ~exists( select(UserUrlAgencySuggestion). 
@@ -93,7 +93,7 @@ async def run( ) ) ).limit(1) - raw_result = await session.execute(statement) + raw_result = await session.execute(query) results = raw_result.all() if len(results) == 0: return GetNextURLForAgencyAnnotationResponse( diff --git a/src/api/endpoints/annotate/all/get/query.py b/src/api/endpoints/annotate/all/get/query.py index d23d749e..1191e8d6 100644 --- a/src/api/endpoints/annotate/all/get/query.py +++ b/src/api/endpoints/annotate/all/get/query.py @@ -11,6 +11,7 @@ from src.collectors.enums import URLStatus from src.db.dto_converter import DTOConverter from src.db.dtos.url.mapping import URLMapping +from src.db.models.instantiations.link.link_batch_urls import LinkBatchURL from src.db.models.instantiations.url.core import URL from src.db.models.instantiations.url.suggestion.agency.user import UserUrlAgencySuggestion from src.db.models.instantiations.url.suggestion.record_type.user import UserRecordTypeSuggestion @@ -32,8 +33,11 @@ async def run( self, session: AsyncSession ) -> GetNextURLForAllAnnotationResponse: + query = Select(URL) + if self.batch_id is not None: + query = query.join(LinkBatchURL).where(LinkBatchURL.batch_id == self.batch_id) query = ( - Select(URL) + query .where( and_( URL.outcome == URLStatus.PENDING.value, @@ -43,8 +47,7 @@ async def run( ) ) ) - if self.batch_id is not None: - query = query.where(URL.batch_id == self.batch_id) + load_options = [ URL.html_content, diff --git a/src/api/endpoints/metrics/dtos/get/batches/__init__.py b/src/api/endpoints/batch/duplicates/__init__.py similarity index 100% rename from src/api/endpoints/metrics/dtos/get/batches/__init__.py rename to src/api/endpoints/batch/duplicates/__init__.py diff --git a/src/api/endpoints/batch/dtos/get/duplicates.py b/src/api/endpoints/batch/duplicates/dto.py similarity index 100% rename from src/api/endpoints/batch/dtos/get/duplicates.py rename to src/api/endpoints/batch/duplicates/dto.py diff --git a/src/api/endpoints/batch/duplicates/query.py b/src/api/endpoints/batch/duplicates/query.py new file mode 100644 index 00000000..a4c3aa31 --- /dev/null +++ b/src/api/endpoints/batch/duplicates/query.py @@ -0,0 +1,60 @@ +from sqlalchemy import Select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import aliased + +from src.db.dtos.duplicate import DuplicateInfo +from src.db.models.instantiations.batch import Batch +from src.db.models.instantiations.duplicate import Duplicate +from src.db.models.instantiations.link.link_batch_urls import LinkBatchURL +from src.db.models.instantiations.url.core import URL +from src.db.queries.base.builder import QueryBuilderBase + + +class GetDuplicatesByBatchIDQueryBuilder(QueryBuilderBase): + + def __init__( + self, + batch_id: int, + page: int + ): + super().__init__() + self.batch_id = batch_id + self.page = page + + async def run(self, session: AsyncSession) -> list[DuplicateInfo]: + original_batch = aliased(Batch) + duplicate_batch = aliased(Batch) + + query = ( + Select( + URL.url.label("source_url"), + URL.id.label("original_url_id"), + duplicate_batch.id.label("duplicate_batch_id"), + duplicate_batch.parameters.label("duplicate_batch_parameters"), + original_batch.id.label("original_batch_id"), + original_batch.parameters.label("original_batch_parameters"), + ) + .select_from(Duplicate) + .join(URL, Duplicate.original_url_id == URL.id) + .join(duplicate_batch, Duplicate.batch_id == duplicate_batch.id) + .join(LinkBatchURL, URL.id == LinkBatchURL.url_id) + .join(original_batch, LinkBatchURL.batch_id == original_batch.id) + 
.filter(duplicate_batch.id == self.batch_id) + .limit(100) + .offset((self.page - 1) * 100) + ) + raw_results = await session.execute(query) + results = raw_results.all() + final_results = [] + for result in results: + final_results.append( + DuplicateInfo( + source_url=result.source_url, + duplicate_batch_id=result.duplicate_batch_id, + duplicate_metadata=result.duplicate_batch_parameters, + original_batch_id=result.original_batch_id, + original_metadata=result.original_batch_parameters, + original_url_id=result.original_url_id + ) + ) + return final_results diff --git a/src/api/endpoints/batch/routes.py b/src/api/endpoints/batch/routes.py index bd3282fc..879c643d 100644 --- a/src/api/endpoints/batch/routes.py +++ b/src/api/endpoints/batch/routes.py @@ -4,12 +4,12 @@ from fastapi.params import Query, Depends from src.api.dependencies import get_async_core -from src.api.endpoints.batch.dtos.get.duplicates import GetDuplicatesByBatchResponse from src.api.endpoints.batch.dtos.get.logs import GetBatchLogsResponse from src.api.endpoints.batch.dtos.get.summaries.response import GetBatchSummariesResponse from src.api.endpoints.batch.dtos.get.summaries.summary import BatchSummary -from src.api.endpoints.batch.dtos.get.urls import GetURLsByBatchResponse from src.api.endpoints.batch.dtos.post.abort import MessageResponse +from src.api.endpoints.batch.duplicates.dto import GetDuplicatesByBatchResponse +from src.api.endpoints.batch.urls.dto import GetURLsByBatchResponse from src.collectors.enums import CollectorType from src.core.core import AsyncCore from src.core.enums import BatchStatus diff --git a/src/api/endpoints/url/dtos/__init__.py b/src/api/endpoints/batch/urls/__init__.py similarity index 100% rename from src/api/endpoints/url/dtos/__init__.py rename to src/api/endpoints/batch/urls/__init__.py diff --git a/src/api/endpoints/batch/dtos/get/urls.py b/src/api/endpoints/batch/urls/dto.py similarity index 100% rename from src/api/endpoints/batch/dtos/get/urls.py rename to src/api/endpoints/batch/urls/dto.py diff --git a/src/api/endpoints/batch/urls/query.py b/src/api/endpoints/batch/urls/query.py new file mode 100644 index 00000000..fcfba3ee --- /dev/null +++ b/src/api/endpoints/batch/urls/query.py @@ -0,0 +1,31 @@ +from sqlalchemy import Select +from sqlalchemy.ext.asyncio import AsyncSession + +from src.db.dtos.url.core import URLInfo +from src.db.models.instantiations.link.link_batch_urls import LinkBatchURL +from src.db.models.instantiations.url.core import URL +from src.db.queries.base.builder import QueryBuilderBase + + +class GetURLsByBatchQueryBuilder(QueryBuilderBase): + + def __init__( + self, + batch_id: int, + page: int = 1 + ): + super().__init__() + self.batch_id = batch_id + self.page = page + + async def run(self, session: AsyncSession) -> list[URLInfo]: + query = ( + Select(URL) + .join(LinkBatchURL) + .where(LinkBatchURL.batch_id == self.batch_id) + .order_by(URL.id) + .limit(100) + .offset((self.page - 1) * 100)) + result = await session.execute(query) + urls = result.scalars().all() + return [URLInfo(**url.__dict__) for url in urls] \ No newline at end of file diff --git a/tests/automated/integration/collector_db/__init__.py b/src/api/endpoints/collector/manual/__init__.py similarity index 100% rename from tests/automated/integration/collector_db/__init__.py rename to src/api/endpoints/collector/manual/__init__.py diff --git a/src/api/endpoints/collector/manual/query.py b/src/api/endpoints/collector/manual/query.py new file mode 100644 index 00000000..2f29a357 --- /dev/null 
+++ b/src/api/endpoints/collector/manual/query.py @@ -0,0 +1,80 @@ +from sqlalchemy.exc import IntegrityError +from sqlalchemy.ext.asyncio import AsyncSession + +from src.api.endpoints.collector.dtos.manual_batch.post import ManualBatchInputDTO +from src.api.endpoints.collector.dtos.manual_batch.response import ManualBatchResponseDTO +from src.collectors.enums import CollectorType, URLStatus +from src.core.enums import BatchStatus +from src.db.models.instantiations.batch import Batch +from src.db.models.instantiations.link.link_batch_urls import LinkBatchURL +from src.db.models.instantiations.url.core import URL +from src.db.models.instantiations.url.optional_data_source_metadata import URLOptionalDataSourceMetadata +from src.db.queries.base.builder import QueryBuilderBase + + +class UploadManualBatchQueryBuilder(QueryBuilderBase): + + def __init__( + self, + user_id: int, + dto: ManualBatchInputDTO + ): + super().__init__() + self.dto = dto + self.user_id = user_id + + + async def run(self, session: AsyncSession) -> ManualBatchResponseDTO: + batch = Batch( + strategy=CollectorType.MANUAL.value, + status=BatchStatus.READY_TO_LABEL.value, + parameters={ + "name": self.dto.name + }, + user_id=self.user_id + ) + session.add(batch) + await session.flush() + + batch_id = batch.id + url_ids = [] + duplicate_urls = [] + + for entry in self.dto.entries: + url = URL( + url=entry.url, + name=entry.name, + description=entry.description, + collector_metadata=entry.collector_metadata, + outcome=URLStatus.PENDING.value, + record_type=entry.record_type.value if entry.record_type is not None else None, + ) + + async with session.begin_nested(): + try: + session.add(url) + await session.flush() + except IntegrityError: + duplicate_urls.append(entry.url) + continue + await session.flush() + link = LinkBatchURL( + batch_id=batch_id, + url_id=url.id + ) + session.add(link) + + optional_metadata = URLOptionalDataSourceMetadata( + url_id=url.id, + record_formats=entry.record_formats, + data_portal_type=entry.data_portal_type, + supplying_entity=entry.supplying_entity, + ) + session.add(optional_metadata) + url_ids.append(url.id) + + return ManualBatchResponseDTO( + batch_id=batch_id, + urls=url_ids, + duplicate_urls=duplicate_urls + ) \ No newline at end of file diff --git a/tests/automated/integration/collector_db/test_db_client.py b/src/api/endpoints/metrics/batches/__init__.py similarity index 100% rename from tests/automated/integration/collector_db/test_db_client.py rename to src/api/endpoints/metrics/batches/__init__.py diff --git a/src/api/endpoints/metrics/batches/aggregated/__init__.py b/src/api/endpoints/metrics/batches/aggregated/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/api/endpoints/metrics/dtos/get/batches/aggregated.py b/src/api/endpoints/metrics/batches/aggregated/dto.py similarity index 100% rename from src/api/endpoints/metrics/dtos/get/batches/aggregated.py rename to src/api/endpoints/metrics/batches/aggregated/dto.py diff --git a/src/api/endpoints/metrics/batches/aggregated/query.py b/src/api/endpoints/metrics/batches/aggregated/query.py new file mode 100644 index 00000000..12616a22 --- /dev/null +++ b/src/api/endpoints/metrics/batches/aggregated/query.py @@ -0,0 +1,117 @@ +from sqlalchemy import case, select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.sql.functions import coalesce + +from src.api.endpoints.metrics.batches.aggregated.dto import GetMetricsBatchesAggregatedResponseDTO, \ + GetMetricsBatchesAggregatedInnerResponseDTO +from 
src.collectors.enums import URLStatus, CollectorType +from src.core.enums import BatchStatus +from src.db.models.instantiations.batch import Batch +from src.db.models.instantiations.link.link_batch_urls import LinkBatchURL +from src.db.models.instantiations.url.core import URL +from src.db.queries.base.builder import QueryBuilderBase +from src.db.statement_composer import StatementComposer + + +class GetBatchesAggregatedMetricsQueryBuilder(QueryBuilderBase): + + async def run( + self, + session: AsyncSession + ) -> GetMetricsBatchesAggregatedResponseDTO: + sc = StatementComposer + + # First, get all batches broken down by collector type and status + def batch_column(status: BatchStatus, label): + return sc.count_distinct( + case( + ( + Batch.status == status.value, + Batch.id + ) + ), + label=label + ) + + batch_count_subquery = select( + batch_column(BatchStatus.READY_TO_LABEL, label="done_count"), + batch_column(BatchStatus.ERROR, label="error_count"), + Batch.strategy, + ).group_by(Batch.strategy).subquery("batch_count") + + def url_column(status: URLStatus, label): + return sc.count_distinct( + case( + ( + URL.outcome == status.value, + URL.id + ) + ), + label=label + ) + + # Next, count urls + url_count_subquery = select( + Batch.strategy, + url_column(URLStatus.PENDING, label="pending_count"), + url_column(URLStatus.ERROR, label="error_count"), + url_column(URLStatus.VALIDATED, label="validated_count"), + url_column(URLStatus.SUBMITTED, label="submitted_count"), + url_column(URLStatus.NOT_RELEVANT, label="rejected_count"), + + ).join( + LinkBatchURL, + LinkBatchURL.url_id == URL.id + ).outerjoin( + Batch, Batch.id == LinkBatchURL.batch_id + ).group_by( + Batch.strategy + ).subquery("url_count") + + # Combine + query = select( + Batch.strategy, + batch_count_subquery.c.done_count.label("batch_done_count"), + batch_count_subquery.c.error_count.label("batch_error_count"), + coalesce(url_count_subquery.c.pending_count, 0).label("pending_count"), + coalesce(url_count_subquery.c.error_count, 0).label("error_count"), + coalesce(url_count_subquery.c.submitted_count, 0).label("submitted_count"), + coalesce(url_count_subquery.c.rejected_count, 0).label("rejected_count"), + coalesce(url_count_subquery.c.validated_count, 0).label("validated_count") + ).join( + batch_count_subquery, + Batch.strategy == batch_count_subquery.c.strategy + ).outerjoin( + url_count_subquery, + Batch.strategy == url_count_subquery.c.strategy + ) + raw_results = await session.execute(query) + results = raw_results.all() + d: dict[CollectorType, GetMetricsBatchesAggregatedInnerResponseDTO] = {} + for result in results: + d[CollectorType(result.strategy)] = GetMetricsBatchesAggregatedInnerResponseDTO( + count_successful_batches=result.batch_done_count, + count_failed_batches=result.batch_error_count, + count_urls=result.pending_count + result.submitted_count + + result.rejected_count + result.error_count + + result.validated_count, + count_urls_pending=result.pending_count, + count_urls_validated=result.validated_count, + count_urls_submitted=result.submitted_count, + count_urls_rejected=result.rejected_count, + count_urls_errors=result.error_count + ) + + total_batch_query = await session.execute( + select( + sc.count_distinct(Batch.id, label="count") + ) + ) + total_batch_count = total_batch_query.scalars().one_or_none() + if total_batch_count is None: + total_batch_count = 0 + + return GetMetricsBatchesAggregatedResponseDTO( + total_batches=total_batch_count, + by_strategy=d + ) \ No newline at end of file diff --git 
a/src/api/endpoints/metrics/batches/breakdown/__init__.py b/src/api/endpoints/metrics/batches/breakdown/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/api/endpoints/metrics/dtos/get/batches/breakdown.py b/src/api/endpoints/metrics/batches/breakdown/dto.py similarity index 100% rename from src/api/endpoints/metrics/dtos/get/batches/breakdown.py rename to src/api/endpoints/metrics/batches/breakdown/dto.py diff --git a/src/api/endpoints/metrics/batches/breakdown/query.py b/src/api/endpoints/metrics/batches/breakdown/query.py new file mode 100644 index 00000000..771543ac --- /dev/null +++ b/src/api/endpoints/metrics/batches/breakdown/query.py @@ -0,0 +1,97 @@ +from sqlalchemy import select, case +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.sql.functions import coalesce + +from src.api.endpoints.metrics.batches.breakdown.dto import GetMetricsBatchesBreakdownResponseDTO, \ + GetMetricsBatchesBreakdownInnerResponseDTO +from src.collectors.enums import URLStatus, CollectorType +from src.core.enums import BatchStatus +from src.db.models.instantiations.batch import Batch +from src.db.models.instantiations.link.link_batch_urls import LinkBatchURL +from src.db.models.instantiations.url.core import URL +from src.db.queries.base.builder import QueryBuilderBase +from src.db.statement_composer import StatementComposer + + +class GetBatchesBreakdownMetricsQueryBuilder(QueryBuilderBase): + + def __init__( + self, + page: int + ): + super().__init__() + self.page = page + + async def run(self, session: AsyncSession) -> GetMetricsBatchesBreakdownResponseDTO: + sc = StatementComposer + + main_query = select( + Batch.strategy, + Batch.id, + Batch.status, + Batch.date_generated.label("created_at"), + ) + + def url_column(status: URLStatus, label): + return sc.count_distinct( + case( + ( + URL.outcome == status.value, + URL.id + ) + ), + label=label + ) + + count_query = select( + LinkBatchURL.batch_id, + sc.count_distinct(URL.id, label="count_total"), + url_column(URLStatus.PENDING, label="count_pending"), + url_column(URLStatus.SUBMITTED, label="count_submitted"), + url_column(URLStatus.NOT_RELEVANT, label="count_rejected"), + url_column(URLStatus.ERROR, label="count_error"), + url_column(URLStatus.VALIDATED, label="count_validated"), + ).join(URL, LinkBatchURL.url_id == URL.id).group_by( + LinkBatchURL.batch_id + ).subquery("url_count") + + query = (select( + main_query.c.strategy, + main_query.c.id, + main_query.c.created_at, + main_query.c.status, + coalesce(count_query.c.count_total, 0).label("count_total"), + coalesce(count_query.c.count_pending, 0).label("count_pending"), + coalesce(count_query.c.count_submitted, 0).label("count_submitted"), + coalesce(count_query.c.count_rejected, 0).label("count_rejected"), + coalesce(count_query.c.count_error, 0).label("count_error"), + coalesce(count_query.c.count_validated, 0).label("count_validated"), + ).outerjoin( + count_query, + main_query.c.id == count_query.c.batch_id + ).offset( + (self.page - 1) * 100 + ).order_by( + main_query.c.created_at.asc() + )) + + raw_results = await session.execute(query) + results = raw_results.all() + batches: list[GetMetricsBatchesBreakdownInnerResponseDTO] = [] + for result in results: + dto = GetMetricsBatchesBreakdownInnerResponseDTO( + batch_id=result.id, + strategy=CollectorType(result.strategy), + status=BatchStatus(result.status), + created_at=result.created_at, + count_url_total=result.count_total, + count_url_pending=result.count_pending, + 
count_url_submitted=result.count_submitted, + count_url_rejected=result.count_rejected, + count_url_error=result.count_error, + count_url_validated=result.count_validated + ) + batches.append(dto) + return GetMetricsBatchesBreakdownResponseDTO( + batches=batches, + ) \ No newline at end of file diff --git a/src/api/endpoints/metrics/routes.py b/src/api/endpoints/metrics/routes.py index cc043061..59fa5906 100644 --- a/src/api/endpoints/metrics/routes.py +++ b/src/api/endpoints/metrics/routes.py @@ -2,9 +2,9 @@ from fastapi.params import Query, Depends from src.api.dependencies import get_async_core +from src.api.endpoints.metrics.batches.aggregated.dto import GetMetricsBatchesAggregatedResponseDTO +from src.api.endpoints.metrics.batches.breakdown.dto import GetMetricsBatchesBreakdownResponseDTO from src.api.endpoints.metrics.dtos.get.backlog import GetMetricsBacklogResponseDTO -from src.api.endpoints.metrics.dtos.get.batches.aggregated import GetMetricsBatchesAggregatedResponseDTO -from src.api.endpoints.metrics.dtos.get.batches.breakdown import GetMetricsBatchesBreakdownResponseDTO from src.api.endpoints.metrics.dtos.get.urls.aggregated.core import GetMetricsURLsAggregatedResponseDTO from src.api.endpoints.metrics.dtos.get.urls.aggregated.pending import GetMetricsURLsAggregatedPendingResponseDTO from src.api.endpoints.metrics.dtos.get.urls.breakdown.pending import GetMetricsURLsBreakdownPendingResponseDTO diff --git a/src/api/endpoints/review/next/query.py b/src/api/endpoints/review/next/query.py index e1ca7ba5..8f7d5e35 100644 --- a/src/api/endpoints/review/next/query.py +++ b/src/api/endpoints/review/next/query.py @@ -1,6 +1,6 @@ from typing import Optional, Type -from sqlalchemy import FromClause, select, and_, Select, desc, asc, func +from sqlalchemy import FromClause, select, and_, Select, desc, asc, func, join from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import joinedload @@ -14,6 +14,7 @@ from src.db.exceptions import FailedQueryException from src.db.models.instantiations.batch import Batch from src.db.models.instantiations.confirmed_url_agency import ConfirmedURLAgency +from src.db.models.instantiations.link.link_batch_urls import LinkBatchURL from src.db.models.instantiations.url.core import URL from src.db.models.instantiations.url.suggestion.agency.auto import AutomatedUrlAgencySuggestion from src.db.models.instantiations.url.suggestion.agency.user import UserUrlAgencySuggestion @@ -59,29 +60,46 @@ def _get_where_exist_clauses( where_clauses.append(where_clause) return where_clauses - def _build_base_query(self, anno_exists_query: FromClause, ): + def _build_base_query( + self, + anno_exists_query: FromClause, + ) -> Select: builder = self.anno_exists_builder where_exist_clauses = self._get_where_exist_clauses( builder.query ) - return ( + query = ( select( URL, - self._sum_exists_query(anno_exists_query, ALL_ANNOTATION_MODELS) + self._sum_exists_query(anno_exists_query, USER_ANNOTATION_MODELS) ) .select_from(anno_exists_query) .join( URL, URL.id == builder.url_id ) - .where( + ) + if self.batch_id is not None: + query = ( + query.join( + LinkBatchURL + ) + .where( + LinkBatchURL.batch_id == self.batch_id + ) + ) + + query = ( + query.where( and_( URL.outcome == URLStatus.PENDING.value, *where_exist_clauses ) ) ) + return query + def _sum_exists_query(self, query, models: list[Type[URLDependentMixin]]): return sum( @@ -160,21 +178,23 @@ async def get_count_ready_query(self): builder = self.anno_exists_builder count_ready_query = ( select( - URL.batch_id, + 
LinkBatchURL.batch_id, func.count(URL.id).label(self.count_label) ) + .select_from(LinkBatchURL) + .join(URL) .join( builder.query, builder.url_id == URL.id ) .where( - URL.batch_id == self.batch_id, + LinkBatchURL.batch_id == self.batch_id, URL.outcome == URLStatus.PENDING.value, *self._get_where_exist_clauses( builder.query ) ) - .group_by(URL.batch_id) + .group_by(LinkBatchURL.batch_id) .subquery("count_ready") ) return count_ready_query @@ -185,10 +205,9 @@ async def get_count_reviewed_query(self): Batch.id.label("batch_id"), func.count(URL.id).label(self.count_label) ) - .outerjoin( - URL, - URL.batch_id == Batch.id - ) + .select_from(Batch) + .join(LinkBatchURL) + .outerjoin(URL, URL.id == LinkBatchURL.url_id) .where( URL.outcome.in_( [ @@ -198,7 +217,7 @@ async def get_count_reviewed_query(self): URLStatus.INDIVIDUAL_RECORD.value ] ), - URL.batch_id == self.batch_id + LinkBatchURL.batch_id == self.batch_id ) .group_by(Batch.id) .subquery("count_reviewed") @@ -272,7 +291,6 @@ async def run( async def build_url_query(self): anno_exists_query = self.anno_exists_builder.query url_query = self._build_base_query(anno_exists_query) - url_query = await self._apply_batch_id_filter(url_query, self.batch_id) url_query = await self._apply_options(url_query) url_query = await self._apply_order_clause(url_query) diff --git a/src/api/endpoints/task/by_id/__init__.py b/src/api/endpoints/task/by_id/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/api/endpoints/task/dtos/get/task.py b/src/api/endpoints/task/by_id/dto.py similarity index 100% rename from src/api/endpoints/task/dtos/get/task.py rename to src/api/endpoints/task/by_id/dto.py diff --git a/src/api/endpoints/task/by_id/query.py b/src/api/endpoints/task/by_id/query.py new file mode 100644 index 00000000..a57b9daf --- /dev/null +++ b/src/api/endpoints/task/by_id/query.py @@ -0,0 +1,67 @@ +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import selectinload + +from src.api.endpoints.task.by_id.dto import TaskInfo +from src.collectors.enums import URLStatus +from src.core.enums import BatchStatus +from src.db.dtos.url.core import URLInfo +from src.db.dtos.url.error import URLErrorPydanticInfo +from src.db.enums import TaskType +from src.db.models.instantiations.task.core import Task +from src.db.models.instantiations.url.core import URL +from src.db.queries.base.builder import QueryBuilderBase + + +class GetTaskInfoQueryBuilder(QueryBuilderBase): + + def __init__(self, task_id: int): + super().__init__() + self.task_id = task_id + + async def run(self, session: AsyncSession) -> TaskInfo: + # Get Task + result = await session.execute( + select(Task) + .where(Task.id == self.task_id) + .options( + selectinload(Task.urls) + .selectinload(URL.batch), + selectinload(Task.error), + selectinload(Task.errored_urls) + ) + ) + task = result.scalars().first() + error = task.error[0].error if len(task.error) > 0 else None + # Get error info if any + # Get URLs + urls = task.urls + url_infos = [] + for url in urls: + url_info = URLInfo( + id=url.id, + batch_id=url.batch.id, + url=url.url, + collector_metadata=url.collector_metadata, + outcome=URLStatus(url.outcome), + updated_at=url.updated_at + ) + url_infos.append(url_info) + + errored_urls = [] + for url in task.errored_urls: + url_error_info = URLErrorPydanticInfo( + task_id=url.task_id, + url_id=url.url_id, + error=url.error, + updated_at=url.updated_at + ) + errored_urls.append(url_error_info) + return TaskInfo( + 
task_type=TaskType(task.task_type), + task_status=BatchStatus(task.task_status), + error_info=error, + updated_at=task.updated_at, + urls=url_infos, + url_errors=errored_urls + ) \ No newline at end of file diff --git a/src/api/endpoints/task/routes.py b/src/api/endpoints/task/routes.py index e24d6e76..a719d6b9 100644 --- a/src/api/endpoints/task/routes.py +++ b/src/api/endpoints/task/routes.py @@ -3,9 +3,9 @@ from fastapi import APIRouter, Depends, Query, Path from src.api.dependencies import get_async_core +from src.api.endpoints.task.by_id.dto import TaskInfo from src.api.endpoints.task.dtos.get.tasks import GetTasksResponse from src.api.endpoints.task.dtos.get.task_status import GetTaskStatusResponseInfo -from src.api.endpoints.task.dtos.get.task import TaskInfo from src.db.enums import TaskType from src.core.core import AsyncCore from src.core.enums import BatchStatus diff --git a/src/api/endpoints/url/get/__init__.py b/src/api/endpoints/url/get/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/api/endpoints/url/dtos/response.py b/src/api/endpoints/url/get/dto.py similarity index 100% rename from src/api/endpoints/url/dtos/response.py rename to src/api/endpoints/url/get/dto.py diff --git a/src/api/endpoints/url/get/query.py b/src/api/endpoints/url/get/query.py new file mode 100644 index 00000000..1ba5a75f --- /dev/null +++ b/src/api/endpoints/url/get/query.py @@ -0,0 +1,65 @@ +from sqlalchemy import select, exists +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import selectinload + +from src.api.endpoints.url.get.dto import GetURLsResponseInfo, GetURLsResponseErrorInfo, GetURLsResponseInnerInfo +from src.collectors.enums import URLStatus +from src.db.client.helpers import add_standard_limit_and_offset +from src.db.models.instantiations.url.core import URL +from src.db.models.instantiations.url.error_info import URLErrorInfo +from src.db.queries.base.builder import QueryBuilderBase + + +class GetURLsQueryBuilder(QueryBuilderBase): + + def __init__( + self, + page: int, + errors: bool + ): + super().__init__() + self.page = page + self.errors = errors + + async def run(self, session: AsyncSession) -> GetURLsResponseInfo: + statement = select(URL).options( + selectinload(URL.error_info), + selectinload(URL.batch) + ).order_by(URL.id) + if self.errors: + # Only return URLs with errors + statement = statement.where( + exists( + select(URLErrorInfo).where(URLErrorInfo.url_id == URL.id) + ) + ) + add_standard_limit_and_offset(statement, self.page) + execute_result = await session.execute(statement) + all_results = execute_result.scalars().all() + final_results = [] + for result in all_results: + error_results = [] + for error in result.error_info: + error_result = GetURLsResponseErrorInfo( + id=error.id, + error=error.error, + updated_at=error.updated_at + ) + error_results.append(error_result) + final_results.append( + GetURLsResponseInnerInfo( + id=result.id, + batch_id=result.batch.id if result.batch is not None else None, + url=result.url, + status=URLStatus(result.outcome), + collector_metadata=result.collector_metadata, + updated_at=result.updated_at, + created_at=result.created_at, + errors=error_results, + ) + ) + + return GetURLsResponseInfo( + urls=final_results, + count=len(final_results) + ) \ No newline at end of file diff --git a/src/api/endpoints/url/routes.py b/src/api/endpoints/url/routes.py index d746dc30..225dd5d6 100644 --- a/src/api/endpoints/url/routes.py +++ b/src/api/endpoints/url/routes.py @@ -1,7 +1,7 @@ from fastapi 
import APIRouter, Query, Depends from src.api.dependencies import get_async_core -from src.api.endpoints.url.dtos.response import GetURLsResponseInfo +from src.api.endpoints.url.get.dto import GetURLsResponseInfo from src.core.core import AsyncCore from src.security.manager import get_access_info from src.security.dtos.access_info import AccessInfo diff --git a/src/core/core.py b/src/core/core.py index e2709e01..78554b39 100644 --- a/src/core/core.py +++ b/src/core/core.py @@ -11,18 +11,18 @@ from src.api.endpoints.annotate.all.post.dto import AllAnnotationPostInfo from src.api.endpoints.annotate.dtos.record_type.response import GetNextRecordTypeAnnotationResponseOuterInfo from src.api.endpoints.annotate.relevance.get.dto import GetNextRelevanceAnnotationResponseOuterInfo -from src.api.endpoints.batch.dtos.get.duplicates import GetDuplicatesByBatchResponse from src.api.endpoints.batch.dtos.get.logs import GetBatchLogsResponse from src.api.endpoints.batch.dtos.get.summaries.response import GetBatchSummariesResponse from src.api.endpoints.batch.dtos.get.summaries.summary import BatchSummary -from src.api.endpoints.batch.dtos.get.urls import GetURLsByBatchResponse from src.api.endpoints.batch.dtos.post.abort import MessageResponse +from src.api.endpoints.batch.duplicates.dto import GetDuplicatesByBatchResponse +from src.api.endpoints.batch.urls.dto import GetURLsByBatchResponse from src.api.endpoints.collector.dtos.collector_start import CollectorStartInfo from src.api.endpoints.collector.dtos.manual_batch.post import ManualBatchInputDTO from src.api.endpoints.collector.dtos.manual_batch.response import ManualBatchResponseDTO +from src.api.endpoints.metrics.batches.aggregated.dto import GetMetricsBatchesAggregatedResponseDTO +from src.api.endpoints.metrics.batches.breakdown.dto import GetMetricsBatchesBreakdownResponseDTO from src.api.endpoints.metrics.dtos.get.backlog import GetMetricsBacklogResponseDTO -from src.api.endpoints.metrics.dtos.get.batches.aggregated import GetMetricsBatchesAggregatedResponseDTO -from src.api.endpoints.metrics.dtos.get.batches.breakdown import GetMetricsBatchesBreakdownResponseDTO from src.api.endpoints.metrics.dtos.get.urls.aggregated.core import GetMetricsURLsAggregatedResponseDTO from src.api.endpoints.metrics.dtos.get.urls.aggregated.pending import GetMetricsURLsAggregatedPendingResponseDTO from src.api.endpoints.metrics.dtos.get.urls.breakdown.pending import GetMetricsURLsBreakdownPendingResponseDTO @@ -31,9 +31,9 @@ from src.api.endpoints.review.enums import RejectionReason from src.api.endpoints.review.next.dto import GetNextURLForFinalReviewOuterResponse from src.api.endpoints.search.dtos.response import SearchURLResponse -from src.api.endpoints.task.dtos.get.task import TaskInfo +from src.api.endpoints.task.by_id.dto import TaskInfo from src.api.endpoints.task.dtos.get.tasks import GetTasksResponse -from src.api.endpoints.url.dtos.response import GetURLsResponseInfo +from src.api.endpoints.url.get.dto import GetURLsResponseInfo from src.db.client.async_ import AsyncDatabaseClient from src.db.dtos.batch import BatchInfo from src.api.endpoints.task.dtos.get.task_status import GetTaskStatusResponseInfo diff --git a/src/core/tasks/url/manager.py b/src/core/tasks/url/manager.py index 6d03aca0..1d843b95 100644 --- a/src/core/tasks/url/manager.py +++ b/src/core/tasks/url/manager.py @@ -1,16 +1,11 @@ import logging -from src.api.endpoints.task.dtos.get.tasks import GetTasksResponse from src.core.tasks.handler import TaskHandler from src.core.tasks.url.loader 
import URLTaskOperatorLoader -from src.db.client.async_ import AsyncDatabaseClient -from src.api.endpoints.task.dtos.get.task import TaskInfo from src.db.enums import TaskType from src.core.tasks.dtos.run_info import URLTaskOperatorRunInfo from src.core.tasks.url.enums import TaskOperatorOutcome from src.core.function_trigger import FunctionTrigger -from src.core.enums import BatchStatus -from discord_poster import DiscordPoster TASK_REPEAT_THRESHOLD = 20 diff --git a/src/core/tasks/url/operators/agency_identification/queries/__init__.py b/src/core/tasks/url/operators/agency_identification/queries/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/core/tasks/url/operators/agency_identification/queries/get_pending_urls_without_agency_suggestions.py b/src/core/tasks/url/operators/agency_identification/queries/get_pending_urls_without_agency_suggestions.py new file mode 100644 index 00000000..27459145 --- /dev/null +++ b/src/core/tasks/url/operators/agency_identification/queries/get_pending_urls_without_agency_suggestions.py @@ -0,0 +1,36 @@ +from typing import Any + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from src.collectors.enums import URLStatus, CollectorType +from src.core.tasks.url.operators.agency_identification.dtos.tdo import AgencyIdentificationTDO +from src.db.models.instantiations.batch import Batch +from src.db.models.instantiations.link.link_batch_urls import LinkBatchURL +from src.db.models.instantiations.url.core import URL +from src.db.queries.base.builder import QueryBuilderBase +from src.db.statement_composer import StatementComposer + + +class GetPendingURLsWithoutAgencySuggestionsQueryBuilder(QueryBuilderBase): + + async def run(self, session: AsyncSession) -> list[AgencyIdentificationTDO]: + + statement = ( + select(URL.id, URL.collector_metadata, Batch.strategy) + .select_from(URL) + .where(URL.outcome == URLStatus.PENDING.value) + .join(LinkBatchURL) + .join(Batch) + ) + statement = StatementComposer.exclude_urls_with_agency_suggestions(statement) + statement = statement.limit(100) + raw_results = await session.execute(statement) + return [ + AgencyIdentificationTDO( + url_id=raw_result[0], + collector_metadata=raw_result[1], + collector_type=CollectorType(raw_result[2]) + ) + for raw_result in raw_results + ] \ No newline at end of file diff --git a/src/core/tasks/url/operators/url_html/queries/__init__.py b/src/core/tasks/url/operators/url_html/queries/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/core/tasks/url/operators/url_html/queries/get_pending_urls_without_html_data.py b/src/core/tasks/url/operators/url_html/queries/get_pending_urls_without_html_data.py new file mode 100644 index 00000000..6af92abe --- /dev/null +++ b/src/core/tasks/url/operators/url_html/queries/get_pending_urls_without_html_data.py @@ -0,0 +1,32 @@ +from sqlalchemy.ext.asyncio import AsyncSession + +from src.db.dto_converter import DTOConverter +from src.db.dtos.url.core import URLInfo +from src.db.models.instantiations.url.core import URL +from src.db.queries.base.builder import QueryBuilderBase +from src.db.statement_composer import StatementComposer + + +class GetPendingURLsWithoutHTMLDataQueryBuilder(QueryBuilderBase): + + async def run(self, session: AsyncSession) -> list[URLInfo]: + statement = StatementComposer.pending_urls_without_html_data() + statement = statement.limit(100).order_by(URL.id) + scalar_result = await session.scalars(statement) + url_results: list[URL] = scalar_result.all() 
+ + final_results = [] + for url in url_results: + url_info = URLInfo( + id=url.id, + batch_id=url.batch.id if url.batch is not None else None, + url=url.url, + collector_metadata=url.collector_metadata, + outcome=url.outcome, + created_at=url.created_at, + updated_at=url.updated_at, + name=url.name + ) + final_results.append(url_info) + + return final_results diff --git a/src/core/tasks/url/operators/url_miscellaneous_metadata/queries/__init__.py b/src/core/tasks/url/operators/url_miscellaneous_metadata/queries/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/core/tasks/url/operators/url_miscellaneous_metadata/queries/get_pending_urls_missing_miscellaneous_data.py b/src/core/tasks/url/operators/url_miscellaneous_metadata/queries/get_pending_urls_missing_miscellaneous_data.py new file mode 100644 index 00000000..c4c9892f --- /dev/null +++ b/src/core/tasks/url/operators/url_miscellaneous_metadata/queries/get_pending_urls_missing_miscellaneous_data.py @@ -0,0 +1,43 @@ +from typing import Any + +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import selectinload + +from src.collectors.enums import CollectorType +from src.core.tasks.url.operators.url_miscellaneous_metadata.tdo import URLMiscellaneousMetadataTDO, URLHTMLMetadataInfo +from src.db.dtos.url.html_content import HTMLContentType +from src.db.models.instantiations.url.core import URL +from src.db.queries.base.builder import QueryBuilderBase +from src.db.statement_composer import StatementComposer + + +class GetPendingURLsMissingMiscellaneousDataQueryBuilder(QueryBuilderBase): + + + async def run(self, session: AsyncSession) -> list[URLMiscellaneousMetadataTDO]: + query = StatementComposer.pending_urls_missing_miscellaneous_metadata_query() + query = ( + query.options( + selectinload(URL.batch), + selectinload(URL.html_content) + ).limit(100).order_by(URL.id) + ) + + scalar_result = await session.scalars(query) + all_results = scalar_result.all() + final_results = [] + for result in all_results: + tdo = URLMiscellaneousMetadataTDO( + url_id=result.id, + collector_metadata=result.collector_metadata or {}, + collector_type=CollectorType(result.batch.strategy), + ) + html_info = URLHTMLMetadataInfo() + for html_content in result.html_content: + if html_content.content_type == HTMLContentType.TITLE.value: + html_info.title = html_content.content + elif html_content.content_type == HTMLContentType.DESCRIPTION.value: + html_info.description = html_content.content + tdo.html_metadata_info = html_info + final_results.append(tdo) + return final_results diff --git a/src/core/tasks/url/operators/url_miscellaneous_metadata/queries/has_pending_urls_missing_miscellaneous_data.py b/src/core/tasks/url/operators/url_miscellaneous_metadata/queries/has_pending_urls_missing_miscellaneous_data.py new file mode 100644 index 00000000..137c228a --- /dev/null +++ b/src/core/tasks/url/operators/url_miscellaneous_metadata/queries/has_pending_urls_missing_miscellaneous_data.py @@ -0,0 +1,14 @@ +from sqlalchemy.ext.asyncio import AsyncSession + +from src.db.queries.base.builder import QueryBuilderBase +from src.db.statement_composer import StatementComposer + + +class HasPendingURsMissingMiscellaneousDataQueryBuilder(QueryBuilderBase): + + async def run(self, session: AsyncSession) -> bool: + query = StatementComposer.pending_urls_missing_miscellaneous_metadata_query() + query = query.limit(1) + + scalar_result = await session.scalars(query) + return bool(scalar_result.first()) \ No newline at end of file diff --git 
a/src/db/client/async_.py b/src/db/client/async_.py index 00af074d..45505be5 100644 --- a/src/db/client/async_.py +++ b/src/db/client/async_.py @@ -3,15 +3,12 @@ from operator import or_ from typing import Optional, Type, Any, List, Sequence -from fastapi import HTTPException from sqlalchemy import select, exists, func, case, Select, and_, update, delete, literal, text, Row from sqlalchemy.dialects import postgresql from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.exc import IntegrityError, NoResultFound from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker -from sqlalchemy.orm import selectinload, joinedload, QueryableAttribute, aliased -from sqlalchemy.sql.functions import coalesce -from starlette import status +from sqlalchemy.orm import selectinload, QueryableAttribute from src.api.endpoints.annotate._shared.queries.get_annotation_batch_info import GetAnnotationBatchInfoQueryBuilder from src.api.endpoints.annotate._shared.queries.get_next_url_for_user_annotation import \ @@ -22,18 +19,20 @@ from src.api.endpoints.annotate.all.get.query import GetNextURLForAllAnnotationQueryBuilder from src.api.endpoints.annotate.all.post.dto import AllAnnotationPostInfo from src.api.endpoints.annotate.dtos.record_type.response import GetNextRecordTypeAnnotationResponseInfo -from src.api.endpoints.annotate.relevance.get.dto import GetNextRelevanceAnnotationResponseInfo, \ - RelevanceAnnotationResponseInfo +from src.api.endpoints.annotate.relevance.get.dto import GetNextRelevanceAnnotationResponseInfo from src.api.endpoints.annotate.relevance.get.query import GetNextUrlForRelevanceAnnotationQueryBuilder from src.api.endpoints.batch.dtos.get.summaries.response import GetBatchSummariesResponse from src.api.endpoints.batch.dtos.get.summaries.summary import BatchSummary +from src.api.endpoints.batch.duplicates.query import GetDuplicatesByBatchIDQueryBuilder +from src.api.endpoints.batch.urls.query import GetURLsByBatchQueryBuilder from src.api.endpoints.collector.dtos.manual_batch.post import ManualBatchInputDTO from src.api.endpoints.collector.dtos.manual_batch.response import ManualBatchResponseDTO +from src.api.endpoints.collector.manual.query import UploadManualBatchQueryBuilder +from src.api.endpoints.metrics.batches.aggregated.dto import GetMetricsBatchesAggregatedResponseDTO +from src.api.endpoints.metrics.batches.aggregated.query import GetBatchesAggregatedMetricsQueryBuilder +from src.api.endpoints.metrics.batches.breakdown.dto import GetMetricsBatchesBreakdownResponseDTO +from src.api.endpoints.metrics.batches.breakdown.query import GetBatchesBreakdownMetricsQueryBuilder from src.api.endpoints.metrics.dtos.get.backlog import GetMetricsBacklogResponseDTO, GetMetricsBacklogResponseInnerDTO -from src.api.endpoints.metrics.dtos.get.batches.aggregated import GetMetricsBatchesAggregatedResponseDTO, \ - GetMetricsBatchesAggregatedInnerResponseDTO -from src.api.endpoints.metrics.dtos.get.batches.breakdown import GetMetricsBatchesBreakdownInnerResponseDTO, \ - GetMetricsBatchesBreakdownResponseDTO from src.api.endpoints.metrics.dtos.get.urls.aggregated.core import GetMetricsURLsAggregatedResponseDTO from src.api.endpoints.metrics.dtos.get.urls.breakdown.pending import GetMetricsURLsBreakdownPendingResponseDTO, \ GetMetricsURLsBreakdownPendingResponseInnerDTO @@ -45,22 +44,34 @@ from src.api.endpoints.review.next.dto import GetNextURLForFinalReviewOuterResponse from src.api.endpoints.review.reject.query import RejectURLQueryBuilder from 
src.api.endpoints.search.dtos.response import SearchURLResponse -from src.api.endpoints.task.dtos.get.task import TaskInfo +from src.api.endpoints.task.by_id.dto import TaskInfo + +from src.api.endpoints.task.by_id.query import GetTaskInfoQueryBuilder from src.api.endpoints.task.dtos.get.tasks import GetTasksResponse, GetTasksResponseTaskInfo -from src.api.endpoints.url.dtos.response import GetURLsResponseInfo, GetURLsResponseErrorInfo, GetURLsResponseInnerInfo +from src.api.endpoints.url.get.dto import GetURLsResponseInfo + +from src.api.endpoints.url.get.query import GetURLsQueryBuilder from src.collectors.enums import URLStatus, CollectorType from src.core.enums import BatchStatus, SuggestionType, RecordType, SuggestedStatus from src.core.env_var_manager import EnvVarManager from src.core.tasks.scheduled.operators.agency_sync.dtos.parameters import AgencySyncParameters from src.core.tasks.url.operators.agency_identification.dtos.suggestion import URLAgencySuggestionInfo from src.core.tasks.url.operators.agency_identification.dtos.tdo import AgencyIdentificationTDO -from src.core.tasks.url.operators.auto_relevant.models.annotation import RelevanceAnnotationInfo +from src.core.tasks.url.operators.agency_identification.queries.get_pending_urls_without_agency_suggestions import \ + GetPendingURLsWithoutAgencySuggestionsQueryBuilder from src.core.tasks.url.operators.auto_relevant.models.tdo import URLRelevantTDO from src.core.tasks.url.operators.auto_relevant.queries.get_tdos import GetAutoRelevantTDOsQueryBuilder from src.core.tasks.url.operators.submit_approved_url.tdo import SubmitApprovedURLTDO, SubmittedURLInfo from src.core.tasks.url.operators.url_404_probe.tdo import URL404ProbeTDO from src.core.tasks.url.operators.url_duplicate.tdo import URLDuplicateTDO -from src.core.tasks.url.operators.url_miscellaneous_metadata.tdo import URLMiscellaneousMetadataTDO, URLHTMLMetadataInfo +from src.core.tasks.url.operators.url_html.queries.get_pending_urls_without_html_data import \ + GetPendingURLsWithoutHTMLDataQueryBuilder +from src.core.tasks.url.operators.url_miscellaneous_metadata.queries.get_pending_urls_missing_miscellaneous_data import \ + GetPendingURLsMissingMiscellaneousDataQueryBuilder +from src.core.tasks.url.operators.url_miscellaneous_metadata.queries.has_pending_urls_missing_miscellaneous_data import \ + HasPendingURsMissingMiscellaneousDataQueryBuilder +from src.core.tasks.url.operators.url_miscellaneous_metadata.tdo import URLMiscellaneousMetadataTDO +from src.db.client.helpers import add_standard_limit_and_offset from src.db.client.types import UserSuggestionModel from src.db.config_manager import ConfigManager from src.db.constants import PLACEHOLDER_AGENCY_NAME @@ -71,7 +82,7 @@ from src.db.dtos.url.annotations.auto.relevancy import AutoRelevancyAnnotationInput from src.db.dtos.url.core import URLInfo from src.db.dtos.url.error import URLErrorPydanticInfo -from src.db.dtos.url.html_content import URLHTMLContentInfo, HTMLContentType +from src.db.dtos.url.html_content import URLHTMLContentInfo from src.db.dtos.url.insert import InsertURLsInfo from src.db.dtos.url.mapping import URLMapping from src.db.dtos.url.raw_html import RawHTMLInfo @@ -81,7 +92,8 @@ from src.db.models.instantiations.batch import Batch from src.db.models.instantiations.confirmed_url_agency import ConfirmedURLAgency from src.db.models.instantiations.duplicate import Duplicate -from src.db.models.instantiations.link_task_url import LinkTaskURL +from src.db.models.instantiations.link.link_batch_urls import 
LinkBatchURL +from src.db.models.instantiations.link.link_task_url import LinkTaskURL from src.db.models.instantiations.log import Log from src.db.models.instantiations.root_url_cache import RootURL from src.db.models.instantiations.sync_state_agencies import AgenciesSyncState @@ -95,7 +107,6 @@ from src.db.models.instantiations.url.html_content import URLHTMLContent from src.db.models.instantiations.url.optional_data_source_metadata import URLOptionalDataSourceMetadata from src.db.models.instantiations.url.probed_for_404 import URLProbedFor404 -from src.db.models.instantiations.url.reviewing_user import ReviewingUserURL from src.db.models.instantiations.url.suggestion.agency.auto import AutomatedUrlAgencySuggestion from src.db.models.instantiations.url.suggestion.agency.user import UserUrlAgencySuggestion from src.db.models.instantiations.url.suggestion.record_type.auto import AutoRecordTypeSuggestion @@ -115,11 +126,6 @@ from src.external.pdap.dtos.agencies_sync import AgenciesSyncResponseInnerInfo -def add_standard_limit_and_offset(statement, page, limit=100): - offset = (page - 1) * limit - return statement.limit(limit).offset(offset) - - class AsyncDatabaseClient: def __init__(self, db_url: Optional[str] = None): if db_url is None: @@ -229,7 +235,7 @@ async def run_query_builder( session: AsyncSession, builder: QueryBuilderBase ) -> Any: - return await builder.run(session) + return await builder.run(session=session) # region relevant async def add_auto_relevant_suggestion( @@ -460,45 +466,13 @@ async def has_pending_urls_without_html_data(self, session: AsyncSession) -> boo scalar_result = await session.scalars(statement) return bool(scalar_result.first()) - @session_manager - async def has_pending_urls_missing_miscellaneous_metadata(self, session: AsyncSession) -> bool: - query = StatementComposer.pending_urls_missing_miscellaneous_metadata_query() - query = query.limit(1) - - scalar_result = await session.scalars(query) - return bool(scalar_result.first()) + async def has_pending_urls_missing_miscellaneous_metadata(self) -> bool: + return await self.run_query_builder(HasPendingURsMissingMiscellaneousDataQueryBuilder()) - @session_manager async def get_pending_urls_missing_miscellaneous_metadata( self, - session: AsyncSession ) -> list[URLMiscellaneousMetadataTDO]: - query = StatementComposer.pending_urls_missing_miscellaneous_metadata_query() - query = ( - query.options( - selectinload(URL.batch), - selectinload(URL.html_content) - ).limit(100).order_by(URL.id) - ) - - scalar_result = await session.scalars(query) - all_results = scalar_result.all() - final_results = [] - for result in all_results: - tdo = URLMiscellaneousMetadataTDO( - url_id=result.id, - collector_metadata=result.collector_metadata or {}, - collector_type=CollectorType(result.batch.strategy), - ) - html_info = URLHTMLMetadataInfo() - for html_content in result.html_content: - if html_content.content_type == HTMLContentType.TITLE.value: - html_info.title = html_content.content - elif html_content.content_type == HTMLContentType.DESCRIPTION.value: - html_info.description = html_content.content - tdo.html_metadata_info = html_info - final_results.append(tdo) - return final_results + return await self.run_query_builder(GetPendingURLsMissingMiscellaneousDataQueryBuilder()) @session_manager async def add_miscellaneous_metadata(self, session: AsyncSession, tdos: list[URLMiscellaneousMetadataTDO]): @@ -528,14 +502,8 @@ async def add_miscellaneous_metadata(self, session: AsyncSession, tdos: list[URL ) 
session.add(metadata_object) - @session_manager - async def get_pending_urls_without_html_data(self, session: AsyncSession) -> list[URLInfo]: - # TODO: Add test that includes some urls WITH html data. Check they're not returned - statement = self.statement_composer.pending_urls_without_html_data() - statement = statement.limit(100).order_by(URL.id) - scalar_result = await session.scalars(statement) - results: list[URL] = scalar_result.all() - return DTOConverter.url_list_to_url_info_list(results) + async def get_pending_urls_without_html_data(self) -> list[URLInfo]: + return await self.run_query_builder(GetPendingURLsWithoutHTMLDataQueryBuilder()) async def get_urls_with_html_data_and_without_models( self, @@ -629,48 +597,15 @@ async def add_to_root_url_cache(self, url: str, page_title: str) -> None: cache = RootURL(url=url, page_title=page_title) await self.add(cache) - @session_manager - async def get_urls(self, session: AsyncSession, page: int, errors: bool) -> GetURLsResponseInfo: - statement = select(URL).options( - selectinload(URL.error_info) - ).order_by(URL.id) - if errors: - # Only return URLs with errors - statement = statement.where( - exists( - select(URLErrorInfo).where(URLErrorInfo.url_id == URL.id) - ) - ) - add_standard_limit_and_offset(statement, page) - execute_result = await session.execute(statement) - all_results = execute_result.scalars().all() - final_results = [] - for result in all_results: - error_results = [] - for error in result.error_info: - error_result = GetURLsResponseErrorInfo( - id=error.id, - error=error.error, - updated_at=error.updated_at - ) - error_results.append(error_result) - final_results.append( - GetURLsResponseInnerInfo( - id=result.id, - batch_id=result.batch_id, - url=result.url, - status=URLStatus(result.outcome), - collector_metadata=result.collector_metadata, - updated_at=result.updated_at, - created_at=result.created_at, - errors=error_results, - ) - ) + async def get_urls( + self, + page: int, + errors: bool + ) -> GetURLsResponseInfo: + return await self.run_query_builder(GetURLsQueryBuilder( + page=page, errors=errors + )) - return GetURLsResponseInfo( - urls=final_results, - count=len(final_results) - ) @session_manager async def initiate_task( @@ -701,52 +636,11 @@ async def add_task_error(self, task_id: int, error: str): ) await self.add(task_error) - @session_manager - async def get_task_info(self, session: AsyncSession, task_id: int) -> TaskInfo: - # Get Task - result = await session.execute( - select(Task) - .where(Task.id == task_id) - .options( - selectinload(Task.urls), - selectinload(Task.error), - selectinload(Task.errored_urls) - ) - ) - task = result.scalars().first() - error = task.error[0].error if len(task.error) > 0 else None - # Get error info if any - # Get URLs - urls = task.urls - url_infos = [] - for url in urls: - url_info = URLInfo( - id=url.id, - batch_id=url.batch_id, - url=url.url, - collector_metadata=url.collector_metadata, - outcome=URLStatus(url.outcome), - updated_at=url.updated_at - ) - url_infos.append(url_info) - - errored_urls = [] - for url in task.errored_urls: - url_error_info = URLErrorPydanticInfo( - task_id=url.task_id, - url_id=url.url_id, - error=url.error, - updated_at=url.updated_at - ) - errored_urls.append(url_error_info) - return TaskInfo( - task_type=TaskType(task.task_type), - task_status=BatchStatus(task.task_status), - error_info=error, - updated_at=task.updated_at, - urls=url_infos, - url_errors=errored_urls - ) + async def get_task_info( + self, + task_id: int + ) -> TaskInfo: + 
return await self.run_query_builder(GetTaskInfoQueryBuilder(task_id)) async def get_html_content_info(self, url_id: int) -> list[URLHTMLContentInfo]: return await self.run_query_builder(GetHTMLContentInfoQueryBuilder(url_id)) @@ -833,35 +727,12 @@ async def has_urls_without_agency_suggestions( result = raw_result.all() return len(result) != 0 - @session_manager async def get_urls_without_agency_suggestions( - self, session: AsyncSession + self ) -> list[AgencyIdentificationTDO]: - """ - Retrieve URLs without confirmed or suggested agencies - Args: - session: - - Returns: - - """ + """Retrieve URLs without confirmed or suggested agencies.""" + return await self.run_query_builder(GetPendingURLsWithoutAgencySuggestionsQueryBuilder()) - statement = ( - select(URL.id, URL.collector_metadata, Batch.strategy) - .where(URL.outcome == URLStatus.PENDING.value) - .join(Batch) - ) - statement = self.statement_composer.exclude_urls_with_agency_suggestions(statement) - statement = statement.limit(100) - raw_results = await session.execute(statement) - return [ - AgencyIdentificationTDO( - url_id=raw_result[0], - collector_metadata=raw_result[1], - collector_type=CollectorType(raw_result[2]) - ) - for raw_result in raw_results - ] async def get_next_url_agency_for_annotation( self, @@ -1005,19 +876,17 @@ async def get_batch_by_id(self, session, batch_id: int) -> Optional[BatchSummary batch_summary = summaries[0] return batch_summary - @session_manager - async def get_urls_by_batch(self, session, batch_id: int, page: int = 1) -> List[URLInfo]: + async def get_urls_by_batch(self, batch_id: int, page: int = 1) -> list[URLInfo]: """Retrieve all URLs associated with a batch.""" - query = Select(URL).where(URL.batch_id == batch_id).order_by(URL.id).limit(100).offset((page - 1) * 100) - result = await session.execute(query) - urls = result.scalars().all() - return ([URLInfo(**url.__dict__) for url in urls]) + return await self.run_query_builder(GetURLsByBatchQueryBuilder( + batch_id=batch_id, + page=page + )) @session_manager async def insert_url(self, session: AsyncSession, url_info: URLInfo) -> int: """Insert a new URL into the database.""" url_entry = URL( - batch_id=url_info.batch_id, url=url_info.url, collector_metadata=url_info.collector_metadata, outcome=url_info.outcome.value @@ -1026,6 +895,11 @@ async def insert_url(self, session: AsyncSession, url_info: URLInfo) -> int: url_entry.created_at = url_info.created_at session.add(url_entry) await session.flush() + link = LinkBatchURL( + batch_id=url_info.batch_id, + url_id=url_entry.id + ) + session.add(link) return url_entry.id @session_manager @@ -1208,43 +1081,11 @@ async def mark_urls_as_submitted(self, session: AsyncSession, infos: list[Submit await session.execute(query) - @session_manager - async def get_duplicates_by_batch_id(self, session, batch_id: int, page: int) -> List[DuplicateInfo]: - original_batch = aliased(Batch) - duplicate_batch = aliased(Batch) - - query = ( - Select( - URL.url.label("source_url"), - URL.id.label("original_url_id"), - duplicate_batch.id.label("duplicate_batch_id"), - duplicate_batch.parameters.label("duplicate_batch_parameters"), - original_batch.id.label("original_batch_id"), - original_batch.parameters.label("original_batch_parameters"), - ) - .select_from(Duplicate) - .join(URL, Duplicate.original_url_id == URL.id) - .join(duplicate_batch, Duplicate.batch_id == duplicate_batch.id) - .join(original_batch, URL.batch_id == original_batch.id) - .filter(duplicate_batch.id == batch_id) - .limit(100) - .offset((page - 1) * 100) - ) - 
raw_results = await session.execute(query) - results = raw_results.all() - final_results = [] - for result in results: - final_results.append( - DuplicateInfo( - source_url=result.source_url, - duplicate_batch_id=result.duplicate_batch_id, - duplicate_metadata=result.duplicate_batch_parameters, - original_batch_id=result.original_batch_id, - original_metadata=result.original_batch_parameters, - original_url_id=result.original_url_id - ) - ) - return final_results + async def get_duplicates_by_batch_id(self, batch_id: int, page: int) -> list[DuplicateInfo]: + return await self.run_query_builder(GetDuplicatesByBatchIDQueryBuilder( + batch_id=batch_id, + page=page + )) @session_manager async def get_batch_summaries( @@ -1324,61 +1165,16 @@ async def add_all_annotations_to_url( ) session.add(agency_suggestion) - @session_manager async def upload_manual_batch( self, - session: AsyncSession, user_id: int, dto: ManualBatchInputDTO ) -> ManualBatchResponseDTO: - batch = Batch( - strategy=CollectorType.MANUAL.value, - status=BatchStatus.READY_TO_LABEL.value, - parameters={ - "name": dto.name - }, - user_id=user_id - ) - session.add(batch) - await session.flush() - - batch_id = batch.id - url_ids = [] - duplicate_urls = [] - - for entry in dto.entries: - url = URL( - url=entry.url, - name=entry.name, - description=entry.description, - batch_id=batch_id, - collector_metadata=entry.collector_metadata, - outcome=URLStatus.PENDING.value, - record_type=entry.record_type.value if entry.record_type is not None else None, - ) - - async with session.begin_nested(): - try: - session.add(url) - await session.flush() - except IntegrityError: - duplicate_urls.append(entry.url) - continue - await session.flush() - optional_metadata = URLOptionalDataSourceMetadata( - url_id=url.id, - record_formats=entry.record_formats, - data_portal_type=entry.data_portal_type, - supplying_entity=entry.supplying_entity, - ) - session.add(optional_metadata) - url_ids.append(url.id) + return await self.run_query_builder(UploadManualBatchQueryBuilder( + user_id=user_id, + dto=dto + )) - return ManualBatchResponseDTO( - batch_id=batch_id, - urls=url_ids, - duplicate_urls=duplicate_urls - ) @session_manager async def search_for_url(self, session: AsyncSession, url: str) -> SearchURLResponse: @@ -1395,179 +1191,20 @@ async def search_for_url(self, session: AsyncSession, url: str) -> SearchURLResp url_id=url.id ) - @session_manager - async def get_batches_aggregated_metrics(self, session: AsyncSession) -> GetMetricsBatchesAggregatedResponseDTO: - sc = StatementComposer - - # First, get all batches broken down by collector type and status - def batch_column(status: BatchStatus, label): - return sc.count_distinct( - case( - ( - Batch.status == status.value, - Batch.id - ) - ), - label=label - ) - - batch_count_subquery = select( - batch_column(BatchStatus.READY_TO_LABEL, label="done_count"), - batch_column(BatchStatus.ERROR, label="error_count"), - Batch.strategy, - ).group_by(Batch.strategy).subquery("batch_count") - - def url_column(status: URLStatus, label): - return sc.count_distinct( - case( - ( - URL.outcome == status.value, - URL.id - ) - ), - label=label - ) - - # Next, count urls - url_count_subquery = select( - Batch.strategy, - url_column(URLStatus.PENDING, label="pending_count"), - url_column(URLStatus.ERROR, label="error_count"), - url_column(URLStatus.VALIDATED, label="validated_count"), - url_column(URLStatus.SUBMITTED, label="submitted_count"), - url_column(URLStatus.NOT_RELEVANT, label="rejected_count"), - - ).outerjoin( 
- Batch, Batch.id == URL.batch_id - ).group_by( - Batch.strategy - ).subquery("url_count") - - # Combine - query = select( - Batch.strategy, - batch_count_subquery.c.done_count.label("batch_done_count"), - batch_count_subquery.c.error_count.label("batch_error_count"), - coalesce(url_count_subquery.c.pending_count, 0).label("pending_count"), - coalesce(url_count_subquery.c.error_count, 0).label("error_count"), - coalesce(url_count_subquery.c.submitted_count, 0).label("submitted_count"), - coalesce(url_count_subquery.c.rejected_count, 0).label("rejected_count"), - coalesce(url_count_subquery.c.validated_count, 0).label("validated_count") - ).join( - batch_count_subquery, - Batch.strategy == batch_count_subquery.c.strategy - ).outerjoin( - url_count_subquery, - Batch.strategy == url_count_subquery.c.strategy - ) - raw_results = await session.execute(query) - results = raw_results.all() - d: dict[CollectorType, GetMetricsBatchesAggregatedInnerResponseDTO] = {} - for result in results: - d[CollectorType(result.strategy)] = GetMetricsBatchesAggregatedInnerResponseDTO( - count_successful_batches=result.batch_done_count, - count_failed_batches=result.batch_error_count, - count_urls=result.pending_count + result.submitted_count + - result.rejected_count + result.error_count + - result.validated_count, - count_urls_pending=result.pending_count, - count_urls_validated=result.validated_count, - count_urls_submitted=result.submitted_count, - count_urls_rejected=result.rejected_count, - count_urls_errors=result.error_count - ) - - total_batch_query = await session.execute( - select( - sc.count_distinct(Batch.id, label="count") - ) + async def get_batches_aggregated_metrics(self) -> GetMetricsBatchesAggregatedResponseDTO: + return await self.run_query_builder( + GetBatchesAggregatedMetricsQueryBuilder() ) - total_batch_count = total_batch_query.scalars().one_or_none() - if total_batch_count is None: - total_batch_count = 0 - return GetMetricsBatchesAggregatedResponseDTO( - total_batches=total_batch_count, - by_strategy=d - ) - @session_manager async def get_batches_breakdown_metrics( self, - session: AsyncSession, page: int ) -> GetMetricsBatchesBreakdownResponseDTO: - sc = StatementComposer - - main_query = select( - Batch.strategy, - Batch.id, - Batch.status, - Batch.date_generated.label("created_at"), - ) - - def url_column(status: URLStatus, label): - return sc.count_distinct( - case( - ( - URL.outcome == status.value, - URL.id - ) - ), - label=label - ) - - count_query = select( - URL.batch_id, - sc.count_distinct(URL.id, label="count_total"), - url_column(URLStatus.PENDING, label="count_pending"), - url_column(URLStatus.SUBMITTED, label="count_submitted"), - url_column(URLStatus.NOT_RELEVANT, label="count_rejected"), - url_column(URLStatus.ERROR, label="count_error"), - url_column(URLStatus.VALIDATED, label="count_validated"), - ).group_by( - URL.batch_id - ).subquery("url_count") - - query = (select( - main_query.c.strategy, - main_query.c.id, - main_query.c.created_at, - main_query.c.status, - coalesce(count_query.c.count_total, 0).label("count_total"), - coalesce(count_query.c.count_pending, 0).label("count_pending"), - coalesce(count_query.c.count_submitted, 0).label("count_submitted"), - coalesce(count_query.c.count_rejected, 0).label("count_rejected"), - coalesce(count_query.c.count_error, 0).label("count_error"), - coalesce(count_query.c.count_validated, 0).label("count_validated"), - ).outerjoin( - count_query, - main_query.c.id == count_query.c.batch_id - ).offset( - (page - 1) * 100 - 
).order_by( - main_query.c.created_at.asc() - )) - - raw_results = await session.execute(query) - results = raw_results.all() - batches: list[GetMetricsBatchesBreakdownInnerResponseDTO] = [] - for result in results: - dto = GetMetricsBatchesBreakdownInnerResponseDTO( - batch_id=result.id, - strategy=CollectorType(result.strategy), - status=BatchStatus(result.status), - created_at=result.created_at, - count_url_total=result.count_total, - count_url_pending=result.count_pending, - count_url_submitted=result.count_submitted, - count_url_rejected=result.count_rejected, - count_url_error=result.count_error, - count_url_validated=result.count_validated + return await self.run_query_builder( + GetBatchesBreakdownMetricsQueryBuilder( + page=page ) - batches.append(dto) - return GetMetricsBatchesBreakdownResponseDTO( - batches=batches, ) @session_manager diff --git a/src/db/client/helpers.py b/src/db/client/helpers.py new file mode 100644 index 00000000..74099bb4 --- /dev/null +++ b/src/db/client/helpers.py @@ -0,0 +1,3 @@ +def add_standard_limit_and_offset(statement, page, limit=100): + offset = (page - 1) * limit + return statement.limit(limit).offset(offset) diff --git a/src/db/client/sync.py b/src/db/client/sync.py index e62edf08..8ec13085 100644 --- a/src/db/client/sync.py +++ b/src/db/client/sync.py @@ -1,7 +1,7 @@ from functools import wraps from typing import Optional, List -from sqlalchemy import create_engine, update +from sqlalchemy import create_engine, update, Select from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import sessionmaker, scoped_session, Session @@ -13,6 +13,7 @@ from src.db.dtos.log import LogInfo from src.db.dtos.url.core import URLInfo from src.db.dtos.url.mapping import URLMapping +from src.db.models.instantiations.link.link_batch_urls import LinkBatchURL from src.db.models.templates import Base from src.db.models.instantiations.duplicate import Duplicate from src.db.models.instantiations.log import Log @@ -58,7 +59,7 @@ def wrapper(self, *args, **kwargs): return wrapper @session_manager - def insert_batch(self, session, batch_info: BatchInfo) -> int: + def insert_batch(self, session: Session, batch_info: BatchInfo) -> int: """Insert a new batch into the database and return its ID.""" batch = Batch( strategy=batch_info.strategy, @@ -80,14 +81,22 @@ def insert_batch(self, session, batch_info: BatchInfo) -> int: return batch.id @session_manager - def get_batch_by_id(self, session, batch_id: int) -> Optional[BatchInfo]: + def get_batch_by_id( + self, + session: Session, + batch_id: int + ) -> BatchInfo | None: """Retrieve a batch by ID.""" batch = session.query(Batch).filter_by(id=batch_id).first() return BatchInfo(**batch.__dict__) @session_manager - def insert_duplicates(self, session, duplicate_infos: list[DuplicateInsertInfo]): + def insert_duplicates( + self, + session: Session, + duplicate_infos: list[DuplicateInsertInfo] + ): for duplicate_info in duplicate_infos: duplicate = Duplicate( batch_id=duplicate_info.duplicate_batch_id, @@ -97,7 +106,10 @@ def insert_duplicates(self, session, duplicate_infos: list[DuplicateInsertInfo]) @session_manager - def get_url_info_by_url(self, session, url: str) -> Optional[URLInfo]: + def get_url_info_by_url( + self, + session: Session, url: str + ) -> URLInfo | None: url = session.query(URL).filter_by(url=url).first() return URLInfo(**url.__dict__) @@ -105,7 +117,6 @@ def get_url_info_by_url(self, session, url: str) -> Optional[URLInfo]: def insert_url(self, session, url_info: URLInfo) -> int: """Insert a new URL 
into the database.""" url_entry = URL( - batch_id=url_info.batch_id, url=url_info.url, collector_metadata=url_info.collector_metadata, outcome=url_info.outcome.value, @@ -116,6 +127,11 @@ def insert_url(self, session, url_info: URLInfo) -> int: session.add(url_entry) session.commit() session.refresh(url_entry) + link = LinkBatchURL( + batch_id=url_info.batch_id, + url_id=url_entry.id + ) + session.add(link) return url_entry.id def insert_urls(self, url_infos: List[URLInfo], batch_id: int) -> InsertURLsInfo: @@ -144,14 +160,31 @@ def insert_urls(self, url_infos: List[URLInfo], batch_id: int) -> InsertURLsInfo ) @session_manager - def get_urls_by_batch(self, session, batch_id: int, page: int = 1) -> List[URLInfo]: + def get_urls_by_batch( + self, + session: Session, + batch_id: int, + page: int = 1 + ) -> list[URLInfo]: """Retrieve all URLs associated with a batch.""" - urls = (session.query(URL).filter_by(batch_id=batch_id) - .order_by(URL.id).limit(100).offset((page - 1) * 100).all()) + query = ( + Select(URL) + .join(LinkBatchURL) + .where(LinkBatchURL.batch_id == batch_id) + .order_by(URL.id) + .limit(100) + .offset((page - 1) * 100) + ) + urls = session.scalars(query).all() + return ([URLInfo(**url.__dict__) for url in urls]) @session_manager - def insert_logs(self, session, log_infos: List[LogInfo]): + def insert_logs( + self, + session: Session, + log_infos: List[LogInfo] + ): for log_info in log_infos: log = Log(log=log_info.log, batch_id=log_info.batch_id) if log_info.created_at is not None: @@ -159,12 +192,20 @@ def insert_logs(self, session, log_infos: List[LogInfo]): session.add(log) @session_manager - def get_batch_status(self, session, batch_id: int) -> BatchStatus: + def get_batch_status( + self, + session: Session, + batch_id: int + ) -> BatchStatus: batch = session.query(Batch).filter_by(id=batch_id).first() return BatchStatus(batch.status) @session_manager - def update_url(self, session, url_info: URLInfo): + def update_url( + self, + session: Session, + url_info: URLInfo + ): url = session.query(URL).filter_by(id=url_info.id).first() url.collector_metadata = url_info.collector_metadata diff --git a/src/db/dto_converter.py b/src/db/dto_converter.py index c9b7dc0c..5397c803 100644 --- a/src/db/dto_converter.py +++ b/src/db/dto_converter.py @@ -176,24 +176,6 @@ def final_review_annotation_agency_info( def url_list_to_url_with_html_list(url_list: list[URL]) -> list[URLWithHTML]: return [DTOConverter.url_to_url_with_html(url) for url in url_list] - @staticmethod - def url_list_to_url_info_list(urls: list[URL]) -> list[URLInfo]: - results = [] - for url in urls: - url_info = URLInfo( - id=url.id, - batch_id=url.batch_id, - url=url.url, - collector_metadata=url.collector_metadata, - outcome=url.outcome, - created_at=url.created_at, - updated_at=url.updated_at, - name=url.name - ) - results.append(url_info) - - return results - @staticmethod def url_to_url_with_html(url: URL) -> URLWithHTML: url_val = url.url diff --git a/src/db/models/instantiations/batch.py b/src/db/models/instantiations/batch.py index 55bc1b01..89645f4a 100644 --- a/src/db/models/instantiations/batch.py +++ b/src/db/models/instantiations/batch.py @@ -46,7 +46,11 @@ class Batch(StandardModel): parameters = Column(JSON) # Relationships - urls = relationship("URL", back_populates="batch") + urls = relationship( + "URL", + secondary="link_batch_urls", + back_populates="batch" + ) # missings = relationship("Missing", back_populates="batch") # Not in active use logs = relationship("Log", back_populates="batch") 
duplicates = relationship("Duplicate", back_populates="batch") diff --git a/src/db/models/instantiations/link/__init__.py b/src/db/models/instantiations/link/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/db/models/instantiations/link/link_batch_urls.py b/src/db/models/instantiations/link/link_batch_urls.py new file mode 100644 index 00000000..f357ae6a --- /dev/null +++ b/src/db/models/instantiations/link/link_batch_urls.py @@ -0,0 +1,17 @@ +from sqlalchemy.orm import relationship + +from src.db.models.mixins import CreatedAtMixin, UpdatedAtMixin, BatchDependentMixin, URLDependentMixin +from src.db.models.templates import StandardModel + + +class LinkBatchURL( + UpdatedAtMixin, + CreatedAtMixin, + URLDependentMixin, + BatchDependentMixin, + StandardModel +): + __tablename__ = "link_batch_urls" + + url = relationship('URL') + batch = relationship('Batch') \ No newline at end of file diff --git a/src/db/models/instantiations/link_task_url.py b/src/db/models/instantiations/link/link_task_url.py similarity index 100% rename from src/db/models/instantiations/link_task_url.py rename to src/db/models/instantiations/link/link_task_url.py diff --git a/src/db/models/instantiations/url/core.py b/src/db/models/instantiations/url/core.py index 72fdf628..8e9860fc 100644 --- a/src/db/models/instantiations/url/core.py +++ b/src/db/models/instantiations/url/core.py @@ -11,7 +11,6 @@ class URL(UpdatedAtMixin, CreatedAtMixin, StandardModel): __tablename__ = 'urls' # The batch this URL is associated with - batch_id = Column(Integer, ForeignKey('batches.id', name='fk_url_batch_id'), nullable=False) url = Column(Text, unique=True) name = Column(String) description = Column(Text) @@ -35,7 +34,12 @@ class URL(UpdatedAtMixin, CreatedAtMixin, StandardModel): record_type = Column(postgresql.ENUM(*record_type_values, name='record_type'), nullable=True) # Relationships - batch = relationship("Batch", back_populates="urls") + batch = relationship( + "Batch", + secondary="link_batch_urls", + back_populates="urls", + uselist=False + ) duplicates = relationship("Duplicate", back_populates="original_url") html_content = relationship("URLHTMLContent", back_populates="url", cascade="all, delete-orphan") error_info = relationship("URLErrorInfo", back_populates="url", cascade="all, delete-orphan") diff --git a/src/db/queries/implementations/core/get/recent_batch_summaries/url_counts/builder.py b/src/db/queries/implementations/core/get/recent_batch_summaries/url_counts/builder.py index 564c68f6..571db2a0 100644 --- a/src/db/queries/implementations/core/get/recent_batch_summaries/url_counts/builder.py +++ b/src/db/queries/implementations/core/get/recent_batch_summaries/url_counts/builder.py @@ -5,6 +5,7 @@ from src.collectors.enums import URLStatus, CollectorType from src.core.enums import BatchStatus +from src.db.models.instantiations.link.link_batch_urls import LinkBatchURL from src.db.models.instantiations.url.core import URL from src.db.models.instantiations.batch import Batch from src.db.queries.base.builder import QueryBuilderBase @@ -41,7 +42,10 @@ def get_core_query(self): self.count_case_url_status(URLStatus.NOT_RELEVANT, labels.not_relevant), self.count_case_url_status(URLStatus.ERROR, labels.error), self.count_case_url_status(URLStatus.DUPLICATE, labels.duplicate), - ).outerjoin( + ) + .select_from(Batch) + .outerjoin(LinkBatchURL) + .outerjoin( URL ) ) @@ -68,9 +72,9 @@ def apply_pending_urls_filter(self, query: Select): return query pending_url_subquery = ( exists( - Select(URL).where( + 
Select(URL).join(LinkBatchURL).where( and_( - URL.batch_id == Batch.id, + LinkBatchURL.batch_id == Batch.id, URL.outcome == URLStatus.PENDING.value ) ) diff --git a/src/db/statement_composer.py b/src/db/statement_composer.py index b712a581..9d5faa97 100644 --- a/src/db/statement_composer.py +++ b/src/db/statement_composer.py @@ -1,14 +1,15 @@ from typing import Any from sqlalchemy import Select, select, exists, func, Subquery, and_, not_, ColumnElement -from sqlalchemy.orm import aliased +from sqlalchemy.orm import aliased, selectinload from src.collectors.enums import URLStatus from src.core.enums import BatchStatus from src.db.constants import STANDARD_ROW_LIMIT from src.db.enums import TaskType from src.db.models.instantiations.confirmed_url_agency import ConfirmedURLAgency -from src.db.models.instantiations.link_task_url import LinkTaskURL +from src.db.models.instantiations.link.link_batch_urls import LinkBatchURL +from src.db.models.instantiations.link.link_task_url import LinkTaskURL from src.db.models.instantiations.task.core import Task from src.db.models.instantiations.url.html_content import URLHTMLContent from src.db.models.instantiations.url.optional_data_source_metadata import URLOptionalDataSourceMetadata @@ -39,6 +40,9 @@ def pending_urls_without_html_data() -> Select: where(URLHTMLContent.id == None). where(~exists(exclude_subquery)). where(URL.outcome == URLStatus.PENDING.value) + .options( + selectinload(URL.batch) + ) ) return query @@ -92,6 +96,8 @@ def pending_urls_missing_miscellaneous_metadata_query() -> Select: ) ).outerjoin( URLOptionalDataSourceMetadata + ).join( + LinkBatchURL ).join( Batch ) diff --git a/src/util/alembic_helpers.py b/src/util/alembic_helpers.py index c6b60002..3eb18773 100644 --- a/src/util/alembic_helpers.py +++ b/src/util/alembic_helpers.py @@ -54,7 +54,7 @@ def alter_enum_value( """ op.execute(f"ALTER TYPE {enum_name} RENAME VALUE '{old_value}' TO '{new_value}'") -def id_column(): +def id_column() -> sa.Column: """Returns a standard `id` column.""" return sa.Column( 'id', @@ -64,7 +64,7 @@ def id_column(): nullable=False ) -def created_at_column(): +def created_at_column() -> sa.Column: """Returns a standard `created_at` column.""" return sa.Column( 'created_at', @@ -73,7 +73,7 @@ def created_at_column(): nullable=False ) -def updated_at_column(): +def updated_at_column() -> sa.Column: """Returns a standard `updated_at` column.""" return sa.Column( 'updated_at', @@ -83,8 +83,8 @@ def updated_at_column(): nullable=False ) -def url_id_column(): - sa.Column( +def url_id_column() -> sa.Column: + return sa.Column( 'url_id', sa.Integer(), sa.ForeignKey( @@ -92,4 +92,15 @@ def url_id_column(): ondelete='CASCADE' ), nullable=False - ), \ No newline at end of file + ) + +def batch_id_column(nullable=False) -> sa.Column: + return sa.Column( + 'batch_id', + sa.Integer(), + sa.ForeignKey( + 'batches.id', + ondelete='CASCADE' + ), + nullable=nullable + ) \ No newline at end of file diff --git a/tests/automated/integration/api/_helpers/RequestValidator.py b/tests/automated/integration/api/_helpers/RequestValidator.py index e3b43f39..33c3120d 100644 --- a/tests/automated/integration/api/_helpers/RequestValidator.py +++ b/tests/automated/integration/api/_helpers/RequestValidator.py @@ -13,17 +13,17 @@ from src.api.endpoints.annotate.dtos.record_type.response import GetNextRecordTypeAnnotationResponseOuterInfo from src.api.endpoints.annotate.relevance.get.dto import GetNextRelevanceAnnotationResponseOuterInfo from src.api.endpoints.annotate.relevance.post.dto 
import RelevanceAnnotationPostInfo -from src.api.endpoints.batch.dtos.get.duplicates import GetDuplicatesByBatchResponse from src.api.endpoints.batch.dtos.get.logs import GetBatchLogsResponse from src.api.endpoints.batch.dtos.get.summaries.response import GetBatchSummariesResponse from src.api.endpoints.batch.dtos.get.summaries.summary import BatchSummary -from src.api.endpoints.batch.dtos.get.urls import GetURLsByBatchResponse from src.api.endpoints.batch.dtos.post.abort import MessageResponse +from src.api.endpoints.batch.duplicates.dto import GetDuplicatesByBatchResponse +from src.api.endpoints.batch.urls.dto import GetURLsByBatchResponse from src.api.endpoints.collector.dtos.manual_batch.post import ManualBatchInputDTO from src.api.endpoints.collector.dtos.manual_batch.response import ManualBatchResponseDTO +from src.api.endpoints.metrics.batches.aggregated.dto import GetMetricsBatchesAggregatedResponseDTO +from src.api.endpoints.metrics.batches.breakdown.dto import GetMetricsBatchesBreakdownResponseDTO from src.api.endpoints.metrics.dtos.get.backlog import GetMetricsBacklogResponseDTO -from src.api.endpoints.metrics.dtos.get.batches.aggregated import GetMetricsBatchesAggregatedResponseDTO -from src.api.endpoints.metrics.dtos.get.batches.breakdown import GetMetricsBatchesBreakdownResponseDTO from src.api.endpoints.metrics.dtos.get.urls.aggregated.core import GetMetricsURLsAggregatedResponseDTO from src.api.endpoints.metrics.dtos.get.urls.aggregated.pending import GetMetricsURLsAggregatedPendingResponseDTO from src.api.endpoints.metrics.dtos.get.urls.breakdown.pending import GetMetricsURLsBreakdownPendingResponseDTO @@ -32,10 +32,10 @@ from src.api.endpoints.review.next.dto import GetNextURLForFinalReviewOuterResponse from src.api.endpoints.review.reject.dto import FinalReviewRejectionInfo from src.api.endpoints.search.dtos.response import SearchURLResponse +from src.api.endpoints.task.by_id.dto import TaskInfo from src.api.endpoints.task.dtos.get.tasks import GetTasksResponse -from src.api.endpoints.url.dtos.response import GetURLsResponseInfo from src.api.endpoints.task.dtos.get.task_status import GetTaskStatusResponseInfo -from src.api.endpoints.task.dtos.get.task import TaskInfo +from src.api.endpoints.url.get.dto import GetURLsResponseInfo from src.db.enums import TaskType from src.collectors.source_collectors.example.dtos.input import ExampleInputDTO from src.collectors.enums import CollectorType diff --git a/tests/automated/integration/api/test_duplicates.py b/tests/automated/integration/api/test_duplicates.py deleted file mode 100644 index 0534292c..00000000 --- a/tests/automated/integration/api/test_duplicates.py +++ /dev/null @@ -1,63 +0,0 @@ -import pytest - -from src.api.endpoints.batch.dtos.get.summaries.summary import BatchSummary -from src.db.dtos.batch import BatchInfo -from src.collectors.source_collectors.example.dtos.input import ExampleInputDTO -from tests.automated.integration.api.conftest import disable_task_trigger - - -@pytest.mark.asyncio -async def test_duplicates(api_test_helper): - ath = api_test_helper - - # Temporarily disable task trigger - disable_task_trigger(ath) - - dto = ExampleInputDTO( - sleep_time=0 - ) - - batch_id_1 = ath.request_validator.example_collector( - dto=dto - )["batch_id"] - - assert batch_id_1 is not None - - batch_id_2 = ath.request_validator.example_collector( - dto=dto - )["batch_id"] - - assert batch_id_2 is not None - - await ath.wait_for_all_batches_to_complete() - - - bi_1: BatchSummary = 
ath.request_validator.get_batch_info(batch_id_1) - bi_2: BatchSummary = ath.request_validator.get_batch_info(batch_id_2) - - bi_1.url_counts.total = 2 - bi_2.url_counts.total = 0 - bi_1.url_counts.duplicate = 0 - bi_2.url_counts.duplicate = 2 - - url_info_1 = ath.request_validator.get_batch_urls(batch_id_1) - url_info_2 = ath.request_validator.get_batch_urls(batch_id_2) - - assert len(url_info_1.urls) == 2 - assert len(url_info_2.urls) == 0 - - dup_info_1 = ath.request_validator.get_batch_url_duplicates(batch_id_1) - dup_info_2 = ath.request_validator.get_batch_url_duplicates(batch_id_2) - - assert len(dup_info_1.duplicates) == 0 - assert len(dup_info_2.duplicates) == 2 - - assert dup_info_2.duplicates[0].original_url_id == url_info_1.urls[0].id - assert dup_info_2.duplicates[1].original_url_id == url_info_1.urls[1].id - - assert dup_info_2.duplicates[0].original_batch_id == batch_id_1 - assert dup_info_2.duplicates[1].original_batch_id == batch_id_1 - - - - diff --git a/tests/automated/integration/api/test_example_collector.py b/tests/automated/integration/api/test_example_collector.py index d1d537e2..1e20362d 100644 --- a/tests/automated/integration/api/test_example_collector.py +++ b/tests/automated/integration/api/test_example_collector.py @@ -77,7 +77,6 @@ async def test_example_collector(api_test_helper, monkeypatch): bi: BatchSummary = ath.request_validator.get_batch_info(batch_id=batch_id) assert bi.status == BatchStatus.READY_TO_LABEL - assert bi.url_counts.total == 2 assert bi.parameters == dto.model_dump() assert bi.strategy == CollectorType.EXAMPLE.value assert bi.user_id is not None diff --git a/tests/automated/integration/api/test_manual_batch.py b/tests/automated/integration/api/test_manual_batch.py index 1c186d7a..a7be37e4 100644 --- a/tests/automated/integration/api/test_manual_batch.py +++ b/tests/automated/integration/api/test_manual_batch.py @@ -2,6 +2,7 @@ import pytest from src.api.endpoints.collector.dtos.manual_batch.post import ManualBatchInnerInputDTO, ManualBatchInputDTO +from src.db.models.instantiations.link.link_batch_urls import LinkBatchURL from src.db.models.instantiations.url.optional_data_source_metadata import URLOptionalDataSourceMetadata from src.db.models.instantiations.url.core import URL from src.db.models.instantiations.batch import Batch @@ -69,6 +70,7 @@ async def test_manual_batch(api_test_helper): # Get URLs from database urls: list[URL] = await adb_client.get_all(URL) + links: list[LinkBatchURL] = await adb_client.get_all(LinkBatchURL) # Confirm 100 URLs assert len(urls) == 100 @@ -87,8 +89,10 @@ def check_attributes( return False return True + def check_link(link: LinkBatchURL): + assert link.batch_id == batch.id + def check_url(url: URL, url_only: bool): - assert url.batch_id == batch.id assert url.url is not None other_attributes = ["name", "description", "collector_metadata", "record_type"] return check_attributes(url, other_attributes, url_only) @@ -99,6 +103,8 @@ def check_url(url: URL, url_only: bool): for url in urls: if check_url(url, True): count_only_name += 1 + for link in links: + check_link(link) assert count_only_name == 50 # Confirm 50 have all optional fields count_all = 0 diff --git a/tests/automated/integration/api/test_url.py b/tests/automated/integration/api/test_url.py index 95c5f99f..e59c8299 100644 --- a/tests/automated/integration/api/test_url.py +++ b/tests/automated/integration/api/test_url.py @@ -1,6 +1,6 @@ import pytest -from src.api.endpoints.url.dtos.response import GetURLsResponseInfo +from 
src.api.endpoints.url.get.dto import GetURLsResponseInfo from src.db.dtos.url.insert import InsertURLsInfo diff --git a/tests/automated/integration/security_manager/test_security_manager.py b/tests/automated/integration/security_manager/test_security_manager.py index 9c759d21..14120493 100644 --- a/tests/automated/integration/security_manager/test_security_manager.py +++ b/tests/automated/integration/security_manager/test_security_manager.py @@ -1,12 +1,10 @@ import jwt -import pytest from starlette.testclient import TestClient from src.api.main import app from src.security.constants import ALGORITHM from src.security.enums import Permissions - SECRET_KEY = "test_secret_key" VALID_TOKEN = "valid_token" INVALID_TOKEN = "invalid_token" diff --git a/tests/automated/integration/tasks/asserts.py b/tests/automated/integration/tasks/asserts.py index 6e4e0d7e..224e56a1 100644 --- a/tests/automated/integration/tasks/asserts.py +++ b/tests/automated/integration/tasks/asserts.py @@ -1,3 +1,4 @@ +from src.core.tasks.base.run_info import TaskOperatorRunInfo from src.core.tasks.url.enums import TaskOperatorOutcome @@ -10,6 +11,6 @@ async def assert_prereqs_met(operator): assert meets_prereqs -def assert_task_has_expected_run_info(run_info, url_ids: list[int]): - assert run_info.outcome == TaskOperatorOutcome.SUCCESS +def assert_task_has_expected_run_info(run_info: TaskOperatorRunInfo, url_ids: list[int]): + assert run_info.outcome == TaskOperatorOutcome.SUCCESS, run_info.message assert run_info.linked_url_ids == url_ids From ffe914f7df61e1fc183b4799cfb4afb541085049 Mon Sep 17 00:00:00 2001 From: Max Chis Date: Thu, 17 Jul 2025 15:48:34 -0400 Subject: [PATCH 2/3] Extract logic, break up functions into separate domain directories and files --- .../api/review/rejection/helpers.py | 2 +- .../test_approve_and_get_next_source.py | 2 +- .../api/review/test_next_source.py | 2 +- .../integration/api/test_annotate.py | 5 +- .../annotate_url/test_agency_not_in_db.py | 2 +- .../annotate_url/test_marked_not_relevant.py | 2 +- .../db/client/approve_url/test_basic.py | 2 +- .../db/client/approve_url/test_error.py | 2 +- .../test_basic.py | 2 +- .../test_batch_id_filtering.py | 2 +- .../test_favor_more_components.py | 2 +- .../test_pending.py | 2 +- .../test_validated.py | 2 +- ...next_url_for_annotation_batch_filtering.py | 2 +- ...get_next_url_for_user_agency_annotation.py | 2 +- ...ext_url_for_user_record_type_annotation.py | 2 +- .../integration/db/test_database_structure.py | 2 +- .../html_tag_collector/test_root_url_cache.py | 2 +- .../scheduled/agency_sync/test_happy_path.py | 2 +- .../agency_sync/test_no_new_results.py | 2 +- tests/conftest.py | 20 ++----- .../{assert_functions.py => asserts.py} | 0 tests/helpers/setup/__init__.py | 0 .../helpers/setup/annotate_agency/__init__.py | 0 tests/helpers/setup/annotate_agency/core.py | 22 +++++++ tests/helpers/setup/annotate_agency/model.py | 6 ++ tests/helpers/setup/annotation/__init__.py | 0 tests/helpers/setup/annotation/core.py | 22 +++++++ tests/helpers/setup/annotation/model.py | 8 +++ tests/helpers/setup/final_review/__init__.py | 0 .../final_review/core.py} | 58 +------------------ tests/helpers/setup/final_review/model.py | 11 ++++ tests/helpers/setup/wipe.py | 12 ++++ .../test_muckrock_api_interface.py | 2 +- 34 files changed, 112 insertions(+), 92 deletions(-) rename tests/helpers/{assert_functions.py => asserts.py} (100%) create mode 100644 tests/helpers/setup/__init__.py create mode 100644 tests/helpers/setup/annotate_agency/__init__.py create mode 100644 
tests/helpers/setup/annotate_agency/core.py create mode 100644 tests/helpers/setup/annotate_agency/model.py create mode 100644 tests/helpers/setup/annotation/__init__.py create mode 100644 tests/helpers/setup/annotation/core.py create mode 100644 tests/helpers/setup/annotation/model.py create mode 100644 tests/helpers/setup/final_review/__init__.py rename tests/helpers/{complex_test_data_functions.py => setup/final_review/core.py} (57%) create mode 100644 tests/helpers/setup/final_review/model.py create mode 100644 tests/helpers/setup/wipe.py diff --git a/tests/automated/integration/api/review/rejection/helpers.py b/tests/automated/integration/api/review/rejection/helpers.py index 2a0a248b..8fb26603 100644 --- a/tests/automated/integration/api/review/rejection/helpers.py +++ b/tests/automated/integration/api/review/rejection/helpers.py @@ -3,7 +3,7 @@ from src.api.endpoints.review.reject.dto import FinalReviewRejectionInfo from src.collectors.enums import URLStatus from src.db.models.instantiations.url.core import URL -from tests.helpers.complex_test_data_functions import setup_for_get_next_url_for_final_review +from tests.helpers.setup.final_review.core import setup_for_get_next_url_for_final_review async def run_rejection_test( diff --git a/tests/automated/integration/api/review/test_approve_and_get_next_source.py b/tests/automated/integration/api/review/test_approve_and_get_next_source.py index 89279e38..9afc16d8 100644 --- a/tests/automated/integration/api/review/test_approve_and_get_next_source.py +++ b/tests/automated/integration/api/review/test_approve_and_get_next_source.py @@ -9,7 +9,7 @@ from src.db.models.instantiations.confirmed_url_agency import ConfirmedURLAgency from src.db.models.instantiations.url.core import URL from src.db.models.instantiations.url.optional_data_source_metadata import URLOptionalDataSourceMetadata -from tests.helpers.complex_test_data_functions import setup_for_get_next_url_for_final_review +from tests.helpers.setup.final_review.core import setup_for_get_next_url_for_final_review @pytest.mark.asyncio diff --git a/tests/automated/integration/api/review/test_next_source.py b/tests/automated/integration/api/review/test_next_source.py index 19910e38..790914ee 100644 --- a/tests/automated/integration/api/review/test_next_source.py +++ b/tests/automated/integration/api/review/test_next_source.py @@ -1,7 +1,7 @@ import pytest from src.core.enums import SuggestedStatus, RecordType -from tests.helpers.complex_test_data_functions import setup_for_get_next_url_for_final_review +from tests.helpers.setup.final_review.core import setup_for_get_next_url_for_final_review @pytest.mark.asyncio diff --git a/tests/automated/integration/api/test_annotate.py b/tests/automated/integration/api/test_annotate.py index a81ed27a..b0039212 100644 --- a/tests/automated/integration/api/test_annotate.py +++ b/tests/automated/integration/api/test_annotate.py @@ -18,8 +18,9 @@ from src.core.exceptions import FailedValidationException from src.db.models.instantiations.url.suggestion.record_type.user import UserRecordTypeSuggestion from src.db.models.instantiations.url.suggestion.relevant.user import UserRelevantSuggestion -from tests.helpers.complex_test_data_functions import AnnotateAgencySetupInfo, setup_for_annotate_agency, \ - setup_for_get_next_url_for_final_review +from tests.helpers.setup.annotate_agency.model import AnnotateAgencySetupInfo +from tests.helpers.setup.final_review.core import setup_for_get_next_url_for_final_review +from tests.helpers.setup.annotate_agency.core 
import setup_for_annotate_agency from tests.helpers.db_data_creator import BatchURLCreationInfo from tests.automated.integration.api.conftest import MOCK_USER_ID diff --git a/tests/automated/integration/db/client/annotate_url/test_agency_not_in_db.py b/tests/automated/integration/db/client/annotate_url/test_agency_not_in_db.py index 4949b092..33a93998 100644 --- a/tests/automated/integration/db/client/annotate_url/test_agency_not_in_db.py +++ b/tests/automated/integration/db/client/annotate_url/test_agency_not_in_db.py @@ -2,7 +2,7 @@ from src.db.constants import PLACEHOLDER_AGENCY_NAME from src.db.models.instantiations.agency import Agency -from tests.helpers.complex_test_data_functions import setup_for_annotate_agency +from tests.helpers.setup.annotate_agency.core import setup_for_annotate_agency from tests.helpers.db_data_creator import DBDataCreator diff --git a/tests/automated/integration/db/client/annotate_url/test_marked_not_relevant.py b/tests/automated/integration/db/client/annotate_url/test_marked_not_relevant.py index de2794ec..ccf76dc8 100644 --- a/tests/automated/integration/db/client/annotate_url/test_marked_not_relevant.py +++ b/tests/automated/integration/db/client/annotate_url/test_marked_not_relevant.py @@ -2,7 +2,7 @@ from src.core.enums import SuggestedStatus from src.db.dtos.url.mapping import URLMapping -from tests.helpers.complex_test_data_functions import setup_for_get_next_url_for_annotation +from tests.helpers.setup.annotation.core import setup_for_get_next_url_for_annotation from tests.helpers.db_data_creator import DBDataCreator diff --git a/tests/automated/integration/db/client/approve_url/test_basic.py b/tests/automated/integration/db/client/approve_url/test_basic.py index 426719d7..590f9cd1 100644 --- a/tests/automated/integration/db/client/approve_url/test_basic.py +++ b/tests/automated/integration/db/client/approve_url/test_basic.py @@ -7,7 +7,7 @@ from src.db.models.instantiations.url.core import URL from src.db.models.instantiations.url.optional_data_source_metadata import URLOptionalDataSourceMetadata from src.db.models.instantiations.url.reviewing_user import ReviewingUserURL -from tests.helpers.complex_test_data_functions import setup_for_get_next_url_for_final_review +from tests.helpers.setup.final_review.core import setup_for_get_next_url_for_final_review from tests.helpers.db_data_creator import DBDataCreator diff --git a/tests/automated/integration/db/client/approve_url/test_error.py b/tests/automated/integration/db/client/approve_url/test_error.py index 9b93da9e..52871e76 100644 --- a/tests/automated/integration/db/client/approve_url/test_error.py +++ b/tests/automated/integration/db/client/approve_url/test_error.py @@ -3,7 +3,7 @@ from src.api.endpoints.review.approve.dto import FinalReviewApprovalInfo from src.core.enums import RecordType -from tests.helpers.complex_test_data_functions import setup_for_get_next_url_for_final_review +from tests.helpers.setup.final_review.core import setup_for_get_next_url_for_final_review from tests.helpers.db_data_creator import DBDataCreator diff --git a/tests/automated/integration/db/client/get_next_url_for_final_review/test_basic.py b/tests/automated/integration/db/client/get_next_url_for_final_review/test_basic.py index a96b6ca4..adb48844 100644 --- a/tests/automated/integration/db/client/get_next_url_for_final_review/test_basic.py +++ b/tests/automated/integration/db/client/get_next_url_for_final_review/test_basic.py @@ -1,7 +1,7 @@ import pytest from src.core.enums import SuggestedStatus, RecordType -from 
tests.helpers.complex_test_data_functions import setup_for_get_next_url_for_final_review +from tests.helpers.setup.final_review.core import setup_for_get_next_url_for_final_review from tests.helpers.db_data_creator import DBDataCreator diff --git a/tests/automated/integration/db/client/get_next_url_for_final_review/test_batch_id_filtering.py b/tests/automated/integration/db/client/get_next_url_for_final_review/test_batch_id_filtering.py index bf7017e3..bce7d8e2 100644 --- a/tests/automated/integration/db/client/get_next_url_for_final_review/test_batch_id_filtering.py +++ b/tests/automated/integration/db/client/get_next_url_for_final_review/test_batch_id_filtering.py @@ -1,6 +1,6 @@ import pytest -from tests.helpers.complex_test_data_functions import setup_for_get_next_url_for_final_review +from tests.helpers.setup.final_review.core import setup_for_get_next_url_for_final_review from tests.helpers.db_data_creator import DBDataCreator diff --git a/tests/automated/integration/db/client/get_next_url_for_final_review/test_favor_more_components.py b/tests/automated/integration/db/client/get_next_url_for_final_review/test_favor_more_components.py index 103b1d03..874dba18 100644 --- a/tests/automated/integration/db/client/get_next_url_for_final_review/test_favor_more_components.py +++ b/tests/automated/integration/db/client/get_next_url_for_final_review/test_favor_more_components.py @@ -1,7 +1,7 @@ import pytest from src.core.enums import SuggestionType -from tests.helpers.complex_test_data_functions import setup_for_get_next_url_for_final_review +from tests.helpers.setup.final_review.core import setup_for_get_next_url_for_final_review from tests.helpers.db_data_creator import DBDataCreator diff --git a/tests/automated/integration/db/client/get_next_url_for_user_relevance_annotation/test_pending.py b/tests/automated/integration/db/client/get_next_url_for_user_relevance_annotation/test_pending.py index 4aaf58ba..57c6ae35 100644 --- a/tests/automated/integration/db/client/get_next_url_for_user_relevance_annotation/test_pending.py +++ b/tests/automated/integration/db/client/get_next_url_for_user_relevance_annotation/test_pending.py @@ -1,7 +1,7 @@ import pytest from src.core.enums import SuggestedStatus -from tests.helpers.complex_test_data_functions import setup_for_get_next_url_for_annotation +from tests.helpers.setup.annotation.core import setup_for_get_next_url_for_annotation from tests.helpers.db_data_creator import DBDataCreator diff --git a/tests/automated/integration/db/client/get_next_url_for_user_relevance_annotation/test_validated.py b/tests/automated/integration/db/client/get_next_url_for_user_relevance_annotation/test_validated.py index ae806775..3736c2b8 100644 --- a/tests/automated/integration/db/client/get_next_url_for_user_relevance_annotation/test_validated.py +++ b/tests/automated/integration/db/client/get_next_url_for_user_relevance_annotation/test_validated.py @@ -1,7 +1,7 @@ import pytest from src.collectors.enums import URLStatus -from tests.helpers.complex_test_data_functions import setup_for_get_next_url_for_annotation +from tests.helpers.setup.annotation.core import setup_for_get_next_url_for_annotation from tests.helpers.db_data_creator import DBDataCreator diff --git a/tests/automated/integration/db/client/test_get_next_url_for_annotation_batch_filtering.py b/tests/automated/integration/db/client/test_get_next_url_for_annotation_batch_filtering.py index 1a348aa4..5a402727 100644 --- a/tests/automated/integration/db/client/test_get_next_url_for_annotation_batch_filtering.py 
+++ b/tests/automated/integration/db/client/test_get_next_url_for_annotation_batch_filtering.py @@ -1,7 +1,7 @@ import pytest from src.core.enums import SuggestionType -from tests.helpers.complex_test_data_functions import setup_for_get_next_url_for_annotation +from tests.helpers.setup.annotation.core import setup_for_get_next_url_for_annotation from tests.helpers.db_data_creator import DBDataCreator diff --git a/tests/automated/integration/db/client/test_get_next_url_for_user_agency_annotation.py b/tests/automated/integration/db/client/test_get_next_url_for_user_agency_annotation.py index 6e548b6b..8f03286c 100644 --- a/tests/automated/integration/db/client/test_get_next_url_for_user_agency_annotation.py +++ b/tests/automated/integration/db/client/test_get_next_url_for_user_agency_annotation.py @@ -1,6 +1,6 @@ import pytest -from tests.helpers.complex_test_data_functions import setup_for_annotate_agency +from tests.helpers.setup.annotate_agency.core import setup_for_annotate_agency from tests.helpers.db_data_creator import DBDataCreator diff --git a/tests/automated/integration/db/client/test_get_next_url_for_user_record_type_annotation.py b/tests/automated/integration/db/client/test_get_next_url_for_user_record_type_annotation.py index 96ee16f3..292ab33f 100644 --- a/tests/automated/integration/db/client/test_get_next_url_for_user_record_type_annotation.py +++ b/tests/automated/integration/db/client/test_get_next_url_for_user_record_type_annotation.py @@ -1,7 +1,7 @@ import pytest from src.core.enums import RecordType -from tests.helpers.complex_test_data_functions import setup_for_get_next_url_for_annotation +from tests.helpers.setup.annotation.core import setup_for_get_next_url_for_annotation from tests.helpers.db_data_creator import DBDataCreator diff --git a/tests/automated/integration/db/test_database_structure.py b/tests/automated/integration/db/test_database_structure.py index 80362aaf..7b34cebb 100644 --- a/tests/automated/integration/db/test_database_structure.py +++ b/tests/automated/integration/db/test_database_structure.py @@ -130,7 +130,7 @@ def run_column_tests(self): self.run_column_test(column) -def test_batch(wipe_database): +def test_batch(wiped_database): engine = create_engine(get_postgres_connection_string()) table_tester = TableTester( table_name="batches", diff --git a/tests/automated/integration/html_tag_collector/test_root_url_cache.py b/tests/automated/integration/html_tag_collector/test_root_url_cache.py index 16ab5941..151985cf 100644 --- a/tests/automated/integration/html_tag_collector/test_root_url_cache.py +++ b/tests/automated/integration/html_tag_collector/test_root_url_cache.py @@ -8,7 +8,7 @@ async def mock_get_request(url: str) -> RootURLCacheResponseInfo: return RootURLCacheResponseInfo(text="Test Title") @pytest.mark.asyncio -async def test_root_url_cache_happy_path(wipe_database): +async def test_root_url_cache_happy_path(wiped_database): cache = RootURLCache() cache.get_request = mock_get_request title = await cache.get_title("https://example.com") diff --git a/tests/automated/integration/tasks/scheduled/agency_sync/test_happy_path.py b/tests/automated/integration/tasks/scheduled/agency_sync/test_happy_path.py index 903008b4..863acf5c 100644 --- a/tests/automated/integration/tasks/scheduled/agency_sync/test_happy_path.py +++ b/tests/automated/integration/tasks/scheduled/agency_sync/test_happy_path.py @@ -9,7 +9,7 @@ from tests.automated.integration.tasks.scheduled.agency_sync.data import AGENCIES_SYNC_RESPONSES from 
tests.automated.integration.tasks.scheduled.agency_sync.existence_checker import AgencyChecker
 from tests.automated.integration.tasks.scheduled.agency_sync.helpers import check_sync_concluded, patch_sync_agencies
-from tests.helpers.assert_functions import assert_task_run_success
+from tests.helpers.asserts import assert_task_run_success
 @pytest.mark.asyncio
diff --git a/tests/automated/integration/tasks/scheduled/agency_sync/test_no_new_results.py b/tests/automated/integration/tasks/scheduled/agency_sync/test_no_new_results.py
index 9d01555e..fcc353ef 100644
--- a/tests/automated/integration/tasks/scheduled/agency_sync/test_no_new_results.py
+++ b/tests/automated/integration/tasks/scheduled/agency_sync/test_no_new_results.py
@@ -11,7 +11,7 @@
 from tests.automated.integration.tasks.scheduled.agency_sync.data import THIRD_CALL_RESPONSE
 from tests.automated.integration.tasks.scheduled.agency_sync.existence_checker import AgencyChecker
 from tests.automated.integration.tasks.scheduled.agency_sync.helpers import patch_sync_agencies, check_sync_concluded
-from tests.helpers.assert_functions import assert_task_run_success
+from tests.helpers.asserts import assert_task_run_success
 @pytest.mark.asyncio
diff --git a/tests/conftest.py b/tests/conftest.py
index a33014d1..97d92c2d 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -14,6 +14,7 @@
 from src.util.helper_functions import load_from_environment
 from tests.helpers.alembic_runner import AlembicRunner
 from tests.helpers.db_data_creator import DBDataCreator
+from tests.helpers.setup.wipe import wipe_database
 @pytest.fixture(autouse=True, scope="session")
@@ -87,22 +88,13 @@ def setup_and_teardown():
     engine.dispose()
 @pytest.fixture
-def wipe_database():
-    """
-    Wipe all data from database
-    Returns:
-
-    """
-    conn = get_postgres_connection_string()
-    engine = create_engine(conn)
-    with engine.connect() as connection:
-        for table in reversed(Base.metadata.sorted_tables):
-            connection.execute(table.delete())
-        connection.commit()
+def wiped_database():
+    """Wipe all data from database."""
+    wipe_database(get_postgres_connection_string())
 @pytest.fixture
-def db_client_test(wipe_database) -> Generator[DatabaseClient, Any, None]:
+def db_client_test(wiped_database) -> Generator[DatabaseClient, Any, None]:
     # Drop pre-existing table
     conn = get_postgres_connection_string()
     db_client = DatabaseClient(db_url=conn)
@@ -110,7 +102,7 @@ def db_client_test(wipe_database) -> Generator[DatabaseClient, Any, None]:
     db_client.engine.dispose()
 @pytest.fixture
-def adb_client_test(wipe_database) -> Generator[AsyncDatabaseClient, Any, None]:
+def adb_client_test(wiped_database) -> Generator[AsyncDatabaseClient, Any, None]:
     conn = get_postgres_connection_string(is_async=True)
     adb_client = AsyncDatabaseClient(db_url=conn)
     yield adb_client
diff --git a/tests/helpers/assert_functions.py b/tests/helpers/asserts.py
similarity index 100%
rename from tests/helpers/assert_functions.py
rename to tests/helpers/asserts.py
diff --git a/tests/helpers/setup/__init__.py b/tests/helpers/setup/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/helpers/setup/annotate_agency/__init__.py b/tests/helpers/setup/annotate_agency/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/helpers/setup/annotate_agency/core.py b/tests/helpers/setup/annotate_agency/core.py
new file mode 100644
index 00000000..fbd7bc53
--- /dev/null
+++ b/tests/helpers/setup/annotate_agency/core.py
@@ -0,0 +1,22 @@
+from src.core.enums import SuggestionType
+from tests.helpers.db_data_creator import DBDataCreator, BatchURLCreationInfo
+from tests.helpers.setup.annotate_agency.model import AnnotateAgencySetupInfo
+
+
+async def setup_for_annotate_agency(
+    db_data_creator: DBDataCreator,
+    url_count: int,
+    suggestion_type: SuggestionType = SuggestionType.UNKNOWN,
+    with_html_content: bool = True
+):
+    buci: BatchURLCreationInfo = await db_data_creator.batch_and_urls(
+        url_count=url_count,
+        with_html_content=with_html_content
+    )
+    await db_data_creator.auto_suggestions(
+        url_ids=buci.url_ids,
+        num_suggestions=1,
+        suggestion_type=suggestion_type
+    )
+
+    return AnnotateAgencySetupInfo(batch_id=buci.batch_id, url_ids=buci.url_ids)
diff --git a/tests/helpers/setup/annotate_agency/model.py b/tests/helpers/setup/annotate_agency/model.py
new file mode 100644
index 00000000..467d208d
--- /dev/null
+++ b/tests/helpers/setup/annotate_agency/model.py
@@ -0,0 +1,6 @@
+from pydantic import BaseModel
+
+
+class AnnotateAgencySetupInfo(BaseModel):
+    batch_id: int
+    url_ids: list[int]
diff --git a/tests/helpers/setup/annotation/__init__.py b/tests/helpers/setup/annotation/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/helpers/setup/annotation/core.py b/tests/helpers/setup/annotation/core.py
new file mode 100644
index 00000000..d8d3bb0c
--- /dev/null
+++ b/tests/helpers/setup/annotation/core.py
@@ -0,0 +1,22 @@
+from src.collectors.enums import URLStatus
+from tests.helpers.db_data_creator import DBDataCreator
+from tests.helpers.setup.annotation.model import AnnotationSetupInfo
+
+
+async def setup_for_get_next_url_for_annotation(
+    db_data_creator: DBDataCreator,
+    url_count: int,
+    outcome: URLStatus = URLStatus.PENDING
+) -> AnnotationSetupInfo:
+    batch_id = db_data_creator.batch()
+    insert_urls_info = db_data_creator.urls(
+        batch_id=batch_id,
+        url_count=url_count,
+        outcome=outcome
+    )
+    await db_data_creator.html_data(
+        [
+            url.url_id for url in insert_urls_info.url_mappings
+        ]
+    )
+    return AnnotationSetupInfo(batch_id=batch_id, insert_urls_info=insert_urls_info)
diff --git a/tests/helpers/setup/annotation/model.py b/tests/helpers/setup/annotation/model.py
new file mode 100644
index 00000000..fc1627ba
--- /dev/null
+++ b/tests/helpers/setup/annotation/model.py
@@ -0,0 +1,8 @@
+from pydantic import BaseModel
+
+from src.db.dtos.url.insert import InsertURLsInfo
+
+
+class AnnotationSetupInfo(BaseModel):
+    batch_id: int
+    insert_urls_info: InsertURLsInfo
diff --git a/tests/helpers/setup/final_review/__init__.py b/tests/helpers/setup/final_review/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/helpers/complex_test_data_functions.py b/tests/helpers/setup/final_review/core.py
similarity index 57%
rename from tests/helpers/complex_test_data_functions.py
rename to tests/helpers/setup/final_review/core.py
index fd723ccc..87c4da59 100644
--- a/tests/helpers/complex_test_data_functions.py
+++ b/tests/helpers/setup/final_review/core.py
@@ -1,63 +1,10 @@
 from typing import Optional
-from pydantic import BaseModel
-
 from src.api.endpoints.annotate.agency.post.dto import URLAgencyAnnotationPostInfo
-from src.db.dtos.url.insert import InsertURLsInfo
-from src.db.dtos.url.mapping import URLMapping
-from src.collectors.enums import URLStatus
-from src.core.enums import RecordType, SuggestionType
-from tests.helpers.db_data_creator import BatchURLCreationInfo
+from src.core.enums import RecordType
 from tests.helpers.db_data_creator import DBDataCreator
+from tests.helpers.setup.final_review.model import FinalReviewSetupInfo
-class AnnotationSetupInfo(BaseModel):
-    batch_id: int
-    insert_urls_info: InsertURLsInfo
-
-async def setup_for_get_next_url_for_annotation(
-    db_data_creator: DBDataCreator,
-    url_count: int,
-    outcome: URLStatus = URLStatus.PENDING
-) -> AnnotationSetupInfo:
-    batch_id = db_data_creator.batch()
-    insert_urls_info = db_data_creator.urls(
-        batch_id=batch_id,
-        url_count=url_count,
-        outcome=outcome
-    )
-    await db_data_creator.html_data(
-        [
-            url.url_id for url in insert_urls_info.url_mappings
-        ]
-    )
-    return AnnotationSetupInfo(batch_id=batch_id, insert_urls_info=insert_urls_info)
-class AnnotateAgencySetupInfo(BaseModel):
-    batch_id: int
-    url_ids: list[int]
-async def setup_for_annotate_agency(
-    db_data_creator: DBDataCreator,
-    url_count: int,
-    suggestion_type: SuggestionType = SuggestionType.UNKNOWN,
-    with_html_content: bool = True
-):
-    buci: BatchURLCreationInfo = await db_data_creator.batch_and_urls(
-        url_count=url_count,
-        with_html_content=with_html_content
-    )
-    await db_data_creator.auto_suggestions(
-        url_ids=buci.url_ids,
-        num_suggestions=1,
-        suggestion_type=suggestion_type
-    )
-
-    return AnnotateAgencySetupInfo(batch_id=buci.batch_id, url_ids=buci.url_ids)
-class FinalReviewSetupInfo(BaseModel):
-    batch_id: int
-    url_mapping: URLMapping
-    user_agency_id: Optional[int]
 async def setup_for_get_next_url_for_final_review(
     db_data_creator: DBDataCreator,
@@ -124,4 +71,3 @@ async def add_relevant_suggestion(relevant: bool):
         url_mapping=url_mapping,
         user_agency_id=user_agency_id
     )
-
diff --git a/tests/helpers/setup/final_review/model.py b/tests/helpers/setup/final_review/model.py
new file mode 100644
index 00000000..c75fb847
--- /dev/null
+++ b/tests/helpers/setup/final_review/model.py
@@ -0,0 +1,11 @@
+from typing import Optional
+
+from pydantic import BaseModel
+
+from src.db.dtos.url.mapping import URLMapping
+
+
+class FinalReviewSetupInfo(BaseModel):
+    batch_id: int
+    url_mapping: URLMapping
+    user_agency_id: Optional[int]
diff --git a/tests/helpers/setup/wipe.py b/tests/helpers/setup/wipe.py
new file mode 100644
index 00000000..2145bcf1
--- /dev/null
+++ b/tests/helpers/setup/wipe.py
@@ -0,0 +1,12 @@
+from sqlalchemy import create_engine
+
+from src.db.models.templates import Base
+
+
+def wipe_database(connection_string: str) -> None:
+    """Wipe all data from database."""
+    engine = create_engine(connection_string)
+    with engine.connect() as connection:
+        for table in reversed(Base.metadata.sorted_tables):
+            connection.execute(table.delete())
+        connection.commit()
diff --git a/tests/manual/agency_identifier/test_muckrock_api_interface.py b/tests/manual/agency_identifier/test_muckrock_api_interface.py
index d00a6aa2..1b809718 100644
--- a/tests/manual/agency_identifier/test_muckrock_api_interface.py
+++ b/tests/manual/agency_identifier/test_muckrock_api_interface.py
@@ -1,7 +1,7 @@
 import pytest
 from aiohttp import ClientSession
-from src.collectors.source_collectors import MuckrockAPIInterface
+from src.collectors.source_collectors.muckrock.api_interface.core import MuckrockAPIInterface
 @pytest.mark.asyncio

From 927cc73895d5bbe9a922220ab62dc814363635c6 Mon Sep 17 00:00:00 2001
From: Max Chis
Date: Thu, 17 Jul 2025 16:11:45 -0400
Subject: [PATCH 3/3] Fix bug in `get_urls` logic

---
 src/api/endpoints/url/get/dto.py |  2 +-
 tests/conftest.py                | 19 +++++++++++++++----
 tests/helpers/setup/populate.py  | 18 ++++++++++++++++++
 3 files changed, 34 insertions(+), 5 deletions(-)
 create mode 100644 tests/helpers/setup/populate.py

diff --git a/src/api/endpoints/url/get/dto.py b/src/api/endpoints/url/get/dto.py
index 5a9eb6fa..3b3e980e 100644
--- a/src/api/endpoints/url/get/dto.py
+++ b/src/api/endpoints/url/get/dto.py
@@ -22,7 +22,7 @@ class GetURLsResponseMetadataInfo(BaseModel):
 class GetURLsResponseInnerInfo(BaseModel):
     id: int
-    batch_id: int
+    batch_id: int | None
     url: str
     status: URLStatus
     collector_metadata: Optional[dict]
diff --git a/tests/conftest.py b/tests/conftest.py
index 97d92c2d..ee9a6774 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,7 +1,8 @@
 import logging
-from typing import Any, Generator
+from typing import Any, Generator, AsyncGenerator, Coroutine
 import pytest
+import pytest_asyncio
 from alembic.config import Config
 from sqlalchemy import create_engine, inspect, MetaData
 from sqlalchemy.orm import scoped_session, sessionmaker
@@ -14,6 +15,7 @@
 from src.util.helper_functions import load_from_environment
 from tests.helpers.alembic_runner import AlembicRunner
 from tests.helpers.db_data_creator import DBDataCreator
+from tests.helpers.setup.populate import populate_database
 from tests.helpers.setup.wipe import wipe_database
@@ -93,6 +95,7 @@ def wiped_database():
     wipe_database(get_postgres_connection_string())
+
 @pytest.fixture
 def db_client_test(wiped_database) -> Generator[DatabaseClient, Any, None]:
     # Drop pre-existing table
@@ -101,14 +104,22 @@ def db_client_test(wiped_database) -> Generator[DatabaseClient, Any, None]:
     yield db_client
     db_client.engine.dispose()
-@pytest.fixture
-def adb_client_test(wiped_database) -> Generator[AsyncDatabaseClient, Any, None]:
+@pytest_asyncio.fixture
+async def populated_database(wiped_database) -> None:
+    conn = get_postgres_connection_string(is_async=True)
+    adb_client = AsyncDatabaseClient(db_url=conn)
+    await populate_database(adb_client)
+
+@pytest_asyncio.fixture
+async def adb_client_test(wiped_database) -> AsyncGenerator[AsyncDatabaseClient, Any]:
     conn = get_postgres_connection_string(is_async=True)
     adb_client = AsyncDatabaseClient(db_url=conn)
     yield adb_client
     adb_client.engine.dispose()
 @pytest.fixture
-def db_data_creator(db_client_test):
+def db_data_creator(
+    db_client_test,
+):
     db_data_creator = DBDataCreator(db_client=db_client_test)
     yield db_data_creator
diff --git a/tests/helpers/setup/populate.py b/tests/helpers/setup/populate.py
new file mode 100644
index 00000000..1741253b
--- /dev/null
+++ b/tests/helpers/setup/populate.py
@@ -0,0 +1,18 @@
+from src.db.client.async_ import AsyncDatabaseClient
+from src.db.models.instantiations.url.core import URL
+
+
+async def populate_database(adb_client: AsyncDatabaseClient) -> None:
+    """Populate database with test data."""
+    url = URL(
+        url="https://www.test-data.com/static-test-data",
+        name="Fake test data",
+        description="Test data populated as a result of `reset_database`, "
+                    "which imitates a validated URL synchronized from the Data Sources App.",
+        collector_metadata={
+            "source_collector": "test-data",
+        },
+        outcome='validated',
+        record_type="Other"
+    )
+    await adb_client.add(url)
\ No newline at end of file