diff --git a/tests/integration/hive_metastore/test_migrate.py b/tests/integration/hive_metastore/test_migrate.py index bc9ed12ad8..00f9a289e2 100644 --- a/tests/integration/hive_metastore/test_migrate.py +++ b/tests/integration/hive_metastore/test_migrate.py @@ -43,6 +43,27 @@ } +def principal_acl(ws, inventory_schema, sql_backend): + installation = MockInstallation( + { + "config.yml": { + 'inventory_database': inventory_schema, + }, + "azure_storage_account_info.csv": [ + { + 'prefix': 'dummy_prefix', + 'client_id': 'dummy_application_id', + 'principal': 'dummy_principal', + 'privilege': 'WRITE_FILES', + 'type': 'Application', + 'directory_id': 'dummy_directory', + } + ], + } + ) + return PrincipalACL.for_cli(ws, installation, sql_backend) + + @retried(on=[NotFound], timeout=timedelta(minutes=2)) def test_migrate_managed_tables(ws, sql_backend, inventory_schema, make_catalog, make_schema, make_table): # pylint: disable=too-many-locals @@ -72,7 +93,7 @@ def test_migrate_managed_tables(ws, sql_backend, inventory_schema, make_catalog, table_mapping = StaticTableMapping(ws, sql_backend, rules=rules) group_manager = GroupManager(sql_backend, ws, inventory_schema) migration_status_refresher = MigrationStatusRefresher(ws, sql_backend, inventory_schema, table_crawler) - principal_grants = PrincipalACL.for_cli(ws, MockInstallation(), sql_backend) + principal_grants = principal_acl(ws, inventory_schema, sql_backend) table_migrate = TablesMigrator( table_crawler, grant_crawler, @@ -141,7 +162,7 @@ def test_migrate_tables_with_cache_should_not_create_table( table_mapping = StaticTableMapping(ws, sql_backend, rules=rules) group_manager = GroupManager(sql_backend, ws, inventory_schema) migration_status_refresher = MigrationStatusRefresher(ws, sql_backend, inventory_schema, table_crawler) - principal_grants = PrincipalACL.for_cli(ws, MockInstallation(), sql_backend) + principal_grants = principal_acl(ws, inventory_schema, sql_backend) table_migrate = TablesMigrator( table_crawler, grant_crawler, @@ -201,7 +222,7 @@ def test_migrate_external_table( # pylint: disable=too-many-locals ] group_manager = GroupManager(sql_backend, ws, inventory_schema) migration_status_refresher = MigrationStatusRefresher(ws, sql_backend, inventory_schema, table_crawler) - principal_grants = PrincipalACL.for_cli(ws, MockInstallation(), sql_backend) + principal_grants = principal_acl(ws, inventory_schema, sql_backend) table_migrate = TablesMigrator( table_crawler, grant_crawler, @@ -261,7 +282,7 @@ def test_migrate_external_table_failed_sync( ] group_manager = GroupManager(sql_backend, ws, inventory_schema) migration_status_refresher = MigrationStatusRefresher(ws, sql_backend, inventory_schema, table_crawler) - principal_grants = PrincipalACL.for_cli(ws, MockInstallation(), sql_backend) + principal_grants = principal_acl(ws, inventory_schema, sql_backend) table_migrate = TablesMigrator( table_crawler, grant_crawler, @@ -317,7 +338,7 @@ def test_revert_migrated_table( table_mapping = StaticTableMapping(ws, sql_backend, rules=rules) group_manager = GroupManager(sql_backend, ws, inventory_schema) migration_status_refresher = MigrationStatusRefresher(ws, sql_backend, inventory_schema, table_crawler) - principal_grants = PrincipalACL.for_cli(ws, MockInstallation(), sql_backend) + principal_grants = principal_acl(ws, inventory_schema, sql_backend) table_migrate = TablesMigrator( table_crawler, grant_crawler, @@ -437,7 +458,7 @@ def test_mapping_reverts_table( table_mapping = StaticTableMapping(ws, sql_backend, rules=rules) migration_status_refresher = MigrationStatusRefresher(ws, sql_backend, inventory_schema, table_crawler) group_manager = GroupManager(sql_backend, ws, inventory_schema) - principal_grants = PrincipalACL.for_cli(ws, MockInstallation(), sql_backend) + principal_grants = principal_acl(ws, inventory_schema, sql_backend) table_migrate = TablesMigrator( table_crawler, grant_crawler, @@ -534,7 +555,36 @@ def test_migrate_managed_tables_with_acl( table_mapping = StaticTableMapping(ws, sql_backend, rules=rules) group_manager = GroupManager(sql_backend, ws, inventory_schema) migration_status_refresher = MigrationStatusRefresher(ws, sql_backend, inventory_schema, table_crawler) - principal_grants = PrincipalACL.for_cli(ws, MockInstallation(), sql_backend) + installation = MockInstallation( + { + "config.yml": { + 'inventory_database': inventory_schema, + }, + "azure_storage_account_info.csv": [ + { + 'prefix': 'dummy_prefix', + 'client_id': 'dummy_application_id', + 'principal': 'dummy_principal', + 'privilege': 'WRITE_FILES', + 'type': 'Application', + 'directory_id': 'dummy_directory', + } + ], + } + ) + principal_grants = PrincipalACL( + ws, + sql_backend, + installation, + StaticTablesCrawler(sql_backend, inventory_schema, [src_managed_table]), + StaticMountCrawler( + [Mount('dummy_mount', 'abfss://dummy@dummy.dfs.core.windows.net/a')], + sql_backend, + ws, + inventory_schema, + ), + AzureACL.for_cli(ws, installation).get_eligible_locations_principals(), + ) table_migrate = TablesMigrator( table_crawler, grant_crawler, @@ -568,19 +618,24 @@ def test_prepare_principal_acl( make_dbfs_data_copy, make_table, make_catalog, + make_schema, + make_cluster, ): - existing_mounted_location = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/b/c' + cluster = make_cluster(single_node=True, spark_conf=_SPARK_CONF, data_security_mode=DataSecurityMode.NONE) new_mounted_location = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/b/{inventory_schema}' - make_dbfs_data_copy(src_path=existing_mounted_location, dst_path=new_mounted_location) - src_external_table = make_table(external_csv=new_mounted_location) - src_schema = src_external_table.schema_name + make_dbfs_data_copy(src_path=f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/b/c', dst_path=new_mounted_location) + src_schema = make_schema(catalog_name="hive_metastore") + src_external_table = make_table( + catalog_name=src_schema.catalog_name, schema_name=src_schema.name, external_csv=new_mounted_location + ) dst_catalog = make_catalog() + dst_schema = make_schema(catalog_name=dst_catalog.name, name=src_schema.name) rules = [ Rule( "workspace", dst_catalog.name, - src_schema, - src_schema, + src_schema.name, + dst_schema.name, src_external_table.name, src_external_table.name, ), @@ -620,7 +675,7 @@ def test_prepare_principal_acl( ws, inventory_schema, ), - AzureACL.for_cli(ws, installation), + AzureACL.for_cli(ws, installation).get_eligible_locations_principals(), ) table_migrate = TablesMigrator( StaticTablesCrawler(sql_backend, inventory_schema, [src_external_table]), @@ -638,45 +693,29 @@ def test_prepare_principal_acl( ), principal_grants, ) - return table_migrate + return table_migrate, f"{dst_catalog.name}.{dst_schema.name}.{src_external_table.name}", cluster.cluster_id @retried(on=[NotFound], timeout=timedelta(minutes=3)) def test_migrate_managed_tables_with_principal_acl_azure( ws, - inventory_schema, - make_catalog, - make_table, make_user, - make_dbfs_data_copy, test_prepare_principal_acl, make_cluster_permissions, make_cluster, ): if not ws.config.is_azure: pytest.skip("temporary: only works in azure test env") - table_migrate = test_prepare_principal_acl + table_migrate, table_full_name, cluster_id = test_prepare_principal_acl user = make_user() - cluster = make_cluster(single_node=True, spark_conf=_SPARK_CONF, data_security_mode=DataSecurityMode.NONE) make_cluster_permissions( - object_id=cluster.cluster_id, - permission_level=PermissionLevel.CAN_USE, + object_id=cluster_id, + permission_level=PermissionLevel.CAN_ATTACH_TO, user_name=user.user_name, ) - new_mounted_location = f'dbfs:/mnt/things/a/b/{inventory_schema}' - make_dbfs_data_copy(src_path='dbfs:/mnt/things/a/b/c', dst_path=new_mounted_location) - src_external_table = make_table(external_csv=new_mounted_location) - src_schema = src_external_table.schema_name - - dst_catalog = make_catalog() - - logger.info(f"dst_catalog={dst_catalog.name}, managed_table={src_external_table.full_name}") - table_migrate.migrate_tables(acl_strategy=[AclMigrationWhat.PRINCIPAL]) - target_table_grants = ws.grants.get( - SecurableType.TABLE, f"{dst_catalog.name}.{src_schema}.{src_external_table.name}" - ) + target_table_grants = ws.grants.get(SecurableType.TABLE, table_full_name) match = False for _ in target_table_grants.privilege_assignments: if _.principal == user.user_name and _.privileges == [Privilege.ALL_PRIVILEGES]: