54 changes: 54 additions & 0 deletions alembic/versions/2025_10_03_1831-dc6ab5157c49_add_url_task_error_table_and_remove_url_.py
@@ -0,0 +1,54 @@
"""Add url_task_error table and remove url_error_info

Revision ID: dc6ab5157c49
Revises: c5c20af87511
Create Date: 2025-10-03 18:31:54.887740

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import ENUM

from src.util.alembic_helpers import url_id_column, task_id_column, created_at_column

# revision identifiers, used by Alembic.
revision: str = 'dc6ab5157c49'
down_revision: Union[str, None] = 'c5c20af87511'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None





def upgrade() -> None:

GitHub Actions / flake8 (warning): ./alembic/versions/2025_10_03_1831-dc6ab5157c49_add_url_task_error_table_and_remove_url_.py:26:1: D103 Missing docstring in public function
GitHub Actions / flake8 (failure): ./alembic/versions/2025_10_03_1831-dc6ab5157c49_add_url_task_error_table_and_remove_url_.py:26:1: E303 too many blank lines (5)
    _remove_url_error_info()
    _remove_url_screenshot_error()
    _add_url_task_error()

def _remove_url_error_info():
    op.drop_table("url_error_info")

def _remove_url_screenshot_error():
    op.drop_table("error_url_screenshot")

def _add_url_task_error():
    op.create_table(
        "url_task_error",
        url_id_column(),
        task_id_column(),
        sa.Column(
            "task_type",
            ENUM(name="task_type", create_type=False)
        ),
        sa.Column("error", sa.String(), nullable=False),
        created_at_column(),
        sa.PrimaryKeyConstraint("url_id", "task_type")
    )

GitHub Actions / flake8 (failure): ./alembic/versions/2025_10_03_1831-dc6ab5157c49_add_url_task_error_table_and_remove_url_.py:49:3: E122 continuation line missing indentation or outdented



def downgrade() -> None:

GitHub Actions / flake8 (warning): ./alembic/versions/2025_10_03_1831-dc6ab5157c49_add_url_task_error_table_and_remove_url_.py:53:1: D103 Missing docstring in public function
GitHub Actions / flake8 (failure): ./alembic/versions/2025_10_03_1831-dc6ab5157c49_add_url_task_error_table_and_remove_url_.py:53:1: E303 too many blank lines (3)
    pass
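For orientation, a minimal sketch of the ORM model this table would back. The real URLTaskError model lives in src/db/models/impl/url/task_error/sqlalchemy.py (referenced later in this PR) and is not shown here; the column types produced by url_id_column(), task_id_column(), and created_at_column() are assumptions.

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import ENUM
from sqlalchemy.orm import DeclarativeBase


class Base(DeclarativeBase):
    pass


class URLTaskError(Base):
    """Illustrative sketch only; column types are guesses based on the migration."""

    __tablename__ = "url_task_error"

    # Assumed integer foreign keys; the real targets come from
    # src.util.alembic_helpers, which this diff does not include.
    url_id = sa.Column(sa.Integer, sa.ForeignKey("url.id"), primary_key=True)
    task_id = sa.Column(sa.Integer, sa.ForeignKey("task.id"))
    # Reuses the existing Postgres enum type instead of creating a new one,
    # mirroring ENUM(name="task_type", create_type=False) in the migration.
    task_type = sa.Column(ENUM(name="task_type", create_type=False), primary_key=True)
    error = sa.Column(sa.String, nullable=False)
    # Assumed timestamp with a server-side default.
    created_at = sa.Column(sa.DateTime(timezone=True), server_default=sa.func.now())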
4 changes: 1 addition & 3 deletions src/api/endpoints/task/by_id/dto.py
@@ -1,13 +1,11 @@
import datetime
from typing import Optional

from pydantic import BaseModel

from src.db.enums import TaskType
from src.db.models.impl.task.enums import TaskStatus
from src.db.models.impl.url.core.pydantic.info import URLInfo
from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic
from src.db.enums import TaskType
from src.core.enums import BatchStatus


class TaskInfo(BaseModel):
19 changes: 9 additions & 10 deletions src/api/endpoints/task/by_id/query.py
@@ -1,16 +1,15 @@
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from sqlalchemy.orm import selectinload, joinedload

GitHub Actions / flake8 (warning): ./src/api/endpoints/task/by_id/query.py:3:1: F401 'sqlalchemy.orm.joinedload' imported but unused

from src.api.endpoints.task.by_id.dto import TaskInfo
from src.collectors.enums import URLStatus
from src.core.enums import BatchStatus
from src.db.models.impl.task.enums import TaskStatus
from src.db.models.impl.url.core.pydantic.info import URLInfo
from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic
from src.db.enums import TaskType
from src.db.models.impl.task.core import Task
from src.db.models.impl.task.enums import TaskStatus
from src.db.models.impl.url.core.pydantic.info import URLInfo
from src.db.models.impl.url.core.sqlalchemy import URL
from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic
from src.db.queries.base.builder import QueryBuilderBase


@@ -28,12 +27,12 @@
.options(
selectinload(Task.urls)
.selectinload(URL.batch),
selectinload(Task.error),
selectinload(Task.errored_urls)
selectinload(Task.url_errors),
selectinload(Task.errors)
)
)
task = result.scalars().first()
error = task.error[0].error if len(task.error) > 0 else None
error = task.errors[0].error if len(task.errors) > 0 else None
# Get error info if any
# Get URLs
urls = task.urls
@@ -50,12 +49,12 @@
url_infos.append(url_info)

errored_urls = []
for url in task.errored_urls:
for url in task.url_errors:
url_error_info = URLErrorInfoPydantic(
task_id=url.task_id,
url_id=url.url_id,
error=url.error,
updated_at=url.updated_at
updated_at=url.created_at
)
errored_urls.append(url_error_info)
return TaskInfo(
5 changes: 3 additions & 2 deletions src/api/endpoints/url/get/dto.py
@@ -4,10 +4,11 @@
from pydantic import BaseModel

from src.collectors.enums import URLStatus
from src.db.enums import URLMetadataAttributeType, ValidationStatus, ValidationSource
from src.db.enums import URLMetadataAttributeType, ValidationStatus, ValidationSource, TaskType


class GetURLsResponseErrorInfo(BaseModel):
id: int
task: TaskType
error: str
updated_at: datetime.datetime

12 changes: 6 additions & 6 deletions src/api/endpoints/url/get/query.py
@@ -6,7 +6,7 @@
from src.collectors.enums import URLStatus
from src.db.client.helpers import add_standard_limit_and_offset
from src.db.models.impl.url.core.sqlalchemy import URL
from src.db.models.impl.url.error_info.sqlalchemy import URLErrorInfo
from src.db.models.impl.url.task_error.sqlalchemy import URLTaskError
from src.db.queries.base.builder import QueryBuilderBase


@@ -23,14 +23,14 @@ def __init__(

async def run(self, session: AsyncSession) -> GetURLsResponseInfo:
statement = select(URL).options(
selectinload(URL.error_info),
selectinload(URL.task_errors),
selectinload(URL.batch)
).order_by(URL.id)
if self.errors:
# Only return URLs with errors
statement = statement.where(
exists(
select(URLErrorInfo).where(URLErrorInfo.url_id == URL.id)
select(URLTaskError).where(URLTaskError.url_id == URL.id)
)
)
add_standard_limit_and_offset(statement, self.page)
@@ -39,11 +39,11 @@ async def run(self, session: AsyncSession) -> GetURLsResponseInfo:
final_results = []
for result in all_results:
error_results = []
for error in result.error_info:
for error in result.task_errors:
error_result = GetURLsResponseErrorInfo(
id=error.id,
task=error.task_type,
error=error.error,
updated_at=error.updated_at
updated_at=error.created_at
)
error_results.append(error_result)
final_results.append(
17 changes: 17 additions & 0 deletions src/core/tasks/base/operator.py
@@ -7,6 +7,8 @@
from src.db.client.async_ import AsyncDatabaseClient
from src.db.enums import TaskType
from src.db.models.impl.task.enums import TaskStatus
from src.db.models.impl.url.task_error.pydantic_.insert import URLTaskErrorPydantic
from src.db.models.impl.url.task_error.pydantic_.small import URLTaskErrorSmall


class TaskOperatorBase(ABC):
@@ -66,3 +68,18 @@
task_id=self.task_id,
error=str(e)
)

async def add_task_errors(
self,
errors: list[URLTaskErrorSmall]
) -> None:
GitHub Actions / flake8 (warning): ./src/core/tasks/base/operator.py:72:1: D102 Missing docstring in public method
inserts: list[URLTaskErrorPydantic] = [
URLTaskErrorPydantic(
task_id=self.task_id,
url_id=error.url_id,
task_type=self.task_type,
error=error.error
)
for error in errors
]
await self.adb_client.bulk_insert(inserts)

GitHub Actions / flake8 (warning): ./src/core/tasks/base/operator.py:85:51: W292 no newline at end of file
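For context, a sketch of the two DTOs used above, inferred only from how they are constructed in this diff. The real definitions live under src/db/models/impl/url/task_error/pydantic_/ and may differ; the int field types are assumptions.

from pydantic import BaseModel

from src.db.enums import TaskType


class URLTaskErrorSmall(BaseModel):
    # What an operator reports per failed URL (shape assumed from usage above).
    url_id: int
    error: str


class URLTaskErrorPydantic(BaseModel):
    # What actually gets bulk-inserted into url_task_error (shape assumed from usage above).
    task_id: int
    url_id: int
    task_type: TaskType
    error: str

A concrete operator can then report per-URL failures with something like await self.add_task_errors([URLTaskErrorSmall(url_id=url_id, error=str(exc))]), as the Internet Archives and agency identification tasks below do.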
src/core/tasks/scheduled/impl/huggingface/queries/check/requester.py
@@ -6,6 +6,8 @@
from sqlalchemy.sql.functions import count

from src.collectors.enums import URLStatus
from src.db.enums import TaskType
from src.db.helpers.query import not_exists_url, no_url_task_error, exists_url

GitHub Actions / flake8 (warning): ./src/core/tasks/scheduled/impl/huggingface/queries/check/requester.py:10:1: F401 'src.db.helpers.query.not_exists_url' imported but unused
from src.db.helpers.session import session_helper as sh
from src.db.models.impl.flag.url_validated.sqlalchemy import FlagURLValidated
from src.db.models.impl.state.huggingface import HuggingFaceUploadState
@@ -36,12 +38,9 @@
URLCompressedHTML,
URL.id == URLCompressedHTML.url_id
)
.outerjoin(
FlagURLValidated,
URL.id == FlagURLValidated.url_id
)
.where(
FlagURLValidated.url_id.isnot(None)
exists_url(FlagURLValidated),
no_url_task_error(TaskType.PUSH_TO_HUGGINGFACE)
)
)
if last_upload_at is not None:
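The exists_url, not_exists_url, and no_url_task_error helpers come from src.db.helpers.query and are not part of this diff. A rough sketch of what such correlated EXISTS helpers could look like (signatures and column names are assumptions):

from sqlalchemy import select

from src.db.enums import TaskType
from src.db.models.impl.url.core.sqlalchemy import URL
from src.db.models.impl.url.task_error.sqlalchemy import URLTaskError


def exists_url(model):
    # True for URL rows that have at least one related row in `model`.
    return select(model.url_id).where(model.url_id == URL.id).exists()


def not_exists_url(model):
    # True for URL rows with no related row in `model`.
    return ~exists_url(model)


def no_url_task_error(task_type: TaskType):
    # True for URL rows with no recorded error for the given task type.
    return ~select(URLTaskError.url_id).where(
        URLTaskError.url_id == URL.id,
        URLTaskError.task_type == task_type,
    ).exists()

Helpers of this shape keep the filter entirely in the WHERE clause rather than the earlier outer join plus IS NOT NULL pattern.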
Original file line number Diff line number Diff line change
@@ -2,19 +2,19 @@

from src.core.tasks.mixins.link_urls import LinkURLsMixin
from src.core.tasks.mixins.prereq import HasPrerequisitesMixin
from src.core.tasks.scheduled.impl.internet_archives.probe.queries.prereq import \
CheckURLInternetArchivesTaskPrerequisitesQueryBuilder
from src.core.tasks.scheduled.templates.operator import ScheduledTaskOperatorBase
from src.core.tasks.scheduled.impl.internet_archives.probe.convert import convert_ia_url_mapping_to_ia_metadata
from src.core.tasks.scheduled.impl.internet_archives.probe.filter import filter_into_subsets
from src.core.tasks.scheduled.impl.internet_archives.probe.models.subset import IAURLMappingSubsets
from src.core.tasks.scheduled.impl.internet_archives.probe.queries.get import GetURLsForInternetArchivesTaskQueryBuilder
from src.core.tasks.scheduled.impl.internet_archives.probe.queries.prereq import \
CheckURLInternetArchivesTaskPrerequisitesQueryBuilder
from src.core.tasks.scheduled.templates.operator import ScheduledTaskOperatorBase
from src.db.client.async_ import AsyncDatabaseClient
from src.db.dtos.url.mapping import URLMapping
from src.db.enums import TaskType
from src.db.models.impl.flag.checked_for_ia.pydantic import FlagURLCheckedForInternetArchivesPydantic
from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic
from src.db.models.impl.url.internet_archives.probe.pydantic import URLInternetArchiveMetadataPydantic
from src.db.models.impl.url.task_error.pydantic_.small import URLTaskErrorSmall
from src.external.internet_archives.client import InternetArchivesClient
from src.external.internet_archives.models.ia_url_mapping import InternetArchivesURLMapping
from src.util.progress_bar import get_progress_bar_disabled
@@ -60,16 +60,15 @@ async def inner_task_logic(self) -> None:
await self._add_ia_metadata_to_db(mapper, ia_mappings=subsets.has_metadata)

async def _add_errors_to_db(self, mapper: URLMapper, ia_mappings: list[InternetArchivesURLMapping]) -> None:
url_error_info_list: list[URLErrorInfoPydantic] = []
url_error_info_list: list[URLTaskErrorSmall] = []
for ia_mapping in ia_mappings:
url_id = mapper.get_id(ia_mapping.url)
url_error_info = URLErrorInfoPydantic(
url_error_info = URLTaskErrorSmall(
url_id=url_id,
error=ia_mapping.error,
task_id=self.task_id
)
url_error_info_list.append(url_error_info)
await self.adb_client.bulk_insert(url_error_info_list)
await self.add_task_errors(url_error_info_list)

async def _get_url_mappings(self) -> list[URLMapping]:
return await self.adb_client.run_query_builder(
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from src.db.helpers.query import not_exists_url
from src.db.models.impl.flag.checked_for_ia.sqlalchemy import FlagURLCheckedForInternetArchives
from src.db.models.impl.url.core.sqlalchemy import URL
from src.db.queries.base.builder import QueryBuilderBase
@@ -12,11 +13,9 @@ class CheckURLInternetArchivesTaskPrerequisitesQueryBuilder(QueryBuilderBase):
async def run(self, session: AsyncSession) -> bool:
query = (
select(URL)
.outerjoin(
FlagURLCheckedForInternetArchives,
URL.id == FlagURLCheckedForInternetArchives.url_id
.where(
not_exists_url(FlagURLCheckedForInternetArchives)
)
.where(FlagURLCheckedForInternetArchives.url_id.is_(None))
.limit(1)
)
result = await sh.one_or_none(session, query=query)
Original file line number Diff line number Diff line change
@@ -14,8 +14,8 @@
from src.core.tasks.scheduled.templates.operator import ScheduledTaskOperatorBase
from src.db.client.async_ import AsyncDatabaseClient
from src.db.enums import TaskType
from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic
from src.db.models.impl.url.internet_archives.save.pydantic import URLInternetArchiveSaveMetadataPydantic
from src.db.models.impl.url.task_error.pydantic_.small import URLTaskErrorSmall
from src.external.internet_archives.client import InternetArchivesClient
from src.external.internet_archives.models.save_response import InternetArchivesSaveResponseInfo

@@ -89,16 +89,15 @@ async def _add_errors_to_db(
mapper: URLToEntryMapper,
responses: list[InternetArchivesSaveResponseInfo]
) -> None:
error_info_list: list[URLErrorInfoPydantic] = []
error_info_list: list[URLTaskErrorSmall] = []
for response in responses:
url_id = mapper.get_url_id(response.url)
url_error_info = URLErrorInfoPydantic(
url_error_info = URLTaskErrorSmall(
url_id=url_id,
error=response.error,
task_id=self.task_id
)
error_info_list.append(url_error_info)
await self.adb_client.bulk_insert(error_info_list)
await self.add_task_errors(error_info_list)

async def _save_new_saves_to_db(
self,
10 changes: 0 additions & 10 deletions src/core/tasks/scheduled/impl/task_cleanup/query.py
@@ -5,8 +5,6 @@
from sqlalchemy.ext.asyncio import AsyncSession

from src.db.models.impl.task.core import Task
from src.db.models.impl.task.error import TaskError
from src.db.models.impl.url.error_info.sqlalchemy import URLErrorInfo
from src.db.queries.base.builder import QueryBuilderBase


@@ -15,14 +13,6 @@ class TaskCleanupQueryBuilder(QueryBuilderBase):
async def run(self, session: AsyncSession) -> Any:
one_week_ago: datetime = datetime.now() - timedelta(days=7)

statement = (
delete(URLErrorInfo)
.where(
URLErrorInfo.updated_at < one_week_ago
)
)
await session.execute(statement)

statement = (
delete(Task)
.where(
src/core/tasks/url/operators/agency_identification/subtasks/templates/subtask.py
@@ -5,9 +5,11 @@
from src.core.tasks.url.operators.agency_identification.subtasks.models.run_info import AgencyIDSubtaskRunInfo
from src.core.tasks.url.operators.agency_identification.subtasks.models.subtask import AutoAgencyIDSubtaskData
from src.db.client.async_ import AsyncDatabaseClient
from src.db.models.impl.url.error_info.pydantic import URLErrorInfoPydantic
from src.db.enums import TaskType
from src.db.models.impl.url.suggestion.agency.subtask.pydantic import URLAutoAgencyIDSubtaskPydantic
from src.db.models.impl.url.suggestion.agency.suggestion.pydantic import AgencyIDSubtaskSuggestionPydantic
from src.db.models.impl.url.task_error.pydantic_.insert import URLTaskErrorPydantic
from src.db.models.impl.url.task_error.pydantic_.small import URLTaskErrorSmall


class AgencyIDSubtaskOperatorBase(ABC):
@@ -66,17 +68,29 @@
models=suggestions,
)

error_infos: list[URLErrorInfoPydantic] = []
error_infos: list[URLTaskErrorSmall] = []
for subtask_info in subtask_data_list:
if not subtask_info.has_error:
continue
error_info = URLErrorInfoPydantic(
error_info = URLTaskErrorSmall(
url_id=subtask_info.url_id,
error=subtask_info.error,
task_id=self.task_id,
)
error_infos.append(error_info)

await self.adb_client.bulk_insert(
models=error_infos,
)
await self.add_task_errors(error_infos)

async def add_task_errors(
self,
errors: list[URLTaskErrorSmall]
) -> None:
GitHub Actions / flake8 (warning): ./src/core/tasks/url/operators/agency_identification/subtasks/templates/subtask.py:83:1: D102 Missing docstring in public method
inserts: list[URLTaskErrorPydantic] = [
URLTaskErrorPydantic(
task_id=self.task_id,
url_id=error.url_id,
task_type=TaskType.AGENCY_IDENTIFICATION,
error=error.error
)
for error in errors
]
await self.adb_client.bulk_insert(inserts)

GitHub Actions / flake8 (warning): ./src/core/tasks/url/operators/agency_identification/subtasks/templates/subtask.py:96:51: W292 no newline at end of file