diff --git a/kafka/conn.py b/kafka/conn.py index cac354875..a3bf50961 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -79,11 +79,18 @@ class SSLWantWriteError(Exception): import gssapi from gssapi.raw.misc import GSSError except ImportError: - #no gssapi available, will disable gssapi mechanism gssapi = None GSSError = None +try: + import sspi + import pywintypes + import sspicon + import win32security +except ImportError: + sspi = None + AFI_NAMES = { socket.AF_UNSPEC: "unspecified", socket.AF_INET: "IPv4", @@ -270,7 +277,8 @@ def __init__(self, host, port, afi, **configs): 'sasl_plain_password required for PLAIN or SCRAM sasl' ) if self.config['sasl_mechanism'] == 'GSSAPI': - assert gssapi is not None, 'GSSAPI lib not available' + if gssapi is None and sspi is None: + raise AssertionError('No GSSAPI lib available') assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl' if self.config['sasl_mechanism'] == 'OAUTHBEARER': token_provider = self.config['sasl_oauth_token_provider'] @@ -709,13 +717,20 @@ def _try_authenticate_scram(self, future): return future.success(True) def _try_authenticate_gssapi(self, future): - kerberos_damin_name = self.config['sasl_kerberos_domain_name'] or self.host - auth_id = self.config['sasl_kerberos_service_name'] + '@' + kerberos_damin_name + if gssapi is not None: + return self._try_authenticate_gssapi_gss_implementation(future) + + if sspi is not None: + return self._try_authenticate_gssapi_sspi_implementation(future) + + def _try_authenticate_gssapi_gss_implementation(self, future): + kerberos_host_name = self.config['sasl_kerberos_domain_name'] or self.host + auth_id = self.config['sasl_kerberos_service_name'] + '@' + kerberos_host_name gssapi_name = gssapi.Name( auth_id, name_type=gssapi.NameType.hostbased_service ).canonicalize(gssapi.MechType.kerberos) - log.debug('%s: GSSAPI name: %s', self, gssapi_name) + log.debug('%s: GSSAPI Service Principal Name: %s', self, gssapi_name) err = None close = False @@ -778,9 +793,130 @@ def _try_authenticate_gssapi(self, future): self.close(error=err) return future.failure(err) - log.info('%s: Authenticated as %s via GSSAPI', self, gssapi_name) + log.info( + '%s: Authenticated as %s to %s via GSSAPI', + self, + client_ctx.initiator_name, + client_ctx.target_name + ) + return future.success(True) + + def _try_authenticate_gssapi_sspi_implementation(self, future): + global log_sspi + log_sspi = logging.getLogger("kafka.conn.sspi") + kerberos_host_name = self.config['sasl_kerberos_domain_name'] or self.host + service_principal_name = self.config['sasl_kerberos_service_name'] + '/' + kerberos_host_name + scheme = "Kerberos" # Do not try with Negotiate for SASL authentication. Tokens are different. + # https://docs.microsoft.com/en-us/windows/win32/secauthn/context-requirements + flags = ( + sspicon.ISC_REQ_MUTUAL_AUTH | # mutual authentication + sspicon.ISC_REQ_INTEGRITY | # check for integrity + sspicon.ISC_REQ_SEQUENCE_DETECT | # enable out-of-order messages + sspicon.ISC_REQ_CONFIDENTIALITY # request confidentiality + ) + + err = None + close = False + with self._lock: + if not self._can_send_recv(): + err = Errors.NodeNotReadyError(str(self)) + close = False + else: + # Establish security context and negotiate protection level + # For reference see RFC 4752, section 3 + try: + log_sspi.debug("Create client security context") + # instantiate sspi context + client_ctx = sspi.ClientAuth( + scheme, + targetspn=service_principal_name, + scflags=flags, + ) + # Print some SSPI implementation + log_sspi.info("Using %s SSPI Security Package (%s)", client_ctx.pkg_info["Name"], client_ctx.pkg_info["Comment"]) + + # Exchange tokens until authentication either succeeds or fails + log_sspi.debug("Beginning rounds...") + received_token = None # no token to pass when initiating the first round + while not client_ctx.authenticated: + # calculate an output token from kafka token (or None on first iteration) + # https://docs.microsoft.com/en-us/windows/win32/api/sspi/nf-sspi-initializesecuritycontexta + # https://docs.microsoft.com/en-us/windows/win32/secauthn/initializesecuritycontext--kerberos + # authorize method will wrap for us our token in sspi structures + log_sspi.debug("Exchange a token") + error, auth = client_ctx.authorize(received_token) + if len(auth) > 0 and len(auth[0].Buffer): + log_sspi.debug("Got token from context") + # this buffer must be sent to the server whatever the result is + output_token = auth[0].Buffer + else: + log_sspi.debug("Got no token, exchange finished") + # seems to be the end of the loop + output_token = None + + # pass output token to kafka, or send empty response if the security + # context is complete (output token is None in that case) + if output_token is None: + log_sspi.debug("Sending end of exchange to server") + self._send_bytes_blocking(Int32.encode(0)) + else: + log_sspi.debug("Sending token from local context to server") + msg = output_token + size = Int32.encode(len(msg)) + self._send_bytes_blocking(size + msg) + + # The server will send a token back. Processing of this token either + # establishes a security context, or it needs further token exchange. + # The gssapi will be able to identify the needed next step. + # The connection is closed on failure. + header = self._recv_bytes_blocking(4) + (token_size,) = struct.unpack('>i', header) + received_token = self._recv_bytes_blocking(token_size) + log_sspi.debug("Received token from server (size %s)", token_size) + + # Process the security layer negotiation token, sent by the server + # once the security context is established. + + # The following part is required by SASL, but not by classic Kerberos. + # See RFC 4752 + + # unwraps message containing supported protection levels and msg size + msg, was_encrypted = client_ctx.unwrap(received_token) + + # Kafka currently doesn't support integrity or confidentiality security layers, so we + # simply set QoP to 'auth' only (first octet). We reuse the max message size proposed + # by the server + msg = Int8.encode(SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0:1]))) + msg[1:] + + # add authorization identity to the response, GSS-wrap and send it + msg = msg + service_principal_name.encode("utf-8") + msg = client_ctx.wrap(msg) + size = Int32.encode(len(msg)) + self._send_bytes_blocking(size + msg) + + except (ConnectionError, TimeoutError) as e: + log.exception("%s: Error receiving reply from server", self) + err = Errors.KafkaConnectionError("%s: %s" % (self, e)) + close = True + except Exception as e: + err = e + close = True + + if err is not None: + if close: + self.close(error=err) + return future.failure(err) + + # noinspection PyUnresolvedReferences + log.info( + '%s: Authenticated as %s to %s via SSPI/GSSAPI \\o/', + self, + client_ctx.initiator_name, + client_ctx.service_name + ) return future.success(True) + def _try_authenticate_oauth(self, future): data = b'' diff --git a/setup.py b/setup.py index fe8a594f3..a80f77b96 100644 --- a/setup.py +++ b/setup.py @@ -41,6 +41,13 @@ def run(cls): "lz4": ["lz4"], "snappy": ["python-snappy"], "zstd": ["python-zstandard"], + "gssapi": ["gssapi"], + "sspi": ["pywin32 >= 301"], + "krb5_auto": [ + 'pywin32>=301;platform_system=="Windows"', + 'gssapi;platform_system=="Linux"', + ], + }, cmdclass={"test": Tox}, packages=find_packages(exclude=['test']), @@ -56,8 +63,6 @@ def run(cls): "Intended Audience :: Developers", "License :: OSI Approved :: Apache Software License", "Programming Language :: Python", - "Programming Language :: Python :: 2", - "Programming Language :: Python :: 2.7", "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.4", "Programming Language :: Python :: 3.5",