Commit 2e34427

xkrogen authored and sunchao committed
[SPARK-42539][SQL][HIVE] Eliminate separate classloader when using 'builtin' Hive version for metadata client

### What changes were proposed in this pull request?

When using the 'builtin' Hive version for the Hive metadata client, do not create a separate classloader; instead, continue to use the overall user/application classloader (regardless of Java version). This standardizes the behavior for all Java versions with that of Java 9+. See SPARK-42539 for more details on why this approach was chosen.

Please note that this is a re-submit of #40144. That one introduced test failures, and potentially a real issue, because the PR works by setting `isolationOn = false` for `builtin` mode. In addition to adjusting the classloader, `HiveClientImpl` relies on `isolationOn` to determine whether it should use an isolated copy of `SessionState`, so the PR inadvertently switched to using a shared `SessionState` object. I think we do want to continue to have the isolated session state even in `builtin` mode, so this adds a new flag `sessionStateIsolationOn` which controls whether the session state should be isolated, _separately_ from the `isolationOn` flag which controls whether the classloader should be isolated. Default behavior is for `sessionStateIsolationOn` to be set equal to `isolationOn`, but for `builtin` mode, we override it to enable session state isolation even though classloader isolation is turned off.

### Why are the changes needed?

Please see a much more detailed description in SPARK-42539. The tl;dr is that user-provided JARs (such as `hive-exec-2.3.8.jar`) take precedence over Spark/system JARs when constructing the classloader used by `IsolatedClientLoader` on Java 8 in 'builtin' mode, which can cause unexpected behavior and/or breakages. This violates the expectation that, unless user-first classloader mode is used, Spark JARs should be prioritized over user JARs.

It also seems that this separate classloader was unnecessary from the start, since the intent of 'builtin' mode is to use the JARs already existing on the regular classloader (as alluded to in a comment on #24057). The isolated client loader was originally added in #5876 in 2015. This bit in the PR description is the only mention of the behavior for "builtin":

> attempt to discover the jars that were used to load Spark SQL and use those. This option is only valid when using the execution version of Hive.

I can't follow the logic here; the user classloader clearly has all of the necessary Hive JARs, since that's where we're getting the JAR URLs from, so we could just use that directly instead of grabbing the URLs. When this was initially added, it only used the JARs from the user classloader, not any of its parents, which I suspect was the motivating factor (to try to avoid more Spark classes being duplicated inside of the isolated classloader, I guess). But that was changed a month later anyway in #6435 / #6459, so I think this may have basically been dead code from the start. It has also caused at least one issue over the years, e.g. SPARK-21428, which disables the new-classloader behavior when running inside of a CLI session.

### Does this PR introduce _any_ user-facing change?

No, except to protect Spark itself from potentially being broken by bad user JARs.

### How was this patch tested?

This includes a new unit test in `HiveUtilsSuite` which demonstrates the issue and shows that this approach resolves it. It has also been tested on a live cluster running Java 8, and Hive communication functionality continues to work as expected.

Unit tests failing in #40144 have been locally tested (`HiveUtilsSuite`, `HiveSharedStateSuite`, `HiveCliSessionStateSuite`, `JsonHadoopFsRelationSuite`).

Closes #40224 from xkrogen/xkrogen/SPARK-42539/hive-isolatedclientloader-builtin-user-jar-conflict-fix/take2.

Authored-by: Erik Krogen <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
1 parent 3b258ef commit 2e34427
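
As a rough illustration of the two-flag design described in the commit message, the sketch below models the fallback from an optional override to `isolationOn`. `LoaderFlags` and `FlagsDemo` are hypothetical names used only for this example; they are not part of Spark, and the real `IsolatedClientLoader` takes many more parameters.

```scala
// Hypothetical stand-in for the two isolation flags; not the real Spark class.
final case class LoaderFlags(
    isolationOn: Boolean = true,
    sessionStateIsolationOverride: Option[Boolean] = None) {

  // Falls back to isolationOn when no override is supplied.
  val sessionStateIsolationOn: Boolean =
    sessionStateIsolationOverride.getOrElse(isolationOn)
}

object FlagsDemo {
  // Default: classloader isolation and session state isolation move together.
  val defaults: LoaderFlags = LoaderFlags()

  // 'builtin' mode outside a CLI session: shared classloader, isolated SessionState.
  val builtin: LoaderFlags =
    LoaderFlags(isolationOn = false, sessionStateIsolationOverride = Some(true))

  // No override and classloader isolation off: session state is shared as well.
  val shared: LoaderFlags = LoaderFlags(isolationOn = false)
}
```

Using an `Option[Boolean]` keeps existing call sites unchanged, since callers that never pass an override get the previous coupling between the two behaviors.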

5 files changed: +91 -81 lines changed

core/src/main/scala/org/apache/spark/TestUtils.scala

Lines changed: 4 additions & 1 deletion
@@ -193,12 +193,15 @@ private[spark] object TestUtils {
       baseClass: String = null,
       classpathUrls: Seq[URL] = Seq.empty,
       implementsClasses: Seq[String] = Seq.empty,
-      extraCodeBody: String = ""): File = {
+      extraCodeBody: String = "",
+      packageName: Option[String] = None): File = {
     val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
     val implementsText =
       "implements " + (implementsClasses :+ "java.io.Serializable").mkString(", ")
+    val packageText = packageName.map(p => s"package $p;\n").getOrElse("")
     val sourceFile = new JavaSourceFromString(className,
       s"""
+         |$packageText
          |public class $className $extendsText $implementsText {
          |  @Override public String toString() { return "$toStringValue"; }
          |

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala

Lines changed: 4 additions & 33 deletions
@@ -18,15 +18,14 @@
 package org.apache.spark.sql.hive
 
 import java.io.File
-import java.net.{URL, URLClassLoader}
+import java.net.URL
 import java.util.Locale
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 import scala.util.Try
 
-import org.apache.commons.lang3.{JavaVersion, SystemUtils}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
@@ -46,7 +45,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf._
 import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
 import org.apache.spark.sql.types._
-import org.apache.spark.util.{ChildFirstURLClassLoader, Utils}
+import org.apache.spark.util.Utils
 
 
 private[spark] object HiveUtils extends Logging {
@@ -409,43 +408,15 @@ private[spark] object HiveUtils extends Logging {
           s"or change ${HIVE_METASTORE_VERSION.key} to $builtinHiveVersion.")
       }
 
-      // We recursively find all jars in the class loader chain,
-      // starting from the given classLoader.
-      def allJars(classLoader: ClassLoader): Array[URL] = classLoader match {
-        case null => Array.empty[URL]
-        case childFirst: ChildFirstURLClassLoader =>
-          childFirst.getURLs() ++ allJars(Utils.getSparkClassLoader)
-        case urlClassLoader: URLClassLoader =>
-          urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent)
-        case other => allJars(other.getParent)
-      }
-
-      val classLoader = Utils.getContextOrSparkClassLoader
-      val jars: Array[URL] = if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
-        // Do nothing. The system classloader is no longer a URLClassLoader in Java 9,
-        // so it won't match the case in allJars. It no longer exposes URLs of
-        // the system classpath
-        Array.empty[URL]
-      } else {
-        val loadedJars = allJars(classLoader)
-        // Verify at least one jar was found
-        if (loadedJars.length == 0) {
-          throw new IllegalArgumentException(
-            "Unable to locate hive jars to connect to metastore. " +
-              s"Please set ${HIVE_METASTORE_JARS.key}.")
-        }
-        loadedJars
-      }
-
       logInfo(
         s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.")
       new IsolatedClientLoader(
         version = metaVersion,
         sparkConf = conf,
         hadoopConf = hadoopConf,
-        execJars = jars.toSeq,
         config = configurations,
-        isolationOn = !isCliSessionState(),
+        isolationOn = false,
+        sessionStateIsolationOverride = Some(!isCliSessionState()),
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
         sharedPrefixes = hiveMetastoreSharedPrefixes)
     } else if (hiveMetastoreJars == "maven") {
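
To make the removed JAR-discovery logic easier to follow, here is a simplified, standalone sketch of the classloader-chain walk. It is an approximation only: the `ChildFirstURLClassLoader` case is omitted because that class is Spark-specific, and `ClassLoaderJars` is a hypothetical name chosen for this example.

```scala
import java.net.{URL, URLClassLoader}

object ClassLoaderJars {
  // Walk the parent chain and collect URLs from every URLClassLoader found.
  // Loaders that are not URLClassLoaders contribute nothing, which is why the
  // Java 9+ system classloader yields no URLs here.
  def allJars(classLoader: ClassLoader): Array[URL] = classLoader match {
    case null => Array.empty[URL]
    case urlClassLoader: URLClassLoader =>
      urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent)
    case other => allJars(other.getParent)
  }
}
```

Because the child loader's URLs are listed before its parents' URLs, a classloader built from this list searches user-provided JARs first, which is the precedence problem the commit message describes.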

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

Lines changed: 1 addition & 1 deletion
@@ -134,7 +134,7 @@ private[hive] class HiveClientImpl(
   // Create an internal session state for this HiveClientImpl.
   val state: SessionState = {
     val original = Thread.currentThread().getContextClassLoader
-    if (clientLoader.isolationOn) {
+    if (clientLoader.sessionStateIsolationOn) {
       // Switch to the initClassLoader.
       Thread.currentThread().setContextClassLoader(initClassLoader)
       try {

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala

Lines changed: 50 additions & 44 deletions
@@ -180,6 +180,9 @@ private[hive] object IsolatedClientLoader extends Logging {
  * @param config   A set of options that will be added to the HiveConf of the constructed client.
  * @param isolationOn When true, custom versions of barrier classes will be constructed.  Must be
  *                    true unless loading the version of hive that is on Spark's classloader.
+ * @param sessionStateIsolationOverride If present, this parameter will specify the value of
+ *                                      `sessionStateIsolationOn`. If empty (the default), the
+ *                                      value of `isolationOn` will be used.
  * @param baseClassLoader The spark classloader that is used to load shared classes.
  */
 private[hive] class IsolatedClientLoader(
@@ -189,11 +192,19 @@ private[hive] class IsolatedClientLoader(
     val execJars: Seq[URL] = Seq.empty,
     val config: Map[String, String] = Map.empty,
     val isolationOn: Boolean = true,
+    sessionStateIsolationOverride: Option[Boolean] = None,
     val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader,
     val sharedPrefixes: Seq[String] = Seq.empty,
     val barrierPrefixes: Seq[String] = Seq.empty)
   extends Logging {
 
+  /**
+   * This controls whether the generated clients maintain an independent/isolated copy of the
+   * Hive `SessionState`. If false, the Hive will leverage the global/static copy of
+   * `SessionState`; if true, it will generate a new copy of the state internally.
+   */
+  val sessionStateIsolationOn: Boolean = sessionStateIsolationOverride.getOrElse(isolationOn)
+
   /** All jars used by the hive specific classloader. */
   protected def allJars = execJars.toArray
 
@@ -232,51 +243,46 @@ private[hive] class IsolatedClientLoader(
   private[hive] val classLoader: MutableURLClassLoader = {
     val isolatedClassLoader =
       if (isolationOn) {
-        if (allJars.isEmpty) {
-          // See HiveUtils; this is the Java 9+ + builtin mode scenario
-          baseClassLoader
-        } else {
-          val rootClassLoader: ClassLoader =
-            if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
-              // In Java 9, the boot classloader can see few JDK classes. The intended parent
-              // classloader for delegation is now the platform classloader.
-              // See http://java9.wtf/class-loading/
-              val platformCL =
-                classOf[ClassLoader].getMethod("getPlatformClassLoader").
-                  invoke(null).asInstanceOf[ClassLoader]
-              // Check to make sure that the root classloader does not know about Hive.
-              assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)
-              platformCL
+        val rootClassLoader: ClassLoader =
+          if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
+            // In Java 9, the boot classloader can see few JDK classes. The intended parent
+            // classloader for delegation is now the platform classloader.
+            // See http://java9.wtf/class-loading/
+            val platformCL =
+              classOf[ClassLoader].getMethod("getPlatformClassLoader").
+                invoke(null).asInstanceOf[ClassLoader]
+            // Check to make sure that the root classloader does not know about Hive.
+            assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)
+            platformCL
+          } else {
+            // The boot classloader is represented by null (the instance itself isn't accessible)
+            // and before Java 9 can see all JDK classes
+            null
+          }
+        new URLClassLoader(allJars, rootClassLoader) {
+          override def loadClass(name: String, resolve: Boolean): Class[_] = {
+            val loaded = findLoadedClass(name)
+            if (loaded == null) doLoadClass(name, resolve) else loaded
+          }
+          def doLoadClass(name: String, resolve: Boolean): Class[_] = {
+            val classFileName = name.replaceAll("\\.", "/") + ".class"
+            if (isBarrierClass(name)) {
+              // For barrier classes, we construct a new copy of the class.
+              val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
+              logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
+              defineClass(name, bytes, 0, bytes.length)
+            } else if (!isSharedClass(name)) {
+              logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
+              super.loadClass(name, resolve)
             } else {
-              // The boot classloader is represented by null (the instance itself isn't accessible)
-              // and before Java 9 can see all JDK classes
-              null
-            }
-          new URLClassLoader(allJars, rootClassLoader) {
-            override def loadClass(name: String, resolve: Boolean): Class[_] = {
-              val loaded = findLoadedClass(name)
-              if (loaded == null) doLoadClass(name, resolve) else loaded
-            }
-            def doLoadClass(name: String, resolve: Boolean): Class[_] = {
-              val classFileName = name.replaceAll("\\.", "/") + ".class"
-              if (isBarrierClass(name)) {
-                // For barrier classes, we construct a new copy of the class.
-                val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
-                logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
-                defineClass(name, bytes, 0, bytes.length)
-              } else if (!isSharedClass(name)) {
-                logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
-                super.loadClass(name, resolve)
-              } else {
-                // For shared classes, we delegate to baseClassLoader, but fall back in case the
-                // class is not found.
-                logDebug(s"shared class: $name")
-                try {
-                  baseClassLoader.loadClass(name)
-                } catch {
-                  case _: ClassNotFoundException =>
-                    super.loadClass(name, resolve)
-                }
+              // For shared classes, we delegate to baseClassLoader, but fall back in case the
+              // class is not found.
+              logDebug(s"shared class: $name")
+              try {
+                baseClassLoader.loadClass(name)
+              } catch {
+                case _: ClassNotFoundException =>
+                  super.loadClass(name, resolve)
               }
             }
           }
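
The choice of `rootClassLoader` in the hunk above determines what an isolated loader can see by default. The following sketch, which is not Spark code, shows the effect of passing `null` as the parent: JDK core classes remain visible, while classes from the application classpath are not. `RootLoaderDemo` and `Marker` are hypothetical names introduced only for this illustration.

```scala
import java.net.{URL, URLClassLoader}
import scala.util.Try

object RootLoaderDemo {
  // A class that exists only on the application classpath, never in the JDK.
  class Marker

  // No JARs of its own, and a null parent, which stands for the boot classloader.
  val bootOnly: ClassLoader = new URLClassLoader(Array.empty[URL], null)

  // True when the given loader can resolve the named class.
  def canLoad(loader: ClassLoader, name: String): Boolean =
    Try(loader.loadClass(name)).isSuccess

  // java.lang.String is always reachable through the boot classloader.
  val seesJdk: Boolean = canLoad(bootOnly, "java.lang.String")

  // The application class is invisible because nothing delegates to the app loader.
  val seesApp: Boolean = canLoad(bootOnly, classOf[Marker].getName)
}
```

This is why the isolated loader has to be given the Hive JARs explicitly through `allJars`, and why shared classes are fetched from `baseClassLoader` by name instead of through parent delegation.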

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala

Lines changed: 32 additions & 2 deletions
@@ -17,15 +17,19 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.File
+import java.net.URI
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, TestUtils}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.util.ChildFirstURLClassLoader
+import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader}
 
 class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
@@ -77,6 +81,32 @@ class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
     }
   }
 
+  test("SPARK-42539: User-provided JARs should not take precedence over builtin Hive JARs") {
+    withTempDir { tmpDir =>
+      val classFile = TestUtils.createCompiledClass(
+        "Hive", tmpDir, packageName = Some("org.apache.hadoop.hive.ql.metadata"))
+
+      val jarFile = new File(tmpDir, "hive-fake.jar")
+      TestUtils.createJar(Seq(classFile), jarFile, Some("org/apache/hadoop/hive/ql/metadata"))
+
+      val conf = new SparkConf
+      val contextClassLoader = Thread.currentThread().getContextClassLoader
+      val loader = new MutableURLClassLoader(Array(jarFile.toURI.toURL), contextClassLoader)
+      try {
+        Thread.currentThread().setContextClassLoader(loader)
+        val client = HiveUtils.newClientForMetadata(
+          conf,
+          SparkHadoopUtil.newConfiguration(conf),
+          HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true))
+        client.createDatabase(
+          CatalogDatabase("foo", "", URI.create(s"file://${tmpDir.getAbsolutePath}/foo.db"), Map()),
+          ignoreIfExists = true)
+      } finally {
+        Thread.currentThread().setContextClassLoader(contextClassLoader)
+      }
+    }
+  }
+
   test("SPARK-27349: Dealing with TimeVars removed in Hive 2.x") {
     // Test default value
     val defaultConf = new Configuration
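
The new test swaps the thread's context classloader and restores it in a `finally` block. A minimal sketch of that pattern as a reusable helper is shown below; `ContextLoader.withContextClassLoader` is a hypothetical helper written for this illustration and is not claimed to be an existing Spark API.

```scala
object ContextLoader {
  // Hypothetical helper: run `body` with `loader` installed as the current
  // thread's context classloader, then restore the previous one even if
  // `body` throws.
  def withContextClassLoader[T](loader: ClassLoader)(body: => T): T = {
    val thread = Thread.currentThread()
    val original = thread.getContextClassLoader
    thread.setContextClassLoader(loader)
    try body finally thread.setContextClassLoader(original)
  }
}
```

Restoring the original loader matters in a shared test JVM, since a leaked context classloader containing a fake `Hive` class could affect suites that run later on the same thread.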
