Skip to content

Commit 7fa5fff

Browse files
lianchengpwendell
authored andcommitted
[SPARK-4308][SQL] Sets SQL operation state to ERROR when exception is thrown
In `HiveThriftServer2`, when an exception is thrown during a SQL execution, the SQL operation state should be set to `ERROR`, but now it remains `RUNNING`. This affects the result of the `GetOperationStatus` Thrift API. Author: Cheng Lian <[email protected]> Closes #3175 from liancheng/fix-op-state and squashes the following commits: 6d4c1fe [Cheng Lian] Sets SQL operation state to ERROR when exception is thrown
1 parent e924426 commit 7fa5fff

File tree

3 files changed

+21
-29
lines changed

3 files changed

+21
-29
lines changed

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ package org.apache.spark.sql.hive.thriftserver
1919

2020
import scala.collection.JavaConversions._
2121

22-
import java.util.{ArrayList => JArrayList}
23-
2422
import org.apache.commons.lang.exception.ExceptionUtils
2523
import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema}
2624
import org.apache.hadoop.hive.ql.Driver

sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,7 @@ import scala.collection.mutable.{ArrayBuffer, Map => SMap}
2525
import scala.math._
2626

2727
import org.apache.hadoop.hive.common.`type`.HiveDecimal
28-
import org.apache.hadoop.hive.conf.HiveConf
2928
import org.apache.hadoop.hive.metastore.api.FieldSchema
30-
import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
3129
import org.apache.hadoop.hive.shims.ShimLoader
3230
import org.apache.hadoop.security.UserGroupInformation
3331
import org.apache.hive.service.cli._
@@ -37,9 +35,9 @@ import org.apache.hive.service.cli.session.HiveSession
3735
import org.apache.spark.Logging
3836
import org.apache.spark.sql.catalyst.plans.logical.SetCommand
3937
import org.apache.spark.sql.catalyst.types._
40-
import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD}
41-
import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveContext}
4238
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
39+
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
40+
import org.apache.spark.sql.{SQLConf, SchemaRDD, Row => SparkRow}
4341

4442
/**
4543
* A compatibility layer for interacting with Hive version 0.12.0.
@@ -71,8 +69,9 @@ private[hive] class SparkExecuteStatementOperation(
7169
statement: String,
7270
confOverlay: JMap[String, String])(
7371
hiveContext: HiveContext,
74-
sessionToActivePool: SMap[HiveSession, String]) extends ExecuteStatementOperation(
75-
parentSession, statement, confOverlay) with Logging {
72+
sessionToActivePool: SMap[HiveSession, String])
73+
extends ExecuteStatementOperation(parentSession, statement, confOverlay) with Logging {
74+
7675
private var result: SchemaRDD = _
7776
private var iter: Iterator[SparkRow] = _
7877
private var dataTypes: Array[DataType] = _
@@ -217,6 +216,7 @@ private[hive] class SparkExecuteStatementOperation(
217216
// Actually do need to catch Throwable as some failures don't inherit from Exception and
218217
// HiveServer will silently swallow them.
219218
case e: Throwable =>
219+
setState(OperationState.ERROR)
220220
logError("Error executing query:",e)
221221
throw new HiveSQLException(e.toString)
222222
}

sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,9 @@ import scala.collection.mutable.{ArrayBuffer, Map => SMap}
2727
import scala.math._
2828

2929
import org.apache.hadoop.hive.conf.HiveConf
30+
import org.apache.hadoop.hive.metastore.api.FieldSchema
3031
import org.apache.hadoop.hive.ql.metadata.Hive
31-
import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
3232
import org.apache.hadoop.hive.ql.session.SessionState
33-
import org.apache.hadoop.hive.metastore.api.FieldSchema
3433
import org.apache.hadoop.hive.shims.ShimLoader
3534
import org.apache.hadoop.security.UserGroupInformation
3635
import org.apache.hive.service.cli._
@@ -39,9 +38,9 @@ import org.apache.hive.service.cli.session.HiveSession
3938

4039
import org.apache.spark.Logging
4140
import org.apache.spark.sql.catalyst.types._
42-
import org.apache.spark.sql.{Row => SparkRow, SchemaRDD}
43-
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
4441
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
42+
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
43+
import org.apache.spark.sql.{SchemaRDD, Row => SparkRow}
4544

4645
/**
4746
* A compatibility layer for interacting with Hive version 0.13.1.
@@ -100,6 +99,7 @@ private[hive] class SparkExecuteStatementOperation(
10099
// Actually do need to catch Throwable as some failures don't inherit from Exception and
101100
// HiveServer will silently swallow them.
102101
case e: Throwable =>
102+
setState(OperationState.ERROR)
103103
logError("Error executing query:",e)
104104
throw new HiveSQLException(e.toString)
105105
}
@@ -191,14 +191,12 @@ private[hive] class SparkExecuteStatementOperation(
191191
try {
192192
sqlOperationConf.verifyAndSet(confEntry.getKey, confEntry.getValue)
193193
}
194-
catch {
195-
case e: IllegalArgumentException => {
196-
throw new HiveSQLException("Error applying statement specific settings", e)
197-
}
194+
catch { case e: IllegalArgumentException =>
195+
throw new HiveSQLException("Error applying statement specific settings", e)
198196
}
199197
}
200198
}
201-
return sqlOperationConf
199+
sqlOperationConf
202200
}
203201

204202
def run(): Unit = {
@@ -216,7 +214,7 @@ private[hive] class SparkExecuteStatementOperation(
216214
val currentUGI: UserGroupInformation = ShimLoader.getHadoopShims.getUGIForConf(opConfig)
217215

218216
val backgroundOperation: Runnable = new Runnable {
219-
def run {
217+
def run() {
220218
val doAsAction: PrivilegedExceptionAction[AnyRef] =
221219
new PrivilegedExceptionAction[AnyRef] {
222220
def run: AnyRef = {
@@ -225,23 +223,19 @@ private[hive] class SparkExecuteStatementOperation(
225223
try {
226224
runInternal(statement)
227225
}
228-
catch {
229-
case e: HiveSQLException => {
230-
setOperationException(e)
231-
logError("Error running hive query: ", e)
232-
}
226+
catch { case e: HiveSQLException =>
227+
setOperationException(e)
228+
logError("Error running hive query: ", e)
233229
}
234-
return null
230+
null
235231
}
236232
}
237233
try {
238234
ShimLoader.getHadoopShims.doAs(currentUGI, doAsAction)
239235
}
240-
catch {
241-
case e: Exception => {
242-
setOperationException(new HiveSQLException(e))
243-
logError("Error running hive query as user : " + currentUGI.getShortUserName, e)
244-
}
236+
catch { case e: Exception =>
237+
setOperationException(new HiveSQLException(e))
238+
logError("Error running hive query as user : " + currentUGI.getShortUserName, e)
245239
}
246240
setState(OperationState.FINISHED)
247241
}

0 commit comments

Comments
 (0)