diff --git a/python/pyspark/sql/tests/test_catalog.py b/python/pyspark/sql/tests/test_catalog.py
index 141b249db0fc6..4d52c7012a8f5 100644
--- a/python/pyspark/sql/tests/test_catalog.py
+++ b/python/pyspark/sql/tests/test_catalog.py
@@ -128,8 +128,9 @@ def test_list_functions(self):
         with self.function("func1", "some_db.func2"):
             spark.catalog.registerFunction("temp_func", lambda x: str(x))
-            spark.sql("CREATE FUNCTION func1 AS 'org.apache.spark.data.bricks'")
-            spark.sql("CREATE FUNCTION some_db.func2 AS 'org.apache.spark.data.bricks'")
+            spark.sql("CREATE FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'")
+            spark.sql(
+                "CREATE FUNCTION some_db.func2 AS 'test.org.apache.spark.sql.MyDoubleAvg'")
             newFunctions = dict((f.name, f) for f in spark.catalog.listFunctions())
             newFunctionsSomeDb = \
                 dict((f.name, f) for f in spark.catalog.listFunctions("some_db"))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 6fba3156c3919..d0282f785463b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -1359,16 +1359,24 @@ class SessionCatalog(
     val info = new ExpressionInfo(funcDefinition.className, func.database.orNull, func.funcName)
     val builder =
       functionBuilder.getOrElse {
-        val className = funcDefinition.className
-        if (!Utils.classIsLoadable(className)) {
-          throw new AnalysisException(s"Can not load class '$className' when registering " +
-            s"the function '$func', please make sure it is on the classpath")
-        }
-        makeFunctionBuilder(func.unquotedString, className)
+        requireFunctionClassExists(funcDefinition)
+        makeFunctionBuilder(func.unquotedString, funcDefinition.className)
       }
     functionRegistry.registerFunction(func, info, builder)
   }
 
+  /**
+   * Make sure the function class is on the classpath.
+   */
+  def requireFunctionClassExists(funcDefinition: CatalogFunction): Unit = {
+    val className = funcDefinition.className
+    if (!Utils.classIsLoadable(className)) {
+      throw new AnalysisException(s"Can not load class '$className' when registering " +
+        s"the function '${funcDefinition.identifier.unquotedString}', please make sure " +
+        s"it is on the classpath")
+    }
+  }
+
   /**
    * Unregister a temporary or permanent function from a session-specific [[FunctionRegistry]]
    * Return true if function exists.
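For reference, the extracted check amounts to probing whether the class resolves on the current classpath before anything is registered. A minimal standalone sketch of the same idea; Spark's own `Utils.classIsLoadable` essentially wraps `Class.forName` like this, but the names below are illustrative, not Spark API:

```scala
import scala.util.Try

// Standalone sketch of the probe behind requireFunctionClassExists.
object FunctionClassCheck {
  def classIsLoadable(className: String): Boolean =
    // initialize = false: resolving the class is enough; we do not want to
    // run static initializers just to validate a CREATE FUNCTION statement.
    Try(Class.forName(className, false,
      Thread.currentThread().getContextClassLoader)).isSuccess

  def requireClassExists(className: String, funcName: String): Unit = {
    if (!classIsLoadable(className)) {
      throw new IllegalArgumentException(
        s"Can not load class '$className' when registering the function '$funcName', " +
          "please make sure it is on the classpath")
    }
  }
}

// e.g. FunctionClassCheck.classIsLoadable("java.lang.String") == true
```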
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index fae8de4780102..af485c0b1d491 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -75,15 +75,20 @@ case class CreateFunctionCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    val func = CatalogFunction(FunctionIdentifier(functionName, databaseName), className, resources)
+    catalog.loadFunctionResources(resources)
     if (isTemp) {
-      // We first load resources and then put the builder in the function registry.
-      catalog.loadFunctionResources(resources)
+      val func = CatalogFunction(FunctionIdentifier(functionName, databaseName),
+        className, resources)
       catalog.registerFunction(func, overrideIfExists = replace)
     } else {
+      // For a permanent function, we fill in the database name first.
+      val func = CatalogFunction(FunctionIdentifier(functionName,
+        Some(databaseName.getOrElse(catalog.getCurrentDatabase))), className, resources)
+      // We fail fast if the function class does not exist.
+      catalog.requireFunctionClassExists(func)
       // Handles `CREATE OR REPLACE FUNCTION AS ... USING ...`
       if (replace && catalog.functionExists(func.identifier)) {
-        // alter the function in the metastore
+        // Alter the function in the metastore
         catalog.alterFunction(func)
       } else {
         // For a permanent, we will store the metadata into underlying external catalog.
diff --git a/sql/core/src/test/resources/sql-tests/results/udaf.sql.out b/sql/core/src/test/resources/sql-tests/results/udaf.sql.out
index 9f4229a11b65d..3bed62f220af1 100644
--- a/sql/core/src/test/resources/sql-tests/results/udaf.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udaf.sql.out
@@ -42,7 +42,8 @@ CREATE FUNCTION udaf1 AS 'test.non.existent.udaf'
 -- !query schema
 struct<>
 -- !query output
-
+org.apache.spark.sql.AnalysisException
+Can not load class 'test.non.existent.udaf' when registering the function 'default.udaf1', please make sure it is on the classpath;
 
 
 -- !query
@@ -51,7 +52,7 @@ SELECT default.udaf1(int_col1) as udaf1 from t1
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Can not load class 'test.non.existent.udaf' when registering the function 'default.udaf1', please make sure it is on the classpath; line 1 pos 7
+Undefined function: 'udaf1'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 7
 
 
 -- !query
@@ -67,4 +68,6 @@ DROP FUNCTION udaf1
 -- !query schema
 struct<>
 -- !query output
+org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
+Function 'default.udaf1' not found in database 'default';
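Taken together, the command now validates the class before writing anything, which is exactly the shift the golden file above records: the error moves from first use to `CREATE FUNCTION` time. A rough end-to-end illustration of the behavior change, assuming a running `SparkSession` named `spark`; the bogus class name is the same placeholder the golden files use:

```scala
import org.apache.spark.sql.AnalysisException

// Before this change the CREATE below succeeded and the missing class only
// surfaced when the function was first resolved in a query; now the CREATE
// itself fails and nothing is written to the catalog or the registry.
try {
  spark.sql("CREATE FUNCTION udaf1 AS 'test.non.existent.udaf'")
} catch {
  case e: AnalysisException =>
    assert(e.getMessage.contains("please make sure it is on the classpath"))
}

// Because nothing was persisted, a later reference now reports an undefined
// function instead of a class-loading failure, e.g.:
//   SELECT default.udaf1(int_col1) FROM t1
//   => Undefined function: 'udaf1'. ...
```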
diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-udaf.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-udaf.sql.out
index 19221947b4a88..d1cce8b7b79f2 100644
--- a/sql/core/src/test/resources/sql-tests/results/udf/udf-udaf.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-udaf.sql.out
@@ -42,7 +42,8 @@ CREATE FUNCTION udaf1 AS 'test.non.existent.udaf'
 -- !query schema
 struct<>
 -- !query output
-
+org.apache.spark.sql.AnalysisException
+Can not load class 'test.non.existent.udaf' when registering the function 'default.udaf1', please make sure it is on the classpath;
 
 
 -- !query
@@ -51,7 +52,7 @@ SELECT default.udaf1(udf(int_col1)) as udaf1, udf(default.udaf1(udf(int_col1)))
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Can not load class 'test.non.existent.udaf' when registering the function 'default.udaf1', please make sure it is on the classpath; line 1 pos 94
+Undefined function: 'udaf1'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 7
 
 
 -- !query
@@ -67,4 +68,6 @@ DROP FUNCTION udaf1
 -- !query schema
 struct<>
 -- !query output
+org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
+Function 'default.udaf1' not found in database 'default';
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index b8ac5079b7745..4e3675ca0e238 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -3176,6 +3176,21 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  test("SPARK-32677: Load function resource before create") {
+    withUserDefinedFunction("func1" -> false) {
+      val func = FunctionIdentifier("func1", Some("default"))
+      val msg = intercept[AnalysisException] {
+        sql("CREATE FUNCTION func1 AS 'test.non.exists.udf'")
+      }.getMessage
+      assert(msg.contains("please make sure it is on the classpath"))
+      assert(!spark.sessionState.catalog.functionExists(func))
+      assert(!spark.sessionState.catalog.isRegisteredFunction(func))
+
+      sql("CREATE FUNCTION func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'")
+      assert(spark.sessionState.catalog.functionExists(func))
+    }
+  }
 }
 
 object FakeLocalFsFileSystem {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUDFDynamicLoadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUDFDynamicLoadSuite.scala
index ee8e6f4f78be5..cd8c02b405e7b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUDFDynamicLoadSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUDFDynamicLoadSuite.scala
@@ -156,7 +156,9 @@ class HiveUDFDynamicLoadSuite extends QueryTest with SQLTestUtils with TestHiveS
       sql(s"CREATE FUNCTION ${udfInfo.funcName} AS '${udfInfo.className}' USING JAR '$jarUrl'")
 
-      assert(Thread.currentThread().getContextClassLoader eq sparkClassLoader)
+      assert(Thread.currentThread().getContextClassLoader ne sparkClassLoader)
+      assert(Thread.currentThread().getContextClassLoader eq
+        spark.sqlContext.sharedState.jarClassLoader)
 
       // JAR will be loaded at first usage, and it will change the current thread's
       // context classloader to jar classloader in sharedState.
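The flipped assertion reflects the eager load: the JAR is now added at `CREATE FUNCTION ... USING JAR` time, so the thread's context classloader has already switched to the shared JAR classloader before the function is ever invoked. A sketch of checking this, assuming it runs inside a Spark test suite under `org.apache.spark.sql` (`SharedState` is `private[sql]`); `jarUrl` and the UDF class name are placeholders:

```scala
// Sketch: observe the context-classloader switch caused by an eager
// CREATE FUNCTION ... USING JAR. jarUrl and com.example.MyUDF are placeholders.
val jarUrl = "file:///tmp/my-udf.jar"
val loaderBefore = Thread.currentThread().getContextClassLoader

spark.sql(s"CREATE FUNCTION my_udf AS 'com.example.MyUDF' USING JAR '$jarUrl'")

// The JAR is registered at CREATE time, so the thread is already on the
// shared JAR classloader; previously this happened only on first use.
val loaderAfter = Thread.currentThread().getContextClassLoader
assert(loaderAfter ne loaderBefore)
assert(loaderAfter eq spark.sqlContext.sharedState.jarClassLoader)
```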