diff --git a/.snyk b/.snyk index 4eaa56f..7d0fc1c 100644 --- a/.snyk +++ b/.snyk @@ -21,4 +21,8 @@ ignore: - '*': reason: "Accepting the Unknown license for now" expires: "2030-12-31T23:59:59Z" + "snyk:lic:pip:cryptography:Unknown": + - '*': + reason: "Accepting the Unknown license for now" + expires: "2030-12-31T23:59:59Z" patch: {} diff --git a/README.md b/README.md index 9d26263..f81b537 100644 --- a/README.md +++ b/README.md @@ -104,6 +104,47 @@ async def callback(request: Request): return RedirectResponse(url="/") ``` +### 4. Login with Custom Token Exchange + +If you're migrating from a legacy authentication system or integrating with a custom identity provider, you can exchange external tokens for Auth0 tokens using the OAuth 2.0 Token Exchange specification (RFC 8693): + +```python +from auth0_server_python.auth_types import LoginWithCustomTokenExchangeOptions + +# Exchange a custom token and establish a session +result = await auth0.login_with_custom_token_exchange( + LoginWithCustomTokenExchangeOptions( + subject_token="your-custom-token", + subject_token_type="urn:acme:mcp-token", + audience="https://api.example.com" + ), + store_options={"request": request, "response": response} +) + +# Access the user session +user = result.state_data["user"] +``` + +For advanced token exchange scenarios (without creating a session), use `custom_token_exchange()` directly: + +```python +from auth0_server_python.auth_types import CustomTokenExchangeOptions + +# Exchange a custom token for Auth0 tokens +response = await auth0.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="your-custom-token", + subject_token_type="urn:acme:mcp-token", + audience="https://api.example.com", + scope="read:data write:data" + ) +) + +print(response.access_token) +``` + +For more details and examples, see [examples/CustomTokenExchange.md](examples/CustomTokenExchange.md). + ## Feedback ### Contributing diff --git a/examples/CustomTokenExchange.md b/examples/CustomTokenExchange.md new file mode 100644 index 0000000..c49dced --- /dev/null +++ b/examples/CustomTokenExchange.md @@ -0,0 +1,158 @@ +# Custom Token Exchange + +Custom Token Exchange allows you to exchange tokens from external identity providers or legacy authentication systems for Auth0 tokens without browser redirects. This implements **OAuth 2.0 Token Exchange** ([RFC 8693](https://datatracker.ietf.org/doc/html/rfc8693)). + +> **NOTE**: For complete documentation on Custom Token Exchange, configuration requirements, and detailed use cases, see the [official Auth0 documentation](https://auth0.com/docs/authenticate/custom-token-exchange). + +## 1. Basic Token Exchange + +Exchange a custom token for Auth0 tokens without creating a user session. + +```python +from auth0_server_python.auth_server.server_client import ServerClient +from auth0_server_python.auth_types import CustomTokenExchangeOptions + +# Initialize the client +auth0 = ServerClient( + domain="", + client_id="", + client_secret="", + secret="" +) + +# Exchange a custom token +response = await auth0.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="custom-token-from-external-system", + subject_token_type="urn:acme:mcp-token", + audience="https://api.example.com", + scope="read:data write:data" + ) +) + +# Access the exchanged tokens +print(f"Access Token: {response.access_token}") +print(f"Expires In: {response.expires_in} seconds") +if response.id_token: + print(f"ID Token: {response.id_token}") +``` + +## 2. Login with Token Exchange + +Exchange a custom token AND establish a user session. + +```python +from auth0_server_python.auth_types import LoginWithCustomTokenExchangeOptions +from fastapi import Request, Response + +# Exchange token and create session +result = await auth0.login_with_custom_token_exchange( + LoginWithCustomTokenExchangeOptions( + subject_token="custom-token-from-external-system", + subject_token_type="urn:acme:mcp-token", + audience="https://api.example.com" + ), + store_options={"request": request, "response": response} +) + +# User is now logged in +user = result.state_data["user"] +print(f"User logged in: {user['sub']}") +``` + +> **TIP**: Use `login_with_custom_token_exchange()` when you need both token exchange and session management (e.g., user migration flows). Use `custom_token_exchange()` for pure token exchange scenarios (e.g., service-to-service authentication). + +## 3. Actor Tokens (Delegation) + +Enable delegation scenarios where one service acts on behalf of a user. + +```python +# Service acting on behalf of a user +response = await auth0.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="user-access-token", + subject_token_type="urn:ietf:params:oauth:token-type:access_token", + actor_token="service-access-token", + actor_token_type="urn:ietf:params:oauth:token-type:access_token", + audience="https://api.example.com" + ) +) +``` + +## 4. Custom Authorization Parameters + +Pass additional parameters to the token endpoint. + +```python +response = await auth0.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="custom-token", + subject_token_type="urn:acme:mcp-token", + audience="https://api.example.com", + authorization_params={ + "custom_field": "custom_value" + } + ) +) +``` + +> **NOTE**: Critical parameters (`grant_type`, `client_id`, `subject_token`, `subject_token_type`) cannot be overridden via `authorization_params` for security reasons. + +## 5. Organization Support + +Specify an organization when exchanging tokens. + +```python +response = await auth0.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="custom-token", + subject_token_type="urn:acme:mcp-token", + audience="https://api.example.com", + organization="org_abc1234" + ) +) +``` + +## 6. Error Handling + +```python +from auth0_server_python.error import CustomTokenExchangeError + +try: + response = await auth0.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="token", + subject_token_type="urn:acme:mcp-token" + ) + ) +except CustomTokenExchangeError as e: + print(f"Exchange failed: {e.code} - {e.message}") +``` + +### Common Error Codes + +- `INVALID_TOKEN_FORMAT`: Token is empty, whitespace-only, or has "Bearer " prefix +- `MISSING_ACTOR_TOKEN_TYPE`: `actor_token` provided without `actor_token_type` +- `TOKEN_EXCHANGE_FAILED`: General token exchange failure +- `INVALID_RESPONSE`: Auth0 returned a non-JSON response + +## 7. Token Type URIs + +Use standard URNs when possible: + +```python +# Standard token types +"urn:ietf:params:oauth:token-type:jwt" # JWT tokens +"urn:ietf:params:oauth:token-type:access_token" # OAuth access tokens +"urn:ietf:params:oauth:token-type:id_token" # OpenID Connect ID tokens +"urn:ietf:params:oauth:token-type:refresh_token" # OAuth refresh tokens + +# Custom token types (use your own namespace) +"urn:acme:mcp-token" +"urn:company:legacy-token" +``` + +## Additional Resources + +- [Auth0 Custom Token Exchange Documentation](https://auth0.com/docs/authenticate/custom-token-exchange) +- [RFC 8693 - OAuth 2.0 Token Exchange](https://datatracker.ietf.org/doc/html/rfc8693) diff --git a/src/auth0_server_python/auth_server/server_client.py b/src/auth0_server_python/auth_server/server_client.py index a73622e..57452f0 100644 --- a/src/auth0_server_python/auth_server/server_client.py +++ b/src/auth0_server_python/auth_server/server_client.py @@ -17,12 +17,16 @@ CompleteConnectAccountResponse, ConnectAccountOptions, ConnectAccountRequest, + CustomTokenExchangeOptions, ListConnectedAccountConnectionsResponse, ListConnectedAccountsResponse, + LoginWithCustomTokenExchangeOptions, + LoginWithCustomTokenExchangeResult, LogoutOptions, LogoutTokenClaims, StartInteractiveLoginOptions, StateData, + TokenExchangeResponse, TokenSet, TransactionData, UserClaims, @@ -34,6 +38,8 @@ AccessTokenForConnectionErrorCode, ApiError, BackchannelLogoutError, + CustomTokenExchangeError, + CustomTokenExchangeErrorCode, InvalidArgumentError, MissingRequiredArgumentError, MissingTransactionError, @@ -823,6 +829,230 @@ async def handle_backchannel_logout( raise BackchannelLogoutError( f"Error processing logout token: {str(e)}") + async def custom_token_exchange( + self, + options: CustomTokenExchangeOptions + ) -> TokenExchangeResponse: + """ + Exchanges a custom token for Auth0 tokens using RFC 8693. + + This method implements the OAuth 2.0 Token Exchange specification, + allowing you to exchange external custom tokens for Auth0 access tokens. + + Args: + options: Configuration for the token exchange + + Returns: + TokenExchangeResponse containing access_token and metadata + + Raises: + CustomTokenExchangeError: If token exchange fails + MissingRequiredArgumentError: If required parameters are missing + + Example: + ```python + response = await client.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="custom-token-value", + subject_token_type="urn:acme:mcp-token", + audience="https://api.example.com", + scope="read:data write:data" + ) + ) + print(response.access_token) + ``` + + See: + https://datatracker.ietf.org/doc/html/rfc8693 + """ + try: + # Validate options (Pydantic handles this automatically) + if not isinstance(options, CustomTokenExchangeOptions): + options = CustomTokenExchangeOptions(**options) + + # Ensure we have OIDC metadata + if not hasattr(self._oauth, "metadata") or not self._oauth.metadata: + self._oauth.metadata = await self._fetch_oidc_metadata(self._domain) + + token_endpoint = self._oauth.metadata.get("token_endpoint") + if not token_endpoint: + raise ApiError("configuration_error", "Token endpoint missing in OIDC metadata") + + # Prepare token exchange parameters per RFC 8693 + params = { + "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", + "client_id": self._client_id, + "subject_token": options.subject_token, + "subject_token_type": options.subject_token_type, + } + + # Add optional parameters + if options.audience: + params["audience"] = options.audience + + if options.scope: + params["scope"] = options.scope + + if options.actor_token: + params["actor_token"] = options.actor_token + params["actor_token_type"] = options.actor_token_type + + if options.organization: + params["organization"] = options.organization + + # Merge additional authorization params + if options.authorization_params: + # Prevent override of critical parameters + forbidden_params = {"grant_type", "client_id", "subject_token", "subject_token_type"} + for key, value in options.authorization_params.items(): + if key not in forbidden_params: + params[key] = value + + # Make the token exchange request + async with httpx.AsyncClient() as client: + response = await client.post( + token_endpoint, + data=params, + auth=(self._client_id, self._client_secret) + ) + + if response.status_code != 200: + error_data = response.json() if response.headers.get( + "content-type", "").startswith("application/json") else {} + raise CustomTokenExchangeError( + error_data.get("error", CustomTokenExchangeErrorCode.TOKEN_EXCHANGE_FAILED), + error_data.get("error_description", f"Token exchange failed: {response.status_code}") + ) + + try: + token_data = response.json() + except json.JSONDecodeError: + raise CustomTokenExchangeError( + CustomTokenExchangeErrorCode.INVALID_RESPONSE, + "Failed to parse token response as JSON" + ) + + # Validate and return response + return TokenExchangeResponse(**token_data) + + except ValidationError as e: + raise CustomTokenExchangeError( + CustomTokenExchangeErrorCode.INVALID_TOKEN_FORMAT, + f"Token validation failed: {str(e)}" + ) + except Exception as e: + if isinstance(e, (CustomTokenExchangeError, ApiError)): + raise + raise CustomTokenExchangeError( + CustomTokenExchangeErrorCode.TOKEN_EXCHANGE_FAILED, + f"Token exchange failed: {str(e)}", + e + ) + + async def login_with_custom_token_exchange( + self, + options: LoginWithCustomTokenExchangeOptions, + store_options: Optional[dict[str, Any]] = None + ) -> LoginWithCustomTokenExchangeResult: + """ + Performs token exchange and establishes a user session. + + This method combines custom_token_exchange() with session management, + exchanging a custom token for Auth0 tokens and storing the session state. + + Args: + options: Configuration for token exchange and login + store_options: Optional options for state store (e.g., request/response for cookies) + + Returns: + LoginWithCustomTokenExchangeResult containing session state + + Raises: + CustomTokenExchangeError: If token exchange fails + ApiError: If session management fails + + Example: + ```python + result = await client.login_with_custom_token_exchange( + LoginWithCustomTokenExchangeOptions( + subject_token="custom-token-value", + subject_token_type="urn:acme:mcp-token", + audience="https://api.example.com" + ), + store_options={"request": request, "response": response} + ) + print(result.state_data["user"]) + ``` + + See: + https://datatracker.ietf.org/doc/html/rfc8693 + """ + try: + # Perform token exchange + exchange_options = CustomTokenExchangeOptions( + subject_token=options.subject_token, + subject_token_type=options.subject_token_type, + audience=options.audience, + scope=options.scope, + actor_token=options.actor_token, + actor_token_type=options.actor_token_type, + organization=options.organization, + authorization_params=options.authorization_params + ) + + token_response = await self.custom_token_exchange(exchange_options) + + # Extract user claims from ID token if present + user_claims = None + sid = PKCE.generate_random_string(32) # Default sid + if token_response.id_token: + claims = jwt.decode(token_response.id_token, options={"verify_signature": False}) + user_claims = UserClaims.parse_obj(claims) + # Extract sid from token if available + sid = claims.get("sid", sid) + + # Determine audience for token set + audience = options.audience or self.DEFAULT_AUDIENCE_STATE_KEY + + # Build token set + token_set = TokenSet( + audience=audience, + access_token=token_response.access_token, + scope=token_response.scope or options.scope or "", + expires_at=int(time.time()) + token_response.expires_in + ) + + # Construct state data + state_data = StateData( + user=user_claims, + id_token=token_response.id_token, + refresh_token=token_response.refresh_token, + token_sets=[token_set], + internal={ + "sid": sid, + "created_at": int(time.time()) + } + ) + + # Store session + await self._state_store.set(self._state_identifier, state_data, options=store_options) + + # Build result + result = LoginWithCustomTokenExchangeResult( + state_data=state_data.dict() + ) + + return result + + except Exception as e: + if isinstance(e, (CustomTokenExchangeError, ApiError)): + raise + raise CustomTokenExchangeError( + CustomTokenExchangeErrorCode.TOKEN_EXCHANGE_FAILED, + f"Login with custom token exchange failed: {str(e)}", + e + ) + # Authlib Helpers async def _build_link_user_url( diff --git a/src/auth0_server_python/auth_types/__init__.py b/src/auth0_server_python/auth_types/__init__.py index 7f32e8d..4b36ca3 100644 --- a/src/auth0_server_python/auth_types/__init__.py +++ b/src/auth0_server_python/auth_types/__init__.py @@ -5,7 +5,7 @@ from typing import Any, Optional -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, field_validator, model_validator class UserClaims(BaseModel): @@ -213,6 +213,121 @@ class StartLinkUserOptions(BaseModel): authorization_params: Optional[dict[str, Any]] = None app_state: Optional[Any] = None +# ============================================================================= +# Custom Token Exchange Types +# ============================================================================= + +class CustomTokenExchangeOptions(BaseModel): + """ + Options for custom token exchange (RFC 8693). + + Args: + subject_token: The security token being exchanged + subject_token_type: Identifier indicating the token format + audience: Logical name of target service (optional) + scope: Space-delimited list of scopes (optional) + actor_token: Security token representing the acting party (optional) + actor_token_type: Type of actor token (required if actor_token present) + organization: Organization identifier for the token exchange (optional) + authorization_params: Additional OAuth parameters (optional) + """ + subject_token: str = Field(..., min_length=1) + subject_token_type: str = Field(..., min_length=1) + audience: Optional[str] = None + scope: Optional[str] = None + actor_token: Optional[str] = None + actor_token_type: Optional[str] = None + organization: Optional[str] = None + authorization_params: Optional[dict[str, Any]] = None + + @field_validator('subject_token', 'actor_token') + @classmethod + def validate_token_format(cls, v: Optional[str]) -> Optional[str]: + """Validate token doesn't have Bearer prefix and isn't whitespace-only.""" + if v is not None: + if not v.strip(): + raise ValueError("Token cannot be empty or whitespace-only") + if v.strip().startswith("Bearer "): + raise ValueError("Token should not include 'Bearer ' prefix") + return v + + @model_validator(mode='after') + def validate_actor_token_type(self) -> 'CustomTokenExchangeOptions': + """Ensure actor_token_type is provided if actor_token is present.""" + if self.actor_token and not self.actor_token_type: + raise ValueError("actor_token_type is required when actor_token is provided") + return self + + +class TokenExchangeResponse(BaseModel): + """ + Response from token exchange operation. + + Attributes: + access_token: The issued access token + token_type: Token type (typically "Bearer") + expires_in: Token lifetime in seconds + scope: Granted scopes (if different from requested) + issued_token_type: Format of issued token + id_token: OpenID Connect ID token (optional) + refresh_token: Refresh token (optional) + """ + access_token: str + token_type: str = "Bearer" + expires_in: int + scope: Optional[str] = None + issued_token_type: Optional[str] = None + id_token: Optional[str] = None + refresh_token: Optional[str] = None + + +class LoginWithCustomTokenExchangeOptions(BaseModel): + """ + Options for logging in via custom token exchange. + + Combines token exchange parameters with session management. + """ + subject_token: str = Field(..., min_length=1) + subject_token_type: str = Field(..., min_length=1) + audience: Optional[str] = None + scope: Optional[str] = None + actor_token: Optional[str] = None + actor_token_type: Optional[str] = None + organization: Optional[str] = None + authorization_params: Optional[dict[str, Any]] = None + + @field_validator('subject_token', 'actor_token') + @classmethod + def validate_token_format(cls, v: Optional[str]) -> Optional[str]: + """Validate token doesn't have Bearer prefix and isn't whitespace-only.""" + if v is not None: + if not v.strip(): + raise ValueError("Token cannot be empty or whitespace-only") + if v.strip().startswith("Bearer "): + raise ValueError("Token should not include 'Bearer ' prefix") + return v + + @model_validator(mode='after') + def validate_actor_token_type(self) -> 'LoginWithCustomTokenExchangeOptions': + """Ensure actor_token_type is provided if actor_token is present.""" + if self.actor_token and not self.actor_token_type: + raise ValueError("actor_token_type is required when actor_token is provided") + return self + + +class LoginWithCustomTokenExchangeResult(BaseModel): + """ + Result from login with custom token exchange. + + Contains session data established after token exchange. + """ + state_data: dict[str, Any] + authorization_details: Optional[list[AuthorizationDetails]] = None + +# ============================================================================= +# Connected Accounts Types +# ============================================================================= + # BASE & SHARED class ConnectedAccountBase(BaseModel): id: str diff --git a/src/auth0_server_python/error/__init__.py b/src/auth0_server_python/error/__init__.py index 73fb41c..c593368 100644 --- a/src/auth0_server_python/error/__init__.py +++ b/src/auth0_server_python/error/__init__.py @@ -165,3 +165,22 @@ class AccessTokenForConnectionErrorCode: FAILED_TO_RETRIEVE = "failed_to_retrieve" API_ERROR = "api_error" FETCH_ERROR = "retrieval_error" + + +class CustomTokenExchangeError(Auth0Error): + """ + Error raised during custom token exchange operations. + """ + def __init__(self, code: str, message: str, cause=None): + super().__init__(message) + self.code = code + self.name = "CustomTokenExchangeError" + self.cause = cause + + +class CustomTokenExchangeErrorCode: + """Error codes for custom token exchange operations.""" + INVALID_TOKEN_FORMAT = "invalid_token_format" + MISSING_ACTOR_TOKEN_TYPE = "missing_actor_token_type" + TOKEN_EXCHANGE_FAILED = "token_exchange_failed" + INVALID_RESPONSE = "invalid_response" diff --git a/src/auth0_server_python/tests/test_server_client.py b/src/auth0_server_python/tests/test_server_client.py index 260d9ba..4f1b90b 100644 --- a/src/auth0_server_python/tests/test_server_client.py +++ b/src/auth0_server_python/tests/test_server_client.py @@ -14,8 +14,10 @@ ConnectedAccount, ConnectedAccountConnection, ConnectParams, + CustomTokenExchangeOptions, ListConnectedAccountConnectionsResponse, ListConnectedAccountsResponse, + LoginWithCustomTokenExchangeOptions, LogoutOptions, TransactionData, ) @@ -23,6 +25,8 @@ AccessTokenForConnectionError, ApiError, BackchannelLogoutError, + CustomTokenExchangeError, + CustomTokenExchangeErrorCode, InvalidArgumentError, MissingRequiredArgumentError, MissingTransactionError, @@ -30,6 +34,7 @@ StartLinkUserError, ) from auth0_server_python.utils import PKCE +from pydantic_core import ValidationError @pytest.mark.asyncio @@ -1609,6 +1614,11 @@ async def test_get_token_by_refresh_token_exchange_failed(mocker): args, kwargs = mock_post.call_args assert kwargs["data"]["refresh_token"] == "" +# ============================================================================= +# Connected Accounts Tests (My Account Client) +# ============================================================================= + + @pytest.mark.asyncio async def test_start_connect_account_calls_connect_and_builds_url(mocker): # Setup @@ -2139,3 +2149,678 @@ async def test_list_connected_account_connections_with_invalid_take_param(mocker # Assert assert "The 'take' parameter must be a positive integer." in str(exc.value) mock_my_account_client.list_connected_account_connections.assert_not_awaited() + + +# ============================================================================= +# Custom Token Exchange Tests +# ============================================================================= + +@pytest.mark.asyncio +async def test_custom_token_exchange_success(mocker): + """Test successful token exchange with basic parameters.""" + # Setup + mock_transaction_store = AsyncMock() + mock_state_store = AsyncMock() + + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=mock_state_store, + transaction_store=mock_transaction_store, + secret="some-secret" + ) + + # Mock OIDC metadata + mocker.patch.object( + client, + "_fetch_oidc_metadata", + return_value={"token_endpoint": "https://auth0.local/oauth/token"} + ) + + # Mock httpx response + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "access_token": "exchanged_access_token", + "token_type": "Bearer", + "expires_in": 3600, + "scope": "read:data", + "issued_token_type": "urn:ietf:params:oauth:token-type:access_token" + } + mock_response.headers.get.return_value = "application/json" + + mock_httpx_client = AsyncMock() + mock_httpx_client.__aenter__.return_value = mock_httpx_client + mock_httpx_client.__aexit__.return_value = None + mock_httpx_client.post.return_value = mock_response + + mocker.patch("httpx.AsyncClient", return_value=mock_httpx_client) + + # Act + options = CustomTokenExchangeOptions( + subject_token="custom-token-123", + subject_token_type="urn:acme:mcp-token", + audience="https://api.example.com", + scope="read:data" + ) + result = await client.custom_token_exchange(options) + + # Assert + assert result.access_token == "exchanged_access_token" + assert result.token_type == "Bearer" + assert result.expires_in == 3600 + assert result.scope == "read:data" + assert result.issued_token_type == "urn:ietf:params:oauth:token-type:access_token" + + # Verify the request was made correctly + mock_httpx_client.post.assert_called_once() + call_args = mock_httpx_client.post.call_args + assert call_args[0][0] == "https://auth0.local/oauth/token" + assert call_args[1]["data"]["grant_type"] == "urn:ietf:params:oauth:grant-type:token-exchange" + assert call_args[1]["data"]["subject_token"] == "custom-token-123" + assert call_args[1]["data"]["subject_token_type"] == "urn:acme:mcp-token" + assert call_args[1]["data"]["audience"] == "https://api.example.com" + assert call_args[1]["data"]["scope"] == "read:data" + + +@pytest.mark.asyncio +async def test_custom_token_exchange_with_actor_token(mocker): + """Test token exchange with actor token (delegation scenario).""" + # Setup + mock_transaction_store = AsyncMock() + mock_state_store = AsyncMock() + + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=mock_state_store, + transaction_store=mock_transaction_store, + secret="some-secret" + ) + + mocker.patch.object( + client, + "_fetch_oidc_metadata", + return_value={"token_endpoint": "https://auth0.local/oauth/token"} + ) + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "access_token": "delegated_token", + "token_type": "Bearer", + "expires_in": 1800 + } + mock_response.headers.get.return_value = "application/json" + + mock_httpx_client = AsyncMock() + mock_httpx_client.__aenter__.return_value = mock_httpx_client + mock_httpx_client.__aexit__.return_value = None + mock_httpx_client.post.return_value = mock_response + + mocker.patch("httpx.AsyncClient", return_value=mock_httpx_client) + + # Act + options = CustomTokenExchangeOptions( + subject_token="user-token", + subject_token_type="urn:ietf:params:oauth:token-type:access_token", + actor_token="service-token", + actor_token_type="urn:ietf:params:oauth:token-type:access_token" + ) + result = await client.custom_token_exchange(options) + + # Assert + assert result.access_token == "delegated_token" + + # Verify actor params were sent + call_args = mock_httpx_client.post.call_args + assert call_args[1]["data"]["actor_token"] == "service-token" + assert call_args[1]["data"]["actor_token_type"] == "urn:ietf:params:oauth:token-type:access_token" + + +@pytest.mark.asyncio +async def test_custom_token_exchange_with_organization(mocker): + """Test token exchange with organization parameter.""" + # Setup + mock_transaction_store = AsyncMock() + mock_state_store = AsyncMock() + + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=mock_state_store, + transaction_store=mock_transaction_store, + secret="some-secret" + ) + + mocker.patch.object( + client, + "_fetch_oidc_metadata", + return_value={"token_endpoint": "https://auth0.local/oauth/token"} + ) + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "access_token": "org_scoped_token", + "token_type": "Bearer", + "expires_in": 3600 + } + mock_response.headers.get.return_value = "application/json" + + mock_httpx_client = AsyncMock() + mock_httpx_client.__aenter__.return_value = mock_httpx_client + mock_httpx_client.__aexit__.return_value = None + mock_httpx_client.post.return_value = mock_response + + mocker.patch("httpx.AsyncClient", return_value=mock_httpx_client) + + # Act + options = CustomTokenExchangeOptions( + subject_token="custom-token", + subject_token_type="urn:acme:mcp-token", + organization="org_abc1234" + ) + result = await client.custom_token_exchange(options) + + # Assert + assert result.access_token == "org_scoped_token" + + # Verify organization param was sent + call_args = mock_httpx_client.post.call_args + assert call_args[1]["data"]["organization"] == "org_abc1234" + + +@pytest.mark.asyncio +async def test_custom_token_exchange_empty_token(): + """Test that empty/whitespace tokens are rejected.""" + # Setup + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="some-secret" + ) + + # Act & Assert - empty token + with pytest.raises(ValidationError) as exc: + await client.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token=" ", + subject_token_type="urn:acme:mcp-token" + ) + ) + assert "empty or whitespace" in str(exc.value).lower() + + +@pytest.mark.asyncio +async def test_custom_token_exchange_bearer_prefix(): + """Test that tokens with 'Bearer ' prefix are rejected.""" + # Setup + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="some-secret" + ) + + # Act & Assert + with pytest.raises(ValidationError) as exc: + await client.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="Bearer abc123", + subject_token_type="urn:ietf:params:oauth:token-type:access_token" + ) + ) + assert "Bearer" in str(exc.value) + + +@pytest.mark.asyncio +async def test_custom_token_exchange_missing_actor_token_type(): + """Test that actor_token_type is required when actor_token is provided.""" + # Setup + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="some-secret" + ) + + # Act & Assert + with pytest.raises(ValidationError) as exc: + await client.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="token", + subject_token_type="urn:acme:token", + actor_token="actor-token", + actor_token_type=None + ) + ) + assert "actor_token_type" in str(exc.value).lower() + + +@pytest.mark.asyncio +async def test_custom_token_exchange_api_error_400(mocker): + """Test handling of 400 error from Auth0.""" + # Setup + mock_transaction_store = AsyncMock() + mock_state_store = AsyncMock() + + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=mock_state_store, + transaction_store=mock_transaction_store, + secret="some-secret" + ) + + mocker.patch.object( + client, + "_fetch_oidc_metadata", + return_value={"token_endpoint": "https://auth0.local/oauth/token"} + ) + + # Mock 400 error response + mock_response = MagicMock() + mock_response.status_code = 400 + mock_response.json.return_value = { + "error": "invalid_grant", + "error_description": "Subject token is invalid" + } + mock_response.headers.get.return_value = "application/json" + + mock_httpx_client = AsyncMock() + mock_httpx_client.__aenter__.return_value = mock_httpx_client + mock_httpx_client.__aexit__.return_value = None + mock_httpx_client.post.return_value = mock_response + + mocker.patch("httpx.AsyncClient", return_value=mock_httpx_client) + + # Act & Assert + with pytest.raises(CustomTokenExchangeError) as exc: + await client.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="invalid-token", + subject_token_type="urn:acme:mcp-token" + ) + ) + assert exc.value.code == "invalid_grant" + assert "Subject token is invalid" in str(exc.value) + + +@pytest.mark.asyncio +async def test_custom_token_exchange_invalid_json_response(mocker): + """Test handling of non-JSON response from token endpoint.""" + # Setup + mock_transaction_store = AsyncMock() + mock_state_store = AsyncMock() + + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=mock_state_store, + transaction_store=mock_transaction_store, + secret="some-secret" + ) + + mocker.patch.object( + client, + "_fetch_oidc_metadata", + return_value={"token_endpoint": "https://auth0.local/oauth/token"} + ) + + # Mock response with invalid JSON + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.side_effect = json.JSONDecodeError("msg", "doc", 0) + mock_response.headers.get.return_value = "application/json" + + mock_httpx_client = AsyncMock() + mock_httpx_client.__aenter__.return_value = mock_httpx_client + mock_httpx_client.__aexit__.return_value = None + mock_httpx_client.post.return_value = mock_response + + mocker.patch("httpx.AsyncClient", return_value=mock_httpx_client) + + # Act & Assert + with pytest.raises(CustomTokenExchangeError) as exc: + await client.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="token", + subject_token_type="urn:acme:mcp-token" + ) + ) + assert exc.value.code == CustomTokenExchangeErrorCode.INVALID_RESPONSE + assert "parse" in str(exc.value).lower() + + +@pytest.mark.asyncio +async def test_custom_token_exchange_missing_token_endpoint(mocker): + """Test error when token endpoint is missing from OIDC metadata.""" + # Setup + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="some-secret" + ) + + # Mock metadata without token_endpoint + mocker.patch.object( + client, + "_fetch_oidc_metadata", + return_value={"authorization_endpoint": "https://auth0.local/authorize"} + ) + + # Act & Assert + with pytest.raises(ApiError) as exc: + await client.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="token", + subject_token_type="urn:acme:mcp-token" + ) + ) + assert exc.value.code == "configuration_error" + assert "token endpoint" in str(exc.value).lower() + + +@pytest.mark.asyncio +async def test_custom_token_exchange_with_authorization_params(mocker): + """Test that additional authorization_params are passed through.""" + # Setup + mock_transaction_store = AsyncMock() + mock_state_store = AsyncMock() + + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=mock_state_store, + transaction_store=mock_transaction_store, + secret="some-secret" + ) + + mocker.patch.object( + client, + "_fetch_oidc_metadata", + return_value={"token_endpoint": "https://auth0.local/oauth/token"} + ) + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "access_token": "token", + "token_type": "Bearer", + "expires_in": 3600 + } + mock_response.headers.get.return_value = "application/json" + + mock_httpx_client = AsyncMock() + mock_httpx_client.__aenter__.return_value = mock_httpx_client + mock_httpx_client.__aexit__.return_value = None + mock_httpx_client.post.return_value = mock_response + + mocker.patch("httpx.AsyncClient", return_value=mock_httpx_client) + + # Act + await client.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="token", + subject_token_type="urn:acme:mcp-token", + authorization_params={"custom_param": "custom_value"} + ) + ) + + # Assert + call_args = mock_httpx_client.post.call_args + assert call_args[1]["data"]["custom_param"] == "custom_value" + + +@pytest.mark.asyncio +async def test_custom_token_exchange_forbidden_params_filtered(mocker): + """Test that forbidden params cannot be overridden.""" + # Setup + mock_transaction_store = AsyncMock() + mock_state_store = AsyncMock() + + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=mock_state_store, + transaction_store=mock_transaction_store, + secret="some-secret" + ) + + mocker.patch.object( + client, + "_fetch_oidc_metadata", + return_value={"token_endpoint": "https://auth0.local/oauth/token"} + ) + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "access_token": "token", + "token_type": "Bearer", + "expires_in": 3600 + } + mock_response.headers.get.return_value = "application/json" + + mock_httpx_client = AsyncMock() + mock_httpx_client.__aenter__.return_value = mock_httpx_client + mock_httpx_client.__aexit__.return_value = None + mock_httpx_client.post.return_value = mock_response + + mocker.patch("httpx.AsyncClient", return_value=mock_httpx_client) + + # Act + await client.custom_token_exchange( + CustomTokenExchangeOptions( + subject_token="token", + subject_token_type="urn:acme:mcp-token", + authorization_params={ + "grant_type": "malicious_grant", # Should be filtered + "client_id": "malicious_client", # Should be filtered + "allowed_param": "value" # Should be allowed + } + ) + ) + + # Assert + call_args = mock_httpx_client.post.call_args + assert call_args[1]["data"]["grant_type"] == "urn:ietf:params:oauth:grant-type:token-exchange" + assert call_args[1]["data"]["client_id"] == "" + assert call_args[1]["data"]["allowed_param"] == "value" + + +# ============================================================================= +# Login with Custom Token Exchange Tests +# ============================================================================= + +@pytest.mark.asyncio +async def test_login_with_custom_token_exchange_success(mocker): + """Test successful login with custom token exchange.""" + # Setup + mock_transaction_store = AsyncMock() + mock_state_store = AsyncMock() + + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=mock_state_store, + transaction_store=mock_transaction_store, + secret="some-secret" + ) + + mocker.patch.object( + client, + "_fetch_oidc_metadata", + return_value={"token_endpoint": "https://auth0.local/oauth/token"} + ) + + # Mock token exchange response with ID token + id_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ1c2VyMTIzIiwibmFtZSI6IkpvaG4gRG9lIiwic2lkIjoic2Vzc2lvbjEyMyJ9.fake" + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "access_token": "exchanged_token", + "token_type": "Bearer", + "expires_in": 3600, + "id_token": id_token, + "refresh_token": "refresh_token_123" + } + mock_response.headers.get.return_value = "application/json" + + mock_httpx_client = AsyncMock() + mock_httpx_client.__aenter__.return_value = mock_httpx_client + mock_httpx_client.__aexit__.return_value = None + mock_httpx_client.post.return_value = mock_response + + mocker.patch("httpx.AsyncClient", return_value=mock_httpx_client) + + # Mock JWT decode + mocker.patch("jwt.decode", return_value={ + "sub": "user123", + "name": "John Doe", + "sid": "session123" + }) + + # Act + result = await client.login_with_custom_token_exchange( + LoginWithCustomTokenExchangeOptions( + subject_token="custom-token", + subject_token_type="urn:acme:mcp-token", + audience="https://api.example.com" + ) + ) + + # Assert + assert result.state_data is not None + assert result.state_data["user"]["sub"] == "user123" + assert result.state_data["user"]["name"] == "John Doe" + assert result.state_data["id_token"] == id_token + assert result.state_data["refresh_token"] == "refresh_token_123" + assert len(result.state_data["token_sets"]) == 1 + assert result.state_data["token_sets"][0]["access_token"] == "exchanged_token" + assert result.state_data["internal"]["sid"] == "session123" + + # Verify state was stored + mock_state_store.set.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_login_with_custom_token_exchange_no_id_token(mocker): + """Test login when no ID token is returned.""" + # Setup + mock_transaction_store = AsyncMock() + mock_state_store = AsyncMock() + + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=mock_state_store, + transaction_store=mock_transaction_store, + secret="some-secret" + ) + + mocker.patch.object( + client, + "_fetch_oidc_metadata", + return_value={"token_endpoint": "https://auth0.local/oauth/token"} + ) + + # Mock token exchange response without ID token + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "access_token": "exchanged_token", + "token_type": "Bearer", + "expires_in": 3600 + } + mock_response.headers.get.return_value = "application/json" + + mock_httpx_client = AsyncMock() + mock_httpx_client.__aenter__.return_value = mock_httpx_client + mock_httpx_client.__aexit__.return_value = None + mock_httpx_client.post.return_value = mock_response + + mocker.patch("httpx.AsyncClient", return_value=mock_httpx_client) + + # Act + result = await client.login_with_custom_token_exchange( + LoginWithCustomTokenExchangeOptions( + subject_token="custom-token", + subject_token_type="urn:acme:mcp-token" + ) + ) + + # Assert - user should be None, but session should be created + assert result.state_data["user"] is None + assert result.state_data["id_token"] is None + assert len(result.state_data["token_sets"]) == 1 + assert "sid" in result.state_data["internal"] + + # Verify state was stored + mock_state_store.set.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_login_with_custom_token_exchange_failure_propagates(mocker): + """Test that token exchange failures are propagated.""" + # Setup + client = ServerClient( + domain="auth0.local", + client_id="", + client_secret="", + state_store=AsyncMock(), + transaction_store=AsyncMock(), + secret="some-secret" + ) + + mocker.patch.object( + client, + "_fetch_oidc_metadata", + return_value={"token_endpoint": "https://auth0.local/oauth/token"} + ) + + # Mock 401 error + mock_response = MagicMock() + mock_response.status_code = 401 + mock_response.json.return_value = { + "error": "unauthorized", + "error_description": "Invalid credentials" + } + mock_response.headers.get.return_value = "application/json" + + mock_httpx_client = AsyncMock() + mock_httpx_client.__aenter__.return_value = mock_httpx_client + mock_httpx_client.__aexit__.return_value = None + mock_httpx_client.post.return_value = mock_response + + mocker.patch("httpx.AsyncClient", return_value=mock_httpx_client) + + # Act & Assert + with pytest.raises(CustomTokenExchangeError) as exc: + await client.login_with_custom_token_exchange( + LoginWithCustomTokenExchangeOptions( + subject_token="invalid-token", + subject_token_type="urn:acme:mcp-token" + ) + ) + assert exc.value.code == "unauthorized"