From e1c7f2b2d28945ccd59ffa5d84eb273408df1a4d Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 22 May 2025 13:47:01 -0700 Subject: [PATCH 1/5] Slight gssapi init refactor to make mock easier --- kafka/sasl/gssapi.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/kafka/sasl/gssapi.py b/kafka/sasl/gssapi.py index be84269da..77fe0ee95 100644 --- a/kafka/sasl/gssapi.py +++ b/kafka/sasl/gssapi.py @@ -26,14 +26,15 @@ def __init__(self, **config): raise ValueError('sasl_kerberos_service_name or sasl_kerberos_name required for GSSAPI sasl configuration') self._is_done = False self._is_authenticated = False + self.gssapi_name = None if config.get('sasl_kerberos_name', None) is not None: self.auth_id = str(config['sasl_kerberos_name']) + if isinstance(config['sasl_kerberos_name'], gssapi.Name): + self.gssapi_name = config['sasl_kerberos_name'] else: kerberos_domain_name = config.get('sasl_kerberos_domain_name', '') or config.get('host', '') self.auth_id = config['sasl_kerberos_service_name'] + '@' + kerberos_domain_name - if isinstance(config.get('sasl_kerberos_name', None), gssapi.Name): - self.gssapi_name = config['sasl_kerberos_name'] - else: + if self.gssapi_name is None: self.gssapi_name = gssapi.Name(self.auth_id, name_type=gssapi.NameType.hostbased_service).canonicalize(gssapi.MechType.kerberos) self._client_ctx = gssapi.SecurityContext(name=self.gssapi_name, usage='initiate') self._next_token = self._client_ctx.step(None) From 30d839fe832764ca1938a9cc12edfd2c016dca2b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 22 May 2025 13:21:13 -0700 Subject: [PATCH 2/5] Add test (fails w/o changes) --- test/sasl/test_gssapi.py | 42 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 test/sasl/test_gssapi.py diff --git a/test/sasl/test_gssapi.py b/test/sasl/test_gssapi.py new file mode 100644 index 000000000..893414e37 --- /dev/null +++ b/test/sasl/test_gssapi.py @@ -0,0 +1,42 @@ +from __future__ import absolute_import + +try: + from unittest import mock +except ImportError: + import mock + +from kafka.sasl import get_sasl_mechanism +import kafka.sasl.gssapi + + +def test_gssapi(): + config = { + 'sasl_kerberos_domain_name': 'foo', + 'sasl_kerberos_service_name': 'bar', + } + client_ctx = mock.Mock() + client_ctx.step.side_effect = [b'init', b'exchange', b'complete', b'xxxx'] + client_ctx.complete = False + def mocked_message_wrapper(msg, *args): + wrapped = mock.Mock() + type(wrapped).message = mock.PropertyMock(return_value=msg) + return wrapped + client_ctx.unwrap.side_effect = mocked_message_wrapper + client_ctx.wrap.side_effect = mocked_message_wrapper + kafka.sasl.gssapi.gssapi = mock.Mock() + kafka.sasl.gssapi.gssapi.SecurityContext.return_value = client_ctx + gssapi = get_sasl_mechanism('GSSAPI')(**config) + assert isinstance(gssapi, kafka.sasl.gssapi.SaslMechanismGSSAPI) + client_ctx.step.assert_called_with(None) + + while not gssapi.is_done(): + send_token = gssapi.auth_bytes() + receive_token = send_token # not realistic, but enough for testing + if send_token == b'\x00cbar@foo': # final wrapped message + receive_token = b'' # final message gets an empty response + gssapi.receive(receive_token) + if client_ctx.step.call_count == 3: + client_ctx.complete = True + + assert gssapi.is_done() + assert gssapi.is_authenticated() From e0104cbcc656c3bc5d27673a33d3a25debb5736b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 22 May 2025 08:49:06 -0700 Subject: [PATCH 3/5] Fix sasl gssapi plugin: set _is_done after generating final message --- kafka/sasl/gssapi.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/kafka/sasl/gssapi.py b/kafka/sasl/gssapi.py index 77fe0ee95..0129976f4 100644 --- a/kafka/sasl/gssapi.py +++ b/kafka/sasl/gssapi.py @@ -40,12 +40,7 @@ def __init__(self, **config): self._next_token = self._client_ctx.step(None) def auth_bytes(self): - # GSSAPI Auth does not have a final broker->client message - # so mark is_done after the final auth_bytes are provided - # in practice we'll still receive a response when using SaslAuthenticate - # but not when using the prior unframed approach. - if self._client_ctx.complete: - self._is_done = True + if self._is_done: self._is_authenticated = True return self._next_token or b'' @@ -75,6 +70,11 @@ def receive(self, auth_bytes): ] # add authorization identity to the response, and GSS-wrap self._next_token = self._client_ctx.wrap(b''.join(message_parts), False).message + # GSSAPI Auth does not have a final broker->client message + # so mark is_done after the final token is generated + # in practice we'll still receive a response when using SaslAuthenticate + # but not when using the prior unframed approach. + self._is_done = True def is_done(self): return self._is_done From d30d32078f2223bf63992ccc5811ef4c9497df78 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 22 May 2025 09:45:34 -0700 Subject: [PATCH 4/5] set is_authenticated on final receive; is_done on final send --- kafka/sasl/gssapi.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/kafka/sasl/gssapi.py b/kafka/sasl/gssapi.py index 0129976f4..17193c55e 100644 --- a/kafka/sasl/gssapi.py +++ b/kafka/sasl/gssapi.py @@ -40,8 +40,8 @@ def __init__(self, **config): self._next_token = self._client_ctx.step(None) def auth_bytes(self): - if self._is_done: - self._is_authenticated = True + if self._is_authenticated: + self._is_done = True return self._next_token or b'' def receive(self, auth_bytes): @@ -71,10 +71,12 @@ def receive(self, auth_bytes): # add authorization identity to the response, and GSS-wrap self._next_token = self._client_ctx.wrap(b''.join(message_parts), False).message # GSSAPI Auth does not have a final broker->client message - # so mark is_done after the final token is generated + # so we need to be able to identify when the final token is generated + # here we set _is_authenticated after receiving the final response, + # but wait until the final send (auth_bytes() call) to set _is_done. # in practice we'll still receive a response when using SaslAuthenticate # but not when using the prior unframed approach. - self._is_done = True + self._is_authenticated = True def is_done(self): return self._is_done From e6d0daf648e8d97a5b34106b6e112ec90f420a6b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 22 May 2025 09:54:13 -0700 Subject: [PATCH 5/5] Update comments --- kafka/sasl/gssapi.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/kafka/sasl/gssapi.py b/kafka/sasl/gssapi.py index 17193c55e..6a4896585 100644 --- a/kafka/sasl/gssapi.py +++ b/kafka/sasl/gssapi.py @@ -40,6 +40,10 @@ def __init__(self, **config): self._next_token = self._client_ctx.step(None) def auth_bytes(self): + # GSSAPI Auth does not have a final broker->client message + # so mark is_done after the final auth_bytes are provided + # in practice we'll still receive a response when using SaslAuthenticate + # but not when using the prior unframed approach. if self._is_authenticated: self._is_done = True return self._next_token or b'' @@ -70,12 +74,12 @@ def receive(self, auth_bytes): ] # add authorization identity to the response, and GSS-wrap self._next_token = self._client_ctx.wrap(b''.join(message_parts), False).message - # GSSAPI Auth does not have a final broker->client message - # so we need to be able to identify when the final token is generated - # here we set _is_authenticated after receiving the final response, - # but wait until the final send (auth_bytes() call) to set _is_done. - # in practice we'll still receive a response when using SaslAuthenticate - # but not when using the prior unframed approach. + # We need to identify the last token in auth_bytes(); + # we can't rely on client_ctx.complete because it becomes True after generating + # the second-to-last token (after calling .step(auth_bytes) for the final time) + # We could introduce an additional state variable (i.e., self._final_token), + # but instead we just set _is_authenticated. Since the plugin interface does + # not read is_authenticated() until after is_done() is True, this should be fine. self._is_authenticated = True def is_done(self):