diff --git a/ENV.md b/ENV.md index d969358a..c1aa1f4c 100644 --- a/ENV.md +++ b/ENV.md @@ -57,15 +57,17 @@ Note that some tasks/subtasks are themselves enabled by other tasks. ### Scheduled Task Flags -| Flag | Description | -|-------------------------------------|--------------------------------------------------------------------| -| `SCHEDULED_TASKS_FLAG` | All scheduled tasks. Disabling disables all other scheduled tasks. | -| `PUSH_TO_HUGGING_FACE_TASK_FLAG` | Pushes data to HuggingFace. | -| `POPULATE_BACKLOG_SNAPSHOT_TASK_FLAG` | Populates the backlog snapshot. | -| `DELETE_OLD_LOGS_TASK_FLAG` | Deletes old logs. | -| `RUN_URL_TASKS_TASK_FLAG` | Runs URL tasks. | -| `IA_PROBE_TASK_FLAG` | Extracts and links Internet Archives metadata to URLs. | -| `IA_SAVE_TASK_FLAG` | Saves URLs to Internet Archives. | +| Flag | Description | +|-------------------------------------|-------------------------------------------------------------------------------| +| `SCHEDULED_TASKS_FLAG` | All scheduled tasks. Disabling disables all other scheduled tasks. | +| `PUSH_TO_HUGGING_FACE_TASK_FLAG` | Pushes data to HuggingFace. | +| `POPULATE_BACKLOG_SNAPSHOT_TASK_FLAG` | Populates the backlog snapshot. | +| `DELETE_OLD_LOGS_TASK_FLAG` | Deletes old logs. | +| `RUN_URL_TASKS_TASK_FLAG` | Runs URL tasks. | +| `IA_PROBE_TASK_FLAG` | Extracts and links Internet Archives metadata to URLs. | +| `IA_SAVE_TASK_FLAG` | Saves URLs to Internet Archives. | +| `MARK_TASK_NEVER_COMPLETED_TASK_FLAG` | Marks tasks that were started but never completed (usually due to a restart). | +| `DELETE_STALE_SCREENSHOTS_TASK_FLAG` | Deletes stale screenshots for URLs already validated. | ### URL Task Flags @@ -87,6 +89,7 @@ URL Task Flags are collectively controlled by the `RUN_URL_TASKS_TASK_FLAG` flag | `URL_AUTO_VALIDATE_TASK_FLAG` | Automatically validates URLs. | | `URL_AUTO_NAME_TASK_FLAG` | Automatically names URLs. | | `URL_SUSPEND_TASK_FLAG` | Suspends URLs meeting suspension criteria. | +| `URL_SUBMIT_META_URLS_TASK_FLAG` | Submits meta URLs to the Data Sources App. | ### Agency ID Subtasks diff --git a/alembic/versions/2025_09_30_1046-84a3de626ad8_add_link_user_submitted_url_table.py b/alembic/versions/2025_09_30_1046-84a3de626ad8_add_link_user_submitted_url_table.py index 73735610..fe7d9309 100644 --- a/alembic/versions/2025_09_30_1046-84a3de626ad8_add_link_user_submitted_url_table.py +++ b/alembic/versions/2025_09_30_1046-84a3de626ad8_add_link_user_submitted_url_table.py @@ -7,8 +7,8 @@ """ from typing import Sequence, Union -from alembic import op import sqlalchemy as sa +from alembic import op from src.util.alembic_helpers import url_id_column, user_id_column, created_at_column diff --git a/alembic/versions/2025_09_30_1613-241fd3925f5d_add_logic_for_meta_url_submissions.py b/alembic/versions/2025_09_30_1613-241fd3925f5d_add_logic_for_meta_url_submissions.py new file mode 100644 index 00000000..fb30fba2 --- /dev/null +++ b/alembic/versions/2025_09_30_1613-241fd3925f5d_add_logic_for_meta_url_submissions.py @@ -0,0 +1,63 @@ +"""Add logic for meta URL submissions + +Revision ID: 241fd3925f5d +Revises: 84a3de626ad8 +Create Date: 2025-09-30 16:13:03.980113 + +""" +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +from src.util.alembic_helpers import url_id_column, created_at_column, agency_id_column + +# revision identifiers, used by Alembic. +revision: str = '241fd3925f5d' +down_revision: Union[str, None] = '84a3de626ad8' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.execute("""ALTER TYPE task_type ADD VALUE 'Submit Meta URLs'""") + op.create_table( + "url_ds_meta_url", + url_id_column(), + agency_id_column(), + sa.Column("ds_meta_url_id", sa.Integer(), nullable=False), + created_at_column(), + sa.PrimaryKeyConstraint( + "url_id", + "agency_id" + ), + sa.UniqueConstraint( + "ds_meta_url_id" + ) + ) + op.execute("""ALTER TYPE task_type ADD VALUE 'Delete Stale Screenshots'""") + op.execute("""ALTER TYPE task_type ADD VALUE 'Mark Task Never Completed'""") + op.execute(""" + CREATE TYPE task_status_enum as ENUM( + 'complete', + 'in-process', + 'error', + 'aborted', + 'never-completed' + ) + """) + op.execute(""" + ALTER TABLE tasks + ALTER COLUMN task_status DROP DEFAULT, + ALTER COLUMN task_status TYPE task_status_enum + USING ( + CASE task_status::text -- old enum -> text + WHEN 'ready to label' THEN 'complete'::task_status_enum + ELSE task_status::text::task_status_enum + END + ); + """) + + +def downgrade() -> None: + pass diff --git a/src/api/endpoints/task/by_id/dto.py b/src/api/endpoints/task/by_id/dto.py index d10c3930..e9a73e44 100644 --- a/src/api/endpoints/task/by_id/dto.py +++ b/src/api/endpoints/task/by_id/dto.py @@ -3,6 +3,7 @@ from pydantic import BaseModel +from src.db.models.impl.task.enums import TaskStatus from src.db.models.impl.url.core.pydantic.info import URLInfo from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic from src.db.enums import TaskType @@ -11,7 +12,7 @@ class TaskInfo(BaseModel): task_type: TaskType - task_status: BatchStatus + task_status: TaskStatus updated_at: datetime.datetime error_info: str | None = None urls: list[URLInfo] diff --git a/src/api/endpoints/task/by_id/query.py b/src/api/endpoints/task/by_id/query.py index 40321333..02d18a3d 100644 --- a/src/api/endpoints/task/by_id/query.py +++ b/src/api/endpoints/task/by_id/query.py @@ -5,6 +5,7 @@ from src.api.endpoints.task.by_id.dto import TaskInfo from src.collectors.enums import URLStatus from src.core.enums import BatchStatus +from src.db.models.impl.task.enums import TaskStatus from src.db.models.impl.url.core.pydantic.info import URLInfo from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic from src.db.enums import TaskType @@ -59,7 +60,7 @@ async def run(self, session: AsyncSession) -> TaskInfo: errored_urls.append(url_error_info) return TaskInfo( task_type=TaskType(task.task_type), - task_status=BatchStatus(task.task_status), + task_status=TaskStatus(task.task_status), error_info=error, updated_at=task.updated_at, urls=url_infos, diff --git a/src/core/tasks/base/operator.py b/src/core/tasks/base/operator.py index 25f3fc5d..93230db5 100644 --- a/src/core/tasks/base/operator.py +++ b/src/core/tasks/base/operator.py @@ -6,6 +6,7 @@ from src.core.tasks.url.enums import TaskOperatorOutcome from src.db.client.async_ import AsyncDatabaseClient from src.db.enums import TaskType +from src.db.models.impl.task.enums import TaskStatus class TaskOperatorBase(ABC): @@ -60,7 +61,7 @@ async def inner_task_logic(self) -> None: raise NotImplementedError async def handle_task_error(self, e): - await self.adb_client.update_task_status(task_id=self.task_id, status=BatchStatus.ERROR) + await self.adb_client.update_task_status(task_id=self.task_id, status=TaskStatus.ERROR) await self.adb_client.add_task_error( task_id=self.task_id, error=str(e) diff --git a/src/core/tasks/handler.py b/src/core/tasks/handler.py index 7f79e3bb..92b96103 100644 --- a/src/core/tasks/handler.py +++ b/src/core/tasks/handler.py @@ -7,6 +7,7 @@ from src.core.tasks.url.enums import TaskOperatorOutcome from src.db.client.async_ import AsyncDatabaseClient from src.db.enums import TaskType +from src.db.models.impl.task.enums import TaskStatus class TaskHandler: @@ -42,13 +43,14 @@ async def handle_outcome(self, run_info: TaskOperatorRunInfo): # case TaskOperatorOutcome.SUCCESS: await self.adb_client.update_task_status( task_id=run_info.task_id, - status=BatchStatus.READY_TO_LABEL + status=TaskStatus.COMPLETE ) async def handle_task_error(self, run_info: TaskOperatorRunInfo): # await self.adb_client.update_task_status( task_id=run_info.task_id, - status=BatchStatus.ERROR) + status=TaskStatus.ERROR + ) await self.adb_client.add_task_error( task_id=run_info.task_id, error=run_info.message diff --git a/src/core/tasks/scheduled/impl/delete_logs/operator.py b/src/core/tasks/scheduled/impl/delete_logs/operator.py index fa7a6ae4..41be3af9 100644 --- a/src/core/tasks/scheduled/impl/delete_logs/operator.py +++ b/src/core/tasks/scheduled/impl/delete_logs/operator.py @@ -1,16 +1,21 @@ +import datetime + +from sqlalchemy import delete + from src.core.tasks.scheduled.templates.operator import ScheduledTaskOperatorBase from src.db.client.async_ import AsyncDatabaseClient from src.db.enums import TaskType +from src.db.models.impl.log.sqlalchemy import Log class DeleteOldLogsTaskOperator(ScheduledTaskOperatorBase): - def __init__(self, adb_client: AsyncDatabaseClient): - super().__init__(adb_client) - @property def task_type(self) -> TaskType: return TaskType.DELETE_OLD_LOGS async def inner_task_logic(self) -> None: - await self.adb_client.delete_old_logs() \ No newline at end of file + statement = delete(Log).where( + Log.created_at < datetime.datetime.now() - datetime.timedelta(days=7) + ) + await self.adb_client.execute(statement) \ No newline at end of file diff --git a/src/external/pdap/dtos/sync/__init__.py b/src/core/tasks/scheduled/impl/delete_stale_screenshots/__init__.py similarity index 100% rename from src/external/pdap/dtos/sync/__init__.py rename to src/core/tasks/scheduled/impl/delete_stale_screenshots/__init__.py diff --git a/src/core/tasks/scheduled/impl/delete_stale_screenshots/operator.py b/src/core/tasks/scheduled/impl/delete_stale_screenshots/operator.py new file mode 100644 index 00000000..0c386cfe --- /dev/null +++ b/src/core/tasks/scheduled/impl/delete_stale_screenshots/operator.py @@ -0,0 +1,15 @@ +from src.core.tasks.scheduled.impl.delete_stale_screenshots.query import DeleteStaleScreenshotsQueryBuilder +from src.core.tasks.scheduled.templates.operator import ScheduledTaskOperatorBase +from src.db.enums import TaskType + + +class DeleteStaleScreenshotsTaskOperator(ScheduledTaskOperatorBase): + + @property + def task_type(self) -> TaskType: + return TaskType.DELETE_STALE_SCREENSHOTS + + async def inner_task_logic(self) -> None: + await self.adb_client.run_query_builder( + DeleteStaleScreenshotsQueryBuilder() + ) \ No newline at end of file diff --git a/src/core/tasks/scheduled/impl/delete_stale_screenshots/query.py b/src/core/tasks/scheduled/impl/delete_stale_screenshots/query.py new file mode 100644 index 00000000..c82220b8 --- /dev/null +++ b/src/core/tasks/scheduled/impl/delete_stale_screenshots/query.py @@ -0,0 +1,29 @@ +from typing import Any + +from sqlalchemy import delete, exists, select +from sqlalchemy.ext.asyncio import AsyncSession + +from src.db.models.impl.flag.url_validated.sqlalchemy import FlagURLValidated +from src.db.models.impl.url.screenshot.sqlalchemy import URLScreenshot +from src.db.queries.base.builder import QueryBuilderBase + + +class DeleteStaleScreenshotsQueryBuilder(QueryBuilderBase): + + async def run(self, session: AsyncSession) -> Any: + + statement = ( + delete( + URLScreenshot + ) + .where( + exists( + select( + FlagURLValidated, + FlagURLValidated.url_id == URLScreenshot.url_id, + ) + ) + ) + ) + + await session.execute(statement) \ No newline at end of file diff --git a/src/core/tasks/scheduled/impl/mark_never_completed/__init__.py b/src/core/tasks/scheduled/impl/mark_never_completed/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/core/tasks/scheduled/impl/mark_never_completed/operator.py b/src/core/tasks/scheduled/impl/mark_never_completed/operator.py new file mode 100644 index 00000000..7ec08298 --- /dev/null +++ b/src/core/tasks/scheduled/impl/mark_never_completed/operator.py @@ -0,0 +1,15 @@ +from src.core.tasks.scheduled.impl.mark_never_completed.query import MarkTaskNeverCompletedQueryBuilder +from src.core.tasks.scheduled.templates.operator import ScheduledTaskOperatorBase +from src.db.enums import TaskType + + +class MarkTaskNeverCompletedOperator(ScheduledTaskOperatorBase): + + @property + def task_type(self) -> TaskType: + return TaskType.MARK_TASK_NEVER_COMPLETED + + async def inner_task_logic(self) -> None: + await self.adb_client.run_query_builder( + MarkTaskNeverCompletedQueryBuilder() + ) \ No newline at end of file diff --git a/src/core/tasks/scheduled/impl/mark_never_completed/query.py b/src/core/tasks/scheduled/impl/mark_never_completed/query.py new file mode 100644 index 00000000..d2ea2576 --- /dev/null +++ b/src/core/tasks/scheduled/impl/mark_never_completed/query.py @@ -0,0 +1,27 @@ +from datetime import timedelta, datetime +from typing import Any + +from sqlalchemy import update +from sqlalchemy.ext.asyncio import AsyncSession + +from src.core.enums import BatchStatus +from src.db.enums import TaskType +from src.db.models.impl.task.core import Task +from src.db.queries.base.builder import QueryBuilderBase + + +class MarkTaskNeverCompletedQueryBuilder(QueryBuilderBase): + + async def run(self, session: AsyncSession) -> Any: + statement = ( + update( + Task + ).values( + task_status=BatchStatus.ABORTED.value + ). + where( + Task.task_status == BatchStatus.IN_PROCESS, + Task.updated_at < datetime.now() - timedelta(hours=1) + ) + ) + await session.execute(statement) \ No newline at end of file diff --git a/src/core/tasks/scheduled/loader.py b/src/core/tasks/scheduled/loader.py index 88cdde20..cfadd82e 100644 --- a/src/core/tasks/scheduled/loader.py +++ b/src/core/tasks/scheduled/loader.py @@ -4,9 +4,12 @@ from src.core.tasks.scheduled.enums import IntervalEnum from src.core.tasks.scheduled.impl.backlog.operator import PopulateBacklogSnapshotTaskOperator from src.core.tasks.scheduled.impl.delete_logs.operator import DeleteOldLogsTaskOperator +from src.core.tasks.scheduled.impl.delete_stale_screenshots.operator import DeleteStaleScreenshotsTaskOperator from src.core.tasks.scheduled.impl.huggingface.operator import PushToHuggingFaceTaskOperator from src.core.tasks.scheduled.impl.internet_archives.probe.operator import InternetArchivesProbeTaskOperator from src.core.tasks.scheduled.impl.internet_archives.save.operator import InternetArchivesSaveTaskOperator +from src.core.tasks.scheduled.impl.mark_never_completed.operator import MarkTaskNeverCompletedOperator +from src.core.tasks.scheduled.impl.mark_never_completed.query import MarkTaskNeverCompletedQueryBuilder from src.core.tasks.scheduled.impl.run_url_tasks.operator import RunURLTasksTaskOperator from src.core.tasks.scheduled.models.entry import ScheduledTaskEntry from src.db.client.async_ import AsyncDatabaseClient @@ -37,6 +40,9 @@ def __init__( self.env = Env() self.env.read_env() + def setup_flag(self, name: str) -> bool: + return self.env.bool(name, default=True) + async def load_entries(self) -> list[ScheduledTaskEntry]: scheduled_task_flag = self.env.bool("SCHEDULED_TASKS_FLAG", default=True) @@ -52,7 +58,7 @@ async def load_entries(self) -> list[ScheduledTaskEntry]: ia_client=self.ia_client ), interval_minutes=IntervalEnum.TEN_MINUTES.value, - enabled=self.env.bool("IA_PROBE_TASK_FLAG", default=True), + enabled=self.setup_flag("IA_PROBE_TASK_FLAG"), ), ScheduledTaskEntry( operator=InternetArchivesSaveTaskOperator( @@ -60,12 +66,12 @@ async def load_entries(self) -> list[ScheduledTaskEntry]: ia_client=self.ia_client ), interval_minutes=IntervalEnum.TEN_MINUTES.value, - enabled=self.env.bool("IA_SAVE_TASK_FLAG", default=True), + enabled=self.setup_flag("IA_SAVE_TASK_FLAG"), ), ScheduledTaskEntry( operator=DeleteOldLogsTaskOperator(adb_client=self.adb_client), interval_minutes=IntervalEnum.DAILY.value, - enabled=self.env.bool("DELETE_OLD_LOGS_TASK_FLAG", default=True) + enabled=self.setup_flag("DELETE_OLD_LOGS_TASK_FLAG") ), ScheduledTaskEntry( operator=RunURLTasksTaskOperator(async_core=self.async_core), @@ -73,13 +79,12 @@ async def load_entries(self) -> list[ScheduledTaskEntry]: "URL_TASKS_FREQUENCY_MINUTES", default=IntervalEnum.HOURLY.value ), - enabled=self.env.bool("RUN_URL_TASKS_TASK_FLAG", default=True) - + enabled=self.setup_flag("RUN_URL_TASKS_TASK_FLAG") ), ScheduledTaskEntry( operator=PopulateBacklogSnapshotTaskOperator(adb_client=self.async_core.adb_client), interval_minutes=IntervalEnum.DAILY.value, - enabled=self.env.bool("POPULATE_BACKLOG_SNAPSHOT_TASK_FLAG", default=True) + enabled=self.setup_flag("POPULATE_BACKLOG_SNAPSHOT_TASK_FLAG") ), ScheduledTaskEntry( operator=PushToHuggingFaceTaskOperator( @@ -87,10 +92,16 @@ async def load_entries(self) -> list[ScheduledTaskEntry]: hf_client=self.hf_client ), interval_minutes=IntervalEnum.DAILY.value, - enabled=self.env.bool( - "PUSH_TO_HUGGING_FACE_TASK_FLAG", - default=True - ) + enabled=self.setup_flag("PUSH_TO_HUGGING_FACE_TASK_FLAG") + ), + ScheduledTaskEntry( + operator=MarkTaskNeverCompletedOperator(adb_client=self.adb_client), + interval_minutes=IntervalEnum.DAILY.value, + enabled=self.setup_flag("MARK_TASK_NEVER_COMPLETED_TASK_FLAG") + ), + ScheduledTaskEntry( + operator=DeleteStaleScreenshotsTaskOperator(adb_client=self.adb_client), + interval_minutes=IntervalEnum.DAILY.value, + enabled=self.setup_flag("DELETE_STALE_SCREENSHOTS_TASK_FLAG") ) - ] diff --git a/src/core/tasks/url/loader.py b/src/core/tasks/url/loader.py index 86625d94..2ad1776f 100644 --- a/src/core/tasks/url/loader.py +++ b/src/core/tasks/url/loader.py @@ -23,6 +23,7 @@ from src.core.tasks.url.operators.root_url.core import URLRootURLTaskOperator from src.core.tasks.url.operators.screenshot.core import URLScreenshotTaskOperator from src.core.tasks.url.operators.submit_approved.core import SubmitApprovedURLTaskOperator +from src.core.tasks.url.operators.submit_meta_urls.core import SubmitMetaURLsTaskOperator from src.core.tasks.url.operators.suspend.core import SuspendURLTaskOperator from src.core.tasks.url.operators.validate.core import AutoValidateURLTaskOperator from src.db.client.async_ import AsyncDatabaseClient @@ -55,6 +56,12 @@ def __init__( self.muckrock_api_interface = muckrock_api_interface self.hf_inference_client = hf_inference_client + def setup_flag(self, name: str) -> bool: + return self.env.bool( + name, + default=True + ) + def _get_url_html_task_operator(self) -> URLTaskEntry: operator = URLHTMLTaskOperator( adb_client=self.adb_client, @@ -63,10 +70,7 @@ def _get_url_html_task_operator(self) -> URLTaskEntry: ) return URLTaskEntry( operator=operator, - enabled=self.env.bool( - "URL_HTML_TASK_FLAG", - default=True - ) + enabled=self.setup_flag("URL_HTML_TASK_FLAG") ) def _get_url_record_type_task_operator(self) -> URLTaskEntry: @@ -76,10 +80,7 @@ def _get_url_record_type_task_operator(self) -> URLTaskEntry: ) return URLTaskEntry( operator=operator, - enabled=self.env.bool( - "URL_RECORD_TYPE_TASK_FLAG", - default=True - ) + enabled=self.setup_flag("URL_RECORD_TYPE_TASK_FLAG") ) def _get_agency_identification_task_operator(self) -> URLTaskEntry: @@ -93,10 +94,7 @@ def _get_agency_identification_task_operator(self) -> URLTaskEntry: ) return URLTaskEntry( operator=operator, - enabled=self.env.bool( - "URL_AGENCY_IDENTIFICATION_TASK_FLAG", - default=True - ) + enabled=self.setup_flag("URL_AGENCY_IDENTIFICATION_TASK_FLAG") ) def _get_submit_approved_url_task_operator(self) -> URLTaskEntry: @@ -106,10 +104,17 @@ def _get_submit_approved_url_task_operator(self) -> URLTaskEntry: ) return URLTaskEntry( operator=operator, - enabled=self.env.bool( - "URL_SUBMIT_APPROVED_TASK_FLAG", - default=True - ) + enabled=self.setup_flag("URL_SUBMIT_APPROVED_TASK_FLAG") + ) + + def _get_submit_meta_urls_task_operator(self) -> URLTaskEntry: + operator = SubmitMetaURLsTaskOperator( + adb_client=self.adb_client, + pdap_client=self.pdap_client + ) + return URLTaskEntry( + operator=operator, + enabled=self.setup_flag("URL_SUBMIT_META_URLS_TASK_FLAG") ) def _get_url_miscellaneous_metadata_task_operator(self) -> URLTaskEntry: @@ -118,10 +123,7 @@ def _get_url_miscellaneous_metadata_task_operator(self) -> URLTaskEntry: ) return URLTaskEntry( operator=operator, - enabled=self.env.bool( - "URL_MISC_METADATA_TASK_FLAG", - default=True - ) + enabled=self.setup_flag("URL_MISC_METADATA_TASK_FLAG") ) def _get_url_404_probe_task_operator(self) -> URLTaskEntry: @@ -131,10 +133,7 @@ def _get_url_404_probe_task_operator(self) -> URLTaskEntry: ) return URLTaskEntry( operator=operator, - enabled=self.env.bool( - "URL_404_PROBE_TASK_FLAG", - default=True - ) + enabled=self.setup_flag("URL_404_PROBE_TASK_FLAG") ) def _get_url_auto_relevance_task_operator(self) -> URLTaskEntry: @@ -144,10 +143,7 @@ def _get_url_auto_relevance_task_operator(self) -> URLTaskEntry: ) return URLTaskEntry( operator=operator, - enabled=self.env.bool( - "URL_AUTO_RELEVANCE_TASK_FLAG", - default=True - ) + enabled=self.setup_flag("URL_AUTO_RELEVANCE_TASK_FLAG") ) def _get_url_probe_task_operator(self) -> URLTaskEntry: @@ -157,10 +153,7 @@ def _get_url_probe_task_operator(self) -> URLTaskEntry: ) return URLTaskEntry( operator=operator, - enabled=self.env.bool( - "URL_PROBE_TASK_FLAG", - default=True - ) + enabled=self.setup_flag("URL_PROBE_TASK_FLAG") ) def _get_url_root_url_task_operator(self) -> URLTaskEntry: @@ -169,10 +162,7 @@ def _get_url_root_url_task_operator(self) -> URLTaskEntry: ) return URLTaskEntry( operator=operator, - enabled=self.env.bool( - "URL_ROOT_URL_TASK_FLAG", - default=True - ) + enabled=self.setup_flag("URL_ROOT_URL_TASK_FLAG") ) def _get_url_screenshot_task_operator(self) -> URLTaskEntry: @@ -181,10 +171,7 @@ def _get_url_screenshot_task_operator(self) -> URLTaskEntry: ) return URLTaskEntry( operator=operator, - enabled=self.env.bool( - "URL_SCREENSHOT_TASK_FLAG", - default=True - ) + enabled=self.setup_flag("URL_SCREENSHOT_TASK_FLAG") ) def _get_location_id_task_operator(self) -> URLTaskEntry: @@ -197,10 +184,7 @@ def _get_location_id_task_operator(self) -> URLTaskEntry: ) return URLTaskEntry( operator=operator, - enabled=self.env.bool( - "URL_LOCATION_IDENTIFICATION_TASK_FLAG", - default=True - ) + enabled=self.setup_flag("URL_LOCATION_IDENTIFICATION_TASK_FLAG") ) def _get_auto_validate_task_operator(self) -> URLTaskEntry: @@ -209,10 +193,7 @@ def _get_auto_validate_task_operator(self) -> URLTaskEntry: ) return URLTaskEntry( operator=operator, - enabled=self.env.bool( - "URL_AUTO_VALIDATE_TASK_FLAG", - default=True - ) + enabled=self.setup_flag("URL_AUTO_VALIDATE_TASK_FLAG") ) def _get_auto_name_task_operator(self) -> URLTaskEntry: @@ -221,10 +202,7 @@ def _get_auto_name_task_operator(self) -> URLTaskEntry: ) return URLTaskEntry( operator=operator, - enabled=self.env.bool( - "URL_AUTO_NAME_TASK_FLAG", - default=True - ) + enabled=self.setup_flag("URL_AUTO_NAME_TASK_FLAG") ) def _get_suspend_url_task_operator(self) -> URLTaskEntry: @@ -233,10 +211,7 @@ def _get_suspend_url_task_operator(self) -> URLTaskEntry: ) return URLTaskEntry( operator=operator, - enabled=self.env.bool( - "URL_SUSPEND_TASK_FLAG", - default=True - ) + enabled=self.setup_flag("URL_SUSPEND_TASK_FLAG") ) @@ -250,6 +225,7 @@ async def load_entries(self) -> list[URLTaskEntry]: self._get_agency_identification_task_operator(), self._get_url_miscellaneous_metadata_task_operator(), self._get_submit_approved_url_task_operator(), + self._get_submit_meta_urls_task_operator(), self._get_url_auto_relevance_task_operator(), self._get_url_screenshot_task_operator(), self._get_location_id_task_operator(), diff --git a/src/core/tasks/url/operators/submit_approved/core.py b/src/core/tasks/url/operators/submit_approved/core.py index 618f7f2f..379e47ae 100644 --- a/src/core/tasks/url/operators/submit_approved/core.py +++ b/src/core/tasks/url/operators/submit_approved/core.py @@ -31,7 +31,7 @@ async def inner_task_logic(self): await self.link_urls_to_task(url_ids=[tdo.url_id for tdo in tdos]) # Submit each URL, recording errors if they exist - submitted_url_infos = await self.pdap_client.submit_urls(tdos) + submitted_url_infos = await self.pdap_client.submit_data_source_urls(tdos) error_infos = await self.get_error_infos(submitted_url_infos) success_infos = await self.get_success_infos(submitted_url_infos) diff --git a/src/core/tasks/url/operators/submit_meta_urls/__init__.py b/src/core/tasks/url/operators/submit_meta_urls/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/core/tasks/url/operators/submit_meta_urls/core.py b/src/core/tasks/url/operators/submit_meta_urls/core.py new file mode 100644 index 00000000..2a2e54b6 --- /dev/null +++ b/src/core/tasks/url/operators/submit_meta_urls/core.py @@ -0,0 +1,79 @@ +from src.core.tasks.url.operators.base import URLTaskOperatorBase +from src.core.tasks.url.operators.submit_meta_urls.queries.get import GetMetaURLsForSubmissionQueryBuilder +from src.core.tasks.url.operators.submit_meta_urls.queries.prereq import \ + MeetsMetaURLSSubmissionPrerequisitesQueryBuilder +from src.db.client.async_ import AsyncDatabaseClient +from src.db.dtos.url.mapping import URLMapping +from src.db.enums import TaskType +from src.db.models.impl.url.ds_meta_url.pydantic import URLDSMetaURLPydantic +from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic +from src.external.pdap.client import PDAPClient +from src.external.pdap.impl.meta_urls.enums import SubmitMetaURLsStatus +from src.external.pdap.impl.meta_urls.request import SubmitMetaURLsRequest +from src.external.pdap.impl.meta_urls.response import SubmitMetaURLsResponse +from src.util.url_mapper import URLMapper + + +class SubmitMetaURLsTaskOperator(URLTaskOperatorBase): + + def __init__( + self, + adb_client: AsyncDatabaseClient, + pdap_client: PDAPClient + ): + super().__init__(adb_client) + self.pdap_client = pdap_client + + @property + def task_type(self) -> TaskType: + return TaskType.SUBMIT_META_URLS + + async def meets_task_prerequisites(self) -> bool: + return await self.adb_client.run_query_builder( + MeetsMetaURLSSubmissionPrerequisitesQueryBuilder() + ) + + async def inner_task_logic(self) -> None: + requests: list[SubmitMetaURLsRequest] = await self.adb_client.run_query_builder( + GetMetaURLsForSubmissionQueryBuilder() + ) + + url_mappings: list[URLMapping] = [ + URLMapping( + url=request.url, + url_id=request.url_id, + ) + for request in requests + ] + + mapper = URLMapper(url_mappings) + + await self.link_urls_to_task(mapper.get_all_ids()) + + responses: list[SubmitMetaURLsResponse] = \ + await self.pdap_client.submit_meta_urls(requests) + + errors: list[URLErrorInfoPydantic] = [] + inserts: list[URLDSMetaURLPydantic] = [] + + for response in responses: + url_id: int = mapper.get_id(response.url) + if response.status == SubmitMetaURLsStatus.SUCCESS: + inserts.append( + URLDSMetaURLPydantic( + url_id=url_id, + agency_id=response.agency_id, + ds_meta_url_id=response.meta_url_id + ) + ) + else: + errors.append( + URLErrorInfoPydantic( + url_id=url_id, + task_id=self.task_id, + error=response.error, + ) + ) + + await self.adb_client.bulk_insert(errors) + await self.adb_client.bulk_insert(inserts) diff --git a/src/core/tasks/url/operators/submit_meta_urls/queries/__init__.py b/src/core/tasks/url/operators/submit_meta_urls/queries/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/core/tasks/url/operators/submit_meta_urls/queries/cte.py b/src/core/tasks/url/operators/submit_meta_urls/queries/cte.py new file mode 100644 index 00000000..89d18c82 --- /dev/null +++ b/src/core/tasks/url/operators/submit_meta_urls/queries/cte.py @@ -0,0 +1,58 @@ +from sqlalchemy import select, exists, Column, CTE + +from src.db.models.impl.agency.sqlalchemy import Agency +from src.db.models.impl.link.url_agency.sqlalchemy import LinkURLAgency +from src.db.models.impl.url.core.sqlalchemy import URL +from src.db.models.impl.url.ds_meta_url.sqlalchemy import URLDSMetaURL +from src.db.models.views.meta_url import MetaURL + + +class SubmitMetaURLsPrerequisitesCTEContainer: + + def __init__(self): + + self._cte = ( + select( + URL.id.label("url_id"), + URL.url, + LinkURLAgency.agency_id, + ) + # Validated as Meta URL + .join( + MetaURL, + MetaURL.url_id == URL.id + ) + .join( + LinkURLAgency, + LinkURLAgency.url_id == URL.id + ) + # Does not have a submission + .where( + ~exists( + select( + URLDSMetaURL.ds_meta_url_id + ) + .where( + URLDSMetaURL.url_id == URL.id, + URLDSMetaURL.agency_id == LinkURLAgency.agency_id + ) + ) + ) + .cte("submit_meta_urls_prerequisites") + ) + + @property + def cte(self) -> CTE: + return self._cte + + @property + def url_id(self) -> Column[int]: + return self._cte.c.url_id + + @property + def agency_id(self) -> Column[int]: + return self._cte.c.agency_id + + @property + def url(self) -> Column[str]: + return self._cte.c.url \ No newline at end of file diff --git a/src/core/tasks/url/operators/submit_meta_urls/queries/get.py b/src/core/tasks/url/operators/submit_meta_urls/queries/get.py new file mode 100644 index 00000000..518393f6 --- /dev/null +++ b/src/core/tasks/url/operators/submit_meta_urls/queries/get.py @@ -0,0 +1,34 @@ +from typing import Any, Sequence + +from sqlalchemy import select, RowMapping +from sqlalchemy.ext.asyncio import AsyncSession + +from src.core.tasks.url.operators.submit_meta_urls.queries.cte import SubmitMetaURLsPrerequisitesCTEContainer +from src.db.queries.base.builder import QueryBuilderBase +from src.external.pdap.impl.meta_urls.request import SubmitMetaURLsRequest + +from src.db.helpers.session import session_helper as sh + +class GetMetaURLsForSubmissionQueryBuilder(QueryBuilderBase): + + + async def run(self, session: AsyncSession) -> list[SubmitMetaURLsRequest]: + cte = SubmitMetaURLsPrerequisitesCTEContainer() + query = ( + select( + cte.url_id, + cte.agency_id, + cte.url + ) + ) + + mappings: Sequence[RowMapping] = await sh.mappings(session, query=query) + + return [ + SubmitMetaURLsRequest( + url_id=mapping["url_id"], + agency_id=mapping["agency_id"], + url=mapping["url"], + ) + for mapping in mappings + ] diff --git a/src/core/tasks/url/operators/submit_meta_urls/queries/prereq.py b/src/core/tasks/url/operators/submit_meta_urls/queries/prereq.py new file mode 100644 index 00000000..3b5538be --- /dev/null +++ b/src/core/tasks/url/operators/submit_meta_urls/queries/prereq.py @@ -0,0 +1,20 @@ +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from src.core.tasks.url.operators.submit_meta_urls.queries.cte import SubmitMetaURLsPrerequisitesCTEContainer +from src.db.queries.base.builder import QueryBuilderBase +from src.db.helpers.session import session_helper as sh + + +class MeetsMetaURLSSubmissionPrerequisitesQueryBuilder(QueryBuilderBase): + + + async def run(self, session: AsyncSession) -> bool: + cte = SubmitMetaURLsPrerequisitesCTEContainer() + query = ( + select( + cte.url_id, + ) + ) + + return await sh.has_results(session, query=query) \ No newline at end of file diff --git a/src/db/client/async_.py b/src/db/client/async_.py index beb71375..52191078 100644 --- a/src/db/client/async_.py +++ b/src/db/client/async_.py @@ -89,6 +89,7 @@ from src.db.models.impl.log.pydantic.output import LogOutputInfo from src.db.models.impl.log.sqlalchemy import Log from src.db.models.impl.task.core import Task +from src.db.models.impl.task.enums import TaskStatus from src.db.models.impl.task.error import TaskError from src.db.models.impl.url.checked_for_duplicate import URLCheckedForDuplicate from src.db.models.impl.url.core.pydantic.info import URLInfo @@ -545,7 +546,13 @@ async def initiate_task( return task.id @session_manager - async def update_task_status(self, session: AsyncSession, task_id: int, status: BatchStatus): + async def update_task_status( + self, + session: + AsyncSession, + task_id: int, + status: TaskStatus + ): task = await session.get(Task, task_id) task.task_status = status.value diff --git a/src/db/enums.py b/src/db/enums.py index 560549a0..489709e3 100644 --- a/src/db/enums.py +++ b/src/db/enums.py @@ -40,6 +40,7 @@ class TaskType(PyEnum): AGENCY_IDENTIFICATION = "Agency Identification" MISC_METADATA = "Misc Metadata" SUBMIT_APPROVED = "Submit Approved URLs" + SUBMIT_META_URLS = "Submit Meta URLs" DUPLICATE_DETECTION = "Duplicate Detection" IDLE = "Idle" PROBE_404 = "404 Probe" @@ -59,6 +60,8 @@ class TaskType(PyEnum): SYNC_DATA_SOURCES = "Sync Data Sources" POPULATE_BACKLOG_SNAPSHOT = "Populate Backlog Snapshot" DELETE_OLD_LOGS = "Delete Old Logs" + DELETE_STALE_SCREENSHOTS = "Delete Stale Screenshots" + MARK_TASK_NEVER_COMPLETED = "Mark Task Never Completed" RUN_URL_TASKS = "Run URL Task Cycles" class ChangeLogOperationType(PyEnum): diff --git a/src/db/models/impl/link/user_suggestion_not_found/users_submitted_url/sqlalchemy.py b/src/db/models/impl/link/user_suggestion_not_found/users_submitted_url/sqlalchemy.py index 7407c016..23e61993 100644 --- a/src/db/models/impl/link/user_suggestion_not_found/users_submitted_url/sqlalchemy.py +++ b/src/db/models/impl/link/user_suggestion_not_found/users_submitted_url/sqlalchemy.py @@ -10,7 +10,7 @@ class LinkUserSubmittedURL( URLDependentMixin, CreatedAtMixin, ): - __tablename__ = "link_user_submitted_url" + __tablename__ = "link_user_submitted_urls" __table_args__ = ( PrimaryKeyConstraint("url_id", "user_id"), UniqueConstraint("url_id"), diff --git a/src/db/models/impl/task/core.py b/src/db/models/impl/task/core.py index 49e953ae..2890f4d0 100644 --- a/src/db/models/impl/task/core.py +++ b/src/db/models/impl/task/core.py @@ -8,6 +8,7 @@ from src.db.models.types import batch_status_enum + class Task(UpdatedAtMixin, WithIDBase): __tablename__ = 'tasks' @@ -16,7 +17,17 @@ class Task(UpdatedAtMixin, WithIDBase): *[task_type.value for task_type in TaskType], name='task_type' ), nullable=False) - task_status = Column(batch_status_enum, nullable=False) + task_status = Column( + PGEnum( + 'complete', + 'in-process', + 'error', + 'aborted', + 'never_completed', + name='task_status_enum' + ), + nullable=False + ) # Relationships urls = relationship( diff --git a/src/db/models/impl/task/enums.py b/src/db/models/impl/task/enums.py new file mode 100644 index 00000000..b166d747 --- /dev/null +++ b/src/db/models/impl/task/enums.py @@ -0,0 +1,9 @@ +from enum import Enum + + +class TaskStatus(Enum): + COMPLETE = "complete" + IN_PROCESS = "in-process" + ERROR = "error" + ABORTED = "aborted" + NEVER_COMPLETED = "never-completed" diff --git a/src/db/models/impl/url/ds_meta_url/__init__.py b/src/db/models/impl/url/ds_meta_url/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/db/models/impl/url/ds_meta_url/pydantic.py b/src/db/models/impl/url/ds_meta_url/pydantic.py new file mode 100644 index 00000000..8f7674e9 --- /dev/null +++ b/src/db/models/impl/url/ds_meta_url/pydantic.py @@ -0,0 +1,14 @@ +from pydantic import BaseModel + +from src.db.models.impl.url.ds_meta_url.sqlalchemy import URLDSMetaURL + + +class URLDSMetaURLPydantic(BaseModel): + + url_id: int + ds_meta_url_id: int + agency_id: int + + @classmethod + def sa_model(cls) -> type[URLDSMetaURL]: + return URLDSMetaURL \ No newline at end of file diff --git a/src/db/models/impl/url/ds_meta_url/sqlalchemy.py b/src/db/models/impl/url/ds_meta_url/sqlalchemy.py new file mode 100644 index 00000000..e642a694 --- /dev/null +++ b/src/db/models/impl/url/ds_meta_url/sqlalchemy.py @@ -0,0 +1,20 @@ +from sqlalchemy import Column, Integer, PrimaryKeyConstraint, UniqueConstraint + +from src.db.models.mixins import URLDependentMixin, CreatedAtMixin, AgencyDependentMixin +from src.db.models.templates_.base import Base + + +class URLDSMetaURL( + Base, + URLDependentMixin, + AgencyDependentMixin, + CreatedAtMixin +): + __tablename__ = "url_ds_meta_url" + + ds_meta_url_id = Column(Integer) + + __table_args__ = ( + PrimaryKeyConstraint("url_id", "agency_id"), + UniqueConstraint("ds_meta_url_id"), + ) \ No newline at end of file diff --git a/src/db/statement_composer.py b/src/db/statement_composer.py index 19b544a4..8618fd84 100644 --- a/src/db/statement_composer.py +++ b/src/db/statement_composer.py @@ -12,6 +12,7 @@ from src.db.models.impl.link.batch_url.sqlalchemy import LinkBatchURL from src.db.models.impl.link.task_url import LinkTaskURL from src.db.models.impl.task.core import Task +from src.db.models.impl.task.enums import TaskStatus from src.db.models.impl.url.core.sqlalchemy import URL from src.db.models.impl.url.optional_data_source_metadata import URLOptionalDataSourceMetadata from src.db.models.impl.url.scrape_info.sqlalchemy import URLScrapeInfo @@ -32,7 +33,7 @@ def has_non_errored_urls_without_html_data() -> Select: join(Task, LinkTaskURL.task_id == Task.id). where(LinkTaskURL.url_id == URL.id). where(Task.task_type == TaskType.HTML.value). - where(Task.task_status == BatchStatus.READY_TO_LABEL.value) + where(Task.task_status == TaskStatus.COMPLETE.value) ) query = ( select(URL) diff --git a/src/external/pdap/client.py b/src/external/pdap/client.py index 661edf07..1c950ad3 100644 --- a/src/external/pdap/client.py +++ b/src/external/pdap/client.py @@ -7,6 +7,8 @@ from src.external.pdap.dtos.match_agency.response import MatchAgencyResponse from src.external.pdap.dtos.unique_url_duplicate import UniqueURLDuplicateInfo from src.external.pdap.enums import MatchAgencyResponseStatus +from src.external.pdap.impl.meta_urls.core import submit_meta_urls +from src.external.pdap.impl.meta_urls.request import SubmitMetaURLsRequest class PDAPClient: @@ -89,7 +91,7 @@ async def is_url_duplicate( is_duplicate: bool = (len(duplicates) != 0) return is_duplicate - async def submit_urls( + async def submit_data_source_urls( self, tdos: list[SubmitApprovedURLTDO] ) -> list[SubmittedURLInfo]: @@ -146,3 +148,12 @@ async def submit_urls( results.append(response_object) return results + + async def submit_meta_urls( + self, + requests: list[SubmitMetaURLsRequest] + ): + return await submit_meta_urls( + self.access_manager, + requests=requests + ) \ No newline at end of file diff --git a/src/external/pdap/dtos/sync/agencies.py b/src/external/pdap/dtos/sync/agencies.py deleted file mode 100644 index 7e569a81..00000000 --- a/src/external/pdap/dtos/sync/agencies.py +++ /dev/null @@ -1,18 +0,0 @@ -import datetime -from typing import Optional - -from pydantic import BaseModel - - - -class AgenciesSyncResponseInnerInfo(BaseModel): - display_name: str - agency_id: int - state_name: str | None - county_name: str | None - locality_name: str | None - updated_at: datetime.datetime - meta_urls: list[str] = [] - -class AgenciesSyncResponseInfo(BaseModel): - agencies: list[AgenciesSyncResponseInnerInfo] diff --git a/src/external/pdap/dtos/sync/data_sources.py b/src/external/pdap/dtos/sync/data_sources.py deleted file mode 100644 index a5fe92b9..00000000 --- a/src/external/pdap/dtos/sync/data_sources.py +++ /dev/null @@ -1,21 +0,0 @@ -from datetime import datetime - -from pydantic import BaseModel - -from src.core.enums import RecordType -from src.external.pdap.enums import ApprovalStatus, DataSourcesURLStatus - - -class DataSourcesSyncResponseInnerInfo(BaseModel): - id: int - url: str - name: str - description: str | None - record_type: RecordType - agency_ids: list[int] - approval_status: ApprovalStatus - url_status: DataSourcesURLStatus - updated_at: datetime - -class DataSourcesSyncResponseInfo(BaseModel): - data_sources: list[DataSourcesSyncResponseInnerInfo] \ No newline at end of file diff --git a/src/external/pdap/impl/__init__.py b/src/external/pdap/impl/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/external/pdap/impl/meta_urls/__init__.py b/src/external/pdap/impl/meta_urls/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/external/pdap/impl/meta_urls/core.py b/src/external/pdap/impl/meta_urls/core.py new file mode 100644 index 00000000..f3078924 --- /dev/null +++ b/src/external/pdap/impl/meta_urls/core.py @@ -0,0 +1,58 @@ +from typing import Any + +from pdap_access_manager import AccessManager, DataSourcesNamespaces, RequestInfo, RequestType, ResponseInfo + +from src.external.pdap.impl.meta_urls.enums import SubmitMetaURLsStatus +from src.external.pdap.impl.meta_urls.request import SubmitMetaURLsRequest +from src.external.pdap.impl.meta_urls.response import SubmitMetaURLsResponse + + +async def submit_meta_urls( + access_manager: AccessManager, + requests: list[SubmitMetaURLsRequest] +) -> list[SubmitMetaURLsResponse]: + + + # Build url-id dictionary + url_id_dict: dict[str, int] = {} + for request in requests: + url_id_dict[request.url] = request.url_id + + meta_urls_json: list[dict[str, Any]] = [] + for request in requests: + meta_urls_json.append( + { + "url": request.url, + "agency_id": request.agency_id + } + ) + + headers: dict[str, str] = await access_manager.jwt_header() + url: str = access_manager.build_url( + namespace=DataSourcesNamespaces.SOURCE_COLLECTOR, + subdomains=["meta-urls"] + ) + request_info = RequestInfo( + type_=RequestType.POST, + url=url, + headers=headers, + json_={ + "data_sources": meta_urls_json + } + ) + + response_info: ResponseInfo = await access_manager.make_request(request_info) + meta_urls_response_json: list[dict[str, Any]] = response_info.data["meta_urls"] + + responses: list[SubmitMetaURLsResponse] = [] + for meta_url in meta_urls_response_json: + responses.append( + SubmitMetaURLsResponse( + url=meta_url["url"], + status=SubmitMetaURLsStatus(meta_url["status"]), + agency_id=meta_url["agency_id"], + meta_url_id=meta_url["meta_url_id"], + error=meta_url["error"] + ) + ) + return responses \ No newline at end of file diff --git a/src/external/pdap/impl/meta_urls/enums.py b/src/external/pdap/impl/meta_urls/enums.py new file mode 100644 index 00000000..e49e71aa --- /dev/null +++ b/src/external/pdap/impl/meta_urls/enums.py @@ -0,0 +1,7 @@ +from enum import Enum + + +class SubmitMetaURLsStatus(Enum): + SUCCESS = "success" + FAILURE = "failure" + ALREADY_EXISTS = "already_exists" \ No newline at end of file diff --git a/src/external/pdap/impl/meta_urls/request.py b/src/external/pdap/impl/meta_urls/request.py new file mode 100644 index 00000000..ac222aca --- /dev/null +++ b/src/external/pdap/impl/meta_urls/request.py @@ -0,0 +1,7 @@ +from pydantic import BaseModel + + +class SubmitMetaURLsRequest(BaseModel): + url_id: int + url: str + agency_id: int diff --git a/src/external/pdap/impl/meta_urls/response.py b/src/external/pdap/impl/meta_urls/response.py new file mode 100644 index 00000000..96d5ece7 --- /dev/null +++ b/src/external/pdap/impl/meta_urls/response.py @@ -0,0 +1,11 @@ +from pydantic import BaseModel + +from src.external.pdap.impl.meta_urls.enums import SubmitMetaURLsStatus + + +class SubmitMetaURLsResponse(BaseModel): + url: str + status: SubmitMetaURLsStatus + meta_url_id: int | None = None + agency_id: int | None = None + error: str | None = None \ No newline at end of file diff --git a/tests/automated/integration/api/submit/test_needs_cleaning.py b/tests/automated/integration/api/submit/test_needs_cleaning.py index 85c2f112..c6512502 100644 --- a/tests/automated/integration/api/submit/test_needs_cleaning.py +++ b/tests/automated/integration/api/submit/test_needs_cleaning.py @@ -16,7 +16,7 @@ async def test_needs_cleaning( ): response: URLSubmissionResponse = await api_test_helper.request_validator.submit_url( request=URLSubmissionRequest( - url="www.example.com#fragment" + url="www.example.com#fdragment" ) ) diff --git a/tests/automated/integration/core/async_/conclude_task/test_error.py b/tests/automated/integration/core/async_/conclude_task/test_error.py index 9507c9ed..1a31b87e 100644 --- a/tests/automated/integration/core/async_/conclude_task/test_error.py +++ b/tests/automated/integration/core/async_/conclude_task/test_error.py @@ -2,6 +2,7 @@ from src.core.enums import BatchStatus from src.core.tasks.url.enums import TaskOperatorOutcome +from src.db.models.impl.task.enums import TaskStatus from tests.automated.integration.core.async_.conclude_task.helpers import setup_run_info from tests.automated.integration.core.async_.conclude_task.setup_info import TestAsyncCoreSetupInfo from tests.automated.integration.core.async_.helpers import setup_async_core @@ -25,5 +26,5 @@ async def test_conclude_task_error( task_info = await ddc.adb_client.get_task_info(task_id=setup.task_id) - assert task_info.task_status == BatchStatus.ERROR + assert task_info.task_status == TaskStatus.ERROR assert task_info.error_info == "test error" diff --git a/tests/automated/integration/core/async_/conclude_task/test_success.py b/tests/automated/integration/core/async_/conclude_task/test_success.py index d9ba649e..03cc5b52 100644 --- a/tests/automated/integration/core/async_/conclude_task/test_success.py +++ b/tests/automated/integration/core/async_/conclude_task/test_success.py @@ -2,6 +2,7 @@ from src.core.enums import BatchStatus from src.core.tasks.url.enums import TaskOperatorOutcome +from src.db.models.impl.task.enums import TaskStatus from tests.automated.integration.core.async_.conclude_task.helpers import setup_run_info from tests.automated.integration.core.async_.conclude_task.setup_info import TestAsyncCoreSetupInfo from tests.automated.integration.core.async_.helpers import setup_async_core @@ -25,4 +26,4 @@ async def test_conclude_task_success( task_info = await ddc.adb_client.get_task_info(task_id=setup.task_id) - assert task_info.task_status == BatchStatus.READY_TO_LABEL + assert task_info.task_status == TaskStatus.COMPLETE diff --git a/tests/automated/integration/db/client/test_delete_old_logs.py b/tests/automated/integration/db/client/test_delete_old_logs.py index 44c96075..7c2c2b62 100644 --- a/tests/automated/integration/db/client/test_delete_old_logs.py +++ b/tests/automated/integration/db/client/test_delete_old_logs.py @@ -2,6 +2,7 @@ import pytest +from src.core.tasks.scheduled.impl.delete_logs.operator import DeleteOldLogsTaskOperator from src.db.models.impl.log.pydantic.info import LogInfo from tests.helpers.data_creator.core import DBDataCreator @@ -13,13 +14,16 @@ async def test_delete_old_logs(db_data_creator: DBDataCreator): old_datetime = datetime.now() - timedelta(days=7) db_client = db_data_creator.db_client adb_client = db_data_creator.adb_client + operator = DeleteOldLogsTaskOperator( + adb_client=adb_client, + ) log_infos = [] for i in range(3): log_infos.append(LogInfo(log="test log", batch_id=batch_id, created_at=old_datetime)) db_client.insert_logs(log_infos=log_infos) logs = await adb_client.get_logs_by_batch_id(batch_id=batch_id) assert len(logs) == 3 - await adb_client.delete_old_logs() + await operator.inner_task_logic() logs = await adb_client.get_logs_by_batch_id(batch_id=batch_id) assert len(logs) == 0 diff --git a/tests/automated/integration/tasks/scheduled/loader/test_happy_path.py b/tests/automated/integration/tasks/scheduled/loader/test_happy_path.py index d7c43e97..f2dd795c 100644 --- a/tests/automated/integration/tasks/scheduled/loader/test_happy_path.py +++ b/tests/automated/integration/tasks/scheduled/loader/test_happy_path.py @@ -2,7 +2,7 @@ from src.core.tasks.scheduled.loader import ScheduledTaskOperatorLoader -NUMBER_OF_ENTRIES = 6 +NUMBER_OF_ENTRIES = 8 @pytest.mark.asyncio async def test_happy_path( diff --git a/tests/automated/integration/tasks/url/impl/submit_approved/test_submit_approved_url_task.py b/tests/automated/integration/tasks/url/impl/submit_approved/test_submit_approved_url_task.py index 44b70d53..abe2c37d 100644 --- a/tests/automated/integration/tasks/url/impl/submit_approved/test_submit_approved_url_task.py +++ b/tests/automated/integration/tasks/url/impl/submit_approved/test_submit_approved_url_task.py @@ -59,7 +59,7 @@ async def test_submit_approved_url_task( url_2: URL = urls[1] url_3: URL = urls[2] - # Check URLs have been marked as 'submitted' + # Check URLs assert url_1.status == URLStatus.OK assert url_2.status == URLStatus.OK assert url_3.status == URLStatus.ERROR diff --git a/tests/automated/integration/tasks/url/impl/submit_approved/test_validated_meta_url.py b/tests/automated/integration/tasks/url/impl/submit_approved/test_validated_meta_url.py index d9b5a380..76754b29 100644 --- a/tests/automated/integration/tasks/url/impl/submit_approved/test_validated_meta_url.py +++ b/tests/automated/integration/tasks/url/impl/submit_approved/test_validated_meta_url.py @@ -12,7 +12,6 @@ async def test_validated_meta_url_not_included( db_data_creator, mock_pdap_client: PDAPClient, - monkeypatch ): """ If a validated Meta URL is included in the database diff --git a/tests/automated/integration/tasks/url/impl/submit_meta_urls/__init__.py b/tests/automated/integration/tasks/url/impl/submit_meta_urls/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/automated/integration/tasks/url/impl/submit_meta_urls/test_core.py b/tests/automated/integration/tasks/url/impl/submit_meta_urls/test_core.py new file mode 100644 index 00000000..37d6e00f --- /dev/null +++ b/tests/automated/integration/tasks/url/impl/submit_meta_urls/test_core.py @@ -0,0 +1,80 @@ +from http import HTTPStatus +from unittest.mock import AsyncMock + +import pytest +from pdap_access_manager import ResponseInfo + +from src.collectors.enums import URLStatus +from src.core.enums import SubmitResponseStatus +from src.core.tasks.url.operators.submit_meta_urls.core import SubmitMetaURLsTaskOperator +from src.db.dtos.url.mapping import URLMapping +from src.db.models.impl.flag.url_validated.enums import URLType +from src.db.models.impl.url.core.sqlalchemy import URL +from src.db.models.impl.url.ds_meta_url.sqlalchemy import URLDSMetaURL +from src.external.pdap.client import PDAPClient +from src.external.pdap.impl.meta_urls.enums import SubmitMetaURLsStatus +from tests.helpers.data_creator.core import DBDataCreator +from tests.helpers.run import run_task_and_confirm_success + + +@pytest.mark.asyncio +async def test_submit_meta_urls( + db_data_creator: DBDataCreator, + mock_pdap_client: PDAPClient, +): + """ + Test Submit Meta URLs Task Operator + """ + + + operator = SubmitMetaURLsTaskOperator( + adb_client=db_data_creator.adb_client, + pdap_client=mock_pdap_client + ) + + assert not await operator.meets_task_prerequisites() + + # Create validated meta url + agency_id: int = (await db_data_creator.create_agencies(count=1))[0] + + mapping: URLMapping = (await db_data_creator.create_validated_urls( + validation_type=URLType.META_URL + ))[0] + await db_data_creator.link_urls_to_agencies( + url_ids=[mapping.url_id], + agency_ids=[agency_id] + ) + + mock_pdap_client.access_manager.make_request = AsyncMock( + return_value=ResponseInfo( + status_code=HTTPStatus.OK, + data={ + "meta_urls": [ + { + "url": mapping.url, + "agency_id": agency_id, + "status": SubmitMetaURLsStatus.SUCCESS.value, + "meta_url_id": 2, + "error": None, + }, + ] + } + ) + ) + + + assert await operator.meets_task_prerequisites() + + await run_task_and_confirm_success(operator) + + urls: list[URL] = await db_data_creator.adb_client.get_all(URL) + assert len(urls) == 1 + url: URL = urls[0] + assert url.status == URLStatus.OK + + url_ds_meta_urls: list[URLDSMetaURL] = await db_data_creator.adb_client.get_all(URLDSMetaURL) + assert len(url_ds_meta_urls) == 1 + url_ds_meta_url: URLDSMetaURL = url_ds_meta_urls[0] + assert url_ds_meta_url.url_id == url.id + assert url_ds_meta_url.ds_meta_url_id == 2 + assert url_ds_meta_url.agency_id == agency_id \ No newline at end of file diff --git a/tests/automated/integration/tasks/url/loader/test_happy_path.py b/tests/automated/integration/tasks/url/loader/test_happy_path.py index a7b02e89..bd5a431c 100644 --- a/tests/automated/integration/tasks/url/loader/test_happy_path.py +++ b/tests/automated/integration/tasks/url/loader/test_happy_path.py @@ -2,7 +2,7 @@ from src.core.tasks.url.loader import URLTaskOperatorLoader -NUMBER_OF_TASK_OPERATORS: int = 14 +NUMBER_OF_TASK_OPERATORS: int = 15 @pytest.mark.asyncio async def test_happy_path(