Skip to content

Commit

Permalink
Improved reliability of table migration status refresher (#1623)
Browse files Browse the repository at this point in the history
## Changes
- Always reset table migration status when requesting the latest
snapshot within `table_migrate`
- Handle `NotFound` error when refreshing migration status

### Linked issues
<!-- DOC: Link issue with a keyword: close, closes, closed, fix, fixes,
fixed, resolve, resolves, resolved. See
https://docs.github.com/en/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue#linking-a-pull-request-to-an-issue-using-a-keyword
-->

Resolves #1622, #1615

### Tests
<!-- How is this tested? Please see the checklist below and also
describe any other relevant tests -->

- [x] manually tested
- [x] verified on staging environment (screenshot attached)
  • Loading branch information
nkvuong authored May 3, 2024
1 parent 899ba5f commit 43e3d08
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 11 deletions.
3 changes: 2 additions & 1 deletion src/databricks/labs/ucx/hive_metastore/migration_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ def get_seen_tables(self) -> dict[str, str]:
seen_tables: dict[str, str] = {}
for schema in self._iter_schemas():
try:
tables = self._ws.tables.list(catalog_name=schema.catalog_name, schema_name=schema.name)
# ws.tables.list returns Iterator[TableInfo], so we need to convert it to a list in order to catch the exception
tables = list(self._ws.tables.list(catalog_name=schema.catalog_name, schema_name=schema.name))
except NotFound:
logger.warning(
f"Schema {schema.catalog_name}.{schema.name} no longer exists. Skipping checking its migration status."
Expand Down
19 changes: 10 additions & 9 deletions src/databricks/labs/ucx/hive_metastore/table_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def __init__(
backend: SqlBackend,
table_mapping: TableMapping,
group_manager: GroupManager,
migration_status_refresher: 'MigrationStatusRefresher',
migration_status_refresher: MigrationStatusRefresher,
principal_grants: PrincipalACL,
):
self._tc = table_crawler
Expand All @@ -57,7 +57,11 @@ def __init__(
self._principal_grants = principal_grants

def index(self):
# TODO: remove this method
return self._migration_status_refresher.index()

def _index_with_reset(self):
# when we want the latest up-to-date status, e.g. to determine whether views dependencies have been migrated
self._migration_status_refresher.reset()
return self._migration_status_refresher.index()

def migrate_tables(
Expand Down Expand Up @@ -116,9 +120,8 @@ def _migrate_tables(

def _migrate_views(self, acl_strategy, all_grants_to_migrate, all_migrated_groups, all_principal_grants):
tables_to_migrate = self._tm.get_tables_to_migrate(self._tc)
self._migration_status_refresher.reset()
all_tasks = []
sequencer = ViewsMigrationSequencer(tables_to_migrate, self.index())
sequencer = ViewsMigrationSequencer(tables_to_migrate, self._index_with_reset())
batches = sequencer.sequence_batches()
for batch in batches:
tasks = []
Expand All @@ -134,9 +137,7 @@ def _migrate_views(self, acl_strategy, all_grants_to_migrate, all_migrated_group
)
)
Threads.strict("migrate views", tasks)
self._migration_status_refresher.reset()
all_tasks.extend(tasks)
self._migration_status_refresher.reset()
return all_tasks

def _compute_grants(
Expand Down Expand Up @@ -201,7 +202,7 @@ def _migrate_view(
def _view_can_be_migrated(self, view: ViewToMigrate):
# dependencies have already been computed, therefore an empty dict is good enough
for table in view.dependencies:
if not self.index().get(table.schema, table.name):
if not self._index_with_reset().get(table.schema, table.name):
logger.info(f"View {view.src.key} cannot be migrated because {table.key} is not migrated yet")
return False
return True
Expand All @@ -219,8 +220,8 @@ def _sql_migrate_view(self, src_view: ViewToMigrate) -> str:
# CREATE VIEW x.y (col1, col2) AS SELECT * FROM w.t
create_statement = self._backend.fetch(f"SHOW CREATE TABLE {src_view.src.safe_sql_key}")
src_view.src.view_text = next(iter(create_statement))["createtab_stmt"]
migration_index = self._migration_status_refresher.index()
return src_view.sql_migrate_view(migration_index)
# this does not require the index to be refreshed because the dependencies have already been validated
return src_view.sql_migrate_view(self.index())

def _migrate_external_table(self, src_table: Table, rule: Rule, grants: list[Grant] | None = None):
target_table_key = rule.as_uc_table_key
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/hive_metastore/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ def test_table_migration_job_refreshes_migration_status(ws, installation_ctx, pr
)
migration_status = list(ctx.sql_backend.fetch(query_migration_status))
assert_message_postfix = f" found for {table.table_type} {table.full_name}"
assert len(migration_status) == 1, "No migration status found" + assert_message_postfix
assert len(migration_status) == 1, "No migration status" + assert_message_postfix
assert migration_status[0].dst_schema is not None, "No destination schema" + assert_message_postfix
assert migration_status[0].dst_table is not None, "No destination table" + assert_message_postfix

0 comments on commit 43e3d08

Please sign in to comment.