From 2d586a80e6f67e3adac17a8c1b5d09fbbf4252d5 Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Sat, 29 Jul 2017 12:01:34 +0900
Subject: [PATCH 1/3] Print warning messages when an overridden `configure`
 function is found

---
 .../spark/sql/hive/HiveSessionCatalog.scala   |  4 ++-
 .../org/apache/spark/sql/hive/HiveShim.scala  | 34 ++++++++++++++++---
 2 files changed, 33 insertions(+), 5 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 0d0269f694300..78cff99f1edb4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
+import org.apache.spark.sql.hive.HiveShim._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DecimalType, DoubleType}
 import org.apache.spark.util.Utils
@@ -66,6 +66,8 @@ private[sql] class HiveSessionCatalog(
    * Construct a [[FunctionBuilder]] based on the provided class that represents a function.
    */
   private def makeFunctionBuilder(name: String, clazz: Class[_]): FunctionBuilder = {
+    validateHiveUserDefinedFunction(clazz)
+
     // When we instantiate hive UDF wrapper class, we may throw exception if the input
     // expressions don't satisfy the hive UDF, such as type mismatch, input number
     // mismatch, etc. Here we catch the exception and throw AnalysisException instead.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
index 9e9894803ce25..a2c6b86d2c95e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
@@ -18,19 +18,20 @@ package org.apache.spark.sql.hive
 
 import java.io.{InputStream, OutputStream}
+import java.lang.reflect.Type
 import java.rmi.server.UID
 
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
-import scala.reflect.ClassTag
+import scala.reflect.{classTag, ClassTag}
 
 import com.google.common.base.Objects
 import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.ql.exec.{UDF, Utilities}
+import org.apache.hadoop.hive.ql.exec.{MapredContext, UDF, Utilities}
 import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMacro
+import org.apache.hadoop.hive.ql.udf.generic.{GenericUDF, GenericUDFMacro, GenericUDTF}
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils
 import org.apache.hadoop.hive.serde2.avro.{AvroGenericRecordWritable, AvroSerdeUtils}
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector
@@ -42,7 +43,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.types.Decimal
 import org.apache.spark.util.Utils
 
-private[hive] object HiveShim {
+private[hive] object HiveShim extends Logging {
   // Precision and scale to pass for unlimited decimals; these are the same as the precision and
   // scale Hive 0.13 infers for BigDecimals from sources that don't specify them (e.g. UDFs)
   val UNLIMITED_DECIMAL_PRECISION = 38
@@ -111,6 +112,31 @@ private[hive] object HiveShim {
     }
   }
 
+  private def isSubClassOf(t: Type, parent: Class[_]): Boolean = t match {
+    case cls: Class[_] => parent.isAssignableFrom(cls)
+    case _ => false
+  }
+
+  private def hasInheritanceOf[UDFType: ClassTag](funcName: String, clazz: Class[_]): Boolean = {
+    val clsTag = classTag[UDFType].runtimeClass
+    if (isSubClassOf(clazz, clsTag)) {
+      val funcClass = clazz.getMethod(funcName, classOf[MapredContext])
+      funcClass.getDeclaringClass != clsTag
+    } else {
+      false
+    }
+  }
+
+  def validateHiveUserDefinedFunction(udfClass: Class[_]): Unit = {
+    if (hasInheritanceOf[GenericUDF]("configure", udfClass) ||
+        hasInheritanceOf[GenericUDTF]("configure", udfClass)) {
+      logWarning(s"Found an overridden method `configure` in ${udfClass.getSimpleName}, but " +
+        "Spark does not call the method during initialization because Spark does not use " +
+        "MapredContext internally (see SPARK-21533). So, you might want to reconsider the " +
+        s"implementation of ${udfClass.getSimpleName}.")
+    }
+  }
+
   /**
    * This class provides the UDF creation and also the UDF instance serialization and
    * de-serialization cross process boundary.
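Note: for illustration, here is a minimal sketch (the class name `ConfigureDependentUDF` is
hypothetical, not part of the patch) of the kind of Hive UDF the new
`validateHiveUserDefinedFunction` check flags, because it overrides
`configure(MapredContext)`:

    import org.apache.hadoop.hive.ql.exec.MapredContext
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

    class ConfigureDependentUDF extends GenericUDF {
      // State that is only set through `configure`; on Spark it stays null
      // because Spark never calls `configure` (SPARK-21533).
      private var jobName: String = _

      override def configure(context: MapredContext): Unit = {
        jobName = context.getJobConf.getJobName
      }

      override def initialize(args: Array[ObjectInspector]): ObjectInspector =
        PrimitiveObjectInspectorFactory.javaStringObjectInspector

      override def evaluate(args: Array[DeferredObject]): AnyRef = jobName

      override def getDisplayString(children: Array[String]): String = "job_name()"
    }

Because `configure` here is declared by `ConfigureDependentUDF` rather than inherited from
`GenericUDF`, `hasInheritanceOf[GenericUDF]("configure", classOf[ConfigureDependentUDF])`
returns true, and registering the function logs the new warning.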
From c9b60806dfe931752c3eb72e8347b149e8843594 Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Sun, 30 Jul 2017 21:17:36 +0900
Subject: [PATCH 2/3] Address viirya's review comment

---
 .../main/scala/org/apache/spark/sql/hive/HiveShim.scala | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
index a2c6b86d2c95e..c53fa614851bb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
@@ -24,6 +24,7 @@ import java.rmi.server.UID
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 import scala.reflect.{classTag, ClassTag}
+import scala.util.control.NonFatal
 
 import com.google.common.base.Objects
 import org.apache.avro.Schema
@@ -120,8 +121,12 @@ private[hive] object HiveShim extends Logging {
   private def hasInheritanceOf[UDFType: ClassTag](funcName: String, clazz: Class[_]): Boolean = {
     val clsTag = classTag[UDFType].runtimeClass
     if (isSubClassOf(clazz, clsTag)) {
-      val funcClass = clazz.getMethod(funcName, classOf[MapredContext])
-      funcClass.getDeclaringClass != clsTag
+      try {
+        val funcClass = clazz.getMethod(funcName, classOf[MapredContext])
+        funcClass.getDeclaringClass != clsTag
+      } catch {
+        case NonFatal(_) => false
+      }
     } else {
       false
     }

From 34238fff3f742a8b48bb641b13e395fe9b51f0da Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Sun, 30 Jul 2017 22:59:26 +0900
Subject: [PATCH 3/3] Address more review comments

---
 .../org/apache/spark/sql/hive/HiveShim.scala | 18 +++++++-----------
 1 file changed, 7 insertions(+), 11 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
index c53fa614851bb..9e6a1e7434e59 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
@@ -18,7 +18,6 @@ package org.apache.spark.sql.hive
 
 import java.io.{InputStream, OutputStream}
-import java.lang.reflect.Type
 import java.rmi.server.UID
 
 import scala.collection.JavaConverters._
@@ -113,17 +112,14 @@ private[hive] object HiveShim extends Logging {
     }
   }
 
-  private def isSubClassOf(t: Type, parent: Class[_]): Boolean = t match {
-    case cls: Class[_] => parent.isAssignableFrom(cls)
-    case _ => false
-  }
-
-  private def hasInheritanceOf[UDFType: ClassTag](funcName: String, clazz: Class[_]): Boolean = {
-    val clsTag = classTag[UDFType].runtimeClass
-    if (isSubClassOf(clazz, clsTag)) {
+  private def hasInheritanceOf[UDFType: ClassTag](func: String, clazz: Class[_]): Boolean = {
+    val parentClazz = classTag[UDFType].runtimeClass
+    if (parentClazz.isAssignableFrom(clazz)) {
       try {
-        val funcClass = clazz.getMethod(funcName, classOf[MapredContext])
-        funcClass.getDeclaringClass != clsTag
+        val funcClass = clazz.getMethod(func, classOf[MapredContext])
+        // If a given `func` is not overridden, `Method.getDeclaringClass` returns
+        // the parent Class object.
+        funcClass.getDeclaringClass != parentClazz
       } catch {
         case NonFatal(_) => false
       }
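Note: the rewritten check relies on a standard JVM reflection property: `Class.getMethod`
also resolves inherited methods, and `Method.getDeclaringClass` reports which class in the
hierarchy actually declared the resolved method. A self-contained sketch of that behavior
(illustrative class names, plain JDK reflection only, no Hive dependency):

    object DeclaringClassDemo {
      class Parent { def configure(ctx: String): Unit = () }
      class NoOverride extends Parent
      class WithOverride extends Parent {
        override def configure(ctx: String): Unit = println(ctx)
      }

      def main(args: Array[String]): Unit = {
        // getMethod resolves inherited methods too, so this lookup succeeds for both classes.
        val inherited  = classOf[NoOverride].getMethod("configure", classOf[String])
        val overridden = classOf[WithOverride].getMethod("configure", classOf[String])

        // Not overridden: the resolved method is still declared by Parent.
        println(inherited.getDeclaringClass == classOf[Parent])        // true
        // Overridden: the subclass is the declaring class, which is what
        // `hasInheritanceOf` detects by comparing against `parentClazz`.
        println(overridden.getDeclaringClass == classOf[WithOverride]) // true
      }
    }

The try/catch from patch 2 covers the case where the lookup itself fails (for example, a
`NoSuchMethodException` for a class that does not expose a public
`configure(MapredContext)` at all), in which case the UDF is treated as not overriding it.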