diff --git a/ENV.md b/ENV.md index c1aa1f4c..a46c4f1d 100644 --- a/ENV.md +++ b/ENV.md @@ -68,6 +68,7 @@ Note that some tasks/subtasks are themselves enabled by other tasks. | `IA_SAVE_TASK_FLAG` | Saves URLs to Internet Archives. | | `MARK_TASK_NEVER_COMPLETED_TASK_FLAG` | Marks tasks that were started but never completed (usually due to a restart). | | `DELETE_STALE_SCREENSHOTS_TASK_FLAG` | Deletes stale screenshots for URLs already validated. | +| `TASK_CLEANUP_TASK_FLAG` | Deletes tasks and URL error info records that have not been updated in the past 7 days. | ### URL Task Flags diff --git a/alembic/versions/2025_10_03_1546-c5c20af87511_add_task_cleanup_task.py b/alembic/versions/2025_10_03_1546-c5c20af87511_add_task_cleanup_task.py new file mode 100644 index 00000000..39a1004f --- /dev/null +++ b/alembic/versions/2025_10_03_1546-c5c20af87511_add_task_cleanup_task.py @@ -0,0 +1,28 @@ +"""Add task cleanup task + +Revision ID: c5c20af87511 +Revises: 241fd3925f5d +Create Date: 2025-10-03 15:46:00.212674 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. 
+revision: str = 'c5c20af87511' +down_revision: Union[str, None] = '241fd3925f5d' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.execute(""" + ALTER TYPE task_type ADD VALUE 'Task Cleanup' + """) + + +def downgrade() -> None: + pass diff --git a/src/core/tasks/scheduled/impl/task_cleanup/__init__.py b/src/core/tasks/scheduled/impl/task_cleanup/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/core/tasks/scheduled/impl/task_cleanup/operator.py b/src/core/tasks/scheduled/impl/task_cleanup/operator.py new file mode 100644 index 00000000..ea4febcd --- /dev/null +++ b/src/core/tasks/scheduled/impl/task_cleanup/operator.py @@ -0,0 +1,15 @@ +from src.core.tasks.scheduled.impl.task_cleanup.query import TaskCleanupQueryBuilder +from src.core.tasks.scheduled.templates.operator import ScheduledTaskOperatorBase +from src.db.enums import TaskType + + +class TaskCleanupOperator(ScheduledTaskOperatorBase): + + @property + def task_type(self) -> TaskType: + return TaskType.TASK_CLEANUP + + async def inner_task_logic(self) -> None: + await self.adb_client.run_query_builder( + TaskCleanupQueryBuilder() + ) \ No newline at end of file diff --git a/src/core/tasks/scheduled/impl/task_cleanup/query.py b/src/core/tasks/scheduled/impl/task_cleanup/query.py new file mode 100644 index 00000000..8874a49a --- /dev/null +++ b/src/core/tasks/scheduled/impl/task_cleanup/query.py @@ -0,0 +1,33 @@ +from datetime import timedelta, datetime +from typing import Any + +from sqlalchemy import delete +from sqlalchemy.ext.asyncio import AsyncSession + +from src.db.models.impl.task.core import Task +from src.db.models.impl.task.error import TaskError +from src.db.models.impl.url.error_info.sqlalchemy import URLErrorInfo +from src.db.queries.base.builder import QueryBuilderBase + + +class TaskCleanupQueryBuilder(QueryBuilderBase): + + async def run(self, session: AsyncSession) -> Any: + 
one_week_ago: datetime = datetime.now() - timedelta(days=7) + + statement = ( + delete(URLErrorInfo) + .where( + URLErrorInfo.updated_at < one_week_ago + ) + ) + await session.execute(statement) + + statement = ( + delete(Task) + .where( + Task.updated_at < one_week_ago + ) + ) + + await session.execute(statement) \ No newline at end of file diff --git a/src/core/tasks/scheduled/loader.py b/src/core/tasks/scheduled/loader.py index cfadd82e..a753f2da 100644 --- a/src/core/tasks/scheduled/loader.py +++ b/src/core/tasks/scheduled/loader.py @@ -11,6 +11,7 @@ from src.core.tasks.scheduled.impl.mark_never_completed.operator import MarkTaskNeverCompletedOperator from src.core.tasks.scheduled.impl.mark_never_completed.query import MarkTaskNeverCompletedQueryBuilder from src.core.tasks.scheduled.impl.run_url_tasks.operator import RunURLTasksTaskOperator +from src.core.tasks.scheduled.impl.task_cleanup.operator import TaskCleanupOperator from src.core.tasks.scheduled.models.entry import ScheduledTaskEntry from src.db.client.async_ import AsyncDatabaseClient from src.external.huggingface.hub.client import HuggingFaceHubClient @@ -103,5 +104,10 @@ async def load_entries(self) -> list[ScheduledTaskEntry]: operator=DeleteStaleScreenshotsTaskOperator(adb_client=self.adb_client), interval_minutes=IntervalEnum.DAILY.value, enabled=self.setup_flag("DELETE_STALE_SCREENSHOTS_TASK_FLAG") + ), + ScheduledTaskEntry( + operator=TaskCleanupOperator(adb_client=self.adb_client), + interval_minutes=IntervalEnum.DAILY.value, + enabled=self.setup_flag("TASK_CLEANUP_TASK_FLAG") ) ] diff --git a/src/db/enums.py b/src/db/enums.py index 489709e3..dd0a7b24 100644 --- a/src/db/enums.py +++ b/src/db/enums.py @@ -63,6 +63,7 @@ class TaskType(PyEnum): DELETE_STALE_SCREENSHOTS = "Delete Stale Screenshots" MARK_TASK_NEVER_COMPLETED = "Mark Task Never Completed" RUN_URL_TASKS = "Run URL Task Cycles" + TASK_CLEANUP = "Task Cleanup" class ChangeLogOperationType(PyEnum): INSERT = "INSERT" diff --git 
a/tests/automated/integration/tasks/scheduled/loader/test_happy_path.py b/tests/automated/integration/tasks/scheduled/loader/test_happy_path.py index f2dd795c..be3dc380 100644 --- a/tests/automated/integration/tasks/scheduled/loader/test_happy_path.py +++ b/tests/automated/integration/tasks/scheduled/loader/test_happy_path.py @@ -2,7 +2,7 @@ from src.core.tasks.scheduled.loader import ScheduledTaskOperatorLoader -NUMBER_OF_ENTRIES = 8 +NUMBER_OF_ENTRIES = 9 @pytest.mark.asyncio async def test_happy_path(