Commit daa70bf

[SPARK-6907] [SQL] Isolated client for HiveMetastore
This PR adds initial support for loading multiple versions of Hive in a single JVM and provides a common interface for extracting metadata from the `HiveMetastoreClient` for a given version. This is accomplished by creating an isolated `ClassLoader` that operates according to the following rules:

 - __Shared Classes__: Java, Scala, logging, and Spark classes are delegated to `baseClassLoader`, allowing the results of calls to the `ClientInterface` to be visible externally.
 - __Hive Classes__: New instances are loaded from `execJars`. These classes are not accessible externally due to their custom loading.
 - __Barrier Classes__: Classes such as `ClientWrapper` are defined in Spark but must link to a specific version of Hive. As a result, the bytecode is acquired from the Spark `ClassLoader` but a new copy is created for each instance of `IsolatedClientLoader`. This new instance is able to see a specific version of Hive without using reflection wherever Hive is consistent across versions. Since this is a unique instance, it is not visible externally other than as a generic `ClientInterface`, unless `isolationOn` is set to `false`.

In addition to the unit tests, I have also tested this locally against MySQL instances of the Hive Metastore. I've also successfully ported Spark SQL to run with this client, but due to the size of the changes, that will come in a follow-up PR.

By default, Hive jars are currently downloaded automatically from Maven for a given version to ease packaging and testing. However, there is also support for specifying their location manually for deployments without internet access.

Author: Michael Armbrust <[email protected]>

Closes #5851 from marmbrus/isolatedClient and squashes the following commits:

c72f6ac [Michael Armbrust] rxins comments
1e271fa [Michael Armbrust] [SPARK-6907][SQL] Isolated client for HiveMetastore
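For illustration only, here is a minimal sketch of the delegation policy described above. The class and predicate names (IsolatingLoaderSketch, isSharedClass, isBarrierClass) are hypothetical and this is not the actual IsolatedClientLoader implementation; it only shows how shared classes can be delegated to a base loader, barrier classes re-defined from Spark's bytecode, and Hive classes loaded from isolated jars.

import java.net.{URL, URLClassLoader}

import org.apache.spark.sql.catalyst.util.resourceToBytes

// Hypothetical sketch of the three delegation rules; not the real IsolatedClientLoader.
class IsolatingLoaderSketch(
    hiveJars: Array[URL],          // the "execJars" holding a specific Hive version
    baseClassLoader: ClassLoader)  // Spark's class loader, used for shared classes
  extends URLClassLoader(hiveJars, null) {  // null parent keeps Hive classes isolated

  // Shared classes: visible on both sides of the barrier (the rule here is illustrative).
  private def isSharedClass(name: String): Boolean =
    name.startsWith("java.") || name.startsWith("scala.") ||
    name.startsWith("org.slf4j.") || name.startsWith("org.apache.spark.")

  // Barrier classes: defined in Spark but linked against the isolated Hive version.
  private def isBarrierClass(name: String): Boolean =
    name.startsWith("org.apache.spark.sql.hive.client.ClientWrapper")

  override def loadClass(name: String, resolve: Boolean): Class[_] = {
    if (isBarrierClass(name)) {
      // Copy the bytecode out of the Spark class loader and define a fresh class inside
      // this loader, so it links against the Hive classes loaded from the isolated jars.
      val bytes = resourceToBytes(name.replace('.', '/') + ".class", baseClassLoader)
      defineClass(name, bytes, 0, bytes.length)
    } else if (isSharedClass(name)) {
      // Delegate to the base loader so results remain visible to the caller.
      baseClassLoader.loadClass(name)
    } else {
      // Everything else (i.e. Hive) comes from the isolated jars.
      super.loadClass(name, resolve)
    }
  }
}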
1 parent f4af925 commit daa70bf

File tree: 9 files changed (+1088, -7 lines)


core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 1 addition & 1 deletion
@@ -701,7 +701,7 @@ object SparkSubmit {
 }
 
 /** Provides utility functions to be used inside SparkSubmit. */
-private[deploy] object SparkSubmitUtils {
+private[spark] object SparkSubmitUtils {
 
   // Exposed for testing
   var printStream = SparkSubmit.printStream

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala

Lines changed: 2 additions & 0 deletions
@@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
  */
 class NoSuchTableException extends Exception
 
+class NoSuchDatabaseException extends Exception
+
 /**
  * An interface for looking up relations by name. Used by an [[Analyzer]].
  */

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala

Lines changed: 31 additions & 6 deletions
@@ -17,12 +17,31 @@
 
 package org.apache.spark.sql.catalyst
 
-import java.io.{PrintWriter, ByteArrayOutputStream, FileInputStream, File}
+import java.io._
 
 import org.apache.spark.util.Utils
 
 package object util {
 
+  /** Silences output to stderr or stdout for the duration of f */
+  def quietly[A](f: => A): A = {
+    val origErr = System.err
+    val origOut = System.out
+    try {
+      System.setErr(new PrintStream(new OutputStream {
+        def write(b: Int) = {}
+      }))
+      System.setOut(new PrintStream(new OutputStream {
+        def write(b: Int) = {}
+      }))
+
+      f
+    } finally {
+      System.setErr(origErr)
+      System.setOut(origOut)
+    }
+  }
+
   def fileToString(file: File, encoding: String = "UTF-8"): String = {
     val inStream = new FileInputStream(file)
     val outStream = new ByteArrayOutputStream
@@ -42,10 +61,9 @@ package object util {
     new String(outStream.toByteArray, encoding)
   }
 
-  def resourceToString(
-      resource:String,
-      encoding: String = "UTF-8",
-      classLoader: ClassLoader = Utils.getSparkClassLoader): String = {
+  def resourceToBytes(
+      resource: String,
+      classLoader: ClassLoader = Utils.getSparkClassLoader): Array[Byte] = {
     val inStream = classLoader.getResourceAsStream(resource)
     val outStream = new ByteArrayOutputStream
     try {
@@ -61,7 +79,14 @@ package object util {
     finally {
       inStream.close()
     }
-    new String(outStream.toByteArray, encoding)
+    outStream.toByteArray
+  }
+
+  def resourceToString(
+      resource:String,
+      encoding: String = "UTF-8",
+      classLoader: ClassLoader = Utils.getSparkClassLoader): String = {
+    new String(resourceToBytes(resource, classLoader), encoding)
   }
 
   def stringToFile(file: File, str: String): File = {
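An illustrative (non-committed) usage sketch of the helpers added above; the resource path is hypothetical:

import org.apache.spark.sql.catalyst.util._

object QuietlyExample {
  def main(args: Array[String]): Unit = {
    // Stdout/stderr are swallowed inside the block, but its result is still returned.
    val answer: Int = quietly {
      println("suppressed while quietly is active")
      42
    }
    println(s"answer = $answer")

    // resourceToString is now a thin wrapper around resourceToBytes; both default to
    // the Spark class loader. The resource path below is hypothetical.
    val query: String = resourceToString("test-data/example.sql")
    println(query.take(80))
  }
}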
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala

Lines changed: 149 additions & 0 deletions
@@ -0,0 +1,149 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.client

import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}

case class HiveDatabase(
    name: String,
    location: String)

abstract class TableType { val name: String }
case object ExternalTable extends TableType { override val name = "EXTERNAL_TABLE" }
case object IndexTable extends TableType { override val name = "INDEX_TABLE" }
case object ManagedTable extends TableType { override val name = "MANAGED_TABLE" }
case object VirtualView extends TableType { override val name = "VIRTUAL_VIEW" }

case class HiveStorageDescriptor(
    location: String,
    inputFormat: String,
    outputFormat: String,
    serde: String)

case class HivePartition(
    values: Seq[String],
    storage: HiveStorageDescriptor)

case class HiveColumn(name: String, hiveType: String, comment: String)
case class HiveTable(
    specifiedDatabase: Option[String],
    name: String,
    schema: Seq[HiveColumn],
    partitionColumns: Seq[HiveColumn],
    properties: Map[String, String],
    serdeProperties: Map[String, String],
    tableType: TableType,
    location: Option[String] = None,
    inputFormat: Option[String] = None,
    outputFormat: Option[String] = None,
    serde: Option[String] = None) {

  @transient
  private[client] var client: ClientInterface = _

  private[client] def withClient(ci: ClientInterface): this.type = {
    client = ci
    this
  }

  def database: String = specifiedDatabase.getOrElse(sys.error("database not resolved"))

  def isPartitioned: Boolean = partitionColumns.nonEmpty

  def getAllPartitions: Seq[HivePartition] = client.getAllPartitions(this)

  // Hive does not support backticks when passing names to the client.
  def qualifiedName: String = s"$database.$name"
}

/**
 * An externally visible interface to the Hive client. This interface is shared across both the
 * internal and external classloaders for a given version of Hive and thus must expose only
 * shared classes.
 */
trait ClientInterface {
  /**
   * Runs a HiveQL command using Hive, returning the results as a list of strings. Each row will
   * result in one string.
   */
  def runSqlHive(sql: String): Seq[String]

  /** Returns the names of all tables in the given database. */
  def listTables(dbName: String): Seq[String]

  /** Returns the name of the active database. */
  def currentDatabase: String

  /** Returns the metadata for the specified database, throwing an exception if it doesn't exist. */
  def getDatabase(name: String): HiveDatabase = {
    getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException)
  }

  /** Returns the metadata for a given database, or None if it doesn't exist. */
  def getDatabaseOption(name: String): Option[HiveDatabase]

  /** Returns the specified table, or throws [[NoSuchTableException]]. */
  def getTable(dbName: String, tableName: String): HiveTable = {
    getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException)
  }

  /** Returns the metadata for the specified table or None if it doesn't exist. */
  def getTableOption(dbName: String, tableName: String): Option[HiveTable]

  /** Creates a table with the given metadata. */
  def createTable(table: HiveTable): Unit

  /** Updates the given table with new metadata. */
  def alterTable(table: HiveTable): Unit

  /** Creates a new database with the given name. */
  def createDatabase(database: HiveDatabase): Unit

  /** Returns all partitions for the given table. */
  def getAllPartitions(hTable: HiveTable): Seq[HivePartition]

  /** Loads a static partition into an existing table. */
  def loadPartition(
      loadPath: String,
      tableName: String,
      partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
      replace: Boolean,
      holdDDLTime: Boolean,
      inheritTableSpecs: Boolean,
      isSkewedStoreAsSubdir: Boolean): Unit

  /** Loads data into an existing table. */
  def loadTable(
      loadPath: String, // TODO URI
      tableName: String,
      replace: Boolean,
      holdDDLTime: Boolean): Unit

  /** Loads new dynamic partitions into an existing table. */
  def loadDynamicPartitions(
      loadPath: String,
      tableName: String,
      partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
      replace: Boolean,
      numDP: Int,
      holdDDLTime: Boolean,
      listBucketingEnabled: Boolean): Unit

  /** Used for testing only. Removes all metadata from this instance of Hive. */
  def reset(): Unit
}
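A hedged usage sketch of the trait above, assuming a client: ClientInterface has already been obtained (per the commit message, through the isolated loader); the database name, warehouse location, and printed summary are illustrative only and not part of the commit.

import org.apache.spark.sql.hive.client._

object ClientInterfaceExample {
  // Illustrative only: the database name and location below are made up.
  def describeDefaultDatabase(client: ClientInterface): Unit = {
    val db = client.getDatabaseOption("default").getOrElse {
      val created = HiveDatabase(name = "default", location = "/user/hive/warehouse")
      client.createDatabase(created)
      created
    }

    // getTable throws NoSuchTableException when missing; getTableOption returns None instead.
    client.listTables(db.name).headOption.foreach { tableName =>
      val table: HiveTable = client.getTable(db.name, tableName)
      println(s"${table.qualifiedName}: ${table.schema.size} columns, " +
        s"partitioned = ${table.isPartitioned}")
    }
  }
}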
