Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion clients/python/src/examples/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,6 @@ def producer_factory() -> KafkaProducer:
for i in range(production_count):
logger.debug(f"Producing message {i} onto topic {destination_topic}...")
producer.produce(
topic=Topic(destination_topic),
dest=Topic(destination_topic),
payload=KafkaPayload(key=None, value=payload, headers=[]),
)
4 changes: 2 additions & 2 deletions clients/python/src/taskbroker_client/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from arroyo.backends.abstract import ProducerFuture
from arroyo.backends.kafka import KafkaPayload
from arroyo.types import BrokerValue, Topic
from arroyo.types import BrokerValue, Partition, Topic
from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation, TaskActivationStatus

TaskHeaders = dict[str, str]
Expand Down Expand Up @@ -38,7 +38,7 @@ class ProducerProtocol(Protocol):
"""Interface for producers that tasks depend on."""

def produce(
self, topic: Topic, payload: KafkaPayload
self, dest: Topic | Partition, payload: KafkaPayload
) -> ProducerFuture[BrokerValue[KafkaPayload]]: ...


Expand Down
8 changes: 4 additions & 4 deletions clients/python/src/taskbroker_client/worker/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from arroyo.backends.abstract import ProducerFuture, SimpleProducerFuture
from arroyo.backends.kafka import KafkaPayload
from arroyo.types import BrokerValue, Topic
from arroyo.types import BrokerValue, Partition, Topic

from taskbroker_client.constants import TASK_PRODUCER_MAX_PENDING_FUTURES
from taskbroker_client.metrics import MetricsBackend, NoOpMetricsBackend
Expand Down Expand Up @@ -68,7 +68,7 @@ def collect_futures() -> set[ProducerFuture[BrokerValue[KafkaPayload]]]:

def produce(
self,
topic: Topic,
dest: Topic | Partition,
Comment thread
cursor[bot] marked this conversation as resolved.
payload: KafkaPayload,
callbacks: Sequence[Callable[[Future[BrokerValue[KafkaPayload]]], Any]] = [],
) -> None:
Comment thread
bmckerry marked this conversation as resolved.
Expand All @@ -79,12 +79,12 @@ def produce(
to the future via the `callbacks` arg.

Args:
topic: Topic to produce to.
dest: Topic (or specific partition) to produce to.
payload: KafkaPayload to produce.
callbacks: List of Callables to add to the future as done callbacks. The future itself
is the only arg passed to the callback.
"""
future = self._get().produce(topic, payload)
future = self._get().produce(dest, payload)
self.track_future(future)
if callbacks:
# Arroyo producers can return a SimpleProducerFuture,
Expand Down
Loading