From 683e730ab53d3d54f72bb23a460c2bbe496ca7f1 Mon Sep 17 00:00:00 2001 From: Emmanuel Coirier Date: Fri, 19 Mar 2021 11:52:12 +0100 Subject: [PATCH 1/8] POC implementation, to check that it can work. --- kafka/conn.py | 136 +++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 130 insertions(+), 6 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index cac354875..2b6eb31e3 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -84,6 +84,11 @@ class SSLWantWriteError(Exception): GSSError = None +try: + import kerberos_sspi as kerberos +except ImportError: + kerberos = None + AFI_NAMES = { socket.AF_UNSPEC: "unspecified", socket.AF_INET: "IPv4", @@ -180,7 +185,8 @@ class BrokerConnection(object): metric_group_prefix (str): Prefix for metric names. Default: '' sasl_mechanism (str): Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: - PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512. + PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512, KERBEROS_V5 + (Windows Only, via win32's SSPI). sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication. Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication. @@ -227,7 +233,7 @@ class BrokerConnection(object): 'sasl_oauth_token_provider': None } SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL') - SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER', "SCRAM-SHA-256", "SCRAM-SHA-512") + SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER', "SCRAM-SHA-256", "SCRAM-SHA-512", "KERBEROS_V5") def __init__(self, host, port, afi, **configs): self.host = host @@ -272,6 +278,8 @@ def __init__(self, host, port, afi, **configs): if self.config['sasl_mechanism'] == 'GSSAPI': assert gssapi is not None, 'GSSAPI lib not available' assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl' + if self.config['sasl_mechanism'] == 'KERBEROS_V5': + assert kerberos is not None, 'KERBEROS_V5 / kerberos-sspi lib not available' if self.config['sasl_mechanism'] == 'OAUTHBEARER': token_provider = self.config['sasl_oauth_token_provider'] assert token_provider is not None, 'sasl_oauth_token_provider required for OAUTHBEARER sasl' @@ -522,11 +530,13 @@ def _try_authenticate(self): if self._sasl_auth_future is None: # Build a SaslHandShakeRequest message - request = SaslHandShakeRequest[0](self.config['sasl_mechanism']) + sasl_mechanism = self.config['sasl_mechanism'] if self.config['sasl_mechanism'] != "KERBEROS_V5" else "GSSAPI" + log.debug(f'Using {sasl_mechanism} sasl_mechanism') + request = SaslHandShakeRequest[0](sasl_mechanism) future = Future() sasl_response = self._send(request) sasl_response.add_callback(self._handle_sasl_handshake_response, future) - sasl_response.add_errback(lambda f, e: f.failure(e), future) + sasl_response.add_errback(self._handle_err, future) self._sasl_auth_future = future for r, f in self.recv(): @@ -541,14 +551,21 @@ def _try_authenticate(self): raise ex # pylint: disable-msg=raising-bad-type return self._sasl_auth_future.succeeded() + def _handle_err(self, f, e): + print(f, e) + f.failure(e) + + + + def _handle_sasl_handshake_response(self, future, response): error_type = Errors.for_code(response.error_code) if error_type is not Errors.NoError: error = error_type(self) self.close(error=error) return future.failure(error_type(self)) - - if self.config['sasl_mechanism'] not in response.enabled_mechanisms: + sasl_mechanism = self.config['sasl_mechanism'] if self.config['sasl_mechanism'] != "KERBEROS_V5" else "GSSAPI" + if sasl_mechanism not in response.enabled_mechanisms: return future.failure( Errors.UnsupportedSaslMechanismError( 'Kafka broker does not support %s sasl mechanism. Enabled mechanisms are: %s' @@ -557,6 +574,8 @@ def _handle_sasl_handshake_response(self, future, response): return self._try_authenticate_plain(future) elif self.config['sasl_mechanism'] == 'GSSAPI': return self._try_authenticate_gssapi(future) + elif self.config['sasl_mechanism'] == 'KERBEROS_V5': + return self._try_authenticate_kerberos_v5(future) elif self.config['sasl_mechanism'] == 'OAUTHBEARER': return self._try_authenticate_oauth(future) elif self.config['sasl_mechanism'].startswith("SCRAM-SHA-"): @@ -781,6 +800,111 @@ def _try_authenticate_gssapi(self, future): log.info('%s: Authenticated as %s via GSSAPI', self, gssapi_name) return future.success(True) + def _try_authenticate_kerberos_v5(self, future): + target_host = self.config['sasl_kerberos_domain_name'] or self.host + service_principal_name = self.config['sasl_kerberos_service_name'] + '@' + target_host + + err = None + close = False + + with self._lock: + if not self._can_send_recv(): + err = Errors.NodeNotReadyError(str(self)) + close = False + else: + try: + + # Establish security context and negotiate protection level + # For reference RFC 2222, section 7.2.1 + flags = kerberos.GSS_C_CONF_FLAG|kerberos.GSS_C_INTEG_FLAG|kerberos.GSS_C_MUTUAL_FLAG|kerberos.GSS_C_SEQUENCE_FLAG + res, client_ctx = kerberos.authGSSClientInit(service_principal_name, gssflags=flags) + assert res == kerberos.AUTH_GSS_COMPLETE + + res = kerberos.AUTH_GSS_CONTINUE + received_token = b"" + # Exchange tokens until authentication either succeeds or fails + krb_round = 0 + while res == kerberos.AUTH_GSS_CONTINUE: + krb_round += 1 + log.debug(f"Round {krb_round}") + res = kerberos.authGSSClientStep(client_ctx, kerberos.encodestring(received_token)) + if res == -1: + raise RuntimeError("Client Step Error", res) + + output_token = client_ctx["response"] # get the binary data, not a base64 encoded version + + # pass output token to kafka, or send empty response if the security + # context is complete (output token is None in that case) + if res != kerberos.AUTH_GSS_CONTINUE: + self._send_bytes_blocking(Int32.encode(0)) + else: + msg = output_token + size = Int32.encode(len(msg)) + self._send_bytes_blocking(size + msg) + + # The server will send a token back. Processing of this token either + # establishes a security context, or it needs further token exchange. + # The gssapi will be able to identify the needed next step. + # The connection is closed on failure. + header = self._recv_bytes_blocking(4) + (token_size,) = struct.unpack('>i', header) + received_token = self._recv_bytes_blocking(token_size) + + # Process the security layer negotiation token, sent by the server + # once the security context is established. + + # unwraps message containing supported protection levels and msg size + kerberos.authGSSClientUnwrap(client_ctx, kerberos.encodestring(received_token)) + msg = client_ctx["response"] + + # Kafka currently doesn't support integrity or confidentiality security layers, so we + # simply set QoP to 'auth' only (first octet). We reuse the max message size proposed + # by the server + msg = Int8.encode(SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0:1]))) + msg[1:] + msg += service_principal_name.encode("utf-8") + # add authorization identity to the response, GSS-wrap and send it + + + + # import sspicon, win32security + # pkg_size_info = client_ctx["csa"].ctxt.QueryContextAttributes(sspicon.SECPKG_ATTR_SIZES) + # trailersize=pkg_size_info['SecurityTrailer'] + # + # encbuf=win32security.PySecBufferDescType() + # encbuf.append(win32security.PySecBufferType(len(msg), sspicon.SECBUFFER_DATA)) + # encbuf.append(win32security.PySecBufferType(trailersize, sspicon.SECBUFFER_TOKEN)) + # encbuf[0].Buffer=msg + # client_ctx["csa"].ctxt.EncryptMessage(0,encbuf,client_ctx["csa"]._get_next_seq_num()) + # + # msg = encbuf[0].Buffer #+ encbuf[1].Buffer + # # return encbuf[0].Buffer, encbuf[1].Buffer + + + + kerberos.authGSSClientWrap(client_ctx, kerberos.encodestring(msg), service_principal_name) + msg = client_ctx["response"] + + size = Int32.encode(len(msg)) + self._send_bytes_blocking(size + msg) + + except (ConnectionError, TimeoutError) as e: + log.exception("%s: Error receiving reply from server", self) + err = Errors.KafkaConnectionError("%s: %s" % (self, e)) + close = True + except Exception as e: + # raise + err = e + close = True + + if err is not None: + if close: + self.close(error=err) + return future.failure(err) + + log.info('%s: Authenticated as %s via GSSAPI', self, service_principal_name) + return future.success(True) + + def _try_authenticate_oauth(self, future): data = b'' From fef796a963fd5ac673b08aca890988d687e6dbc2 Mon Sep 17 00:00:00 2001 From: Emmanuel Coirier Date: Fri, 19 Mar 2021 15:08:25 +0100 Subject: [PATCH 2/8] Clean implementation, compatible with GSSAPI scheme. --- kafka/conn.py | 112 +++++++++++++++++++++++--------------------------- setup.py | 2 + 2 files changed, 54 insertions(+), 60 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 2b6eb31e3..e5e334bd6 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -79,15 +79,14 @@ class SSLWantWriteError(Exception): import gssapi from gssapi.raw.misc import GSSError except ImportError: - #no gssapi available, will disable gssapi mechanism gssapi = None GSSError = None try: - import kerberos_sspi as kerberos + import kerberos_sspi except ImportError: - kerberos = None + kerberos_sspi = None AFI_NAMES = { socket.AF_UNSPEC: "unspecified", @@ -185,8 +184,7 @@ class BrokerConnection(object): metric_group_prefix (str): Prefix for metric names. Default: '' sasl_mechanism (str): Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: - PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512, KERBEROS_V5 - (Windows Only, via win32's SSPI). + PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512. sasl_plain_username (str): username for sasl PLAIN and SCRAM authentication. Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms. sasl_plain_password (str): password for sasl PLAIN and SCRAM authentication. @@ -233,7 +231,7 @@ class BrokerConnection(object): 'sasl_oauth_token_provider': None } SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL') - SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER', "SCRAM-SHA-256", "SCRAM-SHA-512", "KERBEROS_V5") + SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER', "SCRAM-SHA-256", "SCRAM-SHA-512") def __init__(self, host, port, afi, **configs): self.host = host @@ -276,10 +274,9 @@ def __init__(self, host, port, afi, **configs): 'sasl_plain_password required for PLAIN or SCRAM sasl' ) if self.config['sasl_mechanism'] == 'GSSAPI': - assert gssapi is not None, 'GSSAPI lib not available' + if gssapi is None and kerberos_sspi is None: + raise AssertionError('No GSSAPI lib available') assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl' - if self.config['sasl_mechanism'] == 'KERBEROS_V5': - assert kerberos is not None, 'KERBEROS_V5 / kerberos-sspi lib not available' if self.config['sasl_mechanism'] == 'OAUTHBEARER': token_provider = self.config['sasl_oauth_token_provider'] assert token_provider is not None, 'sasl_oauth_token_provider required for OAUTHBEARER sasl' @@ -530,13 +527,11 @@ def _try_authenticate(self): if self._sasl_auth_future is None: # Build a SaslHandShakeRequest message - sasl_mechanism = self.config['sasl_mechanism'] if self.config['sasl_mechanism'] != "KERBEROS_V5" else "GSSAPI" - log.debug(f'Using {sasl_mechanism} sasl_mechanism') - request = SaslHandShakeRequest[0](sasl_mechanism) + request = SaslHandShakeRequest[0](self.config['sasl_mechanism']) future = Future() sasl_response = self._send(request) sasl_response.add_callback(self._handle_sasl_handshake_response, future) - sasl_response.add_errback(self._handle_err, future) + sasl_response.add_errback(lambda f, e: f.failure(e), future) self._sasl_auth_future = future for r, f in self.recv(): @@ -551,21 +546,14 @@ def _try_authenticate(self): raise ex # pylint: disable-msg=raising-bad-type return self._sasl_auth_future.succeeded() - def _handle_err(self, f, e): - print(f, e) - f.failure(e) - - - - def _handle_sasl_handshake_response(self, future, response): error_type = Errors.for_code(response.error_code) if error_type is not Errors.NoError: error = error_type(self) self.close(error=error) return future.failure(error_type(self)) - sasl_mechanism = self.config['sasl_mechanism'] if self.config['sasl_mechanism'] != "KERBEROS_V5" else "GSSAPI" - if sasl_mechanism not in response.enabled_mechanisms: + + if self.config['sasl_mechanism'] not in response.enabled_mechanisms: return future.failure( Errors.UnsupportedSaslMechanismError( 'Kafka broker does not support %s sasl mechanism. Enabled mechanisms are: %s' @@ -574,8 +562,6 @@ def _handle_sasl_handshake_response(self, future, response): return self._try_authenticate_plain(future) elif self.config['sasl_mechanism'] == 'GSSAPI': return self._try_authenticate_gssapi(future) - elif self.config['sasl_mechanism'] == 'KERBEROS_V5': - return self._try_authenticate_kerberos_v5(future) elif self.config['sasl_mechanism'] == 'OAUTHBEARER': return self._try_authenticate_oauth(future) elif self.config['sasl_mechanism'].startswith("SCRAM-SHA-"): @@ -728,13 +714,20 @@ def _try_authenticate_scram(self, future): return future.success(True) def _try_authenticate_gssapi(self, future): - kerberos_damin_name = self.config['sasl_kerberos_domain_name'] or self.host - auth_id = self.config['sasl_kerberos_service_name'] + '@' + kerberos_damin_name + if gssapi is not None: + return self._try_authenticate_gssapi_gss_implementation(future) + + if kerberos_sspi is not None: + return self._try_authenticate_gssapi_sspi_implementation(future) + + def _try_authenticate_gssapi_gss_implementation(self, future): + kerberos_host_name = self.config['sasl_kerberos_domain_name'] or self.host + auth_id = self.config['sasl_kerberos_service_name'] + '@' + kerberos_host_name gssapi_name = gssapi.Name( auth_id, name_type=gssapi.NameType.hostbased_service ).canonicalize(gssapi.MechType.kerberos) - log.debug('%s: GSSAPI name: %s', self, gssapi_name) + log.debug('%s: GSSAPI Service Principal Name: %s', self, gssapi_name) err = None close = False @@ -797,12 +790,17 @@ def _try_authenticate_gssapi(self, future): self.close(error=err) return future.failure(err) - log.info('%s: Authenticated as %s via GSSAPI', self, gssapi_name) + log.info( + '%s: Authenticated as %s to %s via GSSAPI', + self, + client_ctx.initiator_name, + client_ctx.target_name + ) return future.success(True) - def _try_authenticate_kerberos_v5(self, future): - target_host = self.config['sasl_kerberos_domain_name'] or self.host - service_principal_name = self.config['sasl_kerberos_service_name'] + '@' + target_host + def _try_authenticate_gssapi_sspi_implementation(self, future): + kerberos_host_name = self.config['sasl_kerberos_domain_name'] or self.host + service_principal_name = self.config['sasl_kerberos_service_name'] + '@' + kerberos_host_name err = None close = False @@ -816,18 +814,24 @@ def _try_authenticate_kerberos_v5(self, future): # Establish security context and negotiate protection level # For reference RFC 2222, section 7.2.1 - flags = kerberos.GSS_C_CONF_FLAG|kerberos.GSS_C_INTEG_FLAG|kerberos.GSS_C_MUTUAL_FLAG|kerberos.GSS_C_SEQUENCE_FLAG - res, client_ctx = kerberos.authGSSClientInit(service_principal_name, gssflags=flags) - assert res == kerberos.AUTH_GSS_COMPLETE + flags = \ + kerberos_sspi.GSS_C_CONF_FLAG | \ + kerberos_sspi.GSS_C_INTEG_FLAG | \ + kerberos_sspi.GSS_C_MUTUAL_FLAG | \ + kerberos_sspi.GSS_C_SEQUENCE_FLAG + + # Create a security context. + res, client_ctx = kerberos_sspi.authGSSClientInit(service_principal_name, gssflags=flags) + assert res == kerberos_sspi.AUTH_GSS_COMPLETE - res = kerberos.AUTH_GSS_CONTINUE + res = kerberos_sspi.AUTH_GSS_CONTINUE received_token = b"" # Exchange tokens until authentication either succeeds or fails krb_round = 0 - while res == kerberos.AUTH_GSS_CONTINUE: + while res == kerberos_sspi.AUTH_GSS_CONTINUE: krb_round += 1 log.debug(f"Round {krb_round}") - res = kerberos.authGSSClientStep(client_ctx, kerberos.encodestring(received_token)) + res = kerberos_sspi.authGSSClientStep(client_ctx, kerberos_sspi.encodestring(received_token)) if res == -1: raise RuntimeError("Client Step Error", res) @@ -835,7 +839,7 @@ def _try_authenticate_kerberos_v5(self, future): # pass output token to kafka, or send empty response if the security # context is complete (output token is None in that case) - if res != kerberos.AUTH_GSS_CONTINUE: + if res != kerberos_sspi.AUTH_GSS_CONTINUE: self._send_bytes_blocking(Int32.encode(0)) else: msg = output_token @@ -844,7 +848,7 @@ def _try_authenticate_kerberos_v5(self, future): # The server will send a token back. Processing of this token either # establishes a security context, or it needs further token exchange. - # The gssapi will be able to identify the needed next step. + # The remote gssapi will be able to identify the needed next step. # The connection is closed on failure. header = self._recv_bytes_blocking(4) (token_size,) = struct.unpack('>i', header) @@ -854,7 +858,7 @@ def _try_authenticate_kerberos_v5(self, future): # once the security context is established. # unwraps message containing supported protection levels and msg size - kerberos.authGSSClientUnwrap(client_ctx, kerberos.encodestring(received_token)) + kerberos_sspi.authGSSClientUnwrap(client_ctx, kerberos_sspi.encodestring(received_token)) msg = client_ctx["response"] # Kafka currently doesn't support integrity or confidentiality security layers, so we @@ -864,24 +868,7 @@ def _try_authenticate_kerberos_v5(self, future): msg += service_principal_name.encode("utf-8") # add authorization identity to the response, GSS-wrap and send it - - - # import sspicon, win32security - # pkg_size_info = client_ctx["csa"].ctxt.QueryContextAttributes(sspicon.SECPKG_ATTR_SIZES) - # trailersize=pkg_size_info['SecurityTrailer'] - # - # encbuf=win32security.PySecBufferDescType() - # encbuf.append(win32security.PySecBufferType(len(msg), sspicon.SECBUFFER_DATA)) - # encbuf.append(win32security.PySecBufferType(trailersize, sspicon.SECBUFFER_TOKEN)) - # encbuf[0].Buffer=msg - # client_ctx["csa"].ctxt.EncryptMessage(0,encbuf,client_ctx["csa"]._get_next_seq_num()) - # - # msg = encbuf[0].Buffer #+ encbuf[1].Buffer - # # return encbuf[0].Buffer, encbuf[1].Buffer - - - - kerberos.authGSSClientWrap(client_ctx, kerberos.encodestring(msg), service_principal_name) + kerberos_sspi.authGSSClientWrap(client_ctx, kerberos_sspi.encodestring(msg), service_principal_name) msg = client_ctx["response"] size = Int32.encode(len(msg)) @@ -892,7 +879,6 @@ def _try_authenticate_kerberos_v5(self, future): err = Errors.KafkaConnectionError("%s: %s" % (self, e)) close = True except Exception as e: - # raise err = e close = True @@ -901,7 +887,13 @@ def _try_authenticate_kerberos_v5(self, future): self.close(error=err) return future.failure(err) - log.info('%s: Authenticated as %s via GSSAPI', self, service_principal_name) + log.info( + '%s: Authenticated as %s to %s via Windows SSPI', + self, + kerberos_sspi.authGSSClientUserName(client_ctx), + kerberos_sspi.authGSSServerTargetName(client_ctx), # incomplete API... + ) + return future.success(True) diff --git a/setup.py b/setup.py index fe8a594f3..208cfced5 100644 --- a/setup.py +++ b/setup.py @@ -41,6 +41,8 @@ def run(cls): "lz4": ["lz4"], "snappy": ["python-snappy"], "zstd": ["python-zstandard"], + "gssapi": ["gssapi"], + "gssapi_sspi": ["kerberos-sspi"], }, cmdclass={"test": Tox}, packages=find_packages(exclude=['test']), From 157476858ac22ac7628217236257c5a0a12b4580 Mon Sep 17 00:00:00 2001 From: Emmanuel Coirier Date: Thu, 1 Apr 2021 11:19:07 +0200 Subject: [PATCH 3/8] Remove dependency on kerberso_sspi. Implement SSPI authentication directly with pywin32. --- kafka/conn.py | 157 ++++++++++++++++++++++++++++++++++++-------------- setup.py | 2 +- 2 files changed, 116 insertions(+), 43 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index e5e334bd6..acf17288f 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -84,9 +84,12 @@ class SSLWantWriteError(Exception): try: - import kerberos_sspi + import sspi + import pywintypes + import sspicon + import win32security except ImportError: - kerberos_sspi = None + sspi = None AFI_NAMES = { socket.AF_UNSPEC: "unspecified", @@ -274,7 +277,7 @@ def __init__(self, host, port, afi, **configs): 'sasl_plain_password required for PLAIN or SCRAM sasl' ) if self.config['sasl_mechanism'] == 'GSSAPI': - if gssapi is None and kerberos_sspi is None: + if gssapi is None and sspi is None: raise AssertionError('No GSSAPI lib available') assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl' if self.config['sasl_mechanism'] == 'OAUTHBEARER': @@ -717,7 +720,7 @@ def _try_authenticate_gssapi(self, future): if gssapi is not None: return self._try_authenticate_gssapi_gss_implementation(future) - if kerberos_sspi is not None: + if sspi is not None: return self._try_authenticate_gssapi_sspi_implementation(future) def _try_authenticate_gssapi_gss_implementation(self, future): @@ -799,78 +802,93 @@ def _try_authenticate_gssapi_gss_implementation(self, future): return future.success(True) def _try_authenticate_gssapi_sspi_implementation(self, future): + global log_sspi + log_sspi = logging.getLogger("kafka.client.sspi") kerberos_host_name = self.config['sasl_kerberos_domain_name'] or self.host - service_principal_name = self.config['sasl_kerberos_service_name'] + '@' + kerberos_host_name + service_principal_name = self.config['sasl_kerberos_service_name'] + '/' + kerberos_host_name + scheme = "Kerberos" # Do not try with Negotiate that comes with a different protocol than SASL + # https://docs.microsoft.com/en-us/windows/win32/secauthn/context-requirements + flags = ( + sspicon.ISC_REQ_MUTUAL_AUTH | # mutual authentication + sspicon.ISC_REQ_INTEGRITY | # check for integrity + sspicon.ISC_REQ_SEQUENCE_DETECT | # enable out-of-order messages + sspicon.ISC_REQ_CONFIDENTIALITY # request confidentiality + ) err = None close = False - with self._lock: if not self._can_send_recv(): err = Errors.NodeNotReadyError(str(self)) close = False else: + # Establish security context and negotiate protection level + # For reference see RFC 4752, section 3 try: + log_sspi.debug("Create client security context") + # instantiate sspi context + client_ctx = sspi.ClientAuth( + scheme, + targetspn=service_principal_name, + scflags=flags, + ) + # Print some SSPI implementation + log_sspi.info("Using %s SSPI Security Package (%s)", client_ctx.pkg_info["Name"], client_ctx.pkg_info["Comment"]) - # Establish security context and negotiate protection level - # For reference RFC 2222, section 7.2.1 - flags = \ - kerberos_sspi.GSS_C_CONF_FLAG | \ - kerberos_sspi.GSS_C_INTEG_FLAG | \ - kerberos_sspi.GSS_C_MUTUAL_FLAG | \ - kerberos_sspi.GSS_C_SEQUENCE_FLAG - - # Create a security context. - res, client_ctx = kerberos_sspi.authGSSClientInit(service_principal_name, gssflags=flags) - assert res == kerberos_sspi.AUTH_GSS_COMPLETE - - res = kerberos_sspi.AUTH_GSS_CONTINUE - received_token = b"" # Exchange tokens until authentication either succeeds or fails - krb_round = 0 - while res == kerberos_sspi.AUTH_GSS_CONTINUE: - krb_round += 1 - log.debug(f"Round {krb_round}") - res = kerberos_sspi.authGSSClientStep(client_ctx, kerberos_sspi.encodestring(received_token)) - if res == -1: - raise RuntimeError("Client Step Error", res) - - output_token = client_ctx["response"] # get the binary data, not a base64 encoded version + log_sspi.debug("Begining rounds...") + received_token = None # no token to pass when initiating the first round + while not client_ctx.authenticated: + # calculate an output token from kafka token (or None on first iteration) + # https://docs.microsoft.com/en-us/windows/win32/api/sspi/nf-sspi-initializesecuritycontexta + # https://docs.microsoft.com/en-us/windows/win32/secauthn/initializesecuritycontext--kerberos + # authorize method will wrap for us our token in sspi structures + log_sspi.debug("Exchange a token") + error, auth = client_ctx.authorize(received_token) + if len(auth) > 0 and len(auth[0].Buffer): + log_sspi.debug("Got token from context") + # this buffer must be sent to the server whatever the result is + output_token = auth[0].Buffer + else: + log_sspi.debug("Got no token, exchange finished") + # seems to be the end of the loop + output_token = None # pass output token to kafka, or send empty response if the security # context is complete (output token is None in that case) - if res != kerberos_sspi.AUTH_GSS_CONTINUE: + if output_token is None: + log_sspi.debug("Sending end of exchange to server") self._send_bytes_blocking(Int32.encode(0)) else: + log_sspi.debug("Sending token from local context to server") msg = output_token size = Int32.encode(len(msg)) self._send_bytes_blocking(size + msg) # The server will send a token back. Processing of this token either # establishes a security context, or it needs further token exchange. - # The remote gssapi will be able to identify the needed next step. + # The gssapi will be able to identify the needed next step. # The connection is closed on failure. header = self._recv_bytes_blocking(4) (token_size,) = struct.unpack('>i', header) received_token = self._recv_bytes_blocking(token_size) + log_sspi.debug("Received token from server (size %s)", token_size) + sspi_amend_ctx_metadata(client_ctx) # Process the security layer negotiation token, sent by the server # once the security context is established. # unwraps message containing supported protection levels and msg size - kerberos_sspi.authGSSClientUnwrap(client_ctx, kerberos_sspi.encodestring(received_token)) - msg = client_ctx["response"] + msg = sspi_gss_unwrap_step(client_ctx, received_token) # Kafka currently doesn't support integrity or confidentiality security layers, so we # simply set QoP to 'auth' only (first octet). We reuse the max message size proposed # by the server msg = Int8.encode(SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0:1]))) + msg[1:] - msg += service_principal_name.encode("utf-8") - # add authorization identity to the response, GSS-wrap and send it - - kerberos_sspi.authGSSClientWrap(client_ctx, kerberos_sspi.encodestring(msg), service_principal_name) - msg = client_ctx["response"] + # add authorization identity to the response, GSS-wrap and send it + msg = msg + service_principal_name.encode("utf-8") + msg = sspi_gss_wrap_step(client_ctx, msg) size = Int32.encode(len(msg)) self._send_bytes_blocking(size + msg) @@ -887,13 +905,13 @@ def _try_authenticate_gssapi_sspi_implementation(self, future): self.close(error=err) return future.failure(err) + # noinspection PyUnresolvedReferences log.info( - '%s: Authenticated as %s to %s via Windows SSPI', + '%s: Authenticated as %s to %s via SSPI/GSSAPI \\o/', self, - kerberos_sspi.authGSSClientUserName(client_ctx), - kerberos_sspi.authGSSServerTargetName(client_ctx), # incomplete API... + client_ctx.initiator_name, + client_ctx.service_name ) - return future.success(True) @@ -1648,3 +1666,58 @@ def dns_lookup(host, port, afi=socket.AF_UNSPEC): ' correct and resolvable?', host, port, ex) return [] + + +# noinspection PyUnresolvedReferences +def sspi_gss_unwrap_step(sec_ctx, token): + """ + GSSAPI's unwrap with SSPI. + """ + buffer = win32security.PySecBufferDescType() + # Stream is a token coming from the other side + buffer.append(win32security.PySecBufferType(len(token), sspicon.SECBUFFER_STREAM)) + buffer[0].Buffer = token + # Will receive the clear, or just unwrapped text if no encryption was used. + # Will be resized. + buffer.append(win32security.PySecBufferType(0, sspicon.SECBUFFER_DATA)) + + pfQOP = sec_ctx.ctxt.DecryptMessage(buffer, sec_ctx._get_next_seq_num()) + if pfQOP == sspicon.SECQOP_WRAP_NO_ENCRYPT: + log.debug("Received token was not encrypted") + r = buffer[1].Buffer + return r + + +def sspi_gss_wrap_step(sec_ctx, msg, encrypt=False): + """ + GSSAPI's wrap with SSPI. + """ + + size_info = sec_ctx.ctxt.QueryContextAttributes(sspicon.SECPKG_ATTR_SIZES) + trailer_size = size_info['SecurityTrailer'] + block_size = size_info['BlockSize'] + + buffer = win32security.PySecBufferDescType() + + buffer.append(win32security.PySecBufferType(len(msg), sspicon.SECBUFFER_DATA)) + buffer[0].Buffer = msg + + # Will receive the token that forms the beginning of the msg + buffer.append(win32security.PySecBufferType(trailer_size, sspicon.SECBUFFER_TOKEN)) + + buffer.append(win32security.PySecBufferType(block_size, sspicon.SECBUFFER_PADDING)) + + fQOP = 0 if encrypt else sspicon.SECQOP_WRAP_NO_ENCRYPT + sec_ctx.ctxt.EncryptMessage(fQOP, buffer, sec_ctx._get_next_seq_num()) + # Sec token, then data, then padding + r = buffer[1].Buffer + buffer[0].Buffer + buffer[2].Buffer + return r + + +def sspi_amend_ctx_metadata(sec_ctx): + """Adds initiator and service names in the security context for ease of use""" + if not sec_ctx.authenticated: + raise ValueError("Sec context is not completly authenticated") + + names = sec_ctx.ctxt.QueryContextAttributes(sspicon.SECPKG_ATTR_NATIVE_NAMES) + sec_ctx.initiator_name, sec_ctx.service_name = names diff --git a/setup.py b/setup.py index 208cfced5..33690d406 100644 --- a/setup.py +++ b/setup.py @@ -42,7 +42,7 @@ def run(cls): "snappy": ["python-snappy"], "zstd": ["python-zstandard"], "gssapi": ["gssapi"], - "gssapi_sspi": ["kerberos-sspi"], + "gssapi_sspi": ["pywin32"], }, cmdclass={"test": Tox}, packages=find_packages(exclude=['test']), From 91854c2afda73fe2420e9b15c6d5bc2adc55c4af Mon Sep 17 00:00:00 2001 From: Emmanuel Coirier Date: Thu, 1 Apr 2021 14:59:02 +0200 Subject: [PATCH 4/8] Trying to guess why the Travis build failed... --- setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 33690d406..80b8d9a27 100644 --- a/setup.py +++ b/setup.py @@ -41,8 +41,8 @@ def run(cls): "lz4": ["lz4"], "snappy": ["python-snappy"], "zstd": ["python-zstandard"], - "gssapi": ["gssapi"], - "gssapi_sspi": ["pywin32"], + # "gssapi": ["gssapi"], + # "gssapi_sspi": ["pywin32"], }, cmdclass={"test": Tox}, packages=find_packages(exclude=['test']), From 6a820bef47f4a4d37c3d84dbaaa040ccfe7abbaf Mon Sep 17 00:00:00 2001 From: Emmanuel Coirier Date: Mon, 12 Apr 2021 15:06:03 +0200 Subject: [PATCH 5/8] Fix some typo --- kafka/conn.py | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index acf17288f..af4e35922 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -803,16 +803,16 @@ def _try_authenticate_gssapi_gss_implementation(self, future): def _try_authenticate_gssapi_sspi_implementation(self, future): global log_sspi - log_sspi = logging.getLogger("kafka.client.sspi") + log_sspi = logging.getLogger("kafka.conn.sspi") kerberos_host_name = self.config['sasl_kerberos_domain_name'] or self.host service_principal_name = self.config['sasl_kerberos_service_name'] + '/' + kerberos_host_name - scheme = "Kerberos" # Do not try with Negotiate that comes with a different protocol than SASL + scheme = "Kerberos" # Do not try with Negotiate for SASL authentication. Tokens are different. # https://docs.microsoft.com/en-us/windows/win32/secauthn/context-requirements flags = ( - sspicon.ISC_REQ_MUTUAL_AUTH | # mutual authentication - sspicon.ISC_REQ_INTEGRITY | # check for integrity - sspicon.ISC_REQ_SEQUENCE_DETECT | # enable out-of-order messages - sspicon.ISC_REQ_CONFIDENTIALITY # request confidentiality + sspicon.ISC_REQ_MUTUAL_AUTH | # mutual authentication + sspicon.ISC_REQ_INTEGRITY | # check for integrity + sspicon.ISC_REQ_SEQUENCE_DETECT | # enable out-of-order messages + sspicon.ISC_REQ_CONFIDENTIALITY # request confidentiality ) err = None @@ -836,7 +836,7 @@ def _try_authenticate_gssapi_sspi_implementation(self, future): log_sspi.info("Using %s SSPI Security Package (%s)", client_ctx.pkg_info["Name"], client_ctx.pkg_info["Comment"]) # Exchange tokens until authentication either succeeds or fails - log_sspi.debug("Begining rounds...") + log_sspi.debug("Beginning rounds...") received_token = None # no token to pass when initiating the first round while not client_ctx.authenticated: # calculate an output token from kafka token (or None on first iteration) @@ -874,10 +874,15 @@ def _try_authenticate_gssapi_sspi_implementation(self, future): received_token = self._recv_bytes_blocking(token_size) log_sspi.debug("Received token from server (size %s)", token_size) + # Add some extra attributes to the context sspi_amend_ctx_metadata(client_ctx) + # Process the security layer negotiation token, sent by the server # once the security context is established. + # The following part is required by SASL, but not by classic Kerberos. + # See RFC 4752 + # unwraps message containing supported protection levels and msg size msg = sspi_gss_unwrap_step(client_ctx, received_token) @@ -1674,16 +1679,17 @@ def sspi_gss_unwrap_step(sec_ctx, token): GSSAPI's unwrap with SSPI. """ buffer = win32security.PySecBufferDescType() - # Stream is a token coming from the other side + # This buffer contains a stream, which is a token coming from the other side buffer.append(win32security.PySecBufferType(len(token), sspicon.SECBUFFER_STREAM)) buffer[0].Buffer = token - # Will receive the clear, or just unwrapped text if no encryption was used. + + # This buffer will receive the clear, or just unwrapped text if no encryption was used. # Will be resized. buffer.append(win32security.PySecBufferType(0, sspicon.SECBUFFER_DATA)) pfQOP = sec_ctx.ctxt.DecryptMessage(buffer, sec_ctx._get_next_seq_num()) if pfQOP == sspicon.SECQOP_WRAP_NO_ENCRYPT: - log.debug("Received token was not encrypted") + log_sspi.debug("Received token was not encrypted") r = buffer[1].Buffer return r @@ -1699,12 +1705,14 @@ def sspi_gss_wrap_step(sec_ctx, msg, encrypt=False): buffer = win32security.PySecBufferDescType() + # This buffer will contain unencrypted data to wrap, and maybe encrypt. buffer.append(win32security.PySecBufferType(len(msg), sspicon.SECBUFFER_DATA)) buffer[0].Buffer = msg # Will receive the token that forms the beginning of the msg buffer.append(win32security.PySecBufferType(trailer_size, sspicon.SECBUFFER_TOKEN)) + # The trailer is needed in case of block encryption buffer.append(win32security.PySecBufferType(block_size, sspicon.SECBUFFER_PADDING)) fQOP = 0 if encrypt else sspicon.SECQOP_WRAP_NO_ENCRYPT @@ -1717,7 +1725,7 @@ def sspi_gss_wrap_step(sec_ctx, msg, encrypt=False): def sspi_amend_ctx_metadata(sec_ctx): """Adds initiator and service names in the security context for ease of use""" if not sec_ctx.authenticated: - raise ValueError("Sec context is not completly authenticated") + raise ValueError("Sec context is not completely authenticated") names = sec_ctx.ctxt.QueryContextAttributes(sspicon.SECPKG_ATTR_NATIVE_NAMES) sec_ctx.initiator_name, sec_ctx.service_name = names From 1e1eb5c4847b214312221a86b91cb2c74ad15559 Mon Sep 17 00:00:00 2001 From: Emmanuel Coirier Date: Mon, 12 Apr 2021 17:46:04 +0200 Subject: [PATCH 6/8] Adds reference material from Microsoft. --- kafka/conn.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kafka/conn.py b/kafka/conn.py index af4e35922..a7f088a8e 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -1677,6 +1677,7 @@ def dns_lookup(host, port, afi=socket.AF_UNSPEC): def sspi_gss_unwrap_step(sec_ctx, token): """ GSSAPI's unwrap with SSPI. + https://docs.microsoft.com/en-us/windows/win32/secauthn/sspi-kerberos-interoperability-with-gssapi """ buffer = win32security.PySecBufferDescType() # This buffer contains a stream, which is a token coming from the other side @@ -1697,6 +1698,7 @@ def sspi_gss_unwrap_step(sec_ctx, token): def sspi_gss_wrap_step(sec_ctx, msg, encrypt=False): """ GSSAPI's wrap with SSPI. + https://docs.microsoft.com/en-us/windows/win32/secauthn/sspi-kerberos-interoperability-with-gssapi """ size_info = sec_ctx.ctxt.QueryContextAttributes(sspicon.SECPKG_ATTR_SIZES) From bd41b04b01b4934681e5fac7cf690876965db9b4 Mon Sep 17 00:00:00 2001 From: Emmanuel Coirier Date: Wed, 21 Jul 2021 11:11:46 +0200 Subject: [PATCH 7/8] Remove helper code that is now included in pywin32. --- kafka/conn.py | 67 ++------------------------------------------------- 1 file changed, 2 insertions(+), 65 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index a7f088a8e..a3bf50961 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -874,9 +874,6 @@ def _try_authenticate_gssapi_sspi_implementation(self, future): received_token = self._recv_bytes_blocking(token_size) log_sspi.debug("Received token from server (size %s)", token_size) - # Add some extra attributes to the context - sspi_amend_ctx_metadata(client_ctx) - # Process the security layer negotiation token, sent by the server # once the security context is established. @@ -884,7 +881,7 @@ def _try_authenticate_gssapi_sspi_implementation(self, future): # See RFC 4752 # unwraps message containing supported protection levels and msg size - msg = sspi_gss_unwrap_step(client_ctx, received_token) + msg, was_encrypted = client_ctx.unwrap(received_token) # Kafka currently doesn't support integrity or confidentiality security layers, so we # simply set QoP to 'auth' only (first octet). We reuse the max message size proposed @@ -893,7 +890,7 @@ def _try_authenticate_gssapi_sspi_implementation(self, future): # add authorization identity to the response, GSS-wrap and send it msg = msg + service_principal_name.encode("utf-8") - msg = sspi_gss_wrap_step(client_ctx, msg) + msg = client_ctx.wrap(msg) size = Int32.encode(len(msg)) self._send_bytes_blocking(size + msg) @@ -1671,63 +1668,3 @@ def dns_lookup(host, port, afi=socket.AF_UNSPEC): ' correct and resolvable?', host, port, ex) return [] - - -# noinspection PyUnresolvedReferences -def sspi_gss_unwrap_step(sec_ctx, token): - """ - GSSAPI's unwrap with SSPI. - https://docs.microsoft.com/en-us/windows/win32/secauthn/sspi-kerberos-interoperability-with-gssapi - """ - buffer = win32security.PySecBufferDescType() - # This buffer contains a stream, which is a token coming from the other side - buffer.append(win32security.PySecBufferType(len(token), sspicon.SECBUFFER_STREAM)) - buffer[0].Buffer = token - - # This buffer will receive the clear, or just unwrapped text if no encryption was used. - # Will be resized. - buffer.append(win32security.PySecBufferType(0, sspicon.SECBUFFER_DATA)) - - pfQOP = sec_ctx.ctxt.DecryptMessage(buffer, sec_ctx._get_next_seq_num()) - if pfQOP == sspicon.SECQOP_WRAP_NO_ENCRYPT: - log_sspi.debug("Received token was not encrypted") - r = buffer[1].Buffer - return r - - -def sspi_gss_wrap_step(sec_ctx, msg, encrypt=False): - """ - GSSAPI's wrap with SSPI. - https://docs.microsoft.com/en-us/windows/win32/secauthn/sspi-kerberos-interoperability-with-gssapi - """ - - size_info = sec_ctx.ctxt.QueryContextAttributes(sspicon.SECPKG_ATTR_SIZES) - trailer_size = size_info['SecurityTrailer'] - block_size = size_info['BlockSize'] - - buffer = win32security.PySecBufferDescType() - - # This buffer will contain unencrypted data to wrap, and maybe encrypt. - buffer.append(win32security.PySecBufferType(len(msg), sspicon.SECBUFFER_DATA)) - buffer[0].Buffer = msg - - # Will receive the token that forms the beginning of the msg - buffer.append(win32security.PySecBufferType(trailer_size, sspicon.SECBUFFER_TOKEN)) - - # The trailer is needed in case of block encryption - buffer.append(win32security.PySecBufferType(block_size, sspicon.SECBUFFER_PADDING)) - - fQOP = 0 if encrypt else sspicon.SECQOP_WRAP_NO_ENCRYPT - sec_ctx.ctxt.EncryptMessage(fQOP, buffer, sec_ctx._get_next_seq_num()) - # Sec token, then data, then padding - r = buffer[1].Buffer + buffer[0].Buffer + buffer[2].Buffer - return r - - -def sspi_amend_ctx_metadata(sec_ctx): - """Adds initiator and service names in the security context for ease of use""" - if not sec_ctx.authenticated: - raise ValueError("Sec context is not completely authenticated") - - names = sec_ctx.ctxt.QueryContextAttributes(sspicon.SECPKG_ATTR_NATIVE_NAMES) - sec_ctx.initiator_name, sec_ctx.service_name = names From 5574e911fdfac1013d6d802104a648f87dc44d26 Mon Sep 17 00:00:00 2001 From: Emmanuel Coirier Date: Wed, 28 Jul 2021 10:38:20 +0200 Subject: [PATCH 8/8] Better extras_requires handling. --- setup.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/setup.py b/setup.py index 80b8d9a27..a80f77b96 100644 --- a/setup.py +++ b/setup.py @@ -41,8 +41,13 @@ def run(cls): "lz4": ["lz4"], "snappy": ["python-snappy"], "zstd": ["python-zstandard"], - # "gssapi": ["gssapi"], - # "gssapi_sspi": ["pywin32"], + "gssapi": ["gssapi"], + "sspi": ["pywin32 >= 301"], + "krb5_auto": [ + 'pywin32>=301;platform_system=="Windows"', + 'gssapi;platform_system=="Linux"', + ], + }, cmdclass={"test": Tox}, packages=find_packages(exclude=['test']), @@ -58,8 +63,6 @@ def run(cls): "Intended Audience :: Developers", "License :: OSI Approved :: Apache Software License", "Programming Language :: Python", - "Programming Language :: Python :: 2", - "Programming Language :: Python :: 2.7", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.4", "Programming Language :: Python :: 3.5",