3 changes: 2 additions & 1 deletion dev/deps/spark-deps-hadoop-3.2
@@ -50,7 +50,7 @@ curator-client-2.13.0.jar
curator-framework-2.13.0.jar
curator-recipes-2.13.0.jar
datanucleus-api-jdo-3.2.6.jar
datanucleus-core-3.2.10.jar
datanucleus-core-4.1.17.jar
datanucleus-rdbms-3.2.9.jar
derby-10.12.1.1.jar
dnsjava-2.1.7.jar
@@ -76,6 +76,7 @@ hadoop-yarn-common-3.2.0.jar
hadoop-yarn-registry-3.2.0.jar
hadoop-yarn-server-common-3.2.0.jar
hadoop-yarn-server-web-proxy-3.2.0.jar
hive-storage-api-2.6.0.jar
hk2-api-2.4.0-b34.jar
hk2-locator-2.4.0-b34.jar
hk2-utils-2.4.0-b34.jar
99 changes: 99 additions & 0 deletions pom.xml
@@ -1414,6 +1414,18 @@
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<!-- Begin of Hive 2.3.4 exclusion -->
<!-- jetty-all conflicts with jetty 9.4.12.v20180830 -->
<exclusion>
wangyum (Member, Author) commented on Mar 28, 2019:

Exclude jetty-all; it conflicts with jetty 9.4.12.v20180830:

build/sbt clean package -Phadoop-3.2 -Phive
...
[error] /home/yumwang/opensource/spark/core/src/main/scala/org/apache/spark/SSLOptions.scala:78: value setTrustStorePath is not a member of org.eclipse.jetty.util.ssl.SslContextFactory
[error]         trustStore.foreach(file => sslContextFactory.setTrustStorePath(file.getAbsolutePath))
[error]
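
A quick way to confirm which jar is supplying the conflicting class (a hypothetical debugging snippet, not part of the PR; the class name comes from the error above):

// Scala, e.g. in spark-shell or a scratch test: print the jar that provides
// SslContextFactory. With jetty-all excluded this should point at the
// jetty-util 9.4.12.v20180830 artifact rather than jetty-all.
val location = classOf[org.eclipse.jetty.util.ssl.SslContextFactory]
  .getProtectionDomain.getCodeSource.getLocation
println(location)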

<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>jetty-all</artifactId>
</exclusion>
<!-- org.apache.logging.log4j:* conflicts with log4j 1.2.17 -->
<exclusion>
wangyum (Member, Author) commented:

org.apache.logging.log4j:* conflicts with log4j-1.2.17.jar:

build/sbt clean package -Phadoop-3.2 -Phive
...
[error] /home/yumwang/opensource/spark/core/src/main/scala/org/apache/spark/internal/Logging.scala:236: value getLevel is not a member of org.apache.log4j.spi.LoggingEvent
[error]     if (!loggingEvent.getLevel().eq(rootLevel)) {
[error]                       ^
[error] /home/yumwang/opensource/spark/core/src/main/scala/org/apache/spark/internal/Logging.scala:239: value getLogger is not a member of org.apache.log4j.spi.LoggingEvent
[error]     var logger = loggingEvent.getLogger()

<groupId>org.apache.logging.log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<!-- End of Hive 2.3.4 exclusion -->
</exclusions>
</dependency>

@@ -1532,6 +1544,27 @@
<groupId>org.json</groupId>
<artifactId>json</artifactId>
</exclusion>
<!-- Begin of Hive 2.3.4 exclusion -->
<!-- Do not need Tez -->
<exclusion>
<groupId>${hive.group}</groupId>
<artifactId>hive-llap-tez</artifactId>
</exclusion>
<!-- Do not need Calcite, see SPARK-27054 -->
<exclusion>
wangyum (Member, Author) commented:

Exclude calcite-druid and avatica. More details: https://issues.apache.org/jira/browse/SPARK-27054

<groupId>org.apache.calcite</groupId>
<artifactId>calcite-druid</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica</artifactId>
</exclusion>
<!-- org.apache.logging.log4j:* conflicts with log4j 1.2.17 -->
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<!-- End of Hive 2.3.4 exclusion -->
</exclusions>
</dependency>
<dependency>
@@ -1697,6 +1730,22 @@
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
</exclusion>
<!-- Begin of Hive 2.3.4 exclusion -->
<!-- parquet-hadoop-bundle:1.8.1 conflicts with 1.10.1 -->
<exclusion>
wangyum (Member, Author) commented:

Exclude parquet-hadoop-bundle, otherwise:

build/sbt clean package -Phadoop-3.2 -Phive
...
[error] /home/yumwang/opensource/spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala:36: value JobSummaryLevel is not a member of object org.apache.parquet.hadoop.ParquetOutputFormat
[error] import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel

Reviewer comment (Member):

These several exclusions would apply to both Hive 2 and Hive 1 in the build as it is now. That's probably OK; maybe they don't even exist in Hive 1. But some like this one I'm not as sure about?

wangyum (Member, Author) replied:

Yes. org.apache.parquet:parquet-hadoop-bundle doesn't exist in Hive 1. It should be com.twitter:parquet-hadoop-bundle in Hive 1: https://github.com/apache/hive/blob/release-1.2.1/pom.xml#L256-L260

<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop-bundle</artifactId>
</exclusion>
<!-- Do not need Jasper, see HIVE-19799 -->
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<!-- End of Hive 2.3.4 exclusion -->
</exclusions>
</dependency>

@@ -1762,8 +1811,42 @@
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
</exclusion>
<!-- Begin of Hive 2.3.4 exclusion -->
<!-- Exclude log4j-slf4j-impl, otherwise spark-shell fails with a NoClassDefFoundError at startup -->
<exclusion>
wangyum (Member, Author) commented:

Exclude log4j-slf4j-impl, otherwise:

$ build/sbt clean package -Phadoop-3.2 -Phive
$ export SPARK_PREPEND_CLASSES=true
$ bin/spark-shell
NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes ahead of assembly.
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/logging/log4j/spi/AbstractLoggerAdapter
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at org.slf4j.impl.StaticLoggerBinder.<clinit>(StaticLoggerBinder.java:36)
	at org.apache.spark.internal.Logging$.org$apache$spark$internal$Logging$$isLog4j12(Logging.scala:217)
	at org.apache.spark.internal.Logging.initializeLogging(Logging.scala:122)
	at org.apache.spark.internal.Logging.initializeLogIfNecessary(Logging.scala:111)
	at org.apache.spark.internal.Logging.initializeLogIfNecessary$(Logging.scala:105)
	at org.apache.spark.deploy.SparkSubmit.initializeLogIfNecessary(SparkSubmit.scala:73)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:81)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:939)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:948)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.logging.log4j.spi.AbstractLoggerAdapter
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 22 more
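
For reference, the failure comes from the slf4j binding probe in Spark's Logging trait; a minimal check of which backend slf4j has bound (hypothetical snippet, not part of the PR) is:

// Scala: report the slf4j binding. With log4j-slf4j-impl on the classpath this
// resolves to the log4j2 adapter, whose initialization needs log4j2 classes that
// Spark does not ship; with the exclusion below, slf4j binds to log4j 1.2 as
// Spark's Logging trait expects.
import org.slf4j.impl.StaticLoggerBinder
println(StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr)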

<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
<!-- End of Hive 2.3.4 exclusion -->
</exclusions>
</dependency>

<!-- Hive 2.3 needs hive-llap-client. We add it here, otherwise the scope won't work -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-llap-client</artifactId>
<version>2.3.4</version>
<scope>${hive.deps.scope}</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-serde</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>apache-curator</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
@@ -2656,7 +2739,23 @@
<hadoop.version>3.2.0</hadoop.version>
<curator.version>2.13.0</curator.version>
<zookeeper.version>3.4.13</zookeeper.version>
<hive.group>org.apache.hive</hive.group>
<hive.classifier>core</hive.classifier>
<hive.version>2.3.4</hive.version>
<hive.version.short>${hive.version}</hive.version.short>
<hive.parquet.version>${parquet.version}</hive.parquet.version>
<orc.classifier></orc.classifier>
<hive.parquet.group>org.apache.parquet</hive.parquet.group>
<datanucleus-core.version>4.1.17</datanucleus-core.version>
</properties>
<dependencies>
<!-- Both Hive and ORC need hive-storage-api, but it is excluded by orc-mapreduce -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-storage-api</artifactId>
<version>2.6.0</version>
Reviewer comment (Member):

This matches what 2.3.4 needs. Should it be provided, or should it use hive.deps.scope?

wangyum (Member, Author) replied:

No. Both Hive and ORC need hive-storage-api:

1. Remove hive-storage-api and save as a table:
scala> spark.range(10).write.saveAsTable("test2")
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
  at org.apache.hive.common.util.ReflectionUtil.newInstance(ReflectionUtil.java:85)
  at org.apache.hadoop.hive.ql.exec.Registry.registerGenericUDF(Registry.java:177)
  at org.apache.hadoop.hive.ql.exec.Registry.registerGenericUDF(Registry.java:170)
  at org.apache.hadoop.hive.ql.exec.FunctionRegistry.<clinit>(FunctionRegistry.java:209)
  at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:247)
  at org.apache.hadoop.hive.ql.metadata.Hive.registerAllFunctionsOnce(Hive.java:231)
  at org.apache.hadoop.hive.ql.metadata.Hive.<init>(Hive.java:388)
  at org.apache.hadoop.hive.ql.metadata.Hive.create(Hive.java:332)
  at org.apache.hadoop.hive.ql.metadata.Hive.getInternal(Hive.java:312)
  at org.apache.hadoop.hive.ql.metadata.Hive.get(Hive.java:288)
  at org.apache.spark.sql.hive.client.HiveClientImpl.client(HiveClientImpl.scala:258)
  at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:280)
  at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:225)
  at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:224)
  at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:270)
  at org.apache.spark.sql.hive.client.HiveClientImpl.databaseExists(HiveClientImpl.scala:361)
  at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$databaseExists$1(HiveExternalCatalog.scala:217)
  at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:99)
  at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:217)
  at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:139)
  at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:129)
  at org.apache.spark.sql.hive.HiveSessionStateBuilder.externalCatalog(HiveSessionStateBuilder.scala:40)
  at org.apache.spark.sql.hive.HiveSessionStateBuilder.$anonfun$catalog$1(HiveSessionStateBuilder.scala:55)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.externalCatalog$lzycompute(SessionCatalog.scala:90)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.externalCatalog(SessionCatalog.scala:90)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.tableExists(SessionCatalog.scala:420)
  at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:446)
  at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:441)
  ... 47 elided
Caused by: java.lang.reflect.InvocationTargetException: java.lang.NoClassDefFoundError: org/apache/hadoop/hive/serde2/io/HiveDecimalWritable
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at org.apache.hive.common.util.ReflectionUtil.newInstance(ReflectionUtil.java:83)
  ... 75 more
Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/hive/serde2/io/HiveDecimalWritable
  at org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloorCeilBase.<init>(GenericUDFFloorCeilBase.java:48)
  at org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor.<init>(GenericUDFFloor.java:41)
  ... 80 more
2. Remove hive-storage-api and write to ORC:
scala> spark.range(10).write.orc("test3")
19/04/01 21:47:40 WARN DAGScheduler: Broadcasting large task binary with size 172.4 KiB
[Stage 0:>                                                          (0 + 4) / 4]19/04/01 21:47:41 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.NoClassDefFoundError: org/apache/hadoop/hive/ql/exec/vector/ColumnVector
	at org.apache.spark.sql.execution.datasources.orc.OrcSerializer.createOrcValue(OrcSerializer.scala:226)
	at org.apache.spark.sql.execution.datasources.orc.OrcSerializer.<init>(OrcSerializer.scala:36)
	at org.apache.spark.sql.execution.datasources.orc.OrcOutputWriter.<init>(OrcOutputWriter.scala:37)
	at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat$$anon$1.newInstance(OrcFileFormat.scala:120)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:124)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:109)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:236)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$14(FileFormatWriter.scala:177)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:428)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1321)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:431)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.exec.vector.ColumnVector
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 16 more
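
A quick classpath sanity check for the two classes missing in the traces above (hypothetical spark-shell snippet, not part of the PR; both classes ship in hive-storage-api):

Seq(
  "org.apache.hadoop.hive.serde2.io.HiveDecimalWritable",  // needed by Hive's FunctionRegistry
  "org.apache.hadoop.hive.ql.exec.vector.ColumnVector"     // needed by Spark's ORC writer path
).foreach { name =>
  // Resolves only when hive-storage-api is on the classpath.
  try { Class.forName(name); println("OK: " + name) }
  catch { case _: ClassNotFoundException => println("MISSING: " + name) }
}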

</dependency>
</dependencies>
</profile>

<profile>
@@ -19,7 +19,7 @@

import java.math.BigDecimal;

import org.apache.orc.storage.ql.exec.vector.*;
import org.apache.hadoop.hive.ql.exec.vector.*;
Reviewer comment (Member): Here.

Reviewer comment (Member): Yes .. we shouldn't do this..


import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
@@ -17,11 +17,11 @@

package org.apache.spark.sql.execution.datasources.orc

import org.apache.orc.storage.common.`type`.HiveDecimal
import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
import org.apache.orc.storage.ql.io.sarg.SearchArgument.Builder
import org.apache.orc.storage.ql.io.sarg.SearchArgumentFactory.newBuilder
import org.apache.orc.storage.serde2.io.HiveDecimalWritable
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument}
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory.newBuilder
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable
Reviewer comment (Member): Here.


import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._
@@ -19,11 +19,11 @@ package org.apache.spark.sql.execution.datasources.orc

import java.sql.Date

import org.apache.orc.storage.common.`type`.HiveDecimal
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch
import org.apache.orc.storage.ql.io.sarg.{SearchArgument => OrcSearchArgument}
import org.apache.orc.storage.ql.io.sarg.PredicateLeaf.{Operator => OrcOperator}
import org.apache.orc.storage.serde2.io.{DateWritable, HiveDecimalWritable}
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch
import org.apache.hadoop.hive.ql.io.sarg.{SearchArgument => OrcSearchArgument}
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.{Operator => OrcOperator}
import org.apache.hadoop.hive.serde2.io.{DateWritable, HiveDecimalWritable}
Reviewer comment (Member): Here.


import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.types.Decimal
@@ -23,7 +23,7 @@ import java.sql.{Date, Timestamp}

import scala.collection.JavaConverters._

import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument}

import org.apache.spark.sql.{AnalysisException, Column, DataFrame}
import org.apache.spark.sql.catalyst.dsl.expressions._
21 changes: 21 additions & 0 deletions sql/hive/pom.xml
@@ -208,6 +208,27 @@
</plugins>
</build>
</profile>
<profile>
<id>hadoop-3.2</id>
<dependencies>
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-common</artifactId>
</dependency>
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-serde</artifactId>
</dependency>
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-shims</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-llap-client</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>

<build>
73 changes: 49 additions & 24 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive

import java.io.{InputStream, OutputStream}
import java.lang.reflect.Method
import java.rmi.server.UID

import scala.collection.JavaConverters._
@@ -28,15 +29,13 @@ import com.google.common.base.Objects
import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.exec.{UDF, Utilities}
import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMacro
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils
import org.apache.hadoop.hive.serde2.avro.{AvroGenericRecordWritable, AvroSerdeUtils}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector
import org.apache.hadoop.io.Writable
import org.apache.hive.com.esotericsoftware.kryo.Kryo
import org.apache.hive.com.esotericsoftware.kryo.io.{Input, Output}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.types.Decimal
@@ -146,34 +145,60 @@ private[hive] object HiveShim {
case _ => false
}

@transient
def deserializeObjectByKryo[T: ClassTag](
kryo: Kryo,
in: InputStream,
clazz: Class[_]): T = {
val inp = new Input(in)
val t: T = kryo.readObject(inp, clazz).asInstanceOf[T]
inp.close()
t
}
private lazy val serUtilClass =
Utils.classForName("org.apache.hadoop.hive.ql.exec.SerializationUtilities")
private lazy val utilClass = Utils.classForName("org.apache.hadoop.hive.ql.exec.Utilities")
private val deserializeMethodName = "deserializeObjectByKryo"
private val serializeMethodName = "serializeObjectByKryo"

@transient
def serializeObjectByKryo(
kryo: Kryo,
plan: Object,
out: OutputStream) {
val output: Output = new Output(out)
kryo.writeObject(output, plan)
output.close()
private def findMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
val method = klass.getDeclaredMethod(name, args: _*)
method.setAccessible(true)
method
}

def deserializePlan[UDFType](is: java.io.InputStream, clazz: Class[_]): UDFType = {
deserializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), is, clazz)
.asInstanceOf[UDFType]
if (HiveUtils.isHive23) {
val borrowKryo = serUtilClass.getMethod("borrowKryo")
val kryo = borrowKryo.invoke(serUtilClass)
val deserializeObjectByKryo = findMethod(serUtilClass, deserializeMethodName,
kryo.getClass.getSuperclass, classOf[InputStream], classOf[Class[_]])
try {
deserializeObjectByKryo.invoke(null, kryo, is, clazz).asInstanceOf[UDFType]
} finally {
serUtilClass.getMethod("releaseKryo", kryo.getClass.getSuperclass).invoke(null, kryo)
}
} else {
val runtimeSerializationKryo = utilClass.getField("runtimeSerializationKryo")
val threadLocalValue = runtimeSerializationKryo.get(utilClass)
val getMethod = threadLocalValue.getClass.getMethod("get")
val kryo = getMethod.invoke(threadLocalValue)
val deserializeObjectByKryo = findMethod(utilClass, deserializeMethodName,
kryo.getClass, classOf[InputStream], classOf[Class[_]])
deserializeObjectByKryo.invoke(null, kryo, is, clazz).asInstanceOf[UDFType]
}
}

def serializePlan(function: AnyRef, out: java.io.OutputStream): Unit = {
serializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), function, out)
if (HiveUtils.isHive23) {
val borrowKryo = serUtilClass.getMethod("borrowKryo")
val kryo = borrowKryo.invoke(serUtilClass)
val serializeObjectByKryo = findMethod(serUtilClass, serializeMethodName,
kryo.getClass.getSuperclass, classOf[Object], classOf[OutputStream])
try {
serializeObjectByKryo.invoke(null, kryo, function, out)
} finally {
serUtilClass.getMethod("releaseKryo", kryo.getClass.getSuperclass).invoke(null, kryo)
}
} else {
val runtimeSerializationKryo = utilClass.getField("runtimeSerializationKryo")
val threadLocalValue = runtimeSerializationKryo.get(utilClass)
val getMethod = threadLocalValue.getClass.getMethod("get")
val kryo = getMethod.invoke(threadLocalValue)
val serializeObjectByKryo = findMethod(utilClass, serializeMethodName,
kryo.getClass, classOf[Object], classOf[OutputStream])
serializeObjectByKryo.invoke(null, kryo, function, out)
}
}

def writeExternal(out: java.io.ObjectOutput) {
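
For reference: the reflective shim above lets the same source compile against both Hive lines. With Hive 2.3 it goes through SerializationUtilities.borrowKryo/releaseKryo, and with Hive 1.2 through the Utilities.runtimeSerializationKryo thread-local. A hypothetical round-trip (e.g. from a test in the org.apache.spark.sql.hive package; not part of this PR):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import org.apache.hadoop.hive.ql.udf.UDFLog  // any concrete Hive UDF works here

val out = new ByteArrayOutputStream()
HiveShim.serializePlan(new UDFLog(), out)    // dispatches to the Kryo of the bundled Hive
val in = new ByteArrayInputStream(out.toByteArray)
val udf = HiveShim.deserializePlan[UDFLog](in, classOf[UDFLog])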
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.util.VersionInfo
import org.apache.hive.common.util.HiveVersionInfo

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
@@ -54,8 +55,11 @@ private[spark] object HiveUtils extends Logging {
sc
}

private val hiveVersion = HiveVersionInfo.getVersion
val isHive23: Boolean = hiveVersion.startsWith("2.3")

/** The version of hive used internally by Spark SQL. */
val builtinHiveVersion: String = "1.2.1"
val builtinHiveVersion: String = if (isHive23) hiveVersion else "1.2.1"

val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version")
.doc("Version of the Hive metastore. Available options are " +