From 0645aabab07569a3eaf311a4cbb4f20ed56a3739 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 22 Mar 2025 10:05:32 -0700 Subject: [PATCH 1/3] Move ensure_valid_topic_name to kafka.util; use in client and producer --- kafka/client_async.py | 8 +++++++- kafka/consumer/subscription_state.py | 25 ++----------------------- kafka/producer/kafka.py | 5 +++++ kafka/util.py | 23 +++++++++++++++++++++++ 4 files changed, 37 insertions(+), 24 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 7121ce7a7..4de05b33e 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -27,7 +27,7 @@ from kafka.metrics.stats.rate import TimeUnit from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS from kafka.protocol.metadata import MetadataRequest -from kafka.util import Dict, WeakMethod +from kafka.util import Dict, WeakMethod, ensure_valid_topic_name # Although this looks unused, it actually monkey-patches socket.socketpair() # and should be left in as long as we're using socket.socketpair() in this file from kafka.vendor import socketpair # noqa: F401 @@ -909,7 +909,13 @@ def add_topic(self, topic): Returns: Future: resolves after metadata request/response + + Raises: + TypeError: if topic is not a string + ValueError: if topic is invalid: must be chars (a-zA-Z0-9._-), and less than 250 length """ + ensure_valid_topic_name(topic) + if topic in self._topics: return Future().success(set(self._topics)) diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index abe37fb86..2b2bcb477 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -9,6 +9,7 @@ from kafka.errors import IllegalStateError from kafka.protocol.list_offsets import OffsetResetStrategy from kafka.structs import OffsetAndMetadata +from kafka.util import ensure_valid_topic_name log = logging.getLogger(__name__) @@ -43,10 +44,6 @@ class SubscriptionState(object): " (2) subscribe to topics matching a regex pattern," " (3) assign itself specific topic-partitions.") - # Taken from: https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L29 - _MAX_NAME_LENGTH = 249 - _TOPIC_LEGAL_CHARS = re.compile('^[a-zA-Z0-9._-]+$') - def __init__(self, offset_reset_strategy='earliest'): """Initialize a SubscriptionState instance @@ -123,24 +120,6 @@ def subscribe(self, topics=(), pattern=None, listener=None): raise TypeError('listener must be a ConsumerRebalanceListener') self.listener = listener - def _ensure_valid_topic_name(self, topic): - """ Ensures that the topic name is valid according to the kafka source. """ - - # See Kafka Source: - # https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java - if topic is None: - raise TypeError('All topics must not be None') - if not isinstance(topic, six.string_types): - raise TypeError('All topics must be strings') - if len(topic) == 0: - raise ValueError('All topics must be non-empty strings') - if topic == '.' or topic == '..': - raise ValueError('Topic name cannot be "." or ".."') - if len(topic) > self._MAX_NAME_LENGTH: - raise ValueError('Topic name is illegal, it can\'t be longer than {0} characters, topic: "{1}"'.format(self._MAX_NAME_LENGTH, topic)) - if not self._TOPIC_LEGAL_CHARS.match(topic): - raise ValueError('Topic name "{0}" is illegal, it contains a character other than ASCII alphanumerics, ".", "_" and "-"'.format(topic)) - def change_subscription(self, topics): """Change the topic subscription. @@ -166,7 +145,7 @@ def change_subscription(self, topics): return for t in topics: - self._ensure_valid_topic_name(t) + ensure_valid_topic_name(t) log.info('Updating subscribed topics to: %s', topics) self.subscription = set(topics) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index b8ace0fc1..8da14af1c 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -22,6 +22,7 @@ from kafka.record.legacy_records import LegacyRecordBatchBuilder from kafka.serializer import Serializer from kafka.structs import TopicPartition +from kafka.util import ensure_valid_topic_name log = logging.getLogger(__name__) @@ -593,11 +594,15 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest Raises: KafkaTimeoutError: if unable to fetch topic metadata, or unable to obtain memory buffer prior to configured max_block_ms + TypeError: if topic is not a string + ValueError: if topic is invalid: must be chars (a-zA-Z0-9._-), and less than 250 length + AssertionError: if KafkaProducer is closed, or key and value are both None """ assert not self._closed, 'KafkaProducer already closed!' assert value is not None or self.config['api_version'] >= (0, 8, 1), ( 'Null messages require kafka >= 0.8.1') assert not (value is None and key is None), 'Need at least one: key or value' + ensure_valid_topic_name(topic) key_bytes = value_bytes = None try: assigned_partition = None diff --git a/kafka/util.py b/kafka/util.py index d067a063d..1a23ea673 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -43,6 +43,29 @@ def inner_timeout_ms(fallback=None): return inner_timeout_ms +# Taken from: https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L29 +TOPIC_MAX_LENGTH = 249 +TOPIC_LEGAL_CHARS = re.compile('^[a-zA-Z0-9._-]+$') + +def ensure_valid_topic_name(topic): + """ Ensures that the topic name is valid according to the kafka source. """ + + # See Kafka Source: + # https://github.com/apache/kafka/blob/39eb31feaeebfb184d98cc5d94da9148c2319d81/clients/src/main/java/org/apache/kafka/common/internals/Topic.java + if topic is None: + raise TypeError('All topics must not be None') + if not isinstance(topic, six.string_types): + raise TypeError('All topics must be strings') + if len(topic) == 0: + raise ValueError('All topics must be non-empty strings') + if topic == '.' or topic == '..': + raise ValueError('Topic name cannot be "." or ".."') + if len(topic) > TOPIC_MAX_LENGTH: + raise ValueError('Topic name is illegal, it can\'t be longer than {0} characters, topic: "{1}"'.format(TOPIC_MAX_LENGTH, topic)) + if not TOPIC_LEGAL_CHARS.match(topic): + raise ValueError('Topic name "{0}" is illegal, it contains a character other than ASCII alphanumerics, ".", "_" and "-"'.format(topic)) + + class WeakMethod(object): """ Callable that weakly references a method and the object it is bound to. It From 6214a99616c62f3ce7db05a7f4c8d828535299c3 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 22 Mar 2025 17:03:46 -0700 Subject: [PATCH 2/3] import re --- kafka/util.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka/util.py b/kafka/util.py index 1a23ea673..470200b1b 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -1,6 +1,7 @@ from __future__ import absolute_import import binascii +import re import time import weakref From 06f1eea05f20d8ace39365b7b998a5e260c529bd Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 23 Mar 2025 09:42:47 -0700 Subject: [PATCH 3/3] Fixup ensure_valid_topic_name tests --- test/{test_subscription_state.py => test_util.py} | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) rename test/{test_subscription_state.py => test_util.py} (83%) diff --git a/test/test_subscription_state.py b/test/test_util.py similarity index 83% rename from test/test_subscription_state.py rename to test/test_util.py index 9718f6af4..875b252aa 100644 --- a/test/test_subscription_state.py +++ b/test/test_util.py @@ -3,7 +3,7 @@ import pytest -from kafka.consumer.subscription_state import SubscriptionState +from kafka.util import ensure_valid_topic_name @pytest.mark.parametrize(('topic_name', 'expectation'), [ (0, pytest.raises(TypeError)), @@ -20,6 +20,5 @@ ('name+with+plus', pytest.raises(ValueError)), ]) def test_topic_name_validation(topic_name, expectation): - state = SubscriptionState() with expectation: - state._ensure_valid_topic_name(topic_name) + ensure_valid_topic_name(topic_name)