Add workflow for in-place migrating external Parquet, Orc, Avro hiveserde tables (#1412)

## Changes
1. Add a `MigrateHiveSerdeTablesInPlace` workflow to upgrade external Parquet, ORC, and Avro hiveserde tables in place.
2. Add functions in `tables.py` to describe the table and extract the hiveserde details, and to update the DDL from `SHOW CREATE TABLE` by replacing the old table name with the migration target and the DBFS mount table location, if any; the updated DDL is then used to create the new table in UC for the in-place migration (see the classification sketch after this list).
3. Add a `_migrate_external_table_hiveserde` function in `table_migrate.py`, along with two new arguments on the `TablesMigrator` class: `mounts`, used to resolve the DBFS mount table location, if any, and `hiveserde_in_place_migrate`, used to control which hiveserde type is migrated in the current run, so that multiple tasks can run in parallel, each migrating one type of hiveserde.
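
As an illustration of the describe-and-classify step, the sketch below shows how a hiveserde table could be recognised from the output of `DESCRIBE TABLE EXTENDED`. The `classify_hiveserde` helper and the `describe` dict are illustrative stand-ins for the new `hiveserde_type` method and its `_if_parquet_serde`-style checks; the serde/input/output class names are the standard Hive ones checked in this PR.

```python
# Illustrative sketch: classify a hiveserde table from DESCRIBE TABLE EXTENDED output.
# `describe` maps the col_name column to the data_type column of the DESCRIBE rows.
PARQUET_SERDE = (
    "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
    "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
    "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
)


def classify_hiveserde(describe: dict[str, str]) -> str:
    if not {"Serde Library", "InputFormat", "OutputFormat"} <= describe.keys():
        return "INVALID_HIVESERDE_INFO"
    found = (describe["Serde Library"], describe["InputFormat"], describe["OutputFormat"])
    if found == PARQUET_SERDE:
        return "PARQUET"
    # analogous tuples are checked for AVRO and ORC; everything else falls back to CTAS
    return "OTHER_HIVESERDE"


print(classify_hiveserde({
    "Serde Library": PARQUET_SERDE[0],
    "InputFormat": PARQUET_SERDE[1],
    "OutputFormat": PARQUET_SERDE[2],
}))  # -> PARQUET
```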

This PR also removes most of the code from PR #1432, because only a subset of table formats can be migrated in place to UC with the DDL from `SHOW CREATE TABLE`; simply creating tables with the updated DDL for every `What.EXTERNAL_NO_SYNC` table would fail.
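
For the in-place path itself, here is a rough, self-contained sketch of the DDL rewrite that mirrors the sqlglot-based logic added to `tables.py` (`sql_migrate_external_hiveserde_in_place` via `_ddl_show_create_table`): repoint the `SHOW CREATE TABLE` DDL at the UC target and, if the table lives on a DBFS mount, swap in the resolved external location. The `rewrite_ddl` helper and the sample DDL string are illustrative only and not part of this PR.

```python
# Sketch only: rewrite hive_metastore DDL from SHOW CREATE TABLE for the UC target.
import sqlglot
from sqlglot import expressions
from sqlglot.expressions import LocationProperty


def rewrite_ddl(createtab_stmt: str, catalog: str, schema: str, table: str, new_location: str | None) -> str:
    statement = sqlglot.parse(createtab_stmt, read="databricks")[0]
    assert statement is not None, "failed to parse the SHOW CREATE TABLE DDL"
    # repoint the table at the UC destination
    src_table = statement.find(expressions.Table)
    assert src_table is not None, "no table object found in the DDL"
    src_table.replace(expressions.Table(catalog=catalog, db=schema, this=table))
    # replace the dbfs:/mnt location with the resolved external location, if needed
    if new_location:
        mnt_loc = statement.find(LocationProperty)
        if mnt_loc:
            mnt_loc.replace(LocationProperty(this=f"'{new_location}'"))
    return statement.sql("databricks")


print(rewrite_ddl(
    "CREATE TABLE hive_metastore.db1.t1 (id INT) LOCATION 'dbfs:/mnt/bucket/t1'",
    "main", "db1", "t1", "s3://bucket/t1",
))
```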

### Linked issues

Closes #889 

### Functionality 

- [ ] added relevant user documentation
- [ ] added new CLI command
- [ ] modified existing command: `databricks labs ucx ...`
- [ ] added a new workflow
- [ ] modified existing workflow: `...`
- [ ] added a new table
- [ ] modified existing table: `...`

### Tests
<!-- How is this tested? Please see the checklist below and also
describe any other relevant tests -->

- [x] manually tested
- [x] added unit tests
- [x] added integration tests
- [ ] verified on staging environment (screenshot attached)
qziyuan authored Apr 23, 2024
1 parent 33c2655 commit 9cb9dbf
Showing 13 changed files with 701 additions and 115 deletions.
130 changes: 88 additions & 42 deletions src/databricks/labs/ucx/hive_metastore/table_migrate.py
@@ -4,15 +4,14 @@
from collections.abc import Iterable
from functools import partial

import sqlglot
from sqlglot import expressions
from databricks.labs.blueprint.parallel import Threads
from databricks.labs.lsql.backends import SqlBackend
from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.framework.utils import escape_sql_identifier
from databricks.labs.ucx.hive_metastore import TablesCrawler
from databricks.labs.ucx.hive_metastore import TablesCrawler, Mounts
from databricks.labs.ucx.hive_metastore.grants import Grant, GrantsCrawler, PrincipalACL
from databricks.labs.ucx.hive_metastore.locations import Mount, ExternalLocations
from databricks.labs.ucx.hive_metastore.mapping import (
Rule,
TableMapping,
@@ -24,6 +23,7 @@
MigrationCount,
Table,
What,
HiveSerdeType,
)
from databricks.labs.ucx.hive_metastore.view_migrate import (
ViewsMigrationSequencer,
@@ -60,22 +60,45 @@ def index(self):
# TODO: remove this method
return self._migration_status_refresher.index()

def migrate_tables(self, what: What, acl_strategy: list[AclMigrationWhat] | None = None):
def migrate_tables(
self,
what: What,
acl_strategy: list[AclMigrationWhat] | None = None,
mounts_crawler: Mounts | None = None,
hiveserde_in_place_migrate: bool = False,
):
if what in [What.DB_DATASET, What.UNKNOWN]:
logger.error(f"Can't migrate tables with type {what.name}")
return None
all_grants_to_migrate = None if acl_strategy is None else self._gc.snapshot()
all_migrated_groups = None if acl_strategy is None else self._group.snapshot()
all_principal_grants = None if acl_strategy is None else self._principal_grants.get_interactive_cluster_grants()
self._init_seen_tables()
# mounts will be used to replace the mnt based table location in the DDL for hiveserde table in-place migration
mounts: list[Mount] = []
if mounts_crawler:
mounts = list(mounts_crawler.snapshot())
if what == What.VIEW:
return self._migrate_views(acl_strategy, all_grants_to_migrate, all_migrated_groups, all_principal_grants)
return self._migrate_tables(
what, acl_strategy, all_grants_to_migrate, all_migrated_groups, all_principal_grants
what,
acl_strategy,
all_grants_to_migrate,
all_migrated_groups,
all_principal_grants,
mounts,
hiveserde_in_place_migrate,
)

def _migrate_tables(
self, what: What, acl_strategy, all_grants_to_migrate, all_migrated_groups, all_principal_grants
self,
what: What,
acl_strategy,
all_grants_to_migrate,
all_migrated_groups,
all_principal_grants,
mounts: list[Mount],
hiveserde_in_place_migrate: bool = False,
):
tables_to_migrate = self._tm.get_tables_to_migrate(self._tc)
tables_in_scope = filter(lambda t: t.src.what == what, tables_to_migrate)
@@ -84,14 +107,10 @@ def _migrate_tables(
grants = self._compute_grants(
table.src, acl_strategy, all_grants_to_migrate, all_migrated_groups, all_principal_grants
)
tasks.append(
partial(
self._migrate_table,
table,
grants,
)
)
tasks.append(partial(self._migrate_table, table, grants, mounts, hiveserde_in_place_migrate))
Threads.strict("migrate tables", tasks)
if not tasks:
logger.info(f"No tables found to migrate with type {what.name}")
# the below is useful for testing
return tasks

@@ -134,7 +153,9 @@ def _compute_grants(
def _migrate_table(
self,
src_table: TableToMigrate,
grants: list[Grant] | None = None,
grants: list[Grant],
mounts: list[Mount],
hiveserde_in_place_migrate: bool = False,
):
if self._table_already_migrated(src_table.rule.as_uc_table_key):
logger.info(f"Table {src_table.src.key} already migrated to {src_table.rule.as_uc_table_key}")
@@ -145,8 +166,10 @@ def _migrate_table(
return self._migrate_table_create_ctas(src_table.src, src_table.rule, grants)
if src_table.src.what == What.EXTERNAL_SYNC:
return self._migrate_external_table(src_table.src, src_table.rule, grants)
if src_table.src.what == What.EXTERNAL_NO_SYNC:
return self._migrate_non_sync_table(src_table.src, src_table.rule, grants)
if src_table.src.what == What.EXTERNAL_HIVESERDE:
return self._migrate_external_table_hiveserde(
src_table.src, src_table.rule, grants, mounts, hiveserde_in_place_migrate
)
logger.info(f"Table {src_table.src.key} is not supported for migration")
return True

@@ -201,18 +224,60 @@ def _migrate_external_table(self, src_table: Table, rule: Rule, grants: list[Gra
self._backend.execute(src_table.sql_alter_from(rule.as_uc_table_key, self._ws.get_workspace_id()))
return self._migrate_acl(src_table, rule, grants)

def _migrate_dbfs_root_table(self, src_table: Table, rule: Rule, grants: list[Grant] | None = None):
target_table_key = rule.as_uc_table_key
table_migrate_sql = src_table.sql_migrate_dbfs(target_table_key)
logger.debug(f"Migrating managed table {src_table.key} to using SQL query: {table_migrate_sql}")
def _migrate_external_table_hiveserde(
self,
src_table: Table,
rule: Rule,
grants: list[Grant],
mounts: list[Mount],
hiveserde_in_place_migrate: bool = False,
):
# This hiveserde_in_place_migrate flag determines whether the current migration should be done in place or via CTAS.
# We will provide two workflows for hiveserde table migration:
# 1. One migrates all hiveserde tables using CTAS, which we officially support.
# 2. The other migrates certain types of hiveserde in place. This is technically working, but the user
# needs to accept the risk that the old files created by hiveserde may not be processed correctly by the Spark
# datasource in corner cases.
# The user needs to decide which workflow to run first; that workflow migrates the hiveserde tables and marks the
# `upgraded_to` property, so those tables are skipped by whichever migration workflow runs later.
if not hiveserde_in_place_migrate:
# TODO: Add sql_migrate_external_hiveserde_ctas here
return False

# verify hive serde type
hiveserde_type = src_table.hiveserde_type(self._backend)
if hiveserde_type in [
HiveSerdeType.NOT_HIVESERDE,
HiveSerdeType.OTHER_HIVESERDE,
HiveSerdeType.INVALID_HIVESERDE_INFO,
]:
logger.warning(f"{src_table.key} table can only be migrated using CTAS.")
return False

# if the src table location uses a mount point, resolve the mount location so it can be used in the updated DDL
dst_table_location = None
if mounts and src_table.is_dbfs_mnt:
dst_table_location = ExternalLocations.resolve_mount(src_table.location, mounts)

table_migrate_sql = src_table.sql_migrate_external_hiveserde_in_place(
rule.catalog_name, rule.dst_schema, rule.dst_table, self._backend, hiveserde_type, dst_table_location
)
if not table_migrate_sql:
logger.error(
f"Failed to generate in-place migration DDL for {src_table.key}, skip the in-place migration. It can be migrated in CTAS workflow"
)
return False

logger.debug(f"Migrating external table {src_table.key} to using SQL query: {table_migrate_sql}")
self._backend.execute(table_migrate_sql)
self._backend.execute(src_table.sql_alter_to(rule.as_uc_table_key))
self._backend.execute(src_table.sql_alter_from(rule.as_uc_table_key, self._ws.get_workspace_id()))
return self._migrate_acl(src_table, rule, grants)

def _migrate_non_sync_table(self, src_table: Table, rule: Rule, grants: list[Grant] | None = None):
table_migrate_sql = self._get_create_in_place_sql(src_table, rule)
logger.debug(f"Migrating table (No Sync) {src_table.key} to using SQL query: {table_migrate_sql}")
def _migrate_dbfs_root_table(self, src_table: Table, rule: Rule, grants: list[Grant] | None = None):
target_table_key = rule.as_uc_table_key
table_migrate_sql = src_table.sql_migrate_dbfs(target_table_key)
logger.debug(f"Migrating managed table {src_table.key} to using SQL query: {table_migrate_sql}")
self._backend.execute(table_migrate_sql)
self._backend.execute(src_table.sql_alter_to(rule.as_uc_table_key))
self._backend.execute(src_table.sql_alter_from(rule.as_uc_table_key, self._ws.get_workspace_id()))
@@ -226,25 +291,6 @@ def _migrate_table_create_ctas(self, src_table: Table, rule: Rule, grants: list[
self._backend.execute(src_table.sql_alter_from(rule.as_uc_table_key, self._ws.get_workspace_id()))
return self._migrate_acl(src_table, rule, grants)

def _get_create_in_place_sql(self, src_table: Table, rule: Rule) -> str:
create_sql = str(next(self._backend.fetch(src_table.sql_show_create()))["createtab_stmt"])
statements = sqlglot.parse(create_sql, read='databricks')
assert len(statements) == 1, 'Expected a single statement'
create = statements[0]
assert isinstance(create, expressions.Create), 'Expected a CREATE statement'
# safely replace current table name with the updated catalog
for table_name in create.find_all(expressions.Table):
if table_name.db == src_table.database and table_name.name == src_table.name:
new_table_name = expressions.Table(
catalog=rule.catalog_name,
db=rule.dst_schema,
this=rule.dst_table,
)
table_name.replace(new_table_name)
# safely replace CREATE with CREATE IF NOT EXISTS
create.args['exists'] = True
return create.sql('databricks')

def _get_create_ctas_sql(self, src_table: Table, rule: Rule) -> str:
create_sql = (
f"CREATE TABLE IF NOT EXISTS {escape_sql_identifier(rule.as_uc_table_key)} "
126 changes: 126 additions & 0 deletions src/databricks/labs/ucx/hive_metastore/tables.py
@@ -6,6 +6,11 @@
from enum import Enum, auto
from functools import partial

import sqlglot
from sqlglot import expressions
from sqlglot.expressions import LocationProperty
from sqlglot.errors import ParseError

from databricks.labs.blueprint.parallel import Threads
from databricks.labs.lsql.backends import SqlBackend
from databricks.sdk.errors import NotFound
@@ -18,6 +23,7 @@

class What(Enum):
EXTERNAL_SYNC = auto()
EXTERNAL_HIVESERDE = auto()
EXTERNAL_NO_SYNC = auto()
DBFS_ROOT_DELTA = auto()
DBFS_ROOT_NON_DELTA = auto()
@@ -26,6 +32,15 @@ class What(Enum):
UNKNOWN = auto()


class HiveSerdeType(Enum):
PARQUET = auto()
AVRO = auto()
ORC = auto()
OTHER_HIVESERDE = auto()
NOT_HIVESERDE = auto()
INVALID_HIVESERDE_INFO = auto()


class AclMigrationWhat(Enum):
LEGACY_TACL = auto()
PRINCIPAL = auto()
@@ -117,6 +132,15 @@ def is_dbfs_root(self) -> bool:
return True
return False

@property
def is_dbfs_mnt(self) -> bool:
if not self.location:
return False
for dbfs_mnt_prefix in self.DBFS_ROOT_PREFIX_EXCEPTIONS:
if self.location.startswith(dbfs_mnt_prefix):
return True
return False

@property
def is_format_supported_for_sync(self) -> bool:
if self.table_format is None:
@@ -150,6 +174,8 @@ def what(self) -> What:
return What.DBFS_ROOT_NON_DELTA
if self.kind == "TABLE" and self.is_format_supported_for_sync:
return What.EXTERNAL_SYNC
if self.kind == "TABLE" and self.table_format.upper() == "HIVE":
return What.EXTERNAL_HIVESERDE
if self.kind == "TABLE":
return What.EXTERNAL_NO_SYNC
if self.kind == "VIEW":
@@ -167,6 +193,106 @@ def sql_migrate_create_like(self, target_table_key):
f"{escape_sql_identifier(self.key)};"
)

def hiveserde_type(self, backend: SqlBackend) -> HiveSerdeType:
if self.table_format != "HIVE":
return HiveSerdeType.NOT_HIVESERDE
# Extract the hive serde info. Ideally this should be done by the table crawler,
# but it is done here to avoid a breaking change to the `tables` table in the inventory schema.
describe = {}
for key, values, _ in backend.fetch(f"DESCRIBE TABLE EXTENDED {escape_sql_identifier(self.key)}"):
describe[key] = values
if not {"Serde Library", "InputFormat", "OutputFormat"} <= describe.keys():
return HiveSerdeType.INVALID_HIVESERDE_INFO
serde = describe["Serde Library"]
input_format = describe["InputFormat"]
output_format = describe["OutputFormat"]
if self._if_parquet_serde(serde, input_format, output_format):
return HiveSerdeType.PARQUET
if self._if_avro_serde(serde, input_format, output_format):
return HiveSerdeType.AVRO
if self._if_orc_serde(serde, input_format, output_format):
return HiveSerdeType.ORC
return HiveSerdeType.OTHER_HIVESERDE

def _if_parquet_serde(self, serde, input_format, output_format) -> bool:
return (
serde == "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
and input_format == "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"
and output_format == "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"
)

def _if_avro_serde(self, serde, input_format, output_format) -> bool:
return (
serde == "org.apache.hadoop.hive.serde2.avro.AvroSerDe"
and input_format == "org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"
and output_format == "org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"
)

def _if_orc_serde(self, serde, input_format, output_format) -> bool:
return (
serde == "org.apache.hadoop.hive.ql.io.orc.OrcSerde"
and input_format == "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"
and output_format == "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"
)

def sql_migrate_external_hiveserde_in_place(
self,
catalog_name,
dst_schema,
dst_table,
backend: SqlBackend,
hiveserde_type: HiveSerdeType,
replace_table_location: str | None = None,
) -> str | None:
# "PARQUET", "AVRO", "ORC" can be migrated with "SHOW CREATE TABLE..." DDL
if hiveserde_type in [HiveSerdeType.PARQUET, HiveSerdeType.AVRO, HiveSerdeType.ORC]:
return self._ddl_show_create_table(backend, catalog_name, dst_schema, dst_table, replace_table_location)

# "TEXTFILE" hiveserde needs extra handling on preparing the DDL
# TODO: add support for "TEXTFILE" hiveserde, when the data can be parsed as Spark CSV datasource

# "JSON", "CSV" hiveserde need extra handling on preparing the DDL
# TODO: DBR does not bundle the jars for "JSON", "CSV" hiveserde, it's unlikely we see those tables.
# Although it's possible that users has the jars installed as cluster library and use those tables in Databricks,
# we hold off the implementation for now until we see the real use case.
return None

def _ddl_show_create_table(
self, backend: SqlBackend, catalog_name, dst_schema, dst_table, replace_location
) -> str | None:
# get raw DDL from "SHOW CREATE TABLE..."
createtab_stmt = next(backend.fetch(self.sql_show_create()))["createtab_stmt"]
# parse the DDL and replace the old table name with the new UC table name
try:
statements = sqlglot.parse(createtab_stmt)
except (ValueError, ParseError):
logger.exception(f"Exception when parsing 'SHOW CREATE TABLE' DDL for {self.key}")
return None

statement = statements[0]
if not statement:
logger.error(f"sqlglot parsed none statement from 'SHOW CREATE TABLE' DDL for {self.key}")
return None

src_table = statement.find(expressions.Table)
if not src_table:
logger.error(f"sqlglot failed to extract table object from parsed DDL for {self.key}")
return None
new_table = expressions.Table(catalog=catalog_name, db=dst_schema, this=dst_table)
src_table.replace(new_table)

if replace_location:
# replace dbfs mnt in ddl if any
mnt_loc = statement.find(LocationProperty)
if not mnt_loc:
logger.error(f"sqlglot failed to extract table location object from parsed DDL for {self.key}")
return None
new_loc = LocationProperty(this=f"'{replace_location}'")
mnt_loc.replace(new_loc)

new_sql = statement.sql('databricks')
return new_sql

def sql_migrate_dbfs(self, target_table_key):
if not self.is_delta:
msg = f"{self.key} is not DELTA: {self.table_format}"