SparkExecuteStatementOperation.scala
@@ -19,9 +19,11 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.security.PrivilegedExceptionAction
 import java.util.{Arrays, Map => JMap}
+import java.util.concurrent.RejectedExecutionException
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hadoop.hive.shims.Utils
@@ -36,6 +38,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.VariableSubstitution
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
+import org.apache.spark.util.{Utils => SparkUtils}
 
 private[hive] class SparkExecuteStatementOperation(
     val sqlContext: SQLContext,
@@ -110,7 +113,7 @@ private[hive] class SparkExecuteStatementOperation(
   }
 
   def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withLocalProperties {
-    logInfo(s"Received getNextRowSet request order=${order} and maxRowsL=${maxRowsL} " +
+    log.info(s"Received getNextRowSet request order=${order} and maxRowsL=${maxRowsL} " +
       s"with ${statementId}")
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
@@ -179,7 +182,7 @@ private[hive] class SparkExecuteStatementOperation(
       resultOffset += 1
     }
     previousFetchEndOffset = resultOffset
-    logInfo(s"Returning result set with ${curRow} rows from offsets " +
+    log.info(s"Returning result set with ${curRow} rows from offsets " +
       s"[$previousFetchStartOffset, $previousFetchEndOffset) with $statementId")
     resultRowSet
   }
@@ -216,9 +219,7 @@ private[hive] class SparkExecuteStatementOperation(
           execute()
         }
       } catch {
-        case e: HiveSQLException =>
-          setOperationException(e)
-          logError(s"Error executing query with $statementId,", e)
+        case e: HiveSQLException => setOperationException(e)
       }
     }
   }
@@ -238,7 +239,21 @@ private[hive] class SparkExecuteStatementOperation(
       val backgroundHandle =
         parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
       setBackgroundHandle(backgroundHandle)
-    } catch onError()
+    } catch {
+      case rejected: RejectedExecutionException =>
+        logError("Error submitting query in background, query rejected", rejected)
+        setState(OperationState.ERROR)
+        HiveThriftServer2.eventManager.onStatementError(
+          statementId, rejected.getMessage, SparkUtils.exceptionString(rejected))
+        throw new HiveSQLException("The background threadpool cannot accept" +
+          " new task for execution, please retry the operation", rejected)
+      case NonFatal(e) =>
+        logError(s"Error executing query in background", e)
+        setState(OperationState.ERROR)
+        HiveThriftServer2.eventManager.onStatementError(
+          statementId, e.getMessage, SparkUtils.exceptionString(e))
+        throw new HiveSQLException(e)
+    }
   }
 }

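The `case rejected: RejectedExecutionException` branch exists because `submitBackgroundOperation` hands the statement to a bounded session thread pool, and `submit` on a saturated `ThreadPoolExecutor` fails fast with `RejectedExecutionException`; the hunk surfaces that as a retryable `HiveSQLException` instead of letting the raw rejection escape. A minimal, self-contained sketch of that failure mode; the pool sizes and dummy task are illustrative, not the thrift server's actual configuration:

```scala
import java.util.concurrent.{ArrayBlockingQueue, RejectedExecutionException, ThreadPoolExecutor, TimeUnit}

object RejectedSubmitSketch {
  def main(args: Array[String]): Unit = {
    // A deliberately tiny pool: one worker plus a one-slot queue, so the
    // third submission is rejected immediately rather than waiting.
    val pool = new ThreadPoolExecutor(
      1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue[Runnable](1))
    val task: Runnable = () => Thread.sleep(1000)
    try {
      pool.submit(task) // occupies the single worker
      pool.submit(task) // sits in the one-slot queue
      pool.submit(task) // rejected: worker busy and queue full
    } catch {
      case rejected: RejectedExecutionException =>
        // Analogous to the hunk above: report the rejection as a
        // retryable error instead of propagating it raw.
        println(s"submit rejected, ask the client to retry: $rejected")
    } finally {
      pool.shutdown()
    }
  }
}
```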
@@ -279,7 +294,30 @@ private[hive] class SparkExecuteStatementOperation(
       }
       dataTypes = result.schema.fields.map(_.dataType)
     } catch {
-      onError(needCancel = true)
+      // We do need to catch Throwable here, as some failures don't inherit from Exception and
+      // HiveServer will silently swallow them.
+      case e: Throwable =>
+        // When cancel() or close() is called very quickly after the query is started,
+        // they may both call cleanup() before any Spark jobs are launched. But the background
+        // task may have started Spark jobs before it was interrupted, so cancel again here to
+        // make sure those jobs are cancelled when the background thread is interrupted.
+        if (statementId != null) {
+          sqlContext.sparkContext.cancelJobGroup(statementId)
+        }
+        val currentState = getStatus().getState()
+        if (currentState.isTerminal) {
+          // This may happen if the execution was cancelled, and then closed from another thread.
+          logWarning(s"Ignore exception in terminal state with $statementId: $e")
+        } else {
+          logError(s"Error executing query with $statementId, currentState $currentState", e)
+          setState(OperationState.ERROR)
+          HiveThriftServer2.eventManager.onStatementError(
+            statementId, e.getMessage, SparkUtils.exceptionString(e))
+          e match {
+            case _: HiveSQLException => throw e
+            case _ => throw new HiveSQLException("Error running query: " + e.toString, e)
+          }
+        }
     } finally {
       synchronized {
         if (!getStatus.getState.isTerminal) {
@@ -310,7 +348,9 @@ private[hive] class SparkExecuteStatementOperation(
       }
     }
     // RDDs will be cleaned automatically upon garbage collection.
-    sqlContext.sparkContext.cancelJobGroup(statementId)
+    if (statementId != null) {
+      sqlContext.sparkContext.cancelJobGroup(statementId)
+    }
   }
 }

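The null guard on `statementId` matters because cleanup can race ahead of operation setup: if `close()` runs before an id was ever assigned, the old code would call `cancelJobGroup(null)`. Cancellation itself works through Spark's job-group tagging, where each statement's jobs are tagged with its id and cancelled by that tag. A standalone sketch of the pattern, with an illustrative id and dummy workload rather than the operation's real wiring:

```scala
import java.util.UUID

import org.apache.spark.{SparkConf, SparkContext}

object JobGroupCancelSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("job-group-cancel-sketch"))
    val statementId = UUID.randomUUID().toString
    val worker = new Thread(() => {
      // Job groups are thread-local, so tag the work on the thread running it.
      sc.setJobGroup(statementId, "illustrative long statement", interruptOnCancel = true)
      try {
        sc.parallelize(1 to 1000000, 8).map { i => Thread.sleep(1); i }.count()
      } catch {
        case e: Exception => println(s"job cancelled: ${e.getMessage}")
      }
    })
    worker.start()
    Thread.sleep(500)
    // Mirror of the guarded cleanup above: only cancel when an id exists.
    if (statementId != null) {
      sc.cancelJobGroup(statementId)
    }
    worker.join()
    sc.stop()
  }
}
```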
SparkOperation.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
-import java.util.concurrent.RejectedExecutionException
-
 import org.apache.hive.service.cli.{HiveSQLException, OperationState}
 import org.apache.hive.service.cli.operation.Operation
 
@@ -96,32 +94,15 @@ private[hive] trait SparkOperation extends Operation with Logging {
       throw new IllegalArgumentException(s"Unknown table type is found: $t")
   }
 
-  protected def onError(needCancel: Boolean = false): PartialFunction[Throwable, Unit] = {
-    // Actually do need to catch Throwable as some failures don't inherit from Exception and
-    // HiveServer will silently swallow them.
+  protected def onError(): PartialFunction[Throwable, Unit] = {
     case e: Throwable =>
-      // When cancel() or close() is called very quickly after the query is started,
-      // then they may both call cleanup() before Spark Jobs are started. But before background
-      // task interrupted, it may have start some spark job, so we need to cancel again to
-      // make sure job was cancelled when background thread was interrupted
-      if (needCancel) sqlContext.sparkContext.cancelJobGroup(statementId)
-      val currentState = getStatus.getState
-      if (currentState.isTerminal) {
-        // This may happen if the execution was cancelled, and then closed from another thread.
-        logWarning(s"Ignore exception in terminal state with $statementId: $e")
-      } else {
-        super.setState(OperationState.ERROR)
-        HiveThriftServer2.eventManager.onStatementError(
-          statementId, e.getMessage, Utils.exceptionString(e))
-        e match {
-          case _: HiveSQLException => throw e
-          case rejected: RejectedExecutionException =>
-            throw new HiveSQLException("The background threadpool cannot accept" +
-              " new task for execution, please retry the operation", rejected)
-          case _ =>
-            val tips = if (shouldRunAsync()) " in background" else ""
-            throw new HiveSQLException(s"Error operating $getType$tips: ${e.getMessage}", e)
-        }
+      logError(s"Error operating $getType with $statementId", e)
+      super.setState(OperationState.ERROR)
+      HiveThriftServer2.eventManager.onStatementError(
+        statementId, e.getMessage, Utils.exceptionString(e))
+      e match {
+        case _: HiveSQLException => throw e
+        case _ => throw new HiveSQLException(s"Error operating $getType ${e.getMessage}", e)
       }
   }
 }
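A detail that makes call sites such as the deleted `} catch onError()` work: `onError()` returns a `PartialFunction[Throwable, Unit]`, and Scala accepts any such partial function as the entire handler of a `catch` clause. A minimal, Spark-independent sketch of the idiom; the names and error types are illustrative:

```scala
object CatchPartialFunctionSketch {
  // A reusable handler: recover from one error type, rethrow everything else.
  def onError(context: String): PartialFunction[Throwable, Unit] = {
    case e: IllegalStateException =>
      println(s"[$context] recovered: ${e.getMessage}")
    case e: Throwable =>
      println(s"[$context] rethrowing: ${e.getMessage}")
      throw e
  }

  def main(args: Array[String]): Unit = {
    try {
      throw new IllegalStateException("boom")
    } catch onError("demo") // the partial function is the whole catch block
  }
}
```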
ThriftServerWithSparkContextSuite.scala
@@ -21,9 +21,6 @@ import java.sql.SQLException
 
 import org.apache.hive.service.cli.HiveSQLException
 
-import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.types._
-
 trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
 
   test("the scratch dir will be deleted during server start but recreated with new operation") {
@@ -55,51 +52,31 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
 
   test("Full stack traces as error message for jdbc or thrift client") {
     val sql = "select date_sub(date'2011-11-11', '1.2')"
-    val confOverlay = new java.util.HashMap[java.lang.String, java.lang.String]
+    withCLIServiceClient { client =>
+      val sessionHandle = client.openSession(user, "")
 
-    withSQLConf((HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, "false")) {
-      withCLIServiceClient { client =>
-        val sessionHandle = client.openSession(user, "")
-        val e = intercept[HiveSQLException] {
-          client.executeStatement(sessionHandle, sql, confOverlay)
-        }
-        assert(e.getMessage
-          .contains("The second argument of 'date_sub' function needs to be an integer."))
-        assert(!e.getMessage
-          .contains("java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
-      }
-    }
+      val confOverlay = new java.util.HashMap[java.lang.String, java.lang.String]
+      val e = intercept[HiveSQLException] {
+        client.executeStatement(
+          sessionHandle,
+          sql,
+          confOverlay)
+      }
+      assert(e.getMessage
+        .contains("The second argument of 'date_sub' function needs to be an integer."))
+      assert(!e.getMessage.contains("" +
+        "java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
+    }
 
-    withSQLConf((HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, "true")) {
-      withCLIServiceClient { client =>
-        val sessionHandle = client.openSession(user, "")
-        val opHandle = client.executeStatementAsync(sessionHandle, sql, confOverlay)
-        var status = client.getOperationStatus(opHandle)
-        while (!status.getState.isTerminal) {
-          Thread.sleep(10)
-          status = client.getOperationStatus(opHandle)
-        }
-        val e = status.getOperationException
-
-        assert(e.getMessage
-          .contains("The second argument of 'date_sub' function needs to be an integer."))
-        assert(e.getMessage
-          .contains("java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
-      }
-    }
-
-    Seq("true", "false").foreach { value =>
-      withSQLConf((HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, value)) {
-        withJdbcStatement { statement =>
-          val e = intercept[SQLException] {
-            statement.executeQuery(sql)
-          }
-          assert(e.getMessage.contains(
-            "The second argument of 'date_sub' function needs to be an integer."))
-          assert(e.getMessage.contains(
-            "java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
-        }
-      }
-    }
+    withJdbcStatement { statement =>
+      val e = intercept[SQLException] {
+        statement.executeQuery(sql)
+      }
+      assert(e.getMessage
+        .contains("The second argument of 'date_sub' function needs to be an integer."))
+      assert(e.getMessage.contains("" +
+        "java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
+    }
   }
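For reference, the behavior these JDBC assertions pin down can be reproduced with a plain JDBC client. A hedged sketch, assuming a HiveServer2-compatible endpoint on localhost's default port 10000 and the Hive JDBC driver on the classpath; the connection details are illustrative:

```scala
import java.sql.{DriverManager, SQLException}

object JdbcFullStackTraceSketch {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
    try {
      val stmt = conn.createStatement()
      stmt.executeQuery("select date_sub(date'2011-11-11', '1.2')")
    } catch {
      case e: SQLException =>
        // Per the test above, the message should carry both the analysis
        // error and the root-cause NumberFormatException.
        println(e.getMessage)
    } finally {
      conn.close()
    }
  }
}
```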