Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix integration test with new DeployedWorkflows #1250

Merged
merged 4 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tests/integration/hive_metastore/test_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ def test_migrate_managed_tables_with_principal_acl_azure(
permission_level=PermissionLevel.CAN_ATTACH_TO,
user_name=user.user_name,
)
table_migrate.migrate_tables(what=What.DBFS_ROOT_DELTA, acl_strategy=[AclMigrationWhat.PRINCIPAL])
table_migrate.migrate_tables(what=What.EXTERNAL_SYNC, acl_strategy=[AclMigrationWhat.PRINCIPAL])

target_table_grants = ws.grants.get(SecurableType.TABLE, table_full_name)
match = False
Expand Down
146 changes: 108 additions & 38 deletions tests/integration/test_installation.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@
from databricks.labs.ucx.hive_metastore.grants import Grant
from databricks.labs.ucx.hive_metastore.locations import Mount
from databricks.labs.ucx.hive_metastore.mapping import Rule
from databricks.labs.ucx.hive_metastore.tables import Table
from databricks.labs.ucx.install import WorkspaceInstallation, WorkspaceInstaller
from databricks.labs.ucx.installer.workflows import WorkflowsDeployment
from databricks.labs.ucx.installer.workflows import (
DeployedWorkflows,
WorkflowsDeployment,
)
from databricks.labs.ucx.workspace_access import redash
from databricks.labs.ucx.workspace_access.generic import (
GenericPermissionsSupport,
Expand Down Expand Up @@ -134,7 +138,7 @@ def factory(
)
workspace_installation.run()
cleanup.append(workspace_installation)
return workspace_installation, workflows_installation
return workspace_installation, DeployedWorkflows(ws, install_state, timedelta(minutes=3))

yield factory

Expand Down Expand Up @@ -166,7 +170,7 @@ def test_experimental_permissions_migration_for_group_with_same_name( # pylint:
sql_backend.execute(f"ALTER SCHEMA {schema_a.name} OWNER TO `{migrated_group.name_in_workspace}`")
sql_backend.execute(f"GRANT SELECT ON TABLE {table_a.full_name} TO `{migrated_group.name_in_workspace}`")

workspace_installation, workflow_installation = new_installation(
workspace_installation, deployed_workflow = new_installation(
config_transform=lambda wc: replace(wc, include_group_names=[migrated_group.name_in_workspace])
)
workspace_installation.run()
Expand Down Expand Up @@ -197,7 +201,7 @@ def test_experimental_permissions_migration_for_group_with_same_name( # pylint:
permission_manager = PermissionManager(sql_backend, inventory_database, [generic_permissions, tacl_support])
permission_manager.inventorize_permissions()

workflow_installation.run_workflow("migrate-groups-experimental")
deployed_workflow.run_workflow("migrate-groups-experimental")

object_permissions = generic_permissions.load_as_dict("cluster-policies", cluster_policy.policy_id)
new_schema_grants = grants_crawler.for_schema_info(schema_a)
Expand All @@ -208,12 +212,12 @@ def test_experimental_permissions_migration_for_group_with_same_name( # pylint:

@retried(on=[NotFound, TimeoutError], timeout=timedelta(minutes=5))
def test_job_failure_propagates_correct_error_message_and_logs(ws, sql_backend, new_installation):
workspace_installation, workflow_installation = new_installation()
workspace_installation, deployed_workflow = new_installation()

sql_backend.execute(f"DROP SCHEMA {workspace_installation.config.inventory_database} CASCADE")

with pytest.raises(NotFound) as failure:
workflow_installation.run_workflow("099-destroy-schema")
deployed_workflow.run_workflow("099-destroy-schema")

assert "cannot be found" in str(failure.value)

Expand Down Expand Up @@ -254,10 +258,10 @@ def test_new_job_cluster_with_policy_assessment(
permission_level=PermissionLevel.CAN_USE,
group_name=ws_group_a.display_name,
)
install = new_installation(
_, deployed_workflow = new_installation(
lambda wc: replace(wc, override_clusters=None, include_group_names=[ws_group_a.display_name])
)
install.run_workflow("assessment")
deployed_workflow.run_workflow("assessment")
generic_permissions = GenericPermissionsSupport(ws, [])
before = generic_permissions.load_as_dict("cluster-policies", cluster_policy.policy_id)
assert before[ws_group_a.display_name] == PermissionLevel.CAN_USE
Expand All @@ -276,8 +280,8 @@ def test_running_real_assessment_job(
group_name=ws_group_a.display_name,
)

_, install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
install.run_workflow("assessment")
_, deployed_workflow = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
deployed_workflow.run_workflow("assessment")

generic_permissions = GenericPermissionsSupport(ws, [])
before = generic_permissions.load_as_dict("cluster-policies", cluster_policy.policy_id)
Expand Down Expand Up @@ -305,12 +309,12 @@ def test_running_real_migrate_groups_job(
],
)

install, workflows_install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
install, deployed_workflow = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
inventory_database = install.config.inventory_database
permission_manager = PermissionManager(sql_backend, inventory_database, [generic_permissions])
permission_manager.inventorize_permissions()

workflows_install.run_workflow("migrate-groups")
deployed_workflow.run_workflow("migrate-groups")

found = generic_permissions.load_as_dict("cluster-policies", cluster_policy.policy_id)
assert found[acc_group_a.display_name] == PermissionLevel.CAN_USE
Expand All @@ -335,12 +339,12 @@ def test_running_real_validate_groups_permissions_job(
[redash.Listing(ws.queries.list, sql.ObjectTypePlural.QUERIES)],
)

install, workflows_install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
install, deployed_workflow = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
permission_manager = PermissionManager(sql_backend, install.config.inventory_database, [redash_permissions])
permission_manager.inventorize_permissions()

# assert the job does not throw any exception
workflows_install.run_workflow("validate-groups-permissions")
deployed_workflow.run_workflow("validate-groups-permissions")


@retried(on=[NotFound], timeout=timedelta(minutes=5))
Expand All @@ -363,7 +367,7 @@ def test_running_real_validate_groups_permissions_job_fails(
],
)

install, workflows_install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
install, deployed_workflow = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
inventory_database = install.config.inventory_database
permission_manager = PermissionManager(sql_backend, inventory_database, [generic_permissions])
permission_manager.inventorize_permissions()
Expand All @@ -374,14 +378,14 @@ def test_running_real_validate_groups_permissions_job_fails(
)

with pytest.raises(Unknown):
workflows_install.run_workflow("validate-groups-permissions")
deployed_workflow.run_workflow("validate-groups-permissions")


@retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=5))
def test_running_real_remove_backup_groups_job(ws, sql_backend, new_installation, make_ucx_group):
ws_group_a, _ = make_ucx_group()

install, workflows_install = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
install, deployed_workflow = new_installation(lambda wc: replace(wc, include_group_names=[ws_group_a.display_name]))
cfg = install.config
group_manager = GroupManager(
sql_backend, ws, cfg.inventory_database, cfg.include_group_names, cfg.renamed_group_prefix
Expand All @@ -390,23 +394,23 @@ def test_running_real_remove_backup_groups_job(ws, sql_backend, new_installation
group_manager.rename_groups()
group_manager.reflect_account_groups_on_workspace()

workflows_install.run_workflow("remove-workspace-local-backup-groups")
deployed_workflow.run_workflow("remove-workspace-local-backup-groups")

with pytest.raises(NotFound):
ws.groups.get(ws_group_a.id)


@retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=3))
def test_repair_run_workflow_job(ws, mocker, new_installation, sql_backend):
install, workflows_install = new_installation()
install, deployed_workflow = new_installation()
mocker.patch("webbrowser.open")
sql_backend.execute(f"DROP SCHEMA {install.config.inventory_database} CASCADE")
with pytest.raises(NotFound):
workflows_install.run_workflow("099-destroy-schema")
deployed_workflow.run_workflow("099-destroy-schema")

sql_backend.execute(f"CREATE SCHEMA IF NOT EXISTS {install.config.inventory_database}")

workflows_install.repair_run("099-destroy-schema")
deployed_workflow.repair_run("099-destroy-schema")

installation = Installation(ws, product=os.path.basename(install.folder), install_folder=install.folder)
state = InstallState.from_installation(installation)
Expand Down Expand Up @@ -594,7 +598,7 @@ def test_table_migration_job(
dst_catalog = make_catalog()
make_schema(catalog_name=dst_catalog.name, name=schema.name)

_, workflows_install = new_installation(
_, deployed_workflow = new_installation(
lambda wc: replace(wc, override_clusters=None),
product_info=ProductInfo.from_class(WorkspaceConfig),
extend_prompts={
Expand Down Expand Up @@ -634,6 +638,39 @@ def test_table_migration_job(
[Mount(f'/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a', 'abfss://[email protected]/a')],
Mount,
)
sql_backend.save_table(
f"{installation.load(WorkspaceConfig).inventory_database}.tables",
[
Table("hive_metastore", schema.name, src_managed_table.name, "MANAGED", "DELTA", location="dbfs:/test"),
Table("hive_metastore", schema.name, src_external_table.name, "EXTERNAL", "CSV"),
],
Table,
)
# inject dummy group and table acl to avoid crawling which will slow down the test
sql_backend.save_table(
f"{installation.load(WorkspaceConfig).inventory_database}.groups",
[
MigratedGroup(
"group_id",
"test_group_ws",
"test_group_ac",
"tmp",
)
],
MigratedGroup,
)
sql_backend.save_table(
f"{installation.load(WorkspaceConfig).inventory_database}.grants",
[
Grant(
"test_user",
"SELECT",
database="test_database",
table="test_table",
)
],
Grant,
)
installation.save(
[
StoragePermissionMapping(
Expand All @@ -648,9 +685,9 @@ def test_table_migration_job(
filename='azure_storage_account_info.csv',
)

workflows_install.run_workflow("migrate-tables")
deployed_workflow.run_workflow("migrate-tables")
# assert the workflow is successful
assert workflows_install.validate_step("migrate-tables")
assert deployed_workflow.validate_step("migrate-tables")
# assert the tables are migrated
try:
assert ws.tables.get(f"{dst_catalog.name}.{schema.name}.{src_managed_table.name}").name
Expand Down Expand Up @@ -680,18 +717,18 @@ def test_table_migration_job_cluster_override( # pylint: disable=too-many-local
sql_backend,
):
# create external and managed tables to be migrated
src_schema = make_schema(catalog_name="hive_metastore", name=f"migrate_{make_random(5).lower()}")
src_managed_table = make_table(schema_name=src_schema.name)
schema = make_schema(catalog_name="hive_metastore", name=f"migrate_{make_random(5).lower()}")
src_managed_table = make_table(schema_name=schema.name)
existing_mounted_location = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/b/c'
new_mounted_location = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/b/{make_random(4)}'
make_dbfs_data_copy(src_path=existing_mounted_location, dst_path=new_mounted_location)
src_external_table = make_table(schema_name=src_schema.name, external_csv=new_mounted_location)
src_external_table = make_table(schema_name=schema.name, external_csv=new_mounted_location)
# create destination catalog and schema
dst_catalog = make_catalog()
dst_schema = make_schema(catalog_name=dst_catalog.name, name=src_schema.name)
make_schema(catalog_name=dst_catalog.name, name=schema.name)

product_info = ProductInfo.from_class(WorkspaceConfig)
_, workflows_install = new_installation(
_, deployed_workflow = new_installation(
product_info=product_info,
inventory_schema_name=f"ucx_S{make_random(4)}_migrate_inventory",
extend_prompts={
Expand All @@ -703,16 +740,16 @@ def test_table_migration_job_cluster_override( # pylint: disable=too-many-local
Rule(
"ws_name",
dst_catalog.name,
src_schema.name,
dst_schema.name,
schema.name,
schema.name,
src_managed_table.name,
src_managed_table.name,
),
Rule(
"ws_name",
dst_catalog.name,
src_schema.name,
dst_schema.name,
schema.name,
schema.name,
src_external_table.name,
src_external_table.name,
),
Expand All @@ -723,6 +760,39 @@ def test_table_migration_job_cluster_override( # pylint: disable=too-many-local
[Mount(f'/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a', 'abfss://[email protected]/a')],
Mount,
)
sql_backend.save_table(
f"{installation.load(WorkspaceConfig).inventory_database}.tables",
[
Table("hive_metastore", schema.name, src_managed_table.name, "MANAGED", "DELTA", location="dbfs:/test"),
Table("hive_metastore", schema.name, src_external_table.name, "EXTERNAL", "CSV"),
],
Table,
)
# inject dummy group and table acl to avoid crawling which will slow down the test
sql_backend.save_table(
f"{installation.load(WorkspaceConfig).inventory_database}.groups",
[
MigratedGroup(
"group_id",
"test_group_ws",
"test_group_ac",
"tmp",
)
],
MigratedGroup,
)
sql_backend.save_table(
f"{installation.load(WorkspaceConfig).inventory_database}.grants",
[
Grant(
"test_user",
"SELECT",
database="test_database",
table="test_table",
)
],
Grant,
)
installation.save(
[
StoragePermissionMapping(
Expand All @@ -736,17 +806,17 @@ def test_table_migration_job_cluster_override( # pylint: disable=too-many-local
],
filename='azure_storage_account_info.csv',
)
workflows_install.run_workflow("migrate-tables")
deployed_workflow.run_workflow("migrate-tables")
# assert the workflow is successful
assert workflows_install.validate_step("migrate-tables")
assert deployed_workflow.validate_step("migrate-tables")
# assert the tables are migrated
try:
assert ws.tables.get(f"{dst_catalog.name}.{dst_schema.name}.{src_managed_table.name}").name
assert ws.tables.get(f"{dst_catalog.name}.{dst_schema.name}.{src_external_table.name}").name
assert ws.tables.get(f"{dst_catalog.name}.{schema.name}.{src_managed_table.name}").name
assert ws.tables.get(f"{dst_catalog.name}.{schema.name}.{src_external_table.name}").name
except NotFound:
assert (
False
), f"{src_managed_table.name} and {src_external_table.name} not found in {dst_catalog.name}.{dst_schema.name}"
), f"{src_managed_table.name} and {src_external_table.name} not found in {dst_catalog.name}.{schema.name}"
# assert the cluster is configured correctly
install_state = installation.load(RawState)
job_id = install_state.resources["jobs"]["migrate-tables"]
Expand Down
Loading