
Fix issue with migrating MANAGED hive_metastore table to UC #2928

Merged (12 commits) on Oct 16, 2024
18 changes: 9 additions & 9 deletions README.md


70 changes: 68 additions & 2 deletions src/databricks/labs/ucx/hive_metastore/table_migrate.py
@@ -1,7 +1,7 @@
import dataclasses
import logging
from collections import defaultdict
from functools import partial
from functools import partial, cached_property

from databricks.labs.blueprint.parallel import Threads
from databricks.labs.lsql.backends import SqlBackend
@@ -17,6 +17,7 @@
TableMapping,
TableToMigrate,
)

from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatusRefresher
from databricks.labs.ucx.hive_metastore.tables import (
MigrationCount,
@@ -44,6 +45,7 @@ def __init__(
migrate_grants: MigrateGrants,
external_locations: ExternalLocations,
):

self._tc = table_crawler
self._backend = backend
self._ws = ws
@@ -71,6 +73,8 @@ def migrate_tables(
hiveserde_in_place_migrate: bool = False,
managed_table_external_storage: str = "CLONE",
):
if managed_table_external_storage == "CONVERT_TO_EXTERNAL":
self._spark = self._spark_session
if what in [What.DB_DATASET, What.UNKNOWN]:
logger.error(f"Can't migrate tables with type {what.name}")
return None
@@ -123,7 +127,19 @@ def _migrate_views(self):
    self.index(force_refresh=True)
    return all_tasks

@cached_property
def _spark_session(self):
    # pylint: disable-next=import-error,import-outside-toplevel
    from pyspark.sql.session import SparkSession  # type: ignore[import-not-found]

    return SparkSession.builder.getOrCreate()
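Note that this property assumes UCX is running on a Databricks cluster where pyspark is importable; the CONVERT_TO_EXTERNAL path cannot run from a plain local environment. A minimal, hypothetical guard a caller could use (a sketch, not part of this PR):

# Hypothetical sketch: detect whether a live SparkSession is available before
# attempting the CONVERT_TO_EXTERNAL path. Not UCX code.
try:
    from pyspark.sql.session import SparkSession  # available on Databricks clusters
    spark = SparkSession.builder.getOrCreate()
except ImportError:
    spark = None  # e.g. running locally; managed-table conversion requires a cluster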

def _migrate_managed_table(self, managed_table_external_storage: str, src_table: TableToMigrate):
    if managed_table_external_storage == 'CONVERT_TO_EXTERNAL':
        if self._convert_hms_table_to_external(src_table.src):
            return self._migrate_external_table(
                src_table.src, src_table.rule
            )  # _migrate_external_table remains unchanged
    if managed_table_external_storage == 'SYNC_AS_EXTERNAL':
        return self._migrate_managed_as_external_table(src_table.src, src_table.rule)  # new method
    if managed_table_external_storage == 'CLONE':
@@ -207,8 +223,58 @@ def _sql_migrate_view(self, src_view: ViewToMigrate) -> str:
    # this does not require the index to be refreshed because the dependencies have already been validated
    return src_view.sql_migrate_view(self.index())

@cached_property
def _catalog(self):
    return self._spark._jsparkSession.sessionState().catalog()  # pylint: disable=protected-access

@cached_property
def _table_identifier(self):
    return self._spark._jvm.org.apache.spark.sql.catalyst.TableIdentifier  # pylint: disable=protected-access

@cached_property
def _catalog_type(self):
    return (
        self._spark._jvm.org.apache.spark.sql.catalyst.catalog.CatalogTableType  # pylint: disable=protected-access
    )

@cached_property
def _catalog_table(self):
    return self._spark._jvm.org.apache.spark.sql.catalyst.catalog.CatalogTable  # pylint: disable=protected-access

def _convert_hms_table_to_external(self, src_table: Table):
    try:
        logger.info(f"Changing HMS managed table {src_table.name} to External Table type.")
        database = self._spark._jvm.scala.Some(src_table.database)  # pylint: disable=protected-access
        table_identifier = self._table_identifier(src_table.name, database)
        old_table = self._catalog.getTableMetadata(table_identifier)
        # Rebuild the CatalogTable with identical metadata but an EXTERNAL table type.
        new_table = self._catalog_table(
            old_table.identifier(),
            self._catalog_type('EXTERNAL'),
            old_table.storage(),
            old_table.schema(),
            old_table.provider(),
            old_table.partitionColumnNames(),
            old_table.bucketSpec(),
            old_table.owner(),
            old_table.createTime(),
            old_table.lastAccessTime(),
            old_table.createVersion(),
            old_table.properties(),
            old_table.stats(),
            old_table.viewText(),
            old_table.comment(),
            old_table.unsupportedFeatures(),
            old_table.tracksPartitionsInCatalog(),
            old_table.schemaPreservesCase(),
            old_table.ignoredProperties(),
            old_table.viewOriginalText(),
        )
        self._catalog.alterTable(new_table)
        logger.info(f"Converted {src_table.name} to External Table type.")
    except Exception as e:  # pylint: disable=broad-exception-caught
        logger.warning(f"Error converting HMS table {src_table.name} to external: {e}", exc_info=True)
        return False
    return True
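As a sanity check after this conversion, the table type can be inspected with DESCRIBE TABLE EXTENDED. The following is a sketch, not part of the PR; the table name is a placeholder.

# Sketch: confirm the HMS table is now EXTERNAL. Assumes an active SparkSession;
# "hive_metastore.some_db.some_table" is a placeholder name.
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.getOrCreate()
rows = spark.sql("DESCRIBE TABLE EXTENDED hive_metastore.some_db.some_table").collect()
table_type = next(row.data_type for row in rows if row.col_name == "Type")
assert table_type == "EXTERNAL", f"expected EXTERNAL, got {table_type}"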

def _migrate_managed_as_external_table(self, src_table: Table, rule: Rule):
target_table_key = rule.as_uc_table_key
4 changes: 3 additions & 1 deletion src/databricks/labs/ucx/hive_metastore/workflows.py
@@ -13,7 +13,9 @@ def migrate_external_tables_sync(self, ctx: RuntimeContext):
"""This workflow task migrates the external tables that are supported by SYNC command from the Hive Metastore
to the Unity Catalog.
"""
ctx.tables_migrator.migrate_tables(what=What.EXTERNAL_SYNC)
ctx.tables_migrator.migrate_tables(
what=What.EXTERNAL_SYNC, managed_table_external_storage=ctx.config.managed_table_external_storage
)
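For reference, the SYNC command mentioned in the docstring corresponds to a statement of roughly this shape (an illustrative sketch with placeholder names, not the exact SQL UCX emits):

# Sketch: preview what SYNC would do for one table (DRY RUN makes no changes).
# Catalog, schema and table names are placeholders.
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.sql(
    "SYNC TABLE my_catalog.my_schema.my_table "
    "FROM hive_metastore.my_schema.my_table DRY RUN"
).show(truncate=False)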

@job_task(job_cluster="table_migration", depends_on=[Assessment.crawl_tables])
def migrate_dbfs_root_delta_tables(self, ctx: RuntimeContext):
1 change: 1 addition & 0 deletions src/databricks/labs/ucx/install.py
@@ -379,6 +379,7 @@ def _config_table_migration(self, spark_conf_dict) -> tuple[int, int, dict, str]
managed_table_migration_choices = {
"Migrate MANAGED HMS table as EXTERNAL UC table. This option would require you to convert MANAGED HMS tables to EXTERNAL HMS tables once UC migration is complete, otherwise deleting HMS MANAGED table would delete the migrated UC table": 'SYNC_AS_EXTERNAL',
"Copy data from MANAGED HMS to MANAGED UC table": 'CLONE',
"Convert MANAGED HMS table to EXTERNAL HMS table and migrate as EXTERNAL UC table. This risks data leakage, as once the relevant HMS tables are deleted, the underlying data won't get deleted anymore.": 'CONVERT_TO_EXTERNAL',
}
managed_table_migration_choice = self.prompts.choice_from_dict(
"If hive_metastore contains managed table with external"
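The selected value is stored in the UCX config as managed_table_external_storage and later handed to the migrator, as in the workflows.py change above. A minimal sketch of that call, with ctx assumed to be an initialized RuntimeContext:

# Sketch: how the configured strategy reaches TablesMigrator.migrate_tables.
# `ctx` is assumed to be a RuntimeContext, as in the workflow task above.
ctx.tables_migrator.migrate_tables(
    what=What.EXTERNAL_SYNC,
    managed_table_external_storage=ctx.config.managed_table_external_storage,  # e.g. "CONVERT_TO_EXTERNAL"
)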
28 changes: 10 additions & 18 deletions tests/integration/hive_metastore/test_migrate.py
@@ -172,18 +172,14 @@ def test_migrate_external_table(

@retried(on=[NotFound], timeout=timedelta(minutes=2))
def test_migrate_managed_table_to_external_table_without_conversion(
ws,
sql_backend,
runtime_ctx,
make_catalog,
make_mounted_location,
ws, sql_backend, runtime_ctx, make_catalog, make_mounted_location, make_random, env_or_skip
):
# TODO: update pytest fixture for make_schema to take location as parameter to create managed schema
# TODO: update azure blueprint to add spn in sql warehouse data access config
src_schema = runtime_ctx.make_schema(catalog_name="hive_metastore")
src_schema_name = f"dummy_s{make_random(4)}".lower()
src_schema_location = f"{env_or_skip('TEST_MOUNT_CONTAINER')}/a/{src_schema_name}"
src_schema = runtime_ctx.make_schema(name=src_schema_name, location=src_schema_location)
src_external_table = runtime_ctx.make_table(
schema_name=src_schema.name,
external_csv=make_mounted_location,
external=False,
columns=[("`back`ticks`", "STRING")], # Test with column that needs escaping
)
dst_catalog = make_catalog()
@@ -214,18 +210,14 @@ def test_migrate_managed_table_to_external_table_without_conversion(

@retried(on=[NotFound], timeout=timedelta(minutes=2))
def test_migrate_managed_table_to_external_table_with_clone(
ws,
sql_backend,
runtime_ctx,
make_catalog,
make_mounted_location,
ws, sql_backend, runtime_ctx, make_catalog, make_mounted_location, make_random, env_or_skip
):
# TODO: update pytest fixture for make_schema to take location as parameter to create managed schema
# TODO: update azure blueprint to add spn in sql warehouse data access config
src_schema = runtime_ctx.make_schema(catalog_name="hive_metastore")
src_schema_name = f"dummy_s{make_random(4)}".lower()
src_schema_location = f"{env_or_skip('TEST_MOUNT_CONTAINER')}/a/{src_schema_name}"
src_schema = runtime_ctx.make_schema(name=src_schema_name, location=src_schema_location)
src_external_table = runtime_ctx.make_table(
schema_name=src_schema.name,
external_csv=make_mounted_location,
external=False,
columns=[("`back`ticks`", "STRING")], # Test with column that needs escaping
)
dst_catalog = make_catalog()