Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 37 additions & 21 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import absolute_import
from __future__ import absolute_import, division

import atexit
import copy
Expand Down Expand Up @@ -538,7 +538,7 @@ def close(self, timeout=None):

def partitions_for(self, topic):
"""Returns set of all known partitions for the topic."""
max_wait = self.config['max_block_ms'] / 1000.0
max_wait = self.config['max_block_ms'] / 1000
return self._wait_on_metadata(topic, max_wait)

def _max_usable_produce_magic(self):
Expand Down Expand Up @@ -596,19 +596,29 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
assert not (value is None and key is None), 'Need at least one: key or value'
key_bytes = value_bytes = None
try:
self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)

key_bytes = self._serialize(
self.config['key_serializer'],
topic, key)
value_bytes = self._serialize(
self.config['value_serializer'],
topic, value)
assert type(key_bytes) in (bytes, bytearray, memoryview, type(None))
assert type(value_bytes) in (bytes, bytearray, memoryview, type(None))

partition = self._partition(topic, partition, key, value,
key_bytes, value_bytes)
assigned_partition = None
elapsed = 0.0
begin = time.time()
timeout = self.config['max_block_ms'] / 1000
while assigned_partition is None and elapsed < timeout:
elapsed = time.time() - begin
self._wait_on_metadata(topic, timeout - elapsed)

key_bytes = self._serialize(
self.config['key_serializer'],
topic, key)
value_bytes = self._serialize(
self.config['value_serializer'],
topic, value)
assert type(key_bytes) in (bytes, bytearray, memoryview, type(None))
assert type(value_bytes) in (bytes, bytearray, memoryview, type(None))

assigned_partition = self._partition(topic, partition, key, value,
key_bytes, value_bytes)
if assigned_partition is None:
raise Errors.KafkaTimeoutError("Failed to assign partition for message after %s secs." % timeout)
else:
partition = assigned_partition

if headers is None:
headers = []
Expand Down Expand Up @@ -710,6 +720,10 @@ def _wait_on_metadata(self, topic, max_wait):
if partitions is not None:
return partitions

if elapsed >= max_wait:
raise Errors.KafkaTimeoutError(
"Failed to update metadata after %.1f secs." % (max_wait,))

if not metadata_event:
metadata_event = threading.Event()

Expand All @@ -720,13 +734,13 @@ def _wait_on_metadata(self, topic, max_wait):
future.add_both(lambda e, *args: e.set(), metadata_event)
self._sender.wakeup()
metadata_event.wait(max_wait - elapsed)
elapsed = time.time() - begin
if not metadata_event.is_set():
raise Errors.KafkaTimeoutError(
"Failed to update metadata after %.1f secs." % (max_wait,))
elif topic in self._metadata.unauthorized_topics:
raise Errors.TopicAuthorizationFailedError(topic)
else:
elapsed = time.time() - begin
log.debug("_wait_on_metadata woke after %s secs.", elapsed)

def _serialize(self, f, topic, data):
Expand All @@ -738,16 +752,18 @@ def _serialize(self, f, topic, data):

def _partition(self, topic, partition, key, value,
serialized_key, serialized_value):
all_partitions = self._metadata.partitions_for_topic(topic)
available = self._metadata.available_partitions_for_topic(topic)
if all_partitions is None or available is None:
return None
if partition is not None:
assert partition >= 0
assert partition in self._metadata.partitions_for_topic(topic), 'Unrecognized partition'
assert partition in all_partitions, 'Unrecognized partition'
return partition

all_partitions = sorted(self._metadata.partitions_for_topic(topic))
available = list(self._metadata.available_partitions_for_topic(topic))
return self.config['partitioner'](serialized_key,
all_partitions,
available)
sorted(all_partitions),
list(available))

def metrics(self, raw=False):
"""Get metrics on producer performance.
Expand Down