Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 28 additions & 13 deletions taskiq_aio_pika/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,15 @@ def with_queues(self, *queues: Queue) -> Self:
"""
Replace existing queues with new ones.

:param queues: queues to add.
Commonly used in worker entry-point factory functions to narrow which
queue(s) a specific worker process listens to, while the full broker
(with all queues) is still used for publishing via kick().

Note: this only affects which queues the worker *listens* to.
Routing at kick() time is still determined by the ``queue_name`` label
on the message (or the single-queue fallback for unlabelled tasks).
Comment on lines +339 to +345
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The updated with_queues() docstring claims the "full broker (with all queues) is still used for publishing via kick()" and that this method "only affects which queues the worker listens to". In the current implementation with_queues() replaces self._task_queues, which is also used during startup() for queue declaration/binding and in kick() for the single-queue fallback and DIRECT routing validation. Please reword the docstring to match the actual behavior, or (if the intent is truly listen-only) split the internal state so narrowing only affects consumption while publishing/validation still sees the full queue set.

Copilot uses AI. Check for mistakes.

:param queues: queues to listen to.
:return: self.
"""
self._task_queues = list(queues)
Expand Down Expand Up @@ -372,25 +380,32 @@ async def kick(self, message: BrokerMessage) -> None:
)
delay = parse_val(float, message.labels.get("delay"))

if len(self._task_queues) == 1:
routing_key_name = (
self._task_queues[0].routing_key or self._task_queues[0].name
)
else:
routing_key_name = (
parse_val(
str,
message.labels.get(self._label_for_routing),
)
or ""
)
# An explicit routing label always takes precedence, regardless of how
# many queues are in _task_queues. This is important when with_queues()
# has been used to narrow the listening scope to a single queue: without
# this check the len==1 shortcut below would silently override the label
# and route every outgoing message to the worker's own queue.
routing_key_from_label = parse_val(
str,
message.labels.get(self._label_for_routing),
)
if routing_key_from_label:
routing_key_name = routing_key_from_label
if self._exchange.type == ExchangeType.DIRECT and routing_key_name not in {
queue.routing_key or queue.name for queue in self._task_queues
}:
Comment on lines +393 to 396
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With ExchangeType.DIRECT, the new label-first behavior will now raise IncorrectRoutingKeyError whenever with_queues() has narrowed _task_queues and a message is labelled for a routing key outside that narrowed set. That prevents cross-queue publishing from a narrowed worker even when the target queue exists/bound elsewhere (and is a behavior change from the previous silent misroute). Consider validating against the broker’s full configured queue set (separate from the listen set), or relaxing/adjusting this validation when _task_queues has been narrowed for listening.

Copilot uses AI. Check for mistakes.
raise IncorrectRoutingKeyError(
f"Routing key '{routing_key_name}' is not valid. "
f"Check routing keys and queue names in broker queues.",
)
elif len(self._task_queues) == 1:
# Backward-compatible shortcut: single-queue brokers don't need to
# annotate every task with an explicit queue_name label.
routing_key_name = (
self._task_queues[0].routing_key or self._task_queues[0].name
)
else:
routing_key_name = ""
Comment on lines +383 to +408
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No test currently covers the regression scenario described in the PR: calling kick() (or task .kiq()) from a broker instance that has been narrowed to a single queue via with_queues(), while publishing a message with an explicit queue_name label for a different queue. Adding a routing test that reproduces this (and asserts the message lands in the labelled queue) would prevent this from regressing again.

Copilot uses AI. Check for mistakes.

if delay is None:
exchange = await self.write_channel.get_exchange(
Expand Down
Loading