Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added databricks labs ucx cluster-remap command to remap legacy cluster configurations to UC-compatible #994

Merged
merged 34 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
321f09a
Adding command to Remap the cluster to UC
prajin-29 Mar 1, 2024
878ac55
Adding command to Remap the cluster to UC
prajin-29 Mar 1, 2024
6dfa642
Adding command to Remap the cluster to UC
prajin-29 Mar 1, 2024
89dc542
Adding command to Remap the cluster to UC
prajin-29 Mar 1, 2024
54ca4db
Adding command to Remap the cluster to UC
prajin-29 Mar 1, 2024
421c31f
Adding command to Remap the cluster to UC
prajin-29 Mar 4, 2024
7f685e3
Adding command to Remap the cluster to UC
prajin-29 Mar 4, 2024
a84421e
Adding command to Remap the cluster to UC
prajin-29 Mar 4, 2024
41ac6b6
writing unit test
prajin-29 Mar 4, 2024
881d54a
writing unit test
prajin-29 Mar 4, 2024
81a2af0
writing unit test
prajin-29 Mar 4, 2024
d62ae3f
Merge branch 'main' into feature/cluster_remap_command
prajin-29 Mar 4, 2024
8a4f46b
writing unit test
prajin-29 Mar 4, 2024
509f5bf
writing unit test
prajin-29 Mar 4, 2024
08a8072
Adding Integration Testing
prajin-29 Mar 4, 2024
b8b96bb
Merge branch 'main' into feature/cluster_remap_command
prajin-29 Mar 15, 2024
8724726
Creating revert cluster remap command
prajin-29 Mar 15, 2024
40b132f
Creating revert cluster remap command
prajin-29 Mar 18, 2024
9aea0f6
Creating revert cluster remap command
prajin-29 Mar 18, 2024
5d2a388
creating Unit Test cases
prajin-29 Mar 18, 2024
4e858f8
creating Unit Test cases
prajin-29 Mar 18, 2024
126cd0a
Changing the logic for iterating to all the clusters
prajin-29 Mar 19, 2024
cc417fa
Changing the logic for cluster remap
prajin-29 Mar 19, 2024
2738e90
Changing the logic for cluster remap
prajin-29 Mar 19, 2024
e283696
Updating the Unit test
prajin-29 Mar 19, 2024
57c841b
Merge branch 'main' into feature/cluster_remap_command
prajin-29 Mar 19, 2024
05101b1
Increasing the test coverage
prajin-29 Mar 20, 2024
51d45dd
Applying the review comments
prajin-29 Mar 20, 2024
53c5bcb
Applying the review comments
prajin-29 Mar 20, 2024
8b61365
Applying the review comments
prajin-29 Mar 20, 2024
6e82ac8
Applying the review comments
prajin-29 Mar 20, 2024
b65d1a5
Using the API only once to fetch the ids and details
prajin-29 Mar 20, 2024
9df440a
Modifying the code based on the comments provided
prajin-29 Mar 20, 2024
23a1b68
Merge branch 'main' into feature/cluster_remap_command
nfx Mar 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions labs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,9 @@ commands:
- name: create-catalogs-schemas
description: Create UC external catalogs and schemas based on the destinations created from create_table_mapping command.
This command is supposed to be run before migrating tables to UC.

- name: cluster-remap
description: Remap the data security mode of existing interactive clusters to be Unity Catalog compatible

- name: revert-cluster-remap
description: Revert clusters changed by cluster-remap back to their original, backed-up configuration
39 changes: 39 additions & 0 deletions src/databricks/labs/ucx/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from databricks.labs.ucx.hive_metastore.table_migrate import TablesMigrate
from databricks.labs.ucx.hive_metastore.table_move import TableMove
from databricks.labs.ucx.install import WorkspaceInstallation
from databricks.labs.ucx.workspace_access.clusters import ClusterAccess
from databricks.labs.ucx.workspace_access.groups import GroupManager

ucx = App(__file__)
Expand Down Expand Up @@ -504,5 +505,43 @@
catalog_schema.create_catalog_schema()


@ucx.command
def cluster_remap(w: WorkspaceClient, prompts: Prompts):
    """Re-mapping the cluster to UC"""
    logger.info("Remapping the Clusters to UC")
    installation = Installation.current(w, 'ucx')
    cluster_access = ClusterAccess(installation, w, prompts)
    clusters_by_name = cluster_access.list_cluster()
    # Show every eligible cluster before asking the user which ones to remap.
    print("Cluster Name\tCluster Id")
    for cluster_name, remap_id in clusters_by_name.items():
        print(f"{cluster_name}\t{remap_id}")
    cluster_ids = prompts.question(
        "Please provide the cluster id's as comma separated value from the above list", default="<ALL>"
    )
    cluster_access.map_cluster_to_uc(cluster_ids)


@ucx.command
def revert_cluster_remap(w: WorkspaceClient, prompts: Prompts):
    """Reverting Re-mapping of clusters from UC"""
    logger.info("Reverting the Remapping of the Clusters from UC")
    installation = Installation.current(w, 'ucx')
    # Backups are written by `cluster_remap` as backup/clusters/<cluster_id>.json;
    # recover the cluster ids from the file names. Membership test (`in`) is used
    # instead of `.find(...) > 0`, which wrongly excluded a path that *starts*
    # with "backup/clusters" (find() returns 0 there).
    cluster_ids = [
        cluster_file.path.split("/")[-1].split(".")[0]
        for cluster_file in installation.files()
        if cluster_file.path is not None and "backup/clusters" in cluster_file.path
    ]
    if not cluster_ids:
        logger.info("There is no cluster files in the backup folder.Skipping the reverting process")
        return
    # Show the revertible cluster ids before prompting for a selection.
    for cluster in cluster_ids:
        print(cluster)
    cluster_list = prompts.question(
        "Please provide the cluster id's as comma separated value from the above list", default="<ALL>"
    )
    cluster_class = ClusterAccess(installation, w, prompts)
    cluster_class.revert_cluster_remap(cluster_list, cluster_ids)


# Script entry point: dispatch to the registered ucx CLI commands.
if __name__ == "__main__":
    ucx()
117 changes: 117 additions & 0 deletions src/databricks/labs/ucx/workspace_access/clusters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import logging

from databricks.labs.blueprint.installation import Installation
from databricks.labs.blueprint.tui import Prompts
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import InvalidParameterValue
from databricks.sdk.service.compute import ClusterDetails, DataSecurityMode

logger = logging.getLogger(__name__)


class ClusterAccess:
    """Remap interactive clusters to UC-compatible data security modes, with backup and revert.

    Before a cluster is edited, its full configuration is saved as JSON under
    ``backup/clusters/<cluster_id>.json`` in the installation folder, so the
    change can later be undone with :meth:`revert_cluster_remap`.
    """

    def __init__(self, installation: Installation, ws: WorkspaceClient, prompts: Prompts):
        self._ws = ws
        self._prompts = prompts
        self._installation = installation

    def _interactive_clusters(self):
        """Yield every cluster except job clusters, which are ephemeral and managed by the jobs service."""
        for cluster in self._ws.clusters.list():
            if cluster.cluster_source is not None and cluster.cluster_source.name != "JOB":
                yield cluster

    def list_cluster(self):
        """Return a mapping of cluster name -> cluster id for all non-job clusters.

        NOTE(review): clusters sharing a name collapse to a single entry in this
        dict — confirm that is acceptable for the CLI listing.
        """
        cluster_list = {}
        for cluster in self._interactive_clusters():
            cluster_list[cluster.cluster_name] = cluster.cluster_id
        return cluster_list

    def _get_cluster_id(self, cluster_id: str):
        """Parse a comma-separated id string, or expand "<ALL>" to every non-job cluster id."""
        if cluster_id != "<ALL>":
            return [x.strip() for x in cluster_id.split(",")]
        return [cluster.cluster_id for cluster in self._interactive_clusters()]

    def _get_access_mode(self, access_mode: str):
        """Map a legacy access-mode name onto the UC-compatible data security mode."""
        if access_mode in {"LEGACY_SINGLE_USER", "SINGLE_USER"}:
            return DataSecurityMode.SINGLE_USER
        return DataSecurityMode.USER_ISOLATION

    def map_cluster_to_uc(self, cluster_id: str):
        """Switch the given clusters (comma-separated ids, or "<ALL>") to a UC data security mode.

        Each cluster's current configuration is backed up first. Clusters whose
        data security mode is unset are skipped, as are clusters the API rejects
        with ``InvalidParameterValue``.
        """
        cluster_id_list = self._get_cluster_id(cluster_id)
        spark_version = self._ws.clusters.select_spark_version(latest=True)
        for cluster in cluster_id_list:
            try:
                cluster_details = self._ws.clusters.get(cluster)
                if cluster_details.data_security_mode is None:
                    logger.info(f"Data security Mode is None. Skipping the remapping for the cluster: {cluster}")
                    continue
                access_mode = self._get_access_mode(cluster_details.data_security_mode.name)
                # Snapshot the configuration before editing so the change can be reverted.
                self._installation.save(cluster_details, filename=f'backup/clusters/{cluster_details.cluster_id}.json')
                # clusters.edit replaces the whole cluster spec, so every field we
                # want to preserve has to be passed through explicitly.
                self._ws.clusters.edit(
                    cluster_id=cluster,
                    cluster_name=cluster_details.cluster_name,
                    spark_version=spark_version,
                    num_workers=cluster_details.num_workers,
                    spark_conf=cluster_details.spark_conf,
                    spark_env_vars=cluster_details.spark_env_vars,
                    data_security_mode=access_mode,
                    node_type_id=cluster_details.node_type_id,
                    autoscale=cluster_details.autoscale,
                    policy_id=cluster_details.policy_id,
                    autotermination_minutes=cluster_details.autotermination_minutes,
                    custom_tags=cluster_details.custom_tags,
                    init_scripts=cluster_details.init_scripts,
                    cluster_log_conf=cluster_details.cluster_log_conf,
                    aws_attributes=cluster_details.aws_attributes,
                    ssh_public_keys=cluster_details.ssh_public_keys,
                    enable_elastic_disk=cluster_details.enable_elastic_disk,
                    cluster_source=cluster_details.cluster_source,
                    instance_pool_id=cluster_details.instance_pool_id,
                    enable_local_disk_encryption=cluster_details.enable_local_disk_encryption,
                    driver_instance_pool_id=cluster_details.driver_instance_pool_id,
                )
            except InvalidParameterValue as e:
                logger.warning(f"skipping cluster remapping: {e}")

    def revert_cluster_remap(self, cluster_ids: str, total_cluster_ids: list):
        """Restore cluster configurations from the JSON backups written by :meth:`map_cluster_to_uc`.

        :param cluster_ids: comma-separated cluster ids to revert, or "<ALL>"
        :param total_cluster_ids: ids of every cluster with a backup file, used for "<ALL>"
        """
        if cluster_ids != "<ALL>":
            cluster_list = [x.strip() for x in cluster_ids.split(",")]
        else:
            cluster_list = total_cluster_ids
        logger.info(f"Reverting the configurations for the cluster {cluster_list}")
        for cluster in cluster_list:
            try:
                # Load from the same relative path the backup was saved to; the
                # original used a leading "/" here, which did not match the save path.
                cluster_details = self._installation.load(ClusterDetails, filename=f"backup/clusters/{cluster}.json")
                # NOTE(review): spark_version may legitimately be absent when the
                # cluster is governed by a policy — confirm this guard is wanted.
                if cluster_details.spark_version is None:
                    raise InvalidParameterValue("cluster does not have spark version")
                if cluster_details.cluster_id is None:
                    raise InvalidParameterValue("cluster Id is not present in the config file")
                num_workers = cluster_details.num_workers if cluster_details.num_workers else 0
                self._ws.clusters.edit(
                    cluster_id=cluster_details.cluster_id,
                    cluster_name=cluster_details.cluster_name,
                    spark_version=cluster_details.spark_version,
                    num_workers=num_workers,
                    spark_conf=cluster_details.spark_conf,
                    spark_env_vars=cluster_details.spark_env_vars,
                    data_security_mode=cluster_details.data_security_mode,
                    node_type_id=cluster_details.node_type_id,
                    autoscale=cluster_details.autoscale,
                    policy_id=cluster_details.policy_id,
                    autotermination_minutes=cluster_details.autotermination_minutes,
                    custom_tags=cluster_details.custom_tags,
                    init_scripts=cluster_details.init_scripts,
                    cluster_log_conf=cluster_details.cluster_log_conf,
                    aws_attributes=cluster_details.aws_attributes,
                    ssh_public_keys=cluster_details.ssh_public_keys,
                    enable_elastic_disk=cluster_details.enable_elastic_disk,
                    cluster_source=cluster_details.cluster_source,
                    instance_pool_id=cluster_details.instance_pool_id,
                    enable_local_disk_encryption=cluster_details.enable_local_disk_encryption,
                    driver_instance_pool_id=cluster_details.driver_instance_pool_id,
                )
            except InvalidParameterValue as e:
                logger.warning(f"skipping cluster remapping: {e}")

Check warning on line 117 in src/databricks/labs/ucx/workspace_access/clusters.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/ucx/workspace_access/clusters.py#L117

Added line #L117 was not covered by tests
30 changes: 30 additions & 0 deletions tests/unit/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,20 @@

import pytest
import yaml
from databricks.labs.blueprint.installation import Installation
from databricks.labs.blueprint.tui import MockPrompts
from databricks.sdk import AccountClient, WorkspaceClient
from databricks.sdk.errors import NotFound
from databricks.sdk.service import iam, sql
from databricks.sdk.service.compute import ClusterDetails
from databricks.sdk.service.workspace import ObjectInfo

from databricks.labs.ucx.assessment.aws import AWSResources
from databricks.labs.ucx.aws.access import AWSResourcePermissions
from databricks.labs.ucx.azure.access import AzureResourcePermissions
from databricks.labs.ucx.cli import (
alias,
cluster_remap,
create_account_groups,
create_catalogs_schemas,
create_table_mapping,
Expand All @@ -28,6 +32,7 @@
open_remote_config,
principal_prefix_access,
repair_run,
revert_cluster_remap,
revert_migrated_tables,
skip,
sync_workspace_info,
Expand Down Expand Up @@ -439,3 +444,28 @@ def test_create_catalogs_schemas(ws):
prompts = MockPrompts({'.*': 's3://test'})
create_catalogs_schemas(ws, prompts)
ws.catalogs.list.assert_called_once()


def test_cluster_remap(ws, caplog):
    # Smoke test: cluster_remap runs end-to-end against mocks and logs its banner.
    prompts = MockPrompts({"Please provide the cluster id's as comma separated value from the above list.*": "1"})
    # The `ws` fixture parameter is immediately replaced by a fresh autospec mock
    # so the test fully controls the cluster API responses.
    ws = create_autospec(WorkspaceClient)
    ws.clusters.get.return_value = ClusterDetails(cluster_id="123", cluster_name="test_cluster")
    # NOTE(review): this installation mock is never injected into cluster_remap
    # (which calls Installation.current on the mocked client) — confirm it is needed.
    installation = create_autospec(Installation)
    installation.save.return_value = "a/b/c"
    cluster_remap(ws, prompts)
    assert "Remapping the Clusters to UC" in caplog.messages


def test_revert_cluster_remap(ws, caplog, mocker):
    # A backup file listing exists, so revert proceeds past the empty check; the
    # mocked installation plumbing then fails with TypeError, which is asserted.
    # NOTE(review): asserting TypeError pins mock wiring rather than real
    # behavior — consider asserting on clusters.edit instead.
    prompts = MockPrompts({"Please provide the cluster id's as comma separated value from the above list.*": "1"})
    ws = create_autospec(WorkspaceClient)
    ws.workspace.list.return_value = [ObjectInfo(path='/ucx/backup/clusters/123.json')]
    with pytest.raises(TypeError):
        revert_cluster_remap(ws, prompts)


def test_revert_cluster_remap_empty(ws, caplog):
    # With no backup files present, revert must log the skip message and return
    # without touching the cluster API.
    prompts = MockPrompts({"Please provide the cluster id's as comma separated value from the above list.*": "1"})
    ws = create_autospec(WorkspaceClient)
    revert_cluster_remap(ws, prompts)
    assert "There is no cluster files in the backup folder.Skipping the reverting process" in caplog.messages
41 changes: 41 additions & 0 deletions tests/unit/workspace_access/test_clusters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from unittest.mock import create_autospec

from databricks.labs.blueprint.installation import Installation
from databricks.labs.blueprint.tui import MockPrompts
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import ClusterDetails

from databricks.labs.ucx.workspace_access.clusters import ClusterAccess


nfx marked this conversation as resolved.
Show resolved Hide resolved
def test_map_cluster_to_uc():
    """Happy path: a cluster with a legacy mode is backed up and edited to a UC mode.

    The original test stubbed a cluster with no data_security_mode (exercising
    only the skip path) and made no assertions at all.
    """
    from databricks.sdk.service.compute import DataSecurityMode

    ws = create_autospec(WorkspaceClient)
    ws.clusters.get.return_value = ClusterDetails(
        cluster_id="123", cluster_name="test_cluster", data_security_mode=DataSecurityMode.LEGACY_SINGLE_USER
    )
    prompts = MockPrompts({})
    installation = create_autospec(Installation)
    installation.save.return_value = "a/b/c"
    cluster = ClusterAccess(installation, ws, prompts)
    cluster.map_cluster_to_uc(cluster_id="123")
    # The original config must be backed up, and the cluster edited to SINGLE_USER.
    installation.save.assert_called_once()
    ws.clusters.edit.assert_called_once()
    assert ws.clusters.edit.call_args.kwargs["data_security_mode"] == DataSecurityMode.SINGLE_USER


def test_map_cluster_to_uc_error(caplog):
    """A cluster whose data security mode is unset is skipped, and the skip is logged."""
    installation = create_autospec(Installation)
    installation.save.return_value = "a/b/c"
    workspace_client = create_autospec(WorkspaceClient)
    workspace_client.clusters.get.return_value = ClusterDetails(cluster_id="123", cluster_name="test_cluster")
    cluster_access = ClusterAccess(installation, workspace_client, MockPrompts({}))
    with caplog.at_level('INFO'):
        cluster_access.map_cluster_to_uc("123")
    assert 'Data security Mode is None. Skipping the remapping for the cluster: 123' in caplog.messages


def test_revert_map_cluster_to_uc(caplog):
    """Reverting pushes the backed-up configuration to the cluster API.

    The original test made no assertions; this pins that clusters.edit is
    invoked with the spark_version restored from the backup.
    """
    ws = create_autospec(WorkspaceClient)
    installation = create_autospec(Installation)
    prompts = MockPrompts({})
    installation.load.return_value = ClusterDetails(
        cluster_id="123", cluster_name="test_cluster", spark_version="13.3.x-cpu-ml-scala2.12"
    )
    cluster = ClusterAccess(installation, ws, prompts)
    cluster.revert_cluster_remap(cluster_ids="123", total_cluster_ids=["123"])
    ws.clusters.edit.assert_called_once()
    assert ws.clusters.edit.call_args.kwargs["spark_version"] == "13.3.x-cpu-ml-scala2.12"
Loading