Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Group renaming: wait for consistency before completing task #1944

Merged
merged 4 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 61 additions & 7 deletions src/databricks/labs/ucx/workspace_access/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import re
from abc import abstractmethod
from collections.abc import Iterable
from collections.abc import Iterable, Collection
from dataclasses import dataclass
from datetime import timedelta
from typing import ClassVar
Expand Down Expand Up @@ -374,6 +374,16 @@ def generate_migrated_groups(self):
)


class GroupRenameIncompleteError(RuntimeError):
__slots__ = ("group_id", "old_name", "new_name")

def __init__(self, group_id: str, old_name: str, new_name: str) -> None:
super().__init__(f"Rename incomplete for group {group_id}: {old_name} -> {new_name}")
self.group_id = group_id
self.old_name = old_name
self.new_name = new_name


class GroupManager(CrawlerBase[MigratedGroup]):
_SYSTEM_GROUPS: ClassVar[list[str]] = ["users", "admins", "account users"]

Expand Down Expand Up @@ -425,18 +435,62 @@ def rename_groups(self):
continue
logger.info(f"Renaming: {migrated_group.name_in_workspace} -> {migrated_group.temporary_name}")
tasks.append(
functools.partial(self._rename_group, migrated_group.id_in_workspace, migrated_group.temporary_name)
functools.partial(
self._rename_group_and_wait,
migrated_group.id_in_workspace,
migrated_group.name_in_workspace,
migrated_group.temporary_name,
)
)
_, errors = Threads.gather("rename groups in the workspace", tasks)
if len(errors) > 0:
raise ManyError(errors)
renamed_groups = Threads.strict("rename groups in the workspace", tasks)
# Renaming is eventually consistent, and the tasks above have each polled to verify their rename completed.
# Here we also check that enumeration yields the updated names; this is necessary because otherwise downstream
# tasks (like reflect_account_groups_on_workspace()) may skip a renamed group because it doesn't appear to be
# present.
self._wait_for_renamed_groups(renamed_groups)

def _rename_group_and_wait(self, group_id: str, old_group_name, new_group_name: str) -> tuple[str, str]:
asnare marked this conversation as resolved.
Show resolved Hide resolved
logger.debug(f"Renaming group {group_id}: {old_group_name} -> {new_group_name}")
JCZuurmond marked this conversation as resolved.
Show resolved Hide resolved
self._rename_group(group_id, new_group_name)
logger.debug(f"Waiting for group {group_id} rename to take effect: {old_group_name} -> {new_group_name}")
self._wait_for_rename(group_id, old_group_name, new_group_name)
return group_id, new_group_name

@retried(on=[InternalError, ResourceConflict, DeadlineExceeded])
@rate_limited(max_requests=10, burst_period_seconds=60)
def _rename_group(self, group_id: str, new_group_name: str):
def _rename_group(self, group_id: str, new_group_name: str) -> None:
ops = [iam.Patch(iam.PatchOp.REPLACE, "displayName", new_group_name)]
self._ws.groups.patch(group_id, operations=ops)
return True

@retried(on=[GroupRenameIncompleteError], timeout=timedelta(minutes=2))
def _wait_for_rename(self, group_id: str, old_group_name: str, new_group_name: str) -> None:
group = self._ws.groups.get(group_id)
if group.display_name == old_group_name:
# Rename still pending.
raise GroupRenameIncompleteError(group_id, old_group_name, new_group_name)
if group.display_name != new_group_name:
# Group has an entirely unexpected name; something else is interfering.
msg = f"While waiting for group {group_id} rename ({old_group_name} -> {new_group_name} an unexpected name was observed: {group.display_name}"
raise RuntimeError(msg)
# Normal exit; group has been renamed.

@retried(on=[ManyError], timeout=timedelta(minutes=2))
def _wait_for_renamed_groups(self, expected_groups: Collection[tuple[str, str]]) -> None:
asnare marked this conversation as resolved.
Show resolved Hide resolved
attributes = "id,displayName"
found_groups = {
group.id: group.display_name
for group in self._list_workspace_groups("WorkspaceGroup", attributes)
if group.display_name
}
pending_renames: list[RuntimeError] = []
for group_id, expected_name in expected_groups:
found_name = found_groups.get(group_id, None)
if found_name is None:
pending_renames.append(RuntimeError(f"Missing group with id: {group_id} (renamed to {expected_name}"))
elif found_name != expected_name:
pending_renames.append(GroupRenameIncompleteError(group_id, found_name, expected_name))
if pending_renames:
raise ManyError(pending_renames)

def reflect_account_groups_on_workspace(self):
tasks = []
Expand Down
158 changes: 150 additions & 8 deletions tests/unit/workspace_access/test_groups.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import dataclasses
import json
import logging
from unittest.mock import create_autospec
from collections.abc import Generator, Sequence
from unittest.mock import create_autospec, patch, Mock

import pytest
from databricks.labs.blueprint.parallel import ManyError
from databricks.labs.blueprint.tui import MockPrompts
from databricks.labs.lsql.backends import MockBackend
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import DatabricksError, NotFound, ResourceDoesNotExist
from databricks.sdk.errors import DatabricksError, InternalError, NotFound, ResourceDoesNotExist
from databricks.sdk.service import iam
from databricks.sdk.service.iam import ComplexValue, Group, ResourceMeta

Expand All @@ -19,6 +21,22 @@
)


@pytest.fixture
def fake_sleep() -> Generator[Mock, None, None]:
"""Allow a test to request this fixture if internal calls to sleep should return immediately.

This is useful during unit testing when we're normally checking for interactions and don't want normal delays to
occur.
"""
with patch("time.sleep") as mock:
# To prevent runaway loops, raise an exception if invoked more than 100 times.
def _check_count(*_, **__):
assert mock.call_count < 100, "time.sleep() has been invoked too often; runaway loop?"

mock.side_effect = _check_count
yield mock


def test_snapshot_with_group_created_in_account_console_should_be_considered():
backend = MockBackend()
ws = create_autospec(WorkspaceClient)
Expand Down Expand Up @@ -255,16 +273,26 @@ def test_rename_groups_should_patch_eligible_groups():
backend = MockBackend()
wsclient = create_autospec(WorkspaceClient)
group1 = Group(id="1", display_name="de", meta=ResourceMeta(resource_type="WorkspaceGroup"))
wsclient.groups.list.return_value = [
group1,
]
updated_group1 = dataclasses.replace(group1, display_name="test-group-de")
wsclient.groups.list.side_effect = (
# Preparing for the rename.
*[[group1]] * 3,
# Checking the rename completed
[updated_group1],
)
wsclient.groups.get.side_effect = (
# Preparing for the rename.
*[group1] * 2,
# Checking the rename completed.
updated_group1,
)
wsclient.groups.get.return_value = group1
account_admins_group_1 = Group(id="11", display_name="de")
wsclient.api_client.do.return_value = {
"Resources": [g.as_dict() for g in (account_admins_group_1,)],
}
GroupManager(backend, wsclient, inventory_database="inv", renamed_group_prefix="test-group-").rename_groups()
wsclient.groups.patch.assert_called_with(
wsclient.groups.patch.assert_called_once_with(
"1",
operations=[iam.Patch(iam.PatchOp.REPLACE, "displayName", "test-group-de")],
)
Expand Down Expand Up @@ -293,6 +321,121 @@ def test_rename_groups_should_filter_already_renamed_groups():
wsclient.groups.patch.assert_not_called()


def test_rename_groups_should_wait_for_renames_to_complete(fake_sleep: Mock) -> None:
"""Test that renaming groups waits until completion instead of returning immediately.

Group renaming is eventually consistent; however many downstream tasks assume that it completes synchronously.
Here we test that renaming the groups polls until the rename is detected.
"""
backend = MockBackend()
wsclient = create_autospec(WorkspaceClient)
original_group = Group(id="1", display_name="de", meta=ResourceMeta(resource_type="WorkspaceGroup"))
updated_group = dataclasses.replace(original_group, display_name="test-group-de")
# Used to enumerate groups.
list_side_effect_values = (
# Preparing for the rename.
*[[original_group]] * 3,
# Checking the rename completed; simulate:
# 1. Rename incomplete: original group still visible.
# 2. Still incomplete: group missing entirely.
# 3. Rename has finished: updated group is now visible.
[original_group],
[],
[updated_group],
)
wsclient.groups.list.side_effect = list_side_effect_values
# Used to load the migration state.
matching_account_admin_group = dataclasses.replace(
original_group, id="11", meta=ResourceMeta(resource_type="Group")
)
wsclient.api_client.do.return_value = {
"Resources": [g.as_dict() for g in (matching_account_admin_group,)],
}
# Used to get the group data prior to the rename, and then poll for the change.
get_return_values = (
# Enumerating the workspace groups and loading the migration state.
*[original_group] * 2,
# First call after rename: simulate the rename not yet being visible.
original_group,
# Second call, simulate things now being complete.
updated_group,
)
wsclient.groups.get.side_effect = get_return_values

# Perform the test itself.
GroupManager(backend, wsclient, inventory_database="inv", renamed_group_prefix="test-group-").rename_groups()

# Verify the internal interactions.
assert wsclient.groups.get.call_count == len(get_return_values)
assert wsclient.groups.list.call_count == len(list_side_effect_values)
fake_sleep.assert_called()


def test_rename_groups_should_retry_on_internal_error(fake_sleep: Mock) -> None:
"""Test that when a rename fails due to an internal error that an automatic retry takes place."""
backend = MockBackend()
wsclient = create_autospec(WorkspaceClient)
original_group = Group(id="1", display_name="de", meta=ResourceMeta(resource_type="WorkspaceGroup"))
updated_group = dataclasses.replace(original_group, display_name="test-group-de")
wsclient.groups.list.side_effect = (
# Preparing for the rename.
*[[original_group]] * 3,
# Checking the rename completed.
[updated_group],
)
matching_account_admin_group = dataclasses.replace(
original_group, id="11", meta=ResourceMeta(resource_type="Group")
)
wsclient.api_client.do.return_value = {
"Resources": [g.as_dict() for g in (matching_account_admin_group,)],
}
wsclient.groups.get.side_effect = (
# Preparing for the rename.
*[original_group] * 2,
# Checking the rename completed.
updated_group,
)
# The response to the PATCH call is ignored; set things up to fail on the first call and succeed on the second.
patch_responses: Sequence[type[BaseException] | dict] = (InternalError, {})
wsclient.groups.patch.side_effect = patch_responses

# Perform the test itself.
GroupManager(backend, wsclient, inventory_database="inv", renamed_group_prefix="test-group-").rename_groups()

# Verify the internal interactions.
assert wsclient.groups.patch.call_count == len(patch_responses)
fake_sleep.assert_called()


def test_rename_groups_should_fail_if_unknown_name_observed() -> None:
"""Test that interference during group renaming is detected.

During a rename from A -> B verify that we fail immediately if C is observed instead of waiting for a timeout to
occur. (Most likely a concurrent process is busy, and B will never happen.)
"""
backend = MockBackend()
wsclient = create_autospec(WorkspaceClient)
original_group = Group(id="1", display_name="de", meta=ResourceMeta(resource_type="WorkspaceGroup"))
wsclient.groups.list.return_value = [original_group]
matching_account_admin_group = dataclasses.replace(
original_group, id="11", meta=ResourceMeta(resource_type="Group")
)
wsclient.api_client.do.return_value = {
"Resources": [g.as_dict() for g in (matching_account_admin_group,)],
}
wsclient.groups.get.side_effect = (
# Preparing for the rename.
*[original_group] * 2,
# Here we inject the fault that the group has a completely different name.
dataclasses.replace(original_group, display_name="completely-unexpected-name"),
)

# Perform the test itself.
group_manager = GroupManager(backend, wsclient, inventory_database="inv", renamed_group_prefix="test-group-")
with pytest.raises(RuntimeError, match="unexpected name was observed: completely-unexpected-name"):
group_manager.rename_groups()


def test_rename_groups_should_fail_if_error_is_thrown():
backend = MockBackend()
wsclient = create_autospec(WorkspaceClient)
Expand All @@ -307,9 +450,8 @@ def test_rename_groups_should_fail_if_error_is_thrown():
}
wsclient.groups.patch.side_effect = RuntimeError("Something bad")
group_manager = GroupManager(backend, wsclient, inventory_database="inv", renamed_group_prefix="test-group-")
with pytest.raises(ManyError) as e:
with pytest.raises(RuntimeError, match="Something bad"):
group_manager.rename_groups()
assert e.value.args[0] == "Detected 1 failures: RuntimeError: Something bad"


def test_reflect_account_groups_on_workspace_should_be_called_for_eligible_groups():
Expand Down
Loading