From fec476af09976f008c08e1da87ef76186bd988ab Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 21 May 2025 15:24:21 -0700 Subject: [PATCH] More / updated debug logging for coordinator / consumer --- kafka/consumer/group.py | 7 +++++-- kafka/coordinator/consumer.py | 5 +++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index ce3cf9203..71cc86214 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -699,6 +699,7 @@ def _poll_once(self, timer, max_records, update_offsets=True): dict: Map of topic to list of records (may be empty). """ if not self._coordinator.poll(timeout_ms=timer.timeout_ms): + log.debug('poll: timeout during coordinator.poll(); returning early') return {} has_all_fetch_positions = self._update_fetch_positions(timeout_ms=timer.timeout_ms) @@ -706,13 +707,13 @@ def _poll_once(self, timer, max_records, update_offsets=True): # If data is available already, e.g. from a previous network client # poll() call to commit, then just return it immediately records, partial = self._fetcher.fetched_records(max_records, update_offsets=update_offsets) - log.debug('Fetched records: %s, %s', records, partial) + log.debug('poll: fetched records: %s, %s', records, partial) # Before returning the fetched records, we can send off the # next round of fetches and avoid block waiting for their # responses to enable pipelining while the user is handling the # fetched records. if not partial: - log.debug("Sending fetches") + log.debug("poll: Sending fetches") futures = self._fetcher.send_fetches() if len(futures): self._client.poll(timeout_ms=0) @@ -724,12 +725,14 @@ def _poll_once(self, timer, max_records, update_offsets=True): # since the offset lookup may be backing off after a failure poll_timeout_ms = min(timer.timeout_ms, self._coordinator.time_to_next_poll() * 1000) if not has_all_fetch_positions: + log.debug('poll: do not have all fetch positions...') poll_timeout_ms = min(poll_timeout_ms, self.config['retry_backoff_ms']) self._client.poll(timeout_ms=poll_timeout_ms) # after the long poll, we should check whether the group needs to rebalance # prior to returning data so that the group can stabilize faster if self._coordinator.need_rejoin(): + log.debug('poll: coordinator needs rejoin; returning early') return {} records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index ddd413b82..146626d0c 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -274,6 +274,7 @@ def poll(self, timeout_ms=None): try: self._invoke_completed_offset_commit_callbacks() if not self.ensure_coordinator_ready(timeout_ms=timer.timeout_ms): + log.debug('coordinator.poll: timeout in ensure_coordinator_ready; returning early') return False if self.config['api_version'] >= (0, 9) and self._subscription.partitions_auto_assigned(): @@ -293,9 +294,11 @@ def poll(self, timeout_ms=None): metadata_update = self._client.cluster.request_update() self._client.poll(future=metadata_update, timeout_ms=timer.timeout_ms) if not metadata_update.is_done: + log.debug('coordinator.poll: timeout updating metadata; returning early') return False if not self.ensure_active_group(timeout_ms=timer.timeout_ms): + log.debug('coordinator.poll: timeout in ensure_active_group; returning early') return False self.poll_heartbeat() @@ -722,6 +725,7 @@ def _send_offset_commit_request(self, offsets): return future def _handle_offset_commit_response(self, offsets, future, send_time, response): + log.debug("Received OffsetCommitResponse: %s", response) # TODO look at adding request_latency_ms to response (like java kafka) if self._consumer_sensors: self._consumer_sensors.commit_latency.record((time.time() - send_time) * 1000) @@ -848,6 +852,7 @@ def _send_offset_fetch_request(self, partitions): return future def _handle_offset_fetch_response(self, future, response): + log.debug("Received OffsetFetchResponse: %s", response) if response.API_VERSION >= 2 and response.error_code != Errors.NoError.errno: error_type = Errors.for_code(response.error_code) log.debug("Offset fetch failed: %s", error_type.__name__)