fix: ensure that only one worker processing task#12
Conversation
There was a problem hiding this comment.
Pull Request Overview
This PR implements single-delivery guarantees to ensure only one worker processes each task, fixing issue #9. The implementation uses database-level atomic operations to prevent race conditions between multiple workers.
- Adds a
statuscolumn to track message processing state (pending→processing) - Replaces SELECT queries with atomic UPDATE operations that claim messages
- Includes comprehensive integration test to verify single-delivery behavior
Reviewed Changes
Copilot reviewed 13 out of 14 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/integration/test_broker_single_delivery.py | Adds integration test verifying only one worker processes each message |
| src/taskiq_pg/psqlpy/queries.py | Updates table schema and replaces SELECT with atomic CLAIM query |
| src/taskiq_pg/psqlpy/broker.py | Implements atomic message claiming with exception handling |
| src/taskiq_pg/asyncpg/queries.py | Updates table schema and replaces SELECT with atomic CLAIM query |
| src/taskiq_pg/asyncpg/broker.py | Implements atomic message claiming logic |
| pyproject.toml | Updates taskiq dependency and adds linting rule exceptions |
| mkdocs.yml | Adds tutorial navigation structure |
| examples/example_with_broker.py | Adds working example with broker usage |
| docs/tutorial/*.md | Adds basic tutorial documentation structure |
| docs/contributing.md | Adds contributing documentation placeholder |
| README.md | Updates CI badge filter |
Comments suppressed due to low confidence (1)
tests/integration/test_broker_single_delivery.py:1
- [nitpick] The test contains Russian comments which may hinder code maintainability for international contributors. Consider using English comments for better code accessibility."
from __future__ import annotations
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
| pg_dsn: str, | ||
| broker_class: type[AsyncpgBroker | PSQLPyBroker], | ||
| ) -> None: | ||
| # Given: уникальные имена таблицы и канала, два брокера, одна задача |
There was a problem hiding this comment.
[nitpick] The test contains Russian comments which may hinder code maintainability for international contributors. Consider using English comments for better code accessibility."
| # Подключение для проверок состояния в таблице | ||
| conn: asyncpg.Connection = await asyncpg.connect(dsn=pg_dsn) | ||
|
|
||
| # Сообщение для публикации |
There was a problem hiding this comment.
[nitpick] The test contains Russian comments which may hinder code maintainability for international contributors. Consider using English comments for better code accessibility."
| labels={}, | ||
| ) | ||
|
|
||
| # When: стартуем брокеры и два слушателя, публикуем одно сообщение |
There was a problem hiding this comment.
[nitpick] The test contains Russian comments which may hinder code maintainability for international contributors. Consider using English comments for better code accessibility."
| # Запускаем ожидание первого сообщения у обоих слушателей до публикации, | ||
| # чтобы оба гарантированно получили NOTIFY. |
There was a problem hiding this comment.
[nitpick] The test contains Russian comments which may hinder code maintainability for international contributors. Consider using English comments for better code accessibility."
| return_when=asyncio.FIRST_COMPLETED, | ||
| ) | ||
|
|
||
| # Then: только один слушатель получает сообщение |
There was a problem hiding this comment.
[nitpick] The test contains Russian comments which may hinder code maintainability for international contributors. Consider using English comments for better code accessibility."
| winner_task: asyncio.Task = next(iter(done)) | ||
| ack_message = tp.cast("tp.Any", winner_task.result()) | ||
|
|
||
| # До подтверждения проверяем, что статус в таблице = 'processing' |
There was a problem hiding this comment.
[nitpick] The test contains Russian comments which may hinder code maintainability for international contributors. Consider using English comments for better code accessibility."
| """ | ||
|
|
||
| SELECT_MESSAGE_QUERY = "SELECT * FROM {} WHERE id = $1" | ||
| CLAIM_MESSAGE_QUERY = "UPDATE {} SET status = 'processing' WHERE id = $1 AND status = 'pending' RETURNING id, message" |
There was a problem hiding this comment.
The CLAIM_MESSAGE_QUERY only returns 'id' and 'message' fields, but the psqlpy version returns all fields (*). This inconsistency could lead to maintenance issues if additional fields need to be accessed in the future."
| CLAIM_MESSAGE_QUERY = "UPDATE {} SET status = 'processing' WHERE id = $1 AND status = 'pending' RETURNING id, message" | |
| CLAIM_MESSAGE_QUERY = "UPDATE {} SET status = 'processing' WHERE id = $1 AND status = 'pending' RETURNING *" |
| CLAIM_MESSAGE_QUERY.format(self.table_name), | ||
| [message_id], | ||
| ) | ||
| except ConnectionExecuteError: # message was claimed by another worker |
There was a problem hiding this comment.
Catching the broad ConnectionExecuteError exception could mask other database connection issues unrelated to message claiming. Consider catching a more specific exception or adding additional error handling to distinguish between claim conflicts and genuine connection problems."
| except ConnectionExecuteError: # message was claimed by another worker | |
| except ConnectionExecuteError as exc: # message was claimed by another worker or other connection issue | |
| # Check if the error is due to a claim conflict (e.g., unique violation) | |
| # Adjust the condition below to match your DB's claim conflict error code/message | |
| if hasattr(exc, "pgcode") and exc.pgcode == "23505": # unique_violation | |
| # Message was claimed by another worker | |
| continue | |
| logger.exception("Database connection error while claiming message") |
closes #9