Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ class BrokerConnection(object):
server-side log entries that correspond to this client. Also
submitted to GroupCoordinator for logging with respect to
consumer group administration. Default: 'kafka-python-{version}'
client_software_name (str): Sent to kafka broker for KIP-511.
Default: 'kafka-python'
client_software_version (str): Sent to kafka broker for KIP-511.
Default: The kafka-python version (via kafka.version).
reconnect_backoff_ms (int): The amount of time in milliseconds to
wait before attempting to reconnect to a given host.
Default: 50.
Expand Down Expand Up @@ -191,6 +195,8 @@ class BrokerConnection(object):

DEFAULT_CONFIG = {
'client_id': 'kafka-python-' + __version__,
'client_software_name': 'kafka-python',
'client_software_version': __version__,
'node_id': 0,
'request_timeout_ms': 30000,
'reconnect_backoff_ms': 50,
Expand Down Expand Up @@ -242,7 +248,7 @@ def __init__(self, host, port, afi, **configs):
self._api_versions = None
self._api_version = None
self._check_version_idx = None
self._api_versions_idx = 2
self._api_versions_idx = 4 # version of ApiVersionsRequest to try on first connect
self._throttle_time = None
self._socks5_proxy = None

Expand Down Expand Up @@ -538,7 +544,14 @@ def _try_api_versions_check(self):
log.debug('%s: Using pre-configured api_version %s for ApiVersions', self, self._api_version)
return True
elif self._check_version_idx is None:
request = ApiVersionsRequest[self._api_versions_idx]()
version = self._api_versions_idx
if version >= 3:
request = ApiVersionsRequest[version](
client_software_name=self.config['client_software_name'],
client_software_version=self.config['client_software_version'],
_tagged_fields={})
else:
request = ApiVersionsRequest[version]()
future = Future()
response = self._send(request, blocking=True, request_timeout_ms=(self.config['api_version_auto_timeout_ms'] * 0.8))
response.add_callback(self._handle_api_versions_response, future)
Expand Down Expand Up @@ -573,11 +586,15 @@ def _try_api_versions_check(self):

def _handle_api_versions_response(self, future, response):
error_type = Errors.for_code(response.error_code)
# if error_type i UNSUPPORTED_VERSION: retry w/ latest version from response
if error_type is not Errors.NoError:
future.failure(error_type())
if error_type is Errors.UnsupportedVersionError:
self._api_versions_idx -= 1
for api_key, min_version, max_version, *rest in response.api_versions:
# If broker provides a lower max_version, skip to that
if api_key == response.API_KEY:
self._api_versions_idx = min(self._api_versions_idx, max_version)
break
if self._api_versions_idx >= 0:
self._api_versions_future = None
self.state = ConnectionStates.API_VERSIONS_SEND
Expand All @@ -587,7 +604,7 @@ def _handle_api_versions_response(self, future, response):
return
self._api_versions = dict([
(api_key, (min_version, max_version))
for api_key, min_version, max_version in response.api_versions
for api_key, min_version, max_version, *rest in response.api_versions
])
self._api_version = self._infer_broker_version_from_api_versions(self._api_versions)
log.info('Broker version identified as %s', '.'.join(map(str, self._api_version)))
Expand Down
3 changes: 2 additions & 1 deletion kafka/protocol/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,6 @@ class DescribeConfigsRequest_v2(Request):
class DescribeLogDirsResponse_v0(Response):
API_KEY = 35
API_VERSION = 0
FLEXIBLE_VERSION = True
SCHEMA = Schema(
('throttle_time_ms', Int32),
('log_dirs', Array(
Expand Down Expand Up @@ -970,6 +969,7 @@ class AlterPartitionReassignmentsResponse_v0(Response):
)),
("tags", TaggedFields)
)
FLEXIBLE_VERSION = True


class AlterPartitionReassignmentsRequest_v0(Request):
Expand Down Expand Up @@ -1017,6 +1017,7 @@ class ListPartitionReassignmentsResponse_v0(Response):
)),
("tags", TaggedFields)
)
FLEXIBLE_VERSION = True


class ListPartitionReassignmentsRequest_v0(Request):
Expand Down
14 changes: 8 additions & 6 deletions kafka/protocol/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,19 +82,15 @@ def expect_response(self):
def to_object(self):
return _to_object(self.SCHEMA, self)

def build_request_header(self, correlation_id, client_id):
def build_header(self, correlation_id, client_id):
if self.FLEXIBLE_VERSION:
return RequestHeaderV2(self, correlation_id=correlation_id, client_id=client_id)
return RequestHeader(self, correlation_id=correlation_id, client_id=client_id)

def parse_response_header(self, read_buffer):
if self.FLEXIBLE_VERSION:
return ResponseHeaderV2.decode(read_buffer)
return ResponseHeader.decode(read_buffer)


@add_metaclass(abc.ABCMeta)
class Response(Struct):
FLEXIBLE_VERSION = False

@abc.abstractproperty
def API_KEY(self):
Expand All @@ -114,6 +110,12 @@ def SCHEMA(self):
def to_object(self):
return _to_object(self.SCHEMA, self)

@classmethod
def parse_header(cls, read_buffer):
if cls.FLEXIBLE_VERSION:
return ResponseHeaderV2.decode(read_buffer)
return ResponseHeader.decode(read_buffer)


def _to_object(schema, data):
obj = {}
Expand Down
46 changes: 45 additions & 1 deletion kafka/protocol/api_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from io import BytesIO

from kafka.protocol.api import Request, Response
from kafka.protocol.types import Array, Int16, Int32, Schema
from kafka.protocol.types import Array, CompactArray, CompactString, Int16, Int32, Schema, TaggedFields


class BaseApiVersionsResponse(Response):
Expand Down Expand Up @@ -61,6 +61,28 @@ class ApiVersionsResponse_v2(BaseApiVersionsResponse):
SCHEMA = ApiVersionsResponse_v1.SCHEMA


class ApiVersionsResponse_v3(BaseApiVersionsResponse):
API_KEY = 18
API_VERSION = 3
SCHEMA = Schema(
('error_code', Int16),
('api_versions', CompactArray(
('api_key', Int16),
('min_version', Int16),
('max_version', Int16),
('_tagged_fields', TaggedFields))),
('throttle_time_ms', Int32),
('_tagged_fields', TaggedFields)
)
# Note: ApiVersions Response does not send FLEXIBLE_VERSION header!


class ApiVersionsResponse_v4(BaseApiVersionsResponse):
API_KEY = 18
API_VERSION = 4
SCHEMA = ApiVersionsResponse_v3.SCHEMA


class ApiVersionsRequest_v0(Request):
API_KEY = 18
API_VERSION = 0
Expand All @@ -82,9 +104,31 @@ class ApiVersionsRequest_v2(Request):
SCHEMA = ApiVersionsRequest_v1.SCHEMA


class ApiVersionsRequest_v3(Request):
API_KEY = 18
API_VERSION = 3
RESPONSE_TYPE = ApiVersionsResponse_v3
SCHEMA = Schema(
('client_software_name', CompactString('utf-8')),
('client_software_version', CompactString('utf-8')),
('_tagged_fields', TaggedFields)
)
FLEXIBLE_VERSION = True


class ApiVersionsRequest_v4(Request):
API_KEY = 18
API_VERSION = 4
RESPONSE_TYPE = ApiVersionsResponse_v4
SCHEMA = ApiVersionsRequest_v3.SCHEMA
FLEXIBLE_VERSION = True


ApiVersionsRequest = [
ApiVersionsRequest_v0, ApiVersionsRequest_v1, ApiVersionsRequest_v2,
ApiVersionsRequest_v3, ApiVersionsRequest_v4,
]
ApiVersionsResponse = [
ApiVersionsResponse_v0, ApiVersionsResponse_v1, ApiVersionsResponse_v2,
ApiVersionsResponse_v3, ApiVersionsResponse_v4,
]
13 changes: 7 additions & 6 deletions kafka/protocol/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def send_request(self, request, correlation_id=None):
if correlation_id is None:
correlation_id = self._next_correlation_id()

header = request.build_request_header(correlation_id=correlation_id, client_id=self._client_id)
header = request.build_header(correlation_id=correlation_id, client_id=self._client_id)
message = b''.join([header.encode(), request.encode()])
size = Int32.encode(len(message))
data = size + message
Expand Down Expand Up @@ -136,13 +136,14 @@ def _process_response(self, read_buffer):
if not self.in_flight_requests:
raise Errors.CorrelationIdError('No in-flight-request found for server response')
(correlation_id, request) = self.in_flight_requests.popleft()
response_header = request.parse_response_header(read_buffer)
response_type = request.RESPONSE_TYPE
response_header = response_type.parse_header(read_buffer)
recv_correlation_id = response_header.correlation_id
log.debug('Received correlation id: %d', recv_correlation_id)
# 0.8.2 quirk
if (recv_correlation_id == 0 and
correlation_id != 0 and
request.RESPONSE_TYPE is FindCoordinatorResponse[0] and
response_type is FindCoordinatorResponse[0] and
(self._api_version == (0, 8, 2) or self._api_version is None)):
log.warning('Kafka 0.8.2 quirk -- GroupCoordinatorResponse'
' Correlation ID does not match request. This'
Expand All @@ -156,15 +157,15 @@ def _process_response(self, read_buffer):
% (correlation_id, recv_correlation_id))

# decode response
log.debug('Processing response %s', request.RESPONSE_TYPE.__name__)
log.debug('Processing response %s', response_type.__name__)
try:
response = request.RESPONSE_TYPE.decode(read_buffer)
response = response_type.decode(read_buffer)
except ValueError:
read_buffer.seek(0)
buf = read_buffer.read()
log.error('Response %d [ResponseType: %s Request: %s]:'
' Unable to decode %d-byte buffer: %r',
correlation_id, request.RESPONSE_TYPE,
correlation_id, response_type,
request, len(buf), buf)
raise Errors.KafkaProtocolError('Unable to decode response')

Expand Down
Loading