|
12 | 12 | # See the License for the specific language governing permissions and |
13 | 13 | # limitations under the License. |
14 | 14 |
|
15 | | -"""Firebase Phone Number Verification (FPNV) service. |
| 15 | +"""Firebase Phone Number Verification (FPNV) module.""" |
| 16 | +from typing import Any, Dict |
16 | 17 |
|
17 | | -This module contains functions for verifying JWTs used for |
18 | | -authenticating against Firebase services. |
19 | | -""" |
| 18 | +import jwt |
| 19 | +from jwt import PyJWKClient, InvalidTokenError, DecodeError, InvalidSignatureError, \ |
| 20 | + InvalidAudienceError, InvalidIssuerError, ExpiredSignatureError |
| 21 | + |
| 22 | +from firebase_admin import _utils |
| 23 | +from firebase_admin.exceptions import InvalidArgumentError |
| 24 | + |
| 25 | +_FPNV_ATTRIBUTE = '_fpnv' |
| 26 | +_FPNV_JWKS_URL = 'https://fpnv.googleapis.com/v1beta/jwks' |
| 27 | +_FPNV_ISSUER = 'https://fpnv.googleapis.com/projects/' |
| 28 | +_ALGORITHM_ES256 = 'ES256' |
| 29 | + |
| 30 | + |
| 31 | +def client(app=None): |
| 32 | + """Returns an instance of the FPNV service for the specified app. |
| 33 | +
|
| 34 | + Args: |
| 35 | + app: An App instance (optional). |
| 36 | +
|
| 37 | + Returns: |
| 38 | + FpnvClient: A FpnvClient instance. |
| 39 | +
|
| 40 | + Raises: |
| 41 | + ValueError: If the app is not a valid App instance. |
| 42 | + """ |
| 43 | + return _utils.get_app_service(app, _FPNV_ATTRIBUTE, FpnvClient) |
| 44 | + |
| 45 | + |
| 46 | +class FpnvToken(dict): |
| 47 | + """Represents a verified FPNV token. |
| 48 | +
|
| 49 | + This class behaves like a dictionary, allowing access to the decoded claims. |
| 50 | + It also provides convenience properties for common claims. |
| 51 | + """ |
| 52 | + |
| 53 | + def __init__(self, claims): |
| 54 | + super(FpnvToken, self).__init__(claims) |
| 55 | + |
| 56 | + @property |
| 57 | + def phone_number(self): |
| 58 | + """Returns the phone number associated with the token.""" |
| 59 | + return self.get('sub') |
| 60 | + |
| 61 | + @property |
| 62 | + def issuer(self): |
| 63 | + """Returns the issuer of the token.""" |
| 64 | + return self.get('iss') |
| 65 | + |
| 66 | + @property |
| 67 | + def audience(self): |
| 68 | + """Returns the audience of the token.""" |
| 69 | + return self.get('aud') |
| 70 | + |
| 71 | + @property |
| 72 | + def sub(self): |
| 73 | + """Returns the sub (subject) of the token, which is the phone number.""" |
| 74 | + return self.get('sub') |
| 75 | + |
| 76 | + # TODO: ADD ALL |
| 77 | + |
| 78 | + |
| 79 | +class FpnvClient: |
| 80 | + """The client for the Firebase Phone Number Verification service.""" |
| 81 | + _project_id = None |
| 82 | + |
| 83 | + def __init__(self, app): |
| 84 | + """Initializes the FpnvClient. |
| 85 | +
|
| 86 | + Args: |
| 87 | + app: A firebase_admin.App instance. |
| 88 | +
|
| 89 | + Raises: |
| 90 | + ValueError: If the app is invalid or lacks a project ID. |
| 91 | + """ |
| 92 | + self._project_id = app.project_id |
| 93 | + |
| 94 | + if not self._project_id: |
| 95 | + cred = app.credential.get_credential() |
| 96 | + if hasattr(cred, 'project_id'): |
| 97 | + self._project_id = cred.project_id |
| 98 | + |
| 99 | + if not self._project_id: |
| 100 | + raise ValueError( |
| 101 | + 'Project ID is required for FPNV. Please ensure the app is ' |
| 102 | + 'initialized with a credential that contains a project ID.' |
| 103 | + ) |
| 104 | + |
| 105 | + self._verifier = _FpnvTokenVerifier(self._project_id) |
| 106 | + |
| 107 | + def verify_token(self, token) -> FpnvToken: |
| 108 | + """Verifies the given FPNV token. |
| 109 | +
|
| 110 | + Verifies the signature, expiration, and claims of the token. |
| 111 | +
|
| 112 | + Args: |
| 113 | + token: A string containing the FPNV JWT. |
| 114 | +
|
| 115 | + Returns: |
| 116 | + FpnvToken: The verified token claims. |
| 117 | +
|
| 118 | + Raises: |
| 119 | + ValueError: If the token is invalid or malformed. |
| 120 | + firebase_admin.exceptions.InvalidArgumentError: If verification fails. |
| 121 | + """ |
| 122 | + try: |
| 123 | + claims = self._verifier.verify(token) |
| 124 | + return FpnvToken(claims) |
| 125 | + except Exception as error: |
| 126 | + raise InvalidArgumentError( |
| 127 | + 'Failed to verify token: {0}'.format(error) |
| 128 | + ) |
| 129 | + |
| 130 | + |
| 131 | +class _FpnvTokenVerifier: |
| 132 | + """Internal class for verifying FPNV JWTs signed with ES256.""" |
| 133 | + _jwks_client = None |
| 134 | + _project_id = None |
| 135 | + |
| 136 | + def __init__(self, project_id): |
| 137 | + self._project_id = project_id |
| 138 | + self._jwks_client = PyJWKClient(_FPNV_JWKS_URL, lifespan=21600) |
| 139 | + |
| 140 | + def verify(self, token) -> Dict[str, Any]: |
| 141 | + _Validators.check_string("FPNV check token", token) |
| 142 | + try: |
| 143 | + self._validate_headers(jwt.get_unverified_header(token)) |
| 144 | + signing_key = self._jwks_client.get_signing_key_from_jwt(token) |
| 145 | + claims = self._validate_payload(token, signing_key.key) |
| 146 | + except (InvalidTokenError, DecodeError) as exception: |
| 147 | + raise ValueError( |
| 148 | + f'Verifying FPNV token failed. Error: {exception}' |
| 149 | + ) from exception |
| 150 | + |
| 151 | + return claims |
| 152 | + |
| 153 | + def _validate_headers(self, headers: Any) -> None: |
| 154 | + if headers.get('kid') is None: |
| 155 | + raise ValueError("FPNV has no 'kid' claim.") |
| 156 | + |
| 157 | + if headers.get('typ') != 'JWT': |
| 158 | + raise ValueError("The provided FPNV token has an incorrect type header") |
| 159 | + |
| 160 | + algorithm = headers.get('alg') |
| 161 | + if algorithm != _ALGORITHM_ES256: |
| 162 | + raise ValueError( |
| 163 | + 'The provided FPNV token has an incorrect alg header. ' |
| 164 | + f'Expected {_ALGORITHM_ES256} but got {algorithm}.' |
| 165 | + ) |
| 166 | + |
| 167 | + def _validate_payload(self, token: str, signing_key: str) -> Dict[str, Any]: |
| 168 | + """Decodes and verifies the token.""" |
| 169 | + _issuer = None |
| 170 | + payload = {} |
| 171 | + try: |
| 172 | + unsafe_payload = jwt.decode(token, options={"verify_signature": False}) |
| 173 | + _issuer = unsafe_payload.get('iss') |
| 174 | + |
| 175 | + if _issuer is None: |
| 176 | + raise ValueError('The provided FPNV token has no issuer.') |
| 177 | + payload = jwt.decode( |
| 178 | + token, |
| 179 | + signing_key, |
| 180 | + algorithms=[_ALGORITHM_ES256], |
| 181 | + audience=_issuer |
| 182 | + ) |
| 183 | + except InvalidSignatureError as exception: |
| 184 | + raise ValueError( |
| 185 | + 'The provided FPNV token has an invalid signature.' |
| 186 | + ) from exception |
| 187 | + except InvalidAudienceError as exception: |
| 188 | + raise ValueError( |
| 189 | + 'The provided FPNV token has an incorrect "aud" (audience) claim. ' |
| 190 | + f'Expected payload to include {_issuer}.' |
| 191 | + ) from exception |
| 192 | + except InvalidIssuerError as exception: |
| 193 | + raise ValueError( |
| 194 | + 'The provided FPNV token has an incorrect "iss" (issuer) claim. ' |
| 195 | + f'Expected claim to include {_issuer}' |
| 196 | + ) from exception |
| 197 | + except ExpiredSignatureError as exception: |
| 198 | + raise ValueError( |
| 199 | + 'The provided FPNV token has expired.' |
| 200 | + ) from exception |
| 201 | + except InvalidTokenError as exception: |
| 202 | + raise ValueError( |
| 203 | + f'Decoding FPNV token failed. Error: {exception}' |
| 204 | + ) from exception |
| 205 | + |
| 206 | + if not payload.get('iss').startswith(_FPNV_ISSUER): |
| 207 | + raise ValueError('Token does not contain the correct "iss" (issuer).') |
| 208 | + _Validators.check_string( |
| 209 | + 'The provided FPNV token "sub" (subject) claim', |
| 210 | + payload.get('sub')) |
| 211 | + |
| 212 | + return payload |
| 213 | + |
| 214 | + |
| 215 | +class _Validators: |
| 216 | + """A collection of data validation utilities. |
| 217 | +
|
| 218 | + Methods provided in this class raise ``ValueErrors`` if any validations fail. |
| 219 | + """ |
| 220 | + |
| 221 | + @classmethod |
| 222 | + def check_string(cls, label: str, value: Any): |
| 223 | + """Checks if the given value is a string.""" |
| 224 | + if value is None: |
| 225 | + raise ValueError(f'{label} "{value}" must be a non-empty string.') |
| 226 | + if not isinstance(value, str): |
| 227 | + raise ValueError(f'{label} "{value}" must be a string.') |
0 commit comments