Skip to content

Commit

Permalink
Added capabilities for the installer to specify instance profile (AWS) and spark config for Glue support.
Browse files Browse the repository at this point in the history
  • Loading branch information
FastLee committed Oct 25, 2023
1 parent 404f808 commit 9e53db1
Showing 1 changed file with 38 additions and 12 deletions.
50 changes: 38 additions & 12 deletions src/databricks/labs/ucx/install.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import datetime
import json
import logging
import os
import re
Expand Down Expand Up @@ -97,7 +96,6 @@ def __init__(self, ws: WorkspaceClient, *, prefix: str = "ucx", promtps: bool =
self._this_file = Path(__file__)
self._override_clusters = None
self._dashboards = {}
self._instance_profile = None

def run(self):
logger.info(f"Installing UCX v{self._version}")
Expand Down Expand Up @@ -288,6 +286,27 @@ def warehouse_type(_):
else:
groups_config_args["auto"] = True

instance_profile = None
spark_conf_list = []
# Options for external metastore
if (
self._prompts
and self._question("Do you need to configure an external Hive Metastore (Glue)", default="no") == "yes"
):
logger.info("Setting up an external metastore")
instance_profiles = self._instance_profiles()
instance_profile = ""
if len(instance_profiles) > 1:
instance_profile = self._choice_from_dict("Select Instance Profile from List", instance_profiles)

spark_conf = self._question(
"Please enter a comma-separated list of spark config options.",
default="",
)
if spark_conf != "":
spark_conf_list = [x.strip() for x in spark_conf.split(",")]


# Checking for external HMS
instance_profile = None
spark_conf_dict = {}
Expand Down Expand Up @@ -327,7 +346,7 @@ def warehouse_type(_):
log_level=log_level,
num_threads=num_threads,
instance_profile=instance_profile,
spark_conf=spark_conf_dict,
spark_config=spark_conf_list,
)

self._write_config()
Expand Down Expand Up @@ -594,12 +613,18 @@ def _job_wheel_task(self, jobs_task: jobs.Task, task: Task, dbfs_path: str) -> j
python_wheel_task=jobs.PythonWheelTask(
package_name="databricks_labs_ucx",
entry_point="runtime", # [project.entry-points.databricks] in pyproject.toml
named_parameters={"task": task.name, "config": f"/Workspace{self._config_file}"},
named_parameters={"task": task.name, "config": f"/Workspace{self._config_file}"} | EXTRA_TASK_PARAMS,
),
)

def _job_clusters(self, names: set[str]):
clusters = []
spark_conf = {"spark.databricks.cluster.profile": "singleNode", "spark.master": "local[*]"}
for conf in self._config.spark_conf:
sp_conf = conf.split(" ")
if len(sp_conf) > 1:
continue
spark_conf[sp_conf[0]] = sp_conf[1]
spark_conf = {
"spark.databricks.cluster.profile": "singleNode",
"spark.master": "local[*]",
Expand All @@ -615,7 +640,9 @@ def _job_clusters(self, names: set[str]):
num_workers=0,
)
)

if self._ws.config.is_aws:
aws_attributes = replace(spec.aws_attributes, instance_profile_arn=self._config.instance_profile)
spec = replace(spec, aws_attributes=aws_attributes)
if "main" in names:
clusters.append(
jobs.JobCluster(
Expand Down Expand Up @@ -722,13 +749,7 @@ def _cluster_node_type(self, spec: compute.ClusterSpec) -> compute.ClusterSpec:
return replace(spec, instance_pool_id=cfg.instance_pool_id)
spec = replace(spec, node_type_id=self._ws.clusters.select_node_type(local_disk=True))
if self._ws.config.is_aws:
if cfg.instance_profile is not None:
aws_attributes = compute.AwsAttributes(
availability=compute.AwsAvailability.ON_DEMAND, instance_profile_arn=cfg.instance_profile
)
else:
aws_attributes = compute.AwsAttributes(availability=compute.AwsAvailability.ON_DEMAND)
return replace(spec, aws_attributes=aws_attributes)
return replace(spec, aws_attributes=compute.AwsAttributes(availability=compute.AwsAvailability.ON_DEMAND))
if self._ws.config.is_azure:
return replace(
spec, azure_attributes=compute.AzureAttributes(availability=compute.AzureAvailability.ON_DEMAND_AZURE)
Expand All @@ -752,6 +773,11 @@ def _instance_profiles(self):
profile.instance_profile_arn: profile.instance_profile_arn for profile in self._ws.instance_profiles.list()
}

# Build the mapping of selectable instance profiles offered by the installer prompt
# ("Select Instance Profile from List").
# The result starts with a "No Instance Profile" entry whose value is None, followed by
# one entry per profile returned by self._ws.instance_profiles.list(), keyed and valued
# by that profile's instance_profile_arn.
# NOTE(review): the diff context directly above this hunk already ends a method named
# _instance_profiles with the same comprehension, so this added definition looks like a
# duplicate; confirm and keep only one.
def _instance_profiles(self):
# `|` merges the two dicts; the sentinel entry is the left operand, so it stays the first key.
return {"No Instance Profile": None} | {
profile.instance_profile_arn: profile.instance_profile_arn for profile in self._ws.instance_profiles.list()
}

def _get_ext_hms_confs(self):
for conf in self._ws.cluster_policies.list():
def_json = json.loads(conf.definition)
Expand Down

0 comments on commit 9e53db1

Please sign in to comment.