diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index bc0724e4a..05deae755 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -6,7 +6,7 @@ on:
   push:
     branches: ["master"]
   pull_request:
-    branches: ["master"]
+    branches: ["master", "2.0"]

 env:
   FORCE_COLOR: "1"  # Make tools pretty.
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index c9e51e5c9..c39bd94b0 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -294,7 +294,7 @@ def _refresh_controller_id(self, timeout_ms=30000):
                 time.sleep(1)
                 continue
             # verify the controller is new enough to support our requests
-            controller_version = self._client.check_version(node_id=controller_id, timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
+            controller_version = self._client.check_version(node_id=controller_id)
             if controller_version < (0, 10, 0):
                 raise IncompatibleBrokerVersion(
                     "The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0."
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 67014488f..7d7b99401 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -237,8 +237,7 @@ def __init__(self, **configs):

         # Check Broker Version if not set explicitly
         if self.config['api_version'] is None:
-            check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
-            self.config['api_version'] = self.check_version(timeout=check_timeout)
+            self.config['api_version'] = self.check_version()

     def _init_wakeup_socketpair(self):
         self._wake_r, self._wake_w = socket.socketpair()
@@ -364,14 +363,24 @@ def _should_recycle_connection(self, conn):

         return False

-    def _maybe_connect(self, node_id):
-        """Idempotent non-blocking connection attempt to the given node id."""
+    def _init_connect(self, node_id):
+        """Idempotent non-blocking connection attempt to the given node id.
+
+        Returns True if connection object exists and is connected / connecting
+        """
         with self._lock:
             conn = self._conns.get(node_id)

+            # Check if existing connection should be recreated because host/port changed
+            if conn is not None and self._should_recycle_connection(conn):
+                self._conns.pop(node_id).close()
+                conn = None
+
             if conn is None:
                 broker = self.cluster.broker_metadata(node_id)
-                assert broker, 'Broker id %s not in current metadata' % (node_id,)
+                if broker is None:
+                    log.debug('Broker id %s not in current metadata', node_id)
+                    return False

                 log.debug("Initiating connection to node %s at %s:%s",
                           node_id, broker.host, broker.port)
@@ -383,16 +392,9 @@ def _maybe_connect(self, node_id):
                                         **self.config)
                 self._conns[node_id] = conn

-            # Check if existing connection should be recreated because host/port changed
-            elif self._should_recycle_connection(conn):
-                self._conns.pop(node_id)
-                return False
-
-            elif conn.connected():
-                return True
-
-            conn.connect()
-            return conn.connected()
+            if conn.disconnected():
+                conn.connect()
+            return not conn.disconnected()

     def ready(self, node_id, metadata_priority=True):
         """Check whether a node is connected and ok to send more requests.
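For readers following the refactor above: a minimal, self-contained model of the contract the new _init_connect() follows (the FakeConn class and init_connect() helper below are illustrative stand-ins, not kafka-python code). The key behavioral changes are that an unknown broker returns False instead of raising AssertionError, a host/port change recycles the old connection, and the return value means "connected or connecting" rather than "fully connected".

# Illustrative sketch only -- a simplified model of the _init_connect() contract
class FakeConn(object):
    def __init__(self, host, port):
        self.host, self.port = host, port
        self.state = 'disconnected'

    def connect(self):
        self.state = 'connecting'

    def disconnected(self):
        return self.state == 'disconnected'


def init_connect(conns, node_id, broker):
    """broker is a (host, port) tuple from metadata, or None if unknown."""
    conn = conns.get(node_id)
    # Recreate the connection if the broker's host/port changed
    if conn is not None and (conn.host, conn.port) != broker:
        conns.pop(node_id)
        conn = None
    if conn is None:
        if broker is None:
            return False  # node not in current metadata: no assert, just False
        conn = conns.setdefault(node_id, FakeConn(*broker))
    if conn.disconnected():
        conn.connect()
    return not conn.disconnected()


conns = {}
assert init_connect(conns, 0, ('localhost', 9092)) is True   # connecting counts as success
assert init_connect(conns, 1, None) is False                 # unknown broker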
@@ -576,12 +578,18 @@ def poll(self, timeout_ms=None, future=None):
                 if self._closed:
                     break

-                # Send a metadata request if needed (or initiate new connection)
-                metadata_timeout_ms = self._maybe_refresh_metadata()
-
                 # Attempt to complete pending connections
                 for node_id in list(self._connecting):
-                    self._maybe_connect(node_id)
+                    # False return means no more connection progress is possible
+                    # Connected nodes will update _connecting via state_change callback
+                    if not self._init_connect(node_id):
+                        # It's possible that the connection attempt triggered a state change
+                        # but if not, make sure to remove from _connecting list
+                        if node_id in self._connecting:
+                            self._connecting.remove(node_id)
+
+                # Send a metadata request if needed (or initiate new connection)
+                metadata_timeout_ms = self._maybe_refresh_metadata()

                 # If we got a future that is already done, don't block in _poll
                 if future is not None and future.is_done:
@@ -623,6 +631,11 @@ def _register_send_sockets(self):
                 self._selector.register(conn._sock, selectors.EVENT_WRITE, conn)

     def _poll(self, timeout):
+        # Python throws OverflowError if timeout is > 2147483647 milliseconds
+        # (though the param to selector.select is in seconds)
+        # so convert any too-large timeout to blocking
+        if timeout > 2147483:
+            timeout = None
         # This needs to be locked, but since it is only called from within the
         # locked section of poll(), there is no additional lock acquisition here
         processed = set()
@@ -843,6 +856,26 @@ def _maybe_refresh_metadata(self, wakeup=False):
            log.debug("Give up sending metadata request since no node is available. (reconnect delay %d ms)", next_connect_ms)
            return next_connect_ms

+        if not self._can_send_request(node_id):
+            # If there's any connection establishment underway, wait until it completes. This prevents
+            # the client from unnecessarily connecting to additional nodes while a previous connection
+            # attempt has not been completed.
+            if self._connecting:
+                return float('inf')
+
+            elif self._can_connect(node_id):
+                log.debug("Initializing connection to node %s for metadata request", node_id)
+                self._connecting.add(node_id)
+                if not self._init_connect(node_id):
+                    if node_id in self._connecting:
+                        self._connecting.remove(node_id)
+                    # Connection attempt failed immediately, need to retry with a different node
+                    return self.config['reconnect_backoff_ms']
+            else:
+                # Existing connection with max in flight requests. Wait for request to complete.
+                return self.config['request_timeout_ms']
+
+        # Recheck node_id in case we were able to connect immediately above
         if self._can_send_request(node_id):
             topics = list(self._topics)
             if not topics and self.cluster.is_bootstrap(node_id):
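The overflow guard added to _poll() above can be reproduced outside kafka-python. A standalone sketch, not part of this patch, assuming a poll/epoll-backed selector whose underlying timeout is a signed 32-bit millisecond value (the safe_select() helper and MAX_SELECT_TIMEOUT_S constant are illustrative names, not library API):

import selectors
import socket

MAX_SELECT_TIMEOUT_S = 2147483  # roughly 2**31 milliseconds, expressed in seconds

def safe_select(selector, timeout):
    # selector.select() takes seconds, but poll/epoll backends convert the
    # value to a C int of milliseconds; larger values raise OverflowError.
    if timeout is not None and timeout > MAX_SELECT_TIMEOUT_S:
        timeout = None  # treat an absurdly large timeout as "block until ready"
    return selector.select(timeout)

sel = selectors.DefaultSelector()
r, w = socket.socketpair()
sel.register(r, selectors.EVENT_READ)
w.send(b'wake')                      # make r readable so the example returns immediately
events = safe_select(sel, 10 ** 9)   # would overflow on some platforms without the clamp
assert events, 'expected the readable socket to be reported'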
@@ -864,20 +897,11 @@ def refresh_done(val_or_error):
             future.add_errback(refresh_done)
             return self.config['request_timeout_ms']

-        # If there's any connection establishment underway, wait until it completes. This prevents
-        # the client from unnecessarily connecting to additional nodes while a previous connection
-        # attempt has not been completed.
+        # Should only get here if still connecting
         if self._connecting:
             return float('inf')
-
-        if self.maybe_connect(node_id, wakeup=wakeup):
-            log.debug("Initializing connection to node %s for metadata request", node_id)
-            return float('inf')
-
-        # connected but can't send more, OR connecting
-        # In either case we just need to wait for a network event
-        # to let us know the selected connection might be usable again.
-        return float('inf')
+        else:
+            return self.config['reconnect_backoff_ms']

     def get_api_versions(self):
         """Return the ApiVersions map, if available.
@@ -890,13 +914,16 @@ def get_api_versions(self):
         """
         return self._api_versions

-    def check_version(self, node_id=None, timeout=2, strict=False):
+    def check_version(self, node_id=None, timeout=None, strict=False):
         """Attempt to guess the version of a Kafka broker.

-        Note: It is possible that this method blocks longer than the
-            specified timeout. This can happen if the entire cluster
-            is down and the client enters a bootstrap backoff sleep.
-            This is only possible if node_id is None.
+        Keyword Arguments:
+            node_id (str, optional): Broker node id from cluster metadata. If None, attempts
+                to connect to any available broker until version is identified.
+                Default: None
+            timeout (num, optional): Maximum time in seconds to try to check broker version.
+                If unable to identify version before timeout, raise error (see below).
+                Default: api_version_auto_timeout_ms / 1000

         Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ...

@@ -906,6 +933,7 @@ def check_version(self, node_id=None, timeout=2, strict=False):
             UnrecognizedBrokerVersion: please file bug if seen!
             AssertionError (if strict=True): please file bug if seen!
         """
+        timeout = timeout or (self.config['api_version_auto_timeout_ms'] / 1000)
         self._lock.acquire()
         end = time.time() + timeout
         while time.time() < end:
@@ -916,7 +944,12 @@ def check_version(self, node_id=None, timeout=2, strict=False):
             if try_node is None:
                 self._lock.release()
                 raise Errors.NoBrokersAvailable()
-            self._maybe_connect(try_node)
+            if not self._init_connect(try_node):
+                if try_node == node_id:
+                    raise Errors.NodeNotReadyError("Connection failed to %s" % node_id)
+                else:
+                    continue
+
             conn = self._conns[try_node]

             # We will intentionally cause socket failures
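check_version() now falls back to api_version_auto_timeout_ms instead of a hard-coded 2 seconds. A minimal sketch of that fallback expression, not part of the patch (the resolve_timeout() helper and config dict are illustrative; 2000 ms mirrors kafka-python's usual default for api_version_auto_timeout_ms):

# Illustrative sketch of the timeout fallback used by check_version(timeout=None)
config = {'api_version_auto_timeout_ms': 2000}

def resolve_timeout(timeout=None):
    # An explicit timeout wins; otherwise fall back to the configured auto-timeout,
    # converting milliseconds to the seconds that check_version() works in.
    return timeout or (config['api_version_auto_timeout_ms'] / 1000)

assert resolve_timeout() == 2      # falls back to 2000 ms -> 2 s
assert resolve_timeout(5) == 5     # explicit value is used as-is
assert resolve_timeout(0) == 2     # caveat: a falsy 0 also falls back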
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 0b5df4e9a..8d722ec6e 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -457,10 +457,20 @@ def _unpack_message_set(self, tp, records):
             batch = records.next_batch()
             while batch is not None:

-                # LegacyRecordBatch cannot access either base_offset or last_offset_delta
+                # Try DefaultsRecordBatch / message log format v2
+                # base_offset, last_offset_delta, and control batches
                 try:
                     self._subscriptions.assignment[tp].last_offset_from_message_batch = batch.base_offset + \
                         batch.last_offset_delta
+                    # Control batches have a single record indicating whether a transaction
+                    # was aborted or committed.
+                    # When isolation_level is READ_COMMITTED (currently unsupported)
+                    # we should also skip all messages from aborted transactions
+                    # For now we only support READ_UNCOMMITTED and so we ignore the
+                    # abort/commit signal.
+                    if batch.is_control_batch:
+                        batch = records.next_batch()
+                        continue
                 except AttributeError:
                     pass

@@ -677,7 +687,7 @@ def _create_fetch_requests(self):
                 if next_offset_from_batch_header > self._subscriptions.assignment[partition].position:
                     log.debug(
                         "Advance position for partition %s from %s to %s (last message batch location plus one)"
-                        " to correct for deleted compacted messages",
+                        " to correct for deleted compacted messages and/or transactional control records",
                         partition, self._subscriptions.assignment[partition].position, next_offset_from_batch_header)
                     self._subscriptions.assignment[partition].position = next_offset_from_batch_header
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 971f5e802..a30ff82e3 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -128,7 +128,10 @@ def __init__(self, client, subscription, metrics, **configs):

     def __del__(self):
         if hasattr(self, '_cluster') and self._cluster:
-            self._cluster.remove_listener(WeakMethod(self._handle_metadata_update))
+            try:
+                self._cluster.remove_listener(WeakMethod(self._handle_metadata_update))
+            except TypeError:
+                pass
         super(ConsumerCoordinator, self).__del__()

     def protocol_type(self):
diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py
index a098c42a9..b3a6fd082 100644
--- a/kafka/record/default_records.py
+++ b/kafka/record/default_records.py
@@ -269,8 +269,12 @@ def _read_msg(
                 "payload, but instead read {}".format(length, pos - start_pos))
         self._pos = pos

-        return DefaultRecord(
-            offset, timestamp, self.timestamp_type, key, value, headers)
+        if self.is_control_batch:
+            return ControlRecord(
+                offset, timestamp, self.timestamp_type, key, value, headers)
+        else:
+            return DefaultRecord(
+                offset, timestamp, self.timestamp_type, key, value, headers)

     def __iter__(self):
         self._maybe_uncompress()
@@ -362,6 +366,45 @@ def __repr__(self):
         )


+class ControlRecord(DefaultRecord):
+    __slots__ = ("_offset", "_timestamp", "_timestamp_type", "_key", "_value",
+                 "_headers", "_version", "_type")
+
+    KEY_STRUCT = struct.Struct(
+        ">h"  # Current Version => Int16
+        "h"   # Type => Int16 (0 indicates an abort marker, 1 indicates a commit)
+    )
+
+    def __init__(self, offset, timestamp, timestamp_type, key, value, headers):
+        super(ControlRecord, self).__init__(offset, timestamp, timestamp_type, key, value, headers)
+        (self._version, self._type) = self.KEY_STRUCT.unpack(self._key)
+
+    # see https://kafka.apache.org/documentation/#controlbatch
+    @property
+    def version(self):
+        return self._version
+
+    @property
+    def type(self):
+        return self._type
+
+    @property
+    def abort(self):
+        return self._type == 0
+
+    @property
+    def commit(self):
+        return self._type == 1
+
+    def __repr__(self):
+        return (
+            "ControlRecord(offset={!r}, timestamp={!r}, timestamp_type={!r},"
+            " version={!r}, type={!r} <{!s}>)".format(
+                self._offset, self._timestamp, self._timestamp_type,
+                self._version, self._type, "abort" if self.abort else "commit")
+        )
+
+
 class DefaultRecordBatchBuilder(DefaultRecordBase, ABCRecordBatchBuilder):

     # excluding key, value and headers:
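The 4-byte key layout that ControlRecord.KEY_STRUCT decodes can be exercised on its own. A standalone sketch using only the standard library, not part of the patch (the key bytes below are made up for illustration; real keys come from a control batch's single record):

import struct

# Same layout as ControlRecord.KEY_STRUCT: two big-endian signed 16-bit ints
KEY_STRUCT = struct.Struct(
    ">h"  # version
    "h"   # type: 0 = transaction abort marker, 1 = commit marker
)

abort_key = b"\x00\x00\x00\x00"   # hypothetical key: version 0, type 0
commit_key = b"\x00\x00\x00\x01"  # hypothetical key: version 0, type 1

assert KEY_STRUCT.unpack(abort_key) == (0, 0)
assert KEY_STRUCT.unpack(commit_key) == (0, 1)
print(["abort" if t == 0 else "commit"
       for _, t in (KEY_STRUCT.unpack(abort_key), KEY_STRUCT.unpack(commit_key))])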
diff --git a/test/record/test_records.py b/test/record/test_records.py
index 5ed22d816..cab95922d 100644
--- a/test/record/test_records.py
+++ b/test/record/test_records.py
@@ -60,6 +60,15 @@
     b'\x00\xff\xff\xff\xff\x00\x00\x00\x03123'
 ]

+# Single record control batch (abort)
+control_batch_data_v2 = [
+    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00R\x00\x00\x00\x00'
+    b'\x02e\x97\xff\xd0\x00\x20\x00\x00\x00\x00\x00\x00\x00\x00\x00'
+    b'\x98\x96\x7f\x00\x00\x00\x00\x00\x98\x96'
+    b'\x7f\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'
+    b'\x00\x00\x00\x01@\x00\x00\x00\x08\x00\x00\x00\x00,opaque-control-message\x00'
+]
+

 def test_memory_records_v2():
     data_bytes = b"".join(record_batch_data_v2) + b"\x00" * 4
@@ -230,3 +239,18 @@ def test_memory_records_builder_full(magic, compression_type):
         key=None, timestamp=None, value=b"M")
     assert metadata is None
     assert builder.next_offset() == 1
+
+
+def test_control_record_v2():
+    data_bytes = b"".join(control_batch_data_v2)
+    records = MemoryRecords(data_bytes)
+
+    assert records.has_next() is True
+    batch = records.next_batch()
+    assert batch.is_control_batch is True
+    recs = list(batch)
+    assert len(recs) == 1
+    assert recs[0].version == 0
+    assert recs[0].type == 0
+    assert recs[0].abort is True
+    assert recs[0].commit is False
diff --git a/test/test_client_async.py b/test/test_client_async.py
index ec5e2c0ae..1d8bfae24 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -58,7 +58,7 @@ def test_can_connect(cli, conn):
     assert cli._can_connect(0)

     # Node is connected, can't reconnect
-    assert cli._maybe_connect(0) is True
+    assert cli._init_connect(0) is True
     assert not cli._can_connect(0)

     # Node is disconnected, can connect
@@ -70,20 +70,15 @@ def test_can_connect(cli, conn):
     assert not cli._can_connect(0)


-def test_maybe_connect(cli, conn):
-    try:
-        # Node not in metadata, raises AssertionError
-        cli._maybe_connect(2)
-    except AssertionError:
-        pass
-    else:
-        assert False, 'Exception not raised'
+def test_init_connect(cli, conn):
+    # Node not in metadata, return False
+    assert not cli._init_connect(2)

     # New node_id creates a conn object
     assert 0 not in cli._conns
     conn.state = ConnectionStates.DISCONNECTED
     conn.connect.side_effect = lambda: conn._set_conn_state(ConnectionStates.CONNECTING)
-    assert cli._maybe_connect(0) is False
+    assert cli._init_connect(0) is True
     assert cli._conns[0] is conn


@@ -127,8 +122,8 @@ def test_ready(mocker, cli, conn):


 def test_is_ready(mocker, cli, conn):
-    cli._maybe_connect(0)
-    cli._maybe_connect(1)
+    cli._init_connect(0)
+    cli._init_connect(1)

     # metadata refresh blocks ready nodes
     assert cli.is_ready(0)
@@ -171,14 +166,14 @@ def test_close(mocker, cli, conn):
     assert conn.close.call_count == call_count

     # Single node close
-    cli._maybe_connect(0)
+    cli._init_connect(0)
     assert conn.close.call_count == call_count
     cli.close(0)
     call_count += 1
     assert conn.close.call_count == call_count

     # All node close
-    cli._maybe_connect(1)
+    cli._init_connect(1)
     cli.close()
     # +2 close: node 1, node bootstrap (node 0 already closed)
     call_count += 2
@@ -190,7 +185,7 @@ def test_is_disconnected(cli, conn):
     conn.state = ConnectionStates.DISCONNECTED
     assert not cli.is_disconnected(0)

-    cli._maybe_connect(0)
+    cli._init_connect(0)
     assert cli.is_disconnected(0)

     conn.state = ConnectionStates.CONNECTING
@@ -215,7 +210,7 @@ def test_send(cli, conn):
         assert isinstance(f.exception, Errors.NodeNotReadyError)

     conn.state = ConnectionStates.CONNECTED
-    cli._maybe_connect(0)
+    cli._init_connect(0)
     # ProduceRequest w/ 0 required_acks -> no response
     request = ProduceRequest[0](0, 0, [])
     assert request.expect_response() is False
@@ -344,8 +339,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
     mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
     mocker.patch.object(client, '_can_send_request', return_value=False)
     mocker.patch.object(client, '_can_connect', return_value=True)
-    mocker.patch.object(client, '_maybe_connect', return_value=True)
-    mocker.patch.object(client, 'maybe_connect', return_value=True)
+    mocker.patch.object(client, '_init_connect', return_value=True)

     now = time.time()
     t = mocker.patch('time.time')
@@ -354,7 +348,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
     # first poll attempts connection
     client.poll(timeout_ms=12345678)
     client._poll.assert_called_with(12345.678)
-    client.maybe_connect.assert_called_once_with('foobar', wakeup=False)
+    client._init_connect.assert_called_once_with('foobar')

     # poll while connecting should not attempt a new connection
     client._connecting.add('foobar')
diff --git a/test/test_conn.py b/test/test_conn.py
index 3afa9422d..fb4172814 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -90,7 +90,7 @@ def test_connection_delay(conn, mocker):
     conn.state = ConnectionStates.CONNECTED
     assert conn.connection_delay() == float('inf')

-    conn._gai.clear()
+    del conn._gai[:]
     conn._update_reconnect_backoff()
     conn.state = ConnectionStates.DISCONNECTED
     assert conn.connection_delay() == 1.0 * conn.config['reconnect_backoff_ms']
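On the last hunk: `del some_list[:]` empties a list in place on both Python 2 and Python 3, whereas `list.clear()` exists only on Python 3, which is a likely motivation for the change in test_conn.py, though the patch itself does not say. A quick standalone illustration (plain list, unrelated to BrokerConnection internals):

addresses = ['10.0.0.1', '10.0.0.2']
alias = addresses            # second reference to the same list object

del addresses[:]             # empties the list in place, works on Python 2 and 3
assert addresses == [] and alias == []   # both names see the cleared list

# addresses.clear() would do the same thing, but only on Python 3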