diff --git a/alembic/versions/2025_11_16_1130-88ac26c3b025_add_task_log.py b/alembic/versions/2025_11_16_1130-88ac26c3b025_add_task_log.py new file mode 100644 index 00000000..ed7f9e49 --- /dev/null +++ b/alembic/versions/2025_11_16_1130-88ac26c3b025_add_task_log.py @@ -0,0 +1,37 @@ +"""Add task log + +Revision ID: 88ac26c3b025 +Revises: de0305465e2c +Create Date: 2025-11-16 11:30:25.742630 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + +from src.util.alembic_helpers import task_id_column, created_at_column + +# revision identifiers, used by Alembic. +revision: str = '88ac26c3b025' +down_revision: Union[str, None] = 'de0305465e2c' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "tasks__log", + task_id_column(), + sa.Column( + "log", + sa.Text, + nullable=False, + ), + created_at_column(), + sa.PrimaryKeyConstraint("task_id"), + ) + + +def downgrade() -> None: + pass diff --git a/src/core/tasks/base/operator.py b/src/core/tasks/base/operator.py index 719abdf5..ff5ec4e5 100644 --- a/src/core/tasks/base/operator.py +++ b/src/core/tasks/base/operator.py @@ -8,6 +8,7 @@ from src.db.client.async_ import AsyncDatabaseClient from src.db.enums import TaskType from src.db.models.impl.task.enums import TaskStatus +from src.db.models.impl.task.log import TaskLog from src.db.models.impl.url.task_error.pydantic_.insert import URLTaskErrorPydantic from src.db.models.impl.url.task_error.pydantic_.small import URLTaskErrorSmall from src.db.queries.base.builder import QueryBuilderBase @@ -94,6 +95,16 @@ async def add_task_errors( ] await self.adb_client.bulk_insert(inserts) + async def add_task_log( + self, + log: str + ) -> None: + task_log = TaskLog( + task_id=self.task_id, + log=log + ) + await self.adb_client.add(task_log) + # Convenience forwarder functions async def run_query_builder(self, query_builder: QueryBuilderBase) -> Any: return await self.adb_client.run_query_builder(query_builder) \ No newline at end of file diff --git a/src/core/tasks/scheduled/impl/sync_to_ds/impl/agencies/add/core.py b/src/core/tasks/scheduled/impl/sync_to_ds/impl/agencies/add/core.py index e46deed5..d21f1259 100644 --- a/src/core/tasks/scheduled/impl/sync_to_ds/impl/agencies/add/core.py +++ b/src/core/tasks/scheduled/impl/sync_to_ds/impl/agencies/add/core.py @@ -25,6 +25,8 @@ async def meets_task_prerequisites(self) -> bool: async def inner_task_logic(self) -> None: request: AddAgenciesOuterRequest = await self.get_request_input() + db_ids: list[int] = [r.request_id for r in request.agencies] + await self.add_task_log(f"Adding agencies with the following db_ids: {db_ids}") responses: list[DSAppSyncAddResponseInnerModel] = await self.make_request(request) await self.insert_ds_app_links(responses) diff --git a/src/core/tasks/scheduled/impl/sync_to_ds/impl/agencies/delete/core.py b/src/core/tasks/scheduled/impl/sync_to_ds/impl/agencies/delete/core.py index e84d3b2b..806ba230 100644 --- a/src/core/tasks/scheduled/impl/sync_to_ds/impl/agencies/delete/core.py +++ b/src/core/tasks/scheduled/impl/sync_to_ds/impl/agencies/delete/core.py @@ -26,10 +26,14 @@ async def meets_task_prerequisites(self) -> bool: async def inner_task_logic(self) -> None: ds_app_ids: list[int] = await self.get_inputs() + await self.log_ds_app_ids(ds_app_ids) await self.make_request(ds_app_ids) await self.delete_flags(ds_app_ids) await self.delete_links(ds_app_ids) + async def log_ds_app_ids(self, ds_app_ids: list[int]): + await self.add_task_log(f"Deleting agencies with the following ds_app_ids: {ds_app_ids}") + async def get_inputs(self) -> list[int]: return await self.adb_client.run_query_builder( DSAppSyncAgenciesDeleteGetQueryBuilder() diff --git a/src/core/tasks/scheduled/impl/sync_to_ds/impl/agencies/update/core.py b/src/core/tasks/scheduled/impl/sync_to_ds/impl/agencies/update/core.py index 24481e8d..814f9a1e 100644 --- a/src/core/tasks/scheduled/impl/sync_to_ds/impl/agencies/update/core.py +++ b/src/core/tasks/scheduled/impl/sync_to_ds/impl/agencies/update/core.py @@ -25,13 +25,17 @@ async def meets_task_prerequisites(self) -> bool: async def inner_task_logic(self) -> None: request: UpdateAgenciesOuterRequest = await self.get_inputs() - await self.make_request(request) ds_app_ids: list[int] = [ agency.app_id for agency in request.agencies ] + await self.log_ds_app_ids(ds_app_ids) + await self.make_request(request) await self.update_links(ds_app_ids) + async def log_ds_app_ids(self, ds_app_ids: list[int]): + await self.add_task_log(f"Updating agencies with the following ds_app_ids: {ds_app_ids}") + async def get_inputs(self) -> UpdateAgenciesOuterRequest: return await self.adb_client.run_query_builder( DSAppSyncAgenciesUpdateGetQueryBuilder() diff --git a/src/core/tasks/scheduled/impl/sync_to_ds/impl/data_sources/add/core.py b/src/core/tasks/scheduled/impl/sync_to_ds/impl/data_sources/add/core.py index 760583fd..6acd74fd 100644 --- a/src/core/tasks/scheduled/impl/sync_to_ds/impl/data_sources/add/core.py +++ b/src/core/tasks/scheduled/impl/sync_to_ds/impl/data_sources/add/core.py @@ -7,7 +7,7 @@ from src.core.tasks.scheduled.impl.sync_to_ds.templates.operator import DSSyncTaskOperatorBase from src.db.enums import TaskType from src.external.pdap.impl.sync.data_sources.add.core import AddDataSourcesRequestBuilder -from src.external.pdap.impl.sync.data_sources.add.request import AddDataSourcesOuterRequest +from src.external.pdap.impl.sync.data_sources.add.request import AddDataSourcesOuterRequest, AddDataSourcesInnerRequest from src.external.pdap.impl.sync.shared.models.add.response import DSAppSyncAddResponseInnerModel @@ -27,9 +27,13 @@ async def meets_task_prerequisites(self) -> bool: async def inner_task_logic(self) -> None: request: AddDataSourcesOuterRequest = await self.get_request_input() + await self.log_db_ids(request.data_sources) responses: list[DSAppSyncAddResponseInnerModel] = await self.make_request(request) await self.insert_ds_app_links(responses) + async def log_db_ids(self, data_sources: list[AddDataSourcesInnerRequest]): + db_ids: list[int] = [d.request_id for d in data_sources] + await self.add_task_log(f"Adding data sources with the following db_ids: {db_ids}") async def get_request_input(self) -> AddDataSourcesOuterRequest: return await self.run_query_builder( diff --git a/src/core/tasks/scheduled/impl/sync_to_ds/impl/data_sources/delete/core.py b/src/core/tasks/scheduled/impl/sync_to_ds/impl/data_sources/delete/core.py index 14450a51..0c5bd53e 100644 --- a/src/core/tasks/scheduled/impl/sync_to_ds/impl/data_sources/delete/core.py +++ b/src/core/tasks/scheduled/impl/sync_to_ds/impl/data_sources/delete/core.py @@ -26,10 +26,14 @@ async def meets_task_prerequisites(self) -> bool: async def inner_task_logic(self) -> None: ds_app_ids: list[int] = await self.get_inputs() + await self.log_ds_app_ids(ds_app_ids) await self.make_request(ds_app_ids) await self.delete_flags(ds_app_ids) await self.delete_links(ds_app_ids) + async def log_ds_app_ids(self, ds_app_ids: list[int]): + await self.add_task_log(f"Deleting data sources with the following ds_app_ids: {ds_app_ids}") + async def get_inputs(self) -> list[int]: return await self.run_query_builder( DSAppSyncDataSourcesDeleteGetQueryBuilder() diff --git a/src/core/tasks/scheduled/impl/sync_to_ds/impl/data_sources/update/core.py b/src/core/tasks/scheduled/impl/sync_to_ds/impl/data_sources/update/core.py index fd925146..0a0c4d21 100644 --- a/src/core/tasks/scheduled/impl/sync_to_ds/impl/data_sources/update/core.py +++ b/src/core/tasks/scheduled/impl/sync_to_ds/impl/data_sources/update/core.py @@ -25,13 +25,17 @@ async def meets_task_prerequisites(self) -> bool: async def inner_task_logic(self) -> None: request: UpdateDataSourcesOuterRequest = await self.get_inputs() - await self.make_request(request) ds_app_ids: list[int] = [ ds.app_id for ds in request.data_sources ] + await self.log_ds_app_ids(ds_app_ids) + await self.make_request(request) await self.update_links(ds_app_ids) + async def log_ds_app_ids(self, ds_app_ids: list[int]): + await self.add_task_log(f"Updating data sources with the following ds_app_ids: {ds_app_ids}") + async def get_inputs(self) -> UpdateDataSourcesOuterRequest: return await self.adb_client.run_query_builder( DSAppSyncDataSourcesUpdateGetQueryBuilder() diff --git a/src/core/tasks/scheduled/impl/sync_to_ds/impl/meta_urls/add/core.py b/src/core/tasks/scheduled/impl/sync_to_ds/impl/meta_urls/add/core.py index 6823c205..08ee031d 100644 --- a/src/core/tasks/scheduled/impl/sync_to_ds/impl/meta_urls/add/core.py +++ b/src/core/tasks/scheduled/impl/sync_to_ds/impl/meta_urls/add/core.py @@ -6,7 +6,7 @@ from src.core.tasks.scheduled.impl.sync_to_ds.templates.operator import DSSyncTaskOperatorBase from src.db.enums import TaskType from src.external.pdap.impl.sync.meta_urls.add.core import AddMetaURLsRequestBuilder -from src.external.pdap.impl.sync.meta_urls.add.request import AddMetaURLsOuterRequest +from src.external.pdap.impl.sync.meta_urls.add.request import AddMetaURLsOuterRequest, AddMetaURLsInnerRequest from src.external.pdap.impl.sync.shared.models.add.response import DSAppSyncAddResponseInnerModel @@ -25,9 +25,14 @@ async def meets_task_prerequisites(self) -> bool: async def inner_task_logic(self) -> None: request: AddMetaURLsOuterRequest = await self.get_request_input() + await self.log_db_ids(request.meta_urls) responses: list[DSAppSyncAddResponseInnerModel] = await self.make_request(request) await self.insert_ds_app_links(responses) + async def log_db_ids(self, meta_urls: list[AddMetaURLsInnerRequest]): + db_ids: list[int] = [m.request_id for m in meta_urls] + await self.add_task_log(f"Adding meta urls with the following db_ids: {db_ids}") + async def get_request_input(self) -> AddMetaURLsOuterRequest: return await self.run_query_builder( DSAppSyncMetaURLsAddGetQueryBuilder() diff --git a/src/core/tasks/scheduled/impl/sync_to_ds/impl/meta_urls/delete/core.py b/src/core/tasks/scheduled/impl/sync_to_ds/impl/meta_urls/delete/core.py index 32f5ef85..76fc9c4b 100644 --- a/src/core/tasks/scheduled/impl/sync_to_ds/impl/meta_urls/delete/core.py +++ b/src/core/tasks/scheduled/impl/sync_to_ds/impl/meta_urls/delete/core.py @@ -26,10 +26,14 @@ async def meets_task_prerequisites(self) -> bool: async def inner_task_logic(self) -> None: ds_app_ids: list[int] = await self.get_inputs() + await self.log_ds_app_ids(ds_app_ids) await self.make_request(ds_app_ids) await self.delete_flags(ds_app_ids) await self.delete_links(ds_app_ids) + async def log_ds_app_ids(self, ds_app_ids: list[int]): + await self.add_task_log(f"Deleting meta urls with the following ds_app_ids: {ds_app_ids}") + async def get_inputs(self) -> list[int]: return await self.run_query_builder( DSAppSyncMetaURLsDeleteGetQueryBuilder() diff --git a/src/core/tasks/scheduled/impl/sync_to_ds/impl/meta_urls/update/core.py b/src/core/tasks/scheduled/impl/sync_to_ds/impl/meta_urls/update/core.py index 3ef8dc28..ff0b06ec 100644 --- a/src/core/tasks/scheduled/impl/sync_to_ds/impl/meta_urls/update/core.py +++ b/src/core/tasks/scheduled/impl/sync_to_ds/impl/meta_urls/update/core.py @@ -25,13 +25,17 @@ async def meets_task_prerequisites(self) -> bool: async def inner_task_logic(self) -> None: request: UpdateMetaURLsOuterRequest = await self.get_inputs() - await self.make_request(request) ds_app_ids: list[int] = [ meta_url.app_id for meta_url in request.meta_urls ] + await self.log_ds_app_ids(ds_app_ids) + await self.make_request(request) await self.update_links(ds_app_ids) + async def log_ds_app_ids(self, ds_app_ids: list[int]): + await self.add_task_log(f"Updating meta urls with the following ds_app_ids: {ds_app_ids}") + async def get_inputs(self) -> UpdateMetaURLsOuterRequest: return await self.adb_client.run_query_builder( DSAppSyncMetaURLsUpdateGetQueryBuilder() diff --git a/src/db/models/impl/task/log.py b/src/db/models/impl/task/log.py new file mode 100644 index 00000000..9efd86da --- /dev/null +++ b/src/db/models/impl/task/log.py @@ -0,0 +1,17 @@ +from sqlalchemy import Column, Text, PrimaryKeyConstraint + +from src.db.models.mixins import TaskDependentMixin, UpdatedAtMixin, CreatedAtMixin +from src.db.models.templates_.base import Base + + +class TaskLog( + Base, + TaskDependentMixin, + CreatedAtMixin, +): + __tablename__ = "tasks__log" + __table_args__ = ( + PrimaryKeyConstraint("task_id"), + ) + + log = Column(Text, nullable=False)