Skip to content

Commit

Permalink
fixes to int test failues
Browse files Browse the repository at this point in the history
  • Loading branch information
HariGS-DB committed Mar 31, 2024
1 parent cf65f47 commit b23d2d6
Showing 1 changed file with 74 additions and 35 deletions.
109 changes: 74 additions & 35 deletions tests/integration/hive_metastore/test_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,27 @@
}


def principal_acl(ws, inventory_schema, sql_backend):
installation = MockInstallation(
{
"config.yml": {
'inventory_database': inventory_schema,
},
"azure_storage_account_info.csv": [
{
'prefix': 'dummy_prefix',
'client_id': 'dummy_application_id',
'principal': 'dummy_principal',
'privilege': 'WRITE_FILES',
'type': 'Application',
'directory_id': 'dummy_directory',
}
],
}
)
return PrincipalACL.for_cli(ws, installation, sql_backend)


@retried(on=[NotFound], timeout=timedelta(minutes=2))
def test_migrate_managed_tables(ws, sql_backend, inventory_schema, make_catalog, make_schema, make_table):
# pylint: disable=too-many-locals
Expand Down Expand Up @@ -72,7 +93,7 @@ def test_migrate_managed_tables(ws, sql_backend, inventory_schema, make_catalog,
table_mapping = StaticTableMapping(ws, sql_backend, rules=rules)
group_manager = GroupManager(sql_backend, ws, inventory_schema)
migration_status_refresher = MigrationStatusRefresher(ws, sql_backend, inventory_schema, table_crawler)
principal_grants = PrincipalACL.for_cli(ws, MockInstallation(), sql_backend)
principal_grants = principal_acl(ws, inventory_schema, sql_backend)
table_migrate = TablesMigrator(
table_crawler,
grant_crawler,
Expand Down Expand Up @@ -141,7 +162,7 @@ def test_migrate_tables_with_cache_should_not_create_table(
table_mapping = StaticTableMapping(ws, sql_backend, rules=rules)
group_manager = GroupManager(sql_backend, ws, inventory_schema)
migration_status_refresher = MigrationStatusRefresher(ws, sql_backend, inventory_schema, table_crawler)
principal_grants = PrincipalACL.for_cli(ws, MockInstallation(), sql_backend)
principal_grants = principal_acl(ws, inventory_schema, sql_backend)
table_migrate = TablesMigrator(
table_crawler,
grant_crawler,
Expand Down Expand Up @@ -201,7 +222,7 @@ def test_migrate_external_table( # pylint: disable=too-many-locals
]
group_manager = GroupManager(sql_backend, ws, inventory_schema)
migration_status_refresher = MigrationStatusRefresher(ws, sql_backend, inventory_schema, table_crawler)
principal_grants = PrincipalACL.for_cli(ws, MockInstallation(), sql_backend)
principal_grants = principal_acl(ws, inventory_schema, sql_backend)
table_migrate = TablesMigrator(
table_crawler,
grant_crawler,
Expand Down Expand Up @@ -261,7 +282,7 @@ def test_migrate_external_table_failed_sync(
]
group_manager = GroupManager(sql_backend, ws, inventory_schema)
migration_status_refresher = MigrationStatusRefresher(ws, sql_backend, inventory_schema, table_crawler)
principal_grants = PrincipalACL.for_cli(ws, MockInstallation(), sql_backend)
principal_grants = principal_acl(ws, inventory_schema, sql_backend)
table_migrate = TablesMigrator(
table_crawler,
grant_crawler,
Expand Down Expand Up @@ -317,7 +338,7 @@ def test_revert_migrated_table(
table_mapping = StaticTableMapping(ws, sql_backend, rules=rules)
group_manager = GroupManager(sql_backend, ws, inventory_schema)
migration_status_refresher = MigrationStatusRefresher(ws, sql_backend, inventory_schema, table_crawler)
principal_grants = PrincipalACL.for_cli(ws, MockInstallation(), sql_backend)
principal_grants = principal_acl(ws, inventory_schema, sql_backend)
table_migrate = TablesMigrator(
table_crawler,
grant_crawler,
Expand Down Expand Up @@ -437,7 +458,7 @@ def test_mapping_reverts_table(
table_mapping = StaticTableMapping(ws, sql_backend, rules=rules)
migration_status_refresher = MigrationStatusRefresher(ws, sql_backend, inventory_schema, table_crawler)
group_manager = GroupManager(sql_backend, ws, inventory_schema)
principal_grants = PrincipalACL.for_cli(ws, MockInstallation(), sql_backend)
principal_grants = principal_acl(ws, inventory_schema, sql_backend)
table_migrate = TablesMigrator(
table_crawler,
grant_crawler,
Expand Down Expand Up @@ -534,7 +555,36 @@ def test_migrate_managed_tables_with_acl(
table_mapping = StaticTableMapping(ws, sql_backend, rules=rules)
group_manager = GroupManager(sql_backend, ws, inventory_schema)
migration_status_refresher = MigrationStatusRefresher(ws, sql_backend, inventory_schema, table_crawler)
principal_grants = PrincipalACL.for_cli(ws, MockInstallation(), sql_backend)
installation = MockInstallation(
{
"config.yml": {
'inventory_database': inventory_schema,
},
"azure_storage_account_info.csv": [
{
'prefix': 'dummy_prefix',
'client_id': 'dummy_application_id',
'principal': 'dummy_principal',
'privilege': 'WRITE_FILES',
'type': 'Application',
'directory_id': 'dummy_directory',
}
],
}
)
principal_grants = PrincipalACL(
ws,
sql_backend,
installation,
StaticTablesCrawler(sql_backend, inventory_schema, [src_managed_table]),
StaticMountCrawler(
[Mount('dummy_mount', 'abfss://[email protected]/a')],
sql_backend,
ws,
inventory_schema,
),
AzureACL.for_cli(ws, installation).get_eligible_locations_principals(),
)
table_migrate = TablesMigrator(
table_crawler,
grant_crawler,
Expand Down Expand Up @@ -568,19 +618,24 @@ def test_prepare_principal_acl(
make_dbfs_data_copy,
make_table,
make_catalog,
make_schema,
make_cluster,
):
existing_mounted_location = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/b/c'
cluster = make_cluster(single_node=True, spark_conf=_SPARK_CONF, data_security_mode=DataSecurityMode.NONE)
new_mounted_location = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/b/{inventory_schema}'
make_dbfs_data_copy(src_path=existing_mounted_location, dst_path=new_mounted_location)
src_external_table = make_table(external_csv=new_mounted_location)
src_schema = src_external_table.schema_name
make_dbfs_data_copy(src_path=f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/b/c', dst_path=new_mounted_location)
src_schema = make_schema(catalog_name="hive_metastore")
src_external_table = make_table(
catalog_name=src_schema.catalog_name, schema_name=src_schema.name, external_csv=new_mounted_location
)
dst_catalog = make_catalog()
dst_schema = make_schema(catalog_name=dst_catalog.name, name=src_schema.name)
rules = [
Rule(
"workspace",
dst_catalog.name,
src_schema,
src_schema,
src_schema.name,
dst_schema.name,
src_external_table.name,
src_external_table.name,
),
Expand Down Expand Up @@ -620,7 +675,7 @@ def test_prepare_principal_acl(
ws,
inventory_schema,
),
AzureACL.for_cli(ws, installation),
AzureACL.for_cli(ws, installation).get_eligible_locations_principals(),
)
table_migrate = TablesMigrator(
StaticTablesCrawler(sql_backend, inventory_schema, [src_external_table]),
Expand All @@ -638,45 +693,29 @@ def test_prepare_principal_acl(
),
principal_grants,
)
return table_migrate
return table_migrate, f"{dst_catalog.name}.{dst_schema.name}.{src_external_table.name}", cluster.cluster_id


@retried(on=[NotFound], timeout=timedelta(minutes=3))
def test_migrate_managed_tables_with_principal_acl_azure(
ws,
inventory_schema,
make_catalog,
make_table,
make_user,
make_dbfs_data_copy,
test_prepare_principal_acl,
make_cluster_permissions,
make_cluster,
):
if not ws.config.is_azure:
pytest.skip("temporary: only works in azure test env")
table_migrate = test_prepare_principal_acl
table_migrate, table_full_name, cluster_id = test_prepare_principal_acl
user = make_user()
cluster = make_cluster(single_node=True, spark_conf=_SPARK_CONF, data_security_mode=DataSecurityMode.NONE)
make_cluster_permissions(
object_id=cluster.cluster_id,
permission_level=PermissionLevel.CAN_USE,
object_id=cluster_id,
permission_level=PermissionLevel.CAN_ATTACH_TO,
user_name=user.user_name,
)
new_mounted_location = f'dbfs:/mnt/things/a/b/{inventory_schema}'
make_dbfs_data_copy(src_path='dbfs:/mnt/things/a/b/c', dst_path=new_mounted_location)
src_external_table = make_table(external_csv=new_mounted_location)
src_schema = src_external_table.schema_name

dst_catalog = make_catalog()

logger.info(f"dst_catalog={dst_catalog.name}, managed_table={src_external_table.full_name}")

table_migrate.migrate_tables(acl_strategy=[AclMigrationWhat.PRINCIPAL])

target_table_grants = ws.grants.get(
SecurableType.TABLE, f"{dst_catalog.name}.{src_schema}.{src_external_table.name}"
)
target_table_grants = ws.grants.get(SecurableType.TABLE, table_full_name)
match = False
for _ in target_table_grants.privilege_assignments:
if _.principal == user.user_name and _.privileges == [Privilege.ALL_PRIVILEGES]:
Expand Down

0 comments on commit b23d2d6

Please sign in to comment.