From 48f33e62a68daa3d66c44c5bf58848b7e0d66854 Mon Sep 17 00:00:00 2001 From: maxachis Date: Fri, 18 Apr 2025 18:15:41 -0400 Subject: [PATCH] feat(app): make delete logs job an asynchronous scheduled task Additionally, because now the Scheduled Task Manager is no longer in use, it has been removed and references to it removed. --- api/main.py | 1 - collector_db/AsyncDatabaseClient.py | 15 ++++++- collector_db/DatabaseClient.py | 9 ----- core/ScheduledTaskManager.py | 40 ++++--------------- core/SourceCollectorCore.py | 21 ---------- .../collector_db/test_db_client.py | 2 +- tests/test_automated/integration/conftest.py | 2 - 7 files changed, 23 insertions(+), 67 deletions(-) diff --git a/api/main.py b/api/main.py index 6c5e2018..ae74c914 100644 --- a/api/main.py +++ b/api/main.py @@ -96,7 +96,6 @@ async def lifespan(app: FastAPI): # Clean up resources, close connections, etc. await core_logger.shutdown() await async_core.shutdown() - source_collector_core.shutdown() await session.close() pass diff --git a/collector_db/AsyncDatabaseClient.py b/collector_db/AsyncDatabaseClient.py index 9e1ab473..eb68735c 100644 --- a/collector_db/AsyncDatabaseClient.py +++ b/collector_db/AsyncDatabaseClient.py @@ -1,8 +1,9 @@ +from datetime import datetime, timedelta from functools import wraps from typing import Optional, Type, Any, List from fastapi import HTTPException -from sqlalchemy import select, exists, func, case, desc, Select, not_, and_, or_, update, Delete, Insert, asc +from sqlalchemy import select, exists, func, case, desc, Select, not_, and_, or_, update, Delete, Insert, asc, delete from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker from sqlalchemy.orm import selectinload, joinedload, QueryableAttribute, aliased @@ -1615,3 +1616,15 @@ async def get_logs_by_batch_id(self, session, batch_id: int) -> List[LogOutputIn logs = raw_results.scalars().all() return ([LogOutputInfo(**log.__dict__) for log in logs]) + @session_manager + async def delete_old_logs(self, session): + """ + Delete logs older than a day + """ + statement = delete(Log).where( + Log.created_at < datetime.now() - timedelta(days=1) + ) + await session.execute(statement) + + + diff --git a/collector_db/DatabaseClient.py b/collector_db/DatabaseClient.py index b8547f1d..3999dbc9 100644 --- a/collector_db/DatabaseClient.py +++ b/collector_db/DatabaseClient.py @@ -158,15 +158,6 @@ def get_batch_status(self, session, batch_id: int) -> BatchStatus: batch = session.query(Batch).filter_by(id=batch_id).first() return BatchStatus(batch.status) - @session_manager - def delete_old_logs(self, session): - """ - Delete logs older than a day - """ - session.query(Log).filter( - Log.created_at < datetime.now() - timedelta(days=1) - ).delete() - @session_manager def update_url(self, session, url_info: URLInfo): url = session.query(URL).filter_by(id=url_info.id).first() diff --git a/core/ScheduledTaskManager.py b/core/ScheduledTaskManager.py index 5b2ff0a7..0a407d9e 100644 --- a/core/ScheduledTaskManager.py +++ b/core/ScheduledTaskManager.py @@ -1,41 +1,9 @@ from datetime import datetime, timedelta from apscheduler.schedulers.asyncio import AsyncIOScheduler -from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.interval import IntervalTrigger - -from collector_db.DatabaseClient import DatabaseClient from core.AsyncCore import AsyncCore - -class ScheduledTaskManager: - - def __init__(self, db_client: DatabaseClient): - # Dependencies - self.db_client = db_client - - # Main objects - self.scheduler = BackgroundScheduler() - self.scheduler.start() - self.add_scheduled_tasks() - - # Jobs - self.delete_old_logs_job = None - - - def add_scheduled_tasks(self): - self.delete_old_logs_job = self.scheduler.add_job( - self.db_client.delete_old_logs, - trigger=IntervalTrigger( - days=1, - start_date=datetime.now() + timedelta(minutes=10) - ) - ) - - def shutdown(self): - if self.scheduler.running: - self.scheduler.shutdown() - class AsyncScheduledTaskManager: def __init__(self, async_core: AsyncCore): @@ -49,6 +17,7 @@ def __init__(self, async_core: AsyncCore): # Jobs self.run_cycles_job = None + self.delete_logs_job = None def add_scheduled_tasks(self): self.run_cycles_job = self.scheduler.add_job( @@ -59,6 +28,13 @@ def add_scheduled_tasks(self): ), misfire_grace_time=60 ) + self.delete_logs_job = self.scheduler.add_job( + self.async_core.adb_client.delete_old_logs, + trigger=IntervalTrigger( + days=1, + start_date=datetime.now() + timedelta(minutes=10) + ) + ) def shutdown(self): if self.scheduler.running: diff --git a/core/SourceCollectorCore.py b/core/SourceCollectorCore.py index 6f05a3c4..a4699bf6 100644 --- a/core/SourceCollectorCore.py +++ b/core/SourceCollectorCore.py @@ -2,7 +2,6 @@ from collector_db.DatabaseClient import DatabaseClient -from core.ScheduledTaskManager import ScheduledTaskManager from core.enums import BatchStatus @@ -10,30 +9,10 @@ class SourceCollectorCore: def __init__( self, db_client: Optional[DatabaseClient] = None, - dev_mode: bool = False ): if db_client is None: db_client = DatabaseClient() self.db_client = db_client - if not dev_mode: - self.scheduled_task_manager = ScheduledTaskManager(db_client=db_client) - else: - self.scheduled_task_manager = None - def get_status(self, batch_id: int) -> BatchStatus: return self.db_client.get_batch_status(batch_id) - - - def shutdown(self): - if self.scheduled_task_manager is not None: - self.scheduled_task_manager.shutdown() - - - - - -""" -TODO: Add logic for batch processing - -""" \ No newline at end of file diff --git a/tests/test_automated/integration/collector_db/test_db_client.py b/tests/test_automated/integration/collector_db/test_db_client.py index 5560577e..93edb3ed 100644 --- a/tests/test_automated/integration/collector_db/test_db_client.py +++ b/tests/test_automated/integration/collector_db/test_db_client.py @@ -94,7 +94,7 @@ async def test_delete_old_logs(db_data_creator: DBDataCreator): db_client.insert_logs(log_infos=log_infos) logs = await adb_client.get_logs_by_batch_id(batch_id=batch_id) assert len(logs) == 3 - db_client.delete_old_logs() + await adb_client.delete_old_logs() logs = await adb_client.get_logs_by_batch_id(batch_id=batch_id) assert len(logs) == 0 diff --git a/tests/test_automated/integration/conftest.py b/tests/test_automated/integration/conftest.py index a0180800..70c79c22 100644 --- a/tests/test_automated/integration/conftest.py +++ b/tests/test_automated/integration/conftest.py @@ -13,10 +13,8 @@ def test_core(db_client_test): core = SourceCollectorCore( db_client=db_client_test, - dev_mode=True ) yield core - core.shutdown() @pytest.fixture