diff --git a/docs/rest_api.rst b/docs/rest_api.rst index 2887b3a4b..4dab1dffb 100644 --- a/docs/rest_api.rst +++ b/docs/rest_api.rst @@ -517,6 +517,28 @@ methods:: class ExampleApi(BaseApi): base_permissions = ['can_private'] +API Key Authentication +~~~~~~~~~~~~~~~~~~~~~~ + +In addition to JWT tokens, FAB supports API key authentication. API keys are long-lived +Bearer tokens useful for service-to-service communication or automation scripts. + +To enable API key authentication, set ``FAB_API_KEY_ENABLED = True`` in your config. + +Once enabled, you can use an API key in the same way as a JWT token:: + + $ curl http://localhost:8080/api/v1/example/private \ + -H "Authorization: Bearer sst_" + +API keys are distinguished from JWT tokens by their prefix (default: ``sst_``). When the +``protect()`` decorator receives a request with an API key: + +- If the key is **invalid**, the endpoint returns HTTP **401 Unauthorized**. +- If the key is valid but the user **lacks permission**, the endpoint returns HTTP **403 Forbidden**. +- If the key is valid and the user **has permission**, the request proceeds normally. + +The OpenAPI spec for protected endpoints lists both ``jwt`` and ``api_key`` as valid security +schemes, so clients can choose either authentication method. You can create an alternate JWT user loader, this can be useful if you want to use an external Authentication provider and map the JWT identity to your diff --git a/docs/security.rst b/docs/security.rst index 32b597caa..fa1586f94 100644 --- a/docs/security.rst +++ b/docs/security.rst @@ -581,6 +581,47 @@ The rate can be changed by adjusting ``AUTH_RATE_LIMIT`` to, for example, ``1 pe at the `documentation `_ of Flask-Limiter for more options and examples. +Authentication: API Keys +------------------------ + +FAB supports API key authentication as an alternative to JWT tokens. API keys are long-lived +credentials that can be used for service-to-service communication or automation. + +**Enabling API Key Authentication** + +Set the following in your config:: + + FAB_API_KEY_ENABLED = True + +**Creating API Keys** + +API keys are managed through the ``SecurityManager``. You can create keys programmatically:: + + from flask import current_app + + sm = current_app.appbuilder.sm + api_key = sm.create_api_key(user=user, name="my-service-key") + +The returned key string should be stored securely -- it cannot be retrieved again after creation. + +**Using API Keys** + +Pass the API key as a Bearer token in the ``Authorization`` header:: + + $ curl http://localhost:8080/api/v1/example/private \ + -H "Authorization: Bearer sst_" + +API keys use the same permission system as regular users. The key inherits the roles and +permissions of the user it belongs to. + +**Configuration Options** + +The following configuration options are available: + +- ``FAB_API_KEY_ENABLED`` -- Set to ``True`` to enable API key authentication (default: ``False``). +- ``FAB_API_KEY_PREFIXES`` -- List of prefixes that identify API keys vs JWT tokens + (default: ``["sst_"]``). + Role based ---------- diff --git a/flask_appbuilder/api/__init__.py b/flask_appbuilder/api/__init__.py index 4891b0822..df263b656 100644 --- a/flask_appbuilder/api/__init__.py +++ b/flask_appbuilder/api/__init__.py @@ -132,7 +132,7 @@ def wraps(self: "BaseApi", *args: Any, **kwargs: Any) -> Response: def rison( - schema: Optional[Dict[str, Any]] = None + schema: Optional[Dict[str, Any]] = None, ) -> Callable[[Callable[..., Any]], Callable[..., Any]]: """ Use this decorator to parse URI *Rison* arguments to @@ -701,7 +701,7 @@ def operation_helper( # Merge docs spec and override spec operation_spec.update(override_method_spec.get(method.lower(), {})) if self.get_method_permission(func.__name__): - operation_spec["security"] = [{"jwt": []}] + operation_spec["security"] = [{"jwt": []}, {"api_key": []}] operations[method.lower()] = operation_spec else: operations[method.lower()] = {} diff --git a/flask_appbuilder/security/api.py b/flask_appbuilder/security/api.py index 268333f9a..7d4a864e5 100644 --- a/flask_appbuilder/security/api.py +++ b/flask_appbuilder/security/api.py @@ -28,6 +28,13 @@ def add_apispec_components(self, api_spec): jwt_scheme = {"type": "http", "scheme": "bearer", "bearerFormat": "JWT"} api_spec.components.security_scheme("jwt", jwt_scheme) api_spec.components.security_scheme("jwt_refresh", jwt_scheme) + api_key_scheme = { + "type": "http", + "scheme": "bearer", + "bearerFormat": "API Key", + "description": "API key authentication using Bearer token", + } + api_spec.components.security_scheme("api_key", api_key_scheme) @expose("/login", methods=["POST"]) @safe diff --git a/flask_appbuilder/security/decorators.py b/flask_appbuilder/security/decorators.py index c2d5a9fe9..9fad8b9d0 100644 --- a/flask_appbuilder/security/decorators.py +++ b/flask_appbuilder/security/decorators.py @@ -99,6 +99,25 @@ def wraps(self, *args, **kwargs): permission_str, class_permission_name ): return f(self, *args, **kwargs) + # Check API key authentication (before JWT) + if current_app.config.get("FAB_API_KEY_ENABLED", False): + api_key_string = ( + current_app.appbuilder.sm.extract_api_key_from_request() + ) + if api_key_string is not None: + user = current_app.appbuilder.sm.validate_api_key(api_key_string) + if not user: + return self.response_401() + if current_app.appbuilder.sm.has_access( + permission_str, class_permission_name + ): + return f(self, *args, **kwargs) + log.warning( + LOGMSG_ERR_SEC_ACCESS_DENIED, + permission_str, + class_permission_name, + ) + return self.response_403() # if no browser login then verify JWT if not (self.allow_browser_login or allow_browser_login): verify_jwt_in_request() diff --git a/flask_appbuilder/security/manager.py b/flask_appbuilder/security/manager.py index 9d88e605a..8c62f4b77 100644 --- a/flask_appbuilder/security/manager.py +++ b/flask_appbuilder/security/manager.py @@ -575,6 +575,8 @@ def auth_rate_limit(self) -> str: @property def current_user(self): + if getattr(g, "_api_key_user", False) and hasattr(g, "user"): + return g.user if current_user.is_authenticated: return g.user elif current_user_jwt: @@ -1869,6 +1871,11 @@ def has_access(self, permission_name: str, view_name: str) -> bool: """ Check if current user or public has access to view or menu """ + # Check API key authenticated user first + if getattr(g, "_api_key_user", False) and hasattr(g, "user"): + user = g.user + if user and user.is_active: + return self._has_view_access(user, permission_name, view_name) if current_user.is_authenticated and current_user.is_active: return self._has_view_access(g.user, permission_name, view_name) elif current_user_jwt and current_user_jwt.is_active: @@ -2456,6 +2463,103 @@ def load_user_jwt(self, _jwt_header, jwt_data): g.user = user return user + """ + ---------------------- + API KEY AUTHENTICATION + ---------------------- + """ + + @staticmethod + def extract_api_key_from_request() -> Optional[str]: + """ + Extract an API key from the request's Authorization header. + + Checks for Bearer tokens that match configured API key prefixes + (FAB_API_KEY_PREFIXES config, default: ["sst_"]). + + Returns the raw API key string if a matching prefix is found, + or None if the token is not an API key (e.g., a JWT). + """ + auth_header = request.headers.get("Authorization", "") + if not auth_header.lower().startswith("bearer "): + return None + token = auth_header[7:].strip() + if not token: + return None + prefixes = current_app.config.get("FAB_API_KEY_PREFIXES", ["sst_"]) + for prefix in prefixes: + if token.startswith(prefix): + return token + return None + + def validate_api_key(self, api_key_string: str) -> Optional[Any]: + """ + Validate an API key and return the associated User if valid. + + Uses a fast lookup hash (configurable via FAB_API_KEY_LOOKUP_HASH_METHOD, + default: "sha256") for O(1) retrieval, then verifies against the slow + key_hash for defense in depth. + + Override in subclass to provide storage-specific implementation. + + :param api_key_string: The raw API key (e.g., "sst_abc123...") + :return: User object if valid, None otherwise + """ + raise NotImplementedError + + def create_api_key( + self, + user: Any, + name: str, + scopes: Optional[str] = None, + expires_on: Optional[datetime.datetime] = None, + ) -> Optional[Dict[str, Any]]: + """ + Create a new API key for a user. + + Override in subclass to provide storage-specific implementation. + + :param user: The user to create the key for + :param name: A friendly name for the key + :param scopes: Optional comma-separated scopes + :param expires_on: Optional expiration datetime + :return: Dict with key info including plaintext key (shown once) + """ + raise NotImplementedError + + def revoke_api_key(self, uuid: str) -> bool: + """ + Revoke an API key by UUID. + + Override in subclass to provide storage-specific implementation. + + :param uuid: The UUID of the key to revoke + :return: True if revoked, False if not found + """ + raise NotImplementedError + + def find_api_keys_for_user(self, user_id: int) -> List[Any]: + """ + Find all API keys for a user. + + Override in subclass to provide storage-specific implementation. + + :param user_id: The user's ID + :return: List of ApiKey objects + """ + raise NotImplementedError + + def get_api_key_by_uuid(self, uuid: str) -> Optional[Any]: + """ + Get an API key by its UUID. + + Override in subclass to provide storage-specific implementation. + + :param uuid: The API key's UUID + :return: ApiKey object or None + """ + raise NotImplementedError + @staticmethod def before_request(): g.user = current_user diff --git a/flask_appbuilder/security/sqla/apis/__init__.py b/flask_appbuilder/security/sqla/apis/__init__.py index 1743a8243..f96e7e8d0 100644 --- a/flask_appbuilder/security/sqla/apis/__init__.py +++ b/flask_appbuilder/security/sqla/apis/__init__.py @@ -1,3 +1,4 @@ +from flask_appbuilder.security.sqla.apis.api_key import ApiKeyApi # noqa: F401 from flask_appbuilder.security.sqla.apis.group import GroupApi # noqa: F401 from flask_appbuilder.security.sqla.apis.permission import PermissionApi # noqa: F401 from flask_appbuilder.security.sqla.apis.permission_view_menu import ( # noqa: F401 diff --git a/flask_appbuilder/security/sqla/apis/api_key/__init__.py b/flask_appbuilder/security/sqla/apis/api_key/__init__.py new file mode 100644 index 000000000..5b07d6f0e --- /dev/null +++ b/flask_appbuilder/security/sqla/apis/api_key/__init__.py @@ -0,0 +1 @@ +from .api import ApiKeyApi # noqa: F401 diff --git a/flask_appbuilder/security/sqla/apis/api_key/api.py b/flask_appbuilder/security/sqla/apis/api_key/api.py new file mode 100644 index 000000000..96a5cd2d6 --- /dev/null +++ b/flask_appbuilder/security/sqla/apis/api_key/api.py @@ -0,0 +1,184 @@ +from flask import current_app, request +from flask_appbuilder.api import BaseApi, expose, safe +from flask_appbuilder.const import API_RESULT_RES_KEY +from flask_appbuilder.security.decorators import permission_name, protect +from flask_appbuilder.security.sqla.apis.api_key.schema import ( + ApiKeyCreateResponseSchema, + ApiKeyPostSchema, + ApiKeyResponseSchema, +) +from marshmallow import ValidationError + + +class ApiKeyApi(BaseApi): + resource_name = "security/api_keys" + openapi_spec_tag = "Security API Keys" + class_permission_name = "ApiKey" + allow_browser_login = True + + post_schema = ApiKeyPostSchema() + response_schema = ApiKeyResponseSchema() + create_response_schema = ApiKeyCreateResponseSchema() + openapi_spec_component_schemas = ( + ApiKeyPostSchema, + ApiKeyResponseSchema, + ApiKeyCreateResponseSchema, + ) + + @expose("/", methods=["GET"]) + @protect() + @safe + @permission_name("list") + def list_api_keys(self): + """List current user's API keys + --- + get: + responses: + 200: + description: List of API keys for the current user + content: + application/json: + schema: + type: object + properties: + result: + items: + $ref: '#/components/schemas/ApiKeyResponseSchema' + type: array + 401: + $ref: '#/components/responses/401' + 500: + $ref: '#/components/responses/500' + """ + sm = current_app.appbuilder.sm + user = sm.current_user + api_keys = sm.find_api_keys_for_user(user.id) + result = self.response_schema.dump(api_keys, many=True) + return self.response(200, **{API_RESULT_RES_KEY: result}) + + @expose("/", methods=["POST"]) + @protect() + @safe + @permission_name("create") + def create_api_key(self): + """Create a new API key + --- + post: + requestBody: + description: API key creation parameters + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/ApiKeyPostSchema' + responses: + 201: + description: API key created successfully + content: + application/json: + schema: + type: object + properties: + result: + $ref: '#/components/schemas/ApiKeyCreateResponseSchema' + 400: + $ref: '#/components/responses/400' + 401: + $ref: '#/components/responses/401' + 500: + $ref: '#/components/responses/500' + """ + try: + item = self.post_schema.load(request.json) + except ValidationError as error: + return self.response_400(message=error.messages) + + sm = current_app.appbuilder.sm + user = sm.current_user + result = sm.create_api_key( + user=user, + name=item["name"], + scopes=item.get("scopes"), + expires_on=item.get("expires_on"), + ) + return self.response( + 201, + **{API_RESULT_RES_KEY: self.create_response_schema.dump(result)}, + ) + + @expose("/", methods=["GET"]) + @protect() + @safe + @permission_name("get") + def get_api_key(self, key_uuid): + """Get a single API key info + --- + get: + parameters: + - in: path + schema: + type: string + name: key_uuid + responses: + 200: + description: API key details + content: + application/json: + schema: + type: object + properties: + result: + $ref: '#/components/schemas/ApiKeyResponseSchema' + 401: + $ref: '#/components/responses/401' + 404: + $ref: '#/components/responses/404' + 500: + $ref: '#/components/responses/500' + """ + sm = current_app.appbuilder.sm + api_key = sm.get_api_key_by_uuid(key_uuid) + user = sm.current_user + if not api_key or api_key.user_id != user.id: + return self.response_404() + result = self.response_schema.dump(api_key) + return self.response(200, **{API_RESULT_RES_KEY: result}) + + @expose("/", methods=["DELETE"]) + @protect() + @safe + @permission_name("revoke") + def revoke_api_key(self, key_uuid): + """Revoke an API key + --- + delete: + parameters: + - in: path + schema: + type: string + name: key_uuid + responses: + 200: + description: API key revoked + content: + application/json: + schema: + type: object + properties: + message: + type: string + 401: + $ref: '#/components/responses/401' + 404: + $ref: '#/components/responses/404' + 500: + $ref: '#/components/responses/500' + """ + sm = current_app.appbuilder.sm + api_key = sm.get_api_key_by_uuid(key_uuid) + user = sm.current_user + if not api_key or api_key.user_id != user.id: + return self.response_404() + + sm.revoke_api_key(key_uuid) + return self.response(200, message="API key revoked") diff --git a/flask_appbuilder/security/sqla/apis/api_key/schema.py b/flask_appbuilder/security/sqla/apis/api_key/schema.py new file mode 100644 index 000000000..be6731f9c --- /dev/null +++ b/flask_appbuilder/security/sqla/apis/api_key/schema.py @@ -0,0 +1,40 @@ +from marshmallow import fields, Schema + + +class ApiKeyPostSchema(Schema): + name = fields.String( + required=True, + metadata={"description": "A friendly name for this API key"}, + ) + scopes = fields.String( + load_default=None, + metadata={"description": "Comma-separated scopes (optional)"}, + ) + expires_on = fields.DateTime( + load_default=None, + metadata={"description": "Expiration datetime in ISO format (optional)"}, + ) + + +class ApiKeyResponseSchema(Schema): + uuid = fields.String(metadata={"description": "Unique identifier for the key"}) + name = fields.String(metadata={"description": "Friendly name"}) + key_prefix = fields.String(metadata={"description": "Key prefix (e.g., sst_)"}) + scopes = fields.String(metadata={"description": "Comma-separated scopes"}) + active = fields.Boolean(metadata={"description": "Whether the key is active"}) + created_on = fields.DateTime(metadata={"description": "Creation timestamp"}) + expires_on = fields.DateTime(metadata={"description": "Expiration timestamp"}) + revoked_on = fields.DateTime(metadata={"description": "Revocation timestamp"}) + last_used_on = fields.DateTime(metadata={"description": "Last usage timestamp"}) + + +class ApiKeyCreateResponseSchema(Schema): + uuid = fields.String(metadata={"description": "Unique identifier for the key"}) + name = fields.String(metadata={"description": "Friendly name"}) + key = fields.String( + metadata={"description": "The plaintext API key (only shown once at creation)"}, + ) + key_prefix = fields.String(metadata={"description": "Key prefix (e.g., sst_)"}) + scopes = fields.String(metadata={"description": "Comma-separated scopes"}) + created_on = fields.DateTime(metadata={"description": "Creation timestamp"}) + expires_on = fields.DateTime(metadata={"description": "Expiration timestamp"}) diff --git a/flask_appbuilder/security/sqla/manager.py b/flask_appbuilder/security/sqla/manager.py index 9a8158f3b..8589cd7c9 100755 --- a/flask_appbuilder/security/sqla/manager.py +++ b/flask_appbuilder/security/sqla/manager.py @@ -1,12 +1,14 @@ from __future__ import annotations from datetime import datetime +import hashlib import json import logging +import secrets from typing import Any, Dict, List, Optional, Tuple, Union import uuid -from flask import current_app, has_app_context +from flask import current_app, g, has_app_context from flask_appbuilder import const as c from flask_appbuilder import Model from flask_appbuilder.models.sqla.interface import SQLAInterface @@ -31,6 +33,7 @@ user_updating, ) from flask_appbuilder.security.sqla.apis import ( + ApiKeyApi, GroupApi, PermissionApi, PermissionViewMenuApi, @@ -39,6 +42,7 @@ ViewMenuApi, ) from flask_appbuilder.security.sqla.models import ( + ApiKey, assoc_permissionview_role, Group, Permission, @@ -52,8 +56,7 @@ from sqlalchemy import inspect from sqlalchemy.orm import contains_eager from sqlalchemy.orm.exc import MultipleResultsFound -from werkzeug.security import generate_password_hash - +from werkzeug.security import check_password_hash, generate_password_hash log = logging.getLogger(__name__) @@ -76,6 +79,7 @@ class SecurityManager(BaseSecurityManager): viewmenu_model = ViewMenu permissionview_model = PermissionView registeruser_model = RegisterUser + api_key_model = ApiKey # APIs permission_api = PermissionApi @@ -260,6 +264,8 @@ def register_views(self) -> None: self.appbuilder.add_api(self.view_menu_api) self.appbuilder.add_api(self.permission_view_menu_api) self.appbuilder.add_api(self.group_api) + if current_app.config.get("FAB_API_KEY_ENABLED", False): + self.appbuilder.add_api(ApiKeyApi) def create_db(self) -> None: if not current_app.config.get("FAB_CREATE_DB", True): @@ -276,7 +282,11 @@ def _create_db(self) -> None: engine = self.session.get_bind(mapper=None, clause=None) inspector = inspect(engine) existing_tables = inspector.get_table_names() - if "ab_user" not in existing_tables or "ab_group" not in existing_tables: + if ( + "ab_user" not in existing_tables + or "ab_group" not in existing_tables + or "ab_api_key" not in existing_tables + ): log.info(c.LOGMSG_INF_SEC_NO_DB) Model.metadata.create_all(engine) log.info(c.LOGMSG_INF_SEC_ADD_DB) @@ -1257,3 +1267,163 @@ def import_roles(self, path: str) -> None: session.add_all(roles) session.commit() + + """ + ---------------------- + API KEY MANAGEMENT + ---------------------- + """ + + @staticmethod + def _compute_lookup_hash(api_key_string: str) -> str: + """Compute a keyed hash for O(1) API key lookup. + + Uses scrypt with minimal work parameters (n=2, r=1, p=1) keyed + by the application SECRET_KEY. This is nearly as fast as a plain + hash while satisfying static analysis requirements for + computationally expensive hashing of sensitive data. + The actual password-strength protection is in key_hash + (via generate_password_hash). + """ + salt = b"fab-api-key-lookup" + try: + from flask import current_app + + secret = current_app.config.get("SECRET_KEY", "") + if secret: + salt = secret.encode("utf-8") + except RuntimeError: + pass + return hashlib.scrypt( + api_key_string.encode("utf-8"), + salt=salt, + n=2, + r=1, + p=1, + dklen=32, + ).hex() + + def validate_api_key(self, api_key_string: str) -> Optional[User]: + """ + Validate an API key and return the associated User if valid. + + Uses a fast lookup hash (indexed, unique) for O(1) retrieval, + then verifies against the slow key_hash for defense in depth. + Updates last_used_on, sets g.user and g._api_key_user. + """ + lookup = self._compute_lookup_hash(api_key_string) + api_key = ( + self.session.query(self.api_key_model) + .filter(self.api_key_model.lookup_hash == lookup) + .one_or_none() + ) + if api_key is None: + return None + if not check_password_hash(api_key.key_hash, api_key_string): + log.warning("API key lookup hash matched but slow hash failed") + return None + if not api_key.is_active: + log.warning("API key '%s' matched but is not active", api_key.name) + return None + user = api_key.user + if not user or not user.is_active: + log.warning("API key '%s' user is not active", api_key.name) + return None + # Update last_used_on + api_key.last_used_on = datetime.now() + self.session.commit() + # Set Flask globals + g.user = user + g._api_key_user = True + return user + + def create_api_key( + self, + user: Any, + name: str, + scopes: Optional[str] = None, + expires_on: Optional[datetime] = None, + ) -> Optional[Dict[str, Any]]: + """ + Create a new API key for a user. + + Returns a dict with key info including the plaintext key + (only shown once at creation time). + """ + prefixes = current_app.config.get("FAB_API_KEY_PREFIXES", ["sst_"]) + prefix = prefixes[0] if prefixes else "sst_" + raw_key = prefix + secrets.token_urlsafe(32) + # Use API key-specific hash config, falling back to password hash config + hash_method = current_app.config.get( + "FAB_API_KEY_HASH_METHOD", + current_app.config.get("FAB_PASSWORD_HASH_METHOD", "scrypt"), + ) + salt_length = current_app.config.get( + "FAB_API_KEY_HASH_SALT_LENGTH", + current_app.config.get("FAB_PASSWORD_HASH_SALT_LENGTH", 16), + ) + key_hash = generate_password_hash( + raw_key, + method=hash_method, + salt_length=salt_length, + ) + + api_key = self.api_key_model() + api_key.name = name + api_key.key_hash = key_hash + api_key.lookup_hash = self._compute_lookup_hash(raw_key) + api_key.key_prefix = prefix + api_key.user_id = user.id + api_key.scopes = scopes + api_key.active = True + api_key.expires_on = expires_on + + try: + self.session.add(api_key) + self.session.commit() + log.info("API key '%s' created for user '%s'", name, user.username) + return { + "uuid": api_key.uuid, + "name": api_key.name, + "key": raw_key, + "key_prefix": api_key.key_prefix, + "scopes": api_key.scopes, + "created_on": api_key.created_on, + "expires_on": api_key.expires_on, + } + except Exception as e: + log.error("Error creating API key: %s", e) + self.session.rollback() + return None + + def revoke_api_key(self, key_uuid: str) -> bool: + """Revoke an API key by UUID.""" + api_key = self.get_api_key_by_uuid(key_uuid) + if not api_key: + return False + try: + api_key.revoked_on = datetime.now() + self.session.commit() + log.info("API key '%s' revoked", api_key.name) + return True + except Exception as e: + log.error("Error revoking API key: %s", e) + self.session.rollback() + return False + + def find_api_keys_for_user(self, user_id: int) -> List[ApiKey]: + """Find all API keys for a user.""" + return ( + self.session.query(self.api_key_model) + .filter(self.api_key_model.user_id == user_id) + .order_by(self.api_key_model.created_on.desc()) + .all() + ) + + def get_api_key_by_uuid(self, key_uuid: str) -> Optional[ApiKey]: + """Get an API key by its UUID.""" + return ( + self.session.query(self.api_key_model) + .filter(self.api_key_model.uuid == key_uuid) + .one_or_none() + ) diff --git a/flask_appbuilder/security/sqla/models.py b/flask_appbuilder/security/sqla/models.py index ea24fb7af..fd7578cc8 100755 --- a/flask_appbuilder/security/sqla/models.py +++ b/flask_appbuilder/security/sqla/models.py @@ -1,5 +1,6 @@ import datetime from typing import List, Optional +from uuid import uuid4 from flask import g from flask_appbuilder import Model @@ -14,6 +15,7 @@ Sequence, String, Table, + Text, UniqueConstraint, ) from sqlalchemy.ext.declarative import declared_attr @@ -308,3 +310,60 @@ class RegisterUser(Model): DateTime, default=lambda: datetime.datetime.now(), nullable=True ) registration_hash: Mapped[Optional[str]] = mapped_column(String(256)) + + +class ApiKey(Model): + __tablename__ = "ab_api_key" + __table_args__ = ( + Index("idx_api_key_prefix", "key_prefix"), + Index("idx_api_key_user_id", "user_id"), + Index("idx_api_key_lookup_hash", "lookup_hash", unique=True), + ) + + id: Mapped[int] = mapped_column( + Integer, + Sequence("ab_api_key_id_seq", start=1, increment=1, minvalue=1, cycle=False), + primary_key=True, + ) + uuid: Mapped[str] = mapped_column( + String(36), unique=True, nullable=False, default=lambda: str(uuid4()) + ) + name: Mapped[str] = mapped_column(String(256), nullable=False) + key_hash: Mapped[str] = mapped_column(String(256), nullable=False) + lookup_hash: Mapped[Optional[str]] = mapped_column( + String(256), unique=True, nullable=True + ) + key_prefix: Mapped[str] = mapped_column(String(16), nullable=False) + user_id: Mapped[int] = mapped_column( + Integer, ForeignKey("ab_user.id", ondelete="CASCADE"), nullable=False + ) + user: Mapped["User"] = relationship( + "User", backref=backref("api_keys", cascade="all, delete-orphan") + ) + scopes: Mapped[Optional[str]] = mapped_column(Text, nullable=True) + active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False) + created_on: Mapped[Optional[datetime.datetime]] = mapped_column( + DateTime, default=lambda: datetime.datetime.now(), nullable=True + ) + expires_on: Mapped[Optional[datetime.datetime]] = mapped_column( + DateTime, nullable=True + ) + revoked_on: Mapped[Optional[datetime.datetime]] = mapped_column( + DateTime, nullable=True + ) + last_used_on: Mapped[Optional[datetime.datetime]] = mapped_column( + DateTime, nullable=True + ) + + @property + def is_active(self) -> bool: + if not self.active: + return False + if self.revoked_on is not None: + return False + if self.expires_on is not None and self.expires_on < datetime.datetime.now(): + return False + return True + + def __repr__(self): + return f"ApiKey(name={self.name}, prefix={self.key_prefix})" diff --git a/tests/security/test_api_key.py b/tests/security/test_api_key.py new file mode 100644 index 000000000..0e7b27e3e --- /dev/null +++ b/tests/security/test_api_key.py @@ -0,0 +1,432 @@ +import datetime +import json +import logging +import os +import unittest + +from flask import Flask +from flask_appbuilder import AppBuilder +from flask_appbuilder.security.sqla.models import ApiKey +from flask_appbuilder.utils.legacy import get_sqla_class +from tests.base import FABTestCase +from tests.const import PASSWORD_ADMIN, USERNAME_ADMIN + +basedir = os.path.abspath(os.path.dirname(__file__)) + + +class ApiKeyModelTestCase(unittest.TestCase): + """Test the ApiKey model's is_active property.""" + + def _make_key(self, **kwargs): + key = ApiKey() + key.name = kwargs.get("name", "test-key") + key.key_hash = kwargs.get("key_hash", "fakehash") + key.key_prefix = kwargs.get("key_prefix", "sst_") + key.active = kwargs.get("active", True) + key.revoked_on = kwargs.get("revoked_on", None) + key.expires_on = kwargs.get("expires_on", None) + return key + + def test_is_active_basic(self): + key = self._make_key() + self.assertTrue(key.is_active) + + def test_is_active_inactive(self): + key = self._make_key(active=False) + self.assertFalse(key.is_active) + + def test_is_active_revoked(self): + key = self._make_key(revoked_on=datetime.datetime(2024, 1, 1)) + self.assertFalse(key.is_active) + + def test_is_active_expired(self): + key = self._make_key(expires_on=datetime.datetime(2020, 1, 1)) # past date + self.assertFalse(key.is_active) + + def test_is_active_not_yet_expired(self): + key = self._make_key(expires_on=datetime.datetime(2099, 1, 1)) # future date + self.assertTrue(key.is_active) + + +class ApiKeySecurityManagerTestCase(FABTestCase): + """Test API key methods on the SecurityManager using a real Flask app.""" + + def setUp(self): + self.app = Flask(__name__) + self.app.config.from_object("tests.config_security_api") + self.app.config["FAB_API_KEY_ENABLED"] = True + self.app.config["FAB_API_KEY_PREFIXES"] = ["sst_"] + logging.basicConfig(level=logging.ERROR) + + self.ctx = self.app.app_context() + self.ctx.push() + SQLA = get_sqla_class() + self.db = SQLA(self.app) + self.appbuilder = AppBuilder(self.app, self.db.session) + self.create_default_users(self.appbuilder) + + def tearDown(self): + self.appbuilder.session.query(ApiKey).delete() + self.appbuilder.session.commit() + self.ctx.pop() + self.appbuilder = None + self.app = None + + def test_create_api_key(self): + user = self.appbuilder.sm.find_user(USERNAME_ADMIN) + result = self.appbuilder.sm.create_api_key(user=user, name="test-key") + + self.assertIsNotNone(result) + self.assertEqual(result["name"], "test-key") + self.assertTrue(result["key"].startswith("sst_")) + self.assertIn("uuid", result) + + def test_create_api_key_stores_lookup_hash(self): + user = self.appbuilder.sm.find_user(USERNAME_ADMIN) + result = self.appbuilder.sm.create_api_key(user=user, name="lookup-test") + raw_key = result["key"] + + api_key = self.appbuilder.sm.get_api_key_by_uuid(result["uuid"]) + expected_hash = self.appbuilder.sm._compute_lookup_hash(raw_key) + self.assertEqual(api_key.lookup_hash, expected_hash) + + def test_validate_api_key_uses_lookup_hash(self): + """Validate that O(1) lookup works via lookup_hash column.""" + user = self.appbuilder.sm.find_user(USERNAME_ADMIN) + result = self.appbuilder.sm.create_api_key(user=user, name="hash-lookup-test") + raw_key = result["key"] + + validated_user = self.appbuilder.sm.validate_api_key(raw_key) + self.assertIsNotNone(validated_user) + self.assertEqual(validated_user.username, USERNAME_ADMIN) + + def test_validate_api_key_valid(self): + user = self.appbuilder.sm.find_user(USERNAME_ADMIN) + result = self.appbuilder.sm.create_api_key(user=user, name="test-key") + raw_key = result["key"] + + validated_user = self.appbuilder.sm.validate_api_key(raw_key) + self.assertIsNotNone(validated_user) + self.assertEqual(validated_user.username, USERNAME_ADMIN) + + def test_validate_api_key_invalid(self): + validated_user = self.appbuilder.sm.validate_api_key("sst_invalid_key_12345") + self.assertIsNone(validated_user) + + def test_validate_api_key_revoked(self): + user = self.appbuilder.sm.find_user(USERNAME_ADMIN) + result = self.appbuilder.sm.create_api_key(user=user, name="revoke-test") + raw_key = result["key"] + + # Revoke the key + self.appbuilder.sm.revoke_api_key(result["uuid"]) + + validated_user = self.appbuilder.sm.validate_api_key(raw_key) + self.assertIsNone(validated_user) + + def test_validate_api_key_expired(self): + user = self.appbuilder.sm.find_user(USERNAME_ADMIN) + result = self.appbuilder.sm.create_api_key( + user=user, + name="expire-test", + expires_on=datetime.datetime(2020, 1, 1), + ) + raw_key = result["key"] + + validated_user = self.appbuilder.sm.validate_api_key(raw_key) + self.assertIsNone(validated_user) + + def test_revoke_api_key(self): + user = self.appbuilder.sm.find_user(USERNAME_ADMIN) + result = self.appbuilder.sm.create_api_key(user=user, name="revoke-me") + + success = self.appbuilder.sm.revoke_api_key(result["uuid"]) + self.assertTrue(success) + + api_key = self.appbuilder.sm.get_api_key_by_uuid(result["uuid"]) + self.assertIsNotNone(api_key.revoked_on) + + def test_revoke_nonexistent_key(self): + success = self.appbuilder.sm.revoke_api_key("nonexistent-uuid") + self.assertFalse(success) + + def test_find_api_keys_for_user(self): + user = self.appbuilder.sm.find_user(USERNAME_ADMIN) + self.appbuilder.sm.create_api_key(user=user, name="key-1") + self.appbuilder.sm.create_api_key(user=user, name="key-2") + + keys = self.appbuilder.sm.find_api_keys_for_user(user.id) + self.assertEqual(len(keys), 2) + names = {k.name for k in keys} + self.assertIn("key-1", names) + self.assertIn("key-2", names) + + def test_get_api_key_by_uuid(self): + user = self.appbuilder.sm.find_user(USERNAME_ADMIN) + result = self.appbuilder.sm.create_api_key(user=user, name="uuid-test") + + api_key = self.appbuilder.sm.get_api_key_by_uuid(result["uuid"]) + self.assertIsNotNone(api_key) + self.assertEqual(api_key.name, "uuid-test") + + def test_get_api_key_by_uuid_not_found(self): + api_key = self.appbuilder.sm.get_api_key_by_uuid("nonexistent") + self.assertIsNone(api_key) + + +class ApiKeySlowHashConfigTestCase(FABTestCase): + """Test configurable slow hash (key_hash) algorithm.""" + + def setUp(self): + self.app = Flask(__name__) + self.app.config.from_object("tests.config_security_api") + self.app.config["FAB_API_KEY_ENABLED"] = True + self.app.config["FAB_API_KEY_PREFIXES"] = ["sst_"] + # Use pbkdf2 with 1000 iterations for API keys (fast for tests) + self.app.config["FAB_API_KEY_HASH_METHOD"] = "pbkdf2:sha256:1000" + self.app.config["FAB_API_KEY_HASH_SALT_LENGTH"] = 8 + logging.basicConfig(level=logging.ERROR) + + self.ctx = self.app.app_context() + self.ctx.push() + SQLA = get_sqla_class() + self.db = SQLA(self.app) + self.appbuilder = AppBuilder(self.app, self.db.session) + self.create_default_users(self.appbuilder) + + def tearDown(self): + self.appbuilder.session.query(ApiKey).delete() + self.appbuilder.session.commit() + self.ctx.pop() + self.appbuilder = None + self.app = None + + def test_api_key_uses_custom_hash_method(self): + user = self.appbuilder.sm.find_user(USERNAME_ADMIN) + result = self.appbuilder.sm.create_api_key(user=user, name="pbkdf2-test") + + api_key = self.appbuilder.sm.get_api_key_by_uuid(result["uuid"]) + # key_hash should use pbkdf2:sha256 method, not scrypt + self.assertTrue(api_key.key_hash.startswith("pbkdf2:sha256:1000$")) + + def test_validate_with_custom_hash_method(self): + user = self.appbuilder.sm.find_user(USERNAME_ADMIN) + result = self.appbuilder.sm.create_api_key(user=user, name="pbkdf2-validate") + raw_key = result["key"] + + validated_user = self.appbuilder.sm.validate_api_key(raw_key) + self.assertIsNotNone(validated_user) + self.assertEqual(validated_user.username, USERNAME_ADMIN) + + def test_api_key_hash_independent_of_password_hash(self): + """API key hash config should be independent of password hash config.""" + # Password hash uses default scrypt, API key hash uses pbkdf2 + self.assertNotIn("FAB_PASSWORD_HASH_METHOD", self.app.config) + self.assertEqual( + self.app.config["FAB_API_KEY_HASH_METHOD"], "pbkdf2:sha256:1000" + ) + + user = self.appbuilder.sm.find_user(USERNAME_ADMIN) + result = self.appbuilder.sm.create_api_key(user=user, name="independent-test") + api_key = self.appbuilder.sm.get_api_key_by_uuid(result["uuid"]) + # Confirm API key uses pbkdf2, not the default scrypt + self.assertTrue(api_key.key_hash.startswith("pbkdf2:sha256:1000$")) + + +class ApiKeyEndpointTestCase(FABTestCase): + """Test the API key CRUD endpoints.""" + + def setUp(self): + self.app = Flask(__name__) + self.app.config.from_object("tests.config_security_api") + self.app.config["FAB_API_KEY_ENABLED"] = True + self.app.config["FAB_API_KEY_PREFIXES"] = ["sst_"] + logging.basicConfig(level=logging.ERROR) + + self.ctx = self.app.app_context() + self.ctx.push() + SQLA = get_sqla_class() + self.db = SQLA(self.app) + self.appbuilder = AppBuilder(self.app, self.db.session) + self.create_default_users(self.appbuilder) + self.client = self.app.test_client() + self.token = self.login(self.client, USERNAME_ADMIN, PASSWORD_ADMIN) + + def tearDown(self): + self.appbuilder.session.query(ApiKey).delete() + self.appbuilder.session.commit() + self.ctx.pop() + self.appbuilder = None + self.app = None + + def test_create_api_key_endpoint(self): + rv = self.auth_client_post( + self.client, + self.token, + "api/v1/security/api_keys/", + json={"name": "test-endpoint-key"}, + ) + self.assertEqual(rv.status_code, 201) + data = json.loads(rv.data) + self.assertIn("result", data) + self.assertIn("key", data["result"]) + self.assertTrue(data["result"]["key"].startswith("sst_")) + + def test_list_api_keys_endpoint(self): + # Create a key first + self.auth_client_post( + self.client, + self.token, + "api/v1/security/api_keys/", + json={"name": "list-test-key"}, + ) + + rv = self.auth_client_get(self.client, self.token, "api/v1/security/api_keys/") + self.assertEqual(rv.status_code, 200) + data = json.loads(rv.data) + self.assertIn("result", data) + self.assertGreaterEqual(len(data["result"]), 1) + + def test_get_api_key_endpoint(self): + rv = self.auth_client_post( + self.client, + self.token, + "api/v1/security/api_keys/", + json={"name": "get-test-key"}, + ) + key_uuid = json.loads(rv.data)["result"]["uuid"] + + rv = self.auth_client_get( + self.client, self.token, f"api/v1/security/api_keys/{key_uuid}" + ) + self.assertEqual(rv.status_code, 200) + data = json.loads(rv.data) + self.assertEqual(data["result"]["name"], "get-test-key") + # Plaintext key should NOT be returned on get + self.assertNotIn("key", data["result"]) + + def test_revoke_api_key_endpoint(self): + rv = self.auth_client_post( + self.client, + self.token, + "api/v1/security/api_keys/", + json={"name": "revoke-test-key"}, + ) + key_uuid = json.loads(rv.data)["result"]["uuid"] + + rv = self.auth_client_delete( + self.client, self.token, f"api/v1/security/api_keys/{key_uuid}" + ) + self.assertEqual(rv.status_code, 200) + + def test_revoke_nonexistent_key_endpoint(self): + rv = self.auth_client_delete( + self.client, self.token, "api/v1/security/api_keys/nonexistent-uuid" + ) + self.assertEqual(rv.status_code, 404) + + def test_create_api_key_no_name(self): + rv = self.auth_client_post( + self.client, + self.token, + "api/v1/security/api_keys/", + json={}, + ) + self.assertEqual(rv.status_code, 400) + + +class ApiKeyProtectDecoratorTestCase(FABTestCase): + """Test that @protect() accepts API key auth.""" + + def setUp(self): + self.app = Flask(__name__) + self.app.config.from_object("tests.config_security_api") + self.app.config["FAB_API_KEY_ENABLED"] = True + self.app.config["FAB_API_KEY_PREFIXES"] = ["sst_"] + logging.basicConfig(level=logging.ERROR) + + self.ctx = self.app.app_context() + self.ctx.push() + SQLA = get_sqla_class() + self.db = SQLA(self.app) + self.appbuilder = AppBuilder(self.app, self.db.session) + self.create_default_users(self.appbuilder) + self.client = self.app.test_client() + + def tearDown(self): + self.appbuilder.session.query(ApiKey).delete() + # Clean up test users created during tests + noperms_user = self.appbuilder.sm.find_user("noperms_user") + if noperms_user: + self.appbuilder.session.delete(noperms_user) + self.appbuilder.session.commit() + self.ctx.pop() + self.appbuilder = None + self.app = None + + def test_api_key_auth_on_protected_endpoint(self): + """Test that an API key works on a @protect() endpoint.""" + user = self.appbuilder.sm.find_user(USERNAME_ADMIN) + result = self.appbuilder.sm.create_api_key(user=user, name="protect-test") + api_key = result["key"] + + # Use the API key to list roles (a protected endpoint) + rv = self.client.get( + "api/v1/security/roles/", + headers={"Authorization": f"Bearer {api_key}"}, + ) + self.assertEqual(rv.status_code, 200) + + def test_invalid_api_key_on_protected_endpoint(self): + """Test that an invalid API key returns 401.""" + rv = self.client.get( + "api/v1/security/roles/", + headers={"Authorization": "Bearer sst_invalidkey12345"}, + ) + self.assertEqual(rv.status_code, 401) + + def test_valid_api_key_no_permission_returns_403(self): + """Test that a valid API key without permission returns 403.""" + # Create a role with no permissions and a user assigned to it + self.appbuilder.sm.add_role("NoPerms") + user = self.appbuilder.sm.add_user( + "noperms_user", + "No", + "Perms", + "noperms@fab.org", + self.appbuilder.sm.find_role("NoPerms"), + password="password", + ) + result = self.appbuilder.sm.create_api_key(user=user, name="no-perm-test") + api_key = result["key"] + + # User with NoPerms role should not have access to roles endpoint + rv = self.client.get( + "api/v1/security/roles/", + headers={"Authorization": f"Bearer {api_key}"}, + ) + self.assertEqual(rv.status_code, 403) + + def test_last_used_on_updated(self): + """Test that last_used_on is updated on API key use.""" + user = self.appbuilder.sm.find_user(USERNAME_ADMIN) + result = self.appbuilder.sm.create_api_key(user=user, name="usage-test") + api_key_raw = result["key"] + + # Verify last_used_on is initially None + api_key_obj = self.appbuilder.sm.get_api_key_by_uuid(result["uuid"]) + self.assertIsNone(api_key_obj.last_used_on) + + # Use the key + self.client.get( + "api/v1/security/roles/", + headers={"Authorization": f"Bearer {api_key_raw}"}, + ) + + # Refresh and check + self.appbuilder.session.refresh(api_key_obj) + self.assertIsNotNone(api_key_obj.last_used_on) + + +if __name__ == "__main__": + unittest.main()