diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index d1de9f0379924..b4d1d0d58aad6 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -137,7 +137,7 @@ object HiveThriftServer2 extends Logging { } private[thriftserver] object ExecutionState extends Enumeration { - val STARTED, COMPILED, FAILED, FINISHED = Value + val STARTED, COMPILED, FAILED, FINISHED, CLOSED = Value type ExecutionState = Value } @@ -147,16 +147,17 @@ object HiveThriftServer2 extends Logging { val startTimestamp: Long, val userName: String) { var finishTimestamp: Long = 0L + var closeTimestamp: Long = 0L var executePlan: String = "" var detail: String = "" var state: ExecutionState.Value = ExecutionState.STARTED val jobId: ArrayBuffer[String] = ArrayBuffer[String]() var groupId: String = "" - def totalTime: Long = { - if (finishTimestamp == 0L) { + def totalTime(endTime: Long): Long = { + if (endTime == 0L) { System.currentTimeMillis - startTimestamp } else { - finishTimestamp - startTimestamp + endTime - startTimestamp } } } @@ -254,6 +255,11 @@ object HiveThriftServer2 extends Logging { trimExecutionIfNecessary() } + def onOperationClosed(id: String): Unit = synchronized { + executionList(id).closeTimestamp = System.currentTimeMillis + executionList(id).state = ExecutionState.CLOSED + } + private def trimExecutionIfNecessary() = { if (executionList.size > retainedStatements) { val toRemove = math.max(retainedStatements / 10, 1) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 820f76db6db3b..2f011c25fe2ce 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -70,11 +70,12 @@ private[hive] class SparkExecuteStatementOperation( } } - def close(): Unit = { + override def close(): Unit = { // RDDs will be cleaned automatically upon garbage collection. logDebug(s"CLOSING $statementId") cleanup(OperationState.CLOSED) sqlContext.sparkContext.clearJobGroup() + HiveThriftServer2.listener.onOperationClosed(statementId) } def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala index 99ba968e1ae83..89faff2f6f913 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala @@ -58,8 +58,15 @@ private[hive] class SparkGetColumnsOperation( val catalog: SessionCatalog = sqlContext.sessionState.catalog + private var statementId: String = _ + + override def close(): Unit = { + super.close() + HiveThriftServer2.listener.onOperationClosed(statementId) + } + override def runInternal(): Unit = { - val statementId = UUID.randomUUID().toString + statementId = UUID.randomUUID().toString // Do not change cmdStr. It's used for Hive auditing and authorization. val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName, tablePattern : $tableName" val logMsg = s"Listing columns '$cmdStr, columnName : $columnName'" diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala index 3ecbbd036c87f..87ef154bcc8ab 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala @@ -45,8 +45,15 @@ private[hive] class SparkGetSchemasOperation( schemaName: String) extends GetSchemasOperation(parentSession, catalogName, schemaName) with Logging { + private var statementId: String = _ + + override def close(): Unit = { + super.close() + HiveThriftServer2.listener.onOperationClosed(statementId) + } + override def runInternal(): Unit = { - val statementId = UUID.randomUUID().toString + statementId = UUID.randomUUID().toString // Do not change cmdStr. It's used for Hive auditing and authorization. val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName" val logMsg = s"Listing databases '$cmdStr'" diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala index 878683692fb60..952de42083c42 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala @@ -55,8 +55,15 @@ private[hive] class SparkGetTablesOperation( extends GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes) with Logging{ + private var statementId: String = _ + + override def close(): Unit = { + super.close() + HiveThriftServer2.listener.onOperationClosed(statementId) + } + override def runInternal(): Unit = { - val statementId = UUID.randomUUID().toString + statementId = UUID.randomUUID().toString // Do not change cmdStr. It's used for Hive auditing and authorization. val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName" val tableTypesStr = if (tableTypes == null) "null" else tableTypes.asScala.mkString(",") diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala index 27d2c997ca3e8..1747b5bafc934 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala @@ -70,8 +70,8 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("" private def generateSQLStatsTable(request: HttpServletRequest): Seq[Node] = { val numStatement = listener.getExecutionList.size val table = if (numStatement > 0) { - val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish Time", "Duration", - "Statement", "State", "Detail") + val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish Time", "Close Time", + "Execution Time", "Duration", "Statement", "State", "Detail") val dataRows = listener.getExecutionList.sortBy(_.startTimestamp).reverse def generateDataRow(info: ExecutionInfo): Seq[Node] = { @@ -90,7 +90,9 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage(""