6 changes: 0 additions & 6 deletions assembly/pom.xml
@@ -201,12 +201,6 @@
     <artifactId>spark-hive_${scala.binary.version}</artifactId>
     <version>${project.version}</version>
   </dependency>
- </dependencies>
-</profile>
-<profile>
-  <!-- TODO: Move this to "hive" profile once 0.13 JDBC is supported -->
-  <id>hive-0.12.0</id>
-  <dependencies>
   <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
13 changes: 10 additions & 3 deletions dev/run-tests
@@ -142,17 +142,24 @@ CURRENT_BLOCK=$BLOCK_BUILD
# We always build with Hive because the PySpark Spark SQL tests need it.
BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-0.12.0"

-  echo "[info] Building Spark with these arguments: $BUILD_MVN_PROFILE_ARGS"
-
# NOTE: echo "q" is needed because sbt on encountering a build file with failure
#+ (either resolution or compilation) prompts the user for input either q, r, etc
#+ to quit or retry. This echo is there to make it not block.
# NOTE: Do not quote $BUILD_MVN_PROFILE_ARGS or else it will be interpreted as a
#+ single argument!
# QUESTION: Why doesn't 'yes "q"' work?
# QUESTION: Why doesn't 'grep -v -e "^\[info\] Resolving"' work?
+  # First build with 0.12 to ensure patches do not break the hive 12 build
+  echo "[info] Compile with hive 0.12"
echo -e "q\n" \
-    | sbt/sbt $BUILD_MVN_PROFILE_ARGS clean package assembly/assembly \
+    | sbt/sbt $BUILD_MVN_PROFILE_ARGS clean hive/compile hive-thriftserver/compile \
Contributor:
I actually don't think this is compiling against Hive 0.12 right now... is it?

Contributor Author:
I think it's against 0.12, because BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-0.12.0", right?

Contributor:
It should be; BUILD_MVN_PROFILE_ARGS is defined above with -Phive-0.12.0:

  BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-0.12.0"

| grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"

+  # Then build with the default version (0.13.1) because tests are based on this version
+  echo "[info] Building Spark with these arguments: $SBT_MAVEN_PROFILES_ARGS -Phive"
+  echo -e "q\n" \
+    | sbt/sbt $SBT_MAVEN_PROFILES_ARGS -Phive package assembly/assembly \
+    | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
}

29 changes: 23 additions & 6 deletions pom.xml
@@ -128,7 +128,7 @@
<flume.version>1.4.0</flume.version>
<zookeeper.version>3.4.5</zookeeper.version>
<!-- Version used in Maven Hive dependency -->
-    <hive.version>0.13.1</hive.version>
+    <hive.version>0.13.1a</hive.version>
<!-- Version used for internal directory structure -->
<hive.version.short>0.13.1</hive.version.short>
<derby.version>10.10.1.1</derby.version>
@@ -239,6 +239,18 @@
<enabled>false</enabled>
</snapshots>
</repository>
+    <repository>
+      <!-- This is temporarily included to fix issues with Hive 0.13 -->
+      <id>spark-staging-hive13</id>
+      <name>Spark Staging Repository Hive 13</name>
+      <url>https://oss.sonatype.org/content/repositories/orgspark-project-1089/</url>
+      <releases>
+        <enabled>true</enabled>
+      </releases>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
</repositories>
<pluginRepositories>
<pluginRepository>
@@ -907,9 +919,9 @@
by Spark SQL for code generation. -->
<compilerPlugins>
<compilerPlugin>
-      <groupId>org.scalamacros</groupId>
-      <artifactId>paradise_${scala.version}</artifactId>
-      <version>${scala.macros.version}</version>
+    <groupId>org.scalamacros</groupId>
+    <artifactId>paradise_${scala.version}</artifactId>
+    <version>${scala.macros.version}</version>
</compilerPlugin>
</compilerPlugins>
</configuration>
@@ -1313,14 +1325,19 @@
     </dependencies>
   </profile>
   <profile>
-    <id>hive-0.12.0</id>
+    <id>hive</id>
     <activation>
       <activeByDefault>false</activeByDefault>
     </activation>
-    <!-- TODO: Move this to "hive" profile once 0.13 JDBC is supported -->
     <modules>
       <module>sql/hive-thriftserver</module>
     </modules>
+  </profile>
+  <profile>
+    <id>hive-0.12.0</id>
+    <activation>
+      <activeByDefault>false</activeByDefault>
+    </activation>
     <properties>
       <hive.version>0.12.0-protobuf-2.5</hive.version>
       <hive.version.short>0.12.0</hive.version.short>
27 changes: 0 additions & 27 deletions python/pyspark/sql.py
@@ -1379,33 +1379,6 @@ def hql(self, hqlQuery):

class LocalHiveContext(HiveContext):

"""Starts up an instance of hive where metadata is stored locally.

An in-process metadata data is created with data stored in ./metadata.
Warehouse data is stored in in ./warehouse.

>>> import os
>>> hiveCtx = LocalHiveContext(sc)
>>> try:
... supress = hiveCtx.sql("DROP TABLE src")
... except Exception:
... pass
>>> kv1 = os.path.join(os.environ["SPARK_HOME"],
... 'examples/src/main/resources/kv1.txt')
>>> supress = hiveCtx.sql(
... "CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
>>> supress = hiveCtx.sql("LOAD DATA LOCAL INPATH '%s' INTO TABLE src"
... % kv1)
>>> results = hiveCtx.sql("FROM src SELECT value"
... ).map(lambda r: int(r.value.split('_')[1]))
>>> num = results.count()
>>> reduce_sum = results.reduce(lambda x, y: x + y)
>>> num
500
>>> reduce_sum
130091
"""

def __init__(self, sparkContext, sqlContext=None):
HiveContext.__init__(self, sparkContext, sqlContext)
warnings.warn("LocalHiveContext is deprecated. "
18 changes: 18 additions & 0 deletions sql/hive-thriftserver/pom.xml
@@ -70,6 +70,24 @@
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
</plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>add-default-sources</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>v${hive.version.short}/src/main/scala</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
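The build-helper execution added above registers v${hive.version.short}/src/main/scala as an extra source root, which is how this module keeps one shim source tree per supported Hive release while compiling exactly one of them in any given build. A minimal sketch of the idea, assuming hypothetical v0.12.0 and v0.13.1 directories that each define the same shim object:

    // Hypothetical file: sql/hive-thriftserver/v0.13.1/src/main/scala/Shim.scala
    // (a sibling under v0.12.0 would report "0.12.0"). Only the tree matching
    // hive.version.short lands on the compile path, so the rest of the module
    // can use HiveThriftServerShim without knowing which Hive was selected.
    package org.apache.spark.sql.hive.thriftserver

    private[hive] object HiveThriftServerShim {
      val version = "0.13.1"
    }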
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala
@@ -29,11 +29,11 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse
import org.apache.spark.Logging
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}

-private[hive] class SparkSQLDriver(val context: HiveContext = SparkSQLEnv.hiveContext)
-  extends Driver with Logging {
+private[hive] abstract class AbstractSparkSQLDriver(
+    val context: HiveContext = SparkSQLEnv.hiveContext) extends Driver with Logging {

-  private var tableSchema: Schema = _
-  private var hiveResponse: Seq[String] = _
+  private[hive] var tableSchema: Schema = _
+  private[hive] var hiveResponse: Seq[String] = _

override def init(): Unit = {
}
@@ -74,16 +74,6 @@ private[hive] class SparkSQLDriver(val context: HiveContext = SparkSQLEnv.hiveCo

override def getSchema: Schema = tableSchema

-  override def getResults(res: JArrayList[String]): Boolean = {
-    if (hiveResponse == null) {
-      false
-    } else {
-      res.addAll(hiveResponse)
-      hiveResponse = null
-      true
-    }
-  }
-
override def destroy() {
super.destroy()
hiveResponse = null
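With getResults removed from the now-abstract base class, each versioned source tree can supply a concrete SparkSQLDriver that matches its Hive release's Driver.getResults signature. A sketch of what a 0.12-flavored subclass could look like, reusing the method body deleted above (the class name and file placement are assumptions, not part of this diff):

    package org.apache.spark.sql.hive.thriftserver

    import java.util.{ArrayList => JArrayList}

    import scala.collection.JavaConversions._

    import org.apache.spark.sql.hive.HiveContext

    // Hypothetical concrete driver for a v0.12.0 source tree. Hive 0.12 declares
    // getResults(JArrayList[String]), so the version-specific override lives here;
    // hiveResponse was widened to private[hive] above precisely to allow this.
    private[hive] class SparkSQLDriver(context: HiveContext = SparkSQLEnv.hiveContext)
      extends AbstractSparkSQLDriver(context) {

      override def getResults(res: JArrayList[String]): Boolean = {
        if (hiveResponse == null) {
          false
        } else {
          res.addAll(hiveResponse) // implicit Seq -> java.util.List conversion
          hiveResponse = null
          true
        }
      }
    }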
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -38,6 +38,8 @@ import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.thrift.transport.TSocket

import org.apache.spark.Logging
+import org.apache.spark.sql.hive.HiveShim
+import org.apache.spark.sql.hive.thriftserver.HiveThriftServerShim

private[hive] object SparkSQLCLIDriver {
private var prompt = "spark-sql"
@@ -116,7 +118,7 @@
}
}

-    if (!sessionState.isRemoteMode && !ShimLoader.getHadoopShims.usesJobShell()) {
+    if (!sessionState.isRemoteMode) {
// Hadoop-20 and above - we need to augment classpath using hiveconf
// components.
// See also: code in ExecDriver.java
@@ -258,7 +260,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
} else {
var ret = 0
val hconf = conf.asInstanceOf[HiveConf]
-      val proc: CommandProcessor = CommandProcessorFactory.get(tokens(0), hconf)
+      val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hconf)

if (proc != null) {
if (proc.isInstanceOf[Driver] || proc.isInstanceOf[SetProcessor]) {
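HiveShim.getCommandProcessor papers over a signature change in Hive's CommandProcessorFactory: 0.12 resolves a processor from a single command token, while 0.13 accepts the whole token array. A rough sketch of the two per-version shims such a call might dispatch to (the object layout is an assumption; only the factory signatures come from Hive itself):

    import org.apache.hadoop.hive.conf.HiveConf
    import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}

    // 0.12-style shim: Hive 0.12 exposes get(String, HiveConf),
    // so only the first token is forwarded.
    object HiveShim {
      def getCommandProcessor(cmd: Array[String], conf: HiveConf): CommandProcessor =
        CommandProcessorFactory.get(cmd(0), conf)
    }

    // A 0.13-style shim would forward the whole array instead, matching
    // Hive 0.13's get(String[], HiveConf) overload.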
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala
@@ -24,6 +24,7 @@ import java.util.{List => JList}
import javax.security.auth.login.LoginException

import org.apache.commons.logging.Log
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.hive.service.Service.STATE
@@ -44,15 +45,17 @@ private[hive] class SparkSQLCLIService(hiveContext: HiveContext)
val sparkSqlSessionManager = new SparkSQLSessionManager(hiveContext)
setSuperField(this, "sessionManager", sparkSqlSessionManager)
addService(sparkSqlSessionManager)
+    var sparkServiceUGI: UserGroupInformation = null

-    try {
-      HiveAuthFactory.loginFromKeytab(hiveConf)
-      val serverUserName = ShimLoader.getHadoopShims
-        .getShortUserName(ShimLoader.getHadoopShims.getUGIForConf(hiveConf))
-      setSuperField(this, "serverUserName", serverUserName)
-    } catch {
-      case e @ (_: IOException | _: LoginException) =>
-        throw new ServiceException("Unable to login to kerberos with given principal/keytab", e)
-    }
+    if (ShimLoader.getHadoopShims().isSecurityEnabled()) {
+      try {
+        HiveAuthFactory.loginFromKeytab(hiveConf)
+        sparkServiceUGI = ShimLoader.getHadoopShims.getUGIForConf(hiveConf)
+        HiveThriftServerShim.setServerUserName(sparkServiceUGI, this)
+      } catch {
+        case e @ (_: IOException | _: LoginException) =>
+          throw new ServiceException("Unable to login to kerberos with given principal/keytab", e)
+      }
+    }

initCompositeService(hiveConf)
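
HiveThriftServerShim.setServerUserName absorbs another cross-version difference: what CLIService stores for the logged-in server principal. A plausible sketch of a 0.12-side implementation, assuming it reuses the reflective setSuperField helper already used in this file and Hive 0.12's String-typed serverUserName field:

    import org.apache.hadoop.hive.shims.ShimLoader
    import org.apache.hadoop.security.UserGroupInformation

    import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ // assumed home of setSuperField

    // Hypothetical 0.12-side shim: Hive 0.12's CLIService keeps the server user
    // as a String short name, so the UGI is unwrapped before the reflective set.
    // (A 0.13-side variant could store the UGI itself instead.)
    object HiveThriftServerShim {
      def setServerUserName(
          sparkServiceUGI: UserGroupInformation,
          sparkCLIService: SparkSQLCLIService): Unit = {
        val serverUserName = ShimLoader.getHadoopShims.getShortUserName(sparkServiceUGI)
        setSuperField(sparkCLIService, "serverUserName", serverUserName)
      }
    }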