Skip to content

Commit 9e6d6f9

Browse files
qziyuan and moore247
authored and committed
Add migrate-tables workflow (#1051)
1 parent e5c170c commit 9e6d6f9

File tree

8 files changed

+467
-57
lines changed

8 files changed

+467
-57
lines changed

src/databricks/labs/ucx/config.py

+2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ class WorkspaceConfig: # pylint: disable=too-many-instance-attributes
3232
workspace_start_path: str = "/"
3333
instance_profile: str | None = None
3434
spark_conf: dict[str, str] | None = None
35+
min_workers: int | None = 1
36+
max_workers: int | None = 10
3537

3638
override_clusters: dict[str, str] | None = None
3739
policy_id: str | None = None

src/databricks/labs/ucx/framework/tasks.py

+29-14
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class Task:
2828
workflow: str
2929
name: str
3030
doc: str
31-
fn: Callable[[WorkspaceConfig, WorkspaceClient, SqlBackend], None]
31+
fn: Callable[[WorkspaceConfig, WorkspaceClient, SqlBackend, Installation], None]
3232
depends_on: list[str] | None = None
3333
job_cluster: str = "main"
3434
notebook: str | None = None
@@ -214,33 +214,36 @@ def _create_lock(lockfile_name):
214214
return f
215215

216216

217-
def trigger(*argv):
217+
def parse_args(*argv) -> dict[str, str]:
218218
args = dict(a[2:].split("=") for a in argv if a[0:2] == "--")
219219
if "config" not in args:
220220
msg = "no --config specified"
221221
raise KeyError(msg)
222+
return args
223+
222224

225+
def run_task(
226+
args: dict[str, str],
227+
install_dir: Path,
228+
cfg: WorkspaceConfig,
229+
workspace_client: WorkspaceClient,
230+
sql_backend: RuntimeBackend,
231+
installation: Installation,
232+
):
223233
task_name = args.get("task", "not specified")
224-
# `{{parent_run_id}}` is the run of entire workflow, whereas `{{run_id}}` is the run of a task
225-
workflow_run_id = args.get("parent_run_id", "unknown_run_id")
226-
job_id = args.get("job_id")
227234
if task_name not in _TASKS:
228235
msg = f'task "{task_name}" not found. Valid tasks are: {", ".join(_TASKS.keys())}'
229236
raise KeyError(msg)
230-
231237
print(f"UCX v{__version__}")
232-
233238
current_task = _TASKS[task_name]
234239
print(current_task.doc)
235240

236-
config_path = Path(args["config"])
237-
238-
cfg = Installation.load_local(WorkspaceConfig, config_path)
239-
sql_backend = RuntimeBackend(debug_truncate_bytes=cfg.connect.debug_truncate_bytes)
240-
workspace_client = WorkspaceClient(config=cfg.connect, product='ucx', product_version=__version__)
241+
# `{{parent_run_id}}` is the run of entire workflow, whereas `{{run_id}}` is the run of a task
242+
workflow_run_id = args.get("parent_run_id", "unknown_run_id")
243+
job_id = args.get("job_id", "unknown_job_id")
241244

242245
with TaskLogger(
243-
config_path.parent,
246+
install_dir,
244247
workflow=current_task.workflow,
245248
workflow_id=job_id,
246249
task_name=task_name,
@@ -249,4 +252,16 @@ def trigger(*argv):
249252
) as task_logger:
250253
ucx_logger = logging.getLogger("databricks.labs.ucx")
251254
ucx_logger.info(f"UCX v{__version__} After job finishes, see debug logs at {task_logger}")
252-
current_task.fn(cfg, workspace_client, sql_backend)
255+
current_task.fn(cfg, workspace_client, sql_backend, installation)
256+
257+
258+
def trigger(*argv):
259+
args = parse_args(*argv)
260+
config_path = Path(args["config"])
261+
262+
cfg = Installation.load_local(WorkspaceConfig, config_path)
263+
sql_backend = RuntimeBackend(debug_truncate_bytes=cfg.connect.debug_truncate_bytes)
264+
workspace_client = WorkspaceClient(config=cfg.connect, product='ucx', product_version=__version__)
265+
installation = Installation.current(workspace_client, "ucx")
266+
267+
run_task(args, config_path.parent, cfg, workspace_client, sql_backend, installation)

src/databricks/labs/ucx/install.py

+61-18
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,26 @@ def _configure_new_installation(self) -> WorkspaceConfig:
278278

279279
policy_id, instance_profile, spark_conf_dict = self._policy_installer.create(inventory_database)
280280

281+
# Save configurable spark_conf for table migration cluster
282+
# parallelism will not be needed if backlog is fixed in https://databricks.atlassian.net/browse/ES-975874
283+
parallelism = self._prompts.question(
284+
"Parallelism for migrating dbfs root delta tables with deep clone", default="200", valid_number=True
285+
)
286+
if not spark_conf_dict:
287+
spark_conf_dict = {}
288+
spark_conf_dict.update({'spark.sql.sources.parallelPartitionDiscovery.parallelism': parallelism})
289+
# min/max workers for auto-scale migration job cluster
290+
min_workers = int(
291+
self._prompts.question(
292+
"Min workers for auto-scale job cluster for table migration", default="1", valid_number=True
293+
)
294+
)
295+
max_workers = int(
296+
self._prompts.question(
297+
"Max workers for auto-scale job cluster for table migration", default="10", valid_number=True
298+
)
299+
)
300+
281301
# Check if terraform is being used
282302
is_terraform_used = self._prompts.confirm("Do you use Terraform to deploy your infrastructure?")
283303

@@ -294,6 +314,8 @@ def _configure_new_installation(self) -> WorkspaceConfig:
294314
num_threads=num_threads,
295315
instance_profile=instance_profile,
296316
spark_conf=spark_conf_dict,
317+
min_workers=min_workers,
318+
max_workers=max_workers,
297319
policy_id=policy_id,
298320
is_terraform_used=is_terraform_used,
299321
include_databases=self._select_databases(),
@@ -798,38 +820,59 @@ def _job_wheel_task(self, jobs_task: jobs.Task, task: Task, remote_wheel: str) -
798820
),
799821
)
800822

823+
def _job_cluster_spark_conf(self, cluster_key: str):
824+
conf_from_installation = self._config.spark_conf if self._config.spark_conf else {}
825+
if cluster_key == "main":
826+
spark_conf = {
827+
"spark.databricks.cluster.profile": "singleNode",
828+
"spark.master": "local[*]",
829+
}
830+
return spark_conf | conf_from_installation
831+
if cluster_key == "tacl":
832+
return {"spark.databricks.acl.sqlOnly": "true"} | conf_from_installation
833+
if cluster_key == "table_migration":
834+
return {"spark.sql.sources.parallelPartitionDiscovery.parallelism": "200"} | conf_from_installation
835+
return conf_from_installation
836+
801837
def _job_clusters(self, names: set[str]):
802838
clusters = []
803-
spark_conf = {
804-
"spark.databricks.cluster.profile": "singleNode",
805-
"spark.master": "local[*]",
806-
}
807-
if self._config.spark_conf is not None:
808-
spark_conf = spark_conf | self._config.spark_conf
809-
spec = compute.ClusterSpec(
810-
data_security_mode=compute.DataSecurityMode.LEGACY_SINGLE_USER,
811-
spark_conf=spark_conf,
812-
custom_tags={"ResourceClass": "SingleNode"},
813-
num_workers=0,
814-
policy_id=self.config.policy_id,
815-
)
816839
if "main" in names:
817840
clusters.append(
818841
jobs.JobCluster(
819842
job_cluster_key="main",
820-
new_cluster=spec,
843+
new_cluster=compute.ClusterSpec(
844+
data_security_mode=compute.DataSecurityMode.LEGACY_SINGLE_USER,
845+
spark_conf=self._job_cluster_spark_conf("main"),
846+
custom_tags={"ResourceClass": "SingleNode"},
847+
num_workers=0,
848+
policy_id=self.config.policy_id,
849+
),
821850
)
822851
)
823852
if "tacl" in names:
824853
clusters.append(
825854
jobs.JobCluster(
826855
job_cluster_key="tacl",
827-
new_cluster=replace(
828-
spec,
856+
new_cluster=compute.ClusterSpec(
829857
data_security_mode=compute.DataSecurityMode.LEGACY_TABLE_ACL,
830-
spark_conf={"spark.databricks.acl.sqlOnly": "true"},
858+
spark_conf=self._job_cluster_spark_conf("tacl"),
831859
num_workers=1, # ShowPermissionsCommand needs a worker
832-
custom_tags={},
860+
policy_id=self.config.policy_id,
861+
),
862+
)
863+
)
864+
if "table_migration" in names:
865+
clusters.append(
866+
jobs.JobCluster(
867+
job_cluster_key="table_migration",
868+
new_cluster=compute.ClusterSpec(
869+
data_security_mode=compute.DataSecurityMode.SINGLE_USER,
870+
spark_conf=self._job_cluster_spark_conf("table_migration"),
871+
policy_id=self.config.policy_id,
872+
autoscale=compute.AutoScale(
873+
max_workers=self.config.max_workers,
874+
min_workers=self.config.min_workers,
875+
),
833876
),
834877
)
835878
)

0 commit comments

Comments
 (0)