2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
@@ -6,7 +6,7 @@ on:
   push:
     branches: ["master"]
   pull_request:
-    branches: ["master"]
+    branches: ["master", "2.0"]
 
 env:
   FORCE_COLOR: "1" # Make tools pretty.
2 changes: 1 addition & 1 deletion kafka/admin/client.py
@@ -294,7 +294,7 @@ def _refresh_controller_id(self, timeout_ms=30000):
                     time.sleep(1)
                     continue
                 # verify the controller is new enough to support our requests
-                controller_version = self._client.check_version(node_id=controller_id, timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
+                controller_version = self._client.check_version(node_id=controller_id)
                 if controller_version < (0, 10, 0):
                     raise IncompatibleBrokerVersion(
                         "The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0."
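Note: the explicit timeout argument is dropped here because check_version (changed in kafka/client_async.py below) now falls back to api_version_auto_timeout_ms on its own. A minimal sketch of that defaulting pattern, using an illustrative config dict rather than the real client object:

    config = {'api_version_auto_timeout_ms': 2000}

    def check_version(node_id=None, timeout=None):
        # None falls back to the configured auto-detect timeout (ms -> seconds)
        timeout = timeout or (config['api_version_auto_timeout_ms'] / 1000)
        return timeout

    assert check_version() == 2           # default derived from config
    assert check_version(timeout=5) == 5  # an explicit argument still wins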
107 changes: 70 additions & 37 deletions kafka/client_async.py
@@ -237,8 +237,7 @@ def __init__(self, **configs):
 
         # Check Broker Version if not set explicitly
         if self.config['api_version'] is None:
-            check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
-            self.config['api_version'] = self.check_version(timeout=check_timeout)
+            self.config['api_version'] = self.check_version()
 
     def _init_wakeup_socketpair(self):
         self._wake_r, self._wake_w = socket.socketpair()
@@ -364,14 +363,24 @@ def _should_recycle_connection(self, conn):
 
         return False
 
-    def _maybe_connect(self, node_id):
-        """Idempotent non-blocking connection attempt to the given node id."""
+    def _init_connect(self, node_id):
+        """Idempotent non-blocking connection attempt to the given node id.
+
+        Returns True if connection object exists and is connected / connecting
+        """
         with self._lock:
             conn = self._conns.get(node_id)
 
+            # Check if existing connection should be recreated because host/port changed
+            if conn is not None and self._should_recycle_connection(conn):
+                self._conns.pop(node_id).close()
+                conn = None
+
             if conn is None:
                 broker = self.cluster.broker_metadata(node_id)
-                assert broker, 'Broker id %s not in current metadata' % (node_id,)
+                if broker is None:
+                    log.debug('Broker id %s not in current metadata', node_id)
+                    return False
 
                 log.debug("Initiating connection to node %s at %s:%s",
                           node_id, broker.host, broker.port)
@@ -383,16 +392,9 @@ def _maybe_connect(self, node_id):
                                         **self.config)
                 self._conns[node_id] = conn
 
-            # Check if existing connection should be recreated because host/port changed
-            elif self._should_recycle_connection(conn):
-                self._conns.pop(node_id)
-                return False
-
-            elif conn.connected():
-                return True
-
-            conn.connect()
-            return conn.connected()
+            if conn.disconnected():
+                conn.connect()
+            return not conn.disconnected()
 
     def ready(self, node_id, metadata_priority=True):
         """Check whether a node is connected and ok to send more requests.
@@ -576,12 +578,18 @@ def poll(self, timeout_ms=None, future=None):
                 if self._closed:
                     break
 
-                # Send a metadata request if needed (or initiate new connection)
-                metadata_timeout_ms = self._maybe_refresh_metadata()
-
                 # Attempt to complete pending connections
                 for node_id in list(self._connecting):
-                    self._maybe_connect(node_id)
+                    # False return means no more connection progress is possible
+                    # Connected nodes will update _connecting via state_change callback
+                    if not self._init_connect(node_id):
+                        # It's possible that the connection attempt triggered a state change
+                        # but if not, make sure to remove from _connecting list
+                        if node_id in self._connecting:
+                            self._connecting.remove(node_id)
+
+                # Send a metadata request if needed (or initiate new connection)
+                metadata_timeout_ms = self._maybe_refresh_metadata()
 
                 # If we got a future that is already done, don't block in _poll
                 if future is not None and future.is_done:
@@ -623,6 +631,11 @@ def _register_send_sockets(self):
                 self._selector.register(conn._sock, selectors.EVENT_WRITE, conn)
 
     def _poll(self, timeout):
+        # Python throws OverflowError if timeout is > 2147483647 milliseconds
+        # (though the param to selector.select is in seconds)
+        # so convert any too-large timeout to blocking
+        if timeout > 2147483:
+            timeout = None
         # This needs to be locked, but since it is only called from within the
         # locked section of poll(), there is no additional lock acquisition here
         processed = set()
@@ -843,6 +856,26 @@ def _maybe_refresh_metadata(self, wakeup=False):
             log.debug("Give up sending metadata request since no node is available. (reconnect delay %d ms)", next_connect_ms)
             return next_connect_ms
 
+        if not self._can_send_request(node_id):
+            # If there's any connection establishment underway, wait until it completes. This prevents
+            # the client from unnecessarily connecting to additional nodes while a previous connection
+            # attempt has not been completed.
+            if self._connecting:
+                return float('inf')
+
+            elif self._can_connect(node_id):
+                log.debug("Initializing connection to node %s for metadata request", node_id)
+                self._connecting.add(node_id)
+                if not self._init_connect(node_id):
+                    if node_id in self._connecting:
+                        self._connecting.remove(node_id)
+                    # Connection attempt failed immediately, need to retry with a different node
+                    return self.config['reconnect_backoff_ms']
+            else:
+                # Existing connection with max in flight requests. Wait for request to complete.
+                return self.config['request_timeout_ms']
+
+        # Recheck node_id in case we were able to connect immediately above
         if self._can_send_request(node_id):
             topics = list(self._topics)
             if not topics and self.cluster.is_bootstrap(node_id):
@@ -864,20 +897,11 @@ def refresh_done(val_or_error):
             future.add_errback(refresh_done)
             return self.config['request_timeout_ms']
 
-        # If there's any connection establishment underway, wait until it completes. This prevents
-        # the client from unnecessarily connecting to additional nodes while a previous connection
-        # attempt has not been completed.
+        # Should only get here if still connecting
         if self._connecting:
             return float('inf')
-
-        if self.maybe_connect(node_id, wakeup=wakeup):
-            log.debug("Initializing connection to node %s for metadata request", node_id)
-            return float('inf')
-
-        # connected but can't send more, OR connecting
-        # In either case we just need to wait for a network event
-        # to let us know the selected connection might be usable again.
-        return float('inf')
+        else:
+            return self.config['reconnect_backoff_ms']
 
     def get_api_versions(self):
         """Return the ApiVersions map, if available.
@@ -890,13 +914,16 @@ def get_api_versions(self):
         """
         return self._api_versions
 
-    def check_version(self, node_id=None, timeout=2, strict=False):
+    def check_version(self, node_id=None, timeout=None, strict=False):
         """Attempt to guess the version of a Kafka broker.
 
-        Note: It is possible that this method blocks longer than the
-            specified timeout. This can happen if the entire cluster
-            is down and the client enters a bootstrap backoff sleep.
-            This is only possible if node_id is None.
+        Keyword Arguments:
+            node_id (str, optional): Broker node id from cluster metadata. If None, attempts
+                to connect to any available broker until version is identified.
+                Default: None
+            timeout (num, optional): Maximum time in seconds to try to check broker version.
+                If unable to identify version before timeout, raise error (see below).
+                Default: api_version_auto_timeout_ms / 1000
 
         Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ...
 
@@ -906,6 +933,7 @@ def check_version(self, node_id=None, timeout=2, strict=False):
             UnrecognizedBrokerVersion: please file bug if seen!
             AssertionError (if strict=True): please file bug if seen!
         """
+        timeout = timeout or (self.config['api_version_auto_timeout_ms'] / 1000)
         self._lock.acquire()
         end = time.time() + timeout
         while time.time() < end:
@@ -916,7 +944,12 @@ def check_version(self, node_id=None, timeout=2, strict=False):
             if try_node is None:
                 self._lock.release()
                 raise Errors.NoBrokersAvailable()
-            self._maybe_connect(try_node)
+            if not self._init_connect(try_node):
+                if try_node == node_id:
+                    raise Errors.NodeNotReadyError("Connection failed to %s" % node_id)
+                else:
+                    continue
+
             conn = self._conns[try_node]
 
             # We will intentionally cause socket failures
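Note: the clamp added in _poll above exists because the stdlib selector converts its timeout to a C integer of milliseconds on poll/epoll platforms, so anything beyond roughly 2147483 seconds raises OverflowError, and _maybe_refresh_metadata can now legitimately return float('inf'). A standalone sketch of the same workaround:

    import selectors
    import socket

    def safe_select(selector, timeout):
        # Oversized timeouts overflow the underlying poll call,
        # so treat them as "block until an event arrives".
        if timeout is not None and timeout > 2147483:
            timeout = None
        return selector.select(timeout)

    sel = selectors.DefaultSelector()
    r, w = socket.socketpair()
    sel.register(r, selectors.EVENT_READ)
    w.send(b'x')  # make the read side ready so select() returns at once
    assert safe_select(sel, float('inf'))  # may raise OverflowError without the clamp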
14 changes: 12 additions & 2 deletions kafka/consumer/fetcher.py
@@ -457,10 +457,20 @@ def _unpack_message_set(self, tp, records):
             batch = records.next_batch()
             while batch is not None:
 
-                # LegacyRecordBatch cannot access either base_offset or last_offset_delta
+                # Try DefaultsRecordBatch / message log format v2
+                # base_offset, last_offset_delta, and control batches
                 try:
                     self._subscriptions.assignment[tp].last_offset_from_message_batch = batch.base_offset + \
                         batch.last_offset_delta
+                    # Control batches have a single record indicating whether a transaction
+                    # was aborted or committed.
+                    # When isolation_level is READ_COMMITTED (currently unsupported)
+                    # we should also skip all messages from aborted transactions
+                    # For now we only support READ_UNCOMMITTED and so we ignore the
+                    # abort/commit signal.
+                    if batch.is_control_batch:
+                        batch = records.next_batch()
+                        continue
                 except AttributeError:
                     pass
 
@@ -677,7 +687,7 @@ def _create_fetch_requests(self):
                 if next_offset_from_batch_header > self._subscriptions.assignment[partition].position:
                     log.debug(
                         "Advance position for partition %s from %s to %s (last message batch location plus one)"
-                        " to correct for deleted compacted messages",
+                        " to correct for deleted compacted messages and/or transactional control records",
                         partition, self._subscriptions.assignment[partition].position, next_offset_from_batch_header)
                     self._subscriptions.assignment[partition].position = next_offset_from_batch_header
 
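Note: with this change a control batch still advances the consumer position (and the position-correction path above accounts for it), but its marker record is never handed to the application. A toy model of that skip, using a stand-in batch class rather than the real record types:

    class Batch(object):
        def __init__(self, offsets, is_control_batch=False):
            self.offsets = offsets
            self.is_control_batch = is_control_batch

    def unpack(batches):
        position, delivered = 0, []
        for batch in batches:
            position = batch.offsets[-1] + 1  # always advance past the batch
            if batch.is_control_batch:
                continue  # transaction marker, not an application record
            delivered.extend(batch.offsets)
        return position, delivered

    batches = [Batch([0, 1]), Batch([2], is_control_batch=True), Batch([3])]
    assert unpack(batches) == (4, [0, 1, 3])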
5 changes: 4 additions & 1 deletion kafka/coordinator/consumer.py
@@ -128,7 +128,10 @@ def __init__(self, client, subscription, metrics, **configs):
 
     def __del__(self):
         if hasattr(self, '_cluster') and self._cluster:
-            self._cluster.remove_listener(WeakMethod(self._handle_metadata_update))
+            try:
+                self._cluster.remove_listener(WeakMethod(self._handle_metadata_update))
+            except TypeError:
+                pass
         super(ConsumerCoordinator, self).__del__()
 
     def protocol_type(self):
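Note: one plausible reading of this guard is that __del__ can fire during interpreter teardown, when constructing the WeakMethod key (or comparing it inside remove_listener) may raise TypeError; swallowing the error is harmless since the coordinator is being destroyed anyway. A minimal sketch of the pattern with hypothetical stand-in classes:

    import weakref

    class Cluster(object):
        def __init__(self):
            self._listeners = set()

        def remove_listener(self, listener):
            self._listeners.discard(listener)

    class Coordinator(object):
        def __init__(self, cluster):
            self._cluster = cluster
            self._cluster._listeners.add(weakref.WeakMethod(self._handle_update))

        def _handle_update(self, metadata):
            pass

        def __del__(self):
            if hasattr(self, '_cluster') and self._cluster:
                try:
                    self._cluster.remove_listener(weakref.WeakMethod(self._handle_update))
                except TypeError:
                    pass

    coord = Coordinator(Cluster())
    del coord  # the guard keeps a late TypeError from escaping __del__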
47 changes: 45 additions & 2 deletions kafka/record/default_records.py
@@ -269,8 +269,12 @@ def _read_msg(
                 "payload, but instead read {}".format(length, pos - start_pos))
         self._pos = pos
 
-        return DefaultRecord(
-            offset, timestamp, self.timestamp_type, key, value, headers)
+        if self.is_control_batch:
+            return ControlRecord(
+                offset, timestamp, self.timestamp_type, key, value, headers)
+        else:
+            return DefaultRecord(
+                offset, timestamp, self.timestamp_type, key, value, headers)
 
     def __iter__(self):
         self._maybe_uncompress()
@@ -362,6 +366,45 @@ def __repr__(self):
         )
 
 
+class ControlRecord(DefaultRecord):
+    __slots__ = ("_offset", "_timestamp", "_timestamp_type", "_key", "_value",
+                 "_headers", "_version", "_type")
+
+    KEY_STRUCT = struct.Struct(
+        ">h"  # Current Version => Int16
+        "h"  # Type => Int16 (0 indicates an abort marker, 1 indicates a commit)
+    )
+
+    def __init__(self, offset, timestamp, timestamp_type, key, value, headers):
+        super(ControlRecord, self).__init__(offset, timestamp, timestamp_type, key, value, headers)
+        (self._version, self._type) = self.KEY_STRUCT.unpack(self._key)
+
+    # see https://kafka.apache.org/documentation/#controlbatch
+    @property
+    def version(self):
+        return self._version
+
+    @property
+    def type(self):
+        return self._type
+
+    @property
+    def abort(self):
+        return self._type == 0
+
+    @property
+    def commit(self):
+        return self._type == 1
+
+    def __repr__(self):
+        return (
+            "ControlRecord(offset={!r}, timestamp={!r}, timestamp_type={!r},"
+            " version={!r}, type={!r} <{!s}>)".format(
+                self._offset, self._timestamp, self._timestamp_type,
+                self._version, self._type, "abort" if self.abort else "commit")
+        )
+
+
 class DefaultRecordBatchBuilder(DefaultRecordBase, ABCRecordBatchBuilder):
 
     # excluding key, value and headers:
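Note: the control-record key parsed by KEY_STRUCT above is exactly four bytes: a big-endian int16 version followed by an int16 type, where 0 marks an aborted transaction and 1 a committed one. A quick standalone decode of both markers:

    import struct

    KEY_STRUCT = struct.Struct(">hh")  # version Int16, type Int16 (big-endian)

    for raw in (struct.pack(">hh", 0, 0), struct.pack(">hh", 0, 1)):
        version, marker_type = KEY_STRUCT.unpack(raw)
        print(version, "abort" if marker_type == 0 else "commit")
    # -> 0 abort
    # -> 0 commit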
24 changes: 24 additions & 0 deletions test/record/test_records.py
@@ -60,6 +60,15 @@
     b'\x00\xff\xff\xff\xff\x00\x00\x00\x03123'
 ]
 
+# Single record control batch (abort)
+control_batch_data_v2 = [
+    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00R\x00\x00\x00\x00'
+    b'\x02e\x97\xff\xd0\x00\x20\x00\x00\x00\x00\x00\x00\x00\x00\x00'
+    b'\x98\x96\x7f\x00\x00\x00\x00\x00\x98\x96'
+    b'\x7f\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'
+    b'\x00\x00\x00\x01@\x00\x00\x00\x08\x00\x00\x00\x00,opaque-control-message\x00'
+]
+
 
 def test_memory_records_v2():
     data_bytes = b"".join(record_batch_data_v2) + b"\x00" * 4
@@ -230,3 +239,18 @@ def test_memory_records_builder_full(magic, compression_type):
         key=None, timestamp=None, value=b"M")
     assert metadata is None
     assert builder.next_offset() == 1
+
+
+def test_control_record_v2():
+    data_bytes = b"".join(control_batch_data_v2)
+    records = MemoryRecords(data_bytes)
+
+    assert records.has_next() is True
+    batch = records.next_batch()
+    assert batch.is_control_batch is True
+    recs = list(batch)
+    assert len(recs) == 1
+    assert recs[0].version == 0
+    assert recs[0].type == 0
+    assert recs[0].abort is True
+    assert recs[0].commit is False
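Note: the new fixture can also be exercised interactively; this sketch assumes a kafka-python checkout on the import path (the fixture import comes from the test module above):

    from kafka.record import MemoryRecords
    from test.record.test_records import control_batch_data_v2

    records = MemoryRecords(b"".join(control_batch_data_v2))
    batch = records.next_batch()
    assert batch.is_control_batch is True
    record = list(batch)[0]
    print(record)  # ControlRecord(offset=0, ... type=0 <abort>)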