From a037c7ef5a9be06605b5f7082ea49c8006cb2ca4 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 14 Mar 2025 18:33:10 -0700 Subject: [PATCH] Add optional timeout_ms kwarg to remaining consumer/coordinator methods --- kafka/consumer/fetcher.py | 26 +++++++---- kafka/consumer/group.py | 84 +++++++++++++++++++---------------- kafka/coordinator/base.py | 5 ++- kafka/coordinator/consumer.py | 45 ++++++++++++------- kafka/util.py | 7 ++- test/test_fetcher.py | 4 +- 6 files changed, 101 insertions(+), 70 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index e5ae64c91..7527a1f39 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -135,17 +135,21 @@ def send_fetches(self): self._clean_done_fetch_futures() return futures - def reset_offsets_if_needed(self, partitions): + def reset_offsets_if_needed(self, partitions, timeout_ms=None): """Lookup and set offsets for any partitions which are awaiting an explicit reset. Arguments: partitions (set of TopicPartitions): the partitions to reset + + Raises: + KafkaTimeoutError if timeout_ms provided """ + inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout resetting offsets') for tp in partitions: # TODO: If there are several offsets to reset, we could submit offset requests in parallel if self._subscriptions.is_assigned(tp) and self._subscriptions.is_offset_reset_needed(tp): - self._reset_offset(tp) + self._reset_offset(tp, timeout_ms=inner_timeout_ms()) def _clean_done_fetch_futures(self): while True: @@ -160,7 +164,7 @@ def in_flight_fetches(self): self._clean_done_fetch_futures() return bool(self._fetch_futures) - def update_fetch_positions(self, partitions): + def update_fetch_positions(self, partitions, timeout_ms=None): """Update the fetch positions for the provided partitions. Arguments: @@ -169,7 +173,9 @@ def update_fetch_positions(self, partitions): Raises: NoOffsetForPartitionError: if no offset is stored for a given partition and no reset policy is available + KafkaTimeoutError if timeout_ms provided. """ + inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout updating fetch positions') # reset the fetch position to the committed position for tp in partitions: if not self._subscriptions.is_assigned(tp): @@ -182,12 +188,12 @@ def update_fetch_positions(self, partitions): continue if self._subscriptions.is_offset_reset_needed(tp): - self._reset_offset(tp) + self._reset_offset(tp, timeout_ms=inner_timeout_ms()) elif self._subscriptions.assignment[tp].committed is None: # there's no committed position, so we need to reset with the # default strategy self._subscriptions.need_offset_reset(tp) - self._reset_offset(tp) + self._reset_offset(tp, timeout_ms=inner_timeout_ms()) else: committed = self._subscriptions.assignment[tp].committed.offset log.debug("Resetting offset for partition %s to the committed" @@ -216,7 +222,7 @@ def beginning_or_end_offset(self, partitions, timestamp, timeout_ms): offsets[tp] = offsets[tp].offset return offsets - def _reset_offset(self, partition): + def _reset_offset(self, partition, timeout_ms=None): """Reset offsets for the given partition using the offset reset strategy. 
Arguments: @@ -224,6 +230,7 @@ def _reset_offset(self, partition): Raises: NoOffsetForPartitionError: if no offset reset strategy is defined + KafkaTimeoutError if timeout_ms provided """ timestamp = self._subscriptions.assignment[partition].reset_strategy if timestamp is OffsetResetStrategy.EARLIEST: @@ -235,7 +242,7 @@ def _reset_offset(self, partition): log.debug("Resetting offset for partition %s to %s offset.", partition, strategy) - offsets = self._retrieve_offsets({partition: timestamp}) + offsets = self._retrieve_offsets({partition: timestamp}, timeout_ms=timeout_ms) if partition in offsets: offset = offsets[partition].offset @@ -263,11 +270,14 @@ def _retrieve_offsets(self, timestamps, timeout_ms=None): retrieved offset, timestamp, and leader_epoch. If offset does not exist for the provided timestamp, that partition will be missing from this mapping. + + Raises: + KafkaTimeoutError if timeout_ms provided """ if not timestamps: return {} - inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout attempting to find coordinator') + inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout fetching offsets') timestamps = copy.copy(timestamps) while True: if not timestamps: diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 071371b98..3fccf4755 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -5,7 +5,7 @@ import socket import time -from kafka.errors import KafkaConfigurationError, UnsupportedVersionError +from kafka.errors import KafkaConfigurationError, KafkaTimeoutError, UnsupportedVersionError from kafka.vendor import six @@ -18,6 +18,7 @@ from kafka.metrics import MetricConfig, Metrics from kafka.protocol.list_offsets import OffsetResetStrategy from kafka.structs import OffsetAndMetadata, TopicPartition +from kafka.util import timeout_ms_fn from kafka.version import __version__ log = logging.getLogger(__name__) @@ -521,7 +522,7 @@ def commit_async(self, offsets=None, callback=None): offsets, callback=callback) return future - def commit(self, offsets=None): + def commit(self, offsets=None, timeout_ms=None): """Commit offsets to kafka, blocking until success or error. This commits offsets only to Kafka. The offsets committed using this API @@ -545,9 +546,9 @@ def commit(self, offsets=None): assert self.config['group_id'] is not None, 'Requires group_id' if offsets is None: offsets = self._subscription.all_consumed_offsets() - self._coordinator.commit_offsets_sync(offsets) + self._coordinator.commit_offsets_sync(offsets, timeout_ms=timeout_ms) - def committed(self, partition, metadata=False): + def committed(self, partition, metadata=False, timeout_ms=None): """Get the last committed offset for the given partition. This offset will be used as the position for the consumer @@ -564,6 +565,9 @@ def committed(self, partition, metadata=False): Returns: The last committed offset (int or OffsetAndMetadata), or None if there was no prior commit. 
+ + Raises: + KafkaTimeoutError if timeout_ms provided """ assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1' assert self.config['group_id'] is not None, 'Requires group_id' @@ -572,10 +576,10 @@ def committed(self, partition, metadata=False): if self._subscription.is_assigned(partition): committed = self._subscription.assignment[partition].committed if committed is None: - self._coordinator.refresh_committed_offsets_if_needed() + self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=timeout_ms) committed = self._subscription.assignment[partition].committed else: - commit_map = self._coordinator.fetch_committed_offsets([partition]) + commit_map = self._coordinator.fetch_committed_offsets([partition], timeout_ms=timeout_ms) if partition in commit_map: committed = commit_map[partition] else: @@ -670,17 +674,13 @@ def poll(self, timeout_ms=0, max_records=None, update_offsets=True): assert not self._closed, 'KafkaConsumer is closed' # Poll for new data until the timeout expires - start = time.time() - remaining = timeout_ms + inner_timeout_ms = timeout_ms_fn(timeout_ms, None) while not self._closed: - records = self._poll_once(remaining, max_records, update_offsets=update_offsets) + records = self._poll_once(inner_timeout_ms(), max_records, update_offsets=update_offsets) if records: return records - elapsed_ms = (time.time() - start) * 1000 - remaining = timeout_ms - elapsed_ms - - if remaining <= 0: + if inner_timeout_ms() <= 0: break return {} @@ -695,14 +695,14 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True): Returns: dict: Map of topic to list of records (may be empty). """ - begin = time.time() - if not self._coordinator.poll(timeout_ms=timeout_ms): + inner_timeout_ms = timeout_ms_fn(timeout_ms, None) + if not self._coordinator.poll(timeout_ms=inner_timeout_ms()): return {} # Fetch positions if we have partitions we're subscribed to that we # don't know the offset for if not self._subscription.has_all_fetch_positions(): - self._update_fetch_positions(self._subscription.missing_fetch_positions()) + self._update_fetch_positions(self._subscription.missing_fetch_positions(), timeout_ms=inner_timeout_ms()) # If data is available already, e.g. 
from a previous network client # poll() call to commit, then just return it immediately @@ -723,9 +723,7 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True): if len(futures): self._client.poll(timeout_ms=0) - timeout_ms -= (time.time() - begin) * 1000 - timeout_ms = max(0, min(timeout_ms, self._coordinator.time_to_next_poll() * 1000)) - self._client.poll(timeout_ms=timeout_ms) + self._client.poll(timeout_ms=inner_timeout_ms(self._coordinator.time_to_next_poll() * 1000)) # after the long poll, we should check whether the group needs to rebalance # prior to returning data so that the group can stabilize faster if self._coordinator.need_rejoin(): @@ -734,7 +732,7 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True): records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets) return records - def position(self, partition): + def position(self, partition, timeout_ms=None): """Get the offset of the next record that will be fetched Arguments: @@ -748,7 +746,7 @@ def position(self, partition): assert self._subscription.is_assigned(partition), 'Partition is not assigned' position = self._subscription.assignment[partition].position if position is None: - self._update_fetch_positions([partition]) + self._update_fetch_positions([partition], timeout_ms=timeout_ms) position = self._subscription.assignment[partition].position return position.offset if position else None @@ -1103,7 +1101,7 @@ def _use_consumer_group(self): return False return True - def _update_fetch_positions(self, partitions): + def _update_fetch_positions(self, partitions, timeout_ms=None): """Set the fetch position to the committed position (if there is one) or reset it using the offset reset policy the user has configured. @@ -1111,27 +1109,35 @@ def _update_fetch_positions(self, partitions): partitions (List[TopicPartition]): The partitions that need updating fetch positions. + Returns True if fetch positions updated, False if timeout + Raises: NoOffsetForPartitionError: If no offset is stored for a given partition and no offset reset policy is defined. """ - # Lookup any positions for partitions which are awaiting reset (which may be the - # case if the user called :meth:`seek_to_beginning` or :meth:`seek_to_end`. We do - # this check first to avoid an unnecessary lookup of committed offsets (which - # typically occurs when the user is manually assigning partitions and managing - # their own offsets). - self._fetcher.reset_offsets_if_needed(partitions) - - if not self._subscription.has_all_fetch_positions(): - # if we still don't have offsets for all partitions, then we should either seek - # to the last committed position or reset using the auto reset policy - if (self.config['api_version'] >= (0, 8, 1) and - self.config['group_id'] is not None): - # first refresh commits for all assigned partitions - self._coordinator.refresh_committed_offsets_if_needed() - - # Then, do any offset lookups in case some positions are not known - self._fetcher.update_fetch_positions(partitions) + inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout updating fetch positions') + try: + # Lookup any positions for partitions which are awaiting reset (which may be the + # case if the user called :meth:`seek_to_beginning` or :meth:`seek_to_end`. We do + # this check first to avoid an unnecessary lookup of committed offsets (which + # typically occurs when the user is manually assigning partitions and managing + # their own offsets). 
+ self._fetcher.reset_offsets_if_needed(partitions, timeout_ms=inner_timeout_ms()) + + if not self._subscription.has_all_fetch_positions(): + # if we still don't have offsets for all partitions, then we should either seek + # to the last committed position or reset using the auto reset policy + if (self.config['api_version'] >= (0, 8, 1) and + self.config['group_id'] is not None): + # first refresh commits for all assigned partitions + self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=inner_timeout_ms()) + + # Then, do any offset lookups in case some positions are not known + self._fetcher.update_fetch_positions(partitions, timeout_ms=inner_timeout_ms()) + return True + + except KafkaTimeoutError: + return False def _message_generator_v2(self): timeout_ms = 1000 * max(0, self._consumer_timeout - time.time()) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 0edd50616..c5e56c538 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -163,7 +163,7 @@ def group_protocols(self): pass @abc.abstractmethod - def _on_join_prepare(self, generation, member_id): + def _on_join_prepare(self, generation, member_id, timeout_ms=None): """Invoked prior to each group join or rejoin. This is typically used to perform any cleanup from the previous @@ -415,7 +415,8 @@ def join_group(self, timeout_ms=None): # while another rebalance is still in progress. if not self.rejoining: self._on_join_prepare(self._generation.generation_id, - self._generation.member_id) + self._generation.member_id, + timeout_ms=inner_timeout_ms()) self.rejoining = True # fence off the heartbeat thread explicitly so that it cannot diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 92c84024c..5b4752bf8 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -349,9 +349,9 @@ def _perform_assignment(self, leader_id, assignment_strategy, members): group_assignment[member_id] = assignment return group_assignment - def _on_join_prepare(self, generation, member_id): + def _on_join_prepare(self, generation, member_id, timeout_ms=None): # commit offsets prior to rebalance if auto-commit enabled - self._maybe_auto_commit_offsets_sync() + self._maybe_auto_commit_offsets_sync(timeout_ms=timeout_ms) # execute the user's callback before rebalance log.info("Revoking previously assigned partitions %s for group %s", @@ -392,17 +392,17 @@ def need_rejoin(self): return super(ConsumerCoordinator, self).need_rejoin() - def refresh_committed_offsets_if_needed(self): + def refresh_committed_offsets_if_needed(self, timeout_ms=None): """Fetch committed offsets for assigned partitions.""" if self._subscription.needs_fetch_committed_offsets: - offsets = self.fetch_committed_offsets(self._subscription.assigned_partitions()) + offsets = self.fetch_committed_offsets(self._subscription.assigned_partitions(), timeout_ms=timeout_ms) for partition, offset in six.iteritems(offsets): # verify assignment is still active if self._subscription.is_assigned(partition): self._subscription.assignment[partition].committed = offset self._subscription.needs_fetch_committed_offsets = False - def fetch_committed_offsets(self, partitions): + def fetch_committed_offsets(self, partitions, timeout_ms=None): """Fetch the current committed offsets for specified partitions Arguments: @@ -410,16 +410,23 @@ def fetch_committed_offsets(self, partitions): Returns: dict: {TopicPartition: OffsetAndMetadata} + + Raises: + KafkaTimeoutError if timeout_ms provided """ if not partitions: 
return {} + inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout in coordinator.fetch_committed_offsets') while True: - self.ensure_coordinator_ready() + self.ensure_coordinator_ready(timeout_ms=inner_timeout_ms()) # contact coordinator to fetch committed offsets future = self._send_offset_fetch_request(partitions) - self._client.poll(future=future) + self._client.poll(future=future, timeout_ms=inner_timeout_ms()) + + if not future.is_done: + raise Errors.KafkaTimeoutError() if future.succeeded(): return future.value @@ -427,9 +434,9 @@ def fetch_committed_offsets(self, partitions): if not future.retriable(): raise future.exception # pylint: disable-msg=raising-bad-type - time.sleep(self.config['retry_backoff_ms'] / 1000) + time.sleep(inner_timeout_ms(self.config['retry_backoff_ms']) / 1000) - def close(self, autocommit=True): + def close(self, autocommit=True, timeout_ms=None): """Close the coordinator, leave the current group, and reset local generation / member_id. @@ -440,7 +447,7 @@ def close(self, autocommit=True): """ try: if autocommit: - self._maybe_auto_commit_offsets_sync() + self._maybe_auto_commit_offsets_sync(timeout_ms=timeout_ms) finally: super(ConsumerCoordinator, self).close() @@ -498,7 +505,7 @@ def _do_commit_offsets_async(self, offsets, callback=None): future.add_both(lambda res: self.completed_offset_commits.appendleft((callback, offsets, res))) return future - def commit_offsets_sync(self, offsets): + def commit_offsets_sync(self, offsets, timeout_ms=None): """Commit specific offsets synchronously. This method will retry until the commit completes successfully or an @@ -517,11 +524,15 @@ def commit_offsets_sync(self, offsets): if not offsets: return + inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout in coordinator.poll') while True: - self.ensure_coordinator_ready() + self.ensure_coordinator_ready(timeout_ms=inner_timeout_ms()) future = self._send_offset_commit_request(offsets) - self._client.poll(future=future) + self._client.poll(future=future, timeout_ms=inner_timeout_ms()) + + if not future.is_done: + raise Errors.KafkaTimeoutError() if future.succeeded(): return future.value @@ -529,12 +540,12 @@ def commit_offsets_sync(self, offsets): if not future.retriable(): raise future.exception # pylint: disable-msg=raising-bad-type - time.sleep(self.config['retry_backoff_ms'] / 1000) + time.sleep(inner_timeout_ms(self.config['retry_backoff_ms']) / 1000) - def _maybe_auto_commit_offsets_sync(self): + def _maybe_auto_commit_offsets_sync(self, timeout_ms=None): if self.config['enable_auto_commit']: try: - self.commit_offsets_sync(self._subscription.all_consumed_offsets()) + self.commit_offsets_sync(self._subscription.all_consumed_offsets(), timeout_ms=timeout_ms) # The three main group membership errors are known and should not # require a stacktrace -- just a warning @@ -814,7 +825,7 @@ def _handle_offset_fetch_response(self, future, response): leader_epoch, metadata, error_code = partition_data[2:] else: metadata, error_code = partition_data[2:] - leader_epoch = -1 + leader_epoch = -1 # noqa: F841 tp = TopicPartition(topic, partition) error_type = Errors.for_code(error_code) if error_type is not Errors.NoError: diff --git a/kafka/util.py b/kafka/util.py index 6d061193a..d067a063d 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -21,7 +21,7 @@ def crc32(data): crc -= TO_SIGNED return crc else: - from binascii import crc32 + from binascii import crc32 # noqa: F401 def timeout_ms_fn(timeout_ms, error_message): @@ -32,7 +32,10 @@ def inner_timeout_ms(fallback=None): 
return fallback elapsed = (time.time() - begin) * 1000 if elapsed >= timeout_ms: - raise KafkaTimeoutError(error_message) + if error_message is not None: + raise KafkaTimeoutError(error_message) + else: + return 0 ret = max(0, timeout_ms - elapsed) if fallback is not None: return min(ret, fallback) diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 479f6e22b..3bf334e06 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -130,10 +130,10 @@ def test_update_fetch_positions(fetcher, topic, mocker): fetcher._subscriptions.need_offset_reset(partition) fetcher._subscriptions.assignment[partition].awaiting_reset = False fetcher.update_fetch_positions([partition]) - fetcher._reset_offset.assert_called_with(partition) + fetcher._reset_offset.assert_called_with(partition, timeout_ms=None) assert fetcher._subscriptions.assignment[partition].awaiting_reset is True fetcher.update_fetch_positions([partition]) - fetcher._reset_offset.assert_called_with(partition) + fetcher._reset_offset.assert_called_with(partition, timeout_ms=None) # partition needs reset, has committed offset fetcher._reset_offset.reset_mock()
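
Reviewer note: the pattern this patch leans on everywhere is the deadline closure returned by timeout_ms_fn() in kafka/util.py. The sketch below restates that helper and shows how the per-call fallback keeps nested waits inside a single caller-supplied budget. It mirrors the code added above; the stand-in KafkaTimeoutError class and the 50ms/10ms retry-loop example at the bottom are illustrative only and are not part of the patch.

import time

class KafkaTimeoutError(Exception):
    """Stand-in for kafka.errors.KafkaTimeoutError so this sketch is self-contained."""

def timeout_ms_fn(timeout_ms, error_message):
    """Return a closure reporting the milliseconds left on a single deadline.

    Mirrors kafka/util.py as modified above: once the deadline passes, the
    closure raises KafkaTimeoutError(error_message), or returns 0 when
    error_message is None. An optional per-call fallback caps the result so
    nested waits (retry backoff, coordinator poll intervals) never overshoot
    the caller's overall budget.
    """
    begin = time.time()

    def inner_timeout_ms(fallback=None):
        if timeout_ms is None:
            # No overall deadline: defer entirely to the per-call fallback.
            return fallback
        elapsed_ms = (time.time() - begin) * 1000
        if elapsed_ms >= timeout_ms:
            if error_message is not None:
                raise KafkaTimeoutError(error_message)
            return 0
        remaining_ms = max(0, timeout_ms - elapsed_ms)
        return min(remaining_ms, fallback) if fallback is not None else remaining_ms

    return inner_timeout_ms

# Illustrative retry loop (hypothetical numbers): bounded by a 50ms overall
# budget and a 10ms per-iteration backoff.
inner_timeout_ms = timeout_ms_fn(50, 'Timeout in example retry loop')
try:
    while True:
        # Each wait is capped by both the 10ms backoff and whatever remains
        # of the 50ms budget, so the loop cannot outlive the deadline.
        time.sleep(inner_timeout_ms(10) / 1000)
except KafkaTimeoutError as e:
    print(e)

Because the closure re-measures elapsed time on every call, each client poll or retry backoff inside a loop draws from the same shared budget; that is what lets join_group, fetch_committed_offsets, and commit_offsets_sync above honor one caller-supplied timeout_ms across multiple network round trips.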