Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DecimalType, DoubleType}

import org.apache.spark.util.Utils

private[sql] class HiveSessionCatalog(
externalCatalogBuilder: () => ExternalCatalog,
Expand Down Expand Up @@ -65,49 +65,52 @@ private[sql] class HiveSessionCatalog(
name: String,
clazz: Class[_],
input: Seq[Expression]): Expression = {

Try(super.makeFunctionExpression(name, clazz, input)).getOrElse {
var udfExpr: Option[Expression] = None
try {
// When we instantiate hive UDF wrapper class, we may throw exception if the input
// expressions don't satisfy the hive UDF, such as type mismatch, input number
// mismatch, etc. Here we catch the exception and throw AnalysisException instead.
if (classOf[UDF].isAssignableFrom(clazz)) {
udfExpr = Some(HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), input))
udfExpr.get.dataType // Force it to check input data types.
} else if (classOf[GenericUDF].isAssignableFrom(clazz)) {
udfExpr = Some(HiveGenericUDF(name, new HiveFunctionWrapper(clazz.getName), input))
udfExpr.get.dataType // Force it to check input data types.
} else if (classOf[AbstractGenericUDAFResolver].isAssignableFrom(clazz)) {
udfExpr = Some(HiveUDAFFunction(name, new HiveFunctionWrapper(clazz.getName), input))
udfExpr.get.dataType // Force it to check input data types.
} else if (classOf[UDAF].isAssignableFrom(clazz)) {
udfExpr = Some(HiveUDAFFunction(
name,
new HiveFunctionWrapper(clazz.getName),
input,
isUDAFBridgeRequired = true))
udfExpr.get.dataType // Force it to check input data types.
} else if (classOf[GenericUDTF].isAssignableFrom(clazz)) {
udfExpr = Some(HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), input))
udfExpr.get.asInstanceOf[HiveGenericUDTF].elementSchema // Force it to check data types.
// Current thread context classloader may not be the one loaded the class. Need to switch
// context classloader to initialize instance properly.
Utils.withContextClassLoader(clazz.getClassLoader) {
Try(super.makeFunctionExpression(name, clazz, input)).getOrElse {
var udfExpr: Option[Expression] = None
try {
// When we instantiate hive UDF wrapper class, we may throw exception if the input
// expressions don't satisfy the hive UDF, such as type mismatch, input number
// mismatch, etc. Here we catch the exception and throw AnalysisException instead.
if (classOf[UDF].isAssignableFrom(clazz)) {
udfExpr = Some(HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), input))
udfExpr.get.dataType // Force it to check input data types.
} else if (classOf[GenericUDF].isAssignableFrom(clazz)) {
udfExpr = Some(HiveGenericUDF(name, new HiveFunctionWrapper(clazz.getName), input))
udfExpr.get.dataType // Force it to check input data types.
} else if (classOf[AbstractGenericUDAFResolver].isAssignableFrom(clazz)) {
udfExpr = Some(HiveUDAFFunction(name, new HiveFunctionWrapper(clazz.getName), input))
udfExpr.get.dataType // Force it to check input data types.
} else if (classOf[UDAF].isAssignableFrom(clazz)) {
udfExpr = Some(HiveUDAFFunction(
name,
new HiveFunctionWrapper(clazz.getName),
input,
isUDAFBridgeRequired = true))
udfExpr.get.dataType // Force it to check input data types.
} else if (classOf[GenericUDTF].isAssignableFrom(clazz)) {
udfExpr = Some(HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), input))
udfExpr.get.asInstanceOf[HiveGenericUDTF].elementSchema // Force it to check data types.
}
} catch {
case NonFatal(e) =>
val noHandlerMsg = s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}': $e"
val errorMsg =
if (classOf[GenericUDTF].isAssignableFrom(clazz)) {
s"$noHandlerMsg\nPlease make sure your function overrides " +
"`public StructObjectInspector initialize(ObjectInspector[] args)`."
} else {
noHandlerMsg
}
val analysisException = new AnalysisException(errorMsg)
analysisException.setStackTrace(e.getStackTrace)
throw analysisException
}
udfExpr.getOrElse {
throw new AnalysisException(s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}'")
}
} catch {
case NonFatal(e) =>
val noHandlerMsg = s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}': $e"
val errorMsg =
if (classOf[GenericUDTF].isAssignableFrom(clazz)) {
s"$noHandlerMsg\nPlease make sure your function overrides " +
"`public StructObjectInspector initialize(ObjectInspector[] args)`."
} else {
noHandlerMsg
}
val analysisException = new AnalysisException(errorMsg)
analysisException.setStackTrace(e.getStackTrace)
throw analysisException
}
udfExpr.getOrElse {
throw new AnalysisException(s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}'")
}
}
}
Expand Down
1 change: 1 addition & 0 deletions sql/hive/src/test/noclasspath/README
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Place files which are being used as resources of tests but shouldn't be added to classpath.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.util.Utils

case class Nested1(f1: Nested2)
case class Nested2(f2: Nested3)
Expand Down Expand Up @@ -2327,4 +2328,51 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
}

test("SPARK-26560 Spark should be able to run Hive UDF using jar regardless of " +
"current thread context classloader") {
// force to use Spark classloader as other test (even in other test suites) may change the
// current thread's context classloader to jar classloader
Utils.withContextClassLoader(Utils.getSparkClassLoader) {
withUserDefinedFunction("udtf_count3" -> false) {
val sparkClassLoader = Thread.currentThread().getContextClassLoader

// This jar file should not be placed to the classpath; GenericUDTFCount3 is slightly
// modified version of GenericUDTFCount2 in hive/contrib, which emits the count for
// three times.
val jarPath = "src/test/noclasspath/TestUDTF-spark-26560.jar"
val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"

sql(
s"""
|CREATE FUNCTION udtf_count3
|AS 'org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFCount3'
|USING JAR '$jarURL'
""".stripMargin)

assert(Thread.currentThread().getContextClassLoader eq sparkClassLoader)

// JAR will be loaded at first usage, and it will change the current thread's
// context classloader to jar classloader in sharedState.
// See SessionState.addJar for details.
checkAnswer(
sql("SELECT udtf_count3(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
Row(3) :: Row(3) :: Row(3) :: Nil)

assert(Thread.currentThread().getContextClassLoader ne sparkClassLoader)
assert(Thread.currentThread().getContextClassLoader eq
spark.sqlContext.sharedState.jarClassLoader)

// Roll back to the original classloader and run query again. Without this line, the test
// would pass, as thread's context classloader is changed to jar classloader. But thread
// context classloader can be changed from others as well which would fail the query; one
// example is spark-shell, which thread context classloader rolls back automatically. This
// mimics the behavior of spark-shell.
Thread.currentThread().setContextClassLoader(sparkClassLoader)
checkAnswer(
sql("SELECT udtf_count3(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"),
Row(3) :: Row(3) :: Row(3) :: Nil)
}
}
}
}