diff --git a/src/databricks/labs/ucx/install.py b/src/databricks/labs/ucx/install.py
index 4c67067b96..fc41e14362 100644
--- a/src/databricks/labs/ucx/install.py
+++ b/src/databricks/labs/ucx/install.py
@@ -483,27 +483,8 @@ def _remove_warehouse(self):
         except InvalidParameterValue:
             logger.error("Error accessing warehouse details")
 
-    def validate_step(self, step: str) -> bool:
-        job_id = int(self._workflows_installer.state.jobs[step])
-        logger.debug(f"Validating {step} workflow: {self._ws.config.host}#job/{job_id}")
-        current_runs = list(self._ws.jobs.list_runs(completed_only=False, job_id=job_id))
-        for run in current_runs:
-            if run.state and run.state.result_state == RunResultState.SUCCESS:
-                return True
-        for run in current_runs:
-            if (
-                run.run_id
-                and run.state
-                and run.state.life_cycle_state in (RunLifeCycleState.RUNNING, RunLifeCycleState.PENDING)
-            ):
-                logger.info("Identified a run in progress waiting for run completion")
-                self._ws.jobs.wait_get_run_job_terminated_or_skipped(run_id=run.run_id)
-                run_new_state = self._ws.jobs.get_run(run_id=run.run_id).state
-                return run_new_state is not None and run_new_state.result_state == RunResultState.SUCCESS
-        return False
-
     def validate_and_run(self, step: str):
-        if not self.validate_step(step):
+        if not self._workflows_installer.validate_step(step):
             self._workflows_installer.run_workflow(step)
 
     def _trigger_workflow(self, step: str):
diff --git a/src/databricks/labs/ucx/installer/workflows.py b/src/databricks/labs/ucx/installer/workflows.py
index d56bb0113b..14de8b292d 100644
--- a/src/databricks/labs/ucx/installer/workflows.py
+++ b/src/databricks/labs/ucx/installer/workflows.py
@@ -37,6 +37,7 @@
 )
 from databricks.sdk.retries import retried
 from databricks.sdk.service import compute, jobs
+from databricks.sdk.service.jobs import RunResultState, RunLifeCycleState
 
 import databricks
 from databricks.labs.ucx.config import WorkspaceConfig
@@ -217,6 +218,25 @@ def latest_job_status(self) -> list[dict]:
         )
         return latest_status
 
+    def validate_step(self, step: str) -> bool:
+        job_id = int(self.state.jobs[step])
+        logger.debug(f"Validating {step} workflow: {self._ws.config.host}#job/{job_id}")
+        current_runs = list(self._ws.jobs.list_runs(completed_only=False, job_id=job_id))
+        for run in current_runs:
+            if run.state and run.state.result_state == RunResultState.SUCCESS:
+                return True
+        for run in current_runs:
+            if (
+                run.run_id
+                and run.state
+                and run.state.life_cycle_state in (RunLifeCycleState.RUNNING, RunLifeCycleState.PENDING)
+            ):
+                logger.info("Identified a run in progress waiting for run completion")
+                self._ws.jobs.wait_get_run_job_terminated_or_skipped(run_id=run.run_id)
+                run_new_state = self._ws.jobs.get_run(run_id=run.run_id).state
+                return run_new_state is not None and run_new_state.result_state == RunResultState.SUCCESS
+        return False
+
     @property
     def _config_file(self):
         return f"{self._installation.install_folder()}/config.yml"
diff --git a/tests/integration/test_installation.py b/tests/integration/test_installation.py
index c43e3690a5..6bde90f99b 100644
--- a/tests/integration/test_installation.py
+++ b/tests/integration/test_installation.py
@@ -193,7 +193,7 @@ def test_running_real_assessment_job(
         group_name=ws_group_a.display_name,
     )
 
-    install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
+    _, install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
     install.run_workflow("assessment")
 
     generic_permissions = GenericPermissionsSupport(ws, [])
@@ -222,12 +222,12 @@ def test_running_real_migrate_groups_job(
         ],
     )
 
-    install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
+    install, workflows_install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
     inventory_database = install.config.inventory_database
     permission_manager = PermissionManager(sql_backend, inventory_database, [generic_permissions])
     permission_manager.inventorize_permissions()
 
-    install.run_workflow("migrate-groups")
+    workflows_install.run_workflow("migrate-groups")
 
     found = generic_permissions.load_as_dict("cluster-policies", cluster_policy.policy_id)
     assert found[acc_group_a.display_name] == PermissionLevel.CAN_USE
@@ -252,12 +252,13 @@ def test_running_real_validate_groups_permissions_job(
         [redash.Listing(ws.queries.list, sql.ObjectTypePlural.QUERIES)],
     )
 
-    install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
-    permission_manager = PermissionManager(sql_backend, install.config.inventory_database, [redash_permissions])
+    install, workflows_install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
+    permission_manager = PermissionManager(sql_backend, install.config.inventory_database,
+                                           [redash_permissions])
     permission_manager.inventorize_permissions()
 
     # assert the job does not throw any exception
-    install.run_workflow("validate-groups-permissions")
+    workflows_install.run_workflow("validate-groups-permissions")
 
 
 @retried(on=[NotFound], timeout=timedelta(minutes=5))
@@ -280,7 +281,7 @@ def test_running_real_validate_groups_permissions_job_fails(
         ],
     )
 
-    install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
+    install, workflows_install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
     inventory_database = install.config.inventory_database
     permission_manager = PermissionManager(sql_backend, inventory_database, [generic_permissions])
     permission_manager.inventorize_permissions()
@@ -291,14 +292,14 @@ def test_running_real_validate_groups_permissions_job_fails(
     )
 
     with pytest.raises(ValueError):
-        install.run_workflow("validate-groups-permissions")
+        workflows_install.run_workflow("validate-groups-permissions")
 
 
 @retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=5))
 def test_running_real_remove_backup_groups_job(ws, sql_backend, new_installation, make_ucx_group):
     ws_group_a, _ = make_ucx_group()
 
-    install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
+    install, workflows_install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
     cfg = install.config
     group_manager = GroupManager(
         sql_backend, ws, cfg.inventory_database, cfg.include_group_names, cfg.renamed_group_prefix
@@ -307,7 +308,7 @@ def test_running_real_remove_backup_groups_job(ws, sql_backend, new_installation
     group_manager.rename_groups()
     group_manager.reflect_account_groups_on_workspace()
 
-    install.run_workflow("remove-workspace-local-backup-groups")
+    workflows_install.run_workflow("remove-workspace-local-backup-groups")
 
     with pytest.raises(NotFound):
         ws.groups.get(ws_group_a.id)
@@ -315,15 +316,15 @@ def test_running_real_remove_backup_groups_job(ws, sql_backend, new_installation
 
 @retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=10))
 def test_repair_run_workflow_job(ws, mocker, new_installation, sql_backend):
-    install = new_installation()
+    install, workflows_install = new_installation()
     mocker.patch("webbrowser.open")
     sql_backend.execute(f"DROP SCHEMA {install.config.inventory_database} CASCADE")
 
     with pytest.raises(NotFound):
-        install.run_workflow("099-destroy-schema")
+        workflows_install.run_workflow("099-destroy-schema")
 
     sql_backend.execute(f"CREATE SCHEMA IF NOT EXISTS {install.config.inventory_database}")
-    install.repair_run("099-destroy-schema")
+    workflows_install.repair_run("099-destroy-schema")
 
     installation = Installation(ws, product=os.path.basename(install.folder), install_folder=install.folder)
     state = InstallState.from_installation(installation)
@@ -337,7 +338,7 @@ def test_repair_run_workflow_job(ws, mocker, new_installation, sql_backend):
 
 @retried(on=[NotFound], timeout=timedelta(minutes=5))
 def test_uninstallation(ws, sql_backend, new_installation):
-    install = new_installation()
+    install, _ = new_installation()
     installation = Installation(ws, product=os.path.basename(install.folder), install_folder=install.folder)
     state = InstallState.from_installation(installation)
     assessment_job_id = state.jobs["assessment"]
@@ -352,7 +353,7 @@ def test_fresh_global_installation(ws, new_installation):
     product_info = ProductInfo.for_testing(WorkspaceConfig)
-    global_installation = new_installation(
+    global_installation, _ = new_installation(
         product_info=product_info,
         installation=Installation.assume_global(ws, product_info.product_name()),
     )
@@ -362,7 +363,7 @@ def test_fresh_user_installation(ws, new_installation):
     product_info = ProductInfo.for_testing(WorkspaceConfig)
-    user_installation = new_installation(
+    user_installation, _ = new_installation(
         product_info=product_info,
         installation=Installation.assume_user_home(ws, product_info.product_name()),
     )
@@ -372,12 +373,12 @@ def test_global_installation_on_existing_global_install(ws, new_installation):
     product_info = ProductInfo.for_testing(WorkspaceConfig)
-    existing_global_installation = new_installation(
+    existing_global_installation, _ = new_installation(
         product_info=product_info,
         installation=Installation.assume_global(ws, product_info.product_name()),
     )
 
     assert existing_global_installation.folder == f"/Applications/{product_info.product_name()}"
 
-    reinstall_global = new_installation(
+    reinstall_global, _ = new_installation(
         product_info=product_info,
         installation=Installation.assume_global(ws, product_info.product_name()),
     )
@@ -388,7 +389,7 @@ def test_user_installation_on_existing_global_install(ws, new_installation):
     # existing install at global level
     product_info = ProductInfo.for_testing(WorkspaceConfig)
-    existing_global_installation = new_installation(
+    existing_global_installation, _ = new_installation(
         product_info=product_info,
         installation=Installation.assume_global(ws, product_info.product_name()),
     )
@@ -406,7 +407,7 @@ def test_user_installation_on_existing_global_install(ws, new_installation):
     assert err.value.args[0] == "UCX is already installed, but no confirmation"
 
     # successful override with confirmation
-    reinstall_user_force = new_installation(
+    reinstall_user_force, _ = new_installation(
         product_info=product_info,
         installation=Installation.assume_global(ws, product_info.product_name()),
         environ={'UCX_FORCE_INSTALL': 'user'},
@@ -423,7 +424,7 @@ def test_user_installation_on_existing_global_install(ws, new_installation):
 def test_global_installation_on_existing_user_install(ws, new_installation):
     # existing installation at user level
     product_info = ProductInfo.for_testing(WorkspaceConfig)
-    existing_user_installation = new_installation(
+    existing_user_installation, _ = new_installation(
         product_info=product_info, installation=Installation.assume_user_home(ws, product_info.product_name())
     )
     assert (
@@ -458,7 +459,7 @@ def test_global_installation_on_existing_user_install(ws, new_installation):
 
 def test_check_inventory_database_exists(ws, new_installation):
     product_info = ProductInfo.for_testing(WorkspaceConfig)
-    install = new_installation(
+    install, _ = new_installation(
        product_info=product_info,
        installation=Installation.assume_global(ws, product_info.product_name()),
    )
@@ -493,7 +494,7 @@ def test_table_migration_job(  # pylint: disable=too-many-locals
     dst_schema = make_schema(catalog_name=dst_catalog.name, name=src_schema.name)
 
     product_info = ProductInfo.from_class(WorkspaceConfig)
-    install = new_installation(
+    install, workflows_install = new_installation(
         product_info=product_info,
         extend_prompts={
             r"Parallelism for migrating.*": "1000",
@@ -523,9 +524,9 @@ def test_table_migration_job(  # pylint: disable=too-many-locals
     ]
     installation.save(migrate_rules, filename='mapping.csv')
 
-    install.run_workflow("migrate-tables")
+    workflows_install.run_workflow("migrate-tables")
     # assert the workflow is successful
-    assert install.validate_step("migrate-tables")
+    assert workflows_install.validate_step("migrate-tables")
     # assert the tables are migrated
     assert ws.tables.get(f"{dst_catalog.name}.{dst_schema.name}.{src_managed_table.name}").name
     assert ws.tables.get(f"{dst_catalog.name}.{dst_schema.name}.{src_external_table.name}").name
@@ -555,7 +556,7 @@ def test_table_migration_job_cluster_override(  # pylint: disable=too-many-local
     dst_schema = make_schema(catalog_name=dst_catalog.name, name=src_schema.name)
 
     product_info = ProductInfo.from_class(WorkspaceConfig)
-    install = new_installation(
+    install, workflows_install = new_installation(
         lambda wc: replace(wc, override_clusters={"table_migration": env_or_skip("TEST_USER_ISOLATION_CLUSTER_ID")}),
         product_info=product_info,
     )
@@ -581,9 +582,9 @@ def test_table_migration_job_cluster_override(  # pylint: disable=too-many-local
     ]
     installation.save(migrate_rules, filename='mapping.csv')
 
-    install.run_workflow("migrate-tables")
+    workflows_install.run_workflow("migrate-tables")
     # assert the workflow is successful
-    assert install.validate_step("migrate-tables")
+    assert workflows_install.validate_step("migrate-tables")
     # assert the tables are migrated
     assert ws.tables.get(f"{dst_catalog.name}.{dst_schema.name}.{src_managed_table.name}").name
     assert ws.tables.get(f"{dst_catalog.name}.{dst_schema.name}.{src_external_table.name}").name