diff --git a/README.md b/README.md index 70f3079..3e56de3 100644 --- a/README.md +++ b/README.md @@ -9,70 +9,125 @@ PostgreSQL integration for Taskiq with support for asyncpg, psqlpy and aiopg drivers. -See more example of usage in [the documentation](https://danfimov.github.io/taskiq-postgres/). +See more example of usage in [the documentation](https://danfimov.github.io/taskiq-postgres/) or [examples directory](https://github.com/danfimov/taskiq-postgres/examples). ## Installation -Depend on your preferred PostgreSQL driver, you can install this library: +Depending on your preferred PostgreSQL driver, you can install this library with the corresponding extra: -=== "asyncpg" +```bash +# with asyncpg +pip install taskiq-postgres[asyncpg] - ```bash - pip install taskiq-postgres[asyncpg] - ``` +# with psqlpy +pip install taskiq-postgres[psqlpy] -=== "psqlpy" +# with aiopg +pip install taskiq-postgres[aiopg] +``` - ```bash - pip install taskiq-postgres[psqlpy] - ``` +## Quick start -=== "aiopg" +### Basic task processing - ```bash - pip install taskiq-postgres[aiopg] - ``` +1. 
Define your broker with [asyncpg](https://github.com/MagicStack/asyncpg): + ```python + # broker_example.py + import asyncio + from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgResultBackend -## Usage example -Simple example of usage with [asyncpg](https://github.com/MagicStack/asyncpg): + dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres" + broker = AsyncpgBroker(dsn).with_result_backend(AsyncpgResultBackend(dsn)) -```python -# broker.py -import asyncio -from taskiq_pg.asyncpg import AsyncpgResultBackend, AsyncpgBroker + @broker.task("solve_all_problems") + async def best_task_ever() -> None: + """Solve all problems in the world.""" + await asyncio.sleep(2) + print("All problems are solved!") -result_backend = AsyncpgResultBackend( - dsn="postgres://postgres:postgres@localhost:5432/postgres", -) -broker = AsyncpgBroker( - dsn="postgres://postgres:postgres@localhost:5432/postgres", -).with_result_backend(result_backend) + async def main(): + await broker.startup() + task = await best_task_ever.kiq() + print(await task.wait_result()) + await broker.shutdown() -@broker.task -async def best_task_ever() -> None: - """Solve all problems in the world.""" - await asyncio.sleep(5.5) - print("All problems are solved!") + if __name__ == "__main__": + asyncio.run(main()) + ``` +2. Start a worker to process tasks (by default taskiq runs two instances of worker): -async def main(): - await broker.startup() - task = await best_task_ever.kiq() - print(await task.wait_result()) - await broker.shutdown() + ```bash + taskiq worker broker_example:broker + ``` +3. Run `broker_example.py` file to send a task to the worker: -if __name__ == "__main__": - asyncio.run(main()) -``` + ```bash + python broker_example.py + ``` Your experience with other drivers will be pretty similar. Just change the import statement and that's it. +### Task scheduling + +1. 
Define your broker and schedule source: + + ```python + # scheduler_example.py + import asyncio + from taskiq import TaskiqScheduler + from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgScheduleSource + + + dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres" + broker = AsyncpgBroker(dsn) + scheduler = TaskiqScheduler( + broker=broker, + sources=[AsyncpgScheduleSource( + dsn=dsn, + broker=broker, + )], + ) + + + @broker.task( + task_name="solve_all_problems", + schedule=[ + { + "cron": "*/1 * * * *", # type: str, either cron or time should be specified. + "cron_offset": None, # type: str | timedelta | None, can be omitted. + "time": None, # type: datetime | None, either cron or time should be specified. + "args": [], # type list[Any] | None, can be omitted. + "kwargs": {}, # type: dict[str, Any] | None, can be omitted. + "labels": {}, # type: dict[str, Any] | None, can be omitted. + }, + ], + ) + async def best_task_ever() -> None: + """Solve all problems in the world.""" + await asyncio.sleep(2) + print("All problems are solved!") + + ``` + +2. Start worker processes: + + ```bash + taskiq worker scheduler_example:broker + ``` + +3. Run scheduler process: + + ```bash + taskiq scheduler scheduler_example:scheduler + ``` + ## Motivation There are too many libraries for PostgreSQL and Taskiq integration. Although they have different view on interface and different functionality. 
diff --git a/docs/contributing.md b/docs/contributing.md index 59130b6..587a305 100644 --- a/docs/contributing.md +++ b/docs/contributing.md @@ -1,3 +1,37 @@ --- -title: Contributing +title: Contributing and Development --- + +## Development + +This project uses modern Python development tools: + +- [uv](https://github.com/astral-sh/uv) — fast Python package installer and resolver +- [ruff](https://github.com/astral-sh/ruff) — extremely fast Python linter and formatter + +### Setup Development Environment + +```bash +# Clone the repository +git clone https://github.com/danfimov/taskiq-postgres.git +cd taskiq-postgres + +# Create a virtual environment (optional but recommended) +make venv + +# Install dependencies +make init +``` + +You can see other useful commands by running `make help`. + + +## Contributing + +Contributions are welcome! Please: + +1. Fork the repository +2. Create a feature branch +3. Add tests for new functionality +4. Ensure all tests pass +5. Submit a pull request diff --git a/docs/index.md b/docs/index.md index e6a9f7d..497944b 100644 --- a/docs/index.md +++ b/docs/index.md @@ -2,4 +2,290 @@ title: Overview --- ---8<-- "README.md" +[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/taskiq-postgres?style=for-the-badge&logo=python)](https://pypi.org/project/taskiq-postgres/) +[![PyPI](https://img.shields.io/pypi/v/taskiq-postgres?style=for-the-badge&logo=pypi)](https://pypi.org/project/taskiq-postgres/) +[![Checks](https://img.shields.io/github/check-runs/danfimov/taskiq-postgres/main?nameFilter=Tests%20(3.12)&style=for-the-badge)](https://github.com/danfimov/taskiq-postgres) + +
+ +
+
+ +PostgreSQL integration for Taskiq with support for asyncpg, psqlpy and aiopg drivers. + +## Motivation + +There are too many libraries for PostgreSQL and Taskiq integration. Although they have different view on interface and different functionality. +To address this issue I created this library with a common interface for most popular PostgreSQL drivers that handle similarity across functionality of: + +- result backends; +- brokers; +- schedule sources. + + +## Installation + +Depending on your preferred PostgreSQL driver, you can install this library with the corresponding extra: + +=== "asyncpg" + + ```bash + pip install taskiq-postgres[asyncpg] + ``` + +=== "psqlpy" + + ```bash + pip install taskiq-postgres[psqlpy] + ``` + +=== "aiopg" + + ```bash + pip install taskiq-postgres[aiopg] + ``` + +## Quick start + +### Basic task processing + +1. Define your broker: + + === "asyncpg" + + ```python + # broker_example.py + import asyncio + from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgResultBackend + + + dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres" + broker = AsyncpgBroker(dsn).with_result_backend(AsyncpgResultBackend(dsn)) + + + @broker.task("solve_all_problems") + async def best_task_ever() -> None: + """Solve all problems in the world.""" + await asyncio.sleep(2) + print("All problems are solved!") + + + async def main(): + await broker.startup() + task = await best_task_ever.kiq() + print(await task.wait_result()) + await broker.shutdown() + + + if __name__ == "__main__": + asyncio.run(main()) + ``` + + === "psqlpy" + + ```python + # broker_example.py + import asyncio + from taskiq_pg.psqlpy import PSQLPyBroker, PSQLPyResultBackend + + + dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres" + broker = PSQLPyBroker(dsn).with_result_backend(PSQLPyResultBackend(dsn)) + + + @broker.task("solve_all_problems") + async def best_task_ever() -> None: + """Solve all problems in the world.""" + await 
asyncio.sleep(2) + print("All problems are solved!") + + + async def main(): + await broker.startup() + task = await best_task_ever.kiq() + print(await task.wait_result()) + await broker.shutdown() + + + if __name__ == "__main__": + asyncio.run(main()) + ``` + + === "aiopg" + + ```python + # broker_example.py + import asyncio + from taskiq_pg.aiopg import AiopgBroker, AiopgResultBackend + + + dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres" + broker = AiopgBroker(dsn).with_result_backend(AiopgResultBackend(dsn)) + + + @broker.task("solve_all_problems") + async def best_task_ever() -> None: + """Solve all problems in the world.""" + await asyncio.sleep(2) + print("All problems are solved!") + + + async def main(): + await broker.startup() + task = await best_task_ever.kiq() + print(await task.wait_result()) + await broker.shutdown() + + + if __name__ == "__main__": + asyncio.run(main()) + ``` + +2. Start a worker to process tasks (by default taskiq runs two instances of worker): + + ```bash + taskiq worker broker_example:broker + ``` + +3. Run `broker_example.py` file to send a task to the worker: + + ```bash + python broker_example.py + ``` + +Your experience with other drivers will be pretty similar. Just change the import statement and that's it. + +### Task scheduling + +1. Define your broker and schedule source: + + === "asyncpg" + + ```python + # scheduler_example.py + import asyncio + from taskiq import TaskiqScheduler + from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgScheduleSource + + + dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres" + broker = AsyncpgBroker(dsn) + scheduler = TaskiqScheduler( + broker=broker, + sources=[AsyncpgScheduleSource( + dsn=dsn, + broker=broker, + )], + ) + + + @broker.task( + task_name="solve_all_problems", + schedule=[ + { + "cron": "*/1 * * * *", # type: str, either cron or time should be specified. 
+ "cron_offset": None, # type: str | timedelta | None, can be omitted. + "time": None, # type: datetime | None, either cron or time should be specified. + "args": [], # type list[Any] | None, can be omitted. + "kwargs": {}, # type: dict[str, Any] | None, can be omitted. + "labels": {}, # type: dict[str, Any] | None, can be omitted. + }, + ], + ) + async def best_task_ever() -> None: + """Solve all problems in the world.""" + await asyncio.sleep(2) + print("All problems are solved!") + ``` + + === "psqlpy" + + ```python + # scheduler_example.py + import asyncio + from taskiq import TaskiqScheduler + from taskiq_pg.psqlpy import PSQLPyBroker, PSQLPyScheduleSource + + + dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres" + broker = PSQLPyBroker(dsn) + scheduler = TaskiqScheduler( + broker=broker, + sources=[PSQLPyScheduleSource( + dsn=dsn, + broker=broker, + )], + ) + + + @broker.task( + task_name="solve_all_problems", + schedule=[ + { + "cron": "*/1 * * * *", # type: str, either cron or time should be specified. + "cron_offset": None, # type: str | timedelta | None, can be omitted. + "time": None, # type: datetime | None, either cron or time should be specified. + "args": [], # type list[Any] | None, can be omitted. + "kwargs": {}, # type: dict[str, Any] | None, can be omitted. + "labels": {}, # type: dict[str, Any] | None, can be omitted. 
+ }, + ], + ) + async def best_task_ever() -> None: + """Solve all problems in the world.""" + await asyncio.sleep(2) + print("All problems are solved!") + + ``` + + === "aiopg" + + ```python + # scheduler_example.py + import asyncio + from taskiq import TaskiqScheduler + from taskiq_pg.aiopg import AiopgBroker, AiopgScheduleSource + + + dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres" + broker = AiopgBroker(dsn) + scheduler = TaskiqScheduler( + broker=broker, + sources=[AiopgScheduleSource( + dsn=dsn, + broker=broker, + )], + ) + + + @broker.task( + task_name="solve_all_problems", + schedule=[ + { + "cron": "*/1 * * * *", # type: str, either cron or time should be specified. + "cron_offset": None, # type: str | timedelta | None, can be omitted. + "time": None, # type: datetime | None, either cron or time should be specified. + "args": [], # type list[Any] | None, can be omitted. + "kwargs": {}, # type: dict[str, Any] | None, can be omitted. + "labels": {}, # type: dict[str, Any] | None, can be omitted. + }, + ], + ) + async def best_task_ever() -> None: + """Solve all problems in the world.""" + await asyncio.sleep(2) + print("All problems are solved!") + + ``` + +2. Start worker processes: + + ```bash + taskiq worker scheduler_example:broker + ``` + +3. Run scheduler process: + + ```bash + taskiq scheduler scheduler_example:scheduler + ``` diff --git a/docs/tutorial/broker.md b/docs/tutorial/broker.md deleted file mode 100644 index 600b22a..0000000 --- a/docs/tutorial/broker.md +++ /dev/null @@ -1,69 +0,0 @@ ---- -title: Broker ---- - -To use broker with PostgreSQL you need to import broker and result backend from this library and provide a address for connection. 
For example, lets create a file `broker.py` with the following content: - -=== "asyncpg" - - ```python - import asyncio - from taskiq_pg.asyncpg import AsyncpgResultBackend, AsyncpgBroker - - - dsn = "postgres://postgres:postgres@localhost:5432/postgres" - broker = AsyncpgBroker(dsn).with_result_backend(AsyncpgResultBackend(dsn)) - - - @broker.task - async def best_task_ever() -> None: - """Solve all problems in the world.""" - await asyncio.sleep(5.5) - print("All problems are solved!") - - - async def main(): - await broker.startup() - task = await best_task_ever.kiq() - print(await task.wait_result()) - await broker.shutdown() - - - if __name__ == "__main__": - asyncio.run(main()) - ``` - -=== "psqlpy" - - ```python - import asyncio - from taskiq_pg.psqlpy import PSQLPyResultBackend, PSQLPyBroker - - - dsn = "postgres://postgres:postgres@localhost:5432/postgres" - broker = PSQLPyBroker(dsn).with_result_backend(PSQLPyResultBackend(dsn)) - - - @broker.task - async def best_task_ever() -> None: - """Solve all problems in the world.""" - await asyncio.sleep(5.5) - print("All problems are solved!") - - - async def main(): - await broker.startup() - task = await best_task_ever.kiq() - print(await task.wait_result()) - await broker.shutdown() - - - if __name__ == "__main__": - asyncio.run(main()) - ``` - -Then you can run this file with: - -```bash -python broker.py -``` diff --git a/docs/tutorial/common_issues.md b/docs/tutorial/common_issues.md new file mode 100644 index 0000000..a45ffb1 --- /dev/null +++ b/docs/tutorial/common_issues.md @@ -0,0 +1,66 @@ +--- +title: Common Issues +--- + +## Connection Errors + +Ensure your connection string is correct: + + ```python + dsn = "postgresql://username:password@host:port/database" + ``` + +Check PostgreSQL is running and accessible: + + ```python + import asyncpg + + dsn = '...' 
+    conn = await asyncpg.connect(dsn)
+    await conn.close()
+    ```
+
+## Table Creation Issues
+
+
+Ensure user has `CREATE TABLE` permissions or manually create tables using provided schemas:
+
+```sql
+-- for broker
+CREATE TABLE taskiq_queue (
+    id SERIAL PRIMARY KEY,
+    task_id UUID NOT NULL,
+    task_name VARCHAR NOT NULL,
+    message BYTEA NOT NULL,
+    labels JSONB,
+    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
+);
+
+-- for result backend
+CREATE TABLE taskiq_results (
+    task_id UUID PRIMARY KEY,
+    result BYTEA,
+    is_err BOOLEAN DEFAULT FALSE,
+    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
+);
+
+-- for schedule source
+CREATE TABLE taskiq_schedules (
+    id UUID PRIMARY KEY,
+    task_name VARCHAR(100) NOT NULL,
+    schedule JSONB NOT NULL,
+    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
+    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
+);
+```
+
+These are the default schemas, so if you changed table names or other parameters, adjust accordingly.
+
+## Driver Import Errors
+
+Install the appropriate driver extra, for example:
+
+```bash
+# for asyncpg
+pip install taskiq-postgres[asyncpg]
+```
diff --git a/docs/tutorial/quickstart.md b/docs/tutorial/quickstart.md
deleted file mode 100644
index f15cbd3..0000000
--- a/docs/tutorial/quickstart.md
+++ /dev/null
@@ -1,3 +0,0 @@
----
-title: Getting Started
----
diff --git a/docs/tutorial/schedule_source.md b/docs/tutorial/schedule_source.md
index 9b9fbdc..7df5efd 100644
--- a/docs/tutorial/schedule_source.md
+++ b/docs/tutorial/schedule_source.md
@@ -1,3 +1,51 @@
 ---
 title: Schedule Source
 ---
+
+
+## Using multiple schedules
+
+You can use multiple schedules for one task. 
Just provide a list of schedules to the `@broker.task` decorator: + +```python +import asyncio +from taskiq import TaskiqScheduler +from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgScheduleSource + + +dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres" +broker = AsyncpgBroker(dsn) +scheduler = TaskiqScheduler( + broker=broker, + sources=[AsyncpgScheduleSource( + dsn=dsn, + broker=broker, + )], +) + + +@broker.task( + task_name="solve_all_problems", + schedule=[ + { + "cron": "*/1 * * * *", + "cron_offset": None, + "time": None, + "args": ["every minute"], + "kwargs": {}, + "labels": {}, + }, + { + "cron": "0 */1 * * *", + "cron_offset": None, + "time": None, + "args": ["every hour"], + "kwargs": {}, + "labels": {}, + }, + ], +) +async def best_task_ever(message: str) -> None: + await asyncio.sleep(2) + print(f"I run {message}") +``` diff --git a/examples/example_with_schedule_source.py b/examples/example_with_schedule_source.py new file mode 100644 index 0000000..8d3a15f --- /dev/null +++ b/examples/example_with_schedule_source.py @@ -0,0 +1,47 @@ +""" +How to run: + + 1) Run worker in one terminal: + uv run taskiq worker examples.example_with_schedule_source:broker + + 2) Run scheduler in another terminal: + uv run taskiq scheduler examples.example_with_schedule_source:scheduler +""" + +import asyncio + +from taskiq import TaskiqScheduler + +from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgScheduleSource + + +dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres" +broker = AsyncpgBroker(dsn) +scheduler = TaskiqScheduler( + broker=broker, + sources=[ + AsyncpgScheduleSource( + dsn=dsn, + broker=broker, + ), + ], +) + + +@broker.task( + task_name="solve_all_problems", + schedule=[ + { + "cron": "*/1 * * * *", # type: str, either cron or time should be specified. + "cron_offset": None, # type: str | timedelta | None, can be omitted. 
+ "time": None, # type: datetime | None, either cron or time should be specified. + "args": [], # type list[Any] | None, can be omitted. + "kwargs": {}, # type: dict[str, Any] | None, can be omitted. + "labels": {}, # type: dict[str, Any] | None, can be omitted. + }, + ], +) +async def best_task_ever() -> None: + """Solve all problems in the world.""" + await asyncio.sleep(2) + print("All problems are solved!") diff --git a/mkdocs.yml b/mkdocs.yml index fe9b48e..c1c8151 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -9,9 +9,8 @@ nav: - Overview: - index.md - Tutorial: - - tutorial/quickstart.md - - tutorial/broker.md - tutorial/schedule_source.md + - tutorial/common_issues.md - API: - reference.md - Contributing: diff --git a/pyproject.toml b/pyproject.toml index 2e6af73..aa81e74 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -105,7 +105,7 @@ omit = [ [tool.ruff] line-length = 120 -target-version = "py39" +target-version = "py310" [tool.ruff.lint] select = ["ALL"] @@ -162,7 +162,7 @@ known-local-folder = ["taskiq_pg"] lines-after-imports = 2 [tool.mypy] -python_version = "3.12" +python_version = "3.10" modules = "taskiq_pg" exclude_gitignore = true platform = "linux" diff --git a/src/taskiq_pg/asyncpg/__init__.py b/src/taskiq_pg/asyncpg/__init__.py index edb94d6..722ba21 100644 --- a/src/taskiq_pg/asyncpg/__init__.py +++ b/src/taskiq_pg/asyncpg/__init__.py @@ -1,8 +1,10 @@ from taskiq_pg.asyncpg.broker import AsyncpgBroker from taskiq_pg.asyncpg.result_backend import AsyncpgResultBackend +from taskiq_pg.asyncpg.schedule_source import AsyncpgScheduleSource __all__ = [ "AsyncpgBroker", "AsyncpgResultBackend", + "AsyncpgScheduleSource", ] diff --git a/src/taskiq_pg/asyncpg/broker.py b/src/taskiq_pg/asyncpg/broker.py index 1b9fe0b..2315152 100644 --- a/src/taskiq_pg/asyncpg/broker.py +++ b/src/taskiq_pg/asyncpg/broker.py @@ -131,7 +131,7 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]: :yields: AckableMessage instances. 
""" - if self.read_conn is None: + if self.write_pool is None: msg = "Call startup before starting listening." raise ValueError(msg) if self._queue is None: diff --git a/src/taskiq_pg/asyncpg/queries.py b/src/taskiq_pg/asyncpg/queries.py index 89b6173..d7185a0 100644 --- a/src/taskiq_pg/asyncpg/queries.py +++ b/src/taskiq_pg/asyncpg/queries.py @@ -51,3 +51,31 @@ CLAIM_MESSAGE_QUERY = "UPDATE {} SET status = 'processing' WHERE id = $1 AND status = 'pending' RETURNING id, message" DELETE_MESSAGE_QUERY = "DELETE FROM {} WHERE id = $1" + +CREATE_SCHEDULES_TABLE_QUERY = """ +CREATE TABLE IF NOT EXISTS {} ( + id UUID PRIMARY KEY, + task_name VARCHAR(100) NOT NULL, + schedule JSONB NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); +""" + +INSERT_SCHEDULE_QUERY = """ +INSERT INTO {} (id, task_name, schedule) +VALUES ($1, $2, $3) +ON CONFLICT (id) DO UPDATE +SET task_name = EXCLUDED.task_name, + schedule = EXCLUDED.schedule, + updated_at = NOW(); +""" + +SELECT_SCHEDULES_QUERY = """ +SELECT id, task_name, schedule +FROM {}; +""" + +DELETE_ALL_SCHEDULES_QUERY = """ +DELETE FROM {}; +""" diff --git a/src/taskiq_pg/asyncpg/result_backend.py b/src/taskiq_pg/asyncpg/result_backend.py index 6888971..5964cc3 100644 --- a/src/taskiq_pg/asyncpg/result_backend.py +++ b/src/taskiq_pg/asyncpg/result_backend.py @@ -111,7 +111,7 @@ async def get_result( ), ) if not self.keep_results: - _ = await self._database_pool.execute( + await self._database_pool.execute( DELETE_RESULT_QUERY.format( self.table_name, ), diff --git a/src/taskiq_pg/asyncpg/schedule_source.py b/src/taskiq_pg/asyncpg/schedule_source.py new file mode 100644 index 0000000..0e5bcb3 --- /dev/null +++ b/src/taskiq_pg/asyncpg/schedule_source.py @@ -0,0 +1,164 @@ +import json +import typing as tp +import uuid +from logging import getLogger + +import asyncpg +from pydantic import ValidationError +from taskiq import ScheduledTask, ScheduleSource +from 
taskiq.abc.broker import AsyncBroker + +from taskiq_pg.asyncpg.queries import ( + CREATE_SCHEDULES_TABLE_QUERY, + DELETE_ALL_SCHEDULES_QUERY, + INSERT_SCHEDULE_QUERY, + SELECT_SCHEDULES_QUERY, +) + + +logger = getLogger("taskiq_pg.asyncpg_schedule_source") + + +class AsyncpgScheduleSource(ScheduleSource): + """Schedule source that uses asyncpg to store schedules in PostgreSQL.""" + + _database_pool: "asyncpg.Pool[asyncpg.Record]" + + def __init__( + self, + broker: AsyncBroker, + dsn: str | tp.Callable[[], str] = "postgresql://postgres:postgres@localhost:5432/postgres", + table_name: str = "taskiq_schedules", + **connect_kwargs: tp.Any, + ) -> None: + """ + Initialize the PostgreSQL scheduler source. + + Sets up a scheduler source that stores scheduled tasks in a PostgreSQL database. + This scheduler source manages task schedules, allowing for persistent storage and retrieval of scheduled tasks + across application restarts. + + Args: + dsn: PostgreSQL connection string + table_name: Name of the table to store scheduled tasks. Will be created automatically if it doesn't exist. + broker: The TaskIQ broker instance to use for finding and managing tasks. + Required if startup_schedule is provided. + **connect_kwargs: Additional keyword arguments passed to the database connection pool. + + """ + self._broker: tp.Final = broker + self._dsn: tp.Final = dsn + self._table_name: tp.Final = table_name + self._connect_kwargs: tp.Final = connect_kwargs + + @property + def dsn(self) -> str | None: + """ + Get the DSN string. + + Returns the DSN string or None if not set. 
+        """
+        if callable(self._dsn):
+            return self._dsn()
+        return self._dsn
+
+    async def _update_schedules_on_startup(self, schedules: list[ScheduledTask]) -> None:
+        """Update schedules in the database on startup: truncate the table and insert new ones."""
+        async with self._database_pool.acquire() as connection, connection.transaction():
+            await connection.execute(DELETE_ALL_SCHEDULES_QUERY.format(self._table_name))
+            for schedule in schedules:
+                schedule.model_dump_json(
+                    exclude={"schedule_id", "task_name"},
+                )
+                await self._database_pool.execute(
+                    INSERT_SCHEDULE_QUERY.format(self._table_name),
+                    str(schedule.schedule_id),
+                    schedule.task_name,
+                    schedule.model_dump_json(
+                        exclude={"schedule_id", "task_name"},
+                    ),
+                )
+
+    def _get_schedules_from_broker_tasks(self) -> list[ScheduledTask]:
+        """Extract schedules from the broker's registered tasks."""
+        scheduled_tasks_for_creation: list[ScheduledTask] = []
+        for task_name, task in self._broker.get_all_tasks().items():
+            if "schedule" not in task.labels:
+                logger.debug("Task %s has no schedule, skipping", task_name)
+                continue
+            if not isinstance(task.labels["schedule"], list):
+                logger.warning(
+                    "Schedule for task %s is not a list, skipping",
+                    task_name,
+                )
+            for schedule in task.labels["schedule"]:
+                try:
+                    new_schedule = ScheduledTask.model_validate(
+                        {
+                            "task_name": task_name,
+                            "labels": schedule.get("labels", {}),
+                            "args": schedule.get("args", []),
+                            "kwargs": schedule.get("kwargs", {}),
+                            "schedule_id": str(uuid.uuid4()),
+                            "cron": schedule.get("cron", None),
+                            "cron_offset": schedule.get("cron_offset", None),
+                            "time": schedule.get("time", None),
+                        },
+                    )
+                    scheduled_tasks_for_creation.append(new_schedule)
+                except ValidationError:
+                    logger.exception(
+                        "Schedule for task %s is not valid, skipping",
+                        task_name,
+                    )
+                    continue
+        return scheduled_tasks_for_creation
+
+    async def startup(self) -> None:
+        """
+        Initialize the schedule source. 
+ + Construct new connection pool, create new table for schedules if not exists + and fill table with schedules from task labels. + """ + self._database_pool = await asyncpg.create_pool( + dsn=self.dsn, + **self._connect_kwargs, + ) + await self._database_pool.execute( + CREATE_SCHEDULES_TABLE_QUERY.format( + self._table_name, + ), + ) + scheduled_tasks_for_creation = self._get_schedules_from_broker_tasks() + await self._update_schedules_on_startup(scheduled_tasks_for_creation) + + async def shutdown(self) -> None: + """Close the connection pool.""" + if getattr(self, "_database_pool", None) is not None: + await self._database_pool.close() + + async def get_schedules(self) -> list["ScheduledTask"]: + """Fetch schedules from the database.""" + async with self._database_pool.acquire() as conn: + rows_with_schedules = await conn.fetch( + SELECT_SCHEDULES_QUERY.format(self._table_name), + ) + schedules = [] + for row in rows_with_schedules: + schedule = json.loads(row["schedule"]) + schedules.append( + ScheduledTask.model_validate( + { + "schedule_id": str(row["id"]), + "task_name": row["task_name"], + "labels": schedule["labels"], + "args": schedule["args"], + "kwargs": schedule["kwargs"], + "cron": schedule["cron"], + "cron_offset": schedule["cron_offset"], + "time": schedule["time"], + }, + ), + ) + return schedules diff --git a/src/taskiq_pg/psqlpy/broker.py b/src/taskiq_pg/psqlpy/broker.py index f1a16fd..5f843a5 100644 --- a/src/taskiq_pg/psqlpy/broker.py +++ b/src/taskiq_pg/psqlpy/broker.py @@ -156,7 +156,7 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]: :yields: AckableMessage instances. """ - if self.read_conn is None: + if self.write_pool is None: msg = "Call startup before starting listening." raise ValueError(msg) if self._queue is None: