Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
/** Page showing statistics and task list for a given stage */
private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
private val listener = parent.listener

private val pageSize = 1000

def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
val stageId = request.getParameter("id").toInt
val stageAttemptId = request.getParameter("attempt").toInt
val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt
val stageDataOption = listener.stageIdToData.get((stageId, stageAttemptId))

if (stageDataOption.isEmpty || stageDataOption.get.taskData.isEmpty) {
Expand All @@ -52,6 +54,12 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {

val stageData = stageDataOption.get
val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)

val requestedFirst = (requestedPage - 1) * pageSize
val pageCount = tasks.size / pageSize + (if (tasks.size % pageSize > 0) 1 else 0)
val actualFirst = if (requestedFirst < tasks.size) requestedFirst else 0
val actualPage = (actualFirst / pageSize) + 1
val showTasks = tasks.slice(actualFirst, Math.min(actualFirst + pageSize, tasks.size))

val numCompleted = tasks.count(_.taskInfo.finished)
val accumulables = listener.stageIdToData((stageId, stageAttemptId)).accumulables
Expand Down Expand Up @@ -186,7 +194,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
unzipped._1,
taskRow(hasAccumulators, hasInput, hasOutput, hasShuffleRead, hasShuffleWrite,
hasBytesSpilled),
tasks,
showTasks,
headerClasses = unzipped._2)
// Excludes tasks which failed and have incomplete metrics
val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.taskMetrics.isDefined)
Expand Down Expand Up @@ -345,7 +353,20 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
<h4>Aggregated Metrics by Executor</h4> ++ executorTable.toNodeSeq ++
maybeAccumulableTable ++
<h4>Tasks</h4> ++ taskTable
<h4>Tasks
<span style="float: right">
{if (actualPage > 1) {
<a href={"?id=" + stageId +
"&attempt=" + stageAttemptId +
"&page=" + (actualPage - 1)}>&lt;
</a>}}
{if (actualPage < pageCount) {
<a href={"?id=" + stageId +
"&attempt=" + stageAttemptId +
"&page=" + (actualPage + 1)}>&gt;
</a>}}
</span>
</h4> ++ taskTable

UIUtils.headerSparkPage("Details for Stage %d".format(stageId), content, parent)
}
Expand Down