diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 27be4588d..e58b8518b 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -237,6 +237,9 @@ class KafkaConsumer(six.Iterator): metrics. Default: 2 metrics_sample_window_ms (int): The maximum age in milliseconds of samples used to compute metrics. Default: 30000 + coordinator_not_ready_retry_timeout_ms (int): The timeout used to detect + that the Kafka coordinator is not available. If 'None', the default + behavior of polling indefinitely would be kept. Default: None selector (selectors.BaseSelector): Provide a specific selector implementation to use for I/O multiplexing. Default: selectors.DefaultSelector @@ -316,6 +319,7 @@ class KafkaConsumer(six.Iterator): 'metrics_num_samples': 2, 'metrics_sample_window_ms': 30000, 'metric_group_prefix': 'consumer', + 'coordinator_not_ready_retry_timeout_ms': None, 'selector': selectors.DefaultSelector, 'exclude_internal_topics': True, 'sasl_mechanism': None, diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index a30b5a9b8..5ea12cfa0 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -80,6 +80,7 @@ class BaseCoordinator(object): 'group_id': 'kafka-python-default-group', 'session_timeout_ms': 10000, 'heartbeat_interval_ms': 3000, + 'coordinator_not_ready_retry_timeout_ms': None, 'max_poll_interval_ms': 300000, 'retry_backoff_ms': 100, 'api_version': (0, 10, 1), @@ -103,6 +104,10 @@ def __init__(self, client, metrics, **configs): should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. Default: 3000 + coordinator_not_ready_retry_timeout_ms (int): The timeout used to + detect that the Kafka coordinator is not available. If 'None', + the default behavior of polling indefinitely would be kept. + Default: None retry_backoff_ms (int): Milliseconds to backoff when retrying on errors. Default: 100. """ @@ -234,10 +239,12 @@ def coordinator(self): else: return self.coordinator_id - def ensure_coordinator_ready(self): + def ensure_coordinator_ready(self, timeout_ms=None): """Block until the coordinator for this group is known (and we have an active connection -- java client uses unsent queue). """ + retry_timeout_ms = timeout_ms or self.config['coordinator_not_ready_retry_timeout_ms'] + retry_start_time_in_secs = time.time() with self._client._lock, self._lock: while self.coordinator_unknown(): @@ -255,7 +262,15 @@ def ensure_coordinator_ready(self): if future.failed(): if future.retriable(): - if getattr(future.exception, 'invalid_metadata', False): + if retry_timeout_ms is not None and isinstance( + future.exception, (Errors.NodeNotReadyError, Errors.NoBrokersAvailable)): + remaining_retry_timeout_ms = retry_timeout_ms - ( + time.time() - retry_start_time_in_secs) * 1000 + if remaining_retry_timeout_ms <= 0: + raise future.exception # pylint: disable-msg=raising-bad-type + self._client.poll(timeout_ms=min( + self.config['retry_backoff_ms'], remaining_retry_timeout_ms)) + elif getattr(future.exception, 'invalid_metadata', False): log.debug('Requesting metadata for group coordinator request: %s', future.exception) metadata_update = self._client.cluster.request_update() self._client.poll(future=metadata_update) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 9c662ce7f..c1e5d828a 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -39,7 +39,8 @@ class ConsumerCoordinator(BaseCoordinator): 'retry_backoff_ms': 100, 'api_version': (0, 10, 1), 'exclude_internal_topics': True, - 'metric_group_prefix': 'consumer' + 'metric_group_prefix': 'consumer', + 'coordinator_not_ready_retry_timeout_ms': None } def __init__(self, client, subscription, metrics, **configs): @@ -77,6 +78,10 @@ def __init__(self, client, subscription, metrics, **configs): (such as offsets) should be exposed to the consumer. If set to True the only way to receive records from an internal topic is subscribing to it. Requires 0.10+. Default: True + coordinator_not_ready_retry_timeout_ms (int): The timeout used to + detect that the Kafka coordinator is not available. If 'None', + the default behavior of polling indefinitely would be kept. + Default: None. """ super(ConsumerCoordinator, self).__init__(client, metrics, **configs)