diff --git a/taskiq_aio_pika/broker.py b/taskiq_aio_pika/broker.py index 0cc9601..ce44f41 100644 --- a/taskiq_aio_pika/broker.py +++ b/taskiq_aio_pika/broker.py @@ -336,7 +336,15 @@ def with_queues(self, *queues: Queue) -> Self: """ Replace existing queues with new ones. - :param queues: queues to add. + Commonly used in worker entry-point factory functions to narrow which + queue(s) a specific worker process listens to, while the full broker + (with all queues) is still used for publishing via kick(). + + Note: this only affects which queues the worker *listens* to. + Routing at kick() time is still determined by the ``queue_name`` label + on the message (or the single-queue fallback for unlabelled tasks). + + :param queues: queues to listen to. :return: self. """ self._task_queues = list(queues) @@ -372,18 +380,17 @@ async def kick(self, message: BrokerMessage) -> None: ) delay = parse_val(float, message.labels.get("delay")) - if len(self._task_queues) == 1: - routing_key_name = ( - self._task_queues[0].routing_key or self._task_queues[0].name - ) - else: - routing_key_name = ( - parse_val( - str, - message.labels.get(self._label_for_routing), - ) - or "" - ) + # An explicit routing label always takes precedence, regardless of how + # many queues are in _task_queues. This is important when with_queues() + # has been used to narrow the listening scope to a single queue: without + # this check the len==1 shortcut below would silently override the label + # and route every outgoing message to the worker's own queue. + routing_key_from_label = parse_val( + str, + message.labels.get(self._label_for_routing), + ) + if routing_key_from_label: + routing_key_name = routing_key_from_label if self._exchange.type == ExchangeType.DIRECT and routing_key_name not in { queue.routing_key or queue.name for queue in self._task_queues }: @@ -391,6 +398,14 @@ async def kick(self, message: BrokerMessage) -> None: f"Routing key '{routing_key_name}' is not valid. " f"Check routing keys and queue names in broker queues.", ) + elif len(self._task_queues) == 1: + # Backward-compatible shortcut: single-queue brokers don't need to + # annotate every task with an explicit queue_name label. + routing_key_name = ( + self._task_queues[0].routing_key or self._task_queues[0].name + ) + else: + routing_key_name = "" if delay is None: exchange = await self.write_channel.get_exchange(