diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index a0e8bd403a41d..4745dd141232c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -25,7 +25,8 @@ private[spark] case class ApplicationHistoryInfo( startTime: Long, endTime: Long, lastUpdated: Long, - sparkUser: String) + sparkUser: String, + incomplete: Boolean = false) private[spark] abstract class ApplicationHistoryProvider { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index a8c9ac072449f..c8c081602226d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -115,10 +115,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs)) try { val logStatus = fs.listStatus(new Path(logDir)) - val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]() - val logInfos = logDirs.filter { - dir => fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE)) - } + val logInfos = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]() val currentApps = Map[String, ApplicationHistoryInfo]( appList.map(app => (app.id -> app)):_*) @@ -140,7 +137,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis } } - appList = newApps.sortBy { info => -info.endTime } + appList = newApps.sortBy { info => -info.startTime } } catch { case t: Throwable => logError("Exception in checking for event log updates", t) } @@ -182,7 +179,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis appListener.startTime, appListener.endTime, getModificationTime(logDir), - appListener.sparkUser) + appListener.sparkUser, + !fs.isFile(new Path(logDir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))) if (ui != null) { val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index a958c837c2ff6..9cf3d4b112cad 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -31,7 +31,11 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt val requestedFirst = (requestedPage - 1) * pageSize - val allApps = parent.getApplicationList() + val requestedIncludeIncomplete = Option(request.getParameter("includeIncomplete")).getOrElse("false").toBoolean + + val allApps = parent.getApplicationList().filter( app => + requestedIncludeIncomplete || !app.incomplete + ) val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0 val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size)) @@ -47,13 +51,20 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { + + {if (requestedIncludeIncomplete) {"Hide incomplete apps" } else { "Include incomplete apps" }} + { if (allApps.size > 0) {

Showing {actualFirst + 1}-{last + 1} of {allApps.size} - {if (actualPage > 1) <} - {if (actualPage < pageCount) >} + {if (actualPage > 1) + < + } + {if (actualPage < pageCount) + > + }

++ appTable @@ -77,8 +88,8 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { private def appRow(info: ApplicationHistoryInfo): Seq[Node] = { val uiAddress = "/history/" + info.id val startTime = UIUtils.formatDate(info.startTime) - val endTime = UIUtils.formatDate(info.endTime) - val duration = UIUtils.formatDuration(info.endTime - info.startTime) + val endTime = if(!info.incomplete) UIUtils.formatDate(info.endTime) else "-" + val duration = if(!info.incomplete) UIUtils.formatDuration(info.endTime - info.startTime) else "-" val lastUpdated = UIUtils.formatDate(info.lastUpdated) {info.name} @@ -89,4 +100,11 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { {lastUpdated} } + + private def createPageLink(linkPage: Int, includeIncomplete: Boolean): String = { + "/?" + Array( + "page=" + linkPage, + "includeIncomplete=" + includeIncomplete + ).mkString("&") + } }