Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 94 additions & 39 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,70 +9,125 @@

PostgreSQL integration for Taskiq with support for asyncpg, psqlpy and aiopg drivers.

See more example of usage in [the documentation](https://danfimov.github.io/taskiq-postgres/).
See more example of usage in [the documentation](https://danfimov.github.io/taskiq-postgres/) or [examples directory](https://github.com/danfimov/taskiq-postgres/examples).

## Installation

Depend on your preferred PostgreSQL driver, you can install this library:
Depending on your preferred PostgreSQL driver, you can install this library with the corresponding extra:

=== "asyncpg"
```bash
# with asyncpg
pip install taskiq-postgres[asyncpg]

```bash
pip install taskiq-postgres[asyncpg]
```
# with psqlpy
pip install taskiq-postgres[psqlpy]

=== "psqlpy"
# with aiopg
pip install taskiq-postgres[aiopg]
```

```bash
pip install taskiq-postgres[psqlpy]
```
## Quick start

=== "aiopg"
### Basic task processing

```bash
pip install taskiq-postgres[aiopg]
```
1. Define your broker with [asyncpg](https://github.com/MagicStack/asyncpg):

```python
# broker_example.py
import asyncio
from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgResultBackend

## Usage example

Simple example of usage with [asyncpg](https://github.com/MagicStack/asyncpg):
dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
broker = AsyncpgBroker(dsn).with_result_backend(AsyncpgResultBackend(dsn))

```python
# broker.py
import asyncio

from taskiq_pg.asyncpg import AsyncpgResultBackend, AsyncpgBroker
@broker.task("solve_all_problems")
async def best_task_ever() -> None:
"""Solve all problems in the world."""
await asyncio.sleep(2)
print("All problems are solved!")

result_backend = AsyncpgResultBackend(
dsn="postgres://postgres:postgres@localhost:5432/postgres",
)

broker = AsyncpgBroker(
dsn="postgres://postgres:postgres@localhost:5432/postgres",
).with_result_backend(result_backend)
async def main():
await broker.startup()
task = await best_task_ever.kiq()
print(await task.wait_result())
await broker.shutdown()


@broker.task
async def best_task_ever() -> None:
"""Solve all problems in the world."""
await asyncio.sleep(5.5)
print("All problems are solved!")
if __name__ == "__main__":
asyncio.run(main())
```

2. Start a worker to process tasks (by default taskiq runs two instances of worker):

async def main():
await broker.startup()
task = await best_task_ever.kiq()
print(await task.wait_result())
await broker.shutdown()
```bash
taskiq worker broker_example:broker
```

3. Run `broker_example.py` file to send a task to the worker:

if __name__ == "__main__":
asyncio.run(main())
```
```bash
python broker_example.py
```

Your experience with other drivers will be pretty similar. Just change the import statement and that's it.

### Task scheduling

1. Define your broker and schedule source:

```python
# scheduler_example.py
import asyncio
from taskiq import TaskiqScheduler
from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgScheduleSource


dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
broker = AsyncpgBroker(dsn)
scheduler = TaskiqScheduler(
broker=broker,
sources=[AsyncpgScheduleSource(
dsn=dsn,
broker=broker,
)],
)


@broker.task(
task_name="solve_all_problems",
schedule=[
{
"cron": "*/1 * * * *", # type: str, either cron or time should be specified.
"cron_offset": None, # type: str | timedelta | None, can be omitted.
"time": None, # type: datetime | None, either cron or time should be specified.
"args": [], # type list[Any] | None, can be omitted.
"kwargs": {}, # type: dict[str, Any] | None, can be omitted.
"labels": {}, # type: dict[str, Any] | None, can be omitted.
},
],
)
async def best_task_ever() -> None:
"""Solve all problems in the world."""
await asyncio.sleep(2)
print("All problems are solved!")

```

2. Start worker processes:

```bash
taskiq worker scheduler_example:broker
```

3. Run scheduler process:

```bash
taskiq scheduler scheduler_example:scheduler
```

## Motivation

There are too many libraries for PostgreSQL and Taskiq integration. Although they have different view on interface and different functionality.
Expand Down
36 changes: 35 additions & 1 deletion docs/contributing.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,37 @@
---
title: Contributing
title: Contributing and Development
---

## Development

This project uses modern Python development tools:

- [uv](https://github.com/astral-sh/uv) — fast Python package installer and resolver
- [ruff](https://github.com/astral-sh/ruff) — extremely fast Python linter and formatter

### Setup Development Environment

```bash
# Clone the repository
git clone https://github.com/danfimov/taskiq-postgres.git
cd taskiq-postgres

# Create a virtual environment (optional but recommended)
make venv

# Install dependencies
make init
```

You can see other useful commands by running `make help`.


## Contributing

Contributions are welcome! Please:

1. Fork the repository
2. Create a feature branch
3. Add tests for new functionality
4. Ensure all tests pass
5. Submit a pull request
Loading