From a980f46b6e90724ed1b043fde7316f67f9bf860b Mon Sep 17 00:00:00 2001 From: Max Chis Date: Sat, 4 Oct 2025 09:22:15 -0400 Subject: [PATCH 1/2] Continue draft --- ...dd_url_task_error_table_and_remove_url_.py | 54 +++++++++++++++++++ src/api/endpoints/task/by_id/dto.py | 4 +- src/api/endpoints/task/by_id/query.py | 18 +++---- src/api/endpoints/url/get/dto.py | 5 +- src/api/endpoints/url/get/query.py | 10 ++-- src/core/tasks/base/operator.py | 17 ++++++ .../impl/internet_archives/probe/operator.py | 15 +++--- .../impl/internet_archives/save/operator.py | 9 ++-- .../scheduled/impl/task_cleanup/query.py | 10 ---- .../subtasks/templates/subtask.py | 11 ++-- .../tasks/url/operators/auto_relevant/core.py | 12 ++--- .../operators/html/queries/insert/convert.py | 7 ++- .../location_id/subtasks/templates/subtask.py | 9 ++-- .../tasks/url/operators/misc_metadata/core.py | 21 ++++---- .../tasks/url/operators/record_type/core.py | 19 ++++--- .../tasks/url/operators/screenshot/convert.py | 11 ++-- .../tasks/url/operators/screenshot/core.py | 9 ++-- .../url/operators/screenshot/queries/cte.py | 8 +-- .../url/operators/submit_approved/core.py | 26 ++++----- .../url/operators/submit_meta_urls/core.py | 9 ++-- src/db/client/async_.py | 45 +++------------- src/db/helpers/query.py | 8 +++ src/db/models/impl/task/core.py | 4 +- src/db/models/impl/task/error.py | 2 +- src/db/models/impl/url/core/sqlalchemy.py | 6 ++- .../impl/url/error/url_screenshot/pydantic.py | 13 ----- .../url/error/url_screenshot/sqlalchemy.py | 20 ------- src/db/models/impl/url/error_info/pydantic.py | 6 --- .../models/impl/url/error_info/sqlalchemy.py | 20 ------- .../url/{error => task_error}/__init__.py | 0 .../pydantic_}/__init__.py | 0 .../impl/url/task_error/pydantic_/insert.py | 18 +++++++ .../impl/url/task_error/pydantic_/small.py | 7 +++ .../models/impl/url/task_error/sqlalchemy.py | 23 ++++++++ tests/automated/integration/api/test_task.py | 2 +- .../automated/integration/api/url/test_get.py | 2 +- .../db/client/test_add_url_error_info.py | 37 ------------- .../internet_archives/probe/test_error.py | 6 +-- .../impl/internet_archives/save/test_error.py | 4 +- .../tasks/url/impl/auto_relevant/test_task.py | 4 +- .../end_to_end/test_core.py | 4 +- .../tasks/url/impl/screenshot/test_core.py | 6 +-- .../test_submit_approved_url_task.py | 15 +++--- tests/helpers/data_creator/core.py | 15 +++--- 44 files changed, 265 insertions(+), 286 deletions(-) create mode 100644 alembic/versions/2025_10_03_1831-dc6ab5157c49_add_url_task_error_table_and_remove_url_.py delete mode 100644 src/db/models/impl/url/error/url_screenshot/pydantic.py delete mode 100644 src/db/models/impl/url/error/url_screenshot/sqlalchemy.py delete mode 100644 src/db/models/impl/url/error_info/sqlalchemy.py rename src/db/models/impl/url/{error => task_error}/__init__.py (100%) rename src/db/models/impl/url/{error/url_screenshot => task_error/pydantic_}/__init__.py (100%) create mode 100644 src/db/models/impl/url/task_error/pydantic_/insert.py create mode 100644 src/db/models/impl/url/task_error/pydantic_/small.py create mode 100644 src/db/models/impl/url/task_error/sqlalchemy.py delete mode 100644 tests/automated/integration/db/client/test_add_url_error_info.py diff --git a/alembic/versions/2025_10_03_1831-dc6ab5157c49_add_url_task_error_table_and_remove_url_.py b/alembic/versions/2025_10_03_1831-dc6ab5157c49_add_url_task_error_table_and_remove_url_.py new file mode 100644 index 00000000..e6a4e93d --- /dev/null +++ b/alembic/versions/2025_10_03_1831-dc6ab5157c49_add_url_task_error_table_and_remove_url_.py @@ -0,0 +1,54 @@ +"""Add url_task_error table and remove url_error_info + +Revision ID: dc6ab5157c49 +Revises: c5c20af87511 +Create Date: 2025-10-03 18:31:54.887740 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import ENUM + +from src.util.alembic_helpers import url_id_column, task_id_column, created_at_column + +# revision identifiers, used by Alembic. +revision: str = 'dc6ab5157c49' +down_revision: Union[str, None] = 'c5c20af87511' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + + + + +def upgrade() -> None: + _remove_url_error_info() + _remove_url_screenshot_error() + _add_url_task_error() + +def _remove_url_error_info(): + op.drop_table("url_error_info") + +def _remove_url_screenshot_error(): + op.drop_table("error_url_screenshot") + +def _add_url_task_error(): + op.create_table( + "url_task_error", + url_id_column(), + task_id_column(), + sa.Column( + "task_type", + ENUM(name="task_type", create_type=False) + ), + sa.Column("error", sa.String(), nullable=False), + created_at_column(), + sa.PrimaryKeyConstraint("url_id", "task_type") + ) + + + +def downgrade() -> None: + pass diff --git a/src/api/endpoints/task/by_id/dto.py b/src/api/endpoints/task/by_id/dto.py index e9a73e44..64595f5d 100644 --- a/src/api/endpoints/task/by_id/dto.py +++ b/src/api/endpoints/task/by_id/dto.py @@ -1,13 +1,11 @@ import datetime -from typing import Optional from pydantic import BaseModel +from src.db.enums import TaskType from src.db.models.impl.task.enums import TaskStatus from src.db.models.impl.url.core.pydantic.info import URLInfo from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic -from src.db.enums import TaskType -from src.core.enums import BatchStatus class TaskInfo(BaseModel): diff --git a/src/api/endpoints/task/by_id/query.py b/src/api/endpoints/task/by_id/query.py index 02d18a3d..c7ccf353 100644 --- a/src/api/endpoints/task/by_id/query.py +++ b/src/api/endpoints/task/by_id/query.py @@ -1,16 +1,15 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm import selectinload +from sqlalchemy.orm import selectinload, joinedload from src.api.endpoints.task.by_id.dto import TaskInfo from src.collectors.enums import URLStatus -from src.core.enums import BatchStatus -from src.db.models.impl.task.enums import TaskStatus -from src.db.models.impl.url.core.pydantic.info import URLInfo -from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic from src.db.enums import TaskType from src.db.models.impl.task.core import Task +from src.db.models.impl.task.enums import TaskStatus +from src.db.models.impl.url.core.pydantic.info import URLInfo from src.db.models.impl.url.core.sqlalchemy import URL +from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic from src.db.queries.base.builder import QueryBuilderBase @@ -28,12 +27,11 @@ async def run(self, session: AsyncSession) -> TaskInfo: .options( selectinload(Task.urls) .selectinload(URL.batch), - selectinload(Task.error), - selectinload(Task.errored_urls) + selectinload(Task.url_errors), ) ) task = result.scalars().first() - error = task.error[0].error if len(task.error) > 0 else None + error = task.url_errors[0].error if len(task.url_errors) > 0 else None # Get error info if any # Get URLs urls = task.urls @@ -50,12 +48,12 @@ async def run(self, session: AsyncSession) -> TaskInfo: url_infos.append(url_info) errored_urls = [] - for url in task.errored_urls: + for url in task.url_errors: url_error_info = URLErrorInfoPydantic( task_id=url.task_id, url_id=url.url_id, error=url.error, - updated_at=url.updated_at + updated_at=url.created_at ) errored_urls.append(url_error_info) return TaskInfo( diff --git a/src/api/endpoints/url/get/dto.py b/src/api/endpoints/url/get/dto.py index eef8da2d..a4616d7e 100644 --- a/src/api/endpoints/url/get/dto.py +++ b/src/api/endpoints/url/get/dto.py @@ -4,10 +4,11 @@ from pydantic import BaseModel from src.collectors.enums import URLStatus -from src.db.enums import URLMetadataAttributeType, ValidationStatus, ValidationSource +from src.db.enums import URLMetadataAttributeType, ValidationStatus, ValidationSource, TaskType + class GetURLsResponseErrorInfo(BaseModel): - id: int + task: TaskType error: str updated_at: datetime.datetime diff --git a/src/api/endpoints/url/get/query.py b/src/api/endpoints/url/get/query.py index be4801bf..d476624e 100644 --- a/src/api/endpoints/url/get/query.py +++ b/src/api/endpoints/url/get/query.py @@ -6,7 +6,7 @@ from src.collectors.enums import URLStatus from src.db.client.helpers import add_standard_limit_and_offset from src.db.models.impl.url.core.sqlalchemy import URL -from src.db.models.impl.url.error_info.sqlalchemy import URLErrorInfo +from src.db.models.impl.url.task_error.sqlalchemy import URLTaskError from src.db.queries.base.builder import QueryBuilderBase @@ -23,14 +23,14 @@ def __init__( async def run(self, session: AsyncSession) -> GetURLsResponseInfo: statement = select(URL).options( - selectinload(URL.error_info), + selectinload(URL.task_errors), selectinload(URL.batch) ).order_by(URL.id) if self.errors: # Only return URLs with errors statement = statement.where( exists( - select(URLErrorInfo).where(URLErrorInfo.url_id == URL.id) + select(URLTaskError).where(URLTaskError.url_id == URL.id) ) ) add_standard_limit_and_offset(statement, self.page) @@ -39,9 +39,9 @@ async def run(self, session: AsyncSession) -> GetURLsResponseInfo: final_results = [] for result in all_results: error_results = [] - for error in result.error_info: + for error in result.task_errors: error_result = GetURLsResponseErrorInfo( - id=error.id, + task=error.task_type, error=error.error, updated_at=error.updated_at ) diff --git a/src/core/tasks/base/operator.py b/src/core/tasks/base/operator.py index 93230db5..51f07a47 100644 --- a/src/core/tasks/base/operator.py +++ b/src/core/tasks/base/operator.py @@ -7,6 +7,8 @@ from src.db.client.async_ import AsyncDatabaseClient from src.db.enums import TaskType from src.db.models.impl.task.enums import TaskStatus +from src.db.models.impl.url.task_error.pydantic_.insert import URLTaskErrorPydantic +from src.db.models.impl.url.task_error.pydantic_.small import URLTaskErrorSmall class TaskOperatorBase(ABC): @@ -66,3 +68,18 @@ async def handle_task_error(self, e): task_id=self.task_id, error=str(e) ) + + async def add_task_errors( + self, + errors: list[URLTaskErrorSmall] + ) -> None: + inserts: list[URLTaskErrorPydantic] = [ + URLTaskErrorPydantic( + task_id=self.task_id, + url_id=error.url_id, + task_type=self.task_type, + error=error.error + ) + for error in errors + ] + await self.adb_client.bulk_insert(inserts) \ No newline at end of file diff --git a/src/core/tasks/scheduled/impl/internet_archives/probe/operator.py b/src/core/tasks/scheduled/impl/internet_archives/probe/operator.py index 05f58554..f1ae27cd 100644 --- a/src/core/tasks/scheduled/impl/internet_archives/probe/operator.py +++ b/src/core/tasks/scheduled/impl/internet_archives/probe/operator.py @@ -2,19 +2,19 @@ from src.core.tasks.mixins.link_urls import LinkURLsMixin from src.core.tasks.mixins.prereq import HasPrerequisitesMixin -from src.core.tasks.scheduled.impl.internet_archives.probe.queries.prereq import \ - CheckURLInternetArchivesTaskPrerequisitesQueryBuilder -from src.core.tasks.scheduled.templates.operator import ScheduledTaskOperatorBase from src.core.tasks.scheduled.impl.internet_archives.probe.convert import convert_ia_url_mapping_to_ia_metadata from src.core.tasks.scheduled.impl.internet_archives.probe.filter import filter_into_subsets from src.core.tasks.scheduled.impl.internet_archives.probe.models.subset import IAURLMappingSubsets from src.core.tasks.scheduled.impl.internet_archives.probe.queries.get import GetURLsForInternetArchivesTaskQueryBuilder +from src.core.tasks.scheduled.impl.internet_archives.probe.queries.prereq import \ + CheckURLInternetArchivesTaskPrerequisitesQueryBuilder +from src.core.tasks.scheduled.templates.operator import ScheduledTaskOperatorBase from src.db.client.async_ import AsyncDatabaseClient from src.db.dtos.url.mapping import URLMapping from src.db.enums import TaskType from src.db.models.impl.flag.checked_for_ia.pydantic import FlagURLCheckedForInternetArchivesPydantic -from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic from src.db.models.impl.url.internet_archives.probe.pydantic import URLInternetArchiveMetadataPydantic +from src.db.models.impl.url.task_error.pydantic_.small import URLTaskErrorSmall from src.external.internet_archives.client import InternetArchivesClient from src.external.internet_archives.models.ia_url_mapping import InternetArchivesURLMapping from src.util.progress_bar import get_progress_bar_disabled @@ -60,16 +60,15 @@ async def inner_task_logic(self) -> None: await self._add_ia_metadata_to_db(mapper, ia_mappings=subsets.has_metadata) async def _add_errors_to_db(self, mapper: URLMapper, ia_mappings: list[InternetArchivesURLMapping]) -> None: - url_error_info_list: list[URLErrorInfoPydantic] = [] + url_error_info_list: list[URLTaskErrorSmall] = [] for ia_mapping in ia_mappings: url_id = mapper.get_id(ia_mapping.url) - url_error_info = URLErrorInfoPydantic( + url_error_info = URLTaskErrorSmall( url_id=url_id, error=ia_mapping.error, - task_id=self.task_id ) url_error_info_list.append(url_error_info) - await self.adb_client.bulk_insert(url_error_info_list) + await self.add_task_errors(url_error_info_list) async def _get_url_mappings(self) -> list[URLMapping]: return await self.adb_client.run_query_builder( diff --git a/src/core/tasks/scheduled/impl/internet_archives/save/operator.py b/src/core/tasks/scheduled/impl/internet_archives/save/operator.py index 8a5b3cdb..fad0d7ac 100644 --- a/src/core/tasks/scheduled/impl/internet_archives/save/operator.py +++ b/src/core/tasks/scheduled/impl/internet_archives/save/operator.py @@ -14,8 +14,8 @@ from src.core.tasks.scheduled.templates.operator import ScheduledTaskOperatorBase from src.db.client.async_ import AsyncDatabaseClient from src.db.enums import TaskType -from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic from src.db.models.impl.url.internet_archives.save.pydantic import URLInternetArchiveSaveMetadataPydantic +from src.db.models.impl.url.task_error.pydantic_.small import URLTaskErrorSmall from src.external.internet_archives.client import InternetArchivesClient from src.external.internet_archives.models.save_response import InternetArchivesSaveResponseInfo @@ -89,16 +89,15 @@ async def _add_errors_to_db( mapper: URLToEntryMapper, responses: list[InternetArchivesSaveResponseInfo] ) -> None: - error_info_list: list[URLErrorInfoPydantic] = [] + error_info_list: list[URLTaskErrorSmall] = [] for response in responses: url_id = mapper.get_url_id(response.url) - url_error_info = URLErrorInfoPydantic( + url_error_info = URLTaskErrorSmall( url_id=url_id, error=response.error, - task_id=self.task_id ) error_info_list.append(url_error_info) - await self.adb_client.bulk_insert(error_info_list) + await self.add_task_errors(error_info_list) async def _save_new_saves_to_db( self, diff --git a/src/core/tasks/scheduled/impl/task_cleanup/query.py b/src/core/tasks/scheduled/impl/task_cleanup/query.py index 8874a49a..b455e1c6 100644 --- a/src/core/tasks/scheduled/impl/task_cleanup/query.py +++ b/src/core/tasks/scheduled/impl/task_cleanup/query.py @@ -5,8 +5,6 @@ from sqlalchemy.ext.asyncio import AsyncSession from src.db.models.impl.task.core import Task -from src.db.models.impl.task.error import TaskError -from src.db.models.impl.url.error_info.sqlalchemy import URLErrorInfo from src.db.queries.base.builder import QueryBuilderBase @@ -15,14 +13,6 @@ class TaskCleanupQueryBuilder(QueryBuilderBase): async def run(self, session: AsyncSession) -> Any: one_week_ago: datetime = datetime.now() - timedelta(days=7) - statement = ( - delete(URLErrorInfo) - .where( - URLErrorInfo.updated_at < one_week_ago - ) - ) - await session.execute(statement) - statement = ( delete(Task) .where( diff --git a/src/core/tasks/url/operators/agency_identification/subtasks/templates/subtask.py b/src/core/tasks/url/operators/agency_identification/subtasks/templates/subtask.py index efd89ef9..f24b9113 100644 --- a/src/core/tasks/url/operators/agency_identification/subtasks/templates/subtask.py +++ b/src/core/tasks/url/operators/agency_identification/subtasks/templates/subtask.py @@ -5,9 +5,9 @@ from src.core.tasks.url.operators.agency_identification.subtasks.models.run_info import AgencyIDSubtaskRunInfo from src.core.tasks.url.operators.agency_identification.subtasks.models.subtask import AutoAgencyIDSubtaskData from src.db.client.async_ import AsyncDatabaseClient -from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic from src.db.models.impl.url.suggestion.agency.subtask.pydantic import URLAutoAgencyIDSubtaskPydantic from src.db.models.impl.url.suggestion.agency.suggestion.pydantic import AgencyIDSubtaskSuggestionPydantic +from src.db.models.impl.url.task_error.pydantic_.small import URLTaskErrorSmall class AgencyIDSubtaskOperatorBase(ABC): @@ -66,17 +66,14 @@ async def _upload_subtask_data( models=suggestions, ) - error_infos: list[URLErrorInfoPydantic] = [] + error_infos: list[URLTaskErrorSmall] = [] for subtask_info in subtask_data_list: if not subtask_info.has_error: continue - error_info = URLErrorInfoPydantic( + error_info = URLTaskErrorSmall( url_id=subtask_info.url_id, error=subtask_info.error, - task_id=self.task_id, ) error_infos.append(error_info) - await self.adb_client.bulk_insert( - models=error_infos, - ) + await self.add_task_errors(error_infos) diff --git a/src/core/tasks/url/operators/auto_relevant/core.py b/src/core/tasks/url/operators/auto_relevant/core.py index 4cb36a27..b5055d38 100644 --- a/src/core/tasks/url/operators/auto_relevant/core.py +++ b/src/core/tasks/url/operators/auto_relevant/core.py @@ -4,8 +4,9 @@ from src.core.tasks.url.operators.base import URLTaskOperatorBase from src.db.client.async_ import AsyncDatabaseClient from src.db.models.impl.url.suggestion.relevant.auto.pydantic.input import AutoRelevancyAnnotationInput -from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic from src.db.enums import TaskType +from src.db.models.impl.url.task_error.pydantic_.insert import URLTaskErrorPydantic +from src.db.models.impl.url.task_error.pydantic_.small import URLTaskErrorSmall from src.external.huggingface.inference.client import HuggingFaceInferenceClient from src.external.huggingface.inference.models.input import BasicInput @@ -77,14 +78,13 @@ async def put_results_into_database(self, tdos: list[URLRelevantTDO]) -> None: await self.adb_client.add_user_relevant_suggestions(inputs) async def update_errors_in_database(self, tdos: list[URLRelevantTDO]) -> None: - error_infos = [] + task_errors: list[URLTaskErrorSmall] = [] for tdo in tdos: - error_info = URLErrorInfoPydantic( - task_id=self.task_id, + error_info = URLTaskErrorSmall( url_id=tdo.url_id, error=tdo.error ) - error_infos.append(error_info) - await self.adb_client.add_url_error_infos(error_infos) + task_errors.append(error_info) + await self.add_task_errors(task_errors) diff --git a/src/core/tasks/url/operators/html/queries/insert/convert.py b/src/core/tasks/url/operators/html/queries/insert/convert.py index d689edac..ca827c7e 100644 --- a/src/core/tasks/url/operators/html/queries/insert/convert.py +++ b/src/core/tasks/url/operators/html/queries/insert/convert.py @@ -3,10 +3,12 @@ from src.core.tasks.url.operators.html.content_info_getter import HTMLContentInfoGetter from src.core.tasks.url.operators.html.tdo import UrlHtmlTDO from src.db.dtos.url.html_content import URLHTMLContentInfo +from src.db.enums import TaskType from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic from src.db.models.impl.url.html.compressed.pydantic import URLCompressedHTMLPydantic from src.db.models.impl.url.scrape_info.enums import ScrapeStatus from src.db.models.impl.url.scrape_info.pydantic import URLScrapeInfoInsertModel +from src.db.models.impl.url.task_error.pydantic_.insert import URLTaskErrorPydantic from src.db.utils.compression import compress_html from src.external.url_request.dtos.url_response import URLResponseInfo @@ -64,10 +66,11 @@ def convert_to_url_errors( for tdo in tdos: if tdo.url_response_info.success: continue - model = URLErrorInfoPydantic( + model = URLTaskErrorPydantic( url_id=tdo.url_info.id, error=tdo.url_response_info.exception, - task_id=task_id + task_id=task_id, + task_type=TaskType.HTML ) models.append(model) return models \ No newline at end of file diff --git a/src/core/tasks/url/operators/location_id/subtasks/templates/subtask.py b/src/core/tasks/url/operators/location_id/subtasks/templates/subtask.py index 43fe39de..2429f428 100644 --- a/src/core/tasks/url/operators/location_id/subtasks/templates/subtask.py +++ b/src/core/tasks/url/operators/location_id/subtasks/templates/subtask.py @@ -6,9 +6,9 @@ from src.core.tasks.url.operators.location_id.subtasks.models.subtask import AutoLocationIDSubtaskData from src.core.tasks.url.operators.location_id.subtasks.models.suggestion import LocationSuggestion from src.db.client.async_ import AsyncDatabaseClient -from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic from src.db.models.impl.url.suggestion.location.auto.subtask.pydantic import AutoLocationIDSubtaskPydantic from src.db.models.impl.url.suggestion.location.auto.suggestion.pydantic import LocationIDSubtaskSuggestionPydantic +from src.db.models.impl.url.task_error.pydantic_.small import URLTaskErrorSmall class LocationIDSubtaskOperatorBase(ABC): @@ -68,17 +68,16 @@ async def _upload_subtask_data( models=suggestions, ) - error_infos: list[URLErrorInfoPydantic] = [] + error_infos: list[URLTaskErrorSmall] = [] for subtask_info in subtask_data_list: if not subtask_info.has_error: continue - error_info = URLErrorInfoPydantic( + error_info = URLTaskErrorSmall( url_id=subtask_info.url_id, error=subtask_info.error, - task_id=self.task_id, ) error_infos.append(error_info) - await self.adb_client.bulk_insert( + await self.add_task_errors( models=error_infos, ) diff --git a/src/core/tasks/url/operators/misc_metadata/core.py b/src/core/tasks/url/operators/misc_metadata/core.py index c34c2df7..cd45d90e 100644 --- a/src/core/tasks/url/operators/misc_metadata/core.py +++ b/src/core/tasks/url/operators/misc_metadata/core.py @@ -1,16 +1,14 @@ -from typing import Optional - -from src.db.client.async_ import AsyncDatabaseClient -from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic -from src.db.enums import TaskType from src.collectors.enums import CollectorType -from src.core.tasks.url.operators.misc_metadata.tdo import URLMiscellaneousMetadataTDO from src.core.tasks.url.operators.base import URLTaskOperatorBase +from src.core.tasks.url.operators.misc_metadata.tdo import URLMiscellaneousMetadataTDO from src.core.tasks.url.subtasks.miscellaneous_metadata.auto_googler import AutoGooglerMiscMetadataSubtask -from src.core.tasks.url.subtasks.miscellaneous_metadata.ckan import CKANMiscMetadataSubtask from src.core.tasks.url.subtasks.miscellaneous_metadata.base import \ MiscellaneousMetadataSubtaskBase +from src.core.tasks.url.subtasks.miscellaneous_metadata.ckan import CKANMiscMetadataSubtask from src.core.tasks.url.subtasks.miscellaneous_metadata.muckrock import MuckrockMiscMetadataSubtask +from src.db.client.async_ import AsyncDatabaseClient +from src.db.enums import TaskType +from src.db.models.impl.url.task_error.pydantic_.small import URLTaskErrorSmall class URLMiscellaneousMetadataTaskOperator(URLTaskOperatorBase): @@ -61,7 +59,7 @@ async def inner_task_logic(self) -> None: tdos: list[URLMiscellaneousMetadataTDO] = await self.adb_client.get_pending_urls_missing_miscellaneous_metadata() await self.link_urls_to_task(url_ids=[tdo.url_id for tdo in tdos]) - error_infos = [] + task_errors: list[URLTaskErrorSmall] = [] for tdo in tdos: subtask = await self.get_subtask(tdo.collector_type) try: @@ -69,12 +67,11 @@ async def inner_task_logic(self) -> None: subtask.process(tdo) await self.html_default_logic(tdo) except Exception as e: - error_info = URLErrorInfoPydantic( - task_id=self.task_id, + error_info = URLTaskErrorSmall( url_id=tdo.url_id, error=str(e), ) - error_infos.append(error_info) + task_errors.append(error_info) await self.adb_client.add_miscellaneous_metadata(tdos) - await self.adb_client.add_url_error_infos(error_infos) \ No newline at end of file + await self.add_task_errors(task_errors) \ No newline at end of file diff --git a/src/core/tasks/url/operators/record_type/core.py b/src/core/tasks/url/operators/record_type/core.py index bc40e572..8e31fa8d 100644 --- a/src/core/tasks/url/operators/record_type/core.py +++ b/src/core/tasks/url/operators/record_type/core.py @@ -1,10 +1,10 @@ -from src.db.client.async_ import AsyncDatabaseClient -from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic -from src.db.enums import TaskType -from src.core.tasks.url.operators.record_type.tdo import URLRecordTypeTDO -from src.core.tasks.url.operators.base import URLTaskOperatorBase from src.core.enums import RecordType +from src.core.tasks.url.operators.base import URLTaskOperatorBase from src.core.tasks.url.operators.record_type.llm_api.record_classifier.openai import OpenAIRecordClassifier +from src.core.tasks.url.operators.record_type.tdo import URLRecordTypeTDO +from src.db.client.async_ import AsyncDatabaseClient +from src.db.enums import TaskType +from src.db.models.impl.url.task_error.pydantic_.small import URLTaskErrorSmall class URLRecordTypeTaskOperator(URLTaskOperatorBase): @@ -42,15 +42,14 @@ async def inner_task_logic(self): await self.update_errors_in_database(error_subset) async def update_errors_in_database(self, tdos: list[URLRecordTypeTDO]): - error_infos = [] + task_errors: list[URLTaskErrorSmall] = [] for tdo in tdos: - error_info = URLErrorInfoPydantic( - task_id=self.task_id, + error_info = URLTaskErrorSmall( url_id=tdo.url_with_html.url_id, error=tdo.error ) - error_infos.append(error_info) - await self.adb_client.add_url_error_infos(error_infos) + task_errors.append(error_info) + await self.add_task_errors(task_errors) async def put_results_into_database(self, tdos: list[URLRecordTypeTDO]): suggestions = [] diff --git a/src/core/tasks/url/operators/screenshot/convert.py b/src/core/tasks/url/operators/screenshot/convert.py index b2527f42..09904ff1 100644 --- a/src/core/tasks/url/operators/screenshot/convert.py +++ b/src/core/tasks/url/operators/screenshot/convert.py @@ -1,7 +1,6 @@ from src.core.tasks.url.operators.screenshot.models.outcome import URLScreenshotOutcome -from src.db.models.impl.url.error.url_screenshot.pydantic import ErrorURLScreenshotPydantic -from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic from src.db.models.impl.url.screenshot.pydantic import URLScreenshotPydantic +from src.db.models.impl.url.task_error.pydantic_.small import URLTaskErrorSmall def convert_to_url_screenshot_pydantic( @@ -17,12 +16,12 @@ def convert_to_url_screenshot_pydantic( results.append(result) return results -def convert_to_error_url_screenshot_pydantic( +def convert_to_task_error( outcomes: list[URLScreenshotOutcome] -) -> list[ErrorURLScreenshotPydantic]: - results: list[ErrorURLScreenshotPydantic] = [] +) -> list[URLTaskErrorSmall]: + results: list[URLTaskErrorSmall] = [] for outcome in outcomes: - result = ErrorURLScreenshotPydantic( + result = URLTaskErrorSmall( url_id=outcome.url_id, error=outcome.error, ) diff --git a/src/core/tasks/url/operators/screenshot/core.py b/src/core/tasks/url/operators/screenshot/core.py index 2e54f501..96627ab8 100644 --- a/src/core/tasks/url/operators/screenshot/core.py +++ b/src/core/tasks/url/operators/screenshot/core.py @@ -1,6 +1,6 @@ from src.core.tasks.url.operators.base import URLTaskOperatorBase from src.core.tasks.url.operators.screenshot.convert import convert_to_url_screenshot_pydantic, \ - convert_to_error_url_screenshot_pydantic + convert_to_task_error from src.core.tasks.url.operators.screenshot.filter import filter_success_outcomes from src.core.tasks.url.operators.screenshot.get import get_url_screenshots from src.core.tasks.url.operators.screenshot.models.outcome import URLScreenshotOutcome @@ -10,9 +10,8 @@ from src.db.client.async_ import AsyncDatabaseClient from src.db.dtos.url.mapping import URLMapping from src.db.enums import TaskType -from src.db.models.impl.url.error.url_screenshot.pydantic import ErrorURLScreenshotPydantic -from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic from src.db.models.impl.url.screenshot.pydantic import URLScreenshotPydantic +from src.db.models.impl.url.task_error.pydantic_.small import URLTaskErrorSmall class URLScreenshotTaskOperator(URLTaskOperatorBase): @@ -42,10 +41,10 @@ async def upload_screenshots(self, outcomes: list[URLScreenshotOutcome]) -> None await self.adb_client.bulk_insert(insert_models) async def upload_errors(self, outcomes: list[URLScreenshotOutcome]) -> None: - insert_models: list[ErrorURLScreenshotPydantic] = convert_to_error_url_screenshot_pydantic( + insert_models: list[URLTaskErrorSmall] = convert_to_task_error( outcomes=outcomes, ) - await self.adb_client.bulk_insert(insert_models) + await self.add_task_errors(insert_models) async def inner_task_logic(self) -> None: url_mappings: list[URLMapping] = await self.get_urls_without_screenshot() diff --git a/src/core/tasks/url/operators/screenshot/queries/cte.py b/src/core/tasks/url/operators/screenshot/queries/cte.py index e1bbf763..d961aabf 100644 --- a/src/core/tasks/url/operators/screenshot/queries/cte.py +++ b/src/core/tasks/url/operators/screenshot/queries/cte.py @@ -1,8 +1,8 @@ -from sqlalchemy import CTE, select, exists, Column +from sqlalchemy import CTE, select, Column -from src.db.helpers.query import url_not_validated, not_exists_url +from src.db.enums import TaskType +from src.db.helpers.query import url_not_validated, not_exists_url, no_url_task_error from src.db.models.impl.url.core.sqlalchemy import URL -from src.db.models.impl.url.error.url_screenshot.sqlalchemy import ErrorURLScreenshot from src.db.models.impl.url.screenshot.sqlalchemy import URLScreenshot from src.db.models.impl.url.web_metadata.sqlalchemy import URLWebMetadata @@ -22,7 +22,7 @@ def __init__(self): .where( url_not_validated(), not_exists_url(URLScreenshot), - not_exists_url(ErrorURLScreenshot), + no_url_task_error(TaskType.SCREENSHOT), URLWebMetadata.status_code == 200, ) .cte("url_screenshot_prerequisites") diff --git a/src/core/tasks/url/operators/submit_approved/core.py b/src/core/tasks/url/operators/submit_approved/core.py index 379e47ae..a09b0462 100644 --- a/src/core/tasks/url/operators/submit_approved/core.py +++ b/src/core/tasks/url/operators/submit_approved/core.py @@ -1,8 +1,8 @@ +from src.core.tasks.url.operators.base import URLTaskOperatorBase +from src.core.tasks.url.operators.submit_approved.tdo import SubmitApprovedURLTDO, SubmittedURLInfo from src.db.client.async_ import AsyncDatabaseClient -from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic from src.db.enums import TaskType -from src.core.tasks.url.operators.submit_approved.tdo import SubmitApprovedURLTDO -from src.core.tasks.url.operators.base import URLTaskOperatorBase +from src.db.models.impl.url.task_error.pydantic_.small import URLTaskErrorSmall from src.external.pdap.client import PDAPClient @@ -31,16 +31,16 @@ async def inner_task_logic(self): await self.link_urls_to_task(url_ids=[tdo.url_id for tdo in tdos]) # Submit each URL, recording errors if they exist - submitted_url_infos = await self.pdap_client.submit_data_source_urls(tdos) + submitted_url_infos: list[SubmittedURLInfo] = await self.pdap_client.submit_data_source_urls(tdos) - error_infos = await self.get_error_infos(submitted_url_infos) + task_errors: list[URLTaskErrorSmall] = await self.get_error_infos(submitted_url_infos) success_infos = await self.get_success_infos(submitted_url_infos) # Update the database for successful submissions await self.adb_client.mark_urls_as_submitted(infos=success_infos) # Update the database for failed submissions - await self.adb_client.add_url_error_infos(error_infos) + await self.add_task_errors(task_errors) async def get_success_infos(self, submitted_url_infos): success_infos = [ @@ -49,17 +49,19 @@ async def get_success_infos(self, submitted_url_infos): ] return success_infos - async def get_error_infos(self, submitted_url_infos): - error_infos: list[URLErrorInfoPydantic] = [] + async def get_error_infos( + self, + submitted_url_infos: list[SubmittedURLInfo] + ) -> list[URLTaskErrorSmall]: + task_errors: list[URLTaskErrorSmall] = [] error_response_objects = [ response_object for response_object in submitted_url_infos if response_object.request_error is not None ] for error_response_object in error_response_objects: - error_info = URLErrorInfoPydantic( - task_id=self.task_id, + error_info = URLTaskErrorSmall( url_id=error_response_object.url_id, error=error_response_object.request_error, ) - error_infos.append(error_info) - return error_infos + task_errors.append(error_info) + return task_errors diff --git a/src/core/tasks/url/operators/submit_meta_urls/core.py b/src/core/tasks/url/operators/submit_meta_urls/core.py index 3202a4cf..e06901da 100644 --- a/src/core/tasks/url/operators/submit_meta_urls/core.py +++ b/src/core/tasks/url/operators/submit_meta_urls/core.py @@ -6,7 +6,7 @@ from src.db.dtos.url.mapping import URLMapping from src.db.enums import TaskType from src.db.models.impl.url.ds_meta_url.pydantic import URLDSMetaURLPydantic -from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic +from src.db.models.impl.url.task_error.pydantic_.small import URLTaskErrorSmall from src.external.pdap.client import PDAPClient from src.external.pdap.impl.meta_urls.enums import SubmitMetaURLsStatus from src.external.pdap.impl.meta_urls.request import SubmitMetaURLsRequest @@ -53,7 +53,7 @@ async def inner_task_logic(self) -> None: responses: list[SubmitMetaURLsResponse] = \ await self.pdap_client.submit_meta_urls(requests) - errors: list[URLErrorInfoPydantic] = [] + errors: list[URLTaskErrorSmall] = [] inserts: list[URLDSMetaURLPydantic] = [] for response in responses: @@ -68,12 +68,11 @@ async def inner_task_logic(self) -> None: ) else: errors.append( - URLErrorInfoPydantic( + URLTaskErrorSmall( url_id=url_id, - task_id=self.task_id, error=response.error, ) ) - await self.adb_client.bulk_insert(errors) + await self.add_task_errors(errors) await self.adb_client.bulk_insert(inserts) diff --git a/src/db/client/async_.py b/src/db/client/async_.py index 52191078..6158ff5b 100644 --- a/src/db/client/async_.py +++ b/src/db/client/async_.py @@ -6,8 +6,7 @@ from sqlalchemy import select, exists, func, Select, and_, update, delete, Row from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker -from sqlalchemy.orm import selectinload, QueryableAttribute - +from sqlalchemy.orm import selectinload from src.api.endpoints.annotate.all.get.models.response import GetNextURLForAllAnnotationResponse from src.api.endpoints.annotate.all.get.queries.core import GetNextURLForAllAnnotationQueryBuilder @@ -75,7 +74,7 @@ from src.db.dtos.url.raw_html import RawHTMLInfo from src.db.enums import TaskType from src.db.helpers.session import session_helper as sh -from src.db.models.impl.agency.enums import AgencyType, JurisdictionType +from src.db.models.impl.agency.enums import AgencyType from src.db.models.impl.agency.sqlalchemy import Agency from src.db.models.impl.backlog_snapshot import BacklogSnapshot from src.db.models.impl.batch.pydantic.info import BatchInfo @@ -95,8 +94,6 @@ from src.db.models.impl.url.core.pydantic.info import URLInfo from src.db.models.impl.url.core.sqlalchemy import URL from src.db.models.impl.url.data_source.sqlalchemy import URLDataSource -from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic -from src.db.models.impl.url.error_info.sqlalchemy import URLErrorInfo from src.db.models.impl.url.html.compressed.sqlalchemy import URLCompressedHTML from src.db.models.impl.url.html.content.sqlalchemy import URLHTMLContent from src.db.models.impl.url.optional_data_source_metadata import URLOptionalDataSourceMetadata @@ -107,6 +104,7 @@ from src.db.models.impl.url.suggestion.relevant.auto.pydantic.input import AutoRelevancyAnnotationInput from src.db.models.impl.url.suggestion.relevant.auto.sqlalchemy import AutoRelevantSuggestion from src.db.models.impl.url.suggestion.relevant.user import UserURLTypeSuggestion +from src.db.models.impl.url.task_error.sqlalchemy import URLTaskError from src.db.models.impl.url.web_metadata.sqlalchemy import URLWebMetadata from src.db.models.templates_.base import Base from src.db.queries.base.builder import QueryBuilderBase @@ -365,37 +363,6 @@ async def add_user_record_type_suggestion( # endregion record_type - @session_manager - async def add_url_error_infos(self, session: AsyncSession, url_error_infos: list[URLErrorInfoPydantic]): - for url_error_info in url_error_infos: - statement = select(URL).where(URL.id == url_error_info.url_id) - scalar_result = await session.scalars(statement) - url = scalar_result.first() - url.status = URLStatus.ERROR.value - - url_error = URLErrorInfo(**url_error_info.model_dump()) - session.add(url_error) - - @session_manager - async def get_urls_with_errors(self, session: AsyncSession) -> list[URLErrorInfoPydantic]: - statement = (select(URL, URLErrorInfo.error, URLErrorInfo.updated_at, URLErrorInfo.task_id) - .join(URLErrorInfo) - .where(URL.status == URLStatus.ERROR.value) - .order_by(URL.id)) - scalar_result = await session.execute(statement) - results = scalar_result.all() - final_results = [] - for url, error, updated_at, task_id in results: - final_results.append( - URLErrorInfoPydantic( - url_id=url.id, - error=error, - updated_at=updated_at, - task_id=task_id - ) - ) - - return final_results @session_manager async def add_html_content_infos(self, session: AsyncSession, html_content_infos: list[URLHTMLContentInfo]): @@ -590,8 +557,8 @@ async def link_urls_to_task( async def get_tasks( self, session: AsyncSession, - task_type: Optional[TaskType] = None, - task_status: Optional[BatchStatus] = None, + task_type: TaskType | None = None, + task_status: BatchStatus | None = None, page: int = 1 ) -> GetTasksResponse: url_count_subquery = self.statement_composer.simple_count_subquery( @@ -601,7 +568,7 @@ async def get_tasks( ) url_error_count_subquery = self.statement_composer.simple_count_subquery( - URLErrorInfo, + URLTaskError, 'task_id', 'url_error_count' ) diff --git a/src/db/helpers/query.py b/src/db/helpers/query.py index b5eda268..bd52bae7 100644 --- a/src/db/helpers/query.py +++ b/src/db/helpers/query.py @@ -1,7 +1,9 @@ from sqlalchemy import exists, ColumnElement +from src.db.enums import TaskType from src.db.models.impl.flag.url_validated.sqlalchemy import FlagURLValidated from src.db.models.impl.url.core.sqlalchemy import URL +from src.db.models.impl.url.task_error.sqlalchemy import URLTaskError from src.db.models.mixins import URLDependentMixin @@ -13,4 +15,10 @@ def not_exists_url( ) -> ColumnElement[bool]: return ~exists().where( model.url_id == URL.id + ) + +def no_url_task_error(task_type: TaskType) -> ColumnElement[bool]: + return ~exists().where( + URLTaskError.url_id == URL.id, + URLTaskError.task_type == task_type ) \ No newline at end of file diff --git a/src/db/models/impl/task/core.py b/src/db/models/impl/task/core.py index 2890f4d0..566dd116 100644 --- a/src/db/models/impl/task/core.py +++ b/src/db/models/impl/task/core.py @@ -35,5 +35,5 @@ class Task(UpdatedAtMixin, WithIDBase): secondary="link_task_urls", back_populates="tasks" ) - error = relationship(TaskError, back_populates="task") - errored_urls = relationship("URLErrorInfo", back_populates="task") + errors = relationship(TaskError) + url_errors = relationship("URLTaskError") diff --git a/src/db/models/impl/task/error.py b/src/db/models/impl/task/error.py index c5a25e78..2de0c66a 100644 --- a/src/db/models/impl/task/error.py +++ b/src/db/models/impl/task/error.py @@ -11,7 +11,7 @@ class TaskError(UpdatedAtMixin, TaskDependentMixin, WithIDBase): error = Column(Text, nullable=False) # Relationships - task = relationship("Task", back_populates="error") + task = relationship("Task") __table_args__ = (UniqueConstraint( "task_id", diff --git a/src/db/models/impl/url/core/sqlalchemy.py b/src/db/models/impl/url/core/sqlalchemy.py index 1e6d76a6..5a9e6217 100644 --- a/src/db/models/impl/url/core/sqlalchemy.py +++ b/src/db/models/impl/url/core/sqlalchemy.py @@ -11,6 +11,7 @@ from src.db.models.impl.url.record_type.sqlalchemy import URLRecordType from src.db.models.impl.url.suggestion.location.auto.subtask.sqlalchemy import AutoLocationIDSubtask from src.db.models.impl.url.suggestion.name.sqlalchemy import URLNameSuggestion +from src.db.models.impl.url.task_error.sqlalchemy import URLTaskError from src.db.models.mixins import UpdatedAtMixin, CreatedAtMixin from src.db.models.templates_.with_id import WithIDBase @@ -50,7 +51,10 @@ class URL(UpdatedAtMixin, CreatedAtMixin, WithIDBase): ) duplicates = relationship("Duplicate", back_populates="original_url") html_content = relationship("URLHTMLContent", back_populates="url", cascade="all, delete-orphan") - error_info = relationship("URLErrorInfo", back_populates="url", cascade="all, delete-orphan") + task_errors = relationship( + URLTaskError, + cascade="all, delete-orphan" + ) tasks = relationship( "Task", secondary="link_task_urls", diff --git a/src/db/models/impl/url/error/url_screenshot/pydantic.py b/src/db/models/impl/url/error/url_screenshot/pydantic.py deleted file mode 100644 index ffecc86d..00000000 --- a/src/db/models/impl/url/error/url_screenshot/pydantic.py +++ /dev/null @@ -1,13 +0,0 @@ -from pydantic import BaseModel - -from src.db.models.impl.url.error.url_screenshot.sqlalchemy import ErrorURLScreenshot -from src.db.models.templates_.base import Base - - -class ErrorURLScreenshotPydantic(BaseModel): - url_id: int - error: str - - @classmethod - def sa_model(cls) -> type[Base]: - return ErrorURLScreenshot \ No newline at end of file diff --git a/src/db/models/impl/url/error/url_screenshot/sqlalchemy.py b/src/db/models/impl/url/error/url_screenshot/sqlalchemy.py deleted file mode 100644 index e06bf6dd..00000000 --- a/src/db/models/impl/url/error/url_screenshot/sqlalchemy.py +++ /dev/null @@ -1,20 +0,0 @@ -from sqlalchemy import Column, String - -from src.db.models.helpers import url_id_primary_key_constraint -from src.db.models.mixins import URLDependentMixin, CreatedAtMixin -from src.db.models.templates_.base import Base - - -class ErrorURLScreenshot( - Base, - URLDependentMixin, - CreatedAtMixin, -): - - __tablename__ = "error_url_screenshot" - __table_args__ = ( - url_id_primary_key_constraint(), - ) - - - error = Column(String, nullable=False) \ No newline at end of file diff --git a/src/db/models/impl/url/error_info/pydantic.py b/src/db/models/impl/url/error_info/pydantic.py index 013584cb..3ae4d482 100644 --- a/src/db/models/impl/url/error_info/pydantic.py +++ b/src/db/models/impl/url/error_info/pydantic.py @@ -1,7 +1,5 @@ import datetime -from src.db.models.impl.url.error_info.sqlalchemy import URLErrorInfo -from src.db.models.templates_.base import Base from src.db.templates.markers.bulk.insert import BulkInsertableModel @@ -10,7 +8,3 @@ class URLErrorInfoPydantic(BulkInsertableModel): url_id: int error: str updated_at: datetime.datetime = None - - @classmethod - def sa_model(cls) -> type[Base]: - return URLErrorInfo \ No newline at end of file diff --git a/src/db/models/impl/url/error_info/sqlalchemy.py b/src/db/models/impl/url/error_info/sqlalchemy.py deleted file mode 100644 index 59f6c263..00000000 --- a/src/db/models/impl/url/error_info/sqlalchemy.py +++ /dev/null @@ -1,20 +0,0 @@ -from sqlalchemy import UniqueConstraint, Column, Text -from sqlalchemy.orm import relationship - -from src.db.models.mixins import UpdatedAtMixin, TaskDependentMixin, URLDependentMixin -from src.db.models.templates_.with_id import WithIDBase - - -class URLErrorInfo(UpdatedAtMixin, TaskDependentMixin, URLDependentMixin, WithIDBase): - __tablename__ = 'url_error_info' - __table_args__ = (UniqueConstraint( - "url_id", - "task_id", - name="uq_url_id_error"), - ) - - error = Column(Text, nullable=False) - - # Relationships - url = relationship("URL", back_populates="error_info") - task = relationship("Task", back_populates="errored_urls") diff --git a/src/db/models/impl/url/error/__init__.py b/src/db/models/impl/url/task_error/__init__.py similarity index 100% rename from src/db/models/impl/url/error/__init__.py rename to src/db/models/impl/url/task_error/__init__.py diff --git a/src/db/models/impl/url/error/url_screenshot/__init__.py b/src/db/models/impl/url/task_error/pydantic_/__init__.py similarity index 100% rename from src/db/models/impl/url/error/url_screenshot/__init__.py rename to src/db/models/impl/url/task_error/pydantic_/__init__.py diff --git a/src/db/models/impl/url/task_error/pydantic_/insert.py b/src/db/models/impl/url/task_error/pydantic_/insert.py new file mode 100644 index 00000000..87172ad7 --- /dev/null +++ b/src/db/models/impl/url/task_error/pydantic_/insert.py @@ -0,0 +1,18 @@ +from pydantic import BaseModel + +from src.db.enums import TaskType +from src.db.models.impl.url.task_error.sqlalchemy import URLTaskError +from src.db.models.templates_.base import Base + + +class URLTaskErrorPydantic(BaseModel): + + url_id: int + task_id: int + task_type: TaskType + error: str + + @classmethod + def sa_model(cls) -> type[Base]: + """Defines the SQLAlchemy model.""" + return URLTaskError diff --git a/src/db/models/impl/url/task_error/pydantic_/small.py b/src/db/models/impl/url/task_error/pydantic_/small.py new file mode 100644 index 00000000..ad14458e --- /dev/null +++ b/src/db/models/impl/url/task_error/pydantic_/small.py @@ -0,0 +1,7 @@ +from pydantic import BaseModel + + +class URLTaskErrorSmall(BaseModel): + """Small version of URLTaskErrorPydantic, to be used with the `add_task_errors` method.""" + url_id: int + error: str \ No newline at end of file diff --git a/src/db/models/impl/url/task_error/sqlalchemy.py b/src/db/models/impl/url/task_error/sqlalchemy.py new file mode 100644 index 00000000..3c4ab016 --- /dev/null +++ b/src/db/models/impl/url/task_error/sqlalchemy.py @@ -0,0 +1,23 @@ +from sqlalchemy import String, Column, PrimaryKeyConstraint +from sqlalchemy.orm import Mapped + +from src.db.enums import TaskType +from src.db.models.helpers import enum_column +from src.db.models.mixins import URLDependentMixin, TaskDependentMixin, CreatedAtMixin +from src.db.models.templates_.base import Base + + +class URLTaskError( + Base, + URLDependentMixin, + TaskDependentMixin, + CreatedAtMixin, +): + __tablename__ = "url_task_error" + + task_type: Mapped[TaskType] = enum_column(TaskType, name="task_type") + error: Mapped[str] = Column(String) + + __table_args__ = ( + PrimaryKeyConstraint("url_id", "task_type"), + ) \ No newline at end of file diff --git a/tests/automated/integration/api/test_task.py b/tests/automated/integration/api/test_task.py index 95ebe003..bda246dc 100644 --- a/tests/automated/integration/api/test_task.py +++ b/tests/automated/integration/api/test_task.py @@ -9,7 +9,7 @@ async def task_setup(ath: APITestHelper) -> int: url_ids = [url.url_id for url in iui.url_mappings] task_id = await ath.db_data_creator.task(url_ids=url_ids) - await ath.db_data_creator.error_info(url_ids=[url_ids[0]], task_id=task_id) + await ath.db_data_creator.task_errors(url_ids=[url_ids[0]], task_id=task_id) return task_id diff --git a/tests/automated/integration/api/url/test_get.py b/tests/automated/integration/api/url/test_get.py index c4bb6bbf..8c95c670 100644 --- a/tests/automated/integration/api/url/test_get.py +++ b/tests/automated/integration/api/url/test_get.py @@ -26,7 +26,7 @@ async def test_get_urls(api_test_helper: APITestHelper): url_ids = [iui.url_mappings[1].url_id, iui.url_mappings[2].url_id] # Add errors - await db_data_creator.error_info(url_ids=url_ids) + await db_data_creator.task_errors(url_ids=url_ids) data: GetURLsResponseInfo = api_test_helper.request_validator.get_urls() diff --git a/tests/automated/integration/db/client/test_add_url_error_info.py b/tests/automated/integration/db/client/test_add_url_error_info.py deleted file mode 100644 index bdcdd498..00000000 --- a/tests/automated/integration/db/client/test_add_url_error_info.py +++ /dev/null @@ -1,37 +0,0 @@ -import pytest - -from src.db.client.async_ import AsyncDatabaseClient -from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic -from tests.helpers.data_creator.core import DBDataCreator - - -@pytest.mark.asyncio -async def test_add_url_error_info(db_data_creator: DBDataCreator): - batch_id = db_data_creator.batch() - url_mappings = db_data_creator.urls(batch_id=batch_id, url_count=3).url_mappings - url_ids = [url_mapping.url_id for url_mapping in url_mappings] - - adb_client = AsyncDatabaseClient() - task_id = await db_data_creator.task() - - error_infos = [] - for url_mapping in url_mappings: - uei = URLErrorInfoPydantic( - url_id=url_mapping.url_id, - error="test error", - task_id=task_id - ) - - error_infos.append(uei) - - await adb_client.add_url_error_infos( - url_error_infos=error_infos - ) - - results = await adb_client.get_urls_with_errors() - - assert len(results) == 3 - - for result in results: - assert result.url_id in url_ids - assert result.error == "test error" diff --git a/tests/automated/integration/tasks/scheduled/impl/internet_archives/probe/test_error.py b/tests/automated/integration/tasks/scheduled/impl/internet_archives/probe/test_error.py index 69b3353f..4e1902bb 100644 --- a/tests/automated/integration/tasks/scheduled/impl/internet_archives/probe/test_error.py +++ b/tests/automated/integration/tasks/scheduled/impl/internet_archives/probe/test_error.py @@ -3,10 +3,10 @@ from src.core.tasks.scheduled.impl.internet_archives.probe.operator import InternetArchivesProbeTaskOperator from src.db.client.async_ import AsyncDatabaseClient from src.db.models.impl.flag.checked_for_ia.sqlalchemy import FlagURLCheckedForInternetArchives -from src.db.models.impl.url.error_info.sqlalchemy import URLErrorInfo from src.db.models.impl.url.internet_archives.probe.sqlalchemy import URLInternetArchivesProbeMetadata -from tests.automated.integration.tasks.url.impl.asserts import assert_task_ran_without_error +from src.db.models.impl.url.task_error.sqlalchemy import URLTaskError from tests.automated.integration.tasks.scheduled.impl.internet_archives.probe.setup import add_urls +from tests.automated.integration.tasks.url.impl.asserts import assert_task_ran_without_error @pytest.mark.asyncio @@ -54,7 +54,7 @@ async def test_error(operator: InternetArchivesProbeTaskOperator) -> None: assert len(metadata_list) == 0 # Confirm presence of URL Error Info - url_error_info_list: list[URLErrorInfo] = await adb_client.get_all(URLErrorInfo) + url_error_info_list: list[URLTaskError] = await adb_client.get_all(URLTaskError) assert len(url_error_info_list) == 2 assert {url_error_info.url_id for url_error_info in url_error_info_list} == set(url_ids) assert {url_error_info.error for url_error_info in url_error_info_list} == { diff --git a/tests/automated/integration/tasks/scheduled/impl/internet_archives/save/test_error.py b/tests/automated/integration/tasks/scheduled/impl/internet_archives/save/test_error.py index 0e7939fc..c754cf44 100644 --- a/tests/automated/integration/tasks/scheduled/impl/internet_archives/save/test_error.py +++ b/tests/automated/integration/tasks/scheduled/impl/internet_archives/save/test_error.py @@ -4,7 +4,7 @@ from src.core.tasks.base.run_info import TaskOperatorRunInfo from src.core.tasks.scheduled.impl.internet_archives.save.operator import InternetArchivesSaveTaskOperator -from src.db.models.impl.url.error_info.sqlalchemy import URLErrorInfo +from src.db.models.impl.url.task_error.sqlalchemy import URLTaskError from tests.automated.integration.tasks.scheduled.impl.internet_archives.save.setup import setup_valid_entries from tests.automated.integration.tasks.url.impl.asserts import assert_task_ran_without_error @@ -38,7 +38,7 @@ async def test_error(operator: InternetArchivesSaveTaskOperator): assert_task_ran_without_error(run_info) # Confirm URL Error info was added - url_error_list: list[URLErrorInfo] = await operator.adb_client.get_all(URLErrorInfo) + url_error_list: list[URLTaskError] = await operator.adb_client.get_all(URLTaskError) assert len(url_error_list) == 2 assert {url_error.url_id for url_error in url_error_list} == set(url_ids) assert {url_error.error for url_error in url_error_list} == { diff --git a/tests/automated/integration/tasks/url/impl/auto_relevant/test_task.py b/tests/automated/integration/tasks/url/impl/auto_relevant/test_task.py index 5943213b..bf53bbf5 100644 --- a/tests/automated/integration/tasks/url/impl/auto_relevant/test_task.py +++ b/tests/automated/integration/tasks/url/impl/auto_relevant/test_task.py @@ -4,8 +4,8 @@ from src.collectors.enums import URLStatus from src.db.models.impl.url.core.sqlalchemy import URL -from src.db.models.impl.url.error_info.sqlalchemy import URLErrorInfo from src.db.models.impl.url.suggestion.relevant.auto.sqlalchemy import AutoRelevantSuggestion +from src.db.models.impl.url.task_error.sqlalchemy import URLTaskError from tests.automated.integration.tasks.url.impl.asserts import assert_prereqs_not_met, assert_prereqs_met from tests.automated.integration.tasks.url.impl.auto_relevant.setup import setup_operator, setup_urls from tests.helpers.asserts import assert_task_run_success @@ -44,7 +44,7 @@ async def test_url_auto_relevant_task(db_data_creator): assert suggestion.model_name == "test_model" # Confirm presence of url error - errors = await adb_client.get_all(URLErrorInfo) + errors = await adb_client.get_all(URLTaskError) assert len(errors) == 1 diff --git a/tests/automated/integration/tasks/url/impl/location_identification/subtasks/nlp_location_frequency/end_to_end/test_core.py b/tests/automated/integration/tasks/url/impl/location_identification/subtasks/nlp_location_frequency/end_to_end/test_core.py index 2042a588..f8f0c821 100644 --- a/tests/automated/integration/tasks/url/impl/location_identification/subtasks/nlp_location_frequency/end_to_end/test_core.py +++ b/tests/automated/integration/tasks/url/impl/location_identification/subtasks/nlp_location_frequency/end_to_end/test_core.py @@ -10,11 +10,11 @@ from src.core.tasks.url.operators.location_id.subtasks.models.suggestion import LocationSuggestion from src.db.client.async_ import AsyncDatabaseClient from src.db.models.impl.link.task_url import LinkTaskURL -from src.db.models.impl.url.error_info.sqlalchemy import URLErrorInfo from src.db.models.impl.url.suggestion.location.auto.subtask.enums import LocationIDSubtaskType from src.db.models.impl.url.suggestion.location.auto.subtask.pydantic import AutoLocationIDSubtaskPydantic from src.db.models.impl.url.suggestion.location.auto.subtask.sqlalchemy import AutoLocationIDSubtask from src.db.models.impl.url.suggestion.location.auto.suggestion.sqlalchemy import LocationIDSubtaskSuggestion +from src.db.models.impl.url.task_error.sqlalchemy import URLTaskError from tests.helpers.asserts import assert_task_run_success from tests.helpers.data_creator.core import DBDataCreator from tests.helpers.data_creator.models.creation_info.county import CountyCreationInfo @@ -101,7 +101,7 @@ async def mock_process_inputs( # Confirm one URL error info - error_infos: list[URLErrorInfo] = await adb_client.get_all(URLErrorInfo) + error_infos: list[URLTaskError] = await adb_client.get_all(URLTaskError) assert len(error_infos) == 1 assert error_infos[0].task_id == operator._task_id assert error_infos[0].url_id == error_url_id diff --git a/tests/automated/integration/tasks/url/impl/screenshot/test_core.py b/tests/automated/integration/tasks/url/impl/screenshot/test_core.py index cb627f72..6f54fbf9 100644 --- a/tests/automated/integration/tasks/url/impl/screenshot/test_core.py +++ b/tests/automated/integration/tasks/url/impl/screenshot/test_core.py @@ -3,11 +3,9 @@ import pytest from src.core.tasks.url.operators.screenshot.core import URLScreenshotTaskOperator -from src.core.tasks.url.operators.screenshot.models.outcome import URLScreenshotOutcome from src.db.dtos.url.mapping import URLMapping -from src.db.models.impl.url.error.url_screenshot.sqlalchemy import ErrorURLScreenshot -from src.db.models.impl.url.error_info.sqlalchemy import URLErrorInfo from src.db.models.impl.url.screenshot.sqlalchemy import URLScreenshot +from src.db.models.impl.url.task_error.sqlalchemy import URLTaskError from src.external.url_request.dtos.screenshot_response import URLScreenshotResponse from tests.helpers.data_creator.core import DBDataCreator from tests.helpers.run import run_task_and_confirm_success @@ -66,7 +64,7 @@ async def test_core( assert screenshots[0].url_id == screenshot_mapping.url_id # Get errors from database, confirm only one - errors: list[ErrorURLScreenshot] = await db_data_creator.adb_client.get_all(ErrorURLScreenshot) + errors: list[URLTaskError] = await db_data_creator.adb_client.get_all(URLTaskError) assert len(errors) == 1 assert errors[0].url_id == error_mapping.url_id diff --git a/tests/automated/integration/tasks/url/impl/submit_approved/test_submit_approved_url_task.py b/tests/automated/integration/tasks/url/impl/submit_approved/test_submit_approved_url_task.py index abe2c37d..43d7fc8d 100644 --- a/tests/automated/integration/tasks/url/impl/submit_approved/test_submit_approved_url_task.py +++ b/tests/automated/integration/tasks/url/impl/submit_approved/test_submit_approved_url_task.py @@ -1,17 +1,16 @@ import pytest from deepdiff import DeepDiff +from pdap_access_manager import RequestInfo, RequestType, DataSourcesNamespaces -from src.core.tasks.url.operators.submit_approved.core import SubmitApprovedURLTaskOperator -from src.db.enums import TaskType -from src.db.models.impl.url.error_info.sqlalchemy import URLErrorInfo -from src.db.models.impl.url.data_source.sqlalchemy import URLDataSource -from src.db.models.impl.url.core.sqlalchemy import URL from src.collectors.enums import URLStatus from src.core.tasks.url.enums import TaskOperatorOutcome +from src.core.tasks.url.operators.submit_approved.core import SubmitApprovedURLTaskOperator +from src.db.models.impl.url.core.sqlalchemy import URL +from src.db.models.impl.url.data_source.sqlalchemy import URLDataSource +from src.db.models.impl.url.task_error.sqlalchemy import URLTaskError +from src.external.pdap.client import PDAPClient from tests.automated.integration.tasks.url.impl.submit_approved.mock import mock_make_request from tests.automated.integration.tasks.url.impl.submit_approved.setup import setup_validated_urls -from pdap_access_manager import RequestInfo, RequestType, DataSourcesNamespaces -from src.external.pdap.client import PDAPClient @pytest.mark.asyncio @@ -78,7 +77,7 @@ async def test_submit_approved_url_task( assert url_data_source_2.data_source_id == 34 # Check that errored URL has entry in url_error_info - url_errors = await db_data_creator.adb_client.get_all(URLErrorInfo) + url_errors = await db_data_creator.adb_client.get_all(URLTaskError) assert len(url_errors) == 1 url_error = url_errors[0] assert url_error.url_id == url_3.id diff --git a/tests/helpers/data_creator/core.py b/tests/helpers/data_creator/core.py index ea58562b..6cb3a271 100644 --- a/tests/helpers/data_creator/core.py +++ b/tests/helpers/data_creator/core.py @@ -24,7 +24,6 @@ from src.db.models.impl.link.user_suggestion_not_found.agency.sqlalchemy import LinkUserSuggestionAgencyNotFound from src.db.models.impl.link.user_suggestion_not_found.location.sqlalchemy import LinkUserSuggestionLocationNotFound from src.db.models.impl.url.core.enums import URLSource -from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic from src.db.models.impl.url.html.compressed.sqlalchemy import URLCompressedHTML from src.db.models.impl.url.suggestion.location.auto.subtask.enums import LocationIDSubtaskType from src.db.models.impl.url.suggestion.location.auto.subtask.sqlalchemy import AutoLocationIDSubtask @@ -32,6 +31,7 @@ from src.db.models.impl.url.suggestion.location.user.sqlalchemy import UserLocationSuggestion from src.db.models.impl.url.suggestion.name.enums import NameSuggestionSource from src.db.models.impl.url.suggestion.name.sqlalchemy import URLNameSuggestion +from src.db.models.impl.url.task_error.pydantic_.insert import URLTaskErrorPydantic from src.db.models.impl.url.web_metadata.sqlalchemy import URLWebMetadata from tests.helpers.batch_creation_parameters.core import TestBatchCreationParameters from tests.helpers.batch_creation_parameters.enums import URLCreationEnum @@ -321,22 +321,23 @@ async def html_data(self, url_ids: list[int]) -> None: ) await self.run_command(command) - async def error_info( + async def task_errors( self, url_ids: list[int], task_id: Optional[int] = None ) -> None: if task_id is None: task_id = await self.task() - error_infos = [] + task_errors = [] for url_id in url_ids: - url_error_info = URLErrorInfoPydantic( + task_error = URLTaskErrorPydantic( url_id=url_id, error="test error", - task_id=task_id + task_id=task_id, + task_type=TaskType.HTML ) - error_infos.append(url_error_info) - await self.adb_client.add_url_error_infos(error_infos) + task_errors.append(task_error) + await self.adb_client.bulk_insert(task_errors) async def agency_auto_suggestions( From 887b859b32140cea62f532c2475376ce67314004 Mon Sep 17 00:00:00 2001 From: Max Chis Date: Sat, 4 Oct 2025 15:34:15 -0400 Subject: [PATCH 2/2] Add task cleanup and revise --- src/api/endpoints/task/by_id/query.py | 3 +- src/api/endpoints/url/get/query.py | 2 +- .../huggingface/queries/check/requester.py | 9 ++-- .../internet_archives/probe/queries/prereq.py | 7 ++- .../subtasks/templates/subtask.py | 17 +++++++ .../url/operators/auto_name/queries/cte.py | 6 ++- .../tasks/url/operators/auto_relevant/core.py | 8 +++- .../operators/auto_relevant/queries/cte.py | 39 ++++++++++++++++ .../queries/{get_tdos.py => get.py} | 17 +++---- .../operators/auto_relevant/queries/prereq.py | 18 ++++++++ .../location_id/subtasks/templates/subtask.py | 21 +++++++-- .../tasks/url/operators/misc_metadata/core.py | 15 +++++-- src/core/tasks/url/operators/probe/core.py | 12 ++++- .../probe/queries/urls/not_probed/exists.py | 9 ++-- .../url/operators/submit_approved/convert.py | 19 ++++++++ .../url/operators/submit_approved/core.py | 37 +++++----------- .../url/operators/submit_approved/filter.py | 11 +++++ .../operators/submit_approved/queries/cte.py | 7 +-- .../operators/submit_meta_urls/queries/cte.py | 5 ++- .../url/operators/suspend/queries/cte.py | 3 +- src/db/client/async_.py | 44 +------------------ src/db/helpers/query.py | 7 +++ src/db/models/impl/url/core/sqlalchemy.py | 3 +- .../integration/tasks/url/impl/asserts.py | 7 +-- .../tasks/url/impl/auto_relevant/test_task.py | 12 ++--- .../test_submit_approved_url_task.py | 5 --- 26 files changed, 211 insertions(+), 132 deletions(-) create mode 100644 src/core/tasks/url/operators/auto_relevant/queries/cte.py rename src/core/tasks/url/operators/auto_relevant/queries/{get_tdos.py => get.py} (76%) create mode 100644 src/core/tasks/url/operators/auto_relevant/queries/prereq.py create mode 100644 src/core/tasks/url/operators/submit_approved/convert.py create mode 100644 src/core/tasks/url/operators/submit_approved/filter.py diff --git a/src/api/endpoints/task/by_id/query.py b/src/api/endpoints/task/by_id/query.py index c7ccf353..92487327 100644 --- a/src/api/endpoints/task/by_id/query.py +++ b/src/api/endpoints/task/by_id/query.py @@ -28,10 +28,11 @@ async def run(self, session: AsyncSession) -> TaskInfo: selectinload(Task.urls) .selectinload(URL.batch), selectinload(Task.url_errors), + selectinload(Task.errors) ) ) task = result.scalars().first() - error = task.url_errors[0].error if len(task.url_errors) > 0 else None + error = task.errors[0].error if len(task.errors) > 0 else None # Get error info if any # Get URLs urls = task.urls diff --git a/src/api/endpoints/url/get/query.py b/src/api/endpoints/url/get/query.py index d476624e..d7198612 100644 --- a/src/api/endpoints/url/get/query.py +++ b/src/api/endpoints/url/get/query.py @@ -43,7 +43,7 @@ async def run(self, session: AsyncSession) -> GetURLsResponseInfo: error_result = GetURLsResponseErrorInfo( task=error.task_type, error=error.error, - updated_at=error.updated_at + updated_at=error.created_at ) error_results.append(error_result) final_results.append( diff --git a/src/core/tasks/scheduled/impl/huggingface/queries/check/requester.py b/src/core/tasks/scheduled/impl/huggingface/queries/check/requester.py index 25124c95..ef43bd3d 100644 --- a/src/core/tasks/scheduled/impl/huggingface/queries/check/requester.py +++ b/src/core/tasks/scheduled/impl/huggingface/queries/check/requester.py @@ -6,6 +6,8 @@ from sqlalchemy.sql.functions import count from src.collectors.enums import URLStatus +from src.db.enums import TaskType +from src.db.helpers.query import not_exists_url, no_url_task_error, exists_url from src.db.helpers.session import session_helper as sh from src.db.models.impl.flag.url_validated.sqlalchemy import FlagURLValidated from src.db.models.impl.state.huggingface import HuggingFaceUploadState @@ -36,12 +38,9 @@ async def has_valid_urls(self, last_upload_at: datetime | None) -> bool: URLCompressedHTML, URL.id == URLCompressedHTML.url_id ) - .outerjoin( - FlagURLValidated, - URL.id == FlagURLValidated.url_id - ) .where( - FlagURLValidated.url_id.isnot(None) + exists_url(FlagURLValidated), + no_url_task_error(TaskType.PUSH_TO_HUGGINGFACE) ) ) if last_upload_at is not None: diff --git a/src/core/tasks/scheduled/impl/internet_archives/probe/queries/prereq.py b/src/core/tasks/scheduled/impl/internet_archives/probe/queries/prereq.py index a74dc0a6..7a7d8687 100644 --- a/src/core/tasks/scheduled/impl/internet_archives/probe/queries/prereq.py +++ b/src/core/tasks/scheduled/impl/internet_archives/probe/queries/prereq.py @@ -1,6 +1,7 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession +from src.db.helpers.query import not_exists_url from src.db.models.impl.flag.checked_for_ia.sqlalchemy import FlagURLCheckedForInternetArchives from src.db.models.impl.url.core.sqlalchemy import URL from src.db.queries.base.builder import QueryBuilderBase @@ -12,11 +13,9 @@ class CheckURLInternetArchivesTaskPrerequisitesQueryBuilder(QueryBuilderBase): async def run(self, session: AsyncSession) -> bool: query = ( select(URL) - .outerjoin( - FlagURLCheckedForInternetArchives, - URL.id == FlagURLCheckedForInternetArchives.url_id + .where( + not_exists_url(FlagURLCheckedForInternetArchives) ) - .where(FlagURLCheckedForInternetArchives.url_id.is_(None)) .limit(1) ) result = await sh.one_or_none(session, query=query) diff --git a/src/core/tasks/url/operators/agency_identification/subtasks/templates/subtask.py b/src/core/tasks/url/operators/agency_identification/subtasks/templates/subtask.py index f24b9113..9335afcf 100644 --- a/src/core/tasks/url/operators/agency_identification/subtasks/templates/subtask.py +++ b/src/core/tasks/url/operators/agency_identification/subtasks/templates/subtask.py @@ -5,8 +5,10 @@ from src.core.tasks.url.operators.agency_identification.subtasks.models.run_info import AgencyIDSubtaskRunInfo from src.core.tasks.url.operators.agency_identification.subtasks.models.subtask import AutoAgencyIDSubtaskData from src.db.client.async_ import AsyncDatabaseClient +from src.db.enums import TaskType from src.db.models.impl.url.suggestion.agency.subtask.pydantic import URLAutoAgencyIDSubtaskPydantic from src.db.models.impl.url.suggestion.agency.suggestion.pydantic import AgencyIDSubtaskSuggestionPydantic +from src.db.models.impl.url.task_error.pydantic_.insert import URLTaskErrorPydantic from src.db.models.impl.url.task_error.pydantic_.small import URLTaskErrorSmall @@ -77,3 +79,18 @@ async def _upload_subtask_data( error_infos.append(error_info) await self.add_task_errors(error_infos) + + async def add_task_errors( + self, + errors: list[URLTaskErrorSmall] + ) -> None: + inserts: list[URLTaskErrorPydantic] = [ + URLTaskErrorPydantic( + task_id=self.task_id, + url_id=error.url_id, + task_type=TaskType.AGENCY_IDENTIFICATION, + error=error.error + ) + for error in errors + ] + await self.adb_client.bulk_insert(inserts) \ No newline at end of file diff --git a/src/core/tasks/url/operators/auto_name/queries/cte.py b/src/core/tasks/url/operators/auto_name/queries/cte.py index 5dc585bc..1c7fc503 100644 --- a/src/core/tasks/url/operators/auto_name/queries/cte.py +++ b/src/core/tasks/url/operators/auto_name/queries/cte.py @@ -1,6 +1,7 @@ from sqlalchemy import select, exists, CTE, Column -from src.db.enums import URLHTMLContentType +from src.db.enums import URLHTMLContentType, TaskType +from src.db.helpers.query import no_url_task_error from src.db.models.impl.url.core.sqlalchemy import URL from src.db.models.impl.url.html.content.sqlalchemy import URLHTMLContent from src.db.models.impl.url.suggestion.name.enums import NameSuggestionSource @@ -29,7 +30,8 @@ def __init__(self): URLNameSuggestion.url_id == URL.id, URLNameSuggestion.source == NameSuggestionSource.HTML_METADATA_TITLE.value, ) - ) + ), + no_url_task_error(TaskType.AUTO_NAME) ).cte("auto_name_prerequisites") ) diff --git a/src/core/tasks/url/operators/auto_relevant/core.py b/src/core/tasks/url/operators/auto_relevant/core.py index b5055d38..86cc179e 100644 --- a/src/core/tasks/url/operators/auto_relevant/core.py +++ b/src/core/tasks/url/operators/auto_relevant/core.py @@ -1,5 +1,7 @@ from src.core.tasks.url.operators.auto_relevant.models.annotation import RelevanceAnnotationInfo from src.core.tasks.url.operators.auto_relevant.models.tdo import URLRelevantTDO +from src.core.tasks.url.operators.auto_relevant.queries.get import GetAutoRelevantTDOsQueryBuilder +from src.core.tasks.url.operators.auto_relevant.queries.prereq import AutoRelevantPrerequisitesQueryBuilder from src.core.tasks.url.operators.auto_relevant.sort import separate_success_and_error_subsets from src.core.tasks.url.operators.base import URLTaskOperatorBase from src.db.client.async_ import AsyncDatabaseClient @@ -26,10 +28,12 @@ def task_type(self) -> TaskType: return TaskType.RELEVANCY async def meets_task_prerequisites(self) -> bool: - return await self.adb_client.has_urls_with_html_data_and_without_auto_relevant_suggestion() + return await self.adb_client.run_query_builder( + builder=AutoRelevantPrerequisitesQueryBuilder() + ) async def get_tdos(self) -> list[URLRelevantTDO]: - return await self.adb_client.get_tdos_for_auto_relevancy() + return await self.adb_client.run_query_builder(builder=GetAutoRelevantTDOsQueryBuilder()) async def inner_task_logic(self) -> None: tdos = await self.get_tdos() diff --git a/src/core/tasks/url/operators/auto_relevant/queries/cte.py b/src/core/tasks/url/operators/auto_relevant/queries/cte.py new file mode 100644 index 00000000..8ad33867 --- /dev/null +++ b/src/core/tasks/url/operators/auto_relevant/queries/cte.py @@ -0,0 +1,39 @@ +from sqlalchemy import select, CTE +from sqlalchemy.orm import aliased + +from src.collectors.enums import URLStatus +from src.db.enums import TaskType +from src.db.helpers.query import not_exists_url, no_url_task_error +from src.db.models.impl.url.core.sqlalchemy import URL +from src.db.models.impl.url.html.compressed.sqlalchemy import URLCompressedHTML +from src.db.models.impl.url.suggestion.relevant.auto.sqlalchemy import AutoRelevantSuggestion + + +class AutoRelevantPrerequisitesCTEContainer: + + def __init__(self): + self._cte = ( + select( + URL + ) + .join( + URLCompressedHTML, + URL.id == URLCompressedHTML.url_id + ) + .where( + URL.status == URLStatus.OK.value, + not_exists_url(AutoRelevantSuggestion), + no_url_task_error(TaskType.RELEVANCY) + ).cte("auto_relevant_prerequisites") + ) + + self._url_alias = aliased(URL, self._cte) + + @property + def cte(self) -> CTE: + return self._cte + + @property + def url_alias(self): + """Return an ORM alias of URL mapped to the CTE.""" + return self._url_alias diff --git a/src/core/tasks/url/operators/auto_relevant/queries/get_tdos.py b/src/core/tasks/url/operators/auto_relevant/queries/get.py similarity index 76% rename from src/core/tasks/url/operators/auto_relevant/queries/get_tdos.py rename to src/core/tasks/url/operators/auto_relevant/queries/get.py index 384cb5c4..6f6c59b0 100644 --- a/src/core/tasks/url/operators/auto_relevant/queries/get_tdos.py +++ b/src/core/tasks/url/operators/auto_relevant/queries/get.py @@ -6,6 +6,7 @@ from src.collectors.enums import URLStatus from src.core.tasks.url.operators.auto_relevant.models.tdo import URLRelevantTDO +from src.core.tasks.url.operators.auto_relevant.queries.cte import AutoRelevantPrerequisitesCTEContainer from src.db.models.impl.url.html.compressed.sqlalchemy import URLCompressedHTML from src.db.models.impl.url.core.sqlalchemy import URL from src.db.models.impl.url.suggestion.relevant.auto.sqlalchemy import AutoRelevantSuggestion @@ -16,24 +17,16 @@ class GetAutoRelevantTDOsQueryBuilder(QueryBuilderBase): - def __init__(self): - super().__init__() - async def run(self, session: AsyncSession) -> list[URLRelevantTDO]: + cte = AutoRelevantPrerequisitesCTEContainer() query = ( - select(URL) + select(cte.url_alias) .options( - selectinload(URL.compressed_html) - ) - .join(URLCompressedHTML) - .outerjoin(AutoRelevantSuggestion) - .where( - URL.status == URLStatus.OK.value, - AutoRelevantSuggestion.id.is_(None), + selectinload(cte.url_alias.compressed_html) ) ) - query = query.limit(100).order_by(URL.id) + query = query.limit(100).order_by(cte.url_alias.id) raw_result = await session.execute(query) urls: Sequence[Row[URL]] = raw_result.unique().scalars().all() tdos = [] diff --git a/src/core/tasks/url/operators/auto_relevant/queries/prereq.py b/src/core/tasks/url/operators/auto_relevant/queries/prereq.py new file mode 100644 index 00000000..2736693e --- /dev/null +++ b/src/core/tasks/url/operators/auto_relevant/queries/prereq.py @@ -0,0 +1,18 @@ + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from src.core.tasks.url.operators.auto_relevant.queries.cte import AutoRelevantPrerequisitesCTEContainer +from src.db.queries.base.builder import QueryBuilderBase +from src.db.helpers.session import session_helper as sh + +class AutoRelevantPrerequisitesQueryBuilder(QueryBuilderBase): + + async def run(self, session: AsyncSession) -> bool: + + cte = AutoRelevantPrerequisitesCTEContainer() + query = ( + select(cte.url_alias) + ) + + return await sh.results_exist(session, query=query) \ No newline at end of file diff --git a/src/core/tasks/url/operators/location_id/subtasks/templates/subtask.py b/src/core/tasks/url/operators/location_id/subtasks/templates/subtask.py index 2429f428..8ee856c2 100644 --- a/src/core/tasks/url/operators/location_id/subtasks/templates/subtask.py +++ b/src/core/tasks/url/operators/location_id/subtasks/templates/subtask.py @@ -6,8 +6,10 @@ from src.core.tasks.url.operators.location_id.subtasks.models.subtask import AutoLocationIDSubtaskData from src.core.tasks.url.operators.location_id.subtasks.models.suggestion import LocationSuggestion from src.db.client.async_ import AsyncDatabaseClient +from src.db.enums import TaskType from src.db.models.impl.url.suggestion.location.auto.subtask.pydantic import AutoLocationIDSubtaskPydantic from src.db.models.impl.url.suggestion.location.auto.suggestion.pydantic import LocationIDSubtaskSuggestionPydantic +from src.db.models.impl.url.task_error.pydantic_.insert import URLTaskErrorPydantic from src.db.models.impl.url.task_error.pydantic_.small import URLTaskErrorSmall @@ -78,6 +80,19 @@ async def _upload_subtask_data( ) error_infos.append(error_info) - await self.add_task_errors( - models=error_infos, - ) + await self.add_task_errors(error_infos) + + async def add_task_errors( + self, + errors: list[URLTaskErrorSmall] + ) -> None: + inserts: list[URLTaskErrorPydantic] = [ + URLTaskErrorPydantic( + task_id=self.task_id, + url_id=error.url_id, + task_type=TaskType.LOCATION_ID, + error=error.error + ) + for error in errors + ] + await self.adb_client.bulk_insert(inserts) \ No newline at end of file diff --git a/src/core/tasks/url/operators/misc_metadata/core.py b/src/core/tasks/url/operators/misc_metadata/core.py index cd45d90e..1db953d4 100644 --- a/src/core/tasks/url/operators/misc_metadata/core.py +++ b/src/core/tasks/url/operators/misc_metadata/core.py @@ -1,5 +1,9 @@ from src.collectors.enums import CollectorType from src.core.tasks.url.operators.base import URLTaskOperatorBase +from src.core.tasks.url.operators.misc_metadata.queries.get_pending_urls_missing_miscellaneous_data import \ + GetPendingURLsMissingMiscellaneousDataQueryBuilder +from src.core.tasks.url.operators.misc_metadata.queries.has_pending_urls_missing_miscellaneous_data import \ + HasPendingURsMissingMiscellaneousDataQueryBuilder from src.core.tasks.url.operators.misc_metadata.tdo import URLMiscellaneousMetadataTDO from src.core.tasks.url.subtasks.miscellaneous_metadata.auto_googler import AutoGooglerMiscMetadataSubtask from src.core.tasks.url.subtasks.miscellaneous_metadata.base import \ @@ -24,7 +28,7 @@ def task_type(self) -> TaskType: return TaskType.MISC_METADATA async def meets_task_prerequisites(self) -> bool: - return await self.adb_client.has_pending_urls_missing_miscellaneous_metadata() + return await self.adb_client.run_query_builder(HasPendingURsMissingMiscellaneousDataQueryBuilder()) async def get_subtask( self, @@ -56,7 +60,7 @@ async def html_default_logic(self, tdo: URLMiscellaneousMetadataTDO): tdo.description = tdo.html_metadata_info.description async def inner_task_logic(self) -> None: - tdos: list[URLMiscellaneousMetadataTDO] = await self.adb_client.get_pending_urls_missing_miscellaneous_metadata() + tdos: list[URLMiscellaneousMetadataTDO] = await self.get_pending_urls_missing_miscellaneous_metadata() await self.link_urls_to_task(url_ids=[tdo.url_id for tdo in tdos]) task_errors: list[URLTaskErrorSmall] = [] @@ -74,4 +78,9 @@ async def inner_task_logic(self) -> None: task_errors.append(error_info) await self.adb_client.add_miscellaneous_metadata(tdos) - await self.add_task_errors(task_errors) \ No newline at end of file + await self.add_task_errors(task_errors) + + async def get_pending_urls_missing_miscellaneous_metadata( + self, + ) -> list[URLMiscellaneousMetadataTDO]: + return await self.adb_client.run_query_builder(GetPendingURLsMissingMiscellaneousDataQueryBuilder()) diff --git a/src/core/tasks/url/operators/probe/core.py b/src/core/tasks/url/operators/probe/core.py index ab518bcd..0e091852 100644 --- a/src/core/tasks/url/operators/probe/core.py +++ b/src/core/tasks/url/operators/probe/core.py @@ -5,6 +5,8 @@ from src.core.tasks.url.operators.probe.convert import convert_tdo_to_web_metadata_list from src.core.tasks.url.operators.probe.filter import filter_non_redirect_tdos, filter_redirect_tdos from src.core.tasks.url.operators.probe.queries.insert_redirects.query import InsertRedirectsQueryBuilder +from src.core.tasks.url.operators.probe.queries.urls.not_probed.exists import HasURLsWithoutProbeQueryBuilder +from src.core.tasks.url.operators.probe.queries.urls.not_probed.get.query import GetURLsWithoutProbeQueryBuilder from src.core.tasks.url.operators.probe.tdo import URLProbeTDO from src.external.url_request.core import URLRequestInterface from src.db.client.async_ import AsyncDatabaseClient @@ -30,10 +32,12 @@ def task_type(self) -> TaskType: @override async def meets_task_prerequisites(self) -> bool: - return await self.adb_client.has_urls_without_probe() + return await self.has_urls_without_probe() async def get_urls_without_probe(self) -> list[URLProbeTDO]: - url_mappings: list[URLMapping] = await self.adb_client.get_urls_without_probe() + url_mappings: list[URLMapping] = await self.adb_client.run_query_builder( + GetURLsWithoutProbeQueryBuilder() + ) return [URLProbeTDO(url_mapping=url_mapping) for url_mapping in url_mappings] @override @@ -73,4 +77,8 @@ async def update_database(self, tdos: list[URLProbeTDO]) -> None: await self.adb_client.run_query_builder(query_builder) + async def has_urls_without_probe(self) -> bool: + return await self.adb_client.run_query_builder( + HasURLsWithoutProbeQueryBuilder() + ) diff --git a/src/core/tasks/url/operators/probe/queries/urls/not_probed/exists.py b/src/core/tasks/url/operators/probe/queries/urls/not_probed/exists.py index 99c4cc67..c1b9b723 100644 --- a/src/core/tasks/url/operators/probe/queries/urls/not_probed/exists.py +++ b/src/core/tasks/url/operators/probe/queries/urls/not_probed/exists.py @@ -2,6 +2,8 @@ from sqlalchemy.ext.asyncio import AsyncSession from typing_extensions import override, final +from src.db.enums import TaskType +from src.db.helpers.query import not_exists_url, no_url_task_error from src.db.helpers.session import session_helper as sh from src.db.models.impl.url.core.sqlalchemy import URL from src.db.models.impl.url.web_metadata.sqlalchemy import URLWebMetadata @@ -16,12 +18,9 @@ async def run(self, session: AsyncSession) -> bool: select( URL.id ) - .outerjoin( - URLWebMetadata, - URL.id == URLWebMetadata.url_id - ) .where( - URLWebMetadata.id.is_(None) + not_exists_url(URLWebMetadata), + no_url_task_error(TaskType.PROBE_URL) ) ) return await sh.has_results(session, query=query) diff --git a/src/core/tasks/url/operators/submit_approved/convert.py b/src/core/tasks/url/operators/submit_approved/convert.py new file mode 100644 index 00000000..1c4a8298 --- /dev/null +++ b/src/core/tasks/url/operators/submit_approved/convert.py @@ -0,0 +1,19 @@ +from src.core.tasks.url.operators.submit_approved.tdo import SubmittedURLInfo +from src.db.models.impl.url.task_error.pydantic_.small import URLTaskErrorSmall + + +async def convert_to_task_errors( + submitted_url_infos: list[SubmittedURLInfo] +) -> list[URLTaskErrorSmall]: + task_errors: list[URLTaskErrorSmall] = [] + error_response_objects = [ + response_object for response_object in submitted_url_infos + if response_object.request_error is not None + ] + for error_response_object in error_response_objects: + error_info = URLTaskErrorSmall( + url_id=error_response_object.url_id, + error=error_response_object.request_error, + ) + task_errors.append(error_info) + return task_errors diff --git a/src/core/tasks/url/operators/submit_approved/core.py b/src/core/tasks/url/operators/submit_approved/core.py index a09b0462..e16a1269 100644 --- a/src/core/tasks/url/operators/submit_approved/core.py +++ b/src/core/tasks/url/operators/submit_approved/core.py @@ -1,4 +1,8 @@ from src.core.tasks.url.operators.base import URLTaskOperatorBase +from src.core.tasks.url.operators.submit_approved.convert import convert_to_task_errors +from src.core.tasks.url.operators.submit_approved.filter import filter_successes +from src.core.tasks.url.operators.submit_approved.queries.get import GetValidatedURLsQueryBuilder +from src.core.tasks.url.operators.submit_approved.queries.has_validated import HasValidatedURLsQueryBuilder from src.core.tasks.url.operators.submit_approved.tdo import SubmitApprovedURLTDO, SubmittedURLInfo from src.db.client.async_ import AsyncDatabaseClient from src.db.enums import TaskType @@ -21,11 +25,11 @@ def task_type(self): return TaskType.SUBMIT_APPROVED async def meets_task_prerequisites(self): - return await self.adb_client.has_validated_urls() + return await self.adb_client.run_query_builder(HasValidatedURLsQueryBuilder()) async def inner_task_logic(self): # Retrieve all URLs that are validated and not submitted - tdos: list[SubmitApprovedURLTDO] = await self.adb_client.get_validated_urls() + tdos: list[SubmitApprovedURLTDO] = await self.get_validated_urls() # Link URLs to this task await self.link_urls_to_task(url_ids=[tdo.url_id for tdo in tdos]) @@ -33,8 +37,8 @@ async def inner_task_logic(self): # Submit each URL, recording errors if they exist submitted_url_infos: list[SubmittedURLInfo] = await self.pdap_client.submit_data_source_urls(tdos) - task_errors: list[URLTaskErrorSmall] = await self.get_error_infos(submitted_url_infos) - success_infos = await self.get_success_infos(submitted_url_infos) + task_errors: list[URLTaskErrorSmall] = await convert_to_task_errors(submitted_url_infos) + success_infos = await filter_successes(submitted_url_infos) # Update the database for successful submissions await self.adb_client.mark_urls_as_submitted(infos=success_infos) @@ -42,26 +46,5 @@ async def inner_task_logic(self): # Update the database for failed submissions await self.add_task_errors(task_errors) - async def get_success_infos(self, submitted_url_infos): - success_infos = [ - response_object for response_object in submitted_url_infos - if response_object.data_source_id is not None - ] - return success_infos - - async def get_error_infos( - self, - submitted_url_infos: list[SubmittedURLInfo] - ) -> list[URLTaskErrorSmall]: - task_errors: list[URLTaskErrorSmall] = [] - error_response_objects = [ - response_object for response_object in submitted_url_infos - if response_object.request_error is not None - ] - for error_response_object in error_response_objects: - error_info = URLTaskErrorSmall( - url_id=error_response_object.url_id, - error=error_response_object.request_error, - ) - task_errors.append(error_info) - return task_errors + async def get_validated_urls(self) -> list[SubmitApprovedURLTDO]: + return await self.adb_client.run_query_builder(GetValidatedURLsQueryBuilder()) diff --git a/src/core/tasks/url/operators/submit_approved/filter.py b/src/core/tasks/url/operators/submit_approved/filter.py new file mode 100644 index 00000000..4ba2fad8 --- /dev/null +++ b/src/core/tasks/url/operators/submit_approved/filter.py @@ -0,0 +1,11 @@ +from src.core.tasks.url.operators.submit_approved.tdo import SubmittedURLInfo + + +async def filter_successes( + submitted_url_infos: list[SubmittedURLInfo] +) -> list[SubmittedURLInfo]: + success_infos = [ + response_object for response_object in submitted_url_infos + if response_object.data_source_id is not None + ] + return success_infos diff --git a/src/core/tasks/url/operators/submit_approved/queries/cte.py b/src/core/tasks/url/operators/submit_approved/queries/cte.py index 5d883429..cf7ccb71 100644 --- a/src/core/tasks/url/operators/submit_approved/queries/cte.py +++ b/src/core/tasks/url/operators/submit_approved/queries/cte.py @@ -2,6 +2,8 @@ from sqlalchemy.orm import aliased from src.collectors.enums import URLStatus +from src.db.enums import TaskType +from src.db.helpers.query import not_exists_url, no_url_task_error from src.db.models.impl.flag.url_validated.enums import URLType from src.db.models.impl.flag.url_validated.sqlalchemy import FlagURLValidated from src.db.models.impl.url.core.sqlalchemy import URL @@ -17,9 +19,8 @@ URL.status == URLStatus.OK, URL.name.isnot(None), FlagURLValidated.type == URLType.DATA_SOURCE, - ~exists().where( - URLDataSource.url_id == URL.id - ) + not_exists_url(URLDataSource), + no_url_task_error(TaskType.SUBMIT_APPROVED) ) .subquery() ) diff --git a/src/core/tasks/url/operators/submit_meta_urls/queries/cte.py b/src/core/tasks/url/operators/submit_meta_urls/queries/cte.py index 89d18c82..d350258c 100644 --- a/src/core/tasks/url/operators/submit_meta_urls/queries/cte.py +++ b/src/core/tasks/url/operators/submit_meta_urls/queries/cte.py @@ -1,5 +1,7 @@ from sqlalchemy import select, exists, Column, CTE +from src.db.enums import TaskType +from src.db.helpers.query import no_url_task_error from src.db.models.impl.agency.sqlalchemy import Agency from src.db.models.impl.link.url_agency.sqlalchemy import LinkURLAgency from src.db.models.impl.url.core.sqlalchemy import URL @@ -36,7 +38,8 @@ def __init__(self): URLDSMetaURL.url_id == URL.id, URLDSMetaURL.agency_id == LinkURLAgency.agency_id ) - ) + ), + no_url_task_error(TaskType.SUBMIT_META_URLS) ) .cte("submit_meta_urls_prerequisites") ) diff --git a/src/core/tasks/url/operators/suspend/queries/cte.py b/src/core/tasks/url/operators/suspend/queries/cte.py index 4dfc6822..7b15aee4 100644 --- a/src/core/tasks/url/operators/suspend/queries/cte.py +++ b/src/core/tasks/url/operators/suspend/queries/cte.py @@ -1,5 +1,6 @@ from sqlalchemy import select, func, Select, exists, or_ +from src.db.helpers.query import no_url_task_error from src.db.models.impl.flag.url_suspended.sqlalchemy import FlagURLSuspended from src.db.models.impl.link.user_suggestion_not_found.agency.sqlalchemy import LinkUserSuggestionAgencyNotFound from src.db.models.impl.link.user_suggestion_not_found.location.sqlalchemy import LinkUserSuggestionLocationNotFound @@ -29,7 +30,7 @@ def __init__(self): .where( FlagURLSuspended.url_id == UnvalidatedURL.url_id ) - ) + ), ) .group_by( UnvalidatedURL.url_id diff --git a/src/db/client/async_.py b/src/db/client/async_.py index 6158ff5b..750303c6 100644 --- a/src/db/client/async_.py +++ b/src/db/client/async_.py @@ -47,22 +47,14 @@ from src.core.env_var_manager import EnvVarManager from src.core.tasks.scheduled.impl.huggingface.queries.state import SetHuggingFaceUploadStateQueryBuilder from src.core.tasks.url.operators.agency_identification.dtos.suggestion import URLAgencySuggestionInfo -from src.core.tasks.url.operators.auto_relevant.models.tdo import URLRelevantTDO -from src.core.tasks.url.operators.auto_relevant.queries.get_tdos import GetAutoRelevantTDOsQueryBuilder from src.core.tasks.url.operators.html.queries.get import \ GetPendingURLsWithoutHTMLDataQueryBuilder -from src.core.tasks.url.operators.misc_metadata.queries.get_pending_urls_missing_miscellaneous_data import \ - GetPendingURLsMissingMiscellaneousDataQueryBuilder -from src.core.tasks.url.operators.misc_metadata.queries.has_pending_urls_missing_miscellaneous_data import \ - HasPendingURsMissingMiscellaneousDataQueryBuilder from src.core.tasks.url.operators.misc_metadata.tdo import URLMiscellaneousMetadataTDO from src.core.tasks.url.operators.probe.queries.urls.not_probed.exists import HasURLsWithoutProbeQueryBuilder from src.core.tasks.url.operators.probe.queries.urls.not_probed.get.query import GetURLsWithoutProbeQueryBuilder from src.core.tasks.url.operators.probe_404.tdo import URL404ProbeTDO -from src.core.tasks.url.operators.submit_approved.queries.get import GetValidatedURLsQueryBuilder -from src.core.tasks.url.operators.submit_approved.queries.has_validated import HasValidatedURLsQueryBuilder from src.core.tasks.url.operators.submit_approved.queries.mark_submitted import MarkURLsAsSubmittedQueryBuilder -from src.core.tasks.url.operators.submit_approved.tdo import SubmitApprovedURLTDO, SubmittedURLInfo +from src.core.tasks.url.operators.submit_approved.tdo import SubmittedURLInfo from src.db.client.helpers import add_standard_limit_and_offset from src.db.client.types import UserSuggestionModel from src.db.config_manager import ConfigManager @@ -280,9 +272,6 @@ async def get_user_suggestion( result = await session.execute(statement) return result.unique().scalar_one_or_none() - async def get_tdos_for_auto_relevancy(self) -> list[URLRelevantTDO]: - return await self.run_query_builder(builder=GetAutoRelevantTDOsQueryBuilder()) - @session_manager async def add_user_relevant_suggestion( self, @@ -375,14 +364,6 @@ async def has_non_errored_urls_without_html_data(self, session: AsyncSession) -> scalar_result = await session.scalars(statement) return bool(scalar_result.first()) - async def has_pending_urls_missing_miscellaneous_metadata(self) -> bool: - return await self.run_query_builder(HasPendingURsMissingMiscellaneousDataQueryBuilder()) - - async def get_pending_urls_missing_miscellaneous_metadata( - self, - ) -> list[URLMiscellaneousMetadataTDO]: - return await self.run_query_builder(GetPendingURLsMissingMiscellaneousDataQueryBuilder()) - @session_manager async def add_miscellaneous_metadata(self, session: AsyncSession, tdos: list[URLMiscellaneousMetadataTDO]): updates = [] @@ -460,13 +441,6 @@ async def has_urls_with_html_data_and_without_models( scalar_result = await session.scalars(statement) return bool(scalar_result.first()) - @session_manager - async def has_urls_with_html_data_and_without_auto_relevant_suggestion(self, session: AsyncSession) -> bool: - return await self.has_urls_with_html_data_and_without_models( - session=session, - model=AutoRelevantSuggestion - ) - @session_manager async def has_urls_with_html_data_and_without_auto_record_type_suggestion(self, session: AsyncSession) -> bool: return await self.has_urls_with_html_data_and_without_models( @@ -811,12 +785,6 @@ async def update_batch_post_collection( batch.status = batch_status.value batch.compute_time = compute_time - async def has_validated_urls(self) -> bool: - return await self.run_query_builder(HasValidatedURLsQueryBuilder()) - - async def get_validated_urls(self) -> list[SubmitApprovedURLTDO]: - return await self.run_query_builder(GetValidatedURLsQueryBuilder()) - async def mark_urls_as_submitted(self, infos: list[SubmittedURLInfo]): await self.run_query_builder(MarkURLsAsSubmittedQueryBuilder(infos)) @@ -1105,16 +1073,6 @@ async def set_hugging_face_upload_state(self, dt: datetime) -> None: async def get_current_database_time(self) -> datetime: return await self.scalar(select(func.now())) - async def has_urls_without_probe(self) -> bool: - return await self.run_query_builder( - HasURLsWithoutProbeQueryBuilder() - ) - - async def get_urls_without_probe(self) -> list[URLMapping]: - return await self.run_query_builder( - GetURLsWithoutProbeQueryBuilder() - ) - async def get_location_id( self, us_state_id: int, diff --git a/src/db/helpers/query.py b/src/db/helpers/query.py index bd52bae7..4375cc33 100644 --- a/src/db/helpers/query.py +++ b/src/db/helpers/query.py @@ -17,6 +17,13 @@ def not_exists_url( model.url_id == URL.id ) +def exists_url( + model: type[URLDependentMixin] +) -> ColumnElement[bool]: + return exists().where( + model.url_id == URL.id + ) + def no_url_task_error(task_type: TaskType) -> ColumnElement[bool]: return ~exists().where( URLTaskError.url_id == URL.id, diff --git a/src/db/models/impl/url/core/sqlalchemy.py b/src/db/models/impl/url/core/sqlalchemy.py index 5a9e6217..db416769 100644 --- a/src/db/models/impl/url/core/sqlalchemy.py +++ b/src/db/models/impl/url/core/sqlalchemy.py @@ -7,6 +7,7 @@ from src.db.models.helpers import enum_column from src.db.models.impl.url.checked_for_duplicate import URLCheckedForDuplicate from src.db.models.impl.url.core.enums import URLSource +from src.db.models.impl.url.html.compressed.sqlalchemy import URLCompressedHTML from src.db.models.impl.url.probed_for_404 import URLProbedFor404 from src.db.models.impl.url.record_type.sqlalchemy import URLRecordType from src.db.models.impl.url.suggestion.location.auto.subtask.sqlalchemy import AutoLocationIDSubtask @@ -102,7 +103,7 @@ class URL(UpdatedAtMixin, CreatedAtMixin, WithIDBase): back_populates="url" ) compressed_html = relationship( - "URLCompressedHTML", + URLCompressedHTML, uselist=False, back_populates="url" ) diff --git a/tests/automated/integration/tasks/url/impl/asserts.py b/tests/automated/integration/tasks/url/impl/asserts.py index 4187d7ef..10ba1fa1 100644 --- a/tests/automated/integration/tasks/url/impl/asserts.py +++ b/tests/automated/integration/tasks/url/impl/asserts.py @@ -1,15 +1,16 @@ from src.core.tasks.base.run_info import TaskOperatorRunInfo +from src.core.tasks.mixins.prereq import HasPrerequisitesMixin from src.core.tasks.url.enums import TaskOperatorOutcome -async def assert_prereqs_not_met(operator): +async def assert_prereqs_not_met(operator: HasPrerequisitesMixin) -> None: meets_prereqs = await operator.meets_task_prerequisites() assert not meets_prereqs -async def assert_prereqs_met(operator): +async def assert_prereqs_met(operator: HasPrerequisitesMixin) -> None: meets_prereqs = await operator.meets_task_prerequisites() assert meets_prereqs -def assert_task_ran_without_error(run_info: TaskOperatorRunInfo): +def assert_task_ran_without_error(run_info: TaskOperatorRunInfo) -> None: assert run_info.outcome == TaskOperatorOutcome.SUCCESS, run_info.message diff --git a/tests/automated/integration/tasks/url/impl/auto_relevant/test_task.py b/tests/automated/integration/tasks/url/impl/auto_relevant/test_task.py index bf53bbf5..5de999ec 100644 --- a/tests/automated/integration/tasks/url/impl/auto_relevant/test_task.py +++ b/tests/automated/integration/tasks/url/impl/auto_relevant/test_task.py @@ -3,18 +3,20 @@ import pytest from src.collectors.enums import URLStatus +from src.core.tasks.url.operators.auto_relevant.core import URLAutoRelevantTaskOperator from src.db.models.impl.url.core.sqlalchemy import URL from src.db.models.impl.url.suggestion.relevant.auto.sqlalchemy import AutoRelevantSuggestion from src.db.models.impl.url.task_error.sqlalchemy import URLTaskError from tests.automated.integration.tasks.url.impl.asserts import assert_prereqs_not_met, assert_prereqs_met from tests.automated.integration.tasks.url.impl.auto_relevant.setup import setup_operator, setup_urls from tests.helpers.asserts import assert_task_run_success +from tests.helpers.data_creator.core import DBDataCreator @pytest.mark.asyncio -async def test_url_auto_relevant_task(db_data_creator): +async def test_url_auto_relevant_task(db_data_creator: DBDataCreator): - operator = await setup_operator(adb_client=db_data_creator.adb_client) + operator: URLAutoRelevantTaskOperator = await setup_operator(adb_client=db_data_creator.adb_client) await assert_prereqs_not_met(operator) url_ids = await setup_urls(db_data_creator) @@ -27,12 +29,6 @@ async def test_url_auto_relevant_task(db_data_creator): assert not await operator.meets_task_prerequisites() adb_client = db_data_creator.adb_client - # Get URLs, confirm one is marked as error - urls: list[URL] = await adb_client.get_all(URL) - assert len(urls) == 3 - counter = Counter([url.status for url in urls]) - assert counter[URLStatus.ERROR] == 1 - assert counter[URLStatus.OK] == 2 # Confirm two annotations were created suggestions: list[AutoRelevantSuggestion] = await adb_client.get_all(AutoRelevantSuggestion) diff --git a/tests/automated/integration/tasks/url/impl/submit_approved/test_submit_approved_url_task.py b/tests/automated/integration/tasks/url/impl/submit_approved/test_submit_approved_url_task.py index 43d7fc8d..3d1aec23 100644 --- a/tests/automated/integration/tasks/url/impl/submit_approved/test_submit_approved_url_task.py +++ b/tests/automated/integration/tasks/url/impl/submit_approved/test_submit_approved_url_task.py @@ -58,11 +58,6 @@ async def test_submit_approved_url_task( url_2: URL = urls[1] url_3: URL = urls[2] - # Check URLs - assert url_1.status == URLStatus.OK - assert url_2.status == URLStatus.OK - assert url_3.status == URLStatus.ERROR - # Get URL Data Source Links url_data_sources = await db_data_creator.adb_client.get_all(URLDataSource) assert len(url_data_sources) == 2