diff --git a/flask_appbuilder/security/manager.py b/flask_appbuilder/security/manager.py index 8f86b7ee51..df750d3765 100644 --- a/flask_appbuilder/security/manager.py +++ b/flask_appbuilder/security/manager.py @@ -2088,14 +2088,17 @@ def import_roles(self, path: str) -> None: raise NotImplementedError def load_user(self, pk): - return self.get_user_by_id(int(pk)) + user = self.get_user_by_id(int(pk)) + if user.is_active: + return user def load_user_jwt(self, _jwt_header, jwt_data): identity = jwt_data["sub"] user = self.load_user(identity) - # Set flask g.user to JWT user, we can't do it on before request - g.user = user - return user + if user.is_active: + # Set flask g.user to JWT user, we can't do it on before request + g.user = user + return user @staticmethod def before_request(): diff --git a/tests/base.py b/tests/base.py index 186b3c205a..8fda65a9b2 100644 --- a/tests/base.py +++ b/tests/base.py @@ -14,6 +14,7 @@ API_SECURITY_USERNAME_KEY, API_SECURITY_VERSION, ) +from flask_appbuilder.security.sqla.models import User from hiro import Timeline import jinja2 from tests.const import ( @@ -130,7 +131,7 @@ def create_user( last_name="user", email="admin@fab.org", role_names=None, - ): + ) -> User: user = appbuilder.sm.find_user(username=username) if user: appbuilder.session.delete(user) diff --git a/tests/security/test_mvc_security.py b/tests/security/test_mvc_security.py index 0ef3261ee2..d7adea63fc 100644 --- a/tests/security/test_mvc_security.py +++ b/tests/security/test_mvc_security.py @@ -93,6 +93,37 @@ def test_sec_login(self): data = rv.data.decode("utf-8") self.assertIn(INVALID_LOGIN_STRING, data) + def test_login_invalid_user(self): + """ + Test Security Login, Logout, invalid login, invalid access + """ + test_username = "testuser" + test_password = "password" + test_user = self.create_user( + self.appbuilder, + test_username, + test_password, + "Admin", + "user", + "user", + "testuser@fab.org", + ) + # Login and list with admin + self.browser_login(self.client, test_username, test_password) + rv = self.client.get("/model1view/list/") + self.assertEqual(rv.status_code, 200) + + # Using the same session make sure the user is not allowed to access when + # the user is deactivated + test_user.active = False + self.db.session.merge(test_user) + self.db.session.commit() + rv = self.client.get("/model1view/list/") + self.assertEqual(rv.status_code, 302) + + self.db.session.delete(test_user) + self.db.session.commit() + def test_db_login_no_next_url(self): """ Test Security no next URL