From 0b9d9e849937169e4c3e84358563609260f384f0 Mon Sep 17 00:00:00 2001
From: zembunia
Date: Mon, 11 Sep 2017 09:22:27 -0400
Subject: [PATCH 1/2] Config parameter 'node_not_ready_retry_timeout_ms'

---
 kafka/consumer/group.py       |  4 ++++
 kafka/coordinator/base.py     | 15 +++++++++++++--
 kafka/coordinator/consumer.py |  8 ++++++--
 3 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index b7fbd8395..e8d42b536 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -205,6 +205,9 @@ class KafkaConsumer(six.Iterator):
             metrics. Default: 2
         metrics_sample_window_ms (int): The maximum age in milliseconds of
             samples used to compute metrics. Default: 30000
+        node_not_ready_retry_timeout_ms (int): The timeout used to detect
+            the broker not being available so that NodeNotReadyError is raised.
+            Default: None
         selector (selectors.BaseSelector): Provide a specific selector
             implementation to use for I/O multiplexing.
             Default: selectors.DefaultSelector
@@ -269,6 +272,7 @@ class KafkaConsumer(six.Iterator):
         'metrics_num_samples': 2,
         'metrics_sample_window_ms': 30000,
         'metric_group_prefix': 'consumer',
+        'node_not_ready_retry_timeout_ms': None,
         'selector': selectors.DefaultSelector,
         'exclude_internal_topics': True,
         'sasl_mechanism': None,
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index af0936c9d..e9f3986fb 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -53,6 +53,7 @@ class BaseCoordinator(object):
         'group_id': 'kafka-python-default-group',
         'session_timeout_ms': 30000,
         'heartbeat_interval_ms': 3000,
+        'node_not_ready_retry_timeout_ms': None,
         'retry_backoff_ms': 100,
         'api_version': (0, 9),
         'metric_group_prefix': '',
@@ -65,7 +66,7 @@ def __init__(self, client, metrics, **configs):
                 partition assignment (if enabled), and to use for fetching and
                 committing offsets. Default: 'kafka-python-default-group'
             session_timeout_ms (int): The timeout used to detect failures when
-                using Kafka's group managementment facilities. Default: 30000
+                using Kafka's group management facilities. Default: 30000
             heartbeat_interval_ms (int): The expected time in milliseconds
                 between heartbeats to the consumer coordinator when using
                 Kafka's group management feature. Heartbeats are used to ensure
@@ -75,6 +76,9 @@ def __init__(self, client, metrics, **configs):
                 should be set no higher than 1/3 of that value. It can be
                 adjusted even lower to control the expected time for normal
                 rebalances. Default: 3000
+            node_not_ready_retry_timeout_ms (int): The timeout used to detect
+                the broker not being available so that NodeNotReadyError is raised.
+                Default: None
             retry_backoff_ms (int): Milliseconds to backoff when retrying on
                 errors. Default: 100.
         """
@@ -199,6 +203,8 @@ def ensure_coordinator_known(self):
         """Block until the coordinator for this group is known
         (and we have an active connection -- java client uses unsent queue).
""" + node_not_ready_retry_timeout_ms = self.config['node_not_ready_retry_timeout_ms'] + node_not_ready_retry_start_time = time.time() while self.coordinator_unknown(): # Prior to 0.8.2 there was no group coordinator @@ -215,7 +221,12 @@ def ensure_coordinator_known(self): if future.failed(): if future.retriable(): - if getattr(future.exception, 'invalid_metadata', False): + if node_not_ready_retry_timeout_ms is not None and isinstance(future.exception, Errors.NodeNotReadyError): + self._client.poll(timeout_ms=node_not_ready_retry_timeout_ms) + node_not_ready_retry_timeout_ms -= (time.time() - node_not_ready_retry_start_time) * 1000 + if node_not_ready_retry_timeout_ms <= 0: + raise future.exception # pylint: disable-msg=raising-bad-type + elif getattr(future.exception, 'invalid_metadata', False): log.debug('Requesting metadata for group coordinator request: %s', future.exception) metadata_update = self._client.cluster.request_update() self._client.poll(future=metadata_update) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 123699f24..addae0928 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -37,7 +37,8 @@ class ConsumerCoordinator(BaseCoordinator): 'retry_backoff_ms': 100, 'api_version': (0, 9), 'exclude_internal_topics': True, - 'metric_group_prefix': 'consumer' + 'metric_group_prefix': 'consumer', + 'node_not_ready_retry_timeout_ms': None } def __init__(self, client, subscription, metrics, **configs): @@ -68,13 +69,16 @@ def __init__(self, client, subscription, metrics, **configs): adjusted even lower to control the expected time for normal rebalances. Default: 3000 session_timeout_ms (int): The timeout used to detect failures when - using Kafka's group managementment facilities. Default: 30000 + using Kafka's group management facilities. Default: 30000 retry_backoff_ms (int): Milliseconds to backoff when retrying on errors. Default: 100. exclude_internal_topics (bool): Whether records from internal topics (such as offsets) should be exposed to the consumer. If set to True the only way to receive records from an internal topic is subscribing to it. Requires 0.10+. Default: True + node_not_ready_retry_timeout_ms (int): The timeout used to detect + the broker not being available so that NodeNotReadyError is raised. + Default: None """ super(ConsumerCoordinator, self).__init__(client, metrics, **configs) From a391cc9fc507b15f15e377a549b6596533f700bb Mon Sep 17 00:00:00 2001 From: zembunia Date: Mon, 12 Feb 2018 19:34:06 +0300 Subject: [PATCH 2/2] coordinator_not_ready_retry_timeout_ms instead of node_not_ready_retry_timeout_ms --- kafka/consumer/group.py | 8 ++++---- kafka/coordinator/base.py | 24 ++++++++++++++---------- kafka/coordinator/consumer.py | 9 +++++---- 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 27bbadd42..28307999f 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -220,9 +220,9 @@ class KafkaConsumer(six.Iterator): metrics. Default: 2 metrics_sample_window_ms (int): The maximum age in milliseconds of samples used to compute metrics. Default: 30000 - node_not_ready_retry_timeout_ms (int): The timeout used to detect - the broker not being available so that NodeNotReadyError is raised. - Default: None + coordinator_not_ready_retry_timeout_ms (int): The timeout used to detect + that the Kafka coordinator is not available. If 'None', the default + behavior of polling indefinitely would be kept. 
Default: None selector (selectors.BaseSelector): Provide a specific selector implementation to use for I/O multiplexing. Default: selectors.DefaultSelector @@ -292,7 +292,7 @@ class KafkaConsumer(six.Iterator): 'metrics_num_samples': 2, 'metrics_sample_window_ms': 30000, 'metric_group_prefix': 'consumer', - 'node_not_ready_retry_timeout_ms': None, + 'coordinator_not_ready_retry_timeout_ms': None, 'selector': selectors.DefaultSelector, 'exclude_internal_topics': True, 'sasl_mechanism': None, diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 6313270e9..2bcb75c5b 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -85,7 +85,7 @@ class BaseCoordinator(object): 'group_id': 'kafka-python-default-group', 'session_timeout_ms': 10000, 'heartbeat_interval_ms': 3000, - 'node_not_ready_retry_timeout_ms': None, + 'coordinator_not_ready_retry_timeout_ms': None, 'max_poll_interval_ms': 300000, 'retry_backoff_ms': 100, 'api_version': (0, 10, 1), @@ -109,8 +109,9 @@ def __init__(self, client, metrics, **configs): should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. Default: 3000 - node_not_ready_retry_timeout_ms (int): The timeout used to detect - the broker not being available so that NodeNotReadyError is raised. + coordinator_not_ready_retry_timeout_ms (int): The timeout used to + detect that the Kafka coordinator is not available. If 'None', + the default behavior of polling indefinitely would be kept. Default: None retry_backoff_ms (int): Milliseconds to backoff when retrying on errors. Default: 100. @@ -244,12 +245,12 @@ def coordinator(self): else: return self.coordinator_id - def ensure_coordinator_ready(self): + def ensure_coordinator_ready(self, timeout_ms=None): """Block until the coordinator for this group is known (and we have an active connection -- java client uses unsent queue). 
""" - node_not_ready_retry_timeout_ms = self.config['node_not_ready_retry_timeout_ms'] - node_not_ready_retry_start_time = time.time() + retry_timeout_ms = timeout_ms or self.config['coordinator_not_ready_retry_timeout_ms'] + retry_start_time_in_secs = time.time() with self._lock: while self.coordinator_unknown(): @@ -267,11 +268,14 @@ def ensure_coordinator_ready(self): if future.failed(): if future.retriable(): - if node_not_ready_retry_timeout_ms is not None and isinstance(future.exception, Errors.NodeNotReadyError): - self._client.poll(timeout_ms=node_not_ready_retry_timeout_ms) - node_not_ready_retry_timeout_ms -= (time.time() - node_not_ready_retry_start_time) * 1000 - if node_not_ready_retry_timeout_ms <= 0: + if retry_timeout_ms is not None and isinstance( + future.exception, (Errors.NodeNotReadyError, Errors.NoBrokersAvailable)): + remaining_retry_timeout_ms = retry_timeout_ms - ( + time.time() - retry_start_time_in_secs) * 1000 + if remaining_retry_timeout_ms <= 0: raise future.exception # pylint: disable-msg=raising-bad-type + self._client.poll(timeout_ms=min( + self.config['retry_backoff_ms'], remaining_retry_timeout_ms)) elif getattr(future.exception, 'invalid_metadata', False): log.debug('Requesting metadata for group coordinator request: %s', future.exception) metadata_update = self._client.cluster.request_update() diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index f090ef651..02e47d61a 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -38,7 +38,7 @@ class ConsumerCoordinator(BaseCoordinator): 'api_version': (0, 10, 1), 'exclude_internal_topics': True, 'metric_group_prefix': 'consumer', - 'node_not_ready_retry_timeout_ms': None + 'coordinator_not_ready_retry_timeout_ms': None } def __init__(self, client, subscription, metrics, **configs): @@ -76,9 +76,10 @@ def __init__(self, client, subscription, metrics, **configs): (such as offsets) should be exposed to the consumer. If set to True the only way to receive records from an internal topic is subscribing to it. Requires 0.10+. Default: True - node_not_ready_retry_timeout_ms (int): The timeout used to detect - the broker not being available so that NodeNotReadyError is raised. - Default: None + coordinator_not_ready_retry_timeout_ms (int): The timeout used to + detect that the Kafka coordinator is not available. If 'None', + the default behavior of polling indefinitely would be kept. + Default: None. """ super(ConsumerCoordinator, self).__init__(client, metrics, **configs)