diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d4c7dd7f3160c..f0cde5d9b4a8a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -893,6 +893,17 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val THRIFTSERVER_QUERY_TIMEOUT =
+    buildConf("spark.sql.thriftServer.queryTimeout")
+      .doc("Set a query duration timeout in seconds in Thrift Server. If the timeout is set to " +
+        "a positive value, a running query will be cancelled automatically when the timeout is " +
+        "exceeded, otherwise the query continues to run till completion. If timeout values are " +
+        "set for each statement via `java.sql.Statement.setQueryTimeout` and they are smaller " +
+        "than this configuration value, they take precedence.")
+      .version("3.1.0")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefault(0L)
+
   val THRIFTSERVER_UI_STATEMENT_LIMIT =
     buildConf("spark.sql.thriftserver.ui.retainedStatements")
       .doc("The number of SQL statements kept in the JDBC/ODBC web UI history.")
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java
index 75edc5763ce44..3df842d2b4af9 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/OperationManager.java
@@ -97,7 +97,8 @@ public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parent
   public ExecuteStatementOperation newExecuteStatementOperation(HiveSession parentSession,
       String statement, Map confOverlay, boolean runAsync, long queryTimeout)
       throws HiveSQLException {
-    return newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync);
+    return newExecuteStatementOperation(parentSession, statement, confOverlay, runAsync,
+        queryTimeout);
   }
 
   public GetTypeInfoOperation newGetTypeInfoOperation(HiveSession parentSession) {
@@ -207,6 +208,7 @@ public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
     Operation operation = getOperation(opHandle);
     OperationState opState = operation.getStatus().getState();
     if (opState == OperationState.CANCELED ||
+        opState == OperationState.TIMEDOUT ||
         opState == OperationState.CLOSED ||
         opState == OperationState.FINISHED ||
         opState == OperationState.ERROR ||
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/SQLOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/SQLOperation.java
index e2ac1ea78c1ab..894793152f409 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -155,11 +155,12 @@ private void runQuery(HiveConf sqlOperationConf) throws HiveSQLException {
         throw toSQLException("Error while processing statement", response);
       }
     } catch (HiveSQLException e) {
-      // If the operation was cancelled by another thread,
+      // If the operation was cancelled by another thread or timed out,
       // Driver#run will return a non-zero response code.
-      // We will simply return if the operation state is CANCELED,
+      // We will simply return if the operation state is CANCELED or TIMEDOUT,
       // otherwise throw an exception
-      if (getStatus().getState() == OperationState.CANCELED) {
+      if (getStatus().getState() == OperationState.CANCELED ||
+          getStatus().getState() == OperationState.TIMEDOUT) {
         return;
       } else {
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index 4e6729faced43..a1f2d62a0b72c 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -116,7 +116,7 @@ object HiveThriftServer2 extends Logging {
   }
 
   private[thriftserver] object ExecutionState extends Enumeration {
-    val STARTED, COMPILED, CANCELED, FAILED, FINISHED, CLOSED = Value
+    val STARTED, COMPILED, CANCELED, TIMEDOUT, FAILED, FINISHED, CLOSED = Value
     type ExecutionState = Value
   }
 }
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index ec2c795e95c83..bc8cc16746a30 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.security.PrivilegedExceptionAction
 import java.util.{Arrays, Map => JMap}
-import java.util.concurrent.RejectedExecutionException
+import java.util.concurrent.{Executors, RejectedExecutionException, TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -45,11 +45,24 @@ private[hive] class SparkExecuteStatementOperation(
     parentSession: HiveSession,
     statement: String,
     confOverlay: JMap[String, String],
-    runInBackground: Boolean = true)
+    runInBackground: Boolean = true,
+    queryTimeout: Long)
   extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)
   with SparkOperation
   with Logging {
 
+  // If a timeout value `queryTimeout` is specified by users and it is smaller than
+  // a global timeout value, we use the user-specified value.
+  // This code follows the Hive timeout behaviour (See #29933 for details).
+  private val timeout = {
+    val globalTimeout = sqlContext.conf.getConf(SQLConf.THRIFTSERVER_QUERY_TIMEOUT)
+    if (globalTimeout > 0 && (queryTimeout <= 0 || globalTimeout < queryTimeout)) {
+      globalTimeout
+    } else {
+      queryTimeout
+    }
+  }
+
   private var result: DataFrame = _
 
   // We cache the returned rows to get iterators again in case the user wants to use FETCH_FIRST.
@@ -200,6 +213,23 @@ private[hive] class SparkExecuteStatementOperation(
       parentSession.getUsername)
     setHasResultSet(true) // avoid no resultset for async run
 
+    if (timeout > 0) {
+      val timeoutExecutor = Executors.newSingleThreadScheduledExecutor()
+      timeoutExecutor.schedule(new Runnable {
+        override def run(): Unit = {
+          try {
+            timeoutCancel()
+          } catch {
+            case NonFatal(e) =>
+              setOperationException(new HiveSQLException(e))
+              logError(s"Error cancelling the query after timeout: $timeout seconds")
+          } finally {
+            timeoutExecutor.shutdown()
+          }
+        }
+      }, timeout, TimeUnit.SECONDS)
+    }
+
     if (!runInBackground) {
       execute()
     } else {
@@ -328,6 +358,17 @@ private[hive] class SparkExecuteStatementOperation(
     }
   }
 
+  def timeoutCancel(): Unit = {
+    synchronized {
+      if (!getStatus.getState.isTerminal) {
+        logInfo(s"Query with $statementId timed out after $timeout seconds")
+        setState(OperationState.TIMEDOUT)
+        cleanup()
+        HiveThriftServer2.eventManager.onStatementTimeout(statementId)
+      }
+    }
+  }
+
   override def cancel(): Unit = {
     synchronized {
       if (!getStatus.getState.isTerminal) {
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index bc9c13eb0d4f8..ba42eefed2a22 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -44,14 +44,15 @@ private[thriftserver] class SparkSQLOperationManager()
       parentSession: HiveSession,
       statement: String,
       confOverlay: JMap[String, String],
-      async: Boolean): ExecuteStatementOperation = synchronized {
+      async: Boolean,
+      queryTimeout: Long): ExecuteStatementOperation = synchronized {
     val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
     require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
       s" initialized or had already closed.")
     val conf = sqlContext.sessionState.conf
     val runInBackground = async && conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
     val operation = new SparkExecuteStatementOperation(
-      sqlContext, parentSession, statement, confOverlay, runInBackground)
+      sqlContext, parentSession, statement, confOverlay, runInBackground, queryTimeout)
     handleToOperation.put(operation.getHandle, operation)
     logDebug(s"Created Operation for $statement with session=$parentSession, " +
       s"runInBackground=$runInBackground")
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala
index 5cb78f6e64650..8bd8f29a4b9ec 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala
@@ -119,6 +119,7 @@ private[thriftserver] class ExecutionInfo(
   def isExecutionActive: Boolean = {
     !(state == ExecutionState.FAILED ||
       state == ExecutionState.CANCELED ||
+      state == ExecutionState.TIMEDOUT ||
       state == ExecutionState.CLOSED)
   }
 
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2EventManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2EventManager.scala
index fa04c67896a69..202fdf33c0dd9 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2EventManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2EventManager.scala
@@ -57,6 +57,10 @@ private[thriftserver] class HiveThriftServer2EventManager(sc: SparkContext) {
     postLiveListenerBus(SparkListenerThriftServerOperationCanceled(id, System.currentTimeMillis()))
   }
 
+  def onStatementTimeout(id: String): Unit = {
+    postLiveListenerBus(SparkListenerThriftServerOperationTimeout(id, System.currentTimeMillis()))
+  }
+
   def onStatementError(id: String, errorMsg: String, errorTrace: String): Unit = {
     postLiveListenerBus(SparkListenerThriftServerOperationError(id, errorMsg, errorTrace,
       System.currentTimeMillis()))
@@ -96,6 +100,9 @@ private[thriftserver] case class SparkListenerThriftServerOperationParsed(
 private[thriftserver] case class SparkListenerThriftServerOperationCanceled(
   id: String, finishTime: Long) extends SparkListenerEvent
 
+private[thriftserver] case class SparkListenerThriftServerOperationTimeout(
+  id: String, finishTime: Long) extends SparkListenerEvent
+
 private[thriftserver] case class SparkListenerThriftServerOperationError(
   id: String,
   errorMsg: String,
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
index 6b7e5ee611417..4cf672e3d9d9e 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
@@ -119,6 +119,7 @@ private[thriftserver] class HiveThriftServer2Listener(
     case e: SparkListenerThriftServerOperationStart => onOperationStart(e)
     case e: SparkListenerThriftServerOperationParsed => onOperationParsed(e)
     case e: SparkListenerThriftServerOperationCanceled => onOperationCanceled(e)
+    case e: SparkListenerThriftServerOperationTimeout => onOperationTimeout(e)
     case e: SparkListenerThriftServerOperationError => onOperationError(e)
     case e: SparkListenerThriftServerOperationFinish => onOperationFinished(e)
     case e: SparkListenerThriftServerOperationClosed => onOperationClosed(e)
@@ -181,6 +182,15 @@ private[thriftserver] class HiveThriftServer2Listener(
       case None => logWarning(s"onOperationCanceled called with unknown operation id: ${e.id}")
     }
 
+  private def onOperationTimeout(e: SparkListenerThriftServerOperationTimeout): Unit =
+    Option(executionList.get(e.id)) match {
+      case Some(executionData) =>
+        executionData.finishTimestamp = e.finishTime
+        executionData.state = ExecutionState.TIMEDOUT
+        updateLiveStore(executionData)
+      case None => logWarning(s"onOperationTimeout called with unknown operation id: ${e.id}")
+    }
+
   private def onOperationError(e: SparkListenerThriftServerOperationError): Unit =
     Option(executionList.get(e.id)) match {
       case Some(executionData) =>
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 75c00000dee47..7cc60bb505089 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -46,6 +46,7 @@ import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.test.HiveTestJars
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.HIVE_THRIFT_SERVER_SINGLESESSION
 import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.util.{ThreadUtils, Utils}
@@ -285,7 +286,6 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
   }
 
   test("test multiple session") {
-    import org.apache.spark.sql.internal.SQLConf
     var defaultV1: String = null
     var defaultV2: String = null
     var data: ArrayBuffer[Int] = null
@@ -880,6 +880,59 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       assert(rs.getString(1) === expected.toString)
     }
   }
+
+  test("SPARK-26533: Support query auto timeout cancel on thriftserver - setQueryTimeout") {
+    withJdbcStatement() { statement =>
+      statement.setQueryTimeout(1)
+      val e = intercept[SQLException] {
+        statement.execute("select java_method('java.lang.Thread', 'sleep', 10000L)")
+      }.getMessage
+      assert(e.contains("Query timed out after"))
+
+      statement.setQueryTimeout(0)
+      val rs1 = statement.executeQuery(
+        "select 'test', java_method('java.lang.Thread', 'sleep', 3000L)")
+      rs1.next()
+      assert(rs1.getString(1) == "test")
+
+      statement.setQueryTimeout(-1)
+      val rs2 = statement.executeQuery(
+        "select 'test', java_method('java.lang.Thread', 'sleep', 3000L)")
+      rs2.next()
+      assert(rs2.getString(1) == "test")
+    }
+  }
+
+  test("SPARK-26533: Support query auto timeout cancel on thriftserver - SQLConf") {
+    withJdbcStatement() { statement =>
+      statement.execute(s"SET ${SQLConf.THRIFTSERVER_QUERY_TIMEOUT.key}=1")
+      val e1 = intercept[SQLException] {
+        statement.execute("select java_method('java.lang.Thread', 'sleep', 10000L)")
+      }.getMessage
+      assert(e1.contains("Query timed out after"))
+
+      statement.execute(s"SET ${SQLConf.THRIFTSERVER_QUERY_TIMEOUT.key}=0")
+      val rs = statement.executeQuery(
+        "select 'test', java_method('java.lang.Thread', 'sleep', 3000L)")
+      rs.next()
+      assert(rs.getString(1) == "test")
+
+      // Uses the smaller of the config value and the user-specified timeout value
+      statement.execute(s"SET ${SQLConf.THRIFTSERVER_QUERY_TIMEOUT.key}=1")
+      statement.setQueryTimeout(30)
+      val e2 = intercept[SQLException] {
+        statement.execute("select java_method('java.lang.Thread', 'sleep', 10000L)")
+      }.getMessage
+      assert(e2.contains("Query timed out after"))
+
+      statement.execute(s"SET ${SQLConf.THRIFTSERVER_QUERY_TIMEOUT.key}=30")
+      statement.setQueryTimeout(1)
+      val e3 = intercept[SQLException] {
+        statement.execute("select java_method('java.lang.Thread', 'sleep', 10000L)")
+      }.getMessage
+      assert(e3.contains("Query timed out after"))
+    }
+  }
 }
 
 class SingleSessionSuite extends HiveThriftJdbcTest {
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala
index ca1f9a2f74244..c8bb6d9ee0821 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperationSuite.scala
@@ -61,6 +61,7 @@ class SparkExecuteStatementOperationSuite extends SparkFunSuite with SharedSpark
 
   Seq(
     (OperationState.CANCELED, (_: SparkExecuteStatementOperation).cancel()),
+    (OperationState.TIMEDOUT, (_: SparkExecuteStatementOperation).timeoutCancel()),
     (OperationState.CLOSED, (_: SparkExecuteStatementOperation).close())
   ).foreach { case (finalState, transition) =>
     test("SPARK-32057 SparkExecuteStatementOperation should not transiently become ERROR " +
@@ -109,7 +110,7 @@ class SparkExecuteStatementOperationSuite extends SparkFunSuite with SharedSpark
       signal: Semaphore,
       finalState: OperationState)
     extends SparkExecuteStatementOperation(sqlContext, hiveSession, statement,
-      new util.HashMap, false) {
+      new util.HashMap, false, 0) {
 
     override def cleanup(): Unit = {
       super.cleanup()
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
index 9a9f574153a0a..3f0538dd1c943 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
@@ -151,6 +151,7 @@ class HiveThriftServer2ListenerSuite extends SparkFunSuite with BeforeAndAfter {
       "stmt", "groupId", 0))
     listener.onOtherEvent(SparkListenerThriftServerOperationParsed(unknownOperation, "query"))
     listener.onOtherEvent(SparkListenerThriftServerOperationCanceled(unknownOperation, 0))
+    listener.onOtherEvent(SparkListenerThriftServerOperationTimeout(unknownOperation, 0))
    listener.onOtherEvent(SparkListenerThriftServerOperationError(unknownOperation, "msg",
       "trace", 0))
     listener.onOtherEvent(SparkListenerThriftServerOperationFinish(unknownOperation, 0))
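
The patch wires the JDBC-standard `java.sql.Statement.setQueryTimeout` and the new `spark.sql.thriftServer.queryTimeout` config into the Thrift Server operation lifecycle. The following client-side sketch (not part of this patch) shows how the behaviour is exercised from plain JDBC; the URL, port, credentials, and driver availability are assumptions, while the SQL and timeout calls mirror the tests above.

import java.sql.{DriverManager, SQLException}

object QueryTimeoutExample {
  def main(args: Array[String]): Unit = {
    // Assumes a Thrift Server listening on the default port 10000 and the Hive JDBC
    // driver on the classpath; adjust the URL and credentials for a real deployment.
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
    try {
      val stmt = conn.createStatement()

      // Per-statement timeout: with this patch the server marks the operation TIMEDOUT
      // after 5 seconds and the client sees an SQLException (the tests above assert the
      // message contains "Query timed out after").
      stmt.setQueryTimeout(5)
      try {
        stmt.execute("SELECT java_method('java.lang.Thread', 'sleep', 10000L)")
      } catch {
        case e: SQLException => println(s"Cancelled as expected: ${e.getMessage}")
      }

      // Session-wide timeout via the new config; when both are set, the smaller positive
      // value wins (see the `timeout` computation in SparkExecuteStatementOperation above).
      stmt.execute("SET spark.sql.thriftServer.queryTimeout=5")
      stmt.setQueryTimeout(0) // 0 disables the per-statement timeout, so the config applies
    } finally {
      conn.close()
    }
  }
}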