diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index f2d53a77d6816..9e40224e837d5 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -19,11 +19,9 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.security.PrivilegedExceptionAction
 import java.util.{Arrays, Map => JMap}
-import java.util.concurrent.RejectedExecutionException
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
-import scala.util.control.NonFatal
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hadoop.hive.shims.Utils
@@ -37,7 +35,6 @@ import org.apache.spark.sql.execution.HiveResult.{getTimeFormatters, toHiveStrin
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
-import org.apache.spark.util.{Utils => SparkUtils}
 
 private[hive] class SparkExecuteStatementOperation(
     val sqlContext: SQLContext,
@@ -112,7 +109,7 @@ private[hive] class SparkExecuteStatementOperation(
   }
 
   def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withLocalProperties {
-    log.info(s"Received getNextRowSet request order=${order} and maxRowsL=${maxRowsL} " +
+    logInfo(s"Received getNextRowSet request order=${order} and maxRowsL=${maxRowsL} " +
       s"with ${statementId}")
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
@@ -181,7 +178,7 @@ private[hive] class SparkExecuteStatementOperation(
       resultOffset += 1
     }
     previousFetchEndOffset = resultOffset
-    log.info(s"Returning result set with ${curRow} rows from offsets " +
+    logInfo(s"Returning result set with ${curRow} rows from offsets " +
       s"[$previousFetchStartOffset, $previousFetchEndOffset) with $statementId")
     resultRowSet
   }
@@ -218,7 +215,9 @@ private[hive] class SparkExecuteStatementOperation(
             execute()
           }
         } catch {
-          case e: HiveSQLException => setOperationException(e)
+          case e: HiveSQLException =>
+            setOperationException(e)
+            logError(s"Error executing query with $statementId", e)
         }
       }
     }
@@ -238,21 +237,7 @@ private[hive] class SparkExecuteStatementOperation(
         val backgroundHandle =
           parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
         setBackgroundHandle(backgroundHandle)
-      } catch {
-        case rejected: RejectedExecutionException =>
-          logError("Error submitting query in background, query rejected", rejected)
-          setState(OperationState.ERROR)
-          HiveThriftServer2.eventManager.onStatementError(
-            statementId, rejected.getMessage, SparkUtils.exceptionString(rejected))
-          throw new HiveSQLException("The background threadpool cannot accept" +
-            " new task for execution, please retry the operation", rejected)
-        case NonFatal(e) =>
-          logError(s"Error executing query in background", e)
-          setState(OperationState.ERROR)
-          HiveThriftServer2.eventManager.onStatementError(
-            statementId, e.getMessage, SparkUtils.exceptionString(e))
-          throw new HiveSQLException(e)
-      }
+      } catch onError()
     }
   }
@@ -292,30 +277,7 @@ private[hive] class SparkExecuteStatementOperation(
       }
       dataTypes = result.schema.fields.map(_.dataType)
     } catch {
-      // Actually do need to catch Throwable as some failures don't inherit from Exception and
-      // HiveServer will silently swallow them.
-      case e: Throwable =>
-        // When cancel() or close() is called very quickly after the query is started,
-        // then they may both call cleanup() before Spark Jobs are started. But before background
-        // task interrupted, it may have start some spark job, so we need to cancel again to
-        // make sure job was cancelled when background thread was interrupted
-        if (statementId != null) {
-          sqlContext.sparkContext.cancelJobGroup(statementId)
-        }
-        val currentState = getStatus().getState()
-        if (currentState.isTerminal) {
-          // This may happen if the execution was cancelled, and then closed from another thread.
-          logWarning(s"Ignore exception in terminal state with $statementId: $e")
-        } else {
-          logError(s"Error executing query with $statementId, currentState $currentState, ", e)
-          setState(OperationState.ERROR)
-          HiveThriftServer2.eventManager.onStatementError(
-            statementId, e.getMessage, SparkUtils.exceptionString(e))
-          e match {
-            case _: HiveSQLException => throw e
-            case _ => throw new HiveSQLException("Error running query: " + e.toString, e)
-          }
-        }
+      onError(needCancel = true)
     } finally {
       synchronized {
         if (!getStatus.getState.isTerminal) {
@@ -346,9 +308,7 @@ private[hive] class SparkExecuteStatementOperation(
         }
       }
       // RDDs will be cleaned automatically upon garbage collection.
-      if (statementId != null) {
-        sqlContext.sparkContext.cancelJobGroup(statementId)
-      }
+      sqlContext.sparkContext.cancelJobGroup(statementId)
    }
  }
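Note on the refactor above: `} catch onError()` compiles because Scala's `try ... catch` accepts any expression of type `PartialFunction[Throwable, Unit]`, not only literal `case` clauses, which is what lets the duplicated catch blocks collapse into one shared handler. A minimal, self-contained sketch of the pattern (the names here are illustrative, not Spark code):

```scala
object CatchHandlerSketch {
  // A reusable handler, analogous in shape to SparkOperation.onError().
  def onError(needCancel: Boolean = false): PartialFunction[Throwable, Unit] = {
    case e: Throwable =>
      if (needCancel) println("would cancel a hypothetical job group here")
      throw new RuntimeException(s"Error running query: ${e.getMessage}", e)
  }

  def main(args: Array[String]): Unit = {
    try {
      throw new IllegalStateException("boom")
    } catch onError(needCancel = true) // same shape as `} catch onError()` in the diff
  }
}
```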
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala
index bbfc1b83379aa..8e8b2d7ff774d 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
+import java.util.concurrent.RejectedExecutionException
+
 import org.apache.hive.service.cli.{HiveSQLException, OperationState}
 import org.apache.hive.service.cli.operation.Operation
 
@@ -94,15 +96,32 @@ private[hive] trait SparkOperation extends Operation with Logging {
       throw new IllegalArgumentException(s"Unknown table type is found: $t")
   }
 
-  protected def onError(): PartialFunction[Throwable, Unit] = {
+  protected def onError(needCancel: Boolean = false): PartialFunction[Throwable, Unit] = {
+    // Actually do need to catch Throwable as some failures don't inherit from Exception and
+    // HiveServer will silently swallow them.
     case e: Throwable =>
-      logError(s"Error operating $getType with $statementId", e)
-      super.setState(OperationState.ERROR)
-      HiveThriftServer2.eventManager.onStatementError(
-        statementId, e.getMessage, Utils.exceptionString(e))
-      e match {
-        case _: HiveSQLException => throw e
-        case _ => throw new HiveSQLException(s"Error operating $getType ${e.getMessage}", e)
+      // When cancel() or close() is called very quickly after the query is started, they may
+      // both call cleanup() before any Spark jobs are submitted. But the background task may
+      // have already started some Spark jobs before it was interrupted, so cancel the job
+      // group again to make sure those jobs are cancelled.
+      if (needCancel) sqlContext.sparkContext.cancelJobGroup(statementId)
+      val currentState = getStatus.getState
+      if (currentState.isTerminal) {
+        // This may happen if the execution was cancelled, and then closed from another thread.
+        logWarning(s"Ignore exception in terminal state with $statementId: $e")
+      } else {
+        super.setState(OperationState.ERROR)
+        HiveThriftServer2.eventManager.onStatementError(
+          statementId, e.getMessage, Utils.exceptionString(e))
+        e match {
+          case _: HiveSQLException => throw e
+          case rejected: RejectedExecutionException =>
+            throw new HiveSQLException("The background threadpool cannot accept" +
+              " new task for execution, please retry the operation", rejected)
+          case _ =>
+            val tips = if (shouldRunAsync()) " in background" else ""
+            throw new HiveSQLException(s"Error operating $getType$tips: ${e.getMessage}", e)
+        }
       }
   }
 }
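The consolidated handler hinges on the terminal-state guard: if cancel() or close() has already moved the operation to a terminal state, a late exception from the background thread is only logged, never re-thrown, so a cancel/close race cannot flip the state to ERROR. A standalone sketch of that guard; the `State` hierarchy and `AtomicReference` are stand-ins for Spark's OperationState machinery, not its actual API:

```scala
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.atomic.AtomicReference

object OnErrorSketch {
  sealed trait State { def isTerminal: Boolean }
  case object Running extends State { val isTerminal = false }
  case object Canceled extends State { val isTerminal = true }
  case object Error extends State { val isTerminal = true }

  private val state = new AtomicReference[State](Running)

  def onError(): PartialFunction[Throwable, Unit] = {
    case e: Throwable =>
      if (state.get().isTerminal) {
        // Mirrors the logWarning branch: the race already resolved, so just note it.
        println(s"Ignore exception in terminal state: $e")
      } else {
        state.set(Error)
        e match {
          case rejected: RejectedExecutionException =>
            throw new RuntimeException("threadpool full, please retry", rejected)
          case other =>
            throw new RuntimeException(s"Error operating: ${other.getMessage}", other)
        }
      }
  }

  def main(args: Array[String]): Unit = {
    state.set(Canceled) // e.g. cancel() won the race
    try throw new InterruptedException("interrupted by cancel")
    catch onError() // swallowed: the state is already terminal
  }
}
```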
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
index fd3a638c4fa44..2bb9169693a15 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
@@ -21,6 +21,8 @@ import java.sql.SQLException
 
 import org.apache.hive.service.cli.HiveSQLException
 
+import org.apache.spark.sql.hive.HiveUtils
+
 trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
 
   test("the scratch dir will be deleted during server start but recreated with new operation") {
@@ -52,31 +54,51 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
 
   test("Full stack traces as error message for jdbc or thrift client") {
     val sql = "select date_sub(date'2011-11-11', '1.2')"
-    withCLIServiceClient { client =>
-      val sessionHandle = client.openSession(user, "")
+    val confOverlay = new java.util.HashMap[java.lang.String, java.lang.String]
 
-      val confOverlay = new java.util.HashMap[java.lang.String, java.lang.String]
-      val e = intercept[HiveSQLException] {
-        client.executeStatement(
-          sessionHandle,
-          sql,
-          confOverlay)
+    withSQLConf((HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, "false")) {
+      withCLIServiceClient { client =>
+        val sessionHandle = client.openSession(user, "")
+        val e = intercept[HiveSQLException] {
+          client.executeStatement(sessionHandle, sql, confOverlay)
+        }
+        assert(e.getMessage
+          .contains("The second argument of 'date_sub' function needs to be an integer."))
+        assert(!e.getMessage
+          .contains("java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
       }
+    }
+
+    withSQLConf((HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, "true")) {
+      withCLIServiceClient { client =>
+        val sessionHandle = client.openSession(user, "")
+        val opHandle = client.executeStatementAsync(sessionHandle, sql, confOverlay)
+        var status = client.getOperationStatus(opHandle)
+        while (!status.getState.isTerminal) {
+          Thread.sleep(10)
+          status = client.getOperationStatus(opHandle)
+        }
+        val e = status.getOperationException
 
-      assert(e.getMessage
-        .contains("The second argument of 'date_sub' function needs to be an integer."))
-      assert(!e.getMessage.contains("" +
-        "java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
+        assert(e.getMessage
+          .contains("The second argument of 'date_sub' function needs to be an integer."))
+        assert(e.getMessage
+          .contains("java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
+      }
     }
 
-    withJdbcStatement { statement =>
-      val e = intercept[SQLException] {
-        statement.executeQuery(sql)
+    Seq("true", "false").foreach { value =>
+      withSQLConf((HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, value)) {
+        withJdbcStatement { statement =>
+          val e = intercept[SQLException] {
+            statement.executeQuery(sql)
+          }
+          assert(e.getMessage.contains(
+            "The second argument of 'date_sub' function needs to be an integer."))
+          assert(e.getMessage.contains(
+            "java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
+        }
       }
-      assert(e.getMessage
-        .contains("The second argument of 'date_sub' function needs to be an integer."))
-      assert(e.getMessage.contains("" +
-        "java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
     }
   }
 }
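The async-mode test introduces a busy-wait: submit with executeStatementAsync, then poll getOperationStatus until the operation reaches a terminal state, and read the failure from status.getOperationException. A hedged sketch of that idiom factored into a helper, using only the API calls that appear in the test above; the helper name awaitTerminal and the timeout are assumptions, and unlike the test this version will not spin forever if the operation never terminates:

```scala
import org.apache.hive.service.cli.{CLIServiceClient, OperationStatus, SessionHandle}

object OperationPolling {
  // Poll until the operation reaches a terminal state or the deadline passes, then
  // return the final status; status.getOperationException carries async failures.
  def awaitTerminal(
      client: CLIServiceClient,
      sessionHandle: SessionHandle,
      sql: String,
      timeoutMs: Long = 60000L): OperationStatus = {
    val confOverlay = new java.util.HashMap[java.lang.String, java.lang.String]
    val opHandle = client.executeStatementAsync(sessionHandle, sql, confOverlay)
    val deadline = System.currentTimeMillis() + timeoutMs
    var status = client.getOperationStatus(opHandle)
    while (!status.getState.isTerminal && System.currentTimeMillis() < deadline) {
      Thread.sleep(10)
      status = client.getOperationStatus(opHandle)
    }
    status
  }
}
```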