Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ package org.apache.spark.sql.hive.thriftserver

import java.security.PrivilegedExceptionAction
import java.util.{Arrays, Map => JMap}
import java.util.concurrent.RejectedExecutionException

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.shims.Utils
Expand All @@ -37,7 +35,6 @@ import org.apache.spark.sql.execution.HiveResult.{getTimeFormatters, toHiveStrin
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.util.{Utils => SparkUtils}

private[hive] class SparkExecuteStatementOperation(
val sqlContext: SQLContext,
Expand Down Expand Up @@ -112,7 +109,7 @@ private[hive] class SparkExecuteStatementOperation(
}

def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withLocalProperties {
log.info(s"Received getNextRowSet request order=${order} and maxRowsL=${maxRowsL} " +
logInfo(s"Received getNextRowSet request order=${order} and maxRowsL=${maxRowsL} " +
s"with ${statementId}")
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
Expand Down Expand Up @@ -181,7 +178,7 @@ private[hive] class SparkExecuteStatementOperation(
resultOffset += 1
}
previousFetchEndOffset = resultOffset
log.info(s"Returning result set with ${curRow} rows from offsets " +
logInfo(s"Returning result set with ${curRow} rows from offsets " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is log.info and logInfo the same?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logInfo will check log.isInfoEnabled

s"[$previousFetchStartOffset, $previousFetchEndOffset) with $statementId")
resultRowSet
}
Expand Down Expand Up @@ -218,7 +215,9 @@ private[hive] class SparkExecuteStatementOperation(
execute()
}
} catch {
case e: HiveSQLException => setOperationException(e)
case e: HiveSQLException =>
setOperationException(e)
logError(s"Error executing query with $statementId,", e)
}
}
}
Expand All @@ -238,21 +237,7 @@ private[hive] class SparkExecuteStatementOperation(
val backgroundHandle =
parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
setBackgroundHandle(backgroundHandle)
} catch {
case rejected: RejectedExecutionException =>
logError("Error submitting query in background, query rejected", rejected)
setState(OperationState.ERROR)
HiveThriftServer2.eventManager.onStatementError(
statementId, rejected.getMessage, SparkUtils.exceptionString(rejected))
throw new HiveSQLException("The background threadpool cannot accept" +
" new task for execution, please retry the operation", rejected)
case NonFatal(e) =>
logError(s"Error executing query in background", e)
Copy link
Contributor

@cloud-fan cloud-fan Jul 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can keep this.

setState(OperationState.ERROR)
HiveThriftServer2.eventManager.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw new HiveSQLException(e)
}
} catch onError()
}
}

Expand Down Expand Up @@ -292,30 +277,7 @@ private[hive] class SparkExecuteStatementOperation(
}
dataTypes = result.schema.fields.map(_.dataType)
} catch {
// Actually do need to catch Throwable as some failures don't inherit from Exception and
// HiveServer will silently swallow them.
case e: Throwable =>
// When cancel() or close() is called very quickly after the query is started,
// then they may both call cleanup() before Spark Jobs are started. But before background
// task interrupted, it may have start some spark job, so we need to cancel again to
// make sure job was cancelled when background thread was interrupted
if (statementId != null) {
sqlContext.sparkContext.cancelJobGroup(statementId)
}
val currentState = getStatus().getState()
if (currentState.isTerminal) {
// This may happen if the execution was cancelled, and then closed from another thread.
logWarning(s"Ignore exception in terminal state with $statementId: $e")
} else {
logError(s"Error executing query with $statementId, currentState $currentState, ", e)
setState(OperationState.ERROR)
HiveThriftServer2.eventManager.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
e match {
case _: HiveSQLException => throw e
case _ => throw new HiveSQLException("Error running query: " + e.toString, e)
}
}
onError(needCancel = true)
} finally {
synchronized {
if (!getStatus.getState.isTerminal) {
Expand Down Expand Up @@ -346,9 +308,7 @@ private[hive] class SparkExecuteStatementOperation(
}
}
// RDDs will be cleaned automatically upon garbage collection.
if (statementId != null) {
sqlContext.sparkContext.cancelJobGroup(statementId)
}
sqlContext.sparkContext.cancelJobGroup(statementId)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.hive.thriftserver

import java.util.concurrent.RejectedExecutionException

import org.apache.hive.service.cli.{HiveSQLException, OperationState}
import org.apache.hive.service.cli.operation.Operation

Expand Down Expand Up @@ -94,15 +96,32 @@ private[hive] trait SparkOperation extends Operation with Logging {
throw new IllegalArgumentException(s"Unknown table type is found: $t")
}

protected def onError(): PartialFunction[Throwable, Unit] = {
protected def onError(needCancel: Boolean = false): PartialFunction[Throwable, Unit] = {
// Actually do need to catch Throwable as some failures don't inherit from Exception and
// HiveServer will silently swallow them.
case e: Throwable =>
logError(s"Error operating $getType with $statementId", e)
super.setState(OperationState.ERROR)
HiveThriftServer2.eventManager.onStatementError(
statementId, e.getMessage, Utils.exceptionString(e))
e match {
case _: HiveSQLException => throw e
case _ => throw new HiveSQLException(s"Error operating $getType ${e.getMessage}", e)
// When cancel() or close() is called very quickly after the query is started,
// then they may both call cleanup() before Spark Jobs are started. But before background
// task interrupted, it may have start some spark job, so we need to cancel again to
// make sure job was cancelled when background thread was interrupted
if (needCancel) sqlContext.sparkContext.cancelJobGroup(statementId)
val currentState = getStatus.getState
if (currentState.isTerminal) {
// This may happen if the execution was cancelled, and then closed from another thread.
logWarning(s"Ignore exception in terminal state with $statementId: $e")
} else {
super.setState(OperationState.ERROR)
HiveThriftServer2.eventManager.onStatementError(
statementId, e.getMessage, Utils.exceptionString(e))
e match {
case _: HiveSQLException => throw e
case rejected: RejectedExecutionException =>
throw new HiveSQLException("The background threadpool cannot accept" +
" new task for execution, please retry the operation", rejected)
case _ =>
val tips = if (shouldRunAsync()) " in background" else ""
throw new HiveSQLException(s"Error operating $getType$tips: ${e.getMessage}", e)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like we missed some details:

  1. Shall we expose the internal details (getType) to users or just say Error running query:?
  2. Shall we include the full exception string (e.toString like https://github.com/apache/spark/pull/29204/files#diff-72dcd8f81a51c8a815159fdf0332acdcL316), not just the message?

Since the PR is small, can we revert and resend it after proper discussion?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import java.sql.SQLException

import org.apache.hive.service.cli.HiveSQLException

import org.apache.spark.sql.hive.HiveUtils

trait ThriftServerWithSparkContextSuite extends SharedThriftServer {

test("the scratch dir will be deleted during server start but recreated with new operation") {
Expand Down Expand Up @@ -52,31 +54,51 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer {

test("Full stack traces as error message for jdbc or thrift client") {
val sql = "select date_sub(date'2011-11-11', '1.2')"
withCLIServiceClient { client =>
val sessionHandle = client.openSession(user, "")
val confOverlay = new java.util.HashMap[java.lang.String, java.lang.String]

val confOverlay = new java.util.HashMap[java.lang.String, java.lang.String]
val e = intercept[HiveSQLException] {
client.executeStatement(
sessionHandle,
sql,
confOverlay)
withSQLConf((HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, "false")) {
withCLIServiceClient { client =>
val sessionHandle = client.openSession(user, "")
val e = intercept[HiveSQLException] {
client.executeStatement(sessionHandle, sql, confOverlay)
}
assert(e.getMessage
.contains("The second argument of 'date_sub' function needs to be an integer."))
assert(!e.getMessage
.contains("java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
}
}

withSQLConf((HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, "true")) {
withCLIServiceClient { client =>
val sessionHandle = client.openSession(user, "")
val opHandle = client.executeStatementAsync(sessionHandle, sql, confOverlay)
var status = client.getOperationStatus(opHandle)
while (!status.getState.isTerminal) {
Thread.sleep(10)
status = client.getOperationStatus(opHandle)
}
val e = status.getOperationException

assert(e.getMessage
.contains("The second argument of 'date_sub' function needs to be an integer."))
assert(!e.getMessage.contains("" +
"java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
assert(e.getMessage
.contains("The second argument of 'date_sub' function needs to be an integer."))
assert(e.getMessage
.contains("java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
}
}

withJdbcStatement { statement =>
val e = intercept[SQLException] {
statement.executeQuery(sql)
Seq("true", "false").foreach { value =>
withSQLConf((HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, value)) {
withJdbcStatement { statement =>
val e = intercept[SQLException] {
statement.executeQuery(sql)
}
assert(e.getMessage.contains(
"The second argument of 'date_sub' function needs to be an integer."))
assert(e.getMessage.contains(
"java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
}
}
assert(e.getMessage
.contains("The second argument of 'date_sub' function needs to be an integer."))
assert(e.getMessage.contains("" +
"java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
}
}
}
Expand Down