Skip to content

Commit ff14801

Browse files
wangyum authored and cloud-fan committed
[SPARK-23510][SQL] Support Hive 2.2 and Hive 2.3 metastore
## What changes were proposed in this pull request?

This is based on #20668 for supporting Hive 2.2 and Hive 2.3 metastore.

When we merge the PR, we should give the major credit to wangyum.

## How was this patch tested?

Added the test cases.

Author: Yuming Wang <[email protected]>
Author: gatorsmile <[email protected]>

Closes #20671 from gatorsmile/pr-20668.
1 parent 22f3d33 commit ff14801

File tree

9 files changed

+72
-12
lines changed

9 files changed

+72
-12
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ private[spark] object HiveUtils extends Logging {
6262

6363
val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version")
6464
.doc("Version of the Hive metastore. Available options are " +
65-
s"<code>0.12.0</code> through <code>2.1.1</code>.")
65+
s"<code>0.12.0</code> through <code>2.3.2</code>.")
6666
.stringConf
6767
.createWithDefault(builtinHiveVersion)
6868

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import scala.collection.JavaConverters._
2525
import scala.collection.mutable
2626
import scala.collection.mutable.ArrayBuffer
2727

28-
import org.apache.hadoop.conf.Configuration
2928
import org.apache.hadoop.fs.Path
3029
import org.apache.hadoop.hive.common.StatsSetupConst
3130
import org.apache.hadoop.hive.conf.HiveConf
@@ -104,6 +103,8 @@ private[hive] class HiveClientImpl(
104103
case hive.v1_2 => new Shim_v1_2()
105104
case hive.v2_0 => new Shim_v2_0()
106105
case hive.v2_1 => new Shim_v2_1()
106+
case hive.v2_2 => new Shim_v2_2()
107+
case hive.v2_3 => new Shim_v2_3()
107108
}
108109

109110
// Create an internal session state for this HiveClientImpl.

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -880,9 +880,7 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
880880

881881
}
882882

883-
private[client] class Shim_v1_0 extends Shim_v0_14 {
884-
885-
}
883+
private[client] class Shim_v1_0 extends Shim_v0_14
886884

887885
private[client] class Shim_v1_1 extends Shim_v1_0 {
888886

@@ -1146,3 +1144,7 @@ private[client] class Shim_v2_1 extends Shim_v2_0 {
11461144
alterPartitionsMethod.invoke(hive, tableName, newParts, environmentContextInAlterTable)
11471145
}
11481146
}
1147+
1148+
private[client] class Shim_v2_2 extends Shim_v2_1
1149+
1150+
private[client] class Shim_v2_3 extends Shim_v2_1

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ private[hive] object IsolatedClientLoader extends Logging {
9797
case "1.2" | "1.2.0" | "1.2.1" | "1.2.2" => hive.v1_2
9898
case "2.0" | "2.0.0" | "2.0.1" => hive.v2_0
9999
case "2.1" | "2.1.0" | "2.1.1" => hive.v2_1
100+
case "2.2" | "2.2.0" => hive.v2_2
101+
case "2.3" | "2.3.0" | "2.3.1" | "2.3.2" => hive.v2_3
100102
}
101103

102104
private def downloadVersion(

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,15 @@ package object client {
7171
exclusions = Seq("org.apache.curator:*",
7272
"org.pentaho:pentaho-aggdesigner-algorithm"))
7373

74-
val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1)
74+
case object v2_2 extends HiveVersion("2.2.0",
75+
exclusions = Seq("org.apache.curator:*",
76+
"org.pentaho:pentaho-aggdesigner-algorithm"))
77+
78+
case object v2_3 extends HiveVersion("2.3.2",
79+
exclusions = Seq("org.apache.curator:*",
80+
"org.pentaho:pentaho-aggdesigner-algorithm"))
81+
82+
val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3)
7583
}
7684
// scalastyle:on
7785

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
114114
// staging directory under the table directory for Hive prior to 1.1, the staging directory will
115115
// be removed by Hive when Hive is trying to empty the table directory.
116116
val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
117-
val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0, v2_1)
117+
val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] =
118+
Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3)
118119

119120
// Ensure all the supported versions are considered here.
120121
assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,6 @@ import scala.collection.immutable.IndexedSeq
2222
import org.apache.spark.SparkFunSuite
2323

2424
private[client] trait HiveClientVersions {
25-
protected val versions = IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1")
25+
protected val versions =
26+
IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3")
2627
}

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu
3434
// Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
3535
// hive.metastore.schema.verification from false to true since 2.0
3636
// For details, see the JIRA HIVE-6113 and HIVE-12463
37-
if (version == "2.0" || version == "2.1") {
37+
if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3") {
3838
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
3939
hadoopConf.set("hive.metastore.schema.verification", "false")
4040
}

sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import java.io.{ByteArrayOutputStream, File, PrintStream, PrintWriter}
2121
import java.net.URI
2222

2323
import org.apache.hadoop.conf.Configuration
24+
import org.apache.hadoop.hive.common.StatsSetupConst
2425
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
2526
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
2627
import org.apache.hadoop.mapred.TextInputFormat
@@ -110,7 +111,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
110111
assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
111112
}
112113

113-
private val versions = Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1")
114+
private val versions =
115+
Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3")
114116

115117
private var client: HiveClient = null
116118

@@ -125,7 +127,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
125127
// Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
126128
// hive.metastore.schema.verification from false to true since 2.0
127129
// For details, see the JIRA HIVE-6113 and HIVE-12463
128-
if (version == "2.0" || version == "2.1") {
130+
if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3") {
129131
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
130132
hadoopConf.set("hive.metastore.schema.verification", "false")
131133
}
@@ -422,15 +424,18 @@ class VersionsSuite extends SparkFunSuite with Logging {
422424

423425
test(s"$version: alterPartitions") {
424426
val spec = Map("key1" -> "1", "key2" -> "2")
427+
val parameters = Map(StatsSetupConst.TOTAL_SIZE -> "0", StatsSetupConst.NUM_FILES -> "1")
425428
val newLocation = new URI(Utils.createTempDir().toURI.toString.stripSuffix("/"))
426429
val storage = storageFormat.copy(
427430
locationUri = Some(newLocation),
428431
// needed for 0.12 alter partitions
429432
serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
430-
val partition = CatalogTablePartition(spec, storage)
433+
val partition = CatalogTablePartition(spec, storage, parameters)
431434
client.alterPartitions("default", "src_part", Seq(partition))
432435
assert(client.getPartition("default", "src_part", spec)
433436
.storage.locationUri == Some(newLocation))
437+
assert(client.getPartition("default", "src_part", spec)
438+
.parameters.get(StatsSetupConst.TOTAL_SIZE) == Some("0"))
434439
}
435440

436441
test(s"$version: dropPartitions") {
@@ -633,6 +638,46 @@ class VersionsSuite extends SparkFunSuite with Logging {
633638
}
634639
}
635640

641+
test(s"$version: CREATE Partitioned TABLE AS SELECT") {
642+
withTable("tbl") {
643+
versionSpark.sql(
644+
"""
645+
|CREATE TABLE tbl(c1 string)
646+
|PARTITIONED BY (ds STRING)
647+
""".stripMargin)
648+
versionSpark.sql("INSERT OVERWRITE TABLE tbl partition (ds='2') SELECT '1'")
649+
650+
assert(versionSpark.table("tbl").collect().toSeq == Seq(Row("1", "2")))
651+
val partMeta = versionSpark.sessionState.catalog.getPartition(
652+
TableIdentifier("tbl"), spec = Map("ds" -> "2")).parameters
653+
val totalSize = partMeta.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
654+
val numFiles = partMeta.get(StatsSetupConst.NUM_FILES).map(_.toLong)
655+
// Except 0.12, all the following versions will fill the Hive-generated statistics
656+
if (version == "0.12") {
657+
assert(totalSize.isEmpty && numFiles.isEmpty)
658+
} else {
659+
assert(totalSize.nonEmpty && numFiles.nonEmpty)
660+
}
661+
662+
versionSpark.sql(
663+
"""
664+
|ALTER TABLE tbl PARTITION (ds='2')
665+
|SET SERDEPROPERTIES ('newKey' = 'vvv')
666+
""".stripMargin)
667+
val newPartMeta = versionSpark.sessionState.catalog.getPartition(
668+
TableIdentifier("tbl"), spec = Map("ds" -> "2")).parameters
669+
670+
val newTotalSize = newPartMeta.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
671+
val newNumFiles = newPartMeta.get(StatsSetupConst.NUM_FILES).map(_.toLong)
672+
// Except 0.12, all the following versions will fill the Hive-generated statistics
673+
if (version == "0.12") {
674+
assert(newTotalSize.isEmpty && newNumFiles.isEmpty)
675+
} else {
676+
assert(newTotalSize.nonEmpty && newNumFiles.nonEmpty)
677+
}
678+
}
679+
}
680+
636681
test(s"$version: Delete the temporary staging directory and files after each insert") {
637682
withTempDir { tmpDir =>
638683
withTable("tab") {

0 commit comments

Comments
 (0)