From 3ce56428cb076f50c9be95f018b391479efe7559 Mon Sep 17 00:00:00 2001 From: Max Chis Date: Thu, 27 Nov 2025 09:57:21 -0500 Subject: [PATCH] Add logic to prevent HTML content duplicates from being sent to HuggingFace --- ...dd_html_duplicate_url_materialized_view.py | 57 +++++++++++++++++++ .../scheduled/impl/huggingface/operator.py | 2 +- .../scheduled/impl/huggingface/queries/cte.py | 38 +++++++++++++ .../impl/huggingface/queries/get/core.py | 12 +++- .../queries/{check => prereq}/__init__.py | 0 .../queries/{check => prereq}/core.py | 2 +- .../queries/{check => prereq}/requester.py | 19 +++---- src/db/client/async_.py | 3 + src/db/models/materialized_views/__init__.py | 0 .../materialized_views/html_duplicate_url.py | 9 +++ tests/automated/integration/conftest.py | 28 +++++++++ ...st_duplicate_html_content_not_picked_up.py | 38 +++++++++++++ 12 files changed, 192 insertions(+), 16 deletions(-) create mode 100644 alembic/versions/2025_11_27_0907-d5f0cc2be6b6_add_html_duplicate_url_materialized_view.py create mode 100644 src/core/tasks/scheduled/impl/huggingface/queries/cte.py rename src/core/tasks/scheduled/impl/huggingface/queries/{check => prereq}/__init__.py (100%) rename src/core/tasks/scheduled/impl/huggingface/queries/{check => prereq}/core.py (78%) rename src/core/tasks/scheduled/impl/huggingface/queries/{check => prereq}/requester.py (75%) create mode 100644 src/db/models/materialized_views/__init__.py create mode 100644 src/db/models/materialized_views/html_duplicate_url.py create mode 100644 tests/automated/integration/tasks/scheduled/impl/huggingface/test_duplicate_html_content_not_picked_up.py diff --git a/alembic/versions/2025_11_27_0907-d5f0cc2be6b6_add_html_duplicate_url_materialized_view.py b/alembic/versions/2025_11_27_0907-d5f0cc2be6b6_add_html_duplicate_url_materialized_view.py new file mode 100644 index 00000000..1eeb1eb3 --- /dev/null +++ b/alembic/versions/2025_11_27_0907-d5f0cc2be6b6_add_html_duplicate_url_materialized_view.py @@ -0,0 +1,57 @@ +"""Add html duplicate url materialized view + +Revision ID: d5f0cc2be6b6 +Revises: 5ac9d50b91c5 +Create Date: 2025-11-27 09:07:28.767553 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'd5f0cc2be6b6' +down_revision: Union[str, None] = '5ac9d50b91c5' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.execute(""" + create extension pgcrypto; + """) + + op.execute(""" + CREATE MATERIALIZED VIEW mat_view__html_duplicate_url AS + WITH + hashes AS ( + SELECT + url_id, + digest(compressed_html, 'sha256') AS hash + FROM + url_compressed_html + ) + , duplicate_hashes as ( + SELECT + hash AS content_hash, + COUNT(*) AS n, + ARRAY_AGG(url_id ORDER BY url_id) AS url_ids + FROM + hashes + GROUP BY + hash + HAVING + COUNT(*) > 1 + ) + select + urls.id as url_id + from urls + join hashes h on h.url_id = urls.id + join duplicate_hashes dh on dh.content_hash = h.hash; + """) + + +def downgrade() -> None: + pass diff --git a/src/core/tasks/scheduled/impl/huggingface/operator.py b/src/core/tasks/scheduled/impl/huggingface/operator.py index 9bb7a85e..f644ff94 100644 --- a/src/core/tasks/scheduled/impl/huggingface/operator.py +++ b/src/core/tasks/scheduled/impl/huggingface/operator.py @@ -1,7 +1,7 @@ from itertools import count from src.core.tasks.mixins.prereq import HasPrerequisitesMixin -from src.core.tasks.scheduled.impl.huggingface.queries.check.core import CheckValidURLsUpdatedQueryBuilder +from src.core.tasks.scheduled.impl.huggingface.queries.prereq.core import CheckValidURLsUpdatedQueryBuilder from src.core.tasks.scheduled.impl.huggingface.queries.get.core import GetForLoadingToHuggingFaceQueryBuilder from src.core.tasks.scheduled.impl.huggingface.queries.get.model import GetForLoadingToHuggingFaceOutput from src.core.tasks.scheduled.templates.operator import ScheduledTaskOperatorBase diff --git a/src/core/tasks/scheduled/impl/huggingface/queries/cte.py b/src/core/tasks/scheduled/impl/huggingface/queries/cte.py new file mode 100644 index 00000000..8ea75b0c --- /dev/null +++ b/src/core/tasks/scheduled/impl/huggingface/queries/cte.py @@ -0,0 +1,38 @@ +from datetime import datetime + +from sqlalchemy import select, Column + +from src.db.enums import TaskType +from src.db.helpers.query import exists_url, no_url_task_error, not_exists_url +from src.db.models.impl.flag.url_validated.sqlalchemy import FlagURLValidated +from src.db.models.impl.url.core.sqlalchemy import URL +from src.db.models.impl.url.html.compressed.sqlalchemy import URLCompressedHTML +from src.db.models.materialized_views.html_duplicate_url import HTMLDuplicateURLMaterializedView + + +class HuggingfacePrereqCTEContainer: + + def __init__(self): + self.cte = ( + select( + URL.id, + URL.updated_at + ) + .join( + URLCompressedHTML, + URL.id == URLCompressedHTML.url_id + ) + .where( + exists_url(FlagURLValidated), + not_exists_url(HTMLDuplicateURLMaterializedView), + no_url_task_error(TaskType.PUSH_TO_HUGGINGFACE) + ) + ) + + @property + def url_id(self) -> Column[int]: + return self.cte.c.id + + @property + def updated_at(self) -> Column[datetime]: + return self.cte.c.updated_at \ No newline at end of file diff --git a/src/core/tasks/scheduled/impl/huggingface/queries/get/core.py b/src/core/tasks/scheduled/impl/huggingface/queries/get/core.py index 802e8ea5..10986a05 100644 --- a/src/core/tasks/scheduled/impl/huggingface/queries/get/core.py +++ b/src/core/tasks/scheduled/impl/huggingface/queries/get/core.py @@ -1,6 +1,7 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession +from src.core.tasks.scheduled.impl.huggingface.queries.cte import HuggingfacePrereqCTEContainer from src.core.tasks.scheduled.impl.huggingface.queries.get.convert import convert_fine_to_coarse_record_type, \ convert_validated_type_to_relevant from src.core.tasks.scheduled.impl.huggingface.queries.get.model import GetForLoadingToHuggingFaceOutput @@ -23,21 +24,26 @@ def __init__(self, page: int): async def run(self, session: AsyncSession) -> list[GetForLoadingToHuggingFaceOutput]: - label_url_id = 'url_id' label_url = 'url' label_record_type_fine = 'record_type_fine' label_html = 'html' label_type = 'type' + cte = HuggingfacePrereqCTEContainer() + query = ( select( - URL.id.label(label_url_id), + cte.url_id, URL.full_url.label(label_url), URLRecordType.record_type.label(label_record_type_fine), URLCompressedHTML.compressed_html.label(label_html), FlagURLValidated.type.label(label_type) ) + .join( + URL, + cte.url_id == URL.id + ) .join( URLRecordType, URL.id == URLRecordType.url_id @@ -65,7 +71,7 @@ async def run(self, session: AsyncSession) -> list[GetForLoadingToHuggingFaceOut final_results = [] for result in db_results: output = GetForLoadingToHuggingFaceOutput( - url_id=result[label_url_id], + url_id=result[cte.url_id], url=result[label_url], relevant=convert_validated_type_to_relevant( URLType(result[label_type]) diff --git a/src/core/tasks/scheduled/impl/huggingface/queries/check/__init__.py b/src/core/tasks/scheduled/impl/huggingface/queries/prereq/__init__.py similarity index 100% rename from src/core/tasks/scheduled/impl/huggingface/queries/check/__init__.py rename to src/core/tasks/scheduled/impl/huggingface/queries/prereq/__init__.py diff --git a/src/core/tasks/scheduled/impl/huggingface/queries/check/core.py b/src/core/tasks/scheduled/impl/huggingface/queries/prereq/core.py similarity index 78% rename from src/core/tasks/scheduled/impl/huggingface/queries/check/core.py rename to src/core/tasks/scheduled/impl/huggingface/queries/prereq/core.py index c76fa2e1..fdf82ba9 100644 --- a/src/core/tasks/scheduled/impl/huggingface/queries/check/core.py +++ b/src/core/tasks/scheduled/impl/huggingface/queries/prereq/core.py @@ -1,6 +1,6 @@ from sqlalchemy.ext.asyncio import AsyncSession -from src.core.tasks.scheduled.impl.huggingface.queries.check.requester import CheckValidURLsUpdatedRequester +from src.core.tasks.scheduled.impl.huggingface.queries.prereq.requester import CheckValidURLsUpdatedRequester from src.db.queries.base.builder import QueryBuilderBase diff --git a/src/core/tasks/scheduled/impl/huggingface/queries/check/requester.py b/src/core/tasks/scheduled/impl/huggingface/queries/prereq/requester.py similarity index 75% rename from src/core/tasks/scheduled/impl/huggingface/queries/check/requester.py rename to src/core/tasks/scheduled/impl/huggingface/queries/prereq/requester.py index ef43bd3d..1eaa306d 100644 --- a/src/core/tasks/scheduled/impl/huggingface/queries/check/requester.py +++ b/src/core/tasks/scheduled/impl/huggingface/queries/prereq/requester.py @@ -6,6 +6,7 @@ from sqlalchemy.sql.functions import count from src.collectors.enums import URLStatus +from src.core.tasks.scheduled.impl.huggingface.queries.cte import HuggingfacePrereqCTEContainer from src.db.enums import TaskType from src.db.helpers.query import not_exists_url, no_url_task_error, exists_url from src.db.helpers.session import session_helper as sh @@ -32,21 +33,17 @@ async def latest_upload(self) -> datetime: ) async def has_valid_urls(self, last_upload_at: datetime | None) -> bool: + cte = HuggingfacePrereqCTEContainer() query = ( - select(count(URL.id)) - .join( - URLCompressedHTML, - URL.id == URLCompressedHTML.url_id - ) - .where( - exists_url(FlagURLValidated), - no_url_task_error(TaskType.PUSH_TO_HUGGINGFACE) + select( + cte.url_id ) ) if last_upload_at is not None: - query = query.where(URL.updated_at > last_upload_at) - url_count = await sh.scalar( + query = query.where(cte.updated_at > last_upload_at) + query = query.limit(1) + result = await sh.one_or_none( session=self.session, query=query ) - return url_count > 0 + return result is not None \ No newline at end of file diff --git a/src/db/client/async_.py b/src/db/client/async_.py index 913a0a35..0fb99f76 100644 --- a/src/db/client/async_.py +++ b/src/db/client/async_.py @@ -919,4 +919,7 @@ async def refresh_materialized_views(self): ) await self.execute( text("REFRESH MATERIALIZED VIEW batch_url_status_mat_view") + ) + await self.execute( + text("REFRESH MATERIALIZED VIEW mat_view__html_duplicate_url") ) \ No newline at end of file diff --git a/src/db/models/materialized_views/__init__.py b/src/db/models/materialized_views/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/db/models/materialized_views/html_duplicate_url.py b/src/db/models/materialized_views/html_duplicate_url.py new file mode 100644 index 00000000..703bbbea --- /dev/null +++ b/src/db/models/materialized_views/html_duplicate_url.py @@ -0,0 +1,9 @@ +from src.db.models.mixins import URLDependentViewMixin +from src.db.models.templates_.base import Base + + +class HTMLDuplicateURLMaterializedView( + Base, + URLDependentViewMixin +): + __tablename__ = "mat_view__html_duplicate_url" \ No newline at end of file diff --git a/tests/automated/integration/conftest.py b/tests/automated/integration/conftest.py index 4c6a76d0..19a9fe19 100644 --- a/tests/automated/integration/conftest.py +++ b/tests/automated/integration/conftest.py @@ -221,6 +221,21 @@ async def test_url_data_source_id( ) return url_id +@pytest_asyncio.fixture +async def test_url_data_source_id_2( + db_data_creator: DBDataCreator, + test_agency_id: int +) -> int: + url_id: int = (await db_data_creator.create_validated_urls( + record_type=RecordType.CAR_GPS, + validation_type=URLType.DATA_SOURCE, + ))[0].url_id + await db_data_creator.link_urls_to_agencies( + url_ids=[url_id], + agency_ids=[test_agency_id] + ) + return url_id + @pytest_asyncio.fixture async def test_url_id( db_data_creator: DBDataCreator, @@ -233,6 +248,19 @@ async def test_url_id( ) return await db_data_creator.adb_client.add(url, return_id=True) +@pytest_asyncio.fixture +async def test_url_id_2( + db_data_creator: DBDataCreator, +) -> int: + url = URL( + url="example.com/2", + source=URLSource.COLLECTOR, + trailing_slash=False, + status=URLStatus.OK + ) + return await db_data_creator.adb_client.add(url, return_id=True) + + @pytest_asyncio.fixture async def test_url_data_source_mapping( db_data_creator: DBDataCreator, diff --git a/tests/automated/integration/tasks/scheduled/impl/huggingface/test_duplicate_html_content_not_picked_up.py b/tests/automated/integration/tasks/scheduled/impl/huggingface/test_duplicate_html_content_not_picked_up.py new file mode 100644 index 00000000..be84ffd4 --- /dev/null +++ b/tests/automated/integration/tasks/scheduled/impl/huggingface/test_duplicate_html_content_not_picked_up.py @@ -0,0 +1,38 @@ +import pytest + +from src.core.tasks.scheduled.impl.huggingface.operator import PushToHuggingFaceTaskOperator +from src.db.client.async_ import AsyncDatabaseClient +from src.db.models.impl.url.html.compressed.sqlalchemy import URLCompressedHTML + + +@pytest.mark.asyncio +async def test_huggingface_task_duplicate_html_content_not_picked_up( + adb_client_test: AsyncDatabaseClient, + operator: PushToHuggingFaceTaskOperator, + test_url_data_source_id: int, + test_url_data_source_id_2: int +): + + # Add HTML content with the same hash + uch_1 = URLCompressedHTML( + url_id=test_url_data_source_id, + compressed_html=b"test" + ) + uch_2 = URLCompressedHTML( + url_id=test_url_data_source_id_2, + compressed_html=b"test" + ) + await adb_client_test.add_all([ + uch_1, + uch_2 + ]) + + # Confirm task meets prerequisites + assert await operator.meets_task_prerequisites() + + # Refresh materialized view + await adb_client_test.refresh_materialized_views() + + # Confirm task does not meet prerequisites + assert not await operator.meets_task_prerequisites() +