Use service factory to resolve object dependencies (#1209)

This PR reduces code duplication for object construction.

| decorator | autocompletion in PyCharm | singleton behavior | overrides easy | pylint/mypy compatible |
| --- | --- | --- | --- | --- |
| `@functools.cached_property` | yes | yes | yes | yes |
| `@property` | yes | no | no | yes |
| `@functools.cache` + `@property` | ? | yes | no | yes |
| `@singleton` | no | yes | yes | an extension may be necessary |
| `@singleton` extending `@property` | _surprisingly,_ no | yes | yes | an extension may be necessary |
| `@singleton` + `@property` | no | yes | ? | ? |
| `@property` + `@singleton` | no | yes | ? | ? |
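
To make the first two rows concrete, here is a minimal sketch (the `Context` and `WorkspaceClientStub` classes are hypothetical, not code from this PR) showing why `@functools.cached_property` gives singleton behavior and easy overrides, while a plain `@property` constructs a new object on every access:

```python
from functools import cached_property


class WorkspaceClientStub:
    """Hypothetical stand-in for a real client, used only for illustration."""


class Context:
    @property
    def via_property(self) -> WorkspaceClientStub:
        # a new object is constructed on every attribute access
        return WorkspaceClientStub()

    @cached_property
    def via_cached_property(self) -> WorkspaceClientStub:
        # constructed once per Context instance, then stored in the instance __dict__
        return WorkspaceClientStub()


ctx = Context()
assert ctx.via_property is not ctx.via_property            # no singleton behavior
assert ctx.via_cached_property is ctx.via_cached_property  # singleton per instance

# overriding is easy: plain attribute assignment bypasses the factory,
# because cached_property is a non-data descriptor
ctx.via_cached_property = WorkspaceClientStub()
```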


Rationale:
- Factories are only to be used to create single-instance objects.
- Preferably, objects are created only once, and only when necessary.
- More Spring Boot-style flavors are possible, but they may prevent code navigation.
- The `__get__` protocol allows for storage in a target instance property, making it possible to run unit tests in parallel (see the sketch below).
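
A rough sketch of the resulting pattern, with hypothetical names rather than the exact classes introduced by this PR: every dependency is a `@cached_property` factory on a context object, downstream factories resolve their dependencies by referencing other properties, and a test can override a dependency by assigning to the attribute on its own context instance before first use:

```python
from functools import cached_property


class SqlBackendStub:
    """Hypothetical stand-in for a real SqlBackend."""

    def execute(self, sql: str) -> None:
        print(f"executing: {sql}")


class TablesCrawlerStub:
    """Hypothetical stand-in for a crawler that needs a backend and a schema."""

    def __init__(self, backend: SqlBackendStub, schema: str):
        self.backend = backend
        self.schema = schema


class RuntimeContextSketch:
    """Service factory: every dependency is a lazily created, per-instance singleton."""

    @cached_property
    def sql_backend(self) -> SqlBackendStub:
        return SqlBackendStub()

    @cached_property
    def inventory_database(self) -> str:
        return "ucx"

    @cached_property
    def tables_crawler(self) -> TablesCrawlerStub:
        # downstream factories resolve their dependencies by referencing other properties
        return TablesCrawlerStub(self.sql_backend, self.inventory_database)


# The cache lives in each instance's __dict__ (via the descriptor __get__ protocol),
# so parallel tests can each build their own context and override dependencies
# without interfering with one another:
ctx = RuntimeContextSketch()
ctx.sql_backend = SqlBackendStub()  # test override, applied before first use
crawler = ctx.tables_crawler        # constructed once, using the overridden backend
assert crawler is ctx.tables_crawler
assert crawler.backend is ctx.sql_backend
```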
nfx authored Apr 8, 2024
1 parent e481be9 commit 55baf86
Showing 50 changed files with 1,931 additions and 2,344 deletions.
1 change: 1 addition & 0 deletions src/databricks/labs/ucx/account.py
@@ -21,6 +21,7 @@ class AccountWorkspaces:
SYNC_FILE_NAME: ClassVar[str] = "workspaces.json"

def __init__(self, account_client: AccountClient, new_workspace_client=WorkspaceClient):
# TODO: new_workspace_client is a design flaw, remove it
self._new_workspace_client = new_workspace_client
self._ac = account_client

2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/assessment/clusters.py
@@ -138,7 +138,7 @@ def _check_cluster_failures(self, cluster: ClusterDetails, source: str) -> list[


class ClustersCrawler(CrawlerBase[ClusterInfo], CheckClusterMixin):
def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema: str):
super().__init__(sbe, "hive_metastore", schema, "clusters", ClusterInfo)
self._ws = ws

218 changes: 218 additions & 0 deletions src/databricks/labs/ucx/assessment/workflows.py
@@ -0,0 +1,218 @@
from databricks.labs.ucx.contexts.workflow_task import RuntimeContext
from databricks.labs.ucx.framework.tasks import Workflow, job_task


class Assessment(Workflow):
def __init__(self):
super().__init__('assessment')

@job_task(notebook="hive_metastore/tables.scala")
def crawl_tables(self, ctx: RuntimeContext):
"""Iterates over all tables in the Hive Metastore of the current workspace and persists their metadata, such
as _database name_, _table name_, _table type_, _table location_, etc., in the Delta table named
`$inventory_database.tables`. Note that the `inventory_database` is set in the configuration file. The metadata
stored is then used in the subsequent tasks and workflows to, for example, find all Hive Metastore tables that
cannot easily be migrated to Unity Catalog."""

@job_task(job_cluster="tacl")
def setup_tacl(self, ctx: RuntimeContext):
"""(Optimization) Starts `tacl` job cluster in parallel to crawling tables."""

@job_task(depends_on=[crawl_tables, setup_tacl], job_cluster="tacl")
def crawl_grants(self, ctx: RuntimeContext):
"""Scans the previously created Delta table named `$inventory_database.tables` and issues a `SHOW GRANTS`
statement for every object to retrieve the permissions it has assigned to it. The permissions include information
such as the _principal_, _action type_, and the _table_ it applies to. This is persisted in the Delta table
`$inventory_database.grants`. Other, migration related jobs use this inventory table to convert the legacy Table
ACLs to Unity Catalog permissions.
Note: This job runs on a separate cluster (named `tacl`) as it requires the proper configuration to have the Table
ACLs enabled and available for retrieval."""
ctx.grants_crawler.snapshot()

@job_task(depends_on=[crawl_tables])
def estimate_table_size_for_migration(self, ctx: RuntimeContext):
"""Scans the previously created Delta table named `$inventory_database.tables` and locate tables that cannot be
"synced". These tables will have to be cloned in the migration process.
Assesses the size of these tables and create `$inventory_database.table_size` table to list these sizes.
The table size is a factor in deciding whether to clone these tables."""
ctx.table_size_crawler.snapshot()

@job_task
def crawl_mounts(self, ctx: RuntimeContext):
"""Defines the scope of the _mount points_ intended for migration into Unity Catalog. As these objects are not
compatible with the Unity Catalog paradigm, a key component of the migration process involves transferring them
to Unity Catalog External Locations.
The assessment involves scanning the workspace to compile a list of all existing mount points and subsequently
storing this information in the `$inventory.mounts` table. This is crucial for planning the migration."""
ctx.mounts_crawler.snapshot()

@job_task(depends_on=[crawl_mounts, crawl_tables])
def guess_external_locations(self, ctx: RuntimeContext):
"""Determines the shared path prefixes of all the tables. Specifically, the focus is on identifying locations that
utilize mount points. The goal is to identify the _external locations_ necessary for a successful migration and
store this information in the `$inventory.external_locations` table.
The approach taken in this assessment involves the following steps:
- Extracting all the locations associated with tables that do not use DBFS directly, but a mount point instead
- Scanning all these locations to identify folders that can act as shared path prefixes
- These identified external locations will be created subsequently prior to the actual table migration"""
ctx.external_locations.snapshot()

@job_task
def assess_jobs(self, ctx: RuntimeContext):
"""Scans through all the jobs and identifies those that are not compatible with UC. The list of all the jobs is
stored in the `$inventory.jobs` table.
It looks for:
- Clusters with Databricks Runtime (DBR) version earlier than 11.3
- Clusters using Passthrough Authentication
- Clusters with incompatible Spark config tags
- Clusters referencing DBFS locations in one or more config options
"""
ctx.jobs_crawler.snapshot()

@job_task
def assess_clusters(self, ctx: RuntimeContext):
"""Scan through all the clusters and identifies those that are not compatible with UC. The list of all the clusters
is stored in the`$inventory.clusters` table.
It looks for:
- Clusters with Databricks Runtime (DBR) version earlier than 11.3
- Clusters using Passthrough Authentication
- Clusters with incompatible spark config tags
- Clusters referencing DBFS locations in one or more config options
"""
ctx.clusters_crawler.snapshot()

@job_task
def assess_pipelines(self, ctx: RuntimeContext):
"""This module scans through all the Pipelines and identifies those pipelines which has Azure Service Principals
embedded (who has been given access to the Azure storage accounts via spark configurations) in the pipeline
configurations.
It looks for:
- all the pipelines which has Azure Service Principal embedded in the pipeline configuration
Subsequently, a list of all the pipelines with matching configurations are stored in the
`$inventory.pipelines` table."""
ctx.pipelines_crawler.snapshot()

@job_task
def assess_incompatible_submit_runs(self, ctx: RuntimeContext):
"""This module scans through all the Submit Runs and identifies those runs which may become incompatible after
the workspace attachment.
It looks for:
- All submit runs with DBR >=11.3 and data_security_mode:None
It also combines several submit runs under a single pseudo_id based on hash of the submit run configuration.
Subsequently, a list of all the incompatible runs with failures are stored in the
`$inventory.submit_runs` table."""
ctx.submit_runs_crawler.snapshot()

@job_task
def crawl_cluster_policies(self, ctx: RuntimeContext):
"""This module scans through all the Cluster Policies and get the necessary information
It looks for:
- Clusters Policies with Databricks Runtime (DBR) version earlier than 11.3
Subsequently, a list of all the policies with matching configurations are stored in the
`$inventory.policies` table."""
ctx.policies_crawler.snapshot()

@job_task(cloud="azure")
def assess_azure_service_principals(self, ctx: RuntimeContext):
"""This module scans through all the clusters configurations, cluster policies, job cluster configurations,
Pipeline configurations, Warehouse configuration and identifies all the Azure Service Principals who has been
given access to the Azure storage accounts via spark configurations referred in those entities.
It looks in:
- all those entities and prepares a list of Azure Service Principal embedded in their configurations
Subsequently, the list of all the Azure Service Principals referred in those configurations are saved
in the `$inventory.azure_service_principals` table."""
if ctx.is_azure:
ctx.azure_service_principal_crawler.snapshot()

@job_task
def assess_global_init_scripts(self, ctx: RuntimeContext):
"""This module scans through all the global init scripts and identifies if there is an Azure Service Principal
who has been given access to the Azure storage accounts via spark configurations referred in those scripts.
It looks in:
- the list of all the global init scripts are saved in the `$inventory.azure_service_principals` table."""
ctx.global_init_scripts_crawler.snapshot()

@job_task
def workspace_listing(self, ctx: RuntimeContext):
"""Scans the workspace for workspace objects. It recursively list all sub directories
and compiles a list of directories, notebooks, files, repos and libraries in the workspace.
It uses multi-threading to parallelize the listing process to speed up execution on big workspaces.
It accepts starting path as the parameter defaulted to the root path '/'."""
ctx.workspace_listing.snapshot()

@job_task(depends_on=[crawl_grants, workspace_listing])
def crawl_permissions(self, ctx: RuntimeContext):
"""Scans the workspace-local groups and all their permissions. The list is stored in the `$inventory.permissions`
Delta table.
This is the first step for the _group migration_ process, which is continued in the `migrate-groups` workflow.
This step includes preparing Legacy Table ACLs for local group migration."""
permission_manager = ctx.permission_manager
permission_manager.cleanup()
permission_manager.inventorize_permissions()

@job_task
def crawl_groups(self, ctx: RuntimeContext):
"""Scans all groups for the local group migration scope"""
ctx.group_manager.snapshot()

@job_task(
depends_on=[
crawl_grants,
crawl_groups,
crawl_permissions,
guess_external_locations,
assess_jobs,
assess_incompatible_submit_runs,
assess_clusters,
crawl_cluster_policies,
assess_azure_service_principals,
assess_pipelines,
assess_global_init_scripts,
crawl_tables,
],
dashboard="assessment_main",
)
def assessment_report(self, ctx: RuntimeContext):
"""Refreshes the assessment dashboard after all previous tasks have been completed. Note that you can access the
dashboard _before_ all tasks have been completed, but then only already completed information is shown."""

@job_task(
depends_on=[
assess_jobs,
assess_incompatible_submit_runs,
assess_clusters,
assess_pipelines,
crawl_tables,
],
dashboard="assessment_estimates",
)
def estimates_report(self, ctx: RuntimeContext):
"""Refreshes the assessment dashboard after all previous tasks have been completed. Note that you can access the
dashboard _before_ all tasks have been completed, but then only already completed information is shown."""


class DestroySchema(Workflow):
def __init__(self):
super().__init__('099-destroy-schema')

@job_task
def destroy_schema(self, ctx: RuntimeContext):
"""This _clean-up_ workflow allows to removes the `$inventory` database, with all the inventory tables created by
the previous workflow runs. Use this to reset the entire state and start with the assessment step again."""
ctx.sql_backend.execute(f"DROP DATABASE {ctx.inventory_database} CASCADE")
18 changes: 0 additions & 18 deletions src/databricks/labs/ucx/aws/access.py
@@ -49,24 +49,6 @@ def __init__(
self._kms_key = kms_key
self._filename = self.INSTANCE_PROFILES_FILE_NAMES

@classmethod
def for_cli(cls, ws: WorkspaceClient, installation, backend, aws, schema, kms_key=None):
config = installation.load(WorkspaceConfig)
caller_identity = aws.validate_connection()
locations = ExternalLocations(ws, backend, config.inventory_database)
if not caller_identity:
raise ResourceWarning("AWS CLI is not configured properly.")
return cls(
installation,
ws,
backend,
aws,
locations,
schema,
caller_identity.get("Account"),
kms_key,
)

def create_uc_roles_cli(self, *, single_role=True, role_name="UC_ROLE", policy_name="UC_POLICY"):
# Get the missing paths
# Identify the S3 prefixes
28 changes: 1 addition & 27 deletions src/databricks/labs/ucx/aws/credentials.py
@@ -3,7 +3,6 @@

from databricks.labs.blueprint.installation import Installation
from databricks.labs.blueprint.tui import Prompts
from databricks.labs.lsql.backends import StatementExecutionBackend
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors.platform import InvalidParameterValue
from databricks.sdk.service.catalog import (
@@ -13,9 +12,8 @@
ValidationResultResult,
)

from databricks.labs.ucx.assessment.aws import AWSResources, AWSRoleAction
from databricks.labs.ucx.assessment.aws import AWSRoleAction
from databricks.labs.ucx.aws.access import AWSResourcePermissions
from databricks.labs.ucx.config import WorkspaceConfig

logger = logging.getLogger(__name__)

@@ -125,30 +123,6 @@ def __init__(
self._resource_permissions = resource_permissions
self._storage_credential_manager = storage_credential_manager

@classmethod
def for_cli(cls, ws: WorkspaceClient, installation: Installation, aws: AWSResources, prompts: Prompts):
if not ws.config.is_aws:
logger.error("Workspace is not on AWS, please run this command on a Databricks on AWS workspaces.")
raise SystemExit()

msg = (
f"Have you reviewed the {AWSResourcePermissions.UC_ROLES_FILE_NAMES} "
"and confirm listed IAM roles to be migrated?"
)
if not prompts.confirm(msg):
raise SystemExit()

config = installation.load(WorkspaceConfig)
sql_backend = StatementExecutionBackend(ws, config.warehouse_id)

resource_permissions = AWSResourcePermissions.for_cli(
ws, installation, sql_backend, aws, config.inventory_database
)

storage_credential_manager = CredentialManager(ws)

return cls(installation, ws, resource_permissions, storage_credential_manager)

@staticmethod
def _print_action_plan(iam_list: list[AWSRoleAction]):
# print action plan to console for customer to review.
23 changes: 4 additions & 19 deletions src/databricks/labs/ucx/azure/access.py
@@ -4,14 +4,12 @@

from databricks.labs.blueprint.installation import Installation
from databricks.labs.blueprint.tui import Prompts
from databricks.labs.lsql.backends import StatementExecutionBackend
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import NotFound, ResourceAlreadyExists
from databricks.sdk.service.catalog import Privilege

from databricks.labs.ucx.assessment.crawlers import logger
from databricks.labs.ucx.azure.resources import (
AzureAPIClient,
AzureResource,
AzureResources,
PrincipalSecret,
@@ -32,14 +30,15 @@ class StoragePermissionMapping:


class AzureResourcePermissions:
FILENAME = 'azure_storage_account_info.csv'

def __init__(
self,
installation: Installation,
ws: WorkspaceClient,
azurerm: AzureResources,
external_locations: ExternalLocations,
):
self._filename = 'azure_storage_account_info.csv'
self._installation = installation
self._locations = external_locations
self._azurerm = azurerm
@@ -50,20 +49,6 @@ def __init__(
"Storage Blob Data Reader": Privilege.READ_FILES,
}

@classmethod
def for_cli(cls, ws: WorkspaceClient, product='ucx', include_subscriptions=None):
installation = Installation.current(ws, product)
config = installation.load(WorkspaceConfig)
sql_backend = StatementExecutionBackend(ws, config.warehouse_id)
azure_mgmt_client = AzureAPIClient(
ws.config.arm_environment.resource_manager_endpoint,
ws.config.arm_environment.service_management_endpoint,
)
graph_client = AzureAPIClient("https://graph.microsoft.com", "https://graph.microsoft.com")
azurerm = AzureResources(azure_mgmt_client, graph_client, include_subscriptions)
locations = ExternalLocations(ws, sql_backend, config.inventory_database)
return cls(installation, ws, azurerm, locations)

def _map_storage(self, storage: AzureResource) -> list[StoragePermissionMapping]:
logger.info(f"Fetching role assignment for {storage.storage_account}")
out = []
@@ -103,7 +88,7 @@ def save_spn_permissions(self) -> str | None:
if len(storage_account_infos) == 0:
logger.error("No storage account found in current tenant with spn permission")
return None
return self._installation.save(storage_account_infos, filename=self._filename)
return self._installation.save(storage_account_infos, filename=self.FILENAME)

def _update_cluster_policy_definition(
self,
@@ -221,7 +206,7 @@ def _create_scope(self, uber_principal: PrincipalSecret, inventory_database: str
self._ws.secrets.put_secret(inventory_database, "uber_principal_secret", string_value=uber_principal.secret)

def load(self):
return self._installation.load(list[StoragePermissionMapping], filename=self._filename)
return self._installation.load(list[StoragePermissionMapping], filename=self.FILENAME)

def _get_storage_accounts(self) -> list[str]:
external_locations = self._locations.snapshot()
