From 568f0e54fb182c878307e9547d9469956150944d Mon Sep 17 00:00:00 2001 From: Andrei Tsaregorodtsev Date: Thu, 14 Nov 2024 10:28:54 +0100 Subject: [PATCH 1/3] feat: introduce the DISET version of the TokenManager service --- .../Service/DisetTokenManagerHandler.py | 285 ++++++++++++++++++ 1 file changed, 285 insertions(+) create mode 100644 src/DIRAC/FrameworkSystem/Service/DisetTokenManagerHandler.py diff --git a/src/DIRAC/FrameworkSystem/Service/DisetTokenManagerHandler.py b/src/DIRAC/FrameworkSystem/Service/DisetTokenManagerHandler.py new file mode 100644 index 00000000000..746ed200336 --- /dev/null +++ b/src/DIRAC/FrameworkSystem/Service/DisetTokenManagerHandler.py @@ -0,0 +1,285 @@ +"""TokenManager service is responsible for token management, namely storing, updating, +requesting new tokens for DIRAC components that have the appropriate permissions. + +.. literalinclude:: ../ConfigTemplate.cfg + :start-after: ##BEGIN TokenManager: + :end-before: ##END + :dedent: 2 + :caption: TokenManager options + +The most common use of this service is to obtain tokens with certain scope to return to the user for its purposes, +or to provide to the DIRAC service to perform asynchronous tasks on behalf of the user. +This is mainly about the :py:meth:`export_getToken` method. + +.. image:: /_static/Systems/FS/TokenManager_getToken.png + :alt: https://dirac.readthedocs.io/en/integration/_images/TokenManager_getToken.png (source https://github.com/TaykYoku/DIRACIMGS/raw/main/TokenManagerService_getToken.ai) + +The client has a mechanism for caching the received tokens. +This helps reduce the number of requests to both the service and the Identity Provider (IdP). + +If the client has a valid **access token** in the cache, it is used until it expires. +After that you need to update. The client can update it independently if on the server where it is in ``dirac.cfg`` +``client_id`` and ``client_secret`` of the Identity Provider client are registered. + +Otherwise, the client makes an RPC call to the **TornadoManager** service. +The ``refresh token`` from :py:class:`TokenDB ` +is taken and the **exchange token** request to Identity Provider is made. +""" + +import pprint + +from DIRAC import S_OK, S_ERROR +from DIRAC.Core.Security import Properties +from DIRAC.Core.DISET.RequestHandler import RequestHandler +from DIRAC.FrameworkSystem.DB.TokenDB import TokenDB +from DIRAC.ConfigurationSystem.Client.Helpers import Registry +from DIRAC.Resources.IdProvider.IdProviderFactory import IdProviderFactory +from DIRAC.FrameworkSystem.Utilities.TokenManagementUtilities import ( + getIdProviderClient, + getCachedKey, +) + + +class TokenManagerHandlerMixin: + DEFAULT_AUTHORIZATION = ["authenticated"] + + @classmethod + def initializeHandler(cls, *args): + """Initialization + + :return: S_OK()/S_ERROR() + """ + + # The service plays an important OAuth 2.0 role, namely it is an Identity Provider client. + # This allows you to manage tokens without the involvement of their owners. + cls.idps = IdProviderFactory() + + # Let's try to connect to the database + try: + cls.__tokenDB = TokenDB(parentLogger=cls.log) + except Exception as e: + cls.log.exception(e) + return S_ERROR(f"Could not connect to the database {repr(e)}") + + return S_OK() + + auth_getUserTokensInfo = ["authenticated"] + types_getUserTokensInfo = [] + + def export_getUserTokensInfo(self): + """Generate information dict about user tokens + + :return: dict + """ + tokensInfo = [] + credDict = self.getRemoteCredentials() + result = Registry.getDNForUsername(credDict["username"]) + if not result["OK"]: + return result + for dn in result["Value"]: + result = Registry.getIDFromDN(dn) + if result["OK"]: + result = self.__tokenDB.getTokensByUserID(result["Value"]) + if not result["OK"]: + return result + tokensInfo += result["Value"] + return S_OK(tokensInfo) + + auth_getUsersTokensInfo = [Properties.PROXY_MANAGEMENT] + types_getUserTokensInfo = [list] + + def export_getUsersTokensInfo(self, users: list): + """Get the info about the user tokens in the database + + :param users: user names + + :return: S_OK(list) -- return list of tokens dictionaries + """ + tokensInfo = [] + for user in users: + # Find the user ID among his DNs + result = Registry.getDNForUsername(user) + if not result["OK"]: + return result + for dn in result["Value"]: + uid = Registry.getIDFromDN(dn).get("Value") + if uid: + result = self.__tokenDB.getTokensByUserID(uid) + if not result["OK"]: + self.log.error(result["Message"]) + else: + for tokenDict in result["Value"]: + if tokenDict not in tokensInfo: + # The database does not contain a username, + # as it is a unique user ID exclusively for DIRAC + # and is not associated with a token. + tokenDict["username"] = user + tokensInfo.append(tokenDict) + return S_OK(tokensInfo) + + types_updateToken = [dict, str, str, int] + + def export_updateToken(self, token: dict, userID: str, provider: str, rt_expired_in: int = 24 * 3600): + """Using this method, you can transfer user tokens for storage in the TokenManager. + + It is important to note that TokenManager saves only one token per user and, accordingly, + the Identity Provider from which it was issued. So when a new token is delegated, + keep in mind that the old token will be deleted. + + :param token: token + :param userID: user ID + :param provider: provider name + :param rt_expired_in: refresh token expires time (in seconds) + + :return: S_OK(list)/S_ERROR() -- list contain uploaded tokens info as dictionaries + """ + self.log.verbose(f"Update {userID} user token issued by {provider}:\n", pprint.pformat(token)) + # prepare the client instance of the appropriate IdP to revoke the old tokens + result = self.idps.getIdProvider(provider) + if not result["OK"]: + return result + idPObj = result["Value"] + # overwrite old tokens with new ones + result = self.__tokenDB.updateToken(token, userID, provider, rt_expired_in) + if not result["OK"]: + return result + # revoke the old tokens + for oldToken in result["Value"]: + if "refresh_token" in oldToken and oldToken["refresh_token"] != token["refresh_token"]: + self.log.verbose("Revoke old refresh token:\n", pprint.pformat(oldToken)) + idPObj.revokeToken(oldToken["refresh_token"]) + # Let's return to the current situation with the storage of user tokens + return self.__tokenDB.getTokensByUserID(userID) + + def __checkProperties(self, requestedUserDN: str, requestedUserGroup: str): + """Check the properties and return if they can only download limited tokens if authorized + + :param requestedUserDN: user DN + :param requestedUserGroup: DIRAC group + + :return: S_OK(bool)/S_ERROR() + """ + credDict = self.getRemoteCredentials() + if Properties.FULL_DELEGATION in credDict["properties"]: + return S_OK(False) + if Properties.LIMITED_DELEGATION in credDict["properties"]: + return S_OK(True) + if Properties.PRIVATE_LIMITED_DELEGATION in credDict["properties"]: + if credDict["DN"] != requestedUserDN: + return S_ERROR("You are not allowed to download any token") + if Properties.PRIVATE_LIMITED_DELEGATION not in Registry.getPropertiesForGroup(requestedUserGroup): + return S_ERROR("You can't download tokens for that group") + return S_OK(True) + # Not authorized! + return S_ERROR("You can't get tokens!") + + types_getToken = [None, None, None, None, None] + + def export_getToken( + self, + username: str = None, + userGroup: str = None, + scope: list[str] = None, + audience: str = None, + identityProvider: str = None, + requiredTimeLeft: int = 0, + ): + """Get an access token for a user/group. + + * Properties: + * FullDelegation <- permits full delegation of tokens + * LimitedDelegation <- permits downloading only limited tokens + * PrivateLimitedDelegation <- permits downloading only limited tokens for one self + + :param username: user name + :param userGroup: user group + :param scope: requested scope + :param audience: requested audience + :param identityProvider: Identity Provider name + :param requiredTimeLeft: requested minimum life time + + :return: S_OK(dict)/S_ERROR() + """ + # Get an IdProvider Client instance + result = getIdProviderClient(userGroup, identityProvider) + if not result["OK"]: + return result + idpObj = result["Value"] + + # getCachedKey is just used here to resolve the default scopes + _, scope, *_ = getCachedKey(idpObj, username, userGroup, scope, audience) + + # A client token is requested + if not username: + result = self.__checkProperties("", "") + if not result["OK"]: + return result + + # Get the client token with requested scope and audience + result = idpObj.fetchToken(grant_type="client_credentials", scope=scope, audience=audience) + if result["OK"]: + result["Value"] = dict(result["Value"]) + + return result + + # A user token is requested + err = [] + # No luck so far, let's refresh the token stored in the database + result = Registry.getDNForUsername(username) + if not result["OK"]: + return result + for dn in result["Value"]: + # For backward compatibility, the user ID is written as DN. So let's check if this DN contains a user ID + result = Registry.getIDFromDN(dn) + if result["OK"]: + uid = result["Value"] + # To do this, first find the refresh token stored in the database with the maximum scope + result = self.__tokenDB.getTokenForUserProvider(uid, idpObj.name) + if result["OK"] and result["Value"]: + tokens = result["Value"] + result = self.__checkProperties(dn, userGroup) + if result["OK"]: + # refresh token with requested scope + result = idpObj.refreshToken(tokens.get("refresh_token"), group=userGroup, scope=scope) + if result["OK"]: + return result + # Did not find any token associated with the found user ID + err.append(result.get("Message", f"No token found for {uid}")) + # Collect all errors when trying to get a token, or if no user ID is registered + return S_ERROR("; ".join(err or [f"No user ID found for {username}"])) + + types_deleteToken = [str] + + def export_deleteToken(self, userDN: str): + """Delete a token from the DB + + :param userDN: user DN + + :return: S_OK()/S_ERROR() + """ + + # temporary ugly stuff to make it compliant with proxy management + userDN = f"/O=DIRAC/CN={userDN}" + + # Delete it from cache + credDict = self.getRemoteCredentials() + if Properties.PROXY_MANAGEMENT not in credDict["properties"]: + if userDN != credDict["DN"]: + return S_ERROR("You aren't allowed!") + result = Registry.getIDFromDN(userDN) + return self.__tokenDB.removeToken(user_id=result["Value"]) if result["OK"] else result + + types_getTokensByUserID = [str] + + def export_getTokensByUserID(self, userID: str): + """Retrieve a token from the DB + + :param userID: user's token id + + :return: S_OK(list)/S_ERROR() token row in dict format + """ + return self.__tokenDB.getTokensByUserID(userID) + + +class DisetTokenManagerHandler(TokenManagerHandlerMixin, RequestHandler): + pass From 326e6ab816852278af27406a8846ec3bffae095c Mon Sep 17 00:00:00 2001 From: Andrei Tsaregorodtsev Date: Fri, 15 Nov 2024 13:48:12 +0100 Subject: [PATCH 2/3] fix: factorize TokenManagerHandlerMixin common class --- .../Service/DisetTokenManagerHandler.py | 253 +----------------- .../Service/TokenManagerHandler.py | 22 +- 2 files changed, 18 insertions(+), 257 deletions(-) diff --git a/src/DIRAC/FrameworkSystem/Service/DisetTokenManagerHandler.py b/src/DIRAC/FrameworkSystem/Service/DisetTokenManagerHandler.py index 746ed200336..fe9c1d90cfc 100644 --- a/src/DIRAC/FrameworkSystem/Service/DisetTokenManagerHandler.py +++ b/src/DIRAC/FrameworkSystem/Service/DisetTokenManagerHandler.py @@ -26,259 +26,8 @@ is taken and the **exchange token** request to Identity Provider is made. """ -import pprint - -from DIRAC import S_OK, S_ERROR -from DIRAC.Core.Security import Properties from DIRAC.Core.DISET.RequestHandler import RequestHandler -from DIRAC.FrameworkSystem.DB.TokenDB import TokenDB -from DIRAC.ConfigurationSystem.Client.Helpers import Registry -from DIRAC.Resources.IdProvider.IdProviderFactory import IdProviderFactory -from DIRAC.FrameworkSystem.Utilities.TokenManagementUtilities import ( - getIdProviderClient, - getCachedKey, -) - - -class TokenManagerHandlerMixin: - DEFAULT_AUTHORIZATION = ["authenticated"] - - @classmethod - def initializeHandler(cls, *args): - """Initialization - - :return: S_OK()/S_ERROR() - """ - - # The service plays an important OAuth 2.0 role, namely it is an Identity Provider client. - # This allows you to manage tokens without the involvement of their owners. - cls.idps = IdProviderFactory() - - # Let's try to connect to the database - try: - cls.__tokenDB = TokenDB(parentLogger=cls.log) - except Exception as e: - cls.log.exception(e) - return S_ERROR(f"Could not connect to the database {repr(e)}") - - return S_OK() - - auth_getUserTokensInfo = ["authenticated"] - types_getUserTokensInfo = [] - - def export_getUserTokensInfo(self): - """Generate information dict about user tokens - - :return: dict - """ - tokensInfo = [] - credDict = self.getRemoteCredentials() - result = Registry.getDNForUsername(credDict["username"]) - if not result["OK"]: - return result - for dn in result["Value"]: - result = Registry.getIDFromDN(dn) - if result["OK"]: - result = self.__tokenDB.getTokensByUserID(result["Value"]) - if not result["OK"]: - return result - tokensInfo += result["Value"] - return S_OK(tokensInfo) - - auth_getUsersTokensInfo = [Properties.PROXY_MANAGEMENT] - types_getUserTokensInfo = [list] - - def export_getUsersTokensInfo(self, users: list): - """Get the info about the user tokens in the database - - :param users: user names - - :return: S_OK(list) -- return list of tokens dictionaries - """ - tokensInfo = [] - for user in users: - # Find the user ID among his DNs - result = Registry.getDNForUsername(user) - if not result["OK"]: - return result - for dn in result["Value"]: - uid = Registry.getIDFromDN(dn).get("Value") - if uid: - result = self.__tokenDB.getTokensByUserID(uid) - if not result["OK"]: - self.log.error(result["Message"]) - else: - for tokenDict in result["Value"]: - if tokenDict not in tokensInfo: - # The database does not contain a username, - # as it is a unique user ID exclusively for DIRAC - # and is not associated with a token. - tokenDict["username"] = user - tokensInfo.append(tokenDict) - return S_OK(tokensInfo) - - types_updateToken = [dict, str, str, int] - - def export_updateToken(self, token: dict, userID: str, provider: str, rt_expired_in: int = 24 * 3600): - """Using this method, you can transfer user tokens for storage in the TokenManager. - - It is important to note that TokenManager saves only one token per user and, accordingly, - the Identity Provider from which it was issued. So when a new token is delegated, - keep in mind that the old token will be deleted. - - :param token: token - :param userID: user ID - :param provider: provider name - :param rt_expired_in: refresh token expires time (in seconds) - - :return: S_OK(list)/S_ERROR() -- list contain uploaded tokens info as dictionaries - """ - self.log.verbose(f"Update {userID} user token issued by {provider}:\n", pprint.pformat(token)) - # prepare the client instance of the appropriate IdP to revoke the old tokens - result = self.idps.getIdProvider(provider) - if not result["OK"]: - return result - idPObj = result["Value"] - # overwrite old tokens with new ones - result = self.__tokenDB.updateToken(token, userID, provider, rt_expired_in) - if not result["OK"]: - return result - # revoke the old tokens - for oldToken in result["Value"]: - if "refresh_token" in oldToken and oldToken["refresh_token"] != token["refresh_token"]: - self.log.verbose("Revoke old refresh token:\n", pprint.pformat(oldToken)) - idPObj.revokeToken(oldToken["refresh_token"]) - # Let's return to the current situation with the storage of user tokens - return self.__tokenDB.getTokensByUserID(userID) - - def __checkProperties(self, requestedUserDN: str, requestedUserGroup: str): - """Check the properties and return if they can only download limited tokens if authorized - - :param requestedUserDN: user DN - :param requestedUserGroup: DIRAC group - - :return: S_OK(bool)/S_ERROR() - """ - credDict = self.getRemoteCredentials() - if Properties.FULL_DELEGATION in credDict["properties"]: - return S_OK(False) - if Properties.LIMITED_DELEGATION in credDict["properties"]: - return S_OK(True) - if Properties.PRIVATE_LIMITED_DELEGATION in credDict["properties"]: - if credDict["DN"] != requestedUserDN: - return S_ERROR("You are not allowed to download any token") - if Properties.PRIVATE_LIMITED_DELEGATION not in Registry.getPropertiesForGroup(requestedUserGroup): - return S_ERROR("You can't download tokens for that group") - return S_OK(True) - # Not authorized! - return S_ERROR("You can't get tokens!") - - types_getToken = [None, None, None, None, None] - - def export_getToken( - self, - username: str = None, - userGroup: str = None, - scope: list[str] = None, - audience: str = None, - identityProvider: str = None, - requiredTimeLeft: int = 0, - ): - """Get an access token for a user/group. - - * Properties: - * FullDelegation <- permits full delegation of tokens - * LimitedDelegation <- permits downloading only limited tokens - * PrivateLimitedDelegation <- permits downloading only limited tokens for one self - - :param username: user name - :param userGroup: user group - :param scope: requested scope - :param audience: requested audience - :param identityProvider: Identity Provider name - :param requiredTimeLeft: requested minimum life time - - :return: S_OK(dict)/S_ERROR() - """ - # Get an IdProvider Client instance - result = getIdProviderClient(userGroup, identityProvider) - if not result["OK"]: - return result - idpObj = result["Value"] - - # getCachedKey is just used here to resolve the default scopes - _, scope, *_ = getCachedKey(idpObj, username, userGroup, scope, audience) - - # A client token is requested - if not username: - result = self.__checkProperties("", "") - if not result["OK"]: - return result - - # Get the client token with requested scope and audience - result = idpObj.fetchToken(grant_type="client_credentials", scope=scope, audience=audience) - if result["OK"]: - result["Value"] = dict(result["Value"]) - - return result - - # A user token is requested - err = [] - # No luck so far, let's refresh the token stored in the database - result = Registry.getDNForUsername(username) - if not result["OK"]: - return result - for dn in result["Value"]: - # For backward compatibility, the user ID is written as DN. So let's check if this DN contains a user ID - result = Registry.getIDFromDN(dn) - if result["OK"]: - uid = result["Value"] - # To do this, first find the refresh token stored in the database with the maximum scope - result = self.__tokenDB.getTokenForUserProvider(uid, idpObj.name) - if result["OK"] and result["Value"]: - tokens = result["Value"] - result = self.__checkProperties(dn, userGroup) - if result["OK"]: - # refresh token with requested scope - result = idpObj.refreshToken(tokens.get("refresh_token"), group=userGroup, scope=scope) - if result["OK"]: - return result - # Did not find any token associated with the found user ID - err.append(result.get("Message", f"No token found for {uid}")) - # Collect all errors when trying to get a token, or if no user ID is registered - return S_ERROR("; ".join(err or [f"No user ID found for {username}"])) - - types_deleteToken = [str] - - def export_deleteToken(self, userDN: str): - """Delete a token from the DB - - :param userDN: user DN - - :return: S_OK()/S_ERROR() - """ - - # temporary ugly stuff to make it compliant with proxy management - userDN = f"/O=DIRAC/CN={userDN}" - - # Delete it from cache - credDict = self.getRemoteCredentials() - if Properties.PROXY_MANAGEMENT not in credDict["properties"]: - if userDN != credDict["DN"]: - return S_ERROR("You aren't allowed!") - result = Registry.getIDFromDN(userDN) - return self.__tokenDB.removeToken(user_id=result["Value"]) if result["OK"] else result - - types_getTokensByUserID = [str] - - def export_getTokensByUserID(self, userID: str): - """Retrieve a token from the DB - - :param userID: user's token id - - :return: S_OK(list)/S_ERROR() token row in dict format - """ - return self.__tokenDB.getTokensByUserID(userID) +from DIRAC.FrameworkSystem.Service.TokenManagerHandler import TokenManagerHandlerMixin class DisetTokenManagerHandler(TokenManagerHandlerMixin, RequestHandler): diff --git a/src/DIRAC/FrameworkSystem/Service/TokenManagerHandler.py b/src/DIRAC/FrameworkSystem/Service/TokenManagerHandler.py index 70e9bc24907..ff81f3be20a 100644 --- a/src/DIRAC/FrameworkSystem/Service/TokenManagerHandler.py +++ b/src/DIRAC/FrameworkSystem/Service/TokenManagerHandler.py @@ -32,8 +32,6 @@ from DIRAC import S_OK, S_ERROR from DIRAC.Core.Security import Properties -from DIRAC.Core.Utilities import ThreadSafe -from DIRAC.Core.Utilities.DictCache import DictCache from DIRAC.Core.Tornado.Server.TornadoService import TornadoService from DIRAC.FrameworkSystem.DB.TokenDB import TokenDB from DIRAC.ConfigurationSystem.Client.Helpers import Registry @@ -41,12 +39,10 @@ from DIRAC.FrameworkSystem.Utilities.TokenManagementUtilities import ( getIdProviderClient, getCachedKey, - DEFAULT_RT_EXPIRATION_TIME, - DEFAULT_AT_EXPIRATION_TIME, ) -class TokenManagerHandler(TornadoService): +class TokenManagerHandlerMixin: DEFAULT_AUTHORIZATION = ["authenticated"] @classmethod @@ -69,6 +65,9 @@ def initializeHandler(cls, *args): return S_OK() + auth_getUserTokensInfo = ["authenticated"] + types_getUserTokensInfo = [] + def export_getUserTokensInfo(self): """Generate information dict about user tokens @@ -89,6 +88,7 @@ def export_getUserTokensInfo(self): return S_OK(tokensInfo) auth_getUsersTokensInfo = [Properties.PROXY_MANAGEMENT] + types_getUserTokensInfo = [list] def export_getUsersTokensInfo(self, users: list): """Get the info about the user tokens in the database @@ -119,6 +119,8 @@ def export_getUsersTokensInfo(self, users: list): tokensInfo.append(tokenDict) return S_OK(tokensInfo) + types_updateToken = [dict, str, str, int] + def export_updateToken(self, token: dict, userID: str, provider: str, rt_expired_in: int = 24 * 3600): """Using this method, you can transfer user tokens for storage in the TokenManager. @@ -173,6 +175,8 @@ def __checkProperties(self, requestedUserDN: str, requestedUserGroup: str): # Not authorized! return S_ERROR("You can't get tokens!") + types_getToken = [None, None, None, None, None] + def export_getToken( self, username: str = None, @@ -244,6 +248,8 @@ def export_getToken( # Collect all errors when trying to get a token, or if no user ID is registered return S_ERROR("; ".join(err or [f"No user ID found for {username}"])) + types_deleteToken = [str] + def export_deleteToken(self, userDN: str): """Delete a token from the DB @@ -263,6 +269,8 @@ def export_deleteToken(self, userDN: str): result = Registry.getIDFromDN(userDN) return self.__tokenDB.removeToken(user_id=result["Value"]) if result["OK"] else result + types_getTokensByUserID = [str] + def export_getTokensByUserID(self, userID: str): """Retrieve a token from the DB @@ -271,3 +279,7 @@ def export_getTokensByUserID(self, userID: str): :return: S_OK(list)/S_ERROR() token row in dict format """ return self.__tokenDB.getTokensByUserID(userID) + + +class TokenManagerHandler(TokenManagerHandlerMixin, TornadoService): + pass From 0fb2fb2a3581de2e70215b4c7de0da41117564cf Mon Sep 17 00:00:00 2001 From: Andrei Tsaregorodtsev Date: Mon, 18 Nov 2024 13:41:38 +0100 Subject: [PATCH 3/3] fix: getToken returns the token dictionary --- src/DIRAC/FrameworkSystem/Service/TokenManagerHandler.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/DIRAC/FrameworkSystem/Service/TokenManagerHandler.py b/src/DIRAC/FrameworkSystem/Service/TokenManagerHandler.py index ff81f3be20a..2bcd6535671 100644 --- a/src/DIRAC/FrameworkSystem/Service/TokenManagerHandler.py +++ b/src/DIRAC/FrameworkSystem/Service/TokenManagerHandler.py @@ -219,6 +219,9 @@ def export_getToken( # Get the client token with requested scope and audience result = idpObj.fetchToken(grant_type="client_credentials", scope=scope, audience=audience) + # DEncode can not encode OAuth2Token object + if result["OK"]: + result["Value"] = dict(result["Value"]) return result