Added baseline for workflow linter (#1613)

```mermaid
flowchart TD
    job -->|has many| job_task
    job_task -.-> notebook_task
    job_task -.-> wheel_task 

    job -.-> git_source

    job_task -.->|execute on| interactive_cluster
    interactive_cluster -.-> library

    job_task -.-> library
    library -.-> wheel_on_dbfs
    library -.-> wheel_on_wsfs
    library -.-> wheel_on_volumes
    library -.-> egg_on_dbfs
    library -.-> egg_on_wsfs
    library -.-> pypi
    wheel_task -.-> wheel_on_dbfs
    wheel_task -.-> wheel_on_wsfs

    wheel_on_dbfs -.-> python_file
    wheel_on_wsfs -.-> python_file
    egg_on_dbfs -.-> python_file
    egg_on_wsfs -.-> python_file
    pypi -.-> python_file
    wsfs_file -.-> python_file
    python_file -.->|import| python_file
    notebook_task -.-> notebook
    notebook -.->|import| python_file
    notebook -.->|can run| notebook

    job_task -.-> dependency_graph
    python_file --> dependency_graph
    notebook --> dependency_graph

    git_source -.-> python_file
    git_source -.-> notebook
    lint_local_code_cli --> dependency_graph

    workflow_linter --> dependency_graph
    workflow_linter -.-> job_problems
    dependency_graph -.-> job_problems
    job_problems -.->|viz| redash_dashboard
```

This PR adds the baseline for linting workflows.
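
To make the picture concrete, here is a deliberately simplified, self-contained sketch of the flow the diagram describes: jobs fan out into tasks, tasks resolve to source files, the files form a dependency graph, and linting the graph yields per-job problem records. Apart from the `JobProblem` name (introduced by this PR), every name and field below is hypothetical:

```python
from dataclasses import dataclass


@dataclass
class JobProblem:
    # the real dataclass lives in source_code/jobs.py; these fields are illustrative
    job_id: int
    task_key: str
    path: str
    message: str


def build_dependency_graph(task_files: list[str]) -> dict[str, set[str]]:
    # stand-in for DependencyGraphBuilder: pretend each file imports the ones after it
    return {f: set(task_files[i + 1:]) for i, f in enumerate(task_files)}


def lint_job(job_id: int, tasks: dict[str, list[str]]) -> list[JobProblem]:
    problems: list[JobProblem] = []
    for task_key, files in tasks.items():
        for path in build_dependency_graph(files):
            if path.startswith("dbfs:/"):  # e.g. a deprecated DBFS reference
                problems.append(JobProblem(job_id, task_key, path, "uses DBFS path"))
    return problems


print(lint_job(42, {"main": ["dbfs:/mnt/libs/helper.py", "/Workspace/nb.py"]}))
```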

Related to:
- #1542 
- #1541
- #1540
- #1539
- #1382
- #1204
- #1203
- #1085

closes #1559
closes #1468
closes #1286
nfx authored May 7, 2024
1 parent 1350995 commit 1ae345a
Showing 39 changed files with 1,267 additions and 1,316 deletions.
27 changes: 20 additions & 7 deletions src/databricks/labs/ucx/contexts/application.py
```diff
@@ -25,6 +25,7 @@
     AwsACL,
 )
 from databricks.labs.ucx.hive_metastore.mapping import TableMapping
+from databricks.labs.ucx.hive_metastore.migration_status import MigrationIndex
 from databricks.labs.ucx.hive_metastore.table_migrate import (
     MigrationStatusRefresher,
     TablesMigrator,
@@ -33,12 +34,16 @@
 from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler
 from databricks.labs.ucx.hive_metastore.verification import VerifyHasMetastore
 from databricks.labs.ucx.installer.workflows import DeployedWorkflows
-from databricks.labs.ucx.source_code.notebooks.loaders import NotebookResolver, NotebookLoader, WorkspaceNotebookLoader
+from databricks.labs.ucx.source_code.jobs import WorkflowLinter
+from databricks.labs.ucx.source_code.notebooks.loaders import (
+    NotebookResolver,
+    NotebookLoader,
+)
 from databricks.labs.ucx.source_code.files import FileLoader, LocalFileResolver
 from databricks.labs.ucx.source_code.path_lookup import PathLookup
 from databricks.labs.ucx.source_code.graph import DependencyResolver, DependencyGraphBuilder
 from databricks.labs.ucx.source_code.whitelist import WhitelistResolver, Whitelist
-from databricks.labs.ucx.source_code.site_packages import SitePackagesResolver, SitePackages
+from databricks.labs.ucx.source_code.site_packages import SitePackageResolver, SitePackages
 from databricks.labs.ucx.source_code.languages import Languages
 from databricks.labs.ucx.workspace_access import generic, redash
 from databricks.labs.ucx.workspace_access.groups import GroupManager
@@ -346,7 +351,7 @@ def verify_has_metastore(self):
 
     @cached_property
     def notebook_loader(self) -> NotebookLoader:
-        return WorkspaceNotebookLoader(self.workspace_client)
+        return NotebookLoader()
 
     @cached_property
     def notebook_resolver(self):
@@ -364,11 +369,11 @@ def path_lookup(self):
 
     @cached_property
     def file_loader(self):
-        return FileLoader(self.path_lookup)
+        return FileLoader()
 
     @cached_property
     def site_packages_resolver(self):
-        return SitePackagesResolver(self.site_packages, self.file_loader, self.path_lookup)
+        return SitePackageResolver(self.site_packages, self.file_loader, self.path_lookup)
 
     @cached_property
     def whitelist(self):
@@ -385,11 +390,10 @@ def file_resolver(self):
 
     @cached_property
     def dependency_resolver(self):
+        # TODO: link back self.site_packages_resolver and self.whitelist_resolver,
         return DependencyResolver(
             [
                 self.notebook_resolver,
-                self.site_packages_resolver,
-                self.whitelist_resolver,
                 self.file_resolver,
             ]
         )
@@ -398,6 +402,15 @@ def dependency_graph_builder(self):
     def dependency_graph_builder(self):
         return DependencyGraphBuilder(self.dependency_resolver, self.path_lookup)
 
+    @cached_property
+    def workflow_linter(self):
+        return WorkflowLinter(
+            self.workspace_client,
+            self.dependency_resolver,
+            self.path_lookup,
+            MigrationIndex([]),  # TODO: bring back self.tables_migrator.index()
+        )
+
 
 class CliContext(GlobalContext, abc.ABC):
     @cached_property
```

5 changes: 0 additions & 5 deletions src/databricks/labs/ucx/contexts/workspace_cli.py
```diff
@@ -15,7 +15,6 @@
 from databricks.labs.ucx.azure.resources import AzureAPIClient, AzureResources
 from databricks.labs.ucx.contexts.application import CliContext
 from databricks.labs.ucx.source_code.files import LocalFileMigrator
-from databricks.labs.ucx.source_code.notebooks.loaders import NotebookLoader, LocalNotebookLoader
 from databricks.labs.ucx.workspace_access.clusters import ClusterAccess
 
 
@@ -171,7 +170,3 @@ def iam_role_creation(self):
             self.workspace_client,
             self.aws_resource_permissions,
         )
-
-    @cached_property
-    def notebook_loader(self) -> NotebookLoader:
-        return LocalNotebookLoader(self.path_lookup)
```

2 changes: 2 additions & 0 deletions src/databricks/labs/ucx/install.py
```diff
@@ -62,6 +62,7 @@
 from databricks.labs.ucx.installer.policy import ClusterPolicyInstaller
 from databricks.labs.ucx.installer.workflows import WorkflowsDeployment
 from databricks.labs.ucx.runtime import Workflows
+from databricks.labs.ucx.source_code.jobs import JobProblem
 from databricks.labs.ucx.workspace_access.base import Permissions
 from databricks.labs.ucx.workspace_access.generic import WorkspaceObjectInfo
 from databricks.labs.ucx.workspace_access.groups import ConfigureGroups, MigratedGroup
@@ -101,6 +102,7 @@ def deploy_schema(sql_backend: SqlBackend, inventory_schema: str):
             functools.partial(table, "submit_runs", SubmitRunInfo),
             functools.partial(table, "policies", PolicyInfo),
             functools.partial(table, "migration_status", MigrationStatus),
+            functools.partial(table, "workflow_problems", JobProblem),
             functools.partial(table, "udfs", Udf),
             functools.partial(table, "logs", LogRecord),
         ],
```
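
Once the schema is deployed, one row per finding lands in the new `workflow_problems` table. A hedged sketch of inspecting it — the warehouse id and the `inventory` schema name are placeholders for your installation's values, and `StatementExecutionBackend` is the lsql backend ucx uses elsewhere:

```python
from databricks.sdk import WorkspaceClient
from databricks.labs.lsql.backends import StatementExecutionBackend

ws = WorkspaceClient()
backend = StatementExecutionBackend(ws, "<warehouse-id>")  # placeholder warehouse id
for row in backend.fetch("SELECT * FROM inventory.workflow_problems"):
    print(row)
```
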
54 changes: 23 additions & 31 deletions src/databricks/labs/ucx/mixins/fixtures.py
```diff
@@ -558,14 +558,16 @@ def create(*, scope: str, principal: str, permission: workspace.AclPermission):
 def make_notebook(ws, make_random):
     def create(
         *,
-        path: str | None = None,
+        path: str | Path | None = None,
         content: BinaryIO | None = None,
         language: Language = Language.PYTHON,
         format: ImportFormat = ImportFormat.SOURCE,  # pylint: disable=redefined-builtin
         overwrite: bool = False,
     ) -> str:
         if path is None:
             path = f"/Users/{ws.current_user.me().user_name}/sdk-{make_random(4)}"
+        elif isinstance(path, pathlib.Path):
+            path = str(path)
         if content is None:
             content = io.BytesIO(b"print(1)")
         path = str(path)
@@ -754,43 +756,33 @@ def create(*, instance_pool_name=None, node_type_id=None, **kwargs):
 
 @pytest.fixture
 def make_job(ws, make_random, make_notebook):
-    def create(**kwargs):
+    def create(notebook_path: str | Path | None = None, **kwargs):
         task_spark_conf = None
         if "name" not in kwargs:
             kwargs["name"] = f"sdk-{make_random(4)}"
         if "spark_conf" in kwargs:
             task_spark_conf = kwargs["spark_conf"]
             kwargs.pop("spark_conf")
+        if isinstance(notebook_path, pathlib.Path):
+            notebook_path = str(notebook_path)
+        if not notebook_path:
+            notebook_path = make_notebook()
+        assert notebook_path is not None
         if "tasks" not in kwargs:
-            if task_spark_conf:
-                kwargs["tasks"] = [
-                    jobs.Task(
-                        task_key=make_random(4),
-                        description=make_random(4),
-                        new_cluster=compute.ClusterSpec(
-                            num_workers=1,
-                            node_type_id=ws.clusters.select_node_type(local_disk=True, min_memory_gb=16),
-                            spark_version=ws.clusters.select_spark_version(latest=True),
-                            spark_conf=task_spark_conf,
-                        ),
-                        notebook_task=jobs.NotebookTask(notebook_path=make_notebook()),
-                        timeout_seconds=0,
-                    )
-                ]
-            else:
-                kwargs["tasks"] = [
-                    jobs.Task(
-                        task_key=make_random(4),
-                        description=make_random(4),
-                        new_cluster=compute.ClusterSpec(
-                            num_workers=1,
-                            node_type_id=ws.clusters.select_node_type(local_disk=True, min_memory_gb=16),
-                            spark_version=ws.clusters.select_spark_version(latest=True),
-                        ),
-                        notebook_task=jobs.NotebookTask(notebook_path=make_notebook()),
-                        timeout_seconds=0,
-                    )
-                ]
+            kwargs["tasks"] = [
+                jobs.Task(
+                    task_key=make_random(4),
+                    description=make_random(4),
+                    new_cluster=compute.ClusterSpec(
+                        num_workers=1,
+                        node_type_id=ws.clusters.select_node_type(local_disk=True, min_memory_gb=16),
+                        spark_version=ws.clusters.select_spark_version(latest=True),
+                        spark_conf=task_spark_conf,
+                    ),
+                    notebook_task=jobs.NotebookTask(notebook_path=str(notebook_path)),
+                    timeout_seconds=0,
+                )
+            ]
         job = ws.jobs.create(**kwargs)
         logger.info(f"Job: {ws.config.host}#job/{job.job_id}")
         return job
```
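
The rewrite collapses `make_job`'s duplicated task blocks into one (an unset `spark_conf` is simply passed as `None`) and lets callers hand in the notebook explicitly. A hypothetical test using the new parameter — the test body is illustrative, not part of this PR:

```python
def test_job_with_explicit_notebook(ws, make_notebook, make_job):
    notebook_path = make_notebook()  # throwaway notebook; returns its workspace path
    job = make_job(notebook_path=notebook_path)  # wired into the job's single task
    assert job.job_id is not None
```
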
28 changes: 20 additions & 8 deletions src/databricks/labs/ucx/mixins/wspath.py
```diff
@@ -6,12 +6,12 @@
 from functools import cached_property
 
 # pylint: disable-next=import-private-name
-from pathlib import Path, _PosixFlavour, _Accessor  # type: ignore
+from pathlib import Path, _PosixFlavour  # type: ignore
 from urllib.parse import quote_from_bytes as urlquote_from_bytes
 from io import BytesIO, StringIO
 
 from databricks.sdk import WorkspaceClient
-from databricks.sdk.errors import NotFound
+from databricks.sdk.errors import NotFound, DatabricksError
 from databricks.sdk.service.workspace import ObjectInfo, ObjectType, ExportFormat, ImportFormat, Language
 
 logger = logging.getLogger(__name__)
@@ -74,7 +74,7 @@ def __exit__(self, *args):
         pass
 
 
-class _DatabricksAccessor(_Accessor):
+class _DatabricksAccessor:
     chmod = _na('accessor.chmod')
     getcwd = _na('accessor.getcwd')
     group = _na('accessor.group')
@@ -286,8 +286,11 @@ def suffix(self):
         if not self.is_notebook():
             return ""
         for sfx, lang in self._SUFFIXES.items():
-            if self._object_info.language == lang:
-                return sfx
+            try:
+                if self._object_info.language == lang:
+                    return sfx
+            except DatabricksError:
+                return ""
         return ""
 
     def __lt__(self, other: pathlib.PurePath):
@@ -313,13 +316,22 @@ def _return_false(self) -> bool:
     is_junction = _return_false
 
     def is_dir(self):
-        return self._object_info.object_type == ObjectType.DIRECTORY
+        try:
+            return self._object_info.object_type == ObjectType.DIRECTORY
+        except DatabricksError:
+            return False
 
     def is_file(self):
-        return self._object_info.object_type == ObjectType.FILE
+        try:
+            return self._object_info.object_type == ObjectType.FILE
+        except DatabricksError:
+            return False
 
     def is_notebook(self):
-        return self._object_info.object_type == ObjectType.NOTEBOOK
+        try:
+            return self._object_info.object_type == ObjectType.NOTEBOOK
+        except DatabricksError:
+            return False
 
     def __eq__(self, other):
         return isinstance(other, Path) and self.as_posix() == other.as_posix()
```
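
The recurring `try/except DatabricksError` exists because `_object_info` is resolved lazily against the Workspace API, so a nonexistent path only fails when first touched; mirroring `pathlib.Path`, the existence-style checks now answer `False` instead of raising. In miniature, with the plain SDK (illustrative, not UCX code):

```python
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import NotFound  # NotFound subclasses DatabricksError

ws = WorkspaceClient()
try:
    info = ws.workspace.get_status("/no/such/path")  # fails lazily, on the API call
except NotFound:
    info = None
print(info is None)  # True: the wrapped checks turn this failure into False
```
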
2 changes: 2 additions & 0 deletions src/databricks/labs/ucx/runtime.py
```diff
@@ -15,6 +15,7 @@
     MigrateHiveSerdeTablesInPlace,
     MigrateExternalTablesCTAS,
 )
+from databricks.labs.ucx.source_code.workflows import ExperimentalWorkflowLinter
 from databricks.labs.ucx.workspace_access.workflows import (
     GroupMigration,
     PermissionsMigrationAPI,
@@ -50,6 +51,7 @@ def all(cls):
                 RemoveWorkspaceLocalGroups(),
                 MigrateTablesInMounts(),
                 PermissionsMigrationAPI(),
+                ExperimentalWorkflowLinter(),
                 Failing(),
             ]
         )
```
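
Registering `ExperimentalWorkflowLinter()` makes the linter deployable and schedulable like any other ucx workflow. Its shape, sketched from ucx's `Workflow`/`job_task` pattern — treat the import path, task name, and `refresh_report` call as assumptions rather than verbatim code from this PR:

```python
from databricks.labs.ucx.framework.tasks import Workflow, job_task  # assumed location


class ExperimentalWorkflowLinter(Workflow):
    def __init__(self):
        super().__init__("experimental-workflow-linter")

    @job_task
    def lint_all_workflows(self, ctx):
        # assumed: walk every job, build dependency graphs, persist JobProblem rows
        ctx.workflow_linter.refresh_report(ctx.sql_backend, ctx.inventory_database)
```
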
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/source_code/dbfs.py
```diff
@@ -79,7 +79,7 @@ def lint(self, code: str) -> Iterable[Advice]:
         tree = ast.parse(code)
         visitor = DetectDbfsVisitor()
         visitor.visit(tree)
-        return visitor.get_advices()
+        yield from visitor.get_advices()
 
 
 class FromDbfsFolder(Linter):
```
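
The one-line change turns `lint` into a generator: both forms satisfy `Iterable[Advice]`, but `return` hands back the visitor's list eagerly, while `yield from` defers the body until iteration and keeps the return type uniform with the other generator-based linters. The difference in miniature (generic sketch, not UCX code):

```python
from collections.abc import Iterable


def lint_eager(advices: list[str]) -> Iterable[str]:
    return advices  # returns the list object itself


def lint_lazy(advices: list[str]) -> Iterable[str]:
    yield from advices  # returns a generator; the body runs on iteration


print(list(lint_eager(["advice"])), list(lint_lazy(["advice"])))  # same elements
```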