Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1460,9 +1460,9 @@ def list_consumer_groups(self, broker_ids=None):
list: List of tuples of Consumer Groups.

Raises:
GroupCoordinatorNotAvailableError: The coordinator is not
CoordinatorNotAvailableError: The coordinator is not
available, so cannot process requests.
GroupLoadInProgressError: The coordinator is loading and
CoordinatorLoadInProgressError: The coordinator is loading and
hence can't process requests.
"""
# While we return a list, internally use a set to prevent duplicates
Expand Down
22 changes: 11 additions & 11 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ def _send_join_group_request(self):
group leader
"""
if self.coordinator_unknown():
e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
e = Errors.CoordinatorNotAvailableError(self.coordinator_id)
return Future().failure(e)

elif not self._client.ready(self.coordinator_id, metadata_priority=False):
Expand Down Expand Up @@ -555,7 +555,7 @@ def _handle_join_group_response(self, future, send_time, response):
else:
self._on_join_follower().chain(future)

elif error_type is Errors.GroupLoadInProgressError:
elif error_type is Errors.CoordinatorLoadInProgressError:
log.debug("Attempt to join group %s rejected since coordinator %s"
" is loading the group.", self.group_id, self.coordinator_id)
# backoff and retry
Expand All @@ -567,8 +567,8 @@ def _handle_join_group_response(self, future, send_time, response):
log.debug("Attempt to join group %s failed due to unknown member id",
self.group_id)
future.failure(error)
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
Errors.NotCoordinatorForGroupError):
elif error_type in (Errors.CoordinatorNotAvailableError,
Errors.NotCoordinatorError):
# re-discover the coordinator and retry with backoff
self.coordinator_dead(error_type())
log.debug("Attempt to join group %s failed due to obsolete "
Expand Down Expand Up @@ -636,7 +636,7 @@ def _on_join_leader(self, response):

def _send_sync_group_request(self, request):
if self.coordinator_unknown():
e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
e = Errors.CoordinatorNotAvailableError(self.coordinator_id)
return Future().failure(e)

# We assume that coordinator is ready if we're sending SyncGroup
Expand Down Expand Up @@ -674,8 +674,8 @@ def _handle_sync_group_response(self, future, send_time, response):
log.debug("SyncGroup for group %s failed due to %s", self.group_id, error)
self.reset_generation()
future.failure(error)
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
Errors.NotCoordinatorForGroupError):
elif error_type in (Errors.CoordinatorNotAvailableError,
Errors.NotCoordinatorError):
error = error_type()
log.debug("SyncGroup for group %s failed due to %s", self.group_id, error)
self.coordinator_dead(error)
Expand Down Expand Up @@ -732,7 +732,7 @@ def _handle_group_coordinator_response(self, future, response):
self.heartbeat.reset_timeouts()
future.success(self.coordinator_id)

elif error_type is Errors.GroupCoordinatorNotAvailableError:
elif error_type is Errors.CoordinatorNotAvailableError:
log.debug("Group Coordinator Not Available; retry")
future.failure(error_type())
elif error_type is Errors.GroupAuthorizationFailedError:
Expand Down Expand Up @@ -842,7 +842,7 @@ def _handle_leave_group_response(self, response):
def _send_heartbeat_request(self):
"""Send a heartbeat request"""
if self.coordinator_unknown():
e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
e = Errors.CoordinatorNotAvailableError(self.coordinator_id)
return Future().failure(e)

elif not self._client.ready(self.coordinator_id, metadata_priority=False):
Expand All @@ -869,8 +869,8 @@ def _handle_heartbeat_response(self, future, send_time, response):
log.debug("Received successful heartbeat response for group %s",
self.group_id)
future.success(None)
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
Errors.NotCoordinatorForGroupError):
elif error_type in (Errors.CoordinatorNotAvailableError,
Errors.NotCoordinatorError):
log.warning("Heartbeat failed for group %s: coordinator (node %s)"
" is either not started or not valid", self.group_id,
self.coordinator())
Expand Down
18 changes: 9 additions & 9 deletions kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ def _send_offset_commit_request(self, offsets):

node_id = self.coordinator()
if node_id is None:
return Future().failure(Errors.GroupCoordinatorNotAvailableError)
return Future().failure(Errors.CoordinatorNotAvailableError)


# create the offset commit request
Expand Down Expand Up @@ -719,14 +719,14 @@ def _handle_offset_commit_response(self, offsets, future, send_time, response):
" %s", self.group_id, tp, error_type.__name__)
future.failure(error_type())
return
elif error_type is Errors.GroupLoadInProgressError:
elif error_type is Errors.CoordinatorLoadInProgressError:
# just retry
log.debug("OffsetCommit for group %s failed: %s",
self.group_id, error_type.__name__)
future.failure(error_type(self.group_id))
return
elif error_type in (Errors.GroupCoordinatorNotAvailableError,
Errors.NotCoordinatorForGroupError,
elif error_type in (Errors.CoordinatorNotAvailableError,
Errors.NotCoordinatorError,
Errors.RequestTimedOutError):
log.debug("OffsetCommit for group %s failed: %s",
self.group_id, error_type.__name__)
Expand Down Expand Up @@ -777,7 +777,7 @@ def _send_offset_fetch_request(self, partitions):

node_id = self.coordinator()
if node_id is None:
return Future().failure(Errors.GroupCoordinatorNotAvailableError)
return Future().failure(Errors.CoordinatorNotAvailableError)

# Verify node is ready
if not self._client.ready(node_id):
Expand Down Expand Up @@ -812,10 +812,10 @@ def _handle_offset_fetch_response(self, future, response):
error_type = Errors.for_code(response.error_code)
log.debug("Offset fetch failed: %s", error_type.__name__)
error = error_type()
if error_type is Errors.GroupLoadInProgressError:
if error_type is Errors.CoordinatorLoadInProgressError:
# Retry
future.failure(error)
elif error_type is Errors.NotCoordinatorForGroupError:
elif error_type is Errors.NotCoordinatorError:
# re-discover the coordinator and retry
self.coordinator_dead(error)
future.failure(error)
Expand All @@ -841,10 +841,10 @@ def _handle_offset_fetch_response(self, future, response):
error = error_type()
log.debug("Group %s failed to fetch offset for partition"
" %s: %s", self.group_id, tp, error)
if error_type is Errors.GroupLoadInProgressError:
if error_type is Errors.CoordinatorLoadInProgressError:
# just retry
future.failure(error)
elif error_type is Errors.NotCoordinatorForGroupError:
elif error_type is Errors.NotCoordinatorError:
# re-discover the coordinator and retry
self.coordinator_dead(error)
future.failure(error)
Expand Down
29 changes: 12 additions & 17 deletions kafka/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,33 +218,28 @@ class NetworkExceptionError(BrokerResponseError):
invalid_metadata = True


class GroupLoadInProgressError(BrokerResponseError):
class CoordinatorLoadInProgressError(BrokerResponseError):
errno = 14
message = 'OFFSETS_LOAD_IN_PROGRESS'
description = ('The broker returns this error code for an offset fetch'
' request if it is still loading offsets (after a leader'
' change for that offsets topic partition), or in response'
' to group membership requests (such as heartbeats) when'
' group metadata is being loaded by the coordinator.')
message = 'COORDINATOR_LOAD_IN_PROGRESS'
description = ('The broker returns this error code for txn or group requests,'
' when the coordinator is loading and hence cant process requests')
retriable = True


class GroupCoordinatorNotAvailableError(BrokerResponseError):
class CoordinatorNotAvailableError(BrokerResponseError):
errno = 15
message = 'CONSUMER_COORDINATOR_NOT_AVAILABLE'
description = ('The broker returns this error code for group coordinator'
' requests, offset commits, and most group management'
message = 'COORDINATOR_NOT_AVAILABLE'
description = ('The broker returns this error code for consumer and transaction'
' requests if the offsets topic has not yet been created, or'
' if the group coordinator is not active.')
' if the group/txn coordinator is not active.')
retriable = True


class NotCoordinatorForGroupError(BrokerResponseError):
class NotCoordinatorError(BrokerResponseError):
errno = 16
message = 'NOT_COORDINATOR_FOR_CONSUMER'
description = ('The broker returns this error code if it receives an offset'
' fetch or commit request for a group that it is not a'
' coordinator for.')
message = 'NOT_COORDINATOR'
description = ('The broker returns this error code if it is not the correct'
' coordinator for the specified consumer or transaction group')
retriable = True


Expand Down
4 changes: 2 additions & 2 deletions test/test_admin_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from kafka.admin import (
ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType)
from kafka.errors import (
BrokerResponseError, KafkaError, NoError, GroupCoordinatorNotAvailableError, NonEmptyGroupError,
BrokerResponseError, KafkaError, NoError, CoordinatorNotAvailableError, NonEmptyGroupError,
GroupIdNotFoundError, OffsetOutOfRangeError, UnknownTopicOrPartitionError)


Expand Down Expand Up @@ -150,7 +150,7 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client):
def test_describe_consumer_group_does_not_exist(kafka_admin_client):
"""Tests that the describe consumer group call fails if the group coordinator is not available
"""
with pytest.raises(GroupCoordinatorNotAvailableError):
with pytest.raises(CoordinatorNotAvailableError):
kafka_admin_client.describe_consumer_groups(['test'])


Expand Down
14 changes: 7 additions & 7 deletions test/test_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ def test_send_offset_commit_request_fail(mocker, patched_coord, offsets):
# No coordinator
ret = patched_coord._send_offset_commit_request(offsets)
assert ret.failed()
assert isinstance(ret.exception, Errors.GroupCoordinatorNotAvailableError)
assert isinstance(ret.exception, Errors.CoordinatorNotAvailableError)


@pytest.mark.parametrize('api_version,req_type', [
Expand Down Expand Up @@ -497,11 +497,11 @@ def test_send_offset_commit_request_success(mocker, patched_coord, offsets):
(OffsetCommitResponse[0]([('foobar', [(0, 28), (1, 28)])]),
Errors.InvalidCommitOffsetSizeError, False),
(OffsetCommitResponse[0]([('foobar', [(0, 14), (1, 14)])]),
Errors.GroupLoadInProgressError, False),
Errors.CoordinatorLoadInProgressError, False),
(OffsetCommitResponse[0]([('foobar', [(0, 15), (1, 15)])]),
Errors.GroupCoordinatorNotAvailableError, True),
Errors.CoordinatorNotAvailableError, True),
(OffsetCommitResponse[0]([('foobar', [(0, 16), (1, 16)])]),
Errors.NotCoordinatorForGroupError, True),
Errors.NotCoordinatorError, True),
(OffsetCommitResponse[0]([('foobar', [(0, 7), (1, 7)])]),
Errors.RequestTimedOutError, True),
(OffsetCommitResponse[0]([('foobar', [(0, 25), (1, 25)])]),
Expand Down Expand Up @@ -557,7 +557,7 @@ def test_send_offset_fetch_request_fail(mocker, patched_coord, partitions):
# No coordinator
ret = patched_coord._send_offset_fetch_request(partitions)
assert ret.failed()
assert isinstance(ret.exception, Errors.GroupCoordinatorNotAvailableError)
assert isinstance(ret.exception, Errors.CoordinatorNotAvailableError)


@pytest.mark.parametrize('api_version,req_type', [
Expand Down Expand Up @@ -606,9 +606,9 @@ def test_send_offset_fetch_request_success(patched_coord, partitions):

@pytest.mark.parametrize('response,error,dead', [
(OffsetFetchResponse[0]([('foobar', [(0, 123, '', 14), (1, 234, '', 14)])]),
Errors.GroupLoadInProgressError, False),
Errors.CoordinatorLoadInProgressError, False),
(OffsetFetchResponse[0]([('foobar', [(0, 123, '', 16), (1, 234, '', 16)])]),
Errors.NotCoordinatorForGroupError, True),
Errors.NotCoordinatorError, True),
(OffsetFetchResponse[0]([('foobar', [(0, 123, '', 25), (1, 234, '', 25)])]),
Errors.UnknownMemberIdError, False),
(OffsetFetchResponse[0]([('foobar', [(0, 123, '', 22), (1, 234, '', 22)])]),
Expand Down
Loading