fix: kick() ignores routing label when _task_queues is narrowed to 1 by with_queues()#50
Conversation
When with_queues() is used to narrow a broker to a single queue (e.g.
in a worker factory function), kick() hits the `len == 1` shortcut and
hardcodes the routing key to that queue's name, silently ignoring any
explicit queue_name label on the message.
This breaks the common pattern of using a multi-queue broker with
per-worker factory functions, where tasks dispatched from inside a
running task need to route to a different queue:
broker = AioPikaBroker(..., task_queues=[queue_a, queue_b, queue_c])
@broker.task(queue_name="queue_b")
async def mail_task(): ...
@broker.task # routes to queue_a by default
async def default_task():
await mail_task.kiq() # silently routed to queue_a, NOT queue_b
def get_worker_a():
return broker.with_queues(queue_a) # narrows _task_queues to 1
Fix: check the routing label first; only fall back to the single-queue
shortcut (backward-compatible) when no explicit label is present.
There was a problem hiding this comment.
Pull request overview
Fixes a routing bug in AioPikaBroker.kick() where an explicit queue_name label was ignored when with_queues() narrowed the broker to a single queue, causing messages to be published to the wrong routing key.
Changes:
- Prioritize the routing label in
kick()and only fall back to the single-queue shortcut when no label is present. - Expand
with_queues()docstring to describe the worker-narrowing usage pattern and how routing is determined.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| Commonly used in worker entry-point factory functions to narrow which | ||
| queue(s) a specific worker process listens to, while the full broker | ||
| (with all queues) is still used for publishing via kick(). | ||
|
|
||
| Note: this only affects which queues the worker *listens* to. | ||
| Routing at kick() time is still determined by the ``queue_name`` label | ||
| on the message (or the single-queue fallback for unlabelled tasks). |
There was a problem hiding this comment.
The updated with_queues() docstring claims the "full broker (with all queues) is still used for publishing via kick()" and that this method "only affects which queues the worker listens to". In the current implementation with_queues() replaces self._task_queues, which is also used during startup() for queue declaration/binding and in kick() for the single-queue fallback and DIRECT routing validation. Please reword the docstring to match the actual behavior, or (if the intent is truly listen-only) split the internal state so narrowing only affects consumption while publishing/validation still sees the full queue set.
| routing_key_name = routing_key_from_label | ||
| if self._exchange.type == ExchangeType.DIRECT and routing_key_name not in { | ||
| queue.routing_key or queue.name for queue in self._task_queues | ||
| }: |
There was a problem hiding this comment.
With ExchangeType.DIRECT, the new label-first behavior will now raise IncorrectRoutingKeyError whenever with_queues() has narrowed _task_queues and a message is labelled for a routing key outside that narrowed set. That prevents cross-queue publishing from a narrowed worker even when the target queue exists/bound elsewhere (and is a behavior change from the previous silent misroute). Consider validating against the broker’s full configured queue set (separate from the listen set), or relaxing/adjusting this validation when _task_queues has been narrowed for listening.
| # An explicit routing label always takes precedence, regardless of how | ||
| # many queues are in _task_queues. This is important when with_queues() | ||
| # has been used to narrow the listening scope to a single queue: without | ||
| # this check the len==1 shortcut below would silently override the label | ||
| # and route every outgoing message to the worker's own queue. | ||
| routing_key_from_label = parse_val( | ||
| str, | ||
| message.labels.get(self._label_for_routing), | ||
| ) | ||
| if routing_key_from_label: | ||
| routing_key_name = routing_key_from_label | ||
| if self._exchange.type == ExchangeType.DIRECT and routing_key_name not in { | ||
| queue.routing_key or queue.name for queue in self._task_queues | ||
| }: | ||
| raise IncorrectRoutingKeyError( | ||
| f"Routing key '{routing_key_name}' is not valid. " | ||
| f"Check routing keys and queue names in broker queues.", | ||
| ) | ||
| elif len(self._task_queues) == 1: | ||
| # Backward-compatible shortcut: single-queue brokers don't need to | ||
| # annotate every task with an explicit queue_name label. | ||
| routing_key_name = ( | ||
| self._task_queues[0].routing_key or self._task_queues[0].name | ||
| ) | ||
| else: | ||
| routing_key_name = "" |
There was a problem hiding this comment.
No test currently covers the regression scenario described in the PR: calling kick() (or task .kiq()) from a broker instance that has been narrowed to a single queue via with_queues(), while publishing a message with an explicit queue_name label for a different queue. Adding a routing test that reproduces this (and asserts the message lands in the labelled queue) would prevent this from regressing again.
|
I think the correct approach here is to use two separate brokers: one (returned by The current behavior is intentional — |
|
Thanks for the explanation and for taking the time to clarify this. |
Problem
The multi-queue feature introduced in 0.6.0 encourages a pattern where one broker is configured with all queues, and worker entry-point factory functions use
with_queues()to narrow which queue each worker process listens to:Workers are started with:
Bug
When
default_taskruns inside theget_worker_defaultworker and callsawait send_email.kiq(), the message is silently routed toqueue_defaultinstead ofqueue_mail.The root cause is in
kick():After
with_queues(queue_default)narrows_task_queuesto 1 item, every outgoingkick()call hits the shortcut and routes toqueue_default— thequeue_name="queue_mail"label is never read.On a TOPIC exchange this produces no error; the message is simply delivered to the wrong queue.
Fix
Check the routing label first. Only fall back to the single-queue shortcut when no explicit label is present (preserving full backward compatibility for single-queue brokers that don't annotate their tasks):
Also updated the
with_queues()docstring to clarify its intended role (narrowing the listening scope only) vs. its unintended side-effect on kick routing that this PR removes.Impact
queue_namelabels are unaffected (they never set the label, so they always hit theelif len == 1path)kiq()calls from inside a narrowed worker