From a4e9a275c012ddbfbb1f542f3375647eba3b84a0 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Fri, 28 Mar 2025 11:43:43 -0400
Subject: [PATCH 1/2] Track nodes_with_pending_fetch_requests in fetcher; dont gate send on node ready

---
 kafka/consumer/fetcher.py | 19 +++++++++++--------
 test/test_fetcher.py      |  2 ++
 2 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 4d73ef435..95ee8ae48 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -114,6 +114,7 @@ def __init__(self, client, subscriptions, metrics, **configs):
         self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
         self._isolation_level = READ_UNCOMMITTED
         self._session_handlers = {}
+        self._nodes_with_pending_fetch_requests = set()
 
     def send_fetches(self):
         """Send FetchRequests for all assigned partitions that do not already have
@@ -124,12 +125,12 @@ def send_fetches(self):
         """
         futures = []
         for node_id, (request, fetch_offsets) in six.iteritems(self._create_fetch_requests()):
-            if self._client.ready(node_id):
-                log.debug("Sending FetchRequest to node %s", node_id)
-                future = self._client.send(node_id, request, wakeup=False)
-                future.add_callback(self._handle_fetch_response, node_id, fetch_offsets, time.time())
-                future.add_errback(self._handle_fetch_error, node_id)
-                futures.append(future)
+            log.debug("Sending FetchRequest to node %s", node_id)
+            self._nodes_with_pending_fetch_requests.add(node_id)
+            future = self._client.send(node_id, request, wakeup=False)
+            future.add_callback(self._handle_fetch_response, node_id, fetch_offsets, time.time())
+            future.add_errback(self._handle_fetch_error, node_id)
+            futures.append(future)
         self._fetch_futures.extend(futures)
         self._clean_done_fetch_futures()
         return futures
@@ -593,8 +594,8 @@ def _create_fetch_requests(self):
                           " Requesting metadata update", partition)
                 self._client.cluster.request_update()
 
-            elif self._client.in_flight_request_count(node_id) > 0:
-                log.log(0, "Skipping fetch for partition %s because there is an inflight request to node %s",
+            elif node_id in self._nodes_with_pending_fetch_requests:
+                log.log(0, "Skipping fetch for partition %s because there is a pending fetch request to node %s",
                         partition, node_id)
                 continue
 
@@ -707,12 +708,14 @@ def _handle_fetch_response(self, node_id, fetch_offsets, send_time, response):
                     self._completed_fetches.append(completed_fetch)
 
         self._sensors.fetch_latency.record((time.time() - send_time) * 1000)
+        self._nodes_with_pending_fetch_requests.remove(node_id)
 
     def _handle_fetch_error(self, node_id, exception):
         level = logging.INFO if isinstance(exception, Errors.Cancelled) else logging.ERROR
         log.log(level, 'Fetch to node %s failed: %s', node_id, exception)
         if node_id in self._session_handlers:
             self._session_handlers[node_id].handle_error(exception)
+        self._nodes_with_pending_fetch_requests.remove(node_id)
 
     def _parse_fetched_data(self, completed_fetch):
         tp = completed_fetch.topic_partition
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index 7822a6f1f..854f1fa98 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -423,6 +423,7 @@ def test_fetched_records(fetcher, topic, mocker):
         ),
     ])
 def test__handle_fetch_response(fetcher, fetch_offsets, fetch_response, num_partitions):
+    fetcher._nodes_with_pending_fetch_requests.add(0)
     fetcher._handle_fetch_response(0, fetch_offsets, time.time(), fetch_response)
     assert len(fetcher._completed_fetches) == num_partitions
 
@@ -438,6 +439,7 @@ def test__handle_fetch_response(fetcher, fetch_offsets, fetch_response, num_part
         )
     ])
 def test__handle_fetch_error(fetcher, caplog, exception, log_level):
+    fetcher._nodes_with_pending_fetch_requests.add(3)
     fetcher._handle_fetch_error(3, exception)
     assert len(caplog.records) == 1
     assert caplog.records[0].levelname == logging.getLevelName(log_level)

From 6eb74d3a4e764f8241b4384270ba8306b9fcfa49 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Fri, 28 Mar 2025 11:57:19 -0400
Subject: [PATCH 2/2] Skip fetch requests to nodes in backoff/retry or throttled

---
 kafka/consumer/fetcher.py | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 95ee8ae48..61480fb07 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -594,6 +594,18 @@ def _create_fetch_requests(self):
                           " Requesting metadata update", partition)
                 self._client.cluster.request_update()
 
+            elif not self._client.connected(node_id) and self._client.connection_delay(node_id) > 0:
+                # If we try to send during the reconnect backoff window, then the request is just
+                # going to be failed anyway before being sent, so skip the send for now
+                log.log(0, "Skipping fetch for partition %s because node %s is awaiting reconnect backoff",
+                        partition, node_id)
+
+            elif self._client.throttle_delay(node_id) > 0:
+                # If we try to send while throttled, then the request is just
+                # going to be failed anyway before being sent, so skip the send for now
+                log.log(0, "Skipping fetch for partition %s because node %s is throttled",
+                        partition, node_id)
+
             elif node_id in self._nodes_with_pending_fetch_requests:
                 log.log(0, "Skipping fetch for partition %s because there is a pending fetch request to node %s",
                         partition, node_id)
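
A standalone sketch of the bookkeeping PATCH 1/2 introduces: a set of node ids with an outstanding fetch request, added just before the send and cleared in both the response callback and the error callback, so at most one fetch is in flight per broker. FakeFuture and SketchFetcher below are illustrative stand-ins, not kafka-python's real KafkaClient/Future API, and send_fn is a hypothetical hook standing in for self._client.send.

class FakeFuture:
    """Minimal future: callbacks/errbacks are bound with args and invoked on completion."""
    def __init__(self):
        self._callbacks = []
        self._errbacks = []

    def add_callback(self, fn, *args):
        self._callbacks.append((fn, args))

    def add_errback(self, fn, *args):
        self._errbacks.append((fn, args))

    def success(self, value):
        for fn, args in self._callbacks:
            fn(*args, value)

    def failure(self, exc):
        for fn, args in self._errbacks:
            fn(*args, exc)


class SketchFetcher:
    def __init__(self):
        # Node ids with a fetch request sent but not yet answered or failed
        self._nodes_with_pending_fetch_requests = set()

    def send_fetches(self, requests_by_node, send_fn):
        futures = []
        for node_id, request in requests_by_node.items():
            if node_id in self._nodes_with_pending_fetch_requests:
                # Mirrors the check in _create_fetch_requests: skip nodes that
                # already have a pending fetch request
                continue
            self._nodes_with_pending_fetch_requests.add(node_id)
            future = send_fn(node_id, request)
            future.add_callback(self._handle_fetch_response, node_id)
            future.add_errback(self._handle_fetch_error, node_id)
            futures.append(future)
        return futures

    def _handle_fetch_response(self, node_id, response):
        # ... parse the fetch response ...
        self._nodes_with_pending_fetch_requests.remove(node_id)

    def _handle_fetch_error(self, node_id, exception):
        # ... log the failure ...
        self._nodes_with_pending_fetch_requests.remove(node_id)


if __name__ == '__main__':
    fetcher = SketchFetcher()
    futures = {}

    def send(node_id, request):
        futures[node_id] = FakeFuture()
        return futures[node_id]

    fetcher.send_fetches({0: 'fetch-v4', 1: 'fetch-v4'}, send)
    assert fetcher._nodes_with_pending_fetch_requests == {0, 1}

    futures[0].success('response')          # node 0 becomes eligible again
    futures[1].failure(Exception('boom'))   # so does node 1, via the errback
    assert not fetcher._nodes_with_pending_fetch_requests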
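
A similar sketch of the gating added by PATCH 2/2: a fetch is only worth sending to a node that is not waiting out its reconnect backoff, not throttled, and has no pending fetch request. The fetchable_node helper and StubClient are hypothetical; only the connected(), connection_delay() and throttle_delay() calls mirror what the diff uses on self._client.

def fetchable_node(client, node_id, nodes_with_pending_fetch_requests):
    """Return (True, None) if a fetch may be sent to node_id, else (False, reason)."""
    if not client.connected(node_id) and client.connection_delay(node_id) > 0:
        # Sending during the backoff window would fail before the request goes out
        return False, 'awaiting reconnect backoff'
    if client.throttle_delay(node_id) > 0:
        # Same reasoning for a throttled node
        return False, 'throttled'
    if node_id in nodes_with_pending_fetch_requests:
        return False, 'pending fetch request'
    return True, None


class StubClient:
    """Hypothetical stand-in exposing only the three checks used above."""
    def __init__(self, connected=True, connection_delay=0.0, throttle_delay=0.0):
        self._connected = connected
        self._connection_delay = connection_delay
        self._throttle_delay = throttle_delay

    def connected(self, node_id):
        return self._connected

    def connection_delay(self, node_id):
        return self._connection_delay

    def throttle_delay(self, node_id):
        return self._throttle_delay


if __name__ == '__main__':
    pending = {2}
    print(fetchable_node(StubClient(), 1, pending))                                        # (True, None)
    print(fetchable_node(StubClient(connected=False, connection_delay=250.0), 1, pending)) # reconnect backoff
    print(fetchable_node(StubClient(throttle_delay=100.0), 1, pending))                    # throttled
    print(fetchable_node(StubClient(), 2, pending))                                        # pending fetch request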