diff --git a/docs/sql-data-sources-hive-tables.md b/docs/sql-data-sources-hive-tables.md index 0de573ec64b89..70c94e5ecbda7 100644 --- a/docs/sql-data-sources-hive-tables.md +++ b/docs/sql-data-sources-hive-tables.md @@ -130,7 +130,7 @@ The following options can be used to configure the version of Hive that is used 2.3.9 Version of the Hive metastore. Available - options are 0.12.0 through 2.3.9 and 3.0.0 through 3.1.3. + options are 0.12.0 through 2.3.9 and 3.0.0 through 4.0.0. 1.4.0 diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 3b9e2733352b5..e3869111bf2b4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -121,6 +121,7 @@ private[hive] class HiveClientImpl( case hive.v2_3 => new Shim_v2_3() case hive.v3_0 => new Shim_v3_0() case hive.v3_1 => new Shim_v3_1() + case hive.v4_0 => new Shim_v4_0() } // Create an internal session state for this HiveClientImpl. @@ -176,8 +177,26 @@ private[hive] class HiveClientImpl( // got changed. We reset it to clientLoader.ClassLoader here. state.getConf.setClassLoader(clientLoader.classLoader) shim.setCurrentSessionState(state) - state.out = new PrintStream(outputBuffer, true, UTF_8.name()) - state.err = new PrintStream(outputBuffer, true, UTF_8.name()) + // Hive 4.0 uses org.apache.hadoop.hive.common.io.SessionStream instead of PrintStream + // (see HIVE-21033). For creating a new SessionStream instance reflection must be used + // as this class also was introduced by the same change (HIVE-21033) + // and it is not available in any eariler Hive version + if (Utils.classIsLoadable("org.apache.hadoop.hive.common.io.SessionStream")) { + val sessionStreamCtor = Utils + .classForName[Any]("org.apache.hadoop.hive.common.io.SessionStream") + .getConstructor(classOf[java.io.OutputStream]) + val sessionStateClass = + Utils.classForName[SessionState](classOf[SessionState].getName) + val outField = sessionStateClass.getDeclaredField("out") + outField.set(state, + sessionStreamCtor.newInstance(new PrintStream(outputBuffer, true, UTF_8.name()))) + val errField = sessionStateClass.getDeclaredField("err") + errField.set(state, + sessionStreamCtor.newInstance(new PrintStream(outputBuffer, true, UTF_8.name()))) + } else { + state.out = new PrintStream(outputBuffer, true, UTF_8.name()) + state.err = new PrintStream(outputBuffer, true, UTF_8.name()) + } state } @@ -858,7 +877,7 @@ private[hive] class HiveClientImpl( // Since HIVE-18238(Hive 3.0.0), the Driver.close function's return type changed // and the CommandProcessorFactory.clean function removed. driver.getClass.getMethod("close").invoke(driver) - if (version != hive.v3_0 && version != hive.v3_1) { + if (version != hive.v3_0 && version != hive.v3_1 && version != hive.v4_0) { CommandProcessorFactory.clean(conf) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 01945b3c6f730..72f29b0c8fc60 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -700,6 +700,10 @@ private class Shim_v0_12 extends Shim with Logging { private class Shim_v0_13 extends Shim_v0_12 { + // Hive CHAR and VARCHAR are treated as catalyst strings but they cannot be pushed down + // before HIVE-26661 (which was introduced in Hive version 4.0) + protected val charVarcharPartionKeyPushDownSupported = false + private lazy val setCurrentSessionStateMethod = findStaticMethod( classOf[SessionState], @@ -947,15 +951,15 @@ private class Shim_v0_13 extends Shim_v0_12 { } object SupportedAttribute { - // hive varchar is treated as catalyst string, but hive varchar can't be pushed down. - private val varcharKeys = table.getPartitionKeys.asScala + private lazy val charVarcharKeys = table.getPartitionKeys.asScala .filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) || col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME)) .map(col => col.getName).toSet def unapply(attr: Attribute): Option[String] = { val resolver = SQLConf.get.resolver - if (varcharKeys.exists(c => resolver(c, attr.name))) { + if (!charVarcharPartionKeyPushDownSupported && + charVarcharKeys.exists(c => resolver(c, attr.name))) { None } else if (attr.dataType.isInstanceOf[IntegralType] || attr.dataType == StringType || attr.dataType == DateType) { @@ -1781,3 +1785,9 @@ private[client] class Shim_v3_0 extends Shim_v2_3 { } private[client] class Shim_v3_1 extends Shim_v3_0 + +private[client] class Shim_v4_0 extends Shim_v3_1 { + + override protected val charVarcharPartionKeyPushDownSupported = true + +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index d6489f0439106..b58d728bf313b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -96,6 +96,7 @@ private[hive] object IsolatedClientLoader extends Logging { case (2, 3, _) => Some(hive.v2_3) case (3, 0, _) => Some(hive.v3_0) case (3, 1, _) => Some(hive.v3_1) + case (4, 0, _) => Some(hive.v4_0) case _ => None }.getOrElse { throw QueryExecutionErrors.unsupportedHiveMetastoreVersionError( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala index a66842de7d832..2f8b9e06c7a3e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala @@ -89,8 +89,15 @@ package object client { "org.pentaho:pentaho-aggdesigner-algorithm", "org.apache.hive:hive-vector-code-gen")) + // Since HIVE-14496, Hive.java uses calcite-core + case object v4_0 extends HiveVersion("4.0.0", + extraDeps = Seq("org.apache.derby:derby:10.14.2.0"), + exclusions = Seq("org.apache.calcite:calcite-druid", + "org.apache.curator:*", + "org.apache.hive:hive-vector-code-gen")) + val allSupportedHiveVersions: Set[HiveVersion] = - Set(v2_0, v2_1, v2_2, v2_3, v3_0, v3_1) + Set(v2_0, v2_1, v2_2, v2_3, v3_0, v3_1, v4_0) } // scalastyle:on diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala index aceca829df8b3..668d3176a0dd2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala @@ -166,7 +166,7 @@ class HiveClientSuite(version: String, allVersions: Seq[String]) // test alter database location val tempDatabasePath2 = Utils.createTempDir().toURI // Hive support altering database location since HIVE-8472. - if (version == "3.0" || version == "3.1") { + if (version == "3.0" || version == "3.1" || version == "4.0") { client.alterDatabase(database.copy(locationUri = tempDatabasePath2)) val uriInCatalog = client.getDatabase("temporary").locationUri assert("file" === uriInCatalog.getScheme) @@ -653,7 +653,7 @@ class HiveClientSuite(version: String, allVersions: Seq[String]) test("sql create index and reset") { // HIVE-18448 Since Hive 3.0, INDEX is not supported. - if (version != "3.0" && version != "3.1") { + if (version != "3.0" && version != "3.1" && version != "4.0") { client.runSqlHive("CREATE TABLE indexed_table (key INT)") client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " + "as 'COMPACT' WITH DEFERRED REBUILD") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala index 1dee9e6dcfc83..061986ef93246 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala @@ -24,6 +24,6 @@ private[client] trait HiveClientVersions { protected val versions = if (testVersions.nonEmpty) { testVersions.get.split(",").map(_.trim).filter(_.nonEmpty).toIndexedSeq } else { - IndexedSeq("2.0", "2.1", "2.2", "2.3", "3.0", "3.1") + IndexedSeq("2.0", "2.1", "2.2", "2.3", "3.0", "3.1", "4.0") } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala index 4cc51064cfdd3..4b15f89b02248 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala @@ -33,12 +33,12 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu // hive.metastore.schema.verification from false to true since 2.0 // For details, see the JIRA HIVE-6113 and HIVE-12463 if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3" || - version == "3.0" || version == "3.1") { + version == "3.0" || version == "3.1" || version == "4.0") { hadoopConf.set("datanucleus.schema.autoCreateAll", "true") hadoopConf.set("hive.metastore.schema.verification", "false") } // Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`. - if (version == "3.0" || version == "3.1") { + if (version == "3.0" || version == "3.1" || version == "4.0") { hadoopConf.set("hive.in.test", "true") hadoopConf.set("hive.query.reexecution.enabled", "false") }