Skip to content

Commit cb10771

Browse files
allenma authored and GitHub Enterprise committed
[CARMEL-6490] Lots of unfinished state sql in the query log (#1199)
* [CARMEL-6490] Lots of unfinished state sql in the query log * Fix ut
1 parent 7d73036 commit cb10771

File tree

5 files changed

+82
-23
lines changed

5 files changed

+82
-23
lines changed

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkDownloadDataOperation.scala

Lines changed: 27 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -47,6 +47,7 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
4747
import org.apache.spark.sql.execution.datasources.LogicalRelation
4848
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
4949
import org.apache.spark.sql.execution.ui.StatementStart
50+
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.ExecutionState
5051
import org.apache.spark.sql.hive.thriftserver.errors.QueryLevelRestrictionErrors
5152
import org.apache.spark.sql.hive.thriftserver.ql.{QueryLogExtInfo, QueryLogObjectList}
5253
import org.apache.spark.sql.hive.thriftserver.ui.SparkListenerThriftServerQueryExit
@@ -135,6 +136,8 @@ private[hive] class SparkDownloadDataOperation(
135136
private var schemaStr: String = _
136137
private var totalDataSize: Long = 0
137138

139+
@volatile private var failDetails: String = ""
140+
138141
override def close(): Unit = {
139142
// CARMEL-4662 Fix Download query state is incorrect.
140143
if (getStatus.getState eq OperationState.FINISHED) {
@@ -144,12 +147,25 @@ private[hive] class SparkDownloadDataOperation(
144147
SparkListenerThriftServerQueryExit(
145148
statementId,
146149
QueryLogObjectList(Option(result).map(_.queryExecution)),
147-
QueryLogExtInfo(false, totalDataSize)))
150+
QueryLogExtInfo(
151+
finalQueryStateId,
152+
failDetails,
153+
System.currentTimeMillis(),
154+
false, totalDataSize)))
148155
logInfo(s"CLOSING $statementId")
149156
cleanup(OperationState.CLOSED)
150157
sqlContext.sparkContext.clearJobGroup()
151158
}
152159

160+
private def finalQueryStateId: Int = {
161+
getStatus.getState match {
162+
case OperationState.ERROR => ExecutionState.FAILED.id
163+
case OperationState.FINISHED => ExecutionState.FINISHED.id
164+
case OperationState.CANCELED => ExecutionState.CANCELED.id
165+
case _ => ExecutionState.FINISHED.id
166+
}
167+
}
168+
153169
override def runInternal(): Unit = {
154170
setState(OperationState.PENDING)
155171
setHasResultSet(true)
@@ -301,8 +317,9 @@ private[hive] class SparkDownloadDataOperation(
301317
case NonFatal(e) =>
302318
logQueryError(s"Error executing query [$statementId]", e)
303319
setState(OperationState.ERROR)
320+
failDetails = Utils.findFirstCause(e).toString
304321
HiveThriftServer2.eventManager.onStatementError(
305-
statementId, Utils.findFirstCause(e).toString, Utils.exceptionString(e))
322+
statementId, failDetails, Utils.exceptionString(e))
306323
val exception = new HiveSQLException(e)
307324
setOperationException(exception)
308325
} finally {
@@ -592,16 +609,20 @@ private[hive] class SparkDownloadDataOperation(
592609
}
593610

594611
override def cancel(): Unit = {
612+
failDetails = s"Canceling operation, stack trace:\n" +
613+
Thread.currentThread().getStackTrace.mkString("\n")
595614
if (statementId != null) {
596-
HiveThriftServer2.eventManager.onStatementCanceled(statementId,
597-
s"Canceling operation, stack trace:\n"
598-
+ Thread.currentThread().getStackTrace.mkString("\n"))
615+
HiveThriftServer2.eventManager.onStatementCanceled(statementId, failDetails)
599616
}
600617
HiveThriftServer2.listener.onQueryExit(
601618
SparkListenerThriftServerQueryExit(
602619
statementId,
603620
QueryLogObjectList(Option(result).map(_.queryExecution)),
604-
QueryLogExtInfo(false, totalDataSize)))
621+
QueryLogExtInfo(
622+
ExecutionState.CANCELED.id,
623+
failDetails,
624+
System.currentTimeMillis(),
625+
false, totalDataSize)))
605626
cleanup(OperationState.CANCELED)
606627
}
607628

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala

Lines changed: 24 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -48,6 +48,7 @@ import org.apache.spark.sql.execution.datasources.{InsertIntoDataSourceCommand,
4848
import org.apache.spark.sql.execution.datasources.v2.{AppendDataExecV1, OverwriteByExpressionExecV1, V2TableWriteExec}
4949
import org.apache.spark.sql.execution.metric.SQLMetric
5050
import org.apache.spark.sql.execution.ui.StatementStart
51+
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.ExecutionState
5152
import org.apache.spark.sql.hive.thriftserver.ql.{QueryLogExtInfo, QueryLogObjectList}
5253
import org.apache.spark.sql.hive.thriftserver.server.ResultShareGroup
5354
import org.apache.spark.sql.hive.thriftserver.ui.SparkListenerThriftServerQueryExit
@@ -83,6 +84,8 @@ private[hive] class SparkExecuteStatementOperation(
8384
private var iter: Iterator[SparkRow] = _
8485
private var dataTypes: Array[DataType] = _
8586

87+
@volatile private var failDetails: String = "" // store the query failed reason
88+
8689
private lazy val resultSchema: TableSchema = {
8790
if (result == null || result.schema.isEmpty) {
8891
new TableSchema(Arrays.asList(new FieldSchema("Result", "string", "")))
@@ -371,17 +374,19 @@ private[hive] class SparkExecuteStatementOperation(
371374
e match {
372375
case hiveException: HiveSQLException =>
373376
setErrorStatus(OperationState.ERROR, hiveException)
377+
failDetails = SparkUtils.findFirstCause(e).toString
374378
HiveThriftServer2.eventManager.onStatementError(
375-
statementId, SparkUtils.findFirstCause(e).toString,
379+
statementId, failDetails,
376380
SparkUtils.exceptionString(hiveException))
377381
throw hiveException
378382
case _ =>
379383
val root = ExceptionUtils.getRootCause(e)
380384
val hiveException =
381385
new HiveSQLException("Error running query: " + root.toString, root)
386+
failDetails = SparkUtils.findFirstCause(e).toString
382387
setErrorStatus(OperationState.ERROR, hiveException)
383388
HiveThriftServer2.eventManager.onStatementError(
384-
statementId, SparkUtils.findFirstCause(e).toString,
389+
statementId, failDetails,
385390
SparkUtils.exceptionString(root))
386391
throw hiveException
387392
}
@@ -398,12 +403,25 @@ private[hive] class SparkExecuteStatementOperation(
398403
SparkListenerThriftServerQueryExit(
399404
statementId,
400405
QueryLogObjectList(Option(result).map(_.queryExecution)),
401-
QueryLogExtInfo(isSpill, finalRowsLength)))
406+
QueryLogExtInfo(
407+
finalQueryStateId,
408+
failDetails,
409+
System.currentTimeMillis(),
410+
isSpill, finalRowsLength)))
402411
}
403412
sqlContext.sparkContext.clearJobGroup()
404413
}
405414
}
406415

416+
private def finalQueryStateId: Int = {
417+
getStatus.getState match {
418+
case OperationState.ERROR => ExecutionState.FAILED.id
419+
case OperationState.FINISHED => ExecutionState.FINISHED.id
420+
case OperationState.CANCELED => ExecutionState.CANCELED.id
421+
case _ => ExecutionState.FINISHED.id
422+
}
423+
}
424+
407425
private def withRetry[T](f: => T): T = {
408426
val maxRetry = 1
409427
var retryNum = 0
@@ -594,9 +612,9 @@ private[hive] class SparkExecuteStatementOperation(
594612
// 'InterruptedException', which could be misleading, the query final state
595613
// should be canceled instead of failed. Hence we need guarantee that the
596614
// cancellation event is posted before the query execution is actually canceled.
597-
HiveThriftServer2.eventManager.onStatementCanceled(statementId,
598-
s"Canceling operation, stack trace:\n"
599-
+ Thread.currentThread().getStackTrace.mkString("\n"))
615+
failDetails = "Canceling operation, stack trace:\n" +
616+
Thread.currentThread().getStackTrace.mkString("\n")
617+
HiveThriftServer2.eventManager.onStatementCanceled(statementId, failDetails)
600618
cleanup()
601619
}
602620
}

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ql/QueryLogSink.scala

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -129,9 +129,9 @@ private[thriftserver] class KafkaSink(
129129
execInfo.sessionId,
130130
execInfo.userName,
131131
sdf.format(execInfo.startTimestamp),
132-
String.valueOf(execInfo.finishTimestamp - execInfo.startTimestamp),
133-
String.valueOf(execInfo.state.id),
134-
execInfo.detail,
132+
String.valueOf(extInfo.finishedTime - execInfo.startTimestamp),
133+
String.valueOf(extInfo.stateId),
134+
extInfo.detail,
135135
execInfo.statement,
136136
queue,
137137
String.valueOf(execInfo.currentTasksTime),

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ql/QueryLogger.scala

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -164,4 +164,10 @@ case class QueryLogObject(
164164
other.db.equals(db) && other.table.equals(table)
165165
}
166166

167-
case class QueryLogExtInfo(isSpill: Boolean, rowCount: Long)
167+
case class QueryLogExtInfo(
168+
stateId: Int,
169+
detail: String,
170+
finishedTime: Long,
171+
isSpill: Boolean,
172+
rowCount: Long)
173+

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/QueryLogSuite.scala

Lines changed: 21 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -74,11 +74,15 @@ class QueryLogSuite extends SparkFunSuite with SharedSparkSession {
7474
val sink = new MockQueryLogSink
7575
val ql = new QueryLogger(spark.sqlContext.conf, sink)
7676
ql.logQuery("query1", mockExecutionInfo, null,
77-
QueryLogObjectList(Some(qe)), QueryLogExtInfo(true, -1))
77+
QueryLogObjectList(Some(qe)), QueryLogExtInfo(
78+
3, "", System.currentTimeMillis(),
79+
true, -1))
7880
assert(sink.queryObjList.isEmpty)
7981
val qe2 = sql("explain select col1 from test_table").queryExecution
8082
ql.logQuery("query2", mockExecutionInfo, null,
81-
QueryLogObjectList(Some(qe2)), QueryLogExtInfo(true, -1))
83+
QueryLogObjectList(Some(qe2)), QueryLogExtInfo(
84+
3, "", System.currentTimeMillis(),
85+
true, -1))
8286
assert(sink.queryObjList.isEmpty)
8387
ql.logExceptionQuery("exception", "1", "hdmi-default", "1", mockExecutionInfo)
8488
ql.logRunningQuery(
@@ -99,12 +103,16 @@ class QueryLogSuite extends SparkFunSuite with SharedSparkSession {
99103
val sink = new MockQueryLogSink
100104
val ql = new QueryLogger(spark.sqlContext.conf, sink)
101105
ql.logQuery("query1", mockExecutionInfo, null,
102-
QueryLogObjectList(Some(qe)), QueryLogExtInfo(true, -1))
106+
QueryLogObjectList(Some(qe)), QueryLogExtInfo(
107+
3, "", System.currentTimeMillis(),
108+
true, -1))
103109
assert(sink.queryObjList.size == 1)
104110
assert(sink.queryObjList.map(_.table).toSet.equals(Set("test_table")))
105111
val qe2 = sql("insert into temp select * from test_table").queryExecution
106112
ql.logQuery("query2", mockExecutionInfo, null,
107-
QueryLogObjectList(Some(qe2)), QueryLogExtInfo(true, -1))
113+
QueryLogObjectList(Some(qe2)), QueryLogExtInfo(
114+
3, "", System.currentTimeMillis(),
115+
true, -1))
108116
assert(sink.queryObjList.map(_.table).toSet.equals(Set("test_table", "test_table")))
109117
}
110118
}
@@ -136,7 +144,9 @@ class QueryLogSuite extends SparkFunSuite with SharedSparkSession {
136144
val sink = new MockQueryLogSink
137145
val ql = new QueryLogger(spark.sqlContext.conf, sink)
138146
ql.logQuery("query1", mockExecutionInfo, null,
139-
QueryLogObjectList(Some(qe)), QueryLogExtInfo(true, -1))
147+
QueryLogObjectList(Some(qe)), QueryLogExtInfo(
148+
3, "", System.currentTimeMillis(),
149+
true, -1))
140150
assert(sink.queryObjList.size == 1)
141151
val q1 = QueryLogObject("default", "test_table", true)
142152
assert(sink.queryObjList.contains(q1))
@@ -151,7 +161,9 @@ class QueryLogSuite extends SparkFunSuite with SharedSparkSession {
151161
val sink = new MockQueryLogSink
152162
val ql = new QueryLogger(spark.sqlContext.conf, sink)
153163
ql.logQuery("query1", mockExecutionInfo, null,
154-
QueryLogObjectList(Some(qe)), QueryLogExtInfo(true, -1))
164+
QueryLogObjectList(Some(qe)), QueryLogExtInfo(
165+
3, "", System.currentTimeMillis(),
166+
true, -1))
155167
assert(sink.queryObjList.size == 2)
156168
val q1 = QueryLogObject("default", "test_table", false)
157169
val q2 = QueryLogObject("default", "view1", true)
@@ -168,7 +180,9 @@ class QueryLogSuite extends SparkFunSuite with SharedSparkSession {
168180
val sink = new MockQueryLogSink
169181
val ql = new QueryLogger(spark.sqlContext.conf, sink)
170182
ql.logQuery("query1", mockExecutionInfo, null,
171-
QueryLogObjectList(Some(qe)), QueryLogExtInfo(true, -1))
183+
QueryLogObjectList(Some(qe)), QueryLogExtInfo(
184+
3, "", System.currentTimeMillis(),
185+
true, -1))
172186
assert(sink.queryObjList.size == 2)
173187
val q1 = QueryLogObject("default", "test_table", false)
174188
val q2 = QueryLogObject("default", "view2", true)

0 commit comments

Comments
 (0)