From d576b277ecf67427ae87a3e69b106c3e85f9ff76 Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Mon, 8 Jun 2026 12:44:39 -0400 Subject: [PATCH] fix(producer): TaskProducer can produce to partitions --- clients/python/src/examples/tasks.py | 2 +- clients/python/src/taskbroker_client/types.py | 4 ++-- clients/python/src/taskbroker_client/worker/producer.py | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/clients/python/src/examples/tasks.py b/clients/python/src/examples/tasks.py index 8fd3ab56..b1f90e5d 100644 --- a/clients/python/src/examples/tasks.py +++ b/clients/python/src/examples/tasks.py @@ -140,6 +140,6 @@ def producer_factory() -> KafkaProducer: for i in range(production_count): logger.debug(f"Producing message {i} onto topic {destination_topic}...") producer.produce( - topic=Topic(destination_topic), + dest=Topic(destination_topic), payload=KafkaPayload(key=None, value=payload, headers=[]), ) diff --git a/clients/python/src/taskbroker_client/types.py b/clients/python/src/taskbroker_client/types.py index 800e8b14..fbc1f4c6 100644 --- a/clients/python/src/taskbroker_client/types.py +++ b/clients/python/src/taskbroker_client/types.py @@ -5,7 +5,7 @@ from arroyo.backends.abstract import ProducerFuture from arroyo.backends.kafka import KafkaPayload -from arroyo.types import BrokerValue, Topic +from arroyo.types import BrokerValue, Partition, Topic from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation, TaskActivationStatus TaskHeaders = dict[str, str] @@ -38,7 +38,7 @@ class ProducerProtocol(Protocol): """Interface for producers that tasks depend on.""" def produce( - self, topic: Topic, payload: KafkaPayload + self, dest: Topic | Partition, payload: KafkaPayload ) -> ProducerFuture[BrokerValue[KafkaPayload]]: ... diff --git a/clients/python/src/taskbroker_client/worker/producer.py b/clients/python/src/taskbroker_client/worker/producer.py index 4b1c0f8d..f3757598 100644 --- a/clients/python/src/taskbroker_client/worker/producer.py +++ b/clients/python/src/taskbroker_client/worker/producer.py @@ -5,7 +5,7 @@ from arroyo.backends.abstract import ProducerFuture, SimpleProducerFuture from arroyo.backends.kafka import KafkaPayload -from arroyo.types import BrokerValue, Topic +from arroyo.types import BrokerValue, Partition, Topic from taskbroker_client.constants import TASK_PRODUCER_MAX_PENDING_FUTURES from taskbroker_client.metrics import MetricsBackend, NoOpMetricsBackend @@ -68,7 +68,7 @@ def collect_futures() -> set[ProducerFuture[BrokerValue[KafkaPayload]]]: def produce( self, - topic: Topic, + dest: Topic | Partition, payload: KafkaPayload, callbacks: Sequence[Callable[[Future[BrokerValue[KafkaPayload]]], Any]] = [], ) -> None: @@ -79,12 +79,12 @@ def produce( to the future via the `callbacks` arg. Args: - topic: Topic to produce to. + dest: Topic (or specific partition) to produce to. payload: KafkaPayload to produce. callbacks: List of Callables to add to the future as done callbacks. The future itself is the only arg passed to the callback. """ - future = self._get().produce(topic, payload) + future = self._get().produce(dest, payload) self.track_future(future) if callbacks: # Arroyo producers can return a SimpleProducerFuture,