From 5b7ac0558ef89cdd386daddaf55a3d5ee3122a72 Mon Sep 17 00:00:00 2001 From: dwyfrequency Date: Thu, 29 Sep 2022 13:40:51 -0400 Subject: [PATCH] feat: Add function to verify an App Check token (#642) * Sketch out initial private methods and service * Remove unnecessary notes * Fix some lint issues * Fix style guide issues * Update code structure * Add pyjwt version to requirments & update code based on comments * Add app_id key for verified claims dict * Add initial test * Add tests for token headers * Add decode token test and notes * Updating requirements for mocks and note in test * Add verify token test and decode test * Update pytest-mock requirements * Add tests for error messages * Update requirements for lifespan cache * update error message and test * Explicitly pass audience to jwt.decode and update key retrieval * Mock signing key * Update aud check logic and tests * Remove print statement * Update method doc string * Add test for decode_token error * Catch additional errors and add custom error messages for them * Mock out all the common errors * Updating error messages and tests per comments * Make jwks_client a class property * Add validation for the subject in the JWT payload * Update docs and error message strings --- .gitignore | 1 + firebase_admin/app_check.py | 150 ++++++++++++++++++++ requirements.txt | 2 + tests/test_app_check.py | 275 ++++++++++++++++++++++++++++++++++++ 4 files changed, 428 insertions(+) create mode 100644 firebase_admin/app_check.py create mode 100644 tests/test_app_check.py diff --git a/.gitignore b/.gitignore index 79d2d5ff..e5c1902d 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ apikey.txt htmlcov/ .pytest_cache/ .vscode/ +.venv/ diff --git a/firebase_admin/app_check.py b/firebase_admin/app_check.py new file mode 100644 index 00000000..91b0c4c3 --- /dev/null +++ b/firebase_admin/app_check.py @@ -0,0 +1,150 @@ +# Copyright 2022 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Firebase App Check module.""" + +from typing import Any, Dict +import jwt +from jwt import PyJWKClient, ExpiredSignatureError, InvalidTokenError +from jwt import InvalidAudienceError, InvalidIssuerError, InvalidSignatureError +from firebase_admin import _utils + +_APP_CHECK_ATTRIBUTE = '_app_check' + +def _get_app_check_service(app) -> Any: + return _utils.get_app_service(app, _APP_CHECK_ATTRIBUTE, _AppCheckService) + +def verify_token(token: str, app=None) -> Dict[str, Any]: + """Verifies a Firebase App Check token. + + Args: + token: A token from App Check. + app: An App instance (optional). + + Returns: + Dict[str, Any]: The token's decoded claims. + + Raises: + ValueError: If the app's ``project_id`` is invalid or unspecified, + or if the token's headers or payload are invalid. + """ + return _get_app_check_service(app).verify_token(token) + +class _AppCheckService: + """Service class that implements Firebase App Check functionality.""" + + _APP_CHECK_ISSUER = 'https://firebaseappcheck.googleapis.com/' + _JWKS_URL = 'https://firebaseappcheck.googleapis.com/v1/jwks' + _project_id = None + _scoped_project_id = None + _jwks_client = None + + def __init__(self, app): + # Validate and store the project_id to validate the JWT claims + self._project_id = app.project_id + if not self._project_id: + raise ValueError( + 'A project ID must be specified to access the App Check ' + 'service. Either set the projectId option, use service ' + 'account credentials, or set the ' + 'GOOGLE_CLOUD_PROJECT environment variable.') + self._scoped_project_id = 'projects/' + app.project_id + # Default lifespan is 300 seconds (5 minutes) so we change it to 21600 seconds (6 hours). + self._jwks_client = PyJWKClient(self._JWKS_URL, lifespan=21600) + + + def verify_token(self, token: str) -> Dict[str, Any]: + """Verifies a Firebase App Check token.""" + _Validators.check_string("app check token", token) + + # Obtain the Firebase App Check Public Keys + # Note: It is not recommended to hard code these keys as they rotate, + # but you should cache them for up to 6 hours. + signing_key = self._jwks_client.get_signing_key_from_jwt(token) + self._has_valid_token_headers(jwt.get_unverified_header(token)) + verified_claims = self._decode_and_verify(token, signing_key.key) + + verified_claims['app_id'] = verified_claims.get('sub') + return verified_claims + + def _has_valid_token_headers(self, headers: Any) -> None: + """Checks whether the token has valid headers for App Check.""" + # Ensure the token's header has type JWT + if headers.get('typ') != 'JWT': + raise ValueError("The provided App Check token has an incorrect type header") + # Ensure the token's header uses the algorithm RS256 + algorithm = headers.get('alg') + if algorithm != 'RS256': + raise ValueError( + 'The provided App Check token has an incorrect alg header. ' + f'Expected RS256 but got {algorithm}.' + ) + + def _decode_and_verify(self, token: str, signing_key: str): + """Decodes and verifies the token from App Check.""" + payload = {} + try: + payload = jwt.decode( + token, + signing_key, + algorithms=["RS256"], + audience=self._scoped_project_id + ) + except InvalidSignatureError: + raise ValueError( + 'The provided App Check token has an invalid signature.' + ) + except InvalidAudienceError: + raise ValueError( + 'The provided App Check token has an incorrect "aud" (audience) claim. ' + f'Expected payload to include {self._scoped_project_id}.' + ) + except InvalidIssuerError: + raise ValueError( + 'The provided App Check token has an incorrect "iss" (issuer) claim. ' + f'Expected claim to include {self._APP_CHECK_ISSUER}' + ) + except ExpiredSignatureError: + raise ValueError( + 'The provided App Check token has expired.' + ) + except InvalidTokenError as exception: + raise ValueError( + f'Decoding App Check token failed. Error: {exception}' + ) + + audience = payload.get('aud') + if not isinstance(audience, list) or self._scoped_project_id not in audience: + raise ValueError('Firebase App Check token has incorrect "aud" (audience) claim.') + if not payload.get('iss').startswith(self._APP_CHECK_ISSUER): + raise ValueError('Token does not contain the correct "iss" (issuer).') + _Validators.check_string( + 'The provided App Check token "sub" (subject) claim', + payload.get('sub')) + + return payload + +class _Validators: + """A collection of data validation utilities. + + Methods provided in this class raise ``ValueErrors`` if any validations fail. + """ + + @classmethod + def check_string(cls, label: str, value: Any): + """Checks if the given value is a string.""" + if value is None: + raise ValueError('{0} "{1}" must be a non-empty string.'.format(label, value)) + if not isinstance(value, str): + raise ValueError('{0} "{1}" must be a string.'.format(label, value)) diff --git a/requirements.txt b/requirements.txt index 87142fe9..c6621267 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,9 +4,11 @@ pytest >= 6.2.0 pytest-cov >= 2.4.0 pytest-localserver >= 0.4.1 pytest-asyncio >= 0.16.0 +pytest-mock >= 3.6.1 cachecontrol >= 0.12.6 google-api-core[grpc] >= 1.22.1, < 3.0.0dev; platform.python_implementation != 'PyPy' google-api-python-client >= 1.7.8 google-cloud-firestore >= 2.1.0; platform.python_implementation != 'PyPy' google-cloud-storage >= 1.37.1 +pyjwt[crypto] >= 2.5.0 \ No newline at end of file diff --git a/tests/test_app_check.py b/tests/test_app_check.py new file mode 100644 index 00000000..168d0a97 --- /dev/null +++ b/tests/test_app_check.py @@ -0,0 +1,275 @@ +# Copyright 2022 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test cases for the firebase_admin.app_check module.""" +import base64 +import pytest + +from jwt import PyJWK, InvalidAudienceError, InvalidIssuerError +from jwt import ExpiredSignatureError, InvalidSignatureError +import firebase_admin +from firebase_admin import app_check +from tests import testutils + +NON_STRING_ARGS = [list(), tuple(), dict(), True, False, 1, 0] + +APP_ID = "1234567890" +PROJECT_ID = "1334" +SCOPED_PROJECT_ID = f"projects/{PROJECT_ID}" +ISSUER = "https://firebaseappcheck.googleapis.com/" +JWT_PAYLOAD_SAMPLE = { + "headers": { + "alg": "RS256", + "typ": "JWT" + }, + "sub": APP_ID, + "name": "John Doe", + "iss": ISSUER, + "aud": [SCOPED_PROJECT_ID] +} + +secret_key = "secret" +signing_key = { + "kty": "oct", + # Using HS256 for simplicity, production key will use RS256 + "alg": "HS256", + "k": base64.urlsafe_b64encode(secret_key.encode()) +} + +class TestBatch: + + @classmethod + def setup_class(cls): + cred = testutils.MockCredential() + firebase_admin.initialize_app(cred, {'projectId': PROJECT_ID}) + + @classmethod + def teardown_class(cls): + testutils.cleanup_apps() + +class TestVerifyToken(TestBatch): + + def test_no_project_id(self): + def evaluate(): + app = firebase_admin.initialize_app(testutils.MockCredential(), name='no_project_id') + with pytest.raises(ValueError): + app_check.verify_token(token="app_check_token", app=app) + testutils.run_without_project_id(evaluate) + + @pytest.mark.parametrize('token', NON_STRING_ARGS) + def test_verify_token_with_non_string_raises_error(self, token): + with pytest.raises(ValueError) as excinfo: + app_check.verify_token(token) + expected = 'app check token "{0}" must be a string.'.format(token) + assert str(excinfo.value) == expected + + def test_has_valid_token_headers(self): + app = firebase_admin.get_app() + app_check_service = app_check._get_app_check_service(app) + + headers = {"alg": "RS256", 'typ': "JWT"} + assert app_check_service._has_valid_token_headers(headers=headers) is None + + def test_has_valid_token_headers_with_incorrect_type_raises_error(self): + app = firebase_admin.get_app() + app_check_service = app_check._get_app_check_service(app) + headers = {"alg": "RS256", 'typ': "WRONG"} + with pytest.raises(ValueError) as excinfo: + app_check_service._has_valid_token_headers(headers=headers) + + expected = 'The provided App Check token has an incorrect type header' + assert str(excinfo.value) == expected + + def test_has_valid_token_headers_with_incorrect_algorithm_raises_error(self): + app = firebase_admin.get_app() + app_check_service = app_check._get_app_check_service(app) + headers = {"alg": "HS256", 'typ': "JWT"} + with pytest.raises(ValueError) as excinfo: + app_check_service._has_valid_token_headers(headers=headers) + + expected = ('The provided App Check token has an incorrect alg header. ' + 'Expected RS256 but got HS256.') + assert str(excinfo.value) == expected + + def test_decode_and_verify(self, mocker): + jwt_decode_mock = mocker.patch("jwt.decode", return_value=JWT_PAYLOAD_SAMPLE) + app = firebase_admin.get_app() + app_check_service = app_check._get_app_check_service(app) + payload = app_check_service._decode_and_verify( + token=None, + signing_key="1234", + ) + + jwt_decode_mock.assert_called_once_with( + None, "1234", algorithms=["RS256"], audience=SCOPED_PROJECT_ID) + assert payload == JWT_PAYLOAD_SAMPLE.copy() + + def test_decode_and_verify_with_incorrect_token_and_key(self): + app = firebase_admin.get_app() + app_check_service = app_check._get_app_check_service(app) + with pytest.raises(ValueError) as excinfo: + app_check_service._decode_and_verify( + token="1232132", + signing_key=signing_key, + ) + + expected = ( + 'Decoding App Check token failed. Error: Not enough segments') + assert str(excinfo.value) == expected + + def test_decode_and_verify_with_expired_token_raises_error(self, mocker): + mocker.patch("jwt.decode", side_effect=ExpiredSignatureError) + app = firebase_admin.get_app() + app_check_service = app_check._get_app_check_service(app) + with pytest.raises(ValueError) as excinfo: + app_check_service._decode_and_verify( + token="1232132", + signing_key=signing_key, + ) + + expected = ( + 'The provided App Check token has expired.') + assert str(excinfo.value) == expected + + def test_decode_and_verify_with_invalid_signature_raises_error(self, mocker): + mocker.patch("jwt.decode", side_effect=InvalidSignatureError) + app = firebase_admin.get_app() + app_check_service = app_check._get_app_check_service(app) + with pytest.raises(ValueError) as excinfo: + app_check_service._decode_and_verify( + token="1232132", + signing_key=signing_key, + ) + + expected = ( + 'The provided App Check token has an invalid signature.') + assert str(excinfo.value) == expected + + def test_decode_and_verify_with_invalid_aud_raises_error(self, mocker): + mocker.patch("jwt.decode", side_effect=InvalidAudienceError) + app = firebase_admin.get_app() + app_check_service = app_check._get_app_check_service(app) + with pytest.raises(ValueError) as excinfo: + app_check_service._decode_and_verify( + token="1232132", + signing_key=signing_key, + ) + + expected = ( + 'The provided App Check token has an incorrect "aud" (audience) claim. ' + f'Expected payload to include {SCOPED_PROJECT_ID}.') + assert str(excinfo.value) == expected + + def test_decode_and_verify_with_invalid_iss_raises_error(self, mocker): + mocker.patch("jwt.decode", side_effect=InvalidIssuerError) + app = firebase_admin.get_app() + app_check_service = app_check._get_app_check_service(app) + with pytest.raises(ValueError) as excinfo: + app_check_service._decode_and_verify( + token="1232132", + signing_key=signing_key, + ) + + expected = ( + 'The provided App Check token has an incorrect "iss" (issuer) claim. ' + f'Expected claim to include {ISSUER}') + assert str(excinfo.value) == expected + + def test_decode_and_verify_with_none_sub_raises_error(self, mocker): + jwt_with_none_sub = JWT_PAYLOAD_SAMPLE.copy() + jwt_with_none_sub['sub'] = None + mocker.patch("jwt.decode", return_value=jwt_with_none_sub) + app = firebase_admin.get_app() + app_check_service = app_check._get_app_check_service(app) + with pytest.raises(ValueError) as excinfo: + app_check_service._decode_and_verify( + token="1232132", + signing_key=signing_key, + ) + + expected = ( + 'The provided App Check token "sub" (subject) claim ' + f'"{None}" must be a non-empty string.') + assert str(excinfo.value) == expected + + def test_decode_and_verify_with_non_string_sub_raises_error(self, mocker): + sub_number = 1234 + jwt_with_none_sub = JWT_PAYLOAD_SAMPLE.copy() + jwt_with_none_sub['sub'] = sub_number + mocker.patch("jwt.decode", return_value=jwt_with_none_sub) + app = firebase_admin.get_app() + app_check_service = app_check._get_app_check_service(app) + with pytest.raises(ValueError) as excinfo: + app_check_service._decode_and_verify( + token="1232132", + signing_key=signing_key, + ) + + expected = ( + 'The provided App Check token "sub" (subject) claim ' + f'"{sub_number}" must be a string.') + assert str(excinfo.value) == expected + + def test_verify_token(self, mocker): + mocker.patch("jwt.decode", return_value=JWT_PAYLOAD_SAMPLE) + mocker.patch("jwt.PyJWKClient.get_signing_key_from_jwt", return_value=PyJWK(signing_key)) + mocker.patch("jwt.get_unverified_header", return_value=JWT_PAYLOAD_SAMPLE.get("headers")) + app = firebase_admin.get_app() + + payload = app_check.verify_token("encoded", app) + expected = JWT_PAYLOAD_SAMPLE.copy() + expected['app_id'] = APP_ID + assert payload == expected + + def test_verify_token_with_non_list_audience_raises_error(self, mocker): + jwt_with_non_list_audience = JWT_PAYLOAD_SAMPLE.copy() + jwt_with_non_list_audience["aud"] = '1234' + mocker.patch("jwt.decode", return_value=jwt_with_non_list_audience) + mocker.patch("jwt.PyJWKClient.get_signing_key_from_jwt", return_value=PyJWK(signing_key)) + mocker.patch("jwt.get_unverified_header", return_value=JWT_PAYLOAD_SAMPLE.get("headers")) + app = firebase_admin.get_app() + + with pytest.raises(ValueError) as excinfo: + app_check.verify_token("encoded", app) + + expected = 'Firebase App Check token has incorrect "aud" (audience) claim.' + assert str(excinfo.value) == expected + + def test_verify_token_with_empty_list_audience_raises_error(self, mocker): + jwt_with_empty_list_audience = JWT_PAYLOAD_SAMPLE.copy() + jwt_with_empty_list_audience["aud"] = [] + mocker.patch("jwt.decode", return_value=jwt_with_empty_list_audience) + mocker.patch("jwt.PyJWKClient.get_signing_key_from_jwt", return_value=PyJWK(signing_key)) + mocker.patch("jwt.get_unverified_header", return_value=JWT_PAYLOAD_SAMPLE.get("headers")) + app = firebase_admin.get_app() + + with pytest.raises(ValueError) as excinfo: + app_check.verify_token("encoded", app) + + expected = 'Firebase App Check token has incorrect "aud" (audience) claim.' + assert str(excinfo.value) == expected + + def test_verify_token_with_incorrect_issuer_raises_error(self, mocker): + jwt_with_non_incorrect_issuer = JWT_PAYLOAD_SAMPLE.copy() + jwt_with_non_incorrect_issuer["iss"] = "https://dwyfrequency.googleapis.com/" + mocker.patch("jwt.decode", return_value=jwt_with_non_incorrect_issuer) + mocker.patch("jwt.PyJWKClient.get_signing_key_from_jwt", return_value=PyJWK(signing_key)) + mocker.patch("jwt.get_unverified_header", return_value=JWT_PAYLOAD_SAMPLE.get("headers")) + app = firebase_admin.get_app() + + with pytest.raises(ValueError) as excinfo: + app_check.verify_token("encoded", app) + + expected = 'Token does not contain the correct "iss" (issuer).' + assert str(excinfo.value) == expected