From b1c2746833e7afb8b11242ed3c481cd6e5be4d22 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 22 May 2026 07:39:20 -0700 Subject: [PATCH] KIP-392: Rack-aware fetch from closest replica --- kafka/consumer/fetcher.py | 63 ++++++- kafka/consumer/group.py | 7 + kafka/consumer/subscription_state.py | 44 +++++ test/consumer/test_fetcher.py | 206 ++++++++++++++++++++++ test/consumer/test_fetcher_mock_broker.py | 111 ++++++++++++ 5 files changed, 425 insertions(+), 6 deletions(-) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 85d5df394..2a7d27879 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -64,6 +64,8 @@ class Fetcher: 'retry_backoff_ms': 100, 'enable_incremental_fetch_sessions': True, 'isolation_level': 'read_uncommitted', + 'client_rack': '', + 'metadata_max_age_ms': 5 * 60 * 1000, } def __init__(self, client, subscriptions, **configs): @@ -1137,6 +1139,29 @@ def _fetchable_partitions(self): discard.update(self._paused_partition_records) return [tp for tp in fetchable if tp not in discard] + def _select_read_replica(self, tp): + """Pick the node to fetch from for ``tp``: a cached preferred read + replica (KIP-392) when valid and known to the cluster, otherwise the + partition leader. An unknown / unreachable preferred replica is + cleared so the next fetch goes to the leader.""" + preferred = self._subscriptions.assignment[tp].preferred_read_replica() + if preferred is None: + leader = self._manager.cluster.leader_for_partition(tp) + log.debug("Selecting leader %s as read replica for partition %s", + leader, tp) + return leader + # If the preferred node fell out of cluster metadata, fall back to leader. + if self._manager.cluster.broker_metadata(preferred) is None: + self._subscriptions.assignment[tp].clear_preferred_read_replica() + leader = self._manager.cluster.leader_for_partition(tp) + log.debug("Preferred read replica %s for partition %s no longer in" + " cluster metadata; falling back to leader %s", + preferred, tp, leader) + return leader + log.debug("Selecting preferred read replica %s for partition %s", + preferred, tp) + return preferred + def _create_fetch_requests(self): """Create fetch requests for all assigned partitions, grouped by node. @@ -1152,10 +1177,10 @@ def _create_fetch_requests(self): # v15 replica state (KIP-903) # v16 node endpoints (KIP-951) # v17 directory id (KIP-853) - max_version = 10 + max_version = 11 fetchable = collections.defaultdict(collections.OrderedDict) for tp in self._fetchable_partitions(): - node_id = self._manager.cluster.leader_for_partition(tp) + node_id = self._select_read_replica(tp) position = self._subscriptions.assignment[tp].position @@ -1214,6 +1239,7 @@ def _create_fetch_requests(self): session_epoch=session.epoch, topics=session.to_send, forgotten_topics_data=session.to_forget, + rack_id=self.config['client_rack'], min_version=min_version, max_version=max_version, ) @@ -1336,6 +1362,16 @@ def _parse_fetched_data(self, completed_fetch): if highwater >= 0: self._subscriptions.assignment[tp].highwater = highwater + preferred_read_replica = completed_fetch.partition_data.preferred_read_replica + if self._subscriptions.assignment[tp].update_preferred_read_replica( + preferred_read_replica, + time.monotonic() + self.config['metadata_max_age_ms'] / 1000.0): + if preferred_read_replica is None or preferred_read_replica < 0: + log.debug("Cleared preferred read replica for partition %s", tp) + else: + log.debug("Updating preferred read replica for partition %s to %s", + tp, preferred_read_replica) + elif error_type in (Errors.NotLeaderForPartitionError, Errors.ReplicaNotAvailableError, Errors.UnknownTopicOrPartitionError, @@ -1347,6 +1383,8 @@ def _parse_fetched_data(self, completed_fetch): # KIP-320: the broker has a different view of the leader epoch # than we do; ask for metadata refresh and queue position # validation so we detect any truncation before continuing. + # The cache is cleared by maybe_validate_position once the + # cluster cache catches up with the new epoch. log.debug("Fetch for %s returned %s; marking position for validation", tp, error_type.__name__) self._subscriptions.request_position_validation(tp) @@ -1357,11 +1395,24 @@ def _parse_fetched_data(self, completed_fetch): log.debug("Discarding stale fetch response for partition %s" " since the fetched offset %d does not match the" " current offset %d", tp, fetch_offset, position.offset) - elif self._subscriptions.has_default_offset_reset_policy(): - log.info("Fetch offset %s is out of range for topic-partition %s", fetch_offset, tp) - self._subscriptions.request_offset_reset(tp) else: - raise Errors.OffsetOutOfRangeError({tp: fetch_offset}) + # KIP-392: a follower may be lagging behind the leader's + # high watermark such that our leader-side position is + # legitimately out of *its* range. If we'd been fetching + # from a follower, drop the cache and retry against the + # leader BEFORE concluding the offset is really out of + # range. Only when there was no cached follower do we + # proceed to reset / raise. Matches Java's behavior. + cleared = self._subscriptions.assignment[tp].clear_preferred_read_replica() + if cleared is not None: + log.debug("Fetch offset %s out of range for %s on follower %s;" + " retrying from leader", fetch_offset, tp, cleared) + elif self._subscriptions.has_default_offset_reset_policy(): + log.info("Fetch offset %s is out of range for topic-partition %s", + fetch_offset, tp) + self._subscriptions.request_offset_reset(tp) + else: + raise Errors.OffsetOutOfRangeError({tp: fetch_offset}) elif error_type is Errors.TopicAuthorizationFailedError: log.warning("Not authorized to read from topic %s.", tp.topic) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 9d041a323..a38d9de98 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -51,6 +51,12 @@ class KafkaConsumer: server-side log entries that correspond to this client. Also submitted to GroupCoordinator for logging with respect to consumer group administration. Default: 'kafka-python-{version}' + client_rack (str): A rack identifier for this client. Sent to brokers + on FetchRequest v11+ (KIP-392, requires Kafka 2.4+ brokers with + ``replica.selector.class`` configured server-side). When set, the + broker may route fetches to a follower replica in the same rack + instead of the leader, reducing cross-rack traffic. Leave as '' + (default) to always fetch from the leader. group_id (str or None): The name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If None, auto-partition assignment (via @@ -281,6 +287,7 @@ class KafkaConsumer: DEFAULT_CONFIG = { 'bootstrap_servers': 'localhost', 'client_id': 'kafka-python-' + __version__, + 'client_rack': '', 'group_id': None, 'group_instance_id': None, 'key_deserializer': None, diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 8fa889b4d..fd969219d 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -503,6 +503,12 @@ def __init__(self): # until OffsetForLeaderEpoch confirms the position is consistent with # the current leader's log; mutually exclusive with awaiting_reset. self._awaiting_validation = False + # KIP-392: preferred read replica chosen by the broker (rack-aware). + # ``_preferred_read_replica_expiration`` is a monotonic deadline; after + # it passes we fall back to the leader and re-learn. Cleared on + # replica-related errors so the next fetch goes to the leader. + self._preferred_read_replica = None + self._preferred_read_replica_expiration = None def _set_position(self, offset): assert self.has_valid_position, 'Valid position required' @@ -520,6 +526,7 @@ def reset(self, strategy): self._position = None self.next_allowed_retry_time = None self._awaiting_validation = False + self.clear_preferred_read_replica() def is_reset_allowed(self): return self.next_allowed_retry_time is None or self.next_allowed_retry_time < time.monotonic() @@ -547,6 +554,7 @@ def seek(self, offset): self.drop_pending_record_batch = True self.next_allowed_retry_time = None self._awaiting_validation = False + self.clear_preferred_read_replica() def pause(self): self.paused = True @@ -557,6 +565,41 @@ def resume(self): def is_fetchable(self): return not self.paused and self.has_valid_position and not self._awaiting_validation + def preferred_read_replica(self): + """Return the currently-cached preferred read replica (KIP-392), + or None if unset/expired. Lazily clears the cache on expiry.""" + if self._preferred_read_replica is None: + return None + if (self._preferred_read_replica_expiration is not None + and time.monotonic() >= self._preferred_read_replica_expiration): + self.clear_preferred_read_replica() + return None + return self._preferred_read_replica + + def update_preferred_read_replica(self, node_id, expiration_time): + """Cache the broker's chosen preferred read replica until ``expiration_time`` + (monotonic). ``node_id == -1`` (or None) clears the cache. + + Returns True if the cached replica actually changed (caller can log). + """ + if node_id is None or node_id < 0: + changed = self._preferred_read_replica is not None + self.clear_preferred_read_replica() + return changed + if node_id == self._preferred_read_replica: + return False + self._preferred_read_replica = node_id + self._preferred_read_replica_expiration = expiration_time + return True + + def clear_preferred_read_replica(self): + """Clear the cached preferred read replica. Returns the previously- + cached node_id (or None) so the caller can log the eviction.""" + previous = self._preferred_read_replica + self._preferred_read_replica = None + self._preferred_read_replica_expiration = None + return previous + @property def awaiting_validation(self): return self._awaiting_validation @@ -578,6 +621,7 @@ def maybe_validate_position(self, current_leader_epoch): return False if self._position.leader_epoch >= current_leader_epoch: return False + self.clear_preferred_read_replica() self._awaiting_validation = True self.next_allowed_retry_time = None return True diff --git a/test/consumer/test_fetcher.py b/test/consumer/test_fetcher.py index e6b3ddd68..2b3e5e76f 100644 --- a/test/consumer/test_fetcher.py +++ b/test/consumer/test_fetcher.py @@ -1478,3 +1478,209 @@ def test_validate_offsets_if_needed_no_op_when_no_partitions(fetcher): returns None without scheduling a task.""" assert fetcher.validate_offsets_if_needed() is None assert fetcher._validation_task is None + + +# --------------------------------------------------------------------------- +# KIP-392: rack-aware fetching / preferred read replica +# --------------------------------------------------------------------------- + + +def _build_completed_fetch_with_replica(tp, offset=0, preferred_read_replica=-1): + """Like _build_completed_fetch but lets the test set preferred_read_replica.""" + partition_data = _ResponsePartition( + error_code=0, + high_watermark=100, + records=_build_record_batch([], offset=offset), + preferred_read_replica=preferred_read_replica, + ) + return CompletedFetch(tp, offset, 11, partition_data, MagicMock()) + + +class TestKip392PreferredReadReplica: + """Unit tests for the per-partition preferred-replica cache and routing.""" + + def test_create_fetch_requests_sends_rack_id(self, fetcher, mocker, assignment): + """``client_rack`` config flows through to the FetchRequest's rack_id.""" + fetcher.config['client_rack'] = 'rack-a' + mocker.patch.object(fetcher._manager.cluster, "leader_for_partition", return_value=0) + mocker.patch.object(fetcher._manager.cluster, "leader_epoch_for_partition", return_value=0) + by_node = fetcher._create_fetch_requests() + request, _ = by_node[0] + assert request.rack_id == 'rack-a' + + def test_select_read_replica_falls_back_to_leader_without_cache( + self, fetcher, topic, mocker): + """No preferred replica cached -> always pick the leader.""" + tp = TopicPartition(topic, 0) + mocker.patch.object( + fetcher._manager.cluster, 'leader_for_partition', return_value=7) + assert fetcher._select_read_replica(tp) == 7 + + def test_select_read_replica_uses_cache_when_valid( + self, fetcher, topic, mocker): + tp = TopicPartition(topic, 0) + fetcher._subscriptions.assignment[tp].update_preferred_read_replica( + 3, time.monotonic() + 60) + mocker.patch.object( + fetcher._manager.cluster, 'leader_for_partition', return_value=7) + # broker_metadata returns something truthy so we don't fall back. + mocker.patch.object( + fetcher._manager.cluster, 'broker_metadata', return_value=MagicMock()) + assert fetcher._select_read_replica(tp) == 3 + + def test_select_read_replica_falls_back_when_replica_unknown( + self, fetcher, topic, mocker): + """Cached preferred replica not in cluster metadata -> fall back to + leader AND clear the cache so we re-learn next time.""" + tp = TopicPartition(topic, 0) + fetcher._subscriptions.assignment[tp].update_preferred_read_replica( + 3, time.monotonic() + 60) + mocker.patch.object( + fetcher._manager.cluster, 'leader_for_partition', return_value=7) + mocker.patch.object( + fetcher._manager.cluster, 'broker_metadata', return_value=None) + assert fetcher._select_read_replica(tp) == 7 + assert fetcher._subscriptions.assignment[tp].preferred_read_replica() is None + + def test_select_read_replica_expires_cache(self, fetcher, topic, mocker): + """Past the TTL -> cache is silently cleared on next read.""" + tp = TopicPartition(topic, 0) + fetcher._subscriptions.assignment[tp].update_preferred_read_replica( + 3, time.monotonic() - 1) # already expired + mocker.patch.object( + fetcher._manager.cluster, 'leader_for_partition', return_value=7) + assert fetcher._select_read_replica(tp) == 7 + assert fetcher._subscriptions.assignment[tp]._preferred_read_replica is None + + def test_parse_fetched_data_caches_preferred_replica( + self, fetcher, topic, mocker): + """Success response with preferred_read_replica >= 0 -> cache it.""" + fetcher.config['check_crcs'] = False + tp = TopicPartition(topic, 0) + completed = _build_completed_fetch_with_replica(tp, preferred_read_replica=5) + fetcher._parse_fetched_data(completed) + assert fetcher._subscriptions.assignment[tp].preferred_read_replica() == 5 + + def test_parse_fetched_data_negative_preferred_clears_cache( + self, fetcher, topic, mocker): + """preferred_read_replica == -1 -> broker is telling us to go back to + the leader; the cache must be cleared even if we had one.""" + fetcher.config['check_crcs'] = False + tp = TopicPartition(topic, 0) + fetcher._subscriptions.assignment[tp].update_preferred_read_replica( + 5, time.monotonic() + 60) + completed = _build_completed_fetch_with_replica(tp, preferred_read_replica=-1) + fetcher._parse_fetched_data(completed) + assert fetcher._subscriptions.assignment[tp].preferred_read_replica() is None + + def test_not_leader_error_preserves_preferred_replica( + self, fetcher, topic, mocker): + """NOT_LEADER_OR_FOLLOWER on a follower fetch only triggers a metadata + refresh; the cache survives this transient error (matches Java) and + will be cleared by maybe_validate_position if the leader actually + changed.""" + fetcher.config['check_crcs'] = False + tp = TopicPartition(topic, 0) + fetcher._subscriptions.assignment[tp].update_preferred_read_replica( + 5, time.monotonic() + 60) + mocker.patch.object(fetcher._manager.cluster, 'request_update') + completed = _build_completed_fetch(tp, [], error=NotLeaderForPartitionError) + fetcher._parse_fetched_data(completed) + assert fetcher._subscriptions.assignment[tp].preferred_read_replica() == 5 + fetcher._manager.cluster.request_update.assert_called() + + def test_fenced_epoch_preserves_preferred_replica(self, fetcher, topic, mocker): + """FENCED_LEADER_EPOCH triggers position validation and metadata + refresh; clearing the preferred replica is deferred to + maybe_validate_position once the cluster cache catches up.""" + fetcher.config['check_crcs'] = False + tp = TopicPartition(topic, 0) + # Seed a position with a valid (non-sentinel) leader_epoch so + # request_position_validation actually marks the partition. + fetcher._subscriptions.assignment[tp].seek(OffsetAndMetadata(0, '', 3)) + fetcher._subscriptions.assignment[tp].update_preferred_read_replica( + 5, time.monotonic() + 60) + mocker.patch.object(fetcher._manager.cluster, 'request_update') + completed = _build_completed_fetch( + tp, [], error=Errors.FencedLeaderEpochError) + fetcher._parse_fetched_data(completed) + # Cache survives the fetch handler; awaiting_validation is set. + assert fetcher._subscriptions.assignment[tp].preferred_read_replica() == 5 + assert fetcher._subscriptions.assignment[tp].awaiting_validation is True + + def test_offset_out_of_range_with_follower_retries_against_leader( + self, fetcher, topic, mocker): + """A follower may lag behind the leader's HW; OFFSET_OUT_OF_RANGE + on a follower fetch must clear the cache and retry against the + leader rather than trigger auto_offset_reset (matches Java).""" + fetcher.config['check_crcs'] = False + tp = TopicPartition(topic, 0) + fetcher._subscriptions.assignment[tp].update_preferred_read_replica( + 5, time.monotonic() + 60) + completed = _build_completed_fetch(tp, [], error=OffsetOutOfRangeError) + fetcher._parse_fetched_data(completed) + # Cache cleared so next fetch goes to leader. + assert fetcher._subscriptions.assignment[tp].preferred_read_replica() is None + # No reset requested — we want to re-confirm against the leader first. + assert fetcher._subscriptions.assignment[tp].awaiting_reset is False + + def test_offset_out_of_range_without_follower_resets( + self, fetcher, topic, mocker): + """Without a cached preferred replica, OFFSET_OUT_OF_RANGE falls + through to the existing reset path.""" + fetcher.config['check_crcs'] = False + tp = TopicPartition(topic, 0) + # No preferred replica set. + completed = _build_completed_fetch(tp, [], error=OffsetOutOfRangeError) + fetcher._parse_fetched_data(completed) + assert fetcher._subscriptions.assignment[tp].awaiting_reset is True + + def test_leader_epoch_advance_clears_preferred_replica(self, topic): + """maybe_validate_position must drop the cache when the cluster + epoch advances past our position's epoch — same partition's leader + almost certainly changed.""" + from kafka.consumer.subscription_state import TopicPartitionState + state = TopicPartitionState() + state.seek(OffsetAndMetadata(50, '', 3)) + state.update_preferred_read_replica(7, time.monotonic() + 60) + assert state.preferred_read_replica() == 7 + # Cluster reports a newer leader_epoch. + triggered = state.maybe_validate_position(current_leader_epoch=5) + assert triggered is True + assert state.preferred_read_replica() is None + + def test_update_does_not_refresh_ttl_on_same_replica(self, topic): + """A steady stream of fetches from the same follower must NOT keep + refreshing the lease — the TTL counts down regardless. Matches Java.""" + from kafka.consumer.subscription_state import TopicPartitionState + state = TopicPartitionState() + expiration = time.monotonic() + 60 + changed = state.update_preferred_read_replica(5, expiration) + assert changed is True + original_expiration = state._preferred_read_replica_expiration + # Repeat updates with the same replica id should be no-ops. + changed = state.update_preferred_read_replica(5, expiration + 9999) + assert changed is False + assert state._preferred_read_replica_expiration == original_expiration + # Changing to a different replica resets the lease. + changed = state.update_preferred_read_replica(6, expiration + 9999) + assert changed is True + assert state._preferred_read_replica_expiration == expiration + 9999 + + def test_seek_clears_preferred_replica(self, topic): + """seek() invalidates the cache - the user may have jumped to an + offset the follower doesn't yet have.""" + from kafka.consumer.subscription_state import TopicPartitionState + state = TopicPartitionState() + state.seek(OffsetAndMetadata(0, '', -1)) + state.update_preferred_read_replica(3, time.monotonic() + 60) + state.seek(OffsetAndMetadata(100, '', -1)) + assert state.preferred_read_replica() is None + + def test_reset_clears_preferred_replica(self, topic): + from kafka.consumer.subscription_state import TopicPartitionState + state = TopicPartitionState() + state.seek(OffsetAndMetadata(0, '', -1)) + state.update_preferred_read_replica(3, time.monotonic() + 60) + state.reset(OffsetResetStrategy.LATEST) + assert state.preferred_read_replica() is None diff --git a/test/consumer/test_fetcher_mock_broker.py b/test/consumer/test_fetcher_mock_broker.py index 7f22eca0e..db3b2895a 100644 --- a/test/consumer/test_fetcher_mock_broker.py +++ b/test/consumer/test_fetcher_mock_broker.py @@ -292,3 +292,114 @@ def test_validation_retries_on_fenced_epoch_response( assert elapsed >= 0.05, ( '_validate_offsets_async did not sleep for retry_backoff_ms; %.3fs' % elapsed) + + +# --------------------------------------------------------------------------- # +# KIP-392: rack-aware fetching # +# --------------------------------------------------------------------------- # + + +def _fetch_response_with_preferred_replica(preferred_read_replica): + """Empty-records FetchResponse advertising a preferred read replica.""" + return FetchResponse( + throttle_time_ms=0, error_code=0, session_id=0, + responses=[_FetchTopic(topic=TOPIC, partitions=[_FetchPartition( + partition_index=PARTITION, error_code=0, high_watermark=100, + last_stable_offset=-1, log_start_offset=-1, + aborted_transactions=[], + preferred_read_replica=preferred_read_replica, + records=b'')])]) + + +class TestKIP392RackAwareFetching: + """End-to-end: client_rack arrives on the wire and the broker's + preferred_read_replica is honored on the next fetch.""" + + def test_rack_id_sent_on_fetch_request(self, broker, manager, fetcher): + """FetchRequest carries ``rack_id`` when client_rack is configured, + and negotiates to v11+ against a modern broker.""" + fetcher.config['client_rack'] = 'us-east-1a' + tp = TopicPartition(TOPIC, PARTITION) + fetcher._subscriptions.seek(tp, OffsetAndMetadata(0, '', 3)) + + captured = {} + + def handler(api_key, api_version, correlation_id, request_bytes): + captured['api_version'] = api_version + decoded = FetchRequest.decode( + request_bytes, version=api_version, header=True) + captured['rack_id'] = decoded.rack_id + return _fetch_response_with_preferred_replica(-1) + broker.respond_fn(FetchRequest, handler) + + # Build the FetchRequest via the Fetcher (so client_rack is wired in) + # and send it directly via manager - sidesteps the IO-thread driving + # complexity of fetcher.send_fetches() in a synchronous test. + requests = fetcher._create_fetch_requests() + assert 0 in requests, 'expected one fetch routed to the leader (node 0)' + request, _ = requests[0] + future = manager.send(request, node_id=0) + manager.run(manager.wait_for, future, 2000) + + assert captured['api_version'] >= 11, ( + 'KIP-392 requires FetchRequest v11+; got v%s' % captured.get('api_version')) + assert captured['rack_id'] == 'us-east-1a' + + def test_preferred_replica_cached_and_used_on_next_fetch( + self, broker, manager, fetcher): + """First fetch goes to the leader; broker returns + ``preferred_read_replica=N``; second fetch routes to node N.""" + tp = TopicPartition(TOPIC, PARTITION) + # Make node 5 reachable via metadata (still pointing at the same + # MockBroker socket so the request actually completes). + broker.set_metadata( + brokers=[ + _MetaBroker(node_id=0, host=broker.host, port=broker.port, rack=None), + _MetaBroker(node_id=5, host=broker.host, port=broker.port, rack=None), + ], + topics=[_MetaTopic( + error_code=0, name=TOPIC, is_internal=False, + partitions=[_MetaPartition( + error_code=0, partition_index=PARTITION, + leader_id=0, leader_epoch=3, + replica_nodes=[0, 5], isr_nodes=[0, 5], + offline_replicas=[])], + )]) + # Re-pull metadata so the cluster cache knows about node 5. + manager._net.run(manager.wait_for, manager.cluster.request_update(), 2000) + + fetcher._subscriptions.seek(tp, OffsetAndMetadata(0, '', 3)) + + # Without a cached preferred replica, the first fetch must go to + # the leader (node 0). + assert fetcher._select_read_replica(tp) == 0 + + # Synthesize a fetch response advertising node 5 as preferred. + from unittest.mock import MagicMock + from kafka.consumer.fetcher import CompletedFetch + completed = CompletedFetch( + tp, 0, 11, + _fetch_response_with_preferred_replica(5).responses[0].partitions[0], + MagicMock()) + fetcher._parse_fetched_data(completed) + + # Next fetch should route to node 5. + assert fetcher._select_read_replica(tp) == 5 + + def test_preferred_replica_negative_one_means_leader( + self, broker, manager, fetcher): + """``preferred_read_replica == -1`` is the broker explicitly telling + the client to stop using a cached follower.""" + tp = TopicPartition(TOPIC, PARTITION) + fetcher._subscriptions.assignment[tp].update_preferred_read_replica( + 5, time.monotonic() + 60) + + from unittest.mock import MagicMock + from kafka.consumer.fetcher import CompletedFetch + completed = CompletedFetch( + tp, 0, 11, + _fetch_response_with_preferred_replica(-1).responses[0].partitions[0], + MagicMock()) + fetcher._subscriptions.seek(tp, OffsetAndMetadata(0, '', 3)) + fetcher._parse_fetched_data(completed) + assert fetcher._subscriptions.assignment[tp].preferred_read_replica() is None