Skip to content

fix: kick() ignores routing label when _task_queues is narrowed to 1 by with_queues()#50

Closed
NicolasFerec wants to merge 1 commit intotaskiq-python:masterfrom
NicolasFerec:fix/kick-ignores-routing-label-when-single-queue
Closed

fix: kick() ignores routing label when _task_queues is narrowed to 1 by with_queues()#50
NicolasFerec wants to merge 1 commit intotaskiq-python:masterfrom
NicolasFerec:fix/kick-ignores-routing-label-when-single-queue

Conversation

@NicolasFerec
Copy link

Problem

The multi-queue feature introduced in 0.6.0 encourages a pattern where one broker is configured with all queues, and worker entry-point factory functions use with_queues() to narrow which queue each worker process listens to:

broker = AioPikaBroker(
    "amqp://...",
    task_queues=[queue_default, queue_mail, queue_heavy],
)

@broker.task                                    # routes to queue_default
async def default_task(): ...

@broker.task(queue_name="queue_mail")           # routes to queue_mail
async def send_email(): ...

def get_worker_default():
    return broker.with_queues(queue_default)    # worker only listens to queue_default

def get_worker_mail():
    return broker.with_queues(queue_mail)       # worker only listens to queue_mail

Workers are started with:

taskiq worker ... -fsd myapp.broker:get_worker_default
taskiq worker ... -fsd myapp.broker:get_worker_mail

Bug

When default_task runs inside the get_worker_default worker and calls await send_email.kiq(), the message is silently routed to queue_default instead of queue_mail.

The root cause is in kick():

if len(self._task_queues) == 1:
    # ❌ Hardcodes the routing key to the single queue's name,
    #    completely ignoring any queue_name label on the message.
    routing_key_name = (
        self._task_queues[0].routing_key or self._task_queues[0].name
    )
else:
    routing_key_name = (
        parse_val(str, message.labels.get(self._label_for_routing)) or ""
    )

After with_queues(queue_default) narrows _task_queues to 1 item, every outgoing kick() call hits the shortcut and routes to queue_default — the queue_name="queue_mail" label is never read.

On a TOPIC exchange this produces no error; the message is simply delivered to the wrong queue.

Fix

Check the routing label first. Only fall back to the single-queue shortcut when no explicit label is present (preserving full backward compatibility for single-queue brokers that don't annotate their tasks):

routing_key_from_label = parse_val(str, message.labels.get(self._label_for_routing))
if routing_key_from_label:
    routing_key_name = routing_key_from_label
    if self._exchange.type == ExchangeType.DIRECT and routing_key_name not in {
        queue.routing_key or queue.name for queue in self._task_queues
    }:
        raise IncorrectRoutingKeyError(...)
elif len(self._task_queues) == 1:
    # Backward-compatible: single-queue brokers don't need a queue_name label.
    routing_key_name = (
        self._task_queues[0].routing_key or self._task_queues[0].name
    )
else:
    routing_key_name = ""

Also updated the with_queues() docstring to clarify its intended role (narrowing the listening scope only) vs. its unintended side-effect on kick routing that this PR removes.

Impact

  • ✅ Fully backward compatible — brokers with a single queue and no queue_name labels are unaffected (they never set the label, so they always hit the elif len == 1 path)
  • ✅ Fixes cross-queue kiq() calls from inside a narrowed worker
  • ✅ DIRECT exchange validation still present for labelled messages

When with_queues() is used to narrow a broker to a single queue (e.g.
in a worker factory function), kick() hits the `len == 1` shortcut and
hardcodes the routing key to that queue's name, silently ignoring any
explicit queue_name label on the message.

This breaks the common pattern of using a multi-queue broker with
per-worker factory functions, where tasks dispatched from inside a
running task need to route to a different queue:

    broker = AioPikaBroker(..., task_queues=[queue_a, queue_b, queue_c])

    @broker.task(queue_name="queue_b")
    async def mail_task(): ...

    @broker.task  # routes to queue_a by default
    async def default_task():
        await mail_task.kiq()  # silently routed to queue_a, NOT queue_b

    def get_worker_a():
        return broker.with_queues(queue_a)  # narrows _task_queues to 1

Fix: check the routing label first; only fall back to the single-queue
shortcut (backward-compatible) when no explicit label is present.
Copilot AI review requested due to automatic review settings March 3, 2026 09:04
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Fixes a routing bug in AioPikaBroker.kick() where an explicit queue_name label was ignored when with_queues() narrowed the broker to a single queue, causing messages to be published to the wrong routing key.

Changes:

  • Prioritize the routing label in kick() and only fall back to the single-queue shortcut when no label is present.
  • Expand with_queues() docstring to describe the worker-narrowing usage pattern and how routing is determined.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +339 to +345
Commonly used in worker entry-point factory functions to narrow which
queue(s) a specific worker process listens to, while the full broker
(with all queues) is still used for publishing via kick().

Note: this only affects which queues the worker *listens* to.
Routing at kick() time is still determined by the ``queue_name`` label
on the message (or the single-queue fallback for unlabelled tasks).
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The updated with_queues() docstring claims the "full broker (with all queues) is still used for publishing via kick()" and that this method "only affects which queues the worker listens to". In the current implementation with_queues() replaces self._task_queues, which is also used during startup() for queue declaration/binding and in kick() for the single-queue fallback and DIRECT routing validation. Please reword the docstring to match the actual behavior, or (if the intent is truly listen-only) split the internal state so narrowing only affects consumption while publishing/validation still sees the full queue set.

Copilot uses AI. Check for mistakes.
Comment on lines +393 to 396
routing_key_name = routing_key_from_label
if self._exchange.type == ExchangeType.DIRECT and routing_key_name not in {
queue.routing_key or queue.name for queue in self._task_queues
}:
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With ExchangeType.DIRECT, the new label-first behavior will now raise IncorrectRoutingKeyError whenever with_queues() has narrowed _task_queues and a message is labelled for a routing key outside that narrowed set. That prevents cross-queue publishing from a narrowed worker even when the target queue exists/bound elsewhere (and is a behavior change from the previous silent misroute). Consider validating against the broker’s full configured queue set (separate from the listen set), or relaxing/adjusting this validation when _task_queues has been narrowed for listening.

Copilot uses AI. Check for mistakes.
Comment on lines +383 to +408
# An explicit routing label always takes precedence, regardless of how
# many queues are in _task_queues. This is important when with_queues()
# has been used to narrow the listening scope to a single queue: without
# this check the len==1 shortcut below would silently override the label
# and route every outgoing message to the worker's own queue.
routing_key_from_label = parse_val(
str,
message.labels.get(self._label_for_routing),
)
if routing_key_from_label:
routing_key_name = routing_key_from_label
if self._exchange.type == ExchangeType.DIRECT and routing_key_name not in {
queue.routing_key or queue.name for queue in self._task_queues
}:
raise IncorrectRoutingKeyError(
f"Routing key '{routing_key_name}' is not valid. "
f"Check routing keys and queue names in broker queues.",
)
elif len(self._task_queues) == 1:
# Backward-compatible shortcut: single-queue brokers don't need to
# annotate every task with an explicit queue_name label.
routing_key_name = (
self._task_queues[0].routing_key or self._task_queues[0].name
)
else:
routing_key_name = ""
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No test currently covers the regression scenario described in the PR: calling kick() (or task .kiq()) from a broker instance that has been narrowed to a single queue via with_queues(), while publishing a message with an explicit queue_name label for a different queue. Adding a routing test that reproduces this (and asserts the message lands in the labelled queue) would prevent this from regressing again.

Copilot uses AI. Check for mistakes.
@danfimov
Copy link
Member

danfimov commented Mar 3, 2026

I think the correct approach here is to use two separate brokers: one (returned by get_worker_default) narrowed via with_queues() for listening a specific queue, and the broker that aware of all queues for publishing.

The current behavior is intentional — _task_queues is used to validate the routing key at kick() time. If the broker only knows about queue_default, it cannot guarantee that queue_name="queue_mail" is valid. This fix would silently allow publishing to an unknown routing key, which is a worse problem than the one being solved.

@NicolasFerec
Copy link
Author

Thanks for the explanation and for taking the time to clarify this.
That makes sense. I’ll close the PR 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants