Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions python/pyspark/sql/tests/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,9 @@ def test_list_functions(self):

with self.function("func1", "some_db.func2"):
spark.catalog.registerFunction("temp_func", lambda x: str(x))
spark.sql("CREATE FUNCTION func1 AS 'org.apache.spark.data.bricks'")
spark.sql("CREATE FUNCTION some_db.func2 AS 'org.apache.spark.data.bricks'")
spark.sql("CREATE FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'")
spark.sql(
"CREATE FUNCTION some_db.func2 AS 'test.org.apache.spark.sql.MyDoubleAvg'")
newFunctions = dict((f.name, f) for f in spark.catalog.listFunctions())
newFunctionsSomeDb = \
dict((f.name, f) for f in spark.catalog.listFunctions("some_db"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1359,16 +1359,24 @@ class SessionCatalog(
val info = new ExpressionInfo(funcDefinition.className, func.database.orNull, func.funcName)
val builder =
functionBuilder.getOrElse {
val className = funcDefinition.className
if (!Utils.classIsLoadable(className)) {
throw new AnalysisException(s"Can not load class '$className' when registering " +
s"the function '$func', please make sure it is on the classpath")
}
makeFunctionBuilder(func.unquotedString, className)
requireFunctionClassExists(funcDefinition)
makeFunctionBuilder(func.unquotedString, funcDefinition.className)
}
functionRegistry.registerFunction(func, info, builder)
}

/**
* Make sure function class is on the classpath.
*/
def requireFunctionClassExists(funcDefinition: CatalogFunction): Unit = {
val className = funcDefinition.className
if (!Utils.classIsLoadable(className)) {
throw new AnalysisException(s"Can not load class '$className' when registering " +
s"the function '${funcDefinition.identifier.unquotedString}', please make sure " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one last thing: shall we fill the default database before putting the function identifier in the error message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed.

This method is used by both temporary and permanent function. The temporary has no database name and we can't fill the database name. The permanent follows user created.

s"it is on the classpath")
}
}

/**
* Unregister a temporary or permanent function from a session-specific [[FunctionRegistry]]
* Return true if function exists.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,20 @@ case class CreateFunctionCommand(

override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val func = CatalogFunction(FunctionIdentifier(functionName, databaseName), className, resources)
catalog.loadFunctionResources(resources)
if (isTemp) {
// We first load resources and then put the builder in the function registry.
catalog.loadFunctionResources(resources)
val func = CatalogFunction(FunctionIdentifier(functionName, databaseName),
className, resources)
catalog.registerFunction(func, overrideIfExists = replace)
} else {
// For a permanent, we fill database name first.
val func = CatalogFunction(FunctionIdentifier(functionName,
Some(databaseName.getOrElse(catalog.getCurrentDatabase))), className, resources)
// We fail fast if function class is not exists.
catalog.requireFunctionClassExists(func)
// Handles `CREATE OR REPLACE FUNCTION AS ... USING ...`
if (replace && catalog.functionExists(func.identifier)) {
// alter the function in the metastore
// Alter the function in the metastore
catalog.alterFunction(func)
} else {
// For a permanent, we will store the metadata into underlying external catalog.
Expand Down
7 changes: 5 additions & 2 deletions sql/core/src/test/resources/sql-tests/results/udaf.sql.out
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ CREATE FUNCTION udaf1 AS 'test.non.existent.udaf'
-- !query schema
struct<>
-- !query output

org.apache.spark.sql.AnalysisException
Can not load class 'test.non.existent.udaf' when registering the function 'default.udaf1', please make sure it is on the classpath;


-- !query
Expand All @@ -51,7 +52,7 @@ SELECT default.udaf1(int_col1) as udaf1 from t1
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Can not load class 'test.non.existent.udaf' when registering the function 'default.udaf1', please make sure it is on the classpath; line 1 pos 7
Undefined function: 'udaf1'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 7


-- !query
Expand All @@ -67,4 +68,6 @@ DROP FUNCTION udaf1
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
Function 'default.udaf1' not found in database 'default';

Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ CREATE FUNCTION udaf1 AS 'test.non.existent.udaf'
-- !query schema
struct<>
-- !query output

org.apache.spark.sql.AnalysisException
Can not load class 'test.non.existent.udaf' when registering the function 'default.udaf1', please make sure it is on the classpath;


-- !query
Expand All @@ -51,7 +52,7 @@ SELECT default.udaf1(udf(int_col1)) as udaf1, udf(default.udaf1(udf(int_col1)))
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Can not load class 'test.non.existent.udaf' when registering the function 'default.udaf1', please make sure it is on the classpath; line 1 pos 94
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here it was 'default.udaf1' previously. But the new message is 'udaf1': https://github.com/apache/spark/pull/29502/files#diff-2d3d6b47e6044b4d44590c5a73b7cd8bR46

Do you know why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For permanent, the old code path is:

create function // no class check
query on function -> lookup function -> fill database name if permanent -> register function -> check class

So the previously msg always contains database name.

Now the code path is:

create function -> check class

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then shall we fill in the default database when creating permanent functions, to make the error message better?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Permanent function always need a database, it's ok to fill database name when creating.

Undefined function: 'udaf1'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 7


-- !query
Expand All @@ -67,4 +68,6 @@ DROP FUNCTION udaf1
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
Function 'default.udaf1' not found in database 'default';

Original file line number Diff line number Diff line change
Expand Up @@ -3176,6 +3176,21 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}
}

test("SPARK-32677: Load function resource before create") {
withUserDefinedFunction("func1" -> false) {
val func = FunctionIdentifier("func1", Some("default"))
val msg = intercept[AnalysisException] {
sql("CREATE FUNCTION func1 AS 'test.non.exists.udf'")
}.getMessage
assert(msg.contains("please make sure it is on the classpath"))
assert(!spark.sessionState.catalog.functionExists(func))
assert(!spark.sessionState.catalog.isRegisteredFunction(func))

sql("CREATE FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'")
assert(spark.sessionState.catalog.functionExists(func))
}
}
}

object FakeLocalFsFileSystem {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,9 @@ class HiveUDFDynamicLoadSuite extends QueryTest with SQLTestUtils with TestHiveS

sql(s"CREATE FUNCTION ${udfInfo.funcName} AS '${udfInfo.className}' USING JAR '$jarUrl'")

assert(Thread.currentThread().getContextClassLoader eq sparkClassLoader)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need load resource during create function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test was created in #27025.

assert(Thread.currentThread().getContextClassLoader ne sparkClassLoader)
assert(Thread.currentThread().getContextClassLoader eq
spark.sqlContext.sharedState.jarClassLoader)

// JAR will be loaded at first usage, and it will change the current thread's
// context classloader to jar classloader in sharedState.
Expand Down