From fc3c45e5cd781bb8ef7b06135c4a6aa66c500f7f Mon Sep 17 00:00:00 2001 From: Anfimov Dima Date: Fri, 10 Oct 2025 10:42:25 +0200 Subject: [PATCH] feat: add schedule source for psqlpy and aiopg --- pyproject.toml | 3 + src/taskiq_pg/_internal/__init__.py | 10 ++ src/taskiq_pg/_internal/schedule_source.py | 49 ++++++ src/taskiq_pg/aiopg/__init__.py | 2 + src/taskiq_pg/aiopg/broker.py | 0 src/taskiq_pg/aiopg/queries.py | 28 ++++ src/taskiq_pg/aiopg/schedule_source.py | 124 ++++++++++++++ src/taskiq_pg/asyncpg/schedule_source.py | 51 +----- src/taskiq_pg/psqlpy/__init__.py | 2 + src/taskiq_pg/psqlpy/queries.py | 28 ++++ src/taskiq_pg/psqlpy/schedule_source.py | 128 +++++++++++++++ tests/integration/test_schedule_source.py | 180 +++++++++++++++++++++ 12 files changed, 559 insertions(+), 46 deletions(-) create mode 100644 src/taskiq_pg/_internal/schedule_source.py create mode 100644 src/taskiq_pg/aiopg/broker.py create mode 100644 src/taskiq_pg/aiopg/schedule_source.py create mode 100644 src/taskiq_pg/psqlpy/schedule_source.py create mode 100644 tests/integration/test_schedule_source.py diff --git a/pyproject.toml b/pyproject.toml index aa81e74..358a242 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -129,6 +129,7 @@ ignore = [ # Conflicted rules "D203", # with D211 "D212", # with D213 + "COM812", # with formatter ] [tool.ruff.lint.per-file-ignores] @@ -146,6 +147,8 @@ ignore = [ "S608", "RUF", + + "PLR2004", # magic numbers in tests ] "tests/test_linting.py" = [ "S603", # subprocess usage diff --git a/src/taskiq_pg/_internal/__init__.py b/src/taskiq_pg/_internal/__init__.py index e69de29..de62da8 100644 --- a/src/taskiq_pg/_internal/__init__.py +++ b/src/taskiq_pg/_internal/__init__.py @@ -0,0 +1,10 @@ +from taskiq_pg._internal.broker import BasePostgresBroker +from taskiq_pg._internal.result_backend import BasePostgresResultBackend +from taskiq_pg._internal.schedule_source import BasePostgresScheduleSource + + +__all__ = [ + "BasePostgresBroker", + "BasePostgresResultBackend", + "BasePostgresScheduleSource", +] diff --git a/src/taskiq_pg/_internal/schedule_source.py b/src/taskiq_pg/_internal/schedule_source.py new file mode 100644 index 0000000..02ab4c6 --- /dev/null +++ b/src/taskiq_pg/_internal/schedule_source.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +import typing as tp + +from taskiq import ScheduleSource + + +if tp.TYPE_CHECKING: + from taskiq.abc.broker import AsyncBroker + + +class BasePostgresScheduleSource(ScheduleSource): + def __init__( + self, + broker: AsyncBroker, + dsn: str | tp.Callable[[], str] = "postgresql://postgres:postgres@localhost:5432/postgres", + table_name: str = "taskiq_schedules", + **connect_kwargs: tp.Any, + ) -> None: + """ + Initialize the PostgreSQL scheduler source. + + Sets up a scheduler source that stores scheduled tasks in a PostgreSQL database. + This scheduler source manages task schedules, allowing for persistent storage and retrieval of scheduled tasks + across application restarts. + + Args: + dsn: PostgreSQL connection string + table_name: Name of the table to store scheduled tasks. Will be created automatically if it doesn't exist. + broker: The TaskIQ broker instance to use for finding and managing tasks. + Required if startup_schedule is provided. + **connect_kwargs: Additional keyword arguments passed to the database connection pool. + + """ + self._broker: tp.Final = broker + self._dsn: tp.Final = dsn + self._table_name: tp.Final = table_name + self._connect_kwargs: tp.Final = connect_kwargs + + @property + def dsn(self) -> str | None: + """ + Get the DSN string. + + Returns the DSN string or None if not set. + """ + if callable(self._dsn): + return self._dsn() + return self._dsn diff --git a/src/taskiq_pg/aiopg/__init__.py b/src/taskiq_pg/aiopg/__init__.py index b00bcd1..67fe69f 100644 --- a/src/taskiq_pg/aiopg/__init__.py +++ b/src/taskiq_pg/aiopg/__init__.py @@ -1,6 +1,8 @@ from taskiq_pg.aiopg.result_backend import AiopgResultBackend +from taskiq_pg.aiopg.schedule_source import AiopgScheduleSource __all__ = [ "AiopgResultBackend", + "AiopgScheduleSource", ] diff --git a/src/taskiq_pg/aiopg/broker.py b/src/taskiq_pg/aiopg/broker.py new file mode 100644 index 0000000..e69de29 diff --git a/src/taskiq_pg/aiopg/queries.py b/src/taskiq_pg/aiopg/queries.py index 9b0db7c..8868808 100644 --- a/src/taskiq_pg/aiopg/queries.py +++ b/src/taskiq_pg/aiopg/queries.py @@ -29,3 +29,31 @@ DELETE_RESULT_QUERY = """ DELETE FROM {} WHERE task_id = %s """ + +CREATE_SCHEDULES_TABLE_QUERY = """ +CREATE TABLE IF NOT EXISTS {} ( + id UUID PRIMARY KEY, + task_name VARCHAR(100) NOT NULL, + schedule JSONB NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); +""" + +INSERT_SCHEDULE_QUERY = """ +INSERT INTO {} (id, task_name, schedule) +VALUES (%s, %s, %s) +ON CONFLICT (id) DO UPDATE +SET task_name = EXCLUDED.task_name, + schedule = EXCLUDED.schedule, + updated_at = NOW(); +""" + +SELECT_SCHEDULES_QUERY = """ +SELECT id, task_name, schedule +FROM {}; +""" + +DELETE_ALL_SCHEDULES_QUERY = """ +DELETE FROM {}; +""" diff --git a/src/taskiq_pg/aiopg/schedule_source.py b/src/taskiq_pg/aiopg/schedule_source.py new file mode 100644 index 0000000..f8b4a1a --- /dev/null +++ b/src/taskiq_pg/aiopg/schedule_source.py @@ -0,0 +1,124 @@ +import uuid +from logging import getLogger + +from aiopg import Pool, create_pool +from pydantic import ValidationError +from taskiq import ScheduledTask + +from taskiq_pg import exceptions +from taskiq_pg._internal import BasePostgresScheduleSource +from taskiq_pg.aiopg.queries import ( + CREATE_SCHEDULES_TABLE_QUERY, + DELETE_ALL_SCHEDULES_QUERY, + INSERT_SCHEDULE_QUERY, + SELECT_SCHEDULES_QUERY, +) + + +logger = getLogger("taskiq_pg.aiopg_schedule_source") + + +class AiopgScheduleSource(BasePostgresScheduleSource): + """Schedule source that uses aiopg to store schedules in PostgreSQL.""" + + _database_pool: Pool + + async def _update_schedules_on_startup(self, schedules: list[ScheduledTask]) -> None: + """Update schedules in the database on startup: truncate table and insert new ones.""" + async with self._database_pool.acquire() as connection, connection.cursor() as cursor: + await cursor.execute(DELETE_ALL_SCHEDULES_QUERY.format(self._table_name)) + for schedule in schedules: + await cursor.execute( + INSERT_SCHEDULE_QUERY.format(self._table_name), + [ + schedule.schedule_id, + schedule.task_name, + schedule.model_dump_json( + exclude={"schedule_id", "task_name"}, + ), + ], + ) + + def _get_schedules_from_broker_tasks(self) -> list[ScheduledTask]: + """Extract schedules from the broker's registered tasks.""" + scheduled_tasks_for_creation: list[ScheduledTask] = [] + for task_name, task in self._broker.get_all_tasks().items(): + if "schedule" not in task.labels: + logger.debug("Task %s has no schedule, skipping", task_name) + continue + if not isinstance(task.labels["schedule"], list): + logger.warning( + "Schedule for task %s is not a list, skipping", + task_name, + ) + continue + for schedule in task.labels["schedule"]: + try: + new_schedule = ScheduledTask.model_validate( + { + "task_name": task_name, + "labels": schedule.get("labels", {}), + "args": schedule.get("args", []), + "kwargs": schedule.get("kwargs", {}), + "schedule_id": str(uuid.uuid4()), + "cron": schedule.get("cron", None), + "cron_offset": schedule.get("cron_offset", None), + "time": schedule.get("time", None), + }, + ) + scheduled_tasks_for_creation.append(new_schedule) + except ValidationError: + logger.exception( + "Schedule for task %s is not valid, skipping", + task_name, + ) + continue + return scheduled_tasks_for_creation + + async def startup(self) -> None: + """ + Initialize the schedule source. + + Construct new connection pool, create new table for schedules if not exists + and fill table with schedules from task labels. + """ + try: + self._database_pool = await create_pool( + dsn=self.dsn, + **self._connect_kwargs, + ) + async with self._database_pool.acquire() as connection, connection.cursor() as cursor: + await cursor.execute(CREATE_SCHEDULES_TABLE_QUERY.format(self._table_name)) + scheduled_tasks_for_creation = self._get_schedules_from_broker_tasks() + await self._update_schedules_on_startup(scheduled_tasks_for_creation) + except Exception as error: + raise exceptions.DatabaseConnectionError(str(error)) from error + + async def shutdown(self) -> None: + """Close the connection pool.""" + if getattr(self, "_database_pool", None) is not None: + self._database_pool.close() + + async def get_schedules(self) -> list["ScheduledTask"]: + """Fetch schedules from the database.""" + async with self._database_pool.acquire() as connection, connection.cursor() as cursor: + await cursor.execute( + SELECT_SCHEDULES_QUERY.format(self._table_name), + ) + schedules, rows = [], await cursor.fetchall() + for schedule_id, task_name, schedule in rows: + schedules.append( + ScheduledTask.model_validate( + { + "schedule_id": str(schedule_id), + "task_name": task_name, + "labels": schedule["labels"], + "args": schedule["args"], + "kwargs": schedule["kwargs"], + "cron": schedule["cron"], + "cron_offset": schedule["cron_offset"], + "time": schedule["time"], + }, + ), + ) + return schedules diff --git a/src/taskiq_pg/asyncpg/schedule_source.py b/src/taskiq_pg/asyncpg/schedule_source.py index 0e5bcb3..fb0fe36 100644 --- a/src/taskiq_pg/asyncpg/schedule_source.py +++ b/src/taskiq_pg/asyncpg/schedule_source.py @@ -1,13 +1,12 @@ import json -import typing as tp import uuid from logging import getLogger import asyncpg from pydantic import ValidationError -from taskiq import ScheduledTask, ScheduleSource -from taskiq.abc.broker import AsyncBroker +from taskiq import ScheduledTask +from taskiq_pg._internal import BasePostgresScheduleSource from taskiq_pg.asyncpg.queries import ( CREATE_SCHEDULES_TABLE_QUERY, DELETE_ALL_SCHEDULES_QUERY, @@ -19,57 +18,16 @@ logger = getLogger("taskiq_pg.asyncpg_schedule_source") -class AsyncpgScheduleSource(ScheduleSource): +class AsyncpgScheduleSource(BasePostgresScheduleSource): """Schedule source that uses asyncpg to store schedules in PostgreSQL.""" _database_pool: "asyncpg.Pool[asyncpg.Record]" - def __init__( - self, - broker: AsyncBroker, - dsn: str | tp.Callable[[], str] = "postgresql://postgres:postgres@localhost:5432/postgres", - table_name: str = "taskiq_schedules", - **connect_kwargs: tp.Any, - ) -> None: - """ - Initialize the PostgreSQL scheduler source. - - Sets up a scheduler source that stores scheduled tasks in a PostgreSQL database. - This scheduler source manages task schedules, allowing for persistent storage and retrieval of scheduled tasks - across application restarts. - - Args: - dsn: PostgreSQL connection string - table_name: Name of the table to store scheduled tasks. Will be created automatically if it doesn't exist. - broker: The TaskIQ broker instance to use for finding and managing tasks. - Required if startup_schedule is provided. - **connect_kwargs: Additional keyword arguments passed to the database connection pool. - - """ - self._broker: tp.Final = broker - self._dsn: tp.Final = dsn - self._table_name: tp.Final = table_name - self._connect_kwargs: tp.Final = connect_kwargs - - @property - def dsn(self) -> str | None: - """ - Get the DSN string. - - Returns the DSN string or None if not set. - """ - if callable(self._dsn): - return self._dsn() - return self._dsn - async def _update_schedules_on_startup(self, schedules: list[ScheduledTask]) -> None: - """Update schedules in the database on startup: trancate table and insert new ones.""" + """Update schedules in the database on startup: truncate table and insert new ones.""" async with self._database_pool.acquire() as connection, connection.transaction(): await connection.execute(DELETE_ALL_SCHEDULES_QUERY.format(self._table_name)) for schedule in schedules: - schedule.model_dump_json( - exclude={"schedule_id", "task_name"}, - ) await self._database_pool.execute( INSERT_SCHEDULE_QUERY.format(self._table_name), str(schedule.schedule_id), @@ -91,6 +49,7 @@ def _get_schedules_from_broker_tasks(self) -> list[ScheduledTask]: "Schedule for task %s is not a list, skipping", task_name, ) + continue for schedule in task.labels["schedule"]: try: new_schedule = ScheduledTask.model_validate( diff --git a/src/taskiq_pg/psqlpy/__init__.py b/src/taskiq_pg/psqlpy/__init__.py index 92b64b0..7039ddc 100644 --- a/src/taskiq_pg/psqlpy/__init__.py +++ b/src/taskiq_pg/psqlpy/__init__.py @@ -1,8 +1,10 @@ from taskiq_pg.psqlpy.broker import PSQLPyBroker from taskiq_pg.psqlpy.result_backend import PSQLPyResultBackend +from taskiq_pg.psqlpy.schedule_source import PSQLPyScheduleSource __all__ = [ "PSQLPyBroker", "PSQLPyResultBackend", + "PSQLPyScheduleSource", ] diff --git a/src/taskiq_pg/psqlpy/queries.py b/src/taskiq_pg/psqlpy/queries.py index 9770e99..3555c47 100644 --- a/src/taskiq_pg/psqlpy/queries.py +++ b/src/taskiq_pg/psqlpy/queries.py @@ -51,3 +51,31 @@ CLAIM_MESSAGE_QUERY = "UPDATE {} SET status = 'processing' WHERE id = $1 AND status = 'pending' RETURNING *" DELETE_MESSAGE_QUERY = "DELETE FROM {} WHERE id = $1" + +CREATE_SCHEDULES_TABLE_QUERY = """ +CREATE TABLE IF NOT EXISTS {} ( + id UUID PRIMARY KEY, + task_name VARCHAR(100) NOT NULL, + schedule JSONB NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); +""" + +INSERT_SCHEDULE_QUERY = """ +INSERT INTO {} (id, task_name, schedule) +VALUES ($1, $2, $3) +ON CONFLICT (id) DO UPDATE +SET task_name = EXCLUDED.task_name, + schedule = EXCLUDED.schedule, + updated_at = NOW(); +""" + +SELECT_SCHEDULES_QUERY = """ +SELECT id, task_name, schedule +FROM {}; +""" + +DELETE_ALL_SCHEDULES_QUERY = """ +DELETE FROM {}; +""" diff --git a/src/taskiq_pg/psqlpy/schedule_source.py b/src/taskiq_pg/psqlpy/schedule_source.py new file mode 100644 index 0000000..f521070 --- /dev/null +++ b/src/taskiq_pg/psqlpy/schedule_source.py @@ -0,0 +1,128 @@ +import uuid +from logging import getLogger + +from psqlpy import ConnectionPool +from pydantic import ValidationError +from taskiq import ScheduledTask + +from taskiq_pg._internal import BasePostgresScheduleSource +from taskiq_pg.psqlpy.queries import ( + CREATE_SCHEDULES_TABLE_QUERY, + DELETE_ALL_SCHEDULES_QUERY, + INSERT_SCHEDULE_QUERY, + SELECT_SCHEDULES_QUERY, +) + + +logger = getLogger("taskiq_pg.psqlpy_schedule_source") + + +class PSQLPyScheduleSource(BasePostgresScheduleSource): + """Schedule source that uses psqlpy to store schedules in PostgreSQL.""" + + _database_pool: ConnectionPool + + async def _update_schedules_on_startup(self, schedules: list[ScheduledTask]) -> None: + """Update schedules in the database on startup: truncate table and insert new ones.""" + async with self._database_pool.acquire() as connection, connection.transaction(): + await connection.execute(DELETE_ALL_SCHEDULES_QUERY.format(self._table_name)) + data_to_insert: list = [ + [ + uuid.UUID(schedule.schedule_id), + schedule.task_name, + schedule.model_dump( + exclude={"schedule_id", "task_name"}, + ), + ] + for schedule in schedules + ] + await connection.execute_many( + INSERT_SCHEDULE_QUERY.format(self._table_name), + data_to_insert, + ) + + def _get_schedules_from_broker_tasks(self) -> list[ScheduledTask]: + """Extract schedules from the broker's registered tasks.""" + scheduled_tasks_for_creation: list[ScheduledTask] = [] + for task_name, task in self._broker.get_all_tasks().items(): + if "schedule" not in task.labels: + logger.debug("Task %s has no schedule, skipping", task_name) + continue + if not isinstance(task.labels["schedule"], list): + logger.warning( + "Schedule for task %s is not a list, skipping", + task_name, + ) + continue + for schedule in task.labels["schedule"]: + try: + new_schedule = ScheduledTask.model_validate( + { + "task_name": task_name, + "labels": schedule.get("labels", {}), + "args": schedule.get("args", []), + "kwargs": schedule.get("kwargs", {}), + "schedule_id": str(uuid.uuid4()), + "cron": schedule.get("cron", None), + "cron_offset": schedule.get("cron_offset", None), + "time": schedule.get("time", None), + }, + ) + scheduled_tasks_for_creation.append(new_schedule) + except ValidationError: + logger.exception( + "Schedule for task %s is not valid, skipping", + task_name, + ) + continue + return scheduled_tasks_for_creation + + async def startup(self) -> None: + """ + Initialize the schedule source. + + Construct new connection pool, create new table for schedules if not exists + and fill table with schedules from task labels. + """ + self._database_pool = ConnectionPool( + dsn=self.dsn, + **self._connect_kwargs, + ) + async with self._database_pool.acquire() as connection: + await connection.execute( + CREATE_SCHEDULES_TABLE_QUERY.format( + self._table_name, + ), + ) + scheduled_tasks_for_creation = self._get_schedules_from_broker_tasks() + await self._update_schedules_on_startup(scheduled_tasks_for_creation) + + async def shutdown(self) -> None: + """Close the connection pool.""" + if getattr(self, "_database_pool", None) is not None: + self._database_pool.close() + + async def get_schedules(self) -> list["ScheduledTask"]: + """Fetch schedules from the database.""" + async with self._database_pool.acquire() as connection: + rows_with_schedules = await connection.fetch( + SELECT_SCHEDULES_QUERY.format(self._table_name), + ) + schedules = [] + for row in rows_with_schedules.result(): + schedule = row["schedule"] + schedules.append( + ScheduledTask.model_validate( + { + "schedule_id": str(row["id"]), + "task_name": row["task_name"], + "labels": schedule["labels"], + "args": schedule["args"], + "kwargs": schedule["kwargs"], + "cron": schedule["cron"], + "cron_offset": schedule["cron_offset"], + "time": schedule["time"], + }, + ), + ) + return schedules diff --git a/tests/integration/test_schedule_source.py b/tests/integration/test_schedule_source.py new file mode 100644 index 0000000..62bfb7a --- /dev/null +++ b/tests/integration/test_schedule_source.py @@ -0,0 +1,180 @@ +from __future__ import annotations + +import json +import typing as tp +import uuid + +import asyncpg +import pytest + +from taskiq_pg.aiopg import AiopgScheduleSource +from taskiq_pg.asyncpg import AsyncpgScheduleSource +from taskiq_pg.psqlpy import PSQLPyBroker, PSQLPyScheduleSource + + +if tp.TYPE_CHECKING: + from taskiq import ScheduledTask + + +@pytest.mark.integration +@pytest.mark.parametrize( + "schedule_source_class", + [ + PSQLPyScheduleSource, + AiopgScheduleSource, + AsyncpgScheduleSource, + ], +) +async def test_when_broker_has_tasks_with_schedule_labels__then_startup_persists_schedules_in_db( + pg_dsn: str, + schedule_source_class: type[PSQLPyScheduleSource | AiopgScheduleSource | AsyncpgScheduleSource], +) -> None: + # Given: unique table, broker with a task having schedule labels, schedule source + table_name: str = f"taskiq_schedules_{uuid.uuid4().hex}" + broker = PSQLPyBroker(dsn=pg_dsn) + + @broker.task( + task_name="tests:scheduled_task", + schedule=[ + { + "cron": "*/5 * * * *", + "cron_offset": None, + "time": None, + "args": [1, 2], + "kwargs": {"a": "b"}, + "labels": {"source": "test"}, + }, + { + "cron": "0 0 * * *", + "cron_offset": None, + "time": None, + "args": [], + "kwargs": {}, + "labels": {}, + }, + ], + ) + async def scheduled_task() -> None: # noqa: ARG001 + return None + + source = schedule_source_class(dsn=pg_dsn, broker=broker, table_name=table_name) + conn: asyncpg.Connection = await asyncpg.connect(dsn=pg_dsn) + + try: + # When: starting schedule source should truncate table and insert schedules from broker labels + await source.startup() + + # Then: schedules are persisted in DB + cnt: int = tp.cast("int", await conn.fetchval(f"SELECT COUNT(*) FROM {table_name}")) + assert cnt == 2 + + rows = await conn.fetch( + f"SELECT task_name, schedule FROM {table_name} ORDER BY task_name, schedule->>'cron'", + ) + assert len(rows) == 2 # noqa: PLR2004 + assert all(r["task_name"] == "tests:scheduled_task" for r in rows) + + first_schedule = json.loads(tp.cast("str", rows[0]["schedule"])) + assert first_schedule["cron"] == "0 0 * * *" + assert first_schedule["args"] == [] + assert first_schedule["kwargs"] == {} + assert first_schedule["labels"] == {} + + second_schedule = json.loads(tp.cast("str", rows[1]["schedule"])) + assert second_schedule["cron"] == "*/5 * * * *" + assert second_schedule["args"] == [1, 2] + assert second_schedule["kwargs"] == {"a": "b"} + assert second_schedule["labels"] == {"source": "test"} + finally: + await source.shutdown() + try: + await conn.execute(f"DROP TABLE IF EXISTS {table_name}") + finally: + await conn.close() + + +@pytest.mark.integration +@pytest.mark.parametrize( + "schedule_source_class", + [ + PSQLPyScheduleSource, + AiopgScheduleSource, + AsyncpgScheduleSource, + ], +) +async def test_when_schedule_rows_exist_in_db__then_get_schedules_returns_scheduled_tasks( + pg_dsn: str, + schedule_source_class: type[PSQLPyScheduleSource | AiopgScheduleSource | AsyncpgScheduleSource], +) -> None: + # Given: unique table, started schedule source with empty broker schedules, and rows inserted directly into DB + table_name: str = f"taskiq_schedules_{uuid.uuid4().hex}" + broker = PSQLPyBroker(dsn=pg_dsn) + source = schedule_source_class(dsn=pg_dsn, broker=broker, table_name=table_name) + conn: asyncpg.Connection = await asyncpg.connect(dsn=pg_dsn) + + try: + # Create table and ensure it's empty by starting up the source + await source.startup() + + schedule_id1 = uuid.uuid4() + schedule_id2 = uuid.uuid4() + + schedule_payload1 = { + "cron": "*/10 * * * *", + "cron_offset": None, + "time": None, + "args": [42], + "kwargs": {"x": 10}, + "labels": {"foo": "bar"}, + } + schedule_payload2 = { + "cron": "0 1 * * *", + "cron_offset": None, + "time": None, + "args": [], + "kwargs": {}, + "labels": {}, + } + + await conn.execute( + f"INSERT INTO {table_name} (id, task_name, schedule) VALUES ($1, $2, $3)", + str(schedule_id1), + "tests:other_task", + json.dumps(schedule_payload1), + ) + await conn.execute( + f"INSERT INTO {table_name} (id, task_name, schedule) VALUES ($1, $2, $3)", + str(schedule_id2), + "tests:other_task", + json.dumps(schedule_payload2), + ) + + # When: fetching schedules via schedule source + schedules: list[ScheduledTask] = await source.get_schedules() + + # Then: schedules are correctly deserialized and returned + assert len(schedules) == 2 # noqa: PLR2004 + + ids = {s.schedule_id for s in schedules} + assert str(schedule_id1) in ids + assert str(schedule_id2) in ids + + s1 = next(s for s in schedules if s.schedule_id == str(schedule_id1)) + assert s1.task_name == "tests:other_task" + assert s1.args == [42] + assert s1.kwargs == {"x": 10} + assert s1.labels == {"foo": "bar"} + assert s1.cron == "*/10 * * * *" + + s2 = next(s for s in schedules if s.schedule_id == str(schedule_id2)) + assert s2.task_name == "tests:other_task" + assert s2.args == [] + assert s2.kwargs == {} + assert s2.labels == {} + assert s2.cron == "0 1 * * *" + finally: + await source.shutdown() + try: + await conn.execute(f"DROP TABLE IF EXISTS {table_name}") + finally: + await conn.close()