Skip to content

Commit 2ca60ac

Browse files
committed
[SPARK-7491] [SQL] Allow configuration of classloader isolation for hive
Author: Michael Armbrust <[email protected]>

Closes #6167 from marmbrus/configureIsolation and squashes the following commits:

6147cbe [Michael Armbrust] filter other conf
22cc3bc [Michael Armbrust] Merge remote-tracking branch 'origin/master' into configureIsolation
07476ee [Michael Armbrust] filter empty prefixes
dfdf19c [Michael Armbrust] [SPARK-6906][SQL] Allow configuration of classloader isolation for hive
1 parent 5645628 commit 2ca60ac

File tree

3 files changed

+46
-10
lines changed

3 files changed

+46
-10
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,29 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
122122
protected[hive] def hiveMetastoreJars: String =
123123
getConf(HIVE_METASTORE_JARS, "builtin")
124124

125+
/**
126+
* A comma separated list of class prefixes that should be loaded using the classloader that
127+
* is shared between Spark SQL and a specific version of Hive. An example of classes that should
128+
* be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need
129+
* to be shared are those that interact with classes that are already shared. For example,
130+
* custom appenders that are used by log4j.
131+
*/
132+
protected[hive] def hiveMetastoreSharedPrefixes: Seq[String] =
133+
getConf("spark.sql.hive.metastore.sharedPrefixes", jdbcPrefixes)
134+
.split(",").filterNot(_ == "")
135+
136+
private def jdbcPrefixes = Seq(
137+
"com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", "oracle.jdbc").mkString(",")
138+
139+
/**
140+
* A comma separated list of class prefixes that should explicitly be reloaded for each version
141+
* of Hive that Spark SQL is communicating with. For example, Hive UDFs that are declared in a
142+
* prefix that typically would be shared (i.e. org.apache.spark.*)
143+
*/
144+
protected[hive] def hiveMetastoreBarrierPrefixes: Seq[String] =
145+
getConf("spark.sql.hive.metastore.barrierPrefixes", "")
146+
.split(",").filterNot(_ == "")
147+
125148
@transient
126149
protected[sql] lazy val substitutor = new VariableSubstitution()
127150

@@ -179,12 +202,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
179202
version = metaVersion,
180203
execJars = jars.toSeq,
181204
config = allConfig,
182-
isolationOn = true)
205+
isolationOn = true,
206+
barrierPrefixes = hiveMetastoreBarrierPrefixes,
207+
sharedPrefixes = hiveMetastoreSharedPrefixes)
183208
} else if (hiveMetastoreJars == "maven") {
184209
// TODO: Support for loading the jars from an already downloaded location.
185210
logInfo(
186211
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using maven.")
187-
IsolatedClientLoader.forVersion(hiveMetastoreVersion, allConfig )
212+
IsolatedClientLoader.forVersion(hiveMetastoreVersion, allConfig)
188213
} else {
189214
// Convert to files and expand any directories.
190215
val jars =
@@ -210,7 +235,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
210235
version = metaVersion,
211236
execJars = jars.toSeq,
212237
config = allConfig,
213-
isolationOn = true)
238+
isolationOn = true,
239+
barrierPrefixes = hiveMetastoreBarrierPrefixes,
240+
sharedPrefixes = hiveMetastoreSharedPrefixes)
214241
}
215242
isolatedLoader.client
216243
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,7 @@ private[hive] object IsolatedClientLoader {
5656
(if (version.hasBuiltinsJar) "hive-builtins" :: Nil else Nil))
5757
.map(a => s"org.apache.hive:$a:${version.fullVersion}") :+
5858
"com.google.guava:guava:14.0.1" :+
59-
"org.apache.hadoop:hadoop-client:2.4.0" :+
60-
"mysql:mysql-connector-java:5.1.12"
59+
"org.apache.hadoop:hadoop-client:2.4.0"
6160

6261
val classpath = quietly {
6362
SparkSubmitUtils.resolveMavenCoordinates(
@@ -106,7 +105,9 @@ private[hive] class IsolatedClientLoader(
106105
val config: Map[String, String] = Map.empty,
107106
val isolationOn: Boolean = true,
108107
val rootClassLoader: ClassLoader = ClassLoader.getSystemClassLoader.getParent.getParent,
109-
val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader)
108+
val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader,
109+
val sharedPrefixes: Seq[String] = Seq.empty,
110+
val barrierPrefixes: Seq[String] = Seq.empty)
110111
extends Logging {
111112

112113
// Check to make sure that the root classloader does not know about Hive.
@@ -122,13 +123,14 @@ private[hive] class IsolatedClientLoader(
122123
name.startsWith("scala.") ||
123124
name.startsWith("com.google") ||
124125
name.startsWith("java.lang.") ||
125-
name.startsWith("java.net")
126+
name.startsWith("java.net") ||
127+
sharedPrefixes.exists(name.startsWith)
126128

127129
/** True if `name` refers to a spark class that must see specific version of Hive. */
128130
protected def isBarrierClass(name: String): Boolean =
129-
name.startsWith("org.apache.spark.sql.hive.execution.PairSerDe") ||
130131
name.startsWith(classOf[ClientWrapper].getName) ||
131-
name.startsWith(classOf[ReflectionMagic].getName)
132+
name.startsWith(classOf[ReflectionMagic].getName) ||
133+
barrierPrefixes.exists(name.startsWith)
132134

133135
protected def classToPath(name: String): String =
134136
name.replaceAll("\\.", "/") + ".class"

sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,14 @@ import scala.collection.JavaConversions._
4848
// SPARK-3729: Test key required to check for initialization errors with config.
4949
object TestHive
5050
extends TestHiveContext(
51-
new SparkContext("local[2]", "TestSQLContext", new SparkConf().set("spark.sql.test", "")))
51+
new SparkContext(
52+
"local[2]",
53+
"TestSQLContext",
54+
new SparkConf()
55+
.set("spark.sql.test", "")
56+
.set(
57+
"spark.sql.hive.metastore.barrierPrefixes",
58+
"org.apache.spark.sql.hive.execution.PairSerDe")))
5259

5360
/**
5461
* A locally running test instance of Spark's Hive execution engine.

0 commit comments

Comments (0)