From f0a313ea5b1bfc3ebbb1a5372f7cbc4faf98a15b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 27 Mar 2025 15:55:30 -0400 Subject: [PATCH 1/4] Fix MetadataRequest for no topics --- kafka/client_async.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 8df4566e6..8c071104e 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -978,8 +978,10 @@ def _maybe_refresh_metadata(self, wakeup=False): topics = list(self.config['bootstrap_topics_filter']) api_version = self.api_version(MetadataRequest, max_version=7) - if self.cluster.need_all_topic_metadata or not topics: + if self.cluster.need_all_topic_metadata: topics = MetadataRequest[api_version].ALL_TOPICS + elif not topics: + topics = MetadataRequest[api_version].NO_TOPICS if api_version >= 4: request = MetadataRequest[api_version](topics, self.config['allow_auto_create_topics']) else: From 071e66b66edcd277c844824c5bc1f747afb496c3 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 27 Mar 2025 16:11:05 -0400 Subject: [PATCH 2/4] Fixup test_client_async tests --- test/test_client_async.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/test_client_async.py b/test/test_client_async.py index 8582d8fb7..4f6646fe9 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -43,7 +43,7 @@ def test_bootstrap(mocker, conn): kwargs.pop('state_change_callback') kwargs.pop('node_id') assert kwargs == cli.config - conn.send.assert_called_once_with(MetadataRequest[0]([]), blocking=False, request_timeout_ms=None) + conn.send.assert_called_once_with(MetadataRequest[0](None), blocking=False, request_timeout_ms=None) assert cli._bootstrap_fails == 0 assert cli.cluster.brokers() == set([BrokerMetadata(0, 'foo', 12, None), BrokerMetadata(1, 'bar', 34, None)]) @@ -330,6 +330,7 @@ def test_maybe_refresh_metadata_update(mocker, client): mocker.patch.object(client, 'least_loaded_node', return_value='foobar') mocker.patch.object(client, '_can_send_request', return_value=True) send = mocker.patch.object(client, 'send') + client.cluster.need_all_topic_metadata = True client.poll(timeout_ms=12345678) client._poll.assert_called_with(9999.999) # request_timeout_ms From e1850cabf873fcbf3903390bbd277597744026d0 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 27 Mar 2025 16:35:07 -0400 Subject: [PATCH 3/4] Add fake NO_TOPICS for MetadataRequest v0 --- kafka/protocol/metadata.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka/protocol/metadata.py b/kafka/protocol/metadata.py index 3291be82d..bb22ba997 100644 --- a/kafka/protocol/metadata.py +++ b/kafka/protocol/metadata.py @@ -172,6 +172,7 @@ class MetadataRequest_v0(Request): ('topics', Array(String('utf-8'))) ) ALL_TOPICS = [] # Empty Array (len 0) for topics returns all topics + NO_TOPICS = [] # v0 does not support a 'no topics' request, so we'll just ask for ALL class MetadataRequest_v1(Request): From f0026fed8cf4b78c4a2722d00ea827eec20c3e40 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 27 Mar 2025 16:35:31 -0400 Subject: [PATCH 4/4] Test bootstrap with api_version 2.1 --- test/test_client_async.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/test_client_async.py b/test/test_client_async.py index 4f6646fe9..276926116 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -32,7 +32,7 @@ def cli(mocker, conn): def test_bootstrap(mocker, conn): conn.state = ConnectionStates.CONNECTED - cli = KafkaClient(api_version=(0, 9)) + cli = KafkaClient(api_version=(2, 1)) mocker.patch.object(cli, '_selector') future = cli.cluster.request_update() cli.poll(future=future) @@ -43,7 +43,7 @@ def test_bootstrap(mocker, conn): kwargs.pop('state_change_callback') kwargs.pop('node_id') assert kwargs == cli.config - conn.send.assert_called_once_with(MetadataRequest[0](None), blocking=False, request_timeout_ms=None) + conn.send.assert_called_once_with(MetadataRequest[7]([], True), blocking=False, request_timeout_ms=None) assert cli._bootstrap_fails == 0 assert cli.cluster.brokers() == set([BrokerMetadata(0, 'foo', 12, None), BrokerMetadata(1, 'bar', 34, None)])