-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-29599][WEBUI] Support pagination for session table in JDBC/ODBC Tab #26253
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
dfa225b
f209cb4
43a1621
c7c9479
ce588e5
59697bc
691eed1
5315c58
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,7 +39,6 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("" | |
|
|
||
| private val listener = parent.listener | ||
| private val startTime = Calendar.getInstance().getTime() | ||
| private val emptyCell = "-" | ||
|
|
||
| /** Render the page */ | ||
| def render(request: HttpServletRequest): Seq[Node] = { | ||
|
|
@@ -139,33 +138,60 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("" | |
|
|
||
| /** Generate stats of batch sessions of the thrift server program */ | ||
| private def generateSessionStatsTable(request: HttpServletRequest): Seq[Node] = { | ||
| val sessionList = listener.getSessionList | ||
| val numBatches = sessionList.size | ||
| val table = if (numBatches > 0) { | ||
| val dataRows = sessionList.sortBy(_.startTimestamp).reverse | ||
| val headerRow = Seq("User", "IP", "Session ID", "Start Time", "Finish Time", "Duration", | ||
| "Total Execute") | ||
| def generateDataRow(session: SessionInfo): Seq[Node] = { | ||
| val sessionLink = "%s/%s/session/?id=%s".format( | ||
| UIUtils.prependBaseUri(request, parent.basePath), parent.prefix, session.sessionId) | ||
| <tr> | ||
| <td> {session.userName} </td> | ||
| <td> {session.ip} </td> | ||
| <td> <a href={sessionLink}> {session.sessionId} </a> </td> | ||
| <td> {formatDate(session.startTimestamp)} </td> | ||
| <td> {if (session.finishTimestamp > 0) formatDate(session.finishTimestamp)} </td> | ||
| <td sorttable_customkey={session.totalTime.toString}> | ||
| {formatDurationOption(Some(session.totalTime))} </td> | ||
| <td> {session.totalExecution.toString} </td> | ||
| </tr> | ||
| val numSessions = listener.getSessionList.size | ||
| val table = if (numSessions > 0) { | ||
|
|
||
| val sessionTableTag = "sessionstat" | ||
|
|
||
| val parameterOtherTable = request.getParameterMap().asScala | ||
| .filterNot(_._1.startsWith(sessionTableTag)) | ||
| .map { case (name, vals) => | ||
| name + "=" + vals(0) | ||
| } | ||
|
|
||
| val parameterSessionTablePage = request.getParameter(s"$sessionTableTag.page") | ||
| val parameterSessionTableSortColumn = request.getParameter(s"$sessionTableTag.sort") | ||
| val parameterSessionTableSortDesc = request.getParameter(s"$sessionTableTag.desc") | ||
| val parameterSessionPageSize = request.getParameter(s"$sessionTableTag.pageSize") | ||
|
|
||
| val sessionTablePage = Option(parameterSessionTablePage).map(_.toInt).getOrElse(1) | ||
| val sessionTableSortColumn = Option(parameterSessionTableSortColumn).map { sortColumn => | ||
| UIUtils.decodeURLParameter(sortColumn) | ||
| }.getOrElse("Start Time") | ||
| val sessionTableSortDesc = Option(parameterSessionTableSortDesc).map(_.toBoolean).getOrElse( | ||
| // Old session should be shown above new session by default. | ||
| !(sessionTableSortColumn == "Start Time") | ||
| ) | ||
| val sessionTablePageSize = Option(parameterSessionPageSize).map(_.toInt).getOrElse(100) | ||
|
|
||
| try { | ||
| Some(new SessionStatsPagedTable( | ||
| request, | ||
| parent, | ||
| listener.getSessionList, | ||
| "sqlserver", | ||
| UIUtils.prependBaseUri(request, parent.basePath), | ||
| parameterOtherTable, | ||
| sessionTableTag, | ||
| pageSize = sessionTablePageSize, | ||
| sortColumn = sessionTableSortColumn, | ||
| desc = sessionTableSortDesc | ||
| ).table(sessionTablePage)) | ||
| } catch { | ||
| case e@(_: IllegalArgumentException | _: IndexOutOfBoundsException) => | ||
| Some(<div class="alert alert-error"> | ||
| <p>Error while rendering job table:</p> | ||
| <pre> | ||
| {Utils.exceptionString(e)} | ||
| </pre> | ||
| </div>) | ||
| } | ||
| Some(UIUtils.listingTable(headerRow, generateDataRow, dataRows, true, None, Seq(null), false)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Here for the sessions table
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Changed. Not so familiar with UI HTML part, thanks a lot for your suggestion |
||
| } else { | ||
| None | ||
| } | ||
|
|
||
| val content = | ||
| <h5 id="sessionstat">Session Statistics ({numBatches})</h5> ++ | ||
| <h5 id="sessionstat">Session Statistics ({numSessions})</h5> ++ | ||
| <div> | ||
| <ul class="unstyled"> | ||
| {table.getOrElse("No statistics have been generated yet.")} | ||
|
|
@@ -174,21 +200,6 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("" | |
|
|
||
| content | ||
| } | ||
|
|
||
| /** | ||
| * Returns a human-readable string representing a duration such as "5 second 35 ms" | ||
| */ | ||
| private def formatDurationOption(msOption: Option[Long]): String = { | ||
| msOption.map(formatDurationVerbose).getOrElse(emptyCell) | ||
| } | ||
|
|
||
| /** Generate HTML table from string data */ | ||
| private def listingTable(headers: Seq[String], data: Seq[Seq[String]]) = { | ||
| def generateDataRow(data: Seq[String]): Seq[Node] = { | ||
| <tr> {data.map(d => <td>{d}</td>)} </tr> | ||
| } | ||
| UIUtils.listingTable(headers, generateDataRow, data, fixedWidth = true) | ||
| } | ||
| } | ||
|
|
||
| private[ui] class SqlStatsPagedTable( | ||
|
|
@@ -377,6 +388,102 @@ private[ui] class SqlStatsPagedTable( | |
| "%s/jobs/job/?id=%s".format(UIUtils.prependBaseUri(request, parent.basePath), jobId) | ||
| } | ||
|
|
||
| private[ui] class SessionStatsPagedTable( | ||
| request: HttpServletRequest, | ||
| parent: ThriftServerTab, | ||
| data: Seq[SessionInfo], | ||
| subPath: String, | ||
| basePath: String, | ||
| parameterOtherTable: Iterable[String], | ||
| sessionStatsTableTag: String, | ||
| pageSize: Int, | ||
| sortColumn: String, | ||
| desc: Boolean) extends PagedTable[SessionInfo] { | ||
|
|
||
| override val dataSource = new SessionStatsTableDataSource(data, pageSize, sortColumn, desc) | ||
|
|
||
| private val parameterPath = s"$basePath/$subPath/?${parameterOtherTable.mkString("&")}" | ||
|
|
||
| override def tableId: String = sessionStatsTableTag | ||
|
|
||
| override def tableCssClass: String = | ||
| "table table-bordered table-condensed table-striped " + | ||
| "table-head-clickable table-cell-width-limited" | ||
|
|
||
| override def pageLink(page: Int): String = { | ||
| val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name()) | ||
| parameterPath + | ||
| s"&$pageNumberFormField=$page" + | ||
| s"&$sessionStatsTableTag.sort=$encodedSortColumn" + | ||
| s"&$sessionStatsTableTag.desc=$desc" + | ||
| s"&$pageSizeFormField=$pageSize" | ||
| } | ||
|
|
||
| override def pageSizeFormField: String = s"$sessionStatsTableTag.pageSize" | ||
|
|
||
| override def pageNumberFormField: String = s"$sessionStatsTableTag.page" | ||
|
|
||
| override def goButtonFormPath: String = { | ||
| val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name()) | ||
| s"$parameterPath&$sessionStatsTableTag.sort=$encodedSortColumn&$sessionStatsTableTag.desc=$desc" | ||
| } | ||
|
|
||
| override def headers: Seq[Node] = { | ||
| val sessionTableHeaders = | ||
| Seq("User", "IP", "Session ID", "Start Time", "Finish Time", "Duration", "Total Execute") | ||
|
|
||
| val headerRow: Seq[Node] = { | ||
| sessionTableHeaders.map { case header => | ||
| if (header == sortColumn) { | ||
| val headerLink = Unparsed( | ||
| parameterPath + | ||
| s"&$sessionStatsTableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" + | ||
| s"&$sessionStatsTableTag.desc=${!desc}" + | ||
| s"&$sessionStatsTableTag.pageSize=$pageSize" + | ||
| s"#$sessionStatsTableTag") | ||
| val arrow = if (desc) "▾" else "▴" // UP or DOWN | ||
|
|
||
| <th> | ||
| <a href={headerLink}> | ||
| {header} {Unparsed(arrow)} | ||
| </a> | ||
| </th> | ||
| } else { | ||
| val headerLink = Unparsed( | ||
| parameterPath + | ||
| s"&$sessionStatsTableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" + | ||
| s"&$sessionStatsTableTag.pageSize=$pageSize" + | ||
| s"#$sessionStatsTableTag") | ||
|
|
||
| <th> | ||
| <a href={headerLink}> | ||
| {header} | ||
| </a> | ||
| </th> | ||
| } | ||
| } | ||
| } | ||
| <thead> | ||
| {headerRow} | ||
| </thead> | ||
| } | ||
|
|
||
| override def row(session: SessionInfo): Seq[Node] = { | ||
| val sessionLink = "%s/%s/session/?id=%s".format( | ||
| UIUtils.prependBaseUri(request, parent.basePath), parent.prefix, session.sessionId) | ||
| <tr> | ||
| <td> {session.userName} </td> | ||
| <td> {session.ip} </td> | ||
| <td> <a href={sessionLink}> {session.sessionId} </a> </td> | ||
| <td> {formatDate(session.startTimestamp)} </td> | ||
| <td> {if (session.finishTimestamp > 0) formatDate(session.finishTimestamp)} </td> | ||
| <td sorttable_customkey={session.totalTime.toString}> | ||
|
||
| {formatDurationVerbose(session.totalTime)} </td> | ||
| <td> {session.totalExecution.toString} </td> | ||
| </tr> | ||
| } | ||
| } | ||
|
|
||
| private[ui] class SqlStatsTableRow( | ||
| val jobId: Seq[String], | ||
| val duration: Long, | ||
|
|
@@ -400,7 +507,6 @@ private[ui] class SqlStatsPagedTable( | |
|
|
||
| override def sliceData(from: Int, to: Int): Seq[SqlStatsTableRow] = { | ||
| val r = data.slice(from, to) | ||
| r.map(x => x) | ||
| _slicedStartTime = r.map(_.executionInfo.startTimestamp).toSet | ||
| r | ||
| } | ||
|
|
@@ -442,3 +548,44 @@ private[ui] class SqlStatsPagedTable( | |
| } | ||
|
|
||
| } | ||
|
|
||
| private[ui] class SessionStatsTableDataSource( | ||
| info: Seq[SessionInfo], | ||
| pageSize: Int, | ||
| sortColumn: String, | ||
| desc: Boolean) extends PagedDataSource[SessionInfo](pageSize) { | ||
|
|
||
| // Sorting SessionInfo data | ||
| private val data = info.sorted(ordering(sortColumn, desc)) | ||
|
|
||
| private var _slicedStartTime: Set[Long] = null | ||
|
|
||
| override def dataSize: Int = data.size | ||
|
|
||
| override def sliceData(from: Int, to: Int): Seq[SessionInfo] = { | ||
| val r = data.slice(from, to) | ||
| _slicedStartTime = r.map(_.startTimestamp).toSet | ||
| r | ||
| } | ||
|
|
||
| /** | ||
| * Return Ordering according to sortColumn and desc. | ||
| */ | ||
| private def ordering(sortColumn: String, desc: Boolean): Ordering[SessionInfo] = { | ||
| val ordering: Ordering[SessionInfo] = sortColumn match { | ||
| case "User" => Ordering.by(_.userName) | ||
| case "IP" => Ordering.by(_.ip) | ||
| case "Session ID" => Ordering.by(_.sessionId) | ||
| case "Start Time" => Ordering by (_.startTimestamp) | ||
| case "Finish Time" => Ordering.by(_.finishTimestamp) | ||
| case "Duration" => Ordering.by(_.totalTime) | ||
| case "Total Execute" => Ordering.by(_.totalExecution) | ||
| case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn") | ||
| } | ||
| if (desc) { | ||
| ordering.reverse | ||
| } else { | ||
| ordering | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@AngersZhuuuu By default new session should come above right? (Screenshot from master branch)

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated, I could have been misled by the modified thriftserver I was using