Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 57 additions & 6 deletions kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class Fetcher:
'retry_backoff_ms': 100,
'enable_incremental_fetch_sessions': True,
'isolation_level': 'read_uncommitted',
'client_rack': '',
'metadata_max_age_ms': 5 * 60 * 1000,
}

def __init__(self, client, subscriptions, **configs):
Expand Down Expand Up @@ -1137,6 +1139,29 @@ def _fetchable_partitions(self):
discard.update(self._paused_partition_records)
return [tp for tp in fetchable if tp not in discard]

def _select_read_replica(self, tp):
    """Return the node id that the next fetch for ``tp`` should target.

    Implements the client side of KIP-392 replica selection:

    * If the partition state reports no preferred read replica (none
      cached, or the cached entry has expired), the partition leader is
      returned.
    * If a preferred replica is cached but cluster metadata has no broker
      entry for it, the cached entry is evicted and the leader is
      returned, so subsequent fetches also go to the leader.
    * Otherwise the cached preferred replica is returned.

    Only presence in cluster metadata is checked here; connection state
    of the preferred replica is not consulted.

    Arguments:
        tp: partition key into ``self._subscriptions.assignment``.

    Returns:
        The preferred replica's node id, or the value of
        ``cluster.leader_for_partition(tp)`` when falling back.
    """
    partition_state = self._subscriptions.assignment[tp]
    preferred = partition_state.preferred_read_replica()
    cluster = self._manager.cluster

    if preferred is None:
        leader = cluster.leader_for_partition(tp)
        log.debug("Selecting leader %s as read replica for partition %s",
                  leader, tp)
        return leader

    if cluster.broker_metadata(preferred) is None:
        # The preferred node fell out of cluster metadata: forget it
        # before resolving the leader so the eviction always happens.
        partition_state.clear_preferred_read_replica()
        leader = cluster.leader_for_partition(tp)
        log.debug("Preferred read replica %s for partition %s no longer in"
                  " cluster metadata; falling back to leader %s",
                  preferred, tp, leader)
        return leader

    log.debug("Selecting preferred read replica %s for partition %s",
              preferred, tp)
    return preferred

def _create_fetch_requests(self):
"""Create fetch requests for all assigned partitions, grouped by node.

Expand All @@ -1152,10 +1177,10 @@ def _create_fetch_requests(self):
# v15 replica state (KIP-903)
# v16 node endpoints (KIP-951)
# v17 directory id (KIP-853)
max_version = 10
max_version = 11
fetchable = collections.defaultdict(collections.OrderedDict)
for tp in self._fetchable_partitions():
node_id = self._manager.cluster.leader_for_partition(tp)
node_id = self._select_read_replica(tp)

position = self._subscriptions.assignment[tp].position

Expand Down Expand Up @@ -1214,6 +1239,7 @@ def _create_fetch_requests(self):
session_epoch=session.epoch,
topics=session.to_send,
forgotten_topics_data=session.to_forget,
rack_id=self.config['client_rack'],
min_version=min_version,
max_version=max_version,
)
Expand Down Expand Up @@ -1336,6 +1362,16 @@ def _parse_fetched_data(self, completed_fetch):
if highwater >= 0:
self._subscriptions.assignment[tp].highwater = highwater

preferred_read_replica = completed_fetch.partition_data.preferred_read_replica
if self._subscriptions.assignment[tp].update_preferred_read_replica(
preferred_read_replica,
time.monotonic() + self.config['metadata_max_age_ms'] / 1000.0):
if preferred_read_replica is None or preferred_read_replica < 0:
log.debug("Cleared preferred read replica for partition %s", tp)
else:
log.debug("Updating preferred read replica for partition %s to %s",
tp, preferred_read_replica)

elif error_type in (Errors.NotLeaderForPartitionError,
Errors.ReplicaNotAvailableError,
Errors.UnknownTopicOrPartitionError,
Expand All @@ -1347,6 +1383,8 @@ def _parse_fetched_data(self, completed_fetch):
# KIP-320: the broker has a different view of the leader epoch
# than we do; ask for metadata refresh and queue position
# validation so we detect any truncation before continuing.
# The cache is cleared by maybe_validate_position once the
# cluster cache catches up with the new epoch.
log.debug("Fetch for %s returned %s; marking position for validation",
tp, error_type.__name__)
self._subscriptions.request_position_validation(tp)
Expand All @@ -1357,11 +1395,24 @@ def _parse_fetched_data(self, completed_fetch):
log.debug("Discarding stale fetch response for partition %s"
" since the fetched offset %d does not match the"
" current offset %d", tp, fetch_offset, position.offset)
elif self._subscriptions.has_default_offset_reset_policy():
log.info("Fetch offset %s is out of range for topic-partition %s", fetch_offset, tp)
self._subscriptions.request_offset_reset(tp)
else:
raise Errors.OffsetOutOfRangeError({tp: fetch_offset})
# KIP-392: a follower may be lagging behind the leader's
# high watermark such that our leader-side position is
# legitimately out of *its* range. If we'd been fetching
# from a follower, drop the cache and retry against the
# leader BEFORE concluding the offset is really out of
# range. Only when there was no cached follower do we
# proceed to reset / raise. Matches Java's behavior.
cleared = self._subscriptions.assignment[tp].clear_preferred_read_replica()
if cleared is not None:
log.debug("Fetch offset %s out of range for %s on follower %s;"
" retrying from leader", fetch_offset, tp, cleared)
elif self._subscriptions.has_default_offset_reset_policy():
log.info("Fetch offset %s is out of range for topic-partition %s",
fetch_offset, tp)
self._subscriptions.request_offset_reset(tp)
else:
raise Errors.OffsetOutOfRangeError({tp: fetch_offset})

elif error_type is Errors.TopicAuthorizationFailedError:
log.warning("Not authorized to read from topic %s.", tp.topic)
Expand Down
7 changes: 7 additions & 0 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ class KafkaConsumer:
server-side log entries that correspond to this client. Also
submitted to GroupCoordinator for logging with respect to
consumer group administration. Default: 'kafka-python-{version}'
client_rack (str): A rack identifier for this client. Sent to brokers
on FetchRequest v11+ (KIP-392, requires Kafka 2.4+ brokers with
``replica.selector.class`` configured server-side). When set, the
broker may route fetches to a follower replica in the same rack
instead of the leader, reducing cross-rack traffic. Leave as ''
(default) to always fetch from the leader.
group_id (str or None): The name of the consumer group to join for dynamic
partition assignment (if enabled), and to use for fetching and
committing offsets. If None, auto-partition assignment (via
Expand Down Expand Up @@ -281,6 +287,7 @@ class KafkaConsumer:
DEFAULT_CONFIG = {
'bootstrap_servers': 'localhost',
'client_id': 'kafka-python-' + __version__,
'client_rack': '',
'group_id': None,
'group_instance_id': None,
'key_deserializer': None,
Expand Down
44 changes: 44 additions & 0 deletions kafka/consumer/subscription_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,12 @@ def __init__(self):
# until OffsetForLeaderEpoch confirms the position is consistent with
# the current leader's log; mutually exclusive with awaiting_reset.
self._awaiting_validation = False
# KIP-392: preferred read replica chosen by the broker (rack-aware).
# ``_preferred_read_replica_expiration`` is a monotonic deadline; after
# it passes we fall back to the leader and re-learn. Cleared on
# replica-related errors so the next fetch goes to the leader.
self._preferred_read_replica = None
self._preferred_read_replica_expiration = None

def _set_position(self, offset):
assert self.has_valid_position, 'Valid position required'
Expand All @@ -520,6 +526,7 @@ def reset(self, strategy):
self._position = None
self.next_allowed_retry_time = None
self._awaiting_validation = False
self.clear_preferred_read_replica()

def is_reset_allowed(self):
return self.next_allowed_retry_time is None or self.next_allowed_retry_time < time.monotonic()
Expand Down Expand Up @@ -547,6 +554,7 @@ def seek(self, offset):
self.drop_pending_record_batch = True
self.next_allowed_retry_time = None
self._awaiting_validation = False
self.clear_preferred_read_replica()

def pause(self):
self.paused = True
Expand All @@ -557,6 +565,41 @@ def resume(self):
def is_fetchable(self):
    """Report whether this partition may be included in a fetch.

    Returns:
        A truthy value only when the partition is not paused, has a valid
        position, and is not awaiting position validation.
    """
    if self.paused:
        return False
    return self.has_valid_position and not self._awaiting_validation

def preferred_read_replica(self):
    """Return the cached preferred read replica (KIP-392), if still valid.

    Returns:
        The cached node id, or None when nothing is cached or when the
        entry's deadline (compared against ``time.monotonic()``) has been
        reached. An expired entry is cleared as a side effect, so the
        cache is evicted lazily on read rather than by a timer.
    """
    replica = self._preferred_read_replica
    if replica is None:
        return None

    deadline = self._preferred_read_replica_expiration
    if deadline is not None and time.monotonic() >= deadline:
        # Expired: drop the entry so the caller falls back to the leader.
        self.clear_preferred_read_replica()
        return None

    return replica

def update_preferred_read_replica(self, node_id, expiration_time):
    """Cache the broker-chosen preferred read replica (KIP-392).

    Arguments:
        node_id: node id to cache. ``None`` or any negative value (e.g.
            ``-1``) clears the cache instead of setting it.
        expiration_time: deadline stored with a newly cached entry; it is
            compared against ``time.monotonic()`` when the entry is read.

    Returns:
        bool: True if the cached replica actually changed (a different
        node was stored, or an existing entry was cleared), so the caller
        can log the transition; False otherwise.

    Note:
        Re-reporting the node that is already cached returns False and
        leaves the previously stored deadline untouched.
    """
    # NOTE(review): a negative id evicts the cache here. If responses
    # served by a follower carry -1 (presumably they do under KIP-392 --
    # confirm), callers passing them through unfiltered would drop the
    # cache after every follower fetch.
    if node_id is None or node_id < 0:
        had_entry = self._preferred_read_replica is not None
        self.clear_preferred_read_replica()
        return had_entry

    if node_id == self._preferred_read_replica:
        return False

    self._preferred_read_replica = node_id
    self._preferred_read_replica_expiration = expiration_time
    return True

def clear_preferred_read_replica(self):
    """Forget the cached preferred read replica and its deadline.

    Returns:
        The node id that was cached before the call, or None if nothing
        was cached, so the caller can log the eviction.
    """
    previous = self._preferred_read_replica
    # Reset both fields together so a stale deadline never outlives
    # the entry it belonged to.
    self._preferred_read_replica = None
    self._preferred_read_replica_expiration = None
    return previous

@property
def awaiting_validation(self):
return self._awaiting_validation
Expand All @@ -578,6 +621,7 @@ def maybe_validate_position(self, current_leader_epoch):
return False
if self._position.leader_epoch >= current_leader_epoch:
return False
self.clear_preferred_read_replica()
self._awaiting_validation = True
self.next_allowed_retry_time = None
return True
Expand Down
Loading
Loading