@@ -96,6 +96,7 @@ private[hive] class HiveClientImpl(
     case hive.v1_0 => new Shim_v1_0()
     case hive.v1_1 => new Shim_v1_1()
     case hive.v1_2 => new Shim_v1_2()
+    case hive.v2_0 => new Shim_v2_0()
   }

   // Create an internal session state for this HiveClientImpl.
@@ -833,3 +833,77 @@ private[client] class Shim_v1_2 extends Shim_v1_1 {
   }

 }
+
+private[client] class Shim_v2_0 extends Shim_v1_2 {
+  private lazy val loadPartitionMethod =
+    findMethod(
+      classOf[Hive],
+      "loadPartition",
+      classOf[Path],
+      classOf[String],
+      classOf[JMap[String, String]],
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE)
+  private lazy val loadTableMethod =
+    findMethod(
+      classOf[Hive],
+      "loadTable",
+      classOf[Path],
+      classOf[String],
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE)
+  private lazy val loadDynamicPartitionsMethod =
+    findMethod(
+      classOf[Hive],
+      "loadDynamicPartitions",
+      classOf[Path],
+      classOf[String],
+      classOf[JMap[String, String]],
+      JBoolean.TYPE,
+      JInteger.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JLong.TYPE)
+
+  override def loadPartition(
+      hive: Hive,
+      loadPath: Path,
+      tableName: String,
+      partSpec: JMap[String, String],
+      replace: Boolean,
+      inheritTableSpecs: Boolean,
+      isSkewedStoreAsSubdir: Boolean,
+      isSrcLocal: Boolean): Unit = {
+    loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
+      inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
+      isSrcLocal: JBoolean, JBoolean.FALSE)
+  }
+
+  override def loadTable(
+      hive: Hive,
+      loadPath: Path,
+      tableName: String,
+      replace: Boolean,
+      isSrcLocal: Boolean): Unit = {
+    loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, isSrcLocal: JBoolean,
+      JBoolean.FALSE, JBoolean.FALSE)
+  }
+
+  override def loadDynamicPartitions(
+      hive: Hive,
+      loadPath: Path,
+      tableName: String,
+      partSpec: JMap[String, String],
+      replace: Boolean,
+      numDP: Int,
+      listBucketingEnabled: Boolean): Unit = {
+    loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
+      numDP: JInteger, listBucketingEnabled: JBoolean, JBoolean.FALSE, 0L: JLong)
+  }
+
+}
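For context on the pattern above: each Shim_vX_Y resolves Hive methods reflectively, because Spark compiles against a single Hive version while talking to several at runtime, and these method signatures differ across versions. The trailing JBoolean.FALSE and 0L: JLong arguments line up with the parameters Hive 2.0 added to these methods (ACID-related flags and a transaction id, going by the Hive 2.0 signatures; treat that mapping as the reviewer's reading, not something this diff states). A minimal, self-contained sketch of the reflection technique, using a hypothetical Greeter class rather than Hive's:

import java.lang.reflect.Method

// Stand-in for Hive's client class; imagine this overload only exists in "version 2".
class Greeter {
  def greet(name: String, shout: java.lang.Boolean): String =
    if (shout) s"HELLO, ${name.toUpperCase}!" else s"Hello, $name."
}

object ReflectiveShimSketch {
  // Mirrors the shim's findMethod calls: resolve once, lazily, by name and parameter types.
  private lazy val greetMethod: Method =
    classOf[Greeter].getMethod("greet", classOf[String], classOf[java.lang.Boolean])

  def main(args: Array[String]): Unit = {
    // invoke() takes AnyRef arguments, which is why the shim writes `replace: JBoolean`
    // and friends: the type ascription boxes Scala primitives into their java.lang wrappers.
    println(greetMethod.invoke(new Greeter, "world", java.lang.Boolean.FALSE))
  }
}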
@@ -94,6 +94,7 @@ private[hive] object IsolatedClientLoader extends Logging {
     case "1.0" | "1.0.0" => hive.v1_0
     case "1.1" | "1.1.0" => hive.v1_1
     case "1.2" | "1.2.0" | "1.2.1" => hive.v1_2
+    case "2.0" | "2.0.0" | "2.0.1" => hive.v2_0
   }

   private def downloadVersion(
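One detail worth calling out: all three accepted strings collapse onto the single hive.v2_0 case object, and it is that object's fullVersion ("2.0.1", defined in the package object below) that decides which metastore artifacts downloadVersion actually fetches. A toy sketch of that normalization, assuming nothing beyond the strings shown in this hunk:

object VersionNormalizationSketch {
  // Sketch of IsolatedClientLoader.hiveVersion, restricted to the 2.0 line added here.
  def normalize(userVersion: String): String = userVersion match {
    case "2.0" | "2.0.0" | "2.0.1" => "2.0.1" // hive.v2_0.fullVersion
    case other => sys.error(s"Unsupported Hive metastore version: $other")
  }

  def main(args: Array[String]): Unit =
    Seq("2.0", "2.0.0", "2.0.1").foreach(v => println(s"$v -> ${normalize(v)}"))
}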
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive

 /** Support for interacting with different versions of the HiveMetastoreClient */
 package object client {
-  private[hive] abstract class HiveVersion(
+  private[hive] sealed abstract class HiveVersion(
     val fullVersion: String,
     val extraDeps: Seq[String] = Nil,
     val exclusions: Seq[String] = Nil)
@@ -62,6 +62,12 @@ package object client {
         "org.pentaho:pentaho-aggdesigner-algorithm",
         "net.hydromatic:linq4j",
         "net.hydromatic:quidem"))
+
+    case object v2_0 extends HiveVersion("2.0.1",
+      exclusions = Seq("org.apache.curator:*",
+        "org.pentaho:pentaho-aggdesigner-algorithm"))
+
+    val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0)
   }
   // scalastyle:on

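Two small but meaningful changes in this file: HiveVersion becomes sealed, so every subclass must live in this file and scalac can check matches over it for exhaustiveness, and allSupportedHiveVersions gives other code (see InsertIntoHiveTable below) one canonical set to validate against. A standalone sketch of what sealed buys, with made-up version names rather than the real case objects:

object SealedVersionSketch {
  sealed abstract class Version(val full: String)
  case object V1_2 extends Version("1.2.1")
  case object V2_0 extends Version("2.0.1")

  val allSupported: Set[Version] = Set(V1_2, V2_0)

  // Because Version is sealed, scalac warns if this match misses a case object,
  // which is exactly the safety net the keyword adds to HiveVersion here.
  def shimName(v: Version): String = v match {
    case V1_2 => "Shim_v1_2"
    case V2_0 => "Shim_v2_0"
  }

  def main(args: Array[String]): Unit =
    allSupported.foreach(v => println(s"${v.full} -> ${shimName(v)}"))
}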
@@ -148,9 +148,16 @@ case class InsertIntoHiveTable(
     // We have to follow the Hive behavior here, to avoid trouble. For example, if we create the
     // staging directory under the table directory for Hive versions prior to 1.1, the staging
     // directory will be removed by Hive when it tries to empty the table directory.
-    if (hiveVersion == v12 || hiveVersion == v13 || hiveVersion == v14 || hiveVersion == v1_0) {
+    val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
+    val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0)
+
+    // Ensure all the supported versions are considered here.
+    assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
+      allSupportedHiveVersions)
+
+    if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
       oldVersionExternalTempPath(path, hadoopConf, scratchDir)
-    } else if (hiveVersion == v1_1 || hiveVersion == v1_2) {
+    } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
       newVersionExternalTempPath(path, hadoopConf, stagingDir)
     } else {
       throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
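The point of the new assert: the two sets must exactly partition the full supported set, so whoever adds the next HiveVersion is forced to classify it here instead of silently hitting the IllegalStateException branch at runtime. A tiny sketch of the same guard with hypothetical names:

object TempPathGuardSketch {
  val all = Set("v12", "v1_1", "v2_0")
  val oldStylePath = Set("v12")
  val newStylePath = Set("v1_1", "v2_0")

  def main(args: Array[String]): Unit = {
    // Fails fast if a supported version was never classified into either branch.
    assert(oldStylePath ++ newStylePath == all, "a supported version is unclassified")
    println("every supported version is classified")
  }
}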
@@ -35,22 +35,26 @@ private[client] class HiveClientBuilder {
       Some(new File(sys.props("java.io.tmpdir"), "hive-ivy-cache").getAbsolutePath))
   }

-  private def buildConf() = {
+  private def buildConf(extraConf: Map[String, String]) = {
     lazy val warehousePath = Utils.createTempDir()
     lazy val metastorePath = Utils.createTempDir()
     metastorePath.delete()
-    Map(
+    extraConf ++ Map(
       "javax.jdo.option.ConnectionURL" -> s"jdbc:derby:;databaseName=$metastorePath;create=true",
       "hive.metastore.warehouse.dir" -> warehousePath.toString)
   }

-  def buildClient(version: String, hadoopConf: Configuration): HiveClient = {
+  // for testing only
+  def buildClient(
+      version: String,
+      hadoopConf: Configuration,
+      extraConf: Map[String, String] = Map.empty): HiveClient = {
     IsolatedClientLoader.forVersion(
       hiveMetastoreVersion = version,
       hadoopVersion = VersionInfo.getVersion,
       sparkConf = sparkConf,
       hadoopConf = hadoopConf,
-      config = buildConf(),
+      config = buildConf(extraConf),
       ivyPath = ivyPath).createClient()
   }
 }
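A hedged usage sketch of the extended helper (a hypothetical call site, which would have to live inside the same package since HiveClientBuilder is private[client], and which assumes the class has a no-arg constructor as the hunk header suggests): extra metastore settings now flow through buildConf into the isolated client's configuration.

import org.apache.hadoop.conf.Configuration

object BuildClientUsageSketch {
  def main(args: Array[String]): Unit = {
    // The extraConf key here is the one VersionsSuite needs for Hive 2.0 below.
    val client = new HiveClientBuilder().buildClient(
      version = "2.0",
      hadoopConf = new Configuration(),
      extraConf = Map("datanucleus.schema.autoCreateAll" -> "true"))
    println(client.version)
  }
}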
@@ -88,7 +88,7 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
     assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
   }

-  private val versions = Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2")
+  private val versions = Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0")

   private var client: HiveClient = null

@@ -98,7 +98,12 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
     System.gc() // Hack to avoid SEGV on some JVM versions.
     val hadoopConf = new Configuration()
     hadoopConf.set("test", "success")
-    client = buildClient(version, hadoopConf)
+    // Hive changed the default of datanucleus.schema.autoCreateAll from true to false in 2.0.
+    // For details, see the JIRA ticket HIVE-6113.
+    if (version == "2.0") {
+      hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
+    }
+    client = buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf))
   }

   def table(database: String, tableName: String): CatalogTable = {