2 changes: 2 additions & 0 deletions docs/deployment/settings.md
@@ -180,6 +180,8 @@
Key | Default | Meaning | Since
--- | --- | --- | ---
kyuubi\.operation\.idle<br>\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT3H</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Operation will be closed when it's not accessed for this duration of time</div>|<div style='width: 20pt'>1.0.0</div>
kyuubi\.operation<br>\.interrupt\.on\.cancel|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>When true, all running tasks will be interrupted if one cancels a query. When false, all running tasks will remain until finished.</div>|<div style='width: 20pt'>1.2.0</div>
kyuubi\.operation<br>\.query\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT0S</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Set a query duration timeout in seconds in Kyuubi. If the timeout is set to a positive value, a running query will be cancelled automatically when the timeout is exceeded; otherwise the query continues to run till completion. If a timeout is also set per statement via `java.sql.Statement.setQueryTimeout` and it is smaller than this configuration value, it takes precedence. If you set this timeout and prefer to cancel the queries right away without waiting for running tasks to finish, consider enabling kyuubi.operation.interrupt.on.cancel together.</div>|<div style='width: 20pt'>1.2.0</div>
kyuubi\.operation<br>\.status\.polling<br>\.timeout|<div style='width: 80pt;word-wrap: break-word;white-space: normal'>PT5S</div>|<div style='width: 200pt;word-wrap: break-word;white-space: normal'>Timeout(ms) for long polling asynchronous running sql query's status</div>|<div style='width: 20pt'>1.0.0</div>
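
The interplay between the server-side default above and the per-statement JDBC timeout is easiest to see from the client. Below is a minimal sketch, not part of this change; the JDBC URL/port and the `sleep` UDF call are illustrative, mirroring the tests added later in this diff:

```scala
import java.sql.{DriverManager, SQLTimeoutException}

object QueryTimeoutExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical endpoint; any Kyuubi JDBC URL works the same way.
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/")
    val stmt = conn.createStatement()
    try {
      // A per-statement timeout smaller than kyuubi.operation.query.timeout takes precedence.
      stmt.setQueryTimeout(1)
      try {
        stmt.execute("select java_method('java.lang.Thread', 'sleep', 10000L)")
      } catch {
        case e: SQLTimeoutException => println(s"cancelled as expected: ${e.getMessage}")
      }
      // 0 disables the client-side timeout, so only the server-side config (if any) applies.
      stmt.setQueryTimeout(0)
      val rs = stmt.executeQuery("select 'test'")
      rs.next()
      assert(rs.getString(1) == "test")
    } finally {
      stmt.close()
      conn.close()
    }
  }
}
```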

### Session
@@ -17,24 +17,32 @@

package org.apache.kyuubi.engine.spark.operation

import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.{RejectedExecutionException, TimeUnit}

import scala.util.control.NonFatal

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._

import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.{ArrayFetchIterator, KyuubiSparkUtil}
import org.apache.kyuubi.operation.{OperationState, OperationType}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
import org.apache.kyuubi.util.ThreadUtils

class ExecuteStatement(
spark: SparkSession,
session: Session,
protected override val statement: String,
override val shouldRunAsync: Boolean)
override val shouldRunAsync: Boolean,
queryTimeout: Long)
extends SparkOperation(spark, OperationType.EXECUTE_STATEMENT, session) with Logging {

private val forceCancel =
session.sessionManager.getConf.get(KyuubiConf.OPERATION_FORCE_CANCEL)

private val operationLog: OperationLog =
OperationLog.createOperationLog(session.handle, getHandle)
override def getOperationLog: Option[OperationLog] = Option(operationLog)
@@ -63,7 +71,7 @@ class ExecuteStatement(
setState(OperationState.RUNNING)
info(KyuubiSparkUtil.diagnostics(spark))
Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader)
spark.sparkContext.setJobGroup(statementId, statement)
spark.sparkContext.setJobGroup(statementId, statement, forceCancel)
result = spark.sql(statement)
debug(result.queryExecution)
iter = new ArrayFetchIterator(result.collect())
@@ -76,6 +84,7 @@
}

override protected def runInternal(): Unit = {
addTimeoutMonitor()
if (shouldRunAsync) {
val asyncOperation = new Runnable {
override def run(): Unit = {
@@ -100,4 +109,27 @@
executeStatement()
}
}

private def addTimeoutMonitor(): Unit = {
if (queryTimeout > 0) {
val timeoutExecutor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("query-timeout-thread")
timeoutExecutor.schedule(new Runnable {
override def run(): Unit = {
try {
if (getStatus.state != OperationState.TIMEOUT) {
info(s"Query with $statementId timed out after $queryTimeout seconds")
cleanup(OperationState.TIMEOUT)
}
} catch {
case NonFatal(e) =>
setOperationException(KyuubiSQLException(e))
error(s"Error cancelling the query after timeout: $queryTimeout seconds")
} finally {
timeoutExecutor.shutdown()
}
}
}, queryTimeout, TimeUnit.SECONDS)
}
}
}
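
For readers unfamiliar with the pattern in `addTimeoutMonitor` above, the following is a self-contained sketch of the same idea using only the JDK: schedule a one-shot task on a daemon thread that cancels the work once the deadline elapses. The names here are illustrative, not Kyuubi APIs:

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object TimeoutMonitorSketch {
  def main(args: Array[String]): Unit = {
    val timedOut = new AtomicBoolean(false)
    // Single daemon thread, analogous to newDaemonSingleThreadScheduledExecutor in the diff.
    val timeoutExecutor: ScheduledExecutorService =
      Executors.newSingleThreadScheduledExecutor { r =>
        val t = new Thread(r, "query-timeout-thread")
        t.setDaemon(true)
        t
      }
    timeoutExecutor.schedule(new Runnable {
      override def run(): Unit = {
        try {
          if (timedOut.compareAndSet(false, true)) {
            println("query timed out, cancelling")
          }
        } finally {
          timeoutExecutor.shutdown()
        }
      }
    }, 3, TimeUnit.SECONDS)

    // Simulated query that outlives the 3-second deadline.
    Thread.sleep(5000)
    if (!timedOut.get()) println("query finished before the timeout")
  }
}
```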
@@ -58,7 +58,7 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
runAsync: Boolean,
queryTimeout: Long): Operation = {
val spark = getSparkSession(session.handle)
val operation = new ExecuteStatement(spark, session, statement, runAsync)
val operation = new ExecuteStatement(spark, session, statement, runAsync, queryTimeout)
addOperation(operation)
}

@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.spark

import java.sql.{SQLTimeoutException, Statement}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

import org.apache.spark.TaskKilled
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._

import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.operation.JDBCTestUtils

class SparkEngineSuites extends KyuubiFunSuite {
Review comment (author): This new test class is for tests that need to modify the global config.


test("Add config to control if cancel invoke interrupt task on engine") {
Seq(true, false).foreach { force =>
withSparkJdbcStatement(Map(KyuubiConf.OPERATION_FORCE_CANCEL.key -> force.toString)) {
case (statement, spark) =>
val index = new AtomicInteger(0)
val forceCancel = new AtomicBoolean(false)
val listener = new SparkListener {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
assert(taskEnd.reason.isInstanceOf[TaskKilled])
if (forceCancel.get()) {
assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime < 3000)
index.incrementAndGet()
} else {
assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime >= 4000)
index.incrementAndGet()
}
}
}

spark.sparkContext.addSparkListener(listener)
try {
statement.setQueryTimeout(3)
forceCancel.set(force)
val e1 = intercept[SQLTimeoutException] {
statement.execute("select java_method('java.lang.Thread', 'sleep', 5000L)")
}.getMessage
assert(e1.contains("Query timed out"))
eventually(Timeout(30.seconds)) {
assert(index.get() == 1)
}
} finally {
spark.sparkContext.removeSparkListener(listener)
}
}
}
}

private def withSparkJdbcStatement(
conf: Map[String, String] = Map.empty)(
statement: (Statement, SparkSession) => Unit): Unit = {
val spark = new WithSparkSuite {
override def withKyuubiConf: Map[String, String] = conf
override protected def jdbcUrl: String = getJdbcUrl
}
spark.startSparkEngine()
val tmp: Statement => Unit = { tmpStatement =>
statement(tmpStatement, spark.getSpark)
}
try {
spark.withJdbcStatement()(tmp)
} finally {
spark.stopSparkEngine()
}
}
}

trait WithSparkSuite extends WithSparkSQLEngine with JDBCTestUtils
@@ -36,7 +36,7 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
super.beforeAll()
}

protected def startSparkEngine(): Unit = {
def startSparkEngine(): Unit = {
val warehousePath = Utils.createTempDir()
val metastorePath = Utils.createTempDir()
warehousePath.toFile.delete()
@@ -63,7 +63,7 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
stopSparkEngine()
}

protected def stopSparkEngine(): Unit = {
def stopSparkEngine(): Unit = {
// we need to clean up conf since it's the global config in same jvm.
withKyuubiConf.foreach { case (k, _) =>
System.clearProperty(k)
@@ -83,4 +83,5 @@
}

protected def getJdbcUrl: String = s"jdbc:hive2://$connectionUrl/;"
def getSpark: SparkSession = spark
}
@@ -502,6 +502,27 @@ object KyuubiConf {
.timeConf
.createWithDefault(Duration.ofSeconds(5).toMillis)

val OPERATION_FORCE_CANCEL: ConfigEntry[Boolean] =
buildConf("operation.interrupt.on.cancel")
.doc("When true, all running tasks will be interrupted if one cancels a query. " +
"When false, all running tasks will remain until finished.")
.version("1.2.0")
.booleanConf
.createWithDefault(true)

val OPERATION_QUERY_TIMEOUT: ConfigEntry[Long] =
buildConf("operation.query.timeout")
Review comment (author): Decided to hide the Spark-specific thing that exists in Spark ThriftServer, so the config is defined in the Kyuubi style. @pan3793

Review comment (member): I think the Kyuubi-style timeConf is defined in ms? The comment here is in seconds.

Review comment (@pan3793, Mar 25, 2021): Actually, I think we don't need to specify the time unit in the Kyuubi timeConf style; users should not care about what we use internally, they just define it via an ISO-8601 duration string.

Review comment (member): Sorry, I missed this comment. What @pan3793 pointed out is true, we don't need to tell the unit of a time conf, it's a Duration ISO-8601 string. Here is a bug at the caller side: the client timeout is in seconds, and the system timeout from the conf is in milliseconds... @ulysses-you
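
Below is a hypothetical illustration, not part of this diff, of the unit-consistent comparison the reviewers are pointing at: the value from `java.sql.Statement.setQueryTimeout` arrives in seconds, while a Kyuubi timeConf such as kyuubi.operation.query.timeout is read internally in milliseconds, so one side has to be converted before they can be compared:

```scala
import java.util.concurrent.TimeUnit

// Resolve the effective timeout in seconds (illustrative helper name).
def resolveQueryTimeoutSecs(clientTimeoutSecs: Long, systemTimeoutMs: Long): Long = {
  // Convert the conf value from milliseconds so both sides use seconds.
  val systemTimeoutSecs = TimeUnit.MILLISECONDS.toSeconds(systemTimeoutMs)
  if (clientTimeoutSecs > 0 &&
    (systemTimeoutSecs <= 0 || clientTimeoutSecs < systemTimeoutSecs)) {
    clientTimeoutSecs
  } else {
    systemTimeoutSecs
  }
}
```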

.doc("Set a query duration timeout in seconds in Kyuubi. If the timeout is set to " +
"a positive value, a running query will be cancelled automatically when the timeout is " +
"exceeded; otherwise the query continues to run till completion. If a timeout is also set " +
"per statement via `java.sql.Statement.setQueryTimeout` and it is smaller than this " +
"configuration value, it takes precedence. If you set this timeout and prefer to cancel " +
"the queries right away without waiting for running tasks to finish, consider enabling " +
s"${OPERATION_FORCE_CANCEL.key} together.")
.version("1.2.0")
.timeConf
.createWithDefault(Duration.ZERO.toMillis)

val ENGINE_SHARED_LEVEL: ConfigEntry[String] = buildConf("session.engine.share.level")
.doc("The SQL engine App will be shared in different levels, available configs are: <ul>" +
" <li>CONNECTION: the App will not be shared but only used by the current client" +
@@ -34,7 +34,7 @@ trait JDBCTestUtils extends KyuubiFunSuite {
protected val patterns = Seq("", "*", "%", null, ".*", "_*", "_%", ".%")
protected def jdbcUrl: String

protected def withMultipleConnectionJdbcStatement(
def withMultipleConnectionJdbcStatement(
tableNames: String*)(fs: (Statement => Unit)*): Unit = {
val connections = fs.map { _ => DriverManager.getConnection(jdbcUrl, user, "") }
val statements = connections.map(_.createStatement())
@@ -57,7 +57,7 @@ }
}
}

protected def withDatabases(dbNames: String*)(fs: (Statement => Unit)*): Unit = {
def withDatabases(dbNames: String*)(fs: (Statement => Unit)*): Unit = {
val connections = fs.map { _ => DriverManager.getConnection(jdbcUrl, user, "") }
val statements = connections.map(_.createStatement())

@@ -75,11 +75,11 @@ }
}
}

protected def withJdbcStatement(tableNames: String*)(f: Statement => Unit): Unit = {
def withJdbcStatement(tableNames: String*)(f: Statement => Unit): Unit = {
withMultipleConnectionJdbcStatement(tableNames: _*)(f)
}

protected def withThriftClient(f: TCLIService.Iface => Unit): Unit = {
def withThriftClient(f: TCLIService.Iface => Unit): Unit = {
val hostAndPort = jdbcUrl.stripPrefix("jdbc:hive2://").split("/;").head.split(":")
val host = hostAndPort.head
val port = hostAndPort(1).toInt
@@ -96,7 +96,7 @@ }
}
}

protected def withSessionHandle(f: (TCLIService.Iface, TSessionHandle) => Unit): Unit = {
def withSessionHandle(f: (TCLIService.Iface, TSessionHandle) => Unit): Unit = {
withThriftClient { client =>
val req = new TOpenSessionReq()
req.setUsername(user)
@@ -117,7 +117,7 @@ }
}
}

protected def checkGetSchemas(
def checkGetSchemas(
rs: ResultSet, dbNames: Seq[String], catalogName: String = ""): Unit = {
var count = 0
while(rs.next()) {
@@ -17,7 +17,7 @@

package org.apache.kyuubi.operation

import java.sql.{Date, SQLException, Timestamp}
import java.sql.{Date, SQLException, SQLTimeoutException, Timestamp}

import scala.collection.JavaConverters._

@@ -327,4 +327,26 @@ trait JDBCTests extends BasicJDBCTests {
assert(metaData.getScale(1) === 0)
}
}

test("Support query auto timeout cancel on thriftserver - setQueryTimeout") {
withJdbcStatement() { statement =>
statement.setQueryTimeout(1)
val e = intercept[SQLTimeoutException] {
statement.execute("select java_method('java.lang.Thread', 'sleep', 10000L)")
}.getMessage
assert(e.contains("Query timed out after"))

statement.setQueryTimeout(0)
val rs1 = statement.executeQuery(
"select 'test', java_method('java.lang.Thread', 'sleep', 3000L)")
rs1.next()
assert(rs1.getString(1) == "test")

statement.setQueryTimeout(-1)
val rs2 = statement.executeQuery(
"select 'test', java_method('java.lang.Thread', 'sleep', 3000L)")
rs2.next()
assert(rs2.getString(1) == "test")
}
}
}
@@ -33,7 +33,8 @@ class ExecuteStatement(
client: TCLIService.Iface,
remoteSessionHandle: TSessionHandle,
override val statement: String,
override val shouldRunAsync: Boolean)
override val shouldRunAsync: Boolean,
queryTimeout: Long)
extends KyuubiOperation(
OperationType.EXECUTE_STATEMENT, session, client, remoteSessionHandle) {

@@ -71,6 +72,7 @@ class ExecuteStatement(

val req = new TExecuteStatementReq(remoteSessionHandle, statement)
req.setRunAsync(shouldRunAsync)
req.setQueryTimeout(queryTimeout)
val resp = client.ExecuteStatement(req)
verifyTStatus(resp.getStatus)
_remoteOpHandle = resp.getOperationHandle
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
import org.apache.hive.service.rpc.thrift.{TCLIService, TFetchResultsReq, TRowSet, TSessionHandle}

import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.operation.FetchOrientation.FetchOrientation
import org.apache.kyuubi.session.{Session, SessionHandle}
import org.apache.kyuubi.util.ThriftUtils
@@ -49,6 +50,18 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
tSessionHandle
}

private def getQueryTimeout(clientQueryTimeout: Long): Long = {
// If clientQueryTimeout is smaller than systemQueryTimeout value,
// we use the clientQueryTimeout value.
val systemQueryTimeout = getConf.get(KyuubiConf.OPERATION_QUERY_TIMEOUT)
if (clientQueryTimeout > 0 &&
(systemQueryTimeout <= 0 || clientQueryTimeout < systemQueryTimeout)) {
clientQueryTimeout
} else {
systemQueryTimeout
}
}

def setConnection(
sessionHandle: SessionHandle,
client: TCLIService.Iface,
@@ -69,9 +82,9 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
queryTimeout: Long): Operation = {
val client = getThriftClient(session.handle)
val remoteSessionHandle = getRemoteTSessionHandle(session.handle)
val operation = new ExecuteStatement(session, client, remoteSessionHandle, statement, runAsync)
val operation = new ExecuteStatement(session, client, remoteSessionHandle, statement, runAsync,
getQueryTimeout(queryTimeout))
addOperation(operation)

}

override def newGetTypeInfoOperation(session: Session): Operation = {
@@ -143,7 +156,6 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
addOperation(operation)
}


override def getOperationLogRowSet(
opHandle: OperationHandle,
order: FetchOrientation, maxRows: Int): TRowSet = {