diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index 3673ce095ea16..b62286e93e519 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -640,6 +640,8 @@ def _sync_perm_for_dag(self, dag, session: Session = None): from airflow.security.permissions import DAG_ACTIONS, resource_name_for_dag from airflow.www.fab_security.sqla.models import Action, Permission, Resource + root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id + def needs_perms(dag_id: str) -> bool: dag_resource_name = resource_name_for_dag(dag_id) for permission_name in DAG_ACTIONS: @@ -654,9 +656,9 @@ def needs_perms(dag_id: str) -> bool: return True return False - if dag.access_control or needs_perms(dag.dag_id): - self.log.debug("Syncing DAG permissions: %s to the DB", dag.dag_id) + if dag.access_control or needs_perms(root_dag_id): + self.log.debug("Syncing DAG permissions: %s to the DB", root_dag_id) from airflow.www.security import ApplessAirflowSecurityManager security_manager = ApplessAirflowSecurityManager(session=session) - security_manager.sync_perm_for_dag(dag.dag_id, dag.access_control) + security_manager.sync_perm_for_dag(root_dag_id, dag.access_control) diff --git a/airflow/security/permissions.py b/airflow/security/permissions.py index 2d5c0b939976e..2d02c773b43ff 100644 --- a/airflow/security/permissions.py +++ b/airflow/security/permissions.py @@ -66,14 +66,15 @@ DAG_ACTIONS = {ACTION_CAN_READ, ACTION_CAN_EDIT, ACTION_CAN_DELETE} -def resource_name_for_dag(dag_id): - """Returns the resource name for a DAG id.""" - if dag_id == RESOURCE_DAG: - return dag_id +def resource_name_for_dag(root_dag_id: str) -> str: + """Returns the resource name for a DAG id. - if dag_id.startswith(RESOURCE_DAG_PREFIX): - return dag_id - - # To account for SubDags - root_dag_id = dag_id.split(".")[0] + Note that since a sub-DAG should follow the permission of its + parent DAG, you should pass ``DagModel.root_dag_id`` to this function, + for a subdag. A normal dag should pass the ``DagModel.dag_id``. + """ + if root_dag_id == RESOURCE_DAG: + return root_dag_id + if root_dag_id.startswith(RESOURCE_DAG_PREFIX): + return root_dag_id return f"{RESOURCE_DAG_PREFIX}{root_dag_id}" diff --git a/airflow/www/security.py b/airflow/www/security.py index 42188f06184b4..de6b0d646e8c8 100644 --- a/airflow/www/security.py +++ b/airflow/www/security.py @@ -200,6 +200,16 @@ def __init__(self, appbuilder): view.datamodel = CustomSQLAInterface(view.datamodel.obj) self.perms = None + def _get_root_dag_id(self, dag_id): + if '.' in dag_id: + dm = ( + self.get_session.query(DagModel.dag_id, DagModel.root_dag_id) + .filter(DagModel.dag_id == dag_id) + .first() + ) + return dm.root_dag_id or dm.dag_id + return dag_id + def init_role(self, role_name, perms): """ Initialize the role with actions and related resources. @@ -340,7 +350,8 @@ def get_accessible_dag_ids(self, user, user_actions=None, session=None) -> Set[s def can_access_some_dags(self, action: str, dag_id: Optional[str] = None) -> bool: """Checks if user has read or write access to some dags.""" if dag_id and dag_id != '~': - return self.has_access(action, permissions.resource_name_for_dag(dag_id)) + root_dag_id = self._get_root_dag_id(dag_id) + return self.has_access(action, permissions.resource_name_for_dag(root_dag_id)) user = g.user if action == permissions.ACTION_CAN_READ: @@ -349,17 +360,20 @@ def can_access_some_dags(self, action: str, dag_id: Optional[str] = None) -> boo def can_read_dag(self, dag_id, user=None) -> bool: """Determines whether a user has DAG read access.""" - dag_resource_name = permissions.resource_name_for_dag(dag_id) + root_dag_id = self._get_root_dag_id(dag_id) + dag_resource_name = permissions.resource_name_for_dag(root_dag_id) return self.has_access(permissions.ACTION_CAN_READ, dag_resource_name, user=user) def can_edit_dag(self, dag_id, user=None) -> bool: """Determines whether a user has DAG edit access.""" - dag_resource_name = permissions.resource_name_for_dag(dag_id) + root_dag_id = self._get_root_dag_id(dag_id) + dag_resource_name = permissions.resource_name_for_dag(root_dag_id) return self.has_access(permissions.ACTION_CAN_EDIT, dag_resource_name, user=user) def can_delete_dag(self, dag_id, user=None) -> bool: """Determines whether a user has DAG delete access.""" - dag_resource_name = permissions.resource_name_for_dag(dag_id) + root_dag_id = self._get_root_dag_id(dag_id) + dag_resource_name = permissions.resource_name_for_dag(root_dag_id) return self.has_access(permissions.ACTION_CAN_DELETE, dag_resource_name, user=user) def prefixed_dag_id(self, dag_id): @@ -370,7 +384,8 @@ def prefixed_dag_id(self, dag_id): DeprecationWarning, stacklevel=2, ) - return permissions.resource_name_for_dag(dag_id) + root_dag_id = self._get_root_dag_id(dag_id) + return permissions.resource_name_for_dag(root_dag_id) def is_dag_resource(self, resource_name): """Determines if a resource belongs to a DAG or all DAGs.""" @@ -530,7 +545,8 @@ def create_dag_specific_permissions(self) -> None: dags = dagbag.dags.values() for dag in dags: - dag_resource_name = permissions.resource_name_for_dag(dag.dag_id) + root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id + dag_resource_name = permissions.resource_name_for_dag(root_dag_id) for action_name in self.DAG_ACTIONS: if (action_name, dag_resource_name) not in perms: self._merge_perm(action_name, dag_resource_name) @@ -615,6 +631,7 @@ def _sync_dag_view_permissions(self, dag_id, access_control): :param access_control: a dict where each key is a rolename and each value is a set() of action names (e.g. {'can_read'}) """ + dag_resource_name = permissions.resource_name_for_dag(dag_id) def _get_or_create_dag_permission(action_name: str) -> Optional[Permission]: diff --git a/tests/www/test_security.py b/tests/www/test_security.py index 8c90062600818..7b8541ca81756 100644 --- a/tests/www/test_security.py +++ b/tests/www/test_security.py @@ -192,7 +192,8 @@ def sample_dags(security_manager): @pytest.fixture(scope="module") def has_dag_perm(security_manager): def _has_dag_perm(perm, dag_id, user): - return security_manager.has_access(perm, permissions.resource_name_for_dag(dag_id), user) + root_dag_id = security_manager._get_root_dag_id(dag_id) + return security_manager.has_access(perm, permissions.resource_name_for_dag(root_dag_id), user) return _has_dag_perm @@ -351,7 +352,7 @@ def test_verify_anon_user_with_admin_role_has_access_to_each_dag( user.roles = security_manager.get_user_roles(user) assert user.roles == {security_manager.get_public_role()} - test_dag_ids = ["test_dag_id_1", "test_dag_id_2", "test_dag_id_3"] + test_dag_ids = ["test_dag_id_1", "test_dag_id_2", "test_dag_id_3", "test_dag_id_4.with_dot"] for dag_id in test_dag_ids: with _create_dag_model_context(dag_id, session, security_manager): @@ -588,7 +589,8 @@ def test_access_control_with_invalid_permission(app, security_manager): for action in invalid_actions: with pytest.raises(AirflowException) as ctx: security_manager._sync_dag_view_permissions( - 'access_control_test', access_control={rolename: {action}} + 'access_control_test', + access_control={rolename: {action}}, ) assert "invalid permissions" in str(ctx.value) @@ -728,11 +730,13 @@ def test_create_dag_specific_permissions(session, security_manager, monkeypatch, assert ('can_edit', dag_resource_name) in all_perms security_manager._sync_dag_view_permissions.assert_called_once_with( - permissions.resource_name_for_dag('has_access_control'), access_control + permissions.resource_name_for_dag('has_access_control'), + access_control, ) del dagbag_mock.dags["has_access_control"] - with assert_queries_count(1): # one query to get all perms; dagbag is mocked + with assert_queries_count(2): # two query to get all perms; dagbag is mocked + # The extra query happens at permission check security_manager.create_dag_specific_permissions() @@ -782,10 +786,12 @@ def test_prefixed_dag_id_is_deprecated(security_manager): security_manager.prefixed_dag_id("hello") -def test_parent_dag_access_applies_to_subdag(app, security_manager, assert_user_has_dag_perms): +def test_parent_dag_access_applies_to_subdag(app, security_manager, assert_user_has_dag_perms, session): username = 'dag_permission_user' role_name = 'dag_permission_role' parent_dag_name = "parent_dag" + subdag_name = parent_dag_name + ".subdag" + subsubdag_name = parent_dag_name + ".subdag.subsubdag" with app.app_context(): mock_roles = [ { @@ -801,15 +807,57 @@ def test_parent_dag_access_applies_to_subdag(app, security_manager, assert_user_ username=username, role_name=role_name, ) as user: + dag1 = DagModel(dag_id=parent_dag_name) + dag2 = DagModel(dag_id=subdag_name, is_subdag=True, root_dag_id=parent_dag_name) + dag3 = DagModel(dag_id=subsubdag_name, is_subdag=True, root_dag_id=parent_dag_name) + session.add_all([dag1, dag2, dag3]) + session.commit() security_manager.bulk_sync_roles(mock_roles) - security_manager._sync_dag_view_permissions( - parent_dag_name, access_control={role_name: READ_WRITE} - ) + for dag in [dag1, dag2, dag3]: + security_manager._sync_dag_view_permissions( + parent_dag_name, access_control={role_name: READ_WRITE} + ) + assert_user_has_dag_perms(perms=READ_WRITE, dag_id=parent_dag_name, user=user) assert_user_has_dag_perms(perms=READ_WRITE, dag_id=parent_dag_name + ".subdag", user=user) assert_user_has_dag_perms( perms=READ_WRITE, dag_id=parent_dag_name + ".subdag.subsubdag", user=user ) + session.query(DagModel).delete() + + +def test_permissions_work_for_dags_with_dot_in_dagname( + app, security_manager, assert_user_has_dag_perms, assert_user_does_not_have_dag_perms, session +): + username = 'dag_permission_user' + role_name = 'dag_permission_role' + dag_id = "dag_id_1" + dag_id_2 = "dag_id_1.with_dot" + with app.app_context(): + mock_roles = [ + { + 'role': role_name, + 'perms': [ + (permissions.ACTION_CAN_READ, f"DAG:{dag_id}"), + (permissions.ACTION_CAN_EDIT, f"DAG:{dag_id}"), + ], + } + ] + with create_user_scope( + app, + username=username, + role_name=role_name, + ) as user: + dag1 = DagModel(dag_id=dag_id) + dag2 = DagModel(dag_id=dag_id_2) + session.add_all([dag1, dag2]) + session.commit() + security_manager.bulk_sync_roles(mock_roles) + security_manager.sync_perm_for_dag(dag1.dag_id, access_control={role_name: READ_WRITE}) + security_manager.sync_perm_for_dag(dag2.dag_id, access_control={role_name: READ_WRITE}) + assert_user_has_dag_perms(perms=READ_WRITE, dag_id=dag_id, user=user) + assert_user_does_not_have_dag_perms(perms=READ_WRITE, dag_id=dag_id_2, user=user) + session.query(DagModel).delete() def test_fab_models_use_airflow_base_meta():