Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Group migration: continue permission migration even if one or more groups fail #1924

Merged
merged 10 commits into from
Jun 25, 2024
36 changes: 28 additions & 8 deletions src/databricks/labs/ucx/workspace_access/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,10 @@ def _apply_to_groups(self, ws: WorkspaceClient, *, renamed: bool = False) -> boo
if len(self) == 0:
logger.info("No valid groups selected, nothing to do.")
return True
logger.info(f"Migrating permissions to {len(self)} account groups.")
items = 0
logger.info(f"Migrating permissions for {len(self)} account groups.")
total_permissions = 0
success_groups = 0
errors: list[Exception] = []
for migrated_group in self.groups:
name_in_workspace = migrated_group.name_in_workspace
if renamed:
Expand All @@ -109,14 +111,28 @@ def _apply_to_groups(self, ws: WorkspaceClient, *, renamed: bool = False) -> boo
# the migration fails.
name_in_workspace = migrated_group.temporary_name
name_in_account = migrated_group.name_in_account
items += self._migrate_group_permissions_paginated(ws, name_in_workspace, name_in_account)
logger.info(f"Migrated {items} permissions.")
try:
group_permissions = self._migrate_group_permissions_paginated(ws, name_in_workspace, name_in_account)
logger.info(
f"Migrated {group_permissions} permissions: {name_in_workspace} (workspace) -> {name_in_account} (account)"
)
total_permissions += group_permissions
success_groups += 1
except IOError as e:
logger.exception(
f"Migration of group permissions failed: {name_in_workspace} (workspace) -> {name_in_account} (account)"
)
errors.append(e)
logger.info(f"Migrated {total_permissions} permissions for {success_groups}/{len(self)} groups successfully.")
if errors:
logger.error(f"Migrating permissions failed for {len(errors)}/{len(self)} groups.")
raise ManyError(errors)
return True

@staticmethod
def _migrate_group_permissions_paginated(ws: WorkspaceClient, name_in_workspace: str, name_in_account: str):
def _migrate_group_permissions_paginated(ws: WorkspaceClient, name_in_workspace: str, name_in_account: str) -> int:
batch_size = 1000
logger.info(f"Migrating permissions: {name_in_workspace} (workspace) -> {name_in_account} (account)")
logger.info(f"Migrating permissions: {name_in_workspace} (workspace) -> {name_in_account} (account) starting")
permissions_migrated = 0
while True:
result = ws.permission_migration.migrate_permissions(
Expand All @@ -126,10 +142,14 @@ def _migrate_group_permissions_paginated(ws: WorkspaceClient, name_in_workspace:
size=batch_size,
)
if not result.permissions_migrated:
logger.info("No more permissions to migrate.")
logger.info(
f"Migrating permissions: {name_in_workspace} (workspace) -> {name_in_account} (account) finished"
)
return permissions_migrated
permissions_migrated += result.permissions_migrated
logger.info(f"Migrated {result.permissions_migrated} permissions to {name_in_account} account group")
logger.info(
f"Migrating permissions: {name_in_workspace} (workspace) -> {name_in_account} (account) progress={permissions_migrated}(+{result.permissions_migrated})"
)


class GroupMigrationStrategy:
Expand Down
7 changes: 5 additions & 2 deletions src/databricks/labs/ucx/workspace_access/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,11 @@ def apply_permissions(self, ctx: RuntimeContext):
migration_state = ctx.group_manager.get_migration_state()
if len(migration_state.groups) == 0:
logger.info("Skipping group migration as no groups were found.")
return
migration_state.apply_to_renamed_groups(ctx.workspace_client)
elif migration_state.apply_to_renamed_groups(ctx.workspace_client):
logger.info("Group permission migration completed successfully.")
else:
msg = "Permission migration for groups failed; reason unknown."
raise RuntimeError(msg)


class ValidateGroupPermissions(Workflow):
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/source_code/linters/test_python_imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def foo(): return "bar"
),
],
)
def test_infers_dbutils_notebook_run_dynamic_value(code, expected):
def test_infers_dbutils_notebook_run_dynamic_value(code, expected) -> None:
tree = Tree.parse(code)
calls = DbutilsLinter.list_dbutils_notebook_run_calls(tree)
all_paths: list[str] = []
Expand Down
72 changes: 65 additions & 7 deletions tests/unit/workspace_access/test_workflows.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import logging
from unittest.mock import create_autospec, call

import pytest
from databricks.labs.blueprint.parallel import ManyError
from databricks.labs.lsql.backends import MockBackend
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import DatabricksError
from databricks.sdk.service.iam import PermissionMigrationResponse

from databricks.labs.ucx.workspace_access.groups import GroupManager
from databricks.labs.ucx.workspace_access.workflows import (
RemoveWorkspaceLocalGroups,
GroupMigration,
Expand All @@ -13,27 +17,27 @@
from tests.unit import GROUPS, PERMISSIONS


def test_runtime_delete_backup_groups(run_workflow):
def test_runtime_delete_backup_groups(run_workflow) -> None:
ctx = run_workflow(RemoveWorkspaceLocalGroups.delete_backup_groups)
assert 'SELECT * FROM hive_metastore.ucx.groups' in ctx.sql_backend.queries


def test_runtime_apply_permissions_to_account_groups(run_workflow):
def test_runtime_apply_permissions_to_account_groups(run_workflow) -> None:
ctx = run_workflow(GroupMigration.apply_permissions_to_account_groups)
assert 'SELECT * FROM hive_metastore.ucx.groups' in ctx.sql_backend.queries


def test_rename_workspace_local_group(run_workflow):
def test_rename_workspace_local_group(run_workflow) -> None:
ctx = run_workflow(GroupMigration.rename_workspace_local_groups)
assert 'SELECT * FROM hive_metastore.ucx.groups' in ctx.sql_backend.queries


def test_reflect_account_groups_on_workspace(run_workflow):
def test_reflect_account_groups_on_workspace(run_workflow) -> None:
ctx = run_workflow(PermissionsMigrationAPI.reflect_account_groups_on_workspace)
assert 'SELECT * FROM hive_metastore.ucx.groups' in ctx.sql_backend.queries


def test_migrate_permissions_experimental(run_workflow):
def test_migrate_permissions_experimental(run_workflow) -> None:
rows = {
'SELECT \\* FROM hive_metastore.ucx.groups': GROUPS[
("", "workspace_group_1", "account_group_1", "temp_1", "", "", "", ""),
Expand All @@ -57,7 +61,7 @@ def test_migrate_permissions_experimental(run_workflow):
ws.permission_migration.migrate_permissions.assert_has_calls(calls, any_order=True)


def test_migrate_permissions_experimental_paginated(run_workflow):
def test_migrate_permissions_experimental_paginated(run_workflow) -> None:
rows = {
'SELECT \\* FROM hive_metastore.ucx.groups': GROUPS[
("", "workspace_group_1", "account_group_1", "temp_1", "", "", "", ""),
Expand All @@ -83,7 +87,7 @@ def test_migrate_permissions_experimental_paginated(run_workflow):
ws.permission_migration.migrate_permissions.assert_has_calls(calls, any_order=True)


def test_migrate_permissions_experimental_error(run_workflow):
def test_migrate_permissions_not_enabled_error(run_workflow) -> None:
rows = {
'SELECT \\* FROM hive_metastore.ucx.groups': GROUPS[
("", "workspace_group_1", "account_group_1", "temp_1", "", "", "", ""),
Expand All @@ -97,3 +101,57 @@ def test_migrate_permissions_experimental_error(run_workflow):
ws.permission_migration.migrate_permissions.side_effect = NotImplementedError("api not enabled")
with pytest.raises(NotImplementedError):
run_workflow(PermissionsMigrationAPI.apply_permissions, sql_backend=sql_backend, workspace_client=ws)


def test_migrate_permissions_continue_on_error(run_workflow, caplog) -> None:
    """Verify that a failure for one group does not stop permission migration for the remaining groups.

    Three groups are migrated: the first raises on its first API call, the second raises after one
    successful batch, and the third completes. The workflow is expected to raise a ``ManyError``
    carrying both failures, while the log reports the permissions migrated for the successful group.
    """
    group_rows = GROUPS[
        ("", "workspace_group_1", "account_group_1", "temp_1", "", "", "", ""),  # Fails on its first call.
        ("", "workspace_group_2", "account_group_2", "temp_2", "", "", "", ""),  # Fails after one batch.
        ("", "workspace_group_3", "account_group_3", "temp_3", "", "", "", ""),  # Migrates cleanly.
    ]
    backend = MockBackend(rows={'SELECT \\* FROM hive_metastore.ucx.groups': group_rows})

    # The mock consumes these results in call order, so they are laid out group by group.
    first_group_calls = [
        DatabricksError("simulate group failure: immediately"),
    ]
    second_group_calls = [
        PermissionMigrationResponse(permissions_migrated=10),
        DatabricksError("simulate group failure: midway"),
    ]
    third_group_calls = [
        PermissionMigrationResponse(permissions_migrated=50),
        PermissionMigrationResponse(permissions_migrated=0),  # Zero migrated ends the pagination loop.
    ]
    ws = create_autospec(WorkspaceClient)
    ws.get_workspace_id.return_value = "12345678"
    ws.permission_migration.migrate_permissions.side_effect = [
        *first_group_calls,
        *second_group_calls,
        *third_group_calls,
    ]

    with pytest.raises(ManyError) as exc_info:
        with caplog.at_level(logging.INFO):
            run_workflow(PermissionsMigrationAPI.apply_permissions, sql_backend=sql_backend_or(backend), workspace_client=ws)

    # Both failures must be collected, not just the first one encountered.
    many_error = exc_info.value
    assert len(many_error.errs) == 2
    collected_messages = {str(err) for err in many_error.errs}
    assert collected_messages == {"simulate group failure: immediately", "simulate group failure: midway"}

    # Each failing group is logged individually, followed by the overall summary lines.
    for failure_prefix in (
        "Migration of group permissions failed: temp_1",
        "Migration of group permissions failed: temp_2",
    ):
        assert failure_prefix in caplog.text
    for summary_line in (
        "Migrated 50 permissions for 1/3 groups successfully.",
        "Migrating permissions failed for 2/3 groups.",
    ):
        assert summary_line in caplog.messages


def sql_backend_or(backend):
    """Return *backend* unchanged; exists only to keep the workflow call on one readable line."""
    return backend


def test_migrate_permissions_non_raised_error(run_workflow, migration_state, mocker) -> None:
    """Verify that a ``False`` result from the permission migration fails the task.

    The migration can signal failure through its return value instead of raising; the workflow
    must turn that into a ``RuntimeError``.
    """
    # Inject the "silent" failure: the migration returns False without raising anything.
    migration_state.apply_to_renamed_groups = mocker.Mock(return_value=False)

    # Hand the doctored migration state to the workflow through a mocked group manager.
    group_manager = create_autospec(GroupManager)
    group_manager.get_migration_state.return_value = migration_state

    with pytest.raises(RuntimeError) as exc_info:
        run_workflow(PermissionsMigrationAPI.apply_permissions, group_manager=group_manager)

    # The failure must surface as a task exception with the expected message.
    failure_message = str(exc_info.value)
    assert failure_message == "Permission migration for groups failed; reason unknown."