Skip to content

Commit dfdf19c

Browse files
committed
[SPARK-6906][SQL] Allow configuration of classloader isolation for hive
1 parent 48fc38f commit dfdf19c

File tree

3 files changed

+44
-10
lines changed

3 files changed

+44
-10
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 28 additions & 3 deletions
Original file line number · Diff line number · Diff line change
@@ -122,6 +122,27 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
122122
protected[hive] def hiveMetastoreJars: String =
123123
getConf(HIVE_METASTORE_JARS, "builtin")
124124

125+
/**
126+
* A comma separated list of class prefixes that should be loaded using the classloader that
127+
* is shared between Spark SQL and a specific version of Hive. An example of classes that should
128+
* be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need
129+
* to be shared are those that interact with classes that are already shared. For example,
130+
* custom appenders that are used by log4j.
131+
*/
132+
protected[hive] def hiveMetastoreSharedPrefixes: Seq[String] =
133+
getConf("spark.sql.hive.metastore.sharedPrefixes", jdbcPrefixes).split(",")
134+
135+
private def jdbcPrefixes = Seq(
136+
"com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", "oracle.jdbc").mkString(",")
137+
138+
/**
139+
* A comma separated list of class prefixes that should explicitly be reloaded for each version
140+
* of Hive that Spark SQL is communicating with. For example, Hive UDFs that are declared in a
141+
* prefix that typically would be shared (i.e. org.apache.spark.*)
142+
*/
143+
protected[hive] def hiveMetastoreBarrierPrefixes: Seq[String] =
144+
getConf("spark.sql.hive.metastore.barrierPrefixes", "").split(",")
145+
125146
@transient
126147
protected[sql] lazy val substitutor = new VariableSubstitution()
127148

@@ -179,12 +200,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
179200
version = metaVersion,
180201
execJars = jars.toSeq,
181202
config = allConfig,
182-
isolationOn = true)
203+
isolationOn = true,
204+
barrierPrefixes = hiveMetastoreBarrierPrefixes,
205+
sharedPrefixes = hiveMetastoreSharedPrefixes)
183206
} else if (hiveMetastoreJars == "maven") {
184207
// TODO: Support for loading the jars from an already downloaded location.
185208
logInfo(
186209
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.")
187-
IsolatedClientLoader.forVersion(hiveMetastoreVersion, allConfig )
210+
IsolatedClientLoader.forVersion(hiveMetastoreVersion, allConfig)
188211
} else {
189212
// Convert to files and expand any directories.
190213
val jars =
@@ -210,7 +233,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
210233
version = metaVersion,
211234
execJars = jars.toSeq,
212235
config = allConfig,
213-
isolationOn = true)
236+
isolationOn = true,
237+
barrierPrefixes = hiveMetastoreBarrierPrefixes,
238+
sharedPrefixes = hiveMetastoreSharedPrefixes)
214239
}
215240
isolatedLoader.client
216241
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala

Lines changed: 8 additions & 6 deletions
Original file line number · Diff line number · Diff line change
@@ -56,8 +56,7 @@ private[hive] object IsolatedClientLoader {
5656
(if (version.hasBuiltinsJar) "hive-builtins" :: Nil else Nil))
5757
.map(a => s"org.apache.hive:$a:${version.fullVersion}") :+
5858
"com.google.guava:guava:14.0.1" :+
59-
"org.apache.hadoop:hadoop-client:2.4.0" :+
60-
"mysql:mysql-connector-java:5.1.12"
59+
"org.apache.hadoop:hadoop-client:2.4.0"
6160

6261
val classpath = quietly {
6362
SparkSubmitUtils.resolveMavenCoordinates(
@@ -106,7 +105,9 @@ private[hive] class IsolatedClientLoader(
106105
val config: Map[String, String] = Map.empty,
107106
val isolationOn: Boolean = true,
108107
val rootClassLoader: ClassLoader = ClassLoader.getSystemClassLoader.getParent.getParent,
109-
val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader)
108+
val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader,
109+
val sharedPrefixes: Seq[String] = Seq.empty,
110+
val barrierPrefixes: Seq[String] = Seq.empty)
110111
extends Logging {
111112

112113
// Check to make sure that the root classloader does not know about Hive.
@@ -122,13 +123,14 @@ private[hive] class IsolatedClientLoader(
122123
name.startsWith("scala.") ||
123124
name.startsWith("com.google") ||
124125
name.startsWith("java.lang.") ||
125-
name.startsWith("java.net")
126+
name.startsWith("java.net") ||
127+
sharedPrefixes.exists(name.startsWith)
126128

127129
/** True if `name` refers to a spark class that must see specific version of Hive. */
128130
protected def isBarrierClass(name: String): Boolean =
129-
name.startsWith("org.apache.spark.sql.hive.execution.PairSerDe") ||
130131
name.startsWith(classOf[ClientWrapper].getName) ||
131-
name.startsWith(classOf[ReflectionMagic].getName)
132+
name.startsWith(classOf[ReflectionMagic].getName) ||
133+
barrierPrefixes.exists(name.startsWith)
132134

133135
protected def classToPath(name: String): String =
134136
name.replaceAll("\\.", "/") + ".class"

sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala

Lines changed: 8 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -48,7 +48,14 @@ import scala.collection.JavaConversions._
4848
// SPARK-3729: Test key required to check for initialization errors with config.
4949
object TestHive
5050
extends TestHiveContext(
51-
new SparkContext("local[2]", "TestSQLContext", new SparkConf().set("spark.sql.test", "")))
51+
new SparkContext(
52+
"local[2]",
53+
"TestSQLContext",
54+
new SparkConf()
55+
.set("spark.sql.test", "")
56+
.set(
57+
"spark.sql.hive.metastore.barrierPrefixes",
58+
"org.apache.spark.sql.hive.execution.PairSerDe")))
5259

5360
/**
5461
* A locally running test instance of Spark's Hive execution engine.

0 commit comments

Comments (0)