1- from __future__ import absolute_import
1+ from __future__ import absolute_import , division
22
33import atexit
44import copy
@@ -538,7 +538,7 @@ def close(self, timeout=None):
538538
539539 def partitions_for (self , topic ):
540540 """Returns set of all known partitions for the topic."""
541- max_wait = self .config ['max_block_ms' ] / 1000.0
541+ max_wait = self .config ['max_block_ms' ] / 1000
542542 return self ._wait_on_metadata (topic , max_wait )
543543
544544 def _max_usable_produce_magic (self ):
@@ -596,19 +596,29 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
596596 assert not (value is None and key is None ), 'Need at least one: key or value'
597597 key_bytes = value_bytes = None
598598 try :
599- self ._wait_on_metadata (topic , self .config ['max_block_ms' ] / 1000.0 )
600-
601- key_bytes = self ._serialize (
602- self .config ['key_serializer' ],
603- topic , key )
604- value_bytes = self ._serialize (
605- self .config ['value_serializer' ],
606- topic , value )
607- assert type (key_bytes ) in (bytes , bytearray , memoryview , type (None ))
608- assert type (value_bytes ) in (bytes , bytearray , memoryview , type (None ))
609-
610- partition = self ._partition (topic , partition , key , value ,
611- key_bytes , value_bytes )
599+ assigned_partition = None
600+ elapsed = 0.0
601+ begin = time .time ()
602+ timeout = self .config ['max_block_ms' ] / 1000
603+ while assigned_partition is None and elapsed < timeout :
604+ elapsed = time .time () - begin
605+ self ._wait_on_metadata (topic , timeout - elapsed )
606+
607+ key_bytes = self ._serialize (
608+ self .config ['key_serializer' ],
609+ topic , key )
610+ value_bytes = self ._serialize (
611+ self .config ['value_serializer' ],
612+ topic , value )
613+ assert type (key_bytes ) in (bytes , bytearray , memoryview , type (None ))
614+ assert type (value_bytes ) in (bytes , bytearray , memoryview , type (None ))
615+
616+ assigned_partition = self ._partition (topic , partition , key , value ,
617+ key_bytes , value_bytes )
618+ if assigned_partition is None :
619+ raise Errors .KafkaTimeoutError ("Failed to assign partition for message after %s secs." % timeout )
620+ else :
621+ partition = assigned_partition
612622
613623 if headers is None :
614624 headers = []
@@ -710,6 +720,10 @@ def _wait_on_metadata(self, topic, max_wait):
710720 if partitions is not None :
711721 return partitions
712722
723+ if elapsed >= max_wait :
724+ raise Errors .KafkaTimeoutError (
725+ "Failed to update metadata after %.1f secs." % (max_wait ,))
726+
713727 if not metadata_event :
714728 metadata_event = threading .Event ()
715729
@@ -720,13 +734,13 @@ def _wait_on_metadata(self, topic, max_wait):
720734 future .add_both (lambda e , * args : e .set (), metadata_event )
721735 self ._sender .wakeup ()
722736 metadata_event .wait (max_wait - elapsed )
723- elapsed = time .time () - begin
724737 if not metadata_event .is_set ():
725738 raise Errors .KafkaTimeoutError (
726739 "Failed to update metadata after %.1f secs." % (max_wait ,))
727740 elif topic in self ._metadata .unauthorized_topics :
728741 raise Errors .TopicAuthorizationFailedError (topic )
729742 else :
743+ elapsed = time .time () - begin
730744 log .debug ("_wait_on_metadata woke after %s secs." , elapsed )
731745
732746 def _serialize (self , f , topic , data ):
@@ -738,16 +752,18 @@ def _serialize(self, f, topic, data):
738752
739753 def _partition (self , topic , partition , key , value ,
740754 serialized_key , serialized_value ):
755+ all_partitions = self ._metadata .partitions_for_topic (topic )
756+ available = self ._metadata .available_partitions_for_topic (topic )
757+ if all_partitions is None or available is None :
758+ return None
741759 if partition is not None :
742760 assert partition >= 0
743- assert partition in self . _metadata . partitions_for_topic ( topic ) , 'Unrecognized partition'
761+ assert partition in all_partitions , 'Unrecognized partition'
744762 return partition
745763
746- all_partitions = sorted (self ._metadata .partitions_for_topic (topic ))
747- available = list (self ._metadata .available_partitions_for_topic (topic ))
748764 return self .config ['partitioner' ](serialized_key ,
749- all_partitions ,
750- available )
765+ sorted ( all_partitions ) ,
766+ list ( available ) )
751767
752768 def metrics (self , raw = False ):
753769 """Get metrics on producer performance.
0 commit comments