Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ async def lifespan(app: FastAPI):
# Clean up resources, close connections, etc.
await core_logger.shutdown()
await async_core.shutdown()
source_collector_core.shutdown()
await session.close()
pass

Expand Down
15 changes: 14 additions & 1 deletion collector_db/AsyncDatabaseClient.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from datetime import datetime, timedelta

Check warning on line 1 in collector_db/AsyncDatabaseClient.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_db/AsyncDatabaseClient.py#L1 <100>

Missing docstring in public module
Raw output
./collector_db/AsyncDatabaseClient.py:1:1: D100 Missing docstring in public module
from functools import wraps
from typing import Optional, Type, Any, List

from fastapi import HTTPException
from sqlalchemy import select, exists, func, case, desc, Select, not_, and_, or_, update, Delete, Insert, asc
from sqlalchemy import select, exists, func, case, desc, Select, not_, and_, or_, update, Delete, Insert, asc, delete

Check warning on line 6 in collector_db/AsyncDatabaseClient.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_db/AsyncDatabaseClient.py#L6 <401>

'sqlalchemy.or_' imported but unused
Raw output
./collector_db/AsyncDatabaseClient.py:6:1: F401 'sqlalchemy.or_' imported but unused

Check warning on line 6 in collector_db/AsyncDatabaseClient.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_db/AsyncDatabaseClient.py#L6 <401>

'sqlalchemy.Delete' imported but unused
Raw output
./collector_db/AsyncDatabaseClient.py:6:1: F401 'sqlalchemy.Delete' imported but unused

Check warning on line 6 in collector_db/AsyncDatabaseClient.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_db/AsyncDatabaseClient.py#L6 <401>

'sqlalchemy.Insert' imported but unused
Raw output
./collector_db/AsyncDatabaseClient.py:6:1: F401 'sqlalchemy.Insert' imported but unused
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import selectinload, joinedload, QueryableAttribute, aliased
Expand Down Expand Up @@ -1615,3 +1616,15 @@
logs = raw_results.scalars().all()
return ([LogOutputInfo(**log.__dict__) for log in logs])

@session_manager
async def delete_old_logs(self, session):
"""
Delete logs older than a day
"""
statement = delete(Log).where(
Log.created_at < datetime.now() - timedelta(days=1)
)
await session.execute(statement)



Check warning on line 1630 in collector_db/AsyncDatabaseClient.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] collector_db/AsyncDatabaseClient.py#L1630 <391>

blank line at end of file
Raw output
./collector_db/AsyncDatabaseClient.py:1630:1: W391 blank line at end of file
9 changes: 0 additions & 9 deletions collector_db/DatabaseClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,6 @@ def get_batch_status(self, session, batch_id: int) -> BatchStatus:
batch = session.query(Batch).filter_by(id=batch_id).first()
return BatchStatus(batch.status)

@session_manager
def delete_old_logs(self, session):
"""
Delete logs older than a day
"""
session.query(Log).filter(
Log.created_at < datetime.now() - timedelta(days=1)
).delete()

@session_manager
def update_url(self, session, url_info: URLInfo):
url = session.query(URL).filter_by(id=url_info.id).first()
Expand Down
40 changes: 8 additions & 32 deletions core/ScheduledTaskManager.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,9 @@
from datetime import datetime, timedelta

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger

from collector_db.DatabaseClient import DatabaseClient
from core.AsyncCore import AsyncCore


class ScheduledTaskManager:
    """
    Owns a BackgroundScheduler that runs periodic maintenance work
    (currently: deleting old logs) through the provided DatabaseClient.
    """

    def __init__(self, db_client: DatabaseClient):
        # Dependencies
        self.db_client = db_client

        # Jobs — must be initialized BEFORE add_scheduled_tasks() runs.
        # The original code assigned delete_old_logs_job = None *after*
        # calling add_scheduled_tasks(), which overwrote the Job handle
        # the scheduler had just returned, leaving it always None.
        self.delete_old_logs_job = None

        # Main objects
        self.scheduler = BackgroundScheduler()
        self.scheduler.start()
        self.add_scheduled_tasks()

    def add_scheduled_tasks(self):
        """Register the daily delete-old-logs job (first run ~10 minutes out)."""
        self.delete_old_logs_job = self.scheduler.add_job(
            self.db_client.delete_old_logs,
            trigger=IntervalTrigger(
                days=1,
                start_date=datetime.now() + timedelta(minutes=10)
            )
        )

    def shutdown(self):
        """Stop the scheduler if it is still running; safe to call twice."""
        if self.scheduler.running:
            self.scheduler.shutdown()

class AsyncScheduledTaskManager:

def __init__(self, async_core: AsyncCore):
Expand All @@ -49,6 +17,7 @@ def __init__(self, async_core: AsyncCore):

# Jobs
self.run_cycles_job = None
self.delete_logs_job = None

def add_scheduled_tasks(self):
self.run_cycles_job = self.scheduler.add_job(
Expand All @@ -59,6 +28,13 @@ def add_scheduled_tasks(self):
),
misfire_grace_time=60
)
self.delete_logs_job = self.scheduler.add_job(
self.async_core.adb_client.delete_old_logs,
trigger=IntervalTrigger(
days=1,
start_date=datetime.now() + timedelta(minutes=10)
)
)

def shutdown(self):
if self.scheduler.running:
Expand Down
21 changes: 0 additions & 21 deletions core/SourceCollectorCore.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,17 @@


from collector_db.DatabaseClient import DatabaseClient
from core.ScheduledTaskManager import ScheduledTaskManager
from core.enums import BatchStatus


class SourceCollectorCore:
    """
    Core facade tying the database client to scheduled maintenance tasks.
    """

    def __init__(
            self,
            db_client: Optional[DatabaseClient] = None,
            dev_mode: bool = False
    ):
        """
        :param db_client: database client to use; a fresh DatabaseClient
            is constructed when None.
        :param dev_mode: when True, skip starting the background
            scheduler (used by tests to avoid spawning scheduler threads).
        """
        if db_client is None:
            db_client = DatabaseClient()
        self.db_client = db_client
        if not dev_mode:
            self.scheduled_task_manager = ScheduledTaskManager(db_client=db_client)
        else:
            # No scheduler in dev mode; shutdown() handles the None case.
            self.scheduled_task_manager = None


    def get_status(self, batch_id: int) -> BatchStatus:
        """Return the current status of the batch with the given id."""
        return self.db_client.get_batch_status(batch_id)


    def shutdown(self):
        """Stop the scheduled task manager, if one was created."""
        if self.scheduled_task_manager is not None:
            self.scheduled_task_manager.shutdown()





"""
TODO: Add logic for batch processing

"""
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ async def test_delete_old_logs(db_data_creator: DBDataCreator):
db_client.insert_logs(log_infos=log_infos)
logs = await adb_client.get_logs_by_batch_id(batch_id=batch_id)
assert len(logs) == 3
db_client.delete_old_logs()
await adb_client.delete_old_logs()

logs = await adb_client.get_logs_by_batch_id(batch_id=batch_id)
assert len(logs) == 0
Expand Down
2 changes: 0 additions & 2 deletions tests/test_automated/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@
def test_core(db_client_test):
    # Build the core in dev mode so no background scheduler (and its
    # threads) is started during tests; tear down after the test yields.
    core = SourceCollectorCore(
        db_client=db_client_test,
        dev_mode=True
    )
    yield core
    core.shutdown()


@pytest.fixture
Expand Down