diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 751403f52..071371b98 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -327,7 +327,6 @@ class KafkaConsumer(six.Iterator): 'sasl_kerberos_domain_name': None, 'sasl_oauth_token_provider': None, 'socks5_proxy': None, - 'legacy_iterator': False, # enable to revert to < 1.4.7 iterator 'kafka_client': KafkaClient, } DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000 @@ -845,8 +844,7 @@ def seek(self, partition, offset): assert partition in self._subscription.assigned_partitions(), 'Unassigned partition' log.debug("Seeking to offset %s for partition %s", offset, partition) self._subscription.assignment[partition].seek(offset) - if not self.config['legacy_iterator']: - self._iterator = None + self._iterator = None def seek_to_beginning(self, *partitions): """Seek to the oldest available offset for partitions. @@ -871,8 +869,7 @@ def seek_to_beginning(self, *partitions): for tp in partitions: log.debug("Seeking to beginning of partition %s", tp) self._subscription.need_offset_reset(tp, OffsetResetStrategy.EARLIEST) - if not self.config['legacy_iterator']: - self._iterator = None + self._iterator = None def seek_to_end(self, *partitions): """Seek to the most recent available offset for partitions. @@ -897,8 +894,7 @@ def seek_to_end(self, *partitions): for tp in partitions: log.debug("Seeking to end of partition %s", tp) self._subscription.need_offset_reset(tp, OffsetResetStrategy.LATEST) - if not self.config['legacy_iterator']: - self._iterator = None + self._iterator = None def subscribe(self, topics=(), pattern=None, listener=None): """Subscribe to a list of topics, or a topic regex pattern. @@ -974,8 +970,7 @@ def unsubscribe(self): self._client.cluster.need_all_topic_metadata = False self._client.set_topics([]) log.debug("Unsubscribed all topics or patterns and assigned partitions") - if not self.config['legacy_iterator']: - self._iterator = None + self._iterator = None def metrics(self, raw=False): """Get metrics on consumer performance. @@ -1157,73 +1152,12 @@ def _message_generator_v2(self): self._subscription.assignment[tp].position = OffsetAndMetadata(record.offset + 1, '', -1) yield record - def _message_generator(self): - assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment' - - def inner_poll_ms(): - return max(0, min((1000 * (self._consumer_timeout - time.time())), self.config['retry_backoff_ms'])) - - while time.time() < self._consumer_timeout: - - if not self._coordinator.poll(timeout_ms=inner_poll_ms()): - continue - - # Fetch offsets for any subscribed partitions that we arent tracking yet - if not self._subscription.has_all_fetch_positions(): - partitions = self._subscription.missing_fetch_positions() - self._update_fetch_positions(partitions) - - self._client.poll(timeout_ms=inner_poll_ms()) - - # after the long poll, we should check whether the group needs to rebalance - # prior to returning data so that the group can stabilize faster - if self._coordinator.need_rejoin(): - continue - - # We need to make sure we at least keep up with scheduled tasks, - # like heartbeats, auto-commits, and metadata refreshes - timeout_at = self._next_timeout() - - # Short-circuit the fetch iterator if we are already timed out - # to avoid any unintentional interaction with fetcher setup - if time.time() > timeout_at: - continue - - for msg in self._fetcher: - yield msg - if time.time() > timeout_at: - log.debug("internal iterator timeout - breaking for poll") - break - self._client.poll(timeout_ms=0) - - # An else block on a for loop only executes if there was no break - # so this should only be called on a StopIteration from the fetcher - # We assume that it is safe to init_fetches when fetcher is done - # i.e., there are no more records stored internally - else: - self._fetcher.send_fetches() - - def _next_timeout(self): - timeout = min(self._consumer_timeout, - self._client.cluster.ttl() / 1000.0 + time.time(), - self._coordinator.time_to_next_poll() + time.time()) - return timeout - def __iter__(self): # pylint: disable=non-iterator-returned return self def __next__(self): if self._closed: raise StopIteration('KafkaConsumer closed') - # Now that the heartbeat thread runs in the background - # there should be no reason to maintain a separate iterator - # but we'll keep it available for a few releases just in case - if self.config['legacy_iterator']: - return self.next_v1() - else: - return self.next_v2() - - def next_v2(self): self._set_consumer_timeout() while time.time() < self._consumer_timeout: if not self._iterator: @@ -1234,17 +1168,6 @@ def next_v2(self): self._iterator = None raise StopIteration() - def next_v1(self): - if not self._iterator: - self._iterator = self._message_generator() - - self._set_consumer_timeout() - try: - return next(self._iterator) - except StopIteration: - self._iterator = None - raise - def _set_consumer_timeout(self): # consumer_timeout_ms can be used to stop iteration early if self.config['consumer_timeout_ms'] >= 0: