Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more checks for spark-connect linter #2092

Merged
merged 3 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions src/databricks/labs/ucx/source_code/linters/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@


class LinterContext:
def __init__(self, index: MigrationIndex | None = None, session_state: CurrentSessionState | None = None):
def __init__(
self,
index: MigrationIndex | None = None,
session_state: CurrentSessionState | None = None,
dbr_version: tuple[int, int] | None = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make dbr version a property of CurrentSessionState as well. we'll have few more and we should have just one container to pass along, not many parameters we may forget about.

is_serverless: bool = False,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move is_serverless to CurrentSessionState? also, please modify the jobs.py to lint serverless jobs properly

):
self._index = index
session_state = CurrentSessionState() if not session_state else session_state

Expand All @@ -41,8 +47,8 @@ def __init__(self, index: MigrationIndex | None = None, session_state: CurrentSe

python_linters += [
DBFSUsageLinter(session_state),
DBRv8d0Linter(dbr_version=None),
SparkConnectLinter(is_serverless=False),
DBRv8d0Linter(dbr_version=dbr_version),
SparkConnectLinter(dbr_version=dbr_version, is_serverless=is_serverless),
DbutilsLinter(session_state),
]
sql_linters.append(FromDbfsFolder())
Expand Down
10 changes: 10 additions & 0 deletions src/databricks/labs/ucx/source_code/linters/python_ast.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,16 @@ def __repr__(self):
def get_full_attribute_name(cls, node: Attribute) -> str:
return cls._get_attribute_value(node)

@classmethod
def get_function_name(cls, node: Call) -> str | None:
if not isinstance(node, Call):
return None
if isinstance(node.func, Attribute):
return node.func.attrname
if isinstance(node.func, Name):
return node.func.name
return None

@classmethod
def get_full_function_name(cls, node: Call) -> str | None:
if not isinstance(node, Call):
Expand Down
74 changes: 72 additions & 2 deletions src/databricks/labs/ucx/source_code/linters/spark_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from collections.abc import Iterator
from dataclasses import dataclass

from astroid import Attribute, Call, Name, NodeNG # type: ignore
from astroid import Attribute, Call, Const, Name, NodeNG # type: ignore
from databricks.labs.ucx.source_code.base import (
Advice,
Failure,
Expand Down Expand Up @@ -172,14 +172,84 @@ def _match_jvm_log(self, node: NodeNG) -> Iterator[Advice]:
)


@dataclass
class UDFMatcher(SharedClusterMatcher):
_DBR_14_2_BELOW_NOT_SUPPORTED = ["applyInPandas", "mapInPandas", "applyInPandasWithState", "udtf", "pandas_udf"]

dbr_version: tuple[int, int] | None

def lint(self, node: NodeNG) -> Iterator[Advice]:
if not isinstance(node, Call):
return
function_name = Tree.get_function_name(node)

if function_name == 'registerJavaFunction':
yield Failure.from_node(
code='python-udf-in-shared-clusters',
message=f'Cannot register Java UDF from Python code on {self._cluster_type_str()}. '
f'Use a %scala cell to register the Scala UDF using spark.udf.register.',
node=node,
)

if (
function_name in UDFMatcher._DBR_14_2_BELOW_NOT_SUPPORTED
and self.dbr_version
and self.dbr_version < (14, 3)
):
yield Failure.from_node(
code='python-udf-in-shared-clusters',
message=f'{function_name} require DBR 14.3 LTS or above on {self._cluster_type_str()}',
node=node,
)

if function_name == 'udf' and self.dbr_version and self.dbr_version < (14, 3):
for keyword in node.keywords:
if keyword.arg == 'useArrow' and isinstance(keyword.value, Const) and keyword.value.value:
yield Failure.from_node(
code='python-udf-in-shared-clusters',
message=f'Arrow UDFs require DBR 14.3 LTS or above on {self._cluster_type_str()}',
node=node,
)


class CatalogApiMatcher(SharedClusterMatcher):
def lint(self, node: NodeNG) -> Iterator[Advice]:
if not isinstance(node, Attribute):
return
if node.attrname == 'catalog' and Tree.get_full_attribute_name(node).endswith('spark.catalog'):
yield Failure.from_node(
code='catalog-api-in-shared-clusters',
message=f'spark.catalog functions require DBR 14.3 LTS or above on {self._cluster_type_str()}',
node=node,
)


class CommandContextMatcher(SharedClusterMatcher):
def lint(self, node: NodeNG) -> Iterator[Advice]:
if not isinstance(node, Call):
return
function_name = Tree.get_full_function_name(node)
if function_name and function_name.endswith('getContext.toJson'):
yield Failure.from_node(
code='toJson-in-shared-clusters',
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use lowercase for codes, as i suspect that's how LSP would normalise everything.

message=f'toJson() is not available on {self._cluster_type_str()}. '
f'Use toSafeJson() on DBR 13.3 LTS or above to get a subset of command context information.',
node=node,
)


class SparkConnectLinter(PythonLinter):
def __init__(self, is_serverless: bool = False):
def __init__(self, dbr_version: tuple[int, int] | None = None, is_serverless: bool = False):
self._matchers = [
JvmAccessMatcher(is_serverless=is_serverless),
RDDApiMatcher(is_serverless=is_serverless),
SparkSqlContextMatcher(is_serverless=is_serverless),
LoggingMatcher(is_serverless=is_serverless),
UDFMatcher(is_serverless=is_serverless, dbr_version=dbr_version),
CommandContextMatcher(is_serverless=is_serverless),
]
if dbr_version and dbr_version < (14, 3):
self._matchers.append(CatalogApiMatcher(is_serverless=is_serverless))

def lint_tree(self, tree: Tree) -> Iterator[Advice]:
for matcher in self._matchers:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# ucx[catalog-api-in-shared-clusters:+1:0:+1:13] spark.catalog functions require DBR 14.3 LTS or above on UC Shared Clusters
spark.catalog.tableExists("table")
# ucx[catalog-api-in-shared-clusters:+1:0:+1:13] spark.catalog functions require DBR 14.3 LTS or above on UC Shared Clusters
spark.catalog.listDatabases()


def catalog():
nfx marked this conversation as resolved.
Show resolved Hide resolved
pass


catalog()
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
spark.catalog.tableExists("table")
spark.catalog.listDatabases()


def catalog():
pass


catalog()
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# ucx[toJson-in-shared-clusters:+1:6:+1:80] toJson() is not available on UC Shared Clusters. Use toSafeJson() on DBR 13.3 LTS or above to get a subset of command context information.
print(dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson())
dbutils.notebook.entry_point.getDbutils().notebook().getContext().toSafeJson()
notebook = dbutils.notebook.entry_point.getDbutils().notebook()
# ucx[toJson-in-shared-clusters:+1:0:+1:30] toJson() is not available on UC Shared Clusters. Use toSafeJson() on DBR 13.3 LTS or above to get a subset of command context information.
notebook.getContext().toJson()
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from pyspark.sql.functions import udf, udtf, lit
import pandas as pd


@udf(returnType='int')
def slen(s):
return len(s)


# ucx[python-udf-in-shared-clusters:+1:1:+1:37] Arrow UDFs require DBR 14.3 LTS or above on UC Shared Clusters
@udf(returnType='int', useArrow=True)
def arrow_slen(s):
return len(s)


df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
df.select(slen("name"), arrow_slen("name")).show()

slen1 = udf(lambda s: len(s), returnType='int')
# ucx[python-udf-in-shared-clusters:+1:14:+1:68] Arrow UDFs require DBR 14.3 LTS or above on UC Shared Clusters
arrow_slen1 = udf(lambda s: len(s), returnType='int', useArrow=True)

df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))

df.select(slen1("name"), arrow_slen1("name")).show()

df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))


def subtract_mean(pdf: pd.DataFrame) -> pd.DataFrame:
v = pdf.v
return pdf.assign(v=v - v.mean())


# ucx[python-udf-in-shared-clusters:+1:0:+1:73] applyInPandas require DBR 14.3 LTS or above on UC Shared Clusters
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()


class SquareNumbers:
def eval(self, start: int, end: int):
for num in range(start, end + 1):
yield (num, num * num)


# ucx[python-udf-in-shared-clusters:+1:13:+1:69] udtf require DBR 14.3 LTS or above on UC Shared Clusters
square_num = udtf(SquareNumbers, returnType="num: int, squared: int")
square_num(lit(1), lit(3)).show()

from pyspark.sql.types import IntegerType

# ucx[python-udf-in-shared-clusters:+1:0:+1:73] Cannot register Java UDF from Python code on UC Shared Clusters. Use a %scala cell to register the Scala UDF using spark.udf.register.
spark.udf.registerJavaFunction("func", "org.example.func", IntegerType())
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from pyspark.sql.functions import udf, udtf, lit
import pandas as pd


@udf(returnType='int')
def slen(s):
return len(s)


@udf(returnType='int', useArrow=True)
def arrow_slen(s):
return len(s)


df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
df.select(slen("name"), arrow_slen("name")).show()

slen1 = udf(lambda s: len(s), returnType='int')
arrow_slen1 = udf(lambda s: len(s), returnType='int', useArrow=True)

df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))

df.select(slen1("name"), arrow_slen1("name")).show()

df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))


def subtract_mean(pdf: pd.DataFrame) -> pd.DataFrame:
v = pdf.v
return pdf.assign(v=v - v.mean())


df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()

from pyspark.sql.types import IntegerType

# ucx[python-udf-in-shared-clusters:+1:0:+1:73] Cannot register Java UDF from Python code on UC Shared Clusters. Use a %scala cell to register the Scala UDF using spark.udf.register.
spark.udf.registerJavaFunction("func", "org.example.func", IntegerType())
11 changes: 10 additions & 1 deletion tests/unit/source_code/test_functional.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ class Functional:
)
_location = Path(__file__).parent / 'samples/functional'

TEST_DBR_VERSION = {
'python-udfs_13_3.py': (13, 3),
Copy link
Collaborator

@nfx nfx Jul 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

files can be moved and renamed by people unaware of this property.

don't you want to introduce # ucx session state: {"dbr_version": [13, 3]} / # ucx session state: {"is_serverless": true} meta-comments, where you parse JSON and set override the current session state?.. make sure to validate this instruction appears only once per file, because of the way we initialise linters.

technically, we can make it into processing instrustion and be in the parser flow, but it's more complicated.

'catalog-api_13_3.py': (13, 3),
'python-udfs_14_3.py': (14, 3),
'catalog-api_14_3.py': (14, 3),
}

@classmethod
def all(cls) -> list['Functional']:
return [Functional(_) for _ in cls._location.glob('**/*.py')]
Expand Down Expand Up @@ -104,7 +111,9 @@ def _lint(self) -> Iterable[Advice]:
)
session_state = CurrentSessionState()
session_state.named_parameters = {"my-widget": "my-path.py"}
ctx = LinterContext(migration_index, session_state)
ctx = LinterContext(
migration_index, session_state, dbr_version=Functional.TEST_DBR_VERSION.get(self.path.name, None)
)
linter = FileLinter(ctx, self.path)
return linter.lint()

Expand Down
Loading