From f7d4acdb89a46491e173dd0002a981e1660042e1 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 11 Mar 2025 14:01:04 -0700 Subject: [PATCH] Cleanup sasl mechanism configuration checks; fix gssapi bugs; add sasl_kerberos_name config --- kafka/admin/client.py | 4 ++++ kafka/client_async.py | 4 ++++ kafka/conn.py | 4 ++++ kafka/consumer/group.py | 4 ++++ kafka/producer/kafka.py | 4 ++++ kafka/sasl/gssapi.py | 17 ++++++++++++----- kafka/sasl/oauth.py | 2 +- kafka/sasl/plain.py | 6 +++--- kafka/sasl/scram.py | 8 ++++---- 9 files changed, 40 insertions(+), 13 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 29ee6cd9a..27ad69312 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -142,6 +142,9 @@ class KafkaAdminClient(object): Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication. Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. + sasl_kerberos_name (str or gssapi.Name): Constructed gssapi.Name for use with + sasl mechanism handshake. If provided, sasl_kerberos_service_name and + sasl_kerberos_domain name are ignored. Default: None. sasl_kerberos_service_name (str): Service name to include in GSSAPI sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI @@ -181,6 +184,7 @@ class KafkaAdminClient(object): 'sasl_mechanism': None, 'sasl_plain_username': None, 'sasl_plain_password': None, + 'sasl_kerberos_name': None, 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None, 'sasl_oauth_token_provider': None, diff --git a/kafka/client_async.py b/kafka/client_async.py index 6fe47c6f7..3892c2759 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -163,6 +163,9 @@ class KafkaClient(object): Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication. Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. + sasl_kerberos_name (str or gssapi.Name): Constructed gssapi.Name for use with + sasl mechanism handshake. If provided, sasl_kerberos_service_name and + sasl_kerberos_domain name are ignored. Default: None. sasl_kerberos_service_name (str): Service name to include in GSSAPI sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI @@ -206,6 +209,7 @@ class KafkaClient(object): 'sasl_mechanism': None, 'sasl_plain_username': None, 'sasl_plain_password': None, + 'sasl_kerberos_name': None, 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None, 'sasl_oauth_token_provider': None diff --git a/kafka/conn.py b/kafka/conn.py index 988f4399f..857b13a57 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -178,6 +178,9 @@ class BrokerConnection(object): Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication. Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. + sasl_kerberos_name (str or gssapi.Name): Constructed gssapi.Name for use with + sasl mechanism handshake. If provided, sasl_kerberos_service_name and + sasl_kerberos_domain name are ignored. Default: None. sasl_kerberos_service_name (str): Service name to include in GSSAPI sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI @@ -216,6 +219,7 @@ class BrokerConnection(object): 'sasl_mechanism': None, 'sasl_plain_username': None, 'sasl_plain_password': None, + 'sasl_kerberos_name': None, 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None, 'sasl_oauth_token_provider': None diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 6f23bec8a..16fd7c005 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -250,6 +250,9 @@ class KafkaConsumer(six.Iterator): Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication. Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. + sasl_kerberos_name (str or gssapi.Name): Constructed gssapi.Name for use with + sasl mechanism handshake. If provided, sasl_kerberos_service_name and + sasl_kerberos_domain name are ignored. Default: None. sasl_kerberos_service_name (str): Service name to include in GSSAPI sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI @@ -317,6 +320,7 @@ class KafkaConsumer(six.Iterator): 'sasl_mechanism': None, 'sasl_plain_username': None, 'sasl_plain_password': None, + 'sasl_kerberos_name': None, 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None, 'sasl_oauth_token_provider': None, diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 233bc3dce..1c075eba0 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -289,6 +289,9 @@ class KafkaProducer(object): Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication. Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. + sasl_kerberos_name (str or gssapi.Name): Constructed gssapi.Name for use with + sasl mechanism handshake. If provided, sasl_kerberos_service_name and + sasl_kerberos_domain name are ignored. Default: None. sasl_kerberos_service_name (str): Service name to include in GSSAPI sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI @@ -347,6 +350,7 @@ class KafkaProducer(object): 'sasl_mechanism': None, 'sasl_plain_username': None, 'sasl_plain_password': None, + 'sasl_kerberos_name': None, 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None, 'sasl_oauth_token_provider': None, diff --git a/kafka/sasl/gssapi.py b/kafka/sasl/gssapi.py index b40c37535..1be3de4a4 100644 --- a/kafka/sasl/gssapi.py +++ b/kafka/sasl/gssapi.py @@ -22,12 +22,19 @@ class SaslMechanismGSSAPI(SaslMechanism): def __init__(self, **config): assert gssapi is not None, 'GSSAPI lib not available' - assert config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl' + if 'sasl_kerberos_name' not in config and 'sasl_kerberos_service_name' not in config: + raise ValueError('sasl_kerberos_service_name or sasl_kerberos_name required for GSSAPI sasl configuration') self._is_done = False self._is_authenticated = False - self.kerberos_damin_name = config['sasl_kerberos_domain_name'] or config['host'] - self.auth_id = config['sasl_kerberos_service_name'] + '@' + kerberos_damin_name - self.gssapi_name = gssapi.Name(auth_id, name_type=gssapi.NameType.hostbased_service).canonicalize(gssapi.MechType.kerberos) + if config.get('sasl_kerberos_name', None) is not None: + self.auth_id = str(config['sasl_kerberos_name']) + else: + kerberos_domain_name = config.get('sasl_kerberos_domain_name', '') or config.get('host', '') + self.auth_id = config['sasl_kerberos_service_name'] + '@' + kerberos_domain_name + if isinstance(config.get('sasl_kerberos_name', None), gssapi.Name): + self.gssapi_name = config['sasl_kerberos_name'] + else: + self.gssapi_name = gssapi.Name(self.auth_id, name_type=gssapi.NameType.hostbased_service).canonicalize(gssapi.MechType.kerberos) self._client_ctx = gssapi.SecurityContext(name=self.gssapi_name, usage='initiate') self._next_token = self._client_ctx.step(None) @@ -54,7 +61,7 @@ def receive(self, auth_bytes): raise ValueError("Unexpected receive auth_bytes after sasl/gssapi completion") else: # unwraps message containing supported protection levels and msg size - msg = client_ctx.unwrap(received_token).message + msg = self._client_ctx.unwrap(auth_bytes).message # Kafka currently doesn't support integrity or confidentiality security layers, so we # simply set QoP to 'auth' only (first octet). We reuse the max message size proposed # by the server diff --git a/kafka/sasl/oauth.py b/kafka/sasl/oauth.py index 7bbc7dd43..fce630a77 100644 --- a/kafka/sasl/oauth.py +++ b/kafka/sasl/oauth.py @@ -6,8 +6,8 @@ class SaslMechanismOAuth(SaslMechanism): def __init__(self, **config): + assert 'sasl_oauth_token_provider' in config, 'sasl_oauth_token_provider required for OAUTHBEARER sasl' self.token_provider = config['sasl_oauth_token_provider'] - assert self.token_provider is not None, 'sasl_oauth_token_provider required for OAUTHBEARER sasl' assert callable(getattr(self.token_provider, 'token', None)), 'sasl_oauth_token_provider must implement method #token()' self._is_done = False self._is_authenticated = False diff --git a/kafka/sasl/plain.py b/kafka/sasl/plain.py index f2bae6751..e59d23013 100644 --- a/kafka/sasl/plain.py +++ b/kafka/sasl/plain.py @@ -11,10 +11,10 @@ class SaslMechanismPlain(SaslMechanism): def __init__(self, **config): - if config['security_protocol'] == 'SASL_PLAINTEXT': + if config.get('security_protocol', '') == 'SASL_PLAINTEXT': log.warning('Sending username and password in the clear') - assert config['sasl_plain_username'] is not None, 'sasl_plain_username required for PLAIN sasl' - assert config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl' + assert 'sasl_plain_username' in config, 'sasl_plain_username required for PLAIN sasl' + assert 'sasl_plain_password' in config, 'sasl_plain_password required for PLAIN sasl' self.username = config['sasl_plain_username'] self.password = config['sasl_plain_password'] diff --git a/kafka/sasl/scram.py b/kafka/sasl/scram.py index 0bae8c928..734885927 100644 --- a/kafka/sasl/scram.py +++ b/kafka/sasl/scram.py @@ -23,11 +23,11 @@ def xor_bytes(left, right): class SaslMechanismScram(SaslMechanism): - def __init__(self, **config): - assert config['sasl_plain_username'] is not None, 'sasl_plain_username required for SCRAM sasl' - assert config['sasl_plain_password'] is not None, 'sasl_plain_password required for SCRAM sasl' - if config['security_protocol'] == 'SASL_PLAINTEXT': + assert 'sasl_plain_username' in config, 'sasl_plain_username required for SCRAM sasl' + assert 'sasl_plain_password' in config, 'sasl_plain_password required for SCRAM sasl' + assert config.get('sasl_mechanism', '') in ScramClient.MECHANISMS, 'Unrecognized SCRAM mechanism' + if config.get('security_protocol', '') == 'SASL_PLAINTEXT': log.warning('Exchanging credentials in the clear during Sasl Authentication') self._scram_client = ScramClient(