diff --git a/tests/integration/hive_metastore/test_migrate.py b/tests/integration/hive_metastore/test_migrate.py
index 94b667c298..bd2b593cfb 100644
--- a/tests/integration/hive_metastore/test_migrate.py
+++ b/tests/integration/hive_metastore/test_migrate.py
@@ -717,7 +717,7 @@ def test_migrate_managed_tables_with_principal_acl_azure(
         permission_level=PermissionLevel.CAN_ATTACH_TO,
         user_name=user.user_name,
     )
-    table_migrate.migrate_tables(what=What.DBFS_ROOT_DELTA, acl_strategy=[AclMigrationWhat.PRINCIPAL])
+    table_migrate.migrate_tables(what=What.EXTERNAL_SYNC, acl_strategy=[AclMigrationWhat.PRINCIPAL])

     target_table_grants = ws.grants.get(SecurableType.TABLE, table_full_name)
     match = False
diff --git a/tests/integration/test_installation.py b/tests/integration/test_installation.py
index a5a50f7aab..eb0fe2e36a 100644
--- a/tests/integration/test_installation.py
+++ b/tests/integration/test_installation.py
@@ -29,8 +29,12 @@
 from databricks.labs.ucx.hive_metastore.grants import Grant
 from databricks.labs.ucx.hive_metastore.locations import Mount
 from databricks.labs.ucx.hive_metastore.mapping import Rule
+from databricks.labs.ucx.hive_metastore.tables import Table
 from databricks.labs.ucx.install import WorkspaceInstallation, WorkspaceInstaller
-from databricks.labs.ucx.installer.workflows import WorkflowsDeployment
+from databricks.labs.ucx.installer.workflows import (
+    DeployedWorkflows,
+    WorkflowsDeployment,
+)
 from databricks.labs.ucx.workspace_access import redash
 from databricks.labs.ucx.workspace_access.generic import (
     GenericPermissionsSupport,
@@ -134,7 +138,7 @@ def factory(
         )
         workspace_installation.run()
         cleanup.append(workspace_installation)
-        return workspace_installation, workflows_installation
+        return workspace_installation, DeployedWorkflows(ws, install_state, timedelta(minutes=3))

     yield factory

@@ -166,7 +170,7 @@ def test_experimental_permissions_migration_for_group_with_same_name(  # pylint:
     sql_backend.execute(f"ALTER SCHEMA {schema_a.name} OWNER TO `{migrated_group.name_in_workspace}`")
     sql_backend.execute(f"GRANT SELECT ON TABLE {table_a.full_name} TO `{migrated_group.name_in_workspace}`")

-    workspace_installation, workflow_installation = new_installation(
+    workspace_installation, deployed_workflow = new_installation(
         config_transform=lambda wc: replace(wc, include_group_names=[migrated_group.name_in_workspace])
     )
     workspace_installation.run()
@@ -197,7 +201,7 @@ def test_experimental_permissions_migration_for_group_with_same_name(  # pylint:
     permission_manager = PermissionManager(sql_backend, inventory_database, [generic_permissions, tacl_support])
     permission_manager.inventorize_permissions()

-    workflow_installation.run_workflow("migrate-groups-experimental")
+    deployed_workflow.run_workflow("migrate-groups-experimental")

     object_permissions = generic_permissions.load_as_dict("cluster-policies", cluster_policy.policy_id)
     new_schema_grants = grants_crawler.for_schema_info(schema_a)
@@ -208,12 +212,12 @@

 @retried(on=[NotFound, TimeoutError], timeout=timedelta(minutes=5))
 def test_job_failure_propagates_correct_error_message_and_logs(ws, sql_backend, new_installation):
-    workspace_installation, workflow_installation = new_installation()
+    workspace_installation, deployed_workflow = new_installation()

     sql_backend.execute(f"DROP SCHEMA {workspace_installation.config.inventory_database} CASCADE")

     with pytest.raises(NotFound) as failure:
-        workflow_installation.run_workflow("099-destroy-schema")
+        deployed_workflow.run_workflow("099-destroy-schema")

     assert "cannot be found" in str(failure.value)

@@ -254,10 +258,10 @@ def test_new_job_cluster_with_policy_assessment(
         permission_level=PermissionLevel.CAN_USE,
         group_name=ws_group_a.display_name,
     )
-    install = new_installation(
+    _, deployed_workflow = new_installation(
         lambda wc: replace(wc, override_clusters=None, include_group_names=[ws_group_a.display_name])
     )
-    install.run_workflow("assessment")
+    deployed_workflow.run_workflow("assessment")
     generic_permissions = GenericPermissionsSupport(ws, [])
     before = generic_permissions.load_as_dict("cluster-policies", cluster_policy.policy_id)
     assert before[ws_group_a.display_name] == PermissionLevel.CAN_USE
@@ -276,8 +280,8 @@ def test_running_real_assessment_job(
         group_name=ws_group_a.display_name,
     )

-    _, install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
-    install.run_workflow("assessment")
+    _, deployed_workflow = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
+    deployed_workflow.run_workflow("assessment")

     generic_permissions = GenericPermissionsSupport(ws, [])
     before = generic_permissions.load_as_dict("cluster-policies", cluster_policy.policy_id)
@@ -305,12 +309,12 @@ def test_running_real_migrate_groups_job(
         ],
     )

-    install, workflows_install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
+    install, deployed_workflow = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
     inventory_database = install.config.inventory_database
     permission_manager = PermissionManager(sql_backend, inventory_database, [generic_permissions])
     permission_manager.inventorize_permissions()

-    workflows_install.run_workflow("migrate-groups")
+    deployed_workflow.run_workflow("migrate-groups")

     found = generic_permissions.load_as_dict("cluster-policies", cluster_policy.policy_id)
     assert found[acc_group_a.display_name] == PermissionLevel.CAN_USE
@@ -335,12 +339,12 @@ def test_running_real_validate_groups_permissions_job(
         [redash.Listing(ws.queries.list, sql.ObjectTypePlural.QUERIES)],
     )

-    install, workflows_install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
+    install, deployed_workflow = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
     permission_manager = PermissionManager(sql_backend, install.config.inventory_database, [redash_permissions])
     permission_manager.inventorize_permissions()

     # assert the job does not throw any exception
-    workflows_install.run_workflow("validate-groups-permissions")
+    deployed_workflow.run_workflow("validate-groups-permissions")


 @retried(on=[NotFound], timeout=timedelta(minutes=5))
@@ -363,7 +367,7 @@ def test_running_real_validate_groups_permissions_job_fails(
         ],
     )

-    install, workflows_install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
+    install, deployed_workflow = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
     inventory_database = install.config.inventory_database
     permission_manager = PermissionManager(sql_backend, inventory_database, [generic_permissions])
     permission_manager.inventorize_permissions()
@@ -374,14 +378,14 @@ def test_running_real_validate_groups_permissions_job_fails(
     )

     with pytest.raises(Unknown):
-        workflows_install.run_workflow("validate-groups-permissions")
+        deployed_workflow.run_workflow("validate-groups-permissions")


 @retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=5))
 def test_running_real_remove_backup_groups_job(ws, sql_backend, new_installation, make_ucx_group):
     ws_group_a, _ = make_ucx_group()

-    install, workflows_install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
+    install, deployed_workflow = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
     cfg = install.config
     group_manager = GroupManager(
         sql_backend, ws, cfg.inventory_database, cfg.include_group_names, cfg.renamed_group_prefix
@@ -390,7 +394,7 @@ def test_running_real_remove_backup_groups_job(ws, sql_backend, new_installation
     group_manager.rename_groups()
     group_manager.reflect_account_groups_on_workspace()

-    workflows_install.run_workflow("remove-workspace-local-backup-groups")
+    deployed_workflow.run_workflow("remove-workspace-local-backup-groups")

     with pytest.raises(NotFound):
         ws.groups.get(ws_group_a.id)
@@ -398,15 +402,15 @@

 @retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=3))
 def test_repair_run_workflow_job(ws, mocker, new_installation, sql_backend):
-    install, workflows_install = new_installation()
+    install, deployed_workflow = new_installation()
     mocker.patch("webbrowser.open")
     sql_backend.execute(f"DROP SCHEMA {install.config.inventory_database} CASCADE")
     with pytest.raises(NotFound):
-        workflows_install.run_workflow("099-destroy-schema")
+        deployed_workflow.run_workflow("099-destroy-schema")
     sql_backend.execute(f"CREATE SCHEMA IF NOT EXISTS {install.config.inventory_database}")

-    workflows_install.repair_run("099-destroy-schema")
+    deployed_workflow.repair_run("099-destroy-schema")

     installation = Installation(ws, product=os.path.basename(install.folder), install_folder=install.folder)
     state = InstallState.from_installation(installation)
@@ -594,7 +598,7 @@ def test_table_migration_job(
     dst_catalog = make_catalog()
     make_schema(catalog_name=dst_catalog.name, name=schema.name)

-    _, workflows_install = new_installation(
+    _, deployed_workflow = new_installation(
         lambda wc: replace(wc, override_clusters=None),
         product_info=ProductInfo.from_class(WorkspaceConfig),
         extend_prompts={
@@ -634,6 +638,39 @@ def test_table_migration_job(
         [Mount(f'/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a', 'abfss://things@labsazurethings.dfs.core.windows.net/a')],
         Mount,
     )
+    sql_backend.save_table(
+        f"{installation.load(WorkspaceConfig).inventory_database}.tables",
+        [
+            Table("hive_metastore", schema.name, src_managed_table.name, "MANAGED", "DELTA", location="dbfs:/test"),
+            Table("hive_metastore", schema.name, src_external_table.name, "EXTERNAL", "CSV"),
+        ],
+        Table,
+    )
+    # inject dummy group and table acl to avoid crawling which will slow down the test
+    sql_backend.save_table(
+        f"{installation.load(WorkspaceConfig).inventory_database}.groups",
+        [
+            MigratedGroup(
+                "group_id",
+                "test_group_ws",
+                "test_group_ac",
+                "tmp",
+            )
+        ],
+        MigratedGroup,
+    )
+    sql_backend.save_table(
+        f"{installation.load(WorkspaceConfig).inventory_database}.grants",
+        [
+            Grant(
+                "test_user",
+                "SELECT",
+                database="test_database",
+                table="test_table",
+            )
+        ],
+        Grant,
+    )
     installation.save(
         [
             StoragePermissionMapping(
@@ -648,9 +685,9 @@ def test_table_migration_job(
         ],
         filename='azure_storage_account_info.csv',
     )
-    workflows_install.run_workflow("migrate-tables")
+    deployed_workflow.run_workflow("migrate-tables")
     # assert the workflow is successful
-    assert workflows_install.validate_step("migrate-tables")
+    assert deployed_workflow.validate_step("migrate-tables")
     # assert the tables are migrated
     try:
         assert ws.tables.get(f"{dst_catalog.name}.{schema.name}.{src_managed_table.name}").name
@@ -680,18 +717,18 @@ def test_table_migration_job_cluster_override(  # pylint: disable=too-many-local
     sql_backend,
 ):
     # create external and managed tables to be migrated
-    src_schema = make_schema(catalog_name="hive_metastore", name=f"migrate_{make_random(5).lower()}")
-    src_managed_table = make_table(schema_name=src_schema.name)
+    schema = make_schema(catalog_name="hive_metastore", name=f"migrate_{make_random(5).lower()}")
+    src_managed_table = make_table(schema_name=schema.name)
     existing_mounted_location = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/b/c'
     new_mounted_location = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/b/{make_random(4)}'
     make_dbfs_data_copy(src_path=existing_mounted_location, dst_path=new_mounted_location)
-    src_external_table = make_table(schema_name=src_schema.name, external_csv=new_mounted_location)
+    src_external_table = make_table(schema_name=schema.name, external_csv=new_mounted_location)

     # create destination catalog and schema
     dst_catalog = make_catalog()
-    dst_schema = make_schema(catalog_name=dst_catalog.name, name=src_schema.name)
+    make_schema(catalog_name=dst_catalog.name, name=schema.name)
     product_info = ProductInfo.from_class(WorkspaceConfig)
-    _, workflows_install = new_installation(
+    _, deployed_workflow = new_installation(
         product_info=product_info,
         inventory_schema_name=f"ucx_S{make_random(4)}_migrate_inventory",
         extend_prompts={
@@ -703,16 +740,16 @@ def test_table_migration_job_cluster_override(  # pylint: disable=too-many-local
             Rule(
                 "ws_name",
                 dst_catalog.name,
-                src_schema.name,
-                dst_schema.name,
+                schema.name,
+                schema.name,
                 src_managed_table.name,
                 src_managed_table.name,
             ),
             Rule(
                 "ws_name",
                 dst_catalog.name,
-                src_schema.name,
-                dst_schema.name,
+                schema.name,
+                schema.name,
                 src_external_table.name,
                 src_external_table.name,
             ),
@@ -723,6 +760,39 @@ def test_table_migration_job_cluster_override(  # pylint: disable=too-many-local
         [Mount(f'/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a', 'abfss://things@labsazurethings.dfs.core.windows.net/a')],
         Mount,
     )
+    sql_backend.save_table(
+        f"{installation.load(WorkspaceConfig).inventory_database}.tables",
+        [
+            Table("hive_metastore", schema.name, src_managed_table.name, "MANAGED", "DELTA", location="dbfs:/test"),
+            Table("hive_metastore", schema.name, src_external_table.name, "EXTERNAL", "CSV"),
+        ],
+        Table,
+    )
+    # inject dummy group and table acl to avoid crawling which will slow down the test
+    sql_backend.save_table(
+        f"{installation.load(WorkspaceConfig).inventory_database}.groups",
+        [
+            MigratedGroup(
+                "group_id",
+                "test_group_ws",
+                "test_group_ac",
+                "tmp",
+            )
+        ],
+        MigratedGroup,
+    )
+    sql_backend.save_table(
+        f"{installation.load(WorkspaceConfig).inventory_database}.grants",
+        [
+            Grant(
+                "test_user",
+                "SELECT",
+                database="test_database",
+                table="test_table",
+            )
+        ],
+        Grant,
+    )
     installation.save(
         [
             StoragePermissionMapping(
@@ -736,17 +806,17 @@ def test_table_migration_job_cluster_override(  # pylint: disable=too-many-local
         ],
         filename='azure_storage_account_info.csv',
     )
-    workflows_install.run_workflow("migrate-tables")
+    deployed_workflow.run_workflow("migrate-tables")
     # assert the workflow is successful
-    assert workflows_install.validate_step("migrate-tables")
+    assert deployed_workflow.validate_step("migrate-tables")
     # assert the tables are migrated
     try:
-        assert ws.tables.get(f"{dst_catalog.name}.{dst_schema.name}.{src_managed_table.name}").name
-        assert ws.tables.get(f"{dst_catalog.name}.{dst_schema.name}.{src_external_table.name}").name
+        assert ws.tables.get(f"{dst_catalog.name}.{schema.name}.{src_managed_table.name}").name
+        assert ws.tables.get(f"{dst_catalog.name}.{schema.name}.{src_external_table.name}").name
     except NotFound:
         assert (
             False
-        ), f"{src_managed_table.name} and {src_external_table.name} not found in {dst_catalog.name}.{dst_schema.name}"
+        ), f"{src_managed_table.name} and {src_external_table.name} not found in {dst_catalog.name}.{schema.name}"
     # assert the cluster is configured correctly
     install_state = installation.load(RawState)
     job_id = install_state.resources["jobs"]["migrate-tables"]
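
Every test in this patch drives a job through the same few calls on the DeployedWorkflows handle returned by new_installation. A minimal sketch of that pattern, assuming the constructor signature shown in the factory fixture above (ws is a WorkspaceClient and install_state an InstallState; the helper name run_and_validate is hypothetical):

    from datetime import timedelta

    from databricks.labs.ucx.installer.workflows import DeployedWorkflows


    def run_and_validate(ws, install_state, workflow: str) -> bool:
        # Build the handle with the same 3-minute timeout the fixture uses.
        deployed = DeployedWorkflows(ws, install_state, timedelta(minutes=3))
        # Start the named job; per the tests above, this raises (e.g. NotFound,
        # Unknown) when the underlying job run fails.
        deployed.run_workflow(workflow)
        # Confirm the workflow's steps completed, as the migration tests assert.
        return deployed.validate_step(workflow)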