Skip to content

Commit

Permalink
Merge pull request #881 from ScilifelabDataCentre/add-docstrings
Browse files Browse the repository at this point in the history
Add docstrings
  • Loading branch information
i-oden authored Feb 15, 2022
2 parents dddaa44 + ea3ed09 commit b573bec
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 54 deletions.
3 changes: 3 additions & 0 deletions dds_web/api/schemas/custom_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@


class MyDateTimeField(marshmallow.fields.DateTime):
"""Custom date time field for marshmallow schemas."""

def _deserialize(self, value, attr, data, **kwargs):
"""Return a datetime.datetime object as marshmallow field."""
if isinstance(value, datetime.datetime):
return value
return super()._deserialize(value, attr, data, **kwargs)
2 changes: 0 additions & 2 deletions dds_web/api/schemas/file_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ class NewFileSchema(project_schemas.ProjectRequiredSchema):
@marshmallow.validates_schema(skip_on_field_errors=True)
def verify_file_not_exists(self, data, **kwargs):
"""Check that the file does not match anything already in the database."""

# Check that there is no such file in the database
project = data.get("project_row")
try:
Expand All @@ -64,7 +63,6 @@ def verify_file_not_exists(self, data, **kwargs):
@marshmallow.post_load
def return_items(self, data, **kwargs):
"""Create file object."""

new_file = models.File(
name=data.get("name"),
name_in_bucket=data.get("name_in_bucket"),
Expand Down
144 changes: 100 additions & 44 deletions dds_web/security/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,36 +33,52 @@
# FUNCTIONS ############################################################################ FUNCTIONS #
####################################################################################################

# Error handler -------------------------------------------------------------------- Error handler #


@basic_auth.error_handler
def auth_error(status):
"""Handles 401 (Unauthorized) or 403 (Forbidden) for basic authentication."""
return auth_error_common(status)


@auth.error_handler
def auth_error(status):
"""Handles 401 (Unauthorized) or 403 (Forbidden) for token authentication."""
return auth_error_common(status)


def auth_error_common(status):
"""Checks if status code is 401 or 403 and raises appropriate exception."""
if status == http.HTTPStatus.UNAUTHORIZED:
raise AuthenticationError()
elif status == http.HTTPStatus.FORBIDDEN:
raise AccessDeniedError(message="Insufficient credentials")


# User roles -------------------------------------------------------------------------- User roles #


@basic_auth.get_user_roles
def get_user_roles(user):
"""Gets the role corresponding to the current user when basic authentication is used."""
return get_user_roles_common(user)


@auth.get_user_roles
def get_user_roles(user):
"""Gets the role corresponding to the current user when token authentication is used."""
return get_user_roles_common(user)


def get_user_roles_common(user):
"""Return the users role as saved in the db."""
"""Get the user role.
If the user has Researcher role and a project, which the user has been set as an owner,
has been specified, the user role is returned as Project Owner. Otherwise, it is Researcher.
For all other users, return the value of the role set in the database table.
"""
if user.role == "Researcher":
project_public_id = flask.request.args.get("project")
if project_public_id:
Expand All @@ -71,74 +87,101 @@ def get_user_roles_common(user):
project_user = models.ProjectUsers.query.filter_by(
project_id=project.id, user_id=user.username
).first()
if project_user and project_user.owner is True:
if project_user and project_user.owner:
return "Project Owner"
return user.role


# Tokens ---------------------------------------------------------------------------------- Tokens #


def verify_token_no_data(token):
claims = __verify_general_token(token)
user = __user_from_subject(claims.get("sub"))
"""Verify token and return user row."""
claims = __verify_general_token(token=token)
user = __user_from_subject(subject=claims.get("sub"))
# Clean up sensitive information
del claims
gc.collect()

return user


def __base_verify_token_for_invite(token):
claims = __verify_general_token(token)
"""Verify token and return claims."""
claims = __verify_general_token(token=token)

# Subject (user) not a valid entry
if claims.get("sub"):
raise AuthenticationError(message="Invalid token")

return claims


def verify_invite_token(token):
claims = __base_verify_token_for_invite(token)
"""Verify token sent in user invite."""
claims = __base_verify_token_for_invite(token=token)

# Email information required
email = claims.get("inv")
if email:
return email, models.Invite.query.filter(models.Invite.email == email).first()
raise AuthenticationError(message="Invalid token")
if not email:
raise AuthenticationError(message="Invalid token")

return email, models.Invite.query.filter(models.Invite.email == email).first()


def matching_email_with_invite(token, email):
claims = __base_verify_token_for_invite(token)
"""Verify token and get email address."""
claims = __base_verify_token_for_invite(token=token)
return claims.get("inv") == email


def verify_invite_key(token):
claims = __base_verify_token_for_invite(token)
"""Verify token, email, invite and temporary key."""
claims = __base_verify_token_for_invite(token=token)

# Verify email in token
email = claims.get("inv")
if email:
invite = models.Invite.query.filter(models.Invite.email == email).first()
if invite:
temporary_key = bytes.fromhex(claims.get("sen_con"))
if verify_invite_temporary_key(invite, temporary_key):
return temporary_key
return None
if not email:
raise AuthenticationError(message="Invalid token")

# Verify that there's an invite for the current email
invite = models.Invite.query.filter(models.Invite.email == email).first()
if not invite:
raise InviteError(message="Invite could not be found!")
raise AuthenticationError(message="Invalid token")

# Verify temporary key from token claims
temporary_key = bytes.fromhex(claims.get("sen_con"))
if verify_invite_temporary_key(invite=invite, temporary_key=temporary_key):
return temporary_key


@auth.verify_token
def verify_token(token):
claims = __verify_general_token(token)
user = __user_from_subject(claims.get("sub"))
"""Verify token used in token authencation."""
claims = __verify_general_token(token=token)
user = __user_from_subject(subject=claims.get("sub"))

return handle_multi_factor_authentication(user, claims.get("mfa_auth_time"))
return handle_multi_factor_authentication(
user=user, mfa_auth_time_string=claims.get("mfa_auth_time")
)


def __verify_general_token(token):
"""Verifies the format, signature and expiration time of an encrypted and signed JWT token.
Raises AuthenticationError if token is invalid or absent, could raise other exceptions from
dependencies. On successful verification, it returns a dictionary of the claims in the token.
"""
Verifies the format, signature and expiration time of an encrypted and signed JWT token.
Raises AuthenticationError if token is invalid or absent, could raise other exceptions from dependencies.
On successful verification, it returns a dictionary of the claims in the token.
"""
# Token required
if not token:
raise AuthenticationError(message="No token")

# Verify token signature if signed or decrypt first if encrypted
try:
data = (
verify_token_signature(token)
verify_token_signature(token=token)
if token.count(".") == 2
else decrypt_and_verify_token_signature(token)
else decrypt_and_verify_token_signature(token=token)
)
except (ValueError, jwcrypto.common.JWException) as e:
# ValueError is raised when the token doesn't look right (for example no periods)
Expand All @@ -148,7 +191,7 @@ def __verify_general_token(token):
raise AuthenticationError(message="Invalid token")

expiration_time = data.get("exp")
# we use a hard check on top of the one from the dependency
# Use a hard check on top of the one from the dependency
# exp shouldn't be before now no matter what
if expiration_time and (
dds_web.utils.current_time() <= datetime.datetime.fromtimestamp(expiration_time)
Expand All @@ -159,15 +202,15 @@ def __verify_general_token(token):


def __user_from_subject(subject):
"""Get user row from username."""
if subject:
user = models.User.query.get(subject)
if user and user.is_active:
return user

return None


def handle_multi_factor_authentication(user, mfa_auth_time_string):
"""Verify multifactor authentication time frame."""
if user:
if mfa_auth_time_string:
mfa_auth_time = datetime.datetime.fromtimestamp(mfa_auth_time_string)
Expand All @@ -182,45 +225,59 @@ def handle_multi_factor_authentication(user, mfa_auth_time_string):
raise AuthenticationError(
message="Two-factor authentication is required! Please check your primary e-mail!"
)
return None


def send_hotp_email(user):
"""Send one time code via email."""
# Only send if the hotp has not been issued or if it's been more than 15 minutes since
# a hotp email was last sent
if not user.hotp_issue_time or (
user.hotp_issue_time
and (dds_web.utils.current_time() - user.hotp_issue_time > datetime.timedelta(minutes=15))
):
# Generate the one time code from the users specific hotp secret
hotp_value = user.generate_HOTP_token()
msg = dds_web.utils.create_one_time_password_email(user, hotp_value)

# Create and send email
msg = dds_web.utils.create_one_time_password_email(user=user, hotp_value=hotp_value)
mail.send(msg)
return True
return False


def extract_encrypted_token_content(token, username):
"""Extract the sensitive content from inside the encrypted token"""
content = decrypt_and_verify_token_signature(token)
return content.get("sen_con") if content.get("sub") == username else None
"""Extract the sensitive content from inside the encrypted token."""
content = decrypt_and_verify_token_signature(token=token)
if content.get("sub") == username:
return content.get("sen_con")


def decrypt_and_verify_token_signature(token):
"""Wrapper function that streamlines decryption and signature verification,
and returns the claims"""
return verify_token_signature(decrypt_token(token))
"""Streamline decryption and signature verification and return the claims."""
return verify_token_signature(token=decrypt_token(token=token))


def decrypt_token(token):
"""Decrypt the encrypted token and return
the signed token embedded inside"""
"""Decrypt the encrypted token.
Return the signed token embedded inside.
"""
# Get key used for encryption
key = jwk.JWK.from_password(flask.current_app.config.get("SECRET_KEY"))
# Decrypt token
decrypted_token = jwt.JWT(key=key, jwt=token)
return decrypted_token.claims


def verify_token_signature(token):
"""Verify the signature of the token and return the claims
such as subject/username on valid signature"""
"""Verify the signature of the token.
Return the claims such as subject/username on valid signature.
"""
# Get key used for signing
key = jwk.JWK.from_password(flask.current_app.config.get("SECRET_KEY"))

# Verify token
try:
jwttoken = jwt.JWT(key=key, jwt=token, algs=["HS256"])
return json.loads(jwttoken.claims)
Expand All @@ -238,4 +295,3 @@ def verify_password(username, password):
if user and user.is_active and user.verify_password(input_password=password):
send_hotp_email(user)
return user
return None
18 changes: 10 additions & 8 deletions dds_web/security/project_user_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ def __decrypt_with_aes(key, ciphertext, nonce, aad=None):
:param aad: Additional data that should be authenticated with the key, but is not encrypted. Can be None.
"""
try:
aesgcm = ciphers.aead.AESGCM(key)
return aesgcm.decrypt(nonce, ciphertext, aad)
aesgcm = ciphers.aead.AESGCM(key=key)
return aesgcm.decrypt(nonce=nonce, data=ciphertext, associated_data=aad)
except (cryptography.exceptions.InvalidTag, ValueError):
return None

Expand Down Expand Up @@ -184,14 +184,14 @@ def __encrypt_invite_private_key(invite, private_key):


def __decrypt_invite_private_key(invite, temporary_key):
"""Decrypt invite private key."""
if temporary_key and invite.private_key and invite.nonce:
return __decrypt_with_aes(
temporary_key,
invite.private_key,
invite.nonce,
key=temporary_key,
ciphertext=invite.private_key,
nonce=invite.nonce,
aad=b"private key for " + invite.email.encode(),
)
return None


def transfer_invite_private_key_to_user(invite, temporary_key, user):
Expand All @@ -207,11 +207,13 @@ def transfer_invite_private_key_to_user(invite, temporary_key, user):


def verify_invite_temporary_key(invite, temporary_key):
private_key_bytes = __decrypt_invite_private_key(invite, temporary_key)
"""Verify the temporary key generated for the specific user invite."""
private_key_bytes = __decrypt_invite_private_key(invite=invite, temporary_key=temporary_key)
if private_key_bytes and isinstance(
serialization.load_der_private_key(private_key_bytes, password=None),
serialization.load_der_private_key(data=private_key_bytes, password=None),
asymmetric.rsa.RSAPrivateKey,
):
# Clean up sensitive information
del private_key_bytes
gc.collect()
return True
Expand Down
1 change: 1 addition & 0 deletions dds_web/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ def page_query(q):


def create_one_time_password_email(user, hotp_value):
"""Create HOTP email."""
msg = flask_mail.Message(
"DDS One-Time Authentication Code",
recipients=[user.primary_email],
Expand Down

0 comments on commit b573bec

Please sign in to comment.