Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""Add html duplicate url materialized view

Revision ID: d5f0cc2be6b6
Revises: 5ac9d50b91c5
Create Date: 2025-11-27 09:07:28.767553

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa

Check warning on line 11 in alembic/versions/2025_11_27_0907-d5f0cc2be6b6_add_html_duplicate_url_materialized_view.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] alembic/versions/2025_11_27_0907-d5f0cc2be6b6_add_html_duplicate_url_materialized_view.py#L11 <401>

'sqlalchemy as sa' imported but unused
Raw output
./alembic/versions/2025_11_27_0907-d5f0cc2be6b6_add_html_duplicate_url_materialized_view.py:11:1: F401 'sqlalchemy as sa' imported but unused


# revision identifiers, used by Alembic.
revision: str = 'd5f0cc2be6b6'
down_revision: Union[str, None] = '5ac9d50b91c5'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Create the ``mat_view__html_duplicate_url`` materialized view.

    The view lists the ``url_id`` of every URL whose ``compressed_html``
    SHA-256 digest is shared by more than one row of
    ``url_compressed_html``. Hashing uses ``digest()``, which is provided by
    the ``pgcrypto`` extension, so the extension is installed first.
    """
    # IF NOT EXISTS keeps the migration from aborting on databases where
    # pgcrypto has already been installed by other means.
    op.execute("""
    CREATE EXTENSION IF NOT EXISTS pgcrypto;
    """)

    op.execute("""
    CREATE MATERIALIZED VIEW mat_view__html_duplicate_url AS
    WITH
    hashes AS (
        SELECT
            url_id,
            digest(compressed_html, 'sha256') AS hash
        FROM
            url_compressed_html
    )
    , duplicate_hashes as (
        SELECT
            hash AS content_hash,
            COUNT(*) AS n,
            ARRAY_AGG(url_id ORDER BY url_id) AS url_ids
        FROM
            hashes
        GROUP BY
            hash
        HAVING
            COUNT(*) > 1
    )
    select
        urls.id as url_id
    from urls
    join hashes h on h.url_id = urls.id
    join duplicate_hashes dh on dh.content_hash = h.hash;
    """)


def downgrade() -> None:
    """Drop the materialized view created by ``upgrade``.

    The ``pgcrypto`` extension is deliberately left installed: it may have
    existed before this revision ran, and ``upgrade`` tolerates its presence
    via ``IF NOT EXISTS``, so the revision can be re-applied afterwards.
    """
    op.execute("""
    DROP MATERIALIZED VIEW IF EXISTS mat_view__html_duplicate_url;
    """)
2 changes: 1 addition & 1 deletion src/core/tasks/scheduled/impl/huggingface/operator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from itertools import count

from src.core.tasks.mixins.prereq import HasPrerequisitesMixin
from src.core.tasks.scheduled.impl.huggingface.queries.check.core import CheckValidURLsUpdatedQueryBuilder
from src.core.tasks.scheduled.impl.huggingface.queries.prereq.core import CheckValidURLsUpdatedQueryBuilder
from src.core.tasks.scheduled.impl.huggingface.queries.get.core import GetForLoadingToHuggingFaceQueryBuilder
from src.core.tasks.scheduled.impl.huggingface.queries.get.model import GetForLoadingToHuggingFaceOutput
from src.core.tasks.scheduled.templates.operator import ScheduledTaskOperatorBase
Expand Down
38 changes: 38 additions & 0 deletions src/core/tasks/scheduled/impl/huggingface/queries/cte.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from datetime import datetime

Check warning on line 1 in src/core/tasks/scheduled/impl/huggingface/queries/cte.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/core/tasks/scheduled/impl/huggingface/queries/cte.py#L1 <100>

Missing docstring in public module
Raw output
./src/core/tasks/scheduled/impl/huggingface/queries/cte.py:1:1: D100 Missing docstring in public module

from sqlalchemy import select, Column

from src.db.enums import TaskType
from src.db.helpers.query import exists_url, no_url_task_error, not_exists_url
from src.db.models.impl.flag.url_validated.sqlalchemy import FlagURLValidated
from src.db.models.impl.url.core.sqlalchemy import URL
from src.db.models.impl.url.html.compressed.sqlalchemy import URLCompressedHTML
from src.db.models.materialized_views.html_duplicate_url import HTMLDuplicateURLMaterializedView


class HuggingfacePrereqCTEContainer:
    """Shared CTE selecting URLs eligible for the Hugging Face push task.

    A URL row is selected when it has a ``URLCompressedHTML`` row (inner
    join) and satisfies the three filters ``exists_url(FlagURLValidated)``,
    ``not_exists_url(HTMLDuplicateURLMaterializedView)`` and
    ``no_url_task_error(TaskType.PUSH_TO_HUGGINGFACE)``.
    """

    def __init__(self):
        """Build the CTE exposing ``id`` and ``updated_at`` of eligible URLs."""
        self.cte = (
            select(
                URL.id,
                URL.updated_at
            )
            .join(
                URLCompressedHTML,
                URL.id == URLCompressedHTML.url_id
            )
            .where(
                exists_url(FlagURLValidated),
                not_exists_url(HTMLDuplicateURLMaterializedView),
                no_url_task_error(TaskType.PUSH_TO_HUGGINGFACE)
            )
            # Materialize as an explicit, named CTE. Reading ``.c`` from a
            # bare Select relies on SQLAlchemy's deprecated implicit-subquery
            # behaviour.
            .cte("huggingface_prereq")
        )

    @property
    def url_id(self) -> Column[int]:
        """Return the CTE's ``id`` column (the eligible URL's id)."""
        return self.cte.c.id

    @property
    def updated_at(self) -> Column[datetime]:
        """Return the CTE's ``updated_at`` column."""
        return self.cte.c.updated_at

Check warning on line 38 in src/core/tasks/scheduled/impl/huggingface/queries/cte.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/core/tasks/scheduled/impl/huggingface/queries/cte.py#L38 <292>

no newline at end of file
Raw output
./src/core/tasks/scheduled/impl/huggingface/queries/cte.py:38:37: W292 no newline at end of file
12 changes: 9 additions & 3 deletions src/core/tasks/scheduled/impl/huggingface/queries/get/core.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from src.core.tasks.scheduled.impl.huggingface.queries.cte import HuggingfacePrereqCTEContainer
from src.core.tasks.scheduled.impl.huggingface.queries.get.convert import convert_fine_to_coarse_record_type, \
convert_validated_type_to_relevant
from src.core.tasks.scheduled.impl.huggingface.queries.get.model import GetForLoadingToHuggingFaceOutput
Expand All @@ -23,21 +24,26 @@


async def run(self, session: AsyncSession) -> list[GetForLoadingToHuggingFaceOutput]:
label_url_id = 'url_id'
label_url = 'url'
label_record_type_fine = 'record_type_fine'
label_html = 'html'
label_type = 'type'


cte = HuggingfacePrereqCTEContainer()

Check failure on line 33 in src/core/tasks/scheduled/impl/huggingface/queries/get/core.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/core/tasks/scheduled/impl/huggingface/queries/get/core.py#L33 <303>

too many blank lines (2)
Raw output
./src/core/tasks/scheduled/impl/huggingface/queries/get/core.py:33:9: E303 too many blank lines (2)

query = (
select(
URL.id.label(label_url_id),
cte.url_id,
URL.full_url.label(label_url),
URLRecordType.record_type.label(label_record_type_fine),
URLCompressedHTML.compressed_html.label(label_html),
FlagURLValidated.type.label(label_type)
)
.join(
URL,
cte.url_id == URL.id
)
.join(
URLRecordType,
URL.id == URLRecordType.url_id
Expand Down Expand Up @@ -65,7 +71,7 @@
final_results = []
for result in db_results:
output = GetForLoadingToHuggingFaceOutput(
url_id=result[label_url_id],
url_id=result[cte.url_id],
url=result[label_url],
relevant=convert_validated_type_to_relevant(
URLType(result[label_type])
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from sqlalchemy.ext.asyncio import AsyncSession

from src.core.tasks.scheduled.impl.huggingface.queries.check.requester import CheckValidURLsUpdatedRequester
from src.core.tasks.scheduled.impl.huggingface.queries.prereq.requester import CheckValidURLsUpdatedRequester
from src.db.queries.base.builder import QueryBuilderBase


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from sqlalchemy.sql.functions import count

from src.collectors.enums import URLStatus
from src.core.tasks.scheduled.impl.huggingface.queries.cte import HuggingfacePrereqCTEContainer
from src.db.enums import TaskType
from src.db.helpers.query import not_exists_url, no_url_task_error, exists_url
from src.db.helpers.session import session_helper as sh
Expand All @@ -32,21 +33,17 @@
)

async def has_valid_urls(self, last_upload_at: datetime | None) -> bool:
    """Report whether at least one URL satisfies the push prerequisites.

    Args:
        last_upload_at: When not ``None``, only URLs whose ``updated_at``
            is strictly later than this value are considered.

    Returns:
        ``True`` if the prerequisite query yields any row, else ``False``.
    """
    prereq = HuggingfacePrereqCTEContainer()
    query = select(prereq.url_id)
    if last_upload_at is not None:
        query = query.where(prereq.updated_at > last_upload_at)
    # Only existence matters, so cap the query at one row instead of
    # counting every match.
    first_match = await sh.one_or_none(
        session=self.session,
        query=query.limit(1)
    )
    return first_match is not None

Check warning on line 49 in src/core/tasks/scheduled/impl/huggingface/queries/prereq/requester.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/core/tasks/scheduled/impl/huggingface/queries/prereq/requester.py#L49 <292>

no newline at end of file
Raw output
./src/core/tasks/scheduled/impl/huggingface/queries/prereq/requester.py:49:34: W292 no newline at end of file
3 changes: 3 additions & 0 deletions src/db/client/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -919,4 +919,7 @@ async def refresh_materialized_views(self):
)
await self.execute(
text("REFRESH MATERIALIZED VIEW batch_url_status_mat_view")
)
await self.execute(
text("REFRESH MATERIALIZED VIEW mat_view__html_duplicate_url")
)
Empty file.
9 changes: 9 additions & 0 deletions src/db/models/materialized_views/html_duplicate_url.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from src.db.models.mixins import URLDependentViewMixin

Check warning on line 1 in src/db/models/materialized_views/html_duplicate_url.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/db/models/materialized_views/html_duplicate_url.py#L1 <100>

Missing docstring in public module
Raw output
./src/db/models/materialized_views/html_duplicate_url.py:1:1: D100 Missing docstring in public module
from src.db.models.templates_.base import Base


class HTMLDuplicateURLMaterializedView(Base, URLDependentViewMixin):
    """ORM mapping for the ``mat_view__html_duplicate_url`` relation."""

    __tablename__ = "mat_view__html_duplicate_url"

Check warning on line 9 in src/db/models/materialized_views/html_duplicate_url.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/db/models/materialized_views/html_duplicate_url.py#L9 <292>

no newline at end of file
Raw output
./src/db/models/materialized_views/html_duplicate_url.py:9:51: W292 no newline at end of file
28 changes: 28 additions & 0 deletions tests/automated/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,21 @@
)
return url_id

@pytest_asyncio.fixture
async def test_url_data_source_id_2(
    db_data_creator: DBDataCreator,
    test_agency_id: int
) -> int:
    """Create a validated DATA_SOURCE URL linked to the test agency.

    Returns the ``url_id`` of the first record produced by
    ``create_validated_urls``.
    """
    created = await db_data_creator.create_validated_urls(
        record_type=RecordType.CAR_GPS,
        validation_type=URLType.DATA_SOURCE,
    )
    second_url_id: int = created[0].url_id
    await db_data_creator.link_urls_to_agencies(
        url_ids=[second_url_id],
        agency_ids=[test_agency_id]
    )
    return second_url_id

@pytest_asyncio.fixture
async def test_url_id(
db_data_creator: DBDataCreator,
Expand All @@ -233,6 +248,19 @@
)
return await db_data_creator.adb_client.add(url, return_id=True)

@pytest_asyncio.fixture
async def test_url_id_2(
    db_data_creator: DBDataCreator,
) -> int:
    """Add a URL row for ``example.com/2`` through the async DB client.

    Returns whatever ``adb_client.add(..., return_id=True)`` yields.
    """
    url_fields = {
        "url": "example.com/2",
        "source": URLSource.COLLECTOR,
        "trailing_slash": False,
        "status": URLStatus.OK,
    }
    return await db_data_creator.adb_client.add(
        URL(**url_fields),
        return_id=True
    )


@pytest_asyncio.fixture
async def test_url_data_source_mapping(
db_data_creator: DBDataCreator,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import pytest

Check warning on line 1 in tests/automated/integration/tasks/scheduled/impl/huggingface/test_duplicate_html_content_not_picked_up.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] tests/automated/integration/tasks/scheduled/impl/huggingface/test_duplicate_html_content_not_picked_up.py#L1 <100>

Missing docstring in public module
Raw output
./tests/automated/integration/tasks/scheduled/impl/huggingface/test_duplicate_html_content_not_picked_up.py:1:1: D100 Missing docstring in public module

from src.core.tasks.scheduled.impl.huggingface.operator import PushToHuggingFaceTaskOperator
from src.db.client.async_ import AsyncDatabaseClient
from src.db.models.impl.url.html.compressed.sqlalchemy import URLCompressedHTML


@pytest.mark.asyncio
async def test_huggingface_task_duplicate_html_content_not_picked_up(
    adb_client_test: AsyncDatabaseClient,
    operator: PushToHuggingFaceTaskOperator,
    test_url_data_source_id: int,
    test_url_data_source_id_2: int
):
    """Check that URLs with identical compressed HTML stop meeting prerequisites.

    Two URLs receive the same compressed HTML payload. The operator reports
    its prerequisites as met until the materialized views are refreshed,
    after which it no longer does.
    """
    # Give both URLs byte-identical HTML so their hashes collide.
    shared_payload = b"test"
    duplicate_rows = [
        URLCompressedHTML(url_id=url_id, compressed_html=shared_payload)
        for url_id in (test_url_data_source_id, test_url_data_source_id_2)
    ]
    await adb_client_test.add_all(duplicate_rows)

    # Before the refresh the duplicate view is stale, so the task is eligible.
    eligible_before_refresh = await operator.meets_task_prerequisites()
    assert eligible_before_refresh

    await adb_client_test.refresh_materialized_views()

    # After the refresh both URLs are flagged as duplicates and excluded.
    eligible_after_refresh = await operator.meets_task_prerequisites()
    assert not eligible_after_refresh

Check warning on line 38 in tests/automated/integration/tasks/scheduled/impl/huggingface/test_duplicate_html_content_not_picked_up.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] tests/automated/integration/tasks/scheduled/impl/huggingface/test_duplicate_html_content_not_picked_up.py#L38 <391>

blank line at end of file
Raw output
./tests/automated/integration/tasks/scheduled/impl/huggingface/test_duplicate_html_content_not_picked_up.py:38:1: W391 blank line at end of file