Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions alembic/versions/2025_11_16_1130-88ac26c3b025_add_task_log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Add task log

Revision ID: 88ac26c3b025
Revises: de0305465e2c
Create Date: 2025-11-16 11:30:25.742630

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa

from src.util.alembic_helpers import task_id_column, created_at_column

# revision identifiers, used by Alembic.
revision: str = '88ac26c3b025'
down_revision: Union[str, None] = 'de0305465e2c'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:

Check warning on line 22 in alembic/versions/2025_11_16_1130-88ac26c3b025_add_task_log.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] alembic/versions/2025_11_16_1130-88ac26c3b025_add_task_log.py#L22 <103>

Missing docstring in public function
Raw output
./alembic/versions/2025_11_16_1130-88ac26c3b025_add_task_log.py:22:1: D103 Missing docstring in public function
op.create_table(
"tasks__log",
task_id_column(),
sa.Column(
"log",
sa.Text,
nullable=False,
),
created_at_column(),
sa.PrimaryKeyConstraint("task_id"),
)


def downgrade() -> None:

Check warning on line 36 in alembic/versions/2025_11_16_1130-88ac26c3b025_add_task_log.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] alembic/versions/2025_11_16_1130-88ac26c3b025_add_task_log.py#L36 <103>

Missing docstring in public function
Raw output
./alembic/versions/2025_11_16_1130-88ac26c3b025_add_task_log.py:36:1: D103 Missing docstring in public function
pass
11 changes: 11 additions & 0 deletions src/core/tasks/base/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from src.db.client.async_ import AsyncDatabaseClient
from src.db.enums import TaskType
from src.db.models.impl.task.enums import TaskStatus
from src.db.models.impl.task.log import TaskLog
from src.db.models.impl.url.task_error.pydantic_.insert import URLTaskErrorPydantic
from src.db.models.impl.url.task_error.pydantic_.small import URLTaskErrorSmall
from src.db.queries.base.builder import QueryBuilderBase
Expand Down Expand Up @@ -94,6 +95,16 @@
]
await self.adb_client.bulk_insert(inserts)

async def add_task_log(

Check warning on line 98 in src/core/tasks/base/operator.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/core/tasks/base/operator.py#L98 <102>

Missing docstring in public method
Raw output
./src/core/tasks/base/operator.py:98:1: D102 Missing docstring in public method
self,
log: str
) -> None:
task_log = TaskLog(
task_id=self.task_id,
log=log
)
await self.adb_client.add(task_log)

# Convenience forwarder functions
async def run_query_builder(self, query_builder: QueryBuilderBase) -> Any:
return await self.adb_client.run_query_builder(query_builder)
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ async def meets_task_prerequisites(self) -> bool:

async def inner_task_logic(self) -> None:
request: AddAgenciesOuterRequest = await self.get_request_input()
db_ids: list[int] = [r.request_id for r in request.agencies]
await self.add_task_log(f"Adding agencies with the following db_ids: {db_ids}")
responses: list[DSAppSyncAddResponseInnerModel] = await self.make_request(request)
await self.insert_ds_app_links(responses)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@

async def inner_task_logic(self) -> None:
ds_app_ids: list[int] = await self.get_inputs()
await self.log_ds_app_ids(ds_app_ids)
await self.make_request(ds_app_ids)
await self.delete_flags(ds_app_ids)
await self.delete_links(ds_app_ids)

async def log_ds_app_ids(self, ds_app_ids: list[int]):

Check warning on line 34 in src/core/tasks/scheduled/impl/sync_to_ds/impl/agencies/delete/core.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/core/tasks/scheduled/impl/sync_to_ds/impl/agencies/delete/core.py#L34 <102>

Missing docstring in public method
Raw output
./src/core/tasks/scheduled/impl/sync_to_ds/impl/agencies/delete/core.py:34:1: D102 Missing docstring in public method
await self.add_task_log(f"Deleting agencies with the following ds_app_ids: {ds_app_ids}")

async def get_inputs(self) -> list[int]:
return await self.adb_client.run_query_builder(
DSAppSyncAgenciesDeleteGetQueryBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@

async def inner_task_logic(self) -> None:
request: UpdateAgenciesOuterRequest = await self.get_inputs()
await self.make_request(request)
ds_app_ids: list[int] = [
agency.app_id
for agency in request.agencies
]
await self.log_ds_app_ids(ds_app_ids)
await self.make_request(request)
await self.update_links(ds_app_ids)

async def log_ds_app_ids(self, ds_app_ids: list[int]):

Check warning on line 36 in src/core/tasks/scheduled/impl/sync_to_ds/impl/agencies/update/core.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/core/tasks/scheduled/impl/sync_to_ds/impl/agencies/update/core.py#L36 <102>

Missing docstring in public method
Raw output
./src/core/tasks/scheduled/impl/sync_to_ds/impl/agencies/update/core.py:36:1: D102 Missing docstring in public method
await self.add_task_log(f"Updating agencies with the following ds_app_ids: {ds_app_ids}")

async def get_inputs(self) -> UpdateAgenciesOuterRequest:
return await self.adb_client.run_query_builder(
DSAppSyncAgenciesUpdateGetQueryBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from src.core.tasks.scheduled.impl.sync_to_ds.templates.operator import DSSyncTaskOperatorBase
from src.db.enums import TaskType
from src.external.pdap.impl.sync.data_sources.add.core import AddDataSourcesRequestBuilder
from src.external.pdap.impl.sync.data_sources.add.request import AddDataSourcesOuterRequest
from src.external.pdap.impl.sync.data_sources.add.request import AddDataSourcesOuterRequest, AddDataSourcesInnerRequest
from src.external.pdap.impl.sync.shared.models.add.response import DSAppSyncAddResponseInnerModel


Expand All @@ -27,9 +27,13 @@

async def inner_task_logic(self) -> None:
request: AddDataSourcesOuterRequest = await self.get_request_input()
await self.log_db_ids(request.data_sources)
responses: list[DSAppSyncAddResponseInnerModel] = await self.make_request(request)
await self.insert_ds_app_links(responses)

async def log_db_ids(self, data_sources: list[AddDataSourcesInnerRequest]):

Check warning on line 34 in src/core/tasks/scheduled/impl/sync_to_ds/impl/data_sources/add/core.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/core/tasks/scheduled/impl/sync_to_ds/impl/data_sources/add/core.py#L34 <102>

Missing docstring in public method
Raw output
./src/core/tasks/scheduled/impl/sync_to_ds/impl/data_sources/add/core.py:34:1: D102 Missing docstring in public method
db_ids: list[int] = [d.request_id for d in data_sources]
await self.add_task_log(f"Adding data sources with the following db_ids: {db_ids}")

async def get_request_input(self) -> AddDataSourcesOuterRequest:
return await self.run_query_builder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@

async def inner_task_logic(self) -> None:
ds_app_ids: list[int] = await self.get_inputs()
await self.log_ds_app_ids(ds_app_ids)
await self.make_request(ds_app_ids)
await self.delete_flags(ds_app_ids)
await self.delete_links(ds_app_ids)

async def log_ds_app_ids(self, ds_app_ids: list[int]):

Check warning on line 34 in src/core/tasks/scheduled/impl/sync_to_ds/impl/data_sources/delete/core.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/core/tasks/scheduled/impl/sync_to_ds/impl/data_sources/delete/core.py#L34 <102>

Missing docstring in public method
Raw output
./src/core/tasks/scheduled/impl/sync_to_ds/impl/data_sources/delete/core.py:34:1: D102 Missing docstring in public method
await self.add_task_log(f"Deleting data sources with the following ds_app_ids: {ds_app_ids}")

async def get_inputs(self) -> list[int]:
return await self.run_query_builder(
DSAppSyncDataSourcesDeleteGetQueryBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@

async def inner_task_logic(self) -> None:
request: UpdateDataSourcesOuterRequest = await self.get_inputs()
await self.make_request(request)
ds_app_ids: list[int] = [
ds.app_id
for ds in request.data_sources
]
await self.log_ds_app_ids(ds_app_ids)
await self.make_request(request)
await self.update_links(ds_app_ids)

async def log_ds_app_ids(self, ds_app_ids: list[int]):

Check warning on line 36 in src/core/tasks/scheduled/impl/sync_to_ds/impl/data_sources/update/core.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/core/tasks/scheduled/impl/sync_to_ds/impl/data_sources/update/core.py#L36 <102>

Missing docstring in public method
Raw output
./src/core/tasks/scheduled/impl/sync_to_ds/impl/data_sources/update/core.py:36:1: D102 Missing docstring in public method
await self.add_task_log(f"Updating data sources with the following ds_app_ids: {ds_app_ids}")

async def get_inputs(self) -> UpdateDataSourcesOuterRequest:
return await self.adb_client.run_query_builder(
DSAppSyncDataSourcesUpdateGetQueryBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from src.core.tasks.scheduled.impl.sync_to_ds.templates.operator import DSSyncTaskOperatorBase
from src.db.enums import TaskType
from src.external.pdap.impl.sync.meta_urls.add.core import AddMetaURLsRequestBuilder
from src.external.pdap.impl.sync.meta_urls.add.request import AddMetaURLsOuterRequest
from src.external.pdap.impl.sync.meta_urls.add.request import AddMetaURLsOuterRequest, AddMetaURLsInnerRequest
from src.external.pdap.impl.sync.shared.models.add.response import DSAppSyncAddResponseInnerModel


Expand All @@ -25,9 +25,14 @@

async def inner_task_logic(self) -> None:
request: AddMetaURLsOuterRequest = await self.get_request_input()
await self.log_db_ids(request.meta_urls)
responses: list[DSAppSyncAddResponseInnerModel] = await self.make_request(request)
await self.insert_ds_app_links(responses)

async def log_db_ids(self, meta_urls: list[AddMetaURLsInnerRequest]):

Check warning on line 32 in src/core/tasks/scheduled/impl/sync_to_ds/impl/meta_urls/add/core.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/core/tasks/scheduled/impl/sync_to_ds/impl/meta_urls/add/core.py#L32 <102>

Missing docstring in public method
Raw output
./src/core/tasks/scheduled/impl/sync_to_ds/impl/meta_urls/add/core.py:32:1: D102 Missing docstring in public method
db_ids: list[int] = [m.request_id for m in meta_urls]
await self.add_task_log(f"Adding meta urls with the following db_ids: {db_ids}")

async def get_request_input(self) -> AddMetaURLsOuterRequest:
return await self.run_query_builder(
DSAppSyncMetaURLsAddGetQueryBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@

async def inner_task_logic(self) -> None:
ds_app_ids: list[int] = await self.get_inputs()
await self.log_ds_app_ids(ds_app_ids)
await self.make_request(ds_app_ids)
await self.delete_flags(ds_app_ids)
await self.delete_links(ds_app_ids)

async def log_ds_app_ids(self, ds_app_ids: list[int]):

Check warning on line 34 in src/core/tasks/scheduled/impl/sync_to_ds/impl/meta_urls/delete/core.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/core/tasks/scheduled/impl/sync_to_ds/impl/meta_urls/delete/core.py#L34 <102>

Missing docstring in public method
Raw output
./src/core/tasks/scheduled/impl/sync_to_ds/impl/meta_urls/delete/core.py:34:1: D102 Missing docstring in public method
await self.add_task_log(f"Deleting meta urls with the following ds_app_ids: {ds_app_ids}")

async def get_inputs(self) -> list[int]:
return await self.run_query_builder(
DSAppSyncMetaURLsDeleteGetQueryBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@

async def inner_task_logic(self) -> None:
request: UpdateMetaURLsOuterRequest = await self.get_inputs()
await self.make_request(request)
ds_app_ids: list[int] = [
meta_url.app_id
for meta_url in request.meta_urls
]
await self.log_ds_app_ids(ds_app_ids)
await self.make_request(request)
await self.update_links(ds_app_ids)

async def log_ds_app_ids(self, ds_app_ids: list[int]):

Check warning on line 36 in src/core/tasks/scheduled/impl/sync_to_ds/impl/meta_urls/update/core.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/core/tasks/scheduled/impl/sync_to_ds/impl/meta_urls/update/core.py#L36 <102>

Missing docstring in public method
Raw output
./src/core/tasks/scheduled/impl/sync_to_ds/impl/meta_urls/update/core.py:36:1: D102 Missing docstring in public method
await self.add_task_log(f"Updating meta urls with the following ds_app_ids: {ds_app_ids}")

async def get_inputs(self) -> UpdateMetaURLsOuterRequest:
return await self.adb_client.run_query_builder(
DSAppSyncMetaURLsUpdateGetQueryBuilder()
Expand Down
17 changes: 17 additions & 0 deletions src/db/models/impl/task/log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from sqlalchemy import Column, Text, PrimaryKeyConstraint

Check warning on line 1 in src/db/models/impl/task/log.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/db/models/impl/task/log.py#L1 <100>

Missing docstring in public module
Raw output
./src/db/models/impl/task/log.py:1:1: D100 Missing docstring in public module

from src.db.models.mixins import TaskDependentMixin, UpdatedAtMixin, CreatedAtMixin

Check warning on line 3 in src/db/models/impl/task/log.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/db/models/impl/task/log.py#L3 <401>

'src.db.models.mixins.UpdatedAtMixin' imported but unused
Raw output
./src/db/models/impl/task/log.py:3:1: F401 'src.db.models.mixins.UpdatedAtMixin' imported but unused
from src.db.models.templates_.base import Base


class TaskLog(

Check warning on line 7 in src/db/models/impl/task/log.py

View workflow job for this annotation

GitHub Actions / flake8

[flake8] src/db/models/impl/task/log.py#L7 <101>

Missing docstring in public class
Raw output
./src/db/models/impl/task/log.py:7:1: D101 Missing docstring in public class
Base,
TaskDependentMixin,
CreatedAtMixin,
):
__tablename__ = "tasks__log"
__table_args__ = (
PrimaryKeyConstraint("task_id"),
)

log = Column(Text, nullable=False)