Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,35 @@

package org.apache.spark.sql.hive.thriftserver

import java.security.PrivilegedExceptionAction
import java.sql.{Date, Timestamp}
import java.util.concurrent.Future
import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}

import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, Map => SMap}
import scala.math._

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.ExecuteStatementOperation
import org.apache.hive.service.cli.session.HiveSession

import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.plans.logical.SetCommand
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
import org.apache.spark.sql.{SchemaRDD, Row => SparkRow}
import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD}

/**
* A compatibility layer for interacting with Hive version 0.13.1.
*/
private[thriftserver] object HiveThriftServerShim {
val version = "0.13.1"

def setServerUserName(sparkServiceUGI: UserGroupInformation, sparkCliService:SparkSQLCLIService) = {
def setServerUserName(
sparkServiceUGI: UserGroupInformation,
sparkCliService:SparkSQLCLIService) = {
setSuperField(sparkCliService, "serviceUGI", sparkServiceUGI)
}
}
Expand All @@ -72,39 +69,14 @@ private[hive] class SparkExecuteStatementOperation(
confOverlay: JMap[String, String],
runInBackground: Boolean = true)(
hiveContext: HiveContext,
sessionToActivePool: SMap[HiveSession, String]) extends ExecuteStatementOperation(
parentSession, statement, confOverlay, runInBackground) with Logging {
sessionToActivePool: SMap[HiveSession, String])
// NOTE: `runInBackground` is set to `false` intentionally to disable asynchronous execution
extends ExecuteStatementOperation(parentSession, statement, confOverlay, false) with Logging {

private var result: SchemaRDD = _
private var iter: Iterator[SparkRow] = _
private var dataTypes: Array[DataType] = _

private def runInternal(cmd: String) = {
try {
result = hiveContext.sql(cmd)
logDebug(result.queryExecution.toString())
val groupId = round(random * 1000000).toString
hiveContext.sparkContext.setJobGroup(groupId, statement)
iter = {
val useIncrementalCollect =
hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
if (useIncrementalCollect) {
result.toLocalIterator
} else {
result.collect().iterator
}
}
dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
} catch {
// Actually do need to catch Throwable as some failures don't inherit from Exception and
// HiveServer will silently swallow them.
case e: Throwable =>
setState(OperationState.ERROR)
logError("Error executing query:",e)
throw new HiveSQLException(e.toString)
}
}

def close(): Unit = {
// RDDs will be cleaned automatically upon garbage collection.
logDebug("CLOSING")
Expand Down Expand Up @@ -182,76 +154,43 @@ private[hive] class SparkExecuteStatementOperation(
}
}

private def getConfigForOperation: HiveConf = {
var sqlOperationConf: HiveConf = getParentSession.getHiveConf
if (!getConfOverlay.isEmpty || shouldRunAsync) {
sqlOperationConf = new HiveConf(sqlOperationConf)
import scala.collection.JavaConversions._
for (confEntry <- getConfOverlay.entrySet) {
try {
sqlOperationConf.verifyAndSet(confEntry.getKey, confEntry.getValue)
}
catch { case e: IllegalArgumentException =>
throw new HiveSQLException("Error applying statement specific settings", e)
}
}
}
sqlOperationConf
}

def run(): Unit = {
logInfo(s"Running query '$statement'")
val opConfig: HiveConf = getConfigForOperation
setState(OperationState.RUNNING)
setHasResultSet(true)

if (!shouldRunAsync) {
runInternal(statement)
setState(OperationState.FINISHED)
} else {
val parentSessionState = SessionState.get
val sessionHive: Hive = Hive.get
val currentUGI: UserGroupInformation = ShimLoader.getHadoopShims.getUGIForConf(opConfig)

val backgroundOperation: Runnable = new Runnable {
def run() {
val doAsAction: PrivilegedExceptionAction[AnyRef] =
new PrivilegedExceptionAction[AnyRef] {
def run: AnyRef = {
Hive.set(sessionHive)
SessionState.setCurrentSessionState(parentSessionState)
try {
runInternal(statement)
}
catch { case e: HiveSQLException =>
setOperationException(e)
logError("Error running hive query: ", e)
}
null
}
}
try {
ShimLoader.getHadoopShims.doAs(currentUGI, doAsAction)
}
catch { case e: Exception =>
setOperationException(new HiveSQLException(e))
logError("Error running hive query as user : " + currentUGI.getShortUserName, e)
}
setState(OperationState.FINISHED)
}
try {
result = hiveContext.sql(statement)
logDebug(result.queryExecution.toString())
result.queryExecution.logical match {
case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) =>
sessionToActivePool(parentSession) = value
logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
case _ =>
}

try {
val backgroundHandle: Future[_] = getParentSession.getSessionManager.
submitBackgroundOperation(backgroundOperation)
setBackgroundHandle(backgroundHandle)
} catch {
// Actually do need to catch Throwable as some failures don't inherit from Exception and
// HiveServer will silently swallow them.
case e: Throwable =>
logError("Error executing query:",e)
throw new HiveSQLException(e.toString)
val groupId = round(random * 1000000).toString
hiveContext.sparkContext.setJobGroup(groupId, statement)
sessionToActivePool.get(parentSession).foreach { pool =>
hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
}
iter = {
val useIncrementalCollect =
hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
if (useIncrementalCollect) {
result.toLocalIterator
} else {
result.collect().iterator
}
}
dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
setHasResultSet(true)
} catch {
// Actually do need to catch Throwable as some failures don't inherit from Exception and
// HiveServer will silently swallow them.
case e: Throwable =>
setState(OperationState.ERROR)
logError("Error executing query:", e)
throw new HiveSQLException(e.toString)
}
setState(OperationState.FINISHED)
}
}