
Commit 77d2564

qziyuanjincejames authored and committed
Add workflow for in-place migrating external Parquet, Orc, Avro hiveserde tables (databrickslabs#1412)
## Changes

1. Add a `MigrateHiveSerdeTablesInPlace` workflow to upgrade external Parquet, Orc, and Avro hiveserde tables in place.
2. Add functions in `tables.py` to describe the table and extract the hiveserde details, and to update the DDL from `show create table` by replacing the old table name with the migration target and the DBFS mount table location, if any. The new DDL is then used to create the new table in UC for the in-place migration (a minimal sketch of this rewrite follows below).
3. Add a `_migrate_external_table_hiveserde` function in `table_migrate.py`, and two new arguments, `mounts` and `hiveserde_in_place_migrate`, to the `TablesMigrator` class: `mounts` is used to replace the DBFS mount table location, if any, and `hiveserde_in_place_migrate` controls which hiveserde type is migrated in the current run, so multiple tasks can run in parallel, each migrating one type of hiveserde.

This PR also removes the majority of the code from PR databrickslabs#1432, because only a subset of table formats can be migrated in place to UC with DDL from `show create table`; simply creating a table with the updated DDL for all `What.EXTERNAL_NO_SYNC` tables would fail.

### Linked issues

Closes databrickslabs#889

### Functionality

- [ ] added relevant user documentation
- [ ] added new CLI command
- [ ] modified existing command: `databricks labs ucx ...`
- [ ] added a new workflow
- [ ] modified existing workflow: `...`
- [ ] added a new table
- [ ] modified existing table: `...`

### Tests

- [x] manually tested
- [x] added unit tests
- [x] added integration tests
- [ ] verified on staging environment (screenshot attached)
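The heart of change 2 is a sqlglot parse-and-replace over the `SHOW CREATE TABLE` output: swap the source table name for the UC target, and swap a `dbfs:/mnt/...` location for its resolved cloud path. A minimal standalone sketch of that rewrite (the table names, mount path, and resolved cloud path below are hypothetical, not taken from this PR):

```python
import sqlglot
from sqlglot import expressions
from sqlglot.expressions import LocationProperty

# hypothetical `SHOW CREATE TABLE` output for an external Parquet table on a DBFS mount
ddl = (
    "CREATE TABLE hive_metastore.sales.transactions (id INT, amount DOUBLE) "
    "USING PARQUET LOCATION 'dbfs:/mnt/landing/transactions'"
)

statement = sqlglot.parse(ddl, read='databricks')[0]

# replace the old table name with the migration target in UC
src_table = statement.find(expressions.Table)
src_table.replace(expressions.Table(catalog="main", db="sales", this="transactions"))

# replace the dbfs mount location with the resolved cloud storage path
mnt_loc = statement.find(LocationProperty)
mnt_loc.replace(LocationProperty(this="'abfss://landing@storage.dfs.core.windows.net/transactions'"))

# prints the rewritten DDL, now targeting main.sales.transactions at the abfss:// path
print(statement.sql('databricks'))
```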
1 parent 58e38b0 commit 77d2564

File tree

13 files changed: +701 / -115 lines


Diff for: src/databricks/labs/ucx/hive_metastore/table_migrate.py (+88 / -42)

```diff
@@ -4,15 +4,14 @@
 from collections.abc import Iterable
 from functools import partial
 
-import sqlglot
-from sqlglot import expressions
 from databricks.labs.blueprint.parallel import Threads
 from databricks.labs.lsql.backends import SqlBackend
 from databricks.sdk import WorkspaceClient
 
 from databricks.labs.ucx.framework.utils import escape_sql_identifier
-from databricks.labs.ucx.hive_metastore import TablesCrawler
+from databricks.labs.ucx.hive_metastore import TablesCrawler, Mounts
 from databricks.labs.ucx.hive_metastore.grants import Grant, GrantsCrawler, PrincipalACL
+from databricks.labs.ucx.hive_metastore.locations import Mount, ExternalLocations
 from databricks.labs.ucx.hive_metastore.mapping import (
     Rule,
     TableMapping,
@@ -24,6 +23,7 @@
     MigrationCount,
     Table,
     What,
+    HiveSerdeType,
 )
 from databricks.labs.ucx.hive_metastore.view_migrate import (
     ViewsMigrationSequencer,
@@ -60,22 +60,45 @@ def index(self):
         # TODO: remove this method
         return self._migration_status_refresher.index()
 
-    def migrate_tables(self, what: What, acl_strategy: list[AclMigrationWhat] | None = None):
+    def migrate_tables(
+        self,
+        what: What,
+        acl_strategy: list[AclMigrationWhat] | None = None,
+        mounts_crawler: Mounts | None = None,
+        hiveserde_in_place_migrate: bool = False,
+    ):
         if what in [What.DB_DATASET, What.UNKNOWN]:
             logger.error(f"Can't migrate tables with type {what.name}")
             return None
         all_grants_to_migrate = None if acl_strategy is None else self._gc.snapshot()
         all_migrated_groups = None if acl_strategy is None else self._group.snapshot()
         all_principal_grants = None if acl_strategy is None else self._principal_grants.get_interactive_cluster_grants()
         self._init_seen_tables()
+        # mounts will be used to replace the mnt-based table location in the DDL for hiveserde table in-place migration
+        mounts: list[Mount] = []
+        if mounts_crawler:
+            mounts = list(mounts_crawler.snapshot())
         if what == What.VIEW:
             return self._migrate_views(acl_strategy, all_grants_to_migrate, all_migrated_groups, all_principal_grants)
         return self._migrate_tables(
-            what, acl_strategy, all_grants_to_migrate, all_migrated_groups, all_principal_grants
+            what,
+            acl_strategy,
+            all_grants_to_migrate,
+            all_migrated_groups,
+            all_principal_grants,
+            mounts,
+            hiveserde_in_place_migrate,
         )
 
     def _migrate_tables(
-        self, what: What, acl_strategy, all_grants_to_migrate, all_migrated_groups, all_principal_grants
+        self,
+        what: What,
+        acl_strategy,
+        all_grants_to_migrate,
+        all_migrated_groups,
+        all_principal_grants,
+        mounts: list[Mount],
+        hiveserde_in_place_migrate: bool = False,
     ):
         tables_to_migrate = self._tm.get_tables_to_migrate(self._tc)
         tables_in_scope = filter(lambda t: t.src.what == what, tables_to_migrate)
@@ -84,14 +107,10 @@ def _migrate_tables(
             grants = self._compute_grants(
                 table.src, acl_strategy, all_grants_to_migrate, all_migrated_groups, all_principal_grants
             )
-            tasks.append(
-                partial(
-                    self._migrate_table,
-                    table,
-                    grants,
-                )
-            )
+            tasks.append(partial(self._migrate_table, table, grants, mounts, hiveserde_in_place_migrate))
         Threads.strict("migrate tables", tasks)
+        if not tasks:
+            logger.info(f"No tables found to migrate with type {what.name}")
         # the below is useful for testing
         return tasks
 
@@ -134,7 +153,9 @@ def _compute_grants(
     def _migrate_table(
         self,
         src_table: TableToMigrate,
-        grants: list[Grant] | None = None,
+        grants: list[Grant],
+        mounts: list[Mount],
+        hiveserde_in_place_migrate: bool = False,
     ):
         if self._table_already_migrated(src_table.rule.as_uc_table_key):
             logger.info(f"Table {src_table.src.key} already migrated to {src_table.rule.as_uc_table_key}")
@@ -145,8 +166,10 @@ def _migrate_table(
             return self._migrate_table_create_ctas(src_table.src, src_table.rule, grants)
         if src_table.src.what == What.EXTERNAL_SYNC:
             return self._migrate_external_table(src_table.src, src_table.rule, grants)
-        if src_table.src.what == What.EXTERNAL_NO_SYNC:
-            return self._migrate_non_sync_table(src_table.src, src_table.rule, grants)
+        if src_table.src.what == What.EXTERNAL_HIVESERDE:
+            return self._migrate_external_table_hiveserde(
+                src_table.src, src_table.rule, grants, mounts, hiveserde_in_place_migrate
+            )
         logger.info(f"Table {src_table.src.key} is not supported for migration")
         return True
 
@@ -201,18 +224,60 @@ def _migrate_external_table(self, src_table: Table, rule: Rule, grants: list[Gra
         self._backend.execute(src_table.sql_alter_from(rule.as_uc_table_key, self._ws.get_workspace_id()))
         return self._migrate_acl(src_table, rule, grants)
 
-    def _migrate_dbfs_root_table(self, src_table: Table, rule: Rule, grants: list[Grant] | None = None):
-        target_table_key = rule.as_uc_table_key
-        table_migrate_sql = src_table.sql_migrate_dbfs(target_table_key)
-        logger.debug(f"Migrating managed table {src_table.key} to using SQL query: {table_migrate_sql}")
+    def _migrate_external_table_hiveserde(
+        self,
+        src_table: Table,
+        rule: Rule,
+        grants: list[Grant],
+        mounts: list[Mount],
+        hiveserde_in_place_migrate: bool = False,
+    ):
+        # The hiveserde_in_place_migrate flag determines whether the current migration should use in-place migration or CTAS.
+        # We will provide two workflows for hiveserde table migration:
+        # 1. One will migrate all hiveserde tables using CTAS, which we officially support.
+        # 2. The other one will migrate certain types of hiveserde in place; this technically works, but the user
+        #    needs to accept the risk that the old files created by hiveserde may not be processed correctly by the
+        #    Spark datasource in corner cases.
+        # The user will need to decide which workflow to run first; it will migrate the hiveserde tables and mark the
+        # `upgraded_to` property, and hence those tables will be skipped in later migration workflow runs.
+        if not hiveserde_in_place_migrate:
+            # TODO: Add sql_migrate_external_hiveserde_ctas here
+            return False
+
+        # verify hive serde type
+        hiveserde_type = src_table.hiveserde_type(self._backend)
+        if hiveserde_type in [
+            HiveSerdeType.NOT_HIVESERDE,
+            HiveSerdeType.OTHER_HIVESERDE,
+            HiveSerdeType.INVALID_HIVESERDE_INFO,
+        ]:
+            logger.warning(f"{src_table.key} table can only be migrated using CTAS.")
+            return False
+
+        # if the src table location uses a mount, resolve the mount location so it can be used in the updated DDL
+        dst_table_location = None
+        if mounts and src_table.is_dbfs_mnt:
+            dst_table_location = ExternalLocations.resolve_mount(src_table.location, mounts)
+
+        table_migrate_sql = src_table.sql_migrate_external_hiveserde_in_place(
+            rule.catalog_name, rule.dst_schema, rule.dst_table, self._backend, hiveserde_type, dst_table_location
+        )
+        if not table_migrate_sql:
+            logger.error(
+                f"Failed to generate in-place migration DDL for {src_table.key}, skip the in-place migration. It can be migrated in CTAS workflow"
+            )
+            return False
+
+        logger.debug(f"Migrating external table {src_table.key} to using SQL query: {table_migrate_sql}")
         self._backend.execute(table_migrate_sql)
         self._backend.execute(src_table.sql_alter_to(rule.as_uc_table_key))
         self._backend.execute(src_table.sql_alter_from(rule.as_uc_table_key, self._ws.get_workspace_id()))
         return self._migrate_acl(src_table, rule, grants)
 
-    def _migrate_non_sync_table(self, src_table: Table, rule: Rule, grants: list[Grant] | None = None):
-        table_migrate_sql = self._get_create_in_place_sql(src_table, rule)
-        logger.debug(f"Migrating table (No Sync) {src_table.key} to using SQL query: {table_migrate_sql}")
+    def _migrate_dbfs_root_table(self, src_table: Table, rule: Rule, grants: list[Grant] | None = None):
+        target_table_key = rule.as_uc_table_key
+        table_migrate_sql = src_table.sql_migrate_dbfs(target_table_key)
+        logger.debug(f"Migrating managed table {src_table.key} to using SQL query: {table_migrate_sql}")
         self._backend.execute(table_migrate_sql)
         self._backend.execute(src_table.sql_alter_to(rule.as_uc_table_key))
         self._backend.execute(src_table.sql_alter_from(rule.as_uc_table_key, self._ws.get_workspace_id()))
@@ -226,25 +291,6 @@ def _migrate_table_create_ctas(self, src_table: Table, rule: Rule, grants: list[
         self._backend.execute(src_table.sql_alter_from(rule.as_uc_table_key, self._ws.get_workspace_id()))
         return self._migrate_acl(src_table, rule, grants)
 
-    def _get_create_in_place_sql(self, src_table: Table, rule: Rule) -> str:
-        create_sql = str(next(self._backend.fetch(src_table.sql_show_create()))["createtab_stmt"])
-        statements = sqlglot.parse(create_sql, read='databricks')
-        assert len(statements) == 1, 'Expected a single statement'
-        create = statements[0]
-        assert isinstance(create, expressions.Create), 'Expected a CREATE statement'
-        # safely replace current table name with the updated catalog
-        for table_name in create.find_all(expressions.Table):
-            if table_name.db == src_table.database and table_name.name == src_table.name:
-                new_table_name = expressions.Table(
-                    catalog=rule.catalog_name,
-                    db=rule.dst_schema,
-                    this=rule.dst_table,
-                )
-                table_name.replace(new_table_name)
-        # safely replace CREATE with CREATE IF NOT EXISTS
-        create.args['exists'] = True
-        return create.sql('databricks')
-
     def _get_create_ctas_sql(self, src_table: Table, rule: Rule) -> str:
         create_sql = (
             f"CREATE TABLE IF NOT EXISTS {escape_sql_identifier(rule.as_uc_table_key)} "
```

Diff for: src/databricks/labs/ucx/hive_metastore/tables.py (+126)

```diff
@@ -6,6 +6,11 @@
 from enum import Enum, auto
 from functools import partial
 
+import sqlglot
+from sqlglot import expressions
+from sqlglot.expressions import LocationProperty
+from sqlglot.errors import ParseError
+
 from databricks.labs.blueprint.parallel import Threads
 from databricks.labs.lsql.backends import SqlBackend
 from databricks.sdk.errors import NotFound
@@ -18,6 +23,7 @@
 
 class What(Enum):
     EXTERNAL_SYNC = auto()
+    EXTERNAL_HIVESERDE = auto()
     EXTERNAL_NO_SYNC = auto()
     DBFS_ROOT_DELTA = auto()
     DBFS_ROOT_NON_DELTA = auto()
@@ -26,6 +32,15 @@ class What(Enum):
     UNKNOWN = auto()
 
 
+class HiveSerdeType(Enum):
+    PARQUET = auto()
+    AVRO = auto()
+    ORC = auto()
+    OTHER_HIVESERDE = auto()
+    NOT_HIVESERDE = auto()
+    INVALID_HIVESERDE_INFO = auto()
+
+
 class AclMigrationWhat(Enum):
     LEGACY_TACL = auto()
     PRINCIPAL = auto()
@@ -117,6 +132,15 @@ def is_dbfs_root(self) -> bool:
                 return True
         return False
 
+    @property
+    def is_dbfs_mnt(self) -> bool:
+        if not self.location:
+            return False
+        for dbfs_mnt_prefix in self.DBFS_ROOT_PREFIX_EXCEPTIONS:
+            if self.location.startswith(dbfs_mnt_prefix):
+                return True
+        return False
+
     @property
     def is_format_supported_for_sync(self) -> bool:
         if self.table_format is None:
@@ -150,6 +174,8 @@ def what(self) -> What:
             return What.DBFS_ROOT_NON_DELTA
         if self.kind == "TABLE" and self.is_format_supported_for_sync:
             return What.EXTERNAL_SYNC
+        if self.kind == "TABLE" and self.table_format.upper() == "HIVE":
+            return What.EXTERNAL_HIVESERDE
         if self.kind == "TABLE":
             return What.EXTERNAL_NO_SYNC
         if self.kind == "VIEW":
@@ -167,6 +193,106 @@ def sql_migrate_create_like(self, target_table_key):
             f"{escape_sql_identifier(self.key)};"
         )
 
+    def hiveserde_type(self, backend: SqlBackend) -> HiveSerdeType:
+        if self.table_format != "HIVE":
+            return HiveSerdeType.NOT_HIVESERDE
+        # Extract hive serde info; ideally this should be done by the table crawler,
+        # but it is done here to avoid a breaking change to the `tables` table in the inventory schema.
+        describe = {}
+        for key, values, _ in backend.fetch(f"DESCRIBE TABLE EXTENDED {escape_sql_identifier(self.key)}"):
+            describe[key] = values
+        if not {"Serde Library", "InputFormat", "OutputFormat"} <= describe.keys():
+            return HiveSerdeType.INVALID_HIVESERDE_INFO
+        serde = describe["Serde Library"]
+        input_format = describe["InputFormat"]
+        output_format = describe["OutputFormat"]
+        if self._if_parquet_serde(serde, input_format, output_format):
+            return HiveSerdeType.PARQUET
+        if self._if_avro_serde(serde, input_format, output_format):
+            return HiveSerdeType.AVRO
+        if self._if_orc_serde(serde, input_format, output_format):
+            return HiveSerdeType.ORC
+        return HiveSerdeType.OTHER_HIVESERDE
+
+    def _if_parquet_serde(self, serde, input_format, output_format) -> bool:
+        return (
+            serde == "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
+            and input_format == "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"
+            and output_format == "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"
+        )
+
+    def _if_avro_serde(self, serde, input_format, output_format) -> bool:
+        return (
+            serde == "org.apache.hadoop.hive.serde2.avro.AvroSerDe"
+            and input_format == "org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"
+            and output_format == "org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"
+        )
+
+    def _if_orc_serde(self, serde, input_format, output_format) -> bool:
+        return (
+            serde == "org.apache.hadoop.hive.ql.io.orc.OrcSerde"
+            and input_format == "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"
+            and output_format == "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"
+        )
+
+    def sql_migrate_external_hiveserde_in_place(
+        self,
+        catalog_name,
+        dst_schema,
+        dst_table,
+        backend: SqlBackend,
+        hiveserde_type: HiveSerdeType,
+        replace_table_location: str | None = None,
+    ) -> str | None:
+        # "PARQUET", "AVRO", "ORC" can be migrated with the "SHOW CREATE TABLE..." DDL
+        if hiveserde_type in [HiveSerdeType.PARQUET, HiveSerdeType.AVRO, HiveSerdeType.ORC]:
+            return self._ddl_show_create_table(backend, catalog_name, dst_schema, dst_table, replace_table_location)
+
+        # "TEXTFILE" hiveserde needs extra handling when preparing the DDL
+        # TODO: add support for "TEXTFILE" hiveserde, when the data can be parsed as Spark CSV datasource
+
+        # "JSON", "CSV" hiveserde need extra handling when preparing the DDL
+        # TODO: DBR does not bundle the jars for "JSON", "CSV" hiveserde, so it's unlikely we will see those tables.
+        # Although it's possible that users have the jars installed as a cluster library and use those tables in
+        # Databricks, we hold off the implementation for now until we see a real use case.
+        return None
+
+    def _ddl_show_create_table(
+        self, backend: SqlBackend, catalog_name, dst_schema, dst_table, replace_location
+    ) -> str | None:
+        # get raw DDL from "SHOW CREATE TABLE..."
+        createtab_stmt = next(backend.fetch(self.sql_show_create()))["createtab_stmt"]
+        # parse the DDL and replace the old table name with the new UC table name
+        try:
+            statements = sqlglot.parse(createtab_stmt)
+        except (ValueError, ParseError):
+            logger.exception(f"Exception when parsing 'SHOW CREATE TABLE' DDL for {self.key}")
+            return None
+
+        statement = statements[0]
+        if not statement:
+            logger.error(f"sqlglot parsed none statement from 'SHOW CREATE TABLE' DDL for {self.key}")
+            return None
+
+        src_table = statement.find(expressions.Table)
+        if not src_table:
+            logger.error(f"sqlglot failed to extract table object from parsed DDL for {self.key}")
+            return None
+        new_table = expressions.Table(catalog=catalog_name, db=dst_schema, this=dst_table)
+        src_table.replace(new_table)
+
+        if replace_location:
+            # replace dbfs mnt in ddl if any
+            mnt_loc = statement.find(LocationProperty)
+            if not mnt_loc:
+                logger.error(f"sqlglot failed to extract table location object from parsed DDL for {self.key}")
+                return None
+            new_loc = LocationProperty(this=f"'{replace_location}'")
+            mnt_loc.replace(new_loc)
+
+        new_sql = statement.sql('databricks')
+        return new_sql
+
     def sql_migrate_dbfs(self, target_table_key):
         if not self.is_delta:
             msg = f"{self.key} is not DELTA: {self.table_format}"
```
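To see how the new classification fits together, here is a small sketch exercising `what` and `is_dbfs_mnt` on a hand-built `Table`. It assumes the dataclass fields shown (`catalog`, `database`, `name`, `object_type`, `table_format`, `location`) and that `dbfs:/mnt/` is among the `DBFS_ROOT_PREFIX_EXCEPTIONS` prefixes; the table itself is hypothetical:

```python
from databricks.labs.ucx.hive_metastore.tables import Table, What

# hypothetical external hiveserde table whose data lives under a DBFS mount
tbl = Table(
    catalog="hive_metastore",
    database="sales",
    name="transactions",
    object_type="EXTERNAL",
    table_format="HIVE",
    location="dbfs:/mnt/landing/transactions",
)

assert tbl.what == What.EXTERNAL_HIVESERDE  # table_format == "HIVE" routes to the new branch
assert tbl.is_dbfs_mnt  # mount-backed location, so the DDL LOCATION clause gets rewritten
```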
