Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""Migrate batch strategy enum to lookup table

Revision ID: 3a9f8e1d5b2c
Revises: 1fb2286a016c
Create Date: 2026-02-27 12:15:00.000000

"""
from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision: str = '3a9f8e1d5b2c'
down_revision: Union[str, None] = '1fb2286a016c'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None

# Strategy names shared by both migration directions: upgrade() seeds them
# into batch_strategies.name, and downgrade() uses them as the labels of the
# recreated batch_strategy enum type.
STRATEGY_VALUES = [
    "example",
    "ckan",
    "muckrock_county_search",
    "auto_googler",
    "muckrock_all_search",
    "muckrock_simple_search",
    "common_crawler",
    "manual",
    "internet_archive",
]


def upgrade() -> None:
    """Replace the ``batches.strategy`` enum column with a lookup-table FK.

    Steps, in order:

    1. Create ``batch_strategies`` and seed one row per ``STRATEGY_VALUES``
       entry.
    2. Add ``batches.batch_strategy_id`` as nullable, backfill it by matching
       the text form of the existing ``strategy`` value against
       ``batch_strategies.name``, then make the column NOT NULL.
    3. Add the foreign key and a non-unique index on the new column.
    4. Drop the old ``strategy`` column and the ``batch_strategy`` type.
    """
    batch_strategies = op.create_table(
        "batch_strategies",
        sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
        sa.Column("name", sa.String(), nullable=False),
        sa.PrimaryKeyConstraint("id"),
        sa.UniqueConstraint("name"),
    )

    # Seed via bulk_insert so the values are handled by SQLAlchemy instead of
    # being spliced into a hand-built SQL string.
    op.bulk_insert(
        batch_strategies,
        [{"name": value} for value in STRATEGY_VALUES],
    )

    # The column starts out nullable so existing rows can be backfilled
    # before the NOT NULL constraint is applied.
    op.add_column(
        "batches",
        sa.Column("batch_strategy_id", sa.Integer(), nullable=True),
    )
    op.execute("""
        UPDATE batches AS b
        SET batch_strategy_id = bs.id
        FROM batch_strategies AS bs
        WHERE b.strategy::text = bs.name
    """)
    op.alter_column("batches", "batch_strategy_id", nullable=False)

    op.create_foreign_key(
        "fk_batches_batch_strategy_id",
        "batches",
        "batch_strategies",
        ["batch_strategy_id"],
        ["id"],
    )
    op.create_index(
        "ix_batches_batch_strategy_id",
        "batches",
        ["batch_strategy_id"],
        unique=False,
    )

    # The enum column is removed before its type is dropped.
    op.drop_column("batches", "strategy")
    op.execute("DROP TYPE IF EXISTS batch_strategy")


def downgrade() -> None:
    """Restore the ``batches.strategy`` enum column and drop the lookup table.

    Steps, in order:

    1. Create the ``batch_strategy`` enum type from ``STRATEGY_VALUES``
       (``checkfirst=True`` is passed to the create call).
    2. Add ``batches.strategy`` as nullable, backfill it by casting the
       ``batch_strategies.name`` joined through ``batch_strategy_id`` to the
       enum, then make the column NOT NULL.
    3. Drop the index, foreign key and ``batch_strategy_id`` column, and
       finally the ``batch_strategies`` table.
    """
    batch_strategy_enum = postgresql.ENUM(
        *STRATEGY_VALUES,
        name="batch_strategy",
    )
    batch_strategy_enum.create(op.get_bind(), checkfirst=True)

    # The column starts out nullable so existing rows can be backfilled
    # before the NOT NULL constraint is applied.
    op.add_column(
        "batches",
        sa.Column("strategy", batch_strategy_enum, nullable=True),
    )
    op.execute("""
        UPDATE batches AS b
        SET strategy = bs.name::batch_strategy
        FROM batch_strategies AS bs
        WHERE b.batch_strategy_id = bs.id
    """)
    op.alter_column("batches", "strategy", nullable=False)

    # Remove the objects that depend on batch_strategy_id before the column,
    # and the column before the table it references.
    op.drop_index("ix_batches_batch_strategy_id", table_name="batches")
    op.drop_constraint(
        "fk_batches_batch_strategy_id",
        "batches",
        type_="foreignkey",
    )
    op.drop_column("batches", "batch_strategy_id")
    op.drop_table("batch_strategies")
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ dev = [
"pytest-asyncio~=0.25.2",
"pytest-mock==3.12.0",
"pytest-timeout~=2.3.1",
"ruff>=0.15.4",
"vulture>=2.14",
]

Expand Down
12 changes: 10 additions & 2 deletions src/api/endpoints/collector/manual/query.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from sqlalchemy import select

Check warning on line 1 in src/api/endpoints/collector/manual/query.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/api/endpoints/collector/manual/query.py#L1 <100>

Missing docstring in public module
Raw output
./src/api/endpoints/collector/manual/query.py:1:1: D100 Missing docstring in public module
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession

Expand All @@ -6,6 +7,7 @@
from src.collectors.enums import CollectorType
from src.core.enums import BatchStatus
from src.db.models.impl.batch.sqlalchemy import Batch
from src.db.models.impl.batch.strategy.sqlalchemy import BatchStrategy
from src.db.models.impl.link.batch_url.sqlalchemy import LinkBatchURL
from src.db.models.impl.url.core.enums import URLSource
from src.db.models.impl.url.core.sqlalchemy import URL
Expand All @@ -29,8 +31,14 @@


async def run(self, session: AsyncSession) -> ManualBatchResponseDTO:
strategy_id: int | None = await session.scalar(
select(BatchStrategy.id).where(BatchStrategy.name == CollectorType.MANUAL.value)
)
if strategy_id is None:
raise ValueError(f"Unknown batch strategy: {CollectorType.MANUAL.value}")

batch = Batch(
strategy=CollectorType.MANUAL.value,
batch_strategy_id=strategy_id,
status=BatchStatus.READY_TO_LABEL.value,
parameters={
"name": self.dto.name
Expand Down Expand Up @@ -95,4 +103,4 @@
batch_id=batch_id,
urls=url_ids,
duplicate_urls=duplicate_urls
)
)
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from src.api.endpoints.metrics.batches.aggregated.query.models.strategy_count import CountByBatchStrategyResponse
from src.db.helpers.session import session_helper as sh
from src.db.models.impl.batch.sqlalchemy import Batch
from src.db.models.impl.batch.strategy.sqlalchemy import BatchStrategy
from src.db.models.impl.link.batch_url.sqlalchemy import LinkBatchURL
from src.db.queries.base.builder import QueryBuilderBase

Expand All @@ -16,13 +17,18 @@ async def run(self, session: AsyncSession) -> list[CountByBatchStrategyResponse]

query = (
select(
Batch.strategy,
BatchStrategy.name.label("strategy"),
func.count(LinkBatchURL.url_id).label("count")
)
.select_from(Batch)
.join(
BatchStrategy,
Batch.batch_strategy_id == BatchStrategy.id,
)
.join(LinkBatchURL)
.group_by(Batch.strategy)
.group_by(BatchStrategy.name)
)

mappings: Sequence[RowMapping] = await sh.mappings(session, query=query)
results = [CountByBatchStrategyResponse(**mapping) for mapping in mappings]
return results
return results
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from typing import Sequence

from sqlalchemy import CTE, select, func, RowMapping
from sqlalchemy import select, func, RowMapping
from sqlalchemy.ext.asyncio import AsyncSession

from src.api.endpoints.metrics.batches.aggregated.query.batch_status_.response import \
BatchStatusCountByBatchStrategyResponseDTO
from src.collectors.enums import CollectorType
from src.core.enums import BatchStatus
from src.db.models.impl.batch.sqlalchemy import Batch
from src.db.models.impl.batch.strategy.sqlalchemy import BatchStrategy
from src.db.queries.base.builder import QueryBuilderBase

from src.db.helpers.session import session_helper as sh
Expand All @@ -17,11 +18,16 @@ class BatchStatusByBatchStrategyQueryBuilder(QueryBuilderBase):
async def run(self, session: AsyncSession) -> list[BatchStatusCountByBatchStrategyResponseDTO]:
query = (
select(
Batch.strategy,
BatchStrategy.name.label("strategy"),
Batch.status,
func.count(Batch.id).label("count")
)
.group_by(Batch.strategy, Batch.status)
.select_from(Batch)
.join(
BatchStrategy,
Batch.batch_strategy_id == BatchStrategy.id,
)
.group_by(BatchStrategy.name, Batch.status)
)
mappings: Sequence[RowMapping] = await sh.mappings(session, query=query)

Expand All @@ -34,4 +40,4 @@ async def run(self, session: AsyncSession) -> list[BatchStatusCountByBatchStrate
count=mapping["count"]
)
)
return results
return results
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from src.api.endpoints.metrics.batches.aggregated.query.models.strategy_count import CountByBatchStrategyResponse
from src.db.models.impl.batch.sqlalchemy import Batch
from src.db.models.impl.batch.strategy.sqlalchemy import BatchStrategy
from src.db.models.impl.flag.url_validated.sqlalchemy import FlagURLValidated
from src.db.models.impl.link.batch_url.sqlalchemy import LinkBatchURL
from src.db.queries.base.builder import QueryBuilderBase
Expand All @@ -17,9 +18,14 @@ async def run(

query = (
select(
Batch.strategy,
BatchStrategy.name.label("strategy"),
func.count(LinkBatchURL.url_id).label("count")
)
.select_from(Batch)
.join(
BatchStrategy,
Batch.batch_strategy_id == BatchStrategy.id,
)
.join(
LinkBatchURL,
LinkBatchURL.batch_id == Batch.id
Expand All @@ -29,7 +35,7 @@ async def run(
FlagURLValidated.url_id == LinkBatchURL.url_id
)
.where(FlagURLValidated.url_id.is_(None))
.group_by(Batch.strategy)
.group_by(BatchStrategy.name)
)

mappings: Sequence[RowMapping] = await sh.mappings(session, query=query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from src.api.endpoints.metrics.batches.aggregated.query.models.strategy_count import CountByBatchStrategyResponse
from src.db.models.impl.batch.sqlalchemy import Batch
from src.db.models.impl.batch.strategy.sqlalchemy import BatchStrategy
from src.db.models.impl.flag.url_validated.enums import URLType
from src.db.models.impl.flag.url_validated.sqlalchemy import FlagURLValidated
from src.db.models.impl.link.batch_url.sqlalchemy import LinkBatchURL
Expand All @@ -19,9 +20,14 @@ async def run(

query = (
select(
Batch.strategy,
BatchStrategy.name.label("strategy"),
func.count(FlagURLValidated.url_id).label("count")
)
.select_from(Batch)
.join(
BatchStrategy,
Batch.batch_strategy_id == BatchStrategy.id,
)
.join(
LinkBatchURL,
LinkBatchURL.batch_id == Batch.id
Expand All @@ -31,7 +37,7 @@ async def run(
FlagURLValidated.url_id == LinkBatchURL.url_id
)
.where(FlagURLValidated.type == URLType.NOT_RELEVANT)
.group_by(Batch.strategy)
.group_by(BatchStrategy.name)
)

mappings: Sequence[RowMapping] = await sh.mappings(session, query=query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from src.collectors.enums import CollectorType
from src.db.helpers.session import session_helper as sh
from src.db.models.impl.batch.sqlalchemy import Batch
from src.db.models.impl.batch.strategy.sqlalchemy import BatchStrategy
from src.db.models.impl.link.batch_url.sqlalchemy import LinkBatchURL
from src.db.models.impl.url.data_source.sqlalchemy import DSAppLinkDataSource
from src.db.queries.base.builder import QueryBuilderBase
Expand All @@ -19,9 +20,14 @@ async def run(self, session: AsyncSession) -> list[
]:
query = (
select(
Batch.strategy,
BatchStrategy.name.label("strategy"),
func.count(DSAppLinkDataSource.id).label("count")
)
.select_from(Batch)
.join(
BatchStrategy,
Batch.batch_strategy_id == BatchStrategy.id,
)
.join(
LinkBatchURL,
LinkBatchURL.batch_id == Batch.id
Expand All @@ -30,7 +36,7 @@ async def run(self, session: AsyncSession) -> list[
DSAppLinkDataSource,
DSAppLinkDataSource.url_id == LinkBatchURL.url_id
)
.group_by(Batch.strategy)
.group_by(BatchStrategy.name)
)

mappings: Sequence[RowMapping] = await sh.mappings(session, query=query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from src.db.helpers.query import exists_url
from src.db.helpers.session import session_helper as sh
from src.db.models.impl.batch.sqlalchemy import Batch
from src.db.models.impl.batch.strategy.sqlalchemy import BatchStrategy
from src.db.models.impl.link.batch_url.sqlalchemy import LinkBatchURL
from src.db.models.impl.url.core.sqlalchemy import URL
from src.db.models.impl.url.task_error.sqlalchemy import URLTaskError
Expand All @@ -18,20 +19,22 @@ class URLErrorByBatchStrategyQueryBuilder(QueryBuilderBase):
async def run(self, session: AsyncSession) -> list[CountByBatchStrategyResponse]:
query = (
select(
Batch.strategy,
BatchStrategy.name.label("strategy"),
func.count(URL.id).label("count")
)
.select_from(Batch)
.join(
BatchStrategy,
Batch.batch_strategy_id == BatchStrategy.id,
)
.join(LinkBatchURL)
.join(URL)
.where(
exists_url(URLTaskError)
)
.group_by(Batch.strategy)
.group_by(BatchStrategy.name)
)

mappings: Sequence[RowMapping] = await sh.mappings(session, query=query)
results = [CountByBatchStrategyResponse(**mapping) for mapping in mappings]
return results


Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from src.api.endpoints.metrics.batches.aggregated.query.models.strategy_count import CountByBatchStrategyResponse
from src.db.helpers.session import session_helper as sh
from src.db.models.impl.batch.sqlalchemy import Batch
from src.db.models.impl.batch.strategy.sqlalchemy import BatchStrategy
from src.db.models.impl.flag.url_validated.sqlalchemy import FlagURLValidated
from src.db.models.impl.link.batch_url.sqlalchemy import LinkBatchURL
from src.db.queries.base.builder import QueryBuilderBase
Expand All @@ -19,9 +20,14 @@ async def run(

query = (
select(
Batch.strategy,
BatchStrategy.name.label("strategy"),
func.count(FlagURLValidated.url_id).label("count")
)
.select_from(Batch)
.join(
BatchStrategy,
Batch.batch_strategy_id == BatchStrategy.id,
)
.join(
LinkBatchURL,
LinkBatchURL.batch_id == Batch.id
Expand All @@ -30,7 +36,7 @@ async def run(
FlagURLValidated,
FlagURLValidated.url_id == LinkBatchURL.url_id
)
.group_by(Batch.strategy)
.group_by(BatchStrategy.name)
)

mappings: Sequence[RowMapping] = await sh.mappings(session, query=query)
Expand Down
Loading
Loading