diff --git a/kafka/client_async.py b/kafka/client_async.py index c8a8ca4ad..c04130c82 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -636,11 +636,14 @@ def poll(self, timeout_ms=None, future=None): Returns: list: responses received (can be empty) """ - if timeout_ms is None: - timeout_ms = self.config['request_timeout_ms'] - elif not isinstance(timeout_ms, (int, float)): + if not isinstance(timeout_ms, (int, float, type(None))): raise TypeError('Invalid type for timeout: %s' % type(timeout_ms)) + begin = time.time() + if timeout_ms is not None: + timeout_at = begin + (timeout_ms / 1000) + else: + timeout_at = begin + (self.config['request_timeout_ms'] / 1000) # Loop for futures, break after first loop if None responses = [] while True: @@ -665,11 +668,12 @@ def poll(self, timeout_ms=None, future=None): if future is not None and future.is_done: timeout = 0 else: + user_timeout_ms = 1000 * max(0, timeout_at - time.time()) idle_connection_timeout_ms = self._idle_expiry_manager.next_check_ms() request_timeout_ms = self._next_ifr_request_timeout_ms() - log.debug("Timeouts: user %f, metadata %f, idle connection %f, request %f", timeout_ms, metadata_timeout_ms, idle_connection_timeout_ms, request_timeout_ms) + log.debug("Timeouts: user %f, metadata %f, idle connection %f, request %f", user_timeout_ms, metadata_timeout_ms, idle_connection_timeout_ms, request_timeout_ms) timeout = min( - timeout_ms, + user_timeout_ms, metadata_timeout_ms, idle_connection_timeout_ms, request_timeout_ms) @@ -683,7 +687,11 @@ def poll(self, timeout_ms=None, future=None): # If all we had was a timeout (future is None) - only do one poll # If we do have a future, we keep looping until it is done - if future is None or future.is_done: + if future is None: + break + elif future.is_done: + break + elif timeout_ms is not None and time.time() >= timeout_at: break return responses diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 2600d7f69..2179e19fc 100644 
--- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -19,6 +19,7 @@ from kafka.record import MemoryRecords from kafka.serializer import Deserializer from kafka.structs import TopicPartition, OffsetAndMetadata, OffsetAndTimestamp +from kafka.util import timeout_ms_fn log = logging.getLogger(__name__) @@ -266,19 +267,7 @@ def _retrieve_offsets(self, timestamps, timeout_ms=None): if not timestamps: return {} - elapsed = 0.0 # noqa: F841 - begin = time.time() - def inner_timeout_ms(fallback=None): - if timeout_ms is None: - return fallback - elapsed = (time.time() - begin) * 1000 - if elapsed >= timeout_ms: - raise Errors.KafkaTimeoutError('Timeout attempting to find coordinator') - ret = max(0, timeout_ms - elapsed) - if fallback is not None: - return min(ret, fallback) - return ret - + inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout attempting to find coordinator') timestamps = copy.copy(timestamps) while True: if not timestamps: @@ -287,6 +276,10 @@ def inner_timeout_ms(fallback=None): future = self._send_list_offsets_requests(timestamps) self._client.poll(future=future, timeout_ms=inner_timeout_ms()) + # Timeout w/o future completion + if not future.is_done: + break + if future.succeeded(): return future.value if not future.retriable(): diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 284d52f04..ce66c9606 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -695,7 +695,8 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True): dict: Map of topic to list of records (may be empty). 
""" begin = time.time() - self._coordinator.poll(timeout_ms=timeout_ms) + if not self._coordinator.poll(timeout_ms=timeout_ms): + return {} # Fetch positions if we have partitions we're subscribed to that we # don't know the offset for @@ -1162,7 +1163,8 @@ def inner_poll_ms(): while time.time() < self._consumer_timeout: - self._coordinator.poll(timeout_ms=inner_poll_ms()) + if not self._coordinator.poll(timeout_ms=inner_poll_ms()): + continue # Fetch offsets for any subscribed partitions that we arent tracking yet if not self._subscription.has_all_fetch_positions(): diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index aa8d05e31..eb4bf7265 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -16,6 +16,7 @@ from kafka.metrics.stats import Avg, Count, Max, Rate from kafka.protocol.find_coordinator import FindCoordinatorRequest from kafka.protocol.group import HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest, DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID +from kafka.util import timeout_ms_fn log = logging.getLogger('kafka.coordinator') @@ -243,19 +244,7 @@ def ensure_coordinator_ready(self, timeout_ms=None): Raises: KafkaTimeoutError if timeout_ms is not None """ - elapsed = 0.0 # noqa: F841 - begin = time.time() - def inner_timeout_ms(fallback=None): - if timeout_ms is None: - return fallback - elapsed = (time.time() - begin) * 1000 - if elapsed >= timeout_ms: - raise Errors.KafkaTimeoutError('Timeout attempting to find coordinator') - ret = max(0, timeout_ms - elapsed) - if fallback is not None: - return min(ret, fallback) - return ret - + inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout attempting to find group coordinator') with self._client._lock, self._lock: while self.coordinator_unknown(): @@ -271,12 +260,17 @@ def inner_timeout_ms(fallback=None): future = self.lookup_coordinator() self._client.poll(future=future, timeout_ms=inner_timeout_ms()) + if not future.is_done: + raise 
Errors.KafkaTimeoutError() + if future.failed(): if future.retriable(): if getattr(future.exception, 'invalid_metadata', False): log.debug('Requesting metadata for group coordinator request: %s', future.exception) metadata_update = self._client.cluster.request_update() self._client.poll(future=metadata_update, timeout_ms=inner_timeout_ms()) + if not metadata_update.is_done: + raise Errors.KafkaTimeoutError() else: time.sleep(inner_timeout_ms(self.config['retry_backoff_ms']) / 1000) else: @@ -349,7 +343,6 @@ def _handle_join_success(self, member_assignment_bytes): log.info("Successfully joined group %s with generation %s", self.group_id, self._generation.generation_id) self.state = MemberState.STABLE - self.rejoin_needed = False if self._heartbeat_thread: self._heartbeat_thread.enable() @@ -366,23 +359,11 @@ def ensure_active_group(self, timeout_ms=None): Raises: KafkaTimeoutError if timeout_ms is not None """ + inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout attempting to join consumer group') with self._client._lock, self._lock: if self._heartbeat_thread is None: self._start_heartbeat_thread() - elapsed = 0.0 # noqa: F841 - begin = time.time() - def inner_timeout_ms(fallback=None): - if timeout_ms is None: - return fallback - elapsed = (time.time() - begin) * 1000 - if elapsed >= timeout_ms: - raise Errors.KafkaTimeoutError('Timeout attempting to find coordinator') - ret = max(0, timeout_ms - elapsed) - if fallback is not None: - return min(ret, fallback) - return ret - while self.need_rejoin() or self._rejoin_incomplete(): self.ensure_coordinator_ready(timeout_ms=inner_timeout_ms()) @@ -440,6 +421,9 @@ def inner_timeout_ms(fallback=None): self._client.poll(future=future, timeout_ms=inner_timeout_ms()) + if not future.is_done: + raise Errors.KafkaTimeoutError() + if future.succeeded(): self._on_join_complete(self._generation.generation_id, self._generation.member_id, @@ -447,6 +431,7 @@ def inner_timeout_ms(fallback=None): future.value) self.join_future = 
def timeout_ms_fn(timeout_ms, error_message):
    """Build a callable that reports the time left in a fixed timeout budget.

    The clock starts when this factory is called, so a single budget can be
    shared across several sequential blocking calls.

    Arguments:
        timeout_ms (int or float or None): total budget in milliseconds.
            None means there is no deadline.
        error_message (str): message passed to KafkaTimeoutError when the
            budget is exhausted.

    Returns:
        callable: inner_timeout_ms(fallback=None). With no deadline it
        returns fallback unchanged. Otherwise it returns the milliseconds
        remaining, capped at fallback when fallback is not None.

    Raises:
        KafkaTimeoutError: raised by the returned callable (not by this
        factory) once the elapsed time is >= timeout_ms.
    """
    begin = time.time()

    def inner_timeout_ms(fallback=None):
        # No deadline configured: the caller's fallback (possibly None) wins.
        if timeout_ms is None:
            return fallback
        elapsed = (time.time() - begin) * 1000
        # The comparison is >=, so a timeout_ms of 0 expires on the first call.
        if elapsed >= timeout_ms:
            raise KafkaTimeoutError(error_message)
        ret = max(0, timeout_ms - elapsed)
        if fallback is not None:
            return min(ret, fallback)
        return ret

    return inner_timeout_ms
client._poll.assert_called_with(12345.678) + client.poll() + client._poll.assert_called() client._init_connect.assert_called_once_with('foobar') # poll while connecting should not attempt a new connection client._connecting.add('foobar') client._can_connect.reset_mock() - client.poll(timeout_ms=12345678) - client._poll.assert_called_with(12345.678) + client.poll() + client._poll.assert_called() assert not client._can_connect.called - assert not client._metadata_refresh_in_progress diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index bc04eed48..7d22346d0 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -57,7 +57,7 @@ def consumer_thread(i): bootstrap_servers=connect_str, group_id=group_id, client_id="consumer_thread-%s" % i, - api_version_auto_timeout_ms=30000, + api_version_auto_timeout_ms=5000, heartbeat_interval_ms=500) while not stop[i].is_set(): for tp, records in six.itervalues(consumers[i].poll(timeout_ms=200)): @@ -73,16 +73,18 @@ def consumer_thread(i): threads[i] = t try: - timeout = time.time() + 35 + timeout = time.time() + 15 while True: for c in range(num_consumers): # Verify all consumers have been created if c not in consumers: + logging.info('%s not in consumers list yet...', c) break # Verify all consumers have an assignment elif not consumers[c].assignment(): + logging.info('Consumer %s does not have assignment yet...', c) break # If all consumers exist and have an assignment @@ -96,8 +98,7 @@ def consumer_thread(i): # New generation assignment is not complete until # coordinator.rejoining = False - rejoining = any([consumer._coordinator.rejoining - for consumer in list(consumers.values())]) + rejoining = set([c for c, consumer in list(consumers.items()) if consumer._coordinator.rejoining]) if not rejoining and len(generations) == 1: for c, consumer in list(consumers.items()): @@ -110,6 +111,7 @@ def consumer_thread(i): logging.info('Rejoining: %s, generations: %s', rejoining, generations) 
time.sleep(1) assert time.time() < timeout, "timeout waiting for assignments" + time.sleep(1) logging.info('Group stabilized; verifying assignment') group_assignment = set() @@ -157,7 +159,6 @@ def test_heartbeat_thread(kafka_broker, topic): consumer = KafkaConsumer(topic, bootstrap_servers=get_connect_str(kafka_broker), group_id=group_id, - api_version_auto_timeout_ms=30000, heartbeat_interval_ms=500) # poll until we have joined group / have assignment