Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Support for Migrating Table ACL of Interactive clusters using SPN #1077

Merged
merged 32 commits into from
Mar 31, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
11f85bc
initial commit
HariGS-DB Mar 20, 2024
a2c41ea
merge
HariGS-DB Mar 23, 2024
4f4dccd
moving code to Grants and generating Grant objects
HariGS-DB Mar 24, 2024
3f7a461
moving code to Grants and generating Grant objects
HariGS-DB Mar 25, 2024
30847b8
Merge branch 'main' into feature/907
HariGS-DB Mar 26, 2024
d60d266
azure spn test case
HariGS-DB Mar 27, 2024
b8d3095
Merge branch 'main' into feature/907
HariGS-DB Mar 27, 2024
7c73578
added ACLPrincipal test cases
HariGS-DB Mar 28, 2024
0b76645
merging
HariGS-DB Mar 28, 2024
72671a9
adding changes to TableMigrate and unit test cases
HariGS-DB Mar 28, 2024
4b6bd95
fmting
HariGS-DB Mar 28, 2024
54ff3cb
integration test
HariGS-DB Mar 29, 2024
3798cdd
merges
HariGS-DB Mar 29, 2024
f8e5725
formatting
HariGS-DB Mar 29, 2024
1bbe118
handling scenarios for mounts
HariGS-DB Mar 29, 2024
390d646
Merge branch 'main' into feature/907
HariGS-DB Mar 29, 2024
a65ea07
review comments
HariGS-DB Mar 30, 2024
4e71d6a
merge
HariGS-DB Mar 30, 2024
fb281e7
passing sql_backend in the cli
HariGS-DB Mar 30, 2024
69cd19a
calling init directly in runtime
HariGS-DB Mar 30, 2024
bd561e6
fmting
HariGS-DB Mar 30, 2024
4f6c7f4
splitting big method into two to reduce pylint warning
HariGS-DB Mar 30, 2024
752797d
review comments
HariGS-DB Mar 30, 2024
be7e757
naming standards
HariGS-DB Mar 30, 2024
4abe7af
fixing storage account extraction in AzureACL
HariGS-DB Mar 30, 2024
cf65f47
timeout change to 3 mins
HariGS-DB Mar 30, 2024
b23d2d6
fixes to int test failures
HariGS-DB Mar 31, 2024
4d2ba61
removing circular references for Grants crawler and fixing test_insta…
HariGS-DB Mar 31, 2024
f2af302
removing circular references for Grants crawler and fixing test_insta…
HariGS-DB Mar 31, 2024
543f07c
Merge branch 'main' into feature/907
HariGS-DB Mar 31, 2024
50bb6b2
removing connection parameter in config.yml
HariGS-DB Mar 31, 2024
a19aaeb
Update __init__.py
nfx Mar 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions labs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,8 @@ commands:
- name: create-catalogs-schemas
description: Create UC external catalogs and schemas based on the destinations created from create_table_mapping command.
This command is supposed to be run before migrating tables to UC.

- name: migrate-table-acl
description: Migrates legacy ACL on pre-uc clusters to the new UC ACL model. Interactive clusters reading/writing to external storage
use service principal / instance profiles to access the underlying data. Users get access to the data by having permission to the cluster.
This command converts that access into UC ACLs for the identified tables, users and groups.
24 changes: 24 additions & 0 deletions src/databricks/labs/ucx/assessment/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ class AzureServicePrincipalInfo:
storage_account: str | None = None


@dataclass
class AzureServicePrincipalClusterMapping:
    """Cluster-to-service-principal mapping.

    Declared as its own class because a mapping from a cluster to the
    service principals it uses is needed.
    """

    # Cluster id where the spn is used
    cluster_id: str
    # spn info data class
    spn_info: set[AzureServicePrincipalInfo]


class AzureServicePrincipalCrawler(CrawlerBase[AzureServicePrincipalInfo], JobsMixin, SecretsMixin):
def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
super().__init__(sbe, "hive_metastore", schema, "azure_service_principals", AzureServicePrincipalInfo)
Expand Down Expand Up @@ -171,3 +180,18 @@ def _get_azure_spn_from_config(self, config: dict) -> set[AzureServicePrincipalI
)
)
return set_service_principals

def get_cluster_to_storage_mapping(self):
    """Map each legacy interactive cluster to the service principals in its configuration.

    Job clusters are skipped. Of the remaining clusters only those whose
    data security mode is ``LEGACY_SINGLE_USER`` or ``NONE`` are kept; a
    cluster with no data security mode set is skipped as well.

    Returns:
        A list of AzureServicePrincipalClusterMapping, one per matching
        cluster, holding the cluster id and whatever
        ``_get_azure_spn_from_cluster_config`` returns for that cluster.
    """
    # Local import: the module-level import block is not visible from this method.
    from databricks.sdk.service.compute import DataSecurityMode

    legacy_modes = (DataSecurityMode.LEGACY_SINGLE_USER, DataSecurityMode.NONE)
    spn_cluster_mapping = []
    for cluster in self._ws.clusters.list():
        if cluster.cluster_source == ClusterSource.JOB:
            continue
        # Compare the cluster's mode with the enum members. Reading a member
        # off the value (``mode.NONE``) is always truthy, so it filtered
        # nothing and failed on clusters whose mode is None.
        if cluster.data_security_mode not in legacy_modes:
            continue
        service_principals = self._get_azure_spn_from_cluster_config(cluster)
        spn_cluster_mapping.append(AzureServicePrincipalClusterMapping(cluster.cluster_id, service_principals))
    return spn_cluster_mapping
19 changes: 19 additions & 0 deletions src/databricks/labs/ucx/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from databricks.labs.ucx.hive_metastore import ExternalLocations, TablesCrawler
from databricks.labs.ucx.hive_metastore.catalog_schema import CatalogSchema
from databricks.labs.ucx.hive_metastore.mapping import TableMapping
from databricks.labs.ucx.hive_metastore.table_acl_migrate import TableACLMigrate
from databricks.labs.ucx.hive_metastore.table_migrate import TablesMigrate
from databricks.labs.ucx.hive_metastore.table_move import TableMove
from databricks.labs.ucx.install import WorkspaceInstallation
Expand Down Expand Up @@ -504,5 +505,23 @@ def create_catalogs_schemas(w: WorkspaceClient, prompts: Prompts):
catalog_schema.create_catalog_schema()


@ucx.command
def migrate_table_acl(w: WorkspaceClient, prompts: Prompts):
    """This command migrates legacy ACL on pre-uc clusters to the new UC ACL model. Interactive clusters reading/writing
    to external storage use service principals / instance profiles to access the underlying data. Users get access to
    the data by having permission to the cluster. This command converts that access into UC ACLs for the identified
    tables, users and groups.

    Only Azure workspaces are processed; on AWS and GCP an error is logged and nothing is migrated.
    """
    installation = Installation.current(w, 'ucx')
    if w.config.is_azure:
        logger.info("Running migrate_table_acl for Azure")
        table_acl_migrate = TableACLMigrate.for_cli(w, installation, prompts)
        # for_cli returns None when the user declines the confirmation prompt;
        # stop here instead of failing with AttributeError on None.
        if table_acl_migrate is None:
            return
        table_acl_migrate.migrate_cluster_acl()
    if w.config.is_aws:
        # Nothing is executed for AWS, so report it the same way as GCP.
        logger.error("migrate_table_acl is not yet supported in AWS")
    if w.config.is_gcp:
        logger.error("migrate_table_acl is not yet supported in GCP")


if __name__ == "__main__":
ucx()
174 changes: 174 additions & 0 deletions src/databricks/labs/ucx/hive_metastore/table_acl_migrate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import logging
from dataclasses import dataclass

from databricks.labs.blueprint.installation import Installation
from databricks.labs.blueprint.tui import Prompts
from databricks.labs.lsql.backends import SqlBackend, StatementExecutionBackend
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import ExternalLocationInfo

from databricks.labs.ucx.assessment.azure import (
AzureServicePrincipalClusterMapping,
AzureServicePrincipalCrawler,
AzureServicePrincipalInfo,
)
from databricks.labs.ucx.azure.access import (
AzureResourcePermissions,
StoragePermissionMapping,
)
from databricks.labs.ucx.azure.resources import AzureAPIClient, AzureResources
from databricks.labs.ucx.config import WorkspaceConfig
from databricks.labs.ucx.hive_metastore import ExternalLocations
from databricks.labs.ucx.hive_metastore.mapping import TableMapping

logger = logging.getLogger(__name__)


@dataclass
class IAMPrincipal:
    """A principal referenced by an access control entry.

    Attributes:
        principal_type: user or group or spn
        principal_id: id of the principal
    """

    principal_type: str
    principal_id: str


@dataclass
class ClusterPrincipalMapping:
    """Principals grouped by the cluster they can access.

    Attributes:
        cluster_id: cluster id
        principals: list of all principals that have access to this cluster id
    """

    cluster_id: str
    principals: list[IAMPrincipal]


class TableACLMigrate:
    """Resolve which UC tables are reachable through the service principals of interactive clusters.

    The class walks from cluster -> service principal -> storage permission
    mapping -> external location -> table. At present the matching tables
    are only identified and logged; no grants are issued yet.
    """

    def __init__(
        self,
        ws: WorkspaceClient,
        backend: SqlBackend,
        installation: Installation,
        spn_crawler: AzureServicePrincipalCrawler,
        resource_permission: AzureResourcePermissions,
        table_mapping: TableMapping,
    ):
        self._backend = backend
        self._ws = ws
        self._spn_crawler = spn_crawler
        self._installation = installation
        self._resource_permission = resource_permission
        self._table_mapping = table_mapping

    @classmethod
    def for_cli(cls, ws: WorkspaceClient, installation: Installation, prompts: Prompts) -> "TableACLMigrate | None":
        """Build an instance for the CLI after asking the user to confirm.

        Returns:
            A configured instance, or None when the user declines the prompt.
            Callers must handle the None case.
        """
        msg = (
            "This cmd will migrate acl for all interactive clusters to the related storage account tables, "
            "Please confirm "
        )
        if not prompts.confirm(msg):
            return None

        config = installation.load(WorkspaceConfig)
        sql_backend = StatementExecutionBackend(ws, config.warehouse_id)
        azure_client = AzureAPIClient(
            ws.config.arm_environment.resource_manager_endpoint,
            ws.config.arm_environment.service_management_endpoint,
        )
        graph_client = AzureAPIClient("https://graph.microsoft.com", "https://graph.microsoft.com")
        azurerm = AzureResources(azure_client, graph_client)
        locations = ExternalLocations(ws, sql_backend, config.inventory_database)

        resource_permissions = AzureResourcePermissions(installation, ws, azurerm, locations)
        spn_crawler = AzureServicePrincipalCrawler(ws, sql_backend, config.inventory_database)
        table_mapping = TableMapping(installation, ws, sql_backend)

        return cls(ws, sql_backend, installation, spn_crawler, resource_permissions, table_mapping)

    def migrate_cluster_acl(self):
        """Identify the tables reachable from every interactive cluster that uses service principals."""
        spn_cluster_mapping = self._spn_crawler.get_cluster_to_storage_mapping()
        # Check for "nothing to do" before loading the permission mappings.
        if not spn_cluster_mapping:
            logger.info("There are no interactive clusters configured with service principals")
            return
        external_locations = self._get_principal_prefix(spn_cluster_mapping)
        for cluster_spn in spn_cluster_mapping:
            if not cluster_spn.spn_info:
                # No service principal on this cluster, so there is nothing to apply.
                continue
            # TODO: resolve the principals that can use this cluster
            # (see _get_cluster_principal_mapping) and grant to them.
            logger.info(f"Applying UC ACL for cluster id {cluster_spn.cluster_id}")
            # The arguments do not depend on the individual SPN, so one call
            # per cluster is enough.
            self._apply_uc_permission(external_locations)

    def _get_principal_prefix(
        self, spn_cluster_mapping: list[AzureServicePrincipalClusterMapping]
    ) -> list[StoragePermissionMapping]:
        """Return the loaded permission mappings that belong to any service principal in the mapping.

        A permission mapping matches when its ``client_id`` equals the
        principal's ``application_id`` and the principal's storage account
        name occurs in the mapping's ``prefix``. Each mapping is returned at
        most once, in the order the permissions were loaded.
        """
        service_principals = [spn for spn_mapping in spn_cluster_mapping for spn in spn_mapping.spn_info]
        return [
            perm_mapping
            for perm_mapping in self._resource_permission.load()
            if any(
                # storage_account is optional; `None in str` would raise TypeError.
                spn.storage_account is not None
                and perm_mapping.client_id == spn.application_id
                and spn.storage_account in perm_mapping.prefix
                for spn in service_principals
            )
        ]

    def _get_tables(self, locations: list[ExternalLocationInfo]):
        """Return the table info of every mapped table stored under one of the given locations.

        Each table from the table mapping is looked up by its destination
        name; it is included once when its storage location starts with the
        url of any location. Locations without a url are ignored.
        """
        urls = tuple(location.url for location in locations if location.url is not None)
        matching_tables = []
        if not urls:
            # Nothing can match, so skip the per-table lookups.
            return matching_tables
        for table in self._table_mapping.load():
            table_name = f"{table.catalog_name}.{table.dst_schema}.{table.dst_table}"
            table_info = self._ws.tables.get(table_name)
            storage_location = table_info.storage_location
            if storage_location is not None and storage_location.startswith(urls):
                # Collect the table itself, not the location it matched.
                matching_tables.append(table_info)
        return matching_tables

    def _get_external_location(self, locations: list[StoragePermissionMapping]) -> list[ExternalLocationInfo]:
        """Return the workspace external locations whose url starts with any of the given prefixes.

        Every external location appears at most once even when several
        prefixes match it. Locations without a url are ignored.
        """
        prefixes = tuple(principal_prefix.prefix for principal_prefix in locations)
        if not prefixes:
            return []
        return [
            location
            for location in self._ws.external_locations.list()
            if location.url is not None and location.url.startswith(prefixes)
        ]

    def _apply_uc_permission(
        self,
        locations: list[StoragePermissionMapping],
    ):
        """Resolve the tables behind the given permission mappings and report them.

        No grant is issued here yet; the matching tables are only logged.
        """
        matching_external_locations = self._get_external_location(locations)
        matching_tables = self._get_tables(matching_external_locations)
        logger.info(f"Tables matching the cluster storage locations: {matching_tables}")

    def _get_cluster_principal_mapping(self, cluster_id: str) -> list[IAMPrincipal]:
        """Return the users, groups and service principals named in the cluster's access control list.

        Returns an empty list when the cluster has no access control list.
        """
        cluster_permission = self._ws.permissions.get("clusters", cluster_id)
        if cluster_permission.access_control_list is None:
            return []
        principal_list = []
        for acl in cluster_permission.access_control_list:
            if acl.user_name is not None:
                principal_list.append(IAMPrincipal("user", acl.user_name))
            if acl.group_name is not None:
                principal_list.append(IAMPrincipal("group", acl.group_name))
            if acl.service_principal_name is not None:
                principal_list.append(IAMPrincipal("spn", acl.service_principal_name))
        return principal_list

    def _get_spn_permission(self, spn_cluster_mapping: list[AzureServicePrincipalClusterMapping]):
        """Return one ClusterPrincipalMapping per cluster in the mapping.

        A cluster without an access control list gets an empty principal
        list and no longer discards the clusters already collected.
        """
        return [
            ClusterPrincipalMapping(cluster.cluster_id, self._get_cluster_principal_mapping(cluster.cluster_id))
            for cluster in spn_cluster_mapping
        ]