Skip to content

Commit 6c9d01e

Browse files
feat(auth): Implement custom token exchange methods
1 parent 88c8dc2 commit 6c9d01e

1 file changed

Lines changed: 226 additions & 0 deletions

File tree

src/auth0_server_python/auth_server/server_client.py

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,14 @@
1717
CompleteConnectAccountResponse,
1818
ConnectAccountOptions,
1919
ConnectAccountRequest,
20+
CustomTokenExchangeOptions,
21+
LoginWithCustomTokenExchangeOptions,
22+
LoginWithCustomTokenExchangeResult,
2023
LogoutOptions,
2124
LogoutTokenClaims,
2225
StartInteractiveLoginOptions,
2326
StateData,
27+
TokenExchangeResponse,
2428
TokenSet,
2529
TransactionData,
2630
UserClaims,
@@ -32,6 +36,8 @@
3236
AccessTokenForConnectionErrorCode,
3337
ApiError,
3438
BackchannelLogoutError,
39+
CustomTokenExchangeError,
40+
CustomTokenExchangeErrorCode,
3541
MissingRequiredArgumentError,
3642
MissingTransactionError,
3743
PollingApiError,
@@ -1471,3 +1477,223 @@ async def complete_connect_account(
14711477
await self._transaction_store.delete(transaction_identifier, options=store_options)
14721478

14731479
return response
1480+
1481+
async def custom_token_exchange(
1482+
self,
1483+
options: CustomTokenExchangeOptions
1484+
) -> TokenExchangeResponse:
1485+
"""
1486+
Exchanges a custom token for Auth0 tokens using RFC 8693.
1487+
1488+
This method implements the OAuth 2.0 Token Exchange specification,
1489+
allowing you to exchange external custom tokens for Auth0 access tokens.
1490+
1491+
Args:
1492+
options: Configuration for the token exchange
1493+
1494+
Returns:
1495+
TokenExchangeResponse containing access_token and metadata
1496+
1497+
Raises:
1498+
CustomTokenExchangeError: If token exchange fails
1499+
MissingRequiredArgumentError: If required parameters are missing
1500+
1501+
Example:
1502+
```python
1503+
response = await client.custom_token_exchange(
1504+
CustomTokenExchangeOptions(
1505+
subject_token="custom-token-value",
1506+
subject_token_type="urn:acme:mcp-token",
1507+
audience="https://api.example.com",
1508+
scope="read:data write:data"
1509+
)
1510+
)
1511+
print(response.access_token)
1512+
```
1513+
1514+
See:
1515+
https://datatracker.ietf.org/doc/html/rfc8693
1516+
"""
1517+
try:
1518+
# Validate options (Pydantic handles this automatically)
1519+
if not isinstance(options, CustomTokenExchangeOptions):
1520+
options = CustomTokenExchangeOptions(**options)
1521+
1522+
# Ensure we have OIDC metadata
1523+
if not hasattr(self._oauth, "metadata") or not self._oauth.metadata:
1524+
self._oauth.metadata = await self._fetch_oidc_metadata(self._domain)
1525+
1526+
token_endpoint = self._oauth.metadata.get("token_endpoint")
1527+
if not token_endpoint:
1528+
raise ApiError("configuration_error", "Token endpoint missing in OIDC metadata")
1529+
1530+
# Prepare token exchange parameters per RFC 8693
1531+
params = {
1532+
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
1533+
"client_id": self._client_id,
1534+
"subject_token": options.subject_token,
1535+
"subject_token_type": options.subject_token_type,
1536+
}
1537+
1538+
# Add optional parameters
1539+
if options.audience:
1540+
params["audience"] = options.audience
1541+
1542+
if options.scope:
1543+
params["scope"] = options.scope
1544+
1545+
if options.actor_token:
1546+
params["actor_token"] = options.actor_token
1547+
params["actor_token_type"] = options.actor_token_type
1548+
1549+
# Merge additional authorization params
1550+
if options.authorization_params:
1551+
# Prevent override of critical parameters
1552+
forbidden_params = {"grant_type", "client_id", "subject_token", "subject_token_type"}
1553+
for key, value in options.authorization_params.items():
1554+
if key not in forbidden_params:
1555+
params[key] = value
1556+
1557+
# Make the token exchange request
1558+
async with httpx.AsyncClient() as client:
1559+
response = await client.post(
1560+
token_endpoint,
1561+
data=params,
1562+
auth=(self._client_id, self._client_secret)
1563+
)
1564+
1565+
if response.status_code != 200:
1566+
error_data = response.json() if response.headers.get(
1567+
"content-type", "").startswith("application/json") else {}
1568+
raise CustomTokenExchangeError(
1569+
error_data.get("error", CustomTokenExchangeErrorCode.TOKEN_EXCHANGE_FAILED),
1570+
error_data.get("error_description", f"Token exchange failed: {response.status_code}")
1571+
)
1572+
1573+
try:
1574+
token_data = response.json()
1575+
except json.JSONDecodeError:
1576+
raise CustomTokenExchangeError(
1577+
CustomTokenExchangeErrorCode.INVALID_RESPONSE,
1578+
"Failed to parse token response as JSON"
1579+
)
1580+
1581+
# Validate and return response
1582+
return TokenExchangeResponse(**token_data)
1583+
1584+
except ValidationError as e:
1585+
raise CustomTokenExchangeError(
1586+
CustomTokenExchangeErrorCode.INVALID_TOKEN_FORMAT,
1587+
f"Token validation failed: {str(e)}"
1588+
)
1589+
except Exception as e:
1590+
if isinstance(e, (CustomTokenExchangeError, ApiError)):
1591+
raise
1592+
raise CustomTokenExchangeError(
1593+
CustomTokenExchangeErrorCode.TOKEN_EXCHANGE_FAILED,
1594+
f"Token exchange failed: {str(e)}",
1595+
e
1596+
)
1597+
1598+
async def login_with_custom_token_exchange(
1599+
self,
1600+
options: LoginWithCustomTokenExchangeOptions,
1601+
store_options: Optional[dict[str, Any]] = None
1602+
) -> LoginWithCustomTokenExchangeResult:
1603+
"""
1604+
Performs token exchange and establishes a user session.
1605+
1606+
This method combines custom_token_exchange() with session management,
1607+
exchanging a custom token for Auth0 tokens and storing the session state.
1608+
1609+
Args:
1610+
options: Configuration for token exchange and login
1611+
store_options: Optional options for state store (e.g., request/response for cookies)
1612+
1613+
Returns:
1614+
LoginWithCustomTokenExchangeResult containing session state
1615+
1616+
Raises:
1617+
CustomTokenExchangeError: If token exchange fails
1618+
ApiError: If session management fails
1619+
1620+
Example:
1621+
```python
1622+
result = await client.login_with_custom_token_exchange(
1623+
LoginWithCustomTokenExchangeOptions(
1624+
subject_token="custom-token-value",
1625+
subject_token_type="urn:acme:mcp-token",
1626+
audience="https://api.example.com"
1627+
),
1628+
store_options={"request": request, "response": response}
1629+
)
1630+
print(result.state_data["user"])
1631+
```
1632+
1633+
See:
1634+
https://datatracker.ietf.org/doc/html/rfc8693
1635+
"""
1636+
try:
1637+
# Perform token exchange
1638+
exchange_options = CustomTokenExchangeOptions(
1639+
subject_token=options.subject_token,
1640+
subject_token_type=options.subject_token_type,
1641+
audience=options.audience,
1642+
scope=options.scope,
1643+
actor_token=options.actor_token,
1644+
actor_token_type=options.actor_token_type,
1645+
authorization_params=options.authorization_params
1646+
)
1647+
1648+
token_response = await self.custom_token_exchange(exchange_options)
1649+
1650+
# Extract user claims from ID token if present
1651+
user_claims = None
1652+
sid = PKCE.generate_random_string(32) # Default sid
1653+
if token_response.id_token:
1654+
claims = jwt.decode(token_response.id_token, options={"verify_signature": False})
1655+
user_claims = UserClaims.parse_obj(claims)
1656+
# Extract sid from token if available
1657+
sid = claims.get("sid", sid)
1658+
1659+
# Determine audience for token set
1660+
audience = options.audience or self.DEFAULT_AUDIENCE_STATE_KEY
1661+
1662+
# Build token set
1663+
token_set = TokenSet(
1664+
audience=audience,
1665+
access_token=token_response.access_token,
1666+
scope=token_response.scope or options.scope or "",
1667+
expires_at=int(time.time()) + token_response.expires_in
1668+
)
1669+
1670+
# Construct state data
1671+
state_data = StateData(
1672+
user=user_claims,
1673+
id_token=token_response.id_token,
1674+
refresh_token=token_response.refresh_token,
1675+
token_sets=[token_set],
1676+
internal={
1677+
"sid": sid,
1678+
"created_at": int(time.time())
1679+
}
1680+
)
1681+
1682+
# Store session
1683+
await self._state_store.set(self._state_identifier, state_data, options=store_options)
1684+
1685+
# Build result
1686+
result = LoginWithCustomTokenExchangeResult(
1687+
state_data=state_data.dict()
1688+
)
1689+
1690+
return result
1691+
1692+
except Exception as e:
1693+
if isinstance(e, (CustomTokenExchangeError, ApiError)):
1694+
raise
1695+
raise CustomTokenExchangeError(
1696+
CustomTokenExchangeErrorCode.TOKEN_EXCHANGE_FAILED,
1697+
f"Login with custom token exchange failed: {str(e)}",
1698+
e
1699+
)

0 commit comments

Comments
 (0)