diff --git a/src/core/tasks/scheduled/impl/internet_archives/probe/operator.py b/src/core/tasks/scheduled/impl/internet_archives/probe/operator.py
index f1ae27cd..f4773417 100644
--- a/src/core/tasks/scheduled/impl/internet_archives/probe/operator.py
+++ b/src/core/tasks/scheduled/impl/internet_archives/probe/operator.py
@@ -5,6 +5,8 @@
 from src.core.tasks.scheduled.impl.internet_archives.probe.convert import convert_ia_url_mapping_to_ia_metadata
 from src.core.tasks.scheduled.impl.internet_archives.probe.filter import filter_into_subsets
 from src.core.tasks.scheduled.impl.internet_archives.probe.models.subset import IAURLMappingSubsets
+from src.core.tasks.scheduled.impl.internet_archives.probe.queries.delete import \
+    DeleteOldUnsuccessfulIACheckedFlagsQueryBuilder
 from src.core.tasks.scheduled.impl.internet_archives.probe.queries.get import GetURLsForInternetArchivesTaskQueryBuilder
 from src.core.tasks.scheduled.impl.internet_archives.probe.queries.prereq import \
     CheckURLInternetArchivesTaskPrerequisitesQueryBuilder
@@ -45,6 +47,11 @@ async def meets_task_prerequisites(self) -> bool:
         )
 
     async def inner_task_logic(self) -> None:
+        # Clear flags of URLs due for a re-check first (presumably so new flags can be written; confirm).
+        await self.adb_client.run_query_builder(
+            DeleteOldUnsuccessfulIACheckedFlagsQueryBuilder()
+        )
+
         url_mappings: list[URLMapping] = await self._get_url_mappings()
         if len(url_mappings) == 0:
             return
diff --git a/src/core/tasks/scheduled/impl/internet_archives/probe/queries/cte.py b/src/core/tasks/scheduled/impl/internet_archives/probe/queries/cte.py
new file mode 100644
index 00000000..7de8b290
--- /dev/null
+++ b/src/core/tasks/scheduled/impl/internet_archives/probe/queries/cte.py
@@ -0,0 +1,51 @@
+from sqlalchemy import select, or_, exists, func, text, CTE, ColumnElement
+
+from src.db.helpers.query import not_exists_url
+from src.db.models.impl.flag.checked_for_ia.sqlalchemy import FlagURLCheckedForInternetArchives
+from src.db.models.impl.url.core.sqlalchemy import URL
+
+
+class CheckURLInternetArchivesCTEContainer:
+    """Wraps the CTE selecting URLs due for an Internet Archives check.
+
+    A URL is selected when either the ``not_exists_url`` filter for the
+    checked-for-IA flag holds (presumably: the URL has no such flag), or the
+    URL has a flag row that is not successful and is older than one week.
+    """
+
+    def __init__(self):
+
+        self._cte = (
+            select(
+                URL.id.label("url_id"),
+                URL.url
+            )
+            .where(
+                or_(
+                    not_exists_url(FlagURLCheckedForInternetArchives),
+                    exists(
+                        select(FlagURLCheckedForInternetArchives.url_id)
+                        .where(
+                            FlagURLCheckedForInternetArchives.url_id == URL.id,
+                            ~FlagURLCheckedForInternetArchives.success,
+                            FlagURLCheckedForInternetArchives.created_at < func.now() - text("INTERVAL '1 week'")
+                        )
+                    )
+                )
+            ).cte("check_url_internet_archives_prereq")
+        )
+
+    @property
+    def cte(self) -> CTE:
+        """Return the underlying CTE."""
+        return self._cte
+
+    @property
+    def url_id(self) -> ColumnElement[int]:
+        """Return the CTE's ``url_id`` column (labelled from ``URL.id``)."""
+        return self._cte.c.url_id
+
+    @property
+    def url(self) -> ColumnElement[str]:
+        """Return the CTE's ``url`` column."""
+        return self._cte.c.url
diff --git a/src/core/tasks/scheduled/impl/internet_archives/probe/queries/delete.py b/src/core/tasks/scheduled/impl/internet_archives/probe/queries/delete.py
new file mode 100644
index 00000000..2d9a08e1
--- /dev/null
+++ b/src/core/tasks/scheduled/impl/internet_archives/probe/queries/delete.py
@@ -0,0 +1,32 @@
+from sqlalchemy import delete, exists, select
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from src.core.tasks.scheduled.impl.internet_archives.probe.queries.cte import CheckURLInternetArchivesCTEContainer
+from src.db.models.impl.flag.checked_for_ia.sqlalchemy import FlagURLCheckedForInternetArchives
+from src.db.queries.base.builder import QueryBuilderBase
+
+
+class DeleteOldUnsuccessfulIACheckedFlagsQueryBuilder(QueryBuilderBase):
+    """Delete checked-for-IA flags belonging to URLs due for a re-check.
+
+    Flag rows are removed when their ``url_id`` appears in the
+    ``CheckURLInternetArchivesCTEContainer`` CTE, so the affected URLs no
+    longer carry a flag afterwards.
+    """
+
+    async def run(self, session: AsyncSession) -> None:
+        """Execute the DELETE on ``session`` (no commit is issued here)."""
+        cte = CheckURLInternetArchivesCTEContainer()
+        query = (
+            delete(FlagURLCheckedForInternetArchives)
+            .where(
+                exists(
+                    select(cte.url_id)
+                    .where(
+                        FlagURLCheckedForInternetArchives.url_id == cte.url_id,
+                    )
+                )
+            )
+        )
+
+        await session.execute(query)
diff --git a/src/core/tasks/scheduled/impl/internet_archives/probe/queries/get.py b/src/core/tasks/scheduled/impl/internet_archives/probe/queries/get.py
index 94f2ad5e..3306943a 100644
--- a/src/core/tasks/scheduled/impl/internet_archives/probe/queries/get.py
+++ b/src/core/tasks/scheduled/impl/internet_archives/probe/queries/get.py
@@ -1,7 +1,8 @@
 from sqlalchemy import select
 from sqlalchemy.ext.asyncio import AsyncSession
 
+from src.core.tasks.scheduled.impl.internet_archives.probe.queries.cte import CheckURLInternetArchivesCTEContainer
 from src.db.dtos.url.mapping import URLMapping
 from src.db.models.impl.flag.checked_for_ia.sqlalchemy import FlagURLCheckedForInternetArchives
 from src.db.models.impl.url.core.sqlalchemy import URL
 from src.db.queries.base.builder import QueryBuilderBase
@@ -11,23 +12,19 @@ class GetURLsForInternetArchivesTaskQueryBuilder(QueryBuilderBase):
 
 
     async def run(self, session: AsyncSession) -> list[URLMapping]:
+        cte = CheckURLInternetArchivesCTEContainer()
         query = (
             select(
-                URL.id,
-                URL.url
+                cte.url_id,
+                cte.url
             )
-            .outerjoin(
-                FlagURLCheckedForInternetArchives,
-                URL.id == FlagURLCheckedForInternetArchives.url_id
-            )
-            .where(FlagURLCheckedForInternetArchives.url_id.is_(None))
             .limit(100)
         )
 
         db_mappings = await sh.mappings(session, query=query)
         return [
             URLMapping(
-                url_id=mapping["id"],
+                url_id=mapping["url_id"],
                 url=mapping["url"]
             ) for mapping in db_mappings
         ]
diff --git a/src/core/tasks/scheduled/impl/internet_archives/probe/queries/prereq.py b/src/core/tasks/scheduled/impl/internet_archives/probe/queries/prereq.py
index 7a7d8687..d8994641 100644
--- a/src/core/tasks/scheduled/impl/internet_archives/probe/queries/prereq.py
+++ b/src/core/tasks/scheduled/impl/internet_archives/probe/queries/prereq.py
@@ -1,6 +1,7 @@
 from sqlalchemy import select
 from sqlalchemy.ext.asyncio import AsyncSession
 
+from src.core.tasks.scheduled.impl.internet_archives.probe.queries.cte import CheckURLInternetArchivesCTEContainer
 from src.db.helpers.query import not_exists_url
 from src.db.models.impl.flag.checked_for_ia.sqlalchemy import FlagURLCheckedForInternetArchives
 from src.db.models.impl.url.core.sqlalchemy import URL
@@ -11,12 +12,8 @@ class CheckURLInternetArchivesTaskPrerequisitesQueryBuilder(QueryBuilderBase):
 
 
     async def run(self, session: AsyncSession) -> bool:
+        cte = CheckURLInternetArchivesCTEContainer()
         query = (
-            select(URL)
-            .where(
-                not_exists_url(FlagURLCheckedForInternetArchives)
-            )
-            .limit(1)
+            select(cte.url_id)
         )
-        result = await sh.one_or_none(session, query=query)
-        return result is not None
+        return await sh.results_exist(session, query=query)
diff --git a/src/db/models/impl/flag/checked_for_ia/sqlalchemy.py b/src/db/models/impl/flag/checked_for_ia/sqlalchemy.py
index 87914eb2..efdf9257 100644
--- a/src/db/models/impl/flag/checked_for_ia/sqlalchemy.py
+++ b/src/db/models/impl/flag/checked_for_ia/sqlalchemy.py
@@ -1,12 +1,13 @@
 from sqlalchemy import PrimaryKeyConstraint
 from sqlalchemy.orm import Mapped
 
-from src.db.models.mixins import URLDependentMixin
+from src.db.models.mixins import URLDependentMixin, CreatedAtMixin
 from src.db.models.templates_.base import Base
 from src.db.models.templates_.with_id import WithIDBase
 
 
 class FlagURLCheckedForInternetArchives(
     URLDependentMixin,
+    CreatedAtMixin,
     Base
 ):