From ac7f0d4aedfb165a8ca4d1663b6eb2a62bfc6f7a Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Tue, 15 Aug 2017 18:13:26 +0200 Subject: [PATCH] [SPARK-21738] Thriftserver doesn't cancel jobs when session is closed --- .../thriftserver/SparkExecuteStatementOperation.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 1d1074a2a7387..f5191fa9132bd 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -71,9 +71,9 @@ private[hive] class SparkExecuteStatementOperation( def close(): Unit = { // RDDs will be cleaned automatically upon garbage collection. - sqlContext.sparkContext.clearJobGroup() logDebug(s"CLOSING $statementId") cleanup(OperationState.CLOSED) + sqlContext.sparkContext.clearJobGroup() } def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) { @@ -273,9 +273,6 @@ private[hive] class SparkExecuteStatementOperation( override def cancel(): Unit = { logInfo(s"Cancel '$statement' with $statementId") - if (statementId != null) { - sqlContext.sparkContext.cancelJobGroup(statementId) - } cleanup(OperationState.CANCELED) } @@ -287,6 +284,9 @@ private[hive] class SparkExecuteStatementOperation( backgroundHandle.cancel(true) } } + if (statementId != null) { + sqlContext.sparkContext.cancelJobGroup(statementId) + } } }