Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,17 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val THRIFTSERVER_QUERY_TIMEOUT =
buildConf("spark.sql.thriftServer.queryTimeout")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you rewrite this as:

Set a query duration timeout in seconds in ThriftServer. If the timeout is set to a positive value, a running query will be cancelled automatically when the timeout is exceeded, otherwise the query continues to run till completion. Timeout values that are set for each statement via`java.sql.Statement.setQueryTimeout` take precedence.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

.doc("Set a query duration timeout in seconds in Thrift Server. If the timeout is set to " +
"a positive value, a running query will be cancelled automatically when the timeout is " +
"exceeded, otherwise the query continues to run till completion. If timeout values are " +
"set for each statement via `java.sql.Statement.setQueryTimeout` and they are smaller " +
"than this configuration value, they take precedence.")
.version("3.1.0")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this version correct?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I think so.

.timeConf(TimeUnit.SECONDS)
.createWithDefault(0L)

val THRIFTSERVER_UI_STATEMENT_LIMIT =
buildConf("spark.sql.thriftserver.ui.retainedStatements")
.doc("The number of SQL statements kept in the JDBC/ODBC web UI history.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parent
public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession,
String statement, Map<String, String> confOverlay, boolean runAsync, long queryTimeout)
throws HiveSQLException {
return newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync);
return newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync,
queryTimeout);
}

public GetTypeInfoOperation newGetTypeInfoOperation(HiveSession parentSession) {
Expand Down Expand Up @@ -207,6 +208,7 @@ public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
Operation operation = getOperation(opHandle);
OperationState opState = operation.getStatus().getState();
if (opState == OperationState.CANCELED ||
opState == OperationState.TIMEDOUT ||
opState == OperationState.CLOSED ||
opState == OperationState.FINISHED ||
opState == OperationState.ERROR ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,12 @@ private void runQuery(HiveConf sqlOperationConf) throws HiveSQLException {
throw toSQLException("Error while processing statement", response);
}
} catch (HiveSQLException e) {
// If the operation was cancelled by another thread,
// If the operation was cancelled by another thread or timed out,
// Driver#run will return a non-zero response code.
// We will simply return if the operation state is CANCELED,
// We will simply return if the operation state is CANCELED or TIMEDOUT,
// otherwise throw an exception
if (getStatus().getState() == OperationState.CANCELED) {
if (getStatus().getState() == OperationState.CANCELED ||
getStatus().getState() == OperationState.TIMEDOUT) {
return;
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ object HiveThriftServer2 extends Logging {
}

private[thriftserver] object ExecutionState extends Enumeration {
val STARTED, COMPILED, CANCELED, FAILED, FINISHED, CLOSED = Value
val STARTED, COMPILED, CANCELED, TIMEDOUT, FAILED, FINISHED, CLOSED = Value
type ExecutionState = Value
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.thriftserver

import java.security.PrivilegedExceptionAction
import java.util.{Arrays, Map => JMap}
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.{Executors, RejectedExecutionException, TimeUnit}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
Expand All @@ -45,11 +45,24 @@ private[hive] class SparkExecuteStatementOperation(
parentSession: HiveSession,
statement: String,
confOverlay: JMap[String, String],
runInBackground: Boolean = true)
runInBackground: Boolean = true,
queryTimeout: Long)
extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)
with SparkOperation
with Logging {

// If a timeout value `queryTimeout` is specified by users and it is smaller than
// a global timeout value, we use the user-specified value.
// This code follows the Hive timeout behaviour (See #29933 for details).
private val timeout = {
val globalTimeout = sqlContext.conf.getConf(SQLConf.THRIFTSERVER_QUERY_TIMEOUT)
if (globalTimeout > 0 && (queryTimeout <= 0 || globalTimeout < queryTimeout)) {
globalTimeout
} else {
queryTimeout
}
}

private var result: DataFrame = _

// We cache the returned rows to get iterators again in case the user wants to use FETCH_FIRST.
Expand Down Expand Up @@ -200,6 +213,23 @@ private[hive] class SparkExecuteStatementOperation(
parentSession.getUsername)
setHasResultSet(true) // avoid no resultset for async run

if (timeout > 0) {
val timeoutExecutor = Executors.newSingleThreadScheduledExecutor()
timeoutExecutor.schedule(new Runnable {
override def run(): Unit = {
try {
timeoutCancel()
} catch {
case NonFatal(e) =>
setOperationException(new HiveSQLException(e))
logError(s"Error cancelling the query after timeout: $timeout seconds")
} finally {
timeoutExecutor.shutdown()
}
}
}, timeout, TimeUnit.SECONDS)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (!runInBackground) {
execute()
} else {
Expand Down Expand Up @@ -328,6 +358,17 @@ private[hive] class SparkExecuteStatementOperation(
}
}

def timeoutCancel(): Unit = {
synchronized {
if (!getStatus.getState.isTerminal) {
logInfo(s"Query with $statementId timed out after $timeout seconds")
setState(OperationState.TIMEDOUT)
cleanup()
HiveThriftServer2.eventManager.onStatementTimeout(statementId)
}
}
}

override def cancel(): Unit = {
synchronized {
if (!getStatus.getState.isTerminal) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,15 @@ private[thriftserver] class SparkSQLOperationManager()
parentSession: HiveSession,
statement: String,
confOverlay: JMap[String, String],
async: Boolean): ExecuteStatementOperation = synchronized {
async: Boolean,
queryTimeout: Long): ExecuteStatementOperation = synchronized {
val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
s" initialized or had already closed.")
val conf = sqlContext.sessionState.conf
val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
val operation = new SparkExecuteStatementOperation(
sqlContext, parentSession, statement, confOverlay, runInBackground)
sqlContext, parentSession, statement, confOverlay, runInBackground, queryTimeout)
handleToOperation.put(operation.getHandle, operation)
logDebug(s"Created Operation for $statement with session=$parentSession, " +
s"runInBackground=$runInBackground")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ private[thriftserver] class ExecutionInfo(
def isExecutionActive: Boolean = {
!(state == ExecutionState.FAILED ||
state == ExecutionState.CANCELED ||
state == ExecutionState.TIMEDOUT ||
state == ExecutionState.CLOSED)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ private[thriftserver] class HiveThriftServer2EventManager(sc: SparkContext) {
postLiveListenerBus(SparkListenerThriftServerOperationCanceled(id, System.currentTimeMillis()))
}

def onStatementTimeout(id: String): Unit = {
postLiveListenerBus(SparkListenerThriftServerOperationTimeout(id, System.currentTimeMillis()))
}

def onStatementError(id: String, errorMsg: String, errorTrace: String): Unit = {
postLiveListenerBus(SparkListenerThriftServerOperationError(id, errorMsg, errorTrace,
System.currentTimeMillis()))
Expand Down Expand Up @@ -96,6 +100,9 @@ private[thriftserver] case class SparkListenerThriftServerOperationParsed(
private[thriftserver] case class SparkListenerThriftServerOperationCanceled(
id: String, finishTime: Long) extends SparkListenerEvent

private[thriftserver] case class SparkListenerThriftServerOperationTimeout(
id: String, finishTime: Long) extends SparkListenerEvent

private[thriftserver] case class SparkListenerThriftServerOperationError(
id: String,
errorMsg: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ private[thriftserver] class HiveThriftServer2Listener(
case e: SparkListenerThriftServerOperationStart => onOperationStart(e)
case e: SparkListenerThriftServerOperationParsed => onOperationParsed(e)
case e: SparkListenerThriftServerOperationCanceled => onOperationCanceled(e)
case e: SparkListenerThriftServerOperationTimeout => onOperationTimeout(e)
case e: SparkListenerThriftServerOperationError => onOperationError(e)
case e: SparkListenerThriftServerOperationFinish => onOperationFinished(e)
case e: SparkListenerThriftServerOperationClosed => onOperationClosed(e)
Expand Down Expand Up @@ -181,6 +182,15 @@ private[thriftserver] class HiveThriftServer2Listener(
case None => logWarning(s"onOperationCanceled called with unknown operation id: ${e.id}")
}

private def onOperationTimeout(e: SparkListenerThriftServerOperationTimeout): Unit =
Option(executionList.get(e.id)) match {
case Some(executionData) =>
executionData.finishTimestamp = e.finishTime
executionData.state = ExecutionState.TIMEDOUT
updateLiveStore(executionData)
case None => logWarning(s"onOperationCanceled called with unknown operation id: ${e.id}")
}

private def onOperationError(e: SparkListenerThriftServerOperationError): Unit =
Option(executionList.get(e.id)) match {
case Some(executionData) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.HiveTestJars
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.HIVE_THRIFT_SERVER_SINGLESESSION
import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
import org.apache.spark.util.{ThreadUtils, Utils}
Expand Down Expand Up @@ -285,7 +286,6 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
}

test("test multiple session") {
import org.apache.spark.sql.internal.SQLConf
var defaultV1: String = null
var defaultV2: String = null
var data: ArrayBuffer[Int] = null
Expand Down Expand Up @@ -880,6 +880,59 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
assert(rs.getString(1) === expected.toString)
}
}

test("SPARK-26533: Support query auto timeout cancel on thriftserver - setQueryTimeout") {
withJdbcStatement() { statement =>
statement.setQueryTimeout(1)
val e = intercept[SQLException] {
statement.execute("select java_method('java.lang.Thread', 'sleep', 10000L)")
}.getMessage
assert(e.contains("Query timed out after"))

statement.setQueryTimeout(0)
val rs1 = statement.executeQuery(
"select 'test', java_method('java.lang.Thread', 'sleep', 3000L)")
rs1.next()
assert(rs1.getString(1) == "test")

statement.setQueryTimeout(-1)
val rs2 = statement.executeQuery(
"select 'test', java_method('java.lang.Thread', 'sleep', 3000L)")
rs2.next()
assert(rs2.getString(1) == "test")
}
}

test("SPARK-26533: Support query auto timeout cancel on thriftserver - SQLConf") {
withJdbcStatement() { statement =>
statement.execute(s"SET ${SQLConf.THRIFTSERVER_QUERY_TIMEOUT.key}=1")
val e1 = intercept[SQLException] {
statement.execute("select java_method('java.lang.Thread', 'sleep', 10000L)")
}.getMessage
assert(e1.contains("Query timed out after"))

statement.execute(s"SET ${SQLConf.THRIFTSERVER_QUERY_TIMEOUT.key}=0")
val rs = statement.executeQuery(
"select 'test', java_method('java.lang.Thread', 'sleep', 3000L)")
rs.next()
assert(rs.getString(1) == "test")

// Uses a smaller timeout value of a config value and an a user-specified one
statement.execute(s"SET ${SQLConf.THRIFTSERVER_QUERY_TIMEOUT.key}=1")
statement.setQueryTimeout(30)
val e2 = intercept[SQLException] {
statement.execute("select java_method('java.lang.Thread', 'sleep', 10000L)")
}.getMessage
assert(e2.contains("Query timed out after"))

statement.execute(s"SET ${SQLConf.THRIFTSERVER_QUERY_TIMEOUT.key}=30")
statement.setQueryTimeout(1)
val e3 = intercept[SQLException] {
statement.execute("select java_method('java.lang.Thread', 'sleep', 10000L)")
}.getMessage
assert(e3.contains("Query timed out after"))
}
}
}

class SingleSessionSuite extends HiveThriftJdbcTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class SparkExecuteStatementOperationSuite extends SparkFunSuite with SharedSpark

Seq(
(OperationState.CANCELED, (_: SparkExecuteStatementOperation).cancel()),
(OperationState.TIMEDOUT, (_: SparkExecuteStatementOperation).timeoutCancel()),
(OperationState.CLOSED, (_: SparkExecuteStatementOperation).close())
).foreach { case (finalState, transition) =>
test("SPARK-32057 SparkExecuteStatementOperation should not transiently become ERROR " +
Expand Down Expand Up @@ -109,7 +110,7 @@ class SparkExecuteStatementOperationSuite extends SparkFunSuite with SharedSpark
signal: Semaphore,
finalState: OperationState)
extends SparkExecuteStatementOperation(sqlContext, hiveSession, statement,
new util.HashMap, false) {
new util.HashMap, false, 0) {

override def cleanup(): Unit = {
super.cleanup()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ class HiveThriftServer2ListenerSuite extends SparkFunSuite with BeforeAndAfter {
"stmt", "groupId", 0))
listener.onOtherEvent(SparkListenerThriftServerOperationParsed(unknownOperation, "query"))
listener.onOtherEvent(SparkListenerThriftServerOperationCanceled(unknownOperation, 0))
listener.onOtherEvent(SparkListenerThriftServerOperationTimeout(unknownOperation, 0))
listener.onOtherEvent(SparkListenerThriftServerOperationError(unknownOperation,
"msg", "trace", 0))
listener.onOtherEvent(SparkListenerThriftServerOperationFinish(unknownOperation, 0))
Expand Down