diff --git a/.github/workflows/semgrep.yml b/.github/workflows/semgrep.yml new file mode 100644 index 0000000..7913b13 --- /dev/null +++ b/.github/workflows/semgrep.yml @@ -0,0 +1,40 @@ +name: Semgrep + +on: + merge_group: + pull_request: + types: + - opened + - synchronize + push: + branches: + - main + schedule: + - cron: "30 0 1,15 * *" + +permissions: + contents: read + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: ${{ github.ref != 'refs/heads/main' }} + +jobs: + run: + name: Check for Vulnerabilities + runs-on: ubuntu-latest + + container: + image: returntocorp/semgrep + + steps: + - if: github.actor == 'dependabot[bot]' || github.event_name == 'merge_group' + run: exit 0 # Skip unnecessary test runs for dependabot and merge queues. Artifically flag as successful, as this is a required check for branch protection. + + - uses: actions/checkout@v4 + with: + ref: ${{ github.event.pull_request.head.sha || github.ref }} + + - run: semgrep ci + env: + SEMGREP_APP_TOKEN: ${{ secrets.SEMGREP_APP_TOKEN }} diff --git a/.github/workflows/snyk.yml b/.github/workflows/snyk.yml new file mode 100644 index 0000000..7b501ba --- /dev/null +++ b/.github/workflows/snyk.yml @@ -0,0 +1,126 @@ +name: Snyk + +on: + merge_group: + workflow_dispatch: + pull_request: + types: + - opened + - synchronize + push: + branches: + - main + schedule: + - cron: '30 0 1,15 * *' + +permissions: + contents: read + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: ${{ github.ref != 'refs/heads/main' }} + +jobs: + # Discover packages with changes for targeted scanning + discover-changed-packages: + name: Discover Changed Packages + runs-on: ubuntu-latest + outputs: + matrix: ${{ steps.set-matrix.outputs.matrix }} + has-changes: ${{ steps.set-matrix.outputs.has-changes }} + steps: + - if: github.actor == 'dependabot[bot]' || github.event_name == 'merge_group' + run: exit 0 + + - uses: actions/checkout@v4 + with: + ref: ${{ github.event.pull_request.head.sha || github.ref }} + fetch-depth: 0 + + - name: Discover packages with changes + id: set-matrix + run: | + # For push events or scheduled runs, scan all packages + if [[ "${{ github.event_name }}" == "push" || "${{ github.event_name }}" == "schedule" || "${{ github.event_name }}" == "workflow_dispatch" ]]; then + packages=$(find packages -maxdepth 1 -type d -name "auth0_*" | sed 's|^packages/||' | jq -R -s -c 'split("\n")[:-1]') + echo "Scanning all packages for ${{ github.event_name }} event" + else + # For PRs, only scan packages with changes + changed_files=$(git diff --name-only origin/main...HEAD) + changed_packages=$(echo "$changed_files" | grep '^packages/auth0_' | cut -d'/' -f2 | sort -u | jq -R -s -c 'split("\n")[:-1] | map(select(length > 0))') + packages="$changed_packages" + echo "Changed files: $changed_files" + echo "Scanning changed packages for PR: $packages" + fi + + echo "matrix={\"package\":$packages}" >> $GITHUB_OUTPUT + if [ "$packages" = "[]" ]; then + echo "has-changes=false" >> $GITHUB_OUTPUT + else + echo "has-changes=true" >> $GITHUB_OUTPUT + fi + echo "Final packages to scan: $packages" + + # Security scanning for packages with changes + security-scan: + name: Security Scan (${{ matrix.package }}) + runs-on: ubuntu-latest + needs: discover-changed-packages + if: needs.discover-changed-packages.outputs.has-changes == 'true' + strategy: + fail-fast: false + matrix: ${{ fromJson(needs.discover-changed-packages.outputs.matrix) }} + + steps: + - if: github.actor == 'dependabot[bot]' || github.event_name == 'merge_group' + run: exit 0 + + - uses: actions/checkout@v4 + with: + ref: ${{ github.event.pull_request.head.sha || github.ref }} + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + + - name: Check for requirements.txt + working-directory: packages/${{ matrix.package }} + run: | + if [ ! -f "requirements.txt" ]; then + echo "❌ requirements.txt not found for ${{ matrix.package }}" + echo "Please ensure requirements.txt exists in the package directory" + exit 1 + fi + echo "✅ Found requirements.txt for ${{ matrix.package }}" + echo "Dependencies to scan:" + head -5 requirements.txt + + - name: Install dependencies + working-directory: packages/${{ matrix.package }} + run: | + echo "Installing dependencies for Snyk scan..." + pip install -r requirements.txt + echo "✅ Dependencies installed successfully" + + - name: Install Snyk CLI + run: | + curl -Lo snyk "https://static.snyk.io/cli/latest/snyk-linux" + chmod +x snyk + sudo mv snyk /usr/local/bin/ + + - name: Run Snyk security scan + working-directory: packages/${{ matrix.package }} + env: + SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }} + run: | + echo "Running Snyk scan in $(pwd)" + echo "Python version: $(python3 --version)" + echo "Pip packages installed:" + pip3 list | grep -E "(authlib|requests|httpx|ada-url)" || echo "Some packages not found" + + # Run Snyk test with debug output + snyk test --file=requirements.txt --package-manager=pip --command=python3 --debug || { + echo "Snyk test failed, trying with --allow-missing flag..." + snyk test --file=requirements.txt --package-manager=pip --command=python3 -- --allow-missing + } \ No newline at end of file diff --git a/.github/workflows/test-auth0-api-python.yml b/.github/workflows/test-auth0-api-python.yml new file mode 100644 index 0000000..1d1a1bc --- /dev/null +++ b/.github/workflows/test-auth0-api-python.yml @@ -0,0 +1,63 @@ +name: Test auth0-api-python + +on: + push: + branches: + - feature/auth0-api-python + paths: + - 'packages/auth0_api_python/**' + pull_request: + branches: + - main + paths: + - 'packages/auth0_api_python/**' + +jobs: + test: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [3.9, "3.10", "3.11", "3.12"] + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + + - name: Install Poetry + uses: snok/install-poetry@v1 + with: + version: latest + virtualenvs-create: true + virtualenvs-in-project: true + installer-parallel: true + + - name: Load cached venv + id: cached-poetry-dependencies + uses: actions/cache@v3 + with: + path: packages/auth0_api_python/.venv + key: venv-${{ runner.os }}-${{ matrix.python-version }}-${{ hashFiles('**/poetry.lock') }} + + - name: Install dependencies + if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true' + working-directory: ./packages/auth0_api_python + run: poetry install --no-interaction --no-root + + - name: Install package + working-directory: ./packages/auth0_api_python + run: poetry install --no-interaction + + - name: Run tests with pytest + working-directory: ./packages/auth0_api_python + run: | + poetry run pytest -v --cov=src --cov-report=term-missing --cov-report=xml + + - name: Run ruff linting + working-directory: ./packages/auth0_api_python + run: | + poetry run ruff check . diff --git a/packages/auth0_api_python/.ruff.toml b/packages/auth0_api_python/.ruff.toml new file mode 100644 index 0000000..b500d05 --- /dev/null +++ b/packages/auth0_api_python/.ruff.toml @@ -0,0 +1,16 @@ +line-length = 100 +target-version = "py39" +select = [ + "E", # pycodestyle errors + "W", # pycodestyle warnings + "F", # pyflakes + "I", # isort + "B", # flake8-bugbear + "C4", # flake8-comprehensions + "UP", # pyupgrade + "S", # bandit (security) +] +ignore = ["E501", "B904"] # Line too long (handled by black), Exception handling without from + +[per-file-ignores] +"tests/*" = ["S101", "S105", "S106"] # Allow assert and ignore hardcoded password warnings in test files diff --git a/packages/auth0_api_python/.snyk b/packages/auth0_api_python/.snyk new file mode 100644 index 0000000..e4e69cc --- /dev/null +++ b/packages/auth0_api_python/.snyk @@ -0,0 +1,20 @@ +# Snyk (https://snyk.io) policy file, patches or ignores known vulnerabilities. +version: v1.12.0 +# ignores vulnerabilities until expiry date; change duration by modifying expiry date +ignore: + SNYK-PYTHON-REQUESTS-72435: + - '*': + reason: 'unaffected, only affects https->http authorization header redirection.' + expires: 2019-11-05T00:00:00.000Z + SNYK-PYTHON-REQUESTS-40470: + - '*': + reason: 'patched in latest python versions: https://bugs.python.org/issue27568' + "snyk:lic:pip:certifi:MPL-2.0": + - '*': + reason: "Accepting certifi’s MPL-2.0 license for now" + expires: "2030-12-31T23:59:59Z" + "snyk:lic:pip:jwcrypto:LGPL-3.0": + - '*': + reason: "Accepting jwcrypto’s LGPL-3.0 license for now" + expires: "2030-12-31T23:59:59Z" +patch: {} \ No newline at end of file diff --git a/packages/auth0_api_python/EXAMPLES.md b/packages/auth0_api_python/EXAMPLES.md new file mode 100644 index 0000000..b361dca --- /dev/null +++ b/packages/auth0_api_python/EXAMPLES.md @@ -0,0 +1,160 @@ +# Auth0 API Python Examples + +This document provides examples for using the `auth0-api-python` package to validate Auth0 tokens in your API. + +## Bearer Authentication + +Bearer authentication is the standard OAuth 2.0 token authentication method. + +### Using verify_access_token + +```python +import asyncio +from auth0_api_python import ApiClient, ApiClientOptions + +async def validate_bearer_token(headers): + api_client = ApiClient(ApiClientOptions( + domain="your-tenant.auth0.com", + audience="https://api.example.com" + )) + + try: + # Extract the token from the Authorization header + auth_header = headers.get("authorization", "") + if not auth_header.startswith("Bearer "): + return {"error": "Missing or invalid authorization header"}, 401 + + token = auth_header.split(" ")[1] + + # Verify the access token + claims = await api_client.verify_access_token(token) + return {"success": True, "user": claims["sub"]} + except Exception as e: + return {"error": str(e)}, getattr(e, "get_status_code", lambda: 401)() + +# Example usage +headers = {"authorization": "Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."} +result = asyncio.run(validate_bearer_token(headers)) +``` + +### Using verify_request + +```python +import asyncio +from auth0_api_python import ApiClient, ApiClientOptions +from auth0_api_python.errors import BaseAuthError + +async def validate_request(headers): + api_client = ApiClient(ApiClientOptions( + domain="your-tenant.auth0.com", + audience="https://api.example.com" + )) + + try: + # Verify the request with Bearer token + claims = await api_client.verify_request( + headers=headers + ) + return {"success": True, "user": claims["sub"]} + except BaseAuthError as e: + return {"error": str(e)}, e.get_status_code(), e.get_headers() + +# Example usage +headers = {"authorization": "Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9..."} +result = asyncio.run(validate_request(headers)) +``` + + +## DPoP Authentication + +**DPoP (Demonstrating Proof-of-Possession)** is a security extension that binds access tokens to cryptographic keys, preventing token theft and replay attacks. + +This guide covers the DPoP implementation in `auth0-api-python` with complete examples for both operational modes. + +For more information about DPoP specification, see [RFC 9449](https://tools.ietf.org/html/rfc9449). + +## Configuration Modes + +### 1. Allowed Mode (Default) +```python +from auth0_api_python import ApiClient, ApiClientOptions + +api_client = ApiClient(ApiClientOptions( + domain="your-tenant.auth0.com", + audience="https://api.example.com", + dpop_enabled=True, # Default: enables DPoP support + dpop_required=False # Default: allows both Bearer and DPoP +)) +``` + +### 2. Required Mode +```python +api_client = ApiClient(ApiClientOptions( + domain="your-tenant.auth0.com", + audience="https://api.example.com", + dpop_required=True # Enforces DPoP-only authentication +)) +``` + +## Getting Started + +### Basic Usage with verify_request() + +The `verify_request()` method automatically detects the authentication scheme: + +```python +import asyncio +from auth0_api_python import ApiClient, ApiClientOptions + +async def handle_api_request(headers, http_method, http_url): + api_client = ApiClient(ApiClientOptions( + domain="your-tenant.auth0.com", + audience="https://api.example.com" + )) + + try: + # Automatically handles both Bearer and DPoP schemes + claims = await api_client.verify_request( + headers=headers, + http_method=http_method, + http_url=http_url + ) + return {"success": True, "user": claims["sub"]} + except Exception as e: + return {"error": str(e)}, e.get_status_code() + +# Example usage +headers = { + "authorization": "DPoP eyJ0eXAiOiJKV1Q...", + "dpop": "eyJ0eXAiOiJkcG9wK2p3dC..." +} +result = asyncio.run(handle_api_request(headers, "GET", "https://api.example.com/data")) +``` + +### Direct DPoP Proof Verification + +For more control, use `verify_dpop_proof()` directly: + +```python +async def verify_dpop_token(access_token, dpop_proof, http_method, http_url): + api_client = ApiClient(ApiClientOptions( + domain="your-tenant.auth0.com", + audience="https://api.example.com" + )) + + # First verify the access token + token_claims = await api_client.verify_access_token(access_token) + + # Then verify the DPoP proof + proof_claims = await api_client.verify_dpop_proof( + access_token=access_token, + proof=dpop_proof, + http_method=http_method, + http_url=http_url + ) + + return { + "token_claims": token_claims, + "proof_claims": proof_claims + } +``` \ No newline at end of file diff --git a/packages/auth0_api_python/README.md b/packages/auth0_api_python/README.md index 8f5b67e..97f7cd8 100644 --- a/packages/auth0_api_python/README.md +++ b/packages/auth0_api_python/README.md @@ -6,6 +6,24 @@ It’s intended as a foundation for building more framework-specific integration 📚 [Documentation](#documentation) - 🚀 [Getting Started](#getting-started) - 💬 [Feedback](#feedback) +## Features & Authentication Schemes + +This SDK provides comprehensive support for securing APIs with Auth0-issued access tokens: + +### **Authentication Schemes** +- **Bearer Token Authentication** - Traditional OAuth 2.0 Bearer tokens (RS256) +- **DPoP Authentication** - Enhanced security with Demonstrating Proof-of-Possession (ES256) +- **Mixed Mode Support** - Seamlessly handles both Bearer and DPoP in the same API + +### **Core Features** +- **Unified Entry Point**: `verify_request()` - automatically detects and validates Bearer or DPoP schemes +- **OIDC Discovery** - Automatic fetching of Auth0 metadata and JWKS +- **JWT Validation** - Complete RS256 signature verification with claim validation +- **DPoP Proof Verification** - Full RFC 9449 compliance with ES256 signature validation +- **Flexible Configuration** - Support for both "Allowed" and "Required" DPoP modes +- **Comprehensive Error Handling** - Detailed errors with proper HTTP status codes and WWW-Authenticate headers +- **Framework Agnostic** - Works with FastAPI, Django, Flask, or any Python web framework + ## Documentation - [Docs Site](https://auth0.com/docs) - explore our docs site and learn more about Auth0. @@ -80,6 +98,61 @@ decoded_and_verified_token = await api_client.verify_access_token( If the token lacks `my_custom_claim` or fails any standard check (issuer mismatch, expired token, invalid signature), the method raises a `VerifyAccessTokenError`. +### 4. DPoP Authentication + +> [!NOTE] +> This feature is currently available in [Early Access](https://auth0.com/docs/troubleshoot/product-lifecycle/product-release-stages#early-access). Please reach out to Auth0 support to get it enabled for your tenant. + +This library supports **DPoP (Demonstrating Proof-of-Possession)** for enhanced security, allowing clients to prove possession of private keys bound to access tokens. + +#### Allowed Mode (Default) + +Accepts both Bearer and DPoP tokens - ideal for gradual migration: + +```python +api_client = ApiClient(ApiClientOptions( + domain="", + audience="", + dpop_enabled=True, # Default - enables DPoP support + dpop_required=False # Default - allows both Bearer and DPoP +)) + +# Use verify_request() for automatic scheme detection +result = await api_client.verify_request( + headers={ + "authorization": "DPoP eyJ0eXAiOiJKV1Q...", # DPoP scheme + "dpop": "eyJ0eXAiOiJkcG9wK2p3dC...", # DPoP proof + }, + http_method="GET", + http_url="https://api.example.com/resource" +) +``` + +#### Required Mode + +Enforces DPoP-only authentication, rejecting Bearer tokens: + +```python +api_client = ApiClient(ApiClientOptions( + domain="", + audience="", + dpop_required=True # Rejects Bearer tokens +)) +``` + +#### Configuration Options + +```python +api_client = ApiClient(ApiClientOptions( + domain="", + audience="", + dpop_enabled=True, # Enable/disable DPoP support + dpop_required=False, # Require DPoP (reject Bearer) + dpop_iat_leeway=30, # Clock skew tolerance (seconds) + dpop_iat_offset=300, # Maximum proof age (seconds) +)) +``` + ## Feedback ### Contributing diff --git a/packages/auth0_api_python/poetry.lock b/packages/auth0_api_python/poetry.lock index e68f15f..149605e 100644 --- a/packages/auth0_api_python/poetry.lock +++ b/packages/auth0_api_python/poetry.lock @@ -1,4 +1,68 @@ -# This file is automatically @generated by Poetry 2.0.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand. + +[[package]] +name = "ada-url" +version = "1.25.0" +description = "URL parser and manipulator based on the WHAT WG URL standard" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "ada_url-1.25.0-cp310-cp310-macosx_10_15_universal2.whl", hash = "sha256:745fa4448a796386f9330ecffad36c28ec319382ecae0337b97f2f91898dc6e6"}, + {file = "ada_url-1.25.0-cp310-cp310-macosx_10_15_x86_64.whl", hash = "sha256:7b5edc26b9dc4890696e002c5212de0370790e512609e63449cc536fbd88a38b"}, + {file = "ada_url-1.25.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:15e65f28fe7f779204a419598947037a574221998033620e7e22c0e5ccfb67fb"}, + {file = "ada_url-1.25.0-cp310-cp310-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:412b4708f65586fb775c5554d7bd4925d9dd5bc78a602cfa862db7a841c76b94"}, + {file = "ada_url-1.25.0-cp310-cp310-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:875b71e3ad468616260e8e83e1ef3d73edcc644a1d0ed0ec6e28b437a7c16e0f"}, + {file = "ada_url-1.25.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:0837369cad9e8b6eadafd7d2d074f04a41f485a33ef570c8064ff3582bede87a"}, + {file = "ada_url-1.25.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:9912262661c50729010cb8f0de78c069ab69a164e382b5cea0abe887038ee42f"}, + {file = "ada_url-1.25.0-cp310-cp310-win_amd64.whl", hash = "sha256:cc1abb2fb0e2de443d6d6f9746b8687fb535318397da9acd74496be6999bd7ab"}, + {file = "ada_url-1.25.0-cp311-cp311-macosx_10_15_universal2.whl", hash = "sha256:567b86a7c081632b445651fe8371f891699e658d1dac29162fef4984f89b21f0"}, + {file = "ada_url-1.25.0-cp311-cp311-macosx_10_15_x86_64.whl", hash = "sha256:6dc2d79ecfa24bc5b23e4a63b0d8cc1df2350729c51144da304d22210b077907"}, + {file = "ada_url-1.25.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:9ab2b3a0aee2a9737fa9d071a82ef9bb21cd5c2638a5621680632b8b2c22883b"}, + {file = "ada_url-1.25.0-cp311-cp311-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:646c240cccb65bcadf61b934b82e0e6c9bf1842b4b0c8492fc6921612761b7f4"}, + {file = "ada_url-1.25.0-cp311-cp311-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:727e060359188bb2f4f1c2e8b27e81ef5c778634df6910334d88e050430adbe9"}, + {file = "ada_url-1.25.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:98b7007d74294b0c10ded5500769c2adf8c1ffa584692510f7e990aeee2938f2"}, + {file = "ada_url-1.25.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:55bd83d820f7a987df7989695b0d964c16ded547d7e190c9dc9cf50c26160d00"}, + {file = "ada_url-1.25.0-cp311-cp311-win_amd64.whl", hash = "sha256:8133ccc849b14465b332c5f2ff3bbe692c9c0b4112f9e07f1efbdc690df822bf"}, + {file = "ada_url-1.25.0-cp312-cp312-macosx_10_15_universal2.whl", hash = "sha256:ce48f22981eefd50f526131034bb5cbab56034e8b367475c9b8098d9ba0489ff"}, + {file = "ada_url-1.25.0-cp312-cp312-macosx_10_15_x86_64.whl", hash = "sha256:97f502975a714aafe73b8d2c3b3ea3cbdeee3081ce619c37a5a1584ec1488234"}, + {file = "ada_url-1.25.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:244eefc2f7814dd25e40682812a7f36e7d1b16b7bdfa142abd397a954d20088f"}, + {file = "ada_url-1.25.0-cp312-cp312-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:c1c4efeba2ec8fc5d9ab0195cd40d045f751967f192bab3d3685c8b9e95c5294"}, + {file = "ada_url-1.25.0-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:34bfa8a95892d10f12ac2203d66ff99f1bc7600fa5e4a0f1304902c037cb7fcc"}, + {file = "ada_url-1.25.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:9a232be74ed62c92463bf0f5b48914ab25b1e0ab6f88f7c65501063bde8bbbf5"}, + {file = "ada_url-1.25.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:29ae076213bc2f63b6a06e70c676e61e76708535e84b2eff97540907933d85a2"}, + {file = "ada_url-1.25.0-cp312-cp312-win_amd64.whl", hash = "sha256:867c023d220447811f5bd211b9d3957c8870ae963ee6f50b1781fbade2afaa84"}, + {file = "ada_url-1.25.0-cp313-cp313-macosx_10_15_universal2.whl", hash = "sha256:ba4c0856fa9edfbe347f5de390f81d1b230683718bbd9f88977acaabb5f9d53c"}, + {file = "ada_url-1.25.0-cp313-cp313-macosx_10_15_x86_64.whl", hash = "sha256:7e763188236de66e1b8762b69366f0cc92a0927b570d564a4cc27700be8783e3"}, + {file = "ada_url-1.25.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:c61e68df4f0a299dd851738fc1072b78a8551166dee049c989e3d088a53dc3f9"}, + {file = "ada_url-1.25.0-cp313-cp313-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0c6d07e378cc4ef6d3ddd46d7310932577cd93dd675334927bfb426688864c54"}, + {file = "ada_url-1.25.0-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:e631331fc0bb4a032bb4bf437068945b5880597ee1a466b8c7a82d5af9c8d43d"}, + {file = "ada_url-1.25.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:4bba786e270af52a72869aa7407e2003a5c025a05ad4d736b7e4e35b0cda550b"}, + {file = "ada_url-1.25.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:a16eeb4993750f960e1f61693cf0c53d3949cc3a93eb0ee330e600fd1925c7b8"}, + {file = "ada_url-1.25.0-cp313-cp313-win_amd64.whl", hash = "sha256:558c6f1f040f08515bd1bbc75c31e3f94ee5922f39fa517d577b6e08a0b31885"}, + {file = "ada_url-1.25.0-cp39-cp39-macosx_10_15_universal2.whl", hash = "sha256:f36a894e6bb66108b3c2da6108c17dfa52f654c2d8c7129acde1f2c3e0c15684"}, + {file = "ada_url-1.25.0-cp39-cp39-macosx_10_15_x86_64.whl", hash = "sha256:50e84950e583aad377bf822b11e219b0f01a5f9e32e961171acea29563f292ce"}, + {file = "ada_url-1.25.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:626234902c7c886f322eef7d3b5adfd04f8e5c309c643e3ab6e27f7216c3fb13"}, + {file = "ada_url-1.25.0-cp39-cp39-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:dd85de9f31c882896e171f3f15124556cbe7d69de9047e174e2b5b2429365bc2"}, + {file = "ada_url-1.25.0-cp39-cp39-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:acd8547d288fa7e1a3d660833f9274d85aa2a5cd592c921efccfa821a21d591a"}, + {file = "ada_url-1.25.0-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:f28dc864f71ee4e97d5fa278cba2e8ee6406715d87ed1463bf175f9935f0a611"}, + {file = "ada_url-1.25.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:49a81fb368e65c5594d9895c02353deca2bb677206b31fcd4256cba1396329a9"}, + {file = "ada_url-1.25.0-cp39-cp39-win_amd64.whl", hash = "sha256:fcb63bea78099df0efc0339bcbeccb5b5b06031e8fe3689dfaa69f0463e8a61a"}, + {file = "ada_url-1.25.0-pp310-pypy310_pp73-macosx_10_15_x86_64.whl", hash = "sha256:a6bddee8bcf2d505ebce2d65bb192b845f907f97e8a22d7d36dea8ac79c5462d"}, + {file = "ada_url-1.25.0-pp310-pypy310_pp73-macosx_11_0_arm64.whl", hash = "sha256:92dda4112e681160b10d53dc6737ffbffba2fc4f8fded15d3d8f3e99e91e19c1"}, + {file = "ada_url-1.25.0-pp310-pypy310_pp73-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:a3ad3a6e75ca109dda6d352b6a289e61d05a0fcf710956e5a338f4d8afe363bf"}, + {file = "ada_url-1.25.0-pp310-pypy310_pp73-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:847a86d2ba958cc0beadf85576aa3b00e97c952d67f54a9a712b06d6e86389d6"}, + {file = "ada_url-1.25.0-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:7e969821fec735fadfaa3949376baff5c199019306cb179eba146636610c43a6"}, + {file = "ada_url-1.25.0-pp39-pypy39_pp73-macosx_10_15_x86_64.whl", hash = "sha256:bdb801046f8384957f92706a74d0a2df4799b60d17b453441d91e730fc82c5fc"}, + {file = "ada_url-1.25.0-pp39-pypy39_pp73-macosx_11_0_arm64.whl", hash = "sha256:3f7d357a634c5258eb802fa1ab9c21fb74ed12486fb93ed9f7c89bf7addf4046"}, + {file = "ada_url-1.25.0-pp39-pypy39_pp73-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:85fb9d799ef9fc75d541721c7808b4177348f786512e707c094ea81255b304e3"}, + {file = "ada_url-1.25.0-pp39-pypy39_pp73-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:a2c148e832588f252cae994470dc824c1840380ea69575769b1660c2d2663b1e"}, + {file = "ada_url-1.25.0-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:32fd7078a79813d27381c94726a440032ba225404a4ceace4c2431c388b23a00"}, + {file = "ada_url-1.25.0.tar.gz", hash = "sha256:d571c82a7d5b0965776b289de76319ed432bf85bc5e3d1dc624fb44cf50561be"}, +] + +[package.dependencies] +cffi = "*" [[package]] name = "anyio" @@ -20,7 +84,7 @@ typing_extensions = {version = ">=4.5", markers = "python_version < \"3.13\""} [package.extras] doc = ["Sphinx (>=8.2,<9.0)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphinx_rtd_theme"] -test = ["anyio[trio]", "blockbuster (>=1.5.23)", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "trustme", "truststore (>=0.9.1)", "uvloop (>=0.21)"] +test = ["anyio[trio]", "blockbuster (>=1.5.23)", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "trustme", "truststore (>=0.9.1) ; python_version >= \"3.10\"", "uvloop (>=0.21) ; platform_python_implementation == \"CPython\" and platform_system != \"Windows\" and python_version < \"3.14\""] trio = ["trio (>=0.26.1)"] [[package]] @@ -45,7 +109,7 @@ description = "Backport of CPython tarfile module" optional = false python-versions = ">=3.8" groups = ["dev"] -markers = "(platform_machine != \"ppc64le\" and platform_machine != \"s390x\") and python_version < \"3.12\"" +markers = "platform_machine != \"ppc64le\" and platform_machine != \"s390x\" and python_version < \"3.12\"" files = [ {file = "backports.tarfile-1.2.0-py3-none-any.whl", hash = "sha256:77e284d754527b01fb1e6fa8a1afe577858ebe4e9dad8919e34c862cb399bc34"}, {file = "backports_tarfile-1.2.0.tar.gz", hash = "sha256:d75e02c268746e1b8144c278978b6e98e85de6ad16f8e4b0844a154557eca991"}, @@ -143,7 +207,7 @@ files = [ {file = "cffi-1.17.1-cp39-cp39-win_amd64.whl", hash = "sha256:d016c76bdd850f3c626af19b0542c9677ba156e4ee4fccfdd7848803533ef662"}, {file = "cffi-1.17.1.tar.gz", hash = "sha256:1c39c6016c32bc48dd54561950ebd6836e1670f2ae46128f67cf49e789c52824"}, ] -markers = {main = "platform_python_implementation != \"PyPy\"", dev = "(platform_machine != \"ppc64le\" and platform_machine != \"s390x\") and sys_platform == \"linux\" and platform_python_implementation != \"PyPy\""} +markers = {dev = "platform_machine != \"ppc64le\" and platform_machine != \"s390x\" and sys_platform == \"linux\" and platform_python_implementation != \"PyPy\""} [package.dependencies] pycparser = "*" @@ -340,7 +404,7 @@ files = [ tomli = {version = "*", optional = true, markers = "python_full_version <= \"3.11.0a6\" and extra == \"toml\""} [package.extras] -toml = ["tomli"] +toml = ["tomli ; python_full_version <= \"3.11.0a6\""] [[package]] name = "cryptography" @@ -378,7 +442,7 @@ files = [ {file = "cryptography-43.0.3-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:2ce6fae5bdad59577b44e4dfed356944fbf1d925269114c28be377692643b4ff"}, {file = "cryptography-43.0.3.tar.gz", hash = "sha256:315b9001266a492a6ff443b61238f956b214dbec9910a081ba5b6646a055a805"}, ] -markers = {dev = "(platform_machine != \"ppc64le\" and platform_machine != \"s390x\") and sys_platform == \"linux\""} +markers = {dev = "platform_machine != \"ppc64le\" and platform_machine != \"s390x\" and sys_platform == \"linux\""} [package.dependencies] cffi = {version = ">=1.12", markers = "platform_python_implementation != \"PyPy\""} @@ -474,7 +538,7 @@ httpcore = "==1.*" idna = "*" [package.extras] -brotli = ["brotli", "brotlicffi"] +brotli = ["brotli ; platform_python_implementation == \"CPython\"", "brotlicffi ; platform_python_implementation != \"CPython\""] cli = ["click (==8.*)", "pygments (==2.*)", "rich (>=10,<14)"] http2 = ["h2 (>=3,<5)"] socks = ["socksio (==1.*)"] @@ -522,7 +586,7 @@ description = "Read metadata from Python packages" optional = false python-versions = ">=3.9" groups = ["dev"] -markers = "(platform_machine != \"ppc64le\" and platform_machine != \"s390x\") and python_version < \"3.12\" or python_version < \"3.10\"" +markers = "platform_machine != \"ppc64le\" and platform_machine != \"s390x\" and python_version < \"3.12\" or python_version == \"3.9\"" files = [ {file = "importlib_metadata-8.6.1-py3-none-any.whl", hash = "sha256:02a89390c1e15fdfdc0d7c6b25cb3e62650d0494005c97d6f148bf5b9787525e"}, {file = "importlib_metadata-8.6.1.tar.gz", hash = "sha256:310b41d755445d74569f993ccfc22838295d9fe005425094fad953d7f15c8580"}, @@ -532,12 +596,12 @@ files = [ zipp = ">=3.20" [package.extras] -check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1)"] +check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\""] cover = ["pytest-cov"] doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] enabler = ["pytest-enabler (>=2.2)"] perf = ["ipython"] -test = ["flufl.flake8", "importlib_resources (>=1.3)", "jaraco.test (>=5.4)", "packaging", "pyfakefs", "pytest (>=6,!=8.1.*)", "pytest-perf (>=0.9.2)"] +test = ["flufl.flake8", "importlib_resources (>=1.3) ; python_version < \"3.9\"", "jaraco.test (>=5.4)", "packaging", "pyfakefs", "pytest (>=6,!=8.1.*)", "pytest-perf (>=0.9.2)"] type = ["pytest-mypy"] [[package]] @@ -590,7 +654,7 @@ files = [ [package.extras] doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] -test = ["portend", "pytest (>=6,!=8.1.*)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-mypy", "pytest-ruff (>=0.2.1)"] +test = ["portend", "pytest (>=6,!=8.1.*)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-mypy", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\""] [[package]] name = "jaraco-functools" @@ -609,7 +673,7 @@ files = [ more-itertools = "*" [package.extras] -check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1)"] +check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\""] cover = ["pytest-cov"] doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] enabler = ["pytest-enabler (>=2.2)"] @@ -623,14 +687,14 @@ description = "Low-level, pure Python DBus protocol wrapper." optional = false python-versions = ">=3.7" groups = ["dev"] -markers = "(platform_machine != \"ppc64le\" and platform_machine != \"s390x\") and sys_platform == \"linux\"" +markers = "platform_machine != \"ppc64le\" and platform_machine != \"s390x\" and sys_platform == \"linux\"" files = [ {file = "jeepney-0.9.0-py3-none-any.whl", hash = "sha256:97e5714520c16fc0a45695e5365a2e11b81ea79bba796e26f9f1d178cb182683"}, {file = "jeepney-0.9.0.tar.gz", hash = "sha256:cf0e9e845622b81e4a28df94c40345400256ec608d0e55bb8a3feaa9163f5732"}, ] [package.extras] -test = ["async-timeout", "pytest", "pytest-asyncio (>=0.17)", "pytest-trio", "testpath", "trio"] +test = ["async-timeout ; python_version < \"3.11\"", "pytest", "pytest-asyncio (>=0.17)", "pytest-trio", "testpath", "trio"] trio = ["trio"] [[package]] @@ -656,7 +720,7 @@ pywin32-ctypes = {version = ">=0.2.0", markers = "sys_platform == \"win32\""} SecretStorage = {version = ">=3.2", markers = "sys_platform == \"linux\""} [package.extras] -check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1)"] +check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\""] completion = ["shtab (>=1.1.0)"] cover = ["pytest-cov"] doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] @@ -787,7 +851,7 @@ files = [ {file = "pycparser-2.22-py3-none-any.whl", hash = "sha256:c3702b6d3dd8c7abc1afa565d7e63d53a1d0bd86cdc24edd75470f4de499cfcc"}, {file = "pycparser-2.22.tar.gz", hash = "sha256:491c8be9c040f5390f5bf44a5b07752bd07f56edf992381b05c701439eec10f6"}, ] -markers = {main = "platform_python_implementation != \"PyPy\"", dev = "(platform_machine != \"ppc64le\" and platform_machine != \"s390x\") and sys_platform == \"linux\" and platform_python_implementation != \"PyPy\""} +markers = {dev = "platform_machine != \"ppc64le\" and platform_machine != \"s390x\" and sys_platform == \"linux\" and platform_python_implementation != \"PyPy\""} [[package]] name = "pygments" @@ -909,7 +973,7 @@ description = "A (partial) reimplementation of pywin32 using ctypes/cffi" optional = false python-versions = ">=3.6" groups = ["dev"] -markers = "(platform_machine != \"ppc64le\" and platform_machine != \"s390x\") and sys_platform == \"win32\"" +markers = "platform_machine != \"ppc64le\" and platform_machine != \"s390x\" and sys_platform == \"win32\"" files = [ {file = "pywin32-ctypes-0.2.3.tar.gz", hash = "sha256:d162dc04946d704503b2edc4d55f3dba5c1d539ead017afa00142c38b9885755"}, {file = "pywin32_ctypes-0.2.3-py3-none-any.whl", hash = "sha256:8a1513379d709975552d202d942d9837758905c8d01eb82b8bcc30918929e7b8"}, @@ -1007,6 +1071,33 @@ typing-extensions = {version = ">=4.0.0,<5.0", markers = "python_version < \"3.1 [package.extras] jupyter = ["ipywidgets (>=7.5.1,<9)"] +[[package]] +name = "ruff" +version = "0.1.15" +description = "An extremely fast Python linter and code formatter, written in Rust." +optional = false +python-versions = ">=3.7" +groups = ["dev"] +files = [ + {file = "ruff-0.1.15-py3-none-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:5fe8d54df166ecc24106db7dd6a68d44852d14eb0729ea4672bb4d96c320b7df"}, + {file = "ruff-0.1.15-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:6f0bfbb53c4b4de117ac4d6ddfd33aa5fc31beeaa21d23c45c6dd249faf9126f"}, + {file = "ruff-0.1.15-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e0d432aec35bfc0d800d4f70eba26e23a352386be3a6cf157083d18f6f5881c8"}, + {file = "ruff-0.1.15-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:9405fa9ac0e97f35aaddf185a1be194a589424b8713e3b97b762336ec79ff807"}, + {file = "ruff-0.1.15-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c66ec24fe36841636e814b8f90f572a8c0cb0e54d8b5c2d0e300d28a0d7bffec"}, + {file = "ruff-0.1.15-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:6f8ad828f01e8dd32cc58bc28375150171d198491fc901f6f98d2a39ba8e3ff5"}, + {file = "ruff-0.1.15-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:86811954eec63e9ea162af0ffa9f8d09088bab51b7438e8b6488b9401863c25e"}, + {file = "ruff-0.1.15-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:fd4025ac5e87d9b80e1f300207eb2fd099ff8200fa2320d7dc066a3f4622dc6b"}, + {file = "ruff-0.1.15-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b17b93c02cdb6aeb696effecea1095ac93f3884a49a554a9afa76bb125c114c1"}, + {file = "ruff-0.1.15-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:ddb87643be40f034e97e97f5bc2ef7ce39de20e34608f3f829db727a93fb82c5"}, + {file = "ruff-0.1.15-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:abf4822129ed3a5ce54383d5f0e964e7fef74a41e48eb1dfad404151efc130a2"}, + {file = "ruff-0.1.15-py3-none-musllinux_1_2_i686.whl", hash = "sha256:6c629cf64bacfd136c07c78ac10a54578ec9d1bd2a9d395efbee0935868bf852"}, + {file = "ruff-0.1.15-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:1bab866aafb53da39c2cadfb8e1c4550ac5340bb40300083eb8967ba25481447"}, + {file = "ruff-0.1.15-py3-none-win32.whl", hash = "sha256:2417e1cb6e2068389b07e6fa74c306b2810fe3ee3476d5b8a96616633f40d14f"}, + {file = "ruff-0.1.15-py3-none-win_amd64.whl", hash = "sha256:3837ac73d869efc4182d9036b1405ef4c73d9b1f88da2413875e34e0d6919587"}, + {file = "ruff-0.1.15-py3-none-win_arm64.whl", hash = "sha256:9a933dfb1c14ec7a33cceb1e49ec4a16b51ce3c20fd42663198746efc0427360"}, + {file = "ruff-0.1.15.tar.gz", hash = "sha256:f6dfa8c1b21c913c326919056c390966648b680966febcb796cc9d1aaab8564e"}, +] + [[package]] name = "secretstorage" version = "3.3.3" @@ -1014,7 +1105,7 @@ description = "Python bindings to FreeDesktop.org Secret Service API" optional = false python-versions = ">=3.6" groups = ["dev"] -markers = "(platform_machine != \"ppc64le\" and platform_machine != \"s390x\") and sys_platform == \"linux\"" +markers = "platform_machine != \"ppc64le\" and platform_machine != \"s390x\" and sys_platform == \"linux\"" files = [ {file = "SecretStorage-3.3.3-py3-none-any.whl", hash = "sha256:f356e6628222568e3af06f2eba8df495efa13b3b63081dafd4f7d9a7b7bc9f99"}, {file = "SecretStorage-3.3.3.tar.gz", hash = "sha256:2403533ef369eca6d2ba81718576c5e0f564d5cca1b58f73a8b23e7d4eeebd77"}, @@ -1132,7 +1223,7 @@ files = [ ] [package.extras] -brotli = ["brotli (>=1.0.9)", "brotlicffi (>=0.8.0)"] +brotli = ["brotli (>=1.0.9) ; platform_python_implementation == \"CPython\"", "brotlicffi (>=0.8.0) ; platform_python_implementation != \"CPython\""] h2 = ["h2 (>=4,<5)"] socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"] zstd = ["zstandard (>=0.18.0)"] @@ -1144,21 +1235,21 @@ description = "Backport of pathlib-compatible object wrapper for zip files" optional = false python-versions = ">=3.9" groups = ["dev"] -markers = "(platform_machine != \"ppc64le\" and platform_machine != \"s390x\") and python_version < \"3.12\" or python_version < \"3.10\"" +markers = "platform_machine != \"ppc64le\" and platform_machine != \"s390x\" and python_version < \"3.12\" or python_version == \"3.9\"" files = [ {file = "zipp-3.21.0-py3-none-any.whl", hash = "sha256:ac1bbe05fd2991f160ebce24ffbac5f6d11d83dc90891255885223d42b3cd931"}, {file = "zipp-3.21.0.tar.gz", hash = "sha256:2c9958f6430a2040341a52eb608ed6dd93ef4392e02ffe219417c1b28b5dd1f4"}, ] [package.extras] -check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1)"] +check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\""] cover = ["pytest-cov"] doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] enabler = ["pytest-enabler (>=2.2)"] -test = ["big-O", "importlib-resources", "jaraco.functools", "jaraco.itertools", "jaraco.test", "more-itertools", "pytest (>=6,!=8.1.*)", "pytest-ignore-flaky"] +test = ["big-O", "importlib-resources ; python_version < \"3.9\"", "jaraco.functools", "jaraco.itertools", "jaraco.test", "more-itertools", "pytest (>=6,!=8.1.*)", "pytest-ignore-flaky"] type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = "^3.9" -content-hash = "f520b72141154b1ab70c231fd79737388402228a6a98305dcb0d3c78cd069cdf" +content-hash = "a31e9d335c52ee7f6daf3f436607275a5a496194d4377f503deb12fd2a75251c" diff --git a/packages/auth0_api_python/pyproject.toml b/packages/auth0_api_python/pyproject.toml index b6d6fe0..dbc7eba 100644 --- a/packages/auth0_api_python/pyproject.toml +++ b/packages/auth0_api_python/pyproject.toml @@ -15,6 +15,7 @@ python = "^3.9" authlib = "^1.0" # For JWT/OIDC features requests = "^2.31.0" # If you use requests for HTTP calls (e.g., discovery) httpx = "^0.28.1" +ada-url = "^1.25.0" [tool.poetry.group.dev.dependencies] pytest = "^8.0" @@ -23,6 +24,7 @@ pytest-asyncio = "^0.20.3" pytest-mock = "^3.14.0" pytest-httpx = "^0.35.0" twine = "^6.1.0" +ruff = "^0.1.0" [tool.pytest.ini_options] addopts = "--cov=src --cov-report=term-missing:skip-covered --cov-report=xml" diff --git a/packages/auth0_api_python/requirements-dev.txt b/packages/auth0_api_python/requirements-dev.txt new file mode 100644 index 0000000..1d475b1 --- /dev/null +++ b/packages/auth0_api_python/requirements-dev.txt @@ -0,0 +1,7 @@ +pytest>=8.0,<9.0 +pytest-cov>=4.0,<5.0 +pytest-asyncio>=0.20.3,<1.0 +pytest-mock>=3.14.0,<4.0 +pytest-httpx>=0.35.0,<1.0 +ruff>=0.1.0,<1.0 +twine>=6.1.0,<7.0 diff --git a/packages/auth0_api_python/requirements.txt b/packages/auth0_api_python/requirements.txt new file mode 100644 index 0000000..6542198 --- /dev/null +++ b/packages/auth0_api_python/requirements.txt @@ -0,0 +1,16 @@ +authlib>=1.0,<2.0 +requests>=2.31.0,<3.0 +httpx>=0.28.1,<1.0 +ada-url>=1.25.0,<2.0 +certifi>=2025.1.31 +cryptography>=43.0.3 +idna>=3.10 +sniffio>=1.3.1 +h11>=0.14.0 +httpcore>=1.0.7 +anyio>=4.9.0 +charset-normalizer>=3.4.1 +urllib3>=2.3.0 +rfc3986>=2.0.0 +cffi>=1.17.1 +pycparser>=2.22 \ No newline at end of file diff --git a/packages/auth0_api_python/src/auth0_api_python/__init__.py b/packages/auth0_api_python/src/auth0_api_python/__init__.py index a9b98fd..f487dd8 100644 --- a/packages/auth0_api_python/src/auth0_api_python/__init__.py +++ b/packages/auth0_api_python/src/auth0_api_python/__init__.py @@ -11,4 +11,4 @@ __all__ = [ "ApiClient", "ApiClientOptions" -] \ No newline at end of file +] diff --git a/packages/auth0_api_python/src/auth0_api_python/api_client.py b/packages/auth0_api_python/src/auth0_api_python/api_client.py index b38409e..8868445 100644 --- a/packages/auth0_api_python/src/auth0_api_python/api_client.py +++ b/packages/auth0_api_python/src/auth0_api_python/api_client.py @@ -1,11 +1,26 @@ import time -from typing import Optional, List, Dict, Any +from typing import Any, Optional -from authlib.jose import JsonWebToken, JsonWebKey +from authlib.jose import JsonWebKey, JsonWebToken from .config import ApiClientOptions -from .errors import MissingRequiredArgumentError, VerifyAccessTokenError -from .utils import fetch_oidc_metadata, fetch_jwks, get_unverified_header +from .errors import ( + BaseAuthError, + InvalidAuthSchemeError, + InvalidDpopProofError, + MissingAuthorizationError, + MissingRequiredArgumentError, + VerifyAccessTokenError, +) +from .utils import ( + calculate_jwk_thumbprint, + fetch_jwks, + fetch_oidc_metadata, + get_unverified_header, + normalize_url_for_htu, + sha256_base64url, +) + class ApiClient: """ @@ -14,46 +29,203 @@ class ApiClient: """ def __init__(self, options: ApiClientOptions): - if not options.domain: raise MissingRequiredArgumentError("domain") if not options.audience: raise MissingRequiredArgumentError("audience") self.options = options - self._metadata: Optional[Dict[str, Any]] = None - self._jwks_data: Optional[Dict[str, Any]] = None + self._metadata: Optional[dict[str, Any]] = None + self._jwks_data: Optional[dict[str, Any]] = None self._jwt = JsonWebToken(["RS256"]) - async def _discover(self) -> Dict[str, Any]: - """Lazy-load OIDC discovery metadata.""" - if self._metadata is None: - self._metadata = await fetch_oidc_metadata( - domain=self.options.domain, - custom_fetch=self.options.custom_fetch - ) - return self._metadata + self._dpop_algorithms = ["ES256"] + self._dpop_jwt = JsonWebToken(self._dpop_algorithms) + + def is_dpop_required(self) -> bool: + """Check if DPoP authentication is required.""" + return getattr(self.options, "dpop_required", False) + + + async def verify_request( + self, + headers: dict[str, str], + http_method: Optional[str] = None, + http_url: Optional[str] = None + ) -> dict[str, Any]: + """ + Dispatch based on Authorization scheme: + • If scheme is 'DPoP', verifies both access token and DPoP proof + • If scheme is 'Bearer', verifies only the access token + + Args: + headers: HTTP headers dict containing (header keys should be lowercase): + - "authorization": The Authorization header value (required) + - "dpop": The DPoP proof header value (required for DPoP) + http_method: The HTTP method (required for DPoP) + http_url: The HTTP URL (required for DPoP) + + Returns: + The decoded access token claims + + Raises: + MissingRequiredArgumentError: If required args are missing + InvalidAuthSchemeError: If an unsupported scheme is provided + InvalidDpopProofError: If DPoP verification fails + VerifyAccessTokenError: If access token verification fails + """ + authorization_header = headers.get("authorization", "") + dpop_proof = headers.get("dpop") + + if not authorization_header: + if self.is_dpop_required(): + raise self._prepare_error( + InvalidAuthSchemeError("Expecting Authorization header with DPoP scheme.") + ) + else : + raise self._prepare_error(MissingAuthorizationError()) + + + parts = authorization_header.split(" ") + if len(parts) != 2: + if len(parts) < 2: + raise self._prepare_error(MissingAuthorizationError()) + elif len(parts) > 2: + raise self._prepare_error( + InvalidAuthSchemeError("Invalid Authorization HTTP Header Format for authorization") + ) + + scheme, token = parts + + scheme = scheme.strip().lower() + + if self.is_dpop_required() and scheme != "dpop": + raise self._prepare_error( + InvalidAuthSchemeError( + f"Invalid scheme. Expected DPoP{', but got ' + scheme + '.' if scheme and scheme != 'dpop' else ' scheme.'}" + ), + auth_scheme=scheme + ) + if not token.strip(): + raise self._prepare_error(MissingAuthorizationError()) - async def _load_jwks(self) -> Dict[str, Any]: - """Fetches and caches JWKS data from the OIDC metadata.""" - if self._jwks_data is None: - metadata = await self._discover() - jwks_uri = metadata["jwks_uri"] - self._jwks_data = await fetch_jwks( - jwks_uri=jwks_uri, - custom_fetch=self.options.custom_fetch - ) - return self._jwks_data + + if scheme == "dpop": + if not self.options.dpop_enabled: + raise self._prepare_error(MissingAuthorizationError()) + + if not dpop_proof: + if self.is_dpop_required(): + raise self._prepare_error( + InvalidAuthSchemeError("Expecting Authorization header with DPoP scheme."), + auth_scheme=scheme + ) + else: + raise self._prepare_error( + InvalidDpopProofError("Operation indicated DPoP use but the request has no DPoP HTTP Header"), + auth_scheme=scheme + ) + + if "," in dpop_proof: + raise self._prepare_error( + InvalidDpopProofError("Multiple DPoP proofs are not allowed"), + auth_scheme=scheme + ) + + try: + dpop_header = get_unverified_header(dpop_proof) + except Exception: + raise self._prepare_error(InvalidDpopProofError("Failed to verify DPoP proof"), auth_scheme=scheme) + + if not http_method or not http_url: + raise self._prepare_error( + InvalidDpopProofError("Operation indicated DPoP use but the request has no http_method or http_url"), auth_scheme=scheme + ) + + try: + access_token_claims = await self.verify_access_token(token) + except VerifyAccessTokenError as e: + raise self._prepare_error(e, auth_scheme=scheme) + + cnf_claim = access_token_claims.get("cnf") + + if not cnf_claim: + raise self._prepare_error( + InvalidDpopProofError("Operation indicated DPoP use but the JWT Access Token has no jkt confirmation claim"), + auth_scheme=scheme + ) + + if not isinstance(cnf_claim, dict): + raise self._prepare_error( + InvalidDpopProofError("Operation indicated DPoP use but the JWT Access Token has invalid confirmation claim format"), + auth_scheme=scheme + ) + try: + await self.verify_dpop_proof( + access_token=token, + proof=dpop_proof, + http_method=http_method, + http_url=http_url + ) + except InvalidDpopProofError as e: + raise self._prepare_error(e, auth_scheme=scheme) + + # DPoP binding verification + jwk_dict = dpop_header["jwk"] + actual_jkt = calculate_jwk_thumbprint(jwk_dict) + expected_jkt = cnf_claim.get("jkt") + + if not expected_jkt: + raise self._prepare_error( + VerifyAccessTokenError("Access token 'cnf' claim missing 'jkt'"), + auth_scheme=scheme + ) + + if expected_jkt != actual_jkt: + raise self._prepare_error( + VerifyAccessTokenError("JWT Access Token confirmation mismatch"), + auth_scheme=scheme + ) + + return access_token_claims + + if scheme == "bearer": + if dpop_proof: + if self.options.dpop_enabled: + raise self._prepare_error( + InvalidAuthSchemeError( + "Operation indicated DPoP use but the request's Authorization HTTP Header scheme is not DPoP" + ), + auth_scheme=scheme + ) + + try: + claims = await self.verify_access_token(token) + if claims.get("cnf") and isinstance(claims["cnf"], dict) and claims["cnf"].get("jkt"): + if self.options.dpop_enabled: + raise self._prepare_error( + InvalidAuthSchemeError( + "Operation indicated DPoP use but the request's Authorization HTTP Header scheme is not DPoP" + ), + auth_scheme=scheme + ) + + + return claims + except VerifyAccessTokenError as e: + raise self._prepare_error(e, auth_scheme=scheme) + + raise self._prepare_error(MissingAuthorizationError()) async def verify_access_token( self, access_token: str, - required_claims: Optional[List[str]] = None - ) -> Dict[str, Any]: + required_claims: Optional[list[str]] = None + ) -> dict[str, Any]: """ Asynchronously verifies the provided JWT access token. - + - Fetches OIDC metadata and JWKS if not already cached. - Decodes and validates signature (RS256) with the correct key. - Checks standard claims: 'iss', 'aud', 'exp', 'iat' @@ -71,9 +243,8 @@ async def verify_access_token( required_claims = required_claims or [] - try: - header = await get_unverified_header(access_token) + header = get_unverified_header(access_token) kid = header["kid"] except Exception as e: raise VerifyAccessTokenError(f"Failed to parse token header: {str(e)}") from e @@ -100,10 +271,9 @@ async def verify_access_token( metadata = await self._discover() issuer = metadata["issuer"] - if claims.get("iss") != issuer: raise VerifyAccessTokenError("Issuer mismatch") - + expected_aud = self.options.audience actual_aud = claims.get("aud") @@ -120,9 +290,254 @@ async def verify_access_token( if "iat" not in claims: raise VerifyAccessTokenError("Missing 'iat' claim in token") - #Additional required_claims + # Additional required_claims for rc in required_claims: if rc not in claims: raise VerifyAccessTokenError(f"Missing required claim: {rc}") - return claims \ No newline at end of file + return claims + + async def verify_dpop_proof( + self, + access_token: str, + proof: str, + http_method: str, + http_url: str + ) -> dict[str, Any]: + """ + 1. Single well-formed compact JWS + 2. typ="dpop+jwt", alg∈allowed, alg≠none + 3. jwk header present & public only + 4. Signature verifies with jwk + 5. Validates all required claims + Raises InvalidDpopProofError on any failure. + """ + if not proof: + raise MissingRequiredArgumentError("dpop_proof") + if not access_token: + raise MissingRequiredArgumentError("access_token") + if not http_method or not http_url: + raise MissingRequiredArgumentError("http_method/http_url") + + header = get_unverified_header(proof) + + if header.get("typ") != "dpop+jwt": + raise InvalidDpopProofError("Unexpected JWT 'typ' header parameter value") + + alg = header.get("alg") + if alg not in self._dpop_algorithms: + raise InvalidDpopProofError(f"Unsupported alg: {alg}") + + jwk_dict = header.get("jwk") + if not jwk_dict or not isinstance(jwk_dict, dict): + raise InvalidDpopProofError("Missing or invalid jwk in header") + + if "d" in jwk_dict: + raise InvalidDpopProofError("Private key material found in jwk header") + + if jwk_dict.get("kty") != "EC": + raise InvalidDpopProofError("Only EC keys are supported for DPoP") + + if jwk_dict.get("crv") != "P-256": + raise InvalidDpopProofError("Only P-256 curve is supported") + + public_key = JsonWebKey.import_key(jwk_dict) + try: + claims = self._dpop_jwt.decode(proof, public_key) + except Exception as e: + raise InvalidDpopProofError(f"JWT signature verification failed: {e}") + + # Checks all required claims are present + self._validate_claims_presence(claims, ["iat", "ath", "htm", "htu", "jti"]) + + jti = claims["jti"] + + if not isinstance(jti, str): + raise InvalidDpopProofError("jti claim must be a string") + + if not jti.strip(): + raise InvalidDpopProofError("jti claim must not be empty") + + + now = int(time.time()) + iat = claims["iat"] + offset = getattr(self.options, "dpop_iat_offset", 300) # default 5 minutes + leeway = getattr(self.options, "dpop_iat_leeway", 30) # default 30 seconds + + if not isinstance(iat, (int, float)): + raise InvalidDpopProofError("Invalid iat claim (must be integer or float)") + + if iat < now - offset: + raise InvalidDpopProofError("DPoP Proof iat is too old") + elif iat > now + leeway: + raise InvalidDpopProofError("DPoP Proof iat is from the future") + + if claims["htm"].lower() != http_method.lower(): + raise InvalidDpopProofError("DPoP Proof htm mismatch") + + if normalize_url_for_htu(claims["htu"]) != normalize_url_for_htu(http_url): + raise InvalidDpopProofError("DPoP Proof htu mismatch") + + if claims["ath"] != sha256_base64url(access_token): + raise InvalidDpopProofError("DPoP Proof ath mismatch") + + return claims + + # ===== Private Methods ===== + + async def _discover(self) -> dict[str, Any]: + """Lazy-load OIDC discovery metadata.""" + if self._metadata is None: + self._metadata = await fetch_oidc_metadata( + domain=self.options.domain, + custom_fetch=self.options.custom_fetch + ) + return self._metadata + + async def _load_jwks(self) -> dict[str, Any]: + """Fetches and caches JWKS data from the OIDC metadata.""" + if self._jwks_data is None: + metadata = await self._discover() + jwks_uri = metadata["jwks_uri"] + self._jwks_data = await fetch_jwks( + jwks_uri=jwks_uri, + custom_fetch=self.options.custom_fetch + ) + return self._jwks_data + + def _validate_claims_presence( + self, + claims: dict[str, Any], + required_claims: list[str] + ) -> None: + """ + Validates that all required claims are present in the claims dict. + + Args: + claims: The claims dictionary to validate + required_claims: List of claim names that must be present + + Raises: + InvalidDpopProofError: If any required claim is missing + """ + missing_claims = [] + + for claim in required_claims: + if claim not in claims: + missing_claims.append(claim) + + if missing_claims: + if len(missing_claims) == 1: + error_message = f"Missing required claim: {missing_claims[0]}" + else: + error_message = f"Missing required claims: {', '.join(missing_claims)}" + + raise InvalidDpopProofError(error_message) + + def _prepare_error(self, error: BaseAuthError, auth_scheme: Optional[str] = None) -> BaseAuthError: + """ + Prepare an error with WWW-Authenticate headers based on error type and context. + + Args: + error: The error to prepare + auth_scheme: The authentication scheme that was used ("bearer" or "dpop") + """ + error_code = error.get_error_code() + error_description = error.get_error_description() + + www_auth_headers = self._build_www_authenticate( + error_code=error_code, + error_description=error_description, + auth_scheme=auth_scheme + ) + + headers = {} + www_auth_values = [] + for header_name, header_value in www_auth_headers: + if header_name == "WWW-Authenticate": + www_auth_values.append(header_value) + + if www_auth_values: + headers["WWW-Authenticate"] = ", ".join(www_auth_values) + + error._headers = headers + + return error + + def _build_www_authenticate( + self, + *, + error_code: Optional[str] = None, + error_description: Optional[str] = None, + auth_scheme: Optional[str] = None + ) -> list[tuple[str, str]]: + """ + Returns one or two ('WWW-Authenticate', ...) tuples based on context. + If dpop_required mode → single DPoP challenge (with optional error params). + Otherwise → Bearer and/or DPoP challenges based on auth_scheme and error. + + Args: + error_code: Error code (e.g., "invalid_token", "invalid_request") + error_description: Error description if any + auth_scheme: The authentication scheme that was used ("bearer" or "dpop") + """ + # If DPoP is disabled, only return Bearer challenges + if not self.options.dpop_enabled: + if error_code and error_code != "unauthorized": + bearer_parts = [] + bearer_parts.append(f'error="{error_code}"') + if error_description: + bearer_parts.append(f'error_description="{error_description}"') + return [("WWW-Authenticate", "Bearer " + ", ".join(bearer_parts))] + return [("WWW-Authenticate", "Bearer")] + + algs = " ".join(self._dpop_algorithms) + dpop_required = self.is_dpop_required() + + # No error details + if error_code == "unauthorized" or not error_code: + if dpop_required: + return [("WWW-Authenticate", f'DPoP algs="{algs}"')] + return [("WWW-Authenticate", f'Bearer, DPoP algs="{algs}"')] + + if dpop_required: + # DPoP-required mode: Single DPoP challenge with error + dpop_parts = [] + if error_code: + dpop_parts.append(f'error="{error_code}"') + if error_description: + dpop_parts.append(f'error_description="{error_description}"') + dpop_parts.append(f'algs="{algs}"') + dpop_header = "DPoP " + ", ".join(dpop_parts) + return [("WWW-Authenticate", dpop_header)] + + # DPoP-allowed mode: For DPoP errors, always include both challenges + if auth_scheme == "dpop" and error_code: + bearer_header = "Bearer" + dpop_parts = [] + if error_code: + dpop_parts.append(f'error="{error_code}"') + if error_description: + dpop_parts.append(f'error_description="{error_description}"') + dpop_parts.append(f'algs="{algs}"') + dpop_header = "DPoP " + ", ".join(dpop_parts) + return [ + ("WWW-Authenticate", bearer_header), + ("WWW-Authenticate", dpop_header), + ] + + # If auth_scheme is "bearer", include error on Bearer challenge + if auth_scheme == "bearer" and error_code: + bearer_parts = [] + bearer_parts.append(f'error="{error_code}"') + if error_description: + bearer_parts.append(f'error_description="{error_description}"') + bearer_header = "Bearer " + ", ".join(bearer_parts) + dpop_header = f'DPoP algs="{algs}"' + return [("WWW-Authenticate", f'{bearer_header}, {dpop_header}')] + + # Default: no error or unknown context + return [ + ("WWW-Authenticate", "Bearer"), + ("WWW-Authenticate", f'DPoP algs="{algs}"'), + ] diff --git a/packages/auth0_api_python/src/auth0_api_python/config.py b/packages/auth0_api_python/src/auth0_api_python/config.py index de2f4f8..0cd555a 100644 --- a/packages/auth0_api_python/src/auth0_api_python/config.py +++ b/packages/auth0_api_python/src/auth0_api_python/config.py @@ -2,7 +2,8 @@ Configuration classes and utilities for auth0-api-python. """ -from typing import Optional, Callable +from typing import Callable, Optional + class ApiClientOptions: """ @@ -12,13 +13,25 @@ class ApiClientOptions: domain: The Auth0 domain, e.g., "my-tenant.us.auth0.com". audience: The expected 'aud' claim in the token. custom_fetch: Optional callable that can replace the default HTTP fetch logic. + dpop_enabled: Whether DPoP is enabled (default: True for backward compatibility). + dpop_required: Whether DPoP is required (default: False, allows both Bearer and DPoP). + dpop_iat_leeway: Leeway in seconds for DPoP proof iat claim (default: 30). + dpop_iat_offset: Maximum age in seconds for DPoP proof iat claim (default: 300). """ def __init__( self, domain: str, audience: str, - custom_fetch: Optional[Callable[..., object]] = None + custom_fetch: Optional[Callable[..., object]] = None, + dpop_enabled: bool = True, + dpop_required: bool = False, + dpop_iat_leeway: int = 30, + dpop_iat_offset: int = 300, ): self.domain = domain self.audience = audience self.custom_fetch = custom_fetch + self.dpop_enabled = dpop_enabled + self.dpop_required = dpop_required + self.dpop_iat_leeway = dpop_iat_leeway + self.dpop_iat_offset = dpop_iat_offset diff --git a/packages/auth0_api_python/src/auth0_api_python/errors.py b/packages/auth0_api_python/src/auth0_api_python/errors.py index e450059..9218b73 100644 --- a/packages/auth0_api_python/src/auth0_api_python/errors.py +++ b/packages/auth0_api_python/src/auth0_api_python/errors.py @@ -1,21 +1,97 @@ """ -Custom exceptions for auth0-api-python SDK +Custom exceptions for auth0-api-python SDK with HTTP response metadata """ -class MissingRequiredArgumentError(Exception): + +class BaseAuthError(Exception): + """Base class for all auth errors with HTTP response metadata.""" + + def __init__(self, message: str): + super().__init__(message) + self.message = message + self.name = self.__class__.__name__ + self._headers = {} # Will be set by ApiClient._prepare_error + + def get_status_code(self) -> int: + """Return the HTTP status code for this error.""" + raise NotImplementedError("Subclasses must implement get_status_code()") + + def get_error_code(self) -> str: + """Return the OAuth/DPoP error code.""" + raise NotImplementedError("Subclasses must implement get_error_code()") + + def get_error_description(self) -> str: + """Return the error description.""" + return self.message + + def get_headers(self) -> dict[str, str]: + """Return HTTP headers (including WWW-Authenticate if set).""" + return self._headers + + +class MissingRequiredArgumentError(BaseAuthError): """Error raised when a required argument is missing.""" - code = "missing_required_argument_error" def __init__(self, argument: str): super().__init__(f"The argument '{argument}' is required but was not provided.") self.argument = argument - self.name = self.__class__.__name__ + def get_status_code(self) -> int: + return 400 -class VerifyAccessTokenError(Exception): + def get_error_code(self) -> str: + return "invalid_request" + + +class VerifyAccessTokenError(BaseAuthError): """Error raised when verifying the access token fails.""" - code = "verify_access_token_error" + + def get_status_code(self) -> int: + return 401 + + def get_error_code(self) -> str: + return "invalid_token" + + +class InvalidAuthSchemeError(BaseAuthError): + """Error raised when the provided authentication scheme is unsupported.""" def __init__(self, message: str): super().__init__(message) - self.name = self.__class__.__name__ + if ":" in message and "'" in message: + self.scheme = message.split("'")[1] + else: + self.scheme = None + + def get_status_code(self) -> int: + return 400 + + def get_error_code(self) -> str: + return "invalid_request" + + +class InvalidDpopProofError(BaseAuthError): + """Error raised when validating a DPoP proof fails.""" + + def get_status_code(self) -> int: + return 400 + + def get_error_code(self) -> str: + return "invalid_dpop_proof" + + +class MissingAuthorizationError(BaseAuthError): + """Authorization header is missing, empty, or malformed.""" + + def __init__(self): + super().__init__("") + + def get_status_code(self) -> int: + return 401 + + def get_error_code(self) -> str: + return "" + + def get_error_description(self) -> str: + return "" + diff --git a/packages/auth0_api_python/src/auth0_api_python/token_utils.py b/packages/auth0_api_python/src/auth0_api_python/token_utils.py index 8f75b98..c234681 100644 --- a/packages/auth0_api_python/src/auth0_api_python/token_utils.py +++ b/packages/auth0_api_python/src/auth0_api_python/token_utils.py @@ -1,7 +1,10 @@ import time -from typing import Optional, Dict, Any, Union +import uuid +from typing import Any, Optional, Union + from authlib.jose import JsonWebKey, jwt +from .utils import calculate_jwk_thumbprint, normalize_url_for_htu, sha256_base64url # A private RSA JWK for test usage. @@ -28,7 +31,7 @@ async def generate_token( issuer: Union[str, bool, None] = None, iat: bool = True, exp: bool = True, - claims: Optional[Dict[str, Any]] = None, + claims: Optional[dict[str, Any]] = None, expiration_time: int = 3600, ) -> str: """ @@ -81,4 +84,138 @@ async def generate_token( header = {"alg": "RS256", "kid": PRIVATE_JWK["kid"]} token = jwt.encode(header, token_claims, key) - return token + # Ensure we return a string, not bytes + return token.decode('utf-8') if isinstance(token, bytes) else token + + +# A private EC P-256 private key for DPoP proof generation (test only) +PRIVATE_EC_JWK = { + "kty": "EC", + "crv": "P-256", + "x": "MKBCTNIcKUSDii11ySs3526iDZ8AiTo7Tu6KPAqv7D4", + "y": "4Etl6SRW2YiLUrN5vfvVHuhp7x8PxltmWWlbbM4IFyM", + "d": "870MB6gfuTJ4HtUnUvYMyJpr5eUZNP4Bk43bVdj3eAE" +} + + +async def generate_dpop_proof( + access_token: str, + http_method: str, + http_url: str, + jti: Optional[str] = None, + iat: bool = True, + claims: Optional[dict[str, Any]] = None, + header_overrides: Optional[dict[str, Any]] = None, + iat_time: Optional[int] = None, + include_jti: bool = True +) -> str: + """ + Generates a real ES256-signed DPoP proof JWT using the EC private key above. + + Args: + access_token: The access token to create proof for (used for ath claim). + http_method: The HTTP method (e.g., "GET", "POST") for htm claim. + http_url: The HTTP URL for htu claim. + jti: The unique identifier for the proof. If omitted, generates random UUID. + iat: Whether to set the 'iat' (issued at) claim. If False, skip it. + claims: Additional custom claims to merge into the proof. + header_overrides: Override header parameters (e.g., for testing invalid headers). + iat_time: Fixed time for iat claim (for testing). If None, uses current time. + include_jti: Whether to include the 'jti' claim. If False, jti is completely omitted. + + Returns: + An ES256-signed DPoP proof JWT string. + + Example usage: + proof = await generate_dpop_proof( + access_token="eyJ...", + http_method="GET", + http_url="https://api.example.com/resource", + iat=False, # Skip iat for testing + claims={"custom": "claim"} + ) + """ + + + proof_claims = dict(claims or {}) + + if iat: + proof_claims["iat"] = iat_time if iat_time is not None else int(time.time()) + + if include_jti: + if jti is not None: + proof_claims["jti"] = jti + else: + proof_claims["jti"] = str(uuid.uuid4()) + + proof_claims["htm"] = http_method + proof_claims["htu"] = normalize_url_for_htu(http_url) + proof_claims["ath"] = sha256_base64url(access_token) + + + public_jwk = {k: v for k, v in PRIVATE_EC_JWK.items() if k != "d"} + + + header = { + "alg": "ES256", + "typ": "dpop+jwt", + "jwk": public_jwk + } + + + if header_overrides: + header.update(header_overrides) + + key = JsonWebKey.import_key(PRIVATE_EC_JWK) + token = jwt.encode(header, proof_claims, key) + # Ensure we return a string, not bytes + return token.decode('utf-8') if isinstance(token, bytes) else token + + +async def generate_token_with_cnf( + domain: str, + user_id: str, + audience: str, + jkt_thumbprint: Optional[str] = None, + **kwargs +) -> str: + """ + Generates an access token with cnf (confirmation) claim for DPoP binding. + Extends the existing generate_token() function with DPoP support. + + Args: + domain: The Auth0 domain (used if issuer is not False). + user_id: The 'sub' claim in the token. + audience: The 'aud' claim in the token. + jkt_thumbprint: JWK thumbprint to include in cnf claim. If None, calculates from PRIVATE_EC_JWK. + **kwargs: Additional arguments passed to generate_token(). + + Returns: + A RS256-signed JWT string with cnf claim. + + Example usage: + token = await generate_token_with_cnf( + domain="auth0.local", + user_id="user123", + audience="my-api", + jkt_thumbprint="custom_thumbprint" + ) + """ + + + if jkt_thumbprint is None: + jkt_thumbprint = calculate_jwk_thumbprint(PRIVATE_EC_JWK) + + + existing_claims = kwargs.get('claims', {}) + cnf_claims = dict(existing_claims) + cnf_claims["cnf"] = {"jkt": jkt_thumbprint} + kwargs['claims'] = cnf_claims + + + return await generate_token( + domain=domain, + user_id=user_id, + audience=audience, + **kwargs + ) diff --git a/packages/auth0_api_python/src/auth0_api_python/utils.py b/packages/auth0_api_python/src/auth0_api_python/utils.py index 2d66ecb..7357bf5 100644 --- a/packages/auth0_api_python/src/auth0_api_python/utils.py +++ b/packages/auth0_api_python/src/auth0_api_python/utils.py @@ -1,17 +1,22 @@ """ -Utility functions for OIDC discovery and JWKS fetching (asynchronously) +Utility functions for OIDC discovery and JWKS fetching (asynchronously) using httpx or a custom fetch approach. """ -import httpx import base64 +import hashlib import json -from typing import Any, Dict, Optional, Callable, Union +import re +from typing import Any, Callable, Optional, Union + +import httpx +from ada_url import URL + async def fetch_oidc_metadata( - domain: str, + domain: str, custom_fetch: Optional[Callable[..., Any]] = None -) -> Dict[str, Any]: +) -> dict[str, Any]: """ Asynchronously fetch the OIDC config from https://{domain}/.well-known/openid-configuration. Returns a dict with keys like issuer, jwks_uri, authorization_endpoint, etc. @@ -29,14 +34,14 @@ async def fetch_oidc_metadata( async def fetch_jwks( - jwks_uri: str, + jwks_uri: str, custom_fetch: Optional[Callable[..., Any]] = None -) -> Dict[str, Any]: +) -> dict[str, Any]: """ Asynchronously fetch the JSON Web Key Set from jwks_uri. Returns the raw JWKS JSON, e.g. {'keys': [...]} - If custom_fetch is provided, it must be an async callable + If custom_fetch is provided, it must be an async callable that fetches data from the jwks_uri. """ if custom_fetch: @@ -47,22 +52,22 @@ async def fetch_jwks( resp = await client.get(jwks_uri) resp.raise_for_status() return resp.json() - -async def get_unverified_header(token: Union[str, bytes]) -> dict: + +def get_unverified_header(token: Union[str, bytes]) -> dict: """ Parse the first segment (header) of a JWT without verifying signature. Ensures correct Base64 padding before decode to avoid garbage bytes. """ if isinstance(token, bytes): token = token.decode("utf-8") - try: - header_b64, _, _ = token.split(".", 2) - except ValueError: - raise ValueError("Not enough segments in token") - - header_b64 = remove_bytes_prefix(header_b64) + parts = token.split(".") + if len(parts) != 3: + raise ValueError(f"Invalid token format: expected 3 segments, got {len(parts)}") + + header_b64 = parts[0] + header_b64 = remove_bytes_prefix(header_b64) header_b64 = fix_base64_padding(header_b64) header_data = base64.urlsafe_b64decode(header_b64) @@ -72,7 +77,7 @@ async def get_unverified_header(token: Union[str, bytes]) -> dict: def fix_base64_padding(segment: str) -> str: """ - If `segment`'s length is not a multiple of 4, add '=' padding + If `segment`'s length is not a multiple of 4, add '=' padding so that base64.urlsafe_b64decode won't produce nonsense bytes. No extra '=' added if length is already a multiple of 4. """ @@ -85,4 +90,58 @@ def remove_bytes_prefix(s: str) -> str: """If the string looks like b'eyJh...', remove the leading b' and trailing '.""" if s.startswith("b'"): return s[2:] # cut off the leading b' - return s \ No newline at end of file + return s + +def normalize_url_for_htu(raw_url: str) -> str: + """ + Normalize URL for DPoP htu comparison . + """ + + url_obj = URL(raw_url) + + normalized_url = url_obj.origin + url_obj.pathname + + normalized_url = re.sub( + r'%([0-9a-fA-F]{2})', + lambda m: f'%{m.group(1).upper()}', + normalized_url + ) + + return normalized_url + +def sha256_base64url(input_str: Union[str, bytes]) -> str: + """ + Compute SHA-256 digest of the input string and return a + Base64URL-encoded string *without* padding. + """ + if isinstance(input_str, str): + digest = hashlib.sha256(input_str.encode("utf-8")).digest() + else: + digest = hashlib.sha256(input_str).digest() + b64 = base64.urlsafe_b64encode(digest).decode("utf-8") + return b64.rstrip("=") + +def calculate_jwk_thumbprint(jwk: dict[str, str]) -> str: + """ + Compute the RFC 7638 JWK thumbprint for a public JWK. + + - For EC keys, includes only: crv, kty, x, y + - Serializes with no whitespace, keys sorted lexicographically + - Hashes with SHA-256 and returns base64url-encoded string without padding + """ + kty = jwk.get("kty") + + if kty == "EC": + if not all(k in jwk for k in ["crv", "x", "y"]): + raise ValueError("EC key missing required parameters") + members = ("crv", "kty", "x", "y") + else: + raise ValueError(f"{kty}(Key Type) Parameter missing or unsupported ") + + ordered = {k: jwk[k] for k in members if k in jwk} + + thumbprint_json = json.dumps(ordered, separators=(",", ":"), sort_keys=True) + + digest = hashlib.sha256(thumbprint_json.encode("utf-8")).digest() + + return base64.urlsafe_b64encode(digest).decode("utf-8").rstrip("=") diff --git a/packages/auth0_api_python/tests/test_api_client.py b/packages/auth0_api_python/tests/test_api_client.py index 8cc3bce..4de1de6 100644 --- a/packages/auth0_api_python/tests/test_api_client.py +++ b/packages/auth0_api_python/tests/test_api_client.py @@ -1,12 +1,29 @@ +import base64 +import json +import time + import pytest +from auth0_api_python.api_client import ApiClient +from auth0_api_python.config import ApiClientOptions +from auth0_api_python.errors import ( + InvalidAuthSchemeError, + InvalidDpopProofError, + MissingAuthorizationError, + MissingRequiredArgumentError, + VerifyAccessTokenError, +) +from auth0_api_python.token_utils import ( + PRIVATE_EC_JWK, + PRIVATE_JWK, + generate_dpop_proof, + generate_token, + generate_token_with_cnf, + sha256_base64url, +) from pytest_httpx import HTTPXMock -from unittest.mock import AsyncMock, patch - -from src.auth0_api_python.api_client import ApiClient -from src.auth0_api_python.config import ApiClientOptions -from src.auth0_api_python.errors import MissingRequiredArgumentError, VerifyAccessTokenError -from src.auth0_api_python.token_utils import generate_token +# Create public RSA JWK by selecting only public key components +PUBLIC_RSA_JWK = {k: PRIVATE_JWK[k] for k in ["kty", "n", "e", "alg", "use", "kid"] if k in PRIVATE_JWK} @pytest.mark.asyncio async def test_init_missing_args(): @@ -15,7 +32,7 @@ async def test_init_missing_args(): """ with pytest.raises(MissingRequiredArgumentError): _ = ApiClient(ApiClientOptions(domain="", audience="some_audience")) - + with pytest.raises(MissingRequiredArgumentError): _ = ApiClient(ApiClientOptions(domain="example.us.auth0.com", audience="")) @@ -23,7 +40,7 @@ async def test_init_missing_args(): @pytest.mark.asyncio async def test_verify_access_token_successfully(httpx_mock: HTTPXMock): """ - Test that a valid RS256 token with correct issuer, audience, iat, and exp + Test that a valid RS256 token with correct issuer, audience, iat, and exp is verified successfully by ApiClient. """ httpx_mock.add_response( @@ -388,3 +405,1137 @@ async def test_verify_access_token_fail_no_audience_config(): error_str = str(err.value).lower() assert "audience" in error_str and ("required" in error_str or "not provided" in error_str) + +@pytest.mark.asyncio +async def test_verify_access_token_fail_malformed_token(): + """Test that a malformed token fails verification.""" + + api_client = ApiClient(ApiClientOptions(domain="auth0.local", audience="my-audience")) + + with pytest.raises(VerifyAccessTokenError) as e: + await api_client.verify_access_token("header.payload") + assert "failed to parse token" in str(e.value).lower() + + with pytest.raises(VerifyAccessTokenError) as e: + await api_client.verify_access_token("header.pay!load.signature") + assert "failed to parse token" in str(e.value).lower() + + + +# DPOP PROOF VERIFICATION TESTS + +# --- Core Success Tests --- + +@pytest.mark.asyncio +async def test_verify_dpop_proof_successfully(): + """ + Test that a valid DPoP proof is verified successfully by ApiClient. + """ + access_token = "test_token" + dpop_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + # Verify the DPoP proof + claims = await api_client.verify_dpop_proof( + access_token=access_token, + proof=dpop_proof, + http_method="GET", + http_url="https://api.example.com/resource" + ) + assert claims["jti"] # Verify it has the required jti claim + assert claims["htm"] == "GET" + assert claims["htu"] == "https://api.example.com/resource" + assert isinstance(claims["iat"], int) + expected_ath = sha256_base64url(access_token) + assert claims["ath"] == expected_ath + + +# --- Header Validation Tests --- + +@pytest.mark.asyncio +async def test_verify_dpop_proof_fail_no_access_token(): + """ + Test that verify_dpop_proof fails when access_token is missing. + """ + dpop_proof = await generate_dpop_proof( + access_token="test_token", + http_method="GET", + http_url="https://api.example.com/resource" + ) + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + with pytest.raises(MissingRequiredArgumentError) as err: + await api_client.verify_dpop_proof( + access_token="", # Empty access token + proof=dpop_proof, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + assert "access_token" in str(err.value).lower() + +@pytest.mark.asyncio +async def test_verify_dpop_proof_fail_no_dpop_proof(): + """ + Test that verify_dpop_proof fails when dpop_proof is missing. + """ + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + with pytest.raises(MissingRequiredArgumentError) as err: + await api_client.verify_dpop_proof( + access_token="test_token", + proof="", # Empty proof + http_method="GET", + http_url="https://api.example.com/resource" + ) + + assert "dpop_proof" in str(err.value).lower() + +@pytest.mark.asyncio +async def test_verify_dpop_proof_fail_no_http_method_url(): + """ + Test that verify_dpop_proof fails when http_method or http_url is missing. + """ + access_token = "test_token" + dpop_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + with pytest.raises(MissingRequiredArgumentError) as err: + await api_client.verify_dpop_proof( + access_token=access_token, + proof=dpop_proof, + http_method="", # Empty method + http_url="https://api.example.com/resource" + ) + + assert "http_method" in str(err.value).lower() + +@pytest.mark.asyncio +async def test_verify_dpop_proof_fail_no_http_url(): + """ + Test that verify_dpop_proof fails when http_url is missing. + """ + access_token = "test_token" + dpop_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + with pytest.raises(MissingRequiredArgumentError) as err: + await api_client.verify_dpop_proof( + access_token=access_token, + proof=dpop_proof, + http_method="GET", + http_url="" # Empty url + ) + + assert "http_url" in str(err.value).lower() + + +# --- Claim Validation Tests --- + +@pytest.mark.asyncio +async def test_verify_dpop_proof_fail_no_typ(): + """ + Test that a DPoP proof missing 'typ' header fails verification. + """ + access_token = "test_token" + dpop_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/resource", + header_overrides={"typ": None} # Remove typ header + ) + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + with pytest.raises(InvalidDpopProofError) as err: + await api_client.verify_dpop_proof( + access_token=access_token, + proof=dpop_proof, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + assert "unexpected jwt 'typ'" in str(err.value).lower() + +@pytest.mark.asyncio +async def test_verify_dpop_proof_fail_invalid_typ(): + """ + Test that a DPoP proof with invalid 'typ' header fails verification. + """ + access_token = "test_token" + dpop_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/resource", + header_overrides={"typ": "jwt"} # Wrong typ value + ) + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + with pytest.raises(InvalidDpopProofError) as err: + await api_client.verify_dpop_proof( + access_token=access_token, + proof=dpop_proof, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + assert "unexpected jwt 'typ'" in str(err.value).lower() + +@pytest.mark.asyncio +async def test_verify_dpop_proof_fail_invalid_alg(): + """ + Test that a DPoP proof with unsupported algorithm fails verification. + """ + access_token = "test_token" + + valid_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + parts = valid_proof.split('.') + header = json.loads(base64.urlsafe_b64decode(parts[0] + '==').decode('utf-8')) + header['alg'] = 'RS256' # Invalid algorithm for DPoP (should be ES256) + + modified_header = base64.urlsafe_b64encode( + json.dumps(header, separators=(',', ':')).encode('utf-8') + ).decode('utf-8').rstrip('=') + + invalid_proof = f"{modified_header}.{parts[1]}.{parts[2]}" + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + with pytest.raises(InvalidDpopProofError) as err: + await api_client.verify_dpop_proof( + access_token=access_token, + proof=invalid_proof, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + assert "unsupported alg" in str(err.value).lower() + +@pytest.mark.asyncio +async def test_verify_dpop_proof_fail_no_jwk(): + """ + Test that a DPoP proof missing 'jwk' header fails verification. + """ + access_token = "test_token" + dpop_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/resource", + header_overrides={"jwk": None} # Remove jwk header + ) + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + with pytest.raises(InvalidDpopProofError) as err: + await api_client.verify_dpop_proof( + access_token=access_token, + proof=dpop_proof, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + assert "missing or invalid jwk" in str(err.value).lower() + +@pytest.mark.asyncio +async def test_verify_dpop_proof_fail_invalid_jwk_format(): + """ + Test that a DPoP proof with invalid 'jwk' format fails verification. + """ + access_token = "test_token" + dpop_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/resource", + header_overrides={"jwk": "invalid_jwk"} # Invalid jwk format + ) + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + with pytest.raises(InvalidDpopProofError) as err: + await api_client.verify_dpop_proof( + access_token=access_token, + proof=dpop_proof, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + assert "missing or invalid jwk" in str(err.value).lower() + +@pytest.mark.asyncio +async def test_verify_dpop_proof_fail_private_key_in_jwk(): + """ + Test that a DPoP proof with private key material in jwk fails verification. + """ + + access_token = "test_token" + # Include private key material (the 'd' parameter) + invalid_jwk = dict(PRIVATE_EC_JWK) # This includes the 'd' parameter + + dpop_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/resource", + header_overrides={"jwk": invalid_jwk} # JWK with private key material + ) + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + with pytest.raises(InvalidDpopProofError) as err: + await api_client.verify_dpop_proof( + access_token=access_token, + proof=dpop_proof, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + assert "private key" in str(err.value).lower() + +@pytest.mark.asyncio +async def test_verify_dpop_proof_with_missing_jwk_parameters(): + """Test verify_dpop_proof with missing JWK parameters.""" + access_token = "test_token" + + incomplete_jwk = {"kty": "RSA"} + + dpop_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/resource", + header_overrides={"jwk": incomplete_jwk} + ) + + api_client = ApiClient(ApiClientOptions(domain="auth0.local", audience="my-audience")) + with pytest.raises(InvalidDpopProofError) as err: + await api_client.verify_dpop_proof( + access_token=access_token, + proof=dpop_proof, + http_method="GET", + http_url="https://api.example.com/resource" + ) + assert "only ec keys are supported" in str(err.value).lower() + +# --- IAT (Issued At Time) Validation Tests --- + +@pytest.mark.asyncio +async def test_verify_dpop_proof_fail_no_iat(): + """ + Test that a DPoP proof missing 'iat' claim fails verification. + """ + access_token = "test_token" + dpop_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/resource", + iat=False # Skip iat claim + ) + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + with pytest.raises(InvalidDpopProofError) as err: + await api_client.verify_dpop_proof( + access_token=access_token, + proof=dpop_proof, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + assert "missing required claim" in str(err.value).lower() + +@pytest.mark.asyncio +async def test_verify_dpop_proof_fail_invalid_iat_in_future(): + """ + Test IAT validation with a timestamp in the future. + """ + access_token = "test_token" + # Use a future timestamp (more than leeway allows) + future_time = int(time.time()) + 3600 # 1 hour in the future + dpop_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/resource", + iat_time=future_time # Invalid future timestamp + ) + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + with pytest.raises(InvalidDpopProofError) as err: + await api_client.verify_dpop_proof( + access_token=access_token, + proof=dpop_proof, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + assert "iat is from the future" in str(err.value).lower() + +@pytest.mark.asyncio +async def test_verify_dpop_proof_iat_exact_boundary_conditions(): + """ + Test IAT timing validation at exact boundary conditions. + """ + access_token = "test_token" + + # Test with timestamp exactly at the leeway boundary (should pass) + current_time = int(time.time()) + boundary_time = current_time + 30 # Exactly at default leeway limit + + dpop_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/resource", + iat_time=boundary_time + ) + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + # Should succeed as it's within leeway + result = await api_client.verify_dpop_proof( + access_token=access_token, + proof=dpop_proof, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + assert result is not None + +@pytest.mark.asyncio +async def test_verify_dpop_proof_iat_in_past(): + """ + Test IAT validation with timestamp in the past. + """ + access_token = "test_token" + # Use a timestamp too far in the past + past_time = int(time.time()) - 3600 # 1 hour ago + dpop_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/resource", + iat_time=past_time + ) + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + with pytest.raises(InvalidDpopProofError) as err: + await api_client.verify_dpop_proof( + access_token=access_token, + proof=dpop_proof, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + assert "iat is too old" in str(err.value).lower() + +@pytest.mark.asyncio +async def test_verify_dpop_proof_iat_within_leeway(): + """ + Test that IAT timestamps within acceptable leeway pass validation. + """ + access_token = "test_token" + current_time = int(time.time()) + + # Test within acceptable skew (should pass) + dpop_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/resource", + iat_time=current_time - 30 # 30 seconds ago, should be acceptable + ) + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + # This should succeed due to clock skew tolerance + result = await api_client.verify_dpop_proof( + access_token=access_token, + proof=dpop_proof, + http_method="GET", + http_url="https://api.example.com/resource" + ) + assert result is not None + +# --- JTI (JWT ID) Validation Tests --- + +@pytest.mark.asyncio +async def test_verify_dpop_proof_fail_empty_jti(): + """ + Test that a DPoP proof with empty 'jti' claim fails verification. + """ + access_token = "test_token" + dpop_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/resource", + jti="" # Empty jti claim + ) + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + with pytest.raises(InvalidDpopProofError) as err: + await api_client.verify_dpop_proof( + access_token=access_token, + proof=dpop_proof, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + assert "jti claim must not be empty" in str(err.value).lower() + +@pytest.mark.asyncio +async def test_verify_dpop_proof_custom_jti_value(): + """ + Test for a custom JTI value. + """ + access_token = "test_token" + + custom_jti = "unique-jti-12345" + dpop_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/resource", + jti=custom_jti # Use jti parameter instead of claims + ) + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + # First verification should succeed + result = await api_client.verify_dpop_proof( + access_token=access_token, + proof=dpop_proof, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + assert result is not None + assert result["jti"] == custom_jti + +@pytest.mark.asyncio +async def test_verify_dpop_proof_with_missing_jti(): + """Test verify_dpop_proof with missing jti claim.""" + access_token = "test_token" + + # Generate DPoP proof WITHOUT jti claim from the start + dpop_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/resource", + include_jti=False # Completely omit jti claim + ) + + api_client = ApiClient(ApiClientOptions(domain="auth0.local", audience="my-audience")) + with pytest.raises(InvalidDpopProofError) as err: + await api_client.verify_dpop_proof( + access_token=access_token, + proof=dpop_proof, + http_method="GET", + http_url="https://api.example.com/resource" + ) + assert "missing required claim: jti" in str(err.value).lower() + +@pytest.mark.asyncio +async def test_verify_dpop_proof_fail_htm_mismatch(): + """ + Test that a DPoP proof with mismatched 'htm' claim fails verification. + """ + access_token = "test_token" + dpop_proof = await generate_dpop_proof( + access_token=access_token, + http_method="POST", # Generate proof for POST + http_url="https://api.example.com/resource", + ) + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + with pytest.raises(InvalidDpopProofError) as err: + await api_client.verify_dpop_proof( + access_token=access_token, + proof=dpop_proof, + http_method="GET", # But verify with GET + http_url="https://api.example.com/resource" + ) + + assert "htm mismatch" in str(err.value).lower() + +# --- HTU (HTTP URI) Validation Tests --- + +@pytest.mark.asyncio +async def test_verify_dpop_proof_fail_htu_mismatch(): + """ + Test that a DPoP proof with mismatched 'htu' claim fails verification. + """ + access_token = "test_token" + dpop_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/wrong-resource", # Generate proof for wrong URL + ) + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + with pytest.raises(InvalidDpopProofError) as err: + await api_client.verify_dpop_proof( + access_token=access_token, + proof=dpop_proof, + http_method="GET", + http_url="https://api.example.com/resource" # But verify with correct URL + ) + + assert "htu mismatch" in str(err.value).lower() + +@pytest.mark.asyncio +async def test_verify_dpop_proof_htu_url_normalization_case_sensitivity(): + """ + Test HTU URL normalization handles case sensitivity correctly. + """ + access_token = "test_token" + + # Test with different case in domain (should be normalized and pass) + dpop_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://API.EXAMPLE.COM/resource" # Uppercase domain + ) + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + # This should succeed due to URL normalization + result = await api_client.verify_dpop_proof( + access_token=access_token, + proof=dpop_proof, + http_method="GET", + http_url="https://api.example.com/resource" # Lowercase domain + ) + assert result is not None + + +@pytest.mark.asyncio +async def test_verify_dpop_proof_htu_trailing_slash_mismatch(): + """ + Test that HTU URLs with trailing slash differences cause verification failure. + """ + access_token = "test_token" + # Generate proof with trailing slash + dpop_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/resource/" + ) + api_client = ApiClient(ApiClientOptions(domain="auth0.local", audience="my-audience")) + with pytest.raises(InvalidDpopProofError) as err: + await api_client.verify_dpop_proof( + access_token=access_token, + proof=dpop_proof, + http_method="GET", + http_url="https://api.example.com/resource" + ) + assert "htu mismatch" in str(err.value).lower() + +@pytest.mark.asyncio +async def test_verify_dpop_proof_htu_query_parameters(): + """ + Test HTU URL validation with query parameters - normalized behavior. + Query parameters are stripped during normalization, so different params should succeed. + """ + access_token = "test_token" + + # Test with query parameters (should be normalized) + dpop_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/resource?param1=value1" # With query params + ) + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + # This should succeed due to URL normalization + result = await api_client.verify_dpop_proof( + access_token=access_token, + proof=dpop_proof, + http_method="GET", + http_url="https://api.example.com/resource?param2=value2" # Different query params + ) + assert result is not None + + +@pytest.mark.asyncio +async def test_verify_dpop_proof_htu_port_numbers(): + """ + Test HTU URL validation with explicit port numbers - normalized behavior. + Default ports (443 for HTTPS, 80 for HTTP) are stripped during normalization. + """ + access_token = "test_token" + + # Test with explicit default port (should be normalized) + dpop_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com:443/resource" # Explicit HTTPS port + ) + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + # This should succeed due to URL normalization + result = await api_client.verify_dpop_proof( + access_token=access_token, + proof=dpop_proof, + http_method="GET", + http_url="https://api.example.com/resource" # Implicit HTTPS port + ) + assert result is not None + +@pytest.mark.asyncio +async def test_verify_dpop_proof_htu_fragment_handling(): + """ + Test HTU URL validation ignores fragments. + """ + access_token = "test_token" + + # Test with fragment (should be ignored) + dpop_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/resource#fragment1" # With fragment + ) + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + # This should succeed as fragments are ignored + result = await api_client.verify_dpop_proof( + access_token=access_token, + proof=dpop_proof, + http_method="GET", + http_url="https://api.example.com/resource#fragment2" # Different fragment + ) + assert result is not None + +@pytest.mark.asyncio +async def test_verify_dpop_proof_fail_ath_mismatch(): + """ + Test that a DPoP proof with mismatched 'ath' claim fails verification. + """ + access_token = "test_token" + wrong_token = "wrong_token" + + dpop_proof = await generate_dpop_proof( + access_token=wrong_token, # Generate proof for wrong token + http_method="GET", + http_url="https://api.example.com/resource", + ) + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + with pytest.raises(InvalidDpopProofError) as err: + await api_client.verify_dpop_proof( + access_token=access_token, # But verify with correct token + proof=dpop_proof, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + assert "ath" in str(err.value).lower() or "hash" in str(err.value).lower() + +@pytest.mark.asyncio +async def test_verify_dpop_proof_with_invalid_signature(): + """Test verify_dpop_proof with invalid signature.""" + access_token = "test_token" + + valid_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + parts = valid_proof.split('.') + if len(parts) == 3: + header, payload, signature = parts + tampered_proof = f"{header}.{payload}.{signature[:-5]}12345" + else: + tampered_proof = valid_proof + + api_client = ApiClient(ApiClientOptions(domain="auth0.local", audience="my-audience")) + with pytest.raises(InvalidDpopProofError) as e: + await api_client.verify_dpop_proof( + access_token=access_token, + proof=tampered_proof, + http_method="GET", + http_url="https://api.example.com/resource" + ) + assert "signature verification failed" in str(e.value).lower() + +# VERIFY_REQUEST TESTS + +# --- Success Tests --- + +@pytest.mark.asyncio +async def test_verify_request_bearer_scheme_success(httpx_mock: HTTPXMock): + """ + Test successful Bearer token verification through verify_request. + """ + # Mock OIDC discovery + httpx_mock.add_response( + method="GET", + url="https://auth0.local/.well-known/openid-configuration", + json={ + "jwks_uri": "https://auth0.local/.well-known/jwks.json", + "issuer": "https://auth0.local/", + }, + ) + + # Mock JWKS endpoint + httpx_mock.add_response( + method="GET", + url="https://auth0.local/.well-known/jwks.json", + json={"keys": [PUBLIC_RSA_JWK]}, + ) + + # Generate a valid Bearer token + token = await generate_token( + domain="auth0.local", + user_id="test_user", + audience="my-audience", + ) + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + # Test Bearer scheme + result = await api_client.verify_request( + headers={"authorization": f"Bearer {token}"}, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + assert "sub" in result + assert result["aud"] == "my-audience" + assert result["iss"] == "https://auth0.local/" + +@pytest.mark.asyncio +async def test_verify_request_dpop_scheme_success(httpx_mock: HTTPXMock): + """ + Test successful DPoP token verification through verify_request. + """ + # Mock OIDC discovery + httpx_mock.add_response( + method="GET", + url="https://auth0.local/.well-known/openid-configuration", + json={ + "jwks_uri": "https://auth0.local/.well-known/jwks.json", + "issuer": "https://auth0.local/", + }, + ) + + # Mock JWKS endpoint + httpx_mock.add_response( + method="GET", + url="https://auth0.local/.well-known/jwks.json", + json={"keys": [PUBLIC_RSA_JWK]}, + ) + + # Generate DPoP bound token and proof + access_token = await generate_token_with_cnf( + domain="auth0.local", + user_id="test_user", + audience="my-audience", + ) + dpop_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + # Test DPoP scheme + result = await api_client.verify_request( + headers={"authorization": f"DPoP {access_token}", "dpop": dpop_proof}, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + assert "sub" in result + assert result["aud"] == "my-audience" + assert result["iss"] == "https://auth0.local/" + + +# --- Configuration & Error Handling Tests --- + +@pytest.mark.asyncio +async def test_verify_request_fail_dpop_required_mode(): + """ + Test that Bearer tokens are rejected when DPoP is required. + """ + # Generate a valid Bearer token + token = await generate_token( + domain="auth0.local", + user_id="test_user", + audience="my-audience", + ) + + api_client = ApiClient( + ApiClientOptions( + domain="auth0.local", + audience="my-audience", + dpop_required=True # Require DPoP + ) + ) + + with pytest.raises(InvalidAuthSchemeError) as err: + await api_client.verify_request( + headers={"authorization": f"Bearer {token}"}, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + assert "expected dpop, but got bearer" in str(err.value).lower() + +@pytest.mark.asyncio +async def test_verify_request_fail_dpop_enabled_bearer_with_cnf_conflict(httpx_mock: HTTPXMock): + """ + Test that Bearer tokens with cnf claim are rejected when DPoP is enabled. + """ + # Mock OIDC discovery + httpx_mock.add_response( + method="GET", + url="https://auth0.local/.well-known/openid-configuration", + json={ + "jwks_uri": "https://auth0.local/.well-known/jwks.json", + "issuer": "https://auth0.local/", + }, + ) + + # Mock JWKS endpoint + httpx_mock.add_response( + method="GET", + url="https://auth0.local/.well-known/jwks.json", + json={"keys": [PUBLIC_RSA_JWK]}, + ) + + # Generate a token with cnf claim (DPoP-bound token) + token = await generate_token_with_cnf( + domain="auth0.local", + user_id="test_user", + audience="my-audience", + ) + + api_client = ApiClient( + ApiClientOptions( + domain="auth0.local", + audience="my-audience", + dpop_enabled=True # DPoP enabled + ) + ) + + with pytest.raises(InvalidAuthSchemeError) as err: + await api_client.verify_request( + headers={"authorization": f"Bearer {token}"}, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + assert "request's authorization http header scheme is not dpop" in str(err.value).lower() + +@pytest.mark.asyncio +async def test_verify_request_fail_dpop_disabled(): + """ + Test that DPoP tokens are rejected when DPoP is disabled. + """ + access_token = "test_token" + dpop_proof = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + api_client = ApiClient( + ApiClientOptions( + domain="auth0.local", + audience="my-audience", + dpop_enabled=False # DPoP disabled + ) + ) + + with pytest.raises(MissingAuthorizationError) as err: + await api_client.verify_request( + headers={"authorization": f"DPoP {access_token}", "dpop": dpop_proof}, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + assert err.value.get_status_code() == 401 + +@pytest.mark.asyncio +async def test_verify_request_fail_missing_authorization_header(): + """ + Test that requests without Authorization header are rejected. + """ + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + with pytest.raises(MissingAuthorizationError) as err: + await api_client.verify_request( + headers={}, + http_method="GET", + http_url="https://api.example.com/resource" + ) + assert err.value.get_status_code() == 401 + +@pytest.mark.asyncio +async def test_verify_request_fail_unsupported_scheme(): + """ + Test that unsupported authentication schemes are rejected. + """ + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + with pytest.raises(MissingAuthorizationError) as err: + await api_client.verify_request( + headers={"authorization": "Basic dXNlcjpwYXNz"}, + http_method="GET", + http_url="https://api.example.com/resource" + ) + assert err.value.get_status_code() == 401 + +@pytest.mark.asyncio +async def test_verify_request_fail_empty_bearer_token(): + """Test verify_request with empty token value.""" + api_client = ApiClient(ApiClientOptions(domain="auth0.local", audience="my-audience")) + with pytest.raises(MissingAuthorizationError) as err: + await api_client.verify_request({"Authorization": "Bearer "}) + assert err.value.get_status_code() == 401 + +@pytest.mark.asyncio +async def test_verify_request_with_multiple_spaces_in_authorization(): + """Test verify_request with authorization header containing multiple spaces.""" + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + with pytest.raises(InvalidAuthSchemeError) as err: + await api_client.verify_request({"authorization": "Bearer token with extra spaces"}) + assert "authorization" in str(err.value).lower() + +@pytest.mark.asyncio +async def test_verify_request_fail_missing_dpop_header(): + """ + Test that DPoP scheme requests without DPoP header are rejected. + """ + access_token = "test_token" + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + with pytest.raises(InvalidDpopProofError) as err: + await api_client.verify_request( + headers={"authorization": f"DPoP {access_token}"}, # Missing DPoP header + http_method="GET", + http_url="https://api.example.com/resource" + ) + + assert "request has no dpop http header" in str(err.value).lower() + +@pytest.mark.asyncio +async def test_verify_request_fail_multiple_dpop_proofs(): + """ + Test that requests with multiple DPoP proofs are rejected. + """ + access_token = "test_token" + dpop_proof1 = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/resource" + ) + dpop_proof2 = await generate_dpop_proof( + access_token=access_token, + http_method="GET", + http_url="https://api.example.com/resource" + ) + + api_client = ApiClient( + ApiClientOptions(domain="auth0.local", audience="my-audience") + ) + + with pytest.raises(InvalidDpopProofError) as err: + await api_client.verify_request( + headers={"authorization": f"DPoP {access_token}", "dpop": f"{dpop_proof1}, {dpop_proof2}"}, # Multiple proofs + http_method="GET", + http_url="https://api.example.com/resource" + ) + + assert "multiple" in str(err.value).lower() + +