diff --git a/CHANGELOG.md b/CHANGELOG.md index a476aef09..b7f4830bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -174,3 +174,4 @@ Please add a _short_ line describing the PR you make, if the PR implements a spe ## Sprint (2022-11-25 - 2022-12-09) - Changed support email ([#1324](https://github.com/ScilifelabDataCentre/dds_web/pull/1324)) +- Allow Super Admin login during maintenance ([#1333](https://github.com/ScilifelabDataCentre/dds_web/pull/1333)) diff --git a/dds_web/__init__.py b/dds_web/__init__.py index a88ee4d61..58e720d19 100644 --- a/dds_web/__init__.py +++ b/dds_web/__init__.py @@ -193,7 +193,6 @@ def prepare(): # Verify cli version compatible if "api/v1" in flask.request.path: - block_if_maintenance() verify_cli_version(version_cli=flask.request.headers.get("X-Cli-Version")) # Get message of the day diff --git a/dds_web/security/auth.py b/dds_web/security/auth.py index 579d31b10..d247d8662 100644 --- a/dds_web/security/auth.py +++ b/dds_web/security/auth.py @@ -18,7 +18,12 @@ # Own modules from dds_web import basic_auth, auth, mail -from dds_web.errors import AuthenticationError, AccessDeniedError, InviteError, TokenMissingError +from dds_web.errors import ( + AuthenticationError, + AccessDeniedError, + InviteError, + TokenMissingError, +) from dds_web.database import models import dds_web.utils @@ -233,6 +238,9 @@ def verify_token(token): if not user: raise AccessDeniedError(message="Invalid token. Try reauthenticating.") + # Block all users but Super Admins during maintenance + dds_web.utils.block_if_maintenance(user=user) + if user.password_reset: token_expired = claims.get("exp") token_issued = datetime.datetime.fromtimestamp(token_expired) - MFA_EXPIRES_IN @@ -396,6 +404,9 @@ def verify_password(username, password): user = models.User.query.get(username) if user and user.is_active and user.verify_password(input_password=password): + # Block all users but Super Admins during maintenance + dds_web.utils.block_if_maintenance(user=user) + if not user.totp_enabled: send_hotp_email(user) return user diff --git a/dds_web/utils.py b/dds_web/utils.py index 5adb0e3af..295170a8b 100644 --- a/dds_web/utils.py +++ b/dds_web/utils.py @@ -575,46 +575,26 @@ def calculate_version_period_usage(version): # maintenance check -def block_if_maintenance(): +def block_if_maintenance(user=None): """Block API requests if maintenance is ongoing and projects are busy.""" # Get maintenance row maintenance: models.Maintenance = models.Maintenance.query.first() # Possibly block request if maintenance ongoing / planned if maintenance.active: - # Endpoints accepting requests during active maintenance / planned - project_required_endpoints: typing.List = [ - f"/api/v1{resource}" for resource in ["/file/new", "/file/update", "/proj/busy"] - ] - admin_endpoints: typing.List = [ - f"/api/v1{x}" - for x in [ - "/user/info", - "/maintenance", - "/unit/info/all", - "/motd", - "/motd/send", - "/proj/busy/any", + if not user: + # Endpoints accepting requests during active maintenance - only login for non-logged in users + admin_endpoints: typing.List = [ + "/api/v1/user/encrypted_token", + "/api/v1/user/second_factor", ] - ] - approved_endpoints: typing.List = project_required_endpoints + admin_endpoints - - # Request not to accepted endpoint - # OR request to accepted endpoint but project not specified or busy - current_endpoint: str = flask.request.path - if current_endpoint not in approved_endpoints: - # Request not accepted during maintenance - raise MaintenanceOngoingException() + + # Request not to accepted endpoint + # OR request to accepted endpoint but project not specified or busy + current_endpoint: str = flask.request.path + if current_endpoint not in admin_endpoints: + # Request not accepted during maintenance + raise MaintenanceOngoingException() else: - # Request accepted during maintenance but... - req_args = flask.request.args - if current_endpoint in project_required_endpoints: - # Request requires a project - if not (req_args and (project_id := req_args.get("project"))): - raise MaintenanceOngoingException() - - # Request requires a busy project - if not models.Project.query.filter_by( - public_id=project_id, busy=True - ).one_or_none(): - raise MaintenanceOngoingException() + if user.role != "Super Admin": + raise MaintenanceOngoingException() diff --git a/tests/api/test_s3.py b/tests/api/test_s3.py new file mode 100644 index 000000000..8a3a855fc --- /dev/null +++ b/tests/api/test_s3.py @@ -0,0 +1,111 @@ +import flask +import http +import sqlalchemy +import typing + +from tests import DDSEndpoint, DEFAULT_HEADER, UserAuth, USER_CREDENTIALS +from dds_web.database import models +from dds_web import db + + +def test_get_s3_info_unauthorized(client: flask.testing.FlaskClient) -> None: + """Only Unit Admin and Unit Personnel can get this info.""" + # Get project + project: models.Project = models.Project.query.first() + + # Get users with access to project + unit_users = db.session.query(models.UnitUser).filter( + models.UnitUser.unit_id == project.unit_id + ) + + # Get users with no access to project + unit_users_no_access = db.session.query(models.UnitUser).filter( + models.UnitUser.unit_id != project.unit_id + ) + + # Returned info - expected + expected_return: typing.Dict = { + "safespring_project": project.responsible_unit.safespring_name, + "url": project.responsible_unit.safespring_endpoint, + "keys": { + "access_key": project.responsible_unit.safespring_access, + "secret_key": project.responsible_unit.safespring_secret, + }, + "bucket": project.bucket, + } + + # Try s3info - "/s3/proj" + # Super Admin --> No + super_admin_token = UserAuth(USER_CREDENTIALS["superadmin"]).token(client) + response = client.get( + DDSEndpoint.S3KEYS, + headers=super_admin_token, + query_string={"project": "public_project_id"}, + ) + assert response.status_code == http.HTTPStatus.FORBIDDEN + + # Unit Admin, correct unit --> Yes + unit_admin: models.UnitUser = unit_users.filter(models.UnitUser.is_admin == True).first() + unit_admin_token = UserAuth(USER_CREDENTIALS[unit_admin.username]).token(client) + response = client.get( + DDSEndpoint.S3KEYS, + headers=unit_admin_token, + query_string={"project": "public_project_id"}, + ) + assert response.status_code == http.HTTPStatus.OK + response_json = response.json + for x, y in expected_return.items(): + assert x in response_json + assert response_json[x] == y + + # Unit Admin, incorrect unit --> No + unit_admin_no_access: models.UnitUser = unit_users_no_access.filter( + models.UnitUser.is_admin == True + ).first() + unit_admin_no_access_token = UserAuth(USER_CREDENTIALS[unit_admin_no_access.username]).token( + client + ) + response = client.get( + DDSEndpoint.S3KEYS, + headers=unit_admin_no_access_token, + query_string={"project": "public_project_id"}, + ) + assert response.status_code == http.HTTPStatus.FORBIDDEN + + # Unit Personnel, correct unit --> Yes + unit_personnel: models.UnitUser = unit_users.filter(models.UnitUser.is_admin == False).first() + unit_personnel_token = UserAuth(USER_CREDENTIALS[unit_personnel.username]).token(client) + response = client.get( + DDSEndpoint.S3KEYS, + headers=unit_personnel_token, + query_string={"project": "public_project_id"}, + ) + assert response.status_code == http.HTTPStatus.OK + response_json = response.json + for x, y in expected_return.items(): + assert x in response_json + assert response_json[x] == y + + # Unit Personnel, incorrect unit --> No + unit_personnel_no_access: models.UnitUser = unit_users_no_access.filter( + models.UnitUser.is_admin == False + ).first() + unit_personnel_no_access_token = UserAuth( + USER_CREDENTIALS[unit_personnel_no_access.username] + ).token(client) + response = client.get( + DDSEndpoint.S3KEYS, + headers=unit_personnel_no_access_token, + query_string={"project": "public_project_id"}, + ) + assert response.status_code == http.HTTPStatus.FORBIDDEN + + # Researcher --> No + researcher: models.ResearchUser = project.researchusers[0].researchuser + researcher_token = UserAuth(USER_CREDENTIALS[researcher.username]).token(client) + response = client.get( + DDSEndpoint.S3KEYS, + headers=researcher_token, + query_string={"project": "public_project_id"}, + ) + assert response.status_code == http.HTTPStatus.FORBIDDEN diff --git a/tests/test_init.py b/tests/test_init.py index b255ae17c..7cbb11471 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -230,7 +230,32 @@ def test_update_uploaded_file_with_log_nonexisting_file(client, runner, fs: Fake # block_if_maintenance - should be blocked in init by before_request -def test_block_if_maintenance_active_encryptedtoken_not_approved( +def test_block_if_maintenance_active_encryptedtoken_blocked( + client: flask.testing.FlaskClient, +) -> None: + """Non-authenticated users should not be able to authenticate during maintenance. + + Exception: Super Admins. + """ + # Get maintenance row and set to active + maintenance: models.Maintenance = models.Maintenance.query.first() + maintenance.active = True + db.session.commit() + + # Researcher, Unit Personnel, Unit Admin + for user in ["researcher", "unituser", "unitadmin"]: + with patch.object(flask_mail.Mail, "send") as mock_mail_send: + response = client.get( + DDSEndpoint.ENCRYPTED_TOKEN, + auth=UserAuth(USER_CREDENTIALS[user]).as_tuple(), + headers=DEFAULT_HEADER, + ) + assert mock_mail_send.call_count == 0 + + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + + +def test_block_if_maintenance_active_encryptedtoken_super_admin_approved( client: flask.testing.FlaskClient, ) -> None: """Certain endpoints should be blocked if maintenance is active.""" @@ -243,2975 +268,730 @@ def test_block_if_maintenance_active_encryptedtoken_not_approved( with patch.object(flask_mail.Mail, "send") as mock_mail_send: response = client.get( DDSEndpoint.ENCRYPTED_TOKEN, - auth=("researchuser", "password"), + auth=UserAuth(USER_CREDENTIALS["superadmin"]).as_tuple(), headers=DEFAULT_HEADER, ) - assert mock_mail_send.call_count == 0 - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert mock_mail_send.call_count == 1 + assert response.status_code == http.HTTPStatus.OK -def test_block_if_maintenance_active_secondfactor_not_approved( +def test_block_if_maintenance_inactive_approved( client: flask.testing.FlaskClient, ) -> None: - """Requests with wrong 2FA should be blocked if maintenance is active.""" - # Get maintenance row and set to active + """All should be allowed to authenticate with basic auth when maintenance not ongoing.""" + # Maintenance should be off maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() + assert not maintenance.active - # Try second factor - "/user/second_factor" - response = client.get( - DDSEndpoint.SECOND_FACTOR, - headers={"Authorization": f"Bearer made.up.token.long.version", **DEFAULT_HEADER}, - json={"TOTP": "somrthing"}, - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + # Try authenticating all + for user in ["superadmin", "unitadmin", "unituser", "researcher"]: + with patch.object(flask_mail.Mail, "send") as mock_mail_send: + response = client.get( + DDSEndpoint.ENCRYPTED_TOKEN, + auth=UserAuth(USER_CREDENTIALS[user]).as_tuple(), + headers=DEFAULT_HEADER, + ) + assert mock_mail_send.call_count == 1 + assert response.status_code == http.HTTPStatus.OK -def test_block_if_maintenance_active_s3proj_not_approved(client: flask.testing.FlaskClient) -> None: - """More requests to be blocked if maintenance is active.""" - # Get maintenance row and set to active +def test_block_if_maintenance_inactive_first_ok_second_blocked( + client: flask.testing.FlaskClient, +) -> None: + """Block second factor for all but Super Admins if maintenance started after basic auth.""" + # Maintenance should be off maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - - # Try s3info - "/s3/proj" - response = client.get( - DDSEndpoint.S3KEYS, - headers=DEFAULT_HEADER, - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert not maintenance.active + # All but Super Admin: Basic auth OK, second factor FAIL + for user in ["unitadmin", "unituser", "researcher"]: + # Maintenance not active during basic auth + maintenance.active = False + db.session.commit() -def test_block_if_maintenance_active_fileslist_not_approved( - client: flask.testing.FlaskClient, -) -> None: - """More requests to be blocked if maintenance is active.""" - # Auth before maintenance on - token = UserAuth(USER_CREDENTIALS["researchuser"]).token(client) + # Perform basic auth + basic_auth = UserAuth(USER_CREDENTIALS[user]) + hotp_value = basic_auth.fetch_hotp() + partial_token = basic_auth.partial_token(client) - # Get maintenance row and set to active - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() + # Maintenance active during 2fa + maintenance.active = True + db.session.commit() - # Try list files - "/files/list" - response = client.get( - DDSEndpoint.LIST_FILES, - headers=token, - query_string={"project": "public_project_id"}, - json={"show_size": True}, - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + # Attempt 2fa + response = client.get( + DDSEndpoint.SECOND_FACTOR, + headers=partial_token, + json={"HOTP": hotp_value.decode()}, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + # Super Admin: + # Maintenance not active during basic auth + maintenance.active = False + db.session.commit() -def test_block_if_maintenance_active_filematch_not_approved( - client: flask.testing.FlaskClient, -) -> None: - """More requests to be blocked if maintenance is active.""" - # Auth before maintenance on - token = UserAuth(USER_CREDENTIALS["unitadmin"]).token(client) + # Perform basic auth + basic_auth = UserAuth(USER_CREDENTIALS["superadmin"]) + hotp_value = basic_auth.fetch_hotp() + partial_token = basic_auth.partial_token(client) - # Get maintenance row and set to active - maintenance: models.Maintenance = models.Maintenance.query.first() + # Maintenance active during 2fa maintenance.active = True db.session.commit() - # "/file/match" + # Attempt 2fa response = client.get( - DDSEndpoint.FILE_MATCH, - headers=token, - query_string={"project": "file_testing_project"}, - json=["non_existent_file"], + DDSEndpoint.SECOND_FACTOR, + headers=partial_token, + json={"HOTP": hotp_value.decode()}, ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - + assert response.status_code == http.HTTPStatus.OK -def test_block_if_maintenance_active_removefile_not_approved( - client: flask.testing.FlaskClient, -) -> None: - """Certain endpoints should be blocked if maintenance is active.""" - # Auth before maintenance on - token = UserAuth(USER_CREDENTIALS["unitadmin"]).token(client) - # Get maintenance row and set to active +def test_block_if_maintenance_active_none_approved_users(client: flask.testing.FlaskClient) -> None: + """More requests to be blocked if maintenance is active.""" + # Get maintenance row maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - # Try remove file - "/file/rm" - from tests.test_files_new import FIRST_NEW_FILE + for user in ["researcher", "unituser", "unitadmin"]: + maintenance.active = False + db.session.commit() - response = client.delete( - DDSEndpoint.REMOVE_FILE, - headers=token, - query_string={"project": "file_testing_project"}, - json=[FIRST_NEW_FILE["name"]], - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + # Perform authentication + user_auth = UserAuth(USER_CREDENTIALS[user]) + token = user_auth.token(client) + maintenance.active = True + db.session.commit() -def test_block_if_maintenance_active_removedir_not_approved( - client: flask.testing.FlaskClient, -) -> None: - """Certain endpoints should be blocked if maintenance is active.""" - # Auth before maintenance on - token = UserAuth(USER_CREDENTIALS["unitadmin"]).token(client) - - # Get maintenance row and set to active - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() + # S3info - "/s3/proj" + response = client.get( + DDSEndpoint.S3KEYS, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." - # "/file/rmdir" - from tests.test_files_new import FIRST_NEW_FILE + # NewFile - "/file/new" + # post + response = client.post( + DDSEndpoint.FILE_NEW, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." + # put + response = client.put( + DDSEndpoint.FILE_NEW, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." - response = client.delete( - DDSEndpoint.REMOVE_FOLDER, - headers=token, - query_string={"project": "file_testing_project"}, - json=[FIRST_NEW_FILE["subpath"]], - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + # MatchFiles - "/file/match" + response = client.get( + DDSEndpoint.FILE_MATCH, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." + # ListFiles - "/files/list" + response = client.get( + DDSEndpoint.LIST_FILES, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." -def test_block_if_maintenance_active_fileinfo_not_approved( - client: flask.testing.FlaskClient, -) -> None: - """Certain endpoints should be blocked if maintenance is active.""" - # Auth before maintenance on - token = UserAuth(USER_CREDENTIALS["researchuser"]).token(client) + # RemoveFile - "/file/rm" + response = client.delete( + DDSEndpoint.REMOVE_FILE, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." - # Get maintenance row and set to active - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() + # RemoveDir - "/file/rmdir" + response = client.delete( + DDSEndpoint.REMOVE_FOLDER, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." - # "/file/info" - with patch("dds_web.api.api_s3_connector.ApiS3Connector.generate_get_url") as mock_url: - mock_url.return_value = "url" + # FileInfo - "/file/info" response = client.get( DDSEndpoint.FILE_INFO, headers=token, - query_string={"project": "public_project_id"}, - json=["filename1"], ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - - -def test_block_if_maintenance_active_fileallinfo_not_approved( - client: flask.testing.FlaskClient, -) -> None: - """Certain endpoints should be blocked if maintenance is active.""" - # Auth before maintenance on - token = UserAuth(USER_CREDENTIALS["researchuser"]).token(client) - - # Get maintenance row and set to active - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." - # "/file/all/info" - with patch("dds_web.api.api_s3_connector.ApiS3Connector.generate_get_url") as mock_url: - mock_url.return_value = "url" + # FileInfoAll - "/file/all/info" response = client.get( DDSEndpoint.FILE_INFO_ALL, headers=token, - query_string={"project": "public_project_id"}, ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." + # UpdateFile - "/file/update" + response = client.put( + DDSEndpoint.FILE_UPDATE, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." -def test_block_if_maintenance_active_projectlist_not_approved( - client: flask.testing.FlaskClient, -) -> None: - """Certain endpoints should be blocked if maintenance is active.""" - # Auth before maintenance on - token = UserAuth(USER_CREDENTIALS["unitadmin"]).token(client) + # NewFile - "/file/new" + response = client.post( + DDSEndpoint.FILE_NEW, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." - # Get maintenance row and set to active - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() + # UserProjects - "/proj/list" + response = client.get( + DDSEndpoint.LIST_PROJ, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." - # "/proj/list" - response = client.get( - DDSEndpoint.LIST_PROJ, - headers=token, - json={"usage": True}, - content_type="application/json", - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + # RemoveContents - "/proj/rm" + response = client.delete( + DDSEndpoint.REMOVE_PROJ_CONT, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." + # GetPublic - "/proj/public" + response = client.get( + DDSEndpoint.PROJ_PUBLIC, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." -def test_block_if_maintenance_active_removeprojectcontents_not_approved( - client: flask.testing.FlaskClient, -) -> None: - """Certain endpoints should be blocked if maintenance is active.""" - # Auth before maintenance on - token = UserAuth(USER_CREDENTIALS["unitadmin"]).token(client) + # GetPrivate - "/proj/private" + response = client.get( + DDSEndpoint.PROJ_PRIVATE, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." - # Get maintenance row and set to active - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() + # CreateProject - "/proj/create" + response = client.post( + DDSEndpoint.PROJECT_CREATE, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." - # "/proj/rm" - response = client.delete( - DDSEndpoint.REMOVE_PROJ_CONT, - headers=token, - query_string={"project": "file_testing_project"}, - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + # ProjectUsers - "/proj/users" + response = client.get( + DDSEndpoint.LIST_PROJ_USERS, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." + # ProjectStatus - "/proj/status" + # get + response = client.get( + DDSEndpoint.PROJECT_STATUS, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." + # post + response = client.post( + DDSEndpoint.PROJECT_STATUS, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." -def test_block_if_maintenance_active_projectpublic_not_approved( - client: flask.testing.FlaskClient, -) -> None: - """Certain endpoints should be blocked if maintenance is active.""" - # Auth before maintenance on - token = UserAuth(USER_CREDENTIALS["unitadmin"]).token(client) + # ProjectAccess - "/proj/access" + response = client.post( + DDSEndpoint.PROJECT_ACCESS, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." - # Get maintenance row and set to active - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() + # ProjectBusy - "/proj/busy" + response = client.put( + DDSEndpoint.PROJECT_BUSY, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." - # "/proj/public" - response = client.get( - DDSEndpoint.PROJ_PUBLIC, query_string={"project": "public_project_id"}, headers=token - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + # ProjectInfo - "/proj/info" + response = client.get( + DDSEndpoint.PROJECT_INFO, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." + # RetrieveUserInfo - "/user/info" + response = client.get( + DDSEndpoint.USER_INFO, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." -def test_block_if_maintenance_active_projectprivate_not_approved( - client: flask.testing.FlaskClient, -) -> None: - """Certain endpoints should be blocked if maintenance is active.""" - # Auth before maintenance on - token = UserAuth(USER_CREDENTIALS["unitadmin"]).token(client) + # AddUser - "/user/add" + response = client.post( + DDSEndpoint.USER_ADD, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." - # Get maintenance row and set to active - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() + # DeleteUser - "/user/delete" + response = client.delete( + DDSEndpoint.USER_DELETE, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." - # "/proj/private" - response = client.get( - DDSEndpoint.PROJ_PRIVATE, - query_string={"project": "public_project_id"}, - headers=token, - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + # DeleteUserSelf - "/file/new" + response = client.delete( + DDSEndpoint.USER_DELETE_SELF, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." + # RemoveUserAssociation - "/user/access/revoke" + response = client.post( + DDSEndpoint.REMOVE_USER_FROM_PROJ, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." -def test_block_if_maintenance_active_createproject_not_approved( - client: flask.testing.FlaskClient, -) -> None: - """Certain endpoints should be blocked if maintenance is active.""" - # Auth before maintenance on - token = UserAuth(USER_CREDENTIALS["unituser"]).token(client) + # UserActivation - "/user/activation" + response = client.post( + DDSEndpoint.USER_ACTIVATION, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." - # Get maintenance row and set to active - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() + # RequestHOTPActivation - "/user/hotp/activate" + response = client.post( + DDSEndpoint.HOTP_ACTIVATION, + auth=user_auth.as_tuple(), + headers=DEFAULT_HEADER, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." - # "/proj/create" - from tests.test_project_creation import proj_data + # RequestTOTPActivation - "/user/totp/activate" + response = client.post( + DDSEndpoint.TOTP_ACTIVATION, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." - response = client.post( - DDSEndpoint.PROJECT_CREATE, - headers=token, - json=proj_data, - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + # Users - "/users" + response = client.get( + DDSEndpoint.LIST_USERS, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." - -def test_block_if_maintenance_active_projectusers_not_approved( - client: flask.testing.FlaskClient, -) -> None: - """Certain endpoints should be blocked if maintenance is active.""" - # Auth before maintenance on - token = UserAuth(USER_CREDENTIALS["unituser"]).token(client) - - # Get maintenance row and set to active - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - - # "/proj/users" - response = client.get( - DDSEndpoint.LIST_PROJ_USERS, query_string={"project": "public_project_id"}, headers=token - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - - -def test_block_if_maintenance_active_projectstatus_not_approved( - client: flask.testing.FlaskClient, -) -> None: - """Certain endpoints should be blocked if maintenance is active.""" - # Auth before maintenance on - token = UserAuth(USER_CREDENTIALS["unitadmin"]).token(client) - - # Get maintenance row and set to active - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - - # "/proj/status" - response = client.post( - DDSEndpoint.PROJECT_STATUS, - headers=token, - query_string={"project": "public_project_id"}, - json={"new_status": "Available"}, - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - - -def test_block_if_maintenance_active_projectaccess_not_approved( - client: flask.testing.FlaskClient, -) -> None: - """Certain endpoints should be blocked if maintenance is active.""" - # Auth before maintenance on - token = UserAuth(USER_CREDENTIALS["unitadmin"]).token(client) - - # Get maintenance row and set to active - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - - # "/proj/access" - response = client.post( - DDSEndpoint.PROJECT_ACCESS, - headers=token, - query_string={"project": "public_project_id"}, - json={"email": "unituser1@mailtrap.io"}, - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - - -def test_block_if_maintenance_active_adduser_not_approved( - client: flask.testing.FlaskClient, -) -> None: - """Certain endpoints should be blocked if maintenance is active.""" - # Auth before maintenance on - token = UserAuth(USER_CREDENTIALS["unitadmin"]).token(client) - - # Get maintenance row and set to active - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - - # "/user/add" - from tests.api.test_user import first_new_user - - response = client.post( - DDSEndpoint.USER_ADD, - headers=token, - json=first_new_user, - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - - -def test_block_if_maintenance_active_deleteuser_not_approved( - client: flask.testing.FlaskClient, -) -> None: - """Certain endpoints should be blocked if maintenance is active.""" - # Auth before maintenance on - token = UserAuth(USER_CREDENTIALS["unitadmin"]).token(client) - - # Get maintenance row and set to active - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - - # "/user/delete" - invited_user_row = models.Invite.query.first() - response = client.delete( - DDSEndpoint.USER_DELETE, - headers=token, - json={"email": invited_user_row.email, "is_invite": True}, - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - - -def test_block_if_maintenance_active_deleteself_not_approved( - client: flask.testing.FlaskClient, -) -> None: - """Certain endpoints should be blocked if maintenance is active.""" - # Auth before maintenance on - token = UserAuth(USER_CREDENTIALS["delete_me_researcher"]).token(client) - - # Get maintenance row and set to active - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - - # "/user/delete_self" - with patch.object(flask_mail.Mail, "send") as mock_mail_send: - response = client.delete( - DDSEndpoint.USER_DELETE_SELF, - headers=token, - json=None, - ) - # One email for partial token but no new for deletion confirmation - assert mock_mail_send.call_count == 0 - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - - -def test_block_if_maintenance_active_revokeaccess_not_approved( - client: flask.testing.FlaskClient, -) -> None: - """Certain endpoints should be blocked if maintenance is active.""" - # Auth before maintenance on - token = UserAuth(USER_CREDENTIALS["unituser"]).token(client) - - # Get maintenance row and set to active - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - - # "/user/access/revoke" - from tests.test_project_creation import proj_data_with_existing_users - - email = proj_data_with_existing_users["users_to_add"][0]["email"] - response = client.post( - DDSEndpoint.REMOVE_USER_FROM_PROJ, - headers=token, - query_string={"project": "public_project_id"}, - json={"email": email}, - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - - -def test_block_if_maintenance_active_useractivation_not_approved( - client: flask.testing.FlaskClient, -) -> None: - """Certain endpoints should be blocked if maintenance is active.""" - # Auth before maintenance on - token = UserAuth(USER_CREDENTIALS["unitadmin"]).token(client) - - # Get maintenance row and set to active - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - - # "/user/activation" - from tests.test_user_activation import unituser - - response = client.post( - DDSEndpoint.USER_ACTIVATION, - headers=token, - json={**unituser, "action": "reactivate"}, - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - - -def test_block_if_maintenance_active_hotp_not_approved(client: flask.testing.FlaskClient) -> None: - """Certain endpoints should be blocked if maintenance is active.""" - # Auth before maintenance on - token = UserAuth(USER_CREDENTIALS["researcher"]).as_tuple() - - # Get maintenance row and set to active - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - - # "/user/hotp/activate" - response = client.post( - DDSEndpoint.HOTP_ACTIVATION, - headers=DEFAULT_HEADER, - auth=token, - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - - -def test_block_if_maintenance_active_totp_not_approved(client: flask.testing.FlaskClient) -> None: - """Certain endpoints should be blocked if maintenance is active.""" - # Auth before maintenance on - token = UserAuth(USER_CREDENTIALS["unituser"]).token(client) - - # Get maintenance row and set to active - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - - # "/user/totp/activate" - response = client.post( - DDSEndpoint.TOTP_ACTIVATION, - headers=token, - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - - -def test_block_if_maintenance_active_listusers_not_approved( - client: flask.testing.FlaskClient, -) -> None: - """Certain endpoints should be blocked if maintenance is active.""" - # Auth before maintenance on - token = UserAuth(USER_CREDENTIALS["superadmin"]).token(client) - - # Get maintenance row and set to active - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - - # "/users" - response = client.get(DDSEndpoint.LIST_USERS, headers=token) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - - -def test_block_if_maintenance_active_finduser_not_approved( - client: flask.testing.FlaskClient, -) -> None: - """Certain endpoints should be blocked if maintenance is active.""" - # Auth before maintenance on - token = UserAuth(USER_CREDENTIALS["superadmin"]).token(client) - - # Get maintenance row and set to active - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - - # "/user/find" - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.USER_FIND, headers=token, json={"username": models.User.query.first().username} - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - - -def test_block_if_maintenance_active_deactivatetotp_not_approved( - client: flask.testing.FlaskClient, -) -> None: - """Certain endpoints should be blocked if maintenance is active.""" - # Auth before maintenance on - token = UserAuth(USER_CREDENTIALS["superadmin"]).token(client) - - # Get maintenance row and set to active - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - - # "/user/totp/deactivate" - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.TOTP_DEACTIVATE, - headers=token, - json={"username": models.User.query.first().username}, - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - - -def test_block_if_maintenance_active_usage_not_approved(client: flask.testing.FlaskClient) -> None: - """Certain endpoints should be blocked if maintenance is active.""" - # Auth before maintenance on - token = UserAuth(USER_CREDENTIALS["unituser"]).token(client) - - # Get maintenance row and set to active - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - - # "/usage" - response = client.get( - DDSEndpoint.USAGE, - headers=token, - content_type="application/json", - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - - -# block data put - - -def test_block_put_if_maintenancen_not_active(client: flask.testing.FlaskClient) -> None: - """Go through all endpoints that the upload command uses. - - Check what happens when maintenance is set to active after upload started. - """ - # Auth - username: str = "unituser" - token: typing.Dict = UserAuth(USER_CREDENTIALS[username]).token(client) - project: models.Project = ( - models.User.query.filter_by(username="unituser").one_or_none().projects[0] - ) - - # list_all_active_motds - # - new motd - new_motd_message: str = "Test motd" - new_motd: models.MOTD = models.MOTD(message=new_motd_message) - db.session.add(new_motd) - db.session.commit() - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.MOTD, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - assert isinstance(response.json.get("motds"), list) - assert new_motd_message in response.json.get("motds")[0]["Message"] - - # get_user_name_if_logged_in - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.USER_INFO, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - user: models.User = models.User.query.filter_by(username=username).one_or_none() - expected_output: typing.Dict = { - "email_primary": user.primary_email, - "emails_all": [x.email for x in user.emails], - "role": user.role, - "username": username, - "name": user.name, - } - info: typing.Dict = response.json.get("info") - assert info - for x, y in expected_output.items(): - assert x in info - assert info[x] == y - - # __get_s3_info - # - request - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.S3KEYS, headers=token, query_string={"project": project.public_id} - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - expected_output: typing.Dict = { - "safespring_project": user.unit.safespring_name, - "url": user.unit.safespring_endpoint, - "keys": { - "access_key": user.unit.safespring_access, - "secret_key": user.unit.safespring_secret, - }, - "bucket": project.bucket, - } - for x, y in expected_output.items(): - assert x in response.json - assert response.json[x] == y - - # __get_key (public) - # - request - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.PROJ_PUBLIC, headers=token, query_string={"project": project.public_id} - ) - assert response.status_code == http.HTTPStatus.OK - # - verify resposne - public_key: str = response.json.get("public") - assert public_key - assert public_key == project.public_key.hex().upper() - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, - headers=token, - query_string={"project": project.public_id}, - json={"busy": True}, - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=True).one_or_none() - - # check_previous_upload - # - request - files = models.File.query.filter_by(project_id=project.id).all() - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.FILE_MATCH, - headers=token, - query_string={"project": project.public_id}, - json=[f.name for f in files], - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - expected_output: typing.Dict = {file.name: file.name_in_bucket for file in files} - returned_files: typing.Dict = response.json.get("files") - assert returned_files - for x, y in expected_output.items(): - assert x in returned_files - assert returned_files[x] == y - - # add_file_db - # - file info - file_info = { - "name": "newfile", - "name_in_bucket": "new_file", - "subpath": ".", - "size": 0, - "size_processed": 0, - "compressed": False, - "salt": "s" * 32, - "public_key": "p" * 64, - "checksum": "c" * 64, - } - # - request - response: werkzeug.test.WrapperTestResponse = client.post( - DDSEndpoint.FILE_NEW, - headers=token, - query_string={"project": project.public_id}, - json=file_info, - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - message: str = response.json.get("message") - assert message == f"File '{file_info['name']}' added to db." - created_file: models.File = models.File.query.filter_by( - name=file_info["name"], - name_in_bucket=file_info["name_in_bucket"], - subpath=file_info["subpath"], - size_original=file_info["size"], - size_stored=file_info["size_processed"], - compressed=file_info["compressed"], - salt=file_info["salt"], - public_key=file_info["public_key"], - checksum=file_info["checksum"], - ).one_or_none() - assert created_file - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, - headers=token, - query_string={"project": project.public_id}, - json={"busy": False}, - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to not busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=False).one_or_none() - - -def test_block_put_if_maintenancen_active_after_auth(client: flask.testing.FlaskClient) -> None: - """Go through all endpoints that the upload command uses. - - Check what happens when maintenance is set to active after upload started. - """ - # Auth - username: str = "unituser" - token: typing.Dict = UserAuth(USER_CREDENTIALS[username]).token(client) - project: models.Project = ( - models.User.query.filter_by(username="unituser").one_or_none().projects[0] - ) - - # Set maintenance to on - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - - # list_all_active_motds - # - new motd - new_motd_message: str = "Test motd" - new_motd: models.MOTD = models.MOTD(message=new_motd_message) - db.session.add(new_motd) - db.session.commit() - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.MOTD, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - assert isinstance(response.json.get("motds"), list) - assert new_motd_message in response.json.get("motds")[0]["Message"] - - # get_user_name_if_logged_in - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.USER_INFO, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - user: models.User = models.User.query.filter_by(username=username).one_or_none() - expected_output: typing.Dict = { - "email_primary": user.primary_email, - "emails_all": [x.email for x in user.emails], - "role": user.role, - "username": username, - "name": user.name, - } - info: typing.Dict = response.json.get("info") - assert info - for x, y in expected_output.items(): - assert x in info - assert info[x] == y - - # __get_s3_info - # - request - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.S3KEYS, headers=token, query_string={"project": project.public_id} - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - - # __get_key (public) - # - request - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.PROJ_PUBLIC, headers=token, query_string={"project": project.public_id} - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, - headers=token, - query_string={"project": project.public_id}, - json={"busy": True}, - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - # - verify response - assert not models.Project.query.filter_by(public_id=project.public_id, busy=True).one_or_none() - - # check_previous_upload - # - request - files = models.File.query.filter_by(project_id=project.id).all() - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.FILE_MATCH, - headers=token, - query_string={"project": project.public_id}, - json=[f.name for f in files], - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - - # add_file_db - # - file info - file_info = { - "name": "newfile", - "name_in_bucket": "new_file", - "subpath": ".", - "size": 0, - "size_processed": 0, - "compressed": False, - "salt": "s" * 32, - "public_key": "p" * 64, - "checksum": "c" * 64, - } - # - request - response: werkzeug.test.WrapperTestResponse = client.post( - DDSEndpoint.FILE_NEW, - headers=token, - query_string={"project": project.public_id}, - json=file_info, - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - # - verify response - created_file: models.File = models.File.query.filter_by( - name=file_info["name"], - name_in_bucket=file_info["name_in_bucket"], - subpath=file_info["subpath"], - size_original=file_info["size"], - size_stored=file_info["size_processed"], - compressed=file_info["compressed"], - salt=file_info["salt"], - public_key=file_info["public_key"], - checksum=file_info["checksum"], - ).one_or_none() - assert not created_file - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, - headers=token, - query_string={"project": project.public_id}, - json={"busy": False}, - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - - -def test_block_put_if_maintenancen_active_after_busy(client: flask.testing.FlaskClient) -> None: - """Go through all endpoints that the upload command uses. - - Check what happens when maintenance is set to active after upload started. - """ - # Auth - username: str = "unituser" - token: typing.Dict = UserAuth(USER_CREDENTIALS[username]).token(client) - project: models.Project = ( - models.User.query.filter_by(username="unituser").one_or_none().projects[0] - ) - - # list_all_active_motds - # - new motd - new_motd_message: str = "Test motd" - new_motd: models.MOTD = models.MOTD(message=new_motd_message) - db.session.add(new_motd) - db.session.commit() - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.MOTD, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - assert isinstance(response.json.get("motds"), list) - assert new_motd_message in response.json.get("motds")[0]["Message"] - - # get_user_name_if_logged_in - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.USER_INFO, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - user: models.User = models.User.query.filter_by(username=username).one_or_none() - expected_output: typing.Dict = { - "email_primary": user.primary_email, - "emails_all": [x.email for x in user.emails], - "role": user.role, - "username": username, - "name": user.name, - } - info: typing.Dict = response.json.get("info") - assert info - for x, y in expected_output.items(): - assert x in info - assert info[x] == y - - # __get_s3_info - # - request - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.S3KEYS, headers=token, query_string={"project": project.public_id} - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - expected_output: typing.Dict = { - "safespring_project": user.unit.safespring_name, - "url": user.unit.safespring_endpoint, - "keys": { - "access_key": user.unit.safespring_access, - "secret_key": user.unit.safespring_secret, - }, - "bucket": project.bucket, - } - for x, y in expected_output.items(): - assert x in response.json - assert response.json[x] == y - - # __get_key (public) - # - request - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.PROJ_PUBLIC, headers=token, query_string={"project": project.public_id} - ) - assert response.status_code == http.HTTPStatus.OK - # - verify resposne - public_key: str = response.json.get("public") - assert public_key - assert public_key == project.public_key.hex().upper() - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, - headers=token, - query_string={"project": project.public_id}, - json={"busy": True}, - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=True).one_or_none() - - # Set maintenance to on - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - - # check_previous_upload - # - request - files = models.File.query.filter_by(project_id=project.id).all() - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.FILE_MATCH, - headers=token, - query_string={"project": project.public_id}, - json=[f.name for f in files], - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - # - verify response - assert response.json.get("message") == "Maintenance of DDS is ongoing." - returned_files: typing.Dict = response.json.get("files") - assert not returned_files - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, - headers=token, - query_string={"project": project.public_id}, - json={"busy": False}, - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to not busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=False).one_or_none() - - -def test_block_put_if_maintenancen_active_after_check_previous_upload( - client: flask.testing.FlaskClient, -) -> None: - """Go through all endpoints that the upload command uses. - - Check what happens when maintenance is set to active after upload started. - """ - # Auth - username: str = "unituser" - token: typing.Dict = UserAuth(USER_CREDENTIALS[username]).token(client) - project: models.Project = ( - models.User.query.filter_by(username="unituser").one_or_none().projects[0] - ) - - # list_all_active_motds - # - new motd - new_motd_message: str = "Test motd" - new_motd: models.MOTD = models.MOTD(message=new_motd_message) - db.session.add(new_motd) - db.session.commit() - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.MOTD, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - assert isinstance(response.json.get("motds"), list) - assert new_motd_message in response.json.get("motds")[0]["Message"] - - # get_user_name_if_logged_in - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.USER_INFO, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - user: models.User = models.User.query.filter_by(username=username).one_or_none() - expected_output: typing.Dict = { - "email_primary": user.primary_email, - "emails_all": [x.email for x in user.emails], - "role": user.role, - "username": username, - "name": user.name, - } - info: typing.Dict = response.json.get("info") - assert info - for x, y in expected_output.items(): - assert x in info - assert info[x] == y - - # __get_s3_info - # - request - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.S3KEYS, headers=token, query_string={"project": project.public_id} - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - expected_output: typing.Dict = { - "safespring_project": user.unit.safespring_name, - "url": user.unit.safespring_endpoint, - "keys": { - "access_key": user.unit.safespring_access, - "secret_key": user.unit.safespring_secret, - }, - "bucket": project.bucket, - } - for x, y in expected_output.items(): - assert x in response.json - assert response.json[x] == y - - # __get_key (public) - # - request - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.PROJ_PUBLIC, headers=token, query_string={"project": project.public_id} - ) - assert response.status_code == http.HTTPStatus.OK - # - verify resposne - public_key: str = response.json.get("public") - assert public_key - assert public_key == project.public_key.hex().upper() - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, - headers=token, - query_string={"project": project.public_id}, - json={"busy": True}, - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=True).one_or_none() - - # check_previous_upload - # - request - files = models.File.query.filter_by(project_id=project.id).all() - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.FILE_MATCH, - headers=token, - query_string={"project": project.public_id}, - json=[f.name for f in files], - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - expected_output: typing.Dict = {file.name: file.name_in_bucket for file in files} - returned_files: typing.Dict = response.json.get("files") - assert returned_files - for x, y in expected_output.items(): - assert x in returned_files - assert returned_files[x] == y - - # Set maintenance to on - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - - # add_file_db - # - file info - file_info = { - "name": "newfile", - "name_in_bucket": "new_file", - "subpath": ".", - "size": 0, - "size_processed": 0, - "compressed": False, - "salt": "s" * 32, - "public_key": "p" * 64, - "checksum": "c" * 64, - } - # - request - response: werkzeug.test.WrapperTestResponse = client.post( - DDSEndpoint.FILE_NEW, - headers=token, - query_string={"project": project.public_id}, - json=file_info, - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - message: str = response.json.get("message") - assert message == f"File '{file_info['name']}' added to db." - created_file: models.File = models.File.query.filter_by( - name=file_info["name"], - name_in_bucket=file_info["name_in_bucket"], - subpath=file_info["subpath"], - size_original=file_info["size"], - size_stored=file_info["size_processed"], - compressed=file_info["compressed"], - salt=file_info["salt"], - public_key=file_info["public_key"], - checksum=file_info["checksum"], - ).one_or_none() - assert created_file - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, - headers=token, - query_string={"project": project.public_id}, - json={"busy": False}, - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to not busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=False).one_or_none() - - -def test_block_put_if_maintenancen_active_after_add_file_db( - client: flask.testing.FlaskClient, -) -> None: - """Go through all endpoints that the upload command uses. - - Check what happens when maintenance is set to active after upload started. - """ - # Auth - username: str = "unituser" - token: typing.Dict = UserAuth(USER_CREDENTIALS[username]).token(client) - project: models.Project = ( - models.User.query.filter_by(username="unituser").one_or_none().projects[0] - ) - - # list_all_active_motds - # - new motd - new_motd_message: str = "Test motd" - new_motd: models.MOTD = models.MOTD(message=new_motd_message) - db.session.add(new_motd) - db.session.commit() - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.MOTD, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - assert isinstance(response.json.get("motds"), list) - assert new_motd_message in response.json.get("motds")[0]["Message"] - - # get_user_name_if_logged_in - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.USER_INFO, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - user: models.User = models.User.query.filter_by(username=username).one_or_none() - expected_output: typing.Dict = { - "email_primary": user.primary_email, - "emails_all": [x.email for x in user.emails], - "role": user.role, - "username": username, - "name": user.name, - } - info: typing.Dict = response.json.get("info") - assert info - for x, y in expected_output.items(): - assert x in info - assert info[x] == y - - # __get_s3_info - # - request - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.S3KEYS, headers=token, query_string={"project": project.public_id} - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - expected_output: typing.Dict = { - "safespring_project": user.unit.safespring_name, - "url": user.unit.safespring_endpoint, - "keys": { - "access_key": user.unit.safespring_access, - "secret_key": user.unit.safespring_secret, - }, - "bucket": project.bucket, - } - for x, y in expected_output.items(): - assert x in response.json - assert response.json[x] == y - - # __get_key (public) - # - request - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.PROJ_PUBLIC, headers=token, query_string={"project": project.public_id} - ) - assert response.status_code == http.HTTPStatus.OK - # - verify resposne - public_key: str = response.json.get("public") - assert public_key - assert public_key == project.public_key.hex().upper() - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, - headers=token, - query_string={"project": project.public_id}, - json={"busy": True}, - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=True).one_or_none() - - # check_previous_upload - # - request - files = models.File.query.filter_by(project_id=project.id).all() - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.FILE_MATCH, - headers=token, - query_string={"project": project.public_id}, - json=[f.name for f in files], - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - expected_output: typing.Dict = {file.name: file.name_in_bucket for file in files} - returned_files: typing.Dict = response.json.get("files") - assert returned_files - for x, y in expected_output.items(): - assert x in returned_files - assert returned_files[x] == y - - # add_file_db - # - file info - file_info = { - "name": "newfile", - "name_in_bucket": "new_file", - "subpath": ".", - "size": 0, - "size_processed": 0, - "compressed": False, - "salt": "s" * 32, - "public_key": "p" * 64, - "checksum": "c" * 64, - } - # - request - response: werkzeug.test.WrapperTestResponse = client.post( - DDSEndpoint.FILE_NEW, - headers=token, - query_string={"project": project.public_id}, - json=file_info, - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - message: str = response.json.get("message") - assert message == f"File '{file_info['name']}' added to db." - created_file: models.File = models.File.query.filter_by( - name=file_info["name"], - name_in_bucket=file_info["name_in_bucket"], - subpath=file_info["subpath"], - size_original=file_info["size"], - size_stored=file_info["size_processed"], - compressed=file_info["compressed"], - salt=file_info["salt"], - public_key=file_info["public_key"], - checksum=file_info["checksum"], - ).one_or_none() - assert created_file - - # Set maintenance to on - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, - headers=token, - query_string={"project": project.public_id}, - json={"busy": False}, - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to not busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=False).one_or_none() - - -# block data get - - -def test_block_get_if_maintenancen_not_active( - client: flask.testing.FlaskClient, boto3_session -) -> None: - """Go through all endpoints that the download command uses. - - Check what happens when maintenance is set to active after upload started. - """ - # Auth - username: str = "unituser" - token: typing.Dict = UserAuth(USER_CREDENTIALS[username]).token(client) - project: models.Project = ( - models.User.query.filter_by(username="unituser").one_or_none().projects[0] - ) - - # list_all_active_motds - # - new motd - new_motd_message: str = "Test motd" - new_motd: models.MOTD = models.MOTD(message=new_motd_message) - db.session.add(new_motd) - db.session.commit() - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.MOTD, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - assert isinstance(response.json.get("motds"), list) - assert new_motd_message in response.json.get("motds")[0]["Message"] - - # get_user_name_if_logged_in - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.USER_INFO, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - user: models.User = models.User.query.filter_by(username=username).one_or_none() - expected_output: typing.Dict = { - "email_primary": user.primary_email, - "emails_all": [x.email for x in user.emails], - "role": user.role, - "username": username, - "name": user.name, - } - info: typing.Dict = response.json.get("info") - assert info - for x, y in expected_output.items(): - assert x in info - assert info[x] == y - - # __get_key (public) - # - request - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.PROJ_PUBLIC, headers=token, query_string={"project": project.public_id} - ) - assert response.status_code == http.HTTPStatus.OK - # - verify resposne - public_key: str = response.json.get("public") - assert public_key - assert public_key == project.public_key.hex().upper() - - # __get_key (private) - # - request - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.PROJ_PRIVATE, headers=token, query_string={"project": project.public_id} - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - assert response.json.get("private") # not testing encryption stuff here - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, - headers=token, - query_string={"project": project.public_id}, - json={"busy": True}, - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=True).one_or_none() - - # __collect_file_info_remote - # - request - files = models.File.query.filter_by(project_id=project.id).all() - with patch("dds_web.api.api_s3_connector.ApiS3Connector.generate_get_url") as mock_url: - mock_url.return_value = "url" - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.FILE_INFO, + # InvitedUsers - "/user/invites" + response = client.get( + DDSEndpoint.LIST_INVITES, headers=token, - query_string={"project": project.public_id}, - json=[f.name for f in files], ) - assert response.status_code == http.HTTPStatus.OK - - # update_db - # - file info - file_to_update: models.File = files[0] - assert file_to_update.time_latest_download is None - file_info = {"name": file_to_update.name} - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.FILE_UPDATE, - headers=token, - query_string={"project": project.public_id}, - json=file_info, - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - message: str = response.json.get("message") - assert message == "File info updated." - updated_file: models.File = models.File.query.filter_by( - name=file_to_update.name, - name_in_bucket=file_to_update.name_in_bucket, - subpath=file_to_update.subpath, - size_original=file_to_update.size_original, - size_stored=file_to_update.size_stored, - compressed=file_to_update.compressed, - salt=file_to_update.salt, - public_key=file_to_update.public_key, - checksum=file_to_update.checksum, - ).one_or_none() - assert updated_file and updated_file.time_latest_download is not None - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, - headers=token, - query_string={"project": project.public_id}, - json={"busy": False}, - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to not busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=False).one_or_none() - - -def test_block_get_if_maintenancen_active_after_auth( - client: flask.testing.FlaskClient, boto3_session -) -> None: - """Go through all endpoints that the download command uses. - - Check what happens when maintenance is set to active after upload started. - """ - # Auth - username: str = "unituser" - token: typing.Dict = UserAuth(USER_CREDENTIALS[username]).token(client) - project: models.Project = ( - models.User.query.filter_by(username="unituser").one_or_none().projects[0] - ) - - # Set maintenance to on - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - - # list_all_active_motds - # - new motd - new_motd_message: str = "Test motd" - new_motd: models.MOTD = models.MOTD(message=new_motd_message) - db.session.add(new_motd) - db.session.commit() - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.MOTD, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - assert isinstance(response.json.get("motds"), list) - assert new_motd_message in response.json.get("motds")[0]["Message"] - - # get_user_name_if_logged_in - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.USER_INFO, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - user: models.User = models.User.query.filter_by(username=username).one_or_none() - expected_output: typing.Dict = { - "email_primary": user.primary_email, - "emails_all": [x.email for x in user.emails], - "role": user.role, - "username": username, - "name": user.name, - } - info: typing.Dict = response.json.get("info") - assert info - for x, y in expected_output.items(): - assert x in info - assert info[x] == y - - # __get_key (public) - # - request - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.PROJ_PUBLIC, headers=token, query_string={"project": project.public_id} - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - # - verify resposne - assert response.json.get("message") == "Maintenance of DDS is ongoing." - - # __get_key (private) - # - request - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.PROJ_PRIVATE, headers=token, query_string={"project": project.public_id} - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - # - verify response - assert response.json.get("message") == "Maintenance of DDS is ongoing." + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, - headers=token, - query_string={"project": project.public_id}, - json={"busy": True}, - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - # - verify response - assert response.json.get("message") == "Maintenance of DDS is ongoing." - - # __collect_file_info_remote - # - request - files = models.File.query.filter_by(project_id=project.id).all() - with patch("dds_web.api.api_s3_connector.ApiS3Connector.generate_get_url") as mock_url: - mock_url.return_value = "url" - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.FILE_INFO, + # SetMaintenance - "/maintenance" + response = client.put( + DDSEndpoint.MAINTENANCE, headers=token, - query_string={"project": project.public_id}, - json=[f.name for f in files], ) assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - assert response.json.get("message") == "Maintenance of DDS is ongoing." - - # update_db - # - file info - file_to_update: models.File = files[0] - assert file_to_update.time_latest_download is None - file_info = {"name": file_to_update.name} - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.FILE_UPDATE, - headers=token, - query_string={"project": project.public_id}, - json=file_info, - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - # - verify response - assert response.json.get("message") == "Maintenance of DDS is ongoing." - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, - headers=token, - query_string={"project": project.public_id}, - json={"busy": False}, - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - # - verify response - assert response.json.get("message") == "Maintenance of DDS is ongoing." - + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." -def test_block_get_if_maintenancen_active_after_busy( - client: flask.testing.FlaskClient, boto3_session -) -> None: - """Go through all endpoints that the download command uses. - - Check what happens when maintenance is set to active after upload started. - """ - # Auth - username: str = "unituser" - token: typing.Dict = UserAuth(USER_CREDENTIALS[username]).token(client) - project: models.Project = ( - models.User.query.filter_by(username="unituser").one_or_none().projects[0] - ) - - # list_all_active_motds - # - new motd - new_motd_message: str = "Test motd" - new_motd: models.MOTD = models.MOTD(message=new_motd_message) - db.session.add(new_motd) - db.session.commit() - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.MOTD, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - assert isinstance(response.json.get("motds"), list) - assert new_motd_message in response.json.get("motds")[0]["Message"] - - # get_user_name_if_logged_in - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.USER_INFO, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - user: models.User = models.User.query.filter_by(username=username).one_or_none() - expected_output: typing.Dict = { - "email_primary": user.primary_email, - "emails_all": [x.email for x in user.emails], - "role": user.role, - "username": username, - "name": user.name, - } - info: typing.Dict = response.json.get("info") - assert info - for x, y in expected_output.items(): - assert x in info - assert info[x] == y - - # __get_key (public) - # - request - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.PROJ_PUBLIC, headers=token, query_string={"project": project.public_id} - ) - assert response.status_code == http.HTTPStatus.OK - # - verify resposne - public_key: str = response.json.get("public") - assert public_key - assert public_key == project.public_key.hex().upper() - - # __get_key (private) - # - request - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.PROJ_PRIVATE, headers=token, query_string={"project": project.public_id} - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - assert response.json.get("private") # not testing encryption stuff here - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, - headers=token, - query_string={"project": project.public_id}, - json={"busy": True}, - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=True).one_or_none() - - # Set maintenance to on - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - - # __collect_file_info_remote - # - request - files = models.File.query.filter_by(project_id=project.id).all() - with patch("dds_web.api.api_s3_connector.ApiS3Connector.generate_get_url") as mock_url: - mock_url.return_value = "url" - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.FILE_INFO, + # AllUnits - "/unit/info/all" + response = client.get( + DDSEndpoint.LIST_UNITS_ALL, headers=token, - query_string={"project": project.public_id}, - json=[f.name for f in files], ) assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - assert response.json.get("message") == "Maintenance of DDS is ongoing." - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, - headers=token, - query_string={"project": project.public_id}, - json={"busy": False}, - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to not busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=False).one_or_none() - - -def test_block_get_if_maintenancen_active_after_collect_file_info_remote( - client: flask.testing.FlaskClient, boto3_session -) -> None: - """Go through all endpoints that the download command uses. + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." - Check what happens when maintenance is set to active after upload started. - """ - # Auth - username: str = "unituser" - token: typing.Dict = UserAuth(USER_CREDENTIALS[username]).token(client) - project: models.Project = ( - models.User.query.filter_by(username="unituser").one_or_none().projects[0] - ) - - # list_all_active_motds - # - new motd - new_motd_message: str = "Test motd" - new_motd: models.MOTD = models.MOTD(message=new_motd_message) - db.session.add(new_motd) - db.session.commit() - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.MOTD, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - assert isinstance(response.json.get("motds"), list) - assert new_motd_message in response.json.get("motds")[0]["Message"] - - # get_user_name_if_logged_in - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.USER_INFO, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - user: models.User = models.User.query.filter_by(username=username).one_or_none() - expected_output: typing.Dict = { - "email_primary": user.primary_email, - "emails_all": [x.email for x in user.emails], - "role": user.role, - "username": username, - "name": user.name, - } - info: typing.Dict = response.json.get("info") - assert info - for x, y in expected_output.items(): - assert x in info - assert info[x] == y - - # __get_key (public) - # - request - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.PROJ_PUBLIC, headers=token, query_string={"project": project.public_id} - ) - assert response.status_code == http.HTTPStatus.OK - # - verify resposne - public_key: str = response.json.get("public") - assert public_key - assert public_key == project.public_key.hex().upper() - - # __get_key (private) - # - request - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.PROJ_PRIVATE, headers=token, query_string={"project": project.public_id} - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - assert response.json.get("private") # not testing encryption stuff here - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, - headers=token, - query_string={"project": project.public_id}, - json={"busy": True}, - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=True).one_or_none() - - # __collect_file_info_remote - # - request - files = models.File.query.filter_by(project_id=project.id).all() - with patch("dds_web.api.api_s3_connector.ApiS3Connector.generate_get_url") as mock_url: - mock_url.return_value = "url" - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.FILE_INFO, + # MOTD - "/motd" + # get + response = client.get( + DDSEndpoint.MOTD, headers=token, - query_string={"project": project.public_id}, - json=[f.name for f in files], ) assert response.status_code == http.HTTPStatus.OK + # post + response = client.post( + DDSEndpoint.MOTD, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." - # Set maintenance to on - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - - # update_db - # - file info - file_to_update: models.File = files[0] - assert file_to_update.time_latest_download is None - file_info = {"name": file_to_update.name} - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.FILE_UPDATE, - headers=token, - query_string={"project": project.public_id}, - json=file_info, - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - message: str = response.json.get("message") - assert message == "File info updated." - updated_file: models.File = models.File.query.filter_by( - name=file_to_update.name, - name_in_bucket=file_to_update.name_in_bucket, - subpath=file_to_update.subpath, - size_original=file_to_update.size_original, - size_stored=file_to_update.size_stored, - compressed=file_to_update.compressed, - salt=file_to_update.salt, - public_key=file_to_update.public_key, - checksum=file_to_update.checksum, - ).one_or_none() - assert updated_file and updated_file.time_latest_download is not None - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, - headers=token, - query_string={"project": project.public_id}, - json={"busy": False}, - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to not busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=False).one_or_none() - - -def test_block_get_if_maintenancen_active_after_update_db( - client: flask.testing.FlaskClient, boto3_session -) -> None: - """Go through all endpoints that the download command uses. - - Check what happens when maintenance is set to active after upload started. - """ - # Auth - username: str = "unituser" - token: typing.Dict = UserAuth(USER_CREDENTIALS[username]).token(client) - project: models.Project = ( - models.User.query.filter_by(username="unituser").one_or_none().projects[0] - ) + # SendMOTD - "/motd/send" + response = client.post( + DDSEndpoint.MOTD_SEND, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." - # list_all_active_motds - # - new motd - new_motd_message: str = "Test motd" - new_motd: models.MOTD = models.MOTD(message=new_motd_message) - db.session.add(new_motd) - db.session.commit() - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.MOTD, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - assert isinstance(response.json.get("motds"), list) - assert new_motd_message in response.json.get("motds")[0]["Message"] + # FindUser - "/user/find" + response = client.get( + DDSEndpoint.USER_FIND, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." - # get_user_name_if_logged_in - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.USER_INFO, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - user: models.User = models.User.query.filter_by(username=username).one_or_none() - expected_output: typing.Dict = { - "email_primary": user.primary_email, - "emails_all": [x.email for x in user.emails], - "role": user.role, - "username": username, - "name": user.name, - } - info: typing.Dict = response.json.get("info") - assert info - for x, y in expected_output.items(): - assert x in info - assert info[x] == y - - # __get_key (public) - # - request - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.PROJ_PUBLIC, headers=token, query_string={"project": project.public_id} - ) - assert response.status_code == http.HTTPStatus.OK - # - verify resposne - public_key: str = response.json.get("public") - assert public_key - assert public_key == project.public_key.hex().upper() - - # __get_key (private) - # - request - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.PROJ_PRIVATE, headers=token, query_string={"project": project.public_id} - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - assert response.json.get("private") # not testing encryption stuff here + # ResetTwoFactor - "/user/totp/deactivate" + response = client.put( + DDSEndpoint.TOTP_DEACTIVATE, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, - headers=token, - query_string={"project": project.public_id}, - json={"busy": True}, - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=True).one_or_none() - - # __collect_file_info_remote - # - request - files = models.File.query.filter_by(project_id=project.id).all() - with patch("dds_web.api.api_s3_connector.ApiS3Connector.generate_get_url") as mock_url: - mock_url.return_value = "url" - response: werkzeug.test.WrapperTestResponse = client.get( - DDSEndpoint.FILE_INFO, + # AnyProjectBusy - "/proj/busy/any" + response = client.get( + DDSEndpoint.PROJECT_BUSY_ANY, + headers=token, + ) + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." + + # ShowUsage - "/usage" + response = client.get( + DDSEndpoint.USAGE, headers=token, - query_string={"project": project.public_id}, - json=[f.name for f in files], ) - assert response.status_code == http.HTTPStatus.OK + assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE + assert response.json and response.json.get("message") == "Maintenance of DDS is ongoing." - # update_db - # - file info - file_to_update: models.File = files[0] - assert file_to_update.time_latest_download is None - file_info = {"name": file_to_update.name} - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.FILE_UPDATE, - headers=token, - query_string={"project": project.public_id}, - json=file_info, - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - message: str = response.json.get("message") - assert message == "File info updated." - updated_file: models.File = models.File.query.filter_by( - name=file_to_update.name, - name_in_bucket=file_to_update.name_in_bucket, - subpath=file_to_update.subpath, - size_original=file_to_update.size_original, - size_stored=file_to_update.size_stored, - compressed=file_to_update.compressed, - salt=file_to_update.salt, - public_key=file_to_update.public_key, - checksum=file_to_update.checksum, - ).one_or_none() - assert updated_file and updated_file.time_latest_download is not None - - # Set maintenance to on + +def test_block_if_maintenance_active_superadmin_ok(client: flask.testing.FlaskClient) -> None: + """Super Admins should not be blocked during maintenance.""" + # Get maintenance row maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True + maintenance.active = False db.session.commit() - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, - headers=token, - query_string={"project": project.public_id}, - json={"busy": False}, - ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to not busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=False).one_or_none() - + # Perform authentication + user_auth = UserAuth(USER_CREDENTIALS["superadmin"]) + token = user_auth.token(client) -# block data rm - - -def test_block_rm_all_if_maintenancen_not_active( - client: flask.testing.FlaskClient, boto3_session -) -> None: - """Go through all endpoints that the remove command uses. + maintenance.active = True + db.session.commit() - Check what happens when maintenance is set to active after upload started. - """ - # Auth - username: str = "unituser" - token: typing.Dict = UserAuth(USER_CREDENTIALS[username]).token(client) - project: models.Project = ( - models.User.query.filter_by(username="unituser").one_or_none().projects[0] + # S3info - "/s3/proj" + response = client.get( + DDSEndpoint.S3KEYS, + headers=token, ) + assert response.status_code == http.HTTPStatus.FORBIDDEN - # list_all_active_motds - # - new motd - new_motd_message: str = "Test motd" - new_motd: models.MOTD = models.MOTD(message=new_motd_message) - db.session.add(new_motd) - db.session.commit() - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.MOTD, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - assert isinstance(response.json.get("motds"), list) - assert new_motd_message in response.json.get("motds")[0]["Message"] - - # get_user_name_if_logged_in - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.USER_INFO, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - user: models.User = models.User.query.filter_by(username=username).one_or_none() - expected_output: typing.Dict = { - "email_primary": user.primary_email, - "emails_all": [x.email for x in user.emails], - "role": user.role, - "username": username, - "name": user.name, - } - info: typing.Dict = response.json.get("info") - assert info - for x, y in expected_output.items(): - assert x in info - assert info[x] == y - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, + # NewFile - "/file/new" + # post + response = client.post( + DDSEndpoint.FILE_NEW, headers=token, - query_string={"project": project.public_id}, - json={"busy": True}, ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=True).one_or_none() - - # __collect_file_info_remote - # - request - response: werkzeug.test.WrapperTestResponse = client.delete( - DDSEndpoint.REMOVE_PROJ_CONT, + assert response.status_code == http.HTTPStatus.FORBIDDEN + # put + response = client.put( + DDSEndpoint.FILE_NEW, headers=token, - query_string={"project": project.public_id}, ) - assert response.status_code == http.HTTPStatus.OK - assert response.json.get("removed") is True + assert response.status_code == http.HTTPStatus.FORBIDDEN - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, + # MatchFiles - "/file/match" + response = client.get( + DDSEndpoint.FILE_MATCH, headers=token, - query_string={"project": project.public_id}, - json={"busy": False}, ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to not busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=False).one_or_none() + assert response.status_code == http.HTTPStatus.FORBIDDEN - -def test_block_rm_all_if_maintenancen_after_auth( - client: flask.testing.FlaskClient, boto3_session -) -> None: - """Go through all endpoints that the remove command uses. - - Check what happens when maintenance is set to active after upload started. - """ - # Auth - username: str = "unituser" - token: typing.Dict = UserAuth(USER_CREDENTIALS[username]).token(client) - project: models.Project = ( - models.User.query.filter_by(username="unituser").one_or_none().projects[0] + # ListFiles - "/files/list" + response = client.get( + DDSEndpoint.LIST_FILES, + headers=token, ) + assert response.status_code == http.HTTPStatus.FORBIDDEN - # Set maintenance to on - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - - # list_all_active_motds - # - new motd - new_motd_message: str = "Test motd" - new_motd: models.MOTD = models.MOTD(message=new_motd_message) - db.session.add(new_motd) - db.session.commit() - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.MOTD, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - assert isinstance(response.json.get("motds"), list) - assert new_motd_message in response.json.get("motds")[0]["Message"] - - # get_user_name_if_logged_in - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.USER_INFO, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - user: models.User = models.User.query.filter_by(username=username).one_or_none() - expected_output: typing.Dict = { - "email_primary": user.primary_email, - "emails_all": [x.email for x in user.emails], - "role": user.role, - "username": username, - "name": user.name, - } - info: typing.Dict = response.json.get("info") - assert info - for x, y in expected_output.items(): - assert x in info - assert info[x] == y - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, + # RemoveFile - "/file/rm" + response = client.delete( + DDSEndpoint.REMOVE_FILE, headers=token, - query_string={"project": project.public_id}, - json={"busy": True}, ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - # - verify response - assert response.json.get("message") == "Maintenance of DDS is ongoing." - assert not models.Project.query.filter_by(public_id=project.public_id, busy=True).one_or_none() - - # remove_all - # - request - response: werkzeug.test.WrapperTestResponse = client.delete( - DDSEndpoint.REMOVE_PROJ_CONT, + assert response.status_code == http.HTTPStatus.FORBIDDEN + + # RemoveDir - "/file/rmdir" + response = client.delete( + DDSEndpoint.REMOVE_FOLDER, headers=token, - query_string={"project": project.public_id}, ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - assert response.json.get("message") == "Maintenance of DDS is ongoing." + assert response.status_code == http.HTTPStatus.FORBIDDEN - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, + # FileInfo - "/file/info" + response = client.get( + DDSEndpoint.FILE_INFO, headers=token, - query_string={"project": project.public_id}, - json={"busy": False}, ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - # - verify response - assert response.json.get("message") == "Maintenance of DDS is ongoing." - assert models.Project.query.filter_by(public_id=project.public_id, busy=False).one_or_none() + assert response.status_code == http.HTTPStatus.FORBIDDEN - -def test_block_rm_all_if_maintenancen_active_after_busy( - client: flask.testing.FlaskClient, boto3_session -) -> None: - """Go through all endpoints that the remove command uses. - - Check what happens when maintenance is set to active after upload started. - """ - # Auth - username: str = "unituser" - token: typing.Dict = UserAuth(USER_CREDENTIALS[username]).token(client) - project: models.Project = ( - models.User.query.filter_by(username="unituser").one_or_none().projects[0] + # FileInfoAll - "/file/all/info" + response = client.get( + DDSEndpoint.FILE_INFO_ALL, + headers=token, ) + assert response.status_code == http.HTTPStatus.FORBIDDEN - # list_all_active_motds - # - new motd - new_motd_message: str = "Test motd" - new_motd: models.MOTD = models.MOTD(message=new_motd_message) - db.session.add(new_motd) - db.session.commit() - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.MOTD, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - assert isinstance(response.json.get("motds"), list) - assert new_motd_message in response.json.get("motds")[0]["Message"] - - # get_user_name_if_logged_in - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.USER_INFO, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - user: models.User = models.User.query.filter_by(username=username).one_or_none() - expected_output: typing.Dict = { - "email_primary": user.primary_email, - "emails_all": [x.email for x in user.emails], - "role": user.role, - "username": username, - "name": user.name, - } - info: typing.Dict = response.json.get("info") - assert info - for x, y in expected_output.items(): - assert x in info - assert info[x] == y - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, + # UpdateFile - "/file/update" + response = client.put( + DDSEndpoint.FILE_UPDATE, headers=token, - query_string={"project": project.public_id}, - json={"busy": True}, ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=True).one_or_none() - - # Set maintenance to on - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() + assert response.status_code == http.HTTPStatus.FORBIDDEN - # remove_all - # - request - response: werkzeug.test.WrapperTestResponse = client.delete( - DDSEndpoint.REMOVE_PROJ_CONT, + # NewFile - "/file/new" + response = client.post( + DDSEndpoint.FILE_NEW, headers=token, - query_string={"project": project.public_id}, ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - assert response.json.get("message") == "Maintenance of DDS is ongoing." + assert response.status_code == http.HTTPStatus.FORBIDDEN - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, + # UserProjects - "/proj/list" + response = client.get( + DDSEndpoint.LIST_PROJ, headers=token, - query_string={"project": project.public_id}, - json={"busy": False}, ) assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to not busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=False).one_or_none() - - -def test_block_rm_all_if_maintenancen_active_after_rm( - client: flask.testing.FlaskClient, boto3_session -) -> None: - """Go through all endpoints that the remove command uses. - Check what happens when maintenance is set to active after upload started. - """ - # Auth - username: str = "unituser" - token: typing.Dict = UserAuth(USER_CREDENTIALS[username]).token(client) - project: models.Project = ( - models.User.query.filter_by(username="unituser").one_or_none().projects[0] + # RemoveContents - "/proj/rm" + response = client.delete( + DDSEndpoint.REMOVE_PROJ_CONT, + headers=token, ) + assert response.status_code == http.HTTPStatus.FORBIDDEN - # list_all_active_motds - # - new motd - new_motd_message: str = "Test motd" - new_motd: models.MOTD = models.MOTD(message=new_motd_message) - db.session.add(new_motd) - db.session.commit() - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.MOTD, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - assert isinstance(response.json.get("motds"), list) - assert new_motd_message in response.json.get("motds")[0]["Message"] - - # get_user_name_if_logged_in - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.USER_INFO, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - user: models.User = models.User.query.filter_by(username=username).one_or_none() - expected_output: typing.Dict = { - "email_primary": user.primary_email, - "emails_all": [x.email for x in user.emails], - "role": user.role, - "username": username, - "name": user.name, - } - info: typing.Dict = response.json.get("info") - assert info - for x, y in expected_output.items(): - assert x in info - assert info[x] == y - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, + # GetPublic - "/proj/public" + response = client.get( + DDSEndpoint.PROJ_PUBLIC, headers=token, - query_string={"project": project.public_id}, - json={"busy": True}, ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=True).one_or_none() - - # remove_all - # - request - response: werkzeug.test.WrapperTestResponse = client.delete( - DDSEndpoint.REMOVE_PROJ_CONT, + assert response.status_code == http.HTTPStatus.FORBIDDEN + + # GetPrivate - "/proj/private" + response = client.get( + DDSEndpoint.PROJ_PRIVATE, headers=token, - query_string={"project": project.public_id}, ) - assert response.status_code == http.HTTPStatus.OK - assert response.json.get("removed") is True - assert models.File.query.filter_by(project_id=project.public_id).count() == 0 - - # Set maintenance to on - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() + assert response.status_code == http.HTTPStatus.FORBIDDEN - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, + # CreateProject - "/proj/create" + response = client.post( + DDSEndpoint.PROJECT_CREATE, headers=token, - query_string={"project": project.public_id}, - json={"busy": False}, ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to not busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=False).one_or_none() - - -def test_block_rm_file_if_maintenancen_after_auth( - client: flask.testing.FlaskClient, boto3_session -) -> None: - """Go through all endpoints that the remove command uses. + assert response.status_code == http.HTTPStatus.FORBIDDEN - Check what happens when maintenance is set to active after upload started. - """ - # Auth - username: str = "unituser" - token: typing.Dict = UserAuth(USER_CREDENTIALS[username]).token(client) - project: models.Project = ( - models.User.query.filter_by(username="unituser").one_or_none().projects[0] + # ProjectUsers - "/proj/users" + response = client.get( + DDSEndpoint.LIST_PROJ_USERS, + headers=token, ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST - # Set maintenance to on - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - - # list_all_active_motds - # - new motd - new_motd_message: str = "Test motd" - new_motd: models.MOTD = models.MOTD(message=new_motd_message) - db.session.add(new_motd) - db.session.commit() - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.MOTD, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - assert isinstance(response.json.get("motds"), list) - assert new_motd_message in response.json.get("motds")[0]["Message"] - - # get_user_name_if_logged_in - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.USER_INFO, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - user: models.User = models.User.query.filter_by(username=username).one_or_none() - expected_output: typing.Dict = { - "email_primary": user.primary_email, - "emails_all": [x.email for x in user.emails], - "role": user.role, - "username": username, - "name": user.name, - } - info: typing.Dict = response.json.get("info") - assert info - for x, y in expected_output.items(): - assert x in info - assert info[x] == y - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, + # ProjectStatus - "/proj/status" + # get + response = client.get( + DDSEndpoint.PROJECT_STATUS, headers=token, - query_string={"project": project.public_id}, - json={"busy": True}, ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - # - verify response - assert response.json.get("message") == "Maintenance of DDS is ongoing." - assert not models.Project.query.filter_by(public_id=project.public_id, busy=True).one_or_none() - - # remove_file - # - get files - files: typing.List = [file.name for file in models.File.query.all()] - # - request - response: werkzeug.test.WrapperTestResponse = client.delete( - DDSEndpoint.REMOVE_FILE, + assert response.status_code == http.HTTPStatus.BAD_REQUEST + # post + response = client.post( + DDSEndpoint.PROJECT_STATUS, headers=token, - query_string={"project": project.public_id}, - json=files, ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - assert response.json.get("message") == "Maintenance of DDS is ongoing." - assert models.File.query.filter(models.File.name.in_(files)).count() == len(files) + assert response.status_code == http.HTTPStatus.FORBIDDEN - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, + # ProjectAccess - "/proj/access" + response = client.post( + DDSEndpoint.PROJECT_ACCESS, headers=token, - query_string={"project": project.public_id}, - json={"busy": False}, ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - # - verify response - assert response.json.get("message") == "Maintenance of DDS is ongoing." - assert models.Project.query.filter_by(public_id=project.public_id, busy=False).one_or_none() - - -def test_block_rm_file_if_maintenancen_active_after_busy( - client: flask.testing.FlaskClient, boto3_session -) -> None: - """Go through all endpoints that the remove command uses. - - Check what happens when maintenance is set to active after upload started. - """ - # Auth - username: str = "unituser" - token: typing.Dict = UserAuth(USER_CREDENTIALS[username]).token(client) - project: models.Project = ( - models.User.query.filter_by(username="unituser").one_or_none().projects[0] - ) - - # list_all_active_motds - # - new motd - new_motd_message: str = "Test motd" - new_motd: models.MOTD = models.MOTD(message=new_motd_message) - db.session.add(new_motd) - db.session.commit() - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.MOTD, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - assert isinstance(response.json.get("motds"), list) - assert new_motd_message in response.json.get("motds")[0]["Message"] + assert response.status_code == http.HTTPStatus.FORBIDDEN - # get_user_name_if_logged_in - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.USER_INFO, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - user: models.User = models.User.query.filter_by(username=username).one_or_none() - expected_output: typing.Dict = { - "email_primary": user.primary_email, - "emails_all": [x.email for x in user.emails], - "role": user.role, - "username": username, - "name": user.name, - } - info: typing.Dict = response.json.get("info") - assert info - for x, y in expected_output.items(): - assert x in info - assert info[x] == y - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( + # ProjectBusy - "/proj/busy" + response = client.put( DDSEndpoint.PROJECT_BUSY, headers=token, - query_string={"project": project.public_id}, - json={"busy": True}, ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=True).one_or_none() - - # Set maintenance to on - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() + assert response.status_code == http.HTTPStatus.FORBIDDEN - # remove_file - # - get files - files: typing.List = [file.name for file in models.File.query.all()] - # - request - response: werkzeug.test.WrapperTestResponse = client.delete( - DDSEndpoint.REMOVE_FILE, + # ProjectInfo - "/proj/info" + response = client.get( + DDSEndpoint.PROJECT_INFO, headers=token, - query_string={"project": project.public_id}, - json=files, ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - assert response.json.get("message") == "Maintenance of DDS is ongoing." - assert models.File.query.filter(models.File.name.in_(files)).count() == len(files) + assert response.status_code == http.HTTPStatus.BAD_REQUEST - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, + # RetrieveUserInfo - "/user/info" + response = client.get( + DDSEndpoint.USER_INFO, headers=token, - query_string={"project": project.public_id}, - json={"busy": False}, ) assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to not busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=False).one_or_none() - - -def test_block_rm_file_if_maintenancen_active_after_rm( - client: flask.testing.FlaskClient, boto3_session -) -> None: - """Go through all endpoints that the remove command uses. - Check what happens when maintenance is set to active after upload started. - """ - # Auth - username: str = "unituser" - token: typing.Dict = UserAuth(USER_CREDENTIALS[username]).token(client) - project: models.Project = ( - models.User.query.filter_by(username="unituser").one_or_none().projects[0] + # AddUser - "/user/add" + response = client.post( + DDSEndpoint.USER_ADD, + headers=token, ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST - # list_all_active_motds - # - new motd - new_motd_message: str = "Test motd" - new_motd: models.MOTD = models.MOTD(message=new_motd_message) - db.session.add(new_motd) - db.session.commit() - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.MOTD, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - assert isinstance(response.json.get("motds"), list) - assert new_motd_message in response.json.get("motds")[0]["Message"] - - # get_user_name_if_logged_in - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.USER_INFO, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - user: models.User = models.User.query.filter_by(username=username).one_or_none() - expected_output: typing.Dict = { - "email_primary": user.primary_email, - "emails_all": [x.email for x in user.emails], - "role": user.role, - "username": username, - "name": user.name, - } - info: typing.Dict = response.json.get("info") - assert info - for x, y in expected_output.items(): - assert x in info - assert info[x] == y - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, + # DeleteUser - "/user/delete" + response = client.delete( + DDSEndpoint.USER_DELETE, headers=token, - query_string={"project": project.public_id}, - json={"busy": True}, ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=True).one_or_none() - - # remove_file - # - get files - files: typing.List = [file.name for file in models.File.query.all()] - # - request - response: werkzeug.test.WrapperTestResponse = client.delete( - DDSEndpoint.REMOVE_FILE, + assert response.status_code == http.HTTPStatus.BAD_REQUEST + + # DeleteUserSelf - "/file/new" + response = client.delete( + DDSEndpoint.USER_DELETE_SELF, headers=token, - query_string={"project": project.public_id}, - json=files, ) - assert response.status_code == http.HTTPStatus.OK - assert not models.File.query.filter(models.File.name.in_(files)).all() - - # Set maintenance to on - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() + assert response.status_code == http.HTTPStatus.FORBIDDEN - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, + # RemoveUserAssociation - "/user/access/revoke" + response = client.post( + DDSEndpoint.REMOVE_USER_FROM_PROJ, headers=token, - query_string={"project": project.public_id}, - json={"busy": False}, ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to not busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=False).one_or_none() - - -def test_block_rm_folder_if_maintenancen_after_auth( - client: flask.testing.FlaskClient, boto3_session -) -> None: - """Go through all endpoints that the remove command uses. + assert response.status_code == http.HTTPStatus.FORBIDDEN - Check what happens when maintenance is set to active after upload started. - """ - # Auth - username: str = "unituser" - token: typing.Dict = UserAuth(USER_CREDENTIALS[username]).token(client) - project: models.Project = ( - models.User.query.filter_by(username="unituser").one_or_none().projects[0] + # UserActivation - "/user/activation" + response = client.post( + DDSEndpoint.USER_ACTIVATION, + headers=token, ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST - # Set maintenance to on - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - - # list_all_active_motds - # - new motd - new_motd_message: str = "Test motd" - new_motd: models.MOTD = models.MOTD(message=new_motd_message) - db.session.add(new_motd) - db.session.commit() - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.MOTD, headers=token) + # RequestHOTPActivation - "/user/hotp/activate" + response = client.post( + DDSEndpoint.HOTP_ACTIVATION, + auth=user_auth.as_tuple(), + headers=DEFAULT_HEADER, + ) assert response.status_code == http.HTTPStatus.OK - # - verify response - assert isinstance(response.json.get("motds"), list) - assert new_motd_message in response.json.get("motds")[0]["Message"] - # get_user_name_if_logged_in - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.USER_INFO, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - user: models.User = models.User.query.filter_by(username=username).one_or_none() - expected_output: typing.Dict = { - "email_primary": user.primary_email, - "emails_all": [x.email for x in user.emails], - "role": user.role, - "username": username, - "name": user.name, - } - info: typing.Dict = response.json.get("info") - assert info - for x, y in expected_output.items(): - assert x in info - assert info[x] == y - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, + # RequestTOTPActivation - "/user/totp/activate" + response = client.post( + DDSEndpoint.TOTP_ACTIVATION, headers=token, - query_string={"project": project.public_id}, - json={"busy": True}, ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - # - verify response - assert response.json.get("message") == "Maintenance of DDS is ongoing." - assert not models.Project.query.filter_by(public_id=project.public_id, busy=True).one_or_none() - - # remove_folder - # - get folders - files_in_folders: typing.List = models.File.query.filter(models.File.subpath != ".").all() - folders: typing.List = list(set(file.subpath for file in files_in_folders)) - assert folders - # - request - response: werkzeug.test.WrapperTestResponse = client.delete( - DDSEndpoint.REMOVE_FOLDER, + assert response.status_code == http.HTTPStatus.OK + + # Users - "/users" + response = client.get( + DDSEndpoint.LIST_USERS, headers=token, - query_string={"project": project.public_id}, - json=folders, - ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - assert response.json.get("message") == "Maintenance of DDS is ongoing." - assert models.File.query.filter(models.File.subpath.in_(folders)).count() == len( - files_in_folders ) + assert response.status_code == http.HTTPStatus.OK - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, + # InvitedUsers - "/user/invites" + response = client.get( + DDSEndpoint.LIST_INVITES, headers=token, - query_string={"project": project.public_id}, - json={"busy": False}, ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - # - verify response - assert response.json.get("message") == "Maintenance of DDS is ongoing." - assert models.Project.query.filter_by(public_id=project.public_id, busy=False).one_or_none() - - -def test_block_rm_folder_if_maintenancen_active_after_busy( - client: flask.testing.FlaskClient, boto3_session -) -> None: - """Go through all endpoints that the remove command uses. + assert response.status_code == http.HTTPStatus.OK - Check what happens when maintenance is set to active after upload started. - """ - # Auth - username: str = "unituser" - token: typing.Dict = UserAuth(USER_CREDENTIALS[username]).token(client) - project: models.Project = ( - models.User.query.filter_by(username="unituser").one_or_none().projects[0] + # SetMaintenance - "/maintenance" + response = client.put( + DDSEndpoint.MAINTENANCE, + headers=token, ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST - # list_all_active_motds - # - new motd - new_motd_message: str = "Test motd" - new_motd: models.MOTD = models.MOTD(message=new_motd_message) - db.session.add(new_motd) - db.session.commit() - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.MOTD, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - assert isinstance(response.json.get("motds"), list) - assert new_motd_message in response.json.get("motds")[0]["Message"] - - # get_user_name_if_logged_in - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.USER_INFO, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - user: models.User = models.User.query.filter_by(username=username).one_or_none() - expected_output: typing.Dict = { - "email_primary": user.primary_email, - "emails_all": [x.email for x in user.emails], - "role": user.role, - "username": username, - "name": user.name, - } - info: typing.Dict = response.json.get("info") - assert info - for x, y in expected_output.items(): - assert x in info - assert info[x] == y - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, + # AllUnits - "/unit/info/all" + response = client.get( + DDSEndpoint.LIST_UNITS_ALL, headers=token, - query_string={"project": project.public_id}, - json={"busy": True}, ) assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=True).one_or_none() - - # Set maintenance to on - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - # remove_folder - files_in_folders: typing.List = models.File.query.filter(models.File.subpath != ".").all() - folders: typing.List = list(set(file.subpath for file in files_in_folders)) - assert folders - # - request - response: werkzeug.test.WrapperTestResponse = client.delete( - DDSEndpoint.REMOVE_FOLDER, + # MOTD - "/motd" + # get + response = client.get( + DDSEndpoint.MOTD, headers=token, - query_string={"project": project.public_id}, - json=folders, ) - assert response.status_code == http.HTTPStatus.SERVICE_UNAVAILABLE - assert response.json.get("message") == "Maintenance of DDS is ongoing." - assert models.File.query.filter(models.File.subpath.in_(folders)).count() == len( - files_in_folders + assert response.status_code == http.HTTPStatus.OK + # post + response = client.post( + DDSEndpoint.MOTD, + headers=token, ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, + # SendMOTD - "/motd/send" + response = client.post( + DDSEndpoint.MOTD_SEND, headers=token, - query_string={"project": project.public_id}, - json={"busy": False}, ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to not busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=False).one_or_none() + assert response.status_code == http.HTTPStatus.BAD_REQUEST - -def test_block_rm_folder_if_maintenancen_active_after_rm( - client: flask.testing.FlaskClient, boto3_session -) -> None: - """Go through all endpoints that the remove command uses. - - Check what happens when maintenance is set to active after upload started. - """ - # Auth - username: str = "unituser" - token: typing.Dict = UserAuth(USER_CREDENTIALS[username]).token(client) - project: models.Project = ( - models.User.query.filter_by(username="unituser").one_or_none().projects[0] + # FindUser - "/user/find" + response = client.get( + DDSEndpoint.USER_FIND, + headers=token, ) + assert response.status_code == http.HTTPStatus.BAD_REQUEST - # list_all_active_motds - # - new motd - new_motd_message: str = "Test motd" - new_motd: models.MOTD = models.MOTD(message=new_motd_message) - db.session.add(new_motd) - db.session.commit() - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.MOTD, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - assert isinstance(response.json.get("motds"), list) - assert new_motd_message in response.json.get("motds")[0]["Message"] - - # get_user_name_if_logged_in - # - request - response: werkzeug.test.WrapperTestResponse = client.get(DDSEndpoint.USER_INFO, headers=token) - assert response.status_code == http.HTTPStatus.OK - # - verify response - user: models.User = models.User.query.filter_by(username=username).one_or_none() - expected_output: typing.Dict = { - "email_primary": user.primary_email, - "emails_all": [x.email for x in user.emails], - "role": user.role, - "username": username, - "name": user.name, - } - info: typing.Dict = response.json.get("info") - assert info - for x, y in expected_output.items(): - assert x in info - assert info[x] == y - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, + # ResetTwoFactor - "/user/totp/deactivate" + response = client.put( + DDSEndpoint.TOTP_DEACTIVATE, headers=token, - query_string={"project": project.public_id}, - json={"busy": True}, ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=True).one_or_none() - - # remove_folder - # - get folders - files_in_folders: typing.List = models.File.query.filter(models.File.subpath != ".").all() - folders: typing.List = list(set(file.subpath for file in files_in_folders)) - assert folders - # - request - response: werkzeug.test.WrapperTestResponse = client.delete( - DDSEndpoint.REMOVE_FOLDER, + assert response.status_code == http.HTTPStatus.BAD_REQUEST + + # AnyProjectBusy - "/proj/busy/any" + response = client.get( + DDSEndpoint.PROJECT_BUSY_ANY, headers=token, - query_string={"project": project.public_id}, - json=folders, ) assert response.status_code == http.HTTPStatus.OK - assert not models.File.query.filter(models.File.subpath.in_(folders)).all() - # Set maintenance to on - maintenance: models.Maintenance = models.Maintenance.query.first() - maintenance.active = True - db.session.commit() - - # change_busy_status - busy - # - request - response: werkzeug.test.WrapperTestResponse = client.put( - DDSEndpoint.PROJECT_BUSY, + # ShowUsage - "/usage" + response = client.get( + DDSEndpoint.USAGE, headers=token, - query_string={"project": project.public_id}, - json={"busy": False}, ) - assert response.status_code == http.HTTPStatus.OK - # - verify response - busy_status_set: bool = response.json.get("ok") - assert busy_status_set - message: str = response.json.get("message") - assert message == f"Project {project.public_id} was set to not busy." - assert models.Project.query.filter_by(public_id=project.public_id, busy=False).one_or_none() + assert response.status_code == http.HTTPStatus.FORBIDDEN