From 56510a9bb3e342f76820aaea1ece5121fefd88ef Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 21 Mar 2025 08:48:43 -0700 Subject: [PATCH 1/3] Move parse_header to Response class; set FLEXIBLE_VERSION on both request/response to support ApiVersions --- kafka/protocol/admin.py | 3 ++- kafka/protocol/api.py | 14 ++++++++------ kafka/protocol/parser.py | 13 +++++++------ 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 4ac3c18c8..255166801 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -737,7 +737,6 @@ class DescribeConfigsRequest_v2(Request): class DescribeLogDirsResponse_v0(Response): API_KEY = 35 API_VERSION = 0 - FLEXIBLE_VERSION = True SCHEMA = Schema( ('throttle_time_ms', Int32), ('log_dirs', Array( @@ -970,6 +969,7 @@ class AlterPartitionReassignmentsResponse_v0(Response): )), ("tags", TaggedFields) ) + FLEXIBLE_VERSION = True class AlterPartitionReassignmentsRequest_v0(Request): @@ -1017,6 +1017,7 @@ class ListPartitionReassignmentsResponse_v0(Response): )), ("tags", TaggedFields) ) + FLEXIBLE_VERSION = True class ListPartitionReassignmentsRequest_v0(Request): diff --git a/kafka/protocol/api.py b/kafka/protocol/api.py index 714da4d1d..9cd5767c1 100644 --- a/kafka/protocol/api.py +++ b/kafka/protocol/api.py @@ -82,19 +82,15 @@ def expect_response(self): def to_object(self): return _to_object(self.SCHEMA, self) - def build_request_header(self, correlation_id, client_id): + def build_header(self, correlation_id, client_id): if self.FLEXIBLE_VERSION: return RequestHeaderV2(self, correlation_id=correlation_id, client_id=client_id) return RequestHeader(self, correlation_id=correlation_id, client_id=client_id) - def parse_response_header(self, read_buffer): - if self.FLEXIBLE_VERSION: - return ResponseHeaderV2.decode(read_buffer) - return ResponseHeader.decode(read_buffer) - @add_metaclass(abc.ABCMeta) class Response(Struct): + FLEXIBLE_VERSION = False @abc.abstractproperty def API_KEY(self): @@ -114,6 +110,12 @@ def SCHEMA(self): def to_object(self): return _to_object(self.SCHEMA, self) + @classmethod + def parse_header(cls, read_buffer): + if cls.FLEXIBLE_VERSION: + return ResponseHeaderV2.decode(read_buffer) + return ResponseHeader.decode(read_buffer) + def _to_object(schema, data): obj = {} diff --git a/kafka/protocol/parser.py b/kafka/protocol/parser.py index e7799fce6..4bc427330 100644 --- a/kafka/protocol/parser.py +++ b/kafka/protocol/parser.py @@ -59,7 +59,7 @@ def send_request(self, request, correlation_id=None): if correlation_id is None: correlation_id = self._next_correlation_id() - header = request.build_request_header(correlation_id=correlation_id, client_id=self._client_id) + header = request.build_header(correlation_id=correlation_id, client_id=self._client_id) message = b''.join([header.encode(), request.encode()]) size = Int32.encode(len(message)) data = size + message @@ -136,13 +136,14 @@ def _process_response(self, read_buffer): if not self.in_flight_requests: raise Errors.CorrelationIdError('No in-flight-request found for server response') (correlation_id, request) = self.in_flight_requests.popleft() - response_header = request.parse_response_header(read_buffer) + response_type = request.RESPONSE_TYPE + response_header = response_type.parse_header(read_buffer) recv_correlation_id = response_header.correlation_id log.debug('Received correlation id: %d', recv_correlation_id) # 0.8.2 quirk if (recv_correlation_id == 0 and correlation_id != 0 and - request.RESPONSE_TYPE is FindCoordinatorResponse[0] and + response_type is FindCoordinatorResponse[0] and (self._api_version == (0, 8, 2) or self._api_version is None)): log.warning('Kafka 0.8.2 quirk -- GroupCoordinatorResponse' ' Correlation ID does not match request. This' @@ -156,15 +157,15 @@ def _process_response(self, read_buffer): % (correlation_id, recv_correlation_id)) # decode response - log.debug('Processing response %s', request.RESPONSE_TYPE.__name__) + log.debug('Processing response %s', response_type.__name__) try: - response = request.RESPONSE_TYPE.decode(read_buffer) + response = response_type.decode(read_buffer) except ValueError: read_buffer.seek(0) buf = read_buffer.read() log.error('Response %d [ResponseType: %s Request: %s]:' ' Unable to decode %d-byte buffer: %r', - correlation_id, request.RESPONSE_TYPE, + correlation_id, response_type, request, len(buf), buf) raise Errors.KafkaProtocolError('Unable to decode response') From a44348a554887834ed198f2c82be70c75e9295b9 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 21 Mar 2025 08:49:08 -0700 Subject: [PATCH 2/3] Add proto defs for ApiVersions v3/v4 --- kafka/protocol/api_versions.py | 46 +++++++++++++++++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/kafka/protocol/api_versions.py b/kafka/protocol/api_versions.py index 7e2e61251..e7cedd954 100644 --- a/kafka/protocol/api_versions.py +++ b/kafka/protocol/api_versions.py @@ -3,7 +3,7 @@ from io import BytesIO from kafka.protocol.api import Request, Response -from kafka.protocol.types import Array, Int16, Int32, Schema +from kafka.protocol.types import Array, CompactArray, CompactString, Int16, Int32, Schema, TaggedFields class BaseApiVersionsResponse(Response): @@ -61,6 +61,28 @@ class ApiVersionsResponse_v2(BaseApiVersionsResponse): SCHEMA = ApiVersionsResponse_v1.SCHEMA +class ApiVersionsResponse_v3(BaseApiVersionsResponse): + API_KEY = 18 + API_VERSION = 3 + SCHEMA = Schema( + ('error_code', Int16), + ('api_versions', CompactArray( + ('api_key', Int16), + ('min_version', Int16), + ('max_version', Int16), + ('_tagged_fields', TaggedFields))), + ('throttle_time_ms', Int32), + ('_tagged_fields', TaggedFields) + ) + # Note: ApiVersions Response does not send FLEXIBLE_VERSION header! + + +class ApiVersionsResponse_v4(BaseApiVersionsResponse): + API_KEY = 18 + API_VERSION = 4 + SCHEMA = ApiVersionsResponse_v3.SCHEMA + + class ApiVersionsRequest_v0(Request): API_KEY = 18 API_VERSION = 0 @@ -82,9 +104,31 @@ class ApiVersionsRequest_v2(Request): SCHEMA = ApiVersionsRequest_v1.SCHEMA +class ApiVersionsRequest_v3(Request): + API_KEY = 18 + API_VERSION = 3 + RESPONSE_TYPE = ApiVersionsResponse_v3 + SCHEMA = Schema( + ('client_software_name', CompactString('utf-8')), + ('client_software_version', CompactString('utf-8')), + ('_tagged_fields', TaggedFields) + ) + FLEXIBLE_VERSION = True + + +class ApiVersionsRequest_v4(Request): + API_KEY = 18 + API_VERSION = 4 + RESPONSE_TYPE = ApiVersionsResponse_v4 + SCHEMA = ApiVersionsRequest_v3.SCHEMA + FLEXIBLE_VERSION = True + + ApiVersionsRequest = [ ApiVersionsRequest_v0, ApiVersionsRequest_v1, ApiVersionsRequest_v2, + ApiVersionsRequest_v3, ApiVersionsRequest_v4, ] ApiVersionsResponse = [ ApiVersionsResponse_v0, ApiVersionsResponse_v1, ApiVersionsResponse_v2, + ApiVersionsResponse_v3, ApiVersionsResponse_v4, ] From b2452a501628887d5fd8ebe15e9ca33dfcfbf868 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 21 Mar 2025 08:49:37 -0700 Subject: [PATCH 3/3] Try ApiVersions v4 on initial connect --- kafka/conn.py | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index c94154885..f6af172f4 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -101,6 +101,10 @@ class BrokerConnection(object): server-side log entries that correspond to this client. Also submitted to GroupCoordinator for logging with respect to consumer group administration. Default: 'kafka-python-{version}' + client_software_name (str): Sent to kafka broker for KIP-511. + Default: 'kafka-python' + client_software_version (str): Sent to kafka broker for KIP-511. + Default: The kafka-python version (via kafka.version). reconnect_backoff_ms (int): The amount of time in milliseconds to wait before attempting to reconnect to a given host. Default: 50. @@ -191,6 +195,8 @@ class BrokerConnection(object): DEFAULT_CONFIG = { 'client_id': 'kafka-python-' + __version__, + 'client_software_name': 'kafka-python', + 'client_software_version': __version__, 'node_id': 0, 'request_timeout_ms': 30000, 'reconnect_backoff_ms': 50, @@ -242,7 +248,7 @@ def __init__(self, host, port, afi, **configs): self._api_versions = None self._api_version = None self._check_version_idx = None - self._api_versions_idx = 2 + self._api_versions_idx = 4 # version of ApiVersionsRequest to try on first connect self._throttle_time = None self._socks5_proxy = None @@ -538,7 +544,14 @@ def _try_api_versions_check(self): log.debug('%s: Using pre-configured api_version %s for ApiVersions', self, self._api_version) return True elif self._check_version_idx is None: - request = ApiVersionsRequest[self._api_versions_idx]() + version = self._api_versions_idx + if version >= 3: + request = ApiVersionsRequest[version]( + client_software_name=self.config['client_software_name'], + client_software_version=self.config['client_software_version'], + _tagged_fields={}) + else: + request = ApiVersionsRequest[version]() future = Future() response = self._send(request, blocking=True, request_timeout_ms=(self.config['api_version_auto_timeout_ms'] * 0.8)) response.add_callback(self._handle_api_versions_response, future) @@ -573,11 +586,15 @@ def _try_api_versions_check(self): def _handle_api_versions_response(self, future, response): error_type = Errors.for_code(response.error_code) - # if error_type i UNSUPPORTED_VERSION: retry w/ latest version from response if error_type is not Errors.NoError: future.failure(error_type()) if error_type is Errors.UnsupportedVersionError: self._api_versions_idx -= 1 + for api_key, min_version, max_version, *rest in response.api_versions: + # If broker provides a lower max_version, skip to that + if api_key == response.API_KEY: + self._api_versions_idx = min(self._api_versions_idx, max_version) + break if self._api_versions_idx >= 0: self._api_versions_future = None self.state = ConnectionStates.API_VERSIONS_SEND @@ -587,7 +604,7 @@ def _handle_api_versions_response(self, future, response): return self._api_versions = dict([ (api_key, (min_version, max_version)) - for api_key, min_version, max_version in response.api_versions + for api_key, min_version, max_version, *rest in response.api_versions ]) self._api_version = self._infer_broker_version_from_api_versions(self._api_versions) log.info('Broker version identified as %s', '.'.join(map(str, self._api_version)))