Skip to content

Commit

Permalink
Merge pull request #1333 from ScilifelabDataCentre/DDS-1406-bug-maint…
Browse files Browse the repository at this point in the history
…enance-mode-blocks-login-for-super-admins

Allow login during maintenance
  • Loading branch information
i-oden authored Dec 8, 2022
2 parents cd9a538 + cbc9d9b commit 47b4eb3
Show file tree
Hide file tree
Showing 6 changed files with 691 additions and 2,809 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,4 @@ Please add a _short_ line describing the PR you make, if the PR implements a spe
## Sprint (2022-11-25 - 2022-12-09)

- Changed support email ([#1324](https://github.com/ScilifelabDataCentre/dds_web/pull/1324))
- Allow Super Admin login during maintenance ([#1333](https://github.com/ScilifelabDataCentre/dds_web/pull/1333))
1 change: 0 additions & 1 deletion dds_web/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ def prepare():

# Verify cli version compatible
if "api/v1" in flask.request.path:
block_if_maintenance()
verify_cli_version(version_cli=flask.request.headers.get("X-Cli-Version"))

# Get message of the day
Expand Down
13 changes: 12 additions & 1 deletion dds_web/security/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@

# Own modules
from dds_web import basic_auth, auth, mail
from dds_web.errors import AuthenticationError, AccessDeniedError, InviteError, TokenMissingError
from dds_web.errors import (
AuthenticationError,
AccessDeniedError,
InviteError,
TokenMissingError,
)
from dds_web.database import models
import dds_web.utils

Expand Down Expand Up @@ -233,6 +238,9 @@ def verify_token(token):
if not user:
raise AccessDeniedError(message="Invalid token. Try reauthenticating.")

# Block all users but Super Admins during maintenance
dds_web.utils.block_if_maintenance(user=user)

if user.password_reset:
token_expired = claims.get("exp")
token_issued = datetime.datetime.fromtimestamp(token_expired) - MFA_EXPIRES_IN
Expand Down Expand Up @@ -396,6 +404,9 @@ def verify_password(username, password):
user = models.User.query.get(username)

if user and user.is_active and user.verify_password(input_password=password):
# Block all users but Super Admins during maintenance
dds_web.utils.block_if_maintenance(user=user)

if not user.totp_enabled:
send_hotp_email(user)
return user
50 changes: 15 additions & 35 deletions dds_web/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,46 +575,26 @@ def calculate_version_period_usage(version):


# maintenance check
def block_if_maintenance():
def block_if_maintenance(user=None):
"""Block API requests if maintenance is ongoing and projects are busy."""
# Get maintenance row
maintenance: models.Maintenance = models.Maintenance.query.first()

# Possibly block request if maintenance ongoing / planned
if maintenance.active:
# Endpoints accepting requests during active maintenance / planned
project_required_endpoints: typing.List = [
f"/api/v1{resource}" for resource in ["/file/new", "/file/update", "/proj/busy"]
]
admin_endpoints: typing.List = [
f"/api/v1{x}"
for x in [
"/user/info",
"/maintenance",
"/unit/info/all",
"/motd",
"/motd/send",
"/proj/busy/any",
if not user:
# Endpoints accepting requests during active maintenance - only login for non-logged in users
admin_endpoints: typing.List = [
"/api/v1/user/encrypted_token",
"/api/v1/user/second_factor",
]
]
approved_endpoints: typing.List = project_required_endpoints + admin_endpoints

# Request not to accepted endpoint
# OR request to accepted endpoint but project not specified or busy
current_endpoint: str = flask.request.path
if current_endpoint not in approved_endpoints:
# Request not accepted during maintenance
raise MaintenanceOngoingException()

# Request not to accepted endpoint
# OR request to accepted endpoint but project not specified or busy
current_endpoint: str = flask.request.path
if current_endpoint not in admin_endpoints:
# Request not accepted during maintenance
raise MaintenanceOngoingException()
else:
# Request accepted during maintenance but...
req_args = flask.request.args
if current_endpoint in project_required_endpoints:
# Request requires a project
if not (req_args and (project_id := req_args.get("project"))):
raise MaintenanceOngoingException()

# Request requires a busy project
if not models.Project.query.filter_by(
public_id=project_id, busy=True
).one_or_none():
raise MaintenanceOngoingException()
if user.role != "Super Admin":
raise MaintenanceOngoingException()
111 changes: 111 additions & 0 deletions tests/api/test_s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import flask
import http
import sqlalchemy
import typing

from tests import DDSEndpoint, DEFAULT_HEADER, UserAuth, USER_CREDENTIALS
from dds_web.database import models
from dds_web import db


def test_get_s3_info_unauthorized(client: flask.testing.FlaskClient) -> None:
"""Only Unit Admin and Unit Personnel can get this info."""
# Get project
project: models.Project = models.Project.query.first()

# Get users with access to project
unit_users = db.session.query(models.UnitUser).filter(
models.UnitUser.unit_id == project.unit_id
)

# Get users with no access to project
unit_users_no_access = db.session.query(models.UnitUser).filter(
models.UnitUser.unit_id != project.unit_id
)

# Returned info - expected
expected_return: typing.Dict = {
"safespring_project": project.responsible_unit.safespring_name,
"url": project.responsible_unit.safespring_endpoint,
"keys": {
"access_key": project.responsible_unit.safespring_access,
"secret_key": project.responsible_unit.safespring_secret,
},
"bucket": project.bucket,
}

# Try s3info - "/s3/proj"
# Super Admin --> No
super_admin_token = UserAuth(USER_CREDENTIALS["superadmin"]).token(client)
response = client.get(
DDSEndpoint.S3KEYS,
headers=super_admin_token,
query_string={"project": "public_project_id"},
)
assert response.status_code == http.HTTPStatus.FORBIDDEN

# Unit Admin, correct unit --> Yes
unit_admin: models.UnitUser = unit_users.filter(models.UnitUser.is_admin == True).first()
unit_admin_token = UserAuth(USER_CREDENTIALS[unit_admin.username]).token(client)
response = client.get(
DDSEndpoint.S3KEYS,
headers=unit_admin_token,
query_string={"project": "public_project_id"},
)
assert response.status_code == http.HTTPStatus.OK
response_json = response.json
for x, y in expected_return.items():
assert x in response_json
assert response_json[x] == y

# Unit Admin, incorrect unit --> No
unit_admin_no_access: models.UnitUser = unit_users_no_access.filter(
models.UnitUser.is_admin == True
).first()
unit_admin_no_access_token = UserAuth(USER_CREDENTIALS[unit_admin_no_access.username]).token(
client
)
response = client.get(
DDSEndpoint.S3KEYS,
headers=unit_admin_no_access_token,
query_string={"project": "public_project_id"},
)
assert response.status_code == http.HTTPStatus.FORBIDDEN

# Unit Personnel, correct unit --> Yes
unit_personnel: models.UnitUser = unit_users.filter(models.UnitUser.is_admin == False).first()
unit_personnel_token = UserAuth(USER_CREDENTIALS[unit_personnel.username]).token(client)
response = client.get(
DDSEndpoint.S3KEYS,
headers=unit_personnel_token,
query_string={"project": "public_project_id"},
)
assert response.status_code == http.HTTPStatus.OK
response_json = response.json
for x, y in expected_return.items():
assert x in response_json
assert response_json[x] == y

# Unit Personnel, incorrect unit --> No
unit_personnel_no_access: models.UnitUser = unit_users_no_access.filter(
models.UnitUser.is_admin == False
).first()
unit_personnel_no_access_token = UserAuth(
USER_CREDENTIALS[unit_personnel_no_access.username]
).token(client)
response = client.get(
DDSEndpoint.S3KEYS,
headers=unit_personnel_no_access_token,
query_string={"project": "public_project_id"},
)
assert response.status_code == http.HTTPStatus.FORBIDDEN

# Researcher --> No
researcher: models.ResearchUser = project.researchusers[0].researchuser
researcher_token = UserAuth(USER_CREDENTIALS[researcher.username]).token(client)
response = client.get(
DDSEndpoint.S3KEYS,
headers=researcher_token,
query_string={"project": "public_project_id"},
)
assert response.status_code == http.HTTPStatus.FORBIDDEN
Loading

0 comments on commit 47b4eb3

Please sign in to comment.