diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InvalidUDFClassException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InvalidUDFClassException.scala new file mode 100644 index 000000000000..bc02efd5113c --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InvalidUDFClassException.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.catalog + +import org.apache.spark.sql.AnalysisException + +/** + * Thrown when a query failed for invalid function class, usually because a SQL + * function's class does not follow the rules of the UDF/UDAF/UDTF class definition. + */ +class InvalidUDFClassException private[sql](message: String) + extends AnalysisException(message, None, None, None, None) { +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index b79857cdccd2..12c8a64b1b60 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -1304,7 +1304,7 @@ class SessionCatalog( } e } else { - throw new AnalysisException(s"No handler for UDAF '${clazz.getCanonicalName}'. " + + throw new InvalidUDFClassException(s"No handler for UDAF '${clazz.getCanonicalName}'. " + s"Use sparkSession.udf.register(...) instead.") } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index bc7760c982aa..f24834b938a1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -30,7 +30,7 @@ import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, Gener import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.FunctionRegistry -import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, ExternalCatalog, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog} +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{Cast, Expression} import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper @@ -57,6 +57,56 @@ private[sql] class HiveSessionCatalog( parser, functionResourceLoader) { + private def makeHiveFunctionExpression( + name: String, + clazz: Class[_], + input: Seq[Expression]): Expression = { + var udfExpr: Option[Expression] = None + try { + // When we instantiate hive UDF wrapper class, we may throw exception if the input + // expressions don't satisfy the hive UDF, such as type mismatch, input number + // mismatch, etc. Here we catch the exception and throw AnalysisException instead. + if (classOf[UDF].isAssignableFrom(clazz)) { + udfExpr = Some(HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), input)) + udfExpr.get.dataType // Force it to check input data types. + } else if (classOf[GenericUDF].isAssignableFrom(clazz)) { + udfExpr = Some(HiveGenericUDF(name, new HiveFunctionWrapper(clazz.getName), input)) + udfExpr.get.dataType // Force it to check input data types. + } else if (classOf[AbstractGenericUDAFResolver].isAssignableFrom(clazz)) { + udfExpr = Some(HiveUDAFFunction(name, new HiveFunctionWrapper(clazz.getName), input)) + udfExpr.get.dataType // Force it to check input data types. + } else if (classOf[UDAF].isAssignableFrom(clazz)) { + udfExpr = Some(HiveUDAFFunction( + name, + new HiveFunctionWrapper(clazz.getName), + input, + isUDAFBridgeRequired = true)) + udfExpr.get.dataType // Force it to check input data types. + } else if (classOf[GenericUDTF].isAssignableFrom(clazz)) { + udfExpr = Some(HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), input)) + // Force it to check data types. + udfExpr.get.asInstanceOf[HiveGenericUDTF].elementSchema + } + } catch { + case NonFatal(e) => + val noHandlerMsg = s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}': $e" + val errorMsg = + if (classOf[GenericUDTF].isAssignableFrom(clazz)) { + s"$noHandlerMsg\nPlease make sure your function overrides " + + "`public StructObjectInspector initialize(ObjectInspector[] args)`." + } else { + noHandlerMsg + } + val analysisException = new AnalysisException(errorMsg) + analysisException.setStackTrace(e.getStackTrace) + throw analysisException + } + udfExpr.getOrElse { + throw new InvalidUDFClassException( + s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}'") + } + } + /** * Constructs a [[Expression]] based on the provided class that represents a function. * @@ -69,49 +119,14 @@ private[sql] class HiveSessionCatalog( // Current thread context classloader may not be the one loaded the class. Need to switch // context classloader to initialize instance properly. Utils.withContextClassLoader(clazz.getClassLoader) { - Try(super.makeFunctionExpression(name, clazz, input)).getOrElse { - var udfExpr: Option[Expression] = None - try { - // When we instantiate hive UDF wrapper class, we may throw exception if the input - // expressions don't satisfy the hive UDF, such as type mismatch, input number - // mismatch, etc. Here we catch the exception and throw AnalysisException instead. - if (classOf[UDF].isAssignableFrom(clazz)) { - udfExpr = Some(HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), input)) - udfExpr.get.dataType // Force it to check input data types. - } else if (classOf[GenericUDF].isAssignableFrom(clazz)) { - udfExpr = Some(HiveGenericUDF(name, new HiveFunctionWrapper(clazz.getName), input)) - udfExpr.get.dataType // Force it to check input data types. - } else if (classOf[AbstractGenericUDAFResolver].isAssignableFrom(clazz)) { - udfExpr = Some(HiveUDAFFunction(name, new HiveFunctionWrapper(clazz.getName), input)) - udfExpr.get.dataType // Force it to check input data types. - } else if (classOf[UDAF].isAssignableFrom(clazz)) { - udfExpr = Some(HiveUDAFFunction( - name, - new HiveFunctionWrapper(clazz.getName), - input, - isUDAFBridgeRequired = true)) - udfExpr.get.dataType // Force it to check input data types. - } else if (classOf[GenericUDTF].isAssignableFrom(clazz)) { - udfExpr = Some(HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), input)) - udfExpr.get.asInstanceOf[HiveGenericUDTF].elementSchema // Force it to check data types. - } - } catch { - case NonFatal(e) => - val noHandlerMsg = s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}': $e" - val errorMsg = - if (classOf[GenericUDTF].isAssignableFrom(clazz)) { - s"$noHandlerMsg\nPlease make sure your function overrides " + - "`public StructObjectInspector initialize(ObjectInspector[] args)`." - } else { - noHandlerMsg - } - val analysisException = new AnalysisException(errorMsg) - analysisException.setStackTrace(e.getStackTrace) - throw analysisException - } - udfExpr.getOrElse { - throw new AnalysisException(s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}'") - } + try { + super.makeFunctionExpression(name, clazz, input) + } catch { + // If `super.makeFunctionExpression` throw `InvalidUDFClassException`, we construct + // Hive UDF/UDAF/UDTF with function definition. Otherwise, we just throw it earlier. + case _: InvalidUDFClassException => + makeHiveFunctionExpression(name, clazz, input) + case e => throw e } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala index 9e33a8ee4cc5..ed44dcd8d7a2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala @@ -161,6 +161,20 @@ class HiveUDAFSuite extends QueryTest checkAnswer(sql("select histogram_numeric(a,2) from abc where a=3"), Row(null)) } } + + test("SPARK-32243: Spark UDAF Invalid arguments number error should throw earlier") { + // func need two arguments + val functionName = "longProductSum" + val functionClass = "org.apache.spark.sql.hive.execution.LongProductSum" + withUserDefinedFunction(functionName -> true) { + sql(s"CREATE TEMPORARY FUNCTION $functionName AS '$functionClass'") + val e = intercept[AnalysisException] { + sql(s"SELECT $functionName(100)") + }.getMessage + assert(e.contains( + s"Invalid number of arguments for function $functionName. Expected: 2; Found: 1;")) + } + } } /**