From a89f4a3713e02f7250e03efd1d3d173da7b73c34 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 11 Mar 2025 15:25:03 -0700 Subject: [PATCH 1/2] feat: Add SSPI (Kerberos for Windows) authentication mechanism --- kafka/conn.py | 2 +- kafka/sasl/__init__.py | 10 +++- kafka/sasl/abc.py | 5 ++ kafka/sasl/gssapi.py | 7 ++- kafka/sasl/oauth.py | 5 ++ kafka/sasl/plain.py | 5 ++ kafka/sasl/scram.py | 7 +++ kafka/sasl/sspi.py | 109 +++++++++++++++++++++++++++++++++++++++++ 8 files changed, 146 insertions(+), 4 deletions(-) create mode 100644 kafka/sasl/sspi.py diff --git a/kafka/conn.py b/kafka/conn.py index 7287a4840..6992bb5c2 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -766,7 +766,7 @@ def _sasl_authenticate(self, future): self._sasl_mechanism.receive(recv_token) if self._sasl_mechanism.is_authenticated(): - log.info('%s: Authenticated via %s', self, self.config['sasl_mechanism']) + log.info('%s: %s', self, self._sasl_mechanism.auth_details()) return future.success(True) else: return future.failure(Errors.AuthenticationFailedError('Failed to authenticate via SASL %s' % self.config['sasl_mechanism'])) diff --git a/kafka/sasl/__init__.py b/kafka/sasl/__init__.py index 8677f60d2..90f05e733 100644 --- a/kafka/sasl/__init__.py +++ b/kafka/sasl/__init__.py @@ -1,10 +1,13 @@ from __future__ import absolute_import +import platform + from kafka.sasl.gssapi import SaslMechanismGSSAPI from kafka.sasl.msk import SaslMechanismAwsMskIam from kafka.sasl.oauth import SaslMechanismOAuth from kafka.sasl.plain import SaslMechanismPlain from kafka.sasl.scram import SaslMechanismScram +from kafka.sasl.sspi import SaslMechanismSSPI SASL_MECHANISMS = {} @@ -20,9 +23,12 @@ def get_sasl_mechanism(name): return SASL_MECHANISMS[name] -register_sasl_mechanism('GSSAPI', SaslMechanismGSSAPI) +register_sasl_mechanism('AWS_MSK_IAM', SaslMechanismAwsMskIam) +if platform.system() == 'Windows': + register_sasl_mechanism('GSSAPI', SaslMechanismSSPI) +else: + register_sasl_mechanism('GSSAPI', SaslMechanismGSSAPI) register_sasl_mechanism('OAUTHBEARER', SaslMechanismOAuth) register_sasl_mechanism('PLAIN', SaslMechanismPlain) register_sasl_mechanism('SCRAM-SHA-256', SaslMechanismScram) register_sasl_mechanism('SCRAM-SHA-512', SaslMechanismScram) -register_sasl_mechanism('AWS_MSK_IAM', SaslMechanismAwsMskIam) diff --git a/kafka/sasl/abc.py b/kafka/sasl/abc.py index 7baef3b78..8977c7c23 100644 --- a/kafka/sasl/abc.py +++ b/kafka/sasl/abc.py @@ -25,3 +25,8 @@ def is_done(self): @abc.abstractmethod def is_authenticated(self): pass + + def auth_details(self): + if not self.is_authenticated: + raise RuntimeError('Not authenticated yet!') + return 'Authenticated via SASL' diff --git a/kafka/sasl/gssapi.py b/kafka/sasl/gssapi.py index 60b658c77..a8a4d3b75 100644 --- a/kafka/sasl/gssapi.py +++ b/kafka/sasl/gssapi.py @@ -70,7 +70,7 @@ def receive(self, auth_bytes): message_parts = [ struct.pack('>b', self.SASL_QOP_AUTH & struct.unpack('>b', msg[0:1])), msg[:1], - self.auth_id.encode(), + self.auth_id.encode('utf-8'), ] # add authorization identity to the response, and GSS-wrap self._next_token = self._client_ctx.wrap(b''.join(message_parts), False).message @@ -80,3 +80,8 @@ def is_done(self): def is_authenticated(self): return self._is_authenticated + + def auth_details(self): + if not self.is_authenticated: + raise RuntimeError('Not authenticated yet!') + return 'Authenticated as %s to %s via SASL / GSSAPI' % (self._client_ctx.initiator_name, self._client_ctx.target_name) diff --git a/kafka/sasl/oauth.py b/kafka/sasl/oauth.py index fce630a77..d4f643d84 100644 --- a/kafka/sasl/oauth.py +++ b/kafka/sasl/oauth.py @@ -37,3 +37,8 @@ def _token_extensions(self): extensions = getattr(self.token_provider, 'extensions', lambda: [])() msg = '\x01'.join(['{}={}'.format(k, v) for k, v in extensions.items()]) return '\x01' + msg if msg else '' + + def auth_details(self): + if not self.is_authenticated: + raise RuntimeError('Not authenticated yet!') + return 'Authenticated via SASL / OAuth' diff --git a/kafka/sasl/plain.py b/kafka/sasl/plain.py index e59d23013..81443f5fe 100644 --- a/kafka/sasl/plain.py +++ b/kafka/sasl/plain.py @@ -34,3 +34,8 @@ def is_done(self): def is_authenticated(self): return self._is_authenticated + + def auth_details(self): + if not self.is_authenticated: + raise RuntimeError('Not authenticated yet!') + return 'Authenticated as %s via SASL / Plain' % self.username diff --git a/kafka/sasl/scram.py b/kafka/sasl/scram.py index 734885927..d8cd071a7 100644 --- a/kafka/sasl/scram.py +++ b/kafka/sasl/scram.py @@ -30,6 +30,8 @@ def __init__(self, **config): if config.get('security_protocol', '') == 'SASL_PLAINTEXT': log.warning('Exchanging credentials in the clear during Sasl Authentication') + self.username = config['sasl_plain_username'] + self.mechanism = config['sasl_mechanism'] self._scram_client = ScramClient( config['sasl_plain_username'], config['sasl_plain_password'], @@ -62,6 +64,11 @@ def is_authenticated(self): # receive raises if authentication fails...? return self._state == 2 + def auth_details(self): + if not self.is_authenticated: + raise RuntimeError('Not authenticated yet!') + return 'Authenticated as %s via SASL / %s' % (self.username, self.mechanism) + class ScramClient: MECHANISMS = { diff --git a/kafka/sasl/sspi.py b/kafka/sasl/sspi.py new file mode 100644 index 000000000..052bb55f6 --- /dev/null +++ b/kafka/sasl/sspi.py @@ -0,0 +1,109 @@ +from __future__ import absolute_import + +import logging + +# Windows-only +try: + import sspi + import pywintypes + import sspicon + import win32security +except ImportError: + sspi = None + +from kafka.sasl.abc import SaslMechanism + + +log = logging.getLogger(__name__) + + +class SaslMechanismSSPI(SaslMechanism): + # Establish security context and negotiate protection level + # For reference see RFC 4752, section 3 + + SASL_QOP_AUTH = 1 + SASL_QOP_AUTH_INT = 2 + SASL_QOP_AUTH_CONF = 4 + + def __init__(self, **config): + assert sspi is not None, 'No GSSAPI lib available (gssapi or sspi)' + if 'sasl_kerberos_name' not in config and 'sasl_kerberos_service_name' not in config: + raise ValueError('sasl_kerberos_service_name or sasl_kerberos_name required for GSSAPI sasl configuration') + self._is_done = False + self._is_authenticated = False + if config.get('sasl_kerberos_name', None) is not None: + self.auth_id = str(config['sasl_kerberos_name']) + else: + kerberos_domain_name = config.get('sasl_kerberos_domain_name', '') or config.get('host', '') + self.auth_id = config['sasl_kerberos_service_name'] + '/' + kerberos_domain_name + scheme = "Kerberos" # Do not try with Negotiate for SASL authentication. Tokens are different. + # https://docs.microsoft.com/en-us/windows/win32/secauthn/context-requirements + flags = ( + sspicon.ISC_REQ_MUTUAL_AUTH | # mutual authentication + sspicon.ISC_REQ_INTEGRITY | # check for integrity + sspicon.ISC_REQ_SEQUENCE_DETECT | # enable out-of-order messages + sspicon.ISC_REQ_CONFIDENTIALITY # request confidentiality + ) + self._client_ctx = sspi.ClientAuth(scheme, targetspn=self.auth_id, scflags=flags) + self._next_token = self._client_ctx.step(None) + + def auth_bytes(self): + # GSSAPI Auth does not have a final broker->client message + # so mark is_done after the final auth_bytes are provided + # in practice we'll still receive a response when using SaslAuthenticate + # but not when using the prior unframed approach. + if self._client_ctx.authenticated: + self._is_done = True + self._is_authenticated = True + return self._next_token or b'' + + def receive(self, auth_bytes): + log.debug("Received token from server (size %s)", len(auth_bytes)) + if not self._client_ctx.authenticated: + # calculate an output token from kafka token (or None on first iteration) + # https://docs.microsoft.com/en-us/windows/win32/api/sspi/nf-sspi-initializesecuritycontexta + # https://docs.microsoft.com/en-us/windows/win32/secauthn/initializesecuritycontext--kerberos + # authorize method will wrap for us our token in sspi structures + error, auth = self._client_ctx.authorize(auth_bytes) + if len(auth) > 0 and len(auth[0].Buffer): + log.debug("Got token from context") + # this buffer must be sent to the server whatever the result is + self._next_token = auth[0].Buffer + else: + log.debug("Got no token, exchange finished") + # seems to be the end of the loop + self._next_token = b'' + elif self._is_done: + # The final step of gssapi is send, so we do not expect any additional bytes + # however, allow an empty message to support SaslAuthenticate response + if auth_bytes != b'': + raise ValueError("Unexpected receive auth_bytes after sasl/gssapi completion") + else: + # Process the security layer negotiation token, sent by the server + # once the security context is established. + + # The following part is required by SASL, but not by classic Kerberos. + # See RFC 4752 + + # unwraps message containing supported protection levels and msg size + msg, _was_encrypted = self._client_ctx.unwrap(auth_bytes) + + # Kafka currently doesn't support integrity or confidentiality security layers, so we + # simply set QoP to 'auth' only (first octet). We reuse the max message size proposed + # by the server + message_parts = [ + Int8.encode(self.SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0:1]))), + msg[:1], + self.auth_id.encode('utf-8'), + ] + # add authorization identity to the response, and GSS-wrap + self._next_token = self._client_ctx.wrap(b''.join(message_parts), False) + + def is_done(self): + return self._is_done + + def is_authenticated(self): + return self._is_authenticated + + def auth_details(self): + return 'Authenticated as %s to %s via SASL / SSPI/GSSAPI \\o/' % (self._client_ctx.initiator_name, self._client_ctx.service_name) From 0fe2e3c2557bd0acb117781b327af7ccc97b2984 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 11 Mar 2025 16:59:23 -0700 Subject: [PATCH 2/2] improve flags handling --- kafka/sasl/gssapi.py | 6 +++--- kafka/sasl/sspi.py | 4 +++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/kafka/sasl/gssapi.py b/kafka/sasl/gssapi.py index a8a4d3b75..be84269da 100644 --- a/kafka/sasl/gssapi.py +++ b/kafka/sasl/gssapi.py @@ -1,7 +1,5 @@ from __future__ import absolute_import -import struct - # needed for SASL_GSSAPI authentication: try: import gssapi @@ -67,8 +65,10 @@ def receive(self, auth_bytes): # Kafka currently doesn't support integrity or confidentiality security layers, so we # simply set QoP to 'auth' only (first octet). We reuse the max message size proposed # by the server + client_flags = self.SASL_QOP_AUTH + server_flags = msg[0] message_parts = [ - struct.pack('>b', self.SASL_QOP_AUTH & struct.unpack('>b', msg[0:1])), + bytes(client_flags & server_flags), msg[:1], self.auth_id.encode('utf-8'), ] diff --git a/kafka/sasl/sspi.py b/kafka/sasl/sspi.py index 052bb55f6..f4c95d037 100644 --- a/kafka/sasl/sspi.py +++ b/kafka/sasl/sspi.py @@ -91,8 +91,10 @@ def receive(self, auth_bytes): # Kafka currently doesn't support integrity or confidentiality security layers, so we # simply set QoP to 'auth' only (first octet). We reuse the max message size proposed # by the server + client_flags = self.SASL_QOP_AUTH + server_flags = msg[0] message_parts = [ - Int8.encode(self.SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0:1]))), + bytes(client_flags & server_flags), msg[:1], self.auth_id.encode('utf-8'), ]