sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -926,13 +926,23 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val THRIFTSERVER_FORCE_CANCEL =
buildConf("spark.sql.thriftServer.interruptOnCancel")
.doc("When true, all running tasks will be interrupted if one cancels a query. " +
"When false, all running tasks will remain until finished.")
.version("3.2.0")
.booleanConf
.createWithDefault(false)
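
For readers of this diff: a minimal, self-contained sketch of what the flag controls, assuming a plain local SparkContext (the group id "demo-group" and the sleeping job are illustrative, not part of this patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object InterruptOnCancelDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("interrupt-on-cancel-demo"))

    val job = new Thread(() => {
      // interruptOnCancel = true: cancelJobGroup() will Thread.interrupt()
      // the executor threads running this group's tasks instead of letting
      // them run to completion.
      sc.setJobGroup("demo-group", "long-running query", interruptOnCancel = true)
      try {
        sc.parallelize(1 to 2).foreach(_ => Thread.sleep(60000))
      } catch {
        case e: Exception => println(s"job cancelled: ${e.getMessage}")
      }
    })
    job.start()

    Thread.sleep(2000)
    sc.cancelJobGroup("demo-group") // tasks are interrupted promptly, not drained
    job.join()
    sc.stop()
  }
}
```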
Contributor:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala#L134 and https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L270 unconditionally set interruptOnCancel=true for broadcasts and for streaming queries. https://issues.apache.org/jira/browse/HDFS-1208 remains open, but there do not seem to have been issues resulting from using true in those places.

Given that, do we need to bother protecting it with a config for Thriftserver at all, or could this config at least be flipped to enabled by default? Having it default to false limits the default cancellation behaviour.

Contributor Author:

I'm fine with enabling it by default. In fact, we have enabled it internally and have not seen any issues. @HyukjinKwon, what do you think?

Member:

Yeah, I am fine.

Contributor Author:

created #41047

Member:

Seems SPARK-34064 is needed to cancel broadcast jobs


val THRIFTSERVER_QUERY_TIMEOUT =
buildConf("spark.sql.thriftServer.queryTimeout")
.doc("Set a query duration timeout in seconds in Thrift Server. If the timeout is set to " +
"a positive value, a running query will be cancelled automatically when the timeout is " +
"exceeded, otherwise the query continues to run till completion. If timeout values are " +
"set for each statement via `java.sql.Statement.setQueryTimeout` and they are smaller " +
"than this configuration value, they take precedence.")
"than this configuration value, they take precedence. If you set this timeout and prefer" +
"to cancel the queries right away without waiting task to finish, consider enabling" +
s"${THRIFTSERVER_FORCE_CANCEL.key} together.")
.version("3.1.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefault(0L)
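
To illustrate the precedence rule in the doc string above, a hedged JDBC sketch (the connection URL, credentials, and the sleep query are illustrative; it assumes a Thrift server listening on localhost:10000):

```scala
import java.sql.DriverManager

val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000", "user", "")
val stmt = conn.createStatement()

// Session-level default timeout from the config: 30 seconds.
stmt.execute("SET spark.sql.thriftServer.queryTimeout=30")

// A smaller per-statement timeout set via java.sql.Statement takes
// precedence, so this statement is cancelled after about 5 seconds
// (an SQLException containing "Query timed out" is thrown).
stmt.setQueryTimeout(5)
stmt.execute("SELECT java_method('java.lang.Thread', 'sleep', 60000L)")
```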
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -63,6 +63,8 @@ private[hive] class SparkExecuteStatementOperation(
}
}

private val forceCancel = sqlContext.conf.getConf(SQLConf.THRIFTSERVER_FORCE_CANCEL)

private val substitutorStatement = SQLConf.withExistingConf(sqlContext.conf) {
new VariableSubstitution().substitute(statement)
}
@@ -131,7 +133,7 @@ private[hive] class SparkExecuteStatementOperation(

def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withLocalProperties {
try {
sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement)
sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement, forceCancel)
getNextRowSetInternal(order, maxRowsL)
} finally {
sqlContext.sparkContext.clearJobGroup()
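
For context, the flag set above pairs with the operation's cancellation path: on cancel or timeout the whole job group for the statement is cancelled, and forceCancel decides whether running tasks are interrupted. A simplified sketch of that pairing (abbreviated from the surrounding class, not the literal code):

```scala
// Every Spark job launched while the group is set joins the statementId group.
sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement, forceCancel)
try {
  // ... execute the statement; all its jobs carry the group id ...
} finally {
  sqlContext.sparkContext.clearJobGroup()
}

// Elsewhere, one call cancels the whole group. With forceCancel = true the
// executor task threads are interrupted; with false, already-running tasks
// are left to finish.
sqlContext.sparkContext.cancelJobGroup(statementId)
```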
@@ -321,7 +323,7 @@ private[hive] class SparkExecuteStatementOperation(
parentSession.getSessionState.getConf.setClassLoader(executionHiveClassLoader)
}

sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement)
sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement, forceCancel)
result = sqlContext.sql(statement)
logDebug(result.queryExecution.toString())
HiveThriftServer2.eventManager.onStatementParsed(statementId,
sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
@@ -18,9 +18,14 @@
package org.apache.spark.sql.hive.thriftserver

import java.sql.SQLException
import java.util.concurrent.atomic.AtomicBoolean

import org.apache.hive.service.cli.HiveSQLException

import org.apache.spark.TaskKilled
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.internal.SQLConf

trait ThriftServerWithSparkContextSuite extends SharedThriftServer {

test("the scratch dir will be deleted during server start but recreated with new operation") {
@@ -79,6 +84,38 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
"java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
}
}

test("SPARK-33526: Add config to control if cancel invoke interrupt task on thriftserver") {
withJdbcStatement { statement =>
val forceCancel = new AtomicBoolean(false)
val listener = new SparkListener {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
// The 1-second query timeout cancels the job group, so every task ends killed.
assert(taskEnd.reason.isInstanceOf[TaskKilled])
if (forceCancel.get()) {
// With interruptOnCancel enabled, the sleeping task is interrupted
// almost immediately after it launches.
assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime < 1000)
} else {
// Without interruption the task sleeps to completion; to avoid timing
// flakiness, check for at least 2s instead of the full 3s sleep.
assert(System.currentTimeMillis() - taskEnd.taskInfo.launchTime >= 2000)
}
}
}

spark.sparkContext.addSparkListener(listener)
try {
statement.execute(s"SET ${SQLConf.THRIFTSERVER_QUERY_TIMEOUT.key}=1")
Seq(true, false).foreach { force =>
statement.execute(s"SET ${SQLConf.THRIFTSERVER_FORCE_CANCEL.key}=$force")
forceCancel.set(force)
val e1 = intercept[SQLException] {
statement.execute("select java_method('java.lang.Thread', 'sleep', 3000L)")
}.getMessage
assert(e1.contains("Query timed out"))
}
} finally {
spark.sparkContext.removeSparkListener(listener)
}
}
}
}

