Add migrate-tables workflow #1045

Conversation
Codecov Report

Attention: Patch coverage is

@@           Coverage Diff            @@
##             main    #1045      +/-   ##
==========================================
- Coverage   88.98%   88.94%    -0.05%
==========================================
  Files          51       52        +1
  Lines        6501     6683      +182
  Branches     1169     1197       +28
==========================================
+ Hits         5785     5944      +159
- Misses        466      482       +16
- Partials      250      257        +7

☔ View full report in Codecov by Sentry.
src/databricks/labs/ucx/install.py (outdated)

# spark_conf={"spark.sql.sources.parallelPartitionDiscovery.parallelism": "1000"},
autoscale=compute.AutoScale(  # number of executors matters to parallelism of file copy
    min_workers=1,
    max_workers=10,
Make this configurable
refactored, please review
Can you put 10 as a variable in config?
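A hedged sketch of what this suggestion could look like: a hypothetical migration_max_workers field on WorkspaceConfig with a default of 10, referenced when building the autoscale spec. The field name and default are assumptions for illustration, not the actual UCX code.

```python
# Hypothetical sketch: "10" becomes a config variable with a default.
from dataclasses import dataclass

from databricks.sdk.service import compute


@dataclass
class WorkspaceConfig:  # simplified stand-in; the real class has many more fields
    inventory_database: str
    migration_max_workers: int = 10  # hypothetical knob for the migration cluster


def migration_autoscale(cfg: WorkspaceConfig) -> compute.AutoScale:
    # number of executors matters to parallelism of file copy
    return compute.AutoScale(min_workers=1, max_workers=cfg.migration_max_workers)
```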
src/databricks/labs/ucx/runtime.py (outdated)

@task("migrate-tables", job_cluster="migration_sync")
def migrate_views(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend, installation: Installation):
Skip the views for now
fixed
    )
)

mock_api_request = create_autospec(PreparedRequest)
Rewrite the code to use get_workspace_id() from the workspace client and mock that.
I don't get this. Could you elaborate?
ws.get_workspace_id.return_value = 123
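To illustrate the suggestion, a minimal sketch: autospec the WorkspaceClient and stub get_workspace_id() directly, instead of mocking the low-level PreparedRequest. The test name and assertion are illustrative only.

```python
# Minimal sketch of the suggested test setup.
from unittest.mock import create_autospec

from databricks.sdk import WorkspaceClient


def test_workspace_id_is_stubbed():
    ws = create_autospec(WorkspaceClient)
    ws.get_workspace_id.return_value = 123

    # any code under test that calls ws.get_workspace_id() now sees 123
    assert ws.get_workspace_id() == 123
```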
src/databricks/labs/ucx/install.py (outdated)

@@ -779,6 +779,35 @@ def _job_clusters(self, names: set[str]):
            ),
        )
    )
    if "migration_clone" in names:
Why do we need two clusters? Can we have one?
Make sure that override clusters work in the integration tests' new_installation().
We have two types of migration (see the sketch below):
- In-place migration that only migrates the metadata to UC, like the SYNC command; this just needs a small cluster.
- Migration that needs to copy data, like DEEP CLONE and CTAS for DBFS root tables; this needs a larger cluster.
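For illustration, a hedged sketch of the contrast between the two types. The table names are placeholders, and the statements are standard Databricks SQL rather than the exact strings this PR generates.

```python
# Metadata-only: SYNC upgrades the Hive metastore table definition into Unity Catalog.
SYNC_STMT = "SYNC TABLE main.db.tgt FROM hive_metastore.db.src"

# Data copy: DEEP CLONE (or CTAS) physically copies the data of a DBFS-root table.
CLONE_STMT = "CREATE TABLE IF NOT EXISTS main.db.tgt DEEP CLONE hive_metastore.db.src"
```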
Let's use just one cluster, since we default the autoscale to a minimum of one worker.
src/databricks/labs/ucx/install.py
Outdated
# https://databricks.atlassian.net/browse/ES-975874 | ||
# the default 200 partition may not be enough for large tables, but it's hard to | ||
# find a number that fits all. If we need higher parallelism, we can use config below | ||
# spark_conf={"spark.sql.sources.parallelPartitionDiscovery.parallelism": "1000"}, |
Make it configurable during installation, with a default value in the workspace config.
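A hedged sketch of the "configurable during the installer" idea: prompt for the value at install time with a default, then store it in the workspace config. Prompts comes from databricks-labs-blueprint; the prompt text and the function name are assumptions.

```python
# Hypothetical install-time prompt with a default value.
from databricks.labs.blueprint.tui import Prompts


def ask_partition_parallelism(prompts: Prompts) -> int:
    answer = prompts.question(
        "Parallelism for migrating large tables "
        "(spark.sql.sources.parallelPartitionDiscovery.parallelism)",
        default="200",
        valid_number=True,
    )
    return int(answer)
```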
refactored, please review
@@ -249,4 +254,7 @@ def trigger(*argv):
    ) as task_logger:
        ucx_logger = logging.getLogger("databricks.labs.ucx")
        ucx_logger.info(f"UCX v{__version__} After job finishes, see debug logs at {task_logger}")
        current_task.fn(cfg, workspace_client, sql_backend)
        if current_task.workflow == "migrate-tables":
Don't do this. Change the signatures of all the methods instead.
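A sketch of the pattern being asked for, with types simplified to keep it self-contained (the real signatures take WorkspaceConfig, WorkspaceClient, SqlBackend, and Installation): give every task function the same signature, so the runtime never branches on the workflow name.

```python
# Simplified sketch: one uniform callable signature for all tasks, so the
# runtime dispatch has no per-workflow special cases.
from collections.abc import Callable
from dataclasses import dataclass


@dataclass
class Task:
    workflow: str
    name: str
    fn: Callable[..., None]  # always called as fn(cfg, ws, sql_backend, installation)


def run_task(task: Task, cfg, ws, sql_backend, installation):
    # no `if task.workflow == "migrate-tables"` branch: every task function
    # accepts the same arguments, even if it ignores some of them
    task.fn(cfg, ws, sql_backend, installation)
```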
fixed
… configure table_migration job clusters. Remove migrate_views workflow task.
How can we make it without a breaking change?
@dataclass
class Task:
    task_id: int
    workflow: str
    name: str
    doc: str
-   fn: Callable[[WorkspaceConfig, WorkspaceClient, SqlBackend], None]
+   fn: AssessmentFunctions | MigrationFunctions
Revert this change.
@@ -170,6 +173,16 @@ def deploy_schema(sql_backend: SqlBackend, inventory_schema: str):
    deployer.deploy_view("table_estimates", "queries/views/table_estimates.sql")


def load_cluster_specs() -> dict[str, ClusterSpec]:
Don't load something from a YAML file if it can be defined in 5 lines of Python code. Revert.
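The point, sketched: the YAML file collapses to a small dict defined directly in Python. The keys and values here are illustrative, not the actual UCX defaults.

```python
# Illustrative: job cluster specs as plain Python instead of a YAML file.
from databricks.sdk.service import compute

CLUSTER_SPECS: dict[str, compute.ClusterSpec] = {
    "main": compute.ClusterSpec(num_workers=0),  # driver-only cluster
    "table_migration": compute.ClusterSpec(
        # number of executors matters to parallelism of file copy
        autoscale=compute.AutoScale(min_workers=1, max_workers=10),
    ),
}
```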
@@ -247,6 +260,38 @@ def _configure_new_installation(self) -> WorkspaceConfig:

    policy_id, instance_profile, spark_conf_dict = self._policy_installer.create(inventory_database)

    # Load job cluster specifications
    cluster_specs = load_cluster_specs()
Don't load from YAML; define it in code.
        num_workers=0,
        policy_id=self.config.policy_id,
    )
    cluster_specs = self._config.cluster_specs
No, this gets too fragile. The installer is the critical component, and this introduces a severe risk of breakage. Revert.
@@ -43,7 +47,7 @@ def setup_tacl(*_):


@task("assessment", depends_on=[crawl_tables, setup_tacl], job_cluster="tacl")
-def crawl_grants(cfg: WorkspaceConfig, _: WorkspaceClient, sql_backend: SqlBackend):
+def crawl_grants(cfg: WorkspaceConfig, _: WorkspaceClient, sql_backend: SqlBackend, _installation: Installation):
Suggested change:
-def crawl_grants(cfg: WorkspaceConfig, _: WorkspaceClient, sql_backend: SqlBackend, _installation: Installation):
+def crawl_grants(cfg: WorkspaceConfig, _: WorkspaceClient, sql_backend: SqlBackend, install: Installation):
    policy_id = v16_config.policy_id

    cluster_specs = load_cluster_specs()
    for _, cluster_spec in cluster_specs.items():
This is too risky. How can we do it without a breaking change?
@@ -31,7 +32,7 @@ class WorkspaceConfig:  # pylint: disable=too-many-instance-attributes
    # Starting path for notebooks and directories crawler
    workspace_start_path: str = "/"
    instance_profile: str | None = None
-   spark_conf: dict[str, str] | None = None
+   cluster_specs: dict[str, ClusterSpec] | None = None
This is a breaking change. How can we do it without one?
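One non-breaking option, sketched under the assumption that the config is deserialized by field name: keep the existing spark_conf field and add any new knobs as optional fields with defaults, so previously saved configs still load. All fields besides spark_conf and instance_profile are illustrative.

```python
# Hypothetical: additive, defaulted fields instead of replacing spark_conf.
from dataclasses import dataclass


@dataclass
class WorkspaceConfig:  # simplified
    inventory_database: str
    instance_profile: str | None = None
    spark_conf: dict[str, str] | None = None  # kept: removing it breaks old configs
    # new optional knobs; absent from old config files, so old installs still load
    min_workers: int = 1
    max_workers: int = 10
```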
Close this draft PR. Please review the new PR #1051
Changes

Add migrate-tables workflow. This PR includes only the workflow described in milestone 1 of #1035; the table_migration_progress table is out of scope in this PR.

Linked issues

Relates #333 #670

Functionality

databricks labs ucx ...
...
...

Tests