diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 6c2596cc8..171304da0 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -21,7 +21,7 @@ from kafka.protocol.admin import ( CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest, ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest, - DeleteGroupsRequest, DeleteRecordsRequest, DescribeLogDirsRequest) + DeleteGroupsRequest, DeleteRecordsRequest, DescribeLogDirsRequest, ElectLeadersRequest, ElectionType) from kafka.protocol.commit import OffsetFetchRequest from kafka.protocol.find_coordinator import FindCoordinatorRequest from kafka.protocol.metadata import MetadataRequest @@ -393,27 +393,55 @@ def _send_request_to_controller(self, request): # So this is a little brittle in that it assumes all responses have # one of these attributes and that they always unpack into # (topic, error_code) tuples. - topic_error_tuples = (response.topic_errors if hasattr(response, 'topic_errors') - else response.topic_error_codes) - # Also small py2/py3 compatibility -- py3 can ignore extra values - # during unpack via: for x, y, *rest in list_of_values. py2 cannot. - # So for now we have to map across the list and explicitly drop any - # extra values (usually the error_message) - for topic, error_code in map(lambda e: e[:2], topic_error_tuples): + topic_error_tuples = getattr(response, 'topic_errors', getattr(response, 'topic_error_codes', None)) + if topic_error_tuples is not None: + success = self._parse_topic_request_response(topic_error_tuples, request, response, tries) + else: + # Leader Election request has a two layer error response (topic and partition) + success = self._parse_topic_partition_request_response(request, response, tries) + + if success: + return response + raise RuntimeError("This should never happen, please file a bug with full stacktrace if encountered") + + def _parse_topic_request_response(self, topic_error_tuples, request, response, tries): + # Also small py2/py3 compatibility -- py3 can ignore extra values + # during unpack via: for x, y, *rest in list_of_values. py2 cannot. + # So for now we have to map across the list and explicitly drop any + # extra values (usually the error_message) + for topic, error_code in map(lambda e: e[:2], topic_error_tuples): + error_type = Errors.for_code(error_code) + if tries and error_type is NotControllerError: + # No need to inspect the rest of the errors for + # non-retriable errors because NotControllerError should + # either be thrown for all errors or no errors. + self._refresh_controller_id() + return False + elif error_type is not Errors.NoError: + raise error_type( + "Request '{}' failed with response '{}'." + .format(request, response)) + return True + + def _parse_topic_partition_request_response(self, request, response, tries): + # Also small py2/py3 compatibility -- py3 can ignore extra values + # during unpack via: for x, y, *rest in list_of_values. py2 cannot. + # So for now we have to map across the list and explicitly drop any + # extra values (usually the error_message) + for topic, partition_results in response.replication_election_results: + for partition_id, error_code in map(lambda e: e[:2], partition_results): error_type = Errors.for_code(error_code) if tries and error_type is NotControllerError: # No need to inspect the rest of the errors for # non-retriable errors because NotControllerError should # either be thrown for all errors or no errors. self._refresh_controller_id() - break - elif error_type is not Errors.NoError: + return False + elif error_type not in [Errors.NoError, Errors.ElectionNotNeeded]: raise error_type( "Request '{}' failed with response '{}'." .format(request, response)) - else: - return response - raise RuntimeError("This should never happen, please file a bug with full stacktrace if encountered") + return True @staticmethod def _convert_new_topic_request(new_topic): @@ -1651,6 +1679,51 @@ def _delete_consumer_groups_send_request(self, group_ids, group_coordinator_id): .format(version)) return self._send_request_to_node(group_coordinator_id, request) + @staticmethod + def _convert_topic_partitions(topic_partitions): + return [ + ( + topic, + partition_ids + ) + for topic, partition_ids in topic_partitions.items() + ] + + def _get_all_topic_partitions(self): + return [ + ( + topic, + [partition_info.partition for partition_info in self._client.cluster._partitions[topic].values()] + ) + for topic in self._client.cluster.topics() + ] + + def _get_topic_partitions(self, topic_partitions): + if topic_partitions is None: + return self._get_all_topic_partitions() + return self._convert_topic_partitions(topic_partitions) + + def perform_leader_election(self, election_type, topic_partitions=None, timeout_ms=None): + """Perform leader election on the topic partitions. + + :param election_type: Type of election to attempt. 0 for Perferred, 1 for Unclean + :param topic_partitions: A map of topic name strings to partition ids list. + By default, will run on all topic partitions + :param timeout_ms: Milliseconds to wait for the leader election process to complete + before the broker returns. + + :return: Appropriate version of ElectLeadersResponse class. + """ + version = self._client.api_version(ElectLeadersRequest, max_version=1) + timeout_ms = self._validate_timeout(timeout_ms) + request = ElectLeadersRequest[version]( + election_type=ElectionType(election_type), + topic_partitions=self._get_topic_partitions(topic_partitions), + timeout=timeout_ms, + ) + # TODO convert structs to a more pythonic interface + return self._send_request_to_controller(request) + def _wait_for_futures(self, futures): """Block until all futures complete. If any fail, raise the encountered exception. diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 63604e576..4ac3c18c8 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -1,5 +1,12 @@ from __future__ import absolute_import +# enum in stdlib as of py3.4 +try: + from enum import IntEnum # pylint: disable=import-error +except ImportError: + # vendored backport module + from kafka.vendor.enum34 import IntEnum + from kafka.protocol.api import Request, Response from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, Float64, CompactString, CompactArray, TaggedFields @@ -1031,3 +1038,77 @@ class ListPartitionReassignmentsRequest_v0(Request): ListPartitionReassignmentsRequest = [ListPartitionReassignmentsRequest_v0] ListPartitionReassignmentsResponse = [ListPartitionReassignmentsResponse_v0] + + +class ElectLeadersResponse_v0(Response): + API_KEY = 43 + API_VERSION = 1 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16), + ('replication_election_results', Array( + ('topic', String('utf-8')), + ('partition_result', Array( + ('partition_id', Int32), + ('error_code', Int16), + ('error_message', String('utf-8')) + )) + )) + ) + + +class ElectLeadersRequest_v0(Request): + API_KEY = 43 + API_VERSION = 1 + RESPONSE_TYPE = ElectLeadersResponse_v0 + SCHEMA = Schema( + ('election_type', Int8), + ('topic_partitions', Array( + ('topic', String('utf-8')), + ('partition_ids', Array(Int32)) + )), + ('timeout', Int32), + ) + + +class ElectLeadersResponse_v1(Response): + API_KEY = 43 + API_VERSION = 1 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16), + ('replication_election_results', Array( + ('topic', String('utf-8')), + ('partition_result', Array( + ('partition_id', Int32), + ('error_code', Int16), + ('error_message', String('utf-8')) + )) + )) + ) + + +class ElectLeadersRequest_v1(Request): + API_KEY = 43 + API_VERSION = 1 + RESPONSE_TYPE = ElectLeadersResponse_v1 + SCHEMA = Schema( + ('election_type', Int8), + ('topic_partitions', Array( + ('topic', String('utf-8')), + ('partition_ids', Array(Int32)) + )), + ('timeout', Int32), + ) + + +class ElectionType(IntEnum): + """ Leader election type + """ + + PREFERRED = 0, + UNCLEAN = 1 + + +ElectLeadersRequest = [ElectLeadersRequest_v0, ElectLeadersRequest_v1] +ElectLeadersResponse = [ElectLeadersResponse_v0, ElectLeadersResponse_v1]