-
Notifications
You must be signed in to change notification settings - Fork 15
fix: kick() ignores routing label when _task_queues is narrowed to 1 by with_queues() #50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -336,7 +336,15 @@ def with_queues(self, *queues: Queue) -> Self: | |
| """ | ||
| Replace existing queues with new ones. | ||
|
|
||
| :param queues: queues to add. | ||
| Commonly used in worker entry-point factory functions to narrow which | ||
| queue(s) a specific worker process listens to, while the full broker | ||
| (with all queues) is still used for publishing via kick(). | ||
|
|
||
| Note: this only affects which queues the worker *listens* to. | ||
| Routing at kick() time is still determined by the ``queue_name`` label | ||
| on the message (or the single-queue fallback for unlabelled tasks). | ||
|
|
||
| :param queues: queues to listen to. | ||
| :return: self. | ||
| """ | ||
| self._task_queues = list(queues) | ||
|
|
@@ -372,25 +380,32 @@ async def kick(self, message: BrokerMessage) -> None: | |
| ) | ||
| delay = parse_val(float, message.labels.get("delay")) | ||
|
|
||
| if len(self._task_queues) == 1: | ||
| routing_key_name = ( | ||
| self._task_queues[0].routing_key or self._task_queues[0].name | ||
| ) | ||
| else: | ||
| routing_key_name = ( | ||
| parse_val( | ||
| str, | ||
| message.labels.get(self._label_for_routing), | ||
| ) | ||
| or "" | ||
| ) | ||
| # An explicit routing label always takes precedence, regardless of how | ||
| # many queues are in _task_queues. This is important when with_queues() | ||
| # has been used to narrow the listening scope to a single queue: without | ||
| # this check the len==1 shortcut below would silently override the label | ||
| # and route every outgoing message to the worker's own queue. | ||
| routing_key_from_label = parse_val( | ||
| str, | ||
| message.labels.get(self._label_for_routing), | ||
| ) | ||
| if routing_key_from_label: | ||
| routing_key_name = routing_key_from_label | ||
| if self._exchange.type == ExchangeType.DIRECT and routing_key_name not in { | ||
| queue.routing_key or queue.name for queue in self._task_queues | ||
| }: | ||
|
Comment on lines
+393
to
396
|
||
| raise IncorrectRoutingKeyError( | ||
| f"Routing key '{routing_key_name}' is not valid. " | ||
| f"Check routing keys and queue names in broker queues.", | ||
| ) | ||
| elif len(self._task_queues) == 1: | ||
| # Backward-compatible shortcut: single-queue brokers don't need to | ||
| # annotate every task with an explicit queue_name label. | ||
| routing_key_name = ( | ||
| self._task_queues[0].routing_key or self._task_queues[0].name | ||
| ) | ||
| else: | ||
| routing_key_name = "" | ||
|
Comment on lines
+383
to
+408
|
||
|
|
||
| if delay is None: | ||
| exchange = await self.write_channel.get_exchange( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The updated
with_queues()docstring claims the "full broker (with all queues) is still used for publishing via kick()" and that this method "only affects which queues the worker listens to". In the current implementationwith_queues()replacesself._task_queues, which is also used duringstartup()for queue declaration/binding and inkick()for the single-queue fallback and DIRECT routing validation. Please reword the docstring to match the actual behavior, or (if the intent is truly listen-only) split the internal state so narrowing only affects consumption while publishing/validation still sees the full queue set.