From 43e3d0868095938a63b6f344534fa24236f62416 Mon Sep 17 00:00:00 2001 From: vuong-nguyen <44292934+nkvuong@users.noreply.github.com> Date: Fri, 3 May 2024 20:57:46 +0100 Subject: [PATCH] Improved reliability of table migration status refresher (#1623) ## Changes - Always reset table migration status when requesting the latest snapshot within `table_migrate` - Handle `NotFound` error when refreshing migration status ### Linked issues Resolves #1622, #1615 ### Tests - [x] manually tested - [x] verified on staging environment (screenshot attached) --- .../ucx/hive_metastore/migration_status.py | 3 ++- .../labs/ucx/hive_metastore/table_migrate.py | 19 ++++++++++--------- .../hive_metastore/test_workflows.py | 2 +- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/migration_status.py b/src/databricks/labs/ucx/hive_metastore/migration_status.py index 9043e65ff7..e5b0bfbe32 100644 --- a/src/databricks/labs/ucx/hive_metastore/migration_status.py +++ b/src/databricks/labs/ucx/hive_metastore/migration_status.py @@ -70,7 +70,8 @@ def get_seen_tables(self) -> dict[str, str]: seen_tables: dict[str, str] = {} for schema in self._iter_schemas(): try: - tables = self._ws.tables.list(catalog_name=schema.catalog_name, schema_name=schema.name) + # ws.tables.list returns Iterator[TableInfo], so we need to convert it to a list in order to catch the exception + tables = list(self._ws.tables.list(catalog_name=schema.catalog_name, schema_name=schema.name)) except NotFound: logger.warning( f"Schema {schema.catalog_name}.{schema.name} no longer exists. Skipping checking its migration status." diff --git a/src/databricks/labs/ucx/hive_metastore/table_migrate.py b/src/databricks/labs/ucx/hive_metastore/table_migrate.py index 5e9f2faaf7..eb837edc03 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migrate.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migrate.py @@ -43,7 +43,7 @@ def __init__( backend: SqlBackend, table_mapping: TableMapping, group_manager: GroupManager, - migration_status_refresher: 'MigrationStatusRefresher', + migration_status_refresher: MigrationStatusRefresher, principal_grants: PrincipalACL, ): self._tc = table_crawler @@ -57,7 +57,11 @@ def __init__( self._principal_grants = principal_grants def index(self): - # TODO: remove this method + return self._migration_status_refresher.index() + + def _index_with_reset(self): + # when we want the latest up-to-date status, e.g. to determine whether views dependencies have been migrated + self._migration_status_refresher.reset() return self._migration_status_refresher.index() def migrate_tables( @@ -116,9 +120,8 @@ def _migrate_tables( def _migrate_views(self, acl_strategy, all_grants_to_migrate, all_migrated_groups, all_principal_grants): tables_to_migrate = self._tm.get_tables_to_migrate(self._tc) - self._migration_status_refresher.reset() all_tasks = [] - sequencer = ViewsMigrationSequencer(tables_to_migrate, self.index()) + sequencer = ViewsMigrationSequencer(tables_to_migrate, self._index_with_reset()) batches = sequencer.sequence_batches() for batch in batches: tasks = [] @@ -134,9 +137,7 @@ def _migrate_views(self, acl_strategy, all_grants_to_migrate, all_migrated_group ) ) Threads.strict("migrate views", tasks) - self._migration_status_refresher.reset() all_tasks.extend(tasks) - self._migration_status_refresher.reset() return all_tasks def _compute_grants( @@ -201,7 +202,7 @@ def _migrate_view( def _view_can_be_migrated(self, view: ViewToMigrate): # dependencies have already been computed, therefore an empty dict is good enough for table in view.dependencies: - if not self.index().get(table.schema, table.name): + if not self._index_with_reset().get(table.schema, table.name): logger.info(f"View {view.src.key} cannot be migrated because {table.key} is not migrated yet") return False return True @@ -219,8 +220,8 @@ def _sql_migrate_view(self, src_view: ViewToMigrate) -> str: # CREATE VIEW x.y (col1, col2) AS SELECT * FROM w.t create_statement = self._backend.fetch(f"SHOW CREATE TABLE {src_view.src.safe_sql_key}") src_view.src.view_text = next(iter(create_statement))["createtab_stmt"] - migration_index = self._migration_status_refresher.index() - return src_view.sql_migrate_view(migration_index) + # this does not require the index to be refreshed because the dependencies have already been validated + return src_view.sql_migrate_view(self.index()) def _migrate_external_table(self, src_table: Table, rule: Rule, grants: list[Grant] | None = None): target_table_key = rule.as_uc_table_key diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index e8b91c1ba8..556e281145 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -35,6 +35,6 @@ def test_table_migration_job_refreshes_migration_status(ws, installation_ctx, pr ) migration_status = list(ctx.sql_backend.fetch(query_migration_status)) assert_message_postfix = f" found for {table.table_type} {table.full_name}" - assert len(migration_status) == 1, "No migration status found" + assert_message_postfix + assert len(migration_status) == 1, "No migration status" + assert_message_postfix assert migration_status[0].dst_schema is not None, "No destination schema" + assert_message_postfix assert migration_status[0].dst_table is not None, "No destination table" + assert_message_postfix