diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
index 7258978e3bad..d22415709860 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
@@ -39,7 +39,6 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage(""
private val listener = parent.listener
private val startTime = Calendar.getInstance().getTime()
- private val emptyCell = "-"
/** Render the page */
def render(request: HttpServletRequest): Seq[Node] = {
@@ -139,33 +138,60 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage(""
/** Generate stats of batch sessions of the thrift server program */
private def generateSessionStatsTable(request: HttpServletRequest): Seq[Node] = {
- val sessionList = listener.getSessionList
- val numBatches = sessionList.size
- val table = if (numBatches > 0) {
- val dataRows = sessionList.sortBy(_.startTimestamp).reverse
- val headerRow = Seq("User", "IP", "Session ID", "Start Time", "Finish Time", "Duration",
- "Total Execute")
- def generateDataRow(session: SessionInfo): Seq[Node] = {
- val sessionLink = "%s/%s/session/?id=%s".format(
- UIUtils.prependBaseUri(request, parent.basePath), parent.prefix, session.sessionId)
-
- | {session.userName} |
- {session.ip} |
- {session.sessionId} |
- {formatDate(session.startTimestamp)} |
- {if (session.finishTimestamp > 0) formatDate(session.finishTimestamp)} |
-
- {formatDurationOption(Some(session.totalTime))} |
- {session.totalExecution.toString} |
-
+ val numSessions = listener.getSessionList.size
+ val table = if (numSessions > 0) {
+
+ val sessionTableTag = "sessionstat"
+
+ val parameterOtherTable = request.getParameterMap().asScala
+ .filterNot(_._1.startsWith(sessionTableTag))
+ .map { case (name, vals) =>
+ name + "=" + vals(0)
+ }
+
+ val parameterSessionTablePage = request.getParameter(s"$sessionTableTag.page")
+ val parameterSessionTableSortColumn = request.getParameter(s"$sessionTableTag.sort")
+ val parameterSessionTableSortDesc = request.getParameter(s"$sessionTableTag.desc")
+ val parameterSessionPageSize = request.getParameter(s"$sessionTableTag.pageSize")
+
+ val sessionTablePage = Option(parameterSessionTablePage).map(_.toInt).getOrElse(1)
+ val sessionTableSortColumn = Option(parameterSessionTableSortColumn).map { sortColumn =>
+ UIUtils.decodeURLParameter(sortColumn)
+ }.getOrElse("Start Time")
+ val sessionTableSortDesc = Option(parameterSessionTableSortDesc).map(_.toBoolean).getOrElse(
+ // New session should be shown above old session by default.
+ (sessionTableSortColumn == "Start Time")
+ )
+ val sessionTablePageSize = Option(parameterSessionPageSize).map(_.toInt).getOrElse(100)
+
+ try {
+ Some(new SessionStatsPagedTable(
+ request,
+ parent,
+ listener.getSessionList,
+ "sqlserver",
+ UIUtils.prependBaseUri(request, parent.basePath),
+ parameterOtherTable,
+ sessionTableTag,
+ pageSize = sessionTablePageSize,
+ sortColumn = sessionTableSortColumn,
+ desc = sessionTableSortDesc
+ ).table(sessionTablePage))
+ } catch {
+ case e@(_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
+ Some(
+
Error while rendering job table:
+
+ {Utils.exceptionString(e)}
+
+
)
}
- Some(UIUtils.listingTable(headerRow, generateDataRow, dataRows, true, None, Seq(null), false))
} else {
None
}
val content =
- Session Statistics ({numBatches})
++
+ Session Statistics ({numSessions})
++
{table.getOrElse("No statistics have been generated yet.")}
@@ -174,13 +200,6 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage(""
content
}
-
- /**
- * Returns a human-readable string representing a duration such as "5 second 35 ms"
- */
- private def formatDurationOption(msOption: Option[Long]): String = {
- msOption.map(formatDurationVerbose).getOrElse(emptyCell)
- }
}
private[ui] class SqlStatsPagedTable(
@@ -370,6 +389,103 @@ private[ui] class SqlStatsPagedTable(
"%s/jobs/job/?id=%s".format(UIUtils.prependBaseUri(request, parent.basePath), jobId)
}
+private[ui] class SessionStatsPagedTable(
+ request: HttpServletRequest,
+ parent: ThriftServerTab,
+ data: Seq[SessionInfo],
+ subPath: String,
+ basePath: String,
+ parameterOtherTable: Iterable[String],
+ sessionStatsTableTag: String,
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean) extends PagedTable[SessionInfo] {
+
+ override val dataSource = new SessionStatsTableDataSource(data, pageSize, sortColumn, desc)
+
+ private val parameterPath = s"$basePath/$subPath/?${parameterOtherTable.mkString("&")}"
+
+ override def tableId: String = sessionStatsTableTag
+
+ override def tableCssClass: String =
+ "table table-bordered table-condensed table-striped " +
+ "table-head-clickable table-cell-width-limited"
+
+ override def pageLink(page: Int): String = {
+ val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
+ parameterPath +
+ s"&$pageNumberFormField=$page" +
+ s"&$sessionStatsTableTag.sort=$encodedSortColumn" +
+ s"&$sessionStatsTableTag.desc=$desc" +
+ s"&$pageSizeFormField=$pageSize"
+ }
+
+ override def pageSizeFormField: String = s"$sessionStatsTableTag.pageSize"
+
+ override def pageNumberFormField: String = s"$sessionStatsTableTag.page"
+
+ override def goButtonFormPath: String = {
+ val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
+ s"$parameterPath&$sessionStatsTableTag.sort=$encodedSortColumn&$sessionStatsTableTag.desc=$desc"
+ }
+
+ override def headers: Seq[Node] = {
+ val sessionTableHeaders =
+ Seq("User", "IP", "Session ID", "Start Time", "Finish Time", "Duration", "Total Execute")
+
+ val colWidthAttr = s"${100.toDouble / sessionTableHeaders.size}%"
+
+ val headerRow: Seq[Node] = {
+ sessionTableHeaders.map { header =>
+ if (header == sortColumn) {
+ val headerLink = Unparsed(
+ parameterPath +
+ s"&$sessionStatsTableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" +
+ s"&$sessionStatsTableTag.desc=${!desc}" +
+ s"&$sessionStatsTableTag.pageSize=$pageSize" +
+ s"#$sessionStatsTableTag")
+ val arrow = if (desc) "▾" else "▴" // UP or DOWN
+
+ |
+
+ {header} {Unparsed(arrow)}
+
+ |
+ } else {
+ val headerLink = Unparsed(
+ parameterPath +
+ s"&$sessionStatsTableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" +
+ s"&$sessionStatsTableTag.pageSize=$pageSize" +
+ s"#$sessionStatsTableTag")
+
+
+
+ {header}
+
+ |
+ }
+ }
+ }
+
+ {headerRow}
+
+ }
+
+ override def row(session: SessionInfo): Seq[Node] = {
+ val sessionLink = "%s/%s/session/?id=%s".format(
+ UIUtils.prependBaseUri(request, parent.basePath), parent.prefix, session.sessionId)
+
+ | {session.userName} |
+ {session.ip} |
+ {session.sessionId} |
+ {formatDate(session.startTimestamp)} |
+ {if (session.finishTimestamp > 0) formatDate(session.finishTimestamp)} |
+ {formatDurationVerbose(session.totalTime)} |
+ {session.totalExecution.toString} |
+
+ }
+}
+
private[ui] class SqlStatsTableRow(
val jobId: Seq[String],
val duration: Long,
@@ -434,3 +550,44 @@ private[ui] class SqlStatsPagedTable(
}
}
+
+ private[ui] class SessionStatsTableDataSource(
+ info: Seq[SessionInfo],
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean) extends PagedDataSource[SessionInfo](pageSize) {
+
+ // Sorting SessionInfo data
+ private val data = info.sorted(ordering(sortColumn, desc))
+
+ private var _slicedStartTime: Set[Long] = null
+
+ override def dataSize: Int = data.size
+
+ override def sliceData(from: Int, to: Int): Seq[SessionInfo] = {
+ val r = data.slice(from, to)
+ _slicedStartTime = r.map(_.startTimestamp).toSet
+ r
+ }
+
+ /**
+ * Return Ordering according to sortColumn and desc.
+ */
+ private def ordering(sortColumn: String, desc: Boolean): Ordering[SessionInfo] = {
+ val ordering: Ordering[SessionInfo] = sortColumn match {
+ case "User" => Ordering.by(_.userName)
+ case "IP" => Ordering.by(_.ip)
+ case "Session ID" => Ordering.by(_.sessionId)
+ case "Start Time" => Ordering by (_.startTimestamp)
+ case "Finish Time" => Ordering.by(_.finishTimestamp)
+ case "Duration" => Ordering.by(_.totalTime)
+ case "Total Execute" => Ordering.by(_.totalExecution)
+ case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
+ }
+ if (desc) {
+ ordering.reverse
+ } else {
+ ordering
+ }
+ }
+ }