From c53b3b6dafee441d5e16dea89aedc96b64961a4e Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Tue, 25 Aug 2020 00:20:55 +0800
Subject: [PATCH] Revert "[SPARK-32412][SQL] Unify error handling for spark thrift server operations"

This reverts commit 510a1656e650246a708d3866c8a400b7a1b9f962.
---
 .../SparkExecuteStatementOperation.scala      | 56 ++++++++++++++---
 .../hive/thriftserver/SparkOperation.scala    | 35 +++--------
 .../ThriftServerWithSparkContextSuite.scala   | 61 ++++++-------------
 3 files changed, 75 insertions(+), 77 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 922af72604027..d30951f89cf6b 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -19,9 +19,11 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.security.PrivilegedExceptionAction
 import java.util.{Arrays, Map => JMap}
+import java.util.concurrent.RejectedExecutionException
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hadoop.hive.shims.Utils
@@ -36,6 +38,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.VariableSubstitution
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
+import org.apache.spark.util.{Utils => SparkUtils}
 
 private[hive] class SparkExecuteStatementOperation(
     val sqlContext: SQLContext,
@@ -110,7 +113,7 @@ private[hive] class SparkExecuteStatementOperation(
   }
 
   def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withLocalProperties {
-    logInfo(s"Received getNextRowSet request order=${order} and maxRowsL=${maxRowsL} " +
+    log.info(s"Received getNextRowSet request order=${order} and maxRowsL=${maxRowsL} " +
       s"with ${statementId}")
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
@@ -179,7 +182,7 @@ private[hive] class SparkExecuteStatementOperation(
       resultOffset += 1
     }
     previousFetchEndOffset = resultOffset
-    logInfo(s"Returning result set with ${curRow} rows from offsets " +
+    log.info(s"Returning result set with ${curRow} rows from offsets " +
       s"[$previousFetchStartOffset, $previousFetchEndOffset) with $statementId")
     resultRowSet
   }
@@ -216,9 +219,7 @@ private[hive] class SparkExecuteStatementOperation(
             execute()
           }
         } catch {
-          case e: HiveSQLException =>
-            setOperationException(e)
-            logError(s"Error executing query with $statementId,", e)
+          case e: HiveSQLException => setOperationException(e)
         }
       }
     }
@@ -238,7 +239,21 @@ private[hive] class SparkExecuteStatementOperation(
         val backgroundHandle =
           parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
         setBackgroundHandle(backgroundHandle)
-      } catch onError()
+      } catch {
+        case rejected: RejectedExecutionException =>
+          logError("Error submitting query in background, query rejected", rejected)
+          setState(OperationState.ERROR)
+          HiveThriftServer2.eventManager.onStatementError(
+            statementId, rejected.getMessage, SparkUtils.exceptionString(rejected))
+          throw new HiveSQLException("The background threadpool cannot accept" +
+            " new task for execution, please retry the operation", rejected)
operation", rejected) + case NonFatal(e) => + logError(s"Error executing query in background", e) + setState(OperationState.ERROR) + HiveThriftServer2.eventManager.onStatementError( + statementId, e.getMessage, SparkUtils.exceptionString(e)) + throw new HiveSQLException(e) + } } } @@ -279,7 +294,30 @@ private[hive] class SparkExecuteStatementOperation( } dataTypes = result.schema.fields.map(_.dataType) } catch { - onError(needCancel = true) + // Actually do need to catch Throwable as some failures don't inherit from Exception and + // HiveServer will silently swallow them. + case e: Throwable => + // When cancel() or close() is called very quickly after the query is started, + // then they may both call cleanup() before Spark Jobs are started. But before background + // task interrupted, it may have start some spark job, so we need to cancel again to + // make sure job was cancelled when background thread was interrupted + if (statementId != null) { + sqlContext.sparkContext.cancelJobGroup(statementId) + } + val currentState = getStatus().getState() + if (currentState.isTerminal) { + // This may happen if the execution was cancelled, and then closed from another thread. + logWarning(s"Ignore exception in terminal state with $statementId: $e") + } else { + logError(s"Error executing query with $statementId, currentState $currentState, ", e) + setState(OperationState.ERROR) + HiveThriftServer2.eventManager.onStatementError( + statementId, e.getMessage, SparkUtils.exceptionString(e)) + e match { + case _: HiveSQLException => throw e + case _ => throw new HiveSQLException("Error running query: " + e.toString, e) + } + } } finally { synchronized { if (!getStatus.getState.isTerminal) { @@ -310,7 +348,9 @@ private[hive] class SparkExecuteStatementOperation( } } // RDDs will be cleaned automatically upon garbage collection. - sqlContext.sparkContext.cancelJobGroup(statementId) + if (statementId != null) { + sqlContext.sparkContext.cancelJobGroup(statementId) + } } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala index 8e8b2d7ff774d..bbfc1b83379aa 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.hive.thriftserver -import java.util.concurrent.RejectedExecutionException - import org.apache.hive.service.cli.{HiveSQLException, OperationState} import org.apache.hive.service.cli.operation.Operation @@ -96,32 +94,15 @@ private[hive] trait SparkOperation extends Operation with Logging { throw new IllegalArgumentException(s"Unknown table type is found: $t") } - protected def onError(needCancel: Boolean = false): PartialFunction[Throwable, Unit] = { - // Actually do need to catch Throwable as some failures don't inherit from Exception and - // HiveServer will silently swallow them. + protected def onError(): PartialFunction[Throwable, Unit] = { case e: Throwable => - // When cancel() or close() is called very quickly after the query is started, - // then they may both call cleanup() before Spark Jobs are started. 
-      // task interrupted, it may have start some spark job, so we need to cancel again to
-      // make sure job was cancelled when background thread was interrupted
-      if (needCancel) sqlContext.sparkContext.cancelJobGroup(statementId)
-      val currentState = getStatus.getState
-      if (currentState.isTerminal) {
-        // This may happen if the execution was cancelled, and then closed from another thread.
-        logWarning(s"Ignore exception in terminal state with $statementId: $e")
-      } else {
-        super.setState(OperationState.ERROR)
-        HiveThriftServer2.eventManager.onStatementError(
-          statementId, e.getMessage, Utils.exceptionString(e))
-        e match {
-          case _: HiveSQLException => throw e
-          case rejected: RejectedExecutionException =>
-            throw new HiveSQLException("The background threadpool cannot accept" +
-              " new task for execution, please retry the operation", rejected)
-          case _ =>
-            val tips = if (shouldRunAsync()) " in background" else ""
-            throw new HiveSQLException(s"Error operating $getType$tips: ${e.getMessage}", e)
-        }
+      logError(s"Error operating $getType with $statementId", e)
+      super.setState(OperationState.ERROR)
+      HiveThriftServer2.eventManager.onStatementError(
+        statementId, e.getMessage, Utils.exceptionString(e))
+      e match {
+        case _: HiveSQLException => throw e
+        case _ => throw new HiveSQLException(s"Error operating $getType ${e.getMessage}", e)
       }
   }
 }
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
index 102fd77c06f3a..fd3a638c4fa44 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
@@ -21,9 +21,6 @@ import java.sql.SQLException
 
 import org.apache.hive.service.cli.HiveSQLException
 
-import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.types._
-
 trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
 
   test("the scratch dir will be deleted during server start but recreated with new operation") {
@@ -55,51 +52,31 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
 
   test("Full stack traces as error message for jdbc or thrift client") {
     val sql = "select date_sub(date'2011-11-11', '1.2')"
-    val confOverlay = new java.util.HashMap[java.lang.String, java.lang.String]
+    withCLIServiceClient { client =>
+      val sessionHandle = client.openSession(user, "")
 
-    withSQLConf((HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, "false")) {
-      withCLIServiceClient { client =>
-        val sessionHandle = client.openSession(user, "")
-        val e = intercept[HiveSQLException] {
-          client.executeStatement(sessionHandle, sql, confOverlay)
-        }
-        assert(e.getMessage
-          .contains("The second argument of 'date_sub' function needs to be an integer."))
-        assert(!e.getMessage
-          .contains("java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
+      val confOverlay = new java.util.HashMap[java.lang.String, java.lang.String]
+      val e = intercept[HiveSQLException] {
+        client.executeStatement(
+          sessionHandle,
+          sql,
+          confOverlay)
       }
-    }
-
-    withSQLConf((HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, "true")) {
-      withCLIServiceClient { client =>
-        val sessionHandle = client.openSession(user, "")
-        val opHandle = client.executeStatementAsync(sessionHandle, sql, confOverlay)
-        var status = client.getOperationStatus(opHandle)
-        while (!status.getState.isTerminal) {
-          Thread.sleep(10)
-          status = client.getOperationStatus(opHandle)
-        }
-        val e = status.getOperationException
-        assert(e.getMessage
-          .contains("The second argument of 'date_sub' function needs to be an integer."))
-        assert(e.getMessage
-          .contains("java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
-      }
+      assert(e.getMessage
+        .contains("The second argument of 'date_sub' function needs to be an integer."))
+      assert(!e.getMessage.contains("" +
+        "java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
     }
 
-    Seq("true", "false").foreach { value =>
-      withSQLConf((HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, value)) {
-        withJdbcStatement { statement =>
-          val e = intercept[SQLException] {
-            statement.executeQuery(sql)
-          }
-          assert(e.getMessage.contains(
-            "The second argument of 'date_sub' function needs to be an integer."))
-          assert(e.getMessage.contains(
-            "java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
-        }
+    withJdbcStatement { statement =>
+      val e = intercept[SQLException] {
+        statement.executeQuery(sql)
       }
+      assert(e.getMessage
+        .contains("The second argument of 'date_sub' function needs to be an integer."))
+      assert(e.getMessage.contains("" +
+        "java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
     }
   }
 }
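
Note: the catch block restored in SparkExecuteStatementOperation.runInternal above distinguishes a thread-pool rejection (the session manager's background pool is saturated, so the client should retry) from any other non-fatal failure (wrapped and rethrown as-is). Below is a minimal, self-contained sketch of that pattern, not Spark's actual classes: the object name, the pool sizing, and the use of RuntimeException in place of HiveSQLException are all illustrative assumptions.

import java.util.concurrent.{RejectedExecutionException, SynchronousQueue, ThreadPoolExecutor, TimeUnit}

import scala.util.control.NonFatal

object BackgroundSubmitSketch {
  // One worker and no queue capacity, so a second concurrent submit is rejected
  // deterministically while the first task is still running.
  private val pool =
    new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue[Runnable]())

  def submit(task: Runnable): Unit = {
    try {
      pool.execute(task)
    } catch {
      case rejected: RejectedExecutionException =>
        // Pool saturation becomes a client-facing, retriable error,
        // preserving the rejection as the cause.
        throw new RuntimeException("The background threadpool cannot accept" +
          " new task for execution, please retry the operation", rejected)
      case NonFatal(e) =>
        // Any other non-fatal submission failure is wrapped unchanged.
        throw new RuntimeException(e)
    }
  }

  def main(args: Array[String]): Unit = {
    submit(() => Thread.sleep(1000)) // occupies the only worker
    try submit(() => ())             // rejected: worker busy, no queue slot
    catch { case e: RuntimeException => println(e.getMessage) }
    pool.shutdown()
  }
}

Keeping the two cases separate is the point of the revert: the unified onError() helper had folded the RejectedExecutionException translation into a generic handler, and this change moves it back next to the submitBackgroundOperation call where the rejection actually originates.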