diff --git a/src/app/core/config.py b/src/app/core/config.py index 4730131..a6cd8a6 100644 --- a/src/app/core/config.py +++ b/src/app/core/config.py @@ -2,6 +2,7 @@ from enum import Enum from pydantic_settings import BaseSettings +from pydantic import SecretStr from starlette.config import Config current_file_dir = os.path.dirname(os.path.realpath(__file__)) @@ -19,7 +20,7 @@ class AppSettings(BaseSettings): class CryptSettings(BaseSettings): - SECRET_KEY: str = config("SECRET_KEY") + SECRET_KEY: SecretStr = config("SECRET_KEY", cast=SecretStr) ALGORITHM: str = config("ALGORITHM", default="HS256") ACCESS_TOKEN_EXPIRE_MINUTES: int = config("ACCESS_TOKEN_EXPIRE_MINUTES", default=30) REFRESH_TOKEN_EXPIRE_DAYS: int = config("REFRESH_TOKEN_EXPIRE_DAYS", default=7) diff --git a/src/app/core/security.py b/src/app/core/security.py index f0c9fa4..b7b4b03 100644 --- a/src/app/core/security.py +++ b/src/app/core/security.py @@ -6,6 +6,7 @@ from fastapi.security import OAuth2PasswordBearer from jose import JWTError, jwt from sqlalchemy.ext.asyncio import AsyncSession +from Pydantic import SecretStr from ..crud.crud_users import crud_users from .config import settings @@ -13,7 +14,7 @@ from .schemas import TokenBlacklistCreate, TokenData -SECRET_KEY = settings.SECRET_KEY +SECRET_KEY: SecretStr = settings.SECRET_KEY ALGORITHM = settings.ALGORITHM ACCESS_TOKEN_EXPIRE_MINUTES = settings.ACCESS_TOKEN_EXPIRE_MINUTES REFRESH_TOKEN_EXPIRE_DAYS = settings.REFRESH_TOKEN_EXPIRE_DAYS @@ -57,7 +58,7 @@ async def create_access_token(data: dict[str, Any], expires_delta: timedelta | N else: expire = datetime.now(UTC).replace(tzinfo=None) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire, "token_type": TokenType.ACCESS}) - encoded_jwt: str = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) + encoded_jwt: str = jwt.encode(to_encode, SECRET_KEY.get_secret_value(), algorithm=ALGORITHM) return encoded_jwt @@ -68,7 +69,7 @@ async def create_refresh_token(data: dict[str, Any], expires_delta: timedelta | else: expire = datetime.now(UTC).replace(tzinfo=None) + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS) to_encode.update({"exp": expire, "token_type": TokenType.REFRESH}) - encoded_jwt: str = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) + encoded_jwt: str = jwt.encode(to_encode, SECRET_KEY.get_secret_value(), algorithm=ALGORITHM) return encoded_jwt @@ -94,7 +95,7 @@ async def verify_token(token: str, expected_token_type: TokenType, db: AsyncSess return None try: - payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + payload = jwt.decode(token, SECRET_KEY.get_secret_value(), algorithms=[ALGORITHM]) username_or_email: str = payload.get("sub") token_type: str = payload.get("token_type") @@ -120,7 +121,7 @@ async def blacklist_tokens(access_token: str, refresh_token: str, db: AsyncSessi Database session for performing database operations. """ for token in [access_token, refresh_token]: - payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + payload = jwt.decode(token, SECRET_KEY.get_secret_value(), algorithms=[ALGORITHM]) expires_at = datetime.fromtimestamp(payload.get("exp")) await crud_token_blacklist.create( db, @@ -131,7 +132,7 @@ async def blacklist_tokens(access_token: str, refresh_token: str, db: AsyncSessi ) async def blacklist_token(token: str, db: AsyncSession) -> None: - payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + payload = jwt.decode(token, SECRET_KEY.get_secret_value(), algorithms=[ALGORITHM]) expires_at = datetime.fromtimestamp(payload.get("exp")) await crud_token_blacklist.create( db,