Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ repos:
- id: end-of-file-fixer

- repo: https://github.com/crate-ci/typos
rev: v1.38.1
rev: v1.40.0
hooks:
- id: typos

Expand Down
115 changes: 115 additions & 0 deletions docs/tutorial/result_backend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
---
title: Result Backend
---

## Basic usage

You can store task results in Postgres using one of result backend classes from this package.

You can define your broker with result backend like this:

```python
import asyncio
from taskiq import TaskiqBroker
# 1. Import AsyncpgBroker and AsyncpgResultBackend (or other result backend you want to use)
from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgResultBackend

# 2. Define your broker with result backend
dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
broker = AsyncpgBroker(dsn).with_result_backend(AsyncpgResultBackend(dsn=dsn)

# 3. Register task
@broker.task(task_name="answer_for_everything")
async def answer_for_everything() -> None:
await asyncio.sleep(2)
return 42

async def main():
# 4. Start broker, call task and wait for result
await broker.startup()
task = await best_task_ever.kiq()
print(await task.wait_result())
await broker.shutdown()


if __name__ == "__main__":
asyncio.run(main())
```

After running this code, you should see `42` printed in the console. Plus the result will be stored in the Postgres database in `taskiq_results` (by default).

## Customization

You can customize the result backend by providing additional parameters to the constructor.

- `keep_results` - whatever to keep results after they are fetched. Default is `True`. Suitable if you don't want to store results forever.
- `table_name` - name of the table to store results in. Default is `taskiq_results`.
- `field_for_task_id` - type of the field to store task IDs. Default is `VarChar`. But you can pick `Uuid` or `Text` if you want.
- `serializer` - serializer to use for serializing results. Default is `PickleSerializer`. But if you want human readable results you can use `JsonSerializer` from `taskiq.serializers` for example.

## Task progress

You can also store task progress using result backend. To do this, you need to use `set_progress` method from `ProgressTracker`:

```python
import asyncio
from taskiq import TaskiqBroker
# 1. Import AsyncpgBroker and AsyncpgResultBackend (or other result backend you want to use)
from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgResultBackend

# 2. Define your broker with result backend
dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
broker = AsyncpgBroker(dsn).with_result_backend(AsyncpgResultBackend(dsn=dsn)

# 3. Register task
@broker.task("solve_all_problems")
async def best_task_ever(
progress_tracker: ProgressTracker[Any] = TaskiqDepends(), # noqa: B008
) -> int:
# 4. Set progress with state
state_dict = {"start_message": "Starting to solve problems"}
await progress_tracker.set_progress(TaskState.STARTED, state_dict)

await asyncio.sleep(2)

# You can also use custom states, but progress will be rewritten on each call (it's update not merge)
state_dict.update({"halfway_message": "Halfway done!"})
await progress_tracker.set_progress("halfway", state_dict)
await progress_tracker.set_progress(TaskState.STARTED, state_dict)

await asyncio.sleep(2)

return 42

async def main():
# 5. Start broker
await broker.startup()
task = await best_task_ever.kiq()

# 6. Check progress on start
await asyncio.sleep(1)
print(await task.get_progress())

# 7. Check progress on halfway
await asyncio.sleep(2)
print(await task.get_progress())

# 8. Get final result
print(await task.wait_result())

await broker.shutdown()


if __name__ == "__main__":
asyncio.run(main())
```

If you run this code, you should see something like this in the console:

```bash
> uv run python -m examples.example_with_progress

state='STARTED' meta={'start_message': 'Starting to solve problems'}
state='STARTED' meta={'start_message': 'Starting to solve problems', 'halfway_message': 'Halfway done!'}
is_err=False log=None return_value=42 execution_time=4.01 labels={} error=None
```
61 changes: 61 additions & 0 deletions examples/example_with_progress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""
How to run:

1) Run worker in one terminal:
uv run taskiq worker examples.example_with_progress:broker --workers 1

2) Run this script in another terminal:
uv run python -m examples.example_with_progress
"""

import asyncio
from typing import Any

from taskiq import TaskiqDepends
from taskiq.depends.progress_tracker import ProgressTracker, TaskState

from taskiq_pg.psycopg import PsycopgBroker, PsycopgResultBackend


dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
broker = PsycopgBroker(dsn).with_result_backend(PsycopgResultBackend(dsn))


@broker.task("solve_all_problems")
async def best_task_ever(
progress_tracker: ProgressTracker[Any] = TaskiqDepends(), # noqa: B008
) -> int:
state_dict = {"start_message": "Starting to solve problems"}
await progress_tracker.set_progress(TaskState.STARTED, state_dict)

await asyncio.sleep(2)

state_dict.update({"halfway_message": "Halfway done!"})
await progress_tracker.set_progress("halfway", state_dict)
await progress_tracker.set_progress(TaskState.STARTED, state_dict)

await asyncio.sleep(2)

return 42


async def main():
await broker.startup()
task = await best_task_ever.kiq()

# check progress on start
await asyncio.sleep(1)
print(await task.get_progress())

# check progress on halfway
await asyncio.sleep(2)
print(await task.get_progress())

# get final result
print(await task.wait_result())

await broker.shutdown()


if __name__ == "__main__":
asyncio.run(main())
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ nav:
- Overview:
- index.md
- Tutorial:
- tutorial/result_backend.md
- tutorial/schedule_source.md
- tutorial/common_issues.md
- API:
Expand Down
30 changes: 15 additions & 15 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,48 +46,48 @@ aiopg = [
"aiopg>=1.4.0",
]
asyncpg = [
"asyncpg>=0.30.0",
"asyncpg>=0.31.0",
]
psqlpy = [
"psqlpy>=0.11.6",
"psqlpy>=0.11.11",
]
psycopg = [
"psycopg[binary,pool]>=3.2.10",
"psycopg[binary,pool]>=3.3.2",
]

[dependency-groups]
dev = [
{include-group = "lint"},
{include-group = "test"},
{include-group = "docs"},
"prek>=0.2.8",
"prek>=0.2.19",
]
lint = [
"ruff>=0.14.0",
"bandit>=1.8.6",
"ruff>=0.14.8",
"bandit>=1.9.2",
"codespell>=2.4.1",
"zizmor>=1.15.2",
"zizmor>=1.18.0",
# type check
"mypy>=1.18.1",
"asyncpg-stubs>=0.30.2",
"mypy>=1.19.0",
"asyncpg-stubs>=0.31.0",
]
test = [
"polyfactory>=2.22.2",
"pytest>=8.4.2",
"pytest-asyncio>=1.1.0",
"polyfactory>=3.1.0",
"pytest>=9.0.1",
"pytest-asyncio>=1.3.0",
"pytest-cov>=7.0.0",
# for database in tests
"sqlalchemy-utils>=0.42.0",
# for faster asyncio loop in tests
"uvloop>=0.22.1",
]
docs = [
"mkdocs-material>=9.6.22",
"mkdocstrings-python>=1.18.2",
"mkdocs-material>=9.7.0",
"mkdocstrings-python>=2.0.1",
]

[build-system]
requires = ["uv_build>=0.8.14,<0.9.0"]
requires = ["uv_build>=0.9,<0.10"]
build-backend = "uv_build"

[tool.uv.build-backend]
Expand Down
7 changes: 1 addition & 6 deletions src/taskiq_pg/_internal/result_backend.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
from __future__ import annotations

import abc
import typing as tp

from taskiq import AsyncResultBackend
from taskiq.abc.serializer import TaskiqSerializer
from taskiq.serializers import PickleSerializer


if tp.TYPE_CHECKING:
from taskiq.abc.serializer import TaskiqSerializer


ReturnType = tp.TypeVar("ReturnType")


Expand Down
22 changes: 19 additions & 3 deletions src/taskiq_pg/aiopg/queries.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,40 @@
CREATE_TABLE_QUERY = """
CREATE TABLE IF NOT EXISTS {} (
task_id {} UNIQUE,
result BYTEA
result BYTEA,
progress BYTEA
)
"""

ADD_PROGRESS_COLUMN_QUERY = """
ALTER TABLE {} ADD COLUMN IF NOT EXISTS progress BYTEA;
"""

CREATE_INDEX_QUERY = """
CREATE INDEX IF NOT EXISTS {}_task_id_idx ON {} USING HASH (task_id)
"""

INSERT_RESULT_QUERY = """
INSERT INTO {} VALUES (%s, %s)
INSERT INTO {} VALUES (%s, %s, NULL)
ON CONFLICT (task_id)
DO UPDATE
SET result = %s
"""

INSERT_PROGRESS_QUERY = """
INSERT INTO {} VALUES (%s, NULL, %s)
ON CONFLICT (task_id)
DO UPDATE
SET progress = %s
"""

SELECT_PROGRESS_QUERY = """
SELECT progress FROM {} WHERE task_id = %s
"""

IS_RESULT_EXISTS_QUERY = """
SELECT EXISTS(
SELECT 1 FROM {} WHERE task_id = %s
SELECT 1 FROM {} WHERE task_id = %s and result IS NOT NULL
)
"""

Expand Down
Loading