From d16f182b5a6ca62dfae0ebfcb9245a12eb72b774 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 3 Mar 2026 10:04:35 +0100 Subject: [PATCH] fix: kick() ignores routing label when _task_queues is narrowed to 1 When with_queues() is used to narrow a broker to a single queue (e.g. in a worker factory function), kick() hits the `len == 1` shortcut and hardcodes the routing key to that queue's name, silently ignoring any explicit queue_name label on the message. This breaks the common pattern of using a multi-queue broker with per-worker factory functions, where tasks dispatched from inside a running task need to route to a different queue: broker = AioPikaBroker(..., task_queues=[queue_a, queue_b, queue_c]) @broker.task(queue_name="queue_b") async def mail_task(): ... @broker.task # routes to queue_a by default async def default_task(): await mail_task.kiq() # silently routed to queue_a, NOT queue_b def get_worker_a(): return broker.with_queues(queue_a) # narrows _task_queues to 1 Fix: check the routing label first; only fall back to the single-queue shortcut (backward-compatible) when no explicit label is present. --- taskiq_aio_pika/broker.py | 41 ++++++++++++++++++++++++++------------- 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/taskiq_aio_pika/broker.py b/taskiq_aio_pika/broker.py index 0cc9601..ce44f41 100644 --- a/taskiq_aio_pika/broker.py +++ b/taskiq_aio_pika/broker.py @@ -336,7 +336,15 @@ def with_queues(self, *queues: Queue) -> Self: """ Replace existing queues with new ones. - :param queues: queues to add. + Commonly used in worker entry-point factory functions to narrow which + queue(s) a specific worker process listens to, while the full broker + (with all queues) is still used for publishing via kick(). + + Note: this only affects which queues the worker *listens* to. + Routing at kick() time is still determined by the ``queue_name`` label + on the message (or the single-queue fallback for unlabelled tasks). + + :param queues: queues to listen to. :return: self. """ self._task_queues = list(queues) @@ -372,18 +380,17 @@ async def kick(self, message: BrokerMessage) -> None: ) delay = parse_val(float, message.labels.get("delay")) - if len(self._task_queues) == 1: - routing_key_name = ( - self._task_queues[0].routing_key or self._task_queues[0].name - ) - else: - routing_key_name = ( - parse_val( - str, - message.labels.get(self._label_for_routing), - ) - or "" - ) + # An explicit routing label always takes precedence, regardless of how + # many queues are in _task_queues. This is important when with_queues() + # has been used to narrow the listening scope to a single queue: without + # this check the len==1 shortcut below would silently override the label + # and route every outgoing message to the worker's own queue. + routing_key_from_label = parse_val( + str, + message.labels.get(self._label_for_routing), + ) + if routing_key_from_label: + routing_key_name = routing_key_from_label if self._exchange.type == ExchangeType.DIRECT and routing_key_name not in { queue.routing_key or queue.name for queue in self._task_queues }: @@ -391,6 +398,14 @@ async def kick(self, message: BrokerMessage) -> None: f"Routing key '{routing_key_name}' is not valid. " f"Check routing keys and queue names in broker queues.", ) + elif len(self._task_queues) == 1: + # Backward-compatible shortcut: single-queue brokers don't need to + # annotate every task with an explicit queue_name label. + routing_key_name = ( + self._task_queues[0].routing_key or self._task_queues[0].name + ) + else: + routing_key_name = "" if delay is None: exchange = await self.write_channel.get_exchange(