From 5e982b2420fb89d79a07a5154d45904aa433ad38 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Thu, 3 Apr 2025 09:40:49 -0700
Subject: [PATCH 1/8] splat destructuring in py3 only

---
 kafka/conn.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/kafka/conn.py b/kafka/conn.py
index ec516b0f4..1febb479a 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -594,7 +594,8 @@ def _handle_api_versions_response(self, future, response):
             future.failure(error_type())
             if error_type is Errors.UnsupportedVersionError:
                 self._api_versions_idx -= 1
-                for api_key, min_version, max_version, *rest in response.api_versions:
+                for api_version_data in response.api_versions:
+                    api_key, min_version, max_version = api_version_data[:3]
                     # If broker provides a lower max_version, skip to that
                     if api_key == response.API_KEY:
                         self._api_versions_idx = min(self._api_versions_idx, max_version)
@@ -607,8 +608,8 @@ def _handle_api_versions_response(self, future, response):
                     self.close(error=error_type())
             return
         self._api_versions = dict([
-            (api_key, (min_version, max_version))
-            for api_key, min_version, max_version, *rest in response.api_versions
+            (api_version_data[0], (api_version_data[1], api_version_data[2]))
+            for api_version_data in response.api_versions
         ])
         self._api_version = self._infer_broker_version_from_api_versions(self._api_versions)
         log.info('%s: Broker version identified as %s', self, '.'.join(map(str, self._api_version)))

From 2436786d6c8672483aeb666c3d58468a0e17c313 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Thu, 3 Apr 2025 09:41:46 -0700
Subject: [PATCH 2/8] __bool__ for py3, __nonzero__ for py2

---
 kafka/consumer/fetcher.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 61480fb07..29c2a7182 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -867,6 +867,9 @@ def _maybe_skip_record(self, record):
     def __bool__(self):
         return self.record_iterator is not None
 
+    # py2
+    __nonzero__ = __bool__
+
     def drain(self):
         if self.record_iterator is not None:
             self.record_iterator = None

From 53afdf6fc8a4f9310fa8d99632808a4de9ae90ec Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Thu, 3 Apr 2025 09:42:08 -0700
Subject: [PATCH 3/8] OrderedDict.move_to_end py3 only

---
 kafka/consumer/subscription_state.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index d3b791a44..4cc21020e 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -381,7 +381,11 @@ def resume(self, partition):
 
     def move_partition_to_end(self, partition):
         if partition in self.assignment:
-            self.assignment.move_to_end(partition)
+            try:
+                self.assignment.move_to_end(partition)
+            except AttributeError:
+                state = self.assignment.pop(partition)
+                self.assignment[partition] = state
 
 
 class TopicPartitionState(object):

From 4a525a1ae9cc9196121ce70155a5e036583c91b0 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Thu, 3 Apr 2025 09:58:48 -0700
Subject: [PATCH 4/8] Skip compression tests if libraries not installed

---
 test/record/test_default_records.py |  5 +++++
 test/record/test_legacy_records.py  |  4 ++++
 test/record/test_records.py         |  3 +++
 test/test_producer.py               |  4 +++-
 test/testutil.py                    | 16 ++++++++++++++++
 5 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/test/record/test_default_records.py b/test/record/test_default_records.py
index e1c840fa6..79d3975a5 100644
--- a/test/record/test_default_records.py
+++ b/test/record/test_default_records.py
@@ -11,6 +11,8 @@
 )
 from kafka.errors import UnsupportedCodecError
 
+from test.testutil import maybe_skip_unsupported_compression
+
 
 @pytest.mark.parametrize("compression_type", [
     DefaultRecordBatch.CODEC_NONE,
@@ -19,6 +21,7 @@
     DefaultRecordBatch.CODEC_LZ4
 ])
 def test_read_write_serde_v2(compression_type):
+    maybe_skip_unsupported_compression(compression_type)
     builder = DefaultRecordBatchBuilder(
         magic=2, compression_type=compression_type, is_transactional=1,
         producer_id=123456, producer_epoch=123, base_sequence=9999,
@@ -186,6 +189,8 @@ def test_default_batch_size_limit():
 ])
 @pytest.mark.parametrize("magic", [0, 1])
 def test_unavailable_codec(magic, compression_type, name, checker_name):
+    if not getattr(kafka.codec, checker_name)():
+        pytest.skip('%s compression_type not installed' % (compression_type,))
     builder = DefaultRecordBatchBuilder(
         magic=2, compression_type=compression_type, is_transactional=0,
         producer_id=-1, producer_epoch=-1, base_sequence=-1,
diff --git a/test/record/test_legacy_records.py b/test/record/test_legacy_records.py
index b15b53704..c692d35a1 100644
--- a/test/record/test_legacy_records.py
+++ b/test/record/test_legacy_records.py
@@ -10,6 +10,8 @@
 import kafka.codec
 from kafka.errors import UnsupportedCodecError
 
+from test.testutil import maybe_skip_unsupported_compression
+
 
 @pytest.mark.parametrize("magic", [0, 1])
 def test_read_write_serde_v0_v1_no_compression(magic):
@@ -39,6 +41,7 @@
 ])
 @pytest.mark.parametrize("magic", [0, 1])
 def test_read_write_serde_v0_v1_with_compression(compression_type, magic):
+    maybe_skip_unsupported_compression(compression_type)
     builder = LegacyRecordBatchBuilder(
         magic=magic, compression_type=compression_type, batch_size=9999999)
     for offset in range(10):
@@ -179,6 +182,7 @@ def test_legacy_batch_size_limit(magic):
 ])
 @pytest.mark.parametrize("magic", [0, 1])
 def test_unavailable_codec(magic, compression_type, name, checker_name):
+    maybe_skip_unsupported_compression(compression_type)
     builder = LegacyRecordBatchBuilder(
         magic=magic, compression_type=compression_type, batch_size=1024)
     builder.append(0, timestamp=None, key=None, value=b"M")
diff --git a/test/record/test_records.py b/test/record/test_records.py
index cab95922d..dc9c95ff8 100644
--- a/test/record/test_records.py
+++ b/test/record/test_records.py
@@ -4,6 +4,8 @@
 from kafka.record import MemoryRecords, MemoryRecordsBuilder
 from kafka.errors import CorruptRecordException
 
+from test.testutil import maybe_skip_unsupported_compression
+
 # This is real live data from Kafka 11 broker
 record_batch_data_v2 = [
     # First Batch value == "123"
@@ -179,6 +181,7 @@ def test_memory_records_corrupt():
 @pytest.mark.parametrize("compression_type", [0, 1, 2, 3])
 @pytest.mark.parametrize("magic", [0, 1, 2])
 def test_memory_records_builder(magic, compression_type):
+    maybe_skip_unsupported_compression(compression_type)
     builder = MemoryRecordsBuilder(
         magic=magic, compression_type=compression_type, batch_size=1024 * 10)
     base_size = builder.size_in_bytes()  # V2 has a header before
diff --git a/test/test_producer.py b/test/test_producer.py
index 598661aab..3d1de06d3 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -8,7 +8,7 @@
 from kafka import KafkaConsumer, KafkaProducer, TopicPartition
 from kafka.producer.buffer import SimpleBufferPool
 
-from test.testutil import env_kafka_version, random_string
+from test.testutil import env_kafka_version, random_string, maybe_skip_unsupported_compression
 
 
 def test_buffer_pool():
@@ -44,6 +44,7 @@ def consumer_factory(**kwargs):
 @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
 @pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
 def test_end_to_end(kafka_broker, compression):
+    maybe_skip_unsupported_compression(compression)
     if compression == 'lz4':
         if env_kafka_version() < (0, 8, 2):
             pytest.skip('LZ4 requires 0.8.2')
@@ -104,6 +105,7 @@ def test_kafka_producer_gc_cleanup():
 @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
 @pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
 def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
+    maybe_skip_unsupported_compression(compression)
     if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
         pytest.skip('zstd requires 2.1.0 or more')
     connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
diff --git a/test/testutil.py b/test/testutil.py
index dd4e267a8..b5dab1c02 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -6,6 +6,10 @@
 import string
 import time
 
+import pytest
+
+import kafka.codec
+
 
 def special_to_underscore(string, _matcher=re.compile(r'[^a-zA-Z0-9_]+')):
     return _matcher.sub('_', string)
@@ -36,6 +40,18 @@ def assert_message_count(messages, num_messages):
     assert len(unique_messages) == num_messages, 'Expected %d unique messages, got %d' % (num_messages, len(unique_messages))
 
 
+def maybe_skip_unsupported_compression(compression_type):
+    codecs = {1: 'gzip', 2: 'snappy', 3: 'lz4', 4: 'zstd'}
+    if not compression_type:
+        return
+    elif compression_type in codecs:
+        compression_type = codecs[compression_type]
+
+    checker = getattr(kafka.codec, 'has_' + compression_type, None)
+    if checker and not checker():
+        pytest.skip("Compression libraries not installed for %s" % (compression_type,))
+
+
 class Timer(object):
     def __enter__(self):
         self.start = time.time()

From a984d0947d22f7a276cb59daa245043832ed9055 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Thu, 3 Apr 2025 09:59:34 -0700
Subject: [PATCH 5/8] Threading daemon kwarg is py3 only

---
 test/test_consumer_group.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index 9334a4fd1..b2908c757 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -68,7 +68,8 @@ def consumer_thread(i):
 
     num_consumers = 4
     for i in range(num_consumers):
-        t = threading.Thread(target=consumer_thread, args=(i,), daemon=True)
+        t = threading.Thread(target=consumer_thread, args=(i,))
+        t.daemon = True
         t.start()
         threads[i] = t
 

From fae76dd36699db943c4431b76bee197ae36b107c Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Thu, 3 Apr 2025 10:00:33 -0700
Subject: [PATCH 6/8] Mock conn _send and recv

---
 test/test_conn.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/test/test_conn.py b/test/test_conn.py
index 6af01498f..b5deb748c 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -11,6 +11,7 @@
 import pytest
 
 from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts
+from kafka.future import Future
 from kafka.protocol.api import RequestHeader
 from kafka.protocol.group import HeartbeatResponse
 from kafka.protocol.metadata import MetadataRequest
@@ -69,8 +70,10 @@ def test_connect(_socket, conn, states):
         assert conn.state is state
 
 
-def test_api_versions_check(_socket):
+def test_api_versions_check(_socket, mocker):
     conn = BrokerConnection('localhost', 9092, socket.AF_INET)
+    mocker.patch.object(conn, '_send', return_value=Future())
+    mocker.patch.object(conn, 'recv', return_value=[])
     assert conn._api_versions_future is None
     conn.connect()
     assert conn._api_versions_future is not None

From 6d3a3ebce7646b91feaa4bf9ba47b6e0add33b01 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Thu, 3 Apr 2025 10:08:57 -0700
Subject: [PATCH 7/8] Cleanup coordinator test fixtures

---
 test/test_coordinator.py | 24 +++++++++++++++++++-----
 1 file changed, 19 insertions(+), 5 deletions(-)

diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 35749f84d..eac1a1e62 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -25,14 +25,25 @@
 from kafka.structs import OffsetAndMetadata, TopicPartition
 from kafka.util import WeakMethod
 
-
 @pytest.fixture
-def client(conn):
-    return KafkaClient(api_version=(0, 9))
+def client(conn, mocker):
+    cli = KafkaClient(api_version=(0, 9))
+    mocker.patch.object(cli, '_init_connect', return_value=True)
+    try:
+        yield cli
+    finally:
+        cli._close()
 
 @pytest.fixture
-def coordinator(client):
-    return ConsumerCoordinator(client, SubscriptionState(), Metrics())
+def coordinator(client, mocker):
+    metrics = Metrics()
+    coord = ConsumerCoordinator(client, SubscriptionState(), metrics)
+    try:
+        yield coord
+    finally:
+        mocker.patch.object(coord, 'coordinator_unknown', return_value=True)  # avoid attempting to leave group during close()
+        coord.close(timeout_ms=0)
+        metrics.close()
 
 
 def test_init(client, coordinator):
@@ -55,6 +66,7 @@ def test_autocommit_enable_api_version(conn, api_version):
         assert coordinator.config['enable_auto_commit'] is False
     else:
         assert coordinator.config['enable_auto_commit'] is True
+    coordinator.close()
 
 
 def test_protocol_type(coordinator):
@@ -117,6 +129,7 @@ def test_pattern_subscription(conn, api_version):
     else:
         assert set(coordinator._subscription.assignment.keys()) == {TopicPartition('foo1', 0),
                                                                     TopicPartition('foo2', 0)}
+    coordinator.close()
 
 
 def test_lookup_assignor(coordinator):
@@ -398,6 +411,7 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable,
     assert commit_sync.call_count == (1 if commit_offsets else 0)
     assert mock_warn.call_count == (1 if warn else 0)
     assert mock_exc.call_count == (1 if exc else 0)
+    coordinator.close()
 
 
 @pytest.fixture

From c605209026632cb07c58e5a68499243d9e82e521 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Thu, 3 Apr 2025 10:09:22 -0700
Subject: [PATCH 8/8] Cant reuse var names in py2 list comprehension

---
 test/test_subscription_state.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/test/test_subscription_state.py b/test/test_subscription_state.py
index bb2c81bff..773606525 100644
--- a/test/test_subscription_state.py
+++ b/test/test_subscription_state.py
@@ -44,8 +44,8 @@ def test_assign_from_subscribed():
     s.assign_from_subscribed([TopicPartition('foo', 0), TopicPartition('foo', 1)])
 
     assert set(s.assignment.keys()) == set([TopicPartition('foo', 0), TopicPartition('foo', 1)])
-    assert all([isinstance(s, TopicPartitionState) for s in six.itervalues(s.assignment)])
-    assert all([not s.has_valid_position for s in six.itervalues(s.assignment)])
+    assert all([isinstance(tps, TopicPartitionState) for tps in six.itervalues(s.assignment)])
+    assert all([not tps.has_valid_position for tps in six.itervalues(s.assignment)])
 
 
 def test_change_subscription_after_assignment():