Added baseline for workflow linter #1613

Merged · 26 commits · May 7, 2024
27 changes: 20 additions & 7 deletions src/databricks/labs/ucx/contexts/application.py
@@ -25,6 +25,7 @@
    AwsACL,
)
from databricks.labs.ucx.hive_metastore.mapping import TableMapping
+from databricks.labs.ucx.hive_metastore.migration_status import MigrationIndex
from databricks.labs.ucx.hive_metastore.table_migrate import (
    MigrationStatusRefresher,
    TablesMigrator,
Expand All @@ -33,12 +34,16 @@
from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler
from databricks.labs.ucx.hive_metastore.verification import VerifyHasMetastore
from databricks.labs.ucx.installer.workflows import DeployedWorkflows
from databricks.labs.ucx.source_code.notebooks.loaders import NotebookResolver, NotebookLoader, WorkspaceNotebookLoader
from databricks.labs.ucx.source_code.jobs import WorkflowLinter
from databricks.labs.ucx.source_code.notebooks.loaders import (
NotebookResolver,
NotebookLoader,
)
from databricks.labs.ucx.source_code.files import FileLoader, LocalFileResolver
from databricks.labs.ucx.source_code.path_lookup import PathLookup
from databricks.labs.ucx.source_code.graph import DependencyResolver, DependencyGraphBuilder
from databricks.labs.ucx.source_code.whitelist import WhitelistResolver, Whitelist
from databricks.labs.ucx.source_code.site_packages import SitePackagesResolver, SitePackages
from databricks.labs.ucx.source_code.site_packages import SitePackageResolver, SitePackages
from databricks.labs.ucx.source_code.languages import Languages
from databricks.labs.ucx.workspace_access import generic, redash
from databricks.labs.ucx.workspace_access.groups import GroupManager
@@ -346,7 +351,7 @@

    @cached_property
    def notebook_loader(self) -> NotebookLoader:
-        return WorkspaceNotebookLoader(self.workspace_client)
+        return NotebookLoader()

    @cached_property
    def notebook_resolver(self):
@@ -364,11 +369,11 @@

    @cached_property
    def file_loader(self):
-        return FileLoader(self.path_lookup)
+        return FileLoader()

    @cached_property
    def site_packages_resolver(self):
-        return SitePackagesResolver(self.site_packages, self.file_loader, self.path_lookup)
+        return SitePackageResolver(self.site_packages, self.file_loader, self.path_lookup)

    @cached_property
    def whitelist(self):
@@ -385,11 +390,10 @@

    @cached_property
    def dependency_resolver(self):
+        # TODO: link back self.site_packages_resolver and self.whitelist_resolver,
        return DependencyResolver(
            [
                self.notebook_resolver,
-                self.site_packages_resolver,
-                self.whitelist_resolver,
                self.file_resolver,
            ]
        )
@@ -398,6 +402,15 @@
    def dependency_graph_builder(self):
        return DependencyGraphBuilder(self.dependency_resolver, self.path_lookup)

+    @cached_property
+    def workflow_linter(self):
+        return WorkflowLinter(
+            self.workspace_client,
+            self.dependency_resolver,
+            self.path_lookup,
+            MigrationIndex([]),  # TODO: bring back self.tables_migrator.index()
+        )


class CliContext(GlobalContext, abc.ABC):
    @cached_property
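Note for readers: every property in application.py uses functools.cached_property, so the new workflow_linter (and its whole dependency chain of resolvers, path lookup, and migration index) is built lazily and exactly once per context; the MigrationIndex([]) placeholder keeps it running on an empty table-migration index until the TODO is resolved. A minimal self-contained illustration of the pattern, separate from this PR:

# Illustrative sketch — not part of this diff.
from functools import cached_property

class Context:
    @cached_property
    def expensive_service(self) -> object:
        print("constructed once, on first access")
        return object()

ctx = Context()
assert ctx.expensive_service is ctx.expensive_service  # lazy singleton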
5 changes: 0 additions & 5 deletions src/databricks/labs/ucx/contexts/workspace_cli.py
@@ -14,7 +14,6 @@
from databricks.labs.ucx.azure.resources import AzureAPIClient, AzureResources
from databricks.labs.ucx.contexts.application import CliContext
from databricks.labs.ucx.source_code.files import LocalFileMigrator
-from databricks.labs.ucx.source_code.notebooks.loaders import NotebookLoader, LocalNotebookLoader
from databricks.labs.ucx.workspace_access.clusters import ClusterAccess


@@ -163,7 +162,3 @@ def iam_role_creation(self):
            self.workspace_client,
            self.aws_resource_permissions,
        )

-    @cached_property
-    def notebook_loader(self) -> NotebookLoader:
-        return LocalNotebookLoader(self.path_lookup)
2 changes: 2 additions & 0 deletions src/databricks/labs/ucx/install.py
@@ -62,6 +62,7 @@
from databricks.labs.ucx.installer.policy import ClusterPolicyInstaller
from databricks.labs.ucx.installer.workflows import WorkflowsDeployment
from databricks.labs.ucx.runtime import Workflows
+from databricks.labs.ucx.source_code.jobs import JobProblem
from databricks.labs.ucx.workspace_access.base import Permissions
from databricks.labs.ucx.workspace_access.generic import WorkspaceObjectInfo
from databricks.labs.ucx.workspace_access.groups import ConfigureGroups, MigratedGroup
@@ -101,6 +102,7 @@ def deploy_schema(sql_backend: SqlBackend, inventory_schema: str):
            functools.partial(table, "submit_runs", SubmitRunInfo),
            functools.partial(table, "policies", PolicyInfo),
            functools.partial(table, "migration_status", MigrationStatus),
+            functools.partial(table, "workflow_problems", JobProblem),
            functools.partial(table, "udfs", Udf),
            functools.partial(table, "logs", LogRecord),
        ],
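Each functools.partial(table, ...) entry pairs an inventory table name with the dataclass that defines its schema, and deploy_schema creates them all in parallel; the new workflow_problems table is where the linter's findings land. As a rough illustration of the name-to-dataclass idea only (the real deployment lives in lsql, and JobProblem's actual fields may differ from the ones assumed here):

# Illustrative sketch — field names are assumptions, not the real JobProblem.
from dataclasses import dataclass, fields

@dataclass
class JobProblem:
    job_id: int
    job_name: str
    code: str
    message: str

_SQL_TYPES = {int: "BIGINT", str: "STRING", bool: "BOOLEAN", float: "DOUBLE"}

def ddl_for(name: str, klass) -> str:
    # One column per dataclass field, mapped to a SQL type.
    cols = ", ".join(f"{f.name} {_SQL_TYPES[f.type]}" for f in fields(klass))
    return f"CREATE TABLE IF NOT EXISTS {name} ({cols})"

print(ddl_for("inventory.workflow_problems", JobProblem))
# CREATE TABLE IF NOT EXISTS inventory.workflow_problems (job_id BIGINT, job_name STRING, code STRING, message STRING)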
52 changes: 22 additions & 30 deletions src/databricks/labs/ucx/mixins/fixtures.py
@@ -566,6 +566,8 @@ def create(
    ) -> str:
        if path is None:
            path = f"/Users/{ws.current_user.me().user_name}/sdk-{make_random(4)}"
+        elif isinstance(path, pathlib.Path):
+            path = str(path)
        if content is None:
            content = io.BytesIO(b"print(1)")
        path = str(path)
@@ -754,43 +756,33 @@ def create(*, instance_pool_name=None, node_type_id=None, **kwargs):

@pytest.fixture
def make_job(ws, make_random, make_notebook):
-    def create(**kwargs):
+    def create(notebook_path: str | None = None, **kwargs):
        task_spark_conf = None
        if "name" not in kwargs:
            kwargs["name"] = f"sdk-{make_random(4)}"
        if "spark_conf" in kwargs:
            task_spark_conf = kwargs["spark_conf"]
            kwargs.pop("spark_conf")
+        if isinstance(notebook_path, pathlib.Path):
+            notebook_path = str(notebook_path)
+        if not notebook_path:
+            notebook_path = make_notebook()
+        assert notebook_path is not None
        if "tasks" not in kwargs:
-            if task_spark_conf:
-                kwargs["tasks"] = [
-                    jobs.Task(
-                        task_key=make_random(4),
-                        description=make_random(4),
-                        new_cluster=compute.ClusterSpec(
-                            num_workers=1,
-                            node_type_id=ws.clusters.select_node_type(local_disk=True, min_memory_gb=16),
-                            spark_version=ws.clusters.select_spark_version(latest=True),
-                            spark_conf=task_spark_conf,
-                        ),
-                        notebook_task=jobs.NotebookTask(notebook_path=make_notebook()),
-                        timeout_seconds=0,
-                    )
-                ]
-            else:
-                kwargs["tasks"] = [
-                    jobs.Task(
-                        task_key=make_random(4),
-                        description=make_random(4),
-                        new_cluster=compute.ClusterSpec(
-                            num_workers=1,
-                            node_type_id=ws.clusters.select_node_type(local_disk=True, min_memory_gb=16),
-                            spark_version=ws.clusters.select_spark_version(latest=True),
-                        ),
-                        notebook_task=jobs.NotebookTask(notebook_path=make_notebook()),
-                        timeout_seconds=0,
-                    )
-                ]
+            kwargs["tasks"] = [
+                jobs.Task(
+                    task_key=make_random(4),
+                    description=make_random(4),
+                    new_cluster=compute.ClusterSpec(
+                        num_workers=1,
+                        node_type_id=ws.clusters.select_node_type(local_disk=True, min_memory_gb=16),
+                        spark_version=ws.clusters.select_spark_version(latest=True),
+                        spark_conf=task_spark_conf,
+                    ),
+                    notebook_task=jobs.NotebookTask(notebook_path=notebook_path),
+                    timeout_seconds=0,
+                )
+            ]
        job = ws.jobs.create(**kwargs)
        logger.info(f"Job: {ws.config.host}#job/{job.job_id}")
        return job
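The refactor collapses two nearly identical task blocks into one (spark_conf=task_spark_conf simply passes None when no conf was supplied) and lets callers point the job at an existing notebook instead of always getting a fresh one. A hedged sketch of how a test might use the updated fixture (the test body is illustrative, not from this PR):

# Illustrative sketch — not part of this diff.
def test_job_reuses_notebook(ws, make_job, make_notebook):
    notebook_path = make_notebook()           # create one notebook up front
    job = make_job(notebook_path=notebook_path)
    tasks = ws.jobs.get(job.job_id).settings.tasks
    assert tasks[0].notebook_task.notebook_path == notebook_path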
28 changes: 20 additions & 8 deletions src/databricks/labs/ucx/mixins/wspath.py
@@ -6,12 +6,12 @@
from functools import cached_property

# pylint: disable-next=import-private-name
-from pathlib import Path, _PosixFlavour, _Accessor  # type: ignore
+from pathlib import Path, _PosixFlavour  # type: ignore
from urllib.parse import quote_from_bytes as urlquote_from_bytes
from io import BytesIO, StringIO

from databricks.sdk import WorkspaceClient
-from databricks.sdk.errors import NotFound
+from databricks.sdk.errors import NotFound, DatabricksError
from databricks.sdk.service.workspace import ObjectInfo, ObjectType, ExportFormat, ImportFormat, Language

logger = logging.getLogger(__name__)
@@ -74,7 +74,7 @@ def __exit__(self, *args):
        pass


-class _DatabricksAccessor(_Accessor):
+class _DatabricksAccessor:
    chmod = _na('accessor.chmod')
    getcwd = _na('accessor.getcwd')
    group = _na('accessor.group')
@@ -286,8 +286,11 @@ def suffix(self):
        if not self.is_notebook():
            return ""
        for sfx, lang in self._SUFFIXES.items():
-            if self._object_info.language == lang:
-                return sfx
+            try:
+                if self._object_info.language == lang:
+                    return sfx
+            except DatabricksError:
+                return ""
        return ""

    def __lt__(self, other: pathlib.PurePath):
@@ -313,13 +316,22 @@ def _return_false(self) -> bool:
    is_junction = _return_false

    def is_dir(self):
-        return self._object_info.object_type == ObjectType.DIRECTORY
+        try:
+            return self._object_info.object_type == ObjectType.DIRECTORY
+        except DatabricksError:
+            return False

    def is_file(self):
-        return self._object_info.object_type == ObjectType.FILE
+        try:
+            return self._object_info.object_type == ObjectType.FILE
+        except DatabricksError:
+            return False

    def is_notebook(self):
-        return self._object_info.object_type == ObjectType.NOTEBOOK
+        try:
+            return self._object_info.object_type == ObjectType.NOTEBOOK
+        except DatabricksError:
+            return False

    def __eq__(self, other):
        return isinstance(other, Path) and self.as_posix() == other.as_posix()
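Wrapping the ObjectType checks in try/except DatabricksError makes these methods behave like their pathlib counterparts, which return False for paths that do not exist instead of raising. A sketch of the intended behavior (assuming WorkspacePath(ws, path) construction, as defined in this module):

# Illustrative sketch — not part of this diff.
from databricks.sdk import WorkspaceClient

ws = WorkspaceClient()
missing = WorkspacePath(ws, "/Users/someone@example.com/no-such-file")
# The _object_info lookup raises NotFound (a DatabricksError) for this path,
# so the checks now degrade gracefully, matching pathlib.Path semantics:
assert missing.is_file() is False
assert missing.is_dir() is False
assert missing.is_notebook() is False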
2 changes: 2 additions & 0 deletions src/databricks/labs/ucx/runtime.py
@@ -15,6 +15,7 @@
    MigrateHiveSerdeTablesInPlace,
    MigrateExternalTablesCTAS,
)
+from databricks.labs.ucx.source_code.workflows import ExperimentalWorkflowLinter
from databricks.labs.ucx.workspace_access.workflows import (
    GroupMigration,
    PermissionsMigrationAPI,
@@ -50,6 +51,7 @@ def all(cls):
                RemoveWorkspaceLocalGroups(),
                MigrateTablesInMounts(),
                PermissionsMigrationAPI(),
+                ExperimentalWorkflowLinter(),
                Failing(),
            ]
        )
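Registering ExperimentalWorkflowLinter() in Workflows.all() is what makes the installer deploy it as a runnable job. A hedged sketch of the general shape such a workflow class takes in this codebase — the base class, decorator, import path, and method names below are assumptions, not taken from this PR:

# Illustrative sketch — names below are assumptions.
from databricks.labs.ucx.framework.tasks import Workflow, job_task  # assumed import path

class ExperimentalWorkflowLinter(Workflow):
    def __init__(self):
        super().__init__('experimental-workflow-linter')

    @job_task
    def lint_all_workflows(self, ctx):
        # Delegate to the context-wired linter; the exact method name and how
        # results reach the workflow_problems table are assumptions here.
        ctx.workflow_linter.refresh_report(ctx.sql_backend, ctx.inventory_schema)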
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/source_code/dbfs.py
@@ -79,7 +79,7 @@ def lint(self, code: str) -> Iterable[Advice]:
        tree = ast.parse(code)
        visitor = DetectDbfsVisitor()
        visitor.visit(tree)
-        return visitor.get_advices()
+        yield from visitor.get_advices()


class FromDbfsFolder(Linter):
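The dbfs.py change is subtle: with return, lint runs eagerly and hands back a list; with yield from, lint itself becomes a generator, so parsing is deferred until the caller iterates, matching the lazy, generator-based contract of the other linters (and avoiding the classic pitfall where a return value inside a generator silently discards the value). A small illustration of the distinction, separate from this PR:

# Illustrative sketch — not part of this diff.
from collections.abc import Iterable

def advices_eager() -> Iterable[str]:
    print("parsing happens at call time")
    return ["advice-1", "advice-2"]

def advices_lazy() -> Iterable[str]:
    print("parsing happens on first next()")
    yield from ["advice-1", "advice-2"]

it = advices_lazy()   # nothing printed yet
print(list(it))       # the body executes only now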