From e5218d3ccc6e26f3320120c98d19a2a51aa9f120 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 7 Apr 2025 13:33:24 -0700 Subject: [PATCH] Rename Coordinator errors to generic not group --- kafka/admin/client.py | 4 ++-- kafka/coordinator/base.py | 22 +++++++++++----------- kafka/coordinator/consumer.py | 18 +++++++++--------- kafka/errors.py | 29 ++++++++++++----------------- test/test_admin_integration.py | 4 ++-- test/test_coordinator.py | 14 +++++++------- 6 files changed, 43 insertions(+), 48 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 392687be5..94de5a863 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -1460,9 +1460,9 @@ def list_consumer_groups(self, broker_ids=None): list: List of tuples of Consumer Groups. Raises: - GroupCoordinatorNotAvailableError: The coordinator is not + CoordinatorNotAvailableError: The coordinator is not available, so cannot process requests. - GroupLoadInProgressError: The coordinator is loading and + CoordinatorLoadInProgressError: The coordinator is loading and hence can't process requests. """ # While we return a list, internally use a set to prevent duplicates diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 0c238fde8..4f413c768 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -478,7 +478,7 @@ def _send_join_group_request(self): group leader """ if self.coordinator_unknown(): - e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id) + e = Errors.CoordinatorNotAvailableError(self.coordinator_id) return Future().failure(e) elif not self._client.ready(self.coordinator_id, metadata_priority=False): @@ -555,7 +555,7 @@ def _handle_join_group_response(self, future, send_time, response): else: self._on_join_follower().chain(future) - elif error_type is Errors.GroupLoadInProgressError: + elif error_type is Errors.CoordinatorLoadInProgressError: log.debug("Attempt to join group %s rejected since coordinator %s" " is loading the group.", self.group_id, self.coordinator_id) # backoff and retry @@ -567,8 +567,8 @@ def _handle_join_group_response(self, future, send_time, response): log.debug("Attempt to join group %s failed due to unknown member id", self.group_id) future.failure(error) - elif error_type in (Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError): + elif error_type in (Errors.CoordinatorNotAvailableError, + Errors.NotCoordinatorError): # re-discover the coordinator and retry with backoff self.coordinator_dead(error_type()) log.debug("Attempt to join group %s failed due to obsolete " @@ -636,7 +636,7 @@ def _on_join_leader(self, response): def _send_sync_group_request(self, request): if self.coordinator_unknown(): - e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id) + e = Errors.CoordinatorNotAvailableError(self.coordinator_id) return Future().failure(e) # We assume that coordinator is ready if we're sending SyncGroup @@ -674,8 +674,8 @@ def _handle_sync_group_response(self, future, send_time, response): log.debug("SyncGroup for group %s failed due to %s", self.group_id, error) self.reset_generation() future.failure(error) - elif error_type in (Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError): + elif error_type in (Errors.CoordinatorNotAvailableError, + Errors.NotCoordinatorError): error = error_type() log.debug("SyncGroup for group %s failed due to %s", self.group_id, error) self.coordinator_dead(error) @@ -732,7 +732,7 @@ def _handle_group_coordinator_response(self, future, response): self.heartbeat.reset_timeouts() future.success(self.coordinator_id) - elif error_type is Errors.GroupCoordinatorNotAvailableError: + elif error_type is Errors.CoordinatorNotAvailableError: log.debug("Group Coordinator Not Available; retry") future.failure(error_type()) elif error_type is Errors.GroupAuthorizationFailedError: @@ -842,7 +842,7 @@ def _handle_leave_group_response(self, response): def _send_heartbeat_request(self): """Send a heartbeat request""" if self.coordinator_unknown(): - e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id) + e = Errors.CoordinatorNotAvailableError(self.coordinator_id) return Future().failure(e) elif not self._client.ready(self.coordinator_id, metadata_priority=False): @@ -869,8 +869,8 @@ def _handle_heartbeat_response(self, future, send_time, response): log.debug("Received successful heartbeat response for group %s", self.group_id) future.success(None) - elif error_type in (Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError): + elif error_type in (Errors.CoordinatorNotAvailableError, + Errors.NotCoordinatorError): log.warning("Heartbeat failed for group %s: coordinator (node %s)" " is either not started or not valid", self.group_id, self.coordinator()) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 4bc7ba9cb..2944c7ec7 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -590,7 +590,7 @@ def _send_offset_commit_request(self, offsets): node_id = self.coordinator() if node_id is None: - return Future().failure(Errors.GroupCoordinatorNotAvailableError) + return Future().failure(Errors.CoordinatorNotAvailableError) # create the offset commit request @@ -719,14 +719,14 @@ def _handle_offset_commit_response(self, offsets, future, send_time, response): " %s", self.group_id, tp, error_type.__name__) future.failure(error_type()) return - elif error_type is Errors.GroupLoadInProgressError: + elif error_type is Errors.CoordinatorLoadInProgressError: # just retry log.debug("OffsetCommit for group %s failed: %s", self.group_id, error_type.__name__) future.failure(error_type(self.group_id)) return - elif error_type in (Errors.GroupCoordinatorNotAvailableError, - Errors.NotCoordinatorForGroupError, + elif error_type in (Errors.CoordinatorNotAvailableError, + Errors.NotCoordinatorError, Errors.RequestTimedOutError): log.debug("OffsetCommit for group %s failed: %s", self.group_id, error_type.__name__) @@ -777,7 +777,7 @@ def _send_offset_fetch_request(self, partitions): node_id = self.coordinator() if node_id is None: - return Future().failure(Errors.GroupCoordinatorNotAvailableError) + return Future().failure(Errors.CoordinatorNotAvailableError) # Verify node is ready if not self._client.ready(node_id): @@ -812,10 +812,10 @@ def _handle_offset_fetch_response(self, future, response): error_type = Errors.for_code(response.error_code) log.debug("Offset fetch failed: %s", error_type.__name__) error = error_type() - if error_type is Errors.GroupLoadInProgressError: + if error_type is Errors.CoordinatorLoadInProgressError: # Retry future.failure(error) - elif error_type is Errors.NotCoordinatorForGroupError: + elif error_type is Errors.NotCoordinatorError: # re-discover the coordinator and retry self.coordinator_dead(error) future.failure(error) @@ -841,10 +841,10 @@ def _handle_offset_fetch_response(self, future, response): error = error_type() log.debug("Group %s failed to fetch offset for partition" " %s: %s", self.group_id, tp, error) - if error_type is Errors.GroupLoadInProgressError: + if error_type is Errors.CoordinatorLoadInProgressError: # just retry future.failure(error) - elif error_type is Errors.NotCoordinatorForGroupError: + elif error_type is Errors.NotCoordinatorError: # re-discover the coordinator and retry self.coordinator_dead(error) future.failure(error) diff --git a/kafka/errors.py b/kafka/errors.py index aaba89d39..76a93568e 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -218,33 +218,28 @@ class NetworkExceptionError(BrokerResponseError): invalid_metadata = True -class GroupLoadInProgressError(BrokerResponseError): +class CoordinatorLoadInProgressError(BrokerResponseError): errno = 14 - message = 'OFFSETS_LOAD_IN_PROGRESS' - description = ('The broker returns this error code for an offset fetch' - ' request if it is still loading offsets (after a leader' - ' change for that offsets topic partition), or in response' - ' to group membership requests (such as heartbeats) when' - ' group metadata is being loaded by the coordinator.') + message = 'COORDINATOR_LOAD_IN_PROGRESS' + description = ('The broker returns this error code for txn or group requests,' + ' when the coordinator is loading and hence cant process requests') retriable = True -class GroupCoordinatorNotAvailableError(BrokerResponseError): +class CoordinatorNotAvailableError(BrokerResponseError): errno = 15 - message = 'CONSUMER_COORDINATOR_NOT_AVAILABLE' - description = ('The broker returns this error code for group coordinator' - ' requests, offset commits, and most group management' + message = 'COORDINATOR_NOT_AVAILABLE' + description = ('The broker returns this error code for consumer and transaction' ' requests if the offsets topic has not yet been created, or' - ' if the group coordinator is not active.') + ' if the group/txn coordinator is not active.') retriable = True -class NotCoordinatorForGroupError(BrokerResponseError): +class NotCoordinatorError(BrokerResponseError): errno = 16 - message = 'NOT_COORDINATOR_FOR_CONSUMER' - description = ('The broker returns this error code if it receives an offset' - ' fetch or commit request for a group that it is not a' - ' coordinator for.') + message = 'NOT_COORDINATOR' + description = ('The broker returns this error code if it is not the correct' + ' coordinator for the specified consumer or transaction group') retriable = True diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index 83b6ccaf2..f95f367e8 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -9,7 +9,7 @@ from kafka.admin import ( ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType) from kafka.errors import ( - BrokerResponseError, KafkaError, NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError, + BrokerResponseError, KafkaError, NoError, CoordinatorNotAvailableError, NonEmptyGroupError, GroupIdNotFoundError, OffsetOutOfRangeError, UnknownTopicOrPartitionError) @@ -150,7 +150,7 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client): def test_describe_consumer_group_does_not_exist(kafka_admin_client): """Tests that the describe consumer group call fails if the group coordinator is not available """ - with pytest.raises(GroupCoordinatorNotAvailableError): + with pytest.raises(CoordinatorNotAvailableError): kafka_admin_client.describe_consumer_groups(['test']) diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 1d1a6df50..00a929399 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -444,7 +444,7 @@ def test_send_offset_commit_request_fail(mocker, patched_coord, offsets): # No coordinator ret = patched_coord._send_offset_commit_request(offsets) assert ret.failed() - assert isinstance(ret.exception, Errors.GroupCoordinatorNotAvailableError) + assert isinstance(ret.exception, Errors.CoordinatorNotAvailableError) @pytest.mark.parametrize('api_version,req_type', [ @@ -497,11 +497,11 @@ def test_send_offset_commit_request_success(mocker, patched_coord, offsets): (OffsetCommitResponse[0]([('foobar', [(0, 28), (1, 28)])]), Errors.InvalidCommitOffsetSizeError, False), (OffsetCommitResponse[0]([('foobar', [(0, 14), (1, 14)])]), - Errors.GroupLoadInProgressError, False), + Errors.CoordinatorLoadInProgressError, False), (OffsetCommitResponse[0]([('foobar', [(0, 15), (1, 15)])]), - Errors.GroupCoordinatorNotAvailableError, True), + Errors.CoordinatorNotAvailableError, True), (OffsetCommitResponse[0]([('foobar', [(0, 16), (1, 16)])]), - Errors.NotCoordinatorForGroupError, True), + Errors.NotCoordinatorError, True), (OffsetCommitResponse[0]([('foobar', [(0, 7), (1, 7)])]), Errors.RequestTimedOutError, True), (OffsetCommitResponse[0]([('foobar', [(0, 25), (1, 25)])]), @@ -557,7 +557,7 @@ def test_send_offset_fetch_request_fail(mocker, patched_coord, partitions): # No coordinator ret = patched_coord._send_offset_fetch_request(partitions) assert ret.failed() - assert isinstance(ret.exception, Errors.GroupCoordinatorNotAvailableError) + assert isinstance(ret.exception, Errors.CoordinatorNotAvailableError) @pytest.mark.parametrize('api_version,req_type', [ @@ -606,9 +606,9 @@ def test_send_offset_fetch_request_success(patched_coord, partitions): @pytest.mark.parametrize('response,error,dead', [ (OffsetFetchResponse[0]([('foobar', [(0, 123, '', 14), (1, 234, '', 14)])]), - Errors.GroupLoadInProgressError, False), + Errors.CoordinatorLoadInProgressError, False), (OffsetFetchResponse[0]([('foobar', [(0, 123, '', 16), (1, 234, '', 16)])]), - Errors.NotCoordinatorForGroupError, True), + Errors.NotCoordinatorError, True), (OffsetFetchResponse[0]([('foobar', [(0, 123, '', 25), (1, 234, '', 25)])]), Errors.UnknownMemberIdError, False), (OffsetFetchResponse[0]([('foobar', [(0, 123, '', 22), (1, 234, '', 22)])]),