core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -368,7 +368,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
     val peakExecutionMemory = validTasks.map { case TaskUIData(info, _, _) =>
       info.accumulables
         .find { acc => acc.name == InternalAccumulator.PEAK_EXECUTION_MEMORY }
-        .map { acc => acc.value.toLong }
+        .map { acc => acc.update.getOrElse("0").toLong }
         .getOrElse(0L)
         .toDouble
     }
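A note on the one-line fix above: for internal accumulators, `value` holds the running total the driver has accumulated across all finished tasks, while `update` carries the individual task's contribution. A minimal sketch of the difference, with a hypothetical `Acc` standing in for `AccumulableInfo`:

```scala
// Hypothetical stand-in for AccumulableInfo: `update` is the per-task
// delta, `value` the driver-side running total after the task finished.
case class Acc(update: Option[String], value: String)

object UpdateVsValue extends App {
  // Two tasks, each peaking at 10 bytes of execution memory:
  val accs = Seq(
    Acc(update = Some("10"), value = "10"),  // after task 1: total = 10
    Acc(update = Some("10"), value = "20"))  // after task 2: total = 20

  val fromValue  = accs.map(_.value.toLong)                  // Seq(10, 20): skewed by accumulation
  val fromUpdate = accs.map(_.update.getOrElse("0").toLong)  // Seq(10, 10): true per-task peaks

  println(s"value-based: $fromValue, update-based: $fromUpdate")
}
```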
core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala (29 changes: 22 additions & 7 deletions)
@@ -23,7 +23,7 @@ import scala.xml.Node
 
 import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite, Success}
+import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.ui.jobs.{JobProgressListener, StagePage, StagesTab}
@@ -47,6 +47,14 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
     assert(html3.contains(targetString))
   }
 
+  test("SPARK-10543: peak execution memory should be per-task rather than cumulative") {
+    val unsafeConf = "spark.sql.unsafe.enabled"
+    val conf = new SparkConf(false).set(unsafeConf, "true")
+    val html = renderStagePage(conf).toString().toLowerCase
+    // verify the min/25th/median/75th/max cells show the per-task value, not cumulative totals
+    assert(html.contains("<td>10.0 b</td>" * 5))
+  }
+
   /**
    * Render a stage page started with the given conf and return the HTML.
    * This also runs a dummy stage to populate the page with useful content.
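Why the new test asserts on `"<td>10.0 b</td>" * 5`: the stage page's summary table renders five quantile cells (min, 25th percentile, median, 75th percentile, max) for the peak execution memory row. With both simulated tasks reporting an `update` of 10 bytes, every quantile is 10.0 B; under the old `value`-based code the samples would have been 10 and 20 bytes and the cells would differ. A rough sketch of that arithmetic (a nearest-rank stand-in, not Spark's actual `Distribution` class):

```scala
// Rough nearest-rank quantile sketch, standing in for Spark's util.Distribution,
// to show why all five summary cells are identical after the fix.
object QuantileSketch extends App {
  val perTaskPeaks = Seq(10.0, 10.0)  // update-based samples (new code)
  val cumulative   = Seq(10.0, 20.0)  // value-based samples (old, buggy code)

  def quantiles(xs: Seq[Double]): Seq[Double] = {
    val sorted = xs.sorted
    Seq(0.0, 0.25, 0.5, 0.75, 1.0).map { q =>
      sorted(math.min((q * sorted.size).toInt, sorted.size - 1))
    }
  }

  println(quantiles(perTaskPeaks))  // List(10.0, 10.0, 10.0, 10.0, 10.0) -> "<td>10.0 b</td>" * 5
  println(quantiles(cumulative))    // List(10.0, 10.0, 20.0, 20.0, 20.0) -> cells would differ
}
```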
@@ -67,12 +75,19 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
 
     // Simulate a stage in job progress listener
     val stageInfo = new StageInfo(0, 0, "dummy", 1, Seq.empty, Seq.empty, "details")
-    val taskInfo = new TaskInfo(0, 0, 0, 0, "0", "localhost", TaskLocality.ANY, false)
-    jobListener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
-    jobListener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
-    taskInfo.markSuccessful()
-    jobListener.onTaskEnd(
-      SparkListenerTaskEnd(0, 0, "result", Success, taskInfo, TaskMetrics.empty))
+    // Simulate two tasks to test PEAK_EXECUTION_MEMORY correctness
+    (1 to 2).foreach {
+      taskId =>
+        val taskInfo = new TaskInfo(taskId, taskId, 0, 0, "0", "localhost", TaskLocality.ANY, false)
+        val peakExecutionMemory = 10
+        taskInfo.accumulables += new AccumulableInfo(0, InternalAccumulator.PEAK_EXECUTION_MEMORY,
+          Some(peakExecutionMemory.toString), (peakExecutionMemory * taskId).toString, true)
+        jobListener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
+        jobListener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
+        taskInfo.markSuccessful()
+        jobListener.onTaskEnd(
+          SparkListenerTaskEnd(0, 0, "result", Success, taskInfo, TaskMetrics.empty))
+    }
     jobListener.onStageCompleted(SparkListenerStageCompleted(stageInfo))
     page.render(request)
   }
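Since the `AccumulableInfo` constructor call in the loop above is entirely positional, here is the same call with the argument roles spelled out in comments (a commented excerpt of the diff, not standalone code; roles assumed from the constructor as of this branch):

```scala
// Commented restatement of the call in the hunk above (not standalone code).
// Argument roles assumed from the AccumulableInfo constructor at the time of this PR:
taskInfo.accumulables += new AccumulableInfo(
  0,                                          // accumulator id
  InternalAccumulator.PEAK_EXECUTION_MEMORY,  // name that StagePage matches on
  Some(peakExecutionMemory.toString),         // update: this task's delta (always "10")
  (peakExecutionMemory * taskId).toString,    // value: driver's running total ("10", then "20")
  true)                                       // marks it as an internal, Spark-managed accumulator
```

Growing `value` with `taskId` while holding `update` at 10 is what makes the test discriminating: the page renders five identical 10.0 B cells only if it reads the per-task `update`.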