Skip to content

Commit

Permalink
Fix integration tests: move validate_step into the workflows installer and return it from the new_installation fixture
Browse files Browse the repository at this point in the history
  • Loading branch information
FastLee committed Mar 20, 2024
1 parent fc444c5 commit 88553e5
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 48 deletions.
21 changes: 1 addition & 20 deletions src/databricks/labs/ucx/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,27 +483,8 @@ def _remove_warehouse(self):
except InvalidParameterValue:
logger.error("Error accessing warehouse details")

def validate_step(self, step: str) -> bool:
    """Check whether the given workflow step has a successful run.

    Returns True as soon as any recorded run of the step's job finished with
    SUCCESS. Otherwise, if a run is still in flight (RUNNING or PENDING),
    blocks until it terminates and reports whether it ended in SUCCESS.
    Returns False when no successful or in-progress run exists.
    """
    job_id = int(self._workflows_installer.state.jobs[step])
    logger.debug(f"Validating {step} workflow: {self._ws.config.host}#job/{job_id}")
    runs = list(self._ws.jobs.list_runs(completed_only=False, job_id=job_id))
    # Fast path: a run that already succeeded validates the step outright.
    if any(r.state and r.state.result_state == RunResultState.SUCCESS for r in runs):
        return True
    in_flight = (RunLifeCycleState.RUNNING, RunLifeCycleState.PENDING)
    for candidate in runs:
        if not (candidate.run_id and candidate.state):
            continue
        if candidate.state.life_cycle_state not in in_flight:
            continue
        logger.info("Identified a run in progress waiting for run completion")
        self._ws.jobs.wait_get_run_job_terminated_or_skipped(run_id=candidate.run_id)
        final_state = self._ws.jobs.get_run(run_id=candidate.run_id).state
        return final_state is not None and final_state.result_state == RunResultState.SUCCESS
    return False

def validate_and_run(self, step: str):
if not self.validate_step(step):
if not self._workflows_installer.validate_step(step):
self._workflows_installer.run_workflow(step)

def _trigger_workflow(self, step: str):
Expand Down
20 changes: 20 additions & 0 deletions src/databricks/labs/ucx/installer/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
)
from databricks.sdk.retries import retried
from databricks.sdk.service import compute, jobs
from databricks.sdk.service.jobs import RunResultState, RunLifeCycleState

import databricks
from databricks.labs.ucx.config import WorkspaceConfig
Expand Down Expand Up @@ -217,6 +218,25 @@ def latest_job_status(self) -> list[dict]:
)
return latest_status

def validate_step(self, step: str) -> bool:
    """Return True if the workflow for *step* has a successful run.

    Looks up the job id recorded in the install state, then inspects its
    runs: any run that already completed with SUCCESS validates the step;
    otherwise, if a run is RUNNING or PENDING, waits for it to terminate
    and judges the step by that run's final result. Returns False when no
    run succeeded and none is in progress.
    """
    job_id = int(self.state.jobs[step])
    logger.debug(f"Validating {step} workflow: {self._ws.config.host}#job/{job_id}")
    # completed_only=False so in-flight runs are listed alongside finished ones
    current_runs = list(self._ws.jobs.list_runs(completed_only=False, job_id=job_id))
    # Fast path: any run that already succeeded validates the step outright.
    for run in current_runs:
        if run.state and run.state.result_state == RunResultState.SUCCESS:
            return True
    # No success yet: if a run is still in flight, block until it terminates
    # and re-fetch its state to decide. Only the first such run is considered.
    for run in current_runs:
        if (
            run.run_id
            and run.state
            and run.state.life_cycle_state in (RunLifeCycleState.RUNNING, RunLifeCycleState.PENDING)
        ):
            logger.info("Identified a run in progress waiting for run completion")
            self._ws.jobs.wait_get_run_job_terminated_or_skipped(run_id=run.run_id)
            run_new_state = self._ws.jobs.get_run(run_id=run.run_id).state
            return run_new_state is not None and run_new_state.result_state == RunResultState.SUCCESS
    return False

@property
def _config_file(self):
return f"{self._installation.install_folder()}/config.yml"
Expand Down
57 changes: 29 additions & 28 deletions tests/integration/test_installation.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def test_running_real_assessment_job(
group_name=ws_group_a.display_name,
)

install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
_, install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
install.run_workflow("assessment")

generic_permissions = GenericPermissionsSupport(ws, [])
Expand Down Expand Up @@ -222,12 +222,12 @@ def test_running_real_migrate_groups_job(
],
)

install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
install, workflows_install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
inventory_database = install.config.inventory_database
permission_manager = PermissionManager(sql_backend, inventory_database, [generic_permissions])
permission_manager.inventorize_permissions()

install.run_workflow("migrate-groups")
workflows_install.run_workflow("migrate-groups")

found = generic_permissions.load_as_dict("cluster-policies", cluster_policy.policy_id)
assert found[acc_group_a.display_name] == PermissionLevel.CAN_USE
Expand All @@ -252,12 +252,13 @@ def test_running_real_validate_groups_permissions_job(
[redash.Listing(ws.queries.list, sql.ObjectTypePlural.QUERIES)],
)

install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
permission_manager = PermissionManager(sql_backend, install.config.inventory_database, [redash_permissions])
install, workflows_install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
permission_manager = PermissionManager(sql_backend, install.config.inventory_database,
[redash_permissions])
permission_manager.inventorize_permissions()

# assert the job does not throw any exception
install.run_workflow("validate-groups-permissions")
workflows_install.run_workflow("validate-groups-permissions")


@retried(on=[NotFound], timeout=timedelta(minutes=5))
Expand All @@ -280,7 +281,7 @@ def test_running_real_validate_groups_permissions_job_fails(
],
)

install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
install, workflows_install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
inventory_database = install.config.inventory_database
permission_manager = PermissionManager(sql_backend, inventory_database, [generic_permissions])
permission_manager.inventorize_permissions()
Expand All @@ -291,14 +292,14 @@ def test_running_real_validate_groups_permissions_job_fails(
)

with pytest.raises(ValueError):
install.run_workflow("validate-groups-permissions")
workflows_install.run_workflow("validate-groups-permissions")


@retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=5))
def test_running_real_remove_backup_groups_job(ws, sql_backend, new_installation, make_ucx_group):
ws_group_a, _ = make_ucx_group()

install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
install, workflows_install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
cfg = install.config
group_manager = GroupManager(
sql_backend, ws, cfg.inventory_database, cfg.include_group_names, cfg.renamed_group_prefix
Expand All @@ -307,23 +308,23 @@ def test_running_real_remove_backup_groups_job(ws, sql_backend, new_installation
group_manager.rename_groups()
group_manager.reflect_account_groups_on_workspace()

install.run_workflow("remove-workspace-local-backup-groups")
workflows_install.run_workflow("remove-workspace-local-backup-groups")

with pytest.raises(NotFound):
ws.groups.get(ws_group_a.id)


@retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=10))
def test_repair_run_workflow_job(ws, mocker, new_installation, sql_backend):
install = new_installation()
install, workflows_install = new_installation()
mocker.patch("webbrowser.open")
sql_backend.execute(f"DROP SCHEMA {install.config.inventory_database} CASCADE")
with pytest.raises(NotFound):
install.run_workflow("099-destroy-schema")
workflows_install.run_workflow("099-destroy-schema")

sql_backend.execute(f"CREATE SCHEMA IF NOT EXISTS {install.config.inventory_database}")

install.repair_run("099-destroy-schema")
workflows_install.repair_run("099-destroy-schema")

installation = Installation(ws, product=os.path.basename(install.folder), install_folder=install.folder)
state = InstallState.from_installation(installation)
Expand All @@ -337,7 +338,7 @@ def test_repair_run_workflow_job(ws, mocker, new_installation, sql_backend):

@retried(on=[NotFound], timeout=timedelta(minutes=5))
def test_uninstallation(ws, sql_backend, new_installation):
install = new_installation()
install, _ = new_installation()
installation = Installation(ws, product=os.path.basename(install.folder), install_folder=install.folder)
state = InstallState.from_installation(installation)
assessment_job_id = state.jobs["assessment"]
Expand All @@ -352,7 +353,7 @@ def test_uninstallation(ws, sql_backend, new_installation):

def test_fresh_global_installation(ws, new_installation):
product_info = ProductInfo.for_testing(WorkspaceConfig)
global_installation = new_installation(
global_installation, _ = new_installation(
product_info=product_info,
installation=Installation.assume_global(ws, product_info.product_name()),
)
Expand All @@ -362,7 +363,7 @@ def test_fresh_global_installation(ws, new_installation):

def test_fresh_user_installation(ws, new_installation):
product_info = ProductInfo.for_testing(WorkspaceConfig)
user_installation = new_installation(
user_installation, _ = new_installation(
product_info=product_info,
installation=Installation.assume_user_home(ws, product_info.product_name()),
)
Expand All @@ -372,12 +373,12 @@ def test_fresh_user_installation(ws, new_installation):

def test_global_installation_on_existing_global_install(ws, new_installation):
product_info = ProductInfo.for_testing(WorkspaceConfig)
existing_global_installation = new_installation(
existing_global_installation, _ = new_installation(
product_info=product_info,
installation=Installation.assume_global(ws, product_info.product_name()),
)
assert existing_global_installation.folder == f"/Applications/{product_info.product_name()}"
reinstall_global = new_installation(
reinstall_global,_ = new_installation(
product_info=product_info,
installation=Installation.assume_global(ws, product_info.product_name()),
)
Expand All @@ -388,7 +389,7 @@ def test_global_installation_on_existing_global_install(ws, new_installation):
def test_user_installation_on_existing_global_install(ws, new_installation):
# existing install at global level
product_info = ProductInfo.for_testing(WorkspaceConfig)
existing_global_installation = new_installation(
existing_global_installation, _ = new_installation(
product_info=product_info,
installation=Installation.assume_global(ws, product_info.product_name()),
)
Expand All @@ -406,7 +407,7 @@ def test_user_installation_on_existing_global_install(ws, new_installation):
assert err.value.args[0] == "UCX is already installed, but no confirmation"

# successful override with confirmation
reinstall_user_force = new_installation(
reinstall_user_force, _ = new_installation(
product_info=product_info,
installation=Installation.assume_global(ws, product_info.product_name()),
environ={'UCX_FORCE_INSTALL': 'user'},
Expand All @@ -423,7 +424,7 @@ def test_user_installation_on_existing_global_install(ws, new_installation):
def test_global_installation_on_existing_user_install(ws, new_installation):
# existing installation at user level
product_info = ProductInfo.for_testing(WorkspaceConfig)
existing_user_installation = new_installation(
existing_user_installation, _ = new_installation(
product_info=product_info, installation=Installation.assume_user_home(ws, product_info.product_name())
)
assert (
Expand Down Expand Up @@ -458,7 +459,7 @@ def test_global_installation_on_existing_user_install(ws, new_installation):

def test_check_inventory_database_exists(ws, new_installation):
product_info = ProductInfo.for_testing(WorkspaceConfig)
install = new_installation(
install, _ = new_installation(
product_info=product_info,
installation=Installation.assume_global(ws, product_info.product_name()),
)
Expand Down Expand Up @@ -493,7 +494,7 @@ def test_table_migration_job( # pylint: disable=too-many-locals
dst_schema = make_schema(catalog_name=dst_catalog.name, name=src_schema.name)

product_info = ProductInfo.from_class(WorkspaceConfig)
install = new_installation(
install, workflows_install = new_installation(
product_info=product_info,
extend_prompts={
r"Parallelism for migrating.*": "1000",
Expand Down Expand Up @@ -523,9 +524,9 @@ def test_table_migration_job( # pylint: disable=too-many-locals
]
installation.save(migrate_rules, filename='mapping.csv')

install.run_workflow("migrate-tables")
workflows_install.run_workflow("migrate-tables")
# assert the workflow is successful
assert install.validate_step("migrate-tables")
assert workflows_install.validate_step("migrate-tables")
# assert the tables are migrated
assert ws.tables.get(f"{dst_catalog.name}.{dst_schema.name}.{src_managed_table.name}").name
assert ws.tables.get(f"{dst_catalog.name}.{dst_schema.name}.{src_external_table.name}").name
Expand Down Expand Up @@ -555,7 +556,7 @@ def test_table_migration_job_cluster_override( # pylint: disable=too-many-local
dst_schema = make_schema(catalog_name=dst_catalog.name, name=src_schema.name)

product_info = ProductInfo.from_class(WorkspaceConfig)
install = new_installation(
install, workflows_install = new_installation(
lambda wc: replace(wc, override_clusters={"table_migration": env_or_skip("TEST_USER_ISOLATION_CLUSTER_ID")}),
product_info=product_info,
)
Expand All @@ -581,9 +582,9 @@ def test_table_migration_job_cluster_override( # pylint: disable=too-many-local
]
installation.save(migrate_rules, filename='mapping.csv')

install.run_workflow("migrate-tables")
workflows_install.run_workflow("migrate-tables")
# assert the workflow is successful
assert install.validate_step("migrate-tables")
assert workflows_install.validate_step("migrate-tables")
# assert the tables are migrated
assert ws.tables.get(f"{dst_catalog.name}.{dst_schema.name}.{src_managed_table.name}").name
assert ws.tables.get(f"{dst_catalog.name}.{dst_schema.name}.{src_external_table.name}").name
Expand Down

0 comments on commit 88553e5

Please sign in to comment.