diff --git a/ENV.md b/ENV.md index 1accf4fa..b957bc11 100644 --- a/ENV.md +++ b/ENV.md @@ -83,7 +83,6 @@ URL Task Flags are collectively controlled by the `RUN_URL_TASKS_TASK_FLAG` flag | `URL_AGENCY_IDENTIFICATION_TASK_FLAG` | Automatically assigns and suggests Agencies for URLs. | | `URL_SUBMIT_APPROVED_TASK_FLAG` | Submits approved URLs to the Data Sources App. | | `URL_MISC_METADATA_TASK_FLAG` | Adds misc metadata to URLs. | -| `URL_404_PROBE_TASK_FLAG` | Probes URLs for 404 errors. | | `URL_AUTO_RELEVANCE_TASK_FLAG` | Automatically assigns Relevances to URLs. | | `URL_PROBE_TASK_FLAG` | Probes URLs for web metadata. | | `URL_ROOT_URL_TASK_FLAG` | Extracts and links Root URLs to URLs. | diff --git a/alembic/versions/2025_10_12_1549-d55ec2987702_remove_404_probe_task.py b/alembic/versions/2025_10_12_1549-d55ec2987702_remove_404_probe_task.py new file mode 100644 index 00000000..26fb9d0e --- /dev/null +++ b/alembic/versions/2025_10_12_1549-d55ec2987702_remove_404_probe_task.py @@ -0,0 +1,157 @@ +"""Remove 404 Probe Task + +Revision ID: d55ec2987702 +Revises: 25b3fc777c31 +Create Date: 2025-10-12 15:49:01.945412 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + +from src.util.alembic_helpers import remove_enum_value, add_enum_value + +# revision identifiers, used by Alembic. +revision: str = 'd55ec2987702' +down_revision: Union[str, None] = '25b3fc777c31' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + _drop_views() + add_enum_value( + enum_name="url_type", + enum_value="broken page" + ) + + op.execute( + """DELETE FROM TASKS WHERE task_type = '404 Probe'""" + ) + op.execute( + """DELETE FROM url_task_error WHERE task_type = '404 Probe'""" + ) + remove_enum_value( + enum_name="task_type", + value_to_remove="404 Probe", + targets=[ + ("tasks", "task_type"), + ("url_task_error", "task_type") + ] + ) + op.execute( + """UPDATE URLS SET status = 'ok' WHERE status = '404 not found'""" + ) + remove_enum_value( + enum_name="url_status", + value_to_remove="404 not found", + targets=[ + ("urls", "status") + ] + ) + + op.drop_table("url_probed_for_404") + + _recreate_views() + +def _drop_views(): + op.execute("drop view url_task_count_1_day") + op.execute("drop view url_task_count_1_week") + op.execute("drop materialized view url_status_mat_view") + +def _recreate_views(): + op.execute(""" + create view url_task_count_1_day(task_type, count) as + SELECT + t.task_type, + count(ltu.url_id) AS count + FROM + tasks t + JOIN link_task_urls ltu + ON ltu.task_id = t.id + WHERE + t.updated_at > (now() - '1 day'::interval) + GROUP BY + t.task_type; + """) + + op.execute(""" + create view url_task_count_1_week(task_type, count) as + SELECT + t.task_type, + count(ltu.url_id) AS count + FROM + tasks t + JOIN link_task_urls ltu + ON ltu.task_id = t.id + WHERE + t.updated_at > (now() - '7 days'::interval) + GROUP BY + t.task_type; + """) + + op.execute( + """ + CREATE MATERIALIZED VIEW url_status_mat_view AS + with + urls_with_relevant_errors as ( + select + ute.url_id + from + url_task_error ute + where + ute.task_type in ( + 'Screenshot', + 'HTML', + 'URL Probe' + ) + ) + select + u.id as url_id, + case + when ( + -- Validated as not relevant, individual record, or not found + fuv.type in ('not relevant', 'individual record', 'not found') + -- Has Meta URL in data sources app + OR udmu.url_id is not null + -- Has data source in data sources app + OR uds.url_id is not null + ) Then 'Submitted/Pipeline Complete' + when fuv.type is not null THEN 'Accepted' + when ( + -- Has compressed HTML + uch.url_id is not null + AND + -- Has web metadata + uwm.url_id is not null + AND + -- Has screenshot + us.url_id is not null + ) THEN 'Community Labeling' + when uwre.url_id is not null then 'Error' + ELSE 'Intake' + END as status + + from + urls u + left join urls_with_relevant_errors uwre + on u.id = uwre.url_id + left join url_screenshot us + on u.id = us.url_id + left join url_compressed_html uch + on u.id = uch.url_id + left join url_web_metadata uwm + on u.id = uwm.url_id + left join flag_url_validated fuv + on u.id = fuv.url_id + left join url_ds_meta_url udmu + on u.id = udmu.url_id + left join url_data_source uds + on u.id = uds.url_id + """ + ) + + +def downgrade() -> None: + pass diff --git a/src/api/endpoints/review/reject/query.py b/src/api/endpoints/review/reject/query.py index 89509dfc..1f9dfe91 100644 --- a/src/api/endpoints/review/reject/query.py +++ b/src/api/endpoints/review/reject/query.py @@ -35,12 +35,12 @@ async def run(self, session) -> None: url = await session.execute(query) url = url.scalars().first() - validation_type: URLType | None = None + validation_type: URLType match self.rejection_reason: case RejectionReason.INDIVIDUAL_RECORD: validation_type = URLType.INDIVIDUAL_RECORD case RejectionReason.BROKEN_PAGE_404: - url.status = URLStatus.NOT_FOUND.value + validation_type = URLType.BROKEN_PAGE case RejectionReason.NOT_RELEVANT: validation_type = URLType.NOT_RELEVANT case _: @@ -49,12 +49,11 @@ async def run(self, session) -> None: detail="Invalid rejection reason" ) - if validation_type is not None: - flag_url_validated = FlagURLValidated( - url_id=self.url_id, - type=validation_type - ) - session.add(flag_url_validated) + flag_url_validated = FlagURLValidated( + url_id=self.url_id, + type=validation_type + ) + session.add(flag_url_validated) # Add rejecting user rejecting_user_url = ReviewingUserURL( diff --git a/src/collectors/enums.py b/src/collectors/enums.py index c357d6bf..f40e5f19 100644 --- a/src/collectors/enums.py +++ b/src/collectors/enums.py @@ -14,4 +14,3 @@ class URLStatus(Enum): OK = "ok" ERROR = "error" DUPLICATE = "duplicate" - NOT_FOUND = "404 not found" diff --git a/src/core/tasks/url/loader.py b/src/core/tasks/url/loader.py index 2ad1776f..b5910f5e 100644 --- a/src/core/tasks/url/loader.py +++ b/src/core/tasks/url/loader.py @@ -17,7 +17,6 @@ from src.core.tasks.url.operators.location_id.subtasks.loader import LocationIdentificationSubtaskLoader from src.core.tasks.url.operators.misc_metadata.core import URLMiscellaneousMetadataTaskOperator from src.core.tasks.url.operators.probe.core import URLProbeTaskOperator -from src.core.tasks.url.operators.probe_404.core import URL404ProbeTaskOperator from src.core.tasks.url.operators.record_type.core import URLRecordTypeTaskOperator from src.core.tasks.url.operators.record_type.llm_api.record_classifier.openai import OpenAIRecordClassifier from src.core.tasks.url.operators.root_url.core import URLRootURLTaskOperator @@ -126,15 +125,6 @@ def _get_url_miscellaneous_metadata_task_operator(self) -> URLTaskEntry: enabled=self.setup_flag("URL_MISC_METADATA_TASK_FLAG") ) - def _get_url_404_probe_task_operator(self) -> URLTaskEntry: - operator = URL404ProbeTaskOperator( - adb_client=self.adb_client, - url_request_interface=self.url_request_interface - ) - return URLTaskEntry( - operator=operator, - enabled=self.setup_flag("URL_404_PROBE_TASK_FLAG") - ) def _get_url_auto_relevance_task_operator(self) -> URLTaskEntry: operator = URLAutoRelevantTaskOperator( @@ -220,7 +210,6 @@ async def load_entries(self) -> list[URLTaskEntry]: self._get_url_root_url_task_operator(), self._get_url_probe_task_operator(), self._get_url_html_task_operator(), - self._get_url_404_probe_task_operator(), self._get_url_record_type_task_operator(), self._get_agency_identification_task_operator(), self._get_url_miscellaneous_metadata_task_operator(), diff --git a/src/core/tasks/url/operators/probe/core.py b/src/core/tasks/url/operators/probe/core.py index 0e091852..1c961155 100644 --- a/src/core/tasks/url/operators/probe/core.py +++ b/src/core/tasks/url/operators/probe/core.py @@ -8,6 +8,7 @@ from src.core.tasks.url.operators.probe.queries.urls.not_probed.exists import HasURLsWithoutProbeQueryBuilder from src.core.tasks.url.operators.probe.queries.urls.not_probed.get.query import GetURLsWithoutProbeQueryBuilder from src.core.tasks.url.operators.probe.tdo import URLProbeTDO +from src.db.models.impl.url.web_metadata.insert import URLWebMetadataPydantic from src.external.url_request.core import URLRequestInterface from src.db.client.async_ import AsyncDatabaseClient from src.db.dtos.url.mapping import URLMapping @@ -68,10 +69,10 @@ async def probe_urls(self, tdos: list[URLProbeTDO]) -> None: async def update_database(self, tdos: list[URLProbeTDO]) -> None: non_redirect_tdos = filter_non_redirect_tdos(tdos) - web_metadata_objects = convert_tdo_to_web_metadata_list(non_redirect_tdos) - await self.adb_client.bulk_insert(web_metadata_objects) + web_metadata_objects: list[URLWebMetadataPydantic] = convert_tdo_to_web_metadata_list(non_redirect_tdos) + await self.adb_client.bulk_upsert(web_metadata_objects) - redirect_tdos = filter_redirect_tdos(tdos) + redirect_tdos: list[URLProbeTDO] = filter_redirect_tdos(tdos) query_builder = InsertRedirectsQueryBuilder(tdos=redirect_tdos) await self.adb_client.run_query_builder(query_builder) diff --git a/src/core/tasks/url/operators/probe/queries/insert_redirects/extract.py b/src/core/tasks/url/operators/probe/queries/insert_redirects/extract.py index c44e1a83..3de66e85 100644 --- a/src/core/tasks/url/operators/probe/queries/insert_redirects/extract.py +++ b/src/core/tasks/url/operators/probe/queries/insert_redirects/extract.py @@ -4,7 +4,7 @@ def extract_response_pairs(tdos: list[URLProbeTDO]) -> list[URLProbeRedirectResponsePair]: - results = [] + results: list[URLProbeRedirectResponsePair] = [] for tdo in tdos: if not tdo.response.is_redirect: raise ValueError(f"Expected {tdo.url_mapping.url} to be a redirect.") diff --git a/src/core/tasks/url/operators/probe/queries/insert_redirects/query.py b/src/core/tasks/url/operators/probe/queries/insert_redirects/query.py index a79cca77..0ba70c47 100644 --- a/src/core/tasks/url/operators/probe/queries/insert_redirects/query.py +++ b/src/core/tasks/url/operators/probe/queries/insert_redirects/query.py @@ -6,6 +6,7 @@ from src.core.tasks.url.operators.probe.tdo import URLProbeTDO from src.db.dtos.url.mapping import URLMapping from src.db.queries.base.builder import QueryBuilderBase +from src.external.url_request.probe.models.redirect import URLProbeRedirectResponsePair from src.external.url_request.probe.models.response import URLProbeResponse from src.util.url_mapper import URLMapper @@ -20,7 +21,7 @@ def __init__( self.source_url_mappings = [tdo.url_mapping for tdo in self.tdos] self._mapper = URLMapper(self.source_url_mappings) - self._response_pairs = extract_response_pairs(self.tdos) + self._response_pairs: list[URLProbeRedirectResponsePair] = extract_response_pairs(self.tdos) self._destination_probe_responses: list[URLProbeResponse] = [ pair.destination @@ -49,14 +50,19 @@ async def run(self, session: AsyncSession) -> None: session=session ) + + # Get all destination URLs already in the database dest_url_mappings_in_db: list[URLMapping] = await rm.get_url_mappings_in_db( urls=self._destination_urls ) + # Filter out to only have those URLs that are new in the database new_dest_urls: list[str] = filter_new_dest_urls( url_mappings_in_db=dest_url_mappings_in_db, all_dest_urls=self._destination_urls ) + + # Add the new URLs new_dest_url_mappings: list[URLMapping] = await rm.insert_new_urls( urls=new_dest_urls ) @@ -64,12 +70,14 @@ async def run(self, session: AsyncSession) -> None: self._mapper.add_mappings(all_dest_url_mappings) + # Add web metadata for new URLs await rm.add_web_metadata( all_dest_url_mappings=all_dest_url_mappings, dest_url_to_probe_response_mappings=self._destination_url_to_probe_response_mapping, tdos=self.tdos ) + # Add redirect links for new URLs await rm.add_redirect_links( response_pairs=self._response_pairs, mapper=self._mapper diff --git a/src/core/tasks/url/operators/probe/queries/insert_redirects/request_manager.py b/src/core/tasks/url/operators/probe/queries/insert_redirects/request_manager.py index d866106a..35dfded5 100644 --- a/src/core/tasks/url/operators/probe/queries/insert_redirects/request_manager.py +++ b/src/core/tasks/url/operators/probe/queries/insert_redirects/request_manager.py @@ -1,3 +1,6 @@ +from typing import Sequence + +from sqlalchemy import select, tuple_, RowMapping from sqlalchemy.ext.asyncio import AsyncSession from src.core.tasks.url.operators.probe.queries.insert_redirects.convert import convert_to_url_mappings, \ @@ -11,6 +14,8 @@ from src.db.dtos.url.mapping import URLMapping from src.db.helpers.session import session_helper as sh from src.db.models.impl.link.url_redirect_url.pydantic import LinkURLRedirectURLPydantic +from src.db.models.impl.link.url_redirect_url.sqlalchemy import LinkURLRedirectURL +from src.db.models.impl.url.core.sqlalchemy import URL from src.db.models.impl.url.web_metadata.insert import URLWebMetadataPydantic from src.external.url_request.probe.models.redirect import URLProbeRedirectResponsePair from src.external.url_request.probe.models.response import URLProbeResponse @@ -69,10 +74,40 @@ async def add_redirect_links( response_pairs: list[URLProbeRedirectResponsePair], mapper: URLMapper ) -> None: - links: list[LinkURLRedirectURLPydantic] = [] + # Get all existing links and exclude + link_tuples: list[tuple[int, int]] = [] for pair in response_pairs: source_url_id = mapper.get_id(pair.source.url) destination_url_id = mapper.get_id(pair.destination.url) + link_tuples.append((source_url_id, destination_url_id)) + + query = ( + select( + LinkURLRedirectURL.source_url_id, + LinkURLRedirectURL.destination_url_id + ) + .where( + tuple_( + LinkURLRedirectURL.source_url_id, + LinkURLRedirectURL.destination_url_id + ).in_(link_tuples) + ) + ) + mappings: Sequence[RowMapping] = await sh.mappings(self.session, query=query) + existing_links: set[tuple[int, int]] = { + (mapping["source_url_id"], mapping["destination_url_id"]) + for mapping in mappings + } + new_links: list[tuple[int, int]] = [ + (source_url_id, destination_url_id) + for source_url_id, destination_url_id in link_tuples + if (source_url_id, destination_url_id) not in existing_links + ] + + + links: list[LinkURLRedirectURLPydantic] = [] + for link in new_links: + source_url_id, destination_url_id = link link = LinkURLRedirectURLPydantic( source_url_id=source_url_id, destination_url_id=destination_url_id diff --git a/src/core/tasks/url/operators/probe/queries/urls/not_probed/exists.py b/src/core/tasks/url/operators/probe/queries/urls/not_probed/exists.py index c1b9b723..5954c197 100644 --- a/src/core/tasks/url/operators/probe/queries/urls/not_probed/exists.py +++ b/src/core/tasks/url/operators/probe/queries/urls/not_probed/exists.py @@ -1,4 +1,6 @@ -from sqlalchemy import select +from datetime import timedelta, datetime + +from sqlalchemy import select, or_ from sqlalchemy.ext.asyncio import AsyncSession from typing_extensions import override, final @@ -18,8 +20,15 @@ async def run(self, session: AsyncSession) -> bool: select( URL.id ) + .outerjoin( + URLWebMetadata, + URL.id == URLWebMetadata.url_id + ) .where( - not_exists_url(URLWebMetadata), + or_( + URLWebMetadata.id.is_(None), + URLWebMetadata.updated_at < datetime.now() - timedelta(days=30) + ), no_url_task_error(TaskType.PROBE_URL) ) ) diff --git a/src/core/tasks/url/operators/probe/queries/urls/not_probed/get/query.py b/src/core/tasks/url/operators/probe/queries/urls/not_probed/get/query.py index 8e29adc6..36450252 100644 --- a/src/core/tasks/url/operators/probe/queries/urls/not_probed/get/query.py +++ b/src/core/tasks/url/operators/probe/queries/urls/not_probed/get/query.py @@ -1,4 +1,6 @@ -from sqlalchemy import select +from datetime import timedelta, datetime + +from sqlalchemy import select, or_ from sqlalchemy.ext.asyncio import AsyncSession from typing_extensions import override, final @@ -25,7 +27,10 @@ async def run(self, session: AsyncSession) -> list[URLMapping]: URL.id == URLWebMetadata.url_id ) .where( - URLWebMetadata.id.is_(None) + or_( + URLWebMetadata.id.is_(None), + URLWebMetadata.updated_at < datetime.now() - timedelta(days=30) + ) ) .limit(500) ) diff --git a/src/core/tasks/url/operators/probe_404/__init__.py b/src/core/tasks/url/operators/probe_404/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/core/tasks/url/operators/probe_404/core.py b/src/core/tasks/url/operators/probe_404/core.py deleted file mode 100644 index ecfed6f5..00000000 --- a/src/core/tasks/url/operators/probe_404/core.py +++ /dev/null @@ -1,75 +0,0 @@ -from http import HTTPStatus - -from pydantic import BaseModel - -from src.core.tasks.url.operators.probe_404.tdo import URL404ProbeTDO -from src.external.url_request.core import URLRequestInterface -from src.db.client.async_ import AsyncDatabaseClient -from src.db.enums import TaskType -from src.core.tasks.url.operators.base import URLTaskOperatorBase - - -class URL404ProbeTDOSubsets(BaseModel): - successful: list[URL404ProbeTDO] - is_404: list[URL404ProbeTDO] - - - -class URL404ProbeTaskOperator(URLTaskOperatorBase): - - def __init__( - self, - url_request_interface: URLRequestInterface, - adb_client: AsyncDatabaseClient, - ): - super().__init__(adb_client) - self.url_request_interface = url_request_interface - - @property - def task_type(self) -> TaskType: - return TaskType.PROBE_404 - - async def meets_task_prerequisites(self) -> bool: - return await self.adb_client.has_pending_urls_not_recently_probed_for_404() - - async def probe_urls_for_404(self, tdos: list[URL404ProbeTDO]) -> None: - """ - Modifies: - URL404ProbeTDO.is_404 - """ - responses = await self.url_request_interface.make_simple_requests( - urls=[tdo.url for tdo in tdos] - ) - for tdo, response in zip(tdos, responses): - if response.status is None: - continue - tdo.is_404 = response.status == HTTPStatus.NOT_FOUND - - - async def inner_task_logic(self) -> None: - tdos = await self.get_pending_urls_not_recently_probed_for_404() - url_ids = [task_info.url_id for task_info in tdos] - await self.link_urls_to_task(url_ids=url_ids) - await self.probe_urls_for_404(tdos) - url_ids_404 = [tdo.url_id for tdo in tdos if tdo.is_404] - - await self.update_404s_in_database(url_ids_404) - await self.mark_as_recently_probed_for_404(url_ids) - - async def get_pending_urls_not_recently_probed_for_404(self) -> list[URL404ProbeTDO]: - return await self.adb_client.get_pending_urls_not_recently_probed_for_404() - - async def update_404s_in_database(self, url_ids_404: list[int]) -> None: - """ - Modifies: - URL data in DB - """ - await self.adb_client.mark_all_as_404(url_ids_404) - - async def mark_as_recently_probed_for_404(self, url_ids: list[int]) -> None: - """ - Modifies: - URL data in DB - """ - await self.adb_client.mark_all_as_recently_probed_for_404(url_ids) - diff --git a/src/core/tasks/url/operators/probe_404/tdo.py b/src/core/tasks/url/operators/probe_404/tdo.py deleted file mode 100644 index f24cd7b3..00000000 --- a/src/core/tasks/url/operators/probe_404/tdo.py +++ /dev/null @@ -1,9 +0,0 @@ -from typing import Optional - -from pydantic import BaseModel - - -class URL404ProbeTDO(BaseModel): - url_id: int - url: str - is_404: Optional[bool] = None \ No newline at end of file diff --git a/src/db/client/async_.py b/src/db/client/async_.py index 2844ab57..792dc5bb 100644 --- a/src/db/client/async_.py +++ b/src/db/client/async_.py @@ -1,10 +1,8 @@ from datetime import datetime, timedelta from functools import wraps -from operator import or_ from typing import Optional, Type, Any, List, Sequence from sqlalchemy import select, exists, func, Select, and_, update, delete, Row, text -from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker from sqlalchemy.orm import selectinload @@ -48,9 +46,6 @@ from src.core.tasks.url.operators.html.queries.get import \ GetPendingURLsWithoutHTMLDataQueryBuilder from src.core.tasks.url.operators.misc_metadata.tdo import URLMiscellaneousMetadataTDO -from src.core.tasks.url.operators.probe.queries.urls.not_probed.exists import HasURLsWithoutProbeQueryBuilder -from src.core.tasks.url.operators.probe.queries.urls.not_probed.get.query import GetURLsWithoutProbeQueryBuilder -from src.core.tasks.url.operators.probe_404.tdo import URL404ProbeTDO from src.core.tasks.url.operators.submit_approved.queries.mark_submitted import MarkURLsAsSubmittedQueryBuilder from src.core.tasks.url.operators.submit_approved.tdo import SubmittedURLInfo from src.db.client.helpers import add_standard_limit_and_offset @@ -60,7 +55,6 @@ from src.db.dto_converter import DTOConverter from src.db.dtos.url.html_content import URLHTMLContentInfo from src.db.dtos.url.insert import InsertURLsInfo -from src.db.dtos.url.mapping import URLMapping from src.db.dtos.url.raw_html import RawHTMLInfo from src.db.enums import TaskType from src.db.helpers.session import session_helper as sh @@ -87,7 +81,6 @@ from src.db.models.impl.url.html.compressed.sqlalchemy import URLCompressedHTML from src.db.models.impl.url.html.content.sqlalchemy import URLHTMLContent from src.db.models.impl.url.optional_data_source_metadata import URLOptionalDataSourceMetadata -from src.db.models.impl.url.probed_for_404 import URLProbedFor404 from src.db.models.impl.url.suggestion.agency.user import UserUrlAgencySuggestion from src.db.models.impl.url.suggestion.record_type.auto import AutoRecordTypeSuggestion from src.db.models.impl.url.suggestion.record_type.user import UserRecordTypeSuggestion @@ -938,77 +931,15 @@ async def populate_backlog_snapshot( session.add(snapshot) async def mark_all_as_404(self, url_ids: List[int]): - query = update(URL).where(URL.id.in_(url_ids)).values(status=URLStatus.NOT_FOUND.value) - await self.execute(query) query = update(URLWebMetadata).where(URLWebMetadata.url_id.in_(url_ids)).values(status_code=404) await self.execute(query) - async def mark_all_as_recently_probed_for_404( - self, - url_ids: List[int], - dt: datetime = func.now() - ): - values = [ - {"url_id": url_id, "last_probed_at": dt} for url_id in url_ids - ] - stmt = pg_insert(URLProbedFor404).values(values) - update_stmt = stmt.on_conflict_do_update( - index_elements=['url_id'], - set_={"last_probed_at": dt} - ) - await self.execute(update_stmt) - @session_manager async def mark_as_checked_for_duplicates(self, session: AsyncSession, url_ids: list[int]): for url_id in url_ids: url_checked_for_duplicate = URLCheckedForDuplicate(url_id=url_id) session.add(url_checked_for_duplicate) - @session_manager - async def has_pending_urls_not_recently_probed_for_404(self, session: AsyncSession) -> bool: - month_ago = func.now() - timedelta(days=30) - query = ( - select( - URL.id - ).outerjoin( - URLProbedFor404 - ).where( - and_( - URL.status == URLStatus.OK.value, - or_( - URLProbedFor404.id == None, - URLProbedFor404.last_probed_at < month_ago - ) - ) - ).limit(1) - ) - - raw_result = await session.execute(query) - result = raw_result.one_or_none() - return result is not None - - @session_manager - async def get_pending_urls_not_recently_probed_for_404(self, session: AsyncSession) -> List[URL404ProbeTDO]: - month_ago = func.now() - timedelta(days=30) - query = ( - select( - URL - ).outerjoin( - URLProbedFor404 - ).where( - and_( - URL.status == URLStatus.OK.value, - or_( - URLProbedFor404.id == None, - URLProbedFor404.last_probed_at < month_ago - ) - ) - ).limit(100) - ) - - raw_result = await session.execute(query) - urls = raw_result.scalars().all() - return [URL404ProbeTDO(url=url.url, url_id=url.id) for url in urls] async def get_urls_aggregated_pending_metrics(self): return await self.run_query_builder(GetMetricsURLSAggregatedPendingQueryBuilder()) diff --git a/src/db/enums.py b/src/db/enums.py index f7ca4611..b232c188 100644 --- a/src/db/enums.py +++ b/src/db/enums.py @@ -43,7 +43,6 @@ class TaskType(PyEnum): SUBMIT_META_URLS = "Submit Meta URLs" DUPLICATE_DETECTION = "Duplicate Detection" IDLE = "Idle" - PROBE_404 = "404 Probe" PROBE_URL = "URL Probe" ROOT_URL = "Root URL" IA_PROBE = "Internet Archives Probe" diff --git a/src/db/helpers/session/session_helper.py b/src/db/helpers/session/session_helper.py index bf92f686..43369ff3 100644 --- a/src/db/helpers/session/session_helper.py +++ b/src/db/helpers/session/session_helper.py @@ -64,11 +64,13 @@ async def bulk_upsert( upsert_model.model_dump() for upsert_model in models ] - # Set all non-id fields to the values in the upsert mapping + # Set all but two fields to the values in the upsert mapping set_ = {} for k, v in upsert_mappings[0].items(): if k == parser.id_field: continue + if k == "created_at": + continue set_[k] = getattr(query.excluded, k) # Add upsert logic to update on conflict diff --git a/src/db/models/impl/flag/url_validated/enums.py b/src/db/models/impl/flag/url_validated/enums.py index 7c410493..7ac2a0ad 100644 --- a/src/db/models/impl/flag/url_validated/enums.py +++ b/src/db/models/impl/flag/url_validated/enums.py @@ -5,4 +5,5 @@ class URLType(Enum): DATA_SOURCE = "data source" META_URL = "meta url" NOT_RELEVANT = "not relevant" - INDIVIDUAL_RECORD = "individual record" \ No newline at end of file + INDIVIDUAL_RECORD = "individual record" + BROKEN_PAGE = "broken page" \ No newline at end of file diff --git a/src/db/models/impl/url/core/sqlalchemy.py b/src/db/models/impl/url/core/sqlalchemy.py index db416769..3582dd56 100644 --- a/src/db/models/impl/url/core/sqlalchemy.py +++ b/src/db/models/impl/url/core/sqlalchemy.py @@ -1,14 +1,11 @@ from sqlalchemy import Column, Text, String, JSON from sqlalchemy.orm import relationship -from src.api.endpoints.annotate.all.get.models.name import NameAnnotationSuggestion from src.collectors.enums import URLStatus -from src.core.enums import RecordType from src.db.models.helpers import enum_column from src.db.models.impl.url.checked_for_duplicate import URLCheckedForDuplicate from src.db.models.impl.url.core.enums import URLSource from src.db.models.impl.url.html.compressed.sqlalchemy import URLCompressedHTML -from src.db.models.impl.url.probed_for_404 import URLProbedFor404 from src.db.models.impl.url.record_type.sqlalchemy import URLRecordType from src.db.models.impl.url.suggestion.location.auto.subtask.sqlalchemy import AutoLocationIDSubtask from src.db.models.impl.url.suggestion.name.sqlalchemy import URLNameSuggestion @@ -97,11 +94,6 @@ class URL(UpdatedAtMixin, CreatedAtMixin, WithIDBase): uselist=False, back_populates="url" ) - probed_for_404 = relationship( - URLProbedFor404, - uselist=False, - back_populates="url" - ) compressed_html = relationship( URLCompressedHTML, uselist=False, diff --git a/src/db/models/impl/url/probed_for_404.py b/src/db/models/impl/url/probed_for_404.py deleted file mode 100644 index 478ce9de..00000000 --- a/src/db/models/impl/url/probed_for_404.py +++ /dev/null @@ -1,14 +0,0 @@ -from sqlalchemy.orm import relationship - -from src.db.models.helpers import get_created_at_column -from src.db.models.mixins import URLDependentMixin -from src.db.models.templates_.with_id import WithIDBase - - -class URLProbedFor404(URLDependentMixin, WithIDBase): - __tablename__ = 'url_probed_for_404' - - last_probed_at = get_created_at_column() - - # Relationships - url = relationship("URL", uselist=False, back_populates="probed_for_404") diff --git a/src/util/alembic_helpers.py b/src/util/alembic_helpers.py index 668d1298..85621ca4 100644 --- a/src/util/alembic_helpers.py +++ b/src/util/alembic_helpers.py @@ -1,5 +1,9 @@ +import uuid + from alembic import op import sqlalchemy as sa +from sqlalchemy import text + def switch_enum_type( table_name, @@ -176,4 +180,107 @@ def add_enum_value( enum_name: str, enum_value: str ) -> None: - op.execute(f"ALTER TYPE {enum_name} ADD VALUE '{enum_value}'") \ No newline at end of file + op.execute(f"ALTER TYPE {enum_name} ADD VALUE '{enum_value}'") + + + +def _q_ident(s: str) -> str: + return '"' + s.replace('"', '""') + '"' + + +def _q_label(s: str) -> str: + return "'" + s.replace("'", "''") + "'" + + +def remove_enum_value( + *, + enum_name: str, + value_to_remove: str, + targets: list[tuple[str, str]], # (table, column) + schema: str = "public", +) -> None: + """ + Remove `value_to_remove` from ENUM `schema.enum_name` across the given (table, column) pairs. + Assumes target columns have **no defaults**. + """ + conn = op.get_bind() + + # 1) Load current labels (ordered) + labels = [ + r[0] + for r in conn.execute( + text( + """ + SELECT e.enumlabel + FROM pg_enum e + JOIN pg_type t ON t.oid = e.enumtypid + JOIN pg_namespace n ON n.oid = t.typnamespace + WHERE t.typname = :enum_name + AND n.nspname = :schema + ORDER BY e.enumsortorder + """ + ), + {"enum_name": enum_name, "schema": schema}, + ).fetchall() + ] + if not labels: + raise RuntimeError(f"Enum {schema}.{enum_name!r} not found.") + if value_to_remove not in labels: + return # nothing to do + new_labels = [l for l in labels if l != value_to_remove] + if not new_labels: + raise RuntimeError("Refusing to remove the last remaining enum label.") + + # Deduplicate targets while preserving order + seen = set() + targets = [(t, c) for (t, c) in targets if not ((t, c) in seen or seen.add((t, c)))] + + # 2) Ensure no rows still hold the label + for table, col in targets: + count = conn.execute( + text( + f"SELECT COUNT(*) FROM {_q_ident(schema)}.{_q_ident(table)} " + f"WHERE {_q_ident(col)} = :v" + ), + {"v": value_to_remove}, + ).scalar() + if count and count > 0: + raise RuntimeError( + f"Cannot remove {value_to_remove!r}: {schema}.{table}.{col} " + f"has {count} row(s) with that value. UPDATE or DELETE them first." + ) + + # 3) Create a tmp enum without the value + tmp_name = f"{enum_name}__tmp__{uuid.uuid4().hex[:8]}" + op.execute( + text( + f"CREATE TYPE {_q_ident(schema)}.{_q_ident(tmp_name)} AS ENUM (" + + ", ".join(_q_label(l) for l in new_labels) + + ")" + ) + ) + + # 4) For each column: enum -> text -> tmp_enum + for table, col in targets: + op.execute( + text( + f"ALTER TABLE {_q_ident(schema)}.{_q_ident(table)} " + f"ALTER COLUMN {_q_ident(col)} TYPE TEXT USING {_q_ident(col)}::TEXT" + ) + ) + op.execute( + text( + f"ALTER TABLE {_q_ident(schema)}.{_q_ident(table)} " + f"ALTER COLUMN {_q_ident(col)} TYPE {_q_ident(schema)}.{_q_ident(tmp_name)} " + f"USING {_q_ident(col)}::{_q_ident(schema)}.{_q_ident(tmp_name)}" + ) + ) + + # 5) Swap: drop old enum, rename tmp -> original name + op.execute(text(f"DROP TYPE {_q_ident(schema)}.{_q_ident(enum_name)}")) + op.execute( + text( + f"ALTER TYPE {_q_ident(schema)}.{_q_ident(tmp_name)} " + f"RENAME TO {_q_ident(enum_name)}" + ) + ) \ No newline at end of file diff --git a/tests/automated/integration/tasks/url/impl/test_url_404_probe.py b/tests/automated/integration/tasks/url/impl/test_url_404_probe.py deleted file mode 100644 index e55ad9ad..00000000 --- a/tests/automated/integration/tasks/url/impl/test_url_404_probe.py +++ /dev/null @@ -1,168 +0,0 @@ -import types -from http import HTTPStatus - -import pendulum -import pytest -from aiohttp import ClientResponseError, RequestInfo - -from src.core.tasks.url.operators.probe_404.core import URL404ProbeTaskOperator -from src.external.url_request.core import URLRequestInterface -from src.db.models.impl.url.probed_for_404 import URLProbedFor404 -from src.db.models.impl.url.core.sqlalchemy import URL -from src.collectors.enums import URLStatus -from src.core.tasks.url.enums import TaskOperatorOutcome -from src.external.url_request.dtos.url_response import URLResponseInfo -from tests.helpers.batch_creation_parameters.enums import URLCreationEnum -from tests.helpers.data_creator.core import DBDataCreator -from tests.helpers.batch_creation_parameters.url_creation_parameters import TestURLCreationParameters -from tests.helpers.batch_creation_parameters.core import TestBatchCreationParameters - - -@pytest.mark.asyncio -async def test_url_404_probe_task( - wiped_database, - db_data_creator: DBDataCreator -): - - mock_html_content = "" - mock_content_type = "text/html" - adb_client = db_data_creator.adb_client - - async def mock_make_simple_requests(self, urls: list[str]) -> list[URLResponseInfo]: - """ - Mock make_simple_requests so that - - the first url returns a 200 - - the second url returns a 404 - - the third url returns a general error - - """ - results = [] - for idx, url in enumerate(urls): - if idx == 1: - results.append( - URLResponseInfo( - success=False, - content_type=mock_content_type, - exception=str(ClientResponseError( - request_info=RequestInfo( - url=url, - method="GET", - real_url=url, - headers={}, - ), - code=HTTPStatus.NOT_FOUND.value, - history=(None,), - )), - status=HTTPStatus.NOT_FOUND - ) - ) - elif idx == 2: - results.append( - URLResponseInfo( - success=False, - exception=str(ValueError("test error")), - content_type=mock_content_type - ) - ) - else: - results.append(URLResponseInfo( - html=mock_html_content, success=True, content_type=mock_content_type)) - return results - - url_request_interface = URLRequestInterface() - url_request_interface.make_simple_requests = types.MethodType(mock_make_simple_requests, url_request_interface) - - operator = URL404ProbeTaskOperator( - url_request_interface=url_request_interface, - adb_client=adb_client - ) - # Check that initially prerequisites aren't met - meets_prereqs = await operator.meets_task_prerequisites() - assert not meets_prereqs - - # Add 4 URLs, 3 pending, 1 error - creation_info = await db_data_creator.batch_v2( - parameters=TestBatchCreationParameters( - urls=[ - TestURLCreationParameters( - count=3, - status=URLCreationEnum.OK, - with_html_content=True - ), - TestURLCreationParameters( - count=1, - status=URLCreationEnum.ERROR, - with_html_content=False - ), - ] - ) - ) - - meets_prereqs = await operator.meets_task_prerequisites() - assert meets_prereqs - - # Run task and validate results - run_info = await operator.run_task() - assert run_info.outcome == TaskOperatorOutcome.SUCCESS, run_info.message - - - pending_url_mappings = creation_info.urls_by_status[URLCreationEnum.OK].url_mappings - url_id_success = pending_url_mappings[0].url_id - url_id_404 = pending_url_mappings[1].url_id - url_id_error = pending_url_mappings[2].url_id - - url_id_initial_error = creation_info.urls_by_status[URLCreationEnum.ERROR].url_mappings[0].url_id - - # Check that URLProbedFor404 has been appropriately populated - probed_for_404_objects: list[URLProbedFor404] = await db_data_creator.adb_client.get_all(URLProbedFor404) - - assert len(probed_for_404_objects) == 3 - assert probed_for_404_objects[0].url_id == url_id_success - assert probed_for_404_objects[1].url_id == url_id_404 - assert probed_for_404_objects[2].url_id == url_id_error - - # Check that the URLs have been updated appropriated - urls: list[URL] = await adb_client.get_all(URL) - - def find_url(url_id: int) -> URL: - for url in urls: - if url.id == url_id: - return url - raise Exception(f"URL with id {url_id} not found") - - assert find_url(url_id_success).status == URLStatus.OK - assert find_url(url_id_404).status == URLStatus.NOT_FOUND - assert find_url(url_id_error).status == URLStatus.OK - assert find_url(url_id_initial_error).status == URLStatus.ERROR - - # Check that meets_task_prerequisites now returns False - meets_prereqs = await operator.meets_task_prerequisites() - assert not meets_prereqs - - # Check that meets_task_prerequisites returns True - # After setting the last probed for 404 date to 2 months ago - two_months_ago = pendulum.now().subtract(months=2).naive() - await adb_client.mark_all_as_recently_probed_for_404( - [url_id_404, url_id_error], - dt=two_months_ago - ) - - meets_prereqs = await operator.meets_task_prerequisites() - assert meets_prereqs - - # Run the task and Ensure all but the URL previously marked as 404 have been checked again - run_info = await operator.run_task() - assert run_info.outcome == TaskOperatorOutcome.SUCCESS, run_info.message - - probed_for_404_objects: list[URLProbedFor404] = await db_data_creator.adb_client.get_all(URLProbedFor404) - - assert len(probed_for_404_objects) == 3 - assert probed_for_404_objects[0].last_probed_at != two_months_ago - assert probed_for_404_objects[1].last_probed_at == two_months_ago - assert probed_for_404_objects[2].last_probed_at != two_months_ago - - - - - - diff --git a/tests/automated/integration/tasks/url/loader/test_flags.py b/tests/automated/integration/tasks/url/loader/test_flags.py index 777038b1..f812c947 100644 --- a/tests/automated/integration/tasks/url/loader/test_flags.py +++ b/tests/automated/integration/tasks/url/loader/test_flags.py @@ -10,7 +10,6 @@ from src.core.tasks.url.operators.html.core import URLHTMLTaskOperator from src.core.tasks.url.operators.misc_metadata.core import URLMiscellaneousMetadataTaskOperator from src.core.tasks.url.operators.probe.core import URLProbeTaskOperator -from src.core.tasks.url.operators.probe_404.core import URL404ProbeTaskOperator from src.core.tasks.url.operators.record_type.core import URLRecordTypeTaskOperator from src.core.tasks.url.operators.root_url.core import URLRootURLTaskOperator from src.core.tasks.url.operators.submit_approved.core import SubmitApprovedURLTaskOperator @@ -45,10 +44,6 @@ class Config: env_var="URL_MISC_METADATA_TASK_FLAG", operator=URLMiscellaneousMetadataTaskOperator ), - FlagTestParams( - env_var="URL_404_PROBE_TASK_FLAG", - operator=URL404ProbeTaskOperator - ), FlagTestParams( env_var="URL_AUTO_RELEVANCE_TASK_FLAG", operator=URLAutoRelevantTaskOperator diff --git a/tests/automated/integration/tasks/url/loader/test_happy_path.py b/tests/automated/integration/tasks/url/loader/test_happy_path.py index bd5a431c..a7b02e89 100644 --- a/tests/automated/integration/tasks/url/loader/test_happy_path.py +++ b/tests/automated/integration/tasks/url/loader/test_happy_path.py @@ -2,7 +2,7 @@ from src.core.tasks.url.loader import URLTaskOperatorLoader -NUMBER_OF_TASK_OPERATORS: int = 15 +NUMBER_OF_TASK_OPERATORS: int = 14 @pytest.mark.asyncio async def test_happy_path( diff --git a/tests/conftest.py b/tests/conftest.py index 8333529e..8ba93200 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -16,7 +16,6 @@ from src.db.models.impl.log.sqlalchemy import Log # noqa: F401 from src.db.models.impl.task.error import TaskError # noqa: F401 from src.db.models.impl.url.checked_for_duplicate import URLCheckedForDuplicate # noqa: F401 -from src.db.models.impl.url.probed_for_404 import URLProbedFor404 # noqa: F401 from src.db.client.async_ import AsyncDatabaseClient from src.db.client.sync import DatabaseClient from src.db.helpers.connect import get_postgres_connection_string diff --git a/tests/helpers/data_creator/commands/impl/urls_/convert.py b/tests/helpers/data_creator/commands/impl/urls_/convert.py index bfefc7bd..66747e6c 100644 --- a/tests/helpers/data_creator/commands/impl/urls_/convert.py +++ b/tests/helpers/data_creator/commands/impl/urls_/convert.py @@ -17,8 +17,6 @@ def convert_url_creation_enum_to_url_status(url_creation_enum: URLCreationEnum) return URLStatus.ERROR case URLCreationEnum.DUPLICATE: return URLStatus.DUPLICATE - case URLCreationEnum.NOT_FOUND: - return URLStatus.NOT_FOUND case _: raise ValueError(f"Unknown URLCreationEnum: {url_creation_enum}")