Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""Remove batches dependency for urls

Revision ID: 9552d354ccf4
Revises: d6519a3ca5c9
Create Date: 2025-07-17 08:56:22.919486

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa

from src.util.alembic_helpers import id_column, created_at_column, updated_at_column, url_id_column, batch_id_column

# revision identifiers, used by Alembic.
revision: str = '9552d354ccf4'
down_revision: Union[str, None] = 'd6519a3ca5c9'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None

LINK_TABLE_NAME = 'link_batch_urls'

BATCHES_COLUMN_NAME = 'batch_id'

def _create_link_table():
    """Create the ``link_batch_urls`` table.

    Columns come from the project's ``id_column``, ``batch_id_column``,
    ``url_id_column``, ``created_at_column`` and ``updated_at_column``
    helpers. A unique constraint on ``url_id`` limits every URL to at most
    one link row.
    """
    link_columns = (
        id_column(),
        batch_id_column(),
        url_id_column(),
        created_at_column(),
        updated_at_column(),
    )
    unique_url = sa.UniqueConstraint('url_id', name='uq_link_table_url_id')
    op.create_table(LINK_TABLE_NAME, *link_columns, unique_url)

def _drop_link_table():
    """Drop the ``link_batch_urls`` table."""
    op.drop_table(table_name=LINK_TABLE_NAME)

def _migrate_batch_ids_to_link_table():
    """Copy every ``(batch_id, id)`` pair from ``urls`` into the link table."""
    copy_statement = f"""
INSERT INTO {LINK_TABLE_NAME} (batch_id, url_id)
SELECT batch_id, id
FROM urls
"""
    op.execute(copy_statement)

def _migrate_link_table_to_batch_ids():
    """Backfill ``urls.batch_id`` from the link table.

    Each URL row receives the ``batch_id`` of the link row whose ``url_id``
    matches it. A URL with no link row is set to NULL by the subquery.
    """
    backfill_statement = f"""
UPDATE urls
SET batch_id = (
SELECT batch_id
FROM {LINK_TABLE_NAME}
WHERE url_id = urls.id
)
"""
    op.execute(backfill_statement)

def _drop_url_batches_column():
    """Remove the ``batch_id`` column from the ``urls`` table."""
    op.drop_column(table_name='urls', column_name=BATCHES_COLUMN_NAME)

def _add_url_batches_column():
    """Add the batch column back to ``urls``, built with ``nullable=True``.

    The column comes from the project's ``batch_id_column`` helper. It is
    requested as nullable so existing rows can be backfilled before a
    NOT NULL constraint is applied.
    """
    restored_column = batch_id_column(nullable=True)
    op.add_column('urls', restored_column)

def _add_not_null_constraint():
    """Mark ``urls.batch_id`` as NOT NULL."""
    op.alter_column(
        table_name='urls',
        column_name=BATCHES_COLUMN_NAME,
        nullable=False,
    )


def upgrade() -> None:
    """Move each URL's batch association out of ``urls`` into the link table.

    Steps, in order:
      1. Create the ``link_batch_urls`` table.
      2. Copy every ``urls.batch_id`` / ``urls.id`` pair into it.
      3. Drop the ``batch_id`` column from ``urls``.
    """
    _create_link_table()
    # Data must be copied before the source column disappears.
    _migrate_batch_ids_to_link_table()
    _drop_url_batches_column()


def downgrade() -> None:
    """Restore ``urls.batch_id`` from the link table, then drop the table.

    Steps, in order:
      1. Re-add ``batch_id`` to ``urls`` as a nullable column.
      2. Backfill it from ``link_batch_urls``.
      3. Tighten the column to NOT NULL.
      4. Drop ``link_batch_urls``.

    NOTE(review): step 3 will fail if any URL has no row in the link table,
    because the backfill leaves its ``batch_id`` NULL.
    """
    _add_url_batches_column()
    _migrate_link_table_to_batch_ids()
    # The constraint can only be applied once every row has been backfilled.
    _add_not_null_constraint()
    _drop_link_table()
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from src.api.endpoints.annotate.dtos.shared.batch import AnnotationBatchInfo
from src.collectors.enums import URLStatus
from src.db.models.instantiations.link.link_batch_urls import LinkBatchURL
from src.db.models.instantiations.url.core import URL
from src.db.queries.base.builder import QueryBuilderBase
from src.db.statement_composer import StatementComposer
Expand Down Expand Up @@ -35,11 +36,14 @@ async def run(
for model in self.models
]

select_url = select(func.count(URL.id))
select_url = (
select(func.count(URL.id))
.join(LinkBatchURL)
)

common_where_clause = [
URL.outcome == URLStatus.PENDING.value,
URL.batch_id == self.batch_id,
LinkBatchURL.batch_id == self.batch_id,
]

annotated_query = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from src.collectors.enums import URLStatus
from src.core.enums import SuggestedStatus
from src.db.client.types import UserSuggestionModel
from src.db.models.instantiations.link.link_batch_urls import LinkBatchURL
from src.db.models.instantiations.url.core import URL
from src.db.models.instantiations.url.suggestion.relevant.user import UserRelevantSuggestion
from src.db.queries.base.builder import QueryBuilderBase
Expand All @@ -27,10 +28,21 @@
self.auto_suggestion_relationship = auto_suggestion_relationship

async def run(self, session: AsyncSession):
url_query = (
query = (
select(
URL,
)
)

if self.batch_id is not None:
query = (
query
.join(LinkBatchURL)
.where(LinkBatchURL.batch_id == self.batch_id)
)

query = (
query
.where(URL.outcome == URLStatus.PENDING.value)
# URL must not have user suggestion
.where(
Expand All @@ -39,7 +51,7 @@
)

if self.check_if_annotated_not_relevant:
url_query = url_query.where(
query = query.where(
not_(
exists(
select(UserRelevantSuggestion)
Expand All @@ -51,14 +63,13 @@
)
)

if self.batch_id is not None:
url_query = url_query.where(URL.batch_id == self.batch_id)

url_query = url_query.options(

query = query.options(

Check failure on line 68 in src/api/endpoints/annotate/_shared/queries/get_next_url_for_user_annotation.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/api/endpoints/annotate/_shared/queries/get_next_url_for_user_annotation.py#L68 <303>

too many blank lines (3)
Raw output
./src/api/endpoints/annotate/_shared/queries/get_next_url_for_user_annotation.py:68:9: E303 too many blank lines (3)
joinedload(self.auto_suggestion_relationship),
joinedload(URL.html_content)
).limit(1)

raw_result = await session.execute(url_query)
raw_result = await session.execute(query)

return raw_result.unique().scalars().one_or_none()
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from src.core.tasks.url.operators.url_html.scraper.parser.util import convert_to_response_html_info
from src.db.dtos.url.mapping import URLMapping
from src.db.models.instantiations.confirmed_url_agency import ConfirmedURLAgency
from src.db.models.instantiations.link.link_batch_urls import LinkBatchURL
from src.db.models.instantiations.url.core import URL
from src.db.models.instantiations.url.suggestion.agency.auto import AutomatedUrlAgencySuggestion
from src.db.models.instantiations.url.suggestion.agency.user import UserUrlAgencySuggestion
Expand Down Expand Up @@ -41,20 +42,19 @@ async def run(
have extant autosuggestions
"""
# Select statement
statement = (
select(URL.id, URL.url)
# Must not have confirmed agencies
.where(
URL.outcome == URLStatus.PENDING.value
)
query = select(URL.id, URL.url)
if self.batch_id is not None:
query = query.join(LinkBatchURL).where(LinkBatchURL.batch_id == self.batch_id)

# Must not have confirmed agencies
query = query.where(
URL.outcome == URLStatus.PENDING.value
)

if self.batch_id is not None:
statement = statement.where(URL.batch_id == self.batch_id)

# Must not have been annotated by a user
statement = (
statement.join(UserUrlAgencySuggestion, isouter=True)
query = (
query.join(UserUrlAgencySuggestion, isouter=True)
.where(
~exists(
select(UserUrlAgencySuggestion).
Expand Down Expand Up @@ -93,7 +93,7 @@ async def run(
)
)
).limit(1)
raw_result = await session.execute(statement)
raw_result = await session.execute(query)
results = raw_result.all()
if len(results) == 0:
return GetNextURLForAgencyAnnotationResponse(
Expand Down
9 changes: 6 additions & 3 deletions src/api/endpoints/annotate/all/get/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from src.collectors.enums import URLStatus
from src.db.dto_converter import DTOConverter
from src.db.dtos.url.mapping import URLMapping
from src.db.models.instantiations.link.link_batch_urls import LinkBatchURL
from src.db.models.instantiations.url.core import URL
from src.db.models.instantiations.url.suggestion.agency.user import UserUrlAgencySuggestion
from src.db.models.instantiations.url.suggestion.record_type.user import UserRecordTypeSuggestion
Expand All @@ -32,8 +33,11 @@ async def run(
self,
session: AsyncSession
) -> GetNextURLForAllAnnotationResponse:
query = Select(URL)
if self.batch_id is not None:
query = query.join(LinkBatchURL).where(LinkBatchURL.batch_id == self.batch_id)
query = (
Select(URL)
query
.where(
and_(
URL.outcome == URLStatus.PENDING.value,
Expand All @@ -43,8 +47,7 @@ async def run(
)
)
)
if self.batch_id is not None:
query = query.where(URL.batch_id == self.batch_id)


load_options = [
URL.html_content,
Expand Down
60 changes: 60 additions & 0 deletions src/api/endpoints/batch/duplicates/query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from sqlalchemy import Select

Check warning on line 1 in src/api/endpoints/batch/duplicates/query.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/api/endpoints/batch/duplicates/query.py#L1 <100>

Missing docstring in public module
Raw output
./src/api/endpoints/batch/duplicates/query.py:1:1: D100 Missing docstring in public module
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import aliased

from src.db.dtos.duplicate import DuplicateInfo
from src.db.models.instantiations.batch import Batch
from src.db.models.instantiations.duplicate import Duplicate
from src.db.models.instantiations.link.link_batch_urls import LinkBatchURL
from src.db.models.instantiations.url.core import URL
from src.db.queries.base.builder import QueryBuilderBase


class GetDuplicatesByBatchIDQueryBuilder(QueryBuilderBase):
    """Query builder that lists the duplicates recorded against one batch."""

    def __init__(
        self,
        batch_id: int,
        page: int
    ):
        """Store the batch to inspect and the page of results to fetch.

        :param batch_id: ID of the batch in which the duplicates were found.
        :param page: Page number; page 1 starts at offset 0.
        """
        super().__init__()
        self.batch_id = batch_id
        self.page = page

    async def run(self, session: AsyncSession) -> list[DuplicateInfo]:
        """Fetch one page of duplicates for the configured batch.

        Each result pairs the original URL and the batch it is linked to
        (via ``LinkBatchURL``) with the batch in which it was seen again.

        :param session: Async session used to execute the query.
        :return: Up to 100 ``DuplicateInfo`` entries for the requested page.
        """
        page_size = 100
        original_batch = aliased(Batch)
        duplicate_batch = aliased(Batch)

        query = (
            Select(
                URL.url.label("source_url"),
                URL.id.label("original_url_id"),
                duplicate_batch.id.label("duplicate_batch_id"),
                duplicate_batch.parameters.label("duplicate_batch_parameters"),
                original_batch.id.label("original_batch_id"),
                original_batch.parameters.label("original_batch_parameters"),
            )
            .select_from(Duplicate)
            .join(URL, Duplicate.original_url_id == URL.id)
            .join(duplicate_batch, Duplicate.batch_id == duplicate_batch.id)
            .join(LinkBatchURL, URL.id == LinkBatchURL.url_id)
            .join(original_batch, LinkBatchURL.batch_id == original_batch.id)
            .filter(duplicate_batch.id == self.batch_id)
            # LIMIT/OFFSET without ORDER BY gives no guaranteed row order,
            # so consecutive pages could repeat or skip rows.
            .order_by(URL.id)
            .limit(page_size)
            .offset((self.page - 1) * page_size)
        )
        raw_results = await session.execute(query)
        return [
            DuplicateInfo(
                source_url=row.source_url,
                duplicate_batch_id=row.duplicate_batch_id,
                duplicate_metadata=row.duplicate_batch_parameters,
                original_batch_id=row.original_batch_id,
                original_metadata=row.original_batch_parameters,
                original_url_id=row.original_url_id
            )
            for row in raw_results.all()
        ]
4 changes: 2 additions & 2 deletions src/api/endpoints/batch/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
from fastapi.params import Query, Depends

from src.api.dependencies import get_async_core
from src.api.endpoints.batch.dtos.get.duplicates import GetDuplicatesByBatchResponse
from src.api.endpoints.batch.dtos.get.logs import GetBatchLogsResponse
from src.api.endpoints.batch.dtos.get.summaries.response import GetBatchSummariesResponse
from src.api.endpoints.batch.dtos.get.summaries.summary import BatchSummary
from src.api.endpoints.batch.dtos.get.urls import GetURLsByBatchResponse
from src.api.endpoints.batch.dtos.post.abort import MessageResponse
from src.api.endpoints.batch.duplicates.dto import GetDuplicatesByBatchResponse
from src.api.endpoints.batch.urls.dto import GetURLsByBatchResponse
from src.collectors.enums import CollectorType
from src.core.core import AsyncCore
from src.core.enums import BatchStatus
Expand Down
31 changes: 31 additions & 0 deletions src/api/endpoints/batch/urls/query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from sqlalchemy import Select

Check warning on line 1 in src/api/endpoints/batch/urls/query.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/api/endpoints/batch/urls/query.py#L1 <100>

Missing docstring in public module
Raw output
./src/api/endpoints/batch/urls/query.py:1:1: D100 Missing docstring in public module
from sqlalchemy.ext.asyncio import AsyncSession

from src.db.dtos.url.core import URLInfo
from src.db.models.instantiations.link.link_batch_urls import LinkBatchURL
from src.db.models.instantiations.url.core import URL
from src.db.queries.base.builder import QueryBuilderBase


class GetURLsByBatchQueryBuilder(QueryBuilderBase):
    """Query builder that lists the URLs linked to one batch."""

    def __init__(
        self,
        batch_id: int,
        page: int = 1
    ):
        """Store the batch to inspect and the page of results to fetch.

        :param batch_id: ID of the batch whose URLs are listed.
        :param page: Page number; page 1 starts at offset 0.
        """
        super().__init__()
        self.batch_id = batch_id
        self.page = page

    async def run(self, session: AsyncSession) -> list[URLInfo]:
        """Fetch one page of URLs for the configured batch, ordered by ID.

        URLs are matched to the batch through ``LinkBatchURL``.

        :param session: Async session used to execute the query.
        :return: Up to 100 ``URLInfo`` entries for the requested page.
        """
        page_size = 100
        query = (
            Select(URL)
            .join(LinkBatchURL)
            .where(LinkBatchURL.batch_id == self.batch_id)
            .order_by(URL.id)
            .limit(page_size)
            .offset((self.page - 1) * page_size)
        )
        result = await session.execute(query)
        urls = result.scalars().all()
        return [URLInfo(**url.__dict__) for url in urls]

Check warning on line 31 in src/api/endpoints/batch/urls/query.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/api/endpoints/batch/urls/query.py#L31 <292>

no newline at end of file
Raw output
./src/api/endpoints/batch/urls/query.py:31:57: W292 no newline at end of file
Loading